A previous post analyzed leveldb's write path: a writing thread takes the lock and appends its write (a WriteBatch) to the write queue, and unless it happens to be the writer at the head of the queue it then blocks until the head writer has finished writing on its behalf, so the lock granularity is rather coarse. Compared with leveldb, rocksdb puts a lot of detailed work into optimizing concurrent writes from multiple threads; let's go through it piece by piece.
The WriteThread object held by DBImpl manages all of rocksdb's multi-threaded writes. First, the basic data structures:
class WriteThread {
 public:
  enum State : uint8_t {
    // The initial state of a writer.  This is a Writer that is
    // waiting in JoinBatchGroup.  This state can be left when another
    // thread informs the waiter that it has become a group leader
    // (-> STATE_GROUP_LEADER), when a leader that has chosen to be
    // non-parallel informs a follower that its writes have been committed
    // (-> STATE_COMPLETED), or when a leader that has chosen to perform
    // updates in parallel and needs this Writer to apply its batch (->
    // STATE_PARALLEL_FOLLOWER).
    STATE_INIT = 1,

    // The state used to inform a waiting Writer that it has become the
    // leader, and it should now build a write batch group.  Tricky:
    // this state is not used if newest_writer_ is empty when a writer
    // enqueues itself, because there is no need to wait (or even to
    // create the mutex and condvar used to wait) in that case.  This is
    // a terminal state unless the leader chooses to make this a parallel
    // batch, in which case the last parallel worker to finish will move
    // the leader to STATE_COMPLETED.
    STATE_GROUP_LEADER = 2,

    // A Writer that has returned as a follower in a parallel group.
    // It should apply its batch to the memtable and then call
    // CompleteParallelWorker.  When someone calls ExitAsBatchGroupLeader
    // or EarlyExitParallelGroup this state will get transitioned to
    // STATE_COMPLETED.
    STATE_PARALLEL_FOLLOWER = 4,

    // A follower whose writes have been applied, or a parallel leader
    // whose followers have all finished their work.  This is a terminal
    // state.
    STATE_COMPLETED = 8,

    // A state indicating that the thread may be waiting using StateMutex()
    // and StateCondVar()
    STATE_LOCKED_WAITING = 16,
  };

  struct Writer;

  struct ParallelGroup {
    Writer* leader;
    Writer* last_writer;
    SequenceNumber last_sequence;
    // before running goes to zero, status needs leader->StateMutex()
    Status status;
    std::atomic<uint32_t> running;
  };

  // Information kept for every waiting writer.
  struct Writer {
    WriteBatch* batch;
    bool sync;
    bool no_slowdown;
    bool disable_wal;
    bool disable_memtable;
    uint64_t log_used;   // log number that this batch was inserted into
    uint64_t log_ref;    // log number that memtable insert should reference
    bool in_batch_group;
    WriteCallback* callback;
    bool made_waitable;          // records lazy construction of mutex and cv
    std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
    ParallelGroup* parallel_group;
    SequenceNumber sequence;  // the sequence number to use for the first key
    Status status;            // status of memtable inserter
    Status callback_status;   // status returned by callback->Callback()
    std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
    std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
    Writer* link_older;  // read/write only before linking, or as leader
    Writer* link_newer;  // lazy, read/write only before linking, or as leader
  };
};
The State enum identifies the write state of a Writer; it forms a state machine that enables fine-grained, and in places lock-free, control of concurrent writes (a minimal sketch of the waiting/notification mechanics follows this list):
STATE_INIT: the initial state of a Writer (one write operation). It waits in JoinBatchGroup and leaves this state when another thread tells it that it has itself become the group leader (-> STATE_GROUP_LEADER), when a non-parallel leader commits its write on its behalf (-> STATE_COMPLETED), or when a parallel leader asks this Writer to apply its own batch as a follower (-> STATE_PARALLEL_FOLLOWER).
STATE_GROUP_LEADER: tells a waiting Writer that it has become the leader and should now build a write batch group. Note that this state is skipped entirely when newest_writer_ is empty at enqueue time (there is nothing to wait for). It is a terminal state unless the leader chooses the parallel write mode, in which case the last parallel worker to finish moves the leader to STATE_COMPLETED.
STATE_PARALLEL_FOLLOWER: a follower inside a parallel write group. It applies its own batch to the memtable and then calls CompleteParallelWorker. When another Writer calls ExitAsBatchGroupLeader or EarlyExitParallelGroup, this state transitions to STATE_COMPLETED.
STATE_COMPLETED: the normal terminal state, reached by a follower whose write has been applied, or by a parallel-mode leader whose followers have all finished.
STATE_LOCKED_WAITING: the thread may be blocked on the Writer's StateMutex()/StateCondVar(), i.e. a lock-based conditional wait.
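To make the waiting side of these transitions concrete, here is a minimal, self-contained sketch — an assumption, not RocksDB's actual implementation (the real AwaitState first spins and yields adaptively before falling back to the lazily constructed mutex) — of a writer blocking until any one of several state bits is set by another thread:

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>

enum State : uint8_t {
  STATE_INIT = 1,
  STATE_GROUP_LEADER = 2,
  STATE_PARALLEL_FOLLOWER = 4,
  STATE_COMPLETED = 8,
};

struct MiniWriter {
  std::atomic<uint8_t> state{STATE_INIT};
  std::mutex mu;
  std::condition_variable cv;

  // Block until the current state matches any bit in goal_mask.
  uint8_t AwaitState(uint8_t goal_mask) {
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [&] {
      return (state.load(std::memory_order_acquire) & goal_mask) != 0;
    });
    return state.load(std::memory_order_acquire);
  }

  // Called by another thread (e.g. the group leader) to advance this writer.
  void SetState(uint8_t new_state) {
    {
      std::lock_guard<std::mutex> guard(mu);
      state.store(new_state, std::memory_order_release);
    }
    cv.notify_one();
  }
};

A follower enqueued in JoinBatchGroup would effectively call AwaitState(STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED) and act according to whichever state it wakes up in.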
Now look at the Writer struct itself:
A Writer is created and used as a local object on the stack. It contains two pieces of stack memory pre-reserved via std::aligned_storage; only when synchronization is actually needed are a mutex and a condition variable constructed in them with placement new. Compared with allocating them with a plain new, this avoids a heap allocation entirely and is more cache-friendly.
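A minimal sketch of that lazy-construction trick, using simplified illustrative names (LazySync, CreateMutexAndCondVar) rather than RocksDB's exact Writer code:

#include <condition_variable>
#include <mutex>
#include <new>
#include <type_traits>

struct LazySync {
  bool made_waitable = false;
  std::aligned_storage<sizeof(std::mutex), alignof(std::mutex)>::type
      mutex_bytes;
  std::aligned_storage<sizeof(std::condition_variable),
                       alignof(std::condition_variable)>::type cv_bytes;

  std::mutex& Mutex() { return *reinterpret_cast<std::mutex*>(&mutex_bytes); }
  std::condition_variable& CondVar() {
    return *reinterpret_cast<std::condition_variable*>(&cv_bytes);
  }

  // Only called the first time this writer actually has to block.
  void CreateMutexAndCondVar() {
    if (!made_waitable) {
      made_waitable = true;
      new (&mutex_bytes) std::mutex;
      new (&cv_bytes) std::condition_variable;
    }
  }

  // Destroy only what was actually constructed.
  ~LazySync() {
    if (made_waitable) {
      Mutex().~mutex();
      CondVar().~condition_variable();
    }
  }
};

Because most writers never block (they either become leader immediately or are completed by their leader), the mutex and condition variable are usually never constructed at all.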
As soon as a Writer is created, JoinBatchGroup is called:
void WriteThread::JoinBatchGroup(Writer* w) {
  static AdaptationContext ctx("JoinBatchGroup");
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);
  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes
     * 2) An existing leader picks us as its follower and
     *    2.1) finishes the memtable writes on our behalf
     *    2.2) Or tells us to finish the memtable writes in parallel
     * 3) (pipelined write) An existing leader picks us as its follower and
     *    finishes book-keeping and WAL write for us, enqueues us as pending
     *    memtable writer, and
     *    3.1) we become memtable writer group leader, or
     *    3.2) an existing memtable writer group leader tells us to finish
     *         memtable writes in parallel.
     */
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                      STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}
newest_writer_ is a std::atomic<Writer*>; LinkOne reads and writes it in a lock-free manner.
LinkOne wraps std::atomic's compare_exchange_weak in a while loop (with the default, strongest memory order, memory_order_seq_cst). This retry-on-CAS-failure loop is the standard C++11 lock-free pattern: no lock is ever taken, and the return value tells the caller whether this writer has become the leader, i.e. whether the list was previously empty.
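A standalone sketch of the same pattern, with illustrative names (Node, LinkOne) standing in for the real Writer type:

#include <atomic>

struct Node {
  Node* link_older = nullptr;  // points toward older writers (the previous head)
};

// Push n onto the lock-free list; returns true if the list was empty,
// i.e. this writer becomes the group leader.
bool LinkOne(Node* n, std::atomic<Node*>* newest) {
  Node* head = newest->load(std::memory_order_relaxed);
  while (true) {
    n->link_older = head;
    // compare_exchange_weak may fail spuriously or because another thread
    // pushed first; on failure `head` is reloaded with the current value
    // and we simply retry.
    if (newest->compare_exchange_weak(head, n)) {
      return head == nullptr;
    }
  }
}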
Now let's break the write implementation into cases, starting with the one where the current thread's Writer ends up in STATE_PARALLEL_FOLLOWER:
Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         WriteBatch* my_batch, WriteCallback* callback,
                         uint64_t* log_used, uint64_t log_ref,
                         bool disable_memtable) {
  if (my_batch == nullptr) {
    return Status::Corruption("Batch is nullptr!");
  }

  Status status;

  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w;
  w.batch = my_batch;
  w.sync = write_options.sync;
  w.disableWAL = write_options.disableWAL;
  w.disable_memtable = disable_memtable;
  w.in_batch_group = false;
  w.callback = callback;
  w.log_ref = log_ref;

  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
  }

  StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

  write_thread_.JoinBatchGroup(&w);
  if (w.state == WriteThread::STATE_PARALLEL_FOLLOWER) {
    // we are a non-leader in a parallel group
    PERF_TIMER_GUARD(write_memtable_time);

    if (log_used != nullptr) {
      *log_used = w.log_used;
    }

    if (w.ShouldWriteToMemtable()) {
      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      WriteBatchInternal::SetSequence(w.batch, w.sequence);
      w.status = WriteBatchInternal::InsertInto(
          &w, &column_family_memtables, &flush_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*concurrent_memtable_writes*/);
    }

    if (write_thread_.CompleteParallelWorker(&w)) {
      // we're responsible for early exit
      auto last_sequence = w.parallel_group->last_sequence;
      versions_->SetLastSequence(last_sequence);
      write_thread_.EarlyExitParallelGroup(&w);
    }
    assert(w.state == WriteThread::STATE_COMPLETED);
    // STATE_COMPLETED conditional below handles exit

    status = w.FinalStatus();
  }
  if (w.state == WriteThread::STATE_COMPLETED) {
    if (log_used != nullptr) {
      *log_used = w.log_used;
    }
    // write is complete and leader has updated sequence
    RecordTick(stats_, WRITE_DONE_BY_OTHER);
    return w.FinalStatus();
  }
}

bool WriteThread::CompleteParallelWorker(Writer* w) {
  static AdaptationContext ctx("CompleteParallelWorker");

  auto* pg = w->parallel_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(pg->leader->StateMutex());
    pg->status = w->status;
  }

  auto leader = pg->leader;
  auto early_exit_allowed = pg->early_exit_allowed;

  if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) {
    // we're not the last one
    AwaitState(w, STATE_COMPLETED, &ctx);

    // Caller only needs to perform exit duties if early exit doesn't
    // apply and this is the leader.  Can't touch pg here.  Whoever set
    // our state to STATE_COMPLETED copied pg->status to w.status for us.
    return w == leader && !(early_exit_allowed && w->status.ok());
  }
  // else we're the last parallel worker

  if (w == leader || (early_exit_allowed && pg->status.ok())) {
    // this thread should perform exit duties
    w->status = pg->status;
    return true;
  } else {
    // We're the last parallel follower but early commit is not
    // applicable.  Wake up the leader and then wait for it to exit.
    assert(w->state == STATE_PARALLEL_FOLLOWER);
    SetState(leader, STATE_COMPLETED);
    AwaitState(w, STATE_COMPLETED, &ctx);
    return false;
  }
}
If, after JoinBatchGroup, the current thread's Writer is in STATE_PARALLEL_FOLLOWER, it uses MemTableInserter to insert its own WriteBatch into the MemTable resolved via ColumnFamilyMemTablesImpl concurrently, without taking a lock. That MemTable is produced, by default, by the memtable_factory option (in ColumnFamilyOptions, inherited from AdvancedColumnFamilyOptions), which is SkipListFactory; SkipListFactory::CreateMemTableRep creates a SkipListRep, which internally uses an InlineSkipList. This detail is essential for understanding which concurrent write modes the current MemTable supports.
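A hedged configuration sketch showing the knobs this paragraph refers to; the option names are real RocksDB options, while the database path and the explicit values are illustrative:

#include <rocksdb/db.h>
#include <rocksdb/memtablerep.h>
#include <rocksdb/options.h>

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // The default memtable factory is already SkipListFactory; shown explicitly.
  options.memtable_factory.reset(new rocksdb::SkipListFactory);
  // Allow followers in a write group to insert into the memtable in parallel.
  options.allow_concurrent_memtable_write = true;
  // Usually paired with this to reduce context switches while followers wait.
  options.enable_write_thread_adaptive_yield = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/rocksdb_demo", &db);
  if (s.ok()) {
    delete db;
  }
  return 0;
}

CheckConcurrentWritesSupported rejects allow_concurrent_memtable_write at open time if the chosen memtable factory does not support concurrent inserts, which is exactly why the skiplist-backed default matters here.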
Next, the case where the current thread's Writer becomes STATE_GROUP_LEADER:
// else we are the leader of the write batch group
assert(w.state == WriteThread::STATE_GROUP_LEADER);
WriteContext context;
mutex_.Lock();

if (!write_options.disableWAL) {
  default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
}

RecordTick(stats_, WRITE_DONE_BY_SELF);
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);

// Once reaches this point, the current writer "w" will try to do its write
// job.  It may also pick up some of the remaining writers in the "writers_"
// when it finds suitable, and finish them in the same write batch.
// This is how a write job could be done by the other writer.
assert(!single_column_family_mode_ ||
       versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);

if (UNLIKELY(!single_column_family_mode_ &&
             total_log_size_ > GetMaxTotalWalSize())) {
  MaybeFlushColumnFamilies();
}
if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
  // Before a new memtable is added in SwitchMemtable(),
  // write_buffer_manager_->ShouldFlush() will keep returning true. If another
  // thread is writing to another DB with the same write buffer, they may also
  // be flushed. We may end up with flushing much more DBs than needed. It's
  // suboptimal but still correct.
  Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
      "Flushing column family with largest mem table size. Write buffer is "
      "using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
      write_buffer_manager_->memory_usage(),
      write_buffer_manager_->buffer_size());
  // no need to refcount because drop is happening in write thread, so can't
  // happen while we're in the write thread
  ColumnFamilyData* largest_cfd = nullptr;
  size_t largest_cfd_size = 0;

  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    if (!cfd->mem()->IsEmpty()) {
      // We only consider active mem table, hoping immutable memtable is
      // already in the process of flushing.
      size_t cfd_size = cfd->mem()->ApproximateMemoryUsage();
      if (largest_cfd == nullptr || cfd_size > largest_cfd_size) {
        largest_cfd = cfd;
        largest_cfd_size = cfd_size;
      }
    }
  }
  if (largest_cfd != nullptr) {
    status = SwitchMemtable(largest_cfd, &context);
    if (status.ok()) {
      largest_cfd->imm()->FlushRequested();
      SchedulePendingFlush(largest_cfd);
      MaybeScheduleFlushOrCompaction();
    }
  }
}

if (UNLIKELY(status.ok() && !bg_error_.ok())) {
  status = bg_error_;
}

if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
  status = ScheduleFlushes(&context);
}

if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
                             write_controller_.NeedsDelay()))) {
  PERF_TIMER_STOP(write_pre_and_post_process_time);
  PERF_TIMER_GUARD(write_delay_time);
  // We don't know size of current batch so that we always use the size
  // for previous one. It might create a fairness issue that expiration
  // might happen for smaller writes but larger writes can go through.
  // Can optimize it if it is an issue.
  status = DelayWrite(last_batch_group_size_, write_options);
  PERF_TIMER_START(write_pre_and_post_process_time);
}

Status DBImpl::DelayWrite(uint64_t num_bytes,
                          const WriteOptions& write_options) {
  uint64_t time_delayed = 0;
  bool delayed = false;
  {
    StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
    auto delay = write_controller_.GetDelay(env_, num_bytes);
    if (delay > 0) {
      if (write_options.no_slowdown) {
        return Status::Incomplete();
      }
      mutex_.Unlock();
      delayed = true;
      TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
      // hopefully we don't have to sleep more than 2 billion microseconds
      env_->SleepForMicroseconds(static_cast<int>(delay));
      mutex_.Lock();
    }

    while (bg_error_.ok() && write_controller_.IsStopped()) {
      if (write_options.no_slowdown) {
        return Status::Incomplete();
      }
      delayed = true;
      TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
      bg_cv_.Wait();
    }
  }
  assert(!delayed || !write_options.no_slowdown);
  if (delayed) {
    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_STALL_MICROS,
                                           time_delayed);
    RecordTick(stats_, STALL_MICROS, time_delayed);
  }

  return bg_error_;
}

// This is inside DB mutex, so we can't sleep and need to minimize
// frequency to get time.
// If it turns out to be a performance issue, we can redesign the thread
// synchronization model here.
// The function trusts that the caller will sleep for the micros returned.
uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
  if (total_stopped_ > 0) {
    return 0;
  }
  if (total_delayed_ == 0) {
    return 0;
  }

  const uint64_t kMicrosPerSecond = 1000000;
  const uint64_t kRefillInterval = 1024U;

  if (bytes_left_ >= num_bytes) {
    bytes_left_ -= num_bytes;
    return 0;
  }
  // The frequency to get time inside DB mutex is less than one per refill
  // interval.
  auto time_now = NowMicrosMonotonic(env);

  uint64_t sleep_debt = 0;
  uint64_t time_since_last_refill = 0;
  if (last_refill_time_ != 0) {
    if (last_refill_time_ > time_now) {
      sleep_debt = last_refill_time_ - time_now;
    } else {
      time_since_last_refill = time_now - last_refill_time_;
      bytes_left_ +=
          static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /
                                kMicrosPerSecond * delayed_write_rate_);
      if (time_since_last_refill >= kRefillInterval &&
          bytes_left_ > num_bytes) {
        // If refill interval already passed and we have enough bytes
        // return without extra sleeping.
        last_refill_time_ = time_now;
        bytes_left_ -= num_bytes;
        return 0;
      }
    }
  }

  uint64_t single_refill_amount =
      delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;
  if (bytes_left_ + single_refill_amount >= num_bytes) {
    // Wait until a refill interval
    // Never trigger expire for less than one refill interval to avoid to get
    // time.
    bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;
    last_refill_time_ = time_now + kRefillInterval;
    return kRefillInterval + sleep_debt;
  }

  // Need to refill more than one interval. Need to sleep longer. Check
  // whether expiration will hit
  // Sleep just until `num_bytes` is allowed.
  uint64_t sleep_amount =
      static_cast<uint64_t>(num_bytes /
                            static_cast<double>(delayed_write_rate_) *
                            kMicrosPerSecond) +
      sleep_debt;
  last_refill_time_ = time_now + sleep_amount;
  return sleep_amount;
}
As STATE_GROUP_LEADER the thread first takes the DB mutex. write_buffer_manager_ (a global controller configured at the DBOptions level; the per-memtable budget is write_buffer_size in ColumnFamilyOptions, 64MB by default) checks whether the total memory of all live memtables — across every DB sharing the manager — has reached its limit. If a flush is needed, the column family in the current DB with the largest active memtable is chosen for SwitchMemtable, which decides whether to create a new log file or reuse an existing one, creates a new memtable and switches to it, then requests the flush and lets compaction be scheduled.
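A hedged sketch of the related memory knobs, assuming you configure them explicitly; the option and class names (write_buffer_size, db_write_buffer_size, WriteBufferManager) are real RocksDB names, while the values are illustrative:

#include <rocksdb/options.h>
#include <rocksdb/write_buffer_manager.h>

#include <memory>

void ConfigureWriteBuffers(rocksdb::Options* options) {
  options->write_buffer_size = 64 << 20;      // 64MB per memtable (default)
  options->db_write_buffer_size = 256 << 20;  // cap total memtable memory of this DB

  // Alternatively, share one manager across several DB instances; when total
  // usage exceeds the limit, the leader picks the column family with the
  // largest active memtable and switches/flushes it, as in the code above.
  options->write_buffer_manager =
      std::make_shared<rocksdb::WriteBufferManager>(512 << 20);
}

Sharing one WriteBufferManager across DB instances is exactly the situation the comment in the code above warns about: any DB writing through the shared buffer may end up triggering flushes.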
Since the previous step may have kicked off a flush or compaction, write_controller_ then checks whether this write needs to be delayed; rocksdb's optimization here is quite carefully done.
Look at DelayWrite: with delayed_write_rate in DBOptions defaulting to 16MB/s (the maximum allowed write rate while delayed), last_batch_group_size_ (the size of the previous batch group) is used to estimate how many microseconds the current thread should pause. To keep the frequency of clock reads under control, the wait is adjusted in units of kRefillInterval (1024, roughly one millisecond), and finally the thread checks whether it must block on the condition variable, sleeping and yielding the CPU while the write controller is fully stopped.
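A rough worked example of that estimate, simplified from GetDelay above (it ignores the bytes_left_ credit and the kRefillInterval rounding):

#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t kMicrosPerSecond = 1000000;
  uint64_t delayed_write_rate = 16 << 20;  // 16MB/s, as stated above
  uint64_t num_bytes = 32 << 10;           // size of the previous batch group

  // sleep ~ num_bytes / rate seconds, expressed in microseconds
  uint64_t sleep_micros = static_cast<uint64_t>(
      num_bytes / static_cast<double>(delayed_write_rate) * kMicrosPerSecond);
  std::printf("sleep ~%llu us\n", (unsigned long long)sleep_micros);  // ~1953 us
  return 0;
}

So with the defaults, a previous batch group of 32KB translates into roughly a 2ms pause for the next leader.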
Next, the write thread, acting as leader, applies this group's writes to the WAL and to the MemTable:
uint64_t last_sequence = versions_->LastSequence();
WriteThread::Writer* last_writer = &w;
autovector<WriteThread::Writer*> write_group;
bool need_log_sync = !write_options.disableWAL && write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;

bool logs_getting_synced = false;
if (status.ok()) {
  if (need_log_sync) {
    while (logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    for (auto& log : logs_) {
      assert(!log.getting_synced);
      log.getting_synced = true;
    }
    logs_getting_synced = true;
  }

  // Add to log and apply to memtable.  We can release the lock
  // during this phase since &w is currently responsible for logging
  // and protects against concurrent loggers and concurrent writes
  // into memtables
}
log::Writer* cur_log_writer = logs_.back().writer;
mutex_.Unlock();
Before the WAL write, while the leader still holds the mutex, and provided this write has not disabled the WAL and does require syncing the file (and possibly its directory), the leader waits for any in-flight sync to finish and marks every live WAL file as getting_synced, then takes the log::Writer for the current WAL file. After that the mutex can be released for the actual write.
// At this point the mutex is unlocked
bool exit_completed_early = false;
last_batch_group_size_ =
    write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &write_group);

if (status.ok()) {
  // Rules for when we can update the memtable concurrently
  // 1. supported by memtable
  // 2. Puts are not okay if inplace_update_support
  // 3. Deletes or SingleDeletes are not okay if filtering deletes
  //    (controlled by both batch and memtable setting)
  // 4. Merges are not okay
  //
  // Rules 1..3 are enforced by checking the options
  // during startup (CheckConcurrentWritesSupported), so if
  // options.allow_concurrent_memtable_write is true then they can be
  // assumed to be true.  Rule 4 is checked for each batch.  We could
  // relax rules 2 and 3 if we could prevent write batches from referring
  // more than once to a particular key.
  bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
                  write_group.size() > 1;
  int total_count = 0;
  uint64_t total_byte_size = 0;
  for (auto writer : write_group) {
    if (writer->CheckCallback(this)) {
      if (writer->ShouldWriteToMemtable()) {
        total_count += WriteBatchInternal::Count(writer->batch);
        parallel = parallel && !writer->batch->HasMerge();
      }

      if (writer->ShouldWriteToWAL()) {
        total_byte_size = WriteBatchInternal::AppendedByteSize(
            total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
      }
    }
  }

  const SequenceNumber current_sequence = last_sequence + 1;
  last_sequence += total_count;

  // Record statistics
  RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
  RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
  MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
  PERF_TIMER_STOP(write_pre_and_post_process_time);

  if (write_options.disableWAL) {
    has_unpersisted_data_.store(true, std::memory_order_relaxed);
  }

  uint64_t log_size = 0;
  if (!write_options.disableWAL) {
    PERF_TIMER_GUARD(write_wal_time);

    WriteBatch* merged_batch = nullptr;
    if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() &&
        write_group[0]->batch->GetWalTerminationPoint().is_cleared()) {
      // we simply write the first WriteBatch to WAL if the group only
      // contains one batch, that batch should be written to the WAL,
      // and the batch is not wanting to be truncated
      merged_batch = write_group[0]->batch;
      write_group[0]->log_used = logfile_number_;
    } else {
      // WAL needs all of the batches flattened into a single batch.
      // We could avoid copying here with an iov-like AddRecord
      // interface
      merged_batch = &tmp_batch_;
      for (auto writer : write_group) {
        if (writer->ShouldWriteToWAL()) {
          WriteBatchInternal::Append(merged_batch, writer->batch,
                                     /*WAL_only*/ true);
        }
        writer->log_used = logfile_number_;
      }
    }

    if (log_used != nullptr) {
      *log_used = logfile_number_;
    }

    WriteBatchInternal::SetSequence(merged_batch, current_sequence);

    Slice log_entry = WriteBatchInternal::Contents(merged_batch);
    status = cur_log_writer->AddRecord(log_entry);
    total_log_size_ += log_entry.size();
    alive_log_files_.back().AddSize(log_entry.size());
    log_empty_ = false;
    log_size = log_entry.size();
    RecordTick(stats_, WAL_FILE_BYTES, log_size);
    if (status.ok() && need_log_sync) {
      RecordTick(stats_, WAL_FILE_SYNCED);
      StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
      // It's safe to access logs_ with unlocked mutex_ here because:
      //  - we've set getting_synced=true for all logs,
      //    so other threads won't pop from logs_ while we're here,
      //  - only writer thread can push to logs_, and we're in
      //    writer thread, so no one will push to logs_,
      //  - as long as other threads don't modify it, it's safe to read
      //    from std::deque from multiple threads concurrently.
      for (auto& log : logs_) {
        status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
        if (!status.ok()) {
          break;
        }
      }
      if (status.ok() && need_log_dir_sync) {
        // We only sync WAL directory the first time WAL syncing is
        // requested, so that in case users never turn on WAL sync,
        // we can avoid the disk I/O in the write code path.
        status = directories_.GetWalDir()->Fsync();
      }
    }

    if (merged_batch == &tmp_batch_) {
      tmp_batch_.Clear();
    }
  }
  if (status.ok()) {
    PERF_TIMER_GUARD(write_memtable_time);

    {
      // Update stats while we are an exclusive group leader, so we know
      // that nobody else can be writing to these particular stats.
      // We're optimistic, updating the stats before we successfully
      // commit.  That lets us release our leader status early in
      // some cases.
      auto stats = default_cf_internal_stats_;
      stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
      stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
      if (!write_options.disableWAL) {
        if (write_options.sync) {
          stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1);
        }
        stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
      }
      uint64_t for_other = write_group.size() - 1;
      if (for_other > 0) {
        stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, for_other);
        if (!write_options.disableWAL) {
          stats->AddDBStats(InternalStats::WRITE_WITH_WAL, for_other);
        }
      }
    }

    if (!parallel) {
      status = WriteBatchInternal::InsertInto(
          write_group, current_sequence, column_family_memtables_.get(),
          &flush_scheduler_, write_options.ignore_missing_column_families,
          0 /*log_number*/, this);

      if (status.ok()) {
        // There were no write failures. Set leader's status
        // in case the write callback returned a non-ok status.
        status = w.FinalStatus();
      }
    } else {
      WriteThread::ParallelGroup pg;
      pg.leader = &w;
      pg.last_writer = last_writer;
      pg.last_sequence = last_sequence;
      pg.early_exit_allowed = !need_log_sync;
      pg.running.store(static_cast<uint32_t>(write_group.size()),
                       std::memory_order_relaxed);
      write_thread_.LaunchParallelFollowers(&pg, current_sequence);

      if (w.ShouldWriteToMemtable()) {
        // do leader write
        ColumnFamilyMemTablesImpl column_family_memtables(
            versions_->GetColumnFamilySet());
        assert(w.sequence == current_sequence);
        WriteBatchInternal::SetSequence(w.batch, w.sequence);
        w.status = WriteBatchInternal::InsertInto(
            &w, &column_family_memtables, &flush_scheduler_,
            write_options.ignore_missing_column_families, 0 /*log_number*/,
            this, true /*concurrent_memtable_writes*/);
      }

      // CompleteParallelWorker returns true if this thread should
      // handle exit, false means somebody else did
      exit_completed_early = !write_thread_.CompleteParallelWorker(&w);
      status = w.FinalStatus();
    }

    if (!exit_completed_early && w.status.ok()) {
      versions_->SetLastSequence(last_sequence);
      if (!need_log_sync) {
        write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
        exit_completed_early = true;
      }
    }

    // A non-OK status here indicates that the state implied by the
    // WAL has diverged from the in-memory state.  This could be
    // because of a corrupt write_batch (very bad), or because the
    // client specified an invalid column family and didn't specify
    // ignore_missing_column_families.
    //
    // Is setting bg_error_ enough here?  This will at least stop
    // compaction and fail any further writes.
    if (!status.ok() && bg_error_.ok() && !w.CallbackFailed()) {
      bg_error_ = status;
    }
  }
}
PERF_TIMER_START(write_pre_and_post_process_time);

if (immutable_db_options_.paranoid_checks && !status.ok() &&
    !w.CallbackFailed() && !status.IsBusy() && !status.IsIncomplete()) {
  mutex_.Lock();
  if (bg_error_.ok()) {
    bg_error_ = status;  // stop compaction & fail any further writes
  }
  mutex_.Unlock();
}

if (logs_getting_synced) {
  mutex_.Lock();
  MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
  mutex_.Unlock();
}

if (!exit_completed_early) {
  write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
}

return status;
EnterAsBatchGroupLeader computes the total write size for the leader plus all of its followers (this is the last_batch_group_size_ that DelayWrite later uses to estimate the pause), and followers are merged into the batch group in arrival order.
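For reference, a hedged sketch of the size cap applied while picking up followers — the constants reflect my reading of EnterAsBatchGroupLeader and should be treated as an assumption; the real code also skips writers whose sync/disableWAL settings differ from the leader's:

#include <cstddef>

// The group is capped at 1MB, and if the leader's own batch is small
// (<= 128KB) the cap shrinks so a tiny write is not delayed by joining
// a huge group.
size_t MaxBatchGroupSize(size_t leader_batch_size) {
  size_t max_size = 1 << 20;  // 1MB hard cap
  if (leader_batch_size <= (128 << 10)) {
    max_size = leader_batch_size + (128 << 10);
  }
  return max_size;
}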