在前一篇笔记中,提到了Hadoop 2.x在CentOS 6.9和Windows下的伪分布式部署。有了上一篇的基础,就可以在Linux或者Windows上搭建一个伪分布式的Hadoop了,继续捣鼓。这篇依旧是我的笔记,因为时间跨度大,本文是我的一系列文章,我直接或间接的引用了多篇网络文章的解决办法,用于解决本文实践中遇到的问题,时间过于仓促,并没有完全标明引用地址,再次表示感谢。

最好配着官方的文档食用,这里是翻译版:Hadoop分布式文件系统:架构和设计
文档里面对HDFS的各项特性,描述的非常完整全面,我就不画蛇添足了。
还可以看一些优秀的博客文章:初步掌握HDFS的架构及原理,本文中还提及了很多。

1. 分布式文件系统与HDFS

一开始先让HDFS跑起来。前面的环境配置好了,到这里就很简单了。你可以把它当作平时操作本地文件这样,当然本质上也的确是包装后的。

20200212004141488_32682.webp

操作HDFS文件系统可以使用hadoop fs,附加的参数和Linux的命令几乎相同,这里列出了几样常见的命令,手册或者help都写明白了的。
上传文件可以使用命令:
Linux:hadoop fs -copyFromLocal /root/logo.svg /logo.svg
Windows:hadoop fs -copyFromLocal C:\Users\Administrator\Desktop\logo.svg /logo.svg

至于启动流程是这样:

  • 先启动的NameNode
  • 再启动DataNode
  • 然后启动SecondaryNode

在图形界面中也可以看到该内容。

20200212001802873_21148.webp

在命令中也可以看到这个内容:

hadoop fs -ls hdfs://182centos:9000/logo_cyqsd.svg

[-ls [-d] [-h] [-R] [<path> ...]]

-R则是递归查看所有目录下的文件。
-h则是可读的大小。

20200212002445475_29607.webp

hadoop fs -cat hdfs://182centos:9000/logo_cyqsd.svg

20200212002100048_15837.webp

如果文件太长可以使用 | more

-copyToLocal复制文件到本地

-count计数

20200212002018127_6822.webp

创建文件夹:

hadoop fs -mkdir /cyqsd_test

-rm删除文件

-rm -r删除文件夹

hadoop fs -chmod a+x /logo_cyqsd.svg

HDFS的操作就是这么简单,他完成了很多原本很复杂的工作。那么是怎么实现的呢?

1.1. HDFS架构(Namenode 和 Datanode)

放上一张网上泛滥的结构图,来自前文提到过的官方文档。

20200223001300265_29486.webp

这里面重要的就是几个节点及其相互作用关系。还需提到的是多个块数据同时读取进行拼接的时候不能同时读取。遵循就近读取原则。后文是对其的详细介绍。

  • NameNode:负责管理节点
  • DataNode:存储数据节点
  • SecondaryNode:辅助DataNode节点,但是并非热备份,无法完全替代其工作

Namenode和Datanode被设计成可以在普通的商用机器上运行。这些机器一般运行着GNU/Linux操作系统(OS)。HDFS采用Java语言开发,因此任何支持Java的机器都可以部署Namenode或Datanode。由于采用了可移植性极强的Java语言,使得HDFS可以部署到多种类型的机器上。一个典型的部署场景是一台机器上只运行一个Namenode实例,而集群中的其它机器分别运行一个Datanode实例。这种架构并不排斥在一台机器上运行多个Datanode,只不过这样的情况比较少见。

集群中单一Namenode的结构大大简化了系统的架构。Namenode是所有HDFS元数据的仲裁者和管理者,这样,用户数据永远不会流过Namenode。

1.2. Metadata

我把文件MetaData(元数据),(fsimage + editslog)认为是数据本身(后文会提到这两项),后面的几个小点都是围绕着元数据(Metadata)储存细节展开的。为了数据安全,应该内存和磁盘各保存一份。需要注意的是元数据是存放在内存中的信息。

1.3. NameNode

NameNode是一个主节点,主要的目的是为了保存文件元数据,单节点。接收用户的操作。
管理和维护着整个文件系统,包含着下面的文件。

  • fsimage,元数据的镜像文件,保存在磁盘上。并不是和内存实时同步。
  • edits,日志文件,保存在磁盘上。
  • fstime:保存最近一次checkpoint的时间,保存在磁盘上。

我们可以实际看一下长啥样:

20200212013336347_31625.webp

20200212013352138_1455.webp

20200212013554927_5582.webp

20200212013612723_16682.webp

总结前文的元数据(Metadata)和NameNode,就是NameNode会在内存中保存Metadata,但并不是真正意义上的的持久储存,同时等待其他进程的请求,并处理读的请求。
NameNode中的元数据(Metadata)会有一个镜像,称作fsimage。当然是通过一系列的操作生成的,但是是交给SecondaryNameNode来操作的,详见下一小节。
NameNode中的元数据(Metadata)还有一个称作日志文件的edits。当有写的请求时,NameNode会向edits中写日志,成功,会返回内存中,在提交给客户端。

Hadoop 0.18的文档Hadoop分布式文件系统使用指南中提到了Secondary NameNode的工作内容和流程。简要概括就是帮助NameNode同步数据的一个小老弟。

NameNode将对文件系统的改动追加保存到本地文件系统上的一个日志文件(edits)。当一个NameNode启动时,它首先从一个映像文件(fsimage)中读取HDFS的状态,接着应用日志文件中的edits操作。然后它将新的HDFS状态写入(fsimage)中,并使用一个空的edits文件开始正常操作。因为NameNode只有在启动阶段才合并fsimage和edits,所以久而久之日志文件可能会变得非常庞大,特别是对大型的集群。日志文件太大的另一个副作用是下一次NameNode启动会花很长时间。

由于每一个进程本身就是一个类,如果有兴趣探究源码的话,可以进入下面的类:

20200213023754476_8750.webp

20200213023935530_22782.webp

20200213024000746_13951.webp

1.4. SecondaryNameNode

在Hadoop2.0以后没有SecondaryNameNode,但是在伪分布式上有。SecondaryNameNode也被称作从元数据节点。

Secondary NameNode定期合并fsimage和edits日志,将edits日志文件大小控制在一个限度下。因为内存需求和NameNode在一个数量级上,所以通常secondary NameNode和NameNode运行在不同的机器上。Secondary NameNode通过bin/start-dfs.sh在conf/masters中指定的节点上启动。

前面的意思基本上就是从节点NameNode上先下载映像文件(fsimage),读取状态,然后下载日志文件(edits),生成新的合并后在发送到NameNode上,替换旧的。后文会提到触发条件。

在Hadoop1.0时,NameNode和SecondaryNameNode应该分开部署。如果NameNode的磁盘发生了损坏,元数据也损坏了。在官方文档的元数据磁盘错误中提到了:

FsImage和Editlog是HDFS的核心数据结构。如果这些文件损坏了,整个HDFS实例都将失效。因而,Namenode可以配置成支持维护多个FsImage和Editlog的副本。任何对FsImage或者Editlog的修改,都将同步到它们的副本上。这种多副本的同步操作可能会降低Namenode每秒处理的名字空间事务数量。然而这个代价是可以接受的,因为即使HDFS的应用是数据密集的,它们也非元数据密集的。当Namenode重启的时候,它会选取最近的完整的FsImage和Editlog来使用。

Namenode是HDFS集群中的单点故障(single point of failure)所在。如果Namenode机器故障,是需要手工干预的。目前,自动重启或在另一台机器上做Namenode故障转移的功能还没实现。

20200217030520898_21874.webp

还有我没有深究的是,我在有的博客资料中看到,元数据文件,在从Namenode往SecondaryNameNode复制的时候使用HTTP GET请求,回去的时候用HTTP POST请求。这个我不太确定,还有待考证。

还有就是一些小细节。客户端先请求NameNode,NameNode再将信息返回到客户端,再往DataNode里面写数据,要操作edits,记录日志信息,成功的记录成功,失败的记录失败。内存中的Metadata也要记录。这时fsimage没同步,不加。比如很长一段时间时间以前,往HDFS上传了两个文件,然后接下来没做任何操作。内存中的Metadata保存了两个。fsimage也是两条。edits可能一条数据都没有,因为合并一次就会有新的。(可能合并了多次了。)只要数据一同步,合并后edits要清空。

前面提到了会进行一系列的操作,那么具体多久操作呢?
这样的操作叫做checkpoint,你可以认为这是一种阈值或是触发设计。分别由下面的两个参数确定,满足了下面设定的任何一个条件,都会触发SecondaryNameNode下载edits和fsimage,单是下载完成后不会删除掉,比如合并的过程出现失败。直到最后新生成的fsimage才会替换旧的这时才会删除。也被称为切换。

  • fs.checkpoint.period 默认是3600秒,每隔一个小时,SecondaryNameNode就要下载fsimage和edits,进行数据的同步。
  • fs.checkpoint.size edits作为日志文件,只要在不断正常运行,就会一直增加。一旦达到,就要进行合并。

1.5. DataNode

前文讲了NameNode和他的小老弟SecondaryNameNode,他们都是HDFS的核心,但是都是负责的元数据。真正的苦力活还是交给了DataNode。用户数据永远不会流过Namenode。本小节只提到基本的流程,可以到HDFS 之 DataNode 详解,看详解,写得很好,很清楚。

DataNode储存的基本单位是块(Block),文件的大小是size,依照设置好的Block大小,遵循文件偏移量的顺序对文件进行排序。还需注意的是,最后一个文件块,可能大于实际分出来的大小,不占用存储空间。
hdfs-site.xml中可以配置dfs.replication属性,设置多副本,默认是3个。

下面是块的复制:

20200223005158529_11047.webp

我们也可以看看实际长什么样子:

hadoop fs -copyFromLocal /root/laragon.7z /

因为HDFS默认的块(Block)大小被设置为了64MB,上传一个243MB的文件到HDFS中,查看/tmp/目录:

[root@182centos dfs]# ls
data  name  namesecondary
[root@182centos dfs]# cd data/
[root@182centos data]# ls
current  in_use.lock
[root@182centos data]# cd current/
[root@182centos current]# ls
BP-1770640857-127.0.0.1-1579014237318  VERSION
[root@182centos current]# cd BP-1770640857-127.0.0.1-1579014237318/
[root@182centos BP-1770640857-127.0.0.1-1579014237318]# ls
current  dncp_block_verification.log.curr  dncp_block_verification.log.prev  tmp
[root@182centos BP-1770640857-127.0.0.1-1579014237318]# cd current/
[root@182centos current]# ls
finalized  rbw  VERSION
[root@182centos current]# cd finalized/
[root@182centos finalized]# ls
blk_1073741825            blk_1073741826            blk_1073741827
blk_1073741825_1001.meta  blk_1073741826_1002.meta  blk_1073741827_1003.meta
[root@182centos finalized]# pwd
/cyqsd/hadoop-2.4.1/tmp/dfs/data/current/BP-1770640857-127.0.0.1-1579014237318/current/finalized
[root@182centos finalized]#

对我来说,我最终进入的目录路径就是这样的:

/cyqsd/hadoop-2.4.1/tmp/dfs/data/current/BP-1770640857-127.0.0.1-1579014237318/current/finalized

里面有这些东西:

20200212033103446_11371.webp

因为是伪分布式,所以只保存一份。

源文件的大小是这样的:

20200212033257309_880.webp

可以试着往回推一下,是肯定能推回去的。

当然如果数据受损,我们也可以进行一些操作,例如打印受损的块:

[root@VMcentos sbin]# hadoop fsck /
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Connecting to namenode via http://VMcentos:50070
FSCK started by root (auth:SIMPLE) from /127.0.0.1 for path / at Thu Feb 13 00:35:41 CST 2020
.Status: HEALTHY
 Total size:    1582 B
 Total dirs:    1
 Total files:   1
 Total symlinks:                0
 Total blocks (validated):      1 (avg. block size 1582 B)
 Minimally replicated blocks:   1 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       0 (0.0 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    1
 Average block replication:     1.0
 Corrupt blocks:                0
 Missing replicas:              0 (0.0 %)
 Number of data-nodes:          1
 Number of racks:               1
FSCK ended at Thu Feb 13 00:35:41 CST 2020 in 2 milliseconds
The filesystem under path '/' is HEALTHY

2. JAVA接口及其常用API

使用HDFS自然不会全部使用原生命令,最好是能够自行编程操作,这里写的就是使用Java来简单的操作HDFS,非常的简单,只需要短短数行代码。
开发机的JDK版本应该尽量和服务器搭建Hadoop的JDK版本一致,减少问题。
本文使用的JDK1.7,jdk-7u80-windows-x64.exe
如果是使用Eclipse进行开发使用的JDK可以看:Java for Windows。请自行使用Maven或者导入jar包。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

定义一个类得到远程文件系统的具体实现类fs:

FileSystem fs = FileSystem.get(new URI("hdfs://vmcentos:9000"), new Configuration());

使用抽象类InputStream定义输出到本地的文件流:

InputStream in = fs.open(new Path("/laragon.7z"));

合起来就是这样,只有几行:

FileSystem fs = FileSystem.get(new URI("hdfs://vmcentos:9000"), new Configuration());
InputStream in = fs.open(new Path("/logo.svg"));
FileOutputStream out = new FileOutputStream(new File("c:/tmp.svg"));
IOUtils.copyBytes(in, out, 1024, true);

其他常用的简单操作:

另一种简单的下载文件。

    fs.copyFromLocalFile(new Path("/USB_HUB_1.pdf"), new Path("c://USB_HUB_1.pdf"));

删除文件

boolean mark = fs.delete(new Path("/USB_HUB_1.pdf"), true);

建立目录

boolean mark = fs.mkdirs(new Path("/USB_HUB"));

3. Hadoop RPC

那么HDFS之间是怎么通信的呢?通过RPC通信,是一种不同进程间的方法调用,不同于WebService,底层还是TCP实现的,进行了很多的封装,使我们少做很多事。HDFS原理学习笔记中有所提及。如果你想亲自看看如何实现的,可以看org.apache.hadoop.ipc中的代码。还可以看:Hadoop中RPC小结

官方文档中有通信协议这一小节

所有的HDFS通讯协议都是建立在TCP/IP协议之上。客户端通过一个可配置的TCP端口连接到Namenode,通过ClientProtocol协议与Namenode交互。而Datanode使用DatanodeProtocol协议与Namenode交互。一个远程过程调用(RPC)模型被抽象出来封装ClientProtocol和Datanodeprotocol协议。在设计上,Namenode不会主动发起RPC,而是响应来自客户端或 Datanode 的RPC请求。

4. HDFS上传和下载的原理

这里的原理指的是HDFS FileSystem的原理。前文提到了每一个进程本身就是一个类,于是乎FileSystem是一个抽象类。HDFS上传和下载的原理遵循RPC,见上一小节。时间有限,此处的代码就没有完全整理,没有逐步核验是否正确
我们通过上面的代码可以直接进入FileSystem类。

20200213032737765_15898.webp

20200213032859080_5408.webp

FileSystem由下图的内容组成:

20200213033138950_7163.webp

其中DistributedFileSystem 就是 FileSystem的一个子类。

会到前面,进入new URI,进入public static FileSystem get(URI uri, Configuration conf)

20200213034453749_9046.webp

最后会return CACHE.get(uri, conf);,从CACHE中取NameNode。

20200213035004731_14961.webp

Key key = new Key(uri, conf);
Key里面的内容就是NameNode的所在地址和当前的Configuration构成的。

20200213035110017_10951.webp

private FileSystem getInternal(URI uri, Configuration conf, Key key)
    throws IOException{
  FileSystem fs;
  synchronized (this) {
    fs = map.get(key);
  }
  if (fs != null) {
    return fs;
  }

20200213035324139_16704.webp

这里使用了单例的设计模式(分为懒汉和饿汉),这里是懒汉,可能是为了安全,因为创建多个线程同时创建时,会产生多个对象。

再往下继续运行会到fs = createFileSystem(uri, conf);

  private static FileSystem createFileSystem(URI uri, Configuration conf)
      throws IOException {
    Tracer tracer = FsTracer.get(conf);
    try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
      scope.addKVAnnotation("scheme", uri.getScheme());
      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
      FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
      fs.initialize(uri, conf);
      return fs;
    }
  }

20200213035702032_28185.webp

public static Class<? extends FileSystem> getFileSystemClass(String scheme,
  Configuration conf)

再往后执行会到loadFileSystems();

 public static Class<? extends FileSystem> getFileSystemClass(String scheme,
  Configuration conf)

20200213040122586_25270.webp

configuration中取得配置文件。

本文到这里就结束了。接下来就到了MapReduce了。这代人的热情-MapReduce的工作原理学习(一)

文章目录