Source Code Analysis (Spring Cloud Alibaba Source Analysis: Sentinel)
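In my previous two articles I only drew flowcharts, and the response was lukewarm: hardly anyone read them. This time I am also writing down how I read the source code, and I walk through the main logic in detail.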
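At its core, Sentinel is one big try/catch. StatisticSlot records the various metrics and the other slots check them: if a check passes, execution moves on to the next slot; if it fails, an exception is thrown. When a configured threshold is reached, the request is rate-limited or degraded.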
Let's start with a flowchart of the source analysis.
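1. Sentinel's @SentinelResource annotation is implemented with AOP, so we start by finding the aspect behind it.
2. That aspect is SentinelResourceAspect, and its invokeResourceWithSentinel method does the actual work: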
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
    Method originMethod = resolveMethod(pjp);
    SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
    if (annotation == null) {
        throw new IllegalStateException("Wrong state for SentinelResource annotation");
    }
    String resourceName = getResourceName(annotation.value(), originMethod);
    EntryType entryType = annotation.entryType();
    int resourceType = annotation.resourceType();
    Entry entry = null;
    try {
        // Enter the slot chain; throws BlockException if any rule rejects the call
        entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
        // All checks passed: invoke the real business method
        return pjp.proceed();
    } catch (BlockException ex) {
        // Rejected by a rule: delegate to the blockHandler
        return handleBlockException(pjp, annotation, ex);
    } catch (Throwable ex) {
        Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
        // Ignored exceptions are rethrown without being traced
        if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
            throw ex;
        }
        if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
            // Record the business exception, then call the fallback
            traceException(ex, annotation);
            return handleFallback(pjp, annotation, ex);
        }
        throw ex;
    } finally {
        // Always release the entry that was acquired
        if (entry != null) {
            entry.exit(1, pjp.getArgs());
        }
    }
}
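To see the routing in isolation, here is a plain-Java sketch of the same control flow with no Sentinel dependency. Everything in it is hypothetical: Blocked stands in for BlockException, enter for SphU.entry, and guard, blockHandler and fallback are illustrative names, not Sentinel APIs. It only shows that a rule rejection goes to the block handler, a business exception goes to the fallback, and the entry is released in finally only if it was actually acquired.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

// Hypothetical stand-in for Sentinel's BlockException
class Blocked extends Exception {
    Blocked(String message) {
        super(message);
    }
}

class AspectFlowSketch {
    // Entries currently held; should be back to 0 after every guarded call
    static int activeEntries = 0;

    // Hypothetical counterpart of SphU.entry: throws when a rule rejects the call
    static void enter(boolean rejected) throws Blocked {
        if (rejected) {
            throw new Blocked("rejected by a rule");
        }
        activeEntries++;
    }

    static <T> T guard(boolean rejected,
                       Callable<T> body,
                       Function<Blocked, T> blockHandler,
                       Function<Exception, T> fallback) {
        boolean entered = false;
        try {
            enter(rejected);                  // like SphU.entry(...)
            entered = true;
            return body.call();               // like pjp.proceed()
        } catch (Blocked ex) {
            return blockHandler.apply(ex);    // like handleBlockException(...)
        } catch (Exception ex) {
            return fallback.apply(ex);        // like handleFallback(...)
        } finally {
            if (entered) {
                activeEntries--;              // like entry.exit(...)
            }
        }
    }
}
```

The real aspect additionally rethrows exceptions listed in exceptionsToIgnore and only uses the fallback for exceptionsToTrace; the sketch leaves both out.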
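3. SphU.entry is where Sentinel takes over. Following it step by step leads to the entryWithPriority method of the CtSph class.
There, the slot chain for the resource is obtained: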
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}
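The pattern in lookProcessChain is double-checked locking over a copy-on-write HashMap: lookups read the current map without a lock, and the rare writer builds a new map and publishes it with a single assignment. A minimal generic sketch of that pattern follows; CopyOnWriteCache, maxSize and factory are hypothetical names. The map field has to be volatile for the unlocked read to be safe (to my knowledge CtSph declares chainMap volatile as well).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical generic version of the lookProcessChain pattern
class CopyOnWriteCache<K, V> {
    private final Object lock = new Object();
    private final int maxSize;
    private final Function<K, V> factory;
    // volatile: readers always see a fully built map without taking the lock
    private volatile Map<K, V> map = new HashMap<>();

    CopyOnWriteCache(int maxSize, Function<K, V> factory) {
        this.maxSize = maxSize;
        this.factory = factory;
    }

    V get(K key) {
        V value = map.get(key);               // fast path: no locking
        if (value == null) {
            synchronized (lock) {
                value = map.get(key);         // re-check under the lock
                if (value == null) {
                    if (map.size() >= maxSize) {
                        return null;          // size limit, like MAX_SLOT_CHAIN_SIZE
                    }
                    value = factory.apply(key);
                    Map<K, V> copy = new HashMap<>(map.size() + 1);
                    copy.putAll(map);
                    copy.put(key, value);
                    map = copy;               // publish the new map in one write
                }
            }
        }
        return value;
    }

    int size() {
        return map.size();
    }
}
```

The design favors reads: every SphU.entry call looks up a chain, but a chain is only created the first time a resource is seen, so copying the whole map on that path is cheap overall.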
SlotChainProvider.newSlotChain() creates the chain; if no SlotChainBuilder can be resolved, it falls back to DefaultSlotChainBuilder:
public static ProcessorSlotChain newSlotChain() {
    if (builder != null) {
        return builder.build();
    }
    resolveSlotChainBuilder();
    if (builder == null) {
        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
        builder = new DefaultSlotChainBuilder();
    }
    return builder.build();
}
DefaultSlotChainBuilder appends the slots in a fixed order:
public class DefaultSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());
        return chain;
    }
}
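This logic creates eight slots and links them into a single chain, which is built once per resource and cached in chainMap. This slot chain is the core of Sentinel.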
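Stripped of Sentinel's types, the slot chain is a chain of responsibility: each slot does its work and then calls fireEntry to pass control to the next slot, while a slot that rejects the request throws instead. That is the "big try/catch" described at the beginning. The sketch below is a minimal model under that reading; LinkedSlot, SimpleSlotChain, NamedSlot, SlotChainDemo and RuleBlockedException are hypothetical names that only loosely mirror AbstractLinkedProcessorSlot, DefaultProcessorSlotChain, the concrete slots and BlockException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BlockException
class RuleBlockedException extends RuntimeException {
    RuleBlockedException(String slotName) {
        super("blocked by " + slotName);
    }
}

// Each slot holds a link to its successor, similar in spirit to AbstractLinkedProcessorSlot
abstract class LinkedSlot {
    private LinkedSlot next;

    void setNext(LinkedSlot next) {
        this.next = next;
    }

    abstract void entry(String resource, List<String> trace);

    // Counterpart of fireEntry: hand the request to the next slot, if any
    protected void fireEntry(String resource, List<String> trace) {
        if (next != null) {
            next.entry(resource, trace);
        }
    }
}

// Chain with a dummy head node; addLast appends to the tail
class SimpleSlotChain extends LinkedSlot {
    private LinkedSlot end = this;

    void addLast(LinkedSlot slot) {
        end.setNext(slot);
        end = slot;
    }

    @Override
    void entry(String resource, List<String> trace) {
        fireEntry(resource, trace);   // the head does no work itself
    }
}

// A slot that records its name and then either rejects or passes the request on
class NamedSlot extends LinkedSlot {
    private final String name;
    private final boolean reject;

    NamedSlot(String name, boolean reject) {
        this.name = name;
        this.reject = reject;
    }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add(name);
        if (reject) {
            throw new RuleBlockedException(name);   // later slots never run
        }
        fireEntry(resource, trace);
    }
}

class SlotChainDemo {
    // Runs the chain once and records which slots executed plus the outcome
    static List<String> run(SimpleSlotChain chain) {
        List<String> trace = new ArrayList<>();
        try {
            chain.entry("demo-resource", trace);
            trace.add("PASS");
        } catch (RuleBlockedException e) {
            trace.add("BLOCKED");
        }
        return trace;
    }
}
```

In the real chain StatisticSlot calls fireEntry before updating its counters, so it observes the outcome of FlowSlot and DegradeSlot even though it sits earlier in the chain.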
Back in entryWithPriority, execution continues:
try {
    chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
    // e is the CtEntry created just before this try block
    e.exit(count, args);
    throw e1;
} catch (Throwable e1) {
    // This should not happen, unless there are errors existing in Sentinel internal.
    RecordLog.info("Sentinel unexpected exception", e1);
}
This calls DefaultProcessorSlotChain.entry, which calls AbstractLinkedProcessorSlot.fireEntry, which in turn calls transformEntry.
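transformEntry calls the slot's entry method, so the entry methods of the slots (all instantiated above) run one after another. If the request stays within every configured limit, the whole chain completes; if it exceeds a configured threshold, the responsible slot throws the corresponding BlockException.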
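Flow control, circuit breaking, and degradation rest mainly on three slots, StatisticSlot (which collects the data), FlowSlot, and DegradeSlot, so we focus on those.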
StatisticSlot is the class that records the metrics, and those metrics are stored in Sentinel's sliding time window. Let's see how that is implemented.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // Run the rest of the chain first (SystemSlot, AuthoritySlot, FlowSlot, DegradeSlot)
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // Every check passed: count the thread and the passed request
        node.increaseThreadNum();
        node.addPassRequest(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Global statistics for inbound traffic
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        // Prioritized request that waited for a later window: count the thread only
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        // Rejected by a downstream slot: record the block, then rethrow
        context.getCurEntry().setError(e);
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }
        throw e;
    } catch (Throwable e) {
        // Unexpected error: record it as an exception, then rethrow
        context.getCurEntry().setError(e);
        node.increaseExceptionQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseExceptionQps(count);
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseExceptionQps(count);
        }
        throw e;
    }
}
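As the code shows, StatisticSlot first calls fireEntry to run the downstream slots. If they all pass, it increments the pass count and the thread count; if a BlockException is thrown, it increments the block count; for any other exception, it increments the exception count.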
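The "call downstream first, count afterwards" ordering is what lets StatisticSlot record passes, blocks and errors from one place. Below is a reduced sketch of just that ordering. The names StatisticSlotSketch, ResourceStats and CheckRejectedException are hypothetical, the counters are plain java.util.concurrent.atomic.LongAdder instances rather than Sentinel's nodes, and the thread count is never decremented here (in Sentinel that happens when the entry exits).

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for BlockException
class CheckRejectedException extends RuntimeException {
    CheckRejectedException() {
        super("rejected by a rule");
    }
}

// Hypothetical per-resource counters, similar in role to the node statistics
class ResourceStats {
    final LongAdder threads = new LongAdder();
    final LongAdder pass = new LongAdder();
    final LongAdder block = new LongAdder();
    final LongAdder exception = new LongAdder();
}

class StatisticSlotSketch {
    private final ResourceStats stats = new ResourceStats();

    ResourceStats stats() {
        return stats;
    }

    // downstream plays the role of fireEntry(...) into the checking slots
    void entry(int count, Runnable downstream) {
        try {
            downstream.run();             // run the checks first
            stats.threads.increment();    // reached only if every check passed
            stats.pass.add(count);
        } catch (CheckRejectedException e) {
            stats.block.add(count);       // a rule rejected the request
            throw e;
        } catch (RuntimeException e) {
            stats.exception.add(count);   // any other error
            throw e;
        }
    }
}
```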
At this point we need a detour into how the sliding time window is implemented.
The StatisticNode class creates a field called rollingCounterInSecond:
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
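SampleCountProperty.SAMPLE_COUNT defaults to 2 and IntervalProperty.INTERVAL defaults to 1000 (ms). StatisticNode also keeps a second counter, rollingCounterInMinute, which covers 60 seconds with 60 one-second buckets.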
Now let's look at the implementation. ArrayMetric stores its data in a LeapArray, whose constructor is:
public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
    this.windowLengthInMs = intervalInMs / sampleCount;
    this.intervalInMs = intervalInMs;
    this.sampleCount = sampleCount;
    this.array = new AtomicReferenceArray<>(sampleCount);
}
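With the defaults, this creates an array of two buckets (sample windows), each windowLengthInMs = 1000 / 2 = 500 ms long, which together cover one second.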
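To make the bucket arithmetic concrete, assume that calculateTimeIdx and calculateWindowStart (both called in currentWindow) compute the bucket index as (t / windowLengthInMs) modulo the array length and the bucket start as t minus (t mod windowLengthInMs); as far as I can tell, that is what Sentinel does. With the default two 500 ms buckets:

```latex
\begin{aligned}
w &= \frac{\text{intervalInMs}}{\text{sampleCount}} = \frac{1000}{2} = 500\ \text{ms} \\
\text{idx}(t) &= \left\lfloor t / w \right\rfloor \bmod \text{sampleCount}, \qquad
\text{windowStart}(t) = t - (t \bmod w) \\
t = 888:&\quad \text{idx} = \lfloor 888/500 \rfloor \bmod 2 = 1, \quad \text{windowStart} = 888 - 388 = 500 \\
t = 1676:&\quad \text{idx} = \lfloor 1676/500 \rfloor \bmod 2 = 3 \bmod 2 = 1, \quad \text{windowStart} = 1676 - 176 = 1500
\end{aligned}
```

Both timestamps land in slot 1, but the stored bucket starts at 500 while the new window starts at 1500, so currentWindow treats the old bucket as deprecated and resets it rather than adding to it.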
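Back in StatisticSlot, node.addPassRequest(count) is handled by DefaultNode, which records the request on itself (through its StatisticNode superclass) and also on the resource's ClusterNode:
@Override
public void addPassRequest(int count) {
    super.addPassRequest(count);
    this.clusterNode.addPassRequest(count);
}
StatisticNode.addPassRequest writes to both rolling counters:
@Override
public void addPassRequest(int count) {
    rollingCounterInSecond.addPass(count);
    rollingCounterInMinute.addPass(count);
}
ArrayMetric.addPass finds the bucket for the current time and adds the count to it:
@Override
public void addPass(int count) {
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addPass(count);
}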
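wrap.value() returns the MetricBucket held by that window. A simplified bucket could look like the sketch below. To my knowledge Sentinel's MetricBucket likewise keeps one LongAdder per event type, indexed by a MetricEvent enum, but SimpleMetricBucket and BucketEvent are hypothetical names and the sketch uses the JDK's java.util.concurrent.atomic.LongAdder. LongAdder suits this job because many request threads increment the same counter while sums are read far less often.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical event types; Sentinel's MetricEvent plays a similar role
enum BucketEvent { PASS, BLOCK, EXCEPTION, SUCCESS }

// Simplified bucket: one LongAdder per event type
class SimpleMetricBucket {
    private final LongAdder[] counters = new LongAdder[BucketEvent.values().length];

    SimpleMetricBucket() {
        for (int i = 0; i < counters.length; i++) {
            counters[i] = new LongAdder();
        }
    }

    void add(BucketEvent event, long n) {
        counters[event.ordinal()].add(n);
    }

    // Convenience method matching wrap.value().addPass(count)
    void addPass(int n) {
        add(BucketEvent.PASS, n);
    }

    long get(BucketEvent event) {
        return counters[event.ordinal()].sum();
    }

    // Called when the window holding this bucket is recycled
    void reset() {
        for (LongAdder counter : counters) {
            counter.reset();
        }
    }
}
```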
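LeapArray.currentWindow is the method that finds the time window (bucket) for a given timestamp; the no-argument data.currentWindow() used above calls it with the current time: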
/**
 * Get bucket item at provided timestamp.
 *
 * @param timeMillis a valid timestamp in milliseconds
 * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
 */
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);
    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
     */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            /*
             *     B0      B1      B2     NULL     B4
             * ||_______|_______|_______|_______|_______||___
             *  200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *             bucket is empty, so create new and update
             *
             * If the old bucket is absent, then we create a new bucket at {@code windowStart},
             * then try to update circular array via a CAS operation. Only one thread can
             * succeed to update, while other threads yield its time slice.
             */
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0      B1      B2      B3      B4
             * ||_______|_______|_______|_______|_______||___
             *  200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *             startTime of Bucket 3: 800, so it's up-to-date
             *
             * If current {@code windowStart} is equal to the start timestamp of old bucket,
             * that means the time is within the bucket, so directly return the bucket.
             */
            return old;
        } else if (windowStart > old.windowStart()) {
            /*
             *                            (old)
             *             B0      B1      B2     NULL     B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...      1200    1400    1600    1800    2000    2200  timestamp
             *                             ^
             *                         time=1676
             *           startTime of Bucket 2: 400, deprecated, should be reset
             *
             * If the start timestamp of old bucket is behind provided time, that means
             * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
             * Note that the reset and clean-up operations are hard to be atomic,
             * so we need a update lock to guarantee the correctness of bucket update.
             *
             * The update lock is conditional (tiny scope) and will take effect only when
             * bucket is deprecated, so in most cases it won't lead to performance loss.
             */
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
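The three branches can be condensed into a small sliding-window counter. This is a simplified sketch, not Sentinel's code: SlidingWindowCounter is a hypothetical name, one synchronized lock replaces the CAS plus tryLock scheme (simpler, but every update contends on the same lock), time is passed in explicitly so the behavior is deterministic, and sum only adds up buckets that started less than one full interval ago.

```java
// Hypothetical single-lock sliding-window counter modeled on LeapArray's bucket logic
class SlidingWindowCounter {
    private static final class Bucket {
        final long start;   // window start time in ms
        long count;         // events recorded in this window

        Bucket(long start) {
            this.start = start;
        }
    }

    private final long windowLengthMs;
    private final long intervalMs;
    private final Bucket[] buckets;

    SlidingWindowCounter(int sampleCount, long intervalMs) {
        if (sampleCount <= 0 || intervalMs <= 0 || intervalMs % sampleCount != 0) {
            throw new IllegalArgumentException("interval must split evenly into sampleCount buckets");
        }
        this.intervalMs = intervalMs;
        this.windowLengthMs = intervalMs / sampleCount;
        this.buckets = new Bucket[sampleCount];
    }

    // Same three cases as currentWindow: absent, up to date, deprecated
    private Bucket currentBucket(long nowMs) {
        int idx = (int) ((nowMs / windowLengthMs) % buckets.length);
        long windowStart = nowMs - nowMs % windowLengthMs;
        Bucket old = buckets[idx];
        if (old == null || old.start < windowStart) {
            Bucket fresh = new Bucket(windowStart);   // create, or replace a stale bucket
            buckets[idx] = fresh;
            return fresh;
        }
        if (old.start > windowStart) {
            return new Bucket(windowStart);           // time went backwards: throwaway bucket
        }
        return old;                                   // up to date
    }

    synchronized void add(long nowMs, long n) {
        currentBucket(nowMs).count += n;
    }

    // Sum the buckets that started less than one interval before nowMs
    synchronized long sum(long nowMs) {
        long total = 0;
        for (Bucket b : buckets) {
            if (b != null && nowMs - b.start < intervalMs) {
                total += b.count;
            }
        }
        return total;
    }
}
```

new SlidingWindowCounter(2, 1000) mirrors the default rollingCounterInSecond layout: two 500 ms buckets covering one second.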
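Next, let's look at FlowSlot.
It reads the statistics collected in the sliding window and compares them with the configured threshold:
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);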
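The final decision is made by the rule's traffic-shaping controller: rule.getRater().canPass(selectedNode, acquireCount, prioritized). The main implementations are direct rejection (DefaultController, the default), warm-up based on the token bucket algorithm (WarmUpController), and uniform queueing based on the leaky bucket algorithm (RateLimiterController).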
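Of these, the leaky-bucket style controller is the least obvious. Its core idea is to give every request a fixed time slice (1000 / QPS ms) and make callers wait for their slice, rejecting them if the wait would exceed a maximum queueing time. The sketch below captures only that idea and is not RateLimiterController's actual code: PacingLimiter and tryAcquire are hypothetical, it returns the wait time instead of sleeping, and it takes the current time as a parameter.

```java
// Hypothetical leaky-bucket pacer illustrating the idea behind uniform queueing
class PacingLimiter {
    private final double permitsPerSecond;
    private final long maxQueueingMs;
    private long latestPassedMs = -1;   // time slot granted to the previous request; -1 = none yet

    PacingLimiter(double permitsPerSecond, long maxQueueingMs) {
        this.permitsPerSecond = permitsPerSecond;
        this.maxQueueingMs = maxQueueingMs;
    }

    /**
     * Returns how long the caller should wait before proceeding (0 = immediately),
     * or -1 if the request must be rejected because the queue is too long.
     */
    synchronized long tryAcquire(int acquireCount, long nowMs) {
        // Each request costs a fixed slice of time: 10 permits/s -> 100 ms per request
        long costMs = Math.round(1000.0 * acquireCount / permitsPerSecond);
        if (latestPassedMs < 0 || latestPassedMs + costMs <= nowMs) {
            latestPassedMs = nowMs;       // the bucket has drained: pass right away
            return 0;
        }
        long waitMs = latestPassedMs + costMs - nowMs;
        if (waitMs > maxQueueingMs) {
            return -1;                    // would queue too long: reject
        }
        latestPassedMs += costMs;         // reserve the next slot
        return waitMs;
    }
}
```

With 10 permits per second and a 250 ms queue limit, three requests arriving together at t = 0 are scheduled at 0, 100 and 200 ms, and a fourth is rejected.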
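Finally, DegradeSlot.
It likewise compares the statistics against thresholds and throws an exception (a DegradeException) when a check fails:
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);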
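The degrade check shown here comes from an older Sentinel line; newer releases (1.8 and later, as far as I know) organize degradation around circuit breakers that move between CLOSED, OPEN and HALF_OPEN states. A minimal error-ratio breaker illustrating those states is sketched below. SimpleCircuitBreaker and its parameters are hypothetical, and unlike Sentinel it counts requests cumulatively since the last reset rather than in a sliding window.

```java
// Hypothetical error-ratio circuit breaker; time is passed in for determinism
class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final double maxErrorRatio;   // trip when errors / total exceeds this
    private final int minRequests;        // ignore the ratio until this many calls
    private final long openMs;            // how long to stay OPEN before probing

    private State state = State.CLOSED;
    private long openedAt;
    private int total;
    private int errors;

    SimpleCircuitBreaker(double maxErrorRatio, int minRequests, long openMs) {
        this.maxErrorRatio = maxErrorRatio;
        this.minRequests = minRequests;
        this.openMs = openMs;
    }

    // Before the business call: false means "degraded, reject this request"
    synchronized boolean tryPass(long nowMs) {
        if (state == State.CLOSED) {
            return true;
        }
        if (state == State.OPEN && nowMs - openedAt >= openMs) {
            state = State.HALF_OPEN;      // let a single probe through
            return true;
        }
        return false;                     // still OPEN, or a probe is in flight
    }

    // After the business call
    synchronized void onComplete(boolean error, long nowMs) {
        if (state == State.HALF_OPEN) {
            if (error) {
                open(nowMs);              // probe failed: open again
            } else {
                state = State.CLOSED;     // probe succeeded: recover
                total = 0;
                errors = 0;
            }
            return;
        }
        if (state == State.OPEN) {
            return;                       // late result from before the trip
        }
        total++;
        if (error) {
            errors++;
        }
        if (total >= minRequests && (double) errors / total > maxErrorRatio) {
            open(nowMs);
        }
    }

    private void open(long nowMs) {
        state = State.OPEN;
        openedAt = nowMs;
        total = 0;
        errors = 0;
    }

    synchronized State state() {
        return state;
    }
}
```

After the breaker opens, calls are rejected until openMs has elapsed; the next call acts as a probe, and its result decides whether the breaker closes or opens again.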