python kafka 多線程消費(fèi)者&手動(dòng)提交實(shí)例
官方文檔:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
import threading
import os
import sys
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from consumers.db_util import *
from consumers.json_dispose import *
from collections import OrderedDict
threads = []
# col_dic, sql_dic = get()
class MyThread(threading.Thread):
def __init__(self, thread_name, topic, partition):
threading.Thread.__init__(self)
self.thread_name = thread_name
# self.keyName = keyName
self.partition = partition
self.topic = topic
def run(self):
print("Starting " + self.name)
Consumer(self.thread_name, self.topic, self.partition)
def stop(self):
sys.exit()
def Consumer(thread_name, topic, partition):
broker_list = '172.16.90.63:6667, 172.16.90.58:6667, 172.16.90.59:6667'
'''
fetch_min_bytes(int) - 服務(wù)器為獲取請(qǐng)求而返回的最小數(shù)據(jù)量,否則請(qǐng)等待
fetch_max_wait_ms(int) - 如果沒(méi)有足夠的數(shù)據(jù)立即滿足fetch_min_bytes給出的要求,服務(wù)器在回應(yīng)提取請(qǐng)求之前將阻塞的最大時(shí)間量(以毫秒為單位)
fetch_max_bytes(int) - 服務(wù)器應(yīng)為獲取請(qǐng)求返回的最大數(shù)據(jù)量。這不是絕對(duì)最大值,如果獲取的第一個(gè)非空分區(qū)中的第一條消息大于此值,
則仍將返回消息以確保消費(fèi)者可以取得進(jìn)展。注意:使用者并行執(zhí)行對(duì)多個(gè)代理的提取,因此內(nèi)存使用將取決于包含該主題分區(qū)的代理的數(shù)量。
支持的Kafka版本> = 0.10.1.0。默認(rèn)值:52428800(50 MB)。
enable_auto_commit(bool) - 如果為True,則消費(fèi)者的偏移量將在后臺(tái)定期提交。默認(rèn)值:True。
max_poll_records(int) - 單次調(diào)用中返回的最大記錄數(shù)poll()。默認(rèn)值:500
max_poll_interval_ms(int) - poll()使用使用者組管理時(shí)的調(diào)用之間的最大延遲 。這為消費(fèi)者在獲取更多記錄之前可以閑置的時(shí)間量設(shè)置了上限。
如果 poll()在此超時(shí)到期之前未調(diào)用,則認(rèn)為使用者失敗,并且該組將重新平衡以便將分區(qū)重新分配給另一個(gè)成員。默認(rèn)300000
'''
consumer = KafkaConsumer(bootstrap_servers=broker_list,
group_id="xiaofesi",
client_id=thread_name,
enable_auto_commit=False,
fetch_min_bytes=1024*1024,#1M
# fetch_max_bytes=1024 * 1024 * 1024 * 10,
fetch_max_wait_ms=60000,#30s
request_timeout_ms=305000,
# consumer_timeout_ms=1,
# max_poll_records=5000,
# max_poll_interval_ms=60000 無(wú)該參數(shù)
)
#查出數(shù)據(jù)庫(kù)上次保存的offset,此offset已經(jīng)是上次消費(fèi)最后一條的offset的offset+1,也就是這次消費(fèi)的起始位
dic = get_kafka(topic, partition)
tp = TopicPartition(topic, partition)
print(thread_name, tp, dic['offset'])
#分配該消費(fèi)者的TopicPartition,也就是topic和partition,根據(jù)參數(shù),我是三個(gè)消費(fèi)者,三個(gè)線程,每個(gè)線程消費(fèi)者消費(fèi)一個(gè)分區(qū)
consumer.assign([tp])
#重置此消費(fèi)者消費(fèi)的起始位
consumer.seek(tp, dic['offset'])
print("程序首次運(yùn)行\(zhòng)t線程:", thread_name, "分區(qū):", partition, "偏移量:", dic['offset'], "\t開(kāi)始消費(fèi)...")
num=0 #記錄該消費(fèi)者消費(fèi)次數(shù)
# end_offset = consumer.end_offsets([tp])[tp]
# print(end_offset)
while True:
args = OrderedDict()
msg = consumer.poll(timeout_ms=60000)
end_offset = consumer.end_offsets([tp])[tp]
print('已保存的偏移量', consumer.committed(tp),'最新偏移量,',end_offset)
if len(msg) > 0:
print("線程:", thread_name, "分區(qū):", partition, "最大偏移量:", end_offset, "有無(wú)數(shù)據(jù),", len(msg))
lines=0
for data in msg.values():
for line in data:
lines+=1
line = eval(line.value.decode('utf-8'))
'''
do something
'''
# 線程此批次消息條數(shù)
print(thread_name,"lines",lines)
#數(shù)據(jù)保存至數(shù)據(jù)庫(kù)
is_succeed = save_to_db(args, thread_name)
if is_succeed:
#更新自己保存在數(shù)據(jù)庫(kù)中的各topic, partition的偏移量
is_succeed1 = update_offset(topic, partition, end_offset)
#手動(dòng)提交偏移量 offsets格式:{TopicPartition:OffsetAndMetadata(offset_num,None)}
consumer.commit(offsets={tp:(OffsetAndMetadata(end_offset,None))})
print(thread_name,"to db suss",num+1)
if is_succeed1 == 0:
#系統(tǒng)退出?這個(gè)還沒(méi)試
os.exit()
'''
sys.exit() 只能退出該線程,也就是說(shuō)其它兩個(gè)線程正常運(yùn)行,主程序不退出
'''
else:
os.exit()
else:
print(thread_name,'沒(méi)有數(shù)據(jù)')
num+=1
print(thread_name,"第",num,"次")
if __name__ == '__main__':
try:
t1 = MyThread("Thread-0", "test", 0)
threads.append(t1)
t2 = MyThread("Thread-1", "test", 1)
threads.append(t2)
t3 = MyThread("Thread-2", "test", 2)
threads.append(t3)
for t in threads:
t.start()
for t in threads:
t.join()
print("exit program with 0")
except:
print("Error: failed to run consumer program")
以上這篇python kafka 多線程消費(fèi)者&手動(dòng)提交實(shí)例就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- 在python環(huán)境下運(yùn)用kafka對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)傳輸?shù)姆椒?/a>
- kafka-python批量發(fā)送數(shù)據(jù)的實(shí)例
- 對(duì)python操作kafka寫入json數(shù)據(jù)的簡(jiǎn)單demo分享
- python3實(shí)現(xiàn)從kafka獲取數(shù)據(jù),并解析為json格式,寫入到mysql中
- python消費(fèi)kafka數(shù)據(jù)批量插入到es的方法
- python 消費(fèi) kafka 數(shù)據(jù)教程
- python3連接kafka模塊pykafka生產(chǎn)者簡(jiǎn)單封裝代碼
- python每5分鐘從kafka中提取數(shù)據(jù)的例子
- python操作kafka實(shí)踐的示例代碼
- 快速上手Python Kafka庫(kù)安裝攻略
相關(guān)文章
Python?Pygame實(shí)戰(zhàn)之紅心大戰(zhàn)游戲的實(shí)現(xiàn)
說(shuō)起Windows自帶的游戲,相信許多80、90后的朋友都不陌生。本文就將利用Python中的Pygame模塊實(shí)現(xiàn)一下windows經(jīng)典游戲之一的紅心大戰(zhàn),需要的可以參考一下2022-02-02
python編程開(kāi)發(fā)之日期操作實(shí)例分析
這篇文章主要介紹了python編程開(kāi)發(fā)之日期操作,以實(shí)例形式較為詳細(xì)的分析了Python中datetime與time庫(kù)的相關(guān)使用技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-11-11
python3.9安裝RobotFramework的簡(jiǎn)單教程
python3.9安裝RobotFramework,不同于python2.7和python3.6,使用這兩個(gè)版本安裝會(huì)出現(xiàn)問(wèn)題,因?yàn)槲野惭b遇到問(wèn)題發(fā)現(xiàn)沒(méi)有最新的教程,所以打算自己寫一個(gè),同時(shí)下面會(huì)記錄安裝步驟及使用的方法會(huì)出現(xiàn)的一些問(wèn)題,對(duì)python3.9安裝RobotFramework感興趣的朋友一起看看吧2023-01-01
使用Python實(shí)現(xiàn)將Word文檔轉(zhuǎn)換為PNG圖片
在這篇博客中,我將介紹一個(gè)使用Python編寫的小工具,它能夠?qū)⒅付ㄎ募A中的所有Word文檔轉(zhuǎn)換為PNG圖片,這個(gè)工具基于wxPython庫(kù)構(gòu)建圖形用戶界面,接下來(lái),我將詳細(xì)說(shuō)明這個(gè)工具的功能及其實(shí)現(xiàn),需要的朋友可以參考下2024-08-08
python文字轉(zhuǎn)語(yǔ)音實(shí)現(xiàn)過(guò)程解析
這篇文章主要介紹了python文字轉(zhuǎn)語(yǔ)音實(shí)現(xiàn)過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11
Python爬蟲之Selenium鼠標(biāo)事件的實(shí)現(xiàn)
這篇文章主要介紹了Python爬蟲之Selenium鼠標(biāo)事件的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12
Python實(shí)現(xiàn)批量下載ts文件并合并為mp4
這篇文章主要為大家詳細(xì)介紹了如何通過(guò)Python語(yǔ)言實(shí)現(xiàn)批量下載ts文件并合并為mp4視頻的功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下2023-06-06

