
PySpark HBase/Phoenix integration: I need to read Phoenix data into PySpark.

Edit: I am using the HBase Spark converters:

Here is the code snippet:

port="2181" 
host="zookeperserver" 
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" 
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" 
cmdata_conf = {"hbase.zookeeper.property.clientPort":port, "hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": "camel", "hbase.mapreduce.scan.columns": "data:a"} 
sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                   "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                   "org.apache.hadoop.hbase.client.Result",
                   keyConverter=keyConv,
                   valueConverter=valueConv,
                   conf=cmdata_conf)

Traceback:

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/hdp/2.3.0.0-2557/spark/python/pyspark/context.py", line 547, in newAPIHadoopRDD 
    jconf, batchSize) 
    File "/usr/hdp/2.3.0.0-2557/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/usr/hdp/2.3.0.0-2557/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. 
: java.io.IOException: No table was provided. 
    at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:130) 

Any help would be much appreciated.

Thanks! /Tina

Answers


I tried the second way, but I am getting an error: Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. : java.io.IOException: No table was provided. Have you done this in PySpark? – Ranic


Have you provided the proper configuration to Spark's newAPIHadoopRDD, as follows?

    sparkconf = {"hbase.zookeeper.quorum": zookeeperhost, "hbase.mapreduce.inputtable": sampletable, "hbase.mapreduce.scan.columns": columns}
    hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                                   "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                                   "org.apache.hadoop.hbase.client.Result",
                                   keyConverter=keyConv,
                                   valueConverter=valueConv,
                                   conf=sparkconf)

–


Please try the approach above; I think you did not provide the table name in the configuration. Also, the values for keyConv and valueConv should be examples.pythonconverters.ImmutableBytesWritableToStringConverter and examples.pythonconverters.HBaseResultToStringConverter, respectively. –
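
Putting the two comments together, a minimal corrected sketch might look like the following (untested; it reuses the camel table, data:a column, and zookeperserver host from the question):

    port = "2181"
    host = "zookeperserver"
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
    # "hbase.mapreduce.inputtable" is the setting the "No table was provided"
    # error complains about, so it must be present in the conf dict.
    sparkconf = {
        "hbase.zookeeper.property.clientPort": port,
        "hbase.zookeeper.quorum": host,
        "hbase.mapreduce.inputtable": "camel",
        "hbase.mapreduce.scan.columns": "data:a",
    }
    hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                                   "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                                   "org.apache.hadoop.hbase.client.Result",
                                   keyConverter=keyConv,
                                   valueConverter=valueConv,
                                   conf=sparkconf)
    hbase_rdd.first()  # smoke test: fetch a single (rowkey, value) pair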


Using the spark phoenix plugin is the recommended approach. Please find details about the phoenix spark plugin here.

Environment: tested with AWS EMR 5.10, PySpark

Tested with the steps below; the Phoenix SQL grammar is documented at https://phoenix.apache.org/language/

Steps

  1. Create a table. Open the Phoenix shell with "/usr/lib/phoenix/bin/sqlline.py":

    DROP TABLE IF EXISTS TableName;

    CREATE TABLE TableName(DOMAIN VARCHAR primary key);

    UPSERT INTO TableName (DOMAIN) VALUES ('foo');

  2. Download the spark phoenix plugin jar from https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core/4.11.0-HBase-1.3. You need the phoenix-<version>-client.jar; I used phoenix-4.11.0-HBase-1.3-client.jar to match my Phoenix and HBase versions.

  3. From your hadoop home directory, set the following variable:

    phoenix_jars=/home/user/apache-phoenix-4.11.0-HBase-1.3-bin/phoenix-4.11.0-HBase-1.3-client.jar

  4. Start PySpark, adding the driver and executor classpath dependencies:

    pyspark --jars ${phoenix_jars} --conf spark.executor.extraClassPath=${phoenix_jars}
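
Once the shell is up, a quick sanity check (a sketch; it assumes org.apache.phoenix.spark.DefaultSource, the class Spark resolves for the "org.apache.phoenix.spark" format) is to look the class up through the Py4J gateway:

    # Raises a ClassNotFoundException if the Phoenix client jar is missing
    # from the driver classpath; returns the Class object otherwise.
    sc._jvm.java.lang.Class.forName("org.apache.phoenix.spark.DefaultSource")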

# Create the ZooKeeper URL; replace it with your cluster's ZooKeeper quorum, which you can check in hbase-site.xml.

emrMaster = "ZooKeeper URL" 
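
For illustration, the quorum string generally takes a host:port form; the host name below is hypothetical:

# Hypothetical value; take the real host(s) from hbase.zookeeper.quorum and
# the port from hbase.zookeeper.property.clientPort in hbase-site.xml.
emrMaster = "ip-10-0-0-1.ec2.internal:2181"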

df = sqlContext.read \ 
.format("org.apache.phoenix.spark") \ 
.option("table", "TableName") \ 
.option("zkUrl", emrMaster) \ 
.load() 

df.show() 
df.columns 
df.printSchema() 
df1 = df.replace(['foo'], ['foo1'], 'DOMAIN')
df1.show() 

df1.write \ 
    .format("org.apache.phoenix.spark") \ 
    .mode("overwrite") \ 
    .option("table", "TableName") \ 
    .option("zkUrl", emrMaster) \ 
    .save()
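
As a follow-up read example (a sketch reusing TableName and emrMaster from above), the phoenix-spark data source can push a simple column filter down to Phoenix, so selecting a subset should not require a full table scan:

# Read back only the rows matching the filter; the predicate on DOMAIN
# is handed to Phoenix rather than evaluated row by row in Spark.
df2 = sqlContext.read \
    .format("org.apache.phoenix.spark") \
    .option("table", "TableName") \
    .option("zkUrl", emrMaster) \
    .load() \
    .where("DOMAIN = 'foo1'")
df2.show()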