Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Shard awareness extension for Scylla patchset 2 #1211

Open
wants to merge 7 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type Conn struct {
version uint8
currentKeyspace string
host *HostInfo
supported map[string][]string

session *Session

Expand Down Expand Up @@ -378,21 +379,22 @@ func (s *startupCoordinator) options(ctx context.Context) error {
return err
}

supported, ok := frame.(*supportedFrame)
v, ok := frame.(*supportedFrame)
if !ok {
return NewErrProtocol("Unknown type of response to startup frame: %T", frame)
}
s.conn.supported = v.supported

return s.startup(ctx, supported.supported)
return s.startup(ctx)
}

func (s *startupCoordinator) startup(ctx context.Context, supported map[string][]string) error {
func (s *startupCoordinator) startup(ctx context.Context) error {
m := map[string]string{
"CQL_VERSION": s.conn.cfg.CQLVersion,
}

if s.conn.compressor != nil {
comp := supported["COMPRESSION"]
comp := s.conn.supported["COMPRESSION"]
name := s.conn.compressor.Name()
for _, compressor := range comp {
if compressor == name {
Expand Down Expand Up @@ -464,6 +466,10 @@ func (s *startupCoordinator) authenticateHandshake(ctx context.Context, authFram
}

func (c *Conn) closeWithError(err error) {
if c == nil {
return
}

if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
return
}
Expand Down
129 changes: 48 additions & 81 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -258,52 +257,51 @@ type hostConnPool struct {
addr string
size int
keyspace string
// protection for conns, closed, filling
mu sync.RWMutex
conns []*Conn
closed bool
filling bool

pos uint32
// protection for connPicker, closed, filling
mu sync.RWMutex
connPicker ConnPicker
closed bool
filling bool
}

func (h *hostConnPool) String() string {
h.mu.RLock()
defer h.mu.RUnlock()
size, _ := h.connPicker.Size()
return fmt.Sprintf("[filling=%v closed=%v conns=%v size=%v host=%v]",
h.filling, h.closed, len(h.conns), h.size, h.host)
h.filling, h.closed, size, h.size, h.host)
}

func newHostConnPool(session *Session, host *HostInfo, port, size int,
keyspace string) *hostConnPool {

pool := &hostConnPool{
session: session,
host: host,
port: port,
addr: (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(),
size: size,
keyspace: keyspace,
conns: make([]*Conn, 0, size),
filling: false,
closed: false,
session: session,
host: host,
port: port,
addr: (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(),
size: size,
keyspace: keyspace,
connPicker: nopConnPicker{},
filling: false,
closed: false,
}

// the pool is not filled or connected
return pool
}

// Pick a connection from this connection pool for the given query.
func (pool *hostConnPool) Pick() *Conn {
func (pool *hostConnPool) Pick(token token) *Conn {
pool.mu.RLock()
defer pool.mu.RUnlock()

if pool.closed {
return nil
}

size := len(pool.conns)
if size < pool.size {
size, missing := pool.connPicker.Size()
if missing > 0 {
// try to fill the pool
go pool.fill()

Expand All @@ -312,62 +310,27 @@ func (pool *hostConnPool) Pick() *Conn {
}
}

pos := int(atomic.AddUint32(&pool.pos, 1) - 1)

var (
leastBusyConn *Conn
streamsAvailable int
)

// find the conn which has the most available streams, this is racy
for i := 0; i < size; i++ {
conn := pool.conns[(pos+i)%size]
if streams := conn.AvailableStreams(); streams > streamsAvailable {
leastBusyConn = conn
streamsAvailable = streams
}
}

return leastBusyConn
return pool.connPicker.Pick(token)
}

//Size returns the number of connections currently active in the pool
func (pool *hostConnPool) Size() int {
pool.mu.RLock()
defer pool.mu.RUnlock()

return len(pool.conns)
size, _ := pool.connPicker.Size()
return size
}

//Close the connection pool
func (pool *hostConnPool) Close() {
pool.mu.Lock()
defer pool.mu.Unlock()

if pool.closed {
pool.mu.Unlock()
return
if !pool.closed {
pool.connPicker.Close()
}
pool.closed = true

// ensure we dont try to reacquire the lock in handleError
// TODO: improve this as the following can happen
// 1) we have locked pool.mu write lock
// 2) conn.Close calls conn.closeWithError(nil)
// 3) conn.closeWithError calls conn.Close() which returns an error
// 4) conn.closeWithError calls pool.HandleError with the error from conn.Close
// 5) pool.HandleError tries to lock pool.mu
// deadlock

// empty the pool
conns := pool.conns
pool.conns = nil

pool.mu.Unlock()

// close the connections
for _, conn := range conns {
conn.Close()
}
}

// Fill the connection pool
Expand All @@ -380,8 +343,7 @@ func (pool *hostConnPool) fill() {
}

// determine the filling work to be done
startCount := len(pool.conns)
fillCount := pool.size - startCount
startCount, fillCount := pool.connPicker.Size()

// avoid filling a full (or overfull) pool
if fillCount <= 0 {
Expand All @@ -393,9 +355,7 @@ func (pool *hostConnPool) fill() {
pool.mu.RUnlock()
pool.mu.Lock()

// double check everything since the lock was released
startCount = len(pool.conns)
fillCount = pool.size - startCount
startCount, fillCount = pool.connPicker.Size()
if pool.closed || pool.filling || fillCount <= 0 {
// looks like another goroutine already beat this
// goroutine to the filling
Expand Down Expand Up @@ -429,8 +389,10 @@ func (pool *hostConnPool) fill() {
return
}

// filled one
fillCount--
// filled one, let's reload it to see if it has changed
pool.mu.RLock()
_, fillCount = pool.connPicker.Size()
pool.mu.RUnlock()
}

// fill the rest of the pool asynchronously
Expand Down Expand Up @@ -545,11 +507,26 @@ func (pool *hostConnPool) connect() (err error) {
return nil
}

pool.conns = append(pool.conns, conn)
// lazily initialize the connPicker when we know the required type
pool.initConnPicker(conn)
pool.connPicker.Put(conn)

return nil
}

func (pool *hostConnPool) initConnPicker(conn *Conn) {
if _, ok := pool.connPicker.(nopConnPicker); !ok {
return
}

if isScyllaConn(conn) {
pool.connPicker = newScyllaConnPicker(conn)
return
}

pool.connPicker = newDefaultConnPicker(pool.size)
}

// handle any error from a Conn
func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
if !closed {
Expand All @@ -567,15 +544,5 @@ func (pool *hostConnPool) HandleError(conn *Conn, err error, closed bool) {
return
}

// find the connection index
for i, candidate := range pool.conns {
if candidate == conn {
// remove the connection, not preserving order
pool.conns[i], pool.conns = pool.conns[len(pool.conns)-1], pool.conns[:len(pool.conns)-1]

// lost a connection, so fill the pool
go pool.fill()
break
}
}
pool.connPicker.Remove(conn)
}
116 changes: 116 additions & 0 deletions connpicker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package gocql

import (
"fmt"
"sync"
"sync/atomic"
)

type ConnPicker interface {
Pick(token) *Conn
Put(*Conn)
Remove(conn *Conn)
Size() (int, int)
Close()
}

type defaultConnPicker struct {
conns []*Conn
pos uint32
size int
mu sync.RWMutex
}

func newDefaultConnPicker(size int) *defaultConnPicker {
if size <= 0 {
panic(fmt.Sprintf("invalid pool size %d", size))
}
return &defaultConnPicker{
size: size,
}
}

func (p *defaultConnPicker) Remove(conn *Conn) {
p.mu.Lock()
defer p.mu.Unlock()

for i, candidate := range p.conns {
if candidate == conn {
p.conns[i] = nil
return
}
}
}

func (p *defaultConnPicker) Close() {
p.mu.Lock()
defer p.mu.Unlock()

conns := p.conns
p.conns = nil
for _, conn := range conns {
if conn != nil {
conn.Close()
}
}
}

func (p *defaultConnPicker) Size() (int, int) {
size := len(p.conns)
return size, p.size - size
}

func (p *defaultConnPicker) Pick(token) *Conn {
pos := int(atomic.AddUint32(&p.pos, 1) - 1)
size := len(p.conns)

var (
leastBusyConn *Conn
streamsAvailable int
)

// find the conn which has the most available streams, this is racy
for i := 0; i < size; i++ {
conn := p.conns[(pos+i)%size]
if conn == nil {
continue
}
if streams := conn.AvailableStreams(); streams > streamsAvailable {
leastBusyConn = conn
streamsAvailable = streams
}
}

return leastBusyConn
}

func (p *defaultConnPicker) Put(conn *Conn) {
p.mu.Lock()
p.conns = append(p.conns, conn)
p.mu.Unlock()
}

// nopConnPicker is a no-operation implementation of ConnPicker, it's used when
// hostConnPool is created to allow deferring creation of the actual ConnPicker
// to the point where we have first connection.
type nopConnPicker struct{}

func (nopConnPicker) Pick(token) *Conn {
return nil
}

func (nopConnPicker) Put(*Conn) {
}

func (nopConnPicker) Remove(conn *Conn) {
}

func (nopConnPicker) Size() (int, int) {
// Return 1 to make hostConnPool to try to establish a connection.
// When first connection is established hostConnPool replaces nopConnPicker
// with a different ConnPicker implementation.
return 0, 1
}

func (nopConnPicker) Close() {
}
Loading