muduo AsyncLogging 解析(异步日志)

异步日志原理

需要了解其中使用的几个类使用方法

muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
std::vector<std::unique_ptr<Buffer>> BufferVector; 
BufferVector::value_type BufferPtr;

其中 Buffer 是一个缓冲区,用于接受来自程序的日志输入。
BufferVector是一个用于接受Buffer的容器类,注意其中使用的是唯一指针std::unique_ptr,使用的过程中只需要new 就可以,不需要手动delete 释放,不过相应的也只能使用std::move 进行转移。在当前场景中非常适合,这也是一个值得仔细思考的地方。
BufferPtr 用于简化使用。

其中异步处理日志文件的核心就是使用了另外一个写入线程,专门将需要记录的缓冲区块写入到磁盘中的日志文件中。其中使用 AsyncLogging 的类中有一个 buffer vector ,在异步线程中也存在一个 buffer vector 。在写入线程中,每次达到条件需要写入日志记录时都会将 两个 buffer vector 交换。使用日志的地方继续往新的 buffer vector 中写入日志,而写入线程则将vector 容器中的buffer的内容写入磁盘中。
具体流程用下图表示:
muduo AsyncLogging 解析(异步日志)

AsyncLogging.h

// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_BASE_ASYNCLOGGING_H
#define MUDUO_BASE_ASYNCLOGGING_H

#include "muduo/base/BlockingQueue.h"
#include "muduo/base/BoundedBlockingQueue.h"
#include "muduo/base/CountDownLatch.h"
#include "muduo/base/Mutex.h"
#include "muduo/base/Thread.h"
#include "muduo/base/LogStream.h"

#include <atomic>
#include <vector>

namespace muduo
{

// 异步日志
class AsyncLogging : noncopyable
{
public:
  AsyncLogging(const string &basename,
               off_t rollSize, // 滚动大小
               int flushInterval = 3);

  ~AsyncLogging()
  {
    if (running_)
    {
      stop();
    }
  }

  void append(const char *logline, int len);

  void start()
  {
    running_ = true;
    // 开启线程
    thread_.start();
    // 等待线程结束
    latch_.wait();
  }

  // 使用了 clang NO_THREAD_SAFETY_ANALYSIS 属性,作用于 method ; 关闭线程的安全性检测
  void stop() NO_THREAD_SAFETY_ANALYSIS
  {
    running_ = false;
    cond_.notify();
    thread_.join();
  }

private:
  void threadFunc();

  typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; // 大缓冲区
  typedef std::vector<std::unique_ptr<Buffer>> BufferVector;              // 大缓冲区 vector
  typedef BufferVector::value_type BufferPtr;                             // 大缓冲区 vector 元素类型

  const int flushInterval_;   // 刷新间隔
  std::atomic<bool> running_; // 原子bool类型
  const string basename_;     // 日志文件 最后的文件名
  const off_t rollSize_;      // 滚动最大值
  muduo::Thread thread_;
  muduo::CountDownLatch latch_;
  muduo::MutexLock mutex_;
  muduo::Condition cond_ GUARDED_BY(mutex_);
  BufferPtr currentBuffer_ GUARDED_BY(mutex_); // 当前的缓冲区
  BufferPtr nextBuffer_ GUARDED_BY(mutex_);    // 下一个缓冲区
  BufferVector buffers_ GUARDED_BY(mutex_);    // 缓冲区 vector
};

} // namespace muduo

#endif // MUDUO_BASE_ASYNCLOGGING_H

AsyncLogging.cpp

// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "muduo/base/AsyncLogging.h"
#include "muduo/base/LogFile.h"
#include "muduo/base/Timestamp.h"

#include <stdio.h>

using namespace muduo;

AsyncLogging::AsyncLogging(const string& basename,
                           off_t rollSize,
                           int flushInterval)
  : flushInterval_(flushInterval),
    running_(false),
    basename_(basename),
    rollSize_(rollSize),
    thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"), // std::functional<void()>
    latch_(1),
    mutex_(),
    cond_(mutex_),
    currentBuffer_(new Buffer),
    nextBuffer_(new Buffer),
    buffers_()
{
  // 初始化 缓冲区
  currentBuffer_->bzero();
  nextBuffer_->bzero();
  // vector reserve 16个
  buffers_.reserve(16);
}

// 添加日志记录
void AsyncLogging::append(const char* logline, int len)
{
  muduo::MutexLockGuard lock(mutex_);
  if (currentBuffer_->avail() > len)
  {
    currentBuffer_->append(logline, len);
  }
  else
  {
    // 将使用完后的 buffer 添加到 buffer vector 后
    buffers_.push_back(std::move(currentBuffer_));

    if (nextBuffer_)
    {
      currentBuffer_ = std::move(nextBuffer_);
    }
    else
    {
      // Buffer 是 unique_ptr 不需要显示调用 delete
      currentBuffer_.reset(new Buffer); // Rarely happens 极少数会发生
    }
    currentBuffer_->append(logline, len);
    // 发出信号,有新的 buffer 添加到 vector 后面
    cond_.notify();
  }
}

// 异步线程执行函数
void AsyncLogging::threadFunc()
{
  assert(running_ == true);
  latch_.countDown();
  // 创建一个 日志文件 LogFile
  LogFile output(basename_, rollSize_, false);
  // BufferPtr 属于 unique_ptr 只能使用 std::move 移动
  BufferPtr newBuffer1(new Buffer);
  BufferPtr newBuffer2(new Buffer);
  newBuffer1->bzero();
  newBuffer2->bzero();
  BufferVector buffersToWrite;  // 要写入 LogFile 的 buffer vector
  buffersToWrite.reserve(16);

  // loop
  while (running_)
  {
    assert(newBuffer1 && newBuffer1->length() == 0);
    assert(newBuffer2 && newBuffer2->length() == 0);
    assert(buffersToWrite.empty());
    // buffersToWrite 必须是空的,newBuffer1、newBuffer2必须是空的并且可用


    // 等待主程序,与主程序交互,交换 buffer vector 完成数据的写入
    {
      muduo::MutexLockGuard lock(mutex_);
      // buffers_ 是主程序中写入的缓冲区
      if (buffers_.empty())  // unusual usage!  主程序中没有写入日志
      {
        // 等待主程序中 vector 有新的 buffer 被添加
        cond_.waitForSeconds(flushInterval_);
      }
      // 将主程序中正在使用的 currentBuffer_ 存入 buffers_
      buffers_.push_back(std::move(currentBuffer_));
      // 使用 newBuffer1 替换正在使用的 currentBuffer_
      currentBuffer_ = std::move(newBuffer1);
      // 使用新的未使用的 buffersToWrite 交换 buffers_,将buffers_中的数据在异步线程中写入 LogFile 中
      buffersToWrite.swap(buffers_);
      if (!nextBuffer_)
      {
        // 如果主程序中的 nextBuffer_ 为nullptr ,使用 newBuffer2 代替
        nextBuffer_ = std::move(newBuffer2);
      }
    }

    // 从主程序中获得的 buffer vector 不可以是空的
    assert(!buffersToWrite.empty());  // 不可以是 nullptr

    // 如果从主程序获得的 buffer vector 长度大于 25,则删除到只剩下 2 个 buffer
    if (buffersToWrite.size() > 25)
    {
      char buf[256];
      snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
               Timestamp::now().toFormattedString().c_str(),
               buffersToWrite.size()-2);
      // 输出到标准错误流
      fputs(buf, stderr);
      // output is LogFile 
      output.append(buf, static_cast<int>(strlen(buf)));
      // buffer vector 只留下两个 buffer 使用 std::move 移动,节省资源
      buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
    }

    // 将 buffer vector 中的 数据写入 LogFile 中
    for (const auto& buffer : buffersToWrite)
    {
      // FIXME: use unbuffered stdio FILE ? or use ::writev ?
      output.append(buffer->data(), buffer->length());
    }

    if (buffersToWrite.size() > 2)
    {
      // drop non-bzero-ed buffers, avoid trashing 丢弃非零的缓冲区,避免垃圾
      buffersToWrite.resize(2);
    }

    if (!newBuffer1)
    {
      assert(!buffersToWrite.empty());  // not null
      newBuffer1 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer1->reset();
    }

    if (!newBuffer2)
    {
      assert(!buffersToWrite.empty());
      newBuffer2 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer2->reset();
    }

    buffersToWrite.clear();
    output.flush();
  }

  output.flush();
}