【Flink-Kafka-To-RocketMQ】Consuming Kafka Data and Writing It to RocketMQ with a Custom Flink Sink
- 1) Environment Setup
- 2) Code Implementation
- 2.1. Main Program
- 2.2.conf
- 2.2.1.ConfigTools
- 2.3.utils
- 2.3.1.DBConn
- 2.3.2.CommonUtils
- 2.4.function
- 2.4.1.MqSinkFunction
- 2.5.resources
- 2.5.1.appconfig.yml
- 2.5.2.log4j.properties
- 2.5.3.log4j2.xml
1) Environment Setup
The Maven dependencies here are somewhat redundant; I recommend adding them all for now and trimming them gradually later.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-kafka2mq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- other version properties in the original: 2.3.3 / 3.1.1 / 3.0.2 / 8 / 8 / 3.1.2 (names unknown) -->
        <scala.version>2.12.10</scala.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.14.6</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.2</log4j.version>
        <hadoop.version>3.1.2</hadoop.version>
    </properties>

    <dependencies>
        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version></dependency>
        <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.68.Final</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.jyaml</groupId><artifactId>jyaml</artifactId><version>1.3</version></dependency>
        <dependency>
            <groupId>gaei.cn.x5l</groupId><artifactId>tsp-gb-decode</artifactId><version>1.0.0</version>
            <exclusions>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version><scope>runtime</scope></dependency>
        <dependency>
            <groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-3</artifactId><version>3.1.1.7.2.8.0-224-9.0</version>
            <exclusions>
                <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion>
                <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion>
                <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-state-processor-api_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version><scope>compile</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>${hadoop.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                    <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <finalName>RocketMQProducerDemo</finalName>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>kafka2mq.api.mq.RocketMQProducerDemo</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <repositories>
        <repository>
            <id>cdh.releases.repo</id>
            <url>https://repository.cloudera.com/artifactory/libs-release-local/</url>
            <name>Releases Repository</name>
        </repository>
    </repositories>
</project>
```
2) Code Implementation
Note:
1. All configuration used by this program is read from MySQL (in production nothing is hard-coded; everything is configured dynamically). For your own tests you can simply hard-code the relevant settings; the expected shape of the stored configuration is sketched right after these notes.
2. The Kafka consumer here goes through Kerberos authentication. If your environment is not Kerberized, you can drop that part.
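For reference, the document stored in the `config_context` column of `base_app_config` is a JSON object whose keys match what the code reads (`mq-producer`, `kafka-consumer`, their `prop` sub-maps, and so on). The sketch below shows that shape; all values are placeholders, and plain Kafka properties such as `bootstrap.servers` and `group.id` are just examples of entries that get copied into the consumer `Properties`:

```json
{
  "mq-producer": {
    "defaultTopic": "example_mq_topic",
    "prop": {
      "group": "example_producer_group",
      "server.address": "rocketmq-namesrv:9876",
      "retries": "3"
    }
  },
  "kafka-consumer": {
    "topics": "topicA,topicB",
    "prop": {
      "bootstrap.servers": "kafka-broker:9092",
      "group.id": "flink_kafka2mq",
      "isKerberized": "1",
      "krb5Conf": "/etc/krb5.conf",
      "security_protocol": "SASL_PLAINTEXT",
      "useTicketCache": "false",
      "serviceName": "kafka",
      "keytab": "/path/to/user.keytab",
      "principal": "user@EXAMPLE.COM"
    }
  }
}
```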
2.1. Main Program
```java
import cdp.kafka2mq.test.conf.ConfigTools;
import cdp.kafka2mq.test.function.MqSinkFunction;
import cdp.kafka2mq.test.utils.CommonUtils;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

import java.util.*;

public class Test {
//    public static Logger logger = Logger.getLogger(Test.class);

    public static void main(String[] args) throws Exception {
        // load the job configuration from MySQL; args[0] selects the config option (e.g. dev / prod)
        ConfigTools.initMySqlConf(args[0], Test.class);
        Map<String, Object> mapConf = ConfigTools.mapConf;
        Map<String, Object> mqProducer = (Map<String, Object>) mapConf.get("mq-producer");
        Map<String, Object> kafkaConsumerConf = (Map<String, Object>) mapConf.get("kafka-consumer");
        String mqTopic = String.valueOf(mqProducer.get("defaultTopic"));
        System.out.println("mq-topic:" + mqTopic);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();

        // custom source: Kafka consumer
        FlinkKafkaConsumer<ConsumerRecord<String, String>> myConsumer = CommonUtils.getKafkaConsumer(kafkaConsumerConf);
        DataStream<ConsumerRecord<String, String>> stream = env.addSource(myConsumer);

        // custom sink: write to RocketMQ
        stream.addSink(new MqSinkFunction(mqTopic, mapConf)).setParallelism(1);

        env.execute();
    }
}
```
2.2.conf
2.2.1.ConfigTools
Reads the job configuration from MySQL.
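The class queries a table named `base_app_config` (see `getConfig()` in the listing below). A minimal sketch of that table, where the column names come from the query and the types and lengths are assumptions:

```sql
-- Minimal sketch of the config table; column names are taken from the query in ConfigTools,
-- column types and lengths are assumptions.
CREATE TABLE base_app_config (
    app_name       VARCHAR(255) NOT NULL,  -- fully qualified main-class name of the job
    config_name    VARCHAR(64)  NOT NULL,  -- the "option" passed in as args[0], e.g. dev / prod
    config_context TEXT                    -- JSON document with the mq-producer / kafka-consumer sections
);
```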
```java
package cdp.kafka2mq.test.conf;

import com.alibaba.fastjson.JSON;
import cdp.kafka2mq.test.utils.DBConn;
import lombok.extern.slf4j.Slf4j;
import org.ho.yaml.Yaml;

import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

@Slf4j
public class ConfigTools {
    public static Map<String, Object> mapConf;

    /**
     * Load the configuration from a local YAML file for the given option.
     *
     * @param option
     */
    public static void initConf(String option) {
        String confFile = "/flink_backup_" + option + ".yml";
        try {
            InputStream dumpFile = ConfigTools.class.getResourceAsStream(confFile);
            mapConf = Yaml.loadType(dumpFile, HashMap.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Load the configuration from MySQL for the given option.
     *
     * @param option
     */
    public static void initMySqlConf(String option, Class clazz) {
        String className = clazz.getName();
        String confFile = "/appconfig.yml";
        Map<String, String> mysqlConf;
        try {
            InputStream dumpFile = ConfigTools.class.getResourceAsStream(confFile);
            mysqlConf = Yaml.loadType(dumpFile, HashMap.class);
            String username = mysqlConf.get("mysql.username");
            String password = mysqlConf.get("mysql.password");
            String url = mysqlConf.get("mysql.url");
            Connection conn = DBConn.conn(url, username, password);
            Map<String, Object> config = getConfig(conn, className, option);
            if (config == null || config.size() == 0) {
                log.error("Failed to load configuration");
                return;
            }
            mapConf = config;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Map<String, Object> getConfig(Connection conn, String className, String option) throws SQLException {
        PreparedStatement preparedStatement = null;
        try {
            String sql = "select config_context from base_app_config where app_name = '%s' and config_name = '%s'";
            preparedStatement = conn.prepareStatement(String.format(sql, className, option));
            ResultSet rs = preparedStatement.executeQuery();
            Map<String, Object> map = new LinkedHashMap<>();
            String config_context = "";
            while (rs.next()) {
                config_context = rs.getString("config_context");
            }
            System.out.println("config_context: " + config_context);
            Map mysqlConfMap = JSON.parseObject(config_context, Map.class);
            return mysqlConfMap;
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }

    public static void main(String[] args) {
        initConf("local");
        String s = JSON.toJSONString(mapConf);
        System.out.println(s);
    }
}
```
2.3.utils
2.3.1.DBConn
MySQL connection utility class.
```java
package cdp.kafka2mq.test.utils;

import java.sql.*;

public class DBConn {
    private static final String driver = "com.mysql.jdbc.Driver";   // MySQL JDBC driver
    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static ResultSet rs = null;
    private static final CallableStatement cs = null;

    /**
     * Open a database connection.
     * @return
     */
    public static Connection conn(String url, String username, String password) {
        try {
            Class.forName(driver);                                   // load the JDBC driver
            try {
                conn = DriverManager.getConnection(url, username, password); // connect to the database
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Close the database connection.
     */
    public static void close() {
        if (conn != null) {
            try {
                conn.close();                                        // close the connection
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
```
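A quick way to verify the connection settings is to run `DBConn` on its own. In the sketch below the URL, credentials, and the `app_name`/`config_name` values are placeholders (in the real job they come from `appconfig.yml` and the `base_app_config` table):

```java
package cdp.kafka2mq.test.utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Hypothetical smoke test for DBConn; URL, credentials and query parameters are placeholders.
public class DBConnSmokeTest {
    public static void main(String[] args) throws Exception {
        Connection conn = DBConn.conn("jdbc:mysql://ip:3306/dbName?useSSL=false", "username", "password");
        try (PreparedStatement ps = conn.prepareStatement(
                "select config_context from base_app_config where app_name = ? and config_name = ?")) {
            ps.setString(1, "Test");   // app_name: the job's main class name
            ps.setString(2, "prod");   // config_name: the option that would be passed as args[0]
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getString("config_context"));
                }
            }
        } finally {
            conn.close();
        }
    }
}
```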
2.3.2.CommonUtils
Kafka consumer utility class.
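`getKafkaConsumer()`, shown in the listing below, only needs a map containing a `topics` string and a `prop` sub-map, so it can also be exercised without MySQL by building that map by hand. A minimal sketch (broker address, group id and topic are placeholders; Kerberos is switched off):

```java
package cdp.kafka2mq.test.utils;

import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.HashMap;
import java.util.Map;

// Hypothetical local test that builds the kafka-consumer config map by hand
// instead of reading it from MySQL; broker, group id and topic are placeholders.
public class KafkaConsumerLocalTest {
    public static void main(String[] args) throws Exception {
        Map<String, String> prop = new HashMap<>();
        prop.put("bootstrap.servers", "kafka-broker:9092");
        prop.put("group.id", "flink_kafka2mq_local");
        prop.put("isKerberized", "0"); // any value other than "1" skips the Kerberos branch

        Map<String, Object> kafkaConf = new HashMap<>();
        kafkaConf.put("topics", "test_topic");
        kafkaConf.put("prop", prop);

        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
                CommonUtils.getKafkaConsumer(kafkaConf);
        System.out.println("created consumer: " + consumer);
    }
}
```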
```java
package cdp.kafka2mq.test.utils;

import cdp.kafka2mq.test.conf.ConfigTools;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;

@Slf4j
public class CommonUtils {
    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(Map<String, Object> kafkaConf) throws IOException {
        String[] topics = ((String) kafkaConf.get("topics")).split(",");
        log.info("Subscribed topics: {}", Arrays.toString(topics));
        Properties properties = new Properties();
        Map<String, String> kafkaProp = (Map<String, String>) kafkaConf.get("prop");
        for (String key : kafkaProp.keySet()) {
            properties.setProperty(key, kafkaProp.get(key).toString());
        }
        // optional Kerberos authentication, driven by the "isKerberized" flag in the config
        if (!StringUtils.isBlank((String) kafkaProp.get("isKerberized")) && "1".equals(kafkaProp.get("isKerberized"))) {
            System.setProperty("java.security.krb5.conf", kafkaProp.get("krb5Conf"));
            properties.put("security.protocol", kafkaProp.get("security_protocol"));
            properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                    + "useTicketCache=" + kafkaProp.get("useTicketCache") + " "
                    + "serviceName=\"" + kafkaProp.get("serviceName") + "\" "
                    + "useKeyTab=true "
                    + "keyTab=\"" + kafkaProp.get("keytab").toString() + "\" "
                    + "principal=\"" + kafkaProp.get("principal").toString() + "\";");
        }
        properties.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");

        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumerRecordFlinkKafkaConsumer =
                new FlinkKafkaConsumer<ConsumerRecord<String, String>>(Arrays.asList(topics),
                        new KafkaDeserializationSchema<ConsumerRecord<String, String>>() {
                            @Override
                            public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
                                return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
                                });
                            }

                            @Override
                            public boolean isEndOfStream(ConsumerRecord<String, String> stringStringConsumerRecord) {
                                return false;
                            }

                            @Override
                            public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                                // re-wrap the raw record with key/value decoded as UTF-8 strings
                                return new ConsumerRecord<String, String>(
                                        record.topic(),
                                        record.partition(),
                                        record.offset(),
                                        record.timestamp(),
                                        record.timestampType(),
                                        record.checksum(),
                                        record.serializedKeySize(),
                                        record.serializedValueSize(),
                                        new String(record.key() == null ? "".getBytes(StandardCharsets.UTF_8) : record.key(), StandardCharsets.UTF_8),
                                        new String(record.value() == null ? "{}".getBytes(StandardCharsets.UTF_8) : record.value(), StandardCharsets.UTF_8));
                            }
                        }, properties);
        return consumerRecordFlinkKafkaConsumer;
    }
}
```
2.4.function
2.4.1.MqSinkFunction
Custom sink function that writes the data to RocketMQ.
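`MqSinkFunction` (listing below) parses each Kafka record value as JSON and requires an `id` field: records without one are silently dropped, and the `id` value is reused as the RocketMQ message tag and transaction id. A record value shaped roughly like the following would therefore be forwarded (only `id` is actually required; the other fields are illustrative):

```json
{
  "id": "10001",
  "eventTime": "2024-01-01 00:00:00",
  "payload": "..."
}
```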
```java
package cdp.kafka2mq.test.function;

import cdp.kafka2mq.test.conf.ConfigTools;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MqSinkFunction extends RichSinkFunction<ConsumerRecord<String, String>> {
    private static DefaultMQProducer producer = new DefaultMQProducer();
    private String topic;
    private Map<String, Object> confMap;

    public MqSinkFunction(String topic, Map<String, Object> confMap) {
        this.topic = topic;
        this.confMap = confMap;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
//        super.open(parameters);
        System.out.println("confMap: " + confMap);
        Map<String, Object> mqProducer = (Map<String, Object>) confMap.get("mq-producer");
        Map<String, Object> mqProp = (Map<String, Object>) mqProducer.get("prop");
        String groupId = String.valueOf(mqProp.get("group"));
        String srvAddr = String.valueOf(mqProp.get("server.address"));
        int retries = Integer.parseInt(String.valueOf(mqProp.get("retries")));
        System.out.println("MQ producer group: " + groupId);
        System.out.println("MQ name server address: " + srvAddr);
        System.out.println("retries: " + retries);

        // configure and start the RocketMQ producer once per task
        producer.setProducerGroup(groupId);
        producer.setNamesrvAddr(srvAddr);
        producer.setRetryTimesWhenSendFailed(retries);
        producer.setUseTLS(true);
        producer.start();
    }

    @Override
    public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
        String message = record.value();
//        System.out.println(message);
        HashMap infoMap = JSON.parseObject(message, HashMap.class);
//        System.out.println(infoMap);
        String id = String.valueOf(infoMap.get("id"));
        if (StringUtils.isBlank(id)) {
            return;
        }
        Message msg = new Message();
        msg.setTopic(topic);
        msg.setTags(id);
        msg.setBody(message.getBytes(StandardCharsets.UTF_8));
        msg.setTransactionId(id);
//        System.out.println("msg:" + msg);
        System.out.println("before send");
        SendResult send = producer.send(msg);
        System.out.printf("%s%n", send);
        System.out.println("after send");
    }
}
```
2.5.resources
Other configuration files.
2.5.1.appconfig.yml
```yaml
mysql.url: "jdbc:mysql://ip:3306/dbName?useSSL=false"
mysql.username: "username"
mysql.password: "password"
mysql.driver: "com.mysql.jdbc.Driver"
```
2.5.2.log4j.properties
```properties
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
```
2.5.3.log4j2.xml
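A minimal console-only Log4j2 configuration along these lines is enough for local runs; the pattern and log level are just example choices:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Minimal console-only Log4j2 configuration; pattern and level are example choices. -->
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
```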