这里介绍一下 Flink 中 JDBC Sink 的使用方法，以 MySQL 为例。代码分为两种：非事务写入和事务（exactly-once）写入。
- 非事务代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import java.sql.PreparedStatement; import java.sql.SQLException; /** * @Author: J * @Version: 1.0 * @CreateTime: 2023/8/2 * @Description: 测试 **/ public class FlinkJdbcSink { public static void main(String[] args) throws Exception { // 构建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可 DataStreamSource
customizeSource = env.addSource(new CustomizeSource()); // 构建jdbc sink SinkFunction jdbcSink = JdbcSink.sink( "insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句 new JdbcStatementBuilder () { @Override public void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException { pStmt.setString(1, customizeBean.getName()); pStmt.setInt(2, customizeBean.getAge()); pStmt.setString(3, customizeBean.getGender()); pStmt.setString(4, customizeBean.getHobbit()); } }, // 字段映射配置,这部分就和常规的java api差不多了 JdbcExecutionOptions.builder() .withBatchSize(10) // 批次大小,条数 .withBatchIntervalMs(5000) // 批次最大等待时间 .withMaxRetries(1) // 重复次数 .build(), // 写入参数配置 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("com.mysql.jdbc.Driver") .withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false") .withUsername("root") .withPassword("password") .build() // jdbc信息配置 ); // 添加jdbc sink customizeSource.addSink(jdbcSink); env.execute(); } } - 事务代码
import com.mysql.cj.jdbc.MysqlXADataSource; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.util.function.SerializableSupplier; import javax.sql.XADataSource; /** * @Author: J * @Version: 1.0 * @CreateTime: 2023/8/2 * @Description: 测试 **/ public class FlinkJdbcSink { public static void main(String[] args) throws Exception { // 构建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可 DataStreamSource
customizeSource = env.addSource(new CustomizeSource()); // 每20秒作为checkpoint的一个周期 env.enableCheckpointing(20000); // 两次checkpoint间隔最少是10秒 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000); // 程序取消或者停止时不删除checkpoint env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // checkpoint必须在60秒结束,否则将丢弃 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只能有一个checkpoint env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 设置EXACTLY_ONCE语义,默认就是这个 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpoint存储位置 env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint"); // 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpoint SinkFunction exactlyOneJdbcSink = JdbcSink.exactlyOnceSink( "insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句 (JdbcStatementBuilder ) (pStmt, customizeBean) -> { pStmt.setString(1, customizeBean.getName()); pStmt.setInt(2, customizeBean.getAge()); pStmt.setString(3, customizeBean.getGender()); pStmt.setString(4, customizeBean.getHobbit()); }, // 字段映射配置,这部分就和常规的java api差不多了 JdbcExecutionOptions.builder() .withMaxRetries(0) // 设置重复次数 .withBatchSize(25) // 设置批次大小,数据条数 .withBatchIntervalMs(1000) // 批次最大等待时间 .build(), JdbcExactlyOnceOptions.builder() // 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的 .withTransactionPerConnection(true) .build(), (SerializableSupplier ) () -> { // XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接 MysqlXADataSource mysqlXADataSource = new MysqlXADataSource(); mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置url mysqlXADataSource.setUser("root"); // 设置用户 mysqlXADataSource.setPassword("password"); // 设置密码 return mysqlXADataSource; } ); // 添加jdbc sink customizeSource.addSink(exactlyOneJdbcSink); env.execute(); } } - pom依赖
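<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>
- 结果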
JDBC Sink 的具体使用方式大概就是这些，整体比较简单，具体应用还需结合实际业务场景。
- 结果
- pom依赖
- 事务代码
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章