您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center 汽车系统工程   模型库  
会员   
   
OCSMP认证课程:OCSMP-MU
4月9-10日 线上
基于模型的数据治理与数据中台
5月19-20日 北京+线上
网络安全原理与实践
5月21-22日 北京+线上
     
   
 订阅
  捐助
SpringCloud:Ribbon设计原理解析
 
作者:艾新建
  5729  次浏览      48
 2020-7-28
   
 
编辑推荐:
本文主要首先展示了Ribbon工作结构图、时序图,其次介绍了Ribbon的核心- 负载均衡器的实现原理,希望对您的学习有所帮助。
本文来自于微信公众号,由火龙果软件Alice编辑、推荐。

一:Ribbon的概述

SpringCloud Ribbon是一个基于HTTP和TCP的客户端的负载均衡工具

Ribbon和微服务是同级别的,融合到微服务的一些基础设施(如Feign),不需要独立部署

Ribbon会将微服务之间的Rest请求转为客户端的负载均衡的RPC调用

Ribbon默认的负载均衡策略是轮询,但不止轮询一种,可以自定义配置

二:Feign集成下ribbon工作结构图

Feign集成下的ribbon工作结构图

1:微服务之间通过Feign调用,最后通过LoadBalancerFeignClient发送请求

2:LoadBalancerFeignClient端从client端服务的上下文环境中找到负载均衡器,并把提取到的服务名称交给负载均衡器

3:负载均衡器提到选到server实例,将client端的请求包装成调用请求LoadBalancerCommand

4:根据封装的信息,发送远程调用到具体的服务实例

三:Ribbon工作时序图

四:Ribbon的核心——负载均衡器 LoadBalancer

4.1 负载均衡器的实现原理

从二中的结构图我们可以看到Ribbon实现负载均衡是通过ILoadBalancer接口来实现的,AbstractLoadBalancer是 ILoadBalancer 的接口抽象实现类,它有三个方法,维护不同的功能:

负载均衡器的基础实现类是 BaseLoadBalancer ,BaseLoadBalancer 的属性值如下:

BaseLoadBalancer的属性图

除了继承AbstractLoadBalancer类的维护实例和实例状态容器的方法并重写后,BaseLoadBalancer还有几个属性和功能值得去关注的

通过上面的方法我们就可以知道找到合适的实例,有了负载的规则,就能实现ribbon的负载均衡功能

4.2 负载均衡器如何维护服务实例server列表

先讲下结论:ILoadBalancer是和eruka结合通过服务发现的方式来维护server列表的

ILoadBalancer维护服务列表的属性结构图如下:

ILoadBalancer维护服务列表的属性结构图

负载均衡器扩展类DynamicServerListLoadBalancer实现服务实例清单在运行期的动态更新,在这个类里定义了服务实例操作对象ServerList

serverListImpl,ServerList接口实现了初始化服务实例和更新服务实例的功能,方法如下:

public List<T> getInitialListOfServers();
public List<T> getUpdatedListOfServers();

这些服务实例的创建在服务启动后,在首次Reuqest被拦截后,初始化懒加载生成的bean,在EurekaRibbonClientConfiguration配置类中我们可以看到实例化ServerList对象的方法

@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config,
Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}

通过上面的代码,我们就会看到创建的是 DomainExtractingServerList 的实现类实例。实例的创建是通过构造函数加载一个DiscoveryEnabledNIWSServerList 对象的实例来实现的,这个类就是服务发现的实现的源头,里面两个方法用于初始化 ServerList 和更新 ServerList ,代码如下:

@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
}

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}

初始化和更新都会执行 #obtainServersViaDiscovery 方法,这个方法就是获取服务实例的主要实现逻辑,它通过 EurekaClient 从服务中心获取最新的实例列表,然后对这里服务实例进行遍历,对状态为UP(正常状态)下的实例转为 DiscoveryEnabledServer 列表并返回

private List<DiscoveryEnabledServer>
obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

if (eurekaClientProvider == null || eurekaClientProvider.
get() == null) {
logger.warn("EurekaClient has not been initialized yet,
returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}

EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as
the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: "
+ clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder
just uses the original reference,
// and we don't want to corrupt the global eureka
copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).
build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).
build();
}
}
DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}

这样的一个过程就保证了正常的服务实例被发现,然后给负载均衡器提供一个正常状态的ServerList

4.3 负载均衡器如何更新服务实例

在上面介绍中我们发现了服务实例,但是服务的状态是可变的,如何保证我们提供的ServerList一直是正常的,这就是需要我们有个策略保证更新ServerList

在 DynamicServerListLoadBalancer 类中有一个 serverListUpdater 对象实例属性,这个就是更新的 serverList,他有两个实现类来通过不同的策略实现serverList的更新

PollingServerListUpdater 通过定时任务来拉取服务列表(负载均衡器默认以这种策略为更新方式)

我们先介绍下负载默认的更新方式 PollingServerListUpdater ,通过源码可以看到它创建了一个多线程,通过定时任务进行更新,看下runnable的start方法

public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}

在这个线程的实现中会执行UpdateAction操作,来实现服务列表的更新,UpdateAction封装了更新的操作

public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
this.updateListener = new EurekaEventListener() {
@Override
public void onEvent(EurekaEvent event) {
if (event instanceof CacheRefreshedEvent) {
if (!updateQueued.compareAndSet(false, true)) { // if an update is already queued
logger.info("an update action is already queued, returning as no-op");
return;
}
if (!refreshExecutor.isShutdown()) {
try {
refreshExecutor.submit(new Runnable() {
@Override
public void run() {
try {
updateAction.doUpdate();
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList", e);
} finally {
updateQueued.set(false);
}
}
}); // fire and forget
} catch (Exception e) {
logger.warn("Error submitting update task to executor, skipping one round of updates", e);
updateQueued.set(false); // if submit fails, need to reset updateQueued to false
}
}
else {
logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
stop();
}
}
}
};
if (eurekaClient == null) {
eurekaClient = eurekaClientProvider.get();
}
if (eurekaClient != null) {
eurekaClient.registerEventListener(updateListener);
} else {
logger.error("Failed to register an updateListener to eureka client, eureka client is null");
throw new IllegalStateException("Failed to start the updater, unable to register the update listener due to eureka client being null.");
}
} else {
logger.info("Update listener already registered, no-op");
}
}

4.4 如何从ServerList中挑选一个合适的服务实例

负载均衡器挑选服务实例流程包含两个步骤

1:将返回的ServerList通过ServerListFilter过滤一次,返回满足条件的列表

2:根据Rule规则,结合LoadBalancerStats信息,挑选一个合适的服务实例

服务实例过滤器ServerListFilter

当负载均衡器拿到了ServerList,首先会通过 ServerListFilter 过滤列表,Ribbon的默认实现类是 ZoneAffinityServerListFilter ,其实现代码:

//判断是否使用区域优先
private boolean shouldEnableZoneAffinity(List<T> filtered) {
if (!zoneAffinity && !zoneExclusive) {
return false;
}
if (zoneExclusive) {
return true;
}
LoadBalancerStats stats = getLoadBalancerStats();
if (stats == null) {
return zoneAffinity;
} else {
logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
double loadPerServer = snapshot.getLoadPerServer();
int instanceCount = snapshot.getInstanceCount();
int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
|| loadPerServer >= activeReqeustsPerServerThreshold.get()
|| (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
return false;
} else {
return true;
}
}
}
// 过滤服务实例
@Override
public List<T> getFilteredListOfServers(List<T> servers) {
if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
List<T> filteredServers = Lists.newArrayList(Iterables.filter(
servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
if (shouldEnableZoneAffinity(filteredServers)) {
return filteredServers;
} else if (zoneAffinity) {
overrideCounter.increment();
}
}
return servers;
}

4.5 IRule:选择服务实例的规则

IRule的定义:

public interface IRule {
Server choose(Object var1);
void setLoadBalancer(ILoadBalancer var1);
ILoadBalancer getLoadBalancer();
}

IRule接口的实现类定义了7种不同的算法选择服务实例,默认是以轮询的算法选择服务实例

五:结语

ribbon是springcloud非常核心的模块,理解中间的运行原理对我们的平常的问题的定位有很大的帮助,中间有很多模块没有仔细讲,比如IRule各个策略的实现,后期我们会单独拿出来补充

 

 
   
5729 次浏览       48
相关文章

企业架构、TOGAF与ArchiMate概览
架构师之路-如何做好业务建模?
大型网站电商网站架构案例和技术架构的示例
完整的Archimate视点指南(包括示例)
相关文档

数据中台技术架构方法论与实践
适用ArchiMate、EA 和 iSpace进行企业架构建模
Zachman企业架构框架简介
企业架构让SOA落地
相关课程

云平台与微服务架构设计
中台战略、中台建设与数字商业
亿级用户高并发、高可用系统架构
高可用分布式架构设计与实践
最新活动计划
嵌入式软件架构设计 12-11[北京]
LLM大模型与智能体开发实战 12-18[北京]
嵌入式软件测试 12-25[北京]
AI原生应用的微服务架构 1-9[北京]
AI大模型编写高质量代码 1-14[北京]
需求分析与管理 1-22[北京]
 
最新文章
大数据平台下的数据治理
如何设计实时数据平台(技术篇)
大数据资产管理总体框架概述
Kafka架构和原理
ELK多种架构及优劣
最新课程
大数据平台搭建与高性能计算
大数据平台架构与应用实战
大数据系统运维
大数据分析与管理
Python及数据分析
更多...   
成功案例
某通信设备企业 Python数据分析与挖掘
某银行 人工智能+Python+大数据
北京 Python及数据分析
神龙汽车 大数据技术平台-Hadoop
中国电信 大数据时代与现代企业的数据化运营实践
更多...