Python實現(xiàn)數(shù)據(jù)庫并行讀取和寫入實例
這篇主要記錄一下如何實現(xiàn)對數(shù)據(jù)庫的并行運(yùn)算來節(jié)省代碼運(yùn)行時間。語言是Python,其他語言思路一樣。
前言
一共23w條數(shù)據(jù),是之前通過自然語言分析處理過的數(shù)據(jù),附一張截圖:

要實現(xiàn)對news主體的讀取,并且找到其中含有的股票名稱,只要發(fā)現(xiàn),就將這支股票和對應(yīng)的日期、score寫入數(shù)據(jù)庫。
顯然,幾十萬條數(shù)據(jù)要是一條條讀寫,然后在本機(jī)上操作,耗時太久,可行性極低。所以,如何有效并行的讀取內(nèi)容,并且進(jìn)行操作,最后再寫入數(shù)據(jù)庫呢?
并行讀取和寫入
并行讀?。簞?chuàng)建N*max_process個進(jìn)程,對數(shù)據(jù)庫進(jìn)行讀取。讀取的時候應(yīng)該注意:
- 每個進(jìn)程需要分配不同的connection和對應(yīng)的cursor,否則數(shù)據(jù)庫會報錯。
- 數(shù)據(jù)庫必須能承受相應(yīng)的高并發(fā)訪問(可以手動更改)
實現(xiàn)的時候,如果不在進(jìn)程里面創(chuàng)建新的connection,就會發(fā)生沖突,每個進(jìn)程拿到權(quán)限后,會被下個進(jìn)程釋放,所以匯報出來NoneType Error的錯誤。
- 并行寫入:在對數(shù)據(jù)庫進(jìn)行更改的時候,不可以多進(jìn)程更改。所以,我們需要根據(jù)已有的表,創(chuàng)建max_process-1個同樣結(jié)構(gòu)的表用來寫入。表的命名規(guī)則可以直接在原來基礎(chǔ)上加上1,2,3...數(shù)字可以通過對max_process取余得到。
此時,對應(yīng)進(jìn)程里面先后出現(xiàn)讀入的conn(保存消息后關(guān)閉)和寫入的conn。每個進(jìn)程對應(yīng)的表的index就是 主循環(huán)中的num對max_process取余(100->4,101->5),這樣每個進(jìn)程只對一個表進(jìn)行操作了。
部分代碼實現(xiàn)
max_process = 16 #最大進(jìn)程數(shù)
def read_SQL_write(r_host,r_port,r_user,r_passwd,r_db,r_charset,w_host,w_port,w_user,w_passwd,w_db,w_charset,cmd,index=None):
#得到tem字典保存著信息
try:
conn = pymysql.Connect(host=r_host, port=r_port, user=r_user, passwd =r_passwd, db =r_db, charset =r_charset)
cursor = conn.cursor()
cursor.execute(cmd)
except Exception as e:
error = "[-][-]%d fail to connect SQL for reading" % index
log_error('error.log',error)
return
else:
tem = cursor.fetchone()
print('[+][+]%d succeed to connect SQL for reading' % index)
finally:
cursor.close()
conn.close()
try:
conn = pymysql.Connect(host=w_host, port=w_port, user=w_user, passwd =w_passwd, db =w_db, charset =w_charset)
cursor = conn.cursor()
cursor.execute(cmd)
except Exception as e:
error = "[-][-]%d fail to connect SQL for writing" % index
log_error('error.log',error)
return
else:
print('[+][+]%d succeed to connect SQL for writing' % index)
r_dict = dict()
r_dict['id'] = tem[0]
r_dict['content_id'] = tem[1]
r_dict['pub_date'] = tem[2]
r_dict['title'] = cht_to_chs(tem[3])
r_dict['title_score'] =tem[4]
r_dict['news_content'] = cht_to_chs(tem[5])
r_dict['content_score'] = tem[6]
for key in stock_dict.keys():
#能找到對應(yīng)的股票
if stock_dict[key][1] and ( r_dict['title'].find(stock_dict[key][1])!=-1 or r_dict['news_content'].find(stock_dict[key][1])!=-1 ):
w_dict=dict()
w_dict['code'] = key
w_dict['english_name'] = stock_dict[key][0]
w_dict['cn_name'] = stock_dict[key][1]
#得到分?jǐn)?shù)
if r_dict['title_score']:
w_dict['score']=r_dict['title_score']
else:
w_dict['score']=r_dict['content_score']
#開始寫入
try:
global max_process
cmd = "INSERT INTO dyx_stock_score%d VALUES ('%s', '%s' , %d , '%s' , '%s' , %.2f );" % \
(index%max_process ,r_dict['content_id'] ,r_dict['pub_date'] ,w_dict['code'] ,w_dict['english_name'] ,w_dict['cn_name'] ,w_dict['score'])
cursor.execute(cmd)
conn.commit()
except Exception as e:
error = " [-]%d fail to write to SQL" % index
cursor.rollback()
log_error('error.log',error)
else:
print(" [+]%d succeed to write to SQL" % index)
cursor.close()
conn.close()
def main():
num = 238143#數(shù)據(jù)庫查詢拿到的總數(shù)
p = None
for index in range(1,num+1):
if index%max_process==1:
if p:
p.close()
p.join()
p = multiprocessing.Pool(max_process)
r_cmd = ('select id,content_id,pub_date,title,title_score,news_content,content_score from dyx_emotion_analysis where id = %d;' % (index))
p.apply_async(func = read_SQL_write,args=(r_host,r_port,r_user,r_passwd,r_db,r_charset,w_host,w_port,w_user,w_passwd,w_db,w_charset,r_cmd,index,))
if p:
p.close()
p.join()
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- python實現(xiàn)數(shù)據(jù)寫入excel表格
- Python寫入數(shù)據(jù)到MP3文件中的方法
- Python基于csv模塊實現(xiàn)讀取與寫入csv數(shù)據(jù)的方法
- python讀取excel指定列數(shù)據(jù)并寫入到新的excel方法
- Python3實現(xiàn)將本地JSON大數(shù)據(jù)文件寫入MySQL數(shù)據(jù)庫的方法
- Python實現(xiàn)自定義順序、排列寫入數(shù)據(jù)到Excel的方法
- Python實現(xiàn)將數(shù)據(jù)框數(shù)據(jù)寫入mongodb及mysql數(shù)據(jù)庫的方法
- Python實現(xiàn)讀寫sqlite3數(shù)據(jù)庫并將統(tǒng)計數(shù)據(jù)寫入Excel的方法示例
- 利用python對Excel中的特定數(shù)據(jù)提取并寫入新表的方法
- Python把csv數(shù)據(jù)寫入list和字典類型的變量腳本方法
- Python實現(xiàn)將數(shù)據(jù)寫入netCDF4中的方法示例
相關(guān)文章
安裝python依賴包psycopg2來調(diào)用postgresql的操作
這篇文章主要介紹了安裝python依賴包psycopg2來調(diào)用postgresql的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01
Python利用openpyxl類實現(xiàn)在Excel中繪制樂高圖案
在商場看到一個超級瑪麗的樂高圖,感覺使用excel的顏色填充也能畫出來。所以本文將借助openpyxl類實現(xiàn)在Excel中繪制樂高圖案,需要的可以參考一下2022-12-12
Python3和pyqt5實現(xiàn)控件數(shù)據(jù)動態(tài)顯示方式
今天小編就為大家分享一篇Python3和pyqt5實現(xiàn)控件數(shù)據(jù)動態(tài)顯示方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-12-12
Python實現(xiàn)的對一個數(shù)進(jìn)行因式分解操作示例
這篇文章主要介紹了Python實現(xiàn)的對一個數(shù)進(jìn)行因式分解操作,結(jié)合實例形式分析了Python因式分解數(shù)值運(yùn)算相關(guān)操作技巧,需要的朋友可以參考下2019-06-06
Python數(shù)據(jù)結(jié)構(gòu)與算法之圖的最短路徑(Dijkstra算法)完整實例
這篇文章主要介紹了Python數(shù)據(jù)結(jié)構(gòu)與算法之圖的最短路徑(Dijkstra算法),結(jié)合完整實例形式分析了Python圖的最短路徑算法相關(guān)原理與實現(xiàn)技巧,需要的朋友可以參考下2017-12-12
python實現(xiàn)在sqlite動態(tài)創(chuàng)建表的方法
這篇文章主要介紹了python實現(xiàn)在sqlite動態(tài)創(chuàng)建表的方法,涉及Python操作SQLite數(shù)據(jù)庫創(chuàng)建數(shù)據(jù)表的技巧,具有一定參考借鑒價值,需要的朋友可以參考下2015-05-05

