上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

spark之action算子学习笔记(scala,pyspark双语言)

guduadmin13小时前

目录

  • 一、collect
  • 二、count
  • 三、first
  • 四、take
  • 五、takeOrdered
  • 六、countByKey
  • 七、foreach
  • 八、简单案例
  • 九、一个综合案例
    • 9.1 需求1的实现
    • 9.2 需求2的实现
    • 9.3 需求3的实现

      一、collect

      函数签名:def collect(): Array[T]

      功能说明:收集每个分区数据,以数组Array的形式封装后发给driver。设置driver内存:bin/spark-submit --driver-memory 10G(内存大小)

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第1张

      注意:collect会把所有分区的数据全部拉取到driver端,如果数据量过大,可能内存溢出。

      import org.apache.spark.{SparkConf, SparkContext}
      object actons_demo {
        def main(args: Array[String]): Unit = {
          //创建sparkcontext
          val conf = new SparkConf().setAppName("test").setMaster("local[*]")
          val sc = new SparkContext(conf)
          //创建数据
          val rdd = sc.parallelize(List(2, 3, 0, 4, 1), 2)
          println(rdd.collect.toList)
        }
      }
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第2张

      图1 结果
      from pyspark import SparkConf,SparkContext
      # 创建sparkcontext
      conf = SparkConf().setAppName("test").setMaster("local[*]")
      sc = SparkContext(conf=conf)
      # 创建数据
      datas = sc.parallelize([2, 3, 0, 4, 1],numSlices=2)
      # 使用collect
      print(datas.collect())
      # 关闭sparkcontext
      sc.stop()
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第3张

      图2 结果

      二、count

      返回RDD中元素的个数

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第4张

      import org.apache.spark.{SparkConf, SparkContext}
      object actons_demo {
        def main(args: Array[String]): Unit = {
          //创建sparkcontext
          val conf = new SparkConf().setAppName("test").setMaster("local[*]")
          val sc = new SparkContext(conf)
          //创建数据
          val rdd = sc.parallelize(List(2, 3, 0, 4, 1), 2)
          println(rdd.count)
        }
      }
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第5张

      图3 结果
      from pyspark import SparkConf,SparkContext
      # 创建sparkcontext
      conf = SparkConf().setAppName("test").setMaster("local[*]")
      sc = SparkContext(conf=conf)
      # 创建数据
      datas = sc.parallelize([2, 3, 0, 4, 1],numSlices=2)
      # 使用count
      print(datas.count())
      # 关闭sparkcontext
      sc.stop()
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第6张

      图4 结果

      三、first

      返回RDD中的第一个元素

      first首先启动一个job从0号分区获取数据,如果0号分区没有数据,则启动第二个job从其他分区获取数据

      import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
      object actons_demo {
        def main(args: Array[String]): Unit = {
          //创建sparkcontext
          val conf = new SparkConf().setAppName("test").setMaster("local[*]")
          val sc = new SparkContext(conf)
          //创建数据
          val rdd = sc.parallelize(List(2, 3, 0, 4, 1), 2)
          println(rdd.first())
          //利用哈希分区器重新分区
          val rdd2 = rdd.map(x=>(x,null)).partitionBy(new HashPartitioner(10))
          println(rdd2.first())
        }
      }
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第7张

      图5 结果
      from pyspark import SparkConf, SparkContext
      # 创建sparkcontext
      conf = SparkConf().setAppName("test").setMaster("local[*]")
      sc = SparkContext(conf=conf)
      # 创建数据
      datas = sc.parallelize([2, 3, 0, 4, 1], numSlices=2)
      # 使用first
      print(datas.first())
      # 重新分区
      datas2 = datas.map(lambda x: (x, None)).partitionBy(10, lambda x: x % 10)
      # 使用first
      print(datas2.first())
      # 关闭sparkcontext
      sc.stop()
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第8张

      图6 结果

      四、take

      返回一个由RDD的前n个元素组成的数组

      首先启动一个job从0号分区获取n个数据,如果0号分区没有n个数据,则启动第二个job从其他分区继续获取剩余数据

      import org.apache.spark.{SparkConf, SparkContext}
      object actons_demo {
        def main(args: Array[String]): Unit = {
          //创建sparkcontext
          val conf = new SparkConf().setAppName("test").setMaster("local[*]")
          val sc = new SparkContext(conf)
          //创建数据
          val rdd = sc.parallelize(List(2, 3, 0, 4, 1), 2)
          println(rdd.take(2).toList)
        }
      }
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第9张

      图7 结果
      from pyspark import SparkConf, SparkContext
      # 创建sparkcontext
      conf = SparkConf().setAppName("test").setMaster("local[*]")
      sc = SparkContext(conf=conf)
      # 创建数据
      datas = sc.parallelize([2, 3, 0, 4, 1], numSlices=2)
      # 使用take
      print(datas.take(2))
      # 关闭sparkcontext
      sc.stop()
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第10张

      图8 结果

      五、takeOrdered

      返回RDD排序后的前n个元素组成的数组。全局有序排列后的结果

      import org.apache.spark.{SparkConf, SparkContext}
      object actons_demo {
        def main(args: Array[String]): Unit = {
          //创建sparkcontext
          val conf = new SparkConf().setAppName("test").setMaster("local[*]")
          val sc = new SparkContext(conf)
          //创建数据
          val rdd = sc.parallelize(List(7,2,3,4,9,0,-5,-2,8,6,5),4)
          println(rdd.takeOrdered(4).toList)
        }
      }
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第11张

      图9 结果
      from pyspark import SparkConf, SparkContext
      # 创建sparkcontext
      conf = SparkConf().setAppName("test").setMaster("local[*]")
      sc = SparkContext(conf=conf)
      # 创建数据
      datas = sc.parallelize([7,2,3,4,9,0,-5,-2,8,6,5], numSlices=4)
      # 使用takeOrdered
      print(datas.takeOrdered(4))
      # 关闭sparkcontext
      sc.stop()
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第12张

      图10 结果

      六、countByKey

      函数签名:def countByKey(): Map[K, Long]

      功能说明:统计每种key的个数

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第13张

      import org.apache.spark.{SparkConf, SparkContext}
      object actons_demo {
        def main(args: Array[String]): Unit = {
          //创建sparkcontext
          val conf = new SparkConf().setAppName("test").setMaster("local[*]")
          val sc = new SparkContext(conf)
          //创建数据a
          val rdd = sc.parallelize(List("aa"->11,("aa",5),"bb"->4,"bb"->11,("bb",9),"cc"->1))
          println(rdd.countByKey().toList)
        }
      }
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第14张

      图11 结果
      from pyspark import SparkConf, SparkContext
      # 创建sparkcontext
      conf = SparkConf().setAppName("test").setMaster("local[*]")
      sc = SparkContext(conf=conf)
      # 创建数据
      datas = sc.parallelize([("aa",11),("aa",5),("bb",4),("bb",11),("bb",9),("cc",1)])
      # 使用countByKey
      print(datas.countByKey())
      # 关闭sparkcontext
      sc.stop()
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第15张

      图12 结果

      七、foreach

      遍历RDD中每一个元素

      import org.apache.spark.{SparkConf, SparkContext}
      object actons_demo {
        def main(args: Array[String]): Unit = {
          //创建sparkcontext
          val conf = new SparkConf().setAppName("test").setMaster("local[*]")
          val sc = new SparkContext(conf)
          //创建数据
          val rdd = sc.parallelize(List("aa"->11,("aa",5),"bb"->4,"bb"->11,("bb",9),"cc"->1))
          rdd.foreach(println)
        }
      }
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第16张

      图13 结果
      from pyspark import SparkConf, SparkContext
      # 创建sparkcontext
      conf = SparkConf().setAppName("test").setMaster("local[*]")
      sc = SparkContext(conf=conf)
      # 创建数据
      datas = sc.parallelize([("aa",11),("aa",5),("bb",4),("bb",11),("bb",9),("cc",1)])
      # 使用foreach
      datas.foreach(print)
      # 关闭sparkcontext
      sc.stop()
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第17张

      图14 结果

      八、简单案例

      java spark
      hadoop spark java
      python spark
      C++ python
      

      统计word.txt中每个单词的数量,并按照数量多少从高到低排序

      import org.apache.spark.{SparkConf, SparkContext}
      object WordCount {
        def main(args: Array[String]): Unit = {
          //创建sparkcontext对象
          val conf = new SparkConf().setAppName("wordcount").setMaster("local[*]")
          val sc = new SparkContext(conf)
          //读取数据
          val datas = sc.textFile("hdfs://hadoop101:8020/input/word.txt")
          //处理
          // 实现思路:按空格切分单词,并压平 -> 处理成键值对,键是单词,值是1 -> 按照键分组统计 -> 排序
          val res1 = datas.flatMap(_.split(" ")).
            map(x => (x, 1)).
            reduceByKey(_ + _).
            sortBy(_._1, ascending = false)
          println(s"结果:${res1.collect.toList}")
        }
      }
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第18张

      图15 结果
      from pyspark import SparkConf, SparkContext
      # 创建sparkcontext
      conf = SparkConf().setAppName("test").setMaster("local[*]")
      sc = SparkContext(conf=conf)
      # 读取数据
      datas = sc.textFile("hdfs:///input/word.txt")
      # 处理
      # 实现思路:按空格切分单词,并压平 -> 处理成键值对,键是单词,值是1 -> 按照键分组统计 -> 排序
      res = datas. \
          flatMap(lambda x: x.split(" ")). \
          map(lambda x: (x, 1)). \
          reduceByKey(lambda x, y: x + y). \
          sortBy(lambda x: x[1])
      print(f"结果为:{res.collect()}")
      # 关闭sparkcontext
      sc.stop()
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第19张

      图16 结果

      九、一个综合案例

      00:00:02*****7139261067744087*****[党宁的博客]*****5*****4*****blog.sina.com.cn/u/1232113891*****党宁 的 博客
      00:00:02*****8181820631750396*****[窗外]*****5*****2*****www.zhulang.com/4212/index.html*****窗外
      00:00:02*****28556208222257873*****[2008年新兴行业]*****3*****15*****zhidao.baidu.com/question/48881311*****2008 年 新兴 行业
      00:00:02*****3648075681022645*****[耐克凉鞋]*****4*****18*****auction1.taobao.com/auction/item_detail-0db2-a4938311dd2df6585692bee8e67ce6e6.jhtml*****耐克 凉鞋
      00:00:02*****6317584696510536*****[哄抢救灾物资]*****1*****1*****news.21cn.com/social/daqian/2008/05/29/4777194_1.shtml*****哄抢 救灾物资
      

      以上数据是2008年用户在搜狗搜索上的部分搜索情况。

      数据数据格式:访问时间 用户ID 查询词 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL 查询词的分词结果。

      数据利用五个*进行分割,最后一个*****的右边是对查询词分词后的结果。查询词分词结果用空格进行分割

      需求如下:

      1. 搜索关键词的词频统计:按词频数量倒序排序
      2. 用户搜索点击统计:用户id+用户搜索内容的点击数,逆序排序
      3. 搜索时间段统计:统计不同时间范围内容搜索次数,按照小时统计

      9.1 需求1的实现

      要统计关键词的词频,需要先对用户输入的查询词进行分词处理,然后再按照分词后的结果进行词频统计。由于文本文件中的数据已经对查询词进行了分词,所以直接读取分词结果就可以。再对分词结果进行切分,统计词频,最后再倒序输出。

      import org.apache.spark.{SparkConf, SparkContext}
      object demo1 {
        def main(args: Array[String]): Unit = {
          //创建sparkcontext对象
          val conf = new SparkConf().setAppName("demo1").setMaster("local[*]")
          val sc = new SparkContext(conf)
          //读取数据
          val datas = sc.textFile("hdfs://hadoop101:8020/data/sougou/output.txt")
          //处理
          // 实现思路:
          // 1.获取查询词的分词结果,并利用空格进行切分
          // 2.对查询词分词后的结果进行分组统计
          // 3.对词频倒序排序
          val res1 = datas.map(x => x.split("\\*\\*\\*\\*\\*")).
            flatMap(x=>x.last.split(" ")).
            map(x => (x, 1)).
            reduceByKey(_ + _).
            sortBy(_._2, ascending = false)
          println(res1.take(20).toList)
        }
      }
      

      spark之action算子学习笔记(scala,pyspark双语言),在这里插入图片描述,第20张

      图17 结果
      from pyspark import SparkConf, SparkContext
      from pprint import pprint
      # 创建sparkcontext
      conf = SparkConf().setAppName("demo1").setMaster("local[*]")
      sc = SparkContext(conf=conf)
      # 读取数据
      print("读取数据...")
      datas = sc.textFile("hdfs:data/sougou/output.txt")
      print("数据读取完毕!")
      # 处理
      # 数据格式:访问时间         用户ID                查询词   该URL在返回结果中的排名  用户点击的顺序号  用户点击的URL               查询词的分词结果。
      # 样例数据:00:00:02*****28556208222257873*****[2008年新兴行业]*****3*****15*****zhidao.baidu.com/question/48881311*****2008 年 新兴 行业
      # 需要对查询词进行分词
      print("开始处理...")
      # 实现思路:
      # 1.获取查询词的分词结果,并利用空格进行切分
      # 2.对查询词分词后的结果进行分组统计
      # 3.对词频倒序排序
      datas_query = datas.map(lambda x:x.split("*****")[-1]).\
          flatMap(lambda x:x.split(" ")).map(lambda x:(x,1)).\
          reduceByKey(lambda x,y:x+y).\
          sortBy(lambda x:x[1],ascending=False)
      print("处理完毕!")
      pprint(f"前20个结果为:{datas_query.take(20)}")
      # 关闭sparkcontext
      sc.stop()
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第21张

      图18 结果

      9.2 需求2的实现

      首先拼接用户id和用户搜索内容,再对统计数量,最后再倒序输出。

      import org.apache.spark.{SparkConf, SparkContext}
      object Demo1 {
        def main(args: Array[String]): Unit = {
          //创建sparkcontext对象
          val conf = new SparkConf().setAppName("demo1").setMaster("local[*]")
          val sc = new SparkContext(conf)
          //读取数据
              val datas = sc.textFile("hdfs://hadoop101:8020/data/sougou/output.txt")
          // 实现思路:
          //  1.获取数据,并利用分隔符*****进行切分
          //  2.合并用户ID和查询词,同时增加一个计数1
          //  3.按照键进行分组统计
          //  4.对词频倒序排序
          //scala中的*号是特殊符号,需要转义
          val datas_query = datas.map(x => x.split("\\*\\*\\*\\*\\*")).
            map(x=>(x(1)+x(2),1)).
            reduceByKey(_+_).sortBy(_._2,ascending = false)
          println(datas_query.take(20).toList)
          sc.stop()
        }
      }
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第22张

      图19 结果
      from pyspark import SparkConf, SparkContext
      from pprint import pprint
      # 创建sparkcontext
      conf = SparkConf().setAppName("demo1").setMaster("local[*]")
      sc = SparkContext(conf=conf)
      # 读取数据
      print("读取数据...")
      datas = sc.textFile("hdfs:data/sougou/output.txt")
      print("数据读取完毕!")
      # 处理
      # 数据格式:访问时间         用户ID                查询词   该URL在返回结果中的排名  用户点击的顺序号  用户点击的URL               查询词的分词结果。
      # 样例数据:00:00:02*****28556208222257873*****[2008年新兴行业]*****3*****15*****zhidao.baidu.com/question/48881311*****2008 年 新兴 行业
      # 需要对查询词进行分词
      print("开始处理...")
      # 实现思路:
      # 1.获取数据,并利用分隔符*****进行切分
      # 2.合并用户ID和查询词,同时增加一个计数1
      # 3.按照键进行分组统计
      # 4.对词频倒序排序
      datas_query = datas.\
          map(lambda x:x.split("*****")).\
          map(lambda x:(x[1]+x[2],1)).\
          reduceByKey(lambda x,y:x+y).\
          sortBy(lambda x:x[1],ascending=False)
      print("处理完毕!")
      pprint(f"前20个结果为:{datas_query.take(20)}")
      # 关闭sparkcontext
      sc.stop()
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第23张

      图20 结果

      9.3 需求3的实现

      统计不同时间范围内容搜索次数,按照小时统计

      选取时间列,构造以小时为键,1为值的键值对,再对结果进行聚合统计,最后再倒序输出。

      import org.apache.spark.{SparkConf, SparkContext}
      object Demo1 {
        def main(args: Array[String]): Unit = {
          //创建sparkcontext对象
          val conf = new SparkConf().setAppName("demo1").setMaster("local[*]")
          val sc = new SparkContext(conf)
          //读取数据
              val datas = sc.textFile("hdfs://hadoop101:8020/data/sougou/output.txt")
          // 实现思路:
          //  1.获取数据,并利用分隔符*****进行切分
          //  2.对时间进行分割,以小时为键,1为值,构建键值对
          //  3.按照键进行分组统计
          //  4.对词频倒序排序
          //scala中的*号是特殊符号,需要转义
          val datas_query = datas.map(x => x.split("\\*\\*\\*\\*\\*")).
            map(x=>(x(0).split(":")(0),1)).
            reduceByKey(_+_).
            sortBy(_._2,ascending = false)
          println(datas_query.take(20).toList)
          sc.stop()
        }
      }
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第24张

      图21 结果
      from pyspark import SparkConf, SparkContext
      from pprint import pprint
      # 创建sparkcontext
      conf = SparkConf().setAppName("demo1").setMaster("local[*]")
      sc = SparkContext(conf=conf)
      # 读取数据
      print("读取数据...")
      datas = sc.textFile("hdfs:data/sougou/output.txt")
      print("数据读取完毕!")
      # 处理
      # 数据格式:访问时间         用户ID                查询词   该URL在返回结果中的排名  用户点击的顺序号  用户点击的URL               查询词的分词结果。
      # 样例数据:00:00:02*****28556208222257873*****[2008年新兴行业]*****3*****15*****zhidao.baidu.com/question/48881311*****2008 年 新兴 行业
      # 需要对查询词进行分词
      print("开始处理...")
      # 实现思路:
      # 1.获取数据,并利用分隔符*****进行切分
      # 2.对时间进行分割,以小时为键,1为值,构建键值对
      # 3.按照键进行分组统计
      # 4.对词频倒序排序
      datas_query = datas.\
          map(lambda x:x.split("*****")).\
          map(lambda x:(x[0].split(":")[0],1)).\
          reduceByKey(lambda x,y:x+y).\
          sortBy(lambda x:x[1],ascending=False)
      print("处理完毕!")
      pprint(f"前20个结果为:{datas_query.take(20)}")
      # 关闭sparkcontext
      sc.stop()
      

      spark之action算子学习笔记(scala,pyspark双语言),image.png,第25张

      图22 结果

网友评论

搜索
最新文章
热门文章
热门标签