生成使用谷歌云数据流

问题描述:

我运行下面的数据流的代码作为JUnit测试类的一部分大文件生成使用谷歌云数据流

@Test 
public void dataFlowGenerator() { 
    DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class); 
    options.setRunner(BlockingDataflowPipelineRunner.class); 
    options.setStagingLocation("gs://mybucket/lt"); 
    options.setProject("myProject"); 
    Pipeline p = Pipeline.create(options); 

    List<String> list = Arrays.asList("sup1", "sup2", "sup3"); 
    p.apply(Create.of(list)).apply(ParDo.of(new generate())).apply(
     TextIO.Write.to("gs://mybucket/lt/df.txt")); 
} 


private class generate extends DoFn<String, String> implements Serializable { 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
    new DoFn<String, String>() { 
     @Override 
     public void processElement(ProcessContext c) { 
     c.output(c.element()); 
     } 
    }; 
    } 
} 

这没什么输出(只是一个空文件df.txt 00000-的-00001 )在云存储中提到的文件中。我期待processElement将被调用列表中的每个字符串项目,它们将被输出到输出文件。

如何用随机字符串生成数百万条记录。在这种情况下,数据流并不需要任何输入源。它应该能够转换一些种子字符串和输出。

两件事情:

首先,你不需要做ParDo可言。您的变换只是标识变换 - 您可以将Create的输出正确输入TextIO.Write

但我也想澄清一下,为什么您没有看到goutput:您DoFn子类有一个创建另外一个DoFn方法processElement,只是丢弃值。你会想写这个代替:

private class Generate extends DoFn<String, String> implements Serializable { 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
    c.output(c.element()); 
    } 
} 
+0

谢谢。创建需要的是内存中的字符串列表。有什么办法可以将它们写入文件并提供它,因为我需要将近10亿行CSV生成 – SoulMan

+0

您是否可以通过关于如何生成数据的一些细节创建新问题? –

+0

完成http://*.com/questions/43084252/create-large-csv-data-using-google-cloud-dataflow – SoulMan