上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Flink电商实时数仓(四)

guduadmin13小时前

日志数据结构

业务数据:数据都是MySQL中的表格数据, 使用Flink SQL 处理

日志数据:分为page页面日志(页面信息,曝光信息,动作信息,报错信息)和启动日志(启动信息,报错信息),使用Flink Stream API处理

五种日志数据:

  • “start”; 启动信息
  • “err”; 错误信息
  • “display”; 曝光信息
  • “action”; 动作信息
  • “page”; 页面信息
    "actions": [
            {
                "action_id": "cart_add",
                "item": "3",
                "item_type": "sku_id",
                "ts": 1645926900000
            }
        ],
        "common": {
            "ar": "4",
            "ba": "xiaomi",
            "ch": "xiaomi",
            "is_new": "0",
            "md": "xiaomi 12 ultra ",
            "mid": "mid_409",
            "os": "Android 13.0",
            "sid": "1879f0a8-2218-48ce-aa2d-efb32f2f6b7a",
            "uid": "57",
            "vc": "v2.1.134"
        },
        "page": {
            "during_time": 14499,
            "from_pos_id": 10,
            "from_pos_seq": 11,
            "item": "3",
            "item_type": "sku_id",
            "last_page_id": "good_list",
            "page_id": "good_detail"
        },
        "ts": 1645926900000
        "displays": [
            {
                "item": "31",
                "item_type": "sku_id",
                "pos_id": 4,
                "pos_seq": 0
            },
            {
                "item": "1",
                "item_type": "sku_id",
                "pos_id": 4,
                "pos_seq": 1
            },
            {
                "item": "3",
                "item_type": "sku_id",
                "pos_id": 4,
                "pos_seq": 2
            },
            {
                "item": "30",
                "item_type": "sku_id",
                "pos_id": 4,
                "pos_seq": 3
            },
            {
                "item": "33",
                "item_type": "sku_id",
                "pos_id": 4,
                "pos_seq": 4
            },
            {
                "item": "30",
                "item_type": "sku_id",
                "pos_id": 4,
                "pos_seq": 5
            },
            {
                "item": "15",
                "item_type": "sku_id",
                "pos_id": 4,
                "pos_seq": 6
            },
            {
                "item": "11",
                "item_type": "sku_id",
                "pos_id": 4,
                "pos_seq": 7
            }
        ],
    "start": {
            "entry": "icon",
            "loading_time": 1234,
            "open_ad_id": 10,
            "open_ad_ms": 6169,
            "open_ad_skip_ms": 56163
        },
    

    日志数据处理主要逻辑

    1. 数据清洗
    2. 新老访客状态标记修复
      • 埋点策略
        • 网页端:cookie中设置首日访问标记
        • 小程序端:小程序的缓存里面创建首日标记
        • APP端:手机的本地缓存
        • 如果客户删除本地缓存或cookie时,导致老客户可以这样变更为新客户。
        • 因此需要将访客是否新客户的标记存储到服务端,记录下首日访问的日志,当客户访问时就只需查询该客户对应的状态即可。
          • 如果is_new的值为1
            • 状态为null, 正常
            • 状态不为null,但是第一次登录日期不等于今天,修复为老客户
            • 如果is_new的值为0
              • 状态为null, 将状态的日期设置为昨天
              • 状态不为null, 正常的
    //1. 获取当前数据的is_new字段
       JSONObject common = value.getJSONObject("common");
          String isNew = common.getString("is_new");
          String firstLoginDt = firstLoginDtState.value();
          Long ts = value.getLong("ts");
          String curDt = DateFormatUtil.tsToDate(ts);
          if("1".equals(isNew)){
              //判断当前状态
              if(firstLoginDt !=null && !firstLoginDt.equals(curDt)){
              }else if(firstLoginDt == null){
                  //状态为空,确实为新用户,更新登录时间状态
                  firstLoginDtState.update(curDt);
              }else{//留空,同一天新访客重复登录
              }
          }else if("0".equals(isNew)){
              //老用户,flink实时数仓没有记录相应的登录时间
              if(firstLoginDt == null){
                  //把访客登录日期补充一个值,使用昨天的日期
                  firstLoginDtState.update(DateFormatUtil.tsToDate(ts - 24*60*60*1000L));
              }else{
                  //留空
                  //正常情况,不需要修复
              }
          }else{
              //当前数据is_new不为0也不为1,错误数据
          }
    
    1. 分流拆分数据,按照信息类型进行拆分。使用process的侧输出流来对不同数据打上对应的标签。主流中存储最常见的页面信息,其余分别是曝光信息,动作信息,启动信息,报错信息都使用侧输出流。
    2. 写出到不同的kafka主题中
    3. 测试消费不同主题的数据,查看是否写入成功和检查数据格式

    常见报错及解决思路

    1. 出现空指针异常,并且报错为Could not forward element to next operator,主要是以下两个错误原因:
      • 数据流中没有ts时间戳字段java.lang.NullPointerException,
      • 数据流中没有添加水位线Assigned key must not be null!,用来进行keyby的字段不能为空

网友评论

搜索
最新文章
热门文章
热门标签