1

我目前嘗試創建某種監控解決方案 - 將一些數據寫入kafka,並使用Spark Streaming讀取此數據並對其進行處理。Spark Streaming - 基於過濾器分離輸入流的最佳方法參數

爲了預處理機器學習和異常檢測的數據,我想根據一些過濾器參數來分割流。到目前爲止,我已經瞭解到DStream本身不能分成幾個流。

我主要面臨的問題是,很多算法(如KMeans)只能獲取連續數據而不是像離散數據那樣的離散數據。網址或其他字符串。

我的要求,將理想:從卡夫卡

  • 讀取數據並生成基於什麼我讀
  • 基於字符串的名單上多個流的字符串列表 - (分流,過濾器流,或什麼是最好的做法)
  • 使用這些流來訓練不同的模型爲每個數據流,以獲得基準和比較之後,後來談到對基線一切

我會很高興得到任何建議如何解決我的問題。我無法想象這種情況不在Spark中涵蓋 - 但直到現在,我還沒有發現一個可行的解決方案。

回答

1

我認爲這應該是足以從原來的創建衍生DStreams,使用過濾器和地圖:

val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e)) 
val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e)) 

注意,這些filtermap步驟可以在一個collect步驟結合(不要與混淆無參數RDD.collect這需要數據來驅動!)

val featuresStream = originalDStream.transform(rdd => 
    rdd.collect{case (a,b,c,d,e) if c=="client" => Vectors.parse(a)} 
) 
streamingKMeans.trainOn(featuresStream) 

我們也可以有一組動態保持到一些收集過濾DStreams的。在這裏,我們使用包含我們用於篩選密鑰的地圖:

originalDStream.cache() // important for performance when a DStream is branched out. 
// filterKeys: Set[String] 
val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => (getKey(e)==key))) 
// do something with the different DStreams in this structure ... 

這些片段是代碼示例將與實際的邏輯完成。

+0

我該如何去做這個動態列表的字符串 - 例如一個String []?流可以像數組一樣存儲在某種結構中,或者甚至可以更好地映射到一個地圖中?因爲我必須稍後通過某個密鑰或其他密鑰訪問它們。 – LST

+0

當我按照你的指示創建集合,然後嘗試訪問foreach循環中的集合,並在集合中的Streams上調用'forEachRDD',我得到以下異常:java.io.NotSerializableException:org.apache的對象.spark.streaming.kafka010.DirectKafkaInputDStream正在被序列化,可能是RDD操作關閉的一部分。這是因爲DStream對象正在從閉包中引用。請重寫此DStream內的RDD操作以避免這種情況。這已被強制執行,以避免吞噬不必要的對象的Spark任務'任何想法? – LST

+0

這可能是簡單或複雜的解決:-)。你可以用你正在使用的代碼創建另一個問題嗎? – maasg