Some big data customers want to analyze new data in response to a specific event, and they may already have well-defined pipelines that perform batch processing, orchestrated by AWS Data Pipeline. One example of an event-triggered pipeline is when data analysts must analyze data as soon as it arrives so that they can respond to partners immediately. Scheduling is not the best solution in this situation. The main question is how to start data processing at an arbitrary time with Data Pipeline, which relies on a scheduler.
Here is one solution. First, create a simple pipeline and test it with data from Amazon S3. Then add an Amazon SNS topic that notifies the customer when the pipeline finishes, so that data analysts can review the results. Finally, create an AWS Lambda function that activates Data Pipeline when new data is successfully committed to an S3 bucket, without managing any scheduling activity. This post shows you how.
When a Data Pipeline activity can be scheduled, customers can define preconditions that check whether data exists in S3 and then allocate resources. When Data Pipeline needs to be activated at an arbitrary time, however, Lambda is a good mechanism.
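Cloning pipelines for future use
In this scenario, the customer's pipeline has already been activated through some scheduled activity, but the customer wants to invoke the same pipeline in response to an ad hoc event, such as new data being committed to an S3 bucket. The customer has already developed a "template" pipeline that has reached the Finished state.
One way to re-initiate the pipeline is to keep a JSON file with the pipeline definition in S3 and use it to create a new pipeline. Some customers store multiple versions of the same pipeline in S3 but want to clone and reuse only the version that was executed most recently. A simple way to meet this requirement is to get the pipeline definition from the finished pipeline and create a clone. This approach relies on the most recently executed pipeline, so the customer does not need to keep a registry of pipeline versions in S3 or track which version ran last.
Even customers who do want to maintain such a registry in S3 may also want to get a pipeline definition on the fly from an existing pipeline, through API calls made from Lambda. They may have complicated, event-driven workflows in which they need to clone finished pipelines, re-run them, and then delete the clones. That is why it is important to first detect pipelines in the Finished state.
In this post, I show you how to do this kind of on-the-fly pipeline cloning. Data Pipeline has no direct clone API, so you implement cloning with several API calls. I also provide code for deleting old clones that have finished.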
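Three-step workflow
- Create a simple pipeline for testing.
- Create an SNS notification that tells analysts when the pipeline has finished.
- Create a Lambda function that activates the pipeline when new data is committed to an S3 bucket.
Step 1: Create a simple pipeline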

- Open the AWS Data Pipeline console.
- If you have not yet created a pipeline in this region, the console displays an introductory screen. In that case, choose Get started now. If you have already created a pipeline in this region, the console lists all the pipelines you have created there. In that case, choose Create new pipeline.
- Enter a name and a description.
- Select an Elastic MapReduce (EMR) template, and then choose Run once on pipeline activation.
- In the Step field, enter the following:
    /home/hadoop/contrib/streaming/hadoop-streaming.jar,-input,s3n://elasticmapreduce/samples/wordcount/input,-output,s3://example-bucket/wordcount/output/#{@scheduledStartTime},-mapper,s3n://elasticmapreduce/samples/wordcount/wordSplitter.py,-reducer,aggregate
You can adjust the number of Amazon EMR cluster nodes and select the distribution. For more information about creating pipelines, see Getting Started with AWS Data Pipeline.
Step 2: Create an SNS topic
To create an SNS topic:
- In a new browser tab, open the Amazon SNS console.
- Choose Create topic.
- In the Topic name field, type a topic name.
- Choose Create topic.
- Select the new topic, and then choose the topic ARN. The Topic Details page appears.

- Copy the topic ARN for the next task.
- Create a subscription for the topic and provide your email address. AWS sends an email to confirm your subscription.
To configure the topic notification action in the pipeline:
- In the AWS Data Pipeline console, open your pipeline in the Architect window.
- In the right pane, choose Others.
- Under DefaultAction1, do the following:
  - Enter a name for the notification (for example, MyEMRJobNotice).
  - In the Type field, choose SnsAlarm.
  - In the Subject field, enter the subject line.
  - In the Topic Arn field, enter the ARN of your topic.
  - In the Message field, enter the message content.
  - Leave Role set to its default value.
Save and activate the pipeline to make sure that it runs successfully.
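For reference, the notification configured in the console above corresponds roughly to an SnsAlarm object in the pipeline definition. The sketch below writes it as a JavaScript object. The topic ARN, subject, and message are placeholder values, and EmrActivityObj is a hypothetical activity id, so compare it with the definition exported from your own pipeline.

```javascript
// Sketch of the SnsAlarm object behind the console fields above.
// All values are placeholders (assumptions), not taken from a real account.
var snsAlarm = {
  id: 'MyEMRJobNotice',
  name: 'MyEMRJobNotice',
  type: 'SnsAlarm',
  topicArn: 'arn:aws:sns:us-east-1:123456789012:example-topic', // placeholder ARN
  subject: 'EMR word count finished',                           // placeholder subject
  message: 'The pipeline run has completed.',                   // placeholder message
  role: 'DataPipelineDefaultRole'                                // default role name
};

// An activity refers to the alarm by id, for example in its onSuccess field.
var activityFragment = {
  id: 'EmrActivityObj', // hypothetical activity id
  onSuccess: { ref: snsAlarm.id }
};

console.log(JSON.stringify({ objects: [snsAlarm, activityFragment] }, null, 2));
```

Printing the objects as JSON makes it easy to compare them field by field with an exported definition.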
Step 3: Create a Lambda function
On the Lambda console, choose Create a Lambda function. You can select a blueprint, or skip the first step and go straight to Step 2: Configure function. There, provide a function name (such as LambdaDP) and a description, and choose Node.js as the value of the Runtime field.
Your test pipeline has now finished. Re-running a finished pipeline is not currently supported. To run it again, clone the pipeline from the template and have Lambda trigger the new pipeline. Each time the old clones are cleaned up, Lambda needs to create a new clone. The functions below help with this cloning. On the Lambda console, use the Code entry type and Edit code inline fields, and start with the following code:
    console.log('Loading function');
    var AWS = require('aws-sdk');

    exports.handler = function(event, context) {
        var datapipeline = new AWS.DataPipeline();
        var pipeline2delete = 'None';  // id of a stale clone, if one is found
        var pipeline = 'df-02….T';     // id of the finished template pipeline
        // …
    }
Define your pipeline ID and create a variable for the ID of the cloned pipeline, such as pipeline2delete. Then add a function that checks for an existing clone left over from a previous run, as follows:
    // Iterate over the list of pipelines and check if the pipeline clone already exists
    var paramsall = {};  // assumed empty: no marker, so listing starts at the first page
    datapipeline.listPipelines(paramsall, function(err, data) {
        if (err) {
            console.log(err, err.stack);  // an error occurred
        } else {
            console.log(data);  // successful response
            for (var i in data.pipelineIdList) {
                if (data.pipelineIdList[i].name == 'myLambdaSample') {
                    pipeline2delete = data.pipelineIdList[i].id;
                    console.log('Pipeline clone id to delete: ' + pipeline2delete);
                    // call deletePipeline for the stale clone here
                }
            }
        }
    });
If a finished clone left over from a previous run has been identified, you must invoke the delete function within this loop. Sample code for the call follows:
    var paramsd = {pipelineId: pipeline2delete /* required */};
    datapipeline.deletePipeline(paramsd, function(err, data) {
        if (err) {
            console.log(err, err.stack);  // an error occurred
        } else {
            console.log('Old clone deleted ' + pipeline2delete + ' Create new clone now');
        }
    });
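The lookup and the delete can also be combined into small helpers. The following is a minimal sketch, assuming the clone is always named myLambdaSample as in the snippets above. findCloneIds and deleteClones are hypothetical helper names, and client stands in for an AWS.DataPipeline instance; a stub is used here so that the sketch runs without AWS credentials.

```javascript
// Return the ids of all pipelines whose name matches the clone name.
function findCloneIds(pipelineIdList, cloneName) {
  return pipelineIdList
    .filter(function (p) { return p.name === cloneName; })
    .map(function (p) { return p.id; });
}

// Delete each listed pipeline in turn, then call done(err, deletedIds).
function deleteClones(client, ids, done) {
  var deleted = [];
  (function next(i) {
    if (i >= ids.length) { return done(null, deleted); }
    client.deletePipeline({ pipelineId: ids[i] }, function (err) {
      if (err) { return done(err, deleted); }
      deleted.push(ids[i]);
      next(i + 1);
    });
  })(0);
}

// Stub standing in for new AWS.DataPipeline(); it only records calls.
var stubClient = {
  calls: [],
  deletePipeline: function (params, cb) {
    this.calls.push(params.pipelineId);
    cb(null, {});
  }
};

// Shaped like the pipelineIdList array in a listPipelines response.
var sampleList = [
  { id: 'df-EXAMPLE1', name: 'template' },
  { id: 'df-EXAMPLE2', name: 'myLambdaSample' }
];

var staleIds = findCloneIds(sampleList, 'myLambdaSample');
var deletedIds = null;
deleteClones(stubClient, staleIds, function (err, ids) { deletedIds = ids; });
console.log('Deleted clones:', deletedIds);
```

Keeping the name match in a pure function makes it easy to test without calling the service. Note that listPipelines returns results in pages, so a fuller version would probably also follow the marker in the response until hasMoreResults is false.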
Finally, you need to make three API calls to create a new clone from your original Data Pipeline template. The APIs you can use are:
- getPipelineDefinition (for the finished pipeline)
- createPipeline
- putPipelineDefinition (from #1)
Examples of these three calls follow.
1. Get the definition of the finished pipeline and prepare the parameters for the next clone:
    var params = {pipelineId: pipeline};
    datapipeline.getPipelineDefinition(params, function(err, definition) {
        if (err) console.log(err, err.stack);  // an error occurred
        else {
            var params = {
                name: 'myLambdaSample',     /* required */
                uniqueId: 'myLambdaSample'  /* required */
            };
            // (continued in step 2)
2. Create the clone, and pair its ID with the pipeline objects from the definition object:
            datapipeline.createPipeline(params, function(err, pipelineIdObject) {
                if (err) console.log(err, err.stack);  // an error occurred
                else {
                    // new pipeline created with id=pipelineIdObject.pipelineId
                    console.log(pipelineIdObject);  // successful response
                    // Create and activate pipeline
                    var params = {
                        pipelineId: pipelineIdObject.pipelineId,
                        pipelineObjects: definition.pipelineObjects  // (you can add parameter objects and values)
                    };
                    // (continued in step 3)
3. Put the definition from the getPipelineDefinition API result into the clone, and then activate it:
                    datapipeline.putPipelineDefinition(params, function(err, data) {
                        if (err) console.log(err, err.stack);
                        else {
                            // Activate the pipeline finally
                            datapipeline.activatePipeline(pipelineIdObject, function(err, data) {
                                if (err) console.log(err, err.stack);
                                else console.log(data);
                            });
                        }
                    });
                }
            });
        }
    });
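Taken together, the three calls plus the activation can be wrapped in a single function. This is a sketch rather than the code from the steps above: clonePipeline is a hypothetical name, client is assumed to expose the same callback-style methods as AWS.DataPipeline, and a stub replaces the real service so that the example runs offline.

```javascript
// Clone a finished pipeline and activate the clone.
// client: object with getPipelineDefinition, createPipeline,
//         putPipelineDefinition and activatePipeline (callback style).
function clonePipeline(client, templateId, cloneName, done) {
  client.getPipelineDefinition({ pipelineId: templateId }, function (err, definition) {
    if (err) { return done(err); }
    client.createPipeline({ name: cloneName, uniqueId: cloneName }, function (err, created) {
      if (err) { return done(err); }
      var putParams = {
        pipelineId: created.pipelineId,
        pipelineObjects: definition.pipelineObjects
      };
      client.putPipelineDefinition(putParams, function (err, result) {
        if (err) { return done(err); }
        // Validation problems are reported in the response body, not as err.
        if (result && result.errored) { return done(new Error('Definition rejected')); }
        client.activatePipeline({ pipelineId: created.pipelineId }, function (err) {
          done(err, err ? undefined : created.pipelineId);
        });
      });
    });
  });
}

// Stub client (assumption): records the call order and returns canned data.
var order = [];
var stub = {
  getPipelineDefinition: function (p, cb) {
    order.push('get');
    cb(null, { pipelineObjects: [{ id: 'Default', name: 'Default', fields: [] }] });
  },
  createPipeline: function (p, cb) { order.push('create'); cb(null, { pipelineId: 'df-CLONE' }); },
  putPipelineDefinition: function (p, cb) { order.push('put'); cb(null, { errored: false }); },
  activatePipeline: function (p, cb) { order.push('activate'); cb(null, {}); }
};

var cloneId = null;
clonePipeline(stub, 'df-TEMPLATE', 'myLambdaSample', function (err, id) { cloneId = id; });
console.log(order.join(' -> '), cloneId);
```

Passing the client in as an argument keeps the function testable. Inside the Lambda handler you would pass new AWS.DataPipeline() instead of the stub. If the template uses parameters, the parameterObjects and parameterValues from the definition would likely need to be copied into putParams as well.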
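Now you have all the function calls that the Lambda function needs. You can also wrap those calls in an independent function. To finish setting up the function, do the following:
- Enter a value for the Handler field as the name of your function (LambdaDP.index).
- Role: this option gives the function access to resources such as S3 and Data Pipeline.
- Keep the default values for Memory and Timeout.
- Choose Next, review the function, and choose Create function.
- In the Event source field, choose S3.
- Provide the name of the bucket used by the pipeline.
- In the Event type field, choose Put. With this option, the pipeline is activated when a new file is committed to the bucket.
- Save the pipeline, and upload a data file to your S3 bucket.
- Check the Data Pipeline console to make sure that the new pipeline has been created and activated (you should receive an SNS notification when the pipeline finishes).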
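When the S3 event source invokes the function, the event describes the object that was just committed. The sketch below shows one way to read the bucket name and object key from that event before starting the clone. describeUpload is a hypothetical helper, and the sample event contains only the fields that the helper reads.

```javascript
// Extract bucket and key from each record of an S3 notification event.
// Object keys arrive URL-encoded, with spaces written as '+'.
function describeUpload(event) {
  return (event.Records || []).map(function (record) {
    return {
      bucket: record.s3.bucket.name,
      key: decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '))
    };
  });
}

// Trimmed sample event (assumption: real events carry many more fields).
var sampleEvent = {
  Records: [{
    eventName: 'ObjectCreated:Put',
    s3: {
      bucket: { name: 'example-bucket' },
      object: { key: 'incoming/new+data.csv' }
    }
  }]
};

var uploads = describeUpload(sampleEvent);
console.log(uploads);
```

Logging the bucket and key at the top of the handler makes it easier to confirm which upload triggered a given pipeline run.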