我正在使用airflow,我希望能夠跟蹤表airflow.file_list
中由給定任務實例生成的所有文件,這是與airflow(在postgres上運行)使用的相同數據庫的一部分, 。使用SQLAlchemy的我有我的file_list
表下面的映射:SQLAlchemy與空氣流中的task_instance表的關係
from airflow.models import Base
class MySourceFile(Base):
""" SQLAlchemy mapper class for the file_list table entries."""
__table__ = Table('file_list', Base.metadata,
Column('UID', Integer, primary_key=True),
Column('task_id', String(_ID_LEN), nullable=False),
Column('dag_id', String(_ID_LEN), nullable=False),
Column('execution_date', DateTime, nullable=False),
Column('file_path', String(_ID_LEN), nullable=False),
Column('file_sha256', String(_ID_LEN), nullable=False),
ForeignKeyConstraint(
['task_id', 'dag_id', 'execution_date'],
['task_instance.task_id', 'task_instance.dag_id', 'task_instance.execution_date']
),
extend_existing=True,
)
instance_task = relationship(
TaskInstance,
primaryjoin=and_(
TaskInstance.task_id == __table__.c.task_id,
TaskInstance.dag_id == __table__.c.dag_id,
TaskInstance.execution_date == __table__.c.execution_date
),
viewonly=True,
foreign_keys=[__table__.c.task_id, __table__.c.dag_id, __table__.c.execution_date]
)
我從進口的airflow.modles
聲明的基礎,因爲我讀過,這是必要的交互映射器共享基地的同一個實例。在上面的代碼片段中,我想要instance_task
引用創建該文件的task_instance。表中的列task_id
,dag_id
和execution_date
在我的表airflow.file_list
鏡像主鍵在airflow.task_instance
。不幸的是,當我運行氣流服務器時,出現以下錯誤:
sqlalchemy.exc.InvalidRequestError: One or more mappers failed to initialize - can't proceed with initialization of other mappers. Triggering mapper: 'Mapper|MySourceFile|file_list'. Original exception was: Can't determine relationship direction for relationship 'MySourceFile.instance_task' - foreign key columns are present in neither the parent nor the child's mapped tables
如果可能,我不希望修改氣流源。預先感謝您的任何幫助。
不是你正在尋找的答案,但安裝氣流和使用你的模型都很好。我會確保正在使用的表格確實具有fk約束。您正在使用'extend_existing',因此即使該定義之前的元數據中存在該表並且沒有fk,但應該進行雙重檢查,也應該覆蓋該範圍。 –