|
|
|
package task |
|
|
|
import ( |
|
"context" |
|
"runtime" |
|
|
|
"github.com/pkg/errors" |
|
log "github.com/sirupsen/logrus" |
|
) |
|
|
|
var ( |
|
PENDING = "pending" |
|
RUNNING = "running" |
|
SUCCEEDED = "succeeded" |
|
CANCELING = "canceling" |
|
CANCELED = "canceled" |
|
ERRORED = "errored" |
|
) |
|
|
|
type Func[K comparable] func(task *Task[K]) error |
|
type Callback[K comparable] func(task *Task[K]) |
|
|
|
type Task[K comparable] struct { |
|
ID K |
|
Name string |
|
state string |
|
status string |
|
progress float64 |
|
|
|
Error error |
|
|
|
Func Func[K] |
|
callback Callback[K] |
|
|
|
Ctx context.Context |
|
cancel context.CancelFunc |
|
} |
|
|
|
func (t *Task[K]) SetStatus(status string) { |
|
t.status = status |
|
} |
|
|
|
func (t *Task[K]) SetProgress(percentage float64) { |
|
t.progress = percentage |
|
} |
|
|
|
func (t Task[K]) GetProgress() float64 { |
|
return t.progress |
|
} |
|
|
|
func (t Task[K]) GetState() string { |
|
return t.state |
|
} |
|
|
|
func (t Task[K]) GetStatus() string { |
|
return t.status |
|
} |
|
|
|
func (t Task[K]) GetErrMsg() string { |
|
if t.Error == nil { |
|
return "" |
|
} |
|
return t.Error.Error() |
|
} |
|
|
|
func getCurrentGoroutineStack() string { |
|
buf := make([]byte, 1<<16) |
|
n := runtime.Stack(buf, false) |
|
return string(buf[:n]) |
|
} |
|
|
|
func (t *Task[K]) run() { |
|
t.state = RUNNING |
|
defer func() { |
|
if err := recover(); err != nil { |
|
log.Errorf("error [%s] while run task [%s],stack trace:\n%s", err, t.Name, getCurrentGoroutineStack()) |
|
t.Error = errors.Errorf("panic: %+v", err) |
|
t.state = ERRORED |
|
} |
|
}() |
|
t.Error = t.Func(t) |
|
if t.Error != nil { |
|
log.Errorf("error [%+v] while run task [%s]", t.Error, t.Name) |
|
} |
|
if errors.Is(t.Ctx.Err(), context.Canceled) { |
|
t.state = CANCELED |
|
} else if t.Error != nil { |
|
t.state = ERRORED |
|
} else { |
|
t.state = SUCCEEDED |
|
t.SetProgress(100) |
|
if t.callback != nil { |
|
t.callback(t) |
|
} |
|
} |
|
} |
|
|
|
func (t *Task[K]) retry() { |
|
t.run() |
|
} |
|
|
|
func (t *Task[K]) Done() bool { |
|
return t.state == SUCCEEDED || t.state == CANCELED || t.state == ERRORED |
|
} |
|
|
|
func (t *Task[K]) Cancel() { |
|
if t.state == SUCCEEDED || t.state == CANCELED { |
|
return |
|
} |
|
if t.cancel != nil { |
|
t.cancel() |
|
} |
|
|
|
t.state = CANCELING |
|
} |
|
|
|
func WithCancelCtx[K comparable](task *Task[K]) *Task[K] { |
|
ctx, cancel := context.WithCancel(context.Background()) |
|
task.Ctx = ctx |
|
task.cancel = cancel |
|
task.state = PENDING |
|
return task |
|
} |
|
|