2017-07-23 66 views
1

我可以按下面的方式建立 Spark v1.6.0 連接並載入這兩個表。如何通過 R 在 Spark 1.6 中執行反連接(anti-join)?

library(sparklyr) 
library(dplyr) 

# Open a local Spark 1.6.0 session.
sc <- spark_connect(
  master = "local",
  version = "1.6.0",
  hadoop_version = 2.4
)

# Add a sequential `id` key to each built-in data set so they can be joined.
iris_r <- mutate(iris, id = row_number())
mtcars_r <- mutate(mtcars, id = row_number())

# Copy the local data frames into Spark, replacing any existing tables.
iris_db <- copy_to(sc, iris_r, name = "iris_db", overwrite = TRUE)
mtcars_db <- copy_to(sc, mtcars_r, name = "mtcars_db", overwrite = TRUE)

# Rows of iris_db whose `id` has no counterpart in mtcars_db.
df <- anti_join(iris_db, mtcars_db, by = "id")

# Printing the result is what triggers the query (and the reported error).
df

但是,當我嘗試查看或收集 df 時,會得到以下錯誤:

Error: org.apache.spark.sql.AnalysisException: 
Unsupported language features in query: SELECT * FROM `iris_db` AS `TBL_LEFT` 

WHERE NOT EXISTS (
    SELECT 1 FROM `mtcars_db` AS `TBL_RIGHT` 
    WHERE (`TBL_LEFT`.`id` = `TBL_RIGHT`.`id`) 
) 
TOK_QUERY 1, 0,51, 14 
    TOK_FROM 1, 4,10, 14 
    TOK_TABREF 1, 6,10, 14 
     TOK_TABNAME 1, 6,6, 14 
     iris_db 1, 6,6, 14 
     TBL_LEFT 1, 10,10, 27 
    TOK_INSERT 0, -1,51, 0 
    TOK_DESTINATION 0, -1,-1, 0 
     TOK_DIR 0, -1,-1, 0 
     TOK_TMP_FILE 0, -1,-1, 0 
    TOK_SELECT 0, 0,2, 0 
     TOK_SELEXPR 0, 2,2, 0 
     TOK_ALLCOLREF 0, 2,2, 0 
    TOK_WHERE 3, 13,51, 6 
     NOT 3, 15,51, 6 
     TOK_SUBQUERY_EXPR 3, 17,51, 10 
      TOK_SUBQUERY_OP 3, 17,17, 10 
      EXISTS 3, 17,17, 10 
      TOK_QUERY 4, 19,51, 16 
      TOK_FROM 4, 27,33, 16 
       TOK_TABREF 4, 29,33, 16 
       TOK_TABNAME 4, 29,29, 16 
        mtcars_db 4, 29,29, 16 
       TBL_RIGHT 4, 33,33, 31 
      TOK_INSERT 0, -1,49, 0 
       TOK_DESTINATION 0, -1,-1, 0 
       TOK_DIR 0, -1,-1, 0 
        TOK_TMP_FILE 0, -1,-1, 0 
       TOK_SELECT 4, 23,25, 9 
       TOK_SELEXPR 4, 25,25, 9 
        1 4, 25,25, 9 
       TOK_WHERE 5, 37,49, 25 
       = 5, 39,49, 25 
        . 5, 40,42, 19 
        TOK_TABLE_OR_COL 5, 40,40, 9 
         TBL_LEFT 5, 40,40, 9 
        id 5, 42,42, 20 
        . 5, 46,48, 38 
        TOK_TABLE_OR_COL 5, 46,46, 27 
         TBL_RIGHT 5, 46,46, 27 
        id 5, 48,48, 39 

scala.NotImplementedError: No parse rules for ASTNode type: 864, text: TOK_SUBQUERY_EXPR : 
TOK_SUBQUERY_EXPR 3, 17,51, 10 
    TOK_SUBQUERY_OP 3, 17,17, 10 
    EXISTS 3, 17,17, 10 
    TOK_QUERY 4, 19,51, 16 
    TOK_FROM 4, 27,33, 16 
     TOK_TABREF 4, 29,33, 16 
     TOK_TABNAME 4, 29,29, 16 
      mtcars_db 4, 29,29, 16 
     TBL_RIGHT 4, 33,33, 31 
    TOK_INSERT 0, -1,49, 0 
     TOK_DESTINATION 0, -1,-1, 0 
     TOK_DIR 0, -1,-1, 0 
      TOK_TMP_FILE 0, -1,-1, 0 
     TOK_SELECT 4, 23,25, 9 
     TOK_SELEXPR 4, 25,25, 9 
      1 4, 25,25, 9 
     TOK_WHERE 5, 37,49, 25 
     = 5, 39,49, 25 
      . 5, 40,42, 19 
      TOK_TABLE_OR_COL 5, 40,40, 9 
       TBL_LEFT 5, 40,40, 9 
      id 5, 42,42, 20 
      . 5, 46,48, 38 
      TOK_TABLE_OR_COL 5, 46,46, 27 
       TBL_RIGHT 5, 46,46, 27 
      id 5, 48,48, 39 
" + 

org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1721) 
      ; 
    at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:326) 
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41) 
    at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40) 
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) 
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) 
    at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) 
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) 
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) 
    at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110) 
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34) 
    at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295) 
    at org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66) 
    at org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66) 
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:279) 
    at org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:226) 
    at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:225) 
    at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:268) 
    at org.apache.spark.sql.hive.HiveQLDialect.parse(HiveContext.scala:65) 
    at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211) 
    at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211) 
    at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114) 
    at org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:113) 
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) 
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) 
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) 
    at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) 
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) 
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) 
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) 
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) 
    at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110) 
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34) 
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208) 
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208) 
    at org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:43) 
    at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231) 
    at org.apache.spark.sql.hive.HiveContext.parseSql(HiveContext.scala:331) 
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817) 
    at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at sparklyr.Invoke$.invoke(invoke.scala:102) 
    at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97) 
    at sparklyr.StreamHandler$.read(stream.scala:62) 
    at sparklyr.BackendHandler.channelRead0(handler.scala:52) 
    at sparklyr.BackendHandler.channelRead0(handler.scala:14) 
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) 
    at io.netty.channel.AbstractChann 

如果我切換到 Spark 2.0.1,這個錯誤就會消失。但就這個問題而言,假設我必須使用 1.6。是否有受支持的方式來執行此連接?

回答

1

LEFT ANTI JOIN 可以用 FULL OUTER JOIN 加上後續的篩選來代替:

# Two small Spark tables that share the key column `id`.
df1 <- copy_to(
  sc,
  data.frame(id = c(1, 2, 3), x = c("a", "b", "c")),
  name = "df1",
  overwrite = TRUE
)

df2 <- copy_to(
  sc,
  data.frame(id = c(1, 3), y = c(2, -2)),
  name = "df2",
  overwrite = TRUE
)

# Emulate LEFT ANTI JOIN without a NOT EXISTS subquery:
#   1. tag every df2 row with a copy of its key (`id_`);
#   2. full-join on `id`, so df1 rows without a match get a missing `id_`;
#   3. keep only those unmatched rows;
#   4. drop df2's columns by selecting df1's original column names.
df2_tagged <- mutate(df2, id_ = id)

df1 %>%
  full_join(df2_tagged, by = "id") %>%
  filter(is.null(id_)) %>%
  select(one_of(colnames(df1)))

如果還存在重複的列名,你也必須對此進行處理。

請注意,你不應該:

  • 使用 row_number() 產生全局 ID —— 它無法擴展,也不能提供所需的正確性保證。
  • 在Spark數據幀上使用copy_to。它將數據收集到本地節點,因此不適用於大型數據集。
+0

感謝您的其他建議;包含 'row_number' 和 'copy_to' 步驟只是爲了使該示例可重現。 –

+0

這最初不適用於我的數據集 - 我的 'id_' 變量是 'dbl' 類型,導致它在連接後被表示爲 '-9.223372e+18' 而不是 'NaN'(https://github.com/rstudio/sparklyr/issues/606)。但改用 'as.integer(id_)' 後就可以正常工作 – jay

相關問題