
I have data arriving from Kafka through a DStream. I want to perform feature extraction on it in order to obtain some keywords. How can I do feature extraction on a DStream in Apache Spark?

I do not want to wait for all of the data to arrive (it is meant to be a continuous stream that potentially never ends), so I would like to perform the extraction in chunks, even if the accuracy suffers a bit.

So far I have put together something like this:

import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

def extractKeywords(stream: DStream[Data]): Unit = { 

    val spark: SparkSession = SparkSession.builder.getOrCreate 

    val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData 

    val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _ 

    val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData 

    streamWithFeatures.print() 
} 

def extractFeatures(spark: SparkSession) 
        (rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = { 

    val df = spark.createDataFrame(rdd).toDF("data", "words") 

    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures) 
    val rawFeatures = hashingTF.transform(df) 

    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features") 
    val idfModel = idf.fit(rawFeatures) 

    val rescaledData = idfModel.transform(rawFeatures) 

    import spark.implicits._ 
    rescaledData.select("data", "features").as[(Data, Array[String])].rdd 
} 

However, I am getting java.lang.IllegalStateException: Haven't seen any document yet. I am not surprised, since I just tried to throw something together, and I understand that, because I am not waiting for any data to arrive first, the generated model may be empty when I try to use it on the data.
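For context, the exception is thrown because the IDF model is fitted on a micro-batch that contains no documents yet. A minimal, hedged sketch of a guard, reusing extractFeatures and Data from the snippet above; it only avoids the exception and still re-fits the model on every non-empty batch:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Skip empty micro-batches so that IDF.fit never runs before any document has been seen.
def extractFeaturesGuarded(spark: SparkSession)
        (rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {
    if (rdd.isEmpty()) {
        // Nothing arrived in this interval: return an empty result of the expected type.
        spark.sparkContext.emptyRDD[(Data, Array[String])]
    } else {
        extractFeatures(spark)(rdd)
    }
}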

What is the correct approach to this problem?

Answer


I used the suggestion from the comments and split the process into two runs:

  • one that computes the IDF model and saves it to a file

    import java.io.File
    import org.apache.spark.ml.feature.{HashingTF, IDF}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    
    def trainFeatures(idfModelFile: File, rdd: RDD[(String, Seq[String])]) = { 
        val session: SparkSession = SparkSession.builder.getOrCreate 
    
        val wordsDf = session.createDataFrame(rdd).toDF("data", "words") 
    
        val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures") 
        val featurizedDf = hashingTF.transform(wordsDf) 
    
        val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features") 
        val idfModel = idf.fit(featurizedDf) 
    
        idfModel.write.save(idfModelFile.getAbsolutePath) 
    } 
    
  • one that reads the IDF model from the file and simply runs it on all incoming data (see the sketch after this list for how it can be wired back into the DStream)

    val idfModel = IDFModel.load(idfModelFile.getAbsolutePath) 
    
    val documentDf = spark.createDataFrame(rdd).toDF("update", "document") 
    
    val tokenizer = new Tokenizer().setInputCol("document").setOutputCol("words") 
    val wordsDf = tokenizer.transform(documentDf) 
    
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures") 
    val featurizedDf = hashingTF.transform(wordsDf) 
    
    val extractor = idfModel.setInputCol("rawFeatures").setOutputCol("features") 
    val featuresDf = extractor.transform(featurizedDf) 
    
    featuresDf.select("update", "features") 
    