2014-02-23 57 views
4

我工作的一個項目,使用卡桑德拉1.2,Hadoop的1.2創建ColumnFamilyInputFormat的自定義InputFormat爲卡桑德拉

我已經創建了我的正常卡桑德拉映射器和減速,但我想創建自己的輸入格式類,它會從cassandra讀取記錄,我將通過使用分割和索引分割該值來獲得期望的列值,因此,我計劃創建自定義的Format類。但我很困惑,不知道,我會怎麼做?什麼課要擴展和實現,以及如何我將能夠去取行鍵,列名,列值等

我有我的Mapperclass如下:

public class MyMapper extends 
      Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, Text> { 
     private Text word = new Text(); 
     MyJDBC db = new MyJDBC(); 

     public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, 
       Context context) throws IOException, InterruptedException { 

      long std_id = Long.parseLong(ByteBufferUtil.string(key)); 
      long newSavePoint = 0; 
      if (columns.values().isEmpty()) { 
      System.out.println("EMPTY ITERATOR"); 
      sb.append("column_N/A" + ":" + "N/A" + " , ");     
      } else { 
       for (IColumn cell : columns.values()) { 
        name = ByteBufferUtil.string(cell.name()); 
        String value = null; 
        if (name.contains("int")) { 
        value = String.valueOf(ByteBufferUtil.toInt(cell.value())); 
        } else { 
        value = ByteBufferUtil.string(cell.value()); 
        } 
       String[] data = value.toString().split(","); 
       // if (data[0].equalsIgnoreCase("login")) { 
        Long[] dif = getDateDiffe(d1, d2); 

// logics i want to perform inside my custominput class , rather here, i just want a simple mapper class   
if (condition1 && condition2) {    
myhits++; 
sb.append(":\t " + data[0] + " " + data[2] + " "+ data[1] /* + " " + data[3] */+ "\n"); 
newSavePoint = d2; 
} 
} 
sb.append("~" + like + "~" + newSavePoint + "~"); 
word.set(sb.toString().replace("\t", "")); 
} 

db.setInterval(Long.parseLong(ByteBufferUtil.string(key)), newSavePoint); 
db.setHits(Long.parseLong(ByteBufferUtil.string(key)), like + ""); 
context.write(new Text(ByteBufferUtil.string(key)), word); 
} 

我想減少我的Mapper類邏輯,並且想對我的自定義輸入類執行相同的計算。

請幫幫忙,我想從stackies正r4esponse ...

回答

0

您可以通過映射邏輯移動到您的自定義輸入類(如您已表示已經)

我做的目的任務發現這nice post它解釋了類似的問題陳述,因爲你有。我認爲這可能會解決你的問題。