您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
MapReduce实例浅析
 
来源:open经验库 发布于:2015-1-20
  2105  次浏览      15
 

1.MapReduce概述

Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。

一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序, 然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

通常,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。

Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。

应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的参数,就构成了作业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。

虽然Hadoop框架是用Java实现的,但Map/Reduce应用程序则不一定要用 Java来写 。

2.样例分析:单词计数

1、WordCount源码分析

单词计数是最简单也是最能体现MapReduce思想的程序之一,该程序完整的代码可以在Hadoop安装包的src/examples目录下找到

单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数,如图所示:

(1)Map过程

Map过程需要继承org.apache.hadoop.mapreduce包中的Mapper类,并重写map方法

通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中的value值存储的是文本文件中的一行(以回车符作为行结束标记),而key值为该行的首字符相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成一个个的单词,并将<word,1>作为map方法的结果输出,其余的工作都交由MapReduce框架处理。其中IntWritable和Text类是Hadoop对int和string类的封装,这些类能够被串行化,以方便在分布式环境中进行数据交换。

TokenizerMapper的实现代码如下:

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
       
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("key = " + key.toString());//添加查看key值
        System.out.println("value = " + value.toString());//添加查看value值
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

(2)Reduce过程

Reduce过程需要继承org.apache.hadoop.mapreduce包中的Reducer类,并重写reduce方法

reduce方法的输入参数key为单个单词,而values是由各Mapper上对应单词的计数值所组成的列表,所以只要遍历values并求和,即可得到某个单词的出现总次数

IntSumReduce类的实现代码如下:

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
 
    public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
          sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
   }
}

(3)执行MapReduce任务

在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用TokenizerMapper完成Map过程和使用的IntSumReduce完成Combine和Reduce过程。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输入和输出路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务,主函数实现如下:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount  ");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(wordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

运行结果如下:

14/12/17 05:53:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=

14/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 2

14/12/17 05:53:26 INFO mapred.JobClient: Running job: job_local_0001

14/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 2

14/12/17 05:53:26 INFO mapred.MapTask: io.sort.mb = 100

14/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/99614720

14/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/327680

key = 0

value = Hello World

key = 12

value = Bye World

14/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map output

14/12/17 05:53:27 INFO mapred.MapTask: Finished spill 0

14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting

14/12/17 05:53:27 INFO mapred.LocalJobRunner: 

14/12/17 05:53:27 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000000_0′ done.

14/12/17 05:53:27 INFO mapred.MapTask: io.sort.mb = 100

14/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/99614720

14/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/327680

14/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map output

key = 0

value = Hello Hadoop

key = 13

value = Bye Hadoop

14/12/17 05:53:27 INFO mapred.MapTask: Finished spill 0

14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting

14/12/17 05:53:27 INFO mapred.LocalJobRunner: 

14/12/17 05:53:27 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000001_0′ done.

14/12/17 05:53:27 INFO mapred.LocalJobRunner: 

14/12/17 05:53:27 INFO mapred.Merger: Merging 2 sorted segments

14/12/17 05:53:27 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 73 bytes

14/12/17 05:53:27 INFO mapred.LocalJobRunner: 

14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting

14/12/17 05:53:27 INFO mapred.LocalJobRunner: 

14/12/17 05:53:27 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now

14/12/17 05:53:27 INFO output.FileOutputCommitter: Saved output of task ‘attempt_local_0001_r_000000_0′ to out

14/12/17 05:53:27 INFO mapred.LocalJobRunner: reduce > reduce

14/12/17 05:53:27 INFO mapred.TaskRunner: Task ‘attempt_local_0001_r_000000_0′ done.

14/12/17 05:53:27 INFO mapred.JobClient: map 100% reduce 100%

14/12/17 05:53:27 INFO mapred.JobClient: Job complete: job_local_0001

14/12/17 05:53:27 INFO mapred.JobClient: Counters: 14

14/12/17 05:53:27 INFO mapred.JobClient: FileSystemCounters

14/12/17 05:53:27 INFO mapred.JobClient: FILE_BYTES_READ=17886

14/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_READ=52932

14/12/17 05:53:27 INFO mapred.JobClient: FILE_BYTES_WRITTEN=54239

14/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=71431

14/12/17 05:53:27 INFO mapred.JobClient: Map-Reduce Framework

14/12/17 05:53:27 INFO mapred.JobClient: Reduce input groups=4

14/12/17 05:53:27 INFO mapred.JobClient: Combine output records=6

14/12/17 05:53:27 INFO mapred.JobClient: Map input records=4

14/12/17 05:53:27 INFO mapred.JobClient: Reduce shuffle bytes=0

14/12/17 05:53:27 INFO mapred.JobClient: Reduce output records=4

14/12/17 05:53:27 INFO mapred.JobClient: Spilled Records=12

14/12/17 05:53:27 INFO mapred.JobClient: Map output bytes=78

14/12/17 05:53:27 INFO mapred.JobClient: Combine input records=8

14/12/17 05:53:27 INFO mapred.JobClient: Map output records=8

14/12/17 05:53:27 INFO mapred.JobClient: Reduce input records=6

2、WordCount处理过程

上面给出了WordCount的设计思路和源码,但是没有深入细节,下面对WordCount进行更加详细的分析:

(1)将文件拆分成splits,由于测试用的文件较小,所以每一个文件为一个split,并将文件按行分割成<key, value>对,如图,这一步由Mapreduce框架自动完成,其中偏移量包括了回车所占的字符

(2)将分割好的<key, value>对交给用户定义的map方法进行处理,生成新的<key, value>对

(3)得到map方法输出的<key, value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value值累加,得到Mapper的最终输出结果,如图:

(4)Reduce先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key, value>对,并作为WordCount的输出结果,如图:

3.MapReduce,你够了解吗?

MapReduce框架在幕后默默地完成了很多的事情,如果不重写map和reduce方法,会出现什么情况呢?

下面来实现一个简化的MapReduce,新建一个LazyMapReduce,该类只对任务进行必要的初始化及输入/输出路径的设置,其余的参数均保持默认

代码如下:

public class LazyMapReduce {
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != 2) {
            System.err.println("Usage:wordcount");
            System.exit(2);
        }
        Job job = new Job(conf, "LazyMapReduce");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)? 0:1);
    }
}

运行结果为:

14/12/17 23:04:13 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=

14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2

14/12/17 23:04:14 INFO mapred.JobClient: Running job: job_local_0001

14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2

14/12/17 23:04:14 INFO mapred.MapTask: io.sort.mb = 100

14/12/17 23:04:15 INFO mapred.JobClient: map 0% reduce 0%

14/12/17 23:04:18 INFO mapred.MapTask: data buffer = 79691776/99614720

14/12/17 23:04:18 INFO mapred.MapTask: record buffer = 262144/327680

14/12/17 23:04:18 INFO mapred.MapTask: Starting flush of map output

14/12/17 23:04:19 INFO mapred.MapTask: Finished spill 0

14/12/17 23:04:19 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting

14/12/17 23:04:19 INFO mapred.LocalJobRunner: 

14/12/17 23:04:19 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000000_0′ done.

14/12/17 23:04:20 INFO mapred.MapTask: io.sort.mb = 100

14/12/17 23:04:20 INFO mapred.MapTask: data buffer = 79691776/99614720

14/12/17 23:04:20 INFO mapred.MapTask: record buffer = 262144/327680

14/12/17 23:04:20 INFO mapred.MapTask: Starting flush of map output

14/12/17 23:04:20 INFO mapred.MapTask: Finished spill 0

14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting

14/12/17 23:04:20 INFO mapred.LocalJobRunner: 

14/12/17 23:04:20 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000001_0′ done.

14/12/17 23:04:20 INFO mapred.LocalJobRunner: 

14/12/17 23:04:20 INFO mapred.Merger: Merging 2 sorted segments

14/12/17 23:04:20 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 90 bytes

14/12/17 23:04:20 INFO mapred.LocalJobRunner: 

14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting

14/12/17 23:04:20 INFO mapred.LocalJobRunner: 

14/12/17 23:04:20 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now

14/12/17 23:04:20 INFO output.FileOutputCommitter: Saved output of task ‘attempt_local_0001_r_000000_0′ to out

14/12/17 23:04:20 INFO mapred.LocalJobRunner: reduce > reduce

14/12/17 23:04:20 INFO mapred.TaskRunner: Task ‘attempt_local_0001_r_000000_0′ done.

14/12/17 23:04:20 INFO mapred.JobClient: map 100% reduce 100%

14/12/17 23:04:20 INFO mapred.JobClient: Job complete: job_local_0001

14/12/17 23:04:20 INFO mapred.JobClient: Counters: 14

14/12/17 23:04:20 INFO mapred.JobClient: FileSystemCounters

14/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_READ=46040

14/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_READ=51471

14/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_WRITTEN=52808

14/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=98132

14/12/17 23:04:20 INFO mapred.JobClient: Map-Reduce Framework

14/12/17 23:04:20 INFO mapred.JobClient: Reduce input groups=3

14/12/17 23:04:20 INFO mapred.JobClient: Combine output records=0

14/12/17 23:04:20 INFO mapred.JobClient: Map input records=4

14/12/17 23:04:20 INFO mapred.JobClient: Reduce shuffle bytes=0

14/12/17 23:04:20 INFO mapred.JobClient: Reduce output records=4

14/12/17 23:04:20 INFO mapred.JobClient: Spilled Records=8

14/12/17 23:04:20 INFO mapred.JobClient: Map output bytes=78

14/12/17 23:04:20 INFO mapred.JobClient: Combine input records=0

14/12/17 23:04:20 INFO mapred.JobClient: Map output records=4

14/12/17 23:04:20 INFO mapred.JobClient: Reduce input records=4

可见在默认情况下,MapReduce原封不动地将输入<key, value>写到输出

下面介绍MapReduce的部分参数及其默认设置:

(1)InputFormat类

该类的作用是将输入的数据分割成一个个的split,并将split进一步拆分成<key, value>对作为map函数的输入

(2)Mapper类

实现map函数,根据输入的<key, value>对生产中间结果

(3)Combiner

实现combine函数,合并中间结果中具有相同key值的键值对。

(4)Partitioner类

实现getPartition函数,用于在Shuffle过程按照key值将中间数据分成R份,每一份由一个Reduce负责

(5)Reducer类

实现reduce函数,将中间结果合并,得到最终的结果

(6)OutputFormat类

该类负责输出最终的结果

上面的代码可以改写为:

public class LazyMapReduce {
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != 2) {
            System.err.println("Usage:wordcount");
            System.exit(2);
        }
        Job job = new Job(conf, "LazyMapReduce");
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(Mapper.class);
         
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setReducerClass(Reducer.class);
         
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(FileOutputFormat.class);
         
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true)? 0:1);
    }
}

不过由于版本问题,显示有些类已经过时

   
2105 次浏览       15
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新课程计划
信息架构建模(基于UML+EA)3-21[北京]
软件架构设计师 3-21[北京]
图数据库与知识图谱 3-25[北京]
业务架构设计 4-11[北京]
SysML和EA系统设计与建模 4-22[北京]
DoDAF规范、模型与实例 5-23[北京]

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...