2016-05-12 69 views
0

我必須在reduce端連接(reduce-side join)算法中使用布隆過濾器(Bloom filter)來過濾我的其中一個輸入,但是readFields函數出了問題:它負責把分佈式緩存中的輸入流(即序列化的布隆過濾器)反序列化到布隆過濾器對象中。標題:Bloom Filter in MapReduce

public class BloomJoin { 

    //function map : input transaction.txt 
    public static class TransactionJoin extends 
      Mapper<LongWritable, Text, Text, Text> { 

     private Text CID=new Text(); 
     private Text outValue=new Text(); 

     public void map(LongWritable key, Text value, Context context) 
       throws IOException, InterruptedException { 

      String line = value.toString(); 
       String record[] = line.split(",", -1); 
       CID.set(record[1]); 

       outValue.set("A"+value); 
       context.write(CID, outValue); 
       } 
     } 
    //function map : input customer.txt 
      public static class CustomerJoinMapper extends 
        Mapper<LongWritable, Text, Text, Text> { 

       private Text outkey=new Text(); 
       private Text outvalue = new Text(); 
       private BloomFilter bfilter = new BloomFilter(); 
       public void setup(Context context) throws IOException { 

        URI[] files = DistributedCache 
          .getCacheFiles(context.getConfiguration()); 

        // if the files in the distributed cache are set 
        if (files != null) { 
        System.out.println("Reading Bloom filter from: " 
        + files[0].getPath()); 
        // Open local file for read. 

        DataInputStream strm = new DataInputStream(new FileInputStream(
        files[0].toString())); 
        bfilter.readFields(strm); 
        strm.close(); 

        // Read into our Bloom filter. 

        } else { 
        throw new IOException(
        "Bloom filter file not set in the DistributedCache."); 
        } 
       }; 

       public void map(LongWritable key, Text value, Context context) 
         throws IOException, InterruptedException { 
        String line = value.toString(); 
        String record[] = line.split(",", -1); 

         outkey.set(record[0]); 
         if (bfilter.membershipTest(new Key(outkey.getBytes()))) { 
         outvalue.set("B"+value); 
         context.write(outkey, outvalue); 
         } 
      } 
      } 

    // Reducer: inner-joins the customer records ("B") with the transaction
    // records ("A") that share the same key.
    public static class JoinReducer extends
            Reducer<Text, Text, Text, Text> {

        // Buffers reused for every key; cleared at the start of each reduce().
        private final ArrayList<Text> listA = new ArrayList<Text>();
        private final ArrayList<Text> listB = new ArrayList<Text>();

        /**
         * Splits the values of one key by their one-character tag and joins them.
         *
         * Values tagged 'A' go to listA, values tagged 'B' go to listB. Empty or
         * untagged values are ignored rather than being treated as 'B' records.
         *
         * @param key     join key shared by all values
         * @param values  tagged records emitted by the two mappers
         * @param context used to emit the joined pairs
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            listA.clear();
            listB.clear();

            for (Text tagged : values) {
                // Text.charAt returns -1 for an empty value, which matches
                // neither tag and is therefore skipped.
                int tag = tagged.charAt(0);
                if (tag == 'A') {
                    // Copy without the tag: Hadoop reuses the Text instance.
                    listA.add(new Text(tagged.toString().substring(1)));
                } else if (tag == 'B') {
                    listB.add(new Text(tagged.toString().substring(1)));
                }
            }

            executeJoinLogic(context);
        }

        /**
         * Writes every (customer, transaction) combination for the current key.
         * Nothing is written when either side is empty (inner join).
         *
         * @param context used to emit the joined pairs
         */
        private void executeJoinLogic(Context context) throws IOException,
                InterruptedException {
            if (listA.isEmpty() || listB.isEmpty()) {
                return;
            }
            for (Text customer : listB) {
                for (Text transaction : listA) {
                    context.write(customer, transaction);
                }
            }
        }
    }

    /**
     * Configures and runs the Bloom-filtered reduce-side join.
     *
     * Expected arguments after the generic Hadoop options:
     * transaction input path, customer input path, output path.
     * Exits with 1 on bad usage, 0 on success and 3 when the job fails.
     *
     * @param args command-line arguments
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Parse generic options first: Job copies the Configuration when it is
        // constructed, so anything applied to conf afterwards never reaches it.
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err
                    .println("ReduceSideJoin <Transaction data> <Customer data> <out> ");
            System.exit(1);
        }

        // The cache file must also be registered before the Job is created.
        Path bloompath = new Path("/user/biadmin/ezzaki/bloomfilter/output/part-00000");
        DistributedCache.addCacheFile(bloompath.toUri(), conf);

        Job job = new Job(conf, "Bloom Join");
        job.setJarByClass(BloomJoin.class);

        // One mapper per input so each side gets its own tag.
        MultipleInputs.addInputPath(job, new Path(otherArgs[0]),
                TextInputFormat.class, TransactionJoin.class);
        MultipleInputs.addInputPath(job, new Path(otherArgs[1]),
                TextInputFormat.class, CustomerJoinMapper.class);

        job.setReducerClass(JoinReducer.class);

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 3);
    }
} 

我怎樣才能解決這個問題?

+0

請將錯誤代碼和異常添加到您的問題中... –

+0

如果它沒有出現,我添加了我的類的代碼和錯誤圖像:java.io.EOFException:在java.io. DataInputStream.readInt/at org.apache.hadoop.util.bloom.Filter.readFields/at org.apache.hadoop.util.bloom.BloomFilter.readFields – Fatiso

回答

1

你可以嘗試把下面第一行換成其後的代碼:

URI[] files = DistributedCache.getCacheFiles(context.getConfiguration()); 

Path[] cacheFilePaths = DistributedCache.getLocalCacheFiles(conf);
// Read the filter inside the loop: the stream variable is scoped to the loop
// body, so using it after the closing brace would not compile.
for (Path cacheFilePath : cacheFilePaths) {
    DataInputStream fileInputStream = fs.open(cacheFilePath);
    try {
        bloomFilter.readFields(fileInputStream);
    } finally {
        // Close the stream even when deserialization fails.
        fileInputStream.close();
    }
}

另外,我覺得您使用的是map端連接(map-side join)而不是reduce端連接,因為您是在mapper中使用分佈式緩存的。

+0

感謝您的回答,但問題依然存在:函數readFields仍然拋出異常:java.io.EOFException:在java.io.DataInputStream.readInt /在org.apache.hadoop.util.bloom.Filter.readFields /在org.apache.hadoop.util.bloom.BloomFilter.readFields – Fatiso

+0

這是reduce端連接,因為連接是在reduce端完成的;我只是想在把鍵值對(PKV)發送到reducer之前,先在map端過濾我的輸入 – Fatiso