基于mmap的存储方案

技术服务于业务,选择一种技术要看这个技术是不适合业务需求。
mmap内存映射的文件存储,是一项典型的中间件技术,一般的业务开发是想不到这种做法的。

我们这次的业务场景是,根据上游数据计算出当前状态并保存。上游的推送速度大约是 1.5 万/s,计算结果单条 2kb 左右,而且有 5 台这样的服务器。
业务逻辑
这是一个典型的计算密集型系统。同时需要预存大量用于计算的数据。计算结果具备时间相关性,一般只会更新最后一条数据。

为什么不存redis?

  1. 性能
    redis的qps 在十万左右。5台形态服务器,均值QPS在1万3,峰值按2倍计算,是超过10万的。当然可以使用批处理来优化,但是这里还是会有不少的网络io开销,攒批的话,本地势必也需要多用很多内存来存储数据。
  2. 代码复杂度,每一个要查形态的服务,都需要再接入一个redis的配置。
  3. 内存很贵,但是硬盘很便宜
  4. 带宽开销
  5. 模型我不喜欢
    Do not communicate by sharing memory; instead, share memory by communicating.
    不要通过共享内存来通信,⽽应通过通信来共享内存。

所以最佳方案其实就是写本地存储,通过对外暴露接口来提供数据。

写本地存储本质上就是要写一个简易的数据库。方案也有很多,

  1. 写堆内存
  2. 追加写文件
  3. mmap 内存映射

写堆内存会消耗大量的内存空间,追加写文件在需要读的时候又会涉及多次内存复制。

那么为什么要使用mmap呢

  1. 性能
    直接使用本地内存,没有IO开销,性能最优。
    使用的是堆外内存,减少GC的压力。
    如果再能结合sendfile,性能可以再上一个台阶。
  2. 数据安全
    理论上应用重启不会丢数据,除非操作系统宕机,操作系统宕机的概率很小,而且就算宕机,理论上只会丢30秒的数据。
  3. 内存稳定
    5分K实际存储超过5个G的数据,盘前和盘后使用的内存基本一致。这里直接利用了操作系统刷新脏页的机制。

内存规划
我把一个文件映射到内存,然后给这个文件做区域划分。 1 行是 4 kb,足够承载我们一条 2 kb 的数据,一只股票一条要 78 个点位,就按顺序一路下来。第一支股票 0-78,第二只 79-157,以此类推。

接下来就到了代码编写
有一位大师曾经说过,任何计算机问题都可以通过增加一层来解决。我们写代码也是这样,尽量不要写面条代码,要做好层与层之间的隔离。
写这套代码的时候,我们也是,首先应该写一个面向底层的代码。
对底层来说,最重要的就是 read 和 write 两个方法,参数应该是 byte数组,这样写的代码会更具有通用性,更容易被复用。
面向底层的代码缺点就是不方便使用,那么我们还应该写一个中间层,面向上层应用,同样也是 read 和 write 两个方法,但是入参就是上层所使用的对象了。这样上层如果不想使用这个方案了,也只需要修改中间层的代码,不用改整个业务逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/**
* 面向底层的文件按页读写,不要直接对业务暴露
*
* @date 2023-10-27 21:31
* @Description
**/
@Slf4j
public class MmapContainer {


/**
* 映射的文件
*/
private String mappedFileStr;
private File mappedFile;

/**
* 总字节数
*/
private Long totalByte;

/**
* 总页数
*/
private Long totalPage;
/**
* 每页字节数
*/
private Integer pageSize = 4 * 1024; // 4k

/**
* Mapping 列表
* 默认每 1gb 增加一个
*/
private List<MappedByteBuffer> mappedByteBufferList = new ArrayList<>();


private Long sizePerFile = 1 * 1024 * 1024 * 1024L; // 1Gb
private Long pagePerFile = sizePerFile / pageSize; // 1Gb/pageSize

private final ReentrantLock lock = new ReentrantLock(false);

private static String LINE_SEPARATOR = System.getProperty("line.separator");

private static final byte[] LINE_SEPARATOR_BYTE = LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8);


public MmapContainer(String mappedFileStr, Long totalPage) {
this.mappedFileStr = mappedFileStr;
this.mappedFile = new File(mappedFileStr);
this.totalPage = totalPage;
this.totalByte = totalPage * pageSize;
}

@SneakyThrows
public void init() {
int mappedCount = (int) Math.ceil((double) totalPage / pagePerFile);
log.info("init: mappedCount: " + mappedCount);
for (int i = 0; i < mappedCount; i++) {
try (FileChannel fileChannel = FileChannel.open(Paths.get(mappedFile.getPath()),
StandardOpenOption.CREATE,
StandardOpenOption.READ,
// StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.WRITE)) {
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, sizePerFile * i, sizePerFile);
mappedByteBufferList.add(mappedByteBuffer);
}
}
log.info("init 结束");
}

public void clear() {
// 解除映射
unmap();
// 删除文件
// mappedFile.deleteOnExit();
if (mappedFile.exists()) {
mappedFile.delete();
}
log.info("删除文件 {} 结果:{}", mappedFile.getName(), !mappedFile.exists());
}

/**
* 写一页数据
*
* @param page
* @param data
* @return
*/
public void write(Integer page, byte[] data) {
if (page > totalPage) {
throw new IndexOutOfBoundsException("超过总页数,写失败");
}
if (data.length > pageSize) {
throw new IndexOutOfBoundsException("写入内容超过一页容量");
}
Integer fileIndex = getFileIndex(page);
Integer pageIndex = getPageIndex(page);
ByteBuffer slice = mappedByteBufferList.get(fileIndex).slice();
slice.position(pageIndex * pageSize);
int length = data.length;
slice.put(ByteUtil.intToBytes(length)); // 一定是4个字节
slice.put(data);
slice.put(LINE_SEPARATOR_BYTE);
}

/**
* 读一页数据
*
* @param page
* @return
*/
public byte[] read(Integer page) {
if (page > totalPage) {
throw new IndexOutOfBoundsException("超过总页数,读失败");
}
Integer fileIndex = getFileIndex(page);
Integer pageIndex = getPageIndex(page);
ByteBuffer slice = mappedByteBufferList.get(fileIndex).slice();
slice.position(pageIndex * pageSize);
byte[] length = new byte[4];
slice.get(length);
int dataLength = ByteUtil.bytesToInt(length);
if (dataLength == 0) {
return new byte[0];
}
slice.position(pageIndex * pageSize + 4);
byte[] data = new byte[dataLength];
slice.get(data);
return data;
}


/**
* 获取所在映射的下标
*
* @param page
* @return
*/
public Integer getFileIndex(Integer page) {
return Math.toIntExact(page / pagePerFile);
}

/**
* 获取映射对应的行数偏移量
*
* @param page
* @return
*/
public Integer getPageIndex(Integer page) {
return Math.toIntExact(page % pagePerFile);
}


/**
* copy from FileChannelImpl#unmap(私有方法)
* 解除map
*/
public void unmap() {
lock.lock();
try {
for (MappedByteBuffer mappedByteBuffer : mappedByteBufferList) {
if (mappedByteBuffer == null) {
return;
}
mappedByteBuffer.force();
Cleaner cl = ((DirectBuffer) mappedByteBuffer).cleaner();
if (cl != null) {
cl.clean();
}
}
} finally {
lock.unlock();
}
}

}

面向上层的代码涉及到一些公司业务,所以不贴了。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!