日志数据结构
业务数据:数据都是MySQL中的表格数据, 使用Flink SQL 处理
日志数据:分为page页面日志(页面信息,曝光信息,动作信息,报错信息)和启动日志(启动信息,报错信息),使用Flink Stream API处理
五种日志数据:
- “start”; 启动信息
- “err”; 错误信息
- “display”; 曝光信息
- “action”; 动作信息
- “page”; 页面信息
"actions": [ { "action_id": "cart_add", "item": "3", "item_type": "sku_id", "ts": 1645926900000 } ], "common": { "ar": "4", "ba": "xiaomi", "ch": "xiaomi", "is_new": "0", "md": "xiaomi 12 ultra ", "mid": "mid_409", "os": "Android 13.0", "sid": "1879f0a8-2218-48ce-aa2d-efb32f2f6b7a", "uid": "57", "vc": "v2.1.134" }, "page": { "during_time": 14499, "from_pos_id": 10, "from_pos_seq": 11, "item": "3", "item_type": "sku_id", "last_page_id": "good_list", "page_id": "good_detail" }, "ts": 1645926900000 "displays": [ { "item": "31", "item_type": "sku_id", "pos_id": 4, "pos_seq": 0 }, { "item": "1", "item_type": "sku_id", "pos_id": 4, "pos_seq": 1 }, { "item": "3", "item_type": "sku_id", "pos_id": 4, "pos_seq": 2 }, { "item": "30", "item_type": "sku_id", "pos_id": 4, "pos_seq": 3 }, { "item": "33", "item_type": "sku_id", "pos_id": 4, "pos_seq": 4 }, { "item": "30", "item_type": "sku_id", "pos_id": 4, "pos_seq": 5 }, { "item": "15", "item_type": "sku_id", "pos_id": 4, "pos_seq": 6 }, { "item": "11", "item_type": "sku_id", "pos_id": 4, "pos_seq": 7 } ], "start": { "entry": "icon", "loading_time": 1234, "open_ad_id": 10, "open_ad_ms": 6169, "open_ad_skip_ms": 56163 },
日志数据处理主要逻辑
- 数据清洗
- 新老访客状态标记修复
- 埋点策略
- 网页端:cookie中设置首日访问标记
- 小程序端:小程序的缓存里面创建首日标记
- APP端:手机的本地缓存
- 如果客户删除本地缓存或cookie时,导致老客户可以这样变更为新客户。
- 因此需要将访客是否新客户的标记存储到服务端,记录下首日访问的日志,当客户访问时就只需查询该客户对应的状态即可。
- 如果is_new的值为1
- 状态为null, 正常
- 状态不为null,但是第一次登录日期不等于今天,修复为老客户
- 如果is_new的值为0
- 状态为null, 将状态的日期设置为昨天
- 状态不为null, 正常的
- 如果is_new的值为1
- 埋点策略
//1. 获取当前数据的is_new字段 JSONObject common = value.getJSONObject("common"); String isNew = common.getString("is_new"); String firstLoginDt = firstLoginDtState.value(); Long ts = value.getLong("ts"); String curDt = DateFormatUtil.tsToDate(ts); if("1".equals(isNew)){ //判断当前状态 if(firstLoginDt !=null && !firstLoginDt.equals(curDt)){ }else if(firstLoginDt == null){ //状态为空,确实为新用户,更新登录时间状态 firstLoginDtState.update(curDt); }else{//留空,同一天新访客重复登录 } }else if("0".equals(isNew)){ //老用户,flink实时数仓没有记录相应的登录时间 if(firstLoginDt == null){ //把访客登录日期补充一个值,使用昨天的日期 firstLoginDtState.update(DateFormatUtil.tsToDate(ts - 24*60*60*1000L)); }else{ //留空 //正常情况,不需要修复 } }else{ //当前数据is_new不为0也不为1,错误数据 }
- 分流拆分数据,按照信息类型进行拆分。使用process的侧输出流来对不同数据打上对应的标签。主流中存储最常见的页面信息,其余分别是曝光信息,动作信息,启动信息,报错信息都使用侧输出流。
- 写出到不同的kafka主题中
- 测试消费不同主题的数据,查看是否写入成功和检查数据格式
常见报错及解决思路
- 出现空指针异常,并且报错为Could not forward element to next operator,主要是以下两个错误原因:
- 数据流中没有ts时间戳字段java.lang.NullPointerException,
- 数据流中没有添加水位线Assigned key must not be null!,用来进行keyby的字段不能为空
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章