5

增量S3文件,我做了如下管道: 任務管理器 - > SQS - >刮板工人(我的應用程序) - > AWS流水 - > S3文件 - >星火 - >紅移(?)。如何處理在星火

有些事情我試圖解決/改進,我會很樂意指導:

  1. 刮板可能得到複製數據,並再次刷新他們流水,這將導致火花的DUP。我應該在開始計算之前使用Distinct函數在火花中解決這個問題嗎?
  2. 我並沒有刪除S3處理過的文件,所以數據越來越大。這是一個很好的做法嗎? (以s3作爲輸入數據庫)或者我應該處理每個文件並在spark完成後刪除它?目前我正在做sc.textFile("s3n://...../*/*/*") - 這將收集我所有的桶文件並運行計算。
  3. 要將結果放入Redshift(或s3) - >我該如何增量執行此操作?也就是說,如果s3變得越來越大,那麼紅移會有重複的數據......我以前總是沖洗它嗎?怎麼樣?
+0

你可以有你的水桶要處理的元素,一旦他們已推,將它們移動到另一個桶,所以你保留一份副本如果需要的話,但你不會處理它們第二次 –

回答

0

我曾經在一個流水線前,雖然沒有遇到這些問題。這是我做的。

  1. 刪除重複

    一個。我用BloomFilter刪除本地重複。請注意,文檔相對不完整,但您可以輕鬆地保存/加載/合併/交叉布隆過濾器對象。你甚至可以在過濾器上做reduce

    b。如果直接將數據從Spark保存到RedShift,則可能需要花費一些時間和精力爲當前批次更新BloomFilter,進行廣播,然後過濾以確保全局不存在重複。在我使用RDS中的UNIQUE約束之前,忽略錯誤,但不幸的是RedShift does not honour the constraint

  2. 和3.數據挺大

我使用EMR集羣運行s3-dist-cp command移動&合併數據(因爲通常有很多小的日誌文件,其影響斯巴克的性能)。如果您碰巧使用EMR託管您的Spark羣集,只需在分析之前添加一個步驟即可將數據從一個存儲桶移至另一個存儲桶。步驟採取command-runner.jar作爲自定義罐子,命令看起來像

s3-dist-cp --src=s3://INPUT_BUCKET/ --dest=s3://OUTPUT_BUCKET_AND_PATH/ --groupBy=".*\.2016-08-(..)T.*" --srcPattern=".*\.2016-08.*" --appendToLastFile --deleteOnSuccess 

而且,原distcp不支持合併的文件。

通常,應該儘量避免在同一個桶(或至少,路徑)一起具有加工和未加工的數據。