我試圖解決的問題如下: 我有一個文件名列表trainimgs
的文件名。我與它的capacity=len(trainimgs)
和min_after_dequeue=0
定義的TensorFlow:從多個線程入隊和出隊隊列
tf.RandomShuffleQueue
。- 對於指定的
epochlimit
,此tf.RandomShuffleQueue
預計將填充trainimgs
次數。 - 許多線程預計並行工作。每個線程從
tf.RandomShuffleQueue
中取出一個元素,並對其執行一些操作並將其排入另一個隊列。我有這個權利。 - 但是,一旦
1 epoch
的trainimgs
已被處理且tf.RandomShuffleQueue
爲空,假設當前時期e < epochlimit
,隊列必須再次被填滿並且線程必須再次工作。
好消息是:我有它一定的情況下工作(見PS在最後!!)
壞消息是:我認爲有這樣做的更好的辦法這個。
我使用要做到這一點,現在是如下所述的方法(I簡化了功能和已刪除基礎的電子圖像處理的預處理和隨後的入隊但處理的心臟保持相同!!):
with tf.Session() as sess:
train_filename_queue = tf.RandomShuffleQueue(capacity=len(trainimgs), min_after_dequeue=0, dtypes=tf.string, seed=0)
queue_size = train_filename_queue.size()
trainimgtensor = tf.constant(trainimgs)
close_queue = train_filename_queue.close()
epoch = tf.Variable(initial_value=1, trainable=False, dtype=tf.int32)
incrementepoch = tf.assign(epoch, epoch + 1, use_locking=True)
supplyimages = train_filename_queue.enqueue_many(trainimgtensor)
value = train_filename_queue.dequeue()
init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
sess.run(init_op)
coord = tf.train.Coordinator()
tf.train.start_queue_runners(sess, coord)
sess.run(supplyimages)
lock = threading.Lock()
threads = [threading.Thread(target=work, args=(coord, value, sess, epoch, incrementepoch, supplyimages, queue_size, lock, close_queue)) for i in range(200)]
for t in threads:
t.start()
coord.join(threads)
功函數如下:
def work(coord, val, sess, epoch, incrementepoch, supplyimg, q, lock,\
close_op):
while not coord.should_stop():
if sess.run(q) > 0:
filename, currepoch = sess.run([val, epoch])
filename = filename.decode(encoding='UTF-8')
print(filename + ' ' + str(currepoch))
elif sess.run(epoch) < 2:
lock.acquire()
try:
if sess.run(q) == 0:
print("The previous epoch = %d"%(sess.run(epoch)))
sess.run([incrementepoch, supplyimg])
sz = sess.run(q)
print("The new epoch = %d"%(sess.run(epoch)))
print("The new queue size = %d"%(sz))
finally:
lock.release()
else:
try:
sess.run(close_op)
except tf.errors.CancelledError:
print('Queue already closed.')
coord.request_stop()
return None
所以,儘管這個作品,我有一種感覺,有一個更好的和更清潔的方式來實現這一目標。所以,簡而言之,我的問題是:
- 在TensorFlow中實現此任務有沒有更簡單更清晰的方法?
- 這段代碼的邏輯有問題嗎?我對多線程場景不是很有經驗,所以任何忽略我的注意力的明顯缺陷都會對我很有幫助。
P.S:看來這段代碼並不完美。當我運行120萬個圖像和200個線程時,它運行。然而,當我運行了10張和20個線程,它提供了以下錯誤:
CancelledError (see above for traceback): RandomShuffleQueue '_0_random_shuffle_queue' is closed.
[[Node: random_shuffle_queue_EnqueueMany = QueueEnqueueManyV2[Tcomponents=[DT_STRING], timeout_ms=-1, _device="/job:localhost/replica:0/task:0/cpu:0"](random_shuffle_queue, Const)]]
我想我得到了涵蓋except tf.errors.CancelledError
。這到底是怎麼回事 ?
謝謝,但我想使用多個線程來加速,因爲有我需要做的複雜預處理步驟 – Ujjwal
您可以使用一個線程將文件名排入主隊列,然後使用多個線程將這些文件名出列,預處理,並將它們排入最終隊列。 –