上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

ELK日志收集平台部署(kafka)

guduadmin11天前

正文:ELK日志收集平台部署

Kafka和zookeeper简介

Kafka:

数据缓冲队列。作为消息队列解耦合处理过程,同时提高了可扩展性。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

Kafka的特性:

  • 高吞吐量:kafka每秒可以处理几十万条消息。

  • 可扩展性:kafka集群支持热扩展

  • 可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

  • 高并发:支持数千个客户端同时读写

    它主要包括以下组件

    话题(Topic):是特定类型的消息流。(每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。) 生产者(Producer):是能够发布消息到话题的任何对象(发布消息到 kafka 集群的终端或服务). 消费者(Consumer):可以订阅一个或多个话题,从而消费这些已发布的消息。 服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。 zookeeper:kafka 通过 zookeeper 来存储集群的信息。

    ZooKeeper的特性:

    ZooKeeper是一个分布式协调服务,Kafka的运行依赖ZooKeeper。ZooKeeper主要用来协调Kafka的各个broker,而且当增加了broker或者某个broker故障了,ZooKeeper将会通知生产者和消费者,这样可以保证整个系统正常运转。

    在Kafka中集群中broker的分布情况与消费者当前消费的状态信息都会保存在ZooKeeper中。

    =========================================================================

    系统类型:Centos7

    节点IP:192.168.246.234,192.168.246.231、192.168.246.235

    软件版本:jdk-8u121-linux-x64.tar.gz、kafka_2.11-2.1.0.tgz

    示例节点:172.16.246.231

    1.安装配置jdk8

    1)Kafka、Zookeeper(简称:ZK)运行依赖jdk8

    tar zxvf /usr/local/package/jdk-8u121-linux-x64.tar.gz -C /usr/local/
    echo '
    JAVA_HOME=/usr/local/jdk1.8.0_121
    PATH=$JAVA_HOME/bin:$PATH
    export JAVA_HOME PATH
    ' >>/etc/profile
    source /etc/profile

     2.安装配置ZK

    Kafka运行依赖ZK,Kafka官网提供的tar包中,已经包含了ZK,这里不再额下载ZK程序。

    配置相互解析---三台机器

    [root@es-2-zk-log ~]# vim /etc/hosts

    192.168.246.234 mes-1

    192.168.246.231 es-2-zk-log

    192.168.246.235 es-3-head-kib

    1)安装

    [root@es-2-zk-log ~]# tar xzvf kafka_2.11-2.1.0.tgz -C /usr/local/

    2)配置

    [root@mes-1 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties

    [root@mes-1 ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties  #添加如下配置

    dataDir=/opt/data/zookeeper/data
    dataLogDir=/opt/data/zookeeper/logs
    clientPort=2181
    tickTime=2000
    initLimit=20
    syncLimit=10
    server.1=192.168.246.231:2888:3888             //kafka集群IP:Port
    server.2=192.168.246.234:2888:3888
    server.3=192.168.246.235:2888:3888

    #创建data、log目录

    [root@mes-1 ~]# mkdir -p /opt/data/zookeeper/{data,logs}

    #创建myid文件

    [root@mes-1 ~]# echo 1 > /opt/data/zookeeper/data/myid     #myid号按顺序排

    [root@es-2-zk-log ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties

    [root@es-2-zk-log ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties

    dataDir=/opt/data/zookeeper/data
    dataLogDir=/opt/data/zookeeper/logs
    clientPort=2181
    tickTime=2000
    initLimit=20
    syncLimit=10
    server.1=192.168.246.231:2888:3888
    server.2=192.168.246.234:2888:3888
    server.3=192.168.246.235:2888:3888

    #创建data、log目录

    [root@es-2-zk-log ~]# mkdir -p /opt/data/zookeeper/{data,logs}

    #创建myid文件

    [root@es-2-zk-log ~]# echo 2 > /opt/data/zookeeper/data/myid

    [root@es-3 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties

    [root@es-3-head-kib ~]# vim /usr/local/kafka_2.11-2.1.0/config/zookeeper.properties

    dataDir=/opt/data/zookeeper/data
    dataLogDir=/opt/data/zookeeper/logs
    clientPort=2181
    tickTime=2000
    initLimit=20
    syncLimit=10
    server.1=192.168.246.231:2888:3888
    server.2=192.168.246.234:2888:3888
    server.3=192.168.246.235:2888:3888

    #创建data、log目录

    [root@es-3-head-kib ~]# mkdir -p /opt/data/zookeeper/{data,logs}

    #创建myid文件

    [root@es-3-head-kib ~]# echo 3 > /opt/data/zookeeper/data/myid

    3.配置Kafka

    1)配置

    节点1:

    [root@mes-1 ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties

    [root@mes-1 ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.properties  #在最后添加

    broker.id=1
    listeners=PLAINTEXT://192.168.246.231:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/data/kafka/logs
    num.partitions=6
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=2
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=536870912
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.246.231:2181,192.168.246.234:2181,192.168.246.235:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0

    [root@mes-1 ~]# mkdir -p /opt/data/kafka/logs

    节点2:

    [root@es-2-zk-log ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties

    [root@es-2-zk-log ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.properties

    broker.id=2
    listeners=PLAINTEXT://192.168.246.234:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/data/kafka/logs
    num.partitions=6
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=2
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=536870912
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.246.231:2181,192.168.246.234:2181,192.168.246.235:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0

    [root@es-2-zk-log ~]# mkdir -p /opt/data/kafka/logs

    节点3:

    [root@es-3-head-kib ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka_2.11-2.1.0/config/server.properties

    [root@es-3-head-kib ~]# vim /usr/local/kafka_2.11-2.1.0/config/server.properties

    broker.id=3
    listeners=PLAINTEXT://192.168.246.235:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/opt/data/kafka/logs
    num.partitions=6
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=2
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=536870912
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.246.231:2181,192.168.246.234:2181,192.168.246.235:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0

    [root@es-3-head-kib ~]# mkdir -p /opt/data/kafka/logs

    4、其他节点配置

    只需把配置好的安装包直接分发到其他节点,Kafka的broker.id和listeners就可以了。

    5、启动、验证ZK集群

    1)启动

    在三个节点依次执行:

    [root@mes-1 ~]# cd /usr/local/kafka_2.11-2.1.0/

    [root@mes-1 kafka_2.11-2.1.0]# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

    2)验证

    查看端口

    [root@mes-1 ~]# netstat -lntp | grep 2181

    tcp6       0      0 :::2181                 :::*                    LISTEN      1226/java

    6、启动、验证Kafka

    1)启动

    在三个节点依次执行:

    [root@mes-1 ~]# cd /usr/local/kafka_2.11-2.1.0/

    [root@mes-1 kafka_2.11-2.1.0]# nohup bin/kafka-server-start.sh config/server.properties &

    2)验证

    在192.168.246.231上创建topic

    [root@es-2-zk-log kafka_2.11-2.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic

    Created topic "testtopic".

    在246.235上面查询192.168.246.231上的topic

    [root@es-3-head-kib kafka_2.11-2.1.0]# bin/kafka-topics.sh --zookeeper 192.168.246.231:2181 --list

    testtopic

    模拟消息生产和消费 发送消息到192.168.246.231

    [root@mes-1 kafka_2.11-2.1.0]# bin/kafka-console-producer.sh --broker-list 192.168.246.231:9092 --topic testtopic

    >hello

     

    ​从192.168.246.234接受消息

    [root@es-2-zk-log kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server  192.168.246.234:9092 --topic testtopic --from-beginning

    hello

    kafka配置完成

    kafka没有问题之后,回到logstash服务器:

    #安装完kafka之后的操作:

    [root@es-2-zk-log ~]# cd /usr/local/logstash-6.5.4/etc/conf.d/

    [root@es-2-zk-log conf.d]# cp input.conf input.conf.bak

    [root@es-2-zk-log conf.d]# vim input.conf

    input {
    kafka {               #指定kafka服务
       type => "nginx_log"
       codec => "json"        #通用选项,用于输入数据的编解码器
       topics => "nginx"        #这里定义的topic
       decorate_events => true  #会将当前topic信息也带到message中
       bootstrap_servers => "192.168.246.234:9092, 192.168.246.231:9092, 192.168.246.235:9092"
      }
    }  

    启动 logstash

    [root@es-2-zk-log conf.d]# cd /usr/local/logstash-6.5.4/

    [root@es-2-zk-log logstash-6.5.4]# nohup bin/logstash -f etc/conf.d/  --config.reload.automatic &

网友评论

搜索
最新文章
热门文章
热门标签