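Key takeaways
* Reactive programming is a specification for dealing with asynchronous streams of data
* Reactive provides tools for transforming and combining streams and for managing flow control
* Marble diagrams provide an interactive way to visualize reactive constructs
* The reactive programming style looks somewhat like the Java Streams API, but the two are fundamentally different
* How to attach to hot streams to process asynchronous data feeds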
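This article follows the same style as the earlier one and uses the same examples to introduce RxJava 2 step by step. The translator has marked the differences between the two articles in bold so that readers of the earlier article can skim this one quickly.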
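In the ongoing evolution of paradigms for highly concurrent programming, we have used many tools, such as the java.util.concurrent package, the Akka Streams framework, the CompletableFuture class, and the Netty framework. Reactive programming has recently become very popular, thanks to its power and its robust tool set.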
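Reactive programming is a specification for dealing with asynchronous streams of data. It provides tools for transforming and combining streams and for managing flow control, which makes it easier to reason about the overall design of a program.
But it is not easy to use, and the learning curve is far from flat. For the mathematicians among us, learning reactive is reminiscent of the leap from standard algebra with its scalars to linear algebra with its vectors, matrices, and tensors, which are essentially streams of data treated as a unit. Traditional programming is built around objects, whereas reactive is built around streams of events. Events can take many forms, such as objects, data feeds, mouse movements, or exceptions. In traditional programming, the word "exception" describes the handling of an unexpected situation: anything that does not happen as planned counts as an exception. In reactive programming, exceptions are first-class citizens. Because streams are generally asynchronous, throwing an exception makes no sense, so every exception is instead passed along as an event in the stream.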
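In this article we explore the fundamentals of reactive programming, taking a teaching-oriented approach to reinforce the important concepts.
The first thing to remember is that in reactive everything is a stream. Observable wraps a stream and is the most basic unit. A stream can contain zero or more events, may or may not have completed, and may end normally or with an error. Once a stream completes or fails, it is done, although there are tools for retrying after an error or for substituting a different stream for the failed one.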
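The lifecycle just described (zero or more events, followed by either completion or an error that is delivered as an event rather than thrown) can be modelled in a few lines of plain Java. This is only an illustrative sketch and not how RxJava is implemented; the names MiniStream and emit are hypothetical, and the "stream" is simply an Iterable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of a stream's lifecycle; not part of RxJava.
public class MiniStream {

    /**
     * Pushes each input through the mapper and records what a subscriber
     * would observe. A failure is not thrown to the caller: it is recorded
     * as a terminal "error" signal, after which nothing else is delivered.
     */
    public static <T, R> List<String> emit(Iterable<T> source, Function<T, R> mapper) {
        List<String> signals = new ArrayList<>();
        for (T item : source) {
            try {
                signals.add("next:" + mapper.apply(item));
            } catch (RuntimeException e) {
                signals.add("error:" + e.getClass().getSimpleName());
                return signals;              // an error ends the stream
            }
        }
        signals.add("complete");             // normal termination
        return signals;
    }

    public static void main(String[] args) {
        // prints [next:1, next:2, next:3, complete]
        System.out.println(emit(Arrays.asList("1", "2", "3"), s -> Integer.parseInt(s)));
        // prints [next:1, error:NumberFormatException]
        System.out.println(emit(Arrays.asList("1", "x", "3"), s -> Integer.parseInt(s)));
    }
}
```

In the second call the element "3" is never processed, which mirrors the rule that a stream is finished once it has signalled an error.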
Before running our examples, add the RxJava dependency to your project. With Maven, add the following dependency:
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.0.5</version>
</dependency>
The Observable class has a number of static factory methods and instance methods (operators), which are used to generate new Observables or to attach Observables to processes of interest. Observables are immutable, so operations on them always produce a new Observable. To make our examples easier to follow, let's first review the basic Observable operators that are used later in this article.
Observable.just creates an Observable that emits a single object and then completes. For example:
Observable.just("Howdy!")
This line creates a new Observable that emits a single event, the String "Howdy!", before completing.
You can assign the new Observable to an Observable variable:
Observable<String> hello = Observable.just("Howdy!");
But that alone does not get you very far. It is like the famous philosophical question: if a tree falls in a forest and nobody is around to hear it, it does not make a sound. An Observable needs a subscriber to handle the events it emits. Fortunately, Java now supports lambda expressions, so we can express the subscription in a concise, declarative style:
Observable<String> howdy = Observable.just("Howdy!");
howdy.subscribe(System.out::println);
This code prints the String "Howdy!".
Like other Observable methods, just is overloaded:
Observable.just("Hello", "World")
    .subscribe(System.out::println);
This code prints:
Hello
World
just accepts up to 10 arguments. Note that the output appears on two separate lines, which shows that these are two separate events.
Let's see what happens when we supply a list:
List<String> words = Arrays.asList(
    "the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dog"
);
Observable.just(words)
    .subscribe(System.out::println);
This code prints a rather unremarkable result:
[the, quick, brown, fox, jumped, over, the, lazy, dog]
We expected each word to be a separate event, but the whole list was treated as a single event. To get the result we want, we turn to the fromIterable method:
Observable.fromIterable(words)
    .subscribe(System.out::println);
(Note that RxJava 1 had a single overloaded from method. It has been replaced by several more specific variants, including fromIterable and fromArray.)
These methods convert an array or an Iterable into a series of events, one event per element.
Running this code gives the multi-line output we wanted:
the
quick
brown
fox
jumped
over
the
lazy
dog
It would be nice to number these lines, which takes a little more work on the Observable.
Before writing that code, let's look at two more operators, range and zip. range(i, n) creates a stream of n numbers, starting with i.
Observable.range(1, 5).subscribe(System.out::println);
Output:
1
2
3
4
5
If we had a way to combine this range stream with our word stream, the numbering problem would be solved.
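The RX Marbles website is a great help in learning reactive programming. The site uses JavaScript to render most of the reactive operations, and the renderings are interactive. Each operation uses "marbles" to depict one or more source streams and the result stream produced by the operation. Time runs from left to right, and events are represented by marbles. You can click or drag the marbles to see how they affect the result.
A quick look through the site turns up the zip operation, which is just what the doctor ordered. Let's use the marble diagram for zip to explain how it works.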

zip combines the elements of the source stream with the elements of another supplied stream, using a pairwise "zip" mapping that you can express as a lambda. As soon as either stream completes, the zipped stream completes as well, and any remaining events in the other stream are ignored. zip supports zipping up to nine source streams. The zipWith operator zips a supplied stream with an existing stream.
Coming back to our example, we can use range and zipWith to add the numbering, with String.format as the zip mapping:
Observable.fromIterable(words)
    .zipWith(Observable.range(1, Integer.MAX_VALUE),
        (string, count) -> String.format("%2d. %s", count, string))
    .subscribe(System.out::println);
This code prints:
 1. the
 2. quick
 3. brown
 4. fox
 5. jumped
 6. over
 7. the
 8. lazy
 9. dog
Looking good! Note that zip and zipWith stop pulling from all streams as soon as any one of them completes. That is why the Integer.MAX_VALUE upper bound is not a problem.
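The stopping rule can be made concrete with a small plain-Java model. The sketch below is not RxJava code: ZipModel and its zip method are hypothetical names, iterators stand in for streams, and an unbounded counter stands in for range(1, Integer.MAX_VALUE).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.IntStream;

// Hypothetical plain-Java model of pairwise zipping; not part of RxJava.
public class ZipModel {

    /** Pairs elements until either source runs out; leftovers are ignored. */
    public static <A, B, R> List<R> zip(Iterator<A> left, Iterator<B> right,
                                        BiFunction<A, B, R> combiner) {
        List<R> result = new ArrayList<>();
        while (left.hasNext() && right.hasNext()) {
            result.add(combiner.apply(left.next(), right.next()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("the", "quick", "brown");
        // A lazily generated, effectively endless counter: 1, 2, 3, ...
        Iterator<Integer> counter = IntStream.iterate(1, i -> i + 1).iterator();
        zip(words.iterator(), counter,
                (word, n) -> String.format("%2d. %s", n, word))
                .forEach(System.out::println);
    }
}
```

Only three numbers are ever pulled from the counter, because the loop ends as soon as the word iterator is exhausted.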
Now suppose we want to list the letters in the words rather than the words themselves. This is a job for flatMap, which takes the emissions of an Observable (objects, collections, or arrays), maps each of them to its own Observable, and then flattens all of those Observables into a single Observable.
In our example, we first use split to break each word into an array of its letters, and then use flatMap to create a new Observable that contains all of the letters of all of the words:
Observable.fromIterable(words)
    .flatMap(word -> Observable.fromArray(word.split("")))
    .zipWith(Observable.range(1, Integer.MAX_VALUE),
        (string, count) -> String.format("%2d. %s", count, string))
    .subscribe(System.out::println);
This code prints:
 1. t
 2. h
 3. e
 4. q
 5. u
 6. i
 7. c
 8. k
...
30. l
31. a
32. z
33. y
34. d
35. o
36. g
All the letters of all the words are here. But that is too much data; we want each letter to appear only once:
Observable.fromIterable(words)
    .flatMap(word -> Observable.fromArray(word.split("")))
    .distinct()
    .zipWith(Observable.range(1, Integer.MAX_VALUE),
        (string, count) -> String.format("%2d. %s", count, string))
    .subscribe(System.out::println);
This code prints:
 1. t
 2. h
 3. e
 4. q
 5. u
 6. i
 7. c
 8. k
 9. b
10. r
11. o
12. w
13. n
14. f
15. x
16. j
17. m
18. p
19. d
20. v
21. l
22. a
23. z
24. y
25. g
As children we were taught that the "quick brown fox" pangram contains every letter of the English alphabet, yet here we see only 25 letters, not 26. Let's sort the letters to find the missing one:
Observable.fromIterable(words)
    .flatMap(word -> Observable.fromArray(word.split("")))
    .distinct()
    .sorted()
    .zipWith(Observable.range(1, Integer.MAX_VALUE),
        (string, count) -> String.format("%2d. %s", count, string))
    .subscribe(System.out::println);
This code prints:
 1. a
 2. b
 3. c
...
17. q
18. r
19. t
20. u
21. v
22. w
23. x
24. y
25. z
It looks like the letter "s" is missing. To get the result we expect, we make a small change to the word list:
List<String> words = Arrays.asList(
    "the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dogs"
);
Observable.fromIterable(words)
    .flatMap(word -> Observable.fromArray(word.split("")))
    .distinct()
    .sorted()
    .zipWith(Observable.range(1, Integer.MAX_VALUE),
        (string, count) -> String.format("%2d. %s", count, string))
    .subscribe(System.out::println);
The modified code prints:
 1. a
 2. b
 3. c
 4. d
 5. e
 6. f
 7. g
 8. h
 9. i
10. j
11. k
12. l
13. m
14. n
15. o
16. p
17. q
18. r
19. s
20. t
21. u
22. v
23. w
24. x
25. y
26. z
That's much better!
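So far, though, all of this code looks very similar to the Streams API introduced in Java 8. The resemblance is purely coincidental, because reactive offers far more than this.
Java Streams and lambda expressions add great value to the language, but in the end they only provide a way to iterate over collections and produce new collections. Their scope is limited, and they lack extensibility and reusability. Although the Stream parallel operation can execute tasks in parallel, the program cannot intervene in the process until the result is returned. Reactive, in contrast, introduces the concepts of timing, throttling, and flow control, and reactive streams can be attached to processes that "never end". The output of a reactive stream is not a collection, but you can process the results in whatever way you need.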
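For comparison, here is a sketch of the same letter pipeline written with the Java 8 Streams API alone. The class and method names (StreamsComparison, distinctLetters) are made up for this illustration. The chain of operators looks almost identical, but the call blocks until the whole result has been collected into a finished List, and there is no notion of timing or flow control.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative JDK-only counterpart of the letter pipeline; names are hypothetical.
public class StreamsComparison {

    /** Distinct, sorted letters of the given words, collected eagerly into a List. */
    public static List<String> distinctLetters(List<String> words) {
        return words.stream()
                .flatMap(word -> Arrays.stream(word.split("")))
                .distinct()
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList(
                "the", "quick", "brown", "fox", "jumped", "over", "the", "lazy", "dogs");
        List<String> letters = distinctLetters(words);
        // prints 26 letters: abcdefghijklmnopqrstuvwxyz
        System.out.println(letters.size() + " letters: " + String.join("", letters));
    }
}
```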
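Let's use some more marble diagrams to get a better feel for these concepts.
The merge operator combines multiple source streams into a single result stream, preserving the order in which their events occur. There is no need to worry about race conditions, because all events, including error and completion events, are "flattened" onto a single thread.
The debounce operator treats several events that occur close together within a given time span as a single event, and only the last of them is emitted.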

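In the marble diagram for debounce, the gap between the "1" in the upper row and the "1" in the lower row is the specified time delay. The gaps between 2, 3, 4, and 5 are all shorter than that delay, so they are treated as a single event. If you move the "5" a little to the right, the result changes.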
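The two cases just described can be modelled on plain timestamps. The sketch below is a simplified model rather than RxJava's implementation: DebounceModel is a hypothetical name, the arrival times are invented, and an event that is followed by another exactly at the timeout boundary is assumed to survive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical timestamp-based model of debounce; not part of RxJava.
public class DebounceModel {

    /**
     * times[i] is the arrival time in milliseconds of values[i], in ascending order.
     * An event is kept only if the next event arrives at least timeoutMs later,
     * or if it is the final event of the stream.
     */
    public static List<Integer> debounce(long[] times, int[] values, long timeoutMs) {
        List<Integer> kept = new ArrayList<>();
        for (int i = 0; i < times.length; i++) {
            boolean last = (i == times.length - 1);
            if (last || times[i + 1] - times[i] >= timeoutMs) {
                kept.add(values[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 3, 4, 5, 6};
        // 2, 3, 4 and 5 arrive in a quick burst, so only 5 survives from the burst.
        // prints [1, 5, 6]
        System.out.println(debounce(new long[] {0, 300, 350, 400, 450, 800}, values, 100));
        // Moving 5 to the right leaves a long enough pause after 4, so 4 is emitted too.
        // prints [1, 4, 5, 6]
        System.out.println(debounce(new long[] {0, 300, 350, 400, 550, 800}, values, 100));
    }
}
```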

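Another interesting operator is amb (short for "ambiguous"), together with its array form, ambArray.
amb selects, from all of its input streams, the first stream to emit, and then ignores all the others. In the marble diagram for amb, the second stream emits first, so amb selects that stream.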

If you move the "20" in the first stream to the left, past the first element of the second stream, the result changes again.

This is useful if you have a process that needs to attach to a feed, for example one that reads from message topics supplied by Bloomberg or Reuters, and you do not care which one you get, as long as you pick one of them.
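The selection rule can be sketched in plain Java by reducing each stream to the time of its first emission. Everything in this model is hypothetical (the AmbModel and TimedStream names, the timings, and the values other than the "20" mentioned above); it illustrates the rule only and does not use RxJava.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of amb's "first to emit wins" rule; not part of RxJava.
public class AmbModel {

    /** A stream reduced to what matters for amb: when it first emits, and its values. */
    public static final class TimedStream {
        final long firstEmissionMs;
        final List<Integer> values;

        public TimedStream(long firstEmissionMs, List<Integer> values) {
            this.firstEmissionMs = firstEmissionMs;
            this.values = values;
        }
    }

    /** Returns the values of whichever stream emits first; all others are ignored. */
    public static List<Integer> amb(TimedStream... streams) {
        TimedStream winner = streams[0];
        for (TimedStream s : streams) {
            if (s.firstEmissionMs < winner.firstEmissionMs) {
                winner = s;
            }
        }
        return winner.values;
    }

    public static void main(String[] args) {
        TimedStream second = new TimedStream(10, Arrays.asList(1, 2, 3));
        // The second stream emits first, so it wins: prints [1, 2, 3]
        System.out.println(amb(new TimedStream(30, Arrays.asList(20, 40, 60)), second));
        // Moving "20" ahead of the second stream's first element: prints [20, 40, 60]
        System.out.println(amb(new TimedStream(5, Arrays.asList(20, 40, 60)), second));
    }
}
```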
Tick Tock
Now we can use these tools to produce meaningful results from streams. In the next example we have a feed that emits an event every second. To save CPU, however, we have it emit only once every three seconds on weekends. We use a hybrid "metronome" to produce data at the required rhythm.
First, we create a method that returns a boolean. It checks whether the current day falls on a weekend, returning true if it does and false otherwise:
private static boolean isSlowTickTime() {
    return LocalDate.now().getDayOfWeek() == DayOfWeek.SATURDAY ||
        LocalDate.now().getDayOfWeek() == DayOfWeek.SUNDAY;
}
Readers who are running this code in an IDE as they read may not want to wait until next weekend to check that it works. They can substitute the following implementation, which returns false for 15 seconds and then true for the next 15 seconds, in alternation:
private static long start = System.currentTimeMillis();
public static boolean isSlowTickTime() {
    return (System.currentTimeMillis() - start) % 30_000 >= 15_000;
}
Next we create two Observables, fast and slow, then apply filters to schedule them and merge them.
We use the Observable.interval operation for the scheduling. It emits a value once per specified time interval (counting up from 0).
Observable<Long> fast = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> slow = Observable.interval(3, TimeUnit.SECONDS);
fast emits an event every second, and slow emits an event every three seconds. (We ignore the value of each event, because we are only interested in the timing.)
Now we merge the two Observables, applying a filter to each so that the fast stream ticks on weekdays (or during one 15-second window) and the slow stream ticks on weekends (or during the other 15-second window).
Observable<Long> clock = Observable.merge(
    slow.filter(tick -> isSlowTickTime()),
    fast.filter(tick -> !isSlowTickTime())
);
Finally, we add a subscription that prints the time. When this code runs, it prints the system time according to our schedule.
clock.subscribe(tick-> System.out.println(new Date()));
To keep the program from exiting prematurely, add a line at the end of the method that keeps the main thread alive, for example a Thread.sleep call (and remember to handle the InterruptedException).
Running the code produces:
Fri Sep 16 03:08:18 BST 2016
Fri Sep 16 03:08:19 BST 2016
Fri Sep 16 03:08:20 BST 2016
Fri Sep 16 03:08:21 BST 2016
Fri Sep 16 03:08:22 BST 2016
Fri Sep 16 03:08:23 BST 2016
Fri Sep 16 03:08:24 BST 2016
Fri Sep 16 03:08:25 BST 2016
Fri Sep 16 03:08:26 BST 2016
Fri Sep 16 03:08:27 BST 2016
Fri Sep 16 03:08:28 BST 2016
Fri Sep 16 03:08:29 BST 2016
Fri Sep 16 03:08:30 BST 2016
Fri Sep 16 03:08:31 BST 2016
Fri Sep 16 03:08:32 BST 2016
Fri Sep 16 03:08:35 BST 2016
Fri Sep 16 03:08:38 BST 2016
Fri Sep 16 03:08:41 BST 2016
Fri Sep 16 03:08:44 BST 2016
. . .
You can see that the first 15 events are one second apart, and that the events in the following 15 seconds are three seconds apart, just as we expected.
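The schedule can also be checked without waiting for real time to pass. The following sketch simulates the merged clock on whole seconds in plain Java. It is a model under simplifying assumptions (both sources start at time zero and tick exactly on the second); TickTockModel and clockTicks are hypothetical names, and isSlowTickTime is rewritten to take the elapsed time as a parameter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the merged fast/slow clock; not part of RxJava.
public class TickTockModel {

    /** The 15-seconds-fast, 15-seconds-slow test predicate, applied to simulated time. */
    public static boolean isSlowTickTime(long elapsedMs) {
        return elapsedMs % 30_000 >= 15_000;
    }

    /** Seconds since start at which the merged clock ticks, up to and including untilSec. */
    public static List<Integer> clockTicks(int untilSec) {
        List<Integer> ticks = new ArrayList<>();
        for (int sec = 1; sec <= untilSec; sec++) {
            boolean slowWindow = isSlowTickTime(sec * 1000L);
            boolean fastTick = true;              // the fast source fires every second
            boolean slowTick = (sec % 3 == 0);    // the slow source fires every third second
            // Each source is filtered by the window it is allowed to tick in.
            if ((slowWindow && slowTick) || (!slowWindow && fastTick)) {
                ticks.add(sec);
            }
        }
        return ticks;
    }

    public static void main(String[] args) {
        System.out.println(clockTicks(33));
    }
}
```

In this model the clock ticks at every second from 1 through 15, then at 18, 21, 24 and 27, and returns to one-second ticks from second 30 onward, which matches the pattern in the output above.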
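Attaching to an existing feed
The approaches above are fine for creating Observables that produce static data. But how do you attach an Observable to an existing feed and benefit from reactive flow control and stream manipulation strategies?
Before going further, we should get to know some of the classes newly introduced in RxJava 2.
Cold Observables, Hot Observables, and Flowables
In earlier versions of RxJava, Observable came with flow control methods, even for small streams that did not need flow control. To conform to the reactive specification, RxJava 2 removes flow control from the Observable class and introduces the new Flowable class. A Flowable can be thought of as an Observable that provides flow control.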
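The Observables we have discussed so far are all cold Observables. They provide static data, and although we can adjust the timing, that is far from sufficient. A cold Observable only produces events when it has a subscriber, and every subscriber receives the same historical data regardless of when it subscribes. A hot Observable, in contrast, produces data no matter how many subscribers it has, and subscribers receive only the latest data (unless caching is used). A cold Observable can be converted into a hot one in two steps:
1. Call the Observable's publish method to produce a new ConnectableObservable
2. Call the ConnectableObservable's connect method to start emitting data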
This works, but it does not support any flow control. To attach to an existing long-running feed, we generally prefer a Flowable, which uses a syntax parallel to that of Observable but additionally provides backpressure control.
1a. Call the Flowable's publish method to produce a new ConnectableFlowable
2a. Call the ConnectableFlowable's connect method to start emitting data
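The difference between cold and hot sources can be illustrated with a small plain-Java model. This sketch does not use RxJava, and all of its names (HotColdModel, ColdSource, HotSource) are hypothetical. It shows only the replay behaviour a late subscriber sees, not publish, connect, or flow control.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of cold versus hot delivery; not part of RxJava.
public class HotColdModel {

    /** Cold: every subscriber triggers a fresh replay of the full data set. */
    static final class ColdSource {
        private final List<String> data;
        ColdSource(String... data) { this.data = Arrays.asList(data); }
        void subscribe(Consumer<String> subscriber) { data.forEach(subscriber); }
    }

    /** Hot: events are pushed as they happen, only to current subscribers. */
    static final class HotSource {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        void subscribe(Consumer<String> subscriber) { subscribers.add(subscriber); }
        void emit(String event) { subscribers.forEach(s -> s.accept(event)); }
    }

    public static List<String> lateSubscriberToCold() {
        ColdSource cold = new ColdSource("a", "b", "c");
        List<String> early = new ArrayList<>();
        cold.subscribe(early::add);
        List<String> late = new ArrayList<>();
        cold.subscribe(late::add);        // still receives the full history
        return late;
    }

    public static List<String> lateSubscriberToHot() {
        HotSource hot = new HotSource();
        List<String> late = new ArrayList<>();
        hot.emit("a");                    // nobody is subscribed yet, so "a" is lost
        hot.subscribe(late::add);
        hot.emit("b");
        hot.emit("c");
        return late;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberToCold());   // prints [a, b, c]
        System.out.println(lateSubscriberToHot());    // prints [b, c]
    }
}
```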
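To attach to an existing feed, you could (if you felt so inclined) add a listener to the feed that propagates events to subscribers by calling each subscriber's onNext method whenever an event occurs. Your listener implementation would have to make sure that each subscriber is still subscribed, and stop propagating events to it otherwise, and it would also have to respect backpressure signals. Fortunately, all of that work is handled by Flowable's create method. Suppose we have a market data service called SomeFeed that issues price tick events, and a SomeListener that listens for those price ticks as well as other lifecycle events. An implementation is available on GitHub; you can download it if you want to run the code yourself.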
Our feed listener has two methods:
public void priceTick(PriceTick event);
public void error(Throwable throwable);
The PriceTick class has date, instrument, and price fields, plus an isLast method that indicates whether it is the last tick.
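A minimal sketch of a class that fits this description might look as follows. The field types are assumptions inferred from how the values are printed later in the article (a java.util.Date, a String, and a double), and the constructor is invented for illustration; the actual implementation on GitHub may differ.

```java
import java.util.Date;

// Hypothetical sketch of the PriceTick value class; field types are assumed.
public class PriceTick {
    private final Date date;
    private final String instrument;
    private final double price;
    private final boolean last;

    public PriceTick(Date date, String instrument, double price, boolean last) {
        this.date = date;
        this.instrument = instrument;
        this.price = price;
        this.last = last;
    }

    public Date getDate() { return date; }

    public String getInstrument() { return instrument; }

    public double getPrice() { return price; }

    /** True when this is the final tick, after which the stream should complete. */
    public boolean isLast() { return last; }

    public static void main(String[] args) {
        PriceTick tick = new PriceTick(new Date(), "IBM", 123.45, false);
        System.out.printf("%s %4s %6.2f%n",
                tick.getDate(), tick.getInstrument(), tick.getPrice());
    }
}
```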

Let's see how to attach a Flowable to a live feed using the emitter supplied by Flowable.create:
SomeFeed<PriceTick> feed = new SomeFeed<>();
Flowable<PriceTick> flowable = Flowable.create(emitter -> {
    SomeListener listener = new SomeListener() {
        @Override
        public void priceTick(PriceTick event) {
            emitter.onNext(event);
            if (event.isLast()) {
                emitter.onComplete();
            }
        }

        @Override
        public void error(Throwable e) {
            emitter.onError(e);
        }
    };
    feed.register(listener);
}, BackpressureStrategy.BUFFER);
flowable.subscribe(System.out::println);
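This code is taken almost verbatim from the Javadoc of the Flowable class. The Flowable wraps the creation of the listener (the anonymous SomeListener) and its registration with the feed (the feed.register(listener) call). Subscribers are attached automatically by the Flowable. The events produced by the feed are delegated to the listener, which forwards them through emitter.onNext. The BackpressureStrategy.BUFFER argument tells the Flowable to buffer all event notifications until a subscriber consumes them. Besides buffering, there are several other backpressure strategies:
* BackpressureStrategy.MISSING applies no backpressure. If the downstream cannot keep up, a MissingBackpressureException or IllegalStateException may be thrown.
* BackpressureStrategy.ERROR signals a MissingBackpressureException when the downstream cannot keep up.
* BackpressureStrategy.DROP drops the incoming onNext value when the downstream cannot keep up.
* BackpressureStrategy.LATEST keeps only the latest onNext value until the downstream consumes it.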
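The effect of these strategies can be compared with a deliberately simplified plain-Java model. It is not RxJava code, and the names BackpressureModel and deliver are hypothetical. A burst of items arrives while the downstream has asked for only a few of them, and the strategy decides what happens to the rest. MISSING is left out, and the IllegalStateException thrown for ERROR is only a stand-in for the MissingBackpressureException error signal.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of overflow handling; not part of RxJava.
public class BackpressureModel {

    public enum Strategy { BUFFER, DROP, LATEST, ERROR }

    /**
     * Simulates items 1..burst arriving while the downstream has requested only
     * `requested` items. Returns everything the downstream eventually sees once
     * it catches up and drains whatever the strategy retained.
     */
    public static List<Integer> deliver(int burst, int requested, Strategy strategy) {
        List<Integer> delivered = new ArrayList<>();
        Deque<Integer> retained = new ArrayDeque<>();
        for (int item = 1; item <= burst; item++) {
            if (delivered.size() < requested) {
                delivered.add(item);          // demand is available: pass straight through
                continue;
            }
            switch (strategy) {
                case BUFFER:                  // keep every overflowing item
                    retained.addLast(item);
                    break;
                case DROP:                    // discard the overflowing item
                    break;
                case LATEST:                  // keep only the newest overflowing item
                    retained.clear();
                    retained.addLast(item);
                    break;
                case ERROR:                   // stand-in for the error signal
                    throw new IllegalStateException("downstream cannot keep up");
            }
        }
        delivered.addAll(retained);           // the downstream catches up later
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliver(5, 2, Strategy.BUFFER));   // prints [1, 2, 3, 4, 5]
        System.out.println(deliver(5, 2, Strategy.DROP));     // prints [1, 2]
        System.out.println(deliver(5, 2, Strategy.LATEST));   // prints [1, 2, 5]
    }
}
```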
All of this produces a cold Flowable. Like any cold Observable, it emits no data until there is a subscriber, and all subscribers receive the same historical data, which is not what we want.
To convert it into a hot Observable, so that all subscribers receive event notifications in real time, we must call publish and connect, as described earlier:
ConnectableFlowable<PriceTick> hotObservable = flowable.publish();
hotObservable.connect();
Finally, we can subscribe to it and display the price ticks:
hotObservable.subscribe((priceTick) ->
    System.out.printf("%s %4s %6.2f%n", priceTick.getDate(),
        priceTick.getInstrument(), priceTick.getPrice()));