每天遷移MySQL歷史數據到歷史庫Python腳本
本文實例為大家分享了Python每天遷移MySQL歷史數據到歷史庫的具體代碼,供大家參考,具體內容如下
#!/usr/bin/env python
# coding:utf-8
__author__ = 'John'
import MySQLdb
import sys
import datetime
import time
class ClassMigrate(object):
    """Copy one day of rows from a source MySQL table into an archive table.

    The day that is migrated is ``today - time_interval`` days (00:00:00 to
    23:59:59 of that date, evaluated by MySQL on the ``date_col`` column).
    Rows are copied in primary-key ranges of ``step_size``, the copy is
    verified by comparing ``count / min(pk) / max(pk)`` on both sides, and
    only then are the rows removed from the source, either with batched
    DELETE statements or by dropping the matching partition.

    NOTE(review): table and column names are interpolated into the SQL text
    (identifiers cannot be bound as parameters), so every command-line value
    must come from a trusted operator.
    """

    # Settings that must be present after argument parsing; each of them is
    # read unconditionally somewhere in the migration flow.
    _REQUIRED_SETTINGS = (
        'source_host', 'source_tab', 'dest_host', 'dest_tab',
        'deleteStrategy', 'pk', 'date_col', 'interval',
    )

    def _usage_exit(self, code):
        """Print the usage text and terminate the process with ``code``."""
        print(self.usage)
        sys.exit(code)

    @staticmethod
    def _parse_endpoint(spec):
        """Split ``host:port/db:table/user/password`` into its six parts.

        The password is everything after the third ``/`` so it may itself
        contain ``/``. Raises ValueError when the spec is malformed.
        """
        parts = spec.split('/', 3)
        if len(parts) != 4:
            raise ValueError(spec)
        host, host_sep, port = parts[0].partition(':')
        db_name, db_sep, tab_name = parts[1].partition(':')
        if not (host_sep and db_sep):
            raise ValueError(spec)
        # int() raises ValueError for a non-numeric port.
        return host, int(port), db_name, tab_name, parts[2], parts[3]

    def _get_argv(self):
        """Parse ``sys.argv`` into connection and migration settings.

        Prints the usage text and exits with status 1 when no argument is
        given, when an option is unknown or malformed, or when a required
        option is missing. ``--help`` / ``-h`` prints the usage text and
        exits with status 0.
        """
        self.usage = """
usage():
python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \\
--dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \\
--delete_strategy=delete --primary_key=auto_id --date_col=ut --time_interval=180
"""
        if len(sys.argv) == 1:
            self._usage_exit(1)
        if sys.argv[1] in ('--help', '-h'):
            print(self.usage)
            sys.exit()

        for arg in sys.argv[1:]:
            # Split on the first '=' only, so values may contain '='.
            name, _sep, value = arg.partition('=')
            try:
                if name == '--source':
                    (self.source_host, self.source_port, self.source_db,
                     self.source_tab, self.source_user,
                     self.source_password) = self._parse_endpoint(value)
                elif name == '--dest':
                    (self.dest_host, self.dest_port, self.dest_db,
                     self.dest_tab, self.dest_user,
                     self.dest_password) = self._parse_endpoint(value)
                elif name == '--delete_strategy':
                    if value not in ('delete', 'drop'):
                        raise ValueError(arg)
                    self.deleteStrategy = value
                elif name == '--primary_key':
                    self.pk = value
                elif name == '--date_col':
                    self.date_col = value
                elif name == '--time_interval':
                    # Stored as int so only a number can reach the SQL text.
                    self.interval = int(value)
                    if self.interval < 0:
                        raise ValueError(arg)
                else:
                    raise ValueError(arg)
            except ValueError:
                self._usage_exit(1)

        # Fail fast instead of crashing with AttributeError half-way through
        # a migration because an option was left out.
        for attr in self._REQUIRED_SETTINGS:
            if getattr(self, attr, None) in (None, ''):
                self._usage_exit(1)

    def __init__(self):
        """Parse the command line and open both database connections.

        Both connections are switched to autocommit. Connection errors from
        ``MySQLdb.connect`` propagate to the caller.
        """
        self._get_argv()
        self.sourcedb_conn_str = MySQLdb.connect(
            host=self.source_host, port=self.source_port,
            user=self.source_user, passwd=self.source_password,
            db=self.source_db, charset='utf8')
        self.sourcedb_conn_str.autocommit(True)
        self.destdb_conn_str = MySQLdb.connect(
            host=self.dest_host, port=self.dest_port,
            user=self.dest_user, passwd=self.dest_password,
            db=self.dest_db, charset='utf8')
        self.destdb_conn_str.autocommit(True)

        # Destination tables are created with "LIKE <source>_template".
        self.template_tab = self.source_tab + '_template'
        # Width of the primary-key range handled per batch.
        self.step_size = 20000

        self._migCompleteState = False
        self._deleteCompleteState = False

        self.source_cnt = ''
        self.source_min_id = ''
        self.source_max_id = ''
        self.source_checksum = ''
        # Must exist even when verification never runs: do() prints it.
        self.dest_cnt = ''

        self.today = time.strftime("%Y-%m-%d")

    @staticmethod
    def _format_elapsed(delta):
        """Return a timedelta as ``seconds.microseconds`` (zero-padded)."""
        return "%d.%06d" % (delta.days * 86400 + delta.seconds,
                            delta.microseconds)

    def _date_window_sql(self):
        """Return the WHERE condition selecting the day being migrated."""
        day_expr = ("DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d')"
                    % (self.today, self.interval))
        return ("%s >= CONCAT(%s, ' 00:00:00') and %s <= CONCAT(%s, ' 23:59:59')"
                % (self.date_col, day_expr, self.date_col, day_expr))

    def _range_stats_sql(self, table):
        """Return SQL giving count, min(pk) and max(pk) of the day in ``table``."""
        return ("select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) "
                "from %s where %s "
                % (self.pk, self.pk, table, self._date_window_sql()))

    def sourcedb_query(self, sql, sql_type):
        """Run ``sql`` on the source connection.

        Returns the fetched rows for ``'select'``, the affected-row count for
        ``'dml'`` and True for any other ``sql_type``. On any error the
        message is printed and False is returned.
        """
        cr = None
        try:
            cr = self.sourcedb_conn_str.cursor()
            cr.execute(sql)
            if sql_type == 'select':
                return cr.fetchall()
            elif sql_type == 'dml':
                return self.sourcedb_conn_str.affected_rows()
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False
        finally:
            # cursor() itself may have failed, leaving nothing to close.
            if cr is not None:
                cr.close()

    def destdb_query(self, sql, sql_type, values=''):
        """Run ``sql`` on the destination connection.

        Returns the fetched rows for ``'select'``, the affected-row count for
        ``'insertmany'`` (``values`` is the sequence of rows; an empty
        sequence returns 0 without contacting the server) and True for any
        other ``sql_type``. On any error the message is printed and False is
        returned.
        """
        cr = None
        try:
            if sql_type == 'insertmany' and not values:
                return 0
            cr = self.destdb_conn_str.cursor()
            if sql_type == 'select':
                cr.execute(sql)
                return cr.fetchall()
            elif sql_type == 'insertmany':
                cr.executemany(sql, values)
                return self.destdb_conn_str.affected_rows()
            cr.execute(sql)
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False
        finally:
            if cr is not None:
                cr.close()

    def create_table_from_source(self):
        """Create the destination table from the source's SHOW CREATE TABLE.

        Not used by do(): the original author notes the data must go into an
        ARCHIVE-engine table, so copying the source definition is unsuitable;
        the method is kept for other uses. Returns True on success.
        """
        try:
            sql = "show create table %s;" % self.source_tab
            shown = self.sourcedb_query(sql, 'select')
            if not shown:
                return False
            create_str = shown[0][1].replace(
                'CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
            return bool(self.destdb_query(create_str, 'ddl'))
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def create_table_from_template(self):
        """Create the destination table LIKE the template table if missing.

        Returns True when the DDL succeeded, False otherwise.
        """
        try:
            sql = 'CREATE TABLE IF NOT EXISTS %s like %s;' % (
                self.dest_tab, self.template_tab)
            return bool(self.destdb_query(sql, 'ddl'))
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def get_min_max(self):
        """Record count, min id, max id and checksum of the source rows.

        Returns True when at least one row matches the day being migrated,
        False when none match or the query failed.
        """
        try:
            print("\nStarting Migrate at -- %s <br>" % str(datetime.datetime.now()))
            q = self.sourcedb_query(self._range_stats_sql(self.source_tab), 'select')
            if q is False:
                return False
            self.source_cnt = q[0][0]
            self.source_min_id = q[0][1]
            self.source_max_id = q[0][2]
            self.source_checksum = '_'.join(
                [str(self.source_cnt), str(self.source_min_id), str(self.source_max_id)])
            if self.source_cnt == 0 or self.source_min_id == -1 or self.source_max_id == -1:
                print("There is 0 record in source table been matched! <br>")
                return False
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def migrate_2_destdb(self):
        """Copy the day's rows to the destination in primary-key batches.

        Returns True once every batch was attempted (individual batch
        failures are printed; verify_total_cnt() detects them afterwards),
        False when there is nothing to migrate or an error occurred.
        """
        try:
            if not self.get_min_max():
                return False
            cols = self.sourcedb_query("desc %s;" % self.source_tab, 'select')
            if not cols:
                return False
            # One %s placeholder per source column for the INSERT statement.
            placeholders = ",".join(["%s"] * len(cols))
            insert_sql = "insert into " + self.dest_tab + " values (%s)" % placeholders
            window = self._date_window_sql()
            k = self.source_min_id
            while k <= self.source_max_id:
                sql = "select * from %s where %s >= %d and %s< %d and %s " % (
                    self.source_tab, self.pk, k, self.pk, k + self.step_size, window)
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()
                results = self.sourcedb_query(sql, 'select')
                if results is False:
                    rows = False
                else:
                    rows = self.destdb_query(insert_sql, 'insertmany', results)
                # Identity check: 0 inserted rows is not a failure.
                if rows is False:
                    print("Insert failed!! <br>")
                else:
                    print("Inserted %s rows. <br>" % rows)
                elapsed = datetime.datetime.now() - starttime
                print("Elapsed :" + self._format_elapsed(elapsed) + " seconds <br>")
                k += self.step_size
            print("\nInsert complete at -- %s <br>" % str(datetime.datetime.now()))
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def verify_total_cnt(self):
        """Compare destination count/min/max with the source checksum.

        Sets ``_migCompleteState`` on a match; exits the process with status
        77 on a mismatch. A failed query leaves the state unset.
        """
        try:
            dest_result = self.destdb_query(self._range_stats_sql(self.dest_tab), 'select')
            if dest_result is False:
                return
            self.dest_cnt = dest_result[0][0]
            dest_checksum = '_'.join(
                [str(self.dest_cnt), str(dest_result[0][1]), str(dest_result[0][2])])
            print("source_checksum: %s, dest_checksum: %s <br>" % (
                self.source_checksum, dest_checksum))
            if (self.source_cnt == self.dest_cnt and self.dest_cnt != 0
                    and self.source_checksum == dest_checksum):
                self._migCompleteState = True
                print("Verify successfully !!<br>")
            else:
                print("Verify failed !!<br>")
                sys.exit(77)
        except Exception as e:
            print(str(e) + "<br>")

    def drop_daily_partition(self):
        """Drop the source partition holding the migrated day.

        The partition name is read from column 3 of EXPLAIN PARTITIONS. It is
        dropped only when its own count/min/max equals the source checksum,
        i.e. it contains nothing but the migrated rows; a mismatch exits the
        process with status 77.
        """
        try:
            if not self._migCompleteState:
                return
            sql = "explain partitions select * from %s where %s " % (
                self.source_tab, self._date_window_sql())
            plan = self.sourcedb_query(sql, 'select')
            if not plan or not plan[0][3]:
                print("There is 0 record in source PARTITION been matched! <br>")
                return
            partition_name = plan[0][3]
            sql = "select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s partition (%s)" % (
                self.pk, self.pk, self.source_tab, partition_name)
            q = self.sourcedb_query(sql, 'select')
            if q is False:
                return
            source_cnt, source_min_id, source_max_id = q[0][0], q[0][1], q[0][2]
            checksum = '_'.join([str(source_cnt), str(source_min_id), str(source_max_id)])
            if source_cnt == 0 or source_min_id == -1 or source_max_id == -1:
                print("There is 0 record in source PARTITION been matched! <br>")
                return
            if checksum != self.source_checksum:
                print("The partition %s checksum failed !! Drop failed !!" % partition_name)
                sys.exit(77)
            drop_par_sql = "alter table %s drop partition %s;" % (
                self.source_tab, partition_name)
            print(drop_par_sql + " <br>")
            if self.sourcedb_query(drop_par_sql, 'ddl'):
                print("\nDrop partition complete at -- %s <br>" % str(datetime.datetime.now()))
                self._deleteCompleteState = True
            else:
                print("Drop partition failed.. <br>")
        except Exception as e:
            print(str(e) + "<br>")

    def delete_data(self):
        """Delete the migrated rows from the source in primary-key batches.

        Sleeps one second between batches. Sets ``_deleteCompleteState`` once
        every batch was attempted.
        """
        try:
            if not self._migCompleteState:
                return
            window = self._date_window_sql()
            k = self.source_min_id
            while k <= self.source_max_id:
                sql = "delete from %s where %s >= %d and %s< %d and %s " % (
                    self.source_tab, self.pk, k, self.pk, k + self.step_size, window)
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()
                rows = self.sourcedb_query(sql, 'dml')
                # Identity check: a batch that matched 0 rows is not a failure.
                if rows is False:
                    print("Delete failed!! <br>")
                else:
                    print("Deleted %s rows. <br>" % rows)
                elapsed = datetime.datetime.now() - starttime
                print("Elapsed :" + self._format_elapsed(elapsed) + " seconds <br>")
                time.sleep(1)
                k += self.step_size
            print("\nDelete complete at -- %s <br>" % str(datetime.datetime.now()))
            self._deleteCompleteState = True
        except Exception as e:
            print(str(e) + "<br>")

    def do(self):
        """Run the whole migration and exit with a status describing it.

        Exit status: 88 when rows were migrated, verified and removed from
        the source; 254 when any of those steps did not complete; 255 when
        the destination table could not be created. (verify_total_cnt() and
        drop_daily_partition() may exit with 77 on a checksum mismatch.)
        """
        if not self.create_table_from_template():
            print("Create table failed ! Exiting. . .")
            sys.exit(255)

        if self.migrate_2_destdb():
            self.verify_total_cnt()
            if self._migCompleteState:
                if self.deleteStrategy == 'drop':
                    self.drop_daily_partition()
                else:
                    self.delete_data()

        print("\n<br>")
        print("=====" * 5 + '<br>')
        print("source_total_cnt: %s <br>" % self.source_cnt)
        print("dest_total_cnt: %s <br>" % self.dest_cnt)
        print("=====" * 5 + '<br>')
        if self._deleteCompleteState:
            print("\nFinal result: Successfully !! <br>")
            sys.exit(88)
        else:
            print("\nFinal result: Failed !! <br>")
            sys.exit(254)
# Run the migration only when executed as a script: constructing
# ClassMigrate opens database connections and do() always ends in sys.exit(),
# neither of which should happen on a plain import.
if __name__ == '__main__':
    f = ClassMigrate()
    f.do()
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Python獲取網頁數據詳解流程
讀萬卷書不如行萬里路,只學書上的理論是遠遠不夠的,只有在實戰中才能獲得能力的提升,本篇文章手把手帶你用Python來獲取網頁的數據,主要應用了Requests庫,大家可以在過程中查缺補漏,提升水平 2021-10-10
Django migrate報錯的解決方案
在講解如何解決migrate報錯原因前,我們先要了解migrate做了什么事情,本文就詳細的介紹migrate使用以及出現問題的解決,感興趣的可以了解一下 2021-05-05
python之文件的讀寫和文件目錄以及文件夾的操作實現代碼
這篇文章主要介紹了python之文件的讀寫和文件目錄以及文件夾的操作實現代碼,需要的朋友可以參考下 2016-08-08
python引用(import)某個模塊提示沒找到對應模塊的解決方法
今天小編就為大家分享一篇python引用(import)某個模塊提示沒找到對應模塊的解決方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧 2019-01-01
Python使用pycharm導入pymysql教程
這篇文章主要介紹了Python使用pycharm導入pymysql教程,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧 2020-09-09
python selenium實現發送帶附件的郵件代碼實例
這篇文章主要介紹了python selenium實現發送帶附件的郵件代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下 2019-12-12
python 簡單照相機調用系統攝像頭實現方法 pygame
今天小編就為大家分享一篇python 簡單照相機調用系統攝像頭實現方法 pygame,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧 2018-08-08
python:刪除離群值操作(每一行為一類數據)
這篇文章主要介紹了python:刪除離群值操作(每一行為一類數據),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧 2020-06-06

