0
package testcode;
import java.sql.*;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ProducerClear2 {
public static String vardbserver;
public static String vardbuser;
public static String vardbpassword;
public static String vardbname;
public static void main(String[] args) {
vardbserver = "TestDBtoMQ";
vardbuser = "postgresql";
vardbpassword = "admin";
ConnectionFactory factory = null;
javax.jms.Connection connection = null;
Session session = null;
Destination destination = null;
MessageProducer producer = null;
try {
factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("TestQueue");
producer = session.createProducer(destination);
Class.forName("org.postgresql.Driver");
System.out.println("----------------------------");
try (Connection c = DriverManager.getConnection("jdbc:postgresql://localhost:5432/" + vardbserver, vardbuser, vardbpassword);
PreparedStatement stmt = c.prepareStatement("SELECT * FROM MESSAGES where xmin::varchar::bigint > ? and xmin::varchar::bigint < ? ");
PreparedStatement max = c.prepareStatement("select max(xmin::varchar::bigint) as txid from messages")
) {
c.setAutoCommit(false);
Long previousTxId = 0L;
Long nextTxId = 0L;
while (true) {
stmt.clearParameters();
try (ResultSet rs = max.executeQuery()) {
if (rs.next()) {
nextTxId = rs.getLong(1);
}
}
stmt.setLong(1, previousTxId);
stmt.setLong(2, nextTxId + 1);
try (ResultSet rs = stmt.executeQuery()) {
while (rs.next()) {
String message = rs.getString("MESSAGE");
System.out.println("Message = " + message);
TextMessage mssg = session.createTextMessage(message);
System.out.println("Sent: " + mssg.getText());
producer.send(mssg);
}
previousTxId = nextTxId;
}
Thread.sleep(5000);
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (Exception e) {
System.err.println(e.getClass().getName() + ": " + e.getMessage());
} finally {
if (session != null) {
try {
session.close();
} catch (JMSException ex) {
// ignore
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {
// ignore
}
}
}
System.out.println("----------------------------");
System.out.println("Message sent successfully");
}
}
基本上,應用工程,以獲取數據庫的表內的內容,並將其發送到ActiveMQ的。當表更新時,它將發送剛剛更新的內容(不發送已發送的過去)。但是這個代碼只適用於PostgreSQL
然後我打算創建一個「if」函數。所以我可以使用另一個數據庫來獲取數據(Oracle和MySQL)。
xmin是否仍適用於Oracle和MySQL?所以我只需要更改服務器的URL?或者我需要更改Oracle和MySQL的代碼?
步驟1)旋轉起來的VM。第2步)安裝MySQL。步驟3)檢查查詢。 –
'xmin :: varchar :: bigint'不會在Oracle或MySQL中工作...... – JohnHC
xmin不是一個簡單的升序編號。這將打破。 –