2014-04-04 42 views
0

我想使用Spring SubscribableChannel實現單個生產者..多個消費者問題。我無法解決這個問題。使用Spring集成實現消費者和生產者系統

例如我有一個用戶作爲製片人和他的朋友作爲消費者。所以當製作人用戶做了一些事情之後,就必須發佈給他的朋友。

真正的問題是生產者和消費者之間的關係是動態的,因爲在任何時候我都可以對用戶不友好,這意味着它不應該再聽我的活動。所以我想要一些java配置,我可以訂閱和取消訂閱生產者的消費者還必須有東西,可以從生產者分離消費者,以防生產者不再存在...

我在設計該頻道因爲會有number of channel equal to number of users.另外我不確定如何設計Spring Integration附帶的MessageHandler。

有沒有一些例子指向這個方向。

我做了迄今爲止現在是我有一個配置豆

@Configuration 
public class ChannelConfig { 

    @Bean 
    public SubscribableChannel subscribeChannel() { 
     PublishSubscribeChannel channel = new PublishSubscribeChannel(getAsyncExecutor()); 
     return channel; 
    } 

    public Executor getAsyncExecutor() { 
     ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
     executor.setCorePoolSize(2); 
     executor.setMaxPoolSize(50); 
     executor.setQueueCapacity(10000); 
     executor.setThreadNamePrefix("ChannelExecutor-"); 
     executor.initialize(); 
     return executor; 
    } 
} 

,我會注入,但比這將是唯一的唯一通道。我想爲每個用戶提供一個頻道。

回答

0

我做這樣的事情

public class ChannelCreator { 

    private Map<String , SubscribableChannel> channels; 
    ThreadPoolTaskExecutor executor; 

    public ChannelCreator() { 
     channels = new ConcurrentHashMap<String, SubscribableChannel>(); 
     prepareAsyncExecutor(); 
    } 

    public void createChannel(String login) { 
     PublishSubscribeChannel channel = new PublishSubscribeChannel(executor); 
     channels.put(login, channel); 
    } 

    public SubscribableChannel getChannel(String login){ 
     SubscribableChannel channel = channels.get(login); 
     if(channel == null){ 
      channel = new PublishSubscribeChannel(executor); 
      channels.put(login, channel); 
     } 
     return channel; 
    } 

    public void removeChannel(String login){ 
     channels.remove(login); 
    } 

    public Executor prepareAsyncExecutor() { 
     executor = new ThreadPoolTaskExecutor(); 
     executor.setCorePoolSize(2); 
     executor.setMaxPoolSize(50); 
     executor.setQueueCapacity(10000); 
     executor.setThreadNamePrefix("ChannelExecutor-"); 
     executor.initialize(); 
     return executor; 
    } 
} 

@Configuration 
public class ChannelConfig { 

    @Bean 
    public ChannelCreator channelCreator() { 
     ChannelCreator creator = new ChannelCreator(); 
     return creator; 
    } 
} 
+0

您可能想考慮共享一個任務執行程序,而不是爲每個通道創建一個新任務。 –

+0

已更新。這將是好的權利@加里 –

+0

哦不,不會......沒關係..將很快更新與正確的.. –

0

爲什麼你想爲每個用戶單獨的頻道?這是發佈/訂閱頻道的重點;只需爲該頻道的每個用戶subscribe()新增MessageHandler即可。然後,發送到該頻道的任何消息都將分發給每個消費者。

編輯:(根據下面的評論)。

在這種情況下,不是將聲明聲明爲@Bean,而是簡單地以編程方式將其實例化(或將其範圍設置爲prototype並獲得新實例)並訂閱他的「朋友」。

+0

因爲例如我們有3個用戶A,B,C以及B和C是A.的C朋友不是朋友B.如果C發佈了一些東西,它也會到達B,我不想要...... –

+0

編輯了答案。 –

+0

是的,我也這麼做。發佈... –