2017-05-05 35 views
0

我有兩種方法,我們稱它們爲load()init()。每個人都在自己的線程中啓動一個計算,並在其自己的執行上下文中返回Future。這兩個計算是獨立的。用回調編寫兩個Scala未來,沒有第三個ExecutionContext

val loadContext = ExecutionContext.fromExecutor(...) 
def load(): Future[Unit] = { 
    Future 
} 

val initContext = ExecutionContext.fromExecutor(...) 
def init(): Future[Unit] = { 
    Future { ... }(initContext) 
} 

我想這兩個來自一些三線程中調用 - 說這是從main() - 並且都完成後執行一些其他的計算。

def onBothComplete(): Unit = ... 

現在:

  1. 我不在乎它完成第一
  2. 我不在乎什麼的線程上執行其它計算,除了:
  3. 我不想阻止兩個線程等待另一個線程;
  4. 我不想阻塞第三個(調用)線程;和
  5. 我不想爲了設置標誌而開始第四個線程。

如果我用-內涵,我得到的是這樣的:

val loading = load() 
val initialization = initialize() 

for { 
    loaded <- loading 
    initialized <- initialization 
} yield { onBothComplete() } 

,我得到無法找到一個隱含的ExecutionContext。

我認爲這意味着斯卡拉希望第四個線程等待兩個期貨的完成並設置標誌,無論是明確的新的ExecutionContext還是ExecutionContext.Implicits.global。所以看起來理解力已經不存在了。

我想我也許可以嵌套回調:

initialization.onComplete { 
    case Success(_) => 
    loading.onComplete { 
     case Success(_) => onBothComplete() 
     case Failure(t) => log.error("Unable to load", t) 
    } 
    case Failure(t) => log.error("Unable to initialize", t) 
} 

不幸的是onComplete也需要一個隱含的ExecutionContext,我也得到了同樣的錯誤。 (另外,這是難看的,並且從loading失去錯誤消息,如果initialization失敗。)

是否有任何方式來組成的Scala期貨而不阻塞和不引入另一個ExecutionContext?如果沒有,我可能不得不把它們扔給Java 8 CompletableFutures Javaslang Vavr Futures,它們都能夠在執行原始工作的線程上運行回調。

更新澄清阻塞兩個等待另一個的線程也是不可接受的。

再次更新對於完成後計算沒有那麼具體。

+0

你應該做'Future.firstCompletedOf(名單(初始化,加載))'如果你想抓住未來的結果首先完成。爲了列表理解將等待兩個期貨在完成收益之前完成。 – pcting

+0

@pcting在執行其他計算之前,我確實希望等待兩個期貨的完成。我不想阻止當前的線程來做到這一點。 –

+0

如果這是爲了將AtomicBoolean設置爲true的確切目的,那麼可以改爲在完成時指示完成。 'promise.completeWith(初始化zip加載)' –

回答

1

爲什麼不重用自己的執行上下文之一?不知道您對這些要求,但如果你使用一個單獨的線程執行,你可以只重用一個作爲您的理解與執行上下文,你不會得到任何新創建的線程:

implicit val loadContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor) 

如果你真的無法重用它們,您可以將其視爲隱式執行上下文:

implicit val currentThreadExecutionContext = ExecutionContext.fromExecutor(
    (runnable: Runnable) => { 
    runnable.run() 
    }) 

哪個將在當前線程上運行期貨。但是,Scala文檔明確建議不要這樣做,因爲它引入了非確定性,其中線程運行Future(但正如您所說的,您並不在乎它運行哪個線程,所以這可能無關緊要)。

請參閱Synchronous Execution Context爲什麼這是不可取的。

與該上下文的一個例子:

val loadContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor) 

def load(): Future[Unit] = { 
    Future(println("loading thread " + Thread.currentThread().getName))(loadContext) 
} 

val initContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor) 

def init(): Future[Unit] = { 
    Future(println("init thread " + Thread.currentThread().getName))(initContext) 
} 

val doneFlag = new AtomicBoolean(false) 

val loading = load() 
val initialization = init() 

implicit val currentThreadExecutionContext = ExecutionContext.fromExecutor(
    (runnable: Runnable) => { 
    runnable.run() 
    }) 

for { 
    loaded <- loading 
    initialized <- initialization 
} yield { 
    println("yield thread " + Thread.currentThread().getName) 
    doneFlag.set(true) 
} 

打印:

loading thread pool-1-thread-1 
init thread pool-2-thread-1 
yield thread main 

雖然yield線可以打印任何pool-1-thread-1pool-2-thread-1根據運行。

+0

正如我所說的,我不想阻塞當前線程,所以SynchronousExecutionContext已經結束。如果我明確選擇了現有的ExecutionContext中的一個,它是否意味着(假設它們是單線程的)該線程被阻塞,等待另一個線程?我也不想那樣。 (我會更新這個問題。) –

1

在Scala中,Future表示要執行的異步工作(即與其他工作單元同時執行)。 ExecutionContext表示用於執行Future的線程池。換句話說,ExecutionContext是執行實際工作的工作人員的團隊。

爲了提高效率和可擴展性,最好是有大團隊(S)(例如,單ExecutionContext有10個線程執行10 Future的),而不是小團隊(例如5 ExecutionContext 2個線程各執行10 Future的)。

在你的情況,如果你想線程數限制爲2,您可以:

def load()(implicit teamOfWorkers: ExecutionContext): Future[Unit] = { 
    Future { ... } /* will use the teamOfWorkers implicitly */ 
} 

def init()(implicit teamOfWorkers: ExecutionContext): Future[Unit] = { 
    Future { ... } /* will use the teamOfWorkers implicitly */ 
} 

implicit val bigTeamOfWorkers = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)) 
/* All async works in the following will use 
    the same bigTeamOfWorkers implicitly and works will be shared by 
    the 2 workers (i.e. thread) in the team */ 
for { 
    loaded <- loading 
    initialized <- initialization 
} yield doneFlag.set(true) 

無法找到一個隱含的ExecutionContext錯誤並不意味着斯卡拉想要額外的線程。這隻意味着Scala想要ExecutionContext來完成這項工作。而額外的ExecutionContext並不一定意味着額外的「線程」,例如以下ExecutionContext,而不是創建新的線程,將在當前線程執行的工作:

val currThreadExecutor = ExecutionContext.fromExecutor(new Executor { 
    override def execute(command: Runnable): Unit = command.run() 
}) 
相關問題