在spark-java程序中,我需要讀取一個配置文件並填充一個HashMap,我需要將其作爲廣播變量發佈,以便它可以跨所有datanode使用。Spark程序中的廣播變量發佈
我需要在將要在datanode中運行的CustomInputFormat類中獲取此廣播變量的值。如何在我的CustomInputFormat類中指定從特定的廣播變量中獲取值,因爲廣播變量是在我的驅動程序中聲明的?
我加入一些代碼來解釋它在更多:
在這種scenario1我使用它在驅動程序本身,即該變量在相同的類中使用:在這裏,我可以使用Broadcat.value()方法
> final Broadcast<String[]> signPrefixes =
> sc.broadcast(loadCallSignTable());
> JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
> new PairFunction<Tuple2<String, Integer>, String, Integer>(){
> public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
> String sign = callSignCount._1();
> String country = lookupCountry(sign, signPrefixes.value());
> return new Tuple2(country, callSignCount._2());
> }}).reduceByKey(new SumInts());
在方案2中,我將用我的自定義輸入格式類的內部廣播變量:
驅動程序:
> final JavaSparkContext sc= new
> JavaSparkContext(sConf.setAppName("ParserSpark").setMaster("yarn-cluster"));
> Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
>
> JavaPairRDD<NullWritable, ArrayList<Record>> baseRDD =
> sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class,
> ArrayList.class, conf);
個
InputFormat.class
> public class InputFormat extends FileInputFormat {
>
> @Override public RecordReader<NullWritable, ArrayList<Record>>
> createRecordReader(InputSplit split, TaskAttemptContext context)
> throws IOException, InterruptedException{
> //I want to get the Broadcast Variable Here -- How will I do it
>
> RecordReader reader = new RecordReader(); reader.initialize(split, context); return reader; } @Override
> protected boolean isSplitable(JobContext context, Path file) {
> return false; } }
我需要這個在驅動程序以外的其他Java類中廣播值。 – Harisyam
您是否設法在此期間解決這個問題? – Havnar