2016-07-15 44 views
1

我嘗試在創建Elasticsearch索引後立即執行讀取操作。下面是簡單的代碼來重現這種情況:在Elasticsearch索引創建導致異常後立即進行讀取操作

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; 
import org.elasticsearch.client.Client; 
import org.elasticsearch.client.transport.TransportClient; 
import org.elasticsearch.common.settings.Settings; 
import org.elasticsearch.common.transport.InetSocketTransportAddress; 
import static java.net.InetAddress.getLoopbackAddress; 

public class ElasticIssue { 

    static String index = "my_index"; 

    public static void main(String[] args) { 
    final Client c = getClient(); 
    deleteIndexIfExists(c); 
    createIndex(c); 
    //refresh(c); 
    //flush(c); 
    //delay(); 
    //indexDoc(c); 
    getDoc(c); 
    } 

    static void getDoc(Client client) { 
    client.prepareGet(index, "some-type", "1").get(); 
    } 

    static void indexDoc(Client client) { 
    client.prepareIndex(index, "another-type", "25").setSource("{}").get(); 
    } 

    static void createIndex(Client client) { 
    client.admin().indices().prepareCreate(index).get(); 
    } 

    static void delay() { 
    try {Thread.sleep(3000);} catch (InterruptedException e) {} 
    } 

    static void flush(Client client) { 
    client.admin().indices().prepareFlush(index).get(); 
    } 

    private static void refresh(Client client) { 
    client.admin().indices().prepareRefresh(index).get(); 
    } 

    static void deleteIndexIfExists(Client client) { 
    final IndicesExistsResponse response = client.admin().indices().prepareExists(index).get(); 
    if (response.isExists()) { 
     deleteIndex(client); 
    } 
    } 

    static void deleteIndex(Client client) { 
    client.admin().indices().prepareDelete(index).get(); 
    } 

    static Client getClient() { 
    final Settings settings = Settings.builder() 
     .put("cluster.name", "elasticsearch") //default name 
     .put("node.name", "my-node") 
     .build(); 
    return TransportClient.builder() 
     .settings(settings) 
     .build() 
     .addTransportAddress(new InetSocketTransportAddress(getLoopbackAddress(), 9300)); 
    } 
} 

然後我得到以下錯誤:

Exception in thread "main" NoShardAvailableActionException[No shard available for [get [my_index][some-type][1]: routing [null]]]; nested: RemoteTransportException[[my-node][172.17.0.2:9300][indices:data/read/get[s]]]; nested: IllegalIndexShardStateException[CurrentState[RECOVERING] operations only allowed when shard state is one of [POST_RECOVERY, STARTED, RELOCATED]]; 
    at org.elasticsearch.action.support.single.shard.TransportSingleShardAction$AsyncSingleAction.perform(TransportSingleShardAction.java:199) 
    at org.elasticsearch.action.support.single.shard.TransportSingleShardAction$AsyncSingleAction.onFailure(TransportSingleShardAction.java:186) 
    at org.elasticsearch.action.support.single.shard.TransportSingleShardAction$AsyncSingleAction.access$1300(TransportSingleShardAction.java:115) 
    at org.elasticsearch.action.support.single.shard.TransportSingleShardAction$AsyncSingleAction$2.handleException(TransportSingleShardAction.java:240) 
    at org.elasticsearch.transport.TransportService$DirectResponseChannel.processException(TransportService.java:855) 
    at org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:833) 
    at org.elasticsearch.transport.TransportService$4.onFailure(TransportService.java:387) 
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:39) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: RemoteTransportException[[my-node][172.17.0.2:9300][indices:data/read/get[s]]]; nested: IllegalIndexShardStateException[CurrentState[RECOVERING] operations only allowed when shard state is one of [POST_RECOVERY, STARTED, RELOCATED]]; 
Caused by: [my_index][[my_index][3]] IllegalIndexShardStateException[CurrentState[RECOVERING] operations only allowed when shard state is one of [POST_RECOVERY, STARTED, RELOCATED]] 
    at org.elasticsearch.index.shard.IndexShard.readAllowed(IndexShard.java:1035) 
    at org.elasticsearch.index.shard.IndexShard.get(IndexShard.java:651) 
    at org.elasticsearch.index.get.ShardGetService.innerGet(ShardGetService.java:173) 
    at org.elasticsearch.index.get.ShardGetService.get(ShardGetService.java:86) 
    at org.elasticsearch.action.get.TransportGetAction.shardOperation(TransportGetAction.java:101) 
    at org.elasticsearch.action.get.TransportGetAction.shardOperation(TransportGetAction.java:44) 
    at org.elasticsearch.action.support.single.shard.TransportSingleShardAction$ShardTransportHandler.messageReceived(TransportSingleShardAction.java:282) 
    at org.elasticsearch.action.support.single.shard.TransportSingleShardAction$ShardTransportHandler.messageReceived(TransportSingleShardAction.java:275) 
    at org.elasticsearch.transport.TransportRequestHandler.messageReceived(TransportRequestHandler.java:33) 
    at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:75) 
    at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376) 
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

好像Elasticsearch索引的創建是不完全的,儘管響應已返回。這有點令人沮喪。如果我做任何:延遲,索引任何文檔,刷新索引,刷新索引(取消任何行的註釋);然後讀操作成功執行。

這種行爲的解釋是什麼?什麼是推薦的方法來確保索引可以工作?列出的解決方案通過實驗找到。

我使用Elasticsearch 2.3.3和Java 8.所有與Elasticsearch的通信都使用傳輸協議(使用Java api)完成。

因爲在這裏更容易的設置是泊塢窗命令與所有必要的設置得到容器:

docker run -p 9200:9200 -p 9300:9300 elasticsearch:2.3.3 -Des.node.name="my-node" 

這裏是Elasticsearch的Java API Maven的依賴性:

<dependency> 
    <groupId>org.elasticsearch</groupId> 
    <artifactId>elasticsearch</artifactId> 
    <version>2.3.3</version> 
</dependency> 

回答

0

您需要等到指數創建。這是你可以做的等待,直到指數的健康處於黃色狀態。

創建索引的功能後調用下面的函數:

static void indexStatusCheck(Client client) { 
    ClusterHealthResponse response = client.admin().cluster().prepareHealth().setIndices(index).setWaitForYellowStatus().get(); 
if (response.getStatus() == ClusterHealthStatus.RED) { 
throw Exception("Index not ready"); 
} 
    } 

然後你就可以用的getDoc()調用進行。

相關問題