
Suppose I receive a series of documents. I need to tokenize them and then turn them into vectors for further work. Since I found that Elasticsearch's tokenizer works much better than my own solution, I am switching to it. However, it is considerably slower. The end result is then expected to be fed into a vectorizer as a stream. How can I make part of this script asynchronous with dask?

The whole pipeline can be expressed with generators:

def fetch_documents(_cursor):
    with _cursor:
        # a lot of documents expected, may not fit in memory
        _cursor.execute('select ... from ...')

        for doc in _cursor:
            yield doc

def tokenize(documents):
    for doc in documents:
        yield elasticsearch_tokenize_me(doc)

def build_model(documents):
    some_model = SomeModel()

    for doc in documents:
        some_model.add_document(doc)

    return some_model

# 'cursor' is assumed to be an already-open database cursor
build_model(tokenize(fetch_documents(cursor)))
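(The elasticsearch_tokenize_me helper is not shown in the question; a minimal sketch of what it might look like, mirroring the IndicesClient.analyze call used in the dask version further down, could be:)

from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient

# Hypothetical helper: wraps the Elasticsearch _analyze API the same way the
# dask version below does, returning the tokens for a single document.
def elasticsearch_tokenize_me(doc):
    client = IndicesClient(Elasticsearch())
    result = client.analyze(analyzer='standard', text=doc)
    return tuple(item['token'] for item in result['tokens'])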

So this basically works fine, but it does not take advantage of all the available processing power, since the chain runs sequentially. Because dask is used in other related projects, I tried to adapt it and got the following (I am using psycopg2 for database access).

from itertools import chain

from dask import delayed
import psycopg2
import psycopg2.extras
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient

def loader():
    conn = psycopg2.connect()

    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute('''
        SELECT document, ... FROM ...
        ''')
    return cur

@delayed
def tokenize(partition):
    result = []

    client = IndicesClient(Elasticsearch())

    for row in partition:
        _result = client.analyze(analyzer='standard', text=row['document'])
        result.append(dict(row,
                           tokens=tuple(item['token'] for item in _result['tokens'])))

    return result

@delayed
def build_model(sequence_of_data):
    some_model = SomeModel()

    for item in chain.from_iterable(sequence_of_data):
        some_model.add_document(item)

    return some_model

with loader() as cur:
    partitions = []

    for idx_start in range(0, cur.rowcount, 200):
        partitions.append(delayed(cur.fetchmany)(200))

    tokenized = []
    for partition in partitions:
        tokenized.append(tokenize(partition))

    result = build_model(tokenized)
    result.compute()

The code more or less works, except that in the end all documents are tokenized first and only then fed into the model. While this is fine for smaller data collections, it is not for large ones (due to the huge memory consumption). Should I just use plain concurrent.futures for this, or am I using dask incorrectly?

Answers


A simple solution would be to load the data locally on your machine (it is hard to partition a single SQL query), then send it to the dask cluster for the expensive tokenization step. Perhaps something like the following:

cur.execute(''' SELECT document, ... FROM ... ''')
rows = cur  # a psycopg2 cursor is iterable over the result rows

from toolz import partition_all, concat
partitions = partition_all(10000, rows)

from dask.distributed import Executor  # renamed to Client in newer versions of dask.distributed
e = Executor('scheduler-address:8786')

futures = []

for part in partitions:
    x = e.submit(tokenize, part)
    y = e.submit(process, x)
    futures.append(y)

results = e.gather(futures)
result = list(concat(results))

In this example, the functions tokenize and process are expected to consume and return lists of elements.
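The process function is left undefined in this answer; a minimal sketch, assuming it only has to turn each tokenized row into whatever per-document record the model needs, might be:

# Hypothetical process function: the answer only assumes it consumes and
# returns a list of elements, so here it simply keeps the token tuples.
def process(tokenized_partition):
    return [row['tokens'] for row in tokenized_partition]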


Simply using concurrent.futures got the job done:

from concurrent.futures import ProcessPoolExecutor

import psycopg2
import psycopg2.extras
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient

def loader():
    conn = psycopg2.connect()

    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute('''
        SELECT document, ... FROM ...
        ''')
    return cur

def tokenize(partition):
    result = []

    client = IndicesClient(Elasticsearch())

    for row in partition:
        _result = client.analyze(analyzer='standard', text=row['document'])
        result.append(dict(row,
                           tokens=tuple(item['token'] for item in _result['tokens'])))

    return result

def build_model(partitions):
    some_model = SomeModel()

    for partition in partitions:
        result = partition.result()

        for item in result:
            some_model.add_document(item)

    return some_model

with loader() as cur, \
        ProcessPoolExecutor(max_workers=8) as executor:
    print(cur.rowcount)
    partitions = []

    for idx_start in range(0, cur.rowcount, 200):
        partitions.append(executor.submit(tokenize,
                                          cur.fetchmany(200)))

    build_model(partitions)
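If memory is still a concern, one possible refinement (not part of the original answer) is to feed results into the model as the futures finish rather than in submission order, using concurrent.futures.as_completed. A minimal sketch, assuming the same tokenize function and SomeModel as above:

from concurrent.futures import as_completed

# Hypothetical variant: consume each tokenized partition as soon as its
# future completes, so finished results do not sit around unconsumed.
def build_model_as_completed(cur, executor):
    some_model = SomeModel()
    futures = [executor.submit(tokenize, cur.fetchmany(200))
               for _ in range(0, cur.rowcount, 200)]

    for future in as_completed(futures):
        for item in future.result():
            some_model.add_document(item)

    return some_model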