动态数据源

背景

最近几年 saas 很火。saas 对数据安全的要求会比较高,往往会要求每个商户,一个独立的数据库。
但是数据库太多,就很难进行管理和切库。

对此数据库的管理,可以使用一个系统进行存储,由DBA进行维护。而切库就是重中之重了。

这里其实有很多概念,动态数据源,远程数据源,分库分表,分布式事务等

概念说明:
多数据源:一个项目同时使用多个数据库。
动态数据源:实时获取数据源信息生成DataSource,后续可能会更改,比如按CompanyId获取数据源。
远程数据源:仅在启动的时候,从远程(资源配置中心)获取数据源信息,后续不会更改。

这篇文章只写动态数据源

最底层使用的是 com.baomidoudynamic-datasource-spring-boot-starter
这是 Mybatis-Plus 作者提供的方案。该库提供了多数据源以及切库的能力。
我们对该方案进行的扩展,增加动态获取数据源的方法。

设计思路

dynamic-datasource-spring-boot-starter 可以实现多数据源,所有的数据源是启动时从yaml配置文件中加载的。
实现原理是,在方法调用时,通过AOP拦截@DS注解,把预先配置好的数据源加载到线程变量中。
在真正使用的时候,就是使用的我们想用的数据源。

所以要实现动态数据源就是修改这个过程。

这里我定义了一个 Providor 这个接口有一个方法,会返回给我们具体的数据源。
通过重写 dynamic-datasource-spring-boot-starterdetermineDataSource方法,在 @DS后面是以 Providor 结尾的,就会调用特定的实现方法,
Providor 作为提供者,同时也是管理者,会维护DataSource的缓存。
所以核心其实就两步,
第一步:重写 determineDataSource
第二步:提供 Providor

源码解析

dynamic-datasource-spring-boot-starter 源码解析

这里不会讲解所有的源码,只选择关键部分

看 springboot 的 starter项目,一般可直接找到 AutoConfiguration结尾的文件。
这个项目就是 DynamicDataSourceAutoConfiguration

先看这一段,创建了一个路由用的DataSource。

1
2
3
4
5
6
7
8
9
10
11
@Bean
@ConditionalOnMissingBean
public DataSource dataSource() {
DynamicRoutingDataSource dataSource = new DynamicRoutingDataSource();
dataSource.setPrimary(properties.getPrimary());
dataSource.setStrict(properties.getStrict());
dataSource.setStrategy(properties.getStrategy());
dataSource.setP6spy(properties.getP6spy());
dataSource.setSeata(properties.getSeata());
return dataSource;
}

这个DataSource 内部维护了一个 map。这个map里存储的就是实际使用的数据源。

1
private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();

在其抽象类 AbstractRoutingDataSource 中 通过 determineDataSource()选择数据源,再获取 connection。

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public Connection getConnection() throws SQLException {
String xid = TransactionContext.getXID();
if (StringUtils.isEmpty(xid)) {
return determineDataSource().getConnection();
} else {
String ds = DynamicDataSourceContextHolder.peek();
ds = StringUtils.isEmpty(ds) ? "default" : ds;
ConnectionProxy connection = ConnectionFactory.getConnection(ds);
return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
}
}

那么再分析 determineDataSource(), 分组的情况我们不讨论,普通情况下,就是从 dataSourceMap 拿一个DataSource。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public DataSource determineDataSource() {
String dsKey = DynamicDataSourceContextHolder.peek();
return getDataSource(dsKey);
}

public DataSource getDataSource(String ds) {
if (StringUtils.isEmpty(ds)) {
return determinePrimaryDataSource();
} else if (!groupDataSources.isEmpty() && groupDataSources.containsKey(ds)) {
log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
return groupDataSources.get(ds).determineDataSource();
} else if (dataSourceMap.containsKey(ds)) {
log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
return dataSourceMap.get(ds);
}
if (strict) {
throw new CannotFindDataSourceException("dynamic-datasource could not find a datasource named" + ds);
}
return determinePrimaryDataSource();
}

最核心的代码其实就这些,那么,我们只需要扩展这里就可以了。

扩展部分 源码解析

DynamicRoutingDataSource 使用了@ConditionalOnMissingBean
那么我们只需要在自己的代码里注入一个自己的 DynamicRoutingDataSource 就可以覆盖原方法。

源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class XXDynamicRoutingDataSource extends DynamicRoutingDataSource {

public static final String PROVIDER_SUFFIX = "Provider";

@Autowired
private Map<String, XXDataSourceProvider> providerMap;

@Override
public DataSource determineDataSource() {
String dsKey = DynamicDataSourceContextHolder.peek();
if (dsKey == null) {
// 会调用父类方法,返回 primary 数据库的信息
return getDataSource(null);
}
// 指定远端的从 PROVIDER 加载
if (dsKey.endsWith(PROVIDER_SUFFIX)) {
XXDataSourceProvider dynamicDataSourceProvider = providerMap.get(dsKey);
if (dynamicDataSourceProvider == null) {
// 未找到此 Provider,直接返回默认数据源
return getDataSource(dsKey);
}
// 调用 provider 提供的 getDataSource 方法
return dynamicDataSourceProvider.getDataSource(dsKey);
}
// 否则从原配置文件的数据库加载
return getDataSource(dsKey);
}
}

这里很好理解,只有 @DS内部是以 Provider 这个字符串结尾的,才做特殊处理,其他的都调父类的方法。

那么 剩下的就是 XXDataSourceProvider 怎么写了。
这里我定义了一个接口,一个抽象类。
接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface AkDataSourceProvider {

/**
* 同时兼容多数据源(多主多从等情况)的情况
* 加载数据源,并保存到内存中
*
* @param dataSourceName
* @return
*/
Map<String, DataSource> loadDataSources(String dataSourceName);

/**
* 从内存中获取DataSource,如果内存中不存在,不会动态加载
*
* @param dataSourceKey
* @return
*/
DataSource getDataSource(String dataSourceKey);

}

抽象类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/**
* 抽象 动态数据源提供者,
* 里面提供了一些方法,如果和需求不匹配,可按需重写方法
* <p>
* 第一步 创建 DataSourceProperty
* DataSourceProperty dataSourceProperty = new DataSourceProperty();
* dataSourceProperty.setUrl(url);
* dataSourceProperty.setUsername(dbInstanceInfo.getUsername());
* dataSourceProperty.setPassword(dbInstanceInfo.getPassword());
* dataSourceProperty.setDriverClassName("org.postgresql.Driver");
* 第二步
* 调用 createDataSourceMap 生成 DataSource
* 第三步
* 生成的 DataSource 加入到 缓存 中
*
* @Description
* @Version V1.0.0
* @Date 2021/12/17
*/
public abstract class AbstractXXDataSourceProvider extends AbstractDataSourceProvider implements XXDataSourceProvider {

/**
* 数据源提供者自行管理数据源
* 此处使用缓存框架,防止项目启动时多线程同时加载导致多次请求远程服务器和数据源频繁替换的问题
*/
protected final Cache<String, DataSource> cache = init();

/**
* 初始化方法
* 子类可以重写该方法
* @return
*/
protected Cache<String, DataSource> init() {
return Caffeine.newBuilder()
.initialCapacity(5) // 初始化 5 个
.maximumSize(100) // 最大 100 个
.build();
}


/**
* 默认实现的获取DataSource
* <p>
* 如果存在 不是根据 datasourceKey 和 实际使用的 DataSource 不一样,则需要重写该方法
* 例如如果是根据 companyId 做为 key
*
* @param dataSourceKey
* @return
*/
@Override
public DataSource getDataSource(String dataSourceKey) {
return cache.get(dataSourceKey, key -> {
Map<String, DataSource> dataSourceMap = loadDataSources(key);
DataSource ds;
for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
ds = entry.getValue();
if (ds != null) {
return ds;
}
}
return null;
});
}

/**
* 根据 DataSourceProperty 生成 DataSource
*
* @param dataSourcePropertiesMap
* @return
*/
@Override
protected Map<String, DataSource> createDataSourceMap(Map<String, DataSourceProperty> dataSourcePropertiesMap) {
return super.createDataSourceMap(dataSourcePropertiesMap);
}

/**
* 关闭旧的数据源
*
* @param dsKey
* @param dataSource
*/
public void closeDataSource(String dsKey, DataSource dataSource) {
if (dataSource != null) {
if (dataSource instanceof ItemDataSource) {
((ItemDataSource) dataSource).close();
}
cache.invalidate(dsKey);
// 此处考虑增加 DataSourceProxy 的关闭对象方法
}
}

/**
* 这个方法是框架使用的,这里返回空,这样自定义的数据源可以和配置文件的数据源隔离
* 此处不禁止重写,但是如果不理解这个流程,最好不要对该方法进行重写
*
* @return
*/
@Override
public Map<String, DataSource> loadDataSources() {
return new HashMap<>(0);
}

}

实现类则需要继承抽象类并按需重写里面的方法即可。
比如按公司Id进行切库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public DataSource getDataSource(String dataSourceKey) {
// 改成从线程变量获取
Long companyId = 123456L;
DataSource dataSource = super.getDataSource(String.valueOf(companyId));
if (dataSource != null) {
log.info(dataSourceKey + "切换到数据源: " + companyId);
}
return dataSource;
}


/**
* 实际加载,从远端加载数据库
*
* @return
*/
@Override
public Map<String, DataSource> loadDataSources(String dataSourceName) {
// 改成从线程变量获取
Long companyId = 123456;
DataSourceProperty dataSourceProperty = loadFromRemote();
Map<String, DataSourceProperty> map = new HashMap<>();
map.put(String.valueOf(companyId), dataSourceProperty);
return createDataSourceMap(map);
}

抽象类实现了选择数据库和切换数据库。实现类则需要实现怎么从远程取数据库,怎么管理缓存。
抽象类只是技术交流,没有任何商业信息,或者说只是寥寥几笔的在巨人的肩膀上扩展了一下。实现类就不贴源码了。

改进

  1. 缓存要可以清理。
  2. companyId和datasource之前其实还应该再做一层。

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!