一文詳解python如何實(shí)現(xiàn)流式輸出
一、創(chuàng)建fastapi的項(xiàng)目
1.創(chuàng)建虛擬環(huán)境(power shell)
python -m venv venv
2.運(yùn)行虛擬環(huán)境(command prompt)
venv\Scripts\activate
3.虛擬環(huán)境下安裝fastapi
pip install fastapi
4.虛擬環(huán)境下安裝uvicorn
pip install uvicorn
5.虛擬環(huán)境同目錄創(chuàng)建app文件夾
6.app文件夾下創(chuàng)建main.py
7.配置main.py
from fastapi import FastAPI
app=FastAPI()
if __name__ == "__main__":
import uvicorn
uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)8.測(cè)試
uvicorn app.main:app --reload
9.配置swagger
mian.py 添加如下代碼
from fastapi import applications
from fastapi.openapi.docs import get_swagger_ui_html
def swagger_monkey_patch(*args, **kwargs):
return get_swagger_ui_html(
*args, **kwargs,
swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.1.0/swagger-ui-bundle.min.js",
swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.1.0/swagger-ui.min.css"
)
applications.get_swagger_ui_html = swagger_monkey_patch 10.測(cè)試
訪問(wèn)http://127.0.0.1:8000/docs
二、一個(gè)簡(jiǎn)易循環(huán)發(fā)送
1、main.py代碼如下
app=FastAPI()
@app.get("/",response_class=StreamingResponse)
async def start():
return await send_sse()
app.include_router(sse.router, prefix="/v1")
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)2、sse.py
async def event_generator():
for i in range(5):
# 標(biāo)準(zhǔn) SSE 格式:data: 內(nèi)容\n\n
yield f"data: {{\"message\": \"Data chunk {i}\"}}\n\n".encode()
await asyncio.sleep(1) # 異步非阻塞延遲
# 返回體頭
headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
router = APIRouter()
@router.post("/completions")
async def send_sse():
return StreamingResponse(event_generator(), headers=headers)
解釋:整個(gè)代碼很容易理解,調(diào)用send_see函數(shù),其實(shí)工作中復(fù)雜的點(diǎn)在于,你是每一步都在發(fā)送。比如現(xiàn)在流行的fastgpt、dify這些基于工作流構(gòu)建應(yīng)用,每次返回都是節(jié)點(diǎn)信息,將每個(gè)節(jié)點(diǎn)的結(jié)果通過(guò)stream返回。
三、問(wèn)題
1、如何動(dòng)態(tài)實(shí)現(xiàn)發(fā)送?
2、return后我還有其他操作怎么辦?
四、解決思路
題外話:解決問(wèn)題的思路,才是你的成長(zhǎng),一味地依靠AI,永遠(yuǎn)在原地踏步。AI只是輔助,思路才是你的價(jià)值
問(wèn)題一
因?yàn)橹灰猺eturn之后,只能通過(guò)event_generator函數(shù)進(jìn)行操作,所以要讓event_generator這個(gè)函數(shù)的循環(huán)是可控,因?yàn)橥獠繜o(wú)法直接調(diào)用event_generator。所以可以采用一個(gè)隊(duì)列實(shí)現(xiàn)。
1、為什么是隊(duì)列而不是集合?
因?yàn)橐捎藐?duì)列的先進(jìn)先出的思想,保證數(shù)據(jù)的先后順序。
2、是否隊(duì)列為空,整個(gè)循環(huán)就結(jié)束呢?
不是,因?yàn)槊總€(gè)節(jié)點(diǎn)執(zhí)行會(huì)有時(shí)間差,甚至說(shuō)處理的比較慢從而導(dǎo)致,數(shù)據(jù)還沒(méi)進(jìn)隊(duì)列,整個(gè)循環(huán)就已經(jīng)結(jié)束了。
3、循環(huán)結(jié)束的節(jié)點(diǎn)怎么做?
根據(jù)業(yè)務(wù)來(lái)看,因?yàn)榘l(fā)送事件信息是有個(gè)event鑒別數(shù)據(jù)的類(lèi)型,可以通過(guò)這個(gè)確定最后一個(gè)事件是什么從而結(jié)束整個(gè)循環(huán)。如果無(wú)法確定,可以設(shè)置具體的超時(shí)時(shí)間比如10s。
下面是具體的代碼實(shí)現(xiàn),以3s過(guò)期時(shí)間為例子。
async def event_generator(messags:deque):
timeout_seconds = 3 # 從props獲取超時(shí)時(shí)間,默認(rèn)3秒
last_data_time = asyncio.get_running_loop().time()
start = True
while start:
# 標(biāo)準(zhǔn) SSE 格式:data: 內(nèi)容\n\n
if messags:
message = messags.popleft()
yield f"data: {{\"message\": \"Data chunk {message}\"}}\n\n".encode()
last_data_time = asyncio.get_running_loop().time() # 重置計(jì)時(shí)器
else:
# 檢查是否超時(shí)
current_time = asyncio.get_running_loop().time()
if current_time - last_data_time > timeout_seconds:
logging.info("消息列表已空 {} 秒,結(jié)束任務(wù)",timeout_seconds)
break
# 無(wú)數(shù)據(jù)時(shí)發(fā)送心跳,避免客戶端斷開(kāi)連接
await asyncio.sleep(0.5) # 降低 CPU 使用率
# 返回體頭
headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
router = APIRouter()
messags = deque()
message=0
@router.post("/completions/send")
async def send_sse():
return StreamingResponse(event_generator(messags), headers=headers)
@router.get("/completions/addDeque")
async def send_sse(): # 定義發(fā)送信息,每次對(duì)message+1操作
global message
message=message+1
messags.append(message)
return "ok"效果圖:

問(wèn)題二
流式的想法,是每次調(diào)用把結(jié)果給到前端。那么問(wèn)題是我們寫(xiě)這塊代碼是個(gè)很長(zhǎng)的模塊,在中間會(huì)進(jìn)行流式輸入,如果不return,所有的信息全部進(jìn)了隊(duì)列,最后return其實(shí)是一個(gè)一次性返回,跟流式的理念相違背。那么如何去做,這里可以采用攜程去實(shí)現(xiàn)這個(gè)功能。
我們可以把自己的代碼塊邏輯丟到攜程讓攜程去做。整個(gè)思想邏輯,是用隊(duì)列的延展性實(shí)現(xiàn)流式的輸出。所以我們只需要保證,在發(fā)送的時(shí)候把數(shù)據(jù)給到隊(duì)列就行。代碼如下:
async def event_generator(messages:deque):
timeout_seconds = 3 # 從props獲取超時(shí)時(shí)間,默認(rèn)3秒
last_data_time = asyncio.get_running_loop().time()
start = True
while start:
# 標(biāo)準(zhǔn) SSE 格式:data: 內(nèi)容\n\n
if messages:
message = messages.popleft()
yield f"data: {{\"message\": \"Data chunk {message}\"}}\n\n".encode()
last_data_time = asyncio.get_running_loop().time() # 重置計(jì)時(shí)器
else:
# 檢查是否超時(shí)
current_time = asyncio.get_running_loop().time()
if current_time - last_data_time > timeout_seconds:
logging.info("消息列表已空 {} 秒,結(jié)束任務(wù)",timeout_seconds)
break
# 無(wú)數(shù)據(jù)時(shí)發(fā)送心跳,避免客戶端斷開(kāi)連接
await asyncio.sleep(0.5) # 降低 CPU 使用率
# 返回體頭
headers = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
router = APIRouter()
async def do_something(messages):
# 操作一
message1="節(jié)點(diǎn)1開(kāi)始"
messages.append(message1)
#模擬每次操作耗時(shí)
await asyncio.sleep(0.5)
message2="節(jié)點(diǎn)1answer"
messages.append(message2)
#模擬每次操作耗時(shí)
await asyncio.sleep(0.5)
message2="節(jié)點(diǎn)1結(jié)束"
messages.append(message2)
#模擬每次操作耗時(shí)
await asyncio.sleep(0.5)
@router.post("/completions/send")
async def send_sse():
messages = deque()
asyncio.create_task(do_something(messages))
return StreamingResponse(event_generator(messages), headers=headers)注:messages是局部變量,這樣可以保證,線程安全。
結(jié)果如下:

以上就是一文詳解python如何實(shí)現(xiàn)流式輸出的詳細(xì)內(nèi)容,更多關(guān)于python流式輸出的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python實(shí)現(xiàn)將Unicode轉(zhuǎn)換為ASCII
這篇文章主要為大家詳細(xì)介紹了系統(tǒng)編碼的不同方法以及如何利用Python實(shí)現(xiàn)將Unicode轉(zhuǎn)換為?ASCII,文中的示例代碼講解詳細(xì),有需要的小伙伴可以學(xué)習(xí)一下2023-10-10
python接口自動(dòng)化之使用token傳入到header消息頭中
這篇文章主要介紹了python接口自動(dòng)化之使用token傳入到header消息頭中問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-08-08
python pip安裝包出現(xiàn):Failed building wheel for xxx錯(cuò)誤的解決
今天小編就為大家分享一篇python pip安裝包出現(xiàn):Failed building wheel for xxx錯(cuò)誤的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-12-12
python入門(mén)學(xué)習(xí)之自帶help功能初步使用示例
這篇文章主要為大家介紹了python入門(mén)學(xué)習(xí)自帶help功能初步使用示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03
windows下cx_Freeze生成Python可執(zhí)行程序的詳細(xì)步驟
這篇文章主要介紹了windows下cx_Freeze生成Python可執(zhí)行程序的詳細(xì)步驟,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-10-10
Django1.9 加載通過(guò)ImageField上傳的圖片方法
今天小編就為大家分享一篇Django1.9 加載通過(guò)ImageField上傳的圖片方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-05-05
python優(yōu)化測(cè)試穩(wěn)定性的失敗重試工具pytest-rerunfailures詳解
筆者在執(zhí)行自動(dòng)化測(cè)試用例時(shí),會(huì)發(fā)現(xiàn)有時(shí)候用例失敗并非代碼問(wèn)題,而是由于服務(wù)正在發(fā)版,導(dǎo)致請(qǐng)求失敗,從而降低了自動(dòng)化用例的穩(wěn)定性,那該如何增加失敗重試機(jī)制呢?帶著問(wèn)題我們一起探索2023-10-10

