diff --git a/CHANGELOG.md b/CHANGELOG.md index 741bf32..64f5091 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Tarantool 3.0 role for expirationd (#160). + ### Changed - Updated the 'space_index_test.lua' to drop and recreate the test space diff --git a/README.md b/README.md index 722c3e4..3849a67 100644 --- a/README.md +++ b/README.md @@ -193,7 +193,7 @@ package with features: is_master_only: true options: args: - - any + - any atomic_iteration: false force: false force_allow_functional_index: true @@ -232,3 +232,79 @@ package with features: -- code of the function end) ``` + +## Tarantool 3.0 role + +`roles.expirationd` is a Tarantool 3.0 role for the expirationd +package with the following features: + +* You can configure the expirationd role with `cfg` entry (check example). + Cluster configuration allows to set the same parameters as + in [expirationd.cfg()](https://tarantool.github.io/expirationd/#cfg) +* You can use persistent functions (i.e. created by `box.schema.func.create`) + for expirationd `cfg` entries. + When configuring, role tries first to get a function from global namespace (`_G`) + and if the function was not found then role tries to search in `box.func` + for a function with the same name. + If some functions from config are missing, + expirationd will wait for their creation and start tasks when all of them are found. + You can check logs to see what functions are missing. +* The role stops all expirationd tasks on an instance on the role termination. +* The role can automatically start or kill old tasks from the role + configuration. + + ```yaml + roles: [roles.expirationd] + roles_cfg: + roles.expirationd: + cfg: + metrics: true + task_name1: + space: users + is_expired: is_expired_func_name + is_master_only: true + options: + args: + - any + atomic_iteration: false + force: false + force_allow_functional_index: true + full_scan_delay: 1 + full_scan_time: 1 + index: 0 + iterate_with: iterate_with_func_name_in__G + iteration_delay: 1 + iterator_type: ALL + on_full_scan_complete: on_full_scan_complete_func_name_in__G + on_full_scan_error: on_full_scan_error_func_name_in__G + on_full_scan_start: on_full_scan_start_func_name_in__G + on_full_scan_success: on_full_scan_success_func_name_in__G + process_expired_tuple: process_expired_tuple_func_name_in__G + process_while: process_while_func_name_in__G + start_key: + - 1 + tuples_per_iteration: 100 + vinyl_assumed_space_len: 100 + vinyl_assumed_space_len_factor: 1 + ``` + + [expirationd.start()](https://tarantool.github.io/expirationd/#start) has + the same parameters with the same meaning except for the additional optional + param `is_master_only`. If `true`, the task should run only on a master + instance. By default, the value is `false`. + + You need to be careful with function parameters. Task will not start until it + finds all functions from config. You can define them in user code: + + ```Lua + box.schema.func.create('is_expired_func_name', { + body = "function(...) return true end", + if_not_exists = true + }) + + -- Or you could define a global variable. + rawset(_G, "process_while_func_name_in__G", function(...) + return true + end) + ``` + diff --git a/debian/tarantool-expirationd.install b/debian/tarantool-expirationd.install index 268da34..f8c4d0f 100644 --- a/debian/tarantool-expirationd.install +++ b/debian/tarantool-expirationd.install @@ -1,2 +1,3 @@ expirationd usr/share/tarantool/ cartridge/roles/expirationd.lua usr/share/tarantool/cartridge/roles/ +roles/expirationd.lua usr/share/tarantool/roles/ diff --git a/expirationd-scm-1.rockspec b/expirationd-scm-1.rockspec index 8c8d35a..f014307 100644 --- a/expirationd-scm-1.rockspec +++ b/expirationd-scm-1.rockspec @@ -20,6 +20,7 @@ build = { ["expirationd"] = "expirationd/init.lua", ["expirationd.version"] = "expirationd/version.lua", ["cartridge.roles.expirationd"] = "cartridge/roles/expirationd.lua", + ["roles.expirationd"] = "roles/expirationd.lua" } } -- vim: syntax=lua diff --git a/roles/expirationd.lua b/roles/expirationd.lua new file mode 100644 index 0000000..cd27a71 --- /dev/null +++ b/roles/expirationd.lua @@ -0,0 +1,324 @@ +local expirationd = require("expirationd") +local fiber = require("fiber") +local log = require("log") + +local role_name = "roles.expirationd" +local started = {} + + +local function load_function(func_name) + if func_name == nil or type(func_name) ~= 'string' then + return nil + end + + local func = rawget(_G, func_name) + if func ~= nil then + if type(func) ~= 'function' then + return nil + end + + return func + elseif box.schema.func.exists(func_name) then + return function(...) + return box.func[func_name]:call({...}) + end + else + return nil + end +end + +local types_map = { + b = {type = "boolean", err = "a boolean"}, + n = {type = "number", err = "a number"}, + s = {type = "string", err = "a string"}, + f = {type = "string", transform = load_function, err = "a function name in _G or in box.func"}, + t = {type = "table", err = "a table"}, + any = {err = "any type"}, +} + +local opts_map = { + args = {"any"}, + atomic_iteration = {"b"}, + force = {"b"}, + force_allow_functional_index = {"b"}, + full_scan_delay = {"n"}, + full_scan_time = {"n"}, + index = {"n", "s"}, + iterate_with = {"f"}, + iteration_delay = {"n"}, + iterator_type = {"n", "s"}, + on_full_scan_complete = {"f"}, + on_full_scan_error = {"f"}, + on_full_scan_start = {"f"}, + on_full_scan_success = {"f"}, + process_expired_tuple = {"f"}, + process_while = {"f"}, + start_key = {"f", "t"}, + tuples_per_iteration = {"n"}, + vinyl_assumed_space_len_factor = {"n"}, + vinyl_assumed_space_len = {"n"}, +} + +local function table_contains(table, element) + for _, value in pairs(table) do + if value == element then + return true + end + end + return false +end + + +local function get_param(param_name, value, types) + local found = false + for _, t in ipairs(types) do + local type_opts = types_map[t] + if type_opts == nil then + error(role_name .. ": unsupported type option") + end + if not type_opts.type or type(value) == type_opts.type then + if type_opts.transform then + local tmp = type_opts.transform(value) + if tmp then + value = tmp + found = true + break + end + else + found = true + break + end + end + end + + -- Small hack because in tarantool role we wait for functions to be created. + -- So, if type of value is function and it is allowed + -- and it is not found we do not return an error. + if table_contains(types, "f") and not found then + for _, t in ipairs(types) do + local type_opts = types_map[t] + if t == "f" and type(value) == type_opts.type then + return nil, true, nil + end + end + end + + if not found then + local err = role_name .. ": " .. param_name .. " must be " + for i, t in ipairs(types) do + err = err .. types_map[t].err + if i ~= #types then + err = err .. " or " + end + end + return nil, false, err + end + + return value, true, nil +end + +local function get_task_options(opts) + if opts == nil then + return + end + + local missed_functions = {} + + for opt, val in pairs(opts) do + if type(opt) ~= "string" then + error(role_name .. ": an option must be a string") + end + if opts_map[opt] == nil then + error(role_name .. ": unsupported option '" .. opt .. "'") + end + local res, ok, err = get_param("options." .. opt, val, opts_map[opt]) + if not ok then + error(err) + end + if ok and res == nil and opts_map[opt][1] == "f" then + table.insert(missed_functions, val) + end + opts[opt] = res + end + + return opts, missed_functions +end + +local function get_task_config(task_conf) + -- setmetatable resets __newindex write protection on a copy. + local conf = setmetatable(table.deepcopy(task_conf), {}) + local params_map = { + space = {required = true, types = {"n", "s"}}, + is_expired = {required = true, types = {"f"}}, + is_master_only = {required = false, types = {"b"}}, + options = {required = false, types = {"t"}}, + } + for k, _ in pairs(conf) do + if type(k) ~= "string" then + error(role_name .. ": param must be a string") + end + if params_map[k] == nil then + error(role_name .. ": unsupported param " .. k) + end + end + local missed_functions = {} + for param, opts in pairs(params_map) do + if opts.required and conf[param] == nil then + error(role_name .. ": " .. param .. " is required") + end + if conf[param] ~= nil then + local res, ok, err = get_param(param, conf[param], opts.types) + if not ok then + error(err) + end + if ok and res == nil and opts.types[1] == "f" then + table.insert(missed_functions, conf[param]) + end + conf[param] = res + end + end + + local missed_functions_opts + conf.options, missed_functions_opts = get_task_options(conf.options) + if missed_functions_opts ~= nil then + for _, func in pairs(missed_functions_opts) do + table.insert(missed_functions, func) + end + end + return conf, missed_functions +end + +local function get_cfg(cfg) + local conf = setmetatable(table.deepcopy(cfg), {}) + local params_map = { + metrics = {"b"}, + } + + for k, _ in pairs(conf) do + if type(k) ~= "string" then + error(role_name .. ": config option must be a string") + end + if params_map[k] == nil then + error(role_name .. ": unsupported config option " .. k) + end + end + + for param, types in pairs(params_map) do + if conf[param] ~= nil then + local _, ok, err = get_param(param, conf[param], types) + if not ok then + error(err) + end + end + end + + return conf +end + +local function validate_config(conf_new) + local conf = conf_new or {} + + for task_name, task_conf in pairs(conf) do + local _, ok, err = get_param("task name", task_name, {"s"}) + if not ok then + error(err) + end + local _, ok, err = get_param("task params", task_conf, {"t"}) + if not ok then + error(err) + end + local ok, ret = pcall(get_task_config, task_conf) + if not ok then + if task_name == "cfg" then + get_cfg(task_conf) + else + error(ret) + end + end + end + + return true +end + +local function load_task(task_conf, task_name) + local timeout = 1 + local warning_delay = 60 + local start = fiber.clock() + local task_config, missed_functions = get_task_config(task_conf) + + fiber.name(role_name .. ": " .. task_name) + + local skip = task_conf.is_master_only and not box.info.ro + if skip then + return + end + + while #missed_functions ~= 0 do + fiber.sleep(timeout) + if fiber.clock() - start > warning_delay then + local message = role_name .. ": " .. task_name .. ": waiting for functions: " + for i, func in pairs(missed_functions) do + if i == #missed_functions then + message = message .. func .. '.' + else + message = message .. func .. ', ' + end + end + + log.warn(message) + start = fiber.clock() + end + task_config, missed_functions = get_task_config(task_conf) + end + + local task = expirationd.start(task_name, task_config.space, + task_config.is_expired, + task_config.options) + if task == nil then + error(role_name .. ": unable to start task " .. task_name) + end + table.insert(started, task_name) +end + +local function apply_config(conf) + -- Finishes tasks from an old configuration + for i = #started, 1, -1 do + local task_name = started[i] + local ok, task = pcall(expirationd.task, task_name) + -- We don't need to do anything if there is no task + if ok then + if conf[task_name] then + task:stop() + else + task:kill() + end + end + table.remove(started, i) + end + + if conf["cfg"] ~= nil then + local ok = pcall(get_task_config, conf["cfg"]) + if not ok then + local cfg = get_cfg(conf["cfg"]) + expirationd.cfg(cfg) + conf["cfg"] = nil + end + end + + for task_name, task_conf in pairs(conf) do + fiber.new(load_task, task_conf, task_name) + end +end + +local function stop() + for _, task_name in pairs(expirationd.tasks()) do + local task = expirationd.task(task_name) + task:stop() + end +end + +return { + validate = validate_config, + apply = apply_config, + stop = stop, +} diff --git a/rpm/tarantool-expirationd.spec b/rpm/tarantool-expirationd.spec index 0656670..917b126 100644 --- a/rpm/tarantool-expirationd.spec +++ b/rpm/tarantool-expirationd.spec @@ -30,10 +30,13 @@ install -d %{buildroot}%{_datarootdir}/tarantool/expirationd/ install -m 0644 expirationd/* %{buildroot}%{_datarootdir}/tarantool/expirationd/ install -d %{buildroot}%{_datarootdir}/tarantool/cartridge/roles/ install -m 0644 cartridge/roles/expirationd.lua %{buildroot}%{_datarootdir}/tarantool/cartridge/roles/expirationd.lua +install -d %{buildroot}%{_datarootdir}/tarantool/roles/ +install -m 0644 roles/expirationd.lua %{buildroot}%{_datarootdir}/tarantool/roles/expirationd.lua %files %{_datarootdir}/tarantool/expirationd/ %{_datarootdir}/tarantool/cartridge +%{_datarootdir}/tarantool/roles %doc README.md %{!?_licensedir:%global license %doc} %license LICENSE diff --git a/test/helper.lua b/test/helper.lua index 7a9e646..fc3f711 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -300,6 +300,11 @@ function helpers.single_yield_transactional_ddl_is_supported() (major >= 3) end +function helpers.tarantool_role_is_supported() + local major, _, _ = helpers.tarantool_version() + return major >= 3 +end + function helpers.error_function() error("error function call") end @@ -317,4 +322,11 @@ function helpers.create_persistent_function(name, body) }) end +local root = fio.dirname(fio.dirname(fio.abspath(package.search('test.helper')))) + +helpers.lua_path = root .. '/?.lua;' .. + root .. '/?/init.lua;' .. + root .. '/.rocks/share/tarantool/?.lua;' .. + root .. '/.rocks/share/tarantool/?/init.lua' + return helpers diff --git a/test/helper_server.lua b/test/helper_server.lua new file mode 100644 index 0000000..d42da4e --- /dev/null +++ b/test/helper_server.lua @@ -0,0 +1,194 @@ +-- https://github.com/tarantool/tarantool/blob/5040fba9cf1da942371721e36e81c7372699600c/test/luatest_helpers/server.lua +local fun = require('fun') +local yaml = require('yaml') +local urilib = require('uri') +local fio = require('fio') +local luatest = require('luatest') + +-- Join paths in an intuitive way. +-- +-- If a component is nil, it is skipped. +-- +-- If a component is an absolute path, it skips all the previous +-- components. +-- +-- The wrapper is written for two components for simplicity. +local function pathjoin(a, b) + -- No first path -- skip it. + if a == nil then + return b + end + -- No second path -- skip it. + if b == nil then + return a + end + -- The absolute path is checked explicitly due to gh-8816. + if b:startswith('/') then + return b + end + return fio.pathjoin(a, b) +end + +-- Determine advertise URI for given instance from a cluster +-- configuration. +local function find_advertise_uri(config, instance_name, dir) + if config == nil or next(config) == nil then + return nil + end + + -- Determine listen and advertise options that are in effect + -- for the given instance. + local advertise + local listen + + for _, group in pairs(config.groups or {}) do + for _, replicaset in pairs(group.replicasets or {}) do + local instance = (replicaset.instances or {})[instance_name] + if instance == nil then + break + end + if instance.iproto ~= nil then + if instance.iproto.advertise ~= nil then + advertise = advertise or instance.iproto.advertise.client + end + listen = listen or instance.iproto.listen + end + if replicaset.iproto ~= nil then + if replicaset.iproto.advertise ~= nil then + advertise = advertise or replicaset.iproto.advertise.client + end + listen = listen or replicaset.iproto.listen + end + if group.iproto ~= nil then + if group.iproto.advertise ~= nil then + advertise = advertise or group.iproto.advertise.client + end + listen = listen or group.iproto.listen + end + end + end + + if config.iproto ~= nil then + if config.iproto.advertise ~= nil then + advertise = advertise or config.iproto.advertise.client + end + listen = listen or config.iproto.listen + end + + local uris + if advertise ~= nil then + uris = {{uri = advertise}} + else + uris = listen + end + + for _, uri in ipairs(uris or {}) do + uri = table.copy(uri) + uri.uri = uri.uri:gsub('{{ *instance_name *}}', instance_name) + uri.uri = uri.uri:gsub('unix/:%./', ('unix/:%s/'):format(dir)) + local u = urilib.parse(uri) + if u.ipv4 ~= '0.0.0.0' and u.ipv6 ~= '::' and u.service ~= '0' then + return uri + end + end + error('No suitable URI to connect is found') +end + +local Server = luatest.Server:inherit({}) + +-- Adds the following options: +-- +-- * config_file (string) +-- +-- An argument of the `--config <...>` CLI option. +-- +-- Used to deduce advertise URI to connect net.box to the +-- instance. +-- +-- The special value '' means running without `--config <...>` +-- CLI option (but still pass `--name `). +-- * remote_config (table) +-- +-- If `config_file` is not passed, this config value is used to +-- deduce the advertise URI to connect net.box to the instance. +Server.constructor_checks = fun.chain(Server.constructor_checks, { + config_file = 'string', + remote_config = '?table', +}):tomap() + +function Server:initialize() + if self.config_file ~= nil then + self.command = arg[-1] + + self.args = fun.chain(self.args or {}, { + '--name', self.alias + }):totable() + + if self.config_file ~= '' then + table.insert(self.args, '--config') + table.insert(self.args, self.config_file) + + -- Take into account self.chdir to calculate a config + -- file path. + local config_file_path = pathjoin(self.chdir, self.config_file) + + -- Read the provided config file. + local fh, err = fio.open(config_file_path, {'O_RDONLY'}) + if fh == nil then + error(('Unable to open file %q: %s'):format(config_file_path, + err)) + end + self.config = yaml.decode(fh:read()) + fh:close() + end + + if self.net_box_uri == nil then + local config = self.config or self.remote_config + + -- NB: listen and advertise URIs are relative to + -- process.work_dir, which, in turn, is relative to + -- self.chdir. + local work_dir + if config.process ~= nil and config.process.work_dir ~= nil then + work_dir = config.process.work_dir + end + local dir = pathjoin(self.chdir, work_dir) + self.net_box_uri = find_advertise_uri(config, self.alias, dir) + end + end + getmetatable(getmetatable(self)).initialize(self) +end + +function Server:connect_net_box() + getmetatable(getmetatable(self)).connect_net_box(self) + + if self.config_file == nil then + return + end + + if not self.net_box then + return + end + + -- Replace the ready condition. + local saved_eval = self.net_box.eval + self.net_box.eval = function(self, expr, args, opts) + if expr == 'return _G.ready' then + expr = "return require('config'):info().status == 'ready' or " .. + "require('config'):info().status == 'check_warnings'" + end + return saved_eval(self, expr, args, opts) + end +end + +-- Enable the startup waiting if the advertise URI of the instance +-- is determined. +function Server:start(opts) + opts = opts or {} + if self.config_file and opts.wait_until_ready == nil then + opts.wait_until_ready = self.net_box_uri ~= nil + end + getmetatable(getmetatable(self)).start(self, opts) +end + +return Server diff --git a/test/integration/role_test.lua b/test/integration/cartridge_role_test.lua similarity index 99% rename from test/integration/role_test.lua rename to test/integration/cartridge_role_test.lua index e580a7d..b3731f8 100644 --- a/test/integration/role_test.lua +++ b/test/integration/cartridge_role_test.lua @@ -1,7 +1,7 @@ local fio = require('fio') local t = require('luatest') local helpers = require('test.helper') -local g = t.group('expirationd_intergration_role') +local g = t.group('cartridge_expirationd_intergration_role') local is_cartridge_helpers, cartridge_helpers = pcall(require, 'cartridge.test-helpers') g.before_all(function(cg) diff --git a/test/integration/simple_app/config.yaml b/test/integration/simple_app/config.yaml new file mode 100644 index 0000000..031c8d2 --- /dev/null +++ b/test/integration/simple_app/config.yaml @@ -0,0 +1,22 @@ +credentials: + users: + guest: + roles: [super] + +groups: + group-001: + replicasets: + replicaset-001: + roles: [roles.expirationd] + roles_cfg: + roles.expirationd: + task_name1: + space: users + is_expired: forever_true_test + instances: + master: + iproto: + listen: + - uri: '127.0.0.1:3313' + database: + mode: rw diff --git a/test/integration/simple_app/instances.yml b/test/integration/simple_app/instances.yml new file mode 100644 index 0000000..b79b9b8 --- /dev/null +++ b/test/integration/simple_app/instances.yml @@ -0,0 +1 @@ +master: diff --git a/test/integration/tarantool_role_test.lua b/test/integration/tarantool_role_test.lua new file mode 100644 index 0000000..0a1d5ca --- /dev/null +++ b/test/integration/tarantool_role_test.lua @@ -0,0 +1,124 @@ +local t = require('luatest') +local fio = require('fio') + +local helpers = require('test.helper') +local Server = require('test.helper_server') + +local g = t.group('tarantool_role_integration_test') + +g.before_all(function (cg) + t.skip_if(not helpers.tarantool_role_is_supported(), + 'Tarantool role is supported only for Tarantool starting from v3.0.0') + + local workdir = fio.tempdir() + cg.router = Server:new({ + config_file = fio.abspath(fio.pathjoin('test', 'integration', 'simple_app', 'config.yaml')), + env = {LUA_PATH = helpers.lua_path}, + chdir = workdir, + alias = 'master', + workdir = workdir, + }) +end) + +g.before_each(function(cg) + fio.mktree(cg.router.workdir) + + -- We start instance before each test because + -- we need to force reload of expirationd role and also instance environment + -- from previous tests can influence test result. + -- (e.g function creation, when testing that role doesn't start w/o it) + -- Restarting instance is the easiest way to achive it. + -- It takes around 1s to start an instance, which considering small amount + -- of instegration tests is not a problem. + cg.router:start{wait_until_ready = true} + + cg.router.net_box:eval([[ + box.watch('box.status', function(_, status) + if status.is_ro == false then + box.schema.create_space('users', {if_not_exists = true}) + + box.space.users:format({ + {name = 'id', type = 'unsigned'}, + {name = 'first name', type = 'string'}, + {name = 'second name', type = 'string', is_nullable = true}, + {name = 'age', type = 'number', is_nullable = false}, + }) + + box.space.users:create_index('primary', { + parts = { + {field = 1, type = 'unsigned'}, + }, + }) + + box.space.users:insert{1, 'Samantha', 'Carter', 30} + box.space.users:insert{2, 'Fay', 'Rivers', 41} + box.space.users:insert{3, 'Zachariah', 'Peters', 13} + box.space.users:insert{4, 'Milo', 'Walters', 74} + end + end) + ]]) +end) + +g.after_each(function(cg) + cg.router:stop() + fio.rmtree(cg.router.workdir) +end) + +g.test_simple_role_config = function(cg) + cg.router.net_box:eval([[ + box.schema.func.create('forever_true_test', { + body = "function(...) return true end", + if_not_exists = true + }) + ]]) + + helpers.retrying({}, function() + t.assert_equals(cg.router.net_box:eval([[ + return #box.space.users:select({}, {limit = 10}) + ]]), 0) + end) +end + +g.test_waiting_for_functions = function(cg) + -- Function for is_expired config entry was not created, so expirationd + -- is waiting for it's creation. + helpers.retrying({}, function() + t.assert_equals(cg.router.net_box:eval([[ + return box.space.users:select({}, {limit = 10}) + ]]), { + {1, 'Samantha', 'Carter', 30}, + {2, 'Fay', 'Rivers', 41}, + {3, 'Zachariah', 'Peters', 13}, + {4, 'Milo', 'Walters', 74}, + }) + end) + + -- Create is_expired function and check that expirationd task started. + cg.router.net_box:eval([[ + box.schema.func.create('forever_true_test', { + body = "function(...) return true end", + if_not_exists = true + }) + ]]) + + helpers.retrying({}, function() + t.assert_equals(cg.router.net_box:eval([[ + return #box.space.users:select({}, {limit = 10}) + ]]), 0) + end) + +end + +g.test_function_created_in_g = function(cg) + cg.router.net_box:eval([[ + rawset(_G, 'forever_true_test', function(...) + return true + end) + ]]) + + helpers.retrying({}, function() + t.assert_equals(cg.router.net_box:eval([[ + return #box.space.users:select({}, {limit = 10}) + ]]), 0) + end) +end diff --git a/test/unit/role_test.lua b/test/unit/cartridge_role_test.lua similarity index 99% rename from test/unit/role_test.lua rename to test/unit/cartridge_role_test.lua index 1077749..9d17fe4 100644 --- a/test/unit/role_test.lua +++ b/test/unit/cartridge_role_test.lua @@ -2,7 +2,7 @@ local expirationd = require("expirationd") local fiber = require("fiber") local t = require("luatest") local helpers = require("test.helper") -local g = t.group('expirationd_role') +local g = t.group('cartridge_expirationd_role') local is_cartridge_roles, _ = pcall(require, 'cartridge.roles') local always_true_func_name = "expirationd_test_always_true" diff --git a/test/unit/tarantool_role_test.lua b/test/unit/tarantool_role_test.lua new file mode 100644 index 0000000..2b2c564 --- /dev/null +++ b/test/unit/tarantool_role_test.lua @@ -0,0 +1,565 @@ +local expirationd = require('expirationd') +local t = require('luatest') +local helpers = require('test.helper') + +local g = t.group('tarantool_expirationd_role') + +local always_true_func_name = 'expirationd_test_always_true' +local always_true_func_name_in_box_func = 'expirationd_test_always_true_bf' +local always_true_func_name_with_side_effect = 'expirationd_test_always_true_se' +local always_true_func_name_with_side_effect_flag = 'expirationd_test_always_true_se_called' +local iterate_pairs_func_name = 'expirationd_test_pairs' + +g.before_all(function() + t.skip_if(not helpers.tarantool_role_is_supported(), + 'Tarantool role is supported only for Tarantool starting from v3.0.0') + g.default_cfg = { metrics = expirationd.cfg.metrics } +end) + +g.before_each(function() + g.role = require('roles.expirationd') + g.space = helpers.create_space_with_tree_index('memtx') + + -- Kill live tasks (it can still live after failed tests). + for _, t in ipairs(expirationd.tasks()) do + local task = expirationd.task(t) + task:stop() + end + + g.is_metrics_supported = helpers.is_metrics_supported() + + helpers.create_persistent_function(always_true_func_name_in_box_func) + helpers.create_persistent_function(always_true_func_name_with_side_effect, [[ + function(...) + return false + end + ]]) + + rawset(_G, always_true_func_name_with_side_effect_flag, false) + rawset(_G, always_true_func_name, function() return true end) + rawset(_G, iterate_pairs_func_name, function() return pairs({}) end) + rawset(_G, always_true_func_name_with_side_effect, function() + rawset(_G, always_true_func_name_with_side_effect_flag, true) + end) +end) + +g.after_each(function(g) + expirationd.cfg(g.default_cfg) + g.space:drop() + g.role.stop() + for _, t in ipairs(expirationd.tasks()) do + local task = expirationd.task(t) + task:stop() + end + + rawset(_G, always_true_func_name, nil) + rawset(_G, iterate_pairs_func_name, nil) +end) + +local required_test_cases = { + cfg_empty = { + ok = true, + cfg = { ["cfg"] = { + }}, + }, + cfg_invalid_param = { + ok = false, + err = "roles.expirationd: unsupported config option any", + cfg = { ["cfg"] = { + any = 1, + }}, + }, + cfg_metrics = { + ok = true, + cfg = { ["cfg"] = { + metrics = true, + }}, + }, + cfg_metrics_invalid_value = { + ok = false, + err = "roles.expirationd: metrics must be a boolean", + cfg = { ["cfg"] = { + metrics = 12, + }}, + }, + all_ok_space_number = { + ok = true, + cfg = { ["task_name"] = { + space = 1, + is_expired = always_true_func_name, + }}, + }, + all_ok_space_string = { + ok = true, + cfg = { ["task_name"] = { + space = "space name", + is_expired = always_true_func_name, + }}, + }, + all_ok_empty_opts = { + ok = true, + cfg = { ["task_name"] = { + space = 1, + is_expired = always_true_func_name, + options = {} + }}, + }, + all_ok_task_cfg = { + ok = true, + cfg = { ["cfg"] = { + space = "space name", + is_expired = always_true_func_name, + }}, + }, + not_table = { + ok = false, + err = "roles.expirationd: task params must be a table", + cfg = { ["task_name"] = 123 }, + }, + no_space = { + ok = false, + err = "roles.expirationd: space is required", + cfg = { ["task_name"] = { + is_expired = always_true_func_name, + }}, + }, + no_expired = { + ok = false, + err = "roles.expirationd: is_expired is required", + cfg = { ["task_name"] = { + space = 1, + }}, + }, + invalid_name = { + ok = false, + err = "roles.expirationd: task name must be a string", + cfg = { [3] = { + space = "space name", + is_expired = always_true_func_name, + }}, + }, + invalid_space = { + ok = false, + err = "roles.expirationd: space must be a number or a string", + cfg = { ["task_name"] = { + space = {}, + is_expired = always_true_func_name, + }}, + }, +} + +for k, case in pairs(required_test_cases) do + g["test_validate_config_required_" .. k] = function(cg) + local status, res = pcall(cg.role.validate, case.cfg) + if case.ok then + t.assert_equals(status, true) + t.assert_equals(res, true) + else + t.assert_equals(status, false) + t.assert_str_contains(res, case.err) + end + end +end + +local function create_valid_required(args) + local new_args = table.deepcopy(args) + new_args["space"] = 1 + new_args["is_expired"] = always_true_func_name + return {["task_name"] = new_args} +end + +local additional_opts_test_cases = { + is_master_boolean = { + ok = true, + cfg = {is_master_only = true}, + }, + is_master_not_boolean = { + ok = false, + err = "roles.expirationd: is_master_only must be a boolean", + cfg = {is_master_only = {}}, + }, +} + +for k, case in pairs(additional_opts_test_cases) do + g["test_validate_config_additional_" .. k] = function(cg) + local cfg = create_valid_required(case.cfg) + local status, res = pcall(cg.role.validate, cfg) + if case.ok then + t.assert_equals(status, true) + t.assert_equals(res, true) + else + t.assert_equals(status, false) + t.assert_str_contains(res, case.err) + end + end +end + +local options_cases = { + nilval = { + ok = true, + options = nil, + }, + empty = { + ok = true, + options = {}, + }, + number = { + ok = false, + err = "roles.expirationd: an option must be a string", + options = {[1] = "any"}, + }, + unsupported = { + ok = false, + err = "roles.expirationd: unsupported option 'unsupported_option'", + options = {unsupported_option = "any"}, + }, + args_table = { + ok = true, + options = {args = {}}, + }, + args_string = { + ok = true, + options = {args = "string"}, + }, + args_number = { + ok = true, + options = {args = 13}, + }, + args_boolean = { + ok = true, + options = {args = true}, + }, + atomic_iteration_boolean = { + ok = true, + options = {atomic_iteration = true}, + }, + atomic_iteration_invalid = { + ok = false, + err = "roles.expirationd: options.atomic_iteration must be a boolean", + options = {atomic_iteration = "string"}, + }, + force_boolean = { + ok = true, + options = {force = false}, + }, + force_invalid = { + ok = false, + err = "roles.expirationd: options.force must be a boolean", + options = {force = "string"}, + }, + force_allow_functional_index_boolean = { + ok = true, + options = {force_allow_functional_index = true }, + }, + force_allow_functional_index_invalid = { + ok = false, + err = "roles.expirationd: options.force_allow_functional_index must be a boolean", + options = {force_allow_functional_index = 13}, + }, + full_scan_delay_number = { + ok = true, + options = {full_scan_delay = 13}, + }, + full_scan_delay_invalid = { + ok = false, + err = "roles.expirationd: options.full_scan_delay must be a number", + options = {full_scan_delay = "string"}, + }, + full_scan_time_number = { + ok = true, + options = {full_scan_time = 23}, + }, + full_scan_time_invalid = { + ok = false, + err = "roles.expirationd: options.full_scan_time must be a number", + options = {full_scan_time = true}, + }, + index_number = { + ok = true, + options = {index = 0}, + }, + index_string = { + ok = true, + options = {index = "string"}, + }, + index_invalid = { + ok = false, + err = "roles.expirationd: options.index must be a number or a string", + options = {index = true}, + }, + iterate_with_func = { + ok = true, + options = {iterate_with = always_true_func_name}, + }, + iteration_delay_number = { + ok = true, + options = {iteration_delay = 876}, + }, + iteration_delay_invalid = { + ok = false, + err = "roles.expirationd: options.iteration_delay must be a number", + options = {iteration_delay = {"table"}}, + }, + iterator_type_number = { + ok = true, + options = {iterator_type = box.index.GE}, + }, + iterator_type_string = { + ok = true, + options = {iterator_type = "GE"}, + }, + iterator_type_invalid = { + ok = false, + err = "roles.expirationd: options.iterator_type must be a number or a string", + options = {iterator_type = false}, + }, + on_full_scan_complete_func = { + ok = true, + options = {on_full_scan_complete = always_true_func_name}, + }, + on_full_scan_error_func = { + ok = true, + options = {on_full_scan_error = always_true_func_name}, + }, + on_full_scan_start_func = { + ok = true, + options = {on_full_scan_start = always_true_func_name}, + }, + on_full_scan_success_func = { + ok = true, + options = {on_full_scan_success = always_true_func_name}, + }, + process_expired_tuple_func = { + ok = true, + options = {process_expired_tuple = always_true_func_name}, + }, + process_while_func = { + ok = true, + options = {process_while = always_true_func_name}, + }, + start_key_func = { + ok = true, + options = {start_key = always_true_func_name}, + }, + start_key_table = { + ok = true, + options = {start_key = {1, 2, 3}}, + }, + tuples_per_iteration_number = { + ok = true, + options = {tuples_per_iteration = 11}, + }, + tuples_per_iteration_invalid = { + ok = false, + err = "roles.expirationd: options.tuples_per_iteration must be a number", + options = {tuples_per_iteration = {"table"}}, + }, + vinyl_assumed_space_len_factor_number = { + ok = true, + options = {vinyl_assumed_space_len_factor = 333}, + }, + vinyl_assumed_space_len_factor_invalid = { + ok = false, + err = "roles.expirationd: options.vinyl_assumed_space_len_factor must be a number", + options = {vinyl_assumed_space_len_factor = false}, + }, + vinyl_assumed_space_len_number = { + ok = true, + options = {vinyl_assumed_space_len = 1}, + }, + vinyl_assumed_space_len_invalid = { + ok = false, + err = "roles.expirationd: options.vinyl_assumed_space_len must be a number", + options = {vinyl_assumed_space_len = "string"}, + }, +} + +for k, case in pairs(options_cases) do + g["test_validate_config_option_" .. k] = function(cg) + local cfg = create_valid_required({options = case.options}) + local status, res = pcall(cg.role.validate, cfg) + if case.ok then + t.assert_equals(status, true) + t.assert_equals(res, true) + else + t.assert_equals(status, false) + t.assert_str_contains(res, case.err) + end + end +end + +function g.test_apply_config_start_tasks(cg) + local task_name1 = "apply_config_test1" + local task_name2 = "apply_config_test2" + pcall(cg.role.apply, { + [task_name1] = { + space = g.space.id, + is_expired = always_true_func_name, + }, + [task_name2] = { + space = g.space.id, + is_expired = always_true_func_name, + options = {} + }, + }, {is_master = false}) + + helpers.retrying({}, function() + t.assert_not_equals(expirationd.task(task_name1), nil) + t.assert_not_equals(expirationd.task(task_name2), nil) + end) +end + +function g.test_apply_config_cfg_metrics_default(cg) + t.skip_if(not g.is_metrics_supported, + "metrics >= 0.11.0 is not installed") + local task_name1 = "apply_config_test1" + + cg.role.apply({ + [task_name1] = { + space = g.space.id, + is_expired = always_true_func_name, + }, + }, {is_master = false}) + + helpers.retrying({}, function() + t.assert_not_equals(expirationd.task(task_name1), nil) + t.assert_equals(expirationd.cfg.metrics, g.default_cfg.metrics) + end) +end + +function g.test_apply_config_cfg_metrics(cg) + t.skip_if(not g.is_metrics_supported, + "metrics >= 0.11.0 is not installed") + local task_name1 = "apply_config_test1" + + for _, value in ipairs({false, true}) do + cg.role.apply({ + ["cfg"] = { + metrics = value, + }, + [task_name1] = { + space = g.space.id, + is_expired = always_true_func_name, + }, + }, {is_master = false}) + + helpers.retrying({}, function() + t.assert_not_equals(expirationd.task(task_name1), nil) + t.assert_equals(expirationd.cfg.metrics, value) + end) + end +end + +function g.test_apply_config_start_cfg_task(cg) + local task_name = "cfg" + cg.role.apply({ + [task_name] = { + space = g.space.id, + is_expired = always_true_func_name, + }, + }, {is_master = false}) + + helpers.retrying({}, function() + t.assert_not_equals(expirationd.task(task_name), nil) + end) +end + +function g.test_apply_config_start_cfg_task_with_box_func(cg) + local task_name = 'cfg' + cg.role.apply({ + [task_name] = { + space = g.space.id, + is_expired = always_true_func_name_in_box_func + }, + }, { is_master = false }) + + helpers.retrying({}, function() + t.assert_not_equals(expirationd.task(task_name), nil) + end) +end + +function g.test_apply_config_start_cfg_task_with_correct_order(cg) + t.assert_not(rawget(_G, always_true_func_name_with_side_effect_flag)) + local task_name = 'cfg' + cg.space:insert({1, "1"}) + cg.role.apply({ + [task_name] = { + space = g.space.id, + is_expired = always_true_func_name_with_side_effect + }, + }, { is_master = false }) + + helpers.retrying({}, function() + t.assert_not_equals(expirationd.task(task_name), nil) + t.assert(rawget(_G, always_true_func_name_with_side_effect_flag)) + end) +end + +function g.test_apply_config_start_task_with_all_options(cg) + local task_name = "apply_config_test" + local options = { + args = {"any"}, + atomic_iteration = false, + force = false, + force_allow_functional_index = true, + full_scan_delay = 1, + full_scan_time = 1, + index = 0, + iterate_with = iterate_pairs_func_name, + iteration_delay = 1, + iterator_type = "ALL", + on_full_scan_complete = always_true_func_name, + on_full_scan_error = always_true_func_name, + on_full_scan_start = always_true_func_name, + on_full_scan_success = always_true_func_name, + process_expired_tuple = always_true_func_name, + process_while = always_true_func_name, + start_key = {1}, + tuples_per_iteration = 100, + vinyl_assumed_space_len_factor = 1, + vinyl_assumed_space_len = 100, + } + + cg.role.apply({ + [task_name] = { + space = g.space.id, + is_expired = always_true_func_name, + options = options, + }, + }, {is_master = true}) + + helpers.retrying({}, function() + t.assert_not_equals(expirationd.task(task_name), nil) + end) + + -- pass an invalid option + options.full_scan_time = -100 + pcall(cg.role.apply, { + [task_name] = { + space = g.space.id, + is_expired = always_true_func_name, + options = options, + }, + }, {is_master = true}) + + helpers.retrying({}, function() + local err = pcall(expirationd.task, task_name) + t.assert_equals(err, false) + end) +end + +function g.test_apply_config_skip_is_master_only(cg) + local task_name = "apply_config_test" + + cg.role.apply({ + [task_name] = { + space = g.space.id, + is_expired = always_true_func_name, + is_master_only = true, + }, + }, {is_master = false}) + + helpers.retrying({}, function() + t.assert_equals(#expirationd.tasks(), 0) + end) +end