Why does multiprocessing.pool.map raise a PicklingError (Encoding)?

Problem description:

Why does the code below run when threads are used, but raise an exception when multiprocessing is used?

from multiprocessing import Pool 
from multiprocessing.dummy import Pool as ThreadsPool 
import urllib2 

urls = [ 
    'http://www.python.org', 
    'http://www.python.org/about/', 
    'http://www.python.org/doc/', 
    'http://www.python.org/download/'] 

def use_threads(): 

    pool = ThreadsPool(4) 
    results = pool.map(urllib2.urlopen, urls) 
    pool.close() 
    pool.join() 

    print [len(x.read()) for x in results] 

def use_procs(): 

    p_pool = Pool(4) 
    p_results = p_pool.map(urllib2.urlopen, urls) 
    p_pool.close() 
    p_pool.join() 

    print 'using procs instead of threads' 
    print [len(x.read()) for x in p_results] 

if __name__ == '__main__': 
    use_procs() 

The exception raised is:

Traceback (most recent call last): 
    File "pools.py", line 39, in <module> 
    use_procs() 
    File "pools.py", line 31, in use_procs 
    p_results = p_pool.map(urllib2.urlopen, urls) 
    File "/usr/lib64/python2.7/multiprocessing/pool.py", line 250, in map 
    return self.map_async(func, iterable, chunksize).get() 
    File "/usr/lib64/python2.7/multiprocessing/pool.py", line 554, in get 
    raise self._value 
multiprocessing.pool.MaybeEncodingError: Error sending result: '[<addinfourl at 35286624 whose fp = <socket._fileobject object at 0x2198ad0>>]'. Reason: 'PicklingError("Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed",)' 

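I know there are differences in how processes and threads communicate with each other. Why does pickling the website content fail? How can I set the encoding to fix this?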

+1

That error occurs because you are trying to serialize a socket object, which is not possible.

+0

Any idea what kind of function I should map over the URLs to get the desired output (the result of calling read() on each object)? – Vinny

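The problem isn't an encoding error. It is a pickling error: the result returned by urllib2.urlopen() is an object that cannot be pickled (when I ran the code, the error message named a slightly different culprit than yours, an _ssl._SSLSocket). To work around this, confine the returned object to the worker process by reading the data right after opening the URL, as shown below. This may mean that more data has to be passed between processes.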

# Added. 
def get_data(url): 

    soc = urllib2.urlopen(url) 
    return soc.read() 

def use_procs(): 

    p_pool = Pool(4) 
    # p_results = p_pool.map(urllib2.urlopen, urls)
    p_results = p_pool.map(get_data, urls) 
    p_pool.close() 
    p_pool.join() 

    print 'using procs instead of threads' 
    # print [len(x.read()) for x in p_results]
    print [len(x) for x in p_results] 

Output:

using procs instead of threads 
[49062, 41616, 40086, 101224] 
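
To see why the original version fails, it can help to try pickling the two kinds of values directly. The following is a minimal sketch, assuming Python 3 (the code in this thread is Python 2); the helper name is_picklable is made up for illustration. A process pool has to pickle every result before sending it back to the parent, so anything that fails this check cannot be returned from a worker.

```python
import pickle
import socket


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        # Different object types signal the failure with different exceptions.
        return False


if __name__ == "__main__":
    sock = socket.socket()  # an unconnected socket is enough for this check
    try:
        print("socket picklable:", is_picklable(sock))
        print("bytes picklable:", is_picklable(b"<html>...</html>"))
    finally:
        sock.close()
```

A thread pool never runs into this because threads share memory with the parent, so results are handed over without being serialized.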

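As I already mentioned, the error is raised because you are trying to pass a socket object between processes. You have to change the script logic to something like this: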

from multiprocessing.pool import Pool 
from multiprocessing.pool import ThreadPool 
import urllib2 

urls = [ 
    'http://www.python.org', 
    'http://www.python.org/about/', 
    'http://www.python.org/doc/', 
    'http://www.python.org/download/' 
] 

def worker(url): 
    return urllib2.urlopen(url).read() # string returned 

def use_threads(): 

    pool = ThreadPool(4) 
    results = pool.map(worker, urls) 
    pool.close() 
    pool.join() 

    print([len(x) for x in results]) 

def use_procs(): 

    p_pool = Pool(4) 
    p_results = p_pool.map(worker, urls) 
    p_pool.close() 
    p_pool.join() 

    print('using procs instead of threads') 
    print([len(x) for x in p_results]) 

if __name__ == '__main__': 
    use_procs() 

By the way: you can write a pool factory and pick the pool from it, instead of duplicating the pool code in use_threads and use_procs:

from multiprocessing.pool import Pool 
from multiprocessing.pool import ThreadPool 
import urllib2 

urls = [ 
    'http://www.python.org', 
    'http://www.python.org/about/', 
    'http://www.python.org/doc/', 
    'http://www.python.org/download/' 
] 


def worker(url): 
    return urllib2.urlopen(url).read() 


def pool_factory(key, n): 
    if key == 'proc':
        print('using procs instead of threads')
        return Pool(n)
    else:
        return ThreadPool(n)


def main(): 

    pool = pool_factory('proc', 4) # change `proc` to anything for using ThreadPool 
    results = pool.map(worker, urls) 
    pool.close() 
    pool.join() 
    print([len(x) for x in results]) 


if __name__ == '__main__': 
    main() 
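
For readers on Python 3, where urllib2 was merged into urllib.request, the same idea might look roughly like the sketch below. It is not the code from the answers above: the names fetch_length and fetch_lengths and the pool_cls parameter are made up for illustration. It also returns only the length from each worker, which is one way to limit how much data crosses the process boundary when the page body itself is not needed.

```python
from multiprocessing.pool import Pool, ThreadPool
from urllib.request import urlopen


def fetch_length(url):
    """Open url inside the worker and return only the body length (an int)."""
    # The response object stays in the worker; only a picklable int goes back.
    with urlopen(url) as response:
        return len(response.read())


def fetch_lengths(urls, pool_cls=Pool, n=4):
    """Map fetch_length over urls using the given pool class."""
    # pool_cls may be Pool (processes) or ThreadPool (threads).
    with pool_cls(n) as pool:
        return pool.map(fetch_length, urls)


# Example call (assumes network access). With Pool, keep the call under the
# main guard so that child processes can import this module safely:
#
# if __name__ == "__main__":
#     print(fetch_lengths(["http://www.python.org"], pool_cls=Pool))
```

Passing the pool class as an argument plays the same role as pool_factory above, so the mapping code does not need to be written twice.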
+0

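Thanks for your input. You are right about returning a string. I didn't create a factory method because this code is only for practice and won't be used elsewhere :-) – Vinny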