前言
并行計算是使用并行計算機來減少單個計算問題所需要的時間,我們可以通過利用編程語言顯式的說明計算中的不同部分如何再不同的處理器上同時執(zhí)行來設(shè)計我們的并行程序,最終達到大幅度提升程序效率的目的。
眾所周知,中的GIL限制了多線程并行對多核CPU的利用,但是我們?nèi)匀豢梢酝ㄟ^各種其他的方式來讓真正利用多核資源, 例如通過C/C++擴展來實現(xiàn)多線程/多進程, 以及直接利用的多進程模塊來進行多進程編程。
本文主要嘗試僅僅通過內(nèi)置的模塊對自己的動力學計算程序來進行優(yōu)化和效率提升,其中:
本文并不是對的模塊的接口進行翻譯介紹,需要熟悉的童鞋可以參考官方文檔。
正文
最近想用自己的微觀動力學程序進行一系列的求解并將結(jié)果繪制成二維Map圖進行可視化,這樣就需要對二維圖上的多個點進行計算并將結(jié)果收集起來并進行繪制,由于每個點都需要進行一次ODE積分以及牛頓法求解方程組,因此要串行地繪制整張圖可能會遇到極低的效率問題尤其是對參數(shù)進行測試的時候,每畫一張圖都需要等很久的時間。其中繪制的二維圖中每個點都是獨立計算的python通訊錄程序設(shè)計報告,于是很自然而然的想到了進行并行化處理。
串行的原始版本
由于腳本比較長,而且實現(xiàn)均為自己的程序,因此對具體實現(xiàn)進行了省略。腳本的大致結(jié)構(gòu)如下, 本質(zhì)是一個二重循環(huán),循環(huán)的變量分別為反應(yīng)物氣體(O2 和 CO)的分壓的值:
import time
import numpy as np
# 省略若干...
pCOs = np.linspace(1e-5, 0.5, 10)
pO2s = np.linspace(1e-5, 0.5, 10)
if "__main__" == __name__:
try:
start = time.time()
for i, pO2 in enumerate(pO2s):
# ...
for j, pCO in enumerate(pCOs):
# 針對當前的分壓值 pCO, pO2進行動力學求解
# 具體代碼略...
end = time.time()
t = end - start
finally:
# 收集計算的結(jié)果并進行處理繪圖
整體過程就這么簡單,我需要做的就是使用的接口來對這個二重循環(huán)進行并行化。
使用單核串行繪制100個點所需要的時間如下, 總共花了240.76秒:
二維map圖繪制的效果如下:
進行多進程并行處理
模塊
模塊提供了類似模塊的接口,并對進程的各種操作進行了良好的封裝,提供了各種進程間通信的接口例如Pipe, Queue等等,可以幫助我們實現(xiàn)進程間的通信,同步等操作。
使用類來動態(tài)創(chuàng)建進程實現(xiàn)并行
模塊提供了能讓我們通過創(chuàng)建進程對象并執(zhí)行該進程對象的start方法來創(chuàng)建一個真正的進程來執(zhí)行任務(wù),該接口類似模塊中的線程類.
但是當被操作對象數(shù)目不大的時候可以使用動態(tài)生成多個進程,但是如果需要的進程數(shù)一旦很多的時候,手動限制進程的數(shù)量以及處理不同進程返回值會變得異常的繁瑣,因此這個時候我們需要使用進程池來簡化操作。
使用進程池來管理進程
模塊提供了一個進程池Pool類,負責創(chuàng)建進程池對象,并提供了一些方法來講運算任務(wù)到不同的子進程中執(zhí)行,并很方便的獲取返回值。例如我們現(xiàn)在要進行的循環(huán)并行便很容易的將其實現(xiàn)。
對于這里的單指令多數(shù)據(jù)流的并行python通訊錄程序設(shè)計報告,我們可以直接使用Pool.map()來將函數(shù)映射到參數(shù)列表中。Pool.map其實是map函數(shù)的并行版本,此函數(shù)將會阻塞直到所有進程全部結(jié)束,而且此函數(shù)返回的結(jié)果順序仍然不變。
首先,我先把針對每對分壓數(shù)據(jù)的處理過程封裝成一個函數(shù),這樣可以將函數(shù)對象傳遞給子進程執(zhí)行。
import time
from multiprocessing import Pool
import numpy as np
# 省略若干...
pCOs = np.linspace(1e-5, 0.5, 10)
pO2s = np.linspace(1e-5, 0.5, 10)
def task(pO2):
'''接受一個O2分壓,根據(jù)當前的CO分壓進行動力學求解'''
# 代碼細節(jié)省略...
if "__main__" == __name__:
try:
start = time.time()
pool = Pool() # 創(chuàng)建進程池對象,進程數(shù)與multiprocessing.cpu_count()相同
tofs = pool.map(task, pCOs) # 并行執(zhí)行函數(shù)
end = time.time()
t = end - start
finally:
# 收集計算的結(jié)果并進行處理繪圖
使用兩個核心進行計算,計算時間從240.76s降到了148.61秒, 加速比為1.62
對不同核心的加速效果進行測試
為了查看使用不同核心數(shù)對程序效率的改善,我對不同的核心數(shù)和加速比進行了測試繪圖,效果如下:
運行核心數(shù)與程序運行時間:
運行核心數(shù)與加速比:
可見,由于我外層循環(huán)只循環(huán)了10次因此使用的核心數(shù)超過10以后核心數(shù)的增加并不能對程序進行加速,也就是多余的核心都浪費掉了。
使用實現(xiàn)簡單的分布式計算
前面使用了包提供的接口我們使用了再一臺機器上進行多核心計算的并行處理,但是的用處還有更多,通過.模塊,我們可以實現(xiàn)簡單的多機分布式并行計算,將計算任務(wù)分布到不同的計算機中運行。
提供了另外的多進程通信工具,他提供了在多臺計算機之間共享數(shù)據(jù)的接口和數(shù)據(jù)對象,這些數(shù)據(jù)對象全部都是通過代理類實現(xiàn)的,比如和等等,他們都實現(xiàn)了與原生list和dict相同的接口,但是他們可以通過網(wǎng)絡(luò)在不同計算機中的進程中進行共享。
關(guān)于模塊的接口的詳細使用可以參考官方文檔:16.6. - -based “” - 2.7.13
好了現(xiàn)在我們開始嘗試將繪圖程序改造成可以在多臺計算機中分布式并行的程序。改造的主要思想是:
使用一臺計算機作為服務(wù)端(),此臺計算機通過一個對象來管理共享對象,任務(wù)分配以及結(jié)果的接收,并再收集結(jié)果以后進行后處理(繪制二維map圖)。其他多臺計算機可以作為客戶端來接收的數(shù)據(jù)進行計算,并將結(jié)果傳到共享數(shù)據(jù)中,讓可以收集。同時再端可以同時進行上文所實現(xiàn)的多進程并行來充分利用計算機的多核優(yōu)勢。
大致可總結(jié)為下圖:
服務(wù)進程
首先服務(wù)端需要一個對象來管理共享對象
def get_manager():
'''創(chuàng)建服務(wù)端manager對象.
'''
# 自定義manager類
class JobManager(BaseManager):
pass
# 創(chuàng)建任務(wù)隊列,并將此數(shù)據(jù)對象共享在網(wǎng)絡(luò)中
jobid_queue = Queue()
JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)
# 創(chuàng)建列表代理類,并將其共享再網(wǎng)絡(luò)中
tofs = [None]*N
JobManager.register('get_tofs_list', callable=lambda: tofs, proxytype=ListProxy)
# 將分壓參數(shù)共享到網(wǎng)絡(luò)中
JobManager.register('get_pCOs', callable=lambda: pCOs, proxytype=ListProxy)
JobManager.register('get_pO2s', callable=lambda: pCOs, proxytype=ListProxy)
# 創(chuàng)建manager對象并返回
manager = JobManager(address=(ADDR, PORT), authkey=AUTHKEY)
return manager

.是一個類方法,它可以將某種類型或者可調(diào)用的對象綁定到對象并共享到網(wǎng)絡(luò)中,使得其他在網(wǎng)絡(luò)中的計算機能夠獲取相應(yīng)的對象。
例如,
JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)
我就將一個返回任務(wù)隊列的函數(shù)對象同對象綁定并共享到網(wǎng)絡(luò)中,這樣在網(wǎng)絡(luò)中的進程就可以通過自己的對象的方法得到相同的隊列,這樣便實現(xiàn)了數(shù)據(jù)的共享.
創(chuàng)建對象的時候需要兩個參數(shù),
進行任務(wù)分配
上面我們將一個任務(wù)隊列綁定到了對象中,現(xiàn)在我需要將隊列進行填充,這樣才能將任務(wù)發(fā)放到不同的客戶端來進行并行執(zhí)行。
def fill_jobid_queue(manager, nclient):
indices = range(N)
interval = N/nclient
jobid_queue = manager.get_jobid_queue()
start = 0
for i in range(nclient):
jobid_queue.put(indices[start: start+interval])
start += interval
if N % nclient > 0:
jobid_queue.put(indices[start:])
這里所謂的任務(wù)其實就是相應(yīng)參數(shù)在list中的index值,這樣不同計算機中得到的結(jié)果可以按照相應(yīng)的index將結(jié)果填入到結(jié)果列表中,這樣服務(wù)端就能在共享的網(wǎng)絡(luò)中收集各個計算機計算的結(jié)果。
啟動服務(wù)端進行監(jiān)聽
def run_server():
# 獲取manager
manager = get_manager()
print "Start manager at {}:{}...".format(ADDR, PORT)
# 創(chuàng)建一個子進程來啟動manager
manager.start()
# 填充任務(wù)隊列
fill_jobid_queue(manager, NNODE)
shared_job_queue = manager.get_jobid_queue()
shared_tofs_list = manager.get_tofs_list()

queue_size = shared_job_queue.qsize()
# 循環(huán)進行監(jiān)聽,直到結(jié)果列表被填滿
while None in shared_tofs_list:
if shared_job_queue.qsize() < queue_size:
queue_size = shared_job_queue.qsize()
print "Job picked..."
return manager
任務(wù)進程
服務(wù)進程負責進行簡單的任務(wù)分配和調(diào)度,任務(wù)進程則只負責獲取任務(wù)并進行計算處理。
在任務(wù)進程(客戶端)中基本代碼與我們上面單機中的多核運行的腳本基本相同(因為都是同一個函數(shù)處理不同的數(shù)據(jù)),但是我們也需要為客戶端創(chuàng)建一個來進行任務(wù)的獲取和返回。
def get_manager():
class WorkManager(BaseManager):
pass
# 由于只是從共享網(wǎng)絡(luò)中獲取,因此只需要注冊名字即可
WorkManager.register('get_jobid_queue')
WorkManager.register('get_tofs_list')
WorkManager.register('get_pCOs')
WorkManager.register('get_pO2s')
# 這里的地址和驗證碼要與服務(wù)端相同才可以進行數(shù)據(jù)共享
manager = WorkManager(address=(ADDR, PORT), authkey=AUTHKEY)
return manager
在客戶端我們?nèi)匀豢梢远噙M程利用多核資源來加速計算。
if "__main__" == __name__:
manager = get_manager()
print "work manager connect to {}:{}...".format(ADDR, PORT)
# 將客戶端本地的manager連接到相應(yīng)的服務(wù)端manager
manager.connect()
# 獲取共享的結(jié)果收集列表
shared_tofs_list = manager.get_tofs_list()
# 獲取共享的任務(wù)隊列

shared_jobid_queue = manager.get_jobid_queue()
# 從服務(wù)端獲取計算參數(shù)
pCOs = manager.get_pCOs()
shared_pO2s = manager.get_pO2s()
# 創(chuàng)建進程池在本地計算機進行多核并行
pool = Pool()
while 1:
try:
indices = shared_jobid_queue.get_nowait()
pO2s = [shared_pO2s[i] for i in indices]
print "Run {}".format(str(pO2s))
tofs_2d = pool.map(task, pO2s)
# Update shared tofs list.
for idx, tofs_1d in zip(indices, tofs_2d):
shared_tofs_list[idx] = tofs_1d
# 直到將任務(wù)隊列中的任務(wù)全部取完,結(jié)束任務(wù)進程
except Queue.Empty:
break
下面我將在3臺在同一局域網(wǎng)中的電腦來進行簡單的分布式計算測試,
先在服務(wù)端運行服務(wù)腳本進行任務(wù)分配和監(jiān)聽:
python server.py
在兩個客戶端運行任務(wù)腳本來獲取任務(wù)隊列中的任務(wù)并執(zhí)行
python worker.py
如下圖:
當任務(wù)隊列為空且任務(wù)完成時,任務(wù)進程終止; 當結(jié)果列表中的結(jié)果收集完畢時,服務(wù)進程也會終止。
執(zhí)行結(jié)果如下圖:
上面的panel為服務(wù)端監(jiān)聽,左下為自己的筆記本運行結(jié)果,右下panel為集群中的其中一個節(jié)點。
可見運行時間為56.86s,無奈,是我的本子脫了后腿(-_-!)
總結(jié)
本文通過內(nèi)置模塊實現(xiàn)了單機內(nèi)多核并行以及簡單的多臺計算機的分布式并行計算,為我們提供了封裝良好并且友好的接口來使我們的程序更方面利用多核資源加速自己的計算程序,希望能對使用實現(xiàn)并行化的童鞋有所幫助。
參考