概览

本文分以下几点:
- Eureka的高可用实现
 a. eureka客户端与服务端通讯机制
 b. eureka服务端与其他服务节点的通讯
 c. eureka服务端的自我保护机制
- Eureka与ZK的比较
 a. CAP模型
 b. 使用场景
Eureka高可用实现
Eureka的高可用通过以下三点实现:
a. eureka客户端与服务端通讯机制
b. eureka服务端与其他服务节点的通讯
c. eureka服务端的自我保护机制
eureka客户端与服务端通讯机制
可以看下客户端与Eureka服务端之间的通讯的概述流程:

同时为了更好的说明高可用的实现,在这里大概描述一下Eureka服务端的存储结构:

服务注册
- 客户端请求 - POST /eureka/v2/apps接口,将客户端的服务信息上传- PeerAwareInstanceRegistryImpl.java399行:- 1 
 2
 3
 4
 5
 6
 7
 8- int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; 
 if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
 leaseDuration = info.getLeaseInfo().getDurationInSecs();
 }
 // 本地注册
 super.register(info, leaseDuration, isReplication);
 // 协同其他eureka服务节点同步注册
 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
- Eureka服务端将客户端信息保存至 - Registry注册信息载体内
- 将此次添加变更增加到变更队列 - AbstractInstanceRegistry.java194行:- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26- public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { 
 try {
 // 读锁
 read.lock();
 // 存储本次注册信息(appName -> instanceId -> instanceInfo)
 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
 REGISTER.increment(isReplication);
 if (gMap == null) {
 final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
 gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
 if (gMap == null) {
 gMap = gNewMap;
 }
 }
 // ...其他逻辑
 // 将本次添加变更加入最近变更队列内
 recentlyChangedQueue.add(new RecentlyChangedItem(lease));
 // 设置租约的上一次更新时间
 registrant.setLastUpdatedTimestamp();
 // 执行缓存清空
 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
 } finally {
 // 读锁解锁
 read.unlock();
 }
 }
- 清空读写缓存(先清理APP、再清理ALL_APPS、最后清理ALL_APPS_DELTA) - ResponseCacheImpl.java251行:- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25- public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { 
 for (Key.KeyType type : Key.KeyType.values()) {
 for (Version v : Version.values()) {
 // 删除appName本身的缓存
 // 删除ALL_APPS的缓存
 // 删除ALL_APPS_DELTA的缓存
 invalidate(
 new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
 new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
 new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
 new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
 new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
 new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
 );
 // 存在虚拟地址名称的,删除虚拟地址为key的缓存
 if (null != vipAddress) {
 invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
 }
 // 存在安全虚拟地址名称的,删除安全虚拟地址为key的缓存
 if (null != secureVipAddress) {
 invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
 }
 }
 }
 }
ResponseCacheImpl.java 277行:
| 1 | public void invalidate(Key... keys) { | 
- 判断不存存在其他eureka服务节点,或者本次请求为同步请求则完成注册 - PeerAwareInstanceRegistryImpl.java620行:- 1 
 2
 3- if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { 
 return;
 }
- 否则,获取所有其他的服务节点,排除本身服务,将本次注册同步至其他eureka服务节点 - PeerAwareInstanceRegistryImpl.java624行:- 1 
 2
 3
 4
 5
 6
 7
 8- for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { 
 // 排除自身
 if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
 continue;
 }
 // 同步其他服务节点
 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
 }
服务续约
- 客户端请求PUT /eureka/v2/apps/{appId}
- 根据appName获取租约
- 更新租约
- 同步至其他eureka服务节点
续约类似心跳机制,客户端会按照默认时间的30秒定时做一次续约,如果超过3次没有成功,则服务端会将该客户端剔除。
服务取消注册
- 客户端请求DELETE /eureka/v2/apps/{appId}
- 根据appName获取租约
- 设置租约过期时间evictionTimestamp为当前时间
- 将此次删除增加到变更队列中
- 清空读写缓存(先清理APP、再清理ALL_APPS、最后清理ALL_APPS_DELTA)
服务剔除
- Eureka服务端会在内部会初始化一个 - Timer定时器用于定时调度处理剔除任务;剔除时间间隔为- evictionIntervalTimerInMs- AbstractInstanceRegistry.java1212行:- 1 
 2
 3
 4
 5
 6
 7
 8
 9- protected void postInit() { 
 // ....
 // 设置新的过期任务
 evictionTaskRef.set(new EvictionTask());
 // 任务调度
 evictionTimer.schedule(evictionTaskRef.get(),
 serverConfig.getEvictionIntervalTimerInMs(),
 serverConfig.getEvictionIntervalTimerInMs());
 }
- 判断是否启用自我保护,如果禁用,则不进行服务剔除 
- 判断 上一分钟实际的续约次数 <= - numberOfRenewsPerMinThreshold,则会触发自我保护机制,停止服务剔除- PeerAwareInstanceRegistryImpl.java474行:- 1 
 2
 3
 4
 5
 6- // 判断自我保护是否禁用 
 if (!isSelfPreservationModeEnabled()) {
 return true;
 }
 // 自我保护机制阈值判断
 return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
- 遍历 - Registry内所有的租约信息,判断当前租约是否过期
- 将这些过期的租约放置到一个过期列表内 - AbstractInstanceRegistry.java597行:- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16- List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); 
 // 遍历租约
 for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
 Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
 if (leaseMap != null) {
 for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
 Lease<InstanceInfo> lease = leaseEntry.getValue();
 // 判断租约是否过期
 // additionalLeaseMs是补偿时间,防止由于GC或者本地时间造成的一个时间误差,确保能够按照预期时间执行
 if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
 // 将过期租约加入过期列表
 expiredLeases.add(lease);
 }
 }
 }
 }
- 计算可被剔除的过期实例数(过期数 = Math.min(过期列表大小, (本地租约数 - 本地租约数 * 续约百分比))) - AbstractInstanceRegistry.java612行:- 1 
 2
 3
 4
 5
 6- // 获取本地租约数 
 int registrySize = (int) getLocalRegistrySize();
 // 计算租约
 int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
 // 实际需要剔除的租约个数
 int evictionLimit = registrySize - registrySizeThreshold;
- 使用洗牌算法找出需要剔除的n个实例 - AbstractInstanceRegistry.java620行:- 1 
 2
 3
 4
 5
 6
 7
 8- // 按照剔除数遍历,每次的交换对象都是基于上次的随机结果 
 Random random = new Random(System.currentTimeMillis());
 for (int i = 0; i < toEvict; i++) {
 // 筛选出需要交换位置的索引next并与i交换位置
 int next = i + random.nextInt(expiredLeases.size() - i);
 Collections.swap(expiredLeases, i, next);
 // 剔除逻辑
 }
- 设置租约过期时间 - evictionTimestamp为当前时间
- 将此次删除增加到变更队列中
- 清空读写缓存
eureka服务端与其他服务节点的通讯
eureka服务端与其他服务节点的通讯主要包含两部分:
- eureka服务端启动时候自动拉取其他服务节点的注册信息并落入本地Registry中
- 一旦有诸如register,renew,cancel请求,则会将这些请求通过线程池自动同步至其他服务节点
启动初始化
- Servlet容器初始化,调用eureka环境初始化以及eureka上下文初始化 - EurekaBootstrap.java111行:- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14- public void contextInitialized(ServletContextEvent event) { 
 try {
 // 初始化环境
 initEurekaEnvironment();
 // 初始化上下文
 initEurekaServerContext();
 ServletContext sc = event.getServletContext();
 sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
 } catch (Throwable e) {
 logger.error("Cannot bootstrap eureka server :", e);
 throw new RuntimeException("Cannot bootstrap eureka server :", e);
 }
 }
- 初始化上下文的过程中会开始同步其他eureka服务节点的信息 - EurekaBootstrap.java147行:- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53- protected void initEurekaServerContext() throws Exception { 
 // ...其他逻辑
 ApplicationInfoManager applicationInfoManager = null;
 // 初始化eureka客户端,用于做为获取其他eureka服务信息的client
 if (eurekaClient == null) {
 EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
 ? new CloudInstanceConfig()
 : new MyDataCenterInstanceConfig();
 
 applicationInfoManager = new ApplicationInfoManager(
 instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
 
 EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
 eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
 } else {
 applicationInfoManager = eurekaClient.getApplicationInfoManager();
 }
 // 初始化其他服务实例感知的注册器
 PeerAwareInstanceRegistry registry;
 if (isAws(applicationInfoManager.getInfo())) {
 // aws 服务
 // ...
 } else {
 registry = new PeerAwareInstanceRegistryImpl(
 eurekaServerConfig,
 eurekaClient.getEurekaClientConfig(),
 serverCodecs,
 eurekaClient
 );
 }
 // 为其他eureka服务注册eurekaNode实例
 PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
 registry,
 eurekaServerConfig,
 eurekaClient.getEurekaClientConfig(),
 serverCodecs,
 applicationInfoManager
 );
 // servlet容器初始化
 // ...
 // 从其他服务拷贝注册信息
 int registryCount = registry.syncUp();
 // 等待接收请求
 registry.openForTraffic(applicationInfoManager, registryCount);
 // 其他操作
 // ...
 }
同步逻辑:
| 1 | public int syncUp() { | 
- 完成后开始接收请求
过程中同步
以注册同步为例子。
- 根据请求类型判断执行动作 - PeerAwareInstanceRegistry.java648行:- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20- case Cancel: 
 node.cancel(appName, id);
 break;
 case Heartbeat:
 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
 node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
 break;
 case Register:
 // 调用PeerEurekaNode的register方法执行注册动作的同步
 node.register(info);
 break;
 case StatusUpdate:
 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
 node.statusUpdate(appName, id, newStatus, infoFromRegistry);
 break;
 case DeleteStatusOverride:
 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
 node.deleteStatusOverride(appName, id, infoFromRegistry);
 break;
- 将任务提交至任务处理器 - TaskDispatcher进行- PeerEurekaNode.java135行:- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11- long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); 
 batchingDispatcher.process(
 taskId("register", info),
 // 同步任务信息
 new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
 public EurekaHttpResponse<Void> execute() {
 return replicationClient.register(info);
 }
 },
 expiryTime
 );
- 调用Jersey2执行REST请求 - AbstractJersey2EurekaHttpClient.java85行:- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25- public EurekaHttpResponse<Void> register(InstanceInfo info) { 
 String urlPath = "apps/" + info.getAppName();
 Response response = null;
 try {
 // 封装请求
 Builder resourceBuilder = jerseyClient.target(serviceUrl).path(urlPath).request();
 addExtraProperties(resourceBuilder);
 // 设置x-netflix-discovery-replication头信息为true,其他eureka服务节点接收后会知晓这是个同步请求
 addExtraHeaders(resourceBuilder);
 // 执行请求并获取结果
 response = resourceBuilder
 .accept(MediaType.APPLICATION_JSON)
 .acceptEncoding("gzip")
 .post(Entity.json(info));
 return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
 } finally {
 if (logger.isDebugEnabled()) {
 logger.debug("Jersey2 HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
 response == null ? "N/A" : response.getStatus());
 }
 if (response != null) {
 response.close();
 }
 }
 }
Eureka自我保护机制
Eureka是Netflix为了解决现有AWS的注册服务无法解决的一些场景而专门研发的。在设计之初,就考虑到高可用特性,防止AWS突然性的大规模断点造成服务不可用的情况设计了Eureka的自我保护机制。
Eureka的自我保护机制设定为:服务总数 每分钟续约数(60s / 客户端续约间隔) 自我保护阈值因子
举个例子说明:
如果某个应用A有100个服务实例,那么按照公式计算,它在一分钟内续约次数必须 >= 170。
如果在上一个分钟内,续约数 > 170,那么服务正常,某个实例就算失败也只会认为是客户端存在问题,会被剔除;
如果在上一个分钟内,续约数 < 170,那么Eureka就会认为是Eureka服务存在问题,会停止剔除流程,保护现有的注册信息,防止服务大规模下线。
Eureka与ZK的比较
CAP模型

Eureka是AP模型,ZK是CP模型。前者强调高可用,即使在某些情况下不同region看到的视图可能不一致;而后者强调的是强一致性,且在超过一定阈值后会造成ZK集群整体不可用
应用场景
| 场景 | Eureka | ZK | 
|---|---|---|
| 数据分发和订阅 | × | √ | 
| 异地大规模集群需求的注册中心 | √ | × | 
| 小规模单机房注册中心 | √ | √ |