上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Zookeeper基础操作

guduadmin13小时前

搭建Zookeeper服务器

windows下部署

下载地址: https://mirrors.cloud.tencent.com/apache/zookeeper/zookeeper-3.7.1/

修改配置文件

Zookeeper基础操作,在这里插入图片描述,第1张

  • 打开conf目录,将 zoo_sample.cfg复制一份,命名为 zoo.cfg
  • 打开 zoo.cfg,修改 dataDir路径,新增日志 dataLogDir路径

    dataDir=…/data

    dataLogDir=…/log

    zoo.cfg 配置文件说明

     # zookeeper时间配置中的基本单位 (毫秒)
     tickTime=2000
     # 允许follower初始化连接到leader最大时长,它表示tickTime时间倍数 即:initLimit*tickTime
     initLimit=10
     # 允许follower与leader数据同步最大时长,它表示tickTime时间倍数 
     syncLimit=5
     #zookeper 数据存储目录及日志保存目录(如果没有指明dataLogDir,则日志也保存在这个文件中)
     dataDir=/tmp/zookeeper
     #对客户端提供的端口号
     clientPort=2181
     #单个客户端与zookeeper最大并发连接数
     maxClientCnxns=60
     # 保存的数据快照数量,之外的将会被清除
     autopurge.snapRetainCount=3
     #自动触发清除任务时间间隔,小时为单位。默认为0,表示不自动清除。
     autopurge.purgeInterval=1
    

    启动Zookeeper

    Zookeeper基础操作,在这里插入图片描述,第2张

    linux下部署

    前提:由于zookeeper是使用java语言开发的,所以,在安装zookeeper之前务必先在本机安装配置好java环境!

    • 上传zookeeper

      Zookeeper基础操作,在这里插入图片描述,第3张

    • 解压zookeeper

      Zookeeper基础操作,在这里插入图片描述,第4张

    • 配置conf

      与windows的差不多

      #zookeeper内部的基本单位,单位是毫秒,这个表示一个tickTime为2000毫秒,在zookeeper的其他配置中,都是基于tickTime来做换算的

      tickTime=2000

      #集群中的follower服务器(F)与leader服务器(L)之间 初始连接 时能容忍的最多心跳数(tickTime的数量)。

      initLimit=10

      #syncLimit:集群中的follower服务器(F)与leader服务器(L)之间 请求和应答 之间能容忍的最多心跳数(tickTime的数量)

      syncLimit=5

      #数据存放文件夹,zookeeper运行过程中有两个数据需要存储,一个是快照数据(持久化数据)另一个是事务日志

      dataDir=/tmp/zookeeper

      #客户端访问端口

      clientPort=2181

      配置环境变量

      vim /etc/profile

      export ZOOKEEPER_PREFIX=/root/software/apache-zookeeper-3.7.1-bin
      export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
      

      执行下面的命令,使配置生效

      source profile
      

      启动服务

      zkServer.sh start
      

      可以看到我们的zkServer以及启动好了。

      可以查看下启动状态:

      zkServer.sh status
      

      客户端连接

      zkCli.sh
      

      根目录下有一个自带的/zookeeper子节点,它来保存Zookeeper的配额管理信息,不要轻易删除。

      Zookeeper基础操作,在这里插入图片描述,第5张

      Zookeeper命令操作

      Zookeeper 数据模型

      ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。

      Zookeeper基础操作,在这里插入图片描述,第6张

      Zookeeper这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。

      Zookeeper基础操作,在这里插入图片描述,第7张

      节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。

      节点可以分为四大类:

      • PERSISTENT 持久化节点
      • EPHEMERAL 临时节点 :-e
      • PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
      • EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es

        Zookeeper服务端常用命令

        Zookeeper基础操作,在这里插入图片描述,第8张

        •启动 ZooKeeper 服务

        ./zkServer.sh start
        

        •查看 ZooKeeper 服务状态

        ./zkServer.sh status
        

        •停止 ZooKeeper 服务

        ./zkServer.sh stop 
        

        •重启 ZooKeeper 服务

        ./zkServer.sh restart 
        

        Zookeeper客户端常用命令

        基本CRUD

        • 连接Zookeeper客户端
          # 本地连接
          zkCli.sh
          # 远程连接
          zkCli.sh -server ip:2181
          
          • 断开连接
            quit
            
            • 查看命令帮助
              help
              
              • 显示制定目录下节点
                # ls 目录
                ls /
                
                • 创建节点
                  # create /节点path value
                  [zk: localhost:2181(CONNECTED) 0] ls /
                  [zookeeper]
                  [zk: localhost:2181(CONNECTED) 1] create /app1 yuyang123
                  Created /app1
                  [zk: localhost:2181(CONNECTED) 2] ls /
                  [app1, zookeeper]
                  [zk: localhost:2181(CONNECTED) 3] create /app2
                  Created /app2
                  [zk: localhost:2181(CONNECTED) 4] ls /
                  [app1, app2, zookeeper]
                  
                  • 获取节点值
                    # get /节点path
                    [zk: localhost:2181(CONNECTED) 15] get /app1
                    yuyang123
                    [zk: localhost:2181(CONNECTED) 16] get /app2
                    null
                    
                    • 设置节点值
                      # set /节点path value
                      [zk: localhost:2181(CONNECTED) 17] set /app2 yuyang456
                      [zk: localhost:2181(CONNECTED) 18] get /app2
                      yuyang456
                      
                      • 删除单个节点
                        # delete /节点path
                        [zk: localhost:2181(CONNECTED) 19] delete /app2
                        [zk: localhost:2181(CONNECTED) 20] get /app2
                        Node does not exist: /app2
                        [zk: localhost:2181(CONNECTED) 21] ls /
                        [app1, zookeeper]
                        
                        • 删除带有子节点的节点
                          # deleteall /节点path
                          [zk: localhost:2181(CONNECTED) 22] create /app1
                          Node already exists: /app1
                          [zk: localhost:2181(CONNECTED) 23] create /app1/p1
                          Created /app1/p1
                          [zk: localhost:2181(CONNECTED) 24] create /app1/p2
                          Created /app1/p2
                          [zk: localhost:2181(CONNECTED) 25] delete /app1
                          Node not empty: /app1
                          [zk: localhost:2181(CONNECTED) 26] deleteall /app1
                          [zk: localhost:2181(CONNECTED) 27] ls /
                          [zookeeper]
                          

                          创建临时&顺序节点

                          • 创建临时节点 (-e)
                            • 临时节点是在会话结束后,自动被删除的
                              # create -e /节点path value
                              [zk: localhost:2181(CONNECTED) 29] create -e /app1 yuyang123
                              Created /app1
                              [zk: localhost:2181(CONNECTED) 30] get /app1
                              yuyang123
                              [zk: localhost:2181(CONNECTED) 31] quit
                              # 退出后再次连接,临时节点已经删除
                              [zk: localhost:2181(CONNECTED) 0] ls /
                              [zookeeper]
                              
                              • 创建顺序节点 (-s)
                                • 创建出的节点,根据先后顺序,会在节点之后带上一个数值,越后执行数值越大,适用于分布式锁的应用场景- 单调递增.
                                  # create -s /节点path value
                                  [zk: localhost:2181(CONNECTED) 0] ls /
                                  [zookeeper]
                                  [zk: localhost:2181(CONNECTED) 1] create -s /app2
                                  Created /app20000000003
                                  [zk: localhost:2181(CONNECTED) 2] ls /
                                  [app20000000003, zookeeper]
                                  [zk: localhost:2181(CONNECTED) 3] create -s /app2 
                                  Created /app20000000004
                                  [zk: localhost:2181(CONNECTED) 4] ls /
                                  [app20000000003, app20000000004, zookeeper]
                                  [zk: localhost:2181(CONNECTED) 5] create -s /app2 
                                  Created /app20000000005
                                  [zk: localhost:2181(CONNECTED) 6] ls /
                                  [app20000000003, app20000000004, app20000000005, zookeeper]
                                  # 创建临时顺序节点
                                  [zk: localhost:2181(CONNECTED) 7] create -es /app3
                                  Created /app30000000006
                                  [zk: localhost:2181(CONNECTED) 8] ls /
                                  [app20000000003, app20000000004, app20000000005, app30000000006, zookeeper]
                                  # 退出
                                  [zk: localhost:2181(CONNECTED) 9] quit
                                  # 重新链接,临时顺序节点已经被删除
                                  [zk: localhost:2181(CONNECTED) 0] ls /
                                  [app20000000003, app20000000004, app20000000005, zookeeper]
                                  
                                  • 查询节点详细信息
                                    # ls –s /节点path 
                                    [zk: localhost:2181(CONNECTED) 5] ls / -s
                                    [app20000000003, app20000000004, app20000000005, zookeeper]
                                    cZxid = 0x0
                                    ctime = Thu Jan 01 08:00:00 CST 1970
                                    mZxid = 0x0
                                    mtime = Thu Jan 01 08:00:00 CST 1970
                                    pZxid = 0x14
                                    cversion = 10
                                    dataVersion = 0
                                    aclVersion = 0
                                    ephemeralOwner = 0x0
                                    dataLength = 0
                                    numChildren = 4
                                    
                                    • czxid:节点被创建的事务ID
                                    • ctime: 创建时间
                                    • mzxid: 最后一次被更新的事务ID
                                    • mtime: 修改时间
                                    • pzxid:子节点列表最后一次被更新的事务ID
                                    • cversion:子节点的版本号
                                    • dataversion:数据版本号
                                    • aclversion:权限版本号
                                    • ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
                                    • dataLength:节点存储的数据的长度
                                    • numChildren:当前节点的子节点个数

                                      Zookeeper JavaAPI操作

                                      Curator介绍

                                      Curator是Netflix公司开源的一套zookeeper客户端框架,Curator是对Zookeeper支持最好的客户端框架。Curator封装了大部分Zookeeper的功能,比如Leader选举、分布式锁等,减少了技术人员在使用Zookeeper时的底层细节开发工作。

                                      Curator框架主要解决了三类问题:

                                      • 封装ZooKeeper Client与ZooKeeper Server之间的连接处理(提供连接重试机制等)。
                                      • 提供了一套Fluent风格的API,并且在Java客户端原生API的基础上进行了增强(创捷多层节点、删除多层节点等)。
                                      • 提供ZooKeeper各种应用场景(分布式锁、leader选举、共享计数器、分布式队列等)的抽象封装。

                                        引入Curator

                                        
                                                
                                                    org.apache.curator
                                                    curator-framework
                                                    4.0.0
                                                
                                                
                                                    org.apache.curator
                                                    curator-recipes
                                                    4.0.0
                                                
                                                
                                                
                                                    org.slf4j
                                                    slf4j-api
                                                    1.7.21
                                                
                                                
                                                    org.slf4j
                                                    slf4j-log4j12
                                                    1.7.21
                                                
                                        

                                        建立连接

                                        方式1

                                        public class CuratorTest {
                                            /**
                                             * 建立连接
                                             */
                                            @Test
                                            public void testConnect(){
                                                /**
                                                 * String connectString     连接字符串。 zk地址和端口: "192.168.58.100:2181,192.168.58.101:2181"
                                                 * int sessionTimeoutMs     会话超时时间 单位ms
                                                 * int connectionTimeoutMs  连接超时时间 单位ms
                                                 * RetryPolicy retryPolicy  重试策略
                                                 */
                                                //1. 第一种方式
                                                //重试策略 baseSleepTimeMs 重试之间等待的初始时间,maxRetries 重试的最大次数
                                                RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
                                                CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.58.100:2181", 60 * 1000,
                                                        15 * 1000, retryPolicy);
                                                //开启连接
                                                client.start();
                                            }
                                        }
                                        

                                        重试策略

                                        • RetryNTimes: 重试没有次数限制
                                        • RetryOneTime:只重试没有次数限制,一般也不常用
                                        • ExponentialBackoffRetry: 只重试一次的重试策略

                                          方式2

                                          public class CuratorTest {
                                              private CuratorFramework client;
                                              /**
                                               * 建立连接
                                               */
                                              @Test
                                              public void testConnect(){
                                                  /**
                                                   * String connectString     连接字符串。 zk地址和端口: "192.168.58.100:2181,192.168.58.101:2181"
                                                   * int sessionTimeoutMs     会话超时时间 单位ms
                                                   * int connectionTimeoutMs  连接超时时间 单位ms
                                                   * RetryPolicy retryPolicy  重试策略
                                                   */
                                                  //1. 第一种方式
                                                  //重试策略 baseSleepTimeMs 重试之间等待的初始时间,maxRetries 重试的最大次数
                                                  RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
                                          //      client   = CuratorFrameworkFactory.newClient("192.168.58.100:2181", 60 * 1000,
                                          //                15 * 1000, retryPolicy);
                                                  //2. 第二种方式,建造者方式创建
                                                  client = CuratorFrameworkFactory.builder()
                                                          .connectString("192.168.58.100:2181")
                                                          .sessionTimeoutMs(60*1000)
                                                          .connectionTimeoutMs(15 * 1000)
                                                          .retryPolicy(retryPolicy)
                                                          .namespace("yuyang")  //根节点名称设置
                                                          .build();
                                                  //开启连接
                                                  client.start();
                                              }
                                          }
                                          
                                          • 添加节点

                                            修改testConnect注解,@Before

                                             /**
                                                 * 建立连接
                                                 */
                                                @Before
                                                public void testConnect()
                                            

                                            创建节点:create 持久 临时 顺序 数据

                                            public class CuratorTest {
                                                /**
                                                 * 创建节点 create 持久 临时 顺序 数据
                                                 */
                                                //1.创建节点
                                                @Test
                                                public void testCreate1() throws Exception {
                                                    // 如果没有创建节点,没有指定数据,则默认将当前客户端的IP 作为数据存储
                                                    String path = client.create().forPath("/app1");
                                                    System.out.println(path);
                                                }
                                                @After
                                                public void close(){
                                                    client.close();
                                                }
                                            }
                                            	//2.创建节点 带有数据
                                                @Test
                                                public void testCreate2() throws Exception {
                                                    String path = client.create().forPath("/app2","hehe".getBytes());
                                                    System.out.println(path);
                                                }
                                                
                                                //3.设置节点类型 默认持久化
                                                @Test
                                                public void testCreate3() throws Exception {
                                                    //设置临时节点
                                                    String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
                                                    System.out.println(path);
                                                }
                                                
                                            //1.查询数据 getData
                                            @Test
                                            public void testGet1() throws Exception {
                                                byte[] data = client.getData().forPath("/app1");
                                                System.out.println(new String(data));
                                            }
                                            //2.查询子节点 getChildren()
                                            @Test
                                            public void testGet2() throws Exception {
                                                List path = client.getChildren().forPath("/");
                                                System.out.println(path);
                                            }
                                            //3.查询节点状态信息
                                            @Test
                                            public void testGet3() throws Exception {
                                                Stat status = new Stat();
                                                System.out.println(status);
                                                //查询节点状态信息: ls -s
                                                client.getData().storingStatIn(status).forPath("/app1");
                                                System.out.println(status);
                                            }
                                            
                                            • 修改节点
                                                  //1. 基本数据修改
                                                  @Test
                                                  public void testSet() throws Exception {
                                                      client.setData().forPath("/app1","hahaha".getBytes());
                                                  }
                                                  //根据版本修改(乐观锁)
                                                  @Test
                                                  public void testSetVersion() throws Exception {
                                                      //查询版本
                                                      Stat status = new Stat();
                                                      //查询节点状态信息: ls -s
                                                      client.getData().storingStatIn(status).forPath("/app1");
                                                      int version = status.getVersion();
                                                      System.out.println(version);  //2
                                                      client.setData().withVersion(version).forPath("/app1","hehe".getBytes());
                                                  }
                                              
                                              • 删除节点
                                                  //1.删除单个节点
                                                    @Test
                                                    public void testDelete1() throws Exception {
                                                        client.delete().forPath("/app4");
                                                    }
                                                    //删除带有子节点的节点
                                                    @Test
                                                    public void testDelete2() throws Exception {
                                                        client.delete().deletingChildrenIfNeeded().forPath("/app4");
                                                    }
                                                    //必须删除成功(超时情况下,重试删除)
                                                    @Test
                                                    public void testDelete3() throws Exception {
                                                        client.delete().guaranteed().forPath("/app2");
                                                    }
                                                    //回调 删除完成后执行
                                                    @Test
                                                    public void testDelete4() throws Exception {
                                                        client.delete().guaranteed().inBackground((curatorFramework, curatorEvent) -> {
                                                            System.out.println("我被删除了");
                                                            System.out.println(curatorEvent);
                                                        }).forPath("/app1");
                                                    }
                                                

                                                Watch事件监听

                                                ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。

                                                ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。

                                                Zookeeper基础操作,在这里插入图片描述,第9张

                                                zkCli客户端使用watch

                                                添加 -w 参数可实时监听节点与子节点的变化,并且实时收到通知。非常适用保障分布式情况下的数据一至性。

                                                其使用方式如下

                                                命令描述
                                                ls -w path监听子节点的变化(增,删) [监听目录]
                                                get -w path监听节点数据的变化
                                                stat -w path监听节点属性的变化

                                                Zookeeper事件类型

                                                • NodeCreated: 节点创建
                                                • NodeDeleted: 节点删除
                                                • NodeDataChanged:节点数据变化
                                                • NodeChildrenChanged:子节点列表变化
                                                • DataWatchRemoved:节点监听被移除
                                                • ChildWatchRemoved:子节点监听被移除

                                                  1)get -w path 监听节点数据变化

                                                  2) ls -w /path 监听子节点的变化(增,删) [监听目录]

                                                  3) ls -R -w /path 例子二 循环递归的监听

                                                  curator客户端使用watch

                                                  ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐。

                                                  Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

                                                  ZooKeeper提供了三种Watcher:

                                                  • NodeCache : 只是监听某一个特定的节点
                                                  • PathChildrenCache : 监控一个ZNode的子节点.
                                                  • TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

                                                    1)watch监听 NodeCache

                                                    public class CuratorWatchTest {
                                                    		/**
                                                         * 演示 NodeCache : 给指定一个节点注册监听
                                                         */
                                                        @Test
                                                        public void testNodeCache() throws Exception {
                                                            //1. 创建NodeCache对象
                                                            NodeCache nodeCache = new NodeCache(client, "/app1");  //监听的是 /yuyang和其子目录app1
                                                            //2. 注册监听
                                                            nodeCache.getListenable().addListener(new NodeCacheListener() {
                                                                @Override
                                                                public void nodeChanged() throws Exception {
                                                                    System.out.println("节点变化了。。。。。。");
                                                                    //获取修改节点后的数据
                                                                    byte[] data = nodeCache.getCurrentData().getData();
                                                                    System.out.println(new String(data));
                                                                }
                                                            });
                                                            //3. 设置为true,开启监听
                                                            nodeCache.start(true);
                                                            while(true){
                                                            }
                                                        } 
                                                    }
                                                    

                                                    2)watch监听 PathChildrenCache

                                                        /**
                                                         * 演示 PathChildrenCache: 监听某个节点的所有子节点
                                                         */
                                                        @Test
                                                        public void testPathChildrenCache() throws Exception {
                                                            //1.创建监听器对象 (第三个参数表示缓存每次节点更新后的数据)
                                                            PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
                                                            //2.绑定监听器
                                                            pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                                                                @Override
                                                                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                                                                    System.out.println("子节点发生变化了。。。。。。");
                                                                    System.out.println(pathChildrenCacheEvent);
                                                                    if(PathChildrenCacheEvent.Type.CHILD_UPDATED == pathChildrenCacheEvent.getType()){
                                                                        //更新子节点
                                                                        System.out.println("子节点更新了!");
                                                                        //在一个getData中有很多数据,我们只拿data部分
                                                                        byte[] data = pathChildrenCacheEvent.getData().getData();
                                                                        System.out.println("更新后的值为:" + new String(data));
                                                                    }else if(PathChildrenCacheEvent.Type.CHILD_ADDED == pathChildrenCacheEvent.getType()){
                                                                        //添加子节点
                                                                        System.out.println("添加子节点!");
                                                                        String path = pathChildrenCacheEvent.getData().getPath();
                                                                        System.out.println("子节点路径为: " + path);
                                                                    }else if(PathChildrenCacheEvent.Type.CHILD_REMOVED == pathChildrenCacheEvent.getType()){
                                                                        //删除子节点
                                                                        System.out.println("删除了子节点");
                                                                        String path = pathChildrenCacheEvent.getData().getPath();
                                                                        System.out.println("子节点路径为: " + path);
                                                                    }
                                                                }
                                                            });
                                                            //3. 开启
                                                            pathChildrenCache.start();
                                                            while(true){
                                                            }
                                                        }
                                                    

                                                    事件对象信息分析

                                                    PathChildrenCacheEvent{
                                                    	type=CHILD_UPDATED, 
                                                    	data=ChildData
                                                    	{
                                                    		path='/app2/m1', 
                                                    		stat=164,166,1670114647087,1670114698259,1,0,0,0,3,0,164, 
                                                    		data=[49, 50, 51]
                                                    	}
                                                    }
                                                    

                                                    Zookeeper基础操作,在这里插入图片描述,第10张

                                                    3)watch监听 TreeCache

                                                    TreeCache相当于NodeCache(只监听当前结点)+ PathChildrenCache(只监听子结点)的结合版,即监听当前和子结点。

                                                      /**
                                                         * 演示 TreeCache: 监听某个节点的所有子节点
                                                         */
                                                        @Test
                                                        public void testCache() throws Exception {
                                                            //1.创建监听器对象
                                                            TreeCache treeCache = new TreeCache(client, "/app2");
                                                            //2.绑定监听器
                                                            treeCache.getListenable().addListener(new TreeCacheListener() {
                                                                @Override
                                                                public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                                                                    System.out.println("节点变化了");
                                                                    System.out.println(treeCacheEvent);
                                                                    if(TreeCacheEvent.Type.NODE_UPDATED == treeCacheEvent.getType()){
                                                                        //更新节点
                                                                        System.out.println("节点更新了!");
                                                                        //在一个getData中有很多数据,我们只拿data部分
                                                                        byte[] data = treeCacheEvent.getData().getData();
                                                                        System.out.println("更新后的值为:" + new String(data));
                                                                    }else if(TreeCacheEvent.Type.NODE_ADDED == treeCacheEvent.getType()){
                                                                        //添加子节点
                                                                        System.out.println("添加节点!");
                                                                        String path = treeCacheEvent.getData().getPath();
                                                                        System.out.println("子节点路径为: " + path);
                                                                    }else if(TreeCacheEvent.Type.NODE_REMOVED == treeCacheEvent.getType()){
                                                                        //删除子节点
                                                                        System.out.println("删除节点");
                                                                        String path = treeCacheEvent.getData().getPath();
                                                                        System.out.println("删除节点路径为: " + path);
                                                                    }
                                                                }
                                                            });
                                                            //3. 开启
                                                            treeCache.start();
                                                            while(true){
                                                            }
                                                        }
                                                    

                                                    一次性监听方式:Watcher

                                                    利用 Watcher 来对节点进行监听操作,可以典型业务场景需要使用可考虑,但一般情况不推荐使用。

                                                    public class CuratorWatchTest {
                                                    	@Autowired
                                                        private CuratorFramework client;
                                                        /**
                                                         * 建立连接
                                                         */
                                                        @Before
                                                        public void testConnect(){
                                                            /**
                                                             * String connectString,  连接字符串 zk地址 端口: "192.168.58.100:2181,,,,"
                                                             * int sessionTimeoutMs,  会话超时时间
                                                             * int connectionTimeoutMs,  连接超时时间
                                                             * RetryPolicy retryPolicy   重试策略
                                                             */
                                                            //1. 第一种方式
                                                            RetryPolicy retryPolicy =new ExponentialBackoffRetry(3000,10);
                                                            //2. 第二种方式
                                                            client = CuratorFrameworkFactory.builder()
                                                                    .connectString("192.168.58.100:2181")
                                                                    .sessionTimeoutMs(60*1000)
                                                                    .connectionTimeoutMs(15*1000)
                                                                    .retryPolicy(retryPolicy)
                                                                    .namespace("yuyang")  //当前程序创建目录的根目录
                                                                    .build();
                                                            client.start();
                                                        }
                                                        /**
                                                         * 演示一次性监听
                                                         */
                                                        @Test
                                                        public  void testOneListener() throws Exception {
                                                            byte[] data = client.getData().usingWatcher(new Watcher() {
                                                                @Override
                                                                public void process(WatchedEvent watchedEvent) {
                                                                    System.out.println("监听器 watchedEvent: " + watchedEvent);
                                                                }
                                                            }).forPath("/test");
                                                            System.out.println("监听节点内容:" + new String(data));
                                                            while(true){
                                                            }
                                                        }
                                                        @After
                                                        public void close(){
                                                            client.close();
                                                        }
                                                    }
                                                    

                                                    上面这段代码对 /test 节点注册了一个 Watcher 监听事件,并且返回当前节点的内容。后面进行两次数据变更,实际上第二次变更时,监听已经失效,无法再次获得节点变动事件了

                                                    Curator事件监听机制

                                                    ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐。

                                                    Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

                                                    ZooKeeper提供了三种Watcher:

                                                    • NodeCache : 只是监听某一个特定的节点
                                                    • PathChildrenCache : 监控一个ZNode的子节点.
                                                    • TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

                                                      事务&异步操作演示

                                                      CuratorFramework 的实例包含 inTransaction( ) 接口方法,调用此方法开启一个 ZooKeeper 事务。

                                                      可以复合create、 setData、 check、and/or delete 等操作然后调用 commit() 作为一个原子操作提交。

                                                       /**
                                                      	* 事务操作
                                                      	*/	
                                                      @Test
                                                      public void TestTransaction() throws Exception {
                                                        //1. 创建Curator对象,用于定义事务操作
                                                        CuratorOp createOp = client.transactionOp().create().forPath("/app3", "app1-data".getBytes());
                                                        CuratorOp setDataOp = client.transactionOp().setData().forPath("/app2", "app2-data".getBytes());
                                                        CuratorOp deleteOp = client.transactionOp().delete().forPath("/app2");
                                                        //2. 添加事务操
                                                        Collection results = client.transaction().forOperations(createOp, setDataOp, deleteOp);
                                                        //3. 遍历事务操作结果
                                                        for (CuratorTransactionResult result : results) {
                                                          System.out.println(result.getForPath() + " - " + result.getType());
                                                        }
                                                      }
                                                      

                                                      异步操作

                                                      前面提到的增删改查都是同步的,但是 Curator 也提供了异步接口,引入了 BackgroundCallback 接口用于处理异步接口调用之后服务端返回的结果信息。

                                                      BackgroundCallback 接口中一个重要的回调值为 CuratorEvent,里面包含事件类型、响应码和节点的详细信息。

                                                      // 异步操作
                                                          @Test
                                                          public void TestAsync() throws Exception {
                                                              while(true){
                                                                  // 异步获取子节点列表
                                                                  GetChildrenBuilder builder = client.getChildren();
                                                                  builder.inBackground(new BackgroundCallback() {
                                                                      @Override
                                                                      public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                                                                          System.out.println("子节点列表:" + curatorEvent.getChildren());
                                                                      }
                                                                  }).forPath("/");
                                                                  TimeUnit.SECONDS.sleep(5);
                                                              }
                                                          }
                                                      

                                                      Zookeeper权限控制

                                                      zk权限控制介绍

                                                      Zookeeper作为一个分布式协调框架,内部存储了一些分布式系统运行时的状态的数据,比如master选举、比如分布式锁。对这些数据的操作会直接影响到分布式系统的运行状态。因此,为了保证zookeeper中的数据的安全性,避免误操作带来的影响。Zookeeper提供了一套ACL权限控制机制来保证数据的安全。

                                                      ACL权限控制,使用:scheme:id:perm来标识。

                                                      • Scheme(权限模式),标识授权策略
                                                      • ID(授权对象)
                                                      • Permission:授予的权限

                                                        ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限,每个znode支持设置多种权限控制方案和多个权限,子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点。

                                                        Scheme 权限模式

                                                        Zookeeper提供以下权限模式,所谓权限模式,就是使用什么样的方式来进行授权。

                                                        • world: 默认方式,相当于全部都能访问。

                                                        • auth:代表已经认证通过的用户

                                                          cli中可以通过 addauth digest user:pwd 来添加当前上下文中的授权用户

                                                        • digest:即用户名:密码这种方式认证,这也是业务系统中最常用的。

                                                          username:password 字符串来产生一个MD5串,然后该串被用来作为ACL ID。认证是通过明文发送username:password 来进行的,当用在ACL时,表达式为username:base64 ,base64是password的SHA1摘要的编码。

                                                        • ip:通过ip地址来做权限控制

                                                          比如 ip:192.168.1.1 表示权限控制都是针对这个ip地址的。也可以针对网段 ip:192.168.1.1/24,此时addr中的有效位与客户端addr中的有效位进行比对。

                                                          ID 授权对象

                                                          指权限赋予的用户或一个指定的实体,不同的权限模式下,授权对象不同。

                                                          Id ipId = new Id("ip", "192.168.58.100");
                                                          Id ANYONE_ID_UNSAFE = new Id("world", "anyone");
                                                          

                                                          3.4 Permission权限类型

                                                          指通过权限检查后可以被允许的操作,create /delete /read/write/admin

                                                          • Create 允许对子节点Create 操作
                                                          • Read 允许对本节点GetChildren 和GetData 操作
                                                          • Write 允许对本节点SetData 操作
                                                          • Delete 允许对子节点Delete 操作
                                                          • Admin 允许对本节点setAcl 操作

                                                            权限模式(Schema)和授权对象主要用来确认权限验证过程中使用的验证策略:

                                                            比如ip地址、digest:username:password,匹配到验证策略并验证成功后,再根据权限操作类型来决定当前客户端的访问权限。

                                                            在控制台实现操作

                                                            在Zookeeper中提供了ACL相关的命令

                                                            getAcl        getAcl      读取ACL权限
                                                            setAcl        setAcl       设置ACL权限
                                                            addauth      addauth       添加认证用户
                                                            

                                                            1)word方式

                                                            创建一个节点后默认就是world模式

                                                            [zk: localhost:2181(CONNECTED) 6] create /auth
                                                            Created /auth
                                                            [zk: localhost:2181(CONNECTED) 7] getAcl /auth
                                                            'world,'anyone
                                                            : cdrwa
                                                            [zk: localhost:2181(CONNECTED) 8] create /auth2
                                                            Created /auth2
                                                            [zk: localhost:2181(CONNECTED) 9] getAcl /auth2
                                                            'world,'anyone
                                                            : cdrwa
                                                            [zk: localhost:2181(CONNECTED) 10] 
                                                            

                                                            其中, cdrwa,分别对应 create . delete read write admin

                                                            2)IP方式

                                                            在ip模式中,首先连接到zkServer的命令需要使用如下方式

                                                            zkCli.sh -server 127.0.0.1:2181 
                                                            

                                                            接着按照IP的方式操作如下

                                                            [zk: 127.0.0.1:2181(CONNECTED) 0] create /ip-model
                                                            Created /ip-model
                                                            [zk: 127.0.0.1:2181(CONNECTED) 1] setAcl /ip-model ip:127.0.0.1:cdrwa
                                                            [zk: 127.0.0.1:2181(CONNECTED) 3] getAcl /ip-model
                                                            'ip,'127.0.0.1
                                                            : cdrwa
                                                            

                                                            3) Auth模式

                                                            auth模式的操作如下。

                                                            [zk: 127.0.0.1:2181(CONNECTED) 5] create /spike
                                                            Created /spike
                                                            [zk: 127.0.0.1:2181(CONNECTED) 6] addauth digest spike:123456
                                                            [zk: 127.0.0.1:2181(CONNECTED) 9] setAcl /spike auth:spike:cdrwa
                                                            [zk: 127.0.0.1:2181(CONNECTED) 10] getAcl /spike
                                                            'digest,'spike:pPeKgz2N9Xc8Um6wwnzFUMteLxk=
                                                            : cdrwa
                                                            

                                                            当我们退出当前的会话后,再次连接,执行如下操作,会提示没有权限

                                                            [zk: localhost:2181(CONNECTED) 0] get /spike
                                                            Insufficient permission : /spike 
                                                            

                                                            这时候,我们需要重新授权。

                                                            [zk: localhost:2181(CONNECTED) 1] addauth digest spike:123456
                                                            [zk: localhost:2181(CONNECTED) 2] get /spike
                                                            null 
                                                            

                                                            **4) Digest模式 **

                                                            使用语法,会发现使用方式和Auth模式相同

                                                            setAcl /digest digest:用户名:密码:权限
                                                            

                                                            但是有一个不一样的点,密码需要用加密后的,否则无法被识别。

                                                            密码: 用户名和密码加密后的字符串。

                                                            使用下面程序生成密码

                                                            public class TestAcl {
                                                                @Test
                                                                public void createPw() throws NoSuchAlgorithmException {
                                                                    String up = "yuyang:yuyang";
                                                                    byte[] digest = MessageDigest.getInstance("SHA1").digest(up.getBytes());
                                                                    String encodeStr = Base64.getEncoder().encodeToString(digest);
                                                                    System.out.println(encodeStr);
                                                                }
                                                            }
                                                            

                                                            得到: 5FAC7McRhLdx0QUWsfEbK8pqwxc=

                                                            再回到client上进行如下操作

                                                            [zk: localhost:2181(CONNECTED) 14] create /digest
                                                            Created /digest
                                                            [zk: localhost:2181(CONNECTED) 15] setAcl /digest digest:yuyang:5FAC7McRhLdx0QUWsfEbK8pqwxc=:cdrwa
                                                            [zk: localhost:2181(CONNECTED) 16] getAcl /digest
                                                            'digest,'yuyang:5FAC7McRhLdx0QUWsfEbK8pqwxc=: cdrwa
                                                            

                                                            当退出当前会话后,需要再次授权才能访问**/digest**节点

                                                            [zk: localhost:2181(CONNECTED) 0] get /digest
                                                            Insufficient permission : /digest
                                                            [zk: localhost:2181(CONNECTED) 1] addauth digest yuyang:yuyang
                                                            [zk: localhost:2181(CONNECTED) 2] get /digest
                                                            null
                                                            

网友评论

搜索
最新文章
热门文章
热门标签