Why does multiprocessing.pool.map raise PicklingError (Encoding)?
Problem description:
Why does the code below work when threads are used, but raise an exception when multiprocessing is used?
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadsPool
import urllib2

urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.python.org/doc/',
    'http://www.python.org/download/']

def use_threads():
    pool = ThreadsPool(4)
    results = pool.map(urllib2.urlopen, urls)
    pool.close()
    pool.join()
    print [len(x.read()) for x in results]

def use_procs():
    p_pool = Pool(4)
    p_results = p_pool.map(urllib2.urlopen, urls)
    p_pool.close()
    p_pool.join()
    print 'using procs instead of threads'
    print [len(x.read()) for x in p_results]

if __name__ == '__main__':
    use_procs()
The exception is:
Traceback (most recent call last):
  File "pools.py", line 39, in <module>
    use_procs()
  File "pools.py", line 31, in use_procs
    p_results = p_pool.map(urllib2.urlopen, urls)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 250, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 554, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '[<addinfourl at 35286624 whose fp = <socket._fileobject object at 0x2198ad0>>]'. Reason: 'PicklingError("Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed",)'
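I know there are differences in how processes and threads communicate with each other. Why does pickling the website content fail? How can I set the encoding to solve this?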
Answer
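The problem is not an encoding error. It is a pickling error: the result returned by urllib2.urlopen() is an unpicklable object (an _ssl._SSLSocket according to the error message I got when running your code, which differs slightly from the reason shown in your question). To fix this, confine the returned object to the worker process by reading the data right after the URL is opened, as shown below. Note that this may mean more data has to be passed between processes.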
# Added.
def get_data(url):
    soc = urllib2.urlopen(url)
    return soc.read()

def use_procs():
    p_pool = Pool(4)
    # p_results = p_pool.map(urllib2.urlopen, urls)
    p_results = p_pool.map(get_data, urls)
    p_pool.close()
    p_pool.join()
    print 'using procs instead of threads'
    # print [len(x.read()) for x in results]
    print [len(x) for x in p_results]
Output:
using procs instead of threads
[49062, 41616, 40086, 101224]
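To see why the original version fails, it can help to check directly whether a value survives pickling, since a process pool has to pickle each result before sending it back to the parent. The following is a minimal sketch, assuming Python 3 (the code above targets Python 2); the helper name is_picklable is hypothetical, and a bare socket stands in for the response object that wraps one.

```python
import pickle
import socket


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds (hypothetical helper)."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:  # PicklingError, TypeError, AttributeError, ...
        return False


# A bare, unconnected socket stands in for the response object
# returned by urlopen(), which holds a socket internally.
sock = socket.socket()
try:
    socket_ok = is_picklable(sock)
finally:
    sock.close()

# The bytes returned by read() are plain data and pickle without trouble.
bytes_ok = is_picklable(b"<html>...</html>")

print(socket_ok, bytes_ok)
```

Running a check like this on whatever the worker function returns is a quick way to tell, before involving a pool at all, whether the return value can cross a process boundary.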
Answer
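As I already mentioned, the error is raised because you are trying to pass a socket object between processes. You have to change the script logic to something like this: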
from multiprocessing.pool import Pool
from multiprocessing.pool import ThreadPool
import urllib2

urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.python.org/doc/',
    'http://www.python.org/download/'
]

def worker(url):
    return urllib2.urlopen(url).read()  # string returned

def use_threads():
    pool = ThreadPool(4)
    results = pool.map(worker, urls)
    pool.close()
    pool.join()
    print([len(x) for x in results])

def use_procs():
    p_pool = Pool(4)
    p_results = p_pool.map(worker, urls)
    p_pool.close()
    p_pool.join()
    print('using procs instead of threads')
    print([len(x) for x in p_results])

if __name__ == '__main__':
    use_procs()
By the way, you can write a pool factory and pick the pool from it, instead of duplicating the pool code in use_threads and use_procs:
from multiprocessing.pool import Pool
from multiprocessing.pool import ThreadPool
import urllib2

urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.python.org/doc/',
    'http://www.python.org/download/'
]

def worker(url):
    return urllib2.urlopen(url).read()

def pool_factory(key, n):
    if key == 'proc':
        print('using procs instead of threads')
        return Pool(n)
    else:
        return ThreadPool(n)

def main():
    pool = pool_factory('proc', 4)  # change `proc` to anything for using ThreadPool
    results = pool.map(worker, urls)
    pool.close()
    pool.join()
    print([len(x) for x in results])

if __name__ == '__main__':
    main()
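If only the lengths are needed, the worker can compute them itself so that a small tuple, rather than the whole page body, is pickled and sent back. This is a rough sketch under a few assumptions: it uses Python 3, where urllib.request.urlopen takes the place of urllib2.urlopen, and the names fetch_length and fetch_lengths are hypothetical, not part of the code above.

```python
from multiprocessing.pool import ThreadPool
from urllib.request import urlopen  # Python 3 counterpart of urllib2.urlopen


def fetch_length(url):
    """Open url, read the body inside the worker, return only picklable data."""
    response = urlopen(url)
    try:
        # The response object never leaves the worker; only a
        # (str, int) tuple is returned to the caller.
        return url, len(response.read())
    finally:
        response.close()


def fetch_lengths(urls, pool_cls=ThreadPool, n=4):
    """Map fetch_length over urls using the given pool class (hypothetical helper)."""
    pool = pool_cls(n)
    try:
        return pool.map(fetch_length, urls)
    finally:
        pool.close()
        pool.join()
```

Passing pool_cls=multiprocessing.Pool should behave the same way, because fetch_length is a module-level function and its return value is plain data; as in the scripts above, that call belongs under an if __name__ == '__main__': guard.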
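Thanks for your input. You are right about returning a string. I did not create a factory method because this code is only for practice and is not used in other code :-) – Vinny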
The error occurs because you are trying to serialize a socket object, which is not possible –
Any idea what function I should pass to map to get the desired output? (one that performs the read on the object) – Vinny