Skip to content

Commit

Permalink
Revamp connections to avoid runtime calls (#282)
Browse files Browse the repository at this point in the history
By having clients wait for timeouts on the same channel as they do for
results, we end up in an efficient special case of the Go runtime (a plain
channel receive) rather than a more heavyweight select loop.
  • Loading branch information
Watson Ladd authored Feb 5, 2021
1 parent a3cb6f2 commit 1c05176
Showing 1 changed file with 52 additions and 50 deletions.
102 changes: 52 additions & 50 deletions conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"io"
"net"
"net/rpc"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -105,13 +104,16 @@ const defaultOpTimeout = 10 * time.Second
// ErrClosed is the error returned when an already-closed connection is re-used.
var ErrClosed = fmt.Errorf("use of closed connection")

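// ErrNotFound is returned by extractChannel when no listener is registered
// for an ID. It is not really an error: a timeout and a response race to claim
// the listener, and the loser simply finds it already removed.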
var ErrNotFound = fmt.Errorf("connection removed")

// Conn represents an open keyless connection.
type Conn struct {
// In order to read, acquire readMtx; in order to write, acquire writeMtx
conn net.Conn
// In order to read, acquire mapMtx.RLock(); in order to write, acquire
// mapMtx.Lock().
listeners map[uint32]chan *protocol.Operation
listeners map[uint32]chan *result
// In order to modify, acquire mapMtx.Lock().
nextID uint32

Expand All @@ -120,18 +122,23 @@ type Conn struct {
// To lock up the connection, always acquire in the following order to avoid
// deadlock: writeMtx, mapMtx (don't acquire readMtx).
readMtx, writeMtx sync.Mutex
mapMtx sync.RWMutex
mapMtx sync.Mutex

// In order to read, acquire any mutex in any mode (read or write). In order
// to modify, acquire all three.
closed bool
}

// result is the value delivered to a waiting caller over its listener channel.
// A given result carries either a response operation or an error.
type result struct {
// err is set when the wait ended without a response; timeoutRequest sends a
// result with only this field populated when the operation times out.
err error
// op is the response operation; DoRead sends a result with only this field
// populated when a packet for the listener's ID arrives.
op *protocol.Operation
}

// NewConnTimeout constructs a new Conn with the given operation timeout.
//
// inner is the underlying network connection that the Conn reads from and
// writes to, and opTimeout is stored on the Conn as its operation timeout.
// The listeners map starts out empty.
func NewConnTimeout(inner net.Conn, opTimeout time.Duration) *Conn {
return &Conn{
conn: inner,
listeners: make(map[uint32]chan *protocol.Operation),
listeners: make(map[uint32]chan *result),
opTimeout: opTimeout,
}
}
Expand Down Expand Up @@ -177,41 +184,42 @@ func (c *Conn) DoRead() error {
if err != nil {
return err
}
l, err := c.extractChannel(pkt.ID)
if err != nil {
// The timeout fired, our connection was removed.
// Not a problem!
if err == ErrNotFound {
return nil
}
// Other errors close the whole connection
return err
}

// Acquire the map mutex until we're done with the map.
c.mapMtx.RLock()
l <- &result{op: &pkt.Operation}
return nil
}

// extractChannel looks up the listener channel registered under id and removes
// it from c.listeners, all while holding mapMtx. It returns ErrClosed if the
// connection has been closed and ErrNotFound if no listener is registered for
// id. Because the entry is deleted during the lookup, at most one caller can
// obtain the channel for a given registration.
func (c *Conn) extractChannel(id uint32) (chan *result, error) {
c.mapMtx.Lock()
defer c.mapMtx.Unlock()
if c.closed {
// it was closed in the time that we didn't have a lock held
c.mapMtx.RUnlock()
return ErrClosed
return nil, ErrClosed
}

// NOTE: Regardless of what happens, it's the writer's responsibility - not
// ours - to delete their channel after they receive their response. If we
// were to do that ourselves, we could introduce a race condition:
// - Writer times out, enters timeout case in select block
// - Reader locks the map mutex, finds their channel, sends them the response,
// and deletes the channel from the map.
// - Another writer comes, gets the same ID as the currently sleeping waiter,
// and writes their channel into the map.
// - The waiter that timed out wakes back up and deletes the wrong channel!

l, ok := c.listeners[pkt.ID]
c.mapMtx.RUnlock()
ret, ok := c.listeners[id]
delete(c.listeners, id)
if !ok {
// the call to DoOperation hit its timeout and cleared the map and returned;
// the error was delivered from that call, so no point in delivering it here
// too
return nil
return nil, ErrNotFound
}
return ret, nil
}

// In practice, the Go scheduler seems to have a tough time efficiently
// scheduling client goroutines. Empirically, adding this line speeds things
// up significantly.
runtime.Gosched()

l <- &pkt.Operation
return nil
// timeoutRequest blocks for timeout and then tries to claim the listener
// channel registered under id. If it obtains the channel, it delivers an
// "operation timed out" error result on it. If extractChannel fails, nothing
// is sent: either the listener was already removed (ErrNotFound, e.g. DoRead
// claimed it to deliver a response) or the connection is closed (ErrClosed).
//
// It is meant to run in its own goroutine, one per outstanding request.
func (c *Conn) timeoutRequest(id uint32, timeout time.Duration) {
	// Sleep for the full timeout; a non-positive duration returns immediately.
	time.Sleep(timeout)

	listener, err := c.extractChannel(id)
	if err == nil {
		// We claimed the channel, so the waiter is still pending.
		listener <- &result{err: fmt.Errorf("operation timed out")}
	}
}

// DoOperation executes an entire keyless operation, returning its result.
Expand All @@ -224,7 +232,7 @@ func (c *Conn) DoOperation(ctx context.Context, op protocol.Operation) (*protoco
// time out, but a reader finds this channel before we have a chance to delete
// it from the map, the reader doesn't block forever sending us a value that
// we will never receive.
response := make(chan *protocol.Operation, 1)
response := make(chan *result, 1)

// Acquire the map mutex and only release it once we're done with the map.
c.mapMtx.Lock()
Expand All @@ -236,6 +244,7 @@ func (c *Conn) DoOperation(ctx context.Context, op protocol.Operation) (*protoco
c.nextID++
if _, ok := c.listeners[id]; ok {
c.mapMtx.Unlock()
c.Close()
// TODO: If this becomes an issue in practice, we could consider randomly
// generating IDs and spinning until we find an available one (the map
// acts as a record of all IDs currently in use).
Expand Down Expand Up @@ -270,23 +279,16 @@ func (c *Conn) DoOperation(ctx context.Context, op protocol.Operation) (*protoco
// of writing to the connection (which could have taken a while if the
// connection was backed up).
left := end.Sub(time.Now())
select {
case op := <-response:
waitingSpan.Finish()
c.mapMtx.Lock()
delete(c.listeners, id)
c.mapMtx.Unlock()
if op == nil {
return nil, ErrClosed
}
return op, nil
case <-time.After(left):
waitingSpan.Finish()
c.mapMtx.Lock()
delete(c.listeners, id)
c.mapMtx.Unlock()
return nil, fmt.Errorf("operation timed out")
go c.timeoutRequest(id, left)
res := <-response
waitingSpan.Finish()
if res == nil {
return nil, ErrClosed
}
if res.err != nil {
return nil, res.err
}
return res.op, nil
}

// Ping sends a ping message over the connection and waits for a corresponding
Expand Down

0 comments on commit 1c05176

Please sign in to comment.