1
爪哇7,Glassfish的3.1.2Anynchronous多線程消息處理
輸入像信息:
public class Message {
private final String contextId;
private final String name;
...
}
此消息應該由工人進行處理。對於具有新contextId 的消息,應該啓動一個新線程。對於已存在的contextId,使用已存在的線程。已經存在的線程應該使用相同的contextId順序來處理消息。
Hier我的最後,不工作,工人版本。
@Stateless
@LocalBean
public class Worker {
private static final Map<String, Future<Result>> MAP = new ConcurrentHashMap<>();
@EJB
private Worker worker;
@Asynchronous
public void work(Message message) {
System.out.println(Thread.currentThread().getName() + ": A message: " + message.toString()+ " should be processed");
Future<Result> sameContext = MAP.get(message.getContextId());
if (sameContext != null) {
waitForSameContextId(message, sameContext);
}
MAP.put(message.getContextId(), worker.doWork(message));
}
@Asynchronous
public Future<Result> doWork(Message message) {
System.out.println(Thread.currentThread().getName() + ": Processing the message: " + message.toString());
AsyncResult<Result> asyncResult = new AsyncResult<>(new Result());
try {
Thread.sleep(15000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
MAP.remove(message.getContextId()); //We are done removing
System.out.println(Thread.currentThread().getName() + ": The message: " + message.toString()+ " was processed");
return asyncResult;
}
private void waitForSameContextId(Message message, Future<Result> result) {
try {
System.out.println(Thread.currentThread().getName() + ": message with id: " + message.toString()
+ " is already in work, blocking Thread until it is finished");
Result get = result.get(); //blocks thread
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
// Do some failure management
}
}
測試類:
public class MessageReceiver {
private static String ID = "#########";
@EJB
private Worker worker;
public void receive(Message message) {
worker.work(message);
}
@PostConstruct
void init() {
receive(new Message(ID, "message 1"));
receive(new Message(ID, "message 2"));
receive(new Message(ID, "message 3"));
...
}