2015-07-12 37 views
3

我想用Flink-HBase插件讀出數據,然後將其作爲Flink機器學習算法(分別爲SVM和MLR)的輸入。現在我首先將提取的數據寫入臨時文件,然後通過libSVM方法讀取它,但我想應該有更復雜的方法。用於機器學習算法的Flink HBase輸入

你有代碼片段或想法如何做?

+0

Flink是比較新的項目。我想,你可能會在flink郵件列表上得到更好的幫助。 –

回答

3

無需將數據寫入磁盤,然後使用MLUtils.readLibSVM進行讀取。原因如下。

MLUtils.readLibSVM需要一個文本文件,其中每一行都是稀疏特徵矢量及其相關標籤。它使用以下格式表示標籤特徵向量對:

<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info> 

<feature>是後續value的在特徵向量的索引。 MLUtils.readLibSVM可以讀取具有此格式的文件並轉換LabeledVector實例中的每一行。因此,您在讀取libSVM文件後獲得DataSet[LabeledVector]。這正是SVMMultipleLinearRegression預測器所需的輸入格式。

但是,根據您從HBase獲得的數據格式,首先必須將數據轉換爲libSVM格式。否則,MLUtils.readLibSVM將無法​​讀取寫入的文件。如果您轉換數據,那麼您還可以直接將數據轉換爲DataSet[LabeledVector],並將其用作Flink ML算法的輸入。這可以避免不必要的磁盤循環。

如果從HBase的一個DataSet[String]其中每個字符串具有libSVM格式(參見上面的說明書中)獲得,然後可以在HBase的DataSet施加map操作用下面的映射函數。

val hbaseInput: DataSet[String] = ... 
val labelCOODS = hbaseInput.flatMap { 
    line => 
    // remove all comments which start with a '#' 
    val commentFreeLine = line.takeWhile(_ != '#').trim 

    if(commentFreeLine.nonEmpty) { 
     val splits = commentFreeLine.split(' ') 
     val label = splits.head.toDouble 
     val sparseFeatures = splits.tail 
     val coos = sparseFeatures.map { 
     str => 
      val pair = str.split(':') 
      require(
      pair.length == 2, 
      "Each feature entry has to have the form <feature>:<value>") 

      // libSVM index is 1-based, but we expect it to be 0-based 
      val index = pair(0).toInt - 1 
      val value = pair(1).toDouble 

      (index, value) 
     } 

     Some((label, coos)) 
    } else { 
     None 
    } 

// Calculate maximum dimension of vectors 
val dimensionDS = labelCOODS.map { 
    labelCOO => 
    labelCOO._2.map(_._1 + 1).max 
}.reduce(scala.math.max(_, _)) 

val labeledVectors: DataSet[LabeledVector] = 
    labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] { 
    var dimension = 0 

    override def open(configuration: Configuration): Unit = { 
    dimension = getRuntimeContext.getBroadcastVariable(DIMENSION).get(0) 
    } 

    override def map(value: (Double, Array[(Int, Double)])): LabeledVector = { 
    new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2)) 
    } 
}}.withBroadcastSet(dimensionDS, DIMENSION) 

這會將您的libSVM格式數據轉換爲LabeledVectors的數據集。

+0

謝謝!你的回答非常有幫助! 不幸的是,HBase中的數據集必須在Java類中獲得,現在我得到的錯誤是我的DataSet與Scala類中的方法不兼容: 錯誤:(102,29)java:incompatible types: 'org.apache.flink.api.java.DataSet 不能轉換爲org.apache.flink.api.scala.DataSet '' – MsIcklerly

+0

你應該也可以使用Scala API從HBase讀取。然後你獲得一個'org.apache.flink.api.scala.Dataset [String]'。 –