一、flink-connector-kafka
1. Kafka JAAS configuration
The Kafka JAAS entry is mandatory; if it is missing, the following error is thrown:
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
With Flink, the only working approach is to set the java.security.auth.login.config system property.
1.1 Option 1: set the system property via System.setProperty:
System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
The contents of kafka_client_jaas_keytab.conf are as follows:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    useTicketCache=false
    storeKey=true
    keyTab="D://configs//xxx.keytab"
    principal="xxx@XXXXXX.COM"
    serviceName="kafka";
};
1.2 Option 2: add a JVM option in IDEA:
-Djava.security.auth.login.config=D:\configs\kafka_client_jaas_keytab.conf
Note: adding the property to the Kafka Properties does NOT work. For example:
Properties properties = new Properties();
properties.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic, simpleStringSchema, properties);
2. Configuring Kerberos for Flink
2.1 Configure JVM options in IDEA:
-Dsecurity.kerberos.krb5-conf.path=D:\configs\krb5.conf
-Dsecurity.kerberos.login.keytab=D:\configs\xxx.keytab
-Dsecurity.kerberos.login.principal=xxx@XXXXXX.COM
2.2 Pass the configuration to the stream environment
Pass the parameters directly to Flink's StreamExecutionEnvironment:
Properties flinkProps = new Properties();
flinkProps.setProperty("security.kerberos.krb5-conf.path", "D:\\configs\\krb5.conf");
flinkProps.setProperty("security.kerberos.login.keytab", "D:\\configs\\xxx.keytab");
flinkProps.setProperty("security.kerberos.login.principal", "xxx@XXXXXX.COM");
flinkProps.setProperty("security.kerberos.login.contexts", "Client,KafkaClient");
flinkProps.setProperty("state.backend", "hashmap");

// Configuration flinkConfig = ConfigUtils.getFlinkConfig();
Configuration flinkConfig = new Configuration();
// Copy the properties into the Flink Configuration.
flinkProps.stringPropertyNames().forEach(k -> flinkConfig.setString(k, flinkProps.getProperty(k)));
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);
3. Verifying the connection
When the Kafka connection succeeds, log entries like the following appear:
09:38:26.473 [Sink: Unnamed (6/8)#0] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
...
09:38:27.534 [kafka-producer-network-thread | producer-3] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-3] Cluster ID: vj0AfElIS12S0Cp0WDBU7Q
...
09:38:27.618 [kafka-kerberos-refresh-thread-xxx@XXXXXX.COM] WARN org.apache.kafka.common.security.kerberos.KerberosLogin - [Principal=xxx@XXXXXX.COM]: TGT renewal thread has been interrupted and will exit.
4. A ticket-cache configuration does not work
Note: the following cache-style JAAS configuration fails, even though the Kerberos principal and keytab have already been configured for Flink.
System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_cache.conf");
Contents of kafka_client_jaas_cache.conf:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    renewTicket=true
    serviceName="kafka";
};
The following error is thrown:
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
Full code:
@Test
public void testWrite() throws Exception {
    // JVM options: -Dsecurity.kerberos.krb5-conf.path=D:\configs\krb5.conf -Dsecurity.kerberos.login.keytab=D:\configs\xxx.keytab -Dsecurity.kerberos.login.principal=xxx@XXXXXX.COM
    // System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
    Properties flinkProps = new Properties();
    flinkProps.setProperty("security.kerberos.krb5-conf.path", "D:\\configs\\krb5.conf");
    flinkProps.setProperty("security.kerberos.login.keytab", "D:\\configs\\xxx.keytab");
    flinkProps.setProperty("security.kerberos.login.principal", "xxx@XXXXXX.COM");
    flinkProps.setProperty("security.kerberos.login.contexts", "Client,KafkaClient");
    flinkProps.setProperty("state.backend", "hashmap");

    // Configuration flinkConfig = ConfigUtils.getFlinkConfig();
    Configuration flinkConfig = new Configuration();
    // Copy the properties into the Flink Configuration.
    flinkProps.stringPropertyNames().forEach(k -> flinkConfig.setString(k, flinkProps.getProperty(k)));
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);

    Properties properties = new Properties();
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667");
    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI");
    properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
    // Wrong way to configure JAAS with the flink-connector-kafka API:
    // properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, String.format(JAAS_CONFIG_KEYTAB_TEMPLATE, "D:\\configs\\xxx.keytab", "xxx@XXXXXX.COM"));

    String topic = "flinkcdc";
    SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic, simpleStringSchema, properties);
    senv.fromElements("hello world", "coming again").addSink(producer);
    senv.execute("test");
}
二、Using kafka-client directly
1. Kafka JAAS configuration
Either the Java system property java.security.auth.login.config or Kafka's sasl.jaas.config works.
But note the JAAS precedence:
sasl.jaas.config > java.security.auth.login.config
So once sasl.jaas.config is set, java.security.auth.login.config is effectively ignored.
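This precedence can be sketched without a broker; the property names below are the real Kafka/JVM keys, the paths are placeholders, and effectiveJaasSource is an illustrative helper, not part of the Kafka API:

```java
import java.util.Properties;

public class JaasPrecedenceDemo {

    // Sketch: which JAAS source the Kafka client ends up using.
    static String effectiveJaasSource(Properties clientProps) {
        // A client-level sasl.jaas.config entry always wins when present.
        if (clientProps.containsKey("sasl.jaas.config")) {
            return "sasl.jaas.config";
        }
        return "java.security.auth.login.config";
    }

    public static void main(String[] args) {
        // JVM-wide JAAS file (lower precedence).
        System.setProperty("java.security.auth.login.config",
                "D:/configs/kafka_client_jaas_keytab.conf");

        Properties props = new Properties();
        // Client-level JAAS entry (higher precedence).
        props.setProperty("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required ...;");

        // Both are set, but only the client-level entry takes effect.
        System.out.println(effectiveJaasSource(props)); // sasl.jaas.config
    }
}
```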
The code:
First, note that the path separator inside sasl.jaas.config must be /, not \\:
Wrong: D:\configs\kafka_client_jaas_keytab.conf
Correct: D:/configs/kafka_client_jaas_keytab.conf
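When the keytab path comes in as a Windows-style string, it can be normalized before it is embedded in sasl.jaas.config. A minimal sketch; toJaasPath is an illustrative helper, not part of the Kafka API:

```java
public class JaasPathUtil {

    // Replace every backslash with a forward slash, so the path survives
    // the JAAS config parsing noted above.
    static String toJaasPath(String path) {
        return path.replace('\\', '/');
    }

    public static void main(String[] args) {
        System.out.println(toJaasPath("D:\\configs\\kafka_client_jaas_keytab.conf"));
        // D:/configs/kafka_client_jaas_keytab.conf
    }
}
```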
private static final String JAAS_CONFIG_KEYTAB_TEMPLATE =
        "com.sun.security.auth.module.Krb5LoginModule required\n" +
        "debug=true\n" +
        "doNotPrompt=true\n" +
        "storeKey=true\n" +
        "useKeyTab=true\n" +
        "keyTab=\"%s\"\n" +
        "principal=\"%s\";";

@Test
public void testKafkaWrite() {
    Properties properties = new Properties();
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667");
    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI");
    properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Use either one of the following two lines.
    System.setProperty("java.security.auth.login.config", "D:\\configs\\kafka_client_jaas_keytab.conf");
    // properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG, String.format(JAAS_CONFIG_KEYTAB_TEMPLATE, "D:/configs/xxx.keytab", "xxx@XXXXXX.COM"));

    try {
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record1 = new ProducerRecord<>("flinkcdc", "hello kafka");
        ProducerRecord<String, String> record2 = new ProducerRecord<>("flinkcdc", "coming soon");
        Future<RecordMetadata> f1 = producer.send(record1);
        Future<RecordMetadata> f2 = producer.send(record2);
        producer.flush();
        List<Future<RecordMetadata>> fs = new ArrayList<>();
        fs.add(f1);
        fs.add(f2);
        for (Future<RecordMetadata> future : fs) {
            RecordMetadata metadata = future.get();
            System.out.println(metadata.toString());
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
The content of kafka_client_jaas_keytab.conf is the same as in the flink-connector-kafka section above.
三、Kafka console commands
Console producer startup command:
bin/kafka-console-producer.sh --bootstrap-server xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667 --topic flinkcdc --producer-property security.protocol=SASL_PLAINTEXT
Console consumer startup command:
bin/kafka-console-consumer.sh --bootstrap-server xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667 --topic flinkcdc --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --group tester
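Note that the console tools also need the client JAAS configuration. One common way is to pass it through the KAFKA_OPTS environment variable, which the Kafka launch scripts pick up; the JAAS file path below is a placeholder for wherever the keytab-based file from above lives on the host:

```shell
# Hand the same keytab-based JAAS file to the console tools via KAFKA_OPTS.
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas_keytab.conf"

bin/kafka-console-producer.sh \
  --bootstrap-server xxx06.xxx.com:6667,xxx07.xxx.com:6667,xxx08.xxx.com:6667 \
  --topic flinkcdc \
  --producer-property security.protocol=SASL_PLAINTEXT
```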