Python Concurrency and Asynchronous Programming by Example
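Concepts such as concurrency, parallelism, synchronous blocking, asynchronous non-blocking, threads, processes, and coroutines are hard to grasp deeply from prose alone, so this article implements each concurrency and asynchronous programming approach step by step in code and compares them. The interpreter is Python 3: Python 3 is the future of Python, and its native libraries already make coroutines very convenient to implement.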
1. Preparation
The following packages are needed by all of the test code:
#! python3
# coding:utf-8
import socket
from concurrent import futures
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
import asyncio
import aiohttp  # third-party package: pip install aiohttp
import time
from time import ctime
To compare the different implementations, the scenario is crawler development: a series of HTTP requests is sent to a target website, and the elapsed time is used to judge each approach. Concretely, one request consists of creating a socket, requesting the Sina homepage (www.sina.com), and receiving its page source. First, a decorator that measures a function's execution time:
def tsfunc(func):
    def wrappedFunc(*args, **kargs):
        # time.clock() was removed in Python 3.8; perf_counter() measures elapsed wall-clock time
        start = time.perf_counter()
        action = func(*args, **kargs)
        time_delta = time.perf_counter() - start
        print('[{0}] {1}() called, time delta: {2}'.format(ctime(), func.__name__, time_delta))
        return action
    return wrappedFunc
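The output format is: the current time, the name of the called function, and the function's execution time. The timings reported in this article were measured with time.clock(), which was removed in Python 3.8; before that it returned CPU time on Unix but wall-clock time on Windows, so the figures are only roughly comparable across platforms.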
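If www.sina.com is unreachable, or to avoid sending repeated requests to a real site, the same request pattern can be tried against a throwaway local server. The sketch below is an assumption, not part of the original test setup: `FixedPageHandler`, `start_local_server()` and `blocking_request()` are hypothetical names, and the server (built on the standard library's `http.server`) returns the same small page for every path.

```python
import socket
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

BODY = b"<html><body>hello</body></html>"

class FixedPageHandler(BaseHTTPRequestHandler):
    """Answers every GET with the same small page (a local stand-in for www.sina.com)."""

    def do_GET(self):
        self.send_response(200)
        self.send_header("Content-Length", str(len(BODY)))
        self.end_headers()
        self.wfile.write(BODY)

    def log_message(self, format, *args):
        pass  # silence the per-request log lines

def start_local_server():
    """Start the server on a free port in a daemon thread and return (server, port)."""
    server = ThreadingHTTPServer(("127.0.0.1", 0), FixedPageHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server, server.server_address[1]

def blocking_request(port, path="/"):
    """One request as the article defines it: connect, send a GET, read until the server closes."""
    with socket.create_connection(("127.0.0.1", port)) as sock:
        sock.sendall("GET {0} HTTP/1.0\r\nHost: localhost\r\n\r\n".format(path).encode("ascii"))
        chunks = []
        while True:
            chunk = sock.recv(4096)
            if not chunk:  # empty bytes: connection closed, response complete
                break
            chunks.append(chunk)
    return b"".join(chunks)

if __name__ == "__main__":
    server, port = start_local_server()
    print(blocking_request(port).split(b"\r\n", 1)[0])  # the status line
    server.shutdown()
    server.server_close()
```

Because the handler speaks HTTP/1.0, the server closes the connection after each response, which is exactly the end-of-response signal the article's read loops wait for.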
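2. Blocking/non-blocking and synchronous/asynchronous
These two pairs of concepts are easy to confuse. By definition:
Blocking: during socket communication, a thread issues a request; if the request has not returned a result yet, the thread goes to sleep and is suspended, unable to do anything else, until a result arrives or the call times out (if a timeout is set).
Non-blocking: similar to blocking, except that while waiting for the result the thread is not suspended and can do other work. In other words, when the result cannot be obtained immediately, the function does not suspend the current thread but returns at once.
Synchronous: similar to blocking, but not the same concept. Synchronous describes the logic by which tasks are completed: one task is finished before the second one starts, and so on.
Asynchronous: similar to non-blocking, and the opposite of synchronous. When an asynchronous call is issued, the caller does not get the result immediately. Once the component that actually handles the call has finished, it informs the caller through state, a notification, or a callback. Informally, asynchronous means "I'll tell you later".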
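The blocking/non-blocking distinction can be observed on a single socket. This minimal sketch uses `socket.socketpair()` so no network is needed; `nonblocking_recv()` and `blocking_recv()` are hypothetical helper names chosen for illustration.

```python
import socket
import time

def nonblocking_recv(sock):
    """Non-blocking: return the data if some is ready, otherwise return None immediately."""
    sock.setblocking(False)
    try:
        return sock.recv(1024)
    except BlockingIOError:
        return None  # nothing to read yet; the thread was never suspended

def blocking_recv(sock, timeout):
    """Blocking: the thread is suspended until data arrives or the timeout expires."""
    sock.settimeout(timeout)  # timeout mode: recv() blocks, but for at most `timeout` seconds
    try:
        return sock.recv(1024)
    except socket.timeout:
        return None

if __name__ == "__main__":
    # socketpair() returns two connected sockets, so no network access is needed
    a, b = socket.socketpair()
    for label, call in [("non-blocking", lambda: nonblocking_recv(a)),
                        ("blocking, 0.5 s timeout", lambda: blocking_recv(a, 0.5))]:
        start = time.perf_counter()
        result = call()
        print("{0}: {1!r} after {2:.2f}s".format(label, result, time.perf_counter() - start))
    b.sendall(b"hello")
    print(blocking_recv(a, 1.0))  # data is available now, so the call returns it
    a.close()
    b.close()
```

With no data available, the non-blocking call returns `None` almost instantly, while the blocking call only gives up when its timeout expires.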
1) Blocking
Back to the code. First, the blocking request function:
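def blocking_way():
    # connect() and recv() suspend the calling thread until the network answers
    with socket.socket() as sock:
        sock.connect(('www.sina.com', 80))
        request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
        sock.sendall(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:  # an empty chunk means the server closed the connection
            response += chunk
            chunk = sock.recv(4096)
    return response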
Testing a single thread, multiple processes, and multiple threads:
# Blocking, no concurrency
@tsfunc
def sync_way():
    res = []
    for _ in range(10):
        res.append(blocking_way())
    return len(res)

# Blocking, multiple processes
# (with the spawn start method, e.g. on Windows and macOS, call this only under if __name__ == '__main__')
@tsfunc
def process_way():
    worker = 10
    with futures.ProcessPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for _ in range(10)}
        return len([fut.result() for fut in futs])

# Blocking, multiple threads
@tsfunc
def thread_way():
    worker = 10
    with futures.ThreadPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for _ in range(10)}
        return len([fut.result() for fut in futs])
Results:
[Wed Dec 13 16:52:25 2017] sync_way() called, time delta: 0.06371647809425328
[Wed Dec 13 16:52:28 2017] process_way() called, time delta: 2.31437644946734
[Wed Dec 13 16:52:28 2017] thread_way() called, time delta: 0.010172946070299727
Compared with the non-concurrent version, starting 10 processes to make 10 requests took the longest, because processes carry a large system overhead. Multithreading did much better: 10 threads issuing the requests concurrently finished about 6 times faster than sequential requests.
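The thread result follows from the fact that a thread blocked on socket I/O releases CPython's GIL, so other threads can run while it waits. The sketch below reproduces the effect without network access by simulating each request with `time.sleep()`; `fake_request()`, `sequential()` and `threaded()` are hypothetical names, and the 0.1 s delay is an arbitrary assumption.

```python
import time
from concurrent import futures

def fake_request(delay=0.1):
    """Hypothetical stand-in for blocking_way(): the thread just waits, as it would on the network."""
    time.sleep(delay)  # sleep, like blocking socket I/O, releases the GIL while waiting
    return b"response"

def timed(fn, *args):
    """Return (result, elapsed seconds) for one call of fn."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start

def sequential(n):
    return [fake_request() for _ in range(n)]

def threaded(n):
    with futures.ThreadPoolExecutor(max_workers=n) as executor:
        futs = [executor.submit(fake_request) for _ in range(n)]
        return [f.result() for f in futs]

if __name__ == "__main__":
    for fn in (sequential, threaded):
        result, elapsed = timed(fn, 10)
        print("{0}: {1} responses in {2:.2f}s".format(fn.__name__, len(result), elapsed))
```

With ten simulated requests the sequential version needs at least 1 s, while the threaded one takes roughly the duration of a single request, because the waits overlap.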
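2) Non-blocking
The non-blocking request code differs from the blocking version in that it does not suspend while waiting for the request but returns immediately. To make sure the message is still read correctly, the most primitive approach is to keep reading in a loop and break out only once reading is complete. The code is as follows: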
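def nonblocking_way():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    try:
        sock.connect(('www.sina.com', 80))
    except BlockingIOError:
        pass  # a non-blocking connect() returns before the connection is established
    request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
    data = request.encode('ascii')
    # Busy-wait: keep retrying send() until the connection is ready
    while True:
        try:
            sock.send(data)
            break
        except OSError:
            pass
    response = b''
    # Busy-wait: keep retrying recv(); BlockingIOError means "no data yet"
    while True:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                chunk = sock.recv(4096)
            break  # an empty chunk means the server closed the connection
        except OSError:
            pass
    sock.close()
    return response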
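Testing the single-threaded non-blocking approach (named async_way here, although the ten requests still run one after another):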
@tsfunc
def async_way():
    res = []
    for i in range(10):
        res.append(nonblocking_way())
    return len(res)
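Compared with the single-threaded synchronous blocking approach, the test results are:
[Wed Dec 13 17:18:30 2017] sync_way() called, time delta: 0.07342884475822574
[Wed Dec 13 17:18:30 2017] async_way() called, time delta: 0.06509009095694886
The non-blocking approach helps somewhat, but not noticeably. The reason is that the time the thread used to spend suspended is now spent spinning in the send and read loops instead, and because each request is still completed before the next one starts, none of the waiting overlaps. When most of the time is wasted this way, asynchronous programming cannot show its strength; the solution is the event-driven approach discussed next.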
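The idea behind event-driven I/O is to let the operating system report which sockets are ready, instead of spinning on `recv()`. A minimal sketch with the standard `selectors` module follows, under stated assumptions: each "server" is a hypothetical thread (`slow_server()`) on the far end of a `socket.socketpair()` that answers after a different delay, standing in for a remote website, and `event_driven_read()` is an illustrative name.

```python
import selectors
import socket
import threading
import time

def slow_server(sock, payload, delay):
    """Hypothetical slow server: replies after `delay` seconds, then closes the connection."""
    time.sleep(delay)
    sock.sendall(payload)
    sock.close()

def event_driven_read(n=3):
    sel = selectors.DefaultSelector()
    responses = {}
    for i in range(n):
        client, server = socket.socketpair()
        client.setblocking(False)
        sel.register(client, selectors.EVENT_READ, data=i)
        responses[i] = b""
        threading.Thread(target=slow_server,
                         args=(server, "reply {0}".format(i).encode(), 0.1 * (n - i))).start()
    open_count = n
    while open_count:
        # select() suspends the thread until at least one socket is readable: no busy loop
        for key, _mask in sel.select():
            try:
                chunk = key.fileobj.recv(4096)
            except BlockingIOError:
                continue  # spurious wake-up; wait for the next event
            if chunk:
                responses[key.data] += chunk
            else:  # empty bytes: the server closed, so this request is complete
                sel.unregister(key.fileobj)
                key.fileobj.close()
                open_count -= 1
    sel.close()
    return responses

if __name__ == "__main__":
    start = time.perf_counter()
    print(event_driven_read(3), "in {0:.2f}s".format(time.perf_counter() - start))
```

Because `select()` suspends the thread until some socket has data, the total time is close to the longest single delay rather than the sum of all delays, and no CPU is burned in a polling loop.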
3. Callbacks, generators, and coroutines
a. Callbacks
selector = DefaultSelector()
stopped = False
urls_todo = ['/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9']

class Crawler:
    def __init__(self, url):
        self.url = url
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.sina.com', 80))
        except BlockingIOError:
            pass
        # Call self.connected once the socket becomes writable (i.e. connected)
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('ascii'))
        # Call self.read_response whenever data is ready to be read
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        # Read exactly one chunk per readiness event; loop() calls us again for the next one
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            # Empty chunk: the server closed the connection, so this request is done
            selector.unregister(key.fd)
            self.sock.close()
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)

@tsfunc
def callback_way():
    for url in urls_todo:
        crawler = Crawler(url)
        crawler.fetch()
    loop()  # Crawler's callbacks take (key, mask), which is what loop() passes to them
This is asynchronous programming implemented with traditional callbacks. The result:
[Tue Mar 27 17:52:49 2018] callback_way() called, time delta: 0.054735804048789374
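The key property of the callback style is that each callback performs one short, non-blocking step and then registers the next step with the selector. The sketch below isolates that chaining; `CallbackClient`, `upper_case_server()` and `run()` are hypothetical names, and a thread on the other end of a `socketpair()` replaces the real web server. `read_response()` calls `recv()` only once per readiness event and relies on the loop to call it again.

```python
import selectors
import socket
import threading

class CallbackClient:
    """Hypothetical client in the style of the article's Crawler: each step registers the next one."""

    def __init__(self, selector, sock, request, on_done):
        self.selector = selector
        self.sock = sock
        self.request = request
        self.on_done = on_done
        self.response = b""
        sock.setblocking(False)
        # Step 1: when the socket is writable, call send_request
        selector.register(sock, selectors.EVENT_WRITE, self.send_request)

    def send_request(self, key, mask):
        self.sock.send(self.request)
        # Step 2: switch to "readable" events and chain the next callback
        self.selector.modify(self.sock, selectors.EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        chunk = self.sock.recv(4096)  # one recv per event; the loop calls us again for more
        if chunk:
            self.response += chunk
            return
        # Step 3: empty bytes means the peer closed; clean up and report completion
        self.selector.unregister(self.sock)
        self.sock.close()
        self.on_done(self)

def upper_case_server(sock):
    """Hypothetical peer: reads one request, answers with it upper-cased, then closes."""
    request = sock.recv(4096)
    sock.sendall(request.upper())
    sock.close()

def run(requests):
    selector = selectors.DefaultSelector()
    finished = []
    for request in requests:
        client_end, server_end = socket.socketpair()
        threading.Thread(target=upper_case_server, args=(server_end,)).start()
        CallbackClient(selector, client_end, request, finished.append)
    # The event loop only waits and dispatches: key.data is the registered callback
    while len(finished) < len(requests):
        for key, mask in selector.select():
            key.data(key, mask)
    selector.close()
    return sorted(client.response for client in finished)

if __name__ == "__main__":
    print(run([b"get /1", b"get /2", b"get /3"]))
```

The event loop itself knows nothing about HTTP; all request logic lives in the callbacks, which is what makes this style hard to follow once the chain grows longer.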
b. Generators
# Future1, connect() and read_all() are defined in the full listing at the end of the article
class Crawler2:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        # Each `yield from` suspends fetch until the selector reports the socket is ready
        yield from connect(sock, ('www.sina.com', 80))
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        sock.close()
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True

class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future1()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send() resumes the coroutine (fetch) and runs it until its next yield;
            # next_future is the object that yield produced
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)

def loop1():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            # on_connected/on_readable take no arguments
            callback()

@tsfunc
def generate_way():
    for url in urls_todo:
        crawler = Crawler2(url)
        Task(crawler.fetch())
    loop1()
The result:
[Tue Mar 27 17:54:27 2018] generate_way() called, time delta: 0.2914336347673473
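The generator version works because a generator can be suspended at `yield` and resumed with `send()`. The sketch below strips the mechanism down to `Future` and `Task` classes modelled on the article's `Future1` and `Task`, with no sockets: a hypothetical `ready` queue plays the role of the selector, and `fake_io()` pretends to be an I/O operation whose result arrives later.

```python
from collections import deque

class Future:
    """A result that is not available yet; mirrors Future1 in the article."""

    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self            # hand this future up to Task.step and suspend
        return self.result    # becomes the value of `yield from future`

class Task:
    """Drives a generator, resuming it each time the future it yielded is resolved."""

    def __init__(self, coro):
        self.coro = coro
        self.step(None)

    def step(self, future):
        try:
            # send() runs the generator until its next yield, which hands us a Future
            next_future = self.coro.send(None if future is None else future.result)
        except StopIteration:
            return  # the generator finished
        next_future.add_done_callback(self.step)

# Hypothetical stand-in for the selector: callbacks whose "I/O" has completed
ready = deque()
trace = []

def fake_io(value):
    """Pretend I/O: the result is delivered later, when the loop runs the queued callback."""
    f = Future()
    ready.append(lambda: f.set_result(value))
    return (yield from f)

def worker(name):
    a = yield from fake_io(1)
    trace.append("{0} got {1}".format(name, a))
    b = yield from fake_io(2)
    trace.append("{0} got {1}".format(name, b))

def run_loop():
    while ready:
        ready.popleft()()

if __name__ == "__main__":
    Task(worker("A"))
    Task(worker("B"))
    run_loop()
    print(trace)  # ['A got 1', 'B got 1', 'A got 2', 'B got 2']
```

The trace shows the two workers interleaving on one thread: each `yield from` hands control back to the loop until its future is resolved, while the worker code itself reads top to bottom like blocking code.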
c. Coroutines
# asyncio coroutines; aiohttp is a third-party HTTP client (pip install aiohttp)
host = 'http://www.sina.com'

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.read()

async def fetch_all():
    # gather() runs all the fetch() coroutines concurrently on one event loop
    return await asyncio.gather(*(fetch(host + url) for url in urls_todo))

@tsfunc
def asyncio_way():
    # asyncio.run() creates an event loop, runs fetch_all() to completion, then closes the loop
    responses = asyncio.run(fetch_all())
    return len(responses)
Results:
[Tue Mar 27 17:56:17 2018] asyncio_way() called, time delta: 0.43688060698484166
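The asyncio version can also be tried offline. Instead of aiohttp and www.sina.com, this sketch swaps in the standard library's asyncio streams (`asyncio.start_server`, `asyncio.open_connection`) and a local server; `handle()`, `fetch()`, `main()` and the fixed `PAGE` body are assumptions made for illustration.

```python
import asyncio

PAGE = b"hello"

async def handle(reader, writer):
    """Hypothetical local server: reads the request headers, sends a fixed page, closes."""
    await reader.readuntil(b"\r\n\r\n")
    writer.write(b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n\r\n" % len(PAGE) + PAGE)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def fetch(port, path):
    """One request over asyncio streams; each await lets other coroutines run."""
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write("GET {0} HTTP/1.0\r\nHost: localhost\r\n\r\n".format(path).encode("ascii"))
    await writer.drain()
    response = await reader.read()  # read() with no size reads until EOF
    writer.close()
    await writer.wait_closed()
    return response

async def main(paths):
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        # gather() runs every fetch concurrently on one thread and keeps the input order
        return await asyncio.gather(*(fetch(port, p) for p in paths))

if __name__ == "__main__":
    responses = asyncio.run(main(["/", "/1", "/2"]))
    print(len(responses), responses[0].split(b"\r\n", 1)[0])
```

Each `await` is a point where the coroutine yields to the event loop, the same role `yield from` plays in the generator version.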
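That completes the tests of the concurrency and asynchronous programming examples. The complete code is listed below for readers to test on their own; with a heavier workload, the results will likely be quite different.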
#! python3
# coding:utf-8
import socket
from concurrent import futures
from selectors import DefaultSelector,EVENT_WRITE,EVENT_READ
import asyncio
import aiohttp
import time
from time import ctime
def tsfunc(func):
    def wrappedFunc(*args, **kargs):
        # time.clock() was removed in Python 3.8; perf_counter() measures elapsed wall-clock time
        start = time.perf_counter()
        action = func(*args, **kargs)
        time_delta = time.perf_counter() - start
        print('[{0}] {1}() called, time delta: {2}'.format(ctime(), func.__name__, time_delta))
        return action
    return wrappedFunc
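def blocking_way():
    # connect() and recv() suspend the calling thread until the network answers
    with socket.socket() as sock:
        sock.connect(('www.sina.com', 80))
        request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
        sock.sendall(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:  # an empty chunk means the server closed the connection
            response += chunk
            chunk = sock.recv(4096)
    return response

def nonblocking_way():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    try:
        sock.connect(('www.sina.com', 80))
    except BlockingIOError:
        pass  # a non-blocking connect() returns before the connection is established
    request = 'GET / HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'
    data = request.encode('ascii')
    # Busy-wait: keep retrying send() until the connection is ready
    while True:
        try:
            sock.send(data)
            break
        except OSError:
            pass
    response = b''
    # Busy-wait: keep retrying recv(); BlockingIOError means "no data yet"
    while True:
        try:
            chunk = sock.recv(4096)
            while chunk:
                response += chunk
                chunk = sock.recv(4096)
            break  # an empty chunk means the server closed the connection
        except OSError:
            pass
    sock.close()
    return response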
selector = DefaultSelector()
stopped = False
urls_todo = ['/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9']

class Crawler:
    def __init__(self, url):
        self.url = url
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.sina.com', 80))
        except BlockingIOError:
            pass
        # Call self.connected once the socket becomes writable (i.e. connected)
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('ascii'))
        # Call self.read_response whenever data is ready to be read
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        # Read exactly one chunk per readiness event; loop() calls us again for the next one
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            # Empty chunk: the server closed the connection, so this request is done
            selector.unregister(key.fd)
            self.sock.close()
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)
# Generator-based coroutine
class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

class Crawler1:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('www.sina.com', 80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield f
        selector.unregister(sock.fileno())
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                sock.close()
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
                break
# Generator-based coroutine improved with yield from
class Future1:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self          # hand the future to Task.step and suspend
        return self.result  # value of `yield from future` once resumed

def connect(sock, address):
    f = Future1()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())

def read(sock):
    f = Future1()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk

def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b''.join(response)

class Crawler2:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        # Each `yield from` suspends fetch until the selector reports the socket is ready
        yield from connect(sock, ('www.sina.com', 80))
        get = 'GET {0} HTTP/1.0\r\nHost: www.sina.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        sock.close()
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True

class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future1()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send() resumes the coroutine (fetch) and runs it until its next yield;
            # next_future is the object that yield produced
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)

def loop1():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            # on_connected/on_readable take no arguments
            callback()
# asyncio coroutines; aiohttp is a third-party HTTP client (pip install aiohttp)
host = 'http://www.sina.com'

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.read()

async def fetch_all():
    # gather() runs all the fetch() coroutines concurrently on one event loop
    return await asyncio.gather(*(fetch(host + url) for url in urls_todo))

@tsfunc
def asyncio_way():
    # asyncio.run() creates an event loop, runs fetch_all() to completion, then closes the loop
    responses = asyncio.run(fetch_all())
    return len(responses)

@tsfunc
def sync_way():
    res = []
    for _ in range(10):
        res.append(blocking_way())
    return len(res)

@tsfunc
def process_way():
    worker = 10
    with futures.ProcessPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for _ in range(10)}
        return len([fut.result() for fut in futs])

@tsfunc
def thread_way():
    worker = 10
    with futures.ThreadPoolExecutor(worker) as executor:
        futs = {executor.submit(blocking_way) for _ in range(10)}
        return len([fut.result() for fut in futs])

@tsfunc
def async_way():
    res = []
    for _ in range(10):
        res.append(nonblocking_way())
    return len(res)

@tsfunc
def callback_way():
    for url in urls_todo:
        crawler = Crawler(url)
        crawler.fetch()
    loop()  # Crawler's callbacks take (key, mask), which is what loop() passes to them

@tsfunc
def generate_way():
    for url in urls_todo:
        crawler = Crawler2(url)
        Task(crawler.fetch())
    loop1()

if __name__ == '__main__':
    # callback_way() and generate_way() empty urls_todo and set stopped = True, so run only
    # one of the URL-based tests (callback_way, generate_way, asyncio_way) per process
    #sync_way()
    #process_way()
    #thread_way()
    #async_way()
    #callback_way()
    #generate_way()
    asyncio_way()
That is all for this article. I hope it helps with your studies, and please continue to support 腳本之家.

