目录
1.服务器用docker安装kafka
2.springboot集成kafka实现生产者和消费者
1.服务器用docker安装kafka
①、安装docker(docker类似于linux的软件商店,下载所有应用都能从docker去下载)
a、自动安装
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
b、启动docker
sudo systemctl start docker
c、 通过运行hello-world镜像来验证是否正确安装了Docker Engine-Community。
// 拉取镜像
sudo docker pull hello-world
// 执行
hello-world sudo docker run hello-world
d、安装成功
②、zookeeper
a、docker search zookeeper
b、docker pull zookeeper
③、安装kafka
a、docker search kafka
b、docker pull wurstmeister/kafka
④、运行zookeeper
a、docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper
⑤、运行kafka
a、 docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=42.194.238.131:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://42.194.238.131:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
b、参数说明
参数说明:
-e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=172.21.10.10:2181/kafka 配置zookeeper管理kafka的路径172.21.10.10:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.10.10:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口
-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
⑥、检验kafka是否可以使用
docker exec -it kafka bash
cd /opt/kafka_2.13-2.8.1/
cd bin
a、运行kafka生产者并发送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
b、在开一个页面,运行kafka消费者发送消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
⑦、结果是这个样子的
⑧、每条消息都有一个主题,消费者指定监听哪个主题的消息,如果进来消息队列的是我们指定监听的主题,就消费,否则不消费(topic这里指定的生产和消费的主题)
⑨、消费者宕掉了,生产者接着发,消息不会丢,消费者重启之后会重新接收到宕机之后发的所有消息
2.springboot集成kafka实现生产者和消费者
①、在pom中创建依赖
org.springframework.kafka spring-kafka
2.7.8
②、配置kafka
a、在 application.yml 文件中添加以下配置:(注:yml中两个相同名字的会报错,比如两个spring)
spring:
kafka:
#自己的kafka所在的ip地址和端口号
bootstrap-servers: localhost:9092
consumer:
#一个group-id代表一个消费组,一个消息可以被几个消费组消费
group-id: my-group
auto-offset-reset: earliest
producer: #序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
b、创建一个生产者
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public MapproducerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
sendMessage 方法,用于发送消息到 Kafka。
@RestController public class KafkaController { @Autowired private KafkaTemplatekafkaTemplate; @PostMapping("/send") public void sendMessage(@RequestBody String message) { kafkaTemplate.send("my-topic", message); } }
c、 创建一个消费者
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Bean public MapconsumerConfigs() { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
@KafkaListener 注解声明了一个消费者方法,用于接收从
my-topic 主题中读取的消息
@Service public class KafkaConsumer { @KafkaListener(topics = "my-topic", groupId = "my-group-id") public void consume(String message) { System.out.println("Received message: " + message); } }
猜你喜欢
- 14天前(兰州旅游文化产业发展有限公司)甘肃省兰州市2023年乡村旅游暨A级旅游景区管理工作培训班开班
- 14天前(2020海丝之路文化博览会)2023海丝之路文化和旅游博览会开幕
- 14天前(兵团猛进秦剧团持续开展“戏曲进校园”活动)兵团猛进秦剧团持续开展“戏曲进校园”活动
- 14天前(2025年“文化和自然遗产日”广东主会场活动举办)2025年“文化和自然遗产日”广东主会场活动举办
- 14天前(甘肃文化旅游宣传片)甘肃文旅推介走进重庆
- 14天前(“为人民绽放——国家艺术基金优秀剧目展演”在合肥开幕)“为人民绽放——国家艺术基金优秀剧目展演”在合肥开幕
- 14天前(新西兰航空官方网站)新西兰航空85周年焕新启航 全方位客舱升级,飞「悦」快意时光
- 14天前(纳米比亚旅游报价)纳米比亚旅游局2024年中国推介会圆满落幕
- 14天前(安岚度假村及酒店推出"山海之约"目的地婚礼计划)安岚度假村及酒店推出"山海之约"目的地婚礼计划
- 14天前(北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
网友评论
- 搜索
- 最新文章
- (2020广州车展哈弗)你的猛龙 独一无二 哈弗猛龙广州车展闪耀登场
- (哈弗新能源suv2019款)智能科技颠覆出行体验 哈弗重塑新能源越野SUV价值认知
- (2021款全新哈弗h5自动四驱报价)新哈弗H5再赴保障之旅,无惧冰雪护航哈弗全民电四驱挑战赛
- (海南航空现况怎样)用一场直播找到市场扩张新渠道,海南航空做对了什么?
- (visa jcb 日本)优惠面面俱到 JCB信用卡邀您畅玩日本冰雪季
- (第三届“堡里有年味·回村过大年”民俗花灯会活动)第三届“堡里有年味·回村过大年”民俗花灯会活动
- (展示非遗魅力 长安启源助力铜梁龙舞出征)展示非遗魅力 长安启源助力铜梁龙舞出征
- (阿斯塔纳航空公司)阿斯塔纳航空机队飞机数量增至50架
- (北京香港航班动态查询)香港快运航空北京大兴新航线今日首航
- (我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉)我在港航“呵护”飞机 每一次安全着陆就是最好的荣誉
- 热门文章