GitHub源码https://github.com/zhangchuangiie/SimpleKafka
SimpleKafka(Kafka客户端封装工具类)
一个基于Kafka客户端封装的工具,Kafka开发效率神器
特点:
- 封装了常用的Kafka客户端操作,无需维护配置,无需初始化客户端,真正实现了一行代码调用
- 将连接池的维护封装在工具类里面,多线程使用也无需维护客户端集合
使用方式:
只需要集成1个KafkaUtil.java文件即可,修改里面的kafka服务地址即可
典型示例:
- 同步生产: LinkedHashMap
recordMeta = KafkaUtil.sendToKafka("RULEa93304e6d844000","222","aaaa"); - 异步生产: KafkaUtil.sendToKafkaAsync("RULEa93304e6d844000", "222", "aaaa");
- 消费数据: ArrayList
> buffer = KafkaUtil.recvFromKafka("RULEa93304e6d844000", "group1"); - 重置偏移: KafkaUtil.resetOffsetToEarliest("RULEa93304e6d844000", "group1");
接口介绍:
- kafkaListTopics: topic列表
- createTopic: topic创建
- delTopic: topic删除
- partitionsTopic: topic的分区列表,分区和副本数
- delGroupId: 删除groupId
- descCluster: 集群的节点列表
- kafkaConsumerGroups: 消费者列表
- kafkaConsumerGroups: 指定topic的活跃消费者列表
- sendToKafka: 生产数据到指定的topic,同步接口{"topic":"RULEa93304e6d844000","partition":1,"offset":681}
- sendToKafkaAsync: 生产数据到指定的topic,异步接口,默认回调
- sendToKafkaAsync: 生产数据到指定的topic,异步接口,自定义回调
- recvFromKafka: 按groupId消费指定topic的数据[{"topic":"RULEa93304e6d844000","key":"222","value":"aaaa","partition":1,"offset":681}]
- recvFromKafkaByOffset: 消费指定topic指定partition对应的offset数据
- recvFromKafkaByTimestamp: 消费指定topic指定partition对应的timestamp以后的数据
- resetOffsetToTimestamp: 重置指定topic的offset到对应的timestamp
- resetOffsetToEarliest: 重置指定topic的offset到最早
- resetOffsetToLatest: 重置指定topic的offset到最晚,一般在跳过测试脏数据时候使用
- consumerPositions: 获取当前消费偏移量情况{"partitionNum":2,"dataNum":1,"lagNum":0,"positions":[{"partition":0,"begin":0,"end":0,"current":0,"current1":0,"size":0,"lag":0},{"partition":1,"begin":681,"end":682,"current":682,"current1":682,"size":1,"lag":0}]}
- topicSize: 获取指定topic数据量详情情况 [{"partition": 0,"begin": 65,"end": 65,"size": 0}]
- topicSizeAll: 获取所有topic数据量详情情况
- topicSizeStatistics: 获取指定topic数据量统计{"partitionNum":5452,"dataNum":41570647}
- topicSizeStatisticsAll: 获取所有topic数据量统计{"topicNum":2550,"partitionNum":5452,"dataNum":41570647}
接口列表:
- kafkaListTopics: List kafkaListTopics()
- createTopic: void createTopic(String topic)
- delTopic: void delTopic(String topic)
- partitionsTopic: List partitionsTopic(String topic)
- delGroupId: void delGroupId(String groupId)
- descCluster: List descCluster()
- kafkaConsumerGroups: List kafkaConsumerGroups()
- kafkaConsumerGroups: List kafkaConsumerGroups(String topic)
- sendToKafka: LinkedHashMap
sendToKafka(String topic, String key, String value) - sendToKafkaAsync: void sendToKafkaAsync(String topic, String key, String value)
- sendToKafkaAsync: void sendToKafkaAsync(String topic, String key, String value,Callback callback)
- recvFromKafka: ArrayList
> recvFromKafka(String topic, String groupId) - recvFromKafkaByOffset: ArrayList
> recvFromKafkaByOffset(String topic, String groupId,int partition,long offset) - recvFromKafkaByTimestamp: ArrayList
> recvFromKafkaByTimestamp(String topic, String groupId,int partition,long timestamp) - resetOffsetToTimestamp: boolean resetOffsetToTimestamp(String topic, String groupId, long timestamp)
- resetOffsetToEarliest: boolean resetOffsetToEarliest(String topic, String groupId)
- resetOffsetToLatest: boolean resetOffsetToLatest(String topic, String groupId)
- consumerPositions: List
> consumerPositions(String topic, String groupId) - topicSize: List
> topicSize(String topic) - topicSizeAll: LinkedHashMap
topicSizeAll() - topicSizeStatistics: LinkedHashMap
topicSizeStatistics(String topic) - topicSizeStatisticsAll: LinkedHashMap
topicSizeStatisticsALL()
示范应用:
为了说明该工具的效用,基于该工具实现了一个HTTP接口的消息队列服务,该服务只用了几十行代码,就实现了基于标签内容的发布订阅服务,服务见APIKafka.java,客户端示例见ClientKafka.java。
该服务支持生产者任意标注标签,支持消费者按表达式条件订阅数据,表达式支持与或非,支持集合查找,以及字符串子串匹配。
同时也支持消息回溯消费已经消息统计查询。
实现了流式消息检索的基本需求。
APIKafka,支持生产者任意标注标签,标签是开放的,可以是任意JSON,Key无需预先定义和Value也不必是枚举值,支持消费者按表达式条件订阅数据,支持开源表达式语言OGNL,包括支持与或非,支持对象取值,支持数组和集合的访问,也支持Java表达式,常用的有contains,startsWith,endsWith,length等,也支持matches正则匹配。可以满足流式消息检索的各种匹配要求。
联系人:
有问题可以联系:zhangchuang@iie.ac.cn
猜你喜欢
- 5天前(兰州旅游文化产业发展有限公司)甘肃省兰州市2023年乡村旅游暨A级旅游景区管理工作培训班开班
- 5天前(希尔顿2021活动)希尔顿集团618盛夏大促开启
- 5天前(2020海丝之路文化博览会)2023海丝之路文化和旅游博览会开幕
- 5天前(瑞士大酒店-自助餐怎么样)瑞意心旅,以食为先 瑞士酒店开启全新"瑞士早餐计划"
- 5天前(东北地区全域旅游)东北三省一区宣传贯彻研学旅游行业标准
- 5天前(甘肃文化旅游宣传片)甘肃文旅推介走进重庆
- 5天前(曼谷丽思卡尔顿公寓价格)曼谷丽思卡尔顿酒店盛大启幕,开创泰国奢华雅致新纪元
- 5天前(当科学邂逅喜剧:科技馆喜剧嘉年华背后的"文旅破壁者")当科学邂逅喜剧:科技馆喜剧嘉年华背后的"文旅破壁者"
- 5天前(中国旅游集团旗下酒店)中国旅游集团酒店控股有限公司战略投资雅阁酒店集团
- 5天前(锦州新增两家国家aaa级旅游景区有哪些)锦州新增两家国家AAA级旅游景区
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章