2017-07-31 18 views
1

How can I write to Geode using the Redis adapter, then read/react to events using a Geode client with a CacheListener?

I am trying to do the following:

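    1. Put some data into Geode using the Redis connector.
    2. Read that data with a Geode client, using the region's entrySet method, and react to create/update events on some keys.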

    I already had trouble accessing the Redis data from my Geode client. I had to do the following:

    region.get(Coder.stringToByteArrayWrapper("key")); 
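The wrapping step matters because the region declared later in this question is keyed by ByteArrayWrapper, not by String. A minimal sketch of the effect, using only the JDK: `BytesKey` below is a hypothetical stand-in written for this illustration (it is not Geode's ByteArrayWrapper), and the map merely imitates a region keyed by wrapped bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a byte-array key wrapper; NOT Geode's ByteArrayWrapper.
final class BytesKey {
    private final byte[] bytes;

    BytesKey(String s) {
        this.bytes = s.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public boolean equals(Object o) {
        // Two keys are equal when their byte contents are equal.
        return o instanceof BytesKey && Arrays.equals(bytes, ((BytesKey) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}

class WrappedKeyDemo {
    // Builds a map keyed by wrapped bytes, imitating how the string region is assumed to be keyed.
    static Map<Object, String> regionLikeMap() {
        Map<Object, String> map = new HashMap<>();
        map.put(new BytesKey("key"), "value");
        return map;
    }

    public static void main(String[] args) {
        Map<Object, String> map = regionLikeMap();
        // A plain String never equals a BytesKey, so this lookup misses.
        System.out.println("plain String lookup: " + map.get("key"));
        // Wrapping the key the same way it was stored makes the lookup hit.
        System.out.println("wrapped lookup:      " + map.get(new BytesKey("key")));
    }
}
```

The same reasoning would explain why a plain region.get("key") finds nothing from the Geode client, while the wrapped form above does.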
    

    I am also having a lot of trouble getting region.entrySet() to work. First, it does not work at all with ClientRegionShortcut.PROXY, and it seems to work only 50% of the time with ClientRegionShortcut.CACHING_PROXY.

    Here is the code I use for testing (note that I use lettuce as the Redis client):

    @Test 
    public void test_subscribe() throws InterruptedException, ExecutionException { 
        ClientCache cache = new ClientCacheFactory() 
         .addPoolLocator(HOST, LOCATOR_PORT) 
         .create(); 
    
        @SuppressWarnings({ "rawtypes", "unchecked" }) 
        CacheListener<ByteArrayWrapper, ByteArrayWrapper> cl = new CacheListenerAdapter() { 
         @Override 
         public void afterCreate(EntryEvent event) { 
          System.out.println("Created: " + event.getKey() + " = " + event.getNewValue()); 
         } 
    
         @Override 
         public void afterUpdate(EntryEvent event) { 
          System.out.println("Updated: " + event.getKey() + " replacing " + event.getOldValue() + " with " + event.getNewValue()); 
         } 
        }; 
    
        Region<ByteArrayWrapper, ByteArrayWrapper> region = cache 
         .<ByteArrayWrapper, ByteArrayWrapper> createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) 
         .addCacheListener(cl) 
         .setKeyConstraint(ByteArrayWrapper.class) 
         .setValueConstraint(ByteArrayWrapper.class) 
         .create(GeodeRedisServer.STRING_REGION); 
    
        RedisClient client = RedisClient.create("redis://" + HOST); 
        StatefulRedisConnection<String, String> connection = client.connect(); 
        RedisAsyncCommands<String, String> cmd = connection.async(); 
    
        cmd.set("1", "HelloGeodeRedis").get(); 
        cmd.set("2", "WorldGeodeRedis" + System.currentTimeMillis()).get(); 
    
        System.out.println("FromRedis: " + cmd.get("2").get()); 
        System.out.println("FromGeode: " + region.get(Coder.stringToByteArrayWrapper("2"))); 
    
        for (Map.Entry<?, ?> entry : region.entrySet()) { 
         System.out.format("key = %s, value = %s\n", entry.getKey(), entry.getValue()); 
        } 
    
        cache.close(); 
    } 
    

    I am wondering whether the 50% behaviour has something to do with the servers I am running:

    gfsh>describe region --name=ReDiS_StRiNgS 
    .......................................................... 
    Name   : ReDiS_StRiNgS 
    Data Policy  : persistent partition 
    Hosting Members : my-redis 
    
    Non-Default Attributes Shared By Hosting Members 
    
    Type | Name  | Value 
    ------ | ----------- | -------------------- 
    Region | size  | 2 
         | data-policy | PERSISTENT_PARTITION 
    
    gfsh>describe region --name my-region 
    .......................................................... 
    Name   : my-region 
    Data Policy  : persistent replicate 
    Hosting Members : my-server 
            my-redis 
    
    Non-Default Attributes Shared By Hosting Members 
    
    Type | Name  | Value 
    ------ | ----------- | -------------------- 
    Region | data-policy | PERSISTENT_REPLICATE 
         | size  | 2 
         | scope  | distributed-ack 
    
    gfsh>list members 
         Name  | Id 
    ---------------- | ----------------------------------------------------------- 
    my-locator | 172.16.202.245(my-locator:21234:locator)<ec><v0>:1024 
    my-server | 172.16.202.245(my-server:22154)<v1>:1025 
    my-redis | 172.16.202.245(my-redis:24890)<v2>:1026 
    

    As you can see, the region I created manually is hosted by both servers, whereas the one created by Redis is hosted only by the Redis server.

    The error I get 50% of the time is:

    org.apache.geode.cache.client.ServerOperationException: remote server on My-Computer(4352:loner):64103:58d54999: While performing a remote get 
        at org.apache.geode.cache.client.internal.AbstractOp.processObjResponse(AbstractOp.java:285) 
        at org.apache.geode.cache.client.internal.GetOp$GetOpImpl.processResponse(GetOp.java:143) 
        at org.apache.geode.cache.client.internal.AbstractOp.attemptReadResponse(AbstractOp.java:171) 
        at org.apache.geode.cache.client.internal.AbstractOp.attempt(AbstractOp.java:382) 
        at org.apache.geode.cache.client.internal.ConnectionImpl.execute(ConnectionImpl.java:275) 
        at org.apache.geode.cache.client.internal.pooling.PooledConnection.execute(PooledConnection.java:332) 
        at org.apache.geode.cache.client.internal.OpExecutorImpl.executeWithPossibleReAuthentication(OpExecutorImpl.java:900) 
        at org.apache.geode.cache.client.internal.OpExecutorImpl.execute(OpExecutorImpl.java:158) 
        at org.apache.geode.cache.client.internal.OpExecutorImpl.execute(OpExecutorImpl.java:115) 
        at org.apache.geode.cache.client.internal.PoolImpl.execute(PoolImpl.java:763) 
        at org.apache.geode.cache.client.internal.GetOp.execute(GetOp.java:91) 
        at org.apache.geode.cache.client.internal.ServerRegionProxy.get(ServerRegionProxy.java:116) 
        at org.apache.geode.internal.cache.LocalRegion.findObjectInSystem(LocalRegion.java:2776) 
        at org.apache.geode.internal.cache.LocalRegion.nonTxnFindObject(LocalRegion.java:1488) 
        at org.apache.geode.internal.cache.LocalRegionDataView.findObject(LocalRegionDataView.java:175) 
        at org.apache.geode.internal.cache.LocalRegion.get(LocalRegion.java:1377) 
        at org.apache.geode.internal.cache.LocalRegion.get(LocalRegion.java:1310) 
        at org.apache.geode.internal.cache.LocalRegion.get(LocalRegion.java:1295) 
        at org.apache.geode.internal.cache.AbstractRegion.get(AbstractRegion.java:320) 
        at trial.GeodeTest.test_subscribe(GeodeTest.java:112) 
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
        at java.lang.reflect.Method.invoke(Method.java:498) 
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) 
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
        at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 
        at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) 
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678) 
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) 
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) 
    Caused by: org.apache.geode.cache.RegionDestroyedException: Server connection from [identity(192.168.64.106(4352:loner):64103:58d54999,connection=1; port=64103]: Region named /ReDiS_StRiNgS/ReDiS_StRiNgS was not found during get request 
        at org.apache.geode.internal.cache.tier.sockets.BaseCommand.writeRegionDestroyedEx(BaseCommand.java:615) 
        at org.apache.geode.internal.cache.tier.sockets.command.Get70.cmdExecute(Get70.java:126) 
        at org.apache.geode.internal.cache.tier.sockets.BaseCommand.execute(BaseCommand.java:165) 
        at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doNormalMsg(ServerConnection.java:780) 
        at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doOneMessage(ServerConnection.java:911) 
        at org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:1166) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
        at org.apache.geode.internal.cache.tier.sockets.AcceptorImpl$1$1.run(AcceptorImpl.java:523) 
        at java.lang.Thread.run(Thread.java:745) 
    

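    For full disclosure, the reason I am testing this scenario is that I want to put data from Kafka topics into Geode using kafka-connect-redis, to avoid having to write a Geode Kafka connector myself.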

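    EDIT: The 50% problem has been fixed thanks to @Swapnil, but now I am back to having trouble getting entrySet and event notifications to work. It seems that unless I explicitly force a get on the key written through the Redis client, I do not receive any EntryEvent notification.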

  • Answers

    1

    I suspect this comes from the way the Geode servers were started. I have no problems when I set up my Geode cluster like this:

    gfsh>start locator --name=loc1 
    gfsh>start server --name=serv1 --redis-port=1111 --redis-bind-address=localhost 
    gfsh>start server --name=serv2 --server-port=40405 --redis-port=2222 --redis-bind-address=localhost 
    

    Describe region shows me that RedisStrings is created on both servers:

     
    gfsh>describe region --name=/ReDiS_StRiNgS 
    .......................................................... 
    Name   : ReDiS_StRiNgS 
    Data Policy  : partition 
    Hosting Members : serv2 
            serv1 
    
    Non-Default Attributes Shared By Hosting Members 
    
    Type | Name  | Value 
    ------ | ----------- | --------- 
    Region | size  | 0 
         | data-policy | PARTITION 
    

    Then, from my test program, I inserted data through Redis (using the Jedis client) and successfully read it back from Geode:

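    public void putRedis() { 
        // Write ten string entries through the Redis adapter on serv1 (port 1111).
        Jedis jedis = new Jedis("localhost", 1111); 
        for (int i = 0; i < 10; i++) { 
            jedis.set("foo" + i, "bar" + i); 
        } 
        jedis.close(); 
    } 
    
    public void readGeode() { 
        // Connect through the locator and read the same entries back from Geode.
        ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); 
        clientCacheFactory.addPoolLocator("localhost", 10334); 
        ClientCache client = clientCacheFactory.create(); 
        Region<ByteArrayWrapper, ByteArrayWrapper> redisStrings = client 
            .<ByteArrayWrapper, ByteArrayWrapper> createClientRegionFactory(ClientRegionShortcut.PROXY) 
            .create("ReDiS_StRiNgS"); 
        for (int i = 0; i < 10; i++) { 
            // Keys must be wrapped the same way the adapter stores them.
            System.out.println(redisStrings.get(Coder.stringToByteArrayWrapper("foo" + i))); 
        } 
    } 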
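To picture why a region hosted on only one of two servers could fail about half the time, here is a toy simulation using only the JDK. Everything in it is hypothetical (`ToyServer`, `HalfFailureDemo`, the strict alternation between servers); it does not use Geode APIs, and a real locator balances connections by load rather than by strict round-robin.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: these classes are hypothetical and are not Geode APIs.
class ToyServer {
    final String name;
    final Set<String> regions;

    ToyServer(String name, String... regions) {
        this.name = name;
        this.regions = new HashSet<>(Arrays.asList(regions));
    }

    // Fails when this server does not host the requested region.
    String get(String region) {
        if (!regions.contains(region)) {
            throw new IllegalStateException("Region " + region + " was not found on " + name);
        }
        return "ok from " + name;
    }
}

class HalfFailureDemo {
    // Counts failed gets when requests alternate strictly between the servers.
    static int countFailures(List<ToyServer> servers, String region, int requests) {
        int failures = 0;
        for (int i = 0; i < requests; i++) {
            ToyServer target = servers.get(i % servers.size());
            try {
                target.get(region);
            } catch (IllegalStateException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Mirrors the question's layout: only my-redis hosts the Redis strings region.
        List<ToyServer> cluster = Arrays.asList(
            new ToyServer("my-server", "my-region"),
            new ToyServer("my-redis", "my-region", "ReDiS_StRiNgS"));
        System.out.println(countFailures(cluster, "ReDiS_StRiNgS", 10) + " of 10 gets failed");
        // prints "5 of 10 gets failed"
    }
}
```

With the cluster started as shown in this answer, both servers host the region, so in this model every request would succeed regardless of which server is picked.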
    
    +0

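    Thanks, it is indeed related to that. I stopped the other server and there are no more errors. I guess this means every server has to host a Redis adapter. – Crystark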

    +0

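    It seems the 50% problem is fixed, but now I am facing another issue: unless I force a get of the key using the Geode client, I get no event notifications and no data in my 'entrySet'. – Crystark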

    2

    I was not receiving those events because I had not registered interest in the keys I care about.

    Adding region.registerInterestRegex(".*"); registers interest in every key. It requires the ClientCache to be created with .setPoolSubscriptionEnabled(true):

    ClientCache cache = new ClientCacheFactory() 
         .addPoolLocator(HOST, LOCATOR_PORT) 
         .setPoolSubscriptionEnabled(true) 
         .create(); 
    
        // ... 
    
        Region<ByteArrayWrapper, ByteArrayWrapper> region = cache 
         .<ByteArrayWrapper, ByteArrayWrapper> createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) 
         .addCacheListener(cl) 
         .setKeyConstraint(ByteArrayWrapper.class) 
         .setValueConstraint(ByteArrayWrapper.class) 
         .create(GeodeRedisServer.STRING_REGION); 
    
        region.registerInterestRegex(".*"); 
    
        // ... 
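As a rough mental model of why registering interest changes the behaviour, the following JDK-only sketch imitates a server that pushes changes only to clients whose registered regex matches the key. All classes here (`ToyClient`, `ToyRegionServer`, `InterestDemo`) are hypothetical and simplified; they are not Geode APIs and say nothing about how Geode implements subscriptions internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Toy model only: hypothetical classes, not Geode APIs.
class ToyClient {
    final Map<String, String> localCache = new HashMap<>();
    final List<String> events = new ArrayList<>();
    Pattern interest; // null means no interest has been registered

    void registerInterestRegex(String regex) {
        interest = Pattern.compile(regex);
    }

    // Called by the server when an entry matching the registered interest changes.
    void onEvent(String key, String value) {
        events.add((localCache.containsKey(key) ? "Updated: " : "Created: ") + key);
        localCache.put(key, value);
    }
}

class ToyRegionServer {
    private final Map<String, String> data = new HashMap<>();
    private final List<ToyClient> clients = new ArrayList<>();

    void connect(ToyClient client) {
        clients.add(client);
    }

    // A write arriving from elsewhere, for example through the Redis adapter.
    void put(String key, String value) {
        data.put(key, value);
        for (ToyClient c : clients) {
            if (c.interest != null && c.interest.matcher(key).matches()) {
                c.onEvent(key, value);
            }
        }
    }
}

class InterestDemo {
    public static void main(String[] args) {
        ToyRegionServer server = new ToyRegionServer();
        ToyClient silent = new ToyClient();      // never registers interest
        ToyClient subscribed = new ToyClient();  // registers interest in every key
        subscribed.registerInterestRegex(".*");
        server.connect(silent);
        server.connect(subscribed);

        server.put("1", "HelloGeodeRedis");
        server.put("1", "HelloAgain");

        System.out.println("silent:     " + silent.events + " " + silent.localCache);
        System.out.println("subscribed: " + subscribed.events + " " + subscribed.localCache);
    }
}
```

In this model the client without registered interest sees no events and keeps an empty local cache, which matches the symptoms described in the question: no listener callbacks and nothing in entrySet until a key is fetched explicitly.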
    
    +0

    I have [the same problem](https://stackoverflow.com/questions/45286249/apache-geode-cachelistener-not-executing) – rupweb
