使用Python 3.6.1在Linux/Intel Xeon上使用“fork”上下文块进行多处理?
问题描述:
问题描述
我将代码从this answer稍微调整了一下(见下文)。但是,在Linux上运行此脚本(命令行:python script_name.py
)时,它将打印所有作业的jobs running: x
,但之后似乎卡住了。但是,当我使用spawn方法(mp.set_start_method('spawn')
)时,它运行良好,并立即开始打印counter
变量的值(请参阅listener
方法)。使用Python 3.6.1在Linux/Intel Xeon上使用“fork”上下文块进行多处理?
问题
- 为什么产卵过程时,它才起作用?
- 我怎样才能调整代码,以便它与FORC工作(因为它可能更快)
代码
import io
import csv
import multiprocessing as mp
NEWLINE = '\n'
def file_searcher(file_path):
parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t')
manager = mp.Manager()
q = manager.Queue()
pool = mp.Pool(mp.cpu_count())
# put listener to work first
watcher = pool.apply_async(listener, (q,))
jobs = []
for row in parsed_file:
print('jobs running: ' + str(len(jobs) + 1))
job = pool.apply_async(worker, (row, q))
jobs.append(job)
# collect results from the workers through the pool result queue
for job in jobs:
job.get()
#now we are done, kill the listener
q.put('kill')
pool.close()
pool.join()
def worker(genome_row, q):
complete_data = []
#data processing
#ftp connection to retrieve data
#etc.
q.put(complete_data)
return complete_data
def listener(q):
'''listens for messages on the q, writes to file. '''
f = io.open('output.txt', 'w', encoding='utf-8')
counter = 0
while 1:
m = q.get()
counter +=1
print(counter)
if m == 'kill':
break
for x in m:
f.write(x + NEWLINE)
f.flush()
f.close()
if __name__ == "__main__":
file_searcher('path_to_some_tab_del_file.txt')
处理器信息
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 20
On-line CPU(s) list: 0-19
Thread(s) per core: 1
Core(s) per socket: 1
Socket(s): 20
NUMA node(s): 2
Vendor ID: GenuineIntel
CPU family: 6
Model: 45
Model name: Intel(R) Xeon(R) CPU E5-2660 v3 @ 2.60GHz
Stepping: 2
CPU MHz: 2596.501
BogoMIPS: 5193.98
Hypervisor vendor: VMware
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 25600K
NUMA node0 CPU(s): 0-19
的Linux内核版本
3.10.0-514.26.2.el7.x86_64
Python版本
Python 3.6.1 :: Continuum Analytics, Inc.
LOG
我添加的代码由@yacc的建议,这将提供以下日志:
[server scripts]$ python main_v3.py
[INFO/SyncManager-1] child process calling self.run()
[INFO/SyncManager-1] created temp directory /tmp/pymp-2a9stjh6
[INFO/SyncManager-1] manager serving at '/tmp/pymp-2a9stjh6/listener-jxwseclw'
[DEBUG/MainProcess] requesting creation of a shared 'Queue' object
[DEBUG/SyncManager-1] 'Queue' callable returned object with id '7f0842da56a0'
[DEBUG/MainProcess] INCREF '7f0842da56a0'
[DEBUG/MainProcess] created semlock with handle 139673691570176
[DEBUG/MainProcess] created semlock with handle 139673691566080
[DEBUG/MainProcess] created semlock with handle 139673691561984
[DEBUG/MainProcess] created semlock with handle 139673691557888
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-6] child process calling self.run()
[INFO/ForkPoolWorker-5] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-7] child process calling self.run()
[INFO/ForkPoolWorker-8] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-12] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-13] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-14] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-15] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-16] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-17] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-18] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-19] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-20] child process calling self.run()
jobs running: 1
jobs running: 2
jobs running: 3
jobs running: 4
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-21] child process calling self.run()
jobs running: 5
jobs running: 6
jobs running: 7
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
jobs running: 8
written to file
jobs running: 9
jobs running: 10
[DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection
[DEBUG/ForkPoolWorker-2] making connection to manager
jobs running: 11
jobs running: 12
jobs running: 13
jobs running: 14
jobs running: 15
[DEBUG/SyncManager-1] starting server thread to service 'ForkPoolWorker-2'
jobs running: 16
jobs running: 17
jobs running: 18
jobs running: 19
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'
答
不知道为什么你有这个问题,但不是这个代码是一个简单的事情,做同样的事情?
import csv
import multiprocessing as mp
NEWLINE = '\n'
def file_searcher(file_path):
with open(file_path, 'r', encoding='utf-8') as input_file,
open('output.txt', 'w', encoding='utf-8') as f,
mp.Pool() as pool:
parsed_file = csv.DictReader(input_file, delimiter='\t')
for result in pool.imap(worker, parsed_file):
f.write(result + NEWLINE)
def worker(genome_row):
complete_data = []
#data processing
#ftp connection to retrieve data
#etc.
return complete_data
if __name__ == "__main__":
file_searcher('path_to_some_tab_del_file.txt')
你能提供关于Linux,Python和MP包和硬件/处理器的版本的详细信息? – yacc
我添加了您所要求的信息(请参阅编辑)@yacc我无法弄清楚如何获取MP包版本。我希望你能找到问题 – CodeNoob
多处理是核心库的一部分,所以它和其他版本一样。对我来说(Python 3.4.3)代码工作正常(我改变的只是删除csvreader和读取普通文件)。你有没有尝试在其他地方复制它? – MacHala