DB写Put-leveldb源码剖析(12)

写操作会封装成WriteBatch,然后再写MemTable,为防止过量写影响读的效率,会有一定策略来限制写,本篇将分析具体调用流程及详细实现。

DB写即DBImpl::Put转调用DB默认实现的函数进行WriteBatch封装:

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

WriteBatch从设计上是为支持批量写入操作,默认DB提供的Put接口对单条插入也封装成批量模式,并提供真正的DBImpl::Write执行真正的批量操作,因此leveldb数据库的使用者实际可用Write接口来自定制批量写入模式。

从另一个角度看WriteBatch,也体现了leveldb数据库写操作的事务性,可将相关的多个写操作绑定于同一个WriteBatch做到原子性更新。

WriteBatch持有string变量rep_,按照一定的格式将多条记录写入rep_,具体格式如下:writebatch_record_format注意当ValueType为kTypeDeletion时,Record中只有Key。

继续调用Write完成真正的写操作:

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

先看第7-11行,这段代码主要考虑多线程并发写的情况,多写需要加锁,确保只有一个线程在执行写操作,获取锁后先加入写请求队列,再用while 循环包住block条件和wait调用,这是互斥量和条件变量一块使用的惯用写法,当获得锁的某个写线程完成批量写入操作唤醒等待的线程,某个写线程被唤醒后检查自身done标记是否已写入完成,完成则直接返回写状态结果。

下面看MakeRoomForWrite这个很重要的函数,分析限制写的策略:

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != NULL) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      bg_cv_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      bg_cv_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = NULL;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;   // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

首先检查锁持有,断言写请求队列不为空。
正常情况下是允许限制写,而传入NULL WriteBatch则标志试图强制做Compaction检查,来看循环检查当前DB状态:
1> allow_delay为true且当前level 0文件数不小于kL0_SlowdownWritesTrigger,将会减速写,解锁同时sleep 1毫秒,置allow_delay为false,该延迟写线程仅允许一次。
2> MemTable未写满,正常继续写。
3> MemTable写满,但imm又不为空,只得等待compaction完成。
4> imm不为空,但level 0文件数达到kL0_StopWritesTrigger,只得等待compaction完成。
5> MemTable写满且imm为空,则创建新的log文件,并作MemTable切换,构造新的MemTable,同时触发Compaction检查,置force标记为false,不应再强制触发。

继续回到Write函数,开启真正的写操作:
1> 批量写以BuildBatchGroup形式做过量写的限制,避免一次写过多。
2> 设置并根据当前批量写数量更新SequenceNumber。
3> 解锁允许新的写请求进入,对本次批量写调用WriteBatchInternal先写log日志再写MemTable。
4> 加锁更新当前版本LastSequence。
5> pop处理写请求队列,置done完成标记,直到本次BuildBatchGroup的最后一个Writer,并唤醒写请求队列的队首。

当前写线程写操作完成。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注