package tache

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync/atomic"
)

// Worker is a single worker that executes tasks.
type Worker[T Task] struct {
	ID int
}

// Execute runs the task, driving its state transitions, lifecycle hooks and
// error handling.
func (w Worker[T]) Execute(task T) {
	// If the task is being retried, mark it accordingly and fire the
	// OnBeforeRetry hook if the task implements it.
	if isRetry(task) {
		task.SetState(StateBeforeRetry)
		if hook, ok := Task(task).(OnBeforeRetry); ok {
			hook.OnBeforeRetry()
		}
	}
	// onError records the error and derives the resulting state: canceled,
	// errored (and possibly retried later), or terminally failed, in which
	// case the OnFailed hook is fired if the task implements it.
	onError := func(err error) {
		task.SetErr(err)
		if errors.Is(err, context.Canceled) {
			task.SetState(StateCanceled)
		} else {
			task.SetState(StateErrored)
		}
		if !needRetry(task) {
			if hook, ok := Task(task).(OnFailed); ok {
				task.SetState(StateFailing)
				hook.OnFailed()
			}
			task.SetState(StateFailed)
		}
	}
	// Recover from panics inside task.Run and convert them into task errors.
	defer func() {
		if err := recover(); err != nil {
			log.Printf("error [%v] while running task [%s], stack trace:\n%s", err, task.GetID(), getCurrentGoroutineStack())
			onError(NewErr(fmt.Sprintf("panic: %v", err)))
		}
	}()
	task.SetState(StateRunning)
	err := task.Run()
	if err != nil {
		onError(err)
		return
	}
	// The task finished successfully: update its state, fire the OnSucceeded
	// hook if implemented, and clear any error left over from a previous attempt.
	task.SetState(StateSucceeded)
	if onSucceeded, ok := Task(task).(OnSucceeded); ok {
		onSucceeded.OnSucceeded()
	}
	task.SetErr(nil)
}
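
// Illustrative only (not part of the original file): a task type can opt into
// the lifecycle hooks used above simply by implementing the corresponding
// methods on a hypothetical MyTask type that satisfies Task, e.g.
//
//	func (t *MyTask) OnSucceeded()   { log.Printf("task %s succeeded", t.GetID()) }
//	func (t *MyTask) OnFailed()      { log.Printf("task %s failed", t.GetID()) }
//	func (t *MyTask) OnBeforeRetry() { log.Printf("task %s will be retried", t.GetID()) }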

// WorkerPool is the pool of workers.
type WorkerPool[T Task] struct {
	working atomic.Int64
	workers chan *Worker[T]
}

// NewWorkerPool creates a new worker pool.
func NewWorkerPool[T Task](size int) *WorkerPool[T] {
	workers := make(chan *Worker[T], size)
	for i := 0; i < size; i++ {
		workers <- &Worker[T]{
			ID: i,
		}
	}
	return &WorkerPool[T]{
		workers: workers,
	}
}

// Get takes a worker from the pool without blocking; it returns nil if no
// worker is currently available.
func (wp *WorkerPool[T]) Get() *Worker[T] {
	select {
	case worker := <-wp.workers:
		wp.working.Add(1)
		return worker
	default:
		return nil
	}
}

// Put returns a worker to the pool and decrements the working counter.
func (wp *WorkerPool[T]) Put(worker *Worker[T]) {
	wp.workers <- worker
	wp.working.Add(-1)
}
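
// A minimal usage sketch (illustrative, not part of the original file). It
// assumes a hypothetical MyTask type that satisfies the package's Task
// interface and a value task of that type; the Task interface and the
// retry/state helpers are defined elsewhere in this package.
//
//	pool := NewWorkerPool[*MyTask](4)
//	if worker := pool.Get(); worker != nil {
//		go func() {
//			defer pool.Put(worker)
//			worker.Execute(task)
//		}()
//	}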