從MS-SQL環境進入一個也具有火花訪問的HIVE環境。正確地嘗試使用RStudio和R(有時使用rPython的python)來替換我曾經使用過的T-SQL以及我以前從未做過的所有事情。R DBI Sparklyr DBWritetable無結果運行
爲了這個工作,我需要能夠讀取和寫回HIVE數據庫。
我已經使用火花和將R包sparklyr連接,並且可以使用R包DBI與火花連接連接到我們的HIVE簇和拉數據轉換爲R dataframes就好:
sc <- spark_connect(master = "yarn-client", spark_home="/usr/hdp/current/spark-client", config = config)
result3 <- dbGetQuery(sc, "select * from sampledb.sampletable limit 100")
上面的代碼作品每次。我也可以使用dbGetQuery在引用的sql語句的上下文中在數據庫中創建表,而不會出現問題,因此它不是寫權限問題。
然而,當我嘗試從R幀數據寫回蜂巢星團,像這樣:
dbWriteTable(conn = sc, name = "sampledb.rsparktest3", value = result3)
它運行沒有錯誤,但該表顯示不出來,我不能查詢。
如果我再次嘗試寫表我得到這個錯誤:
> dbWriteTable(conn = sc, name = "sampledb.rsparktest3", value = result3)
Error in .local(conn, name, value, ...) :
Table sampledb.rsparktest3 already exists
任何想法可能是這樣嗎?除了DBI,還有更好的方法嗎?
在此先感謝您的幫助!
下面是當我運行這些語句整個RStudio控制檯日誌:
> result3 <- dbGetQuery(sc, "select * from sampledb.sampletable limit 100")
> dbWriteTable(conn = sc, name = "sampledb.rsparktest3", value = result3)
> result3y <- dbGetQuery(sc, "select * from sampledb.rsparktest3 limit 2")
Error: org.apache.spark.sql.AnalysisException: Table not found: sampledb.rsparktest3; line 1 pos 35
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:54)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:121)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:120)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:120)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:120)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:120)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:120)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:120)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at sparklyr.Invoke$.invoke(invoke.scala:102)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
at sparklyr.StreamHandler$.read(stream.scala:62)
at sparklyr.BackendHandler.channelRead0(handler.scala:52)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
> dbWriteTable(conn = sc, name = "sampledb.rsparktest3", value = result3)
Error in .local(conn, name, value, ...) :
Table sampledb.rsparktest3 already exists
感謝埃德加,但是當我嘗試使用spark_write_table它不會接受R數據幀和犯規認識我的火花dataframes。 > spark_write_table(spark_iris,spark_iris2) 錯誤UseMethod( 「spark_write_table」): 關於 'spark_write_table' 施加到 類 「data.frame」 – wlf211
您好對象沒有適用的方法,該呼叫應該是這樣的: 'spark_write_table(spark_iris,「hive_iris」)'。第一個參數'x'應該是一個Spark DF,第二個參數是Hive中的表名 – edgararuiz