目录
1. 简述Spark SQL与HIVE的对比
2. Spark SQL是什么?
3.代码题
需求1 先将RDD转换DataFrame,完成SparkSQL版的WordCount词频统计。DSL和SQL两种方式都要实现
4.创建Spark DataFrame的几种方式?
5. 创建得到DataFrame的方式有哪些,各自适用场景是怎么样的?
3.1 text方式读取:
3.2 CSV方式读取:
3.3 JSON读取数据:
1. 简述Spark SQL与HIVE的对比
相同点:
1.都是分布式SQL计算引擎
2.都可以处理大规模的结构化数据
3.都可以建立在YARN集群之上运行
不同点:
1. Sparksql是基于内存计算 , Hivesql底层是运行在Mr上,也就是基于磁盘进行计算
2. sparksql没有元数据管理服务, hivesql是有metastore元数据管理服务的
3. Sparksql底层执行RDD程序 , HIVEsql底层执行MapReduce
4. Sparksql可以编写sql也可以编写代码, HIVEsql只能编写sql
2. Spark SQL是什么?
SparkSQL是建立在Spark上的一个工具模块,用于处理结构化的数据
3.代码题
需求1 先将RDD转换DataFrame,完成SparkSQL版的WordCount词频统计。DSL和SQL两种方式都要实现
测试数据: hello spark hadoop hive oozie sqoop hello hive hadoop java java python hadoop hive hadoop
import os from pyspark import SparkConf, SparkContext from pyspark.sql import SparkSession import pyspark.sql.functions as F # 绑定指定的Python解释器 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' # 绑定指定的Python解释器 from pyspark.sql.types import StructType, IntegerType, StringType, StructField if __name__ == '__main__': # 1- 创建SparkSession对象 spark = SparkSession.builder\ .appName('RDD转DataFrame')\ .master('local[*]')\ .getOrCreate() # 通过SparkSession得到SparkContext sc = spark.sparkContext # 2- 数据输入 # 2.1- 创建一个RDD init_rdd = sc.textFile('hdfs://node1:8020/input/words.txt') # 2.2- 将RDD的数据结构转换成二维结构 new_rdd = init_rdd.flatMap(lambda line:line.split()).map(lambda line:(line,1)) print(new_rdd.collect()) ''' [('hadoop', 1), ('hive', 1), ('hadoop', 1), ('sqoop', 1), ('hive', 1), ('sqoop', 1), ('hadoop', 1), ('zookeeper', 1), ('hive', 1), ('hue', 1), ('hue', 1), ('sqoop', 1), ('hue', 1), ('zookeeper', 1), ('hive', 1), ('spark', 1), ('oozie', 1), ('spark', 1), ('hadoop', 1), ('oozie', 1), ('hive', 1), ('oozie', 1), ('spark', 1), ('hadoop', 1)] ''' init_df = new_rdd.toDF(schema=['word','cnt']) ''' +---------+---+ | word|cnt| +---------+---+ | hadoop| 1| | hive| 1| | hadoop| 1| | sqoop| 1| | hive| 1| | sqoop| 1| | hadoop| 1| |zookeeper| 1| | hive| 1| | hue| 1| | hue| 1| | sqoop| 1| | hue| 1| |zookeeper| 1| | hive| 1| | spark| 1| | oozie| 1| | spark| 1| | hadoop| 1| | oozie| 1| +---------+---+ only showing top 20 rows root |-- word: string (nullable = true) |-- cnt: long (nullable = true) ''' init_df.show() init_df.printSchema() print('sql方法进行词频统计') # 创建临时视图 init_df.createTempView('word_table') spark.sql(""" select word,count(1) as cnt from word_table group by word order by word desc """).show() print('DSL方法进行词频统计') init_df.select( 'word','cnt' ).groupby('word').agg( F.count('word').alias('count_col') ).withColumnRenamed('word','bigdata_col')\ .orderBy('count_col',ascending=False).show() # 5- 释放资源 spark.stop() sc.stop()
4.创建Spark DataFrame的几种方式?
1 . 通过RDD得到DataFrame
2. 内部初始化数据得到DataFrame
3. 读取外部文件得到DataFrame
5. 创建得到DataFrame的方式有哪些,各自适用场景是怎么样的?
1 . RDD转DataFrame , 场景 : RDD可以存储任意结构的数据类型,而DataFrame只能存储二维表结构化数据, 在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者非结构化的,那么可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发高效率的SparkSQL进行后续数据处理;
2. 内部初始化数据得到DataFrame , 通过createDataFrame创建DataFrame , 一般用在开发和测试中.因为只能处理少量的数据
3. 读取外部文件得到DataFrame , Text方式\CSV方式\JSON方式 ;
3.1 text方式读取:
不管文件内容如何,会将所有内容放到一个列中;
默认生成的列名叫做value,数据类型String;并且只能修改value的名称,其他内容无法修改;
3.2 CSV方式读取:
常设置的参数
path:指定文件路径,本地或者hdfs
schema:手动指定元数据信息
sep:指定字段间的分隔符
encoding:指定文件的编码方式
header:指定文件中的第一行是否是字段名称
inferSchema:根据数据内容自动推断数据类型。但是,推断结果可能不精确
3.3 JSON读取数据:
需要手动指定schema信息.如果手动指定的时候,名称字段与json中的key名称不一致,会解析不成功, 以null值填充
csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章