Python Multi-core Programming with mpi4py in Practice
 
 2018-5-2 
 
±à¼­ÍƼö:
±¾ÎÄÀ´Ô´ÓÚcsdn£¬½éÉÜÁËMPIÓëmpi4py£¬³£¼ûÓ÷¨£¬MPIºÍmpi4pyµÄ»·¾³´î½¨µÈ֪ʶ¡£

1. Overview

CPUs have gone from the 8086 of thirty-odd years ago, through the Pentium of a decade ago, to today's multi-core i7. At first the goal was single-core clock speed: architectural improvements and advances in integrated-circuit manufacturing pushed CPU performance up rapidly, and single-core frequencies climbed from the MHz range of the old days to nearly 4 GHz. Eventually, though, the limits of manufacturing processes and power consumption put a ceiling on the single core, and a change of thinking was needed to keep feeding the endless appetite for performance. That is when multi-core CPUs took the stage: strap a couple of extra engines onto your old jalopy and it feels like a Ferrari. These days even phones advertise 4-core and 8-core processors, let alone PCs.

Digressions aside, what matters to us programmers is how to harness such powerful engines for our own tasks. With the growing need for large-scale data processing, large-scale problems, and solving complex systems, single-core programming can no longer keep up. If a program takes several hours, or even a day, to run, it is hard to forgive yourself. So how do you move quickly into the lofty world of multi-core parallel programming? By leaning on the tools the community has already built.

The parallel-processing frameworks I currently encounter at work are mainly MPI, OpenMP, and MapReduce (Hadoop); CUDA belongs to GPU parallel programming and is not covered here. MPI and Hadoop can both run on a cluster, whereas OpenMP relies on shared memory and therefore runs only on a single machine. In addition, MPI keeps data in memory and can preserve context across inter-node communication and data exchange, so it can run iterative algorithms, something Hadoop does not natively support. For that reason, machine-learning algorithms that need iteration are mostly implemented with MPI, though some of them can also be redesigned to run on Hadoop. (Just my limited understanding; corrections are welcome.)

This article focuses on the practical basics of MPI programming in a Python environment.

2. MPI and mpi4py

MPI is short for Message Passing Interface. Message passing means that the processes of a parallel run each have their own independent stack and code segment and execute as independent, unrelated programs; all information exchange between processes is done through explicit calls to communication functions.

Mpi4pyÊǹ¹½¨ÔÚmpiÖ®ÉϵÄpython¿â£¬Ê¹µÃpythonµÄÊý¾Ý½á¹¹¿ÉÒÔÔÚ½ø³Ì£¨»òÕß¶à¸öcpu£©Ö®¼ä½øÐд«µÝ¡£

2.1 How MPI works

It is quite simple: you launch a group of MPI processes, and every process executes the same code. Each process also has an ID, its rank, which marks who it is. What does that mean? Suppose each CPU is a worker you have hired, ten workers in total. You have 100 bricks to move, and to be fair you let each worker carry 10. You write the task on a task card and have all 10 workers carry out the task on that card, namely moving bricks. The "move bricks" on the card is the code you wrote, and the 10 CPUs all run that same piece of code. Note that every variable in the code is private to each process, even though the names are the same.

ÀýÈ磬һ¸ö½Å±¾test.py£¬ÀïÃæ°üº¬ÒÔÏ´úÂ룺

from mpi4py import MPI

print("hello world")
print("my rank is: %d" % MPI.COMM_WORLD.Get_rank())

We then run it from the command line like this:

#mpirun -np 5 python test.py

-np 5 tells mpirun to start 5 MPI processes to execute the program that follows. It is as if the script were copied 5 times, with each process running its own copy independently of the others. The only difference between the running copies is their rank, i.e. their ID. So this code prints 5 hello worlds and 5 different rank values, from 0 to 4.

2.2 Point-to-point communication

µã¶ÔµãͨÐÅ£¨Point-to-PointCommunication£©µÄÄÜÁ¦ÊÇÐÅÏ¢´«µÝϵͳ×î»ù±¾µÄÒªÇó¡£Òâ˼¾ÍÊÇÈÃÁ½¸ö½ø³ÌÖ±½Ó¿ÉÒÔ´«ÊäÊý¾Ý£¬Ò²¾ÍÊÇÒ»¸ö·¢ËÍÊý¾Ý£¬ÁíÒ»¸ö½ÓÊÕÊý¾Ý¡£½Ó¿Ú¾ÍÁ½¸ö£¬sendºÍrecv£¬À´¸öÀý×Ó£º

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

# point to point communication
data_send = [comm_rank] * 5
comm.send(data_send, dest=(comm_rank+1) % comm_size)
data_recv = comm.recv(source=(comm_rank-1) % comm_size)
print("my rank is %d, and I received:" % comm_rank)
print data_recv

Launching 5 processes to run the code above gives:

my rank is 0, and I received:
[4, 4, 4, 4, 4]
my rank is 1, and I received:
[0, 0, 0, 0, 0]
my rank is 2, and I received:
[1, 1, 1, 1, 1]
my rank is 3, and I received:
[2, 2, 2, 2, 2]
my rank is 4, and I received:
[3, 3, 3, 3, 3]

As you can see, every process creates a list and passes it to the next process, and the last process passes its list back to the first. comm_size is the number of MPI processes, i.e. the number given to -np. MPI.COMM_WORLD is the communicator, the group of processes taking part.

There is one caveat. If the data being sent is small, MPI buffers it: when the send call executes, the data is copied into a buffer and execution continues with the next instruction, without waiting for the peer process to call recv. If the data is large, however, the sender blocks until the receiving process has executed recv and taken the data (the exact threshold depends on the MPI implementation's eager limit). So the code above is fine when sending [rank]*5, but with something like [rank]*500 the program can end up half dead: every process is stuck in its send, waiting for the next process to post a recv, yet each process only reaches its recv after its send completes. That is essentially a deadlock. We therefore usually rewrite it as follows:

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

data_send = [comm_rank] * 5
if comm_rank == 0:
    comm.send(data_send, dest=(comm_rank+1) % comm_size)
if comm_rank > 0:
    data_recv = comm.recv(source=(comm_rank-1) % comm_size)
    comm.send(data_send, dest=(comm_rank+1) % comm_size)
if comm_rank == 0:
    data_recv = comm.recv(source=(comm_rank-1) % comm_size)
print("my rank is %d, and I received:" % comm_rank)
print data_recv

Process 0 sends its data right away, while every other process starts out waiting to receive. Process 1 receives process 0's data and then sends its own; process 2 receives that and sends its own; and so on, until process 0 finally receives the data of the last process. This breaks the circular wait and avoids the problem above.
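As an aside, mpi4py also offers sendrecv, which posts the send and the receive together so MPI can pair them up without the manual ordering above. A minimal sketch of the same ring exchange, assuming a reasonably recent mpi4py:

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

data_send = [comm_rank] * 5
# send to the next rank and receive from the previous one in a single call,
# so no process blocks waiting for its neighbour's send to finish first
data_recv = comm.sendrecv(data_send,
                          dest=(comm_rank + 1) % comm_size,
                          source=(comm_rank - 1) % comm_size)
print("my rank is %d, and I received:" % comm_rank)
print(data_recv)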

A common pattern is to appoint a "team leader", i.e. a master process, usually process 0. The master sends data to the other processes, the other processes do the work, and they return their results to process 0. In other words, process 0 controls the whole data-processing flow.
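A minimal sketch of this leader pattern, using only the send and recv calls introduced above (the "task" here, squaring a number, is just a stand-in, and at least two processes are assumed):

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    # leader: hand one task to every worker, then collect the results
    for worker in range(1, comm_size):
        comm.send(worker, dest=worker)
    results = [comm.recv(source=worker) for worker in range(1, comm_size)]
    print(results)
else:
    # worker: receive a task, process it (here just square it), send it back
    task = comm.recv(source=0)
    comm.send(task * task, dest=0)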

2.3 Collective communication

µã¶ÔµãͨÐÅÊÇA·¢Ë͸øB£¬Ò»¸öÈ˽«×Ô¼ºµÄÃØÃܸæËßÁíÒ»¸öÈË£¬ÈºÌåͨÐÅ£¨Collective Communications£©ÏñÊÇÄøö´óÀ®°È£¬Ò»´ÎÐÔ¸æËßËùÓеÄÈË¡£Ç°ÕßÊÇÒ»¶ÔÒ»£¬ºóÕßÊÇÒ»¶Ô¶à¡£µ«ÊÇ£¬ÈºÌåͨÐÅÊÇÒÔ¸üÓÐЧµÄ·½Ê½¹¤×÷µÄ¡£ËüµÄÔ­Ôò¾ÍÒ»¸ö£º¾¡Á¿°ÑËùÓеĽø³ÌÔÚËùÓеÄʱ¿Ì¶¼Ê¹ÓÃÉÏ£¡ÎÒÃÇÔÚÏÂÃæµÄbcastС½Ú½²Êö¡£

ȺÌåͨÐÅ»¹ÊÇ·¢ËͺͽÓÊÕÁ½À࣬һ¸öÊÇÒ»´ÎÐÔ°ÑÊý¾Ý·¢¸øËùÓÐÈË£¬ÁíÒ»¸öÊÇÒ»´ÎÐÔ´ÓËùÓÐÈËÄÇÀï»ØÊÕ½á¹û¡£

1) Broadcast: bcast

Broadcast sends one piece of data to all processes. For example, with 200 items of data and 10 processes, every process receives all 200 items.

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
data = comm.bcast(data if comm_rank == 0 else None, root=0)
print 'rank %d, got:' % (comm_rank)
print data

The result:

rank 0, got:
[0, 1, 2, 3, 4]
rank 1, got:
[0, 1, 2, 3, 4]
rank 2, got:
[0, 1, 2, 3, 4]
rank 3, got:
[0, 1, 2, 3, 4]
rank 4, got:
[0, 1, 2, 3, 4]

The root process builds a list and broadcasts it to all processes, so every process ends up holding the list and can then do whatever it likes with it.

The most intuitive picture of a broadcast is one particular process sending the data to every other process, one by one. With n processes and the data sitting on process 0, process 0 would have to send it to the remaining n-1 processes, which is very inefficient: O(n). Is there a faster way? The most common and very efficient technique is a reduction-tree broadcast, in which every process that has already received the data joins in broadcasting it. At first only one process has the data; it sends it to a second process, so two processes have it; both then take part in the next round, giving four processes with the data; and so on, the number of processes holding the data doubling every round (a power of two each time). With this tree-style broadcast the complexity drops to O(log n). This is exactly the efficiency principle of collective communication mentioned above: use all the processes to carry out the sending and receiving.
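To make the doubling argument concrete, here is a toy, pure-Python illustration (not MPI code) of how the number of informed processes grows per round:

def tree_broadcast_rounds(n):
    # only the root (rank 0) has the data at first
    informed = {0}
    rounds = 0
    while len(informed) < n:
        uninformed = [p for p in range(n) if p not in informed]
        # each already-informed process forwards the data to one distinct
        # uninformed process, so the informed count doubles per round
        for sender, receiver in zip(sorted(informed), uninformed):
            informed.add(receiver)
        rounds += 1
    return rounds

print(tree_broadcast_rounds(16))   # 4 rounds, i.e. log2(16)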

2) Scatter

Scatter splits one set of data evenly among all processes. For example, with 200 items of data and 10 processes, each process gets its own 20 items.

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
    print data
else:
    data = None
local_data = comm.scatter(data, root=0)
print 'rank %d, got:' % comm_rank
print local_data

The result:

[0, 1, 2, 3, 4]
rank 0, got:
0
rank 1, got:
1
rank 2, got:
2
rank 3, got:
3
rank 4, got:
4

Here the root process creates a list and scatters it to all processes, which amounts to partitioning the list: every process receives an equal share, in this case one element. (The split follows the list index: the i-th item goes to the i-th process.) For a matrix, the rows are divided evenly, and each process gets the same number of rows to work on.
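As a minimal sketch of the matrix case (the shape and values below are made up for illustration), rank 0 can split the rows with np.array_split and scatter one block of rows to each process:

import numpy as np
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    matrix = np.arange(20).reshape(10, 2)        # 10 rows, 2 columns
    chunks = np.array_split(matrix, comm_size)   # one block of rows per process
else:
    chunks = None
local_rows = comm.scatter(chunks, root=0)
print("rank %d got %d rows" % (comm_rank, local_rows.shape[0]))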

Keep in mind that, the way MPI works, every process executes all of the code, so every process executes the scatter call. But only when root executes it does it act as both sender and receiver (root also gets its own share of the data); all the other processes are receivers only.

3) Gather

Where there is sending, there is a function to collect everything back. gather collects the data of all processes and merges it into a single list. Below, scatter and gather are combined into a complete distribute-and-collect cycle:

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
    print data
else:
    data = None
local_data = comm.scatter(data, root=0)
local_data = local_data * 2
print 'rank %d, got and do:' % comm_rank
print local_data
combine_data = comm.gather(local_data, root=0)
if comm_rank == 0:
    print combine_data

The result:

[0, 1, 2, 3, 4]
rank 0, got and do:
0
rank 1, got and do:
2
rank 2, got and do:
4
rank 4, got and do:
8
rank 3, got and do:
6
[0, 2, 4, 6, 8]

The root process distributes the data evenly to all processes with scatter, and once every process has finished its work (here simply multiplying by 2), root collects their results with gather, assembling them into a list in the same order they were distributed. gather has a variant, allgather, which you can think of as a gather followed by a bcast of the gathered result. In other words, after root has collected and combined everyone's results, the combined result is announced to everyone, so not only root but every process can access combine_data.
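A minimal sketch of the difference between gather and allgather (each rank contributes just its own rank number here):

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()

gathered = comm.gather(comm_rank, root=0)    # a list on rank 0, None elsewhere
everywhere = comm.allgather(comm_rank)       # the same list on every rank
print("rank %d: gather=%s allgather=%s" % (comm_rank, gathered, everywhere))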

4) Reduce

Reduce not only collects all the data back but also performs a simple computation along the way, such as a sum or a maximum. Why bother? Couldn't we just gather everything and then take the sum or max of the list? That would work the leader to death while the workers sit idle; why not make full use of every worker? Reduce is in fact implemented with a reduction tree. To compute a max, for example, the workers can compare results in pairs, pass the pairwise maxima up, compare those in pairs again, and so on, until a single final max reaches the leader. The leader has cleverly delegated the work and the workers finish it efficiently: the complexity drops from O(n) to O(log n) (base 2).

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
    print data
else:
    data = None
local_data = comm.scatter(data, root=0)
local_data = local_data * 2
print 'rank %d, got and do:' % comm_rank
print local_data
all_sum = comm.reduce(local_data, root=0, op=MPI.SUM)
if comm_rank == 0:
    print 'sum is:%d' % all_sum

The result:

[0, 1, 2, 3, 4]
rank 0, got and do:
0
rank 1, got and do:
2
rank 2, got and do:
4
rank 3, got and do:
6
rank 4, got and do:
8
sum is:20

As you can see, a single sum value comes out at the end.
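Other reduction operators work the same way. The short, self-contained sketch below (the per-rank values are made up for illustration) also shows allreduce, which makes the reduced value visible on every rank rather than only on root:

import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()

local_data = comm_rank * 2                                # each rank contributes one value
all_max = comm.reduce(local_data, root=0, op=MPI.MAX)     # maximum, result only on root
total = comm.allreduce(local_data, op=MPI.SUM)            # sum, result on every rank
if comm_rank == 0:
    print("max is: %d" % all_max)
print("rank %d sees total %d" % (comm_rank, total))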

3. Common usage patterns

3.1 Processing the lines of one file in parallel

#!/usr/bin/env python
#-*- coding: utf-8 -*-
import sys
import os
import mpi4py.MPI as MPI
import numpy as np

#
# Global variables for MPI
#
# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e., the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()

if __name__ == '__main__':
    if comm_rank == 0:
        sys.stderr.write("processor root starts reading data...\n")
        all_lines = sys.stdin.readlines()
    all_lines = comm.bcast(all_lines if comm_rank == 0 else None, root=0)
    num_lines = len(all_lines)
    local_lines_offset = np.linspace(0, num_lines, comm_size + 1).astype('int')
    local_lines = all_lines[local_lines_offset[comm_rank]:local_lines_offset[comm_rank + 1]]
    sys.stderr.write("%d/%d processor gets %d/%d data\n" % (comm_rank, comm_size, len(local_lines), num_lines))
    cnt = 0
    for line in local_lines:
        fields = line.strip().split('\t')
        cnt += 1
        if cnt % 100 == 0:
            sys.stderr.write("processor %d has processed %d/%d lines\n" % (comm_rank, cnt, len(local_lines)))
        output = line.strip() + ' process every line here'
        print output
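Assuming the script above were saved as process_lines.py (a name chosen here purely for illustration), it could be launched like this, with the input file fed on stdin:

#mpirun -np 4 python process_lines.py < input.txt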

3.2 Processing multiple files in parallel

If the file is too large, say tens of millions of lines, MPI cannot bcast that much data to all the processes. In that case we can first split the big file into smaller files and let each process handle just a few of them.
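A minimal helper sketch for that splitting step (the chunk size and the part_xxxxx naming are assumptions made here for illustration); the MPI script that processes the resulting files follows:

import os

def split_file(path, out_dir, lines_per_chunk=100000):
    # split one big text file into numbered chunk files of lines_per_chunk lines each
    out, idx = None, 0
    with open(path) as src:
        for i, line in enumerate(src):
            if i % lines_per_chunk == 0:
                if out is not None:
                    out.close()
                out = open(os.path.join(out_dir, 'part_%05d' % idx), 'w')
                idx += 1
            out.write(line)
    if out is not None:
        out.close()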

#!/usr/bin/env python
#-*- coding: utf-8 -*-
import sys
import os
import mpi4py.MPI as MPI
import numpy as np

#
# Global variables for MPI
#
# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e., the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()

if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write("Usage: python *.py directory_with_files\n")
        sys.exit(1)
    path = sys.argv[1]
    if comm_rank == 0:
        file_list = os.listdir(path)
        sys.stderr.write("%d files\n" % len(file_list))
    file_list = comm.bcast(file_list if comm_rank == 0 else None, root=0)
    num_files = len(file_list)
    local_files_offset = np.linspace(0, num_files, comm_size + 1).astype('int')
    local_files = file_list[local_files_offset[comm_rank]:local_files_offset[comm_rank + 1]]
    sys.stderr.write("%d/%d processor gets %d/%d data\n" % (comm_rank, comm_size, len(local_files), num_files))
    cnt = 0
    for file_name in local_files:
        hd = open(os.path.join(path, file_name))
        for line in hd:
            output = line.strip() + ' process every line here'
            print output
        cnt += 1
        sys.stderr.write("processor %d has processed %d/%d files\n" % (comm_rank, cnt, len(local_files)))
        hd.close()
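Assuming the script above were saved as process_files.py (again, an illustrative name), it could be launched with the directory of split files as its argument:

#mpirun -np 4 python process_files.py ./split_parts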

3.3 Processing the rows or columns of a matrix in parallel with numpy

One excellent feature of mpi4py is its seamless support for numpy.

import os, sys, time
import numpy as np
import mpi4py.MPI as MPI

#
# Global variables for MPI
#
# instance for invoking MPI related functions
comm = MPI.COMM_WORLD
# the node rank in the whole community
comm_rank = comm.Get_rank()
# the size of the whole community, i.e., the total number of working nodes in the MPI cluster
comm_size = comm.Get_size()

# test MPI
if __name__ == "__main__":
    # create a matrix
    if comm_rank == 0:
        all_data = np.arange(20).reshape(4, 5)
        print "************ data ******************"
        print all_data

    # broadcast the data to all processors
    all_data = comm.bcast(all_data if comm_rank == 0 else None, root=0)

    # divide the data to each processor
    num_samples = all_data.shape[0]
    local_data_offset = np.linspace(0, num_samples, comm_size + 1).astype('int')

    # get the local data which will be processed in this processor
    local_data = all_data[local_data_offset[comm_rank]:local_data_offset[comm_rank + 1]]
    print "****** %d/%d processor gets local data ****" % (comm_rank, comm_size)
    print local_data

    # reduce to get sum of elements
    local_sum = local_data.sum()
    all_sum = comm.reduce(local_sum, root=0, op=MPI.SUM)

    # process in local
    local_result = local_data ** 2

    # gather the result from all processors and broadcast it
    result = comm.allgather(local_result)
    result = np.vstack(result)

    if comm_rank == 0:
        print "*** sum: ", all_sum
        print "************ result ******************"
        print result
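A side note, as a hedged sketch: for large numpy arrays, mpi4py additionally provides the uppercase, buffer-based methods (Send, Recv, Bcast, Reduce, ...), which avoid pickling and are considerably faster. The example below assumes at least two processes:

import numpy as np
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
if comm.Get_rank() == 0:
    buf = np.arange(10, dtype='float64')
    # send the raw buffer of the array, no pickling involved
    comm.Send([buf, MPI.DOUBLE], dest=1)
elif comm.Get_rank() == 1:
    buf = np.empty(10, dtype='float64')
    # the receiver must pre-allocate a buffer of matching size and type
    comm.Recv([buf, MPI.DOUBLE], source=0)
    print(buf)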

4. Setting up the MPI and mpi4py environment

ÕâÕ·ŵ½ÕâÀïÊÇ×÷Ϊһ¸ö¸½Â¼¡£ÎÒÃǵĻ·¾³ÊÇlinux£¬ÐèÒª°²×°µÄ°üÓÐpython¡¢openmpi¡¢numpy¡¢cpythonºÍmpi4py£¬¹ý³ÌÈçÏ£º

4.1 Installing Python

#tar xzvf Python-2.7.tgz
#cd Python-2.7
#./configure --prefix=/home/work/vis/zouxiaoyi/my_tools
#make
#make install

First add Python to the environment variables, together with the path for Python's add-on libraries:

export PATH=/home/work/vis/zouxiaoyi/my_tools/bin:$PATH
export PYTHONPATH=/home/work/vis/zouxiaoyi/my_tools/lib/python2.7/site-packages:$PYTHONPATH

Run #python; if the friendly >>> prompt appears, it worked. Press Ctrl+D to exit.

4.2 Installing OpenMPI

#wget http://www.open-mpi.org/software/ompi/v1.4/downloads/openmpi-1.4.1.tar.gz
#tar xzvf openmpi-1.4.1.tar.gz
#cd openmpi-1.4.1
#./configure --prefix=/home/work/vis/zouxiaoyi/my_tools
#make -j 8
#make install

Then add the bin path to the environment variables:

export PATH=/home/work/vis/zouxiaoyi/my_tools/bin:$PATH
export LD_LIBRARY_PATH=/home/work/vis/zouxiaoyi/my_tools/lib:$LD_LIBRARY_PATH

Run #mpirun; if the help text is printed, the installation worked. One note: several versions failed to build for me, and only 1.4.1 finally succeeded, so your mileage may vary.

4.3 Installing numpy and Cython

How to install Python libraries is covered in earlier blog posts; the process generally looks like this:

#tar -xzvf Cython-0.20.2.tar.gz
#cd Cython-0.20.2
#python setup.py install

Open Python and run import Cython; if no error is raised, the installation succeeded.

4.4 Installing mpi4py

#tar -xzvf mpi4py_1.3.1.tar.gz
#cd mpi4py
#vi mpi.cfg

ÔÚ68ÐУ¬[openmpi]ÏÂÃæ£¬½«¸Õ²ÅÒѾ­°²×°ºÃµÄopenmpiµÄĿ¼¸ø¸ÄÉÏ¡£

mpi_dir = /home/work/vis/zouxiaoyi/my_tools
#python setup.py install

Open Python and run import mpi4py as MPI; if no error is raised, the installation succeeded.

Now your own parallel journey can begin. Go and explore the fun of multiple cores.

   