上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Hadoop3教程(十四):MapReduce中的排序

guduadmin11天前

文章目录

  • (99)WritableComparable排序
    • 什么是排序
    • 什么时候需要排序
    • 排序有哪些分类
    • 如何实现自定义排序
    • (100)全排序案例
      • 案例需求
      • 思路分析
      • 实际代码
      • (101)二次排序案例
      • (102) 区内排序案例
      • 参考文献

        (99)WritableComparable排序

        什么是排序

        排序是MR中最重要的操作之一,也是面试中可能被问到的重点。

        MapTask和ReduceTask中都会对数据按照KEY来排序,主要是为了效率,排完序之后,相同key值的数据会被放在一起,更方便下一步(如Reducer())的汇总处理。

        默认排序是按照字典顺序(字母由小到大,或者是数字由小到大)排序,且实现该排序的方法是快速排序。

        什么时候需要排序

        MR的过程中,什么时候用到了排序呢?

        Map阶段:

        • 环形缓冲区溢写到磁盘之前,会将每个分区内数据分别进行一个快排,这个排序是在内存中完成的;(对key的索引,按照字典顺序排列)
        • 环形缓冲区多轮溢写完毕后,会形成一堆文件,这时候会对这些文件做merge归并排序,我理解是单个MapTask最终会汇总形成一个文件;

          Reduce阶段:

          • ReduceTask会主动拉取MapTask们的输出文件,理论上是会优先保存到内存里,但是往往内存里放不下,所以多数情况下会直接溢写到磁盘,于是我们会得到多个文件。当文件数量超过阈值,之后需要做归并排序,合并成一个大文件。如果是内存中的数据超过阈值,则会进行一次合并后将数据溢写到磁盘。当所有数据拷贝完后,ReduceTask会统一对内存和磁盘上的所有数据进行一次归并排序
          • 文件合并后其实还可以进行一个分组排序,过于复杂,这里就不介绍了。

            排序有哪些分类

            MR里的排序还有部分排序、全排序、辅助排序、二次排序的不同说法,注意,它们之间不是像那种传统的排序算法之间的区别,只是当排序在不同场景的时候,分别起了个名字。

            MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部是有序的,这就是部分排序。

            最终输出结果只有一个文件,且文件内部有序。这就是全排序。

            全排序的实现方式是只设置一个ReduceTask。但是这种方式在处理大型文件时效率很低很低,因为一台机器处理全部数据,完全没有利用MR所提供的并行架构的优势,生产环境上完全不适用。

            所以生产环境里,常用的还是部分排序。

            辅助排序,就是GroupingComparator分组。

            这个似乎是可选的,是在Reduce阶段,Reducer在从Map阶段主动拉取完数据后,会对所有文件做一次归并排序。做完归并排序之后,理论上就可以进行辅助排序。

            辅助排序有啥用呢,就是当接收到的Key是个bean对象时,辅助排序可以让一个或者几个字段相同的key(全部字段不相同)进入同一个Reduce(),所以也起名叫做分组排序。

            二次排序比较简单,在自定义排序过程中,如果compareTo中的判断条件为两个,那它就是二次排序。

            如何实现自定义排序

            说到这里,那 如何实现自定义排序 呢?

            如果是bean对象作为key传输,那需要实现WritableComparable接口,重写compareTo方法,就可以实现自定义排序。

            @Override
            public int compareTo(FlowBean bean) {
            	int result;
            		
            	// 按照总流量大小,倒序排列
            	if (this.sumFlow > bean.getSumFlow()) {
            		result = -1;
            	}else if (this.sumFlow < bean.getSumFlow()) {
            		result = 1;
            	}else {
            		result = 0;
            	}
            	return result;
            }
            

            (100)全排序案例

            案例需求

            之前我们做过一个案例,输入文件有一个,里面放的是每个手机号的上行流量和下行流量,输出同样是一个文件,里面放的除了手机号的上行流量和下行流量之外,还多了一行总流量。

            这时候我们提一个新需求,就是我不止要这个输出文件,我还要这个文件里的内容,按照总流量降序排列。

            思路分析

            MapReduce里,只能对Key进行排序。在先前的需求里,我们是用手机号作为key,上行流量、下行流量和总流量组成一个bean,作为value,这样的安排显然不适合新需求。

            因此我们需要改变一下,将上行流量、下行流量和总流量组成的bean作为key,而将手机号作为value,如此来排序。

            所以第一步,我们需要对我们自定义的FlowBean对象声明WritableComparable接口,并重写CompareTo方法,这一步的目的是使得FlowBean可进行算数比较,从而允许排序:

            @Override
            public int CompareTo(FlowBean o){
                // 按照总流量,降序排列
                return this.sumFlow > o.getSumFlow()?-1:1;
            }
            

            注意这里,因为Hadoop里默认的字典排序是从小到大排序,如果想实现案例里由大到小的排序,那么当大于的时候,就要返回-1,从而将大的值排在前面。

            其次,Mapper类里:

            context.write(bean, 手机号)
            

            bean成了key,手机号成了value。

            最后,Reduce类里,需要循环输出,避免出现总流量相同的情况。

            for (Text text: values){
                context.write(text, key);	// 注意顺序,原先的key放在value位置
            }
            

            2023-7-19 11:16:04 这里没懂。。。

            哦哦明白了,什么样的数据会进一个Reducer呢,当然是key 值相同的会进同一个,又因为我们之前compareTo的时候用的是总流量,所以最后是总流量相同的记录会送进同一个Reducer,然后汇总成一条记录做输出,毕竟reducer就是用来做汇总的。

            但"汇总成一条记录"这并不是我们想要的,我们需要的是把这些数据原模原样输出来。这就是为什么我们在Reducer的reduce()里面,要加上循环输出的原因。

            实际代码

            贴一下教程里的代码实现:

            首先是FlowBean对象,需要声明WritableComparable接口,并重写CompareTo()

            package com.atguigu.mapreduce.writablecompable;
            import org.apache.hadoop.io.WritableComparable;
            import java.io.DataInput;
            import java.io.DataOutput;
            import java.io.IOException;
            public class FlowBean implements WritableComparable {
                private long upFlow; //上行流量
                private long downFlow; //下行流量
                private long sumFlow; //总流量
                //提供无参构造
                public FlowBean() {
                }
                //生成三个属性的getter和setter方法
                public long getUpFlow() {
                    return upFlow;
                }
                public void setUpFlow(long upFlow) {
                    this.upFlow = upFlow;
                }
                public long getDownFlow() {
                    return downFlow;
                }
                public void setDownFlow(long downFlow) {
                    this.downFlow = downFlow;
                }
                public long getSumFlow() {
                    return sumFlow;
                }
                public void setSumFlow(long sumFlow) {
                    this.sumFlow = sumFlow;
                }
                public void setSumFlow() {
                    this.sumFlow = this.upFlow + this.downFlow;
                }
                //实现序列化和反序列化方法,注意顺序一定要一致
                @Override
                public void write(DataOutput out) throws IOException {
                    out.writeLong(this.upFlow);
                    out.writeLong(this.downFlow);
                    out.writeLong(this.sumFlow);
                }
                @Override
                public void readFields(DataInput in) throws IOException {
                    this.upFlow = in.readLong();
                    this.downFlow = in.readLong();
                    this.sumFlow = in.readLong();
                }
                //重写ToString,最后要输出FlowBean
                @Override
                public String toString() {
                    return upFlow + "\t" + downFlow + "\t" + sumFlow;
                }
                @Override
                public int compareTo(FlowBean o) {
                    //按照总流量比较,倒序排列
                    if(this.sumFlow > o.sumFlow){
                        return -1;
                    }else if(this.sumFlow < o.sumFlow){
                        return 1;
                    }else {
                        return 0;
                    }
                }
            }
            

            然后编写Mapper类:

            package com.atguigu.mapreduce.writablecompable;
            import org.apache.hadoop.io.LongWritable;
            import org.apache.hadoop.io.Text;
            import org.apache.hadoop.mapreduce.Mapper;
            import java.io.IOException;
            public class FlowMapper extends Mapper {
                private FlowBean outK = new FlowBean();
                private Text outV = new Text();
                @Override
                protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                    //1 获取一行数据
                    String line = value.toString();
                    //2 按照"\t",切割数据
                    String[] split = line.split("\t");
                    //3 封装outK outV
                    outK.setUpFlow(Long.parseLong(split[1]));
                    outK.setDownFlow(Long.parseLong(split[2]));
                    outK.setSumFlow();
                    outV.set(split[0]);
                    //4 写出outK outV
                    context.write(outK,outV);
                }
            }
            

            然后编写Reducer类:

            package com.atguigu.mapreduce.writablecompable;
            import org.apache.hadoop.io.Text;
            import org.apache.hadoop.mapreduce.Reducer;
            import java.io.IOException;
            public class FlowReducer extends Reducer {
                @Override
                protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException {
                    //遍历values集合,循环写出,避免总流量相同的情况
                    for (Text value : values) {
                        //调换KV位置,反向写出
                        context.write(value,key);
                    }
                }
            }
            

            最后编写驱动类:

            package com.atguigu.mapreduce.writablecompable;
            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.Path;
            import org.apache.hadoop.io.Text;
            import org.apache.hadoop.mapreduce.Job;
            import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
            import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
            import java.io.IOException;
            public class FlowDriver {
                public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                    //1 获取job对象
                    Configuration conf = new Configuration();
                    Job job = Job.getInstance(conf);
                    //2 关联本Driver类
                    job.setJarByClass(FlowDriver.class);
                    //3 关联Mapper和Reducer
                    job.setMapperClass(FlowMapper.class);
                    job.setReducerClass(FlowReducer.class);
                    //4 设置Map端输出数据的KV类型
                    job.setMapOutputKeyClass(FlowBean.class);
                    job.setMapOutputValueClass(Text.class);
                    //5 设置程序最终输出的KV类型
                    job.setOutputKeyClass(Text.class);
                    job.setOutputValueClass(FlowBean.class);
                    //6 设置输入输出路径
                    FileInputFormat.setInputPaths(job, new Path("D:\\inputflow2"));
                    FileOutputFormat.setOutputPath(job, new Path("D:\\comparout"));
                    //7 提交Job
                    boolean b = job.waitForCompletion(true);
                    System.exit(b ? 0 : 1);
                }
            }
            

            完成,仅做了解即可。

            (101)二次排序案例

            二次排序的概念很简单,其实之前提过了,就是在自定义排序的时候,判断条件有两个。

            比如说,原先我对一堆人排序,是按照身高从高到低排,但是身高一样的就没法排序了,这时候我可以再加入一个判断条件,比如说如果身高一样的话,就按体重排序。

            具体就是修改FlowBean的CompareTo方法,在第一条件相等的时候,添加第二判定条件。

            public int compareTo(FlowBean o) {
                //按照总流量比较,倒序排列
                if(this.sumFlow > o.sumFlow){
                    return -1;
                }else if(this.sumFlow < o.sumFlow){
                    return 1;
                }else {
                    if (this.upFlow > o.upFlow){
                        return 1;
                    } else if (this.upFlow < o.upFlow){
                        return -1;
                    }
                    else {
                        return 0;
                    }
                    
                }
            }
            

            如果有需要的话,还可以继续加第三判定条件。

            (102) 区内排序案例

            还是之前的手机号案例,之前我们想要的是,只有一个文件,然后文件内所有数据按照总流量降序排列。

            现在我们提出一个新要求,按照前3位来分区输出,比如说136的在一个文件里,137的在一个文件里,以此类推。而且每个文件内部,还需要按照总流量降序排列。

            本质上就是之前说的分区 + 排序,这两部分的结合。需要额外定义好Partitioner类。

            贴一下教程里的代码示例,其实只需要在上一小节的基础上补充自定义分区类即可:

            首先自定义好分区类:

            package com.atguigu.mapreduce.partitionercompable;
            import org.apache.hadoop.io.Text;
            import org.apache.hadoop.mapreduce.Partitioner;
            public class ProvincePartitioner2 extends Partitioner {
                @Override
                public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
                    //获取手机号前三位
                    String phone = text.toString();
                    String prePhone = phone.substring(0, 3);
                    //定义一个分区号变量partition,根据prePhone设置分区号
                    int partition;
                    if("136".equals(prePhone)){
                        partition = 0;
                    }else if("137".equals(prePhone)){
                        partition = 1;
                    }else if("138".equals(prePhone)){
                        partition = 2;
                    }else if("139".equals(prePhone)){
                        partition = 3;
                    }else {
                        partition = 4;
                    }
                    //最后返回分区号partition
                    return partition;
                }
            }
            

            然后在驱动类里注册好分区器:

            // 设置自定义分区器
            job.setPartitionerClass(ProvincePartitioner2.class);
            // 设置对应的ReduceTask的个数
            job.setNumReduceTasks(5);
            

            其他跟上一小节保持一致即可。

            参考文献

            1. 【尚硅谷大数据Hadoop教程,hadoop3.x搭建到集群调优,百万播放】

网友评论

搜索
最新文章
热门文章
热门标签