|
package errgroup |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"sync" |
|
"sync/atomic" |
|
|
|
"github.com/avast/retry-go" |
|
) |
|
|
|
type token struct{} |
|
type Group struct { |
|
cancel func(error) |
|
ctx context.Context |
|
opts []retry.Option |
|
|
|
success uint64 |
|
|
|
wg sync.WaitGroup |
|
sem chan token |
|
} |
|
|
|
func NewGroupWithContext(ctx context.Context, limit int, retryOpts ...retry.Option) (*Group, context.Context) { |
|
ctx, cancel := context.WithCancelCause(ctx) |
|
return (&Group{cancel: cancel, ctx: ctx, opts: append(retryOpts, retry.Context(ctx))}).SetLimit(limit), ctx |
|
} |
|
|
|
func (g *Group) done() { |
|
if g.sem != nil { |
|
<-g.sem |
|
} |
|
g.wg.Done() |
|
atomic.AddUint64(&g.success, 1) |
|
} |
|
|
|
func (g *Group) Wait() error { |
|
g.wg.Wait() |
|
return context.Cause(g.ctx) |
|
} |
|
|
|
func (g *Group) Go(f func(ctx context.Context) error) { |
|
if g.sem != nil { |
|
g.sem <- token{} |
|
} |
|
|
|
g.wg.Add(1) |
|
go func() { |
|
defer g.done() |
|
if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil { |
|
g.cancel(err) |
|
} |
|
}() |
|
} |
|
|
|
func (g *Group) TryGo(f func(ctx context.Context) error) bool { |
|
if g.sem != nil { |
|
select { |
|
case g.sem <- token{}: |
|
default: |
|
return false |
|
} |
|
} |
|
|
|
g.wg.Add(1) |
|
go func() { |
|
defer g.done() |
|
if err := retry.Do(func() error { return f(g.ctx) }, g.opts...); err != nil { |
|
g.cancel(err) |
|
} |
|
}() |
|
return true |
|
} |
|
|
|
func (g *Group) SetLimit(n int) *Group { |
|
if len(g.sem) != 0 { |
|
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) |
|
} |
|
if n > 0 { |
|
g.sem = make(chan token, n) |
|
} else { |
|
g.sem = nil |
|
} |
|
return g |
|
} |
|
|
|
func (g *Group) Success() uint64 { |
|
return atomic.LoadUint64(&g.success) |
|
} |
|
|
|
func (g *Group) Err() error { |
|
return context.Cause(g.ctx) |
|
} |
|
|