java之间共享数据线程

问题描述:

我有一个java进程,从套接字服务器读取数据。因此,我有一个BufferedReader和一个PrintWriter对象与该套接字相对应。java之间共享数据线程

现在在同一个java进程中,我有一个多线程的java服务器,它接受客户端连接。我想实现的功能,所有这些客户端,我接受可以从我上面提到的BufferedReader对象读取数据。(使他们可以复用数据)

如何使这些单独的客户端线程读取数据从BuffereReader单个对象?对不起,我感到困惑。

+1

我假设你说所有的线程都应该得到所有的数据,而不仅仅是谁先读取数据应该得到一个特定的数据? – johusman 2011-03-04 07:26:54

+0

雅所有他们应该得到的数据,这是目前来自BufferedReader流.. – ayush 2011-03-04 07:28:02

+1

@ayush:我不认为你理解johusman的问题 - 你是否试图*拆分*不同客户端之间的数据,或者应该*全部*参见*全部*的数据?如果数据是0,1,2,3,4,那么每个客户端应该看到0,1,2,3,4还是可能一个客户端看到0,1,3而另一个看到2,4? – 2011-03-04 07:34:18

我强烈建议他们不要直接访问BufferedReader。假设你知道数据的格式,并且每个客户都试图读取相同的格式(如果情况并非如此,我看不出它是如何工作的),我建议创建一个线程从BufferedReader并将工作项目放入Queue。在Java中使用生产者/消费者队列的示例有批次,这也可能使测试客户端代码更容易。

只有一个线程访问BufferedReader意味着您不必担心定义所有其他线程都必须满足的原子操作 - 您的阅读线程通过决定将工作项添加到队列。

编辑:如果所有的客户端都应该看到数据的所有,这巩固了我更进一步具有单个读者的建议 - 除了,而不必数据Queue该项目从删除,你就会有一个收集哪些客户端可以读取全部现有的数据。您需要使用适当的线程安全集合,但Java中有很多。

编辑:刚刚阅读您的评论,其中说每个客户端应该只看到从阅读器读取的最后一项,这使得它更容易。让一个线程读取数据,但只保留一个引用“最后一个读取项”的变量。您可能想要同步对它的访问或使用AtomicReference,但这两者都很容易。

+0

所以如果我想所有的线程读取所有的数据,那么我该怎么办?有一个文件在哪里写这个数据,然后通过所有的客户端同步访问这个文件..?或者其他一些方法? – ayush 2011-03-04 08:23:44

+0

@ayush:所有的数据,或只是最后一个项目?你是自相矛盾的。无论如何,我的“编辑”段落涵盖了两种可能性 - 一个线程读取数据,但通过AtomicReference或线程安全集合使其可用。 – 2011-03-04 08:28:37

+0

感谢您的解释..数据将持续不断地在'BufferedReader'中,我希望所有线程都能读取该数据(最后一个)..缺少一些数据是可以接受的..但是所有的线程都应该得到相同的数据。 .. – ayush 2011-03-04 08:37:23

最简单的事情可能是创建一个临时文件并使用java.nio.channels.FileChannel。原因是这已经提供了你想要的读写器语义。另一种方法是用内存方案自己重新实现这些语义。

你会做的是从你的阅读器读取并附加到文件的末尾。然后您的读者将获得最初指向文件末尾的文件通道。当你只是在等待阅读器的更多输入时,你必须小心以确保他们不认为它们已经到达尾声。您可以让下游逻辑意识到这一点,或者为读者提供某种包装。

编辑:这是写的理解,你希望所有的读者看到的一切。如果这实际上不是你想要的,这太复杂了。

如果你不能在内存中保存所有的数据,你可以使用这样的事情: (使用一个线程监视源的经典观察者模式和其他人通知的新数据)

class Broadcaster { 
    private final ConcurrentSkipListSet<Listener> listeners = 
     new ConcurrentSkipListSet<Listener>(); 

    private final BufferedReader source = //inject source 

    // register/de-gegister code 
    public void register(Listener listener); 
    public void deRegister(Listener listener); 
    // usually it's used like broadcaster.register(this); 

    public void broadcast(){ 
     String read = null; 
     while((read = source.readLine())!=null){ 
      for(Listener listener : listeners){ 
       listener.send(read); 
      } 
     } 
    } 
} 

class Listener implements Runnable { 
    private final Queue<String> queue = new LinkedBlockingQueue<String>(); 

    public void send(String read){ 
     queue.add(read); 
    } 

    public void run(){ 
     while(!Thread.currentThread().isInterrupted()){ 
      String got = queue.get(); 
      System.out.println(got); 
     } 
    } 
} 

我没有测试这个东西什么的,所以可怜,如果它不能正常工作!

编辑:如果你有记忆的限制,你可能想这样做,以及:

Queue<String> queue = new LinkedBlockingQueue<String>(2000); 

这将使多少元素可以在队列中排队一个上限,而当这限制已达到,想要在其上放置元素的线程将不得不等待(如果使用了add,那就是)。这样,当听众不能够快速消费数据时,广播公司会阻止(等待)(在这种简单的方案中,其他听众可能会饿死)。

编辑2:当你这样做时,你应该最好只在队列上放置不变的东西。如果你把例如byte[],一个不礼貌的监听器可能会改变数据,其他线程可能会感到惊讶。如果这是不可能的,你应该在每个队列上放一个不同的副本,例如byteArray.clone()。尽管这可能会带来一些性能问题。