【Java开源代码栏目提醒】:网学会员Java开源代码为您提供Hadoop中HDFS的实现代码分析 - 毕业设计参考,解决您在Hadoop中HDFS的实现代码分析 - 毕业设计学习中工作中的难题,参考学习。
Hadoop中HDFS的实现
代码分析 姓名余红波 学号200921060432 专业信息安全 一、简介 Hadoop 由 Apache Software Foundation 公司于 2005 年秋天作为 Lucene 的子项目 Nutch 的一部分正式引入。
它受到最先由 Google Lab 开发的 MapReduce和GoogleFileSystem的启发。
2006 年 3 月份MapReduce 和 Nutch Distributed File System NDFS 分别被纳入称为 Hadoop 的项目中。
Hadoop 是最受欢迎的在 Internet 上对
搜索关键字进行内容分类的工具但它也可以解决许多要求极大伸缩性的问题。
Hadoop为应用程序透明的提供了一组稳定/可靠的接口和数据运动。
在Hadoop中实现了Google的MapReduce算法它能够把应用程序分割成许多很小的工作单元每个单元可以在任何集群节点上执行或重复执行。
此外Hadoop还提供一个分布式文件系统用来在各个计算节点上存储数据并提供了对数据读写的高吞吐率。
由于应用了map/reduce和分布式文件系统使得Hadoop框架具有高容错性它会自动处理失败节点。
已经在具有600个节点的集群测试过Hadoop框架。
Hadoop 是一个能够对大量数据进行分布式处理的软件框架。
但是 Hadoop 是以一种可靠、高效、可伸缩的方式进行处理的。
Hadoop 是可靠的因为它假
设计算元素和存储会失败因此它维护多个工作数据副本确保能够针对失败的节点重新分布处理。
Hadoop 是高效的因为它以并行的方式
工作通过并行处理加快处理速度。
Hadoop 还是可伸缩的能够处理 PB 级数据。
此外Hadoop 依赖于社区服务器因此它的成本比较低任何人都可以使用。
Hadoop带有用
Java 语言编写的框架因此运行在 Linux 生产平台上是非常理想的。
Hadoop 上的应用
程序也可以使用其他语言编写比如 C。
二、Hadoop-架构 Hadoop 有许多元素构成。
最底部是 Hadoop Distributed File SystemHDFS它存储 Hadoop 集群中所有存储节点上的文件。
HDFS对于本文的上一层是 MapReduce 引擎该引擎由 JobTrackers 和 TaskTrackers 组成。
2.1、HDFS 对外部客户机而言HDFS 就像一个传统的分级文件系统。
可以创建、删除、移动或重命名文件等等。
但是 HDFS 的架构是基于一组特定的节点构建的 参见图 1这是由它自身的特点决定的。
这些节点包括 NameNode仅一个它在 HDFS 内部提供元数据服务DataNode它为 HDFS 提供存储块。
由于仅存在一个 NameNode因此这是 HDFS 的一个缺点单点失败。
图 1. Hadoop 集群的简化视图 存储在 HDFS 中的文件被分成块然后将这些块复制到多个计算机中DataNode。
这与传统的 RAID 架构大不相同。
块的大小通常 为 64MB和复制的块数量在创建文件时由客户机决定。
NameNode 可以控制所有文件操作。
HDFS 内部的所有通信都基于标准的 TCP/IP 协议。
2.2、NameNode NameNode 是一个通常在 HDFS 实例中的单独机器上运行的
软件。
它负责管理文件系统名称空间和控制外部客户机的访问。
NameNode 决定是否将文件映射到 DataNode 上的复制块上。
对于最常见的 3 个复制块第一个复制块存储在同一机架的不同节点上最后一个复制块存储在不同机架的某个节点上。
实际的 I/O 事务并没有经过 NameNode只有表示 DataNode 和块的文件映射的元数据经过 NameNode。
当外部客户机发送请求要求创建文件时NameNode 会以块标识和该块的第一个副本的 DataNode IP 地址作为响应。
这个 NameNode 还会通知其他将要接收该块的副本的 DataNode。
NameNode 在一个称为 FsImage 的文件中存储所有关于文件系统名称空间的信息。
这个文件和一个包含所有事务的记录文件这里是 EditLog将存储在 NameNode 的本地文件系统上。
FsImage 和 EditLog 文件也需要复制副本以防文件损坏或 NameNode 系统丢失。
2.3、DataNode NameNode 也是一个通常在 HDFS 实例中的单独机器上运行的软件。
Hadoop 集群包含一个 NameNode 和大量 DataNode。
DataNode 通常以机架的形式组织机架通过一个交换机将所有系统连接起来。
Hadoop 的一个假设是机架内部节点之间的传输速度快于机架间节点的传输速度。
DataNode 响应来自 HDFS 客户机的读写请求。
它们还响应创建、删除和复制来自 NameNode 的块的命令。
NameNode 依赖来自每个 DataNode 的定期心跳heartbeat消息。
每条消息都包含一个块报告NameNode 可以根据这个
报告验证块映射和其他文件系统元数据。
如果 DataNode 不能发送心跳消息NameNode 将采取修复措施重新复制在该节点上丢失的块。
2.4、文件操作 可见HDFS 并不是一个万能的文件系统。
它的主要目的是支持以流的形式访问写入的大型文件。
如果客户机想将文件写到 HDFS 上首先需要将该文件 缓存到本地的临时存储。
如果缓存的数据大于所需的 HDFS 块大小创建文件的请求将发送给 NameNode。
NameNode 将以 DataNode 标识和目标块响应客户机。
同时也通知将要保存文件块副本的 DataNode。
当客户机开始将临时文件发送给第一个 DataNode 时将立即通过管道方式将块内容转发给副本 DataNode。
客户机也负责创建保存在相同 HDFS 名称空间中的校验和checksum文件。
在最后的文件块发送之后NameNode 将文件创建提交到它的持久化元数据存储 在 EditLog 和 FsImage 文件。
2.5、Linux 集群 Hadoop 框架可在单一的 Linux 平台上使用开发和调试时但是使用存放在机架上的商业服务器才能发挥它的力量。
这些机架组成一个 Hadoop 集群。
它通过集群拓扑知识决定如何在整个集群中分配
作业和文件。
Hadoop 假定节点可能失败因此采用本机方法处理单个
计算机甚至所有机架的失败。
三、Hadoop源
代码分析 Google的核心竞争技术是它的计算平台。
它的云计算基础设施包括五个方面GoogleCluster、Chubby、GFS、BigTable、MapReduce。
很快Apache上就出现了一个类似的解决方案目前它们都属于Apache的Hadoop项目对应的分别是:Chubby--gtZooKeeper、GFS--gtHDFS、BigTable--gtHBase、MapReduce--gtHadoop 。
目前基于类似思想的开放源
代码的项目还很多如Facebook用于用户分析的Hive等。
HDFS作为一个分布式文件系统是所有这些项目的基础。
分析好HDFS有利于了解其他系统。
Hadoop各个包之间的依赖关系比较复杂原因是HDFS提供了一个分布式文件系统该系统提供API可以屏蔽本地文件系统和分布式文件系统的区别甚至像Amazon S3这样的在线存储
系统。
这就造成了分布式文件系统的实现或者是分布式文件系统的底层的实现依赖于某些貌似高层的功能。
功能的相互引用造成了蜘蛛网型的依赖关系。
一个典型的例子就是包confconf用于读取系统配置它依赖于fs主要是读取配置文件的时候需要使用文件系统而部分的文件系统的功能在fs中被抽象了。
3.1、org.apache.hadoop.io 由于Hadoop 的MapReduce和HDFS都有通信的需求需要对通信的对象进行序列化。
Hadoop并没有采用
Java的序列化而是引入了它自己的系统。
org.apache.hadoop.io中定义了大量的可序列化对象他们都实现了Writable接口。
实现了Writable接口的一个典型例子如下
Java代码 1. public class MyWritable implements Writable 2. // Some data 3. private int counter 4. private long timestamp 5. 6. public void writeDataOutput out throws IOException 7. out.writeIntcounter 8. out.writeLongtimestamp 9. 10. 11. public void readFieldsDataInput in throws IOException 12. counter in.readInt 13. timestamp in.readLong 14. 15. 16. public static MyWritable readDataInput in throws IOException 17. MyWritable w new MyWritable 18. w.readFieldsin 19. return w 20. 21. 其中的write和readFields分别实现了把对象序列化和反序列化的功能是Writable接口定义的两个方法。
图4给出了庞大的org.apache.hadoop.io中对象的关系。
图4. org.apache.hadoop.io中对象的关系 这里我把ObjectWritable 标为红色是因为相对于其他对象它有不同的地位。
当我们讨论Hadoop的RPC时我们会提到RPC上交换的信息必须是
Java的基本类型String和Writable接口的实现类以及元素为以上类型的数组。
ObjectWritable对象保存了一个可以在RPC上传输的对象和对象的类型信息。
这样我们就有了一个万能的可以用于客户端/服务器间传输的Writable对象。
例如我们要把上面例子中的对象作为RPC请求需要 根据MyWritable创建一个ObjectWritableObjectWritable往流里会写如下信息 对象类名长度对象类名对象自己的串行化结果 这样到了对端ObjectWritable可以根据对象类名创建对应的对象并解串行。
应该注意到ObjectWritable依赖于WritableFactories那存储了Writable子类对应的工厂。
我们需要把MyWritable的工厂保存在WritableFactories中通过WritableFactories.setFactory。
3.2、org.apache.hadoop.rpc 介绍完org.apache.hadoop.io以后我们开始来分析org.apache.hadoop.rpc。
RPC采用客户机/服务器模式请求程序就是一个客户机而服务提供程序就是一个服务器。
当我们讨论HDFS的通信可能发生在 Client-NameNode之间其中NameNode是服务器 Client-DataNode之间其中DataNode是服务器 DataNode-NameNode之间其中NameNode是服务器 DataNode-DateNode之间其中某一个DateNode是 服务器另一个是客户端。
如果我们考虑Hadoop的Map/Reduce以后这些系统间的通信就更复杂了。
为了解决这些客户机/服务器之间的通信Hadoop 引入了一个RPC框架。
该RPC框架利用
Java的反射能力避免了某些RPC解决
方案中需要根据某种接口语言如CORBA的IDL生成存根和框架的
问题。
但是该RPC框架要求调用的参数和返回结果必须是
Java的基本类型String和Writable接口的实现类以及元素为以上类型的数组。
同时接口方法应该只抛出IOException异常。
既然是RPC当然就有客户端和服务器所以org.apache.hadoop.rpc也就有了类Client和类Server。
但是类Server是一个抽象类 RPC封装了Server利用反射把某个对象的方法开放出来变成RPC中的服务器。
图5是org.apache.hadoop.rpc 的类图。
图5. org.apache.hadoop.rpc的类图 既然是RPC自然就有客户端和服务器当然org.apache.hadoop.rpc也就有了类Client和类Server。
在这里我们来仔细考察 org.apache.hadoop.rpc.Client。
下面的图包含了org.apache.hadoop.rpc.Client中的关键类和关键方法。
图6.org.apache.hadoop.rpc.Client中的关键类和关键方法 由于Client 可能和多个Server
通信典型的一次HDFS读需要和NameNode打交道也需要和某个或某些DataNode通信。
这就意味着某一个 Client需要维护多个连接。
同时为了减少不必要的连接现在Client的做法是拿ConnectionId图6中最右侧来做为 Connection的ID。
ConnectionId包括一个InetSocketAddressIP地址端口号或主机名端口号对象和一个用户信息对象。
这就是说同一个用户到同一个InetSocketAddress的通信将共享同一个连接。
连接被封装在类Client.Connection 中所有的RPC调用都是通过Connection进行通信。
一个RPC调用有输入参数输出参数和可能的异常同时为了区分在同一个 Connection上的不同调用每个调用都有唯一的id。
调用是否结束也需要一个标记所有的这些都体现在对象Client.Call中。
Connection对象通过一个Hash表维护在这个连接上的所有Call
Java代码 1. private HashtableltInteger Callgt calls new HashtableltInteger Callgt 一个RPC 调用通过addCall把请求加到Connection里。
为了能够在这个框架上传输
Java的基本类型String和Writable接口的实现类以及元素为以上类型的数组我们一般把Call需要的参数打包成为ObjectWritable对象。
Client.Connection会通过socket连接服务器连接成功后回校验客户端/服务器的版本号 Client.ConnectionwriteHeader方法校验成功后就可以通过Writable对象来进行请求的发送/应答了。
注意每个Client.Connection会起一个线程不断去读取socket并将收到的结果解包找出对应的Call设置Call并通知结果已经获取。
Call使用Obejct的wait和notify把RPC上的异步消息交互转成同步调用。
还有一点需要注意一个Client会有多个Client.Connection这是一个很自然的结果。
下面介绍Server图7是org.apache.hadoop.rpc.Server中的关键类和关键方法。
图7.org.apache.hadoop.rpc.Server中的关键类和关键方法 需要注意的是这里的Server类是个抽象类。
Java代码 1. public abstract Writable callWritable param long receiveTime throws IOException 这表明Server提供了一个架子Server的具体功能需要具体类来完成。
而具体类当然就是实现 call方法。
我们先来分析Server.Call。
和Client.Call类似Server.Call包含了一次请求其中id和param的含义和Client.Call是一致的。
不同之处在后面三个属性connection是该Call来自的连接当请求处理结束时相应的结果会通过相同的connection发送给客户端。
属性timestamp是请求到达的时间戳如果请求很长时间没被处理对应的连接会被关闭客户端也就知道出错了。
最后的response是请求处理的结果可能是一个Writable的串行化结果也可能一个异常的串行化结果。
Server.Connection维护了一个来自客户端的socket连接。
它处理版本校验读取请求并把请求发送到请求处理线程接收处理结果并把结果发送给客户端。
Hadoop的Server采用了
Java的NIO这样的就不需要为每一个socket连接建立一个线程读取socket上的数据。
在Server中只需要一个线程就可以accept新的连接请求和读取socket上的数据这个线程就是图7中的Listener。
请求处理线程一般有多个它们都是Server.Handle类的实例。
它们的run方法循环地取出一个Server.Call调用Server.call方法搜集结果并串行化然后将结果放入Responder队列中。
对于处理完的请求需要将结果写回去同样利用NIO只需要一个线程相关的逻辑在Responder里。
有了Client 和Server很自然就能过渡到RPC。
下面是RPC.
java。
一般来说分布式对象一般都会要求根据接口生成存根和框架。
如CORBA可以通过IDL生成存根和框架。
但是在 org.apache.hadoop.rpc我们就不需要这样的步骤了。
图8. org.apache.hadoop.rpc类图 为了分析Invoker我们需要介绍一些
Java反射实现Dynamic Proxy的背景。
Dynamic Proxy是由两个class实现的
java.lang.reflect.Proxy 和
java.lang.reflect.InvocationHandler后者是一个接口。
所谓Dynamic Proxy是这样一种class它是在运行时生成的类在生成它时你必须提供一组接口给它然后该类就宣称它实现了这些接口。
这个Dynamic Proxy其实就是一个典型的代理模式它不会替你做实质性的工作生成它的实例时你必须提供一个handler由它接管实际的工作。
这个 handler在Hadoop的RPC中就是Invoker对象。
我们可以简单地理解就是你可以通过一个接口来生成一个类这个类上的所有方法调用都会传递到你生成类时传递的InvocationHandler实现中。
在Hadoop的RPC中Invoker实现了InvocationHandler的invoke方法 invoke方法也是InvocationHandler的唯一方法。
Invoker会把所有跟这次调用相关的调用方法名参数类型列表参数列表打包然后利用前面我们分析过的Client通过socket传递到服务器端。
就是说你在proxy类上的任何调用都通过Client发送到远方的服务器上。
Invoker使用Invocation。
Invocation封装了一个远程调用的所有相关信息它的主要的属性有: methodName调用方法名parameterClasses调用方法参数的类型
列表和parameters调用方法参数。
注意它实现了 Writable接口可以串行化。
RPC.Server实现了org.apache.hadoop.ipc.Server你可以把一个对象通过RPC升级成为一个服务器。
服务器接收到的请求通过Invocation解串行化以后就变成了方法名方法参数列表和参数列表。
利用
Java反射我们就可以调用对应的对象的方法。
调用的结果再通过socket返回给客户端客户端把结果解包后就可以返回给Dynamic Proxy的使用者了。
一个典型的HDFS系统包括一个NameNode和多个DataNode。
NameNode维护名字空间而DataNode存储数据块。
DataNode负责存储数据一个数据块在多个DataNode中有备份而一个DataNode对于一个块最多只包含一个备份。
所以我们可以简单地认为DataNode上存了数据块ID和数据块内容以及他们的映射关系。
一个HDFS 集群可能包含上千DataNode节点这些DataNode定时和NameNode通信接受NameNode的指令。
为了减轻NameNode的负担NameNode上并不永久保存那个DataNode上有那些数据块的信息而是通过DataNode启动时的上报来更新NameNode上的映射表。
DataNode和NameNode建立连接以后就会不断地和 NameNode保持心跳。
心跳的返回其还也包含了NameNode对DataNode的一些命令如删除数据库或者是把数据块复制到另一个 DataNode。
应该注意的是NameNode不会发起到DataNode的请求在这个通信过程中它们是严格的客户端/服务器架构。
DataNode当然也以作为服务器接受来自客户端的访问处理数据块读/写 请求。
DataNode之间还会相互通信执行数据块复制任务同时在客户端做写操作的时候DataNode需要相互配合保证写操作的一致性。
3.3、DataNode的本地数据块管理 下面我们就来具体分析一下DataNode的实现。
DataNode的实现包括两部分一部分是对本地数据块的管理另一部分就是和其他的实体的交互。
我们先来看本地数据块管理部分。
安装Hadoop的时候我们会指定对应的数据块存放目录当我们检查数据块存放目录时我们回发现下面有个叫dfs的目录所有的数据就存放在dfs/data里面。
其中有两个文件storage里存的东西是一些出错信息。
in_use.lock是一个空文件它的作用是如果需要对整个系统做排斥操作应用应该获取它上面的一个锁。
接下来是3个目录current存放的是当前有效的数据块detach存的是快照snapshot目前没有实现tmp保存的是一些操作需要的临时数据块。
但我们进入current目录以后就会发现有一系列的数据块文件和数据块元数据文件。
同时还有一些子目录它们的名字是subdir0到subdir63子目录下也有数据块文件和数据块元数据。
这是因为HDFS限定了每个目录存放数据块文件的数量多了以后会创建子目录来保存。
数据块文件显然保存了HDFS中的数据数据块最大可以到64M。
每个数据块文件都会有对应的数据块元数据文件。
里面存放的是数据块的校验信息。
下面是数据块文件名和它的元数据文件名的例子 blk_3148782637964391313 blk_3148782637964391313_242812.meta 上面的例子中3148782637964391313 是数据块的ID号242812是数据块的版本号用于一致性检查。
在current目录下还有下面几个文件 VERSION保存了一些文件系统的元信息。
dncp_block_verification.log.curr 和 dncp_block_verification.log.prev它记录了一些DataNode对文件系定时统做一致性检查需要的信息。
在继续分析DataNode之前我们有必要看一下系统的工作状态。
启动HDFS的时候我们可以选择以下启动参数 FORMATquot-formatquot格式化系统 REGULARquot-regularquot正常启动 UPGRADEquot-upgradequot升级 ROLLBACKquot-rollbackquot回滚 FINALIZEquot-finalizequot提交 IMPORTquot-importCheckpointquot从Checkpoint恢复。
作为一个大型的分布式系统Hadoop内部实现了一套升级机制http://wiki.apache.org/hadoop/Hadoop_Upgrade。
upgrade参数就是为了这个目的而存在的当然升级可能成功也可能失败。
如果失败了那就用 rollback进行回滚如果过了一段时间系统运行正常那就可以通过finalize正式提交这次升级。
importCheckpoint 选项用于NameNode发生故障后从某个检查点恢复。
有了上面的描述我们得到下面的状态图 上面的升级/回滚/提交都不可能一次完成就是说系统故障时它可能处于上面右边状态中的某一个。
特别是分布式的各个节点上甚至可能出现某些节点已经升级成功但有些节点可能处.