Python 并行化執(zhí)行詳細(xì)解析
前言:
并行編程比程序編程困難,除非正常編程需要?jiǎng)?chuàng)建大量數(shù)據(jù),計(jì)算耗時(shí)太長(zhǎng),物理行為模擬困難
例子:N體問(wèn)題
物理前提:
- 牛頓定律
- 時(shí)間離散運(yùn)動(dòng)方程

普通計(jì)算方法
import numpy as np
import time
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
Ns = [2**i for i in range(1,10)]
runtimes = []
def remove_i(x,i):
"從所有粒子中去除本粒子"
shape = (x.shape[0]-1,)+x.shape[1:]
y = np.empty(shape,dtype=float)
y[:i] = x[:i]
y[i:] = x[i+1:]
return y
def a(i,x,G,m):
"計(jì)算加速度"
x_i = x[i]
x_j = remove_i(x,i)
m_j = remove_i(m,i)
diff = x_j - x_i
mag3 = np.sum(diff**2,axis=1)**1.5
result = G * np.sum(diff * (m_j / mag3)[:,np.newaxis],axis=0)
return result
def timestep(x0,v0,G,m,dt):
N = len(x0)
x1 = np.empty(x0.shape,dtype=float)
v1 = np.empty(v0.shape,dtype=float)
for i in range(N):
a_i0 = a(i,x0,G,m)
v1[i] = a_i0 * dt + v0[i]
x1[i] = a_i0 * dt**2 + v0[i] * dt + x0[i]
return x1,v1
def initial_cond(N,D):
x0 = np.array([[1,1,1],[10,10,10]])
v0 = np.array([[10,10,1],[0,0,0]])
m = np.array([10,10])
return x0,v0,m
def stimulate(N,D,S,G,dt):
fig = plt.figure()
ax = Axes3D(fig)
x0,v0,m = initial_cond(N,D)
for s in range(S):
x1,v1 = timestep(x0,v0,G,m,dt)
x0,v0 = x1,v1
t = 0
for i in x0:
ax.scatter(i[0],i[1],i[2],label=str(s*dt),c=["black","green","red"][t])
t += 1
t = 0
plt.show()
start = time.time()
stimulate(2,3,3000,9.8,1e-3)
stop = time.time()
runtimes.append(stop - start)效果圖

Python 并行化執(zhí)行
首先我們給出一個(gè)可以用來(lái)寫自己的并行化程序的,額,一串代碼
import datetime
import multiprocessing as mp
def accessional_fun():
f = open("accession.txt","r")
result = float(f.read())
f.close()
return result
def final_fun(name, param):
result = 0
for num in param:
result += num + accessional_fun() * 2
return {name: result}
if __name__ == '__main__':
start_time = datetime.datetime.now()
num_cores = int(mp.cpu_count())
print("你使用的計(jì)算機(jī)有: " + str(num_cores) + " 個(gè)核,當(dāng)然了,Intel 7 以上的要除以2")
print("如果你使用的 Python 是 32 位的,注意數(shù)據(jù)量不要超過(guò)兩個(gè)G")
print("請(qǐng)你再次檢查你的程序是否已經(jīng)改成了適合并行運(yùn)算的樣子")
pool = mp.Pool(num_cores)
param_dict = {'task1': list(range(10, 300)),
'task2': list(range(300, 600)),
'task3': list(range(600, 900)),
'task4': list(range(900, 1200)),
'task5': list(range(1200, 1500)),
'task6': list(range(1500, 1800)),
'task7': list(range(1800, 2100)),
'task8': list(range(2100, 2400)),
'task9': list(range(2400, 2700)),
'task10': list(range(2700, 3000))}
results = [pool.apply_async(final_fun, args=(name, param)) for name, param in param_dict.items()]
results = [p.get() for p in results]
end_time = datetime.datetime.now()
use_time = (end_time - start_time).total_seconds()
print("多進(jìn)程計(jì)算 共消耗: " + "{:.2f}".format(use_time) + " 秒")
print(results)運(yùn)行結(jié)果:如下:

accession.txt 里的內(nèi)容是2.5 這就是一個(gè)累加的問(wèn)題,每次累加的時(shí)候都會(huì)讀取文件中的2.5
如果需要運(yùn)算的問(wèn)題是類似于累加的問(wèn)題,也就是可并行運(yùn)算的問(wèn)題,那么才好做出并行運(yùn)算的改造
再舉一個(gè)例子
import math
import time
import multiprocessing as mp
def final_fun(name, param):
result = 0
for num in param:
result += math.cos(num) + math.sin(num)
return {name: result}
if __name__ == '__main__':
start_time = time.time()
num_cores = int(mp.cpu_count())
print("你使用的計(jì)算機(jī)有: " + str(num_cores) + " 個(gè)核,當(dāng)然了,Intel 7 以上的要除以2")
print("如果你使用的 Python 是 32 位的,注意數(shù)據(jù)量不要超過(guò)兩個(gè)G")
print("請(qǐng)你再次檢查你的程序是否已經(jīng)改成了適合并行運(yùn)算的樣子")
pool = mp.Pool(num_cores)
param_dict = {'task1': list(range(10, 3000000)),
'task2': list(range(3000000, 6000000)),
'task3': list(range(6000000, 9000000)),
'task4': list(range(9000000, 12000000)),
'task5': list(range(12000000, 15000000)),
'task6': list(range(15000000, 18000000)),
'task7': list(range(18000000, 21000000)),
'task8': list(range(21000000, 24000000)),
'task9': list(range(24000000, 27000000)),
'task10': list(range(27000000, 30000000))}
results = [pool.apply_async(final_fun, args=(name, param)) for name, param in param_dict.items()]
results = [p.get() for p in results]
end_time = time.time()
use_time = end_time - start_time
print("多進(jìn)程計(jì)算 共消耗: " + "{:.2f}".format(use_time) + " 秒")
result = 0
for i in range(0,10):
result += results[i].get("task"+str(i+1))
print(result)
start_time = time.time()
result = 0
for i in range(10,30000000):
result += math.cos(i) + math.sin(i)
end_time = time.time()
print("單進(jìn)程計(jì)算 共消耗: " + "{:.2f}".format(end_time - start_time) + " 秒")
print(result)運(yùn)行結(jié)果:

力學(xué)問(wèn)題改進(jìn):
import numpy as np
import time
from mpi4py import MPI
from mpi4py.MPI import COMM_WORLD
from types import FunctionType
from matplotlib import pyplot as plt
from multiprocessing import Pool
def remove_i(x,i):
shape = (x.shape[0]-1,) + x.shape[1:]
y = np.empty(shape,dtype=float)
y[:1] = x[:1]
y[i:] = x[i+1:]
return y
def a(i,x,G,m):
x_i = x[i]
x_j = remove_i(x,i)
m_j = remove_i(m,i)
diff = x_j - x_i
mag3 = np.sum(diff**2,axis=1)**1.5
result = G * np.sum(diff * (m_j/mag3)[:,np.newaxis],axis=0)
return result
def timestep(x0,v0,G,m,dt,pool):
N = len(x0)
takes = [(i,x0,v0,G,m,dt) for i in range(N)]
results = pool.map(timestep_i,takes)
x1 = np.empty(x0.shape,dtype=float)
v1 = np.empty(v0.shape,dtype=float)
for i,x_i1,v_i1 in results:
x1[i] = x_i1
v1[i] = v_i1
return x1,v1
def timestep_i(args):
i,x0,v0,G,m,dt = args
a_i0 = a(i,x0,G,m)
v_i1 = a_i0 * dt + v0[i]
x_i1 = a_i0 * dt ** 2 +v0[i]*dt + x0[i]
return i,x_i1,v_i1
def initial_cond(N,D):
x0 = np.random.rand(N,D)
v0 = np.zeros((N,D),dtype=float)
m = np.ones(N,dtype=float)
return x0,v0,m
class Pool(object):
def __init__(self):
self.f = None
self.P = COMM_WORLD.Get_size()
self.rank = COMM_WORLD.Get_rank()
def wait(self):
if self.rank == 0:
raise RuntimeError("Proc 0 cannot wait!")
status = MPI.Status()
while True:
task = COMM_WORLD.recv(source=0,tag=MPI.ANY_TAG,status=status)
if not task:
break
if isinstance(task,FunctionType):
self.f = task
continue
result = self.f(task)
COMM_WORLD.isend(result,dest=0,tag=status.tag)
def map(self,f,tasks):
N = len(tasks)
P = self.P
Pless1 = P - 1
if self.rank != 0:
self.wait()
return
if f is not self.f:
self.f = f
requests = []
for p in range(1,self.P):
r = COMM_WORLD.isend(f,dest=p)
requests.append(r)
MPI.Request.waitall(requests)
results = []
for i in range(N):
result = COMM_WORLD.recv(source=(i%Pless1)+1,tag=i)
results.append(result)
return results
def __del__(self):
if self.rank == 0:
for p in range(1,self.p):
COMM_WORLD.isend(False,dest=p)
def simulate(N,D,S,G,dt):
x0,v0,m = initial_cond(N,D)
pool = Pool()
if COMM_WORLD.Get_rank()==0:
for s in range(S):
x1,v1 = timestep(x0,v0,G,m,dt,pool)
x0,v0 = x1,v1
else:
pool.wait()
if __name__ == '__main__':
simulate(128,3,300,1.0,0.001)
Ps = [1,2,4,8]
runtimes = []
for P in Ps:
start = time.time()
simulate(128,3,300,1.0,0.001)
stop = time.time()
runtimes.append(stop - start)
print(runtimes)到此這篇關(guān)于Python 并行化執(zhí)行詳細(xì)解析的文章就介紹到這了,更多相關(guān)Python 并行化執(zhí)行內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python+wxPython實(shí)現(xiàn)將圖片轉(zhuǎn)換為草圖
將照片轉(zhuǎn)換為藝術(shù)風(fēng)格的草圖是一種有趣的方式,可以為您的圖像添加獨(dú)特的效果,本文主要介紹了如何Python和wxPython來(lái)實(shí)現(xiàn)這一目標(biāo),需要的可以參考下2023-08-08
Python sklearn KFold 生成交叉驗(yàn)證數(shù)據(jù)集的方法
今天小編就為大家分享一篇Python sklearn KFold 生成交叉驗(yàn)證數(shù)據(jù)集的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-12-12
python 生成正態(tài)分布數(shù)據(jù),并繪圖和解析
這篇文章主要介紹了python 生成正態(tài)分布數(shù)據(jù),并繪圖和解析,幫助大家更好的利用python進(jìn)行數(shù)據(jù)分析,感興趣的朋友可以了解下2020-12-12
使用python-cv2實(shí)現(xiàn)Harr+Adaboost人臉識(shí)別的示例
這篇文章主要介紹了使用python-cv2實(shí)現(xiàn)Harr+Adaboost人臉識(shí)別的示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10
Pygame坦克大戰(zhàn)游戲開(kāi)發(fā)實(shí)戰(zhàn)詳解代碼
《坦克大戰(zhàn)》以二戰(zhàn)坦克為題材,既保留了射擊類游戲的操作性,也改進(jìn)了射擊類游戲太過(guò)于復(fù)雜難玩的高門檻特點(diǎn),集休閑與競(jìng)技于一身。經(jīng)典再度襲來(lái),流暢的畫面,瘋狂的戰(zhàn)斗,讓玩家再次進(jìn)入瘋狂坦克的世界。玩家的目標(biāo)是控制坦克躲避危險(xiǎn),消滅掉所有的敵人即可進(jìn)入下一關(guān)2022-02-02

