2015-11-13 25 views
1

在spark-java程序中,我需要讀取一個配置文件並填充一個HashMap,我需要將其作爲廣播變量發佈,以便它可以跨所有datanode使用。Spark程序中的廣播變量發佈

我需要在將要在datanode中運行的CustomInputFormat類中獲取此廣播變量的值。如何在我的CustomInputFormat類中指定從特定的廣播變量中獲取值,因爲廣播變量是在我的驅動程序中聲明的?

我加入一些代碼來解釋它在更多:

在這種scenario1我使用它在驅動程序本身,即該變量在相同的類中使用:在這裏,我可以使用Broadcat.value()方法

> final Broadcast<String[]> signPrefixes = 
> sc.broadcast(loadCallSignTable()); 
>  JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
>  new PairFunction<Tuple2<String, Integer>, String, Integer>(){ 
>   public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) { 
>   String sign = callSignCount._1(); 
>   String country = lookupCountry(sign, signPrefixes.value()); 
>   return new Tuple2(country, callSignCount._2()); 
>   }}).reduceByKey(new SumInts()); 

在方案2中,我將用我的自定義輸入格式類的內部廣播變量:

驅動程序:

> final JavaSparkContext sc= new 
> JavaSparkContext(sConf.setAppName("ParserSpark").setMaster("yarn-cluster")); 
> Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3}); 
> 
> JavaPairRDD<NullWritable, ArrayList<Record>> baseRDD = 
> sc.newAPIHadoopFile(args[2], InputFormat.class, NullWritable.class, 
> ArrayList.class, conf); 

InputFormat.class

> public class InputFormat extends FileInputFormat { 
> 
> @Override public RecordReader<NullWritable, ArrayList<Record>> 
> createRecordReader(InputSplit split,   TaskAttemptContext context) 
> throws IOException,   InterruptedException{ 
>  //I want to get the Broadcast Variable Here -- How will I do it 
>  
>   RecordReader reader = new RecordReader();   reader.initialize(split, context);  return reader; } @Override 
> protected boolean isSplitable(JobContext context, Path file) { 
>  return false; } } 
+0

我需要這個在驅動程序以外的其他Java類中廣播值。 – Harisyam

+0

您是否設法在此期間解決這個問題? – Havnar

回答

1

我最近碰到了這個自己。實際上結果相當簡單(幾小時後,然後一個哈!)

創建一個新的配置,設置您的變量,並將其傳遞給一個稍微不同的實現newAPIHadoopFile函數。

從驅動程序(這裏使用的Scala):

val myConf = new Configuration(); 
    myConf.set("var1", v1) 
    myConf.set("var2", v2) 
    myConf.set("var3", v3) 

val yourFile = sc.newAPIHadoopFile("yourFilePath", classOf[MyFileInputFormat],classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.DoubleWritable],myConf) 

從您的InputFormat或InputReader..or只要您有一個上下文(Java的這段時間)

context.getConfiguration().get("var1"); 

或者

job.getConfiguration().get("var2"); 
1

您可以創建在驅動器W/val bcVariable = sc.broadcast(myVariableToBroadcast)廣播var和日後訪問瓦特/ bcVariable.value