并发对象作家通吃优先级超额读者
我正在寻找一个并发对象,可以帮助在以下用例:并发对象作家通吃优先级超额读者
- 线程/实体:1个出版商(唯一),O-许多读取器经常/不正确地更新数据结构,需要以最小的延迟快速地更新数据结构
- 每个读取器都具有对数据结构的读取访问权限(通过不允许写入的东西或者因为读取器暗示承诺不改变数据)
- 每个读者我只要它能够检测到发布者何时来修改它,因为它知道它最终将获得足够的时间来阅读它所需的数据结构。
有什么建议吗?我可以使用ReentrantReadWriteLock
,但有点担心阻止发布者。我宁愿让出版商能够毁掉读者阅读的机会,而不是让读者能够容忍出版商。
出版商螺纹:
PublisherSignal ps = new PublisherSignal();
publishToAllReaders(ps.getReaderSignal());
...
while (inLoop())
{
ps.beginEdit();
data.setSomething(someComputation());
data.setSomethingElse(someOtherComputation());
ps.endEdit();
doOtherStuff();
}
读线程:
PublisherSignal.Reader rs = acquireSignalFromPublisher();
...
while (inLoop())
{
readDataWhenWeGetAChance();
doOtherStuff();
}
...
public readDataWhenWeGetAChance()
{
while (true)
{
rs.beginRead();
useData(data.getSomething(), data.getSomethingElse());
if (rs.endRead())
{
// we get here if the publisher hasn't done a beginEdit()
// during our read.
break;
}
// darn, we have to try again.
// might as well yield thread if appropriate
rs.waitToRead();
}
}
编辑:在较高的水平,我试图做的是有出版商变化数据数千倍第二,然后让读者以更慢的速度显示最新的更新(每秒5-10次)。我将使用ConcurrentLinkedQueue来发布更新已发生的事实,除了(a)可能有数百个更新在同一个项目上,我想合并,因为不得不复制大量数据
看起来像是浪费
是一个性能问题,(b)拥有多个阅读器似乎排除了一个队列......我想我可以有一个主代理阅读器并让它通知每个真实的阅读器。
嗯...我想我的绊脚石是围绕着共享的数据结构本身......我一直在使用类似
public class LotsOfData
{
int fee;
int fi;
int fo;
int fum;
long[] other = new long[123];
/* other fields too */
}
哪里发布者经常更新数据,但一次只能有一个字段。
听起来也许我应该找到一种方法,序列化更新的方式,这有利于使用生产者 - 消费者队列:
public class LotsOfData
{
enum Field { FEE, FI, FO, FUM };
Map<Field, Integer> feeFiFoFum = new EnumMap<Field, Integer>();
long[] other = new long[123];
/* other fields too */
}
,然后发布变更的项目到一个队列,像(FEE,23 )为feeFiFoFum字段,(33,1234567L)为other
阵列。 (出于性能的原因,Bean类型的反射几乎肯定会出现。)
不过,看起来好像我被发布者写的任何想要的显而易见的简单所宠坏,并且知道读者将有时间(最终)进入并获得一个一致的一组数据,如果它只有一个标志,它可以用来判断数据是否已被修改。
更新:有趣,我尝试这个方法,将具有突变的对象(只存储必需的1个变化的状态)的一个的ConcurrentLinkedQueue为类似于第一LotsOfData上述(4个int字段和27名多头阵列的类),以及一个生产者,它在大约10000个批次之间产生总计1000万个带有Thread.sleep(1)的变异,以及一个每隔100毫秒检查一次队列的消费者,并消费出现的任何突变。我跑测试在许多方面:
-
测试框架内
- 空的动作(只是循环1000次,调用了Thread.sleep(1),并检查是否使用空对象):1.95秒,在我的3GHz奔腾4运行jre6u13。
- 测试动作1 - 创建并仅在生产者端应用突变:4.3秒
- 测试动作2 - 创建并在生产者端应用的突变,将每个队列,因为它们被创建:12秒
所以平均230nsec创建每个突变对象,平均770nsec将每个突变对象排入/出队到生产者的队列中并在消费者中将其拉出(对于基元类型执行突变的时间似乎可以忽略不计,与对象创建和队列操作相比,应该是这样)。不错,我想,它给了我一些指标来估计这种方法的性能成本。
为什么不使用BlockingQueue?
您的发布者可以写入此队列,而不管任何正在阅读的内容。读者(同样)可以从队列中取走东西,而不用担心阻止作者。线程安全由队列处理,因此2个线程可以写入/读取,不需要进一步的同步等。
从链接DOC:
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
看到我的评论我补充说。 – 2009-08-06 14:05:52
我会让一个消费者获取数据,合并它,然后将其传递给多个下游消费者(可能通过其他队列?) – 2009-08-06 14:07:09