上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【flink-sql实战】flink 主键声明与upsert功能实战

guduadmin11天前

文章目录

  • 一. flink 主键声明语法
  • 二. 物理表创建联合主键表
  • 三. flink sql使用

    一. flink 主键声明语法

    主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。

    主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。

     

    有效性检查

    SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED。 它申明了是否输入/出数据会做合法性检查(是否唯一)。

     

    Flink 不存储数据因此只支持 NOT ENFORCED 模式,即不做检查,用户需要自己保证唯一性。

    注意: 在 CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

     

    sql声明语法:

    CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
      (
        {  |  |  }[ , ...n]
        [  ]
        [  ][ , ...n]
      )
     ...
    :
      [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
    :
      [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
    ...
    

    联合主键声明

     create table t_sink_01 ( 
    f1 varchar, 
    f2 varchar,
    f3 int,
    f4 timestamp(3),
    f5 varchar, 
    primary key(f1,f2)  NOT ENFORCED  -- 主键声明,字段之间逗号分隔
    )
    with( 
    ..
    ) ;
    

     

     

    二. 物理表创建联合主键表

    CREATE TABLE test003(
         id INT(10),
         name VARCHAR(25),
         age int(10),
         PRIMARY KEY(id,name));
    desc test003
    Field|Type       |Null|Key|Default|Extra|
    -----+-----------+----+---+-------+-----+
    id   |int        |NO  |PRI|       |     |
    name |varchar(25)|NO  |PRI|       |     |
    age  |int        |YES |   |       |     |
    

     

    三. flink sql使用

    CREATE TABLE source
    (   `id` int,
     	`username` varchar,
     	`age` int
    ) WITH (
      'connector' = 'binlog-x'
          ,'username' = 'root'
          ,'password' = '11111111'
          ,'cat' = 'insert,delete,update'
          ,'url' = 'jdbc:mysql://10.17.31.234:3306/360test'
          ,'host' = '10.17.31.234'
          ,'port' = '3306'
          -- 什么都不加:最新位置消费
          -- 加文件名,从此文件开头消费
           ,'journal-name' = 'binlog.000194'
          --  ,'timestamp'='169944781200'
          ,'table' = '360test.dimension_table'
          ,'timestamp-format.standard' = 'SQL'
          );
    CREATE TABLE sink
    (   `id` int,
     	`name` varchar,
     	`age` int,
     	PRIMARY KEY (id,name) NOT ENFORCED
    ) WITH (
            'connector' = 'mysql-x',
               'url' = 'jdbc:mysql://localhost:3306/360test',
               'table-name' = 'test003',
               'username' = 'root',
               'password' = '11111111',
               'sink.buffer-flush.max-rows' = '1024', -- 批量写数据条数,默认:1024
               'sink.buffer-flush.interval' = '10000', -- 批量写时间间隔,默认:10000毫秒
               -- insert时的选项,覆盖或者忽略。
               -- 声明了主键时,设置all-replace为true,全部更新覆盖,
               -- 或者是忽略,即来的新数据不插入?
               'sink.all-replace' = 'true', -- 解释如下(其他rdb数据库类似):默认:false。定义了PRIMARY KEY才有效,否则是追加语句
                                           -- sink.all-replace = 'true' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=VALUES(`mid`), `mbb`=VALUES(`mbb`), `sid`=VALUES(`sid`), `sbb`=VALUES(`sbb`) 。会将所有的数据都替换。
                                           -- sink.all-replace = 'false' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=IFNULL(VALUES(`mid`),`mid`), `mbb`=IFNULL(VALUES(`mbb`),`mbb`), `sid`=IFNULL(VALUES(`sid`),`sid`), `sbb`=IFNULL(VALUES(`sbb`),`sbb`) 。如果新值为null,数据库中的旧值不为null,则不会覆盖。
               -- 新增写入选项:默认会判断,当声明了key则是update
               'sink.parallelism' = '1'    -- 写入结果的并行度,默认:null
          );
    insert into sink select id,username as name,age as age  from source;
    

网友评论

搜索
最新文章
热门文章
热门标签