这代人的热情-MapReduce的工作原理学习(二)
本篇笔记是完全对前一篇文章内容的补充,这代人的热情-MapReduce的工作原理学习(一)
在前面的笔记中写到了MapReduce中重要的两个处理阶段,那就是Map和Reduce阶段嘛。
我在网上看到了这一段有趣的解释部件间关系的说法,挺好记,很形象的说清楚了都是干啥的。
NameNode是HDFS的老大。DataNode是HDFS的小弟。SecondaryNameNode是NameNode的助理。ResourceManager是YARN的老大。YARN的小弟是NodeManager。
首先是有一些补充执行流程的,因为后来我也再看后面的,发现了其他好的资料也就加上来了。
Exception in thread "main" java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
缺少Jar包:Hadoop-mapreduce-client-common-2.2.0.jar
引用了下面的资料:
Hadoop Map/Reduce教程
Partitioner-一蓑烟雨任平生、
Partitioner与自定义Partitioner
Partitioner-小月半会飞
在本篇笔记之前,还有我写的这几篇,可以参阅一下:
Hadoop 2.x在CentOS 6.9和Windows下的伪分布式部署
HDFS的再探究
这代人的热情-MapReduce的工作原理学习(一)
1. MapReduce的执行流程
前面我们自建了一个非常简单的小例子,叫wordcount,在编写好测试的代码以后,打包成了Jar包,我们把这个包交给服务器,那么处理的流程是怎么样的?
- 客户端提交一个MapReduce的jar包给JobClient(提交方式:hadoop jar ...)
- JobClient通过RPC和JobTracker进行通信,返回一个存放jar包的地址(HDFS)和jobId
- client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
- 开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
- JobTracker进行初始化任务
- 读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
- TaskTracker通过心跳机制领取任务(任务的描述信息)
- 下载所需的jar,配置文件等
- TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
- 将结果写回到HDFS储存。
2. 作业的输入
具体可以看文档:作业的输入,我这里只是列举了两个。
InputFormat 为Map/Reduce作业描述输入的细节规范。
InputSplit输入切片,每一个切出来的对应一个Mapper,Mapper的输出作为Reducer的输入。
前文也提到了类型:
LongWritable->WritableComparable->Writable, Comparable
LongWritable实现了WritableComparable,既可序列化也可以比较,
接口WritableComparable继承了Writable, Comparable
在接口Writable中,定义了两个方法:void write(DataOutput out)
和void readFields(DataInput in)
自定义一个对象。
这里用到了反射
使用了有参的构造方法,要添加一个无参的构造方法
还有就是提到序列化:
- 序列化(Serialization) 是指把结构化对象转化为字节流。
- 反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。
- Java序列化(java.io.Serializable )
hadoop学习第四天-Writable和WritableComparable序列化接口的使用&&MapReduce中传递javaBean的简单例子
hadoop自定义对象序列化Writable和WritableComparable
如果一个javaBean想要作为MapReduce的key或者value,就一定要实现序列化,因为在Map到Reduce阶段的时候,只能是传输二进制数据,不可能将字符流直接进行RPC传输。
在public class LongWritable implements WritableComparable<LongWritable>
中实现了接口WritableComparable
,这个接口又继承了extends Writable, Comparable<T>
,我们自己建立的类也应该继承接口WritableComparable
。
实现下面的三个方法:
public int compareTo(InfoBean o) {
return 0;
}
public void write(DataOutput dataOutput) throws IOException {
}
public void readFields(DataInput dataInput) throws IOException {
}
后面开始才是本来的内容:
3. Partitioner
在运行前文的wordcount案例时,我们创建了一个类,继承Mapper
重写map
,Mapper处理后的结果是<K2,V2>,会交给Reducer合并(当然还有很重要的一环就是shuffle过程,在后文专门会写到)。
合并的时候,有相同key的键/值对会送到同一个Reducer节点中进行归并。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。
Hadoop和大多数的集群应用一样,缓存的数据依靠Hash来均匀分布,这在后文也有。
官方文档是这样描述的:
Partitioner用于划分键值空间(key space)。Partitioner负责控制map输出结果key的分割。Key(或者一个key子集)被用于产生分区,通常使用的是Hash函数。分区的数目与一个作业的reduce任务的数目是一样的。因此,它控制将中间过程的key(也就是这条记录)应该发送给m个reduce任务中的哪一个来进行reduce操作。
在前文中的案例运行以后,part-r-00000中就储存了结果文件。这就是一个结果文件,我们可以通过Partitioner来控制结果文件的数量。
那么我们怎么自定义一个呢?前面引用官方文档的内容已经提到了:HashPartitioner是默认的 Partitioner。
新建立一个public static class ProviderPartitioner extends Partitioner<Text,DataBean>
,继承Partitioner,会提示需要实现一个方法,返回的就是分区号:
public int getPartition(Text text, DataBean dataBean, int numPartitions) {
return 0;
}
可以用单例模式加载。
使用job.setPartitionerClass
加入自定义的ProviderPartitioner
,要求该类必须继承Partitioner
。job.setPartitionerClass(ServiceProviderPartitioner.class);
MapReduce如果不进行设置Reduce的数量,默认启动一个。
之所以前面的计算文件只有一个,就是因为只启动了一个Reduce。通过job.setNumReduceTasks(Integer.parseInt(args[2]));
可以将args的第三个参数设置为Reduce的数量。
这样结果就变化了:
那里面又是怎么跑的呢?
当自定义setNumReduceTasks
的数量大于实际的分区号数量时,Reduce依旧会启动,会创建结果文件,只是是空白的。如果小于,有的Reduce就无法把有的数据发到文件,会报 java.io.IOException: Illegal partition
错。实际使用时要尽量大于实际的分区数。
从getPartition
中得到当前的分区号,因为是包装类型,不会返回0。
前文也提到了哈希分布,HashPartitioner。
return (key.hashCode() & 2147483647) % numReduceTasks;
从这一句中可以知晓,是key的hashCode值与上int类型最大的数(2147483647),对Reduce的数量numReduceTasks求余数。
这样做可以均匀的分布ReduceTasks。
4. Combiner
map阶段可能会产生大量的输出,Combiner(如果有的话)会在map阶段对输出先做一次合并(在本地聚合),再往下走,这样做传输到下一阶段,也就是输入到reducer中的数量(个数)会变少。
Combiner相当于特殊的Reducer,相当于本地的一个reduce,实现本地key的归并。
Combiner是一个可选的功能,我们完全可以不使用那么,map阶段输出的所有的结果都会交给reduce,这样就少了这一层的优化。但是是不是一定用了Combiner就好呢?这就得注意: Combiner的输出就是Reducer的输入,添加Combiner绝不能改变最终的计算结果。所以Combiner只应该用于那种Reduce的输入key/value与输出keylvalue类型完全一致,且不影响最终结果的场景。比如过滤数据,累加,最大值等。
Combiner的使用就比较容易了,Reducer和Combiner的功能是一样的,我们可以将自己的Reducer当作Combinerjob.setCombinerClass(WCReducer.class);
。这样运行,也会是正常的。
依旧用上次不怎么太好的例子。
leaf yyh
yyh xpleaf
katy ling
yeyonghao leaf
xpleaf katy
那么在Combiner使用和不使用时有什么区别?
上下两种执行的结果是相同的,数据量特别大时,下面的效率高;数据量比较小时,上面的效率高。
下面是执行过程:当我们把Reducer当作Combiner,程序执行时会经过3个组件,Mapper,Combiner,Reducer。只是Reducer会经过两次。我们可以debug看是不是三个流程,在WordCount的main上打断点,分别在WCMapper和WCReducer。
下面是一些注意的。
进入程序的第一次key是0。
然后会进行切分为两个词。
5. Shuffle
下面的内容来自文档:Reducer有3个主要阶段:shuffle、sort和reduce。
Shuffle
Reducer的输入就是Mapper已经排好序的输出。在这个阶段,框架通过HTTP为每个Reducer获得所有Mapper输出中与之相关的分块。
Sort
这个阶段,框架将按照key的值对Reducer的输入进行分组 (因为不同mapper的输出中可能会有相同的key)。
Shuffle和Sort两个阶段是同时进行的;map的输出也是一边被取回一边被合并的。
一个输入切片对应一个map,一个map任务只读取文件的一部分,每一个map都有一个内存缓冲区,当达到一定的阈值时(80%),就会将数据写入到磁盘,写入的过程和复杂。
首先是对数据进行分区,如果我们没有定义Partitioner,系统会有一个默认的分区规则。
首先按照分区号排序,排在前面的是0号分区的。
其次将每个分区中的数据再进行排序,这时,如果是Text类型,按照字典顺序排;Long类型,按自然数顺序;如果是自定义的对象,按照自定义的顺序排列。
如果内存缓冲区写满了,硬盘速度更不上,会map阻塞,直到内存中的数据写入到了"硬盘"中。
上面的操作会产生很多的小文件,到了后面依然会合并,合并的方式就是按照相同分区号的数据进行合并,例如:0号分区的数据进行合并。
最终得到的大文件也是分区且排序的大文件。
这时Map的任务就完成了。接下来是Reduce。
Reduce会到map中取数据,每一个Reduce会取前面的一部分数据,又进行合并和排序,再给Reduce进行计算,再写回到HDFS中。
在Q'Reilly的Hadoop权威指南中,有一小节,讲到了shuffle 和排序,下面的资料都是引用的,这里面的内容太庞大了,如果是想要学通还要花很多时间。
MapReduce确保每个reducer 的输人都按键排序。系统执行排序的过程一将map输出作为输入传给reducer- 称为 shuffle。“ 在此, 我们将学习shuffle 是如何工作的,因为它有助于我们理解工作机制(如果需要优化MapReduce 程序的话)。.shuffle属于不断被优化和改进的代码库的一部分,因此下面的描述有必要隐藏一些细节(也可能随时间而改变,目前是0.20 版本)。从许多方面来看,shufle 是MapReduce的“心脏”,是奇迹发生的地方。
在写磁盘之前,线程首先根据数据最终要传送到的 reducer 把数据划分成相应的分(partition) 。在每个分区中,后台线程按键进行内排序,如果有一个 combiner它会在排序后的输出上运行。
接下来是reduce端。Reduce通信是RPC,真正下载数据是HTTP。
在Hadoop 1.0中,MapReduce的老大叫JobTracker,作业跟踪器,负责监控全局的任务,又要分配资源,负责调度(监控)Map运行在那台机器上,在2.0中就拆分了,资源的分配交给ResourceManager,资源的监控交给MRAppMaster。
在官方文档中有:作业的提交与监控
Yarn之 MRAppMaster 概览
MRAppMaster详细分析
小弟TaskTracker会发送心跳,往老大请求任务,TaskTracker领到任务后,需要启动子进程Child(不叫YARN-Child),可以一台机器多个Child
假设前两个中运行的map,后一个运行的Reduce,
Hadoop 2.0中,MRAppMaster用来监控属于它任务的YARNChild。当不同机器时,还是使用RPC来通信。
MRAppMaster是MapReduce的ApplicationMaster实现,它使得MapReduce计算框架可以运行于YARN之上。在YARN中,MRAppMaster负责管理MapReduce作业的生命周期,包括创建MapReduce作业,向ResourceManager申请资源,与NodeManage通信要求其启动Container,监控作业的运行状态,当任务失败时重新启动任务等。
6. 例子:WordCount v2.0
这个例子在官方文档,例子:WordCount v2.0
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。