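GraphX Implementation Analysis
Overview
Parallel graph processing has long been a hot topic. Two things matter most: how to parallelize graph algorithms, and how to find a suitable parallel processing framework. Spark is an excellent parallel processing framework, so it is only natural that it integrates a number of parallelized algorithms.
GraphX is a parallel implementation on Spark of some commonly used graph algorithms, and it also provides a rich API. This article takes a first look at GraphX's code architecture and at how PageRank is implemented in GraphX.
Why Google won the search engine war
When Google was just starting out, Yahoo! was at its peak in the search engine field and wildly popular. Google clearly faced a wall that left almost no hope. But things are hard to predict, and today "for outside matters, ask Google" is an undisputed fact.
How did this reversal come about? One factor is that Google invented the PageRank algorithm, which markedly improved search accuracy. It is no exaggeration to say that PageRank gave Google a firm footing in the search engine war. In my view, a search engine has a few key factors to consider:
To attract users, it must have excellent search accuracy
Only with users can it place ads, and better-targeted ads are what bring profit
Both areas have very good algorithms.
Enough digression; back to the topic. PageRank is a concrete application of graph theory, so let's turn to graph theory.
Introduction to graph theory
Components of a graph
Graph theory is a very important part of discrete mathematics. Below is an undirected connected graph.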

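Vertex
A, B, C, D, and E in the figure above are called the vertices of the graph.
Edge
A line connecting two vertices is called an edge.
Mathematical representation of a graph
Back in university, I never understood why we had to study that wretched linear algebra. Only in the last couple of days, while reading 数学之美 (The Beauty of Mathematics), did I realize that linear algebra is simply indispensable in some areas of computing.
Plane geometry and solid geometry are fairly easy for us to grasp (one is two-dimensional, the other three-dimensional), whereas linear algebra really deals with higher-dimensional problems, which are hard because we cannot perceive them intuitively. If you want an accessible account of why mathematics has so many branches and how they relate to one another, I strongly recommend reading 数学桥: 对高等数学的一次观赏之旅.
How is a graph represented in mathematics? With matrices from linear algebra: think of a graph's incidence matrix and its adjacency matrix. It all comes down to matrices, and suddenly linear algebra is useful. Below is a concrete example.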
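As a minimal sketch of the matrix representation, the Scala below builds an adjacency matrix for an undirected graph. The vertex labels A to E come from the text; the edge list is made up purely for illustration, and the names `AdjacencyMatrixSketch`, `adjacency`, and `degree` are my own.

```scala
object AdjacencyMatrixSketch {
  // Vertex labels from the text; their order fixes the row/column index.
  val vertices: Vector[Char] = Vector('A', 'B', 'C', 'D', 'E')

  // Hypothetical undirected edges, chosen only for illustration.
  val edges: Seq[(Char, Char)] =
    Seq(('A', 'B'), ('A', 'C'), ('B', 'D'), ('C', 'D'), ('D', 'E'))

  // Build a symmetric 0/1 matrix: m(i)(j) == 1 iff vertices i and j share an edge.
  def adjacency(vs: Vector[Char], es: Seq[(Char, Char)]): Array[Array[Int]] = {
    val index = vs.zipWithIndex.toMap
    val m = Array.fill(vs.size, vs.size)(0)
    for ((u, v) <- es) {
      m(index(u))(index(v)) = 1
      m(index(v))(index(u)) = 1 // undirected: mirror the entry
    }
    m
  }

  // The degree of a vertex is the sum of its row.
  def degree(m: Array[Array[Int]], i: Int): Int = m(i).sum
}
```

Each row (or column) of the matrix lists the neighbors of one vertex, which is why questions about the graph turn into operations on the matrix.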

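Parallel processing of graphs
As just noted, a graph can be represented by a matrix, so parallelizing graph computation becomes, to some extent, a matter of parallelizing matrix operations. Take matrix multiplication as an example and see whether it can be parallelized. The product A × B is used to illustrate the process.
Partition the matrices A and B into four parts each, as shown in the figure below

After the initial alignment

Multiply the sub-matrices

After multiplying, shift A's sub-matrices left and B's sub-matrices up

Merge the results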
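The align, multiply, shift, and merge steps above match what is usually called Cannon's algorithm. Here is a rough single-machine sketch of it in Scala, assuming square integer matrices whose size is divisible by the grid size `q`. All names are my own, and the "workers" are only simulated by a grid of blocks.

```scala
object BlockMultiplySketch {
  type Matrix = Vector[Vector[Int]]

  // Plain product, used for each pair of blocks and as a reference result.
  def multiply(a: Matrix, b: Matrix): Matrix =
    a.map(row => b.transpose.map(col => row.zip(col).map { case (x, y) => x * y }.sum))

  def add(a: Matrix, b: Matrix): Matrix =
    a.zip(b).map { case (r1, r2) => r1.zip(r2).map { case (x, y) => x + y } }

  // Cut an n x n matrix into a q x q grid of (n/q) x (n/q) blocks.
  def split(m: Matrix, q: Int): Vector[Vector[Matrix]] = {
    val s = m.size / q
    Vector.tabulate(q, q)((i, j) => m.slice(i * s, (i + 1) * s).map(_.slice(j * s, (j + 1) * s)))
  }

  // Glue a grid of blocks back into one matrix.
  def merge(blocks: Vector[Vector[Matrix]]): Matrix =
    blocks.flatMap(rowOfBlocks => rowOfBlocks.transpose.map(_.flatten))

  def cannon(a: Matrix, b: Matrix, q: Int): Matrix = {
    val s = a.size / q
    // Initial alignment: block row i of A shifts left by i, block column j of B shifts up by j.
    var aBlocks = { val g = split(a, q); Vector.tabulate(q, q)((i, j) => g(i)((j + i) % q)) }
    var bBlocks = { val g = split(b, q); Vector.tabulate(q, q)((i, j) => g((i + j) % q)(j)) }
    var c = Vector.fill(q, q)(Vector.fill(s, s)(0))
    for (_ <- 0 until q) {
      // Each (i, j) product is independent, so this step could run on q * q workers.
      c = Vector.tabulate(q, q)((i, j) => add(c(i)(j), multiply(aBlocks(i)(j), bBlocks(i)(j))))
      // Shift A's blocks one step left and B's blocks one step up, wrapping around.
      val (pa, pb) = (aBlocks, bBlocks)
      aBlocks = Vector.tabulate(q, q)((i, j) => pa(i)((j + 1) % q))
      bBlocks = Vector.tabulate(q, q)((i, j) => pb((i + 1) % q)(j))
    }
    merge(c)
  }
}
```

With `q = 2` this is exactly the four-block case described above. The point to notice is that inside one round no block product depends on another, which is what makes the multiplication parallelizable.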

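Parallel graph processing frameworks, starting with Pregel
The previous section made two main points:
Graphs are represented by matrices, so operations on graphs are matrix operations
Matrix multiplication can be parallelized, as the step-by-step demonstration showed
Is there a suitable parallel processing framework for graph computation? You probably thought of MapReduce. MapReduce is a decent parallel processing framework, but it has many shortcomings for graph computation, chiefly that intermediate results have to be written to disk, which is very inefficient. For parallel graph processing, Google proposed a remarkable dedicated framework, Pregel. A dynamic view of its execution is shown below. Pregel has the following advantages:
Good scalability
Strong fault tolerance
It can express the common graph algorithms well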

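Pregel's computation model
The computation model is shown in the figure below. Three parts matter:
The processing logic applied to each vertex: vertexProgram
Message sending, used for communication between neighboring vertices: sendMessage
The message-combining logic: messageCombining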
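To see how these three pieces fit together, here is a small single-machine simulation of the superstep loop in plain Scala. It is only a sketch of the idea, not GraphX's or Google's implementation. `Edge`, `Triplet`, `run`, and `hopCounts` are hypothetical names, and the rule for which vertices count as active is a simplifying assumption of mine.

```scala
object PregelSketch {
  final case class Edge(src: Long, dst: Long)
  // An edge together with the current values of both endpoints.
  final case class Triplet[V](src: Long, srcAttr: V, dst: Long, dstAttr: V)

  def run[V, M](vertices: Map[Long, V], edges: Seq[Edge], initialMsg: M, maxIter: Int)(
      vertexProgram: (Long, V, M) => V,
      sendMessage: Triplet[V] => Iterator[(Long, M)],
      messageCombiner: (M, M) => M): Map[Long, V] = {
    // Superstep 0: every vertex receives the initial message.
    var state = vertices.map { case (id, v) => id -> vertexProgram(id, v, initialMsg) }
    var active: Set[Long] = state.keySet
    var iter = 0
    while (active.nonEmpty && iter < maxIter) {
      // Only edges leaving a vertex updated in the previous superstep may send.
      val sent = edges.iterator
        .filter(e => active(e.src))
        .flatMap(e => sendMessage(Triplet(e.src, state(e.src), e.dst, state(e.dst))))
        .toSeq
      // Combine all messages addressed to the same vertex into one.
      val inbox: Map[Long, M] = sent.groupBy(_._1).map {
        case (id, ms) => id -> ms.map(_._2).reduce(messageCombiner)
      }
      // Run the vertex program on every vertex that received a message.
      val updated = inbox.map { case (id, m) => id -> vertexProgram(id, state(id), m) }
      active = updated.keySet
      state = state ++ updated
      iter += 1
    }
    state
  }

  // Example: hop counts from a source vertex along directed edges.
  def hopCounts(ids: Set[Long], edges: Seq[Edge], source: Long): Map[Long, Double] = {
    val init = ids.map(id => id -> (if (id == source) 0.0 else Double.PositiveInfinity)).toMap
    run[Double, Double](init, edges, Double.PositiveInfinity, maxIter = 100)(
      (_, dist, msg) => math.min(dist, msg),
      t => if (t.srcAttr + 1 < t.dstAttr) Iterator((t.dst, t.srcAttr + 1)) else Iterator.empty,
      (a, b) => math.min(a, b))
  }
}
```

The loop stops once no messages are produced. In a real distributed setting each superstep would run across many machines, but the contract between the three functions stays the same.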

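Pregel's implementation in Spark
Thank you for sticking with it this far. This post covers a lot and is a bit hard, so I'll use one more figure to tidy up its internal logic before going on.

What the figure is meant to show is this: GraphX uses Spark as a parallel processing framework to implement those graph algorithms that can be executed in parallel. That sentence is the whole point of this post, so please read it carefully.
Whether an algorithm can be parallelized has nothing to do with Spark itself
Whether an algorithm can be parallelized has to be proved mathematically
For an algorithm already proved to be parallelizable, implementing it on Spark is a good choice, because GraphX supports Pregel's graph computation model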
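Important concepts in GraphX
Graph
Without doubt, the graph itself is a very important concept in GraphX.
Member variables
The important member variables of Graph are
vertices
edges
triplets
Why introduce triplets? Mainly because of the Pregel computation model: a triplet records an edge together with its vertices.
The code itself is not listed here.
Member functions
The functions fall into a few broad categories
Operations on all vertices or edges that leave the graph structure itself unchanged, such as mapEdges and mapVertices
Subgraphs, similar to filter in collection operations: subgraph
Graph partitioning, i.e. the partition operation. This is critical for Spark computation, since it is having different partitions that makes parallel processing possible, and different PartitionStrategy choices bring different benefits. The most obvious approach is to use a hash to split the whole graph into multiple regions
Outer join on vertices: outerJoinVertices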
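The hash idea mentioned under graph partitioning can be sketched in a few lines of plain Scala. This is only an illustration of splitting edges by a hash of their endpoints, not GraphX's own PartitionStrategy code. `Edge`, `partitionOf`, and `partition` are hypothetical names.

```scala
object HashPartitionSketch {
  final case class Edge(src: Long, dst: Long)

  // Pick a partition from a hash of both endpoints; floorMod keeps the result non-negative.
  def partitionOf(e: Edge, numParts: Int): Int =
    java.lang.Math.floorMod((e.src, e.dst).hashCode, numParts)

  // Group the edges by partition id; each group could then be processed on its own worker.
  def partition(edges: Seq[Edge], numParts: Int): Map[Int, Seq[Edge]] =
    edges.groupBy(e => partitionOf(e, numParts))
}
```

A scheme like this spreads edges evenly but ignores locality, so one vertex may show up in many partitions. That trade-off is presumably why more than one strategy exists.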
Graph computations and operations: GraphOps
The common graph algorithms are gathered in the GraphOps class, and Graph defines an implicit conversion from Graph to GraphOps
    implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops
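For readers less familiar with Scala implicits, here is a toy version of the same pattern. `ToyGraph` and `ToyGraphOps` are made-up stand-ins, not the GraphX classes. The sketch only shows how an implicit conversion lets methods defined on one class be called directly on another.

```scala
import scala.language.implicitConversions

object ImplicitOpsSketch {
  // Toy stand-in for a graph: just a list of directed edges.
  final case class ToyGraph(edges: Seq[(Long, Long)])

  // The algorithms live in a separate class ...
  final class ToyGraphOps(g: ToyGraph) {
    def numEdges: Int = g.edges.size
    def outDegrees: Map[Long, Int] = g.edges.groupBy(_._1).map { case (v, es) => v -> es.size }
  }

  // ... and an implicit conversion makes them callable directly on the graph.
  implicit def toyGraphToOps(g: ToyGraph): ToyGraphOps = new ToyGraphOps(g)

  val g: ToyGraph = ToyGraph(Seq((1L, 2L), (1L, 3L), (2L, 3L)))
  val edgeCount: Int = g.numEdges                // compiles via toyGraphToOps
  val degrees: Map[Long, Int] = g.outDegrees     // likewise
}
```

The benefit of this split is that the core graph class stays small while the algorithm collection can grow separately.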
The supported operations are as follows
collectNeighborIds
collectNeighbors
collectEdges
joinVertices
filter
pickRandomVertex
pregel
pageRank
staticPageRank
connectedComponents
triangleCount
stronglyConnectedComponents
RDD
RDD is the core of the Spark system. Which new RDDs does GraphX introduce? Two:
VertexRDD
EdgeRDD
Of the two, VertexRDD is the more important. It has many operations, mostly concerned with merging attributes on vertices. Merging inevitably brings in relational algebra and set theory, which is why VertexRDD has many SQL-like terms, such as
leftJoin
innerJoin
For the differences among leftJoin, innerJoin, and outerJoin, a quick web search will do; I won't go into them here.
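For a quick feel of the difference, here is a sketch on plain Scala `Map`s rather than on VertexRDD. The function names follow the terms above but are my own definitions, and the sample data is invented.

```scala
object JoinSketch {
  // innerJoin: keep only the keys present on both sides.
  def innerJoin[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (A, B)] =
    left.collect { case (k, a) if right.contains(k) => k -> ((a, right(k))) }

  // leftJoin: keep every key of the left side; a missing right value becomes None.
  def leftJoin[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (A, Option[B])] =
    left.map { case (k, a) => k -> ((a, right.get(k))) }

  // Hypothetical vertex attributes keyed by vertex id.
  val names: Map[Long, String] = Map(1L -> "alice", 2L -> "bob", 3L -> "carol")
  val ranks: Map[Long, Double] = Map(1L -> 0.5, 3L -> 0.2, 4L -> 0.9)
}
```

With this data, `innerJoin(names, ranks)` keeps vertices 1 and 3, while `leftJoin(names, ranks)` also keeps vertex 2, paired with `None`. Vertex 4 appears in neither result because it is missing from the left side.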
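GraphX scenario analysis
Storing and loading graphs
For mathematical computation, a graph is represented by a matrix from linear algebra. How, then, should it be stored?
Your data structures instructor surely covered plenty of ways, so I won't repeat them.
In a big-data setting, though, what if the graph is so large that the vertex and edge data cannot fit in a single file? Use HDFS.
And what if one machine's memory cannot hold it at load time? Load lazily: when the data is actually needed, distribute it across different machines.
Typically, everything about the vertices is stored in one file, vertexFile, and everything about the edges in another, edgeFile.
When building a particular graph, the edges alone express how the vertices are related, and with that the structure of the graph.
GraphLoader
GraphLoader is the GraphX component dedicated to loading and building graphs. Its most important function is edgeListFile, defined as follows: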
    def edgeListFile(
        sc: SparkContext,
        path: String,
        canonicalOrientation: Boolean = false,
        minEdgePartitions: Int = 1,
        edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
        vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
      : Graph[Int, Int] =
    {
      val startTime = System.currentTimeMillis

      // Parse the edge data table directly into edge partitions
      val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
      val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
        val builder = new EdgePartitionBuilder[Int, Int]
        iter.foreach { line =>
          if (!line.isEmpty && line(0) != '#') {
            val lineArray = line.split("\\s+")
            if (lineArray.length < 2) {
              logWarning("Invalid line: " + line)
            }
            val srcId = lineArray(0).toLong
            val dstId = lineArray(1).toLong
            if (canonicalOrientation && srcId > dstId) {
              builder.add(dstId, srcId, 1)
            } else {
              builder.add(srcId, dstId, 1)
            }
          }
        }
        Iterator((pid, builder.toEdgePartition))
      }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
      edges.count()

      logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

      GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
        vertexStorageLevel = vertexStorageLevel)
    } // end of edgeListFile
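The per-line parsing in the function above can be tried without a Spark cluster. Below is a sketch in plain Scala that mirrors the same rules: skip empty lines and `#` comments, split on whitespace, and optionally swap the endpoints so that the smaller id comes first. One deliberate difference is that this sketch drops lines with fewer than two fields, whereas the quoted code only logs a warning. `parseLine` and `parse` are my own names.

```scala
object EdgeListParseSketch {
  // Parse one line into (srcId, dstId), or None if the line should be skipped.
  def parseLine(line: String, canonicalOrientation: Boolean = false): Option[(Long, Long)] = {
    if (line.isEmpty || line(0) == '#') None
    else {
      val fields = line.split("\\s+")
      if (fields.length < 2) None // assumption of this sketch: drop malformed lines
      else {
        val (src, dst) = (fields(0).toLong, fields(1).toLong)
        if (canonicalOrientation && src > dst) Some((dst, src)) else Some((src, dst))
      }
    }
  }

  // Parse a whole file's worth of lines, keeping only the valid edges.
  def parse(lines: Seq[String], canonicalOrientation: Boolean = false): Seq[(Long, Long)] =
    lines.flatMap(parseLine(_, canonicalOrientation))
}
```

So an edge file is just text with one "srcId dstId" pair per line, and everything else about the graph (vertex set, default attributes) is derived from those pairs.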
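Application example: PageRank
What is PageRank
PageRank is Google's proprietary algorithm for measuring the importance of a given web page relative to the other pages in a search engine's index. It was invented by Larry Page and Sergey Brin in the late 1990s. PageRank turned the notion of link value into a ranking factor: it treats a link to a page as a vote that signals importance.
The core idea of PageRank
"On the internet, if a web page is linked to by many other pages, it is widely recognized and relied upon, and so it ranks high." (from chapter 10 of 数学之美)
You might say that is too simple, as good as saying nothing. How do you express it mathematically?
Heh, I thought so too at first. After reading it a few more times, I understood a little. The steps of the analysis, in words:
The relationships between web pages are represented by a graph
A link between page A and page B represents the likelihood (probability) that an arbitrary user goes from page A to page B
The rankings of all pages are represented by a one-dimensional vector B
The links among all pages are represented by a matrix A, and the rankings of all pages by B.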
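Put together, the steps above suggest repeatedly multiplying the rank vector B by the link matrix A until B stops changing. The sketch below does that in plain Scala. Several things here are my assumptions rather than statements from the text: `a(i)(j)` holds the probability of moving from page j to page i, each column sums to 1, the starting vector is uniform, and there is no damping term. The three-page example matrix is invented.

```scala
object PageRankMatrixSketch {
  // One step: next = A * b (matrix-vector product).
  def step(a: Vector[Vector[Double]], b: Vector[Double]): Vector[Double] =
    a.map(row => row.zip(b).map { case (x, y) => x * y }.sum)

  // Start from a uniform rank vector and iterate until the total change drops below tol.
  def rank(a: Vector[Vector[Double]], tol: Double = 1e-12, maxIter: Int = 1000): Vector[Double] = {
    var b = Vector.fill(a.size)(1.0 / a.size)
    var delta = Double.MaxValue
    var i = 0
    while (delta > tol && i < maxIter) {
      val next = step(a, b)
      delta = next.zip(b).map { case (x, y) => math.abs(x - y) }.sum
      b = next
      i += 1
    }
    b
  }

  // Hypothetical web of three pages: 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0.
  val links: Vector[Vector[Double]] = Vector(
    Vector(0.0, 0.0, 1.0),
    Vector(0.5, 0.0, 0.0),
    Vector(0.5, 1.0, 0.0))
}
```

For this small example the ranks settle at roughly 0.4, 0.2, and 0.4. Page 1 ends up lowest because it receives only half of page 0's vote.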

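How PageRank is parallelized
So, the mathematics above shows that "computing page rank can ultimately be abstracted as matrix multiplication", and it was shown at the start that matrix multiplication can be parallelized.
With the theory done, next comes the engineering. Borrowing the Pregel model, the main functions defined in PageRank are as follows.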
vertexProgram
    def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
      val (oldPR, lastDelta) = attr
      val newPR = oldPR + (1.0 - resetProb) * msgSum
      (newPR, newPR - oldPR)
    }
sendMessage
    def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
      if (edge.srcAttr._2 > tol) {
        Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
      } else {
        Iterator.empty
      }
    }
messageCombiner
    def messageCombiner(a: Double, b: Double): Double = a + b
A little takeaway

The PageRank example shows how the mathematical theory we routinely study can be used to solve real problems.
"What you learn always has value; whether you ever get to use it is down to luck."
Complete code
    // Connect to the Spark cluster
    val sc = new SparkContext("spark://master.amplab.org", "research")

    // Load my user data and parse into tuples of user id and attribute list
    val users = (sc.textFile("graphx/data/users.txt")
      .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

    // Parse the edge data which is already in userId -> userId format
    val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")

    // Attach the user attributes
    val graph = followerGraph.outerJoinVertices(users) {
      case (uid, deg, Some(attrList)) => attrList
      // Some users may not have attributes so we set them as empty
      case (uid, deg, None) => Array.empty[String]
    }

    // Restrict the graph to users with usernames and names
    val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)

    // Compute the PageRank
    val pagerankGraph = subgraph.pageRank(0.001)

    // Get the attributes of the top pagerank users
    val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
      case (uid, attrList, Some(pr)) => (pr, attrList.toList)
      case (uid, attrList, None) => (0.0, attrList.toList)
    }

    println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
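Summary
This whole post has been stressing one point: Spark is a distributed parallel computing framework. Whether Spark can be used depends largely on the mathematical model of the problem itself. If the problem can be processed in parallel, use it; never cut the feet to fit the shoes.
Finally, one more figure to sum up the mathematics mentioned.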
