Python自定義主從分布式架構(gòu)實例分析
本文實例講述了Python自定義主從分布式架構(gòu)。分享給大家供大家參考,具體如下:
環(huán)境:Win7 x64,Python 2.7,APScheduler 2.1.2。
原理圖如下:

代碼部分:
(1)、中心節(jié)點:
#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 中心節(jié)點(主要功能是分配任務(wù))
import SocketServer, socket, Queue
CenterIP = '127.0.0.1' #中心節(jié)點IP
CenterListenPort = 9999 #中心節(jié)點監(jiān)聽端口
CenterClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #中心節(jié)點用于發(fā)送網(wǎng)絡(luò)消息的socket
TaskQueue = Queue.Queue() #任務(wù)隊列
#獲取任務(wù)隊列
def GetTaskQueue():
for i in range(1, 11):
TaskQueue.put(str(i))
#CenterServer的回調(diào)函數(shù),在接受到udp報文是觸發(fā)
class MyUDPHandler(SocketServer.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print(data)
if data.startswith('wait'):
vec = data.split(':')
if len(vec) != 3:
print('Error: len(vec) != 3')
else:
nodeIP = vec[1]
nodeListenPort = vec[2]
nodeID = nodeIP + ':' + nodeListenPort
if not TaskQueue.empty():
task = TaskQueue.get()
print('send task ' + task + ' to ' + nodeID)
CenterClient.sendto('task:' + task, (nodeIP, int(nodeListenPort)))
else:
print('TaskQueue is empty!')
GetTaskQueue() #獲取任務(wù)隊列
CenterServer = SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print('Listen port ' + str(CenterListenPort) + ' ...')
CenterServer.serve_forever()
(2)、任務(wù)節(jié)點:
#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 任務(wù)節(jié)點(請求/接收/執(zhí)行任務(wù))
import time, socket, SocketServer
from apscheduler.scheduler import Scheduler
CenterIP = '127.0.0.1' #中心節(jié)點IP
CenterListenPort = 9999 #中心節(jié)點監(jiān)聽端口
NodeIP = socket.gethostbyname(socket.gethostname()) #任務(wù)節(jié)點自身IP
NodeClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #任務(wù)節(jié)點用于發(fā)送網(wǎng)絡(luò)消息的socket
#任務(wù):發(fā)送網(wǎng)絡(luò)信息
def jobSendNetMsg():
msg = ''
if NodeServer.TaskState == 'wait':
msg = 'wait:' + NodeIP + ':' + str(NodeListenPort)
elif NodeServer.TaskState == 'exec':
msg = 'exec:' + NodeIP + ':' + str(NodeListenPort)
print(msg)
NodeClient.sendto(msg, (CenterIP, CenterListenPort))
#添加并啟動定時任務(wù)
def InitTimer():
sched = Scheduler()
sched.add_interval_job(jobSendNetMsg, seconds=1)
sched.start()
#執(zhí)行任務(wù)
def ExecTask(task):
print('ExecTask ' + task + ' ...')
time.sleep(2)
print('ExecTask ' + task + ' over')
#NodeServer的回調(diào)函數(shù),在接受到udp報文是觸發(fā)
class MyUDPHandler(SocketServer.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print('recv data: ' + data)
if data.startswith('task'):
vec = data.split(':')
if len(vec) != 2:
print('Error: len(vec) != 2')
else:
task = vec[1]
self.server.TaskState = 'exec'
ExecTask(task)
self.server.TaskState = 'wait'
InitTimer()
NodeServer = SocketServer.UDPServer(('', 0), MyUDPHandler)
NodeServer.TaskState = 'wait' #(exec/wait)
NodeListenPort = NodeServer.server_address[1]
print('NodeListenPort:' + str(NodeListenPort))
NodeServer.serve_forever()
更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Python URL操作技巧總結(jié)》、《Python圖片操作技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python Socket編程技巧總結(jié)》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門與進階經(jīng)典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對大家Python程序設(shè)計有所幫助。
相關(guān)文章
Python實現(xiàn)GUI學(xué)生管理系統(tǒng)的示例代碼
這篇文章主要為大家介紹了如何留Python語言實現(xiàn)簡易的GUI學(xué)生管理系統(tǒng),文中的示例代碼講解詳細,對我們學(xué)習(xí)Python有一定幫助,需要的可以參考下2022-06-06
python實現(xiàn)對文件中圖片生成帶標簽的txt文件方法
下面小編就為大家分享一篇python實現(xiàn)對文件中圖片生成帶標簽的txt文件方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-04-04
使用Python和大模型進行數(shù)據(jù)分析和文本生成
Python語言以其簡潔和強大的特性,成為了數(shù)據(jù)科學(xué)、機器學(xué)習(xí)和人工智能開發(fā)的首選語言之一,在這篇文章中,我將介紹如何用Python連接和使用大模型,并通過示例展示如何在實際項目中應(yīng)用這些技術(shù),需要的朋友可以參考下2024-05-05
Python采集大學(xué)教務(wù)系統(tǒng)成績單實戰(zhàn)示例
這篇文章主要為大家介紹了Python采集大學(xué)教務(wù)系統(tǒng)成績單實戰(zhàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-04-04
python爬蟲實戰(zhàn)steam加密逆向RSA登錄解析
今天帶來爬蟲實戰(zhàn)的文章。在挑選游戲的過程中感受學(xué)習(xí),讓你突飛猛進。本文主要實現(xiàn)用Python逆向登錄世界上最大的游戲平臺源碼分享,了解steam加密手段有多高明2021-10-10
關(guān)于Python中 循環(huán)器 itertools的介紹
循環(huán)器是對象的容器,包含有多個對象。通過調(diào)用循環(huán)器的next()方法 (__next__()方法,在Python 3.x中),循環(huán)器將依次返回一個對象。直到所有的對象遍歷窮盡,循環(huán)器將舉出StopIteration錯誤。這篇文章將對此做一個詳細介紹,感興趣的小伙伴請參考下面文字內(nèi)容2021-09-09

