Feign-负载均衡实现
FeignAutoConfiguration类配置了Client的类型(包括HttpURLConnection、HttpClient、OkHttp),最终向容器注入的是Client的实现类LoadBalancerFeignClient类中的execute()方法,即执行请求的方法,代码如下:
public Response execute(Request request, Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
RibbonRequest ribbonRequest = new RibbonRequest(this.delegate, request, uriWithoutHost);
IClientConfig requestConfig = this.getClientConfig(options, clientName);
return ((RibbonResponse)this.lbClient(clientName).executeWithLoadBalancer(ribbonRequest, requestConfig)).toResponse();
} catch (ClientException var8) {
IOException io = this.findIOException(var8);
if (io != null) {
throw io;
} else {
throw new RuntimeException(var8);
}
}
}
其中有一个executeWithLoadBalancer()方法,即通过负载均很的方式来执行网络请求,代码如下:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand command = this.buildLoadBalancerCommand(request, requestConfig);
try {
return (IResponse)command.submit(new ServerOperation<T>() {
public Observable<T> call(Server server) {
URI finalUri = AbstractLoadBalancerAwareClient.this.reconstructURIWithServer(server, request.getUri());
ClientRequest requestForServer = request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
} catch (Exception var5) {
return Observable.error(var5);
}
}
}).toBlocking().single();
} catch (Exception var6) {
Throwable t = var6.getCause();
if (t instanceof ClientException) {
throw (ClientException)t;
} else {
throw new ClientException(var6);
}
}
}
在上述代码中有一个submit()方法,进入方法内部可以看出它是
public Observable<T> submit(final ServerOperation<T> operation) {
final LoadBalancerCommand<T>.ExecutionInfoContext context = new LoadBalancerCommand.ExecutionInfoContext();
if (this.listenerInvoker != null) {
try {
this.listenerInvoker.onExecutionStart();
} catch (AbortExecutionException var6) {
return Observable.error(var6);
}
}
final int maxRetrysSame = this.retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = this.retryHandler.getMaxRetriesOnNextServer();
Observable<T> o = (this.server == null ? this.selectServer() : Observable.just(this.server)).concatMap(new Func1<Server, Observable<T>>() {
public Observable<T> call(Server server) {
context.setServer(server);
.....//代码略
在上述代码中,有一个selectServer()方法,该方法是选择服务进行负载均衡的方法,代码如下:
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
public void call(Subscriber<? super Server> next) {
try {
Server server = LoadBalancerCommand.this.loadBalancerContext.getServerFromLoadBalancer(LoadBalancerCommand.this.loadBalancerURI, LoadBalancerCommand.this.loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception var3) {
next.onError(var3);
}
}
});
}
由上面的代码可以看出,负载均衡最终是交给loadBalancerContext来处理大的,就是之前介绍的Ribbon。这个时候的架构图如下图所示: