使用队列在D中的线程之间进行通信
D中很容易使用std.container.dlist
创建队列类型。使用队列在D中的线程之间进行通信
我想有多个线程,但让他们与队列通信,而不是消息传递(https://tour.dlang.org/tour/en/multithreading/message-passing)。据我所知,这些消息的设计始终是在代码中的特定点处接收数据;接收线程将阻塞,直到收到预期的数据。 (编辑:我被告知有关receiveTimeout但没有超时,只是一个检查是真的在这种情况下更合适(也许超时0?)。我也不知道如果多个消息API将做什么消息发送任何任何接收之前,我将不得不与该打。)
void main() {
spawn(&worker, thisTid);
// This line will block until the expected message is received.
receive (
(string message) {
writeln("Received the message: ", text);
},
)
}
我所需要的仅仅接收数据,如果有一些。事情是这样的:
void main() {
Queue!string queue// custom `Queue` type based on DList
spawn(&worker, queue);
while (true) {
// Go through any messages (while consuming `queue`)
for (string message; queue) {
writeln("Received a message: ", text);
}
// Do other stuff
}
}
我一直在使用shared
变量(https://tour.dlang.org/tour/en/multithreading/synchronization-sharing)尝试,但DMD抱怨说:“不许别名可变线程本地的数据。”或其他一些错误,具体情况。
这将如何在D中完成?或者,有没有办法使用消息来进行这种沟通?
我已经得到了我需要的答案。
简单地说,使用core.thread
而不是std.concurrency
。 std.concurrency
为您管理邮件,并且不允许您自己管理邮件。 core.thread
是std.concurrency
在内部使用的。
较长的答案,这里是我如何完全实现它。
我创建了Queue
类型,它基于Singly Linked List,但保留最后一个元素的指针。根据Walter Brights愿景(https://www.youtube.com/watch?v=cQkBOCo8UrE),Queue
也使用标准组件inputRange和outputRange(或者至少我认为它)。 Queue
也被构建为允许一个线程写入,另一个线程在内部很少进行读取操作,因此应该很快。
队列我共享这里https://pastebin.com/ddyPpLrp
一个简单的实现为具有第二螺纹读取输入:
Queue!string inputQueue = new Queue!string;
ThreadInput threadInput = new ThreadInput(inputQueue);
threadInput.start;
while (true) {
foreach (string value; inputQueue) {
writeln(value);
}
}
ThreadInput
被定义为这样的:
class ThreadInput : Thread {
private Queue!string queue;
this(Queue!string queue) {
super(&run);
this.queue = queue;
}
private void run() {
while (true) {
queue.put(readln);
}
}
}
代码https://pastebin.com/w5jwRVrL
的Queue
再次https://pastebin.com/ddyPpLrp
这不回答具体问题,但TI不明朗起来什么,我认为是消息传递API的误解......
就叫receiveTimeout
而不是纯receive
http://dpldocs.info/experimental-docs/std.concurrency.receiveTimeout.html
我使用这个:
shared class Queue(T) {
private T[] queue;
synchronized void opOpAssign(string op)(T object) if(op == "~") {
queue ~= object;
}
synchronized size_t length(){
return queue.length;
}
synchronized T pop(){
assert(queue.length, "Please check queue length, is 0");
auto first = queue[0];
queue = queue[1..$];
return first;
}
synchronized shared(T[]) consume(){
auto copy = queue;
queue = [];
return copy;
}
}
不,这不是我真正需要的,但我确实想念'receiveTimeout'我不知道如何。如果我无法获得其他任何工作,我可能会使用'receiveTimeout'来完成我所需要的工作。 –
给receiveTimout一个负值,如-1会做你想做的。请参阅:https://stackoverflow.com/a/31624806/2026276 – Bauss