关于MapReduce

初识Hadoop

『大数据胜于好算法。』

为什么不能用数据库来对大量硬盘上的大规模数据进行批量分析呢?我们为什么需要MapReduce?

这两个问题的答案来自于计算机硬盘的另一个发展趋势:寻址时间的提升远远不敌于传输速率的提升。寻址是将磁头移动到特定硬盘位置进行读写操作的过程。它是导致硬盘操作延迟的主要原因,而传输速率取决于硬盘的带宽。

如果数据访问模式中包含大量的硬盘寻址,那么读取大量数据集就必然会花更长的时间。另一方面,如果数据库系统只更新一小部分记录,那么传统的B树就更有优势。但数据库系统如果有大量数据更新时,B树的效率就明显落后于MapReduce,因为需要使用”排序/合并”(sort/merge)来重建数据库。

MapReduce比较适合以批处理方式处理需要分析整个数据集的问题,尤其是动态分析。RDBMS适用于点查询和更新,数据集被索引之后,数据库系统能够提供低延迟的数据检索和快速的少量数据更新。MapReduce适合一次写入、多次读取数据的应用,关系型数据库则更适合持续更新的数据集。

传统的关系型数据库 MapReduce
数据大小 GB PB
数据存取 交互式和批处理 批处理
更新 多次读/写 一次写入,多次读取
结构 静态模式 动态模式
完整性
横向扩展 非线性的 线性的

MapReduce和关系型数据库之间的另一个区别在于它们所操作的数据集的结构化程度。

Hadoop名字是Doug Cutting的小孩给他的毛绒象玩具取得。

列举一些Hadoop项目:

  • Common:一系列组件和接口,用于分布式文件系统和通用I/O。
  • Avro:一种序列化系统,用于支持高效、跨语言的RPC和持久化存储。
  • MapReduce:分布式数据处理模型和执行环境,运行于大型商用集群。
  • HDFS:分布式文件系统,运行于大型商用机群。
  • Pig:数据流语言和运行环境,用以探究非常庞大的数据集。Pig运行在MapReduce和HDFS集群上。
  • Hive:一种分布式的、按列存储的数据仓库。Hive管理HDFS中存储的数据,并提供基于SQL的查询语言用以查询数据。
  • HBase:一种分布式的、按列存储的数据库。HBase使用HDFS作为底层存储,同时支持MapReduce的批量式计算和点查询(随机查询)。
  • ZooKeeper:一种分布式的、可用性高的协调服务。ZooKeeper提供分布式锁之类的基本服务用于构建分布式应用。

关于MapReduce

MapReduce是一种可用于数据处理的编程模型。

map和reduce

MapReduce任务过程分为两个处理阶段:map阶段和reduce阶段。每个阶段都以键值对作为输入和输出,其类型由程序员来选择。程序员还要写两个函数:map函数和reduce函数。

数据流

MapReduce作业(job)是客户端需要执行的一个工作元:它包括输入数据、MapReduce程序和配置信息。Hadoop将作业分成若干个小任务(task)来执行,其中包括两类任务:map任务和reduce任务。

有两类节点控制着作业执行过程:一个jobtracker及一些列tasktracker。jobtracker通过调度tasktracker上运行的任务来协调所有运行在系统上的作业。tasktracker在运行任务的同时将运行进度报告给jobtracker,jobtracker由此记录每项作业任务的整体进度情况。如果一个任务失败,jobtracker可以在另外一个tasktracker节点上重新调度该任务。

Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(input split)或简称『分片』。Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map函数从而处理分片中的每条记录。

Hadoop在存储有输入数据的节点上运行map任务,可以获得最佳性能。这就是所谓『数据本地化优化』(data locality optimization),因为它无需宝贵的集群带宽资源。

map任务将其输出写入本地硬盘,而非HDFS。这是为什么?因此map输出是中间结果:该中间结果由reduce任务处理后才产生最终输出结果,而且一旦完成作业,map的输出结果就可以删除。

**为什么map任务和reduce任务之间的数据流称为shuffle,因为每个reduce任务的输入都来自许多map任务。

Hadoop Streaming

例子.用于查找最高气温的map函数

1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env python
import re
import sys


for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)

用于查找最高气温的reduce函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python
import sys


(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split('\t')
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)

Reference

Hadoop权威指南

More than your eyes can see