When a KV pair is written to leveldb, the log is written first and then the MemTable; that completes one write. Writing the log is an append-only sequential file write, and writing the MemTable is a skip-list insertion in memory, so random writes are turned into sequential ones: a single write involves only one disk IO plus one memory write, which is why leveldb writes are very fast. Writing the log first also guards against failures such as the process dying: the MemTable contents can still be rebuilt by replaying the log, so no data is lost.
Both reads and writes of the log file are sequential IO, so no additional index information is needed.
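As a rough illustration of this ordering, here is a minimal sketch of a single-key write path, modeled on (but far simpler than) leveldb's DBImpl::Write: batching, locking and MemTable switching are all omitted, and the member names (log_, logfile_, mem_) follow db_impl conventions.

Status DBImpl::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  // 1) Append the serialized batch to the log file: one sequential disk write.
  Status s = log_->AddRecord(WriteBatchInternal::Contents(&batch));
  if (s.ok() && opt.sync) {
    s = logfile_->Sync();  // optionally force the log to stable storage
  }
  // 2) Apply the batch to the in-memory skip list (the MemTable).
  if (s.ok()) {
    s = WriteBatchInternal::InsertInto(&batch, mem_);
  }
  return s;
}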
leveldb records every write operation in a log file. Let's look at the format first; the relevant definitions are:
namespace log {

enum RecordType {
  // Zero is reserved for preallocated files
  kZeroType = 0,

  kFullType = 1,

  // For fragments
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};
static const int kMaxRecordType = kLastType;

static const int kBlockSize = 32768;

// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;

}  // namespace log
The log is written in blocks of 32K each. Every physical record carries a 7-byte header: the first four bytes are a CRC checksum, the next two the payload length, and the last byte the record type. Since the block size is fixed while a record may span blocks, the three enum values kFirstType, kMiddleType and kLastType mark a fragment's position within a logical record (kFullType marks a record that fits in a single fragment).
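For example, a 3-byte payload "abc" stored as a single kFullType record occupies 10 bytes on disk; the length is little-endian, and the checksum bytes (shown as cc) depend on the payload:

// offset:    0  1  2  3 |  4  5 |  6 |  7   8   9
// content:  cc cc cc cc | 03 00 | 01 | 'a' 'b' 'c'
//           masked crc32c length  type  payload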
The Writer class provides the AddRecord write interface; here is the implementation:
Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        assert(kHeaderSize == 7);
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  // Format the header
  char buf[kHeaderSize];
  buf[4] = static_cast<char>(n & 0xff);
  buf[5] = static_cast<char>(n >> 8);
  buf[6] = static_cast<char>(t);

  // Compute the crc of the record type and the payload.
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
  crc = crc32c::Mask(crc);  // Adjust for storage
  EncodeFixed32(buf, crc);

  // Write the header and the payload
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, n));
    if (s.ok()) {
      s = dest_->Flush();
    }
  }
  block_offset_ += kHeaderSize + n;
  return s;
}
The block size is fixed at 32K and a record may span blocks, so records are written in fragments. AddRecord first checks whether the current block has room left for a record header: if the leftover space is smaller than the 7-byte header, it is padded with zero bytes (ASCII 0) and writing switches to a new block; if the leftover exactly fits the header, a zero-length fragment is still emitted there. Otherwise it decides whether the record needs to be fragmented and sets the corresponding record type. EmitPhysicalRecord then fills in the header, computes the CRC checksum, and writes the header and the actual payload; on success the data is flushed to the page cache.
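To make the fragmentation concrete, here is a small standalone sketch (not leveldb code) that mimics the loop above for a hypothetical 70000-byte record appended to an empty log:

#include <cstdio>
#include <algorithm>

int main() {
  const size_t kBlockSize = 32768, kHeaderSize = 7;
  size_t left = 70000;        // hypothetical logical record size
  size_t block_offset = 0;    // starts at the beginning of a fresh block
  bool begin = true;
  while (true) {
    size_t leftover = kBlockSize - block_offset;
    if (leftover < kHeaderSize) {
      block_offset = 0;       // trailer gets zero padding; switch blocks
    }
    size_t avail = kBlockSize - block_offset - kHeaderSize;
    size_t frag = std::min(left, avail);
    bool end = (left == frag);
    printf("%s: %zu bytes\n",
           (begin && end) ? "FULL" : begin ? "FIRST" : end ? "LAST" : "MIDDLE",
           frag);
    block_offset += kHeaderSize + frag;
    left -= frag;
    begin = false;
    if (end) break;
  }
  // Prints: FIRST: 32761 bytes, MIDDLE: 32761 bytes, LAST: 4478 bytes
  return 0;
}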
leveldb uses a CRC32 checksum when writing records, so let's review the CRC algorithm here:
With CRC checking, the sender and receiver agree on the same generator polynomial g(x), a polynomial over GF(2) whose leading and trailing coefficients must both be 1.
CRC works as follows: the sender divides the binary polynomial t(x) of the data to send by g(x) and takes the remainder y(x) as the CRC checksum. On verification, the receiver checks whether its computed remainder is 0 to decide whether the data frame is corrupted. Let the generator polynomial have degree r (highest term x^r); the concrete steps are:
Sender:
1) Append r zeros to the m-bit data polynomial t(x), extending it to m + r bits to make room for the r-bit checksum; the zero-padded polynomial is T(x) = x^r · t(x);
2) Divide T(x) by the generator polynomial g(x); the r-bit remainder y(x) is the CRC checksum;
3) Append y(x) to t(x); the result s(x) = x^r · t(x) + y(x) is the string to send, CRC checksum included. Since y(x) is the remainder of x^r · t(x) modulo g(x), and addition over GF(2) is XOR, s(x) is guaranteed to be divisible by g(x).
Receiver:
1) Receive the data n(x), i.e. the m + r bits including the CRC checksum;
2) Divide n(x) by g(x); if the remainder is 0, the transmission was error-free, otherwise an error occurred. Stripping the trailing r bits from n(x) recovers the original data.
The generator polynomial is not chosen arbitrarily. The standard CRC32 generator polynomial is:
x^32 + x^26 + x^23 + x^22 + x^16 + x^12 + x^11 + x^10 + x^8 + x^7 + x^5 + x^4 + x^2 + x + 1
In hexadecimal that is 0x04C11DB7 (the leading x^32 coefficient is implicit and dropped).
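The long division described above can be written directly as a bit-at-a-time loop. This is an educational sketch using the standard polynomial 0x04C11DB7 (MSB-first, no input/output reflection or final XOR), not leveldb's implementation:

#include <cstdint>
#include <cstring>

// Divide t(x) * x^32 by g(x) over GF(2) and return the remainder y(x).
uint32_t crc32_bitwise(const unsigned char* data, size_t n) {
  uint32_t rem = 0;                                 // remainder register
  for (size_t i = 0; i < n; i++) {
    rem ^= static_cast<uint32_t>(data[i]) << 24;    // feed in next 8 message bits
    for (int bit = 0; bit < 8; bit++) {
      if (rem & 0x80000000u) {
        rem = (rem << 1) ^ 0x04C11DB7u;             // subtract (XOR) shifted g(x)
      } else {
        rem <<= 1;
      }
    }
  }
  return rem;                                       // the r-bit CRC checksum
}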
Practical CRC32 implementations are byte-oriented rather than doing inefficient bit-by-bit processing. Note also that leveldb actually uses CRC32C (the Castagnoli polynomial, 0x1EDC6F41) rather than the standard polynomial above; I'll analyze leveldb's concrete implementation of the algorithm in detail when I have time.
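A minimal sketch of the byte-oriented idea: precompute the remainder contributed by each possible top byte once, then consume a whole byte per step. This is equivalent to the bitwise loop above, again not leveldb's actual code:

#include <cstdint>
#include <cstring>

static uint32_t kTable[256];

// Precompute, for each possible top byte, the remainder it contributes.
void InitCrcTable() {
  for (uint32_t i = 0; i < 256; i++) {
    uint32_t rem = i << 24;
    for (int bit = 0; bit < 8; bit++) {
      rem = (rem & 0x80000000u) ? (rem << 1) ^ 0x04C11DB7u : (rem << 1);
    }
    kTable[i] = rem;
  }
}

// Computes the same remainder as crc32_bitwise above, one byte per step.
uint32_t crc32_bytewise(const unsigned char* data, size_t n) {
  uint32_t rem = 0;
  for (size_t i = 0; i < n; i++) {
    rem = (rem << 8) ^ kTable[(rem >> 24) ^ data[i]];
  }
  return rem;
}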
Why write data in a fixed 32K block size? Personally, I suspect 32K is simply a performance-friendly compromise between the typical size of the KV records being written and the buffer size used when reading blocks back.
Now for reading log records; the Reader class provides the ReadRecord interface:
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    if (!SkipToInitialBlock()) {
      return false;
    }
  }

  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    const unsigned int record_type = ReadPhysicalRecord(&fragment);

    // ReadPhysicalRecord may have only had an empty trailer remaining in its
    // internal buffer. Calculate the offset of the next physical record now
    // that it has returned, properly accounting for its header size.
    uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

    if (resyncing_) {
      if (record_type == kMiddleType) {
        continue;
      } else if (record_type == kLastType) {
        resyncing_ = false;
        continue;
      } else {
        resyncing_ = false;
      }
    }

    switch (record_type) {
      case kFullType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kEof:
        if (in_fragmented_record) {
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}
The Reader constructor can be given an initial read offset (initial_offset_). If the offset of the last record returned (last_record_offset_) is smaller than the initial offset, the read position is first corrected to the start of the containing 32K block; if the offset within that block falls in the trailer, too close to the end to hold a record header, reading skips to the next block, and if this skip adjustment fails the read returns failure directly. Then the actual physical records are read; on the first reads, resyncing may be needed to skip fragments of a logical record that began before the initial block. A zero-length record may also appear at the tail of a block; its type can only be kFirstType (the start of some new record), with the next block then beginning with a kFirstType or kFullType record. An older Writer bug could produce exactly these two patterns, and Reader contains compatibility handling for both. Whether the record being read is fragmented across blocks is tracked with in_fragmented_record; combining this flag with the record type yields the returned result and read status.
A few member variables deserve emphasis: initial_offset_ is the specified initial read offset (relative to the log file); last_record_offset_ is the offset of the last record returned by ReadRecord (also relative to the log file); end_of_buffer_offset_ is the file offset just past the block data read so far; buffer_ not only caches block data but also helps compute the file offset of the record actually read.
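A hypothetical walk-through of the offset arithmetic (values invented for illustration):

// Suppose the reader has buffered the second 32K block, so
//   end_of_buffer_offset_ = 65536   (file offset just past the buffer)
// and ReadPhysicalRecord has consumed one record with a 100-byte payload:
//   buffer_.size() = 32768 - 7 - 100 = 32661
// The record just parsed therefore began at
//   65536 - 32661 - 7 - 100 = 32768
// i.e. exactly at the start of the second block, as expected.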
Now the actual reading of a physical record:
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
  while (true) {
    if (buffer_.size() < kHeaderSize) {
      if (!eof_) {
        // Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ += buffer_.size();
        if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          eof_ = true;
          return kEof;
        } else if (buffer_.size() < kBlockSize) {
          eof_ = true;
        }
        continue;
      } else {
        // Note that if buffer_ is non-empty, we have a truncated header at the
        // end of the file, which can be caused by the writer crashing in the
        // middle of writing the header. Instead of considering this an error,
        // just report EOF.
        buffer_.clear();
        return kEof;
      }
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        ReportCorruption(drop_size, "bad record length");
        return kBadRecord;
      }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
      // Don't report a corruption.
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }

    buffer_.remove_prefix(kHeaderSize + length);

    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }

    *result = Slice(header + kHeaderSize, length);
    return type;
  }
}
Internally, buffer_ caches the current data block and eof_ marks whether the end of the file has been reached. The header is parsed next; if checksumming is enabled, the CRC32 of the current record is verified (note that the checksum covers the type byte plus the payload, matching what the writer computed over type and payload). The consumed prefix is then removed from buffer_ so the next sequential read picks up in the right place, and finally the payload is wrapped in a Slice as the result.
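One detail worth calling out: the checksum stored in the header is not the raw CRC. leveldb masks it, because computing a CRC over data that itself contains embedded CRCs weakens the check; the scheme, from util/crc32c.h, is a rotation plus a constant:

#include <cstdint>

static const uint32_t kMaskDelta = 0xa282ead8u;

// Return a masked representation of crc: rotate right by 15 bits and add a
// constant.
uint32_t Mask(uint32_t crc) {
  return ((crc >> 15) | (crc << 17)) + kMaskDelta;
}

// Return the crc whose masked representation is masked_crc.
uint32_t Unmask(uint32_t masked_crc) {
  uint32_t rot = masked_crc - kMaskDelta;
  return ((rot >> 17) | (rot << 15));
}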
In doc/log_format.txt the leveldb authors discuss the pros and cons of this log format. The benefits:
(1) No extra heuristic resyncing is needed; on an error, just skip to the next block.
(2) It supports the kind of file splitting that e.g. mapreduce needs: simply split at complete logical records.
(3) Reading and writing large records needs no extra state or buffering.
The drawbacks:
(1) Small records are not packed together.
(2) There is no compression support.
Both could be addressed by adding new record types.