Flask框架里面sse的使用示例
可以在一個(gè) Flask 應(yīng)用中注冊(cè)多個(gè) Blueprint,每個(gè) Blueprint 可以對(duì)應(yīng)一個(gè) SSE 接口。例如:
from flask import Flask, Response, Blueprint
import time
app = Flask(__name__)
bp1 = Blueprint('sse1', __name__)
bp2 = Blueprint('sse2', __name__)
@bp1.route('/sse1')
def sse1():
def generate():
while True:
yield 'data: {}\n\n'.format(time.time())
time.sleep(1)
return Response(generate(), mimetype='text/event-stream')
@bp2.route('/sse2')
def sse2():
def generate():
while True:
yield 'data: {}\n\n'.format(time.strftime('%Y-%m-%d %H:%M:%S'))
time.sleep(1)
return Response(generate(), mimetype='text/event-stream')
app.register_blueprint(bp1)
app.register_blueprint(bp2)
if __name__ == '__main__':
app.run(debug=True)在這個(gè)例子中,我們創(chuàng)建了兩個(gè) Blueprint,分別對(duì)應(yīng) `/sse1` 和 `/sse2` 接口。每個(gè) Blueprint 中的函數(shù)都返回一個(gè) SSE 數(shù)據(jù)流,使用 `Response` 類型的響應(yīng)對(duì)象封裝。最后在 Flask 應(yīng)用中注冊(cè)這兩個(gè) Blueprint,即可同時(shí)開(kāi)啟兩個(gè) SSE 接口。
from flask import Flask, Response, request
app = Flask(__name__)
# 存儲(chǔ)所有的訂閱者
subscribers = {}
# 訂閱接口,每個(gè)瀏覽器對(duì)應(yīng)一個(gè)topic
@app.route('/subscribe/<topic>')
def subscribe(topic):
def stream():
# 將當(dāng)前請(qǐng)求的客戶端添加到訂閱者列表中
subscribers.setdefault(topic, []).append(stream)
# 無(wú)限循環(huán),等待新的消息
while True:
# 等待新的消息
message = request.args.get('message')
if message:
# 向所有訂閱者推送新的消息
for subscriber in subscribers.get(topic, []):
subscriber.put(message)
# 休眠一段時(shí)間,減少服務(wù)器壓力
time.sleep(1)
# 設(shè)置響應(yīng)頭,告訴客戶端這是一個(gè)SSE流
response = Response(stream(), mimetype='text/event-stream')
response.headers['Cache-Control'] = 'no-cache'
response.headers['Connection'] = 'keep-alive'
return response
# 推送接口,向指定topic的所有訂閱者推送消息
@app.route('/publish/<topic>')
def publish(topic):
# 獲取要推送的消息
message = request.args.get('message')
if message:
# 向所有訂閱者推送新的消息
for subscriber in subscribers.get(topic, []):
subscriber.put(message)
return 'OK'
在上面的代碼中,我們定義了兩個(gè)接口,`/subscribe/<topic>`用于訂閱指定的topic,`/publish/<topic>`用于向指定topic的所有訂閱者推送消息。
在訂閱接口中,我們將當(dāng)前請(qǐng)求的客戶端添加到訂閱者列表中,并通過(guò)一個(gè)無(wú)限循環(huán)等待新的消息。當(dāng)有新的消息到達(dá)時(shí),我們會(huì)向所有訂閱者推送這條消息。
在推送接口中,我們獲取要推送的消息,并向指定topic的所有訂閱者推送這條消息。
使用這個(gè)代碼,你可以輕松地實(shí)現(xiàn)一個(gè)簡(jiǎn)單的SSE推送服務(wù),每個(gè)瀏覽器對(duì)應(yīng)一個(gè)topic。
import uuid
from flask import Flask, jsonify, request, Response, g
from flask_cors import CORS
from flask_sse import sse
import time
import json
app = Flask(__name__)
cros = CORS(app)
app.config['REDIS_URL'] = 'redis://localhost'
app.register_blueprint(sse, url_prefix='/stream')
# SSE 推送函數(shù)
# SSE 推送路由
@app.route('/register', methods=["GET"])
def register():
# 獲取客戶端標(biāo)識(shí)符
client_id = str(uuid.uuid4())
# 返回 SSE 響應(yīng)
return jsonify({"client_id": client_id})
# SSE 推送路由
@app.route('/sse', methods=['POST'])
def stream():
# 獲取客戶端標(biāo)識(shí)符
data = request.get_json()
client_id = data['clientId']
print("client_id", client_id)
def aa():
# 循環(huán)發(fā)送 SSE 數(shù)據(jù)
for i in range(10):
data = 'Hello, %s!' % client_id+str(i)
sse.publish(data, channel=client_id, type='message')
time.sleep(1)
sse.publish("end", channel=client_id, type='message')
# 返回 SSE 響應(yīng)
response = Response(aa(), mimetype='text/event-stream')
response.headers.add('Cache-Control', 'no-cache')
response.headers.add('Connection', 'keep-alive')
response.headers.add('X-Accel-Buffering', 'no')
return response
if __name__ == '__main__':
app.run(debug=True, port=5000)
在上面的代碼中,我們使用了 Flask-SSE 擴(kuò)展來(lái)管理 SSE 通道。我們?cè)?`/register` 路由中為每個(gè)客戶端創(chuàng)建了唯一的標(biāo)識(shí)符,并將其存儲(chǔ)在請(qǐng)求環(huán)境中。在 `/sse` 路由中,我們使用 `sse.publish` 方法發(fā)送 SSE 數(shù)據(jù)。
到此這篇關(guān)于Flask框架里面sse的使用示例的文章就介紹到這了,更多相關(guān)Flask使用sse內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- python使用Flask框架獲取用戶IP地址的方法
- 使用Python的Flask框架實(shí)現(xiàn)視頻的流媒體傳輸
- Python Web框架Flask下網(wǎng)站開(kāi)發(fā)入門(mén)實(shí)例
- Python的Flask框架與數(shù)據(jù)庫(kù)連接的教程
- python和flask中返回JSON數(shù)據(jù)的方法
- Python的Flask框架中使用Flask-SQLAlchemy管理數(shù)據(jù)庫(kù)的教程
- Python使用Flask框架同時(shí)上傳多個(gè)文件的方法
- Python的Flask框架中實(shí)現(xiàn)簡(jiǎn)單的登錄功能的教程
- Python的Flask框架中實(shí)現(xiàn)分頁(yè)功能的教程
相關(guān)文章
解決使用OpenCV中的imread()內(nèi)存報(bào)錯(cuò)問(wèn)題
這篇文章主要介紹了解決使用OpenCV中的imread()內(nèi)存報(bào)錯(cuò)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-03-03
Python使用Paramiko實(shí)現(xiàn)輕松判斷文件類型
Paramiko是一個(gè)用于SSHv2協(xié)議的Python實(shí)現(xiàn),提供了客戶端和服務(wù)器功能,下面我們就來(lái)看看如何使用Paramiko判斷文件類型,并提取文件的上級(jí)目錄吧2025-03-03
Python使用keys() 獲取 Redis 數(shù)據(jù)庫(kù)中的所有鍵
哈希、列表、集合、排序集合、字符串、JSON 和流是 Redis 支持的眾多數(shù)據(jù)結(jié)構(gòu)之一,本文將討論獲取 Redis 數(shù)據(jù)庫(kù)中的所有鍵,感興趣的朋友跟隨小編一起看看吧2023-08-08
PyTorch之torch.matmul函數(shù)的使用及說(shuō)明
PyTorch的torch.matmul是一個(gè)強(qiáng)大的矩陣乘法函數(shù),支持不同維度張量的乘法運(yùn)算,包括廣播機(jī)制。提供了矩陣乘法的語(yǔ)法,參數(shù)說(shuō)明,以及使用示例,幫助理解其應(yīng)用方式和乘法規(guī)則2024-09-09
Pytorch四維Tensor轉(zhuǎn)圖片并保存方式(維度順序調(diào)整)
這篇文章主要介紹了Pytorch四維Tensor轉(zhuǎn)圖片并保存方式(維度順序調(diào)整),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12
pycharm中沒(méi)有找到database的解決方案
這篇文章主要介紹了pycharm中沒(méi)有找到database的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07
Python查找兩個(gè)有序列表中位數(shù)的方法【基于歸并算法】
這篇文章主要介紹了Python查找兩個(gè)有序列表中位數(shù)的方法,結(jié)合實(shí)例形式分析了Python基于歸并算法遍歷、計(jì)算有序列表相關(guān)操作技巧,需要的朋友可以參考下2018-04-04

