Skip to content

Commit

Permalink
Merge pull request #209 from sanitysoon/master
Browse files Browse the repository at this point in the history
new redis-arc options and bug fixes
  • Loading branch information
sanitysoon authored Apr 11, 2023
2 parents 1154659 + e8cff5c commit 5cc17be
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 29 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ release: all
mkdir -p $(RELEASE_DIR)/api/java
# Redis
cp -rf redis/src/redis-arc $(RELEASE_DIR)/bin/redis-arc-$(VERSION)
cp -rf redis/src/dump-util $(RELEASE_DIR)/bin/dump-util-$(VERSION)
cp -rf redis/src/cluster-util $(RELEASE_DIR)/bin/cluster-util-$(VERSION)
# Confmaster
cp -rf confmaster/target/confmaster-1.0.0-SNAPSHOT-jar-with-dependencies.jar $(RELEASE_DIR)/confmaster/confmaster-$(VERSION).jar
Expand Down
55 changes: 55 additions & 0 deletions integration_test/test_check_point_and_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,58 @@ def test_delete_smrlog_after_redis_restart(self):
time.sleep(3)

self.assertTrue(len(loglist) < 5)

def test_redis_logcompact(self):
util.print_frame()

server = self.cluster['servers'][0]
redis = telnetlib.Telnet(server['ip'], server['redis_port'])

val = 'x' * 1048576 # 1MB
cmd = '*3\r\n$3\r\nset\r\n$4\r\ntest\r\n$1048576\r\n%s\r\n' % val

# create smr log file
for i in xrange(64*7): # at least 7 log files
redis.write(cmd)
ret = redis.read_until('\r\n', 3)
self.assertEquals(ret, '+OK\r\n')

# wait until synced
if config.opt_use_memlog:
time.sleep(3)

loglist = [f for f in os.listdir('%s/log0' % util.smr_dir(0)) if '.log' in f]
util.log('before log delete')
util.log(loglist)

self.assertTrue(len(loglist) > 7)

testbase.request_to_shutdown_redis(server)
# start redis-arc with logcompact-seq option
if True:
import constant as c
id = server['id']
ip = server['ip']
smr_base_port = server['smr_base_port']
redis_port = server['redis_port']
cmd = './%s --smr-local-port %d --port %d --save "" --logcompact-seq %d' % (c.REDIS, smr_base_port, redis_port, 64*1024*1024*6)
f_log_std = util.open_process_logfile( id, 'redis_std' )
f_log_err = util.open_process_logfile( id, 'redis_err' )
p = util.exec_proc_async(util.redis_dir(id), cmd, True, None, f_log_std, f_log_err)
ret = p.wait()
self.assertTrue(ret == 0)
testbase.request_to_shutdown_smr(server)

testbase.request_to_start_smr(server, log_delete_delay=1)
testbase.request_to_start_redis(server)

time.sleep(30)
loglist = [f for f in os.listdir('%s/log0' % util.smr_dir(0)) if '.log' in f]
util.log('after log delete')
util.log(loglist)

# wait until synced
if config.opt_use_memlog:
time.sleep(3)

self.assertTrue(len(loglist) < 5)
88 changes: 71 additions & 17 deletions integration_test/test_s3gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ def test_big_s3object ( self ):
ok, prev_dbsize = redis.do_request('dbsize\r\n')
assert (ok == True)

# save and set current sss-gc-interval long enough
ok, resp = redis.do_request('config get sss-gc-interval\r\n')
assert (ok == True)
prev_gc_interval = int(resp[1])
# save and set current sss-gc-interval long enough
ok, resp = redis.do_request('config get sss-gc-interval\r\n')
assert (ok == True)
prev_gc_interval = int(resp[1])

ok, dummy = redis.do_request('config set sss-gc-interval 1000000\r\n')
assert (ok == True)
ok, dummy = redis.do_request('config set sss-gc-interval 1000000\r\n')
assert (ok == True)

# make big s3 object (10000000 elements)
# Note max inline request size is about 64k
Expand Down Expand Up @@ -120,20 +120,74 @@ def test_big_s3object ( self ):
et = int(round(time.time() * 1000))
assert (et - st < 500) # 500 msec is hard limit?

# wait for the big object purged and deleted
limit = time.time() + (make_end - make_begin)/1000.0
while time.time() < limit + 1:
ok, curr_dbsize = redis.do_request('dbsize\r\n')
assert (ok == True)
if curr_dbsize == prev_dbsize:
break
time.sleep(0.5)
# wait for the big object purged and deleted
limit = time.time() + (make_end - make_begin)/1000.0
while time.time() < limit + 1:
ok, curr_dbsize = redis.do_request('dbsize\r\n')
assert (ok == True)
if curr_dbsize == prev_dbsize:
break
time.sleep(0.5)

# check number of keys are unchanged
ok, curr_dbsize = redis.do_request('dbsize\r\n')
assert (ok == True)
assert (curr_dbsize == prev_dbsize)

# restore gc-interval
ok, dummy = redis.do_request('config set sss-gc-interval %d\r\n' % prev_gc_interval)
assert (ok == True)
# restore gc-interval
ok, dummy = redis.do_request('config set sss-gc-interval %d\r\n' % prev_gc_interval)
assert (ok == True)

def test_s3gc_eager_big_key_values(self):
# opensource#182
redis = self.redis
ok, prev_dbsize = redis.do_request('dbsize\r\n')
assert (ok == True)

util.log('before make some s3 objects with lots of long living values')
for i in range(0, 3000):
ok, data = redis.do_request('s3sadd * key1 svc key val%d 1000000\r\n' % i)
assert (ok == True)
ok, data = redis.do_request('s3sadd * key2 svc key val%d 1000000\r\n' % i)
assert (ok == True)
ok, data = redis.do_request('s3sadd * key3 svc key val%d 1000000\r\n' % i)
assert (ok == True)
util.log('after make some s3 objects with lots of long living values')
time.sleep (0.2)

# iterate all gc lines
for i in range (0, 8192):
st = int(round(time.time() * 1000))
ok, r = redis.do_request('s3gc 0\r\n')
assert (ok == True)
ok, r = redis.do_request('ping\r\n')
assert (ok == True)
et = int(round(time.time() * 1000))
assert (et - st < 500) # 500 msec is hard limit?

# we should see s3gc_eager_loops:0 eventually
def get_s3gc_eager_loops():
ok, data = redis.do_request("info stats\r\n")
assert(ok == True)
items = data.split('\r\n')
for item in items:
prefix = 's3gc_eager_loops:'
if item.startswith(prefix):
return int(item[len(prefix):].strip())

# we should see idle eager gc mode
see_idle_gc = False
for i in range(50):
if get_s3gc_eager_loops() == 0:
see_idle_gc = True
break
time.sleep(0.02)
assert(see_idle_gc), get_s3gc_eager_loops()

# check number of keys are unchanged
redis.do_request('del key1\r\n')
redis.do_request('del key2\r\n')
redis.do_request('del key3\r\n')
ok, curr_dbsize = redis.do_request('dbsize\r\n')
assert (ok == True)
assert (curr_dbsize == prev_dbsize)
24 changes: 24 additions & 0 deletions redis/src/arc_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,30 @@ arc_config_cmp_load (int argc, sds * argv, char **err_ret)
goto loaderr;
}
}
else if (!strcasecmp (argv[0], "logcompact-seq") && argc == 2)
{
long long seq = -1LL;
int ret;
ret = string2ll (argv[1], strlen (argv[1]), &seq);
if (ret == 0 || seq < 0)
{
err = "Invalid logcompact-seq spec";
goto loaderr;
}
arc.logcompact_seq = seq;
}
else if (!strcasecmp (argv[0], "logcompact-ts") && argc == 2)
{
long long ts = -1LL;
int ret;
ret = string2ll (argv[1], strlen (argv[1]), &ts);
if (ret == 0 || ts < 0)
{
err = "Invalid logcompact-ts spec";
goto loaderr;
}
arc.logcompact_ts = ts;
}
else
{
/* no match */
Expand Down
27 changes: 27 additions & 0 deletions redis/src/arc_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ static void ascii_art (void);
static void process_smr_callback (aeEventLoop * el, int fd, void *privdata,
int mask);
static int smrcb_close (void *arg, long long seq, short nid, int sid);
static void save_and_exit (void);
static int smrcb_data (void *arg, long long seq, long long timestamp,
short nid, int sid, int hash, smrData * smr_data,
int size);
Expand Down Expand Up @@ -251,6 +252,10 @@ init_config (void)
/* Local ip check */
arc.local_ip_addrs = arcx_get_local_ip_addrs (); //TODO why here?

/* logcompact-[seq|ts] */
arc.logcompact_seq = -1LL;
arc.logcompact_ts = -1LL;

#ifdef COVERAGE_TEST
arc.debug_mem_usage_fixed = 0;
arc.debug_total_mem_kb = 0LL;
Expand Down Expand Up @@ -533,6 +538,21 @@ smrcb_close (void *arg, long long seq, short nid, int sid)
return 0;
}

static void
save_and_exit (void)
{
int ret;

ret = rdbSave (server.rdb_filename);
if (ret == C_OK)
{
exit (0);
}
else
{
exit (1);
}
}

static int
smrcb_data (void *arg, long long seq, long long timestamp,
Expand All @@ -544,6 +564,13 @@ smrcb_data (void *arg, long long seq, long long timestamp,
dlisth *node;
UNUSED (arg);

/* handle logcompaction */
if ((arc.logcompact_seq != -1LL && seq + size > arc.logcompact_seq) ||
(arc.logcompact_ts != -1LL && timestamp > arc.logcompact_ts))
{
save_and_exit ();
}

/* Special commands */
if (size == 1 && data[0] == ARC_SMR_CMD_CATCHUP_CHECK)
{
Expand Down
18 changes: 14 additions & 4 deletions redis/src/arc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ struct arcServer
dlisth *gc_line; /* fixed rate gc object headers */
dlisth gc_eager; /* s3 object header for eager mode gc */
void *gc_obc; /* s3 object cursor for incremental purge */
int gc_eager_loops; /* event loop count in eager mode */
long long gc_eager_loops; /* event loop count in eager mode */

/* State machine replicator (SMR) */
smrConnector *smr_conn;
Expand Down Expand Up @@ -190,6 +190,10 @@ struct arcServer
/* local ip check */
char **local_ip_addrs;

/* logcompact-[seq|ts] */
long long logcompact_seq;
long long logcompact_ts;

#ifdef COVERAGE_TEST
/* debugging value for injecting memory states */
int debug_mem_usage_fixed;
Expand Down Expand Up @@ -237,6 +241,8 @@ extern struct arcServer arc;
config_get_numerical_field("memory-max-allowed-percentage",arc.mem_max_allowed_perc); \
config_get_numerical_field("memory-hard-limit-percentage",arc.mem_hard_limit_perc); \
config_get_numerical_field("object-bio-delete-min-elems",arc.object_bio_delete_min_elems); \
config_get_numerical_field("logcompact-seq",arc.logcompact_seq); \
config_get_numerical_field("logcompact-ts",arc.logcompact_ts); \
} while(0);

#define arc_rewrite_config() do { \
Expand All @@ -246,8 +252,10 @@ extern struct arcServer arc;
rewriteConfigNumericalOption(state, "number-of-rdb-backups",arc.num_rdb_backups,1900); \
rewriteConfigNumericalOption(state, "memory-limit-activation-percentage",arc.mem_limit_active_perc,100); \
rewriteConfigNumericalOption(state, "memory-max-allowed-percentage",arc.mem_max_allowed_perc,100); \
rewriteConfigNumericalOption(state, "memory-hard-limit-percentage",arc.mem_hard_limit_perc,100); \
rewriteConfigNumericalOption(state, "memory-hard-limit-percentage",arc.mem_hard_limit_perc,100); \
rewriteConfigNumericalOption(state, "object-bio-delete-min-elems",arc.object_bio_delete_min_elems,ARC_OBJ_BIO_DELETE_MIN_ELEMS); \
rewriteConfigNumericalOption(state, "logcompact-seq",arc.logcompact_seq,-1); \
rewriteConfigNumericalOption(state, "logcompact-ts",arc.logcompact_ts,-1); \
} while(0)

extern int arc_config_set (client * c);
Expand Down Expand Up @@ -324,14 +332,16 @@ extern int arc_config_cmp_load (int argc, sds * argv, char **err_ret);
"instantaneous_replied_ops_per_sec:%lld\r\n" \
"total_commands_lcon:%lld\r\n" \
"instantaneous_lcon_ops_per_sec:%lld\r\n" \
"background_deleted_keys:%lld\r\n"
"background_deleted_keys:%lld\r\n" \
"s3gc_eager_loops:%lld\r\n"

#define ARC_INFO_STATS_ARG \
arc.stat_numcommands_replied, \
arc_get_instantaneous_metric(arc.replied_ops_sec_samples), \
arc.stat_numcommands_lcon, \
arc_get_instantaneous_metric(arc.lcon_ops_sec_samples), \
arc.stat_bgdel_keys,
arc.stat_bgdel_keys, \
arc.gc_eager_loops,

extern void arc_amend_command_table (void);
extern void arc_init_config (void);
Expand Down
Loading

0 comments on commit 5cc17be

Please # to comment.