0%

TCMalloc的实现细节

转载自https://zhuanlan.zhihu.com/p/51433411仅作学习使用

算法概览一节涉及到了很多概念,比如Page,Span,Size Class,PageHeap,ThreadCache等,但只是粗略的一提,本节将详细讨论这些概念所涉及的实现细节。

Page

Page是TCMalloc管理内存的基本单位(这里的page要区分于操作系统管理虚拟内存的page),page的默认大小为8KB,可在configure时通过选项调整为32KB或64KB。

1
./configure <other flags> --with-tcmalloc-pagesize=32 (or 64)

page越大,TCMalloc的速度相对越快,但其占用的内存也会越高。简单说,就是空间换时间的道理。默认的page大小通过减少内存碎片来最小化内存使用,但跟踪这些page会花费更多的时间。使用更大的page则会带来更多的内存碎片,但速度上会有所提升。官方文档给出的数据是在某些google应用上有3%~5%的速度提升。

PageID

TCMalloc并非只将堆内存看做是一个个的page,而是将整个虚拟内存空间都看做是page的集合。从内存地址0x0开始,每个page对应一个递增的PageID,如下图(以32位系统为例):

img

对于任意内存地址ptr,都可通过简单的移位操作来计算其所在page的PageID:

1
2
static const size_t kPageShift  = 13; // page大小:1 << 13 = 8KB
const PageID p = reinterpret_cast<uintptr_t>(ptr) >> kPageShift;

即,ptr所属page的PageID为ptr / page_size

Span

一个或多个连续的Page组成一个Span(a contiguous run of pages)。TCMalloc以Span为单位向系统申请内存。

img

如图,第一个span包含2个page,第二个和第四个span包含3个page,第三个span包含5个page。

一个span记录了起始page的PageID(start),以及所包含page的数量(length)。

一个span要么被拆分成多个相同size class的小对象用于小对象分配,要么作为一个整体用于中对象或大对象分配。当作用作小对象分配时,span的sizeclass成员变量记录了其对应的size class。

span中还包含两个Span类型的指针(prev, next),用于将多个span以链表的形式存储。

span的三种状态

一个span处于以下三种状态中的一种:

  • IN_USE
  • ON_NORMAL_FREELIST
  • ON_RETURNED_FREELIST

IN_USE比较好理解,正在使用中的意思,要么被拆分成小对象分配给CentralCache或者ThreadCache了,要么已经分配给应用程序了。因为span是由PageHeap来管理的,因此即使只是分配给了CentralCache,还没有被应用程序所申请,在PageHeap看来,也是IN_USE了。

ON_NORMAL_FREELISTON_RETURNED_FREELIST都可以认为是空闲状态,区别在于,ON_RETURNED_FREELIST是指span对应的内存已经被PageHeap释放给系统了(在Linux中,对于MAP_PRIVATE|MAP_ANONYMOUS的内存使用madvise来实现)。需要注意的是,即使归还给系统,其虚拟内存地址依然是可访问的,只是对这些内存的修改丢失了而已,在下一次访问时会导致page fault以用0来重新初始化。

空闲对象链表

被拆分成多个小对象的span还包含了一个记录空闲对象的链表objects,由CentralFreeList来维护。

对于新创建的span,将其对应的内存按size class的大小均分成若干小对象,在每一个小对象的起始位置处存储下一个小对象的地址,首首相连:

img

但当span中的小对象经过一系列申请和回收之后,其顺序就不确定了:

img

可以看到,有些小对象已经不再空闲对象链表objects中了,链表中的元素顺序也已经被打乱。

空闲对象链表中的元素乱序没什么影响,因为只有当一个span的所有小对象都被释放之后,CentralFreeList才会将其还给PageHeap。

PageMap

PageMap之前没有提到过,它主要用于解决这么一个问题:给定一个page,如何确定这个page属于哪个span?

即,PageMap缓存了PageID到Span的对应关系。

32位系统、x86-64、arm64使用两级PageMap,以32位系统为例:

img

在root_数组中包含512个指向Leaf的指针,每个Leaf又是1024个void*的数组,数组索引为PageID,数组元素为page所属Span的指针。一共$2^{19}$个数组元素,对应32位系统的$2^{19}$个page。

使用两级map可以减少TCMalloc元数据的内存占用,因为初始只会给第一层(即root_数组)分配内存(2KB),第二层只有在实际用到时才会实际分配内存。而如果初始就给$2^{19}$个page都分配内存的话,则会占用$2^{19} * 4 bytes = 2MB$的内存。

Size Class

TCMalloc将每个小对象的大小(1B~256KB)分为85个类别(官方介绍中说是88个左右,但我个人实际测试是85个,不包括0字节大小),称之为Size Class,每个size class一个编号,从0开始递增(实际编号为0的size class是对应0字节,是没有实际意义的)。

举个例子,896字节对应编号为30的size class,下一个size class 31大小为1024字节,那么897字节到1024字节之间所有的分配都会向上舍入到1024字节。

SizeMap::Init()实现了对size class的划分,规则如下:

划分跨度

  • 16字节以内,每8字节划分一个size class。
  • 满足这种情况的size class只有两个:8字节、16字节。
  • 16~128字节,每16字节划分一个size class。
  • 满足这种情况的size class有7个:32, 48, 64, 80, 96, 112, 128字节。
  • 128B~256KB,按照每次步进(size / 8)字节的长度划分,并且步长需要向下对齐到2的整数次幂,比如:
  • 144字节:128 + 128 / 8 = 128 + 16 = 144
  • 160字节:144 + 144 / 8 = 144 + 18 = 144 + 16 = 160
  • 176字节:160 + 160 / 8 = 160 + 20 = 160 + 16 = 176
  • 以此类推

一次移动多个空闲对象

ThreadCache会从CentralCache中获取空闲对象,也会将超出限制的空闲对象放回CentralCache。ThreadCache和CentralCache之间的对象移动是批量进行的,即一次移动多个空闲对象。CentralCache由于是所有线程公用,因此对其进行操作时需要加锁,一次移动多个对象可以均摊锁操作的开销,提升效率。

那么一次批量移动多少呢?每次移动64KB大小的内存,即因size class而异,但至少2个,至多32个(可通过环境变量TCMALLOC_TRANSFER_NUM_OBJ调整)。

移动数量的计算也是在size class初始化的过程中计算得出的。

一次申请多个page

对于每个size class,TCMalloc向系统申请内存时一次性申请n个page(一个span),然后均分成多个小对象进行缓存,以此来均摊系统调用的开销

不同的size class对应的page数量是不同的,如何决定n的大小呢?从1个page开始递增,一直到均分成若干小对象后所剩的空间小于span总大小的1/8为止,因此,浪费的内存被控制在12.5%以内。这是TCMalloc减少内部碎片的一种措施。

另外,所分配的page数量还需满足一次移动多个空闲对象的数量要求(源码中的注释是这样说的,不过实际代码是满足1/4即可,原因不明)。

合并操作

在上述规则之上,还有一个合并操作:TCMalloc会将相同page数量,相同对象数量的相邻的size class合并为一个size class。比如:

img

第30个size class的对象大小是832字节,page数量为1个,因此包含8192 / 832 = 9个小对象。

第31个size class对应的page数量(1个)和对象数量(9个)跟第30个size class完全一样,因此第30个size class和第31个size class合并,所以第30个size class对应的对象大小为896字节。

下一个size class对应的对象大小为1024字节,page数量为1个,因此对象数量是8个,跟第30个size class的对象数量不一样,无法合并。

最终,第30个size class对应的对象大小为896字节。

记录映射关系

由以上划分规则可以看到,一个size class对应:

  • 一个对象大小
  • 一个申请page的数量
  • 一个批量移动对象的数量

TCMalloc将size class与这些信息的映射关系分别记录在三个以size class的编号为索引的数组中(class_to_size_num_objects_to_move_class_to_pages_)。

还有一项非常重要的映射关系:小对象大小到size class编号的映射。TCMalloc将其记录在一个一维数组class_array_中。

256KB以内都是小对象,而size class的编号用一个字节就可以表示,因此存储小对象大小对应的size class编号需要256K个unsigned char,即256KB的内存。但由于size class之间是有间隔的(1024字节以内间隔至少8字节,1024以上间隔至少128字节),因此可以通过简单的计算对class_array_的索引进行压缩,以减少内存占用。

给定大小s,其对应的class_array_索引计算方式如下:

1
2
3
4
5
6
7
8
9
// s <= 1024
static inline size_t SmallSizeClass(size_t s) {
return (static_cast<uint32_t>(s) + 7) >> 3;
}

// s > 1024
static inline size_t LargeSizeClass(size_t s) {
return (static_cast<uint32_t>(s) + 127 + (120 << 7)) >> 7;
}

当s = 256KB时,计算结果即为class_array_的最大索引2169,因此数组的大小为2170字节。

计算任意内存地址对应的对象大小

当应用程序调用free()delete释放内存时,需要有一种方式获取所要释放的内存地址对应的内存大小。结合前文所述的各种映射关系,在TCMalloc中按照以下顺序计算任意内存地址对应的对象大小:

  • 计算给定地址计所在的PageID(ptr >> 13)
  • 从PageMap中查询该page所在的span
  • span中记录了size class编号
  • 根据size class编号从class_to_size_数组中查询对应的对象大小

这样做的好处是:不需要在内存块的头部记录内存大小,减少内存的浪费

小结

size class的实现中有很多省空间省时间的做法:

  • 省空间
  • 控制划分跨度的最大值(8KB),减少内部碎片
  • 控制一次申请page的数量,减少内部碎片
  • 通过计算和一系列元数据记录内存地址到内存大小的映射关系,避免在实际分配的内存块中记录内存大小,减少内存浪费
  • 两级PageMap或三级PageMap
  • 压缩class_array_
  • 省时间
  • 一次申请多个page
  • 一次移动多个空闲对象

PageHeap

前面介绍的都是TCMalloc如何对内存进行划分,接下来看TCMalloc如何管理如此划分后的内存,这是PageHeap的主要职责。

TCMalloc源码中对PageHeap的注释:

1
2
3
4
5
6
7
// -------------------------------------------------------------------------
// Page-level allocator
// * Eager coalescing
//
// Heap for page-level allocation. We allow allocating and freeing a
// contiguous runs of pages (called a "span").
// -------------------------------------------------------------------------

空闲Span管理器

如前所述,128page以内的span称为小span,128page以上的span称为大span。PageHeap对于这两种span采取了不同的管理策略。小span用链表,而且每个大小的span都用一个单独的链表来管理。大span用std::set。

前文没有提到的是,从另一个维度来看,PageHeap是分开管理ON_NORMAL_FREELISTON_RETURNED_FREELIST状态的span的。因此,每个小span对应两个链表,所有大span对应两个set。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// We segregate spans of a given size into two circular linked
// lists: one for normal spans, and one for spans whose memory
// has been returned to the system.
struct SpanList {
Span normal;
Span returned;
};

// Array mapping from span length to a doubly linked list of free spans
//
// NOTE: index 'i' stores spans of length 'i + 1'.
SpanList free_[kMaxPages]; // 128

typedef std::set<SpanPtrWithLength, SpanBestFitLess, STLPageHeapAllocator<SpanPtrWithLength, void> > SpanSet;

// Sets of spans with length > kMaxPages.
//
// Rather than using a linked list, we use sets here for efficient
// best-fit search.
SpanSet large_normal_;
SpanSet large_returned_;

因此,实际的PageHeap是这样的:

img

Heap段的使用限制

可以通过FLAGS_tcmalloc_heap_limit_mb对进程heap段的内存使用量进行限制,默认值0表示不做限制。如果开启了限制并且对heap段内存的使用量接近这个限制时,TCMalloc会更积极的将空闲内存释放给系统,进而会引发更多的软分页错误(minor page fault)。

为了简化讨论,后文均假设没有对heap段的内存使用做任何限制

创建Span

1
2
3
4
// Allocate a run of "n" pages.  Returns zero if out of memory.
// Caller should not pass "n == 0" -- instead, n should have
// been rounded up already.
Span* New(Length n);

创建span的过程其实就是分配中对象和大对象的过程,假设要创建k个page大小的span(以下简称大小为k的span),过程如下:

搜索空闲span

1
Span* SearchFreeAndLargeLists(Length n);
  1. 搜索空闲span链表,按照以下顺序,找出第一个不小于k的span:
  2. 从大小为k的span的链表开始依次搜索
  3. 对于某个大小的span,先搜normal链表,再搜returned链表
  4. 如果span链表中没找到合适的span,则搜索存储大span的set:
  5. 从大小为k的span开始搜索
  6. 同样先搜normal再搜returned
  7. 优先使用长度最小并且起始地址最小的span(best-fit
  8. 如果通过以上两步找到了一个大小为m的span,则将其拆成两个span:
  9. 大小为m - k的span重新根据大小和状态放回链表或set中
  10. 大小为k的span作为结果返回,创建span结束
  11. 如果没搜到合适的span,则继续后面的步骤:向系统申请内存。

小插曲:释放所有空闲内存

1
ReleaseAtLeastNPages(static_cast<Length>(0x7fffffff));

当没有可用的空闲span时,需要向系统申请新的内存,但在此之前,还有一次避免向系统申请新内存的机会:释放所有空闲内存。向系统申请的内存每达到128MB,且空闲内存超过从系统申请的总内存的1/4,就需要将所有的空闲内存释放。

因为TCMalloc将normal和returned的内存分开管理,而这两种内存不会合并在一起。因此,可能有一段连续的空闲内存符合要求(k个page大小),但因为它既有normal的部分,又有returned的部分,因此前面的搜索规则搜不到它。而释放所有内存可以将normal的内存也变为returned的,然后就可以合并了(合并规则详细后文合并span)。

之所以控制在每128MB一次的频率,是为了避免高频小量申请内存的程序遭受太多的minor page fault。

释放完毕后,会按照前面的搜索规则再次尝试搜索空闲span,如果还搜不到,才继续后续的步骤。

向系统申请内存

1
bool GrowHeap(Length n);

找不到合适的空闲span,就只能向系统申请新的内存了。

TCMalloc以sbrk()mmap()两种方式向系统申请内存,所申请内存的大小和位置均按page对齐,优先使用sbrk(),申请失败则会尝试使用mmap()(64位系统Debug模式优先使用mmap,原因详见InitSystemAllocators()注释)。

TCMalloc向系统申请应用程序所使用的内存时,每次至少尝试申请1MBkMinSystemAlloc),申请TCMalloc自身元数据所使用的内存时,每次至少申请8MB(kMetadataAllocChunkSize)。这样做有两点好处:

  • 减少外部内存碎片(减少所申请内存与TCMalloc元数据所占内存的交替)
  • 均摊系统调用的开销,提升性能

另外,当从系统申请的总内存超过128MB时就为PageMap一次性申请一大块内存,保证可以存储所有page和span的对应关系。这一举措可以减少TCMalloc的元数据将内存分块而导致的外部碎片。从源码中可以发现,仅在32位系统下才会这样做,可能是因为64位系统内存的理论上限太大,不太现实。

1
2
3
4
5
// bool PageHeap::GrowHeap(Length n);
if (old_system_bytes < kPageMapBigAllocationThreshold
&& stats_.system_bytes >= kPageMapBigAllocationThreshold) {
pagemap_.PreallocateMoreMemory();
}

sbrk

先来看如何使用sbrk()从Heap段申请内存,下图展示了SbrkSysAllocator::Alloc()的执行流程,为了说明外部碎片的产生,并覆盖到SbrkSysAllocator::Alloc()的大部分流程,假设page大小为8KB,所申请的内存大小为16KB:

img

  1. 假设在申请内存之前,pb(program break,可以认为是堆内存的上边界)指向红色箭头所示位置,即没有在page的边界处。
  2. 第一次sbrk申请16KB内存,因此pb移至绿色箭头所示位置。
  3. 由于需要对申请的内存按page对齐,因此需要第二次sbrk,pb指向蓝色箭头所示位置,page的边界处。
  4. 最终,返回的内存地址为黑色箭头所示位置,黑色和蓝色箭头之间刚好16KB。

可以看出,红色箭头和黑色箭头之间的内存就无法被使用了,产生了外部碎片

mmap

如果使用sbrk申请内存失败,TCMalloc会尝试使用mmap来分配。同样,为了覆盖MmapSysAllocator::Alloc()的大部分情况,下图假设系统的page为4KB,TCMalloc的page为16KB,申请的内存大小为32KB:

img

  1. 假设在申请内存之前,mmap段的边界位于红色箭头所示位置。
  2. 第一次mmap,会在32KB的基础上,多申请(对齐大小 - 系统page大小) = 16 -4 = 12KB的内存。此时mmap的边界位于绿色箭头所示位置。
  3. 然后通过两次munmap将所申请内存的两侧边界分别对齐到TCMalloc的page边界。
  4. 最终申请到的内存为两个蓝色箭头之间的部分,返回左侧蓝色箭头所指示的内存地址。

如果申请内存成功,则创建一个新的span并立即删除,可将其放入空闲span的链表或set中,然后继续后面的步骤。

最后的搜索

最后,重新搜索一次空闲span,如果还找不到合适的空闲span,那就认为是创建失败了。

至此,创建span的操作结束。

删除Span

1
2
3
4
// Delete the span "[p, p+n-1]".
// REQUIRES: span was returned by earlier call to New() and
// has not yet been deleted.
void Delete(Span* span);

当span所拆分成的小对象全部被应用程序释放变为空闲对象,或者作为中对象或大对象使用的span被应用程序释放时,需要将span删除。不过并不是真正的删除,而是放到空闲span的链表或set中。

删除的操作非常简单,但可能会触发合并span的操作,以及释放内存到系统的操作。

合并Span

当span被delete时,会尝试向前向后合并一个span。

合并规则如下:

  • 只有在虚拟内存空间上连续的span才可以被合并。
  • 只有同是normal状态的span或同是returned状态的span才可以被合并。

img

上图中,被删除的span的前一个span是normal状态,因此可以与之合并,而后一个span是returned状态,无法与之合并。合并操作之后如下图:

img

还有一个值得注意的开关:aggressive_decommit_,开启后TCMalloc会积极的释放内存给系统,默认是关闭状态,可通过以下方式更改:

1
MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit", value)

当开启了aggressive_decommit_后,删除normal状态的span时会尝试将其释放给系统,释放成功则状态变为returned。

在合并时,如果被删除的span此时是returned状态,则会将与其相邻的normal状态的span也释放给系统,然后再尝试合并。

因此,上面的例子中,被删除的span和其前一个span都会被更改为returned状态,合并之后如下,即三个span被合并成为一个span:

img

释放span

1
Length ReleaseAtLeastNPages(Length num_pages);

在delete一个span时还会以一定的频率触发释放span的内存给系统的操作(ReleaseAtLeastNPages())。释放的频率可以通过环境变量TCMALLOC_RELEASE_RATE来修改。默认值为1,表示每删除1000个page就尝试释放至少1个page,2表示每删除500个page尝试释放至少1个page,依次类推,10表示每删除100个page尝试释放至少1个page。0表示永远不释放,值越大表示释放的越快,合理的取值区间为[0, 10]。

释放规则:

  • 从小到大循环,按顺序释放空闲span,直到释放的page数量满足需求。
  • 多次调用会从上一次循环结束的位置继续循环,而不会重新从头(1 page)开始。
  • 释放的过程中能合并span就合并span
  • 可能释放少于num_pages,没那么多free的span了。
  • 可能释放多于num_pages,还差一点就够num_pages了,但刚好碰到一个很大的span。

释放方式:

如果系统支持,通过madvise(MADV_DONTNEED)释放span对应的内存。因此,即使释放成功,对应的虚拟内存地址空间仍然可访问,但内存的内容会丢失,在下次访问时会导致minor page fault以用0来重新初始化。

TCMalloc的内存分配算法概览

转载自https://zhuanlan.zhihu.com/p/51432385 仅作学习使用

TCMalloc的官方介绍中将内存分配称为Object Allocation,本文也沿用这种叫法,并将object翻译为对象,可以将其理解为具有一定大小的内存。

按照所分配内存的大小,TCMalloc将内存分配分为三类:

  • 小对象分配,(0, 256KB]
  • 中对象分配,(256KB, 1MB]
  • 大对象分配,(1MB, +∞)

简要介绍几个概念,Page,Span,PageHeap:

与操作系统管理内存的方式类似,TCMalloc将整个虚拟内存空间划分为n个同等大小的Page,每个page默认8KB。又将连续的n个page称为一个Span

TCMalloc定义了PageHeap类来处理向OS申请内存相关的操作,并提供了一层缓存。可以认为,PageHeap就是整个可供应用程序动态分配的内存的抽象。

PageHeap以span为单位向系统申请内存,申请到的span可能只有一个page,也可能包含n个page。可能会被划分为一系列的小对象,供小对象分配使用,也可能当做一整块当做中对象或大对象分配。

小对象分配

Size Class

对于256KB以内的小对象分配,TCMalloc按大小划分了85个类别(官方介绍中说是88个左右,但我个人实际测试是85个,不包括0字节大小),称为Size Class,每个size class都对应一个大小,比如8字节,16字节,32字节。应用程序申请内存时,TCMalloc会首先将所申请的内存大小向上取整到size class的大小,比如18字节之间的内存申请都会分配8字节,916字节之间都会分配16字节,以此类推。因此这里会产生内部碎片。TCMalloc将这里的内部碎片控制在12.5%以内,具体的做法在讨论size class的实现细节时再详细分析。

ThreadCache

对于每个线程,TCMalloc都为其保存了一份单独的缓存,称之为ThreadCache,这也是TCMalloc名字的由来(Thread-Caching Malloc)。每个ThreadCache中对于每个size class都有一个单独的FreeList,缓存了n个还未被应用程序使用的空闲对象。

小对象的分配直接从ThreadCache的FreeList中返回一个空闲对象,相应的,小对象的回收也是将其重新放回ThreadCache中对应的FreeList中。

由于每线程一个ThreadCache,因此从ThreadCache中取用或回收内存是不需要加锁的,速度很快。

为了方便统计数据,各线程的ThreadCache连接成一个双向链表。ThreadCache的结构示大致如下:

img

CentralCache

那么ThreadCache中的空闲对象从何而来呢?答案是CentralCache——一个所有线程公用的缓存。

与ThreadCache类似,CentralCache中对于每个size class也都有一个单独的链表来缓存空闲对象,称之为CentralFreeList,供各线程的ThreadCache从中取用空闲对象。

由于是所有线程公用的,因此从CentralCache中取用或回收对象,是需要加锁的。为了平摊锁操作的开销,ThreadCache一般从CentralCache中一次性取用或回收多个空闲对象。

CentralCache在TCMalloc中并不是一个类,只是一个逻辑上的概念,其本质是CentralFreeList类型的数组。后文会详细讨论CentralCache的内部结构,现在暂且认为CentralCache的简化结构如下:

img

PageHeap

CentralCache中的空闲对象又是从何而来呢?答案是之前提到的PageHeap——TCMalloc对可动态分配的内存的抽象。

当CentralCache中的空闲对象不够用时,CentralCache会向PageHeap申请一块内存(可能来自PageHeap的缓存,也可能向系统申请新的内存),并将其拆分成一系列空闲对象,添加到对应size class的CentralFreeList中。

PageHeap内部根据内存块(span)的大小采取了两种不同的缓存策略。128个page以内的span,每个大小都用一个链表来缓存,超过128个page的span,存储于一个有序set(std::set)。讨论TCMalloc的实现细节时再具体分析,现在可以认为PageHeap的简化结构如下:

img

内存回收

上面说的都是内存分配,内存回收的情况是怎样的?

应用程序调用free()或delete一个小对象时,仅仅是将其插入到ThreadCache中其size class对应的FreeList中而已,不需要加锁,因此速度也是非常快的。

只有当满足一定的条件时,ThreadCache中的空闲对象才会重新放回CentralCache中,以供其他线程取用。同样的,当满足一定条件时,CentralCache中的空闲对象也会还给PageHeap,PageHeap再还给系统。

内存在这些组件之间的移动会在后文详细讨论,现在先忽略这些细节。

小结

总结一下,小对象分配流程大致如下:

  • 将要分配的内存大小映射到对应的size class。

  • 查看ThreadCache中该size class对应的FreeList。

  • 如果FreeList非空,则移除FreeList的第一个空闲对象并将其返回,分配结束。

  • 如果FreeList是空的:

  • 从CentralCache中size class对应的CentralFreeList获取一堆空闲对象。

    • 如果CentralFreeList也是空的,则:
    • 向PageHeap申请一个span。
    • 拆分成size class对应大小的空闲对象,放入CentralFreeList中。
  • 将这堆对象放置到ThreadCache中size class对应的FreeList中(第一个对象除外)。

  • 返回从CentralCache获取的第一个对象,分配结束。

中对象分配

超过256KB但不超过1MB(128个page)的内存分配被认为是中对象分配,采取了与小对象不同的分配策略。

首先,TCMalloc会将应用程序所要申请的内存大小向上取整到整数个page(因此,这里会产生1B~8KB的内部碎片)。之后的操作表面上非常简单,向PageHeap申请一个指定page数量的span并返回其起始地址即可:

1
2
3
Span* span = Static::pageheap()->New(num_pages);
result = (PREDICT_FALSE(span == NULL) ? NULL : SpanToMallocResult(span));
return result;

问题在于,PageHeap是如何管理这些span的?即PageHeap::New()是如何实现的。

前文说到,PageHeap提供了一层缓存,因此PageHeap::New()并非每次都向系统申请内存,也可能直接从缓存中分配。

对128个page以内的span和超过128个page的span,PageHeap采取的缓存策略不一样。为了描述方便,以下将128个page以内的span称为小span,大于128个page的span称为大span。

先来看小span是如何管理的,大span的管理放在大对象分配一节介绍。

PageHeap中有128个小span的链表,分别对应1~128个page的span:

img

假设要分配一块内存,其大小经过向上取整之后对应k个page,因此需要从PageHeap取一个大小为k

个page的span,过程如下:

  • 从k个page的span链表开始,到128个page的span链表,按顺序找到第一个非空链表。
  • 取出这个非空链表中的一个span,假设有n个page,将这个span拆分成两个span:
  • 一个span大小为k个page,作为分配结果返回。
  • 另一个span大小为n - k个page,重新插入到n - k个page的span链表中。
  • 如果找不到非空链表,则将这次分配看做是大对象分配,分配过程详见下文。

大对象分配

超过1MB(128个page)的内存分配被认为是大对象分配,与中对象分配类似,也是先将所要分配的内存大小向上取整到整数个page,假设是k个page,然后向PageHeap申请一个k个page大小的span。

对于中对象的分配,如果上述的span链表无法满足,也会被当做是大对象来处理。也就是说,TCMalloc在源码层面其实并没有区分中对象和大对象,只是对于不同大小的span的缓存方式不一样罢了。

大对象分配用到的span都是超过128个page的span,其缓存方式不是链表,而是一个按span大小排序的有序set(std::set),以便按大小进行搜索。

假设要分配一块超过1MB的内存,其大小经过向上取整之后对应k个page(k>128),或者是要分配一块1MB以内的内存,但无法由中对象分配逻辑来满足,此时k <= 128。不管哪种情况,都是要从PageHeap的span set中取一个大小为k个page的span,其过程如下:

  • 搜索set,找到不小于k个page的最小的span(best-fit),假设该span有n个page。
  • 将这个span拆分为两个span:
  • 一个span大小为k个page,作为结果返回。
  • 另一个span大小为n - k个page,如果n - k > 128,则将其插入到大span的set中,否则,将其插入到对应的小span链表中。
  • 如果找不到合适的span,则使用sbrk或mmap向系统申请新的内存以生成新的span,并重新执行中对象或大对象的分配算法。

小结

以上讨论忽略了很多实现上的细节,比如PageHeap对span的管理还区分了normal状态的span和returned状态的span,接下来会详细分析这些细节。

在此之前,画张图概括下TCMalloc的管理内存的策略:

img

可以看到,不超过256KB的小对象分配,在应用程序和内存之间其实有三层缓存:PageHeap、CentralCache、ThreadCache。而中对象和大对象分配,则只有PageHeap一层缓存。

leveldb 的写入流程

在前面一节中我们讨论了 leveldb 中的 varint 以及 Key 格式,用户所传入的 User Key 将会被封装成 MemTable Key,再加上 User Value 封装成 MemTable Entry 写入到 MemTable 中:

1628668662510

在这一节中,就来分析一下 K-V 是如何被写入到硬盘中的。

1. WriteBatch

include/leveldb/db.h 中的 DB 类提供了 leveldb 对外的抽象 API,该类中所有的函数均为纯虚函数,也就是说,DB 类其实就是一个 Interface。不过,由于 C++ 语法的特殊性,DB 类也可以对纯虚函数进行实现,并且 leveldb 也给出了默认实现。

有两个类实现了 DB 接口,一个是位于 db/db_test.cc 文件中的 ModelDB,另一个则是位于 db/db_impl.cc 中的 DBImpl。很明显,ModelDB 主要用于测试,而 DBImpl 才是 leveldb 实现的核心类。在本节中,我们只需要关注 DBImpl::Put()DBImpl::Write() 这两个方法即可。

首先来看 Put() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct WriteOptions {
WriteOptions() = default;
bool sync = false; /* 写入 WAL 时是否同步调用 fsync() */
};

/* DBImpl 实现的 Put() 方法,本质上还是调用了 leveldb 的默认实现 */
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}

/* 真正的 Put() 方法实现 */
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}

WriteOptions 中只有 sync 一个字段,用于控制当 leveldb 将数据写入到预写日志时,是否同步地调用 fsync() 将内核缓冲区中的数据 flush 到硬盘,其值默认为 false。在生产实践中,这个值一般为 true,尽管为 false 时能够提高 leveldb 的写入速度,但是会有数据丢失的风险。

DB::Put() 方法中,生成了一个 WriteBatch 对象。leveldb 为了提高写入效率,会将多个线程提交的 K-V 写操作打包成一个 WriteBatch 对象,然后进行一次性写入,是一种常见的批量写入优化手段。WriteBatch 的底层数据结构实际上就是一个 std::string,用于承载多个 K-V。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class WriteBatch {
public:
WriteBatch();

void Put(const Slice& key, const Slice& value); /* 添加 K-V */
void Delete(const Slice& key); /* 删除 K-V */
void Append(const WriteBatch& source); /* Copy WriteBatch */

/* Iterate 方法将会遍历 rep_ 中的 K-V,并根据 K-V 中的 Value Type 来决定
* 调用 handler->Put() 或 handler->Delete(),Handler 是一个抽象基类*/
Status Iterate(Handler* handler) const;
private:
friend class WriteBatchInternal;
std::string rep_;
};

因为 DB::Put() 调用了 WriteBatch::Put() 方法,所以现在来看下 WriteBatch::Put() 方法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
/* 将 value.size() 进行 varint 编码,写入 dst 中,并且将 value 也追加到 dst 中 */
void PutLengthPrefixedSlice(std::string* dst, const Slice& value) {
PutVarint32(dst, value.size());
dst->append(value.data(), value.size());
}

/* 将 User Key、User Value 以及 Value Type 追加到 rep_ 中 */
void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}

可以看到,WriteBatch::Put 只是简单地将 User Key、User Value 以及 Value Type 进行打包,得到一个打包后的“数组”,其内存布局如下图所示:

1628755874963

需要注意的是在 rep_ 的起始位置存在 12 字节的预留位,用于填充这些 K-V 的起始 Sequence Number 以及 Count 计数。将多个 K-V 打包完成以后,将会调用 DBImpl::Write() 方法,正式进入 leveldb 的写入流程。

2. 写入逻辑

在将数据写入预写日志文件之前,需要循环确认 leveldb 的状态,主要包括 MemTable 是否达到最大容量、Level-0 中的文件数是否已经达到某个阈值等。如果 MemTable 已经达到了最大容量,并且此时 Immutable MemTable 仍未 flush 到硬盘时,leveldb 将会等待后台线程完成其相关工作。这些判断均在 MakeRoomForWrite() 方法中进行:

  1. 如果当前 Level-0 层的 SSTable 数量已经达到了阈值 kL0_SlowdownWritesTrigger(默认为 8),则会调用 std::this_thread::sleep_for() 方法延迟 1 毫秒写入,该延迟写入只会进行 1 次。
  2. 如果当前 MemTable 容量没有达到最大大小(write_buffer_size,默认为 4MB)则允许写入,返回 OK 状态的 Status
  3. 如果 MemTable 已达到最大容量,并且 Immutable MemTable 仍存在的话,就需要等待 Compaction 完成。
  4. 如果 Level-0 层的 SSTable 数量已经达到了阈值 kL0_StopWritesTrigger(默认为 12),同样需要等待 Compaction 完成。
  5. 最后,当 MemTable 以达到最大容量,并且 Immutable MemTable 不存在时,就需要将 Memtable 主动地变更为 Immutable MemTable,并初始化一个新的 MemTable 和日志文件,并主动地触发 Minor Compaction,可能会创建一个新的线程执行,同时允许写入。

MakeRoomForWrite() 方法调用返回时,不管之前发生了什么,现在的 MemTable 一定是有剩余容量并且 Level-0 的文件数量大概率小于 4,所以可以进行接下来的写入流程。

而后,我们需要获取最新的 Sequence Number,并将 last_sequence + 1 写入到 write_batch 的起始位置上,而后就可以写预写日志了。

在写完预写日志以后,会根据 options.sync 的值来决定是否调用 fsync() 进行刷盘,成功以后就会将 write_batch 中数据写入至 Skip List 所实现的 MemTable 中。

这个过程在 WriteBatch::Iterate() 方法中进行,遍历 rep_ 中打包好的 K-V,逐一地调用 MemTable::Add() 方法:

1
2
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value);

可以看到,MemTable Entry 所需要的“四元素”均会由外部传入,在 MemTable::Add() 方法内部将会调用相关的编码函数将其打包成 MemTable Entry 的格式插入到 Skip List 中。MemTable Entry 格式具体可见本篇文章开头部分。

最后,leveldb 将会更新全局的 Sequence Number,其值就是原来的 Sequence Number 再加上此次 write_batch 写入的 K-V 数量。

整个写入流程可见下图:

1628835101487

关于预写日志的格式与写入流程将在下篇描述。

leveldb 中的 varint 与 Key 组成

在 leveldb 中,int32 或者是 int64 采用的是变长存储,这一空间优化在 gRPC 中也有使用。其原理就是将原本需要使用 4 字节存储的 int32 或者是 8 字节存储的 int64 根据整数的实际大小使用不同的字节数进行存储。

比如说现在我们有一个 int32,其值为 1024,那么完全可以使用一个 int16 进行存储。但是因为我们需要考虑到存储值的上限,所以必须使用 int32 来实现。变长整数就是为了解决无论多大的数字都需要使用 4 字节 或者 8 字节 进行存储的问题的。

varint 是一种使用一个或多个字节序列化整数的方法,会把整数编码为变长字节。对于 32 位整型经过 varint 编码后需要 1~5 个字节,小的数字使用 1 字节,大的数字使用 5 字节。而 64 位整数根据 varint 编码后需要 1~10 个字节。在实际业务场景中,小整数的使用频率要远超于大整数的使用频率,因此使用 varint 编码能够有效的节省内存和硬盘的存储空间。

1. varint 编码

首先,leveldb 对整数提供了两种类型的编码,一种是 fixedint,另一种则是 varint。其中 fixedint 就是将 uint32_t 或者是 uint64_t 转换成 char *,而 varint 则是对整型进行变长编码,并写入到 char *dst 中。关于编码的实现全部都在 util/coding.cc 文件中。

1.1 fixedint 编码

1
2
3
4
5
6
7
inline void EncodeFixed32(char* dst, uint32_t value) {
uint8_t* const buffer = reinterpret_cast<uint8_t*>(dst);
buffer[0] = static_cast<uint8_t>(value);
buffer[1] = static_cast<uint8_t>(value >> 8);
buffer[2] = static_cast<uint8_t>(value >> 16);
buffer[3] = static_cast<uint8_t>(value >> 24);
}

EncodeFixed32() 的实现非常简单,就是将 value 的每一个字节写入到 dst 中。并且可以看到,value 的低字节被写入到了 buffer 的低地址中。因此,数据存放的方式是按照先低位后高位的顺序存放的,也就是说,leveldb 采用的是小端存储(Little-Endian)。

1.2 varint 编码

对于 varint 编码而言,每一个字节的最高位为保留位,1 表示后面仍有数据,0 则表示当前字节是 varint 的结尾。也就是说,varint 的每一个字节只能使用 7 位,所以当我们有一个 64 位长度的整型需要进行 varint 编码时,必须使用 10 个字节才能表示。这比原来的 8 字节还要多出 2 个字节,所以,varint 并不适合用于大整数占比非常多的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
char* EncodeVarint64(char* dst, uint64_t v) {
/* 每一个字节有效位为 7,最大只能表示 2^7 - 1 = 127 */
static const int B = 128;
uint8_t* ptr = reinterpret_cast<uint8_t*>(dst);

/* 当 v 大于 127 时,说明还需要一个字节来保存 */
while (v >= B) {
*(ptr++) = v | B;
v >>= 7;
}
*(ptr++) = static_cast<uint8_t>(v);
return reinterpret_cast<char*>(ptr);
}

对于 EncodeVarint64() 我们唯一需要注意的是,传入的 dst 指针地址与函数返回 result 指针的地址必然不是同一个地址,result 指针相对于 dst 指针将会有 1~10 字节的偏移量,具体取决于 varint 到底使用了多少个字节。

1
2
3
4
5
6
7
8
int main() {
char *dst = new char[10];
printf("before encode: %p \n", dst);

uint32_t v = 128;
dst = EncodeVarint64(dst, v);
printf("after encode: %p \n", dst);
}

运行上述程序得到的结果为:

1
2
before encode: 0x7fd6c1405880 
after encode: 0x7fd6c1405882

说明 128 在使用 varint 时必须使用 2 字节存储,同时也说明了原来 dst 指针发生了改变。

2. leveldb 中的 Key Format

由于 leveldb 是一个 K-V 存储引擎,并且使用 LSM 这一追加写的数据结构作为底层存储,那么对于 Key 的设计就变得至关重要了。

一方面 Key 需要保存用户所存入的 User Key 信息,另一方面还必须存在一个序号来表示同一个 User Key 的多个版本更新

InnoDB 存储引擎为了实现 MVCC 则是将一个全局递增的 Transaciton ID 写入到 B+Tree 聚簇索引的行记录中。而 leveldb 则是使用一个全局递增的序列号(Sequence Number)写入到 Key 中,以实现 Snapshot 功能,本质上就是 MVCC。

从另一个角度来说,如果某个 DB 支持 MVCC 或者说快照读功能的话,那么在其内部一定存在一个全局递增的序号,并且该序号是必须和用户数据一起被持久化至硬盘中的

最后,当我们使用 Delete() 删除一个 Key 时,实际上并不会找到这条数据并物理删除,而是追加写一条带有删除标志位的 Key。所以我们还需要一个标志位,来表示当前 Key 是否被删除,leveldb 中使用 ValueType 这个枚举类实现。

实际上,User Key、Sequence Number 以及 Value Type 正是组成一个 Key 的必要组件,并且在这些组件之上还会有一些额外的扩展,这些扩展也只是简单地使用 Varint 来记录 User Key 的长度而已。

2.1 InternalKeyParsedInternalKey

InternalKey 本质上就是一个字符串,由 User Key、Sequence Number 以及 Value Type 组成,是一个组合结构。而 ParsedInternalKey 其实就是对 InternalKey 的解析,将 User Key、Sequence Number 以及 Value Type 从 InternalKey 中提取出来,并保存起来。

首先来看 ParsedInternalKey 中字段的定义:

1
2
3
4
5
struct ParsedInternalKey {
Slice user_key;
SequenceNumber sequence;
ValueType type;
};

可以看到,ParsedInternalKey 中保存的正是我们在上面讨论的三个组件。将这三个组件按照一定的方式拼接成一个字符串,也就得到了 InternalKey,过程如下图所示:

8477019af8a69deeeda2b6d647e9aaf8.png

可以看到,leveldb 将 User Key、Sequence Number 以及 Value Type 拼接成 InternalKey 时并不是简单的 Append,而是将 Value Type 揉到了 Sequence Number 的低 8 位中,以节省存储空间。因此,尽管 Sequence Number 被声明为 uint64_t 类型,能够取得的最大值为 (1 << 56) - 1。虽然只有 7 字节的有效位,但是也是够用的。

ParsedInternalKeyInternalKey 之间的转换主要使用两个函数完成,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* 将 ParsedInternalKey 中的三个组件打包成 InternalKey,并存放到 result 中 */
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
/* 将 User Key 添加至 result */
result->append(key.user_key.data(), key.user_key.size());

/* PackSequenceAndType() 其实就是执行 (key.sequence << 8) | key.type */
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}


/* 将 InternalKey 拆解成三个组件并扔到 result 的相应字段中 */
inline bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result) {
const size_t n = internal_key.size();
if (n < 8) return false;
uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
uint8_t c = num & 0xff;
result->sequence = num >> 8;
result->type = static_cast<ValueType>(c);
result->user_key = Slice(internal_key.data(), n - 8);
return (c <= static_cast<uint8_t>(kTypeValue));
}

2.2 LookupKeyMemTableKey

当我们查询一个 User Key 时,其查询顺序为 MemTable、Immutable Table 以及位于硬盘中的 SSTable。MemTable 所提供的 Get() 方法需要使用到 LookupKeyLookupKey 可以认为是一个“究极体”,从该对象中我们可以得到所有我们需要的信息,包括 User Key、User Key 的长度、Sequence Number 以及 Value Type。

LookupKey 其实就是在 InternalKey 的基础上,额外的添加了 User Key 的长度,这个长度是由 Varint 进行编码的。因此,程序为了能够正确的找到 User Key 和 Sequence Number 等信息,额外的使用了 3 个指针,如下图所示:

fd62e5aef2a8cf89800887dacf8bf32f.png

Size 的大小为 User Key 的字节数再加上 8,然后通过 EncodeVarint32() 方法进行编码,写入到字符串的头部。同时,对于 LookupKey 来说,其 Value Type 为 kValueTypeForSeek,其实也就是 kTypeValue

在有了这 3 根指针以后,我们就可以非常方便地获取到各种各样的信息了:

1
2
3
4
5
6
7
8
9
10
11
class LookupKey {
public:
/* 可以看到,MemTable Key 和 LookupKey 其实是等价的*/
Slice memtable_key() const { return Slice(start_, end_ - start_); }

/* 返回 InternalKey */
Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }

/* User Key 的话需要刨去最后的 (Sequence Number << 8) | Value Type */
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
};

2.3 MemTable Entry

leveldb 使用 Skip List 来实现位于内存中的 MemTable,并提供了 Add() 方法将 Key-Value 写入至 Skip List 中。在 Skip List 的实现中,我们并没有发现 Value 字段,这是因为 leveldb 将 User Key 和 User Value 打包成一个更大的 Key,直接塞到了 Skip List 中,具体实现可见 MemTable::Add() 方法。

3578c2802b38dda5664a79e229defa82.png

leveldb 中常用的数据结构

1. Slice

Slice 是 leveldb 中使用最为频繁的数据结构,不管是 Write Batch,还是构建 SSTable,都需要 Slice 的重度参与。这里的 Slice 并不是 Go 语言中的切片,而是封装的 string 类型。

C++ 中的 std::string 只提供了简单的 push_backpop_back 等方法,诸如 starts_withremove_prefix 等方法都没有提供。因此,leveldb 使用 char * 自行封装了 string 对象,以满足功能需要。

Slice 中的成员变量只有两个,一个是 char 类型的字符指针,另一个则是字符串的长度。此外,Slice 本身并不负责内存分配,只是简单地接收外部传入的指针对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class LEVELDB_EXPORT Slice {
public:
/* 从 std::string 中构建 Slice 对象,s.data() 返回 string 首地址指针 */
Slice(const std::string& s) : data_(s.data()), size_(s.size()) {}

/* 主要用于从字面量中构建 Slice,例如 Slice("leveldb") */
Slice(const char* s) : data_(s), size_(strlen(s)) {}

/* 移除 Slice 的前 n 个字符,只是进行简单的指针运算 */
void remove_prefix(size_t n) {
data_ += n;
size_ -= n;
}

/* 将 Slice 转化为 string 对象 */
std::string ToString() const { return std::string(data_, size_); }

/* 判断 Slice 是否以 Slice.data_ 为前缀 */
bool starts_with(const Slice& x) const {
return ((size_ >= x.size_) && (memcmp(data_, x.data_, x.size_) == 0));
}
private:
const char* data_; /* 字符指针,保存字符串起始地址 */
size_t size_; /* 字符串长度,不包括 '\0' 结尾标志 */
};

Slice 的实现并不复杂,甚至没有提供内存分配机制,只是简单地进行了一个封装,但却是 leveldb 中最为重要的数据结构。

2. Status

在 Web 开发中,由于 HTTP 状态码数量有限,不能够完全地表达出调用 API 的结果,因此通常都会采用自定义 error_code 的方式实现,比如微信公众号的全局错误返回码: 全局返回码说明,一个统一的返回状态能够有效地降低后期运维成本。leveldb 并没有使用 Linux Kernel 的错误返回,而是使用 Status 类来制定统一的返回状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class LEVELDB_EXPORT Status {
private:
enum Code {
kOk = 0,
kNotFound = 1,
kCorruption = 2,
kNotSupported = 3,
kInvalidArgument = 4,
kIOError = 5
};

/* state_ 是分配在堆上的,使用 new char[] 的方式进行分配 */
const char* state_;
};

leveldb 一共定义了 6 种状态,内部使用枚举类实现,每一个返回状态都会有对应的 2 个方法,一个是构造返回状态,另一个则是判断状态。以 kNotFound 为例:

1
2
3
4
5
6
7
/* 通过 msg 构造 NotFound Status,其中 msg2 主要用于存储系统调用时的错误码,也就是 errno */
static Status NotFound(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kNotFound, msg, msg2);
}

/* 判断当前 status 对象的状态是否为 NotFound */
bool IsNotFound() const { return code() == kNotFound; }

可以看到,方法名和状态名称之间是强耦合的,也就是说我们无法在不改变 Status 定义的前提下对其进行拓展,算是一个小小的缺点。

简单地看一下 Status 的构造函数实现,通过状态码和错误信息构造 Status 属于私有方法,只能由 NotFound()Corruption()InvalidArgument() 等方法调用,这又限制了我们自行拓展 Status 的能力。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/* msg2 通常用于保存 errno */
Status::Status(Code code, const Slice& msg, const Slice& msg2) {

/* 获取 msg 以及 msg2 的长度 */
const uint32_t len1 = static_cast<uint32_t>(msg.size());
const uint32_t len2 = static_cast<uint32_t>(msg2.size());

/* 如果 len2 不为 0 的话,需要对 msg 和 msg2 做分割,leveldb
* 使用 ": " 这两个字符进行分隔,属于硬编码 */
const uint32_t size = len1 + (len2 ? (2 + len2) : 0);

/* 5 就是 4 字节的 message 长度 + 1 字节的状态码 */
char* result = new char[size + 5];

/* 将 message 长度写入 */
std::memcpy(result, &size, sizeof(size));
/* 将状态码写入 */
result[4] = static_cast<char>(code);
std::memcpy(result + 5, msg.data(), len1);

/* 使用 ": " 作为 msg 和 msg2 的分割线 */
if (len2) {
result[5 + len1] = ':';
result[6 + len1] = ' ';
std::memcpy(result + 7 + len1, msg2.data(), len2);
}
state_ = result;
}

Status 的结构如下图所示,在内部只需要使用 state_[4] 即可获得 Status 的具体状态:

aabb9126b1ef4c05ee68ee609c8c4559.png

3. Skip List

Memory Table 由 Skip List 实现,由于 leveldb 在对 Key 进行修改和删除操作时,采用的是追加方式,因此 Skip List 只需要实现插入和查找两个方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
template <typename Key, class Comparator>
class SkipList {
private:
struct Node;
public:
explicit SkipList(Comparator cmp, Arena* arena);

void Insert(const Key& key);

bool Contains(const Key& key) const;
private:
enum { kMaxHeight = 12 }; /* 最大层高 */
Arena* const arena_; /* 内存分配器 */
Node* const head_; /* 虚拟头节点 */
};

Skip List 的核心结构为 Node,其内部采用原子变量来进行指针的相关操作,因此,leveldb 中的 Skip List 是线程安全的,并且使用的是无锁实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template <typename Key, class Comparator>
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) {}
Key const key;

/* 原子性地获取当前 Node 的第 n 层的下一个节点指针 */
Node* Next(int n) {
/* std::memory_order_acquire 表示在 load() 之后的所有读写操作,
* 不允许被重排到这个 load() 的前面 */
return next_[n].load(std::memory_order_acquire);
}

/* 原子性地设置当前 Node 的第 n 层的下一个节点指针 */
void SetNext(int n, Node* x) {
/* std::memory_order_release 表示在 store() 之前的所有读写操作,
* 不允许被重排到这个 store() 的后面 */
next_[n].store(x, std::memory_order_release);
}
private:
std::atomic<Node*> next_[1];
};

Skip List 的大致结构如下,和 Redis 中跳跃表不同的是,leveldb 并没有使用 prev 指针,因为根本就用不到:

f29a9a56c6503a18bbf4e02e75b872d1.png

4. LRUCache

为了优化查询效率,leveldb 自行实现了一个标准的 LRU Cache,即哈希表+双向链表,并且 leveldb 选择了自行实现哈希表,并没有使用 std::unordered_map,同时使用了 port::Mutex 来保证 LRU Cache 的并发安全性。port::Mutex 其实就是对 std::mutex 的简单封装。

腾讯面试中被问到一个很有意思的面试题 虚函数的模板能被实例化吗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
template<typename T>
class weight
{
public:
void foo1(T a) { ++a; }
virtual void foo2(T b) { int temp = *b; }
};
int main()
{
// 编译失败
weight<int>* par=new weight<int>;
par->foo1(3);
return 0;

}

实验结果是不行的

显然模板虚函数是编译不过的,至于为什么,我们可以深究至C++多态的实现原理,就能知道为什么C++不允许定义模板虚函数了。

我们知道C++的多态是通过虚表实现的,对于含有虚函数的类,会为其定义一个虚表,每个实例化的对象都有一个指向该虚表的指针,所以同样的类,含有虚函数的类的实例大小比不含虚函数的多上一个指针的大小,虚表里为每个虚函数维护着一条跳转记录,这些跳转地址在编译期就被确定了,存放在类定义模块的数据段中,在程序运行期是不可修改的。那么这跟模板虚函数有什么关系呢?

让我们了解一点关于模板的特性,C++对于模版的处理,首先,模版并不算一种类型,在编译时,编译器只对已经实例化的模板类生成对应的模板类代码,假如这些类中定义的有模板类虚函数,则对每个实例化的模板类型创建一个虚表,这就是第一种情况—模板类虚函数,是可行的。

现在再看看模板虚函数,为什么不可行,就拿上面的代码讲:

A是一个类型,它含有模板虚函数,虽然是虚函数,但是函数的符号并不确定,因为我们不知道模板T是一个什么类型,对于从没调用过这个模板函数的情况下,这个模板虚函数甚至都不会实例化,那么就相当于没有虚函数了。那么为了实现模板虚函数,我们姑且认为它就是含有虚函数,所以A应该有一张虚表,但是A的虚函数符号并不确定,要根据当前调用的情况来确定,A的这个模板虚函数到底实例化了几个类型,那么对于每个类型的虚函数都添加一个虚表记录,这样看起来,实现模板虚函数貌似是可行的,但是这也只仅限于单个文件编译成可执行文件的情况下。

我们都知道C++编译中间是有几个步骤的,预处理、编译和链接,每个cpp或c文件都会被编译成目标文件,然后这些目标文件在通过链接生成可执行文件。那么考虑一下这种情况,假如现在我有两个cpp文件分别是x.cpp和y.cpp,上面的模板虚函数,我在x.cpp文件中实例化了

1
2
void foo(int& t);
void foo(float& t);

而在y.cpp中实例化了

1
2
void foo(int& t);
void foo(bool& t);
1
2
3
4
5
那么x.o和y.o中的A类的虚表都含有两天记录,但是函数符号却并不一样,那么为了实现模板虚函数,进行链接的时候就需要对虚表合并去重了,先抛去实现代价的问题,从理论上看起来的确是可行的。

然而事情并不是到此为止了,我们知道目标文件不只是可以链接成可执行文件,还可以链接成静态库和动态库,对于静态库,再进行链接的时候和普通链接差别不大,但是动态库就没有那么好运了。

考虑这样的一种情况,在动态库里面定义了上面的模板函数,而且实例化了
1
void foo(bool& t);

那么此时为了继续下去,就得修改动态链接库中B类的虚表了,为它添加一条记录,很显然是行不通的。至于为什么行不通,抛开程序段的可读写的问题不谈,如果真的可修改,那么这个类型的每个实例都可能会守到其它实例的影响了,与类的设计原则相悖了。

至此为什么模板虚函数为什么行不通已经很明显了。

总结 动态链接库的原因,违反设计

Io uring 简单使用

原理可参考:https://arthurchiao.art/blog/intro-to-io-uring-zh/

io_uring是Linux内核在v5.1引入的一套异步IO接口,和aio不同的是,它可以提供更高的性能

io_uring 具体有三个系统调用

分别是 io_uring_setup, io_uring_enter, io_uring_register,我们可以通过这三个系统调用来完成异步事件提交,收割,自己处理的流程

异步io的优点在于,不用我们自己去等待 io操作的完成,我们只需要告诉内核,我们的任务,内核来帮我们完成。这样就可以让我们的进程去干其他的事情,实现更高的吞吐量。

io_uring 利用 mmap 开辟出一块空间,让用户态和内核态的程序都可以共享的一块区域

io_uring 分为 提交队列 和 完成队列

用户提交的任务放在提交队列中,由内核去处理,处理好的东西会放在完成队列中。

内核如何处理,不是用户关系的问题,内核可以回调、轮询的方式都可以进行处理。

用户只需要设置好就可以使用

由于这三个系统调用要用好并不容易

开发作者也提供了一个liburing来给我们使用

注: 内核版本最好高一点,比如 Linux 5.4 不支持 read,但支持 readv (亲身教训)

除了 io_uring的结构要了解外,我们还需了解两个用到的东西

这两个分别代表了 完成队列和提交队列的一项元素

其中的user_data可以是一个可以由我们进行diy的指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
struct io_uring_cqe {
__u64 user_data; /* sqe->data submission passed back */
__s32 res; /* result code for this event */
__u32 flags;

/*
* If the ring is initialized with IORING_SETUP_CQE32, then this field
* contains 16-bytes of padding, doubling the size of the CQE.
*/
__u64 big_cqe[];
};
复制代码
struct io_uring_sqe {
__u8 opcode; /* type of operation for this sqe */
__u8 flags; /* IOSQE_ flags */
__u16 ioprio; /* ioprio for the request */
__s32 fd; /* file descriptor to do IO on */
union {
__u64 off; /* offset into file */
__u64 addr2;
struct {
__u32 cmd_op;
__u32 __pad1;
};
};
union {
__u64 addr; /* pointer to buffer or iovecs */
__u64 splice_off_in;
};
__u32 len; /* buffer size or number of iovecs */
union {
__kernel_rwf_t rw_flags;
__u32 fsync_flags;
__u16 poll_events; /* compatibility */
__u32 poll32_events; /* word-reversed for BE */
__u32 sync_range_flags;
__u32 msg_flags;
__u32 timeout_flags;
__u32 accept_flags;
__u32 cancel_flags;
__u32 open_flags;
__u32 statx_flags;
__u32 fadvise_advice;
__u32 splice_flags;
__u32 rename_flags;
__u32 unlink_flags;
__u32 hardlink_flags;
__u32 xattr_flags;
__u32 msg_ring_flags;
};
__u64 user_data; /* data to be passed back at completion time */
/* pack this to avoid bogus arm OABI complaints */
union {
/* index into fixed buffers, if used */
__u16 buf_index;
/* for grouped buffer selection */
__u16 buf_group;
} __attribute__((packed));
/* personality to use, if used */
__u16 personality;
union {
__s32 splice_fd_in;
__u32 file_index;
struct {
__u16 addr_len;
__u16 __pad3[1];
};
};
union {
struct {
__u64 addr3;
__u64 __pad2[1];
};
/*
* If the ring is initialized with IORING_SETUP_SQE128, then
* this field is used for 80 bytes of arbitrary command data
*/
__u8 cmd[0];
};
};
复制代码

我们来看两个使用 liburing的简单例子

例子 1

第一个例子诠释了 io_uring 最直接的一个流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 读取文件
**/
#include <bits/stdc++.h>
#include <liburing.h>
#include <unistd.h>

char buf[1024] = {0};

int main() {
int fd = open("1.txt", O_RDONLY, 0);

io_uring ring;
io_uring_queue_init(32, &ring, 0); // 初始化
auto sqe = io_uring_get_sqe(&ring); // 从环中得到一块空位
io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0); // 为这块空位准备好操作
io_uring_submit(&ring); // 提交任务
io_uring_cqe* res; // 完成队列指针
io_uring_wait_cqe(&ring, &res); // 阻塞等待一项完成的任务
assert(res);
std::cout << "read bytes: " << res->res << " \n";
std::cout << buf << std::endl;
io_uring_cqe_seen(&ring, res); // 将任务移出完成队列
io_uring_queue_exit(&ring); // 退出
return 0;
}

复制代码

io_uring 有三个东西

提交队列

完成队列

任务实体

提交队列和完成队列都可以看成持有一项指针

我们得到一个 任务实体,通过 io_uring_prep_read 准备任务 和 io_uring_submit 提交任务

提交任务之后就到了提交队列中去

在提交队列里面,内核操作完以后。

任务就到了完成队列中去。

然后我们可以阻塞等待 io_uring_wait_cqe 一项任务

当然,我们也可以使用非阻塞的方式,去干其他事情

在拿到这一项任务之后,我们就可以对其进行处理,处理完成记得 从完成队列中清除

(至于 完成队列和提交队列是如何高效的且不出错的并发执行 暂且不谈)

例子 2

echo_server

上述 的 io_uring写着还是比较长,我们可以把它封装一下。

比如要用 read的操作,要用accpet的操作,都给他封装一下

同时,我们在写echo_server时,我们是 几个不同的操作

可能是 ACCEPT 操作,可能是 READ 操作, 可能是 WRITE操作

并且READ和WRITE操作都要有自己的缓冲区

所以,我们定义一下我们在任务之间传递的结构体

然后把它放到 user_data 中。

1
2
__u64	user_data;	/* data to be passed back at completion time */
复制代码

结构体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
struct request {
enum STATE { ACCEPT, READ, WRITE };
int fd;
STATE state;
union {
struct {
sockaddr_in ipv4_addr;
socklen_t lens;
} addr;
char buf[BUFSIZE];
};
};
复制代码

我们对可能用到的操作进行一下封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class IOuring {
io_uring ring;

public:
IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }

~IOuring() { io_uring_queue_exit(&ring); }

void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }

int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }

int submit() { return io_uring_submit(&ring); }

void accpet_asyn(int sock_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::ACCEPT;
body->fd = sock_fd;
body->addr.lens = sizeof(sockaddr_in);
io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
&(body->addr.lens), 0);
io_uring_sqe_set_data(sqe, body);
}

void read_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::READ;
body->fd = client_fd;
io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}

void write_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::WRITE;
body->fd = client_fd;
io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
};

复制代码

整体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include <arpa/inet.h>
#include <bits/stdc++.h>
#include <liburing.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

const int BUFSIZE = 1024;
struct request {
enum STATE { ACCEPT, READ, WRITE };
int fd;
STATE state;
union {
struct {
sockaddr_in ipv4_addr;
socklen_t lens;
} addr;
char buf[BUFSIZE];
};
};
class IOuring {
io_uring ring;

public:
IOuring(int queue_size) { io_uring_queue_init(queue_size, &ring, 0); }

~IOuring() { io_uring_queue_exit(&ring); }

void seen(io_uring_cqe* cqe) { io_uring_cqe_seen(&ring, cqe); }

int wait(io_uring_cqe** cqe) { return io_uring_wait_cqe(&ring, cqe); }

int submit() { return io_uring_submit(&ring); }

void accpet_asyn(int sock_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::ACCEPT;
body->fd = sock_fd;
body->addr.lens = sizeof(sockaddr_in);
io_uring_prep_accept(sqe, sock_fd, (sockaddr*)&(body->addr.ipv4_addr),
&(body->addr.lens), 0);
io_uring_sqe_set_data(sqe, body);
}

void read_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::READ;
body->fd = client_fd;
io_uring_prep_read(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}

void write_asyn(int client_fd, request* body) {
auto sqe = io_uring_get_sqe(&ring);
body->state = request::WRITE;
body->fd = client_fd;
io_uring_prep_write(sqe, client_fd, body->buf, sizeof(body->buf), -1);
io_uring_sqe_set_data(sqe, body);
}
};

int main() {
/*init socket*/
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in sock_addr;
sock_addr.sin_port = htons(8000);
sock_addr.sin_family = AF_INET;
sock_addr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(sock_fd, (sockaddr*)&sock_addr, sizeof(sock_addr));
perror("");
listen(sock_fd, 10);

std::cout << "listen begin ..." << std::endl;

/*io_uring*/
IOuring ring(1024);

ring.accpet_asyn(sock_fd, new request);
ring.submit();

while (true) {
io_uring_cqe* cqe;
ring.wait(&cqe);
request* res = (request*)cqe->user_data;
switch (res->state) {
case request::ACCEPT:
if (cqe->res > 0) {
int client_fd = cqe->res;
ring.accpet_asyn(sock_fd, res);
ring.read_asyn(client_fd, new request);
ring.submit();
}
std::cout << cqe->res << std::endl;
break;
case request::READ:
if (cqe->res > 0) std::cout << res->buf << std::endl;
ring.write_asyn(res->fd, res);
ring.submit();
break;
case request::WRITE:
if (cqe->res > 0) {
close(res->fd);
delete res;
}
break;
default:
std::cout << "error " << std::endl;
break;
}
ring.seen(cqe);
}

return 0;
}

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
$ hexo generate

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment