正文:ELK日志收集平台部署
Kafka和zookeeper简介
Kafka:
数据缓冲队列。作为消息队列解耦合处理过程,同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
Kafka的特性:
-
高吞吐量:kafka每秒可以处理几十万条消息。
-
可扩展性:kafka集群支持热扩展
-
可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
-
高并发:支持数千个客户端同时读写
它主要包括以下组件
话题(Topic):是特定类型的消息流。(每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。) 生产者(Producer):是能够发布消息到话题的任何对象(发布消息到 kafka 集群的终端或服务). 消费者(Consumer):可以订阅一个或多个话题,从而消费这些已发布的消息。 服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。 zookeeper:kafka 通过 zookeeper 来存储集群的信息。
ZooKeeper的特性:
ZooKeeper是一个分布式协调服务,Kafka的运行依赖ZooKeeper。ZooKeeper主要用来协调Kafka的各个broker,而且当增加了broker或者某个broker故障了,ZooKeeper将会通知生产者和消费者,这样可以保证整个系统正常运转。
在Kafka中集群中broker的分布情况与消费者当前消费的状态信息都会保存在ZooKeeper中。
=========================================================================
系统类型:Centos7
节点IP:192.168.246.234,192.168.246.231、192.168.246.235
软件版本:jdk-8u121-linux-x64.tar.gz、kafka_2.11-2.1.0.tgz
示例节点:172.16.246.231
1.安装配置jdk8
1)Kafka、Zookeeper(简称:ZK)运行依赖jdk8
tar zxvf /usr/local/package/jdk-8u121-linux-x64.tar.gz -C /usr/local/
echo ' JAVA_HOME=/usr/local/jdk1.8.0_121 PATH=$JAVA_HOME/bin:$PATH export JAVA_HOME PATH ' >>/etc/profile source /etc/profile
2.安装配置ZK
Kafka运行依赖ZK,Kafka官网提供的tar包中,已经包含了ZK,这里不再额下载ZK程序。
配置相互解析---三台机器
[root@es-2-zk-log ~]# vim /etc/hosts
192.168.246.234 mes-1
192.168.246.231 es-2-zk-log
192.168.246.235 es-3-head-kib
1)安装
[root@es-2-zk-log ~]# tar xzvf kafka_2.11-2.1.0.tgz -C /usr/local/
2)配置
[root@mes-1 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties
[root@mes-1 ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties #添加如下配置
dataDir=/opt/data/zookeeper/data dataLogDir=/opt/data/zookeeper/logs clientPort=2181 tickTime=2000 initLimit=20 syncLimit=10 server.1=192.168.246.231:2888:3888 //kafka集群IP:Port server.2=192.168.246.234:2888:3888 server.3=192.168.246.235:2888:3888
#创建data、log目录
[root@mes-1 ~]# mkdir -p /opt/data/zookeeper/{data,logs}
#创建myid文件
[root@mes-1 ~]# echo 1 > /opt/data/zookeeper/data/myid #myid号按顺序排
[root@es-2-zk-log ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties
[root@es-2-zk-log ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data dataLogDir=/opt/data/zookeeper/logs clientPort=2181 tickTime=2000 initLimit=20 syncLimit=10 server.1=192.168.246.231:2888:3888 server.2=192.168.246.234:2888:3888 server.3=192.168.246.235:2888:3888
#创建data、log目录
[root@es-2-zk-log ~]# mkdir -p /opt/data/zookeeper/{data,logs}
#创建myid文件
[root@es-2-zk-log ~]# echo 2 > /opt/data/zookeeper/data/myid
[root@es-3 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties
[root@es-3-head-kib ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data dataLogDir=/opt/data/zookeeper/logs clientPort=2181 tickTime=2000 initLimit=20 syncLimit=10 server.1=192.168.246.231:2888:3888 server.2=192.168.246.234:2888:3888 server.3=192.168.246.235:2888:3888
#创建data、log目录
[root@es-3-head-kib ~]# mkdir -p /opt/data/zookeeper/{data,logs}
#创建myid文件
[root@es-3-head-kib ~]# echo 3 > /opt/data/zookeeper/data/myid
3.配置Kafka
1)配置
节点1:
[root@mes-1 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties
[root@mes-1 ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.properties #在最后添加
broker.id=1 listeners=PLAINTEXT://192.168.246.231:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/opt/data/kafka/logs num.partitions=6 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=2 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=536870912 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.246.231:2181,192.168.246.234:2181,192.168.246.235:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
[root@mes-1 ~]# mkdir -p /opt/data/kafka/logs
节点2:
[root@es-2-zk-log ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties
[root@es-2-zk-log ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.properties
broker.id=2 listeners=PLAINTEXT://192.168.246.234:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/opt/data/kafka/logs num.partitions=6 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=2 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=536870912 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.246.231:2181,192.168.246.234:2181,192.168.246.235:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
[root@es-2-zk-log ~]# mkdir -p /opt/data/kafka/logs
节点3:
[root@es-3-head-kib ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties
[root@es-3-head-kib ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.properties
broker.id=3 listeners=PLAINTEXT://192.168.246.235:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/opt/data/kafka/logs num.partitions=6 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=2 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=536870912 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.246.231:2181,192.168.246.234:2181,192.168.246.235:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
[root@es-3-head-kib ~]# mkdir -p /opt/data/kafka/logs
4、其他节点配置
只需把配置好的安装包直接分发到其他节点,Kafka的broker.id和listeners就可以了。
5、启动、验证ZK集群
1)启动
在三个节点依次执行:
[root@mes-1 ~]# cd /usr/local/kafka_2.11-2.1.0/
[root@mes-1 kafka_2.11-2.1.0]# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
2)验证
查看端口
[root@mes-1 ~]# netstat -lntp | grep 2181
tcp6 0 0 :::2181 :::* LISTEN 1226/java
6、启动、验证Kafka
1)启动
在三个节点依次执行:
[root@mes-1 ~]# cd /usr/local/kafka_2.11-2.1.0/
[root@mes-1 kafka_2.11-2.1.0]# nohup bin/kafka-server-start.sh config/server.properties &
2)验证
在192.168.246.231上创建topic
[root@es-2-zk-log kafka_2.11-2.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic
Created topic "testtopic".
在246.235上面查询192.168.246.231上的topic
[root@es-3-head-kib kafka_2.11-2.1.0]# bin/kafka-topics.sh --zookeeper 192.168.246.231:2181 --list
testtopic
模拟消息生产和消费 发送消息到192.168.246.231
[root@mes-1 kafka_2.11-2.1.0]# bin/kafka-console-producer.sh --broker-list 192.168.246.231:9092 --topic testtopic
>hello
从192.168.246.234接受消息
[root@es-2-zk-log kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.246.234:9092 --topic testtopic --from-beginning
hello
kafka配置完成
kafka没有问题之后,回到logstash服务器:
#安装完kafka之后的操作:
[root@es-2-zk-log ~]# cd /usr/local/logstash-6.5.4/etc/conf.d/
[root@es-2-zk-log conf.d]# cp input.conf input.conf.bak
[root@es-2-zk-log conf.d]# vim input.conf
input { kafka { #指定kafka服务 type => "nginx_log" codec => "json" #通用选项,用于输入数据的编解码器 topics => "nginx" #这里定义的topic decorate_events => true #会将当前topic信息也带到message中 bootstrap_servers => "192.168.246.234:9092, 192.168.246.231:9092, 192.168.246.235:9092" } }
启动 logstash
[root@es-2-zk-log conf.d]# cd /usr/local/logstash-6.5.4/
[root@es-2-zk-log logstash-6.5.4]# nohup bin/logstash -f etc/conf.d/ --config.reload.automatic &
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章