2016-09-18 22 views
1

我在測試spring-cloud-starter-stream-kafka。下面有錯誤。春雲流應用程序,調度員沒有訂戶通道'unknown.channel.name'

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) 
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) 
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) 
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69) 
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) 
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43) 
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171) 
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50) 
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4$1.doWithRetry(KafkaMessageChannelBinder.java:607) 
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263) 
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:154) 
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.onMessage(KafkaMessageChannelBinder.java:604) 
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221) 
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209) 
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) 
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153) 
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) 
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 
... 32 common frames omitted 

我StreamApplication.java

package de.codecentric; 

import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.cloud.stream.annotation.EnableBinding; 
import org.springframework.cloud.stream.annotation.StreamListener; 
import org.springframework.context.annotation.Bean; 
import org.springframework.integration.annotation.InboundChannelAdapter; 
import org.springframework.integration.annotation.Poller; 
import org.springframework.integration.core.MessageSource; 
import org.springframework.integration.support.MessageBuilder; 

@SpringBootApplication 
@EnableBinding({PersonProcessor.class, LogProcessor.class}) 
public class StreamApplication { 

    public static void main(String[] args) { 
     SpringApplication.run(StreamApplication.class, args); 
    } 

    @StreamListener(LogProcessor.CHANNEL) 
    public void logEvent(EventLog el) { 
     System.out.println("Received event log: " + el.id); 
    } 

    @StreamListener(PersonProcessor.CHANNEL) 
    public void logPerson(Person p) { 
     System.out.println("Received person: " + p.name); 
    } 

    @Bean 
    @InboundChannelAdapter(value = PersonProcessor.CHANNEL, poller = @Poller(fixedDelay = "3000", maxMessagesPerPoll = "1")) 
    public MessageSource<Person> timerMessageSource() { 
     return() -> MessageBuilder.withPayload(new Person()).build(); 
    } 

    @Bean 
    @InboundChannelAdapter(value = LogProcessor.CHANNEL, poller = @Poller(fixedDelay = "3000", maxMessagesPerPoll = "1")) 
    public MessageSource<EventLog> logMessageSource() { 
     return() -> MessageBuilder.withPayload(new EventLog()).build(); 
    } 

    public static class EventLog { 
     private static int seq = 0; 
     public String id = seq++ + ""; 
    } 

    public static class Person { 
     private static int seq = 0; 
     public String name = "hi " + seq++; 
    } 
} 

LogProcessor.java

package de.codecentric; 

import org.springframework.cloud.stream.annotation.Input; 
import org.springframework.cloud.stream.annotation.Output; 
import org.springframework.messaging.MessageChannel; 
import org.springframework.messaging.SubscribableChannel; 


public interface LogProcessor { 
    String CHANNEL = "logs"; 

    @Output(LogProcessor.CHANNEL) 
    MessageChannel output(); 

    @Input(LogProcessor.CHANNEL) 
    SubscribableChannel input(); 
} 

PersonProcessor.java

package de.codecentric; 

import org.springframework.cloud.stream.annotation.Input; 
import org.springframework.cloud.stream.annotation.Output; 
import org.springframework.messaging.MessageChannel; 
import org.springframework.messaging.SubscribableChannel; 


public interface PersonProcessor { 
    String CHANNEL = "person"; 

    @Output(PersonProcessor.CHANNEL) 
    MessageChannel output(); 

    @Input(PersonProcessor.CHANNEL) 
    SubscribableChannel input(); 
} 

我還可以看到輸出:

接收人:喜0 收到的事件日誌:0 收到的事件日誌:4 接收人:喜4 收到的事件日誌:9 接收人:喜9

感謝。

+0

不知道這是否有幫助,但我有同樣的例外,我發現問題是在我的代碼中@RefreshScope註釋(我看到你沒有包括在你的代碼中......但只是如果你從發佈的片段中刪除,認爲它不相關):請參閱https://github.com/spring-cloud/spring-cloud-stream/issues/461 – chrx

回答

4

我不確定這是否是問題,但您的輸入和輸出通道需要不同的目的地名稱 - 例如,

CHANNELIN = personIn,CHANNELOUT = personOut

處理器不打算向自己發送消息;它旨在接收消息,處理消息並將結果發送到不同的目標。

處理器本身不生成消息 - 這是源的目的。

+1

只需添加上面所述的Gary:每個頻道都是單獨的bean和bean的名字是'@ Input/@ Output'註解的參數(並且默認爲方法的名稱),所以你的設置有效地讓兩個bean相互衝突。單獨的渠道必須有單獨的名稱(我們應該更好地像您一樣識別錯誤情況,而不是無法綁定)。但它們可以配置爲相同的「目的地」。 –

0

我有同樣的問題,我通過升級我的春季雲版本(從Camden SR7到Dalston SR4)來解決它。