目录
简介
Zookeeper命令操作
zookeeper数据模型介绍:
Zookeeper服务端命令
Zookeeper客户端命令
Curator
Watch事件监听
ZooKeeper分布式锁原理
简介
概念:zookeeper是一个分布式应用程序的协调服务。
作用:配置管理、分布式锁、集群管理
Zookeeper命令操作
zookeeper数据模型介绍:
zookeeper是一个树形数据结构。每一个节点被称为 ZNode,每个节点会保存自己的数据和节点信息,并允许少量的数据存储到节点下。
节点分为四类:
- persistent 持久化节点
- ephemeral 临时节点:-e
- persistent_sequential 持久化顺序节点:-s
- ephemeral_sequentia 临时顺序节点: -es
Zookeeper服务端命令
- 启动 ZooKeeper 服务: ./zkServer.sh start
- 查看 ZooKeeper 服务状态: ./zkServer.sh status
- 停止 ZooKeeper 服务: ./zkServer.sh stop
- 重启 ZooKeeper 服务: ./zkServer.sh restart
Zookeeper客户端命令
- 连接服务端命令 ./zkCli.sh -server ip:port
- 断开连接命令 quit
- 查看命令帮助 help
- 显示指定目录下节点 ls 目录
- 创建节点 create /节点path value
- 删除节点 delete /节点path
- 删除带有子节点的节点 deleteall /节点path
- 设置节点值 set /节点path value
- 创建临时节点 create -e /节点path
- 创建顺序节点 create -s /节点path
- 创建临时顺序节点 create -es /节点path
- 查看节点详细信息 ls -s /节点path
Curator
curator是 Apache XooKeeper的java客户端库。
curator api操作
public class CuratorTest { private CuratorFramework client; /** * 建立连接 */ @Before//注解是一个 JUnit 注解,用于指示应在测试类中的每个测试方法之前执行带注解的方法。换句话说,该方法将在类中的每个测试方法之前运行。@Before public void testConnect() { /* * * @param connectString 连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181" * @param sessionTimeoutMs 会话超时时间 单位ms * @param connectionTimeoutMs 连接超时时间 单位ms * @param retryPolicy 重试策略 */ /* //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); //1.第一种方式 CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181", 60 * 1000, 15 * 1000, retryPolicy);*/ //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式 //CuratorFrameworkFactory.builder(); client = CuratorFrameworkFactory.builder() .connectString("192.168.149.135:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .namespace("itheima") .build(); //开启连接 client.start(); } //==============================create============================================================================= /** * 创建节点:create 持久 临时 顺序 数据 * 1. 基本创建 :create().forPath("") * 2. 创建节点 带有数据:create().forPath("",data) * 3. 设置节点的类型:create().withMode().forPath("",data) * 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data) */ @Test public void testCreate() throws Exception { //2. 创建节点 带有数据 //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储 String path = client.create().forPath("/app2", "hehe".getBytes()); System.out.println(path); } @Test public void testCreate2() throws Exception { //1. 基本创建 //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储 String path = client.create().forPath("/app1"); System.out.println(path); } @Test public void testCreate3() throws Exception { //3. 设置节点的类型 //默认类型:持久化 String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3"); System.out.println(path); } @Test public void testCreate4() throws Exception { //4. 创建多级节点 /app1/p1 //creatingParentsIfNeeded():如果父节点不存在,则创建父节点 String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1"); System.out.println(path); } //===========================get================================================================================ /** * 查询节点: * 1. 查询数据:get: getData().forPath() * 2. 查询子节点: ls: getChildren().forPath() * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath() */ @Test public void testGet1() throws Exception { //1. 查询数据:get byte[] data = client.getData().forPath("/app1"); System.out.println(new String(data)); } @Test public void testGet2() throws Exception { // 2. 查询子节点: ls List
path = client.getChildren().forPath("/"); System.out.println(path); } @Test public void testGet3() throws Exception { Stat status = new Stat(); System.out.println(status); //3. 查询节点状态信息:ls -s client.getData().storingStatIn(status).forPath("/app1"); System.out.println(status); } //===========================set================================================================================ /** * 修改数据 * 1. 基本修改数据:setData().forPath() * 2. 根据版本修改: setData().withVersion().forPath() * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。 * * @throws Exception */ @Test public void testSet() throws Exception { client.setData().forPath("/app1", "itcast".getBytes()); } @Test public void testSetForVersion() throws Exception { Stat status = new Stat(); //3. 查询节点状态信息:ls -s client.getData().storingStatIn(status).forPath("/app1"); int version = status.getVersion();//查询出来的 3 System.out.println(version); client.setData().withVersion(version).forPath("/app1", "hehe".getBytes()); } //===========================delete================================================================================ /** * 删除节点: delete deleteall * 1. 删除单个节点:delete().forPath("/app1"); * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1"); * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2"); * 4. 回调:inBackground * @throws Exception */ @Test public void testDelete() throws Exception { // 1. 删除单个节点 client.delete().forPath("/app1"); } @Test public void testDelete2() throws Exception { //2. 删除带有子节点的节点 client.delete().deletingChildrenIfNeeded().forPath("/app4"); } @Test public void testDelete3() throws Exception { //3. 必须成功的删除 client.delete().guaranteed().forPath("/app2"); } @Test public void testDelete4() throws Exception { //4. 回调 client.delete().guaranteed().inBackground(new BackgroundCallback(){ @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("我被删除了~"); System.out.println(event); } }).forPath("/app1"); } @After public void close() { if (client != null) { client.close(); } } } Watch事件监听
public class CuratorWatcherTest { private CuratorFramework client; /** * 建立连接 */ @Before public void testConnect() { /* * * @param connectString 连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181" * @param sessionTimeoutMs 会话超时时间 单位ms * @param connectionTimeoutMs 连接超时时间 单位ms * @param retryPolicy 重试策略 */ /* //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); //1.第一种方式 CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181", 60 * 1000, 15 * 1000, retryPolicy);*/ //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式 //CuratorFrameworkFactory.builder(); client = CuratorFrameworkFactory.builder() .connectString("192.168.149.135:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .namespace("itheima") .build(); //开启连接 client.start(); } @After public void close() { if (client != null) { client.close(); } } /** * 演示 NodeCache:给指定一个节点注册监听器 */ @Test public void testNodeCache() throws Exception { //1. 创建NodeCache对象 final NodeCache nodeCache = new NodeCache(client,"/app1"); //2. 注册监听 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点变化了~"); //获取修改节点后的数据 byte[] data = nodeCache.getCurrentData().getData(); System.out.println(new String(data)); } }); //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据 nodeCache.start(true); while (true){ } } /** * 演示 PathChildrenCache:监听某个节点的所有子节点们 */ @Test public void testPathChildrenCache() throws Exception { //1.创建监听对象 PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true); //2. 绑定监听器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("子节点变化了~"); System.out.println(event); //监听子节点的数据变更,并且拿到变更后的数据 //1.获取类型 PathChildrenCacheEvent.Type type = event.getType(); //2.判断类型是否是update if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ System.out.println("数据变了!!!"); byte[] data = event.getData().getData(); System.out.println(new String(data)); } } }); //3. 开启 pathChildrenCache.start(); while (true){ } } /** * 演示 TreeCache:监听某个节点自己和所有子节点们 */ @Test public void testTreeCache() throws Exception { //1. 创建监听器 TreeCache treeCache = new TreeCache(client,"/app2"); //2. 注册监听 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("节点变化了"); System.out.println(event); } }); //3. 开启 treeCache.start(); while (true){ } } }
ZooKeeper分布式锁原理
public class Ticket12306 implements Runnable{ private int tickets = 10;//数据库的票数 private InterProcessMutex lock ; public Ticket12306(){ //重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); //2.第二种方式 //CuratorFrameworkFactory.builder(); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.149.135:2181") .sessionTimeoutMs(60 * 1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .build(); //开启连接 client.start(); lock = new InterProcessMutex(client,"/lock"); } @Override public void run() { while(true){ //获取锁 try { lock.acquire(3, TimeUnit.SECONDS); if(tickets > 0){ System.out.println(Thread.currentThread()+":"+tickets); Thread.sleep(100); tickets--; } } catch (Exception e) { e.printStackTrace(); }finally { //释放锁 try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } }
public static void main(String[] args) { Ticket12306 ticket12306 = new Ticket12306(); //创建客户端 Thread t1 = new Thread(ticket12306,"携程"); Thread t2 = new Thread(ticket12306,"飞猪"); t1.start(); t2.start(); }
猜你喜欢
- 9小时前【Spark编程基础】第7章 Structured Streaming
- 9小时前Windows上安装和配置Apache Kafka
- 9小时前hadoop报错ERROR: Cannot set priority of namenode process
- 9小时前Kafka某Topic的部分partition无法消费问题
- 9小时前实时计算大作业kafka+zookeeper+storm+dataV
- 9小时前大数据难学还是java难学,大数据学java还是c语言
- 9小时前Kafka-Topic&Partition
- 7小时前橙子吃多了(橙子吃多了会怎么样)
- 5小时前英语四级多少分过线(英语四级多少分过线2023年)
- 47分钟前汉服文化(汉服文化知识简介)
网友评论
- 搜索
- 最新文章
- 热门文章