1

我正在嘗試使用Spark流接收來自Kafka的消息,將它們轉換爲Put並插入到HBase中。 我創建了一個inputDstream來接收來自Kafka的消息,然後創建一個JobConf並最終使用saveAsHadoopDataset(JobConf)將記錄保存到HBase中。通過Spark將Kafka消息保存到HBase中。會話永不關閉

每次插入HBase的記錄都會建立一個從Hbase到zookeeper的會話,但從不關閉。如果連接數量增加超過zookeeper的最大客戶端連接數,則會導致Spark流崩潰。

我的代碼如下所示:

import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.StreamingContext 
import org.apache.hadoop.hbase.client.Put 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable 
import org.apache.hadoop.hbase.mapred.TableOutputFormat 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.hadoop.mapred.JobConf 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.kafka._ 
import kafka.serializer.StringDecoder 

object ReceiveKafkaAsDstream { 

    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream") 
    val ssc = new StreamingContext(sparkConf, Seconds(1)) 

    val topics = "test" 
    val brokers = "10.0.2.15:6667" 

    val topicSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 

    val tableName = "KafkaTable" 
    val conf = HBaseConfiguration.create() 
    conf.set("zookeeper.znode.parent", "/hbase-unsecure") 
    conf.set("hbase.zookeeper.property.clientPort", "2181") 
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) 

    val jobConfig: JobConf = new JobConf(conf, this.getClass) 
    jobConfig.set("mapreduce.output.fileoutputformat", "/user/root/out") 
    jobConfig.setOutputFormat(classOf[TableOutputFormat]) 
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName) 

     val records = messages 
     .map(_._2) 
     .map(SampleKafkaRecord.parseToSampleRecord) 
     records.print() 
     records.foreachRDD{ stream => stream.map(SampleKafkaRecord.SampleToHbasePut).saveAsHadoopDataset(jobConfig) } 

    ssc.start() 
    ssc.awaitTermination() 
    } 

    case class SampleKafkaRecord(id: String, name: String) 
    object SampleKafkaRecord extends Serializable { 
    def parseToSampleRecord(line: String): SampleKafkaRecord = { 
     val values = line.split(";") 
     SampleKafkaRecord(values(0), values(1)) 
    } 

    def SampleToHbasePut(CSVData: SampleKafkaRecord): (ImmutableBytesWritable, Put) = { 
     val rowKey = CSVData.id 
     val putOnce = new Put(rowKey.getBytes) 

     putOnce.addColumn("cf1".getBytes, "column-Name".getBytes, CSVData.name.getBytes) 
     return (new ImmutableBytesWritable(rowKey.getBytes), putOnce) 
    } 
    } 
} 

我設置SSC(SparkStreamingContext)的持續時間爲1s和設置maxClientCnxns如動物園管理員CONF文件zoo.cfg 10,所以有從一個允許至多10個連接客戶端到動物園管理員。

10秒鐘後(10次從HBase的設立動物園管理員),我得到了如下圖所示的錯誤:

16/08/24 14:59:30 WARN RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase-unsecure/hbaseid 
16/08/24 14:59:31 INFO ClientCnxn: Opening socket connection to server localhost.localdomain/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 
16/08/24 14:59:31 INFO ClientCnxn: Socket connection established to localhost.localdomain/127.0.0.1:2181, initiating session 
16/08/24 14:59:31 WARN ClientCnxn: Session 0x0 for server localhost.localdomain/127.0.0.1:2181, unexpected error, closing socket connection and attempting reconnect 
java.io.IOException: Connection reset by peer 
     at sun.nio.ch.FileDispatcherImpl.read0(Native Method) 
     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 
     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
     at sun.nio.ch.IOUtil.read(IOUtil.java:192) 
     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384) 
     at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68) 
     at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366) 
     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125) 

在我的理解,存在這種錯誤,因爲連接數超過飼養員的最大連接。如果我將maxClientCnxn設置爲20,流處理能夠持續20秒。我知道我可以將maxClientCnxn設置爲無限制,但我真的不認爲這是解決此問題的好方法。

另一件事是,如果我使用TextFileStream獲取文本文件作爲DStream並使用saveAsHadoopDataset(jobConf)將它們保存到hbase中,它運行得非常好。如果我只是使用val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)從kafka讀取數據並且只是將信息打印出來,那麼也沒有問題。當我收到kafka消息,然後將它們保存到應用程序中的HBase中時就會出現問題。

我的環境是HDP 2.4沙箱。版本spark:1.6,hbase:1.1.2,kafka:2.10.0,zookeeper:3.4.6。

任何幫助表示讚賞。

回答

2

那麼,最後我得到它的工作。

  1. 屬性設置:

有一個名爲 「zookeeper.connection.timeout.ms」 屬性。該屬性應該設置爲1。

  • 更改爲新的API:
  • 更改方法saveAsHadoopDataset(JobConf)saveAsNewAPIHadoopDataset(JobConf)。我仍然不知道爲什麼舊的API不工作。

    變化import org.apache.hadoop.hbase.mapred.TableOutputFormatimport org.apache.hadoop.hbase.mapreduce.TableOutputFormat

    +0

    的極端很有幫助。謝謝。 –