将数据库记录集流到多个线程工作人员

问题描述:

我有一个过程,需要从数据库流式传输数据并将记录传递到外部服务器进行处理,然后再将结果存储回数据库。将数据库记录集流到多个线程工作人员

Get database row from table A 
Hand off to external server 
Receive result 
insert database row into table B 

目前,这是一个单线程操作,并且瓶颈是外部服务器的过程,所以我想通过使用外部服务器进程的其他实例来处理请求,以提高性能。

Get 100 database rows from table A 
    For each row 
    Hand off to external server 1 
    Receive Result 
    insert database row into table B 
In parallel get 100 database rows from table A 
    For each row 
    Hand off to external server 2 
    Receive Result 
    insert database row into table B 

问题1 我一直在研究Java的线程池和调度记录到外部服务器的这种方式,但我不知道如何从数据库尽可能快地没有工人提取记录拒绝新的任务。这可以通过线程池来完成吗?应该使用什么架构来实现这个目标?

问题2 目前我已经使用批处理语句对数据库插入进行了优化,并且只处理了一次2000条记录。在工人中采用类似的方法是否可能?

任何帮助构建这个问题的解决方案将不胜感激。

+0

能否请您解释一下“我不知道如何从数据库中提取记录尽可能快地让工作人员拒绝新任务。“?线程池是否会拒绝您的任务,因为您快速生成如此多的任务以致池的等待队列已满?如果是这样,你可以使每个任务的批量更大。 (例如1000行而不是100行) – James

+0

我可以增加队列的大小,但考虑到我将有固定数量的工作人员,我可能会遇到队列仍然填满的情况。我能想到解决这个问题的唯一方法就是调整程序,在n毫秒的时间内暂停创建新任务,这样工作人员就应该完成足够的队列。 – niallsco

根据您的意见,我认为关键是控制未完成任务的计数。您有几种选择:

  1. 对数据集中的记录数进行估计。然后,决定一个能产生合理数量任务的批量大小。例如,如果要将待处理任务数限制为100.那么如果您有100K条记录,则可以拥有1K的批处理大小。如果您有1Mil记录,则将批量大小设置为10K。

  2. 将您自己的有界BlockingQueue提供给线程池。如果你之前没有做过,那么在做这件事之前,你可能应该仔细研究java.util.concurrent包装。

  3. 或者你可以使用一个java.util.concurrent.Semaphore,这是一个比一个用户提供的队列简单的设施:

    • 声明一个信号量与待处理的任务数限制

    Semaphore mySemaphore = new Semaphore(max_pending_task_count);

    • 由于您的任务生成速度很快,因此您可以使用单个线程来生成所有任务。在你的任务生成螺纹:

    while(hasMoreTasks()) { 
     // this will block if you've reached the count limit 
     mySemaphore.acquire(); 
     // generate a new task only after acquire 
     // The new task must have a reference to the Semaphore 
     Task task = new Task(..., mySemaphore); 
     threadpool.submit(task); 
    } 
    // now that you've generated all tasks, 
    // time to wait for them to finish. 
    // you may have a better way to detect that, however 
    while(mySemaphore.availablePermits() < max_pending_task_count) { 
     Thread.sleep(some_time); 
    } 
    // now, go ahead dealing with the results 
  • 在任务线程:
 
    public void run() { 
     ... 
     // when finished, do a release which increases the permit 
     // by 1 and inform your task generator thread to produce 1 more task 
     mySemaphore.release(); 
    }