Flink实现同时消费多个kafka topic,并输出到多个topic
- 1.说明
- 2.依赖引用
- 3. 方案一:适用于==sink topic==存在跨集群等kafka生产者配置信息不相同的情况
- 3.1配置文件
- 3.2 java代码
- 3.3 运行图(ps:为了更好的展示循环中包含算子,将sink算子并行度设为了1,发生了rebalance)
- 4.方案二:适用于输入及输出topic都用属于一个集群的场景
- 4.1 配置文件同上
- 4.2 Java代码
- 5. 业务使用场景:
1.说明
1)代码使用的flink版本为1.16.1,旧版本的依赖及api可能不同,同时使用了hutool的JSON工具类,两者均可自行更换;
2)本次编写的两个方案,均只适用于数据源topic来自同一个集群,且kafka消费组相同,暂未研究flink的connect算子join多条流
2.依赖引用
8 8 UTF-8 1.16.1 5.8.15 cn.hutool hutool-all ${hutool.version} org.apache.flink flink-java ${flink.version} commons-lang3 org.apache.commons org.apache.flink flink-streaming-java ${flink.version} commons-lang3 org.apache.commons org.apache.flink flink-clients ${flink.version} org.apache.flink flink-connector-kafka ${flink.version} org.apache.flink flink-connector-base ${flink.version} org.apache.flink flink-connector-files ${flink.version} org.apache.flink flink-connector-jdbc ${flink.version} org.apache.flink flink-runtime-web ${flink.version} 3. 方案一:适用于sink topic存在跨集群等kafka生产者配置信息不相同的情况
代码涉及Hadoop相关环境,若无该环境的同学,可以设置为本地路径
3.1配置文件
# 输入topic列表 newInputTopic=hive_data_input_topic # 输出topic列表 newOutputTopic=topic-test
3.2 java代码
public static void main(String[] args) throws Exception { // 设置操作HDFS的用户 System.setProperty("HADOOP_USER_NAME", "hadoop"); // 获取命令行参数,args[0] 为配置文件路径 input/customer.properties ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]); String inputTopic = parameterTool.get("newInputTopic"); String outputTopic = parameterTool.get("newOutputTopic"); // 构建输入topic ArrayList
inputTopicList = new ArrayList<>(); inputTopicList.add("canal_mysql_input_topic"); if (!StringUtils.isNullOrWhitespaceOnly(inputTopic)) { inputTopicList.add(inputTopic); } // 构建输出topic Map hashMap = new HashMap<>(); hashMap.put("ap_article", "canal_input_topic"); hashMap.put("ap_user", "cast_topic_input"); if (!StringUtils.isNullOrWhitespaceOnly(outputTopic)) { hashMap.put("hive_table_orders", "topic-test"); } // 构建配置 Configuration configuration = new Configuration(); // 设定本地flink dashboard的webUi访问端口,即http://localhost:9091 configuration.setString("rest.port", "9091"); // 设定从指定的checkpoint恢复,此处为HDFS路径,可更换为本地路径"file:///D:\\test\\flink-tuning\\checkpoint\\jobId\\chk-xx" String savePointPath = "hdfs://masterNode:8020/flink-tuning/checkpoint/b66ee8431170f07764db0e777c58848a/chk-36"; // 设置savepoint路径,以及是否允许本次提交的程序有新增有状态算子,必须给原来的算子配置uid作为唯一标识,否则会出现问题 SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savePointPath, true); SavepointRestoreSettings.toConfiguration(restoreSettings, configuration); // 获取执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration); // 开启检查点,设置检查点间隔时间 environment.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // 设置状态后端类型 environment.setStateBackend(new HashMapStateBackend()); CheckpointConfig checkpointConfig = environment.getCheckpointConfig(); // 设置checkpoint文件存放路径,设置本地路径:file:///D:\\test\\flink-tuning\\checkpoint checkpointConfig.setCheckpointStorage("hdfs://masterNode:8020/flink-tuning/checkpoint"); // 设置并发数,同时最多可以有几个checkpoint执行 checkpointConfig.setMaxConcurrentCheckpoints(1); // checkpoint失败次数,超过此次数,job挂掉(checkpoint不会重试,会等待下一个checkpoint) checkpointConfig.setTolerableCheckpointFailureNumber(5); // 超时多久没完成checkpoint,任务失败 checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1)); // 手动cancel掉job时,保留在外部系统的checkpoint不会被删除 checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 从kafka读取数据 KafkaSource kafkaSource = KafkaSource. builder() .setBootstrapServers("192.168.200.130:9092") .setTopics(inputTopicList) .setGroupId("group-test-savepoint") // 从消费组的offset提交位点开始消费,若未找到上一次消费位点,则从设置该topic的offset为最新的位置 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区,避免分区扩容导致没有算子消费 .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); SingleOutputStreamOperator streamSource = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source") .uid("kafka_source") // 最好设置一下算子的id .setParallelism(5); // 设置并行度 = topic分区数 // 此处使用循环,会开辟map键值对个数的算子链,多个filter --> sink算子链,详情见下图 // map中可配置topic所属集群,以及鉴权信息等,此处省略 for (String key : hashMap.keySet()) { // filter算子根据数据中的表名table与topic之间的映射关系,过滤数据 SingleOutputStreamOperator outputStreamOperator = streamSource.filter(vo -> { JSONObject jsonObject = JSONUtil.parseObj(vo); String tableName = (String) jsonObject.get("table"); return tableName.equals(key); }).uid("filter-" + key).setParallelism(5); // 构建kafka sink KafkaSink kafkaSink = KafkaSink. builder() // kafka集群,可根据不同topic所在集群不同,动态更换ip .setBootstrapServers("192.168.200.130:9092") // 自定义kafka序列化器 .setRecordSerializer(KafkaRecordSerializationSchema.builder() // 根据映射获取输出topic .setTopic(hashMap.get(key)) .setValueSerializationSchema(new SimpleStringSchema()) .build()) // 一致性语义:至少一次 .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); // sink算子 outputStreamOperator.sinkTo(kafkaSink).uid("sink-" + key).setParallelism(1); } // 执行 environment.execute(); 3.3 运行图(ps:为了更好的展示循环中包含算子,将sink算子并行度设为了1,发生了rebalance)
4.方案二:适用于输入及输出topic都用属于一个集群的场景
4.1 配置文件同上
4.2 Java代码
public static void main(String[] args) throws Exception { // 环境配置同上,故此处省略。。。 // 从kafka读取数据 KafkaSource
kafkaSource = KafkaSource. builder() .setBootstrapServers("192.168.200.130:9092") .setTopics(inputTopicList) .setGroupId("group-test-savepoint") // 从消费组的offset提交位点开始消费,若未找到上一次消费位点,则从设置该topic的offset为最新的位置 .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区,避免分区扩容导致没有算子消费 .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); SingleOutputStreamOperator streamSource = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source") .uid("kafka_source") // 最好设置一下算子的id .setParallelism(5); // 设置并行度 = topic分区数 // 输出到kafka,此处没有循环,只会产生一条算子链 KafkaSink kafkaSink = KafkaSink. builder() .setBootstrapServers("192.168.200.130:9092") // 输出topic的kafka集群固定 .setRecordSerializer((KafkaRecordSerializationSchema ) (data, context, timestamp) -> { JSONObject jsonObject = JSONUtil.parseObj(data); // 获取表名 String table = (String) jsonObject.get("table"); // 获取topic String topic = hashMap.get(table); return new ProducerRecord<>(topic, data.getBytes(StandardCharsets.UTF_8)); }) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); // sink算子 streamSource.sinkTo(kafkaSink).uid("sink-" + key).setParallelism(1); } // 执行 environment.execute(); 5. 业务使用场景:
猜你喜欢
- 8小时前hadoop报错ERROR: Cannot set priority of namenode process
- 8小时前Spark简介
- 8小时前[springboot配置Kafka] springboot配置多个kafka,包含账号密码
- 8小时前HBase高阶(一)基础架构及存储原理
- 8小时前日志系统二(ilogtail+kafka+logstash+es+kibana)
- 8小时前【大数据-Hadoop】从入门到源码编译-概念篇
- 5小时前汽车脚踏板安装图解(汽车脚踏板怎么拆卸)
- 3小时前人和马的成语(人跟马成语)
- 3小时前排气扇什么牌子好(强力静音排气扇靠前品牌)
- 59分钟前试验检测师报考条件(试验检测师报考条件及时间安排)
网友评论
- 搜索
- 最新文章
- 热门文章