• 0

  • 532

SpringCloudConsul源码分析(三)服务发现

黑猫

我不是黑客

1星期前

一、com.netflix.loadbalancer核心抽象及Consul的实现

1.1 Server

  • com.netflix.loadbalancer.Server代表Consul注册中心里的一个服务实例
  • ConsulServer继承了Server,扩展一些Consul特有的属性,比如Map<String, String> metadata、HealthService
public class ConsulServer extends Server {
    private final MetaInfo metaInfo;
    private final HealthService service;
    private final Map<String, String> metadata;
复制代码

1.2 ServerList

  • ServerList定义了两个方法,用于获取List
    • getInitialListOfServers初始化获取
    • getUpdatedListOfServers会被频繁调用,用于定时更新List
public interface ServerList<T extends Server> {
    // 初始化获取List<Server>
    public List<T> getInitialListOfServers();
    // 定时获取List<Server>
    public List<T> getUpdatedListOfServers();   
}
复制代码
  • ServerList.getUpdatedListOfServers的触发点是DynamicServerListLoadBalancer的构造方法,而构造方法的触发,依赖于对于某个服务的首次Feign或Ribbon调用,每个服务会对应一个Spring上哪下文,也就对应一个DynamicServerListLoadBalancer(服务拆分多,耦合紧密,开启的定时更新服务列表线程就多,consul的压力也就大
  public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        ...
        // 这里触发
        this.restOfInit(clientConfig);
    }
复制代码
  • 无论是首次触发,还是定时任务触发,都是走DynamicServerListLoadBalancer.updateListOfServers
  public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            // 1. ConsulServerList的获取服务列表
            servers = serverListImpl.getUpdatedListOfServers();
            if (filter != null) {
                // 2. HealthServiceServerListFilter过滤不健康的服务
                servers = filter.getFilteredListOfServers(servers);
            }
        }
        // 3. copyOnWrite机制,更新内存里的服务列表
        updateAllServerList(servers);
    }
复制代码
  • 最终都是走ConsulServerList#getServers

        /**
     * 根据serviceName查询service
     * http://localhost:8500/v1/health/service/testConsulApp?passing=false&token=
     * 查询参数包括
     * 1、tag 目标服务必须包含的tag 仅支持一个kv对
     * 2、queryPassing 是否过滤非健康状态的服务(默认是否,通过HealthServiceServerListFilter过滤非健康的服务)
     * 3、datacenter 数据中心(Consul支持跨数据中心的服务发现)
     * 4、token 客户端跟Consul交互的凭证
     * @return
     */
    private List<ConsulServer> getServers() {
        if (this.client == null) {
            return Collections.emptyList();
        }
        HealthServicesRequest request = HealthServicesRequest.newBuilder()
                .setTag(getTag()).setPassing(this.properties.isQueryPassing())
                .setQueryParams(createQueryParamsForClientRequest())
                .setToken(this.properties.getAclToken()).build();
        Response<List<HealthService>> response = this.client
                .getHealthServices(this.serviceId, request);
        if (response.getValue() == null || response.getValue().isEmpty()) {
            return Collections.emptyList();
        }
        return transformResponse(response.getValue());
    }
    
    复制代码
  • 定时更新服务列表的时间间隔由ribbon.ServerListRefreshInterval控制,默认30s

1.3 ServerListFilter

  • ServerListFilter接口用于过滤服务实例列表
  public interface ServerListFilter<T extends Server> {
    public List<T> getFilteredListOfServers(List<T> servers);
    }
复制代码
  • Consul的实现是HealthServiceServerListFilter
  public class HealthServiceServerListFilter implements ServerListFilter<Server> {
    @Override
    public List<Server> getFilteredListOfServers(List<Server> servers) {
        List<Server> filtered = new ArrayList<>();
        for (Server server : servers) {
            // 如果是ConsulServer对象,取ConsulerServer的健康状态过滤
            if (server instanceof ConsulServer) {
                ConsulServer consulServer = (ConsulServer) server;
                if (consulServer.isPassingChecks()) {
                    filtered.add(server);
                }
            } else { // 如果非ConsulServer对象,不过滤
                filtered.add(server);
            }
        }
        return filtered;
    }

}
复制代码
  • 具体过滤逻辑,是取com.ecwid.consul.v1.health.model.HealthService#checks列表,如果列表里有一项检查没通过,则过滤
  public boolean isPassingChecks() {
        for (Check check : this.service.getChecks()) {
            if (check.getStatus() != Check.CheckStatus.PASSING) {
                return false;
            }
        }
        return true;
  }
复制代码

1.4 IPing

  • IPing接口负责检测Server实例是否能Ping通
  public interface IPing {
    public boolean isAlive(Server server);
  }
复制代码
  • Consul的实现是ConsulPing,不实际PingServer,只是看内存中的Server是否Check全是健康状态
  public class ConsulPing implements IPing {
    @Override
    public boolean isAlive(Server server) {
        boolean isAlive = true;
        if (server != null && server instanceof ConsulServer) {
            ConsulServer consulServer = (ConsulServer) server;
            return consulServer.isPassingChecks();
        }
        return isAlive;
    }
}
复制代码

二、org.springframework.cloud.client.discovery核心抽象及Consul的实现

2.1 DiscoveryClient

  • org.springframework.cloud.client.discovery.DiscoveryClient接口主要是两个方法(部分不重要的省略了)
    • getInstances:根据serviceId查询所有服务实例
    • getServices:查询所有serviceId
public interface DiscoveryClient extends Ordered {
    List<ServiceInstance> getInstances(String serviceId);

    List<String> getServices();
}
复制代码
  • Consul对于DiscoveryClient的实现是ConsulDiscoveryClient,这两个方法也只是调用consul接口
    public List<ServiceInstance> getInstances(final String serviceId) {
        return getInstances(serviceId,
                new QueryParams(this.properties.getConsistencyMode()));
    }
    复制代码
    • http://localhost:8500/v1/catalog/services?wait=2s&index=942&token=
    @Override
    public List<String> getServices() {
        String aclToken = this.properties.getAclToken();
        CatalogServicesRequest request = CatalogServicesRequest.newBuilder()
                .setQueryParams(QueryParams.DEFAULT)
                .setToken(this.properties.getAclToken()).build();
        return new ArrayList<>(
                this.client.getCatalogServices(request).getValue().keySet());
    }
    复制代码

三、org.springframework.cloud.netflix.ribbon核心抽象及Consul实现

3.1 ServerIntrospector

  • ServerIntrospector
    • isSecure:判断Server是否是安全的,DefaultServerIntrospector默认实现是取ribbon.securePorts,如果securePorts列表包含server的port则安全
    • getMetadata:获取Server的元数据,DefaultServerIntrospector默认实现是返回空Map
public interface ServerIntrospector {
    boolean isSecure(Server server);

    Map<String, String> getMetadata(Server server);
}
复制代码
  • ConsulServerIntrospector继承DefaultServerIntrospector
    • getMetadata:取ConsulServer的metadata属性
    • isSecure:取ConsulServer的metadata属性。secure对应的value是否为true
public class ConsulServerIntrospector extends DefaultServerIntrospector {
    /**
     * 1、优先ConsulServer的metadata的secure字段,判断是否安全
     * 2、否则走父类,通过Server的端口判断,如果端口是443或8443则是安全的
     */
    @Override
    public boolean isSecure(Server server) {
        Map<String, String> metadata = getMetadata(server);
        if (metadata != null && metadata.containsKey("secure")) {
            return metadata.getOrDefault("secure", "false").equalsIgnoreCase("true");
        }
        return super.isSecure(server);
    }

    /**
     * MetaData的来源支持ConsulServer自己的MetaData
     */
    @Override
    public Map<String, String> getMetadata(Server server) {
        if (server instanceof ConsulServer) {
            ConsulServer consulServer = (ConsulServer) server;
            return consulServer.getMetadata();
        }
        return super.getMetadata(server);
    }

}
复制代码

四、spring-cloud-consul-discovery特有

4.1 ConsulCatalogWatch

  • ConsulCatalogWatch:启动一个定时任务,每秒更新consulIndex,并且发送HeartbeatEvent事件
  • 感觉没啥用,通过spring.cloud.consul.discovery.catalog-services-watch.enabled=false可以关闭
  • ConsulCatalogWatch#catalogServicesWatch执行定时任务的核心逻辑
public void catalogServicesWatch() {
        long index = -1;
        if (this.catalogServicesIndex.get() != null) {
            index = this.catalogServicesIndex.get().longValue();
        }
        // 1. 获取所有service信息,只包含了serviceName,http://127.0.0.1:8500/v1/catalog/services?wait=2s&index=812407&token=
        CatalogServicesRequest request = CatalogServicesRequest.newBuilder()
                .setQueryParams(new QueryParams(
                        this.properties.getCatalogServicesWatchTimeout(), index))
                .setToken(this.properties.getAclToken()).build();
        Response<Map<String, List<String>>> response = this.consul
                .getCatalogServices(request);
        // 2. 更新consulIndex
        Long consulIndex = response.getConsulIndex();
        if (consulIndex != null) {
            this.catalogServicesIndex.set(BigInteger.valueOf(consulIndex));
        }
        // 3. 发送HeartbeatEvent事件
        this.publisher.publishEvent(new HeartbeatEvent(this, consulIndex));
    }
复制代码

五、总结

  • 几个核心抽象及consul实现
  • 每个被调用的服务名对应一个SpringIOC容器,每个容器包含一个ILoadBalancer,会启动一个线程更新BaseLoadBalancer里的List upServerList,默认30s更新一次
  • ConsulCatalogWatch看起来没啥用,考虑关闭spring.cloud.consul.discovery.catalog-services-watch.enabled=false
免责声明:文章版权归原作者所有,其内容与观点不代表Unitimes立场,亦不构成任何投资意见或建议。

信息安全

532

相关文章推荐

未登录头像

暂无评论