基于python yield機制的異步操作同步化編程模型
本文總結(jié)下如何在編寫python代碼時對異步操作進行同步化模擬,從而提高代碼的可讀性和可擴展性。
游戲引擎一般都采用分布式框架,通過一定的策略來均衡服務(wù)器集群的資源負載,從而保證服務(wù)器運算的高并發(fā)性和CPU高利用率,最終提高游戲的性能和負載。由于引擎的邏輯層調(diào)用是非搶占式的,服務(wù)器之間都是通過異步調(diào)用來進行通訊,導(dǎo)致游戲邏輯無法同步執(zhí)行,所以在代碼層不得不人為地添加很多回調(diào)函數(shù),使一個原本完整的功能碎片化地分布在各個回調(diào)函數(shù)中。
異步邏輯
以游戲中的副本評分邏輯為例,在副本結(jié)束時副本管理進程需要收集副本中每個玩家的戰(zhàn)斗信息,再結(jié)合管理進程內(nèi)部的統(tǒng)計信息最終給出一個副本評分,發(fā)放相應(yīng)獎勵。因為每個玩家實體都隨機分布在不同進程中,所以管理進程需要通過異步調(diào)用來獲取玩家身上的戰(zhàn)斗信息。
實現(xiàn)代碼如下所示:
# -*- coding: gbk -*-
import random
# 玩家實體類
class Player(object):
def __init__(self, entityId):
super(Player, self).__init__()
# 玩家標識
self.entityId = entityId
def onFubenEnd(self, mailBox):
score = random.randint(1, 10)
print "onFubenEnd player %d score %d"%(self.entityId, score)
# 向副本管理進程發(fā)送自己的id和戰(zhàn)斗信息
mailBox.onEvalFubenScore(self.entityId, score)
# 副本管理類
class FubenStub(object):
def __init__(self, players):
super(FubenStub, self).__init__()
self.players = players
def evalFubenScore(self):
self.playerRelayCnt = 0
self.totalScore = 0
# 通知每個注冊的玩家,副本已經(jīng)結(jié)束,索取戰(zhàn)斗信息
for player in self.players:
player.onFubenEnd(self)
def onEvalFubenScore(self, entityId, score):
# 收到其中一個玩家的戰(zhàn)斗信息
print "onEvalFubenScore player %d score %d"%(entityId, score)
self.playerRelayCnt += 1
self.totalScore += score
# 當收集完所有玩家的信息后,打印評分
if len(self.players) == self.playerRelayCnt:
print 'The fuben totalScore is %d'%self.totalScore
if __name__ == '__main__':
# 模擬創(chuàng)建玩家實體
players = [Player(i) for i in xrange(3)]
# 副本開始時,每個玩家將自己的MailBox注冊到副本管理進程
fs = FubenStub(players)
# 副本進行中
# ....
# 副本結(jié)束,開始評分
fs.evalFubenScore()
代碼簡化了副本評分邏輯的實現(xiàn),其中Player類表示游戲的玩家實體,在游戲運行時無縫地在不同服務(wù)器中切換,F(xiàn)ubenStub表示副本的管理進程,在副本剛開始的時候該副本內(nèi)所有玩家會將自己的MailBox注冊到管理進程中,其中MailBox表示各個實體的遠程調(diào)用句柄。在副本結(jié)束時,F(xiàn)ubenStub首先向各個玩家發(fā)送副本結(jié)束消息,同時請求玩家的戰(zhàn)斗信息,玩家在得到消息后,將自己的戰(zhàn)斗信息發(fā)送給FubenStub;然后當FubenStub收集完所有玩家的信息后,最終打印副本評分。
同步邏輯
如果Player和FubenStub在同一進程中的話,那所有的操作都可以同步完成,在FubenStub向玩家發(fā)送副本結(jié)束消息的同時可以馬上得到該玩家的戰(zhàn)斗信息,實現(xiàn)代碼如下所示:
# -*- coding: gbk -*-
import random
class Player(object):
def __init__(self, entityId):
super(Player, self).__init__()
self.entityId = entityId
def onFubenEnd(self, mailBox):
score = random.randint(1, 10)
print "onFubenEnd player %d score %d"%(self.entityId, score)
return self.entityId, score
class FubenStub(object):
def __init__(self, players):
super(FubenStub, self).__init__()
self.players = players
def evalFubenScore(self):
totalScore = 0
for player in self.players:
entityId, score = player.onFubenEnd(self)
print "onEvalFubenScore player %d score %d"%(entityId, score)
totalScore += score
print 'The fuben totalScore is %d'%totalScore
if __name__ == '__main__':
players = [Player(i) for i in xrange(3)]
fs = FubenStub(players)
fs.evalFubenScore()
從以上兩份代碼可以看到由于異步操作,F(xiàn)ubenStub中的評分邏輯人為地分成兩個功能點:1)向玩家發(fā)送副本結(jié)束消息;2)接受玩家的戰(zhàn)斗信息;并且兩個功能點分布在兩個不同的函數(shù)中。如果游戲邏輯一旦復(fù)雜,勢必會造成功能點分散,出現(xiàn)過多onXXX異步回調(diào)函數(shù),最終導(dǎo)致代碼的開發(fā)成本和維護成本提高,可讀性和可擴展性下降。
如果有一種方法,可以讓函數(shù)在異步調(diào)用時暫時掛起,并且在回調(diào)函數(shù)得到返回值后恢復(fù)執(zhí)行,那么就可以用同步化的編程模式開發(fā)異步邏輯。
yield 關(guān)鍵字
yield 是 Python中的一個關(guān)鍵字,凡是函數(shù)體中出現(xiàn)了 yield 關(guān)鍵字, Python將改變整個函數(shù)的上下文,調(diào)用該函數(shù)不再返回值, 而是一個生成器對象。只有調(diào)用這個生成器的迭代函數(shù)next才能開始執(zhí)行生成器對象,當生成器對象執(zhí)行到包含 yield 表達式時, 函數(shù)將暫時掛起,等待下一次next調(diào)用來恢復(fù)執(zhí)行,具體機制如下:
1)調(diào)用生成器對象的next方法,啟動函數(shù)執(zhí)行;
2)當生成器對象執(zhí)行到包含 yield 表達式時, 函數(shù)掛起;
3)下一次 next 函數(shù)調(diào)用又會驅(qū)動該生成器對象繼續(xù)執(zhí)行此后的語句, 直到遇見下一個 yield 再次掛起;
4)如果某次 next 調(diào)用驅(qū)動了生成器繼續(xù)執(zhí)行, 而此后函數(shù)正常結(jié)束,生成器會拋出 StopIteration 異常;
如下代碼所示:
def f(): print "Before first yield" yield 1 print "Before second yield" yield 2 print "After second yield" g = f() print "Before first next" g.next() print "Before second next" g.next() print "Before third yield" g.next()
執(zhí)行結(jié)果為:
Before first next
Before first yield
Before second next
Before second yield
Before third yield
After second yield
StopIteration
哈,有了讓函數(shù)暫時掛起的機制,最后就剩下如何傳遞異步調(diào)用的返回值問題了。其實生成器的next函數(shù)已經(jīng)實現(xiàn)了將參數(shù)從生成器對象內(nèi)部向外傳遞的機制,并且python還提供了一個send函數(shù)將參數(shù)從外向生成器對象內(nèi)部傳遞的機制,具體機制如下:
1) 調(diào)用next 函數(shù)驅(qū)動生成器時, next會同時等待生成器中下一個 yield 掛起,并將該yield后面的參數(shù)返回給next;
2)往生成器中傳遞參數(shù),需要將next函數(shù)替換成send,此時send的功能與next相同(驅(qū)動生成器執(zhí)行,等待返回值),同時send將后面的參數(shù)傳遞給生成器內(nèi)部之前掛起的yield;
如下代碼所示:
def f():
msg = yield 'first yield msg'
print "generator inner receive:", msg
msg = yield 'second yield msg'
print "generator inner receive:", msg
g = f()
msg = g.next()
print "generator outer receive:", msg
msg = g.send('first send msg')
print "generator outer receive:", msg
g.send('second send msg')
執(zhí)行結(jié)果為:
generator outer receive: first yield msg
generator inner receive: first send msg
generator outer receive: second yield msg
generator inner receive: second send msg
StopIteration
同步化實現(xiàn)
好了,萬事俱備只欠東風,下面就是簡單對yield機制進行工程上封裝以方便之后開發(fā)。下面的代碼提供了一個叫IFakeSyncCall的interface,所有包含異步操作的邏輯類都可以繼承這個接口:
class IFakeSyncCall(object):
def __init__(self):
super(IFakeSyncCall, self).__init__()
self.generators = {}
@staticmethod
def FAKE_SYNCALL():
def fwrap(method):
def fakeSyncCall(instance, *args, **kwargs):
instance.generators[method.__name__] = method(instance, *args, **kwargs)
func, args = instance.generators[method.__name__].next()
func(*args)
return fakeSyncCall
return fwrap
def onFakeSyncCall(self, identify, result):
try:
func, args = self.generators[identify].send(result)
func(*args)
except StopIteration:
self.generators.pop(identify)
其中interface中屬性generators用來保存類中已經(jīng)開始執(zhí)行的生成器對象;函數(shù)FAKE_SYNCALL是一個decorator,裝飾類中包含有yield的函數(shù),改變函數(shù)的調(diào)用上下文,在fakeSyncCall內(nèi)部封裝了對生成器對象的next調(diào)用;函數(shù)onFakeSyncCall封裝了所有onXXX函數(shù)的邏輯,其他實體通過調(diào)用這個函數(shù)傳遞異步回調(diào)的返回值。
下面就是經(jīng)過同步化改進后的異步副本評分邏輯代碼:
# -*- coding: gbk -*-
import random
class Player(object):
def __init__(self, entityId):
super(Player, self).__init__()
self.entityId = entityId
def onFubenEnd(self, mailBox):
score = random.randint(1, 10)
print "onFubenEnd player %d score %d"%(self.entityId, score)
mailBox.onFakeSyncCall('evalFubenScore', (self.entityId, score))
class FubenStub(IFakeSyncCall):
def __init__(self, players):
super(FubenStub, self).__init__()
self.players = players
@IFakeSyncCall.FAKE_SYNCALL()
def evalFubenScore(self):
totalScore = 0
for player in self.players:
entityId, score = yield (player.onFubenEnd, (self,))
print "onEvalFubenScore player %d score %d"%(entityId, score)
totalScore += score
print 'the totalScore is %d'%totalScore
if __name__ == '__main__':
players = [Player(i) for i in xrange(3)]
fs = FubenStub(players)
fs.evalFubenScore()
比較evalFubenScore函數(shù),基本已經(jīng)和原本的同步邏輯代碼相差無幾。
利用yield機制實現(xiàn)同步化編程模型的另外一個優(yōu)點是可以保證所有異步調(diào)用的邏輯串行化,從而保證數(shù)據(jù)的一致性和有效性,特別是在各種異步初始化流程中可以摒棄傳統(tǒng)的timer sleep機制,從源頭上扼殺一些隱藏很深的由于數(shù)據(jù)不一致性所導(dǎo)致的bug。
相關(guān)文章
Python使用psutil庫實現(xiàn)系統(tǒng)監(jiān)控與管理詳解
在我們的測試工作中,監(jiān)控和管理系統(tǒng)資源是一項重要的任務(wù),本文將介紹如何使用psutil庫來實現(xiàn)系統(tǒng)監(jiān)控和管理,以及一些實用的技巧和示例,希望對大家有所幫助2022-10-10
Pycharm-community-2020.2.3 社區(qū)版安裝教程圖文詳解
這篇文章主要介紹了Pycharm-community-2020.2.3 社區(qū)版安裝教程圖文詳解,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-12-12
Python數(shù)值求解微分方程方法(歐拉法,隱式歐拉)
這篇文章主要介紹了Python數(shù)值求解微分方程方法(歐拉法,隱式歐拉),文章圍繞主題展開詳細的內(nèi)介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-09-09
Django對接elasticsearch實現(xiàn)全文檢索的示例代碼
搜索是很常用的功能,如果是千萬級的數(shù)據(jù)應(yīng)該怎么檢索,本文主要介紹了Django對接elasticsearch實現(xiàn)全文檢索的示例代碼,感興趣的可以了解一下2021-08-08
使用Cython中prange函數(shù)實現(xiàn)for循環(huán)的并行
Cython中提供了一個prange函數(shù),專門用于循環(huán)的并行執(zhí)行。這個 prange的特殊功能是Cython獨一無二的,并且prange只能與for循環(huán)搭配使用,不能獨立存在。本文就將使用 prange 實現(xiàn) for 循環(huán)的并行,感興趣的可以了解一下2022-08-08
python opencv角點檢測連線功能的實現(xiàn)代碼
這篇文章主要介紹了python opencv角點檢測連線功能的實現(xiàn)代碼,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11
在import scipy.misc 后找不到 imsave的解決方案
這篇文章主要介紹了在import scipy.misc 后找不到 imsave的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-05-05

