如何在ASP.NET Core应用程序中持续收听Pub/Sub消息?
我想实现一个ASP.NET核心API,它不响应HTTP请求,但在启动时开始监听Google Cloud Pub/Sub消息,并且在其整个生命周期中一直保持无限期地监听。如何在ASP.NET Core应用程序中持续收听Pub/Sub消息?
使用官方Pub/Sub SDK实现此功能的首选方式是什么?
我能想到的方法有两种:
方法1:只需使用一个SimpleSubscriber
,并在Startup.Configure
开始听消息:
public void Configure(IApplicationBuilder app)
{
var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName);
var receivedMessages = new List<PubsubMessage>();
simpleSubscriber.StartAsync((msg, cancellationToken) =>
{
// Process the message here.
return Task.FromResult(SimpleSubscriber.Reply.Ack);
});
...
}
方法2:专门使用一个库创建用于定期运行作业,例如Quartz,Hangfire或FluentScheduler,并且每次触发作业时,都会使用SubscriberClient
来提取新消息。
哪一个是首选方法?第一个似乎更简单,但我不确定它是否真的可靠。
第一种方法肯定是如何打算使用它。
但是,看到该文档为StartAsync
:
开始接收邮件。当
StopAsync(CancellationToken)
被调用时返回的Task
完成,或者发生不可恢复的 故障。此方法不能被称为不止一次每SimpleSubscriber
实例。
所以你确实需要处理意外StartAsync
关闭不可恢复的错误。最简单的做法是使用外部循环,但考虑到这些错误被认为是不可恢复的,这可能与调用需要在成功之前进行更改有关。
的代码可能是这样的:
while (true)
{
// Each SimpleSubscriber instance must only be used once.
var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName);
try
{
await simpleSubscriber.StartAsync((msg, cancellationToken) =>
{
// Process the message here.
return Task.FromResult(SimpleSubscriber.Reply.Ack);
});
}
catch (Exception e)
{
// Handle the unrecoverable error somehow...
}
}
如果如预期这不起作用,请let us know。
感谢您的信息!由于不可恢复的错误,你的意思是像堆栈溢出或内存不足,所以它不会因为暂时的网络故障或类似的事情而停止,对吗?顺便说一句。如何实现监听,内部是SimpleSubscriber轮询还是Pub/Sub支持类似WebSocket连接? –
@MarkVincze SimpleSubscriber源代码位于:https://github.com/GoogleCloudPlatform/google-cloud-dotnet/blob/master/apis/Google.Cloud.PubSub。V1/Google.Cloud.PubSub.V1/SimpleSubscriber.cs,这样您就可以自己查看;)它使用长时间运行的流式RPC调用,该调用在第613行开始。 – Chris
@MarkVincze“不可恢复”意味着不可恢复的RPC错误。 *可恢复* RPC错误列表位于以下位置:https://github.com/GoogleCloudPlatform/google-cloud-dotnet/blob/master/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/Extensions .cs#L61所有其他都是不可恢复的。 – Chris
@Flater这是一个在Kubernetes中运行的ASP.NET Core应用程序。我希望该应用具有一些REST端点,**和**不断监听一些发布/订阅消息。 (我知道我可以将这两件事分成两个部分,但如果可能的话,为了方便起见,我想保留它。) –
我会从最简单的方法开始,然后根据需要移动到图书馆。在你的示例中,我只添加了用于在'静态字段中的某处保留'simpleSubscriber'的代码,以保护对象免受GC – Shrike
的影响。就我*知道*而言,第一种方法应该没问题 - 但我正在与同事谁知道更多。 –