2010-02-02 36 views
3

我想以異步方式在服務器上執行Hive查詢。 Hive查詢可能需要很長時間才能完成,因此我不希望阻止該呼叫。我目前使用Thirft進行阻塞調用(在client.execute()上的阻塞),但我還沒有看到如何進行非阻塞調用的示例。這裏是阻止代碼:如何在Java中對Hive進行異步調用?

 TSocket transport = new TSocket("hive.example.com", 10000); 
     transport.setTimeout(999999999); 
     TBinaryProtocol protocol = new TBinaryProtocol(transport); 
     Client client = new ThriftHive.Client(protocol); 
     transport.open(); 
     client.execute(hql); // Omitted HQL 

     List<String> rows; 
     while ((rows = client.fetchN(1000)) != null) { 
      for (String row : rows) { 
       // Do stuff with row 
      } 
     } 

     transport.close(); 

上面的代碼缺少try/catch塊來保持它簡短。

有沒有人有任何想法如何做異步調用? Hive/Thrift可以支持它嗎?有沒有更好的辦法?

謝謝!

+0

我現在對Thrift不是很瞭解,但不能將它包裝在可運行的程序中並創建一個新線程? – brindy 2010-02-02 02:10:44

+0

是的,我很清楚自己可以完成這項工作,但是有些事情讓我覺得它已經嵌入到Thrift中,比如TNonblockingSocket。我找不到如何使用它的任何示例,或者即使Hive支持它。 – 2010-02-02 17:38:58

回答

1

說話蜂巢郵件列表後,蜂房不支持使用Thirft異步調用。

0

我不知道Hive特別是任何阻塞調用可以通過產生一個新線程並使用回調來調用異步調用。你可以看看java.util.concurrent.FutureTask,它被設計爲允許輕鬆處理這種異步操作。

1

我一無所知蜂巢,但作爲最後的手段,你可以使用Java的併發庫:

Callable<SomeResult> c = new Callable<SomeResult>(){public SomeResult call(){ 

    // your Hive code here 

}}; 

Future<SomeResult> result = executorService.submit(c); 

// when you need the result, this will block 
result.get(); 

或者,如果您不需要等待結果,使用的Runnable代替可調用

0

我們啓動了對AWS Elastic MapReduce的異步調用。通過調用AWS MapReduce Web服務,AWS MapReduce可以在亞馬遜的雲上運行hadoop/hive作業。

您也可以監控您的作業的狀態,一旦工作完成抓取結果關S3。

由於調用Web服務在本質上是異步的,我們從來沒有阻止我們的其他業務。我們繼續在一個單獨的線程中監控我們的工作狀態,並在工作完成時獲得結果。

2

AFAIK,在撰寫本文時,Thrift不生成異步客戶端。在此鏈接here(「異步」的搜索文本)中解釋的原因是Thrift專爲數據中心設計,其中延遲被認爲是低的。

不幸的是,你知道有經驗的呼叫和結果之間不總是由網絡引起的延遲,但通過邏輯執行!我們從Java應用程序服務器調用Cassandra數據庫時遇到了這個問題,我們希望限制總線程數。總結:現在你所能做的就是確保你有足夠的資源來處理被阻塞的併發線程所需的數量,並等待更高效的實現。

2

現在有可能使在Java節儉客戶端異步調用放入後,該修補程序: https://issues.apache.org/jira/browse/THRIFT-768

使用新節儉生成異步Java客戶端和初始化客戶端如下:

TNonblockingTransport transport = new TNonblockingSocket("127.0.0.1", 9160); 
TAsyncClientManager clientManager = new TAsyncClientManager(); 
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); 
Hive.AsyncClient client = new Hive.AsyncClient(protocolFactory, clientManager, transport); 

現在您可以像在同步接口上一樣在此客戶端上執行方法。唯一的變化是所有的方法都需要一個回調的附加參數。