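Editor's pick:
This article comes from jianshu. It introduces what a graph is, how to compute a graph's out-degree, the four modes of the replicated vertex view, partitioning strategies, and related topics. We hope you find it helpful.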
Preface
Attitude determines altitude! Make excellence a habit!
There is nothing in this world that one round of overtime cannot solve; if there is, work overtime twice! (Mao Qiang)
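What is a graph?
A network
[Figure: Network]
A tree
[Figure: Tree]
An RDBMS
[Figure: RDBMS]
A sparse matrix
[Figure: Sparse matrix network]
Or
[Figure: Kitchen sink]
Property graph
Vertices
[Figure: Vertices]
Edges
[Figure: Edges]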
GRAPHX
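GraphX is a graph computation engine, not a graph database. It can handle workloads such as inverted indexes, recommendation systems, shortest paths, and community detection.
Directed and undirected graphs
[Figure: Directed and undirected graphs]
Cyclic and acyclic graphs
The difference between the two is whether a closed loop can be formed by following the edge directions.
[Figure: Cyclic and acyclic graphs]
Labeled and unlabeled graphs
[Figure: Labeled and unlabeled graphs]
Pseudographs and loops
Start from a simple graph. Once several edges are allowed between the same two vertices, it becomes a multigraph; adding a loop (an edge from a vertex back to itself) turns it into a pseudograph. All graphs in GraphX are pseudographs.
[Figure: Pseudographs and loops]
Bipartite graphs
A bipartite graph has a special structure: the vertices are split into two sets, every edge connects a vertex in one set to a vertex in the other, and no edge exists inside either set.
[Figure: Bipartite graph]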
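As a small illustration of the bipartite definition, the following plain-Scala sketch (not GraphX code; the object and method names are hypothetical) checks whether an edge list respects a given split of the vertices into two sets. It assumes the two sets are disjoint.

```scala
object BipartiteCheck {
  /** True when every edge joins a vertex in `left` to a vertex in `right`
    * (in either direction). Assumes `left` and `right` are disjoint. */
  def isBipartite(left: Set[Long], right: Set[Long], edges: Seq[(Long, Long)]): Boolean =
    edges.forall { case (a, b) =>
      (left(a) && right(b)) || (right(a) && left(b))
    }
}
```

An edge between two vertices of the same set makes the check fail, which is exactly the "no edge inside a set" condition.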
RDF (Resource Description Framework) graphs and property graphs
[Figure: RDF graphs and property graphs]
Adjacency matrix
[Figure: Adjacency matrix]
SPARK GRAPHX
RDD

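[Figure: Data in GraphX]
A Graph in GraphX is made of two RDDs: an edge RDD and a vertex RDD.
Its UML is as follows:
[Figure: Graph UML]
Understanding triplets
A triplet is simply a valid (vertex, edge, vertex) combination, obtained through the triplets() interface.
[Figure: Triplet]
triplets() returns an RDD of EdgeTriplet[VD, ED]. The attributes exposed by EdgeTriplet[VD, ED] are:
[Figure: Attribute interface]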
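To make the (vertex, edge, vertex) idea concrete, here is a plain-Scala model of what a triplet carries. The case classes are simplified stand-ins written for this sketch, not the real GraphX classes, although the field names srcId, srcAttr, dstId, dstAttr and attr mirror those of EdgeTriplet.

```scala
object TripletSketch {
  // Simplified stand-ins for GraphX's Edge and EdgeTriplet (assumed shapes)
  case class Edge[ED](srcId: Long, dstId: Long, attr: ED)
  case class Triplet[VD, ED](srcId: Long, srcAttr: VD, dstId: Long, dstAttr: VD, attr: ED)

  /** Joins each edge with the attributes of its two endpoints. */
  def triplets[VD, ED](vertices: Map[Long, VD], edges: Seq[Edge[ED]]): Seq[Triplet[VD, ED]] =
    edges.map(e => Triplet(e.srcId, vertices(e.srcId), e.dstId, vertices(e.dstId), e.attr))
}
```

Each triplet is one edge enriched with the attributes of both endpoints, which is why building triplets requires vertex data to be available next to the edges.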
Understanding aggregateMessages
First, look at the source:
def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All): VertexRDD[A] = {
  aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}
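EdgeContext
[Figure: EdgeContext]
The two methods to focus on are sendToSrc and sendToDst:
[Figure: sendMsg]
One sends data from the triplet to the source vertex; the other sends data from the triplet to the destination vertex. This makes it possible to aggregate at either the source or the destination vertex.
An example:
graph.aggregateMessages[Int](_.sendToSrc(1), _ + _).foreach(println)
This example computes the out-degree of every vertex.
sendToSrc(1) sends a 1 to the source vertex of every triplet.
For the triplet in this figure:
[Figure: Triplet]
a 1 is sent to vertex 2.
_ + _ sums the messages that arrive at each vertex.
For example, vertex 5 in the figure below takes part in 4 triplets. With sendToSrc it receives a 1 only from the triplets in which it is the source, so its aggregate is 1 + 1 = 2, which is its out-degree.
[Figure: Graph]
[Figure: Result of the out-degree computation]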
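The send-then-merge mechanics can be modeled in a few lines of plain Scala. This is only a sketch of the idea on an in-memory edge list, not the GraphX implementation; the object name and the edge list in the usage note are made up for illustration.

```scala
object DegreeSketch {
  /** Out-degree per vertex: each edge "sends" a 1 to its source, and the 1s are summed. */
  def outDegrees(edges: Seq[(Long, Long)]): Map[Long, Int] =
    edges
      .map { case (src, _) => (src, 1) }   // plays the role of sendToSrc(1)
      .groupBy(_._1)
      .map { case (id, msgs) => id -> msgs.map(_._2).reduce(_ + _) } // plays the role of _ + _

  /** In-degree per vertex: the same idea with the message sent to the destination. */
  def inDegrees(edges: Seq[(Long, Long)]): Map[Long, Int] =
    outDegrees(edges.map(_.swap))
}
```

With the hypothetical edges 5->1, 5->2, 3->5 and 4->5, vertex 5 takes part in four triplets but gets an out-degree of 2. Vertices that receive no message (here vertex 1 for out-degree) do not appear in the result at all, which matches how aggregateMessages only returns vertices that received at least one message.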
Pregel
First, the source:
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
   (graph: Graph[VD, ED],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either)
   (vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED]
(1) graph:
The input graph.
(2) initialMsg:
The initial message. In the first iteration it is used to initialize every vertex: when pregel is called, it first applies mapVertices to the graph to update each vertex from initialMsg. How the update happens is decided by the vprog parameter, which receives initialMsg as its message argument and returns the new vertex value.
(3) maxIterations:
The maximum number of iterations.
(4) activeDirection:
The active direction. First, what active vertices and active messages are: an active vertex is one that received a message in a given iteration, after pregel called the graph's aggregateMessages method with sendMsg and mergeMsg; the active messages are all the messages successfully received in that iteration. As a result, some edges have an active src vertex, some have an active dst vertex, and some have both endpoints active. If activeDirection is EdgeDirection.Out, then in the next iteration sendMsg runs only on the out-edges (src -> dst) of vertices that received a message; in other words, edge triplets whose only active endpoint is dst are filtered out before they reach the sendMsg callback.
EdgeDirection.Out: sendMsg gets called if srcId received a message during the previous iteration, meaning this edge is considered an "out-edge" of srcId.
EdgeDirection.In: sendMsg gets called if dstId received a message during the previous iteration, meaning this edge is considered an "in-edge" of dstId.
EdgeDirection.Either: sendMsg gets called if either srcId or dstId received a message during the previous iteration.
EdgeDirection.Both: sendMsg gets called if both srcId and dstId received messages during the previous iteration.
(5) vprog:
The vertex program. Once at the start and then after every iteration, pregel takes the messages from the previous round together with vprog and calls joinVertices on the graph to transform every vertex that received a message. Except for the initial call, this function runs only on vertices that received a message. This can be seen in the source, which uses joinVertices(messages)(vprog), so vertices without a message are left out of the join.
(6) sendMsg:
The message-sending function. Its argument is a context that represents one edge; when pregel calls aggregateMessages, it converts the EdgeContext into an EdgeTriplet (ctx.toEdgeTriplet). Through the returned Iterator[(VertexId, A)] the user specifies which messages to send, to which vertices, and with what content. Because one edge can carry several messages (toward dst and toward src), the return type is an Iterator. Each element is a tuple: the VertexId is the id of the receiving vertex, which must be either the srcId or the dstId of that edge, and A is the content. To send a message A from src to dst, return Iterator((dstId, A)); to send nothing, return Iterator.empty.
(7) mergeMsg:
The logic for merging messages when a vertex receives several of them. Note how it differs from vprog: mergeMsg only combines message contents and does not write the result into the vertex, whereas vprog uses the received message (the output of mergeMsg) to update the vertex attribute.
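How the parameters interact is easier to see in a toy version of the loop. The following plain-Scala sketch models the Pregel cycle on in-memory collections. It is a simplification written for this article, not the GraphX source: it always behaves like EdgeDirection.Either, and the Edge and Triplet case classes are stand-ins for the real types.

```scala
object PregelSketch {
  case class Edge[ED](srcId: Long, dstId: Long, attr: ED)
  case class Triplet[VD, ED](srcId: Long, srcAttr: VD, dstId: Long, dstAttr: VD, attr: ED)

  def run[VD, ED, A](vertices: Map[Long, VD], edges: Seq[Edge[ED]],
                     initialMsg: A, maxIterations: Int)(
      vprog: (Long, VD, A) => VD,
      sendMsg: Triplet[VD, ED] => Iterator[(Long, A)],
      mergeMsg: (A, A) => A): Map[Long, VD] = {
    // Initial step: every vertex receives initialMsg through vprog
    var state = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
    var active: Set[Long] = state.keySet
    var i = 0
    while (active.nonEmpty && i < maxIterations) {
      // sendMsg runs on edges that touch an active vertex (mimics EdgeDirection.Either)
      val sent = edges
        .filter(e => active(e.srcId) || active(e.dstId))
        .flatMap(e => sendMsg(Triplet(e.srcId, state(e.srcId), e.dstId, state(e.dstId), e.attr)))
      // mergeMsg combines all messages addressed to the same vertex
      val inbox: Map[Long, A] =
        sent.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).reduce(mergeMsg) }
      // vprog runs only on vertices that received a message
      state = state ++ inbox.map { case (id, m) => id -> vprog(id, state(id), m) }
      active = inbox.keySet
      i += 1
    }
    state
  }
}
```

The loop stops when no vertex received a message or when maxIterations is reached. Note that mergeMsg never touches vertex state; only vprog does.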
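(Shortest path algorithm)
The figure shows how Dijkstra's shortest path algorithm works.
a. Initially S contains only the source, S = {v}, and the distance of v is 0. U contains every other vertex, U = {remaining vertices}. If v has an edge to a vertex u in U, the distance of u is the weight of the edge <v,u>; if u is not an out-neighbor of v, its distance is ∞.
b. Pick from U the vertex k with the smallest distance from v and add k to S (the selected distance is the length of the shortest path from v to k).
c. With k as the new intermediate vertex, update the distances of the vertices in U: if the distance from the source v to a vertex u through k is shorter than its current distance (not through k), set the distance of u to the distance of k plus the weight of the edge <k,u>.
d. Repeat steps b and c until every vertex is in S.
[Figure: Shortest path]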
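Steps a to d can be sketched in plain Scala as follows. This is a minimal in-memory version for illustration, not GraphX code; it starts with every vertex in U and lets the first round settle the source, which is equivalent to starting from S = {v}. Edge weights are assumed to be non-negative.

```scala
object DijkstraSketch {
  /** edges: (src, dst, weight). Returns the distance from `source` to every vertex. */
  def shortestDistances(vertices: Set[Long],
                        edges: Seq[(Long, Long, Double)],
                        source: Long): Map[Long, Double] = {
    // Step a: the source has distance 0, every other vertex starts at infinity
    var dist: Map[Long, Double] =
      vertices.map(v => v -> (if (v == source) 0.0 else Double.PositiveInfinity)).toMap
    var u: Set[Long] = vertices // vertices not yet settled
    while (u.nonEmpty) {
      // Step b: pick the unsettled vertex k closest to the source and settle it
      val k = u.reduce((a, b) => if (dist(a) <= dist(b)) a else b)
      u -= k
      // Step c: relax every edge k -> dst whose target is still unsettled
      for ((src, dst, w) <- edges if src == k && u(dst))
        if (dist(k) + w < dist(dst)) dist += dst -> (dist(k) + w)
    } // Step d: repeat until U is empty
    dist
  }
}
```

Unreachable vertices keep the distance infinity.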
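In GraphX
GraphX uses vertex cuts to partition a graph across the cluster.
[Figure: Edge cut vs. vertex cut]
Rather than splitting the graph along edges, GraphX partitions it along vertices, which can reduce both communication and storage overhead. Logically, this corresponds to assigning edges to machines and allowing vertices to span multiple machines. The exact method of assigning edges depends on the PartitionStrategy, and the various heuristics involve several trade-offs. Users can choose between strategies by repartitioning the graph with the Graph.partitionBy operator. The default partitioning strategy is to use the initial partitioning of the edges as provided at graph construction (hash partitioning on the srcId of each edge, spreading the edge data over multiple partitions in the cluster). In addition, the vertex RDD holds routing information from vertices to edge RDD partitions: the routing table. The routing table lives in the partitions of the vertex RDD and records how the vertices of each partition relate to all the edge RDD partitions. When the edge RDD needs vertex data (for example, to build edge triplets), the vertex RDD uses the routing table to send vertex data to the edge RDD partitions.
[Figure: Partitioning]
The figure below shows the vertex RDD, edge RDD, and routing table obtained after decomposing a graph with the vertex-cut method.
[Figure: Partitioning explained]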
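The relationship between edge partitions and the routing table can be modeled in plain Scala. The sketch below is an illustration only: partitioning by `srcId % numParts` is an assumed, simplified strategy (vertex ids are assumed non-negative), and the names are hypothetical rather than GraphX internals.

```scala
object VertexCutSketch {
  type Edge = (Long, Long) // (srcId, dstId)

  /** Assigns each edge to a partition based on its source id (simplified strategy). */
  def partitionEdges(edges: Seq[Edge], numParts: Int): Map[Int, Seq[Edge]] =
    edges.groupBy { case (src, _) => (src % numParts).toInt }

  /** Routing table: for every vertex, the edge partitions that mention it. */
  def routingTable(parts: Map[Int, Seq[Edge]]): Map[Long, Set[Int]] =
    parts.toSeq
      .flatMap { case (pid, es) => es.flatMap { case (s, d) => Seq(s -> pid, d -> pid) } }
      .groupBy(_._1)
      .map { case (vid, ps) => vid -> ps.map(_._2).toSet }
}
```

A vertex whose routing entry lists more than one partition is a vertex that has been "cut": its data must be shipped to each of those edge partitions when triplets are built.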
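GraphX uses the routing table to build, from the vertex RDD, a replicated vertex view (ReplicatedVertexView) that matches the partitions of the edge RDD. It serves as an intermediate RDD that ships vertex data to the edge RDD partitions. The replicated vertex view is an RDD that is partitioned like the edge RDD and carries vertex data: as the figure below shows, replicated vertex partition A carries all the vertices that appear in edge RDD partition A, and it is co-partitioned with the edge RDD (same number of partitions, same partitioning method). During graph computation, GraphX zips the replicated vertex view with the edge RDD partition by partition (zipPartitions), pairing their partitions one to one. This joins the edges with the vertex data, so each edge partition has the vertex data it needs. In the whole process of forming edge triplets, data moves between partitions only when the replicated vertex view is built from the vertex RDD; the zip itself moves neither vertex data nor edge data. Vertex data is usually much smaller than edge data, and the number of vertices that need updating shrinks as the iterations proceed, so the vertex data carried by the replicated vertex view shrinks as well. This greatly reduces data movement in the cluster and speeds up execution.
[Figure: Replicated vertex view]
The replicated vertex view has four modes:
(1) bothAttr: the computation needs the data of both the source and the destination vertex of each edge
(2) srcAttrOnly: the computation needs only the data of the source vertex of each edge
(3) destAttrOnly: the computation needs only the data of the destination vertex of each edge
(4) noAttr: the computation needs no vertex data
[Figure: bothAttr]
[Figure: srcAttrOnly]
[Figure: destAttrOnly]
[Figure: noAttr]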
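Once created, the replicated vertex view is kept in memory, because it may be used many times during graph computation. If the program no longer needs it, unpersistVertices in GraphImpl has to be called manually to remove it from memory.
When the replicated vertex view is generated, each partition of the edge RDD builds a collection holding the IDs of the source and destination vertices it contains; this collection is called the local vertex ID map (local VertexId Map). If the replicated vertex view is being created for the first time, the local vertex ID map is combined with the vertex data sent to each edge RDD partition, the vertex data is stored in each partition indexed by that partition's local vertex ID map, new vertex partitions are produced, and the result is a new vertex RDD. If it is not the first creation, the vertex RDD built by the previous replicated vertex view is joined with the vertex update data sent to each edge RDD partition, which updates the vertex data and produces a new vertex RDD.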
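GraphX stores vertex data and edge data as arrays inside the partitions of the vertex RDD and edge RDD, so that element access stays fast. GraphX also builds a number of index structures inside the partitions for quick access to vertex or edge data. The structure of the graph does not change during iteration, so the index structures in the vertex RDD, the edge RDD, and the replicated vertex view can all be reused; when one graph is derived from another, only the data arrays of the vertex RDD and edge RDD need to be updated. This reuse of index structures keeps GraphX fast, and it is the main reason GraphX performs far better than a graph model built directly on plain RDDs.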
Overview of partitioning strategies
[Figure: Partitioning strategies]
Algorithms
Shortest path algorithm
import org.apache.spark.graphx._

// Assumes `graph` is an already loaded Graph whose edge attribute is a Double weight
val sourceId: VertexId = 5L
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(
  Double.PositiveInfinity,
  activeDirection = EdgeDirection.Out
)(
  // vprog: handle the message arriving at a vertex; keep the smaller value
  (vertexId, vertexValue, msg) => math.min(vertexValue, msg),
  // sendMsg: if the source's distance plus the edge weight is smaller than the
  // destination's current distance, the path through the source is shorter, so send it
  triplet => {
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  // mergeMsg: combine all messages arriving at a vertex by keeping the minimum
  (a, b) => math.min(a, b)
)
println(sssp.vertices.collect.mkString("\n"))
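The code above computes the shortest path from vertex 5 to every vertex reachable from it.
Walkthrough: initialGraph first maps over all vertices, setting the attribute of the chosen source vertex to 0.0 and every other vertex to positive infinity. The Double.PositiveInfinity passed to pregel is the initial message. In the first iteration pregel initializes every vertex by running vprog = (vertexId, vertexValue, msg) => math.min(vertexValue, msg) on it (vprog handles the message arriving at a vertex and keeps the smaller value), so after initialization vertex 5 still has 0.0 and all other vertices have positive infinity. activeDirection = EdgeDirection.Out restricts message sending to the out-edges of vertices that received a message. The triplet function sends a message only when triplet.srcAttr + triplet.attr < triplet.dstAttr, so a vertex is updated only when a shorter path has been found. Finally, (a, b) => math.min(a, b) merges the messages arriving at the same vertex in one iteration and keeps the shortest candidate distance. The iteration continues until the whole graph has been scanned and no more messages are sent, which yields the shortest distance to every reachable vertex.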
Finding all vertices within 2 hops of a target vertex
val friends = Pregel(
  // Initial graph: the source vertex (id 2) gets 2, every other vertex gets -1
  graph.mapVertices((vid, value) => if (vid == 2) 2 else -1),
  -1, // initialMsg
  2,  // maxIterations
  EdgeDirection.Either
)(
  // vprog: compare the incoming value with the current one and keep the larger
  vprog = (vid, attr, msg) => math.max(attr, msg),
  sendMsg = edge => {
    if (edge.srcAttr <= 0) {
      if (edge.dstAttr <= 0) {
        Iterator.empty // neither endpoint has been reached from the source yet
      } else {
        // destination is positive: send its value minus one to the source
        Iterator((edge.srcId, edge.dstAttr - 1))
      }
    } else {
      if (edge.dstAttr <= 0) {
        // source is positive: send its value minus one to the destination
        Iterator((edge.dstId, edge.srcAttr - 1))
      } else {
        Iterator.empty // both positive: already within two hops, nothing to do
      }
    }
  },
  // mergeMsg: when several values reach one vertex keep the larger, since it is closer to the source
  mergeMsg = (a, b) => math.max(a, b)
).subgraph(vpred = (vid, v) => v >= 0)
friends.vertices.collect.foreach(println(_))
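Walkthrough: first the target vertex gets the value 2 and every other vertex gets -1. The initial message (-1) of the first iteration passes every vertex through vprog = (vid, attr, msg) => math.max(attr, msg), which leaves these values unchanged. In the remaining iterations the conditions in the edge function decide what is sent:
(1) source <= 0 and destination <= 0: no message is sent
(2) source <= 0 and destination > 0: the destination value minus 1 is sent to the source vertex
(3) source > 0 and destination > 0: no message is sent
(4) source > 0 and destination <= 0: the source value minus 1 is sent to the destination vertex
In other words, messages travel only between a vertex with a positive value and one without.
[Figure: Initial graph]
[Figure: Conditional traversal]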
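The four cases can be written down as a small pure function and checked in isolation. This is a plain-Scala restatement of the rule for illustration (the object name is hypothetical), with Option standing in for the Iterator used by Pregel.

```scala
object TwoHopRule {
  /** Message emitted for one edge, given the current values of its endpoints.
    * Returns Some((receiverId, value)) or None when nothing is sent. */
  def sendMsg(srcId: Long, srcAttr: Int, dstId: Long, dstAttr: Int): Option[(Long, Int)] =
    (srcAttr > 0, dstAttr > 0) match {
      case (false, true) => Some(srcId -> (dstAttr - 1)) // destination reached, source not yet
      case (true, false) => Some(dstId -> (srcAttr - 1)) // source reached, destination not yet
      case _             => None                         // both positive or both non-positive
    }
}
```

Because a vertex two hops away ends up with the value 0, it falls on the "not positive" side of the rule and does not propagate any further, which is what limits the result to two hops.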
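Algorithms
pageRank
The algorithm itself is not covered in detail here (plenty of material is available through Baidu or Google); we go straight to the GraphX-based code.
This assumes you have already loaded a graph.
graph.pageRank(0.001, 0.15)
  .vertices              // list all vertices
  .sortBy(_._2, false)   // sort by PageRank in descending order
  .take(20)              // take the top 20
  .foreach(println)
The parameters are simple: 0.001 is the tolerance, the condition for stopping the iteration of the formula below; 0.15 is the reset (random jump) probability, which is also the default value, i.e. resetProb in the formula.
[Figure: Formula]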
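To show how the tolerance and resetProb enter the computation, here is a plain-Scala sketch that iterates the update rule PR[i] = resetProb + (1 - resetProb) * sum(PR[j] / outDeg[j]) over the in-neighbors j of i, which is the rule given in the GraphX PageRank documentation as far as I know. It is a straightforward synchronous iteration on an in-memory edge list, not the GraphX implementation, and all names are made up for this sketch.

```scala
object PageRankSketch {
  /** Iterates the update rule until no rank changes by more than `tol`. */
  def run(vertices: Set[Long], edges: Seq[(Long, Long)],
          tol: Double, resetProb: Double = 0.15): Map[Long, Double] = {
    val outDeg: Map[Long, Int] = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
    var ranks: Map[Long, Double] = vertices.map(v => v -> 1.0).toMap
    var maxDelta = Double.PositiveInfinity
    while (maxDelta > tol) {
      // Each source spreads its rank evenly over its out-edges
      val incoming: Map[Long, Double] = edges
        .map { case (src, dst) => dst -> (ranks(src) / outDeg(src)) }
        .groupBy(_._1)
        .map { case (v, cs) => v -> cs.map(_._2).sum }
      val next = ranks.map { case (v, _) =>
        v -> (resetProb + (1 - resetProb) * incoming.getOrElse(v, 0.0))
      }
      // Largest change of any rank in this round; the loop exits once it is within tol
      maxDelta = vertices.foldLeft(0.0)((m, v) => math.max(m, math.abs(next(v) - ranks(v))))
      ranks = next
    }
    ranks
  }
}
```

A vertex with no incoming edges ends up with exactly resetProb, and a smaller tolerance simply means more rounds before the loop exits.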
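Personalized pageRank
This algorithm is mainly used for recommendations. In a social network, for example, you may want to recommend another person to a given user, and the recommended person should be someone that user is interested in. Likewise, for user-product recommendations, users and products are two kinds of entities that form a graph, and products can be recommended to a specific user based on it.
graph.personalizedPageRank(34175, 0.001) // the user in question is vertex 34175
  .vertices
  .filter(_._1 != 34175)
  .reduce((a, b) => if (a._2 > b._2) a else b) // the vertex that 34175 is most interested in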
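Triangle counting
Use cases for triangle counting: large-scale community detection. The algorithm can be used to detect groups; in a social network these are the tightly knit clusters whose members have many, often complicated, relationships with one another. The more triangles a user belongs to, the more connections that user has, which makes it possible to detect cliques, factions, and similar small groups. It can also support recommendations and help identify rumor spreaders (by tracing a rumor back through the graph). The algorithm is a good fit for anything that involves detecting small groups at large scale.
graph.triangleCount()
  .vertices
  .sortBy(_._2, false)
  .take(20)
  .foreach(println)
This finds the vertices that take part in the most triangles.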
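What "triangles per vertex" means can be shown with a plain-Scala sketch on an in-memory edge list. It treats the graph as undirected and ignores self-loops and duplicate edges; it is an illustration of the idea, not the GraphX implementation.

```scala
object TriangleSketch {
  /** Number of triangles through each vertex that has at least one edge. */
  def triangleCounts(edges: Seq[(Long, Long)]): Map[Long, Int] = {
    // Undirected neighbor sets, without self-loops
    val nbrs: Map[Long, Set[Long]] = edges
      .filter { case (a, b) => a != b }
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (v, ps) => v -> ps.map(_._2).toSet }
    nbrs.map { case (v, ns) =>
      // For each neighbor n, count the neighbors it shares with v;
      // every triangle through v is seen twice this way
      val closedPairs = ns.toSeq.map(n => (nbrs(n) & ns).size).sum
      v -> (closedPairs / 2)
    }
  }
}
```

For the edges 2-5, 5-3, 3-2, 4-5 and 6-7, vertices 2, 3 and 5 each belong to one triangle and the rest to none.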
Shortest paths
The principle of the shortest path algorithm was described above; here it is implemented with GraphX's built-in method.
ShortestPaths.run(diseaseSymptom, Array(19328L))
  .vertices
  .filter(!_._2.isEmpty)
  .foreach(println)
Here 19328L is the user-chosen starting vertex (passed to ShortestPaths as a landmark).
(266,Map(19328 -> 15))
(282,Map(19328 -> 12))
(770,Map(19328 -> 9))
(1730,Map(19328 -> 11))
(2170,Map(19328 -> 6))
(1530,Map(19328 -> 13))
(1346,Map(19328 -> 14))
(378,Map(19328 -> 3))
(1378,Map(19328 -> 11))
(970,Map(19328 -> 10))
...
In the result above, (266,Map(19328 -> 15)) means that the shortest path between 19328 and 266 has length 15.
Connected components:
Connected component detection finds the parts of the graph that are cut off from the rest, as in the figure below:
[Figure: Connected components]
val g = Graph(
  sc.makeRDD((1L to 7L).map((_, ""))),
  sc.makeRDD(Array(Edge(2L, 5L, ""), Edge(5L, 3L, ""), Edge(3L, 2L, ""),
    Edge(4L, 5L, ""), Edge(6L, 7L, ""))))
g.connectedComponents
  .vertices
  .map(_.swap)
  .groupByKey()
  .map(_._2)
  .foreach(println)
Output:
CompactBuffer(6, 7)
CompactBuffer(4, 2, 3, 5)
CompactBuffer(1)
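The same grouping can be reproduced without Spark. The plain-Scala sketch below labels every vertex with the smallest vertex id in its component, which is also how GraphX's connectedComponents labels vertices; the propagation loop itself is a simple illustration and not the GraphX implementation.

```scala
object ComponentsSketch {
  /** Labels every vertex with the smallest vertex id in its component,
    * ignoring edge direction. */
  def connectedComponents(vertices: Set[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    var label: Map[Long, Long] = vertices.map(v => v -> v).toMap
    var changed = true
    while (changed) {
      changed = false
      for ((a, b) <- edges) {
        // Both endpoints of an edge take the smaller of their two labels
        val m = math.min(label(a), label(b))
        if (label(a) != m || label(b) != m) {
          label = label + (a -> m) + (b -> m)
          changed = true
        }
      }
    }
    label
  }
}
```

On the seven-vertex graph above this yields the groups {1}, {2, 3, 4, 5} and {6, 7}, matching the three CompactBuffer lines.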
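Strongly connected components
A strongly connected network is one in which, whichever vertex you start from, every other vertex is reachable, as in the figure below:
[Figure: Strongly connected network]
g.stronglyConnectedComponents(3)
  .vertices.map(_.swap)
  .groupByKey()
  .map(_._2)
  .foreach(println)
Here 3 is the maximum number of iterations. For the graph above three iterations are just enough; a larger value can be used, but the result is the same.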
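Label propagation algorithm (LPA)
LPA is mainly used for community detection. It can detect the community structure of a large graph in close to linear time. The main idea is to give each densely connected group of vertices a unique label; the groups that share a label are the communities.
The algorithm often fails to converge, as in the figure below:
[Figure: Label propagation algorithm]
The algorithm can also be used for semi-supervised learning (most items unlabeled, a few labeled), where the unlabeled items are labeled by label propagation. Another application is risk control: starting from people whose risk has already been assessed, the risk of the people related to them is estimated through the social network.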
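The non-convergence is easy to reproduce on the smallest possible graph. The plain-Scala sketch below performs one synchronous label propagation step; it is an illustration rather than the GraphX implementation, and breaking ties by taking the smallest label is an assumption made for this sketch.

```scala
object LpaSketch {
  /** One synchronous step: every vertex adopts the most frequent label among
    * its neighbors (smallest label wins ties, an assumption of this sketch). */
  def step(labels: Map[Long, Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    // Labels seen by each vertex from its neighbors, ignoring edge direction
    val nbrLabels: Map[Long, Seq[Long]] = edges
      .flatMap { case (a, b) => Seq(a -> labels(b), b -> labels(a)) }
      .groupBy(_._1)
      .map { case (v, ps) => v -> ps.map(_._2) }
    labels.map { case (v, old) =>
      val next = nbrLabels.get(v) match {
        case Some(ls) =>
          val counts = ls.groupBy(identity).map { case (l, xs) => l -> xs.size }
          val best = counts.values.max
          counts.collect { case (l, c) if c == best => l }.min
        case None => old // isolated vertices keep their label
      }
      v -> next
    }
  }
}
```

With two vertices joined by one edge and initial labels 1 and 2, each step makes the two vertices swap labels, so the labeling oscillates forever. This is why implementations run LPA for a fixed number of steps instead of waiting for convergence.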
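Implementing Dijkstra's algorithm
[Figure: Algorithm graph]
Take this graph as the example.
The steps are:
(1) Initialize the graph: set the attribute of the origin vertex to 0 and every other vertex to positive infinity, and mark every vertex as not yet activated.
(2) Enter the iteration; the number of iterations is the number of vertices. In each iteration, find the current vertex (the red vertex in each iteration of the figure). Every iteration produces a new graph, because RDDs are immutable: to update an RDD you have to create a new RDD and then join the two. What follows is therefore the creation of the new graph. For the current vertex just found, send a message to every vertex its out-edges point to; the message is the attribute value of the current vertex plus the weight of the edge. Then merge the values at each destination vertex and keep the smallest one. The not yet activated vertex with the smallest value becomes the current vertex of the next iteration. Sending the messages and merging the destination values produces a new set of values; to update the graph we have to use outerJoinVertices to join them back in. Keep iterating in this way until all vertices are activated.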
def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
  // Initialize each vertex to (visited flag, distance); the origin starts at 0
  var g2 = g.mapVertices(
    (vid, vd) => (false, if (vid == origin) 0.0 else Double.MaxValue))
  for (i <- 1L to g.vertices.count - 1) {
    // Current vertex: the unvisited vertex with the smallest distance
    val currentVertexId =
      g2.vertices.filter(!_._2._1)
        .fold((0L, (false, Double.MaxValue)))((a, b) =>
          if (a._2._2 < b._2._2) a else b)
        ._1
    // Send distance + edge weight along the out-edges of the current vertex; keep the minimum
    val newDistances = g2.aggregateMessages[Double](
      ctx => if (ctx.srcId == currentVertexId)
        ctx.sendToDst(ctx.srcAttr._2 + ctx.attr),
      (a, b) => math.min(a, b))
    // Join the new distances back in and mark the current vertex as visited
    g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) =>
      (vd._1 || vid == currentVertexId,
        math.min(vd._2, newSum.getOrElse(Double.MaxValue))))
  }
  // Attach the computed distance to the original vertex attribute
  g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
    (vd, dist.getOrElse((false, Double.MaxValue))._2))
}