Ribbon(六) Ribbon源码分析 - IRULE负载均衡的实现

it2024-04-06  51

   具体负载均衡服务还是由IRULE实现,在ribbon的使用案例中我们可以注意到切换具体的路由实现非常简单:

@RibbonClient(name = "MICROSERVICE-PROVIDER-PRODUCT", configuration = RibbonConfig.class)

通过 @RibbonClient指定负载均衡的实现类 RibbonConfig后,在这个类中创建具体实现算法即可. 

public class RibbonConfig { @Bean public IRule ribbonRule() { // 其中IRule就是所有规则的标准 return new com.netflix.loadbalancer.RandomRule(); // 随机的访问策略 } }

那么如果不是自已像上面一样配置  RibbonConfig作为负载均衡的实现类的话,ribbon会自动提供一个默认实现。 

让我们追踪一下  RibbonClientConfiguration 这个类,它位于包  org.springframework.cloud.netflix.ribbon 下, 它是Ribbon客户端的默认配置类。它里面加载了一系列的Ribbon所需的对象.其中就包括 IRule. 

@Bean @ConditionalOnMissingBean public IRule ribbonRule(IClientConfig config) { if (this.propertiesFactory.isSet(IRule.class, name)) { return this.propertiesFactory.get(IRule.class, config, name); } ZoneAvoidanceRule rule = new ZoneAvoidanceRule(); rule.initWithNiwsConfig(config); return rule; }

从以上代码可以看出,加载的默认IRule实现是  ZoneAvoidanceRule, 让我们先来看一下IRule有哪些常用实现. 

BestAvailableRule :会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务RoundRobinRule:默认算法,轮询 ClientConfigEnabledRoundRobinRule: 使用RoundRobinRule选择服务器 RetryRule: 先按照RoundRobinRule的策略获取服务,如果获取服务失败则在指定时间内会进行重试,获取可用的服务WeightedResponseTimeRule: 根据平均响应时间计算所有服务的权重,响应时间越快服务权重越大被选中的概率越高。刚启动时如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,会切换到WeightedResponseTimeRule ZoneAvoidanceRule: 默认规则,复合判断server所在区域的性能和server的可用性选择服务器

 

2.   ZoneAvoidanceRule 的具体实现

ZoneAvoidanceRule是默认的IRule实例,他使用PredicateBasedRule来根据服务区的运行状况和服务器的可用性来选择服务器 它的父类是com.netflix.loadbalancer.PredicateBasedRule 它的choose()方法具体依次做了以下工作: 

先使用ILoadBalancer 获取服务器列表使用AbstractServerPredicate进行服务器过滤最后轮询从剩余的服务器列表中选择最终的服务器 public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule { public abstract AbstractServerPredicate getPredicate(); @Override public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); //轮询 Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } } }

追踪   chooseRoundRobinAfterFiltering 方法,

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { //根据loadBalancerKey获取可用服务器列表 List<Server> eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent(); } // 这里面的 incrementAndGetModulo 是轮询的具体实现 return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); }

incrementAndGetModulo方法的源码,它是轮询的具体实现:  至此,可以得出ZoneAvoidanceRule也是使用的轮询算法的结论.

// modulo 可用服务器列表数 private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextIndex.get(); int next = (current + 1) % modulo; if (nextIndex.compareAndSet(current, next) && current < modulo) return current; } }

 

再回到上面的代码中  getPredicate()是一个抽象方法,具体在 ZoneAvoidanceRule中实现,请看下面的实现代码:

@Override public AbstractServerPredicate getPredicate() { return compositePredicate; //请注意这里的 compositePredicate 是在下面的构造方法中初始化的. } public ZoneAvoidanceRule() { super(); //判断一个服务器的运行状况是否可用,去除不可用服务器的所有服务器 ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this); //用于过滤连接数过多的服务器 AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this); compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate); } //将两个Predicate组合成一个CompositePredicate private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) { return CompositePredicate.withPredicates(p1, p2) .addFallbackPredicate(p2) .addFallbackPredicate(AbstractServerPredicate.alwaysTrue()) .build(); }

接着再看另一个方法  chooseRoundRobinAfterFiltering, 它是过滤的方法,然后AvailabilityPredicate 里面并没有这方法,他直接继承了他的父类com.netflix.loadbalancer.AbstractServerPredicate#chooseRoundRobinAfterFiltering(java.util.List<com.netflix.loadbalancer.Server>)

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { //过滤服务器列表 List<Server> eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent(); } //(i+1)%n 轮询选择一个服务实例 return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); }

查看一下  getEligibleServers()

public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) { //如果前面loadBalancerKey直接传入的null, 方法getEligibleServers会使用serverOnlyPredicate来依次过滤 if (loadBalancerKey == null) { return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate())); } else { List<Server> results = Lists.newArrayList(); for (Server server: servers) { if (this.apply(new PredicateKey(loadBalancerKey, server))) { results.add(server); } } return results; } }

追踪 getServerOnlyPredicate() , 会发现它返回的  serverOnlyPredicate 的创建代码如下:

//serverOnlyPredicate 则会调用apply方法,并将Server 对象分装PredicateKey当作参数传入 private final Predicate<Server> serverOnlyPredicate = new Predicate<Server>() { @Override public boolean apply(@Nullable Server input) { return AbstractServerPredicate.this.apply(new PredicateKey(input)); } };

AbstractServerPredicate并没有实现apply方法,具体的实现又回到了子类CompositePredicate的apply方法,它会依次调用ZoneAvoidancePredicate(它以查看服务区是否可用为主)与AvailabilityPredicate(它看的是联接数是否达到可用)的apply方法.  两个类的算法实现不同. 

 那么首先看 ZoneAvoidancePredicate 的apply实现如下: 

@Override public boolean apply(@Nullable PredicateKey input) { if (!ENABLED.get()) { return true; } String serverZone = input.getServer().getZone(); if (serverZone == null) { //如果服务器没有zone的相关信息,直接返回 return true; } LoadBalancerStats lbStats = getLBStats(); //LoadBalancerStats 存储每个服务器节点的执行特征和运行记录,这些信息可供动态负载均衡使用 if (lbStats == null) { //如果没有服务器的运行状态的记录,直接返回 return true; } if (lbStats.getAvailableZones().size() <= 1) { //如果只有就一个服务器,直接返回 return true; } //PredicateKey 封装了Server的信息,判断下服务器区的记录是否用当前区的信息 Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats); //如果没有直接返回 if (!zoneSnapshot.keySet().contains(serverZone)) { // The server zone is unknown to the load balancer, do not filter it out return true; } logger.debug("Zone snapshots: {}", zoneSnapshot); // 最重要的方法: 前面都是一些基本信息的判断 获取可用的服务器列表 Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()); logger.debug("Available zones: {}", availableZones); //判断当前服务器是否在可用的服务器列表中 if (availableZones != null) { return availableZones.contains(input.getServer().getZone()); } else { return false; } }

在上面代码中大量出现的ZoneSnapshot代码如下: 它用于封装zone快照信息

public class ZoneSnapshot { //实例数 final int instanceCount; //平均负载 final double loadPerServer; //断路器端口数量 final int circuitTrippedCount; //活动请求数量 final int activeRequestsCount; }

  在这个代码:  Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());中的getAvailableZones是用来筛选服务区列表的. 

public static Set<String> getAvailableZones( Map<String, ZoneSnapshot> snapshot, double triggeringLoad, double triggeringBlackoutPercentage) { if (snapshot.isEmpty()) { //服务区列表是空的,则返回空 return null; } //只有一个区,则直接返回此 availableZones Set<String> availableZones = new HashSet<String>(snapshot.keySet()); if (availableZones.size() == 1) { return availableZones; } Set<String> worstZones = new HashSet<String>(); double maxLoadPerServer = 0; boolean limitedZoneAvailability = false; //遍历所有的服务区 for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) { String zone = zoneEntry.getKey(); ZoneSnapshot zoneSnapshot = zoneEntry.getValue(); //获取服务区中的服务实例数 int instanceCount = zoneSnapshot.getInstanceCount(); if (instanceCount == 0) { //如果服务区中没有服务实例,那么移除该服务区 availableZones.remove(zone); limitedZoneAvailability = true; } else { //获取此服务区中的平均负载 double loadPerServer = zoneSnapshot.getLoadPerServer(); //服务区的实例平均负载小于0,或者实例故障率(断路器端口次数/实例数)大于等于阈值(默认0.99999),则去掉该服务区 if (((double) zoneSnapshot.getCircuitTrippedCount()) / instanceCount >= triggeringBlackoutPercentage || loadPerServer < 0) { availableZones.remove(zone); limitedZoneAvailability = true; } else { //如果该服务区的平均负载和最大负载的差小于一定的数量,则将该服务区加入到最坏服务区集合 if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) { worstZones.add(zone); } else if (loadPerServer > maxLoadPerServer) { //如果该zone的平均负载还大于最大负载,也将此服务区加入到最坏服务区集合中 maxLoadPerServer = loadPerServer; worstZones.clear(); worstZones.add(zone); } } } } //如果最大的平均负载小于设定的阈值 , 则此服务区可以用,直接返回 if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) { return availableZones; } //否则,从最坏的服务区集合服务器集合里面随机挑选一个(没办法啊 ) String zoneToAvoid = randomChooseZone(snapshot, worstZones); if (zoneToAvoid != null) { availableZones.remove(zoneToAvoid); } return availableZones; }

下面再谈谈AvailabilityPredicate(它看的是联接数是否达到可用) 的apply方法:

@Override public boolean apply(@Nullable PredicateKey input) { LoadBalancerStats stats = getLBStats(); if (stats == null) { return true; } //获得关于该服务器的记录 return !shouldSkipServer(stats.getSingleServerStat(input.getServer())); } private boolean shouldSkipServer(ServerStats stats) { //如果该服务器的断路器已经打开,或者他的连接数大于设定的阈值,那么就需要将服务区过滤掉 if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) { return true; } return false; }

至此一个  IRule的实现类  ZoneAvoidanceRule的分析到此结束. 其它的实现方案可以自己去查看源码了. 

小结: ZoneAvoidanceRule类从名字上看与RoundRobin没有什么关系,但仔细查看源码,还是可以看出它使用的就是轮询方案. 

最新回复(0)