序
本文主要研究一下PowerJob的SystemInfoController
SystemInfoController
tech/powerjob/server/web/controller/SystemInfoController.java
@Slf4j @RestController @RequestMapping("/system") @RequiredArgsConstructor public class SystemInfoController { private final JobInfoRepository jobInfoRepository; private final InstanceInfoRepository instanceInfoRepository; private final ServerInfoService serverInfoService; private final WorkerClusterQueryService workerClusterQueryService; @GetMapping("/listWorker") public ResultDTO> listWorker(Long appId) { List
workerInfos = workerClusterQueryService.getAllWorkers(appId); return ResultDTO.success(workerInfos.stream().map(WorkerStatusVO::new).collect(Collectors.toList())); } @GetMapping("/overview") public ResultDTO getSystemOverview(Long appId) { SystemOverviewVO overview = new SystemOverviewVO(); // 总任务数量 overview.setJobCount(jobInfoRepository.countByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV())); // 运行任务数 overview.setRunningInstanceCount(instanceInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV())); // 近期失败任务数(24H内) Date date = DateUtils.addDays(new Date(), -1); overview.setFailedInstanceCount(instanceInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date)); // 服务器时区 overview.setTimezone(TimeZone.getDefault().getDisplayName()); // 服务器时间 overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN)); overview.setServerInfo(serverInfoService.fetchServiceInfo()); return ResultDTO.success(overview); } }
SystemInfoController提供了listWorker、getSystemOverview方法;listWorker则是根据当前登录的appId来获取其WorkerInfo;getSystemOverview则是统计了当前appId的总任务数量、运行任务数、近期失败任务数
getAllWorkers
tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
@DesignateServer public ListgetAllWorkers(Long appId) { List workers = Lists.newLinkedList(getWorkerInfosByAppId(appId).values()); workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore()); return workers; } private Map getWorkerInfosByAppId(Long appId) { ClusterStatusHolder clusterStatusHolder = getAppId2ClusterStatus().get(appId); if (clusterStatusHolder == null) { log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId); return Collections.emptyMap(); } return clusterStatusHolder.getAllWorkers(); } public Map getAppId2ClusterStatus() { return WorkerClusterManagerService.getAppId2ClusterStatus(); }
getAllWorkers通过getWorkerInfosByAppId获取该appId下的所有WorkerInfo(若找不到对应的ClusterStatusHolder则返回空Map),然后根据getSystemMetrics().calculateScore()按分数从高到低(降序)进行排序
WorkerClusterManagerService
tech/powerjob/server/remote/worker/WorkerClusterManagerService.java
@Slf4j public class WorkerClusterManagerService { /** * 存储Worker健康信息,appId -> ClusterStatusHolder */ private static final MapAPP_ID_2_CLUSTER_STATUS = Maps.newConcurrentMap(); /** * 更新状态 * @param heartbeat Worker的心跳包 */ public static void updateStatus(WorkerHeartbeat heartbeat) { Long appId = heartbeat.getAppId(); String appName = heartbeat.getAppName(); ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName)); clusterStatusHolder.updateStatus(heartbeat); } /** * 清理不需要的worker信息 * @param usingAppIds 需要维护的appId,其余的数据将被删除 */ public static void clean(List usingAppIds) { Set keys = Sets.newHashSet(usingAppIds); APP_ID_2_CLUSTER_STATUS.entrySet().removeIf(entry -> !keys.contains(entry.getKey())); } /** * 清理缓存信息,防止 OOM */ public static void cleanUp() { APP_ID_2_CLUSTER_STATUS.values().forEach(ClusterStatusHolder::release); } protected static Map getAppId2ClusterStatus() { return APP_ID_2_CLUSTER_STATUS; } }
WorkerClusterManagerService定义了APP_ID_2_CLUSTER_STATUS,维护了appId到具体ClusterStatusHolder的映射;其中updateStatus接收WorkerHeartbeat,然后执行clusterStatusHolder.updateStatus(heartbeat)
updateStatus
tech/powerjob/server/remote/worker/ClusterStatusHolder.java
/**
 * Applies a worker heartbeat to this cluster: registers the worker if it is new,
 * ignores heartbeats older than the last recorded activity, otherwise refreshes the
 * worker and records the containers it reports.
 *
 * @param heartbeat heartbeat packet sent by a worker
 */
public void updateStatus(WorkerHeartbeat heartbeat) {

    String workerAddress = heartbeat.getWorkerAddress();
    long heartbeatTime = heartbeat.getHeartbeatTime();

    // First heartbeat from this address: create the WorkerInfo and seed it from the heartbeat.
    WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
        WorkerInfo wf = new WorkerInfo();
        wf.refresh(heartbeat);
        return wf;
    });

    // Drop heartbeats that are older than what has already been recorded for this worker.
    long oldTime = workerInfo.getLastActiveTime();
    if (heartbeatTime < oldTime) {
        log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
        return;
    }

    workerInfo.refresh(heartbeat);

    // Index the reported containers: containerId -> (workerAddress -> container info).
    List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
    if (!CollectionUtils.isEmpty(containerInfos)) {
        containerInfos.forEach(containerInfo -> {
            Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
            infos.put(workerAddress, containerInfo);
        });
    }
}
updateStatus方法先根据workerAddress获取workerInfo,若heartbeatTime大于等于lastActiveTime则执行workerInfo.refresh(heartbeat),同时更新containerInfos
getSystemMetrics
tech/powerjob/worker/common/utils/SystemInfoUtils.java
public class SystemInfoUtils { private static final NumberFormat NF = NumberFormat.getNumberInstance(); static { NF.setMaximumFractionDigits(4); NF.setMinimumFractionDigits(4); NF.setRoundingMode(RoundingMode.HALF_UP); // 不按照千分位输出 NF.setGroupingUsed(false); } // JMX bean can be accessed externally and is meant for management tools like hyperic ( or even nagios ) - It would delegate to Runtime anyway. private static final Runtime runtime = Runtime.getRuntime(); private static final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean(); public static SystemMetrics getSystemMetrics() { SystemMetrics metrics = new SystemMetrics(); fillCPUInfo(metrics); fillMemoryInfo(metrics); fillDiskInfo(metrics); // 在Worker完成分数计算,减小Server压力 metrics.calculateScore(); return metrics; } private static void fillCPUInfo(SystemMetrics metrics) { metrics.setCpuProcessors(osMXBean.getAvailableProcessors()); metrics.setCpuLoad(miniDouble(osMXBean.getSystemLoadAverage())); } private static void fillMemoryInfo(SystemMetrics metrics) { // JVM内存信息(maxMemory指JVM能从操作系统获取的最大内存,即-Xmx参数设置的值,totalMemory指JVM当前持久的总内存) long maxMemory = runtime.maxMemory(); long usedMemory = runtime.totalMemory() - runtime.freeMemory(); metrics.setJvmMaxMemory(bytes2GB(maxMemory)); // 已使用内存:当前申请总量 - 当前空余量 metrics.setJvmUsedMemory(bytes2GB(usedMemory)); // 已用内存比例 metrics.setJvmMemoryUsage(miniDouble((double) usedMemory / maxMemory)); } private static void fillDiskInfo(SystemMetrics metrics) { long free = 0; long total = 0; File[] roots = File.listRoots(); for (File file : roots) { free += file.getFreeSpace(); total += file.getTotalSpace(); } metrics.setDiskUsed(bytes2GB(total - free)); metrics.setDiskTotal(bytes2GB(total)); metrics.setDiskUsage(miniDouble(metrics.getDiskUsed() / metrics.getDiskTotal())); } private static double bytes2GB(long bytes) { return miniDouble(bytes / 1024.0 / 1024 / 1024); } private static double miniDouble(double origin) { return Double.parseDouble(NF.format(origin)); } }
SystemInfoUtils提供了getSystemMetrics方法,它通过fillCPUInfo、fillMemoryInfo、fillDiskInfo填充cpu、memory、disk信息,最后执行metrics.calculateScore();cpu信息通过osMXBean.getAvailableProcessors()、osMXBean.getSystemLoadAverage()获取;memory信息通过Runtime获取;disk信息则通过遍历File.listRoots()去统计freeSpace及totalSpace
calculateScore
tech/powerjob/common/model/SystemMetrics.java
public int calculateScore() { if (score > 0) { return score; } // Memory is vital to TaskTracker, so we set the multiplier factor as 2. double memScore = (jvmMaxMemory - jvmUsedMemory) * 2; // Calculate the remaining load of CPU. Multiplier is set as 1. double cpuScore = cpuProcessors - cpuLoad; // Windows can not fetch CPU load, set cpuScore as 1. if (cpuScore > cpuProcessors) { cpuScore = 1; } score = (int) (memScore + cpuScore); return score; }
SystemMetrics的calculateScore由memScore、cpuScore两部分相加后强转为int而成;memScore为(jvmMaxMemory - jvmUsedMemory) * 2,cpuScore为cpuProcessors - cpuLoad,若cpuScore大于cpuProcessors(例如Windows下无法获取CPU负载)则cpuScore取1;若score已大于0则直接返回已计算的结果
小结
SystemInfoController提供了listWorker、getSystemOverview方法;listWorker则是根据当前登录的appId来获取其WorkerInfo;getSystemOverview则是统计了当前appId的总任务数量、运行任务数、近期失败任务数;WorkerClusterManagerService定义了APP_ID_2_CLUSTER_STATUS,维护了appId到具体ClusterStatusHolder的映射;其中updateStatus接收WorkerHeartbeat,然后执行clusterStatusHolder.updateStatus(heartbeat);WorkerInfo包含了SystemMetrics,SystemInfoUtils提供了getSystemMetrics方法,它通过fillCPUInfo、fillMemoryInfo、fillDiskInfo填充cpu、memory、disk信息,最后执行metrics.calculateScore()。
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章