我試圖將數百萬數據行插入到數據庫中。我正在嘗試爲此使用ThreadPoolExecutor。我正在爲每個9000記錄創建一個批處理,並將批處理髮送到每個線程。在這裏,我將ThreadPool Size固定爲20.在大小增加後,它失敗了。如何檢查ThreadPoolExecutor中有多少線程可用,以及如何等待線程池有空閒線程。使用多線程將百萬數據插入到數據庫中
聽到是我的代碼,請幫助,如果我錯了。
int threadCount=10;
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadCount);
int i=0;
StringBuffer sb=new StringBuffer();
sb.append("BEGIN BATCH");
sb.append(System.lineSeparator());
int cnt =metaData.getColumnCount();
while(rs.next())
{
String query ="INSERT INTO "+table+" ("+columnslist.get(1)+")VALUES("+i;
for (int j=1 ; j <= cnt ; j++)
{
if(metaData.getColumnTypeName(j).contains("int") || metaData.getColumnTypeName(j).contains("number"))
{
query +=","+ rs.getInt(j);
}
else if(metaData.getColumnTypeName(j).contains("varchar") || metaData.getColumnTypeName(j).contains("date") || metaData.getColumnTypeName(j).contains("getTimestamp"))
{
query +=",'"+parseColumnData(rs.getString(j))+"'";
}
else
{
query +=",'"+parseColumnData(rs.getString(j))+"'";
}
}
query +=");";
sb.append(query);sb.append(System.lineSeparator());
if(i%9000==0)
{
sb.append("APPLY BATCH");
System.out.println(threadPool.getActiveCount());
Thread t = new Thread(new ExcecuteTask(sb.toString(),session));
threadPool.execute(t);
sb.setLength(0);
sb.append("BEGIN BATCH");
sb.append(System.lineSeparator());
}
i++;
}
sb.append("APPLY BATCH");
Thread t = new Thread(new ExcecuteTask(sb.toString(),session));
threadPool.execute(t);
sb.setLength(0);
threadPool.shutdown();
while (threadPool.getTaskCount() != threadPool.getCompletedTaskCount())
{
}
System.out.println(table+" Loaded sucessfully");
public class ExcecuteTask implements Runnable
{
private String sb;
private Session session;
public ExcecuteTask(String s,Session session)
{
sb = s;
this.session=session;
}
public void run()
{
session.executeAsync(sb.toString());
}
}
您正在使用'FixedThreadPool',但在我看來您正在尋找一個具有可擴展線程數的'CachedThreadPool'。 – Pievis
你做錯了。查看'Connection.prepareStatement'獲取編譯語句(作爲'PreparedStatement'實例)。使用'PreparedStatement.setObject'爲要插入的行設置值,然後用'Statement.addBatch'將該行添加到批處理中。使用'Statement.executeBatch'來執行批處理(即在你的案例中插入行)。使用'Statement.clearBatch'進行另一輪插入。爲SQL語句連接字符串是許多原因的禁止(SQL注入是其中之一)。使用'+ ='連接字符串是字符串連接的不可否認的... –
在此處閱讀有關SQL注入防護的更多信息:[SQL注入預防備忘](https://www.owasp.org/index.php/SQL_Injection_Prevention_Cheat_Sheet ) –