2015-06-19 21 views
0

我是Avro格式的新手。我正在嘗試使用Storm-Jms噴口從JMS隊列中收集Avro消息,並使用hdfs螺栓將它們發送到hdfs。Storm-jms Spout收集Avro消息併發送下載流?

隊列正在發送avro,但我無法使用HDFS BOLT以avro格式獲取它們。

如何正確收集avro消息並將它們發送到下游,而不會在hdfs中產生編碼錯誤。

+0

您應該向您的問題添加異常消息。 –

+0

您好約書亞我沒有得到任何異常暴風雨我能夠從JMS讀取數據並將其放置hdfs,但在閱讀使用HDFS-bolt放置在hdfs中的.avro文件時,當我嘗試讀取文件時出現錯誤使用HIVE。這是錯誤:java.io.IOException:java.io.IOException:不是數據文件。 – RAJESH

+0

我認爲Storm需要一些與HDFS BOLT中的Flume Avroevent序列化程序相似的東西。 – RAJESH

回答

0

現有的HDFS Bolt不支持寫入avro文件,我們需要通過進行以下更改來克服此問題。在這個示例代碼中,我使用從我的噴口獲取JMS消息並將這些JMS字節消息轉換爲AVRO並將它們發送到HDFS。

此代碼可以作爲修改AbstractHdfsBolt中的方法的示例。

public void execute(Tuple tuple) {   
     try {    
      long length = bytesMessage.getBodyLength(); 
      byte[] bytes = new byte[(int)length]; 
      /////////////////////////////////////// 
      bytesMessage.readBytes(bytes); 
      String replyMessage = new String(bytes, "UTF-8"); 

      datumReader = new SpecificDatumReader<IndexedRecord>(schema); 
      decoder = DecoderFactory.get().binaryDecoder(bytes, null); 

      result = datumReader.read(null, decoder);        
      synchronized (this.writeLock) {     
       dataFileWriter.append(result);          
       dataFileWriter.sync(); 
       this.offset += bytes.length;      
       if (this.syncPolicy.mark(tuple, this.offset)) { 
        if (this.out instanceof HdfsDataOutputStream) { 
         ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); 
        } else { 
         this.out.hsync(); 
         this.out.flush(); 
        } 
        this.syncPolicy.reset(); 
       } 
       dataFileWriter.flush(); 
      } 

      if(this.rotationPolicy.mark(tuple, this.offset)){ 
       rotateOutputFile(); // synchronized 
       this.offset = 0; 
       this.rotationPolicy.reset(); 
      } 
     } catch (IOException | JMSException e) { 
      LOG.warn("write/sync failed.", e); 
      this.collector.fail(tuple); 
     } 
    } 

@Override 
void closeOutputFile() throws IOException { 
    this.out.close(); 
} 

@Override 
Path createOutputFile() throws IOException { 
    Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis())); 
    this.out = this.fs.create(path); 
    dataFileWriter.create(schema, out); 
    return path; 
} 

@Override 
void doPrepare(Map conf, TopologyContext topologyContext,OutputCollector collector) throws IOException { 
    // TODO Auto-generated method stub 
    LOG.info("Preparing HDFS Bolt..."); 
    try { 

      schema = new Schema.Parser().parse(new File("/home/*******/********SchemafileName.avsc")); 
     } catch (IOException e1) {    
      e1.printStackTrace(); 
     } 
    this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig); 
    datumWriter = new SpecificDatumWriter<IndexedRecord>(schema); 
    dataFileWriter = new DataFileWriter<IndexedRecord>(datumWriter); 
    JMSAvroUtils JASV = new JMSAvroUtils();   
}