Google Dataflow内部连接加入列表[]

问题描述:

我正在使用谷歌数据流CoGbkResult连接两个表作为内部连接。Google Dataflow内部连接加入列表[]

我能够成功加入表格。 我正在写输出到一个文本文件,并能够验证连接。但是,连接会将匹配结果放入列表中。

就是这样。

301%103%203%2017-09-20 07:49:46[2%google, 3%google, 1%microsoft] 
301%105%200%2017-09-17 11:48:59[2%google, 3%google, 1%microsoft] 

301%103%203%2017-09-20 07:49:46来自table_1。 2%google,3%google,1%microsoft与在table_2中加入的结果匹配。

以下是我processElement方法:

public void processElement(ProcessContext c) { 
    KV<String, CoGbkResult> e = c.element(); 
    String Ad_ID = e.getKey(); 
    Iterable<String> Ad_Info = null; 
    Ad_Info = e.getValue().getAll(AdInfoTag); 
    for (String ImpressionInfo : c.element().getValue().getAll(ImpressionInfoTag)) { 
    // Generate a string that combines information from both collection values 
    c.output(KV.of(Ad_ID, "%" + ImpressionInfo + Ad_Info)); 
    } 
} 

我不知道我怎样才能在单行输出。例如:

301%103%203%2017-09-20 07:49:46 2%google 
01%103%203%2017-09-20 07:49:46 3%google 
01%103%203%2017-09-20 07:49:46 1%microsoft 
301%105%200%2017-09-17 11:48:59 2%google 1%microsoft 
301%105%200%2017-09-17 11:48:59 3%google 
301%105%200%2017-09-17 11:48:59 1%microsoft 
+0

这并不完全清楚你想如何格式化输出。具体来说,在您的示例中有3个不同的行,前缀为“301%105%200%2017-09-17 11:48:59”,其中一行包含“2%谷歌”和“1%微软”在线上。那是故意的吗? –

+0

@Ben Chambers ...这是工作,当我做单独解析。问题是客户我切换到toString – KosiB

我设法通过解析器来解决这个问题。 GCP数据流还为此提供了一种方法吗?

int jointbegin = outputstring.indexOf(“[”); String firsthalf = outputstring.substring(0,jointbegin); String secondhalf = outputstring.substring(outputstring.indexOf(“[”)+ 1,outputstring.indexOf(“]”));

  if (!secondhalf.isEmpty()) 
      { 
       String[] ad_data = secondhalf.split(","); 

       for (int i = 0; i < ad_data.length; i++) 
       { 
        String final_string = firsthalf + ad_data[i]; 
        c.output(final_string); 
       } 
      } 
      } 
+0

在你的问题的DoFn中,你(隐式地)在可迭代的Ad_Info上调用toString(),现在你解析它以提取单个组件 - 为什么不只是迭代AdInfo原来的DoFn,其中已包含组件? – jkff

+0

@jkff。这正如你所说的那样工作。 – KosiB

+0

是的,本的上面的答案描述了如何以正确的方式做到这一点。 – jkff

我的理解(部分猜测)你想输出的是要输出在第一和第二迭代每个条目行,但我不知道为什么你不能只使用两个for循环,而不是将iterable转换为一个字符串,然后解析它。例如:

public void processElement(ProcessContext c) { 
    KV<String, CoGbkResult> e = c.element(); 
    String Ad_ID = e.getKey(); 
    Iterable<String> Ad_Infos = e.getValue().getAll(AdInfoTag); 
    for (String ImpressionInfo : c.element().getValue().getAll(ImpressionInfoTag)) { 
    for (String Ad_Info : Ad_Infos) { 
     c.output(KV.of(Ad_ID, "%" + ImpressionInfo + Ad_Info)); 
    } 
    } 
}