python消費(fèi)kafka數(shù)據(jù)批量插入到es的方法
1、es的批量插入
這是為了方便后期配置的更改,把配置信息放在logging.conf中
用elasticsearch來(lái)實(shí)現(xiàn)批量操作,先安裝依賴(lài)包,sudo pip install Elasticsearch2
from elasticsearch import Elasticsearch
class ImportEsData:
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")
def __init__(self,hosts,index,type):
self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
self.index = index
self.type = type
def set_date(self,data):
# 批量處理
# es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()})
self.es.index(index=self.index,doc_type=self.index,body=data)
2、使用pykafka消費(fèi)kafka
1.因?yàn)閗afka是0.8,pykafka不支持zk,只能用get_simple_consumer來(lái)實(shí)現(xiàn)
2.為了實(shí)現(xiàn)多個(gè)應(yīng)用同時(shí)消費(fèi)而且不重消費(fèi),所以一個(gè)應(yīng)用消費(fèi)一個(gè)partition
3. 為是確保消費(fèi)數(shù)據(jù)量在不滿(mǎn)足10000這個(gè)批量值,能在一個(gè)時(shí)間范圍內(nèi)插入到es中,這里設(shè)置consumer_timeout_ms一個(gè)超時(shí)等待時(shí)間,退出等待消費(fèi)阻塞。
4.退出等待消費(fèi)阻塞后導(dǎo)致無(wú)法再消費(fèi)數(shù)據(jù),因此在獲取self.consumer 的外層加入了while True 一個(gè)死循環(huán)
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime
class KafkaPython:
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")
logger_data = logging.getLogger("data")
def __init__(self):
self.server = ConfigUtil().get("kafka","kafka_server")
self.topic = ConfigUtil().get("kafka","topic")
self.group = ConfigUtil().get("kafka","group")
self.partition_id = int(ConfigUtil().get("kafka","partition"))
self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))
self.consumer = None
self.hosts = ConfigUtil().get("es","hosts")
self.index_name = ConfigUtil().get("es","index_name")
self.type_name = ConfigUtil().get("es","type_name")
def getConnect(self):
client = KafkaClient(self.server)
topic = client.topics[self.topic]
p = topic.partitions
ps={p.get(self.partition_id)}
self.consumer = topic.get_simple_consumer(
consumer_group=self.group,
auto_commit_enable=True,
consumer_timeout_ms=self.consumer_timeout_ms,
# num_consumer_fetchers=1,
# consumer_id='test1',
partitions=ps
)
self.starttime = datetime.datetime.now()
def beginConsumer(self):
print("beginConsumer kafka-python")
imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)
#創(chuàng)建ACTIONS
count = 0
ACTIONS = []
while True:
endtime = datetime.datetime.now()
print (endtime - self.starttime).seconds
for message in self.consumer:
if message is not None:
try:
count = count + 1
# print(str(message.partition.id)+","+str(message.offset)+","+str(count))
# self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
action = {
"_index": self.index_name,
"_type": self.type_name,
"_source": message.value
}
ACTIONS.append(action)
if len(ACTIONS) >= 10000:
imprtEsData.set_date(ACTIONS)
ACTIONS = []
self.consumer.commit_offsets()
endtime = datetime.datetime.now()
print (endtime - self.starttime).seconds
#break
except (Exception) as e:
# self.consumer.commit_offsets()
print(e)
self.logger.error(e)
self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
# self.logger_data.error(message.value+"\n")
# self.consumer.commit_offsets()
if len(ACTIONS) > 0:
self.logger.info("等待時(shí)間超過(guò),consumer_timeout_ms,把集合數(shù)據(jù)插入es")
imprtEsData.set_date(ACTIONS)
ACTIONS = []
self.consumer.commit_offsets()
def disConnect(self):
self.consumer.close()
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class ImportEsData:
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")
def __init__(self,hosts,index,type):
self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
self.index = index
self.type = type
def set_date(self,data):
# 批量處理
success = bulk(self.es, data, index=self.index, raise_on_error=True)
self.logger.info(success)
3、運(yùn)行
if __name__ == '__main__': kp = KafkaPython() kp.getConnect() kp.beginConsumer() # kp.disConnect()
注:簡(jiǎn)單的寫(xiě)了一個(gè)從kafka中讀取數(shù)據(jù)到一個(gè)list里,當(dāng)數(shù)據(jù)達(dá)到一個(gè)閾值時(shí),在批量插入到 es的插件
現(xiàn)在還在批量的壓測(cè)中。。。
以上這篇python消費(fèi)kafka數(shù)據(jù)批量插入到es的方法就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- 在python環(huán)境下運(yùn)用kafka對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)傳輸?shù)姆椒?/a>
- kafka-python批量發(fā)送數(shù)據(jù)的實(shí)例
- 對(duì)python操作kafka寫(xiě)入json數(shù)據(jù)的簡(jiǎn)單demo分享
- python3實(shí)現(xiàn)從kafka獲取數(shù)據(jù),并解析為json格式,寫(xiě)入到mysql中
- python kafka 多線(xiàn)程消費(fèi)者&手動(dòng)提交實(shí)例
- python 消費(fèi) kafka 數(shù)據(jù)教程
- python3連接kafka模塊pykafka生產(chǎn)者簡(jiǎn)單封裝代碼
- python每5分鐘從kafka中提取數(shù)據(jù)的例子
- python操作kafka實(shí)踐的示例代碼
- 快速上手Python Kafka庫(kù)安裝攻略
相關(guān)文章
Python使用Turtle圖形函數(shù)畫(huà)圖顏色填充實(shí)例
這篇文章主要介紹了Python使用Turtle圖形函數(shù)畫(huà)圖顏色填充實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08
關(guān)于WARNING:Ignoring?invalid?distribution?-pencv-python....
這篇文章主要給大家介紹了關(guān)于WARNING:Ignoring?invalid?distribution?-pencv-python....警告信息的處理方法,文中通過(guò)圖文將解決的辦法介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用python具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2023-03-03
Pytorch反向求導(dǎo)更新網(wǎng)絡(luò)參數(shù)的方法
今天小編就為大家分享一篇Pytorch反向求導(dǎo)更新網(wǎng)絡(luò)參數(shù)的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-08-08
python爬蟲(chóng)使用scrapy注意事項(xiàng)
在本篇文章里小編給大家整理的是一篇關(guān)于python爬蟲(chóng)使用scrapy注意事項(xiàng)的相關(guān)文章,對(duì)此有興趣的朋友們可以學(xué)習(xí)下。2020-11-11
使用python讀寫(xiě)txt和json(jsonl)大文件的方法步驟
在Python中讀取txt和json(jsonl)大文件并保存到字典是一項(xiàng)非常常見(jiàn)的操作,這篇文章主要給大家介紹了關(guān)于使用python讀寫(xiě)txt和json(jsonl)大文件的方法步驟,需要的朋友可以參考下2023-12-12
python3實(shí)現(xiàn)字符串的全排列的方法(無(wú)重復(fù)字符)
這篇文章主要介紹了python3實(shí)現(xiàn)字符串的全排列的方法(無(wú)重復(fù)字符),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-07-07

