如何在线程之间异步传递引用?观察员?

问题描述:

如何保证:如何在线程之间异步传递引用?观察员?

1)localThread和运行相互独立的remoteThread

2.)在localThreadremoteThread之间传递消息?

具体来说,localThread中的字符串对象需要“渗透”到Telnet,我认为它被称为回调。然而,对于Telnetobserve本身没有任何实质性的东西。这是对LocalIO的匿名引用,我没有看到明确提供引用有帮助。

我已阅读约java.util.concurrent.Semaphore,直到我的头爆炸,所有我走开的是,它似乎并不适用。对于这两个线程,无论其他线程在做什么,它们都应该继续运行。但是,需要一些机制来在线程之间传递对象引用...

public class Telnet { 

    public Telnet() throws InterruptedException { 
     startThreads(); 
    } 

    public static void main(String[] args) throws InterruptedException { 
     new Telnet(); 
    } 

    public void startThreads() throws InterruptedException { 
     Semaphore s = new Semaphore(1, true); 

     Thread localThread = new Thread(new LocalIO()); 
     Thread remoteThread = new Thread(new RemoteIO()); 

     localThread.start(); 
     remoteThread.start(); 
    } 
} 

线程本身如下。 LocalIO

public class LocalIO implements Runnable { 

    @Override 
    public void run() { 
     Scanner scanner; 
     String line; 
     while (true) { 
      scanner = new Scanner(System.in); 
      line = scanner.nextLine(); 
      out.println("\n\nyou entered\t\"" + line + "\"\n"); 
     } 
    } 
} 

RemoteIO

public class RemoteIO implements Runnable { 

    private static Logger log = Logger.getLogger(RemoteIO.class.getName()); 
    final String host = "rainmaker.wunderground.com"; 
    final int port = 3000; 

    @Override 
    public void run() { 
     log.fine(host + port); 
     int byteOfData; 
     try (Socket socket = new Socket(host, port); 
       InputStream inputStream = socket.getInputStream(); 
       OutputStream ouputStream = socket.getOutputStream(); 
       PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true); 
       final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in))) { 
      while ((byteOfData = inputStream.read()) != -1) { 
       out.print((char) byteOfData); 
      } 
     } catch (Exception e) { 
      out.println(e); 
     } 
    } 
} 

牢记RemoteIO永远不会关闭它的连接,并无限期地运行。

我发现的溶液:

public class RemoteConnection extends Observable { 

    private static Logger log = Logger.getLogger(RemoteConnection.class.getName()); 
    private final Socket socket; 
    private final BufferedInputStream in; 
    private final BufferedOutputStream out; 
    private final static String UTF8 = "UTF-8"; 

    public RemoteConnection(String host, int port) throws UnknownHostException, IOException { 
     socket = new Socket(host, port); 
     in = new BufferedInputStream(socket.getInputStream()); 
     out = new BufferedOutputStream(socket.getOutputStream()); 
    } 

    public void write(Deque<String> commands) throws IOException { 
     String command; 
     while (!commands.isEmpty()) { 
      command = commands.pop(); 
      out.write(command.concat("\r\n").getBytes(Charset.forName(UTF8))); 
      log.info(command); 
     } 
     out.flush(); 
    } 

    void read() { //probably should use BufferedStream to better effect..? 
     Thread readRemote = new Thread() { 

      @Override 
      public void run() { 
       StringBuilder sb = new StringBuilder(); 
       char ch; 
       int i; 
       while (true) { 
        try { 
         i = in.read(); 
         ch = (char) i; 
         sb.append(ch); 
         System.out.print(ch); 
         if (i == 13) { 
          setChanged(); 
          notifyObservers(sb.toString()); 
          log.fine(sb.toString()); 
          sb = new StringBuilder(); 
         } 
        } catch (IOException ioe) { 
         log.fine(ioe.toString()); 
        } 
       } 
      } 
     }; 
     readRemote.start(); 
    } 
} 

通过重组穿线,这近似于差芒远程登录,与异步线程的I/O。我认为从控制台读取阻塞...东西...

我真的不知道为什么这个工程,但其他方法没有。我宁愿让主类启动并处理线程,并在线程之间传递引用,但是尽管使用了这里提供的各种解决方案,但仍然无法正常工作。

LocalConnection有一个类似的线程方法。

concurrent包是这样的事情非常有帮助: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html

例如,您可以把每个线程的ConcurrentLinkedQueue,他们可以检查队列,看看是否有什么就采取行动时,他们请。与此同时,其他线程可以随时将新对象添加到队列中。

+0

大声笑,是的,但一切认真,我会检查出来,但有**异步** API? – Thufir

+1

我刚刚描述的是异步的。两个线程都不会等待对方,他们只是继续他们正在做的事情,并在他们感觉喜欢时检查新消息。 –

有一个在编程范式一个本质上的区别你的代码可以采用:

  • 同步模式:接收端运行一个无限循环,其中明确需要的项目掀起了并发队列,当有阻塞没有物品准备好;

  • 异步模式:接收端向物品交换机制提交回调。对于从生产者线程到达的每个项目都会调用此回调函数。

观察员模式可以宽松地适用于后一种情况,而不是前者。

另请注意,在后一种情况下,“项目交换机制”通常以同步模式实现。

+0

是的,异步模式是我所追求的。 – Thufir

+0

您可以实现您自己的事件分派循环,然后向其提交回调。它只是几行代码。 –

不知道你在做什么,但是如果你想在线程之间交换数据,你需要一个volatile变量来确保其他线程看到变化。 AtomicReferences是非阻塞的,并提供了一些可能在这里帮助的API。

+0

当前的代码在github上:https://github.com/THUFIR/MudSocketClient/tree/master/src/mudsocketclient当然,它现在有点不同了。我不确定你在这里的意思是什么,也不是原子参考。从我读到的,POJO CubbyHole应该能够在两个线程之间切换(?)。或者,可以通过两个线程访问。但是,我似乎处于一个死锁(?),只有Producer线程继续运行 - 或者它可能会阻止使用者。我不确定。 – Thufir

+1

对于非易失性POJO,如果一个线程对POJO所做的更改对另一个线程可见,则不能保证jvm。原子***类基于此,并在其周围放置一个不错的API。一般来说,回到你的原始问题,你只能确保两个线程独立运行,如果他们没有共享任何东西。但就你而言,他们的确如此。 –

+0

因此,在您的示例代码中,CubbyHole的消息字段应该是易失性的。 http://*.com/questions/6259745/volatile-variable-in-java?rq=1 –