讓我們假設我們有一個這樣的數字列表:Spark:找到缺失號碼的程序
lst = [1,2,4,5,9,10]
我該如何編寫Spark程序來找出該列表中缺失的數字。該計劃應返回:3,6,7,8。
我試着用蓄電池,力氣鍛煉出來。
讓我們假設我們有一個這樣的數字列表:Spark:找到缺失號碼的程序
lst = [1,2,4,5,9,10]
我該如何編寫Spark程序來找出該列表中缺失的數字。該計劃應返回:3,6,7,8。
我試着用蓄電池,力氣鍛煉出來。
如果您不太擔心擁有最佳解決方案,一種方法是首先廣播您擁有的數據,然後並行化包含所有元素的集合並根據廣播的數據進行過濾。
喜歡的東西
lst = [1,2,4,5,9,10]
broadcastVar = sc.broadcast(lst)
all_elems = sc.parallelize([i+1 for i in range(10)])
all_elems.filter(lambda x: x not in broadcastVar.value)
如果你正在尋找的東西,只是少量數據的工作,那麼這是罰款。如果你有很多數據,那麼這種方法是不好的,不應該使用。
如果需要一個更好的解決方案,然後我會做以下
然後,您可以編寫結果或收集或任何你想要做的事情。有一點需要注意的是,例如,如果我使用了5個執行者,那麼這些密鑰會是1-2,3-4,5-6,7-8,9-10,關鍵字7-8不會,沒有任何元素。爲了避免這種情況,可以將組之前的rdd按鍵與[(1-2,-1),(3-4,-1),(5-6,-1),(7-8, -1),(9-10,-1)]。如果你有很多數據,那麼與整個工作相比,這個數據所增加的開銷是非常小的。
這個樣本代碼有很多錯誤,但將其視爲概念驗證。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.spark_project.guava.collect.Lists;
import scala.Tuple2;
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("spark-missing-nr").master("local[*]").getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
Integer[] lst = new Integer[] { 1, 2, 4, 5, 9, 10 };
JavaRDD<Integer> lstRDD = sc.parallelize(Arrays.asList(lst));
// Partition the data by whether number is smaller/equal or larger than
// 5
JavaPairRDD<String, Integer> groupableRDD = lstRDD.mapToPair(i -> {
String group = i <= 5 ? "1-5" : "6-10";
return new Tuple2<String, Integer>(group, i);
});
// Group by key
JavaPairRDD<String, Iterable<Integer>> groupedRDD = groupableRDD.groupByKey();
// so now we have [(1-5,[1, 2, 4, 5]), (6-10,[9, 10])]
System.out.println(groupedRDD.collect());
// map where you iterate over range specified by key
JavaRDD<List<Integer>> missingValuesLists = groupedRDD.map(t -> {
Integer from = new Integer(t._1().split("-")[0]);
Integer to = new Integer(t._1().split("-")[1]);
List<Integer> valuesList = Lists.newArrayList(t._2());
List<Integer> missingValues = new ArrayList<Integer>();
// iterate over range specified by key
for (int i = from; i < to + 1; i++) {
if (!valuesList.contains(i)) {
missingValues.add(i);
}
}
return missingValues;
});
// outputs [[3], [6, 7, 8]]
System.out.println(missingValuesLists.collect());
sc.close();
}
}
你可以嘗試用全系列創建RDD,使用sc.range
,然後使用subtract
功能:
lst = sc.parallelize([1,2,4,5,9,10])
max_value = lst.max()
full_data = sc.range(1, max_value)
missing_values = full_data.subtract(lst)
你能避免調用max()
,如果你知道的完整列表的事先大小。
@Mrinal你嘗試過這種方法嗎? –
對不起丹尼爾回覆(我一直很忙)。我嘗試了它,但它工作正常,但如果我們正在處理數十億個數字,它就不能被認爲是最佳解決方案,我們不能再有一個龐大的數據清單來處理。無論如何感謝解決方案,我喜歡減法部分:)這是最簡單的方法。 – Mrinal
可以分享您的解決方案,這是不工作,你到目前爲止嘗試過。 –
對於在計算每一行時依賴於查看其他行的問題,Spark不是最佳選擇。當你可以處理每個項目而不依賴其他項目時,Spark是最好的,所以它可以高效地並行化。 –
@丹尼爾,我也很清楚,但我被要求在採訪中實施。我告訴他們帶有蓄電池的解決方案,他們接受了。後來我嘗試在家中實現它,但它並不奏效,因爲累加器只能用於通過更新操作關聯的不同任務更新值,而不能訪問該值。 – Mrinal