我有一個相對簡單的任務,首先在1.2 mio文件上運行,併爲每個文件(每個都保存中間產品的多個步驟)都有一個管道。我已經在luigi中實現了這個功能:https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f。我喜歡Luigi使用文件系統來查看任務是否完成。 我還發現了一個可以刪除中間產品的實現,管道將重新創建所有相關產品(這樣我就可以更改管道)。 我該如何在氣流中做到這一點(或者我應該堅持Luigi?)?從luigi切換到氣流
-1
A
回答
1
我真的不知道路易吉是如何工作的。我主要使用Apache Airflow。 Airflow是一個工作流程管理系統。這意味着它不會傳輸數據,轉換數據或生成一些數據(雖然它會生成日誌,並且有一個名爲Xcom
的概念,允許在任務之間交換消息,從而允許更多細微的控制形式和共享狀態。 Apache Nifi。但是它定義了使用Operators
實例化每個任務的依賴關係,例如。 BashOperator
。爲了知道任務是否完成,它會檢查同一任務返回的信號。
以下是您想要在Airflow中實施的示例。
要在氣流使用from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import glob
import gzip
import shutil
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_dag', default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60))
def extract_gzs():
for filename in glob.glob('/1002/*.gz')
with gzip.open(filename, 'rb') as f_in, open(filename[:-3], 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
extractGZ = PythonOperator(
task_id='extract_gz',
provide_context=True,
python_callable=extract_gzs(),
dag=dag)
cmd_cmd="""
your sed script!
"""
sed_script = BashOperator(
task_id='sed_script',
bash_command=cmd_cmd,
dag=dag)
extractGZ.set_downstream(sed_script)
- 進口經營者(當然,如果你需要其他類/庫)
- 定義你的達格。這裏在變量
args
中我定義了owner
和start_date
參數。 - 然後實例化您的DAG。在這裏,我把它命名爲example_dag,歸功於它的定義變量,schedule_interval和之後的時間應該是超時(有更多的根據自己的需要使用參數)
- 創建一個Python函數extract_gzs()
- 實例化一個
PythonOperator
哪裏我打電話給我的蟒蛇FUNC - 做同樣與bash的代碼
- 確定兩個任務之間的依賴關係intances
當然有更多的實施同樣的想法的方式。根據需要來適應! PS:Here有一些Apache Airflow的例子
相關問題
- 1. FFmpeg:Remux f4v從流切換到mp4
- 2. luigi -pip install luigi
- 3. NVD3切換流
- 4. 將流體從一個氣缸轉移到另一個氣缸
- 5. 切換視頻流
- 6. 氣流
- 7. 氣流:dag_id找不到
- 8. 切換到新的工作流程
- 9. 自定義氣球 - 切換按鈕
- 10. 在工作流設計器中從VB切換到C#
- 11. 春季Websocket與SockJs從XHR流切換到Websocket
- 12. 從Hg切換到Git的Subrepo工作流問題
- 13. grafana從http切換到https
- 14. 從InstallShield切換到WiX
- 15. 從JWPlayer切換到Flowplayer
- 16. 從ActiveAndroid切換到GreenDao
- 17. 從PHP切換到Objective-C
- 18. 從3 colums切換到2
- 19. NSURLSessionDownloadTask從http切換到https
- 20. 從UIViewController切換到UITabBarController
- 21. 從bottle.template切換到mako
- 22. 從md5()切換到crypt()
- 23. 從IF/ELSE切換到Javascript
- 24. 從Android Activity切換到MapActivity
- 25. 從Nashorn切換到Rhino(Gradle)
- 26. 從maven切換到sbt
- 27. 從ListView切換到RecyclerView
- 28. 從$資源切換到Restarular
- 29. Android:從XML切換到SQLite
- 30. 從Eclipse切換到Netbeans
也許,我誤解了整個管道的事情。我假設你建立了一條管道,然後你給它一個數據集,然後沿着這條管道進行變換。這意味着一個管道工作在1.2 mio文件上。這不是正確的思考方式嗎?做一個在文件上運行sed的氣流管道,然後將其應用到1.2 mio文件應該是微不足道的,不是嗎? –
@WolfgangKerzendorf查看我的修改答案。 – sdikby
謝謝,我會試試看。這仍然不完全如我所想的那樣。我想爲一個文件構建一個管道,然後以某種方式通過這個東西來推動每個文件。 –