2014-10-01 61 views
2

當我在尋找球衣sse的例子時,我發現球衣示例文件夾中有一個示例sse-item-store-webapp。這是非常簡單的應用程序,有一個輸入和一個按鈕。你輸入一些文字,點擊按鈕,其他人就會得到改變。如何過濾Jersey SSE中的事件?

@Path("items") 
public class ItemStoreResource { 

    private static final ReentrantReadWriteLock storeLock = new ReentrantReadWriteLock(); 
    private static final LinkedList<String> itemStore = new LinkedList<String>(); 
    private static final SseBroadcaster broadcaster = new SseBroadcaster(); 

    private static volatile long reconnectDelay = 0; 

    @GET 
    @Produces(MediaType.TEXT_PLAIN) 
    public String listItems() { 
     try { 
      storeLock.readLock().lock(); 
      return itemStore.toString(); 
     } finally { 
      storeLock.readLock().unlock(); 
     } 
    } 

    @GET 
    @Path("events") 
    @Produces(SseFeature.SERVER_SENT_EVENTS) 
    public EventOutput itemEvents(@HeaderParam(SseFeature.LAST_EVENT_ID_HEADER) @DefaultValue("-1") int lastEventId) { 
     final EventOutput eventOutput = new EventOutput(); 


     if (lastEventId >= 0) { 
      LOGGER.info("Received last event id :" + lastEventId); 

      // decide the reconnect handling strategy based on current reconnect delay value. 
      final long delay = reconnectDelay; 
      if (delay > 0) { 
       LOGGER.info("Non-zero reconnect delay [" + delay + "] - responding with HTTP 503."); 
       throw new ServiceUnavailableException(delay); 
      } else { 
       LOGGER.info("Zero reconnect delay - reconnecting."); 
       replayMissedEvents(lastEventId, eventOutput); 
      } 
     } 


     if (!broadcaster.add(eventOutput)) { 
      LOGGER.severe("!!! Unable to add new event output to the broadcaster !!!"); 
      // let's try to force a 5s delayed client reconnect attempt 
      throw new ServiceUnavailableException(5L); 
     } 

     return eventOutput; 
    } 

    private void replayMissedEvents(final int lastEventId, final EventOutput eventOutput) { 
     try { 
      storeLock.readLock().lock(); 
      final int firstUnreceived = lastEventId + 1; 
      final int missingCount = itemStore.size() - firstUnreceived; 
      if (missingCount > 0) { 
       LOGGER.info("Replaying events - starting with id " + firstUnreceived); 
       final ListIterator<String> it = itemStore.subList(firstUnreceived, itemStore.size()).listIterator(); 
       while (it.hasNext()) { 
        eventOutput.write(createItemEvent(it.nextIndex() + firstUnreceived, it.next())); 
       } 
      } else { 
       LOGGER.info("No events to replay."); 
      } 
     } catch (IOException ex) { 
      throw new InternalServerErrorException("Error replaying missed events", ex); 
     } finally { 
      storeLock.readLock().unlock(); 
     } 
    } 

    @POST 
    public void addItem(@FormParam("name") String name) { 
     // Ignore if the request was sent without name parameter. 
     if (name == null) { 
      return; 
     } 

     final int eventId; 
     try { 
      storeLock.writeLock().lock(); 
      eventId = itemStore.size(); 
      itemStore.add(name); 

      // Broadcasting an un-named event with the name of the newly added item in data 
      broadcaster.broadcast(createItemEvent(eventId, name)); 
      // Broadcasting a named "size" event with the current size of the items collection in data 
      broadcaster.broadcast(new OutboundEvent.Builder().name("size").data(Integer.class, eventId + 1).build()); 

     } finally { 
      storeLock.writeLock().unlock(); 
     } 
    } 

    private OutboundEvent createItemEvent(final int eventId, final String name) { 
     Logger.getLogger(ItemStoreResource.class.getName()).info("Creating event id [" + eventId + "] name [" + name + "]"); 
     return new OutboundEvent.Builder().id("" + eventId).data(String.class, name).build(); 
    } 
} 

舉例來說,如果我有一個聊天室,我不知道如何實現,使用SSE becouse每一個客戶端連接到/items/events如果有人張貼新的消息,一些聊天broadcaster將會廣播這個消息給所有簽署事件,但我想只爲一些聊天廣播事件。

誰與澤西SSE工作,你可以建議如何實施?

回答

3

嘗試使用smth像聊天室ID的地圖與SseBroadcast對象,然後您可以訂閱從某個房間到廣播電臺的所有用戶。您可以將其用於tet-a-tet對話或團隊對話。

樣品下面:

private static final Map<Long, SseBroadcaster> ROOM_SSE_BROADCASTER = new ConcurrentHashMap<>(); 

@GET 
@Path("/updatestate/{roomId}/{userId}") 
@Produces(SseFeature.SERVER_SENT_EVENTS) 
public EventOutput updateState(@PathParam("roomId") Long roomId, @PathParam("userId") Long userId) { 
    EventOutput eo = new EventOutput(); 
    ROOM_SSE_BROADCASTER.get(roomId).add(eo); 

    return eo; 
} 

public static void updateRoom(Long roomId) { 
    ROOM_SSE_BROADCASTER.get(roomId).broadcast(buildEvent()); 
} 

public static void registerRoom(Long roomId) { 
    ROOM_SSE_BROADCASTER.put(roomId, new SseBroadcaster()); 
} 

private static OutboundEvent buildEvent() { 
    OutboundEvent.Builder builder = new OutboundEvent.Builder(); 
    OutboundEvent event = builder.data(String.class, "update").build(); 

    return event; 
}