并发对象作家通吃优先级超额读者

问题描述:

我正在寻找一个并发对象,可以帮助在以下用例:并发对象作家通吃优先级超额读者

  • 线程/实体:1个出版商(唯一),O-许多读取器经常/不正确地更新数据结构,需要以最小的延迟快速地更新数据结构
  • 每个读取器都具有对数据结构的读取访问权限(通过不允许写入的东西或者因为读取器暗示承诺不改变数据)
  • 每个读者我只要它能够检测到发布者何时来修改它,因为它知道它最终将获得足够的时间来阅读它所需的数据结构。

有什么建议吗?我可以使用ReentrantReadWriteLock,但有点担心阻止发布者。我宁愿让出版商能够毁掉读者阅读的机会,而不是让读者能够容忍出版商。

出版商螺纹:

PublisherSignal ps = new PublisherSignal(); 
publishToAllReaders(ps.getReaderSignal()); 

    ... 

while (inLoop()) 
{ 
     ps.beginEdit(); 
     data.setSomething(someComputation()); 
     data.setSomethingElse(someOtherComputation()); 
     ps.endEdit(); 

     doOtherStuff(); 
} 

读线程:

PublisherSignal.Reader rs = acquireSignalFromPublisher(); 

     ... 

while (inLoop()) 
{ 
     readDataWhenWeGetAChance(); 

     doOtherStuff(); 
} 

     ... 

public readDataWhenWeGetAChance() 
{ 
     while (true) 
     { 
      rs.beginRead(); 
      useData(data.getSomething(), data.getSomethingElse()); 
      if (rs.endRead()) 
      { 
       // we get here if the publisher hasn't done a beginEdit() 
       // during our read. 
       break; 
      } 

      // darn, we have to try again. 
      // might as well yield thread if appropriate 
      rs.waitToRead(); 
     } 
} 

编辑:在较高的水平,我试图做的是有出版商变化数据数千倍第二,然后让读者以更慢的速度显示最新的更新(每秒5-10次)。我将使用ConcurrentLinkedQueue来发布更新已发生的事实,除了(a)可能有数百个更新在同一个项目上,我想合并,因为不得不复制大量数据 看起来像是浪费 是一个性能问题,(b)拥有多个阅读器似乎排除了一个队列......我想我可以有一个主代理阅读器并让它通知每个真实的阅读器。

嗯...我想我的绊脚石是围绕着共享的数据结构本身......我一直在使用类似

public class LotsOfData 
{ 
     int fee; 
     int fi; 
     int fo; 
     int fum; 

     long[] other = new long[123]; 

     /* other fields too */ 
} 

哪里发布者经常更新数据,但一次只能有一个字段。

听起来也许我应该找到一种方法,序列化更新的方式,这有利于使用生产者 - 消费者队列:

public class LotsOfData 
{ 
     enum Field { FEE, FI, FO, FUM }; 
     Map<Field, Integer> feeFiFoFum = new EnumMap<Field, Integer>(); 

     long[] other = new long[123]; 

     /* other fields too */ 
} 

,然后发布变更的项目到一个队列,像(FEE,23 )为feeFiFoFum字段,(33,1234567L)为other阵列。 (出于性能的原因,Bean类型的反射几乎肯定会出现。)

不过,看起来好像我被发布者写的任何想要的显而易见的简单所宠坏,并且知道读者将有时间(最终)进入并获得一个一致的一组数据,如果它只有一个标志,它可以用来判断数据是否已被修改。

更新:有趣,我尝试这个方法,将具有突变的对象(只存储必需的1个变化的状态)的一个的ConcurrentLinkedQueue为类似于第一LotsOfData上述(4个int字段和27名多头阵列的类),以及一个生产者,它在大约10000个批次之间产生总计1000万个带有Thread.sleep(1)的变异,以及一个每隔100毫秒检查一次队列的消费者,并消费出现的任何突变。我跑测试在许多方面:

    测试框架内
  • 空的动作(只是循环1000次,调用了Thread.sleep(1),并检查是否使用空对象):1.95秒,在我的3GHz奔腾4运行jre6u13。
  • 测试动作1 - 创建并仅在生产者端应用突变:4.3秒
  • 测试动作2 - 创建并在生产者端应用的突变,将每个队列,因为它们被创建:12秒

所以平均230nsec创建每个突变对象,平均770nsec将每个突变对象排入/出队到生产者的队列中并在消费者中将其拉出(对于基元类型执行突变的时间似乎可以忽略不计,与对象创建和队列操作相比,应该是这样)。不错,我想,它给了我一些指标来估计这种方法的性能成本。

为什么不使用BlockingQueue

您的发布者可以写入此队列,而不管任何正在阅读的内容。读者(同样)可以从队列中取走东西,而不用担心阻止作者。线程安全由队列处理,因此2个线程可以写入/读取,不需要进一步的同步等。

从链接DOC:

class Producer implements Runnable { 
    private final BlockingQueue queue; 
    Producer(BlockingQueue q) { queue = q; } 
    public void run() { 
    try { 
     while(true) { queue.put(produce()); } 
    } catch (InterruptedException ex) { ... handle ...} 
    } 
    Object produce() { ... } 
} 

class Consumer implements Runnable { 
    private final BlockingQueue queue; 
    Consumer(BlockingQueue q) { queue = q; } 
    public void run() { 
    try { 
     while(true) { consume(queue.take()); } 
    } catch (InterruptedException ex) { ... handle ...} 
    } 
    void consume(Object x) { ... } 
} 
+0

看到我的评论我补充说。 – 2009-08-06 14:05:52

+0

我会让一个消费者获取数据,合并它,然后将其传递给多个下游消费者(可能通过其他队列?) – 2009-08-06 14:07:09