2017-01-12 52 views
0

異常線程 「main」 java.nio.channels.ClosedChannelException 在kafka.network.BlockingChannel.send(BlockingChannel.scala:100) 在kafka.consumer.SimpleConsumer.liftedTree1 $ 1 (SimpleConsumer.scala:78) at kafka.consumer.SimpleConsumer.kafka $ consumer $ SimpleConsumer $$ sendRequest(SimpleConsumer.scala:68) at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91) at kafka。 javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68) at cmb.SparkStream.kafka.kafkaOffsetTool.getTopicOffsets(kafkaOffsetTool.java:47) at cmb.SparkStream.LogClassify.main(LogClassify.java:95) 在sun.reflect.NativeMethodAccessorImpl.invoke0(本機方法) 在sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang中.reflect.Method.invoke(Method.java:606) 在org.apache.spark.deploy.SparkSubmit $ .ORG $阿帕奇$火花$部署$ SparkSubmit $$ runMain(SparkSubmit.scala:729) 在org.apache .spark.deploy.SparkSubmit $ .doRunMain $ 1(SparkSubmit.scala:185) at org.apache.spark.deploy.SparkSubmit $ .submit(SparkSubmit.scala:210) at org.apache.spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:124) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)得到的話題和分區偏移

我的代碼是:

public static Map<TopicAndPartition, Long> getTopicOffsets(String zkServers, String topic) { 

    Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>(); 
    for (String zkserver : zkServers.split(",")) { 
    SimpleConsumer simpleConsumer = new SimpleConsumer(zkserver.split(":")[0], 
    Integer.valueOf(zkserver.split(":")[1]), Consts.getKafkaConfigBean().getDuration(), 1024, 
    "consumser"); 
    TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topic)); 

    TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest); 

    for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) { 
    for (PartitionMetadata part : metadata.partitionsMetadata()) { 
    Broker leader = part.leader(); 
    if (leader != null) { 
     TopicAndPartition topicAndPartition = new TopicAndPartition(topic, part.partitionId()); 

     PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(
     kafka.api.OffsetRequest.LatestTime(), 10000); 
     OffsetRequest offsetRequest = new OffsetRequest(
     ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo), 
     kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId()); 
     OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest); 
     if (!offsetResponse.hasError()) { 
     long[] offsets = offsetResponse.offsets(topic, part.partitionId()); 
     retVals.put(topicAndPartition, offsets[0]); 
     } 
    } 

    } 

    } 
    simpleConsumer.close(); 
    } 
    return retVals; 
} 

回答

1

我認爲你可能會過於複雜的事情。使用org.apache.kafka.clients.consumer.KafkaConsumer(消費者在這裏),做類似的事情,以

val partitions = consumer.partitionsFor(topic).map[new TopicPartition(topic,it.partition)] 
    consumer.assign(partitions) 
    consumer.seekToEnd(partitions) 
    val offsets = partitions.map[ it -> consumer.position(it)] 
    println(offsets) 

,你將得到的結果一樣

[topicname-8-> 1917258,topicname -2- > 1876810,topicname-5-> 1857012,topicname-4-> 3844,topicname-7-> 4043972,topicname-1-> 1811078,topicname-9-> 12217819,topicname-3-> 3844,topicname-6-> 1430021,topicname-0-> 2808969]