2
當我在尋找球衣sse的例子時,我發現球衣示例文件夾中有一個示例sse-item-store-webapp。這是非常簡單的應用程序,有一個輸入和一個按鈕。你輸入一些文字,點擊按鈕,其他人就會得到改變。如何過濾Jersey SSE中的事件?
@Path("items")
public class ItemStoreResource {
private static final ReentrantReadWriteLock storeLock = new ReentrantReadWriteLock();
private static final LinkedList<String> itemStore = new LinkedList<String>();
private static final SseBroadcaster broadcaster = new SseBroadcaster();
private static volatile long reconnectDelay = 0;
@GET
@Produces(MediaType.TEXT_PLAIN)
public String listItems() {
try {
storeLock.readLock().lock();
return itemStore.toString();
} finally {
storeLock.readLock().unlock();
}
}
@GET
@Path("events")
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput itemEvents(@HeaderParam(SseFeature.LAST_EVENT_ID_HEADER) @DefaultValue("-1") int lastEventId) {
final EventOutput eventOutput = new EventOutput();
if (lastEventId >= 0) {
LOGGER.info("Received last event id :" + lastEventId);
// decide the reconnect handling strategy based on current reconnect delay value.
final long delay = reconnectDelay;
if (delay > 0) {
LOGGER.info("Non-zero reconnect delay [" + delay + "] - responding with HTTP 503.");
throw new ServiceUnavailableException(delay);
} else {
LOGGER.info("Zero reconnect delay - reconnecting.");
replayMissedEvents(lastEventId, eventOutput);
}
}
if (!broadcaster.add(eventOutput)) {
LOGGER.severe("!!! Unable to add new event output to the broadcaster !!!");
// let's try to force a 5s delayed client reconnect attempt
throw new ServiceUnavailableException(5L);
}
return eventOutput;
}
private void replayMissedEvents(final int lastEventId, final EventOutput eventOutput) {
try {
storeLock.readLock().lock();
final int firstUnreceived = lastEventId + 1;
final int missingCount = itemStore.size() - firstUnreceived;
if (missingCount > 0) {
LOGGER.info("Replaying events - starting with id " + firstUnreceived);
final ListIterator<String> it = itemStore.subList(firstUnreceived, itemStore.size()).listIterator();
while (it.hasNext()) {
eventOutput.write(createItemEvent(it.nextIndex() + firstUnreceived, it.next()));
}
} else {
LOGGER.info("No events to replay.");
}
} catch (IOException ex) {
throw new InternalServerErrorException("Error replaying missed events", ex);
} finally {
storeLock.readLock().unlock();
}
}
@POST
public void addItem(@FormParam("name") String name) {
// Ignore if the request was sent without name parameter.
if (name == null) {
return;
}
final int eventId;
try {
storeLock.writeLock().lock();
eventId = itemStore.size();
itemStore.add(name);
// Broadcasting an un-named event with the name of the newly added item in data
broadcaster.broadcast(createItemEvent(eventId, name));
// Broadcasting a named "size" event with the current size of the items collection in data
broadcaster.broadcast(new OutboundEvent.Builder().name("size").data(Integer.class, eventId + 1).build());
} finally {
storeLock.writeLock().unlock();
}
}
private OutboundEvent createItemEvent(final int eventId, final String name) {
Logger.getLogger(ItemStoreResource.class.getName()).info("Creating event id [" + eventId + "] name [" + name + "]");
return new OutboundEvent.Builder().id("" + eventId).data(String.class, name).build();
}
}
舉例來說,如果我有一個聊天室,我不知道如何實現,使用SSE becouse每一個客戶端連接到/items/events
如果有人張貼新的消息,一些聊天broadcaster
將會廣播這個消息給所有簽署事件,但我想只爲一些聊天廣播事件。
誰與澤西SSE工作,你可以建議如何實施?