2012-09-19 117 views
1

我想將不同的文件附加到不同的縮減器。在hadoop中使用分佈式緩存技術有可能嗎?分佈式緩存文件Hadoop

我能夠在同一文件(文件)連接到所有的減速。但由於內存限制,我想知道是否可以將不同的文件附加到不同的縮減器。

如果一個無知的問題,請原諒我。

請幫忙!

在此先感謝!

回答

0

這是一個奇怪的願望,因爲任何減速器未綁定到特定節點,並在執行中的減速可以在任何節點或甚至節點上運行(如果存在故障或推測執行)。因此所有的縮減器應該是同質的,唯一不同的是他們處理的數據。

所以我想當你說你想把不同的文件放在不同的reducer上時,你實際上想把不同的文件放在reducer上,這些文件應該和這些reducer將要處理的數據(鍵)相對應。

的只有我自己知道這樣做是把你的數據在HDFS和減速機讀它時,它開始處理數據的一種方法。

+0

嗨,感謝您的答覆!如何從減速器讀取數據?據我所知,我們可以從映射器讀取數據,即在jobconf中設置數據,從hdfs中讀取數據的輸入目錄,並將其傳遞給reducer,但有沒有辦法讀取來自減速機的hdfs的數據? –

+0

您可以使用API​​直接從HDFS看,這裏有一個例子 - https://github.com/rystsov/learning-hadoop/blob/master/src/main/java/com/twitter/rystsov/hdfs/SimpleRead。 java – rystsov

+0

嗨,我看到了鏈接中的示例。 –

1

此外,它可能值得嘗試使用像GridGain,Infinispan等內存計算/數據網格技術...這種方式,你可以加載你的數據在內存中,你不會有任何限制如何映射您的計算作業(地圖/縮小)到使用數據關聯性的任何數據。

+0

感謝您的回覆!將對此做更多的研究! –

+0

你能告訴我們更多關於這項技術嗎?有沒有一種方法可以將它與hadoop集成,或者我必須重新開始針對我面臨的問題編寫網格編碼的代碼? –

0
package com.a; 

import javax.security.auth.login.Configuration; 

import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter; 

public class PrefixNew4Reduce4 extends MapReduceBase implements Reducer<Text, Text, Text, Text>{ 
// @SuppressWarnings("unchecked") 


ArrayList<String> al = new ArrayList<String>(); 
public void configure(JobConf conf4) 
{ 

    String from = "home/users/mlakshm/haship"; 

    OutputStream dst = null; 
    try { 
     dst = new BufferedOutputStream(new FileOutputStream(to, false)); 
    } catch (FileNotFoundException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } /* src (hdfs file) something like hdfs://127.0.0.1:8020/home/rystsov/hi           */ 


    FileSystem fs = null; 
    try { 
     fs = FileSystem.get(new URI(from), conf4); 
    } catch (IOException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } catch (URISyntaxException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
    FSDataInputStream src; 
    try { 
     src = fs.open(new Path(from)); 

     String val = src.readLine(); 
     StringTokenizer st = new StringTokenizer(val); 

     al.add(val); 


     System.out.println("val:----------------->"+val); 

    } catch (IOException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 



} 



    public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { 


     StringTokenizer stk = new StringTokenizer(key.toString()); 
     String t = stk.nextToken(); 
     String i = stk.nextToken(); 
     String j = stk.nextToken(); 

    ArrayList<String> al1 = new ArrayList<String>(); 

      for(int i = 0; i<al.size(); i++) 
      { 

        boolean a = (al.get(i).equals(i)) || (al.get(i).equals(j)); 

        if(a==true) 
        { 

         output.collect(key, new Text(al.get(i));        

        } 


     while(values.hasNext()) 
      { 

      String val = values.next().toString(); 
      al1.add(val); 

     } 

for(int i = 0; i<al1.size(); i++) 
{ 
output.collect(key, new Text(al1.get(i)); 
} 
相關問題