Map results are distributed to the Reducers through partitioning. After a Reducer finishes its Reduce operation, the results are written out through an OutputFormat. Below we analyze the classes that take part in this process.

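Mapper results may first be sent to an optional Combiner for merging. The Combiner has no base class of its own in the system; Reducer is used as the Combiner's base class. The two offer the same functionality to the outside and differ only in where they are used and in the context they run in.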
The final <key, value> pairs produced by a Mapper must be sent to a Reducer to be merged, and pairs with the same key are sent to the same Reducer. The Partitioner decides which key goes to which Reducer. It has only one method: its input is a Map output pair <key, value> plus the number of Reducers, and its output is the assigned Reducer (an integer index). The system's default Partitioner is HashPartitioner, which takes the key's hash value modulo the number of Reducers to obtain the corresponding Reducer.
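The HashPartitioner rule can be sketched as a standalone method. This is not the Hadoop class; `HashPartitionSketch` is a hypothetical name. The sign-bit mask is needed because Java's `%` returns a negative result for a negative `hashCode()`. To my knowledge, Hadoop's HashPartitioner uses the same mask.

```java
/**
 * Standalone sketch of hash partitioning (HashPartitionSketch is a hypothetical name,
 * not the Hadoop class).
 */
class HashPartitionSketch {

    /** Maps a key to a reducer index in the range [0, numReduceTasks). */
    static int getPartition(Object key, int numReduceTasks) {
        if (numReduceTasks <= 0) {
            throw new IllegalArgumentException("numReduceTasks must be positive");
        }
        // Clear the sign bit so a negative hashCode() still yields a non-negative index;
        // a plain % on a negative number would give a negative result in Java.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

The partition depends only on the key, so every pair that shares a key reaches the same Reducer.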
Reducer is the base class of all user-defined Reducer classes. Like Mapper, it has setup, reduce, cleanup and run methods. setup and cleanup mean the same as in Mapper, and reduce is where the Mapper results are actually merged. Its input is a key, an iterator over all values for that key, and the Reducer's context. The system defines two very simple Reducers, IntSumReducer and LongSumReducer, which sum int and long values respectively.
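What IntSumReducer's reduce does can be sketched without Hadoop on the classpath. `SumContextSketch` is a hypothetical stand-in for Reducer.Context that models only the `write` call, and `IntSumReducerSketch` is likewise hypothetical.

```java
/** Hypothetical stand-in for Reducer.Context: only write(key, value) is modelled. */
interface SumContextSketch<K> {
    void write(K key, int value);
}

/** Standalone sketch of IntSumReducer's reduce logic (not the Hadoop class). */
class IntSumReducerSketch<K> {

    /** Called once per key with all values that share that key. */
    void reduce(K key, Iterable<Integer> values, SumContextSketch<K> context) {
        int sum = 0;
        for (int value : values) {
            sum += value; // accumulate every value grouped under this key
        }
        context.write(key, sum); // emit exactly one <key, sum> pair
    }
}
```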
Reduce results are written to files through the write method of Reducer.Context. As on the input side, Hadoop introduces an OutputFormat here. OutputFormat relies on two helper interfaces, RecordWriter and OutputCommitter, to handle output. RecordWriter provides a write method, which outputs a <key, value> pair, and a close method, which closes the corresponding output. OutputCommitter provides a series of methods; by implementing them, users can add special operations needed at certain stages of the OutputFormat's lifetime. We discussed these methods under TaskInputOutputContext (clearly, TaskInputOutputContext is the bridge between OutputFormat and Reducer).
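OutputFormat and RecordWriter correspond to InputFormat and RecordReader respectively. The system provides the following output formats:
1.NullOutputFormat: outputs nothing. NullOutputFormat.RecordWriter is only illustrative and is not actually defined in the system.
2.LazyOutputFormat: not shown in the class diagram and not analyzed.
3.FilterOutputFormat: not analyzed.
4.SequenceFileOutputFormat and TextOutputFormat: file-based formats derived from FileOutputFormat.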
The file-based FileOutputFormat works together with several configuration items:
1.mapred.output.compress: whether to compress the output;
2.mapred.output.compression.codec: the compression method;
3.mapred.output.dir: the output path;
4.mapred.work.output.dir: the working output path.
FileOutputFormat also relies on FileOutputCommitter, which provides temporary-file management for Jobs and Tasks. For example, FileOutputCommitter's setupJob creates a temporary directory named _temporary under the output path, and cleanupJob deletes this directory.
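The _temporary lifecycle can be sketched on the local file system with java.nio. This is only an illustration of the setupJob/cleanupJob pair described above. The real FileOutputCommitter works on a Hadoop FileSystem, and `TempDirCommitterSketch` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Local-filesystem sketch of the _temporary directory lifecycle (hypothetical class). */
class TempDirCommitterSketch {
    static final String TEMP_DIR_NAME = "_temporary";
    private final Path outputDir;

    TempDirCommitterSketch(Path outputDir) {
        this.outputDir = outputDir;
    }

    Path tempDir() {
        return outputDir.resolve(TEMP_DIR_NAME);
    }

    /** Like setupJob: create <output dir>/_temporary. */
    void setupJob() {
        try {
            Files.createDirectories(tempDir());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Like cleanupJob: delete <output dir>/_temporary and everything below it. */
    void cleanupJob() {
        Path temp = tempDir();
        if (!Files.exists(temp)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(temp)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```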
SequenceFileOutputFormat and TextOutputFormat correspond to the input-side SequenceFileInputFormat and TextInputFormat, so we will not analyze them in detail.
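Before being sent to the Reducers, Mapper output is stored on the local file system, and IFile manages this output. We already know that Mapper output consists of <key, value> pairs. IFile stores them as records of the form <key-len, value-len, key, value>. To preserve the boundaries between key/value pairs, IFile naturally needs to store key-len and value-len.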
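The <key-len, value-len, key, value> layout and the role of the lengths can be sketched with DataOutputStream/DataInputStream. As a simplifying assumption, lengths are written here as fixed 4-byte ints. To my knowledge, the real IFile writes them as variable-length ints and also adds an end marker and a checksum, which this sketch omits. `RecordLayoutSketch` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch of <key-len, value-len, key, value> framing (fixed-width lengths assumed). */
class RecordLayoutSketch {

    static byte[] encode(List<Map.Entry<byte[], byte[]>> records) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Map.Entry<byte[], byte[]> r : records) {
                out.writeInt(r.getKey().length);   // key-len
                out.writeInt(r.getValue().length); // value-len
                out.write(r.getKey());             // key bytes
                out.write(r.getValue());           // value bytes
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** The stored lengths are what let a reader find where each key and value ends. */
    static List<Map.Entry<byte[], byte[]>> decode(byte[] data) {
        List<Map.Entry<byte[], byte[]>> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.available() > 0) {
                byte[] key = new byte[in.readInt()];   // key-len comes first
                byte[] value = new byte[in.readInt()]; // then value-len
                in.readFully(key);
                in.readFully(value);
                records.add(new SimpleEntry<>(key, value));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```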
The classes related to IFile are shown in the following class diagram:

Stream-style input and output are abstracted by IFileInputStream and IFileOutputStream. Record-oriented reads and writes are provided by IFile.Reader and IFile.Writer, and IFile.InMemoryReader reads IFile-format data held in memory.
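Taking output as the example, let us analyze how this part is implemented. First come the serialization and deserialization classes Serializer and Deserializer shown in the diagram below. Their code lives in the package org.apache.hadoop.io.serializer.

Serialization is abstracted by Serializer: through a Serializer implementation, the user calls the serialize method to write an object into the output stream opened with the open method. Deserializer provides the reverse process through its deserialize method.

hadoop.io.serializer also implements Serialization, which works together with these two, and the corresponding factory, SerializationFactory. The two concrete implementations are WritableSerialization and JavaSerialization. They handle serialization/deserialization of Writable objects and Java's built-in serialization/deserialization respectively.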
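The open/serialize/close pattern can be sketched on top of Java's built-in serialization, which is roughly the mechanism JavaSerialization wraps. `SerializerSketch` and `DeserializerSketch` are hypothetical mirrors, not Hadoop's interfaces. To my understanding, Hadoop's deserialize also accepts an object to reuse, which this sketch omits.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mirror of the open/serialize/close shape (not Hadoop's interface). */
interface SerializerSketch<T> {
    void open(OutputStream out) throws IOException;
    void serialize(T t) throws IOException;
    void close() throws IOException;
}

/** Hypothetical mirror of the reverse direction. */
interface DeserializerSketch<T> {
    void open(InputStream in) throws IOException;
    T deserialize() throws IOException;
    void close() throws IOException;
}

class JavaSerializerSketch<T extends Serializable> implements SerializerSketch<T> {
    private ObjectOutputStream oos;
    public void open(OutputStream out) throws IOException { oos = new ObjectOutputStream(out); }
    public void serialize(T t) throws IOException { oos.writeObject(t); }
    public void close() throws IOException { oos.close(); }
}

class JavaDeserializerSketch<T> implements DeserializerSketch<T> {
    private ObjectInputStream ois;
    public void open(InputStream in) throws IOException { ois = new ObjectInputStream(in); }

    @SuppressWarnings("unchecked")
    public T deserialize() throws IOException {
        try {
            return (T) ois.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }

    public void close() throws IOException { ois.close(); }
}

class SerializationSketch {
    /** Serializes all items into one stream opened with open(), then reads them back. */
    static <T extends Serializable> List<T> roundTrip(List<T> items) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            SerializerSketch<T> ser = new JavaSerializerSketch<>();
            ser.open(buffer);
            for (T item : items) ser.serialize(item);
            ser.close();

            DeserializerSketch<T> de = new JavaDeserializerSketch<>();
            de.open(new ByteArrayInputStream(buffer.toByteArray()));
            List<T> result = new ArrayList<>();
            for (int i = 0; i < items.size(); i++) result.add(de.deserialize());
            de.close();
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```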

With Serializer/Deserializer in hand, let us analyze IFile.Writer. Its constructor is:
public Writer(Configuration conf, FSDataOutputStream out, Class<K> keyClass, Class<V> valueClass, CompressionCodec codec, Counters.Counter writesCounter)
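The parameters are as follows:
1.conf: the configuration;
2.out: the Writer's output;
3.keyClass and valueClass: the classes of the output Key and Value;
4.codec: the compression method applied to the output;
5.writesCounter: a Counters.Counter that counts the bytes written.
From these parameters the Writer builds the compression-capable output stream it actually uses (class member out; class member rawOut keeps the out passed to the constructor). It also sets up the related counters and the Serializers for the Key and Value.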
Writer's most important method is append (not write, amusingly), which comes in two forms:
public void append(K key, V value) throws IOException
public void append(DataInputBuffer key, DataInputBuffer value)
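append(K key, V value) first checks its arguments, then serializes key and value into a DataOutputBuffer and obtains their serialized lengths. Finally it writes the two lengths, followed by the contents of the DataOutputBuffer, to the output, then resets the DataOutputBuffer and updates the counts. append(DataInputBuffer key, DataInputBuffer value) works in a similar way, so we will not analyze it further.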
Note that close must mark the end of the file, or the end of the stream. Currently this is done by writing two lengths whose value is EOF_MARKER.
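The append/close behaviour can be sketched as follows:
1.append serializes key and value into a scratch buffer and notes both lengths.
2.append then writes the two lengths followed by the buffered bytes, and resets the buffer.
3.close writes two EOF_MARKER "lengths".
Several details are assumptions: the marker value -1, the fixed-width ints and the UTF-8 strings standing in for serialized keys and values. `EofMarkedWriterSketch` and its helpers are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class EofMarkedWriterSketch {
    /** Assumed marker value; a real length is never negative, so it cannot be confused with data. */
    static final int EOF_MARKER = -1;

    private final DataOutputStream out;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); // DataOutputBuffer role

    EofMarkedWriterSketch(OutputStream rawOut) {
        this.out = new DataOutputStream(rawOut);
    }

    void append(String key, String value) throws IOException {
        if (key == null || value == null) {
            throw new IllegalArgumentException("key and value must not be null"); // argument check
        }
        buffer.write(key.getBytes(StandardCharsets.UTF_8));   // "serialize" the key
        int keyLength = buffer.size();
        buffer.write(value.getBytes(StandardCharsets.UTF_8)); // then the value
        int valueLength = buffer.size() - keyLength;
        out.writeInt(keyLength);   // the two lengths...
        out.writeInt(valueLength);
        buffer.writeTo(out);       // ...then the serialized bytes
        buffer.reset();            // ready for the next record
    }

    void close() throws IOException {
        out.writeInt(EOF_MARKER);  // two marker lengths signal end of stream
        out.writeInt(EOF_MARKER);
        out.close();
    }

    static byte[] writeAll(String[][] pairs) {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        try {
            EofMarkedWriterSketch writer = new EofMarkedWriterSketch(file);
            for (String[] pair : pairs) writer.append(pair[0], pair[1]);
            writer.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return file.toByteArray();
    }

    /** Reads "key=value" strings until both lengths equal EOF_MARKER. */
    static List<String> readAll(byte[] data) {
        List<String> records = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (true) {
                int keyLength = in.readInt();
                int valueLength = in.readInt();
                if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) break;
                byte[] k = new byte[keyLength];
                byte[] v = new byte[valueLength];
                in.readFully(k);
                in.readFully(v);
                records.add(new String(k, StandardCharsets.UTF_8) + "=" + new String(v, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```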
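IFileOutputStream is the output stream used together with Writer; it appends checksum data to the end of the IFile. When Writer calls IFileOutputStream's write, IFileOutputStream computes and maintains the checksum. When the stream is closed, the checksum is written at the end of the corresponding file. So a file stored on disk is actually a series of <key-len, value-len, key, value> records followed by the checksum.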
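The IFileOutputStream idea can be sketched as a FilterOutputStream. It updates a running checksum on every write and appends the checksum as a trailer when closed. Several details are assumptions made for this sketch: the use of java.util.zip.CRC32, the 4-byte big-endian trailer, and the names `ChecksummedOutputStreamSketch`, `seal` and `verify`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.CRC32;

/** Sketch: checksum everything written, append the checksum on close(). */
class ChecksummedOutputStreamSketch extends FilterOutputStream {
    private final CRC32 crc = new CRC32();
    private boolean closed = false;

    ChecksummedOutputStreamSketch(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        crc.update(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);  // pass the data through unchanged
        crc.update(b, off, len); // and fold it into the running checksum
    }

    @Override
    public void close() throws IOException {
        if (closed) return;
        closed = true;
        new DataOutputStream(out).writeInt((int) crc.getValue()); // checksum trailer
        out.close();
    }

    static byte[] seal(byte[] payload) {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        try (ChecksummedOutputStreamSketch out = new ChecksummedOutputStreamSketch(file)) {
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return file.toByteArray();
    }

    /** Recomputes the checksum over everything except the last 4 bytes and compares. */
    static boolean verify(byte[] file) {
        if (file.length < 4) return false;
        CRC32 check = new CRC32();
        check.update(file, 0, file.length - 4);
        int n = file.length;
        int stored = ((file[n - 4] & 0xFF) << 24) | ((file[n - 3] & 0xFF) << 16)
                | ((file[n - 2] & 0xFF) << 8) | (file[n - 1] & 0xFF);
        return stored == (int) check.getValue();
    }
}
```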
We will not analyze the corresponding Reader process.
ID classes and *Context classes
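We now begin analyzing the internal execution mechanism of Hadoop MapReduce. A user submits a Job to Hadoop, and the job runs under the control of a JobTracker object. The Job is broken into Tasks, which are distributed across the cluster and run under the control of TaskTrackers. Tasks are either MapTasks or ReduceTasks, and they are where MapReduce's Map and Reduce operations are executed.

This way of distributing tasks is quite similar to the division of labor between the NameNode and DataNodes in HDFS: the JobTracker corresponds to the NameNode, and a TaskTracker to a DataNode. The JobTracker, the TaskTrackers and the MapReduce client communicate via RPC; for details, see the analysis in the HDFS part.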
Let us first analyze some helper classes, starting with the ID-related classes. The ID inheritance tree is as follows:

This diagram shows some of the problems caused by Hadoop's migration from org.apache.hadoop.mapred to org.apache.hadoop.mapreduce; the gray classes are marked @Deprecated. ID carries an integer and implements the WritableComparable interface. This means it can be compared and can be serialized/deserialized by Hadoop's I/O mechanism (it must implement compareTo, readFields and write).

JobID is the unique identifier the system assigns to a job; its toString result is job_<jobtrackerID>_<jobNumber>. For example, job_200707121733_0003 is job number 3 of jobtracker 200707121733 (the jobtracker's start time is used as its ID).
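The job_<jobtrackerID>_<jobNumber> form can be sketched with a small value class. It uses zero-padding to at least four digits, as in the example above, and an ordering by tracker identifier and then job number. `JobIdSketch` is hypothetical and does not implement Hadoop's WritableComparable.

```java
import java.util.Locale;

/** Hypothetical helper mirroring the job_<jobtrackerID>_<jobNumber> string form. */
final class JobIdSketch implements Comparable<JobIdSketch> {
    final String jtIdentifier; // the JobTracker's identifier, e.g. its start time "200707121733"
    final int id;              // the job's sequence number on that JobTracker

    JobIdSketch(String jtIdentifier, int id) {
        this.jtIdentifier = jtIdentifier;
        this.id = id;
    }

    @Override
    public String toString() {
        // Zero-pad the job number to at least four digits, as in job_200707121733_0003.
        return String.format(Locale.ROOT, "job_%s_%04d", jtIdentifier, id);
    }

    /** Parses the string form back; rejects anything that is not job_<jt>_<number>. */
    static JobIdSketch parse(String s) {
        String[] parts = s.split("_");
        if (parts.length != 3 || !parts[0].equals("job")) {
            throw new IllegalArgumentException("not a job id: " + s);
        }
        return new JobIdSketch(parts[1], Integer.parseInt(parts[2]));
    }

    /** Orders by JobTracker identifier first, then by job number. */
    @Override
    public int compareTo(JobIdSketch other) {
        int byTracker = jtIdentifier.compareTo(other.jtIdentifier);
        return byTracker != 0 ? byTracker : Integer.compare(id, other.id);
    }
}
```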
A job is executed as tasks. A TaskID contains the ID of the job it belongs to plus its own task number, and it also records whether it is a Map task (member variable isMap). Its string form is task_<jobtrackerID>_<jobNumber>_[m|r]_<taskNumber>. For example, task_200707121733_0003_m_000005 denotes task 000005 of job 200707121733_0003, and that task is a Map task.
A task may be executed more than once (for failure recovery, to eliminate stragglers, and so on), so the executions of a task must be told apart. This is done by the class TaskAttemptID, which adds an attempt number to the task ID. An example attempt ID is attempt_200707121733_0003_m_000005_0, which is attempt number 0 of task task_200707121733_0003_m_000005.
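The task and attempt string forms above can be built and inspected with a few helpers. The six-digit task-number padding is taken from the example IDs. `TaskIdSketch` and its methods are hypothetical, not Hadoop's TaskID/TaskAttemptID API.

```java
import java.util.Locale;

/** Hypothetical helpers for the task_ and attempt_ string forms. */
final class TaskIdSketch {
    private TaskIdSketch() {}

    /** task_<jobtrackerID>_<jobNumber>_[m|r]_<taskNumber>, task number padded to six digits. */
    static String taskId(String jtIdentifier, int jobNumber, boolean isMap, int taskNumber) {
        return String.format(Locale.ROOT, "task_%s_%04d_%s_%06d",
                jtIdentifier, jobNumber, isMap ? "m" : "r", taskNumber);
    }

    /** An attempt ID is the task ID with "task" replaced by "attempt" plus an attempt number. */
    static String attemptId(String jtIdentifier, int jobNumber, boolean isMap, int taskNumber, int attempt) {
        String task = taskId(jtIdentifier, jobNumber, isMap, taskNumber);
        return "attempt" + task.substring("task".length()) + "_" + attempt;
    }

    /** Reads the m/r flag, the fourth '_'-separated field, from a task or attempt ID. */
    static boolean isMap(String taskOrAttemptId) {
        return taskOrAttemptId.split("_")[3].equals("m");
    }
}
```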
JVMId is used to manage the Java virtual machines in which tasks execute; we will discuss it later.
To make Jobs and Tasks work, Hadoop provides a series of contexts, which hold the information Jobs and Tasks need.

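At the top of the inheritance tree is org.apache.hadoop.mapreduce.JobContext, which we introduced earlier. It provides some read-only Job attributes through two member variables: one holds the JobID and the other is of type JobConf. Apart from the JobID, all of JobContext's information is kept in the JobConf. It defines the following configuration items: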
1.mapreduce.inputformat.class: the InputFormat implementation
2.mapreduce.map.class: the Mapper implementation
3.mapreduce.combine.class: the Reducer implementation used as the Combiner
4.mapreduce.reduce.class: the Reducer implementation
5.mapreduce.outputformat.class: the OutputFormat implementation
6.mapreduce.partitioner.class: the Partitioner implementation
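It also provides methods that take a class name and use Java reflection's Class.forName to obtain the corresponding Class. The JobContext in org.apache.hadoop.mapred has one more member variable than org.apache.hadoop.mapreduce.JobContext: progress, used to report progress information. Its member job, of type JobConf, points to the corresponding member of mapreduce.JobContext, and it adds no new functionality.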
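Resolving a configured class name through Class.forName can be sketched with a plain map standing in for the JobConf. `ConfigClassLookupSketch` and `resolveClass` are hypothetical, and the shape is only loosely modelled on Hadoop's Configuration.getClass: the method returns a default when the key is unset and checks that the class implements the expected interface.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of reflective class lookup from configuration (hypothetical class). */
class ConfigClassLookupSketch {
    private final Map<String, String> conf = new HashMap<>();

    void set(String name, String value) {
        conf.put(name, value);
    }

    /** Returns the class named by the key, or defaultValue when the key is unset. */
    <T> Class<? extends T> resolveClass(String name, Class<? extends T> defaultValue, Class<T> xface) {
        String className = conf.get(name);
        if (className == null) {
            return defaultValue;
        }
        try {
            Class<?> cls = Class.forName(className); // reflection: load the class by its name
            return cls.asSubclass(xface);            // ClassCastException if it is the wrong type
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class not found: " + className, e);
        }
    }
}
```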
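JobConf extends Configuration and holds configuration information needed to run MapReduce. It manages 46 configuration parameters, including the old-style forms of the mapreduce configuration items above; for example, mapreduce.map.class corresponds to mapred.mapper.class. We will introduce these items when we use them.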
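Job, the subclass of org.apache.hadoop.mapreduce.JobContext, has also been introduced already; we will come back to it when discussing the system's dynamic behavior.
TaskAttemptContext is used for task execution. It introduces the TaskAttemptID that identifies a task execution and the task state (member status), and provides new accessors. The TaskAttemptContext in org.apache.hadoop.mapred extends the corresponding mapreduce version and only adds progress, which records progress.
TaskInputOutputContext and its subclasses are all in the package org.apache.hadoop.mapreduce and have already been analyzed.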
MapReduce interfaces in the package hadoop.mapred
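We have finished analyzing org.apache.hadoop.mapreduce. That package provides the Hadoop MapReduce application API that users program against to implement their own MapReduce applications. However, these interfaces are intended for future MapReduce applications; the MapReduce framework itself still uses the old system. Next we analyze org.apache.hadoop.mapred, again starting from mapred's MapReduce framework, shown in the class diagram below (the gray parts are classes/interfaces marked @Deprecated):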

ÎÒÃǰѰümapreduceµÄÀàͼ¸½ÔÚÏÂÃæ£¬¶Ô±Èһϣ¬ÎÒÃǾͻᷢÏÖ£¬org.apache.hadoop.mapredÖеÄMapReduce
APIÏà¶ÔÀ´ËµºÜ¼òµ¥£¬Ö÷ÒªÊÇÉÙÁ˺ÍContextÏà¹ØµÄÀ࣬ÄÇô£¬ºÃ¶àÔÚmapreduceÖÐͨ¹ýcontextÀ´Íê³ÉµÄ¹¤×÷£¬¾ÍÐèҪͨ¹ý²ÎÊýÀ´´«µÝ£¬ÈçMapÖеÄÊä³ö£¬Àϰ汾ÊÇ£º
output.collect(key, result); // output's type is: OutputCollector
The new version is:
context.write(key, result); // context's type is: Context
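They use OutputCollector and Mapper.Context respectively to output the map results, so the old OutputCollector is no longer needed in the new API. Overall, the old API is fairly simple and contains all the key objects of the MapReduce process, but it is not very extensible. The old version also provides many helper classes. The FileOutputFormat we analyzed earlier, for instance, has a counterpart there, so we will not discuss them further.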

I. Package mapreduce.lib.input
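Next, following the order in which data flows through MapReduce, we break down the contents of org.apache.hadoop.mapreduce.lib.* and introduce the functions of the corresponding base classes. First comes the input part, which implements MapReduce's data input. The class diagram is as follows: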

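In the upper-right corner of the class diagram is InputFormat, which describes the input of a MapReduce Job. Through InputFormat, Hadoop can:
1.check that the MapReduce input data is valid;
2.split the input data into logical chunks (InputSplits), which are assigned to Mappers;
3.provide a RecordReader implementation, which a Mapper uses to read input <K,V> pairs from an InputSplit.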
In org.apache.hadoop.mapreduce.lib.input, Hadoop provides an abstract base class, FileInputFormat, for all file-based InputFormats. The following parameters configure FileInputFormat:
1.mapred.input.pathFilter.class: the input file filter; only files that pass the filter are added to the InputFormat;
2.mapred.min.split.size: the minimum split size;
3.mapred.max.split.size: the maximum split size;
4.mapred.input.dir: the input paths, separated by commas.
ÀàÖбȽÏÖØÒªµÄ·½·¨ÓУº
protected List<FileStatus> listStatus(Configuration job)
Recursively lists all files (with their file information) in the input data directories. The input job is the Configuration of the running system, which contains the parameters mentioned above.
public List<InputSplit> getSplits(JobContext context)
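Divides the input into InputSplits using two nested loops. The outer loop goes over all files; for each file, the inner loop produces that file's splits according to the configured maximum and minimum split sizes. Note that a split never spans more than one file.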
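The two nested loops can be sketched as follows. The split-size formula max(minSize, min(maxSize, blockSize)) and the 1.1 "slop" factor follow my reading of Hadoop's FileInputFormat, so treat both as assumptions. The slop factor lets a small tail stay attached to the last split. The sketch ignores isSplitable and zero-length files, and `SplitSketch`/`FileSplitSketch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of per-file split computation (hypothetical classes, assumed constants). */
class SplitSketch {
    static final double SPLIT_SLOP = 1.1; // assumption: allow the last split to be up to 10% larger

    /** Minimal stand-in for FileSplit: file name, start offset and length. */
    static final class FileSplitSketch {
        final String path;
        final long start;
        final long length;

        FileSplitSketch(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }
    }

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    static List<FileSplitSketch> getSplits(String[] paths, long[] lengths,
                                           long blockSize, long minSize, long maxSize) {
        List<FileSplitSketch> splits = new ArrayList<>();
        for (int i = 0; i < paths.length; i++) {          // outer loop: every file
            long length = lengths[i];
            long splitSize = computeSplitSize(blockSize, minSize, maxSize);
            long remaining = length;
            // Inner loop: cut full-size splits while more than SPLIT_SLOP splits' worth remains.
            while ((double) remaining / splitSize > SPLIT_SLOP) {
                splits.add(new FileSplitSketch(paths[i], length - remaining, splitSize));
                remaining -= splitSize;
            }
            if (remaining != 0) {                          // the tail becomes the file's last split
                splits.add(new FileSplitSketch(paths[i], length - remaining, remaining));
            }
        }
        return splits;                                     // no split ever crosses a file boundary
    }
}
```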
FileInputFormat does not implement InputFormat's createRecordReader method.
FileInputFormat has two subclasses. SequenceFileInputFormat reads SequenceFiles, a binary key/value file format defined by Hadoop with its own file layout (see http://hadoop.apache.org/core/do ... o/SequenceFile.html). Because these files have a special extension, SequenceFileInputFormat overrides listStatus. It also implements createRecordReader, returning a SequenceFileRecordReader object.
TextInputFormat handles text files; its createRecordReader returns a LineRecordReader instance. Neither of these classes overrides FileInputFormat's getSplits method, so their corresponding RecordReaders must account for the way FileInputFormat splits the input.
FileInputFormat's getSplits returns FileSplits. FileSplit is a very simple class whose attributes (file name, start offset, split length, and possible target hosts) already explain what it does.
A RecordReader reads <Key,Value> pairs from a split. RecordReader has five abstract methods:
1.initialize: initialization; its parameters are the InputSplit this Reader works on and the Job's context;
2.nextKey: gets the next input Key, returning null if the split has no more records;
3.nextValue: gets the Value for that Key; it must be called after nextKey;
4.getProgress: gets the current progress;
5.close: from java.io's Closeable interface, used to clean up the RecordReader.
Let us use LineRecordReader as an example to analyze how a RecordReader is built. We have already seen how FileInputFormat splits files; a resulting Split contains the file name, the start offset and the split length. Since the file is a text file, LineRecordReader's initialize method creates a line-based reader, LineReader (defined in org.apache.hadoop.util; we won't analyze it).

initialize then skips the beginning of the input. This happens only when the Split's start offset is not 0, because in that case the beginning may be part of the last line of the previous Split.

nextKey is simple: it uses the current offset as the Key. nextValue is naturally the line that begins at that offset (a very long line may be truncated). getProgress and close are both simple.
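The boundary handling can be sketched on an in-memory ASCII string with '\n' line endings. A split reads every line that starts before its end, even if the line runs past it. A split that does not start at offset 0 backs up one character and skips through the next newline. The one-character back-up is my understanding of how Hadoop's LineRecordReader avoids dropping a line that starts exactly at the split boundary; treat it as an assumption. Line truncation is not modelled, and `LineSplitReaderSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of split-boundary handling for line records (ASCII, '\n' endings assumed). */
class LineSplitReaderSketch {

    /** Returns "offset:line" records for the split [start, start + length) of text. */
    static List<String> read(String text, int start, int length) {
        int end = start + length;
        int pos = start;
        if (start != 0) {
            // Back up one character and skip through the next '\n'. If the split begins exactly
            // at a line start, only the preceding newline is skipped; otherwise the partial first
            // line, which the previous split has already read, is discarded.
            int nl = text.indexOf('\n', start - 1);
            pos = (nl < 0) ? text.length() : nl + 1;
        }
        List<String> records = new ArrayList<>();
        // A line that starts before `end` belongs to this split, even if it runs past `end`.
        while (pos < end && pos < text.length()) {
            int nl = text.indexOf('\n', pos);
            int lineEnd = (nl < 0) ? text.length() : nl;
            records.add(pos + ":" + text.substring(pos, lineEnd)); // key = offset, value = line
            pos = (nl < 0) ? text.length() : nl + 1;
        }
        return records;
    }
}
```

With any split size, concatenating the records of consecutive splits yields every line exactly once.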
II. Package mapreduce.lib.map
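In Hadoop's MapReduce framework, the Map step is abstracted by the Mapper class. Generally we implement our own specialized Mapper and register it with the system; at run time, the MapReduce framework calls our Mapper. The Mapper class is simple: it consists of one inner class and four methods. Its static structure diagram is as follows: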

The inner class Context extends MapContext and introduces no new methods.
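Mapper's four methods are setup, map, cleanup and run. setup and cleanup manage resources over the Mapper's life cycle: setup is called after the Mapper has been constructed, just before map calls begin, and cleanup is called after all map calls have finished. The map method performs the map step on one input key/value pair. The run method carries out the process just described: it calls setup, then iterates over all key/value pairs calling map, and finally calls cleanup.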
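run() is a template method: setup once, map once per record, then cleanup once. A sketch without Hadoop follows. `MapperSketch`, its nested `Context` (just an input iterator plus an output list) and the example subclass `LineLengthMapperSketch` are all hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Sketch of Mapper's template-method shape (hypothetical classes). */
abstract class MapperSketch<KI, VI, KO, VO> {

    /** Minimal stand-in for Mapper.Context: an input iterator plus collected output. */
    static final class Context<A, B, C, D> {
        final Iterator<Map.Entry<A, B>> input;
        final List<Map.Entry<C, D>> output = new ArrayList<>();

        Context(Iterator<Map.Entry<A, B>> input) {
            this.input = input;
        }

        void write(C key, D value) {
            output.add(new SimpleEntry<>(key, value));
        }
    }

    /** Called once, before the first map() call. */
    protected void setup(Context<KI, VI, KO, VO> context) {}

    /** Called once per input key/value pair. */
    protected abstract void map(KI key, VI value, Context<KI, VI, KO, VO> context);

    /** Called once, after the last map() call. */
    protected void cleanup(Context<KI, VI, KO, VO> context) {}

    /** setup, then map over every input pair, then cleanup. */
    public void run(Context<KI, VI, KO, VO> context) {
        setup(context);
        while (context.input.hasNext()) {
            Map.Entry<KI, VI> pair = context.input.next();
            map(pair.getKey(), pair.getValue(), context);
        }
        cleanup(context);
    }
}

/** Example subclass: emits <line, line length> and counts lifecycle calls. */
class LineLengthMapperSketch extends MapperSketch<Long, String, String, Integer> {
    int setupCalls = 0;
    int cleanupCalls = 0;

    @Override
    protected void setup(Context<Long, String, String, Integer> context) { setupCalls++; }

    @Override
    protected void map(Long offset, String line, Context<Long, String, String, Integer> context) {
        context.write(line, line.length());
    }

    @Override
    protected void cleanup(Context<Long, String, String, Integer> context) { cleanupCalls++; }
}
```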
org.apache.hadoop.mapreduce.lib.map implements three subclasses of Mapper:
1.InverseMapper: maps an input <key, value> to the output <value, key>;
2.MultithreadedMapper: runs the map method in multiple threads;
3.TokenCounterMapper: splits the input value into tokens and counts them.
The most complex of these is MultithreadedMapper, so we use it as the example for analyzing a Mapper implementation.
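The map logic of the two simple subclasses can be sketched as plain functions. The counting in TokenCounterMapper amounts to emitting <token, 1> per token, with the sums computed later by a summing Reducer. Tokenizing with java.util.StringTokenizer on whitespace is an assumption of this sketch, and `SimpleMappersSketch` is hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

/** Sketches of the simple Mapper subclasses' map logic (hypothetical class). */
class SimpleMappersSketch {

    /** InverseMapper idea: <key, value> becomes <value, key>. */
    static <K, V> Map.Entry<V, K> inverse(K key, V value) {
        return new SimpleEntry<>(value, key);
    }

    /** TokenCounterMapper idea: split the value into tokens and emit <token, 1> for each. */
    static List<Map.Entry<String, Integer>> tokenCount(String value) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(value); // default delimiters: whitespace
        while (tokens.hasMoreTokens()) {
            out.add(new SimpleEntry<>(tokens.nextToken(), 1));
        }
        return out;
    }
}
```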
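MultithreadedMapper starts multiple threads that run another Mapper's map method. The number of threads is given by the configuration item mapred.map.multithreadedrunner.threads, and the Mapper they run is given by the configuration item mapred.map.multithreadedrunner.class.

MultithreadedMapper overrides the run method of the base class Mapper and starts N threads (implemented by the class MapRunner). Each thread executes the run method of the mapred.map.multithreadedrunner.class Mapper, which we call the target Mapper. This means the target Mapper's setup and cleanup are executed multiple times.

The target Mappers share one InputSplit, so reads from the InputSplit must be thread-safe. For this purpose MultithreadedMapper introduces the inner classes SubMapRecordReader, SubMapRecordWriter and SubMapStatusReporter, which extend RecordReader, RecordWriter and StatusReporter respectively. They access MultithreadedMapper's Mapper.Context under mutual exclusion, which gives thread-safe access to the shared InputSplit and provides each target Mapper with the Context it needs. These classes are all simple to implement.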
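The threading scheme can be sketched with plain threads. Each thread gets its own target-mapper instance and runs a full setup/map/cleanup cycle. All threads pull records from one shared reader whose `next()` is synchronized (the SubMapRecordReader role) and write to a synchronized list (the SubMapRecordWriter role). `MultithreadedMapSketch`, `TargetMapper` and `SharedReader` are hypothetical names, and the thread handling is simplified.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Sketch of the MultithreadedMapper idea (hypothetical classes). */
class MultithreadedMapSketch {

    /** Target mapper: setup/cleanup run once per thread, map once per record. */
    interface TargetMapper {
        void setup();
        void map(String line, List<String> out);
        void cleanup();
    }

    /** Shared input: every read goes through one lock, so the iterator is never raced. */
    static final class SharedReader {
        private final Iterator<String> records;

        SharedReader(Iterator<String> records) {
            this.records = records;
        }

        /** Returns the next record, or null once the shared input is exhausted. */
        synchronized String next() {
            return records.hasNext() ? records.next() : null;
        }
    }

    static List<String> run(List<String> input, int numThreads, Supplier<TargetMapper> factory) {
        SharedReader reader = new SharedReader(input.iterator());
        List<String> output = Collections.synchronizedList(new ArrayList<>()); // thread-safe writes
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            TargetMapper target = factory.get(); // one target instance per thread
            Thread t = new Thread(() -> {
                target.setup();                  // runs once per thread, not once per task
                String line;
                while ((line = reader.next()) != null) {
                    target.map(line, output);
                }
                target.cleanup();
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for map threads", e);
        }
        return output;
    }
}
```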
Now let us look at the concrete interfaces, all of which are in the package org.apache.hadoop.mapreduce.

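The classes in the diagram above fall into four groups:
1.In the upper-right corner are the classes derived from Writable: the Counter-related and ID-related classes, which hold the counters and identifiers the MapReduce process needs. CounterGroup and Counters are also in this package but are not shown in the diagram.
2.Most of the middle consists of the *Context classes, which provide Mapper and Reducer with their contexts.
3.For Map and Reduce themselves, the corresponding classes are Mapper, Reducer and Job, which describes them. In Hadoop, a single computation is called a job, and the units it is divided into are called tasks.
4.The remaining classes in the diagram are helper classes that work with Mapper and Reducer.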
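If you are familiar with HTTPServlet, Hadoop's structure is easy to understand. Think of Hadoop as a whole as the container: Mapper and Reducer are then components in the container. A *Context holds a component's configuration information and is also the component's channel for communicating with the container.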
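We will not discuss the ID-related classes further. Let us look first at JobContext, which sits at the top of the *Context inheritance tree and provides read-only Job information such as the Job's ID and name. The following are some of the more important customization settings in the MapReduce process: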

Job extends JobContext and provides a series of set methods for setting Job attributes (Job updates attributes, JobContext reads them). Job also provides some methods for controlling the Job:
mapProgress: progress of the map phase (0 to 1.0);
reduceProgress: progress of the reduce phase (0 to 1.0);
isComplete: whether the job has finished;
isSuccessful: whether the job succeeded;
killJob: kills a running job;
getTaskCompletionEvents: gets task completion events (success/failure);
killTask: kills a particular task.