上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

SpringBoot 使用 Spark

guduadmin13小时前

文章目录

  • 读取 txt 文件
  • 读取 csv 文件
  • 读取 MySQL 数据库表
  • 读取 Json 文件
  • 中文输出乱码

    前提: 可以参考文章 SpringBoot 接入 Spark

    1. SpringBoot 已经接入 Spark
    2. 已配置 JavaSparkContext
    3. 已配置 SparkSession
    @Resource
    private SparkSession sparkSession;
    @Resource
    private JavaSparkContext javaSparkContext;
    

     

    读取 txt 文件

    测试文件 word.txt

    SpringBoot 使用 Spark,在这里插入图片描述,第1张

    java 代码

    • textFile:获取文件内容,返回 JavaRDD
    • flatMap:过滤数据
    • mapToPair:把每个元素都转换成一个类型的对象,如 <123,1>,<456,1>
    • reduceByKey:对相同key的数据集进行预聚合
      public void testSparkText() {
          String file = "D:\\TEMP\\word.txt";
          JavaRDD fileRDD =  javaSparkContext.textFile(file);
          JavaRDD wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
          JavaPairRDD wordAndOneRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));
          JavaPairRDD wordAndCountRDD = wordAndOneRDD.reduceByKey((a, b) -> a + b);
          //输出结果
          List> result = wordAndCountRDD.collect();
          result.forEach(System.out::println);
      }
      

      结果得出,123 有 3 个,456 有 2 个,789 有 1 个

      SpringBoot 使用 Spark,在这里插入图片描述,第2张

       

      读取 csv 文件

      测试文件 testcsv.csv

      SpringBoot 使用 Spark,在这里插入图片描述,第3张

      java 代码

      public void testSparkCsv() {
          String file = "D:\\TEMP\\testcsv.csv";
          JavaRDD fileRDD = javaSparkContext.textFile(file);
          JavaRDD wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
          //输出结果
          System.out.println(wordsRDD.collect());
      }
      

      输出结果

      SpringBoot 使用 Spark,在这里插入图片描述,第4张

       

      读取 MySQL 数据库表

      • format:获取数据库建议是 jdbc
      • option.url:添加 MySQL 连接 url
      • option.user:MySQL 用户名
      • option.password:MySQL 用户密码
      • option.dbtable:sql 语句
      • option.driver:数据库 driver,MySQL 使用 com.mysql.cj.jdbc.Driver
      • public void testSparkMysql() throws IOException {
            
            Dataset jdbcDF = sparkSession.read()
                    .format("jdbc")
                    .option("url", "jdbc:mysql://192.168.140.1:3306/user?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
                    .option("dbtable", "(SELECT * FROM xxxtable) tmp")
                    .option("user", "root")
                    .option("password", "xxxxxxxxxx*k")
                    .option("driver", "com.mysql.cj.jdbc.Driver")
                    .load();
            jdbcDF.printSchema();
            jdbcDF.show();
            //转化为RDD
            JavaRDD rowJavaRDD = jdbcDF.javaRDD();
            System.out.println(rowJavaRDD.collect());
        }
        

        也可以把表内容输出到文件,添加以下代码

        List list = rowJavaRDD.collect();
        BufferedWriter bw;
        bw = new BufferedWriter(new FileWriter("d:/test.txt"));
        for (int j = 0; j < list.size(); j++) {
            bw.write(list.get(j).toString());
            bw.newLine();
            bw.flush();
        }
        bw.close();
        

        结果输出

        SpringBoot 使用 Spark,在这里插入图片描述,第5张

         

        读取 Json 文件

        测试文件 testjson.json,内容如下

        [{
        	"name": "name1",
        	"age": "1"
        }, {
        	"name": "name2",
        	"age": "2"
        }, {
        	"name": "name3",
        	"age": "3"
        }, {
        	"name": "name4",
        	"age": "4"
        }]
        

        注意:testjson.json 文件的内容不能带格式,需要进行压缩

        SpringBoot 使用 Spark,在这里插入图片描述,第6张

        java 代码

        • createOrReplaceTempView:读取 json 数据后,创建数据表 t
        • sparkSession.sql:使用 sql 对 t 进行查询,输出 age 大于 3 的数据
          public void testSparkJson() {
              Dataset df = sparkSession.read().json("D:\\TEMP\\testjson.json");
              df.printSchema();
              df.createOrReplaceTempView("t");
              Dataset row = sparkSession.sql("select age,name from t where age > 3");
              JavaRDD rowJavaRDD = row.javaRDD();
              System.out.println(rowJavaRDD.collect());
          }
          

          输出结果

          SpringBoot 使用 Spark,在这里插入图片描述,第7张

           

          中文输出乱码

          测试文件 testcsv.csv

          SpringBoot 使用 Spark,在这里插入图片描述,第8张

          public void testSparkCsv() {
              String file = "D:\\TEMP\\testcsv.csv";
              JavaRDD fileRDD = javaSparkContext.textFile(file);
              JavaRDD wordsRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
              //输出结果
              System.out.println(wordsRDD.collect());
          }
          

          输出结果,发现中文乱码,可恶

          SpringBoot 使用 Spark,在这里插入图片描述,第9张

          原因:textFile 读取文件没有解决乱码问题,但 sparkSession.read() 却不会乱码

          解决办法:获取文件方式由 textFile 改成 hadoopFile,由 hadoopFile 指定具体编码

              public void testSparkCsv() {
                  String file = "D:\\TEMP\\testcsv.csv";
                  String code = "gbk";
                  
                  JavaRDD gbkRDD = javaSparkContext.hadoopFile(file, TextInputFormat.class, LongWritable.class, Text.class).map(p -> new String(p._2.getBytes(), 0, p._2.getLength(), code));
                  JavaRDD gbkWordsRDD = gbkRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
                  //输出结果
                  System.out.println(gbkWordsRDD.collect());
              }
          

          输出结果

          SpringBoot 使用 Spark,在这里插入图片描述,第10张

网友评论

搜索
最新文章
热门文章
热门标签