全链路灰度

前言

全链路灰度是一个很基础架构的项目,在我看来,能独立完成全链路灰度的方案设计和代码编写,那基础架构算是入门了。

全链路灰度的实现有很多方案,比如大火的 service mesh,通过修改容器的iptable来实现流量的拦截和转发,可以说这是终极的解决方案,真正实现代码 0 侵入。
这种方式优点明显缺点也明显,最关键的就是复杂度和性能。

当然还有其他的实现方式,比如在入口进行流量染色,在调用时根据标识进行路由。

本文讲的就是这种。

方案设计

所有的方案无外乎两点,流量染色和流量路由。
从接入层开始大致有以下框架。
nginx, gateway, restTemplate, dubbo, kafka(此处仅写kafka,其他消息中间件同理)。

前置

版本

灰度的本质是,不同的流量标识指向不同的集群。如何定义这个《不同》?
那就需要通过版本号,当然版本号可以通过多种方式指定,比如通过环境变量指定,但是我推荐的是通过 pom.xml 文件指定,所见即所得,而且可以通过 git 进行管理。版本应该是随代码走的。
版本号管理其实又是一件麻烦的事情。如多迭代并行等问题。这里不展开讨论,结论就是我们需要一个版本号,这个版本号在 pom.xml 里指定。可以参考以下参考文章。
参考文章:Add Build Properties to a Spring Boot Application

全局配置

这个时候我们就要思考,我们如何一声令下告诉所有服务开启了灰度,灰度路由的版本号。
所以我们需要一份全局配置。
这份全局配置我推荐放到 nacos 配置中心上,由 nacos 来做变更的推送。

配置如下

1
2
3
4
5
6
7
8
9
status: true
defaultValue: blue
list:
app1:
blue: 1.0.0
green: 1.0.1
app2:
blue: 2.0.0
green: 2.0.1

通过以下代码来进行监听配置变更。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@PostConstruct
public void init() throws NacosException {
//获取ConfigService
Properties properties = new Properties();
String namespace = nacosDiscoveryProperties.getNamespace();
properties.put(PropertyKeyConst.NAMESPACE, namespace);
properties.put(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
ConfigService configService = NacosFactory.createConfigService(properties);
configInfoLocal = configService.getConfigAndSignListener(NACOS_DATA_ID, nacosDiscoveryProperties.getGroup(), 10000, new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
log.info("刷新灰度配置: \n" + configInfo);
configInfoLocal = configInfo;
load();
}
});
load();
log.info("NacosListener: \n" + configInfoLocal);
}

具体节点

浏览器

http 请求我们一般通过 header 来传递流量标识,例如 x-gray-tag 就是我们的标识,
我们给请求 增加这个header x-gray-tag=blue,那么这个请求就会打到 blue 集群
x-gray-tag=green,那么这个请求就会打到 green 集群。
当我们使用浏览器时,我们可以使用一些插件(如:requestly),方便的给我们的请求增加标识,也就是实现我们说的流量染色。

nginx

nginx 做流量染色的做法是,例如给 a.com 和 b.com 两个不同的域名增加不同的header, 这样实现流量染色。
add_header x-gray-tag 'blue'
但是在nginx做流量路由我认为是比较麻烦的,尤其是当需要进行切换时。所以这里无论什么流量,我们都指向下游的 gateway,在 gateway 做流量的路由。

gateway,feign,resttemplate

到 gateway 实际上就进入了我们的微服务体系,进入了我们熟悉的环境。

其实无论是 gateway、 feign 还是 resttemplate 对应的都是 http 流量,原理都是一样的。

透传都是通过拦截器, 在调用时把灰度标识写入 header,在接受到时写入 线程变量

1
2
3
4
5
6
7
8
9
10
public class FeignGrayRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
String grayFlag = GrayContext.get();
String traceFlag = TraceContext.get();
if (grayFlag != null) {
requestTemplate.header(GrayConstant.GRAY_HEADER, grayFlag);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class GrayFlagInterceptor implements HandlerInterceptor {


@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 判断请求是否来自灰度
String grayFlag = request.getHeader(GrayConstant.GRAY_HEADER);
if (grayFlag != null) {
GrayContext.set(grayFlag);
}
return true;
}

@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
// 请求结束后清理线程变量,防止污染
GrayContext.clear();
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,
@Nullable Exception ex) throws Exception {
// 请求结束后清理线程变量,防止污染
GrayContext.clear();
}

}

路由都是实现 ReactorServiceInstanceLoadBalancer 接口,核心都是 Mono<Response<ServiceInstance>> choose(Request request)方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public Mono<Response<ServiceInstance>> choose(Request request) {
GrayProperties grayProperties = grayConfig.getGrayConfig();
DefaultRequest defaultRequest = (DefaultRequest) request;
RequestDataContext requestDataContext = (RequestDataContext) defaultRequest.getContext();
String serviceId = requestDataContext.getClientRequest().getUrl().getHost();
log.debug(String.format("GrayLoadBalancer Choose %s", serviceId));
List<ServiceInstance> instances = nacosDiscoveryClient.getInstances(serviceId);
Response<ServiceInstance> instanceResponse;
if (Boolean.TRUE.equals(grayProperties.getStatus())) {
// 灰度
// 获取服务的 ServiceInstance 列表
if (grayProperties.getList().containsKey(serviceId)) {
// 执行负载均衡算法,选择一个 ServiceInstance
instanceResponse = getInstanceResponse(serviceId, instances);
return Mono.just(instanceResponse);
}
}
if (instances.isEmpty()) {
instanceResponse = new EmptyResponse();
} else if (instances.size() == 1) {
instanceResponse = new DefaultResponse(instances.get(0));
} else {
int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
ServiceInstance instance = instances.get(pos % instances.size());
instanceResponse = new DefaultResponse(instance);
}
// 没开启灰度则轮询
return Mono.just(instanceResponse);
}

dubbo

dubbo同样也是两件事,灰度标识的透传和路由,dubbo的扩展主要是通过它的spi机制实现的。

透传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 消费者拦截器
@Activate(group = {CommonConstants.CONSUMER})
@Slf4j
public class DubboGrayConsumerFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
String grayFlag = null;
String traceFlag = null;
if (rpcContext.isConsumerSide()) {//InjvmProtocol
try {
BuildProperties buildProperties = SpringUtil.getBean(BuildProperties.class);
rpcContext.setAttachment(BaseConstant.DUBBO_CONSUMER_SERVICE_KEY, buildProperties.getName());
} catch (Exception e) {
log.error("DubboGrayConsumerFilter ",e);
}
grayFlag = GrayContext.get();
if (grayFlag != null) {
rpcContext.setAttachment(GrayConstant.GRAY_HEADER, grayFlag);
}
traceFlag = TraceContext.get();
if (traceFlag != null) {
rpcContext.setAttachment(GrayConstant.TRACE_HEADER, traceFlag);
if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(invoker.getUrl())) {
log.info("TraceContext Injvm call {}", JSONUtil.toJsonStr(invoker.getUrl().getParameters()));
} else {
String ipPort = invoker.getUrl().getHost() + ":" + invoker.getUrl().getPort();
String service = invoker.getUrl().getParameter(BaseConstant.DUBBO_SERVICE_KEY);
String version = invoker.getUrl().getParameter(BaseConstant.DUBBO_VERSION_KEY);
log.info("TraceContext dubbo trace: {} 准备调用下游服务 service:{} version: {} ip:{} 灰度标识 {}", traceFlag, service, version, ipPort, grayFlag);
if (service == null) {
log.info("TraceContext 下游服务没有版本标识: {}", JSONUtil.toJsonStr(invoker.getUrl().getParameters()));
}
}
}
}
return invoker.invoke(invocation);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 生产者拦截器
@Activate(group = {CommonConstants.PROVIDER})
@Slf4j
public class DubboGrayProviderFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Map<String, String> attachments = invocation.getAttachments();
String grayFlag = attachments.get(GrayConstant.GRAY_HEADER);
String traceFlag = attachments.get(GrayConstant.TRACE_HEADER);
Result invoke;
try {
if (grayFlag != null) {
GrayContext.set(grayFlag);
}
if (traceFlag != null) {
TraceContext.set(traceFlag);
String consumer = attachments.get(BaseConstant.DUBBO_CONSUMER_SERVICE_KEY);
log.info("TraceContext dubbo trace: {}, 上游: {}, 灰度标识: {}", traceFlag, consumer, grayFlag);
}
invoke = invoker.invoke(invocation);
} finally {
// if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(invoker.getUrl()) ) {
if (isInjvm(invoker.getUrl())) {
if (traceFlag != null) {
log.info("TraceContext Injvm call 不清空线程变量 {}", JSONUtil.toJsonStr(invoker.getUrl().getParameters()));
}
} else {
GrayContext.clear();
TraceContext.clear();
}
}
return invoke;
}


public static boolean isInjvm(URL url) {
if (url.getProtocol().equals(LOCAL_PROTOCOL)) {
return true;
}
return false;
}
}

路由

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

@Slf4j
public class GrayRouter extends AbstractRouter {
private static final int TAG_ROUTER_DEFAULT_PRIORITY = 150;

public GrayRouter(URL url) {
this.url = url;
this.priority = TAG_ROUTER_DEFAULT_PRIORITY;
}

@Override
public URL getUrl() {
return url;
}

@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return invokers;
}
return filterTag(invokers, url, invocation);
}

/**
* 过滤下游的服务
*
* @param invokers
* @param url
* @param invocation
* @param <T>
* @return
*/
private <T> List<Invoker<T>> filterTag(List<Invoker<T>> invokers, URL url, Invocation invocation) {
List<Invoker<T>> result = invokers;
GrayConfig grayConfig = SpringUtil.getBean(GrayConfig.class);
GrayProperties grayProperties = grayConfig.getGrayConfig();
// 如果开启了灰度
if (Boolean.TRUE.equals(grayProperties.getStatus())) {
// 下游服务名
String serviceName = invokers.stream().findFirst().map(invoker -> invoker.getUrl().getParameter(BaseConstant.DUBBO_SERVICE_KEY)).orElse("").trim();
// 准备路由的版本号
// 灰度名单包含下游服务
if (grayProperties.getList().containsKey(serviceName)) {
// 过滤出符合灰度版本的invokers
String version = grayProperties.getVersion(serviceName);
invokers = invokers.stream().filter(invoker -> invoker.getUrl().getParameter(BaseConstant.DUBBO_VERSION_KEY).equals(version)).collect(Collectors.toList());
}
result = invokers;
}
return result;
}


@Override
public <T> void notify(List<Invoker<T>> invokers) {
super.notify(invokers);
}

@Override
public int compareTo(Router o) {
return super.compareTo(o);
}
}


@Activate(order = 100)
public class GrayRouterFactory implements RouterFactory {


@Override
public Router getRouter(URL url) {
return new GrayRouter(url);
}
}

kafka

kafka 逻辑类似,但是 kafka 是消息订阅的模式,不是直接调用。
不过消息透传这块是一样的,也是拦截器,调用时把线程变量的标识传入 header,消费时从header 读出写入线程变量。

由于不是调用的方式,所以没有路由,只能从其他方面想办法。由于我不想建多个topic,所以使用的是消息染色的方案。
那就要利用到 kafka 消费者组的机制。
以蓝绿为例
一个 topic,两个集群,会启动三个消费者组,正常消费者组,蓝消费者组,绿消费者组。
对于一条消息来说,只有三种情况,正常消息,蓝消息,绿消息。
当开启蓝绿时,每个消费者组只消费对应的消息。关闭蓝绿时,蓝消费者组和绿消费者组都会关闭,只剩下正常消费者组,正常消费者组会消息所有消息。

kafka的代码会稍微多一点,贴出来会显得太拥挤了,这里只讲原理。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!