2016-12-06 109 views

Reading a (batched) matrix from a file in TensorFlow

I have just started learning TensorFlow. I want to read a 3x3 matrix from a CSV file in HDFS and multiply it by itself.

The file looks like this:

1,2,3 
4,5,6 
7,8,9 
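To have something to check the TensorFlow output against, here is a minimal plain-Python reference that parses this file and squares the matrix. It is a sketch: the file contents are inlined as `CSV_TEXT` instead of being read from HDFS, and `read_matrix` and `matmul` are illustrative helpers, not TensorFlow APIs.

```python
import csv
import io

# The 3x3 example file, inlined here instead of read from HDFS.
CSV_TEXT = "1,2,3\n4,5,6\n7,8,9\n"

def read_matrix(text):
    """Parse comma-separated lines into a list of float rows (blank lines skipped)."""
    return [[float(x) for x in row] for row in csv.reader(io.StringIO(text)) if row]

def matmul(a, b):
    """Naive dense product: c[i][j] = sum_k a[i][k] * b[k][j]."""
    return [[sum(a[i][k] * b[k][j] for k in range(len(b)))
             for j in range(len(b[0]))]
            for i in range(len(a))]

m = read_matrix(CSV_TEXT)
print(matmul(m, m))
# [[30.0, 36.0, 42.0], [66.0, 81.0, 96.0], [102.0, 126.0, 150.0]]
```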

So far, with the help of the TensorFlow tutorial, I have come up with the following code:

import tensorflow as tf

def read_and_decode(filename_queue):
    reader = tf.TextLineReader()
    key, value = reader.read(filename_queue)

    # Type information and column names based on the decoded CSV.
    record_defaults = [[0.0], [0.0], [0.0]]
    f1, f2, f3 = tf.decode_csv(value, record_defaults=record_defaults)

    # Turn the features back into a tensor.
    # (tf.stack was called tf.pack before TensorFlow 1.0.)
    features = tf.stack([f1, f2, f3])

    return features

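def input_pipeline(filename_queue, batch_size, num_threads):
    example = read_and_decode(filename_queue)
    # min_after_dequeue only matters for tf.train.shuffle_batch; here it only sizes the queue.
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * batch_size

    # With num_threads > 1, several threads enqueue rows concurrently,
    # so batching is nondeterministic and rows may leave file order.
    example_batch = tf.train.batch(
        [example], batch_size=batch_size, capacity=capacity,
        num_threads=num_threads, allow_smaller_final_batch=True)
    return example_batch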


def get_all_records(FILE):
    with tf.Session() as sess:
        filename_queue = tf.train.string_input_producer([FILE], num_epochs=1, shuffle=False)
        batch_size = 1
        num_threads = 4
        #batch = input_pipeline(filename_queue, batch_size, num_threads)
        batch = read_and_decode(filename_queue)
        # num_epochs creates a local variable, so local variables must be initialized.
        init_op = tf.local_variables_initializer()
        sess.run(init_op)
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(coord=coord)
        try:
            while True:
                example = sess.run([batch])
                print(example)
        except tf.errors.OutOfRangeError as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()

        coord.join(threads)

get_all_records('hdfs://default/test.csv')

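This prints every row of the matrix in the correct order. However, when I use batching by applying input_pipeline(), the rows no longer come out in the correct order.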
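One way to recover file order after multi-threaded batching is to carry each row's line number along with its values and sort on it. The sketch below shows the idea in plain Python, not TensorFlow. It assumes each row arrives paired with a key of the form `path:line_number`. That format is an assumption worth verifying against the `key` that `TextLineReader.read` returns in your TensorFlow version. `restore_order` is a hypothetical helper.

```python
import random

def restore_order(keyed_rows):
    """Sort (key, row) pairs by the line number at the end of the key.

    Assumes keys look like 'path:line_number'. rsplit on the last ':' keeps a URL
    such as 'hdfs://default/test.csv' intact, and int() sorts line 10 after line 2.
    """
    ordered = sorted(keyed_rows, key=lambda kr: int(kr[0].rsplit(':', 1)[1]))
    return [row for _, row in ordered]

rows = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]
keyed = [('hdfs://default/test.csv:%d' % (i + 1), r) for i, r in enumerate(rows)]
random.shuffle(keyed)  # simulate rows arriving from several reader threads
print(restore_order(keyed))
```

Sorting needs every row in memory at once, so for a really large matrix, writing each row directly into its indexed position scales better.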

We could also read the file in Matrix Market format. That would remove the constraint on the order.
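The order constraint disappears because every entry in the Matrix Market coordinate format carries its own 1-based row and column index. A minimal plain-Python parser for a `coordinate real general` file follows. It is a sketch that handles only that one variant (no `symmetric`, `pattern` or `array` files), and `read_matrix_market` is a hypothetical helper.

```python
def read_matrix_market(text):
    """Parse a 'coordinate real general' Matrix Market file into dense rows.

    Entry lines are 'row col value' with 1-based indices, so they may appear in any order.
    """
    lines = [ln.strip() for ln in text.splitlines()]
    # Drop the %%MatrixMarket banner, '%' comment lines and blank lines.
    data = [ln for ln in lines if ln and not ln.startswith('%')]
    n_rows, n_cols, n_entries = (int(x) for x in data[0].split())
    matrix = [[0.0] * n_cols for _ in range(n_rows)]
    for ln in data[1:1 + n_entries]:
        i, j, v = ln.split()
        matrix[int(i) - 1][int(j) - 1] = float(v)
    return matrix

MM_TEXT = """%%MatrixMarket matrix coordinate real general
% the question's 3x3 matrix, entries deliberately out of order
3 3 9
2 2 5
1 3 3
3 3 9
1 1 1
3 1 7
2 1 4
1 2 2
3 2 8
2 3 6
"""
print(read_matrix_market(MM_TEXT))
```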

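So my question is: how can I assemble the rows (or batches) into a matrix (or a batch of matrices) in a scalable way, i.e. when the matrix is really large, so that I can apply matrix multiplication such as: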

result = tf.matmul(Matrix, Matrix)
result = tf.batch_matmul(batched_Matrix, batched_Matrix)  # folded into tf.matmul in TensorFlow 1.0
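For the scalability part: row i of A·B depends only on row i of A and all of B. A large A can therefore be processed in row blocks, each block multiplied by B on its own, and the block results concatenated in order to give the full product. The plain-Python sketch below illustrates this and assumes B fits in memory. `blocked_row_matmul` and `block_size` are illustrative names. In TensorFlow, each block would presumably correspond to one `tf.matmul` on a batch of rows.

```python
def matmul(a, b):
    """Naive dense product of a (n x k) and b (k x m), both given as lists of rows."""
    return [[sum(a_row[k] * b[k][j] for k in range(len(b))) for j in range(len(b[0]))]
            for a_row in a]

def blocked_row_matmul(a, b, block_size):
    """Compute a @ b by multiplying consecutive row blocks of a by the full b."""
    result = []
    for start in range(0, len(a), block_size):
        block = a[start:start + block_size]  # one "batch" of rows of a
        result.extend(matmul(block, b))      # the matching rows of the product
    return result

m = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]
print(blocked_row_matmul(m, m, block_size=2))
```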

As an extension of the question: which solution is the fastest, especially for distributed execution?
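For distributed execution, the row-block idea generalizes to tiling both operands. Suppose A is split into p×q tiles and B into q×r tiles with compatible sizes. Then each tile of C = AB is a sum of tile products that different workers can compute independently:

```latex
C_{IJ} = \sum_{K=1}^{q} A_{IK}\, B_{KJ}, \qquad I = 1,\dots,p,\quad J = 1,\dots,r
```

Which tiling is fastest depends on how the matrix is stored and on communication costs between workers, so this is a starting point rather than a recommendation.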

Thanks for your help, Felix

Answer


After some research, I finally managed to implement a working prototype:

import tensorflow as tf
from tensorflow.python.framework import tensor_shape

def read_and_decode(filename_queue):
    reader = tf.TextLineReader()
    key, value = reader.read(filename_queue)

    # Type information and column names based on the decoded CSV.
    record_defaults = [[0.0], [0.0], [0.0]]
    f1, f2, f3 = tf.decode_csv(value, record_defaults=record_defaults)

    return [f1, f2, f3]

def cond(sequence_len, step):
    return tf.less(step, sequence_len)

def body(sequence_len, step, filename_queue):
    begin = tf.get_variable("begin", tensor_shape.TensorShape([3, 3]), dtype=tf.float32, initializer=tf.constant_initializer(0))
    # Write the next CSV line into row `step` of the variable.
    begin = tf.scatter_update(begin, step, read_and_decode(filename_queue), use_locking=None)
    tf.get_variable_scope().reuse_variables()

    with tf.control_dependencies([begin]):
        return (sequence_len, step + 1)

def get_all_records(FILE):
    with tf.Session() as sess:

        filename_queue = tf.train.string_input_producer([FILE], num_epochs=1, shuffle=False)

        b = lambda sl, st: body(sl, st, filename_queue)

        step = tf.constant(0)
        sequence_len = tf.constant(3)
        _, step = tf.while_loop(cond,
            b,
            [sequence_len, step],
            parallel_iterations=10,
            back_prop=True,
            swap_memory=False,
            name=None)

        # Fetches the "begin" variable created in body() (reuse was switched on there).
        begin = tf.get_variable("begin", tensor_shape.TensorShape([3, 3]), dtype=tf.float32)

        with tf.control_dependencies([step]):
            product = tf.matmul(begin, begin)

        init0 = tf.local_variables_initializer()
        sess.run(init0)
        init1 = tf.global_variables_initializer()
        sess.run(init1)

        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(coord=coord)
        try:
            print(sess.run([product]))
        except tf.errors.OutOfRangeError as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()

        coord.join(threads)

get_all_records('hdfs://default/data.csv')
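The prototype relies on `tf.while_loop(cond, body, loop_vars)`. Ignoring `parallel_iterations` and the graph machinery, its semantics correspond to the sequential plain-Python loop below: `body` is applied to the loop variables for as long as `cond` holds. In the sketch, a `rows` list stands in for the CSV reader and a list of lists stands in for the `begin` variable. Both are illustrative substitutes, not TensorFlow objects.

```python
def while_loop(cond, body, loop_vars):
    """Sequential plain-Python analogue of tf.while_loop semantics."""
    while cond(*loop_vars):
        loop_vars = body(*loop_vars)
    return loop_vars

rows = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]  # stands in for the CSV reader
begin = [[0.0] * 3 for _ in range(3)]                         # stands in for the "begin" variable

def cond(sequence_len, step):
    return step < sequence_len

def body(sequence_len, step):
    begin[step] = rows[step]  # analogue of tf.scatter_update(begin, step, row)
    return (sequence_len, step + 1)

_, step = while_loop(cond, body, (3, 0))
print(step, begin)
```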

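The idea comes from: How does the tf.scatter_update() work inside the while_loop()? I think a batched version can be implemented in a similar way. However, I would welcome any suggestions for making it faster.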
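A batched version along these lines could write each batch into a contiguous block of rows, using the batch's first row index as an offset. In TensorFlow terms this would presumably be `tf.scatter_update` with a vector of row indices and a `[batch, 3]` update. The sketch below stays in plain Python. The `(offset, rows)` batch format and `fill_from_batches` are hypothetical. Because each batch carries its offset, several readers can deliver batches in any order.

```python
def fill_from_batches(batches, num_rows, num_cols):
    """Write (offset, rows) batches into a preallocated num_rows x num_cols matrix.

    Each batch lands at row indices offset .. offset + len(rows) - 1.
    """
    matrix = [[0.0] * num_cols for _ in range(num_rows)]
    for offset, rows in batches:
        for i, row in enumerate(rows):
            matrix[offset + i] = list(row)
    return matrix

# Batches of size 2 (the last one smaller), delivered out of order.
batches = [(2, [[7.0, 8.0, 9.0]]), (0, [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])]
print(fill_from_batches(batches, num_rows=3, num_cols=3))
```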
