上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

zookeeper

guduadmin09小时前

目录

简介

Zookeeper命令操作

zookeeper数据模型介绍:

Zookeeper服务端命令

Zookeeper客户端命令

Curator

Watch事件监听

ZooKeeper分布式锁原理


简介

概念:zookeeper是一个分布式应用程序的协调服务。

作用:配置管理、分布式锁、集群管理

Zookeeper命令操作

zookeeper数据模型介绍:

zookeeper是一个树形数据结构。每一个节点被称为 ZNode,每个节点会保存自己的数据和节点信息,并允许少量的数据存储到节点下。

节点分为四类:

  • persistent 持久化节点
  • ephemeral 临时节点:-e
  • persistent_sequential 持久化顺序节点:-s
  • ephemeral_sequentia 临时顺序节点: -es

    Zookeeper服务端命令

    • 启动 ZooKeeper 服务: ./zkServer.sh start
    • 查看 ZooKeeper 服务状态: ./zkServer.sh status
    • 停止 ZooKeeper 服务: ./zkServer.sh stop
    • 重启 ZooKeeper 服务: ./zkServer.sh restart

      Zookeeper客户端命令

      • 连接服务端命令            ./zkCli.sh -server ip:port
      • 断开连接命令                quit
      • 查看命令帮助                help
      • 显示指定目录下节点      ls 目录
      • 创建节点                        create /节点path  value
      • 删除节点                        delete /节点path
      • 删除带有子节点的节点   deleteall  /节点path
      • 设置节点值                     set /节点path value
      • 创建临时节点                  create -e /节点path
      • 创建顺序节点                  create -s /节点path
      • 创建临时顺序节点           create -es  /节点path
      • 查看节点详细信息           ls -s /节点path

        zookeeper,第1张

        Curator

        curator是 Apache XooKeeper的java客户端库。

        curator api操作

        public class CuratorTest {
            private CuratorFramework client;
            /**
             * 建立连接
             */
            @Before//注解是一个 JUnit 注解,用于指示应在测试类中的每个测试方法之前执行带注解的方法。换句话说,该方法将在类中的每个测试方法之前运行。@Before
            public void testConnect() {
                /*
                 *
                 * @param connectString       连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181"
                 * @param sessionTimeoutMs    会话超时时间 单位ms
                 * @param connectionTimeoutMs 连接超时时间 单位ms
                 * @param retryPolicy         重试策略
                 */
               /* //重试策略
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
                //1.第一种方式
                CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",
                        60 * 1000, 15 * 1000, retryPolicy);*/
                //重试策略
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
                //2.第二种方式
                //CuratorFrameworkFactory.builder();
                client = CuratorFrameworkFactory.builder()
                        .connectString("192.168.149.135:2181")
                        .sessionTimeoutMs(60 * 1000)
                        .connectionTimeoutMs(15 * 1000)
                        .retryPolicy(retryPolicy)
                        .namespace("itheima")
                        .build();
                //开启连接
                client.start();
            }
        //==============================create=============================================================================
            /**
             * 创建节点:create 持久 临时 顺序 数据
             * 1. 基本创建 :create().forPath("")
             * 2. 创建节点 带有数据:create().forPath("",data)
             * 3. 设置节点的类型:create().withMode().forPath("",data)
             * 4. 创建多级节点  /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
             */
            @Test
            public void testCreate() throws Exception {
                //2. 创建节点 带有数据
                //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
                String path = client.create().forPath("/app2", "hehe".getBytes());
                System.out.println(path);
            }
            @Test
            public void testCreate2() throws Exception {
                //1. 基本创建
                //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
                String path = client.create().forPath("/app1");
                System.out.println(path);
            }
            @Test
            public void testCreate3() throws Exception {
                //3. 设置节点的类型
                //默认类型:持久化
                String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
                System.out.println(path);
            }
            @Test
            public void testCreate4() throws Exception {
                //4. 创建多级节点  /app1/p1
                //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
                String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
                System.out.println(path);
            }
        //===========================get================================================================================
            /**
             * 查询节点:
             * 1. 查询数据:get: getData().forPath()
             * 2. 查询子节点: ls: getChildren().forPath()
             * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
             */
            @Test
            public void testGet1() throws Exception {
                //1. 查询数据:get
                byte[] data = client.getData().forPath("/app1");
                System.out.println(new String(data));
            }
            @Test
            public void testGet2() throws Exception {
                // 2. 查询子节点: ls
                List path = client.getChildren().forPath("/");
                System.out.println(path);
            }
            @Test
            public void testGet3() throws Exception {
                Stat status = new Stat();
                System.out.println(status);
                //3. 查询节点状态信息:ls -s
                client.getData().storingStatIn(status).forPath("/app1");
                System.out.println(status);
            }
            //===========================set================================================================================
            /**
             * 修改数据
             * 1. 基本修改数据:setData().forPath()
             * 2. 根据版本修改: setData().withVersion().forPath()
             * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
             *
             * @throws Exception
             */
            @Test
            public void testSet() throws Exception {
                client.setData().forPath("/app1", "itcast".getBytes());
            }
            @Test
            public void testSetForVersion() throws Exception {
                Stat status = new Stat();
                //3. 查询节点状态信息:ls -s
                client.getData().storingStatIn(status).forPath("/app1");
                int version = status.getVersion();//查询出来的 3
                System.out.println(version);
                client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
            }
            //===========================delete================================================================================
            /**
             * 删除节点: delete deleteall
             * 1. 删除单个节点:delete().forPath("/app1");
             * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
             * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。  client.delete().guaranteed().forPath("/app2");
             * 4. 回调:inBackground
             * @throws Exception
             */
            @Test
            public void testDelete() throws Exception {
                // 1. 删除单个节点
                client.delete().forPath("/app1");
            }
            @Test
            public void testDelete2() throws Exception {
                //2. 删除带有子节点的节点
                client.delete().deletingChildrenIfNeeded().forPath("/app4");
            }
            @Test
            public void testDelete3() throws Exception {
                //3. 必须成功的删除
                client.delete().guaranteed().forPath("/app2");
            }
            @Test
            public void testDelete4() throws Exception {
                //4. 回调
                client.delete().guaranteed().inBackground(new BackgroundCallback(){
                    @Override
                    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                        System.out.println("我被删除了~");
                        System.out.println(event);
                    }
                }).forPath("/app1");
            }
            @After
            public void close() {
                if (client != null) {
                    client.close();
                }
            }
        }
        

        Watch事件监听

        public class CuratorWatcherTest {
            private CuratorFramework client;
            /**
             * 建立连接
             */
            @Before
            public void testConnect() {
                /*
                 *
                 * @param connectString       连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181"
                 * @param sessionTimeoutMs    会话超时时间 单位ms
                 * @param connectionTimeoutMs 连接超时时间 单位ms
                 * @param retryPolicy         重试策略
                 */
               /* //重试策略
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
                //1.第一种方式
                CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",
                        60 * 1000, 15 * 1000, retryPolicy);*/
                //重试策略
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
                //2.第二种方式
                //CuratorFrameworkFactory.builder();
                client = CuratorFrameworkFactory.builder()
                        .connectString("192.168.149.135:2181")
                        .sessionTimeoutMs(60 * 1000)
                        .connectionTimeoutMs(15 * 1000)
                        .retryPolicy(retryPolicy)
                        .namespace("itheima")
                        .build();
                //开启连接
                client.start();
            }
            @After
            public void close() {
                if (client != null) {
                    client.close();
                }
            }
            /**
             * 演示 NodeCache:给指定一个节点注册监听器
             */
            @Test
            public void testNodeCache() throws Exception {
                //1. 创建NodeCache对象
                final NodeCache nodeCache = new NodeCache(client,"/app1");
                //2. 注册监听
                nodeCache.getListenable().addListener(new NodeCacheListener() {
                    @Override
                    public void nodeChanged() throws Exception {
                        System.out.println("节点变化了~");
                        //获取修改节点后的数据
                        byte[] data = nodeCache.getCurrentData().getData();
                        System.out.println(new String(data));
                    }
                });
                //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
                nodeCache.start(true);
                while (true){
                }
            }
            /**
             * 演示 PathChildrenCache:监听某个节点的所有子节点们
             */
            @Test
            public void testPathChildrenCache() throws Exception {
                //1.创建监听对象
                PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
                //2. 绑定监听器
                pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                        System.out.println("子节点变化了~");
                        System.out.println(event);
                        //监听子节点的数据变更,并且拿到变更后的数据
                        //1.获取类型
                        PathChildrenCacheEvent.Type type = event.getType();
                        //2.判断类型是否是update
                        if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                            System.out.println("数据变了!!!");
                            byte[] data = event.getData().getData();
                            System.out.println(new String(data));
                        }
                    }
                });
                //3. 开启
                pathChildrenCache.start();
                while (true){
                }
            }
        
            /**
             * 演示 TreeCache:监听某个节点自己和所有子节点们
             */
            @Test
            public void testTreeCache() throws Exception {
                //1. 创建监听器
                TreeCache treeCache = new TreeCache(client,"/app2");
                //2. 注册监听
                treeCache.getListenable().addListener(new TreeCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                        System.out.println("节点变化了");
                        System.out.println(event);
                    }
                });
                //3. 开启
                treeCache.start();
                while (true){
                }
            }
        }
        

        ZooKeeper分布式锁原理

        zookeeper,第2张

        public class Ticket12306 implements Runnable{
            private int tickets = 10;//数据库的票数
            private InterProcessMutex lock ;
            public Ticket12306(){
                //重试策略
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
                //2.第二种方式
                //CuratorFrameworkFactory.builder();
                CuratorFramework client = CuratorFrameworkFactory.builder()
                        .connectString("192.168.149.135:2181")
                        .sessionTimeoutMs(60 * 1000)
                        .connectionTimeoutMs(15 * 1000)
                        .retryPolicy(retryPolicy)
                        .build();
                //开启连接
                client.start();
                lock = new InterProcessMutex(client,"/lock");
            }
            @Override
            public void run() {
                while(true){
                    //获取锁
                    try {
                        lock.acquire(3, TimeUnit.SECONDS);
                        if(tickets > 0){
                            System.out.println(Thread.currentThread()+":"+tickets);
                            Thread.sleep(100);
                            tickets--;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }finally {
                        //释放锁
                        try {
                            lock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
        
        public static void main(String[] args) {
                Ticket12306 ticket12306 = new Ticket12306();
                //创建客户端
                Thread t1 = new Thread(ticket12306,"携程");
                Thread t2 = new Thread(ticket12306,"飞猪");
                t1.start();
                t2.start();
        }

网友评论

搜索
最新文章
热门文章
热门标签