您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
谷歌技术之MapReduce和BigTable
 
作者 OpenNaive博客,火龙果软件    发布于 2014-11-24
  4077  次浏览      1
 

谷歌技术"三宝"之MapReduce

1. MapReduce是干啥的

因为没找到谷歌的示意图,所以我想借用一张Hadoop项目的结构图来说明下MapReduce所处的位置,如下图。

Hadoop实际上就是谷歌三宝的开源实现,Hadoop MapReduce对应Google MapReduce,HBase对应BigTable,HDFS对应GFS。HDFS(或GFS)为上层提供高效的非结构化存储服务,HBase(或BigTable)是提供结构化数据服务的分布式数据库,Hadoop MapReduce(或Google MapReduce)是一种并行计算的编程模型,用于作业调度。

GFS和BigTable已经为我们提供了高性能、高并发的服务,但是并行编程可不是所有程序员都玩得转的活儿,如果我们的应用本身不能并发,那GFS、BigTable也都是没有意义的。MapReduce的伟大之处就在于让不熟悉并行编程的程序员也能充分发挥分布式系统的威力。

简单概括的说,MapReduce是将一个大作业拆分为多个小作业的框架(大作业和小作业应该本质是一样的,只是规模不同),用户需要做的就是决定拆成多少份,以及定义作业本身。

下面用一个贯穿全文的例子来解释MapReduce是如何工作的。

2. 例子:统计词频

如果我想统计下过去10年计算机论文出现最多的几个单词,看看大家都在研究些什么,那我收集好论文后,该怎么办呢?

方法一:我可以写一个小程序,把所有论文按顺序遍历一遍,统计每一个遇到的单词的出现次数,最后就可以知道哪几个单词最热门了。

这种方法在数据集比较小时,是非常有效的,而且实现最简单,用来解决这个问题很合适。

方法二:写一个多线程程序,并发遍历论文。

这个问题理论上是可以高度并发的,因为统计一个文件时不会影响统计另一个文件。当我们的机器是多核或者多处理器,方法二肯定比方法一高效。但是写一个多线程程序要比方法一困难多了,我们必须自己同步共享数据,比如要防止两个线程重复统计文件。

方法三:把作业交给多个计算机去完成。

我们可以使用方法一的程序,部署到N台机器上去,然后把论文集分成N份,一台机器跑一个作业。这个方法跑得足够快,但是部署起来很麻烦,我们要人工把程序copy到别的机器,要人工把论文集分开,最痛苦的是还要把N个运行结果进行整合(当然我们也可以再写一个程序)。

方法四:让MapReduce来帮帮我们吧!

MapReduce本质上就是方法三,但是如何拆分文件集,如何copy程序,如何整合结果这些都是框架定义好的。我们只要定义好这个任务(用户程序),其它都交给MapReduce。

在介绍MapReduce如何工作之前,先讲讲两个核心函数map和reduce以及MapReduce的伪代码。

3. map函数和reduce函数

map函数和reduce函数是交给用户实现的,这两个函数定义了任务本身。

1.map函数:接受一个键值对(key-value pair),产生一组中间键值对。MapReduce框架会将map函数产生的中间键值对里键相同的值传递给一个reduce函数。

2.reduce函数:接受一个键,以及相关的一组值,将这组值进行合并产生一组规模更小的值(通常只有一个或零个值)。

统计词频的MapReduce函数的核心代码非常简短,主要就是实现这两个函数。

map(String key, String value):  
02. // key: document name
03. // value: document contents
04. for each word w in value:
05. EmitIntermediate(w, "1");
06.
07.reduce(String key, Iterator values):
08. // key: a word
09. // values: a list of counts
10. int result = 0;
11. for each v in values:
12. result += ParseInt(v);
13. Emit(AsString(result));

在统计词频的例子里,map函数接受的键是文件名,值是文件的内容,map逐个遍历单词,每遇到一个单词w,就产生一个中间键值对<w, "1">,这表示单词w咱又找到了一个;MapReduce将键相同(都是单词w)的键值对传给reduce函数,这样reduce函数接受的键就是单词w,值是一串"1"(最基本的实现是这样,但可以优化),个数等于键为w的键值对的个数,然后将这些“1”累加就得到单词w的出现次数。最后这些单词的出现次数会被写到用户定义的位置,存储在底层的分布式存储系统(GFS或HDFS)。

4. MapReduce是如何工作的

上图是论文里给出的流程图。一切都是从最上方的user program开始的,user program链接了MapReduce库,实现了最基本的Map函数和Reduce函数。图中执行的顺序都用数字标记了。

1.MapReduce库先把user program的输入文件划分为M份(M为用户定义),每一份通常有16MB到64MB,如图左方所示分成了split0~4;然后使用fork将用户进程拷贝到集群内其它机器上。

2.user program的副本中有一个称为master,其余称为worker,master是负责调度的,为空闲worker分配作业(Map作业或者Reduce作业),worker的数量也是可以由用户指定的。

3.被分配了Map作业的worker,开始读取对应分片的输入数据,Map作业数量是由M决定的,和split一一对应;Map作业从输入数据中抽取出键值对,每一个键值对都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。

4.缓存的中间键值对会被定期写入本地磁盘,而且被分为R个区,R的大小是由用户定义的,将来每个区会对应一个Reduce作业;这些中间键值对的位置会被通报给master,master负责将信息转发给Reduce worker。

5.master通知分配了Reduce作业的worker它负责的分区在什么位置(肯定不止一个地方,每个Map作业产生的中间键值对都可能映射到所有R个不同分区),当Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是同一个Reduce作业(谁让分区少呢),所以排序是必须的。

6.reduce worker遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输出会添加到这个分区的输出文件中。

7.当所有的Map和Reduce作业都完成了,master唤醒正版的user program,MapReduce函数调用返回user program的代码。

所有执行完毕后,MapReduce输出放在了R个分区的输出文件中(分别对应一个Reduce作业)。用户通常并不需要合并这R个文件,而是将其作为输入交给另一个MapReduce程序处理。整个过程中,输入数据是来自底层分布式文件系统(GFS)的,中间数据是放在本地文件系统的,最终输出数据是写入底层分布式文件系统(GFS)的。而且我们要注意Map/Reduce作业和map/reduce函数的区别:Map作业处理一个输入数据的分片,可能需要调用多次map函数来处理每个输入键值对;Reduce作业处理一个分区的中间键值对,期间要对每个不同的键调用一次reduce函数,Reduce作业最终也对应一个输出文件。

我更喜欢把流程分为三个阶段。第一阶段是准备阶段,包括1、2,主角是MapReduce库,完成拆分作业和拷贝用户程序等任务;第二阶段是运行阶段,包括3、4、5、6,主角是用户定义的map和reduce函数,每个小作业都独立运行着;第三阶段是扫尾阶段,这时作业已经完成,作业结果被放在输出文件里,就看用户想怎么处理这些输出了。

5. 词频是怎么统计出来的

结合第四节,我们就可以知道第三节的代码是如何工作的了。假设咱们定义M=5,R=3,并且有6台机器,一台master。

这幅图描述了MapReduce如何处理词频统计。由于map worker数量不够,首先处理了分片1、3、4,并产生中间键值对;当所有中间值都准备好了,Reduce作业就开始读取对应分区,并输出统计结果。

6. 用户的权利

用户最主要的任务是实现map和reduce接口,但还有一些有用的接口是向用户开放的。

1.an input reader。这个函数会将输入分为M个部分,并且定义了如何从数据中抽取最初的键值对,比如词频的例子中定义文件名和文件内容是键值对。

2.a partition function。这个函数用于将map函数产生的中间键值对映射到一个分区里去,最简单的实现就是将键求哈希再对R取模。

3.a compare function。这个函数用于Reduce作业排序,这个函数定义了键的大小关系。

4.an output writer。负责将结果写入底层分布式文件系统。

5.a combiner function。实际就是reduce函数,这是用于前面提到的优化的,比如统计词频时,如果每个<w, "1">要读一次,因为reduce和map通常不在一台机器,非常浪费时间,所以可以在map执行的地方先运行一次combiner,这样reduce只需要读一次<w, "n">了。

6.map和reduce函数就不多说了。

7. MapReduce的实现

目前MapReduce已经有多种实现,除了谷歌自己的实现外,还有著名的hadoop,区别是谷歌是c++,而hadoop是用java。另外斯坦福大学实现了一个在多核/多处理器、共享内存环境内运行的MapReduce,称为Phoenix(介绍),相关的论文发表在07年的HPCA,是当年的最佳论文哦!

谷歌技术"三宝"之BigTable

1 什么是BigTable

Bigtable是一个为管理大规模结构化数据而设计的分布式存储系统,可以扩展到PB级数据和上千台服务器。很多google的项目使用Bigtable存储数据,这些应用对Bigtable提出了不同的挑战,比如数据规模的要求、延迟的要求。Bigtable能满足这些多变的要求,为这些产品成功地提供了灵活、高性能的存储解决方案。

Bigtable看起来像一个数据库,采用了很多数据库的实现策略。但是Bigtable并不支持完整的关系型数据模型;而是为客户端提供了一种简单的数据模型,客户端可以动态地控制数据的布局和格式,并且利用底层数据存储的局部性特征。Bigtable将数据统统看成无意义的字节串,客户端需要将结构化和非结构化数据串行化再存入Bigtable。

下文对BigTable的数据模型和基本工作原理进行介绍,而各种优化技术(如压缩、Bloom Filter等)不在讨论范围。

2 BigTable的数据模型

Bigtable不是关系型数据库,但是却沿用了很多关系型数据库的术语,像table(表)、row(行)、column(列)等。这容易让读者误入歧途,将其与关系型数据库的概念对应起来,从而难以理解论文。Understanding HBase and BigTable是篇很优秀的文章,可以帮助读者从关系型数据模型的思维定势中走出来。

本质上说,Bigtable是一个键值(key-value)映射。按作者的说法,Bigtable是一个稀疏的,分布式的,持久化的,多维的排序映射。

先来看看多维、排序、映射。Bigtable的键有三维,分别是行键(row key)、列键(column key)和时间戳(timestamp),行键和列键都是字节串,时间戳是64位整型;而值是一个字节串。可以用 (row:string, column:string, time:int64)→string 来表示一条键值对记录。

行键可以是任意字节串,通常有10-100字节。行的读写都是原子性的。Bigtable按照行键的字典序存储数据。Bigtable的表会根据行键自动划分为片(tablet),片是负载均衡的单元。最初表都只有一个片,但随着表不断增大,片会自动分裂,片的大小控制在100-200MB。行是表的第一级索引,我们可以把该行的列、时间和值看成一个整体,简化为一维键值映射,类似于:

table{  
02. "1" : {sth.},//一行
03. "aaaaa" : {sth.},
04. "aaaab" : {sth.},
05. "xyz" : {sth.},
06. "zzzzz" : {sth.}
07.}

列是第二级索引,每行拥有的列是不受限制的,可以随时增加减少。为了方便管理,列被分为多个列族(column family,是访问控制的单元),一个列族里的列一般存储相同类型的数据。一行的列族很少变化,但是列族里的列可以随意添加删除。列键按照family:qualifier格式命名的。这次我们将列拿出来,将时间和值看成一个整体,简化为二维键值映射,类似于:

table{  
02. // ...
03. "aaaaa" : { //一行
04. "A:foo" : {sth.},//一列
05. "A:bar" : {sth.},//一列
06. "B:" : {sth.} //一列,列族名为B,但是列名是空字串
07. },
08. "aaaab" : { //一行
09. "A:foo" : {sth.},
10. "B:" : {sth.}
11. },
12. // ...
13.}

或者可以将列族当作一层新的索引,类似于:

table{  
02. // ...
03. "aaaaa" : { //一行
04. "A" : { //列族A
05. "foo" : {sth.}, //一列
06. "bar" : {sth.}
07. },
08. "B" : { //列族B
09. "" : {sth.}
10. }
11. },
12. "aaaab" : { //一行
13. "A" : {
14. "foo" : {sth.},
15. },
16. "B" : {
17. "" : "ocean"
18. }
19. },
20. // ...
21.}

时间戳是第三级索引。Bigtable允许保存数据的多个版本,版本区分的依据就是时间戳。时间戳可以由Bigtable赋值,代表数据进入Bigtable的准确时间,也可以由客户端赋值。数据的不同版本按照时间戳降序存储,因此先读到的是最新版本的数据。我们加入时间戳后,就得到了Bigtable的完整数据模型,类似于:

table{  
02. // ...
03. "aaaaa" : { //一行
04. "A:foo" : { //一列
05. 15 : "y", //一个版本
06. 4 : "m"
07. },
08. "A:bar" : { //一列
09. 15 : "d",
10. },
11. "B:" : { //一列
12. 6 : "w"
13. 3 : "o"
14. 1 : "w"
15. }
16. },
17. // ...
18.}

查询时,如果只给出行列,那么返回的是最新版本的数据;如果给出了行列时间戳,那么返回的是时间小于或等于时间戳的数据。比如,我们查询"aaaaa"/"A:foo",返回的值是"y";查询"aaaaa"/"A:foo"/10,返回的结果就是"m";查询"aaaaa"/"A:foo"/2,返回的结果是空。

图1是Bigtable论文里给出的例子,Webtable表存储了大量的网页和相关信息。在Webtable,每一行存储一个网页,其反转的url作为行键,比如maps.google.com/index.html的数据存储在键为com.google.maps/index.html的行里,反转的原因是为了让同一个域名下的子域名网页能聚集在一起。图1中的列族"anchor"保存了该网页的引用站点(比如引用了CNN主页的站点),qualifier是引用站点的名称,而数据是链接文本;列族"contents"保存的是网页的内容,这个列族只有一个空列"contents:"。图1中"contents:"列下保存了网页的三个版本,我们可以用("com.cnn.www", "contents:", t5)来找到CNN主页在t5时刻的内容。

再来看看作者说的其它特征:稀疏,分布式,持久化。持久化的意思很简单,Bigtable的数据最终会以文件的形式放到GFS去。Bigtable建立在GFS之上本身就意味着分布式,当然分布式的意义还不仅限于此。稀疏的意思是,一个表里不同的行,列可能完完全全不一样。

3 支撑技术

Bigtable依赖于google的几项技术。用GFS来存储日志和数据文件;按SSTable文件格式存储数据;用Chubby管理元数据。

GFS参见谷歌技术"三宝"之谷歌文件系统。BigTable的数据和日志都是写入GFS的。

SSTable的全称是Sorted Strings Table,是一种不可修改的有序的键值映射,提供了查询、遍历等功能。每个SSTable由一系列的块(block)组成,Bigtable将块默认设为64KB。在SSTable的尾部存储着块索引,在访问SSTable时,整个索引会被读入内存。BigTable论文没有提到SSTable的具体结构,LevelDb日知录之四: SSTable文件这篇文章对LevelDb的SSTable格式进行了介绍,因为LevelDB的作者JeffreyDean正是BigTable的设计师,所以极具参考价值。每一个片(tablet)在GFS里都是按照SSTable的格式存储的,每个片可能对应多个SSTable。

Chubby是一种高可用的分布式锁服务,Chubby有五个活跃副本,同时只有一个主副本提供服务,副本之间用Paxos算法维持一致性,Chubby提供了一个命名空间(包括一些目录和文件),每个目录和文件就是一个锁,Chubby的客户端必须和Chubby保持会话,客户端的会话若过期则会丢失所有的锁。关于Chubby的详细信息可以看google的另一篇论文:The Chubby lock service for loosely-coupled distributed systems。Chubby用于片定位,片服务器的状态监控,访问控制列表存储等任务。

4 Bigtable集群

Bigtable集群包括三个主要部分:一个供客户端使用的库,一个主服务器(master server),许多片服务器(tablet server)。

正如数据模型小节所说,Bigtable会将表(table)进行分片,片(tablet)的大小维持在100-200MB范围,一旦超出范围就将分裂成更小的片,或者合并成更大的片。每个片服务器负责一定量的片,处理对其片的读写请求,以及片的分裂或合并。片服务器可以根据负载随时添加和删除。这里片服务器并不真实存储数据,而相当于一个连接Bigtable和GFS的代理,客户端的一些数据操作都通过片服务器代理间接访问GFS。

主服务器负责将片分配给片服务器,监控片服务器的添加和删除,平衡片服务器的负载,处理表和列族的创建等。注意,主服务器不存储任何片,不提供任何数据服务,也不提供片的定位信息。

客户端需要读写数据时,直接与片服务器联系。因为客户端并不需要从主服务器获取片的位置信息,所以大多数客户端从来不需要访问主服务器,主服务器的负载一般很轻。

5 片的定位

前面提到主服务器不提供片的位置信息,那么客户端是如何访问片的呢?来看看论文给的示意图,Bigtable使用一个类似B+树的数据结构存储片的位置信息。

首先是第一层,Chubby file。这一层是一个Chubby文件,它保存着root tablet的位置。这个Chubby文件属于Chubby服务的一部分,一旦Chubby不可用,就意味着丢失了root tablet的位置,整个Bigtable也就不可用了。

第二层是root tablet。root tablet其实是元数据表(METADATA table)的第一个分片,它保存着元数据表其它片的位置。root tablet很特别,为了保证树的深度不变,root tablet从不分裂。

第三层是其它的元数据片,它们和root tablet一起组成完整的元数据表。每个元数据片都包含了许多用户片的位置信息。

可以看出整个定位系统其实只是两部分,一个Chubby文件,一个元数据表。注意元数据表虽然特殊,但也仍然服从前文的数据模型,每个分片也都是由专门的片服务器负责,这就是不需要主服务器提供位置信息的原因。客户端会缓存片的位置信息,如果在缓存里找不到一个片的位置信息,就需要查找这个三层结构了,包括访问一次Chubby服务,访问两次片服务器。

6 片的存储和访问

片的数据最终还是写到GFS里的,片在GFS里的物理形态就是若干个SSTable文件。图5展示了读写操作基本情况。

当片服务器收到一个写请求,片服务器首先检查请求是否合法。如果合法,先将写请求提交到日志去,然后将数据写入内存中的memtable。memtable相当于SSTable的缓存,当memtable成长到一定规模会被冻结,Bigtable随之创建一个新的memtable,并且将冻结的memtable转换为SSTable格式写入GFS,这个操作称为minor compaction。

当片服务器收到一个读请求,同样要检查请求是否合法。如果合法,这个读操作会查看所有SSTable文件和memtable的合并视图,因为SSTable和memtable本身都是已排序的,所以合并相当快。

每一次minor compaction都会产生一个新的SSTable文件,SSTable文件太多读操作的效率就降低了,所以Bigtable定期执行merging compaction操作,将几个SSTable和memtable合并为一个新的SSTable。BigTable还有个更厉害的叫major compaction,它将所有SSTable合并为一个新的SSTable。

遗憾的是,BigTable作者没有介绍memtable和SSTable的详细数据结构。

7 BigTable和GFS的关系

集群包括主服务器和片服务器,主服务器负责将片分配给片服务器,而具体的数据服务则全权由片服务器负责。但是不要误以为片服务器真的存储了数据(除了内存中memtable的数据),数据的真实位置只有GFS才知道,主服务器将片分配给片服务器的意思应该是,片服务器获取了片的所有SSTable文件名,片服务器通过一些索引机制可以知道所需要的数据在哪个SSTable文件,然后从GFS中读取SSTable文件的数据,这个SSTable文件可能分布在好几台chunkserver上。

8 元数据表的结构

元数据表(METADATA table)是一张特殊的表,它被用于数据的定位以及一些元数据服务,不可谓不重要。但是Bigtable论文里只给出了少量线索,而对表的具体结构没有说明。这里我试图根据论文的一些线索,猜测一下表的结构。首先列出论文中的线索:

1.The METADATA table stores the location of a tablet under a row key that is an encoding of the tablet's table identifier and its end row.

2.Each METADATA row stores approximately 1KB of data in memory(因为访问量比较大,元数据表是放在内存里的,这个优化在论文的locality groups中提到).This feature(将locality group放到内存中的特性) is useful for small pieces of data that are accessed frequently: we use it internally for the location column family in the METADATA table.

3.We also store secondary information in the METADATA table, including a log of all events pertaining to each tablet(such as when a server begins
serving it).

第一条线索,元数据表的行键是由片所属表名的id和片最后一行编码而成,所以每个片在元数据表中占据一条记录(一行),而且行键既包含了其所属表的信息也包含了其所拥有的行的范围。譬如采取最简单的编码方式,元数据表的行键等于strcat(表名,片最后一行的行键)。

第二点线索,除了知道元数据表的地址部分是常驻内存以外,还可以发现元数据表有一个列族称为location,我们已经知道元数据表每一行代表一个片,那么为什么需要一个列族来存储地址呢?因为每个片都可能由多个SSTable文件组成,列族可以用来存储任意多个SSTable文件的位置。一个合理的假设就是每个SSTable文件的位置信息占据一列,列名为location:filename。当然不一定非得用列键存储完整文件名,更大的可能性是把SSTable文件名存在值里。获取了文件名就可以向GFS索要数据了。

第三个线索告诉我们元数据表不止存储位置信息,也就是说列族不止location,这些数据暂时不是咱们关心的。

通过以上信息,我画了一个简化的Bigtable结构图:

结构图以Webtable表为例,表中存储了网易、百度和豆瓣的几个网页。当我们想查找百度贴吧昨天的网页内容,可以向Bigtable发出查询Webtable表的(com.baidu.tieba, contents:, yesterday)。

假设客户端没有该缓存,那么Bigtable访问root tablet的片服务器,希望得到该网页所属的片的位置信息在哪个元数据片中。使用METADATA.Webtable.com.baidu.tieba为行键在root tablet中查找,定位到最后一个比它大的是METADATA.Webtable.com.baidu.www,于是确定需要的就是元数据表的片A。访问片A的片服务器,继续查找Webtable.com.baidu.tieba,定位到Webtable.com.baidu.www是比它大的,确定需要的是Webtable表的片B。访问片B的片服务器,获得数据。

这里需要注意的是,每个片实际都由若干SSTable文件和memtable组成,而且这些SSTable和memtable都是已排序的。这就导致查找片B时,可能需要将所有SSTable和memtable都查找一遍;另外客户端应该不会直接从元数据表获得SSTable的文件名,而只是获得片属于片服务器的信息,通过片服务器为代理访问SSTable。

   
4077 次浏览       1
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新课程计划

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...