我正在爲mappartitions創建一個函數來計算每個分區的最大值和最小值。我在pyspark中創建了這個函數,但是我不能將它成功轉換爲scala。我應用了這個函數兩次,我想在結果中運行一個zip。這是我得到的errot:Spark(scala)問題迭代器
result.zip(RES)
類型不匹配;
[error] found : org.apache.spark.rdd.RDD[(Int, Int)]
[error] required: scala.collection.GenIterable[?]
這裏有我的功能在Python:
def minmaxInt(iterator):
firsttime = 0
min = 0
max = 0
for x in iterator:
if(x!= '' and x!='NULL' and x is not None):
y=int(x)
if (firsttime == 0):
min = y;
max = y;
firsttime = 1
else:
if y > max:
max = y
if y < min:
min = y
return (min, max)
在這裏,我的代碼在斯卡拉
def minmaxInt(iterator: Iterator[String]) : Iterator[(Int,Int)]={
var firsttime = 0
var min = 0
var max = 0
var res=List[(Int,Int)]()
for(x <- iterator){
if(x!= "" && x!= null){
var y=x.toInt
if(firsttime == 0){
min = y
max = y
firsttime = 1}
else{
if (y > max){
max = y}
if (y < min){
min = y}
}
}
}
res.::=(min,max)
return res.iterator
}
預先感謝您
更新:
謝謝爲您快速反應!代碼很好,但我仍然遇到了zip的問題。我有兩次rdd.mapPartitions你最後的代碼,然後我執行ZIP:
[error] found : org.apache.spark.rdd.RDD[(Int, Int)]
[error] required: scala.collection.GenIterable[?]
[error] result.zip(res)
缺少某些情況下 - 錯誤似乎是在這裏就不介紹了代碼(也有在你粘貼代碼'RDD's沒有引用) –
這就是我正在做的,只是從csv中取出一列並應用minmax函數。 ' val file = sc.textFile(path) val split = file.map(x => x.split(「,」)) val col = split.map(x => x(0)) var結果= col.mapPartitions(minmaxInt) ......後來,同... VAR解析度= col.mapPartitions(minmaxInt) result.zip(RES) ' – Javier
不添加註釋代碼 - [編輯](http://stackoverflow.com/posts/42067415/edit)該帖子添加缺少的信息 –