|   不同场景的模式和示例 
                        MapReduce 处理为处理和构建不同类型的查询创建了一整套新范例和结构。然而,要最充分地利用 
                          Hadoop,意味着要编写合适的 MapReduce 查询来处理信息。本文介绍许多不同的场景,其中包含如何开发不同类型的查询的食谱式示例。 
                        高级文本处理 
                        处理文本是 MapReduce 流程的一种常见用法,因为文本处理相对复杂且是处理器资源密集的处理。基本的字数统计常常用于演示 
                          Haddoop 处理大量文本和基本汇总大体内容的能力。 
                        要获得字数,将文本从一个输入文件中拆分(使用一个基本的 string tokenizer)为各个包含计数的单词,并使用一个 
                          Reduce 来计算每个单词的数量。例如,从短语 the quick brown fox jumps over 
                          the lazy dog 中,Map 阶段生成清单 1 中的输出。 
                        清单 1. Map 阶段的输出 
                        
                           
                             the, 1 quick, 1 brown, 1 fox, 1 jumps, 1 over, 1 the, 1 lazy, 1 dog, 1  | 
                           
                         
                        Reduce 阶段然后合计每个惟一的单词出现的次数,得到清单 2 中所示的输出。 
                        清单 2. Reduce 阶段的输出 
                        
                           
                             the, 2 quick, 1 brown, 1 fox, 1 jumps, 1 over, 1 lazy, 1 dog, 1  | 
                           
                         
                        尽管此方法适用于基本的字数统计,但您常常希望识别重要的短语或单词的出现。例如,获取 Amazon 上对不同影片和视频的评论。 
                        使用来自 Stanford University 大数据项目的信息,您可以下载影片评论数据(参见 参考资料)。该数据包含(Amazon 
                          上报告的)原始评论的评分和有用性,如清单 3 中所示。 
                        清单 3. 下载影片评论数据 
                        
                           
                             product/productId: B003AI2VGA review/userId: A3QYDL5CDNYN66 review/profileName: abra "a devoted reader" review/helpfulness: 0/0 review/score: 2.0 review/time: 1229040000 review/summary: Pretty pointless fictionalization review/text: The murders in Juarez are real. This movie is a badly acted  fantasy of revenge and holy intercession. If there is a good movie about  Juarez, I don't know what it is, but it is not this one.  | 
                           
                         
                        请注意,尽管评论者给影片打了 2 分(1 为最差,5 为最好),但评论内容将此影片描述为一部非常差的影片。我们需要一个置信度评分,以便能够了解所给的评分与实际的评论是否彼此匹配。 
                        许多工具可用于执行高级启发式分析,但基本的处理可使用一个简单的索引或正则表达式来实现。然后,我们可统计正面和负面正则表达式匹配数来获得一部影片的分数。 
                        图 1. 统计正面和负面正则表达式匹配数来获得一部影片的分数 
                          
                        该图显示了如何从原始数据的单词分数来获得影片分数 
                        对于 Map 部分,统计影片评论中各个单词或短语的数量,为正面和负面评价提供单个计数。Map 操作从产品评论中统计影片的分数,Reduce 
                          操作然后按产品 ID 汇总这些分数,以提供正面或负面的评分。因此 Map 类似于清单 4。 
                        清单 4. 为正面和负面评论提供单个计数的 Map 函数  
                        
                           
                             // List of positive words/phrases  static String[] pwords = {"good","excellent","brilliant movie"}; // List of negative words/phrases static String[] nwords = {"poor","bad","unwatchable"};  
                               
                                int count = 0; 
                                for (String word : pwords) { 
                                String REGEX = "\\b" + word + "\\b"; 
                                Pattern p = Pattern.compile(REGEX); 
                                Matcher m = p.matcher(INPUT); 
                                while(m.find()) { 
                                count++; 
                                 
                                }  
                                for (String word : nwords) {  
                                String REGEX = "\\b" + word + "\\b"; 
                                 
                                Pattern p = Pattern.compile(REGEX);  
                                Matcher m = p.matcher(INPUT);  
                                while(m.find()) {  
                                count--;  
                                }  
                                } 
                              output.collect(productId, count);  | 
                           
                          
                        Reduce 然后可像传统的内容求和那样计算。 
                        清单 5. 按产品 ID 对正面和负面评论求和的 Reduce 函数  
                        
                           
                             public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
                               public void reduce(Text key,  
                                Iterable<IntWritable> values, Context context) 
                                 
                                throws IOException, InterruptedException { 
                                int sum = 0; 
                                for (IntWritable val : values) { 
                                sum += val.get(); 
                                } 
                                context.write(key, new IntWritable(sum)); 
                                } 
                                }  | 
                           
                          
                        结果是评论的置信度分数。可以扩展单词列表来包含您想要匹配的短语。 
                        读取和写入 JSON 数据 
                        JSON 已成为一种实用的数据交换格式。它的实用性一定程度上源于它的简单性质和结构,以及在如此多的语言和环境中解析的轻松性。 
                        在解析传入的 JSON 数据时,最常见的格式是每个符号输入行一条 JSON 记录。 
                        清单 6. 每个符号输入行一条 JSON 记录  
                        
                           
                             { "productId" : "B003AI2VGA", "score": 2.0, "text" : """} { "productId" : "B007BI4DAT", "score": 3.4, "text" : """} { "productId" : "B006AI2FDH", "score": 4.1, "text" : """} | 
                           
                          
                        此代码可通过使用合适的类(比如 GSON)将传入的字符串转换为 JSON 对象来轻松解析。将此方法用于 
                          GSON 时,您将需要去序列化到一个预先确定的类中。 
                        清单 7. 去序列化到一个预先确定的类中 
                        
                           
                             class amazonRank {  private String productId;  private float score;  private String text;  amazonRank() {  } } | 
                           
                         
                        解析传入的文本,如下所示。 
                        清单 8. 解析传入的文本 
                        
                           
                              public void map(Object key, Text value, Context context) throws IOException, InterruptedException { try { 
                               amazonRank rank = gson.fromJson(value.toString(),amazonRank.class); 
                                ...  | 
                           
                         
                        要写入 JSON 数据,可执行相反的操作。创建您想要与 MapReduce 定义内的 JSON 输出匹配的输出类,然后使用 
                          GSON 类将此转换为此结构的一种 JSON 表示。 
                        清单 9. 写入 JSON 数据 
                        
                           
                             class recipeRecord {  private String recipe;  private String recipetext;  private int recipeid;  private float calories;  private float fat;  private float weight;  recipeRecord() {  } } | 
                           
                         
                        现在您可在输出期间填充对象的一个实例,将它转换为单条 JSON 记录。 
                        清单 10. 在输出期间填充对象的一个实例  
                        
                           
                             recipeNutrition recipe = new recipeRecord();  recipe.recipeid = key.toString(); recipe.calories = sum; 
                              Gson json = new Gson(); 
                                output.collect(key, new Text(json.toJson(recipe)));  | 
                           
                          
                        如果您要在 Hadoop 处理作业中使用一个第三方库,请确保将库 JAR 文件与 MapReduce 
                          代码包含在一起:$ jar -cvf recipenutrition.jar -C recipenutrition/* 
                          google-gson/gson.jar。 
                        尽管在 Hadoop MapReduce 处理器之外,但另一种替代方案是使用 Jaql,它将直接解析并处理 
                          JSON 数据。 
                        合并数据集 
                        一个 MapReduce 作业中通常执行 3 种类型的合并: 
                        组合多个具有相同结构的文件的内容。 
                        组合多个您想要组合的具有类似结构的文件的内容。 
                        联接来自多个来源的与一个特定 ID 或关键词相关的数据。 
                        第一个选项最好,在典型的 MapReduce 作业外部处理,因为它可使用 Hadoop Distributed 
                          File System (HDFS) getmerge 操作或某个类似操作完成。此操作接受单个目录作为内容并输出到一个指定文件。例如,$ 
                          hadoop fs -getmerge srcfiles megafile 将 srcfiles 目录中的所有文件合并到一个文件中:megafile。 
                        合并类似文件 
                        要合并类似但不等同的文件,主要问题在于如何识别输入时使用的格式以及如何指定输出的格式。例如,给定文件 
                          name, phone, count 和第二个文件 name, email, phone, count,您要负责确定哪个文件是正确的并执行 
                          Map 来生成所需的结构。对于更复杂的记录,您可能需要在 Map 阶段对包含和不包含空值的字段执行更复杂的合并,以生成信息。 
                        事实上,Hadoop 不是此过程的理想选择,除非您还将它作为简化、统计或化简信息的一个机会。也就是说,您识别传入记录的数量,有哪些可能的格式,并在您想要选择的字段上执行 
                          Reduce。 
                        联接 
                        尽管有一些潜在的解决方案来执行联接,但它们常常依赖于以一种结构化方式处理信息,然后使用此结构确定对输出信息做什么。 
                        举例而言,给定两条不同的信息线索(比如电子邮件地址、发送的电子邮件数量,以及接收的电子邮件地址数量),目的在于将数据合并到一种输出格式中。这是输入文件:email, 
                          sent-count 和 email, received-count。输出应为此格式:email, sent-count, 
                          received-count。 
                        处理传入的文件并以不同方式输出内容,以便可以不同方式访问和生成文件和数据。然后依靠 Reduce 函数来执行化简。在大多数情况中,这将是一个多阶段过程: 
                        一个阶段处理 “已发送的” 电子邮件,以 email, fake#sent 形式输出信息 
                        注意:我们使用伪前缀来调整顺序,以便数据可按伪前缀来核对,而不按收到的前缀来核对。此做法允许数据按虚假、暗含的顺序联接。 
                        一个阶段处理 “已发送的” 电子邮件,以 email, received 形式输出信息。 
                        在 Map 函数读取文件时,它生成一些行。 
                        清单 11. 生成行  
                        
                           
                             dev@null.org,0#sent dev@null.org, received  | 
                           
                          
                        Map 识别输入记录并输出一个带一个键的统一版本。输出并生成 sent#received 结构来处理内容,确定该值应合并在一起还是汇总为一个单纯收到的值。 
                        清单 12. 输出一个带一个键的统一版本 
                        
                           
                             int sent = 0;  int received = 0;  for (Text val : values) {      String strVal = val.toString();      buf.append(strVal).append(",");      if (strVal.contains("#")) {           String[] tokens = strVal.split("#");  // If the content contains a hash, assume it's sent and received         int recvthis = Integer.parseInt(tokens[0]);          int sentthis = Integer.parseInt(tokens[1]);          received = received + Integer.parseInt(recvthis);         sent = sent _ sentthis;      } else {  // Otherwise, it's just the received value         received = received + Integer.parseInt(strVal);      }  }  context.write(key, IntWritable(sendReplyCount), new IntWritable(receiveReplyCount));  | 
                           
                         
                        在此情况下,我们依赖于 Hadoop 本身内的化简来按该键简化输出数据(在此情况下,该键为电子邮件地址),简化为我们需要的信息。因为该信息是以电子邮件为键,所以记录可以电子邮件为键来轻松地合并。 
                        使用键的技巧 
                        请记住,MapReduce 过程的一些方面可为我们所用。在本质上,MapReduce 是一个两阶段过程: 
                        Map 阶段访问数据,挑选您需要的信息,然后输出该信息,使用一个键和关联的信息。 
                        Reduce 阶段使用通用的键将映射的数据合并、汇总或统计为一种更简单的形式,从而简化数据。 
                        键是一个重要的概念,因为它可用于以不同方式格式化和汇总数据。例如,如果您计划化简有关国家和城市人口的数据,可以仅输出一个键来按国家化简或汇总数据。 
                        清单 13. 仅输出一个键 
                        
                        要按国家和城市汇总,键是二者的复合版本。 
                        清单 14. 键是国家和城市的复合版本 
                        
                           
                             France#Paris France#Lyon France#Grenoble United Kingdom#Birmingham United Kingdom#London  | 
                           
                         
                        这是一个基本的技巧,可在处理某些类型的数据时为我们所用(例如具有一个共同键的材料),因为我们可使用它模拟伪联接。此技巧在组合博客文章(拥有一个 
                          blogpostid 以便于识别)和博客评论(拥有一个 blogpostid 和 blogcommentid)时也很有用。 
                        要化简输出(例如统计博客和评论中的字数),我们首先通过 Map 处理博客文章和博客评论,但我们输出一个通用的 
                          ID。 
                        清单 15. 化简输出 
                        
                           
                             blogpostid,the,quick,brown,fox blogpostid#blogcommentid,jumps,over,the,lazy,dog  | 
                           
                         
                        这会明显地使用两个键,将信息输出为两个不同的信息行。我们也可反转这一关系。我们可通过向每个单词添加评论 
                          ID,从评论中针对 blogpostid 来识别单词。 
                        清单 16. 反转关系 
                        
                           
                             blogpostid,the,quick,brown,fox,jumps#blogcommentid,over#blogcommentid, the#blogcommentid,lazy#blogcommentid,dog#blogcommentid  | 
                           
                          
                        在处理期间,我们可通过查看 ID 而获知该单词是否附加到博客文章,以及它是否按该格式附加到博客文章或评论。 
                        模拟传统的数据库操作 
                        Hadoop 在真正意义上不是一个真正的数据库,这一定程度上是因为我们无法逐行执行更新、删除或插入。尽管这在许多情况下不是问题(您可对要处理的活动数据执行转储和加载),但有时您不希望导出并重新加载数据。 
                        一种避免导出并重新加载数据的技巧是,创建一个变更文件,其中包含来自原始转储文件的一个差异列表。现在我们暂时忽略从 
                          SQL 或其他数据库生成这些数据的过程。只要数据有一个惟一 ID,我们就可将它用作键,就可利用该键。下面来看一个类似于清单 
                          17 的源文件。 
                        清单 17. 源文件 
                        
                           
                             1,London 2,Paris, 3,New York  | 
                           
                         
                        假设有一个类似于清单 18 的变更文件。 
                        清单 18. 变更文件 
                        
                           
                             1,DELETE 2,UPDATE,Munich 4,INSERT,Tokyo  | 
                           
                         
                        最终得出两个文件经过解析的合并结果,如清单 19 所示。 
                        清单 19. 源文件和变更文件的合并 
                        
                           
                             2,Munich 3,New York 4,Tokyo  | 
                           
                         
                        我们如何通过 Hadoop 实现这样一种合并? 
                        使用 Hadoop 实现此合并的一种方式是,处理当前数据并将它转换为插入数据(因为它们都是插入目标文件中的新数据),然后将 
                          UDPATE 操作转换为新数据的 DELETE 和 INSERT 操作。事实上,使用变更文件,通过将它修改为清单 
                          20 中的内容更容易实现此目的。 
                        清单 20. 通过 Hadoop 实现合并 
                        
                           
                             1,DELETE 2,DELETE 2,INSERT,Munich 4,INSERT,Tokyo  | 
                           
                         
                        问题在于,我们无法对两个文件进行物理合并,但我们可相应地处理它们。如果它是一个原始的 INSERT 或 
                          DELETE,我们会输出一个带有计数器的键。如果它是创建新插入数据的 UPDATE 操作,我们想要一个不会化简的不同的键,所以我们生成一个类似清单 
                          21 的间隙 (interstitial) 文件。 
                        清单 21. 生成间隙文件 
                        
                           
                             1,1,London 2,1,Paris, 3,1,New York  1,-1,London 2,-1,Paris 2#NEW,Munich 4#NEW,1,Tokyo  | 
                           
                         
                        在 Reduce 期间,我们汇总每个惟一键的计数器的内容,生成清单 22。 
                        清单 22. 汇总每个惟一键的计数器的内容 
                        
                           
                             1,0,London 2,0,Paris, 3,1,New York  2#NEW,1,Munich 4#NEW,1,Tokyo  | 
                           
                         
                        我们然后可通过一个辅助 MapReduce 函数运行内容,使用清单 23 中所示的基本结构。 
                        清单 23. 通过一个辅助 MapReduce 函数运行内容 
                        
                           
                             map:      if (key contains #NEW):         emit(row)     if (count >0 ):         emit(row)  | 
                           
                          
                        辅助 MapReduce 会得到预期输出,如清单 24 中所示。 
                        清单 24. 辅助 MapReduce 函数的预期输出 
                        
                           
                             3,1,New York  2,Munich 4,1,Tokyo  | 
                           
                          
                        图 2 演示了这个首先格式化和化简、然后简化输出的两阶段过程。 
                          
                        图 2. 格式化、化简和简化输出的两阶段过程 
                        原始数据在 Map 和 Reduce 阶段中得到化简和映射 
                        这个过程需要比传统数据库中更多的工作,但它所提供解决方案需要的对不断更新的数据的交换简单得多。 
                        结束语 
                        本文介绍了许多使用 MapReduce 查询的不同场景。您看到了这些查询在处理各种数据上的强大功能,您现在应能够在自己的 
                          MapReduce 解决方案中利用这些示例了。 
                         |