2017-01-06 81 views
1

我有一個從Kafka消耗的流式作業(使用createDstream)。 它的「身份證」在Spark流式作業中調用實用程序(外部)

[id1,id2,id3 ..] 

流我有一個接受的id的數組,並做了一些外部呼叫並接收回一些信息說「T」每個ID

[id:t1,id2:t2,id3:t3...] 
的工具或API

我希望在調用實用程序保留Dstream時保留DStream。我不能在Dstream rdd上使用地圖轉換,因爲它會調用每個id,而且該實用程序正在接受id的集合。

Dstream.map(x=> myutility(x)) -- ruled out 

如果我用

Dstream.foreachrdd(rdd=> myutility(rdd.collect.toarray)) 

我失去了DStream。我需要保留DStream用於下游處理。

+0

重新設計'myutility',以便它可以正確地並行工作?在Spark中有單一的本地收藏是不行的。並行的 – user7337271

+0

@ user7337271由Dstream.foreachrdd(rdd => myutility(rdd.collect.toarray))實現,但是丟失了DStream –

+0

這裏沒有並行性。整個機構'foreachrdd(rdd => myutility(rdd.collect.toarray))'在驅動程序本地執行。你可以'轉換(rdd => sc.parallelize(myutility(rdd.collect.toarray)))',但它並不能解決這個問題。 – user7337271

回答

3

實現外部批量調用的方法是直接在分區級別轉換DStream中的RDD。

的圖案看起來像這樣:

val transformedStream = dstream.transform{rdd => 
    rdd.mapPartitions{iterator => 
     val externalService = Service.instance() // point to reserve local resources or make server connections. 
     val data = iterator.toList // to act in bulk. Need to tune partitioning to avoid huge data loads at this level 
     val resultCollection = externalService(data) 
     resultCollection.iterator 
    } 
} 

該方法過程中使用的簇中的可用資源並聯底層RDD的每個分區。請注意,需要爲每個分區(而不是每個元素)實例化與外部系統的連接。

+0

謝謝,得到了我一直在尋找的東西 –

相關問題