背景:
我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?
外部定时器定时加载实现
1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示
package wikiedits.schedule; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; import org.apache.flink.util.ExecutorUtils; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduleRichMapFunction extends RichFlatMapFunction{ // 定时任务执行器 private transient ScheduledExecutorService scheduledExecutorService; // 本地变量 private int threshold; @Override public void open(Configuration parameters) throws Exception { // 1.从db查询数据初始化本地变量 // threshold = DBManager.SELECTSQL.getConfig("threshold"); // 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值 scheduledExecutorService = Executors.newScheduledThreadPool(10); scheduledExecutorService.scheduleWithFixedDelay(() -> { // 2.1 定时任务更新本地内存配置项 // List configList = DBManager.SELECTSQL.getConfigs(); // for(ConfigEntity entity : configList){ ConfigEntityLocalCache.getInstance().update("key", "value"); // } // 2.2 更新本地变量threshold的值 // threshold = DBManager.SELECTSQL.getConfig("threshold"); }, 0, 100, TimeUnit.SECONDS); } @Override public void flatMap(String value, Collector out) throws Exception { } @Override public void close() throws Exception { ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService); } } //本地缓存实现 package wikiedits.schedule; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; /** * 保存Config信息的本地缓存 ---定时同步DB配置表的数据 */ public class ConfigEntityLocalCache { private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache(); /** * 获取本地缓存实例 */ public static ConfigEntityLocalCache getInstance() { return instance; } /** 缓存内存配置项 */ private static Cache configCache = CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build(); /** * 更新本地缓存数据 */ public boolean update(String key, String value){ configCache.put(key, value); return true; } /** * 更新本地缓存数据 */ public String getByKey(String key){ return configCache.getIfPresent(key); } }
2.在静态类中通过static语句块创建定时器并定时加载,代码如下
package wikiedits.schedule; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; /** * 静态类定时加载DB配置表到本地内存中 */ public class StaticLoadUtil { // 定时任务执行器 private static transient ScheduledExecutorService scheduledExecutorService; public static final CacheconfigCache = CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build(); // 通过定时执行器定时同步本地缓存和DB配置表 static { scheduledExecutorService = Executors.newScheduledThreadPool(10); scheduledExecutorService.scheduleWithFixedDelay(() -> { // 2.1 定时任务更新本地内存配置项 // List configList = DBManager.SELECTSQL.getConfigs(); // for(ConfigEntity entity : configList){ configCache.put("key", "value"); // } // 2.2 更新本地变量threshold的值 // threshold = DBManager.SELECTSQL.getConfig("threshold"); }, 0, 100, TimeUnit.SECONDS); } /** * 获取本地缓存 */ public static Cache getConfigCache() { return configCache; } }
总结:
1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行
2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章