此實現使用固定大小和偏移量的塊。如果數據塊非常大並且網絡速度非常慢,則讀取可能會阻塞很長時間(考慮從塊的最後一個字節處開始讀取,它將不得不等待整個前一個塊加載,然後下一個塊)。
理想情況下,我們可以使用任意大小和位置的塊,因此我們可以優化負載以在讀取點處開始。但以下是一個很好的80%解決方案。
import boto
import threading
import tempfile
import os
DEFAULT_CHUNK_SIZE = 2**20 * 64 # 64 MB per request
class BigFile(object):
def __init__(self, file_obj, file_size, chunksize=DEFAULT_CHUNK_SIZE, start=True):
self._file_obj = file_obj
self._file_size = file_size
self._lock = threading.RLock()
self._load_condition = threading.Condition(self._lock)
self._load_run = True
self._loc = 0
self._chunk_size = chunksize
chunk_count = self._file_size // self._chunk_size
chunk_count += 1 if self._file_size % self._chunk_size else 0
self._chunks = [None for _ in xrange(chunk_count)]
self._load_thread = threading.Thread(target=self._load)
if start:
self._load_thread.start()
def _chunk_loc(self):
' Returns (chunk_num, chunk_offset) for a given location in the larger file '
return self._loc // self._chunk_size, self._loc % self._chunk_size
def _load_chunk(self, chunk_num):
tf = tempfile.TemporaryFile()
start_idx = chunk_num * self._chunk_size
self._file_obj.seek(start_idx)
tf.write(self._file_obj.read(self._chunk_size))
with self._lock:
self._chunks[chunk_num] = (tf, tf.tell()) # (tempfile, size)
self._load_condition.notify()
def _load(self):
while self._load_run:
# check current chunk, load if needed
with self._lock:
chunk_num, _ = self._chunk_loc()
chunk_and_size = self._chunks[chunk_num]
if chunk_and_size is None:
self._load_chunk(chunk_num)
# find next empty chunk
for i in xrange(len(self._chunks)):
cur_chunk = chunk_num + i
cur_chunk %= len(self._chunks) # loop around
if self._chunks[cur_chunk] is None:
self._load_chunk(cur_chunk)
break
else:
# all done, stop thread
break
def seek(self, loc, rel=os.SEEK_SET):
with self._lock:
if rel == os.SEEK_CUR:
self._loc += loc
elif rel == os.SEEK_SET:
self._loc = loc
elif rel == os.SEEK_END:
self._loc = self._file_size + loc
def read(self, bytes_to_read):
ret = []
with self._lock:
chunk_num, chunk_offset = self._chunk_loc()
while (bytes_to_read > 0 or bytes_to_read == -1) and chunk_num < len(self._chunks):
while not self._chunks[chunk_num]:
self._load_condition.wait()
chunk, size = self._chunks[chunk_num]
cur_chunk_bytes = min(self._chunk_size-chunk_offset, bytes_to_read, size)
chunk.seek(chunk_offset, os.SEEK_SET)
data = chunk.read(cur_chunk_bytes)
ret.append(data)
bytes_to_read -= len(data)
chunk_num += 1
return ''.join(ret)
def start(self):
self._load_thread.start()
def join(self):
self._load_thread.join()
def stop(self):
self._load_run = False
class S3RangeReader:
def __init__(self, key_obj):
self._key_obj = key_obj
self.size = self._key_obj.size
self._pos = 0
def __len__(self):
return self.size
def seek(self, pos, rel=os.SEEK_SET):
if rel == os.SEEK_CUR:
self._pos += pos
elif rel == os.SEEK_SET:
self._pos = pos
elif rel == os.SEEK_END:
self._pos = self.size + pos
def read(self, bytes=-1):
if bytes == 0 or self._pos >= self.size:
return ''
else:
if bytes == -1:
bytes = self.size
headers = {'Range': 'bytes=%s-%s' % (self._pos, self._pos + bytes - 1)} # S3 ranges are closed ranges: [start,end]
return self._key_obj.get_contents_as_string(headers=headers)
if __name__ == '__main__':
key = boto.s3_connect().get_bucket('mybucket').get_key('my_key')
reader = S3RangeReader(key)
bf = BigFile(reader, len(reader)) # download starts by default
bf.seek(1000000)
bf.read(100) # blocks
bf.seek(0)
bf.read(100) # should not block
一個問題也會很好;) – Blender
「是否有一個簡單的數據結構可以用來存儲文件的任意間隔?」是個問題。 –