Skip to content

Commit

Permalink
Merge pull request #1658 from ydb-platform/pool-error-handling
Browse files Browse the repository at this point in the history
Pool error handling
  • Loading branch information
asmyasnikov authored Feb 20, 2025
2 parents 785ae68 + e629a3b commit 316b1eb
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 129 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added checking errors for conditionally delete item from pool

## v3.99.12
* Internal debug improved

Expand Down
229 changes: 143 additions & 86 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ type (
Item
}
Config[PT ItemConstraint[T], T any] struct {
trace *Trace
clock clockwork.Clock
limit int
createTimeout time.Duration
createItem func(ctx context.Context) (PT, error)
closeTimeout time.Duration
closeItem func(ctx context.Context, item PT)
idleTimeToLive time.Duration
itemUsageLimit uint64
trace *Trace
clock clockwork.Clock
limit int
createTimeout time.Duration
createItemFunc func(ctx context.Context) (PT, error)
mustDeleteItemFunc func(item PT, err error) bool
closeTimeout time.Duration
closeItemFunc func(ctx context.Context, item PT)
idleTimeToLive time.Duration
itemUsageLimit uint64
}
itemInfo[PT ItemConstraint[T], T any] struct {
idle *xlist.Element[PT]
Expand All @@ -54,8 +55,7 @@ type (
Pool[PT ItemConstraint[T], T any] struct {
config Config[PT, T]

createItem func(ctx context.Context) (PT, error)
closeItem func(ctx context.Context, item PT)
createItemFunc func(ctx context.Context) (PT, error)

mu xsync.RWMutex
createInProgress int // KIKIMR-9163: in-create-process counter
Expand All @@ -71,15 +71,21 @@ type (

func WithCreateItemFunc[PT ItemConstraint[T], T any](f func(ctx context.Context) (PT, error)) Option[PT, T] {
return func(c *Config[PT, T]) {
c.createItem = f
c.createItemFunc = f
}
}

func WithMustDeleteItemFunc[PT ItemConstraint[T], T any](f func(item PT, err error) bool) Option[PT, T] {
return func(c *Config[PT, T]) {
c.mustDeleteItemFunc = f
}
}

func WithSyncCloseItem[PT ItemConstraint[T], T any]() Option[PT, T] {
return func(c *Config[PT, T]) {
c.closeItem = func(ctx context.Context, item PT) {
_ = item.Close(ctx)
}
//c.closeItemFunc = func(ctx context.Context, item PT) {
// _ = item.Close(ctx)
//}
}
}

Expand Down Expand Up @@ -131,12 +137,22 @@ func New[PT ItemConstraint[T], T any](
) *Pool[PT, T] {
p := &Pool[PT, T]{
config: Config[PT, T]{
trace: &Trace{},
clock: clockwork.NewRealClock(),
limit: DefaultLimit,
createItem: defaultCreateItem[T, PT],
trace: &Trace{},
clock: clockwork.NewRealClock(),
limit: DefaultLimit,
createItemFunc: func(ctx context.Context) (PT, error) {
var item T

return &item, nil
},
closeItemFunc: func(ctx context.Context, item PT) {
_ = item.Close(ctx)
},
createTimeout: defaultCreateTimeout,
closeTimeout: defaultCloseTimeout,
mustDeleteItemFunc: func(item PT, err error) bool {
return !xerrors.IsRetryObjectValid(err)
},
},
index: make(map[PT]itemInfo[PT, T]),
idle: xlist.New[PT](),
Expand Down Expand Up @@ -168,23 +184,11 @@ func New[PT ItemConstraint[T], T any](
}
}

p.createItem = makeAsyncCreateItemFunc(p)
if p.config.closeItem != nil {
p.closeItem = p.config.closeItem
} else {
p.closeItem = makeAsyncCloseItemFunc[PT, T](p)
}
p.createItemFunc = makeAsyncCreateItemFunc(p)

return p
}

// defaultCreateItem returns a new item
func defaultCreateItem[T any, PT ItemConstraint[T]](context.Context) (PT, error) {
var item T

return &item, nil
}

// makeAsyncCreateItemFunc wraps the createItem function with timeout handling
func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
p *Pool[PT, T],
Expand Down Expand Up @@ -228,7 +232,7 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
defer cancelCreate()
}

newItem, err := p.config.createItem(createCtx)
newItem, err := p.config.createItemFunc(createCtx)
if newItem != nil {
p.mu.WithLock(func() {
var useCounter uint64
Expand Down Expand Up @@ -279,6 +283,80 @@ func makeAsyncCreateItemFunc[PT ItemConstraint[T], T any]( //nolint:funlen
}
}

type (
closeItemOptions struct {
withLock bool
withDeleteFromPool bool
withNotifyStats bool
wg *sync.WaitGroup
}
closeItemOption func(*closeItemOptions)
)

func closeItemWithLock() closeItemOption {
return func(o *closeItemOptions) {
o.withLock = true
}
}

func closeItemWithDeleteFromPool() closeItemOption {
return func(o *closeItemOptions) {
o.withDeleteFromPool = true
}
}

func closeItemNotifyStats() closeItemOption {
return func(o *closeItemOptions) {
o.withNotifyStats = true
}
}

func closeItemWithWaitGroup(wg *sync.WaitGroup) closeItemOption {
return func(o *closeItemOptions) {
o.wg = wg
}
}

func (p *Pool[PT, T]) closeItem(ctx context.Context, item PT, opts ...closeItemOption) {
options := closeItemOptions{}
for _, opt := range opts {
opt(&options)
}
if options.withLock {
p.mu.Lock()
defer p.mu.Unlock()
}

if options.withDeleteFromPool {
if options.withNotifyStats {
p.changeState(func() Stats {
delete(p.index, item)

return p.stats()
})
} else {
delete(p.index, item)
}
}

if t := p.config.closeTimeout; t > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, t)
defer cancel()
}

if options.wg != nil {
options.wg.Add(1)
go func() {
defer options.wg.Done()

p.config.closeItemFunc(ctx, item)
}()
} else {
p.config.closeItemFunc(ctx, item)
}
}

func (p *Pool[PT, T]) stats() Stats {
return Stats{
Limit: p.config.limit,
Expand All @@ -296,24 +374,6 @@ func (p *Pool[PT, T]) Stats() Stats {
return p.stats()
}

func makeAsyncCloseItemFunc[PT ItemConstraint[T], T any](
p *Pool[PT, T],
) func(ctx context.Context, item PT) {
return func(ctx context.Context, item PT) {
go func() {
closeItemCtx, closeItemCancel := xcontext.WithDone(xcontext.ValueOnly(ctx), p.done)
defer closeItemCancel()

if d := p.config.closeTimeout; d > 0 {
closeItemCtx, closeItemCancel = xcontext.WithTimeout(closeItemCtx, d)
defer closeItemCancel()
}

_ = item.Close(closeItemCtx)
}()
}
}

func (p *Pool[PT, T]) changeState(changeState func() Stats) {
if stats, onChange := changeState(), p.config.trace.OnChange; onChange != nil {
onChange(stats)
Expand Down Expand Up @@ -357,7 +417,15 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item
}

defer func() {
_ = p.putItem(ctx, item)
if finalErr == nil || !p.config.mustDeleteItemFunc(item, finalErr) {
_ = p.putItem(ctx, item)
} else {
p.closeItem(ctx, item,
closeItemWithLock(),
closeItemNotifyStats(),
closeItemWithDeleteFromPool(),
)
}
}()

err = f(ctx, item)
Expand Down Expand Up @@ -434,14 +502,13 @@ func (p *Pool[PT, T]) Close(ctx context.Context) (finalErr error) {
p.waitQ.Clear()

var wg sync.WaitGroup
wg.Add(p.idle.Len())

for el := p.idle.Front(); el != nil; el = el.Next() {
go func(item PT) {
defer wg.Done()
p.closeItem(ctx, item)
}(el.Value)
delete(p.index, el.Value)

p.closeItem(ctx, el.Value,
closeItemWithWaitGroup(&wg),
)
}

wg.Wait()
Expand Down Expand Up @@ -664,14 +731,11 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /

if (p.config.itemUsageLimit > 0 && *info.useCounter > p.config.itemUsageLimit) ||
(p.config.idleTimeToLive > 0 && p.config.clock.Since(info.lastUsage) > p.config.idleTimeToLive) {
p.closeItem(ctx, item)
p.mu.WithLock(func() {
p.changeState(func() Stats {
delete(p.index, item)

return p.stats()
})
})
p.closeItem(ctx, item,
closeItemWithLock(),
closeItemNotifyStats(),
closeItemWithDeleteFromPool(),
)

continue
}
Expand All @@ -680,7 +744,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
}
}

item, err := p.createItem(ctx)
item, err := p.createItemFunc(ctx)
if item != nil {
return item, nil
}
Expand Down Expand Up @@ -814,38 +878,31 @@ func (p *Pool[PT, T]) putItem(ctx context.Context, item PT) (finalErr error) {
}
select {
case <-p.done:
p.closeItem(ctx, item)
p.mu.WithLock(func() {
p.changeState(func() Stats {
delete(p.index, item)

return p.stats()
})
})
p.closeItem(ctx, item,
closeItemWithLock(),
closeItemNotifyStats(),
closeItemWithDeleteFromPool(),
)

return xerrors.WithStackTrace(errClosedPool)
default:
p.mu.Lock()
defer p.mu.Unlock()

if !item.IsAlive() {
p.closeItem(ctx, item)
p.changeState(func() Stats {
delete(p.index, item)

return p.stats()
})
p.closeItem(ctx, item,
closeItemNotifyStats(),
closeItemWithDeleteFromPool(),
)

return xerrors.WithStackTrace(errItemIsNotAlive)
}

if p.idle.Len() >= p.config.limit {
p.closeItem(ctx, item)
p.changeState(func() Stats {
delete(p.index, item)

return p.stats()
})
p.closeItem(ctx, item,
closeItemNotifyStats(),
closeItemWithDeleteFromPool(),
)

return xerrors.WithStackTrace(errPoolIsOverflow)
}
Expand Down
Loading

0 comments on commit 316b1eb

Please # to comment.