Spring Cloud OpenFeign 源码解析(三)Feign Client 远程调用、负载均衡算法

it2023-11-27  70

1. 远程调用

1.1 入口 FeignInvocationHandler

上一章分析,我们知道生成的feign client是通过jdk动态代理生成的代理对象,所以入口就是jdk动态代理的InvocationHandler: 这个factory反向跟踪可以看到是在Feign.java中初始化的:

看其create方法,具体的InvocationHandler实现:

//feign.InvocationHandlerFactory.Default.java static final class Default implements InvocationHandlerFactory { @Override public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) { //入口就是ReflectiveFeign.FeignInvocationHandler的invoke方法 //注意此时target是 HardCodedTarget.java类型,包含了接口类型、服务名、url //dispatch是当前接口所有方法对应的方法处理器 return new ReflectiveFeign.FeignInvocationHandler(target, dispatch); } }

targte是什么,看FeignClientFactoryBean:

1.2 远程调用分析

现在我们跟调用流程,看ReflectiveFeign.FeignInvocationHandler的invoke方法:

//feign.ReflectiveFeign.FeignInvocationHandler#invoke public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //本地方法equals、hashCode、toString的处理 if ("equals".equals(method.getName())) { try { Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null; return equals(otherHandler); } catch (IllegalArgumentException e) { return false; } } else if ("hashCode".equals(method.getName())) { return hashCode(); } else if ("toString".equals(method.getName())) { return toString(); } //从方法处理器map中获取当前要执行的方法的方法处理器进行调用 //上一章跟过方法处理器的创建 //如果执行的方法是接口中的抽象方法,方法处理器实现类是SynchronousMethodHandler //如果执行的方法是接口中的默认方法,方法处理器实现类是DefaultMethodHandler return dispatch.get(method).invoke(args); }

主要分析远程调用,所以跟SynchronousMethodHandler.invoke方法:

//SynchronousMethodHandler.java public Object invoke(Object[] argv) throws Throwable { //使用传递给方法调用的args创建请求模板。 RequestTemplate template = buildTemplateFromArgs.create(argv); Options options = findOptions(argv); Retryer retryer = this.retryer.clone(); while (true) { try { //执行并解码 return executeAndDecode(template, options); } catch (RetryableException e) { try { //如果允许重试,返回(可能在睡眠后)。否则传播异常。 retryer.continueOrPropagate(e); } catch (RetryableException th) { //传播异常就会退出重试 Throwable cause = th.getCause(); if (propagationPolicy == UNWRAP && cause != null) { throw cause; } else { throw th; } } if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } continue; } } } //SynchronousMethodHandler.java Object executeAndDecode(RequestTemplate template, Options options) throws Throwable { //通过请求模板构建出请求,targetRequest方法主要是想从HardCodedTarget中获取url Request request = targetRequest(template); if (logLevel != Logger.Level.NONE) { logger.logRequest(metadata.configKey(), logLevel, request); } Response response; long start = System.nanoTime(); try { //核心方法在这 //这个Client就是发起远程调用的客户端 //如果导入了Ribbon负载均衡依赖,这个client就是LoadBalancerFeignClient response = client.execute(request, options); } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start)); } throw errorExecuting(request, e); } //走到这里已经是获取到结果了,记录一下远程调用耗费的时间 long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); //是否需要关闭连接 boolean shouldClose = true; try { if (logLevel != Logger.Level.NONE) { response = logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime); } //如果方法返回类型正好就是Response.class if (Response.class == metadata.returnType()) { if (response.body() == null) { return response; } if (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) { //如果请求体超过最大缓存大小,不能关闭连接 shouldClose = false; return response; } // Ensure the response body is disconnected // 确保响应体断开连接 // 缓存中够放了,不需要保持连接,所以新构建一个Response返回 byte[] bodyData = Util.toByteArray(response.body().asInputStream()); return response.toBuilder().body(bodyData).build(); } if (response.status() >= 200 && response.status() < 300) { //响应成功的处理 if (void.class == metadata.returnType()) { return null; } else { Object result = decode(response); shouldClose = closeAfterDecode; return result; } } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) { //响应状态404的处理 Object result = decode(response); shouldClose = closeAfterDecode; return result; } else { //响应失败的处理 throw errorDecoder.decode(metadata.configKey(), response); } } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime); } throw errorReading(request, response, e); } finally { if (shouldClose) { ensureClosed(response.body()); } } }

看核心方法client.execute: 负载均衡的单独说,假设负载均衡过了,发起调用:

//feign.Client.Default.java public Response execute(Request request, Options options) throws IOException { //创建http连接,并发起连接,用的是jdk的 //convertAndSend具体就不看了,主要就是发起http连接,获取响应结果并写入到request的响应体中 HttpURLConnection connection = convertAndSend(request, options); //转换成响应对象返回 return convertResponse(connection, request); } //feign.Client.Default#convertResponse Response convertResponse(HttpURLConnection connection, Request request) throws IOException { //获取响应状态 int status = connection.getResponseCode(); //获取与服务器响应代码一起返回的HTTP响应消息(如果有的话)。 String reason = connection.getResponseMessage(); if (status < 0) { throw new IOException(format("Invalid status(%s) executing %s %s", status, connection.getRequestMethod(), connection.getURL())); } //收集响应头 Map<String, Collection<String>> headers = new LinkedHashMap<>(); for (Map.Entry<String, List<String>> field : connection.getHeaderFields().entrySet()) { // response message if (field.getKey() != null) { headers.put(field.getKey(), field.getValue()); } } //获取响应体长度 Integer length = connection.getContentLength(); if (length == -1) { length = null; } //获取响应体内容 InputStream stream; if (status >= 400) { stream = connection.getErrorStream(); } else { stream = connection.getInputStream(); } //构建响应返回 return Response.builder() .status(status) .reason(reason) .headers(headers) .request(request) .body(stream, length) .build(); }

2. 负载均衡

2.1 入口 LoadBalancerFeignClient

负载均衡会根据注册表中的地址,根据策略选出合适的主机地址,进行远程访问,入口在LoadBalancerFeignClient的execute方法:

//LoadBalancerFeignClient.java public Response execute(Request request, Request.Options options) throws IOException { try { URI asUri = URI.create(request.url()); String clientName = asUri.getHost(); URI uriWithoutHost = cleanUrl(request.url(), clientName); //Ribbon整合就在这 //this.delegate 就是之前远程调用分析的feign.Client.Default.java //包装了一下先进行负载均衡,再进行远程调用,最终远程调用还是依靠this.delegate FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest( this.delegate, request, uriWithoutHost); IClientConfig requestConfig = getClientConfig(options, clientName); //lbClient:获取一个负载均衡器 //主要关注executeWithLoadBalancer return lbClient(clientName) .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse(); } catch (ClientException e) { IOException io = findIOException(e); if (io != null) { throw io; } throw new RuntimeException(e); } }

2.2 获取负载均衡器(子容器中获取)

先看获取负载均衡器:

//LoadBalancerFeignClient.java private FeignLoadBalancer lbClient(String clientName) { return this.lbClientFactory.create(clientName); }

追踪一下lbClientFactory是啥: 利用idea反向搜索类引用,找到这个配置类: 继续搜索看在哪初始化CachingSpringLoadBalancerFactory的: lbClientFactory是这个带有缓存、重试功能的工厂,看它的create方法:

//CachingSpringLoadBalancerFactory.java public FeignLoadBalancer create(String clientName) { //先从缓存获取 FeignLoadBalancer client = this.cache.get(clientName); if (client != null) { return client; } IClientConfig config = this.factory.getClientConfig(clientName); //这里才是创建真正的负载均衡器 ILoadBalancer lb = this.factory.getLoadBalancer(clientName); ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class); //包装成具有重试功能的负载均衡器 //同时注意ILoadBalancer是Ribbon的接口 //RetryableFeignLoadBalancer和FeignLoadBalancer都是OpenFeign的 //也可以认为是适配了一下 client = this.loadBalancedRetryFactory != null ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector, this.loadBalancedRetryFactory) : new FeignLoadBalancer(lb, config, serverIntrospector); this.cache.put(clientName, client); return client; }

真正的负载均衡器还是靠 this.factory.getLoadBalancer方法创建的,this.factory就是SpringClientFactory,追踪一下在哪初始化的:

看SpringClientFactory的getLoadBalancer方法:

//SpringClientFactory.java /** * Get the load balancer associated with the name. * @param name name to search by * @return {@link ILoadBalancer} instance * @throws RuntimeException if any error occurs */ public ILoadBalancer getLoadBalancer(String name) { //底层就是从对应的服务名的子容器中获取ILoadBalancer实例 //子容器没有的话就会从父容器获取 return getInstance(name, ILoadBalancer.class); }

与FeignContext有异曲同工之妙,自己看吧。 这种将默认配置的组件放入父容器,个性化配置的组件放入子容器,实现动态配置组件的方式,还是可以学习借鉴的。

默认情况下用的是ZoneAwareLoadBalancer:

2.3 负载均衡器执行流程分析

回到LoadBalancerFeignClient:

//LoadBalancerFeignClient.java public Response execute(Request request, Request.Options options) throws IOException { try { URI asUri = URI.create(request.url()); String clientName = asUri.getHost(); URI uriWithoutHost = cleanUrl(request.url(), clientName); //Ribbon整合就在这 //this.delegate 就是之前远程调用分析时的feign.Client.Default.java //包装了一下先进行负载均衡,再进行远程调用,最终远程调用还是依靠this.delegate FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest( this.delegate, request, uriWithoutHost); IClientConfig requestConfig = getClientConfig(options, clientName); //lbClient:获取一个负载均衡器 // 返回的是一个FeignLoadBalancer,其包装了ILoadBalancer //主要关注executeWithLoadBalancer return lbClient(clientName) .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse(); } catch (ClientException e) { IOException io = findIOException(e); if (io != null) { throw io; } throw new RuntimeException(e); } }

现在看executeWithLoadBalancer方法,注意此时当前类已经是某一个负载均衡器的实例了:

//AbstractLoadBalancerAwareClient.java /** * This method should be used when the caller wants to dispatch the request to a server chosen by * the load balancer, instead of specifying the server in the request's URI. * 当调用者希望将请求分派到负载平衡器选择的服务器,而不是在请求的URI中指定服务器时,应该使用此方法。 * * It calculates the final URI by calling {@link #reconstructURIWithServer(com.netflix.loadbalancer.Server, java.net.URI)} * and then calls {@link #executeWithLoadBalancer(ClientRequest, com.netflix.client.config.IClientConfig)}. * 它通过调用{@link #reconstructURIWithServer(com.netflix.loadbalancer.Server, java.net.URI)}来计算最 * 终的URI,然后调用{@link #executeWithLoadBalancer(ClientRequest, com.netflix.client.config.IClientConfig)}。 * * @param request request to be dispatched to a server chosen by the load balancer. The URI can be a partial * URI which does not contain the host name or the protocol. */ public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { //command提交了一个服务器操作,当负载均衡选到了要访问的服务器后 //就会执行这个ServerOperation //所以负载均衡触发点主要在sumbit中 return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { //通过选择的Server计算最终的URI URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { //调用execute发起请求 return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } }

看submit方法,这个方法看上去很复杂,因为Observable这个类实现了Reactive Pattern,响应式编程规范,我还没完全搞明白,以后搞明白了会专门讲解,所以现在我们只大概看一下这个方法,只关注我们最核心的点: 主要关注selectServer方法,选择服务器:

//LoadBalancerCommand.java /** * Return an Observable that either emits only the single requested server * or queries the load balancer for the next server on each subscription * 返回一个可观察对象,该对象要么只发出一个被请求的服务器,要么在每个订阅中查询下一个服务器的负载均衡器 */ private Observable<Server> selectServer() { return Observable.create(new OnSubscribe<Server>() { @Override public void call(Subscriber<? super Server> next) { try { //getServerFromLoadBalancer:根据负载均衡器选择要访问的Server Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); next.onNext(server); next.onCompleted(); } catch (Exception e) { next.onError(e); } } }); }

根据负载均衡器选择要访问的Server: 看到最终调用真正的负载均衡器ILoadBalancer的chooseServer方法进行Server选择:

从 2.3 的分析我们知道ILoadBalancer默认就是ZoneAwareLoadBalancer:

//ZoneAwareLoadBalancer.java public Server chooseServer(Object key) { if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) { //如果可用的zone小于等于一个,走这 logger.debug("Zone aware logic disabled or there is only one zone"); return super.chooseServer(key); } Server server = null; try { LoadBalancerStats lbStats = getLoadBalancerStats(); Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats); logger.debug("Zone snapshots: {}", zoneSnapshot); if (triggeringLoad == null) { triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d); } if (triggeringBlackoutPercentage == null) { triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d); } //可用zone不止一个的话获取所有可用的zone Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()); logger.debug("Available zones: {}", availableZones); if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) { //通过随机方式在所有可用的zone中选择一个 String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones); logger.debug("Zone chosen: {}", zone); if (zone != null) { //获取对应zone的负载均衡器 BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone); //通过负载均衡器选择Server server = zoneLoadBalancer.chooseServer(key); } } } catch (Exception e) { logger.error("Error choosing server using zone aware logic for load balancer={}", name, e); } if (server != null) { return server; } else { logger.debug("Zone avoidance logic is not invoked."); return super.chooseServer(key); } }

无论是一个可用zone还是多个,流程都一样,我们看super.chooseServer:

//BaseLoadBalancer.java public Server chooseServer(Object key) { if (counter == null) { counter = createCounter(); } counter.increment(); if (rule == null) { return null; } else { try { //rule就是负载均衡的规则,即负载均衡策略 //rule默认是轮询算法RoundRobinRule return rule.choose(key); } catch (Exception e) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e); return null; } } }

rule默认是轮询算法:

现在我们看一下都有哪些内置的负载均衡策略:

2.4 负载均衡算法

接下来我们介绍一些常用的负载均衡算法:

关于负载均衡策略IRule接口的choose方法参数key,就是ClientRequest的loadBalancerKey,作为外部传参的,但一般用不到。

(1) RoundRobinRule

轮询策略。Ribbon 默认采用的策略。若经过一轮轮询没有找到可用的 provider,其最多轮询 10 轮。若最终还没有找到,则返回 null。

//RoundRobinRule.java public Server choose(Object key) { //key是负载均衡的外部条件,一般用不到 return choose(getLoadBalancer(), key); } public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } Server server = null; int count = 0; while (server == null && count++ < 10) {//最多轮询 10 轮 //获取所有能够到达的Server,其实就是UP状态的Server! List<Server> reachableServers = lb.getReachableServers(); //获取所有Server List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } //轮询方式获取一个索引 int nextServerIndex = incrementAndGetModulo(serverCount); //从所有Server集合中获取Server server = allServers.get(nextServerIndex); //如果Server为空,就歇会儿。 if (server == null) { /* Transient. */ //让出CPU执行权 //让当前线程状态由执行态 置为 就绪态 Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { //判断是否可用、状态是否准备好了 return (server); } // Next.不可用或者没准备好,置为null,下一次循环 server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; }

lb.getReachableServers(),获取所有能够达到的Server,其实就是获取UP状态的Server!

lb.getAllServers(),获取所有Server

incrementAndGetModulo,轮询方式获取一个索引

//RoundRobinRule.java /** * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}. * 灵感来自于{@link AtomicInteger#incrementAndGet()}的实现。 * * @param modulo The modulo to bound the value of the counter. * @return The next value. */ private int incrementAndGetModulo(int modulo) { for (;;) { //nextServerCyclicCounter是AtomicInteger int current = nextServerCyclicCounter.get(); //+1取模,就是轮询取模 int next = (current + 1) % modulo; if (nextServerCyclicCounter.compareAndSet(current, next)) return next; } }

(2) RandomRule

随机策略,从所有可用的 provider 中随机选择一个。

//RandomRule.java public Server choose(Object key) { //key是负载均衡的外部条件,一般用不到 return choose(getLoadBalancer(), key); } //RandomRule.java @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE") public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; while (server == null) { if (Thread.interrupted()) {//如果当前线程被标记中断了,结束处理返回null return null; } //获取所有up状态的Server List<Server> upList = lb.getReachableServers(); //获取所有的Server List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { /* * No servers. End regardless of pass, because subsequent passes * only get more restrictive. */ return null; } //获取随机索引 int index = chooseRandomInt(serverCount); //从所有up状态的Server中获取Server server = upList.get(index);//这里可能发生索引越界 //因为随机数的最大范围是到serverCount //但却从upList里获取 if (server == null) { /* * The only time this should happen is if the server list were * somehow trimmed. This is a transient condition. Retry after * yielding. */ Thread.yield(); continue; } if (server.isAlive()) {//可用返回 return (server); } // Shouldn't actually happen.. but must be transient or a bug. server = null; Thread.yield();//休息一下 } return server; } chooseRandomInt//RandomRule.java protected int chooseRandomInt(int serverCount) { //根据当前线程随机数生成器,获取一个随机数 return ThreadLocalRandom.current().nextInt(serverCount); }

(3) RetryRule

重试策略。先按照 RoundRobinRule 策略获取 provider,若获取失败,则在指定的时限内重试。默认的时限为 500 毫秒。

//RetryRule.java public Server choose(Object key) { return choose(getLoadBalancer(), key); } public Server choose(ILoadBalancer lb, Object key) { long requestTime = System.currentTimeMillis();//当前请求时间 long deadline = requestTime + maxRetryMillis;//截止时间,默认500毫秒 Server answer = null; answer = subRule.choose(key);//subRule就是RoundRobinRule //如果获取的主机为null、或者不可用,并且还没到截止时间 if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) { //定义一个中断任务,deadline - 当前时间 = 可执行时间 //底层等时间到了会调用当前线程的interrupt方法,让其中断 InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis()); while (!Thread.interrupted()) {//当前线程没有中断 answer = subRule.choose(key);//用轮询方式继续获取 if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) { /* pause and retry hoping it's transient */ Thread.yield();//休息一会儿 } else { //找到或者超时了结束 break; } } task.cancel();//取消中断任务 } //如果是超时结束,还是null if ((answer == null) || (!answer.isAlive())) { return null; } else { return answer; } } subRule就是RoundRobinRule

(4) BestAvailableRule

最可用策略。选择并发量最小的 provider,即连接的消费者数量最少的 provider。

//BestAvailableRule.java public Server choose(Object key) { if (loadBalancerStats == null) { return super.choose(key);//如果状态信息为空,调用super,super就是轮询 } //获取所有server List<Server> serverList = getLoadBalancer().getAllServers(); //记录最小连接并发数,先记个最大值 int minimalConcurrentConnections = Integer.MAX_VALUE; //并发数和时间有关系,所以需要当前时间 long currentTime = System.currentTimeMillis(); Server chosen = null; for (Server server: serverList) { //遍历所有Server,获取对应Server的状态信息 ServerStats serverStats = loadBalancerStats.getSingleServerStat(server); //判断当前时间是否发生熔断 if (!serverStats.isCircuitBreakerTripped(currentTime)) { //获取当前时间的并发连接数 int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); if (concurrentConnections < minimalConcurrentConnections) { //记录最小的 minimalConcurrentConnections = concurrentConnections; chosen = server; } } } if (chosen == null) { return super.choose(key); } else { return chosen; } } loadBalancerStats :

(5) AvailabilityFilteringRule

可用过滤算法。该算法规则是:先采用轮询方式选择一个 Server,然后判断其是否处于熔断状态,是否已经超过连接极限。若没有,则直接选择。否则再重新按照相同的方式继续选择。最多重试 11 次。

若 11 次后仍没有找到,则获取所有 Server 进行判断,挑选出所有未熔断,未超过连接极限的 Server,然后再采用轮询方式选择一个。若还没有符合条件的,则返回 null。

//AvailabilityFilteringRule.java public Server choose(Object key) { int count = 0; //先通过轮询方式选择 Server server = roundRobinRule.choose(key); while (count++ <= 10) {//11次 if (predicate.apply(new PredicateKey(server))) {//通过规则进行判断是否符合要求 return server;//符合规则返回 } server = roundRobinRule.choose(key);//不符合轮询方式再选 } return super.choose(key);//最终11次都不符合 }

通过规则进行判断是否符合要求: predicate是一个组合规则,主要关注第一个即可:

//AvailabilityPredicate.java public boolean apply(@Nullable PredicateKey input) { LoadBalancerStats stats = getLBStats(); if (stats == null) { return true; } //判断是否应该跳过该Server return !shouldSkipServer(stats.getSingleServerStat(input.getServer())); } private boolean shouldSkipServer(ServerStats stats) { //如果开启了熔断机制,并且当前Server状态是熔断状态 //或者当前请求数量超过了限制的 //那么需要跳过,返回true if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) { return true; } return false; }

最终11次都不符合,走super.choose:

//PredicateBasedRule.java public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); //getPredicate:获取判断规则(当前类就是AvailabilityPredicate) //lb.getAllServers():获取所有Server Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } } //AbstractServerPredicate.java public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { //从所有Server中获取满足规则的(没有熔断,没有超过连接极限) List<Server> eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent(); } //从符合规则的里面轮询选一个 return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); } //AbstractServerPredicate.java private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextIndex.get(); int next = (current + 1) % modulo; if (nextIndex.compareAndSet(current, next) && current < modulo) return current; } }
最新回复(0)