python分布式編程實(shí)現(xiàn)過程解析
分布式編程的難點(diǎn)在于:
1.服務(wù)器之間的通信,主節(jié)點(diǎn)如何了解從節(jié)點(diǎn)的執(zhí)行進(jìn)度,并在從節(jié)點(diǎn)之間進(jìn)行負(fù)載均衡和任務(wù)調(diào)度;
2.如何讓多個(gè)服務(wù)器上的進(jìn)程訪問同一資源的不同部分進(jìn)行執(zhí)行
第一部分涉及到網(wǎng)絡(luò)編程的底層細(xì)節(jié)
第二個(gè)問題讓我聯(lián)想到hdfs的一些功能。
首先分布式進(jìn)程還是解決的是單機(jī)單進(jìn)程無法處理的大數(shù)據(jù)量大計(jì)算量的問題,希望能加通過一份代碼(最多主+從兩份)來并行執(zhí)行一個(gè)大任務(wù)。
這就面臨兩個(gè)問題,首先將程序分布到多臺(tái)服務(wù)器,其次將輸入數(shù)據(jù)分配給多臺(tái)服務(wù)器。
第一個(gè)問題相對(duì)比較簡單,畢竟程序一般不會(huì)太長,即便是超級(jí)jar包的spark程序,也不過百兆。
但數(shù)據(jù)里不同,如今企業(yè)級(jí)別的數(shù)據(jù)動(dòng)輒GB、TB,如果在分布式程序執(zhí)行之前首先要進(jìn)行大容量數(shù)據(jù)的轉(zhuǎn)移,顯然是不可取的。
這時(shí)候我們就需要一個(gè)中央共享數(shù)據(jù)源,所有服務(wù)器都可以對(duì)這個(gè)數(shù)據(jù)源進(jìn)行并行存?。▔Kblock),這就已經(jīng)非常接近hdfs的功能。
因?yàn)樵趆dfs中,集群中的多臺(tái)服務(wù)器共享同一個(gè)hdfs,每臺(tái)機(jī)器訪問hdfs就像訪問本地?cái)?shù)據(jù)一樣(還是稍微慢一點(diǎn));
計(jì)算任務(wù)執(zhí)行完之后,每臺(tái)服務(wù)器還可以將自己的計(jì)算結(jié)果寫回hdfs,每臺(tái)服務(wù)器的結(jié)果被存儲(chǔ)成了結(jié)果目錄中的小文件。
# task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
# 發(fā)送任務(wù)的隊(duì)列:
task_queue = queue.Queue()
# 接收結(jié)果的隊(duì)列:
result_queue = queue.Queue()
# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
pass
# 把兩個(gè)Queue都注冊(cè)到網(wǎng)絡(luò)上, callable參數(shù)關(guān)聯(lián)了Queue對(duì)象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 綁定端口5000, 設(shè)置驗(yàn)證碼'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 啟動(dòng)Queue:
manager.start()
# 獲得通過網(wǎng)絡(luò)訪問的Queue對(duì)象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個(gè)任務(wù)進(jìn)去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 從result隊(duì)列讀取結(jié)果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 關(guān)閉:
manager.shutdown()
print('master exit.')
# task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# 創(chuàng)建類似的QueueManager:
class QueueManager(BaseManager):
pass
# 由于這個(gè)QueueManager只從網(wǎng)絡(luò)上獲取Queue,所以注冊(cè)時(shí)只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 連接到服務(wù)器,也就是運(yùn)行task_master.py的機(jī)器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗(yàn)證碼注意保持與task_master.py設(shè)置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網(wǎng)絡(luò)連接:
m.connect()
# 獲取Queue的對(duì)象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊(duì)列取任務(wù),并把結(jié)果寫入result隊(duì)列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 處理結(jié)束:
print('worker exit.')
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
淺談Django中的數(shù)據(jù)庫模型類-models.py(一對(duì)一的關(guān)系)
今天小編就為大家分享一篇淺談Django中的數(shù)據(jù)庫模型類-models.py(一對(duì)一的關(guān)系),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-05-05
Python采集代理ip并判斷是否可用和定時(shí)更新的方法
今天小編就為大家分享一篇Python采集代理ip并判斷是否可用和定時(shí)更新的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-05-05
Windows下Python2與Python3兩個(gè)版本共存的方法詳解
這篇文章主要介紹了Windows下Python2與Python3兩個(gè)版本共存的方法,文中介紹的很詳細(xì),對(duì)大家具有一定的參考價(jià)值,有需要的朋友們下面來一起看看吧。2017-02-02
Python中easy_install 和 pip 的安裝及使用
本篇文章主要介紹了Python中easy_install 和 pip 的安裝及使用,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06
ansible動(dòng)態(tài)Inventory主機(jī)清單配置遇到的坑
這篇文章主要介紹了ansible動(dòng)態(tài)Inventory主機(jī)清單配置遇到的坑,需要的朋友可以參考下2020-01-01
Python將xml和xsl轉(zhuǎn)換為html的方法
這篇文章主要介紹了Python將xml和xsl轉(zhuǎn)換為html的方法,實(shí)例分析了使用libxml2模塊操作xml和xsl轉(zhuǎn)換為html的技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-03-03

