2013-07-03

How to dynamically add memcached nodes using spymemcached

I have a Java application set up in which multiple memcached server nodes communicate with a spymemcached client.

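I would like to know whether it is possible to add or remove server nodes at runtime without disturbing all of the existing cache nodes (I know that some nodes will have to change).

Here is what I know (or understand):

A custom hash algorithm can be set in DefaultConnectionFactory, which lets us use consistent hashing, or we can simply use the built-in KetamaConnectionFactory.

So we should be able to add or remove a node while affecting only one or a few of the existing nodes.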
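To make that expectation concrete, here is a minimal, self-contained sketch of a consistent-hash ring. It is a toy model and not spymemcached's own locator: the class name ConsistentHashRing, the node names, the MD5-based hash, and the 160 points per node are all assumptions made for illustration. It counts how many sample keys change owner when a fourth node joins three existing ones.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy consistent-hash ring (hypothetical; not spymemcached's KetamaNodeLocator). */
final class ConsistentHashRing {
    // Assumption: number of virtual points placed on the ring per node.
    private static final int POINTS_PER_NODE = 160;
    private final TreeMap<Long, String> ring = new TreeMap<>();

    ConsistentHashRing(List<String> nodes) {
        for (String node : nodes) {
            addNode(node);
        }
    }

    /** Places the node's virtual points on the ring. */
    void addNode(String node) {
        for (int i = 0; i < POINTS_PER_NODE; i++) {
            ring.put(hash(node + "-" + i), node);
        }
    }

    /** Removes only the points that still belong to this node. */
    void removeNode(String node) {
        for (int i = 0; i < POINTS_PER_NODE; i++) {
            ring.remove(hash(node + "-" + i), node);
        }
    }

    /** Returns the owner of the first point at or after the key's hash, wrapping around. */
    String nodeFor(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes on the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** Uses the first 8 bytes of the MD5 digest as the ring position. */
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Counts how many of keyCount sample keys change owner once newNode is added. */
    static int movedKeysAfterAdd(List<String> nodes, String newNode, int keyCount) {
        ConsistentHashRing ring = new ConsistentHashRing(nodes);
        List<String> before = new ArrayList<>();
        for (int i = 0; i < keyCount; i++) {
            before.add(ring.nodeFor("key-" + i));
        }
        ring.addNode(newNode);
        int moved = 0;
        for (int i = 0; i < keyCount; i++) {
            if (!before.get(i).equals(ring.nodeFor("key-" + i))) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("cache-a:11211", "cache-b:11211", "cache-c:11211");
        int moved = movedKeysAfterAdd(nodes, "cache-d:11211", 10000);
        System.out.println(moved + " of 10000 keys moved to a different node");
    }
}
```

With a plain modulo hash, most keys would change owner when the node count goes from three to four. On a ring like this, only the keys that fall just before the new node's points move, and they all move to the new node.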

Is this possible with spymemcached?

If so, how?

Can anyone point me in the right direction?

Answers

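It looks like the method to use is NodeLocator.updateLocator(List<MemcachedNode> newNodes).

Getting hold of a connected MemcachedNode is a bit difficult, though. You have to extend MemcachedClient, MemcachedConnection and DefaultConnectionFactory.

It is reasonable to want to add or remove nodes through MemcachedClient, so you add remove(MemcachedNode node) and add(MemcachedNode node) methods to it.

When removing, you should disconnect the node (see MemcachedConnection.shutdown()), take it out of the list returned by NodeLocator.getAll(), and call NodeLocator.updateLocator(List<MemcachedNode> newNodes) with the remaining nodes.

When adding, you should connect the node through MemcachedConnection.createConnections(final Collection a), merge it with the nodes from NodeLocator.getAll(), and call NodeLocator.updateLocator(List<MemcachedNode> newNodes).

That said, I have never tried this, so it may not work. Good luck!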

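ExtendableMemcachedConnection.java

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.OperationFactory;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.Operation;

public class ExtendableMemcachedConnection extends MemcachedConnection {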

protected final OperationFactory opFact; 

    /** 
    * Construct a memcached connection. 
    * 
    * @param bufSize the size of the buffer used for reading from the server 
    * @param f  the factory that will provide an operation queue 
    * @param a  the addresses of the servers to connect to 
    * @throws java.io.IOException if a connection attempt fails early 
    */ 
    public ExtendableMemcachedConnection(int bufSize, ConnectionFactory f, 
             List<InetSocketAddress> a, 
             Collection<ConnectionObserver> obs, 
             FailureMode fm, OperationFactory opfactory) 
     throws IOException { 
    super(bufSize, f, a, obs, fm, opfactory); 
    this.opFact = opfactory; 
    } 

    public void add(InetSocketAddress nodeAddress) throws IOException { 
    final List<InetSocketAddress> nodeToAdd = new ArrayList<InetSocketAddress>(1); 
    nodeToAdd.add(nodeAddress); 
    List<MemcachedNode> newNodesList = createConnections(nodeToAdd); 
    newNodesList.addAll(getLocator().getAll()); 
    getLocator().updateLocator(newNodesList); 
    } 

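    // The node should be obtained from the locator so that currentNode.equals(node) returns true.
    public void remove(MemcachedNode node) throws IOException {
        List<MemcachedNode> remainingNodes = new ArrayList<MemcachedNode>();
        Collection<Operation> notCompletedOperations = new ArrayList<Operation>();
        for (MemcachedNode currentNode : getLocator().getAll()) {
            if (!currentNode.equals(node)) {
                remainingNodes.add(currentNode);
                continue;
            }
            // Collect the operations still queued on the node being removed.
            notCompletedOperations.addAll(currentNode.destroyInputQueue());
            if (currentNode.getChannel() != null) {
                currentNode.getChannel().close();
                currentNode.setSk(null);
                if (currentNode.getBytesRemainingToWrite() > 0) {
                    getLogger().warn("Shut down with %d bytes remaining to write",
                            currentNode.getBytesRemainingToWrite());
                }
                getLogger().debug("Shut down channel %s", currentNode.getChannel());
            }
        }
        // Update the locator first, so the pending operations are not routed back to the removed node.
        getLocator().updateLocator(remainingNodes);
        // Unfortunately, redistributeOperations is private in MemcachedConnection, so it can be
        // neither called nor overridden; the implementation is copied below.
        redistributeOperations(notCompletedOperations);
    }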

    protected void redistributeOperations(Collection<Operation> ops) { 
    for (Operation op : ops) { 
     if (op.isCancelled() || op.isTimedOut()) { 
     continue; 
     } 
     if (op instanceof KeyedOperation) { 
     KeyedOperation ko = (KeyedOperation) op; 
     int added = 0; 
     for (String k : ko.getKeys()) { 
      for (Operation newop : opFact.clone(ko)) { 
      addOperation(k, newop); 
      added++; 
      } 
     } 
     assert added > 0 : "Didn't add any new operations when redistributing"; 
     } else { 
     // Cancel things that don't have definite targets. 
     op.cancel(); 
     } 
    } 
    } 


} 

ExtMemcachedClient.java

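    // Both methods delegate to the extended connection; mconn is the connection field of MemcachedClient.
    public void add(InetSocketAddress nodeAddress) throws IOException {
        if (mconn instanceof ExtendableMemcachedConnection) {
            ((ExtendableMemcachedConnection) mconn).add(nodeAddress);
        }
    }

    // Returns true if the removal was delegated to the extended connection, false otherwise.
    public boolean remove(MemcachedNode node) throws IOException {
        if (mconn instanceof ExtendableMemcachedConnection) {
            ((ExtendableMemcachedConnection) mconn).remove(node);
            return true;
        }
        return false;
    }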

ExtMemcachedConnectionFactory.java

@Override 
    public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException { 
    return new ExtendableMemcachedConnection(getReadBufSize(), this, addrs, 
              getInitialObservers(), getFailureMode(), getOperationFactory()); 
    }