2017-10-04 29 views
2

我正在使用airflow,我希望能夠跟蹤表airflow.file_list中由給定任務實例生成的所有文件,這是與airflow(在postgres上運行)使用的相同數據庫的一部分, 。使用SQLAlchemy的我有我的file_list表下面的映射:SQLAlchemy與空氣流中的task_instance表的關係

from airflow.models import Base 

class MySourceFile(Base): 
    """ SQLAlchemy mapper class for the file_list table entries.""" 
    __table__ = Table('file_list', Base.metadata, 
     Column('UID', Integer, primary_key=True), 
     Column('task_id', String(_ID_LEN), nullable=False), 
     Column('dag_id', String(_ID_LEN), nullable=False), 
     Column('execution_date', DateTime, nullable=False), 
     Column('file_path', String(_ID_LEN), nullable=False), 
     Column('file_sha256', String(_ID_LEN), nullable=False), 
     ForeignKeyConstraint(
      ['task_id', 'dag_id', 'execution_date'], 
      ['task_instance.task_id', 'task_instance.dag_id', 'task_instance.execution_date'] 
     ), 
     extend_existing=True, 
    ) 

    instance_task = relationship(
    TaskInstance, 
    primaryjoin=and_(
     TaskInstance.task_id == __table__.c.task_id, 
     TaskInstance.dag_id == __table__.c.dag_id, 
     TaskInstance.execution_date == __table__.c.execution_date 
    ), 
    viewonly=True, 
    foreign_keys=[__table__.c.task_id, __table__.c.dag_id, __table__.c.execution_date] 
) 

我從進口的airflow.modles聲明的基礎,因爲我讀過,這是必要的交互映射器共享基地的同一個實例。在上面的代碼片段中,我想要instance_task引用創建該文件的task_instance。表中的列task_id,dag_idexecution_date在我的表airflow.file_list鏡像主鍵在airflow.task_instance。不幸的是,當我運行氣流服務器時,出現以下錯誤:

sqlalchemy.exc.InvalidRequestError: One or more mappers failed to initialize - can't proceed with initialization of other mappers. Triggering mapper: 'Mapper|MySourceFile|file_list'. Original exception was: Can't determine relationship direction for relationship 'MySourceFile.instance_task' - foreign key columns are present in neither the parent nor the child's mapped tables

如果可能,我不希望修改氣流源。預先感謝您的任何幫助。

+1

不是你正在尋找的答案,但安裝氣流和使用你的模型都很好。我會確保正在使用的表格確實具有fk約束。您正在使用'extend_existing',因此即使該定義之前的元數據中存在該表並且沒有fk,但應該進行雙重檢查,也應該覆蓋該範圍。 –

回答

0

首先指定__table__就像那是奇怪的(我從來沒有見過它),並且作爲一般規則,任何以下劃線開頭和下劃線都是私有的,應該避免。

您需要指定task_id是外鍵。類似這樣的:

task_id = Column(String, ForeignKey('task.id')) 

如果你看起來不像他們那麼docs on SQL Alchemy relationships很有用。並且backref模式可能有助於創建從任務實例到您的自定義文件的關係,而無需更改TaskInstance模型。

+2

以我已經完成的方式指定列/表格被稱爲[混合方法](http://docs.sqlalchemy.org/en/latest/orm/extensions/declarative/table_config.html#using-a-混合式方法與表)在文檔中。我以前使用過你建議的聲明式方法,但沒有成功。 – AdvancedGarde89

+0

啊對了。不過,您應該能夠在此模式下將'ForeignKey(task_instance.id)'傳遞給Column()調用。 –

+0

哦。我完全錯過了它是一個複合外鍵。 –