只要其中一个读取新输入,我可以维护一个TcpStream的Vec来跨线程写入它们吗?
问题描述:
我想做消息广播:当其中一个客户端发送消息时,服务器将它写入每个套接字。我的主要问题是我无法弄清楚如何发送Vec
到线程。我不能使用Mutex
,因为这会将其他线程的访问锁定到Vec
进行读取。我无法克隆和发送,因为无法克隆和发送TcpStream
。这里是我的尝试到现在为止只要其中一个读取新输入,我可以维护一个TcpStream的Vec来跨线程写入它们吗?
use std::net::{TcpStream, TcpListener};
use std::io::prelude::*;
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc::{channel, Receiver};
use std::cell::RefCell;
type StreamSet = Arc<RefCell<Vec<TcpStream>>>;
type StreamReceiver = Arc<Mutex<Receiver<StreamSet>>>;
fn main() {
let listener = TcpListener::bind("0.0.0.0:8000").unwrap();
let mut connection_set: StreamSet = Arc::new(RefCell::new(vec![]));
let mut id = 0;
let (tx, rx) = channel();
let rx = Arc::new(Mutex::new(rx));
for stream in listener.incoming() {
let receiver = rx.clone();
let stream = stream.unwrap();
(*connection_set).borrow_mut().push(stream);
println!("A connection established with client {}", id);
thread::spawn(move || handle_connection(receiver, id));
id += 1;
tx.send(connection_set.clone()).unwrap();
}
}
fn handle_connection(rx: StreamReceiver, id: usize) {
let streams;
{
streams = *(rx.lock().unwrap().recv().unwrap()).borrow();
}
let mut connection = &streams[id];
loop {
let mut buffer = [0; 512];
if let Err(_) = connection.read(&mut buffer) {
break;
};
println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
if let Err(_) = connection.write(&buffer[..]) {
break;
};
if let Err(_) = connection.flush() {
break;
};
}
}
答
另一个想法是产生一个单一的“控制器”线程和每一个插座一个线程。每个线程都拥有套接字,并有一个通道将数据发送回控制器。控制器将拥有一个Vec
的通道发送给每个线程。当一个线程接收到数据时,将其发送给控制器,该控制器将其复制并发送回每个工作线程。您可以将数据包装在Arc
中以防止不必要的重复,并且您应该提供一个ID以避免将数据回送给原始发件人(如果需要的话)。
这将在一个线程内完全移动所有权,这应该避免您遇到的问题。
你也可以看看Tokio,它应该允许你做类似的事情,但不需要在1-1映射中产生线程。
我不能使用
Mutex
,因为这将锁定其他线程
的访问你总是可以尝试不同的锁定机构,诸如RwLock
。
因为
TcpStream
无法克隆当然
它可以:TcpStream::try_clone
。
啊谢谢。我实际上对编码比较陌生,将在下个月完成一年。所以我试图在使用像tokio这样的抽象之前尽我所能地自己实现它,所以我明白了底下发生了什么。我将尝试实施控制器工作者模型。每个单独的线程是否有单独的通道,或者它们是否使用相同的接收器,发送器对? – codeNoob
我期望控制器线程为每个工作线程有一个唯一的通道向它们发送数据,然后是从每个工作人员返回到控制器的重用通道(因为它们是** m ** ultiple ** p ** roducer ,** s ** ingle ** c ** onsumer频道)。总共应该有N + 1个通道。 – Shepmaster
啊好的。说得通。谢谢。 – codeNoob