2017-01-27 41 views
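Transfer CSV files to HDFS with Flume and convert them to Avro

I'm new to big data. My task is to transfer CSV files to HDFS with Flume, and the CSV files should also be converted to Avro along the way. I tried to do this with the following Flume configuration: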

a1.channels = dataChannel 
a1.sources = dataSource 
a1.sinks = dataSink 

a1.channels.dataChannel.type = memory 
a1.channels.dataChannel.capacity = 1000000 
a1.channels.dataChannel.transactionCapacity = 10000 

a1.sources.dataSource.type = spooldir 
a1.sources.dataSource.spoolDir = {spool_dir} 
a1.sources.dataSource.fileHeader = true 
a1.sources.dataSource.fileHeaderKey = file 
a1.sources.dataSource.basenameHeader = true 
a1.sources.dataSource.basenameHeaderKey = basename 
a1.sources.dataSource.interceptors.attach-schema.type = static 
a1.sources.dataSource.interceptors.attach-schema.key = flume.avro.schema.url 
a1.sources.dataSource.interceptors.attach-schema.value = {path_to_schema_in_hdfs} 

a1.sinks.dataSink.type = hdfs 
a1.sinks.dataSink.hdfs.path = {sink_path} 
a1.sinks.dataSink.hdfs.format = text 
a1.sinks.dataSink.hdfs.inUsePrefix = . 
a1.sinks.dataSink.hdfs.filePrefix = drone 
a1.sinks.dataSink.hdfs.fileSuffix = .avro 
a1.sinks.dataSink.hdfs.rollSize = 180000000 
a1.sinks.dataSink.hdfs.rollCount = 100000 
a1.sinks.dataSink.hdfs.rollInterval = 120 
a1.sinks.dataSink.hdfs.idleTimeout = 3600 
a1.sinks.dataSink.hdfs.fileType = DataStream 
a1.sinks.dataSink.serializer = avro_event 
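
As written, the configuration never binds the source or the sink to the channel and never declares the interceptor by name, and `hdfs.format` is not a documented HDFS sink property. The fragment below is a minimal sketch of the lines that appear to be missing. It is not the poster's final configuration, and the last line is an assumption: it only applies if the goal is to write records with the schema referenced by `flume.avro.schema.url` rather than Flume's default event schema.

```properties
# Sketch only: additions and corrections, not a complete agent configuration.

# Bind the source and the sink to the channel.
a1.sources.dataSource.channels = dataChannel
a1.sinks.dataSink.channel = dataChannel

# Declare the interceptor by name so that its attach-schema.* settings take effect.
a1.sources.dataSource.interceptors = attach-schema

# The documented property is hdfs.writeFormat (Text or Writable), not hdfs.format.
a1.sinks.dataSink.hdfs.writeFormat = Text

# Assumption: use the HDFS sink's AvroEventSerializer, which reads the schema
# location from the flume.avro.schema.url event header, instead of avro_event.
a1.sinks.dataSink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
```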

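With the avro_event serializer, the output is Avro files in Flume's default schema. I also tried using AvroEventSerializer, but I got a lot of different errors. I resolved all of them except this one: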

ERROR hdfs.HDFSEventSink: process failed 
java.lang.ExceptionInInitializerError 
     at org.apache.hadoop.hdfs.DFSOutputStream.computePacketChunkSize(DFSOutputStream.java:1305) 
     at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1243) 
     at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1266) 
     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1101) 
     at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1059) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:232) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:75) 

Thanks for any help.

Answer

Sorry for the mistakes in the config. I fixed them and found a way to convert the CSV to Avro. I modified AvroEventSerializer slightly, like this:

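public void write(Event event) throws IOException {
    if (dataFileWriter == null) {
        // First event: load the schema and open the Avro data file writer.
        initialize(event);
    }
    // city must have been created by this point (for example in initialize(),
    // once the schema is known); otherwise city.put(...) throws a NullPointerException.
    // Each event body is one CSV line. Requires java.nio.charset.StandardCharsets.
    String[] items = new String(event.getBody(), StandardCharsets.UTF_8).split(",");
    city.put("deviceID", Long.parseLong(items[0]));
    city.put("groupID", Long.parseLong(items[1]));
    city.put("timeCounter", Long.parseLong(items[2]));
    city.put("cityCityName", items[3]);
    city.put("cityStateCode", items[4]);
    city.put("sessionCount", Long.parseLong(items[5]));
    city.put("errorCount", Long.parseLong(items[6]));
    // Append the populated record (the original had a typo here: citi).
    dataFileWriter.append(city);
}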

Here is the definition of city:

private GenericRecord city = null; 
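
The parsing step in write() can be pulled out so that it can be checked without Flume or Avro on the classpath. The following is only a sketch: `CsvRowParser` is a hypothetical helper name, the field names and their order are taken from the code above, and it assumes plain comma-separated lines with no quoted fields.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: turns one CSV line into typed values keyed by field name. */
final class CsvRowParser {
    private static final int FIELD_COUNT = 7;

    private CsvRowParser() {}

    /** Decodes an event body as UTF-8 and parses it as one CSV line. */
    static Map<String, Object> parse(byte[] body) {
        return parse(new String(body, StandardCharsets.UTF_8));
    }

    /** Parses one CSV line; rejects lines with the wrong number of fields. */
    static Map<String, Object> parse(String line) {
        // A limit of -1 keeps trailing empty fields, so the count check is reliable.
        // Note: a plain split does not handle quoted fields that contain commas.
        String[] items = line.split(",", -1);
        if (items.length != FIELD_COUNT) {
            throw new IllegalArgumentException(
                "expected " + FIELD_COUNT + " fields but got " + items.length + ": " + line);
        }
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("deviceID", parseLong(items[0], "deviceID"));
        row.put("groupID", parseLong(items[1], "groupID"));
        row.put("timeCounter", parseLong(items[2], "timeCounter"));
        row.put("cityCityName", items[3]);
        row.put("cityStateCode", items[4]);
        row.put("sessionCount", parseLong(items[5], "sessionCount"));
        row.put("errorCount", parseLong(items[6], "errorCount"));
        return row;
    }

    /** Parses a numeric field and names the field in the error message on failure. */
    private static long parseLong(String raw, String field) {
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "field " + field + " is not a valid long: '" + raw + "'", e);
        }
    }
}
```

Inside write(), the returned map could then be copied into the record with city.put(key, value) for each entry, so that a malformed line fails with a message naming the offending field instead of an ArrayIndexOutOfBoundsException or a bare NumberFormatException.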

Please reply if you know a better way to do this.