技术服务于业务,选择一种技术要看这个技术是不适合业务需求。
mmap内存映射的文件存储,是一项典型的中间件技术,一般的业务开发是想不到这种做法的。
我们这次的业务场景是,根据上游数据计算出当前状态并保存。上游的推送速度大约是 1.5 万/s,计算结果单条 2kb 左右,而且有 5 台这样的服务器。
这是一个典型的计算密集型系统。同时需要预存大量用于计算的数据。计算结果具备时间相关性,一般只会更新最后一条数据。
为什么不存redis?
- 性能
redis的qps 在十万左右。5台形态服务器,均值QPS在1万3,峰值按2倍计算,是超过10万的。当然可以使用批处理来优化,但是这里还是会有不少的网络io开销,攒批的话,本地势必也需要多用很多内存来存储数据。
- 代码复杂度,每一个要查形态的服务,都需要再接入一个redis的配置。
- 内存很贵,但是硬盘很便宜
- 带宽开销
- 模型我不喜欢
Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,⽽应通过通信来共享内存。
所以最佳方案其实就是写本地存储,通过对外暴露接口来提供数据。
写本地存储本质上就是要写一个简易的数据库。方案也有很多,
- 写堆内存
- 追加写文件
- mmap 内存映射
写堆内存会消耗大量的内存空间,追加写文件在需要读的时候又会涉及多次内存复制。
那么为什么要使用mmap呢
- 性能
直接使用本地内存,没有IO开销,性能最优。
使用的是堆外内存,减少GC的压力。
如果再能结合sendfile,性能可以再上一个台阶。
- 数据安全
理论上应用重启不会丢数据,除非操作系统宕机,操作系统宕机的概率很小,而且就算宕机,理论上只会丢30秒的数据。
- 内存稳定
5分K实际存储超过5个G的数据,盘前和盘后使用的内存基本一致。这里直接利用了操作系统刷新脏页的机制。
我把一个文件映射到内存,然后给这个文件做区域划分。 1 行是 4 kb,足够承载我们一条 2 kb 的数据,一只股票一条要 78 个点位,就按顺序一路下来。第一支股票 0-78,第二只 79-157,以此类推。
接下来就到了代码编写
有一位大师曾经说过,任何计算机问题都可以通过增加一层来解决。我们写代码也是这样,尽量不要写面条代码,要做好层与层之间的隔离。
写这套代码的时候,我们也是,首先应该写一个面向底层的代码。
对底层来说,最重要的就是 read 和 write 两个方法,参数应该是 byte数组,这样写的代码会更具有通用性,更容易被复用。
面向底层的代码缺点就是不方便使用,那么我们还应该写一个中间层,面向上层应用,同样也是 read 和 write 两个方法,但是入参就是上层所使用的对象了。这样上层如果不想使用这个方案了,也只需要修改中间层的代码,不用改整个业务逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
@Slf4j public class MmapContainer {
private String mappedFileStr; private File mappedFile;
private Long totalByte;
private Long totalPage;
private Integer pageSize = 4 * 1024;
private List<MappedByteBuffer> mappedByteBufferList = new ArrayList<>();
private Long sizePerFile = 1 * 1024 * 1024 * 1024L; private Long pagePerFile = sizePerFile / pageSize;
private final ReentrantLock lock = new ReentrantLock(false);
private static String LINE_SEPARATOR = System.getProperty("line.separator");
private static final byte[] LINE_SEPARATOR_BYTE = LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8);
public MmapContainer(String mappedFileStr, Long totalPage) { this.mappedFileStr = mappedFileStr; this.mappedFile = new File(mappedFileStr); this.totalPage = totalPage; this.totalByte = totalPage * pageSize; }
@SneakyThrows public void init() { int mappedCount = (int) Math.ceil((double) totalPage / pagePerFile); log.info("init: mappedCount: " + mappedCount); for (int i = 0; i < mappedCount; i++) { try (FileChannel fileChannel = FileChannel.open(Paths.get(mappedFile.getPath()), StandardOpenOption.CREATE, StandardOpenOption.READ,
StandardOpenOption.WRITE)) { MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, sizePerFile * i, sizePerFile); mappedByteBufferList.add(mappedByteBuffer); } } log.info("init 结束"); }
public void clear() { unmap();
if (mappedFile.exists()) { mappedFile.delete(); } log.info("删除文件 {} 结果:{}", mappedFile.getName(), !mappedFile.exists()); }
public void write(Integer page, byte[] data) { if (page > totalPage) { throw new IndexOutOfBoundsException("超过总页数,写失败"); } if (data.length > pageSize) { throw new IndexOutOfBoundsException("写入内容超过一页容量"); } Integer fileIndex = getFileIndex(page); Integer pageIndex = getPageIndex(page); ByteBuffer slice = mappedByteBufferList.get(fileIndex).slice(); slice.position(pageIndex * pageSize); int length = data.length; slice.put(ByteUtil.intToBytes(length)); slice.put(data); slice.put(LINE_SEPARATOR_BYTE); }
public byte[] read(Integer page) { if (page > totalPage) { throw new IndexOutOfBoundsException("超过总页数,读失败"); } Integer fileIndex = getFileIndex(page); Integer pageIndex = getPageIndex(page); ByteBuffer slice = mappedByteBufferList.get(fileIndex).slice(); slice.position(pageIndex * pageSize); byte[] length = new byte[4]; slice.get(length); int dataLength = ByteUtil.bytesToInt(length); if (dataLength == 0) { return new byte[0]; } slice.position(pageIndex * pageSize + 4); byte[] data = new byte[dataLength]; slice.get(data); return data; }
public Integer getFileIndex(Integer page) { return Math.toIntExact(page / pagePerFile); }
public Integer getPageIndex(Integer page) { return Math.toIntExact(page % pagePerFile); }
public void unmap() { lock.lock(); try { for (MappedByteBuffer mappedByteBuffer : mappedByteBufferList) { if (mappedByteBuffer == null) { return; } mappedByteBuffer.force(); Cleaner cl = ((DirectBuffer) mappedByteBuffer).cleaner(); if (cl != null) { cl.clean(); } } } finally { lock.unlock(); } }
}
|
面向上层的代码涉及到一些公司业务,所以不贴了。