MySQL备份调度器的实现

    对于MySQL方向的调度需求考虑了好久,总是感觉不够优雅,不够灵活。从我的感觉来看,如果设置成为crontab,其实管理起来是比较臃肿的。 

    当然这些可以通过批量管理来实现,或者说是改进,那么接下来的问题便是管理层面的一个问题了,如果管理这些任务,如果2点触发不够合适,那么几点触发合适,如果有100个任务需要分配和管理,调度就需要出手了,在调度层面的实现,如果暴露给系统层面来处理,其实它是很无助的,因为它也不知道该怎么合理的划分,如果按照个数显然是不合理的,有的数据库大,有的小,如果按照个数来划分,其实意义不大,从本质上没有解决切分的核心。

    所以调度的逻辑从某种程度来说,需要自己来定制,celery可以实现调度的任务处理,但是它不知道任务间的处理逻辑。所以这个思路来落实,那么我们就需要些一个简单的调度算法。

    当然开始说调度算法是枯燥的,我们都看不到结果,还要说一堆的逻辑,所以我们先看一个初步的效果,我们可以*的指定并行度,然后会基于这个配置信息来进行计算,目前的维度是基于备份时间,备份文件大小和备份时间是成比例的关系,那么我们可以基于一个维度来进行计算。

MySQL备份调度器的实现

分组之后,就会在每个备份任务配置后面打一个标签,它是属于哪个组的。一目了然,当然后续要做更多的改进,比如对时间进行细粒度的调度,其实做了分组,再做这个改进,也是行之有道。

一个初步的调度的结果如下,如果看到上面的图,不大确定备份任务是否足够平均,可以简单看一下下面的一个概览图,这是划分为了6个组。

0 [1892, 146, 54, 31, 29, 25]

1 [1500, 218, 204, 114, 59, 30, 27, 25]

2 [1171, 593, 190, 92, 45, 32, 29, 0]

3 [809, 607, 419, 167, 81, 43, 26]

4 [801, 696, 319, 176, 90, 45, 26]

5 [731, 718, 397, 167, 76, 33, 29, 25]

代码层面的实现如下,其实对于单纯的调度算法来说,需要做好数据的输入和输出转换,这个地方也是本次集成的重点和难点。

  大体来说分为4个步骤:

    1.定义一个数据结构来存放,备份ID和备份时间,另外一个数组存放备份时间的集合

     2.基于备份集合来进行分组,就是我们的调度算法

     3.基于分组后的结果进行分组匹配

     4.对输出结果进行重新的过滤和修改

random_lrandom_list = []
init_dict = {}
for tmp in mysql_xtracbackup_rs:
duration_seconds =(tmp.backup_finishtime-tmp.backup_starttime).seconds
           random_list.append(duration_seconds)
init_dict[tmp.backup_id] = duration_seconds
print(init_dict)

       random_list.sort(reverse=1)
print(random_list)

GRP_NO = int(grp_no)
print("grp:",GRP_NO)
array_group = [[0] * 1 for i in range(GRP_NO)]

       sum_value_array = []
for i in range(GRP_NO):
sum_value_array.append(i)

       for i in random_list[1:ARRAY_SIZE + 1]:
print(i)

array_sum_group = [0] * GRP_NO

for index, val in enumerate(random_list[1:GRP_NO + 1]):
           array_group[index][0] = val
array_sum_group[index] = val

       print(random_list)
for index, val in enumerate(random_list[GRP_NO + 1:]):
           min_group_no = array_sum_group.index(min(array_sum_group))
array_group[min_group_no].append(val)
array_sum_group[min_group_no] += val

print("array_group", array_group)
print("array_sum_group", array_sum_group)
return_dict["array_group"] = array_group
return_dict["array_sum_group"] = array_sum_group


for i in range(len(array_group)):
print i,array_group[i]

new_dict =[]
for i in range(len(array_group)):
           for j in range(len(array_group[i])):
               for key in init_dict:
if init_dict[key] == array_group[i][j]:
print key,init_dict[key],array_group[i][j]
                       array_group[i][j]=key

for i in range(len(array_group)):
print i,array_group[i]
for j in range(len(array_group[i])):
MySQL_Xtracbackup_RS.objects.filter(backup_id=array_group[i][j]).update(
service_level=i+1,
                   )
retureturn_dict["mysql_xtracbackup_rs"] = MySQL_Xtracbackup_RS.objects.filter(backup_starttime__range=(starttime, endtime))