蟒蛇 - >多模块

问题描述:

这里就是我试图完成 -蟒蛇 - >多模块

  1. 我有一百万个文件,我需要解析&追加解析的内容到一个文件中。
  2. 由于单个过程需要很长时间,因此此选项不可用。
  3. 在Python中不使用线程,因为它实质上涉及运行单个进程(由于GIL)。
  4. 因此使用多处理模块。即产生4个子过程,以利用所有原始核心功率:)

到目前为止好,现在我需要一个共享对象,所有的子进程都可以访问。我正在使用多处理模块中的队列。此外,所有子流程都需要将其输出写入单个文件。一个潜在的地方使用锁我猜。有了这个设置,当我运行时,我没有得到任何错误(所以父进程看起来很好),它只是停滞不前。当我按ctrl-C时,我看到一个回溯(每个子进程一个)。也没有输出写入输出文件。这里的代码(注意,没有多进程运行良好) -

import os 
import glob 
from multiprocessing import Process, Queue, Pool 

data_file = open('out.txt', 'w+') 

def worker(task_queue): 
    for file in iter(task_queue.get, 'STOP'): 
     data = mine_imdb_page(os.path.join(DATA_DIR, file)) 
     if data: 
      data_file.write(repr(data)+'\n') 
    return 

def main(): 
    task_queue = Queue() 
    for file in glob.glob('*.csv'): 
     task_queue.put(file) 
    task_queue.put('STOP') # so that worker processes know when to stop 

    # this is the block of code that needs correction. 
    if multi_process: 
     # One way to spawn 4 processes 
     # pool = Pool(processes=4) #Start worker processes 
     # res = pool.apply_async(worker, [task_queue, data_file]) 

     # But I chose to do it like this for now. 
     for i in range(4): 
      proc = Process(target=worker, args=[task_queue]) 
      proc.start() 
    else: # single process mode is working fine! 
     worker(task_queue) 
    data_file.close() 
    return 

我做错了什么?我也尝试在生成时将打开的file_object传递给每个进程。但没有效果。例如 - Process(target=worker, args=[task_queue, data_file])。但是这并没有改变任何东西。我觉得由于某种原因,子进程无法写入文件。要么file_object的实例没有得到复制(在产卵时)或其他一些怪癖......任何人有一个想法?

另外:也有任何方法来保持一个持久的mysql_connection打开&传递给子进程吗?所以我打开我的父进程中的mysql连接&打开的连接应该可以访问我所有的子进程。基本上这相当于python中的shared_memory。这里的任何想法?

+0

如果你不写入文件,但做一个打印,它的工作呢? (在Linux上我会做python script.py> out.dat以防止屏幕泛滥)。 – extraneon 2010-08-27 17:37:49

+1

我认为proc.start是非阻塞的,因此您可能应该等待某个地方让流程有机会在做datafile.close() – extraneon 2010-08-27 17:42:30

+0

data_file.close()的最后完成一些工作。它应该在这里有效吗?另外打印工作正常。当我使用print时,我在屏幕上看到输出...但是我想使用文件。帮帮我! 也有没有办法保持一个持久的mysql_connection打开并传递给子进程? – 2010-08-27 17:47:00

尽管与Eric的讨论富有成效,但后来我发现了一个更好的方法。在多处理模块中有一个名为“Pool”的方法,它非常适合我的需求。

它优化自己的核心数量我的系统。即只有当许多过程产生时才会产生。的核心。当然这是可定制的。所以这是代码。稍后可以帮助他人 -

from multiprocessing import Pool 

def main(): 
    po = Pool() 
    for file in glob.glob('*.csv'): 
     filepath = os.path.join(DATA_DIR, file) 
     po.apply_async(mine_page, (filepath,), callback=save_data) 
    po.close() 
    po.join() 
    file_ptr.close() 

def mine_page(filepath): 
    #do whatever it is that you want to do in a separate process. 
    return data 

def save_data(data): 
    #data is a object. Store it in a file, mysql or... 
    return 

仍然会经历这个庞大的模块。不确定save_data()是由父进程执行还是由生成的子进程使用此函数。如果是保存的孩子,在某些情况下可能会导致并发问题。如果任何人有使用这个模块的经验,你在这里感谢更多的知识...

为多的文档表明进程之间的共享状态的几种方法:

http://docs.python.org/dev/library/multiprocessing.html#sharing-state-between-processes

我敢肯定,每个进程都有一个新的解释,然后目标(功能)和args被装进去。在这种情况下,脚本中的全局命名空间将被绑定到你的工作函数,所以data_file将会在那里。但是,我不确定文件描述符在复制时发生了什么。你有没有尝试传递文件对象作为参数之一?

另一种方法是通过另一个队列来保存工人的结果。工作人员put的结果和主码get的结果并将其写入文件。

+0

是啊!我可以做到。我可以有另一个Queue,它们就像进程写入的out_queue一样。由于父进程有权访问它可以继续读取该队列并写入文件。这可以工作! 此外,我尝试传递文件对象作为参数之一。它似乎没有工作。线程不写入文件。 也埃里克,任何想法如何传递一个持久的mysql连接到子进程? – 2010-08-27 18:13:49

+0

@Srikar,希望有所帮助。至于mysql连接,我不确定那个。我会说你最好每个过程都有独立的连接。即使你可以建立连接,我也不确定它是如何“线程安全”的。如果你真的必须分享一个,那么你可能不得不做一些奇怪的事情。然后再次,您可以在Queue中代理连接的查询/响应机制。然后主进程(或单独的mysql处理程序进程)从队列中获取查询,运行它们,并将结果返回......或类似的东西。 – 2010-08-27 18:39:34