python基于concurrent模塊實現(xiàn)多線程
引言
之前也寫過多線程的博客,用的是 threading ,今天來講下 python 的另外一個自帶庫 concurrent 。concurrent 是在 Python3.2 中引入的,只用幾行代碼就可以編寫出線程池/進程池,并且計算型任務(wù)效率和 mutiprocessing.pool 提供的 poll 和 ThreadPoll 相比不分伯仲,而且在 IO 型任務(wù)由于引入了 Future 的概念效率要高數(shù)倍。而 threading 的話還要自己維護相關(guān)的隊列防止死鎖,代碼的可讀性也會下降,相反 concurrent 提供的線程池卻非常的便捷,不用自己操心死鎖以及編寫線程池代碼,由于異步的概念 IO 型任務(wù)也更有優(yōu)勢。
concurrent 的確很好用,主要提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 。一個多線程,一個多進程。但 concurrent 本質(zhì)上都是對 threading 和 mutiprocessing 的封裝??此脑创a可以知道,所以最底層并沒有異步。
ThreadPoolExecutor 自己提供了任務(wù)隊列,不需要自己寫了。而所謂的線程池,它只是簡單的比較當前的 threads 數(shù)量和定義的 max_workers 的大小,小于 max_workers 就允許任務(wù)創(chuàng)建線程執(zhí)行任務(wù)。
操作多線程/多進程
1、創(chuàng)建線程池
通過 ThreadPoolExecutor 類創(chuàng)建線程池對象,max_workers 設(shè)置最大運行線程數(shù)數(shù)。使用 ThreadPoolExecutor 的好處是不用擔心線程死鎖問題,讓多線程編程更簡潔。
from concurrent import futures pool = futures.ThreadPoolExecutor(max_workers = 2)
2、submit
submit(self, fn, *args, **kwargs):
- fn:需要異步執(zhí)行的函數(shù)
- *args,**kwargs:fn 接受的參數(shù)
該方法的作用就是提交一個可執(zhí)行的回調(diào)task,它返回一個Future對象??梢钥闯龃朔椒ú粫枞骶€程的執(zhí)行。
import requests,datetime,time
from concurrent import futures
def get_request(url):
r = requests.get(url)
print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))
urls = ['https://www.baidu.com','https://www.tmall.com','https://www.jd.com']
pool = futures.ThreadPoolExecutor(max_workers = 2)
for url in urls:
task = pool.submit(get_request,url)
print('{}主線程'.format(datetime.datetime.now()))
time.sleep(2)
# 輸出結(jié)果
2021-03-12 15:29:10.780141:主線程
2021-03-12 15:29:10.865425:https://www.baidu.com 200
2021-03-12 15:29:10.923062:https://www.tmall.com 200
2021-03-12 15:29:10.940930:https://www.jd.com 200
3、map
map(self, fn, *iterables, timeout=None, chunksize=1):
- fn:需要異步執(zhí)行的函數(shù)
- *iterables:可迭代對象
map 第二個參數(shù)是可迭代對象,比如 list、tuple 等,寫法相對簡單。map 方法也不會阻塞主線程的執(zhí)行。
import requests,datetime,time
from concurrent import futures
def get_request(url):
r = requests.get(url)
print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))
urls = ['https://www.baidu.com','https://www.tmall.com','https://www.jd.com']
pool = futures.ThreadPoolExecutor(max_workers = 2)
tasks = pool.map(get_request,urls)
print('{}:主線程'.format(datetime.datetime.now()))
time.sleep(2)
# 輸出結(jié)果
2021-03-12 16:14:04.854452:主線程
2021-03-12 16:14:04.938870:https://www.baidu.com 200
2021-03-12 16:14:05.033849:https://www.jd.com 200
2021-03-12 16:14:05.048952:https://www.tmall.com 200
4、wait
如果要等待子線程執(zhí)行完之后再執(zhí)行主線程要怎么辦呢,可以通過 wait 。
wait(fs, timeout=None, return_when=ALL_COMPLETED):
- fs:所有任務(wù) tasks
- return_when:有三個參數(shù) FIRST_COMPLETED:只要有一個子線程完成則返回結(jié)果。 FIRST_EXCEPTION:只要有一個子線程拋異常則返回結(jié)果,若沒有異常則等同于ALL_COMPLETED。 ALL_COMPLETED:默認參數(shù),等待所有子線程完成。
import requests,datetime,time
from concurrent import futures
def get_request(url):
r = requests.get(url)
print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))
urls = ['https://www.baidu.com','https://www.tmall.com','https://www.jd.com']
pool = futures.ThreadPoolExecutor(max_workers = 2)
tasks =[]
for url in urls:
task = pool.submit(get_request,url)
tasks.append(task)
futures.wait(tasks)
print('{}:主線程'.format(datetime.datetime.now()))
time.sleep(2)
# 輸出結(jié)果
2021-03-12 16:30:13.437042:https://www.baidu.com 200
2021-03-12 16:30:13.552700:https://www.jd.com 200
2021-03-12 16:30:14.117325:https://www.tmall.com 200
2021-03-12 16:30:14.118284:主線程
5、異常處理
as_completed(fs, timeout=None)
- 所有任務(wù) tasks
使用 concurrent.futures 操作 多線程/多進程 過程中,很多函數(shù)報錯并不會直接終止程序,而是什么都沒發(fā)生。使用 as_completed 可以捕獲異常,代碼如下
import requests,datetime,time
from concurrent import futures
def get_request(url):
r = requests.get(url)
print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))
urls = ['www.baidu.com','https://www.tmall.com','https://www.jd.com']
# 創(chuàng)建線程池
pool = futures.ThreadPoolExecutor(max_workers = 2)
tasks =[]
for url in urls:
task = pool.submit(get_request,url)
tasks.append(task)
# 異常捕獲
errors = futures.as_completed(tasks)
for error in errors:
# error.result() 等待子線程都完成,并拋出異常,中斷主線程
# 捕獲子線程異常,不會終止主線程繼續(xù)運行
print(error.exception())
futures.wait(tasks)
print('{}:主線程'.format(datetime.datetime.now()))
time.sleep(2)
# 輸出結(jié)果
Invalid URL 'www.baidu.com': No schema supplied. Perhaps you meant http://www.baidu.com?
2021-03-12 17:24:26.984933:https://www.tmall.com 200
None
2021-03-12 17:24:26.993939:https://www.jd.com 200
None
2021-03-12 17:24:26.994937:主線程
多進程編程也類似,將 ThreadPoolExecutor 替換成 ProcessPoolExecutor 。
以上就是python基于concurrent模塊實現(xiàn)多線程的詳細內(nèi)容,更多關(guān)于python concurrent實現(xiàn)多線程的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
下載python中Crypto庫報錯:ModuleNotFoundError: No module named ‘Cry
Crypto不是自帶的模塊,需要下載。下面這篇文章主要給大家介紹了關(guān)于下載python中Crypto庫報錯:ModuleNotFoundError: No module named 'Crypto'的解決方法,文中通過圖文介紹的非常詳細,需要的朋友可以參考下。2018-04-04
Python中enumerate函數(shù)及其應(yīng)用詳解
在 Python 編程中,enumerate 函數(shù)是一個非常實用的工具,它能夠?qū)⒁粋€可迭代對象組合為一個索引序列,同時列出數(shù)據(jù)和數(shù)據(jù)下標,這種功能在處理列表、元組、字符串等可迭代對象時非常有用,尤其是在需要同時獲取每個元素的索引和值的情況下,需要的朋友可以參考下2025-01-01
python批量合成bilibili的m4s緩存文件為MP4格式 ver2.5
這篇文章主要介紹了python批量合成bilibili的m4s緩存文件為MP4格式 ver2.5的相關(guān)知識,本文給大家介紹的非常詳細,對大家的學(xué)習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-12-12
Python存儲List數(shù)據(jù)到文件(text/csv/excel)幾種常見方法
在數(shù)據(jù)分析中經(jīng)常需要從csv格式的文件中存取數(shù)據(jù)以及將數(shù)據(jù)寫書到csv文件中,下面這篇文章主要給大家介紹了關(guān)于Python存儲List數(shù)據(jù)到文件(text/csv/excel)的幾種常見方法,需要的朋友可以參考下2024-02-02
Python解決多線程運行異步代碼報錯"There?is?no?current?event?loop
在Python開發(fā)中,我們經(jīng)常需要同時處理高并發(fā)網(wǎng)絡(luò)請求和CPU密集型任務(wù),不過當嘗試在多線程環(huán)境中運行異步代碼時,可能會報錯"There?is?no?current?event?loop",下面我們看看具體的解決方法吧2025-04-04

