Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add PubSubConn.ReceiveContext() #592

Closed
RelicOfTesla opened this issue Jan 16, 2022 · 6 comments · Fixed by #603
Closed

Add PubSubConn.ReceiveContext() #592

RelicOfTesla opened this issue Jan 16, 2022 · 6 comments · Fixed by #603

Comments

@RelicOfTesla
Copy link

RelicOfTesla commented Jan 16, 2022

  1. add "PubSubConn.ReceiveContext" support watch the <-context.Done() .
    because "redis.ReceiveContext" is return native redis response. was not call "Subscription.receiveInternal" decode this response.

  2. and, "ReceiveWithTimeout" is return net.ErrClosed, can not loop call receive

@stevenh
Copy link
Collaborator

stevenh commented Jan 16, 2022

I'm sorry I don't understand what you're trying to say, do you have some code which demonstrates the issue you have?

@RelicOfTesla
Copy link
Author

RelicOfTesla commented Jan 17, 2022

func run(ctx context.Context){
  psc := &redis.PubSubConn{Conn: conn}
  err := psc.Subscribe(topicStr)
  for {
    r: = psc.ReceiveContext(ctx)   // add this
    err, ok := r.(error)
    _ = ok
    if errors.Is(v, context.DeadlineExceeded) || errors.Is(v, context.Canceled) {
        break; // break loop
    }
    msg, ok := r.(redis.Message)
    // ....

    // if not have the psc.ReceiveContext(ctx), and i try "v, err := redis.ReceiveContext(psc.Conn, ctx)"
    // the v was can not convert redis.Message, because not decode by Subscription.receiveInternal
    // so , please add PubSubConn.ReceiveContext
  }
}
func main(){
   ctx,cancel := context.WithCancel(context.TODO())
   go run(ctx)
   time.Sleep(time.Second*10)
    cancel()
   time.Sleep(time.Second*10)
}
for {
   v := psc.ReceiveWithTimeout(time.Second)
  fmt.Println(v) 
  // first print: os.ErrDeadlineExceeded,  and this connection was closed!!!!
  // second print: net.ErrClosed  , can not receive any redis message!!!
}

@stevenh
Copy link
Collaborator

stevenh commented Jan 17, 2022

To confirm my understanding you're looking for a new feature PubSubConn.ReceiveContext(ctx context.Context)?

You're also saying that psc.ReceiveWithTimeout returns an different error indicated the connection is closed after a timeout occurs. This is indeed the case, and is by design, and that would also be the same for a Context method as a timeout is treated as a fatal error, which can be seen here. It's a fatal error as with you can't be sure what state the connection is in, so it must be closed.

@RelicOfTesla
Copy link
Author

These are two questions..... second question it was an unexpected discovery

@stevenh
Copy link
Collaborator

stevenh commented Jan 18, 2022

So to confirm you should be able to use psc.ReceiveContext(...) to achieve what you want, with the exception that on timeout you're psc will be broken and hence will need to be set back up.

If you're seeing an unexpected result, please provide a working example and the error you are seeing.

pabigot added a commit to pabigot/redigo that referenced this issue Mar 17, 2022
Add a wrapper that goes through the standard receiveInternal
processing to match the API of the existing PubSubConn Receive
methods.

Fixes: gomodule#592
@pabigot
Copy link
Contributor

pabigot commented Mar 17, 2022

I was just looking for this. @stevenh The reason we need a new method is shown by the following:

r := psc.Receive()
fmt.Printf("psc.Receive: %T %v\n", r, r)
// Neither of these work because Conn doesn't have ReceiveContext.
// r, err := psc.ReceiveContext(context.Background())
// r, err := psc.Conn.ReceiveContext(context.Background())

// This works, but it doesn't include the processing
// of receiveInternal:
r, err = redis.ReceiveContext(psc.Conn, context.Background())
if err != nil {
	fmt.Printf("ReceiveContext: err %T: %s\n", err, err)
} else {
	fmt.Printf("ReceiveContext: %T %v\n", r, r)
}

// So this is preferred:
r = psc.ReceiveContext(context.Background())
fmt.Printf("psc.ReceiveContext: %T %v\n", r, r)

The output from this given three successive publications is:

# This is a normal PubSubConn Receive
psc.Receive: redis.Message {ps:cfg:rmq  [111 110 101]}

# This is what we get with redis.ReceiveContext()
ReceiveContext: []interface {} [[109 101 115 115 97 103 101] [112 115 58 99 102 103 58 114 109 113] [111 110 101]]

# This is what we want:
psc.ReceiveContext: redis.Message {ps:cfg:rmq  [111 110 101]}

The enhancement is trivial, and is in #603. It's in draft until I can set up an environment where I can extend the test suite and verify it (it does work in my application). done

pabigot added a commit to pabigot/redigo that referenced this issue Mar 17, 2022
Add a wrapper that goes through the standard receiveInternal
processing to match the API of the existing PubSubConn Receive
methods.

Fixes: gomodule#592
pabigot added a commit to pabigot/redigo that referenced this issue Mar 17, 2022
Add a wrapper that goes through the standard receiveInternal
processing to match the API of the existing PubSubConn Receive
methods.

Fixes: gomodule#592
pabigot added a commit to pabigot/redigo that referenced this issue Mar 22, 2022
Add a wrapper that goes through the standard receiveInternal
processing to match the API of the existing PubSubConn Receive
methods.

Fixes: gomodule#592
pabigot added a commit to pabigot/redigo that referenced this issue Mar 23, 2022
Add a wrapper that goes through the standard receiveInternal
processing to match the API of the existing PubSubConn Receive
methods.

Fixes: gomodule#592
pabigot added a commit to pabigot/redigo that referenced this issue Mar 23, 2022
Add a wrapper that goes through the standard receiveInternal
processing to match the API of the existing PubSubConn Receive
methods.

Fixes: gomodule#592
pabigot added a commit to pabigot/redigo that referenced this issue Mar 23, 2022
Add a wrapper that goes through the standard receiveInternal
processing to match the API of the existing PubSubConn Receive
methods.

Fixes: gomodule#592
stevenh pushed a commit that referenced this issue Mar 23, 2022
Add a wrapper that goes through the standard receiveInternal
processing to match the API of the existing PubSubConn Receive
methods.

Fixes: #592
# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants