Zipkin + Sleuth: Generating the Global Dependency Graph with a Scheduled Task (Part 12)
Background
The approach GitHub provides is a standalone zipkin-dependencies-xxx.jar: you launch the jar and it computes the dependency links. The catch is that the jar runs a single pass and then exits, which is unworkable in production, where the links need to be regenerated automatically every day.
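For reference, the upstream README launches the job roughly like this (the host URL is a placeholder); it processes one day of spans and then the JVM exits:
STORAGE_TYPE=elasticsearch ES_HOSTS=http://myhost:9200 java -jar zipkin-dependencies.jar
# optionally pass a specific day (yyyy-MM-dd) instead of "today", e.g.:
# java -jar zipkin-dependencies.jar 2017-09-01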
Preparation
Download the latest release source from GitHub:
https://github.com/openzipkin/zipkin-dependencies/releases/tag/release-1.9.1
In the source archive, find the elasticsearch folder zipkin-dependencies-release-1.9.1\elasticsearch\src\main\java\zipkin\dependencies\elasticsearch and copy out these two classes:
ElasticsearchDependenciesJob.java
TraceIdAndJsonToDependencyLinks.java
Then grab the log-initialization class from the main module, under zipkin-dependencies-release-1.9.1\main\src\main\java\zipkin\dependencies:
LogInitializer.java
If you copy these classes into a different package in your own project, adjust their package declarations accordingly.
pom changes
<properties>
    <spark.version>2.1.1</spark.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>commons-codec</groupId>
                    <artifactId>commons-codec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
    </dependency>
    <!-- avoids compile error: Could not access type DataFrame in package org.apache.spark.sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_${scala.binary.version}</artifactId>
        <version>6.0.0-beta2</version>
    </dependency>
    <dependency>
        <groupId>org.assertj</groupId>
        <artifactId>assertj-core</artifactId>
        <version>${assertj.version}</version>
        <scope>test</scope>
    </dependency>
    <!-- latest version at the time of writing: 1.1.3-SNAPSHOT -->
    <dependency>
        <groupId>com.enmonster.platform</groupId>
        <artifactId>monster-rocketmq-spring-boot-starter</artifactId>
        <version>1.1.3-SNAPSHOT</version>
    </dependency>
</dependencies>
Create the config class
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties("zipkin.storage.elasticsearch")
@Data
@Component
public class ElasticSearchConfig {
    // Elasticsearch cluster addresses
    private String hosts;
    // cluster name
    private String cluster;
    // index name
    private String index;
}
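These fields bind to configuration such as the following application.yml fragment (all values here are placeholders for illustration; use your own cluster settings):
zipkin:
  storage:
    elasticsearch:
      hosts: http://es-node1:9200,http://es-node2:9200  # placeholder node addresses
      cluster: elasticsearch                            # placeholder cluster name
      index: zipkin                                     # zipkin's default index prefix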
Create the scheduled task
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import javax.annotation.Resource;
// Listener and Message come from the monster-rocketmq-spring-boot-starter declared above;
// LogInitializer and ElasticsearchDependenciesJob are the classes copied from the
// zipkin-dependencies source, so import them from wherever you placed them.

public class TimedJobListener implements Listener<String> {

    @Resource
    private ElasticSearchConfig elasticSearchConfig;

    @Override
    public void onMessage(Message<String> message) throws Exception {
        Runnable logInitializer = LogInitializer.create("info"); // log level
        logInitializer.run(); // start the logging thread
        ElasticsearchDependenciesJob.builder()
                .logInitializer(logInitializer) // attach the logging thread
                // aggregate the previous day's spans ("now" minus one day, UTC+8)
                .day(LocalDateTime.now().plusDays(-1L).toInstant(ZoneOffset.of("+8")).toEpochMilli())
                .hosts(elasticSearchConfig.getHosts()) // Elasticsearch cluster addresses
                .index(elasticSearchConfig.getIndex()) // Elasticsearch index name
                .build()
                .run();
    }
}
On my side this runs on a distributed scheduling platform built from xxl-job + MQ, so the job is triggered by a message and only the code inside onMessage matters; adapt the trigger to whatever scheduling stack your own project uses, for example as sketched below.
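If you don't have an xxl-job + MQ setup, a plain Spring @Scheduled task is enough. A minimal sketch, assuming the same copied classes and config as above (the class name and cron expression are my own choices, and @EnableScheduling must be present on a configuration class):
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import javax.annotation.Resource;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class DependencyLinkJob {

    @Resource
    private ElasticSearchConfig elasticSearchConfig;

    // run at 01:00 every day, aggregating yesterday's spans into dependency links
    @Scheduled(cron = "0 0 1 * * ?")
    public void aggregateYesterday() {
        Runnable logInitializer = LogInitializer.create("info");
        logInitializer.run();
        ElasticsearchDependenciesJob.builder()
                .logInitializer(logInitializer)
                .day(LocalDateTime.now().plusDays(-1L).toInstant(ZoneOffset.of("+8")).toEpochMilli())
                .hosts(elasticSearchConfig.getHosts())
                .index(elasticSearchConfig.getIndex())
                .build()
                .run();
    }
}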