
I want to allow a Python application to access arbitrary positions within files of many GB stored in S3. I'd like to create a drop-in replacement file-like object that intelligently downloads chunks of data from S3 in a separate thread to satisfy seek() and read() requests.

Is there a simple data structure I can use to store arbitrary intervals of a file?

It must support O(log n) lookup and O(n) insertion (n = number of chunks, not the file size). It also needs to support fast queries for gaps, so that the loading thread can efficiently find the next chunk it should download. Something like SortedCollection is not available to me at the moment, which suggests I may need to use bisect_* manually inside a new container.
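For example, a minimal sketch of such a container built directly on bisect_* might look like the following (the IntervalSet name and its methods are hypothetical, purely for illustration, and it assumes the stored byte ranges never overlap):

import bisect

class IntervalSet(object):
    ' Sorted, non-overlapping [start, end) byte ranges of the file. '
    def __init__(self):
        self._starts = []  # sorted start offsets
        self._ends = []    # end offset of the interval beginning at _starts[i]

    def add(self, start, end):
        ' O(n) insertion of a new interval (list.insert dominates). '
        i = bisect.bisect_left(self._starts, start)
        self._starts.insert(i, start)
        self._ends.insert(i, end)

    def covering(self, offset):
        ' O(log n) lookup of the interval containing offset, or None. '
        i = bisect.bisect_right(self._starts, offset) - 1
        if i >= 0 and offset < self._ends[i]:
            return self._starts[i], self._ends[i]
        return None

    def next_gap(self, offset):
        ' First offset at or after the given one that is not yet covered. '
        hit = self.covering(offset)
        while hit is not None:
            offset = hit[1]  # jump to the end of the covering interval
            hit = self.covering(offset)
        return offset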

Example usage:

import os 
import time 
from bigfile import BigFile 

chunksize = (2**20)*64 # 64MB 

bf = BigFile('my_bucket', 'key_name', chunksize=chunksize) 

# read from beginning (blocks until first chunk arrives) 
bf.read(100) 

# continues downloading subsequent chunks in background 
time.sleep(10) 

# seek into second chunk and read (should not block) 
bf.seek(chunksize, os.SEEK_SET) 
bf.read(100) 

# seek far into the file 
bf.seek(chunksize*100 + 54, os.SEEK_SET) # triggers a chunk download starting at the new location 
bf.read(100) # blocks until chunk arrives 

# seek back to beginning (should not block, already have this chunk) 
bf.seek(0, os.SEEK_SET) 
bf.read(100) 

# read entire rest of file (blocks until all chunks are downloaded) 
bf.read() 

A question would also be nice ;) – Blender


"Is there a simple data structure I can use to store arbitrary intervals of a file?" is the question. –

Answer


This implementation uses chunks of fixed size at fixed offsets. If the chunks are very large and the network is very slow, a read can block for a long time: consider a read that starts at the last byte of a chunk; it has to wait for that entire chunk to load, and then for the next chunk as well (for example, with 64 MB chunks on a ~1 MB/s link, that is roughly a minute of waiting per chunk).

Ideally we could use chunks of arbitrary size and position, so that each download could be optimized to start exactly at the read point (a sketch of that variant is shown just below). Still, the fixed-chunk implementation that follows the sketch is a good 80% solution.
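As an illustration only, and not part of this answer's implementation, serving a read from chunks keyed by arbitrary start offsets might look roughly like this; reader stands for any file-like object with seek()/read() (such as the S3RangeReader defined further down), and read_at, chunks and starts are hypothetical names:

import bisect

def read_at(reader, chunks, starts, offset, n, chunk_size=2**20 * 64):
    ' chunks: dict start_offset -> bytes; starts: sorted list of the same offsets. '
    i = bisect.bisect_right(starts, offset) - 1
    if i < 0 or offset >= starts[i] + len(chunks[starts[i]]):
        # cache miss: download a chunk that begins exactly at the requested offset,
        # so the caller never waits for bytes that precede the read point
        reader.seek(offset)
        data = reader.read(chunk_size)
        bisect.insort(starts, offset)
        chunks[offset] = data
        start = offset
    else:
        start = starts[i]
    rel = offset - start
    return chunks[start][rel:rel + n]  # may return fewer than n bytes at a chunk/file end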

import boto 
import threading 
import tempfile 
import os 

DEFAULT_CHUNK_SIZE = 2**20 * 64 # 64 MB per request 

class BigFile(object): 
    def __init__(self, file_obj, file_size, chunksize=DEFAULT_CHUNK_SIZE, start=True): 
        self._file_obj = file_obj 
        self._file_size = file_size 
        self._lock = threading.RLock() 
        self._load_condition = threading.Condition(self._lock) 
        self._load_run = True 
        self._loc = 0 
        self._chunk_size = chunksize 
        chunk_count = self._file_size // self._chunk_size 
        chunk_count += 1 if self._file_size % self._chunk_size else 0 
        self._chunks = [None for _ in xrange(chunk_count)] 
        self._load_thread = threading.Thread(target=self._load) 
        if start: 
            self._load_thread.start() 

    def _chunk_loc(self): 
        ' Returns (chunk_num, chunk_offset) for the current location in the larger file ' 
        return self._loc // self._chunk_size, self._loc % self._chunk_size 

    def _load_chunk(self, chunk_num): 
        ' Download one chunk into a temporary file and publish it to readers ' 
        tf = tempfile.TemporaryFile() 
        start_idx = chunk_num * self._chunk_size 
        self._file_obj.seek(start_idx) 
        tf.write(self._file_obj.read(self._chunk_size)) 
        with self._lock: 
            self._chunks[chunk_num] = (tf, tf.tell())  # (tempfile, size) 
            self._load_condition.notify() 

    def _load(self): 
        while self._load_run: 
            # check the chunk under the current read position, load it if needed 
            with self._lock: 
                chunk_num, _ = self._chunk_loc() 
                chunk_and_size = self._chunks[chunk_num] 
            if chunk_and_size is None: 
                self._load_chunk(chunk_num) 

            # find the next empty chunk, scanning forward from the current one 
            for i in xrange(len(self._chunks)): 
                cur_chunk = chunk_num + i 
                cur_chunk %= len(self._chunks)  # loop around 
                if self._chunks[cur_chunk] is None: 
                    self._load_chunk(cur_chunk) 
                    break 
            else: 
                # all chunks are loaded, stop the thread 
                break 

    def seek(self, loc, rel=os.SEEK_SET): 
        with self._lock: 
            if rel == os.SEEK_CUR: 
                self._loc += loc 
            elif rel == os.SEEK_SET: 
                self._loc = loc 
            elif rel == os.SEEK_END: 
                self._loc = self._file_size + loc 

    def read(self, bytes_to_read=-1): 
        ret = [] 
        with self._lock: 
            if bytes_to_read < 0: 
                # read() with no argument reads to the end of the file 
                bytes_to_read = self._file_size - self._loc 
            chunk_num, chunk_offset = self._chunk_loc() 
            while bytes_to_read > 0 and chunk_num < len(self._chunks): 
                # block until the background thread has published this chunk 
                while not self._chunks[chunk_num]: 
                    self._load_condition.wait() 
                chunk, size = self._chunks[chunk_num] 
                cur_chunk_bytes = min(self._chunk_size - chunk_offset, bytes_to_read, size) 
                chunk.seek(chunk_offset, os.SEEK_SET) 
                data = chunk.read(cur_chunk_bytes) 
                ret.append(data) 
                bytes_to_read -= len(data) 
                self._loc += len(data) 
                chunk_num += 1 
                chunk_offset = 0  # later chunks are read from their beginning 
        return ''.join(ret) 

    def start(self): 
        self._load_thread.start() 

    def join(self): 
        self._load_thread.join() 

    def stop(self): 
        self._load_run = False 

class S3RangeReader(object): 
    def __init__(self, key_obj): 
        self._key_obj = key_obj 
        self.size = self._key_obj.size 
        self._pos = 0 

    def __len__(self): 
        return self.size 

    def seek(self, pos, rel=os.SEEK_SET): 
        if rel == os.SEEK_CUR: 
            self._pos += pos 
        elif rel == os.SEEK_SET: 
            self._pos = pos 
        elif rel == os.SEEK_END: 
            self._pos = self.size + pos 

    def read(self, bytes=-1): 
        if bytes == 0 or self._pos >= self.size: 
            return '' 
        else: 
            if bytes == -1: 
                bytes = self.size 
            # S3 ranges are closed intervals: bytes=start-end includes both endpoints 
            headers = {'Range': 'bytes=%s-%s' % (self._pos, self._pos + bytes - 1)} 
            data = self._key_obj.get_contents_as_string(headers=headers) 
            self._pos += len(data)  # advance the position like a normal file object 
            return data 

if __name__ == '__main__': 
    key = boto.connect_s3().get_bucket('mybucket').get_key('my_key') 
    reader = S3RangeReader(key) 
    bf = BigFile(reader, len(reader))  # download starts by default 
    bf.seek(1000000) 
    bf.read(100)  # blocks 
    bf.seek(0) 
    bf.read(100)  # should not block
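
As a follow-up usage note, the constructor's start flag and the start()/stop()/join() helpers defined above also let the caller control the background download explicitly; a small sketch:

# start the loader thread lazily and shut it down explicitly
reader = S3RangeReader(boto.connect_s3().get_bucket('mybucket').get_key('my_key'))
bf = BigFile(reader, len(reader), start=False)
bf.start()    # begin downloading chunks in the background
bf.read(100)  # blocks until the first chunk has been published
bf.stop()     # signal the loader to exit after the chunk(s) currently in flight
bf.join()     # wait for the loader thread to terminate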