目录
- 一、collect
- 二、count
- 三、first
- 四、take
- 五、takeOrdered
- 六、countByKey
- 七、foreach
- 八、简单案例
- 九、一个综合案例
- 9.1 需求1的实现
- 9.2 需求2的实现
- 9.3 需求3的实现
一、collect
函数签名:def collect(): Array[T]
功能说明:收集每个分区数据,以数组Array的形式封装后发给driver。设置driver内存:bin/spark-submit --driver-memory 10G(内存大小)
注意:collect会把所有分区的数据全部拉取到driver端,如果数据量过大,可能内存溢出。
import org.apache.spark.{SparkConf, SparkContext} object actons_demo { def main(args: Array[String]): Unit = { //创建sparkcontext val conf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) //创建数据 val rdd = sc.parallelize(List(2, 3, 0, 4, 1), 2) println(rdd.collect.toList) } }
图1 结果 from pyspark import SparkConf,SparkContext # 创建sparkcontext conf = SparkConf().setAppName("test").setMaster("local[*]") sc = SparkContext(conf=conf) # 创建数据 datas = sc.parallelize([2, 3, 0, 4, 1],numSlices=2) # 使用collect print(datas.collect()) # 关闭sparkcontext sc.stop()
图2 结果 二、count
返回RDD中元素的个数
import org.apache.spark.{SparkConf, SparkContext} object actons_demo { def main(args: Array[String]): Unit = { //创建sparkcontext val conf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) //创建数据 val rdd = sc.parallelize(List(2, 3, 0, 4, 1), 2) println(rdd.count) } }
图3 结果 from pyspark import SparkConf,SparkContext # 创建sparkcontext conf = SparkConf().setAppName("test").setMaster("local[*]") sc = SparkContext(conf=conf) # 创建数据 datas = sc.parallelize([2, 3, 0, 4, 1],numSlices=2) # 使用count print(datas.count()) # 关闭sparkcontext sc.stop()
图4 结果 三、first
返回RDD中的第一个元素
first首先启动一个job从0号分区获取数据,如果0号分区没有数据,则启动第二个job从其他分区获取数据
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} object actons_demo { def main(args: Array[String]): Unit = { //创建sparkcontext val conf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) //创建数据 val rdd = sc.parallelize(List(2, 3, 0, 4, 1), 2) println(rdd.first()) //利用哈希分区器重新分区 val rdd2 = rdd.map(x=>(x,null)).partitionBy(new HashPartitioner(10)) println(rdd2.first()) } }
图5 结果 from pyspark import SparkConf, SparkContext # 创建sparkcontext conf = SparkConf().setAppName("test").setMaster("local[*]") sc = SparkContext(conf=conf) # 创建数据 datas = sc.parallelize([2, 3, 0, 4, 1], numSlices=2) # 使用first print(datas.first()) # 重新分区 datas2 = datas.map(lambda x: (x, None)).partitionBy(10, lambda x: x % 10) # 使用first print(datas2.first()) # 关闭sparkcontext sc.stop()
图6 结果 四、take
返回一个由RDD的前n个元素组成的数组
首先启动一个job从0号分区获取n个数据,如果0号分区没有n个数据,则启动第二个job从其他分区继续获取剩余数据
import org.apache.spark.{SparkConf, SparkContext} object actons_demo { def main(args: Array[String]): Unit = { //创建sparkcontext val conf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) //创建数据 val rdd = sc.parallelize(List(2, 3, 0, 4, 1), 2) println(rdd.take(2).toList) } }
图7 结果 from pyspark import SparkConf, SparkContext # 创建sparkcontext conf = SparkConf().setAppName("test").setMaster("local[*]") sc = SparkContext(conf=conf) # 创建数据 datas = sc.parallelize([2, 3, 0, 4, 1], numSlices=2) # 使用take print(datas.take(2)) # 关闭sparkcontext sc.stop()
图8 结果 五、takeOrdered
返回RDD排序后的前n个元素组成的数组。全局有序排列后的结果
import org.apache.spark.{SparkConf, SparkContext} object actons_demo { def main(args: Array[String]): Unit = { //创建sparkcontext val conf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) //创建数据 val rdd = sc.parallelize(List(7,2,3,4,9,0,-5,-2,8,6,5),4) println(rdd.takeOrdered(4).toList) } }
图9 结果 from pyspark import SparkConf, SparkContext # 创建sparkcontext conf = SparkConf().setAppName("test").setMaster("local[*]") sc = SparkContext(conf=conf) # 创建数据 datas = sc.parallelize([7,2,3,4,9,0,-5,-2,8,6,5], numSlices=4) # 使用takeOrdered print(datas.takeOrdered(4)) # 关闭sparkcontext sc.stop()
图10 结果 六、countByKey
函数签名:def countByKey(): Map[K, Long]
功能说明:统计每种key的个数
import org.apache.spark.{SparkConf, SparkContext} object actons_demo { def main(args: Array[String]): Unit = { //创建sparkcontext val conf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) //创建数据a val rdd = sc.parallelize(List("aa"->11,("aa",5),"bb"->4,"bb"->11,("bb",9),"cc"->1)) println(rdd.countByKey().toList) } }
图11 结果 from pyspark import SparkConf, SparkContext # 创建sparkcontext conf = SparkConf().setAppName("test").setMaster("local[*]") sc = SparkContext(conf=conf) # 创建数据 datas = sc.parallelize([("aa",11),("aa",5),("bb",4),("bb",11),("bb",9),("cc",1)]) # 使用countByKey print(datas.countByKey()) # 关闭sparkcontext sc.stop()
图12 结果 七、foreach
遍历RDD中每一个元素
import org.apache.spark.{SparkConf, SparkContext} object actons_demo { def main(args: Array[String]): Unit = { //创建sparkcontext val conf = new SparkConf().setAppName("test").setMaster("local[*]") val sc = new SparkContext(conf) //创建数据 val rdd = sc.parallelize(List("aa"->11,("aa",5),"bb"->4,"bb"->11,("bb",9),"cc"->1)) rdd.foreach(println) } }
图13 结果 from pyspark import SparkConf, SparkContext # 创建sparkcontext conf = SparkConf().setAppName("test").setMaster("local[*]") sc = SparkContext(conf=conf) # 创建数据 datas = sc.parallelize([("aa",11),("aa",5),("bb",4),("bb",11),("bb",9),("cc",1)]) # 使用foreach datas.foreach(print) # 关闭sparkcontext sc.stop()
图14 结果 八、简单案例
java spark hadoop spark java python spark C++ python
统计word.txt中每个单词的数量,并按照数量多少从高到低排序
import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { //创建sparkcontext对象 val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]") val sc = new SparkContext(conf) //读取数据 val datas = sc.textFile("hdfs://hadoop101:8020/input/word.txt") //处理 // 实现思路:按空格切分单词,并压平 -> 处理成键值对,键是单词,值是1 -> 按照键分组统计 -> 排序 val res1 = datas.flatMap(_.split(" ")). map(x => (x, 1)). reduceByKey(_ + _). sortBy(_._1, ascending = false) println(s"结果:${res1.collect.toList}") } }
图15 结果 from pyspark import SparkConf, SparkContext # 创建sparkcontext conf = SparkConf().setAppName("test").setMaster("local[*]") sc = SparkContext(conf=conf) # 读取数据 datas = sc.textFile("hdfs:///input/word.txt") # 处理 # 实现思路:按空格切分单词,并压平 -> 处理成键值对,键是单词,值是1 -> 按照键分组统计 -> 排序 res = datas. \ flatMap(lambda x: x.split(" ")). \ map(lambda x: (x, 1)). \ reduceByKey(lambda x, y: x + y). \ sortBy(lambda x: x[1]) print(f"结果为:{res.collect()}") # 关闭sparkcontext sc.stop()
图16 结果 九、一个综合案例
00:00:02*****7139261067744087*****[党宁的博客]*****5*****4*****blog.sina.com.cn/u/1232113891*****党宁 的 博客 00:00:02*****8181820631750396*****[窗外]*****5*****2*****www.zhulang.com/4212/index.html*****窗外 00:00:02*****28556208222257873*****[2008年新兴行业]*****3*****15*****zhidao.baidu.com/question/48881311*****2008 年 新兴 行业 00:00:02*****3648075681022645*****[耐克凉鞋]*****4*****18*****auction1.taobao.com/auction/item_detail-0db2-a4938311dd2df6585692bee8e67ce6e6.jhtml*****耐克 凉鞋 00:00:02*****6317584696510536*****[哄抢救灾物资]*****1*****1*****news.21cn.com/social/daqian/2008/05/29/4777194_1.shtml*****哄抢 救灾物资
以上数据是2008年用户在搜狗搜索上的部分搜索情况。
数据数据格式:访问时间 用户ID 查询词 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL 查询词的分词结果。
数据利用五个*进行分割,最后一个*****的右边是对查询词分词后的结果。查询词分词结果用空格进行分割
需求如下:
- 搜索关键词的词频统计:按词频数量倒序排序
- 用户搜索点击统计:用户id+用户搜索内容的点击数,逆序排序
- 搜索时间段统计:统计不同时间范围内容搜索次数,按照小时统计
9.1 需求1的实现
要统计关键词的词频,需要先对用户输入的查询词进行分词处理,然后再按照分词后的结果进行词频统计。由于文本文件中的数据已经对查询词进行了分词,所以直接读取分词结果就可以。再对分词结果进行切分,统计词频,最后再倒序输出。
import org.apache.spark.{SparkConf, SparkContext} object demo1 { def main(args: Array[String]): Unit = { //创建sparkcontext对象 val conf = new SparkConf().setAppName("demo1").setMaster("local[*]") val sc = new SparkContext(conf) //读取数据 val datas = sc.textFile("hdfs://hadoop101:8020/data/sougou/output.txt") //处理 // 实现思路: // 1.获取查询词的分词结果,并利用空格进行切分 // 2.对查询词分词后的结果进行分组统计 // 3.对词频倒序排序 val res1 = datas.map(x => x.split("\\*\\*\\*\\*\\*")). flatMap(x=>x.last.split(" ")). map(x => (x, 1)). reduceByKey(_ + _). sortBy(_._2, ascending = false) println(res1.take(20).toList) } }
图17 结果 from pyspark import SparkConf, SparkContext from pprint import pprint # 创建sparkcontext conf = SparkConf().setAppName("demo1").setMaster("local[*]") sc = SparkContext(conf=conf) # 读取数据 print("读取数据...") datas = sc.textFile("hdfs:data/sougou/output.txt") print("数据读取完毕!") # 处理 # 数据格式:访问时间 用户ID 查询词 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL 查询词的分词结果。 # 样例数据:00:00:02*****28556208222257873*****[2008年新兴行业]*****3*****15*****zhidao.baidu.com/question/48881311*****2008 年 新兴 行业 # 需要对查询词进行分词 print("开始处理...") # 实现思路: # 1.获取查询词的分词结果,并利用空格进行切分 # 2.对查询词分词后的结果进行分组统计 # 3.对词频倒序排序 datas_query = datas.map(lambda x:x.split("*****")[-1]).\ flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).\ reduceByKey(lambda x,y:x+y).\ sortBy(lambda x:x[1],ascending=False) print("处理完毕!") pprint(f"前20个结果为:{datas_query.take(20)}") # 关闭sparkcontext sc.stop()
图18 结果 9.2 需求2的实现
首先拼接用户id和用户搜索内容,再对统计数量,最后再倒序输出。
import org.apache.spark.{SparkConf, SparkContext} object Demo1 { def main(args: Array[String]): Unit = { //创建sparkcontext对象 val conf = new SparkConf().setAppName("demo1").setMaster("local[*]") val sc = new SparkContext(conf) //读取数据 val datas = sc.textFile("hdfs://hadoop101:8020/data/sougou/output.txt") // 实现思路: // 1.获取数据,并利用分隔符*****进行切分 // 2.合并用户ID和查询词,同时增加一个计数1 // 3.按照键进行分组统计 // 4.对词频倒序排序 //scala中的*号是特殊符号,需要转义 val datas_query = datas.map(x => x.split("\\*\\*\\*\\*\\*")). map(x=>(x(1)+x(2),1)). reduceByKey(_+_).sortBy(_._2,ascending = false) println(datas_query.take(20).toList) sc.stop() } }
图19 结果 from pyspark import SparkConf, SparkContext from pprint import pprint # 创建sparkcontext conf = SparkConf().setAppName("demo1").setMaster("local[*]") sc = SparkContext(conf=conf) # 读取数据 print("读取数据...") datas = sc.textFile("hdfs:data/sougou/output.txt") print("数据读取完毕!") # 处理 # 数据格式:访问时间 用户ID 查询词 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL 查询词的分词结果。 # 样例数据:00:00:02*****28556208222257873*****[2008年新兴行业]*****3*****15*****zhidao.baidu.com/question/48881311*****2008 年 新兴 行业 # 需要对查询词进行分词 print("开始处理...") # 实现思路: # 1.获取数据,并利用分隔符*****进行切分 # 2.合并用户ID和查询词,同时增加一个计数1 # 3.按照键进行分组统计 # 4.对词频倒序排序 datas_query = datas.\ map(lambda x:x.split("*****")).\ map(lambda x:(x[1]+x[2],1)).\ reduceByKey(lambda x,y:x+y).\ sortBy(lambda x:x[1],ascending=False) print("处理完毕!") pprint(f"前20个结果为:{datas_query.take(20)}") # 关闭sparkcontext sc.stop()
图20 结果 9.3 需求3的实现
统计不同时间范围内容搜索次数,按照小时统计
选取时间列,构造以小时为键,1为值的键值对,再对结果进行聚合统计,最后再倒序输出。
import org.apache.spark.{SparkConf, SparkContext} object Demo1 { def main(args: Array[String]): Unit = { //创建sparkcontext对象 val conf = new SparkConf().setAppName("demo1").setMaster("local[*]") val sc = new SparkContext(conf) //读取数据 val datas = sc.textFile("hdfs://hadoop101:8020/data/sougou/output.txt") // 实现思路: // 1.获取数据,并利用分隔符*****进行切分 // 2.对时间进行分割,以小时为键,1为值,构建键值对 // 3.按照键进行分组统计 // 4.对词频倒序排序 //scala中的*号是特殊符号,需要转义 val datas_query = datas.map(x => x.split("\\*\\*\\*\\*\\*")). map(x=>(x(0).split(":")(0),1)). reduceByKey(_+_). sortBy(_._2,ascending = false) println(datas_query.take(20).toList) sc.stop() } }
图21 结果 from pyspark import SparkConf, SparkContext from pprint import pprint # 创建sparkcontext conf = SparkConf().setAppName("demo1").setMaster("local[*]") sc = SparkContext(conf=conf) # 读取数据 print("读取数据...") datas = sc.textFile("hdfs:data/sougou/output.txt") print("数据读取完毕!") # 处理 # 数据格式:访问时间 用户ID 查询词 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL 查询词的分词结果。 # 样例数据:00:00:02*****28556208222257873*****[2008年新兴行业]*****3*****15*****zhidao.baidu.com/question/48881311*****2008 年 新兴 行业 # 需要对查询词进行分词 print("开始处理...") # 实现思路: # 1.获取数据,并利用分隔符*****进行切分 # 2.对时间进行分割,以小时为键,1为值,构建键值对 # 3.按照键进行分组统计 # 4.对词频倒序排序 datas_query = datas.\ map(lambda x:x.split("*****")).\ map(lambda x:(x[0].split(":")[0],1)).\ reduceByKey(lambda x,y:x+y).\ sortBy(lambda x:x[1],ascending=False) print("处理完毕!") pprint(f"前20个结果为:{datas_query.take(20)}") # 关闭sparkcontext sc.stop()
图22 结果
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章