Python中functools.partial設(shè)置回調(diào)函數(shù)處理異步任務(wù)使用
一. 前言
在Python中,回調(diào)函數(shù)是指在一個函數(shù)執(zhí)行完成后,調(diào)用另一個函數(shù)的過程。通常情況下,回調(diào)函數(shù)作為參數(shù)傳遞給原始函數(shù),原始函數(shù)在執(zhí)行完自己的邏輯后,會自動調(diào)用回調(diào)函數(shù)并將結(jié)果作為參數(shù)傳遞給它。
二. 回調(diào)函數(shù)基本使用
以下是在Python中設(shè)置回調(diào)函數(shù)的基本步驟:
- 定義回調(diào)函數(shù),確定回調(diào)函數(shù)的參數(shù)列表和返回值(如果有)。
- 在原始函數(shù)中,將回調(diào)函數(shù)作為參數(shù)傳遞給需要回調(diào)的函數(shù)。
- 在原始函數(shù)內(nèi)部的適當(dāng)位置調(diào)用回調(diào)函數(shù),將需要傳遞的參數(shù)傳遞給它。
例如,假設(shè)我們需要設(shè)置一個回調(diào)函數(shù)來處理異步操作的結(jié)果,可以按如下方式進(jìn)行設(shè)置:
# 定義回調(diào)函數(shù)
def callback(result):
print('Callback function is called with result: ', result)
# 異步函數(shù),需要傳入回調(diào)函數(shù)
def async_function(param1, param2, callback):
# 進(jìn)行異步操作
result = param1 + param2
# 異步操作完成后調(diào)用回調(diào)函數(shù)
callback(result)
# 調(diào)用異步函數(shù),并傳入回調(diào)函數(shù)
async_function(1, 2, callback)
運(yùn)行結(jié)果

在上面的代碼中,我們先定義了一個回調(diào)函數(shù)callback,然后在異步函數(shù)async_function中將該函數(shù)作為參數(shù)傳遞,并在異步操作完成后調(diào)用回調(diào)函數(shù),將操作結(jié)果傳遞給它。
通常情況下,我們會將回調(diào)函數(shù)定義為一個可調(diào)用對象,也就是實(shí)現(xiàn)了__call__方法的類對象。使用這種方式,可以更加靈活地定義回調(diào)函數(shù),并且可以把一些狀態(tài)或上下文信息存儲在對象中,在回調(diào)函數(shù)中使用。
三. 進(jìn)階 - 使用functools.partial傳遞函數(shù)
1. functools.partial基本介紹
functools.partial 是 Python 標(biāo)準(zhǔn)庫中的一個函數(shù),用來部分應(yīng)用一個函數(shù)(partial application),也就是固定函數(shù)的一部分參數(shù),返回一個新的函數(shù)。partial 函數(shù)的用法如下:
functools.partial(func, *args, **kwargs)
其中,func 是要部分應(yīng)用的函數(shù),*args 和 **kwargs 是要固定的參數(shù)。
具體來說,partial 函數(shù)會返回一個新的函數(shù)對象,這個新的函數(shù)對象跟原來的函數(shù)對象是相似的,但是將部分參數(shù)固定下來了,相當(dāng)于原來的函數(shù)變成了一個帶有默認(rèn)參數(shù)的函數(shù)。我們可以用這個新的函數(shù)對象來調(diào)用原來的函數(shù),而不必傳入那些已經(jīng)固定的參數(shù)。
下面是一個簡單的示例代碼,演示如何使用 partial 函數(shù):
import functools
# 定義一個簡單的加法函數(shù)
def add(a, b):
return a + b
# 固定 add 函數(shù)的第一個參數(shù)
add2 = functools.partial(add, 2)
# 調(diào)用 add2 函數(shù)
print(add2(3)) # 輸出:5
在上面的示例代碼中,我們定義了一個簡單的加法函數(shù) add,然后使用 partial 函數(shù)將 add 函數(shù)的第一個參數(shù)固定為 2,得到一個新的函數(shù)對象 add2。接下來,我們使用 add2 函數(shù)來計算 2+3 的結(jié)果,并將結(jié)果輸出到控制臺。
使用 partial 函數(shù)的好處是可以更方便地定義新的函數(shù),避免代碼重復(fù)。比如,如果我們想定義一個加 3、加 4、加 5 的函數(shù),可以使用 partial 來實(shí)現(xiàn),而不必寫多個類似的函數(shù)。
add3 = functools.partial(add, 3) add4 = functools.partial(add, 4) add5 = functools.partial(add, 5) print(add3(2)) # 輸出:5 print(add4(2)) # 輸出:6 print(add5(2)) # 輸出:7
在上面的示例代碼中,使用 partial 函數(shù)分別定義了 3 個新的函數(shù) add3、add4、add5,分別將加數(shù)固定為 3、4、5,然后使用這些新的函數(shù)計算 2+3、2+4、2+5 的結(jié)果,并將結(jié)果輸出到控制臺。
2. functools.partial進(jìn)階使用示例代碼
接下來我們看一個socket連接成功之后調(diào)用回調(diào)函數(shù)的例子:
1. 啟動一個socket server服務(wù)
import json
import os
import socket
import threading
import time
import sys
import traceback
HOST = '127.0.0.1' # 服務(wù)器IP地址
PORT = 8000 # 服務(wù)器端口號
BACKLOG = 5 # 服務(wù)器監(jiān)聽隊(duì)列大小,即最多同時接收多少個客戶端連接
RECONNECT_INTERVAL = 5 # 重連間隔,單位:秒
def start_server():
print(os.getpid())
while True:
try:
# 創(chuàng)建一個 TCP/IP socket 對象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 綁定服務(wù)器 IP 地址和端口號
server_socket.bind((HOST, PORT))
# 開始監(jiān)聽客戶端連接請求
server_socket.listen(BACKLOG)
print('服務(wù)器啟動,監(jiān)聽端口:%s' % PORT)
while True:
# 等待客戶端連接
print('等待客戶端連接...')
try:
client_socket, client_address = server_socket.accept()
threading.Thread(target=send_msg, args=(client_socket, client_address)).start()
# send_msg(client_socket, client_address)
print(f"Process {threading.current_thread()}: {threading.active_count()} threads")
print(f"Total threads: {threading.active_count()}")
print('新客戶端連接,地址:%s' % str(client_address))
# 讀取客戶端發(fā)送的數(shù)據(jù)
data = client_socket.recv(1024)
print('Received data:', data.decode())
# 向客戶端發(fā)送數(shù)據(jù)
message = 'Welcome to my server!'
client_socket.sendall(message.encode())
except Exception as e:
print('客戶端連接異常,錯誤信息:%s' % e)
finally:
# 關(guān)閉客戶端連接
client_socket.close()
print('客戶端連接已關(guān)閉')
except Exception as e:
print('服務(wù)器異常,錯誤信息:%s' % e)
traceback.print_exc()
# 關(guān)閉服務(wù)端 socket
server_socket.close()
print('{}s后嘗試重連服務(wù)器...'.format(RECONNECT_INTERVAL))
time.sleep(RECONNECT_INTERVAL)
def send_msg(client, addr):
try:
while 1:
time.sleep(1)
jsonTemplate = {
"Command": "FORWARD_ELEV_INFO",
"DeviceId": "C0002T",
"ElevId": 1,
}
msg2Elev = json.dumps(jsonTemplate).encode() + "\n".encode()
client.sendto(msg2Elev, addr)
print('send msg to client:{}:{}'.format(addr, msg2Elev))
except Exception as e:
print('send_msg:{}'.format(e))
if __name__ == '__main__':
# 啟動服務(wù)器
start_server()
2. 開啟一個客戶端連接,連接成功后調(diào)用回調(diào)函數(shù)
import functools
import json
import socket
import threading
import time
import traceback
class TestClient(threading.Thread):
def __init__(self, connectHost, connectPort, callbackFunc):
threading.Thread.__init__(self, name="TestClient")
self.host = connectHost
self.port = connectPort
self.callbackFunc = callbackFunc
self.sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def run(self):
self.connect()
while True:
try:
# 從socket中讀取數(shù)據(jù)
# data = self.sck.recv(1024)
# print(data)
data = self.recv_msg(self.sck)
if data is None:
time.sleep(1)
continue
self.callbackFunc(data)
except OSError:
# An operation was attempted on something that is not a socket
traceback.print_exc()
time.sleep(5)
# FIXME: if socket is broken, reconnect with the same sck does not work, so create an new one.
self.sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect()
except Exception as e:
# TODO: if disconnected, connect it
traceback.print_exc()
time.sleep(5)
self.connect()
def recv_msg(self, sock):
try:
data = sock.recv(1024)
print('recv data:{}'.format(data))
return data
except Exception as e:
print('recv_msg:{}'.format(e))
sock.close()
time.sleep(0.5)
def connect(self):
while True:
try:
self.sck.connect((self.host, self.port))
print("connected to Service {}:{}".format(self.host, self.port))
break
except ConnectionRefusedError:
print("service refused or not started? Reconnecting to Service in 5s")
time.sleep(5)
except Exception as e:
print("connect error type->{}".format(type(e)))
traceback.print_exc()
# FIXME: if other exception, not sure to restart action will work.
time.sleep(5)
def callbackFunc(a, res):
print(a)
print('callback msg -- >', res)
if __name__ == '__main__':
connectHost = '127.0.0.1'
connectPort = 8000
callbackFunc = callbackFunc
elevClient = TestClient(connectHost, connectPort, functools.partial(callbackFunc, 'hello callback'))
elevClient.start()
上面的程序定義了一個回調(diào)函數(shù)callbackFunc,在socket連接完成后將其作為參數(shù)傳給TestClient類的構(gòu)造函數(shù),用于處理接收到的消息,通過回調(diào)方式處理從服務(wù)端發(fā)送回來的數(shù)據(jù)。
運(yùn)行結(jié)果

到此這篇關(guān)于Python中functools.partial設(shè)置回調(diào)函數(shù)處理異步任務(wù)使用的文章就介紹到這了,更多相關(guān)Python functools.partial異步內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python?datacompy?找出兩個DataFrames不同的地方
本文主要介紹了Python?datacompy?找出兩個DataFrames不同的地方,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧<BR>2022-05-05

