阿帕奇梁 - 无法推断在DOFN编码器与多输出标签
问题描述:
我尝试使用Apache束执行管线,但试图把一些输出标签时,我得到一个错误:阿帕奇梁 - 无法推断在DOFN编码器与多输出标签
import com.google.cloud.Tuple;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.stream.Collectors;
/**
* The Transformer.
*/
class Transformer {
final static TupleTag<Map<String, String>> successfulTransformation = new TupleTag<>();
final static TupleTag<Tuple<String, String>> failedTransformation = new TupleTag<>();
/**
* The entry point of the application.
*
* @param args the input arguments
*/
public static void main(String... args) {
TransformerOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(TransformerOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("Input", PubsubIO
.readMessagesWithAttributes()
.withIdAttribute("id")
.fromTopic(options.getTopicName()))
.apply(Window.<PubsubMessage>into(FixedWindows
.of(Duration.standardSeconds(60))))
.apply("Transform",
ParDo.of(new JsonTransformer())
.withOutputTags(successfulTransformation,
TupleTagList.of(failedTransformation)));
p.run().waitUntilFinish();
}
/**
* Deserialize the input and convert it to a key-value pairs map.
*/
static class JsonTransformer extends DoFn<PubsubMessage, Map<String, String>> {
/**
* Process each element.
*
* @param c the processing context
*/
@ProcessElement
public void processElement(ProcessContext c) {
String messagePayload = new String(c.element().getPayload());
try {
Type type = new TypeToken<Map<String, String>>() {
}.getType();
Gson gson = new Gson();
Map<String, String> map = gson.fromJson(messagePayload, type);
c.output(map);
} catch (Exception e) {
LOG.error("Failed to process input {} -- adding to dead letter file", c.element(), e);
String attributes = c.element()
.getAttributeMap()
.entrySet().stream().map((entry) ->
String.format("%s -> %s\n", entry.getKey(), entry.getValue()))
.collect(Collectors.joining());
c.output(failedTransformation, Tuple.of(attributes, messagePayload));
}
}
}
}
错误所示出的是:
异常在线程 “主要” java.lang.IllegalStateException:无法 换取Transform.out1 [PCollection]默认编码器。更正以下根本原因之一 :未手动指定编码器; 你可以使用.setCoder()。推测 CoderRegistry中的编码器失败:无法为V提供编码器。使用已注册的CoderProvider创建 编码器失败。有关详细故障,请参阅抑制 例外。使用来自 的默认输出编码器,产生PTransform失败:无法为V提供编码器。 使用注册的编码器提供程序构建编码器失败。
我试过不同的方法来解决这个问题,但我想我只是不明白是什么问题。我知道,这些线路会发生错误:
.withOutputTags(successfulTransformation,TupleTagList.of(failedTransformation))
,但我不明白的是哪一部分,在误差哪些部分需要特定的编码器,什么是“V”(从“无法提供编码器V“)。
为什么会发生错误?我也试着看看Apache Beam的文档,但他们似乎没有解释这种用法,也不了解关于编码器的部分。
感谢
答
首先,我建议如下 - 变化:
final static TupleTag<Map<String, String>> successfulTransformation =
new TupleTag<>();
final static TupleTag<Tuple<String, String>> failedTransformation =
new TupleTag<>();
到这一点:
final static TupleTag<Map<String, String>> successfulTransformation =
new TupleTag<Map<String, String>>() {};
final static TupleTag<Tuple<String, String>> failedTransformation =
new TupleTag<Tuple<String, String>>() {};
这将有助于该编码器推断确定侧输出的类型。另外,你有没有正确注册CoderProvider
为Tuple
?
在当前阶段,更改似乎没有做任何事情。不,我没有为Tuple注册一个CoderProvider,因为我不清楚我应该这样做的方式,您是否可以提供更多信息? –
我用自己制作的类替换了'Tuple',它扩展了Serializable,你的答案解决了我的问题。为什么?我所做的和你所建议的有什么不同?你也猜猜Google Cloud的'Tuple'为什么不扩展'Serializable'并且不能被继承吗? –
我对Google Cloud的元组不熟悉。你能链接到它来自哪里?一般来说,使用Serializable将比使用自定义编码器效率更低,或者因为Java序列化而导致类似Avro的效率。请参阅[注册编码器]的文档(https://beam.apache.org/documentation/programming-guide/#data-encoding-and-type-safety)。 –