1.数据
订单表,分别是店铺id、用户id和支付金额
"店铺id,用户id,支付金额", "shop-1,user-1,1", "shop-1,user-2,1", "shop-1,user-2,1", "shop-1,user-3,1", "shop-1,user-3,1", "shop-1,user-1,1", "shop-1,user-2,1", "shop-1,user-4,1", "shop-2,user-4,1", "shop-2,user-4,1", "shop-2,user-2,1"
2.可运行案例
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; public class Test03 { public static void main(String[] args) throws Exception { // 1. 创建流式执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2.创建表执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 3.读取数据源 SingleOutputStreamOperatorjsonStream = env .fromElements("shop-1,user-1,1", "shop-1,user-2,1", "shop-1,user-2,1", "shop-1,user-3,1", "shop-1,user-3,1", "shop-1,user-1,1", "shop-1,user-2,1", "shop-1,user-4,1", "shop-2,user-4,1", "shop-2,user-4,1", "shop-2,user-2,1" ); // 4.流转换为表 Table table = tableEnv.fromDataStream(jsonStream); // 5. 把注册为一个临时视图 tableEnv.createTemporaryView("tableTmp", table); // 6.求每个商店的用户数 Table table1 = tableEnv.sqlQuery("select shop_id,sum(num) as num,sum(gmv) as gmv from (select shop_id,user_id, 1 as num,sum(gmv) as gmv from (select SPLIT_INDEX(f0,',',0) as shop_id,SPLIT_INDEX(f0,',',1) as user_id,cast(SPLIT_INDEX(f0,',',2) as bigint) as gmv from tableTmp) t1 group by shop_id,user_id) t2 group by shop_id"); // 7.打印 tableEnv.toRetractStream(table1, Row.class).print(">>>>>>"); // 8.执行 env.execute("test"); } }
sql:
select shop_id, sum(num) as num, sum(gmv) as gmv from ( select shop_id, user_id, 1 as num, sum(gmv) as gmv from ( select SPLIT_INDEX(f0, ',', 0) as shop_id, SPLIT_INDEX(f0, ',', 1) as user_id, cast(SPLIT_INDEX(f0, ',', 2) as bigint) as gmv from tableTmp ) t1 group by shop_id, user_id ) t2 group by shop_id
3.运行结果
>>>>>>:7> (true,+U[shop-2, 2, 3])
>>>>>>:1> (true,+U[shop-1, 4, 8])
>>>>>>:7> (true,+I[shop-2, 1, 1]) >>>>>>:1> (true,+I[shop-1, 1, 1]) >>>>>>:1> (false,-U[shop-1, 1, 1]) >>>>>>:7> (false,-U[shop-2, 1, 1]) >>>>>>:1> (true,+U[shop-1, 2, 2]) >>>>>>:7> (true,+U[shop-2, 2, 2]) >>>>>>:1> (false,-U[shop-1, 2, 2]) >>>>>>:7> (false,-U[shop-2, 2, 2]) >>>>>>:1> (true,+U[shop-1, 1, 1]) >>>>>>:7> (true,+U[shop-2, 1, 1]) >>>>>>:1> (false,-U[shop-1, 1, 1]) >>>>>>:7> (false,-U[shop-2, 1, 1]) >>>>>>:7> (true,+U[shop-2, 2, 3]) >>>>>>:1> (true,+U[shop-1, 2, 3]) >>>>>>:1> (false,-U[shop-1, 2, 3]) >>>>>>:1> (true,+U[shop-1, 3, 4]) >>>>>>:1> (false,-U[shop-1, 3, 4]) >>>>>>:1> (true,+U[shop-1, 2, 3]) >>>>>>:1> (false,-U[shop-1, 2, 3]) >>>>>>:1> (true,+U[shop-1, 3, 5]) >>>>>>:1> (false,-U[shop-1, 3, 5]) >>>>>>:1> (true,+U[shop-1, 2, 3]) >>>>>>:1> (false,-U[shop-1, 2, 3]) >>>>>>:1> (true,+U[shop-1, 3, 6]) >>>>>>:1> (false,-U[shop-1, 3, 6]) >>>>>>:1> (true,+U[shop-1, 4, 7]) >>>>>>:1> (false,-U[shop-1, 4, 7]) >>>>>>:1> (true,+U[shop-1, 3, 6]) >>>>>>:1> (false,-U[shop-1, 3, 6]) >>>>>>:1> (true,+U[shop-1, 4, 8])
4.原理
Flink回撤流原理
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章