Skip to content

Commit

Permalink
pools: unify constructor to NewResourcePool
Browse files Browse the repository at this point in the history
Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
  • Loading branch information
sougou committed May 25, 2019
1 parent 1bbe3cc commit 9cebb40
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 24 deletions.
15 changes: 6 additions & 9 deletions go/pools/resource_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,12 @@ type resourceWrapper struct {
// maxCap specifies the extent to which the pool can be resized
// in the future through the SetCapacity function.
// You cannot resize the pool beyond maxCap.
// If a resource is unused beyond idleTimeout, it's discarded.
// If a resource is unused beyond idleTimeout, it's replaced
// with a new one.
// An idleTimeout of 0 means that there is no timeout.
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration) *ResourcePool {
return NewPrefilledResourcePool(factory, capacity, maxCap, idleTimeout, 0)
}

// NewPrefilledResourcePool creates a pre-filled resource pool.
// prefillParallelism specifies how many resources can be opened in parallel.
func NewPrefilledResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool {
// A non-zero value of prefillParallism causes the pool to be pre-filled.
// The value specifies how many resources can be opened in parallel.
func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool {
if capacity <= 0 || maxCap <= 0 || capacity > maxCap {
panic(errors.New("invalid/out of range capacity"))
}
Expand Down Expand Up @@ -229,7 +226,7 @@ func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error)
// Put will return a resource to the pool. For every successful Get,
// a corresponding Put is required. If you no longer need a resource,
// you will need to call Put(nil) instead of returning the closed resource.
// The will cause a new resource to be created in its place.
// This will cause a new resource to be created in its place.
func (rp *ResourcePool) Put(resource Resource) {
var wrapper resourceWrapper
if resource != nil {
Expand Down
24 changes: 12 additions & 12 deletions go/pools/resource_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestOpen(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 6, 6, time.Second)
p := NewResourcePool(PoolFactory, 6, 6, time.Second, 0)
p.SetCapacity(5)
var resources [10]Resource

Expand Down Expand Up @@ -197,12 +197,12 @@ func TestOpen(t *testing.T) {
func TestPrefill(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewPrefilledResourcePool(PoolFactory, 5, 5, time.Second, 1)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 1)
defer p.Close()
if p.Active() != 5 {
t.Errorf("p.Active(): %d, want 5", p.Active())
}
p = NewPrefilledResourcePool(FailFactory, 5, 5, time.Second, 1)
p = NewResourcePool(FailFactory, 5, 5, time.Second, 1)
defer p.Close()
if p.Active() != 0 {
t.Errorf("p.Active(): %d, want 0", p.Active())
Expand All @@ -213,7 +213,7 @@ func TestShrinking(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
var resources [10]Resource
// Leave one empty slot in the pool
for i := 0; i < 4; i++ {
Expand Down Expand Up @@ -352,7 +352,7 @@ func TestClosing(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
var resources [10]Resource
for i := 0; i < 5; i++ {
r, err := p.Get(ctx)
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestIdleTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
defer p.Close()

r, err := p.Get(ctx)
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestIdleTimeoutCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond)
p := NewResourcePool(PoolFactory, 1, 1, 10*time.Millisecond, 0)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
Expand All @@ -538,7 +538,7 @@ func TestCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(FailFactory, 5, 5, time.Second)
p := NewResourcePool(FailFactory, 5, 5, time.Second, 0)
defer p.Close()
if _, err := p.Get(ctx); err.Error() != "Failed" {
t.Errorf("Expecting Failed, received %v", err)
Expand All @@ -554,7 +554,7 @@ func TestCreateFailOnPut(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 5, 5, time.Second)
p := NewResourcePool(PoolFactory, 5, 5, time.Second, 0)
defer p.Close()
_, err := p.Get(ctx)
if err != nil {
Expand All @@ -571,7 +571,7 @@ func TestSlowCreateFail(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(SlowFailFactory, 2, 2, time.Second)
p := NewResourcePool(SlowFailFactory, 2, 2, time.Second, 0)
defer p.Close()
ch := make(chan bool)
// The third Get should not wait indefinitely
Expand All @@ -593,7 +593,7 @@ func TestTimeout(t *testing.T) {
ctx := context.Background()
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
defer p.Close()
r, err := p.Get(ctx)
if err != nil {
Expand All @@ -612,7 +612,7 @@ func TestTimeout(t *testing.T) {
func TestExpired(t *testing.T) {
lastID.Set(0)
count.Set(0)
p := NewResourcePool(PoolFactory, 1, 1, time.Second)
p := NewResourcePool(PoolFactory, 1, 1, time.Second, 0)
defer p.Close()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second))
r, err := p.Get(ctx)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/dbconnpool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings
defer cp.mu.Unlock()
cp.info = info
cp.mysqlStats = mysqlStats
cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout)
cp.connections = pools.NewResourcePool(cp.connect, cp.capacity, cp.capacity, cp.idleTimeout, 0)
// Check if we need to resolve a hostname (The Host is not just an IP address).
if cp.resolutionFrequency > 0 && net.ParseIP(info.Host) == nil {
cp.hostIsNotIP = true
Expand All @@ -156,7 +156,7 @@ func (cp *ConnectionPool) Open(info *mysql.ConnParams, mysqlStats *stats.Timings
defer cp.wg.Done()
for {
select {
case _ = <-cp.ticker.C:
case <-cp.ticker.C:
cp.refreshdns()
case <-cp.stop:
return
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (cp *Pool) Open(appParams, dbaParams, appDebugParams *mysql.ConnParams) {
f := func() (pools.Resource, error) {
return NewDBConn(cp, appParams)
}
cp.connections = pools.NewPrefilledResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism)
cp.connections = pools.NewResourcePool(f, cp.capacity, cp.capacity, cp.idleTimeout, cp.prefillParallelism)
cp.appDebugParams = appDebugParams

cp.dbaPool.Open(dbaParams, tabletenv.MySQLStats)
Expand Down

0 comments on commit 9cebb40

Please # to comment.