上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Flink实现同时消费多个kafka topic,并输出到多个topic

guduadmin18小时前

Flink实现同时消费多个kafka topic,并输出到多个topic

  • 1.说明
  • 2.依赖引用
  • 3. 方案一:适用于==sink topic==存在跨集群等kafka生产者配置信息不相同的情况
    • 3.1配置文件
    • 3.2 java代码
    • 3.3 运行图(ps:为了更好的展示循环中包含算子,将sink算子并行度设为了1,发生了rebalance)
    • 4.方案二:适用于输入及输出topic都用属于一个集群的场景
      • 4.1 配置文件同上
      • 4.2 Java代码
      • 5. 业务使用场景:

        1.说明

        1)代码使用的flink版本为1.16.1,旧版本的依赖及api可能不同,同时使用了hutool的JSON工具类,两者均可自行更换;

        2)本次编写的两个方案,均只适用于数据源topic来自同一个集群,且kafka消费组相同,暂未研究flink的connect算子join多条流

        2.依赖引用

         
                8
                8
                UTF-8
                1.16.1
                5.8.15
            
            
         
                    cn.hutool
                    hutool-all
                    ${hutool.version}
                
                
                
                    org.apache.flink
                    flink-java
                    ${flink.version}
                    
                        
                            commons-lang3
                            org.apache.commons
                        
                    
                
                
                    org.apache.flink
                    flink-streaming-java
                    ${flink.version}
                    
                        
                            commons-lang3
                            org.apache.commons
                        
                    
                
                
                    org.apache.flink
                    flink-clients
                    ${flink.version}
                
                
                    org.apache.flink
                    flink-connector-kafka
                    ${flink.version}
                
                
                    org.apache.flink
                    flink-connector-base
                    ${flink.version}
                
                
                    org.apache.flink
                    flink-connector-files
                    ${flink.version}
                
                
                    org.apache.flink
                    flink-connector-jdbc
                    ${flink.version}
                
                
                    org.apache.flink
                    flink-runtime-web
                    ${flink.version}
                
        

        3. 方案一:适用于sink topic存在跨集群等kafka生产者配置信息不相同的情况

        代码涉及Hadoop相关环境,若无该环境的同学,可以设置为本地路径

        3.1配置文件

        # 输入topic列表
        newInputTopic=hive_data_input_topic
        # 输出topic列表
        newOutputTopic=topic-test
        

        3.2 java代码

        public static void main(String[] args) throws Exception {
        		// 设置操作HDFS的用户
                System.setProperty("HADOOP_USER_NAME", "hadoop");
                // 获取命令行参数,args[0] 为配置文件路径 input/customer.properties
                ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
                String inputTopic = parameterTool.get("newInputTopic");
                String outputTopic = parameterTool.get("newOutputTopic");
                // 构建输入topic
                ArrayList inputTopicList = new ArrayList<>();
                inputTopicList.add("canal_mysql_input_topic");
                if (!StringUtils.isNullOrWhitespaceOnly(inputTopic)) {
                    inputTopicList.add(inputTopic);
                }
                // 构建输出topic
                Map hashMap = new HashMap<>();
                hashMap.put("ap_article", "canal_input_topic");
                hashMap.put("ap_user", "cast_topic_input");
                if (!StringUtils.isNullOrWhitespaceOnly(outputTopic)) {
                    hashMap.put("hive_table_orders", "topic-test");
                }
                // 构建配置
                Configuration configuration = new Configuration();
                // 设定本地flink dashboard的webUi访问端口,即http://localhost:9091
                configuration.setString("rest.port", "9091");
                // 设定从指定的checkpoint恢复,此处为HDFS路径,可更换为本地路径"file:///D:\\test\\flink-tuning\\checkpoint\\jobId\\chk-xx"
                String savePointPath = "hdfs://masterNode:8020/flink-tuning/checkpoint/b66ee8431170f07764db0e777c58848a/chk-36";
                // 设置savepoint路径,以及是否允许本次提交的程序有新增有状态算子,必须给原来的算子配置uid作为唯一标识,否则会出现问题
                SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savePointPath, true);
                SavepointRestoreSettings.toConfiguration(restoreSettings, configuration);
                
        		// 获取执行环境
                StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
                // 开启检查点,设置检查点间隔时间
                environment.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
                // 设置状态后端类型
                environment.setStateBackend(new HashMapStateBackend());
                CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
                // 设置checkpoint文件存放路径,设置本地路径:file:///D:\\test\\flink-tuning\\checkpoint
                checkpointConfig.setCheckpointStorage("hdfs://masterNode:8020/flink-tuning/checkpoint");
                // 设置并发数,同时最多可以有几个checkpoint执行
                checkpointConfig.setMaxConcurrentCheckpoints(1);
                // checkpoint失败次数,超过此次数,job挂掉(checkpoint不会重试,会等待下一个checkpoint)
                checkpointConfig.setTolerableCheckpointFailureNumber(5);
                // 超时多久没完成checkpoint,任务失败
                checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
                // 手动cancel掉job时,保留在外部系统的checkpoint不会被删除
                checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
                // 从kafka读取数据
                KafkaSource kafkaSource = KafkaSource.builder()
                        .setBootstrapServers("192.168.200.130:9092")
                        .setTopics(inputTopicList)
                        .setGroupId("group-test-savepoint")
                        // 从消费组的offset提交位点开始消费,若未找到上一次消费位点,则从设置该topic的offset为最新的位置
                        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                        .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区,避免分区扩容导致没有算子消费
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();
                SingleOutputStreamOperator streamSource = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source")
                        .uid("kafka_source")	// 最好设置一下算子的id
                        .setParallelism(5); // 设置并行度 = topic分区数
                // 此处使用循环,会开辟map键值对个数的算子链,多个filter --> sink算子链,详情见下图
              	// map中可配置topic所属集群,以及鉴权信息等,此处省略
                for (String key : hashMap.keySet()) {
                	// filter算子根据数据中的表名table与topic之间的映射关系,过滤数据
                    SingleOutputStreamOperator outputStreamOperator = streamSource.filter(vo -> {
                        JSONObject jsonObject = JSONUtil.parseObj(vo);
                        String tableName = (String) jsonObject.get("table");
                        return tableName.equals(key);
                    }).uid("filter-" + key).setParallelism(5);
        			// 构建kafka sink
                    KafkaSink kafkaSink = KafkaSink.builder()
                    		// kafka集群,可根据不同topic所在集群不同,动态更换ip
                            .setBootstrapServers("192.168.200.130:9092")
                            // 自定义kafka序列化器
                            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            		// 根据映射获取输出topic
                                    .setTopic(hashMap.get(key))
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                             // 一致性语义:至少一次
                            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                            .build();
                    // sink算子
                    outputStreamOperator.sinkTo(kafkaSink).uid("sink-" + key).setParallelism(1);
                }
                // 执行
                environment.execute();
        

        3.3 运行图(ps:为了更好的展示循环中包含算子,将sink算子并行度设为了1,发生了rebalance)

        Flink实现同时消费多个kafka topic,并输出到多个topic,算子运行图,第1张

        4.方案二:适用于输入及输出topic都用属于一个集群的场景

        4.1 配置文件同上

        4.2 Java代码

        public static void main(String[] args) throws Exception {
        		// 环境配置同上,故此处省略。。。
                // 从kafka读取数据
                KafkaSource kafkaSource = KafkaSource.builder()
                        .setBootstrapServers("192.168.200.130:9092")
                        .setTopics(inputTopicList)
                        .setGroupId("group-test-savepoint")
                        // 从消费组的offset提交位点开始消费,若未找到上一次消费位点,则从设置该topic的offset为最新的位置
                        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                        .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区,避免分区扩容导致没有算子消费
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();
                SingleOutputStreamOperator streamSource = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source")
                        .uid("kafka_source")	// 最好设置一下算子的id
                        .setParallelism(5); // 设置并行度 = topic分区数
                        
                // 输出到kafka,此处没有循环,只会产生一条算子链
                KafkaSink kafkaSink = KafkaSink.builder()
                        .setBootstrapServers("192.168.200.130:9092")	// 输出topic的kafka集群固定
                        .setRecordSerializer((KafkaRecordSerializationSchema) (data, context, timestamp) -> {
                            JSONObject jsonObject = JSONUtil.parseObj(data);
                            // 获取表名
                            String table = (String) jsonObject.get("table");
                            // 获取topic
                            String topic = hashMap.get(table);
                            return new ProducerRecord<>(topic, data.getBytes(StandardCharsets.UTF_8));
                        })
                        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                        .build();
                // sink算子
                streamSource.sinkTo(kafkaSink).uid("sink-" + key).setParallelism(1);
                }
                // 执行
                environment.execute();
        

        5. 业务使用场景:

        Flink实现同时消费多个kafka topic,并输出到多个topic,在这里插入图片描述,第2张

网友评论

搜索
最新文章
热门文章
热门标签