上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

flink中使用外部定时器实现定时刷新

guduadmin18小时前

背景:

我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?

外部定时器定时加载实现

1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示

package wikiedits.schedule;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduleRichMapFunction extends RichFlatMapFunction {
    // 定时任务执行器
    private transient ScheduledExecutorService scheduledExecutorService;
    // 本地变量
    private int threshold;
    @Override
    public void open(Configuration parameters) throws Exception {
        // 1.从db查询数据初始化本地变量
//        threshold = DBManager.SELECTSQL.getConfig("threshold");
        // 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值
        scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            // 2.1 定时任务更新本地内存配置项
            // List configList = DBManager.SELECTSQL.getConfigs();
//            for(ConfigEntity entity : configList){
                ConfigEntityLocalCache.getInstance().update("key", "value");
//            }
            // 2.2 更新本地变量threshold的值
//            threshold = DBManager.SELECTSQL.getConfig("threshold");
        }, 0, 100, TimeUnit.SECONDS);
    }
    @Override
    public void flatMap(String value, Collector out) throws Exception {
    }
    @Override
    public void close() throws Exception {
        ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);
    }
}
//本地缓存实现
package wikiedits.schedule;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
 * 保存Config信息的本地缓存 ---定时同步DB配置表的数据
 */
public class ConfigEntityLocalCache {
    private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();
    /**
     * 获取本地缓存实例
     */
    public static ConfigEntityLocalCache getInstance() {
        return instance;
    }
    /** 缓存内存配置项 */
    private static Cache configCache =
            CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();
    /**
     * 更新本地缓存数据
     */
    public boolean update(String key, String value){
        configCache.put(key, value);
        return true;
    }
    /**
     * 更新本地缓存数据
     */
    public  String getByKey(String key){
        return configCache.getIfPresent(key);
    }
}

2.在静态类中通过static语句块创建定时器并定时加载,代码如下

package wikiedits.schedule;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
 * 静态类定时加载DB配置表到本地内存中
 */
public class StaticLoadUtil {
    // 定时任务执行器
    private static transient ScheduledExecutorService scheduledExecutorService;
    public static final Cache configCache =
            CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();
    // 通过定时执行器定时同步本地缓存和DB配置表
    static {
        scheduledExecutorService = Executors.newScheduledThreadPool(10);
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            // 2.1 定时任务更新本地内存配置项
            // List configList = DBManager.SELECTSQL.getConfigs();
            // for(ConfigEntity entity : configList){
            configCache.put("key", "value");
            // }
            // 2.2 更新本地变量threshold的值
            // threshold = DBManager.SELECTSQL.getConfig("threshold");
        }, 0, 100, TimeUnit.SECONDS);
    }
    /**
     * 获取本地缓存
     */
    public static Cache getConfigCache() {
        return configCache;
    }
}

总结:

1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行

2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行

网友评论

搜索
最新文章
热门文章
热门标签