源码分析(spring alibaba 源码分析之 sentinel)

前两篇文章只画了流程图,发现反响不够好,没几个阅读量,今天我把看源码的方式也记录下,详细解析下主线逻辑。

sentinel实现其实就是一个大的Try catch ,在StatisticSlot保存各种指标,在其他slot中进行判断,通过就继续执行其他slot,不能通过就抛出异常,达到阈值就进行限流或者降级操作。

先上一个源码分析的流程图

源码分析(spring alibaba 源码分析之 sentinel)

1. sentinel是通过aop实现的它的注解是@SentinelResource那我们就找他的实现

源码分析(spring alibaba 源码分析之 sentinel)

2. 找到SentinelResourceAspect类的invokeResourceWithSentinel方法就是具体实现。

public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
    Method originMethod = this.resolveMethod(pjp);
    SentinelResource annotation = (SentinelResource)originMethod.getAnnotation(SentinelResource.class);
    if (annotation == null) {
        throw new IllegalStateException("Wrong state for SentinelResource annotation");
    } else {
        String resourceName = this.getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();
        Entry entry = null;

        try {
            Object var18;
            try {
                entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
                Object result = pjp.proceed();
                var18 = result;
                return var18;
            } catch (BlockException var15) {
                var18 = this.handleBlockException(pjp, annotation, var15);
                return var18;
            } catch (Throwable var16) {
                Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
                if (exceptionsToIgnore.length > 0 && this.exceptionBelongsTo(var16, exceptionsToIgnore)) {
                    throw var16;
                } else if (this.exceptionBelongsTo(var16, annotation.exceptionsToTrace())) {
                    this.traceException(var16, annotation);
                    Object var10 = this.handleFallback(pjp, annotation, var16);
                    return var10;
                } else {
                    throw var16;
                }
            }
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }

        }
    }
}

 

3 调用SphU.entry方法,一步步跟到具体实现,找到CtSph类的entryWithPriority方法。

获取slot的方法。

ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }

                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}

 

public static ProcessorSlotChain newSlotChain() {
    if (builder != null) {
        return builder.build();
    }

    resolveSlotChainBuilder();

    if (builder == null) {
        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
        builder = new DefaultSlotChainBuilder();
    }
    return builder.build();
}

 

public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }

}

源码分析(spring alibaba 源码分析之 sentinel)

以上逻辑新建了8个slot,以及一个链条,这个就是sentinel核心实现。

回到entryWithPriority继续向下走。

try {
    chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
    e.exit(count, args);
    throw e1;
} catch (Throwable e1) {
    // This should not happen, unless there are errors existing in Sentinel internal.
    RecordLog.info("Sentinel unexpected exception", e1);
}

执行

DefaultProcessorSlotChain.entry方法。

执行AbstractLinkedProcessorSlot.fireEntry方法

执行transformEntry方法。

transformEntry方法里面有entry,此时会一次执行(slot上面已经new出来了)slot的entry方法,如果在配置的各种指标内,会执行完成,如果大于配置的指标数据,会抛出对应异常。

源码分析(spring alibaba 源码分析之 sentinel)

 

实现流控/熔断/降级的就三个slot,我们就着重看着三个slot。

StatisticSlot是个统计指标的实现类,这里创建了sentinel的滑动时间窗口,我们来看看他是怎么实现的。
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    Iterator var8;
    ProcessorSlotEntryCallback handler;
    try {
        this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
        node.increaseThreadNum();
        node.addPassRequest(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        Iterator var13 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();

        while(var13.hasNext()) {
            ProcessorSlotEntryCallback<DefaultNode> handler = (ProcessorSlotEntryCallback)var13.next();
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException var10) {
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
        }

        var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();

        while(var8.hasNext()) {
            handler = (ProcessorSlotEntryCallback)var8.next();
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException var11) {
        BlockException e = var11;
        context.getCurEntry().setError(var11);
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }

        var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();

        while(var8.hasNext()) {
            handler = (ProcessorSlotEntryCallback)var8.next();
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }

        throw e;
    } catch (Throwable var12) {
        context.getCurEntry().setError(var12);
        node.increaseExceptionQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseExceptionQps(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseExceptionQps(count);
        }

        throw var12;
    }

}

从代码可以看出,首先执行了this.fireEntry方法向下执行,如果执行成功对各种指标(执行成功数,执行线程数做++),如果执行失败对异常指标++;

 

这里面需要插入一段滑动时间窗口的实现。

StatisticNode类中新建了一个rollingCounterInSecond

private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);

SampleCountProperty.SAMPLE_COUNT 默认2

IntervalProperty.INTERVAL默认1000

我们去看看他的具体实现

public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

    this.windowLengthInMs = intervalInMs / sampleCount;
    this.intervalInMs = intervalInMs;
    this.sampleCount = sampleCount;

    this.array = new AtomicReferenceArray<>(sampleCount);
}

这段代码的意思就是创建了两个窗口。

源码分析(spring alibaba 源码分析之 sentinel)

 

node.addPassRequest(count);
this.clusterNode.addPassRequest(count);
@Override
public void addPassRequest(int count) {
    rollingCounterInSecond.addPass(count);
    rollingCounterInMinute.addPass(count);
}

 

@Override
public void addPass(int count) {
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addPass(count);
}

这个就是获取时间窗口的方法

/**
 * Get bucket item at provided timestamp.
 *
 * @param timeMillis a valid timestamp in milliseconds
 * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
 */
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }

    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);

    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
     */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             *     B0       B1      B2    NULL      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            bucket is empty, so create new and update
             *
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0       B1      B2     B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
             *   (old)
             *             B0       B1      B2    NULL      B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...    1200     1400    1600    1800    2000    2200  timestamp
             *                              ^
             *                           time=1676
             *          startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

 

我们再看看

FlowSlot的实现
获取时间窗口跟阈值进行判断
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
有三个对应的实现,有金币算法,漏斗算法,限流算法
rule.getRater().canPass(selectedNode, acquireCount, prioritized)

 

再看看

DegradeSlot的实现
也是各种阈值判断,不通过就抛出异常。
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);