muduo AsyncLogging 解析(异步日志)
异步日志原理
需要了解其中使用的几个类使用方法
muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
std::vector<std::unique_ptr<Buffer>> BufferVector;
BufferVector::value_type BufferPtr;
其中 Buffer 是一个缓冲区,用于接受来自程序的日志输入。
BufferVector是一个用于接受Buffer的容器类,注意其中使用的是唯一指针std::unique_ptr,使用的过程中只需要new 就可以,不需要手动delete 释放,不过相应的也只能使用std::move 进行转移。在当前场景中非常适合,这也是一个值得仔细思考的地方。
BufferPtr 用于简化使用。
其中异步处理日志文件的核心就是使用了另外一个写入线程,专门将需要记录的缓冲区块写入到磁盘中的日志文件中。其中使用 AsyncLogging 的类中有一个 buffer vector ,在异步线程中也存在一个 buffer vector 。在写入线程中,每次达到条件需要写入日志记录时都会将 两个 buffer vector 交换。使用日志的地方继续往新的 buffer vector 中写入日志,而写入线程则将vector 容器中的buffer的内容写入磁盘中。
具体流程用下图表示:
AsyncLogging.h
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#ifndef MUDUO_BASE_ASYNCLOGGING_H
#define MUDUO_BASE_ASYNCLOGGING_H
#include "muduo/base/BlockingQueue.h"
#include "muduo/base/BoundedBlockingQueue.h"
#include "muduo/base/CountDownLatch.h"
#include "muduo/base/Mutex.h"
#include "muduo/base/Thread.h"
#include "muduo/base/LogStream.h"
#include <atomic>
#include <vector>
namespace muduo
{
// 异步日志
class AsyncLogging : noncopyable
{
public:
AsyncLogging(const string &basename,
off_t rollSize, // 滚动大小
int flushInterval = 3);
~AsyncLogging()
{
if (running_)
{
stop();
}
}
void append(const char *logline, int len);
void start()
{
running_ = true;
// 开启线程
thread_.start();
// 等待线程结束
latch_.wait();
}
// 使用了 clang NO_THREAD_SAFETY_ANALYSIS 属性,作用于 method ; 关闭线程的安全性检测
void stop() NO_THREAD_SAFETY_ANALYSIS
{
running_ = false;
cond_.notify();
thread_.join();
}
private:
void threadFunc();
typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; // 大缓冲区
typedef std::vector<std::unique_ptr<Buffer>> BufferVector; // 大缓冲区 vector
typedef BufferVector::value_type BufferPtr; // 大缓冲区 vector 元素类型
const int flushInterval_; // 刷新间隔
std::atomic<bool> running_; // 原子bool类型
const string basename_; // 日志文件 最后的文件名
const off_t rollSize_; // 滚动最大值
muduo::Thread thread_;
muduo::CountDownLatch latch_;
muduo::MutexLock mutex_;
muduo::Condition cond_ GUARDED_BY(mutex_);
BufferPtr currentBuffer_ GUARDED_BY(mutex_); // 当前的缓冲区
BufferPtr nextBuffer_ GUARDED_BY(mutex_); // 下一个缓冲区
BufferVector buffers_ GUARDED_BY(mutex_); // 缓冲区 vector
};
} // namespace muduo
#endif // MUDUO_BASE_ASYNCLOGGING_H
AsyncLogging.cpp
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include "muduo/base/AsyncLogging.h"
#include "muduo/base/LogFile.h"
#include "muduo/base/Timestamp.h"
#include <stdio.h>
using namespace muduo;
AsyncLogging::AsyncLogging(const string& basename,
off_t rollSize,
int flushInterval)
: flushInterval_(flushInterval),
running_(false),
basename_(basename),
rollSize_(rollSize),
thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"), // std::functional<void()>
latch_(1),
mutex_(),
cond_(mutex_),
currentBuffer_(new Buffer),
nextBuffer_(new Buffer),
buffers_()
{
// 初始化 缓冲区
currentBuffer_->bzero();
nextBuffer_->bzero();
// vector reserve 16个
buffers_.reserve(16);
}
// 添加日志记录
void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
if (currentBuffer_->avail() > len)
{
currentBuffer_->append(logline, len);
}
else
{
// 将使用完后的 buffer 添加到 buffer vector 后
buffers_.push_back(std::move(currentBuffer_));
if (nextBuffer_)
{
currentBuffer_ = std::move(nextBuffer_);
}
else
{
// Buffer 是 unique_ptr 不需要显示调用 delete
currentBuffer_.reset(new Buffer); // Rarely happens 极少数会发生
}
currentBuffer_->append(logline, len);
// 发出信号,有新的 buffer 添加到 vector 后面
cond_.notify();
}
}
// 异步线程执行函数
void AsyncLogging::threadFunc()
{
assert(running_ == true);
latch_.countDown();
// 创建一个 日志文件 LogFile
LogFile output(basename_, rollSize_, false);
// BufferPtr 属于 unique_ptr 只能使用 std::move 移动
BufferPtr newBuffer1(new Buffer);
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite; // 要写入 LogFile 的 buffer vector
buffersToWrite.reserve(16);
// loop
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());
// buffersToWrite 必须是空的,newBuffer1、newBuffer2必须是空的并且可用
// 等待主程序,与主程序交互,交换 buffer vector 完成数据的写入
{
muduo::MutexLockGuard lock(mutex_);
// buffers_ 是主程序中写入的缓冲区
if (buffers_.empty()) // unusual usage! 主程序中没有写入日志
{
// 等待主程序中 vector 有新的 buffer 被添加
cond_.waitForSeconds(flushInterval_);
}
// 将主程序中正在使用的 currentBuffer_ 存入 buffers_
buffers_.push_back(std::move(currentBuffer_));
// 使用 newBuffer1 替换正在使用的 currentBuffer_
currentBuffer_ = std::move(newBuffer1);
// 使用新的未使用的 buffersToWrite 交换 buffers_,将buffers_中的数据在异步线程中写入 LogFile 中
buffersToWrite.swap(buffers_);
if (!nextBuffer_)
{
// 如果主程序中的 nextBuffer_ 为nullptr ,使用 newBuffer2 代替
nextBuffer_ = std::move(newBuffer2);
}
}
// 从主程序中获得的 buffer vector 不可以是空的
assert(!buffersToWrite.empty()); // 不可以是 nullptr
// 如果从主程序获得的 buffer vector 长度大于 25,则删除到只剩下 2 个 buffer
if (buffersToWrite.size() > 25)
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
// 输出到标准错误流
fputs(buf, stderr);
// output is LogFile
output.append(buf, static_cast<int>(strlen(buf)));
// buffer vector 只留下两个 buffer 使用 std::move 移动,节省资源
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}
// 将 buffer vector 中的 数据写入 LogFile 中
for (const auto& buffer : buffersToWrite)
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffer->data(), buffer->length());
}
if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing 丢弃非零的缓冲区,避免垃圾
buffersToWrite.resize(2);
}
if (!newBuffer1)
{
assert(!buffersToWrite.empty()); // not null
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset();
}
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
output.flush();
}
output.flush();
}