源码分析ElasticJob 选举及分片
源码分析ElasticJob 选举及分片
elastic-job是将分片项分配至各个运行中的作业服务器,需自行处理实现分片项和数据的关系,分片策略包括(平均分配算法策略,作业名哈希值奇偶数算法策略,轮转分片策略。同时也提供了自定义分片策略的接口)
elastic-job 的分片逻辑是用一台主节点服务器触发,服务器之间需要进行选主,选主的过程就是通过注册中心zookeeper来实现
选主实现
调用源码发现 其调用LeaderService的electLeader方法
public void electLeader() {
jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
}
主节点选举时采用先获取先得到方式,其它节点会在主节点选举时,统一等待,选主分布式锁节点目录 /{namespace}/{jobName}/leader/election/latch
LeaderNode.LATCH 节点选举
LeaderElectionExecutionCallback回调会在拿到leader锁的节点触发
接下来分析 jobNodeStorage.executeInLeader()方法
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
//开始
latch.start();
//等待锁释放
latch.await();
callback.execute();
} catch (final Exception ex) {
handleException(ex);
}
}
latchNode: 分布式锁使用的作业节点名称
callback: 执行后回调函数
同时有很多服务器都会执行选主,为了保证有序性,拿到主节点服务器会执行await 释放锁,执行callback逻辑,判断masterNode.INSTANCE是否被设置 在此期间,其它服务器同样也会获取锁之后执行callback,主节点服务器设置状态后,其它服务器不参与处理
获取分布式锁之后,会判断{namespace}/{jobname}/leader/election/in-stance节点是否存在,不存在则创建临时节点并存储内容
问题思考
一次完整选举成功之后,如果主服务器在此期间出现宕机,ElasticJob 如何剔除节点,让从服务器升级主节点并接管?
- 监控主服务器宕机后,将其节点剔除
- 再触发一次选主过程
接下来重点分析事件监控
public void start() {
addDataListener(new LeaderElectionJobListener());
addDataListener(new LeaderAbdicationJobListener());
}
选主管理器在启动时会添加两个事件监听器
- LeaderElectionJobListener 当主节点宕机后触发重新选主监听器
- LeaderAbdicationJobListener 主节点退位监听器。当通过配置方式在线设置主节点状态为disabled时需要删除主节点信息从而再次**选主事件
LeaderElectionJobListener 监听节点master.INSTANCE,对应路径为{namespace}/{jobname}/leader/election/in-stance,如果主节点宕机,zookeeper心跳检测到后,会删除此节点及其内容,并且服务器监控到事件后会触发调用
LeaderAbdicationJobListener 如果通过后台管理更改主节点状态为 ‘disabled’ ,此时会触发将节点信息删除,并且触发一次选主。
分片实现
接下来分析 ShardingService.setReshardingFlag()
public void setReshardingFlag() {
jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}
job初始化时调用并设置分片标记,如分片状态存在,等待主节点分片完成后再执行对应分片逻辑
分片逻辑 :ShardingService.shardingIfNecessary
/**
* 如果需要分片且当前节点为主节点, 则作业分片
*
* 如果当前无可用节点则不分片
*/
public void shardingIfNecessary() {
//获取可分片的作业运行实例
List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
//判断是否需要进行分片
if (!isNeedSharding() || availableJobInstances.isEmpty()) {
return;
}
//判断是否是主节点,不是主节点进入if逻辑,同时内部会等待主节点选举完成
if (!leaderService.isLeaderUntilBlock()) {
//等待主节点分片完成
blockUntilShardingCompleted();
return;
}
//等待当前任务的其他分片运行结束
waitingOtherShardingItemCompleted();
LiteJobConfiguration liteJobConfig = configService.load(false);
int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
log.debug("Job '{}' sharding begin.", jobName);
//设置分片运行标志
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
//清空之前的sharding节点
resetShardingInfo(shardingTotalCount);
//获取配置的分片策略类,不存在使用默认的AverageAllocationJobShardingStrategy
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
//执行分片逻辑
jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
}
执行job时候会触发分片,查看分片标志是否重新被设置,没设置触发分片逻辑
执行分片具体方法sharding
public interface JobShardingStrategy {
/**
* 作业分片.
*
* @param jobInstances 所有参与分片的单元列表
* @param jobName 作业名称
* @param shardingTotalCount 分片总数
* @return 分片结果
*/
Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}
public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
@Override
public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
if (jobInstances.isEmpty()) {
return Collections.emptyMap();
}
Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
addAliquant(jobInstances, shardingTotalCount, result);
return result;
}
private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
//分片数/作业实例数,得到每个作业实例最少得到的分片数
int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
int count = 0;
for (JobInstance each : shardingUnits) {
List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
//给每个作业实例分配最少得到的分片数
for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
shardingItems.add(i);
}
result.put(each, shardingItems);
count++;
}
return result;
}
private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
//分片数%作业实例数,得到多余的分片数
int aliquant = shardingTotalCount % shardingUnits.size();
int count = 0;
for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
//把多余的分片数,按顺序分给作业服务器
if (count < aliquant) {
entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
}
count++;
}
}
}
通过JobShardingStrategy得到分片结果之后,通过PersistShardingInfoTransactionExecutionCallback将结果持久化到zookeeper
@RequiredArgsConstructor
class PersistShardingInfoTransactionExecutionCallback implements TransactionExecutionCallback {
private final Map<JobInstance, List<Integer>> shardingResults;
@Override
public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
for (int shardingItem : entry.getValue()) {
//设置节点/namespace/jobName/sharding/shardingItem/instance = jobinstanceid
curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()).and();
}
}
//清除分片标记
curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();
//清除分片进行中标记
curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();
}
}
本文详细描述了ElasticJob 的选举及分片实现
1.通过先获先得 获取分布式锁成为主节点,创建masterNode.INSTANCE节点并记录节点的信息,监听节点及其目录
2.主服务区宕机后,移除节点,通过事件监听触发选主
3.后台管理设置主服务器为disabled,移除节点,触发选主
4.服务器启动会触发检测分片标记,未标记触发分片逻辑
5.分片逻辑执行完后会持久化到zookeeper
作者简介:张程 技术研究
更多文章请关注微信公众号:zachary分解狮 (frankly0423)