
[Flink-Kafka-To-RocketMQ] Using a Custom Flink Sink to Consume Kafka Data and Write It to RocketMQ

guduadmin · 13 hours ago


  • 1) Environment Setup
  • 2) Implementation
    • 2.1. Main Program
    • 2.2. conf
      • 2.2.1. ConfigTools
    • 2.3. utils
      • 2.3.1. DBConn
      • 2.3.2. CommonUtils
    • 2.4. function
      • 2.4.1. MqSinkFunction
    • 2.5. resources
      • 2.5.1. appconfig.yml
      • 2.5.2. log4j.properties
      • 2.5.3. log4j2.xml

1) Environment Setup

The Maven dependencies here are somewhat redundant; I recommend adding them all for now and trimming them down later.

              
              
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-kafka2mq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Note: the names for the version properties 2.3.3, 3.1.1, 3.0.2, 2.12.10 and two bare "8"s
             were lost when this post was reproduced; only the properties actually referenced below
             are reconstructed here. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.14.6</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.2</log4j.version>
        <hadoop.version>3.1.2</hadoop.version>
    </properties>

    <dependencies>
        <!-- RocketMQ client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.68.Final</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.jyaml</groupId>
            <artifactId>jyaml</artifactId>
            <version>1.3</version>
        </dependency>
        <dependency>
            <groupId>gaei.cn.x5l</groupId>
            <artifactId>tsp-gb-decode</artifactId>
            <version>1.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
            <scope>runtime</scope>
        </dependency>
        <!-- Hadoop (CDP build of flink-shaded-hadoop-3) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-3</artifactId>
            <version>3.1.1.7.2.8.0-224-9.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Core Flink dependencies, provided by the cluster at runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.5</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.23</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                    <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <finalName>RocketMQProducerDemo</finalName>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>kafka2mq.api.mq.RocketMQProducerDemo</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <!-- Only affects Eclipse m2e; silences lifecycle-mapping warnings in the IDE
                     and has no effect on the Maven build itself -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <repositories>
        <repository>
            <id>cdh.releases.repo</id>
            <url>https://repository.cloudera.com/artifactory/libs-release-local/</url>
            <name>Releases Repository</name>
        </repository>
    </repositories>
</project>

2) Implementation

Note:

1. All of the configuration in this program is read from MySQL (in production nothing is hard-coded; everything is driven by dynamic configuration). For your own testing you can simply hard-code the relevant settings instead.

2. The Kafka access in this program involves Kerberos authentication; if your environment does not use Kerberos, you can drop that part.
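For orientation, the configuration the job pulls from MySQL is a JSON document. A minimal sketch matching the keys the code below actually reads (broker addresses, group names, and the Kafka client properties are illustrative placeholders, not values from the original post) might look like:

{
  "kafka-consumer": {
    "topics": "test-topic-1,test-topic-2",
    "prop": {
      "bootstrap.servers": "kafka-host:9092",
      "group.id": "flink-kafka2mq-test",
      "isKerberized": "0"
    }
  },
  "mq-producer": {
    "defaultTopic": "mq-test-topic",
    "prop": {
      "group": "mq-producer-group",
      "server.address": "rocketmq-host:9876",
      "retries": "3"
    }
  }
}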

2.1. Main Program

package cdp.kafka2mq.test;

import cdp.kafka2mq.test.conf.ConfigTools;
import cdp.kafka2mq.test.function.MqSinkFunction;
import cdp.kafka2mq.test.utils.CommonUtils;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Map;

public class Test {
    public static void main(String[] args) throws Exception {
        // args[0] selects which config row to load from MySQL
        ConfigTools.initMySqlConf(args[0], Test.class);
        Map<String, Object> mapConf = ConfigTools.mapConf;
        Map<String, Object> mqProducer = (Map<String, Object>) mapConf.get("mq-producer");
        Map<String, Object> kafkaConsumerConf = (Map<String, Object>) mapConf.get("kafka-consumer");
        String mqTopic = String.valueOf(mqProducer.get("defaultTopic"));
        System.out.println("mq-topic:" + mqTopic);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();
        // custom source: Kafka consumer
        FlinkKafkaConsumer<ConsumerRecord<String, String>> myConsumer = CommonUtils.getKafkaConsumer(kafkaConsumerConf);
        DataStream<ConsumerRecord<String, String>> stream = env.addSource(myConsumer);
        // custom sink: write each record to RocketMQ
        stream.addSink(new MqSinkFunction(mqTopic, mapConf)).setParallelism(1);

        env.execute();
    }
}
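To submit the job, pass the config option name as the first program argument. Assuming the class above lives in the cdp.kafka2mq.test package and the jar is the shaded artifact named after the pom's artifactId (both assumptions), a submission could look like:

flink run -c cdp.kafka2mq.test.Test flink-kafka2mq-1.0-SNAPSHOT.jar prod

where prod is the config_name of the row to load from MySQL.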
              

              2.2.conf

              2.2.1.ConfigTools

Reads the configuration from MySQL.

package cdp.kafka2mq.test.conf;

import com.alibaba.fastjson.JSON;
import cdp.kafka2mq.test.utils.DBConn;
import lombok.extern.slf4j.Slf4j;
import org.ho.yaml.Yaml;

import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

@Slf4j
public class ConfigTools {
    public static Map<String, Object> mapConf;

    /**
     * Load the matching config file from the classpath.
     *
     * @param option environment suffix, e.g. "local"
     */
    public static void initConf(String option) {
        String confFile = "/flink_backup_" + option + ".yml";
        try {
            InputStream dumpFile = ConfigTools.class.getResourceAsStream(confFile);
            mapConf = Yaml.loadType(dumpFile, HashMap.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Load the matching configuration from MySQL.
     *
     * @param option config name to look up
     */
    public static void initMySqlConf(String option, Class clazz) {
        String className = clazz.getName();
        String confFile = "/appconfig.yml";
        Map<String, String> mysqlConf;
        try {
            InputStream dumpFile = ConfigTools.class.getResourceAsStream(confFile);
            mysqlConf = Yaml.loadType(dumpFile, HashMap.class);
            String username = mysqlConf.get("mysql.username");
            String password = mysqlConf.get("mysql.password");
            String url = mysqlConf.get("mysql.url");
            Connection conn = DBConn.conn(url, username, password);
            Map<String, Object> config = getConfig(conn, className, option);
            if (config == null || config.size() == 0) {
                log.error("Failed to load configuration");
                return;
            }
            mapConf = config;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static Map<String, Object> getConfig(Connection conn, String className, String option) throws SQLException {
        PreparedStatement preparedStatement = null;
        try {
            String sql = "select config_context from base_app_config where app_name = '%s' and config_name = '%s'";
            preparedStatement = conn.prepareStatement(String.format(sql, className, option));
            ResultSet rs = preparedStatement.executeQuery();
            String configContext = "";
            while (rs.next()) {
                configContext = rs.getString("config_context");
            }
            System.out.println("config_context: " + configContext);
            return JSON.parseObject(configContext, Map.class);
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }

    public static void main(String[] args) {
        initConf("local");
        String s = JSON.toJSONString(mapConf);
        System.out.println(s);
    }
}
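The code above assumes a config table with at least app_name, config_name, and config_context columns. A hypothetical DDL for it (the layout is inferred from the query, not taken from the original post) could be:

CREATE TABLE base_app_config (
    app_name       VARCHAR(255) NOT NULL,  -- fully-qualified main class name
    config_name    VARCHAR(64)  NOT NULL,  -- environment/option key, e.g. 'prod'
    config_context TEXT,                   -- JSON configuration document
    PRIMARY KEY (app_name, config_name)
);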
              

              2.3.utils

              2.3.1.DBConn

MySQL connection utility class.

package cdp.kafka2mq.test.utils;

import java.sql.*;

public class DBConn {
    private static final String driver = "com.mysql.jdbc.Driver";  // MySQL JDBC driver
    private static Connection conn = null;

    /**
     * Open a database connection.
     * @return the opened connection, or null on failure
     */
    public static Connection conn(String url, String username, String password) {
        try {
            Class.forName(driver);  // load the JDBC driver
            conn = DriverManager.getConnection(url, username, password);  // connect
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Close the database connection.
     */
    public static void close() {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
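For illustration, this is how ConfigTools drives it (the URL and credentials are the placeholders from appconfig.yml):

Connection conn = DBConn.conn("jdbc:mysql://ip:3306/dbName?useSSL=false", "username", "password");
// ... run queries ...
DBConn.close();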
              

              2.3.2.CommonUtils

Kafka consumer utility class.

package cdp.kafka2mq.test.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

@Slf4j
public class CommonUtils {
    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(Map<String, Object> kafkaConf) throws IOException {
        String[] topics = ((String) kafkaConf.get("topics")).split(",");
        log.info("subscribed topics: {}", Arrays.toString(topics));
        Properties properties = new Properties();
        Map<String, Object> kafkaProp = (Map<String, Object>) kafkaConf.get("prop");
        for (String key : kafkaProp.keySet()) {
            properties.setProperty(key, kafkaProp.get(key).toString());
        }
        // Kerberos authentication; remove this block if your environment does not use it
        if (!StringUtils.isBlank((String) kafkaProp.get("isKerberized")) && "1".equals(kafkaProp.get("isKerberized"))) {
            System.setProperty("java.security.krb5.conf", kafkaProp.get("krb5Conf").toString());
            properties.put("security.protocol", kafkaProp.get("security_protocol"));
            properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                    + "useTicketCache=" + kafkaProp.get("useTicketCache") + " "
                    + "serviceName=\"" + kafkaProp.get("serviceName") + "\" "
                    + "useKeyTab=true "
                    + "keyTab=\"" + kafkaProp.get("keytab").toString() + "\" "
                    + "principal=\"" + kafkaProp.get("principal").toString() + "\";");
        }
        properties.put("key.deserializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>(Arrays.asList(topics), new KafkaDeserializationSchema<ConsumerRecord<String, String>>() {
            @Override
            public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
                return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
                });
            }

            @Override
            public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
                return false;
            }

            @Override
            public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                // Re-wrap the raw record with UTF-8 decoded key/value (null-safe defaults)
                return new ConsumerRecord<>(
                        record.topic(),
                        record.partition(),
                        record.offset(),
                        record.timestamp(),
                        record.timestampType(),
                        record.checksum(),
                        record.serializedKeySize(),
                        record.serializedValueSize(),
                        new String(record.key() == null ? "".getBytes(StandardCharsets.UTF_8) : record.key(), StandardCharsets.UTF_8),
                        new String(record.value() == null ? "{}".getBytes(StandardCharsets.UTF_8) : record.value(), StandardCharsets.UTF_8));
            }
        }, properties);
        return consumer;
    }
}
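With Kerberos enabled, the sasl.jaas.config string assembled above comes out roughly like this (the keytab path, service name, and principal are placeholders):

com.sun.security.auth.module.Krb5LoginModule required useTicketCache=false serviceName="kafka" useKeyTab=true keyTab="/opt/keytab/user.keytab" principal="user@EXAMPLE.COM";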
              

              2.4.function

              2.4.1.MqSinkFunction

A custom sink function that writes the data to RocketMQ.

package cdp.kafka2mq.test.function;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MqSinkFunction extends RichSinkFunction<ConsumerRecord<String, String>> {
    private static DefaultMQProducer producer = new DefaultMQProducer();
    private String topic;
    private Map<String, Object> confMap;

    public MqSinkFunction(String topic, Map<String, Object> confMap) {
        this.topic = topic;
        this.confMap = confMap;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("confMap: " + confMap);
        Map<String, Object> mqProducer = (Map<String, Object>) confMap.get("mq-producer");
        Map<String, Object> mqProp = (Map<String, Object>) mqProducer.get("prop");
        String groupId = String.valueOf(mqProp.get("group"));
        String srvAddr = String.valueOf(mqProp.get("server.address"));
        int retries = Integer.parseInt(String.valueOf(mqProp.get("retries")));
        System.out.println("MQ producer group: " + groupId);
        System.out.println("MQ name server address: " + srvAddr);
        System.out.println("retries: " + retries);
        producer.setProducerGroup(groupId);
        producer.setNamesrvAddr(srvAddr);
        producer.setRetryTimesWhenSendFailed(retries);
        producer.setUseTLS(true);
        producer.start();
    }

    @Override
    public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
        String message = record.value();
        HashMap infoMap = JSON.parseObject(message, HashMap.class);
        Object idObj = infoMap == null ? null : infoMap.get("id");
        // skip records without a usable id
        if (idObj == null || StringUtils.isBlank(String.valueOf(idObj))) {
            return;
        }
        String id = String.valueOf(idObj);
        Message msg = new Message();
        msg.setTopic(topic);
        msg.setTags(id);
        msg.setBody(message.getBytes(StandardCharsets.UTF_8));
        msg.setTransactionId(id);
        SendResult send = producer.send(msg);
        System.out.printf("%s%n", send);
    }

    @Override
    public void close() throws Exception {
        // release the producer when the sink shuts down
        producer.shutdown();
    }
}
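The pom's assembly configuration points at a kafka2mq.api.mq.RocketMQProducerDemo main class that is not shown in this post. For smoke-testing RocketMQ connectivity independently of Flink, a minimal standalone producer along those lines (the topic, group, and name-server address are placeholders) could be:

package kafka2mq.api.mq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

public class RocketMQProducerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo-producer-group");
        producer.setNamesrvAddr("rocketmq-host:9876");  // placeholder address
        producer.start();
        try {
            // topic/tag/body mirror what MqSinkFunction sends
            Message msg = new Message("mq-test-topic", "demo-tag", "{\"id\":\"1\"}".getBytes(StandardCharsets.UTF_8));
            SendResult result = producer.send(msg);
            System.out.printf("%s%n", result);
        } finally {
            producer.shutdown();
        }
    }
}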
              

              2.5.resources

Other configuration files.

              2.5.1.appconfig.yml

              mysql.url: "jdbc:mysql://ip:3306/dbName?useSSL=false"
              mysql.username: "username"
              mysql.password: "password"
              mysql.driver: "com.mysql.jdbc.Driver"
              

              2.5.2.log4j.properties

              log4j.rootLogger=info, stdout
              log4j.appender.stdout=org.apache.log4j.ConsoleAppender
              log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
              log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
              

2.5.3.log4j2.xml

A minimal log4j2 configuration consistent with the log4j.properties above (console appender at info level; a representative sketch, not necessarily the original file):

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" monitorInterval="30">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c %x - %m%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
