In Spark, joining two DataFrames can produce duplicate columns. For example:

```java
Dataset<Row> moviesWithRating = moviesDF
    .join(averageRatingMoviesDF,
        moviesDF.col("movieId").equalTo(averageRatingMoviesDF.col("movieId")));
```
Its schema looks like this:
```java
// moviesWithRating.printSchema();
/**
 * root
 *  |-- _id: struct (nullable = true)
 *  |    |-- oid: string (nullable = true)
 *  |-- actors: string (nullable = true)
 *  |-- description: string (nullable = true)
 *  |-- directors: string (nullable = true)
 *  |-- genres: string (nullable = true)
 *  |-- issue: string (nullable = true)
 *  |-- language: string (nullable = true)
 *  |-- movieId: integer (nullable = true)
 *  |-- shoot: string (nullable = true)
 *  |-- timeLong: string (nullable = true)
 *  |-- title: string (nullable = true)
 *  |-- movieId: integer (nullable = true)
 *  |-- avgRating: double (nullable = true)
 */
```
When we keep operating on this DataFrame, we may hit an error such as: `org.apache.spark.sql.AnalysisException: Reference 'movieId' is ambiguous`
There are two ways to remove the duplicate column:
- Method 1: pass the join columns to the join expression as a sequence of column names
```java
import java.util.Arrays;
import scala.collection.JavaConversions;
import scala.collection.Seq;

Seq<String> joinColumns =
    JavaConversions.asScalaBuffer(Arrays.asList("movieId")).toList();
Dataset<Row> moviesWithRating =
    moviesDF.join(averageRatingMoviesDF, joinColumns, "inner");
```
Here the DataFrames moviesDF and averageRatingMoviesDF are joined on the movieId column from each side, and the returned result keeps only a single movieId column.
In Scala, the equivalent is:

```scala
val moviesWithRating = moviesDF.join(averageRatingMoviesDF, Seq("movieId"))
```
- Method 2: use select to return only the desired columns
```java
Dataset<Row> moviesWithRating = moviesDF
    .join(averageRatingMoviesDF,
        moviesDF.col("movieId").equalTo(averageRatingMoviesDF.col("movieId")))
    .select(
        moviesDF.col("movieId"),
        col("actors"),
        col("description"),
        col("directors"),
        col("genres"),
        col("issue"),
        col("language"),
        col("shoot"),
        col("timeLong"),
        col("title"),
        col("avgRating")
    );
```
Note:
If there are only a few columns, method 2 is recommended.
If there are many columns, method 1 is recommended (listing every column in a select becomes tedious).
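Putting the two join styles side by side, here is a minimal runnable sketch. The movie and rating data are made up, and `moviesDF` / `averageRatingMoviesDF` are tiny stand-ins for the original DataFrames; the point is only to show that the name-sequence join yields one `movieId` column while the expression join keeps both:

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class DedupJoinColumns {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("dedup-join-columns")
                .getOrCreate();

        // Tiny stand-ins for moviesDF and averageRatingMoviesDF
        StructType movieSchema = new StructType()
                .add("movieId", DataTypes.IntegerType)
                .add("title", DataTypes.StringType);
        Dataset<Row> moviesDF = spark.createDataFrame(
                Arrays.asList(RowFactory.create(1, "A"), RowFactory.create(2, "B")),
                movieSchema);

        StructType ratingSchema = new StructType()
                .add("movieId", DataTypes.IntegerType)
                .add("avgRating", DataTypes.DoubleType);
        Dataset<Row> averageRatingMoviesDF = spark.createDataFrame(
                Arrays.asList(RowFactory.create(1, 4.5), RowFactory.create(2, 3.0)),
                ratingSchema);

        // Method 1: join on a sequence of column names -> movieId appears once
        Dataset<Row> byName = moviesDF.join(
                averageRatingMoviesDF,
                scala.collection.JavaConversions.asScalaBuffer(
                        Arrays.asList("movieId")).toList(),
                "inner");
        byName.printSchema();  // movieId, title, avgRating

        // Expression join: movieId appears twice, which is what makes
        // a later unqualified reference to "movieId" ambiguous
        Dataset<Row> byExpr = moviesDF.join(
                averageRatingMoviesDF,
                moviesDF.col("movieId").equalTo(averageRatingMoviesDF.col("movieId")));
        byExpr.printSchema();  // movieId, title, movieId, avgRating

        spark.stop();
    }
}
```

With this setup, `byName.columns()` has three entries and `byExpr.columns()` has four, two of which are named `movieId`.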