2015-06-11 136 views
6

,同時通過在光滑3批插入插入數以千計的每5秒記錄我越來越數據庫異常而批量插入

org.postgresql.util.PSQLException: FATAL: sorry, too many clients already 

我的數據訪問層的樣子:

val db: CustomPostgresDriver.backend.DatabaseDef = Database.forURL(url, user=user, password=password, driver= jdbcDriver) 



override def insertBatch(rowList: List[T#TableElementType]): Future[Long] = { 
    val res = db.run(insertBatchQuery(rowList)).map(_.head.toLong).recover{ case ex:Throwable=> RelationalRepositoryUtility.handleBatchOperationErrors(ex)} 
//db.close() 
     res 
     } 

    override def insertBatchQuery(rowList: List[T#TableElementType]): FixedSqlAction[Option[Int], NoStream, Write] = { 
    query ++= (rowList) 
    } 

關閉插入批處理中的連接沒有效果...它仍然會給出相同的錯誤。

我從我的代碼中調用插入一批這樣的:

val temp1 = list1.flatMap { li => 
     Future.sequence(li.map { trip => 
      val data = for { 
       tripData <- TripDataRepository.insertQuery(trip.tripData) 
       subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id)) 
      } yield ((tripData, subTripData)) 
      val res=db.run(data.transactionally) 
      res 
//db.close() 
     }) 
     } 

如果我關閉我在這裏工作後的連接,你可以看到在註釋掉的代碼,我得到錯誤:

java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] 

後調用沒有Future.sequence這樣的方法:

val temp1 =list.map { trip => 
      val data = for { 
      tripData <- TripDataRepository.insertQuery(trip.tripData) 
      subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id)) 
      } yield ((tripData, subTripData)) 
      val res=db.run(data.transactionally) 
      res 
     } 

我仍然有太多客戶錯誤...

+0

「*遺憾,太多的客戶已經*」表示你打開很多很多連接,但你永遠不會關閉它們。 –

+0

你可以發表你是如何調用insertBatch和一點點更舒服的代碼嗎?如@a_horse_with_no_name所示,錯誤意味着您打開了一個太多的連接。 – Biswanath

+0

li.map {trip => val data = for { tripData < - TripDataRepository.insertQuery(trip.tripData)//(TripDataRepository.query返回TripDataRepository.query.map(obj => obj)+ = trip.tripData) subTripData < - SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData,tripData.id)) }產率((tripData,subTripData)) VAL解析度= db.run(data.transactionally) – Archana

回答

1

這個問題的根源在於你正在旋轉一個無界的Future列表,每個列表連接到數據庫 - list中的每個條目都有一個連接。

這可以通過運行您的串行插入,迫使每個插入批來解決依賴於以前的:

// Empty Future for the results. Replace Unit with the correct type - whatever 
// "res" is below. 
val emptyFuture = Future.successful(Seq.empty[Unit]) 
// This will only insert one at a time. You could use list.sliding to batch the 
// inserts if that was important. 
val temp1 = list.foldLeft(emptyFuture) { (previousFuture, trip) => 
    previousFuture flatMap { previous => 
    // Inner code copied from your example. 
    val data = for { 
     tripData <- TripDataRepository.insertQuery(trip.tripData) 
     subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id)) 
    } yield ((tripData, subTripData)) 
    val res = db.run(data.transactionally) 
    previous :+ res 
    } 
}