Cat源码分析(一):Client端

前言

cat的Client端所做的工作就是收集埋点信息,将埋点信息处理成messageTree,放到发送队列中,在启动另一个线程,异步消费队列,进行消息的发送。

本文涉及到三个内容:

  • 客户端初始化:做了哪些准备工作
  • message的创建过程
  • 客户端的发送过程

客户端初始化

我们首先看一下Cat这个类的初始化过程

	public static void initialize(PlexusContainer container, File configFile) {
		ModuleContext ctx = new DefaultModuleContext(container);
		Module module = ctx.lookup(Module.class, CatClientModule.ID);

		if (!module.isInitialized()) {
			ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);

			ctx.setAttribute("cat-client-config-file", configFile);
			initializer.execute(ctx, module);
		}
	}

Cat源码分析(一):Client端

  1. 利用plexuscontainer,获得defaultModelContext
  2. 利用context从plexus容器中获得CatClientModule和ModuleInitializer
  3. Initializer.execute(ctx,module)

Cat源码分析(一):Client端

在这个方法中,涉及到最重要的方法就是启动一个statusUpdateTask的线程,下边就是这个线程run方法执行的流程,最重要的一步就是4,将m_statistic和m_jars传入statusInfoCollector的构造函数进行实例化,然后使用访问者模式,statusInfo类就可以set一些指标的了

Cat源码分析(一):Client端

Cat源码分析(一):Client端

Cat源码分析(一):Client端

Cat源码分析(一):Client端

访问者模式:是对象的行为模式。访问者模式的目的是封装一些施加于某种数据结构元素之上的操作。一旦这些操作需要修改的话,接受这个操作的数据结构可以保持不变

  • 抽象访问者:IVisitor BaseVisitor
  • 访问者:statusInfoCollector
  • 抽象元素类:IEntity
  • 元素类:StatusInfo–>BaseEntity–>IEntity

StatusInfo通过访问者StatusInfoCollector对自己进行赋值。如果set的字段发生改变,只需要改变statusInfoCollector的visitStatus方法即可。

Transaction消息创建的过程

Cat源码分析(一):Client端

  1. 从Cat中得到MessageProducer,通过producer来实例化Transaction
Transaction t = Cat.newTransaction("URL", pageName);

public static Transaction newTransaction(String type, String name) {
   return Cat.getProducer().newTransaction(type, name);
}

  1. 判断manager里边是否存在上下文,如果没有,则创建一个上下文,这个上下文使用ThreadLocal进行存储,在context的构造函数中,messageTree和Transaction栈被创建了,然后就会将创建好的context放到ThrealLocal中
	@Override
	public void setup() {
		Context ctx;

		if (m_domain != null) {
			ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
		} else {
			ctx = new Context("Unknown", m_hostName, "");
		}

		m_context.set(ctx);
	}
  1. 开启一个事务
DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);

m_manager.start(transaction, false);

4.在start中,获取上下文,调用上下文的start方法,下边来看一下start方法
判断m_stack是否为空,如果不为空,则调用addTransactionChild方法,为空则将Transaction加到MessageTree中,若不是fored类型,则将transaction入栈

public void start(Transaction transaction, boolean forked) {
   if (!m_stack.isEmpty()) {
    Transaction parent = m_stack.peek();
      addTransactionChild(transaction, parent);
   } else {
      m_tree.setMessage(transaction);
   }

   if (!forked) {
      m_stack.push(transaction);
   }
}

判断treePeriod和messagePeriod的大小或长度,是否满足条件,如果满足条件则发送到server端,不满足则add到List中

private void addTransactionChild(Message message, Transaction transaction) {
   long treePeriod = trimToHour(m_tree.getMessage().getTimestamp());
   long messagePeriod = trimToHour(message.getTimestamp() - 10 * 1000L); // 10 seconds extra time allowed

   if (treePeriod < messagePeriod || m_length >= m_configManager.getMaxMessageLength()) {
      m_validator.truncateAndFlush(this, message.getTimestamp());
   }

   transaction.addChild(message);
   m_length++;
}

消息栈Stack和MessageTree的关系是什么??

消息栈是用来存储Transaction事务的,如果发送一个Event类型的message,首先调用context的add方法。

Add方法:判断栈是否为空,为空则说明event是一个独立的消息,没有被嵌套在transaction中,这个消息将被赋值到messageTree中,调用flush,将tree放到队列中,异步发送;如果不为空,则代表Event被嵌套在transaction中,将会调用addTransactionChild方法将event消息添加到Transaction的child中

消息发送

通过Transaction对象的complete方法完成,通过manager的end方法,调用context的end方法进行消息的发送

  1. 从栈顶弹出事务,判断是否使我们传入的事务,如果不是,则认为是被嵌套的子事务,继续弹出,直到弹出我们需要的事务为止
  2. 判断栈是否为空,如果为空,则end传入的事务就是根事务,这个时候才会调用manager.flush方法将消息树上传服务器
public boolean end(DefaultMessageManager manager, Transaction transaction) {
   if (!m_stack.isEmpty()) {
      //从栈顶弹出事务
      Transaction current = m_stack.pop();
      //如果弹出的事务不等于传入的事务
      if (transaction == current) {
         m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);
      } else {
         //直到弹出的事务等于我们传入的事务为止
         while (transaction != current && !m_stack.empty()) {
            m_validator.validate(m_stack.peek(), current);

            current = m_stack.pop();
         }
      }
      //如果为空,则任务传入的是根节点,才会调用flush方法
      if (m_stack.isEmpty()) {
         //重新初始化一颗tree
         MessageTree tree = m_tree.copy();

         m_tree.setMessageId(null);
         m_tree.setMessage(null);

         if (m_totalDurationInMicros > 0) {
            adjustForTruncatedTransaction((Transaction) tree.getMessage());
         }

         manager.flush(tree);
         return true;
      }
   }

   return false;
}

在flush方法中,会调用TcpSocketSender方法的send方法,根据tree的类型,将tree放入不同的队列中

Cat源码分析(一):Client端

总结

Cat源码分析(一):Client端

消息通过messageProducer进行创建,在创建的时候会初始化messageManager的上下文。消息会依赖messageManager进行消息的各种操作,上下文起到了至关重要的作用。创建好的event消息,调用其complete()方法将调用上下文的add方法将消息入栈或入队列,这样消息的发送就是异步的过程