Hadoop源码篇 — DataNode的初始化与注册流程

前言

因为大家读源码的方式都各有千秋,这里的阅读过程并不代表最佳实践,只是一个自身阅读过程的再现而已。所以如果有一些遗漏的重点,也可以在留言处替我指出。先前也有很多小伙伴提出了一些我的错误或者改进的地方,这里表示衷心的感谢。

那按照先前的套路,我们提出两个任务,整篇就为了完成验证下面的任务而进行:

1、DataNode初始化:我们平时搭建集群时,通过jps命令时可以看到DataNode的服务的,所以DataNode就应该是RPC的服务端

2、DataNode的注册:HDFS是一个主从架构,NameNode是主节点而DataNode是从节点,所以DataNode启动时是需要和NameNode进行注册的

3、心跳机制:从节点需要发送心跳让主节点得知从节点的存活

4、NameNode是如何管理元数据的,HA高可用方案的实现原理

(补充)Hadoop HA高可用方案原理

先把一个简单的给搞定吧,因为直接上源码真的挺犯困的

在Hadoop1.x的时候,我们只有一个NameNode和DataNode,NameNode负责管理元数据,DataNode用于存储数据,为了保证数据安全,每个副本会存在三个备份,每个block占据64M的大小

这样NameNode就会存在单点故障的问题,所以这时候hadoop的团队就开始解决这个问题了,因为NameNode是管理着集群的元数据的,这是一个有状态的服务,这就说明了它是不能随便停止服务的,那既然问题其实是算只有一个NameNode不好干活的锅,那我们增加NameNode数量不就好了?可是平白无故地增加一个NameNode,它俩如何能够保证元数据的一致性呢?

解决这个单点的问题,首先就是这两个NameNode如何保证元数据的一致,此时解决方案有三个

① 使用linux的共享存储目录

这个做法其实就是让我们的集群元数据不存放于Namenode中

而是放在一个共享的存储单元中,这是apache官方当时推荐的做法,可是大家都没吃它的这一套。现在肯定是没人再这么玩的了

② 使用zookeeper集群

这也很简单,就是我们的一个NameNode去往zookeeper集群中写元数据,然后另外一个就去zookeeper里面去读,有部分公司确实也是在采用这个方案

③ cloudera QJM

第三种方案其实是由cloudera这家公司所提出的,它通过一个Journalnode集群来完成,这个东西和zookeeper其实差不太多,实现逻辑也基本雷同

而且它本身也是一个集群,健康依据为过半节点存活即可,比如我图中的3台挂掉一台,那就是2/3>0.5,那此时判断这个集群是健康状态。

有可能你会问,为什么你就这样草率地决定了第一个NameNode是写元数据的第二个是同步元数据的呢?因为它们两个还有各自的状态问题

此时active状态下的NameNode负责往集群写,而standby的从集群读

当然现在我们的问题解决了吗,其实并没有,如果NameNode(active)在某天凌晨突然宕机,那我岂不是凌晨就要打开电脑,连上公司的服务,使用命令强行把standby状态下的NameNode设置为active集群才恢复正常工作了?

那我们现在如何保证NameNode状态的自动切换呢?

④ 实现NameNode状态的切换

此时我们又引入了老朋友zookeeper,真是哪都有它,在zookeeper中创建一个锁的目录,然后NameNode启动的时候都会过去抢占锁,两个NameNode谁先抢到,谁就是active状态。

而且每一个NameNode上还有一个ZKFC的服务,持续监听NameNode的健康状态,如果active NameNode出现问题,ZKFC将会报告给zookeeper,然后zookeeper会将锁分配给standby的NameNode上。使其自动切换为active状态

一、DataNode的初始化

我们直接找到DataNode的main方法处,它和NameNode差不多,进行一个参数的判断之后,不满足就退出。除了这么一句,就还剩这么一个secureMain(args,null)了,那核心代码就是这一个

点进去发现了一个creataDataNode,所以我就最喜欢这种命名这么直接的,那就点进去creataDataNode吧

把注释先复制一下,粘贴到百度

那我们一看,实例化的英文不就是instantiate嘛,所以不用想太多,点进去就是了,下面的那个if也很简单,因为当这个实例化成功,DataNode不为null,那就把这个DataNode启动起来呗,就是这么简单而已,这个启动其实就是就是一堆线程

看源码的时候要带着目的,不然就会被其他的一些代码带跑,有try看try,还有就是一大串单词的时候看最后一个单词判断它的作用,比如前面的if,Configuration是配置呀,args是参数集啊,这些我们都不关心。我们现在就想知道你怎么实例化,所以就盯着这个instance这玩意点就是了

同理,前面那些permission权限,checker检查器我们也不关心,看到最后是返回一个DataNode即可。那我们就点这个

映入眼帘的是一堆参数的配置,先不管,拉到我们想看的位置,大概到465行左右会有一个try,我们瞧瞧

StartDataNode到底经历了几个过程

看到启动DataNode的代码了,所以这个就是我们想看的

① (补充)DataXceiver

补充一下:大致拉到1182行,这个initDataXceiver(conf)是初始化了我们的DataXceiver,这个是干啥用的呢,点进去

我们知道,NameNode只是负责管理集群中的元数据信息的,而真正存储的数据是存储在DataNode上的,实际上DataNode就是通过这个DataXceiver的服务来接收上传上来的数据的

在974行左右有一段设置为后台线程的代码,这个意思其实就是这个线程和主线程共存亡,如果主线程结束了,这个线程也会随之停止。

② (补充)startInfoServer

回到StartDataNode的那个位置

相信你一定还有印象,就是在我们的 Hadoop源码篇 — NameNode的启动流程 1.4.1解析NameNode启动流程中,也存在类似的一个startHttpServer,当时这个startHttpServer就是绑定了很多servlet,来增强自身的功能。而我们的DataNode它也是一样,会绑定servlet增强自身

眼尖的小伙伴肯定就看到了一些熟悉的名词,checksum不就是我们对数据进行完整性校验所使用的校验和嘛,这说明获取校验和这个功能也是绑定上去的一个servlet

③ 初始化RPC的服务

再次回到StartDataNode的那个位置,看到initIpcServer

initIpcServer是启动RPC的服务使用的。

这个代码我们又很熟悉了,这和 Hadoop源码篇 — NameNode的启动流程 1.6.2 验证存在设置参数的过程又是十分类似。而且同样的创建服务端完成之后,它也绑定了很多的协议,比如DataNode和DataNode之间进行通信的interDatanodeProtocolXlator,而且同样这些协议也有一个唯一的versionID

④ 创建BlockPoolManager

blockManager的作用在注释上已经给出来了,我们点进去refreshNamenodes瞧瞧

看见do,继续点进去,这个东西有100多行代码,需要分点说明,但是仅说明最重要的两个点

判断存在着多少份不同的元数据

把注释给翻译一下,大致意思就是,它会判断对于每一个新的nameservices到底是对已有的nameservices的更新还是一个全新的nameservices

我们平时搭建的HDFS集群是HA高可用架构的,即NameNode分为active和standby两个,这两个管理的元数据也是一样的,所以它们管理的是同一个nameservices(在这里这么理解,nameservices就是存放元数据的一个目录即可)。而对于联邦来说,每一个联邦都会管理着一份元数据,两个联邦那自然就会存在两份不一样的元数据。

遍历联邦

针对每一个联邦创建一个BPOfferService,针对联邦里的每个NameNode(其实也就2个)创建一个BPServiceActor

到这里初始化的步骤就已经走完了,是不是感觉一脸蒙圈,没事,还有注册流程

二、DataNode的注册流程

刚刚的位置结尾有一个startAll(),这就是注册的主要逻辑,点进去

可以看到这里它遍历了联邦,然后我们点进去start()方法

连起来就是,先遍历联邦,再遍历联邦里面的NameNode,然后将DataNode分别注册进去遍历出来的NameNode,继续点start

调用线程的start方法实际上就是调用run方法,这个大家应该还是知道的。

这里的connectToNNAndHandshake()就是注册的核心代码了,直译过来呢就是和NameNode进行握手。这里使用了一个死循环来保证这个注册一定能够被执行,如果出现异常,设置回睡眠5秒之后再次尝试。如果执行成功,就break。。反正我就死赖在这里了,就一定要你执行成功。

2.1 connectToNNAndHandshake()

既然刚刚都不惜使用死循环来让connectToNNAndHandshake()一定要执行成功,那就点进来瞧瞧吧

此时我们的主线目标就是调用NameNode的方法将DataNode给注册进去,其实就是往NameNode存储这个DataNode的信息。这里我们获取了NameNode的代理,为什么这块需要用到代理呢

代理对象角色内部含有对真实对象的引用,从而可以操作真实对象,同时代理对象提供与真实对象相同的接口以便在任何时刻都能代替真实对象。同时,代理对象可以在执行真实对象操作时,附加其他的操作,相当于对真实对象进行封装。

所以此时我们获取到了一个bpNameNode的代理对象,然后通过register()往上面进行注册

2.2 DataNode的注册信息createRegistration

一直往下点可以看到DatanodeRegistration类的属性字段,有兴趣的可以去了解一下

而且我们也可以发现,在这个过程中,我们的主机名,端口号···等信息都已经一并发送过去了

获取完DataNode信息后的registerDatanode

回到register()

因为向NameNode进行注册的代码也是十分重要,所以使用了和刚刚一样的死循环套路,保证中间的注册过程一定被执行,成功就break,异常就延迟一秒后重试。

此时我们看到通过bpNameNode这个对象调用了一个registerDatanode方法,这里明显是调用了NamenodeRpcServer里的同名方法的。不信我们点进去,打开NamenodeRpcServer然后ctrl+f这个registerDatanode即可

这里继续点进去

有try看try,这里明显是DataNodeManager来处理关于DataNode的问题的,继续点进去register

里面代码比较长,我们一直拉到大约995行处,可以看到一个addDataNode

addDatanode(nodeDescr)

参数nodeDescr就是刚刚createRegistration中提到的封装好的一个DataNode的注册信息

注册信息的方式其实就是往这些数据结构中去填,比如这个什么datanodeMap,点进去一看

这明显就是第一个参数为这个DataNode的唯一标识,第二个参数DataNode的description描述,就肯定是这DataNode的注册信息,把这些信息分别存储在不同的数据结构中

addDatanode(nodeDescr)

这里的参数nodeDescr和上面是一样的,都是DataNode的注册信息

好处就是以后遍历心跳的信息的话就直接遍历DatanodeDescriptor这个数据结构而不用再先遍历DataNode然后再一个个取它们的心跳信息出来

总得来说,注册其实就是往各个数据结构存放信息,写入完成后,注册就完成了

三、开始发送心跳

跳转回 2.1 connectToNNAndHandshake() 的那里往下看,也就是BPServiceActor类的大概890行。如果connectToNNAndHandshake成功,就break掉,开始发送心跳的步骤

这个循环就真的是一个死循环了,连break都没有,出现异常也只会重试,所以我们点进去看看

DataNode和NameNode的心跳间隔

在if (startTime – lastHeartbeat >= dnConf.heartBeatInterval)是判断如果当前时间-最后一次心跳的时间大于dnConf.heartBeatInterval,就执行,所以我们看看这个heartBeatInterval到底是多长,点进去

看一下这个 DFS_HEARTBEAT_INTERVAL_DEFAULT 默认值为3

所以以后咱们就知道了,DataNode和NameNode保持心跳其实是3秒一次的

之后就是 HeartbeatResponse resp = sendHeartBeat() 这句,它这个 HeartbeatResponse 就是NameNode给DataNode下发的指令,点进去sendHeartBeat()

发现其实心跳时会连带这把DataNode的一些信息也附带过去,比如说第一个reports是报告的意思,点进去看看

你看,其实就是内存容量,内存使用情况···等等这些信息

发送心跳 sendHeartbeat

通过NameNode的代理对象bpNamenode调用发送心跳,那其实就是直接调用NameNode的sendHeartbeat方法,我们直接来到NamenodeRpcServer找到同名方法

那些参数也一并带进来了,我们继续点

这里的cmds就是NameNode希望DataNode进行操作的指令,它会作为一个参数封装成一个HeartbeatResponse对象返回给DataNode

我们尝试着看看处理心跳的方法

处理心跳 handleHeartbeat

看到这里其实都有种一直在看重复代码的感觉,不过读源码就是那么纠结的过程

第一个getDatanode是根据每一个DataNode独一无二的id号来取出对应的DataNode

updateHeartbeat这个方法一直往下点可以点到这个更新心跳状态的这个方法

这里因为我们的DataNode一直是在工作的,它必须保持自己的状态更新,然后非常重要的就是它必须修改上一次的心跳时间,因为我们刚刚也看过了,我们是通过当前时间-上一次心跳时间=3s的话,就会再次发送心跳,所以这一步非常重要。而且我们也需要用到心跳这项指标来判断这个DataNode节点是否存活,因为如果超过了一定的时间(这个值是自己配置的)未建立心跳,NameNode是会判定这个节点挂掉然后它再让其他的block复制多一份数据的。

那到这里其实DataNode就差不多了。

finally

https://juejin.im/post/5dee1f7be51d45581f5ee39d

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
0 条回复 A 作者 M 管理员
    所有的伟大,都源于一个勇敢的开始!
欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论