2017-03-02

The flow for writing from Spark to HBase in my Spark program is as follows:

Driver -> create the HBase connection -> broadcast the HBase handle. Then, on the executors, we take this handle and try to write to HBase.

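In the driver, I create the HBase Configuration object and the Connection object, and then broadcast the connection through the JavaSparkContext as follows: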

    SparkConf sparkConf = JobConfigHelper.getSparkConfig();

    Configuration conf = new Configuration();
    UserGroupInformation.setConfiguration(conf);

    jsc = new JavaStreamingContext(sparkConf,
            Durations.milliseconds(Long.parseLong(batchDuration)));

    Configuration hconf = HBaseConfiguration.create();
    hconf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
    hconf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
    UserGroupInformation.setConfiguration(hconf);

    JavaSparkContext js = jsc.sparkContext();
    Connection connection = ConnectionFactory.createConnection(hconf);
    // the stack trace below is thrown from this JavaSparkContext.broadcast call
    connectionbroadcast = js.broadcast(connection);

Inside the call() method that runs on the executors:

    Table table = connectionbroadcast.getValue().getTable(TableName.valueOf("gfttsdgn:FRESHHBaseRushi"));

    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("c1"), Bytes.toBytes("output"), Bytes.toBytes("rohan"));
    table.put(p);

I get the following exception when trying to run in yarn-client mode:

17/03/02 09:19:38 ERROR yarn.ApplicationMaster: User class threw exception: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException 
    Serialization trace: 
    classes (sun.misc.Launcher$AppClassLoader) 
    classLoader (org.apache.hadoop.conf.Configuration) 
    conf (org.apache.hadoop.hbase.client.RpcRetryingCallerFactory) 
    rpcCallerFactory (org.apache.hadoop.hbase.client.AsyncProcess) 
    asyncProcess (org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation) 
    com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException 
    Serialization trace: 
    classes (sun.misc.Launcher$AppClassLoader) 
    classLoader (org.apache.hadoop.conf.Configuration) 
    conf (org.apache.hadoop.hbase.client.RpcRetryingCallerFactory) 
    rpcCallerFactory (org.apache.hadoop.hbase.client.AsyncProcess) 
    asyncProcess (org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation) 
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) 
     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) 
     at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194) 
     at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203) 
     at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102) 
     at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85) 
     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) 
     at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63) 
     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1337) 
     at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:639) 
     at com.citi.fresh.core.driver.FreshDriver.main(FreshDriver.java:178) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542) 
    Caused by: java.util.ConcurrentModificationException 
     at java.util.Vector$Itr.checkForComodification(Vector.java:1156) 
     at java.util.Vector$Itr.next(Vector.java:1133) 
     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67) 
     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) 
     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
     ... 28 more 
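The "Caused by" section points at java.util.Vector$Itr.checkForComodification. Reading the serialization trace bottom-up, Kryo followed the broadcast Connection into asyncProcess, then rpcCallerFactory, then its conf (a Hadoop Configuration), then that Configuration's classLoader, and finally began iterating the class loader's classes field, which is a java.util.Vector. The vector changed while Kryo was walking it. The trace does not say what modified it; a class being loaded mid-serialization is one plausible cause. A minimal JDK-only sketch of the same fail-fast behaviour (VectorComodificationDemo and iterateWhileGrowing are made-up names for illustration):

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;

public class VectorComodificationDemo {

    /**
     * Iterates a Vector (the same collection type as the class loader's "classes"
     * field in the trace) and appends to it mid-iteration, roughly what happens when
     * a class gets loaded while a serializer is walking that list.
     * Returns true if the fail-fast iterator threw ConcurrentModificationException.
     */
    static boolean iterateWhileGrowing() {
        List<String> loadedClasses = new Vector<>();
        loadedClasses.add("A");
        loadedClasses.add("B");
        try {
            for (Iterator<String> it = loadedClasses.iterator(); it.hasNext(); ) {
                String name = it.next();
                if (name.equals("A")) {
                    // structural modification while the iterator is still in use
                    loadedClasses.add("C");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // the next call to it.next() detected the changed modCount
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileGrowing()); // prints "true"
    }
}
```

The point for this question is that broadcasting a live Connection drags its whole object graph, down to the JVM's class loader, into Kryo, which is one reason shipping a driver-side Connection to the executors is fragile.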

Answer


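I can see you are trying to bulk-put data into HBase with Spark. As @jojo_Berlin explained, your HBase Conf is not thread-safe (in the serialization trace, Kryo fails while walking from the Connection into its Configuration and class loader). However, you can do this easily with SparkOnHbase: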

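    Configuration conf = HBaseConfiguration.create();
    conf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
    conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
    // JavaHBaseContext takes a JavaSparkContext; in the question, jsc is a JavaStreamingContext
    JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc.sparkContext(), conf);
    // hbase-spark's Java bulkPut takes (JavaRDD, TableName, Function); the older Cloudera
    // SparkOnHBase version takes a String table name plus an autoFlush flag instead
    hbaseContext.bulkPut(rdd, TableName.valueOf("gfttsdgn:FRESHHBaseRushi"), new PutFunction());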

where your PutFunction is:

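    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.spark.api.java.function.Function; // Spark's Function, not java.util.function

    // Turns each RDD element into a Put whose row key is the element itself
    public static class PutFunction implements Function<String, Put> {
        public Put call(String v) throws Exception {
            Put put = new Put(Bytes.toBytes(v));
            // family "c1", qualifier "output", value "rohan" (Put.addColumn in HBase 1.0+)
            put.add(Bytes.toBytes("c1"), Bytes.toBytes("output"), Bytes.toBytes("rohan"));
            return put;
        }
    }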
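For context on why a helper like JavaHBaseContext avoids the error: as far as I know, it ships only the (wrapped) Configuration to the executors and opens connections there, instead of serializing a Connection built on the driver. If you would rather not add that dependency, the same idea can be done by hand with a per-JVM lazy holder. The sketch below is generic Java, not hbase-spark's code; ExecutorConnectionHolder and FakeConnection are hypothetical names, and in real code get() would call ConnectionFactory.createConnection(conf) with a Configuration built on the executor.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Per-JVM lazy holder: instead of broadcasting a live connection from the driver,
 * each executor JVM creates its own the first time a task asks for it.
 */
public class ExecutorConnectionHolder {

    /** Hypothetical stand-in for org.apache.hadoop.hbase.client.Connection. */
    static final class FakeConnection implements AutoCloseable {
        static final AtomicInteger CREATED = new AtomicInteger();

        FakeConnection() {
            CREATED.incrementAndGet(); // counts how many connections this JVM opened
        }

        @Override
        public void close() {
            // a real connection would release sockets and threads here
        }
    }

    // volatile so the double-checked locking below publishes the instance safely
    private static volatile FakeConnection instance;

    /** Returns this JVM's connection, creating it on first use (thread-safe). */
    static FakeConnection get() {
        FakeConnection c = instance;
        if (c == null) {
            synchronized (ExecutorConnectionHolder.class) {
                c = instance;
                if (c == null) {
                    c = new FakeConnection();
                    instance = c;
                }
            }
        }
        return c;
    }

    public static void main(String[] args) {
        // Several "tasks" running in the same JVM share one connection.
        FakeConnection a = get();
        FakeConnection b = get();
        System.out.println((a == b) + " " + FakeConnection.CREATED.get()); // prints "true 1"
    }
}
```

In the question's code, the executor side would then call the holder (backed by a real Connection) inside call() or foreachPartition instead of connectionbroadcast.getValue(), and the driver would no longer broadcast the connection at all.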