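Compare a restored MS-SQL database with the original using Python

I am using an external utility to back up my MS-SQL database. It takes a backup of my database and, on restore, gives me the database files (.mdf, .ldf).
Now I need to verify, through Python automation, whether the restored database files (.mdf and .ldf) are the same as the original database files. Is there a DB comparison utility that can be integrated with Python?
I am looking into the pymssql module, but I don't know whether I can use it to compare the restored DB...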
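I looked into pymssql. All pymssql does is execute the SQL query passed to it and return the output. However, installing pymssql (on Windows) turned out to be a really difficult task, so I preferred to use SQL Server's built-in SQLCMD utility to execute my SQL queries instead of pymssql.
So I use Python's os.system() to execute my SQL queries through SQLCMD: from Python I write the SQL query to a .sql file, pass that file to SQLCMD, and redirect the output to another text file.
Using SQLCMD, I first collect the list of tables in the given database. Then I query all the tables one by one and dump their data into a text file (Original_DB_Data.txt). For the restored DB I follow the same procedure and dump the data into another text file (Restored_DB_Data.txt). The code snippet is as follows: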
import os
import re


def get_table_list_in_DB(Instance_name, DB_name):
    # Write the query to a temporary .sql file and let sqlcmd dump the table names
    query = 'SELECT NAME from [%s].sys.tables' % (DB_name)
    input_file = "C:\\SQL_Data\\SQLQuery_table_list.sql"
    output_file = "C:\\SQL_Data\\Table_list_DB-%s_Output.txt" % (DB_name)
    with open(input_file, 'w') as f:
        f.write(query)
    command = 'sqlcmd -S %s -i %s -o %s' % (Instance_name, input_file, output_file)
    os.system(command)
    os.remove(input_file)  # to delete the created .sql file
    return output_file


def get_db_data(Instance_name, DB_name):
    output_file = "C:\\SQL_Data\\DB_Data-%s_Output.txt" % (DB_name)
    table_list = get_table_list_in_DB(Instance_name, DB_name)
    flag = 0  # 1 while we are reading the table names in the sqlcmd output
    # 'w' rather than 'a', so that a rerun does not append to a stale dump
    with open(table_list, 'r') as f1, open(output_file, 'w') as f2:
        for lines in f1:
            if re.match(r"^\s", lines):  # a blank line ends the list of table names
                flag = 0
            if flag:
                table_data = get_table_data(Instance_name, DB_name, table_name=lines.strip())
                with open(table_data, 'r') as f3:
                    f2.write("##################################" + '\n')
                    f2.write('\t' + lines.strip() + '\n')
                    f2.write("##################################" + '\n')
                    f2.write(f3.read())
                os.remove(table_data)
            if re.match(r"^----+", lines):  # table names start after the dashed line
                flag = 1
    return output_file


def get_table_data(Instance_name, DB_name, table_name):
    input_file = "C:\\SQL_Data\\SQLQuery_table_data.sql"
    output_file = "C:\\SQL_Data\\Table_data_%s_Output.txt" % (table_name)
    query = "SELECT * from [%s].dbo.[%s]" % (DB_name, table_name)
    with open(input_file, 'w') as f:
        f.write(query)
    command = "sqlcmd -S %s -i %s -o %s" % (Instance_name, input_file, output_file)
    os.system(command)
    os.remove(input_file)
    return output_file


def compare_DB_Data(DB_Detail1=[], DB_Detail2=[]):
    # Each DB_Detail is [instance_name, database_name]
    get_db_data(Instance_name=DB_Detail1[0], DB_name=DB_Detail1[1])
    get_db_data(Instance_name=DB_Detail2[0], DB_name=DB_Detail2[1])
    data_DB1 = "C:\\SQL_Data\\DB_Data-%s_Output.txt" % (DB_Detail1[1])
    data_DB2 = "C:\\SQL_Data\\DB_Data-%s_Output.txt" % (DB_Detail2[1])
    with open(data_DB1, 'r') as f1, open(data_DB2, 'r') as f2:
        if f1.read() == f2.read():
            print("Data of both DB Matches")
        else:
            print("Data of both DB Varies")
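The flag logic in get_db_data() depends on how sqlcmd lays out a one-column result: a header line, a dashed separator, one line per row, then a blank line and a row-count footer. As a rough sketch of that parsing step on its own, the hypothetical helper below (parse_table_names is my name, not part of the code above) pulls the table names out of such text. The sample output is an assumed layout for illustration; real output pads each column to its full width.

```python
import re


def parse_table_names(sqlcmd_output):
    """Extract table names from the text sqlcmd prints for a one-column query."""
    names = []
    in_rows = False
    for line in sqlcmd_output.splitlines():
        if in_rows:
            if not line.strip():
                break  # a blank line ends the result rows
            names.append(line.strip())
        elif re.match(r"^-{4,}", line):
            in_rows = True  # the dashed separator precedes the first row
    return names


# Assumed sample of sqlcmd output, shortened for readability
sample = (
    "NAME      \n"
    "----------\n"
    "Customers \n"
    "Orders    \n"
    "\n"
    "(2 rows affected)\n"
)
print(parse_table_names(sample))
```

Keeping the parsing in one small function like this makes it easier to check against real sqlcmd output before wiring it into the file-based flow.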
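If you want to improve the compare_DB_Data() method so that it reports the exact differences, you can rewrite it as follows. It dumps the differences into another text file, which you can refer to later to check exactly what differs: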
def compare_DB_Data(DB_Detail1=[], DB_Detail2=[]):
    '''
    This method collects the data from both the given DBs and then compares the data.
    If any difference is found, it is written to a file for post-verification.
    '''
    get_db_data(Instance_name=DB_Detail1[0], DB_name=DB_Detail1[1])
    get_db_data(Instance_name=DB_Detail2[0], DB_name=DB_Detail2[1])
    data_DB1 = "C:\\SQL_Data\\DB_Data-%s_Output.txt" % (DB_Detail1[1])
    data_DB2 = "C:\\SQL_Data\\DB_Data-%s_Output.txt" % (DB_Detail2[1])
    data_difference = "C:\\SQL_Data\\Data-Difference-%s-vs-%s_Output.txt" % (DB_Detail1[1], DB_Detail2[1])
    data_diff = 0
    with open(data_DB1, 'r') as f1, open(data_DB2, 'r') as f2:
        lines1, lines2 = f1.readlines(), f2.readlines()
    with open(data_difference, 'w') as f3:
        f3.write("Data Difference in %s vs %s \n" % (DB_Detail1[1], DB_Detail2[1]))
        f3.write("-" * 100 + '\n')
        f3.write(DB_Detail1[1])
        f3.write('\t\t\t\t\t\t\t')
        f3.write(DB_Detail2[1] + '\n')
        f3.write("-" * 100 + '\n')
        # Walk to the end of the longer dump so that extra trailing lines are reported too
        for i in range(max(len(lines1), len(lines2))):
            line_f1 = lines1[i] if i < len(lines1) else ''
            line_f2 = lines2[i] if i < len(lines2) else ''
            if line_f1 != line_f2:
                data_diff = 1  # Flag set if any difference found
                f3.write(line_f1.strip() + '\t\t\t\t\t\t\t' + line_f2.strip() + '\n')
    if data_diff:
        print("Data of both DB Varies")
    else:
        print("Data of both DB Matches")
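A line-by-line comparison reports every following line as different once a single row is inserted or missing. As an alternative sketch (not what the code above does), Python's standard difflib module can produce a unified diff of the two dumps. The helper name diff_dumps and the sample strings are made up for illustration; in practice you would pass in the contents of the two DB_Data-..._Output.txt files.

```python
import difflib


def diff_dumps(original_text, restored_text, name1="original", name2="restored"):
    """Return unified-diff lines between two dumps (an empty list if they are identical)."""
    return list(difflib.unified_diff(
        original_text.splitlines(),
        restored_text.splitlines(),
        fromfile=name1,
        tofile=name2,
        lineterm="",
    ))


# Made-up dump contents, standing in for the two output files
a = "id name\n1 alice\n2 bob\n"
b = "id name\n1 alice\n2 bobby\n3 carol\n"
for line in diff_dumps(a, b):
    print(line)
```

Lines starting with "-" exist only in the original dump and lines starting with "+" only in the restored one, so an empty result means the two dumps match.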
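I am deliberately adding this as a separate answer. The answer above works only when it runs on the same machine as the SQL Server. It also relies on text files, which does not guarantee a complete comparison between the two DBs. This new answer uses a dictionary-based approach to avoid those problems.
The code below works even if your SQL Server and your code are on different machines. For that, however, you need to enable WinRM on the machine where SQL Server is installed. WinRM is a built-in Windows module for communication between Windows machines. To enable it, run the following commands at the command line of the SQL machine.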
winrm qc -q
winrm set winrm/config/client/auth @{Basic="true"}
winrm set winrm/config/service/auth @{Basic="true"}
winrm set winrm/config/service @{AllowUnencrypted="true"}
This enables the WinRM module on that machine. Now we need Python's pywinrm module to communicate with our remote SQL host. See this link to install pywinrm.
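To give an idea of how a remote query goes through pywinrm, here is a minimal sketch. run_sqlcmd and FakeSession are hypothetical names of mine: the function only assumes an object with a pywinrm-style run_cmd() method whose result exposes status_code, std_out and std_err, and the fake session stands in for a real winrm.Session so the example runs without a remote host.

```python
def run_sqlcmd(session, instance_name, query):
    """Run a query through sqlcmd on the remote host and return its text output.

    `session` is anything with a pywinrm-style run_cmd() method.
    """
    command = 'sqlcmd -S "%s" -Q "%s"' % (instance_name, query)
    result = session.run_cmd(command)
    if result.status_code != 0 or result.std_err:
        raise RuntimeError("sqlcmd failed: %r" % (result.std_err,))
    out = result.std_out
    # pywinrm returns bytes on Python 3; decode so callers always get text
    return out.decode("utf-8", "replace") if isinstance(out, bytes) else out


class FakeSession(object):
    """Stand-in for winrm.Session so this sketch runs without a remote host."""

    def run_cmd(self, command):
        class Result(object):
            status_code = 0
            std_out = b"NAME\n----\nCustomers\n"
            std_err = b""
        self.last_command = command  # remember what would have been sent
        return Result()


session = FakeSession()
print(run_sqlcmd(session, "HOST\\SQLEXPRESS", "SELECT NAME from [MyDB].sys.tables"))
```

Against a real host you would pass something like winrm.Session("10.0.0.5", auth=("user", "password")) instead of FakeSession(); the address and credentials here are placeholders.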
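So, first we collect the list of tables present in the database (using the get_table_list_in_DB method) and store it in a text file. This list is then used to collect the schema and the data of every table in the database: each table name is read from the file in turn, and that table's schema and data are queried.
The get_db_Schema() method reads the table list returned above and internally calls get_table_Schema() to collect the schema details of every table in the database. The schema returned for each table is stored as a dictionary key-value pair, where the table name is the key and the returned schema is the value. So the whole database schema is one dictionary, with each table as a key and its schema as the value.
get_db_data() and get_table_data() work the same way. get_db_data() collects the data of each table and stores it in a dictionary with the table name as the key and the returned data as the value.
So, to compare two databases, we pass the database details (instance name and database name) to the compare_DB_Data() method, which calls get_db_Schema() for both databases and compares the results. It then calls get_db_data() for both databases and compares those in the same way. Because both the table schemas and the table data are stored as key-value pairs, if every key (for schema as well as data) has the same value in both DBs, we can be sure that the two databases are the same.
If the schemas of the two databases differ, the differences are added to the schema1_diff_schema2 and schema2_diff_schema1 dictionaries. Similarly, data differences are added to the data1_diff_data2 and data2_diff_data1 dictionaries.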
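The dictionary comparison itself is independent of SQL Server, so it can be tried out in isolation. The following is a small sketch of the idea with a hypothetical helper, diff_dicts, and made-up schema strings; the real values would be the raw sqlcmd output for each table.

```python
def diff_dicts(first, second):
    """Return (first_diff_second, second_diff_first) for two dictionaries.

    A key lands in a diff dictionary if it is missing from the other side
    or if both sides have it with different values.
    """
    keys1, keys2 = set(first), set(second)
    common = keys1 & keys2
    changed = set(k for k in common if first[k] != second[k])
    first_diff_second = dict((k, first[k]) for k in (keys1 - common) | changed)
    second_diff_first = dict((k, second[k]) for k in (keys2 - common) | changed)
    return first_diff_second, second_diff_first


# Made-up schemas: "Customers" differs, "Orders" and "Audit" exist on one side only
schema1 = {"Customers": "id int, name varchar", "Orders": "id int"}
schema2 = {"Customers": "id int, name nvarchar", "Audit": "id int"}
only1, only2 = diff_dicts(schema1, schema2)
print(only1)
print(only2)
```

Two empty dictionaries mean that both sides hold the same keys with the same values, which is the "both DBs match" case.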
Below is the code snippet for the same:
import os
import re
import sys

import winrm


class SQL_Compare_DB(object):
    def __init__(self, SQL_Host_IP, auth):
        # Append to path, in case not present.
        # Note: sys.path only affects Python imports; sqlcmd itself must be
        # on the PATH of the remote SQL host.
        sys.path.append(r"C:\Program Files\Microsoft SQL Server\110\Tools\Binn")
        # Create session with SQL host
        self.session = winrm.Session(SQL_Host_IP, auth)
        # We need a directory where the db files will be stored
        if not os.path.exists(r"C:\SQL_Data"):
            os.mkdir(r"C:\SQL_Data")
        else:
            os.system(r"RMDIR /S /Q C:\SQL_Data")
            os.system(r"MKDIR C:\SQL_Data")

    def get_table_list_in_DB(self, Instance_name, DB_name):
        '''
        Returns the list of table in the given database
        '''
        output_file = "C:\\SQL_Data\\Table_list_DB-%s_Output.txt" % (DB_name)
        query = 'SELECT NAME from [%s].sys.tables' % (DB_name)
        command = 'sqlcmd -S "%s" -Q "%s"' % (Instance_name, query)
        execute_query = self.session.run_cmd(command)
        if execute_query.std_err:
            print("Error in command execution :- %s" % (execute_query.std_err,))
            return False
        # std_out is a byte string, so write it in binary mode
        with open(output_file, 'wb') as f:
            f.write(execute_query.std_out)
        return output_file

    def get_db_Schema(self, Instance_name, DB_name):
        '''
        Get the schema of all tables in the Database
        '''
        db_schema = {}
        table_list = self.get_table_list_in_DB(Instance_name, DB_name)
        if not table_list:  # the table list could not be fetched
            return db_schema
        flag = 0
        with open(table_list, 'r') as f1:
            for lines in f1:
                if re.match(r"^\s", lines):
                    flag = 0
                if flag:
                    table_schema = self.get_table_Schema(
                        Instance_name, DB_name, table_name=lines.strip())
                    if table_schema:  # False is returned on error
                        db_schema.update(table_schema)
                if re.match(r"^----+?-", lines):
                    flag = 1
        print(db_schema)
        return db_schema

    def get_table_Schema(self, Instance_name, DB_name, table_name):
        '''
        Get the table schema
        '''
        table_schema = {}
        # Newlines are replaced with spaces so the query fits on one command line
        query = """
            SELECT ORDINAL_POSITION, COLUMN_NAME, DATA_TYPE,
            CHARACTER_MAXIMUM_LENGTH, IS_NULLABLE
            FROM [%s].INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_NAME = '%s'""".replace('\n', ' ') % (DB_name, table_name)
        command = 'sqlcmd -S "%s" -Q "%s"' % (Instance_name, query)
        execute_query = self.session.run_cmd(command)
        table_schema[table_name] = execute_query.std_out
        if execute_query.std_err:
            print("Error in command execution :- %s" % (execute_query.std_err,))
            return False
        return table_schema

    def get_db_data(self, Instance_name, DB_name):
        '''
        From the given DB, it fetches the data of all existing
        tables and collects it in a dictionary keyed by table name
        '''
        db_data = {}
        table_list = self.get_table_list_in_DB(Instance_name, DB_name)
        if not table_list:  # the table list could not be fetched
            return db_data
        flag = 0
        with open(table_list, 'r') as f1:
            for lines in f1:
                if re.match(r"^\s", lines):
                    flag = 0
                if flag:
                    table_data = self.get_table_data(
                        Instance_name, DB_name, table_name=lines.strip())
                    if table_data:  # False is returned on error
                        db_data.update(table_data)
                if re.match(r"^----+?-", lines):  # the table names are printed after the ------ line
                    flag = 1
        print(db_data)
        return db_data

    def get_table_data(self, Instance_name, DB_name, table_name):
        '''
        Get the data for the given table
        '''
        table_data = {}
        query = "SELECT * from [%s].dbo.[%s]" % (DB_name, table_name)
        command = 'sqlcmd -S "%s" -Q "%s"' % (Instance_name, query)
        execute_query = self.session.run_cmd(command)
        table_data[table_name] = execute_query.std_out
        if execute_query.std_err:
            print("Error in command execution :- %s" % (execute_query.std_err,))
            return False
        return table_data

    def compare_DB_Data(self, DB_Detail1=[], DB_Detail2=[]):
        '''
        Take detail of two DBs as input and gets the detailed DB data
        The data collected are dumped into one dictionary and at last
        both the dictionary are compared.
        Arguments :-
        DB_Detail1 :- a list of instance name and DB name for original DB
        DB_Detail2 :- a list of instance name and DB name for Restored DB
        e.g :- DB_Detail1 = ['Instance_name', 'Database_name']
        '''
        # Compare schema
        db_schema1 = self.get_db_Schema(
            Instance_name=DB_Detail1[0], DB_name=DB_Detail1[1])
        db_schema2 = self.get_db_Schema(
            Instance_name=DB_Detail2[0], DB_name=DB_Detail2[1])
        schema1_diff_schema2 = {}
        schema2_diff_schema1 = {}
        print(db_schema1)
        print(db_schema2)
        set_current, set_past = set(db_schema1.keys()), set(db_schema2.keys())
        intersect = set_current.intersection(set_past)
        added = set_current - intersect
        removed = set_past - intersect
        changed = set(k for k in intersect if db_schema2[k] != db_schema1[k])
        unchanged = set(k for k in intersect if db_schema2[k] == db_schema1[k])
        print("added=%s removed=%s changed=%s unchanged=%s" % (added, removed, changed, unchanged))
        # Tables that exist only in one DB, or whose schema differs, go into the diff dictionaries
        for m in added | changed:
            schema1_diff_schema2[m] = db_schema1[m]
        for m in removed | changed:
            schema2_diff_schema1[m] = db_schema2[m]
        print("Schema diff DB1 vs DB2 :- %s" % (schema1_diff_schema2,))
        print("Schema diff DB2 vs DB1 :- %s" % (schema2_diff_schema1,))
        if not (added or removed or changed):
            print("Schema of both DB Matches")
        else:
            print("Schema of both DB Varies")
        # Compare data
        data_DB1 = self.get_db_data(
            Instance_name=DB_Detail1[0], DB_name=DB_Detail1[1])
        data_DB2 = self.get_db_data(
            Instance_name=DB_Detail2[0], DB_name=DB_Detail2[1])
        data1_diff_data2 = {}
        data2_diff_data1 = {}
        set_current_data, set_past_data = set(data_DB1.keys()), set(data_DB2.keys())
        intersect = set_current_data.intersection(set_past_data)
        added = set_current_data - intersect
        removed = set_past_data - intersect
        changed = set(k for k in intersect if data_DB2[k] != data_DB1[k])
        unchanged = set(k for k in intersect if data_DB2[k] == data_DB1[k])
        print("added=%s removed=%s changed=%s unchanged=%s" % (added, removed, changed, unchanged))
        for m in added | changed:
            data1_diff_data2[m] = data_DB1[m]
        for m in removed | changed:
            data2_diff_data1[m] = data_DB2[m]
        print("Diff DB1 vs DB2 :- %s" % (data1_diff_data2,))
        print("Diff DB2 vs DB1 :- %s" % (data2_diff_data1,))
        if not (added or removed or changed):
            print("Data of both DB Matches")
        else:
            print("Data of both DB Varies")