| 编辑推荐: |
本文主要首先展示了Ribbon工作结构图、时序图,其次介绍了Ribbon的核心-
负载均衡器的实现原理,希望对您的学习有所帮助。
本文来自于微信公众号,由火龙果软件Alice编辑、推荐。 |
|
一:Ribbon的概述
SpringCloud Ribbon是一个基于HTTP和TCP的客户端的负载均衡工具
Ribbon和微服务是同级别的,融合到微服务的一些基础设施(如Feign),不需要独立部署
Ribbon会将微服务之间的Rest请求转为客户端的负载均衡的RPC调用
Ribbon默认的负载均衡策略是轮询,但不止轮询一种,可以自定义配置
二:Feign集成下ribbon工作结构图

Feign集成下的ribbon工作结构图
1:微服务之间通过Feign调用,最后通过LoadBalancerFeignClient发送请求
2:LoadBalancerFeignClient端从client端服务的上下文环境中找到负载均衡器,并把提取到的服务名称交给负载均衡器
3:负载均衡器提到选到server实例,将client端的请求包装成调用请求LoadBalancerCommand
4:根据封装的信息,发送远程调用到具体的服务实例
三:Ribbon工作时序图

四:Ribbon的核心——负载均衡器 LoadBalancer
4.1 负载均衡器的实现原理
从二中的结构图我们可以看到Ribbon实现负载均衡是通过ILoadBalancer接口来实现的,AbstractLoadBalancer是
ILoadBalancer 的接口抽象实现类,它有三个方法,维护不同的功能:

负载均衡器的基础实现类是 BaseLoadBalancer ,BaseLoadBalancer
的属性值如下:

BaseLoadBalancer的属性图
除了继承AbstractLoadBalancer类的维护实例和实例状态容器的方法并重写后,BaseLoadBalancer还有几个属性和功能值得去关注的

通过上面的方法我们就可以知道找到合适的实例,有了负载的规则,就能实现ribbon的负载均衡功能
4.2 负载均衡器如何维护服务实例server列表
先讲下结论:ILoadBalancer是和eruka结合通过服务发现的方式来维护server列表的
ILoadBalancer维护服务列表的属性结构图如下:

ILoadBalancer维护服务列表的属性结构图
负载均衡器扩展类DynamicServerListLoadBalancer实现服务实例清单在运行期的动态更新,在这个类里定义了服务实例操作对象ServerList
serverListImpl,ServerList接口实现了初始化服务实例和更新服务实例的功能,方法如下:
public List<T>
getInitialListOfServers();
public List<T> getUpdatedListOfServers();
|
这些服务实例的创建在服务启动后,在首次Reuqest被拦截后,初始化懒加载生成的bean,在EurekaRibbonClientConfiguration配置类中我们可以看到实例化ServerList对象的方法
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig
config,
Provider<EurekaClient> eurekaClientProvider)
{
if (this.propertiesFactory.isSet(ServerList.class,
serviceId)) {
return this.propertiesFactory.get(ServerList.class,
config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList
= new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
} |
通过上面的代码,我们就会看到创建的是 DomainExtractingServerList
的实现类实例。实例的创建是通过构造函数加载一个DiscoveryEnabledNIWSServerList
对象的实例来实现的,这个类就是服务发现的实现的源头,里面两个方法用于初始化 ServerList 和更新
ServerList ,代码如下:
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
} |
初始化和更新都会执行 #obtainServersViaDiscovery
方法,这个方法就是获取服务实例的主要实现逻辑,它通过 EurekaClient 从服务中心获取最新的实例列表,然后对这里服务实例进行遍历,对状态为UP(正常状态)下的实例转为
DiscoveryEnabledServer 列表并返回
private List<DiscoveryEnabledServer>
obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList
= new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider. get()
== null) {
logger.warn("EurekaClient has not been
initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(","))
{
// if targetRegion is null, it will be interpreted
as the same region of client
List<InstanceInfo> listOfInstanceInfo
= eurekaClient.getInstancesByVipAddress(vipAddress,
isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP))
{ if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client
name: " + clientName + " to "
+ overridePort);
} // copy is necessary since the InstanceInfo
builder just uses the original reference,
// and we don't want to corrupt the global eureka
copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii); if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort). build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort). build();
}
} DiscoveryEnabledServer des = createServer(ii,
isSecure, shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers,
we dont use subsequent vipAddress based servers
}
}
}
return serverList;
} |
这样的一个过程就保证了正常的服务实例被发现,然后给负载均衡器提供一个正常状态的ServerList
4.3 负载均衡器如何更新服务实例
在上面介绍中我们发现了服务实例,但是服务的状态是可变的,如何保证我们提供的ServerList一直是正常的,这就是需要我们有个策略保证更新ServerList
在 DynamicServerListLoadBalancer 类中有一个
serverListUpdater 对象实例属性,这个就是更新的 serverList,他有两个实现类来通过不同的策略实现serverList的更新

PollingServerListUpdater 通过定时任务来拉取服务列表(负载均衡器默认以这种策略为更新方式)
我们先介绍下负载默认的更新方式 PollingServerListUpdater
,通过源码可以看到它创建了一个多线程,通过定时任务进行更新,看下runnable的start方法
public synchronized
void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable()
{
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle",
e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
} |
在这个线程的实现中会执行UpdateAction操作,来实现服务列表的更新,UpdateAction封装了更新的操作
public synchronized
void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
this.updateListener = new EurekaEventListener()
{
@Override
public void onEvent(EurekaEvent event) {
if (event instanceof CacheRefreshedEvent) {
if (!updateQueued.compareAndSet(false, true))
{ // if an update is already queued
logger.info("an update action is already
queued, returning as no-op");
return;
}
if (!refreshExecutor.isShutdown()) {
try {
refreshExecutor.submit(new Runnable() {
@Override
public void run() {
try {
updateAction.doUpdate();
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList",
e);
} finally {
updateQueued.set(false);
}
}
}); // fire and forget
} catch (Exception e) {
logger.warn("Error submitting update task
to executor, skipping one round of updates",
e);
updateQueued.set(false); // if submit fails,
need to reset updateQueued to false
}
}
else {
logger.debug("stopping EurekaNotificationServerListUpdater,
as refreshExecutor has been shut down");
stop();
}
}
}
};
if (eurekaClient == null) {
eurekaClient = eurekaClientProvider.get();
}
if (eurekaClient != null) {
eurekaClient.registerEventListener(updateListener);
} else {
logger.error("Failed to register an updateListener
to eureka client, eureka client is null");
throw new IllegalStateException("Failed
to start the updater, unable to register the
update listener due to eureka client being null.");
}
} else {
logger.info("Update listener already registered,
no-op");
}
} |
4.4 如何从ServerList中挑选一个合适的服务实例
负载均衡器挑选服务实例流程包含两个步骤
1:将返回的ServerList通过ServerListFilter过滤一次,返回满足条件的列表
2:根据Rule规则,结合LoadBalancerStats信息,挑选一个合适的服务实例
服务实例过滤器ServerListFilter
当负载均衡器拿到了ServerList,首先会通过 ServerListFilter
过滤列表,Ribbon的默认实现类是 ZoneAffinityServerListFilter ,其实现代码:
//判断是否使用区域优先
private boolean shouldEnableZoneAffinity(List<T>
filtered) {
if (!zoneAffinity && !zoneExclusive) {
return false;
}
if (zoneExclusive) {
return true;
}
LoadBalancerStats stats = getLoadBalancerStats();
if (stats == null) {
return zoneAffinity;
} else {
logger.debug("Determining if zone affinity
should be enabled with given server list: {}",
filtered);
ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
double loadPerServer = snapshot.getLoadPerServer();
int instanceCount = snapshot.getInstanceCount();
int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
if (((double) circuitBreakerTrippedCount) / instanceCount
>= blackOutServerPercentageThreshold.get()
|| loadPerServer >= activeReqeustsPerServerThreshold.get()
|| (instanceCount - circuitBreakerTrippedCount)
< availableServersThreshold.get()) {
logger.debug("zoneAffinity is overriden.
blackOutServerPercentage: {}, activeReqeustsPerServer:
{}, availableServers: {}",
new Object[] {(double) circuitBreakerTrippedCount
/ instanceCount, loadPerServer, instanceCount
- circuitBreakerTrippedCount});
return false;
} else {
return true;
}
}
} // 过滤服务实例
@Override
public List<T> getFilteredListOfServers(List<T>
servers) {
if (zone != null && (zoneAffinity ||
zoneExclusive) && servers !=null &&
servers.size() > 0){
List<T> filteredServers = Lists.newArrayList(Iterables.filter(
servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
if (shouldEnableZoneAffinity(filteredServers))
{
return filteredServers;
} else if (zoneAffinity) {
overrideCounter.increment();
}
}
return servers;
} |
4.5 IRule:选择服务实例的规则
IRule的定义:
public interface
IRule {
Server choose(Object var1);
void setLoadBalancer(ILoadBalancer var1);
ILoadBalancer getLoadBalancer();
} |
IRule接口的实现类定义了7种不同的算法选择服务实例,默认是以轮询的算法选择服务实例

五:结语
ribbon是springcloud非常核心的模块,理解中间的运行原理对我们的平常的问题的定位有很大的帮助,中间有很多模块没有仔细讲,比如IRule各个策略的实现,后期我们会单独拿出来补充
|