2016-01-04 64 views
36

我試圖預取訓練數據以隱藏I/O延遲。我想編寫自定義的Python代碼,用於從磁盤加載數據並預處理數據(例如通過添加上下文窗口)。換句話說,一個線程進行數據預處理,另一個線程進行訓練。這在TensorFlow中可能嗎?如何使用張量流中的自定義python函數預取數據

更新:我有一個基於@ mrry示例的工作示例。

import numpy as np 
import tensorflow as tf 
import threading 

BATCH_SIZE = 5 
TRAINING_ITERS = 4100 

feature_input = tf.placeholder(tf.float32, shape=[128]) 
label_input = tf.placeholder(tf.float32, shape=[128]) 

q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]]) 
enqueue_op = q.enqueue([label_input, feature_input]) 

label_batch, feature_batch = q.dequeue_many(BATCH_SIZE) 
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128]) 

sess = tf.Session() 

def load_and_enqueue(sess, enqueue_op, coord): 
    with open('dummy_data/features.bin') as feature_file, open('dummy_data/labels.bin') as label_file: 
    while not coord.should_stop(): 
     feature_array = np.fromfile(feature_file, np.float32, 128) 
     if feature_array.shape[0] == 0: 
     print('reach end of file, reset using seek(0,0)') 
     feature_file.seek(0,0) 
     label_file.seek(0,0) 
     continue 
     label_value = np.fromfile(label_file, np.float32, 128) 

     sess.run(enqueue_op, feed_dict={feature_input: feature_array, 
             label_input: label_value}) 

coord = tf.train.Coordinator() 
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord)) 
t.start() 

for i in range(TRAINING_ITERS): 
    sum = sess.run(c) 
    print('train_iter='+str(i)) 
    print(sum) 

coord.request_stop() 
coord.join([t]) 
+3

我剛剛製作了一個關於隊列的筆記本,這也解釋了類似的用例,我希望它對其他人也有用:https://gist.github.com/akiross/23b6ae42812841bb79af4976a2525cf9 – AkiRoss

回答

49

這是一個常見的情況,大多數實現使用TensorFlow的隊列脫鉤從訓練碼預處理代碼。有a tutorial on how to use queues,但主要步驟如下:

  1. 定義一個隊列,q,將緩衝預處理的數據。 TensorFlow支持簡單的tf.FIFOQueue,它按照它們入隊的順序生成元素,更高級的tf.RandomShuffleQueue以隨機順序生成元素。隊列元素是一個或多個張量的元組(可以具有不同的類型和形狀)。所有隊列都支持單元素(enqueue,dequeue)和批處理(enqueue_many,dequeue_many)操作,但要使用批處理操作,必須在構建隊列時指定隊列元素中每個張量的形狀。

  2. 構建將預處理元素排入隊列的子圖。一種方法是定義一些tf.placeholder()操作符對應於單個輸入示例的張量,然後將它們傳遞給q.enqueue()。 (如果您的預處理一次生成批次,則應該使用q.enqueue_many()代替。)您也可能在此子圖中包含TensorFlow操作。

  3. 構建執行訓練的子圖。這看起來像一個普通的TensorFlow圖表,但會通過調用q.dequeue_many(BATCH_SIZE)來獲得輸入。

  4. 開始你的課程。

  5. 創建一個或多個執行預處理邏輯的線程,然後執行入隊操作,輸入預處理數據。您可能會發現對此有用的tf.train.Coordinatortf.train.QueueRunner實用程序類。

  6. 正常運行你的訓練圖(優化器等)。

編輯:下面是一個簡單load_and_enqueue()功能和代碼片段,讓你開始:

# Features are length-100 vectors of floats 
feature_input = tf.placeholder(tf.float32, shape=[100]) 
# Labels are scalar integers. 
label_input = tf.placeholder(tf.int32, shape=[]) 

# Alternatively, could do: 
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100]) 
# label_batch_input = tf.placeholder(tf.int32, shape=[None]) 

q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []]) 
enqueue_op = q.enqueue([feature_input, label_input]) 

# For batch input, do: 
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input]) 

feature_batch, label_batch = q.dequeue_many(BATCH_SIZE) 
# Build rest of model taking label_batch, feature_batch as input. 
# [...] 
train_op = ... 

sess = tf.Session() 

def load_and_enqueue(): 
    with open(...) as feature_file, open(...) as label_file: 
    while True: 
     feature_array = numpy.fromfile(feature_file, numpy.float32, 100) 
     if not feature_array: 
     return 
     label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0] 

     sess.run(enqueue_op, feed_dict={feature_input: feature_array, 
             label_input: label_value}) 

# Start a thread to enqueue data asynchronously, and hide I/O latency. 
t = threading.Thread(target=load_and_enqueue) 
t.start() 

for _ in range(TRAINING_EPOCHS): 
    sess.run(train_op) 
+1

感謝您的建議。我有另一個問題。在我的實驗中,訓練功能和標籤存儲在兩個單獨的二進制文件中。我應該建立兩個隊列,一個用於功能,一個用於標籤?如果我們想從兩個隊列中獲得一對隨機數(功能,標籤),我如何確保該功能對應於正確的標籤?換句話說,我如何保證一對一映射? –

+0

要保留一對一映射,您應該建立一個單一隊列,其中每個元素是特徵張量和標籤張量的元組。您可以通過指定隊列構造函數的類型(和形狀)列表來完成此操作。這確保了相同元組的組件總是一起出隊。 – mrry

+0

功能和標籤分別存儲在兩個大的二進制文件中。所以我需要建立feat_queue = tf.train.string_input_producer(feat_filenames)和label_queue = tf.train.string_input_producer(label_filenames)。然後,我還將有兩個tf.FixedLengthRecordReader分別從label_queue中的feat_queue和label中獲得專長。最後,我將[feat,label]排入另一個隊列。這是問題。當我使用FixedLengthRecordReader獲取專長和標籤時,它們是否總是正確映射? –

6

換句話說,一個線程做數據預處理,另一種則訓練。這在TensorFlow中可能嗎?

是的。 mrry的解決方案有效,但更簡單。

讀取數據

tf.py_func包裝一個Python函數並使用它作爲一個TensorFlow運算符。所以我們可以每次加載數據在sess.run()。這種方法的問題是通過主線程在sess.run()期間加載數據。

小例子:

def get_numpy_tensor(): 
    return np.array([[1,2],[3,4]], dtype=np.float32) 
tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32) 

更復雜的例子:

def get_numpy_tensors(): 
    # Load data from the disk into numpy arrays. 
    input = np.array([[1,2],[3,4]], dtype=np.float32) 
    target = np.int32(1) 
    return input, target 
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) 

tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target 

sess = tf.InteractiveSession() 
numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target]) 
assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2 

在另一個線程

預取數據要排隊在另一個線程我們的數據(這樣sess.run()不會必須等待數據),我們可以從我們的運營商tf.py_func()上使用tf.train.batch()

的最小例如:

tensor_shape = get_numpy_tensor().shape 
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape]) 
# Run `tf.train.start_queue_runners()` once session is created. 

我們可以省略參數shapes如果tensorflow_tensor已經指定其形狀:

tensor_shape = get_numpy_tensor().shape 
tensorflow_tensor.set_shape(tensor_shape) 
tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32) 
# Run `tf.train.start_queue_runners()` once session is created. 

更復雜的例子:

input_shape, target_shape = (2, 2),() 
def get_numpy_tensors(): 
    input = np.random.rand(*input_shape).astype(np.float32) 
    target = np.random.randint(10, dtype=np.int32) 
    print('f', end='') 
    return input, target 
tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) 
batch_size = 2 
tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2) 
# Internal queue will contain at most `capasity=2` times `batch_size=2` elements `[tensorflow_input, tensorflow_target]`. 

tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets 

sess = tf.InteractiveSession() 
tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it. 
for _ in range(10): 
    numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets]) 
    assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape) 
    print('r', end='') 

# Prints `fffffrrffrfrffrffrffrffrffrffrf`. 

萬一get_numpy_tensor()返回一批張量, n tf.train.batch(..., enqueue_many=True)將有所幫助。

相關問題