前言
全链路灰度是一个很基础架构的项目,在我看来,能独立完成全链路灰度的方案设计和代码编写,那基础架构算是入门了。
全链路灰度的实现有很多方案,比如大火的 service mesh,通过修改容器的iptable来实现流量的拦截和转发,可以说这是终极的解决方案,真正实现代码 0 侵入。
这种方式优点明显缺点也明显,最关键的就是复杂度和性能。
当然还有其他的实现方式,比如在入口进行流量染色,在调用时根据标识进行路由。
本文讲的就是这种。
方案设计
所有的方案无外乎两点,流量染色和流量路由。
从接入层开始大致有以下框架。
nginx, gateway, restTemplate, dubbo, kafka(此处仅写kafka,其他消息中间件同理)。
前置
版本
灰度的本质是,不同的流量标识指向不同的集群。如何定义这个《不同》?
那就需要通过版本号,当然版本号可以通过多种方式指定,比如通过环境变量指定,但是我推荐的是通过 pom.xml 文件指定,所见即所得,而且可以通过 git 进行管理。版本应该是随代码走的。
版本号管理其实又是一件麻烦的事情。如多迭代并行等问题。这里不展开讨论,结论就是我们需要一个版本号,这个版本号在 pom.xml 里指定。可以参考以下参考文章。
参考文章:Add Build Properties to a Spring Boot Application
全局配置
这个时候我们就要思考,我们如何一声令下告诉所有服务开启了灰度,灰度路由的版本号。
所以我们需要一份全局配置。
这份全局配置我推荐放到 nacos 配置中心上,由 nacos 来做变更的推送。
配置如下
1 2 3 4 5 6 7 8 9
| status: true defaultValue: blue list: app1: blue: 1.0.0 green: 1.0.1 app2: blue: 2.0.0 green: 2.0.1
|
通过以下代码来进行监听配置变更。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @PostConstruct public void init() throws NacosException { Properties properties = new Properties(); String namespace = nacosDiscoveryProperties.getNamespace(); properties.put(PropertyKeyConst.NAMESPACE, namespace); properties.put(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr()); ConfigService configService = NacosFactory.createConfigService(properties); configInfoLocal = configService.getConfigAndSignListener(NACOS_DATA_ID, nacosDiscoveryProperties.getGroup(), 10000, new AbstractListener() { @Override public void receiveConfigInfo(String configInfo) { log.info("刷新灰度配置: \n" + configInfo); configInfoLocal = configInfo; load(); } }); load(); log.info("NacosListener: \n" + configInfoLocal); }
|
具体节点
浏览器
http 请求我们一般通过 header 来传递流量标识,例如 x-gray-tag 就是我们的标识,
我们给请求 增加这个header x-gray-tag=blue
,那么这个请求就会打到 blue 集群
x-gray-tag=green
,那么这个请求就会打到 green 集群。
当我们使用浏览器时,我们可以使用一些插件(如:requestly),方便的给我们的请求增加标识,也就是实现我们说的流量染色。
nginx
nginx 做流量染色的做法是,例如给 a.com 和 b.com 两个不同的域名增加不同的header, 这样实现流量染色。
add_header x-gray-tag 'blue'
但是在nginx做流量路由我认为是比较麻烦的,尤其是当需要进行切换时。所以这里无论什么流量,我们都指向下游的 gateway,在 gateway 做流量的路由。
gateway,feign,resttemplate
到 gateway 实际上就进入了我们的微服务体系,进入了我们熟悉的环境。
其实无论是 gateway、 feign 还是 resttemplate 对应的都是 http 流量,原理都是一样的。
透传都是通过拦截器, 在调用时把灰度标识写入 header,在接受到时写入 线程变量
1 2 3 4 5 6 7 8 9 10
| public class FeignGrayRequestInterceptor implements RequestInterceptor { @Override public void apply(RequestTemplate requestTemplate) { String grayFlag = GrayContext.get(); String traceFlag = TraceContext.get(); if (grayFlag != null) { requestTemplate.header(GrayConstant.GRAY_HEADER, grayFlag); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class GrayFlagInterceptor implements HandlerInterceptor {
@Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String grayFlag = request.getHeader(GrayConstant.GRAY_HEADER); if (grayFlag != null) { GrayContext.set(grayFlag); } return true; }
@Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { GrayContext.clear(); }
@Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable Exception ex) throws Exception { GrayContext.clear(); }
}
|
路由都是实现 ReactorServiceInstanceLoadBalancer 接口,核心都是 Mono<Response<ServiceInstance>> choose(Request request)
方法.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public Mono<Response<ServiceInstance>> choose(Request request) { GrayProperties grayProperties = grayConfig.getGrayConfig(); DefaultRequest defaultRequest = (DefaultRequest) request; RequestDataContext requestDataContext = (RequestDataContext) defaultRequest.getContext(); String serviceId = requestDataContext.getClientRequest().getUrl().getHost(); log.debug(String.format("GrayLoadBalancer Choose %s", serviceId)); List<ServiceInstance> instances = nacosDiscoveryClient.getInstances(serviceId); Response<ServiceInstance> instanceResponse; if (Boolean.TRUE.equals(grayProperties.getStatus())) { if (grayProperties.getList().containsKey(serviceId)) { instanceResponse = getInstanceResponse(serviceId, instances); return Mono.just(instanceResponse); } } if (instances.isEmpty()) { instanceResponse = new EmptyResponse(); } else if (instances.size() == 1) { instanceResponse = new DefaultResponse(instances.get(0)); } else { int pos = this.position.incrementAndGet() & Integer.MAX_VALUE; ServiceInstance instance = instances.get(pos % instances.size()); instanceResponse = new DefaultResponse(instance); } return Mono.just(instanceResponse); }
|
dubbo
dubbo同样也是两件事,灰度标识的透传和路由,dubbo的扩展主要是通过它的spi机制实现的。
透传
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @Activate(group = {CommonConstants.CONSUMER}) @Slf4j public class DubboGrayConsumerFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { RpcContext rpcContext = RpcContext.getContext(); String grayFlag = null; String traceFlag = null; if (rpcContext.isConsumerSide()) { try { BuildProperties buildProperties = SpringUtil.getBean(BuildProperties.class); rpcContext.setAttachment(BaseConstant.DUBBO_CONSUMER_SERVICE_KEY, buildProperties.getName()); } catch (Exception e) { log.error("DubboGrayConsumerFilter ",e); } grayFlag = GrayContext.get(); if (grayFlag != null) { rpcContext.setAttachment(GrayConstant.GRAY_HEADER, grayFlag); } traceFlag = TraceContext.get(); if (traceFlag != null) { rpcContext.setAttachment(GrayConstant.TRACE_HEADER, traceFlag); if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(invoker.getUrl())) { log.info("TraceContext Injvm call {}", JSONUtil.toJsonStr(invoker.getUrl().getParameters())); } else { String ipPort = invoker.getUrl().getHost() + ":" + invoker.getUrl().getPort(); String service = invoker.getUrl().getParameter(BaseConstant.DUBBO_SERVICE_KEY); String version = invoker.getUrl().getParameter(BaseConstant.DUBBO_VERSION_KEY); log.info("TraceContext dubbo trace: {} 准备调用下游服务 service:{} version: {} ip:{} 灰度标识 {}", traceFlag, service, version, ipPort, grayFlag); if (service == null) { log.info("TraceContext 下游服务没有版本标识: {}", JSONUtil.toJsonStr(invoker.getUrl().getParameters())); } } } } return invoker.invoke(invocation); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| @Activate(group = {CommonConstants.PROVIDER}) @Slf4j public class DubboGrayProviderFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { Map<String, String> attachments = invocation.getAttachments(); String grayFlag = attachments.get(GrayConstant.GRAY_HEADER); String traceFlag = attachments.get(GrayConstant.TRACE_HEADER); Result invoke; try { if (grayFlag != null) { GrayContext.set(grayFlag); } if (traceFlag != null) { TraceContext.set(traceFlag); String consumer = attachments.get(BaseConstant.DUBBO_CONSUMER_SERVICE_KEY); log.info("TraceContext dubbo trace: {}, 上游: {}, 灰度标识: {}", traceFlag, consumer, grayFlag); } invoke = invoker.invoke(invocation); } finally {
if (isInjvm(invoker.getUrl())) { if (traceFlag != null) { log.info("TraceContext Injvm call 不清空线程变量 {}", JSONUtil.toJsonStr(invoker.getUrl().getParameters())); } } else { GrayContext.clear(); TraceContext.clear(); } } return invoke; }
public static boolean isInjvm(URL url) { if (url.getProtocol().equals(LOCAL_PROTOCOL)) { return true; } return false; } }
|
路由
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| @Slf4j public class GrayRouter extends AbstractRouter { private static final int TAG_ROUTER_DEFAULT_PRIORITY = 150;
public GrayRouter(URL url) { this.url = url; this.priority = TAG_ROUTER_DEFAULT_PRIORITY; }
@Override public URL getUrl() { return url; }
@Override public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { if (CollectionUtils.isEmpty(invokers)) { return invokers; } return filterTag(invokers, url, invocation); }
private <T> List<Invoker<T>> filterTag(List<Invoker<T>> invokers, URL url, Invocation invocation) { List<Invoker<T>> result = invokers; GrayConfig grayConfig = SpringUtil.getBean(GrayConfig.class); GrayProperties grayProperties = grayConfig.getGrayConfig(); if (Boolean.TRUE.equals(grayProperties.getStatus())) { String serviceName = invokers.stream().findFirst().map(invoker -> invoker.getUrl().getParameter(BaseConstant.DUBBO_SERVICE_KEY)).orElse("").trim(); if (grayProperties.getList().containsKey(serviceName)) { String version = grayProperties.getVersion(serviceName); invokers = invokers.stream().filter(invoker -> invoker.getUrl().getParameter(BaseConstant.DUBBO_VERSION_KEY).equals(version)).collect(Collectors.toList()); } result = invokers; } return result; }
@Override public <T> void notify(List<Invoker<T>> invokers) { super.notify(invokers); }
@Override public int compareTo(Router o) { return super.compareTo(o); } }
@Activate(order = 100) public class GrayRouterFactory implements RouterFactory {
@Override public Router getRouter(URL url) { return new GrayRouter(url); } }
|
kafka
kafka 逻辑类似,但是 kafka 是消息订阅的模式,不是直接调用。
不过消息透传这块是一样的,也是拦截器,调用时把线程变量的标识传入 header,消费时从header 读出写入线程变量。
由于不是调用的方式,所以没有路由,只能从其他方面想办法。由于我不想建多个topic,所以使用的是消息染色的方案。
那就要利用到 kafka 消费者组的机制。
以蓝绿为例
一个 topic,两个集群,会启动三个消费者组,正常消费者组,蓝消费者组,绿消费者组。
对于一条消息来说,只有三种情况,正常消息,蓝消息,绿消息。
当开启蓝绿时,每个消费者组只消费对应的消息。关闭蓝绿时,蓝消费者组和绿消费者组都会关闭,只剩下正常消费者组,正常消费者组会消息所有消息。
kafka的代码会稍微多一点,贴出来会显得太拥挤了,这里只讲原理。