Python實現(xiàn)基于POS算法的區(qū)塊鏈
區(qū)塊鏈中的共識算法
在比特幣公鏈架構(gòu)解析中,就曾提到過為了實現(xiàn)去中介化的設(shè)計,比特幣設(shè)計了一套共識協(xié)議,并通過此協(xié)議來保證系統(tǒng)的穩(wěn)定性和防攻擊性。 并且我們知道,截止目前使用最廣泛,也是最被大家接受的共識算法,是我們先前介紹過的POW(proof of work)工作量證明算法。目前市值排名前二的比特幣和以太坊也是采用的此算法。
雖然POW共識算法取得了巨大的成功,但對它的質(zhì)疑也從來未曾停止過。 其中最主要的一個原因就是電力消耗。據(jù)不完全統(tǒng)計,基于POW的挖礦機制所消耗的電量是非常巨大的,甚至比絕大多數(shù)國家耗電量還要多。這對我們的資源造成了極大的浪費,此外隨著比特大陸等公司的強勢崛起,造成了算力的高度集中。
基于以上種種原因,更多的共識算法被提出來 POS、DPOS、BPFT等等。 今天我們就來認識POS(proof of stake)算法。
Proof of stake,譯為權(quán)益證明。你可能已經(jīng)猜到了,權(quán)益證明簡單理解就是擁有更多token的人,有更大的概率獲得記賬權(quán)利,然后獲得獎勵。 這個概率具體有多大呢? 下面我們在代碼實現(xiàn)中會展示,分析也放在后面。 當然,POS是會比POW更好嗎? 會更去中心化嗎? 現(xiàn)在看來未必,所以我們這里也不去對比誰優(yōu)誰劣。 我們站在中立的角度,單純的來討論討論POS這種算法。
代碼實戰(zhàn)
生成一個Block
既然要實現(xiàn)POS算法,那么就難免要生成一條鏈,鏈又是由一個個Block生成的,所以下面我們首先來看看如何生成Block,當然在前面的內(nèi)容里面,關(guān)于如何生成Block,以及交易、UTXO等等都已經(jīng)介紹過了。由于今天我們的核心是實現(xiàn)POS,所以關(guān)于Block的生成,我們就用最簡單的實現(xiàn)方式,好讓大家把目光聚焦在核心的內(nèi)容上面。
我們用三個方法來實現(xiàn)生成一個合法的區(qū)塊
- calculate_hash 計算區(qū)塊的hash值
- is_block_valid 校驗區(qū)塊是否合法
- generate_block 生成一個區(qū)塊
from hashlib import sha256
from datetime import datetime
def generate_block(oldblock, bpm, address):
"""
:param oldblock:
:param bpm:
:param address:
:return:
"""
newblock = {
"Index": oldblock["Index"] + 1,
"BPM": bpm,
"Timestamp": str(datetime.now()),
"PrevHash": oldblock["Hash"],
"Validator": address
}
newblock["Hash"] = calculate_hash(newblock)
return newblock
def calculate_hash(block):
record = "".join([
str(block["Index"]),
str(block["BPM"]),
block["Timestamp"],
block["PrevHash"]
])
return sha256(record.encode()).hexdigest()
def is_block_valid(newblock, oldblock):
"""
:param newblock:
:param oldblock:
:return:
"""
if oldblock["Index"] + 1 != newblock["Index"]:
return False
if oldblock["Hash"] != newblock["PrevHash"]:
return False
if calculate_hash(newblock) != newblock["Hash"]:
return False
return True
這里為了更靈活,我們沒有用類的實現(xiàn)方式,直接采用函數(shù)來實現(xiàn)了Block生成,相信很容易看懂。
創(chuàng)建一個TCP服務(wù)器
由于我們需要用權(quán)益證明算法來選擇記賬人,所以需要從很多Node(節(jié)點)中選擇記賬人,也就是需要一個server讓節(jié)點鏈接上來,同時要同步信息給節(jié)點。因此需要一個TCP長鏈接。
from socketserver import BaseRequestHandler, ThreadingTCPServer
def run():
# start a tcp server
serv = ThreadingTCPServer(('', 9090), HandleConn)
serv.serve_forever()
在這里我們用了python內(nèi)庫socketserver來創(chuàng)建了一個TCPServer。 需要注意的是,這里我們是采用的多線程的創(chuàng)建方式,這樣可以保證有多個客戶端同時連接上來,而不至于被阻塞。當然,這里這個server也是存在問題的,那就是有多少個客戶端連接,就會創(chuàng)建多少個線程,更好的方式是創(chuàng)建一個線程池。由于這里是測試,所以就采用更簡單的方式了。
相信大家已經(jīng)看到了,在我們創(chuàng)建TCPServer的時候,使用到了HandleConn,但是我們還沒有定義,所以接下來我們就來定義一個HandleConn
消息處理器
下面我們來實現(xiàn)Handler函數(shù),Handler函數(shù)在跟Client Node通信的時候,需要我們的Node實現(xiàn)下面的功能
- Node可以輸入balance(token數(shù)量) 也就是股權(quán)數(shù)目
- Node需要能夠接收廣播,方便Server同步區(qū)塊以及記賬人信息
- 添加自己到候選人名單 (候選人為持有token的人)
- 輸入BPM生成Block
- 驗證一個區(qū)塊的合法性
感覺任務(wù)還是蠻多的,接下來我們看代碼實現(xiàn)
import threading
from queue import Queue, Empty
# 定義變量
block_chain = []
temp_blocks = []
candidate_blocks = Queue() # 創(chuàng)建隊列,用于線程間通信
announcements = Queue()
validators = {}
My_Lock = threading.Lock()
class HandleConn(BaseRequestHandler):
def handle(self):
print("Got connection from", self.client_address)
# validator address
self.request.send(b"Enter token balance:")
balance = self.request.recv(8192)
try:
balance = int(balance)
except Exception as e:
print(e)
t = str(datetime.now())
address = sha256(t.encode()).hexdigest()
validators[address] = balance
print(validators)
while True:
announce_winner_t = threading.Thread(target=annouce_winner, args=(announcements, self.request,),
daemon=True)
announce_winner_t.start()
self.request.send(b"\nEnter a new BPM:")
bpm = self.request.recv(8192)
try:
bpm = int(bpm)
except Exception as e:
print(e)
del validators[address]
break
# with My_Lock:
last_block = block_chain[-1]
new_block = generate_block(last_block, bpm, address)
if is_block_valid(new_block, last_block):
print("new block is valid!")
candidate_blocks.put(new_block)
self.request.send(b"\nEnter a new BPM:\n")
annouce_blockchain_t = threading.Thread(target=annouce_blockchain, args=(self.request,), daemon=True)
annouce_blockchain_t.start()
這段代碼,可能對大多數(shù)同學(xué)來說是有難度的,在這里我們采用了多線程的方式,同時為了能夠讓消息在線程間通信,我們使用了隊列。 這里使用隊列,也是為了我們的系統(tǒng)可以更好的拓展,后面如果可能,這一節(jié)的程序很容易拓展為分布式系統(tǒng)。 將多線程里面處理的任務(wù)拆分出去成獨立的服務(wù),然后用消息隊列進行通信,就是一個簡單的分布式系統(tǒng)啦。(是不是很激動?)
由于這里有難度,所以代碼還是講一講吧
# validator address self.request.send(b"Enter token balance:") balance = self.request.recv(8192) try: balance = int(balance) except Exception as e: print(e) t = str(datetime.now()) address = sha256(t.encode()).hexdigest() validators[address] = balance print(validators)
這一段就是我們提到的Node 客戶端添加自己到候選人的代碼,每鏈接一個客戶端,就會添加一個候選人。 這里我們用添加的時間戳的hash來記錄候選人。 當然也可以用其他的方式,比如我們代碼里面的client_address
announce_winner_t = threading.Thread(target=annouce_winner, args=(announcements, self.request,),
daemon=True)
announce_winner_t.start()
def annouce_winner(announcements, request):
"""
:param announcements:
:param request:
:return:
"""
while True:
try:
msg = announcements.get(block=False)
request.send(msg.encode())
request.send(b'\n')
except Empty:
time.sleep(3)
continue
然后接下來我們起了一個線程去廣播獲得記賬權(quán)的節(jié)點信息到所有節(jié)點。
self.request.send(b"\nEnter a new BPM:")
bpm = self.request.recv(8192)
try:
bpm = int(bpm)
except Exception as e:
print(e)
del validators[address]
break
# with My_Lock:
last_block = block_chain[-1]
new_block = generate_block(last_block, bpm, address)
if is_block_valid(new_block, last_block):
print("new block is valid!")
candidate_blocks.put(new_block)
根據(jù)節(jié)點輸入的BPM值生成一個區(qū)塊,并校驗區(qū)塊的有效性。 將有效的區(qū)塊放到候選區(qū)塊當中,等待記賬人將區(qū)塊添加到鏈上。
annouce_blockchain_t = threading.Thread(target=annouce_blockchain, args=(self.request,), daemon=True) annouce_blockchain_t.start() def annouce_blockchain(request): """ :param request: :return: """ while True: time.sleep(30) with My_Lock: output = json.dumps(block_chain) try: request.send(output.encode()) request.send(b'\n') except OSError: pass
最后起一個線程,同步區(qū)塊鏈到所有節(jié)點。
看完了,節(jié)點跟Server交互的部分,接下來是最重要的部分,
POS算法實現(xiàn)
def pick_winner(announcements):
"""
選擇記賬人
:param announcements:
:return:
"""
time.sleep(10)
while True:
with My_Lock:
temp = temp_blocks
lottery_pool = [] #
if temp:
for block in temp:
if block["Validator"] not in lottery_pool:
set_validators = validators
k = set_validators.get(block["Validator"])
if k:
for i in range(k):
lottery_pool.append(block["Validator"])
lottery_winner = choice(lottery_pool)
print(lottery_winner)
# add block of winner to blockchain and let all the other nodes known
for block in temp:
if block["Validator"] == lottery_winner:
with My_Lock:
block_chain.append(block)
# write message in queue.
msg = "\n{0} 贏得了記賬權(quán)利\n".format(lottery_winner)
announcements.put(msg)
break
with My_Lock:
temp_blocks.clear()
這里我們用pick_winner 來選擇記賬權(quán)利,我們根據(jù)token數(shù)量構(gòu)造了一個列表。 一個人獲得記賬權(quán)利的概率為:
p = mount['NodeA']/mount['All']
文字描述就是其token數(shù)目在總數(shù)中的占比。 比如總數(shù)有100個,他有10個,那么其獲得記賬權(quán)的概率就是0.1, 到這里核心的部分就寫的差不多了,接下來,我們來添加節(jié)點,開始測試吧
測試POS的記賬方式
在測試之前,起始還有一部分工作要做,前面我們的run方法需要完善下,代碼如下:
def run():
# create a genesis block
t = str(datetime.now())
genesis_block = {
"Index": 0,
"Timestamp": t,
"BPM": 0,
"PrevHash": "",
"Validator": ""
}
genesis_block["Hash"] = calculate_hash(genesis_block)
print(genesis_block)
block_chain.append(genesis_block)
thread_canditate = threading.Thread(target=candidate, args=(candidate_blocks,), daemon=True)
thread_pick = threading.Thread(target=pick_winner, args=(announcements,), daemon=True)
thread_canditate.start()
thread_pick.start()
# start a tcp server
serv = ThreadingTCPServer(('', 9090), HandleConn)
serv.serve_forever()
def candidate(candidate_blocks):
"""
:param candidate_blocks:
:return:
"""
while True:
try:
candi = candidate_blocks.get(block=False)
except Empty:
time.sleep(5)
continue
temp_blocks.append(candi)
if __name__ == '__main__':
run()
添加節(jié)點連接到TCPServer
為了充分減少程序的復(fù)雜性,tcp client我們這里就不實現(xiàn)了,可以放在后面拓展部分。 畢竟我們這個系統(tǒng)是很容易擴展的,后面我們拆分了多線程的部分,在實現(xiàn)tcp client就是一個完整的分布式系統(tǒng)了。
所以,我們這里用linux自帶的命令 nc,不知道nc怎么用的同學(xué)可以google或者 man nc

啟動服務(wù) 運行 python pos.py
打開3個終端
分別輸入下面命令
nc localhost 9090
終端如果輸出
Enter token balance:
說明你client已經(jīng)鏈接服務(wù)器ok啦.
測試POS的記賬方式
接下來依次按照提示操作。 balance可以按心情來操作,因為這里是測試,我們輸入100,
緊接著會提示輸入BPM,我們前面提到過,輸入BPM是為了生成Block,那么就輸入吧,隨便輸入個9. ok, 接下來就稍等片刻,等待記賬。
輸出如同所示

依次在不同的終端,根據(jù)提示輸入數(shù)字,等待消息同步。
生成區(qū)塊鏈
下面是我這邊獲得的3個block信息。

總結(jié)
在上面的代碼中,我們實現(xiàn)了一個完整的基于POS算法記賬的鏈,當然這里有許多值得擴展與改進的地方。
- python中多線程開銷比較大,可以改成協(xié)程的方式
- TCP建立的長鏈接是基于TCPServer,是中心化的方式,可以改成P2P對等網(wǎng)絡(luò)
- 鏈的信息不夠完整
- 系統(tǒng)可以拓展成分布式,讓其更健壯
大概列了以上幾點,其他還有很多可以拓展的地方,感興趣的朋友可以先玩玩, 后者等到我們后面的教程。 (廣告打的措手不及,哈哈)
當然了,語言不是重點,所以在這里,我也實現(xiàn)了go語言的版本源碼地址
go語言的實現(xiàn)感覺要更好理解一點,也顯得要優(yōu)雅一點。這也是為什么go語言在分布式領(lǐng)域要更搶手的原因之一吧!
項目地址
https://github.com/csunny/py-bitcoin/
參考
https://medium.com/@mycoralhealth/code-your-own-proof-of-stake-blockchain-in-go-610cd99aa658
總結(jié)
以上所述是小編給大家介紹的Python實現(xiàn)基于POS算法的區(qū)塊鏈,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關(guān)文章
Django使用裝飾器限制對視圖的訪問及實現(xiàn)原理
除了可以在視圖處理中校驗用戶身份以及驗證用戶權(quán)限之外,Django還提供了便捷的裝飾器來完成這兩類校驗,下面介紹這兩個裝飾器的使用方法與實現(xiàn)原理,對Django裝飾器限制視圖訪問相關(guān)知識感興趣的朋友一起看看吧2022-10-10
使用SQLAlchemy操作數(shù)據(jù)庫表過程解析
這篇文章主要介紹了使用SQLAlchemy操作數(shù)據(jù)庫表過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-06-06
Python+Tkinter實現(xiàn)RGB數(shù)值轉(zhuǎn)換為16進制碼
這篇文章主要為大家詳細介紹了Python如何利用Tkinter編寫一個RGB數(shù)值轉(zhuǎn)換為16進制碼的小工具,文中的示例代講解詳細,感興趣的小伙伴可以了解一下2023-01-01
Python利用arcpy模塊實現(xiàn)柵格的創(chuàng)建與拼接
這篇文章主要為大家詳細介紹了如何基于Python語言arcpy模塊,實現(xiàn)柵格影像圖層建立與多幅遙感影像數(shù)據(jù)批量拼接(Mosaic)的操作,感興趣的可以了解一下2023-02-02
python 時間 T 去掉 帶上ms 毫秒 時間格式的操作
這篇文章主要介紹了python 時間 T 去掉 帶上ms 毫秒 時間格式的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-04-04

