SpringBoot整合Flink(施耐德PLC物联网信息采集)
Linux环境安装kafka
前情:
施耐德PLC设备(TM200C16R)设置好信息采集程序,连接局域网,SpringBoot订阅MQTT主题,消息转至kafka,由flink接收并持久化到mysql数据库;
Wireshark抓包如下:
MQTTBox测试订阅如下:
已知参数:
服务器IP:139.220.193.14
端口号:1883
应用端账号:admin@tenlink
应用端密码:Tenlink@123
物联网账号:202303171001
物联网账号密码:03171001
订阅话题(topic):
202303171001/p(发布话题,由设备发送,应用端接收)
202303171001/s(订阅话题,由应用端发送,设备接收)
订阅mqtt (前提是kafka是已经就绪状态且plc_thoroughfare主题是存在的)
maven pom
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
yaml配置
spring:
  kafka:
    bootstrap-servers: ip:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

## 自定义
kafka:
  topics:
    # kafka 主题
    plc1: plc_thoroughfare

plc:
  broker: tcp://139.220.193.14:1883
  subscribe-topic: 202303171001/p
  username: admin@tenlink
  password: Tenlink@123
  client-id: subscribe_client
订阅mqtt并将报文发送到kafka主题
import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * PLC 订阅消息 */ @Component public class SubscribeSample { private static final Logger log = LoggerFactory.getLogger(SubscribeSample.class); @Autowired private KafkaTemplate
kafkaTemplate; @Value("${kafka.topics.plc1}") private String plc1; @Value("${plc.broker}") private String broker; @Value("${plc.subscribe-topic}") private String subscribeTopic; @Value("${plc.username}") private String username; @Value("${plc.password}") private String password; @Value("${plc.client-id}") private String clientId; @PostConstruct public void plcGather() { int qos = 0; Thread thread = new Thread(new Runnable() { @Override public void run() { MqttClient client = null; try { client = new MqttClient(broker, clientId, new MemoryPersistence()); // 连接参数 MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(60); options.setKeepAliveInterval(60); // 设置回调 client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { System.out.println("connectionLost: " + cause.getMessage()); } public void messageArrived(String topic, MqttMessage message) { String data = new String(message.getPayload()); kafkaTemplate.send(plc1,data).addCallback(success ->{ // 消息发送到的topic String kafkaTopic = success.getRecordMetadata().topic(); // 消息发送到的分区 // int partition = success.getRecordMetadata().partition(); // 消息在分区内的offset // long offset = success.getRecordMetadata().offset(); log.info("mqtt成功将消息:{},转入到kafka主题->{}", data,kafkaTopic); },failure ->{ throw new RuntimeException("发送消息失败:" + failure.getMessage()); }); } public void deliveryComplete(IMqttDeliveryToken token) { log.info("deliveryComplete---------{}", token.isComplete()); } }); client.connect(options); client.subscribe(subscribeTopic, qos); } catch (MqttException e) { e.printStackTrace(); } } }); thread.start(); } } 采集报文测试(如下图表示成功,并且已经发送到了kafka主题上)
Flink接收kafka数据
maven pom
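<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-collections4</artifactId>
    <version>4.4</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.26</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
yaml配置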
# 服务接口
server:
  port: 8222

spring:
  kafka:
    bootstrap-servers: ip:9092
    consumer:
      group-id: kafka
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/ceshi?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: root
    druid:
      initial-size: 5 #初始化时建立物理连接的个数
      min-idle: 5 #最小连接池数量
      maxActive: 20 #最大连接池数量
      maxWait: 60000 #获取连接时最大等待时间,单位毫秒
      timeBetweenEvictionRunsMillis: 60000 #配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
      minEvictableIdleTimeMillis: 300000 #配置一个连接在池中最小生存的时间,单位是毫秒
      validationQuery: SELECT 1 #用来检测连接是否有效的sql
      testWhileIdle: true #申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效
      testOnBorrow: false #申请连接时执行validationQuery检测连接是否有效,如果为true会降低性能
      testOnReturn: false #归还连接时执行validationQuery检测连接是否有效,如果为true会降低性能
      poolPreparedStatements: true # 打开PSCache,并且指定每个连接上PSCache的大小
      maxPoolPreparedStatementPerConnectionSize: 20 #要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。在Druid中,不会存在Oracle下PSCache占用内存过多的问题,可以把这个数值配置大一些,比如说100
      filters: stat,wall,slf4j #配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
      #通过connectProperties属性来打开mergeSql功能;慢SQL记录
      connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
  jpa:
    hibernate:
      ddl-auto: none
    show-sql: true
    repositories:
      packages: com.hzh.demo.domain.*

#自定义配置
customer:
  #flink相关配置
  flink:
    # 功能开关
    plc-status: true
    plc-topic: plc_thoroughfare

# 定时任务定时清理失效数据
task:
  plc-time: 0 0/1 * * * ?
表结构
-- plc_test definition
CREATE TABLE `plc_test` (
  `pkid` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '主键id',
  `json_str` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'json格式数据',
  `create_time` bigint NOT NULL COMMENT '创建时间',
  PRIMARY KEY (`pkid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='plc存储数据测试表';
启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Spring Boot entry point for the PLC storage service.
 *
 * <p>Enables JPA repository scanning, entity scanning and scheduled tasks. The package names
 * are the ones the other classes of this example import {@code PLCRepository} and
 * {@code PLCDomain} from; adjust them if the classes live elsewhere in your project.
 */
@SpringBootApplication
@EnableJpaRepositories(basePackages = "com.hzh.demo.repository")
@EntityScan("com.hzh.demo.domain")
@EnableScheduling
public class PLCStorageApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(PLCStorageApplication.class, args);
    }
}
实体类
import lombok.Builder;
import lombok.Data;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.io.Serializable;

/**
 * JPA entity for one received PLC message, mapped to the {@code plc_test} table.
 *
 * <p>NOTE(review): {@code id} and {@code jsonStr} are declared public while
 * {@code createTime} is private; the visibility is left unchanged here because
 * other code may access the public fields directly.
 */
@Entity
@Table(name = "plc_test")
@Data
@Builder
public class PLCDomain implements Serializable {

    private static final long serialVersionUID = 4122384962907036649L;

    /** Primary key, stored in column {@code pkid}. */
    @Id
    @Column(name = "pkid")
    public String id;

    /** Raw message text, stored in column {@code json_str}. */
    @Column(name = "json_str")
    public String jsonStr;

    /** Creation timestamp, stored in column {@code create_time}. */
    @Column(name = "create_time")
    private Long createTime;

    /** Creates an empty instance. */
    public PLCDomain() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param id         primary key
     * @param jsonStr    message text
     * @param createTime creation timestamp
     */
    public PLCDomain(String id, String jsonStr, Long createTime) {
        this.id = id;
        this.jsonStr = jsonStr;
        this.createTime = createTime;
    }
}
jpa 接口
import com.hzh.demo.domain.PLCDomain;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

/**
 * Spring Data JPA repository for {@link PLCDomain} entities, keyed by their {@code String} id.
 */
@Repository
public interface PLCRepository extends JpaRepository<PLCDomain, String> {
}
封装获取上下文工具类(ApplicationContextAware)由于加载先后顺序,flink无法使用spring bean注入的方式,特此封装工具类
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.stereotype.Component;

/**
 * Static access point to the Spring {@link ApplicationContext} for code that cannot use
 * dependency injection.
 *
 * <p>The context is stored in a static field when Spring calls
 * {@link #setApplicationContext(ApplicationContext)}; until then the field is unset and every
 * static helper below will fail with a {@link NullPointerException}.
 */
@Component
public class ApplicationContextProvider implements ApplicationContextAware {

    /** The application context instance handed over by Spring. */
    private static ApplicationContext applicationContext;

    /**
     * Returns the stored application context.
     *
     * @return the application context, or {@code null} if it has not been set yet
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * Stores the context in the static field so the static helpers can reach it.
     *
     * @param applicationContext the context supplied by Spring
     * @throws BeansException declared by the {@link ApplicationContextAware} contract
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextProvider.applicationContext = applicationContext;
    }

    /**
     * Looks up a bean by name.
     *
     * @param name the bean name
     * @return the bean instance
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * Looks up a bean by type.
     *
     * @param clazz the required bean type
     * @param <T>   the bean type
     * @return the bean instance
     */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * Looks up a bean by name and type.
     *
     * @param name  the bean name
     * @param clazz the required bean type
     * @param <T>   the bean type
     * @return the bean instance
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

    /**
     * Resolves a localized message for the locale held by {@link LocaleContextHolder}.
     *
     * @param code the message code
     * @param args arguments for the message placeholders
     * @return the resolved message
     */
    public static String getMessage(String code, Object[] args) {
        return getApplicationContext().getMessage(code, args, LocaleContextHolder.getLocale());
    }

    /**
     * Resolves a localized message for the locale held by {@link LocaleContextHolder},
     * passing a default message through to the context lookup.
     *
     * @param code           the message code
     * @param args           arguments for the message placeholders
     * @param defaultMessage the default message handed to the lookup
     * @return the resolved message
     */
    public static String getMessage(String code, Object[] args, String defaultMessage) {
        return getApplicationContext().getMessage(code, args, defaultMessage, LocaleContextHolder.getLocale());
    }
}
Flink 第三方输出(mysql写入)
import com.hzh.demo.config.ApplicationContextProvider;
import com.hzh.demo.domain.PLCDomain;
import com.hzh.demo.repository.PLCRepository;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Flink sink that persists every incoming string record to MySQL through
 * {@link PLCRepository}.
 */
@Component
@ConditionalOnProperty(name = "customer.flink.plc-status")
public class MysqlSink implements SinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(MysqlSink.class);

    /**
     * Wraps the record in a {@link PLCDomain} with a fresh 32-character id and the current
     * processing time, then saves it.
     *
     * @param value   the record text to store
     * @param context sink context, used here only for the current processing time
     * @throws Exception if persisting the record fails
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        long currentTime = context.currentProcessingTime();
        PLCDomain build = PLCDomain.builder()
                // UUID without dashes is 32 characters long; literal replace, no regex needed
                .id(UUID.randomUUID().toString().replace("-", ""))
                .jsonStr(value)
                .createTime(currentTime)
                .build();

        // The repository is looked up from the Spring context on each call rather than injected.
        PLCRepository repository = ApplicationContextProvider.getBean(PLCRepository.class);
        repository.save(build);
        log.info("持久化写入:{}", build);

        SinkFunction.super.invoke(value, context);
    }
}
Flink订阅kafka topic读取持续数据
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Properties; /** * 接收 kafka topic 读取数据 */ @Component @ConditionalOnProperty(name = "customer.flink.plc-status") public class FlinkReceivingPLC { private static final Logger log = LoggerFactory.getLogger(MyKeyedProcessFunction.class); @Value("${spring.kafka.bootstrap-servers:localhost:9092}") private String kafkaServer; @Value("${customer.flink.plc-topic}") private String topic; @Value("${spring.kafka.consumer.group-id:kafka}") private String groupId; @Value("${spring.kafka.consumer.key-deserializer:org.apache.kafka.common.serialization.StringDeserializer}") private String keyDeserializer; @Value("${spring.kafka.consumer.value-deserializer:org.apache.kafka.common.serialization.StringDeserializer}") private String valueDeserializer; /** * 执行方法 * * @throws Exception 异常 */ @PostConstruct public void execute(){ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); //设定全局并发度 env.setParallelism(1); Properties properties = new Properties(); //kafka的节点的IP或者hostName,多个使用逗号分隔 properties.setProperty("bootstrap.servers", kafkaServer); //kafka的消费者的group.id properties.setProperty("group.id", groupId); properties.setProperty("key-deserializer",keyDeserializer); properties.setProperty("value-deserializer",valueDeserializer); FlinkKafkaConsumer
myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties); DataStream stream = env.addSource(myConsumer); stream.print().setParallelism(1); stream //分组 .keyBy(new KeySelector () { @Override public String getKey(String value) throws Exception { return value; } }) //指定处理类 // .process(new MyKeyedProcessFunction()) //数据第三方输出,mysql持久化 .addSink(new MysqlSink()); //启动任务 new Thread(() -> { try { env.execute("PLCPersistenceJob"); } catch (Exception e) { log.error(e.toString(), e); } }).start(); } } 失效数据清理机制(为了方便测试,所以清理机制执行频率高且数据失效低)
import com.hzh.demo.repository.PLCRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.Optional; /** * 定时任务配置 */ @Component @Configuration public class QutrzConfig { private static final Logger log = LoggerFactory.getLogger(QutrzConfig.class); @Autowired private PLCRepository plcRepository; /** * 数据清理机制 */ @Scheduled(cron = "${task.plc-time}") private void PLCCleaningMechanism (){ log.info("执行数据清理机制:{}","PLCCleaningMechanism"); long currentTimeMillis = System.currentTimeMillis(); Optional.of(this.plcRepository.findAll()).ifPresent(list ->{ list.forEach(plc ->{ Long createTime = plc.getCreateTime(); //大于1分钟为失效数据 if ((currentTimeMillis - createTime) > (1000 * 60 * 1) ){ this.plcRepository.delete(plc); log.info("过期数据已经被清理:{}",plc); } }); }); } }
测试结果
mysql入库数据
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章