2
根據火花DAG vizualization,在階段0中執行groupBy
後,在階段1中執行groupBy
。在我的代碼中只有一個groupBy
,並且不會期望任何其他轉換我正在做結果groupBy
。在火花DAG中的外部組
下面的代碼(clojure
/flambo
):
;; stage 0
(-> (.textFile sc path 8192)
(f/map (f/fn [msg] (json/parse-string msg true)))
(f/group-by (f/fn [msg] (:mmsi msg)) 8192)
;; stage 1
(f/map-values (f/fn [values] (sort-by :timestamp (vec values))))
(f/flat-map (ft/key-val-fn (f/fn [mmsi messages]
(let [state-map (atom {}) draught-map (atom {})]
(map #(mk-line % state-map draught-map) (vec messages))))))
(f/map (f/fn [line] (json/generate-string line)))
(f/save-as-text-file path)))
很清楚,我0期是怎樣的序列textFile
,map
,groupBy
和第1階段是map-values
,map-values
,flat-map
,map
,saveAsTextFile
,但第一階段的groupBy
從哪裏來?
由於groupBy
導致洗牌這是計算昂貴和費時的,我不想,如果它可以幫助外來之一。