如何使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作
前言:
模擬學(xué)生成績信息寫入es數(shù)據(jù)庫,包括姓名、性別、科目、成績。
示例代碼1:【一次性寫入10000*1000條數(shù)據(jù)】 【本人親測耗時(shí)5100秒】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import random
import time
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
subjects = ['語文', '數(shù)學(xué)', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
datas = []
start = time.time()
# 開始批量寫入es數(shù)據(jù)庫
# 批量寫入數(shù)據(jù)
for j in range(1000):
print(j)
action = [
{
"_index": "grade",
"_type": "doc",
"_id": i,
"_source": {
"id": i,
"name": random.choice(names),
"sex": random.choice(sexs),
"subject": random.choice(subjects),
"grade": random.choice(grades)
}
} for i in range(10000 * j, 10000 * j + 10000)
]
helpers.bulk(es, action)
end = time.time()
print('花費(fèi)時(shí)間:', end - start)elasticsearch-head中顯示:

示例代碼2:【一次性寫入10000*5000條數(shù)據(jù)】 【本人親測耗時(shí)23000秒】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import random
import time
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
subjects = ['語文', '數(shù)學(xué)', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
datas = []
start = time.time()
# 開始批量寫入es數(shù)據(jù)庫
# 批量寫入數(shù)據(jù)
for j in range(5000):
print(j)
action = [
{
"_index": "grade3",
"_type": "doc",
"_id": i,
"_source": {
"id": i,
"name": random.choice(names),
"sex": random.choice(sexs),
"subject": random.choice(subjects),
"grade": random.choice(grades)
}
} for i in range(10000 * j, 10000 * j + 10000)
]
helpers.bulk(es, action)
end = time.time()
print('花費(fèi)時(shí)間:', end - start)
示例代碼3:【一次性寫入10000*9205條數(shù)據(jù)】 【耗時(shí)過長】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import random
import time
es = Elasticsearch(hosts='http://127.0.0.1:9200')
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
subjects = ['語文', '數(shù)學(xué)', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
datas = []
start = time.time()
# 開始批量寫入es數(shù)據(jù)庫
# 批量寫入數(shù)據(jù)
for j in range(9205):
print(j)
action = [
{
"_index": "grade2",
"_type": "doc",
"_id": i,
"_source": {
"id": i,
"name": random.choice(names),
"sex": random.choice(sexs),
"subject": random.choice(subjects),
"grade": random.choice(grades)
}
} for i in range(10000*j, 10000*j+10000)
]
helpers.bulk(es, action)
end = time.time()
print('花費(fèi)時(shí)間:', end - start)
查詢數(shù)據(jù)并計(jì)算各種方式的成績總分。
示例代碼4:【一次性獲取所有的數(shù)據(jù),在程序中分別計(jì)算所耗的時(shí)間】
from elasticsearch import Elasticsearch
import time
def search_data(es, size=10):
query = {
"query": {
"match_all": {}
}
}
res = es.search(index='grade', body=query, size=size)
# print(res)
return res
if __name__ == '__main__':
start = time.time()
es = Elasticsearch(hosts='http://192.168.1.1:9200')
# print(es)
size = 10000
res = search_data(es, size)
# print(type(res))
# total = res['hits']['total']['value']
# print(total)
all_source = []
for i in range(size):
source = res['hits']['hits'][i]['_source']
all_source.append(source)
# print(source)
# 統(tǒng)計(jì)查詢出來的所有學(xué)生的所有課程的所有成績的總成績
start1 = time.time()
all_grade = 0
for data in all_source:
all_grade += int(data['grade'])
print('所有學(xué)生總成績之和:', all_grade)
end1 = time.time()
print("耗時(shí):", end1 - start1)
# 統(tǒng)計(jì)查詢出來的每個(gè)學(xué)生的所有課程的所有成績的總成績
start2 = time.time()
names1 = []
all_name_grade = {}
for data in all_source:
if data['name'] in names1:
all_name_grade[data['name']] += data['grade']
else:
names1.append(data['name'])
all_name_grade[data['name']] = data['grade']
print(all_name_grade)
end2 = time.time()
print("耗時(shí):", end2 - start2)
# 統(tǒng)計(jì)查詢出來的每個(gè)學(xué)生的每門課程的所有成績的總成績
start3 = time.time()
names2 = []
subjects = []
all_name_all_subject_grade = {}
for data in all_source:
if data['name'] in names2:
if all_name_all_subject_grade[data['name']].get(data['subject']):
all_name_all_subject_grade[data['name']][data['subject']] += data['grade']
else:
all_name_all_subject_grade[data['name']][data['subject']] = data['grade']
else:
names2.append(data['name'])
all_name_all_subject_grade[data['name']] = {}
all_name_all_subject_grade[data['name']][data['subject']] = data['grade']
print(all_name_all_subject_grade)
end3 = time.time()
print("耗時(shí):", end3 - start3)
end = time.time()
print('總耗時(shí):', end - start)運(yùn)行結(jié)果:

在示例代碼4中當(dāng)把size由10000改為 2000000時(shí),運(yùn)行效果如下所示:

在項(xiàng)目中一般不用上述代碼4中所統(tǒng)計(jì)成績的方法,面對(duì)大量的數(shù)據(jù)是比較耗時(shí)的,要使用es中的聚合查詢。計(jì)算數(shù)據(jù)中所有成績之和。
示例代碼5:【使用普通計(jì)算方法和聚類方法做對(duì)比驗(yàn)證】
from elasticsearch import Elasticsearch
import time
def search_data(es, size=10):
query = {
"query": {
"match_all": {}
}
}
res = es.search(index='grade', body=query, size=size)
# print(res)
return res
def search_data2(es, size=10):
query = {
"aggs": {
"all_grade": {
"terms": {
"field": "grade",
"size": 1000
}
}
}
}
res = es.search(index='grade', body=query, size=size)
# print(res)
return res
if __name__ == '__main__':
start = time.time()
es = Elasticsearch(hosts='http://127.0.0.1:9200')
size = 2000000
res = search_data(es, size)
all_source = []
for i in range(size):
source = res['hits']['hits'][i]['_source']
all_source.append(source)
# print(source)
# 統(tǒng)計(jì)查詢出來的所有學(xué)生的所有課程的所有成績的總成績
start1 = time.time()
all_grade = 0
for data in all_source:
all_grade += int(data['grade'])
print('200萬數(shù)據(jù)所有學(xué)生總成績之和:', all_grade)
end1 = time.time()
print("耗時(shí):", end1 - start1)
end = time.time()
print('200萬數(shù)據(jù)總耗時(shí):', end - start)
# 聚合操作
start_aggs = time.time()
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# size = 2000000
size = 0
res = search_data2(es, size)
# print(res)
aggs = res['aggregations']['all_grade']['buckets']
print(aggs)
sum = 0
for agg in aggs:
sum += (agg['key'] * agg['doc_count'])
print('1000萬數(shù)據(jù)總成績之和:', sum)
end_aggs = time.time()
print('1000萬數(shù)據(jù)總耗時(shí):', end_aggs - start_aggs)運(yùn)行結(jié)果:

計(jì)算數(shù)據(jù)中每個(gè)同學(xué)的各科總成績之和。
示例代碼6: 【子聚合】【先分組,再計(jì)算】
from elasticsearch import Elasticsearch
import time
def search_data(es, size=10):
query = {
"query": {
"match_all": {}
}
}
res = es.search(index='grade', body=query, size=size)
# print(res)
return res
def search_data2(es):
query = {
"size": 0,
"aggs": {
"all_names": {
"terms": {
"field": "name.keyword",
"size": 10
},
"aggs": {
"total_grade": {
"sum": {
"field": "grade"
}
}
}
}
}
}
res = es.search(index='grade', body=query)
# print(res)
return res
if __name__ == '__main__':
start = time.time()
es = Elasticsearch(hosts='http://127.0.0.1:9200')
size = 2000000
res = search_data(es, size)
all_source = []
for i in range(size):
source = res['hits']['hits'][i]['_source']
all_source.append(source)
# print(source)
# 統(tǒng)計(jì)查詢出來的每個(gè)學(xué)生的所有課程的所有成績的總成績
start2 = time.time()
names1 = []
all_name_grade = {}
for data in all_source:
if data['name'] in names1:
all_name_grade[data['name']] += data['grade']
else:
names1.append(data['name'])
all_name_grade[data['name']] = data['grade']
print(all_name_grade)
end2 = time.time()
print("200萬數(shù)據(jù)耗時(shí):", end2 - start2)
end = time.time()
print('200萬數(shù)據(jù)總耗時(shí):', end - start)
# 聚合操作
start_aggs = time.time()
es = Elasticsearch(hosts='http://127.0.0.1:9200')
res = search_data2(es)
# print(res)
aggs = res['aggregations']['all_names']['buckets']
# print(aggs)
dic = {}
for agg in aggs:
dic[agg['key']] = agg['total_grade']['value']
print('1000萬數(shù)據(jù):', dic)
end_aggs = time.time()
print('1000萬數(shù)據(jù)總耗時(shí):', end_aggs - start_aggs)運(yùn)行結(jié)果:

計(jì)算數(shù)據(jù)中每個(gè)同學(xué)的每科成績之和。
示例代碼7:
from elasticsearch import Elasticsearch
import time
def search_data(es, size=10):
query = {
"query": {
"match_all": {}
}
}
res = es.search(index='grade', body=query, size=size)
# print(res)
return res
def search_data2(es):
query = {
"size": 0,
"aggs": {
"all_names": {
"terms": {
"field": "name.keyword",
"size": 10
},
"aggs": {
"all_subjects": {
"terms": {
"field": "subject.keyword",
"size": 5
},
"aggs": {
"total_grade": {
"sum": {
"field": "grade"
}
}
}
}
}
}
}
}
res = es.search(index='grade', body=query)
# print(res)
return res
if __name__ == '__main__':
start = time.time()
es = Elasticsearch(hosts='http://127.0.0.1:9200')
size = 2000000
res = search_data(es, size)
all_source = []
for i in range(size):
source = res['hits']['hits'][i]['_source']
all_source.append(source)
# print(source)
# 統(tǒng)計(jì)查詢出來的每個(gè)學(xué)生的每門課程的所有成績的總成績
start3 = time.time()
names2 = []
subjects = []
all_name_all_subject_grade = {}
for data in all_source:
if data['name'] in names2:
if all_name_all_subject_grade[data['name']].get(data['subject']):
all_name_all_subject_grade[data['name']][data['subject']] += data['grade']
else:
all_name_all_subject_grade[data['name']][data['subject']] = data['grade']
else:
names2.append(data['name'])
all_name_all_subject_grade[data['name']] = {}
all_name_all_subject_grade[data['name']][data['subject']] = data['grade']
print('200萬數(shù)據(jù):', all_name_all_subject_grade)
end3 = time.time()
print("耗時(shí):", end3 - start3)
end = time.time()
print('200萬數(shù)據(jù)總耗時(shí):', end - start)
# 聚合操作
start_aggs = time.time()
es = Elasticsearch(hosts='http://127.0.0.1:9200')
res = search_data2(es)
# print(res)
aggs = res['aggregations']['all_names']['buckets']
# print(aggs)
dic = {}
for agg in aggs:
dic[agg['key']] = {}
for sub in agg['all_subjects']['buckets']:
dic[agg['key']][sub['key']] = sub['total_grade']['value']
print('1000萬數(shù)據(jù):', dic)
end_aggs = time.time()
print('1000萬數(shù)據(jù)總耗時(shí):', end_aggs - start_aggs)運(yùn)行結(jié)果:

在上面查詢計(jì)算示例代碼中,當(dāng)使用含有1000萬數(shù)據(jù)的索引grade時(shí),普通方法查詢計(jì)算是比較耗時(shí)的,使用聚合查詢能夠大大節(jié)約大量時(shí)間。當(dāng)面對(duì)9205萬數(shù)據(jù)的索引grade2時(shí),這時(shí)使用普通計(jì)算方法所消耗的時(shí)間太大了,在線上開發(fā)環(huán)境中是不可用的,所以必須使用聚合方法來計(jì)算。
示例代碼8:
from elasticsearch import Elasticsearch
import time
def search_data(es):
query = {
"size": 0,
"aggs": {
"all_names": {
"terms": {
"field": "name.keyword",
"size": 10
},
"aggs": {
"all_subjects": {
"terms": {
"field": "subject.keyword",
"size": 5
},
"aggs": {
"total_grade": {
"sum": {
"field": "grade"
}
}
}
}
}
}
}
}
res = es.search(index='grade2', body=query)
# print(res)
return res
if __name__ == '__main__':
# 聚合操作
start_aggs = time.time()
es = Elasticsearch(hosts='http://127.0.0.1:9200')
res = search_data(es)
# print(res)
aggs = res['aggregations']['all_names']['buckets']
# print(aggs)
dic = {}
for agg in aggs:
dic[agg['key']] = {}
for sub in agg['all_subjects']['buckets']:
dic[agg['key']][sub['key']] = sub['total_grade']['value']
print('9205萬數(shù)據(jù):', dic)
end_aggs = time.time()
print('9205萬數(shù)據(jù)總耗時(shí):', end_aggs - start_aggs)運(yùn)行結(jié)果:

注意:寫查詢語句時(shí)建議使用kibana去寫,然后復(fù)制查詢語句到代碼中,kibana會(huì)提示查詢語句。
到此這篇關(guān)于如何使用python生成大量數(shù)據(jù)寫入es數(shù)據(jù)庫并查詢操作的文章就介紹到這了,更多相關(guān)python es 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
利用Python批量導(dǎo)出mysql數(shù)據(jù)庫表結(jié)構(gòu)的操作實(shí)例
這篇文章主要給大家介紹了關(guān)于利用Python批量導(dǎo)出mysql數(shù)據(jù)庫表結(jié)構(gòu)的相關(guān)資料,需要的朋友可以參考下2022-08-08
python計(jì)算程序開始到程序結(jié)束的運(yùn)行時(shí)間和程序運(yùn)行的CPU時(shí)間
這篇文章主要介紹了python計(jì)算程序開始到程序結(jié)束的運(yùn)行時(shí)間和程序運(yùn)行的CPU時(shí)間的三個(gè)方法,大家參考使用2013-11-11
Jacobi迭代算法的Python實(shí)現(xiàn)詳解
這篇文章主要介紹了Jacobi迭代算法的Python實(shí)現(xiàn)詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-06-06
Python使用PEfile模塊實(shí)現(xiàn)分析PE文件
PeFile模塊是Python中一個(gè)強(qiáng)大的便攜式第三方PE格式分析工具,用于解析和處理Windows可執(zhí)行文件,本文主要就來講講如何使用PEfile模塊實(shí)現(xiàn)分析PE文件,需要的可以參考下2023-08-08
Python常用base64 md5 aes des crc32加密解密方法匯總
這篇文章主要介紹了Python常用base64 md5 aes des crc32加密解密方法匯總,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11
Python協(xié)程的四種實(shí)現(xiàn)方式總結(jié)
今天繼續(xù)給大家介紹Python關(guān)知識(shí),本文主要內(nèi)容是Python協(xié)程的四種實(shí)現(xiàn)方式。文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-01-01
python實(shí)現(xiàn)plt x軸坐標(biāo)按1刻度顯示
這篇文章主要介紹了python實(shí)現(xiàn)plt x軸坐標(biāo)按1刻度顯示,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07

