multiprocessing.Pool执行从mysqldump到管道数据的并行子进程

问题描述:

我正在使用Python将数据从一个mysql数据库传送到另一个。下面是我一直在使用数月之久,其代码轻轻抽象的版本,这得不错:multiprocessing.Pool执行从mysqldump到管道数据的并行子进程

def copy_table(mytable): 
    raw_mysqldump = "mysqldump -h source_host -u source_user --password='secret' --lock-tables=FALSE myschema mytable" 
    raw_mysql = "mysql -h destination_host -u destination_user --password='secret' myschema" 

    mysqldump = shlex.split(raw_mysqldump) 
    mysql = shlex.split(raw_mysql) 

    ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE) 
    subprocess.check_output(mysql, stdin=ps.stdout) 
    ps.stdout.close() 
    retcode = ps.wait() 
    if retcode == 0: 
     return mytable, 1 
    else: 
     return mytable, 0 

数据的规模不断壮大,目前它需要一个小时左右来复制像30表。为了加快速度,我想利用多处理。我试图在Ubuntu服务器上执行以下代码,该服务器是t2.micro(AWS EC2)。

def copy_tables(tables): 
    with multiprocessing.Pool(processes=4) as pool: 
     params = [(arg, table) for table in sorted(tables)] 
     results = pool.starmap(copy_table, params) 
    failed_tables = [table for table, success in results if success == 0] 
    all_tables_processed = False if failed_tables else True 
    return all_tables_processed 

的问题是:几乎所有的表将复制,但总有一些遗留下来的一对夫妇的子进程将无法完成 - 他们只是挂了,我可以从监控数据库看到任何数据正在转移。这感觉就像他们不知何故与父进程断开连接,或者数据没有被正确地返回。

这是我的第一个问题,我试图做到既具体又简洁 - 在此先感谢您的帮助,请让我知道我是否可以提供更多信息。

我认为下面的代码

ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE) 
subprocess.check_output(mysql, stdin=ps.stdout) 
ps.stdout.close() 
retcode = ps.wait() 

应该

ps = subprocess.Popen(mysqldump, stdout=subprocess.PIPE) 
sps = subprocess.Popen(mysql, stdin=ps.stdout) 
retcode = ps.wait() 
ps.stdout.close() 
sps.wait() 

你不应该关闭该管道,直到mysqldump的过程中完成。 check_output被阻塞,它会挂起,直到stdin达到结尾。

+0

谢谢。我试过了,没有发现可观察到的变化。我理解stdout.close()的目的是 - 如果子进程死亡,请确保将SIGINT返回给父进程。 – speedyturkey

+1

@speedyturkey我的错误,check_output阻塞呼叫,应该是Popen。 – hailinzeng

+0

早期的结果看起来很有希望。神奇的答案,谢谢! – speedyturkey