2017-04-18 19 views
0

我需要從數據庫中每分鐘選擇一批消息(大約20條消息),並且需要同時處理它們。我正在使用EJB定時器服務(Scheduler)來每分鐘從數據庫獲取消息。帶EJB定時服務的Java多線程

基本上我需要每分鐘選擇20-30條消息,在處理完這些消息之後,我需要發送一些郵件。處理消息涉及的數據庫操作很少。

您能否建議我如何使用java.concurrent包中的executor服務框架以及這些郵件如何每分鐘提交一次?

+0

有很多教程來幫助ExecutorService。該方法還有其他一些考慮:1)如果處理20-30條消息花費的時間超過一分鐘,會發生什麼? 2)如果20-30條消息變爲50-100,會發生什麼? 3)每個消息是一個事務(當數據庫提交)? 4)處理消息的順序是否重要? 5)一個pub/sub架構會更合適嗎? –

+0

嗨安德魯,PFB的迴應。1)是的。處理所有這些消息可能需要不止一分鐘的時間。我需要每隔一分鐘挑選接下來的20-30條消息並處理它們。2)目前我們需要處理20-30條消息。如果批量增加,我們可以將這些消息放在某個隊列中?3)不,我們在單個事務中提交所有數據庫操作4)否。處理順序無關緊要,因爲每個消息都可以獨立處理 –

+0

@ SanketMurugkar的答案對你有幫助嗎? :)如果不是,你能指出什麼不起作用? – Sneh

回答

2

嗨,這裏是一個基本的例子,使用Java的ExecutorService,CountDownLatch和CompletableFuture。這個例子只是爲了向你指出正確的方向,絕不是完美的,它使用了很多Java8的東西(我假設你使用的是Java8)。此外,我不使用EJB計時器的東西,而是使用ScheduledExecutorService,但你可以很容易地交換他們我猜。

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.*; 
import java.util.stream.Collectors; 

public class BatchMessageProcessingExample { 

    private static final int BATCH_SIZE = 20; 

    //Having 20 here may not entirely benefit you. Chosing this number depends on a lot of stuff. 
    // Its usually better to go with total number of cores you have 
    private final ExecutorService pool = Executors.newFixedThreadPool(BATCH_SIZE); 

    private final ScheduledExecutorService databasePool = Executors.newScheduledThreadPool(1); 

    public void schedule() { 
     databasePool.scheduleWithFixedDelay(() -> runBatchProcess(), 0, 1, TimeUnit.MINUTES); //Schedule the database work to execute every minute 
    } 

    private void runBatchProcess() { 
     List<Message> taskFromDbFetch = getMessagesFromDb(); //Get stuff from the db 
     CountDownLatch countDownLatch = new CountDownLatch(taskFromDbFetch.size()); //Create a latch having same size as the list 

     List<Task> taskList = taskFromDbFetch.stream().map(x -> new Task(countDownLatch, x)).collect(Collectors.toList()); // Create tasks using the messages and the countdown latch 

     taskList.forEach(pool::execute); //Submit them all in pool 

     CompletableFuture.runAsync(() -> sendEmailAfterCompletion(countDownLatch)); //Send an email out from a separate thread 
    } 

    private void sendEmailAfterCompletion(CountDownLatch countDownLatch) { 
     try { 
      countDownLatch.await();//Await on the latch for the batch tasks to complete 
      sendEmail(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

    private void sendEmail() { 
     System.out.println("Sending out an email."); 
    } 

    private List<Message> getMessagesFromDb() { //Get your messages from db here 
     List<Message> messages = new ArrayList<>(); 

     for(int i = 0; i < BATCH_SIZE; i++) { 
      final int taskNumber = i; 
      messages.add(() -> System.out.println("I am a db message number " + taskNumber)); 
     } 

     return messages; 
    } 

    class Task implements Runnable { 

     private final CountDownLatch countDownLatch; 

     private final Message message; 

     public Task(CountDownLatch countDownLatch, Message message) { 
      this.countDownLatch = countDownLatch; 
      this.message = message; 
     } 

     @Override 
     public void run() { 
      message.process(); //Process the message 
      countDownLatch.countDown(); //Countdown the latch 
     } 
    } 

    interface Message { 
     void process(); 
    } 

    public static void main(String[] args) { 
     new BatchMessageProcessingExample().schedule(); 
    } 

}