SPARK–RDD
1、RDD的介绍
- RDD 弹性分布式数据集合
- 是Spark中的一种数据类型,管理spark的内存数据 [1,2,3,4]
- spark中还有dataframe,dataset类型
- 拓展:开发中可以通过类的形式自定以数据类型
- 同时还提供各种计算方法
- 弹性
- 可以对海量数据根据需求分成多份(分区),每一份数据会有对应的task线程执行计算
- [1,2,3,4,5,6]
- [[1,2],[3,4],[5,6]]
- 分布式
- 利用集群中多台机器资源进行计算
- 数据集合
- 规定数据形式 类似Python中的列表 []
2、RDD的特性
- 分区
- 可以将计算的海量数据分成多份,需要分成多少可分区可以通过方法指定
- 每个分区都可以对应一个task线程执行计算
- 只读
- rdd中的数据不能直接修改,需要通过方法计算后得到一个新的rdd
- rdd本身存储的数只能读取
- 依赖
- rdd之间是有依赖关系的
- 新的rdd是通过旧的rdd计算得到
- 缓存
- 可以将计算的中结果缓存起来,如果后续计算错误时,可以从缓存位置重新计算
- 将数据存储在内存或本地磁盘
- 作用是容错
- 缓存在执行计算任务程序结束后会释放删除
- checkpoint
- 作用和缓存一样
- checkpoint可以将数据存储在分布式存储系统中,比如hdfs
3、创建RDD数据
将需要计算的数据转为rdd的数据,就可以利用spark的内存计算方法进行分布式计算操作,这些计算方法就是有rdd提供的
rdd数据的转发方法是有sparkcontext提供的,所以需要先生成sparkcontext,
SparkContext称为Spark的入口类
3.1、Python数据转化为rdd
只要是能被遍历的,都能转化为RDD数据
# 导入sparkcontext from pyspark import SparkContext # 创建SparkContext对象 sc = SparkContext() # 将Python数据转为rdd # data_int = 10 # 数值类型不能转化rdd # 能for循环遍历的数据都能转为rdd data_str = 'abc' data_list = [1, 2, 3, 4] data_dict = {'a': 1, 'b': 2} data_set = {1, 2, 3, 4} data_tuple = (1, 2, 3, 4) rdd = sc.parallelize(data_tuple) # rdd的数据输出展示 # 获取所有rdd数据 res = rdd.collect() print(res)
3.2、文件数据(hdfs)转化为rdd
8020是namenode默认的端口号
# 将读取的hdfs文件数据转为rdd from pyspark import SparkContext # 生成SparkContext类对象 sc = SparkContext() # 读取文件数据转为rdd rdd1 = sc.textFile('hdfs://node1:8020/data') # 8020是namenode端口号? # 读取目录下的所有文件 简写如果报错就写全写,也就是上面的内容 rdd3 = sc.textFile('/data') # 只读取单独文件 rdd2 = sc.textFile('/data/words.txt') # 查看数据 res = rdd1.collect() print(res) res = rdd2.collect() print(res)
3.3、rdd的分区
python数据转发的分区数指定 # RDD分区使用 # 导入sparkcontext from pyspark import SparkContext # 创建SparkContext对象 sc = SparkContext() # 创建生成rdd是可以指定分区数 # Python数据转为rdd指定 # numSlices 可以指定分区数 rdd_py = sc.parallelize([1,2,3,4,5,6],numSlices=10) # 查看rdd分区数据 res1 = rdd_py.glom().collect() print(res1)
读取的文件数据进行分区数指定 # RDD分区使用 # 导入sparkcontext from pyspark import SparkContext # 创建SparkContext对象 sc = SparkContext() # 创建生成rdd是可以指定分区数 # file文件读取数据指定分区数据 # minPartitions 指定分区 # 文件大小/分区数 = 值 -----余数 # 余数/值 * 100%=百分比 百分比大于10% 会多创建一个分区 rdd_file = sc.textFile('hdfs://node1:8020/data',minPartitions=1) # 在spark并行度部分会讲解如何根据资源设置分区数 # rdd计算 # 查看rdd分区数据 res2 = rdd_file.glom().collect() print(res2)
3.3.3、小文件数据读取
一个分区对应一个task线程,当小文件过多时,会占用大量的线程,造成资源浪费
使用wholeTextFiles方法可以解决
该方法会现将读取到的数据合并在一起,然后重新进行分区
# 导入sparkcontext from pyspark import SparkContext # 创建SparkContext对象 sc = SparkContext(master='yarn') # rdd = sc.textFile('hdfs://node1:8020/data') # rdd计算 # wholeTextFiles 会合并小文件数据 # minPartitions 指定分区数 rdd_mini = sc.wholeTextFiles('hdfs://node1:8020/data',minPartitions=1) # 展示数据 # res1 = rdd.glom().collect() # print(res1) res2 = rdd_mini.glom().collect() print(res2)
4、常用RDD算子
4.1、 算子(方法)介绍
rdd中封装了各种算子方便进行计算,主要分为两类
- transformation
- 转化算子 对rdd数据进行转化计算得到新的rdd ,定义了一个线程任务
- action
- 执行算子 触发计算任务,让计算任务进行执行,得到结果
- 触发线程执行的
4.2、常用transformation算子
- map
- rdd.map(lambda 参数:参数计算)
- 参数接受每个元素数据
- flatMap
- 处理的是二维嵌套列表数据 [[1,2,3],[4,5,6],[7,8,9]]
- rdd.flatMap(lambda 参数:[参数计算])
- fliter
- rdd.filter(lambda 参数:参数条件过滤)
- 条件过滤的书写和Python中if判断一样
- map
- transformation
- 分区
- 规定数据形式 类似Python中的列表 []
- 是Spark中的一种数据类型,管理spark的内存数据 [1,2,3,4]
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章