2017-05-23 76 views
6

我正在嘗試使用SQLAlchemy使用臨時表,並將其與現有表聯接起來。這是我迄今爲止在SQLAlchemy中使用臨時表

engine = db.get_engine(db.app, 'MY_DATABASE') 
df = pd.DataFrame({"id": [1, 2, 3], "value": [100, 200, 300], "date": [date.today(), date.today(), date.today()]}) 
temp_table = db.Table('#temp_table', 
         db.Column('id', db.Integer), 
         db.Column('value', db.Integer), 
         db.Column('date', db.DateTime)) 
temp_table.create(engine) 
df.to_sql(name='tempdb.dbo.#temp_table', 
      con=engine, 
      if_exists='append', 
      index=False) 
query = db.session.query(ExistingTable.id).join(temp_table, temp_table.c.id == ExistingTable.id) 
out_df = pd.read_sql(query.statement, engine) 
temp_table.drop(engine) 
return out_df.to_dict('records') 

這不返回任何結果,因爲插入語句to_sql確實沒有得到運行(我想這是因爲他們使用sp_prepexec運行,但我不完全肯定這一點)。

然後我試着寫出SQL語句(CREATE TABLE #temp_table...,INSERT INTO #temp_table...,SELECT [id] FROM...),然後運行pd.read_sql(query, engine)。我收到錯誤消息

此結果對象不返回行。它已自動關閉。

我想這是因爲聲明不僅僅是SELECT

我該如何解決這個問題(兩個解決方案都可以工作,儘管第一個方法會更好,因爲它避免了硬編碼的SQL)。清楚的是,我無法修改現有數據庫中的架構 - 這是一個供應商數據庫。

+0

「ExistingTable」中是否有記錄? –

+0

@AzatIbrakov是的。我確實將它改爲了左連接並添加了'temp_table.c.date'。我在'date'列中用'None'返回行。 –

+0

爲什麼你的'日期'列的類型是'DateTime'而不是'Date'? –

回答

4

如果在臨時表中插入的記錄的數量是小/中等,一種可能性是使用一個literal subqueryvalues CTE而不是創建臨時表。

# MODEL 
class ExistingTable(Base): 
    __tablename__ = 'existing_table' 
    id = sa.Column(sa.Integer, primary_key=True) 
    name = sa.Column(sa.String) 
    # ... 

假設也下列數據將被插入到temp表:

# This data retrieved from another database and used for filtering 
rows = [ 
    (1, 100, datetime.date(2017, 1, 1)), 
    (3, 300, datetime.date(2017, 3, 1)), 
    (5, 500, datetime.date(2017, 5, 1)), 
] 

創建CTE或包含該數據的子查詢:

stmts = [ 
    # @NOTE: optimization to reduce the size of the statement: 
    # make type cast only for first row, for other rows DB engine will infer 
    sa.select([ 
     sa.cast(sa.literal(i), sa.Integer).label("id"), 
     sa.cast(sa.literal(v), sa.Integer).label("value"), 
     sa.cast(sa.literal(d), sa.DateTime).label("date"), 
    ]) if idx == 0 else 
    sa.select([sa.literal(i), sa.literal(v), sa.literal(d)]) # no type cast 

    for idx, (i, v, d) in enumerate(rows) 
] 
subquery = sa.union_all(*stmts) 

# Choose one option below. 
# I personally prefer B because one could reuse the CTE multiple times in the same query 
# subquery = subquery.alias("temp_table") # option A 
subquery = subquery.cte(name="temp_table") # option B 

創建與最後的查詢需要連接和過濾器:

query = (
    session 
    .query(ExistingTable.id) 
    .join(subquery, subquery.c.id == ExistingTable.id) 
    # .filter(subquery.c.date >= XXX_DATE) 
) 

# TEMP: Test result output 
for res in query: 
    print(res)  

最後,得到熊貓數據幀:

out_df = pd.read_sql(query.statement, engine) 
result = out_df.to_dict('records') 
+0

哈,我只是*想着今天早上在我的通勤路上這麼做。我會給它一個鏡頭,讓你知道。 –

+0

是的,這是行得通的。感謝您的好評。 –

0

你可以嘗試用另一種解決方案 - 工藝鍵控表

一個過程密鑰表只是充當 臨時表永久表。爲了允許進程同時使用該表, 表具有額外的列以標識進程。 這樣做的最簡單方法是全局變量@@ spid(@@ spid是SQL 服務器中的進程ID)。

...爲進程密鑰

一種替代方法是使用GUID(數據類型 唯一標識符)。

http://www.sommarskog.se/share_data.html#prockeyed

+0

您是否建議在tempdb中創建此表?我想無論哪種方式,它會遇到我在我的問題的第二部分遇到的同樣的問題,其中'read_sql'不返回任何行。 –

+0

這個表應該在你的db('MY_DATABASE')中創建,而不是臨時數據庫。這不是一件好事,但它應該起作用。 –

+0

我無權在該數據庫中創建表。這是一個供應商數據庫。 –