多进程通信之基于mmap的共享内存和文件锁

一、 背景

在公司项目开发中存在多机器人并行运行的场景,每个机器人都是一个进程, 每个进程都是无亲缘关系的。每个机器人根据指令的不同都可能会在运行过程中下去访问临界资源。众所周知,临界资源是指一次仅能使一个进程访问的资源,而访问临界资源对应的那段代码端即临界区,就必须互斥访问,因此加锁成了处理并发、并行的常用手段。要实现无亲缘的多进程之间的通信,就需要借助于中间件的锁,我一开始想到两种方法,一种是基于分布式锁+消息队列,第二种是基于文件锁。由于产品是位于客户端的,而Python处理的程序也在客户端处理,所以第一种方案目前行不通,只能采用第二种方法,当然还有更好的方法。采用文件锁来实现多进程互斥访问临界资源是非常简单易行的,但是开发软件,光解决是不够的,还需要考虑有没有更优化的方案,如果有,优化多少,该怎么优化。因此这篇笔记记录基于mmap的内存映射的方法+文件排他锁的方法实现无亲缘多进程之间的通信的解决方案。

注:可能很多人会疑惑,使用语言本身自带的库也是可以实现多进程通信的,这里我说明一下,本篇笔记是基于无亲缘关系的进程之间通信,而非父子进程之间的通信,所以前者无法使用例如multiprocessing一类的进程锁。

二 、基于mmap的内存映射的概念和原理

1、mmap内存映射的概念

mmap是内存映射文件的一种方式,即将一个文件或者其他对象映射到内存空间上的某一块,实现文件物理地址和进程虚拟空间中的某一块的虚拟地址之间的映射关系,进程可以通过文件指针直接读写目标内存块,而操作系统会自动回写数据到磁盘上,这样就可以无需使用read,write等系统调用。同样,内核空间中的修改也可以反应到用户空间,被其他进程访问,从而实现多进程之间的数据通信。

2.mmap内存映射的原理

mmap内存映射的实现过程可以分为三个阶段:

第一阶段:在虚拟地址空间中为映射创建虚拟映射区域

(1).进程在用户态下调用mmap库函数,在其进程虚拟地址空间申请一段连续的虚拟地址空间(虚拟地址是可以连续的, 但物理地址是随机分配的)。

(2).在申请的连续的虚拟地址空间构造vm_area_struct结构,并对结构体进行初始化操作。

(3).初始化完成后,将该结构以加入到进程虚拟地址链表或者树中。

第二阶段:调用内核态下的mmap函数,实现文件物理地址和进程虚拟地址空间的映射

(1).为映射分配完虚拟映射区域后,根据待映射的文件指针在文件描述符表中找到对应的文件描述符,通过文件描述符,链接到内核中已打开文件集中文件结构体。文件结构体中包含了该文件的相关信息。

(2).内核函数mmap通过文件结构体里相关信息定位到文件的物理地址。

(3).建立页表,实现文件物理地址和进程虚拟地址空间的映射关系。

第三阶段:进程访问文件时,产生缺页异常,从磁盘拷贝数据到内存

(1).在完成文件物理地址和进程虚拟地址空间的映射后,数据此时还并没有发生拷贝。

(2).当进程读写虚拟地址空间中的页表时,通过搜索页表发现数据并不在映射的物理内存上,因此会产生缺页异常。

(3).操作系统通过一系列的检测,发现内存页确实不在,则内核请求调页过程。

(4).请求调页过程首先回去访问swap交换区,如果内存页存在交换区,则直接从交换区拿内存页到内存,swap没有的话,再去将所缺少的页从磁盘拷贝到内存。

(5).这样第一次拷贝完成后,其他进程访问的时候,可以直接读写内存上的文件数据。如果写操作改变了内存空间上的内容,则一段时间后,操作系统会将脏页面回写到磁盘上,当然也可以调用相关函数立即写回。

三 、 mmap内存映射和普通读写文件的优势

在学习了mmap的概念和原理后,归根到底mmap是实现了虚拟地址空间、内存地址空间、文件物理地址三者之间的映射关系,进程映射关系直接通过读写内存地址,将脏页写回磁盘,来实现多进程之间的通信。那么相比于普通读写文件又有哪些优势呢?在查阅了相关资料Linux下读写文件的原理后,得到以下的比较。至于Linux下读写文件的原理,我这里就不做额外说明,附上链接,前去学习:https://www.cnblogs.com/huxiao-tee/p/4657851.html

mmap的优势:

(1).对文件的读写操作跨过了页缓存,减少了内存的拷贝次数,用内存读写替代I/O读写,提高文件的读写效率。

(2).基于mmap内存共享读写文件只需要一次从磁盘–>内存,内存–>磁盘的数据拷贝,而普通的读写文件因为存在页缓存需要两次数据拷贝。

(3).能够满足父子进程和无亲缘进程之间的通信,每个进程可以将自身虚拟地址空间映射到同一块内存空间及文件地址上。通过读写内存空间,实现多进程之间的通信。

(4).当文件数据量很大的时候,内存撑不起多次读写大数据的文件时,面临用磁盘空间替代内存空间时,可以考虑使用mmap机制。

四 、文件锁

在使用了基于mmap的内存共享后,将不同进程的虚拟地址空间映射到同一内存块上,此时被映射到的内存空间变成了临界资源,当涉及到多个进程写内存块时,为了保证数据的同步不出错,要对写操作加锁,即当某个进程写内存时,其他进程要等待,直至释放锁。

由于项目中多个进程共享内存后,不仅仅是写操作,还涉及到访问其他的临界资源,例如鼠标的移动,窗口切换以及需要涉及原子性的操作等等。因此文件锁更多的是承担多进程访问临界资源时的同步处理角色。

注: Python的文件锁在Linux和windows下基于不同的库实现,在Linux下采用fcntl。在windows下基于微软提供的msvcrt.dll库函数。网上很多博客中都是默认在Linux下使用fcntl,但都没说清楚。windows下Python可以使用msvcrt库,如果为了更方便跨平台,基于其他人的肩膀上开发,我推荐filelock这个库,通过pip install filelock 安装

以下是我基于filelock+mmap内存共享实现的多进程通信类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
"""
实现基于mmap的内存共享机制的锁机制
file -> memory

example:
instance = MemoryShareLock()
with instance as f:
doing something

上下文中操作都是原子性的, 因此勿将大段代码放到上下文环境中, 只考虑并发过程可能会导致数据不一致的情况下。

***注意:
1.开发中, 尽量不要让一个进程同时拥有多把锁, 如果非要同时具备多把锁, 请自行处理好解锁时机, 以防出现死锁!
2.加锁和解锁过程请务必在同一个指令中完成!
***

"""

import functools
import typing as t
import os
import mmap
from util.fileimp import mmap_path
from filelock import FileLock


def file_lock(func: t.Callable):
"""
基于共享内存的文件锁装饰器, 锁函数体, 增加加锁后统一预处理, 解锁前统一后处理。
1.如果能某进程拿到针对指定文件描述符的锁, 则往下执行; 没拿到排它锁的进程将阻塞, 直到拿到锁, 向下执行。
2.锁住被修饰的函数整体。
3.基于共享内存实现文件读写, 因此定义prefix和suffix的函数中第一个参数必须看做一个mmap对象。
对文件的操作基于mmap进行,而不是普通的文件对象。

用法:
share_lock = MemoryShareLock(filename='test.txt', length=10, tag_name='共享内存涉及的说明')

1.first example:
@file_lock
def func(instance):
doing something...

func(share_lock)


2.second example:
def prefix(mmap, *args, **kwargs):
print('加锁后, 预处理函数')

def suffix(mmap, *args, **kwargs):
print('解锁前, 后处理函数')

@file_lock
def func(instance):
doing something...

func(share_lock, prefix=prefix, suffix=prefix, prefix_args=('name', 'age'),
suffix_args=('name', 'age'), prefix_kwargs={"hobby":"swim"},
suffix_kwargs={"love":"person"})

"""

@functools.wraps(func)
def inner(
instance: t.Any,
prefix: t.Optional[t.Callable] = None,
suffix: t.Optional[t.Callable] = None,
prefix_args: t.Optional[t.Tuple] = None,
suffix_args: t.Optional[t.Tuple] = None,
prefix_kwargs: t.Optional[t.Dict] = None,
suffix_kwargs: t.Optional[t.Dict] = None
):
file_obj = open(instance.filename, 'r+', encoding='utf-8')
mmap_obj = mmap.mmap(file_obj.fileno(), instance.length, tagname=instance.tag_name)
lock = FileLock(f'{instance.filename}.lock', timeout=instance.timeout)
with lock.acquire():
if prefix:
prefix(mmap_obj, *prefix_args, **prefix_kwargs)
func(instance)
if suffix:
suffix(mmap_obj, *suffix_args, **suffix_kwargs)

return inner


def inject_dependence(func):
"""
依赖注入装饰器
"""

@functools.wraps(func)
def inner(instance, *args, **kwargs):
dependence: str = func.__name__
func_name = dependence.split('_')[0]
if dependence.split('_')[0]:
_func = getattr(instance, f'{func_name}')
_args = getattr(instance, f'{func_name}_args') or ()
_kwargs = getattr(instance, f'{func_name}_kwargs') or {}
if _func:
_func(*_args, **_kwargs)
func(instance, *args, **kwargs)

return inner


class MemoryShareLock(object):
"""
内存共享类
1.使用__slots__减少因多次读取共享内存而带来过多额外实例对象的属性空间的内存靠小。
2.不能仅仅对打开文件加锁, 而应该对内存读写 + 刷到磁盘的这一过程加锁,
确保内存中的数据不被多个进程同时污染。
3.操作同一块 (文件-> 内存的映射) 确保为单例。
4.生成锁请在此文件末尾生成。
5.继承MemoryShareLock的锁类, 应当提供自定义的pre_treatment, after_treatment函数, 并使用
依赖注入装饰器装饰。
6.实现加锁后预处理和解锁前后处理的依赖注入。
7.允许手动添加依赖, *注意: 由于实例全局唯一, 因此属于该实例的依赖也全局唯一。

技术说明:
相比于直接采用文件的排斥锁来说, 使用基于mmap的内存共享 + 排斥锁, 读写文件只需一次数据拷贝,
磁盘 -> 内存 and 内存 -> 磁盘。而普通的读写文件时会加入提高读写效率, 保护磁盘的页缓存机制, 缺点会
导致读写文件各两次数据拷贝。

"""

__slots__ = (
'filename',
'length',
'tag_name',
'mmap_obj',
'file_obj',
'timeout',
'lock',
'prefix',
'suffix',
'prefix_args',
'suffix_args',
'prefix_kwargs',
'suffix_kwargs',
)

map_instance = {}

def __enter__(self):
file_obj = open(self.filename, 'w+', encoding='utf-8')
mmap_obj = mmap.mmap(file_obj.fileno(), self.length, tagname=self.tag_name)
lock = FileLock(f'{self.filename}.lock', timeout=self.timeout)
setattr(self, 'mmap_obj', mmap_obj)
setattr(self, 'lock', lock)
lock.acquire()
self.prefix_treatment()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
mmap_obj = getattr(self, 'mmap_obj')
lock = getattr(self, 'lock')
self.suffix_treatment()
mmap_obj.flush()
mmap_obj.close()
lock.release()

def __new__(
cls,
filename: str,
*args,
**kwargs
):
if not cls.map_instance.setdefault(filename, None):
instance = super().__new__(cls)
cls.map_instance[filename] = instance
return cls.map_instance.get(filename)

def __init__(
self,
filename: str,
length: int,
tag_name: str,
timeout: int = -1,
prefix: t.Optional[t.Callable] = None,
suffix: t.Optional[t.Callable] = None,
prefix_args: t.Optional[t.Tuple] = None,
suffix_args: t.Optional[t.Tuple] = None,
prefix_kwargs: t.Optional[t.Dict] = None,
suffix_kwargs: t.Optional[t.Dict] = None
) -> None:
"""
初始化对象
"""
self.filename = os.path.join(mmap_path, filename)
self.length = length
self.tag_name = tag_name
self.timeout = timeout
self.prefix = prefix
self.suffix = suffix
self.prefix_args = prefix_args
self.suffix_args = suffix_args
self.prefix_kwargs = prefix_kwargs
self.suffix_kwargs = suffix_kwargs

@inject_dependence
def prefix_treatment(self):
"""
预处理
读写 多进程共享的内存块, 进/线程安全的操作
"""
mmap_obj: mmap.mmap = getattr(self, 'mmap_obj')
mmap_obj.seek(0)
mmap_obj.write("1".encode(encoding='utf-8'))

@inject_dependence
def suffix_treatment(self):
"""
后处理
读写 多进程共享的内存块, 进/线程安全的操作
"""
mmap_obj: mmap.mmap = getattr(self, 'mmap_obj')
mmap_obj.seek(0)
mmap_obj.write("0".encode(encoding='utf-8'))

def add_suffix_dependence(self, suffix, *args, **kwargs):
"""
增加后处理依赖
"""
self.suffix = suffix
self.suffix_args = args
self.suffix_kwargs = kwargs

def add_prefix_dependence(self, prefix, *args, **kwargs):
"""
增加预处理依赖
"""
self.prefix = prefix
self.prefix_args = args
self.prefix_kwargs = kwargs


class FileWriteLock(MemoryShareLock):
"""
自定义文件锁
"""

@inject_dependence
def pre_treatment(self):
pass

@inject_dependence
def after_treatment(self):
pass


switch_window_lock = MemoryShareLock('switch_window.txt', 10, tag_name='切换窗口锁')
auto_gui_lock = MemoryShareLock('mouse_move.txt', 10, tag_name='鼠标移动锁')
file_write_lock = FileWriteLock('test_write.json', 1000, tag_name='文件写锁')
input_lock = MemoryShareLock('input.txt', 100, tag_name='输入框锁')
js_execute_lock = MemoryShareLock('js.txt', 100, tag_name='js执行锁')