2017-03-02 86 views
0

我試圖在使用分區的多線程上運行JAVAEE7批處理。
我的批處理很簡單:讀一串隨機數,用3個線程寫出它們的總和。在多線程上運行JAVAEE7批處理時出錯

我的工作XML

<job id="partition" xmlns="http://xmlns.jcp.org/xml/ns/javaee" 
    version="1.0"> 
    <step id="process" next="cleanup"> 
     <chunk item-count="3"> 
      <reader ref="partitionProcessIR"> 
       <properties> 
        <property name="start" value="#{partitionPlan['start']}" /> 
        <property name="end" value="#{partitionPlan['end']}" /> 
       </properties> 
      </reader> 
      <processor ref="partitionProcessIP" /> 
      <writer ref="partitionProcessIW" /> 
     </chunk> 
     <partition> 
      <mapper ref="partitionMapperImpl" /> 
     </partition> 
    </step> 
    <step id="cleanup"> 
     <batchlet ref="partitionCleanupBatchlet"></batchlet> 
    </step> 
</job> 

我PartitionMapperImpl:

@Override 
public PartitionPlan mapPartitions() throws Exception { 
    // TODO Auto-generated method stub 
    return new PartitionPlanImpl() { 

     @Override 
     public int getPartitions() { 
      return 3; 
     } 

     @Override 
     public int getThreads() { 
      return 3; 
     } 

     @Override 
     public Properties[] getPartitionProperties() { 
      int totalRecords = getTotalRecords(); 
      int partItems = totalRecords/getPartitions(); 
      int remainItems = totalRecords % getPartitions(); 
      Properties[] props = new Properties[getPartitions()]; 

      for (int i = 0; i < getPartitions(); i++) { 
       props[i] = new Properties(); 
       props[i].setProperty("start", String.valueOf(i * partItems)); 
       // if this is the last partition, add remaining items 
       if (i == getPartitions() - 1) { 
        props[i].setProperty("end", String.valueOf((i + 1) * partItems + remainItems)); 
       } else { 
        props[i].setProperty("end", String.valueOf((i + 1) * partItems)); 
       } 
      } 
      return props; 
     } 
    }; 
} 

private int getTotalRecords() { 
    return 50; 
} 

我的讀者:

@Override 
public void open(Serializable checkpoint) throws Exception { 
    int start = new Integer(startProperty); 
    int end = new Integer(endProperty); 
    List<Integer> listNumber = new ArrayList<>(); 
    for (int i = start; i < end; i++) { 
     int rand = (int) (Math.random() * 10); 
     listNumber.add(rand); 
    } 
    iterator = listNumber.iterator(); 
} 

@Override 
public Integer readItem() throws Exception { 
    if (iterator.hasNext()) { 
     return iterator.next(); 
    } 
    // end read 
    return null; 
} 

我的處理器

@Override 
    public Integer processItem(Object arg0) throws Exception { 
     Integer rand = (Integer) arg0; 
     return rand; 
    } 

我的作家

@Override 
    public void writeItems(List<Object> arg0) throws Exception { 
     int sum = 0; 
     for (Object object : arg0) { 
      Integer rand = (Integer) object; 
      sum += rand; 
     } 
     System.out.println(Thread.currentThread().getId() + " | SUM OF CHUNK: " + sum); 
    } 

當我運行這個批處理,下面的錯誤發生。 我猜這與在derby數據庫中同時存儲多個檢查點有關。

2017-03-02T15:22:45.955 + 0700 |情報:275 | SUM OF CHUNK:13 2017-03-02T15:22:45.958 + 0700 |情報:316 | SUM OF OF CHUNK:17 2017-03-02T15:23:05.971 + 0700 |重大:讀取進程寫入循環失敗 com.ibm.jbatch.container.exception.BatchContainerServiceException: 無法保留[進程的檢查點數據]在 com.ibm.jbatch.container.persistence.CheckpointManager.checkpoint(CheckpointManager.java:133) 在 com.ibm.jbatch.container.impl.ChunkStepControllerImpl.invokeChunk(ChunkStepControllerImpl.java:644) 在 COM .ibm.jbatch.container.impl.ChunkStepControllerImpl.invokeCoreStep(ChunkStepControllerImpl.java:764) 在 com.ibm.jbatch.container.impl.BaseStepControllerImpl.execute(BaseStepControllerImpl.java:144) 在 COM.I bm.jbatch.container.impl.ExecutionTransitioner.doExecutionLoop(ExecutionTransitioner.java:112) 在 com.ibm.jbatch.container.impl.JobThreadRootControllerImpl.originateExecutionOnThread(JobThreadRootControllerImpl.java:110) 在 com.ibm.jbatch。 container.util.BatchWorkUnit.run(BatchWorkUnit.java:80) at java.util.concurrent.Executors $ RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java :266)在 org.glassfish.enterprise.concurrent.internal.ManagedFutureTask.run(ManagedFutureTask.java:141) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 JA va.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)at org.glassfish.enterprise.concurrent.ManagedThreadFactoryImpl $ ManagedThread.run( ManagedThreadFactoryImpl.java:250) 造成的:com.ibm.jbatch.container.exception.PersistenceException: java.sql.SQLTransactionRollbackException: ????????????????????鎖:ROW,CHECKPOINTDATA, (110,27)等待XID:{77885156,S},APP,從 選擇ID,obj CHECKPOINTDATA其中id = ?已授予XID:{77885155,X}鎖定:ROW, CHECKPOINTDATA,(110,28)等待XID:{77885155,S},APP,選擇ID, obj from CHECKPOINTDATA where id =?授予XID:{77885156,X} ???????? XID:77885156?在 fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.queryCheckpointData(JBatchJDBCPersistenceManager.java:503) 在 fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.updateCheckpointData(JBatchJDBCPersistenceManager.java:388) 在 魚。 payara.jbatch.persistence.rdbms.LazyBootPersistenceManager.updateCheckpointData(LazyBootPersistenceManager.java:230) at com.ibm.jbatch.container.persistence.CheckpointManager.checkpoint(CheckpointManager.java:128) ... 13 more引起: java.sql.SQLTransactionRollbackException: ??????????????????????:鎖:ROW,CHECKPOINTDATA, (110,27)等待XID:{77885156,S},APP,從選擇id,objCHECKPOINTDATA其中id =?已授予XID:{77885155,X}鎖定:ROW, CHECKPOINTDATA,(110,28)等待XID:{77885155,S},APP,選擇ID, obj from CHECKPOINTDATA where id =?授予XID:{77885156,X} ???????? XID:77885156?在 org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(未知 源)在 org.apache.derby.impl.jdbc.Util.generateCsSQLException(未知來源) 在 org.apache.derby.impl。 jdbc.TransactionResourceImpl.wrapInSQLException(未知 源)在 org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(未知 源)在 org.apache.derby.impl.jdbc.EmbedConnection.handleException(未知 源) at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown Source)at org.apache.derby.impl.jdbc.EmbedResultSet.closeOnTransactionError(Unknown ) org.apache.derby.impl.jdbc.EmbedResultSet.movePosition(Unknown Source) at org.apache.derby.impl.jdbc.EmbedResultSet.next(Unknown Source)at com.sun.gjc.spi .base.ResultSetWrapper.next(ResultSetWrapper.java:103) at fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.queryCheckpointData(JBatchJDBCPersistenceManager.java:498) ... 16更多原因:java.sql.SQLException : ???????????????????????????????????????????鎖:ROW,CHECKPOINTDATA, (110,27 )等待XID:{77885156,S},APP,從 選擇ID,obj CHECKPOINTDATA其中id =?已授予XID:{77885155,X}鎖定:ROW, CHECKPOINTDATA,(110,28)等待XID:{77885155,S},APP,選擇ID, obj from CHECKPOINTDATA where id =?授予XID:{77885156,X} ???????? XID:77885156? ERROR 40001:在以 org.apache.derby.impl.jdbc.SQLExceptionFactory40.wrapArgsForTransportAcrossDRDA(未知 來源) org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(未知 源)... 27多個所致: ???????????????????????????????????????????鎖:ROW,CHECKPOINTDATA, (110,27 )等待XID:{77885156,S},APP,從 選擇ID,obj CHECKPOINTDATA其中id =?已授予XID:{77885155,X}鎖定:ROW, CHECKPOINTDATA,(110,28)等待XID:{77885155,S},APP,選擇ID, obj from CHECKPOINTDATA where id =?授予XID:{77885156,X} ???????? XID:77885156? at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)at org.apache.derby.impl.services.locks.Deadlock。buildException(未知 源)在 org.apache.derby.impl.services.locks.ConcurrentLockSet.lockObject(未知 源)在 org.apache.derby.impl.services.locks.ConcurrentLockSet.zeroDurationLockObject(未知 源)在在 org.apache org.apache.derby.impl.services.locks.AbstractPool.zeroDurationlockObject(未知 源)在 org.apache.derby.impl.services.locks.ConcurrentPool.zeroDurationlockObject(未知 源)。 derby.impl.store.raw.xact.RowLocking2nohold.lockRecordForRead(Unknown Source)at org.apache.derby.impl.store.access.conglomerate.OpenConglomerate.lockPositionForRead(Unknown 源)處 org.apache.derby.impl.store.access.heap.HeapScan.fetchNextGroup org.apache.derby.impl.store.access.conglomerate.GenericScanController.fetchRows(未知 源)(未知 源)在在 org.apache org.apache.derby.impl.sql.execute.BulkTableScanResultSet.reloadArray(未知 源)在 org.apache.derby.impl.sql.execute.BulkTableScanResultSet.getNextRowCore(未知 源) .derby.impl.sql.execute.BasicNoPutResultSetImpl.getNextRow(未知 源)... 20多個

你有什麼想法如何解決這個問題?
或者任何可以在2個以上線程上運行的示例都非常有用。
在此先感謝。

回答

0

它看起來好像你可能會遇到併發問題,比如死鎖或鎖定超時。 (這很難說,因爲你的異常信息在這個問題上有點亂碼,我認爲,因爲德比信息是以本地語言字符串和英文字符串混合打印的)。

你可以找到診斷和了解爲什麼你的併發數據庫訪問這裏遇到這些問題的一些策略:https://wiki.apache.org/db-derby/LockDebugging

0

看起來像一個似鯖水狼牙魚的問題,從這一行堆棧跟蹤:

fish.payara.jbatch.persistence.rdbms.JBatchJDBCPersistenceManager.queryCheckpointData(JBatchJDBCPersistenceManager.java:503) 

你可以嘗試使用GlassFish正確運行您的應用程序,並查看是否存在相同的問題。

或者您可以將應用程序部署到包含JBeret作爲批容器的WildFly。如果您的應用程序已寫入JSR 352規範,則應該在任何Java EE 7兼容的應用程序服務器中進行部署和運行。您可以將WildFly配置爲將jdbc作業存儲庫與Derby或任何其他受支持的DBMS(包括捆綁的H2數據庫)一起使用。

如果你仍然陷入困境,我建議跟進Payara項目。