
Loading CSV data into HBase using Scala

Is there a way to load a CSV file into HBase using Scala, without using Spark? I'm looking for something similar to Happybase.


Most Java libraries can be used from Scala. You can import the corresponding Hadoop/HBase client libraries and do it that way, though you won't get a high-level API like Happybase. – void
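For illustration, here is a minimal sketch of that suggestion: a single Put issued through the plain HBase Java client from Scala. The table and column names are hypothetical, and it uses the classic pre-1.0 HTable API that the answer below also uses.

import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.hbase.client.{HTable, Put} 

object SinglePutExample { 
    def main(args: Array[String]): Unit = { 
     val config = HBaseConfiguration.create()    // picks up hbase-site.xml from the classpath 
     val table = new HTable(config, "my_table")  // hypothetical table name 
     val put = new Put("row1".getBytes("UTF-8")) // row key 
     put.add("c1".getBytes("UTF-8"),             // column family 
       "greeting".getBytes("UTF-8"),             // qualifier 
       "hello".getBytes("UTF-8"))                // value 
     table.put(put) 
     table.close() 
    } 
} 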

Answer


See this link for reference: http://www.mccarroll.net/snippets/hbaseload/. Hope it helps. The code below is adapted from that page.

HBaseImportExport is a superclass that defines some common code:

class HBaseImportExport { 
    // column family used when a CSV header does not name one explicitly 
    val DEFAULT_COLUMN_FAMILY: String = "c1" 
    class HBaseCol(var family: String, var qualifier: String) 
} 

HBaseImporter interprets the first line of a CSV file as the field names, and uses them to work out how to import the CSV data into HBase.

import java.io.File 
import java.io.FileInputStream 
import java.io.IOException 
import java.util.ArrayList 
import java.util.HashSet 
import java.util.List 
import java.util.Set 

// lets the for-comprehensions below iterate over the java.util collections 
import scala.collection.JavaConversions._ 

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.hbase._ 
import org.apache.hadoop.hbase.client._ 
import org.apache.log4j.Logger 

object HBaseImporter { 

    def main(args: Array[String]): Unit = { 
    if (args.length < 2 || args.length > 3) { 
     println(
     "Usage: HBaseImporter <tablename> <csv file path> [<key field name>]") 
     sys.exit(1) // exit rather than fall through to args(0) below 
    } 
    val tableName: String = args(0) 
    val f: File = new File(args(1)) 
    // the key field is optional; without it row ids are generated 
    val keyColumn: String = if (args.length > 2) args(2) else null 
    val importer: HBaseImporter = new HBaseImporter(tableName) 
    importer.importCSV(f, keyColumn) 
    } 

} 

class HBaseImporter(var tableName: String) extends HBaseImportExport { 
    var admin: HBaseAdmin = _ 
    var config: Configuration = _ 
    var families: Set[String] = new HashSet[String]() // column families seen in the headers 
    var columns: List[HBaseCol] = new ArrayList[HBaseCol]() // one entry per CSV field 
    var keyPosition: Int = -1 // index of the key column, or -1 to generate row ids 

    def init(): Unit = { 
    config = HBaseConfiguration.create() 
    admin = new HBaseAdmin(config) 
    } 

    private def deleteTable(): Unit = { 
    try { 
     admin.disableTable(tableName) 
     admin.deleteTable(tableName) 
    } catch { 
     case e: Exception => // table may not exist yet; nothing to delete 
    } 
    } 

    private def createTable(): Unit = { 
    val desc: HTableDescriptor = new HTableDescriptor(tableName) 
    admin.createTable(desc) 
    // the table must be disabled while column families are added 
    admin.disableTable(tableName) 
    for (family <- families) { 
     val cf1: HColumnDescriptor = new HColumnDescriptor(family) 
     admin.addColumn(tableName, cf1) 
    } 
    admin.enableTable(tableName) 
    } 

    private def analyzeHeaders(headers: Array[String], keyColumn: String): Unit = { 
    columns.clear() 
    families.clear() 
    var col: Int = 0 
    for (header <- headers) { 
     var family: String = DEFAULT_COLUMN_FAMILY 
     var qualifier: String = header 
     // a header of the form "family:qualifier" names an explicit column family 
     val pos: Int = header.indexOf(":") 
     if (pos > 0) { 
      family = header.substring(0, pos) 
      qualifier = header.substring(pos + 1) 
     } 
     columns.add(new HBaseCol(family, qualifier)) 
     families.add(family) 
     if (header == keyColumn) { 
      keyPosition = col 
     } 
     col += 1 
    } 
    } 

    private def loadData(cis: CsvInputStream): Unit = { 
    val table: HTable = new HTable(config, tableName) 
    val logger: Logger = Logger.getLogger(this.getClass) 
    var vals: Array[String] = cis.readLine() 
    var counter: Int = 0 
    while (vals != null) { 
     // use the designated key column if present, otherwise generate a row id 
     val rowId: String = 
      if (keyPosition >= 0 && keyPosition < vals.length) vals(keyPosition) 
      else "r" + counter 
     val put: Put = new Put(rowId.getBytes("UTF-8")) 
     var col: Int = 0 
     // the guard skips any columns for which this row has no value 
     for (column <- columns if col < vals.length) { 
      put.add(column.family.getBytes("UTF-8"), 
        column.qualifier.getBytes("UTF-8"), 
        vals(col).getBytes("UTF-8")) 
      col += 1 
     } 
     table.put(put) 
     vals = cis.readLine() 
     counter += 1 
     if (counter % 10000 == 0) { 
      logger.info("Imported " + counter + " records") 
     } 
    } 
    table.close() 
    } 

    /** 
    * Import a CSV file into the HBase table. 
    * 
    * @param csvFile the CSV file; its first line must hold the field names 
    * @param keyColumn name of the field to use as the row key, or null to generate ids 
    * 
    * @throws IOException 
    */ 
    def importCSV(csvFile: File, keyColumn: String): Unit = { 
    init() 
    val fis: FileInputStream = new FileInputStream(csvFile) 
    val cis: CsvInputStream = new CsvInputStream(fis) 
    // read field names from the first line of the csv file 
    analyzeHeaders(cis.readLine(), keyColumn) 
    // note: any existing table with this name is dropped and recreated 
    deleteTable() 
    createTable() 
    loadData(cis) 
    cis.close() 
    } 

}
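One caveat: the code relies on a CsvInputStream class that is also defined on the linked page, so grab it from there too. If you just want something self-contained to experiment with, a minimal stand-in that assumes one record per line and plain comma-separated fields (no quoting) would be:

import java.io.{BufferedReader, InputStream, InputStreamReader} 

// Minimal stand-in for CsvInputStream: returns the fields of the next 
// record, or null at end of input. Splits on commas only -- it does not 
// handle quoted fields or embedded commas. 
class CsvInputStream(in: InputStream) { 
    private val reader = new BufferedReader(new InputStreamReader(in, "UTF-8")) 

    def readLine(): Array[String] = { 
     val line = reader.readLine() 
     if (line == null) null else line.split(",", -1) 
    } 

    def close(): Unit = reader.close() 
} 

You can then run the importer as, for example:

HBaseImporter mytable /tmp/people.csv id 

which drops and recreates mytable, takes the field names from the first line of /tmp/people.csv, and uses the id field as the row key (the table, file, and field names here are just placeholders).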