Disruptor启动过程源码分析(一)
首先准备一个可以跑起来的demo
OrderEvent.java
public class OrderEvent {
private long value;
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
}
OrderEventFactory.java
public class OrderEventFactory implements EventFactory<OrderEvent> {
public OrderEvent newInstance() {
//这个方法就是为了返回空的数据对象(Event)
return new OrderEvent();
}
}
OrderEventHandler.java
public class OrderEventHandler implements EventHandler<OrderEvent>{
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
// Thread.sleep(Integer.MAX_VALUE);
System.err.println("消费者: " + event.getValue());
}
}
OrderEventProducer.java
public class OrderEventProducer {
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(ByteBuffer data) {
//1 在生产者发送消息的时候, 首先 需要从我们的ringBuffer里面 获取一个可用的序号
long sequence = ringBuffer.next(); //0
try {
//2 根据这个序号, 找到具体的 "OrderEvent" 元素 注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
OrderEvent event = ringBuffer.get(sequence);
//3 进行实际的赋值处理
event.setValue(data.getLong(0));
} finally {
//4 提交发布操作
ringBuffer.publish(sequence);
}
}
}
Main.java
public class Main {
public static void main(String[] args) {
// 参数准备工作
OrderEventFactory orderEventFactory = new OrderEventFactory();
int ringBufferSize = 1024*1024;
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
/**
* 1 eventFactory: 消息(event)工厂对象
* 2 ringBufferSize: 容器的长度
* 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
* 4 ProducerType: 单生产者 还是 多生产者
* 5 waitStrategy: 等待策略
*/
//1. 实例化disruptor对象
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
ringBufferSize,
executor,
ProducerType.SINGLE,
new BlockingWaitStrategy());
//2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
disruptor.handleEventsWith(new OrderEventHandler());
//3. 启动disruptor
disruptor.start();
//4. 获取实际存储数据的容器: RingBuffer
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for(long i = 0 ; i < 5; i ++){
bb.putLong(0, i);
producer.sendData(bb);
}
disruptor.shutdown();
executor.shutdown();
}
}
本例子中,disruptor的实例化在Main.java中
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
ringBufferSize,
executor,
ProducerType.SINGLE,
new BlockingWaitStrategy());
点进源码看:
/**
* Create a new Disruptor.
*
* @param eventFactory 消息工厂对象
* @param ringBufferSize RingBuffer容器长度
* @param executor 线程池
* @param producerType 生产者类型:单生产者or多
* @param waitStrategy 等待策略
*/
public Disruptor(final EventFactory<T> eventFactory,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
executor);
}
继续点开this,发现进入了一个构造函数
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
{
this.ringBuffer = ringBuffer;
this.executor = executor;
}
可以看到,传入参数中的productType,eventFactory,ringBufferSize,waitStrategy是用于实例化一个RingBuffer,RingBuffer是通过create来实例化一个RingBuffer对象,让我们点开Ringbuffer的create看看
public static <E> RingBuffer<E> create(ProducerType producerType,
EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
switch (producerType)
{
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}
这里主要是判断是创建单生产者还是多生产者,我们以单生产者为例继续追createSingleProduct方法
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory,
int bufferSize,
WaitStrategy waitStrategy)
{
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
这里看到首先实例化一个SingleProducerSequencer,首先我们看看这个类有什么方法
这个SingleProducerSequencer按照字面解释是一个单生产者的序号器。我们知道RingBuffer是一个环形数组,生产者和消费者在这个环形数组上存取数据,这个SingleProducerSequencer的next是用于获取获取生产者的下一个可用的序号。这里一直追构造函数可以知道我们开始传入的bufferSize和waitStragety是用于构造这个SingleProducerSequencer类。
这里顺带提一下,SingleProducerSequencer定义了7个long类型的字段,然后继承的父类又定义了7个long类型的字段,这里的作用是用于填充缓存行,消除伪共享,想具体了解参考网上其他资料,这里不详聊。
现在让我们回到RingBuffer类中的createSingleProduct方法,实例化好SingleProducerSequencer,把他传入给RingBuffer的构造函数,我们继续追:
RingBuffer(EventFactory<E> eventFactory,
Sequencer sequencer)
{
super(eventFactory, sequencer);
}
点开super看基类:
RingBufferFields(EventFactory<E> eventFactory,
Sequencer sequencer)
{
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.indexMask = bufferSize - 1;
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
可以看到,这就是我们需要传入一个事件工厂进去的原因:RingBuffer在初始化的时候会创建一个空对象数组,这里的空不是数组为空,数组里面的对象的值为空。这样的话就避免了后期创建数据的消耗,只需要对现有的数据通过序号取出来,然后填充数据即可。
本人刚接触源码不久,如有不正确的地方,请指出我会及时改正,谢谢大家。