2013-10-05
1

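Python memcache consistent hashing using ketama

I have code that adds an extra memcache instance at run time, but doing so makes me lose my keys. I know there are several libraries available, such as consistent_hash and hash_ring, but I could not get them to work in my code. I know ketama exists, but I could not find a Python code example for it.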

import random 
import string 
import memcache 


class MemcacheClient(memcache.Client): 
    """ A memcache subclass. It currently allows you to add a new host at run 
    time. 

    Sadly, this truly messes with our keys, i.e. adding a host at runtime
    effectively wipes our cache altogether... Wonder why?
    """ 

    def _get_server(self, key): 
     """ Current implementation of Memcache client 
     """ 
     return super(MemcacheClient, self)._get_server(key) 

    def add_server(self, server): 
     """ Adds a host at runtime to client 
     """ 
     # Create a new host entry 
     server = memcache._Host(
      server, self.debug, dead_retry=self.dead_retry, 
      socket_timeout=self.socket_timeout, 
      flush_on_reconnect=self.flush_on_reconnect 
     ) 
     # Add this to our server choices 
     self.servers.append(server) 
     # Update our buckets 
     self.buckets.append(server) 


def random_key(size): 
    """ Generates a random key 
    """ 
    return ''.join(random.choice(string.letters) for _ in range(size)) 


if __name__ == '__main__': 
    # We have 7 running memcached servers 
    servers = ['127.0.0.1:1121%d' % i for i in range(1,8)] 
    # We have 100 keys to split across our servers 
    keys = [random_key(10) for i in range(100)] 
    # Init our subclass 
    client = MemcacheClient(servers=servers) 
    # Distribute the keys on our servers 
    for key in keys: 
     client.set(key, 1) 

    # Check how many keys come back 
    valid_keys = client.get_multi(keys) 
    print '%s percent of keys matched' % ((len(valid_keys)/float(len(keys))) * 100) 

    # We add another server...and pow! 
    client.add_server('127.0.0.1:11219') 
    print 'Added new server' 

    valid_keys = client.get_multi(keys) 
    print '%s percent of keys still matched' % ((len(valid_keys)/float(len(keys))) * 100)

Answers

0

This worked for me... add a condition before creating the new host entry: if the server is None, then run the line that starts with server = memcache.

3

Well, basically you have to override the _get_server() method to change the server distribution algorithm.
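
To see why the default distribution loses keys, here is a minimal sketch. It is not python-memcached's actual code: it assumes the client picks a bucket as hash(key) % number_of_buckets, and the md5-based `key_hash` helper and the key names are stand-ins chosen for illustration.

```python
import hashlib


def key_hash(key):
    """Stand-in hash (an assumption, not the client's real hash function)."""
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)


def bucket_for(key, n_buckets):
    """Modulo distribution: the bucket index depends on the bucket count."""
    return key_hash(key) % n_buckets


keys = ["key-%d" % i for i in range(1000)]

# Count the keys whose bucket changes when an 8th server joins 7 existing ones.
moved = sum(1 for k in keys if bucket_for(k, 7) != bucket_for(k, 8))
moved_fraction = moved / float(len(keys))
```

A key keeps its bucket only when its hash leaves the same remainder for 7 and for 8, which holds for roughly one key in eight, so most lookups end up on a server that never stored the key.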

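I did some searching on the internet and found this article on Google, amix.dk/blog/post/19367. It is very good material, written by Amir Salihefendic, that helps to understand how the ketama consistent hashing algorithm works, and it also contains a ketama implementation he wrote as a Python class named HashRing.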
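
The idea behind such a ring can be sketched in a few lines. This is a hypothetical `TinyRing`, not the HashRing class from the article: nodes are plain strings, the number of replicas is an arbitrary choice, and the lookup uses `bisect` instead of a linear scan.

```python
import bisect
import hashlib


def ring_hash(value):
    """Map a string to an integer position on the ring."""
    return int(hashlib.md5(value.encode("utf-8")).hexdigest(), 16)


class TinyRing(object):
    """Hypothetical minimal hash ring, for illustration only."""

    def __init__(self, nodes=(), replicas=100):
        self.replicas = replicas
        self.points = []   # sorted ring positions
        self.owner = {}    # ring position -> node name
        for node in nodes:
            self.add_node(node)

    def add_node(self, node):
        for i in range(self.replicas):
            point = ring_hash("%s:%d" % (node, i))
            if point not in self.owner:
                self.owner[point] = node
                bisect.insort(self.points, point)

    def get_node(self, key):
        if not self.points:
            return None
        # First point at or after the key's hash; wrap to the start if none.
        idx = bisect.bisect_left(self.points, ring_hash(key))
        return self.owner[self.points[idx % len(self.points)]]


ring = TinyRing(["10.0.0.1:11211", "10.0.0.2:11211", "10.0.0.3:11211"])
keys = ["key-%d" % i for i in range(1000)]
before = dict((k, ring.get_node(k)) for k in keys)

ring.add_node("10.0.0.4:11211")
after = dict((k, ring.get_node(k)) for k in keys)

# Keys either stay where they were or move to the new node, never elsewhere.
moved = [k for k in keys if before[k] != after[k]]
```

Because existing points never move, adding a node can only take keys away from its neighbours on the ring; every other key keeps its server.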

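So I basically used his class and changed it a bit to fit the needs of the Memcached client. The modifications were replacing the deprecated md5 module and changing the string used to generate the key for each server, from: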

key = self.gen_key('%s:%s' % (node, i)) 

to:

key = self.gen_key(
      '%s:%s:%s:%s' % (node.address[0], 
      node.address[1], i, node.weight) 
    ) 

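I also fixed a bug that caused an infinite loop in the get_nodes() method when the algorithm did not find a usable server on the first pass.

Old get_nodes() method (it enters an infinite loop if no server is ever accepted):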

def get_nodes(self, string_key): 
    """Given a string key it returns the nodes as a generator that can hold the key. 

    The generator is never ending and iterates through the ring 
    starting at the correct position. 
    """ 
    if not self.ring: 
     yield None, None 

    node, pos = self.get_node_pos(string_key) 
    for key in self._sorted_keys[pos:]: 
     yield self.ring[key] 

    while True: 
     for key in self._sorted_keys: 
      yield self.ring[key] 

New get_nodes() method:

def get_nodes(self, string_key): 
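    if not self.ring:
     # Nothing to walk through: end the generator instead of yielding
     # a (None, None) tuple that callers would mistake for a server.
     return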

    node, pos = self.get_node_pos(string_key) 
    for key in self._sorted_keys[pos:]: 
     if key in self.ring: 
      yield self.ring[key] 

    for key in self._sorted_keys[:pos]: 
     if key in self.ring: 
      yield self.ring[key] 
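
The difference between the two versions can be tried without memcache. The sketch below uses a hypothetical standalone helper, `walk_ring`, with a toy ring of three integer slots; it is an illustration of the single-lap walk, not the method from the class.

```python
def walk_ring(sorted_keys, ring, pos):
    """Yield every node on the ring exactly once, starting at index `pos`.

    Unlike a `while True` loop, this generator ends after one full lap,
    so a caller that rejects every node eventually falls out of its loop.
    """
    if not ring:
        return
    for key in sorted_keys[pos:] + sorted_keys[:pos]:
        if key in ring:
            yield ring[key]


ring = {10: "a", 20: "b", 30: "c"}
sorted_keys = sorted(ring)

# Start at index 1 (slot 20), continue to the end, then wrap to the front.
order = list(walk_ring(sorted_keys, ring, 1))
```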

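I also added a new inner for loop (over the range of the weight) to add_node() and remove_node(), so that the weight of a server is taken into account and heavier servers get more replicas.

Old way: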

for i in xrange(0, self.replicas): 
    key = self.gen_key('%s:%s' % (node, i)) 
    self.ring[key] = node 
    self._sorted_keys.append(key) 

New way:

for i in xrange(0, self.replicas): 
    for x in range(0, node.weight): 
     key = self.gen_key(
      '%s:%s:%s:%s' % (node.address[0], 
      node.address[1], i, node.weight) 
     ) 

     if key not in self.ring: 
      self.ring[key] = node 
      self._sorted_keys.append(key) 

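The code above is from the add_node() method, but the same concept applies to remove_node().

Well, there may have been some other changes; I just can't remember any others right now. This is the adapted HashRing class: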
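
One thing to check when adapting a weight loop like this: each weight step needs its own key string, otherwise an `if key not in self.ring` guard skips the repeats and the weight has no effect. A minimal sketch, assuming the inner counter is mixed into the hashed string (the `ring_points` helper and its host/port arguments are hypothetical):

```python
import hashlib


def gen_key(value):
    """Map a string to an integer position on the ring."""
    return int(hashlib.md5(value.encode("utf-8")).hexdigest(), 16)


def ring_points(host, port, weight, replicas=3):
    """Return the ring positions for one server.

    Hypothetical helper: the inner counter `x` is part of the hashed string,
    so a server with weight 2 gets twice as many distinct points.
    """
    points = set()
    for i in range(replicas):
        for x in range(weight):
            points.add(gen_key("%s:%s:%s:%s" % (host, port, i, x)))
    return points


light = ring_points("127.0.0.1", 11211, weight=1)
heavy = ring_points("127.0.0.1", 11212, weight=2)
```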

from hashlib import md5  
class HashRing(object): 

    def __init__(self, nodes=None, replicas=3): 
     """Manages a hash ring. 

     `nodes` is a list of objects that have a proper __str__ representation. 
     `replicas` indicates how many virtual points should be used per node;
     replicas are required to improve the distribution.
     """ 
     self.replicas = replicas 

     self.ring = dict() 
     self._sorted_keys = [] 

     if nodes: 
      for node in nodes: 
       self.add_node(node) 

    def add_node(self, node): 
     """Adds a `node` to the hash ring (including a number of replicas). 
     """ 
     for i in xrange(0, self.replicas): 
      """This will ensure that a server with a bigger weight will have
      more copies in the ring, increasing its probability of being retrieved.
      """
      for x in range(0, node.weight): 
       key = self.gen_key(
        '%s:%s:%s:%s' % (node.address[0], 
        node.address[1], i, node.weight) 
       ) 

       if key not in self.ring: 
        self.ring[key] = node 
        self._sorted_keys.append(key) 

     self._sorted_keys.sort() 

    def remove_node(self, node): 
     """Removes `node` from the hash ring and its replicas. 
     """ 
     for i in xrange(0, self.replicas): 
      for x in range(node.weight): 
       key = self.gen_key(
        '%s:%s:%s:%s' % (node.address[0], 
        node.address[1], i, node.weight) 
       ) 

       if key in self.ring: 
        del self.ring[key] 
        self._sorted_keys.remove(key) 

    def get_node(self, string_key): 
     """ 
     Given a string key a corresponding node in the hash ring is returned. 

     If the hash ring is empty, `None` is returned. 
     """ 
     return self.get_node_pos(string_key)[0] 

    def get_node_pos(self, string_key): 
     """Given a string key a corresponding node in the hash ring is returned
     along with its position in the ring.

     If the hash ring is empty, (`None`, `None`) is returned. 
     """ 
     if not self.ring: 
      return None, None 

     key = self.gen_key(string_key) 

     nodes = self._sorted_keys 
     for i in xrange(0, len(nodes)): 
      node = nodes[i] 
      if key <= node: 
       return self.ring[node], i 

     return self.ring[nodes[0]], 0 

    def get_nodes(self, string_key): 
     """Given a string key it returns the nodes as a generator that can hold
     the key.

     The generator walks the ring once, starting at the correct position,
     and then stops.
     """
     if not self.ring:
      # Empty ring: end the generator instead of yielding (None, None).
      return

     node, pos = self.get_node_pos(string_key) 
     for key in self._sorted_keys[pos:]: 
      if key in self.ring: 
       yield self.ring[key] 

     for key in self._sorted_keys[:pos]: 
      if key in self.ring: 
       yield self.ring[key] 

    @staticmethod 
    def gen_key(key): 
     """Given a string key it returns a long value, 
     this long value represents a place on the hash ring. 

     md5 is currently used because it mixes well. 
     """ 
     m = md5() 
     m.update(key) 
     return long(m.hexdigest(), 16) 
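
The class above targets Python 2 (`xrange`, `long`). On Python 3, a sketch of the same key function might look like this, assuming keys are text and UTF-8 is an acceptable encoding for them:

```python
from hashlib import md5


def gen_key(key):
    """Map a key to an integer position on the ring (Python 3 variant)."""
    if isinstance(key, str):
        # md5 works on bytes, so text keys must be encoded first.
        key = key.encode("utf-8")
    # int() replaces Python 2's long(); Python 3 integers are unbounded.
    return int(md5(key).hexdigest(), 16)


position = gen_key("a")
```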

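I changed your class a bit to make it more flexible in deciding when to use the ketama algorithm and when to use the default one, modulo.

I noticed that when you wrote the add_server() method, you forgot to take the server's weight into account when appending the server to the buckets list.

So this is what the new MemcacheClient would look like: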

import random
import string

import memcache

from consistent_hash import HashRing


class MemcacheClient(memcache.Client): 
    """ A memcache subclass. It currently allows you to add a new host at run 
    time. 
    """ 
    available_algorithms = ['ketama', 'modulo'] 
    hash_algorithm_index = 0 

    def __init__(self, hash_algorithm='ketama', *args, **kwargs): 
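     super(MemcacheClient, self).__init__(*args, **kwargs)
     # Remember the chosen algorithm; _get_server() and the test below read it.
     self.hash_algorithm = hash_algorithm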

     if hash_algorithm in self.available_algorithms: 
      self.hash_algorithm_index = self.available_algorithms.index(
       hash_algorithm) 

      if hash_algorithm == 'ketama': 
       self.consistent_hash_manager = HashRing(nodes=self.servers) 
      else: 
       self.consistent_hash_manager = None 
     else: 
      raise Exception(
       "The algorithm \"%s\" is not implemented for this client. The " 
       "options are \"%s\"" 
       "" % (hash_algorithm, " or ".join(self.available_algorithms)) 
      ) 

    def _get_server(self, key): 
     """ Returns the most likely server to hold the key 
     """ 

     if self.hash_algorithm == 'ketama': 
      """ Basic concept of the implementation of the ketama algorithm
      e.g. ring = {100:server1, 110:server2, 120:server3, 140:server4}
      If the hash of the current key is 105, its server will be the next
      bigger integer in the ring, which is 110 (server2).
      If a server is added at position 108, the key will now be allocated
      to it and not to server 110. Conversely, if the server at position
      110 is removed, the key will now belong to server 120.
      If there is no integer position in the ring bigger than the hash of
      the key, it will take the first server from the ring.
      """
      # The variable "servers" is the list of the servers in the ring 
      # starting from the next bigger integer to the hash of the key, 
      # till it finds the one that holds the key 
      servers_generator = self.consistent_hash_manager.get_nodes(key) 
      for server in servers_generator: 
       if server.connect(): 
        #print server.address[1] 
        return server, key 
      return None, None 

     else: 
      return super(MemcacheClient, self)._get_server(key) 

    def add_server(self, server): 
     """ Adds a host at runtime to client 
     """ 

     # Uncomment this to protect the Client from adding a server in case 
     # there's no reliable consistent hash algorithm such as MODULO 
     """ 
     if not self.consistent_hash_manager: 
      raise Exception("The current consistent hash algorithm (\"%s\") is" 
          " not reliable for adding a new server" 
          "" % self.hash_algorithm) 
     """ 

     # Create a new host entry 
     server = memcache._Host(
      server, self.debug, dead_retry=self.dead_retry, 
      socket_timeout=self.socket_timeout, 
      flush_on_reconnect=self.flush_on_reconnect 
     ) 
     # Add this to our server choices 
     self.servers.append(server) 

     """This for statement will ensure that a server with a bigger weight
     will have more copies in the buckets, increasing its probability of
     being retrieved.
     """
     for i in range(server.weight): 
      self.buckets.append(server) 

     # Adds this node to the circle 
     if self.consistent_hash_manager: 
      self.consistent_hash_manager.add_node(server) 

def random_key(size): 
    """ Generates a random key 
    """ 
    return ''.join(random.choice(string.letters) for _ in range(size)) 


def run_consistent_hash_test(client_obj): 
    # We have 500 keys to split across our servers 
    keys = [random_key(100) for i in range(500)] 

    print(
     "\n/////////// CONSISTENT HASH ALGORITHM \"%s\" //////////////" 
     "" % client_obj.hash_algorithm.upper() 
    ) 

    print("\n->These are the %s servers:" % len(client_obj.servers)) 
    str_servers = "" 
    for server in client_obj.servers: 
     str_servers += "%s:%s, " % (server.address[0], server.address[1]) 
    print("******************************************************************") 
    print(str_servers) 
    print("******************************************************************") 

    # Clear all previous keys from memcache 
    client_obj.flush_all() 

    # Distribute the keys over the servers 
    for key in keys: 
     client_obj.set(key, 1) 

    print(
     "\n%d keys distributed for %d server(s)\n" 
     "" % (len(keys), len(client_obj.servers)) 
    ) 

    # Check how many keys come back 
    valid_keys = client_obj.get_multi(keys) 
    print(
     "%s percent of keys matched, before adding extra servers.\n"
     "" % ((len(valid_keys)/float(len(keys))) * 100)
    ) 

    # Add 5 new extra servers 
    interval_extra_servers = range(19, 24) 
    extra_servers = ['127.0.0.1:112%d' % i for i in interval_extra_servers] 
    for server in extra_servers: 
     client_obj.add_server(server) 

    # Check how many keys come back after adding the extra servers 
    valid_keys = client_obj.get_multi(keys) 
    print (
     "Added %d new server(s).\n%s percent of keys still matched"
     "" % (len(interval_extra_servers), 
     (len(valid_keys)/float(len(keys))) * 100) 
    ) 

    print("\n***************************************************************" 
      "****\n") 
if __name__ == '__main__': 
    # We have 8 running memcached servers 
    interval_servers = range(11, 19) 
    servers = ['127.0.0.1:112%d' % i for i in interval_servers] 
    # Init our subclass. The hash_algorithm parameter can be "ketama"
    # (the default, the new one) or "modulo".
    client = MemcacheClient(servers=servers, hash_algorithm='ketama') 
    run_consistent_hash_test(client) 

If you run this class directly in a terminal, it will print suitable output.
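
Without eight memcached instances at hand, the key placement can still be estimated offline. The following is only a rough sketch under stated assumptions: it does not talk to memcached, the ring uses 100 points per server (an arbitrary choice, not the class default of 3), and all helper names are hypothetical.

```python
import bisect
import hashlib


def h(value):
    return int(hashlib.md5(value.encode("utf-8")).hexdigest(), 16)


def build_ring(servers, replicas=100):
    """Return (sorted points, point -> server) for the given servers."""
    owner = {}
    for server in servers:
        for i in range(replicas):
            owner.setdefault(h("%s:%d" % (server, i)), server)
    return sorted(owner), owner


def ring_lookup(points, owner, key):
    idx = bisect.bisect_left(points, h(key)) % len(points)
    return owner[points[idx]]


def kept_fraction(lookup_before, lookup_after, keys):
    """Fraction of keys that map to the same server before and after."""
    same = sum(1 for k in keys if lookup_before(k) == lookup_after(k))
    return same / float(len(keys))


old = ["127.0.0.1:112%d" % i for i in range(11, 19)]        # 8 servers
new = old + ["127.0.0.1:112%d" % i for i in range(19, 24)]  # plus 5 extra
keys = ["key-%d" % i for i in range(2000)]

p_old, o_old = build_ring(old)
p_new, o_new = build_ring(new)
ketama_kept = kept_fraction(lambda k: ring_lookup(p_old, o_old, k),
                            lambda k: ring_lookup(p_new, o_new, k), keys)
modulo_kept = kept_fraction(lambda k: old[h(k) % len(old)],
                            lambda k: new[h(k) % len(new)], keys)
```

Going from 8 to 13 servers, the ring should keep somewhere around 8/13 of the keys in place, while the modulo scheme keeps only the few keys whose remainder happens to be the same for both server counts.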

+0

Here on my GitHub I created a simple test that goes beyond the implementation above: [link to my GitHub](https://github.com/mau21mau/python-ketama) – Mauricio

0

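I know it is too late to answer this question, but I hope this will help some people. I have a working class that you can use directly. It is a drop-in replacement for the original memcache.Client.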

from zlib import crc32

import memcache


class KetamaMemcacheClient(memcache.Client):
    """
    This memcache client implements the consistent hashing algorithm "ketama".

    This keeps cache misses to a minimum when a node is added to or removed
    from the client.
    """

    #
    # Server weight means the number of slots given to one server. For better
    # performance it should be between 100 and 200. Adjust the weight to see
    # how the cache miss rate changes.
    #
    DEFAULT_SERVER_WEIGHT = 200

    # Total number of slots on the ring.
    # With the current configuration, adding or deleting a node causes only
    # about 1 to 5 percent cache misses, i.e. K/RING_SIZE, where K means
    # total keys.
    RING_SIZE = 2 ** 16

    def __init__(self, *args, **kwargs): 
     """ 
     Add some special parameters to handle the servers allocation. 
     """ 
     # Mapping between ring slot -> server. 
     self._ketama_server_ring = {} 

     # Sorted server slots on top of the virtual ring. 
     self._ketama_server_slots = [] 

     super(KetamaMemcacheClient, self).__init__(*args, **kwargs) 

    def _get_server(self, key): 
     """ 
     Get the memcache server corresponding to the given key. 
     :param key: The input query. 

     :return: A tuple with (server_obj, key). 
     """ 
     # map the key on to the ring slot space. 
     h_key = self._generate_ring_slot(key) 

     for slot in self._ketama_server_slots: 
      if h_key <= slot: 
       server = self._ketama_server_ring[slot] 
       if server.connect(): 
        return (server, key) 

     # If h_key is bigger than every slot on the ring (or no server at a
     # later slot could be reached), wrap around and pick the first server
     # on the ring.
     if not self._ketama_server_slots:
      return None, key

     server = self._ketama_server_ring[self._ketama_server_slots[0]]
     server.connect()

     return server, key

    def add_server(self, server): 
     """
     Add a new server to the client.

     :param server: server host in <IP>:<PORT> format,
         or a tuple of (<IP>:<PORT>, weight).
     """
     server_obj = memcache._Host(
      server if isinstance(server, tuple) else (
       server, self.DEFAULT_SERVER_WEIGHT),
      self.debug, dead_retry=self.dead_retry,
      socket_timeout=self.socket_timeout,
      flush_on_reconnect=self.flush_on_reconnect)

     # Track the server so that client-wide calls such as flush_all()
     # reach it too.
     self.servers.append(server_obj)
     self._place_server_on_ring(server_obj)

    def set_servers(self, servers): 
     """ 
     Add a pool of servers into the client. 

     :param servers: List of server hosts in <IP>:<PORT> format. 
         or 
         List of tuples with each tuple of the format 
         (<IP>:<PORT>, weight) 
     """ 
     # Set the default weight if weight isn't passed. 
     self.servers = [memcache._Host(
      s if isinstance(s, tuple) else (s, self.DEFAULT_SERVER_WEIGHT), 
      self.debug, dead_retry=self.dead_retry, 
      socket_timeout=self.socket_timeout, 
      flush_on_reconnect=self.flush_on_reconnect) for s in servers] 

     # Place all the servers on the ring based on the slot allocation
     # specifications.
     for s in self.servers:
      self._place_server_on_ring(s)

    def _place_server_on_ring(self, server): 
     """ 
     Place given server on the ring. 
     :param server: An instance of :class:~`memcache._Host`. 
     """ 
     server_slots = self._get_server_slots_on_ring(server) 
     for slot in server_slots: 
      if slot not in self._ketama_server_ring: 
       self._ketama_server_ring[slot] = server 
       self._ketama_server_slots.append(slot) 
      else: 
       # There is a slot collision (<<<1% chance).
       # Discarding this scenario for now.
       # TODO: Handle it.
       pass

     # Keep the server slot list sorted.
     self._ketama_server_slots.sort()

    def _get_server_slots_on_ring(self, server): 
     """ 
     Returns list of slot on the ring for given server. 

     This makes sure that the slots won't collide with other servers' slots.
     :param server: An object of :class:~`memcache._Host`. 

     :return: list of slots on the ring. 
     """ 
     server_slots = [] 

     for i in range(0, server.weight): 
      server_key = "{}_{}".format("{}:{}".format(server.ip, 
                 server.port), i) 

      server_slots.append(self._generate_ring_slot(server_key)) 

     return server_slots 

    def _generate_ring_slot(self, key): 
     """ 
     Hash function which gives pseudo-random slots on the ring. The hash
     function keeps the key distribution as even as possible.

     :param key: Key which need to be mapped to the hash space. 
     :type key: str 

     :return: hash key corresponding to `key` 
     """ 
     # Simple hash method using python's internal hash algorithm. 
     #h_key = hash(key) & 0xffff 

     # crc32 based hashing 
     #h_key = ((crc32(key) & 0xffffffff) >> 16) & 0xffff 

     # For better randomness 
     h_key = ((crc32(key) & 0xffffffff)) & 0xffff 

     return h_key 


client = KetamaMemcacheClient(servers) 
# Changing the number of servers now causes only a few key misses.
client.add_server('127.0.0.1:11218') 

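I have not yet added a remove_server method to remove dead servers from the configured server list. It is very easy to do by keeping an inverted server mapping and deleting the slots assigned to that server.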
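
A minimal sketch of that idea, written as a hypothetical standalone `SlotRing` class rather than as a method on the client above (servers are plain strings here, and the slot function mirrors the crc32-based one only as an assumption):

```python
from zlib import crc32


class SlotRing(object):
    """Hypothetical ring that remembers which slots each server owns."""

    def __init__(self):
        self.ring = {}             # slot -> server name
        self.slots = []            # sorted slots
        self.slots_by_server = {}  # inverted mapping: server name -> slots

    @staticmethod
    def slot_for(text):
        return crc32(text.encode("utf-8")) & 0xffff

    def add_server(self, name, weight=200):
        owned = self.slots_by_server.setdefault(name, [])
        for i in range(weight):
            slot = self.slot_for("%s_%d" % (name, i))
            if slot not in self.ring:  # skip slots that are already taken
                self.ring[slot] = name
                owned.append(slot)
        self.slots = sorted(self.ring)

    def remove_server(self, name):
        # Drop only the slots this server actually owns.
        for slot in self.slots_by_server.pop(name, []):
            del self.ring[slot]
        self.slots = sorted(self.ring)


ring = SlotRing()
ring.add_server("127.0.0.1:11211")
ring.add_server("127.0.0.1:11212")
snapshot = dict(ring.ring)

ring.add_server("127.0.0.1:11213")
ring.remove_server("127.0.0.1:11213")
```

Recording only the slots a server actually won means that removing it cannot delete a slot that collided with, and still belongs to, another server.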

Enjoy!