1

我想有一個異步消息傳遞在我的Java程序,所以第一步就應該不斷地監視某些表在DB的變化。當有新的傳入消息時,它應該顯示它。這應該是重複的過程,只要應用程序正在運行。如何實現異步處理

可我知道如何繼續其執行以下代碼,它有它的輪詢方法,它必須保持對自稱無限每6秒,也應該在數據庫中找到新傳入的消息。

這裏是代碼片段:

public class PollingSynchronizer implements Runnable { 

private Collection<KPIMessage> incomingMessages; 
private Connection dbConnection; 


/** 
* Constructor. Requires to provide a reference to the KA message queue 
* 
* @param incomingMessages reference to message queue 
* 
*/ 
    public PollingSynchronizer(Collection<KpiMessage> incomingMessages, Connection dbConnection) { 
    super(); 
    this.incomingMessages = incomingMessages; 
    this.dbConnection = dbConnection; 
} 

private int sequenceId; 

public int getSequenceId() { 
    return sequenceId; 
} 

public void setSequenceId(int sequenceId) { 
    this.sequenceId = sequenceId; 
} 



@Override 
/** 
* The method which runs Polling action and record the time at which it is done 
* 
*/ 
public void run() { 
    try { 


      incomingMessages.addAll(fullPoll()); 
      System.out.println("waiting 6 seconds"); 

      //perform this operation in a loop 
      Thread.sleep(6000); 

    } catch (InterruptedException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } catch (Exception e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
    Date currentDate = new Date(); 
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS"); 
// System.out.println(sdf.format(currentDate) + " " + msg); 
} 

/** 
* Method which defines polling of the database and also count the number of Queries 
* @return 
* @throws Exception 
*/ 
public List<KpiMessage> fullPoll() throws Exception { 

// int sequenceID = 0; 
    Statement st = dbConnection.createStatement(); 

    ResultSet rs = st.executeQuery("select * from msg_new_to_bde where ACTION = 804 order by SEQ DESC"); 
     List<KpiMessage> pojoCol = new ArrayList<KpiMessage>(); 
     while (rs.next()) { 
      KpiMessage filedClass = convertRecordsetToPojo(rs); 
      pojoCol.add(filedClass); 
     } 

     return pojoCol; 
     } 

/** 
* Converts a provided record-set to a {@link KpiMessage}. 
* 
* The following attributes are copied from record-set to pojo: 
* 
* <ul> 
* <li>SEQ</li> 
* <li>TABLENAME</li> 
* <li>ENTRYTIME</li> 
* <li>STATUS</li> 
* </ul> 
* 
* @param rs 
*   the recordset to convert 
* @return the converted pojo class object 
* @throws SQLException 
*    if an sql error occurrs during processing of recordset 
*/ 
private KpiMessage convertRecordsetToPojo(ResultSet rs) throws SQLException { 

    KpiMessage msg = new KpiMessage(); 
    int sequence = rs.getInt("SEQ"); 
    msg.setSequence(sequence); 
    int action = rs.getInt("ACTION"); 
    msg.setAction(action); 
    String tablename = rs.getString("TABLENAME"); 
    msg.setTableName(tablename); 
    Timestamp entrytime = rs.getTimestamp("ENTRYTIME"); 
    Date entryTime = new Date(entrytime.getTime()); 
    msg.setEntryTime(entryTime); 
    Timestamp processingtime = rs.getTimestamp("PROCESSINGTIME"); 
    if (processingtime != null) { 
     Date processingTime = new Date(processingtime.getTime()); 
     msg.setProcessingTime(processingTime); 
    } 
    String keyInfo1 = rs.getString("KEYINFO1"); 
    msg.setKeyInfo1(keyInfo1); 
    String keyInfo2 = rs.getString("KEYINFO2"); 
    msg.setKeyInfo2(keyInfo2); 
    return msg; 
} 
} 

這裏序列ID在不斷爲新的傳入郵件到達時增加了表的唯一ID。

P.S:「各種要求:請給給了負分(反對)的理由讓我可以解釋我的問題清楚了。」

+1

我 - 我使用Quartz來創建一個重複投票的任務,我會使用JMS(HornetQ的更具體)來處理消息的一部分。在已經有堅如磐石的車輪可用的時候,我不會重新推出車輪。 – Gimby

+0

輪詢?你可以通過觸發器提升某種事件嗎? –

+0

@thanks Gimby可我知道如何使用石英的,這也是初學者水平的程序員,還如何調用使用異步地這個話題像一個不斷輪詢等proccess消息和更新它.... – Babu

回答

1

簡單的把它在一段時間(true)循環。

public void run() { 
    while(true){ 
     try { 


       incomingMessages.addAll(fullPoll()); 
       System.out.println("waiting 6 seconds"); 

       //perform this operation in a loop 
       Thread.sleep(6000); 

     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     Date currentDate = new Date(); 
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS"); 
    // System.out.println(sdf.format(currentDate) + " " + msg); 
    } 
} 

希望您已經將Runnable作爲新線程啓動。

您需要的數據庫中的一個字段,比如「LAST_UPDATE」新的和更新的消息。比你需要改變你的SQL語句來獲取新消息,如:「where last_update > $lastCheckedDate」,其中lastCheckedDate設置,每當你檢查新的消息。

也許你也想閱讀一些關於併發在Java中:http://docs.oracle.com/javase/tutorial/essential/concurrency/

+0

一試@ AdrianThat作品它反覆調用查詢,但如何在同一時間,我可以檢查新郵件,也更新新消息...? – Babu

0

投入while循環是一種方式,但我認爲這是更好地避免這樣的方法做(也有很多事情搞砸如交易等)。

如果你真的需要做這樣那樣的事情重複,可以考慮使用一個調度程序。 Spring 3.x有調度程序內置,或者你也可以使用Quartz。

更好的方法是避免這樣的投票。當數據更新時,是否可以在JMS隊列中放置消息,以便在JMS隊列中存在此類消息時,將調用您的邏輯(通過消息驅動的bean)? (只有一個可能的方式,有很多類似的方式做)

+0

我會嘗試使用石英,然後... – Babu

+0

我認爲使用像Quartz這樣的框架很大程度上取決於您的應用程序的範圍和大小。如果這是唯一的「工作」應用程序的需要,創建線程和投票是完全有效的。一些搜索還可以揭示Java Timer和TimerTask。也許這是比石英有點「slimer」:http://stackoverflow.com/questions/1453295/timer-timertask-versus-thread-sleep-in-java – Adrian

+0

@Adrian老實說使用石英已經是一個非常苗條的解決方案。鑑於OP實際上正在進行數據庫訪問,我不認爲我可以將他的應用視爲那些超級簡單的應用。而且,使用Quartz(或其他類型的調度程序)並不困難或笨重。我無法找到任何理由避免使用那些可靠的應用程序,並使我們的應用程序更易於開發。 –