2016-06-08

Dask Bag.to_textfiles works with a single partition, but not with multiple

Quick one.. is this a bug, or am I missing something? tmp_j is a bag with a single item and 6 partitions; I get a similar response with larger bags, though.

This particular bag is built with:

>>> tmp_j = jnode_b.filter(lambda r: (r['node']['attrib']['uid'] == '8909') &
...                        (r['node']['attrib']['version'] == '1')).pluck('node').pluck('attrib')

and, again, it looks like:

>>> tmp_j.compute() 

[{'changeset': '39455176', 
    'id': '4197394169', 
    'lat': '53.4803608', 
    'lon': '-113.4955328', 
    'timestamp': '2016-05-20T16:43:02Z', 
    'uid': '8909', 
    'user': 'mvexel', 
    'version': '1'}] 

Thanks..

>>> tmp_j.repartition(1).map(json.dumps).to_textfiles('tmpA*.json') 

works fine (it writes the files), but

>>> tmp_j.map(json.dumps).to_textfiles('tmp*.json') 

StopIteration        Traceback (most recent call last) 
<ipython-input-28-a77a33e2ff26> in <module>() 
----> 1 tmp_j.map(json.dumps).to_textfiles('tmp*.json') 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(self, path, name_function, compression, encoding, compute) 
    469  def to_textfiles(self, path, name_function=str, compression='infer', 
    470      encoding=system_encoding, compute=True): 
--> 471   return to_textfiles(self, path, name_function, compression, encoding, compute) 
    472 
    473  def fold(self, binop, combine=None, initial=no_default, split_every=None): 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py in to_textfiles(b, path, name_function, compression, encoding, compute) 
    167  result = Bag(merge(b.dask, dsk), name, b.npartitions) 
    168  if compute: 
--> 169   result.compute() 
    170  else: 
    171   return result 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(self, **kwargs) 
    35 
    36  def compute(self, **kwargs): 
---> 37   return compute(self, **kwargs)[0] 
    38 
    39  @classmethod 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs) 
    108     for opt, val in groups.items()]) 
    109  keys = [var._keys() for var in variables] 
--> 110  results = get(dsk, keys, **kwargs) 
    111 
    112  results_iter = iter(results) 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/multiprocessing.py in get(dsk, keys, optimizations, num_workers, func_loads, func_dumps, **kwargs) 
    76   # Run 
    77   result = get_async(apply_async, len(pool._pool), dsk3, keys, 
---> 78       queue=queue, get_id=_process_get_id, **kwargs) 
    79  finally: 
    80   if cleanup: 

/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs) 
    486     _execute_task(task, data) # Re-execute locally 
    487    else: 
--> 488     raise(remote_exception(res, tb)) 
    489   state['cache'][key] = res 
    490   finish_task(dsk, key, state, results, keyorder.get) 

StopIteration: 

Traceback 
--------- 
    File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 267, in execute_task 
    result = _execute_task(task, data) 
    File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py", line 249, in _execute_task 
    return func(*args2) 
    File "/Users/jlatmann/anaconda/envs/python3/lib/python3.5/site-packages/dask/bag/core.py", line 1024, in write 
    firstline = next(data) 
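
For what it's worth, the last frame points at the cause: write grabs the first line of each partition with next(data), and filtering a 6-partition bag down to a single record presumably leaves five partitions empty, so next raises StopIteration. A minimal sketch of that pattern (write_partition is a hypothetical stand-in for illustration, not dask's actual code):

def write_partition(lines):
    data = iter(lines)
    firstline = next(data)   # raises StopIteration when the partition is empty
    print(firstline)
    for line in data:
        print(line)

write_partition(['{"uid": "8909"}'])   # fine: this partition has data
write_partition([])                    # StopIteration: empty partition
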
Note that:

>>> tmp_b = db.from_sequence(tmp_j, partition_size=3) 
>>> tmp_b.map(json.dumps).to_textfiles('tmp*.json') 

works fine (but, again, tmp_b.npartitions == 1).

Thanks again for the insight - I did look at the source, but apparently my smarts-to-laziness ratio is too low.

I'll submit documentation once I'm confident I've actually got a handle on this.


See https://github.com/dask/dask/pull/1256 – MRocklin


You're not watching me while I type this, are you? (That _was_ fast.) Thanks. – JMann

Answer


This was a genuine bug; it is now fixed in master:

In [1]: import dask.bag as db 

In [2]: db.range(5, npartitions=5).filter(lambda x: x == 1).map(str).to_textfiles('*.txt') 

In [3]: ls *.txt 
0.txt 1.txt 2.txt 3.txt 4.txt C:\nppdf32Log\debuglog.txt foo.txt 
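
On versions without the fix, the question's own workaround generalizes: collapse the bag to a single partition before writing, so the write step never sees an empty partition. A sketch (out-*.txt is just an illustrative name):

>>> import dask.bag as db 
>>> b = db.range(5, npartitions=5).filter(lambda x: x == 1).map(str) 
>>> b.repartition(1).to_textfiles('out-*.txt')   # one partition left, so none are empty 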

Thanks very much. - Right now I'm learning how the Python docs get built, so I can add the dask documentation I mentioned earlier. – JMann


Any help would be lovely :) – MRocklin
