Disruptor启动过程源码分析(一)

首先准备一个可以跑起来的demo

OrderEvent.java
public class OrderEvent {

	private long value; 

	public long getValue() {
		return value;
	}

	public void setValue(long value) {
		this.value = value;
	}
	
}
OrderEventFactory.java
public class OrderEventFactory implements EventFactory<OrderEvent> {

    public OrderEvent newInstance() {
        //这个方法就是为了返回空的数据对象(Event)
        return new OrderEvent();
    }

}
OrderEventHandler.java
public class OrderEventHandler implements EventHandler<OrderEvent>{

	@Override
	public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
//		Thread.sleep(Integer.MAX_VALUE);
		System.err.println("消费者: " + event.getValue());
	}

}
OrderEventProducer.java
public class OrderEventProducer {

    private RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void sendData(ByteBuffer data) {
        //1 在生产者发送消息的时候, 首先 需要从我们的ringBuffer里面 获取一个可用的序号
        long sequence = ringBuffer.next();    //0	
        try {
            //2 根据这个序号, 找到具体的 "OrderEvent" 元素 注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
            OrderEvent event = ringBuffer.get(sequence);
            //3 进行实际的赋值处理
            event.setValue(data.getLong(0));
        } finally {
            //4 提交发布操作
            ringBuffer.publish(sequence);
        }
    }
}

Main.java

public class Main {

	
	
	public static void main(String[] args) {
		
		
		// 参数准备工作
		OrderEventFactory orderEventFactory = new OrderEventFactory();
		int ringBufferSize = 1024*1024;
		ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
		
		/**
		 * 1 eventFactory: 消息(event)工厂对象
		 * 2 ringBufferSize: 容器的长度
		 * 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
		 * 4 ProducerType: 单生产者 还是 多生产者
		 * 5 waitStrategy: 等待策略
		 */
		//1. 实例化disruptor对象
		Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
				ringBufferSize,
				executor,
				ProducerType.SINGLE,
				new BlockingWaitStrategy());
		
		//2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
		disruptor.handleEventsWith(new OrderEventHandler());
		
		//3. 启动disruptor
		disruptor.start();
		
		//4. 获取实际存储数据的容器: RingBuffer
		RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
		
		OrderEventProducer producer = new OrderEventProducer(ringBuffer);
		
		ByteBuffer bb = ByteBuffer.allocate(8);
		
		for(long i = 0 ; i < 5; i ++){
			bb.putLong(0, i);
			producer.sendData(bb);
		}
		
		disruptor.shutdown();
		executor.shutdown();
		
	}
}

 

本例子中,disruptor的实例化在Main.java中

Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
				ringBufferSize,
				executor,
				ProducerType.SINGLE,
				new BlockingWaitStrategy());

点进源码看:

/**
     * Create a new Disruptor.
     *
     * @param eventFactory   消息工厂对象
     * @param ringBufferSize RingBuffer容器长度
     * @param executor       线程池
     * @param producerType   生产者类型:单生产者or多
     * @param waitStrategy   等待策略
     */
    public Disruptor(final EventFactory<T> eventFactory,
                     final int ringBufferSize,
                     final Executor executor,
                     final ProducerType producerType,
                     final WaitStrategy waitStrategy)
    {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
             executor);
    }

继续点开this,发现进入了一个构造函数

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
    {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }

可以看到,传入参数中的productType,eventFactory,ringBufferSize,waitStrategy是用于实例化一个RingBuffer,RingBuffer是通过create来实例化一个RingBuffer对象,让我们点开Ringbuffer的create看看

public static <E> RingBuffer<E> create(ProducerType    producerType,
                                           EventFactory<E> factory,
                                           int             bufferSize,
                                           WaitStrategy    waitStrategy)
    {
        switch (producerType)
        {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
        }
    }

这里主要是判断是创建单生产者还是多生产者,我们以单生产者为例继续追createSingleProduct方法

 public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory,
                                                         int             bufferSize,
                                                         WaitStrategy    waitStrategy)
    {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }

这里看到首先实例化一个SingleProducerSequencer,首先我们看看这个类有什么方法

Disruptor启动过程源码分析(一)

这个SingleProducerSequencer按照字面解释是一个单生产者的序号器。我们知道RingBuffer是一个环形数组,生产者和消费者在这个环形数组上存取数据,这个SingleProducerSequencer的next是用于获取获取生产者的下一个可用的序号。这里一直追构造函数可以知道我们开始传入的bufferSize和waitStragety是用于构造这个SingleProducerSequencer类。

这里顺带提一下,SingleProducerSequencer定义了7个long类型的字段,然后继承的父类又定义了7个long类型的字段,这里的作用是用于填充缓存行,消除伪共享,想具体了解参考网上其他资料,这里不详聊。

现在让我们回到RingBuffer类中的createSingleProduct方法,实例化好SingleProducerSequencer,把他传入给RingBuffer的构造函数,我们继续追:

  RingBuffer(EventFactory<E> eventFactory,
               Sequencer       sequencer)
    {
        super(eventFactory, sequencer);
    }

点开super看基类:

 RingBufferFields(EventFactory<E> eventFactory,
                     Sequencer       sequencer)
    {
        this.sequencer  = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        this.entries   = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }


 private void fill(EventFactory<E> eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

可以看到,这就是我们需要传入一个事件工厂进去的原因:RingBuffer在初始化的时候会创建一个空对象数组,这里的空不是数组为空,数组里面的对象的值为空。这样的话就避免了后期创建数据的消耗,只需要对现有的数据通过序号取出来,然后填充数据即可。

 

本人刚接触源码不久,如有不正确的地方,请指出我会及时改正,谢谢大家。