头歌的大数据作业,答案没找着,遂自己整了一份
第1关:SparkSql 数据清洗
任务描述
本关任务:将出租车轨迹数据规整化,清洗掉多余的字符串。
相关知识
为了完成本关任务,你需要掌握:1. 如何使用 SparkSQL 读取 CSV 文件,2. 如何使用正则表达式清洗掉多余字符串。
编程要求
在右侧编辑器补充代码,将出租车轨迹数据规整化,清洗掉多余的字符串,并使用 DataFrame.show() 打印输出。
# -*- coding: UTF-8 -*- from pyspark.sql import SparkSession if __name__ =='__main__': spark = SparkSession.builder.master("local").appName("demo").getOrCreate() #**********begin**********# df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data.csv") df.createTempView("data") spark.sql(""" select regexp_replace(TRIP_ID,'\\W+','') as TRIP_ID , regexp_replace(CALL_TYPE,'\\W+','') as CALL_TYPE , regexp_replace(ORIGIN_CALL,'\\W+','') as ORIGIN_CALL , regexp_replace(TAXI_ID,'\\W+','') as TAXI_ID , regexp_replace(ORIGIN_STAND,'\\W+','') as ORIGIN_STAND , regexp_replace(TIMESTAMP,'\\W+','') as TIMESTAMP , regexp_replace(POLYLINE,'\\W+','') as POLYLINE from data """).show() #**********end**********# spark.stop()
第2关:SparkSql数据分析
任务描述
本关任务:使用 SparkSQL 完成数据分析。
相关知识
为了完成本关任务,你需要掌握:如何使用 SparkSQL 进行数据分析
# -*- coding: UTF-8 -*- from pyspark.sql import SparkSession import json if __name__ == '__main__' : spark = SparkSession.builder.master("local").appName("demo").getOrCreate() #**********begin**********# df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data2.csv") df.createTempView("data") spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL, TAXI_ID, ORIGIN_STAND, from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME ,POLYLINE from data").show() spark.udf.register("timeLen", lambda x: { (len(json.loads(x)) - 1) * 15 if len(json.loads(x)) > 0 else 8 }) spark.udf.register("startLocation", lambda x: { str(json.loads(x)[0]) if len(json.loads(x)) > 0 else "" }) spark.udf.register( "endLocation", lambda x: { str(json.loads(x)[len(json.loads(x)) - 1]) if len(json.loads(x)) > 0 else "" }) df.createTempView("data2") res=spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME, POLYLINE, timeLen(POLYLINE) as TIMELEN, startLocation(POLYLINE) as STARTLOCATION, endLocation(POLYLINE) as ENDLOCATION from data2") res.createTempView("data3") res.show() spark.sql("select CALL_TYPE,TIME,count(1) as NUM from data3 group by TIME,CALL_TYPE order by CALL_TYPE,TIME").show() #**********end**********#
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章