
Flink serialization error when reading from HBase

I'm getting a serialization error when I read from HBase inside a RichFlatMapFunction called from within a map. What I want to do is: if an element of the data stream equals a particular string, read from HBase; otherwise ignore the element. Below is a sample program and the error I'm getting.

package com.abb.Flinktest 
import java.text.SimpleDateFormat 
import java.util.Properties 

import scala.collection.concurrent.TrieMap 
import org.apache.flink.addons.hbase.TableInputFormat 
import org.apache.flink.api.common.functions.RichFlatMapFunction 
import org.apache.flink.api.common.io.OutputFormat 
import org.apache.flink.api.java.tuple.Tuple2 
import org.apache.flink.streaming.api.scala.DataStream 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
import org.apache.flink.streaming.api.scala.createTypeInformation 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 
import org.apache.flink.util.Collector 
import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.hbase.TableName 
import org.apache.hadoop.hbase.client.ConnectionFactory 
import org.apache.hadoop.hbase.client.HTable 
import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.client.Result 
import org.apache.hadoop.hbase.client.Scan 
import org.apache.hadoop.hbase.filter.BinaryComparator 
import org.apache.hadoop.hbase.filter.CompareFilter 
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.log4j.Level 
import org.apache.flink.api.common.functions.RichMapFunction 

object Flinktesthbaseread { 

    def main(args:Array[String]) 
    { 
    val env = StreamExecutionEnvironment.createLocalEnvironment() 
    val kafkaStream = env.fromElements("hello") 
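    // This line captures the kafkaStream DataStream inside the map closure,
    // which triggers the "Task not serializable" error shown below: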
    val c=kafkaStream.map(x => if(x.equals("hello"))kafkaStream.flatMap(new ReadHbase()))  
    env.execute() 
    } 
     class ReadHbase extends RichFlatMapFunction[String,Tuple11[String,String,String,String,String,String,String,String,String,String,String]] with Serializable 
    { 
     var conf: org.apache.hadoop.conf.Configuration = null; 
    var table: org.apache.hadoop.hbase.client.HTable = null; 
    var hbaseconnection:org.apache.hadoop.hbase.client.Connection =null 
    var taskNumber: String = null; 
    var rowNumber = 0; 
    val serialVersionUID = 1L; 

    override def open(parameters: org.apache.flink.configuration.Configuration) { 
     println("getting table") 
     conf = HBaseConfiguration.create() 
     val in = getClass().getResourceAsStream("/hbase-site.xml") 

     conf.addResource(in) 
     hbaseconnection = ConnectionFactory.createConnection(conf) 
     table = new HTable(conf, "testtable"); 
    // this.taskNumber = String.valueOf(taskNumber); 
    } 

    override def flatMap(msg:String,out:Collector[Tuple11[String,String,String,String,String,String,String,String,String,String,String]]) 
     { 
       //flatmap operation here 
     } 

     override def close() { 

     table.flushCommits(); 
     table.close(); 
    } 

    } 
} 

Error:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$). 
log4j:WARN Please initialize the log4j system properly. 
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable 
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172) 
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164) 
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:617) 
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:959) 
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:484) 
    at com.abb.Flinktest.Flinktesthbaseread$.main(Flinktesthbaseread.scala:45) 
    at com.abb.Flinktest.Flinktesthbaseread.main(Flinktesthbaseread.scala) 
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream 
    - field (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", name: "kafkaStream$1", type: "class org.apache.flink.streaming.api.scala.DataStream") 
    - root object (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", <function1>) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301) 
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170) 
    ... 6 more 

I tried wrapping the method and the fields inside a class and making that class serializable as well, but no luck. Could somebody give me some pointers on this or suggest a workaround?

Answer


The problem is that you are trying to access the kafkaStream variable inside the map function, and a DataStream is not serializable. It is only an abstract representation of the data; it doesn't hold any contents itself, which makes capturing it inside your function invalid in the first place.

Instead, do something like this:

kafkaStream.filter(x => x.equals("hello")).flatMap(new ReadHbase()) 

The filter function will keep only the elements for which the condition is true, and those elements will then be passed on to your flatMap function.
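For completeness, a minimal sketch of what the main method from the question could look like with this fix applied (assuming the same ReadHbase class and local environment as in the original program; all names are taken from the question):

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.createLocalEnvironment()
  val kafkaStream = env.fromElements("hello")

  // filter keeps the matching elements; flatMap enriches them from HBase.
  // No DataStream is captured inside a closure, so the tasks serialize cleanly.
  kafkaStream
    .filter(x => x.equals("hello"))
    .flatMap(new ReadHbase())

  env.execute()
}

Note that the RichFlatMapFunction itself is still serialized and shipped to the workers, which is fine here because the HBase connection and table are created in open(), after deserialization, rather than captured from the enclosing scope.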

我強烈建議您閱讀基礎API概念文檔,因爲在指定轉換時似乎存在某些實際發生的錯誤理解。


Thank you, that worked. I'll read through the API docs before moving on to the next step. –
