使用python生成大量數據寫入es數據庫并查詢操作(2)
前言:
上一篇文章:如何使用python生成大量數據寫入es數據庫并查詢操作
模擬學生個人信息寫入es數據庫,包括姓名、性別、年齡、特點、科目、成績,創建時間。
方案一
在寫入數據時未提前創建索引mapping,而是直接寫入帶有索引名的數據,由es根據寫入的數據自動推斷並生成mapping(動態映射)。
示例代碼:【多線程寫入數據】【一次性寫入10000*1000條數據】 【本人親測耗時3266秒】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
# Elasticsearch client for the cluster at 127.0.0.1:9200; save_to_es uses it
# from every worker thread started in the __main__ section.
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
# Value pools that every generated document draws from with random.choice().
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
# The scraped copy of these strings had pinyin glosses (e.g. "應(yīng)") spliced
# into them; those would have been written verbatim into every document.
character = ['自信但不自負,不以自我為中心',
             '努力、積極、樂觀、拼搏是我的人生信條',
             '抗壓能力強,能夠快速適應周圍環境',
             '敢做敢拼,腳踏實地;做事認真負責,責任心強',
             '愛好所學專業,樂于學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
             '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
             '忠實誠信,講原則,說到做到,決不推卸責任',
             '有自制力,做事情始終堅持有始有終,從不半途而廢',
             '肯學習,有問題不逃避,愿意虛心向他人學習',
             '愿意以謙虛態度贊揚接納優越者,權威者',
             '會用100%的熱情和精力投入到工作中;平易近人',
             '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
             '有較強的團隊精神,工作積極進取,態度認真']
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at import time, so every document written by this run
# carries the same create_time value.
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def save_to_es(num):
    """
    Bulk-write one batch of 10000 randomly generated student documents.

    Batch ``num`` covers ids ``10000 * num`` up to ``10000 * num + 9999``;
    each id is used both as the document ``_id`` and as the ``id`` field,
    and every other field is drawn at random from the module-level pools.
    Prints the batch number and how long the batch took.

    :param num: batch number (0-based)
    :return: None
    """
    start = time.time()
    # "_type" is intentionally omitted: mapping types are deprecated in
    # Elasticsearch 7.x (documents default to "_doc") and removed in 8.x.
    action = [
        {
            "_index": "personal_info_10000000",
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time,
            },
        }
        for i in range(10000 * num, 10000 * num + 10000)
    ]
    # No mapping is created beforehand for this index, so Elasticsearch
    # derives one dynamically from the documents it receives.
    helpers.bulk(es, action)
    end = time.time()
    print(f"{num}耗時{end - start}s!")
def run():
    """
    Worker loop: take batch numbers off the shared ``queue`` until it is empty.

    Each number is printed and handed to ``save_to_es``. ``get_nowait()`` is
    used instead of checking ``qsize()`` first: with several workers, another
    thread can take the last item between the size check and ``get()``, and
    the blocking ``get()`` would then wait forever, so ``join()`` never returns.
    """
    from queue import Empty

    while True:
        try:
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        save_to_es(num)
if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # Enqueue the batch numbers; save_to_es writes 10000 documents per batch
    for num in range(1000):
        queue.put(num)
    # Drain the queue with 10 worker threads
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    # Wait for every worker before reporting the total elapsed time
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序執行完畢!花費時間:', end - start)
# Run result (運行結果):



自動創建的索引mapping(可以看到字符串字段都被映射成text加keyword子字段,create_time也被當作text而不是date):
GET personal_info_10000000/_mapping
{
"personal_info_10000000" : {
"mappings" : {
"properties" : {
"age" : {
"type" : "long"
},
"character" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"create_time" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"grade" : {
"type" : "long"
},
"id" : {
"type" : "long"
},
"name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"sex" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"subject" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}
方案二
1.順序插入5000000條數據
先創建索引personal_info_5000000,確定好mapping后,再插入數據。
新建索引并設置mapping信息:
PUT personal_info_5000000
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 32
}
}
},
"sex": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 8
}
}
},
"age": {
"type": "long"
},
"character": {
"type": "text",
"analyzer": "ik_smart",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"subject": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"grade": {
"type": "long"
},
"create_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}
查看新建索引信息:
GET personal_info_5000000
{
"personal_info_5000000" : {
"aliases" : { },
"mappings" : {
"properties" : {
"age" : {
"type" : "long"
},
"character" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
},
"analyzer" : "ik_smart"
},
"create_time" : {
"type" : "date",
"format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"grade" : {
"type" : "long"
},
"id" : {
"type" : "long"
},
"name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 32
}
}
},
"sex" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 8
}
}
},
"subject" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"settings" : {
"index" : {
"routing" : {
"allocation" : {
"include" : {
"_tier_preference" : "data_content"
}
}
},
"number_of_shards" : "3",
"provided_name" : "personal_info_50000000",
"creation_date" : "1663471072176",
"number_of_replicas" : "1",
"uuid" : "5DfmfUhUTJeGk1k4XnN-lQ",
"version" : {
"created" : "7170699"
}
}
}
}
}
開始插入數據:
示例代碼: 【多線程逐條寫入數據】【一次性寫入10000*500條數據】 【本人親測耗時7916秒】
from elasticsearch import Elasticsearch
from datetime import datetime
from queue import Queue
import random
import time
import threading
# Elasticsearch client for the cluster at 127.0.0.1:9200; save_to_es uses it
# from every worker thread started in the __main__ section.
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
# Value pools that every generated document draws from with random.choice().
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
# The scraped copy of these strings had pinyin glosses (e.g. "應(yīng)") spliced
# into them; those would have been written verbatim into every document.
character = ['自信但不自負,不以自我為中心',
             '努力、積極、樂觀、拼搏是我的人生信條',
             '抗壓能力強,能夠快速適應周圍環境',
             '敢做敢拼,腳踏實地;做事認真負責,責任心強',
             '愛好所學專業,樂于學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
             '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
             '忠實誠信,講原則,說到做到,決不推卸責任',
             '有自制力,做事情始終堅持有始有終,從不半途而廢',
             '肯學習,有問題不逃避,愿意虛心向他人學習',
             '愿意以謙虛態度贊揚接納優越者,權威者',
             '會用100%的熱情和精力投入到工作中;平易近人',
             '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
             '有較強的團隊精神,工作積極進取,態度認真']
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at import time, so every document written by this run shares
# one timestamp; the format matches the "yyyy-MM-dd HH:mm:ss" date format
# declared for create_time in the index mapping.
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Decorator that reports how long each call of the wrapped function takes
def timer(func):
    """
    Wrap ``func`` so that every call prints its arguments and elapsed seconds.

    The message has the form ``id<args>共耗時約 <seconds> 秒``. Positional
    arguments are joined with ", " and keyword arguments are rendered as
    ``name=value``. The previous ``'...'.format(*args, elapsed)`` only worked
    with exactly one positional argument: with none it raised IndexError, and
    with several it formatted the wrong values.

    :param func: the function to time
    :return: a wrapper that returns whatever ``func`` returns
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        parts = [str(arg) for arg in args]
        parts += [f'{key}={value}' for key, value in kwargs.items()]
        print('id{}共耗時約 {:.2f} 秒'.format(', '.join(parts), end - start))
        return res
    return wrapper
@timer
def save_to_es(num):
    """
    Index a single randomly generated student document with id ``num``.

    Sends one ``index`` request per call (no bulk batching); the ``@timer``
    decorator prints how long each call took.

    :param num: document id, used both as ``_id`` and as the ``id`` field
    :return: None
    """
    body = {
        "id": num,
        "name": random.choice(names),
        "sex": random.choice(sexs),
        "age": random.choice(age),
        "character": random.choice(character),
        "subject": random.choice(subjects),
        "grade": random.choice(grades),
        "create_time": create_time,
    }
    # If the index does not exist yet, Elasticsearch creates it on this call.
    # doc_type is omitted: mapping types are deprecated in Elasticsearch 7.x
    # (the default is "_doc") and the parameter is gone from the 8.x client.
    es.index(index="personal_info_5000000", id=num, document=body)
def run():
    """
    Worker loop: take document ids off the shared ``queue`` until it is empty.

    Each id is printed and handed to ``save_to_es``. ``get_nowait()`` is used
    instead of checking ``qsize()`` first: with several workers, another
    thread can take the last item between the size check and ``get()``, and
    the blocking ``get()`` would then wait forever, so ``join()`` never returns.
    """
    from queue import Empty

    while True:
        try:
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        save_to_es(num)
if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # Enqueue every document id; each one becomes a single index request
    for num in range(5000000):
        queue.put(num)
    # Drain the queue with 10 worker threads
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    # Wait for every worker before reporting the total elapsed time
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序執行完畢!花費時間:', end - start)
# Run result (運行結果):

2.批量插入5000000條數據
先創建索引personal_info_5000000_v2,確定好mapping后,再插入數據。
新建索引并設置mapping信息:
PUT personal_info_5000000_v2
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 32
}
}
},
"sex": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 8
}
}
},
"age": {
"type": "long"
},
"character": {
"type": "text",
"analyzer": "ik_smart",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"subject": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"grade": {
"type": "long"
},
"create_time": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}
查看新建索引信息:
GET personal_info_5000000_v2
{
"personal_info_5000000_v2" : {
"aliases" : { },
"mappings" : {
"properties" : {
"age" : {
"type" : "long"
},
"character" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
},
"analyzer" : "ik_smart"
},
"create_time" : {
"type" : "date",
"format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"grade" : {
"type" : "long"
},
"id" : {
"type" : "long"
},
"name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 32
}
}
},
"sex" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 8
}
}
},
"subject" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"settings" : {
"index" : {
"routing" : {
"allocation" : {
"include" : {
"_tier_preference" : "data_content"
}
}
},
"number_of_shards" : "3",
"provided_name" : "personal_info_5000000_v2",
"creation_date" : "1663485323617",
"number_of_replicas" : "1",
"uuid" : "XBPaDn_gREmAoJmdRyBMAA",
"version" : {
"created" : "7170699"
}
}
}
}
}
批量插入數據:
通過elasticsearch模塊導入helpers,通過helpers.bulk來批量處理大量的數據。首先將所有的數據定義成字典形式,各字段含義如下:
- _index對應索引名稱。若該索引不存在,es會按動態映射自動創建(方案一就是這種情況),因此應提前建好索引和mapping。
- _type對應類型名稱。es 7.x中mapping類型已被棄用(默認為_doc),8.x中已移除,可以省略該字段。
- _id對應文檔的id。
- _source對應的字典內為每一篇文檔的字段和值,可以有多個字段。
示例代碼: 【程序中途異常,寫入4714000條數據】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
# Elasticsearch client for the cluster at 127.0.0.1:9200; save_to_es uses it
# from every worker thread started in the __main__ section.
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
# Value pools that every generated document draws from with random.choice().
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
# The scraped copy of these strings had pinyin glosses (e.g. "應(yīng)") spliced
# into them; those would have been written verbatim into every document.
character = ['自信但不自負,不以自我為中心',
             '努力、積極、樂觀、拼搏是我的人生信條',
             '抗壓能力強,能夠快速適應周圍環境',
             '敢做敢拼,腳踏實地;做事認真負責,責任心強',
             '愛好所學專業,樂于學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
             '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
             '忠實誠信,講原則,說到做到,決不推卸責任',
             '有自制力,做事情始終堅持有始有終,從不半途而廢',
             '肯學習,有問題不逃避,愿意虛心向他人學習',
             '愿意以謙虛態度贊揚接納優越者,權威者',
             '會用100%的熱情和精力投入到工作中;平易近人',
             '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
             '有較強的團隊精神,工作積極進取,態度認真']
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at import time, so every document written by this run shares
# one timestamp; the format matches the "yyyy-MM-dd HH:mm:ss" date format
# declared for create_time in the index mapping.
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Decorator that reports how long each call of the wrapped function takes
def timer(func):
    """
    Wrap ``func`` so that every call prints its arguments and elapsed seconds.

    The message has the form ``id<args>共耗時約 <seconds> 秒``. Positional
    arguments are joined with ", " and keyword arguments are rendered as
    ``name=value``. The previous ``'...'.format(*args, elapsed)`` only worked
    with exactly one positional argument: with none it raised IndexError, and
    with several it formatted the wrong values.

    :param func: the function to time
    :return: a wrapper that returns whatever ``func`` returns
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        parts = [str(arg) for arg in args]
        parts += [f'{key}={value}' for key, value in kwargs.items()]
        print('id{}共耗時約 {:.2f} 秒'.format(', '.join(parts), end - start))
        return res
    return wrapper
@timer
def save_to_es(num):
    """
    Bulk-write one batch of 10000 random student documents to
    ``personal_info_5000000_v2``.

    Batch ``num`` covers ids ``10000 * num`` up to ``10000 * num + 9999``;
    each id is used both as the document ``_id`` and as the ``id`` field.
    The ``@timer`` decorator prints how long the batch took.

    :param num: batch number (0-based)
    :return: None
    """
    # "_type" is intentionally omitted: mapping types are deprecated in
    # Elasticsearch 7.x (documents default to "_doc") and removed in 8.x.
    action = [
        {
            "_index": "personal_info_5000000_v2",
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time,
            },
        }
        for i in range(10000 * num, 10000 * num + 10000)
    ]
    helpers.bulk(es, action)
def run():
    """
    Worker loop: take batch numbers off the shared ``queue`` until it is empty.

    Each number is printed and handed to ``save_to_es``. ``get_nowait()`` is
    used instead of checking ``qsize()`` first: with several workers, another
    thread can take the last item between the size check and ``get()``, and
    the blocking ``get()`` would then wait forever, so ``join()`` never returns.
    """
    from queue import Empty

    while True:
        try:
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        save_to_es(num)
if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # Enqueue the batch numbers; save_to_es writes 10000 documents per batch
    for num in range(500):
        queue.put(num)
    # Drain the queue with 10 worker threads
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    # Wait for every worker before reporting the total elapsed time
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序執行完畢!花費時間:', end - start)
# Run result (運行結果):


3.批量插入5000000條數據
先創建索引personal_info_5000000_v3,確定好mapping后,再插入數據。
此過程是在上面批量插入的前提下進行優化,采用python生成器(把action由列表推導式改成生成器表達式)。
建立索引和mapping同上(索引名改為personal_info_5000000_v3),直接上代碼:
示例代碼: 【程序中途異常,寫入3688000條數據】
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
# Elasticsearch client for the cluster at 127.0.0.1:9200; save_to_es uses it
# from every worker thread started in the __main__ section.
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
# Value pools that every generated document draws from with random.choice().
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
# The scraped copy of these strings had pinyin glosses (e.g. "應(yīng)") spliced
# into them; those would have been written verbatim into every document.
character = ['自信但不自負,不以自我為中心',
             '努力、積極、樂觀、拼搏是我的人生信條',
             '抗壓能力強,能夠快速適應周圍環境',
             '敢做敢拼,腳踏實地;做事認真負責,責任心強',
             '愛好所學專業,樂于學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
             '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
             '忠實誠信,講原則,說到做到,決不推卸責任',
             '有自制力,做事情始終堅持有始有終,從不半途而廢',
             '肯學習,有問題不逃避,愿意虛心向他人學習',
             '愿意以謙虛態度贊揚接納優越者,權威者',
             '會用100%的熱情和精力投入到工作中;平易近人',
             '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
             '有較強的團隊精神,工作積極進取,態度認真']
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at import time, so every document written by this run shares
# one timestamp, formatted as "yyyy-MM-dd HH:mm:ss".
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Decorator that reports how long each call of the wrapped function takes
def timer(func):
    """
    Wrap ``func`` so that every call prints its arguments and elapsed seconds.

    The message has the form ``id<args>共耗時約 <seconds> 秒``. Positional
    arguments are joined with ", " and keyword arguments are rendered as
    ``name=value``. The previous ``'...'.format(*args, elapsed)`` only worked
    with exactly one positional argument: with none it raised IndexError, and
    with several it formatted the wrong values.

    :param func: the function to time
    :return: a wrapper that returns whatever ``func`` returns
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        parts = [str(arg) for arg in args]
        parts += [f'{key}={value}' for key, value in kwargs.items()]
        print('id{}共耗時約 {:.2f} 秒'.format(', '.join(parts), end - start))
        return res
    return wrapper
@timer
def save_to_es(num):
    """
    Bulk-write one batch of 10000 random student documents to
    ``personal_info_5000000_v3``, producing the actions lazily.

    Batch ``num`` covers ids ``10000 * num`` up to ``10000 * num + 9999``;
    each id is used both as the document ``_id`` and as the ``id`` field.
    The ``@timer`` decorator prints how long the batch took.

    Create ``personal_info_5000000_v3`` with the explicit mapping beforehand;
    otherwise Elasticsearch auto-creates it with a dynamic mapping, and the
    timing comparison against the ``_v2`` run is no longer like-for-like.

    :param num: batch number (0-based)
    :return: None
    """
    # Generator expression: the action dicts are produced on demand while
    # helpers.bulk iterates, instead of being built up front as a list.
    # "_type" is omitted because mapping types are deprecated in
    # Elasticsearch 7.x (documents default to "_doc") and removed in 8.x.
    action = (
        {
            "_index": "personal_info_5000000_v3",
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time,
            },
        }
        for i in range(10000 * num, 10000 * num + 10000)
    )
    helpers.bulk(es, action)
def run():
    """
    Worker loop: take batch numbers off the shared ``queue`` until it is empty.

    Each number is printed and handed to ``save_to_es``. ``get_nowait()`` is
    used instead of checking ``qsize()`` first: with several workers, another
    thread can take the last item between the size check and ``get()``, and
    the blocking ``get()`` would then wait forever, so ``join()`` never returns.
    """
    from queue import Empty

    while True:
        try:
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        save_to_es(num)
if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # Enqueue the batch numbers; save_to_es writes 10000 documents per batch
    for num in range(500):
        queue.put(num)
    # Drain the queue with 10 worker threads
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    # Wait for every worker before reporting the total elapsed time
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程序執行完畢!花費時間:', end - start)
# Run result (運行結果):


到此這篇關于使用python生成大量數據寫入es數據庫并查詢操作(2)的文章就介紹到這了,更多相關python生成數據內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持腳本之家!
相關(guān)文章
python網絡應用開發知識點淺析
在本篇內容中小編給學習python的朋友們整理了關于網絡應用開發的相關知識點以及實例內容,需要的朋友們參考下。2019-05-05
Python多進程庫multiprocessing中進程池Pool類的使用詳解
這篇文章主要介紹了Python多進程庫multiprocessing中進程池Pool類的使用詳解,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-11-11
Python實現按照指定要求逆序輸出一個數字的方法
這篇文章主要介紹了Python實現按照指定要求逆序輸出一個數字的方法,涉及Python針對字符串的遍歷、判斷、輸出等相關操作技巧,需要的朋友可以參考下2018-04-04
Python中聲明只包含一個元素的元組數據方法
這篇文章主要介紹了Python中聲明只包含一個元素的元組數據方法,本文是實際經驗總結而來,沒有碰到這個需要可能不會注意到這個問題,需要的朋友可以參考下2014-08-08

