disruptor demo(三) 复杂一点的例子



disruptor demo(三) 复杂一点的例子

从图中可以看出需求是这样的:生产者生产的数据先经过C1、C2处理,处理完成后再到C3。

假设如下场景:

1、交易网关收到交易(P1)把交易数据发到RingBuffer中,

2、负责处理增值业务的消费者C1和负责数据存储的消费者C2负责处理交易

3、负责发送JMS消息的消费者C3在C1和C2处理完成后再进行处理。



1.交易数据类(POJO)

[java] view plain copy
  1. package p1;  
  /**
   * POJO describing a single trade: an identifier and a price.
   *
   * <p>Instances are mutable; both fields can be set after construction.
   *
   * <p>NOTE(review): the price is held as a {@code double}; for real monetary
   * amounts {@code BigDecimal} would be safer, but the type is kept so the
   * accessors stay compatible with existing callers.
   */
  public class TradeTransaction {
      /** Trade ID. */
      private String id;
      /** Trade amount. */
      private double price;

      /** Creates an empty trade; {@code id} is {@code null} and {@code price} is 0. */
      public TradeTransaction() {
      }

      /**
       * Creates a fully populated trade.
       *
       * @param id    trade ID
       * @param price trade amount
       */
      public TradeTransaction(String id, double price) {
          this.id = id;
          this.price = price;
      }

      /** @return the trade ID, or {@code null} if none has been assigned */
      public String getId() {
          return id;
      }

      /** @param id the trade ID to assign */
      public void setId(String id) {
          this.id = id;
      }

      /** @return the trade amount */
      public double getPrice() {
          return price;
      }

      /** @param price the trade amount to assign */
      public void setPrice(double price) {
          this.price = price;
      }
  }


2.事件处理器(消费者)

[java] view plain copy
  1. package p1;  
  2. import java.util.UUID;  
  3.   
  4.   
  5. import com.lmax.disruptor.EventHandler;  
  6. import com.lmax.disruptor.WorkHandler;  
  7.   
  8.   
  9. public class TradeTransactionInDBHandler implements EventHandler<TradeTransaction>{  
  10.   
  11.     @Override  
  12.     public void onEvent(TradeTransaction event, long sequence, boolean endOfBatch) throws Exception {  
  13.         this.onEvent(event);  
  14.     }  
  15.   
  16.     public void onEvent(TradeTransaction event) throws Exception {  
  17.         event.setId(UUID.randomUUID().toString());  
  18.         System.out.println(event.getId());  
  19.           
  20.     }  
  21.   
  22. }  
[java] view plain copy
  1. package p1;  
  2.   
  3. import com.lmax.disruptor.EventHandler;  
  4.   
  5. public class TradeTransactionVasConsumer implements EventHandler<TradeTransaction> {  
  6.   
  7.     @Override  
  8.     public void onEvent(TradeTransaction event, long sequence,  
  9.             boolean endOfBatch) throws Exception {  
  10.         //do something....  
  11.     }  
  12.       
  13. }  


[java] view plain copy
  1. package p1;  
  2.   
  3. import com.lmax.disruptor.EventHandler;  
  4.   
  5. public class TradeTransactionJMSNotifyHandler implements EventHandler<TradeTransaction>{  
  6.   
  7.     @Override  
  8.     public void onEvent(TradeTransaction event, long sequence, boolean endOfBatch) throws Exception {  
  9.           
  10.     }  
  11.   
  12. }  



3.生产者

[java] view plain copy
  1. package p1;  
  2.   
  3. import java.util.Random;  
  4.   
  5. import com.lmax.disruptor.EventTranslator;  
  6.   
  7. public class TradeTransactionEventTranslator implements EventTranslator<TradeTransaction>{  
  8.   
  9.     private Random random=new Random();  
  10.     @Override  
  11.     public void translateTo(TradeTransaction event, long sequence) {  
  12.         this.generateTradeTransaction(event);  
  13.     }  
  14.     private TradeTransaction generateTradeTransaction(TradeTransaction trade){  
  15.         trade.setPrice(random.nextDouble()*9999);  
  16.         return trade;  
  17.     }  
  18. }  

[java] view plain copy
  1. package p1;  
  2.   
  3. import java.util.concurrent.CountDownLatch;  
  4.   
  5. import com.lmax.disruptor.dsl.Disruptor;  
  6.   
  7. public class TradeTransactionPublisher implements Runnable{  
  8.     Disruptor<TradeTransaction> disruptor;  
  9.     private CountDownLatch latch;  
  10.     private static int LOOP=10000000;//模拟一千万次交易的发生  
  11.       
  12.     public TradeTransactionPublisher(CountDownLatch latch,Disruptor<TradeTransaction> disruptor){  
  13.             this.disruptor = disruptor;  
  14.             this.latch = latch;  
  15.     }  
  16.     @Override  
  17.     public void run() {  
  18.         TradeTransactionEventTranslator tradeTransloator=new TradeTransactionEventTranslator();  
  19.         for(int i=0;i<LOOP;i++){  
  20.             disruptor.publishEvent(tradeTransloator);  
  21.         }  
  22.         latch.countDown();  
  23.     }  
  24. }  


4.demo类

[java] view plain copy
  1. package p1;  
  2.   
  3. import java.util.concurrent.CountDownLatch;  
  4. import java.util.concurrent.ExecutorService;  
  5. import java.util.concurrent.Executors;  
  6.   
  7. import com.lmax.disruptor.BusySpinWaitStrategy;  
  8. import com.lmax.disruptor.EventFactory;  
  9. import com.lmax.disruptor.dsl.Disruptor;  
  10. import com.lmax.disruptor.dsl.EventHandlerGroup;  
  11. import com.lmax.disruptor.dsl.ProducerType;  
  12.   
  13. public class Demo3 {  
  14.     public static void main(String[] args) throws Exception{  
  15.         long beginTime = System.currentTimeMillis();  
  16.           
  17.         int bufferSize = 1024;  
  18.           
  19.         ExecutorService executor=Executors.newFixedThreadPool(4);  
  20.           
  21.         Disruptor<TradeTransaction> disruptor=new Disruptor<TradeTransaction>(new EventFactory<TradeTransaction>() {  
  22.             @Override  
  23.             public TradeTransaction newInstance() {  
  24.                 return new TradeTransaction();  
  25.             }  
  26.         }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
  27.           
  28.         EventHandlerGroup<TradeTransaction> handlerGroup=disruptor.handleEventsWith(new TradeTransactionVasConsumer(),new TradeTransactionInDBHandler());  
  29.       
  30.         TradeTransactionJMSNotifyHandler jmsConsumer=new TradeTransactionJMSNotifyHandler();  
  31.         //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3  
  32.         handlerGroup.then(jmsConsumer);  
  33.           
  34.         disruptor.start();  
  35.         CountDownLatch latch = new CountDownLatch(1);  
  36.         executor.submit(new TradeTransactionPublisher(latch, disruptor));  
  37.         latch.await();//等待生产者完事.  
  38.         disruptor.shutdown();  
  39.         executor.shutdown();  
  40.           
  41.         System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
  42.     }  


disruptor demo(三) 复杂一点的例子

从图中可以看出需求是这样的:生产者生产的数据先经过C1、C2处理,处理完成后再到C3。

假设如下场景:

1、交易网关收到交易(P1)把交易数据发到RingBuffer中,

2、负责处理增值业务的消费者C1和负责数据存储的消费者C2负责处理交易

3、负责发送JMS消息的消费者C3在C1和C2处理完成后再进行处理。



1.交易数据类(POJO)

[java] view plain copy
  1. package p1;  
  /**
   * POJO describing a single trade: an identifier and a price.
   *
   * <p>Instances are mutable; both fields can be set after construction.
   *
   * <p>NOTE(review): the price is held as a {@code double}; for real monetary
   * amounts {@code BigDecimal} would be safer, but the type is kept so the
   * accessors stay compatible with existing callers.
   */
  public class TradeTransaction {
      /** Trade ID. */
      private String id;
      /** Trade amount. */
      private double price;

      /** Creates an empty trade; {@code id} is {@code null} and {@code price} is 0. */
      public TradeTransaction() {
      }

      /**
       * Creates a fully populated trade.
       *
       * @param id    trade ID
       * @param price trade amount
       */
      public TradeTransaction(String id, double price) {
          this.id = id;
          this.price = price;
      }

      /** @return the trade ID, or {@code null} if none has been assigned */
      public String getId() {
          return id;
      }

      /** @param id the trade ID to assign */
      public void setId(String id) {
          this.id = id;
      }

      /** @return the trade amount */
      public double getPrice() {
          return price;
      }

      /** @param price the trade amount to assign */
      public void setPrice(double price) {
          this.price = price;
      }
  }


2.事件处理器(消费者)

[java] view plain copy
  1. package p1;  
  2. import java.util.UUID;  
  3.   
  4.   
  5. import com.lmax.disruptor.EventHandler;  
  6. import com.lmax.disruptor.WorkHandler;  
  7.   
  8.   
  9. public class TradeTransactionInDBHandler implements EventHandler<TradeTransaction>{  
  10.   
  11.     @Override  
  12.     public void onEvent(TradeTransaction event, long sequence, boolean endOfBatch) throws Exception {  
  13.         this.onEvent(event);  
  14.     }  
  15.   
  16.     public void onEvent(TradeTransaction event) throws Exception {  
  17.         event.setId(UUID.randomUUID().toString());  
  18.         System.out.println(event.getId());  
  19.           
  20.     }  
  21.   
  22. }  
[java] view plain copy
  1. package p1;  
  2.   
  3. import com.lmax.disruptor.EventHandler;  
  4.   
  5. public class TradeTransactionVasConsumer implements EventHandler<TradeTransaction> {  
  6.   
  7.     @Override  
  8.     public void onEvent(TradeTransaction event, long sequence,  
  9.             boolean endOfBatch) throws Exception {  
  10.         //do something....  
  11.     }  
  12.       
  13. }  


[java] view plain copy
  1. package p1;  
  2.   
  3. import com.lmax.disruptor.EventHandler;  
  4.   
  5. public class TradeTransactionJMSNotifyHandler implements EventHandler<TradeTransaction>{  
  6.   
  7.     @Override  
  8.     public void onEvent(TradeTransaction event, long sequence, boolean endOfBatch) throws Exception {  
  9.           
  10.     }  
  11.   
  12. }  



3.生产者

[java] view plain copy
  1. package p1;  
  2.   
  3. import java.util.Random;  
  4.   
  5. import com.lmax.disruptor.EventTranslator;  
  6.   
  7. public class TradeTransactionEventTranslator implements EventTranslator<TradeTransaction>{  
  8.   
  9.     private Random random=new Random();  
  10.     @Override  
  11.     public void translateTo(TradeTransaction event, long sequence) {  
  12.         this.generateTradeTransaction(event);  
  13.     }  
  14.     private TradeTransaction generateTradeTransaction(TradeTransaction trade){  
  15.         trade.setPrice(random.nextDouble()*9999);  
  16.         return trade;  
  17.     }  
  18. }  

[java] view plain copy
  1. package p1;  
  2.   
  3. import java.util.concurrent.CountDownLatch;  
  4.   
  5. import com.lmax.disruptor.dsl.Disruptor;  
  6.   
  7. public class TradeTransactionPublisher implements Runnable{  
  8.     Disruptor<TradeTransaction> disruptor;  
  9.     private CountDownLatch latch;  
  10.     private static int LOOP=10000000;//模拟一千万次交易的发生  
  11.       
  12.     public TradeTransactionPublisher(CountDownLatch latch,Disruptor<TradeTransaction> disruptor){  
  13.             this.disruptor = disruptor;  
  14.             this.latch = latch;  
  15.     }  
  16.     @Override  
  17.     public void run() {  
  18.         TradeTransactionEventTranslator tradeTransloator=new TradeTransactionEventTranslator();  
  19.         for(int i=0;i<LOOP;i++){  
  20.             disruptor.publishEvent(tradeTransloator);  
  21.         }  
  22.         latch.countDown();  
  23.     }  
  24. }  


4.demo类

[java] view plain copy
  1. package p1;  
  2.   
  3. import java.util.concurrent.CountDownLatch;  
  4. import java.util.concurrent.ExecutorService;  
  5. import java.util.concurrent.Executors;  
  6.   
  7. import com.lmax.disruptor.BusySpinWaitStrategy;  
  8. import com.lmax.disruptor.EventFactory;  
  9. import com.lmax.disruptor.dsl.Disruptor;  
  10. import com.lmax.disruptor.dsl.EventHandlerGroup;  
  11. import com.lmax.disruptor.dsl.ProducerType;  
  12.   
  13. public class Demo3 {  
  14.     public static void main(String[] args) throws Exception{  
  15.         long beginTime = System.currentTimeMillis();  
  16.           
  17.         int bufferSize = 1024;  
  18.           
  19.         ExecutorService executor=Executors.newFixedThreadPool(4);  
  20.           
  21.         Disruptor<TradeTransaction> disruptor=new Disruptor<TradeTransaction>(new EventFactory<TradeTransaction>() {  
  22.             @Override  
  23.             public TradeTransaction newInstance() {  
  24.                 return new TradeTransaction();  
  25.             }  
  26.         }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
  27.           
  28.         EventHandlerGroup<TradeTransaction> handlerGroup=disruptor.handleEventsWith(new TradeTransactionVasConsumer(),new TradeTransactionInDBHandler());  
  29.       
  30.         TradeTransactionJMSNotifyHandler jmsConsumer=new TradeTransactionJMSNotifyHandler();  
  31.         //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3  
  32.         handlerGroup.then(jmsConsumer);  
  33.           
  34.         disruptor.start();  
  35.         CountDownLatch latch = new CountDownLatch(1);  
  36.         executor.submit(new TradeTransactionPublisher(latch, disruptor));  
  37.         latch.await();//等待生产者完事.  
  38.         disruptor.shutdown();  
  39.         executor.shutdown();  
  40.           
  41.         System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
  42.     }