2014-05-14 44 views
1

我的MySQL表中有大約5000萬條記錄。用Java檢索全部記錄需要20多個小時。最近在處理了約500,000條記錄後,我遇到了「通信鏈路故障」(Communications link failure)錯誤。請問如何在Java中使用多線程存取這些記錄?每檢索一條記錄後,我需要先做一些預處理,再將結果存入文本文件。謝謝。(標題:如何使用Java多線程檢索MySQL數據)

+1

非常棘手的問題 :) 我猜可以先建立一批Java線程,每個線程各自建立一個MySQL連接,並各自處理一段資料,例如 'LIMIT 10000 ,50000'、' LIMIT 50001,100000' 等。(注意:MySQL 的 `LIMIT offset, count` 中第二個參數是要取的筆數,而不是結束的行號。) – SSpoke

+0

謝謝。我之前從未使用過多線程,所以對我來說很困難,我會試試看。請再幫我解答一件事:我是否必須把數據庫連接代碼和預處理部分都寫在run()函數中,讓每次都建立新的連接? – user2992438

+0

你可以在線程之外建立連接(建立連接本身不算什麼工作,只是一個連接),再把它作為參考傳給 'Thread';但真正耗時的工作(會讓程式卡住的部分)必須放在不同的線程中執行。盡量不要讓每個線程都做全部的工作,這樣它們才能同時運行。也不要開太多連接,因為它們可能逾時或斷線,什麼情況都可能發生;為了安全起見,你可以在 'run()' 中加入重新連接的代碼,讓它必要時在 run() 內建立新的連接。 – SSpoke

回答

0

SQLManager.java

package twcore.core.sql; 

import java.io.File; 
import java.sql.Connection; 
import java.sql.PreparedStatement; 
import java.sql.ResultSet; 
import java.sql.SQLException; 
import java.sql.Statement; 
import java.util.HashMap; 
import java.util.Iterator; 

import twcore.core.BotSettings; 
import twcore.core.SubspaceBot; 
import twcore.core.events.SQLResultEvent; 
import twcore.core.util.Tools; 


/**
 * Thread-based main class for the core's SQL database functionality.
 * Initializes and manages SQL connection pools and queries, and runs
 * background queries on a semi-regular basis.
 * <p>
 *
 * Choosing a standard query vs. background/high-priority background query:
 * <p>
 * <b>Standard/foreground</b> - Runs exactly when needed. Does not wait in
 * a queue to execute (unless connections are low). Does not require a unique
 * identifier or special event handling. However, a standard query will
 * pause the program thread until the results are returned. For large queries
 * and bad connections this may result in long delays and unresponsiveness.
 * <p>
 * <b>Background</b> - Runs as a separate program thread. Waits in a
 * queue and can be delayed by other queries waiting to execute. Requires the
 * bot to catch an SQLResultEvent and use a unique identifier to refer to the
 * query. As a separate thread, after the background query is run, the bot
 * continues execution as normal, causing no delays. Ideal when multiple users
 * may need to access large amounts of SQL data from the same bot at the same
 * time without compromising responsiveness to the bot for others. However,
 * their individual result sets may return more slowly than with a standard query.
 * <p>
 * <b>High-priority background</b> - Same as a background query, but added to the
 * head of the queue. Combines the versatility of a background queue with the
 * foreground's ability to return the result set almost instantly.
 * <p>
 * <b><u>IMPORTANT NOTE</u></b>
 * For every query you MUST run BotAction's SQLClose(), or manually run the close()
 * method on both the ResultSet and the Statement that created it. If you do not,
 * memory leaks may occur!
 * <p>
 * Threading: within this class the pool and queue maps are only filled in the
 * constructor, before the manager thread is started. The manager thread sleeps
 * between passes; queryBackground(), queryBackgroundHighPriority() and finishing
 * SQLWorkers call interrupt() on it to wake it early.
 *
 * TODO:
 * Setup Apache Commons DBCP to remove CommunicationsExceptions:
 * validationQuery="SELECT 1"
 * testOnBorrow="true"
 */
public class SQLManager extends Thread {
    BotSettings sqlcfg;                          // Reference to SQL config file
    HashMap<String, SQLConnectionPool> pools;    // Connection pool storage
    HashMap<String, SQLBackgroundQueue> queues;  // Background queue storage
    boolean operational = true;                  // Status of SQL system

    // Length of time, in ms, for the thread to sleep after all background
    // queries have been dispatched.
    final static int THREAD_SLEEP_TIME = 30 * Tools.TimeInMillis.SECOND;

    // Time in ms between stale connection checks.
    final static int STALE_TIME = 15 * Tools.TimeInMillis.MINUTE;
    private long nextStaleCheck = 0;

    /**
     * Initialize SQL functionality with the information given in the specified
     * configuration file, and start the background thread if every pool was
     * created successfully.
     * @param configFile Properly formatted CFG file containing SQL system data
     */
    public SQLManager(File configFile) {
        super("SQLManager");
        pools = new HashMap<String, SQLConnectionPool>();
        queues = new HashMap<String, SQLBackgroundQueue>();
        sqlcfg = new BotSettings(configFile);
        System.out.println("=== SQL Initialization ===");
        try {
            int connectionCount = sqlcfg.getInt("ConnectionCount");
            for (int i = 1; i <= connectionCount; i++) {
                String name = sqlcfg.getString("Name" + i);

                // TODO: Better pooling solutions now exist that can be configured to our needs.
                SQLConnectionPool db = new SQLConnectionPool(name, buildConnectionUrl(i),
                        sqlcfg.getInt("MinPoolSize" + i),
                        sqlcfg.getInt("MaxPoolSize" + i),
                        sqlcfg.getInt("WaitIfBusy" + i),
                        sqlcfg.getString("Driver" + i));
                pools.put(name, db);
                queues.put(name, new SQLBackgroundQueue());
            }
            Tools.printLog("SQL Connection Pools initialized successfully.");
            for (SQLConnectionPool pool : pools.values()) {
                Tools.printLog(pool.toString());
            }
        } catch (SQLException e) {
            Tools.printLog("Failed to load SQL Connection Pools. Driver missing?");
            operational = false;
            Tools.printLog(e.getMessage());
        }

        // Schedule the first stale check BEFORE starting the thread. Assigning it
        // after start() raced with run(), which could observe the default of 0
        // and perform an immediate stale check.
        nextStaleCheck = System.currentTimeMillis() + STALE_TIME;

        if (operational) {
            start();
            Tools.printLog("SQL Background Queues initialized.");
        } else {
            Tools.printLog("SQL Background Queues NOT initialized.");
        }
        System.out.println();
    }

    /**
     * Builds the JDBC URL for the i-th connection entry of the config file.
     * @param i 1-based index of the connection entry in sql.cfg
     * @return MySQL JDBC URL including credentials and driver properties
     */
    private String buildConnectionUrl(int i) {
        // TODO: Migrate to a DataSource object and pass that to SQLConnectionPool
        // (com.mysql.jdbc.jdbc2.optional.MysqlDataSource)
        return "jdbc:mysql://" + sqlcfg.getString("Server" + i)
                + ":" + sqlcfg.getInt("Port" + i) + "/"
                + sqlcfg.getString("Database" + i) + "?user="
                + sqlcfg.getString("Login" + i) + "&password="
                + sqlcfg.getString("Password" + i)
                // Available properties (and info about them)
                // http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-configuration-properties.html
                + "&allowMultiQueries=true"
                + "&maxReconnects=2147483647"
                + "&initialTimeout=1"
                + "&logSlowQueries=false"
                + "&interactiveClient=true"
                + "&autoReconnect=true"   // Auto-Reconnect not recommended
                + "&autoReconnectForPools=true";
    }

    /**
     * Adds a regular background query to the end of the queue. If there are no
     * queued queries ahead of it, the background query will be executed nearly
     * as quickly as a regularly executed query, but without delaying the bot's
     * thread to retrieve the result set. The query is instead run in a new
     * thread and returned to the bot via an SQLResultEvent, and is identified by
     * a unique key (<CODE>identifier</CODE>).
     * @param connName Name of the connection as defined in sql.cfg
     * @param identifier The unique identifier for this query
     * @param query A properly-formed SQL query
     * @param bot The bot requesting the query (if unsure, use <b>this</b>)
     */
    public void queryBackground(String connName, String identifier,
            String query, SubspaceBot bot) {
        if (!operational) {
            Tools.printLog("Unable to process query: " + query);
            return;
        }
        if (!pools.containsKey(connName)) {
            Tools.printLog("Invalid connection name supplied: '" + connName + "'");
            return;
        }
        queues.get(connName).addQuery(new SQLResultEvent(query, identifier, bot));
        interrupt();    // wake the manager thread so the query is dispatched promptly
    }

    /**
     * Adds a background query to the front of the queue. A high-priority
     * background query will be executed nearly as quickly as a regularly
     * executed query, but without delaying the bot's thread to retrieve the
     * results. The query is instead run in a new thread and returned to the bot
     * via an SQLResultEvent, and is identified by a unique key (<CODE>identifier</CODE>).
     * @param connName Name of the connection as defined in sql.cfg
     * @param identifier The unique identifier for this query
     * @param query A properly-formed SQL query
     * @param bot The bot requesting the query (if unsure, use <b>this</b>)
     */
    public void queryBackgroundHighPriority(String connName, String identifier,
            String query, SubspaceBot bot) {
        if (!operational) {
            Tools.printLog("Unable to process background high priority query: " + query);
            return;
        }
        if (!pools.containsKey(connName)) {
            Tools.printLog("Invalid connection name supplied: '" + connName + "'");
            return;
        }
        queues.get(connName).addHighPriority(new SQLResultEvent(query, identifier, bot));
        interrupt();    // wake the manager thread so the query is dispatched promptly
    }

    /**
     * Runs a regular SQL query using the specified database connection. Your
     * bot's thread will not continue while the query is in effect. Use a
     * background query if you wish for the thread to continue while the query
     * is executed.
     * @param connectionName Name of the connection as defined in sql.cfg
     * @param query A properly-formed SQL query
     * @return The result set of the query (MAY be null)
     * @throws SQLException if the underlying pool fails to run the query
     */
    public ResultSet query(String connectionName, String query) throws SQLException {
        if (!operational) {
            Tools.printLog("Unable to process query: " + query);
            return null;
        }
        if (!pools.containsKey(connectionName)) {
            Tools.printLog("Invalid connection name supplied: '" + connectionName + "'");
            return null;
        }
        return pools.get(connectionName).query(query);
    }

    /**
     * Creates a PreparedStatement.
     * Gets a Connection from the specified SQLConnectionPool (specified by the connectionName)
     * and creates a PreparedStatement object using the specified query.
     * Note that this sets the connection to "busy" in the SQLConnectionPool so it isn't used by other processes.
     *
     * You need to free it when the bot doesn't use the PreparedStatement anymore or this will be a Connection-leak !!
     * If preparing the statement fails, the connection is freed again before null is returned.
     *
     * @param connectionName Name of the connection as defined in sql.cfg
     * @param uniqueID A unique string that is used for re-using (busy) Connections in the connection pool. This is only used for PreparedStatements as their Connection is locked when a bot creates a PreparedStatement.
     * @param sqlstatement The (dynamic) SQL INSERT/UPDATE statement that will be pre-parsed for the PreparedStatement
     * @param retrieveAutoGeneratedKeys whether auto-generated keys should be returned
     * @return PreparedStatement object or null if there was an error
     */
    public PreparedStatement createPreparedStatement(String connectionName, String uniqueID, String sqlstatement, boolean retrieveAutoGeneratedKeys) {
        if (!operational) {
            Tools.printLog("Unable to create PreparedStatement object; SQL System is not operational");
            return null;
        }
        SQLConnectionPool pool = pools.get(connectionName);
        if (pool == null) {
            Tools.printLog("Invalid connection name supplied: '" + connectionName + "'");
            return null;
        }

        // Refuse only when the pool reports nothing available AND it has already
        // grown to its maximum number of connections.
        if (!pool.isAvailable() && pool.totalConnections() >= pool.getMaxConnections()) {
            Tools.printLog("No more connections available in pool '"+connectionName+"' to create PreparedStatement!");
            return null;
        }

        Connection conn = null;
        try {
            conn = pool.getConnection(uniqueID);
            int keyMode = retrieveAutoGeneratedKeys
                    ? Statement.RETURN_GENERATED_KEYS
                    : Statement.NO_GENERATED_KEYS;
            return conn.prepareStatement(sqlstatement, keyMode);
        } catch (SQLException sqle) {
            // getConnection() marked the connection busy; release it so a failed
            // prepare does not leak it from the pool for good.
            if (conn != null) {
                pool.free(uniqueID, conn);
            }
            Tools.printLog("SQLException encountered while trying to create a PreparedStatement from a Connection from '"+connectionName+"':"+sqle.getMessage());
            return null;
        }
    }

    /**
     * Frees specified Connection for specified connectionpool using specified unique ID.
     * This should be used when closing a PreparedStatement as it locks a connection on creation.
     *
     * @param connectionName Name of the connection as defined in sql.cfg
     * @param uniqueID The unique ID used to create the Prepared Statement
     * @param conn Connection used when creating a PreparedStatement
     */
    public void freeConnection(String connectionName, String uniqueID, Connection conn) {
        if (!operational) {
            Tools.printLog("Unable to free Connection; SQL System is not operational");
            return;
        }
        SQLConnectionPool pool = pools.get(connectionName);
        if (pool != null) {
            pool.free(uniqueID, conn);
        }
    }

    /**
     * @return True if the SQL system is operational
     */
    public boolean isOperational() {
        return operational;
    }

    /**
     * Prints to the log file the status of all connection pools.
     */
    public void printStatusToLog() {
        if (!operational) {
            Tools.printLog("SQL Connection Not Operational");
        } else {
            Tools.spamLog(getPoolStatus());
        }
    }

    /**
     * Gets status of all connection pools.
     * @return String array containing status of each individual connection pool.
     */
    public String[] getPoolStatus() {
        String[] status = new String[pools.size()];
        int j = 0;
        for (SQLConnectionPool pool : pools.values()) {
            status[j++] = pool.toString();
        }
        return status;
    }

    /**
     * Checks the background queue for queries waiting to be run, and dispatches
     * them each to a separate SQLWorker thread. After the result is received,
     * it's then returned as an SQLResultEvent to the bot that made the query.
     * The SQLManager thread will sleep for a defined amount of time, but will
     * interrupt/return when a background query requires processing.
     *
     * Also runs a check for stales on each pool of connections periodically.
     */
    public void run() {
        while (true) {
            dispatchBackgroundQueries();

            // Perform stale check only when it is due.
            if (nextStaleCheck < System.currentTimeMillis()) {
                for (SQLConnectionPool pool : pools.values()) {
                    pool.updateStaleConnections();
                }
                nextStaleCheck = System.currentTimeMillis() + STALE_TIME;
            }

            try {
                Thread.sleep(THREAD_SLEEP_TIME);
            } catch (InterruptedException e) {
                // Intentional: interrupt() is this class's wake-up signal (new background
                // query queued, or an SQLWorker finished), so just loop and dispatch again.
            }
        }
    }

    /**
     * Hands queued background queries to new SQLWorker threads, per pool, until
     * the queue is empty or the pool reports its background limit is reached.
     */
    private void dispatchBackgroundQueries() {
        for (String name : queues.keySet()) {
            SQLBackgroundQueue queue = queues.get(name);
            SQLConnectionPool pool = pools.get(name);
            while (!queue.isEmpty() && !pool.reachedMaxBackground()) {
                SQLResultEvent event = queue.getNextInLine();
                try {
                    new SQLWorker(pool, event, this);   // starts its own thread
                } catch (Exception e) {
                    Tools.printLog("Uncaught exception encountered running background query.");
                    Tools.printStackTrace(e);
                }
            }
        }
    }
}

SQLWorker.java

package twcore.core.sql; 

import java.sql.ResultSet; 
import java.sql.SQLException; 

import twcore.core.events.SQLResultEvent; 
import twcore.core.util.Tools; 

/**
 * Runs a background SQL query given a connection pool to use and an undelivered
 * SQLResultEvent object to place the results into. By handling in a separate
 * thread, it frees the bot process of having to wait on a query.
 */
public class SQLWorker implements Runnable {
    private final SQLResultEvent m_event;       // Event to hand the ResultSet to
    private final SQLConnectionPool m_pool;     // Connection pool to run query on
    private final SQLManager m_manager;         // For interrupting any waits

    /**
     * Creates a new SQLWorker and begins a background query, given a connection
     * pool to use for the query, an event to place the result set returned by
     * the query into, and an SQLManager to wake up/interrupt when the process
     * has finished (if it is currently sleeping).
     * @param pool Connection pool to use to run the query
     * @param event Event that will afterward contain the returned ResultSet
     * @param manager Waiting object to interrupt when finished
     */
    public SQLWorker(SQLConnectionPool pool, SQLResultEvent event, SQLManager manager) {
        // Assign every field before the worker thread can observe this object.
        m_pool = pool;
        m_event = event;
        m_manager = manager;
        Thread t = new Thread(this, "SQLWorker");
        m_pool.incrementBackgroundCount();
        t.start();
    }

    /**
     * Runs the SQL query found in the SQLResultEvent the SQLWorker was instantiated
     * with. Sets the returned ResultSet inside the event, which in turn will fire
     * the event in the bot so as to be handled and fetched by unique key. Whether
     * or not the query succeeds, the background count of the connection pool used
     * is then reduced by one, and the SQLManager that called the worker is
     * interrupted back into consciousness, if it is currently asleep.
     */
    public void run() {
        try {
            ResultSet set = m_pool.query(m_event.getQuery());
            m_event.setResultSet(set);
        } catch (SQLException e) {
            Tools.printLog("SQLException encountered while running background query in SQLWorker.");
            Tools.printStackTrace(e);
        } finally {
            // Must run on failure too: the constructor incremented the pool's
            // background count, and leaving it raised would permanently use up
            // a background slot for this pool.
            m_pool.decrementBackgroundCount();
            m_manager.interrupt();
        }
    }
}

還有更多文件,但由於這個回答已達到字數上限,我無法將它們全部貼上。請直接參考以下網址:

http://www.twcore.org/browser/trunk/twcore/src/twcore/core/sql