想要在Java中使用多线程并行化嵌套for循环

问题描述:

我想使用执行器服务并使用java中的任何其他方法并行化嵌套for循环。我想创建一些固定数量的线程,以便CPU不会被线程完全获取。每个线程都在做一些独立的工作。第一个线程应该执行j = 0,20,30,40,...第二个线程应该执行j = 1,21,31,41,...每个线程都应该执行并行执行。这是我想做的事,想要在Java中使用多线程并行化嵌套for循环

ExecutorService service = Executors.newFixedThreadPool(NoOfThreads); 
    for(int i = 0; i < 100; i++) { 
    for(int j=0; j < 50000000; j++) { 
     //some independent work 
     //parallelize this work 
    ... 

这里是我做过什么

for (int i = 0; i < 100; i++) { 
    ExecutorService executorService = Executors.newFixedThreadPool(20); 
    for (int j=0; j < 50000000; j++) { 
    executorService.execute(new Runnable() { 
     @Override 
     public void run() { 
     //do some work 
     //send data to some api 
     }}); 
    } 
    executorService.shutdown(); 
    while (!executorService.isTerminated()) { 
    System.out.print(""); 
    } 
} 

我要确保这是我想做什么做正确的实施。请让我知道我该如何改进我的代码。

+0

我建议使用并行数据流,并让它处理你的问题。它可能工作,它可能无法工作,我不确定,但我会测试它。 – Shadov

+0

@沙多夫我认为这不是很好的建议。简单的事实是:生活中没有任何弯路。尤其是在谈论在编程中使用“并行”抽象时。你必须**了解**你在做什么。仅仅对那些对平行事物没有太多线索的人进行“平行流”并不会有多大帮助。他已经使用执行者和任务负担过重。拉起平行流有什么好处?! – GhostCat

好了,有两个明显的问题就在这里:

while (!executorService.isTerminated()) { 
    System.out.print(""); 
} 

将意味着你主要线程正在调用该代码会做等待。你应该在这里做一个Thread.sleep()调用;以避免绝对没有理由烧毁数以百万计的CPU周期。

但没关系太多;因为你的代码在这里

for (int i = 0; i < 100; i++) { 
    ExecutorService executorService = Executors.newFixedThreadPool(20); 

创建100 * 20个线程;从事50000000个任务,你打算推进它们。也许我错了;但我有一定的直觉,会将大多数系统推向极限。

从结构上讲,这不是一个好方法。创建该服务;并添加任务;而任务内容本身都是不同的东西。

在这个意义上说:

    要创建 一个执行服务(使用多个线程,不知怎的,类似于底层硬件的能力)
  • 要推
  • 进入该任务的数量

含义:当您必须处理100万个元素时,您不会创建100万个任务。你可能创建1000个任务;每个人工作1000个元素。

您的代码会分配50000000个Runnable对象,这会很慢。 相反,每个工作线程应运行一个循环,以获取要处理的下一个“j”值。使用AtomicInteger确保每个“j”值只处理一次。

下面是一些大纲代码:

import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicInteger; 

public class ParallelLoop { 
    public static void main(String[] args) throws InterruptedException { 

     for (int i = 0; i < 10; i++) { 
      final int n = 50000000; 
      final AtomicInteger atomicJ = new AtomicInteger(); 

      int nThread = 20; 
      ExecutorService es = Executors.newFixedThreadPool(nThread); 
      for (int t = 0; t < nThread; t++) { 
       Runnable r = new Runnable() { 
        public void run() { 
         while (true) { 
          int j = atomicJ.getAndIncrement(); 
          if (j >= n) 
           return; 
          // Process J .... 
         } 
        } 
       }; 
       es.submit(r); 
      } 
      es.shutdown(); 
      es.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
      System.out.println("===="); 
     } 
    } 

}