本篇笔记是完全对前一篇文章内容的补充,这代人的热情-MapReduce的工作原理学习(一)
在前面的笔记中写到了MapReduce中重要的两个处理阶段,那就是Map和Reduce阶段嘛。

我在网上看到了这一段有趣的解释部件间关系的说法,挺好记,很形象的说清楚了都是干啥的。

NameNode是HDFS的老大。DataNode是HDFS的小弟。SecondaryNameNode是NameNode的助理。ResourceManager是YARN的老大。YARN的小弟是NodeManager。

首先是有一些补充执行流程的,因为后来我也再看后面的,发现了其他好的资料也就加上来了。

Exception in thread "main" java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.

缺少Jar包:Hadoop-mapreduce-client-common-2.2.0.jar

引用了下面的资料:
Hadoop Map/Reduce教程
Partitioner-一蓑烟雨任平生、
Partitioner与自定义Partitioner
Partitioner-小月半会飞

在本篇笔记之前,还有我写的这几篇,可以参阅一下:
Hadoop 2.x在CentOS 6.9和Windows下的伪分布式部署
HDFS的再探究
这代人的热情-MapReduce的工作原理学习(一)

1. MapReduce的执行流程

前面我们自建了一个非常简单的小例子,叫wordcount,在编写好测试的代码以后,打包成了Jar包,我们把这个包交给服务器,那么处理的流程是怎么样的?

  1. 客户端提交一个MapReduce的jar包给JobClient(提交方式:hadoop jar ...)
  2. JobClient通过RPC和JobTracker进行通信,返回一个存放jar包的地址(HDFS)和jobId
  3. client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
  4. 开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
  5. JobTracker进行初始化任务
  6. 读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
  7. TaskTracker通过心跳机制领取任务(任务的描述信息)
  8. 下载所需的jar,配置文件等
  9. TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
  10. 将结果写回到HDFS储存。

2. 作业的输入

具体可以看文档:作业的输入,我这里只是列举了两个。

InputFormat 为Map/Reduce作业描述输入的细节规范。

InputSplit输入切片,每一个切出来的对应一个Mapper,Mapper的输出作为Reducer的输入。

前文也提到了类型:

LongWritable->WritableComparable->Writable, Comparable
LongWritable实现了WritableComparable,既可序列化也可以比较,
接口WritableComparable继承了Writable, Comparable

在接口Writable中,定义了两个方法:
void write(DataOutput out)void readFields(DataInput in)

自定义一个对象。

这里用到了反射
使用了有参的构造方法,要添加一个无参的构造方法

还有就是提到序列化:

  • 序列化(Serialization) 是指把结构化对象转化为字节流。
  • 反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。
  • Java序列化(java.io.Serializable )

hadoop学习第四天-Writable和WritableComparable序列化接口的使用&&MapReduce中传递javaBean的简单例子
hadoop自定义对象序列化Writable和WritableComparable

如果一个javaBean想要作为MapReduce的key或者value,就一定要实现序列化,因为在Map到Reduce阶段的时候,只能是传输二进制数据,不可能将字符流直接进行RPC传输。

public class LongWritable implements WritableComparable<LongWritable>中实现了接口WritableComparable,这个接口又继承了extends Writable, Comparable<T>,我们自己建立的类也应该继承接口WritableComparable
实现下面的三个方法:

public int compareTo(InfoBean o) {
    return 0;
}

public void write(DataOutput dataOutput) throws IOException {

}

public void readFields(DataInput dataInput) throws IOException {

}

后面开始才是本来的内容:

3. Partitioner

在运行前文的wordcount案例时,我们创建了一个类,继承Mapper重写map,Mapper处理后的结果是<K2,V2>,会交给Reducer合并(当然还有很重要的一环就是shuffle过程,在后文专门会写到)。

合并的时候,有相同key的键/值对会送到同一个Reducer节点中进行归并。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。

Hadoop和大多数的集群应用一样,缓存的数据依靠Hash来均匀分布,这在后文也有。

官方文档是这样描述的:

Partitioner用于划分键值空间(key space)。Partitioner负责控制map输出结果key的分割。Key(或者一个key子集)被用于产生分区,通常使用的是Hash函数。分区的数目与一个作业的reduce任务的数目是一样的。因此,它控制将中间过程的key(也就是这条记录)应该发送给m个reduce任务中的哪一个来进行reduce操作。

在前文中的案例运行以后,part-r-00000中就储存了结果文件。这就是一个结果文件,我们可以通过Partitioner来控制结果文件的数量。

20200216053040285_22675.webp

那么我们怎么自定义一个呢?前面引用官方文档的内容已经提到了:HashPartitioner是默认的 Partitioner

新建立一个public static class ProviderPartitioner extends Partitioner<Text,DataBean>,继承Partitioner,会提示需要实现一个方法,返回的就是分区号:

public int getPartition(Text text, DataBean dataBean, int numPartitions) {
            return 0;
        }

可以用单例模式加载。

使用job.setPartitionerClass加入自定义的ProviderPartitioner,要求该类必须继承Partitioner
job.setPartitionerClass(ServiceProviderPartitioner.class);

hadoop中map和reduce的数量设置问题

MapReduce如果不进行设置Reduce的数量,默认启动一个。
之所以前面的计算文件只有一个,就是因为只启动了一个Reduce。通过job.setNumReduceTasks(Integer.parseInt(args[2]));可以将args的第三个参数设置为Reduce的数量。
这样结果就变化了:

20200216061608584_16514.webp

那里面又是怎么跑的呢?

当自定义setNumReduceTasks的数量大于实际的分区号数量时,Reduce依旧会启动,会创建结果文件,只是是空白的。如果小于,有的Reduce就无法把有的数据发到文件,会报 java.io.IOException: Illegal partition错。实际使用时要尽量大于实际的分区数。

20200216064326948_13906.webp

getPartition中得到当前的分区号,因为是包装类型,不会返回0。

前文也提到了哈希分布,HashPartitioner。

20200217060732806_13875.webp

20200217060758131_17218.webp

return (key.hashCode() & 2147483647) % numReduceTasks;

从这一句中可以知晓,是key的hashCode值与上int类型最大的数(2147483647),对Reduce的数量numReduceTasks求余数。
这样做可以均匀的分布ReduceTasks。

4. Combiner

map阶段可能会产生大量的输出,Combiner(如果有的话)会在map阶段对输出先做一次合并(在本地聚合),再往下走,这样做传输到下一阶段,也就是输入到reducer中的数量(个数)会变少。
Combiner相当于特殊的Reducer,相当于本地的一个reduce,实现本地key的归并

Combiner是一个可选的功能,我们完全可以不使用那么,map阶段输出的所有的结果都会交给reduce,这样就少了这一层的优化。但是是不是一定用了Combiner就好呢?这就得注意: Combiner的输出就是Reducer的输入,添加Combiner绝不能改变最终的计算结果。所以Combiner只应该用于那种Reduce的输入key/value与输出keylvalue类型完全一致,且不影响最终结果的场景。比如过滤数据,累加,最大值等。

Combiner的使用就比较容易了,Reducer和Combiner的功能是一样的,我们可以将自己的Reducer当作Combinerjob.setCombinerClass(WCReducer.class);。这样运行,也会是正常的。

依旧用上次不怎么太好的例子。

leaf yyh
yyh xpleaf
katy ling
yeyonghao leaf
xpleaf katy

那么在Combiner使用和不使用时有什么区别?

20200302101744538_7571.webp

20200302103326882_5957.webp

上下两种执行的结果是相同的,数据量特别大时,下面的效率高;数据量比较小时,上面的效率高。

下面是执行过程:当我们把Reducer当作Combiner,程序执行时会经过3个组件,Mapper,Combiner,Reducer。只是Reducer会经过两次。我们可以debug看是不是三个流程,在WordCount的main上打断点,分别在WCMapper和WCReducer。

下面是一些注意的。
进入程序的第一次key是0。

20200217055432240_23938.webp

然后会进行切分为两个词。

20200217055535529_4280.webp

5. Shuffle

下面的内容来自文档:Reducer有3个主要阶段:shuffle、sort和reduce。

Shuffle
Reducer的输入就是Mapper已经排好序的输出。在这个阶段,框架通过HTTP为每个Reducer获得所有Mapper输出中与之相关的分块。

Sort
这个阶段,框架将按照key的值对Reducer的输入进行分组 (因为不同mapper的输出中可能会有相同的key)。

Shuffle和Sort两个阶段是同时进行的;map的输出也是一边被取回一边被合并的。

20200219014540865_4185.webp

一个输入切片对应一个map,一个map任务只读取文件的一部分,每一个map都有一个内存缓冲区,当达到一定的阈值时(80%),就会将数据写入到磁盘,写入的过程和复杂。
首先是对数据进行分区,如果我们没有定义Partitioner,系统会有一个默认的分区规则。

首先按照分区号排序,排在前面的是0号分区的。
其次将每个分区中的数据再进行排序,这时,如果是Text类型,按照字典顺序排;Long类型,按自然数顺序;如果是自定义的对象,按照自定义的顺序排列。
如果内存缓冲区写满了,硬盘速度更不上,会map阻塞,直到内存中的数据写入到了"硬盘"中。

上面的操作会产生很多的小文件,到了后面依然会合并,合并的方式就是按照相同分区号的数据进行合并,例如:0号分区的数据进行合并。
最终得到的大文件也是分区且排序的大文件。

这时Map的任务就完成了。接下来是Reduce。
Reduce会到map中取数据,每一个Reduce会取前面的一部分数据,又进行合并和排序,再给Reduce进行计算,再写回到HDFS中。

在Q'Reilly的Hadoop权威指南中,有一小节,讲到了shuffle 和排序下面的资料都是引用的,这里面的内容太庞大了,如果是想要学通还要花很多时间。

20200217063931837_31691.webp

MapReduce确保每个reducer 的输人都按键排序。系统执行排序的过程一将map输出作为输入传给reducer- 称为 shuffle。“ 在此, 我们将学习shuffle 是如何工作的,因为它有助于我们理解工作机制(如果需要优化MapReduce 程序的话)。.shuffle属于不断被优化和改进的代码库的一部分,因此下面的描述有必要隐藏一些细节(也可能随时间而改变,目前是0.20 版本)。从许多方面来看,shufle 是MapReduce的“心脏”,是奇迹发生的地方。

在写磁盘之前,线程首先根据数据最终要传送到的 reducer 把数据划分成相应的分(partition) 。在每个分区中,后台线程按键进行内排序,如果有一个 combiner它会在排序后的输出上运行。

接下来是reduce端。Reduce通信是RPC,真正下载数据是HTTP。

在Hadoop 1.0中,MapReduce的老大叫JobTracker,作业跟踪器,负责监控全局的任务,又要分配资源,负责调度(监控)Map运行在那台机器上,在2.0中就拆分了,资源的分配交给ResourceManager,资源的监控交给MRAppMaster。
在官方文档中有:作业的提交与监控
Yarn之 MRAppMaster 概览
MRAppMaster详细分析

小弟TaskTracker会发送心跳,往老大请求任务,TaskTracker领到任务后,需要启动子进程Child(不叫YARN-Child),可以一台机器多个Child

假设前两个中运行的map,后一个运行的Reduce,

538556663.webp

Hadoop 2.0中,MRAppMaster用来监控属于它任务的YARNChild。当不同机器时,还是使用RPC来通信。

Yarn之 MRAppMaster 概览

MRAppMaster是MapReduce的ApplicationMaster实现,它使得MapReduce计算框架可以运行于YARN之上。在YARN中,MRAppMaster负责管理MapReduce作业的生命周期,包括创建MapReduce作业,向ResourceManager申请资源,与NodeManage通信要求其启动Container,监控作业的运行状态,当任务失败时重新启动任务等。

6. 例子:WordCount v2.0

这个例子在官方文档,例子:WordCount v2.0

文章目录