Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Issues with pub/sub #138

Open
SS-TruMinds opened this issue Feb 7, 2023 · 4 comments
Open

Issues with pub/sub #138

SS-TruMinds opened this issue Feb 7, 2023 · 4 comments

Comments

@SS-TruMinds
Copy link

SS-TruMinds commented Feb 7, 2023

We are running Redis server ver 6.2.7 with hiredis ver 1.1.0 & hiredis-cluster ver 0.9

1] In our existing load test we create a single async context (redisClusterAsyncContext *asyncCtx) using which we perform SET/GET operations on a 3 Node cluster. This test works well. We have performed multiple long duration runs with this test on > 1 million keys without any issues.

2] For testing pub/sub, we have now created another connection with a new context: redisClusterAsyncContext *asyncCtxPS:

asyncCtxPS = redisClusterAsyncContextInit();
redisClusterSetOptionAddNodes(asyncCtxPS->cc, cluster_ip);
redisClusterConnect2(asyncCtxPS->cc);

As connections with Redis Nodes get established as a result of the key operations performed in step 1 above, we trigger subscription in the connect callback:

void connectCallback(const redisAsyncContext *ac, int status) {
nodeIterator ni;
cluster_node *clusterNode;
....
....
initNodeIterator(&ni, asyncCtx->cc);
while ((clusterNode = nodeNext(&ni)) != NULL) {
if (ac == clusterNode->acon) {
redisClusterAsyncCommandToNode(asyncCtx, clusterNode, configCb, NULL, "CONFIG SET notify-keyspace-events Eh");
redisClusterAsyncCommandToNode(asyncCtxPS, clusterNode, eventCb, NULL, "PSUBSCRIBE __key*__:*");
....
....

At present, the above two pub/sub commands are invoked only on the first connected Node, not on all 3 Nodes.

void eventCb(redisClusterAsyncContext *cc, void *r, void *privdata) {
redisReply *reply = (redisReply *)r;

if (r == NULL) {
return;
}

if (reply->type == REDIS_REPLY_ARRAY) {
for (int j = 0; j < reply->elements; j++) {
printf("%u) %s\n", j, reply->element[j]->str);
}
}
}

In response, we see the following in the callback:

0) psubscribe
1) __key*__:*
2) (null)

(null) here doesn't seem correct. Subscription is successful if we run the above PSUBSCRIBE directly on redis-cli. What could be the issue here?

A couple of seconds later, we notice a crash:

#0 0x00007ffff7fa70d3 in redisClusterAsyncCallback (ac=0x5d94d0, r=0x0, privdata=0x5da940)
at /....../hircluster.c:3794
#1 0x00007ffff7fb45f2 in __redisRunCallback (ac=0x5d94d0, cb=0x5da9a0, reply=0x0)
at /....../async.c:311
#2 0x00007ffff7fb4979 in __redisAsyncFree (ac=0x5d94d0)
at /....../async.c:386
#3 0x00007ffff7fb4bbf in __redisAsyncDisconnect (ac=0x5d94d0)
at /....../async.c:450
#4 0x00007ffff7fb52b2 in redisProcessCallbacks (ac=0x5d94d0)
at /....../async.c:625
#5 0x00007ffff7fb560a in redisAsyncRead (ac=0x5d94d0)
at /....../async.c:717
#6 0x00007ffff7fb569b in redisAsyncHandleRead (ac=0x5d94d0)
at /....../async.c:738
#7 0x00000000004eb13d in redisLibmyeventHandlerRead (fd=<optimized out>, arg=0x5d7de0)
at /....../hiredis_libmyevent.h:27

Line 625 of async.c is:

605 if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
606 /*
607 * A spontaneous reply in a not-subscribed context can be the error
608 * reply that is sent when a new connection exceeds the maximum
609 * number of allowed connections on the server side.
610 *
611 * This is seen as an error instead of a regular reply because the
612 * server closes the connection after sending it.
613 *
614 * To prevent the error from being overwritten by an EOF error the
615 * connection is closed here. See issue #43.
616 *
617 * Another possibility is that the server is loading its dataset.
618 * In this case we also want to close the connection, and have the
619 * user wait until the server is ready to take our request.
620 */
621 if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
622 c->err = REDIS_ERR_OTHER;
623 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
624 c->reader->fn->freeObject(reply);
625 __redisAsyncDisconnect(ac);
626 return;
627 }

If we disable pub/sub in step 2 above, the test runs well again.
Please help debug the issue. Let us know if any addition information is required.

Thank you.

@bjosv
Copy link
Collaborator

bjosv commented Feb 8, 2023

Hi, my previous assessment that this would work seem to be wrong..
The problem with pub/sub is that hiredis keeps its callback-info to be able to reuse this for all messages, while redis-cluster currently dont. hiredis-cluster frees its cluster_async_data after each called callback.

One possible fast-fix until this is handled correctly in hiredis-cluster could be to use the hiredis api directly in your connectCallback(). Here you have the redisAsyncContext for the specific node already and instead of iterating all nodes again you could use this redisAsyncContext as in the hiredis tests.
The callback would have a different signature but maybe thats ok?

We also need to findout how to handle this correctly in hiredis-cluster.

@SS-TruMinds
Copy link
Author

Thank you. We will try out your suggestion.

With regards to the issue with the PSUBSCRIBE response we mentioned, the null there was because we were reading reply->element[j]->str in eventCb().
Changing that to reply->element[j]->integer has fixed that problem.

@bjosv
Copy link
Collaborator

bjosv commented Feb 16, 2023

Some notes of options how to solve this a near future:

  • Option 1: Add information in hiredis redisCallback to inform user if the redisCallback-object with the privdata reference will be deleted or kept by hiredis (kept when pubsub).
    This will be used to know if hiredis-clusters cluster_async_data object should be deleted or not.

  • Option 2: Like hiredis, check sent commands to find "*SUBSCRIBE" and set a flag in the created cluster_async_data that this callback is used for pubsub. With this knowledge we know when cluster_async_data should be deleted or not. When flagged to be a subscribe callback we should not delete the cluster_async_data since it's a published message, and the callback is reused.

Drawbacks I can think of

  • In both options above we dont know when its ok to delete cluster_async_data after an UNSUBSCRIBE has been sent. hiredis dont call the user callback when response to SUBSCRIBE/UNSUNSCRIBE comes..

@zuiderkwast
Copy link
Collaborator

See #27

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants