Suppose I receive a stream of documents. I need to tokenize them and then turn them into vectors for further work. Since I've found that Elasticsearch's tokenizer works much better than my own solution, I'm switching to it. However, it is quite slow. The final result is then expected to be fed into a vectorizer as a stream. How can I make this part of the script asynchronous with dask?
The whole process can be put together with generators:
def fetch_documents(_cursor):
    with _cursor:
        # a lot of documents expected, may not fit in memory
        _cursor.execute('select ... from ...')
        for doc in _cursor:
            yield doc

def tokenize(documents):
    for doc in documents:
        yield elasticsearch_tokenize_me(doc)

def build_model(documents):
    some_model = SomeModel()
    for doc in documents:
        some_model.add_document(doc)
    return some_model
build_model(tokenize(fetch_documents(cursor)))  # cursor: an open database cursor
So this basically works fine, but it runs as a single chain of calls and does not take advantage of all the available processing power. Since dask is used in other related projects, I tried to adapt it and came up with the following (I'm using psycopg2 for database access):
from itertools import chain

from dask import delayed
import psycopg2
import psycopg2.extras
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient
def loader():
    # return an open DictCursor with the query already executed
    conn = psycopg2.connect()
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute('''
        SELECT document, ... FROM ...
        ''')
    return cur

@delayed
def tokenize(partition):
    # tokenize each row's document via the Elasticsearch analyze API
    result = []
    client = IndicesClient(Elasticsearch())
    for row in partition:
        _result = client.analyze(analyzer='standard', text=row['document'])
        result.append(dict(row,
                           tokens=tuple(item['token'] for item in _result['tokens'])))
    return result

@delayed
def build_model(sequence_of_data):
    # flatten the tokenized partitions and feed every document into the model
    some_model = SomeModel()
    for item in chain.from_iterable(sequence_of_data):
        some_model.add_document(item)
    return some_model
with loader() as cur:
    partitions = []
    for idx_start in range(0, cur.rowcount, 200):
        partitions.append(delayed(cur.fetchmany)(200))
    tokenized = []
    for partition in partitions:
        tokenized.append(tokenize(partition))
    result = build_model(tokenized)
    result.compute()
The code more or less works, except that in the end all the documents get tokenized first and only then are fed into the model. While this is fine for smaller collections of data, it is not for large ones (the memory consumption is enormous). Should I just use plain concurrent.futures for this, or am I using dask incorrectly?
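For reference, this is roughly the kind of bounded pipeline I have in mind when I mention plain concurrent.futures (just a sketch, not code I am running: tokenize_chunk stands in for the Elasticsearch tokenization above, some_model for SomeModel, and cursor for the psycopg2 cursor). Chunks are fetched on the main thread, at most a handful are tokenized concurrently in worker threads, and each finished chunk is fed into the model right away, so memory stays bounded:

from collections import deque
from concurrent.futures import ThreadPoolExecutor

def run_pipeline(cursor, tokenize_chunk, some_model,
                 chunk_size=200, max_in_flight=4):
    with ThreadPoolExecutor(max_workers=max_in_flight) as executor:
        in_flight = deque()
        while True:
            # keep at most max_in_flight chunks submitted at any time
            while len(in_flight) < max_in_flight:
                chunk = cursor.fetchmany(chunk_size)
                if not chunk:
                    break
                in_flight.append(executor.submit(tokenize_chunk, chunk))
            if not in_flight:
                break
            # wait for the oldest chunk and feed it into the model immediately
            for item in in_flight.popleft().result():
                some_model.add_document(item)
    return some_model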