python使用redis實現(xiàn)消息隊列(異步)的實現(xiàn)完整例程
最近在用fastapi框架開發(fā)web后端,由于近幾年python異步編程大火,fastapi憑借高性能也火了起來。本篇介紹了在異步環(huán)境下實現(xiàn)redis消息隊列的方法,代碼可以直接拷貝到fastapi中使用。
安裝相關(guān)庫
pip install aioredis
消息隊列實現(xiàn)及使用
我們使用redis的stream類型作為消息隊列的載體
首先我們創(chuàng)建一個目錄作為項目目錄:works/
創(chuàng)建配置文件
在項目根目錄下新建文件works/.env
在文件中寫入
export APP_ENV=development export REDIS_URL="192.168.70.130/" export REDIS_USER= export REDIS_PASSWORD= export REDIS_HOST="192.168.70.130" export REDIS_PORT=6379
代碼實現(xiàn)
在項目目錄下創(chuàng)建py文件works/main.py
import os
from dotenv import load_dotenv
import aioredis
import asyncio
load_dotenv()
class Redis():
? ? def __init__(self):
? ? ? ? """initialize ?connection """
? ? ? ? self.REDIS_URL = os.environ['REDIS_URL']
? ? ? ? self.REDIS_PASSWORD = os.environ['REDIS_PASSWORD']
? ? ? ? self.REDIS_USER = os.environ['REDIS_USER']
? ? ? ? self.connection_url = f"redis://{self.REDIS_USER}:{self.REDIS_PASSWORD}@{self.REDIS_URL}"
? ? ? ? self.REDIS_HOST = os.environ['REDIS_HOST']
? ? ? ? self.REDIS_PORT = os.environ['REDIS_PORT']
? ? ? ??
? ? async def create_connection(self):
? ? ? ? self.connection = aioredis.from_url(
? ? ? ? ? ? self.connection_url, db=0)
? ? ? ? return self.connection
class Producer:
? ? def __init__(self, redis_client):
? ? ? ? self.redis_client = redis_client
? ? async def add_to_stream(self, ?data: dict, stream_channel):
? ? ? ? """將一條數(shù)據(jù)添加到隊列
? ? ? ? Args:
? ? ? ? ? ? data (dict): _description_
? ? ? ? ? ? stream_channel (_type_): _description_
? ? ? ? Returns:
? ? ? ? ? ? _type_: _description_
? ? ? ? """
? ? ? ? try:
? ? ? ? ? ? msg_id = await self.redis_client.xadd(name=stream_channel, id="*", fields=data)
? ? ? ? ? ? print(f"Message id {msg_id} added to {stream_channel} stream")
? ? ? ? ? ? return msg_id
? ? ? ? except Exception as e:
? ? ? ? ? ? raise Exception(f"Error sending msg to stream => {e}")
class StreamConsumer:
? ? def __init__(self, redis_client):
? ? ? ? self.redis_client = redis_client
? ? async def consume_stream(self, count: int, block: int, ?stream_channel):
? ? ? ? """讀取隊列中的消息,但是并不刪除
? ? ? ? Args:
? ? ? ? ? ? count (int): _description_
? ? ? ? ? ? block (int): _description_
? ? ? ? ? ? stream_channel (_type_): _description_
? ? ? ? Returns:
? ? ? ? ? ? _type_: _description_
? ? ? ? """
? ? ? ? response = await self.redis_client.xread(
? ? ? ? ? ? streams={stream_channel: ?'0-0'}, count=count, block=block)
? ? ? ? return response
? ? async def delete_message(self, stream_channel, message_id):
? ? ? ? """成功消費數(shù)據(jù)后,調(diào)用此函數(shù)刪除隊列數(shù)據(jù)
? ? ? ? Args:
? ? ? ? ? ? stream_channel (_type_): _description_
? ? ? ? ? ? message_id (_type_): _description_
? ? ? ? """
? ? ? ? await self.redis_client.xdel(stream_channel, message_id)
async def main():
? ? redis_conn = await Redis().create_connection()
? ? produce = Producer(redis_conn)
? ? consumer = StreamConsumer(redis_conn)
? ? # 添加一個消息到隊列中
? ? data = {'xiaoming4':123}
? ? await produce.add_to_stream(data=data,stream_channel='message_channel')
? ??
? ? # 從隊列中拿出最新的1條數(shù)據(jù)
? ? data = await consumer.consume_stream(1,block=0,stream_channel='message_channel')
? ? print(data)
? ??
? ? # 輪詢等待隊列中的新消息
? ? response = await consumer.consume_stream(stream_channel="message_channel", count=1, block=0)
? ? if response:
? ? ? ? for stream, messagees in response:
? ? ? ? ? ? print('stream:',stream)
? ? ? ? ? ? for message in messagees:
? ? ? ? ? ? ? ? print('message: ',message)
? ? ? ? ? ? ? ? message_id = message[0]
? ? ? ? ? ? ? ? print('message_id: ',message_id)
? ? ? ? ? ? ? ? message_content = message[1]
? ? ? ? ? ? ? ? print('message_content: ',message_content)
? ? ? ? ? ? ? ? print('注意里面的鍵、值都變成了byte類型,需要進(jìn)行解碼:')
? ? ? ? ? ? ? ? message_content:dict
? ? ? ? ? ? ? ? print('message_content_decode: ',{k.decode('utf-8'):v.decode('utf-8') for k,v in message_content.items()})
? ? # 消費成功后刪除隊列中的消息
? ? await consumer.delete_message(
? ? ? ? stream_channel='message_channel',message_id=message_id
? ? ) ? ?
if __name__ == '__main__':
? ? asyncio.run(main())非常簡單好用,啟動一下看看吧
到此這篇關(guān)于python使用redis實現(xiàn)消息隊列(異步)的實現(xiàn)完整例程的文章就介紹到這了,更多相關(guān)python redis消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot3+Redis實現(xiàn)消息隊列的多種方法小結(jié)
- 一文詳解消息隊列中為什么不用redis作為隊列
- SpringBoot集成Redisson實現(xiàn)消息隊列的示例代碼
- redis?消息隊列完成秒殺過期訂單處理方法(一)
- 如何使用?redis?消息隊列完成秒殺過期訂單處理操作(二)
- Redis高階使用消息隊列分布式鎖排行榜等(高階用法)
- Redis消息隊列的三種實現(xiàn)方式
- Redis使用ZSET實現(xiàn)消息隊列的項目實踐
- Redis使用ZSET實現(xiàn)消息隊列使用小結(jié)
- 詳解Redis Stream做消息隊列
- 基于Redis實現(xiàn)消息隊列的示例代碼
相關(guān)文章
基于Python實現(xiàn)簡易學(xué)生信息管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了python實現(xiàn)簡易學(xué)生信息管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-07-07
python數(shù)據(jù)分析之如何刪除value=0的行
這篇文章主要給大家介紹了關(guān)于python數(shù)據(jù)分析之如何刪除value=0的行的相關(guān)資料,文中通過實例代碼以及圖文介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Python具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2022-12-12
pymysql之cur.fetchall() 和cur.fetchone()用法詳解
這篇文章主要介紹了pymysql之cur.fetchall() 和cur.fetchone()用法詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-05-05
Python Django網(wǎng)頁界面協(xié)同過濾推薦算法實現(xiàn)商品管理與推薦
商品管理與推薦系統(tǒng),本系統(tǒng)使用Python作為主要開發(fā)語言,前端采用HTML、CSS、BootStrap等技術(shù)搭建顯示界面,后端采用Django框架處理用戶的請求響應(yīng)2023-11-11
Python學(xué)習(xí)之異常中的finally使用詳解
這篇文章主要為大家介紹一下Python異常語法中的另一個成員—finally,通過學(xué)習(xí)finally,可以幫助我們更好的處理異常,快來跟隨小編一起學(xué)習(xí)一下吧2022-03-03
Python?ORM數(shù)據(jù)庫框架Sqlalchemy的使用教程詳解
對象關(guān)系映射(Object?Relational?Mapping,簡稱ORM)模式是一種為了解決面向?qū)ο笈c關(guān)系數(shù)據(jù)庫存在的互不匹配的現(xiàn)象的技術(shù)。本文主要介紹了其使用的相關(guān)資料,感興趣的小伙伴可以學(xué)習(xí)一下2022-10-10
python聚類算法解決方案(rest接口/mpp數(shù)據(jù)庫/json數(shù)據(jù)/下載圖片及數(shù)據(jù))
這篇文章主要介紹了python聚類算法解決方案(rest接口/mpp數(shù)據(jù)庫/json數(shù)據(jù)/下載圖片及數(shù)據(jù)),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-08-08

