Skip to content

Commit 2c3f4db

Browse files
committed
cartridge: add a role
The patch adds a Tarantool Cartridge role with features: * expirationd as a Tarantool Cartridge service for easy access to all API calls. * The role stops all expirationd tasks on an instance on the role termination. * It can automatically start and kill old tasks from the role congiguration. Closes #107
1 parent b0f9ae3 commit 2c3f4db

File tree

8 files changed

+1204
-0
lines changed

8 files changed

+1204
-0
lines changed

README.md

+63
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,66 @@ $ make SEED=1334 test
145145
luatest -v --coverage --shuffle all:1334
146146
...
147147
```
148+
149+
## Cartridge role
150+
151+
`cartridge.roles.expirationd` is a Tarantool Cartridge role for the expirationd
152+
package with the features:
153+
154+
* It registers expirationd as a Tarantool Cartridge service for easy access to
155+
all [API calls](https://tarantool.github.io/expirationd/#Module_functions):
156+
```Lua
157+
local task = cartridge.service_get('expirationd').start("task_name", id, is_expired)
158+
task:kill()
159+
```
160+
* The role stops all expirationd tasks on an instance on the role termination.
161+
* The role can automatically start or kill old tasks from the role
162+
configuration:
163+
164+
```yaml
165+
expirationd:
166+
task_name1:
167+
space_id: 579
168+
is_expired: is_expired_func_name_in__G
169+
is_master_only: true
170+
options:
171+
args:
172+
- any
173+
atomic_iteration: false
174+
force: false
175+
force_allow_functional_index: true
176+
full_scan_delay: 1
177+
full_scan_time: 1
178+
index: 0
179+
iterate_with: iterate_with_func_name_in__G
180+
iteration_delay: 1
181+
iterator_type: ALL
182+
on_full_scan_complete: on_full_scan_complete_func_name_in__G
183+
on_full_scan_error: on_full_scan_error_func_name_in__G
184+
on_full_scan_start: on_full_scan_start_func_name_in__G
185+
on_full_scan_success: on_full_scan_success_func_name_in__G
186+
process_expired_tuple: process_expired_tuple_func_name_in__G
187+
process_while: process_while_func_name_in__G
188+
start_key:
189+
- 1
190+
tuples_per_iteration: 100
191+
vinyl_assumed_space_len: 100
192+
vinyl_assumed_space_len_factor: 1
193+
task_name2:
194+
...
195+
```
196+
197+
[expirationd.start()](https://tarantool.github.io/expirationd/#start) has
198+
the same parameters with the same meaning. Except for the additional optional
199+
param `is_master_only`. If `true`, the task should run only on a master
200+
instance. By default, the value is `false`.
201+
202+
You need to be careful with parameters-functions. The string is a key in
203+
the global variable `_G`, the value must be a function. You need to define
204+
the key before initializing the role:
205+
206+
```Lua
207+
rawset(_G, "is_expired_func_name_in__G", function(args, tuple)
208+
-- code of the function
209+
end)
210+
```

cartridge/roles/expirationd.lua

+208
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
local expirationd = require("expirationd")
2+
local role_name = "expirationd"
3+
local started = require("cartridge.vars").new(role_name)
4+
5+
local function load_function(func_name)
6+
if func_name == nil or type(func_name) ~= 'string' then
7+
return nil
8+
end
9+
10+
local func = rawget(_G, func_name)
11+
if func == nil or type(func) ~= 'function' then
12+
return nil
13+
end
14+
return func
15+
end
16+
17+
local function get_param(param_name, value, types)
18+
local types_map = {
19+
b = {type = "boolean", err = "a boolean"},
20+
n = {type = "number", err = "a number"},
21+
s = {type = "string", err = "a string"},
22+
f = {type = "string", transform = load_function, err = "a function name in _G"},
23+
t = {type = "table", err = "a table"},
24+
any = {err = "any type"},
25+
}
26+
27+
local found = false
28+
for _, t in ipairs(types) do
29+
local type_opts = types_map[t]
30+
if type_opts == nil then
31+
error(role_name .. ": unsupported type option")
32+
end
33+
if not type_opts.type or type(value) == type_opts.type then
34+
if type_opts.transform then
35+
local tmp = type_opts.transform(value)
36+
if tmp then
37+
value = tmp
38+
found = true
39+
break
40+
end
41+
else
42+
found = true
43+
break
44+
end
45+
end
46+
end
47+
48+
if not found then
49+
local err = role_name .. ": " .. param_name .. " must be "
50+
for i, t in ipairs(types) do
51+
err = err .. types_map[t].err
52+
if i ~= #types then
53+
err = err .. " or "
54+
end
55+
end
56+
return false, err
57+
end
58+
return true, value
59+
end
60+
61+
local function get_task_options(opts)
62+
local opts_map = {
63+
args = {"any"},
64+
atomic_iteration = {"b"},
65+
force = {"b"},
66+
force_allow_functional_index = {"b"},
67+
full_scan_delay = {"n"},
68+
full_scan_time = {"n"},
69+
index = {"n", "s"},
70+
iterate_with = {"f"},
71+
iteration_delay = {"n"},
72+
iterator_type = {"n", "s"},
73+
on_full_scan_complete = {"f"},
74+
on_full_scan_error = {"f"},
75+
on_full_scan_start = {"f"},
76+
on_full_scan_success = {"f"},
77+
process_expired_tuple = {"f"},
78+
process_while = {"f"},
79+
start_key = {"f", "t"},
80+
tuples_per_iteration = {"n"},
81+
vinyl_assumed_space_len_factor = {"n"},
82+
vinyl_assumed_space_len = {"n"},
83+
}
84+
if opts == nil then
85+
return
86+
end
87+
88+
for opt, val in pairs(opts) do
89+
if type(opt) ~= "string" then
90+
error(role_name .. ": an option must be a string")
91+
end
92+
if opts_map[opt] == nil then
93+
error(role_name .. ": unsupported option '" .. opt .. "'")
94+
end
95+
local ok, res = get_param("options." .. opt, val, opts_map[opt])
96+
if not ok then
97+
error(res)
98+
end
99+
opts[opt] = res
100+
end
101+
102+
return opts
103+
end
104+
105+
local function get_task_config(task_conf)
106+
-- setmetatable resets __newindex write protection on a copy
107+
local conf = setmetatable(table.deepcopy(task_conf), {})
108+
local params_map = {
109+
space_id = {required = true, types = {"n", "s"}},
110+
is_expired = {required = true, types = {"f"}},
111+
is_master_only = {required = false, types = {"b"}},
112+
options = {required = false, types = {"t"}},
113+
}
114+
for k, _ in pairs(conf) do
115+
if type(k) ~= "string" then
116+
error(role_name .. ": param must be a string")
117+
end
118+
if params_map[k] == nil then
119+
error(role_name .. ": unsupported param " .. k)
120+
end
121+
end
122+
123+
for param, opts in pairs(params_map) do
124+
if opts.required and conf[param] == nil then
125+
error(role_name .. ": " .. param .. " is required")
126+
end
127+
if conf[param] ~= nil then
128+
local ok, res = get_param(param, conf[param], opts.types)
129+
if not ok then
130+
error(res)
131+
end
132+
conf[param] = res
133+
end
134+
end
135+
136+
conf.options = get_task_options(conf.options)
137+
return conf
138+
end
139+
140+
local function init()
141+
142+
end
143+
144+
local function validate_config(conf_new)
145+
local conf = conf_new[role_name] or {}
146+
147+
for task_name, task_conf in pairs(conf) do
148+
local ok, res = get_param("task name", task_name, {"s"})
149+
if not ok then
150+
error(res)
151+
end
152+
local ok, res = get_param("task params", task_conf, {"t"})
153+
if not ok then
154+
error(res)
155+
end
156+
get_task_config(task_conf)
157+
end
158+
159+
return true
160+
end
161+
162+
local function apply_config(conf_new, opts)
163+
local conf = conf_new[role_name] or {}
164+
165+
-- finishes tasks from an old configuration
166+
for i=#started,1,-1 do
167+
local task_name = started[i]
168+
local ok, _ = pcall(expirationd.task, task_name)
169+
if ok then
170+
if conf[task_name] then
171+
expirationd.task(task_name):stop()
172+
else
173+
expirationd.task(task_name):kill()
174+
end
175+
end
176+
table.remove(started, i)
177+
end
178+
179+
for task_name, task_conf in pairs(conf) do
180+
task_conf = get_task_config(task_conf)
181+
182+
local skip = task_conf.is_master_only and not opts.is_master
183+
if not skip then
184+
local task = expirationd.start(task_name, task_conf.space_id,
185+
task_conf.is_expired,
186+
task_conf.options)
187+
if task == nil then
188+
error(role_name .. ": unable to start task " .. task_name)
189+
end
190+
table.insert(started, task_name)
191+
end
192+
end
193+
end
194+
195+
local function stop()
196+
for _, task_name in pairs(expirationd.tasks()) do
197+
local task = expirationd.task(task_name)
198+
task:stop()
199+
end
200+
end
201+
202+
return setmetatable({
203+
role_name = role_name,
204+
init = init,
205+
validate_config = validate_config,
206+
apply_config = apply_config,
207+
stop = stop,
208+
}, { __index = expirationd })

debian/tarantool-expirationd.install

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
expirationd.lua usr/share/tarantool/
2+
cartridge/roles/expirationd.lua usr/share/tarantool/cartridge/roles/

expirationd-scm-1.rockspec

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ build = {
1818
type = "builtin",
1919
modules = {
2020
["expirationd"] = "expirationd.lua",
21+
["cartridge.roles.expirationd"] = "cartridge/roles/expirationd.lua",
2122
}
2223
}
2324
-- vim: syntax=lua

rpm/tarantool-expirationd.spec

+3
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@ some other space.
2727
%install
2828
install -d %{buildroot}%{_datarootdir}/tarantool/
2929
install -m 0644 expirationd.lua %{buildroot}%{_datarootdir}/tarantool/
30+
install -d %{buildroot}%{_datarootdir}/tarantool/cartridge/roles/
31+
install -m 0644 cartridge/roles/expirationd.lua %{buildroot}%{_datarootdir}/tarantool/cartridge/roles/expirationd.lua
3032

3133
%files
3234
%{_datarootdir}/tarantool/expirationd.lua
35+
%{_datarootdir}/tarantool/cartridge
3336
%doc README.md
3437
%{!?_licensedir:%global license %doc}
3538
%license LICENSE

test/entrypoint/srv_role.lua

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#!/usr/bin/env tarantool
2+
3+
require('strict').on()
4+
_G.is_initialized = function() return false end
5+
6+
local log = require('log')
7+
local errors = require('errors')
8+
local fiber = require('fiber')
9+
local cartridge = require('cartridge')
10+
local hotreload = require('cartridge.hotreload')
11+
12+
package.preload['customers-storage'] = function()
13+
return {
14+
role_name = 'customers-storage',
15+
init = function()
16+
local customers_space = box.schema.space.create('customers', {
17+
format = {
18+
{name = 'id', type = 'unsigned'},
19+
},
20+
if_not_exists = true,
21+
engine = 'memtx',
22+
})
23+
24+
customers_space:create_index('id', {
25+
parts = { {field = 'id'} },
26+
unique = true,
27+
type = 'TREE',
28+
if_not_exists = true,
29+
})
30+
end,
31+
}
32+
end
33+
34+
local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, {
35+
advertise_uri = 'localhost:3301',
36+
http_port = 8081,
37+
bucket_count = 3000,
38+
roles = {
39+
'customers-storage',
40+
'cartridge.roles.vshard-router',
41+
'cartridge.roles.vshard-storage',
42+
'cartridge.roles.expirationd'
43+
},
44+
roles_reload_allowed = true,
45+
})
46+
47+
if not ok then
48+
log.error('%s', err)
49+
os.exit(1)
50+
end
51+
52+
_G.is_initialized = cartridge.is_healthy
53+
_G.always_true_test = function() return true end
54+
_G.is_expired_test_continue = function(_, tuple)
55+
if rawget(_G, "is_expired_test_first_tuple") == nil then
56+
rawset(_G, "is_expired_test_first_tuple", tuple)
57+
end
58+
59+
local cnt = rawget(_G, "is_expired_test_wait_cnt") or 0
60+
cnt = cnt + 1
61+
rawset(_G, "is_expired_test_wait_cnt", cnt)
62+
if cnt == 5 then
63+
fiber.sleep(60)
64+
end
65+
return true
66+
end
67+
68+
hotreload.whitelist_globals({"always_true_test"})
69+
hotreload.whitelist_globals({"is_expired_test_continue"})

0 commit comments

Comments
 (0)