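Shared array of objects using multiprocessing in Python
Question:
I want to create a shared array of objects (instances of a class) across the different processes of a Python program. Each of these objects will be modified by the program. For this I am using multiprocessing: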
import ctypes
import multiprocessing
import numpy as np
sh = multiprocessing.RawArray(ctypes.py_object, 10)
f = np.frombuffer(sh, dtype=object)
I get the following error:
Traceback (most recent call last):
  File "<pyshell#14>", line 1, in <module>
    f = np.frombuffer(sh, dtype=object)
ValueError: cannot create an OBJECT array from memory buffer
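So my questions are: first, is this the right way to approach this problem in general? Second, what is my mistake in the code above? Thanks in advance.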
Answer
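ctypes.py_object represents a PyObject * in C. That is a pointer to a struct which represents a Python object; the struct lives in the process's private memory and itself contains more pointers. Another process cannot access that memory, so trying to share such pointers between processes is useless: the addresses mean nothing in the other process. See this answer by Alex Martelli.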
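Since only the pointers, not the objects, would end up in shared memory, one workaround is to put the object's data itself into shared memory. This is a minimal sketch under the assumption that each object's state fits in fixed-size C fields: the object is described as a ctypes.Structure, and multiprocessing.Array, which accepts a ctypes type, holds an array of them. The Particle class, its fields, and the make_shared_particles and bump helpers are hypothetical names for illustration.

```python
import ctypes
import multiprocessing as mp


class Particle(ctypes.Structure):
    """Hypothetical record: only fixed-size C fields, no Python object pointers."""
    _fields_ = [("x", ctypes.c_double),
                ("y", ctypes.c_double),
                ("hits", ctypes.c_int)]


def make_shared_particles(n):
    """Allocate n zero-initialised Particle records in shared memory, guarded by a lock."""
    return mp.Array(Particle, n)


def bump(shared, index, dx):
    """Move record `index` by dx and count the hit; safe to call from any process."""
    with shared.get_lock():              # the RLock that mp.Array created
        rec = shared.get_obj()[index]    # a view into the shared buffer, not a copy
        rec.x += dx
        rec.hits += 1


if __name__ == "__main__":
    particles = make_shared_particles(4)
    # 8 workers, two per record, each modifying the same shared memory
    workers = [mp.Process(target=bump, args=(particles, i % 4, 1.5)) for i in range(8)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    for p in particles.get_obj():
        print(p.x, p.hits)               # each of the 4 records prints: 3.0 2
```

The trade-off is that arbitrary Python attributes (lists, dicts, other objects) cannot live in such a record; for those, passing copies through a Queue, as in the next answer, is simpler.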
Answer
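You probably want to use multiprocessing.Queue, onto which you can put any picklable object without worrying about its type. It is also thread-safe and process-safe.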
Below is a simple Queue example for a simplified producer-consumer problem (original source; the pizzas are my own addition).
from multiprocessing import Process, Queue

class Pizza(object):
    def __init__(self, pizza_num):
        self.pizza_num = pizza_num
        self.num_slices = 8

sentinel = "NO PIZZA"

def producer(initial_num_pizzas, total_num_pizzas, q):
    """Cooks the remaining Pizzas, puts them on the Queue, then puts the sentinel."""
    print("Producer: I am cooking %s Pizzas and putting them on the Queue!" % (total_num_pizzas - initial_num_pizzas))
    ## Start after the precooked Pizzas; q.qsize() is only approximate (and raises NotImplementedError on macOS)
    for i in range(initial_num_pizzas, total_num_pizzas):
        print("Producer: Behold, for I have cooked Pizza no. %s" % i)
        q.put(Pizza(i))
    q.put(sentinel)

def consumer(q):
    """Consumes some Pizza. In this case, all it does is set the number of slices to 0."""
    while True:
        pizza = q.get()
        ## Check for the sentinel first: the string "NO PIZZA" has no num_slices attribute
        if pizza == sentinel:
            break
        print("Consumer: Pizza no. %s was found! It has %s slices, yum!" % (pizza.pizza_num, pizza.num_slices))
        pizza.num_slices = 0  ## Only this process's unpickled copy is changed

if __name__ == '__main__':
    q = Queue()
    total_num_pizzas = 10
    initial_num_pizzas = 4
    ## Let's add some Pizzas beforehand:
    for i in range(0, initial_num_pizzas):
        q.put(Pizza(i))
    print("Main: I have precooked %s Pizzas." % initial_num_pizzas)
    producer_proc = Process(target=producer, args=(initial_num_pizzas, total_num_pizzas, q))
    consumer_proc = Process(target=consumer, args=(q,))
    producer_proc.start()
    consumer_proc.start()
    q.close()  ## Main adds no more Pizzas (the producer process still adds its own)
    q.join_thread()
    producer_proc.join()
    consumer_proc.join()
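Below is some sample output. If you run it, the producer and consumer print statements may be interleaved differently, because parallel processes execute nondeterministically.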
Main: I have precooked 4 Pizzas.
Producer: I am cooking 6 Pizzas and putting them on the Queue!
Producer: Behold, for I have cooked Pizza no. 4
Producer: Behold, for I have cooked Pizza no. 5
Producer: Behold, for I have cooked Pizza no. 6
Producer: Behold, for I have cooked Pizza no. 7
Consumer: Pizza no. 0 was found! It has 8 slices, yum!
Consumer: Pizza no. 1 was found! It has 8 slices, yum!
Producer: Behold, for I have cooked Pizza no. 8
Consumer: Pizza no. 2 was found! It has 8 slices, yum!
Producer: Behold, for I have cooked Pizza no. 9
Consumer: Pizza no. 3 was found! It has 8 slices, yum!
Consumer: Pizza no. 4 was found! It has 8 slices, yum!
Consumer: Pizza no. 5 was found! It has 8 slices, yum!
Consumer: Pizza no. 6 was found! It has 8 slices, yum!
Consumer: Pizza no. 7 was found! It has 8 slices, yum!
Consumer: Pizza no. 8 was found! It has 8 slices, yum!
Consumer: Pizza no. 9 was found! It has 8 slices, yum!
Note that you should use Sentinels to mark the end of your Queue. I used "NO PIZZA" here, but it can be anything.
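Two details matter once there is more than one consumer, or when the parent needs the modified objects back: each consumer needs its own sentinel, and a Queue pickles every object, so a consumer modifies a copy rather than the producer's instance. A minimal sketch, assuming the same Pizza class; SENTINEL, eat_all and num_consumers are hypothetical names, and sending the modified copies back on a second results queue is just one possible design:

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical end marker; any picklable value works, like "NO PIZZA"


class Pizza(object):
    def __init__(self, pizza_num):
        self.pizza_num = pizza_num
        self.num_slices = 8


def consumer(tasks, results):
    """Eat Pizzas until a sentinel arrives, sending each modified copy back."""
    while True:
        pizza = tasks.get()
        if pizza is SENTINEL:        # None survives pickling as the same singleton
            break
        pizza.num_slices = 0         # changes this process's unpickled copy only
        results.put(pizza)


def eat_all(pizzas, num_consumers=3):
    """Hypothetical helper: fan Pizzas out to several consumers, collect the copies."""
    tasks, results = Queue(), Queue()
    workers = [Process(target=consumer, args=(tasks, results))
               for _ in range(num_consumers)]
    for w in workers:
        w.start()
    for p in pizzas:
        tasks.put(p)
    for _ in workers:
        tasks.put(SENTINEL)          # one sentinel per consumer, or some never exit
    # Drain results before join(): a child that still has buffered items on
    # `results` does not terminate until they are fed into the pipe.
    eaten = [results.get() for _ in pizzas]
    for w in workers:
        w.join()
    return sorted(eaten, key=lambda p: p.pizza_num)  # arrival order varies


if __name__ == "__main__":
    originals = [Pizza(i) for i in range(6)]
    eaten = eat_all(originals)
    print([p.num_slices for p in eaten])      # [0, 0, 0, 0, 0, 0]
    print([p.num_slices for p in originals])  # [8, 8, 8, 8, 8, 8]: parent's objects untouched
```

If the parent must see the changes, it has to use the returned copies; the original objects are never touched by the consumers.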
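Actually, Array and RawArray (in multiprocessing) are a way to create arrays in shared memory that several processes can access. I can write the code above for, say, a shared array of integers, but I have trouble doing it for an array of objects. For example, if I write:
>>> import ctypes
>>> import multiprocessing as mp
>>> import numpy as np
>>> sh = mp.RawArray(ctypes.c_int, 10)
>>> f = np.frombuffer(sh, dtype=np.intc)
>>> f[0] = 1
>>> sh[0]
1
it works perfectly! – user823743 2014-11-07 15:13:53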
@user823743 Correct, but you cannot do the same thing with pointers. – 2014-11-07 18:04:19
You are right; fortunately, I found another way that avoids multiprocessing. Thanks. – user823743 2014-11-08 19:24:27
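For plain numbers, the RawArray plus np.frombuffer approach from the comments does work across processes. A minimal sketch, assuming the NumPy dtype is chosen to match the ctypes element type (np.intc for ctypes.c_int; dtype=int is often 64-bit and would misread a c_int buffer). make_view and fill are hypothetical helpers for illustration:

```python
import ctypes
import multiprocessing as mp

import numpy as np


def make_view(n):
    """Allocate n C ints in shared memory and wrap them in a NumPy array (no copy)."""
    shared = mp.RawArray(ctypes.c_int, n)
    return shared, np.frombuffer(shared, dtype=np.intc)  # np.intc matches ctypes.c_int


def fill(shared, value):
    """Wrap the same shared buffer in a NumPy view and overwrite every element."""
    view = np.frombuffer(shared, dtype=np.intc)
    view[:] = value


if __name__ == "__main__":
    sh, f = make_view(10)
    f[0] = 1
    print(sh[0])        # 1: the NumPy array and the RawArray share memory
    p = mp.Process(target=fill, args=(sh, 7))
    p.start()
    p.join()
    print(f.tolist())   # ten 7s: the child's writes are visible in the parent
```

RawArray has no lock, so concurrent writers to the same elements would need their own synchronisation (or multiprocessing.Array with get_lock()).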