Flume Architecture and Applications
 
 2018-1-23  
 
±à¼­ÍƼö:

This article is from CSDN. It starts from the concepts of Flume and the event, then walks through the architecture, log collection, and the accompanying code.

Before getting into the content of this article, first take a look at the overall development workflow of a Hadoop business application:

As the Hadoop business development workflow diagram shows, data collection is a vital and unavoidable step in big-data processing, which brings us to the subject of this article: Flume. This article gives a detailed introduction to Flume's architecture and to its application in log collection.

(1) Introduction to the Flume Architecture

1. The Concept of Flume

Flume is a distributed log collection system: it gathers data from each server and delivers it to a specified destination, such as the HDFS shown in the diagram. Simply put, Flume collects logs.

2. The Concept of an Event

It is worth first introducing the concept of an event in Flume. The core of Flume is to collect data from a data source (source) and deliver it to a specified destination (sink). To guarantee that delivery succeeds, the data is buffered (channel) before being sent to the destination (sink); only after the data has truly arrived at the destination does Flume delete its buffered copy.

What flows through the whole transfer is the event; that is, the transaction guarantee is made at the event level. So what is an event? An event wraps the data being transferred and is Flume's basic unit of data transfer; for a text file it is usually one line of a record. The event is also the basic unit of a transaction. An event flows from source to channel to sink; it is itself a byte array and can carry headers (header information). An event represents the smallest complete unit of data, coming from an external data source and heading to an external destination.

To make this easier to follow, here is a diagram of the data flow of an event:

A complete event consists of event headers, an event body, and the event payload (i.e., a single-line record of the text file), as shown below:

The event payload is exactly the log record that Flume collected.
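
To make the structure concrete, here is a minimal Java sketch built on Flume's SDK (the org.apache.flume.Event interface and EventBuilder); the header key and the log line are illustrative values of my own, not taken from the examples below.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class EventSketch {
    public static void main(String[] args) {
        // Headers are optional key/value metadata carried with the body,
        // e.g. a timestamp added by the timestamp interceptor.
        Map<String, String> headers = new HashMap<>();
        headers.put("timestamp", String.valueOf(System.currentTimeMillis()));

        // The body is a byte array; for a text file it is usually one line.
        Event event = EventBuilder.withBody(
                "big data world!", StandardCharsets.UTF_8, headers);

        System.out.println(event.getHeaders());          // {timestamp=...}
        System.out.println(new String(event.getBody())); // big data world!
    }
}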

3. Introduction to the Flume Architecture

What makes Flume so remarkable stems from one design of its own: the agent. The agent itself is a Java process that runs on the log collection node, where the log collection node is simply a server node.

An agent contains three core components: source -> channel -> sink, an architecture much like producer, warehouse, consumer.

source: the source component is dedicated to collecting data and can handle log data of all types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, legacy, and custom sources.

channel: once the source component has collected data, it is staged temporarily in the channel; that is, the channel component inside the agent is dedicated to holding temporary data, acting as a simple buffer for the collected data, which can live in memory, jdbc, file, and so on.

sink: the sink component delivers the data to its destination, which can be hdfs, logger, avro, thrift, ipc, file, null, hbase, solr, or a custom sink.

4. How Flume Runs

The core of Flume is the agent. The agent interacts with the outside world in two places: one accepts the incoming data (the source), and the other is the data output (the sink), which is responsible for delivering data to the externally specified destination. After the source receives data, it hands the data to the channel; the channel, acting as a data buffer, stores it temporarily, and the sink then delivers the data in the channel to the specified place, such as HDFS. Note: only after the sink has successfully sent the data out of the channel does the channel delete the temporarily stored data. This mechanism guarantees the reliability and safety of data transfer.
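
The delete-only-after-success behavior rests on channel transactions. Below is a minimal Java sketch of the put side of that pattern, following the transaction idiom in the Flume developer guide; the channel and event are assumed to be supplied by a running agent.

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;

public final class ChannelPutSketch {
    // Stage one event in the channel; it becomes visible to the sink
    // only when the transaction commits.
    static void putWithTransaction(Channel channel, Event event) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            channel.put(event); // stage the event inside the transaction
            tx.commit();        // now the sink may take it
        } catch (RuntimeException e) {
            tx.rollback();      // on failure the channel keeps nothing
            throw e;
        } finally {
            tx.close();
        }
    }
}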

5. The Broader Use of Flume

Another reason Flume is so remarkable is that it supports multiple levels of agents: agents can be chained one after another. For example, a sink can write its data into the source of the next agent, so agents can be strung together and the flow processed as a whole. Flume also supports fan-in and fan-out: fan-in means a source can accept multiple inputs, and fan-out means a sink can write data out to multiple destinations.
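
As a minimal sketch of such chaining (the hostname and port here are hypothetical), the upstream agent's avro sink points at the port where the downstream agent's avro source listens:

# Upstream agent a1: its avro sink forwards events to the next agent.
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = collector-host
a1.sinks.k1.port = 4545

# Downstream agent a2: its avro source listens where a1's sink sends.
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 4545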

(2) Flume in Application: Log Collection

Flume's principles are actually easy to understand; what we really need to master is how to use it. Flume ships with a large number of built-in Source, Channel, and Sink types, and different types of Source, Channel, and Sink can be combined freely; the combination is driven by a user-written configuration file, which is very flexible. For example, a Channel can stage events in memory or persist them to the local disk, and a Sink can write logs into HDFS, HBase, or even another Source. Below I will walk through Flume's concrete usage with specific examples.

Using Flume is in fact simple: write a configuration file that describes the concrete source, channel, and sink implementations, then run an agent instance; while running, the agent reads the configuration file, and Flume starts collecting data.

ÅäÖÃÎļþµÄ±àдԭÔò£º

1> Describe, at the top level, the components involved in the agent's sources, sinks, and channels

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

2> Describe in detail the concrete implementation of every source, sink, and channel in the agent. When describing a source, you must specify its exact type: does this source accept files, HTTP, or Thrift? The same goes for the sink: you must specify whether the results are written to HDFS or to HBase, and so on. For the channel, you must specify whether it is memory, a database, a file, etc.

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

3> Connect the source and the sink through the channel

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Shell command to start the agent:

flume-ng agent -n a1 -c ../conf -f ../conf/example.file -Dflume.root.logger=DEBUG,console

Parameter description:

-n  specifies the agent name (the same as the agent's name in the configuration file)

-c  specifies the directory holding Flume's configuration files

-f  specifies the configuration file

-Dflume.root.logger=DEBUG,console  sets the log level

Concrete examples:

Example 1: NetCat Source: listens on a specified network port, so as soon as an application writes data to that port, the source component picks up the information. Sink: logger; Channel: memory.

The NetCat Source as described on the Flume website:

Property Name   Default   Description
channels        –
type            –         The component type name, needs to be netcat
bind            –         Host name or IP address to which the logs are sent; a netcat-type source must be listening on that host
port            –         Port number to which the logs are sent; a netcat-type source must be listening on that port

a) Write the configuration file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.80.80
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start the flume agent a1 server:

flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console

c) Send data with telnet:

telnet 192.168.80.80 44444    (run on Windows; after connecting, type: big data world!)

d) View the log data Flume collected on the console:

Example 2: NetCat Source: listens on a specified network port, so as soon as an application writes data to that port, the source component picks up the information. Sink: hdfs; Channel: file (the two changes relative to Example 1).

The HDFS Sink as described on the Flume website:

a) Write the configuration file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.80.80
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
# Roll (close and reopen) the HDFS file every 10 seconds; setting
# rollSize and rollCount to 0 disables size- and count-based rolling.
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
# Name files by timestamp; useLocalTimeStamp resolves the escape
# sequences from local time instead of a timestamp event header.
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start the flume agent a1 server:

flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console

c) Send data with telnet:

telnet 192.168.80.80 44444    (run on Windows; after connecting, type: big data world!)

d) View the log data Flume collected in HDFS:

Example 3: Spooling Directory Source: watches a specified directory, so as soon as an application adds a new file to that directory, the source component picks it up, parses the file's content, and writes it into the channel. Once the write completes, the file is marked as completed or deleted. Sink: logger; Channel: memory.

The Spooling Directory Source as described on the Flume website:

Property Name   Default      Description
channels        –
type            –            The component type name, needs to be spooldir.
spoolDir        –            The directory the Spooling Directory Source watches
fileSuffix      .COMPLETED   Suffix used to mark a file once its contents have been written to the channel
deletePolicy    never        Deletion policy after a file's contents are written to the channel: never or immediate
fileHeader      false        Whether to add a header storing the absolute path filename.
ignorePattern   ^$           Regular expression specifying which files to ignore (skip)
interceptors    –            Set header information on events in transit; timestamp is commonly used

Two caveats for the Spooling Directory Source:

① If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
That is, a file copied into the spool directory must not be opened and edited afterwards.
② If a file name is reused at a later time, Flume will print an error to its log file and stop processing.
That is, you must not copy a file with a previously used name into this directory (a common workaround is shown in the sketch below).
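
A common way to avoid both pitfalls (a sketch, assuming the producer can write to a staging directory on the same filesystem; /usr/local/staging is a hypothetical path) is to finish writing the file elsewhere under a unique name, then move it in; on the same filesystem mv is an atomic rename, so the source never sees a half-written file:

# Finish writing the file outside the spool directory, with a
# unique (timestamped) name so file names are never reused.
TS=$(date +%s)
cp datafile /usr/local/staging/datafile.$TS
# mv on the same filesystem is an atomic rename, so Flume only
# ever observes the complete file.
mv /usr/local/staging/datafile.$TS /usr/local/datainput/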

a) Write the configuration file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/datainput
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start the flume agent a1 server:

flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console

c) Use the cp command to drop data into the spooling directory:

cp datafile /usr/local/datainput    (note: datafile contains: big data world!)

d) View the log data Flume collected on the console:

The console output shows that the event's header information includes a timestamp.

Meanwhile, checking the datafile in the spooling directory shows that, after its content was written to the channel, the file was marked:

[root@hadoop80 datainput]# ls
datafile.COMPLETED

Example 4: Spooling Directory Source: watches a specified directory, so as soon as an application adds a new file to that directory, the source component picks it up, parses the file's content, and writes it into the channel. Once the write completes, the file is marked as completed or deleted. Sink: hdfs; Channel: file (the two changes relative to Example 3).

a) Write the configuration file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/datainput
a1.sources.r1.fileHeader = true
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start the flume agent a1 server:

flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console

c) Use the cp command to drop data into the spooling directory:

cp datafile /usr/local/datainput    (note: datafile contains: big data world!)

d) On the console you can watch the sink's progress log:

e) View the log data Flume collected in HDFS:

Comparing Example 1 with Example 2, and Example 3 with Example 4, we can see that Flume's configuration files are written very flexibly.

Example 5: Exec Source: watches a specified command and takes the command's output as its data source.

The most common command is tail -F file: as soon as an application writes data into the log (file), the source component picks up the newest content of the log (file). Sink: hdfs; Channel: file.

To make the effect of the Exec Source easy to see, this example is illustrated together with a Hive external table.

a) Write the configuration file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/log.file

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Create an external table in Hive over the hdfs://hadoop80:9000/dataoutput directory, so the captured log content is easy to inspect:

hive> create external table t1(infor string)
> row format delimited
> fields terminated by '\t'
> location '/dataoutput/';
OK
Time taken: 0.284 seconds

c) Start the flume agent a1 server:

flume-ng agent -n a1 -c ../conf -f ../conf/exec.conf -Dflume.root.logger=DEBUG,console

d) Use the echo command to write data to /usr/local/log.file (the file tailed by the source):

echo big data > log.file

e) View the log data Flume collected, in HDFS and in Hive respectively:

hive> select * from t1;
OK
big data
Time taken: 0.086 seconds

f) Use the echo command to append one more line to /usr/local/log.file:

echo big data world! >> log.file

g) View the log data Flume collected once more, in HDFS and in Hive respectively:

hive> select * from t1;
OK
big data
big data world!
Time taken: 0.511 seconds

Summary of the Exec source: Exec Source and Spooling Directory Source are two common ways of collecting logs. Exec Source can collect logs in real time, while Spooling Directory Source falls a little short on real-time collection. However, even though Exec Source collects logs in real time, when Flume is not running or the command fails, Exec Source cannot collect the log data and logs are lost, so the completeness of the collected logs cannot be guaranteed.

Example 6: Avro Source: listens on a specified Avro port, through which it can receive files sent by an Avro client. That is, as soon as an application sends a file through the Avro port, the source component picks up that file's contents. Sink: hdfs; Channel: file.

(Note: Avro and Thrift both provide serialized network endpoints through which messages can be received or sent; an Avro client can send a given file to Flume, and the Avro source uses the Avro RPC mechanism.)

How the Avro Source works is shown in the figure below:

The Avro Source as described on the Flume website:

Property Name   Default   Description
channels        –
type            –         The component type name, needs to be avro
bind            –         Host name or IP to which the logs are sent; an Avro-type source runs on that host
port            –         Port number to which the logs are sent; an Avro-type source must be listening on that port

a) Write the configuration file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.80.80
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in file
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/flume/checkpoint
a1.channels.c1.dataDirs = /usr/flume/data

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

b) Start the flume agent a1 server:

flume-ng agent -n a1 -c ../conf -f ../conf/avro.conf -Dflume.root.logger=DEBUG,console

c) Send a file with avro-client:

flume-ng avro-client -c ../conf -H 192.168.80.80 -p 4141 -F /usr/local/log.file

Note: log.file contains:

[root@hadoop80 local]# more log.file
big data
big data world!

d) View the log data Flume collected in HDFS:

From the examples above we can see that writing Flume configuration files is quite flexible: different types of Source, Channel, and Sink can be combined freely!

Finally, a brief summary of the Flume sources used above:

① NetCat Source: listens on a specified network port; as soon as an application writes data to that port, the source component picks up the information.

② Spooling Directory Source: watches a specified directory; as soon as an application adds a new file to that directory, the source component picks it up, parses the file's content, and writes it into the channel; once the write completes, the file is marked as completed or deleted.

③ Exec Source: watches a specified command and takes the command's output as its data source. The most common command is tail -F file: as soon as an application writes data into the log (file), the source component picks up the newest content of the log (file).

④ Avro Source: listens on a specified Avro port, through which it can receive files sent by an Avro client; as soon as an application sends a file through the Avro port, the source component picks up that file's contents.

 

   