經過深入挖掘,我發現這個helpful section from SQLAlchemy docs。 那麼,從這個代碼片段開始,我創建了下面的類:
import io
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker
class SchemaDump(object):
def __init__(self, db_url, schema_file_path):
self.db_url = db_url
self.schema_file_path = schema_file_path
self.buf = io.BytesIO()
def dump_shema(self):
engine = create_engine(self.db_url)
metadata = MetaData()
metadata.reflect(engine)
def dump(sql, *multiparams, **params):
f = sql.compile(dialect=engine.dialect)
self.buf.write(str(f))
self.buf.write(str(';\n'))
new_engine = create_engine(self.db_url, strategy='mock', executor=dump)
metadata.create_all(new_engine, checkfirst=True)
with io.open(self.schema_file_path, 'wb+') as schema:
schema.write(self.buf.getvalue())
這仍然是相當模糊,但其主要思想是,以捕捉sql.compile(dialect=engine.dialect)
返回的原始SQL語句buf
,並將它們寫入文件。
我寫了一個同樣粗略類從通過上述類創建的.sql
轉儲恢復數據庫:
import io
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class RestoreSchema(object):
def __init__(self, db_url, schema_file_path):
self.db_url = db_url
self.schema_file_path = schema_file_path
def restore_schema(self):
raw_schema = ''
with io.open(self.schema_file_path) as sql_schema:
raw_schema = sql_schema.read()
engine = create_engine(self.db_url)
Session = sessionmaker(bind=engine)
session = Session()
conn = engine.connect()
transaction = conn.begin()
try:
conn.execute(raw_schema)
transaction.commit()
except Exception as e:
transaction.rollback()
raise e
finally:
session.close()
你還是擔心的表是否已經存在等,但效果是完全與我的問題中的代碼片段相同。