This post is just my study notes, kept on my blog. I have mixed in some of my own understanding so I can tell things apart, which means errors are bound to exist; please leave a comment to contact me and I will correct them as soon as I can.
Why is MapReduce said to be obsolete?
I never agree that any programming language or framework becomes completely obsolete. Even when the program itself is outdated, the ideas behind it keep influencing us.

1. What is MapReduce

For details, see the official introduction, or section 1 (MapReduce理论简介) -> 1.1 (MapReduce编程模型) of 《Hadoop集群(第6期)_WordCount运行详解》. Since this started out as nothing more than my own notes, I copied material over. I also referred to 《Hadoop开发--MapReduce编程--示例代码(九)》 and the
MapReduce Tutorial

MapReduce is a processing flow: it distributes operations on a large data set to worker nodes coordinated by a master node, and then combines the intermediate results from those nodes into the final result. Put even more simply, MapReduce takes a computation that would otherwise be complicated and hands it out to the nodes underneath. Said like that it sounds like only a few steps and nothing very complex, but if you think it through, just coordinating the data on every node yourself is already time consuming, never mind the status reports and heartbeats on top of that. MapReduce lets us stop worrying about that tedious work so developers can focus on processing the data. How is that achieved? Mainly through two steps (phases): Map and Reduce.

1.1. map() and reduce()

If all you want is simple distributed computation, you can think of it as implementing the two functions map() and reduce(). So what does the processing flow roughly look like?
One extra note: MapReduce is part of Hadoop, so its storage system is HDFS, and the data produced during the computation is saved to HDFS.

20200219020814284_24194.webp

Start reading the figure from the bottom: the left side is the input and the right side is the output. Reading from left to right, the parameters passed into map() and reduce() are key/value pairs, representing the information entering the processing flow. To keep the stages apart, the data (key/value pairs) of each stage is given a name; some blogs aptly call this giving them nicknames.

| Phase  | Input key/value | Output key/value |
| ------ | --------------- | ---------------- |
| Map    | <K1,V1>         | <K2,V2>          |
| Reduce | <K2,{V2}>       | <K3,V3>          |

In effect, if someone asks what stage <K3,V3> belongs to, the answer is: it is the key/value pair output by the Reduce phase.
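These names map directly onto the generic parameters of Hadoop's Mapper and Reducer classes. A minimal sketch (the class names MyMapper/MyReducer are just placeholders; the concrete Writable types happen to match the WordCount example later in this post):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper<K1, V1, K2, V2>: consumes <K1,V1>, emits <K2,V2>
class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { }

// Reducer<K2, V2, K3, V3>: consumes <K2,{V2}>, emits <K3,V3>
class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { }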
The content below is copied from: 《Mapreduce 原理及程序分析》

Map
1.1 Read the contents of the input file (on HDFS) and parse them into key1/value1 pairs: every line of the input file becomes one key/value pair, and the map function is called once per pair.
Note: the key is the starting position of the current line, measured in bytes (the first line starts at 0), and the value is the content of that line. However many lines there are, that many key/value pairs are produced, and each pair triggers one call to the map function.

**Note the difference between a map task and the map function: the map function is only one step inside a map task.**

1.2 Override the map function with your own logic: process the incoming key1/value1 and emit new key2/value2 pairs.
1.3 Partition the emitted key2/value2 pairs. (By default there is only one partition.)
1.4 Within each partition, sort and group the data by key; values sharing the same key are put into one collection.
1.5 (Optional) Combine (locally reduce) the grouped data; see the sketch after the note below.

Let me add one more thing: the splitting of the file here is only a logical split, not a physical one.
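About step 1.5: a minimal, hedged sketch of wiring a combiner into the driver from section 4 below. The class name WithCombiner is made up; WCReducer is the reducer written later in this post, and reusing it as the combiner is only valid because word-count addition is associative and commutative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class WithCombiner {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Step 1.5: run the reduce logic locally on each map's output before the shuffle,
        // so only pre-summed <word, count> pairs travel over the network.
        job.setCombinerClass(cn.cyqsd.hadoop.mr.WCReducer.class);
    }
}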

Reduce
2.1 The outputs of the various map tasks are copied over the network to different reduce nodes, according to their partitions.
Note: which map output goes to which reduce node is decided by the partition.
2.2 Merge and sort the outputs of the map tasks, then write your own business logic in the reduce function: process the incoming key2/values2 and emit new key3/value3 pairs.
Note: why is the merge needed? Because the results of several map tasks have to be merged together before they can be sorted. Does the number of key/value pairs change before and after grouping? Answer: no, it does not.
2.3 Save the reduce output to a file (on HDFS).

1.2. How does MapReduce coordinate work between the nodes?

Reference: 《MapReduce简介》
Going one step further: how does MapReduce coordinate the work between the nodes?

20200219020507453_9778.webp

The JobTracker is the tracker of the whole job; from Hadoop 2.0 on it is called the RM (ResourceManager). In Hadoop 1.0, MapReduce had the JobTracker (job tracker) both monitoring all tasks globally and allocating resources, scheduling (and monitoring) which machine each Map ran on. In 2.0 this was split apart: resource allocation was handed to the ResourceManager, while scheduling and monitoring of the job's own tasks went to the MRAppMaster.

The TaskTracker is the tracker working under the JobTracker; below it are the Map or Reduce tasks that do the actual computation. From Hadoop 2.0 on the TaskTracker is called the NM (NodeManager).

20200219014650498_11441.webp

Partitioning, sorting and merging are all done during the shuffle.
Records are grouped by K2, and the V2 values of each group are put into one collection.
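As a hedged sketch of how partitions relate to reducers (the class name PartitionSetup is made up): the number of partitions equals the number of reduce tasks, and by default records are routed by HashPartitioner, i.e. by the hash of K2 modulo the number of reducers.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PartitionSetup {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setNumReduceTasks(2);                       // two partitions -> two reduce tasks
        job.setPartitionerClass(HashPartitioner.class); // hash-based routing (already the default)
    }
}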

2. The bundled WordCount example

MapReduce has now been briefly introduced, so how do we actually use it? As mentioned earlier, MapReduce is only a processing flow; we have to "find it something to do" before its distributed execution shows up. The following sentence comes from another blog:

Word count is one of the simplest programs and one of those that best embodies the MapReduce idea; it can be called the MapReduce version of "Hello World". The complete code of this program can be found under the "src/examples" directory of the Hadoop installation package.

The following were used as references:

Example: WordCount v1.0
MapReduce Tutorial
运行Hadoop自带的wordcount单词统计程序
Hadoop入门-WordCount示例

The very simplest thing is to run the official pre-built jar ourselves, so go into the /hadoop-2.4.1/share/hadoop/mapreduce directory and find hadoop-mapreduce-examples-2.4.1.jar.
How is it used? You can of course check the documentation, but if you are lazy it looks like this:

hadoop jar <path to the jar> <fully qualified class containing main> <arguments>

It is the same as running a jar normally, just with hadoop in front.

[root@vmcentos mapreduce]# hadoop jar /hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar wordcount

When it is run as above, you get the following:

Usage: wordcount <in> <out>

You have to put something in before you can get something out.
So I uploaded a word.txt file to HDFS (just an arbitrary word file, taken from one of the blogs cited above) and set the output to /wcout; both the input and the output are HDFS paths. For the concrete commands see the HDFS post, I will not repeat them here.

[root@vmcentos sbin]# hadoop jar /hadoop-2.4.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar wordcount /word.txt /wcout

If nothing else goes wrong, you get log output like this:

20/02/13 06:06:39 INFO client.RMProxy: Connecting to ResourceManager at vmcentos/192.168.123.155:8032
20/02/13 06:06:41 INFO input.FileInputFormat: Total input paths to process : 1
20/02/13 06:06:41 INFO mapreduce.JobSubmitter: number of splits:1
20/02/13 06:06:41 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1581544645445_0001
20/02/13 06:06:41 INFO impl.YarnClientImpl: Submitted application application_1581544645445_0001
20/02/13 06:06:41 INFO mapreduce.Job: The url to track the job: http://vmcentos:8088/proxy/application_1581544645445_0001/
20/02/13 06:06:41 INFO mapreduce.Job: Running job: job_1581544645445_0001
20/02/13 06:06:47 INFO mapreduce.Job: Job job_1581544645445_0001 running in uber mode : false
20/02/13 06:06:47 INFO mapreduce.Job:  map 0% reduce 0%
20/02/13 06:06:50 INFO mapreduce.Job:  map 100% reduce 0%
20/02/13 06:06:54 INFO mapreduce.Job:  map 100% reduce 100%
20/02/13 06:06:55 INFO mapreduce.Job: Job job_1581544645445_0001 completed successfully
20/02/13 06:06:56 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=78
                FILE: Number of bytes written=185875
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=150
                HDFS: Number of bytes written=48
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=1651
                Total time spent by all reduces in occupied slots (ms)=1808
                Total time spent by all map tasks (ms)=1651
                Total time spent by all reduce tasks (ms)=1808
                Total vcore-seconds taken by all map tasks=1651
                Total vcore-seconds taken by all reduce tasks=1808
                Total megabyte-seconds taken by all map tasks=1690624
                Total megabyte-seconds taken by all reduce tasks=1851392
        Map-Reduce Framework
                Map input records=5
                Map output records=10
                Map output bytes=97
                Map output materialized bytes=78
                Input split bytes=94
                Combine input records=10
                Combine output records=6
                Reduce input groups=6
                Reduce shuffle bytes=78
                Reduce input records=6
                Reduce output records=6
                Spilled Records=12
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=111
                CPU time spent (ms)=620
                Physical memory (bytes) snapshot=218611712
                Virtual memory (bytes) snapshot=730071040
                Total committed heap usage (bytes)=136908800
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=56
        File Output Format Counters
                Bytes Written=48

The output directory given in the command was /wcout:

[root@vmcentos sbin]# hadoop dfs -ls /wcout
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Found 2 items
-rw-r--r--   1 root supergroup          0 2020-02-13 06:06 /wcout/_SUCCESS
-rw-r--r--   1 root supergroup         48 2020-02-13 06:06 /wcout/part-r-00000

Let's see what the result actually is: hadoop dfs -cat /wcout/part-r-00000

20200213061418304_16870.webp

3. The execution flow of WordCount

On top of the above, one more apology: I did not originally plan to publish these notes, so the example here comes from someone else's blog, and so does the material I use to illustrate it; that post, however, only runs the demo above and does not give the walkthrough in this subsection.
First comes word.txt on HDFS, which of course stores, word for word, the raw data waiting to be processed; from our point of view it looks like this:

leaf yyh
yyh xpleaf
katy ling
yeyonghao leaf
xpleaf katy

map() and reduce()
Map
As mentioned before, the split here does not split the physical file; it is only a logical split. Line breaks and spaces are counted as characters too (a line break counts as one byte, which is why the offsets below jump by one extra).

<0,"leaf yyh">
<9,"yyh xpleaf">
<20,"katy ling">
<30,"yeyonghao leaf">
<45,"xpleaf katy">

To do the splitting you can write it like this (the full implementation is in the next section).
A note I added later: in <0,"leaf yyh"> above, the key 0 is the character offset within the text, and we do not need to process it at this step; the value "leaf yyh" is what we actually want to work on. And since the words are separated by spaces, the split is done as follows.

String[] words = line.split(" ");

Use a loop to take out each word:

for(String word : words){
    // the output should also be in the <k,v> form
    context.write(word,1);
}

Here the v at the end should be 1; it is a count of one occurrence, not an index starting from 0.
Of course, as written later, the actual types should be: new Text(word), new LongWritable(1).

As mentioned at the beginning, there should also be a Shuffle in between that does the partitioning, sorting and grouping; that really deserves a post of its own.
Of course all of this can be customized; see 《MapReduce的自定义排序、分区和分组》.
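For instance, a minimal hedged sketch of a custom partitioner (the class name FirstLetterPartitioner is made up for illustration; it would be wired in with job.setPartitionerClass(FirstLetterPartitioner.class) together with job.setNumReduceTasks(2)):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Words starting with a-m go to partition 0, everything else to partition 1.
public class FirstLetterPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        if (numPartitions < 2 || key.getLength() == 0) {
            return 0;
        }
        char c = Character.toLowerCase(key.toString().charAt(0));
        return (c >= 'a' && c <= 'm') ? 0 : 1;
    }
}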

Reduce
After the processing above, by this point the content should look like this:

<katy,{1,1}>
<leaf,{1,1}>
<xpleaf,{1,1}>
<yyh,{1,1}>
<ling,{1}>
<yeyonghao,{1}>

At this point we have to think about how to receive the data from the previous stage and emit the output.
V2s refers to the collection of V2 values from the previous stage.

long flag = 0;
for(LongWritable i : V2s){
    flag += i.get();
}
// emit the total for this word
context.write(key, flag);

The actual arguments should be: key, new LongWritable(counter), as in the real code later.
So the final result comes out like this:

katy    2
leaf    2
ling    1
xpleaf  2
yeyonghao       1
yyh     2

4. Implementing the WordCount example yourself

Now that we know the execution flow, why not write a simple WordCount example ourselves? I have seen other blogs abbreviate WordCount as WC; slightly ambiguous, but not a big problem.
If you are not using Maven, then besides Hadoop's big pile of jars you also need to import these jars:

20200213070728084_3308.webp

If this is your first time using Maven, you can add a domestic (Chinese) mirror as described here:
https://blog.csdn.net/qq_28553681/article/details/80940663

One extra thing to note: when writing out through the context you should use Hadoop's wrapper (Writable) types, like this:
Long -> LongWritable
String -> Text

To implement map, create a class named WCMapper that extends Mapper and overrides map, roughly like this:

public class WCMapper extends Mapper<Long , String, String, Long >{

    @Override
    protected void map(Long key, String value, Mapper<Long, String, String, Long>.Context context)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        super.map(key, value, context);
    }

}

20200216005425283_28233.webp

When serializing and deserializing, pay attention to the order and to the types.
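A hedged illustration of what "order and type" means here (WordStatBean is a made-up example, not part of WordCount): a custom Writable must read its fields back in exactly the same order, and with exactly the same types, that it wrote them.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class WordStatBean implements Writable {
    private String word;
    private long count;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);   // written 1st, as a UTF string
        out.writeLong(count); // written 2nd, as a long
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();   // must be read 1st, as a UTF string
        count = in.readLong(); // must be read 2nd, as a long; swapping these corrupts the data
    }
}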

Keep filling it in, just as in the execution-flow section above.

WCMapper

package cn.cyqsd.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable , Text, Text, LongWritable >{

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();
        //split
        String[] words = line.split(" ");
        for(String w : words){
            context.write(new Text(w), new LongWritable(1));
        }
    }

}

WCReducer

package cn.cyqsd.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // receive the data handed over from the map/shuffle stage
        long counter = 0;
        for(LongWritable i : values){
            counter += i.get();
        }
        // emit the output
        context.write(key, new LongWritable(counter));
    }

}

WordCount
The WordCount class contains the main method:

package cn.cyqsd.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static void main(String[] args) throws Exception{
        // create the Job object
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCount.class);
        // configure the Mapper
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/word.txt"));
        // configure the Reducer
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("/wcout1"));        
        // submit and wait for completion
        job.waitForCompletion(true);
    }

}

Once all of the above is written, package it into a jar, specifying that the main method is in the WordCount class.
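Following the template from section 2, running it would then look roughly like this (wc.jar is only a placeholder for whatever you named the exported jar; no extra arguments are needed because the input and output paths are hard-coded in the driver):

hadoop jar wc.jar cn.cyqsd.hadoop.mr.WordCount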

5. Additional errors encountered

What follows is some extra content, in other words the usual troubleshooting part.
If you get an error mentioning yarn/util/Apps:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/util/Apps

then add the jar shown below, or add it via Maven.

20200217051358208_18962.webp

If you see this prompt:

20200216024821106_11811.webp

then follow 《Intellij idea 报错:Error : java 不支持发行版本5》 and change the release/language-level version.

If you see:

Exception in thread "main" java.io.IOException: Mkdirs failed to create <some path>\META-INF\license

then delete that directory from under the jar's path.

Hadoop java.io.IOException: Mkdirs failed to create /some/path

20200216052601286_894.webp

The execution log looks like this:

C:\Users\Administrator\IdeaProjects\untitled1\out\artifacts\untitled1_jar>hadoop jar untitled1.jar cn.cyqsd.hadoop.mr.dc.DataCount /data.dat /dataout
20/02/16 05:26:56 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
20/02/16 05:26:58 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
20/02/16 05:27:01 INFO input.FileInputFormat: Total input paths to process : 1
20/02/16 05:27:02 INFO mapreduce.JobSubmitter: number of splits:1
20/02/16 05:27:03 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1581791945853_0001
20/02/16 05:27:05 INFO impl.YarnClientImpl: Submitted application application_1581791945853_0001
20/02/16 05:27:05 INFO mapreduce.Job: The url to track the job: http://DESKTOP-47RU81S:8088/proxy/application_1581791945853_0001/
20/02/16 05:27:05 INFO mapreduce.Job: Running job: job_1581791945853_0001
20/02/16 05:27:24 INFO mapreduce.Job: Job job_1581791945853_0001 running in uber mode : false
20/02/16 05:27:24 INFO mapreduce.Job:  map 0% reduce 0%
20/02/16 05:27:31 INFO mapreduce.Job:  map 100% reduce 0%
20/02/16 05:27:40 INFO mapreduce.Job:  map 100% reduce 100%
20/02/16 05:27:41 INFO mapreduce.Job: Job job_1581791945853_0001 completed successfully
20/02/16 05:27:42 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=1122
                FILE: Number of bytes written=189695
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=2323
                HDFS: Number of bytes written=551
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=5274
                Total time spent by all reduces in occupied slots (ms)=6008
                Total time spent by all map tasks (ms)=5274
                Total time spent by all reduce tasks (ms)=6008
                Total vcore-seconds taken by all map tasks=5274
                Total vcore-seconds taken by all reduce tasks=6008
                Total megabyte-seconds taken by all map tasks=5400576
                Total megabyte-seconds taken by all reduce tasks=6152192
        Map-Reduce Framework
                Map input records=22
                Map output records=22
                Map output bytes=1072
                Map output materialized bytes=1122
                Input split bytes=95
                Combine input records=0
                Combine output records=0
                Reduce input groups=21
                Reduce shuffle bytes=1122
                Reduce input records=22
                Reduce output records=21
                Spilled Records=44
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=48
                CPU time spent (ms)=1654
                Physical memory (bytes) snapshot=431742976
                Virtual memory (bytes) snapshot=610852864
                Total committed heap usage (bytes)=383778816
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=2228
        File Output Format Counters
                Bytes Written=551
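One aside on the WARN near the top of this log ("Implement the Tool interface and execute your application with ToolRunner to remedy this"): a hedged sketch of how the section-4 driver could be adapted to go through ToolRunner. The class name WordCountTool is made up, it assumes WCMapper/WCReducer from section 4 are on the classpath, and it takes the input and output paths from the command line instead of hard-coding them.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() already contains the generic options parsed by ToolRunner
        Job job = Job.getInstance(getConf());
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}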

6. Source-code analysis of the MapReduce submission process

Since these are only notes, I suspect not many people will read this subsection; with limited time I have not fully tidied it up or verified every step. The earlier part of the post contains the flow diagram below; let's briefly walk through the source behind steps 1, 2, 3 and 4. The differences between Hadoop 1.X and 2.X were mentioned earlier; keep them apart and do not mix them up.

20200219014650498_11441.webp

Let's start from the job's final step, the submission: job.waitForCompletion(true);

20200216031502891_31818.webp

Step into it:

20200216031811755_16627.webp

public boolean waitForCompletion(boolean verbose )

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

Inside it there is:

if (state == JobState.DEFINE) {
  submit();
}

This checks the current submission state, and when it is allowed it goes on to execute submit(), so step into that method:

20200216032054550_9288.webp

  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

Once the submission is done it returns immediately.
setUseNewAPI(); — there is also an old API, and this decides to use the new one.
submitJobInternal is executed inside an anonymous callback (the PrivilegedExceptionAction); it is the important part.

connect();

20200216032550977_27210.webp

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

Here cluster becomes a member variable of the current Job class, and cluster holds a reference to the ResourceManager process.
When the client wants to communicate, it has to hold a proxy object.

ugi refers to the UserGroupInformation class, which does the user/permission checks.
It lives in org.apache.hadoop.security.

Going further, step into new Cluster(getConfiguration());

20200216033312472_15562.webp

this(null, conf); then chains into the overloaded constructor below.

20200216033152152_31862.webp

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }

initialize(jobTrackAddr, conf);

  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null; 
        try {
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }

Inside private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) a synchronized block (on frameworkLoader) is used.
The loop ClientProtocolProvider provider : frameworkLoader is there for flexibility: whichever provider is supplied, the corresponding proxy object is obtained.
Further down a clientProtocol is created, implemented by provider.create:

  public abstract ClientProtocol create(Configuration conf) throws IOException;
  
  public abstract ClientProtocol create(InetSocketAddress addr,
      Configuration conf) throws IOException;

The return value is received as the ClientProtocol interface:
public interface ClientProtocol extends VersionedProtocol

20200216034304245_27890.webp

There are 38 communication protocols, each with a version number.
Once the proxy object is obtained, it is assigned with client = clientProtocol;

  private ClientProtocol client;

client is a member variable of Cluster, so Cluster now holds the proxy object for RPC communication; and the Job class in turn holds the cluster object.
At this point connect(); has finished executing.
Going back, look at the submitter obtained as JobSubmitter submitter:
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());

cluster.getClient()

  ClientProtocol getClient() {
    return client;
  }

Next comes submitter.submitJobInternal(Job.this, cluster); with the signature:
JobStatus submitJobInternal(Job job, Cluster cluster)

It lives in org.apache.hadoop.mapreduce.JobSubmitter.

20200216035138659_18433.webp

It contains:

//validate the jobs output specs 
checkSpecs(job);

This validates the job's output specification.

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, 
                                                 job.getConfiguration());

The client makes the request and the server returns the Path; cluster is what holds the server's proxy object. Step into getStagingDir:

public static Path getStagingDir(Cluster cluster, Configuration conf)

Path stagingArea = cluster.getStagingAreaDir();
FileSystem fs = stagingArea.getFileSystem(conf);
String realUser;
String currentUser;
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
realUser = ugi.getShortUserName();
currentUser = UserGroupInformation.getCurrentUser().getShortUserName();

The getStagingAreaDir method is defined on the interface.

  /**
   * Grab the jobtracker's view of the staging directory path where 
   * job-specific files will  be placed.
   * 
   * @return the staging directory where job-specific files are to be placed.
   */
  public Path getStagingAreaDir() throws IOException, InterruptedException {
    if (stagingAreaDir == null) {
      stagingAreaDir = new Path(client.getStagingAreaDir());
    }
    return stagingAreaDir;
  }

20200216035843154_30576.webp

In new Path(client.getStagingAreaDir());, since client is the client-side proxy object, the implementation of how it talks to the server is in:

20200219050123316_21959.webp

Inside, it looks like this:

public String getSystemDir() throws IOException, InterruptedException {
    return this.resMgrDelegate.getSystemDir();
}

Step into getSystemDir():

20200219050410716_9035.webp

Through getStagingAreaDir, the Path is obtained from the configuration.

20200219050814341_9462.webp

In this way the path where the jar package is stored is obtained via RPC.

Moving on, JobID jobId = submitClient.getNewJobID(); obtains the jobId through submitClient, the RPC proxy.

Then copyAndConfigureFiles(job, submitJobDir); copies the job's files and configuration to HDFS.
The replication value shows that the jar is written with 10 replicas, which is set in the configuration.

  /**
   * configure the jobconf of the user with the command line options of 
   * -libjars, -files, -archives.
   * @param conf
   * @throws IOException
   */
  private void copyAndConfigureFiles(Job job, Path jobSubmitDir) 
  throws IOException {
    Configuration conf = job.getConfiguration();
    short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
    copyAndConfigureFiles(job, jobSubmitDir, replication);

    // Set the working directory
    if (job.getWorkingDirectory() == null) {
      job.setWorkingDirectory(jtFs.getWorkingDirectory());          
    }

  }

// configures -files, -libjars and -archives.
private void copyAndConfigureFiles(Job job, Path submitJobDir,short replication)

It copies the temporary files:

// get all the command line arguments passed in by the user conf
String files = conf.get("tmpfiles");
String libjars = conf.get("tmpjars");
String archives = conf.get("tmparchives");
String jobJar = job.getJar();

Walking further down JobStatus submitJobInternal(Job job, Cluster cluster), we reach int maps = writeSplits(job, submitJobDir);
which decides how many map tasks will be started.

// Create the splits for the job
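Since one input split becomes one map task, here is a hedged sketch of how a client can nudge the split size, and therefore the map count (the class name SplitSizeHint is made up, and this assumes a FileInputFormat-based input, as WordCount uses):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeHint {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Cap splits at 64 MB: smaller splits -> more splits -> more map tasks.
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
        // And keep splits at least 16 MB so tiny fragments are not scheduled separately.
        FileInputFormat.setMinInputSplitSize(job, 16L * 1024 * 1024);
    }
}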

Further on:

20200216043832792_20413.webp

Now the job is actually submitted (using the submit name):
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); hands this information over to the ResourceManager.

20200219051003349_5313.webp
