| 
                           
                            | 
								
										
                                    | 编辑推荐: |  
										
                                    | 本文来自于cnblogs,文章主要总结了python并发编程并发和并行,同步和异步,阻塞和非阻塞等相关知识。 |  |  最近在学python的网络编程,学了socket通信,并利用socket实现了一个具有用户验证功能,可以上传下载文件、可以实现命令行功能,创建和删除文件夹,可以实现的断点续传等功能的FTP服务器。但在这当中,发现一些概念区分起来很难,比如并发和并行,同步和异步,阻塞和非阻塞,但是这些概念却很重要。因此在此把它总结下来。 1.并发 & 并行 并发:在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行。简言之,是指系统具有处理多个任务的能力。 并行:当系统有一个以上CPU时,则线程的操作有可能非并发。当一个CPU执行一个线程时,另一个CPU可以执行另一个线程,两个线程互不抢占CPU资源,可以同时进行,这种方式我们称之为并行(Parallel)。简言之,是指系统具有同时处理多个任务的能力。 下面我们来两个例子: 
 								 
         							
                              | import threading 
                                #线程 import time
 
 
 def music():
 print('begin to listen music {}'.format(time.ctime()))
 time.sleep(3)
 print('stop to listen music {}'.format(time.ctime()))
 
 
 def game():
 print('begin to play game {}'.format(time.ctime()))
 time.sleep(5)
 print('stop to play game {}'.format(time.ctime()))
 
 
 if __name__ == '__main__':
 music()
 game()
 print('ending.....')
 |  
 music的时间为3秒,game的时间为5秒,如果按照我们正常的执行,直接执行函数,那么将按顺序顺序执行,整个过程8秒。 
                             
                              | import threading 
                                #线程 import time
 
 
 def music():
 print('begin to listen music {}'.format(time.ctime()))
 time.sleep(3)
 print('stop to listen music {}'.format(time.ctime()))
 
 
 def game():
 print('begin to play game {}'.format(time.ctime()))
 time.sleep(5)
 print('stop to play game {}'.format(time.ctime()))
 
 
 if __name__ == '__main__':
 t1 = threading.Thread(target=music) #创建一个线程对象t1 
                                  子线程
 t2 = threading.Thread(target=game) #创建一个线程对象t2 
                                  子线程
 
 t1.start()
 t2.start()
 
 # t1.join() #等待子线程执行完 t1不执行完,谁也不准往下走
 t2.join()
 
 print('ending.......') #主线程
 print(time.ctime())
 |  
 在这个例子中,我们开了两个线程,将music和game两个函数分别通过线程执行,运行结果显示两个线程同时开始,由于听音乐时间3秒,玩游戏时间5秒,所以整个过程完成时间为5秒。我们发现,通过开启多个线程,原本8秒的时间缩短为5秒,原本顺序执行现在是不是看起来好像是并行执行的?看起来好像是这样,听音乐的同时在玩游戏,整个过程的时间随最长的任务时间变化。但真的是这样吗?那么下面我来提出一个GIL锁的概念。 GIL(全局解释器锁):无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行。 
                             
                              | import time from threading import Thread
 
 
 def add():
 sum = 0
 i = 1
 while i<=1000000:
 sum += i
 i += 1
 print('sum:',sum)
 
 
 def mul():
 sum2 = 1
 i = 1
 while i<=100000:
 sum2 = sum2 * i
 i += 1
 print('sum2:',sum2)
 
 
 start = time.time()
 
 add()
 mul() #串行比多线程还快
 
 print('cost time %s'%(time.time()-start))
 |  
 
                             
                              | import time from threading import Thread
 
 
 def add():
 sum = 0
 i = 1
 while i<=1000000:
 sum += i
 i += 1
 print('sum:',sum)
 
 
 def mul():
 sum2 = 1
 i = 1
 while i<=100000:
 sum2 = sum2 * i
 i += 1
 print('sum2:',sum2)
 
 
 start = time.time()
 t1 = Thread(target=add)
 t2 = Thread(target=mul)
 
 l = []
 l.append(t1)
 l.append(t2)
 
 for t in l:
 t.start()
 
 for t in l:
 t.join()
 
 print('cost time %s'%(time.time()-start))
 |  
 哎吆,这是怎么回事,串行执行比多线程还快?不符合常理呀。是不是颠覆了你的人生观,这个就和GIL锁有关,同一时刻,系统只允许一个线程执行,那么,就是说,本质上我们之前理解的多线程的并行是不存在的,那么之前的例子为什么时间确实缩短了呢?这里有涉及到一个任务的类型。 --任务: 1.IO密集型(会有cpu空闲的时间) 注:sleep等同于IO操作, socket通信也是IO 
                             2.计算密集型 而之前那个例子恰好是IO密集型的例子,后面这个由于涉及到了加法和乘法,属于计算密集型操作,那么,就产生了一个结论,多线程对于IO密集型任务有作用, 而计算密集型任务不推荐使用多线程。 而其中我们还可以得到一个结论:由于GIL锁,多线程不可能真正实现并行,所谓的并行也只是宏观上并行微观上并发,本质上是由于遇到io操作不断的cpu切换 所造成并行的现象。由于cpu切换速度极快,所以看起来就像是在同时执行。 --问题:没有利用多核的优势 --这就造成了多线程不能同时执行,并且增加了切换的开销,串行的效率可能更高。 2.同步 & 异步 对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段: 1. 等待数据准备 (Waiting for the data to be ready) 2. 将数据从内核拷贝到进程中 (Copying the data from the kernel 
                            to the process) 同步:当进程执行IO(等待外部数据)的时候,-----等。同步(例如打电话的时候必须等) 异步:当进程执行IO(等待外部数据)的时候,-----不等,去执行其他任务,一直等到数据接收成功,再回来处理。异步(例如发短信) 当我们去爬取一个网页的时候,要爬取多个网站,有些人可能会发起多个请求,然后通过函数顺序调用。执行顺序也是先调用先执行。效率非常低。 下面我们看一下异步的一个例子: 
                             
                              | import socket import select
 """
 ########http请求本质,IO阻塞########
 sk = socket.socket()
 #1.连接
 sk.connect(('www.baidu.com',80,)) #阻塞
 print('连接成功了')
 #2.连接成功后发送消息
 sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")
 
 #3.等待服务端响应
 data = sk.recv(8096)#阻塞
 print(data) #\r\n\r\n区分响应头和影响体
 
 #关闭连接
 sk.close()
 """
 """
 ########http请求本质,IO非阻塞########
 sk = socket.socket()
 sk.setblocking(False)
 #1.连接
 try:
 sk.connect(('www.baidu.com',80,)) #非阻塞,但会报错
 print('连接成功了')
 except BlockingIOError as e:
 print(e)
 
 #2.连接成功后发送消息
 sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")
 
 #3.等待服务端响应
 data = sk.recv(8096)#阻塞
 print(data) #\r\n\r\n区分响应头和影响体
 
 #关闭连接
 sk.close()
 """
 
 
 class HttpRequest:
 def __init__(self,sk,host,callback):
 self.socket = sk
 self.host = host
 self.callback = callback
 
 def fileno(self):
 return self.socket.fileno()
 
 
 class HttpResponse:
 def __init__(self,recv_data):
 self.recv_data = recv_data
 self.header_dict = {}
 self.body = None
 
 self.initialize()
 
 def initialize(self):
 headers, body = self.recv_data.split(b'\r\n\r\n', 
                                  1)
 self.body = body
 header_list = headers.split(b'\r\n')
 for h in header_list:
 h_str = str(h,encoding='utf-8')
 v = h_str.split(':',1)
 if len(v) == 2:
 self.header_dict[v[0]] = v[1]
 
 
 class AsyncRequest:
 def __init__(self):
 self.conn = []
 self.connection = [] # 用于检测是否已经连接成功
 
 def add_request(self,host,callback):
 try:
 sk = socket.socket()
 sk.setblocking(0)
 sk.connect((host,80))
 except BlockingIOError as e:
 pass
 request = HttpRequest(sk,host,callback)
 self.conn.append(request)
 self.connection.append(request)
 
 def run(self):
 
 while True:
 rlist,wlist,elist = select.select(self.conn,self.connection,self.conn,0.05)
 for w in wlist:
 print(w.host,'连接成功...')
 # 只要能循环到,表示socket和服务器端已经连接成功
 tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" 
                                  %(w.host,)
 w.socket.send(bytes(tpl,encoding='utf-8'))
 self.connection.remove(w)
 for r in rlist:
 # r,是HttpRequest
 recv_data = bytes()
 while True:
 try:
 chunck = r.socket.recv(8096)
 recv_data += chunck
 except Exception as e:
 break
 response = HttpResponse(recv_data)
 r.callback(response)
 r.socket.close()
 self.conn.remove(r)
 if len(self.conn) == 0:
 break
 
 
 def f1(response):
 print('保存到文件',response.header_dict)
 
 
 def f2(response):
 print('保存到数据库', response.header_dict)
 
 
 url_list = [
 {'host':'www.youku.com','callback': f1},
 {'host':'v.qq.com','callback': f2},
 {'host':'www.cnblogs.com','callback': f2},
 ]
 
 req = AsyncRequest()
 for item in url_list:
 req.add_request(item['host'],item['callback'])
 
 req.run()
 |  
 我们可以看到,三个请求发送顺序与返回顺序,并不一样,这样就体现了异步请求。即我同时将请求发送出去,哪个先回来我先处理哪个。 即我们可以理解为:我打电话的时候只允许和一个人通信,和这个人通信结束之后才允许和另一个人开始。这就是同步。 我们发短信的时候发完可以不去等待,去处理其他事情,当他回复之后我们再去处理,这样就大大解放了我们的时间。这就是异步。 体现在网页请求上面就是我请求一个网页时候等待他回复,否则不接收其它请求,这就是同步。另一种就是我发送请求之后不去等待他是否回复,而去处理其它请求,当处理完其他请求之后,某个请求说,我的回复了,然后程序转而去处理他的回复数据。这就是异步请求。所以,异步可以充分cpu的效率。 3.阻塞 & 非阻塞 调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking 
                            IO在kernel还准备数据的情况下会立刻返回。 下面我们通过socket实现一个命令行功能来感受一下。 
                             
                              | #服务端 from socket import *
 import subprocess
 import struct
 
 ip_port = ('127.0.0.1', 8000)
 buffer_size = 1024
 backlog = 5
 
 tcp_server = socket(AF_INET, SOCK_STREAM)
 tcp_server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 
                                  #就是它,在bind前加
 tcp_server.bind(ip_port)
 tcp_server.listen(backlog)
 
 while True:
 conn, addr = tcp_server.accept()
 print('新的客户端链接:', addr)
 while True:
 try:
 cmd = conn.recv(buffer_size)
 print('收到客户端命令:', cmd.decode('utf-8'))
 
 #执行命令cmd,得到命令的结果cmd_res
 res = subprocess.Popen(cmd.decode('utf-8'),shell=True,
 stderr=subprocess.PIPE,
 stdout=subprocess.PIPE,
 stdin=subprocess.PIPE,
 )
 err = res.stderr.read()
 if err:
 cmd_res = err
 else:
 cmd_res = res.stdout.read()
 if not cmd_res:
 cmd_res = '执行成功'.encode('gbk')
 #解决粘包
 length = len(cmd_res)
 data_length = struct.pack('i',length)
 conn.send(data_length)
 conn.send(cmd_res)
 except Exception as e:
 print(e)
 break
 conn.close()
 
 
 #客户端
 from socket import *
 
 ip_port = ('127.0.0.1',8000)
 buffer_size = 1024
 backlog = 5
 
 tcp_client = socket(AF_INET,SOCK_STREAM)
 tcp_client.connect(ip_port)
 
 while True:
 cmd = input('>>:').strip()
 if not cmd:
 continue
 if cmd == 'quit':
 break
 tcp_client.send(cmd.encode('utf-8'))
 
 #解决粘包
 length = tcp_client.recv(4)
 length = struct.unpack('i',length)[0]
 
 recv_size = 0
 recv_msg = b''
 while recv_size < length:
 recv_msg += tcp_client.recv(buffer_size)
 recv_size = len(recv_msg)
 
 print(recv_msg.decode('gbk'))
 |  
 开启了服务器和一个客户端之后,我们在客户端输入一些命令,然后正确显示,功能实现。这是在我再打开一个客户端,输入命令,发现服务器迟迟没有响应。 
 这个就是当一个客户端在请求的时候,当这个客户端没有结束的时候,服务器不会去处理其他客户端的请求。这时候就阻塞了。 如何让服务器同时处理多个客户端请求呢? 
                             
                              | #服务端 import socketserver
 
 class Myserver(socketserver.BaseRequestHandler):
 """socketserver内置的通信方法"""
 def handle(self):
 print('conn is:',self.request) #conn
 print('addr is:',self.client_address) #addr
 
 while True:
 try:
 #发消息
 data = self.request.recv(1024)
 if not data:break
 print('收到的客户端消息是:',data.decode('utf-8'),self.client_address)
 
 #发消息
 self.request.sendall(data.upper())
 except Exception as e:
 print(e)
 break
 
 
 if __name__ == '__main__':
 s = socketserver.ThreadingTCPServer(('127.0.0.1',8000), 
                                  Myserver) #通信循环
 # s = socketserver.ForkingTCPServer(('127.0.0.1',8000), 
                                  Myserver) #通信循环
 print(s.server_address)
 print(s.RequestHandlerClass)
 print(Myserver)
 print(s.socket)
 s.serve_forever()
 
 #客户端
 
 from socket import *
 
 ip_port = ('127.0.0.1',8000)
 buffer_size = 1024
 backlog = 5
 
 tcp_client = socket(AF_INET,SOCK_STREAM)
 tcp_client.connect(ip_port)
 
 while True:
 msg = input('>>:').strip()
 if not msg:continue
 if msg == 'quit':break
 
 tcp_client.send(msg.encode('utf-8'))
 
 data = tcp_client.recv(buffer_size)
 print(data.decode('utf-8'))
 
 tcp_client.close()
 |    
 
 这段代码通过socketserver模块实现了socket的并发。这个过程中,当一个客户端在向服务器请求的时候,另一个客户端也可以正常请求。服务器在处理一个客户端请求的时候,另一个请求没有被阻塞。 总结:只要有一丁点阻塞,就是阻塞IO。 异步IO的特点就是全程无阻塞。 有些人常把同步阻塞和异步非阻塞联系起来,但实际上经过分析,阻塞与同步,非阻塞和异步的定义是不一样的。同步和异步的区别是遇到IO请求是否等待。阻塞和非阻塞的区别是数据没准备好的情况下是否立即返回。同步可能是阻塞的,也可能是非阻塞的,而非阻塞的有可能是同步的,也有可能是异步的。 这里面其实涉及到的知识点还很多,这里只是凭我的记忆简单总结了一下,以后会补充更多。 |