Skip to content

Commit c6a93ea

Browse files
committed
cartridge: add a role
The patch adds a Tarantool Cartridge role with features: * expirationd as a Tarantool Cartridge service for easy access to all API calls. * The role stops all expirationd tasks on an instance on the role termination. * It can automatically start and kill old tasks from the role congiguration. Closes #107
1 parent cc8c16a commit c6a93ea

File tree

8 files changed

+1052
-0
lines changed

8 files changed

+1052
-0
lines changed

README.md

+62
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,65 @@ $ make SEED=1334 test
145145
luatest -v --coverage --shuffle all:1334
146146
...
147147
```
148+
149+
## Cartridge role
150+
151+
`cartridge.roles.expirationd` is a Tarantool Cartridge role for the expirationd
152+
package with the features:
153+
154+
* It registers expirationd as a Tarantool Cartridge service for easy access to
155+
all [API calls](https://tarantool.github.io/expirationd/#Module_functions):
156+
```Lua
157+
local task = cartridge.service_get('expiratind').start("task_name", id, is_expired)
158+
task:kill()
159+
```
160+
* The role stops all expirationd tasks on an instance on the role termination.
161+
* The role can automatically start or kill old tasks from the role
162+
configuration:
163+
164+
```yaml
165+
expirationd:
166+
task_name1:
167+
space_id: 579
168+
is_expired: is_expired_func_name_in__G
169+
is_master_only: true
170+
options:
171+
args:
172+
- any
173+
atomic_iteration: false
174+
force: false
175+
force_allow_functional_index: true
176+
full_scan_delay: 1
177+
full_scan_time: 1
178+
index: 0
179+
iterate_with: iterate_with_func_name_in__G
180+
iteration_delay: 1
181+
iterator_type: ALL
182+
on_full_scan_complete: on_full_scan_complete_func_name_in__G
183+
on_full_scan_error: on_full_scan_error_func_name_in__G
184+
on_full_scan_start: on_full_scan_start_func_name_in__G
185+
on_full_scan_success: on_full_scan_success_func_name_in__G
186+
process_expired_tuple: process_expired_tuple_func_name_in__G
187+
process_while: process_while_func_name_in__G
188+
start_key:
189+
- 1
190+
tuples_per_iteration: 100
191+
vinyl_assumed_space_len: 100
192+
vinyl_assumed_space_len_factor: 1
193+
task_name2:
194+
...
195+
```
196+
197+
[expirationd.start()](https://tarantool.github.io/expirationd/#start) has
198+
the same parameters with the same meaning. Except for the additional optional
199+
param `is_master_only`. It defines is a task should be started only on a
200+
master instance. It is `false` by default.
201+
202+
Also you need to be careful with the parameters-functions. The string is
203+
a key of in the global variable `_G` and the value must be a function. So
204+
something like this should be done before initializing the role:
205+
```Lua
206+
rawset(_G, "is_expired_func_name_in__G", function(args, tuple)
207+
--logic
208+
end)
209+
```

cartridge/roles/expirationd.lua

+209
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
local expirationd = require("expirationd")
2+
local role_name = "expirationd"
3+
local started = require("cartridge.vars").new(role_name)
4+
5+
local function load_function(func_name)
6+
if func_name == nil or type(func_name) ~= 'string' then
7+
return nil
8+
end
9+
10+
local func = rawget(_G, func_name)
11+
if func == nil or type(func) ~= 'function' then
12+
return nil
13+
end
14+
return func
15+
end
16+
17+
local function get_param(param_name, value, types)
18+
local types_map = {
19+
b = {type = "boolean", err = "a boolean"},
20+
n = {type = "number", err = "a number"},
21+
s = {type = "string", err = "a string"},
22+
f = {type = "string", transform = load_function, err = "a function name in _G"},
23+
t = {type = "table", err = "a table"},
24+
any = {err = "any type"},
25+
}
26+
27+
local found = false
28+
for _, t in ipairs(types) do
29+
local type_opts = types_map[t]
30+
if type_opts == nil then
31+
error(role_name .. ": unsupported type option")
32+
end
33+
if not type_opts.type or type(value) == type_opts.type then
34+
if type_opts.transform then
35+
local tmp = type_opts.transform(value)
36+
if tmp then
37+
value = tmp
38+
found = true
39+
break
40+
end
41+
else
42+
found = true
43+
break
44+
end
45+
end
46+
end
47+
48+
if not found then
49+
local err = role_name .. ": " .. param_name .. " must be "
50+
for i, t in ipairs(types) do
51+
err = err .. types_map[t].err
52+
if i ~= #types then
53+
err = err .. " or "
54+
end
55+
end
56+
return false, err
57+
end
58+
return true, value
59+
end
60+
61+
local function get_task_options(opts)
62+
local opts_map = {
63+
args = {"any"},
64+
atomic_iteration = {"b"},
65+
force = {"b"},
66+
force_allow_functional_index = {"b"},
67+
full_scan_delay = {"n"},
68+
full_scan_time = {"n"},
69+
index = {"n", "s"},
70+
iterate_with = {"f"},
71+
iteration_delay = {"n"},
72+
iterator_type = {"n", "s"},
73+
on_full_scan_complete = {"f"},
74+
on_full_scan_error = {"f"},
75+
on_full_scan_start = {"f"},
76+
on_full_scan_success = {"f"},
77+
process_expired_tuple = {"f"},
78+
process_while = {"f"},
79+
start_key = {"f", "t"},
80+
tuples_per_iteration = {"n"},
81+
vinyl_assumed_space_len_factor = {"n"},
82+
vinyl_assumed_space_len = {"n"},
83+
}
84+
if opts == nil then
85+
return
86+
end
87+
88+
for opt, val in pairs(opts) do
89+
if type(opt) ~= "string" then
90+
error(role_name .. ": an option must be a string")
91+
end
92+
if opts_map[opt] == nil then
93+
error(role_name .. ": unsupported option '" .. opt .. "'")
94+
end
95+
local ok, res = get_param("options." .. opt, val, opts_map[opt])
96+
if not ok then
97+
error(res)
98+
end
99+
opts[opt] = res
100+
end
101+
102+
return opts
103+
end
104+
105+
local function get_task_config(task_conf)
106+
local conf = table.deepcopy(task_conf)
107+
local params_map = {
108+
space_id = {required = true, types = {"n", "s"}},
109+
is_expired = {required = true, types = {"f"}},
110+
is_master_only = {required = false, types = {"b"}},
111+
options = {required = false, types = {"t"}},
112+
}
113+
for k, _ in pairs(conf) do
114+
if type(k) ~= "string" then
115+
error(role_name .. ": param must be a string")
116+
end
117+
if params_map[k] == nil then
118+
error(role_name .. ": unsupported param " .. k)
119+
end
120+
end
121+
122+
for param, opts in pairs(params_map) do
123+
if opts.required and conf[param] == nil then
124+
error(role_name .. ": " .. param .. " is required")
125+
end
126+
if conf[param] ~= nil then
127+
local ok, res = get_param(param, conf[param], opts.types)
128+
if not ok then
129+
error(res)
130+
end
131+
conf[param] = res
132+
end
133+
end
134+
135+
conf.options = get_task_options(conf.options)
136+
return conf
137+
end
138+
139+
local function init()
140+
141+
end
142+
143+
local function validate_config(conf_new)
144+
local conf = table.deepcopy(conf_new[role_name] or {})
145+
146+
for task_name, task_conf in pairs(conf) do
147+
local ok, res = get_param("task name", task_name, {"s"})
148+
if not ok then
149+
error(res)
150+
end
151+
get_task_config(task_conf)
152+
end
153+
154+
return true
155+
end
156+
157+
local function apply_config(conf_new, opts)
158+
local conf = table.deepcopy(conf_new[role_name] or {})
159+
160+
-- finishes tasks from an old configuration
161+
for i=#started,1,-1 do
162+
local task_name = started[i]
163+
local ok, _ = pcall(expirationd.task, task_name)
164+
if ok then
165+
if conf[task_name] then
166+
expirationd.task(task_name):stop()
167+
else
168+
expirationd.task(task_name):kill()
169+
end
170+
end
171+
table.remove(started, i)
172+
end
173+
174+
for task_name, task_conf in pairs(conf) do
175+
task_conf = get_task_config(task_conf)
176+
177+
local skip = task_conf.is_master_only and not opts.is_master
178+
if not skip then
179+
local task = expirationd.start(task_name, task_conf.space_id,
180+
task_conf.is_expired,
181+
task_conf.options)
182+
if task == nil then
183+
error(role_name .. ": unable to start task " .. task_name)
184+
end
185+
table.insert(started, task_name)
186+
end
187+
end
188+
end
189+
190+
local function stop()
191+
for _, task_name in pairs(expirationd.tasks()) do
192+
local task = expirationd.task(task_name)
193+
task:stop()
194+
end
195+
end
196+
197+
local role = {
198+
role_name = role_name,
199+
init = init,
200+
validate_config = validate_config,
201+
apply_config = apply_config,
202+
stop = stop,
203+
}
204+
205+
for k, v in pairs(expirationd) do
206+
role[k] = v
207+
end
208+
209+
return role

debian/tarantool-expirationd.install

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
expirationd.lua usr/share/tarantool/
2+
cartridge/roles/expirationd.lua usr/share/tarantool/cartridge/roles/

expirationd-scm-1.rockspec

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ build = {
1818
type = "builtin",
1919
modules = {
2020
["expirationd"] = "expirationd.lua",
21+
["cartridge.roles.expirationd"] = "cartridge/roles/expirationd.lua",
2122
}
2223
}
2324
-- vim: syntax=lua

rpm/tarantool-expirationd.spec

+3
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,12 @@ make test
2929
%install
3030
install -d %{buildroot}%{_datarootdir}/tarantool/
3131
install -m 0644 expirationd.lua %{buildroot}%{_datarootdir}/tarantool/
32+
install -d %{buildroot}%{_datarootdir}/tarantool/cartridge/roles/
33+
install -m 0644 cartridge/roles/expirationd.lua %{buildroot}%{_datarootdir}/tarantool/cartridge/roles/expirationd.lua
3234

3335
%files
3436
%{_datarootdir}/tarantool/expirationd.lua
37+
%{_datarootdir}/tarantool/cartridge
3538
%doc README.md
3639
%{!?_licensedir:%global license %doc}
3740
%license LICENSE

test/entrypoint/srv_role.lua

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#!/usr/bin/env tarantool
2+
3+
require('strict').on()
4+
_G.is_initialized = function() return false end
5+
6+
local log = require('log')
7+
local errors = require('errors')
8+
local cartridge = require('cartridge')
9+
10+
package.preload['customers-storage'] = function()
11+
return {
12+
role_name = 'customers-storage',
13+
init = function()
14+
local customers_space = box.schema.space.create('customers', {
15+
format = {
16+
{name = 'id', type = 'unsigned'},
17+
},
18+
if_not_exists = true,
19+
engine = 'memtx',
20+
})
21+
22+
customers_space:create_index('id', {
23+
parts = { {field = 'id'} },
24+
unique = true,
25+
type = 'TREE',
26+
if_not_exists = true,
27+
})
28+
end,
29+
}
30+
end
31+
32+
local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, {
33+
advertise_uri = 'localhost:3301',
34+
http_port = 8081,
35+
bucket_count = 3000,
36+
roles = {
37+
'customers-storage',
38+
'cartridge.roles.vshard-router',
39+
'cartridge.roles.vshard-storage',
40+
'cartridge.roles.expirationd'
41+
},
42+
roles_reload_allowed = true
43+
})
44+
45+
if not ok then
46+
log.error('%s', err)
47+
os.exit(1)
48+
end
49+
50+
_G.is_initialized = cartridge.is_healthy

0 commit comments

Comments
 (0)