The Passion of This Generation: Learning How MapReduce Works (Part 1)
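This post is just my study notes, kept on my blog with some of my own understanding added so that I can tell things apart. There are certainly mistakes; please leave a comment and I will correct them as soon as I can.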
Why has MapReduce been phased out?
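I never agree that a programming language or framework gets completely phased out. Even when the programs become outdated, the ideas behind them keep influencing us.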
1. What is MapReduce
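For details, see the official documentation, or section "1、MapReduce理论简介 -> 1.1 MapReduce编程模型" (MapReduce theory, the MapReduce programming model) of the post "Hadoop集群(第6期)_WordCount运行详解" (Hadoop cluster, part 6: WordCount explained). Since this started out as my personal notes, I copied material from there. I also drew on "Hadoop开发--MapReduce编程--示例代码(九)" (Hadoop development: MapReduce programming, sample code, part 9).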
MapReduce Tutorial
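MapReduce is a processing model: an operation on a large data set is distributed to the worker nodes managed by a master node, and the final result is obtained by combining the intermediate results from each node. Put more simply, MapReduce hands a complex computation out to the nodes beneath it. That sounds like only a few steps and not very complicated, but think it through and you will see that coordinating the data on every node yourself is already time-consuming, before you even add status reports and heartbeats. MapReduce takes care of that tedious work so that developers can concentrate on processing the data. How does it do that? Through two phases (stages): Map and Reduce.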
1.1. map() and reduce()
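If all you need is simple distributed computation, you can think of it as implementing two functions, map() and reduce(). So what does the processing flow look like?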
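One more note: MapReduce is part of Hadoop, so its storage system is HDFS. The job's input is read from HDFS and its final output is written back to HDFS; the intermediate map output, however, is written to the local disks of the worker nodes rather than to HDFS (which is why the job counters below list FILE and HDFS bytes separately).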
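Start at the bottom of the diagram: the input is on the left and the output on the right. Reading from left to right, the parameters passed to map() and reduce() are key/value pairs that carry the information entering each step. To tell the stages apart, the key/value pairs of each stage are given names; some blogs aptly call this giving them nicknames.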
Phase | Input key/value pair | Output key/value pair |
---|---|---|
Map | <K1,V1> | <K2,V2> |
Reduce | <K2,{V2}> | <K3,V3> |
So if someone asks at which stage <K3,V3> appears, the answer is: it is the key/value pair output by the Reduce phase.
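To make the type flow in the table concrete, here is a minimal single-process sketch in plain Java, without Hadoop. The interfaces `MapFn` and `ReduceFn` and the method `run` are hypothetical names made up for illustration, and a `TreeMap` stands in for the shuffle that sorts and groups <K2,V2> into <K2,{V2}>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Single-process sketch of <K1,V1> -> <K2,V2> -> <K2,{V2}> -> <K3,V3>.
// MapFn, ReduceFn and run() are hypothetical illustration names, not Hadoop APIs.
final class TypeFlowSketch {

    // map: one <K1,V1> pair emits zero or more <K2,V2> pairs
    interface MapFn<K1, V1, K2, V2> {
        void map(K1 key, V1 value, BiConsumer<K2, V2> emit);
    }

    // reduce: one K2 with all of its values {V2} emits zero or more <K3,V3> pairs
    interface ReduceFn<K2, V2, K3, V3> {
        void reduce(K2 key, List<V2> values, BiConsumer<K3, V3> emit);
    }

    static <K1, V1, K2 extends Comparable<K2>, V2, K3, V3> List<Map.Entry<K3, V3>> run(
            List<Map.Entry<K1, V1>> input, MapFn<K1, V1, K2, V2> mapper, ReduceFn<K2, V2, K3, V3> reducer) {
        // stand-in for the shuffle: group V2 by K2, keys kept sorted by the TreeMap
        TreeMap<K2, List<V2>> grouped = new TreeMap<>();
        for (Map.Entry<K1, V1> in : input) {
            mapper.map(in.getKey(), in.getValue(),
                    (k2, v2) -> grouped.computeIfAbsent(k2, k -> new ArrayList<>()).add(v2));
        }
        List<Map.Entry<K3, V3>> output = new ArrayList<>();
        for (Map.Entry<K2, List<V2>> group : grouped.entrySet()) {
            reducer.reduce(group.getKey(), group.getValue(), (k3, v3) -> output.add(Map.entry(k3, v3)));
        }
        return output;
    }

    // WordCount expressed with these types: K1=Long offset, V1=String line, K2=K3=String, V2=V3=Long
    static List<Map.Entry<String, Long>> wordCount(List<Map.Entry<Long, String>> lines) {
        MapFn<Long, String, String, Long> mapper = (offset, line, emit) -> {
            for (String w : line.split(" ")) {
                emit.accept(w, 1L);
            }
        };
        ReduceFn<String, Long, String, Long> reducer = (word, ones, emit) -> {
            long sum = 0;
            for (long one : ones) {
                sum += one;
            }
            emit.accept(word, sum);
        };
        return run(lines, mapper, reducer);
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> lines = List.of(Map.entry(0L, "leaf yyh"), Map.entry(9L, "yyh xpleaf"));
        System.out.println(wordCount(lines)); // [leaf=1, xpleaf=1, yyh=2]
    }
}
```

The point of the generics is that the output type of each stage is the input type of the next, exactly as in the table; Hadoop additionally requires these types to be serializable Writables, which this sketch ignores.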
The following is copied from the post "Mapreduce 原理及程序分析" (MapReduce principles and program analysis):
Map
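1.1 Read the contents of the input file (on HDFS) and parse them into <key1, value1> pairs: each line of the input file becomes one key/value pair, and map is called once for each pair.
Note: the key is the starting position of the current line, measured in bytes (the first line starts at 0), and the value is the content of the line. There are as many key/value pairs as there are lines, and each pair triggers one call to map.
**Note the difference between a map task and the map function: the map function is only one step inside a map task.**
1.2 Override the map function with your own logic, which processes the input key1/value1 and emits new key2/value2 pairs.
1.3 Partition the output key2/value2 pairs (by default there is only one partition, because a job runs a single reduce task by default).
1.4 Within each partition, sort the data by key and group it: the values of the same key go into one collection.
1.5 (Optional) Combine (locally reduce) the grouped data.
One thing I would add: the input file is only divided logically (into input splits), not physically.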
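Step 1.3 can be sketched in plain Java. Hadoop's default `HashPartitioner` sends a record to partition `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`; the sketch below applies that formula to plain `String` keys, so the concrete partition numbers will generally differ from what Hadoop computes for `Text` keys, which hash their bytes differently. The class name `PartitionSketch` is hypothetical.

```java
import java.util.List;

// Sketch of hash partitioning on plain Strings (not Hadoop's Text, so numbers differ from a real job).
final class PartitionSketch {

    static int partition(String key, int numReduceTasks) {
        // "& Integer.MAX_VALUE" clears the sign bit, so a negative hashCode still gives a valid index
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        for (String word : List.of("leaf", "yyh", "xpleaf", "katy", "ling", "yeyonghao")) {
            // with the default single reduce task, every key lands in partition 0
            System.out.println(word + " -> " + partition(word, 1) + " (1 reducer), "
                    + partition(word, 3) + " (3 reducers)");
        }
    }
}
```

Because the partition depends only on the key, every occurrence of the same word ends up at the same reducer, which is what makes per-key totals possible.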
Reduce
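2.1 The outputs of the map tasks are copied over the network to the reduce nodes, partition by partition.
Note: which map output goes to which reduce node is decided by the partition.
2.2 Merge and sort the outputs of the map tasks, then write the reduce function with your own business logic: it processes the input key2 and values2 and emits new key3/value3 pairs.
Note: why is a merge needed? A reducer receives output from several map tasks, and only after merging it can the data be sorted as a whole. Does the number of key/value records change before and after grouping? No: grouping only collects the values of each key together, so the number of records stays the same, while the number of groups equals the number of distinct keys.
2.3 Save the reduce output to files (on HDFS).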
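Step 2.2 works because each map task's output is already sorted by key, so the reducer only has to merge several sorted runs. Below is a minimal sketch of such a k-way merge with a `PriorityQueue`, assuming small in-memory runs of <String, Long> records; Hadoop's own merge works on spill files on disk, and `MergeSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// k-way merge of already-sorted runs, one run per (hypothetical) map task.
final class MergeSketch {

    // one sorted run plus its current head record
    private static final class Cursor {
        Map.Entry<String, Long> head;
        final Iterator<Map.Entry<String, Long>> rest;

        Cursor(Iterator<Map.Entry<String, Long>> it) {
            this.rest = it;
            this.head = it.next();
        }

        boolean advance() {
            if (rest.hasNext()) {
                head = rest.next();
                return true;
            }
            return false;
        }
    }

    static List<Map.Entry<String, Long>> merge(List<List<Map.Entry<String, Long>>> sortedRuns) {
        // always take from the run whose head has the smallest key
        PriorityQueue<Cursor> queue =
                new PriorityQueue<>((a, b) -> a.head.getKey().compareTo(b.head.getKey()));
        for (List<Map.Entry<String, Long>> run : sortedRuns) {
            if (!run.isEmpty()) {
                queue.add(new Cursor(run.iterator()));
            }
        }
        List<Map.Entry<String, Long>> merged = new ArrayList<>();
        while (!queue.isEmpty()) {
            Cursor smallest = queue.poll();
            merged.add(smallest.head);
            if (smallest.advance()) {
                queue.add(smallest); // re-insert with its next record as the new head
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // hypothetical outputs of two map tasks, each already sorted by key
        List<Map.Entry<String, Long>> task1 = List.of(Map.entry("katy", 1L), Map.entry("leaf", 1L), Map.entry("yyh", 1L));
        List<Map.Entry<String, Long>> task2 = List.of(Map.entry("leaf", 1L), Map.entry("xpleaf", 1L), Map.entry("yyh", 1L));
        for (Map.Entry<String, Long> e : merge(List.of(task1, task2))) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

The merged output has 3 + 3 = 6 records in key order, so the merge itself does not change the record count.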
1.2. How does MapReduce coordinate work between nodes?
Introduction to MapReduce
Going a step further: how does MapReduce coordinate the work between nodes?
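The JobTracker tracks the work of the whole job. In Hadoop 1.0, the JobTracker both monitored all tasks globally and allocated resources, scheduling (and monitoring) which machine each map task runs on. In Hadoop 2.0 these duties were split: resource allocation went to the ResourceManager (RM), while scheduling and monitoring the tasks of each job went to that job's MRAppMaster.
The TaskTracker works under the JobTracker and runs the map or reduce tasks that do the actual computation. From Hadoop 2.0 on, its role is taken over by the NodeManager (NM).
Partitioning, sorting and merging all happen during the shuffle.
Grouping is done by K2: all V2 values with the same K2 are put into one collection.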
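Because the records reaching the reducer are sorted by K2, equal keys are adjacent and can be grouped in a single pass. The sketch below (plain Java, hypothetical `GroupSketch` class) groups the 10 sorted <word, 1> records of the word.txt example used later, and shows that grouping keeps all 10 values while producing one group per distinct key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// One-pass grouping of key-sorted <K2,V2> records into <K2,{V2}>.
final class GroupSketch {

    static List<Map.Entry<String, List<Long>>> group(List<Map.Entry<String, Long>> sorted) {
        List<Map.Entry<String, List<Long>>> groups = new ArrayList<>();
        String currentKey = null;
        List<Long> currentValues = null;
        for (Map.Entry<String, Long> record : sorted) {
            if (!record.getKey().equals(currentKey)) { // a new key starts a new group
                currentKey = record.getKey();
                currentValues = new ArrayList<>();
                groups.add(Map.entry(currentKey, currentValues));
            }
            currentValues.add(record.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        // the 10 map-output records from word.txt, already sorted by key
        List<Map.Entry<String, Long>> sorted = new ArrayList<>();
        for (String w : "katy katy leaf leaf ling xpleaf xpleaf yeyonghao yyh yyh".split(" ")) {
            sorted.add(Map.entry(w, 1L));
        }
        List<Map.Entry<String, List<Long>>> groups = group(sorted);
        System.out.println(groups); // [katy=[1, 1], leaf=[1, 1], ling=[1], xpleaf=[1, 1], yeyonghao=[1], yyh=[1, 1]]
        System.out.println(sorted.size() + " records, " + groups.size() + " groups"); // 10 records, 6 groups
    }
}
```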
2. The bundled WordCount example
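The sections above gave a brief introduction to MapReduce. So how do you use it? As mentioned before, MapReduce is only a processing flow; we have to give it some work before its distributed execution shows. The following sentence comes from another blog: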
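Word counting is one of the simplest programs and the one that best reflects the MapReduce idea; it can be called the "Hello World" of MapReduce. The complete code of the program can be found in the "src/examples" directory of the Hadoop installation package.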
Here I borrowed from:
Example: WordCount v1.0
MapReduce Tutorial
运行Hadoop自带的wordcount单词统计程序 (Running Hadoop's bundled wordcount program)
Hadoop入门-WordCount示例 (Getting started with Hadoop: the WordCount example)
The very simplest thing is to run the official prebuilt JAR ourselves: go to the /hadoop-2.4.1/share/hadoop/mapreduce directory and find hadoop-mapreduce-examples-2.4.1.jar.
How do you use it? You can read the documentation, of course, or if you are lazy, do this:
```
hadoop jar <path to the JAR> <fully qualified name of the class containing main> <arguments>
```
This works like running any other JAR, except that the command starts with hadoop.
```
[root@vmcentos mapreduce]# hadoop jar /hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar wordcount
```
Run like that, it prints:
```
Usage: wordcount <in> <out>
```
It needs something to read and somewhere to write.
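So I uploaded a word.txt file to HDFS (just a file of words I picked, taken from one of the blogs referenced above) and set the output to /wcout. Both the input and the output are HDFS paths. For the details of those operations see the HDFS post; I won't repeat them here.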
```
[root@vmcentos sbin]# hadoop jar /hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar wordcount /word.txt /wcout
```
If nothing else goes wrong, the job prints log output like this:
```
20/02/13 06:06:39 INFO client.RMProxy: Connecting to ResourceManager at vmcentos/192.168.123.155:8032
20/02/13 06:06:41 INFO input.FileInputFormat: Total input paths to process : 1
20/02/13 06:06:41 INFO mapreduce.JobSubmitter: number of splits:1
20/02/13 06:06:41 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1581544645445_0001
20/02/13 06:06:41 INFO impl.YarnClientImpl: Submitted application application_1581544645445_0001
20/02/13 06:06:41 INFO mapreduce.Job: The url to track the job: http://vmcentos:8088/proxy/application_1581544645445_0001/
20/02/13 06:06:41 INFO mapreduce.Job: Running job: job_1581544645445_0001
20/02/13 06:06:47 INFO mapreduce.Job: Job job_1581544645445_0001 running in uber mode : false
20/02/13 06:06:47 INFO mapreduce.Job: map 0% reduce 0%
20/02/13 06:06:50 INFO mapreduce.Job: map 100% reduce 0%
20/02/13 06:06:54 INFO mapreduce.Job: map 100% reduce 100%
20/02/13 06:06:55 INFO mapreduce.Job: Job job_1581544645445_0001 completed successfully
20/02/13 06:06:56 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=78
        FILE: Number of bytes written=185875
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=150
        HDFS: Number of bytes written=48
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=1651
        Total time spent by all reduces in occupied slots (ms)=1808
        Total time spent by all map tasks (ms)=1651
        Total time spent by all reduce tasks (ms)=1808
        Total vcore-seconds taken by all map tasks=1651
        Total vcore-seconds taken by all reduce tasks=1808
        Total megabyte-seconds taken by all map tasks=1690624
        Total megabyte-seconds taken by all reduce tasks=1851392
    Map-Reduce Framework
        Map input records=5
        Map output records=10
        Map output bytes=97
        Map output materialized bytes=78
        Input split bytes=94
        Combine input records=10
        Combine output records=6
        Reduce input groups=6
        Reduce shuffle bytes=78
        Reduce input records=6
        Reduce output records=6
        Spilled Records=12
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=111
        CPU time spent (ms)=620
        Physical memory (bytes) snapshot=218611712
        Virtual memory (bytes) snapshot=730071040
        Total committed heap usage (bytes)=136908800
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=56
    File Output Format Counters
        Bytes Written=48
```
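The Map-Reduce Framework counters show a combiner at work: Map output records=10 and Combine input records=10, but Combine output records=6, so only 6 records went through the shuffle (Reduce input records=6). The bundled example registers its summing reducer as the combiner as well. Here is a plain-Java sketch of what that local summing does inside the single map task, assuming the 10 emitted words arrive in file order; `CombinerSketch` is a hypothetical name, not the example's actual class.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// What a summing combiner does locally, before the shuffle, inside one map task.
final class CombinerSketch {

    static Map<String, Long> combine(List<String> mapOutputKeys) {
        Map<String, Long> combined = new TreeMap<>(); // sorted by key, like a map task's spill output
        for (String word : mapOutputKeys) {
            combined.merge(word, 1L, Long::sum); // every map output record carried the value 1
        }
        return combined;
    }

    public static void main(String[] args) {
        // the 10 words emitted by the single map task for word.txt, in file order
        List<String> emitted = List.of("leaf", "yyh", "yyh", "xpleaf", "katy", "ling",
                "yeyonghao", "leaf", "xpleaf", "katy");
        Map<String, Long> out = combine(emitted);
        System.out.println("Combine input records=" + emitted.size()); // Combine input records=10
        System.out.println("Combine output records=" + out.size());    // Combine output records=6
        System.out.println(out); // {katy=2, leaf=2, ling=1, xpleaf=2, yeyonghao=1, yyh=2}
    }
}
```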
The result directory set in the command is /wcout:
```
[root@vmcentos sbin]# hadoop dfs -ls /wcout
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
Found 2 items
-rw-r--r-- 1 root supergroup 0 2020-02-13 06:06 /wcout/_SUCCESS
-rw-r--r-- 1 root supergroup 48 2020-02-13 06:06 /wcout/part-r-00000
```
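To see the actual result, run hadoop dfs -cat /wcout/part-r-00000 (or, as the deprecation warning above suggests, hdfs dfs -cat /wcout/part-r-00000).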
3. How WordCount executes
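I should apologize: I didn't originally plan to publish this on my blog, so both the example above and the one walked through here come from someone else's blog. That post only ran the example, though; it had no walkthrough like this section.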
First there is word.txt on HDFS, which holds, exactly as written, the raw data we want to process. From our point of view it looks like this:
```
leaf yyh
yyh xpleaf
katy ling
yeyonghao leaf
xpleaf katy
```
map() and reduce()
Map
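As mentioned earlier, this split does not cut the physical file; it is only a logical division. Each line becomes one pair whose key is the byte offset at which the line starts, and the line break counts as one byte (a Unix `\n`; a Windows `\r\n` would count as two):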
```
<0,"leaf yyh">
<9,"yyh xpleaf">
<20,"katy ling">
<30,"yeyonghao leaf">
<45,"xpleaf katy">
```
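These offsets can be reproduced with a few lines of plain Java. This is a sketch that mimics the keys produced for line-oriented text input; it assumes UTF-8 text and Unix `\n` line endings, and `OffsetSketch` is a hypothetical name, not Hadoop's record reader.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Computes the <K1> keys (start-of-line byte offsets) for line-oriented text.
// Assumes UTF-8 and Unix line endings; not Hadoop's LineRecordReader.
final class OffsetSketch {

    static List<Long> lineOffsets(String fileContent) {
        List<Long> offsets = new ArrayList<>();
        long offset = 0;
        for (String line : fileContent.split("\n")) {
            offsets.add(offset);
            // the next line starts after this line's bytes plus the 1-byte line break
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return offsets;
    }

    public static void main(String[] args) {
        String wordTxt = "leaf yyh\nyyh xpleaf\nkaty ling\nyeyonghao leaf\nxpleaf katy";
        System.out.println(lineOffsets(wordTxt)); // [0, 9, 20, 30, 45]
        System.out.println(wordTxt.getBytes(StandardCharsets.UTF_8).length + " bytes"); // 56 bytes
    }
}
```

The five lines add up to 56 bytes, which matches Bytes Read=56 in the File Input Format Counters of the run above (the file evidently has no trailing newline).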
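To split the lines, you can write the code below (the full implementation is in the next section). I added this explanation later: in <0,"leaf yyh">, 0 is the key, the byte offset of the line within the file, which this step does not need; "leaf yyh" is the value, and that is what we actually process. Since the words are separated by spaces, we split on a space: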
```java
String[] words = line.split(" ");
```
Use a loop to take out each word:
```java
for (String word : words) {
    // the output must also be a <k,v> pair
    context.write(word, 1);
}
```
The value here is 1: it counts one occurrence of the word, and is not an index starting from 0.
As shown later, the actual arguments use Hadoop types: new Text(word), new LongWritable(1).
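Stripped of Hadoop types, the map logic is just "split the value, emit <word, 1>". Below is a plain-Java sketch in which the hypothetical `MapLogicSketch.map` returns the pairs instead of calling `context.write`. It also shows a caveat of `split(" ")`: two consecutive spaces produce an empty token, which would then be counted as a word. Trimming the line and splitting on `"\\s+"` would avoid that, at the cost of diverging from the code in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java version of the map() body: ignore the offset key, split the line, emit <word, 1>.
final class MapLogicSketch {

    static List<Map.Entry<String, Long>> map(long offset, String line) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String word : line.split(" ")) {
            out.add(Map.entry(word, 1L)); // 1 = "seen once", a count, not an index
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(map(0, "leaf yyh"));  // [leaf=1, yyh=1]
        // caveat: a double space yields an empty token, which would be counted too
        System.out.println(map(0, "leaf  yyh")); // [leaf=1, =1, yyh=1]
    }
}
```

Over the five lines of word.txt this produces 10 pairs, matching Map output records=10 in the job counters.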
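As mentioned at the beginning, a shuffle also takes place at this point to partition, sort and group the data; I cover it in a separate post. All of it can be customized; see "MapReduce的自定义排序、分区和分组" (custom sorting, partitioning and grouping in MapReduce).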
Reduce
After the processing above, the data arriving here looks like this (the shuffle delivers the keys in sorted order):
```
<katy,{1,1}>
<leaf,{1,1}>
<ling,{1}>
<xpleaf,{1,1}>
<yeyonghao,{1}>
<yyh,{1,1}>
```
Now we need to take that data and write the output. V2s stands for the collection of V2 values from the previous step:
```java
long counter = 0;
for (LongWritable v : V2s) {
    counter += v.get();
}
// output
context.write(key, counter);
```
With the real types, the output call becomes context.write(key, new LongWritable(counter)).
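In plain Java the reduce body is a simple sum. The sketch below (hypothetical `ReduceLogicSketch`) also shows why the same logic can double as a combiner: addition is associative and commutative, so summing raw 1s and summing partial counts give the same total.

```java
import java.util.List;

// Plain-Java version of the reduce() body: add up every V2 received for one key.
final class ReduceLogicSketch {

    static long reduce(String key, Iterable<Long> values) {
        long counter = 0;
        for (long v : values) {
            counter += v;
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println("katy " + reduce("katy", List.of(1L, 1L))); // katy 2 (raw map output)
        System.out.println("katy " + reduce("katy", List.of(2L)));     // katy 2 (after a combiner)
    }
}
```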
So the final result is:
```
katy 2
leaf 2
ling 1
xpleaf 2
yeyonghao 1
yyh 2
```
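These six lines account exactly for Bytes Written=48 in the counters and the 48-byte part-r-00000 in the listing: with the default text output, each record is written as the key, a tab, the value and a newline. A plain-Java check of that arithmetic (the class `OutputSizeSketch` is hypothetical):

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Formats records as key TAB value NEWLINE and counts the bytes.
final class OutputSizeSketch {

    static String format(Map<String, Long> counts) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            sb.append(e.getKey()).append('\t').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Long> result = new LinkedHashMap<>();
        result.put("katy", 2L);
        result.put("leaf", 2L);
        result.put("ling", 1L);
        result.put("xpleaf", 2L);
        result.put("yeyonghao", 1L);
        result.put("yyh", 2L);
        String text = format(result);
        System.out.print(text);
        System.out.println(text.getBytes(StandardCharsets.UTF_8).length + " bytes"); // 48 bytes
    }
}
```

Line by line that is 7 + 7 + 7 + 9 + 12 + 6 = 48 bytes.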
4. Implementing the WordCount example yourself
Now that we know the execution flow, why not write a simple WordCount ourselves? Other blogs abbreviate WordCount as WC; that is a bit ambiguous, but not a problem.
If you are not using Maven, you have to import these JARs in addition to Hadoop's big pile of JARs.
If this is your first time using Maven, you can add a mirror hosted in China as described here:
https://blog.csdn.net/qq_28553681/article/details/80940663
Also note that the keys and values passed to context must use Hadoop's Writable types instead of the plain Java types, for example:
Long->LongWritable
String->Text
To implement map, create a class named WCMapper that extends Mapper and overrides map, like this:
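```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// <K1,V1> = <LongWritable, Text>, <K2,V2> = <Text, LongWritable>
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // TODO: the default implementation just passes the input pair through
        super.map(key, value, context);
    }
}
```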
When serializing and deserializing, pay attention to the order and the types of the fields.
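What "order and types" means in practice: Hadoop's `Writable` interface has `write(DataOutput)` and `readFields(DataInput)`, and `readFields` must read back exactly what `write` wrote, in the same order. The sketch below uses only `java.io`, so it runs without Hadoop; `WordStat` is a hypothetical record type and deliberately does not implement the real `Writable` interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical type with Writable-shaped methods, written against plain java.io.
final class WordStat {
    String word;
    long count;

    WordStat() { } // Hadoop creates Writables by reflection, so they need a no-arg constructor

    WordStat(String word, long count) {
        this.word = word;
        this.count = count;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(word);   // field 1: String
        out.writeLong(count); // field 2: long
    }

    void readFields(DataInput in) throws IOException {
        word = in.readUTF();   // must mirror write(): same order...
        count = in.readLong(); // ...and same types
    }

    // serialize, then deserialize into a fresh object
    static WordStat roundTrip(WordStat original) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));
        WordStat copy = new WordStat();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        WordStat copy = roundTrip(new WordStat("leaf", 2));
        System.out.println(copy.word + " " + copy.count); // leaf 2
    }
}
```

Swapping the two reads in readFields would make readUTF interpret the bytes of the long as a string length, which is exactly the kind of bug the warning above is about.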
Now flesh out WCMapper the same way as in the execution-flow section:
WCMapper
```java
package cn.cyqsd.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input: <byte offset, line>; output: <word, 1>
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // split the line into words on single spaces
        String[] words = line.split(" ");
        for (String w : words) {
            // emit one <word, 1> pair per occurrence
            context.write(new Text(w), new LongWritable(1));
        }
    }
}
```
WCReducer
```java
package cn.cyqsd.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Input: <word, {1, 1, ...}>; output: <word, total count>
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // receive the grouped values for this key and add them up
        long counter = 0;
        for (LongWritable i : values) {
            counter += i.get();
        }
        // output
        context.write(key, new LongWritable(counter));
    }
}
```
WordCount
The WordCount class contains the main method:
```java
package cn.cyqsd.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // the Job object
        Job job = Job.getInstance(new Configuration());
        // lets Hadoop locate the JAR that contains this class
        job.setJarByClass(WordCount.class);

        // Mapper and its output types <K2, V2>
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/word.txt"));

        // Reducer and the final output types <K3, V3>
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // the output directory must not exist yet, otherwise submission fails
        FileOutputFormat.setOutputPath(job, new Path("/wcout1"));

        // submit, print progress, and exit with a non-zero status if the job fails
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
Once everything is written, package it into a JAR with WordCount as the main class.
5. Additional errors
What follows is some extra material, the usual troubleshooting section:
If you get an error about yarn/util/Apps:
```
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/util/Apps
```
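add the JAR that contains this class (org.apache.hadoop.yarn.util.Apps is part of hadoop-yarn-common), or add it as a Maven dependency.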
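If IntelliJ IDEA reports "Error : java 不支持发行版本5" (Java does not support release version 5), change the project's Java version as described in the post of that title.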
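If you see:
```
Exception in thread "main" java.io.IOException: Mkdirs failed to create <path>\META-INF\license
```
delete that directory inside the JAR. This typically happens on case-insensitive file systems such as Windows, where the JAR contains both a META-INF/LICENSE file and a META-INF/license directory, and they cannot both be unpacked.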
Hadoop java.io.IOException: Mkdirs failed to create /some/path
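After these fixes the job ran. Its log follows (note that this run is of a different job, cn.cyqsd.hadoop.mr.dc.DataCount):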
```
C:\Users\Administrator\IdeaProjects\untitled1\out\artifacts\untitled1_jar>hadoop jar untitled1.jar cn.cyqsd.hadoop.mr.dc.DataCount /data.dat /dataout
20/02/16 05:26:56 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
20/02/16 05:26:58 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/02/16 05:27:01 INFO input.FileInputFormat: Total input paths to process : 1
20/02/16 05:27:02 INFO mapreduce.JobSubmitter: number of splits:1
20/02/16 05:27:03 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1581791945853_0001
20/02/16 05:27:05 INFO impl.YarnClientImpl: Submitted application application_1581791945853_0001
20/02/16 05:27:05 INFO mapreduce.Job: The url to track the job: http://DESKTOP-47RU81S:8088/proxy/application_1581791945853_0001/
20/02/16 05:27:05 INFO mapreduce.Job: Running job: job_1581791945853_0001
20/02/16 05:27:24 INFO mapreduce.Job: Job job_1581791945853_0001 running in uber mode : false
20/02/16 05:27:24 INFO mapreduce.Job: map 0% reduce 0%
20/02/16 05:27:31 INFO mapreduce.Job: map 100% reduce 0%
20/02/16 05:27:40 INFO mapreduce.Job: map 100% reduce 100%
20/02/16 05:27:41 INFO mapreduce.Job: Job job_1581791945853_0001 completed successfully
20/02/16 05:27:42 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1122
        FILE: Number of bytes written=189695
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=2323
        HDFS: Number of bytes written=551
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=5274
        Total time spent by all reduces in occupied slots (ms)=6008
        Total time spent by all map tasks (ms)=5274
        Total time spent by all reduce tasks (ms)=6008
        Total vcore-seconds taken by all map tasks=5274
        Total vcore-seconds taken by all reduce tasks=6008
        Total megabyte-seconds taken by all map tasks=5400576
        Total megabyte-seconds taken by all reduce tasks=6152192
    Map-Reduce Framework
        Map input records=22
        Map output records=22
        Map output bytes=1072
        Map output materialized bytes=1122
        Input split bytes=95
        Combine input records=0
        Combine output records=0
        Reduce input groups=21
        Reduce shuffle bytes=1122
        Reduce input records=22
        Reduce output records=21
        Spilled Records=44
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=48
        CPU time spent (ms)=1654
        Physical memory (bytes) snapshot=431742976
        Virtual memory (bytes) snapshot=610852864
        Total committed heap usage (bytes)=383778816
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=2228
    File Output Format Counters
        Bytes Written=551
```
6. Source analysis of the MapReduce job submission process
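Since these are only notes, I expect few people will read this section, and with limited time I have not fully organized it or verified every step. The previous article contains the flowchart below; here we briefly analyze the source code of steps 1, 2, 3 and 4. The differences between Hadoop 1.x and 2.x were also covered earlier; keep them apart and don't mix them up.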
Let's start from the last step of the job, the submission call job.waitForCompletion(true);, and step into public boolean waitForCompletion(boolean verbose):
```java
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
```
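It contains:
```java
if (state == JobState.DEFINE) {
  submit();
}
```
This checks the job's current state: only while the job is still in the DEFINE state does it go on to call submit(), so let's step into that method: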
```java
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
```
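submit() returns as soon as the job has been submitted.
The call setUseNewAPI(); hints that there is also an old API (the org.apache.hadoop.mapred package).
submitJobInternal is called inside an anonymous PrivilegedExceptionAction passed to ugi.doAs(...). This matters: the submission runs as the job's user.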
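The ensureState(JobState.DEFINE) / state = JobState.RUNNING pair in submit(), together with the `if (state == JobState.DEFINE)` guard in waitForCompletion, forms a small state machine. Here is a plain-Java sketch of it, where `MiniJob` is a hypothetical stand-in for `Job` that submits nothing; the real class also throws an `IllegalStateException` when the state is wrong.

```java
// Sketch of the DEFINE -> RUNNING check in Job.submit() and Job.waitForCompletion().
// MiniJob is a hypothetical stand-in; nothing is actually submitted.
final class MiniJob {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;

    private void ensureState(JobState expected) {
        if (state != expected) {
            throw new IllegalStateException("Job in state " + state + " instead of " + expected);
        }
    }

    void submit() {
        ensureState(JobState.DEFINE); // a job can be submitted only once
        // (the real method connects to the cluster and calls submitJobInternal here)
        state = JobState.RUNNING;
    }

    void waitForCompletion() {
        if (state == JobState.DEFINE) { // submit only if nobody has submitted yet
            submit();
        }
        // (the real method then polls or prints progress until the job finishes)
    }

    JobState state() {
        return state;
    }

    public static void main(String[] args) {
        MiniJob job = new MiniJob();
        job.submit();
        job.waitForCompletion();         // no second submit, so no exception
        System.out.println(job.state()); // RUNNING
        try {
            job.submit();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Job in state RUNNING instead of DEFINE
        }
    }
}
```

This is why calling job.submit() and then job.waitForCompletion(true) is safe, while calling submit() twice is not.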
connect();
```java
private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster =
      ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                 public Cluster run()
                        throws IOException, InterruptedException,
                               ClassNotFoundException {
                   return new Cluster(getConfiguration());
                 }
               });
  }
}
```
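Here cluster is a member field of the Job class, and cluster holds the client-side proxy used to talk to the ResourceManager process.
A client that wants to communicate over RPC has to hold such a proxy object.
ugi is an instance of UserGroupInformation (in org.apache.hadoop.security), which is used to check the user and its permissions.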
Next, step into new Cluster(getConfiguration());. Its body is this(null, conf);, which in turn calls the overloaded constructor:
```java
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  this.conf = conf;
  this.ugi = UserGroupInformation.getCurrentUser();
  initialize(jobTrackAddr, conf);
}
```
initialize(jobTrackAddr, conf);
```java
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  synchronized (frameworkLoader) {
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }
        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        }
        else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      }
      catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to error: " + e.getMessage());
      }
    }
  }
  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}
```
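private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) takes a synchronized lock on frameworkLoader while it loops over ClientProtocolProvider provider : frameworkLoader. frameworkLoader is a java.util.ServiceLoader of ClientProtocolProvider implementations, which keeps the design flexible: whichever provider accepts the configured framework (MRConfig.FRAMEWORK_NAME, i.e. mapreduce.framework.name, as the exception message hints) supplies the proxy object. In each attempt a clientProtocol is created through provider.create, declared as: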
```java
public abstract ClientProtocol create(Configuration conf) throws IOException;

public abstract ClientProtocol create(InetSocketAddress addr,
    Configuration conf) throws IOException;
```
The result is held through the ClientProtocol interface, public interface ClientProtocol extends VersionedProtocol, which has 38 communication protocols, each with a version number.
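Once the proxy object is obtained, it is assigned with client = clientProtocol;, where client is a member field of Cluster:
```java
private ClientProtocol client;
```
So Cluster holds the RPC proxy object, and the Job class in turn holds cluster.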
At this point connect(); has finished.
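The provider loop in initialize() boils down to "ask each provider in turn and keep the first non-null client". Below is a plain-Java sketch of that selection with a simplified string-valued framework name. `ProviderSketch`, `Provider` and `Client` are hypothetical; the labels "LocalJobRunner" and "YARNRunner" only stand for the local and YARN clients and are plain strings here.

```java
import java.util.List;

// "First provider that returns a non-null client wins", as in Cluster.initialize().
// Provider, Client and the string framework name are simplified, hypothetical stand-ins.
final class ProviderSketch {

    static final class Client {
        final String implementation;

        Client(String implementation) {
            this.implementation = implementation;
        }
    }

    interface Provider {
        // returns null when this provider does not handle the configured framework
        Client create(String frameworkName);
    }

    static final Provider LOCAL = fw -> "local".equals(fw) ? new Client("LocalJobRunner") : null;
    static final Provider YARN = fw -> "yarn".equals(fw) ? new Client("YARNRunner") : null;

    static Client pick(List<Provider> providers, String frameworkName) {
        for (Provider provider : providers) {
            Client client = provider.create(frameworkName);
            if (client != null) {
                return client; // corresponds to the break in initialize()
            }
        }
        // the real code throws an IOException ("Cannot initialize Cluster...") here
        throw new IllegalStateException("Cannot initialize Cluster for framework " + frameworkName);
    }

    public static void main(String[] args) {
        List<Provider> providers = List.of(LOCAL, YARN);
        System.out.println(pick(providers, "yarn").implementation);  // YARNRunner
        System.out.println(pick(providers, "local").implementation); // LocalJobRunner
    }
}
```

The design choice is that Cluster never names a concrete client class: adding a new execution framework only means registering another provider.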
Back to the JobSubmitter submitter obtained earlier through getJobSubmitter(cluster.getFileSystem(), cluster.getClient());. Here cluster.getClient() is simply:
```java
ClientProtocol getClient() {
  return client;
}
```
Next comes submitter.submitJobInternal(Job.this, cluster);, that is, JobStatus submitJobInternal(Job job, Cluster cluster) in the JobSubmitter class of org.apache.hadoop.mapreduce. It contains:
```java
//validate the jobs output specs
checkSpecs(job);
```
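which validates the job's output specification; for file output, this is where submission fails if the output directory already exists.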
```java
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
    job.getConfiguration());
```
Here the client asks for a Path, using the server proxy held by cluster. Stepping into public static Path getStagingDir(Cluster cluster, Configuration conf), it begins like this:
```java
Path stagingArea = cluster.getStagingAreaDir();
FileSystem fs = stagingArea.getFileSystem(conf);
String realUser;
String currentUser;
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
realUser = ugi.getShortUserName();
currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
```
The getStagingAreaDir method is declared in the ClientProtocol interface; Cluster wraps it like this:
```java
/**
 * Grab the jobtracker's view of the staging directory path where
 * job-specific files will be placed.
 *
 * @return the staging directory where job-specific files are to be placed.
 */
public Path getStagingAreaDir() throws IOException, InterruptedException {
  if (stagingAreaDir == null) {
    stagingAreaDir = new Path(client.getStagingAreaDir());
  }
  return stagingAreaDir;
}
```
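In new Path(client.getStagingAreaDir());, client is only the client-side proxy; the code that actually communicates with the server lives in the ClientProtocol implementation, which looks like this: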
```java
public String getSystemDir() throws IOException, InterruptedException {
  return this.resMgrDelegate.getSystemDir();
}
```
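Stepping into getSystemDir() and, likewise, getStagingAreaDir, the Path is obtained from the configuration. This is how the client learns the path under which the job JAR will be stored. (As far as I can tell, in YARN mode the delegate computes these paths locally from the configuration rather than asking the ResourceManager over RPC.)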
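Further on, JobID jobId = submitClient.getNewJobID(); obtains a job ID through submitClient. Then copyAndConfigureFiles(job, submitJobDir); copies the job's resources (the job JAR plus any -files, -libjars and -archives) to HDFS. The replication variable shows that the JAR is written with 10 replicas by default (Job.SUBMIT_REPLICATION), a value that can be changed in the configuration.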
```java
/**
 * configure the jobconf of the user with the command line options of
 * -libjars, -files, -archives.
 * @param conf
 * @throws IOException
 */
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
throws IOException {
  Configuration conf = job.getConfiguration();
  short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
  copyAndConfigureFiles(job, jobSubmitDir, replication);
  // Set the working directory
  if (job.getWorkingDirectory() == null) {
    job.setWorkingDirectory(jtFs.getWorkingDirectory());
  }
}
```
```java
// configures -files, -libjars and -archives.
private void copyAndConfigureFiles(Job job, Path submitJobDir, short replication)
```
which copies the temporary files:
```java
// get all the command line arguments passed in by the user conf
String files = conf.get("tmpfiles");
String libjars = conf.get("tmpjars");
String archives = conf.get("tmparchives");
String jobJar = job.getJar();
```
Back in JobStatus submitJobInternal(Job job, Cluster cluster), further down comes int maps = writeSplits(job, submitJobDir);, under the comment // Create the splits for the job. It decides how many map tasks are started: one per input split.
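How many splits, and therefore map tasks, does writeSplits produce? For the new-API `FileInputFormat`, my understanding is that the split size is `max(minSize, min(maxSize, blockSize))` and a file is cut into pieces of that size, except that a final piece up to 10% larger than a split (`SPLIT_SLOP = 1.1`) is kept whole. Below is a simplified plain-Java sketch for a single uncompressed file that ignores block locations, assuming a 128 MB block size; `SplitSketch` is a hypothetical name. For the 56-byte word.txt it gives 1 split, matching "number of splits:1" in the log.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified split computation for one uncompressed file (no block locations, no empty-file case).
final class SplitSketch {
    static final double SPLIT_SLOP = 1.1; // allow the last split to be up to 10% larger

    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // returns the length of each split
    static List<Long> splits(long fileLength, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long remaining = fileLength;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            lengths.add(remaining); // the tail, possibly slightly larger than splitSize
        }
        return lengths;
    }

    public static void main(String[] args) {
        long block = 128L * 1024 * 1024; // assumed 128 MB block size
        long size = splitSize(block, 1, Long.MAX_VALUE);
        System.out.println(splits(56, size).size());                 // 1
        System.out.println(splits(300L * 1024 * 1024, size).size()); // 3
        System.out.println(splits(135L * 1024 * 1024, size).size()); // 1 (135/128 is below 1.1)
    }
}
```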
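After that, under the source comment "Now, actually submit the job (using the submit name)", the call status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); submits this information to the ResourceManager.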
This work is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.