Editor's recommendation:
This article comes from CSDN. It mainly introduces iterable streams (IterativeStream), feedback streams, and related topics.
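Flink currently concentrates its iteration support on batch processing. The bulk iterations and delta iterations discussed earlier belong to the batch (DataSet) API, and Flink applies dedicated optimizations to iterations in batch jobs. Flink does, however, also support iterations in stream processing (DataStream). This section analyzes iterations in stream processing. We will see that they resemble batch iterations in some respects, but the differences are also quite clear.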
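An iterative streaming program lets you define a "step function" and embed it in an iterable stream (IterativeStream). Because a streaming program may never terminate, you cannot set a maximum number of iterations the way the batch iteration mechanism allows. Instead, you can specify the maximum time to wait for feedback input: if no feedback element arrives within that interval, the iteration terminates. By applying a split or filter transformation, you specify which part of the stream is fed back to the iteration head and which part is forwarded downstream. Here we use filter as the example to show the API usage pattern of an iterative streaming program.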
First, build an IterativeStream from the input stream. This is the start of the iteration and is usually called the iteration head:
IterativeStream<Integer> iteration = inputStream.iterate();
Next, we specify a series of transformations that express the logic executed during the iteration (here simply a map transformation as an example). The UDF accepted by the map API is the step function mentioned above:
DataStream<Integer> iteratedStream = iteration.map(/* this is executed many times */);
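Then, since this is an iteration, some data must be fed back to the iteration head for repeated computation. We therefore filter the iterated stream down to the elements that meet a condition; we call this partial stream the feedback stream: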
DataStream<Integer> feedbackStream = iteratedStream.filter(/* one part of the stream */);
Feeding the feedback stream back to the iteration head completes the logic of one iteration, which "closes" the loop. An iteration is closed (equivalently, its iteration tail is defined) by calling the instance method closeWith on the IterativeStream. The stream passed to closeWith is fed back to the iteration head:
iteration.closeWith(feedbackStream);
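In addition, a common pattern is to filter out the part of the stream that should continue forward. This filter actually defines the condition for leaving the iteration: elements that satisfy it are forwarded downstream and do not take part in the next iteration: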
DataStream<Integer> output = iteratedStream.filter(/* some other part of the stream */);
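To make the data flow of this pattern concrete, here is a minimal single-threaded sketch in plain Java, without Flink. A queue stands in for the feedback edge into the iteration head, one predicate plays the role of the feedback filter, and everything else leaves the loop like the complementary filter. The class name, the method name and the halving step function are hypothetical and chosen only for illustration; Flink's runtime executes the loop very differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// Single-threaded simulation of the iterate / map / filter / closeWith pattern.
// The deque stands in for the feedback edge into the iteration head.
// Illustration of the data flow only, not how Flink runs iterations.
class FeedbackLoopSketch {

    static List<Integer> run(List<Integer> input,
                             IntUnaryOperator stepFunction,
                             IntPredicate feedBack) {
        Deque<Integer> head = new ArrayDeque<>(input); // iteration head
        List<Integer> output = new ArrayList<>();       // forwarded downstream
        while (!head.isEmpty()) {
            int next = stepFunction.applyAsInt(head.poll()); // iteration.map(...)
            if (feedBack.test(next)) {
                head.add(next);   // closeWith(feedbackStream): back to the head
            } else {
                output.add(next); // the complementary filter: leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Hypothetical step function: keep halving while the result is at least 10.
        List<Integer> out = run(List.of(100, 7, 40), v -> v / 2, v -> v >= 10);
        System.out.println(out);
    }
}
```

Running main prints [3, 5, 6]: 100 and 40 go around the loop several times before dropping below 10, while 7 leaves after its first step.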
As with our analysis of batch iterations, we again take a concrete problem as the entry point to see how iterations in stream processing differ from those in batch processing.
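First, the problem to solve: generate a stream of 2-tuples, where both fields are positive integers drawn from a range and serve as the two initial values of a Fibonacci sequence. Each 2-tuple in this stream is then iterated repeatedly to extend its Fibonacci sequence until a generated value reaches a given threshold; at that point the iteration for that tuple stops and the number of iterations is output.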
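This case is adapted from the iteration example that ships with the Flink source code. The problem is small and still illustrates the point well. However, the variables in that example code are somewhat confusing, so the author has adjusted it slightly to make the program more readable.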
Viewed from the perspective of a single element (one 2-tuple), the execution of this case is shown in the figure below:

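Here n is the iteration count, initialized to 0 in the first map transformation, and m is the threshold that decides when the iteration stops.
The number after T is a field index: for example, T2 denotes the field at index 2 of the tuple, that is, its third field. Note that T changes from one iteration to the next.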
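Reading the per-element flow off the map functions and the selector shown later in this section, one trip around the loop can be written as the transition below, with T4 playing the role of n and the branch condition taken from FibonacciOverflowSelector:

$$
\begin{aligned}
\text{init:}\quad & (a, b) \mapsto (T_0, T_1, T_2, T_3, T_4) = (a,\ b,\ a,\ b,\ 0) \\
\text{step:}\quad & (T_0, T_1, T_2, T_3, T_4) \mapsto (T_0,\ T_1,\ T_3,\ T_2 + T_3,\ T_4 + 1) \\
\text{feed back if:}\quad & T_2 < m \ \land\ T_3 < m \\
\text{otherwise emit:}\quad & (T_0,\ T_1,\ T_4) = (a,\ b,\ n)
\end{aligned}
$$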
Having analyzed the core of the problem, we now build, step by step, the iterative streaming program that solves it.
First, we create the initial stream object inputStream from a source function:
DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource());
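This source function generates a sequence of 2-tuples whose two fields are randomly generated, with the first never larger than the second, and serve as the initial values of a Fibonacci sequence: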
// MAX_RANDOM_VALUE is a constant of the enclosing class (its value is not shown here).
private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
    private final Random random = new Random();
    private volatile boolean isRunning = true;
    private int counter = 0;

    @Override
    public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        // emit MAX_RANDOM_VALUE pairs, then finish
        while (isRunning && counter < MAX_RANDOM_VALUE) {
            // both values lie in [1, MAX_RANDOM_VALUE / 2 - 1]
            int first = random.nextInt(MAX_RANDOM_VALUE / 2 - 1) + 1;
            int second = random.nextInt(MAX_RANDOM_VALUE / 2 - 1) + 1;
            // keep only pairs with first <= second; otherwise draw again
            if (first > second) continue;
            ctx.collect(new Tuple2<>(first, second));
            counter++;
            Thread.sleep(50);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
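The generation loop of RandomFibonacciSource can be exercised on its own. The sketch below is a hypothetical plain-Java mirror of it without Flink: maxRandomValue stands in for MAX_RANDOM_VALUE, whose actual value is not shown in this section, and the fixed seed only makes runs repeatable. It shows two properties of the original loop: every value lies in [1, MAX_RANDOM_VALUE / 2 - 1], and a rejected draw (first > second) does not count toward the number of emitted pairs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Plain-Java mirror of the loop in RandomFibonacciSource (hypothetical helper).
// maxRandomValue plays the role of MAX_RANDOM_VALUE; it must be at least 4
// so that nextInt receives a positive bound.
class SeedPairsSketch {

    static List<int[]> generate(int maxRandomValue, long seed) {
        Random random = new Random(seed);
        List<int[]> pairs = new ArrayList<>();
        while (pairs.size() < maxRandomValue) {
            // nextInt(bound) returns 0..bound-1, so each value is in [1, maxRandomValue / 2 - 1]
            int first = random.nextInt(maxRandomValue / 2 - 1) + 1;
            int second = random.nextInt(maxRandomValue / 2 - 1) + 1;
            if (first > second) continue; // rejected draws are not counted
            pairs.add(new int[] {first, second});
        }
        return pairs;
    }

    public static void main(String[] args) {
        for (int[] p : generate(10, 42L)) {
            System.out.println("(" + p[0] + "," + p[1] + ")");
        }
    }
}
```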
To store the newly computed Fibonacci values and the accumulated iteration count, we need to convert the stream of 2-tuples into a stream of 5-tuples, and then create the iteration object from it:
IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =
        inputStream.map(new TupleTransformMapFunction()).iterate(5000);
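Note the argument 5000 passed to the iterate API in the snippet above: it does not mean 5000 iterations, but a maximum wait of 5000 milliseconds (5 seconds) for feedback input. Streams are considered unbounded, so you cannot specify a maximum number of iterations as with batch iterations. Instead, you can specify a maximum wait interval: if no element arrives within that interval, the iteration terminates.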
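The wait-based termination can be pictured with a blocking queue whose poll gives up after a timeout. The following is only an analogy under that assumption, not Flink's implementation: the hypothetical drainUntilIdle method keeps taking feedback records and returns once none has arrived within maxWaitMillis, the way the iteration head gives up after the interval passed to iterate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy for iterate(maxWaitTimeMillis): consume feedback until the queue has
// been idle for a whole wait window. Sketch of the semantics only.
class MaxWaitSketch {

    static List<Integer> drainUntilIdle(BlockingQueue<Integer> feedback, long maxWaitMillis) {
        List<Integer> seen = new ArrayList<>();
        try {
            while (true) {
                // poll(...) returns null if nothing arrives within maxWaitMillis
                Integer next = feedback.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
                if (next == null) {
                    return seen; // idle for too long: the "iteration" terminates
                }
                seen.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return seen;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> feedback = new LinkedBlockingQueue<>(List.of(1, 2, 3));
        long start = System.nanoTime();
        List<Integer> seen = drainUntilIdle(feedback, 200);
        long waitedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(seen + " after ~" + waitedMillis + " ms");
    }
}
```

With the three queued records, drainUntilIdle returns [1, 2, 3] only after sitting idle for one full wait window (about 200 ms in main), just as iterate(5000) lets an idle loop linger for up to 5 seconds before it ends.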
The map function that performs the tuple conversion:
private static class TupleTransformMapFunction extends RichMapFunction<
        Tuple2<Integer, Integer>,
        Tuple5<Integer, Integer, Integer, Integer, Integer>> {
    @Override
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
            Tuple2<Integer, Integer> inputTuple) throws Exception {
        // (seed 1, seed 2, previous value, latest value, iteration count)
        return new Tuple5<>(
                inputTuple.f0,
                inputTuple.f1,
                inputTuple.f0,
                inputTuple.f1,
                0);
    }
}
In this 5-tuple, the fields at indices 0 and 1 always hold the two originally generated values and never change, while the last three fields change as the iteration proceeds.
Once the iterative stream iterativeStream has been created, we apply the Fibonacci step function to it, producing the Fibonacci stream fibonacciStream:
DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =
        iterativeStream.map(new FibonacciCalcStepFunction());
The name fibonacciStream is only a label: its elements are not actual Fibonacci sequences but the 5-tuples described above.
The step function that computes the Fibonacci sequence is implemented as follows:
private static class FibonacciCalcStepFunction extends RichMapFunction<
        Tuple5<Integer, Integer, Integer, Integer, Integer>,
        Tuple5<Integer, Integer, Integer, Integer, Integer>> {
    @Override
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
            Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) throws Exception {
        return new Tuple5<>(
                inputTuple.f0,                 // first seed, unchanged
                inputTuple.f1,                 // second seed, unchanged
                inputTuple.f3,                 // previous latest value moves into f2
                inputTuple.f2 + inputTuple.f3, // next Fibonacci value
                inputTuple.f4 + 1);            // iteration count; the input is not mutated
    }
}
As described above, the last three fields change. Before the computation, the last element of the sequence (the value in f3) is kept by moving it into f2; then f2 + f3 produces the newest value, which becomes the new f3, and f4 is incremented.
Ëæ×ŵü´ú´ÎÊýÔö¼Ó£¬²»ÊÇÕû¸öÊýÁж¼»á±»±£Áô£¬Ö»ÓÐ×î³õµÄÁ½¸öÔªËØºÍ×îеÄÁ½¸öÔªËØ»á±»±£Áô£¬ÕâÀïҲû±ØÒª±£ÁôÕû¸öÊýÁУ¬ÒòΪÎÒÃDz»ÐèÒªÍêÕûµÄÊýÁУ¬ÎÒÃÇÖ»ÐèÒª¶Ô×îеÄÁ½¸öÔªËØ½øÐÐÅжϼ´¿É¡£
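For each element we have now computed the next Fibonacci value and produced fibonacciStream, but we still need to check whether the two newest values have reached the specified threshold. Tuples that have reached it are emitted, while those that have not take part in the iteration again. This produces two different branches, so we build a split stream for them: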
SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =
        fibonacciStream.split(new FibonacciOverflowSelector());
The check that separates tuples by whether they have reached the threshold is implemented as follows:
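// OVERFLOW_THRESHOLD, ITERATE_FLAG and OUTPUT_FLAG are constants of the enclosing class.
private static class FibonacciOverflowSelector
        implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
    @Override
    public Iterable<String> select(
            Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) {
        // keep iterating while both of the two newest values are below the threshold
        if (inputTuple.f2 < OVERFLOW_THRESHOLD && inputTuple.f3 < OVERFLOW_THRESHOLD) {
            return Collections.singleton(ITERATE_FLAG);
        }
        // otherwise the tuple leaves the loop
        return Collections.singleton(OUTPUT_FLAG);
    }
}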
In the select method, the two branches are marked with different constant identifiers: ITERATE_FLAG (keep iterating) and OUTPUT_FLAG (emit directly).
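The routing decision itself is a plain predicate on the two newest values. This hypothetical sketch mirrors the condition in FibonacciOverflowSelector.select; the flag strings and the threshold argument are stand-ins for ITERATE_FLAG, OUTPUT_FLAG and OVERFLOW_THRESHOLD, whose values are not shown in the article.

```java
// Mirrors the condition in FibonacciOverflowSelector.select(...) on an int[5]
// laid out as {f0, f1, f2, f3, f4}. Flag values are hypothetical stand-ins.
class OverflowRouteSketch {
    static final String ITERATE_FLAG = "iterate";
    static final String OUTPUT_FLAG = "output";

    static String select(int[] t, int threshold) {
        // both of the two newest values still below the threshold: go around again
        return (t[2] < threshold && t[3] < threshold) ? ITERATE_FLAG : OUTPUT_FLAG;
    }

    public static void main(String[] args) {
        int[][] batch = {{7, 14, 56, 91, 4}, {7, 14, 91, 147, 5}};
        for (int[] t : batch) {
            System.out.println(t[3] + " -> " + select(t, 100));
        }
    }
}
```

A tuple whose f3 equals the threshold already goes to the output branch, because the check is a strict "less than".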
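With the split stream in place, we can pick out the individual branches for iteration or output. The branch that needs another iteration is fed back to the iteration head through the closeWith method of the iterative stream: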
iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));
Tuples that need no further iteration flow straight downstream. Here we simply "restructure" the stream and then print it:
DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream
        .select(OUTPUT_FLAG)
        .map(new BuildOutputTupleMapFunction());
outputStream.print();
The restructuring simply shrinks the earlier 5-tuple back down to a 3-tuple, as follows:
private static class BuildOutputTupleMapFunction extends RichMapFunction<
        Tuple5<Integer, Integer, Integer, Integer, Integer>,
        Tuple3<Integer, Integer, Integer>> {
    @Override
    public Tuple3<Integer, Integer, Integer> map(
            Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) throws Exception {
        // (seed 1, seed 2, number of iterations)
        return new Tuple3<>(
                inputTuple.f0,
                inputTuple.f1,
                inputTuple.f4);
    }
}
In the end we get output similar to the following:
(7,14,5)
(18,37,3)
(3,46,3)
(23,32,3)
(31,43,2)
(13,45,2)
(37,42,2)
The first two integers are the initial values of the Fibonacci sequence; the third is the number of iterations needed before the two newest values of that sequence reach the threshold.
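The sample output can be checked by hand, or with the following plain-Java sketch that runs one seed pair through the same chain without Flink: initial map, step function, overflow check and output projection. The threshold value is an assumption: OVERFLOW_THRESHOLD is not shown in this section, but a threshold of 100 reproduces every sample line above. The do-while loop reflects that each element passes through the step function at least once before the selector sees it.

```java
// Runs one seed pair through init map -> step -> overflow check -> projection.
// Hypothetical helper; the threshold of 100 is an assumption, not from the source.
class SampleOutputCheck {

    static int[] run(int a, int b, int threshold) {
        int[] t = {a, b, a, b, 0};                                  // TupleTransformMapFunction
        do {
            t = new int[] {t[0], t[1], t[3], t[2] + t[3], t[4] + 1}; // FibonacciCalcStepFunction
        } while (t[2] < threshold && t[3] < threshold);             // ITERATE_FLAG branch loops back
        return new int[] {t[0], t[1], t[4]};                        // BuildOutputTupleMapFunction
    }

    public static void main(String[] args) {
        int[][] seeds = {{7, 14}, {18, 37}, {3, 46}, {23, 32}, {31, 43}, {13, 45}, {37, 42}};
        for (int[] s : seeds) {
            int[] out = run(s[0], s[1], 100);
            System.out.println("(" + out[0] + "," + out[1] + "," + out[2] + ")");
        }
    }
}
```

main prints the seven sample lines in the order listed. In the real job the order varies, because seeds arrive at random and need different numbers of trips around the loop.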
The complete main program is as follows:
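// Imports used by the complete example. SplitStream, OutputSelector and split()
// exist only in Flink 1.x releases that still ship them (they were deprecated
// in favor of side outputs).
import java.util.Collections;
import java.util.Random;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Inside the enclosing example class:
public static void main(String[] args) throws Exception {
    // flush network buffers at least every 1 ms so records circulate with low latency
    StreamExecutionEnvironment env = StreamExecutionEnvironment
            .getExecutionEnvironment().setBufferTimeout(1);
    DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource());
    // iteration head: the loop ends after 5000 ms without feedback
    IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =
            inputStream.map(new TupleTransformMapFunction()).iterate(5000);
    // step function
    DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =
            iterativeStream.map(new FibonacciCalcStepFunction());
    // split into the "iterate again" and "emit" branches
    SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =
            fibonacciStream.split(new FibonacciOverflowSelector());
    // iteration tail: feed the ITERATE_FLAG branch back to the head
    iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));
    DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream
            .select(OUTPUT_FLAG).map(new BuildOutputTupleMapFunction());
    outputStream.print();
    env.execute("Streaming Iteration Example");
}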