2017-03-16 80 views
0

我運行下面的數據流的代碼作爲JUnit測試類的一部分大文件生成使用谷歌雲數據流

@Test 
public void dataFlowGenerator() { 
    DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class); 
    options.setRunner(BlockingDataflowPipelineRunner.class); 
    options.setStagingLocation("gs://mybucket/lt"); 
    options.setProject("myProject"); 
    Pipeline p = Pipeline.create(options); 

    List<String> list = Arrays.asList("sup1", "sup2", "sup3"); 
    p.apply(Create.of(list)).apply(ParDo.of(new generate())).apply(
     TextIO.Write.to("gs://mybucket/lt/df.txt")); 
} 


private class generate extends DoFn<String, String> implements Serializable { 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
    new DoFn<String, String>() { 
     @Override 
     public void processElement(ProcessContext c) { 
     c.output(c.element()); 
     } 
    }; 
    } 
} 

這沒什麼輸出(只是一個空文件df.txt 00000-的-00001 )在雲存儲中提到的文件中。我期待processElement將被調用列表中的每個字符串項目,它們將被輸出到輸出文件。

如何用隨機字符串生成數百萬條記錄。在這種情況下,數據流並不需要任何輸入源。它應該能夠轉換一些種子字符串和輸出。

回答

1

兩件事情:

首先,你不需要做ParDo可言。您的變換隻是標識變換 - 您可以將Create的輸出正確輸入TextIO.Write

但我也想澄清一下,爲什麼您沒有看到goutput:您DoFn子類有一個創建另外一個DoFn方法processElement,只是丟棄值。你會想寫這個代替:

private class Generate extends DoFn<String, String> implements Serializable { 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
    c.output(c.element()); 
    } 
} 
+0

謝謝。創建需要的是內存中的字符串列表。有什麼辦法可以將它們寫入文件並提供它,因爲我需要將近10億行CSV生成 – SoulMan

+0

您是否可以通過關於如何生成數據的一些細節創建新問題? –

+0

完成http://stackoverflow.com/questions/43084252/create-large-csv-data-using-google-cloud-dataflow – SoulMan

相關問題