I have a MapReduce application that processes data from HDFS and stores its output back into HDFS. Now I need the application to store that output in MongoDB instead of HDFS.
Can anyone let me know how to do this?
Thanks.
Mapper class
package com.mapReduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
        String[] data = ivalue.toString().split(",");
        String part = data[1];
        try {
            String fdate = convertyymmdd(data[0]);
            // If ACTUAL is the last header: actual = data[2];
            String actual = data[data.length - 1];
            context.write(new Text(part), new Text(fdate + "," + actual + "," + dynamicVariables(data)));
        } catch (ArrayIndexOutOfBoundsException ae) {
            System.err.println(ae.getMessage());
        }
    }

    /** Converts an MM/dd/yyyy date string to yyyy/MM/dd so rows sort chronologically. */
    public static String convertyymmdd(String date) {
        String[] data = date.split("/");
        String dateInString = data[2] + "/" + data[0] + "/" + data[1];
        System.out.println(dateInString);
        return dateInString;
    }

    /** Joins the remaining week columns into one comma-separated string. */
    public static String dynamicVariables(String[] data) {
        StringBuilder str = new StringBuilder();
        // If ACTUAL is the last header: for (int i = 3; i < data.length; i++)
        for (int i = 2; i < data.length - 1; i++) {
            if (str.length() > 0) {
                str.append(",");
            }
            str.append(data[i]);
        }
        return str.toString();
    }
}
Reducer class
package com.mapReduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.ihub.bo.ForcastBO;

public class FReducer extends Reducer<Text, Text, Text, Text> {

    private List<ForcastBO> list = null;
    private List<List<String>> listOfList = null;
    private static List<ForcastBO> forcastBos = new ArrayList<ForcastBO>();

    @Override
    public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String part = _key.toString();
        // Rebuild a ForcastBO from each comma-separated value emitted for this part.
        for (Text val : values) {
            String[] data = val.toString().split(",");
            ForcastBO fb = new ForcastBO();
            fb.setPart(part);
            fb.setDate(data[0]);
            fb.setActual(data[1]);
            fb.setW0(data[2]);
            fb.setW1(data[3]);
            fb.setW2(data[4]);
            fb.setW3(data[5]);
            fb.setW4(data[6]);
            fb.setW5(data[7]);
            fb.setW6(data[8]);
            fb.setW7(data[9]);
            try {
                list.add(fb);
            } catch (Exception ae) {
                System.out.println(ae.getStackTrace() + "****" + ae.getMessage() + "*****" + ae.getLocalizedMessage());
            }
        }
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            // Reset the per-key buffers before each reduce call, then post-process.
            while (context.nextKey()) {
                listOfList = new ArrayList<List<String>>();
                list = new ArrayList<ForcastBO>();
                reduce(context.getCurrentKey(), context.getValues(), context);
                files_WE(listOfList, list, context);
            }
        } finally {
            cleanup(context);
        }
    }

    /** Sorts the rows for one part, shifts the week columns, and collects the results. */
    public void files_WE(List<List<String>> listOfList, List<ForcastBO> list, Context context) {
        Collections.sort(list);
        try {
            setData(listOfList, list);
            Collections.sort(listOfList, new Comparator<List<String>>() {
                @Override
                public int compare(List<String> o1, List<String> o2) {
                    return o1.get(0).compareTo(o2.get(0));
                }
            });
            // Shift each week column up by one extra row per column index.
            for (int i = listOfList.size() - 1; i > -1; i--) {
                List<String> list1 = listOfList.get(i);
                int k = 1;
                for (int j = 3; j < list1.size(); j++) {
                    try {
                        list1.set(j, listOfList.get(i - k).get(j));
                    } catch (Exception ex) {
                        list1.set(j, null);
                    }
                    k++;
                }
            }
        } catch (Exception e) {
            // e.getLocalizedMessage();
        }
        for (List<String> ls : listOfList) {
            System.out.println(ls.get(0));
            ForcastBO forcastBO = new ForcastBO();
            try {
                forcastBO.setPart(ls.get(0));
                forcastBO.setDate(ls.get(1));
                forcastBO.setActual(ls.get(2));
                forcastBO.setW0(ls.get(3));
                forcastBO.setW1(ls.get(4));
                forcastBO.setW2(ls.get(5));
                forcastBO.setW3(ls.get(6));
                forcastBO.setW4(ls.get(7));
                forcastBO.setW5(ls.get(8));
                forcastBO.setW6(ls.get(9));
                forcastBO.setW7(ls.get(10));
                forcastBos.add(forcastBO);
            } catch (Exception e) {
                forcastBos.add(forcastBO);
            }
            try {
                System.out.println(forcastBO);
                // service.setForcastBOs(forcastBos);
            } catch (Exception e) {
                System.out.println("FB::::" + e.getStackTrace());
            }
        }
    }

    /** Copies each ForcastBO into a list of column values, sorted by date. */
    public void setData(List<List<String>> listOfList, List<ForcastBO> list) {
        List<List<String>> temListOfList = new ArrayList<List<String>>();
        for (ForcastBO str : list) {
            List<String> vals = new ArrayList<String>();
            vals.add(str.getPart());
            vals.add(str.getDate());
            vals.add(str.getActual());
            vals.add(str.getW0());
            vals.add(str.getW1());
            vals.add(str.getW2());
            vals.add(str.getW3());
            vals.add(str.getW4());
            vals.add(str.getW5());
            vals.add(str.getW6());
            vals.add(str.getW7());
            temListOfList.add(vals);
        }
        Collections.sort(temListOfList, new Comparator<List<String>>() {
            @Override
            public int compare(List<String> o1, List<String> o2) {
                return o1.get(1).compareTo(o2.get(1));
            }
        });
        for (List<String> ls : temListOfList) {
            System.out.println(ls);
            listOfList.add(ls);
        }
    }

    public static List<ForcastBO> getForcastBos() {
        return forcastBos;
    }
}
Driver class
package com.mapReduce;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(MRDriver.class);
        job.setMapperClass(FMapper.class);
        job.setReducerClass(FReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Delete the output directory if it already exists.
        FileSystem hdfs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
        Path workingDir = hdfs.getWorkingDirectory();
        Path newFolderPath = Path.mergePaths(workingDir, new Path("/sd1"));
        if (hdfs.exists(newFolderPath)) {
            hdfs.delete(newFolderPath, true);
        }

        // Input and output are directories, not files.
        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/Forcast/SampleData"));
        FileOutputFormat.setOutputPath(job, newFolderPath);
        if (!job.waitForCompletion(true))
            return;
    }
}
First you need the code that reads from HDFS, then you need a MongoDB driver and the code that writes to MongoDB, or you can output to MongoDB directly from your "reducer" or final stage. Basically, get a driver for your language (Hadoop supports a couple of different ones, but presumably you mean Java), then connect and write. Get familiar with the driver first. –
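To make that comment concrete, here is a minimal sketch of a driver wired to the mongo-hadoop connector, which writes reducer output straight to MongoDB instead of an HDFS directory. It assumes mongo-hadoop-core and the MongoDB Java driver are on the job classpath; the URI and the forcastdb.forcast database/collection names are placeholders. The reducer would then emit each document with context.write(new Text(part), new BSONWritable(doc)) rather than collecting ForcastBO objects in a static list.
package com.mapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoMRDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point the connector at the target collection (placeholder URI).
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/forcastdb.forcast");

        Job job = Job.getInstance(conf, "MongoOutputJob");
        job.setJarByClass(MongoMRDriver.class);
        job.setMapperClass(FMapper.class);
        job.setReducerClass(FReducer.class);
        job.setOutputKeyClass(Text.class);
        // Each output value is a BSON document destined for MongoDB.
        job.setOutputValueClass(BSONWritable.class);
        // MongoOutputFormat replaces FileOutputFormat, so no HDFS output path is needed.
        job.setOutputFormatClass(MongoOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/Forcast/SampleData"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}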
What format is the data you are processing? You could always call a MongoDB client inside the Reducer and write the data out in the cleanup section, for example. If you want us to help, please provide more details. – void
The processed data is assembled into a list. –
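Along the lines of void's suggestion: since the processed data is already collected as a list of ForcastBO objects, one option is to open a MongoDB client in the reducer and flush that list from cleanup(). Below is a minimal sketch against the MongoDB Java driver 3.x; localhost:27017, forcastdb, forcast, and the toDocument() helper are all placeholder assumptions, not part of the original code.
package com.mapReduce;

import java.io.IOException;

import org.bson.Document;

import com.ihub.bo.ForcastBO;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;

public class FReducerWithMongo extends FReducer {

    private MongoClient client;
    private MongoCollection<Document> collection;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Open one connection per reducer task, not per key.
        client = new MongoClient("localhost", 27017);
        collection = client.getDatabase("forcastdb").getCollection("forcast");
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Flush everything collected by FReducer into MongoDB, then close the client.
        try {
            for (ForcastBO fb : getForcastBos()) {
                collection.insertOne(toDocument(fb));
            }
        } finally {
            client.close();
        }
    }

    // Hypothetical mapping from the BO to a BSON document; adjust field names as needed.
    private static Document toDocument(ForcastBO fb) {
        return new Document("part", fb.getPart())
                .append("date", fb.getDate())
                .append("actual", fb.getActual())
                .append("w0", fb.getW0())
                .append("w1", fb.getW1())
                .append("w2", fb.getW2())
                .append("w3", fb.getW3())
                .append("w4", fb.getW4())
                .append("w5", fb.getW5())
                .append("w6", fb.getW6())
                .append("w7", fb.getW7());
    }
}
Writing from cleanup() keeps one bulk insert per reducer task; if memory is a concern, inserting inside reduce() per key avoids buffering the whole result in the static list.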