2016-07-07

HBase connection instance

I have the following code:

DStream.map {
  _.message()
}.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    val conf = HBaseUtils.configureHBase("iemployee")
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("""iemployee"""))
    iter.foreach { elem =>
      /* loop through the records in the partition and push them out to the DB */
    }
  }
}

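Can someone tell me whether the connection object created here by val connection = ConnectionFactory.createConnection(conf) is the same connection object reused for every partition (since I never close it), or whether a new connection object is created for each partition?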

Answer


A new connection instance is created for every partition.

See the code and documentation of ConnectionFactory below. The documentation also states that the caller is responsible for closing the connection.

/**
 * Create a new Connection instance using the passed <code>conf</code> instance. Connection
 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
 * created from returned connection share zookeeper connection, meta cache, and connections
 * to region servers and masters.
 * <br>
 * The caller is responsible for calling {@link Connection#close()} on the returned
 * connection instance.
 *
 * Typical usage:
 * <pre>
 * Connection connection = ConnectionFactory.createConnection(conf);
 * Table table = connection.getTable(TableName.valueOf("table1"));
 * try {
 *   table.get(...);
 *   ...
 * } finally {
 *   table.close();
 *   connection.close();
 * }
 * </pre>
 *
 * @param conf configuration
 * @param user the user the connection is for
 * @param pool the thread pool to use for batch operations
 * @return Connection object for <code>conf</code>
 */
public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
    throws IOException {
  if (user == null) {
    UserProvider provider = UserProvider.instantiate(conf);
    user = provider.getCurrent();
  }

  String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
      ConnectionImplementation.class.getName());
  Class<?> clazz;
  try {
    clazz = Class.forName(className);
  } catch (ClassNotFoundException e) {
    throw new IOException(e);
  }
  try {
    // Default HCM#HCI is not accessible; make it so before invoking.
    Constructor<?> constructor =
        clazz.getDeclaredConstructor(Configuration.class,
            ExecutorService.class, User.class);
    constructor.setAccessible(true);
    return (Connection) constructor.newInstance(conf, pool, user);
  } catch (Exception e) {
    throw new IOException(e);
  }
}
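
Applied to the code in the question, closing the per-partition connection could look roughly like the sketch below. It is only an illustration under some assumptions: HBaseConfiguration.create() stands in for the asker's own HBaseUtils.configureHBase helper (which is not shown in the question), the stream is assumed to carry non-empty String messages, and the row key, the column family "cf" and the qualifier "msg" are made-up placeholders. The object and method names (HBaseSink, writePartition, save) are hypothetical as well.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream

object HBaseSink {

  // Writes all records of ONE partition. createConnection is called once per
  // invocation, so every partition gets its own Connection, and the finally
  // blocks make sure the table and the connection are closed afterwards.
  def writePartition(iter: Iterator[String]): Unit = {
    // Assumption: replaces HBaseUtils.configureHBase("iemployee") from the question.
    val conf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(conf)
    try {
      val table = connection.getTable(TableName.valueOf("iemployee"))
      try {
        iter.foreach { elem =>
          // Hypothetical schema: the message is used as row key and as value.
          val put = new Put(Bytes.toBytes(elem))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("msg"), Bytes.toBytes(elem))
          table.put(put)
        }
      } finally {
        table.close()
      }
    } finally {
      connection.close()
    }
  }

  // Runs writePartition on the executors for each partition of each batch.
  def save(messages: DStream[String]): Unit =
    messages.foreachRDD { rdd =>
      rdd.foreachPartition(iter => writePartition(iter))
    }
}
```

Opening and closing a connection for every partition of every batch is simple but not free. If that overhead matters, a connection that is created lazily once per executor JVM and shared between partitions is a common alternative, at the cost of having to manage its lifetime yourself.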

Hope this helps!


Thank you very much! – CapturedTree