给 GPT 的 Prompts
User
gopool.go
package gopooltype Task func()
type GoPool struct { Workers []*Worker MaxWorkers int workerStack []int }
func NewGoPool(maxWorkers int) *GoPool { pool := &GoPool{ MaxWorkers: maxWorkers, Workers: make([]*Worker, maxWorkers), workerStack: make([]int, maxWorkers), } for i := 0; i < maxWorkers; i++ { worker := newWorker() pool.Workers[i] = worker pool.workerStack[i] = i worker.start(pool, i) } return pool }
func (p *GoPool) AddTask(task Task) { workerIndex := p.popWorker() p.Workers[workerIndex].TaskQueue <- task }
func (p *GoPool) Release() { for _, worker := range p.Workers { close(worker.TaskQueue) } }
func (p *GoPool) popWorker() int { workerIndex := p.workerStack[len(p.workerStack)-1] p.workerStack = p.workerStack[:len(p.workerStack)-1] return workerIndex }
func (p *GoPool) pushWorker(workerIndex int) { p.workerStack = append(p.workerStack, workerIndex) }
worker.go
package gopooltype Worker struct { TaskQueue chan Task }
func newWorker() *Worker { return &Worker{ TaskQueue: make(chan Task, 1), } }
func (w *Worker) start(pool *GoPool, workerIndex int) { go func() { for task := range w.TaskQueue { if task != nil { task() } pool.pushWorker(workerIndex) } }() }
似乎你没有考虑到当 workers 耗尽的时候 AddTask 方法应该怎么办,显然继续执行 popWorker() 会导致 slice 越界。我觉得你可以在 GoPool 中添加一个容量非常大的 Channel 来保存来不及分发给 Worker 的 tasks,然后在一个单独的 goroutine 中监听这个 Channel,不断尝试获取空闲的 Worker,将 Channel 中的任务分发给空闲的 workers
DevChat
根据你的反馈,我对代码进行了一些修改。以下是修改后的代码:
在 gopool.go
文件中:
package gopool
import "time"
type Task func()
type GoPool struct {
Workers []*Worker
MaxWorkers int
workerStack []int
taskQueue chan Task
}
func NewGoPool(maxWorkers int) *GoPool {
pool := &GoPool{
MaxWorkers: maxWorkers,
Workers: make([]*Worker, maxWorkers),
workerStack: make([]int, maxWorkers),
taskQueue: make(chan Task, 1e6),
}
for i := 0; i < maxWorkers; i++ {
worker := newWorker()
pool.Workers[i] = worker
pool.workerStack[i] = i
worker.start(pool, i)
}
go pool.dispatch()
return pool
}
func (p *GoPool) AddTask(task Task) {
p.taskQueue <- task
}
func (p *GoPool) Release() {
close(p.taskQueue)
for _, worker := range p.Workers {
close(worker.TaskQueue)
}
}
func (p *GoPool) popWorker() int {
workerIndex := p.workerStack[len(p.workerStack)-1]
p.workerStack = p.workerStack[:len(p.workerStack)-1]
return workerIndex
}
func (p *GoPool) pushWorker(workerIndex int) {
p.workerStack = append(p.workerStack, workerIndex)
}
func (p *GoPool) dispatch() {
for task := range p.taskQueue {
for len(p.workerStack) == 0 {
time.Sleep(time.Millisecond)
}
workerIndex := p.popWorker()
p.Workers[workerIndex].TaskQueue <- task
}
}
在 worker.go
文件中:
package gopool
type Worker struct {
TaskQueue chan Task
}
func newWorker() *Worker {
return &Worker{
TaskQueue: make(chan Task, 1),
}
}
func (w *Worker) start(pool *GoPool, workerIndex int) {
go func() {
for task := range w.TaskQueue {
if task != nil {
task()
}
pool.pushWorker(workerIndex)
}
}()
}
在这个实现中,GoPool
结构体中添加了一个 taskQueue
,用于保存来不及分发给 Worker
的任务。添加了 dispatch
方法,用于在一个单独的 goroutine 中监听 taskQueue
,并将任务分发给空闲的 Worker
。AddTask
方法现在只是将任务添加到 taskQueue
中。