Camel使用场景

Camel使用场景

Apache Camel是一个基于Enterprise Integration Pattern(企业整合模式,简称EIP)的开源框架。EIP定义了一些不同应用系统之间的消息传输模型,包括常见的Point-to-Point,Pub/Sub模型。

 

Apache Camel主要提供了以下功能:

1,实现了EIP的大部分模式,如果你要在不同的应用系统之间以不同的方式传递消息,那么你可以从Apache Camel中找到解决方案。

2,提供了大量Component(组件),每个组件都是一种消息中间件(或消息服务)的具体实现,每个消息中间件所用的协议都是不同的,因此,你可以通过多种不同的协议来完成消息传输。

3,允许用户定义灵活的路由规则,从这个角度来说,Apache Camel时一个规则引擎。

 

 

那么Apache Camel的应用场景有那些呢,这里列举一些:

 

1,消息汇聚,比如你有来自不同服务器的消息,有ActiveMQ,RabbitMQ,WebService等,你想把它们都存储到日志文件中,那么可以定义如下规则。

new RouteBuilder() {

    @Override

    public void configure() throws Exception {

        from("amqp:queue:incoming").to("log:com.mycompany.log?level=DEBUG");

        from("rabbitmq://localhost/A/routingKey=B").to("log:com.mycompany.log?level=DEBUG");

        from("jetty:http://localhost:8080/myapp/myservice").to("log:com.mycompany.log?level=DEBUG");

    }

}

from表示从这个endpoing取消息,to表示将消息发往这个endpoint,endpoint是消息地址,包含协议类型以及url。

 

2,消息分发,分为两种,顺序分发和并行分发。顺序分发时,消息会先到到第一个endpoing,第一个endpoint处理完成后,再分发到下下个endpoint。如果第一个endpoing处理出现故障,那么消息不会被传到第二个endpoint。比如有如下规则:

from("amqp:queue:order").to("uri:validateBean", "uri:handleBean", "uri:emailBean");

这个规则是从order队列中取订单信息,然后依次验证订单,处理订单,并发送邮件通知用户。任何一个步骤出错,下一个步骤将不回执行。

 

并行分发是将得到的消息同时发送到不同的endpoint,没有先后顺序之分,各个endpoint处理消息也是独立的。如果将以上路由改成

from("amqp:queue:order").multicast().to("uri:validateBean", "uri:handleBean", "uri:emailBean");

那么消息就会同时发到to所对应的endpoint

 

3,消息转换,比如想将xml数据转换成json数据,可以使用如下规则。

from("amqp:queue:order").process(new XmlToJsonProcessor()).to("bean:orderHandler");

其中XmlToJsonProcessor是自定义的类,继承org.apache.camel.Processor,用于将xml数据转换成json。

 

4,规则引擎,你可以使用Spring Xml, Groovy这类DSL来定义route,这样无需修改代码,就能达到修改业务逻辑的目的。

例如上边的规则,from("amqp:queue:order").to("uri:validateBean", "uri:handleBean", "uri:emailBean");使用Spring Xml定义如下:

 

<route>

        <from uri="amqp:queue:order"/>

        <multicast>

            <to uri="uri:validateBean"/>

            <to uri="uri:handleBean"/>

            <to uri="uri:emailBean"/>

        </multicast>

</route>

 

如果需要在处理完订单后添加日志,可以改称如下规则:

 

<route>
        <from uri="amqp:queue:order"/>
        <multicast>
            <to uri="uri:validateBean"/>
            <to uri="uri:handleBean"/>
            <to uri="log:com.mycompany.log?level=INFO"/>
            <to uri="uri:emailBean"/>
        </multicast>
</route>

 

 

另外camel提供了大量的内置Processor,用于逻辑运算,过滤等,例如:

 

from("amqp:queue:order").filter(header("foo").isEqualTo("bar")).choice()
    .when(xpath("/person/city = 'London'"))
        .to("file:target/messages/uk")
    .otherwise()
        .to("file:target/messages/others");

 

这条规则先对订单进行过滤,只处理header中foo的值为bar的订单,然后根据用户的城市进行将订单传给不同的endpoint。


Camel 和rabbitmq 集成处理

camel可以看作是java中的一个编程范式,通过DSL语言来编写路由规则。

camel 核心

camel是一个基于规则路由和处理的引擎,提供企业集成模式的Java对象的实现,通过应用程序接口或称为陈述式的Java领域特定语言(DSL)来配置路由和处理的规则。
其核心的思想就是从一个from源头得到数据,通过processor处理,再发到一个to目的的.

发散:像是网络中超强的路由器规则,是否可以看做是软件定义的交换机

这里的from和to可以是我们在项目集成中经常碰到的类型:一个FTP文件夹中的文件,一个MQ的queue,一个HTTP request/response,一个webservice等等。

现在的问题是 什么是java领域特定语言(DSL)?

Camel uses a Java Domain Specific Language or DSL for creating Enterprise Integration Patterns or Routes in a variety of domain-specific languages (DSL) as listed below.
Java DSL - A Java based DSL using the fluent builder style.
Spring XML - A XML based DSL in Spring XML files
Blueprint XML - A XML based DSL in OSGi Blueprint XML files
Rest DSL - A DSL to define REST services using a REST style in either Java or XML.
Groovy DSL - A Groovy based DSL using Groovy programming language
Scala DSL - A Scala based DSL using Scala programming language
Annotation DSL - Use annotations in Java beans.
Kotlin DSL - Work in progress - Currently developed outside ASF, but will we included later in Camel when Kotlin and the DSL is ready.
The main entry points for the DSL are
CamelContext for creating a Camel routing rulebase
RouteBuilder for creating a collection of routes using the routing DSL—http://camel.apache.org/dsl.html
也就是说camel可以使用多种方式配置,包括Java xml等。

这里主要关注java DSL,java DSL的语法主要是什么呢?
RouteBuilder类就是Java的smart DSL。

public abstract class RouteBuilder
extends BuilderSupport
implements RoutesBuilder
A Java DSL which is used to build DefaultRoute instances in a CamelContext for smart routing.

主要的方法有:

public abstract void configure()
throws Exception
Called on initialization to build the routes using the fluent builder syntax.
This is a central method for RouteBuilder implementations to implement the routes using the Java fluent builder syntax.

configure()就是主要的实现路由语法的函数。
主要的路由配置的方法我整理成了一张表格:

语法 作用 语法
from 路由的起点 from(String uri),返回the builder
to 路由的终点 参数String uri,返回the builder
choice(when,otherwise) 选择某个内容 以choice开头,when表示某个条件
fromF/toF 使用String formatting创建URI 和from/to类似
filter 选择某个内容 过滤想要的内容
process 对内容进行一定的处理 需要编写继承自Processor接口的类,实现process方法,完成对Exchanage(交换内容)处理的逻辑

Processer接口用于处理消息转换或者消息交换。

来自网络的一个简单例子:
Camel使用场景
这里实现了一个简单的路由,这里的起点是DefaultCamelConetxt,那么DefaultCamelConetxt这个类是怎样的?

public class DefaultCamelContext
extends ServiceSupport
implements ModelCamelContext, SuspendableService
Represents the context used to configure routes and the policies to use.

也就是说DefaultCamelContext类代表了配置camel路由的上下文,和一些必要的策略,DefaultCamelContext继承自Service,可以使用start,stop等基本的管理生命周期的方法。
让camel引擎开始工作使用start方法,对应的停止工作的方法是stop。使用addRoutes方法给camelContext添加对应的路由。
下图表示了一个继承自Processor类的基本的实现,
Camel使用场景
主要的逻辑是对Exchange进行操作,来达到想要的目的。上图的逻辑是将a文件所有的行连成一行,以空格分隔组成一个新的文件给to对应的终点uri。
前面讲了一个简单的这里,到这个时候,还需要补充一些基本的概念。

camel一些定义

补充的概念有Uris,EndPoint,Exchange,Message等

关于uris的官方解释: Camel makes extensive use of URIs to allow you to refer to endpoints which are lazily created by a Component if you refer to them within Routes.

意思是,uris的作用就是用来指向组件的endpoint的一种表示方法。

那么endpoint呢?

官方解释: public interface Endpoint extends IsSingleton, Service
An endpoint implements the Message Endpoint pattern and represents an endpoint that can send and receive message exchanges

endpoint就是一个可以收发消息的组件,组件component就是一个一个的camel组件的术语代表。

那么,message呢?

public interface Exchange An Exchange is the message container holdingthe information during the entire routing of a Message received by a Consumer.

public interface Message Implements the Message pattern and represents an inbound or outbound message as part of an Exchange.

Exchange是message容器,message有属性inbound和outbound,是Exchange的一部分。比如上面的处理器使用exchange的getIn()方法可以得到message本身。

目前我写的一系列文章都是讲的clojure,比如语法和数据处理,同样我本文的目的依旧是讲camel在clojure中的应用,讲的更具体一点的话,其实是谈论camel在clojure 微服务中的使用方法。


Apache Camel是一个基于Enterprise Integration Pattern(企业整合模式,简称EIP)的开源框架。EIP定义了一些不同应用系统之间的消息传输模型,包括常见的Point-to-Point,Pub/Sub模型。

 

Apache Camel主要提供了以下功能:

1,实现了EIP的大部分模式,如果你要在不同的应用系统之间以不同的方式传递消息,那么你可以从Apache Camel中找到解决方案。

2,提供了大量Component(组件),每个组件都是一种消息中间件(或消息服务)的具体实现,每个消息中间件所用的协议都是不同的,因此,你可以通过多种不同的协议来完成消息传输。

3,允许用户定义灵活的路由规则,从这个角度来说,Apache Camel时一个规则引擎。

 

 

那么Apache Camel的应用场景有那些呢,这里列举一些:

 

1,消息汇聚,比如你有来自不同服务器的消息,有ActiveMQ,RabbitMQ,WebService等,你想把它们都存储到日志文件中,那么可以定义如下规则。

new RouteBuilder() {

    @Override

    public void configure() throws Exception {

        from("amqp:queue:incoming").to("log:com.mycompany.log?level=DEBUG");

        from("rabbitmq://localhost/A/routingKey=B").to("log:com.mycompany.log?level=DEBUG");

        from("jetty:http://localhost:8080/myapp/myservice").to("log:com.mycompany.log?level=DEBUG");

    }

}

from表示从这个endpoing取消息,to表示将消息发往这个endpoint,endpoint是消息地址,包含协议类型以及url。

 

2,消息分发,分为两种,顺序分发和并行分发。顺序分发时,消息会先到到第一个endpoing,第一个endpoint处理完成后,再分发到下下个endpoint。如果第一个endpoing处理出现故障,那么消息不会被传到第二个endpoint。比如有如下规则:

from("amqp:queue:order").to("uri:validateBean", "uri:handleBean", "uri:emailBean");

这个规则是从order队列中取订单信息,然后依次验证订单,处理订单,并发送邮件通知用户。任何一个步骤出错,下一个步骤将不回执行。

 

并行分发是将得到的消息同时发送到不同的endpoint,没有先后顺序之分,各个endpoint处理消息也是独立的。如果将以上路由改成

from("amqp:queue:order").multicast().to("uri:validateBean", "uri:handleBean", "uri:emailBean");

那么消息就会同时发到to所对应的endpoint

 

3,消息转换,比如想将xml数据转换成json数据,可以使用如下规则。

from("amqp:queue:order").process(new XmlToJsonProcessor()).to("bean:orderHandler");

其中XmlToJsonProcessor是自定义的类,继承org.apache.camel.Processor,用于将xml数据转换成json。

 

4,规则引擎,你可以使用Spring Xml, Groovy这类DSL来定义route,这样无需修改代码,就能达到修改业务逻辑的目的。

例如上边的规则,from("amqp:queue:order").to("uri:validateBean", "uri:handleBean", "uri:emailBean");使用Spring Xml定义如下:

 

<route>

        <from uri="amqp:queue:order"/>

        <multicast>

            <to uri="uri:validateBean"/>

            <to uri="uri:handleBean"/>

            <to uri="uri:emailBean"/>

        </multicast>

</route>

 

如果需要在处理完订单后添加日志,可以改称如下规则:

 

<route>
        <from uri="amqp:queue:order"/>
        <multicast>
            <to uri="uri:validateBean"/>
            <to uri="uri:handleBean"/>
            <to uri="log:com.mycompany.log?level=INFO"/>
            <to uri="uri:emailBean"/>
        </multicast>
</route>

 

 

另外camel提供了大量的内置Processor,用于逻辑运算,过滤等,例如:

 

from("amqp:queue:order").filter(header("foo").isEqualTo("bar")).choice()
    .when(xpath("/person/city = 'London'"))
        .to("file:target/messages/uk")
    .otherwise()
        .to("file:target/messages/others");

 

这条规则先对订单进行过滤,只处理header中foo的值为bar的订单,然后根据用户的城市进行将订单传给不同的endpoint。


Camel 和rabbitmq 集成处理

camel可以看作是java中的一个编程范式,通过DSL语言来编写路由规则。

camel 核心

camel是一个基于规则路由和处理的引擎,提供企业集成模式的Java对象的实现,通过应用程序接口或称为陈述式的Java领域特定语言(DSL)来配置路由和处理的规则。
其核心的思想就是从一个from源头得到数据,通过processor处理,再发到一个to目的的.

发散:像是网络中超强的路由器规则,是否可以看做是软件定义的交换机

这里的from和to可以是我们在项目集成中经常碰到的类型:一个FTP文件夹中的文件,一个MQ的queue,一个HTTP request/response,一个webservice等等。

现在的问题是 什么是java领域特定语言(DSL)?

Camel uses a Java Domain Specific Language or DSL for creating Enterprise Integration Patterns or Routes in a variety of domain-specific languages (DSL) as listed below.
Java DSL - A Java based DSL using the fluent builder style.
Spring XML - A XML based DSL in Spring XML files
Blueprint XML - A XML based DSL in OSGi Blueprint XML files
Rest DSL - A DSL to define REST services using a REST style in either Java or XML.
Groovy DSL - A Groovy based DSL using Groovy programming language
Scala DSL - A Scala based DSL using Scala programming language
Annotation DSL - Use annotations in Java beans.
Kotlin DSL - Work in progress - Currently developed outside ASF, but will we included later in Camel when Kotlin and the DSL is ready.
The main entry points for the DSL are
CamelContext for creating a Camel routing rulebase
RouteBuilder for creating a collection of routes using the routing DSL—http://camel.apache.org/dsl.html
也就是说camel可以使用多种方式配置,包括Java xml等。

这里主要关注java DSL,java DSL的语法主要是什么呢?
RouteBuilder类就是Java的smart DSL。

public abstract class RouteBuilder
extends BuilderSupport
implements RoutesBuilder
A Java DSL which is used to build DefaultRoute instances in a CamelContext for smart routing.

主要的方法有:

public abstract void configure()
throws Exception
Called on initialization to build the routes using the fluent builder syntax.
This is a central method for RouteBuilder implementations to implement the routes using the Java fluent builder syntax.

configure()就是主要的实现路由语法的函数。
主要的路由配置的方法我整理成了一张表格:

语法 作用 语法
from 路由的起点 from(String uri),返回the builder
to 路由的终点 参数String uri,返回the builder
choice(when,otherwise) 选择某个内容 以choice开头,when表示某个条件
fromF/toF 使用String formatting创建URI 和from/to类似
filter 选择某个内容 过滤想要的内容
process 对内容进行一定的处理 需要编写继承自Processor接口的类,实现process方法,完成对Exchanage(交换内容)处理的逻辑

Processer接口用于处理消息转换或者消息交换。

来自网络的一个简单例子:
Camel使用场景
这里实现了一个简单的路由,这里的起点是DefaultCamelConetxt,那么DefaultCamelConetxt这个类是怎样的?

public class DefaultCamelContext
extends ServiceSupport
implements ModelCamelContext, SuspendableService
Represents the context used to configure routes and the policies to use.

也就是说DefaultCamelContext类代表了配置camel路由的上下文,和一些必要的策略,DefaultCamelContext继承自Service,可以使用start,stop等基本的管理生命周期的方法。
让camel引擎开始工作使用start方法,对应的停止工作的方法是stop。使用addRoutes方法给camelContext添加对应的路由。
下图表示了一个继承自Processor类的基本的实现,
Camel使用场景
主要的逻辑是对Exchange进行操作,来达到想要的目的。上图的逻辑是将a文件所有的行连成一行,以空格分隔组成一个新的文件给to对应的终点uri。
前面讲了一个简单的这里,到这个时候,还需要补充一些基本的概念。

camel一些定义

补充的概念有Uris,EndPoint,Exchange,Message等

关于uris的官方解释: Camel makes extensive use of URIs to allow you to refer to endpoints which are lazily created by a Component if you refer to them within Routes.

意思是,uris的作用就是用来指向组件的endpoint的一种表示方法。

那么endpoint呢?

官方解释: public interface Endpoint extends IsSingleton, Service
An endpoint implements the Message Endpoint pattern and represents an endpoint that can send and receive message exchanges

endpoint就是一个可以收发消息的组件,组件component就是一个一个的camel组件的术语代表。

那么,message呢?

public interface Exchange An Exchange is the message container holdingthe information during the entire routing of a Message received by a Consumer.

public interface Message Implements the Message pattern and represents an inbound or outbound message as part of an Exchange.

Exchange是message容器,message有属性inbound和outbound,是Exchange的一部分。比如上面的处理器使用exchange的getIn()方法可以得到message本身。

目前我写的一系列文章都是讲的clojure,比如语法和数据处理,同样我本文的目的依旧是讲camel在clojure中的应用,讲的更具体一点的话,其实是谈论camel在clojure 微服务中的使用方法。