0
我是Avro格式的新手。我正在嘗試使用Storm-Jms噴口從JMS隊列中收集Avro消息,並使用hdfs螺栓將它們發送到hdfs。Storm-jms Spout收集Avro消息併發送下載流?
隊列正在發送avro,但我無法使用HDFS BOLT以avro格式獲取它們。
如何正確收集avro消息並將它們發送到下游,而不會在hdfs中產生編碼錯誤。
我是Avro格式的新手。我正在嘗試使用Storm-Jms噴口從JMS隊列中收集Avro消息,並使用hdfs螺栓將它們發送到hdfs。Storm-jms Spout收集Avro消息併發送下載流?
隊列正在發送avro,但我無法使用HDFS BOLT以avro格式獲取它們。
如何正確收集avro消息並將它們發送到下游,而不會在hdfs中產生編碼錯誤。
現有的HDFS Bolt不支持寫入avro文件,我們需要通過進行以下更改來克服此問題。在這個示例代碼中,我使用從我的噴口獲取JMS消息並將這些JMS字節消息轉換爲AVRO並將它們發送到HDFS。
此代碼可以作爲修改AbstractHdfsBolt中的方法的示例。
public void execute(Tuple tuple) {
try {
long length = bytesMessage.getBodyLength();
byte[] bytes = new byte[(int)length];
///////////////////////////////////////
bytesMessage.readBytes(bytes);
String replyMessage = new String(bytes, "UTF-8");
datumReader = new SpecificDatumReader<IndexedRecord>(schema);
decoder = DecoderFactory.get().binaryDecoder(bytes, null);
result = datumReader.read(null, decoder);
synchronized (this.writeLock) {
dataFileWriter.append(result);
dataFileWriter.sync();
this.offset += bytes.length;
if (this.syncPolicy.mark(tuple, this.offset)) {
if (this.out instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
} else {
this.out.hsync();
this.out.flush();
}
this.syncPolicy.reset();
}
dataFileWriter.flush();
}
if(this.rotationPolicy.mark(tuple, this.offset)){
rotateOutputFile(); // synchronized
this.offset = 0;
this.rotationPolicy.reset();
}
} catch (IOException | JMSException e) {
LOG.warn("write/sync failed.", e);
this.collector.fail(tuple);
}
}
@Override
void closeOutputFile() throws IOException {
this.out.close();
}
@Override
Path createOutputFile() throws IOException {
Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
this.out = this.fs.create(path);
dataFileWriter.create(schema, out);
return path;
}
@Override
void doPrepare(Map conf, TopologyContext topologyContext,OutputCollector collector) throws IOException {
// TODO Auto-generated method stub
LOG.info("Preparing HDFS Bolt...");
try {
schema = new Schema.Parser().parse(new File("/home/*******/********SchemafileName.avsc"));
} catch (IOException e1) {
e1.printStackTrace();
}
this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
datumWriter = new SpecificDatumWriter<IndexedRecord>(schema);
dataFileWriter = new DataFileWriter<IndexedRecord>(datumWriter);
JMSAvroUtils JASV = new JMSAvroUtils();
}
您應該向您的問題添加異常消息。 –
您好約書亞我沒有得到任何異常暴風雨我能夠從JMS讀取數據並將其放置hdfs,但在閱讀使用HDFS-bolt放置在hdfs中的.avro文件時,當我嘗試讀取文件時出現錯誤使用HIVE。這是錯誤:java.io.IOException:java.io.IOException:不是數據文件。 – RAJESH
我認爲Storm需要一些與HDFS BOLT中的Flume Avroevent序列化程序相似的東西。 – RAJESH