Camel使用场景
Camel使用场景
Apache Camel是一个基于Enterprise Integration Pattern(企业整合模式,简称EIP)的开源框架。EIP定义了一些不同应用系统之间的消息传输模型,包括常见的Point-to-Point,Pub/Sub模型。
Apache Camel主要提供了以下功能:
1,实现了EIP的大部分模式,如果你要在不同的应用系统之间以不同的方式传递消息,那么你可以从Apache Camel中找到解决方案。
2,提供了大量Component(组件),每个组件都是一种消息中间件(或消息服务)的具体实现,每个消息中间件所用的协议都是不同的,因此,你可以通过多种不同的协议来完成消息传输。
3,允许用户定义灵活的路由规则,从这个角度来说,Apache Camel时一个规则引擎。
那么Apache Camel的应用场景有那些呢,这里列举一些:
1,消息汇聚,比如你有来自不同服务器的消息,有ActiveMQ,RabbitMQ,WebService等,你想把它们都存储到日志文件中,那么可以定义如下规则。
new RouteBuilder() {
@Override
public void configure() throws Exception {
from("amqp:queue:incoming").to("log:com.mycompany.log?level=DEBUG");
from("rabbitmq://localhost/A/routingKey=B").to("log:com.mycompany.log?level=DEBUG");
from("jetty:http://localhost:8080/myapp/myservice").to("log:com.mycompany.log?level=DEBUG");
}
}
from表示从这个endpoing取消息,to表示将消息发往这个endpoint,endpoint是消息地址,包含协议类型以及url。
2,消息分发,分为两种,顺序分发和并行分发。顺序分发时,消息会先到到第一个endpoing,第一个endpoint处理完成后,再分发到下下个endpoint。如果第一个endpoing处理出现故障,那么消息不会被传到第二个endpoint。比如有如下规则:
from("amqp:queue:order").to("uri:validateBean", "uri:handleBean", "uri:emailBean");
这个规则是从order队列中取订单信息,然后依次验证订单,处理订单,并发送邮件通知用户。任何一个步骤出错,下一个步骤将不回执行。
并行分发是将得到的消息同时发送到不同的endpoint,没有先后顺序之分,各个endpoint处理消息也是独立的。如果将以上路由改成
from("amqp:queue:order").multicast().to("uri:validateBean", "uri:handleBean", "uri:emailBean");
那么消息就会同时发到to所对应的endpoint。
3,消息转换,比如想将xml数据转换成json数据,可以使用如下规则。
from("amqp:queue:order").process(new XmlToJsonProcessor()).to("bean:orderHandler");
其中XmlToJsonProcessor是自定义的类,继承org.apache.camel.Processor,用于将xml数据转换成json。
4,规则引擎,你可以使用Spring Xml, Groovy这类DSL来定义route,这样无需修改代码,就能达到修改业务逻辑的目的。
例如上边的规则,from("amqp:queue:order").to("uri:validateBean", "uri:handleBean", "uri:emailBean");使用Spring Xml定义如下:
<route>
<from uri="amqp:queue:order"/>
<multicast>
<to uri="uri:validateBean"/>
<to uri="uri:handleBean"/>
<to uri="uri:emailBean"/>
</multicast>
</route>
如果需要在处理完订单后添加日志,可以改称如下规则:
<route>
<from uri="amqp:queue:order"/>
<multicast>
<to uri="uri:validateBean"/>
<to uri="uri:handleBean"/>
<to uri="log:com.mycompany.log?level=INFO"/>
<to uri="uri:emailBean"/>
</multicast>
</route>
另外camel提供了大量的内置Processor,用于逻辑运算,过滤等,例如:
from("amqp:queue:order").filter(header("foo").isEqualTo("bar")).choice()
.when(xpath("/person/city = 'London'"))
.to("file:target/messages/uk")
.otherwise()
.to("file:target/messages/others");
这条规则先对订单进行过滤,只处理header中foo的值为bar的订单,然后根据用户的城市进行将订单传给不同的endpoint。
Camel 和rabbitmq 集成处理
camel可以看作是java中的一个编程范式,通过DSL语言来编写路由规则。
camel 核心
camel是一个基于规则路由和处理的引擎,提供企业集成模式的Java对象的实现,通过应用程序接口或称为陈述式的Java领域特定语言(DSL)来配置路由和处理的规则。
其核心的思想就是从一个from源头得到数据,通过processor处理,再发到一个to目的的.发散:像是网络中超强的路由器规则,是否可以看做是软件定义的交换机?
这里的from和to可以是我们在项目集成中经常碰到的类型:一个FTP文件夹中的文件,一个MQ的queue,一个HTTP request/response,一个webservice等等。
现在的问题是 什么是java领域特定语言(DSL)?
Camel uses a Java Domain Specific Language or DSL for creating Enterprise Integration Patterns or Routes in a variety of domain-specific languages (DSL) as listed below.
Java DSL - A Java based DSL using the fluent builder style.
Spring XML - A XML based DSL in Spring XML files
Blueprint XML - A XML based DSL in OSGi Blueprint XML files
Rest DSL - A DSL to define REST services using a REST style in either Java or XML.
Groovy DSL - A Groovy based DSL using Groovy programming language
Scala DSL - A Scala based DSL using Scala programming language
Annotation DSL - Use annotations in Java beans.
Kotlin DSL - Work in progress - Currently developed outside ASF, but will we included later in Camel when Kotlin and the DSL is ready.
The main entry points for the DSL are
CamelContext for creating a Camel routing rulebase
RouteBuilder for creating a collection of routes using the routing DSL—http://camel.apache.org/dsl.html
也就是说camel可以使用多种方式配置,包括Java xml等。
这里主要关注java DSL,java DSL的语法主要是什么呢?
RouteBuilder类就是Java的smart DSL。
public abstract class RouteBuilder
extends BuilderSupport
implements RoutesBuilder
A Java DSL which is used to build DefaultRoute instances in a CamelContext for smart routing.
主要的方法有:
public abstract void configure()
throws Exception
Called on initialization to build the routes using the fluent builder syntax.
This is a central method for RouteBuilder implementations to implement the routes using the Java fluent builder syntax.
configure()就是主要的实现路由语法的函数。
主要的路由配置的方法我整理成了一张表格:
语法 | 作用 | 语法 |
---|---|---|
from | 路由的起点 | from(String uri),返回the builder |
to | 路由的终点 | 参数String uri,返回the builder |
choice(when,otherwise) | 选择某个内容 | 以choice开头,when表示某个条件 |
fromF/toF | 使用String formatting创建URI | 和from/to类似 |
filter | 选择某个内容 | 过滤想要的内容 |
process | 对内容进行一定的处理 | 需要编写继承自Processor接口的类,实现process方法,完成对Exchanage(交换内容)处理的逻辑 |
Processer接口用于处理消息转换或者消息交换。
来自网络的一个简单例子:
这里实现了一个简单的路由,这里的起点是DefaultCamelConetxt
,那么DefaultCamelConetxt这个类是怎样的?
public class DefaultCamelContext
extends ServiceSupport
implements ModelCamelContext, SuspendableService
Represents the context used to configure routes and the policies to use.
也就是说DefaultCamelContext
类代表了配置camel
路由的上下文,和一些必要的策略,DefaultCamelContext
继承自Service
,可以使用start,stop等基本的管理生命周期的方法。
让camel引擎开始工作使用start方法,对应的停止工作的方法是stop。使用addRoutes方法给camelContext添加对应的路由。
下图表示了一个继承自Processor类的基本的实现,
主要的逻辑是对Exchange进行操作,来达到想要的目的。上图的逻辑是将a文件所有的行连成一行,以空格分隔组成一个新的文件给to对应的终点uri。
前面讲了一个简单的这里,到这个时候,还需要补充一些基本的概念。
camel一些定义
补充的概念有Uris,EndPoint,Exchange,Message等
关于uris的官方解释: Camel makes extensive use of URIs to allow you to refer to endpoints which are lazily created by a Component if you refer to them within Routes.
意思是,uris的作用就是用来指向组件的endpoint的一种表示方法。
那么endpoint呢?
官方解释: public interface Endpoint extends IsSingleton, Service
An endpoint implements the Message Endpoint pattern and represents an endpoint that can send and receive message exchanges
endpoint就是一个可以收发消息的组件,组件component就是一个一个的camel组件的术语代表。
那么,message呢?
public interface Exchange An Exchange is the message container holdingthe information during the entire routing of a Message received by a Consumer.
public interface Message Implements the Message pattern and represents an inbound or outbound message as part of an Exchange.
Exchange是message容器,message有属性inbound和outbound,是Exchange的一部分。比如上面的处理器使用exchange的getIn()方法可以得到message本身。
目前我写的一系列文章都是讲的clojure,比如语法和数据处理,同样我本文的目的依旧是讲camel在clojure中的应用,讲的更具体一点的话,其实是谈论camel在clojure 微服务中的使用方法。
Apache Camel是一个基于Enterprise Integration Pattern(企业整合模式,简称EIP)的开源框架。EIP定义了一些不同应用系统之间的消息传输模型,包括常见的Point-to-Point,Pub/Sub模型。
Apache Camel主要提供了以下功能:
1,实现了EIP的大部分模式,如果你要在不同的应用系统之间以不同的方式传递消息,那么你可以从Apache Camel中找到解决方案。
2,提供了大量Component(组件),每个组件都是一种消息中间件(或消息服务)的具体实现,每个消息中间件所用的协议都是不同的,因此,你可以通过多种不同的协议来完成消息传输。
3,允许用户定义灵活的路由规则,从这个角度来说,Apache Camel时一个规则引擎。
那么Apache Camel的应用场景有那些呢,这里列举一些:
1,消息汇聚,比如你有来自不同服务器的消息,有ActiveMQ,RabbitMQ,WebService等,你想把它们都存储到日志文件中,那么可以定义如下规则。
new RouteBuilder() {
@Override
public void configure() throws Exception {
from("amqp:queue:incoming").to("log:com.mycompany.log?level=DEBUG");
from("rabbitmq://localhost/A/routingKey=B").to("log:com.mycompany.log?level=DEBUG");
from("jetty:http://localhost:8080/myapp/myservice").to("log:com.mycompany.log?level=DEBUG");
}
}
from表示从这个endpoing取消息,to表示将消息发往这个endpoint,endpoint是消息地址,包含协议类型以及url。
2,消息分发,分为两种,顺序分发和并行分发。顺序分发时,消息会先到到第一个endpoing,第一个endpoint处理完成后,再分发到下下个endpoint。如果第一个endpoing处理出现故障,那么消息不会被传到第二个endpoint。比如有如下规则:
from("amqp:queue:order").to("uri:validateBean", "uri:handleBean", "uri:emailBean");
这个规则是从order队列中取订单信息,然后依次验证订单,处理订单,并发送邮件通知用户。任何一个步骤出错,下一个步骤将不回执行。
并行分发是将得到的消息同时发送到不同的endpoint,没有先后顺序之分,各个endpoint处理消息也是独立的。如果将以上路由改成
from("amqp:queue:order").multicast().to("uri:validateBean", "uri:handleBean", "uri:emailBean");
那么消息就会同时发到to所对应的endpoint。
3,消息转换,比如想将xml数据转换成json数据,可以使用如下规则。
from("amqp:queue:order").process(new XmlToJsonProcessor()).to("bean:orderHandler");
其中XmlToJsonProcessor是自定义的类,继承org.apache.camel.Processor,用于将xml数据转换成json。
4,规则引擎,你可以使用Spring Xml, Groovy这类DSL来定义route,这样无需修改代码,就能达到修改业务逻辑的目的。
例如上边的规则,from("amqp:queue:order").to("uri:validateBean", "uri:handleBean", "uri:emailBean");使用Spring Xml定义如下:
<route>
<from uri="amqp:queue:order"/>
<multicast>
<to uri="uri:validateBean"/>
<to uri="uri:handleBean"/>
<to uri="uri:emailBean"/>
</multicast>
</route>
如果需要在处理完订单后添加日志,可以改称如下规则:
<route>
<from uri="amqp:queue:order"/>
<multicast>
<to uri="uri:validateBean"/>
<to uri="uri:handleBean"/>
<to uri="log:com.mycompany.log?level=INFO"/>
<to uri="uri:emailBean"/>
</multicast>
</route>
另外camel提供了大量的内置Processor,用于逻辑运算,过滤等,例如:
from("amqp:queue:order").filter(header("foo").isEqualTo("bar")).choice()
.when(xpath("/person/city = 'London'"))
.to("file:target/messages/uk")
.otherwise()
.to("file:target/messages/others");
这条规则先对订单进行过滤,只处理header中foo的值为bar的订单,然后根据用户的城市进行将订单传给不同的endpoint。
Camel 和rabbitmq 集成处理
camel可以看作是java中的一个编程范式,通过DSL语言来编写路由规则。
camel 核心
camel是一个基于规则路由和处理的引擎,提供企业集成模式的Java对象的实现,通过应用程序接口或称为陈述式的Java领域特定语言(DSL)来配置路由和处理的规则。
其核心的思想就是从一个from源头得到数据,通过processor处理,再发到一个to目的的.发散:像是网络中超强的路由器规则,是否可以看做是软件定义的交换机?
这里的from和to可以是我们在项目集成中经常碰到的类型:一个FTP文件夹中的文件,一个MQ的queue,一个HTTP request/response,一个webservice等等。
现在的问题是 什么是java领域特定语言(DSL)?
Camel uses a Java Domain Specific Language or DSL for creating Enterprise Integration Patterns or Routes in a variety of domain-specific languages (DSL) as listed below.
Java DSL - A Java based DSL using the fluent builder style.
Spring XML - A XML based DSL in Spring XML files
Blueprint XML - A XML based DSL in OSGi Blueprint XML files
Rest DSL - A DSL to define REST services using a REST style in either Java or XML.
Groovy DSL - A Groovy based DSL using Groovy programming language
Scala DSL - A Scala based DSL using Scala programming language
Annotation DSL - Use annotations in Java beans.
Kotlin DSL - Work in progress - Currently developed outside ASF, but will we included later in Camel when Kotlin and the DSL is ready.
The main entry points for the DSL are
CamelContext for creating a Camel routing rulebase
RouteBuilder for creating a collection of routes using the routing DSL—http://camel.apache.org/dsl.html
也就是说camel可以使用多种方式配置,包括Java xml等。
这里主要关注java DSL,java DSL的语法主要是什么呢?
RouteBuilder类就是Java的smart DSL。
public abstract class RouteBuilder
extends BuilderSupport
implements RoutesBuilder
A Java DSL which is used to build DefaultRoute instances in a CamelContext for smart routing.
主要的方法有:
public abstract void configure()
throws Exception
Called on initialization to build the routes using the fluent builder syntax.
This is a central method for RouteBuilder implementations to implement the routes using the Java fluent builder syntax.
configure()就是主要的实现路由语法的函数。
主要的路由配置的方法我整理成了一张表格:
语法 | 作用 | 语法 |
---|---|---|
from | 路由的起点 | from(String uri),返回the builder |
to | 路由的终点 | 参数String uri,返回the builder |
choice(when,otherwise) | 选择某个内容 | 以choice开头,when表示某个条件 |
fromF/toF | 使用String formatting创建URI | 和from/to类似 |
filter | 选择某个内容 | 过滤想要的内容 |
process | 对内容进行一定的处理 | 需要编写继承自Processor接口的类,实现process方法,完成对Exchanage(交换内容)处理的逻辑 |
Processer接口用于处理消息转换或者消息交换。
来自网络的一个简单例子:
这里实现了一个简单的路由,这里的起点是DefaultCamelConetxt
,那么DefaultCamelConetxt这个类是怎样的?
public class DefaultCamelContext
extends ServiceSupport
implements ModelCamelContext, SuspendableService
Represents the context used to configure routes and the policies to use.
也就是说DefaultCamelContext
类代表了配置camel
路由的上下文,和一些必要的策略,DefaultCamelContext
继承自Service
,可以使用start,stop等基本的管理生命周期的方法。
让camel引擎开始工作使用start方法,对应的停止工作的方法是stop。使用addRoutes方法给camelContext添加对应的路由。
下图表示了一个继承自Processor类的基本的实现,
主要的逻辑是对Exchange进行操作,来达到想要的目的。上图的逻辑是将a文件所有的行连成一行,以空格分隔组成一个新的文件给to对应的终点uri。
前面讲了一个简单的这里,到这个时候,还需要补充一些基本的概念。
camel一些定义
补充的概念有Uris,EndPoint,Exchange,Message等
关于uris的官方解释: Camel makes extensive use of URIs to allow you to refer to endpoints which are lazily created by a Component if you refer to them within Routes.
意思是,uris的作用就是用来指向组件的endpoint的一种表示方法。
那么endpoint呢?
官方解释: public interface Endpoint extends IsSingleton, Service
An endpoint implements the Message Endpoint pattern and represents an endpoint that can send and receive message exchanges
endpoint就是一个可以收发消息的组件,组件component就是一个一个的camel组件的术语代表。
那么,message呢?
public interface Exchange An Exchange is the message container holdingthe information during the entire routing of a Message received by a Consumer.
public interface Message Implements the Message pattern and represents an inbound or outbound message as part of an Exchange.
Exchange是message容器,message有属性inbound和outbound,是Exchange的一部分。比如上面的处理器使用exchange的getIn()方法可以得到message本身。
目前我写的一系列文章都是讲的clojure,比如语法和数据处理,同样我本文的目的依旧是讲camel在clojure中的应用,讲的更具体一点的话,其实是谈论camel在clojure 微服务中的使用方法。