基于asyncio 異步協(xié)程框架實現(xiàn)收集B站直播彈幕
前言
雖然標題是全站,但目前只做了等級 top 100 直播間的全天彈幕收集。
彈幕收集系統(tǒng)基于之前的B 站直播彈幕姬 Python 版修改而來。具體協(xié)議分析可以看上一篇文章。
直播彈幕協(xié)議是直接基于 TCP 協(xié)議,所以如果 B 站對類似我這種行為做反制措施,比較困難。應(yīng)該有我不知道的技術(shù)手段來檢測類似我這種惡意行為。
我試過同時連接 100 個房間,和連接單個房間 100 次的實驗,都沒有問題。>150 會被關(guān)閉鏈接。
直播間的選取
現(xiàn)在彈幕收集系統(tǒng)在選取直播間上比較簡單,直接選取了等級 top100。
以后會修改這部分,改成定時去 http://live.bilibili.com/all 查看新開播的直播間,并動態(tài)添加任務(wù)。
異步任務(wù)和彈幕存儲
收集系統(tǒng)仍舊使用了 asyncio 異步協(xié)程框架,對于每一個直播間都使用如下方法來加進 loop 中。
danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task1 = asyncio.ensure_future(danmuji.connectServer()) task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())
其實若將心跳任務(wù) HeartbeatLoop 放入 connectorServer 中去啟動,代碼看起來更優(yōu)雅一些。但這么做是因為我需要維護一個任務(wù)列表,后面會有描述。
在彈幕存儲上我花了些時間選擇。
數(shù)據(jù)庫存儲是一個同步 IO 的過程,Insert 的時候會阻塞彈幕收集的任務(wù)。雖然有 aiomysql 這種異步接口,但配置數(shù)據(jù)庫太麻煩,我的設(shè)想是這個小系統(tǒng)能夠方便地部署。
最終我選擇使用自帶的 sqlite3。但 sqlite3 無法做并行操作,故開了一個線程單獨進行數(shù)據(jù)庫存儲。在另一個線程中,100 * 2 個任務(wù)搜集所有的彈幕、人數(shù)信息,并塞進隊列 commentq, numq 中。存儲線程每隔 10s 喚醒一次,將隊列中的數(shù)據(jù)寫進 sqlite3 中,并清空隊列。
在多線程和異步的配合下,網(wǎng)絡(luò)流量沒有被阻塞。
可能的連接失敗場景處理
彈幕協(xié)議是直接基于 TCP,位與位直接關(guān)聯(lián)性較強,一旦解析錯誤,很容易就拋 Exception(個人感覺,雖然 TCP 是可靠傳輸,但B站服務(wù)器自身發(fā)生錯誤也是有可能的)。所以有必要設(shè)計一個自動重連機制。
在 asyncio 文檔中提到,
Done means either that a result / exception are available, or that the future was cancelled.
函數(shù)正常返回、拋出異常或者是被 cancel,都會退出當前任務(wù)??梢允褂?done() 來判斷。
每一個直播間對應(yīng)兩個任務(wù),解析任務(wù)是最容易掛的,但并不會影響心跳任務(wù),所以必須找出并將對應(yīng)心跳任務(wù)結(jié)束。
在創(chuàng)建任務(wù)的時候使用字典記錄每個房間的兩個任務(wù),
self.tasks[url] = [task1, task2]
在運行過程中,每隔 10s 做一次檢查,
for url in self.tasks:
item = self.tasks[url]
task1 = item[0]
task2 = item[1]
if task1.done() == True or task2.done() == True:
if task1.done() == False:
task1.cancel()
if task2.done() == False:
task2.cancel()
danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
task11 = asyncio.ensure_future(danmuji.connectServer())
task22 = asyncio.ensure_future(danmuji.HeartbeatLoop())
self.tasks[url] = [task11, task22]
實際我只見過一次任務(wù)失敗的場景,是因為主播房間被封了,導(dǎo)致無法進入直播間。
結(jié)論
- B站人數(shù)是按照連接彈幕服務(wù)器的鏈接數(shù)量統(tǒng)計的。通過操縱鏈接量,可以瞬間增加任意人數(shù)觀看,有商機?
- 運行的這幾天中,發(fā)現(xiàn)即使大部分房間不在直播,也能有 >5 的人數(shù),包括凌晨。我只能猜測也有和我一樣的人在 24h 收集彈幕。
- top100 平均一天 40M 彈幕數(shù)據(jù)。
- 收集的彈幕能做什么?還沒想好,可能可以拿來做用戶行為分析 -_^
最后附上本源碼的GITHUB地址 https://github.com/lyyyuna/bilibili_danmu_colloector
相關(guān)文章
TensorFlow自定義損失函數(shù)來預(yù)測商品銷售量
這篇文章主要介紹了TensorFlow自定義損失函數(shù)——預(yù)測商品銷售量,本文給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2020-02-02
Python原始字符串與Unicode字符串操作符用法實例分析
這篇文章主要介紹了Python原始字符串與Unicode字符串操作符用法,結(jié)合實例形式分析了Python針對原始字符與Unicode字符的操作符用法,需要的朋友可以參考下2017-07-07
Python利用yield?form實現(xiàn)異步協(xié)程爬蟲
這篇文章主要為大家詳細介紹了Python如何利用yield?form實現(xiàn)異步協(xié)程爬蟲。其實這是很古老的用法了,現(xiàn)在大多用的aiohttp庫實現(xiàn),這篇記錄僅僅用做個人的協(xié)程底層實現(xiàn)的學(xué)習(xí),希望對大家有所幫助2022-11-11
詳解Python網(wǎng)絡(luò)框架Django和Scrapy安裝指南
這篇文章主要介紹了詳解Python網(wǎng)絡(luò)框架Django和Scrapy安裝指南,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-04-04
python實現(xiàn)修改固定模式的字符串內(nèi)容操作示例
這篇文章主要介紹了python實現(xiàn)修改固定模式的字符串內(nèi)容操作,結(jié)合實例形式詳細分析了Python修改固定模式字符串原理、實現(xiàn)方法及相關(guān)操作注意事項,需要的朋友可以參考下2019-12-12
python 兩個一樣的字符串用==結(jié)果為false問題的解決
這篇文章主要介紹了python 兩個一樣的字符串用==結(jié)果為false問題的解決,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-03-03
TensorFlow實現(xiàn)創(chuàng)建分類器
這篇文章主要為大家詳細介紹了TensorFlow實現(xiàn)創(chuàng)建分類器,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-02-02

