我有Spark Notebook,Spark,Accumulo 1.6和Hadoop都在運行的Vagrant圖像。從筆記本電腦,我可以手動創建一個掃描儀,並從我創建使用的Accumulo一個例子表拉力測試數據:如何在Spark-notebook中從Accumulo 1.6創建Spark RDD?
val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector("root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)
scanner.setRange(new Range("row_0000000000", "row_0000000010"))
for(entry: Entry[Key, Value] <- scanner) {
println(entry.getKey + " is " + entry.getValue)
}
會給前十行表中的數據。
當我嘗試正是如此創建RDD:
val rdd2 =
sparkContext.newAPIHadoopRDD (
new Configuration(),
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
我得到一個RDD還給我,我不能做與因以下錯誤:
java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at org.apache.spark.rdd.RDD.count(RDD.scala:927)
這完全因爲我沒有指定任何參數來連接哪個表,認證是什麼等等這一事實是有意義的。
所以我的問題是:我該怎麼做需要從這裏開始將前十行表數據存入我的RDD中?
更新一個 還不行,但我確實發現了一些東西。原來,有兩個幾乎相同的包,
org.apache.accumulo.core.client.mapreduce
&
org.apache.accumulo.core.client.mapred
都具有幾乎相同的成員,除了一個事實,即一些方法簽名是不同的。不知道爲什麼兩者都存在,因爲沒有我可以看到的貶低通知。我試圖以不愉快的方式實現Sietse的回答。下面是我做什麼,並且響應:
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)
import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.conf.Configuration jobConf: org.apache.hadoop.mapred.JobConf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
val rdd2 =
sparkContext.hadoopRDD (
jobConf,
classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value],
1
)
rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = HadoopRDD[1] at hadoopRDD at :62
rdd2.first
java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308) at org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.RDD.take(RDD.scala:1077) at org.apache.spark.rdd.RDD.first(RDD.scala:1110) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69) at...
*編輯2 *
重:霍頓的答案 - 仍然沒有喜悅:
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
InputFormatBase.setInputTableName(jobConf, "batchtest1")
val rddX = sparkContext.newAPIHadoopRDD(
jobConf,
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] at newAPIHadoopRDD at :58
Out[15]: NewHadoopRDD[0] at newAPIHadoopRDD at :58
rddX.first
java.io.IOException: Input info has not been set. at org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) at org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.rdd.RDD.take(RDD.scala:1077) at org.apache.spark.rdd.RDD.first(RDD.scala:1110) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61) at
編輯3 - 進步!
我能弄清楚爲什麼'input INFO not set'錯誤發生。眼尖的你之間無疑會看到下面的代碼缺少結束「(」
AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password")
爲我做這個火花的筆記本電腦,我一直點擊執行按鈕和移動上,因爲我沒有看到錯誤。我忘記的是,當你離開關閉'''時,筆記本將做火花外殼將會做的事情 - 它會永久等待你添加它。所以錯誤是'setConnectorInfo'方法永遠不會被執行的結果。
不幸的是,我仍然無法將accumulo表數據推送到可用於我的RDD中。當我執行
rddX.count
我回來
res15: Long = 10000
這是正確的反應 - 有10000行中我指着表中的數據。然而,當我試圖抓住正是如此數據的第一個元素:
rddX.first
我得到以下錯誤:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.accumulo.core.data.Key
在何處何去何從有什麼想法?
編輯4-成功!
接受的答案+評論是那裏的90% - 除了accumulo鍵/值需要被轉換成可序列化的事實。通過調用兩者的.toString()方法,我得到了這個工作。我會盡快發佈一些完整的工作代碼,以防其他人遇到同樣的問題。
大衛,你只是想知道一件快事(因爲我還不知道accumulo ^^)。你已經在火星殼裏試過這種東西了嗎?所以,我會知道這是一個火花筆記本問題還是不是:-D。如果它是一個累積的東西,我可以用@lossyrob看到在Geotrellis中使用Accumulo和Spark – 2015-03-25 12:49:36
@andypetrella我還沒有在spark-shell中試過這個,因爲 - 我認爲 - spark-notebook只是傳遞我的命令來激發和回到我身上從火花中回來的東西(你會比我更清楚)。我會說,當我嘗試按照accumulo文檔9.1.2節中的說明操作時,我得到了「Job job = new Job(getConf(java.lang.Object)」中的「java.lang.IllegalStateException:Job in state DEFINE instead of RUNNING」 ))「或'我不知道什麼getConf()」是消息,這取決於我如何設置。 – 2015-03-25 17:14:53
我在這裏看到http://pastebin.com/ti7Qz19m這個人是按照accumulo文件 - 但我不能從它得到任何牽引力。 – 2015-03-25 17:17:12