
I have a MapReduce application that processes data from HDFS and stores its output back in HDFS.

Now, however, I need to store the output data in MongoDB instead of writing it to HDFS.

Can anyone let me know how to do this?

Thanks

Mapper class

package com.mapReduce; 

import java.io.IOException; 

import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

// Emits one (part, "date,actual,remainingColumns") pair per CSV input line. 
public class FMapper extends Mapper<LongWritable, Text, Text, Text> { 
    private String pART; 
    private String actual; 
    private String fdate; 
    public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException { 
     String tempString = ivalue.toString(); 
     String[] data = tempString.split(","); 
     pART=data[1]; 
     try{ 
      fdate=convertyymmdd(data[0]); 
      /**IF ACTUAL IS LAST HEADER 
      * actual=data[2]; 
      * */ 
      actual=data[data.length-1]; 
      context.write(new Text(pART), new Text(fdate+","+actual+","+dynamicVariables(data))); 
     }catch(ArrayIndexOutOfBoundsException ae){ 
      System.err.println(ae.getMessage()); 
     } 

    } 


    // Converts a MM/DD/YYYY date to YYYY/MM/DD so the keys sort chronologically. 
    public static String convertyymmdd(String date){ 

     String dateInString=null; 
     String data[] =date.split("/"); 
     String month=data[0]; 
     String day=data[1]; 
     String year=data[2]; 
     dateInString =year+"/"+month+"/"+day; 
     System.out.println(dateInString); 
     return dateInString; 
    } 

    // Joins the remaining columns back into a single comma-separated string. 
    public static String dynamicVariables(String[] data){ 
     StringBuilder str=new StringBuilder(); 
     boolean isfirst=true; 
    /** IF ACTUAL IS LAST HEADER 
    * for(int i=3;i<data.length;i++){ */ 
     for(int i=2;i<data.length-1;i++){ 

      if(isfirst){ 
       str.append(data[i]); 
       isfirst=false; 
      } 
      else 
      str.append(","+data[i]); 
     } 
     return str.toString(); 
     } 

} 

Reducer class

package com.mapReduce; 

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.Comparator; 
import java.util.List; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 

import com.ihub.bo.ForcastBO; 
import com.ihub.service.ForcastService; 
// Rebuilds ForcastBO rows per part; the results are collected in a static list. 
public class FReducer extends Reducer<Text, Text, Text, Text> { 
    private String pART; 
    private List<ForcastBO> list = null; 
    private List<List<String>> listOfList = null; 
    private List<String> vals = null; 
    private static List<ForcastBO> forcastBos=new ArrayList<ForcastBO>(); 

    @Override 
    public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 
     pART = _key.toString(); 
     // process values 
     for (Text val : values) { 
      String tempString = val.toString(); 
      String[] data = tempString.split(","); 
      ForcastBO fb=new ForcastBO(); 
      fb.setPart(pART); 
      fb.setDate(data[0]); 
      fb.setActual(data[1]); 
      fb.setW0(data[2]); 
      fb.setW1(data[3]); 
      fb.setW2(data[4]); 
      fb.setW3(data[5]); 
      fb.setW4(data[6]); 
      fb.setW5(data[7]); 
      fb.setW6(data[8]); 
      fb.setW7(data[9]); 
      try { 
       list.add(fb); 
      } catch (Exception ae) { 
       ae.printStackTrace(); 
      } 
     } 
    } 

    @Override 
    public void run(Context context) throws IOException, InterruptedException { 
     setup(context); 
     try { 
      while (context.nextKey()) { 
       listOfList = new ArrayList<List<String>>(); 
       list = new ArrayList<ForcastBO>(); 
       reduce(context.getCurrentKey(), context.getValues(), context); 
       files_WE(listOfList, list, context); 
      } 
     } finally { 
      cleanup(context); 
     } 
    } 


    // Sorts the rows for the current part by date and shifts the week columns 
    // so each row lines up with the values of the rows before it. 
    public void files_WE(List<List<String>> listOfList, List<ForcastBO> list, Context context) { 

     Collections.sort(list); 

      try { 
       setData(listOfList, list); 

       Collections.sort(listOfList, new Comparator<List<String>>() { 
        @Override 
        public int compare(List<String> o1, List<String> o2) { 
         return o1.get(0).compareTo(o2.get(0)); 
        } 
       }); 

       for (int i = listOfList.size() - 1; i > -1; i--) { 
        List<String> list1 = listOfList.get(i); 
        int k = 1; 
        for (int j = 3; j < list1.size(); j++) { 
         try { 
          list1.set(j, listOfList.get(i - k).get(j)); 
         } catch (Exception ex) { 
          list1.set(j, null); 
         } 
         k++; 
        } 

       } 
      } catch (Exception e) { 
       //e.getLocalizedMessage(); 
      } 

      for(List<String> ls:listOfList){ 
       System.out.println(ls.get(0)); 
       ForcastBO forcastBO=new ForcastBO(); 
       try{ 
        forcastBO.setPart(ls.get(0)); 
        forcastBO.setDate(ls.get(1)); 
        forcastBO.setActual(ls.get(2)); 
        forcastBO.setW0(ls.get(3)); 
        forcastBO.setW1(ls.get(4)); 
        forcastBO.setW2(ls.get(5)); 
        forcastBO.setW3(ls.get(6)); 
        forcastBO.setW4(ls.get(7)); 
        forcastBO.setW5(ls.get(8)); 
        forcastBO.setW6(ls.get(9)); 
        forcastBO.setW7(ls.get(10)); 
        forcastBos.add(forcastBO); 
        }catch(Exception e){ 
         forcastBos.add(forcastBO); 
        } 
       try{ 
        System.out.println(forcastBO); 
        //service.setForcastBOs(forcastBos); 
      }catch(Exception e){ 
       System.out.println("FB::::"+e.getStackTrace()); 
      } 
      } 
    } 





     public void setData(List<List<String>> listOfList, List<ForcastBO> list) { 
      List<List<String>> temListOfList=new ArrayList<List<String>>(); 
      for (ForcastBO str : list) { 
       vals = new ArrayList<String>(); 
       vals.add(str.getPart()); 
       vals.add(str.getDate()); 
       vals.add(str.getActual()); 
       vals.add(str.getW0()); 
       vals.add(str.getW1()); 
       vals.add(str.getW2()); 
       vals.add(str.getW3()); 
       vals.add(str.getW4()); 
       vals.add(str.getW5()); 
       vals.add(str.getW6()); 
       vals.add(str.getW7()); 
       temListOfList.add(vals); 
      } 


      Collections.sort(temListOfList, new Comparator<List<String>>() { 
       @Override 
       public int compare(List<String> o1, List<String> o2) { 
        return o1.get(1).compareTo(o2.get(1)); 
       } 
      }); 

      for(List<String> ls:temListOfList){ 
       System.out.println(ls); 
       listOfList.add(ls); 
       } 
     } 

     public static List<ForcastBO> getForcastBos() { 
      return forcastBos; 
     } 



    } 

Driver class

package com.mapReduce; 

import java.net.URI; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 


public class MRDriver { 

    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     Job job = Job.getInstance(conf, "JobName"); 
     job.setJarByClass(MRDriver.class); 
     // TODO: specify a mapper 
     job.setMapperClass(FMapper.class); 
     // TODO: specify a reducer 
     job.setReducerClass(FReducer.class); 

     // TODO: specify output types 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     // TODO: delete temp file 
     FileSystem hdfs = FileSystem.get(new URI("hdfs://localhost:9000"), 
       conf); 
     Path workingDir=hdfs.getWorkingDirectory(); 

     Path newFolderPath= new Path("/sd1"); 
     newFolderPath=Path.mergePaths(workingDir, newFolderPath); 
     if (hdfs.exists(newFolderPath)) { 
      hdfs.delete(newFolderPath, true); // delete the existing output directory recursively 
     } 
     // TODO: specify input and output DIRECTORIES (not files) 

     FileInputFormat.setInputPaths(job,new Path("hdfs://localhost:9000/Forcast/SampleData")); 
     FileOutputFormat.setOutputPath(job, newFolderPath); 

     if (!job.waitForCompletion(true)) 
      return; 
    } 
} 

First you need code that reads from HDFS, then you need a MongoDB driver and code that writes to MongoDB, or you can output directly to MongoDB from your "reducer" or final stage. Basically, get a driver for your language (Hadoop supports a couple of different modes, but presumably you mean Java), then connect and write. Learn the driver first. –


What format is the data you are processing? You can always call a MongoDB client inside the Reducer and write the data in the cleanup part (for example). If you want us to help, please provide more details. – void
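To make the cleanup suggestion concrete, here is a minimal sketch, assuming the MongoDB Java driver (3.x) is on the job's classpath; the connection details, the database name forcastdb, the collection name forcast and the field names are all hypothetical placeholders:

package com.mapReduce; 

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.bson.Document; 

import com.mongodb.MongoClient; 

// Sketch only: buffers rows in reduce() and flushes them to MongoDB in cleanup(). 
public class MongoWritingReducer extends Reducer<Text, Text, Text, Text> { 

    private final List<Document> buffer = new ArrayList<Document>(); 

    @Override 
    public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 
     for (Text val : values) { 
      String[] data = val.toString().split(","); 
      // Field names are illustrative; map your own columns here. 
      buffer.add(new Document("part", key.toString()) 
        .append("date", data[0]) 
        .append("actual", data[1])); 
     } 
    } 

    @Override 
    protected void cleanup(Context context) throws IOException, InterruptedException { 
     // One connection per reduce task, opened and closed exactly once. 
     MongoClient client = new MongoClient("localhost", 27017); 
     try { 
      if (!buffer.isEmpty()) { 
       client.getDatabase("forcastdb") 
         .getCollection("forcast") 
         .insertMany(buffer); 
      } 
     } finally { 
      client.close(); 
     } 
    } 
} 

Note that the job still needs some OutputFormat configured in the driver; since nothing is written through context, a no-op format such as NullOutputFormat avoids leaving empty part files in HDFS.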


The processed data is in the form of a list. –

Answer


Basically what you need is to change the "output format class", and you have a few options there:

  1. Use the MongoDB Connector for Hadoop: http://docs.mongodb.org/ecosystem/tools/hadoop/?_ga=1.111209414.370990604.1441913822 (a driver sketch follows this list).
  2. Implement your own OutputFormat (https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/OutputFormat.html) instead of using FileOutputFormat.
  3. Execute the MongoDB queries inside your reducer instead of writing to the MapReduce context (not nice: depending on the OutputFormat specified in the driver, you can end up with empty output files in HDFS).
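For option 1, the driver would change roughly as in the sketch below. This assumes the mongo-hadoop connector jar is on the classpath; the output URI (the hypothetical forcastdb.forcast db.collection pair) is a placeholder, and the reducer would then have to emit BSONWritable values instead of Text:

package com.mapReduce; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 

import com.mongodb.hadoop.MongoOutputFormat; 
import com.mongodb.hadoop.io.BSONWritable; 
import com.mongodb.hadoop.util.MongoConfigUtil; 

public class MongoMRDriver { 
    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     // Tell the connector where to write; "forcastdb.forcast" is a 
     // hypothetical db.collection target. 
     MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/forcastdb.forcast"); 

     Job job = Job.getInstance(conf, "JobName"); 
     job.setJarByClass(MongoMRDriver.class); 
     job.setMapperClass(FMapper.class); 
     job.setReducerClass(FReducer.class); 

     // Output values become BSON documents instead of text lines. 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(BSONWritable.class); 
     job.setOutputFormatClass(MongoOutputFormat.class); 

     FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/Forcast/SampleData")); 
     // No FileOutputFormat.setOutputPath: output goes to MongoDB, not HDFS. 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 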

In my opinion option 1 is the best one, but I haven't used the MongoDB connector enough to say whether it is stable and functional enough. Option 2 requires you to really understand how Hadoop works behind the scenes, to avoid ending up with a lot of open connections and problems when writes meet Hadoop's task retries.
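To show the kind of plumbing option 2 entails, here is a bare skeleton of a custom OutputFormat, a sketch only (new-API org.apache.hadoop.mapreduce classes rather than the mapred one linked above, MongoDB Java driver 3.x, hypothetical forcastdb.forcast target); it deliberately ignores the retry and connection concerns just mentioned:

package com.mapReduce; 

import java.io.IOException; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.OutputCommitter; 
import org.apache.hadoop.mapreduce.OutputFormat; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 
import org.bson.Document; 

import com.mongodb.MongoClient; 

public class MongoTextOutputFormat extends OutputFormat<Text, Text> { 

    @Override 
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) 
      throws IOException, InterruptedException { 
     // One client per task attempt; closed when the task finishes. 
     final MongoClient client = new MongoClient("localhost", 27017); 
     return new RecordWriter<Text, Text>() { 
      @Override 
      public void write(Text key, Text value) { 
       // Field names are illustrative; parse the value as your job requires. 
       client.getDatabase("forcastdb").getCollection("forcast") 
         .insertOne(new Document("part", key.toString()) 
           .append("row", value.toString())); 
      } 

      @Override 
      public void close(TaskAttemptContext context) { 
       client.close(); 
      } 
     }; 
    } 

    @Override 
    public void checkOutputSpecs(JobContext context) { 
     // Nothing to validate: there is no output directory. 
    } 

    @Override 
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
      throws IOException, InterruptedException { 
     // Borrow the no-op committer, since no HDFS files are produced. 
     return new NullOutputFormat<Text, Text>().getOutputCommitter(context); 
    } 
} 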


Thanks Rojo for the valuable information... it really helped me a lot. –


Rojo, do you know how to store the output data in an object instead of storing it to the file system? –


Could you elaborate on your question? If it is a different question from the original one, could you create a new one, so that it is easier to find for other people with the same problem? If I know the answer, I will try to help you. – RojoSam
