0
我試圖從分區 0 消費數據,並將接收到的數據寫入 HDFS 上的文件;結果程序拋出異常,而且沒有看到任何數據被寫入 HDFS 文件。問題:無法將從 Kafka 消費者收到的數據寫入 HDFS。
import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._
/**
 * Consumes string records from the Kafka topic "again" and appends each
 * record's value (plus a newline) to a file on HDFS, echoing it to stdout.
 *
 * Entry point: `main` — runs an endless poll loop; it only stops when the
 * JVM is killed or an exception escapes, at which point the HDFS stream and
 * the consumer are closed via `finally`.
 */
object WeatherCons {
  def main(args: Array[String]): Unit = {
    val TOPIC = "again"

    // Kafka consumer configuration. Both key and value arrive as plain strings.
    val props = new Properties()
    props.put("consumer.timeout.ms", "1500")
    props.put("bootstrap.servers", "104.197.102.208:9092")
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", "something")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Collections.singletonList(TOPIC))
    /* val topicPartition = new TopicPartition(TOPIC, 0)
    consumer.seekToBeginning(topicPartition)*/

    // HDFS client pointed at the cluster's NameNode.
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://104.197.102.208:8020")
    val fs = FileSystem.get(conf)
    import org.apache.hadoop.fs.FSDataOutputStream
    val fin: FSDataOutputStream = fs.create(new
      Path("/prash/mySample3.txt"))

    try {
      while (true) {
        val records = consumer.poll(100)
        for (record <- records.asScala) {
          val co = record.value().toString
          fin.writeUTF(co)
          fin.writeUTF("\n")
          println(co)
        }
        // BUG FIX: the original called fin.close() here, inside the loop and
        // *before* fin.flush(). The second poll iteration then wrote to an
        // already-closed stream, throwing ClosedChannelException (the exact
        // stack trace reported) before any data was persisted. Flush after
        // each batch; close only once, when the loop terminates.
        fin.flush()
      }
    } finally {
      // Release resources exactly once, even if the loop dies with an error.
      fin.close()
      consumer.close()
    }
  }
}
它拋出的異常如下,並且沒有任何數據被寫入 HDFS:
Exception in thread "main" java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1940)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
at
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:401)
at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)