当从PubSub读取数据流作业并写入Google Cloud Storage时数据丢失
问题描述:
将固定数量的字符串(用于测试的800,000个1KB)放入PubSub主题并在以下版本中运行以下Apache Beam(2.1.0)作业数据流,正好一次保留语义,如预期。当从PubSub读取数据流作业并写入Google Cloud Storage时数据丢失
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
public class PubSubToGsSimpleJob {
public static void main(String[] args) {
PubSubToGsPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PubSubToGsPipelineOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.readStrings().fromSubscription(options.getInput()))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()));
p.run();
}
}
PipelineOptions
以下实施
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
public interface PubSubToGsPipelineOptions extends PipelineOptions {
@Description("PubSub subscription")
String getInput();
void setInput(String input);
@Description("Google Cloud Storage output path")
String getOutput();
void setOutput(String output);
}
但是,如果同样的工作运行时,所有的元素之前倒掉读取(如数据流控制台中显示),并再次拉开序幕,输出文件与发布到PubSub主题中的原始数据集相比,记录数少。这意味着排除和替换作业可能会导致数据丢失,这似乎很奇怪,因为this google cloud blog post提到Drain and replace
应该至少有一次语义。如何设计这条管道,以便在排干和更换作业时至少实现一次语义(或者更好但恰好一次语义)?
答
我的猜测是,一个窗口可能会部分写入之前,排水和替换工作用其余的窗口覆盖它。您可以检查这个日志行in WriteFiles的排水作业和替换作业中的工人日志。如果您使用Beam HEAD,它还会在最终目的地被覆盖时进行记录。
从概念上消耗的作业和替换作业是完全不同的管道。使用相同的输出位置与其他两个不相关的作业使用相同的输出位置没有区别。您可以尝试的另一件事是为第二个作业使用不同的输出路径,并验证所有记录都存在于两个目录中。
这就是说,可能是TextIO可以/应该更好地处理这种情况(如果上面的猜测是正确的)。请确认更改输出路径是否保留所有记录。这将确认Dataflow的保证。 –
我找不到匹配日志模式的行。配置替换作业以写入单独的输出至少产生一次语义。有一些重复的记录意味着一次只使用PubSub作为源的语义不被维护。 该博客文章还提到'如果您的自定义源担保恰好一次交付,并提供源侧的缓冲,排水和更换可以提供恰好一次semantics.'因为PubSub的消息可以用一个ACK响应之前进行缓冲,这是否建议PubSub只能提供一次语义? – JonSim
感谢您确认工作。我将为TextIO提交一个错误。我很惊讶你看到重复。在这种情况下,PubSub应该提供完全一次的语义。如果你可以提供更多信息(和job_id id,你可以),这将是非常有用的。我当然有兴趣挖掘更多的重复来源。 –