From 237ce05e471b0ec569644789d03b9bd3d5b7a573 Mon Sep 17 00:00:00 2001 From: KyuJae Lee Date: Mon, 19 Apr 2021 17:17:00 +0900 Subject: [PATCH 1/3] implement logcompact-[seq|ts] redis-arc options --- integration_test/test_check_point_and_log.py | 55 ++++++++++++++++++++ redis/src/arc_config.c | 24 +++++++++ redis/src/arc_server.c | 27 ++++++++++ redis/src/arc_server.h | 10 +++- 4 files changed, 115 insertions(+), 1 deletion(-) diff --git a/integration_test/test_check_point_and_log.py b/integration_test/test_check_point_and_log.py index e614c40b..2909e9b7 100644 --- a/integration_test/test_check_point_and_log.py +++ b/integration_test/test_check_point_and_log.py @@ -373,3 +373,58 @@ def test_delete_smrlog_after_redis_restart(self): time.sleep(3) self.assertTrue(len(loglist) < 5) + + def test_redis_logcompact(self): + util.print_frame() + + server = self.cluster['servers'][0] + redis = telnetlib.Telnet(server['ip'], server['redis_port']) + + val = 'x' * 1048576 # 1MB + cmd = '*3\r\n$3\r\nset\r\n$4\r\ntest\r\n$1048576\r\n%s\r\n' % val + + # create smr log file + for i in xrange(64*7): # at least 7 log files + redis.write(cmd) + ret = redis.read_until('\r\n', 3) + self.assertEquals(ret, '+OK\r\n') + + # wait until synced + if config.opt_use_memlog: + time.sleep(3) + + loglist = [f for f in os.listdir('%s/log0' % util.smr_dir(0)) if '.log' in f] + util.log('before log delete') + util.log(loglist) + + self.assertTrue(len(loglist) > 7) + + testbase.request_to_shutdown_redis(server) + # start redis-arc with logcompact-seq option + if True: + import constant as c + id = server['id'] + ip = server['ip'] + smr_base_port = server['smr_base_port'] + redis_port = server['redis_port'] + cmd = './%s --smr-local-port %d --port %d --save "" --logcompact-seq %d' % (c.REDIS, smr_base_port, redis_port, 64*1024*1024*6) + f_log_std = util.open_process_logfile( id, 'redis_std' ) + f_log_err = util.open_process_logfile( id, 'redis_err' ) + p = util.exec_proc_async(util.redis_dir(id), cmd, True, None, f_log_std, f_log_err) + ret = p.wait() + self.assertTrue(ret == 0) + testbase.request_to_shutdown_smr(server) + + testbase.request_to_start_smr(server, log_delete_delay=1) + testbase.request_to_start_redis(server) + + time.sleep(30) + loglist = [f for f in os.listdir('%s/log0' % util.smr_dir(0)) if '.log' in f] + util.log('after log delete') + util.log(loglist) + + # wait until synced + if config.opt_use_memlog: + time.sleep(3) + + self.assertTrue(len(loglist) < 5) diff --git a/redis/src/arc_config.c b/redis/src/arc_config.c index c4eaa7f0..c1ada0b7 100644 --- a/redis/src/arc_config.c +++ b/redis/src/arc_config.c @@ -272,6 +272,30 @@ arc_config_cmp_load (int argc, sds * argv, char **err_ret) goto loaderr; } } + else if (!strcasecmp (argv[0], "logcompact-seq") && argc == 2) + { + long long seq = -1LL; + int ret; + ret = string2ll (argv[1], strlen (argv[1]), &seq); + if (ret == 0 || seq < 0) + { + err = "Invalid logcompact-seq spec"; + goto loaderr; + } + arc.logcompact_seq = seq; + } + else if (!strcasecmp (argv[0], "logcompact-ts") && argc == 2) + { + long long ts = -1LL; + int ret; + ret = string2ll (argv[1], strlen (argv[1]), &ts); + if (ret == 0 || ts < 0) + { + err = "Invalid logcompact-ts spec"; + goto loaderr; + } + arc.logcompact_ts = ts; + } else { /* no match */ diff --git a/redis/src/arc_server.c b/redis/src/arc_server.c index cdd79edd..5e575b71 100644 --- a/redis/src/arc_server.c +++ b/redis/src/arc_server.c @@ -30,6 +30,7 @@ static void ascii_art (void); static void process_smr_callback (aeEventLoop * el, int fd, void *privdata, int mask); static int smrcb_close (void *arg, long long seq, short nid, int sid); +static void save_and_exit (void); static int smrcb_data (void *arg, long long seq, long long timestamp, short nid, int sid, int hash, smrData * smr_data, int size); @@ -251,6 +252,10 @@ init_config (void) /* Local ip check */ arc.local_ip_addrs = arcx_get_local_ip_addrs (); //TODO why here? + /* logcompact-[seq|ts] */ + arc.logcompact_seq = -1LL; + arc.logcompact_ts = -1LL; + #ifdef COVERAGE_TEST arc.debug_mem_usage_fixed = 0; arc.debug_total_mem_kb = 0LL; @@ -533,6 +538,21 @@ smrcb_close (void *arg, long long seq, short nid, int sid) return 0; } +static void +save_and_exit (void) +{ + int ret; + + ret = rdbSave (server.rdb_filename); + if (ret == C_OK) + { + exit (0); + } + else + { + exit (1); + } +} static int smrcb_data (void *arg, long long seq, long long timestamp, @@ -544,6 +564,13 @@ smrcb_data (void *arg, long long seq, long long timestamp, dlisth *node; UNUSED (arg); + /* handle logcompaction */ + if ((arc.logcompact_seq != -1LL && seq + size > arc.logcompact_seq) || + (arc.logcompact_ts != -1LL && timestamp > arc.logcompact_ts)) + { + save_and_exit (); + } + /* Special commands */ if (size == 1 && data[0] == ARC_SMR_CMD_CATCHUP_CHECK) { diff --git a/redis/src/arc_server.h b/redis/src/arc_server.h index 3f211065..192c2f34 100644 --- a/redis/src/arc_server.h +++ b/redis/src/arc_server.h @@ -190,6 +190,10 @@ struct arcServer /* local ip check */ char **local_ip_addrs; + /* logcompact-[seq|ts] */ + long long logcompact_seq; + long long logcompact_ts; + #ifdef COVERAGE_TEST /* debugging value for injecting memory states */ int debug_mem_usage_fixed; @@ -237,6 +241,8 @@ extern struct arcServer arc; config_get_numerical_field("memory-max-allowed-percentage",arc.mem_max_allowed_perc); \ config_get_numerical_field("memory-hard-limit-percentage",arc.mem_hard_limit_perc); \ config_get_numerical_field("object-bio-delete-min-elems",arc.object_bio_delete_min_elems); \ + config_get_numerical_field("logcompact-seq",arc.logcompact_seq); \ + config_get_numerical_field("logcompact-ts",arc.logcompact_ts); \ } while(0); #define arc_rewrite_config() do { \ @@ -246,8 +252,10 @@ extern struct arcServer arc; rewriteConfigNumericalOption(state, "number-of-rdb-backups",arc.num_rdb_backups,1900); \ rewriteConfigNumericalOption(state, "memory-limit-activation-percentage",arc.mem_limit_active_perc,100); \ rewriteConfigNumericalOption(state, "memory-max-allowed-percentage",arc.mem_max_allowed_perc,100); \ - rewriteConfigNumericalOption(state, "memory-hard-limit-percentage",arc.mem_hard_limit_perc,100); \ + rewriteConfigNumericalOption(state, "memory-hard-limit-percentage",arc.mem_hard_limit_perc,100); \ rewriteConfigNumericalOption(state, "object-bio-delete-min-elems",arc.object_bio_delete_min_elems,ARC_OBJ_BIO_DELETE_MIN_ELEMS); \ + rewriteConfigNumericalOption(state, "logcompact-seq",arc.logcompact_seq,-1); \ + rewriteConfigNumericalOption(state, "logcompact-ts",arc.logcompact_ts,-1); \ } while(0) extern int arc_config_set (client * c); From 8e4f37130d092b56536b187643612c6231eaaf18 Mon Sep 17 00:00:00 2001 From: KyuJae Lee Date: Mon, 10 Apr 2023 16:43:39 +0900 Subject: [PATCH 2/3] bugfix: eager GC repeates when there are many unexpired values for a svc key --- Makefile | 1 + integration_test/test_s3gc.py | 88 ++++++++++++++++++++++++++++------- redis/src/arc_server.h | 8 ++-- redis/src/arc_t_sss.c | 24 ++++++++-- 4 files changed, 96 insertions(+), 25 deletions(-) diff --git a/Makefile b/Makefile index 8d53e6f2..5f2b067c 100644 --- a/Makefile +++ b/Makefile @@ -23,6 +23,7 @@ release: all mkdir -p $(RELEASE_DIR)/api/java # Redis cp -rf redis/src/redis-arc $(RELEASE_DIR)/bin/redis-arc-$(VERSION) + cp -rf redis/src/dump-util $(RELEASE_DIR)/bin/dump-util-$(VERSION) cp -rf redis/src/cluster-util $(RELEASE_DIR)/bin/cluster-util-$(VERSION) # Confmaster cp -rf confmaster/target/confmaster-1.0.0-SNAPSHOT-jar-with-dependencies.jar $(RELEASE_DIR)/confmaster/confmaster-$(VERSION).jar diff --git a/integration_test/test_s3gc.py b/integration_test/test_s3gc.py index a06a30e2..e3eedf3e 100644 --- a/integration_test/test_s3gc.py +++ b/integration_test/test_s3gc.py @@ -86,13 +86,13 @@ def test_big_s3object ( self ): ok, prev_dbsize = redis.do_request('dbsize\r\n') assert (ok == True) - # save and set current sss-gc-interval long enough - ok, resp = redis.do_request('config get sss-gc-interval\r\n') - assert (ok == True) - prev_gc_interval = int(resp[1]) + # save and set current sss-gc-interval long enough + ok, resp = redis.do_request('config get sss-gc-interval\r\n') + assert (ok == True) + prev_gc_interval = int(resp[1]) - ok, dummy = redis.do_request('config set sss-gc-interval 1000000\r\n') - assert (ok == True) + ok, dummy = redis.do_request('config set sss-gc-interval 1000000\r\n') + assert (ok == True) # make big s3 object (10000000 elements) # Note max inline request size is about 64k @@ -120,20 +120,74 @@ def test_big_s3object ( self ): et = int(round(time.time() * 1000)) assert (et - st < 500) # 500 msec is hard limit? - # wait for the big object purged and deleted - limit = time.time() + (make_end - make_begin)/1000.0 - while time.time() < limit + 1: - ok, curr_dbsize = redis.do_request('dbsize\r\n') - assert (ok == True) - if curr_dbsize == prev_dbsize: - break - time.sleep(0.5) + # wait for the big object purged and deleted + limit = time.time() + (make_end - make_begin)/1000.0 + while time.time() < limit + 1: + ok, curr_dbsize = redis.do_request('dbsize\r\n') + assert (ok == True) + if curr_dbsize == prev_dbsize: + break + time.sleep(0.5) # check number of keys are unchanged ok, curr_dbsize = redis.do_request('dbsize\r\n') assert (ok == True) assert (curr_dbsize == prev_dbsize) - # restore gc-interval - ok, dummy = redis.do_request('config set sss-gc-interval %d\r\n' % prev_gc_interval) - assert (ok == True) + # restore gc-interval + ok, dummy = redis.do_request('config set sss-gc-interval %d\r\n' % prev_gc_interval) + assert (ok == True) + + def test_s3gc_eager_big_key_values(self): + # opensource#182 + redis = self.redis + ok, prev_dbsize = redis.do_request('dbsize\r\n') + assert (ok == True) + + util.log('before make some s3 objects with lots of long living values') + for i in range(0, 3000): + ok, data = redis.do_request('s3sadd * key1 svc key val%d 1000000\r\n' % i) + assert (ok == True) + ok, data = redis.do_request('s3sadd * key2 svc key val%d 1000000\r\n' % i) + assert (ok == True) + ok, data = redis.do_request('s3sadd * key3 svc key val%d 1000000\r\n' % i) + assert (ok == True) + util.log('after make some s3 objects with lots of long living values') + time.sleep (0.2) + + # iterate all gc lines + for i in range (0, 8192): + st = int(round(time.time() * 1000)) + ok, r = redis.do_request('s3gc 0\r\n') + assert (ok == True) + ok, r = redis.do_request('ping\r\n') + assert (ok == True) + et = int(round(time.time() * 1000)) + assert (et - st < 500) # 500 msec is hard limit? + + # we should see s3gc_eager_loops:0 eventually + def get_s3gc_eager_loops(): + ok, data = redis.do_request("info stats\r\n") + assert(ok == True) + items = data.split('\r\n') + for item in items: + prefix = 's3gc_eager_loops:' + if item.startswith(prefix): + return int(item[len(prefix):].strip()) + + # we should see idle eager gc mode + see_idle_gc = False + for i in range(50): + if get_s3gc_eager_loops() == 0: + see_idle_gc = True + break + time.sleep(0.02) + assert(see_idle_gc), get_s3gc_eager_loops() + + # check number of keys are unchanged + redis.do_request('del key1\r\n') + redis.do_request('del key2\r\n') + redis.do_request('del key3\r\n') + ok, curr_dbsize = redis.do_request('dbsize\r\n') + assert (ok == True) + assert (curr_dbsize == prev_dbsize) diff --git a/redis/src/arc_server.h b/redis/src/arc_server.h index 192c2f34..a5f67f88 100644 --- a/redis/src/arc_server.h +++ b/redis/src/arc_server.h @@ -149,7 +149,7 @@ struct arcServer dlisth *gc_line; /* fixed rate gc object headers */ dlisth gc_eager; /* s3 object header for eager mode gc */ void *gc_obc; /* s3 object cursor for incremental purge */ - int gc_eager_loops; /* event loop count in eager mode */ + long long gc_eager_loops; /* event loop count in eager mode */ /* State machine replicator (SMR) */ smrConnector *smr_conn; @@ -332,14 +332,16 @@ extern int arc_config_cmp_load (int argc, sds * argv, char **err_ret); "instantaneous_replied_ops_per_sec:%lld\r\n" \ "total_commands_lcon:%lld\r\n" \ "instantaneous_lcon_ops_per_sec:%lld\r\n" \ - "background_deleted_keys:%lld\r\n" + "background_deleted_keys:%lld\r\n" \ + "s3gc_eager_loops:%lld\r\n" #define ARC_INFO_STATS_ARG \ arc.stat_numcommands_replied, \ arc_get_instantaneous_metric(arc.replied_ops_sec_samples), \ arc.stat_numcommands_lcon, \ arc_get_instantaneous_metric(arc.lcon_ops_sec_samples), \ - arc.stat_bgdel_keys, + arc.stat_bgdel_keys, \ + arc.gc_eager_loops, extern void arc_amend_command_table (void); extern void arc_init_config (void); diff --git a/redis/src/arc_t_sss.c b/redis/src/arc_t_sss.c index 08a1b56b..7ee9ddc0 100644 --- a/redis/src/arc_t_sss.c +++ b/redis/src/arc_t_sss.c @@ -95,12 +95,16 @@ struct sssObc_ robj *ks; robj *svc; robj *key; + int index; + robj *val; }; #define init_sss_obc(c) do { \ (c)->s3 = NULL; \ (c)->ks = NULL; \ (c)->svc = NULL; \ (c)->key = NULL; \ + (c)->index = 0; \ + (c)->val = NULL; \ } while(0) struct add_arg @@ -268,7 +272,6 @@ static int s_Kv_mode = 0; /*----------------------------------------------------------------------------- * Local function implementations *----------------------------------------------------------------------------*/ - /* * if type conflict w.r.t (ks, uuid, svc) reply error and returns 1. returns 0 if ok */ @@ -2272,27 +2275,34 @@ obc_set (sssObc * obc, sss * s3, sssEntry * e) robj *ks = NULL; robj *svc = NULL; robj *key = NULL; + robj *val = NULL; if (e) { assert (e->ks->refcount >= 1); assert (e->svc->refcount >= 1); assert (e->key->refcount >= 1); + assert (e->val->refcount >= 1); incrRefCount (e->ks); incrRefCount (e->svc); incrRefCount (e->key); + incrRefCount (e->val); } obc->s3 = s3; ks = obc->ks; svc = obc->svc; key = obc->key; - obc->ks = obc->svc = obc->key = NULL; + val = obc->val; + obc->ks = obc->svc = obc->key = obc->val = NULL; + obc->index = 0; if (e) { obc->ks = e->ks; obc->svc = e->svc; obc->key = e->key; + obc->index = e->index; + obc->val = e->val; } if (ks != NULL) { @@ -2306,6 +2316,10 @@ obc_set (sssObc * obc, sss * s3, sssEntry * e) { decrRefCount (key); } + if (val != NULL) + { + decrRefCount (val); + } } static int @@ -2322,8 +2336,8 @@ obc_purge_scan (sssObc * obc, long long to, int nscan, int *npur, ke.ks = obc->ks; ke.svc = obc->svc; ke.key = obc->key; - ke.index = 0LL; - ke.val = NULL; + ke.index = obc->index; + ke.val = obc->val; first = NULL; e = rb_tree_find_node_geq (&s3->tree, &ke); @@ -2636,7 +2650,7 @@ arcx_sss_gc_cron (void) num_dead = purge_objects (&arc.gc_eager, (sssObc *) arc.gc_obc, curr_msec, until_usec, &h); - + UNUSED(num_dead); /* restore purged objects to gc lines */ while (!dlisth_is_empty (&arc.gc_eager)) { From e8cff5c7c2a3d83f9233423368bc87bddac4e3f5 Mon Sep 17 00:00:00 2001 From: KyuJae Lee Date: Tue, 11 Apr 2023 11:30:45 +0900 Subject: [PATCH 3/3] enlarge trac related variables --- smr/smr/smr_be.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/smr/smr/smr_be.c b/smr/smr/smr_be.c index a65e4b53..1b86da33 100644 --- a/smr/smr/smr_be.c +++ b/smr/smr/smr_be.c @@ -135,8 +135,8 @@ struct smrConnector ioStream *mous; // out state (to master) long long curr_last_cseq; // if not -1, log greater than this seq must got from new master int trac_enabled; // tracking enabled flag - int trac_sent; // total number of bytes sent to the master - int trac_rcvd; // total number of bytes received from the master + long long trac_sent; // total number of bytes sent to the master + long long trac_rcvd; // total number of bytes received from the master int cb_enabled; // callback endabled flag smrCallback *cb; // callback void *arg; // arg @@ -1561,7 +1561,7 @@ process_input_state_range (smrConnector * connector, smrCallback * cb, if (connector->trac_enabled && connector->trac_rcvd > 0) { - int diff; + long long diff; assert (connector->trac_sent >= connector->trac_rcvd); diff = connector->trac_rcvd;