如何在ASP.NET Core应用程序中持续收听Pub/Sub消息?

问题描述:

我想实现一个ASP.NET核心API,它不响应HTTP请求,但在启动时开始监听Google Cloud Pub/Sub消息,并且在其整个生命周期中一直保持无限期地监听。如何在ASP.NET Core应用程序中持续收听Pub/Sub消息?

使用官方Pub/Sub SDK实现此功能的首选方式是什么?

我能想到的方法有两种:

方法1:只需使用一个SimpleSubscriber,并在Startup.Configure开始听消息:

public void Configure(IApplicationBuilder app) 
{ 
    var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName); 
    var receivedMessages = new List<PubsubMessage>(); 

    simpleSubscriber.StartAsync((msg, cancellationToken) => 
    { 
     // Process the message here. 

     return Task.FromResult(SimpleSubscriber.Reply.Ack); 
    }); 

    ... 
} 

方法2:专门使用一个库创建用于定期运行作业,例如Quartz,Hangfire或FluentScheduler,并且每次触发作业时,都会使用SubscriberClient来提取新消息。

哪一个是首选方法?第一个似乎更简单,但我不确定它是否真的可靠。

+0

@Flater这是一个在Kubernetes中运行的ASP.NET Core应用程序。我希望该应用具有一些REST端点,**和**不断监听一些发布/订阅消息。 (我知道我可以将这两件事分成两个部分,但如果可能的话,为了方便起见,我想保留它。) –

+1

我会从最简单的方法开始,然后根据需要移动到图书馆。在你的示例中,我只添加了用于在'静态字段中的某处保留'simpleSubscriber'的代码,以保护对象免受GC – Shrike

+1

的影响。就我*知道*而言,第一种方法应该没问题 - 但我正在与同事谁知道更多。 –

第一种方法肯定是如何打算使用它。

但是,看到该文档为StartAsync

开始接收邮件。当 StopAsync(CancellationToken)被调用时返回的Task完成,或者发生不可恢复的 故障。此方法不能被称为不止一次SimpleSubscriber实例。

所以你确实需要处理意外StartAsync关闭不可恢复的错误。最简单的做法是使用外部循环,但考虑到这些错误被认为是不可恢复的,这可能与调用需要在成功之前进行更改有关。

的代码可能是这样的:

while (true) 
{ 
    // Each SimpleSubscriber instance must only be used once. 
    var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName); 
    try 
    { 
     await simpleSubscriber.StartAsync((msg, cancellationToken) => 
     { 
      // Process the message here. 
      return Task.FromResult(SimpleSubscriber.Reply.Ack); 
     }); 
    } 
    catch (Exception e) 
    { 
     // Handle the unrecoverable error somehow... 
    } 
} 

如果如预期这不起作用,请let us know

+0

感谢您的信息!由于不可恢复的错误,你的意思是像堆栈溢出或内存不足,所以它不会因为暂时的网络故障或类似的事情而停止,对吗?顺便说一句。如何实现监听,内部是SimpleSubscriber轮询还是Pub/Sub支持类似WebSocket连接? –

+1

@MarkVincze SimpleSubscriber源代码位于:https://github.com/GoogleCloudPlatform/google-cloud-dotnet/blob/master/apis/Google.Cloud.PubSub。V1/Google.Cloud.PubSub.V1/SimpleSubscriber.cs,这样您就可以自己查看;)它使用长时间运行的流式RPC调用,该调用在第613行开始。 – Chris

+1

@MarkVincze“不可恢复”意味着不可恢复的RPC错误。 *可恢复* RPC错误列表位于以下位置:https://github.com/GoogleCloudPlatform/google-cloud-dotnet/blob/master/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/Extensions .cs#L61所有其他都是不可恢复的。 – Chris