python?協(xié)程并發(fā)數(shù)控制
前言:
本篇博客要采集的站點:【看歷史,通天下-歷史劇網(wǎng)】
目標數(shù)據(jù)是該站點下的熱門歷史事件,列表頁分頁規(guī)則如下所示:
http://www.lishiju.net/hotevents/p0 http://www.lishiju.net/hotevents/p1 http://www.lishiju.net/hotevents/p2
首先我們通過普通的多線程,對該數(shù)據(jù)進行采集,由于本文主要目的是學(xué)習(xí)如何控制并發(fā)數(shù),所以每頁僅輸出歷史事件的標題內(nèi)容。
普通的多線程代碼:
import threading
import time
import requests
from bs4 import BeautifulSoup
class MyThread(threading.Thread):
def __init__(self, url):
threading.Thread.__init__(self)
self.__url = url
def run(self):
res = requests.get(url=self.__url)
soup = BeautifulSoup(res.text, 'html.parser')
title_tags = soup.find_all(attrs={'class': 'item-title'})
event_names = [item.a.text for item in title_tags]
print(event_names)
print("")
if __name__ == "__main__":
start_time = time.perf_counter()
threads = []
for i in range(111): # 創(chuàng)建了110個線程。
threads.append(MyThread(url="http://www.lishiju.net/hotevents/p{}".format(i)))
for t in threads:
t.start() # 啟動了110個線程。
for t in threads:
t.join() # 等待線程結(jié)束
print("累計耗時:", time.perf_counter() - start_time)
# 累計耗時: 1.537718624上述代碼同時開啟所有線程,累計耗時 1.5 秒,程序采集結(jié)束。
多線程之信號量
python 信號量(Semaphore)用來控制線程并發(fā)數(shù),信號量管理一個內(nèi)置的計數(shù)器。 信號量對象每次調(diào)用其 acquire()方法時,信號量計數(shù)器執(zhí)行 -1 操作,調(diào)用 release()方法,計數(shù)器執(zhí)行 +1 操作,當計數(shù)器等于 0 時,acquire()方法會阻塞線程,一直等到其它線程調(diào)用 release()后,計數(shù)器重新 +1,線程的阻塞才會解除。
使用 threading.Semaphore()創(chuàng)建一個信號量對象。
修改上述并發(fā)代碼:
import threading
import time
import requests
from bs4 import BeautifulSoup
class MyThread(threading.Thread):
def __init__(self, url):
threading.Thread.__init__(self)
self.__url = url
def run(self):
if semaphore.acquire(): # 計數(shù)器 -1
print("正在采集:", self.__url)
res = requests.get(url=self.__url)
soup = BeautifulSoup(res.text, 'html.parser')
title_tags = soup.find_all(attrs={'class': 'item-title'})
event_names = [item.a.text for item in title_tags]
print(event_names)
print("")
semaphore.release() # 計數(shù)器 +1
if __name__ == "__main__":
semaphore = threading.Semaphore(5) # 控制每次最多執(zhí)行 5 個線程
start_time = time.perf_counter()
threads = []
for i in range(111): # 創(chuàng)建了110個線程。
threads.append(MyThread(url="http://www.lishiju.net/hotevents/p{}".format(i)))
for t in threads:
t.start() # 啟動了110個線程。
for t in threads:
t.join() # 等待線程結(jié)束
print("累計耗時:", time.perf_counter() - start_time)
# 累計耗時: 2.8005530640000003當控制并發(fā)線程數(shù)量之后,累計耗時變多。
補充知識點之 GIL:
GIL是 python 里面的全局解釋器鎖(互斥鎖),在同一進程,同一時間下,只能運行一個線程,這就導(dǎo)致了同一個進程下多個線程,只能實現(xiàn)并發(fā)而不能實現(xiàn)并行。
需要注意 python 語言并沒有全局解釋鎖,只是因為歷史的原因,在 CPython解析器中,無法移除 GIL,所以使用 CPython解析器,是會受到互斥鎖影響的。
還有一點是在編寫爬蟲程序時,多線程比單線程性能是有所提升的,因為遇到 I/O 阻塞會自動釋放 GIL鎖。
協(xié)程中使用信號量控制并發(fā)
下面將信號量管理并發(fā)數(shù),應(yīng)用到協(xié)程代碼中,在正式編寫前,使用協(xié)程寫法重構(gòu)上述代碼。
import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(url):
print("正在采集:", url)
async with aiohttp.request('GET', url) as res:
html = await res.text()
soup = BeautifulSoup(html, 'html.parser')
title_tags = soup.find_all(attrs={'class': 'item-title'})
event_names = [item.a.text for item in title_tags]
print(event_names)
async def main():
tasks = [asyncio.ensure_future(get_title("http://www.lishiju.net/hotevents/p{}".format(i))) for i in range(111)]
dones, pendings = await asyncio.wait(tasks)
# for task in dones:
# print(len(task.result()))
if __name__ == '__main__':
start_time = time.perf_counter()
asyncio.run(main())
print("代碼運行時間為:", time.perf_counter() - start_time)
# 代碼運行時間為: 1.6422313430000002代碼一次性并發(fā) 110 個協(xié)程,耗時 1.6 秒執(zhí)行完畢,接下來就對上述代碼,增加信號量管理代碼。
核心代碼是 semaphore = asyncio.Semaphore(10),控制事件循環(huán)中并發(fā)的協(xié)程數(shù)量。
import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(semaphore, url):
async with semaphore:
print("正在采集:", url)
async with aiohttp.request('GET', url) as res:
html = await res.text()
soup = BeautifulSoup(html, 'html.parser')
title_tags = soup.find_all(attrs={'class': 'item-title'})
event_names = [item.a.text for item in title_tags]
print(event_names)
async def main():
semaphore = asyncio.Semaphore(10) # 控制每次最多執(zhí)行 10 個線程
tasks = [asyncio.ensure_future(get_title(semaphore, "http://www.lishiju.net/hotevents/p{}".format(i))) for i in
range(111)]
dones, pendings = await asyncio.wait(tasks)
# for task in dones:
# print(len(task.result()))
if __name__ == '__main__':
start_time = time.perf_counter()
asyncio.run(main())
print("代碼運行時間為:", time.perf_counter() - start_time)
# 代碼運行時間為: 2.227831242aiohttp 中 TCPConnector 連接池
既然上述代碼已經(jīng)用到了 aiohttp 模塊,該模塊下通過限制同時連接數(shù),也可以控制線程并發(fā)數(shù)量,不過這個不是很好驗證,所以從數(shù)據(jù)上進行驗證,先設(shè)置控制并發(fā)數(shù)為 2,測試代碼運行時間為 5.56 秒,然后修改并發(fā)數(shù)為 10,得到的時間為 1.4 秒,與協(xié)程信號量控制并發(fā)數(shù)得到的時間一致。所以使用 TCPConnector 連接池控制并發(fā)數(shù)也是有效的。
import time
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def get_title(session, url):
async with session.get(url) as res:
print("正在采集:", url)
html = await res.text()
soup = BeautifulSoup(html, 'html.parser')
title_tags = soup.find_all(attrs={'class': 'item-title'})
event_names = [item.a.text for item in title_tags]
print(event_names)
async def main():
connector = aiohttp.TCPConnector(limit=1) # 限制同時連接數(shù)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [asyncio.ensure_future(get_title(session, "http://www.lishiju.net/hotevents/p{}".format(i))) for i in
range(111)]
await asyncio.wait(tasks)
if __name__ == '__main__':
start_time = time.perf_counter()
asyncio.run(main())
print("代碼運行時間為:", time.perf_counter() - start_time)到此這篇關(guān)于python 協(xié)程并發(fā)數(shù)控制的文章就介紹到這了,更多相關(guān)python 協(xié)程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python基礎(chǔ)之函數(shù)的定義和調(diào)用
這篇文章主要介紹了python函數(shù)的定義和調(diào)用,實例分析了Python中返回一個返回值與多個返回值的方法,需要的朋友可以參考下2021-10-10
使用Python構(gòu)建Hopfield網(wǎng)絡(luò)的教程
這篇文章主要介紹了使用Python構(gòu)建Hopfield網(wǎng)絡(luò)的教程,本文來自于IBM官方網(wǎng)站的技術(shù)文檔,需要的朋友可以參考下2015-04-04
Pytorch-mlu?實現(xiàn)添加逐層算子方法詳解
本文主要分享了在寒武紀設(shè)備上?pytorch-mlu?中添加逐層算子的方法教程,代碼具有一定學(xué)習(xí)價值,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-11-11

