2016-04-13 98 views

Spark Scala: SELECT inside a foreach loop returns java.lang.NullPointerException

I need to iterate over the contents of a DataFrame with various SELECT statements inside a foreach loop, writing the output to a text file. Any SELECT statement inside the foreach loop returns a NullPointerException, and I can't understand why. A SELECT statement inside a `for` loop does not raise this error.

Here is the test case.

// step 1 of 6: create the table and load two rows 
vc.sql(s"""CREATE TEMPORARY TABLE TEST1 (
c1  varchar(4) 
,username varchar(5) 
,numeric integer) USING com.databricks.spark.csv OPTIONS (path "/tmp/test.txt")""") 

// step 2 of 6: confirm that the data is queryable 
vc.sql("SELECT * FROM TEST1").show() 
+----+--------+-------+ 
| c1|username|numeric| 
+----+--------+-------+ 
|col1| USER1|  0| 
|col1| USER2|  1| 
+----+--------+-------+ 

// Step 3 of 6: create a dataframe for the table 
var df=vc.sql("""SELECT * FROM TEST1""") 


// step 4 of 6: create a second dataframe that we will use as a loop iterator 
var df_usernames=vc.sql(s"""SELECT DISTINCT username FROM TEST1 """) 

// step 5 of 6: first foreach loop works ok: 
df_usernames.foreach(t => 
    { 
     println("The first foreach works ok: loop iterator t is " + t(0).toString()) 
    } 
) 
The first foreach works ok: loop iterator t is USER1 
The first foreach works ok: loop iterator t is USER2 

// step 6 of 6: second foreach with any embedded SQL returns an error 
df_usernames.foreach(t => 
    { 
     println("The second foreach dies: loop iterator t is " + t(0).toString()) 
     vc.sql("""SELECT c1 FROM TEST1""").show() 
    } 
)  
The second foreach dies: loop iterator t is USER1 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 158  in stage 94.0 failed 1 times, most recent failure: Lost task 158.0 in stage 94.0 (TID 3525, localhost): java.lang.NullPointerException 
    at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:195) 

Answer


It's not possible to do it that way. You need to call collect first:

>>> for x in df_usernames.collect(): 
...     sqlContext.sql("""SELECT c1 FROM TEST1""").show() 
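In the OP's Scala setup the same fix would look like the sketch below. The underlying cause of the NullPointerException is that the closure passed to `foreach` on a DataFrame runs on the executors, where the SQLContext (`vc` here) is not available; calling `collect()` first brings the rows back to the driver, so the loop body runs where `vc` is valid. This sketch assumes the `vc` and `df_usernames` defined in the question:

```scala
// Sketch, assuming the question's vc (SQLContext) and df_usernames.
// collect() materializes the rows on the driver; the subsequent
// foreach is a plain Scala loop there, so vc.sql(...) is legal.
df_usernames.collect().foreach { t =>
  println("loop iterator t is " + t(0).toString)
  vc.sql("""SELECT c1 FROM TEST1""").show()
}
```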

This is not the same kind of foreach the OP is using, and it's not good practice to collect without knowing the cardinality and size of the data; you can't launch SQL queries inside a foreach. If your example had, say, 2M users, this wouldn't scale. – eliasah


Is there a way to achieve this without using collect? For each Row in the RDD, I need to compare it against existing data (which I can load via SparkSession.sql). – KangarooWest
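A collect-free way to do a per-row comparison is to express it as a DataFrame join rather than issuing one SQL query per row, letting Spark distribute the work. A minimal sketch, assuming the question's `vc` and `df_usernames`, and a hypothetical table `EXISTING` holding the data to compare against:

```scala
// Sketch: replace per-row SQL with a single distributed join.
// EXISTING is a hypothetical table name; substitute the real source.
val existing = vc.sql("""SELECT username, numeric FROM EXISTING""")

// One pass over the data; rows with no match get nulls on the right side.
val compared = df_usernames.join(existing, Seq("username"), "left_outer")
compared.show()
```

The design point is that the comparison logic moves into the query plan (join keys, filters) instead of driver-side loop code, so it scales with the cluster rather than with the driver's memory.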