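I have CSV files that are indexed into Solr through a Morphline sink. I want to write a custom Java function in Morphlines that hashes one of the CSV fields before it is indexed into Solr. I tried to adapt this example: http://blog.cloudera.com/blog/2014/04/how-to-process-data-using-morphlines-in-kite-sdk/ (see the section on importing data into the Hadoop cluster), but I am not sure where the Java class should be stored. Where do custom Morphlines Java command classes go?
In that example, songs are picked from a publicly available "Million Song" dataset from Last.fm. The idea is to select songs before and after a given date.
The configuration of the new Morphline command is: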
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "com.sequenceiq.lastfm.etl.**"]
    commands : [
      {
        readJson {
          outputClass : java.util.Map
        }
      }
      {
        latestSongs {
          field : timestamp
          operator: >
          pattern: "2011-08-03"
        }
      }
    ]
  }
]
And the corresponding Java code:
@Override
protected boolean doProcess(Record record) {
    // readJson stores the parsed JSON document as a Map in the attachment body.
    Map attachmentBody = (Map) record.get(Fields.ATTACHMENT_BODY).get(0);
    String fieldValue = attachmentBody.get(fieldName).toString();
    try {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date fieldDate = sdf.parse(fieldValue);
        Date patternDate = sdf.parse(pattern + " 00:00:00");
        // Returning true without calling super.doProcess() drops the record
        // without marking the morphline as failed.
        if (operator.equals(HIGHER)) {
            if (!fieldDate.after(patternDate)) {
                return true;
            }
        } else if (operator.equals(LOWER)) {
            if (!fieldDate.before(patternDate)) {
                return true;
            }
        } else if (operator.equals(EQUALS)) {
            // getDate() is the day of the month; getDay() would return the day of the week.
            if (fieldDate.getYear() != patternDate.getYear()
                    || fieldDate.getMonth() != patternDate.getMonth()
                    || fieldDate.getDate() != patternDate.getDate()) {
                return true;
            }
        } else {
            LOG.info("bad operator syntax");
        }
    } catch (Exception e) {
        LOG.info("parse exception: " + e.getMessage());
        return false;
    }
    // Replace the Map with its JSON string form before passing the record downstream.
    record.removeAll(Fields.ATTACHMENT_BODY);
    try {
        record.put(Fields.MESSAGE, OBJECTMAPPER.writeValueAsString(attachmentBody));
    } catch (JsonProcessingException e) {
        LOG.info("parse exception: " + e.getMessage());
        return false;
    }
    return super.doProcess(record);
}
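For the hashing step itself, this is roughly what I have in mind. It is only a minimal sketch using the plain JDK: the class name FieldHasher, the method name sha256Hex, and the choice of SHA-256 are my own assumptions and are not part of the blog example or of Morphlines. The idea would be to call it from a custom command's doProcess on the CSV field value before the record is passed downstream.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of Kite Morphlines: hashes one field value.
public class FieldHasher {

    // Returns the SHA-256 digest of the UTF-8 bytes of value as lowercase hex.
    public static String sha256Hex(String value) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                // %02x prints each byte as two hex digits, treating it as unsigned.
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-256.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    public static void main(String[] args) {
        // Example: hash a single CSV field value.
        System.out.println(sha256Hex("abc"));
    }
}
```

Keeping the hash in a small static helper means it can be unit-tested without a Morphline context, and the custom command only has to read the field, call the helper, and write the result back.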
Any help is appreciated!