2
我有通過DStream從Kafka到達的數據。我想要執行特徵提取以獲得一些關鍵字。如何在Apache Spark中使用DStream進行特徵提取
我不想等待所有數據的到來(因爲它打算是連續的流,可能永遠不會結束),所以我希望能夠以大塊的方式執行提取 - 如果精度將會忍受一點。
到目前爲止,我放在一起類似的東西:
def extractKeywords(stream: DStream[Data]): Unit = {
val spark: SparkSession = SparkSession.builder.getOrCreate
val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData
val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _
val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData
streamWithFeatures.print()
}
def extractFeatures(spark: SparkSession)
(rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {
val df = spark.createDataFrame(rdd).toDF("data", "words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures)
val rawFeatures = hashingTF.transform(df)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(rawFeatures)
val rescaledData = idfModel.transform(rawFeature)
import spark.implicits._
rescaledData.select("data", "features").as[(Data, Array[String])].rdd
}
不過,我收到java.lang.IllegalStateException: Haven't seen any document yet.
- 我並不感到驚訝,因爲我只是嘗試了放棄的東西放在一起,我的理解是,因爲我不等待一些數據的到來,當我嘗試在數據上使用它時,生成的模型可能是空的。
這個問題的正確方法是什麼?