2017-04-21 25 views
0

從數據庫的表中不斷獲取內容:我有以下這段代碼

package testcode; 

import java.sql.*; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 


/**
 * Polls the PostgreSQL table {@code MESSAGES} and forwards the {@code MESSAGE}
 * column of every newly visible row to the ActiveMQ queue {@code TestQueue}.
 *
 * <p>New rows are detected through the {@code xmin} column: each poll reads the
 * current maximum and selects the rows whose value lies in
 * {@code (previous maximum, current maximum]}.
 *
 * <p>NOTE(review): {@code xmin} is PostgreSQL-specific and is presumably not a
 * plain ever-increasing counter (transaction id wraparound) - confirm before
 * relying on it as a long-lived watermark.
 */
public class ProducerClear2 {

    /** Database name appended to the JDBC URL (set in {@link #main}). */
    public static String vardbserver;
    /** Database user (set in {@link #main}). */
    public static String vardbuser;
    /** Database password (set in {@link #main}). */
    public static String vardbpassword;
    /** Not read anywhere in this class. */
    public static String vardbname;

    private static final String SEPARATOR = "----------------------------";
    private static final String JDBC_URL_PREFIX = "jdbc:postgresql://localhost:5432/";
    private static final long POLL_INTERVAL_MS = 5000L;

    /**
     * Connects to the default ActiveMQ broker and to PostgreSQL, then polls until
     * the thread is interrupted or an error occurs.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        vardbserver = "TestDBtoMQ";
        vardbuser = "postgresql";
        vardbpassword = "admin";

        javax.jms.Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        // The poll loop only ends through an exception, so remember whether that
        // exception was a real failure before printing the success banner.
        boolean failed = false;
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("TestQueue");
            producer = session.createProducer(destination);

            Class.forName("org.postgresql.Driver");
            System.out.println(SEPARATOR);

            pollAndForward(session, producer);
        } catch (InterruptedException e) {
            // Interruption is treated as a shutdown request; keep the flag set for callers.
            Thread.currentThread().interrupt();
        } catch (JMSException e) {
            failed = true;
            e.printStackTrace();
        } catch (Exception e) {
            failed = true;
            System.err.println(e.getClass().getName() + ": " + e.getMessage());
        } finally {
            closeJms(producer, session, connection);
        }
        System.out.println(SEPARATOR);
        if (!failed) {
            System.out.println("Message sent successfully");
        }
    }

    /**
     * Runs the endless poll loop: read the current maximum {@code xmin}, send every
     * row between the previous and the current maximum, then sleep.
     *
     * @param session  JMS session used to create the text messages
     * @param producer JMS producer the messages are sent with
     * @throws SQLException         if a database call fails
     * @throws JMSException         if creating or sending a message fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void pollAndForward(Session session, MessageProducer producer)
            throws SQLException, JMSException, InterruptedException {
        try (Connection c = DriverManager.getConnection(JDBC_URL_PREFIX + vardbserver, vardbuser, vardbpassword);
             PreparedStatement stmt = c.prepareStatement("SELECT * FROM MESSAGES where xmin::varchar::bigint > ? and xmin::varchar::bigint < ? ");
             PreparedStatement max = c.prepareStatement("select max(xmin::varchar::bigint) as txid from messages")) {
            c.setAutoCommit(false);

            long previousTxId = 0L;
            long nextTxId = 0L;

            while (true) {
                try (ResultSet rs = max.executeQuery()) {
                    if (rs.next()) {
                        nextTxId = rs.getLong(1);
                    }
                }

                // Upper bound is exclusive in the query, hence "+ 1" to include the maximum itself.
                stmt.setLong(1, previousTxId);
                stmt.setLong(2, nextTxId + 1);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        String message = rs.getString("MESSAGE");
                        System.out.println("Message = " + message);
                        TextMessage mssg = session.createTextMessage(message);
                        System.out.println("Sent: " + mssg.getText());
                        producer.send(mssg);
                    }
                }
                previousTxId = nextTxId;

                // Auto-commit is off: end the read transaction after every poll so the
                // session does not sit in one open transaction for its whole lifetime.
                c.commit();

                Thread.sleep(POLL_INTERVAL_MS);
            }
        }
    }

    /** Closes the JMS resources in reverse creation order; close failures are reported, not thrown. */
    private static void closeJms(MessageProducer producer, Session session, javax.jms.Connection connection) {
        if (producer != null) {
            try {
                producer.close();
            } catch (JMSException ex) {
                System.err.println("Failed to close JMS producer: " + ex.getMessage());
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException ex) {
                System.err.println("Failed to close JMS session: " + ex.getMessage());
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException ex) {
                System.err.println("Failed to close JMS connection: " + ex.getMessage());
            }
        }
    }
}

基本上,這個應用程式會讀取數據庫表中的內容,並將其發送到 ActiveMQ。當表有更新時,它只會發送新增的內容(不會重複發送先前已發送過的內容)。但是這段代碼只適用於 PostgreSQL。

接下來我打算加入一個「if」判斷,這樣我就可以從其他數據庫(Oracle 和 MySQL)獲取數據。

xmin是否仍適用於Oracle和MySQL?所以我只需要更改服務器的URL?或者我需要更改Oracle和MySQL的代碼?

+0

步驟1)啟動一台虛擬機。步驟2)安裝 MySQL。步驟3)測試該查詢。 –

+2

`xmin::varchar::bigint` 在 Oracle 或 MySQL 中無法運作…… – JohnHC

+0

xmin 不是一個單純遞增的編號。這種做法會出問題。 –

回答

0

剛剛找到了答案:使用 LIMIT,把已處理的行數(偏移量)保存到一個文件中,並以日期為每天的文件命名……

   if(vardbtype.equals("MYSQL")){ 
        Class.forName("com.mysql.jdbc.Driver"); 
        System.out.println("----------------------------"); 
        int limitrowmysql = 0; 
        LocalDate now = LocalDate.now(); 
        Path path = FileSystems.getDefault().getPath("C:\\Users\\NN\\Documents\\Test\\RowMYSQL\\RowIDMYSQL_" + now.format(DateTimeFormatter.ISO_LOCAL_DATE) + ".txt"); 
        if (Files.exists(path)) { 
         String latestRowIdFromFile = Files.lines(path).max((e1, e2) -> { 
         if (((String)e1).isEmpty() || ((String)e2).isEmpty()) { 
          return -1; 
         } 
         return new Long(e1).compareTo(new Long(e2)); 
         }).get(); // read latestRowId from file 
         if (latestRowIdFromFile != null && !latestRowIdFromFile.isEmpty()) { 
          limitrowmysql = Integer.valueOf(latestRowIdFromFile); 
         } 
        } else { 
         limitrowmysql = 0; 
        } 
        Connection c = DriverManager.getConnection("jdbc:mysql://localhost:3306/"+ vardbserver, vardbuser, vardbpassword); 
        while(true) { 
         Statement stmts = c.createStatement(); 
         int countrowmysql = 0; 
         String sql = ("SELECT * FROM "+ vardbname +" LIMIT "+ limitrowmysql +", 18446744073709551615"); 
         ResultSet rss = stmts.executeQuery(sql); 
         while(rss.next()) { 
          String message = rss.getString("MESSAGE"); 
          System.out.println("Message = " + message); 
          TextMessage mssg = session.createTextMessage(message); 
          System.out.println("Sent: " + mssg.getText()); 
          producer.send(mssg); 
          countrowmysql = countrowmysql + 1; 
         } 

         rss.close(); 
         stmts.close(); 
         Thread.sleep(batchperiod2); 

         limitrowmysql = limitrowmysql + countrowmysql; 
         Files.write(path, ("\n" + limitrowmysql).getBytes()); // write latestRowId to file 
        } 
       }