3

我開始使用pubsub模擬器來測試我的基本實現,並在嘗試創建新主題時遇到了問題。在pubsub模擬器上創建主題

我的模擬器監聽在localhost:8085,如果我通過API創建主題

PUT http://localhost:8085/v1/projects/testproject/topics/test 

一切工作正常和話題被創建。 但是,如果我運行下面的代碼片段沒有按預期工作,沒有話題被創建:

TopicName topicName = TopicName.create("testproject", "test"); 
    ChannelProvider channelProvider = 
      TopicAdminSettings.defaultChannelProviderBuilder() 
       .setEndpoint("localhost:8085") 
       .setCredentialsProvider(
         FixedCredentialsProvider.create(NoCredentials.getInstance())) 
       .build(); 
    TopicAdminClient topicClient = TopicAdminClient.create(
      TopicAdminSettings.defaultBuilder().setChannelProvider(channelProvider).build()); 
     topicClient.createTopic(topicName); 

而這個運行模擬器日誌

[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete 
[pubsub] INFORMATION: Adding handler(s) to newly registered Channel. 
[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead 
[pubsub] INFORMATION: Detected non-HTTP/2 connection. 
[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.netty.NotFoundHandler handleRequest 
[pubsub] INFORMATION: Unknown request URI: /bad-request 
[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete 
[pubsub] INFORMATION: Adding handler(s) to newly registered Channel. 
[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead 
[pubsub] INFORMATION: Detected non-HTTP/2 connection. 
[pubsub] Apr 27, 2017 1:10:47 PM io.gapi.emulators.netty.NotFoundHandler handleRequest 
[pubsub] INFORMATION: Unknown request URI: /bad-request 

...  

[pubsub] Apr 27, 2017 1:10:49 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete 
[pubsub] INFORMATION: Adding handler(s) to newly registered Channel. 
[pubsub] Apr 27, 2017 1:10:49 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead 
[pubsub] INFORMATION: Detected non-HTTP/2 connection. 

我失去了我的ChannelProvider的東西嗎?或者我沒有正確配置我的TopicAdminClient?我沒有看到什麼錯誤,因爲我用 this as reference

也許有人可以幫我解決這個問題。

回答

2

用於與模擬器通信的通道需要將negotiationType屬性設置爲NegotiationType.PLAINTEXT。這意味着您需要創建一個自定義ChannelProvider。像下面這樣的應該工作:

public class PlainTextChannelProvider implements ChannelProvider { 
    @Override 
    public boolean shouldAutoClose() { 
    return false; 
    } 

    @Override 
    public boolean needsExecutor() { 
    return false; 
    } 

    @Override 
    public ManagedChannel getChannel() throws IOException { 
    return NettyChannelBuilder.forAddress("localhost", 8085) 
     .negotiationType(NegotiationType.PLAINTEXT) 
     .build(); 
    } 

    @Override 
    public ManagedChannel getChannel(Executor executor) throws IOException { 
    return getChannel(); 
    } 
} 
+0

謝謝你,創建主題和發佈消息工作。我是否必須爲Subscriber.Listener實現類似的東西?我爲Subscriber設置了相同的channelProvider,但是在調用subscriber.stopAsync()時,它總是拋出一個java.util.concurrent.RejectedExecutionException,並且(它似乎是隨機的)不會拉取消息,我在這裏丟失了什麼? –

+0

您也需要爲訂閱服務器使用相同類型的ChannelProvider。很難說是什麼導致了RejectedExecutionException。我認爲這不是特定於訂閱者或模擬器的問題,但可能與您的代碼有關的訂閱,您使用的任何執行程序或應用程序中對象的生存期有關。 –

+0

我打開另一個問題,所以我可以給更詳細的描述[鏈接](http://stackoverflow.com/questions/43786716/subscriber-stopasync-results-in-rejectedexecutionexception) –