通用配料类的正确方法

问题描述:

我正在寻找一个允许我添加项目以处理项目并且当项目数量等于批量大小执行某些操作的类。我会用这样的事情:通用配料类的正确方法

Batcher<Token> batcher = new Batcher<Token>(500, Executors.newFixedThreadPool(4)) { 
     public void onFlush(List<Token> tokens) { 
      rest.notifyBatch(tokens); 
     } 
    }; 

    tokens.forEach((t)->batcher.add(t)); 
    batcher.awaitDone(); 

#awaitDone后我知道所有的令牌已被通知。 #onFlush可能会做任何事情,例如,我可能想批量插入数据库。我希望将#onFlush调用放入Executor。

我想出了一个解决方案,但它看起来像很多代码,所以我的问题是,有没有更好的方法,我应该这样做?除了我实施的课程还有一个更好的实施方法吗?看起来像我的解决方案有很多移动件。

这是我想出了代码:

/** 
* Simple class to allow the batched processing of items and then to alternatively wait 
* for all batches to be completed. 
*/ 
public abstract class Batcher<T> { 

    private final int batchSize; 
    private final ArrayBlockingQueue<T> batch; 
    private final Executor executor; 
    private final Phaser phaser = new Phaser(1); 
    private final AtomicInteger processed = new AtomicInteger(0); 

    public Batcher(int batchSize, Executor executor) { 
     this.batchSize = batchSize; 
     this.executor = executor; 
     this.batch = new ArrayBlockingQueue<>(batchSize); 
    } 

    public void add(T item) { 
     processed.incrementAndGet(); 
     while (!batch.offer(item)) { 
      flush(); 
     } 
    } 

    public void addAll(Iterable<T> items) { 
     for (T item : items) { 
      add(item); 
     } 
    } 

    public int getProcessedCount() { 
     return processed.get(); 
    } 

    public void flush() { 
     if (batch.isEmpty()) 
      return; 

     final List<T> batched = new ArrayList<>(batchSize); 
     batch.drainTo(batched, batchSize); 
     if (!batched.isEmpty()) 
      executor.execute(new PhasedRunnable(batched)); 
    } 

    public abstract void onFlush(List<T> batch); 

    public void awaitDone() { 
     flush(); 
     phaser.arriveAndAwaitAdvance(); 
    } 

    public void awaitDone(long duration, TimeUnit unit) throws TimeoutException { 
     flush(); 
     try { 
      phaser.awaitAdvanceInterruptibly(phaser.arrive(), duration, unit); 
     } 
     catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
     } 
    } 

    private class PhasedRunnable implements Runnable { 
     private final List<T> batch; 

     private PhasedRunnable(List<T> batch) { 
      this.batch = batch; 
      phaser.register(); 
     } 

     @Override 
     public void run() { 
      try { 
       onFlush(batch); 
      } 
      finally { 
       phaser.arrive(); 
      } 
     } 
    } 
} 

一个Java 8的解决方案将是巨大的。谢谢。

引人瞩目的是,您的代码不适用于多个线程将项目添加到单个Batcher实例。如果我们将此限制转换为指定的用例,则不需要在内部使用专用的并发类。因此,我们可以累积到一个普通的ArrayList中,并在容量耗尽时将此列表换成新列表,而不需要复制项目。这允许简化了代码

public class Batcher<T> implements Consumer<T> { 

    private final int batchSize; 
    private final Executor executor; 
    private final Consumer<List<T>> actualAction; 
    private final Phaser phaser = new Phaser(1); 
    private ArrayList<T> batch; 
    private int processed; 

    public Batcher(int batchSize, Executor executor, Consumer<List<T>> c) { 
     this.batchSize = batchSize; 
     this.executor = executor; 
     this.actualAction = c; 
     this.batch = new ArrayList<>(batchSize); 
    } 

    public void accept(T item) { 
     processed++; 
     if(batch.size()==batchSize) flush(); 
     batch.add(item); 
    } 

    public int getProcessedCount() { 
     return processed; 
    } 

    public void flush() { 
     List<T> current = batch; 
     if (batch.isEmpty()) 
      return; 
     batch = new ArrayList<>(batchSize); 
     phaser.register(); 
     executor.execute(() -> { 
      try { 
       actualAction.accept(current); 
      } 
      finally { 
       phaser.arrive(); 
      } 
     }); 
    } 

    public void awaitDone() { 
     flush(); 
     phaser.arriveAndAwaitAdvance(); 
    } 

    public void awaitDone(long duration, TimeUnit unit) throws TimeoutException { 
     flush(); 
     try { 
      phaser.awaitAdvanceInterruptibly(phaser.arrive(), duration, unit); 
     } 
     catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
     } 
    } 
} 

关于Java的8个具体的改进,它采用了Consumer允许无需继承Batcher指定通过lambda表达式的最后行动。此外,PhasedRunnable被lambda表达式替换。作为另一种简化,Batcher<T> implements Consumer<T>其中每个Iterable支持forEach(Consumer<? super T>),这就避免了对方法addAll的需要。

所以使用情况现在看起来像:

Batcher<Token> batcher = new Batcher<>(
    500, Executors.newFixedThreadPool(4), currTokens -> rest.notifyBatch(currTokens)); 

tokens.forEach(batcher); 
batcher.awaitDone(); 
+0

感谢@Holger,为什么没有我的代码工作,如果多个线程正在呼吁#将? – 2015-01-27 19:50:15

+2

因为调用'add'后跟'awaitDone'匹配“check-then-act”模式。如果另一个线程执行某个操作(“add”或“awaitDone”),它就会中断。此外,你已经用'1'配置了'Phaser',这显然意味着调用'awaitDone'的两个线程将不起作用。 – Holger 2015-01-28 08:27:24

+0

所以两个线程可以调用#add,但只有一个线程可以调用#awaitDone。这将允许多个生产者,但只有一个#awaitDone。这是我可以忍受的限制。 – 2015-11-22 20:35:12