文章目录
- 前言
- 一、架构图和流程图
- 二、流程说明
- 1.服务启动初始化ZK、注册所有服务节点信息-MasterRegister
- 2.创建、运行服务节点,并管理服务节点-LeaderSelectorZkClient。
- 3.典型场景-调度服务单体执行-DigitalEmpTask
- 总结
- 参考
前言
Spring Boot 主备切换可以采用数据库的主从同步、Zookeeper选举、Redis Sentinel等技术实现高可用。
其中,数据库的主从同步可以通过配置数据库的主从复制来实现。在主节点出现故障时,从节点可以自动接管并成为新的主节点。这种方式实现简单,但需要手动配置主从复制。
Zookeeper选举可以利用Zookeeper的特性来实现,即在Zookeeper上创建一个临时节点作为选举的标志,节点创建成功的服务就是主节点,其他服务则是备节点。在主节点出现故障时,Zookeeper会重新选举一个新的主节点。这种方式实现相对较为复杂,但具有更好的灵活性和可扩展性。
Redis Sentinel是Redis提供的一种高可用性解决方案,可以自动完成主从切换,同时具有自动故障检测和恢复等功能。Redis Sentinel需要在多个节点上运行,并且可以配置多个从节点来实现数据备份和故障转移。当主节点故障时,Redis Sentinel会自动将其中一个从节点升级为新的主节点,保证服务的高可用性。
一、架构图和流程图
说明:
主+备模式中有1个主服务节点、多个备服务节点,由主服务节点向外提供服务,备服务节点监听主机状态,一旦主服务节点宕机,备服务节点速接管主服务继续向外提供服务。
通过Zookeeper(集群)服务注册/发现特性完成主备切换;
- 1-工作服务器启动时,各服务节点在ZooKeeper的Servers节点下创建临时节点,并把基本信息写入临时节点,完成注册;
- 2-各服务节点实时监听Servers节点的子节点列表,并尝试创建Master临时节点,谁创建成功谁就是Master,其他的服务节点就作为Slave
- 3-所有的服务节点关注Master节点的删除事件,通过监听Master节点的删除事件来体现Master服务器是否宕机(创建临时节点的服务器一旦宕机,它所创建的临时节点即会自动删除)
- 4-.一旦Master服务器宕机,其它服务节点开始新一轮的Master选举,计算新的Master服务器。
二、流程说明
1.服务启动初始化ZK、注册所有服务节点信息-MasterRegister
代码如下(示例):
package com.merak.hyper.automation.zk; import com.merak.hyper.automation.util.ZkHelper; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; /** * @author harry * @version 1.0 * @ClassName: ZkMasterRegister * @description: zk主备MasterRegister:启动时初始化ZK、注册所有服务节点信息 */ @Component @Order(1) public class ZkMasterRegister implements CommandLineRunner { public static final Logger log = LoggerFactory.getLogger(ZkMasterRegister.class); @Value("${zk_master.status}") private String status; @Value("${zk_master.serviceurl}") private String serviceurl; @Override public void run(String... args) { if( ZkHelper.getInstance().zookeeperOpen(status) ) { String[] workServerArr = ZkHelper.getInstance().workServerInfo(); if (!StringUtils.isBlank(workServerArr[0]) && !StringUtils.isBlank(serviceurl)) { LeaderSelectorZkClient.getInstance().initZk(serviceurl, workServerArr[0], workServerArr[1], workServerArr[2], ZkHelper.getInstance().zkMasterPath()); log.info("程序启动,初始化ZK、注册服务节点等信息!"); } else { log.warn("参数未配置zookeeper服务器的地址[sys.zookeeper.serviceurl],请检查!");//ip 和 name } } else{ log.warn("当前调度服务为单节点服务,未配置zookeeper服务器"); } } }
2.创建、运行服务节点,并管理服务节点-LeaderSelectorZkClient。
工作服务器节点的基本信息
每个分布式服务节点基本信息包括:serviceIp、servicePort和name, 确保分布式服务节点的唯一性。
package com.merak.hyper.automation.zk; import java.io.Serializable; /** * 工作服务器节点的基本信息 */ public class RunningData implements Serializable { private static final long serialVersionUID = 4260577459043203630L; private String serviceIp; private String servicePort; private String name; public String getServiceIp() { return serviceIp; } public void setServiceIp(String serviceIp) { this.serviceIp = serviceIp; } public String getServicePort() { return servicePort; } public void setServicePort(String servicePort) { this.servicePort = servicePort; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "RunningData{" + "serviceIp='" + serviceIp + '\'' + ", servicePort='" + servicePort + '\'' + ", name='" + name + '\'' + '}'; } }
创建、运行服务节点,并管理服务节点
代码如下(示例):
/** * @description: ZOOKEEPER_SERVER连接和服务节点管理 * @param: [zookeeper_server, session_connection_timeout, serviceIp, serviceName] * @return: void */ public void initZk(String zookeeper_server,String serviceIp, String serviceName, String servicePort,String zkMasterName) { try { log.info("创建服务器节点["+serviceIp+","+serviceName+"]开始!"); //创建zkClient client = ZkConnect.getInstance().connectZkSever(zookeeper_server); //创建serverData runningData = new RunningData(); runningData.setServiceIp(serviceIp); runningData.setName(serviceName); runningData.setServicePort(servicePort); //创建服务 workServer = new WorkServer(runningData,zkMasterName); workServer.setZkClient(client); workServer.start(); log.info("创建服务器节点["+serviceIp+","+serviceName+"]结束!"); } catch (Exception e) { log.error("zookeeper_server init error,msg=" + e.getMessage()); } finally { log.info("zookeeper_server finally ..."); } }
服务节点启动、订阅Master节点删除事件、争抢Master权利成为master节点
代码片断如下:
//初始化工作服务器WorkServer信息 public WorkServer(RunningData rd, String zkMasterName) { this.serverData = rd; // 记录服务器基本信息 this.MASTER_PATH = zkMasterName; this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) { //master切换时需要重置 调度云托管任务表 schedule_status = init zkResetScheduleStatus.switchResetScheduleStatus(); log.info(dataPath + "路径已经删除,开始新一轮Master抢占"); if (masterData != null && masterData.getName().equals(serverData.getName()) && masterData.getServiceIp().equals(serverData.getServiceIp()) && masterData.getServicePort().equals(serverData.getServicePort())) { takeMaster();//自己就是上一轮的Master服务器,则直接抢 } else { //否则延迟5秒后再抢。应对网络抖动给上一轮的Master服务器优先抢占master的权利,避免不必要的数据迁移开销 delayExecutor.schedule(new Runnable() { public void run() { log.info("服务器开始抢占Master权利"); takeMaster(); } }, delayTime, TimeUnit.SECONDS); } } public void handleDataChange(String dataPath, Object data) { log.info("IZkDataListener - handleDataChange,dataPath=" + dataPath + ",data=" + data.toString()); } }; } ..... // 1 启动服务器 public void start() throws Exception { if (running) { throw new Exception("server has startup..."); } running = true; // 2 订阅Master节点删除事件 zkClient.subscribeDataChanges(MASTER_PATH, dataListener); // 3 争抢Master权利 takeMaster(); } ..... // 争抢Master private void takeMaster() { if (!running) return; try { if (!zkClient.exists(MASTER_PATH)) { // 尝试创建Master临时节点 zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL); masterData = serverData; log.info("服务器节点[" + serverData.getServiceIp() + "," + serverData.getName() + "," + serverData.getServicePort() + "]争抢Master成功,成为master[isMaster]!"); } else { // 已被其他服务器创建了,读取Master节点信息 RunningData runningData = zkClient.readData(MASTER_PATH, true); log.info("master已被服务器节点[" + runningData.getServiceIp() + "," + runningData.getName() + "," + runningData.getServicePort() + "]占有,当前节点[" + serverData.getServiceIp() + "," + serverData.getName() + "," + serverData.getServicePort() + "]只能读取master节点信息!"); if (runningData == null) { takeMaster(); // 没读到或读取瞬间Master节点宕机可争抢 } else { masterData = runningData; } } } catch (ZkNodeExistsException e) { log.error("当前节点" + serverData.getServiceIp() + "," + serverData.getName() + "," + serverData.getServicePort() + "]创建Master临时节点异常,msg=" + e.getMessage()); } catch (Exception e) { log.error("当前节点" + serverData.getServiceIp() + "," + serverData.getName() + "," + serverData.getServicePort() + "]争抢Master异常,msg=" + e.getMessage()); } }
3.典型场景-调度服务单体执行-DigitalEmpTask
需求:某个时刻只允许Master节点执行调度服务,其它Slave从节点处于闲置、不执行状态。
package com.merak.hyper.automation.quartz.task; import com.merak.hyper.automation.util.DateUtils; import com.merak.hyper.automation.util.ZkHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * @author harry * @version 1.0 * @ClassName: BizOrderTask * @description: 任务队列服务调度 */ @Component public class DigitalEmpTask { public static final Logger log = LoggerFactory.getLogger(DigitalEmpTask.class); @Value("${zk_master.status}") private String status; @Scheduled(cron = "0/30 * * * * ?") protected void digitalEmpTaskScheduler() { //1.判断是否开启zookeeper分布式调度模式 if( ZkHelper.getInstance().zookeeperOpen(status) ) { //2.判断当前工作服务节点为Master节点 if( ZkHelper.getInstance().checkMaster() ) { executeCloudTask(); } } else{ //1.未开启zookeeper分布式调度模式,为单节点部署 executeCloudTask(); } } public void executeCloudTask(){ log.info("任务开始执行,时间:" + DateUtils.dateTimeNow(DateUtils.YYYY_MM_DD_HH_MM_SS)); try { } catch (Exception e) { log.error("调度失败,原因:" + e.getMessage()); } } }
总结
1.线上1主2从已运行半年,可达到HA业务需求、自动切换能力
2.前端采取Nginx负载、分流,配置多个工作服务节点
参考
浅析如何基于ZooKeeper实现高可用架构
源代码下载
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章