上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Flink之JDBC Sink

guduadmin11天前

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务

  • 非事务代码
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/8/2
     * @Description: 测试
     **/
    public class FlinkJdbcSink {
        public static void main(String[] args) throws Exception {
            // 构建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可
            DataStreamSource customizeSource = env.addSource(new CustomizeSource());
            // 构建jdbc sink
            SinkFunction jdbcSink = JdbcSink.sink(
                    "insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句
                    new JdbcStatementBuilder() {
                        @Override
                        public void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {
                            pStmt.setString(1, customizeBean.getName());
                            pStmt.setInt(2, customizeBean.getAge());
                            pStmt.setString(3, customizeBean.getGender());
                            pStmt.setString(4, customizeBean.getHobbit());
                        }
                    }, // 字段映射配置,这部分就和常规的java api差不多了
                    JdbcExecutionOptions.builder()
                            .withBatchSize(10) // 批次大小,条数
                            .withBatchIntervalMs(5000) // 批次最大等待时间
                            .withMaxRetries(1) // 重复次数
                            .build(), // 写入参数配置
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withDriverName("com.mysql.jdbc.Driver")
                            .withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false")
                            .withUsername("root")
                            .withPassword("password")
                            .build() // jdbc信息配置
            );
            // 添加jdbc sink
            customizeSource.addSink(jdbcSink);
            env.execute();
        }
    }
    
    • 事务代码
      import com.mysql.cj.jdbc.MysqlXADataSource;
      import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
      import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
      import org.apache.flink.connector.jdbc.JdbcSink;
      import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.CheckpointConfig;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.SinkFunction;
      import org.apache.flink.util.function.SerializableSupplier;
      import javax.sql.XADataSource;
      /**
       * @Author: J
       * @Version: 1.0
       * @CreateTime: 2023/8/2
       * @Description: 测试
       **/
      public class FlinkJdbcSink {
          public static void main(String[] args) throws Exception {
              // 构建流环境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可
              DataStreamSource customizeSource = env.addSource(new CustomizeSource());
              // 每20秒作为checkpoint的一个周期
              env.enableCheckpointing(20000);
              // 两次checkpoint间隔最少是10秒
              env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
              // 程序取消或者停止时不删除checkpoint
              env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
              // checkpoint必须在60秒结束,否则将丢弃
              env.getCheckpointConfig().setCheckpointTimeout(60000);
              // 同一时间只能有一个checkpoint
              env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
              // 设置EXACTLY_ONCE语义,默认就是这个
              env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
              // checkpoint存储位置
              env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");
              // 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpoint
              SinkFunction exactlyOneJdbcSink = JdbcSink.exactlyOnceSink(
                      "insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句
                      (JdbcStatementBuilder) (pStmt, customizeBean) -> {
                          pStmt.setString(1, customizeBean.getName());
                          pStmt.setInt(2, customizeBean.getAge());
                          pStmt.setString(3, customizeBean.getGender());
                          pStmt.setString(4, customizeBean.getHobbit());
                      }, // 字段映射配置,这部分就和常规的java api差不多了
                      JdbcExecutionOptions.builder()
                              .withMaxRetries(0) // 设置重复次数
                              .withBatchSize(25) // 设置批次大小,数据条数
                              .withBatchIntervalMs(1000) // 批次最大等待时间
                              .build(),
                      JdbcExactlyOnceOptions.builder()
                              // 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的
                              .withTransactionPerConnection(true)
                              .build(),
                      (SerializableSupplier) () -> {
                          // XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接
                          MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
                          mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置url
                          mysqlXADataSource.setUser("root"); // 设置用户
                          mysqlXADataSource.setPassword("password"); // 设置密码
                          return mysqlXADataSource;
                      }
              );
              // 添加jdbc sink
              customizeSource.addSink(exactlyOneJdbcSink);
              env.execute();
          }
      }
      
      • pom依赖
                
                
                
                
                    org.apache.flink
                    flink-connector-jdbc
                    ${flink.version}
                
                
                
                    mysql
                    mysql-connector-java
                    8.0.28
                
        
        • 结果

          Flink之JDBC Sink,在这里插入图片描述,第1张

          jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.

网友评论

搜索
最新文章
热门文章
热门标签