2016-12-16 65 views
1

讓我們假設我們有一個這樣的數字列表:Spark:找到缺失號碼的程序

lst = [1,2,4,5,9,10] 

我該如何編寫Spark程序來找出該列表中缺失的數字。該計劃應返回:3,6,7,8。

我試着用蓄電池,力氣鍛煉出來。

+0

可以分享您的解決方案,這是不工作,你到目前爲止嘗試過。 –

+0

對於在計算每一行時依賴於查看其他行的問題,Spark不是最佳選擇。當你可以處理每個項目而不依賴其他項目時,Spark是最好的,所以它可以高效地並行化。 –

+0

@丹尼爾,我也很清楚,但我被要求在採訪中實施。我告訴他們帶有蓄電池的解決方案,他們接受了。後來我嘗試在家中實現它,但它並不奏效,因爲累加器只能用於通過更新操作關聯的不同任務更新值,而不能訪問該值。 – Mrinal

回答

0

如果您不太擔心擁有最佳解決方案,一種方法是首先廣播您擁有的數據,然後並行化包含所有元素的集合並根據廣播的數據進行過濾。

喜歡的東西

lst = [1,2,4,5,9,10] 
broadcastVar = sc.broadcast(lst) 

all_elems = sc.parallelize([i+1 for i in range(10)]) 
all_elems.filter(lambda x: x not in broadcastVar.value) 

如果你正在尋找的東西,只是少量數據的工作,那麼這是罰款。如果你有很多數據,那麼這種方法是不好的,不應該使用。

如果需要一個更好的解決方案,然後我會做以下

  1. 本質上對數據進行分區,使用RDDS你可以做一個映射輸出(分區之前,數)。您可以編寫一個小函數來獲取每個數字的分區號。因此,舉例來說,如果您在此地圖之後有2個執行者,您將擁有[(1-5,1),(1-5,2),(1-5,4),(1-5,5),( 6-10,9),(6-10,10)]
  2. 按鍵分組,現在我們有[(1-5,[1,2,4,5]),(6-10,[9 ,10])]
  3. 映射您遍歷key指定的範圍,與值中的元素進行比較並返回不存在的元素列表。

然後,您可以編寫結果或收集或任何你想要做的事情。有一點需要注意的是,例如,如果我使用了5個執行者,那麼這些密鑰會是1-2,3-4,5-6,7-8,9-10,關鍵字7-8不會,沒有任何元素。爲了避免這種情況,可以將組之前的rdd按鍵與[(1-2,-1),(3-4,-1),(5-6,-1),(7-8, -1),(9-10,-1)]。如果你有很多數據,那麼與整個工作相比,這個數據所增加的開銷是非常小的。

這個樣本代碼有很多錯誤,但將其視爲概念驗證。

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.sql.SparkSession; 
import org.spark_project.guava.collect.Lists; 

import scala.Tuple2; 

public class Main { 

public static void main(String[] args) { 

    SparkSession spark = SparkSession.builder().appName("spark-missing-nr").master("local[*]").getOrCreate(); 
    JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); 
    Integer[] lst = new Integer[] { 1, 2, 4, 5, 9, 10 }; 
    JavaRDD<Integer> lstRDD = sc.parallelize(Arrays.asList(lst)); 

    // Partition the data by whether number is smaller/equal or larger than 
    // 5 
    JavaPairRDD<String, Integer> groupableRDD = lstRDD.mapToPair(i -> { 
     String group = i <= 5 ? "1-5" : "6-10"; 
     return new Tuple2<String, Integer>(group, i); 
    }); 
    // Group by key 
    JavaPairRDD<String, Iterable<Integer>> groupedRDD = groupableRDD.groupByKey(); 

    // so now we have [(1-5,[1, 2, 4, 5]), (6-10,[9, 10])] 
    System.out.println(groupedRDD.collect()); 

    // map where you iterate over range specified by key 
    JavaRDD<List<Integer>> missingValuesLists = groupedRDD.map(t -> { 
     Integer from = new Integer(t._1().split("-")[0]); 
     Integer to = new Integer(t._1().split("-")[1]); 

     List<Integer> valuesList = Lists.newArrayList(t._2()); 
     List<Integer> missingValues = new ArrayList<Integer>(); 

     // iterate over range specified by key 
     for (int i = from; i < to + 1; i++) { 
      if (!valuesList.contains(i)) { 
       missingValues.add(i); 
      } 
     } 
     return missingValues; 
    }); 
    // outputs [[3], [6, 7, 8]] 
    System.out.println(missingValuesLists.collect()); 
    sc.close(); 
} 
} 
+0

恐怕我必須用一個有大約十億個數字的列表來做。 – Mrinal

+0

嗨Ossu54,你可以請提供代碼示例,如果可能的話? – Mrinal

+0

我在Java中添加了一些樣例代碼,希望沒關係。 – oh54

0

你可以嘗試用全系列創建RDD,使用sc.range,然後使用subtract功能:

lst = sc.parallelize([1,2,4,5,9,10]) 
max_value = lst.max() 
full_data = sc.range(1, max_value) 
missing_values = full_data.subtract(lst) 

你能避免調用max(),如果你知道的完整列表的事先大小。

+0

@Mrinal你嘗試過這種方法嗎? –

+0

對不起丹尼爾回覆(我一直很忙)。我嘗試了它,但它工作正常,但如果我們正在處理數十億個數字,它就不能被認爲是最佳解決方案,我們不能再有一個龐大的數據清單來處理。無論如何感謝解決方案,我喜歡減法部分:)這是最簡單的方法。 – Mrinal