Hadoop权威指南---中文版


2023年12月19日发(作者:米兰理工大学)

目录

目录 I

初识Hadoop 1

1.1 数据!数据 1

1.2 数据的存储和分析 3

1.3 相较于其他系统 4

1.4 Hadoop发展简史 9

1.5 Apache Hadoop项目 12

MapReduce简介 15

2.1 一个气象数据集 15

2.2 使用Unix Tools来分析数据 17

2.3 使用Hadoop进行数据分析 19

2.4 分布化 30

2.5 Hadoop流 35

2.6 Hadoop管道 40

Hadoop分布式文件系统 44

3.1 HDFS的设计 44

3.2 HDFS的概念 45

3.3 命令行接口 48

3.4 Hadoop文件系统 50

3.5 Java接口 54

3.6 数据流 68

3.7 通过distcp进行并行复制 75

3.8 Hadoop归档文件 77

Hadoop的I/O 80

4.1 数据完整性 80

4.2 压缩 83

4.3 序列化 92

4.4 基于文件的数据结构 111

MapReduce应用开发 125

5.1 API的配置 126

5.2 配置开发环境 128

5.3 编写单元测试 134

5.4 本地运行测试数据 138

5.5 在集上运行 144

5.6 作业调优 159

5.7 MapReduce的工作流 162

MapReduce的工作原理 166

6.1 运行MapReduce作业 166

6.2 失败 172

6.3 作业的调度 174

6.4 shuffle和排序 175

6.6 任务的执行 181

MapReduce的类型与格式 188

7.1 MapReduce类型 188

7.3 输出格式 217

MapReduce 特性 227

8.1 计数器 227

8.2 排序 235

8.3 联接 252

8.4 次要数据的分布 258

8.5 MapReduce的类库 263

Hadoop集的安装 264

9.1 集说明 264

9.2 集的建立和安装 268

9.3 SSH配置 270

9.4 Hadoop配置 271

9.5 安装之后 286

9.6 Hadoop集基准测试 286

9.7 云计算中的Hadoop 290

Hadoop的管理 293

10.1 HDFS 293

10.2 监控 306

10.3 维护 313

Pig简介 321

11.1 安装和运行Pig 322

11.2 实例 325

11.3 与数据库比较 329

11.4 Pig Latin 330

11.5 用户定义函数 343

11.6 数据处理操作符 353

11.7 Pig实践提示与技巧 363

Hbase简介 366

12.1 HBase基础 366

12.2 概念 367

12.3 安装 371

12.4 客户端 374

12.5 示例 377

12.6 HBase与RDBMS的比较 385

12.7 实践 390

ZooKeeper简介 394

13.1 ZooKeeper的安装和运行 395

13.2 范例 396

13.3 ZooKeeper服务 405

13.4 使用ZooKeeper建立应用程序 417

13.5 工业界中的ZooKeeper 428

案例研究 431

14.1 Hadoop在的应用 431

14.2 Hadoop和Hive在Facebook的应用 441

14.3 Hadoop在Nutch搜索引擎 451

14.4 Hadoop用于Rackspace的日志处理 466

14.5 Cascading项目 474

14.6 Apache Hadoop的1 TB排序 488

Apache Hadoop的安装 491

Cloudera的Hadoop分发包 497

预备NCDC气象资料 502

第1章 初识Hadoop

古时候,人们用牛来拉重物,当一头牛拉不动一根圆木的时候,他们不曾想过培育个头更大的牛。同样,我们也不需要尝试更大的计算机,而是应该开发更多的计算系统。

--格蕾斯·霍珀

1.1 数据!数据

我们生活在数据时代!很难估计全球存储的电子数据总量是多少,但是据IDC估计2006年"数字全球"项目(digital universe)的数据总量为0.18 ZB,并且预测到2011年这个数字将达到1.8 ZB,为2006年的10倍 。1 ZB相当于10的21次方字节的数据,或者相当于1000 EB,1 000 000 PB,或者大家更熟悉的10亿TB的数据!这相当于世界上每个人一个磁盘驱动器的数量级。

这一数据洪流有许多来源。考虑下文:

纽约证券交易所每天产生1 TB的交易数据。

著名社交网站Facebook的主机存储着约100亿张照片,占据PB级存储空间。

,一个家谱网站,存储着2.5 PB数据。

互联网档案馆(The Internet Archive)存储着约2 PB数据,并以每月至少20 TB的速度增长。

瑞士日内瓦附近的大型强子对撞机每年产生约15 PB的数据。

此外还有大量数据。但是你可能会想它对自己有何影响。大部分数据被锁定在最大的网页内容里面(如搜索引擎)或者是金融和科学机构,对不对?是不是所谓的"大数据"的出现会影响到较小的组织或个人?

我认为是这样的。以照片为例,我妻子的祖父是一个狂热的摄影爱好者,并且他成人之后,几乎一直都在拍照片。他的所有照片(中等格式、幻灯片和35 mm胶片),在扫描成高解析度照片时,占了大约10 GB的空间。相比之下,我家去年一年用数码相机拍摄的照片就占用了5 GB的空间。我家产生照片数据的速度是我妻子祖父的35倍!并且,随着拍摄更多的照片变得越来越容易,这个速度还在增加中。

更常见的情况是,个人数据的产生量正在快速地增长。微软研究院的MyLifeBits项目(/en-us/projects/mylifebits/)显示,在不久的将来,个人信息档案将可能成为普遍现象。MyLifeBits是这样的一个实验:一个人与外界的联系(电话、邮件和文件)被抓取和存储供以后访问。收集的数据包括每分钟拍摄的照片等,导致整个数

据量达到每月1 GB的大小。当存储成本下降到使其可以存储连续的音频和视频时,服务于未来MyLifeBits项目的数据量将是现在的许多倍。

个人数据的增长的确是大势所趋,但更重要的是,计算机所产生的数据可能比人所产生的数据更大。机器日志、RFID读取器、传感器网络、车载GPS和零售交易数据等,这些都会促使"数据之山越来越高"。

公开发布的数据量也在逐年增加。作为组织或企业,再也不能只管理自己的数据,未来的成功在很大程度上取决于它是否能从其他组织的数据中提取出价值。

这方面的先锋(如亚马逊网络服务器、或者)的公共数据集,它们的存在就在于促进"信息共享",任何人都可以共享并自由(或以AWS平台的形式,或以适度的价格)下载和分析这些数据。不同来源的信息混合处理后会带来意外的效果和至今难以想像的应用。

以项目为例,这是一个研究Flickr网站上天体爱好者中新照片的项目。它分析每一张上传的照片,并确定它是天空的哪一部分,或者是否是有趣的天体,如恒星或者星系。虽然这只是一个带实验性质的新服务,但是它显示了数据(这里特指摄影照片)的可用性并且被用来进行某些活动(图像分析),而这些活动很多时候并不是数据创建者预先能够想像到的。

有句话是这么说的:"算法再好,通常也难敌更多的数据。"意思是说对于某些问题(譬如基于既往偏好生成的电影和音乐推荐),不论你的算法有多么猛,它们总是会在更多的数据面前无能为力(更不用说没有优化过的算法了)。

现在,我们有一个好消息和一个坏消息。好消息是有海量数据!坏消息是我们正在为存储和分析这些数据而奋斗不息。

1.2 数据的存储和分析

问题很简单:多年来硬盘存储容量快速增加的同时,访问速度-- 数据从硬盘读取的速度-- 却未能与时俱进。1990年,一个普通的硬盘驱动器可存储1370 MB的数据并拥有4.4

MB/s的传输速度 ,所以,只需五分钟的时间就可以读取整个磁盘的数据。20年过去了,1

TB级别的磁盘驱动器是很正常的,但是数据传输的速度却在100 MB/s左右。所以它需要花两个半小时以上的时间读取整个驱动器的数据。

从一个驱动器上读取所有的数据需要很长的时间,写甚至更慢。一个很简单的减少读取时间的办法是同时从多个磁盘上读取数据。试想一下,我们拥有100个磁盘,每个存储百分之一的数据。如果它们并行运行,那么不到两分钟我们就可以读完所有的数据。

只使用一个磁盘的百分之一似乎很浪费。但是我们可以存储100个数据集,每个1 TB,并让它们共享磁盘的访问。我们可以想像,此类系统的用户会很高兴看到共享访问可以缩短分析时间,并且,从统计角度来看,他们的分析工作会分散到不同的时间点,所以互相之间不会有太多干扰。

尽管如此,现在更可行的是从多个磁盘并行读写数据。

第一个需要解决的问题是硬件故障。一旦开始使用多个硬件设施,其中一个会出故障的概率是非常高的。避免数据丢失的常见做法是复制:通过系统保存数据的冗余副本,在故障发生时,可以使用数据的另一份副本。这就是冗余磁盘阵列的工作方式。Hadoop的文件系统HDFS(Hadoop Distributed Filesystem)也是一个例子,虽然它采取的是另一种稍有不同的方法,详见后文描述。

第二个问题是大部分分析任务需要通过某种方式把数据合并起来,即从一个磁盘读取的数据可能需要和另外99个磁盘中读取的数据合并起来才能使用。各种不同的分布式系统能够组合多个来源的数据,但是如何保证正确性是一个非常难的挑战。MapReduce提供了一个编程模型,其抽象出上述磁盘读写的问题,将其转换为计算一个由成对键/值组成的数据集。这种模型的具体细节将在后面的章节讨论。但是目前讨论的重点是,这个计算由两部分组成:Map和Reduce。这两者的接口就是"整合"之地。就像HDFS一样,MapReduce是内建可靠性这个功能的。

简而言之,Hadoop提供了一个稳定的共享存储和分析系统。存储由HDFS实现,分析由MapReduce实现。纵然Hadoop还有其他功能,但这些功能是它的核心所在。

1.3 相较于其他系统

MapReduce似乎采用的是一种蛮力方法。即,针对每个查询,每一个数据集-- 至少是很大一部分-- 都会被处理。但这正是它的能力。MapReduce可以处理一批查询,并且它针对整个数据集处理即席查询并在合理时间内获得结果的能力也是具有突破性的。它改变了我们对数据的看法,并且解放了以前存储在磁带和磁盘上的数据。它赋予我们对数据进行创新的机会。那些以前需要很长时间才能获得答案的问题现在已经迎刃而解,但反过来,这又带来了新的问题和见解。

例如,Rackspace的邮件部门Mailtrust,用Hadoop处理邮件的日志。他们写的一个查询是到其用户的地理分布。他们是这样说的:

"随着我们的壮大,这些数据非常有用,我们每月运行一次MapReduce任务来帮助我们决定哪些Rackspace数据中心需要添加新的邮件服务器。"

通过将数百GB的数据整合,借助于分析工具,Rackspace的工程师得以了解这些数据,否则他们永远都不会了解,并且他们可以运用这些信息去改善他们为用户提供的服务。第14章将详细介绍Rackspace公司是如何运用Hadoop的。

1.3.1 关系型数据库管理系统

为什么我们不能使用数据库加上更多磁盘来做大规模的批量分析?为什么我们需要MapReduce?

这个问题的答案来自于磁盘驱动器的另一个发展趋势:寻址时间的提高速度远远慢于传输速率的提高速度。寻址就是将磁头移动到特定位置进行读写操作的工序。它的特点是磁盘操作有延迟,而传输速率对应于磁盘的带宽。

如果数据的访问模式受限于磁盘的寻址,势必会导致它花更长时间(相较于流)来读或写大部分数据。另一方面,在更新一小部分数据库记录的时候,传统的B树(关系型数据库中使用的一种数据结构,受限于执行查的速度)效果很好。但在更新大部分数据库数据的时候,B树的效率就没有MapReduce的效率高,因为它需要使用排序/合并来重建数据库。

在许多情况下,MapReduce能够被视为一种RDBMS(关系型数据库管理系统)的补充。(两个系统之间的差异见表1-1)。MapReduce很适合处理那些需要分析整个数据集的问题,以批处理的方式,尤其是Ad Hoc(自主或即时)分析。RDBMS适用于点查询和更新(其中,数据集已经被索引以提供低延迟的检索和短时间的少量数据更新。MapReduce适合数据被一次写入和多次读取的应用,而关系型数据库更适合持续更新的数据集。

表1-1:关系型数据库和MapReduce的比较

数据大小 GB

访问

更新

结构

集成度

伸缩性

交互型和批处理

多次读写

静态模式

非线性

传统关系型数据库

PB

批处理

一次写入多次读取

动态模式

线性

MapReduce

MapReduce和关系型数据库之间的另一个区别是它们操作的数据集中的结构化数据的数量。结构化数据是拥有准确定义的实体化数据,具有诸如XML文档或数据库表定义的格式,符合特定的预定义模式。这就是RDBMS包括的内容。另一方面,半结构化数据比较宽松,虽然可能有模式,但经常被忽略,所以它只能用作数据结构指南。例如,一张电子表格,其中的结构便是单元格组成的网格,尽管其本身可能保存任何形式的数据。非结构化数据没有什么特别的内部结构,例如纯文本或图像数据。MapReduce对于非结构化或半结构化数据非

常有效,因为它被设计为在处理时间内解释数据。换句话说:MapReduce输入的键和值并不是数据固有的属性,它们是由分析数据的人来选择的。

关系型数据往往是规范的,以保持其完整性和删除冗余。规范化为MapReduce带来问题,因为它使读取记录成为一个非本地操作,并且MapReduce的核心假设之一就是,它可以进行(高速)流的读写。

Web服务器日志是记录集的一个很好的非规范化例子(例如,客户端主机名每次都以全名来指定,即使同一客户端可能会出现很多次),这也是MapReduce非常适合用于分析各种日志文件的原因之一。

MapReduce是一种线性的可伸缩的编程模型。程序员编写两个函数-- map函数和Reduce函数-- 每一个都定义一个键/值对集映射到另一个。这些函数无视数据的大小或者它们正在使用的集的特性,这样它们就可以原封不动地应用到小规模数据集或者大的数据集上。更重要的是,如果放入两倍的数据量,运行的时间会少于两倍。但是如果是两倍大小的集,一个任务任然只是和原来的一样快。这不是一般的SQL查询的效果。

随着时间的推移,关系型数据库和MapReduce之间的差异很可能变得模糊。关系型数据库都开始吸收MapReduce的一些思路(如ASTER DATA的和GreenPlum的数据库),另一方面,基于MapReduce的高级查询语言(如Pig和Hive)使MapReduce的系统更接近传统的数据库编程人员。

1.3.2 网格计算

高性能计算(High Performance Computing,HPC)和网格计算社区多年来一直在做大规模的数据处理,它们使用的是消息传递接口(Message Passing Interface,MPI)这样的API。从广义上讲,高性能计算的方法是将作业分配给一个机器集,这些机器访问共享文件系统,由一个存储区域网络(Storage Area Network,SAN)进行管理。这非常适用于以主计算密集型为主的作业,但当节点需要访问的大数据量(数百GB的数据,这是MapReduce实际开始"发光"的起点)时,这会成为一个问题,因为网络带宽成为"瓶颈",所以计算节点闲置下来了。

MapReduce尝试在计算节点本地存储数据,因此数据访问速度会因为它是本地数据而比较快。 这项"数据本地化"功能,成为MapReduce的核心功能并且也是它拥有良好性能的原因之一。意识到网络带宽在数据中心环境是最有价值的资源(到处复制数据会很容易的把网络带宽饱和)之后,MapReduce便通过显式网络拓扑结构不遗余力地加以保护。请注意,这种安排不会排除MapReduce中的高CPU使用分析。

MPI赋予程序员很大的控制,但也要求显式控制数据流机制,需要使用传统的C语言的功能模块完成(例如socket),以及更高级的算法来进行分析。而MapReduce却是在更高层面上完成任务,即程序员从键/值对函数的角度来考虑,同时数据流是隐含的。

在一个大规模分布式计算平台上协调进程是一个很大的挑战。最困难的部分是恰当的处理失效与错误-- 在不知道一个远程进程是否已经失败的时候-- 仍然需要继续整个计算。MapReduce将程序员从必须考虑失败任务的情况中解放出来,它检测失败的map或者reduce任务,在健康的机器上重新安排任务。MapReduce能够做到这一点,因为它是一个无共享的架构,这意味着各个任务之间彼此并不依赖。(这里讲得稍微简单了一些,因为mapper的输出是反馈给reducer的,但这由MapReduce系统控制。在这种情况下,相对于返回失败的map,应该对返回reducer给予更多关注,因为它必须确保它可以检索到必要的map输出,如果不行,必须重新运行相关的map从而生成必要的这些输出。)因此,从程序员的角度来看,执行任务的顺序是无关紧要的。相比之下,MPI程序必须显式地管理自己的检查点和恢复机制,从而把更多控制权交给程序员,但这样会加大编程的难度。

MapReduce听起来似乎是一个相当严格的编程模型,而且在某种意义上看的确如此:我们被限定于键/值对的类型(它们按照指定的方式关联在一起),mapper和reducer彼此间的协作有限,一个接一个地运行(mapper传输键/值对给reducer)。对此,一个很自然的问题是:你是否能用它做点儿有用或普通的事情?

答案是肯定的。MapReduce作为一个建立搜索索引产品系统,是由Google的工程师们开发出来的,因为他们发现自己一遍又一遍地解决相同的问题(MapReduce的灵感来自传统的函数式编程、分布式计算和数据库社区),但它后来被应用于其他行业的其他许多应用。我们惊喜地看到许多算法的变体在MapReduce中得以表示,从图像图形分析,到基于图表的问题,再到机器学习算法 。它当然不能解决所有问题,但它是一个很普遍的数据处理工具。

第14章将介绍一些Hadoop应用范例。

1.3.3 志愿计算

人们第一次听说Hadoop和MapReduce的时候,经常会问:"和SETI@home有什么区别?"SETI,全称为Search for Extra-Terrestrial Intelligence(搜寻外星人),运行着一个称为SETI@home的项目()。在此项目中,志愿者把自己计算机CPU的空闲时间贡献出来分析无线天文望远镜的数据借此寻外星智慧生命信号。SETI@home是最有名的拥有许多志愿者的项目,其他的还有Great Internet Mersenne Prime Search(搜索大素数)与Folding@home项目(了解蛋白质构成及其与疾病之间的关系)。

志愿计算项目通过将他们试图解决的问题分为几个他们成为工作单元的块来工作,并将它们送到世界各地的电脑上进行分析。例如,SETI@home的工作单元大约是0.35 MB的无线电望远镜数据,并且一个典型的计算机需要数小时或数天来分析。完成分析后,结果发送回服务器,客户端获得的另一项工作单元。作为防止欺骗的预防措施,每个工作单元必须送到三台机器上并且需要有至少两个结果相同才会被接受。

虽然SETI@home在表面上可能类似于MapReduce(将问题分为独立的块,然后进行并行计算),但差异还是显著的。SETI@home问题是CPU高度密集型的,使其适合运行于世界各地成千上万台计算机上,因为相对于其计算时间而言,传输工作单元的时间微不足道 。志愿者捐献的是CPU周期,而不是带宽。

MapReduce被设计为用来运行那些需要数分钟或数小时的作业,这些作业在一个聚集带宽很高的数据中心中可信任的专用硬件设备上运行。相比之下,SETI@home项目是在接入互联网的不可信的计算机上运行,这些计算机的网速不同,而且数据也不在本地。

1.4 Hadoop发展简史

Hadoop是Doug Cutting-- Apache Lucene创始人-- 开发的使用广泛的文本搜索库。Hadoop起源于Apache Nutch,后者是一个开源的网络搜索引擎,本身也是由Lucene项目的一部分。

Hadoop名字的起源

Hadoop这个名字不是一个缩写,它是一个虚构的名字。该项目的创建者,Doug Cutting如此解释Hadoop的得名:"这个名字是我孩子给一头吃饱了的棕黄大象命名的。我的命名标准就是简短,容易发音和拼写,没有太多的意义,并且不会被用于别处。小孩子是这方面的高手。Googol就是由小孩命名的。"

Hadoop及其子项目和后继模块所使用的名字往往也与其功能不相关,经常用一头大象或其他动物主题(例如:"Pig")。较小的各个组成部分给与更多描述性(因此也更俗)的名称。这是一个很好的原则,因为它意味着可以大致从其名字猜测其功能,例如,jobtracker 的任务就是跟踪MapReduce作业。

从头开始构建一个网络搜索引擎是一个雄心勃勃的目标,不只是要编写一个复杂的、能够抓取和索引网站的软件,还需要面临着没有专有运行团队支持运行它的挑战,因为它有那么多独立部件。同样昂贵的还有:据Mike Cafarella和Doug Cutting估计,一个支持此10亿页的索引需要价值约50万美元的硬件投入,每月运行费用还需要3万美元。 不过,他们相信这是一个有价值的目标,因为这会开放并最终使搜索引擎算法普及化。

Nutch项目开始于2002年,一个可工作的抓取工具和搜索系统很快浮出水面。但他们意识到,他们的架构将无法扩展到拥有数十亿网页的网络。在2003年发表的一篇描述Google分布式文件系统(简称GFS)的论文为他们提供了及时的帮助,文中称Google正在使用此文件系统。 GFS或类似的东西,可以解决他们在网络抓取和索引过程中产生的大量的文件的存储需求。具体而言,GFS会省掉管理所花的时间,如管理存储节点。在2004年,他们开始写一个开放源码的应用,即Nutch的分布式文件系统(NDFS)。

2004年,Google发表了论文,向全世界介绍了MapReduce。 2005年初,Nutch的开发者在Nutch上有了一个可工作的MapReduce应用,到当年年中,所有主要的Nutch算法被移植到使用MapReduce和NDFS来运行。

Nutch中的NDFS和MapReduce实现的应用远不只是搜索领域,在2006年2月,他们从Nutch转移出来成为一个独立的Lucene子项目,称为Hadoop。大约在同一时间,Doug

Cutting加入雅虎,Yahoo提供一个专门的团队和资源将Hadoop发展成一个可在网络上运行的系统(见后文的补充材料)。在2008年2月,雅虎宣布其搜索引擎产品部署在一个拥有1万个内核的Hadoop集上。

2008年1月,Hadoop已成为Apache顶级项目,证明它是成功的,是一个多样化、活跃的社区。通过这次机会,Hadoop成功地被雅虎之外的很多公司应用,如、Facebook和《纽约时报》。(一些应用在第14章的案例研究和Hadoop维基有介绍,Hadoop维基的网址为/hadoop/PoweredBy。)

有一个良好的宣传范例,《纽约时报》使用亚马逊的EC2云计算将4 TB的报纸扫描文档压缩,转换为用于Web的PDF文件。 这个过程历时不到24小时,使用100台机器运行,如果不结合亚马逊的按小时付费的模式(即允许《纽约时报》在很短的一段时间内访问大量机器)和Hadoop易于使用的并行程序设计模型,该项目很可能不会这么快开始启动。

2008年4月,Hadoop打破世界纪录,成为最快排序1TB数据的系统。运行在一个910节点的集,Hadoop在209秒内排序了1 TB的数据(还不到三分半钟),击败了前一年的297秒冠军。同年11月,谷歌在报告中声称,它的MapReduce实现执行1TB数据的排序只用了68秒。 在2009年5月,有报道宣称Yahoo的团队使用Hadoop对1 TB的数据进行排序只花了62秒时间。

Hadoop@Yahoo!

构建互联网规模的搜索引擎需要大量的数据,因此需要大量的机器来进行处理。Yahoo!Search包括四个主要组成部分:Crawler,从因特网下载网页;WebMap,构建一个网络地图;Indexer,为最佳页面构建一个反向索引;Runtime(运行时),回答用户的查询。WebMap是一幅图,大约包括一万亿条边(每条代表一个网络链接)和一千亿个节点(每个节点代表不同的网址)。创建和分析此类大图需要大量计算机运行若干天。在2005年初,WebMap所用的基础设施名为Dreadnaught,需要重新设计以适应更多节点的需求。Dreadnaught成功地从20个节点扩展到600个,但需要一个完全重新的设计,以进一步扩大。Dreadnaught与MapReduce有许多相似的地方,但灵活性更强,结构更少。具体说来,每一个分段(fragment),Dreadnaught作业可以将输出发送到此作业下一阶段中的每一个分段,但排序是在库函数中完成的。在实际情形中,大多数WebMap阶段都是成对存在的,对应于MapReduce。因此,WebMap应用并不需要为了适应MapReduce而进行大量重构。

Eric Baldeschwieler(Eric14)组建了一个小团队,我们开始设计并原型化一个新的框架(原型为GFS和MapReduce,用C++语言编写),打算用它来替换Dreadnaught。尽管当务之急是我们需要一个WebMap新框架,但显然,标准化对于整个Yahoo! Search平台至关重要,并且通过使这个框架泛化,足以支持其他用户,我们才能够充分运用对整个平台的投资。

与此同时,我们在关注Hadoop(当时还是Nutch的一部分)及其进展情况。2006年1月,雅虎聘请了Doug Cutting,一个月后,我们决定放弃我们的原型,转而使用Hadoop。相较于我们的原型和设计,Hadoop的优势在于它已经在20个节点上实际应用过。这样一来,我们便能在两个月内搭建一个研究集,并着手帮助真正的客户使用这个新的框架,速度比原来预计的快许多。另一个明显的优点是Hadoop已经开源,较容易(虽然远没有那么容易!)从雅虎法务部门获得许可在开源方面进行工作。因此,我们在2006年初设立了一个200个节点的研究集,我们将WebMap的计划暂时搁置,转而为研究用户支持和发展Hadoop。

Hadoop大事记

2004年-- 最初的版本(现在称为HDFS和MapReduce)由Doug Cutting和Mike Cafarella开始实施。

2005年12月-- Nutch移植到新的框架,Hadoop在20个节点上稳定运行。

2006年1月-- Doug Cutting加入雅虎。

2006年2月-- Apache Hadoop项目正式启动以支持MapReduce和HDFS的独立发展。

2006年2月-- 雅虎的网格计算团队采用Hadoop。

2006年4月-- 标准排序(10 GB每个节点)在188个节点上运行47.9个小时。

2006年5月-- 雅虎建立了一个300个节点的Hadoop研究集。

2006年5月-- 标准排序在500个节点上运行42个小时(硬件配置比4月的更好)。

06年11月-- 研究集增加到600个节点。

06年12月-- 标准排序在20个节点上运行1.8个小时,100个节点3.3小时,500个节点5.2小时,900个节点7.8个小时。

07年1月-- 研究集到达900个节点。

07年4月-- 研究集达到两个1000个节点的集。

08年4月-- 赢得世界最快1 TB数据排序在900个节点上用时209秒。

08年10月-- 研究集每天装载10 TB的数据。

09年3月-- 17个集总共24 000台机器。

09年4月-- 赢得每分钟排序,59秒内排序500 GB(在1400个节点上)和173分钟内排序100 TB数据(在3400个节点上)。

1.5 Apache Hadoop项目

今天,Hadoop是一个分布式计算基础架构这把"大伞"下的相关子项目的集合。这些项目属于Apache软件基金会(),后者为开源软件项目社区提供支持。虽然Hadoop最出名的是MapReduce及其分布式文件系统(HDFS,从NDFS改名而来),但还有其他子项目提供配套服务,其他子项目提供补充性服务。这些子项目的简要描述如下,其技术栈如图1-1所示。

图1-1:Hadoop的子项目

Core

一系列分布式文件系统和通用I/O的组件和接口(序列化、Java RPC和持久化数据结构)。

Avro

一种提供高效、跨语言RPC的数据序列系统,持久化数据存储。(在本书写作期间,Avro只是被当作一个新的子项目创建,而且尚未有其他Hadoop子项目在使用它。)

MapReduce

分布式数据处理模式和执行环境,运行于大型商用机集。

HDFS

分布式文件系统,运行于大型商用机集。

Pig

一种数据流语言和运行环境,用以检索非常大的数据集。Pig运行在MapReduce和HDFS的集上。

Hbase

一个分布式的、列存储数据库。HBase使用HDFS作为底层存储,同时支持MapReduce的批量式计算和点查询(随机读取)。

ZooKeeper

一个分布式的、高可用性的协调服务。ZooKeeper提供分布式锁之类的基本服务用于构建分布式应用。

Hive

分布式数据仓库。Hive管理HDFS中存储的数据,并提供基于SQL的查询语言(由运行时引擎翻译成MapReduce作业)用以查询数据。

Chukwa

分布式数据收集和分析系统。Chukwa运行HDFS中存储数据的收集器,它使用MapReduce来生成报告。(在写作本书期间,Chukwa刚刚从Core中的"contrib"模块分离出来独立成为一个独立的子项目。)

第2章 MapReduce简介

MapReduce是一种用于数据处理的编程模型。该模型非常简单。同一个程序Hadoop可以运行用各种语言编写的MapReduce程序。在本章中,我们将看到用Java,Ruby,Python和C++这些不同语言编写的不同版本。最重要的是,MapReduce程序本质上是并行的,因此可以将大规模的数据分析交给任何一个拥有足够多机器的运营商。MapReduce的优势在于处理大型数据集,所以下面首先来看一个例子。

2.1 一个气象数据集

在我们这个例子里,要编写一个挖掘气象数据的程序。分布在全球各地的气象传感器每隔一小时便收集当地的气象数据,从而积累了大量的日志数据。它们是适合用MapReduce进行分析的最佳候选,因为它们是半结构化且面向记录的数据。

数据的格式

我们将使用National Climatic Data Center(国家气候数据中心,NCDC,网址为/)提供的数据。数据是以面向行的ASCII格式存储的,每一行便是一个记录。该格式支持许多气象元素,其中许多数据是可选的或长度可变的。为简单起见,我们将重点讨论基本元素(如气温),这些数据是始终都有且有固定宽度的。

例2-1显示了一个简单的示例行,其中一些重要字段加粗显示。该行已被分成多行以显示出每个字段,在实际文件中,字段被整合成一行且没有任何分隔符。

例2-1:国家气候数据中心数据记录的格式

1. 0057

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

23.

24.

25.

26.

27.

28.

29.

30.

31.

332130 # USAF weather station identifier

99999 # WBAN weather station identifier

19500101 # observation date

0300 # observation time

4

+51317 # latitude (degrees × 1000)

+028783 # longitude (degrees × 1000)

FM-12

+0171 # elevation (meters)

99999

V020

320 # wind direction (degrees)

1 # quality code

N

0072

1

00450 # sky ceiling height (meters)

1 # quality code

C

N

010000 # visibility distance (meters)

1 # quality code

N

9

-0128 # air temperature (degrees Celsius × 10)

1 # quality code

-0139 # dew point temperature (degrees Celsius × 10)

1 # quality code

10268 # atmospheric pressure (hectopascals × 10)

1 # quality code

数据文件按照日期和气象站进行组织。从1901年到2001 年,每一年都有一个目录,每一个目录都包含一个打包文件,文件中的每一个气象站都带有当年的数据。例如,1990年的前面的数据项如下:

1.

2.

3.

4.

% ls raw/1990 | head

5.

6.

7.

8.

9.

10.

11.

因为实际生活中有成千上万个气象台,所以整个数据集由大量较小的文件组成。通常情况下,我们更容易、更有效地处理数量少的大型文件,因此,数据会被预先处理而使每年记录的读数连接到一个单独的文件中。(具体做法请参见附录C)

2.2 使用Unix Tools来分析数据

在全球气温数据中每年记录的最高气温是多少?我们先不用Hadoop来回答这一问题,因为答案中需要提供一个性能标准(baseline)和一种检查结果的有效工具。

对于面向行的数据,传统的处理工具是awk。例2-2是一个小的程序脚本,用于计算每年的最高气温。

例2-2:一个用于从NCDC气象记录中出每年最高气温的程序

1.

2.

3.

4.

5.

6.

7.

8.

#!/usr/bin/env bash

for year in all/*

do

echo -ne 'basename $year .gz'"t"

gunzip -c $year |

awk '{ temp = substr($0, 88, 5) + 0;

q = substr($0, 93, 1);

if (temp !=9999 && q ~ /[01459]/

&& temp > max) max = temp }

9.

10.

END { print max }'

done

该脚本循环遍历压缩文件,首先显示年份,然后使用awk处理每个文件。awk脚本从数据中提取两个字段:气温和质量代码。气温值通过加上一个0变成一个整数。接下来,执行测试,从而判断气温值是否有效(值9999代表在NCDC数据集缺少值),质量代码显示的读数是有疑问还是根本就是错误的。如果读数是正确的,那么该值将与目前看到的最大值进行比较,如果该值比原先的最大值大,就替换掉目前的最大值。当文件中所有的行都已处理完并打印出最大值后,END块中的代码才会被执行。

下面是某次运行结果的开始部分:

1.

2.

3.

4.

5.

6.

7.

%./max_

1901 317

1902 244

1903 289

1904 256

1905 283

...

由于源文件中的气温值按比例增加到10倍,所以结果1901年的最高气温是31.7°C(在本世纪初几乎没有多少气温读数会被记录下来,所以这是可能的)。为完成对跨越一世纪这么长时间的查,程序在EC2 High-CPU Extra Large Instance机器上一共运行了42分钟。

为加快处理,我们需要并行运行部分程序。从理论上讲,这很简单:我们可以通过使用计算机上所有可用的硬件线程来处理在不同线程中的各个年份的数据。但是这之中存在一些问题。

首先,划分成大小相同的作业块通常并不容易或明显。在这种情况下,不同年份的文件,大小差异很大,所以一些线程会比其他线程更早完成。即使它们继续下一步的工作,但是整个运行中占主导地位的还是那些运行时间很长的文件。另一种方法是将输入数据分成固定大小的块,然后把每块分配到各个进程。

其次,独立线程运行结果在合并后,可能还需要进一步的处理。在这种情况下,每年的结果是独立于其他年份,并可能通过连接所有结果和按年份排序这两种方式来合并它们。如果使用固定大小的块这种方法,则此类合并会更紧凑。对于这个例子,某年的数据通常被分割成几个块,每个进行独立处理。我们将最终获得每个数据块的最高气温,所以最后一步是寻这些每年气温值中的最大值。

最后,我们仍然受限于一台计算机的处理能力。如果手中所有的处理器都使用上都至少需要20分钟,那就只能这样了。我们不能使它更快。另外,一些数据集的增长会超出一台计算机的处理能力。当我们开始使用多台计算机时,整个大环境中的许多其他因素将发挥作用,可能由于协调性和可靠性的问题而出现当机等错误。谁运行整个作业?我们如何处理失败的进程?

因此,尽管并行处理可行,但实际上它非常复杂。使用Hadoop之类的框架非常有助于处理这些问题。

2.3 使用Hadoop进行数据分析

为了更好地发挥Hadoop提供的并行处理机制的优势,我们必须把查询表示成MapReduce作业。经过一些本地的小规模测试,我们将能够在机器集上运 行它。

2.3.1 map和reduce

MapReduce的工作过程分为两个阶段:map阶段和reduce阶段。每个阶段都有键/值对作为输入和输出,并且它们的类型可由程序员选择。程序员还具体定义了两个函数:map函数和reduce函数。

我们在map阶段输入的是原始的NCDC数据。我们选择的是一种文本输入格式,以便数据集的每一行都会是一个文本值。键是在文件开头部分文本行起始处的偏移量,但我们没有这方面的需要,所以将其忽略。

map函数很简单。我们使用map函数来出年份和气温,因为我们只对它们有兴趣。在本例中,map函数只是一个数据准备阶段,通过这种方式来建立数据,使得reducer函数能在此基础上进行工作:出每年的最高气温。map函数也是很适合去除已损记录的地方:在这里,我们将筛选掉缺失的、不可靠的或错误的气温 数据。

为了全面了解map的工作方式,我们思考下面几行示例的输入数据(考虑到页面篇幅,一些未使用的列已被去除,用省略号表示):

1.

2.

3.

4.

5.

9999999N9+

9999999N9+

0500001N9+

0500001N9+

这些行以键/值对的方式来表示map函数:

1.

2.

(0, 9999999N9+)

(106, 9999999N9+)

3. (212, )

4. (318, 0500001N9+)

5. (424, 0500001N9+)

键是文件中的行偏移量,而这往往是我们在map函数中所忽视的。map函数的功能仅仅提取年份和气温(以粗体显示),并将其作为输出被发送。(气温值已被解释为整数)

1.

2.

3.

(1950, 0)

(1950, 22)

(1950, ?11)

4.

5.

(1949, 111)

(1949, 78)

map函数的输出先由MapReduce框架处理,然后再被发送到reduce函数。这一处理过程根据键来对键/值对进行排序和分组。因此,继续我们的示例,reduce 函数会看到如下输入:

1.

2.

(1949, [111, 78])

(1950, [0, 22, ?11])

每年的年份后都有一系列气温读数。所有reduce函数现在必须重复这个列表并从中出最大的读数:

1.

2.

(1949, 111)

(1950, 22)

这是最后的输出:全球气温记录中每年的最高气温。

整个数据流如图2-1所示。在图的底部是Unix的管道,模拟整个MapReduce的流程,其中的内容我们将在以后讨论Hadoop 数据流时再次提到。

(点击查看大图)图2-1:MapReduce的逻辑数据流

2.3.2 Java MapReduce(1)

在明白MapReduce程序的工作原理之后,下一步就是要用代码来实现它。我们需要三样东西:一个map函数、一个reduce函数和一些来运行作业的代码。map函数是由一个Mapper接口来实现的,其中声明了一个map()方法。例2-3显示了我们的map函数的实现。

例2-3:最高气温示例的Mapper接口

1.

2.

3.

4.

import ption;

import table;

import itable;

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

23.

import ;

import uceBase;

import ;

import Collector;

import er;

public class MaxTemperatureMapper extends MapReduceBase

implements Mapper {

private static final int MISSING = 9999;

public void map(LongWritable key, Text value,

OutputCollector output, Reporter reporter)

throws IOException {

String line = ng();

String year = ing(15, 19);

int airTemperature;

if ((87) == '+') { // parseInt

doesn't like leading plus signs

24.

25.

26.

27.

28.

29.

30.

airTemperature = nt(ing(88, 92));

} else {

airTemperature = nt(ing(87, 92));

}

String quality = ing(92, 93);

if (airTemperature != MISSING && s("[01459]")) {

t(new Text(year), new IntWritable(airTemperature));

31.

32.

33.

}

}

}

该Mapper接口是一个泛型类型,它有4个形式参数类型,由它们来指定map函数的输入键、输入值、输出键和输出值的类型。就目前的示例来说,输入键是一个长整数偏移量,输入的值是一行文本,输出的键是年份,输出的值是气温(整数)。Hadoop规定了自己的一套可用于网络序列优化的基本类型,而不是使用内置的Java类型。这些都可以在包中到。现在我们使用的是LongWritable类型(相当于Java的Long类型)、Text类型(相当于Java的String类型)和IntWritable类型(相当于Java的Integer类型)。

map()方法需要传入一个键和一个值。我们将一个包含Java字符串输入行的Text值转换成Java的String类型,然后利用其substring()方法提取我们感兴趣的列。

map()方法还提供了一个OutputCollector实例来写入输出内容。在这种情况下,我们写入年份作为一个Text对象(因为我们只使用一个键),用IntWritable类型包装气温值。我们只有在气温显示出来后并且它的质量代码表示的是正确的气温读数时才写入输出记录。

reduce函数同样在使用Reducer时被定义,如例2-4所示。

例2-4:最高气温示例的Reducer

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

import ption;

import or;

import table;

import ;

import uceBase;

import Collector;

import r;

import er;

public class MaxTemperatureReducer extends MapReduceBase

implements Reducer {

public void reduce(Text key, Iterator values,

OutputCollector output, Reporter reporter)

16.

17.

18.

19.

20.

21.

22.

23.

24.

throws IOException {

int maxValue = _VALUE;

while (t()) {

maxValue = (maxValue, ().get());

}

t(key, new IntWritable(maxValue));

}

}

同样,四个形式参数类型用于指定reduce函数的输入和输出类型。reduce函数的输入类型必须与map函数的输出类型相匹配:Text类型和IntWritable类型。在这种情况下,reduce函数的输出类型是Text和IntWritable这两种类型,前者是年份的类型而后者是最高气温的

类型,在这些输入类型之中,我们遍历所有气温,并把每个记录进行比较直到到一个最高的为止。

第三部分代码运行的是MapReduce作业(请参见例2-5)。

2.3.2 Java MapReduce(2)

例2-5:在气象数据集中出最高气温的应用程序

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

23.

24.

25.

26.

27.

28.

29.

30.

31.

import ption;

import ;

import table;

import ;

import putFormat;

import tputFormat;

import ent;

import f;

public class MaxTemperature {

public static void main(String[] args) throws IOException {

if ( != 2) {

n("Usage: MaxTemperature

path>");

(-1);

}

JobConf conf = new JobConf();

Name("Max temperature");

utPath(conf, new Path(args[0]));

putPath(conf, new Path(args[1]));

perClass();

ucerClass();

putKeyClass();

putValueClass();

32.

33.

34.

(conf);

}

}

JobConf对象指定了作业的各种参数。它授予你对整个作业如何运行的控制权。当我们在Hadoop集上运行这个作业时,我们把代码打包成一个JAR文件(Hadoop会在集分发这个包)。我们没有明确指定JAR文件的名称,而是在JobConf构造函数中传送一个类,Hadoop会到这个包含此类的JAR文件。

在创建JobConf对象后,我们将指定输入和输出的路径。通过调用FileInputFormat内的静态方法addInputPath()来定义输入的路径,它可以是单个文件、目录(本例中,输入的内容组成此目录下所有文件)或文件模式的路径。同时,addInputPath()可被调用多次从而实现使用多路径输入。

输出路径(其中只有一个)是在FileOutputFormat内的静态方法setOutputPath()来指定的。它指定了reduce函数输出文件写入的目录。在运行作业前该目录不应该存在,否则Hadoop会报错并且拒绝运行任务。这种预防措施是为了防止数据丢失(一个长时间的任务可能非常恼人地被另一个意外覆盖)。

接下来,通过setMapperClass()和setReducerClass()这两个方法来指定要使用的map和reduce类型。

setOutputKeyClass()和setOutputValueClass()方法控制map和reduce 函数的输出类型,正如本例所示,这两个方法往往是相同的。如果它们不同,那么map的输出类型可设置成使用setMapOutputKeyClass()和setMapOutputValue-Class()方法。

输入的类型通过输入格式来控制,我们没有设置,因为我们使用的是默认的TextInputFormat(文本输入格式)。

在设置了定义map和reduce函数的类之后,运行作业的准备工作就算完成了。JobClient内的静态方法runJob()会提交作业并等待它完成,把进展情况写入控制台。

运行测试

写完MapReduce作业之后,拿一个小型的数据集进行测试以排除与代码直接有关的问题,这是常规做法。首先,以独立模式安装Hadoop(详细说明请参见附录A)。在这种模式下,Hadoop运行中使用本地带job runner(作业运行程序)的文件系统。让我们用前面讨论过的五行代码的例子来测试它(考虑到页面,这里已经对输出稍做修改和重新排版):

1.

2.

% export HADOOP_CLASSPATH=build/classes

% hadoop MaxTemperature input/ncdc/ output

3. 09/04/07 12:34:35 INFO rics:

Initializing JVM Metrics with processName=Job

4.

5.

Tracker, sessionId=

09/04/07 12:34:35 WARN ent:

Use GenericOptionsParser for parsing the

6.

7.

arguments. Applications should implement Tool for the same.

09/04/07 12:34:35 WARN ent:

No job jar file set. User classes may not

8.

9.

be found. See JobConf(Class) or JobConf#setJar(String).

09/04/07 12:34:35 INFO put

Format: Total input paths to process : 1

10. 09/04/07 12:34:35 INFO ent:

Running job: job_local_0001

11. 09/04/07 12:34:35 INFO put

Format: Total input paths to process : 1

12. 09/04/07 12:34:35 INFO k:

numReduceTasks: 1

13. 09/04/07 12:34:35 INFO k:

= 100

14. 09/04/07 12:34:35 INFO k:

data buffer = 79691776/99614720

15. 09/04/07 12:34:35 INFO k:

record buffer = 262144/327680

16. 09/04/07 12:34:35 INFO k:

Starting flush of map output

17. 09/04/07 12:34:36 INFO k:

Finished spill 0

18. 09/04/07 12:34:36 INFO nner:

Task:attempt_local_0001_m_000000_0 is

19.

20.

done. And is in the process of commiting

09/04/07 12:34:36 INFO obRunner:

file:/Users/tom/workspace/htdg/input/n

21.

22.

cdc/:0+529

09/04/07 12:34:36 INFO nner: Task '

attempt_local_0001_m_000000_0' done.

23.

24.

09/04/07 12:34:36 INFO obRunner:

09/04/07 12:34:36 INFO : Merging

1 sorted segments

25. 09/04/07 12:34:36 INFO : Down to

the last merge-pass, with 1 segments

26.

27.

28.

left of total size: 57 bytes

09/04/07 12:34:36 INFO obRunner:

09/04/07 12:34:36 INFO nner:

Task:attempt_local_0001_r_000000_0 is done

29.

30.

31.

. And is in the process of commiting

09/04/07 12:34:36 INFO obRunner:

09/04/07 12:34:36 INFO nner:

Task attempt_local_0001_r_000000_0 is

32.

33.

allowed to commit now

09/04/07 12:34:36 INFO mapred.

FileOutputCommitter: Saved output of task

34. 'attempt_local_0001_r_000000_0' to file:/

Users/tom/workspace/htdg/output

35. 09/04/07 12:34:36 INFO mapred.

LocalJobRunner: reduce > reduce

36. 09/04/07 12:34:36 INFO nner:

Task 'attempt_local_0001_r_000000_0' done.

37. 09/04/07 12:34:36 INFO ent:

map 100% reduce 100%

38. 09/04/07 12:34:36 INFO ent:

Job complete: job_local_0001

39. 09/04/07 12:34:36 INFO ent:

Counters: 13

40. 09/04/07 12:34:36 INFO ent:

FileSystemCounters

41. 09/04/07 12:34:36 INFO ent:

FILE_BYTES_READ=27571

42. 09/04/07 12:34:36 INFO ent:

FILE_BYTES_WRITTEN=53907

43. 09/04/07 12:34:36 INFO ent:

Map-Reduce Framework

44. 09/04/07 12:34:36 INFO ent:

Reduce input groups=2

45. 09/04/07 12:34:36 INFO ent:

Combine output records=0

46. 09/04/07 12:34:36 INFO ent:

Map input records=5

47. 09/04/07 12:34:36 INFO ent:

Reduce shuffle bytes=0

48. 09/04/07 12:34:36 INFO ent:

Reduce output records=2

49. 09/04/07 12:34:36 INFO ent:

Spilled Records=10

50. 09/04/07 12:34:36 INFO ent:

Map output bytes=45

51. 09/04/07 12:34:36 INFO ent:

Map input bytes=529

52. 09/04/07 12:34:36 INFO ent:

Combine input records=0

53. 09/04/07 12:34:36 INFO ent:

Map output records=5

54. 09/04/07 12:34:36 INFO ent:

Reduce input records=5

2.3.2 Java MapReduce(3)

如果Hadoop命令是以类名作为第一个参数,它就会启动一个JVM来运行这个类。使用命令比直接使用Java更方便,因为前者把类的路径(及其依赖关系)加入Hadoop的库中,并获得Hadoop的配置。要添加应用程序类的路径,我们需要定义一个HADOOP_CLASSPATH环境变量,Hadoop脚本会来执行相关操作。

注意:以本地(独立)模式运行时,本书所有程序希望都以这种方式来设置HADOOP_CLA-SSPATH。命令必须在示例代码所在的文件夹下被运行。

运行作业所得到的输出提供了一些有用的信息。(无法到作业JAR文件的相关信息是意料之中的,因为我们是在本地模式下没有JAR的情况下运行的。在集上运行时,不会看到此警告。)例如,我们可以看到,这个作业被给予了一个ID job_local_0001,并且它运行了一个map任务和一个reduce任务(使用attempt_local_0001_m_000000_0和attempt_local_0001_r_000000_0两个ID)。在调试MapReduce作业时,知道作业和任务的ID是非常有用的。

输出的最后一部分叫"计数器"(Counter),显示了在Hadoop上运行的每个作业产生的统计信息。这些对检查处理的数据量是否符合预期非常有用。例如,我们可以遵循整个系统中记录的数目:5个map输入产生了5个map的输出,然后5个reduce输入产生两个reduce输出。

输出被写入output目录,其中每个reducer包括一个输出文件。作业包含一个reducer,所以我们只能到一个文件,名为part-00000:

1.

2.

3.

%cat output/part-00000

1949 111

1950 22

这个结果和我们之前手动寻的结果一样。我们可以把前面这个结果解释为在1949年的最高气温记录为11.1℃,而在1950年为2.2℃。

新的Java Mapreduce API

Hadoop最新版Java MapReduce Release 0.20.0的API包括一个全新的MapReduce Java

API,有时也称为"context object"(上下文对象),旨在使API在未来更容易扩展。新的API类型上不兼容以前的API,所以,以前的应用程序需要重写才能使新的API发挥其作用 。

新的API和旧的API之间有下面几个明显的区别。

新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。

新的API是在uce包(和子包)中的。之前版本的API则是放在中的。

新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角。

新的API同时支持"推"和"拉"式的迭代。在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。"拉"式的一个有用的例子是分批处理记录,而不是一个接一个。

新的API统一了配置。旧的API有一个特殊的JobConf对象用于作业配置,这是一个对于Hadoop通常的Configuration对象的扩展(用于配置守护进程,请参见5.1节)。在新的API中,这种区别没有了,所以作业配置通过Configuration来完成。

作业控制的执行由Job类来负责,而不是JobClient,它在新的API中已经荡然无存。

例2-6使用新API重写了MaxTemperature的代码,不同之处用黑体字突出显示。

例2-6:使用新的context object(上下文对象)MapReduce API在气象数据集中查最高气温

1. public class NewMaxTemperature {

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

23.

24.

25.

26.

27.

28.

29.

30.

31.

32.

33.

34.

35.

36.

37.

38.

39.

static class NewMaxTemperatureMapper

extends Mapper {

private static final int MISSING = 9999;

public void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = ng();

String year = ing(15, 19);

int airTemperature;

if ((87) == '+') { // parseInt doesn't like

leading plus signs

airTemperature = nt(ing(88, 92));

} else {

airTemperature = nt(ing(87, 92));

}

String quality = ing(92, 93);

if (airTemperature !=MISSING && s("[01459]")){

(new Text(year), new

IntWritable(airTemperature));

}

}

}

static class NewMaxTemperatureReducer

extends Reducer {

public void reduce(Text key, Iterable values,

Context context)

throws IOException, InterruptedException {

int maxValue = _VALUE;

for (IntWritable value : values) {

maxValue = (maxValue, ());

}

(key, new IntWritable(maxValue));

}

40.

41.

42.

43.

44.

45.

46.

47.

48.

49.

50.

51.

52.

53.

54.

55.

56.

57.

58.

59.

60.

61.

62.

63.

64.

}

public static void main(String[] args) throws Exception {

if ( != 2) {

n("Usage: NewMaxTemperature

");

(-1);

}

Job job = new Job();

ByClass();

utPath(job, new Path(args[0]));

putPath(job, new Path(args[1]));

perClass();

ucerClass();

putKeyClass();

putValueClass();

(rCompletion(true) ? 0 : 1);

}

}

65. 2.4 分布化

66. 前面展示了MapReduce针对小量输入的工作方式,现在是时候整体了解系统并进入大数据流作为输入了。为简单起见,我们的例子到目前为止都使用本地文件系统中的文件。然而,为了分布化,我们需要把数据存储在分布式文件系统中,典型的如HDFS(详情参见第3章),以允许Hadoop把MapReduce的计算移到承载部分数据的各台机器。下面我们就来看看这是如何工作的。

67. 2.4.1 数据流(1)

68. 首先是一些术语的说明。MapReduce作业(job)是客户端执行的单位:它包括输入数据、MapReduce程序和配置信息。Hadoop通过把作业分成若干个小任务(task)来工作,其包括两种类型的任务:map任务和reduce任务。

69. 有两种类型的节点控制着作业执行过程:jobtracker和多个tasktracker。jobtracker通过调度任务在tasktracker上运行,来协调所有运行在系统上的作业。Tasktracker运行任务的同时,把进度报告传送到jobtracker,jobtracker则记录着每项任务的整体进展情况。如果其中一个任务失败,jobtracker可以重新调度任务到另外一个tasktracker。Hadoop把输入数据划分成等长的小数据发送到MapReduce,称为输入分片(input split)或分片。Hadoop为每个分片(split)创建一个map任务,由它来运行用户自定义的map函数来分析每个分片中的记录。

70. 拥有许多分片就意味着处理每个分片的时间与处理整个输入的时间相比是比较小的。因此,如果我们并行处理每个分片,且分片是小块的数据,那么处理过程将有一个更好的负载平衡,因为更快的计算机将能够比一台速度较慢的机器在作业过程中处理完比例更多的数据分片。即使是相同的机器,没有处理的或其他同时运行的作业也会使负载平衡得以实现,并且在分片变得更细时,负载平衡质量也会更佳。

71. 另一方面,如果分片太小,那么管理分片的总时间和map任务创建的总时间将决定作业的执行的总时间。对于大多数作业,一个理想的分片大小往往是一个HDFS块的大小,默认是64 MB,虽然这可以根据集进行调整(对于所有新建文件)或在新建每个文件时具体进行指定。

72. map任务的执行节点和输入数据的存储节点是同一节点,Hadoop的性能达到最佳。这就是所谓的data locality optimization(数据局部性优化)。现在我们应该清楚为什么最佳分片的大小与块大小相同:它是最大的可保证存储在单个节点上的数据量。如果分区跨越两个块,那么对于任何一个HDFS节点而言,基本不可能同时存储这两数据块,因此此分布的某部分必须通过网络传输到节点,这与使用本地数据运行map任务相比,显然效率更低。

73. map任务把输出写入本地硬盘,而不是HDFS。这是为什么?因为map的输出作为中间输出:而中间输出则被reduce任务处理后产生最终的输出,一旦作业完成,map的输出就可以删除了。因此,把它及其副本存储在HDFS中,难免有些小题大做。如果该节点上运行的map任务在map输出给reduce任务处理之前崩溃,那么Hadoop将在另一个节点上重新运行map任务以再次创建map的输出。

74. reduce任务并不具备数据本地读取的优势-- 一个单一的reduce任务的输入往往来自于所有mapper的输出。在本例中,我们有一个单独的reduce任务,其输入是由所有map任务的输出组成的。因此,有序map的输出必须通过网络传输到reduce任务运行的节点,并在那里进行合并,然后传递到用户定义的reduce函数中。为增加其

可靠性,reduce的输出通常存储在HDFS中。如第3章所述,对于每个reduce输出的HDFS块,第一个副本存储在本地节点上,其他副本存储在其他机架节点中。因此,编写reduce的输出确实十分占用网络带宽,但是只和正常的HDFS写管线的消耗一样。

75. 一个单一的reduce 任务的整个数据流如图2-2所示。虚线框表示节点,虚线箭头表示数据传输到一个节点上,而实线的箭头表示节点之间的数据传输。

76. reduce任务的数目并不是由输入的大小来决定,而是单独具体指定的。在第7章的7.1节中,将介绍如何为一个给定的作业选择reduce任务数量。

77. 如果有多个reducer,map任务会对其输出进行分区,为每个reduce任务创建一个分区(partition)。每个分区包含许多键(及其关联的值),但每个键的记录都在同一个分区中。分区可以通过用户定义的partitioner来控制,但通常是用默认的分区工具,它使用的是hash函数来形成"木桶"键/值,这种方法效率很高。

78. 一般情况下,多个reduce任务的数据流如图2-3所示。此图清楚地表明了map和reduce任务之间的数据流为什么要称为"shuffle"(洗牌),因为每个reduce任务的输入都由许多map任务来提供。shuffle其实比此图所显示的更复杂,并且调整它可能会对作业的执行时间产生很大的影响,详见6.4节。

(点击查看大图)图2-2:MapReduce中单一reduce任务的数据流图

(点击查看大图)图2-3:多个reduce任务的MapReduce数据流

2.4.1 数据流(2)

最后,也有可能不存在reduce任务,不需要shuffle的时候,这样的情况是可能的,因为处理可以并行进行(第7章有几个例子讨论了这个问题)。在这种情况下,唯一的非本地节点数据传输是当map任务写入到HDFS中(见图2-4)。

图2-4:MapReduce中没有reduce任务的数据流

集的可用带宽限制了MapReduce作业的数量,因此map和reduce任务之间数据传输的代价是最小的。Hadoop允许用户声明一个combiner,运行在map的输出上-- 该函数的输出作为reduce函数的输入。由于combiner是一个优化方法,所以Hadoop不保证对于某个

map的输出记录是否调用该方法,调用该方法多少次。换言之,不调用该方法或者调用该方法多次,reducer的输出结果都一样。

combiner的规则限制着可用的函数类型。我们将用一个例子来巧妙地加以说明。以前面的最高气温例子为例,1950年的读数由两个map处理(因为它们在不同的分片中)。假设第一个map的输出如下:

1.

2.

3.

(1950, 0)

(1950, 20)

(1950, 10)

第二个map的输出如下:

1.

2.

(1950, 25)

(1950, 15)

reduce函数再调用时被传入以下数字:

1. (1950, [0, 20, 10, 25, 15])

因为25是输入值中的最大数,所以输出如下:

1. (1950, 25)

我们可以用combiner,像reduce函数那样,为每个map输出到最高气温。reduce函数被调用时将被传入如下数值:

1. (1950, [20, 25])

然而,reduce输出的结果和以前一样。更简单地说,我们可以像下面这样,对本例中的气温值进行如下函数调用:

1. max(0, 20, 10, 25, 15) = max(max(0, 20, 10),

max(25, 15)) = max(20, 25) = 25

并非所有函数都有此属性。例如,如果我们计算平均气温,便不能用mean作为combiner,因为:

1. mean(0, 20, 10, 25, 15) = 14

但是:

1. mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15

combiner并不能取代reduce函数。(为什么呢?reduce函数仍然需要处理来自不同的map给出的相同键记录。)但它可以帮助减少map和reduce之间的数据传输量,而正因为此,是否在MapReduce作业中使用combiner是需要慎重考虑的。

2.4.2 具体定义一个combiner

让我们回到Java MapReduce程序,combiner是用reducer接口来定义的,并且该应用程序的实现与MaxTemperatureReducer中的函数相同。唯一需要强调的不同是如何在JobConf上设置combiner类(见例2-7)。

例2-7:使用combiner高效查最高气温

1.

2.

3.

4.

5.

public class MaxTemperatureWithCombiner {

public static void main(String[] args) throws IOException {

if ( != 2) {

n("Usage: MaxTemperatureWithCombiner

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

23.

24.

25.

26.

path> " +"");

(-1);

}

JobConf conf = new JobConf();

Name("Max temperature");

utPath(conf, new Path(args[0]));

putPath(conf, new Path(args[1]));

perClass();

binerClass();

ucerClass();

putKeyClass();

putValueClass();

(conf);

}

}

2.4.3 运行分布式MapReduce作业

同一个程序将在一个完整的数据集中直接运行而不做更改。这是MapReduce的优势之一:它扩充数据大小和硬件规模。这里有一个运行结果:在一个10节点EC2 High-CPU Extra

Large lnstance,程序只用6分钟就运行完了。

我们将在第5章分析程序在集上运行的机制。

2.5 Hadoop流

Hadoop提供了一个API来运行MapReduce,并允许你用除java以外的语言来编写自己的map和reduce函数。Hadoop流使用Unix标准流作为Hadoop和程序之间的接口,所以可以使用任何语言,只要编写的MapReduce程序能够读取标准输入,并写入到标准输出。

流适用于文字处理(尽管0.21.0版本也可以处理二进制流),在文本模式下使用时,它有一个面向行的数据视图。map的输入数据把标准输入流传输到map函数,其中是一行一行的传输,然后再把行写入标准输出。一个map输出的键/值对是以单一的制表符分隔的行来写入的。reduce函数的输入具有相同的格式-- 通过制表符来分隔的键/值对-- 传输标准输入流。reduce函数从标准输入流读入行,然后为保证结果的有序性用键来排序,最后将结果写入标准输出。

下面使用流来重写按年份寻最高气温的MapReduce程序。

2.5.1 Ruby语言

例2-8中的map函数是用ruby来写的。

例2-8:用于查最高气温的map函数(ruby版)

1.

2.

3.

4.

5.

6.

7.

#!/usr/bin/env ruby

_line do |line|

val = line

year, temp, q = val[15,4], val[87,5], val[92,1]

puts "#{year}t#{temp}" if (temp != "+9999" && q =~ /[01459]/)

end

程序通过执行STDIN(IO类型的一个全局常量)中的每行,不断从标准输入中反复查。它把相关的字段从每行中取出,如果气温有效,就过一个tab字符t把年份和气温通隔开,然后输入标准输出(使用puts)。

注意:指出流和Java MapReduce API 之间的差异是十分值得的。 Java的API是主要面向一次处理一个map函数的记录。该框架调用Mapper的map()方法来处理输入中的每条记录,然而map程序可以决定如何处理输入流,例如,它可以轻松地读取和同一时间处理

多行,因为它控制着多行读取。用户的Java map实现是压栈记录,但它仍可以考虑处理多行,具体做法是将mapper中实例变量中之前的行汇聚在一起。 在这种情况下,需要实现close()方法,以便了解最后的记录何时被读取,从而完成对最后一组记录行的处理。

由于该脚本只运行在标准输入和输出上,所以只需使用Unix管道而不使用Hadoop来进行测试,这是不具有说服力的:

1.

2.

3.

4.

5.

6.

7.

8.

% cat input/ncdc/ | src/main/ch02/ruby/max_

temperature_

1950 +0000

1950 +0022

1950 -0011

1949 +0111

1949 +0078

例2-9显示的reduce函数稍微复杂一些。

例2-9:用于查最高气温的Reduce函数(Ruby版本)

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

#!/usr/bin/env ruby

last_key, max_val = nil, 0

_line do |line|

key, val = ("t")

if last_key && last_key != key

puts "#{last_key}t#{max_val}"

last_key, max_val = key, _i

else

last_key, max_val = key, [max_val, _i].max

end

end

puts "#{last_key}t#{max_val}" if last_key

同样,程序遍历标准输入中的行,但是当我们处理每组键时,我们要存储一些状态。在这种情况下,这个键是气象站的标识符,我们把看到的最后一个键和迄今为止见到的最高气温存储到那个键组中。MapReduce框架确保键是有序的,以便让我们知道,如果一个键与其上一个键不同,可将其它移入一个新的键组。相比之下,Java API则提供每一个键组的迭代器,我们只能在流中使用程序来到键组的边界。

我们从每行中取出键和值,随后,如果刚刚完成一组键(last_key&last_key =键),则在将新值更新为最高气温之前,写入键和该组的最高气温,它们由一个制表符来分隔。如果我们没有结束一个组,只能更新当前键的最高气温。

程序的最后一行保证了输入的最后一组键会有一行输出。

现在可以用UNIX管线(来模拟整个MapReduce的管线,它与图2-1显示的Unix 管线是相同的)。

1. % cat input/ncdc/ | src/main/ch02

/ruby/max_temperature_ |

2.

3.

4.

sort | src/main/ch02/ruby/max_temperature_

1949 111

1950 22

输出结果和Java 程序一样,所以下一步是用Hadoop来运行它。

Hadoop命令不支持Streaming函数,因此,我们需要指定JAR文件流与jar选项。流程序的选项指定了输入和输出路径选项,map和reudce脚本,如下所示:

1.

2.

3.

4.

5.

6.

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-

-input input/ncdc/

-output output

-mapper src/main/ch02/ruby/max_temperature_

-reducer src/main/ch02/ruby/max_temperature_

在一个集上运行一个庞大的数据集时,要使用-combiner选项来设置combiner。

从0.21.0版开始,combiner可以是任何一个流命令。对于早期版本的combiner,只能用Java来编写,所以作为变通的方法,一般都在mapper中手动合并,从而避开Java语言。在这种情况下,我们可以把mapper改成管线:

1.

2.

3.

4.

5.

6.

7.

8.

9.

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-

-input input/ncdc/all

-output output

-mapper "ch02/ruby/max_temperature_ | sort |

ch02/ruby/max_temperature_"

-reducer src/main/ch02/ruby/max_temperature_

-file src/main/ch02/ruby/max_temperature_

-file src/main/ch02/ruby/max_temperature_

还要注意-file的使用,在集上运行流程序把脚本传输到集上时,可使用此选项。

2.5.2 Python

Hadoop流支持任何可以从标准输入读取和写入到标准输出中的编程语言,因此对于更熟悉Python的读者,下面提供了同一个例子的Python版本 。map脚本参见例2-10,reduce脚本参见例2-11。

例2-10:用于查最高气温的Map函数(Python版本)

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

#!/usr/bin/env python

import re

import sys

for line in :

val = ()

(year, temp, q) = (val[15:19], val[87:92], val[92:93])

if (temp != "+9999" and ("[01459]", q)):

print "%st%s" % (year, temp)

例2-11 用于查最高气温的Reduce函数(Python版本)

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

#!/usr/bin/env python

import sys

(last_key, max_val) = (None, 0)

for line in :

(key, val) = ().split("t")

if last_key and last_key != key:

print "%st%s" % (last_key, max_val)

(last_key, max_val) = (key, int(val))

else:

(last_key, max_val) = (key, max(max_val, int(val)))

if last_key:

print "%st%s" % (last_key, max_val)

我们可以测试程序且用在Ruby中用过的相同的方法来运行作业。比如,运行一个测试:

1. % cat input/ncdc/ |

2.

3.

4.

5.

6.

src/main/ch02/python/max_temperature_ |

sort | src/main/ch02/python/max_temperature_

1949 111

1950 22

2.6 Hadoop管道

Hadoop管道是Hadoop MapReduce的C++接口的代称。与流不同,流使用标准输入和输出让map和reduce节点之间相互交流,管道使用sockets作为tasktracker与C++编写的map或者reduce函数的进程之间的通道。JNI未被使用。

我们将用C++重写贯穿本章的示例,然后,我们将看到如何使用管道来运行它。例2-12显示了用C++语言编写的map函数和reduce函数的源代码。

例2-12:用C++语言编写的最高气温程序

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

#include

#include

#include

#include "hadoop/"

#include "hadoop/"

#include "hadoop/"

class MaxTemperatureMapper : public HadoopPipes::Mapper {

public:

MaxTemperatureMapper(HadoopPipes::TaskContext& context) {

}

void map(HadoopPipes::MapContext& context) {

std::string line = utValue();

std::string year = (15, 4);

std::string airTemperature = (87, 5);

std::string q = (92, 1);

if (airTemperature != "+9999" &&

(q == "0" || q == "1" || q == "4" || q == "5" || q == "9")) {

20.

21.

22.

23.

(year, airTemperature);

}

}

};

24.

25.

26.

27.

28.

29.

30.

31.

32.

33.

34.

35.

36.

37.

38.

39.

40.

41.

42.

43.

class MapTemperatureReducer : public HadoopPipes::Reducer {

public:

MapTemperatureReducer(HadoopPipes::TaskContext& context) {

}

void reduce(HadoopPipes::ReduceContext& context) {

int maxValue = INT_MIN;

while (lue()) {

maxValue = std::max(maxValue,

HadoopUtils::toInt(utValue()));

}

(utKey(),

HadoopUtils::toString(maxValue));

}

};

int main(int argc, char *argv[]) {

return HadoopPipes::runTask(HadoopPipes::TemplateFactory

MapTemperatureReducer>());

44. }

此应用程序连接到Hadoop C++库,后者是一个用于与tasktracker子进程进行通信的轻量级的封装器。通过扩展在HadoopPipes命名空间的Mapper和Reducer类且提供map()和reduce()方法的实现,我们便可定义map和reduce函数。这些方法采用了一个上下文对象(MapContext类型或ReduceContext类型),后者提供读取输入和写入输出,通过JobConf类来访问作业配置信息。本例中的处理过程非常类似于Java的处理方式。

与Java接口不同,C++接口中的键和值是字节缓冲,表示为标准模板库(Standard

Template Library,STL)的字符串。这使接口变得更简单,尽管它把更重的负担留给了应用程序的开发人员,因为开发人员必须将字符串convert to and from表示to和from两个逆操作。开发人员必须在字符串及其他类型之间进行转换。这一点在MapTemperatureReducer中十分明显,其中,我们必须把输入值转换为整数的输入值(使用HadoopUtils中的便利方法),在最大值被写出之前将其转换为字符串。在某些情况下,我们可以省略这个转化,如在MaxTemperatureMapper中,它的airTemperature值从来不用转换为整数,因为它在map()方法中从来不会被当作数字来处理。

main()方法是应用程序的入口点。它调用HadoopPipes::runTask,连接到从Mapper或Reducer连接到Java的父进程和marshals 数据。runTask()方法被传入一个Factory参数,使其可以创建Mapper或Reducer的实例。它创建的其中一个将受套接字连接中Java父进程控制。我们可以用重载模板factory方法来设置一个combiner(combiner)、partitioner(partitioner)、记录读取函数(record reader)或记录写入函数(record writer)。

编译运行

现在我们可以用Makerfile编译连接例2-13的程序。

例2-13:C++版本的MapReduce程序的Makefile

1.

2.

3.

4.

5.

6.

7.

CC = g++

CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

max_temperature: max_

$(CC) $(CPPFLAGS) $< -Wall

-L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes

-lhadooputils -lpthread -g -O2 -o $@

在Makefile中应当设置许多环境变量。除了HADOOP_INSTALL(如果遵循附录A的安装说明,应该已经设置好),还需要定义平台,指定操作系统、体系结构和数据模型(例如,32位或64位)。我在32位Linux系统的机器编译运行了如下内容:

1.

2.

% export PLATFORM=Linux-i386-32

% make

在成功完成之前,当前目录中便有max_temperature可执行文件。

要运行管道作业,我们需要在伪分布式(pseudo_distrinuted)模式下(其中所有守护进程运行在本地计算机上)运行Hadoop,其中的安装步骤参见附录A。管道不在独立模式(本地运行)中运行,因为它依赖于Hadoop的分布式缓存机制,仅在HDFS运行时才运行。

Hadoop守护进程开始运行后,第一步是把可执行文件复制到HDFS,以便它们启动map和reduce任务时,它能够被tasktracker取出:

1. % hadoop fs -put max_temperature bin/max_temperature

示例数据也需要从本地文件系统复制到HDFS:

1. % hadoop fs -put input/ncdc/

现在可以运行这个作业。为了使其运行,我们用Hadoop 管道命令,使用-program参数来传递在HDFS中可执行文件的URI。

1.

2.

3.

4.

5.

6.

% hadoop pipes

-D reader

-D writer

inpit

output output

program bin/max_temperature

我们使用-D选项来指定两个属性:reader和writer,这两个属性都被设置为true,表示我们并没有指定一个C+++记录读取函数或记录写入函数,但我们要使用默认的Java设置(用来设置文本输入和输出)。管道还允许你设置一个Java mapper,reducer,combiner或partitioner。事实上,在任何一个作业中,都可以混合使用Java类或C++类。

结果和用其他语言编写的程序所得的结果一样。

第3章 Hadoop分布式文件系统

当数据集超过一个单独的物理计算机的存储能力时,便有必要将它分布到多个独立的计算机。管理着跨计算机网络存储的文件系统称为分布式文件系统。因为它们是基于网络的,所有网络编程的复杂性都会随之而来,所以分布式文件系统比普通磁盘文件系统更复杂。举例来说,使这个文件系统能容忍节点故障而不损失数据就是一个极大的挑战。

Hadoop有一个被称为HDFS的分布式系统,全称为Hadoop Distributed Filesystem。(有时可能简称为DFS,在非正式情况或是文档和配置中,其实是一样的。) HDFS是Hadoop的旗舰级文件系统,也是本章的重点,但Hadoop实际上有一个综合性的文件系统抽象,因而接下来我们将看看Hadoop如何与其他文件系统集成(如本地文件系统或Amazon S3)。

3.1 HDFS的设计

HDFS是为以流式数据访问模式存储超大文件而设计的文件系统,在商用硬件的集上运行。让我们仔细看看下面的明。

超大文件

"超大文件"在这里指几百MB,几百GB甚至几百TB大小的文件。目前已经有Hadoop集存储PB(petabytes)级的数据了。

流式数据访问

HDFS建立在这样一个思想上:一次写入、多次读取模式是最高效的。一个数据集通常由数据源生成或复制,接着在此基础上进行各种各样的分析。每个分析至少都会涉及数据集中的大部分数据 (甚至全部),因此读取整个数据集的时间比读取第一条记录的延迟更为重要。

商用硬件

Hadoop不需要运行在昂贵并且高可靠性的硬件上。它被设计运行在商用硬件(在各种零售店都能买到的普通硬件)的集上,因此至少对于大的集来说,节点故障的几率还是较高的。HDFS在面对这种故障时,被设计为能够继续运行而让用户察觉不到明显的中断。

同时,那些并不适合HDFS的应用也是值得研究的。在目前,HDFS还不太适用于某些领域,不过日后可能会有所改进。

低延迟数据访问

需要低延迟访问数据在毫秒范围内的应用并不适合HDFS。HDFS是为达到高数据吞吐量而优化的,这有可能会以延迟为代价。目前,对于低延迟访问,HBase(参见第12章)是更好的选择。

大量的小文件

名称节点(namenode)存储着文件系统的元数据,因此文件数量的限制也由名称节点的内存量决定。根据经验,每个文件,索引目录以及块占大约150个字节。因此,举例来说,如果有一百万个文件,每个文件占一个块,就至少需要300 MB的内存。虽然存储上百万的文件是可行的,十亿或更多的文件就超出目前硬件的能力了。

多用户写入,任意修改文件

HDFS中的文件只有一个写入者,而且写操作总是在文件的末尾。它不支持多个写入者,或是在文件的任意位置修改。(可能在以后这些会被支持,但它们也相对不那么高效。)

3.2 HDFS的概念

3.2.1 块

一个磁盘有它的块大小,代表着它能够读写的最小数据量。文件系统通过处理大小为一个磁盘块大小的整数倍数的数据块来运作这个磁盘。文件系统块一般为几千字节,而磁盘块一般为512个字节。这些信息,对于仅仅在一个文件上读或写任意长度的文件系统用户来说是透明的。但是,有些工具会维护文件系统,如df 和 fsck, 它们都在系统块级上操作。

HDFS也有块的概念,不过是更大的单元,默认为64 MB。与单一磁盘上的文件系统相似,HDFS上的文件也被分为以块为大小的分块,作为单独的单元存储。但与其不同的是,HDFS中小于一个块大小的文件不会占据整个块的空间。如果没有特殊指出,"块"在本书中就指代HDFS中的块。

为何HDFS中的一个块那么大?

HDFS的块比磁盘的块大,目的是为了减小寻址开销。通过让一个块足够大,从磁盘转移数据的时间能够远远大于定位这个块开始端的时间。因此,传送一个由多个块组成的文件的时间就取决于磁盘传输送率。

我们来做一个速算,如果寻址时间在10毫秒左右,传输速率是100兆/秒,为了使寻址时间为传输时间的1%,我们需要100 MB左右的块大小。而默认的大小实际为64 MB,尽管很多HDFS设置使用128 MB的块。这一数字将在以后随着新一代磁盘驱动带来的传输速度加快而继续调整。

当然这种假定不应该如此夸张。MapReduce过程中的map任务通常是在一个时间内运行操作一个块,因此如果任务数过于少(少于集上的节点数量),作业的运行速度显然就比预期的慢。

在分布式文件系统中使用抽象块会带来很多好处。第一个最明显的好处是,一个文件可以大于网络中任意一个磁盘的容量。文件的分块(block,后文有些地方也简称为"块")不需要存储在同一个磁盘上,因此它们可以利用集上的任意一个磁盘。其实,虽然不常见,但对于HDFS集而言,也可以存储一个其分块占满集中所有磁盘的文件。

第二个好处是,使用块抽象单元而不是文件会简化存储子系统。简单化是所有系统的追求,但对于故障种类繁多的分布式系统来说尤为重要的。存储子系统控制的是块,简化了存储管理。(因为块的大小固定,计算一个磁盘能存多少块就相对容易),也消除了对元数据的顾虑(块只是一部分存储的数据-而文件的元数据,如许可信息,不需要与块一同存储,这样一来,其他系统就可以正交地管理元数据。)

不仅如此,块很适合于为提供容错和实用性而做的复制操作。为了应对损坏的块以及磁盘或机器的故障,每个块都在少数其他分散的机器(一般为3个)进行复制。如果一个块损坏了,系统会在其他地方读取另一个副本,而这个过程是对用户透明的。一个因损坏或机器故障而丢失的块会从其他候选地点复制到正常运行的机器上,以保证副本的数量回到正常水平。(参见第4章的"数据的完整性"小节,进一步了解如何应对数据损坏。)同样,有些应用程序可能选择为热门的文件块设置更高的副本数量以提高集的读取负载量。

与磁盘文件系统相似,HDFS中 fsck 指令会显示块的信息。例如,执行以下命令将列出文件系统中组成各个文件的块(参见第10章的"文件系统查看(fsck)"小节):

1. % hadoop fsck / -files -blocks

3.2.2 名称节点与数据节点

HDFS集有两种节点,以管理者-工作者的模式运行,即一个名称节点(管理者)和多个数据节点(工作者)。名称节点管理文件系统的命名空间。它维护着这个文件系统树及这个树内所有的文件和索引目录。这些信息以两种形式将文件永久保存在本地磁盘上:命名空间镜像和

编辑日志。名称节点也记录着每个文件的每个块所在的数据节点,但它并不永久保存块的位置,因为这些信息会在系统启动时由数据节点重建。

客户端代表用户通过与名称节点和数据节点交互来访问整个文件系统。客户端提供一个类似POSIX(可移植操作系统界面)的文件系统接口,因此用户在编程时并不需要知道名称节点和数据节点及其功能。

数据节点是文件系统的工作者。它们存储并提供定位块的服务(被用户或名称节点调用时),并且定时的向名称节点发送它们存储的块的列表。

没有名称节点,文件系统将无法使用。事实上,如果运行名称节点的机器被毁坏了,文件系统上所有的文件都会丢失,因为我们无法知道如何通过数据节点上的块来重建文件。因此,名称节点能够经受故障是非常重要的,Hadoop提供了两种机制来确保这一点。

第一种机制就是复制那些组成文件系统元数据持久状态的文件。Hadoop可以通过配置使名称节点在多个文件系统上写入其持久化状态。这些写操作是具同步性和原子性的。一般的配置选择是,在本地磁盘上写入的同时,写入一个远程NFS挂载(mount)。

另一种可行的方法是运行一个二级名称节点,虽然它不能作为名称节点使用。这个二级名称节点的重要作用就是定期的通过编辑日志合并命名空间镜像,以防止编辑日志过大。这个二级名称节点一般在其他单独的物理计算机上运行,因为它也需要占用大量CPU和内存来执行合并操作。它会保存合并后的命名空间镜像的副本,在名称节点失效后就可以使用。但是,二级名称节点的状态是比主节点滞后的,所以主节点的数据若全部丢失,损失仍在所难免。在这种情况下,一般把存在NFS上的主名称节点元数据复制到二级名称节点上并将其作为新的主名称节点运行。

详情请参见第10章的"文件系统镜像与编辑日志"小节。

3.3 命令行接口

现在我们将通过命令行与HDFS交互。HDFS还有很多其他接口,但命令行是最简单的,同时也是许多开发者最熟悉的。

我们将在一台机器上运行HDFS,所以请先参照附录A中在伪分布模式下设置Hadoop的说明。稍后将介绍如何在集上运行HDFS从而为我们提供伸缩性与容错性。

在我们设置伪分布配置时,有两个属性需要进一步解释。首先是,设置为hdfs://localhost/, 用来为Hadoop设置默认文件系统。文件系统是由URI指定的,这里我们已使用了一个hdfs URI 来配置HDFS为Hadoop的默认文件系统。HDFS的守护程序将通

过这个属性来决定HDFS名称节点的宿主机和端口。我们将在localhost上运行,默认端口为8020。这样一来,HDFS用户将通过这个属性得知名称节点在哪里运行以便于连接到它。

第二个属性ation,我们设为1,这样一来,HDFS就不会按默认设置将文件系统块复制3份。在单独一个数据节点上运行时,HDFS无法将块复制到3个数据节点上,所以会持续警告块的副本不够。此设置可以解决这个问题。

基本文件系统操作

文件系统已经就绪,我们可以执行所有其他文件系统都有的操作,例如,读取文件,创建目录,移动文件,删除数据,列出索引目录,等等。输入hadoop fs -help命令即可看到所有命令详细的帮助文件。

首先将本地文件系统的一个文件复制到HDFS:

1. % hadoopfs -copyFromLocal input/docs/quangle.

txt hdfs://localhost/user/tom/

该命令调用Hadoop文件系统的shell命令fs,提供一系列的子命令。在这里,我们执行的是-copyFromLocal。本地文件被复制到运行在localhost上的HDFS实体中的/user/tom/文件。其实我们可以省略URI的格式与主机而选择默认设置,即省略hdfs://localhost,就像中指定的那样。

1. % hadoop fs -copyFromLocal input/docs/quangle.

txt /user/tom/

也可以使用相对路径,并将文件复制到home目录,即/user/tom:

1. % hadoop fs -copyFromLocal input/docs/

我们把文件复制回本地文件系统,看看是否一样:

1.

2.

3.

4.

% hadoop fs -copyToLocal

% md5 input/docs/

MD5 (input/docs/) = a16f231da6b05e2ba7a339320e7dacd9

MD5 () = a16f231da6b05e2ba7a339320e7dacd9

MD5分析结果是一样的,表明这个文件在HDFS之旅中得以幸存并完整。

最后,我们看一下HDFS文件列表。我们创建一个目录来看看它在列表中如何显示:

1.

2.

3.

% hadoop fs -mkdir books

% hadoop fs -ls .

Found 2 items

4. drwxr-xr-x - tom supergroup 0

2009-04-02 22:41 /user/tom/books

5. -rw-r--r-- 1 tom supergroup 118

2009-04-02 22:29 /user/tom/

返回的信息结果与Unix命令ls -l的输出非常相似,仅有细微差别。第一列显示的是文件格式。第二列是这个文件的副本数(这在Unix文件系统是没有的)。由于我们设置的默认副本数在网站范围内为1,所以这里显示的也都是1。这一列的开头目录栏是空的,因为副本的概念并没有应用-- 目录是作为元数据并存在名称节点中的,而非数据节点。第三列和第四列显示文件的所属用户和组别。第五列是文件的大小,以字节显示,目录大小为0。第六列和第七列是文件的最后修改日期与时间。最后的第八列是文件或目录的绝对路径。

HDFS中的文件许可

HDFS对于文件及目录有与POSIX非常相似的许可模式。

共有三种形式的许可:读取许可(r)、写入许可(w)和执行许可(x)。读取文件或列出目录内容时需要读取许可。写入一个文件,或是在一个目录上创建或删除文件或目录,需要写入许可。对于文件而言执行许可可以忽略因为HDFS中不能执行文件(与POSIX不同),但在访问一个目录的子项时是需要的。

每个文件和目录都有一个所属用户、所属组别和模式。这个模式是由所属用户的许可、组内其他成员的许可及其他用户的许可组成。

客户端的标识是通过它正在运行的进程的username(名称)和groups(组别)来确定的。由于客户端是远程的,任何人都可以简单地在远程系统上创建一个账户来进行访问。因此,许可只能在一个合作的团体中的用户中使用,作为共享文件系统资源和防止数据意外损失的机制,而不能在一个敌意的环境中保护资源。但是,除去这些缺点,为防止用户或自动工具及程序意外修改或删除文件系统的重要部分,使用许可还是值得的(这也是默认的配置,参见sions属性)。

如果启用了许可检查,所属用户许可与组别许可都会被检查,以确认用户的用户名与所属用户许可是否相同,确认他是否属于此用户组的成员;若不符,则检查其他许可。

这里有一个超级用户的概念,超级用户是名称节点进程的标识。对于超级用户,系统不会执行任何许可检查。

3.4 Hadoop文件系统(1)

Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现。Java抽象类

stem展示了Hadoop的一个文件系统,而且有几个具体实现,如表 3-1所示。

文件系统

RI

UJava实

现(全部在

描述

方案 )

Local file ileSystem 针对有客户端校验和

的本地连接磁盘使用

的文件系统。针对没

有校验和的本

地文件系统使用

RawLocalFileSystem。

详情参见第4章

HDFS hdfs buted-

FileSystem

Hadoop的分布式

文件系统。HDFS

被设计为结合使用

Map-Reduce实现高

效工作

HFTP hftp leSystem

一个在HTTP上提

供对HDFS只读访

问的文件系统(虽然

其名称为HFTP,但

它与FTP无关)。通

常与distcp结合使用

(参见第3章),在运

行不同版本HDFS的

集间复制数据

HSFTP hsftp

-

FileSystem

在HTTPS上提供对

HDFS只读访问的

文件系统(同上,与

FTP无关)

HAR har eSystem 一个构建在其他文

件系统上来存档文

件的文件系统。Hadoop

存档一般在HDFS中

的文件存档时使用,

以减少名称节点内存的使用

KFS(Cloud-Storekfs

)

-

FleSystem

cloudstore(其前身是

Kosmos文件系统)

是相似于HDFS或是

Google的GFS的文件

系统,用C++编

写。详

情可参见kosmosfs.

/

FTP ftp - 由FTP服务器支持的

FileSystem

S3(本地) s3n -

S3FileSystem.

文件系统

由Amazon S3支

持的文件

系统。可参见

/hadoop/AmazonS3

S3(基于

块)

s3 fs.s3.S3FileSystem 由 Amazon S3支

持的文件系统,

以块格式存储文件

(与HDFS很相似)

来解决S3的5 GB

文件大小限制

Hadoop提供了许多文件系统的接口,它一般使用URI 方案来选取合适的文件系统实例交互。举例来说,我们在前一小节中研究的文件系统shell可以操作所有的Hadoop文件系统。列出本地文件系统根目录下的文件,输入以下命令:

1. % hadoop fs -ls file:///

尽管运行那些可访问任何文件系统的MapReduce程序是可行的(有时也很方便),但在处理大量数据时,仍然需要选择一个有最优本地数据的分布式文件系统,如HDFS或者KFS(参见第1章)。

3.4 Hadoop文件系统(2)

接口

Hadoop是用Java编写的,所有Hadoop文件系统间的相互作用都是由Java API调解的。

举个例子,文件系统的shell就是一个Java应用,它使用Java文件系统类来提供文件系统操作。这些接口在HDFS中被广泛应用,因为Hadoop中的其他文件系统一般都有访问基本文件系统的工具(FTP的FTP 客户,S3的S3工具等),但它们大多数都能和任意一个Hadoop文件系统协作。

Thrift

因为Hadoop的文件系统接口是Java API,所以其他非Java应用访问Hadoop文件系统会比较麻烦。在"Thriftfs"分类单元中的Thrift API通过将Hadoop文件系统展示为一个Apache

Thrift服务来弥补这个不足,使得任何有Thrift绑定的语言都能轻松地与Hadoop文件系统互动,如HDFS。

使用Thrift API,需要运行提供Thrift服务的Java服务器,以代理的方式访问Hadoop文件系统。你的应用程序在访问Thrift服务时,后者实际上就和它运行在同一台机器上。


本文发布于:2024-09-23 13:18:49,感谢您对本站的认可!

本文链接:https://www.17tex.com/fanyi/16002.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:数据   运行   节点
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议