Before diving into the main content, let's first look at the overall development workflow of a Hadoop project:

As the Hadoop development workflow diagram shows, data collection is a vital and unavoidable step in any big-data pipeline, and that is what brings us to the subject of this article: Flume. This article covers Flume's architecture and its application to log collection in detail.
(1) Introduction to the Flume architecture
1. The concept of Flume

Flume is a distributed log collection system: it gathers data from individual servers and delivers it to a designated destination, such as the HDFS shown in the diagram. In short, Flume collects logs.
2. The concept of an Event
Before going further, it is worth introducing the concept of an event in Flume. The core job of Flume is to collect data from a data source (source) and deliver it to a designated destination (sink). To guarantee that delivery succeeds, the data is buffered (channel) before it is sent to the sink; only after the data has actually arrived at the sink does Flume delete its buffered copy.
What flows through the whole transfer is the event, which means transactional guarantees are made at the event level. So what is an event? An event wraps the data being transferred and is Flume's basic unit of data transport; for a text file it is usually one line of the file. The event is also the basic unit of a transaction. An event flows from source to channel to sink; it is itself a byte array and can carry headers. An event represents the smallest complete unit of data, traveling from an external data source toward an external destination.
To make this easier to follow, here is a diagram of the event data flow:
A complete event consists of the event headers, the event body, and the event payload (i.e., a single line of a text file), as shown below:

The event payload is the log record that Flume has collected.
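As a hedged illustration (not copied from a real run), the logger sink used later in this article renders a single event roughly like this, with an empty header map, the body as hex bytes, and the printable form of the record at the end:

Event: { headers:{} body: 62 69 67 20 64 61 74 61 20 77 6F 72 6C 64 big data world }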
3. Flume architecture
What makes Flume so effective is a design decision at its heart: the agent. An agent is a Java process that runs on a log collection node; a log collection node is simply a server node.
An agent contains three core components, source -> channel -> sink, an architecture resembling producer, warehouse, and consumer.
source: the source component collects the data and can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, legacy, and custom sources.
channel: once the source has collected data, it is staged temporarily in the channel; the channel component is the agent's temporary store, providing a simple buffer for the collected data, and it can be backed by memory, jdbc, file, and so on.
sink: the sink component delivers the data to its destination, which can be hdfs, logger, avro, thrift, ipc, file, null, hbase, solr, or a custom sink.
4. How Flume runs
At Flume's core is the agent, which has two points of interaction with the outside world: the source, which accepts incoming data, and the sink, which sends data out to the designated external destination. When the source receives data, it hands it to the channel; the channel acts as a buffer and holds the data temporarily, and the sink then forwards the channel's data to the configured destination, such as HDFS. Note that the channel deletes its temporary copy only after the sink has successfully sent the data onward; this mechanism guarantees the reliability and safety of the transfer.
5. Flume in the broad sense
Another reason Flume is so effective is that it supports multi-level agents: agents can be chained one after another, for example a sink can write its data to the source of the next agent, so that agents link into a pipeline and the data can be processed end to end. Flume also supports fan-in and fan-out: fan-in means a source can accept multiple inputs, and fan-out means a sink can deliver data to multiple destinations.
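As a minimal sketch of such chaining (the host name collector01 is a hypothetical placeholder, not from the original article), the upstream agent's avro sink can point at the downstream agent's avro source:

# Upstream agent a1: forward events to the downstream agent over avro
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = collector01
a1.sinks.k1.port = 4545
# Downstream agent a2 (running on collector01): accept events from upstream agents
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 4545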

(2) Flume in practice: log collection
Flume's principles are easy enough to understand; what matters more is mastering how to use it. Flume ships a large number of built-in Source, Channel, and Sink types, and different types can be combined freely, with the combination driven by a user-written configuration file, which makes it very flexible. For example, a channel can stage events in memory or persist them to local disk, and a sink can write logs to HDFS, to HBase, or even to another Source. The cases below walk through concrete uses of Flume.
Using Flume is actually simple: write a configuration file that describes the concrete source, channel, and sink, then run an agent instance; as it starts, the agent reads the configuration file and Flume begins collecting data.
Principles for writing the configuration file:
1> Describe, at the top level, the sources, sinks, and channels the agent involves
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
2> Describe the concrete implementation of each source, sink, and channel in the agent. When describing a source you must state what type it is, i.e., whether it accepts files, http, or thrift; likewise for the sink you must state whether the results go to HDFS or to HBase; and for the channel you must state whether it is memory, a database, a file, and so on.
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
3> Wire the source and the sink together through the channel
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Shell command to start the agent:
flume-ng agent -n a1 -c ../conf -f ../conf/example.file -Dflume.root.logger=DEBUG,console
Parameter notes:
-n  specifies the agent name (must match the agent name in the configuration file)
-c  specifies the directory containing Flume's configuration files
-f  specifies the configuration file
-Dflume.root.logger=DEBUG,console  sets the log level
Concrete cases:
Case 1: NetCat Source: listens on a specified network port; whenever an application writes data to that port, the source component picks it up.
Here Sink: logger, Channel: memory
The NetCat Source description from the Flume website:
Property Name  Default  Description
channels       –
type           –        The component type name, needs to be netcat
bind           –        Host name or IP address the logs are sent to; a netcat-type source on that host is listening
port           –        Port number the logs are sent to; a netcat-type source must be listening on that port
a) Write the configuration file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.80.80
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
b) Start the flume agent a1 (server side)
flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console
c) Send data with telnet
telnet 192.168.80.80 44444    (run from Windows; then type "big data world!")
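For reference, the telnet session might look roughly like the following; the OK lines are the acknowledgements the netcat source returns for each received line (exact output depends on the telnet client):

telnet 192.168.80.80 44444
Trying 192.168.80.80...
Connected to 192.168.80.80.
Escape character is '^]'.
big data world!
OK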
d) View the log data Flume collected on the console:
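The original screenshot is not reproduced here; for the line sent above, the logger sink's console output would look roughly like this (illustrative, not copied from a real run):

Event: { headers:{} body: 62 69 67 20 64 61 74 61 20 77 6F 72 6C 64 21 big data world! }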
Case 2: NetCat Source: listens on a specified network port; whenever an application writes data to that port, the source component picks it up.
Here Sink: hdfs, Channel: file (the two changes relative to Case 1)
The HDFS Sink description from the Flume website:
a) Write the configuration file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.80.80
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
b) Start the flume agent a1 (server side)
flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console
c) Send data with telnet
telnet 192.168.80.80 44444    (run from Windows; then type "big data world!")
d) View the collected log data in HDFS:
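The original screenshot is omitted; assuming the standard Hadoop shell tools, the rolled files can be inspected with commands along these lines (the file names will differ, since the prefix is a timestamp):

# list the files the HDFS sink has rolled into the output directory
hadoop fs -ls hdfs://hadoop80:9000/dataoutput
# print the contents of all collected files
hadoop fs -cat hdfs://hadoop80:9000/dataoutput/*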
Case 3: Spooling Directory Source: watches a specified directory; whenever an application adds a new file to that directory, the source component picks it up, parses the file's contents, and writes them to the channel. Once the write completes, the file is marked as completed or deleted.
Here Sink: logger, Channel: memory
The Spooling Directory Source description from the Flume website:
Property Name  Default     Description
channels       –
type           –           The component type name, needs to be spooldir.
spoolDir       –           The directory the Spooling Directory Source watches
fileSuffix     .COMPLETED  Suffix used to mark a file once its contents have been written to the channel
deletePolicy   never       Deletion policy after a file's contents are written to the channel: never or immediate
fileHeader     false       Whether to add a header storing the absolute path filename.
ignorePattern  ^$          Regular expression specifying which files to ignore (skip)
interceptors   –           Set headers on the events in transit; timestamp is commonly used
Two caveats for the Spooling Directory Source:
① If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing. In other words: a file copied into the spool directory must not be opened and edited afterwards.
② If a file name is reused at a later time, Flume will print an error to its log file and stop processing. In other words: never copy a file with a previously used name into this directory.
a) Write the configuration file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/datainput
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
b) Start the flume agent a1 (server side)
flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console
c) Send data into the Spooling Directory with the cp command
cp datafile /usr/local/datainput    (note: datafile contains "big data world!")
d) View the log data Flume collected on the console:
The console output shows that the event headers include a timestamp.
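As a hedged illustration (actual values will differ), with the timestamp interceptor and fileHeader = true configured, a logged event might look roughly like this:

Event: { headers:{timestamp=1433131200000, file=/usr/local/datainput/datafile} body: 62 69 67 20 64 61 74 61 20 77 6F 72 6C 64 21 big data world! }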
ͬʱÎÒÃDz鿴һÏÂSpooling DirectoryÖеÄdatafileÐÅÏ¢¡ª-ÎļþÄÚÈÝдÈëµ½channelÖ®ºó£¬¸ÃÎļþ±»±ê¼ÇÁË£º
[root@hadoop80 datainput]# ls
datafile.COMPLETED
Case 4: Spooling Directory Source: watches a specified directory; whenever an application adds a new file to that directory, the source component picks it up, parses the file's contents, and writes them to the channel. Once the write completes, the file is marked as completed or deleted.
Here Sink: hdfs, Channel: file (the two changes relative to Case 3)
a) Write the configuration file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/datainput
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
b) Start the flume agent a1 (server side)
flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console
c) Send data into the Spooling Directory with the cp command
cp datafile /usr/local/datainput    (note: datafile contains "big data world!")
d) The sink's progress can be followed in the log output on the console:
e) View the collected log data in HDFS:

Comparing Case 1 with Case 2, and Case 3 with Case 4, we can see that Flume configuration files are very flexible to write.
Case 5: Exec Source: runs a specified command and takes the command's output as its data source. The most common command is tail -F file: whenever an application writes to the log file, the source component picks up the newest content of the file.
Here Sink: hdfs, Channel: file
To make the effect of the Exec Source easy to observe, this case uses a Hive external table to illustrate it.
a) Write the configuration file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/log.file
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
b) Create an external table in Hive over the hdfs://hadoop80:9000/dataoutput directory, to make it easy to inspect the captured log content
hive> create external table t1(infor string)
    > row format delimited
    > fields terminated by '\t'
    > location '/dataoutput/';
OK
Time taken: 0.284 seconds
c) Start the flume agent a1 (server side)
flume-ng agent -n a1 -c ../conf -f ../conf/exec.conf -Dflume.root.logger=DEBUG,console
d) Send data to /usr/local/log.file with the echo command
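The original post does not show the command for this first write; given that the agent tails /usr/local/log.file and Hive later returns a row reading "big data", it was presumably something like:

echo big data >> /usr/local/log.file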
e) View the collected log data in HDFS and in Hive:
hive> select * from t1;
OK
big data
Time taken: 0.086 seconds
f) Append another record to /usr/local/log.file with echo
echo big data world! >> log.file
g) View the collected log data in HDFS and in Hive once more:

hive> select * from t1;
OK
big data
big data world!
Time taken: 0.511 seconds
Summary of the Exec source: the Exec source and the Spooling Directory Source are two common ways to collect logs. The Exec source supports real-time collection, while the Spooling Directory Source falls somewhat short on real-time collection. However, even though the Exec source can collect logs in real time, if Flume is not running or the command fails, the Exec source collects nothing and log data is lost, so it cannot guarantee the completeness of the collected logs.
Case 6: Avro Source: listens on a specified Avro port, through which it can receive files sent by an Avro client. That is, whenever an application sends a file through the Avro port, the source component picks up the file's contents.
Here Sink: hdfs, Channel: file
(Note: Avro and Thrift are serialized network endpoints through which messages can be received or sent; Avro can deliver a given file to Flume, and the Avro source uses the Avro RPC mechanism.)
How the Avro Source works is shown in the diagram below:
The Avro Source description from the Flume website:
Property Name  Default  Description
channels       –
type           –        The component type name, needs to be avro
bind           –        Host name or IP the logs are sent to; an avro-type source on that host is listening
port           –        Port number the logs are sent to; an avro-type source must be listening on that port
a) Write the configuration file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.80.80
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
b) Start the flume agent a1 (server side)
flume-ng agent -n a1 -c ../conf -f ../conf/avro.conf -Dflume.root.logger=DEBUG,console
c) Send a file with avro-client
flume-ng avro-client -c ../conf -H 192.168.80.80 -p 4141 -F /usr/local/log.file
Note: log.file contains:
[root@hadoop80 local]# more log.file
big data
big data world!
d) View the collected log data in HDFS:


The cases above show that Flume configuration files can be written very flexibly: different types of Source, Channel, and Sink can be combined freely!
Finally, a brief summary of the Flume sources used above:
① NetCat Source: listens on a specified network port; whenever an application writes data to that port, the source component picks it up.
② Spooling Directory Source: watches a specified directory; whenever an application adds a new file to that directory, the source component picks it up, parses the file's contents, and writes them to the channel; once the write completes, the file is marked as completed or deleted.
③ Exec Source: runs a specified command and takes the command's output as its data source; the most common command is tail -F file, so that whenever an application writes to the log file, the source component picks up the newest content.
④ Avro Source: listens on a specified Avro port, through which it can receive files sent by an Avro client; whenever an application sends a file through the Avro port, the source component picks up the file's contents.