基于mmap的存储方案

技术服务于业务,选择一种技术要看这个技术是不适合业务需求。
mmap内存映射的文件存储,是一项典型的中间件技术,一般的业务开发是想不到这种做法的。

我们这次的业务场景是,根据上游数据计算出当前状态并保存。上游的推送速度大约是 1.5 万/s,计算结果单条 2kb 左右,而且有 5 台这样的服务器。

这是一个典型的计算密集型系统。同时需要预存大量用于计算的数据。计算结果具备时间相关性,一般只会更新最后一条数据。

为什么不存redis?

  1. 性能
    redis的qps 在十万左右。5台形态服务器,均值QPS在1万3,峰值按2倍计算,是超过10万的。当然可以使用批处理来优化,但是这里还是会有不少的网络io开销,攒批的话,本地势必也需要多用很多内存来存储数据。
  2. 代码复杂度,每一个要查形态的服务,都需要再接入一个redis的配置。
  3. 内存很贵,但是硬盘很便宜
  4. 带宽开销
  5. 模型我不喜欢
    Do not communicate by sharing memory; instead, share memory by communicating.
    不要通过共享内存来通信,⽽应通过通信来共享内存。

所以最佳方案其实就是写本地存储,通过对外暴露接口来提供数据。

写本地存储本质上就是要写一个简易的数据库。方案也有很多,

  1. 写堆内存
  2. 追加写文件
  3. mmap 内存映射

写堆内存会消耗大量的内存空间,追加写文件在需要读的时候又会涉及多次内存复制。

那么为什么要使用mmap呢

  1. 性能
    直接使用本地内存,没有IO开销,性能最优。
    使用的是堆外内存,减少GC的压力。
    如果再能结合sendfile,性能可以再上一个台阶。
  2. 数据安全
    理论上应用重启不会丢数据,除非操作系统宕机,操作系统宕机的概率很小,而且就算宕机,理论上只会丢30秒的数据。
  3. 内存稳定
    5分K实际存储超过5个G的数据,盘前和盘后使用的内存基本一致。这里直接利用了操作系统刷新脏页的机制。

我把一个文件映射到内存,然后给这个文件做区域划分。 1 行是 4 kb,足够承载我们一条 2 kb 的数据,一只股票一条要 78 个点位,就按顺序一路下来。第一支股票 0-78,第二只 79-157,以此类推。

接下来就到了代码编写
有一位大师曾经说过,任何计算机问题都可以通过增加一层来解决。我们写代码也是这样,尽量不要写面条代码,要做好层与层之间的隔离。
写这套代码的时候,我们也是,首先应该写一个面向底层的代码。
对底层来说,最重要的就是 read 和 write 两个方法,参数应该是 byte数组,这样写的代码会更具有通用性,更容易被复用。
面向底层的代码缺点就是不方便使用,那么我们还应该写一个中间层,面向上层应用,同样也是 read 和 write 两个方法,但是入参就是上层所使用的对象了。这样上层如果不想使用这个方案了,也只需要修改中间层的代码,不用改整个业务逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/**
* 面向底层的文件按页读写,不要直接对业务暴露
*
* @date 2023-10-27 21:31
* @Description
**/
@Slf4j
public class MmapContainer {


/**
* 映射的文件
*/
private String mappedFileStr;
private File mappedFile;

/**
* 总字节数
*/
private Long totalByte;

/**
* 总页数
*/
private Long totalPage;
/**
* 每页字节数
*/
private Integer pageSize = 4 * 1024; // 4k

/**
* Mapping 列表
* 默认每 1gb 增加一个
*/
private List<MappedByteBuffer> mappedByteBufferList = new ArrayList<>();


private Long sizePerFile = 1 * 1024 * 1024 * 1024L; // 1Gb
private Long pagePerFile = sizePerFile / pageSize; // 1Gb/pageSize

private final ReentrantLock lock = new ReentrantLock(false);

private static String LINE_SEPARATOR = System.getProperty("line.separator");

private static final byte[] LINE_SEPARATOR_BYTE = LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8);


public MmapContainer(String mappedFileStr, Long totalPage) {
this.mappedFileStr = mappedFileStr;
this.mappedFile = new File(mappedFileStr);
this.totalPage = totalPage;
this.totalByte = totalPage * pageSize;
}

@SneakyThrows
public void init() {
int mappedCount = (int) Math.ceil((double) totalPage / pagePerFile);
log.info("init: mappedCount: " + mappedCount);
for (int i = 0; i < mappedCount; i++) {
try (FileChannel fileChannel = FileChannel.open(Paths.get(mappedFile.getPath()),
StandardOpenOption.CREATE,
StandardOpenOption.READ,
// StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.WRITE)) {
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, sizePerFile * i, sizePerFile);
mappedByteBufferList.add(mappedByteBuffer);
}
}
log.info("init 结束");
}

public void clear() {
// 解除映射
unmap();
// 删除文件
// mappedFile.deleteOnExit();
if (mappedFile.exists()) {
mappedFile.delete();
}
log.info("删除文件 {} 结果:{}", mappedFile.getName(), !mappedFile.exists());
}

/**
* 写一页数据
*
* @param page
* @param data
* @return
*/
public void write(Integer page, byte[] data) {
if (page > totalPage) {
throw new IndexOutOfBoundsException("超过总页数,写失败");
}
if (data.length > pageSize) {
throw new IndexOutOfBoundsException("写入内容超过一页容量");
}
Integer fileIndex = getFileIndex(page);
Integer pageIndex = getPageIndex(page);
ByteBuffer slice = mappedByteBufferList.get(fileIndex).slice();
slice.position(pageIndex * pageSize);
int length = data.length;
slice.put(ByteUtil.intToBytes(length)); // 一定是4个字节
slice.put(data);
slice.put(LINE_SEPARATOR_BYTE);
}

/**
* 读一页数据
*
* @param page
* @return
*/
public byte[] read(Integer page) {
if (page > totalPage) {
throw new IndexOutOfBoundsException("超过总页数,读失败");
}
Integer fileIndex = getFileIndex(page);
Integer pageIndex = getPageIndex(page);
ByteBuffer slice = mappedByteBufferList.get(fileIndex).slice();
slice.position(pageIndex * pageSize);
byte[] length = new byte[4];
slice.get(length);
int dataLength = ByteUtil.bytesToInt(length);
if (dataLength == 0) {
return new byte[0];
}
slice.position(pageIndex * pageSize + 4);
byte[] data = new byte[dataLength];
slice.get(data);
return data;
}


/**
* 获取所在映射的下标
*
* @param page
* @return
*/
public Integer getFileIndex(Integer page) {
return Math.toIntExact(page / pagePerFile);
}

/**
* 获取映射对应的行数偏移量
*
* @param page
* @return
*/
public Integer getPageIndex(Integer page) {
return Math.toIntExact(page % pagePerFile);
}


/**
* copy from FileChannelImpl#unmap(私有方法)
* 解除map
*/
public void unmap() {
lock.lock();
try {
for (MappedByteBuffer mappedByteBuffer : mappedByteBufferList) {
if (mappedByteBuffer == null) {
return;
}
mappedByteBuffer.force();
Cleaner cl = ((DirectBuffer) mappedByteBuffer).cleaner();
if (cl != null) {
cl.clean();
}
}
} finally {
lock.unlock();
}
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
@Slf4j
public abstract class AbstractLineShapeMmapContainer implements ILineShapeMmapContainer {


// 用于比较的上一个形态指标数据(日K 取昨天最后一个,分时k 取上一个时间点最后一个,如果没有取前一天最后一个)
protected ConcurrentHashMap<String, Optional<LineShapeMQEntity>> lastLineShapeMap = new ConcurrentHashMap<>();
// code,Tuple2.v1 表示这支股票的序号,Tuple2.v2 标识已写入的点位
protected ConcurrentHashMap<String, Tuple2<Integer, BitSet>> lineShapeMap = new ConcurrentHashMap<>();


// 有多少个点位 5 分K
protected Integer pointNumber;

public static final Integer OPENING_POINT = 570;

protected MmapContainer mmapContainer;

protected String calcDuration;

protected Long totalPage;

protected String fileName;

protected String market;

@Resource
protected MongoTemplate mongoTemplate;

@Resource
protected ValueConfig valueConfig;


public MmapContainer getMmapContainer() {
return mmapContainer;
}

@PostConstruct
@SneakyThrows
public void init() {
if (LineShapeMQKeyConstants.dailyCalcDuration.equals(calcDuration)) {
pointNumber = 1;
} else if (LineShapeMQKeyConstants.min120CalcDuration.equals(calcDuration)) {
pointNumber = 4;
} else if (LineShapeMQKeyConstants.min60CalcDuration.equals(calcDuration)) {
pointNumber = 7;
} else if (LineShapeMQKeyConstants.min15CalcDuration.equals(calcDuration)) {
pointNumber = 28;
} else if (LineShapeMQKeyConstants.min5CalcDuration.equals(calcDuration)) {
pointNumber = 78;
}

// 港股 按 5000支股票 * 点位数
mmapContainer = new MmapContainer(fileName, totalPage * pointNumber);

if (FileUtil.exist(new File(fileName))) {
mmapContainer.init();
reload();
} else {
mmapContainer.init();
}
}


/**
* 重建索引
* 此处可以做一个页头来优化性能
*/
public void reload() {
log.info("LineShapeMmapContainer reload start:{} ", System.currentTimeMillis());
// 遍历文件每一页,写入内存索引
for (int i = 0; i < totalPage * pointNumber; i++) {
byte[] read = mmapContainer.read(i);
if (read.length == 0) {
continue;
}
LineShapeMQEntity lineShapeMQDomain = JsonUtils.toBean(new String(read), LineShapeMQEntity.class);
String stockCode = lineShapeMQDomain.getStockKline().getCode();
lineShapeMap.computeIfAbsent(stockCode, s -> Tuple.tuple(lineShapeMap.size(), new BitSet(Math.toIntExact(pointNumber))));
Integer point = getPoint(lineShapeMQDomain.getStockKline().getTime());
if (point < 0) {
continue;
}
Tuple2<Integer, BitSet> tuple = lineShapeMap.get(stockCode);
tuple.getV2().set(point);
}
log.info("LineShapeMmapContainer reload end:{} ", System.currentTimeMillis());
}



/**
* 清理数据
* 每天需要清理一次数据
*/
public void clear() {
// 清理内存索引
lineShapeMap.clear();
// 清理昨天最后一条数据缓存
lastLineShapeMap.clear();
// 删除文件
mmapContainer.clear();
// 重新初始化
mmapContainer.init();
}

/**
* 计算轨迹然后保存到本地内存
*
* @param lineShapeMQDomain
* @return 返回Json序列化后的字节数组,减少一次发送时的转换
*/
public Tuple2<Boolean, byte[]> calTraceAndWrite(LineShapeMQEntity lineShapeMQDomain) {
// 轨迹计算(取上一个周期形态数据对比,计算 首次、持续、失效)
LineShapeMQEntity lastLineShapeMQDomain;
Integer point = getPoint(lineShapeMQDomain.getStockKline().getTime());
if (point < 0) {
return Tuple.tuple(false, new byte[0]);
}
int lastPoint = point - 1;
if (lastPoint >= 0) {
// 从本地内存取今天的数据
Tuple2<Integer, BitSet> tuple = lineShapeMap.get(lineShapeMQDomain.getStockKline().getCode());
if (tuple == null) {
lastLineShapeMQDomain = getYesterdayLast(lineShapeMQDomain.getStockKline().getCode(), lineShapeMQDomain.getDurationEnum());
} else {
byte[] read = new byte[0];
try {
read = mmapContainer.read(tuple.getV1() * pointNumber + lastPoint);
} catch (Exception e) {
log.error("写文件读上一条形态数据失败,code:{}, v1:{},pointNumber:{},lastPoint:{}", lineShapeMQDomain.getStockKline().getCode(), tuple.getV1(), pointNumber, lastPoint);
throw e;
}
if (read.length > 0) {
lastLineShapeMQDomain = JsonUtils.toBean(new String(read), LineShapeMQEntity.class);
} else {
lastLineShapeMQDomain = null;
}
}
} else {
lastLineShapeMQDomain = getYesterdayLast(lineShapeMQDomain.getStockKline().getCode(), lineShapeMQDomain.getDurationEnum());
}
TraceTriggerHelper.traceTrigger(lastLineShapeMQDomain, lineShapeMQDomain);
Boolean needPush = TraceTriggerHelper.needPush(lastLineShapeMQDomain, lineShapeMQDomain);
return Tuple.tuple(needPush, write(lineShapeMQDomain));
}


public byte[] write(LineShapeMQEntity lineShapeMQDomain) {
String stockCode = lineShapeMQDomain.getStockKline().getCode();
lineShapeMap.computeIfAbsent(stockCode, s -> Tuple.tuple(lineShapeMap.size(), new BitSet(Math.toIntExact(pointNumber))));
// 写位表
Integer point = getPoint(lineShapeMQDomain.getStockKline().getTime());
if (point < 0 || point >= pointNumber) {
// 防止数据有问题导致的越界
return new byte[0];
}
Tuple2<Integer, BitSet> tuple = lineShapeMap.get(stockCode);
tuple.getV2().set(point);
// 写文件。JSON简单,可读,压缩比和读写性能还算不错。后续可考虑使用更高效的序列化方式
byte[] jsonBytes = JsonUtils.toJsonBytes(lineShapeMQDomain);
// String json = new String(jsonBytes, Charset.forName("UTF-8"));
mmapContainer.write(tuple.getV1() * pointNumber + point, jsonBytes);
return jsonBytes;
}

/**
* 读取今天的形态指标数据
*
* @param stockCode
* @return
*/
public List<LineShapeMQEntity> getByStockCode(String stockCode) {
return getByStockCode(stockCode, true);
}

public List<LineShapeMQEntity> getByStockCode(String stockCode, boolean validData) {
List<LineShapeMQEntity> result = new ArrayList<>();
if (!lineShapeMap.containsKey(stockCode)) {
return new ArrayList<>();
}
// lineShapeMap.computeIfAbsent(stockCode, s -> Tuple.tuple(lineShapeMap.size(), new BitSet(Math.toIntExact(pointNumber))));
Tuple2<Integer, BitSet> tuple = lineShapeMap.get(stockCode);
if (validData) {
// 这种写法会只返回索引中存在的
for (int i = tuple.getV2().nextSetBit(0); i >= 0; i = tuple.getV2().nextSetBit(i + 1)) {
byte[] read = mmapContainer.read(tuple.getV1() * pointNumber + i);
if (read.length > 0) {
LineShapeMQEntity lineShapeMQDomain = JsonUtils.toBean(new String(read), LineShapeMQEntity.class);
result.add(lineShapeMQDomain);
if (!lineShapeMQDomain.getStockKline().getCode().equals(stockCode)) {
log.error("Mmap 查询数据越界 {}", JsonUtils.toString(lineShapeMQDomain));
}
}
}
} else {
// 这种写法会使用null进行占位
int n = 0;
while (n < pointNumber) {
byte[] read = mmapContainer.read(tuple.getV1() * pointNumber + n);
if (read.length > 0) {
LineShapeMQEntity lineShapeMQDomain = JsonUtils.toBean(new String(read), LineShapeMQEntity.class);
result.add(lineShapeMQDomain);
} else {
result.add(null);
}
n++;
}
}
return result;
}


@Override
public LineShapeMQEntity getByStockCode(String stockCode, Long time) {
if (Boolean.FALSE.equals(hasThisPoint(stockCode, time))) {
return null;
}
Integer point = getPoint(time);
Tuple2<Integer, BitSet> tuple = lineShapeMap.get(stockCode);
byte[] read = mmapContainer.read(tuple.getV1() * pointNumber + point);
LineShapeMQEntity result = null;
if (read.length > 0) {
result = JsonUtils.toBean(new String(read), LineShapeMQEntity.class);
}
return result;
}

/**
* 判断是否有这个点位
*
* @param stockCode
* @param time
* @return
*/
public Boolean hasThisPoint(String stockCode, Long time) {
int point = getPoint(time);
if (point < 0) {
return false;
}
Tuple2<Integer, BitSet> tuple = lineShapeMap.get(stockCode);
if (tuple == null) {
return false;
} else {
return tuple.getV2().get(point);
}
}


/**
* 取时间对应的点位
* 以5分K 为例
* 09:30 对应的就是 0点位
* 09:31 对应的就是 0点位
* 09:33 对应的就是 0点位
* 09:35 对应的就是 1点位
* 09:40 对应的就是 2点位
* <p>
* 这里做一个兼容。行情给的K线实际上取了整点,比如 09:33 的K线,时间是 09:35。
* 也就是说第一条K线是 35分开始的。
* 这里的做法是,如果秒数大于等于1算下一根
*
* @param time
* @return
*/
public Integer getPoint(Long time) {
ZoneId timeZoneId;
if (market.equals("hk")) {
timeZoneId = ZoneOffset.ofHours(8);
} else {
timeZoneId = ZoneId.of("America/New_York");
}
LocalDateTime localDateTime = LocalDateTimeUtil.of(time, timeZoneId);
if (localDateTime.getSecond() == 0) {
localDateTime = localDateTime.minusSeconds(1);
}
int minuteFromOpening;
// if (market.equals("hk")) {
// minuteFromOpening = localDateTime.getHour() * 60 + localDateTime.getMinute() - OPENING_POINT;
// } else {
// minuteFromOpening = localDateTime.getHour() * 60 + localDateTime.getMinute() - 240;
// }
minuteFromOpening = localDateTime.getHour() * 60 + localDateTime.getMinute() - OPENING_POINT;
int point;
if (LineShapeMQKeyConstants.dailyCalcDuration.equals(calcDuration)) {
point = 0;
return point;
} else if (LineShapeMQKeyConstants.min120CalcDuration.equals(calcDuration)) {
point = (int) Math.floor((double) minuteFromOpening / 120); //向下取整
} else if (LineShapeMQKeyConstants.min60CalcDuration.equals(calcDuration)) {
point = (int) Math.floor((double) minuteFromOpening / 60);
} else if (LineShapeMQKeyConstants.min15CalcDuration.equals(calcDuration)) {
point = (int) Math.floor((double) minuteFromOpening / 15);
} else if (LineShapeMQKeyConstants.min5CalcDuration.equals(calcDuration)) {
point = (int) Math.floor((double) minuteFromOpening / 5);
} else {
throw new IllegalArgumentException("calcDuration 类型错误");
}
if (point < 0 || point >= pointNumber) {
// 防止数据有问题导致的越界
return -1;
}
return point;
}


public List<LineShapeMQEntity> getAllData() {
List<LineShapeMQEntity> result = new ArrayList<>();
for (String stockCode : lineShapeMap.keySet()) {
Tuple2<Integer, BitSet> tuple = lineShapeMap.get(stockCode);
for (int i = 0; i < pointNumber; i++) {
byte[] read = mmapContainer.read(tuple.getV1() * pointNumber + i);
if (read.length > 0) {
LineShapeMQEntity lineShapeMQDomain = JsonUtils.toBean(new String(read), LineShapeMQEntity.class);
result.add(lineShapeMQDomain);
}
}
}
return result;
}

}



基于mmap的存储方案
http://relengxing.tech/2024/02/01/基于mmap的存储方案/
作者
relengxing
发布于
2024年2月1日
许可协议