如何为自定义消息处理器实现自定义SamplingService?在此之后检索元素并在执行序列之前

问题描述:

我很新WSO2 ESB我有一个实现具有此特定行为的自定义消息处理器:执行操作后,从消息存储中检索元素,并在执行与此消息处理器相关的序列之前如何为自定义消息处理器实现自定义SamplingService?在此之后检索元素并在执行序列之前

我试着详细解释它。

这是我的ESB消息处理器定义:

<?xml version="1.0" encoding="UTF-8"?> 
<!---<messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">--> 
<messageProcessor class="com.mycompany.toolkit.messageprocessor.SamplingProcessorHeaderRateLimitation" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse"> 
    <parameter name="sequence">transferProcessorSequence</parameter> 
    <parameter name="interval">1000</parameter> 
    <parameter name="is.active">true</parameter> 
    <parameter name="concurrency">1</parameter> 
</messageProcessor> 

它检索某些元素(XML文档)形成transferFromMessageStore(队列),并且该对象传递到使用transferProcessorSequence.xml序列它。正如您在此时看到的,我已经实现了一个自定义消息处理器SamplingProcessorHeaderRateLimit,它简单地扩展了WSO2类的org.apache.synapse.message.processor.impl.sampler.SamplingProcessor。此时它仅在执行init()方法时显示日志。我将它部署在我的Carbon服务器上,并且工作正常。

Here你可以找到整个项目代码。

好,但是从我所了解,以获得所需的行为我没有简单地延长SamplingProcessor类,因为为了做到每封邮件的消费和调度的序列之间的自定义实现,需要扩展类SamplingService类,这个one

我认为我需要重写的execute()取(的MessageConsumer msgConsumer)

此时应该还可以插入日志,每次从消息存储中检索元素并在此之前执行与消息处理器相关的序列时,都会写入日志文件。

可能吗?

所以我的主要主杜省是:

1)有我创建扩展SamplingService类在同一个项目中,我实现我的自定义消息处理器(此行为的类必须使用仅用于我的WSO2 ESB项目中的特定消息处理器,此项目中使用的所有其他消息处理器必须使用标准SamplingService实现)。

2)另一个疑问是关于如何将此自定义SamplingService实现传递给我的自定义消息处理器。进入SamplingProcessor WSO2类(如何将特定的自定义消息处理器实现与处理其生命周期的自定义SamplingService实现关联)。

+0

2问题为什么不重写com.mycompany.toolkit.messageprocessor.SamplingProcessorHeaderRateLimitation中的getTask方法(它保护了modifer),并返回自己的类来扩展SamplingService? – simar

+0

@simar你什么意思?在这个时候,我已经覆盖了getTask()方法,只是简单地把它放在log \ println消息中(仅用于测试puropose)。问题是我从来没有获得这个输出到我的日志文件(相反,我获得init()方法日志)。所以在我看来,这个getTask()方法从来没有执行的原因。你有一些关于它的想法吗?也许是受保护的修饰符? – AndreaNobili

+0

调用log.info()仅在您正确配置为在文件中可见的日志中可见。调用System.out.println只在控制台中可见。你是否尝试在控制台模式下运行并在getTask方法中查找println消息?您是否尝试在调试模式下运行wsoesb并尝试调试以查看发生了什么问题 – simar

1) Have I to create a class extending the SamplingService class into the same project in which I am implementing my custom message processor (this behavior have to be used only for this specific message processor in my WSO2 ESB project, all the other message processor used in this project have to use the standard SamplingService implementation). 

您的自定义SamplingProcessorHeaderRateLimitation只会消耗进去,到transferFromMessageStore消息,并会注入其消耗和处理才序列transferProcessorSequence消息。此消息处理器不会处理所有其他路径。

2) Another doubt is related about how this custom SamplingService implementation is passed to my custom message processor. Into the the SamplingProcessor WSO2 class (how to associate a specific custom message processor implementation with a custom SamplingService implementation handling its lifecycle). 

如果你看看你的源代码来实现SamplingProcessorHeaderRateLimitation.getTask()你有你的自定义SamplingProcessorHeaderRateLimitation

@Override 
protected Task getTask() { 
    logger.info("getTask() START"); 
    System.out.println("getTask() START"); 
    logger.info("getTask() END"); 
    System.out.println("getTask() END"); 
    return (Task) new SamplingService2(this, synapseEnvironment, CONCURRENCY, SEQUENCE, isProcessorStartAsDeactivated()); 
} 
+0

我的疑问是:正如你所看到的,当getTask()方法启动并且返回时,我会放入一些logger \ println,但它会返回,但这些日志不会出现在日志文件中将日志放入存在于我的日志文件中的init()方法中)。所以在我看来,由于某种原因,getTask()方法从不执行(它只是一个假设),它仍然使用标准的SamplingService实现。你对此有进一步的想法吗?谢谢 – AndreaNobili

+0

在这里有什么问题(从github上次检查源)。 SamplingService2不推动任务接口,因此它不能转换为任务。它假设抛出ClassCastException – simar

你在XML配置任务捆绑您的自定义SamplingService2

<task name="CheckPrice" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask"> 
    <trigger interval="5000"/> 
</task> 

<task name="CheckPrice" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask"> 
    <trigger interval="5000" count="10"/> 
</task> 

Q1:

有我创建一个类扩展SamplingService类到 同一个项目中,我实现我的自定义消息处理器 (这种行为只能用于我的WSO2 ESB项目中的这个特定消息 处理器,al l在本项目中使用的其他消息处理器 必须使用标准SamplingService 实现)。

<messageProcessor class="com.mycompany.toolkit.messageprocessor.SamplingProcessorHeaderRateLimitation" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse"> 
<parameter name="sequence">transferProcessorSequence</parameter> 

定制处理器和服务将当它被指定为在处理器类,如上面在实施例仅使用。

Q2:

另一个疑问是这个自订SamplingService 实现是如何传递给我的自定义消息处理器相关。进入 SamplingProcessor WSO2类(如何将特定定制 消息处理器实现与处理其生命周期的自定义SamplingService 实现相关联)。

可以直接扩展/实施ScheduledMessageProcessor而非SamplingProcessor,因为它仅实现getTask()和初始化在init()方法视图对象。 另外您的SamplingService2类应扩展Task ManagedLifecycle接口。