Flink Stream Processing: An Iteration Example
 2018-11-05
 
Editor's note:
This article comes from CSDN. It introduces iterative streams (IterativeStream), feedback streams, and related topics.

At present Flink focuses its iteration support on batch processing. The bulk iterations and delta iterations discussed earlier apply to the batch (DataSet) API, and Flink provides targeted optimizations for them. Flink also supports iterations in stream processing (DataStream), which is the subject of this section. As we will see, streaming iterations resemble batch iterations in some respects, but the differences are significant.

An iterative streaming program defines a step function and embeds it in an iterative stream (IterativeStream). Because a streaming program may never terminate, you cannot set a maximum number of iterations as you can in batch processing. Instead, you can specify the maximum time to wait for feedback input: if no feedback element arrives within that interval, the iteration terminates. By applying a split or filter transformation, you decide which part of the stream is fed back to the iteration head and which part is forwarded downstream. Here we use filter to show the API usage pattern of an iterative streaming program.

First, build an IterativeStream from the input stream. This is the start of the iteration, usually called the iteration head:

IterativeStream<Integer> iteration = inputStream.iterate();

Next, specify the transformations that express the logic executed in each iteration (a simple map transformation here). The UDF passed to map is the step function mentioned above:

DataStream<Integer> iteratedStream = iteration.map(/* this is executed many times */);

An iteration needs data fed back to the head for repeated computation, so we filter the iterated stream for the elements that meet a condition. We call the resulting partial stream the feedback stream:

DataStream<Integer> feedbackStream = iteratedStream.filter(/* one part of the stream */);

Feeding the feedback stream back to the iteration head completes the logic of the iteration and closes the loop. Calling the closeWith instance method of IterativeStream closes the iteration (in other words, it defines the iteration tail). The data stream passed to closeWith is fed back to the iteration head:

iteration.closeWith(feedbackStream);

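A common pattern is to also filter out the part of the stream that should be forwarded downstream. This filter in effect defines the termination condition: elements that satisfy it are sent downstream and do not take part in the next iteration: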

DataStream<Integer> output = iteratedStream.filter(/* some other part of the stream */);
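
The calls above form a loop: head, step function, feedback filter, closeWith, and output filter. As a rough illustration only, the following plain-Java sketch mimics that dataflow with an in-memory queue standing in for the feedback edge. It does not use Flink and is not how Flink implements iterations internally. The class name FeedbackLoopSketch, the "subtract one" step function, and the "greater than zero" feedback condition are all assumptions chosen for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical, Flink-free model of the iterate / closeWith pattern.
class FeedbackLoopSketch {

    // Runs every input element through the loop and returns what leaves it.
    static List<Integer> run(List<Integer> input) {
        // The iteration head receives the original input and the fed-back elements.
        Deque<Integer> head = new ArrayDeque<>(input);
        List<Integer> output = new ArrayList<>();

        while (!head.isEmpty()) {
            int iterated = head.poll() - 1; // step function (assumed): subtract one
            if (iterated > 0) {
                head.add(iterated);         // feedback filter: back to the head
            } else {
                output.add(iterated);       // output filter: forwarded downstream
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(3, 1, 2))); // prints [0, 0, 0]
    }
}
```

Unlike this sketch, a streaming job does not stop when the queue happens to be empty; it keeps waiting for feedback until the configured maximum wait time elapses.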

As with batch iterations, we use a practical example to see how streaming iterations differ from batch iterations.

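The problem is as follows. Generate a data stream of pairs (two-tuples) whose two fields are positive integers drawn from a fixed range and serve as the two initial values of a Fibonacci sequence. Then iterate on each pair to extend its Fibonacci sequence until a generated value is no longer below the given threshold, at which point the iteration stops and the number of iterations is output.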

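The example is adapted from the iteration example shipped with the Flink source code. It is small, yet it illustrates the point. The variable names in the original example code are somewhat confusing, so the author has adjusted them slightly for readability.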

Viewed from the perspective of a single element (a pair), the execution proceeds as shown in the figure below:

n is the iteration count, initialized to 0 in the initial map transformation; m is the threshold that determines when the iteration stops.

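In addition, the number after T is a field index: T2, for example, is the field at index 2, that is, the third field of the tuple. Note that T changes as the iteration proceeds.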

With the core process analyzed, we now build the iterative streaming program step by step.

First, create the initial stream object inputStream from a source function:

DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource());

The source function emits a sequence of pairs whose two fields are randomly generated and serve as the initial values of a Fibonacci sequence:

private static class RandomFibonacciSource
        implements SourceFunction<Tuple2<Integer, Integer>> {

    private final Random random = new Random();
    private volatile boolean isRunning = true;
    private int counter = 0;

    @Override
    public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        // Emit at most MAX_RANDOM_VALUE pairs, or stop earlier when cancelled.
        while (isRunning && counter < MAX_RANDOM_VALUE) {
            int first = random.nextInt(MAX_RANDOM_VALUE / 2 - 1) + 1;
            int second = random.nextInt(MAX_RANDOM_VALUE / 2 - 1) + 1;
            // Skip pairs whose first value is larger than the second.
            if (first > second) continue;
            ctx.collect(new Tuple2<Integer, Integer>(first, second));
            counter++;
            Thread.sleep(50);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
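
To make the value range of the generated pairs explicit, here is a small Flink-free sketch of the draw performed inside the source loop. The class name SeedPairSketch is hypothetical, and the value 100 for MAX_RANDOM_VALUE is an assumption; the article does not state the constant.

```java
import java.util.Random;

// Hypothetical, Flink-free sketch of the pair generation inside the source loop.
class SeedPairSketch {
    static final int MAX_RANDOM_VALUE = 100; // assumed; not given in the article

    // Draws candidates until first <= second, like the `continue` in the source.
    static int[] nextPair(Random random) {
        while (true) {
            // nextInt(n) returns a value in [0, n), so each draw lies in
            // [1, MAX_RANDOM_VALUE / 2 - 1], which is [1, 49] under the assumption.
            int first = random.nextInt(MAX_RANDOM_VALUE / 2 - 1) + 1;
            int second = random.nextInt(MAX_RANDOM_VALUE / 2 - 1) + 1;
            if (first <= second) {
                return new int[] {first, second};
            }
        }
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            int[] pair = nextPair(random);
            System.out.println("(" + pair[0] + "," + pair[1] + ")");
        }
    }
}
```

Because both seeds stay below half of the maximum, their first sum also stays below it, so every pair goes through at least one feedback round when the threshold equals the maximum.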

ΪÁ˶ÔмÆËãµÄì³²¨ÄÇÆõÊýÁÐÖеÄÖµÒÔ¼°Àۼӵĵü´ú´ÎÊý½øÐд洢£¬ÎÒÃÇÐèÒª½«¶þÔª×éÊý¾ÝÁ÷ת»»ÎªÎåÔª×éÊý¾ÝÁ÷£¬²¢¾Ý´Ë´´½¨µü´ú¶ÔÏó£º

IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =
        inputStream.map(new TupleTransformMapFunction()).iterate(5000);

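Note that the argument 5000 passed to iterate does not mean 5000 iterations. It sets the maximum time to wait for feedback input to 5 seconds. A stream is considered unbounded, so a maximum iteration count cannot be specified as in batch iterations. A maximum wait interval can be specified instead: if no element arrives within that interval, the iteration terminates.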
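
The difference between an iteration count and a wait time can be illustrated without Flink. The sketch below models the feedback edge as a BlockingQueue and stops once no element arrives within the wait time. It only illustrates the semantics and makes no claim about Flink's internal implementation; the class name FeedbackTimeoutSketch and the method drainUntilIdle are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, Flink-free model of "maximum wait time for feedback".
class FeedbackTimeoutSketch {

    // Consumes feedback elements until none arrives within maxWaitMillis.
    // Returns the number of elements processed before giving up.
    static int drainUntilIdle(BlockingQueue<Integer> feedback, long maxWaitMillis) {
        int processed = 0;
        try {
            // poll waits at most maxWaitMillis and returns null if nothing arrived
            while (feedback.poll(maxWaitMillis, TimeUnit.MILLISECONDS) != null) {
                processed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag and stop early
        }
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> feedback = new LinkedBlockingQueue<>();
        feedback.add(1);
        feedback.add(2);
        feedback.add(3);
        // The second argument is a wait time in milliseconds, not an iteration count.
        System.out.println(drainUntilIdle(feedback, 100)); // prints 3
    }
}
```

All three queued elements are processed regardless of the value 100; the value only bounds how long the loop stays alive once the feedback dries up.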

The map function that performs the tuple conversion:

private static class TupleTransformMapFunction extends
        RichMapFunction<Tuple2<Integer, Integer>,
                Tuple5<Integer, Integer, Integer, Integer, Integer>> {

    @Override
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
            Tuple2<Integer, Integer> inputTuples) throws Exception {
        return new Tuple5<Integer, Integer, Integer, Integer, Integer>(
                inputTuples.f0,  // f0: first initial value, kept unchanged
                inputTuples.f1,  // f1: second initial value, kept unchanged
                inputTuples.f0,  // f2: previous value of the sequence
                inputTuples.f1,  // f3: latest value of the sequence
                0);              // f4: iteration count
    }
}

In the five-tuple above, the elements at indexes 0 and 1 are always the two originally generated values and never change, while the last three fields change as the iteration proceeds.

Once the iterative stream iterativeStream has been created, we apply the Fibonacci step function to it to produce the stream fibonacciStream:

DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =
        iterativeStream.map(new FibonacciCalcStepFunction());

The name fibonacciStream is only a label. Its elements are not an actual Fibonacci sequence but the five-tuples described above.

The step function that computes the Fibonacci values is implemented as follows:

private static class FibonacciCalcStepFunction extends
        RichMapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
                Tuple5<Integer, Integer, Integer, Integer, Integer>> {

    @Override
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
            Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) throws Exception {
        return new Tuple5<Integer, Integer, Integer, Integer, Integer>(
                inputTuple.f0,                  // initial values stay as they are
                inputTuple.f1,
                inputTuple.f3,                  // old latest value becomes the previous value
                inputTuple.f2 + inputTuple.f3,  // next value of the sequence
                inputTuple.f4 + 1);             // count this iteration without mutating the input
    }
}

As noted above, the last three fields change. In each step the most recent value of the sequence, held in f3, is kept by moving it to f2; the old f2 and f3 are added to produce the new value, which is stored in f3; and f4 is incremented.

As the iterations accumulate, the whole sequence is not retained. Only the two initial elements and the two most recent elements are kept. There is no need to keep the full sequence, because only the two most recent elements have to be checked.
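
The field shuffling is easier to follow on concrete numbers. The following Flink-free sketch applies the same update to a plain int array instead of a Tuple5; the class name FibonacciStepSketch and the array layout are assumptions made for the illustration.

```java
import java.util.Arrays;

// Hypothetical, Flink-free version of the step function, using int[5] instead of Tuple5.
class FibonacciStepSketch {

    // Layout mirrors the five-tuple: {first, second, previous, latest, iterations}.
    static int[] step(int[] t) {
        return new int[] {
            t[0],         // f0: first initial value, never changes
            t[1],         // f1: second initial value, never changes
            t[3],         // f2: the old latest value becomes the previous value
            t[2] + t[3],  // f3: the next value of the sequence
            t[4] + 1      // f4: one more iteration performed
        };
    }

    public static void main(String[] args) {
        int[] t = {7, 14, 7, 14, 0}; // what the initial map emits for the pair (7, 14)
        for (int i = 0; i < 3; i++) {
            t = step(t);
            System.out.println(Arrays.toString(t));
        }
        // prints:
        // [7, 14, 14, 21, 1]
        // [7, 14, 21, 35, 2]
        // [7, 14, 35, 56, 3]
    }
}
```

Each record stays the same size no matter how many iterations it has been through, which is why only the last two values of the sequence are available for the threshold check.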

ÉÏÎÄÎÒÃǶÔÿ¸öÔªËØ¼ÆËãì³²¨ÄÇÆõÊýÁеÄÐÂÖµ²¢²úÉúÁËfibonacciStream£¬µ«ÊÇÎÒÃÇÐèÒª¶Ô×îеÄÁ½¸öÖµ½øÐÐÅжϣ¬¿´ËüÃÇÊÇ·ñ³¬¹ýÁËÖ¸¶¨µÄãÐÖµ¡£³¬¹ýÁËãÐÖµµÄÔª×齫»á±»Êä³ö£¬¶øÃ»Óг¬¹ýµÄÔò»áÔٴβÎÓëµü´ú¡£Òò´ËÕ⽫²úÉúÁ½¸ö²»Í¬µÄ·ÖÖ§£¬ÎÒÃÇҲΪ´Ë¹¹½¨ÁË·ÖÖ§Á÷£º

SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =
        fibonacciStream.split(new FibonacciOverflowSelector());

The selector that checks each tuple against the threshold and separates the two branches is implemented as follows:

private static class FibonacciOverflowSelector implements OutputSelector<
        Tuple5<Integer, Integer, Integer, Integer, Integer>> {

    @Override
    public Iterable<String> select(
            Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) {
        // Both of the two most recent values are still below the threshold: iterate again.
        if (inputTuple.f2 < OVERFLOW_THRESHOLD && inputTuple.f3 < OVERFLOW_THRESHOLD) {
            return Collections.singleton(ITERATE_FLAG);
        }
        // Otherwise the tuple leaves the loop.
        return Collections.singleton(OUTPUT_FLAG);
    }
}

In the select method, the branches are labeled with constant identifiers: ITERATE_FLAG (continue iterating) and OUTPUT_FLAG (output directly).
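
The routing decision itself can be tried in isolation. In the Flink-free sketch below, the class name BranchSelectorSketch and the concrete values of all three constants are assumptions, since the article does not show their definitions.

```java
// Hypothetical, Flink-free sketch of the routing decision made by the selector.
class BranchSelectorSketch {
    static final String ITERATE_FLAG = "iterate"; // assumed value
    static final String OUTPUT_FLAG = "output";   // assumed value
    static final int OVERFLOW_THRESHOLD = 100;    // assumed value

    // Returns the name of the branch for the two most recent values (f2, f3).
    static String select(int previous, int latest) {
        if (previous < OVERFLOW_THRESHOLD && latest < OVERFLOW_THRESHOLD) {
            return ITERATE_FLAG;
        }
        return OUTPUT_FLAG;
    }

    public static void main(String[] args) {
        System.out.println(select(56, 91));  // prints iterate
        System.out.println(select(91, 147)); // prints output
    }
}
```

A tuple leaves the loop as soon as either of the two values is no longer below the threshold; it does not need both values to have crossed it.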

With the split stream in place, we can select each branch for iteration or output. Elements that need another iteration are fed back to the iteration head through the closeWith method of the iterative stream:

iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));

Elements that need no further iteration flow downstream. Here we simply restructure the stream and print it:

DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream
        .select(OUTPUT_FLAG).map(new BuildOutputTupleMapFunction());
outputStream.print();

Restructuring here means reducing the five-tuple to a three-tuple, implemented as follows:

private static class BuildOutputTupleMapFunction extends RichMapFunction<
        Tuple5<Integer, Integer, Integer, Integer, Integer>,
        Tuple3<Integer, Integer, Integer>> {

    @Override
    public Tuple3<Integer, Integer, Integer> map(
            Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) throws Exception {
        // Keep the two initial values and the iteration count; drop the working fields.
        return new Tuple3<Integer, Integer, Integer>(
                inputTuple.f0,
                inputTuple.f1,
                inputTuple.f4);
    }
}

×îÖÕÎÒÃǽ«»áµÃµ½ÀàËÆÈçϵÄÊä³ö£º

(7,14,5)
(18,37,3)
(3,46,3)
(23,32,3)
(31,43,2)
(13,45,2)
(37,42,2)
...

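The first two integers are the two initial values of the Fibonacci sequence, and the third is the number of iterations needed before the two most recent values of the sequence are no longer both below the threshold.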
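
The third field can be recomputed by hand. The Flink-free sketch below repeats the step and the threshold check in a plain loop. The class name IterationCountSketch is hypothetical, and the threshold of 100 is an assumption: the article does not state OVERFLOW_THRESHOLD, although 100 is consistent with the sample output above.

```java
// Hypothetical, Flink-free recomputation of the iteration count for one pair.
class IterationCountSketch {

    static int iterationsUntilOverflow(int first, int second, int threshold) {
        int previous = first;
        int latest = second;
        int iterations = 0;
        do {
            int next = previous + latest; // same update as the step function
            previous = latest;
            latest = next;
            iterations++;
        } while (previous < threshold && latest < threshold); // same test as the selector
        return iterations;
    }

    public static void main(String[] args) {
        int threshold = 100; // assumed value of OVERFLOW_THRESHOLD
        int[][] pairs = {{7, 14}, {18, 37}, {31, 43}};
        for (int[] p : pairs) {
            System.out.println("(" + p[0] + "," + p[1] + ","
                    + iterationsUntilOverflow(p[0], p[1], threshold) + ")");
        }
        // prints:
        // (7,14,5)
        // (18,37,3)
        // (31,43,2)
    }
}
```

The do-while form reflects the order of operations in the pipeline: the step function always runs once before the selector sees the tuple.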

The complete main program is as follows:

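// Imports used by the listings in this article (Flink 1.x DataStream API).
import java.util.Collections;
import java.util.Random;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// MAX_RANDOM_VALUE, OVERFLOW_THRESHOLD, ITERATE_FLAG and OUTPUT_FLAG are class-level
// constants of the enclosing class; the article does not show their definitions.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment().setBufferTimeout(1);
    DataStream<Tuple2<Integer, Integer>> inputStream =
            env.addSource(new RandomFibonacciSource());
    IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =
            inputStream.map(new TupleTransformMapFunction()).iterate(5000);
    DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =
            iterativeStream.map(new FibonacciCalcStepFunction());
    SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =
            fibonacciStream.split(new FibonacciOverflowSelector());
    iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));
    DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream
            .select(OUTPUT_FLAG).map(new BuildOutputTupleMapFunction());
    outputStream.print();
    env.execute("Streaming Iteration Example");
}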
   