1

我已經創建的樣本卡夫卡Streams應用程序從教程:卡夫卡流不讀輸入話題

public static void main(String[] args) throws Exception { 
    Logger log = Logger.getLogger("Name"); 

    Properties props = new Properties(); 
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordprint"); 
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092"); 
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 

    final KStreamBuilder builder = new KStreamBuilder(); 
    builder.stream("onecon_postgres").print(); 

    final KafkaStreams streams = new KafkaStreams(builder, props); 
    final CountDownLatch latch = new CountDownLatch(1); 

    // attach shutdown handler to catch control-c 
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { 
     @Override 
     public void run() { 
      streams.close(); 
      latch.countDown(); 
     } 
    }); 

    try { 
     streams.start(); 
     log.info("After Start"); 
     latch.await(); 
    } catch (Throwable e) { 
     System.exit(1); 
    } 
    System.exit(0); 
    } 

不幸的應用程序不讀取輸入流。我有一個來自PostgreSQL的JDBC源連接器,它可以很好地處理來自一個數據庫的數據流(我可以在本主題內的Kafka Connect UI數據中看到)。

我已經是即使我已經在在屬性IP改變IP BOOTSTRAP_SERVERS_CONFIG問題是本地主機我不知道爲什麼。

[main] INFO org.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
    application.id = streams-linesplit 
    application.server = 
    **bootstrap.servers = [localhost:9092]** 
    buffered.records.per.partition = 1000 
    cache.max.bytes.buffering = 10485760 
    client.id = 
    commit.interval.ms = 30000 
    connections.max.idle.ms = 540000 
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde 
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp 
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde 
    key.serde = null 
    metadata.max.age.ms = 300000 
    metric.reporters = [] 
    metrics.num.samples = 2 
    metrics.recording.level = INFO 
    metrics.sample.window.ms = 30000 
    num.standby.replicas = 0 
    num.stream.threads = 1 
    partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper 
    poll.ms = 100 
    processing.guarantee = at_least_once 
    receive.buffer.bytes = 32768 
    reconnect.backoff.max.ms = 1000 
    reconnect.backoff.ms = 50 
    replication.factor = 1 
    request.timeout.ms = 40000 
    retry.backoff.ms = 100 
    rocksdb.config.setter = null 
    security.protocol = PLAINTEXT 
    send.buffer.bytes = 131072 
    state.cleanup.delay.ms = 600000 
    state.dir = /tmp/kafka-streams 
    timestamp.extractor = null 
    value.serde = null 
    windowstore.changelog.additional.retention.ms = 86400000 
    zookeeper.connect = 

爲了克服這個問題我已經使用netsh的轉發流量,但我不能看到這個應用程序使用我的信息流。

netsh interface portproxy add v4tov4 listenport=9092 listenaddress=127.0.0.1 connectport=9092 connectaddress=192.168.99.100 
+0

您是否嘗試調試到您的應用程序?在StreamsThread構造函數中設置一個斷點以查看傳入的配置。同時檢查「StreamsConfig」在解析你的配置時是否做了正確的事情。 –

回答

1

不幸的應用程序不讀取輸入流。

您的Kafka Streams應用程序與您的Kafka經紀人之間似乎存在聯網問題。 「卡夫卡溪不起作用」是不太可能的。

而且,很難幫你,沒有你提供更多信息:

  • 貴卡夫卡經紀人使用什麼樣的卡夫卡版本?
  • 您的應用程序使用什麼Kafka(Streams)版本?
  • 哪個操作系統?
  • 什麼是網絡設置?
    • 運行應用程序的機器的IP地址。
    • 在哪個IP +端口上是您的Kafka經紀人(或經紀人)監聽新連接?是192.168.99.100:9092
  • 你在應用程序的日誌中看到了什麼?你看到ERRORWARN日誌消息?

我的問題是,即使我已經在IP屬性中BOOTSTRAP_SERVERS_CONFIG改變IP爲localhost我不知道爲什麼。

我不明白 - 你爲什麼認爲改變BOOTSTRAP_SERVERS_CONFIGlocalhost:9092會解決你最初的問題?我明白卡夫卡經紀人實際上在192.168.99.100:9092上收聽?

爲了克服這一點,我用netsh轉發流量,但我看不到這個應用程序消耗我的流。

端口轉發很可能沒有幫助。在不更新卡夫卡代理的配置的情況下,代理默認只會在其「真實」IP +端口上進行通信。稍微簡化:配置爲在192.168.99.100:9092上偵聽的代理將不響應localhost:9092請求,即使您在運行Kafka Streams應用程序的計算機上從localhost:9092 -> 192.168.99.100:9092執行端口轉發,您的Kafka Streams應用程序也會發送該請求。

希望這會有所幫助!