2017-07-31 36 views
0

請注意:雖然這個問題提到Spark(2.1)我認爲這實際上是一個Scala(2.11)的問題,任何精通Scala開發人員將能夠回答它!Spark/Scala迭代器無法分配在foreach循環外定義的變量


我有下面的代碼,創建一個火花Dataset(基本上是二維表)和迭代它逐行。如果某行的username列有「fizzbuzz」的值,那麼我想設置的迭代器之外定義一個變量並使用該變量的行迭代完成後:

val myDataset = sqlContext 
    .read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "mytable", "keyspace" -> "mykeyspace")) 
    .load() 

var foobar : String 
myDataset.collect().foreach(rec => 
    if(rec.getAs("username") == "fizzbuzz") { 
    foobar = rec.getAs("foobarval") 
    } 
) 

if(foobar == null) { 
    throw new Exception("The fizzbuzz user was not found.") 
} 

當我運行此我得到但以下情況除外:

error: class $iw needs to be abstract, since: 
it has 2 unimplemented members. 
/** As seen from class $iw, the missing signatures are as follows. 
* For convenience, these are usable as stub implementations. 
*/ 
    def foobar=(x$1: String): Unit = ??? 

class $iw extends Serializable { 
    ^

有什麼特別的原因,我得到這個?

回答

2

foobar變量應該被初始化:

var foobar: String = null 

而且這看起來不正確:

foobar = rec.getAs("foobarval") 

,應該是:

foobar = rec.getAs[String]("foobarval") 

整體而言,這是沒有辦法的辦法去。它根本不受益於Spark執行模型。我會過濾並取而代之:

myDataset.filter($"username" === "fizzbuzz").select("foobarval").take(1) 
3

在方法或非抽象類中,您必須爲每個變量定義一個值;在這裏,你離開foobar未定義。事情會按預期工作,如果你把它定義爲具有null初步值:

var foobar: String = null 

:請注意,你的代碼是兩個非慣用(不繼Scala和星火的最佳實踐)和潛在的風險/慢:

  • 你應該避免可變值,如foobar - 不變的代碼更容易推理和真的讓你把Scala的強大功能
  • 你應該避免調用collectØ n DataFrame,除非您確定它非常小,因爲collect會將工作節點(其中可能有很多節點)的所有數據收集到單個驅動程序節點中,這會很慢並且可能導致OutOfMemoryError
  • null不鼓勵使用(因爲它往往會導致意外NullPointerException S)

一個驗證碼的更地道的版本將使用DataFrame.filter過濾相關記錄,並可能Option正確代表潛在的空值,是這樣的:

import spark.implicits._ 

val foobar: Option[String] = myDataset 
    .filter($"username" === "fizzbuzz") // filter only relevant records 
    .take(1) // get first 1 record (if it exists) as an Array[Row] 
    .headOption // get the first item in the array, or None 
    .map(r => r.getAs[String]("foobarval")) // get the value of the column "foobarval", or None 

if (foobar.isEmpty) { 
    throw new Exception("The fizzbuzz user was not found.") 
} 
0

你或許應該使用的過濾器,並選擇您的數據框:

import spark.sqlContext.implicits._ 

val data = spark.sparkContext.parallelize(List(
    """{ "username": "none", "foobarval":"none" }""", 
    """{ "username": "fizzbuzz", "foobarval":"expectedval" }""")) 

val df = spark.read.json(data) 
val foobar = df.filter($"username" === "fizzbuzz").select($"foobarval").collect.head.getString(0)