我想用Flink-HBase插件讀出數據,然後將其作爲Flink機器學習算法(分別爲SVM和MLR)的輸入。現在我首先將提取的數據寫入臨時文件,然後通過libSVM方法讀取它,但我想應該有更復雜的方法。用於機器學習算法的Flink HBase輸入
你有代碼片段或想法如何做?
我想用Flink-HBase插件讀出數據,然後將其作爲Flink機器學習算法(分別爲SVM和MLR)的輸入。現在我首先將提取的數據寫入臨時文件,然後通過libSVM方法讀取它,但我想應該有更復雜的方法。用於機器學習算法的Flink HBase輸入
你有代碼片段或想法如何做?
無需將數據寫入磁盤,然後使用MLUtils.readLibSVM
進行讀取。原因如下。
MLUtils.readLibSVM
需要一個文本文件,其中每一行都是稀疏特徵矢量及其相關標籤。它使用以下格式表示標籤特徵向量對:
<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>
凡<feature>
是後續value
的在特徵向量的索引。 MLUtils.readLibSVM
可以讀取具有此格式的文件並轉換LabeledVector
實例中的每一行。因此,您在讀取libSVM文件後獲得DataSet[LabeledVector]
。這正是SVM
和MultipleLinearRegression
預測器所需的輸入格式。
但是,根據您從HBase獲得的數據格式,首先必須將數據轉換爲libSVM
格式。否則,MLUtils.readLibSVM
將無法讀取寫入的文件。如果您轉換數據,那麼您還可以直接將數據轉換爲DataSet[LabeledVector]
,並將其用作Flink ML算法的輸入。這可以避免不必要的磁盤循環。
如果從HBase的一個DataSet[String]
其中每個字符串具有libSVM
格式(參見上面的說明書中)獲得,然後可以在HBase的DataSet
施加map
操作用下面的映射函數。
val hbaseInput: DataSet[String] = ...
val labelCOODS = hbaseInput.flatMap {
line =>
// remove all comments which start with a '#'
val commentFreeLine = line.takeWhile(_ != '#').trim
if(commentFreeLine.nonEmpty) {
val splits = commentFreeLine.split(' ')
val label = splits.head.toDouble
val sparseFeatures = splits.tail
val coos = sparseFeatures.map {
str =>
val pair = str.split(':')
require(
pair.length == 2,
"Each feature entry has to have the form <feature>:<value>")
// libSVM index is 1-based, but we expect it to be 0-based
val index = pair(0).toInt - 1
val value = pair(1).toDouble
(index, value)
}
Some((label, coos))
} else {
None
}
// Calculate maximum dimension of vectors
val dimensionDS = labelCOODS.map {
labelCOO =>
labelCOO._2.map(_._1 + 1).max
}.reduce(scala.math.max(_, _))
val labeledVectors: DataSet[LabeledVector] =
labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] {
var dimension = 0
override def open(configuration: Configuration): Unit = {
dimension = getRuntimeContext.getBroadcastVariable(DIMENSION).get(0)
}
override def map(value: (Double, Array[(Int, Double)])): LabeledVector = {
new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2))
}
}}.withBroadcastSet(dimensionDS, DIMENSION)
這會將您的libSVM格式數據轉換爲LabeledVectors
的數據集。
謝謝!你的回答非常有幫助! 不幸的是,HBase中的數據集必須在Java類中獲得,現在我得到的錯誤是我的DataSet與Scala類中的方法不兼容: 錯誤:(102,29)java:incompatible types: 'org.apache.flink.api.java.DataSet
你應該也可以使用Scala API從HBase讀取。然後你獲得一個'org.apache.flink.api.scala.Dataset [String]'。 –
Flink是比較新的項目。我想,你可能會在flink郵件列表上得到更好的幫助。 –