Compaction in LevelDB runs as a background-thread task. This part of the series covers the trigger mechanism, the compaction policy, and the detailed workflow. The present post is the first half, focusing on the trigger conditions, minor compaction, and the preparatory work for both manually and automatically triggered compactions.
The actual entry point for compaction is DBImpl::MaybeScheduleCompaction, so the trigger points are simply the places where this function gets called. Looking up its references gives:
1> When the DB is opened (DB::Open) and recovery has finished, the main thread proactively attempts a compaction.
2> On writes (DBImpl::Put/DBImpl::Delete), DBImpl::MakeRoomForWrite is checked; if the current memtable is already full and there is no immutable memtable yet, a compaction is proactively triggered.
3> On reads (DBImpl::Get), a compaction may be triggered based on seek stats (a file that is frequently probed yet yields no result may need a compaction that pushes it down a level to restore balance). If this Get needed to, and actually did, perform I/O against more than one sstable file, the first such file is recorded and its allowed_seeks counter is checked; once allowed_seeks has been used up (decremented to 0), a compaction is required.
The implementation of this check deserves a closer look, since it is a little convoluted:
The Get runs against the current Version. A Version has a member std::vector<FileMetaData*> files_[config::kNumLevels], holding the FileMetaData of every file on every level of the current Version. Each Get uses a local GetStats variable to record the outcome of the read; GetStats carries a FileMetaData pointer to the sstable involved, which is then used to update that file's global seek state in current. The follow-up UpdateStats call decrements the corresponding allowed_seeks by 1 and checks the compaction trigger condition (a simplified sketch of this path follows the list below).
The initial value chosen for allowed_seeks is analyzed in detail later in this post.
4> A manual call to DBImpl::CompactRange sets the manual_compaction_ flag and triggers a compaction by hand.
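To make the read-triggered path concrete, here is a trimmed sketch of DBImpl::Get (reproduced from memory, with snapshot selection and Ref/Unref bookkeeping elided), showing where the seek-stat bookkeeping hooks into compaction scheduling:

Status DBImpl::Get(const ReadOptions& options,
                   const Slice& key, std::string* value) {
  // (Sketch from memory; see db_impl.cc for the authoritative version.)
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;
  // ... snapshot selection (options.snapshot or last sequence) omitted ...
  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  bool have_stat_update = false;
  Version::GetStats stats;
  {
    mutex_.Unlock();
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Hit in the memtable
    } else if (imm != NULL && imm->Get(lkey, value, &s)) {
      // Hit in the immutable memtable
    } else {
      // Had to consult the sstables; if more than one file was read,
      // stats.seek_file records the first one.
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }
  if (have_stat_update && current->UpdateStats(stats)) {
    // allowed_seeks of that file has dropped to zero
    MaybeScheduleCompaction();
  }
  // ... mem/imm/current Unref omitted ...
  return s;
}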
Now let's look at the implementation of DBImpl::MaybeScheduleCompaction:
void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (bg_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == NULL &&
             manual_compaction_ == NULL &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    bg_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}
The decision logic is simple: only one compaction task may be in flight at a time. env_->Schedule creates the background thread on first use (only once) and enqueues the compaction task, which corresponds to DBImpl::BackgroundCall:
void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(bg_compaction_scheduled_);
  if (shutting_down_.Acquire_Load()) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();
  }

  bg_compaction_scheduled_ = false;

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  bg_cv_.SignalAll();
}
The background thread executes BackgroundCall. After a compaction finishes, it may be necessary to re-check whether another compaction is needed. Since Schedule just enqueues the task and returns immediately, the seemingly unbounded recursive call back into MaybeScheduleCompaction cannot overflow the stack.
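The task handed to env_->Schedule is the static trampoline DBImpl::BGWork, which simply forwards to BackgroundCall on that background thread (quoted from memory from db_impl.cc):

void DBImpl::BGWork(void* db) {
  // Runs on the Env's background thread.
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}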
From the above, DBImpl::BackgroundCompaction is the function that actually performs the compaction; here is its implementation:
void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  if (imm_ != NULL) {
    CompactMemTable();
    return;
  }

  Compaction* c;
  bool is_manual = (manual_compaction_ != NULL);
  InternalKey manual_end;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == NULL);
    if (c != NULL) {
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level,
        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {
    c = versions_->PickCompaction();
  }

  Status status;
  if (c == NULL) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number),
        c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(),
        versions_->LevelSummary(&tmp));
  } else {
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();
  }
  delete c;

  if (status.ok()) {
    // Done
  } else if (shutting_down_.Acquire_Load()) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = NULL;
  }
}
The first line is mutex_.AssertHeld(). The default implementation of Mutex::AssertHeld is empty, yet it is called in many functions. What is it for? Someone on Stack Overflow explains it as follows:
As you have observed it does nothing in the default implementation. The function seems to be a placeholder for checking whether a particular thread holds a mutex and optionally abort if it doesn’t. This would be equivalent to the normal asserts we use for variables but applied on mutexes.
I think the reason it is not implemented yet is we don’t have an equivalent light weight function to assert whether a thread holds a lock in pthread_mutex_t used in the default implementation. Some platforms which has that capability could fill this implementation as part of porting process. Searching online I did find some implementation for this function in the windows port of leveldb. I can see one way to implement it using a wrapper class over pthread_mutex_t and setting some sort of a thread id variable to indicate which thread(s) currently holds the mutex, but it will have to be carefully implemented given the race conditions that can arise.
I agree with this explanation. Now, back to the main topic.
Compaction first checks imm_, so that a memtable that has filled up is promptly written out to an on-disk sstable. Here is CompactMemTable:
void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != NULL);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  Status s = WriteLevel0Table(imm_, &edit, base);
  base->Unref();

  if (s.ok() && shutting_down_.Acquire_Load()) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // Commit to the new state
    imm_->Unref();
    imm_ = NULL;
    has_imm_.Release_Store(NULL);
    DeleteObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}
Every compaction, minor or major, changes the set of sstable files, so the DB version has to be updated accordingly. The compaction therefore first grabs a handle on the current version and then calls WriteLevel0Table, which iterates over the skiplist backing imm_, writes it into an sstable, and registers the newly generated sstable in the TableCache (all done via BuildTable). If the resulting sstable is valid, its user_key range is used, against the current version, to pick a suitable level to place it in via PickLevelForMemTableOutput: if the range overlaps files in level 0 it goes straight into level 0; otherwise it may be pushed down to level 1 or level 2 according to a policy, but never beyond kMaxMemCompactLevel (default 2). Finally the CompactionStats counters of the chosen level are updated.
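For reference, here is a lightly trimmed WriteLevel0Table (reproduced from memory from db_impl.cc, so logging and error-handling details may differ slightly from the release you are reading):

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
                                Version* base) {
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  pending_outputs_.insert(meta.number);
  Iterator* iter = mem->NewIterator();

  Status s;
  {
    // Drop the lock while doing the actual file I/O.
    mutex_.Unlock();
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
    mutex_.Lock();
  }
  delete iter;
  pending_outputs_.erase(meta.number);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    if (base != NULL) {
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    edit->AddFile(level, meta.number, meta.file_size,
                  meta.smallest, meta.largest);
  }

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
  stats_[level].Add(stats);
  return s;
}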
Now for the trade-off that PickLevelForMemTableOutput makes; first, the official comment on kMaxMemCompactLevel:
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap.  We try to push to level 2 to avoid the
// relatively expensive level 0=>1 compactions and to avoid some
// expensive manifest file operations.  We do not push all the way to
// the largest level since that can generate a lot of wasted disk
// space if the same key space is being repeatedly overwritten.
static const int kMaxMemCompactLevel = 2;

int Version::PickLevelForMemTableOutput(
    const Slice& smallest_user_key,
    const Slice& largest_user_key) {
  int level = 0;
  if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
    // Push to next level if there is no overlap in next level,
    // and the #bytes overlapping in the level after that are limited.
    InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
    std::vector<FileMetaData*> overlaps;
    while (level < config::kMaxMemCompactLevel) {
      if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
        break;
      }
      if (level + 2 < config::kNumLevels) {
        // Check that file does not overlap too many grandparent bytes.
        GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
        const int64_t sum = TotalFileSize(overlaps);
        if (sum > kMaxGrandParentOverlapBytes) {
          break;
        }
      }
      level++;
    }
  }
  return level;
}
OverlapInLevel calls SomeFileOverlapsRange to decide whether the new sstable's key range overlaps the file list of a given level. Level-0 files may overlap one another, so they have to be compared one by one. For levels above 0 the files are disjoint, so a binary search locates the sstable whose range might contain the new file's smallest key; the new file overlaps that level iff its largest key is not before that sstable's smallest key (hence the negation in the code).
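To illustrate the binary-search branch, here is roughly what FindFile and the disjoint-files path of SomeFileOverlapsRange look like in version_set.cc (from memory; the level-0 linear scan is elided, and AfterFile/BeforeFile are small helpers that compare a user key against a file's largest/smallest key):

int FindFile(const InternalKeyComparator& icmp,
             const std::vector<FileMetaData*>& files,
             const Slice& key) {
  uint32_t left = 0;
  uint32_t right = files.size();
  while (left < right) {
    uint32_t mid = (left + right) / 2;
    const FileMetaData* f = files[mid];
    if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {
      // Key at "mid.largest" is < "target".  Therefore all
      // files at or before "mid" are uninteresting.
      left = mid + 1;
    } else {
      // Key at "mid.largest" is >= "target".  Therefore all files
      // after "mid" are uninteresting.
      right = mid;
    }
  }
  return right;
}

bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
                           bool disjoint_sorted_files,
                           const std::vector<FileMetaData*>& files,
                           const Slice* smallest_user_key,
                           const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();
  // ... non-disjoint (level-0) branch: linear scan over all files ...

  // Binary search over the disjoint, sorted file list (level > 0).
  uint32_t index = 0;
  if (smallest_user_key != NULL) {
    // Find the earliest possible internal key for smallest_user_key.
    InternalKey small(*smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
    index = FindFile(icmp, files, small.Encode());
  }
  if (index >= files.size()) {
    // Beginning of range is after all files, so no overlap.
    return false;
  }
  return !BeforeFile(ucmp, largest_user_key, files[index]);
}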
Now the conditions for pushing the new table down to level 1 or level 2: there must be no overlap with level + 1, and the total size of the level + 2 sstables overlapping it must not exceed kMaxGrandParentOverlapBytes (20MB by default); when both hold, the file is pushed one level deeper.
The constant kMaxGrandParentOverlapBytes is defined as 10 × kTargetFileSize (2MB); it bounds the amount of I/O a later compaction would incur.
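For reference, the relevant constants in version_set.cc look roughly like this (from memory; more recent releases derive them from Options instead of hard-coding them):

static const int kTargetFileSize = 2 * 1048576;

// Maximum bytes of overlaps in grandparent (i.e., level+2) before we
// stop building a single file in a level->level+1 compaction.
static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;

// Maximum number of bytes in all compacted files.  We avoid expanding
// the lower level file set of a compaction if it would make the
// total compaction cover more than this many bytes.
static const int64_t kExpandedCompactionByteSizeLimit = 25 * kTargetFileSize;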
GetOverlappingInputs collects the files on a given level that overlap a key range. Note that for level 0, because level-0 files overlap one another, the query range has to be widened and the search restarted whenever a newly added file extends it.
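The widen-and-restart logic is easiest to see in the code; the following is approximately Version::GetOverlappingInputs (from memory):

void Version::GetOverlappingInputs(
    int level,
    const InternalKey* begin,
    const InternalKey* end,
    std::vector<FileMetaData*>* inputs) {
  inputs->clear();
  Slice user_begin, user_end;
  if (begin != NULL) user_begin = begin->user_key();
  if (end != NULL) user_end = end->user_key();
  const Comparator* user_cmp = vset_->icmp_.user_comparator();
  for (size_t i = 0; i < files_[level].size(); ) {
    FileMetaData* f = files_[level][i++];
    const Slice file_start = f->smallest.user_key();
    const Slice file_limit = f->largest.user_key();
    if (begin != NULL && user_cmp->Compare(file_limit, user_begin) < 0) {
      // "f" is completely before the specified range; skip it
    } else if (end != NULL && user_cmp->Compare(file_start, user_end) > 0) {
      // "f" is completely after the specified range; skip it
    } else {
      inputs->push_back(f);
      if (level == 0) {
        // Level-0 files may overlap each other.  So check if the newly
        // added file has expanded the range.  If so, restart search.
        if (begin != NULL && user_cmp->Compare(file_start, user_begin) < 0) {
          user_begin = file_start;
          inputs->clear();
          i = 0;
        } else if (end != NULL && user_cmp->Compare(file_limit, user_end) > 0) {
          user_end = file_limit;
          inputs->clear();
          i = 0;
        }
      }
    }
  }
}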
Policy-wise, a newly compacted memtable file should be pushed to as deep a level as reasonable: level 0 must not accumulate too many files, since that makes both compaction I/O and lookups expensive. On the other hand it should not be pushed too deep either, which to some extent keeps the cost of lookups under control; moreover, if certain key ranges are updated frequently, later compactions toward deeper levels would also burn a lot of I/O.
So PickLevelForMemTableOutput is exactly that compromise.
With the memtable compaction covered, let's continue with the rest of BackgroundCompaction, starting with the manual path, where DBImpl::CompactRange fills in a ManualCompaction and triggers the compaction:
// Information for a manual compaction
struct ManualCompaction {
  int level;
  bool done;
  const InternalKey* begin;   // NULL means beginning of key range
  const InternalKey* end;     // NULL means end of key range
  InternalKey tmp_storage;    // Used to keep track of compaction progress
};

void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
  int max_level_with_files = 1;
  {
    MutexLock l(&mutex_);
    Version* base = versions_->current();
    for (int level = 1; level < config::kNumLevels; level++) {
      if (base->OverlapInLevel(level, begin, end)) {
        max_level_with_files = level;
      }
    }
  }
  TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
  for (int level = 0; level < max_level_with_files; level++) {
    TEST_CompactRange(level, begin, end);
  }
}
It first determines the deepest level in the current Version that overlaps the given range, then compacts the memtable, and finally runs a compaction for every level from 0 up to that deepest level, which brings us back to this block in BackgroundCompaction:
if (is_manual) {
  ManualCompaction* m = manual_compaction_;
  c = versions_->CompactRange(m->level, m->begin, m->end);
  m->done = (c == NULL);
  if (c != NULL) {
    manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
  }
  Log(options_.info_log,
      "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
      m->level,
      (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
      (m->end ? m->end->DebugString().c_str() : "(end)"),
      (m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
  c = versions_->PickCompaction();
}
With the ManualCompaction information for a given level, it calls VersionSet::CompactRange; here is the implementation:
Compaction* VersionSet::CompactRange(
    int level,
    const InternalKey* begin,
    const InternalKey* end) {
  std::vector<FileMetaData*> inputs;
  current_->GetOverlappingInputs(level, begin, end, &inputs);
  if (inputs.empty()) {
    return NULL;
  }

  // Avoid compacting too much in one shot in case the range is large.
  // But we cannot do this for level-0 since level-0 files can overlap
  // and we must not pick one file and drop another older file if the
  // two files overlap.
  if (level > 0) {
    const uint64_t limit = MaxFileSizeForLevel(level);
    uint64_t total = 0;
    for (size_t i = 0; i < inputs.size(); i++) {
      uint64_t s = inputs[i]->file_size;
      total += s;
      if (total >= limit) {
        inputs.resize(i + 1);
        break;
      }
    }
  }

  Compaction* c = new Compaction(level);
  c->input_version_ = current_;
  c->input_version_->Ref();
  c->inputs_[0] = inputs;
  SetupOtherInputs(c);
  return c;
}
It first collects the sstables on the target level that overlap the given range. For levels above 0 it avoids compacting too many files in one shot: files are accumulated until their total size exceeds MaxFileSizeForLevel, and the excess is dropped; currently that function simply returns the fixed per-level maximum file size kTargetFileSize (2MB). Level 0 skips this check because its files overlap each other and none of them may be dropped. Finally it constructs the Compaction object with the basic data for this level and calls SetupOtherInputs to compute the file list to merge on the next level, adjust the key range of the merge, and pull in as many additional files from the current level as possible.
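MaxFileSizeForLevel is worth quoting because it explains the "fixed 2MB" remark above (from memory; newer releases tie this to Options::max_file_size instead):

static uint64_t MaxFileSizeForLevel(int level) {
  return kTargetFileSize;  // We could vary per level to reduce number of files?
}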
Here is the implementation of SetupOtherInputs:
void VersionSet::SetupOtherInputs(Compaction* c) {
  const int level = c->level();
  InternalKey smallest, largest;
  GetRange(c->inputs_[0], &smallest, &largest);

  current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);

  // Get entire range covered by compaction
  InternalKey all_start, all_limit;
  GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

  // See if we can grow the number of inputs in "level" without
  // changing the number of "level+1" files we pick up.
  if (!c->inputs_[1].empty()) {
    std::vector<FileMetaData*> expanded0;
    current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
    const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
    const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
    const int64_t expanded0_size = TotalFileSize(expanded0);
    if (expanded0.size() > c->inputs_[0].size() &&
        inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {
      InternalKey new_start, new_limit;
      GetRange(expanded0, &new_start, &new_limit);
      std::vector<FileMetaData*> expanded1;
      current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
                                     &expanded1);
      if (expanded1.size() == c->inputs_[1].size()) {
        Log(options_->info_log,
            "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
            level,
            int(c->inputs_[0].size()),
            int(c->inputs_[1].size()),
            long(inputs0_size), long(inputs1_size),
            int(expanded0.size()),
            int(expanded1.size()),
            long(expanded0_size), long(inputs1_size));
        smallest = new_start;
        largest = new_limit;
        c->inputs_[0] = expanded0;
        c->inputs_[1] = expanded1;
        GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
      }
    }
  }

  // Compute the set of grandparent files that overlap this compaction
  // (parent == level+1; grandparent == level+2)
  if (level + 2 < config::kNumLevels) {
    current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
                                   &c->grandparents_);
  }

  if (false) {
    Log(options_->info_log, "Compacting %d '%s' .. '%s'",
        level,
        smallest.DebugString().c_str(),
        largest.DebugString().c_str());
  }

  // Update the place where we will do the next compaction for this level.
  // We update this immediately instead of waiting for the VersionEdit
  // to be applied so that if the compaction fails, we will try a different
  // key range next time.
  compact_pointer_[level] = largest.Encode().ToString();
  c->edit_.SetCompactPointer(level, largest);
}
It first calls VersionSet::GetRange to compute the smallest key range covering all of the level-level input files. Using that range it calls Version::GetOverlappingInputs to gather the overlapping level + 1 files into inputs_[1], then calls GetRange2 over all files in inputs_[0] and inputs_[1] to get the smallest key range covering both levels. With that combined range it recomputes the overlapping files on the current level into expanded0. If that yields more input files than inputs_[0], and the total size of expanded0 plus the total size of inputs_[1] is below kExpandedCompactionByteSizeLimit (25 × 2MB, again guarding against compacting too many files at once), then it takes the smallest key range covering expanded0, uses it to recompute the overlapping level + 1 files into expanded1, and, if expanded1 has exactly as many files as inputs_[1] (so the expansion does not drag in more upper-level files), commits the expansion: inputs_[0] becomes expanded0, inputs_[1] becomes expanded1, and the combined key range is recomputed. That final range is then used to collect the overlapping level + 2 files into grandparents_. Lastly, the largest key of the current level's inputs is recorded as the compact pointer, so that if this compaction fails a different key range is tried next time; automatic compactions also pick their input files based on this pointer.
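The two range helpers referenced above are simple; they look roughly like this (from memory):

// Stores the minimal range that covers all entries in inputs in
// *smallest, *largest.
// REQUIRES: inputs is not empty
void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
                          InternalKey* smallest, InternalKey* largest) {
  assert(!inputs.empty());
  smallest->Clear();
  largest->Clear();
  for (size_t i = 0; i < inputs.size(); i++) {
    FileMetaData* f = inputs[i];
    if (i == 0) {
      *smallest = f->smallest;
      *largest = f->largest;
    } else {
      if (icmp_.Compare(f->smallest, *smallest) < 0) *smallest = f->smallest;
      if (icmp_.Compare(f->largest, *largest) > 0) *largest = f->largest;
    }
  }
}

// Stores the minimal range that covers all entries in inputs1 and inputs2.
void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
                           const std::vector<FileMetaData*>& inputs2,
                           InternalKey* smallest, InternalKey* largest) {
  std::vector<FileMetaData*> all = inputs1;
  all.insert(all.end(), inputs2.begin(), inputs2.end());
  GetRange(all, smallest, largest);
}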
Unlike the manual path's triggers and preparation, automatic compaction prefers to act on a level holding too many sstables worth of data (rather than on an sstable whose seek count has run out). The automatic path is VersionSet::PickCompaction:
Compaction* VersionSet::PickCompaction() {
  Compaction* c;
  int level;

  // We prefer compactions triggered by too much data in a level over
  // the compactions triggered by seeks.
  const bool size_compaction = (current_->compaction_score_ >= 1);
  const bool seek_compaction = (current_->file_to_compact_ != NULL);
  if (size_compaction) {
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level+1 < config::kNumLevels);
    c = new Compaction(level);

    // Pick the first file that comes after compact_pointer_[level]
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
      FileMetaData* f = current_->files_[level][i];
      if (compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if (c->inputs_[0].empty()) {
      // Wrap-around to the beginning of the key space
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  } else if (seek_compaction) {
    level = current_->file_to_compact_level_;
    c = new Compaction(level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  } else {
    return NULL;
  }

  c->input_version_ = current_;
  c->input_version_->Ref();

  // Files in level 0 may overlap each other, so pick up all overlapping ones
  if (level == 0) {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

  SetupOtherInputs(c);
  return c;
}
Automatic compaction first checks the size-triggered condition (too much data in a level) and only then the seek-triggered one (a file seeked too often). The two conditions are driven by compaction_score_ and file_to_compact_ respectively. The former is precomputed by VersionSet::Finalize, invoked from VersionSet::LogAndApply during the wrap-up of every compaction (minor or major), which determines the best level and score for the next compaction. Let's look at that calculation first:
void VersionSet::Finalize(Version* v) {
  // Precomputed best level for next compaction
  int best_level = -1;
  double best_score = -1;

  for (int level = 0; level < config::kNumLevels-1; level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
      score = v->files_[level].size() /
          static_cast<double>(config::kL0_CompactionTrigger);
    } else {
      // Compute the ratio of current size to size limit.
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
    }

    if (score > best_score) {
      best_level = level;
      best_score = score;
    }
  }

  v->compaction_level_ = best_level;
  v->compaction_score_ = best_score;
}
For levels above 0, compaction_score_ is the ratio of the total size of the level's files to the size allowed for that level; the allowed size grows tenfold per level (except that level 0 and level 1 both allow 10MB). For level 0, because write_buffer_size may be tuned large or small, the goal is to avoid both doing too many compactions of large files and accumulating too many small files, so the score is the ratio of the file count to the level's compaction trigger threshold kL0_CompactionTrigger.
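The tenfold growth comes from MaxBytesForLevel, which looks roughly like this (from memory):

static double MaxBytesForLevel(int level) {
  // Note: the result for level zero is not really used since we set
  // the level-0 compaction threshold based on number of files.
  double result = 10 * 1048576.0;  // Result for both level-0 and level-1
  while (level > 1) {
    result *= 10;
    level--;
  }
  return result;
}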
Back to PickCompaction. Under the size_compaction condition, it picks the first file whose largest key comes after compact_pointer_[level], the upper bound recorded by the previous compaction of this level; if none qualifies it wraps around and takes the level's first file. Otherwise, under seek_compaction, it simply takes the precomputed file_to_compact_. Because level-0 files have overlapping key ranges, for level 0 the chosen file's range is used to re-collect all overlapping level-0 files as the compaction input. Finally it calls SetupOtherInputs, analyzed above, exactly as in the manual case.
Let's see where file_to_compact_ comes from, and along the way how the initial value of allowed_seeks is chosen:
// Next file to compact based on seek stats.
FileMetaData* file_to_compact_;
int file_to_compact_level_;

struct FileMetaData {
  int refs;
  int allowed_seeks;          // Seeks allowed until compaction
  uint64_t number;
  uint64_t file_size;         // File size in bytes
  InternalKey smallest;       // Smallest internal key served by table
  InternalKey largest;        // Largest internal key served by table

  FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { }
};

bool Version::UpdateStats(const GetStats& stats) {
  FileMetaData* f = stats.seek_file;
  if (f != NULL) {
    f->allowed_seeks--;
    if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
      file_to_compact_ = f;
      file_to_compact_level_ = stats.seek_file_level;
      return true;
    }
  }
  return false;
}

// Apply all of the edits in *edit to the current state.
void Apply(VersionEdit* edit) {
  // Update compaction pointers
  for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
    const int level = edit->compact_pointers_[i].first;
    vset_->compact_pointer_[level] =
        edit->compact_pointers_[i].second.Encode().ToString();
  }

  // Delete files
  const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
  for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
       iter != del.end();
       ++iter) {
    const int level = iter->first;
    const uint64_t number = iter->second;
    levels_[level].deleted_files.insert(number);
  }

  // Add new files
  for (size_t i = 0; i < edit->new_files_.size(); i++) {
    const int level = edit->new_files_[i].first;
    FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
    f->refs = 1;

    // We arrange to automatically compact this file after
    // a certain number of seeks.  Let's assume:
    //   (1) One seek costs 10ms
    //   (2) Writing or reading 1MB costs 10ms (100MB/s)
    //   (3) A compaction of 1MB does 25MB of IO:
    //         1MB read from this level
    //         10-12MB read from next level (boundaries may be misaligned)
    //         10-12MB written to next level
    // This implies that 25 seeks cost the same as the compaction
    // of 1MB of data.  I.e., one seek costs approximately the
    // same as the compaction of 40KB of data.  We are a little
    // conservative and allow approximately one seek for every 16KB
    // of data before triggering a compaction.
    f->allowed_seeks = (f->file_size / 16384);
    if (f->allowed_seeks < 100) f->allowed_seeks = 100;

    levels_[level].deleted_files.erase(f->number);
    levels_[level].added_files->insert(f);
  }
}
The relevant snippets are gathered together above, and the comments are fairly clear. The update of allowed_seeks (in DBImpl::Get) was mentioned at the beginning of this post; file_to_compact_ and file_to_compact_level_ record the file, and its level, whose allowed_seeks has dropped to zero. As for the initial value: after each compaction, VersionSet::Builder::Apply computes allowed_seeks for every newly generated file based on the following assumptions:
1. One seek costs 10ms (this is presumably based on a spinning disk; an SSD's access latency is closer to 0.1ms).
2. Sequentially reading or writing 1MB costs 10ms (a transfer rate of 100MB/s).
3. Compacting 1MB of data incurs about 25MB of I/O (1MB read from the current level, 10-12MB read from the next level, 10-12MB written to the next level).
From these three points, compacting 1MB costs about as much as 25 seeks, i.e. one seek corresponds to roughly 40KB of compaction work. The actual value is more conservative and uses 16KB, hence the divisor 16384 = 16 * 1024.