Workerpool上的频道死锁
问题描述:
我正在通过制作1000名工作人员的工作区来播放频道。目前,我收到以下错误:Workerpool上的频道死锁
fatal error: all goroutines are asleep - deadlock!
这里是我的代码:
package main
import "fmt"
import "time"
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results)
}
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
for i:=0;i<len(results);i++ {
<-results
}
}
这究竟是为什么?我还是新来的,我希望能够理解这一点。
答
问题是您的渠道正在填满。在读取任何结果之前,main()
例程会尝试将所有作业放入jobs
通道。但results
频道只有100个结果的空间才能阻止频道的写入,因此所有工作人员最终都会阻止在此频道中等待空间 - 因为main()
尚未开始从results
开始读取,所以永远不会出现空间。
要快速解决此问题,您可以使jobs
大到足以容纳所有作业,以便main()
函数可以继续读取阶段;或者您可以让results
大到足以保存所有结果,这样工作人员就可以输出结果而不会阻塞。
一个更好的方法是让另一个够程,填补了jobs
队列,所以main()
可以直接去读取结果:
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for w := 1; w <= 1000; w++ {
go worker(w, jobs, results)
}
go func() {
for j := 1; j < 1000000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
}
for i := 1; i < 1000000; i++ {
<-results
}
}
注意,我不得不最终for
循环更改为固定数量的迭代,否则它可能会在所有结果被读取之前终止。
答
以下代码:
for j := 1; j < 1000000; j++ {
jobs <- j
}
应在一个单独的goroutine运行,因为所有的工人将阻塞等待主gorourine接收结果信道上,而主够程卡在循环。
答
虽然托马斯的回答基本上是正确的,我张贴我的版本,这是IMO最好去也无缓冲通道工程:
func main() {
jobs := make(chan int)
results := make(chan int)
var wg sync.WaitGroup
// you could init the WaitGroup's count here with one call but this is error
// prone - if you change the loop's size you could forget to change the
// WG's count. So call wg.Add in loop
//wg.Add(1000)
for w := 1; w <= 1000; w++ {
wg.Add(1)
go func() {
worker(w, jobs, results)
defer wg.Done()
}()
}
go func() {
for j := 1; j < 2000; j++ {
jobs <- j
}
close(jobs)
fmt.Println("==========CLOSED==============")
}()
// in this gorutine we wait until all "producer" routines are done
// then close the results channel so that the consumer loop stops
go func() {
wg.Wait()
close(results)
}()
for i := range results {
fmt.Print(i, " ")
}
fmt.Println("==========DONE==============")
}
目前两个通道都在100缓冲为何仍发生,如果我删除缓冲区? – rhillhouse
如果您未指定通道大小,则会得到0的缓冲区大小,这意味着写入通道的通道将阻塞,直到读取器可用。您无法通过设计创建无限大小的频道(例如,这可能会导致服务器内存使用量不受限制)。 – Thomas