源码分析Python的deque双端队列

一、 背景

我们都知道Python内置的列表list, 它可以模拟栈的操作,可以在O(1)的时间复杂度内在尾部增加和删除元素,它底层实现了动态扩容机制。但是在项目中,设计到BFS相关算法时,使用list, 从头部弹出元素, 将会导致后续的元素依次向前移动,时间复杂度变为O(N),性能变得很差。

Python标准库提供了deque模块—-双端队列,实现在头部删除和插入元素的时间复杂度为O(1),在尾部删除和插入的时间复杂度也为O(1),既可以当队列使用又可以当栈使用,接下来就从底层来研究它的数据结构和相关操作的算法思路。

二 、deque的数据结构

结构体定义为/Modules/_collectionsmodule.c中第82行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#define BLOCKLEN 64
#define CENTER ((BLOCKLEN - 1) / 2)
#define MAXFREEBLOCKS 16

typedef struct BLOCK {
struct BLOCK *leftlink; // 指向左边的内存块
PyObject *data[BLOCKLEN]; // 内存块的空间
struct BLOCK *rightlink; // 指向右边的内存块
} block;

typedef struct {
PyObject_VAR_HEAD // 可变类型头部标识
block *leftblock; // 指向双向链表最左边的节点
block *rightblock; // 指向双向链表最右边的节点
Py_ssize_t leftindex; /* 0 <= leftindex < BLOCKLEN 第一个元素在leftblock中数组的下表 */
Py_ssize_t rightindex; /* 0 <= rightindex < BLOCKLEN 最后一个元素在rightblock中数组的下表 */
size_t state; /* incremented whenever the indices move */
Py_ssize_t maxlen; /* maxlen is -1 for unbounded deques 队列所允许的最大长度 */
Py_ssize_t numfreeblocks; // freeblocks中第一个空闲块的下标
block *freeblocks[MAXFREEBLOCKS]; // 指向空闲的freeblocks数组
PyObject *weakreflist; // 弱引用链表的头指针
} dequeobject;

分析:

deque的底层是由双向链表实现,因此头部和尾部的操作都能在O(1)内完成。但是deque的双向链表和不同的双向链表不同。数据结构中我们学习的双向链表,每个节点表示的是一个元素,而这里每个节点中能够保存64个元素,这样可以有效的节省内存空间的浪费,因为原来一个元素关联两个指针,而现在64个元素才关联两个指针,大大降低了指针相对于数据本身对内存空间的占用率。

根据如上的结构,我粗略的画了一下结构图:

QQ图片20211025133700

图中不仅包含了deque的数据结构,同时我也将weakref的结构体也画进去了。

三 、 deque的操作运算集

1.为block结构体申请空间,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static inline block *
newblock(dequeobject *deque) {
// 构造链表节点
block *b;
// 分配空间时, 优先从空闲链表中复用空闲节点
if (deque->numfreeblocks) {
deque->numfreeblocks--;
return deque->freeblocks[deque->numfreeblocks];
}
// 空闲链表为空, 申请内存空间
b = PyMem_Malloc(sizeof(block));
if (b != NULL) {
return b;
}
PyErr_NoMemory();
return NULL;
}

分析:

底层在内存空间使用上的优化还是蛮不错的,在我看到的很多源码中,像基本类型,可变序列的类型等,都会使用缓存来避免内存的多次分配和回收。这里的deque->freeblocks指向空闲数组,在申请一块新空间时,首先尝试复用空闲数组中的节点块,如果空闲数组为空,那么就向操作系统申请新的内存空间,大大减少了频繁申请内存的开销。

2.回收block对象,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
static inline void
freeblock(dequeobject *deque, block *b)
// 释放链表节点
{
// 若空闲链表中的空闲块个数小于允许的最大值, 则将b对象添加到空闲链表,否则由操作系统回收内存
if (deque->numfreeblocks < MAXFREEBLOCKS) {
deque->freeblocks[deque->numfreeblocks] = b;
deque->numfreeblocks++;
} else {
PyMem_Free(b);
}
}

分析:

同样,释放链表节点时,会判断空闲链表中空闲块个数是否小于允许的最大值,如果小于,则将b对象添加到空闲数组的指定位置,否则由操作系统回收资源。

3.从右出队deque_pop方法,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static PyObject *
deque_pop(dequeobject *deque, PyObject *unused)
// 从右出队
{
PyObject *item;
block *prevblock;
// 判断队列是否为空
if (Py_SIZE(deque) == 0) {
PyErr_SetString(PyExc_IndexError, "pop from an empty deque");
return NULL;
}
// 找到最右边节点中的数组的最后一个元素
item = deque->rightblock->data[deque->rightindex];
deque->rightindex--;
// 强转, 修改deque对象的元素个数减一
Py_SET_SIZE(deque, Py_SIZE(deque) - 1);
deque->state++;

if (deque->rightindex < 0) {
// deque对象中还有元素, 如果rightindex应该指向前一块的末尾位置,设置rightindex为前驱节点中数组的首位置,回收该块资源(加入到空闲链表中or操作系统回收)
if (Py_SIZE(deque)) {
prevblock = deque->rightblock->leftlink; // 前一个节点
assert(deque->leftblock != deque->rightblock);
freeblock(deque, deque->rightblock);
CHECK_NOT_END(prevblock);
MARK_END(prevblock->rightlink);
deque->rightblock = prevblock; // 将前一个节点块作为队列中最后一个block块
deque->rightindex = BLOCKLEN - 1; // 重新设置最后一个元素的下表位置
} else {
// 删除元素后,deque中没有任何元素,重新定位元素位置,而不是回收节点块
assert(deque->leftblock == deque->rightblock);
assert(deque->leftindex == deque->rightindex+1);
/* re-center instead of freeing a block */
deque->leftindex = CENTER + 1;
deque->rightindex = CENTER;
}
}
return item;
}

分析

该函数的功能是从队列右侧弹出数据。

算法思想:

​ (1).判断队列是否为空,不为空往下走,为空返回NULL。

​ (2).找到队列最后一块block节点中的数组中的最右端元素,赋值给item, rightindex减1, 强转类型,修改deque对象的元素个数减一。

​ (3). 在元素获取后,deque对象中还有元素, 如果rightindex < 0, 应该重定位到前一块的末尾,则回收该块资源(加入到空闲链表中or操作系统回收),同时,修正deque中rightblock指向的前一个节点块, rightindex对应前一块中数组最后一个元素的下标位置。如果此时deque中的大小为0(即deque中只剩一个空的block块),则不再回收该内存块,重新定位leftindex和rightindex为中间位置

4.从左出队deque_popleft函数,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static PyObject *
deque_popleft(dequeobject *deque, PyObject *unused)
// 从头部弹出元素
{
PyObject *item;
block *prevblock;
// 队列为空
if (Py_SIZE(deque) == 0) {
PyErr_SetString(PyExc_IndexError, "pop from an empty deque");
return NULL;
}
assert(deque->leftblock != NULL);
item = deque->leftblock->data[deque->leftindex];
deque->leftindex++;
Py_SET_SIZE(deque, Py_SIZE(deque) - 1);
deque->state++;

if (deque->leftindex == BLOCKLEN) {
// deque对象中还有元素,如果leftindex==BLOCKEN,应指向后继节点个节点中数组首位置,设置leftindex为后继节点中数组的首位置,回收该块资源。
if (Py_SIZE(deque)) {
assert(deque->leftblock != deque->rightblock);
prevblock = deque->leftblock->rightlink;
freeblock(deque, deque->leftblock);
CHECK_NOT_END(prevblock);
MARK_END(prevblock->leftlink);
deque->leftblock = prevblock;
deque->leftindex = 0;
} else {
assert(deque->leftblock == deque->rightblock);
assert(deque->leftindex == deque->rightindex+1);
/* re-center instead of freeing a block */
deque->leftindex = CENTER + 1;
deque->rightindex = CENTER;
}
}
return item;
}

分析:

该函数的功能是从队列左侧弹出数据。

算法思想:

​ (1).判断队列是否为空,不为空往下走,为空返回NULL。

​ (2).找到队列第一块block节点中的数组中的最左端元素,赋值个item, 强转类型, leftindex加1,修改deque对象的元素个数减一。

​ (3). 在元素获取后,如果leftindex==BLOCKEN且deque对象中还有元素, 应指向后继节点中数组首位置,设置leftindex为后继节点中数组的首位置,回收该块资。同时,修正deque中rightblock指向的最后一个节点块即rightindex对应最后一个元素的下标位置。如果此时是deque中的大小为0(即deque中只剩一个空的block块),则不再回收该内存块,重新定位leftindex和rightindex为中间位置

  1. 向右追加元素,deque_append_internal函数,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
* 先分配空间,然后加入元素
* 向右追加元素,deque->rightindex在一块block中是递增的
*/
static inline int
deque_append_internal(dequeobject *deque, PyObject *item, Py_ssize_t maxlen)
{
// 如果元素位于数组中的最后一个位置,则从空闲链表或内存中申请一块空间
if (deque->rightindex == BLOCKLEN - 1) {
block *b = newblock(deque);
if (b == NULL)
return -1;
// 双向链表的尾插法,并设置rightindex为最右块的第一个位置,即0,然后赋值
b->leftlink = deque->rightblock;
CHECK_END(deque->rightblock->rightlink);
deque->rightblock->rightlink = b;
deque->rightblock = b;
MARK_END(b->rightlink);
deque->rightindex = -1;
}
Py_SET_SIZE(deque, Py_SIZE(deque) + 1);
deque->rightindex++;
deque->rightblock->data[deque->rightindex] = item;
// 如果超过队列限定的最大长度,则会从左侧弹出数据
if (NEEDS_TRIM(deque, maxlen)) {
PyObject *olditem = deque_popleft(deque, NULL);
// 针对该对象,减少其引用计数值
Py_DECREF(olditem);
} else {
deque->state++;
}
return 0;
}

分析:

算法思想:

​ (1). 如果元素位于数组中的最后一个位置,则从空闲链表或内存中申请一块空间, 使用双向链表的尾插法,并设置rightindex为最右块的第一个位置,即0,然后赋值。

​ (2).如果超过队列限定的最大长度,则会从左侧弹出数据, 针对该对象,减少其引用计数值

6.向左追加元素,deque_appendleft_internal函数,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/*
* 先分配空间,然后加入元素
* 向左追加元素,deque->leftindex在一块block中是递减的
*/
static inline int
deque_appendleft_internal(dequeobject *deque, PyObject *item, Py_ssize_t maxlen)
{
// 如果元素处于数组的第一个位置,则从空闲链表或内存中申请一块空间
if (deque->leftindex == 0) {
block *b = newblock(deque);
if (b == NULL)
return -1;
// 头插法,并设定leftindex为数组的最后一个位置,然后赋值
b->rightlink = deque->leftblock;
CHECK_END(deque->leftblock->leftlink);
deque->leftblock->leftlink = b;
deque->leftblock = b;
MARK_END(b->leftlink);
deque->leftindex = BLOCKLEN;
}
Py_SET_SIZE(deque, Py_SIZE(deque) + 1);
deque->leftindex--;
deque->leftblock->data[deque->leftindex] = item;
// 如果超过队列限定的最大长度,则会从右侧弹出数据
if (NEEDS_TRIM(deque, deque->maxlen)) {
PyObject *olditem = deque_pop(deque, NULL);
Py_DECREF(olditem);
} else {
deque->state++;
}
return 0;
}

分析:

算法思想:

​ (1). 如果元素处于数组的第一个位置,则从空闲链表或内存中申请一块空间, 使用双向链表的头插法,并设定leftindex为数组的最后一个位置,即blocklen-1,然后赋值。

​ (2).如果超过队列限定的最大长度,则会从右侧弹出数据, 针对该对象,减少其引用计数值

四、总结

  1. 通过源码分析,deque的数据结构也很明了了,双向链表在底层中用的地方蛮多的,不止这里,向垃圾回收机制,缓存中也是用的双向链表

来存储数据。

  1. deque中在初始化队列时,设定leftindex和rightindex为数组的中间位置,使得数据均匀分布,使得从左插入和从右插入带来内存块的分配的频度尽可能趋于相同。
  2. deque中每个节点中存储64个元素,减少了指针大小对内存空间的消耗,更加节省空间。