2017-06-24 59 views
0

我試圖從分區0消費數據並將接收到的數據寫入hdfs上的文件,並且拋出異常,並且我沒有看到任何數據正在寫入HDFS文件無法寫入從kafka消費者使用hdfs的數據

import java.util 

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs.FileSystem 
import org.apache.hadoop.fs.Path 
import java.util.Properties 

import org.apache.kafka.clients.consumer.KafkaConsumer 


import scala.collection.JavaConverters._ 

object WeatherCons { 

    def main(args: Array[String]): Unit = { 

    val TOPIC = "again" 

    val props = new Properties() 
    props.put("consumer.timeout.ms", "1500") 
    props.put("bootstrap.servers", "104.197.102.208:9092") 
    props.put("key.deserializer", 
    "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", 
    "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("group.id", "something") 

    val consumer = new KafkaConsumer[String, String](props) 

    consumer.subscribe(util.Collections.singletonList(TOPIC)) 
    /* val topicPartition = new TopicPartition(TOPIC, 0) 
    consumer.seekToBeginning(topicPartition)*/ 
    val conf = new Configuration() 
    conf.set("fs.defaultFS", "hdfs://104.197.102.208:8020") 
    val fs = FileSystem.get(conf) 
    import org.apache.hadoop.fs.FSDataOutputStream 
    val fin:FSDataOutputStream = fs.create(new 
    Path("/prash/mySample3.txt")) 
    while (true) { 
     val records = consumer.poll(100) 
     for (record <- records.asScala) { 

     val co = record.value().toString 
     fin.writeUTF(co) 
     fin.writeUTF("\n") 

     println(co) 
     } 
     fin.close() 
     fin.flush() 
    } 
    } 
} 

它拋出異常如下,沒有數據被寫入HDFS

Exception in thread "main" java.nio.channels.ClosedChannelException 
     at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1940) 
     at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105) 
     at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58) 
     at java.io.DataOutputStream.write(DataOutputStream.java:107) 
     at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401) 
     at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323) 

回答

0

多試while循環之後接近fin沖洗。