2013-05-14 38 views
1

需要幫助,這似乎是一個常見的任務: 我們每小時都有大量包含許多不同事件的巨大日誌文件。 我們一直在使用蜂巢分裂這些事件到不同的文件,在一個硬編碼方式:分割日誌文件的最佳方法

from events 
    insert overwrite table specificevent1 
    where events.event_type='specificevent1' 
    insert overwrite table specificevent2 
    where events.event_type='specificevent2' 
...; 

這是有問題的代碼必須爲我們添加的每個新的事件而改變。

我們嘗試使用動態分區做一個自動解析,但遇到問題:

  1. 如果我的分區模式是/year/month/day/hour/event那麼我們就不能恢復超過一天的分區,每月將是〜數( 30天)(24小時)(100〜事件)=〜72K這是太多的工作。
  2. 如果我的模式是event/year/month/day/hour,那麼因爲事件是動態部分,所以它會強制將下一個分區編寫爲動態腳本,並且這會導致分區需要更多時間,因爲分區數量增加。

有沒有更好的方法來做到這一點(Hive和非Hive解決方案)?

回答

0

希望這會幫助別人......

我發現蜂巢是不要去,如果你想將日誌文件分割到許多不同的文件(每個文件EVENT_TYPE)的方式。 Hive提供的動態分區有太多限制恕我直言。

我最終做的是寫一個自定義的map-reduce jar。 我還發現舊的Hadoop接口更合適,因爲它提供了允許您實現generateFileNameForKeyValue()的MultipleTextOutputFormat抽象類。 (新的Hadoop提供了多個不同的輸出文件的機制:MultipleOutputs如果你有預定義的輸出位置這是偉大的,沒有得到如何讓他們從鍵值飛)

示例代碼:

\* 
Run example: 
hadoop jar DynamicSplit.jar DynamicEventSplit.DynamicEventSplitMultifileMapReduce /event/US/incoming/2013-01-01-01/ event US 2013-01-01-01 2 "[a-zA-Z0-9_ ]+" "/event/dynamicsplit1/" "," 
*/ 
package DynamicEventSplit; 

import java.io.*; 
import java.util.*; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 
import org.apache.hadoop.mapred.lib.*; 
import java.io.IOException; 
import java.util.Iterator; 

import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.RecordWriter; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.util.Progressable; 

public class DynamicEventSplitMultifileMapReduce 
{ 
     static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
     { 
      private String event_name; 
      private String EventNameRegexp; 
      private int EventNameColumnNumber; 
      private String columndelimeter=","; 

      public void configure(JobConf job) 
      { 
       EventNameRegexp=job.get("EventNameRegexp"); 
       EventNameColumnNumber=Integer.parseInt(job.get("EventNameColumnNumber")); 
       columndelimeter=job.get("columndelimeter"); 
      } 
      public void map(LongWritable key, Text value,OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
      { 
       //check that expected event_name field exists 
       String [] dall=value.toString().split(columndelimeter); 
       if (dall.length<EventNameColumnNumber) 
       { 
        return; 
       } 
       event_name=dall[EventNameColumnNumber-1]; 
       //check that expected event_name is valid 
       if (!event_name.matches(EventNameRegexp)) 
       { 
        return; 
       } 
       output.collect(new Text(dall[1]),value); 
      } 
     } 

     static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> 
     { 
      public void reduce(Text key, Iterator<Text> values,OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
      { 
        while (values.hasNext()) 
        { 
         output.collect(key, values.next()); 
        } 
      } 
     } 


     static class MultiFileOutput extends MultipleTextOutputFormat<Text, Text> 
     { 
      private String event_name; 
      private String site; 
      private String event_date; 
      private String year; 
      private String month; 
      private String day; 
      private String hour; 
      private String basepath; 


      public RecordWriter<Text,Text> getRecordWriter(FileSystem fs, JobConf job,String name, Progressable arg3) throws IOException 
      { 
       RecordWriter<Text,Text> rw=super.getRecordWriter(fs, job, name, arg3); 
       site=job.get("site"); 
       event_date=job.get("date"); 
       year=event_date.substring(0,4); 
       month=event_date.substring(5,7); 
       day=event_date.substring(8,10); 
       hour=event_date.substring(11,13); 
       basepath=job.get("basepath"); 
       return rw; 
      } 

      protected String generateFileNameForKeyValue(Text key, Text value,String leaf) 
      { 
       event_name=key.toString(); 
       return basepath+"event="+event_name+"/site="+site+"/year="+year+"/month="+month+"/day="+day+"/hour="+hour+"/"+leaf; 
      } 

      protected Text generateActualKey(Text key, Text value) 
      { 
       return null; 
      } 
     } 

     public static void main(String[] args) throws Exception 
     { 
       String InputFiles=args[0]; 
       String OutputDir=args[1]; 
       String SiteStr=args[2]; 
       String DateStr=args[3]; 
       String EventNameColumnNumber=args[4]; 
       String EventNameRegexp=args[5]; 
       String basepath=args[6]; 
       String columndelimeter=args[7]; 

       Configuration mycon=new Configuration(); 
       JobConf conf = new JobConf(mycon,DynamicEventSplitMultifileMapReduce.class); 
       conf.set("site",SiteStr); 
       conf.set("date",DateStr); 

       conf.setOutputKeyClass(Text.class); 
       conf.setMapOutputKeyClass(Text.class); 
       conf.setOutputValueClass(Text.class); 

       conf.setMapperClass(Map.class); 
       conf.setReducerClass(Reduce.class); 

       conf.setInputFormat(TextInputFormat.class); 
       conf.setOutputFormat(MultiFileOutput.class); 

       conf.setMapSpeculativeExecution(false); 
       conf.setReduceSpeculativeExecution(false); 

       FileInputFormat.setInputPaths(conf,InputFiles); 
       FileOutputFormat.setOutputPath(conf,new Path("/"+OutputDir+SiteStr+DateStr+"/")); 

       conf.set("EventNameColumnNumber",EventNameColumnNumber); 
       conf.set("EventNameRegexp",EventNameRegexp); 
       conf.set("basepath",basepath); 
       conf.set("columndelimeter",columndelimeter); 

       JobClient.runJob(conf); 
     } 
} 
相關問題