2014-08-28 56 views
5

我使用Kafka已經兩個月了,一直用下面這段代碼在本地消費消息。我最近決定把Zookeeper和Kafka分佈式部署,一切似乎都很順利。但當我嘗試讓客戶端代碼連接遠程IP時,問題就出現了:一旦我把 seeds.add("127.0.0.1"); 改成 seeds.add("104.131.40.xxx");,就會收到下面的錯誤信息。(問題標題:Kafka Java消費者僅適用於localhost,無法用於遠程服務器)

run: 
Error communicating with Broker [104.131.40.xxx] to find Leader for [temperature, 0] Reason: 
java.net.ConnectException: Connection refused Can't find metadata for Topic and Partition. Exiting 
BUILD SUCCESSFUL (total time: 21 seconds)

這是我目前使用的代碼:

/* 
Kafka API consumer reads 10 readings from the "temperature" topic 
*/ 

package simpleexample; 

import kafka.api.FetchRequest; 
import kafka.api.FetchRequestBuilder; 
import kafka.api.PartitionOffsetRequestInfo; 
import kafka.common.ErrorMapping; 
import kafka.common.TopicAndPartition; 
import kafka.javaapi.*; 
import kafka.javaapi.consumer.SimpleConsumer; 
import kafka.message.MessageAndOffset; 

import java.nio.ByteBuffer; 
import java.util.ArrayList; 
import java.util.Collections; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 

/**
 * Minimal Kafka 0.8 {@code SimpleConsumer} example: looks up the leader broker for one
 * topic/partition, then fetches and prints messages until the requested number has been read.
 *
 * <p>All progress and error reporting goes to {@code System.out}.
 *
 * <p>NOTE(review): the leader host used for fetching is whatever the broker reports in its
 * metadata. If that host name is not reachable from the client machine (for example a broker
 * that advertises a local-only name), the connection fails even though the seed broker was
 * reachable - confirm the broker's advertised host configuration when consuming remotely.
 */
public class SimpleExample {

    /** Third constructor argument of every {@code SimpleConsumer} created here (socket timeout in ms per the Kafka 0.8 example). */
    private static final int SOCKET_TIMEOUT_MS = 100000;

    /** Fourth constructor argument of every {@code SimpleConsumer} created here (buffer size in bytes per the Kafka 0.8 example). */
    private static final int BUFFER_SIZE_BYTES = 64 * 1024;

    /** Fetch size passed to {@code addFetch}; may need to be increased if large batches are written to Kafka. */
    private static final int FETCH_SIZE_BYTES = 100000;

    /** The fetch loop gives up once more than this many consecutive fetches have failed. */
    private static final int MAX_CONSECUTIVE_FETCH_ERRORS = 5;

    /** Number of metadata lookups attempted when searching for a new leader. */
    private static final int LEADER_LOOKUP_ATTEMPTS = 3;

    /** Pause between retries (idle fetch, leader lookup), in milliseconds. */
    private static final long RETRY_BACKOFF_MS = 1000;

    /** Hosts of the replicas reported by the most recent successful {@link #findLeader} call. */
    private final List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
    }

    /**
     * Runs the example with hard-coded settings: 10 reads from partition 0 of topic
     * "temperature", using a single seed broker on port 9092. Command-line arguments are ignored;
     * the commented-out lines show how each value could be taken from {@code args} instead.
     *
     * @param args ignored
     */
    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        //long maxReads = Long.parseLong(args[0]);
        long maxReads = 10;
        //String topic = args[1];
        String topic = "temperature";
        //int partition = Integer.parseInt(args[2]);
        int partition = 0;
        List<String> seeds = new ArrayList<String>();
        //seeds.add(args[3]);
        seeds.add("104.131.40.xxx");
        //int port = Integer.parseInt(args[4]);
        int port = 9092;
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    /**
     * Fetches and prints messages from one topic partition until {@code a_maxReads} messages
     * have been printed, too many consecutive fetch errors occur, or the thread is interrupted.
     *
     * @param a_maxReads    number of messages to print; the method stops exactly at this count
     * @param a_topic       topic to read from
     * @param a_partition   partition id within the topic
     * @param a_seedBrokers broker hosts used to look up the partition leader
     * @param a_port        port used for the seed brokers and for the leader
     * @throws Exception if no new leader can be found after a fetch error, or if the Kafka
     *                   client throws while fetching
     */
    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, SOCKET_TIMEOUT_MS, BUFFER_SIZE_BYTES, clientName);
        try {
            long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

            int numErrors = 0;
            while (a_maxReads > 0) {
                if (consumer == null) {
                    // The previous consumer was closed after a fetch error; reconnect to the (possibly new) leader.
                    consumer = new SimpleConsumer(leadBroker, a_port, SOCKET_TIMEOUT_MS, BUFFER_SIZE_BYTES, clientName);
                }
                FetchRequest req = new FetchRequestBuilder()
                        .clientId(clientName)
                        .addFetch(a_topic, a_partition, readOffset, FETCH_SIZE_BYTES)
                        .build();
                FetchResponse fetchResponse = consumer.fetch(req);

                if (fetchResponse.hasError()) {
                    numErrors++;
                    // Something went wrong!
                    short code = fetchResponse.errorCode(a_topic, a_partition);
                    System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                    if (numErrors > MAX_CONSECUTIVE_FETCH_ERRORS) {
                        break;
                    }
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        // We asked for an invalid offset. For simple case ask for the last element to reset
                        readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                        continue;
                    }
                    // Null the reference right after closing so the finally block never closes it twice.
                    consumer.close();
                    consumer = null;
                    leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                    continue;
                }
                numErrors = 0;

                long numRead = 0;
                for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                    long currentOffset = messageAndOffset.offset();
                    if (currentOffset < readOffset) {
                        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                        continue;
                    }
                    readOffset = messageAndOffset.nextOffset();
                    ByteBuffer payload = messageAndOffset.message().payload();

                    // remaining() is the number of readable bytes; limit() is only equal to it when position is 0.
                    byte[] bytes = new byte[payload.remaining()];
                    payload.get(bytes);
                    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                    numRead++;
                    a_maxReads--;
                    if (a_maxReads <= 0) {
                        // Stop at the requested count even if the fetched batch holds more messages.
                        break;
                    }
                }

                // Nothing new in this fetch: back off before polling again, and stop if interrupted.
                if (numRead == 0 && !pause(RETRY_BACKOFF_MS)) {
                    break;
                }
            }
        } finally {
            // Close the consumer on every exit path, including exceptions.
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /**
     * Asks the broker for one offset of the given topic partition relative to {@code whichTime}.
     *
     * @param consumer   connected consumer used to send the offset request
     * @param topic      topic name
     * @param partition  partition id
     * @param whichTime  time selector forwarded to the request, e.g.
     *                   {@code kafka.api.OffsetRequest.EarliestTime()} or {@code LatestTime()}
     * @param clientName client id sent with the request
     * @return the first offset in the response, or 0 if the response reports an error or
     *         contains no offsets
     */
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // Request at most one offset.
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        if (offsets == null || offsets.length == 0) {
            // Treat an empty answer like the error case instead of failing on offsets[0].
            System.out.println("No offsets returned by the Broker for [" + topic + ", " + partition + "]");
            return 0;
        }
        return offsets[0];
    }

    /**
     * Looks up the current leader of a partition via the replica brokers recorded by the last
     * successful {@link #findLeader} call, retrying up to {@link #LEADER_LOOKUP_ATTEMPTS} times
     * with a pause between attempts.
     *
     * @param a_oldLeader host of the leader that just failed
     * @param a_topic     topic name
     * @param a_partition partition id
     * @param a_port      broker port
     * @return host of the leader to use next
     * @throws Exception if no usable leader was found within the attempts, or the thread was
     *                   interrupted while waiting
     */
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < LEADER_LOOKUP_ATTEMPTS; i++) {
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata != null && metadata.leader() != null) {
                String leaderHost = metadata.leader().host();
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                boolean unchangedOnFirstAttempt = i == 0 && a_oldLeader.equalsIgnoreCase(leaderHost);
                if (!unchangedOnFirstAttempt) {
                    return leaderHost;
                }
            }
            if (!pause(RETRY_BACKOFF_MS)) {
                // Interrupted: stop retrying and report failure below.
                break;
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    /**
     * Queries each seed broker in turn for topic metadata and returns the metadata of the
     * requested partition from the first broker that provides it. On success the recorded
     * replica broker list is replaced with the hosts of that partition's replicas.
     *
     * <p>A failure to talk to one seed broker is printed and the next seed is tried.
     *
     * @param a_seedBrokers broker hosts to query
     * @param a_port        broker port
     * @param a_topic       topic name
     * @param a_partition   partition id
     * @return the partition metadata, or {@code null} if no seed broker returned it
     */
    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, SOCKET_TIMEOUT_MS, BUFFER_SIZE_BYTES, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                returnMetaData = findPartition(resp.topicsMetadata(), a_partition);
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
            if (returnMetaData != null) {
                break;
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }

    /** Returns the first partition entry with the given id in the topic metadata list, or {@code null} if none matches. */
    private static PartitionMetadata findPartition(List<TopicMetadata> a_topicsMetadata, int a_partition) {
        for (TopicMetadata item : a_topicsMetadata) {
            for (PartitionMetadata part : item.partitionsMetadata()) {
                if (part.partitionId() == a_partition) {
                    return part;
                }
            }
        }
        return null;
    }

    /**
     * Sleeps for the given time.
     *
     * @return {@code true} if the full pause elapsed; {@code false} if the thread was interrupted,
     *         in which case the interrupt flag is restored for the caller
     */
    private static boolean pause(long a_millis) {
        try {
            Thread.sleep(a_millis);
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
+0

P.S. - 從此行拋出的錯誤: 'kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);' – 2014-08-28 21:37:39

+0

您是否可以使用 '$KAFKA_HOME/bin/kafka-console-consumer.sh' 這個控制檯腳本來消費消息? – user2720864 2014-08-29 05:42:59

+0

是的,但同樣地,那也只是在本地運行(在這種情況下只有ZK是遠程的) – 2014-08-29 12:09:40

回答

2

您需要在Kafka的server.properties配置文件中設置advertised.host.name,而不是host.name。