2017-02-06 144 views
0

我正在爲mappartitions創建一個函數來計算每個分區的最大值和最小值。我在pyspark中創建了這個函數,但是我不能將它成功轉換爲scala。我應用了這個函數兩次,我想在結果中運行一個zip。這是我得到的errot:Spark(scala)問題迭代器

result.zip(RES)

類型不匹配;

[error] found : org.apache.spark.rdd.RDD[(Int, Int)] 
[error] required: scala.collection.GenIterable[?] 

這裏有我的功能在Python:

def minmaxInt(iterator): 
    firsttime = 0 
    min = 0 
    max = 0 
    for x in iterator: 
    if(x!= '' and x!='NULL' and x is not None): 
     y=int(x)  
      if (firsttime == 0): 
        min = y; 
        max = y; 
        firsttime = 1 
      else: 
        if y > max: 
         max = y 
        if y < min: 
         min = y 
    return (min, max) 

在這裏,我的代碼在斯卡拉

def minmaxInt(iterator: Iterator[String]) : Iterator[(Int,Int)]={ 

    var firsttime = 0 
    var min = 0 
    var max = 0 
    var res=List[(Int,Int)]() 
    for(x <- iterator){ 
     if(x!= "" && x!= null){ 
    var y=x.toInt 

     if(firsttime == 0){ 
      min = y 
      max = y 
      firsttime = 1} 
     else{ 
      if (y > max){ 
       max = y} 
      if (y < min){ 
       min = y} 
     } 
     } 
    } 

    res.::=(min,max) 
    return res.iterator 

} 

預先感謝您

更新:

謝謝爲您快速反應!代碼很好,但我仍然遇到了zip的問題。我有兩次rdd.mapPartitions你最後的代碼,然後我執行ZIP:

[error] found : org.apache.spark.rdd.RDD[(Int, Int)] 
[error] required: scala.collection.GenIterable[?] 
[error]    result.zip(res) 
+0

缺少某些情況下 - 錯誤似乎是在這裏就不介紹了代碼(也有在你粘貼代碼'RDD's沒有引用) –

+0

這就是我正在做的,只是從csv中取出一列並應用minmax函數。 ' val file = sc.textFile(path) val split = file.map(x => x.split(「,」)) val col = split.map(x => x(0)) var結果= col.mapPartitions(minmaxInt) ......後來,同... VAR解析度= col.mapPartitions(minmaxInt) result.zip(RES) ' – Javier

+0

不添加註釋代碼 - [編輯](http://stackoverflow.com/posts/42067415/edit)該帖子添加缺少的信息 –

回答

0

這裏有一個簡單的(和更地道)實現的minMaxInt

def minMaxInt(iterator: Iterator[String]) : Iterator[(Int,Int)]= { 
    val tuple = iterator 
    .filter(_ != null).filter(!_.isEmpty) 
    .map(_.toInt).map(i => (i, i)) 
    .reduce[(Int, Int)] { case ((min, max), (i1, i2)) => (Math.min(min, i1), Math.max(max, i2)) } 

    Seq(tuple).iterator 
} 

可以應用到RDD[String]如下:

// some sample data 
def col = sc.parallelize(Seq("1", "4", "12", "3", "", null, "2")) 

// "use twice" and zip 
var result: RDD[(Int, Int)] = col.mapPartitions(minmaxInt) 
var res: RDD[(Int, Int)] = col.mapPartitions(minmaxInt) 

result.zip(res).foreach(println) 
// prints: 
// ((1,1),(1,1)) 
// ((2,2),(2,2)) 
// ((3,3),(3,3)) 
// ((4,12),(4,12)) 
+0

感謝您的輸入!正如你在更新的問題中看到的那樣,我仍然有這個問題。任何想法? – Javier

+0

首先 - 很難跟隨評論中的一些代碼和一些帖子正文中的代碼 - 請在帖子中顯示整個代碼,以便我可以自己嘗試並找到問題。其次 - 這聽起來像一個單獨的問題,與這種方法無關,所以不確定它甚至屬於這篇文章。 –

+0

看到更新的答案 - 你粘貼的代碼的作品,這個問題是在你沒有分享的代碼的某處,所以我不能再幫助.... –