| 编辑推荐: |
| 本文来自于cnblogs,文章主要介绍了threading模块、开启线程的两种方式、同步锁以及Python标准模块--concurrent.futures等相关知识。 |
|
一 threading模块介绍
multiprocessing模块完全模仿了threading模块的接口,二者在使用层面有很大的相似性,因而不再详细介绍
官网链接:https://docs.python.org/3/library/threading.html?highlight=threading#
二 开启线程的两种方式
#·½Ê½Ò»
from threading import Thread
import time


def sayhi(name, delay=2):
    """Sleep for ``delay`` seconds, then print a greeting for ``name``.

    ``delay`` defaults to the 2 seconds the example has always used.
    """
    time.sleep(delay)
    print('%s say hello' % name)


if __name__ == '__main__':
    # Approach 1: hand the callable to Thread through ``target``.
    t = Thread(target=sayhi, args=('egon',))
    t.start()
    # start() returns at once, so this line is printed before the greeting.
    print('主线程')
·½Ê½Ò»
|
#·½Ê½¶þ
from threading import Thread
import time


class Sayhi(Thread):
    """Approach 2: subclass Thread and put the work in ``run``."""

    def __init__(self, name):
        # Thread.__init__ has to run before ``name`` is assigned: ``name``
        # is a Thread property backed by state that __init__ creates.
        super().__init__()
        self.name = name

    def run(self):
        """Sleep two seconds, then print a greeting using the thread name."""
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('egon')
    t.start()
    print('主线程')
·½Ê½¶þ |
三 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别
from threading import Thread
from multiprocessing import Process
import os


def work():
    """Print a marker so the start-up order of thread vs. process is visible."""
    print('hello')


if __name__ == '__main__':
    # Start a thread inside the main process.
    t = Thread(target=work)
    t.start()
    print('主线程/主进程')
    # Output recorded by the article:
    #   hello
    #   主线程/主进程

    # Start a child process from the main process.
    p = Process(target=work)
    p.start()
    print('主线程/主进程')
    # Output recorded by the article:
    #   主线程/主进程
    #   hello
˵ĿªÆôËÙ¶È¿ì |
from threading import Thread
from multiprocessing import Process
import os


def work():
    """Print 'hello' followed by the pid of the process running this call."""
    print('hello', os.getpid())


if __name__ == '__main__':
    # Part 1: several threads in the main process all report the main
    # process's pid.
    t1 = Thread(target=work)
    t2 = Thread(target=work)
    t1.start()
    t2.start()
    print('主线程/主进程pid', os.getpid())

    # Part 2: several processes, each reporting its own pid.
    p1 = Process(target=work)
    p2 = Process(target=work)
    p1.start()
    p2.start()
    print('主线程/主进程pid', os.getpid())
³òÒ»³òpid |
from threading import Thread
from multiprocessing import Process
import os


def work():
    """Rebind the module-level ``n`` to 0."""
    global n
    n = 0


if __name__ == '__main__':
    # Process variant, left disabled as in the article. Its note says the
    # child sets only its own copy of ``n`` to 0, so the parent still sees 100.
    # n = 100
    # p = Process(target=work)
    # p.start()
    # p.join()
    # print('主', n)

    n = 1
    t = Thread(target=work)
    t.start()
    t.join()
    # Prints 0: threads of one process share that process's globals.
    print('主', n)
ͬһ½ø³ÌÄÚµÄÏ̹߳²Ïí¸Ã½ø³ÌµÄÊý¾Ý£¿ |
ËÄ Á·Ï°
Á·Ï°Ò»£º
#_*_coding:utf-8_*_
#!/usr/bin/env python
import multiprocessing
import threading
import socket


def action(conn):
    """Echo every chunk received on ``conn`` back in upper case.

    Returns when the peer closes the connection (``recv`` yields ``b''``)
    or the connection breaks. ``conn`` is closed before returning.
    """
    with conn:
        while True:
            try:
                data = conn.recv(1024)
                if not data:
                    # Empty read: the client hung up. Without this check the
                    # loop would spin forever on b''.
                    break
                print(data)
                # sendall() keeps writing until everything is sent; a bare
                # send() may transmit only part of the buffer.
                conn.sendall(data.upper())
            except ConnectionError:
                break


if __name__ == '__main__':
    # The listening socket is created here rather than at import time, so
    # importing this module does not bind port 8080.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('127.0.0.1', 8080))
    s.listen(5)
    while True:
        conn, addr = s.accept()
        # One thread per client keeps accept() responsive.
        p = threading.Thread(target=action, args=(conn,))
        p.start()
¶àÏ̲߳¢·¢µÄsocket·þÎñ¶Ë |
#_*_coding:utf-8_*_
#!/usr/bin/env python
import socket

# Interactive client for the echo server on 127.0.0.1:8080.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(('127.0.0.1', 8080))
    while True:
        msg = input('>>: ').strip()
        if not msg:
            # Nothing typed: ask again instead of sending an empty payload.
            continue
        s.sendall(msg.encode('utf-8'))
        data = s.recv(1024)
        if not data:
            # Server closed the connection.
            break
        print(data)
¿Í»§¶Ë |
Á·Ï°¶þ£ºÈý¸öÈÎÎñ£¬Ò»¸ö½ÓÊÕÓû§ÊäÈ룬һ¸ö½«Óû§ÊäÈëµÄÄÚÈݸñʽ»¯³É´óд£¬Ò»¸ö½«¸ñʽ»¯ºóµÄ½á¹û´æÈëÎļþ
from threading import Thread
import queue

# Thread-safe FIFO hand-offs between the three stages. The blocking get()
# replaces polling a plain list, and messages keep their arrival order.
msg_q = queue.Queue()
format_q = queue.Queue()


def talk():
    """Read non-empty lines from stdin forever and queue them for formatting."""
    while True:
        msg = input('>>: ').strip()
        if not msg:
            continue
        msg_q.put(msg)


def format_msg():
    """Upper-case each queued message and pass it on to the writer stage."""
    while True:
        format_q.put(msg_q.get().upper())


def save():
    """Append each formatted message to db.txt, one per line."""
    while True:
        res = format_q.get()
        with open('db.txt', 'a', encoding='utf-8') as f:
            f.write('%s\n' % res)


if __name__ == '__main__':
    t1 = Thread(target=talk)
    t2 = Thread(target=format_msg)
    t3 = Thread(target=save)
    t1.start()
    t2.start()
    t3.start()
五 线程相关的其他方法
Thread实例对象的方法
# is_alive(): 返回线程是否活动的(旧名isAlive()已在Python 3.9中移除)。
# getName(): 返回线程名(自Python 3.10起已弃用,推荐直接读取name属性)。
# setName(): 设置线程名(自Python 3.10起已弃用,推荐直接给name属性赋值)。
threading模块提供的一些方法:
# threading.current_thread(): 返回当前的线程变量(旧名currentThread()已弃用)。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.active_count(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果(旧名activeCount()已弃用)。 |
from threading import Thread
import threading
from multiprocessing import Process
import os


def work():
    """Sleep three seconds, then print the name of the thread running this call."""
    import time
    time.sleep(3)
    print(threading.current_thread().name)


if __name__ == '__main__':
    # Start a thread inside the main process.
    t = Thread(target=work)
    t.start()
    print(threading.current_thread().name)
    print(threading.current_thread())  # the main thread
    print(threading.enumerate())  # two live threads, counting the main thread
    print(threading.active_count())
    print('主线程/主进程')
    # Output recorded by the article (its listing leaves out the
    # active_count() line):
    #   MainThread
    #   <_MainThread(MainThread, started 140735268892672)>
    #   [<_MainThread(MainThread, started 140735268892672)>,
    #    <Thread(Thread-1, started 123145307557888)>]
    #   主线程/主进程
    #   Thread-1
Ö÷Ï̵߳ȴý×ÓÏ߳̽áÊø
from threading import Thread
import time


def sayhi(name, delay=2):
    """Sleep for ``delay`` seconds (default 2), then greet ``name``."""
    time.sleep(delay)
    print('%s say hello' % name)


if __name__ == '__main__':
    t = Thread(target=sayhi, args=('egon',))
    t.start()
    # join() blocks the main thread until the child thread has finished.
    t.join()
    print('主线程')
    print(t.is_alive())
    # Output recorded by the article:
    #   egon say hello
    #   主线程
    #   False
六 守护线程
无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁
需要强调的是:运行完毕并非终止运行
#1.对主进程来说,运行完毕指的是主进程代码运行完毕
#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
详细解释:
#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
#2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread
import time


def sayhi(name, delay=2):
    """Sleep for ``delay`` seconds (default 2), then greet ``name``."""
    time.sleep(delay)
    print('%s say hello' % name)


if __name__ == '__main__':
    # The daemon flag has to be set before start(). Passing it to the
    # constructor replaces the deprecated setDaemon() call.
    t = Thread(target=sayhi, args=('egon',), daemon=True)
    t.start()
    print('主线程')
    print(t.is_alive())
    # Output recorded by the article:
    #   主线程
    #   True
from threading import Thread
import time


def foo():
    """Print 123, sleep one second, print end123."""
    print(123)
    time.sleep(1)
    print("end123")


def bar():
    """Print 456, sleep three seconds, print end456."""
    print(456)
    time.sleep(3)
    print("end456")


if __name__ == '__main__':
    t1 = Thread(target=foo)
    t2 = Thread(target=bar)
    # Only t1 is a daemon. The process stays alive until the non-daemon t2
    # finishes (3 s), which is longer than t1 needs (1 s).
    t1.daemon = True
    t1.start()
    t2.start()
    print("main-------")
ÃÔ»óÈ˵ÄÀý×Ó |
Æß ͬ²½Ëø
Èý¸öÐèҪעÒâµÄµã£º
#1.Ïß³ÌÇÀµÄÊÇGILËø£¬GILËøÏ൱ÓÚÖ´ÐÐȨÏÞ£¬Äõ½Ö´ÐÐȨÏÞºó²ÅÄÜÄõ½»¥³âËøLock£¬ÆäËûÏß³ÌÒ²¿ÉÒÔÇÀµ½GIL£¬µ«Èç¹û·¢ÏÖLockÈÔȻûÓб»ÊÍ·ÅÔò×èÈû£¬¼´±ãÊÇÄõ½Ö´ÐÐȨÏÞGILÒ²ÒªÁ¢¿Ì½»³öÀ´
#2.joinÊǵȴýËùÓУ¬¼´ÕûÌå´®ÐУ¬¶øËøÖ»ÊÇËø×¡Ð޸Ĺ²ÏíÊý¾ÝµÄ²¿·Ö£¬¼´²¿·Ö´®ÐУ¬ÒªÏë±£Ö¤Êý¾Ý°²È«µÄ¸ù±¾ÔÀíÔÚÓÚÈò¢·¢±ä³É´®ÐУ¬joinÓ뻥³âËø¶¼¿ÉÒÔʵÏÖ£¬ºÁÎÞÒÉÎÊ£¬»¥³âËøµÄ²¿·Ö´®ÐÐЧÂÊÒª¸ü¸ß
#3. Ò»¶¨Òª¿´±¾Ð¡½Ú×îºóµÄGILÓ뻥³âËøµÄ¾µä·ÖÎö
GIL VS Lock
»úÖǵÄͬѧ¿ÉÄÜ»áÎʵ½Õâ¸öÎÊÌ⣬¾ÍÊǼÈÈ»Äã֮ǰ˵¹ýÁË£¬PythonÒѾÓÐÒ»¸öGILÀ´±£Ö¤Í¬Ò»Ê±¼äÖ»ÄÜÓÐÒ»¸öÏß³ÌÀ´Ö´ÐÐÁË£¬ÎªÊ²Ã´ÕâÀﻹÐèÒªlock?
Ê×ÏÈÎÒÃÇÐèÒª´ï³É¹²Ê¶£ºËøµÄÄ¿µÄÊÇΪÁ˱£»¤¹²ÏíµÄÊý¾Ý£¬Í¬Ò»Ê±¼äÖ»ÄÜÓÐÒ»¸öÏß³ÌÀ´Ð޸Ĺ²ÏíµÄÊý¾Ý
È»ºó£¬ÎÒÃÇ¿ÉÒԵóö½áÂÛ£º±£»¤²»Í¬µÄÊý¾Ý¾ÍÓ¦¸Ã¼Ó²»Í¬µÄËø¡£
×îºó£¬ÎÊÌâ¾ÍºÜÃ÷ÀÊÁË£¬GIL ÓëLockÊÇÁ½°ÑËø£¬±£»¤µÄÊý¾Ý²»Ò»Ñù£¬Ç°ÕßÊǽâÊÍÆ÷¼¶±ðµÄ£¨µ±È»±£»¤µÄ¾ÍÊǽâÊÍÆ÷¼¶±ðµÄÊý¾Ý£¬±ÈÈçÀ¬»ø»ØÊÕµÄÊý¾Ý£©£¬ºóÕßÊDZ£»¤Óû§×Ô¼º¿ª·¢µÄÓ¦ÓóÌÐòµÄÊý¾Ý£¬ºÜÃ÷ÏÔGIL²»¸ºÔðÕâ¼þÊ£¬Ö»ÄÜÓû§×Ô¶¨Òå¼ÓËø´¦Àí£¬¼´Lock
¹ý³Ì·ÖÎö£ºËùÓÐÏß³ÌÇÀµÄÊÇGILËø£¬»òÕß˵ËùÓÐÏß³ÌÇÀµÄÊÇÖ´ÐÐȨÏÞ
Ïß³Ì1ÇÀµ½GILËø£¬Äõ½Ö´ÐÐȨÏÞ£¬¿ªÊ¼Ö´ÐУ¬È»ºó¼ÓÁËÒ»°ÑLock£¬»¹Ã»ÓÐÖ´ÐÐÍê±Ï£¬¼´Ïß³Ì1»¹Î´ÊÍ·ÅLock£¬ÓпÉÄÜÏß³Ì2ÇÀµ½GILËø£¬¿ªÊ¼Ö´ÐУ¬Ö´Ðйý³ÌÖз¢ÏÖLock»¹Ã»Óб»Ïß³Ì1ÊÍ·Å£¬ÓÚÊÇÏß³Ì2½øÈë×èÈû£¬±»¶á×ßÖ´ÐÐȨÏÞ£¬ÓпÉÄÜÏß³Ì1Äõ½GIL£¬È»ºóÕý³£Ö´Ðе½ÊÍ·ÅLock¡£¡£¡£Õâ¾Íµ¼ÖÂÁË´®ÐÐÔËÐеÄЧ¹û
¼ÈÈ»ÊÇ´®ÐУ¬ÄÇÎÒÃÇÖ´ÐÐ
t1.start()
t1.join()
t2.start()
t2.join()
ÕâÒ²ÊÇ´®ÐÐÖ´Ðа¡£¬ÎªºÎ»¹Òª¼ÓLockÄØ£¬ÐèÖªjoinÊǵȴýt1ËùÓеĴúÂëÖ´ÐÐÍ꣬Ï൱ÓÚËø×¡ÁËt1µÄËùÓдúÂ룬¶øLockÖ»ÊÇËø×¡Ò»²¿·Ö²Ù×÷¹²ÏíÊý¾ÝµÄ´úÂë¡£
ÒòΪPython½âÊÍÆ÷°ïÄã×Ô¶¯¶¨ÆÚ½øÐÐÄÚ´æ»ØÊÕ£¬Äã¿ÉÒÔÀí½âΪpython½âÊÍÆ÷ÀïÓÐÒ»¸ö¶ÀÁ¢µÄỊ̈߳¬Ã¿¹ýÒ»¶Îʱ¼äËüÆðwake
up×öÒ»´ÎÈ«¾ÖÂÖѯ¿´¿´ÄÄЩÄÚ´æÊý¾ÝÊÇ¿ÉÒÔ±»Çå¿ÕµÄ£¬´ËʱÄã×Ô¼ºµÄ³ÌÐò ÀïµÄÏß³ÌºÍ py½âÊÍÆ÷×Ô¼ºµÄÏß³ÌÊDz¢·¢ÔËÐе쬼ÙÉèÄãµÄÏß³Ìɾ³ýÁËÒ»¸ö±äÁ¿£¬py½âÊÍÆ÷µÄÀ¬»ø»ØÊÕÏß³ÌÔÚÇå¿ÕÕâ¸ö±äÁ¿µÄ¹ý³ÌÖеÄclearingʱ¿Ì£¬¿ÉÄÜÒ»¸öÆäËüÏß³ÌÕýºÃÓÖÖØÐ¸øÕâ¸ö»¹Ã»À´¼°µÃÇå¿ÕµÄÄÚ´æ¿Õ¼ä¸³ÖµÁË£¬½á¹û¾ÍÓпÉÄÜи³ÖµµÄÊý¾Ý±»É¾³ýÁË£¬ÎªÁ˽â¾öÀàËÆµÄÎÊÌ⣬python½âÊÍÆ÷¼òµ¥´Ö±©µÄ¼ÓÁËËø£¬¼´µ±Ò»¸öÏß³ÌÔËÐÐʱ£¬ÆäËüÈ˶¼²»Äܶ¯£¬ÕâÑù¾Í½â¾öÁËÉÏÊöµÄÎÊÌ⣬
Õâ¿ÉÒÔ˵ÊÇPythonÔçÆÚ°æ±¾µÄÒÅÁôÎÊÌâ¡£¡¡
from threading import Thread
import os
import time


def work():
    """Decrement the global ``n`` in three separate steps: read, sleep, write."""
    global n
    temp = n
    # While this thread sleeps, other threads can read the same value of n.
    time.sleep(0.1)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    threads = []
    for i in range(100):
        p = Thread(target=work)
        threads.append(p)
        p.start()
    for p in threads:
        p.join()
    print(n)  # the result may be 99
ËøÍ¨³£±»ÓÃÀ´ÊµÏÖ¶Ô¹²Ïí×ÊÔ´µÄͬ²½·ÃÎÊ¡£ÎªÃ¿Ò»¸ö¹²Ïí×ÊÔ´´´½¨Ò»¸öLock¶ÔÏ󣬵±ÄãÐèÒª·ÃÎʸÃ×ÊԴʱ£¬µ÷ÓÃacquire·½·¨À´»ñÈ¡Ëø¶ÔÏó£¨Èç¹ûÆäËüÏß³ÌÒѾ»ñµÃÁ˸ÃËø£¬Ôòµ±Ç°Ïß³ÌÐèµÈ´ýÆä±»ÊÍ·Å£©£¬´ý×ÊÔ´·ÃÎÊÍêºó£¬ÔÙµ÷ÓÃrelease·½·¨ÊÍ·ÅËø£º
import threading

R = threading.Lock()
# ``with`` acquires the lock and releases it on exit, even when the guarded
# code raises. A bare acquire()/release() pair would leave the lock held.
with R:
    pass  # operate on the shared data here
from threading import Thread, Lock
import os
import time

# Created at module level so work() can be used without the main guard.
lock = Lock()


def work():
    """Decrement the global ``n`` while holding ``lock``."""
    global n
    # The whole read-sleep-write sequence runs under the lock, so only one
    # thread at a time executes it. The lock is released even on error.
    with lock:
        temp = n
        time.sleep(0.1)
        n = temp - 1


if __name__ == '__main__':
    n = 100
    threads = []
    for i in range(100):
        p = Thread(target=work)
        threads.append(p)
        p.start()
    for p in threads:
        p.join()
    # Always 0: the locked section runs serially, trading speed for data safety.
    print(n)
·ÖÎö£º
#1.100¸öÏß³ÌÈ¥ÇÀGILËø£¬¼´ÇÀÖ´ÐÐȨÏÞ
#2. ¿Ï¶¨ÓÐÒ»¸öÏß³ÌÏÈÇÀµ½GIL£¨ÔÝÇÒ³ÆÎªÏß³Ì1£©£¬È»ºó¿ªÊ¼Ö´ÐУ¬Ò»µ©Ö´ÐоͻáÄõ½lock.acquire()
#3. ¼«ÓпÉÄÜÏß³Ì1»¹Î´ÔËÐÐÍê±Ï£¬¾ÍÓÐÁíÍâÒ»¸öÏß³Ì2ÇÀµ½GIL£¬È»ºó¿ªÊ¼ÔËÐУ¬µ«Ïß³Ì2·¢ÏÖ»¥³âËølock»¹Î´±»Ïß³Ì1ÊÍ·Å£¬ÓÚÊÇ×èÈû£¬±»ÆÈ½»³öÖ´ÐÐȨÏÞ£¬¼´ÊÍ·ÅGIL
#4.Ö±µ½Ïß³Ì1ÖØÐÂÇÀµ½GIL£¬¿ªÊ¼´ÓÉÏ´ÎÔÝÍ£µÄλÖüÌÐøÖ´ÐУ¬Ö±µ½Õý³£ÊÍ·Å»¥³âËølock£¬È»ºóÆäËûµÄÏß³ÌÔÙÖØ¸´2
3 4µÄ¹ý³Ì
GILËøÓ뻥³âËø×ۺϷÖÎö£¨Öص㣡£¡£¡£©
# No lock: the threads run concurrently; fast, but the data is not safe.
from threading import current_thread, Thread, Lock
import os
import time


def task():
    """Announce the running thread, then decrement the global ``n`` unguarded."""
    global n
    print('%s is running' % current_thread().name)
    temp = n
    time.sleep(0.5)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    threads = []
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time = time.time()
    print('主:%s n:%s' % (stop_time - start_time, n))
    # Output recorded by the article:
    #   Thread-1 is running
    #   Thread-2 is running
    #   ......
    #   Thread-100 is running
    #   主:0.5216062068939209 n:99
# With a lock: code outside the lock runs concurrently, code inside it runs
# serially; slower, but the data is safe.
from threading import current_thread, Thread, Lock
import os
import time

# Created at module level so task() can be used without the main guard.
lock = Lock()


def task():
    """Do unlocked work concurrently, then decrement ``n`` under ``lock``."""
    global n
    # Unlocked part: all threads run this concurrently.
    time.sleep(3)
    print('%s start to run' % current_thread().name)
    # Locked part: one thread at a time. The lock is released even on error.
    with lock:
        temp = n
        time.sleep(0.5)
        n = temp - 1


if __name__ == '__main__':
    n = 100
    threads = []
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time = time.time()
    print('主:%s n:%s' % (stop_time - start_time, n))
    # Output recorded by the article (message text aligned with the print
    # call above):
    #   Thread-1 start to run
    #   Thread-2 start to run
    #   ......
    #   Thread-100 start to run
    #   主:53.294203758239746 n:0
# A common question: if locking makes execution serial anyway, why not call
# join() right after start() and skip the lock?
# That does work: joining immediately after start() runs the 100 tasks one
# after another, so n ends at 0 and the data is safe. The difference is:
#   start() then join(): every line of the task runs serially;
#   a lock: only the locked part, i.e. the shared-data update, runs serially.
# Both keep the data safe, but the lock is clearly the more efficient choice.
from threading import current_thread, Thread, Lock
import os
import time


def task():
    """Sleep, announce the thread, then decrement the global ``n`` unguarded."""
    global n
    time.sleep(3)
    print('%s start to run' % current_thread().name)
    temp = n
    time.sleep(0.5)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        t.start()
        # Waiting here before starting the next thread serializes everything.
        t.join()
    stop_time = time.time()
    print('主:%s n:%s' % (stop_time - start_time, n))
    # Output recorded by the article:
    #   Thread-1 start to run
    #   Thread-2 start to run
    #   ......
    #   Thread-100 start to run
    #   主:350.6937336921692 n:0    (a frightening amount of time)
»¥³âËøÓëjoinµÄÇø±ð£¨Öص㣡£¡£¡£© |
°Ë ËÀËøÏÖÏóÓëµÝ¹éËø
½ø³ÌÒ²ÓÐËÀËøÓëµÝ¹éËø£¬ÔÚ½ø³ÌÄÇÀïÍü¼Ç˵ÁË£¬·Åµ½ÕâÀïÒ»ÇÐ˵Á˶î
ËùνËÀËø£º ÊÇÖ¸Á½¸ö»òÁ½¸öÒÔÉϵĽø³Ì»òÏß³ÌÔÚÖ´Ðйý³ÌÖУ¬ÒòÕù¶á×ÊÔ´¶øÔì³ÉµÄÒ»ÖÖ»¥ÏàµÈ´ýµÄÏÖÏó£¬ÈôÎÞÍâÁ¦×÷Óã¬ËüÃǶ¼½«ÎÞ·¨ÍƽøÏÂÈ¥¡£´Ëʱ³ÆÏµÍ³´¦ÓÚËÀËø×´Ì¬»òϵͳ²úÉúÁËËÀËø£¬ÕâЩÓÀÔ¶ÔÚ»¥ÏàµÈ´ýµÄ½ø³Ì³ÆÎªËÀËø½ø³Ì£¬ÈçϾÍÊÇËÀËø
from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()


class MyThread(Thread):
    """Deadlock demonstration: func1 locks A then B, func2 locks B then A."""

    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        """Take lock A, then lock B, reporting each; release both on exit."""
        with mutexA:
            print('\033[41m%s 拿到A锁\033[0m' % self.name)
            with mutexB:
                print('\033[42m%s 拿到B锁\033[0m' % self.name)

    def func2(self):
        """Take lock B, pause two seconds, then take lock A."""
        with mutexB:
            print('\033[43m%s 拿到B锁\033[0m' % self.name)
            # The pause lets another thread grab A inside func1. That thread
            # then waits for B while this one waits for A.
            time.sleep(2)
            with mutexA:
                print('\033[44m%s 拿到A锁\033[0m' % self.name)


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()
    # Output recorded by the article:
    #   Thread-1 拿到A锁
    #   Thread-1 拿到B锁
    #   Thread-1 拿到B锁
    #   Thread-2 拿到A锁
    #   ...and then it hangs: deadlock.
解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock(需要 from threading import RLock),则不会发生死锁:
| mutexA=mutexB=RLock()
#一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止 |
¾Å ÐźÅÁ¿Semaphore
ͬ½ø³ÌµÄÒ»Ñù
Semaphore¹ÜÀíÒ»¸öÄÚÖõļÆÊýÆ÷£¬
ÿµ±µ÷ÓÃacquire()ʱÄÚÖüÆÊýÆ÷-1£»
µ÷ÓÃrelease() ʱÄÚÖüÆÊýÆ÷+1£»
¼ÆÊýÆ÷²»ÄÜСÓÚ0£»µ±¼ÆÊýÆ÷Ϊ0ʱ£¬acquire()½«×èÈûÏß³ÌÖ±µ½ÆäËûÏ̵߳÷ÓÃrelease()¡£
ʵÀý£º(ͬʱֻÓÐ5¸öÏ߳̿ÉÒÔ»ñµÃsemaphore,¼´¿ÉÒÔÏÞÖÆ×î´óÁ¬½ÓÊýΪ5)£º
from threading import Thread, Semaphore
import threading
import time

# Earlier variant kept for reference:
# def func():
#     if sm.acquire():
#         print(threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()


def func():
    """Hold one slot of the semaphore ``sm`` for three seconds."""
    # ``with`` releases the slot even if the body raises.
    with sm:
        print('%s get sm' % threading.current_thread().name)
        time.sleep(3)


if __name__ == '__main__':
    # At most five threads hold the semaphore at the same time.
    sm = Semaphore(5)
    for i in range(23):
        t = Thread(target=func)
        t.start()
Óë½ø³Ì³ØÊÇÍêÈ«²»Í¬µÄ¸ÅÄ½ø³Ì³ØPool(4)£¬×î´óÖ»ÄܲúÉú4¸ö½ø³Ì£¬¶øÇÒ´ÓÍ·µ½Î²¶¼Ö»ÊÇÕâËĸö½ø³Ì£¬²»»á²úÉúÐµģ¬¶øÐźÅÁ¿ÊDzúÉúÒ»¶ÑÏß³Ì/½ø³Ì
Ê® Event
ͬ½ø³ÌµÄÒ»Ñù
Ï̵߳ÄÒ»¸ö¹Ø¼üÌØÐÔÊÇÿ¸öÏ̶߳¼ÊǶÀÁ¢ÔËÐÐÇÒ״̬²»¿ÉÔ¤²â¡£Èç¹û³ÌÐòÖÐµÄÆä
ËûÏß³ÌÐèҪͨ¹ýÅжÏij¸öÏ̵߳Ä״̬À´È·¶¨×Ô¼ºÏÂÒ»²½µÄ²Ù×÷,ÕâʱÏß³Ìͬ²½ÎÊÌâ¾Í»á±äµÃ·Ç³£¼¬ÊÖ¡£ÎªÁ˽â¾öÕâЩÎÊÌâ,ÎÒÃÇÐèҪʹÓÃthreading¿âÖеÄEvent¶ÔÏó¡£
¶ÔÏó°üº¬Ò»¸ö¿ÉÓÉÏß³ÌÉèÖõÄÐźűêÖ¾,ËüÔÊÐíÏ̵߳ȴýijЩʼþµÄ·¢Éú¡£ÔÚ ³õʼÇé¿öÏÂ,Event¶ÔÏóÖеÄÐźűêÖ¾±»ÉèÖÃΪ¼Ù¡£Èç¹ûÓÐÏ̵߳ȴýÒ»¸öEvent¶ÔÏó,
¶øÕâ¸öEvent¶ÔÏóµÄ±ê־Ϊ¼Ù,ÄÇôÕâ¸öÏ߳̽«»á±»Ò»Ö±×èÈûÖ±ÖÁ¸Ã±êÖ¾ÎªÕæ¡£Ò»¸öÏß³ÌÈç¹û½«Ò»¸öEvent¶ÔÏóµÄÐźűêÖ¾ÉèÖÃÎªÕæ,Ëü½«»½ÐÑËùÓеȴýÕâ¸öEvent¶ÔÏóµÄÏ̡߳£Èç¹ûÒ»¸öÏ̵߳ȴýÒ»¸öÒѾ±»ÉèÖÃÎªÕæµÄEvent¶ÔÏó,ÄÇôËü½«ºöÂÔÕâ¸öʼþ,
¼ÌÐøÖ´ÐÐ
event.is_set():返回event的状态值(旧名isSet()自Python 3.10起已弃用);
event.wait():如果 event.is_set()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态,
等待操作系统调度;
event.clear():恢复event的状态值为False。 |

ÀýÈ磬Óжà¸ö¹¤×÷Ï̳߳¢ÊÔÁ´½ÓMySQL£¬ÎÒÃÇÏëÒªÔÚÁ´½Óǰȷ±£MySQL·þÎñÕý³£²ÅÈÃÄÇЩ¹¤×÷Ïß³ÌÈ¥Á¬½ÓMySQL·þÎñÆ÷£¬Èç¹ûÁ¬½Ó²»³É¹¦£¬¶¼»áÈ¥³¢ÊÔÖØÐÂÁ¬½Ó¡£ÄÇôÎÒÃǾͿÉÒÔ²ÉÓÃthreading.Event»úÖÆÀ´Ðµ÷¸÷¸ö¹¤×÷Ï̵߳ÄÁ¬½Ó²Ù×÷
from threading import Thread, Event
import threading
import time
import random

# Created at module level so the functions can be used without the main guard.
event = Event()


def conn_mysql():
    """Wait for ``event`` in 0.5 s steps and report success once it is set.

    Raises:
        TimeoutError: if the event is still unset after three attempts.
    """
    # NOTE(review): three waits of 0.5 s total 1.5 s, while check_mysql()
    # sleeps 2-4 s before setting the event. As written, every caller that
    # starts with the event unset times out. Confirm whether that is intended.
    count = 1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('链接超时')
        print('<%s>第%s次尝试链接' % (threading.current_thread().name, count))
        event.wait(0.5)
        count += 1
    print('<%s>链接成功' % threading.current_thread().name)


def check_mysql():
    """Pretend to check the server for 2-4 seconds, then set ``event``."""
    print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().name)
    time.sleep(random.randint(2, 4))
    event.set()


if __name__ == '__main__':
    conn1 = Thread(target=conn_mysql)
    conn2 = Thread(target=conn_mysql)
    check = Thread(target=check_mysql)
    conn1.start()
    conn2.start()
    check.start()
ʮһ Ìõ¼þCondition£¨Á˽⣩
ʹµÃÏ̵߳ȴý£¬Ö»ÓÐÂú×ãijÌõ¼þʱ£¬²ÅÊÍ·Ån¸öÏß³Ì
import threading


def run(n):
    """Block on the shared condition ``con`` until notified, then report ``n``."""
    with con:
        con.wait()
        print("run the thread: %s" % n)


if __name__ == '__main__':
    con = threading.Condition()
    for i in range(10):
        # Daemon threads: after 'q' the process can exit even if some
        # workers are still waiting on the condition.
        t = threading.Thread(target=run, args=(i,), daemon=True)
        t.start()
    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        try:
            count = int(inp)
        except ValueError:
            # Anything that is not a number is ignored instead of crashing.
            continue
        with con:
            # Wake up to ``count`` waiting threads.
            con.notify(count)
import threading


def condition_func():
    """Prompt on stdin and return True only when the answer is '1'."""
    inp = input('>>>')
    return inp == '1'


def run(n):
    """Wait on ``con`` until ``condition_func`` returns True, then report ``n``."""
    with con:
        # wait_for() checks the predicate first and, if it is false, waits
        # for a notification before checking again.
        # NOTE(review): nothing in this snippet calls notify(), so a thread
        # whose first answer is not '1' stays blocked. Confirm the intent.
        con.wait_for(condition_func)
        print("run the thread: %s" % n)


if __name__ == '__main__':
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()
Ê®¶þ ¶¨Ê±Æ÷
¶¨Ê±Æ÷£¬Ö¸¶¨nÃëºóÖ´ÐÐij²Ù×÷
from threading import Timer


def hello():
    """Print the greeting the timer fires."""
    print("hello, world")


if __name__ == '__main__':
    t = Timer(1, hello)
    t.start()  # after 1 second, "hello, world" will be printed
from threading import Timer
import random
import time


class Code:
    """Verification code that refreshes itself on a timer until it is matched."""

    def __init__(self):
        self.make_cache()

    def make_cache(self, interval=5):
        """Generate and print a new code, then schedule the next refresh.

        Each call arms a Timer that calls this method again after
        ``interval`` seconds, with the default interval.
        """
        self.cache = self.make_code()
        print(self.cache)
        self.t = Timer(interval, self.make_cache)
        self.t.start()

    def make_code(self, n=4):
        """Return ``n`` random characters, each a digit or an upper-case letter."""
        chars = []
        for _ in range(n):
            digit = str(random.randint(0, 9))
            letter = chr(random.randint(65, 90))  # 'A'..'Z'
            chars.append(random.choice([digit, letter]))
        return ''.join(chars)

    def check(self):
        """Prompt until the input matches the current code, ignoring case."""
        while True:
            inp = input('>>: ').strip()
            if inp.upper() == self.cache:
                print('验证成功', end='\n')
                # Stop the pending refresh so the program can exit.
                self.t.cancel()
                break


if __name__ == '__main__':
    obj = Code()
    obj.check()
ÑéÖ¤Â붨ʱÆ÷ |
Ê®Èý Ïß³Ìqueue
queue¶ÓÁÐ £ºÊ¹ÓÃimport queue£¬Ó÷¨Óë½ø³ÌQueueÒ»Ñù
queue is especially useful in threaded
programming when information must be exchanged safely
between multiple threads.
class queue.Queue(maxsize=0) #ÏȽøÏȳö
import queue

# FIFO queue: items come out in the order they went in.
q = queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
# Result (first in, first out):
#   first
#   second
#   third
class queue.LifoQueue(maxsize=0)
#last in fisrt out
import queue

# LIFO queue: the most recently added item comes out first.
q = queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
# Result (last in, first out):
#   third
#   second
#   first
class queue.PriorityQueue(maxsize=0)
#´æ´¢Êý¾Ýʱ¿ÉÉèÖÃÓÅÏȼ¶µÄ¶ÓÁÐ
import queue

q = queue.PriorityQueue()
# Each entry is a tuple whose first element is the priority (usually a
# number, but any mutually comparable values work). Smaller sorts first.
q.put((20, 'a'))
q.put((10, 'b'))
q.put((30, 'c'))
print(q.get())
print(q.get())
print(q.get())
# Result (smaller number = higher priority, dequeued first):
#   (10, 'b')
#   (20, 'a')
#   (30, 'c')
ÆäËû
Constructor
for a priority queue. maxsize is an integer that
sets the upperbound limit on the number of items
that can be placed in the queue. Insertion will
block once this size has been reached, until queue
items are consumed. If maxsize is less than or
equal to zero, the queue size is infinite.
The lowest valued entries are retrieved first
(the lowest valued entry is the one returned
by sorted(list(entries))[0]). A typical pattern
for entries is a tuple in the form: (priority_number,
data).
exception queue.Empty
Exception raised when non-blocking get() (or
get_nowait()) is called on a Queue object which
is empty.
exception queue.Full
Exception raised when non-blocking put() (or
put_nowait()) is called on a Queue object which
is full.
Queue.qsize()
Queue.empty() #return True if empty
Queue.full() # return True if full
Queue.put(item, block=True, timeout=None)
Put item into the queue. If optional args block
is true and timeout is None (the default), block
if necessary until a free slot is available.
If timeout is a positive number, it blocks at
most timeout seconds and raises the Full exception
if no free slot was available within that time.
Otherwise (block is false), put an item on the
queue if a free slot is immediately available,
else raise the Full exception (timeout is ignored
in that case).
Queue.put_nowait(item)
Equivalent to put(item, False).
Queue.get(block=True, timeout=None)
Remove and return an item from the queue. If
optional args block is true and timeout is None
(the default), block if necessary until an item
is available. If timeout is a positive number,
it blocks at most timeout seconds and raises
the Empty exception if no item was available
within that time. Otherwise (block is false),
return an item if one is immediately available,
else raise the Empty exception (timeout is ignored
in that case).
Queue.get_nowait()
Equivalent to get(False).
Two methods are offered to support tracking
whether enqueued tasks have been fully processed
by daemon consumer threads.
Queue.task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumer threads. For each get()
used to fetch a task, a subsequent call to task_done()
tells the queue that the processing on the task
is complete.
If a join() is currently blocking, it will
resume when all items have been processed (meaning
that a task_done() call was received for every
item that had been put() into the queue).
Raises a ValueError if called more times than
there were items placed in the queue.
Queue.join() blockÖ±µ½queue±»Ïû·ÑÍê±Ï |
Ê®ËÄ Python±ê׼ģ¿é--concurrent.futures
https://docs.python.org/dev/library/concurrent.futures.html
#1 ½éÉÜ
concurrent.futuresÄ£¿éÌṩÁ˸߶ȷâ×°µÄÒì²½µ÷ÓýӿÚ
ThreadPoolExecutor£ºÏ̳߳أ¬ÌṩÒì²½µ÷ÓÃ
ProcessPoolExecutor: ½ø³Ì³Ø£¬ÌṩÒì²½µ÷ÓÃ
Both implement the same interface, which is defined
by the abstract Executor class.
#2 »ù±¾·½·¨
#submit(fn, *args, **kwargs)
Òì²½Ìá½»ÈÎÎñ
#map(func, *iterables, timeout=None, chunksize=1)
È¡´úforÑ»·submitµÄ²Ù×÷
#shutdown(wait=True)
Ï൱ÓÚ½ø³Ì³ØµÄpool.close()+pool.join()²Ù×÷
wait=True£¬µÈ´ý³ØÄÚËùÓÐÈÎÎñÖ´ÐÐÍê±Ï»ØÊÕÍê×ÊÔ´ºó²Å¼ÌÐø
wait=False£¬Á¢¼´·µ»Ø£¬²¢²»»áµÈ´ý³ØÄÚµÄÈÎÎñÖ´ÐÐÍê±Ï
µ«²»¹Üwait²ÎÊýΪºÎÖµ£¬Õû¸ö³ÌÐò¶¼»áµÈµ½ËùÓÐÈÎÎñÖ´ÐÐÍê±Ï
submitºÍmap±ØÐëÔÚshutdown֮ǰ
#result(timeout=None)
È¡µÃ½á¹û
#add_done_callback(fn)
»Øµ÷º¯Êý |
#½éÉÜ
The ProcessPoolExecutor class is an Executor subclass
that uses a pool of processes to execute calls
asynchronously. ProcessPoolExecutor uses the multiprocessing
module, which allows it to side-step the Global
Interpreter Lock but also means that only picklable
objects can be executed and returned.
class concurrent.futures.ProcessPoolExecutor(max_workers=None,
mp_context=None)
An Executor subclass that executes calls asynchronously
using a pool of at most max_workers processes.
If max_workers is None or not given, it will
default to the number of processors on the machine.
If max_workers is lower or equal to 0, then
a ValueError will be raised.
#Ó÷¨
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os
import time
import random


def task(n):
    """Report the worker's pid, sleep 1-3 seconds, and return ``n`` squared."""
    print('%s is runing' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2


if __name__ == '__main__':
    # Leaving the ``with`` block calls shutdown(wait=True): every submitted
    # task has finished before '+++>' is printed.
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(11)]
    print('+++>')
    for future in futures:
        print(future.result())
ProcessPoolExecutor |
#½éÉÜ
ThreadPoolExecutor is an Executor subclass that
uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None,
thread_name_prefix='')
An Executor subclass that uses a pool of at most
max_workers threads to execute calls asynchronously.
Changed in version 3.5: If max_workers is None
or not given, it will default to the number
of processors on the machine, multiplied by
5, assuming that ThreadPoolExecutor is often
used to overlap I/O instead of CPU work and
the number of workers should be higher than
the number of workers for ProcessPoolExecutor.
New in version 3.6: The thread_name_prefix
argument was added to allow users to control
the threading.Thread names for worker threads
created by the pool for easier debugging.
#Ó÷¨
ÓëProcessPoolExecutorÏàͬ
ThreadPoolExecutor |
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os
import time
import random


def task(n):
    """Report the pid, sleep 1-3 seconds, and return ``n`` squared."""
    print('%s is runing' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2


if __name__ == '__main__':
    # The ``with`` block shuts the pool down once all mapped calls are done.
    with ThreadPoolExecutor(max_workers=3) as executor:
        # for i in range(11):
        #     future = executor.submit(task, i)
        executor.map(task, range(1, 12))  # map replaces the for + submit loop
mapµÄÓ÷¨ |
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os


def get_page(url):
    """Fetch ``url`` and return ``{'url', 'text'}`` on HTTP 200, else None."""
    print('<进程%s> get %s' % (os.getpid(), url))
    respone = requests.get(url)
    if respone.status_code == 200:
        return {'url': url, 'text': respone.text}
    return None


def parse_page(res):
    """Done-callback: append a one-line size summary of a fetched page to db.txt.

    ``res`` is the finished Future, not the page itself; ``res.result()``
    yields what ``get_page`` returned.
    """
    res = res.result()
    if res is None:
        # get_page returns None for non-200 responses; there is nothing to
        # record, and subscripting None would raise TypeError.
        return
    print('<进程%s> parse %s' % (os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' % (res['url'], len(res['text']))
    with open('db.txt', 'a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls = [
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/',
    ]
    # Equivalent with multiprocessing.Pool:
    # p = Pool(3)
    # for url in urls:
    #     p.apply_async(get_page, args=(url,), callback=parse_page)
    # p.close()
    # p.join()

    # The ``with`` block waits for all submitted work before shutting down.
    with ProcessPoolExecutor(3) as p:
        for url in urls:
            p.submit(get_page, url).add_done_callback(parse_page)
»Øµ÷º¯Êý |
|