上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

[大数据 Flink,Java实现不同数据库实时数据同步过程]

guduadmin11天前

目录

🌮前言:

🌮实现Mysql同步Es的过程包括以下步骤:

🌮配置Mysql数据库连接

🌮在Flink程序中,使用JDBCInputFormat来连接Mysql数据库,并定义查询语句,获取需要同步的数据。具体代码如下:

🌮最后,将步骤2中读取到的数据封装成一个Flink的DataStream程序,用于后续的数据处理和写入Es中。

🌮配置Elasticsearch连接

🌮在Flink程序中,使用ElasticsearchSinkFunction将数据写入Elasticsearch中。具体代码如下:

🌮实现数据的转换和处理

🌮实现数据的批量写入:

🌮实现实时同步:

🌮依赖:


🌮前言:

     🌮笔记

🌮实现Mysql同步Es的过程包括以下步骤:

  • 配置Mysql数据库连接: 使用Flink的JDBC连接器来连接Mysql数据库,并定义查询语句,获取需要同步的数据。同时,需要在Flink的配置文件中配置Mysql数据库的连接信息。

  • 配置Elasticsearch连接: 使用Flink的Elasticsearch连接器来连接Elasticsearch,并定义索引和类型,用于将同步的数据写入到指定的索引中。同时,需要在Flink的配置文件中配置Elasticsearch的连接信息。

  • 实现数据的转换和处理: 通过Flink的DataStream API,将从Mysql中查询到的数据转换为Elasticsearch中的文档格式,并进行相应的处理和处理,如去重、过滤等。

  • 实现数据的批量写入: 使用Flink的Elasticsearch连接器提供的批量写入接口,将转换后的数据批量写入到Elasticsearch中。

  • 实现实时同步: 将以上步骤组合成一个Flink Job,并通过Flink的DataStream API实现实时同步,即从Mysql数据库中读取到最新的数据,经过转换和处理后,实时写入到Elasticsearch中。

    需要注意的是,在实现实时同步过程中,需要考虑到数据的幂等性和错误处理机制,以保证同步过程的稳定性和可靠性。同时,也需要考虑到数据的增量同步和全量同步的情况,以便根据实际需求进行调整和优化。

    🌮配置Mysql数据库连接

    需要使用Flink的JDBC连接器来连接Mysql数据库,并定义查询语句,获取需要同步的数据。同时,需要在Flink的配置文件中配置Mysql数据库的连接信息。

    # Mysql数据库连接信息

    env.java.opts: "-Dmysql.url=jdbc:mysql://localhost:3306/test -Dmysql.username=root -Dmysql.password=123456"

     

    mysql.url表示Mysql数据库的连接地址,mysql.username表示Mysql数据库的用户名,mysql.password表示Mysql数据库的密码。

    🌮在Flink程序中,使用JDBCInputFormat来连接Mysql数据库,并定义查询语句,获取需要同步的数据。具体代码如下:
    // 定义Mysql数据库连接信息
    String mysqlUrl = System.getProperty("mysql.url");
    String mysqlUsername = System.getProperty("mysql.username");
    String mysqlPassword = System.getProperty("mysql.password");
    // 定义查询语句
    String query = "SELECT * FROM user";
    // 定义JDBC连接器
    JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl(mysqlUrl)
        .setUsername(mysqlUsername)
        .setPassword(mysqlPassword)
        .setQuery(query)
        .setRowTypeInfo(rowTypeInfo)
        .finish();
    // 读取Mysql数据库中的数据
    DataStream mysqlDataStream = env.createInput(jdbcInputFormat);
    

    rowTypeInfo表示数据类型信息,需要根据Mysql数据库中的表结构来定义。

    🌮最后,将步骤2中读取到的数据封装成一个Flink的DataStream程序,用于后续的数据处理和写入Es中。
    // 将读取到的数据封装成一个Flink的DataStream程序
    DataStream jsonDataStream = mysqlDataStream.map(new MapFunction() {
        @Override
        public String map(Row row) throws Exception {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("id", row.getField(0));
            jsonObject.put("name", row.getField(1));
            jsonObject.put("age", row.getField(2));
            return jsonObject.toJSONString();
        }
    });
    

    🌮配置Elasticsearch连接

    需要配置Elasticsearch连接,使用Flink的Elasticsearch连接器来连接Elasticsearch,并定义索引和类型,用于将同步的数据写入到指定的索引中。同时,需要在Flink的配置文件中配置Elasticsearch的连接信息。

    # Elasticsearch连接信息

    env.java.opts: "-Delasticsearch.hosts=http://localhost:9200"

     

    🌮在Flink程序中,使用ElasticsearchSinkFunction将数据写入Elasticsearch中。具体代码如下:
    // 定义Elasticsearch连接信息
    List httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("localhost", 9200, "http"));
    // 定义ElasticsearchSinkFunction
    ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction() {
        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            IndexRequest indexRequest = Requests.indexRequest()
                .index("user")
                .type("_doc")
                .source(element, XContentType.JSON);
            indexer.add(indexRequest);
        }
    });
    // 将数据写入Elasticsearch中
    jsonDataStream.addSink(esSinkBuilder.build());
    

    httpHosts表示Elasticsearch的连接地址,ElasticsearchSinkFunction用于将数据写入Elasticsearch中。在ElasticsearchSinkFunction中,可以定义索引和类型,用于将数据写入到指定的索引中。

    以上代码中,将数据写入到名为"user"的索引中,类型为"_doc"。同时,使用IndexRequest将数据写入Elasticsearch中。

    🌮实现数据的转换和处理
    • 在第二步中,已经将从Mysql中查询到的数据转换成了JSON格式。接下来,需要将JSON格式的数据转换成Elasticsearch中的文档格式。可以使用Elasticsearch的Bulk API来实现。

    • 在转换成Elasticsearch中的文档格式之前,需要进行去重操作,避免重复写入相同的数据。可以使用Flink的KeyedStream API来实现。

      // 将JSON格式的数据转换成Elasticsearch中的文档格式
      DataStream esDataStream = jsonDataStream.map(new MapFunction() {
          @Override
          public IndexRequest map(String json) throws Exception {
              JSONObject jsonObject = JSON.parseObject(json);
              String id = jsonObject.getString("id");
              IndexRequest indexRequest = new IndexRequest("user", "_doc", id);
              indexRequest.source(json, XContentType.JSON);
              return indexRequest;
          }
      });
      // 进行去重操作
      KeyedStream keyedStream = esDataStream.keyBy(new KeySelector() {
          @Override
          public String getKey(IndexRequest indexRequest) throws Exception {
              return indexRequest.id();
          }
      });
      // 将去重后的数据写入Elasticsearch中
      keyedStream.addSink(esSinkBuilder.build());
      

      使用MapFunction将JSON格式的数据转换成Elasticsearch中的文档格式。在转换成Elasticsearch中的文档格式之前,使用KeyedStream API进行去重操作,避免重复写入相同的数据。最后,将去重后的数据写入Elasticsearch中。

      🌮实现数据的批量写入:

      在第三步中已经使用了Elasticsearch的Bulk API来实现将转换后的数据批量写入到Elasticsearch中。具体代码如下:

      // 将JSON格式的数据转换成Elasticsearch中的文档格式
      DataStream esDataStream = jsonDataStream.map(new MapFunction() {
          @Override
          public IndexRequest map(String json) throws Exception {
              JSONObject jsonObject = JSON.parseObject(json);
              String id = jsonObject.getString("id");
              IndexRequest indexRequest = new IndexRequest("user", "_doc", id);
              indexRequest.source(json, XContentType.JSON);
              return indexRequest;
          }
      });
      // 进行去重操作
      KeyedStream keyedStream = esDataStream.keyBy(new KeySelector() {
          @Override
          public String getKey(IndexRequest indexRequest) throws Exception {
              return indexRequest.id();
          }
      });
      // 将去重后的数据写入Elasticsearch中
      ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction() {
          @Override
          public void process(IndexRequest indexRequest, RuntimeContext ctx, RequestIndexer indexer) {
              indexer.add(indexRequest);
          }
      });
      keyedStream.addSink(esSinkBuilder.build());
      

      在ElasticsearchSinkFunction中,使用RequestIndexer将数据批量写入到Elasticsearch中。需要注意的是,ElasticsearchSinkFunction的泛型类型需要与KeyedStream的泛型类型保持一致。

      以上代码中,使用KeyedStream API进行去重操作,避免重复写入相同的数据。最后,使用Elasticsearch的Bulk API将去重后的数据批量写入到Elasticsearch中。

      🌮实现实时同步:
      // 定义Mysql数据库连接信息
      String mysqlUrl = System.getProperty("mysql.url");
      String mysqlUsername = System.getProperty("mysql.username");
      String mysqlPassword = System.getProperty("mysql.password");
      // 定义查询语句
      String query = "SELECT * FROM user";
      // 定义JDBC连接器
      JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
          .setDrivername("com.mysql.jdbc.Driver")
          .setDBUrl(mysqlUrl)
          .setUsername(mysqlUsername)
          .setPassword(mysqlPassword)
          .setQuery(query)
          .setRowTypeInfo(rowTypeInfo)
          .finish();
      // 读取Mysql数据库中的数据
      DataStream mysqlDataStream = env.createInput(jdbcInputFormat);
      // 将读取到的数据转换成JSON格式
      DataStream jsonDataStream = mysqlDataStream.map(new MapFunction() {
          @Override
          public String map(Row row) throws Exception {
              JSONObject jsonObject = new JSONObject();
              jsonObject.put("id", row.getField(0));
              jsonObject.put("name", row.getField(1));
              jsonObject.put("age", row.getField(2));
              return jsonObject.toJSONString();
          }
      });
      // 将JSON格式的数据转换成Elasticsearch中的文档格式
      DataStream esDataStream = jsonDataStream.map(new MapFunction() {
          @Override
          public IndexRequest map(String json) throws Exception {
              JSONObject jsonObject = JSON.parseObject(json);
              String id = jsonObject.getString("id");
              IndexRequest indexRequest = new IndexRequest("user", "_doc", id);
              indexRequest.source(json, XContentType.JSON);
              return indexRequest;
          }
      });
      // 进行去重操作
      KeyedStream keyedStream = esDataStream.keyBy(new KeySelector() {
          @Override
          public String getKey(IndexRequest indexRequest) throws Exception {
              return indexRequest.id();
          }
      });
      // 将去重后的数据写入Elasticsearch中
      List httpHosts = new ArrayList<>();
      httpHosts.add(new HttpHost("localhost", 9200, "http"));
      ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction() {
          @Override
          public void process(IndexRequest indexRequest, RuntimeContext ctx, RequestIndexer indexer) {
              indexer.add(indexRequest);
          }
      });
      keyedStream.addSink(esSinkBuilder.build());
      // 执行Flink程序
      env.execute("Mysql to Es");
      

      🌮依赖:

      
          
              org.apache.flink
              flink-java
              1.13.2
          
          
              org.apache.flink
              flink-streaming-java_2.12
              1.13.2
          
          
              org.apache.flink
              flink-connector-jdbc_2.12
              1.13.2
          
          
              org.apache.flink
              flink-connector-elasticsearch7_2.12
              1.13.2
          
          
              com.alibaba
              fastjson
              1.2.76
          
          
              org.elasticsearch.client
              elasticsearch-rest-high-level-client
              7.15.0
          
      
      

      flink-java、flink-streaming-java_2.12、flink-connector-jdbc_2.12、flink-connector-elasticsearch7_2.12是Flink的核心依赖;fastjson是用于将数据转换成JSON格式的依赖;elasticsearch-rest-high-level-client是Elasticsearch的Java客户端依赖。

网友评论

搜索
最新文章
热门文章
热门标签