Skip to content

Commit

Permalink
feat: add RequestContext to PubSubConn (gomodule#603)
Browse files Browse the repository at this point in the history
Add a wrapper that goes through the standard receiveInternal
processing to match the API of the existing PubSubConn Receive
methods.

Fixes: gomodule#592
  • Loading branch information
pabigot committed Mar 23, 2022
1 parent 8eb5625 commit 3706d8e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
8 changes: 8 additions & 0 deletions redis/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package redis

import (
"context"
"errors"
"time"
)
Expand Down Expand Up @@ -116,6 +117,13 @@ func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{} {
return c.receiveInternal(ReceiveWithTimeout(c.Conn, timeout))
}

// ReceiveContext is like Receive, but it allows termination of the receive
// via a Context. If the call returns due to closure of the context's Done
// channel the underlying Conn will have been closed.
func (c PubSubConn) ReceiveContext(ctx context.Context) interface{} {
return c.receiveInternal(ReceiveContext(c.Conn, ctx))
}

func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interface{} {
reply, err := Values(replyArg, errArg)
if err != nil {
Expand Down
26 changes: 26 additions & 0 deletions redis/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package redis_test

import (
"context"
"errors"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -74,3 +76,27 @@ func TestPushed(t *testing.T) {
t.Errorf("recv /w timeout got %v, want %v", got, want)
}
}

func TestPubSubReceiveContext(t *testing.T) {
sc, err := redis.DialDefaultServer()
if err != nil {
t.Fatalf("error connection to database, %v", err)
}
defer sc.Close()

c := redis.PubSubConn{Conn: sc}

require.NoError(t, c.Subscribe("c1"))
expectPushed(t, c, "Subscribe(c1)", redis.Subscription{Kind: "subscribe", Channel: "c1", Count: 1})

ctx, cancel := context.WithCancel(context.Background())
cancel()
got := c.ReceiveContext(ctx)
if err, ok := got.(error); !ok {
t.Errorf("recv w/canceled expected Canceled got non-error type %T", got)
} else if !errors.Is(err, context.Canceled) {
t.Errorf("recv w/canceled expected Canceled got %s", err.Error())
} else if err == nil {
t.Errorf("recv w/canceled ctx got %T %v, expected error", got, got)
}
}

0 comments on commit 3706d8e

Please # to comment.