實例分析python3實現(xiàn)并發(fā)訪問水平切分表
場景說明
假設(shè)有一個mysql表被水平切分,分散到多個host中,每個host擁有n個切分表。
如果需要并發(fā)去訪問這些表,快速得到查詢結(jié)果, 應(yīng)該怎么做呢?
這里提供一種方案,利用python3的asyncio異步io庫及aiomysql異步庫去實現(xiàn)這個需求。
代碼演示
import logging
import random
import asynciofrom aiomysql
import create_pool
# 假設(shè)mysql表分散在8個host, 每個host有16張子表
TBLES = { "192.168.1.01": "table_000-015",
# 000-015表示該ip下的表明從table_000一直連續(xù)到table_015
"192.168.1.02": "table_016-031",
"192.168.1.03": "table_032-047",
"192.168.1.04": "table_048-063",
"192.168.1.05": "table_064-079",
"192.168.1.06": "table_080-095",
"192.168.1.07": "table_096-0111",
"192.168.1.08": "table_112-0127",
}
USER = "xxx"PASSWD = "xxxx"# wrapper函數(shù),用于捕捉異常def query_wrapper(func):
async def wrapper(*args, **kwargs):
try:
await func(*args, **kwargs) except Exception as e:
print(e) return wrapper
# 實際的sql訪問處理函數(shù),通過aiomysql實現(xiàn)異步非阻塞請求@
query_wrapperasync def query_do_something(ip, db, table):
async with create_pool(host=ip, db=db, user=USER, password=PASSWD) as pool:
async with pool.get() as conn:
async with conn.cursor() as cur:
sql = ("select xxx from {} where xxxx")
await cur.execute(sql.format(table))
res = await cur.fetchall()
# then do something...# 生成sql訪問隊列, 隊列的每個元素包含要對某個表進行訪問的函數(shù)及參數(shù)def gen_tasks():
tasks = [] for ip, tbls in TBLES.items():
cols = re.split('_|-', tbls)
tblpre = "_".join(cols[:-2])
min_num = int(cols[-2])
max_num = int(cols[-1])
for num in range(min_num, max_num+1):
tasks.append(
(query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num))
)
random.shuffle(tasks)
return tasks# 按批量運行sql訪問請求隊列def run_tasks(tasks, batch_len):
try:
for idx in range(0, len(tasks), batch_len):
batch_tasks = tasks[idx:idx+batch_len]
logging.info("current batch, start_idx:%s len:%s" % (idx, len(batch_tasks)))
for i in range(0, len(batch_tasks)):
l = batch_tasks[i]
batch_tasks[i] = asyncio.ensure_future(
l[0](*l[1:])
)
loop.run_until_complete(asyncio.gather(*batch_tasks))
except Exception as e:
logging.warn(e)# main方法, 通過asyncio實現(xiàn)函數(shù)異步調(diào)用def main():
loop = asyncio.get_event_loop()
tasks = gen_tasks()
batch_len = len(TBLES.keys()) * 5 # all up to you
run_tasks(tasks, batch_len)
loop.close()
以上就是本次相關(guān)內(nèi)容的全部實例代碼,大家可以本地測試以下,感謝你對腳本之家的支持。
- 基于python實現(xiàn)對文件進行切分行
- Python實現(xiàn)filter函數(shù)實現(xiàn)字符串切分
- Python 等分切分數(shù)據(jù)及規(guī)則命名的實例代碼
- python按比例隨機切分數(shù)據(jù)的實現(xiàn)
- Python 最大概率法進行漢語切分的方法
- 分享Python切分字符串的一個不錯方法
- python實現(xiàn)根據(jù)文件關(guān)鍵字進行切分為多個文件的示例
- Python數(shù)據(jù)集切分實例
- Ubuntu下使用Python實現(xiàn)游戲制作中的切分圖片功能
- python實現(xiàn)按行切分文本文件的方法
- Python自然語言處理之切分算法詳解
相關(guān)文章
Flask框架運用Ajax實現(xiàn)數(shù)據(jù)交互的示例代碼
使用Ajax技術(shù)網(wǎng)頁應(yīng)用能夠快速地將增量更新呈現(xiàn)在用戶界面上,而不需要重載刷新整個頁面,這使得程序能夠更快地回應(yīng)用戶的操作,本文將簡單介紹使用AJAX如何實現(xiàn)前后端數(shù)據(jù)通信2022-11-11
django使用定時任務(wù)django_apscheduler的實現(xiàn)
定時任務(wù)無論是個人開發(fā)還是企業(yè)業(yè)務(wù)都是需要的,本文主要介紹了django使用定時任務(wù)django_apscheduler的實現(xiàn),減少請求時需要用戶等待的時間,感興趣的可以了解一下2021-08-08
python自定義函數(shù)def的應(yīng)用詳解
這篇文章主要介紹了python自定義函數(shù)def的應(yīng)用詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-06-06
Python編程django實現(xiàn)同一個ip十分鐘內(nèi)只能注冊一次
這篇文章主要介紹了Python編程django實現(xiàn)同一個ip十分鐘內(nèi)只能注冊一次的相關(guān)內(nèi)容,具有一定參考價值。需要的朋友可以了解下。2017-11-11
Python Pickling 和 Unpickling 的區(qū)別
Python中的Pickling和Unpickling是與數(shù)據(jù)序列化和反序列化相關(guān)的重要概念,本文主要介紹了Python Pickling和Unpickling的區(qū)別,具有一定的參考價值,感興趣的可以了解一下2023-11-11

