Flask中嵌套啟動(dòng)子線程的方法示例詳解
正文
如果你在Flask中啟動(dòng)過(guò)子線程,然后在子線程中讀寫過(guò)g對(duì)象或者嘗試從request對(duì)象中讀取url參數(shù),那么,你肯定對(duì)下面這個(gè)報(bào)錯(cuò)不陌生:RuntimeError: Working outside of request context..
例如下面這段Flask代碼:
import threading
from flask import Flask, request
app = Flask(__name__)
def inner_func():
doc_id = request.args.get('doc_id', '')
print(f'用戶ID為:{doc_id}')
@app.route('/start_thread')
def start_thread():
thread = threading.Thread(target=inner_func)
thread.start()
return {'success': True, 'msg': '獲取用戶ID成功!'}
請(qǐng)求/start_thread接口就會(huì)報(bào)錯(cuò),如下圖所示:

如果你在網(wǎng)上搜索flask thread RuntimeError: Working outside of request context. ,那么你可能會(huì)看到官方文檔或者StackOverFlow上面提供了一個(gè)裝飾器@copy_current_request_context。如下圖所示:

照著它這樣寫,確實(shí)能解決問(wèn)題,如下圖所示:

但無(wú)論是官網(wǎng)還是StackOverFlow,它的例子都非常簡(jiǎn)單。但是我們知道,啟動(dòng)線程有很多種方法,例如:
# 方法一,啟動(dòng)簡(jiǎn)單線程
import threading
job = threading.Thread(target=函數(shù)名, args=(參數(shù)1, 參數(shù)2), kwargs={'參數(shù)3': xxx, '參數(shù)4': yyy})
job.start()
# 方法2,使用類定義線程
import threading
class Job(threading.Thread):
def __init__(self, 參數(shù)):
super().__init__()
def run(self):
print('子線程開(kāi)始運(yùn)行')
job = Job(參數(shù))
job.start()
# 方法3,使用線程池
from multiprocessing.dummy import Pool
pool = Pool(5) # 5個(gè)線程的線程池
pool.map(函數(shù)名, 參數(shù)列表)
網(wǎng)上的方法只能解決第一種寫法的問(wèn)題。如果想使用方法2和方法3啟動(dòng)子線程,代碼應(yīng)該怎么寫呢?如果在子線程中又啟動(dòng)子線程,再用一次@copy_current_request_context還行嗎?
相信我,你在網(wǎng)上搜索一下午,只有兩種結(jié)果:一是找不到答案,二是找到的答案是晚于2023年1月14日的,因?yàn)槭莿e人看了我這篇文章以后,再寫的。
解答上面的問(wèn)題前,還是說(shuō)明一下我對(duì)于在后端啟動(dòng)子線程這個(gè)行為的觀點(diǎn)。例如有些人喜歡在后端掛一個(gè)爬蟲,請(qǐng)求接口以后,通過(guò)線程啟動(dòng)爬蟲,爬蟲開(kāi)始爬數(shù)據(jù)。又或者,有些人在后端上面掛了一些復(fù)雜的程序代碼,請(qǐng)求接口以后,后端啟動(dòng)子線程,在子線程中運(yùn)行這些代碼。
我一向是不建議在后端又啟動(dòng)子線程去做復(fù)雜操作的。無(wú)論你使用的是Flask還是Django還是FastAPI。正確的做法應(yīng)該是使用消息隊(duì)列,后端只是把觸發(fā)任務(wù)的相關(guān)參數(shù)發(fā)送到消息隊(duì)列中。下游真正的運(yùn)行程序從消息隊(duì)列讀取到觸發(fā)參數(shù)以后,開(kāi)始運(yùn)行。
但有時(shí)候,你可能綜合考慮性價(jià)比,覺(jué)得再增加一個(gè)消息隊(duì)列,成本太高;或者干脆是要趕工期,不得不先暫時(shí)使用多線程來(lái)解決問(wèn)題,那么這篇文章將會(huì)極大幫助到你。
盡量不要在子線程中讀取請(qǐng)求相關(guān)的參數(shù)
如果你的子線程不需要讀寫g對(duì)象,也不需要從請(qǐng)求中讀取各種參數(shù),那么你就可以關(guān)閉這篇文章了。因?yàn)槟愕淖泳€程可以直接運(yùn)行,不會(huì)遇到什么的問(wèn)題,例如:

所以最好的解決方法,就是在啟動(dòng)子線程之前,提前先獲取到子線程需要的每一個(gè)參數(shù),然后把這些參數(shù)在啟動(dòng)子線程的時(shí)候作為函數(shù)參數(shù)傳進(jìn)去。如果你是從零開(kāi)始寫代碼,那么一開(kāi)始這樣做,就可以幫你避免很多麻煩。
但如果你是修改已有的代碼,并且嵌套太深,已經(jīng)沒(méi)有辦法一層一層傳入?yún)?shù),或者代碼量太大,不知道哪些地方悄悄調(diào)用了g對(duì)象或者讀寫了請(qǐng)求上下文,那么你可以繼續(xù)往下看。
裝飾閉包函數(shù)而不是一級(jí)函數(shù)
上面的簡(jiǎn)單多線程寫法,有一個(gè)地方需要特別注意,被@copy_current_request_context裝飾的子線程入口函數(shù)inner_func,必須是閉包函數(shù),不能是一級(jí)函數(shù)。如下圖所示:

如果不小心裝飾了一級(jí)函數(shù),就會(huì)報(bào)如下的錯(cuò)誤:

線程池復(fù)制請(qǐng)求上下文
當(dāng)我們使用multiprocessing.dummy來(lái)實(shí)現(xiàn)線程池時(shí),代碼如下:
from multiprocessing.dummy import Pool
from flask import Flask, request, copy_current_request_context, g
app = Flask(__name__)
@app.route('/start_thread', methods=['POST'])
def start_thread():
@copy_current_request_context
def crawl(doc_id):
url_template = request.json.get('url_template', '')
url = url_template.format(doc_id=doc_id)
print(f'開(kāi)始爬?。簕url}')
doc_id_list = [123, 456, 789, 111, 222, 333, 444]
pool = Pool(3)
pool.map(crawl, doc_id_list)
return {'success': True, 'msg': '爬取文章成功!'}
運(yùn)行效果如下圖所示:

寫法上整體跟threading.Thread啟動(dòng)簡(jiǎn)單線程的方法差不多。
用類定義線程時(shí)復(fù)制請(qǐng)求上下文
當(dāng)我們額外定義了一個(gè)線程類時(shí),需要把被裝飾的閉包函數(shù)傳入到子線程中,然后在子線程的run()方法中運(yùn)行:
import threading
from flask import Flask, request, copy_current_request_context
app = Flask(__name__)
class Job(threading.Thread):
def __init__(self, func):
super().__init__()
self.func = func
def run(self):
self.func()
@app.route('/start_thread', methods=['POST'])
def start_thread():
@copy_current_request_context
def runner():
doc_id = request.json.get('doc_id', '')
print(f'docId的值是:{doc_id}')
job = Job(runner)
job.start()
return {'success': True, 'msg': '讀取文章成功!'}
運(yùn)行效果如下圖所示:

嵌套子線程復(fù)制請(qǐng)求上下文
有時(shí)候,我們先創(chuàng)建了一個(gè)子線程,然后在子線程中,又需要?jiǎng)?chuàng)建孫線程。并且在孫線程中讀取請(qǐng)求上下文。例如下面的代碼:
import threading
from multiprocessing.dummy import Pool
from flask import Flask, request, copy_current_request_context
app = Flask(__name__)
def deep_func_runner(doc_id_list):
@copy_current_request_context
def deep_func(doc_id):
category = request.args.get('category', '')
url = f'https://www.kingname.info/{category}/{doc_id}'
print(f'開(kāi)始爬?。簕url}')
pool = Pool(3)
pool.map(deep_func, doc_id_list)
@app.route('/start_thread', methods=['POST'])
def start_thread():
@copy_current_request_context
def runner():
doc_id_list = [111, 222, 333, 444, 555, 666, 777, 888, 999]
deep_func_runner(doc_id_list)
job = threading.Thread(target=runner)
job.start()
return {'success': True, 'msg': '讀取文章成功!'}
此時(shí)使用@copy_current_request_context就會(huì)報(bào)您一個(gè)錯(cuò)誤:ValueError: <Token var=<ContextVar name='flask.request_ctx' at 0x103ef69a0> at 0x104446700> was created in a different Context。如下圖所示:

這個(gè)時(shí)候,我們就需要額外再創(chuàng)建一個(gè)裝飾器:
def copy_current_app_context(f):
from flask.globals import _app_ctx_stack
appctx = _app_ctx_stack.top
def _(*args, **kwargs):
with appctx:
return f(*args, **kwargs)
return _
@copy_current_app_context這個(gè)裝飾器需要放到孫線程里面@copy_current_request_context的上面。完整的代碼為:
import threading
from multiprocessing.dummy import Pool
from flask import Flask, request, copy_current_request_context
app = Flask(__name__)
def copy_current_app_context(f):
from flask.globals import _app_ctx_stack
appctx = _app_ctx_stack.top
def _(*args, **kwargs):
with appctx:
return f(*args, **kwargs)
return _
def deep_func_runner(doc_id_list):
@copy_current_app_context
@copy_current_request_context
def deep_func(doc_id):
category = request.args.get('category', '')
url = f'https://www.kingname.info/{category}/{doc_id}'
print(f'開(kāi)始爬取:{url}')
pool = Pool(3)
pool.map(deep_func, doc_id_list)
@app.route('/start_thread', methods=['POST'])
def start_thread():
@copy_current_request_context
def runner():
doc_id_list = [111, 222, 333, 444, 555, 666, 777, 888, 999]
deep_func_runner(doc_id_list)
job = threading.Thread(target=runner)
job.start()
return {'success': True, 'msg': '讀取文章成功!'}
運(yùn)行效果如下圖所示,孫線程也正常啟動(dòng)了:

總結(jié)
- 非必要不在后端中創(chuàng)建子線程
- 創(chuàng)建子線程時(shí),如果能把參數(shù)從外面?zhèn)魅?,就不要讓子線程自己去Flask的上下文讀取
@copy_current_request_context需要裝飾閉包函數(shù),不能裝飾一級(jí)函數(shù)- 嵌套子線程需要同時(shí)使用
@copy_current_app_context和@copy_current_request_context兩個(gè)裝飾器來(lái)裝飾孫線程的閉包函數(shù)
以上就是Flask中嵌套啟動(dòng)子線程的方法示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Flask嵌套啟動(dòng)子線程的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
在Django model中設(shè)置多個(gè)字段聯(lián)合唯一約束的實(shí)例
今天小編就為大家分享一篇在Django model中設(shè)置多個(gè)字段聯(lián)合唯一約束的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-07-07
Python實(shí)現(xiàn)圖像去噪方式(中值去噪和均值去噪)
今天小編就為大家分享一篇Python實(shí)現(xiàn)圖像去噪方式(中值去噪和均值去噪),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-12-12
python反編譯學(xué)習(xí)之字節(jié)碼詳解
這篇文章主要給大家介紹了關(guān)于python反編譯學(xué)習(xí)之字節(jié)碼的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用python具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-05-05
Python多線程編程(七):使用Condition實(shí)現(xiàn)復(fù)雜同步
這篇文章主要介紹了Python多線程編程(七):使用Condition實(shí)現(xiàn)復(fù)雜同步,本文講解通過(guò)很著名的“生產(chǎn)者-消費(fèi)者”模型來(lái)來(lái)演示在Python中使用Condition實(shí)現(xiàn)復(fù)雜同步,需要的朋友可以參考下2015-04-04
Django項(xiàng)目在pycharm新建的步驟方法
在本篇文章里小編給大家整理的是一篇關(guān)于Django項(xiàng)目在pycharm新建的步驟方法,有興趣的朋友們可以學(xué)習(xí)參考下。2021-03-03
深度學(xué)習(xí)的MNIST手寫數(shù)字?jǐn)?shù)據(jù)集識(shí)別方式(準(zhǔn)確率99%,附代碼)
這篇文章主要介紹了深度學(xué)習(xí)的MNIST手寫數(shù)字?jǐn)?shù)據(jù)集識(shí)別方式(準(zhǔn)確率99%,附代碼),具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06
opencv python簡(jiǎn)易文檔之圖像處理算法
OpenCV是一個(gè)開(kāi)源庫(kù),包含了許多計(jì)算機(jī)視覺(jué)算法,它在計(jì)算機(jī)視覺(jué)和圖像處理中起著重要作用,用于實(shí)時(shí)操作,其效率足以滿足工業(yè)上的要求,這篇文章主要給大家介紹了關(guān)于opencv python簡(jiǎn)易文檔之圖像處理算法的相關(guān)資料,需要的朋友可以參考下2021-08-08
pyspark給dataframe增加新的一列的實(shí)現(xiàn)示例
這篇文章主要介紹了pyspark給dataframe增加新的一列的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04

