Python如何把Spark數(shù)據(jù)寫入ElasticSearch
這里以將Apache的日志寫入到ElasticSearch為例,來演示一下如何使用Python將Spark數(shù)據(jù)導(dǎo)入到ES中。
實(shí)際工作中,由于數(shù)據(jù)與使用框架或技術(shù)的復(fù)雜性,數(shù)據(jù)的寫入變得比較復(fù)雜,在這里我們簡單演示一下。
如果使用Scala或Java的話,Spark提供自帶了支持寫入ES的支持庫,但Python不支持。所以首先你需要去這里下載依賴的ES官方開發(fā)的依賴包包。
下載完成后,放在本地目錄,以下面命令方式啟動pyspark:
pyspark --jars elasticsearch-hadoop-6.4.1.jar
如果你想pyspark使用Python3,請?jiān)O(shè)置環(huán)境變量:
export PYSPARK_PYTHON=/usr/bin/python3
理解如何寫入ES的關(guān)鍵是要明白,ES是一個JSON格式的數(shù)據(jù)庫,它有一個必須的要求。數(shù)據(jù)格式必須采用以下格式
{ "id: { the rest of your json}}
往下會展示如何轉(zhuǎn)換成這種格式。
解析Apache日志文件
我們將Apache的日志文件讀入,構(gòu)建Spark RDD。然后我們寫一個parse()函數(shù)用正則表達(dá)式處理每條日志,提取我們需要的字
rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
p=re.compile(regex)
def parse(str):
s=p.match(str)
d = {}
d['ip']=s.group(1)
d['date']=s.group(4)
d['operation']=s.group(5)
d['uri']=s.group(6)
return d
換句話說,我們剛開始從日志文件讀入RDD的數(shù)據(jù)類似如下:
['83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"']
然后我們使用map函數(shù)轉(zhuǎn)換每條記錄:
rdd2 = rdd.map(parse)
rdd2.take(1)
[{'date': '17/May/2015:10:05:03 +0000', 'ip': '83.149.9.216', 'operation': 'GET', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}]
現(xiàn)在看起來像JSON,但并不是JSON字符串,我們需要使用json.dumps將dict對象轉(zhuǎn)換。
我們同時增加一個doc_id字段作為整個JSON的ID。在配置ES中我們增加如下配置“es.mapping.id”: “doc_id”告訴ES我們將這個字段作為ID。
這里我們使用SHA算法,將這個JSON字符串作為參數(shù),得到一個唯一ID。
計(jì)算結(jié)果類似如下,可以看到ID是一個很長的SHA數(shù)值。
rdd3.take(1)
[('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', '{"date": "17/May/2015:10:05:03 +0000", "ip": "83.149.9.216", "operation": "GET", "doc_id": "a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c", "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]
現(xiàn)在我們需要制定ES配置,比較重要的兩項(xiàng)是:
- “es.resource” : ‘walker/apache': "walker"是索引,apache是類型,兩者一般合稱索引
- “es.mapping.id”: “doc_id”: 告訴ES那個字段作為整個文檔的ID,也就是查詢結(jié)果中的_id
其他的配置自己去探索。
然后我們使用saveAsNewAPIHadoopFile()將RDD寫入到ES。這部分代碼對于所有的ES都是一樣的,比較固定,不需要理解每一個細(xì)節(jié)
es_write_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : 'walker/apache',
"es.input.json": "yes",
"es.mapping.id": "doc_id"
}
rdd3.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
rdd3 = rdd2.map(addID)
def addId(data):
j=json.dumps(data).encode('ascii', 'ignore')
data['doc_id'] = hashlib.sha224(j).hexdigest()
return (data['doc_id'], json.dumps(data))
最后我們可以使用curl進(jìn)行查詢
curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*
{
"_index" : "walker",
"_type" : "apache",
"_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
"_score" : 1.0,
"_source" : {
"date" : "17/May/2015:10:05:32 +0000",
"ip" : "91.177.205.119",
"operation" : "GET",
"doc_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",
"uri" : "/favicon.ico"
}
如下是所有代碼:
import json
import hashlib
import re
def addId(data):
j=json.dumps(data).encode('ascii', 'ignore')
data['doc_id'] = hashlib.sha224(j).hexdigest()
return (data['doc_id'], json.dumps(data))
def parse(str):
s=p.match(str)
d = {}
d['ip']=s.group(1)
d['date']=s.group(4)
d['operation']=s.group(5)
d['uri']=s.group(6)
return d
regex='^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
p=re.compile(regex)
rdd = sc.textFile("/home/ubuntu/walker/apache_logs")
rdd2 = rdd.map(parse)
rdd3 = rdd2.map(addID)
es_write_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : 'walker/apache',
"es.input.json": "yes",
"es.mapping.id": "doc_id"
}
rdd3.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
也可以這么封裝,其實(shí)原理是一樣的
import hashlib
import json
from pyspark import Sparkcontext
def make_md5(line):
md5_obj=hashlib.md5()
md5_obj.encode(line)
return md5_obj.hexdigest()
def parse(line):
dic={}
l = line.split('\t')
doc_id=make_md5(line)
dic['name']=l[1]
dic['age'] =l[2]
dic['doc_id']=doc_id
return dic #記得這邊返回的是字典類型的,在寫入es之前要記得dumps
def saveData2es(pdd, es_host, port,index, index_type, key):
"""
把saprk的運(yùn)行結(jié)果寫入es
:param pdd: 一個rdd類型的數(shù)據(jù)
:param es_host: 要寫es的ip
:param index: 要寫入數(shù)據(jù)的索引
:param index_type: 索引的類型
:param key: 指定文檔的id,就是要以文檔的那個字段作為_id
:return:
"""
#實(shí)例es客戶端記得單例模式
if es.exist.index(index):
es.index.create(index, 'spo')
es_write_conf = {
"es.nodes": es_host,
"es.port": port,
"es.resource": index/index_type,
"es.input.json": "yes",
"es.mapping.id": key
}
(pdd.map(lambda _dic: ('', json.dumps(_dic)))) #這百年是為把這個數(shù)據(jù)構(gòu)造成元組格式,如果傳進(jìn)來的_dic是字典則需要jdumps,如果傳進(jìn)來之前就已經(jīng)dumps,這便就不需要dumps了
.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf)
)
if __name__ == '__main__':
#實(shí)例化sp對象
sc=Sparkcontext()
#文件中的呢內(nèi)容一行一行用sc的讀取出來
json_text=sc.textFile('./1.txt')
#進(jìn)行轉(zhuǎn)換
json_data=json_text.map(lambda line:parse(line))
saveData2es(json_data,'127.0.01','9200','index_test','index_type','doc_id')
sc.stop()
看到了把,面那個例子在寫入es之前加了一個id,返回一個元組格式的,現(xiàn)在這個封裝指定_id就會比較靈活了
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
在python中實(shí)現(xiàn)同行輸入/接收多個數(shù)據(jù)的示例
今天小編就為大家分享一篇在python中實(shí)現(xiàn)同行輸入/接收多個數(shù)據(jù)的示例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-07-07
Python連接Oracle數(shù)據(jù)庫的操作指南
Oracle數(shù)據(jù)庫是一種強(qiáng)大的企業(yè)級關(guān)系數(shù)據(jù)庫管理系統(tǒng)(RDBMS),而Python是一門流行的編程語言,兩者的結(jié)合可以提供出色的數(shù)據(jù)管理和分析能力,本教程將詳細(xì)介紹如何在Python中連接Oracle數(shù)據(jù)庫,并演示常見的數(shù)據(jù)庫任務(wù),需要的朋友可以參考下2023-11-11
通過Jython調(diào)用Python腳本的實(shí)現(xiàn)方法
Jython 是 Python 的純 Java 實(shí)現(xiàn)。她無縫地結(jié)合了 Java 類與 Python,使用戶能以 Python 語言的語法編寫在 Java 虛擬機(jī)上運(yùn)行的 軟件,本文重點(diǎn)給大家介紹通過Jython調(diào)用Python腳本的實(shí)現(xiàn)方法,一起看看吧2021-06-06
Python數(shù)據(jù)分析之?Matplotlib?3D圖詳情
本文主要介紹了Python數(shù)據(jù)分析之Matplotlib 3D圖詳情,Matplotlib提供了mpl_toolkits.mplot3d工具包來進(jìn)行3D圖表的繪制,下文總結(jié)了更多相關(guān)資料,需要的小伙伴可以參考一下2022-05-05
Python 中PyQt5 點(diǎn)擊主窗口彈出另一個窗口的實(shí)現(xiàn)方法
這篇文章主要介紹了Python 中PyQt5 點(diǎn)擊主窗口彈出另一個窗口的實(shí)現(xiàn)方法,本文代碼實(shí)例圖文相結(jié)合的形式給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-07-07
python使用py2neo創(chuàng)建neo4j的節(jié)點(diǎn)和關(guān)系
這篇文章主要介紹了python使用py2neo創(chuàng)建neo4j的節(jié)點(diǎn)和關(guān)系,第一步使用py2neo連接neo4j的方法然后根據(jù)dict創(chuàng)建Node,更多相關(guān)資料需要的朋友參考下面文章內(nèi)容2022-02-02
python binascii 進(jìn)制轉(zhuǎn)換實(shí)例
今天小編就為大家分享一篇python binascii 進(jìn)制轉(zhuǎn)換實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-06-06

