您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iProcess 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
用python + hadoop streaming编写分布式程序
 
来源:CSDN 发布于 2015-12-11
5303 次浏览     评价:      
 

MapReduce与HDFS简介

什么是Hadoop?

Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统Google File System,并发布了相关论文(可在Google Research的网站上获得: GFS 、 MapReduce)。 Doug Cutting和Mike Cafarella在开发搜索引擎Nutch时对这两篇论文做了自己的实现,即同名的MapReduce和HDFS,合起来就是Hadoop。

MapReduce的Data flow如下图,原始数据经过mapper处理,再进行partition和sort,到达reducer,输出最后结果。

图片来自Hadoop: The Definitive Guide

Hadoop Streaming原理

Hadoop本身是用Java开发的,程序也需要用Java编写,但是通过Hadoop Streaming,我们可以使用任意语言来编写程序,让Hadoop运行。

Hadoop Streaming的相关源代码可以在Hadoop的Github repo 查看。简单来说,就是通过将用其他语言编写的mapper和reducer通过参数传给一个事先写好的Java程序(Hadoop自带的*-streaming.jar),这个Java程序会负责创建MR作业,另开一个进程来运行mapper,将得到的输入通过stdin传给它,再将mapper处理后输出到stdout的数据交给Hadoop,partition和sort之后,再另开进程运行reducer,同样地通过stdin/stdout得到最终结果。因此,我们只需要在其他语言编写的程序里,通过stdin接收数据,再将处理过的数据输出到stdout,Hadoop streaming就能通过这个Java的wrapper帮我们解决中间繁琐的步骤,运行分布式程序。

图片来自Hadoop: The Definitive Guide

原理上只要是能够处理stdio的语言都能用来写mapper和reducer,也可以指定mapper或reducer为Linux下的程序(如awk、grep、cat)或者按照一定格式写好的java class。因此,mapper和reducer也不必是同一类的程序。

Hadoop Streaming的优缺点

优点

可以使用自己喜欢的语言来编写MapReduce程序(换句话说,不必写Java XD)

不需要像写Java的MR程序那样import一大堆库,在代码里做一大堆配置,很多东西都抽象到了stdio上,代码量显著减少

因为没有库的依赖,调试方便,并且可以脱离Hadoop先在本地用管道模拟调试

缺点

只能通过命令行参数来控制MapReduce框架,不像Java的程序那样可以在代码里使用API,控制力比较弱,有些东西鞭长莫及

因为中间隔着一层处理,效率会比较慢

所以Hadoop Streaming比较适合做一些简单的任务,比如用python写只有一两百行的脚本。如果项目比较复杂,或者需要进行比较细致的优化,使用Streaming就容易出现一些束手束脚的地方。

用python编写简单的Hadoop Streaming程序

这里提供两个例子:

1.Michael Noll的word count程序

2.Hadoop: The Definitive Guide里的例程

使用python编写Hadoop Streaming程序有几点需要注意:

1.在能使用iterator的情况下,尽量使用iterator,避免将stdin的输入大量储存在内存里,否则会严重降低性能

2.streaming不会帮你分割key和value传进来,传进来的只是一个个字符串而已,需要你自己在代码里手动调用split()

3.从stdin得到的每一行数据末尾似乎会有\n,保险起见一般都需要使用rstrip()来去掉

4.在想获得K-V list而不是一个个处理key-value pair时,可以使用groupby配合itemgetter将key相同的k-v pair组成一个个group,得到类似Java编写的reduce可以直接获取一个Text类型的key和一个iterable作为value的效果。注意itemgetter的效率比lambda表达式要高,所以如果需求不是很复杂的话,尽量用itemgetter比较好。

我在编写Hadoop Streaming程序时的基本模版是

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Some description here...
"""

import sys
from operator import itemgetter
from itertools import groupby
def read_input(file):
"""Read input and split."""
for line in file:
yield line.rstrip().split('\t')
def main():
data = read_input(sys.stdin)
for key, kviter in groupby(data, itemgetter(0)):
# some code here..
if __name__ == "__main__":
main()

如果对输入输出格式有不同于默认的控制,主要会在read_input()里调整。

本地调试

本地调试用于Hadoop Streaming的python程序的基本模式是:

$ cat <input path> | python <path to mapper script> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path>

或者如果不想用多余的cat,也可以用<定向

$ python <path to mapper script> < <input path> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path>

这里有几点需要注意:

1.Hadoop默认按照tab来分割key和value,以第一个分割出的部分为key,按key进行排序,因此这里使用

sort -t $'\t' -k1,1

来模拟。如果你有其他需求,在交给Hadoop Streaming执行时可以通过命令行参数调,本地调试也可以进行相应的调整,主要是调整sort的参数。因此为了能够熟练进行本地调试,建议先掌握sort命令的用法。

2.如果你在python脚本里加上了shebang,并且为它们添加了执行权限,也可以用类似于

./mapper.py

来代替

python mapper.py

二、在集群上运行与监控

为了方便,这篇文章里的例子均为伪分布式运行,一般来说只要集群配置得当,在伪分布式下能够运行的程序,在真实集群上也不会有什么问题。

为了更好地模拟集群环境,我们可以在mapred-site.xml中增设reducer和mapper的最大数目(默认为2,实际可用数目大约是CPU核数-1)。

假设你为Hadoop安装路径添加的环境变量叫$HADOOP_HOME(如果是$HADOOP_PREFIX,下文看到的命令对应改改就行)

$ vi $HADOOP_HOME/conf/mapred-site.xml

假设这台机子的CPU是4核,那么可以添加下面这几行

<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>3</value>
</property>
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>3</value>
</property>

这里修改了以后只是增加了slot的数量,如果你在执行MR作业时没有指明需要多少mapper或reducer,不一定会用到这么多,Hadoop会自行分配。

查看文档

首先需要知道用于streaming的java程序在哪里。在1.0.x的版本中,应该都在$HADOOP_HOME/contrib/streaming/下。比如1.0.4的就在

$HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar

里。 首先我们可以先看看这个java程序自带的文档。以下以1.0.4版本为例,执行

$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar -info

就会看到一系列自带的帮助,带有各种参数的说明和一些使用样例。

运行命令

用Hadoop Streaming执行python程序的一般步骤是:

1.将输入文件放到HDFS上,建议使用copyFromLocal而不是put命令,参见Difference between hadoop fs -put and hadoop fs -copyFromLocal

1.一般可以新建一个文件夹用于存放输入文件,假设叫input

$ hadoop fs -mkdir input

然后用

$ hadoop fs -ls

查看目录,可以看到出现了一个/user/hadoop/input文件夹。/user/hadoop是默认的用户文件夹,相当于本地文件系统中的/home/hadoop。

2.再使用

$ hadoop fs -copyFromLocal <PATH TO LOCAL FILE(S)> input/

将本地文件放到input文件夹下。copyFromLocal命令的用法类似于Linux的cp命令,支持使用wildcard。如果出现了预期外的执行结果,可以试试看在使用了wildcard的路径外加上引号。

2.开始MR作业,以1.0.4版本为例,假设你现在正在放有mapper和reducer两个脚本的目录下,而且它们刚好就叫mapper.py和reducer.py,在不需要做其他配置的情况下,执行

$hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \
-mapper mapper.py -file mapper.py\
-reducer reducer.py -file reducer.py \
-input input/* -output output

第一行是告诉Hadoop运行streaming的java程序,接下来的是参数:

这里的mapper.py和reducer.py是mapper所在python程序的路径。为了让Hadoop将程序分发给其他机器,需要再加一个-file参数用于指明要分发的程序放在哪里。

注意这样写的前提是这个python程序里有shebang而且添加了执行权限。如果没有的话,可以改成

-mapper 'python mapper.py'

加上解释器命令,用引号括住。因为准确来说,mapper后面跟的其实应该是一个命令而不是一个文件名。

假如你执行的程序不放在当前目录下,比如说在当前目录的src文件夹下,可以这样写

-mapper mapper.py   -file src/mapper.py\
-reducer reducer.py -file src/reducer.py \

也就是说,-mapper和-reducer后面跟的文件名不需要带上路径,而-file后的参数则需要。注意如果你在mapper后的命令用了引号,加上路径名反而会报错说找不到这个程序。

-input和-output后面跟的是HDFS上的路径名,同样支持wildcard,这里的input/*指的就是“input文件夹下的所有文件”。注意-output后面跟着的需要是一个不存在于HDFS上的路径,在产生输出的时候hadoop会帮你创建这个文件夹,如果已经存在的话就会产生冲突。

有时候shebang不一定能用,尤其是在执行环境比较复杂的时候。最保险的写法可能是:

这里有点要特别说明的地方$hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \
-mapper 'python mapper.py' -file mapper.py\
-reducer 'python reducer.py' -file reducer.py \
-input input/* -output output

这样写还有一个好处,就是可以在引号里写上提供给python程序的命令行参数,以后的教程会提到怎么用。

由于mapper和reducer参数跟的实际上是命令,所以如果每台机器上python的环境配置不一样的话,会用每台机器自己的配置去执行python程序。

运行过程

写完命令回车,顺利的话会开始执行程序。 这里不赘述执行时输出到终端的内容,可以去这里看看正常运行的时候会给些什么。

利用WebUI监控集群运行情况

一般来说要检查运行状况,都是去jobtracker的webUI。如果在master上,用浏览器访问http://localhost:50030即可(如果你在配置hadoop的时候修改了mapred-site.xml的mapred.job.tracker.http.address,请访问对应的其他地址)

在webUI里你可以看到running jobs, completed jobs和retired jobs。点击Jobid下的超链接,可以看到对应job的执行状况。进去后如果看到Failed/Killed Task Attempts下非空,你可以点进对应的超链接,找到对应的log去进行debug。

得到结果

成功执行完这个任务之后,你用output参数在HDFS上指定的输出文件夹里就会多出几个文件

一个空白文件_SUCCESS,表明job运行成功,这个文件可以让其他程序只要查看一下HDFS就能判断这次job是否成功运行,从而进行相关处理。

一个_logs文件夹,顾名思义里面放着任务日志

part-00000, .... part-xxxxx文件,有多少个reducer后面的数字就会有多大,对应每个reducer的输出结果。

假如你的输出很少,比如是一个只有几行的计数,你可以用

$ hadoop fs -cat <PATH ON HDFS>

直接将输出打印到终端查看。

假如你的输出很多,则需要拷贝到本地文件系统来查看。可以使用copyToLocal来获取整个文件夹(与copyFromLocal一样,它与get的区别在于会限制目标文件夹一定在本地文件系统上)。如果你不需要_SUCCESS 和_logs,并且想要将所有reducer的输出合并,可以使用getmerge命令。

比如在上面的例子里,可以用命令

$ hadoop fs -copyToLocal output ./

将output文件夹复制到本地文件系统的当前目录下,或者用

$ hadoop fs -getmerge output ./

将output下的part-xxxxx合并,放到当前目录的一个叫output的文件里。

如何串联多趟MR

如果你有多次任务要执行,下一步需要用上一步的任务做输入,解决办法其实很简单。假设上一步在HDFS的输出文件夹是output1,那么在下一步的运行命令中,指明

-input output1/part-*

即指定上一次的所有输出为本次任务的输入即可。注意这里假设你不需要对上一步的输出做额外处理。

其他

这篇文章只提到了最简单的执行Hadoop streaming程序的方法。涉及到一些其他需求,比如需要有多个输入文件等情况,还需要进一步调整运行命令,会在以后的文章里讲到。

三、 自定义功能

使用额外的文件

假如你跑的job除了输入以外还需要一些额外的文件(side data),有两种选择:

1.大文件

所谓的大文件就是大小大于设置的local.cache.size的文件,默认是10GB。这个时候可以用-file来分发。除此之外代码本身也可以用file来分发。

格式:假如我要加多一个sideData.txt给python脚本用:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
-input iputDir \
-output outputDir \
-mapper mapper.py \
-file mapper.py \
-reducer reduer.py \
-file reducer.py \
-file sideDate.txt

在python脚本里,只要把这个文件当成自己同一目录下的本地文件来打开就可以了。比如:

f = open("sideData.txt")

注意这个file是只读的,不可以写。

2.小文件

如果是比较小的文件,想要提高读写速度可以将它放在distributed cache里(也就是每台机器都有自己的一份copy,不需要网络IO就可以拿到数据)。这里要用到的参数是-cachefile,写法和用法上一个一样,就是-file改成-cachefile而已。

控制partitioner

partitioning指的是数据经过mapper处理后,被分发到reducer上的过程。partitioner控制的,就是“怎样的mapper输出会被分发到哪一个reducer上”。

Hadoop有几个自带的partitioner,解释可以看这里。默认的是HashPartitioner,也就是把第一个tab前的key做hash之后用于分配partition。写Hadoop Streaming程序是可以选择其他partitioner的,你可以选择自带的其他几种里的一种,也可以自己写一个继承Partitioner的java类然后编译成jar,在运行参数里指定为你用的partitioner。

官方自带的partitioner里最常用的是KeyFieldBasedPartitioner(源代码可以看这里)。它会按照key的一部分来做partition,而不是用整个key来做partition。

在学会用KeyFieldBasedPartitioner之前,必然要先学怎么控制key-value的分割。分割key的步骤可以分为两步,用python来描述一下大约是

fields = output.split(seperator)
key = fields[:numKeyfields]

1.选择用什么符号来分割key,也就是选择seperator

map.output.key.field.separator可以指定用于分隔key的符号。比如指定为一点的话,就要加上参数

-D stream.map.output.field.separator=.

假设你的mapper输出是

11.22.33.44

这时会先看准[11, 22, 33, 44]这里的其中一个或几个作为key

2.选择key的范围,也就是选择numKeyfields

控制key的范围的参数是这个,假设我要设置被分割出的前2个元素为key:

-D stream.num.map.output.key.fields=2

那么key就是上面的 1122。值得注意的是假如这个数字设置到覆盖整个输出,在这个例子里是4的话,那么整一行都会变成key。

上面分割出key之后, KeyFieldBasedPartitioner还需要知道你想要用key里的哪部分作为partition的依据。它进行配置的过程可以看源代码来理解。

假设在上一步我们通过使用

-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \

将11.22.33.44的整个字符串都设置成了key,下一步就是在这个key的内部再进行一次分割。map.output.key.field.separator可以用来设置第二次分割用的分割符,mapred.text.key.partitioner.options可以接受参数来划分被分割出来的partition key,比如:

-D map.output.key.field.separator=. \
-D mapred.text.key.partitioner.options=-k1,2 \

指的就是在key的内部里,将第1到第2个被点分割的元素作为partition key,这个例子里也就是1122。这里的值-ki,j表示从i到j个元素(inclusive)会作为partition key。如果终点省略不写,像-ki的话,那么i和i之后的元素都会作为partition key。

partition key相同的输出会保证分到同一个reducer上,也就是所有11.22.xx.xx的输出都会到同一个partitioner,11.22换成其他各种组合也是一样。

实例说明一下,就是这样的:

1.mapper的输出是

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

2.指定前4个元素做key,key里的前两个元素做partition key,分成3个partition的话,就会被分成

11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2

3.下一步reducer会对自己得到的每个partition内进行排序,结果就是

11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3

命令格式大约就是长这样

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=. \
-D mapred.text.key.partitioner.options=-k1,2 \
-input inputDir \
-output outputDir \
-mapper mapper.py -file mapper.py \
-reducer reducer.py -file reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

注意-D参数放在前面,指定用KeyFieldBasedPartitioner的-partitioner要放在下面。

控制comparator与自定义排序

上面说到mapper的输出被partition到各个reducer之后,会有一步排序。这个排序的标准也是可以通过设置comparator控制的。和上面一样,要先设置分割出key用的分隔符、key的范围,key内部分割用的分隔符

-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=. \

这里要控制的就是key内部的哪些元素用来做排序依据,是排字典序还是数字序,倒序还是正序。用来控制的参数是mapred.text.key.comparator.options,接受的值格式类似于unix sort。比如我要按第二个元素的数字序(默认字典序)+倒序来排元素的话,就用-D mapred.text.key.comparator.options=-k2,2nr

n表示数字序,r表示倒序。这样一来

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

就会被排成

11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1
   
5303 次浏览  评价: 差  订阅 捐助
相关文章

我们该如何设计数据库
数据库设计经验谈
数据库设计过程
数据库编程总结
 
相关文档

数据库性能调优技巧
数据库性能调整
数据库性能优化讲座
数据库系统性能调优系列
相关课程

高性能数据库设计与优化
高级数据库架构师
数据仓库和数据挖掘技术
Hadoop原理、部署与性能调优
 

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   

并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理

GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...