嵌套循环并行输出

问题描述:

与dict我有两个一组数据:嵌套循环并行输出

aDict = {'barcode1': [('barcode1', 184), ('barcode1_mut', 2)], 'barcode2': [('barcode2', 138)], 'barcode3': [('barcode3', 375)]} 
bList = [(('barcode1', 'mut1'), 184), (('barcode1_mut', 'mut2'), 2), (('barcode2', 'mut3'), 136), (('barcode2', 'mut4'), 1), (('barcode2', 'mut5'), 1), (('barcode3', 'mut6'), 373), (('barcode3', 'mut7'), 2)] 

而且我在字典aDict在列表bList和结果一致的每一个键,带条码:

>>>print(result) 
{'barcode1': {'barcode1': [('mut1', 184)], 'barcode1_mut': [('mut2', 2)]}, 
'barcode2': {'barcode2': [('mut3', 136), ('mut4', 1), ('mut5', 1)]}, 
'barcode3': {'barcode3': [('mut6', 373), ('mut7', 2)]}} 

但它对我来说太慢了。我尝试将代码与输出的处理行数信息进行并行处理。但在我的实施中,每条线都由所有工人同时处理。

现在,我的实现是这样的:

from collections import defaultdict 
import multiprocessing as mp 

def f(uniqueBarcode): 
    mutBarcodeList = [x[0] for x in aDict[uniqueBarcode]] 
    a = filter(lambda x: x[0][0] in mutBarcodeList, bList.items()) 
    d = defaultdict(tuple) 
    b = [(x[0][0], (x[0][1], x[1])) for x in a] 
    for tup in b: d[tup[0]] += (tup[1],) 
    result = {i[0]:[y for y in i[1]] for i in d.items()} 
    return result 

seqDict={} 

if __name__=='__main__': 
    cpus = mp.cpu_count() 
    pool = mp.Pool(cpus) 
    for barcode in aDict.keys(): 
     seqDict[barcode] = pool.map(f, [barcode]) 
     if len(seqDict) % 100 == 0: 
      print("Processed {} barcodes".format(len(seqDict))) 
    pool.close() 
    pool.join() 

输出:

Processed 100 barcodes 
Processed 100 barcodes 
Processed 100 barcodes 
Processed 100 barcodes 
Processed 100 barcodes 
Processed 100 barcodes 
Processed 100 barcodes 
Processed 100 barcodes 
Processed 200 barcodes 
Processed 200 barcodes 
Processed 200 barcodes 
Processed 200 barcodes 
Processed 200 barcodes 
Processed 200 barcodes 
Processed 200 barcodes 
Processed 200 barcodes 
... 

而且字典seqDict是空的,但一定不会这样 - 第一行第一处理过程中,第二行是第二行...第八行是第八行,第九行再行第一行等

如何正确地并行执行它?

Upd0:我适应Flomp的密码给我

res={} 
for key in aDict: 
    if len(aDict[key]) == 1: 
     res[key] = {key:[(a[1],b) for a,b in bList if a[0] == key]} 
    elif len(aDict[key]) > 1: 
     res[key] = {x[0]:[(a[1],b) for a,b in bList if a[0] == x[0]] for x in aDict[key]} 

但它的工作这么长时间

+0

您可能想要阅读[mcve],并相应地重新提出您的问题。 – boardrider

+0

好吧,我在编辑我的问题 –

起初:将bList转换为字典。

bDict = { 
('barcode1', 'mut1'): 184, 
('barcode1_mut', 'mut2'): 2, 
('barcode2', 'mut3'): 136, 
('barcode2', 'mut4'): 1, 
('barcode2', 'mut5'): 1, 
('barcode3', 'mut6'): 373, 
('barcode3', 'mut7'): 2} 

第二种:将值与相同的条形码组合。

mDict = {} 
for x, y in bDict.items(): 
    if mDict.get(x[0]) == None: 
     mDict[x[0]] = [(x[1], y)] 
    else: 
     mDict[x[0]].append((x[1], y)) 
>>>print(mDict) 
{'barcode1': [('mut1', 184)], 
'barcode1_mut': [('mut2', 2)], 
'barcode2': [('mut3', 136), ('mut4', 1), ('mut5', 1)], 
'barcode3': [('mut6', 373), ('mut7', 2)]} 

第三:将结果分配给唯一的条形码。

seqDict = {x: {y[0]: mDict[y[0]] for y in aDict[x]} for x in aDict.keys()} 

我看到了很多的for循环在你的代码。这会减慢你的程序。下面是一些代码有更好的运行时:

bcDict = {'TTCTCTTACCGGGTAC':1,'ACCTCTCGAGAATTCA':2,'TGCAGTTCTGTGCATC':3} 

bcMutCount = [(('TTCTCTTACCGGGTAC', 'ATTCAACA'), 184), 
(('ACCTCTCGAGAATTCA', 'CATCCCAC'), 136), 
(('ACCTCTCGAGAATTCA', 'CATGCCAC'), 1), 
(('ACCTCTCGAGAATTCA', 'CATCCCCC'), 1), 
(('TGCAGTTCTGTGCATC', 'TCTACATT'), 373), 
(('TGCAGTTCTGTGCATC', 'ACTGCGCA'), 2)] 


for key in bcDict: 
    print({key:[(a[1],b) for a,b in bcMutCount if a[0] == key]}) 

输出:

{'TTCTCTTACCGGGTAC': [('ATTCAACA', 184)]} 
{'ACCTCTCGAGAATTCA': [('CATCCCAC', 136), ('CATGCCAC', 1), ('CATCCCCC', 1)]} 
{'TGCAGTTCTGTGCATC': [('TCTACATT', 373), ('ACTGCGCA', 2)]} 

请纠正我,如果这不是你想要的。上面的代码应该在O(m * n)中运行,其中m是bcDict中的键数和n是bcMutCount的长度。 这是否运行得足够快?

+0

好的,没问题。我正在编辑帖子到最低 –

+0

我编辑了我的原始答案。 – Flomp

+0

谢谢您的回复,但我很抱歉,我没有完全代表bcDict变量。在这本词典中,一个键映射到一个元组列表,其中可能有多个元组。现在我编辑了我的问题。 关于速度:现在我在bcDict中有〜380 000个键,在bcMutCount中有〜10000万个项目:)而我作为空气需要并行化。早期我使用multiprocessing.pool.map(function,bcDict.keys()),但这并没有导致进程的加速。我有两个并行计算的实现,但它们工作缓慢。如有必要,我可以将它们添加到我的问题中。 –