直接的解決方案應該是使用四個HdfsBolt
和一個額外的「LogSplitBolt」發出四個輸出流爲每個日誌級別):
public class LogSplitBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
Fields schema = new Fields(...);
declarer.declareStream("debug", schema);
declarer.declareStream("error", schema);
declarer.declareStream("warnings", schema);
declarer.declareStream("info", schema);
}
public void execute(Tuple input) {
String logLevel = input.getXXX(...); // get log level
// use logLevel as output streamId
collector.emit(logLevel, new Values(input.getValues());
}
}
當你建立你的拓撲結構,不同的HdfsBolt
小號訂閱不同的數據流:
builder.addBolt("splitter", new LogSplitBolt());
builder.addbolt("writeDebug", new HdfsBolt(...)).localOfShuffle("debug", "splitter");
builder.addbolt("writeError", new HdfsBolt(...)).localOfShuffle("error", "splitter");
builder.addbolt("writeWarning", new HdfsBolt(...)).localOfShuffle("warning", "splitter");
builder.addbolt("writeInfo", new HdfsBolt(...)).localOfShuffle("info", "splitter");
當然,每個HdfsBolt
被配置到自己的文件在HDFS中。
是的,或者你可以寫一個單一的螺栓來完成所有這些。 –