给 GPT 的 Prompts
User
接着我希望你考虑优雅关闭,在 Release 的时候应该让已经在运行的 workers 能够继续运行完,所以在 close(p.taskQueue) 之后,应该接着等待,最后释放可以释放的内存。
DevChat
为了实现优雅关闭,我们可以在 Release
方法中添加一个循环,等待所有的 Worker
完成他们的任务。这可以通过检查 workerStack
的长度来实现,当 workerStack
的长度等于 MaxWorkers
时,说明所有的 Worker
都已经完成了他们的任务。
以下是修改后的 Release
方法:
func (p *GoPool) Release() {
close(p.taskQueue)
for len(p.workerStack) != p.MaxWorkers {
time.Sleep(time.Millisecond)
}
for _, worker := range p.Workers {
close(worker.TaskQueue)
}
p.Workers = nil
p.workerStack = nil
}
在这个实现中,Release
方法首先关闭 taskQueue
,然后等待所有的 Worker
完成他们的任务。然后关闭所有 Worker
的 TaskQueue
,并释放 Workers
和 workerStack
的内存。
User
gopool.go
package gopoolimport "time"
type Task func()
type GoPool struct { Workers []*Worker MaxWorkers int workerStack []int taskQueue chan Task }
func NewGoPool(maxWorkers int) *GoPool { pool := &GoPool{ MaxWorkers: maxWorkers, Workers: make([]*Worker, maxWorkers), workerStack: make([]int, maxWorkers), taskQueue: make(chan Task, 1e6), } for i := 0; i < maxWorkers; i++ { worker := newWorker() pool.Workers[i] = worker pool.workerStack[i] = i worker.start(pool, i) } go pool.dispatch() return pool }
func (p *GoPool) AddTask(task Task) { p.taskQueue <- task }
func (p *GoPool) Release() { close(p.taskQueue) for len(p.workerStack) != p.MaxWorkers { time.Sleep(time.Millisecond) } for _, worker := range p.Workers { close(worker.TaskQueue) } p.Workers = nil p.workerStack = nil }
func (p *GoPool) popWorker() int { workerIndex := p.workerStack[len(p.workerStack)-1] p.workerStack = p.workerStack[:len(p.workerStack)-1] return workerIndex }
func (p *GoPool) pushWorker(workerIndex int) { p.workerStack = append(p.workerStack, workerIndex) }
func (p *GoPool) dispatch() { for task := range p.taskQueue { for len(p.workerStack) == 0 { time.Sleep(time.Millisecond) } workerIndex := p.popWorker() p.Workers[workerIndex].TaskQueue <- task } }
worker.go
package gopooltype Worker struct { TaskQueue chan Task }
func newWorker() *Worker { return &Worker{ TaskQueue: make(chan Task, 1), } }
func (w *Worker) start(pool *GoPool, workerIndex int) { go func() { for task := range w.TaskQueue { if task != nil { task() } pool.pushWorker(workerIndex) } }() }
push 和 pop 操作没有加锁,并发的时候应该会出问题,我觉得应该在 GoPool 里添加一个互斥锁;此外 dispatch() 中的循环 sleep 看起来不太优雅,或许可以添加条件变量,当 len(p.workerStack) == 0 时进入等待;同时 Worker.start() 方法在完成 push worker 之后通过 cond.Signal() 唤醒 dispatch()
DevChat
根据你的反馈,我对代码进行了一些修改。以下是修改后的代码:
在 gopool.go
文件中:
package gopool
import (
"sync"
)
type Task func()
type GoPool struct {
Workers []*Worker
MaxWorkers int
workerStack []int
taskQueue chan Task
mutex sync.Mutex
cond *sync.Cond
}
func NewGoPool(maxWorkers int) *GoPool {
pool := &GoPool{
MaxWorkers: maxWorkers,
Workers: make([]*Worker, maxWorkers),
workerStack: make([]int, maxWorkers),
taskQueue: make(chan Task, 1e6),
}
pool.cond = sync.NewCond(&pool.mutex)
for i := 0; i < maxWorkers; i++ {
worker := newWorker()
pool.Workers[i] = worker
pool.workerStack[i] = i
worker.start(pool, i)
}
go pool.dispatch()
return pool
}
func (p *GoPool) AddTask(task Task) {
p.taskQueue <- task
}
func (p *GoPool) Release() {
close(p.taskQueue)
p.cond.L.Lock()
for len(p.workerStack) != p.MaxWorkers {
p.cond.Wait()
}
p.cond.L.Unlock()
for _, worker := range p.Workers {
close(worker.TaskQueue)
}
p.Workers = nil
p.workerStack = nil
}
func (p *GoPool) popWorker() int {
p.mutex.Lock()
workerIndex := p.workerStack[len(p.workerStack)-1]
p.workerStack = p.workerStack[:len(p.workerStack)-1]
p.mutex.Unlock()
return workerIndex
}
func (p *GoPool) pushWorker(workerIndex int) {
p.mutex.Lock()
p.workerStack = append(p.workerStack, workerIndex)
p.mutex.Unlock()
p.cond.Signal()
}
func (p *GoPool) dispatch() {
for task := range p.taskQueue {
p.cond.L.Lock()
for len(p.workerStack) == 0 {
p.cond.Wait()
}
p.cond.L.Unlock()
workerIndex := p.popWorker()
p.Workers[workerIndex].TaskQueue <- task
}
}
在 worker.go
文件中:
package gopool
type Worker struct {
TaskQueue chan Task
}
func newWorker() *Worker {
return &Worker{
TaskQueue: make(chan Task, 1),
}
}
func (w *Worker) start(pool *GoPool, workerIndex int) {
go func() {
for task := range w.TaskQueue {
if task != nil {
task()
}
pool.pushWorker(workerIndex)
}
}()
}
在这个实现中,GoPool
结构体中添加了一个互斥锁和一个条件变量。popWorker
和 pushWorker
方法现在都使用互斥锁来保护对 workerStack
的访问。pushWorker
方法在添加一个 Worker
到 workerStack
后,会使用 cond.Signal()
来唤醒 dispatch
方法。dispatch
方法现在使用 cond.Wait()
来等待一个 Worker
变为可用。Release
方法现在使用 cond.Wait()
来等待所有的 Worker
完成他们的任务。