2016-09-18 34 views
2

我在我的Kotlin應用程序中使用Kovenant,我打電話給Elasticsearch,它有自己的異步API。我寧願用承諾,但最好我能想出是這樣的:在Kotlin中,如何整合Kovenant承諾和Elasticsearch異步響應?

task {  
    esClient.prepareSearch("index123") 
      .setQuery(QueryBuilders.matchAllQuery()) 
      .execute().actionGet() 
} then { 
    ... 
} success { 
    ... 
} fail { 
    ... 
} 

這使得一個Kovenant異步任務線程,然後Elasticsearch使用從池中的線程,然後actionGet()同步塊Elasticsearch到找回結果。在阻塞其他人的時候產生新線程似乎很愚蠢。有沒有一種方法可以更緊密地集成線程調度?

注:這個問題是故意書面和作者(Self-Answered Questions)回答,這樣有趣的問題的解決方案在SO共享。

回答

2

您可以使用Kovenant Deferred class來創建承諾,而無需像您在樣本中那樣通過異步task進行調度。該模型基本上是:

  1. 創建一個延遲的實例
  2. 鉤到異步處理並解決或拒絕遞延基於異步回調
  3. 回報deferred.promise給調用者

在代碼,這看起來像:

fun doSearch(): Promise<SearchResponse, Throwable> { 
    val deferred = deferred<Response, Throwable>() 
    esClient.prepareSearch("index") 
     .setQuery(QueryBuilders.matchAllQuery()) 
     .execute(object: ActionListener<T> { 
        override fun onResponse(response: T) { 
         deferred.resolve(response) 
        } 

        override fun onFailure(e: Throwable) { 
         deferred.reject(e) 
        }) 
    return deferred.promise 
} 

可重複使用的方式做到這一點是先創建一個適配器,可以只適應Elasticsearch的願望爲ActionListener一般地工作,並承諾工作:

fun <T: Any> promiseResult(deferred: Deferred<T, Exception>): ActionListener<T> { 
    return object: ActionListener<T> { 
     override fun onResponse(response: T) { 
      deferred.resolve(response) 
     } 

     override fun onFailure(e: Throwable) { 
      deferred.reject(wrapThrowable(e)) 
     } 
    } 
} 

class WrappedThrowableException(cause: Throwable): Exception(cause.message, cause) 
fun wrapThrowable(rawEx: Throwable): Exception = if (rawEx is Exception) rawEx else WrappedThrowableException(rawEx) 

注:wrapThrowable()方法有改變Throwable到Kovenant的Exception因爲當前版本(3.3.0)有一些方法所期望的承諾的拒絕式下降,從Exception例如bind()),你可以留在Throwable如果使用unwrap()代替嵌套的承諾。

現在使用這個適配器函數來一般地擴展Elasticsearch ActionRequestBuilder,這是幾乎所有您將調用​​的東西;創建一個新的promise()擴展功能:

fun <Request: ActionRequest<*>, Response: ActionResponse, RequestBuilder: ActionRequestBuilder<*, *, *, *>, Client: ElasticsearchClient<*>> 
     ActionRequestBuilder<Request, Response, RequestBuilder, Client>.promise(): Promise<Response, Exception> { 
    val deferred = deferred<Response, Exception>() 
    this.execute(promiseResult(deferred)) 
    return deferred.promise 
} 

現在,您可以撥打promise()而不是​​:

esClient.prepareSearch("index") 
      .setQuery(QueryBuilders.matchAllQuery()) 
      .promise() 

並開始鏈接你的承諾...

esClient.admin().indices().prepareCreate("index1").setSettings("...").promise() 
     .bind { 
      esClient.admin().cluster().prepareHealth() 
        .setWaitForGreenStatus() 
        .promise() 
     } bind { 
      esClient.prepareIndex("index1", "type1") 
        .setSource(...) 
        .promise() 
     } bind { 
      esClient.prepareSearch("index1") 
        .setQuery(QueryBuilders.matchAllQuery()) 
        .promise() 
     } then { searchResults -> 
      // ... use searchResults 
     }.success { 
      // ... 
     }.fail { 
      // ... 
     } 
} 

您應該熟悉bind()unwrap()當你有嵌套的承諾,你想鏈沒有嵌套更深。如果您不想包含kovenant-functional,則可以在上述情況下使用unwrap().then代替bind

由於Elasticsearch客戶端中所有請求對象的一致性,您在Elasticsearch中的每次調用都將能夠使用promise()而不是​​。