// 一个生产者 funcproducer(wo chan<- Task) { var i int64 for i = 1; i <= taskNum; i++ { t := Task{ ID: i, } wo <- t } // 单个生产者就可以直接关闭通道了,关闭后,消费者任然可以消费 close(wo) }
// 一个消费者 funcconsumer(ro <-chan Task) { for t := range ro { if t.ID != 0 { t.run() } } }
// 一个生产者 funcproducer(wo chan<- Task) { var i int64 for i = 1; i <= taskNum; i++ { t := Task{ ID: i, } wo <- t } // 单个生产者就可以直接关闭通道了,关闭后,消费者任然可以消费 close(wo) }
// 一个消费者 funcconsumer(ro <-chan Task) { for t := range ro { if t.ID != 0 { t.run() } } }
funcExec() { wg := &sync.WaitGroup{} wg.Add(1) gofunc(wg *sync.WaitGroup) { defer wg.Done() producer(taskCh) }(wg) var i int64 for i = 0; i < taskNum; i++ { if i%100 == 0 { // 根据任务增量来逐渐开新的消费者去消费 wg.Add(1) gofunc(wg *sync.WaitGroup) { defer wg.Done() consumer(taskCh) }(wg) } } wg.Wait() out.Println("执行成功")
// 多个生产者 funcproducer(wo chan<- Task, startNum int64, nums int64) { var i int64 for i = startNum; i < startNum+nums; i++ { t := Task{ ID: i, } wo <- t } }
这里需要注意的两个问题:一个是,在 for 循环中的变量 i 可能会存在内存共享的问题,因为在可能在本次循环中 i 的值为 199,但是在协程开始执行后,传入 producer() 函数的 i 的值就变成了 200,所以这里需要用参数将 i 的值传到对应的协程中。另一个问题是,在关闭通道 close(taskch) 的时候,这里可能会存在一个极小的时间差,可能会存在还有协程在往通道里面写数据,所以这里用 go(close) 会保险一点。
/* 可能存在的问题 1、go close 去关闭channel,因为可能还有协程在向里面写数据,有极小的时间差 2、生产者在生产的时候,可能存在数据竞争问题 */ // 缓冲池 var taskCh = make(chan Task, 10)
// 生产者需要生产的任务数量 const taskNum int64 = 10000
// 每个生产者生产的任务数量,100 const nums int64 = 100
// 多个生产者 funcproducer(wo chan<- Task, startNum int64, nums int64) { var i int64 for i = startNum; i < startNum+nums; i++ { t := Task{ ID: i, } wo <- t } }
// 一个消费者 funcconsumer(ro <-chan Task) { for t := range ro { if t.ID != 0 { t.run() } } }
funcExec() { // 保证生产者任务生产完毕 wg := &sync.WaitGroup{} // 保证生产者任务生产完毕后,将 channel 关闭 pwg := &sync.WaitGroup{} var i int64 wg.Add(1) for i = 0; i < taskNum; i += nums { if i >= taskNum { break } // 每个生产者生产 100 个任务 wg.Add(1) pwg.Add(1) // 问题2:参数传递 gofunc(i int64) { defer wg.Done() defer pwg.Done() producer(taskCh, i, nums) }(i) }
生产者由于是无限生产,那毫无疑问生产者逻辑是写在一个 for 循环内的,这里为了避免缓冲区满了,生产者因为阻塞而导致无法接收到 done 信号,我们配合 select 来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
funcproducer(wo chan<- Task, done chanstruct{}) { var i int64 for { if i >= TaskNum { // 无限生产 i = 0 } i++ t := Task{ ID: i, } // 可以防止因为生产者阻塞,而导致关闭信号无法关闭 select { case wo <- t: case <-done: out.Println("生产者退出") return } } }
同样,我们的消费者逻辑肯定也是放在 for 循环中来写,并且也配合 select 来接收信号:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
funcconsumer(ro <-chan Task, done chanstruct{}) { for { select { case t := <-ro: if t.ID != 0 { t.run() } case <-done: // 这里如果直接退出的话,可能 channel 里面还有值没有被消费(有缓存区的情况) for t := range ro { // 生产者那边已经停止,消息不会再生产。消费者这里将所有消息消费后,就可以 退出了 if t.ID != 0 { t.run() } } out.Println("消费者退出") return } } }
funcproducer(wo chan<- Task, done chanstruct{}) { var i int64 for { if i >= TaskNum { // 无限生产 i = 0 } i++ t := Task{ ID: i, } // 可以防止因为生产者阻塞,而导致关闭信号无法关闭 select { case wo <- t: case <-done: out.Println("生产者退出") return } } }
funcconsumer(ro <-chan Task, done chanstruct{}) { for { select { case t := <-ro: if t.ID != 0 { t.run() } case <-done: // 这里如果直接退出的话,可能 channel 里面还有值没有被消费(有缓存区的情况) for t := range ro { // 生产者那边已经停止,消息不会再生产。消费者这里将所有消息消费后,就可以 退出了 if t.ID != 0 { t.run() } } out.Println("消费者退出") return } } }
funcExec() { // 多个生产者 go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done) go producer(taskCh, done)
// 多个消费者 go consumer(taskCh, done) go consumer(taskCh, done) go consumer(taskCh, done) go consumer(taskCh, done) go consumer(taskCh, done) go consumer(taskCh, done) go consumer(taskCh, done)