Cat源码分析(一):Client端
前言
cat的Client端所做的工作就是收集埋点信息,将埋点信息处理成messageTree,放到发送队列中,在启动另一个线程,异步消费队列,进行消息的发送。
本文涉及到三个内容:
- 客户端初始化:做了哪些准备工作
- message的创建过程
- 客户端的发送过程
客户端初始化
我们首先看一下Cat这个类的初始化过程
public static void initialize(PlexusContainer container, File configFile) {
ModuleContext ctx = new DefaultModuleContext(container);
Module module = ctx.lookup(Module.class, CatClientModule.ID);
if (!module.isInitialized()) {
ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
ctx.setAttribute("cat-client-config-file", configFile);
initializer.execute(ctx, module);
}
}
- 利用plexuscontainer,获得defaultModelContext
- 利用context从plexus容器中获得CatClientModule和ModuleInitializer
- Initializer.execute(ctx,module)
在这个方法中,涉及到最重要的方法就是启动一个statusUpdateTask的线程,下边就是这个线程run方法执行的流程,最重要的一步就是4,将m_statistic和m_jars传入statusInfoCollector的构造函数进行实例化,然后使用访问者模式,statusInfo类就可以set一些指标的了
访问者模式:是对象的行为模式。访问者模式的目的是封装一些施加于某种数据结构元素之上的操作。一旦这些操作需要修改的话,接受这个操作的数据结构可以保持不变
- 抽象访问者:IVisitor BaseVisitor
- 访问者:statusInfoCollector
- 抽象元素类:IEntity
- 元素类:StatusInfo–>BaseEntity–>IEntity
StatusInfo通过访问者StatusInfoCollector对自己进行赋值。如果set的字段发生改变,只需要改变statusInfoCollector的visitStatus方法即可。
Transaction消息创建的过程
- 从Cat中得到MessageProducer,通过producer来实例化Transaction
Transaction t = Cat.newTransaction("URL", pageName);
public static Transaction newTransaction(String type, String name) {
return Cat.getProducer().newTransaction(type, name);
}
- 判断manager里边是否存在上下文,如果没有,则创建一个上下文,这个上下文使用ThreadLocal进行存储,在context的构造函数中,messageTree和Transaction栈被创建了,然后就会将创建好的context放到ThrealLocal中
@Override
public void setup() {
Context ctx;
if (m_domain != null) {
ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
} else {
ctx = new Context("Unknown", m_hostName, "");
}
m_context.set(ctx);
}
- 开启一个事务
DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);
m_manager.start(transaction, false);
4.在start中,获取上下文,调用上下文的start方法,下边来看一下start方法
判断m_stack是否为空,如果不为空,则调用addTransactionChild方法,为空则将Transaction加到MessageTree中,若不是fored类型,则将transaction入栈
public void start(Transaction transaction, boolean forked) {
if (!m_stack.isEmpty()) {
Transaction parent = m_stack.peek();
addTransactionChild(transaction, parent);
} else {
m_tree.setMessage(transaction);
}
if (!forked) {
m_stack.push(transaction);
}
}
判断treePeriod和messagePeriod的大小或长度,是否满足条件,如果满足条件则发送到server端,不满足则add到List中
private void addTransactionChild(Message message, Transaction transaction) {
long treePeriod = trimToHour(m_tree.getMessage().getTimestamp());
long messagePeriod = trimToHour(message.getTimestamp() - 10 * 1000L); // 10 seconds extra time allowed
if (treePeriod < messagePeriod || m_length >= m_configManager.getMaxMessageLength()) {
m_validator.truncateAndFlush(this, message.getTimestamp());
}
transaction.addChild(message);
m_length++;
}
消息栈Stack和MessageTree的关系是什么??
消息栈是用来存储Transaction事务的,如果发送一个Event类型的message,首先调用context的add方法。
Add方法:判断栈是否为空,为空则说明event是一个独立的消息,没有被嵌套在transaction中,这个消息将被赋值到messageTree中,调用flush,将tree放到队列中,异步发送;如果不为空,则代表Event被嵌套在transaction中,将会调用addTransactionChild方法将event消息添加到Transaction的child中
消息发送
通过Transaction对象的complete方法完成,通过manager的end方法,调用context的end方法进行消息的发送
- 从栈顶弹出事务,判断是否使我们传入的事务,如果不是,则认为是被嵌套的子事务,继续弹出,直到弹出我们需要的事务为止
- 判断栈是否为空,如果为空,则end传入的事务就是根事务,这个时候才会调用manager.flush方法将消息树上传服务器
public boolean end(DefaultMessageManager manager, Transaction transaction) {
if (!m_stack.isEmpty()) {
//从栈顶弹出事务
Transaction current = m_stack.pop();
//如果弹出的事务不等于传入的事务
if (transaction == current) {
m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);
} else {
//直到弹出的事务等于我们传入的事务为止
while (transaction != current && !m_stack.empty()) {
m_validator.validate(m_stack.peek(), current);
current = m_stack.pop();
}
}
//如果为空,则任务传入的是根节点,才会调用flush方法
if (m_stack.isEmpty()) {
//重新初始化一颗tree
MessageTree tree = m_tree.copy();
m_tree.setMessageId(null);
m_tree.setMessage(null);
if (m_totalDurationInMicros > 0) {
adjustForTruncatedTransaction((Transaction) tree.getMessage());
}
manager.flush(tree);
return true;
}
}
return false;
}
在flush方法中,会调用TcpSocketSender方法的send方法,根据tree的类型,将tree放入不同的队列中
总结
消息通过messageProducer进行创建,在创建的时候会初始化messageManager的上下文。消息会依赖messageManager进行消息的各种操作,上下文起到了至关重要的作用。创建好的event消息,调用其complete()方法将调用上下文的add方法将消息入栈或入队列,这样消息的发送就是异步的过程