2014-11-24

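I have CSV files that I index into Solr through a Morphline sink. I want to write a custom Java function in Morphlines that hashes one of the CSV fields before it is indexed into Solr. I tried to modify this example: http://blog.cloudera.com/blog/2014/04/how-to-process-data-using-morphlines-in-kite-sdk/ (see the section on importing data into a Hadoop cluster), but I'm not sure where the Java class for a custom Morphlines command should be stored.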
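For the hashing part of the question, a minimal stdlib-only sketch might look like the following. It assumes that a SHA-256 digest rendered as lowercase hex is an acceptable hash and that the CSV lines contain no quoted commas. The class `FieldHasher` and its methods are hypothetical names for illustration, not part of Morphlines or the Kite SDK.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: hashes a single CSV field value before indexing. */
final class FieldHasher {

    private FieldHasher() {}

    /** Returns the lowercase hex SHA-256 digest of the UTF-8 bytes of value. */
    static String sha256Hex(String value) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                // %02x prints each byte as two hex digits, including negative bytes
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform must support SHA-256, so this should not happen
            throw new IllegalStateException(e);
        }
    }

    /**
     * Splits a simple CSV line (assumption: no quoted or escaped commas)
     * and replaces the column at the given index with its hash.
     */
    static String hashColumn(String csvLine, int index) {
        String[] columns = csvLine.split(",", -1); // -1 keeps trailing empty columns
        columns[index] = sha256Hex(columns[index]);
        return String.join(",", columns);
    }
}
```

For example, `FieldHasher.hashColumn("42,alice@example.com,2011-08-03", 1)` leaves the first and third columns unchanged and replaces the second with its 64-character digest. If the CSV is already split into record fields by a `readCSV` command, only `sha256Hex` would be needed inside the custom command's `doProcess`, applied to the configured field's value (for instance with `record.replaceValues(...)`, assuming Kite's `Record` API) before calling `super.doProcess(record)`.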

In the Cloudera example, a publicly available "Million Song" dataset from Last.fm is used, and the idea is to select songs from before or after a given date.

The configuration of the new Morphline command is:

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "com.sequenceiq.lastfm.etl.**"]
    commands : [
      {
        readJson {
          outputClass : java.util.Map
        }
      }
      {
        latestSongs {
          field : timestamp
          operator : >
          pattern : "2011-08-03"
        }
      }
    ]
  }
]

And the corresponding Java code:

@Override
protected boolean doProcess(Record record) {
    // readJson (the previous command) stored the parsed JSON object as a Map
    Map<?, ?> attachmentBody = (Map<?, ?>) record.get(Fields.ATTACHMENT_BODY).get(0);
    String fieldValue = attachmentBody.get(fieldName).toString();

    try {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        Date fieldDate = sdf.parse(fieldValue);
        Date patternDate = sdf.parse(pattern + " 00:00:00");

        // Returning true without calling super.doProcess(record) silently drops
        // the record: it is never handed to the next command in the chain.
        if (operator.equals(HIGHER)) {
            if (!fieldDate.after(patternDate)) {
                return true;
            }
        } else if (operator.equals(LOWER)) {
            if (!fieldDate.before(patternDate)) {
                return true;
            }
        } else if (operator.equals(EQUALS)) {
            // Compare the calendar day only: getDate() is the day of the month,
            // whereas getDay() would be the day of the week.
            if (fieldDate.getYear() != patternDate.getYear()
                    || fieldDate.getMonth() != patternDate.getMonth()
                    || fieldDate.getDate() != patternDate.getDate()) {
                return true;
            }
        } else {
            LOG.warn("bad operator syntax: " + operator);
        }
    } catch (Exception e) {
        LOG.warn("parse exception: " + e.getMessage());
        return false;
    }

    // Replace the Map body with its JSON serialization in the message field
    record.removeAll(Fields.ATTACHMENT_BODY);
    try {
        record.put(Fields.MESSAGE, OBJECTMAPPER.writeValueAsString(attachmentBody));
    } catch (JsonProcessingException e) {
        LOG.warn("JSON serialization failed: " + e.getMessage());
        return false;
    }
    // Hand the record on to the next command
    return super.doProcess(record);
}
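The same keep-or-drop decision can be expressed with `java.time`, which avoids the deprecated `Date` getters and the `pattern + " 00:00:00"` concatenation. This is a stdlib-only sketch, not the blog's code: the class `DateFilter` is hypothetical, and it assumes the constants `HIGHER`, `LOWER` and `EQUALS` correspond to the operator strings `>`, `<` and `=` (only `>` appears in the configuration above). `keep` returns true when the record should be passed on via `super.doProcess(record)`.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical stdlib-only version of the latestSongs filter decision. */
final class DateFilter {

    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private DateFilter() {}

    /**
     * Returns true if the record should be kept.
     * Assumption: ">" = HIGHER, "<" = LOWER, "=" = EQUALS.
     */
    static boolean keep(String fieldValue, String operator, String pattern) {
        LocalDateTime fieldTime = LocalDateTime.parse(fieldValue, TIMESTAMP);
        // The pattern is a plain ISO date such as "2011-08-03"; midnight is the cutoff
        LocalDateTime patternTime = LocalDate.parse(pattern).atStartOfDay();
        switch (operator) {
            case ">":
                return fieldTime.isAfter(patternTime);
            case "<":
                return fieldTime.isBefore(patternTime);
            case "=":
                // Same calendar day, time of day ignored
                return fieldTime.toLocalDate().equals(patternTime.toLocalDate());
            default:
                throw new IllegalArgumentException("bad operator syntax: " + operator);
        }
    }
}
```

As a design choice, this sketch throws on an unknown operator rather than logging and letting the record through; in a real command the operator could be validated once, when the command is constructed from its configuration.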

Any help is appreciated!

Answer