2014-03-13

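Python: Asynchronous Cassandra Inserts

The problem with the Cassandra Python driver is that the "future" objects it returns have callbacks added to them by side effect. This means the "futures" are not composable in the way that a Future in Javascript or Scala is composable. I would like to know whether there is a pattern that can be used to turn a non-composable future into a composable one (preferably without leaking memory).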

(my_query_object.insert(1, 2, 3, 'Fred Flinstone')
    .insert(1, 2, 3, 'Barney Rubble')
    .insert(5000, 2, 3, 'George Jetson')
    .insert(5000, 2, 3, 'Jane his wife'))

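Looking at the performance section of Datastax's documentation for the Cassandra Python driver, I see an example of how they create a continuously chained series of insert queries. That is, a slightly more complex version of this pattern: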

def insert_next(previous_result=sentinel):
    if previous_result is not sentinel:
        if isinstance(previous_result, BaseException):
            log.error("Error on insert: %r", previous_result)

    future = session.execute_async(query)
    # NOTE: this callback also handles errors
    future.add_callbacks(insert_next, insert_next)

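It works well as a toy example. As soon as the first query completes, another equivalent query is executed. This scheme lets them reach about 7k writes/sec, whereas the version that does not try to "chain" callbacks manages about 2k writes/sec.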
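
The chaining mechanism can be tried without a cluster. The following is only a sketch under stated assumptions: `StubSession`, `StubFuture`, and `run_chain` are hypothetical names invented for illustration, and the stub completes every query immediately. Only `execute_async` and `add_callbacks` mirror the driver calls quoted above. Unlike the endless loop in the documentation example, this chain stops when its list of queries is exhausted.

```python
class StubFuture(object):
    """Hypothetical stand-in for the driver's future; completes at once."""

    def __init__(self, result):
        self._result = result

    def add_callbacks(self, callback, errback):
        # A real future would call one of these later, from another thread.
        callback(self._result)


class StubSession(object):
    """Hypothetical stand-in for a session; records what was executed."""

    def __init__(self):
        self.executed = []

    def execute_async(self, query):
        self.executed.append(query)
        return StubFuture(result=None)


def run_chain(session, queries):
    """Issue each query only after the previous one has completed."""
    pending = iter(queries)
    errors = []

    def insert_next(previous_result=None):
        # The same function serves as callback and errback, as in the
        # documentation example, so it may receive an exception.
        if isinstance(previous_result, BaseException):
            errors.append(previous_result)
        query = next(pending, None)
        if query is None:
            return  # nothing left to send, so the chain ends here
        future = session.execute_async(query)
        future.add_callbacks(insert_next, insert_next)

    insert_next()
    return errors


session = StubSession()
errors = run_chain(session, ["q1", "q2", "q3"])
print(session.executed)  # ['q1', 'q2', 'q3']
```

Because the stub invokes the callback synchronously, each step here nests inside the previous call; with a real asynchronous future the next step would run when the driver reports completion.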

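I have been trying to wrap my head around building some kind of mechanism that would let me recapture exactly that behavior, but to no avail. Has anyone come up with something similar?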

Answer

It took me a bit of thinking to work out how to preserve the future in some form, using a Queue:

import logging
from threading import Event  # used for the "started" flag

try:
    from queue import Queue  # Python 3
except ImportError:
    from Queue import Queue  # Python 2


insert_logger = logging.getLogger('async_insert')
insert_logger.setLevel(logging.INFO)


def handle_err(err):
    insert_logger.warning('Failed to insert due to %s', err)


# Designed to work in a high-write environment. Chained callbacks for best
# performance and fast fail/stop when an error is encountered. The next insert
# should restart the writing. A failed write may be lost. Some guarantee on
# preserving the order of writes.
class CappedQueueInserter(object):
    def __init__(self, session, max_count=0):
        self.__queue = Queue(max_count)
        self.__session = session
        self.__started = Event()

    @property
    def started(self):
        return self.__started.is_set()

    def insert(self, bound_statement):
        if not self.started:
            self._begin(bound_statement)
        else:
            self._enqueue(bound_statement)

    def _begin(self, bound_statement):
        def errback(err):
            handle_err(err)
            # Stop the chain so that the next insert() restarts it.
            self.__started.clear()

        def callback(_result=None):
            # The driver passes the query result to the success callback.
            try:
                # Non-blocking get: callbacks run on the driver's event loop
                # thread, so blocking here would stall the driver.
                bound = self.__queue.get(False)
                future = self.__session.execute_async(bound)
                future.add_callbacks(callback, errback)
            except Exception:
                # Queue is empty or submission failed: end the chain.
                self.__started.clear()

        self.__started.set()
        future = self.__session.execute_async(bound_statement)
        future.add_callbacks(callback, errback)

    def _enqueue(self, bound_statement):
        self.__queue.put(bound_statement, True)


# Separates insert-statement binding from the insertion loop.
class InsertEnqueue(object):
    def __init__(self, prepared_query, insert, consistency_level=None):
        self.__statement = prepared_query
        self.__level = consistency_level
        self.__sink = insert

    def insert(self, *args):
        bound = self.bind(*args)
        self.__sink.insert(bound)

    @property
    def consistency_level(self):
        # Compare against None so that a falsy level value is not ignored.
        if self.__level is not None:
            return self.__level
        return self.__statement.consistency_level

    @consistency_level.setter
    def consistency_level(self, value):  # a setter must share the property's name
        if value is not None:
            self.__level = value

    def bind(self, *args):
        # PreparedStatement.bind() takes the values as a single sequence.
        bound = self.__statement.bind(args)
        bound.consistency_level = self.consistency_level

        return bound

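This is a combination of a Queue and Event-based triggering. Assuming the writes are allowed to happen "eventually", this should work.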
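
For the composability part of the question, one possible approach (a sketch, not something the driver provides) is to bridge the callback-style future into a standard `concurrent.futures.Future`, which is in the standard library on Python 3 and available as the `futures` backport on Python 2. The names `to_composable`, `then`, and `FakeResponseFuture` below are hypothetical and exist only for illustration; the only driver call assumed is `add_callbacks(callback, errback)`, as used above.

```python
from concurrent.futures import Future


def to_composable(response_future):
    """Wrap a callback-style future in a concurrent.futures.Future."""
    composable = Future()
    # Each function receives exactly one argument: the result or the error.
    response_future.add_callbacks(composable.set_result,
                                  composable.set_exception)
    return composable


def then(future, fn):
    """Return a new Future holding fn(result) once `future` succeeds."""
    chained = Future()

    def on_done(done):
        try:
            chained.set_result(fn(done.result()))
        except Exception as exc:  # failure of `future` or of `fn`
            chained.set_exception(exc)

    future.add_done_callback(on_done)
    return chained


class FakeResponseFuture(object):
    """Hypothetical stand-in for the driver's future, for demonstration."""

    def __init__(self, result=None, error=None):
        self._result, self._error = result, error

    def add_callbacks(self, callback, errback):
        if self._error is not None:
            errback(self._error)
        else:
            callback(self._result)


rows = to_composable(FakeResponseFuture(result=[1, 2, 3]))
total = then(rows, sum)
print(total.result())  # 6
```

Each wrapper only holds references for as long as its source future is pending, so nothing should accumulate once the results have been consumed. Errors propagate down the chain: if the source future fails, `then` passes the exception to the chained future instead of calling `fn`.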