python中的生成器實(shí)現(xiàn)周期性報(bào)文發(fā)送功能
使用python中的生成器實(shí)現(xiàn)周期性發(fā)送列表中數(shù)值的報(bào)文發(fā)送功能。
功能開發(fā)背景:提取cantest工具采集到的現(xiàn)場報(bào)文數(shù)據(jù),希望使用原始的現(xiàn)場數(shù)據(jù)模擬驗(yàn)證程序現(xiàn)有邏輯,需要開發(fā)一個(gè)工具能夠自動(dòng)按照報(bào)文發(fā)送周期依次發(fā)送采集到的報(bào)文數(shù)據(jù)中的一個(gè)數(shù)值。
功能開發(fā)需求:多個(gè)報(bào)文發(fā)送對象共用同一個(gè)報(bào)文發(fā)送線程,多個(gè)對象間的報(bào)文發(fā)送周期不同,多個(gè)對象間的總報(bào)文發(fā)送數(shù)據(jù)長度不同,能夠允許報(bào)文發(fā)送過程中斷及恢復(fù)某個(gè)對象的報(bào)文發(fā)送。
功能開發(fā)實(shí)現(xiàn)邏輯:在固定發(fā)送對象某個(gè)數(shù)值的基礎(chǔ)程序版本上增加新的功能,考慮使用python中生成器實(shí)現(xiàn)周期性提取對象數(shù)值發(fā)送報(bào)文的功能。
目前只需要發(fā)送兩個(gè)對象的報(bào)文數(shù)據(jù),先定義兩個(gè)使用yield生成器:
? ? def yield_item_value_1(self): ? ? ? ? item_value_list = self.item_value_dict[item_list[0]] ? ? ? ? for i in range(len(item_value_list)): ? ? ? ? ? ? yield item_value_list[i] ? ? def yield_item_value_2(self): ? ? ? ? item_value_list = self.item_value_dict[item_list[1]] ? ? ? ? for i in range(len(item_value_list)): ? ? ? ? ? ? yield item_value_list[i]
報(bào)文發(fā)送線程中的run()函數(shù):
def run(self):
# 實(shí)時(shí)更新item的被選狀態(tài)
self.get_checkbox_res_func()
# 獲取每個(gè)對象的實(shí)際物理值
self.get_item_value_dict()
self.item1_value_func = self.yield_item_value_1()
self.item2_value_func = self.yield_item_value_2()
while self.Flag:
if any(msg_send_flag_dict.values()):
# 每隔second秒執(zhí)行func函數(shù)
timer = Timer(0.01, self.tick_10ms_func)
timer.start()
self.send_working_msg(self.working_can_device, self.working_can_channel)
timer.join()
else:
mes_info = "Goodbye *** 自動(dòng)發(fā)送所有報(bào)文數(shù)據(jù)結(jié)束?。?!"
toastone = wx.MessageDialog(None, mes_info, "信息提示",
wx.YES_DEFAULT | wx.ICON_QUESTION)
if toastone.ShowModal() == wx.ID_YES: # 如果點(diǎn)擊了提示框的確定按鈕
toastone.Destroy() # 則關(guān)閉提示框
break
報(bào)文周期性發(fā)送函數(shù):
def send_working_msg(self, can_device, device_id):
for idx in range(len(item_list)):
if msg_send_flag_dict[item_list[idx]] == 1:
msg_id_idx = msg_operation_list.index("報(bào)文ID") - 1
msg_id = eval(str(self.operation_dict[item_list[idx]][msg_id_idx]).strip())
# 獲取報(bào)文發(fā)送幀類型
msg_type_idx = msg_operation_list.index("幀類型") - 1
msg_type = str(self.operation_dict[item_list[idx]][msg_type_idx])
msg_type = 1 if msg_type == "擴(kuò)展幀" else 0
# 獲取報(bào)文發(fā)送周期
msg_cycle_idx = msg_operation_list.index("周期(ms)") - 1
msg_cycle = int(self.operation_dict[item_list[idx]][msg_cycle_idx])
send_cycle = msg_cycle / 10
if msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] >= send_cycle:
# 開始喂值
if idx == 0:
try:
item_phyValue = next(self.item1_value_func)
except StopIteration:
msg_send_flag_dict[item_list[idx]] = 0
continue
else:
try:
item_phyValue = next(self.item2_value_func)
except StopIteration:
msg_send_flag_dict[item_list[idx]] = 0
continue
msg_data = self.get_item_msg(item_list[idx], item_phyValue)
if send_msg(msg_id, msg_type, msg_data, can_device, device_id, 0):
print("發(fā)送報(bào)文成功")
# print("msg_data", msg_data)
msg_tick_10ms_dict["_".join(["tick", "10ms", str(idx)])] = 0
else:
pass
# print("發(fā)送報(bào)文失敗")
# mes_info = "發(fā)送報(bào)文失敗"
# toastone = wx.MessageDialog(None, mes_info, "信息提示",
# wx.YES_DEFAULT | wx.ICON_QUESTION)
# if toastone.ShowModal() == wx.ID_YES: # 如果點(diǎn)擊了提示框的確定按鈕
# toastone.Destroy() # 則關(guān)閉提示框
功能實(shí)現(xiàn)邏輯的待優(yōu)化點(diǎn):存在多個(gè)對象就需要定義多個(gè)存儲(chǔ)報(bào)文數(shù)據(jù)的生成器。
上述功能實(shí)現(xiàn)邏輯優(yōu)化如下:
? ? def set_yield_func(self): ? ? ? ? item_yield_func_dict = dict() ? ? ? ? for i in range(len(item_list)): ? ? ? ? ? ? item_yield_func_dict[item_list[i]] = self.yield_item_value(i) ? ? ? ? return item_yield_func_dict ? ? def yield_item_value(self, item_idx): ? ? ? ? item_value_list = self.item_value_dict[item_list[item_idx]] ? ? ? ? for i in range(len(item_value_list)): ? ? ? ? ? ? yield item_value_list[i]
報(bào)文發(fā)送線程的run()函數(shù)中調(diào)用這個(gè)存儲(chǔ)對象報(bào)文發(fā)送數(shù)據(jù)生成器的字典item_yield_func_dict:
def run(self):
# 實(shí)時(shí)更新item的被選狀態(tài)
self.get_checkbox_res_func()
# 獲取每個(gè)對象的實(shí)際物理值
self.get_item_value_dict()
self.item_yield_func_dict = self.set_yield_func()
…………
從存儲(chǔ)每個(gè)對象生成器的字典item_yield_func_dict中獲取生成器對象:
try:
item_phyValue = next(self.item_yield_func_dict[item_list[idx]])
except StopIteration:
msg_send_flag_dict[item_list[idx]] = 0
continue
到此這篇關(guān)于python中的生成器實(shí)現(xiàn)周期性報(bào)文發(fā)送功能的文章就介紹到這了,更多相關(guān)python 周期性報(bào)文發(fā)送 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
批標(biāo)準(zhǔn)化層 tf.keras.layers.Batchnormalization()解析
這篇文章主要介紹了批標(biāo)準(zhǔn)化層 tf.keras.layers.Batchnormalization(),具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02
基于python內(nèi)置函數(shù)與匿名函數(shù)詳解
下面小編就為大家分享一篇基于python內(nèi)置函數(shù)與匿名函數(shù)詳解,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-01-01
關(guān)于PySnooper 永遠(yuǎn)不要使用print進(jìn)行調(diào)試的問題
這篇文章主要介紹了關(guān)于PySnooper 永遠(yuǎn)不要使用print進(jìn)行調(diào)試的問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03
python數(shù)據(jù)類型強(qiáng)制轉(zhuǎn)換實(shí)例詳解
這篇文章主要介紹了python數(shù)據(jù)類型強(qiáng)制轉(zhuǎn)換實(shí)例詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06
Python?numpy生成矩陣基礎(chǔ)用法實(shí)例代碼
矩陣是matrix類型的對象,該類繼承自numpy.ndarray,任何針對ndarray的操作,對矩陣對象同樣有效,下面這篇文章主要給大家介紹了關(guān)于Python?numpy生成矩陣基礎(chǔ)的相關(guān)資料,需要的朋友可以參考下2022-08-08
python數(shù)據(jù)結(jié)構(gòu)之圖深度優(yōu)先和廣度優(yōu)先實(shí)例詳解
這篇文章主要介紹了python數(shù)據(jù)結(jié)構(gòu)之圖深度優(yōu)先和廣度優(yōu)先,較為詳細(xì)的分析了深度優(yōu)先和廣度優(yōu)先算法的概念與原理,并給出了完整實(shí)現(xiàn)算法,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-07-07
python數(shù)字圖像處理實(shí)現(xiàn)直方圖與均衡化
在圖像處理中,直方圖是非常重要,也是非常有用的一個(gè)處理要素。這篇文章主要介紹了python數(shù)字圖像處理實(shí)現(xiàn)直方圖與均衡化,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-05-05

