上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Hadoop-MapReduce排序(超级详细)

guduadmin11天前

N.1 MapReduce的模型

————————————————————————

Hadoop-MapReduce排序(超级详细),第1张

———————————————————————— 

(1)map

map task会从本地⽂件系统读取数据,转换成key-value形式的键值对集合。使⽤的是hadoop内置的数据类型,⽐如longwritable、text等。

(2)shuffle

[1] 溢出

[2] 分区:mapper的key-value在输出之后会进⾏⼀个partition分区操作,默认使⽤的是hashpartitioner,可以通过重写hashpartitioner的getpartition⽅法来⾃定义分区规则。

[3] 归并排序:会对key进⾏进⾏sort排序,grouping分组操作将相同key的value合并分组输出。当然,在这⾥可以使⽤⾃定义的数据类型,重写WritableComparator的Comparator⽅法来⾃定义排序规则

[4] 预合并排序

[5] reduce拉取的总归并排序:和归并排序排序一样

(3)reduce

reduce阶段的排序,负责接接收shuffle处理好的数据,直接循环迭代( key,valus{..} )即可。最后将数据保存或者显⽰,结束整个job。(当然这里要在做排序也是可以的)

N.2 WritableComparable排序概念

0)排序阶段的介绍_sefl来源百度:

(1)shuffle过程中执行过程中的排序(默认字典排序,默认是过group分组,一个对象单独一组),分别是:

[1] 溢出

[2] 分区阶段:根据分区以及key进行快速排序

[3] 预合并阶段:是在排序的基础上做合并,在排序。

[4] 归并阶段:将同一个分区的多个溢写文件进行归并排序,合成大的溢出文件。

[5] reduce拉取阶段总归并:会对不同的mapTask进行归并排序。

(2)reduce阶段的排序,负责接接收shuffle处理好的数据,直接循环迭代( key,valus{..} )即可。

1)排序的分类:

(1)部分排序(某一个分区,或者是部分文件排序):

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部排序。

(2)全排序(就一个分区,且所有文件合在一起排序):

如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce所提供的并行架构。

替代方案:首先创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要思路是使用一个分区来描述输出的全局排序。例如:可以为上述文件创建3个分区,在第一分区中,记录的单词首字母a-g,第二分区记录单词首字母h-n, 第三分区记录单词首字母o-z。

(3)辅助排序(某一个分区,或者部分文件排序,这个一般涉及对象的排序):(GroupingComparator分组)执行默认的。

[1] 默认情况下,如果使用自定义对象排序,每new出一个就单独分成一组。

GroupingComparator是在reduce阶段分组来使用的,由于reduce阶段,如果key相同的一组,只取第一个key作为key,迭代所有的values。 如果reduce的key是自定义的对象,我们只需要bean里面的某个属性相同就认为这样的key是相同的,这是我们就需要之定义GroupCoparator来“欺骗”reduce了如果每一个对象key都是单独一组,那么有些状态数据就要传递了,就会增加复杂度。所以辅助排序,相当优化。

[2] 一般来说,大多数MapReduce程序会避免让reduce函数依赖于value的排序。但是,有时也需要通过特定的方法对key进行排序和分组等, 以实现对value的排序。

(4)二次排序(某一个分区,或者部分文件排序,这里是二次排序):

[1] 二次排序 就是对key进行第一与二次的排序,案例:如key是一个对象,对象有多个属性,而value是别的值,而使用二次排序可以对key 里面的属性进行二次排序。

[2] 在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序,如果是一个条件就是一次排序,定义一个类实现WritableComparable接口重写compareTo方法,就可以实现排序。

@Override

public int compareTo(FlowBean o) {

        // 倒序排列,从大到小

//FlowBean o表示后面的一个对象。-1表示交换位置,1表示不交换位置。

        return this.sumFlow > o.getSumFlow() ? -1 : 1;

}

N.3 二次排序

1)源数据:

————————————————————————

Hadoop-MapReduce排序(超级详细),第2张

————————————————————————

Hadoop-MapReduce排序(超级详细),第3张

————————————————————————

2)以下代码使用的是一个分区,输出也是一个文件。

package study190616_2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

public class FlowBean implements WritableComparable{

private int id;

private double price;

//反序列化调用

public FlowBean(){

}

//比较器

public int compareTo(FlowBean o) {

//-1不交换位置(负数) 1交换位置(正数)

//比较原则 从上到下 “升”序排序,如果遇到相同的数据,就按后面的数据降序。

if(this.id>o.id){

return 1;

}else if(this.id

return -1;

}else {

//自定义的二次排序,

return this.price>o.price? -1:1;

}

}

//序列化

public void write(DataOutput dataOutput) throws IOException {

dataOutput.writeInt(this.id);

dataOutput.writeDouble(this.price);

}

//反序列化

public void readFields(DataInput dataInput) throws IOException {

this.id = dataInput.readInt();

this.price = dataInput.readDouble();

}

public void setId(int id) {

this.id = id;

}

public double getPrice() {

return price;

}

public void setPrice(double price) {

this.price = price;

}

@Override

public String toString() {

return this.id+"\t"+this.price;

}

}

package study190616_2;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**

* KEYIN, VALUEIN, KEYOUT, VALUEOUT

* LongWritable,Text, FlowBean,NullWritable

* map 的输入 map的输出

*

* */

public class SortMap extends Mapper {

FlowBean f = new FlowBean();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//获取数据

String line = value.toString();

//切分

String[] split = line.split("\t");

//赋值/数据封装

//数据行:0000001 Pdt_01 222.8

f.setId(Integer.parseInt(split[0]));

f.setPrice(Double.parseDouble(split[2]));

context.write(f,NullWritable.get()); //注意 这里是按照对象传到reduce ,而每一个对象是通过new出来的 ,所以地址不一样,在reduce接受的时候,每个对象不合在一起,它们的value也不会集中成一个迭代器,也就是说个对象都有自己的迭代器。所以后面的章节会学到赋值排序,让它们合起来

}

}

package study190616_2;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReduce extends Reducer {

@Override

protected void reduce(FlowBean flowBean, Iterable values, Context context) throws IOException, InterruptedException {

//输出

context.write(flowBean, NullWritable.get()); //这里的reduce自己写 因为每一个对象不一样 ,迭代器不会聚集值。

}

}

package study190616_2;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MainWritable {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args=new String[]{"C:\\Users\\HMTX\\Desktop\\0616-压缩\\GroupingComparator.txt","C:\\Users\\HMTX\\Desktop\\0616-压缩\\t11"};

// 1 获取job信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

// 2 加载jar包

job.setJarByClass(MainWritable.class);

// 3 关联map和reduce

job.setMapperClass(SortMap.class);

job.setReducerClass(SortReduce.class);

// 4 设置最终输出类型

job.setMapOutputKeyClass(FlowBean.class);

job.setMapOutputValueClass(NullWritable.class);

job.setOutputKeyClass(FlowBean.class);

job.setOutputValueClass(NullWritable.class);

// 5 设置输入和输出路径

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 提交

job.waitForCompletion(true);

}

}

3)远行结果

————————————————————————

Hadoop-MapReduce排序(超级详细),第4张

 ————————————————————————

N.4 GroupingComparator分组排序

1)如果使用某一个字段进行辅助排序,那么这个字段"必须"在之前"有过排序"的处理,所有"辅助"顾名思义就是在前者排序好的基础上发挥的作用, 单独使用的辅助排序 很可能生成的结果顺序是乱的,最好不要使用。而辅助里面在使用排序 一般 都跟之前的字段排序规则一样,其实辅助主要的不是排序,而是分组才是关键(可以认为写个排序的代码就是想分组)。所以 下面的案例 使用了二次排序作为辅助排序的基础。

2)源数据

————————————————————————

Hadoop-MapReduce排序(超级详细),第5张

————————————————————————

Hadoop-MapReduce排序(超级详细),第6张

————————————————————————

package study190616_3;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

public class OrderBean implements WritableComparable {

private int id;

private double price;

public OrderBean(){

}

//排序,先按ID排序,相等ID在价格排序

public int compareTo(OrderBean o) {

//-1不交换位置(负数) 1交换位置(正数)

if(this.id>o.id){

return 1;

}else if(this.id

return -1;

}else {

return this.price>o.price? -1:1;

}

}

//序列化

public void write(DataOutput dataOutput) throws IOException {

dataOutput.writeInt(this.id);

dataOutput.writeDouble(this.price);

}

//返序列化

public void readFields(DataInput dataInput) throws IOException {

this.id = dataInput.readInt();

this.price = dataInput.readDouble();

}

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public double getPrice() {

return price;

}

public void setPrice(double price) {

this.price = price;

}

@Override

public String toString() {

return this.id+"\t"+this.price;

}

}

package study190616_3;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderMap extends Mapper {

//输入文件的原的行数据格式:0000002 Pdt_06 722.4

OrderBean f = new OrderBean();

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//获取数据

String line = value.toString();

//切分

String[] split = line.split("\t");

//赋值/数据封装

f.setId(Integer.parseInt(split[0]));

f.setPrice(Double.parseDouble(split[2]));

context.write(f,new Text(split[1]));

}

}

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReduce extends Reducer {

@Override

protected void reduce(OrderBean flowBean, Iterable values, Context context) throws IOException, InterruptedException {

//输出

for(Text text:values){

//输出验证下

//System.out.println("key:" + flowBean.toString() + "," + "value:" + text.toString());

context.write(flowBean,text); //注意 这里的value 会合并成迭代器 ,因为后面的代码设置了分组

/**使用对象的某字段值分组,对象间的某字段的值相同,则这些对象就会组成一个"变化key",而value会聚集成迭代器,而这个变化key是根据遍历迭代器产生value"对应的对象做为key"。

如果不遍历迭代器,context.write(Bean,value)只使用一次 默认就是使用排序为第一的对象作为变量key

**/

}

//输出验证下

System.out.println("=========分隔符==========");

}

}

/**

注意这里 遍历的时候 没有跳出循环 对象却换了

* key:3 222.8,value:Pdt_01

* key:3 33.8,value:Pdt_02

* =========分隔符==========

* key:1 222.8,value:Pdt_01

* key:1 25.8,value:Pdt_06

* =========分隔符==========

* key:2 722.4,value:Pdt_05

* key:2 522.8,value:Pdt_03

* key:2 122.4,value:Pdt_04

* =========分隔符==========

当然 如果不遍历迭代器,就写一个

System.out.println("key:" + flowBean.toString() + "," + "value:" + text.toString());

那么输出的结构就是每一个组的第一个数据:

* key:3 222.8,value:Pdt_01

* =========分隔符==========

* key:1 222.8,value:Pdt_01

* =========分隔符==========

* key:2 722.4,value:Pdt_05

* =========分隔符=========

*/

package study190616_3;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartition extends Partitioner {

@Override

public int getPartition(OrderBean orderBean, Text text, int i) {

//&表示位运算(还可以做逻辑运算符),orderBean.getId() & 2147483647得出结果为1,1%3=0...1,所以余数为1

//有0、1 、2的返回值

return (orderBean.getId() & 2147483647) % i;

}

}

package study190616_3;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

public class GroupComparee extends WritableComparator{

//构造方法

protected GroupComparee(){

//调用父类方法,里面传入参数,true表示开启辅助排序

//调用父类方法的WritableComparator(Class keyClass, boolean createInstances){..}

super(OrderBean.class,true);

}

@Override

public int compare(WritableComparable a, WritableComparable b) {

OrderBean oa = (OrderBean) a;

OrderBean ob = (OrderBean) b;

/**使用对象的某字段值分组,对象间的某字段的值相同,则这些对象就会组成一个"变化key,就是一个key",而value会聚集成迭代器,而这个变化key是根据遍历迭代器产生value"对应的对象做为key"。

如果不遍历迭代器,context.write(Bean,value)只使用一次 默认就是使用排序为第一的对象作为变量key

**/

//对key排序就是对key进行分组,在组排序

if(oa.getId()>ob.getId()){

return 1;

}else if(oa.getId()

return -1;

}else {

///相等

return 0;

}

}

}

package study190616_3;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MainOrder {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

args=new String[]{"C:\\Users\\HMTX\\Desktop\\0616-压缩\\GroupingComparator.txt","C:\\Users\\HMTX\\Desktop\\0616-压缩\\t111qq11"};

// 1 获取job信息

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

// 2 加载jar包

job.setJarByClass(MainOrder.class);

// 3 关联map和reduce

job.setMapperClass(OrderMap.class);

job.setReducerClass(OrderReduce.class);

// 4 设置最终输出类型

job.setMapOutputKeyClass(OrderBean.class);

job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(OrderBean.class);

job.setOutputValueClass(Text.class);

//指定辅助排序

job.setGroupingComparatorClass(GroupComparee.class);

//指定分区

job.setPartitionerClass(OrderPartition.class);

job.setNumReduceTasks(3);

// 5 设置输入和输出路径

FileInputFormat.setInputPaths(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 6 提交

job.waitForCompletion(true);

}

}

————————————————————————

Hadoop-MapReduce排序(超级详细),第7张

————————————————————————

Hadoop-MapReduce排序(超级详细),第8张

————————————————————————

Hadoop-MapReduce排序(超级详细),第9张

————————————————————————

网友评论

搜索
最新文章
热门文章
热门标签