Skip to content

Commit e2bb86a

Browse files
committed
Iterate over any index
Added the ability to iterate over any index by specifying the index name in options. The default is primary index. ci: installation, caching and running luatest PHONY added to Makefile as makefile target and luatests folder are the same. Needed for: #50
1 parent b6dc62c commit e2bb86a

File tree

6 files changed

+417
-8
lines changed

6 files changed

+417
-8
lines changed

.github/workflows/fast_testing.yaml

+12
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,16 @@ jobs:
3535
- name: Clone the module
3636
uses: actions/checkout@v2
3737

38+
- name: Cache rocks
39+
uses: actions/cache@v2
40+
id: cache-rocks
41+
with:
42+
path: .rocks/
43+
key: cache-rocks-${{ matrix.tarantool }}-01
44+
45+
- name: Install requirements
46+
run: |
47+
tarantoolctl rocks install luatest 0.5.0
48+
if: steps.cache-rocks.outputs.cache-hit != 'true'
49+
3850
- run: make test

Makefile

+2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
all:
22
@echo "Only tests are available: make test"
33

4+
.PHONY: test
45
test:
6+
.rocks/bin/luatest -v
57
rm -rf *.xlog* *.snap 51{2,3,4,5,6,7}
68
INDEX_TYPE='TREE' SPACE_TYPE='vinyl' ./test.lua
79
rm -rf *.xlog* *.snap 51{2,3,4,5,6,7}

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ Run a scheduled task to check and process (expire) tuples in a given space.
4040
* `options` -- (table with named options, may be nil)
4141
* `process_expired_tuple` - Applied to expired tuples, receives (space_id, args, tuple) as arguments.
4242
Can be nil: by default, tuples are removed.
43+
* `index` - Name or id of the index to iterate on. If omitted, will use the primary index.
44+
If there's no index with this name, will throw an error.
45+
Supported index types are TREE and HASH, using other types will result in an error.
4346
* `tuples_per_iteration` - Number of tuples to check in one batch (iteration). Default is 1024.
4447
* `on_full_scan_start` - Function to call before starting a tuple scan.
4548
* `on_full_scan_complete` - Function to call after completing a full scan.

expirationd.lua

+30-8
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ local function expiration_process(task, tuple)
6969
task.checked_tuples_count = task.checked_tuples_count + 1
7070
if task.is_tuple_expired(task.args, tuple) then
7171
task.expired_tuples_count = task.expired_tuples_count + 1
72-
task.process_expired_tuple(task.space_id, task.args, tuple)
72+
task.process_expired_tuple(task.space_id, task.args, tuple, task)
7373
end
7474
end
7575

@@ -80,39 +80,38 @@ local function suspend_basic(task, len)
8080
fiber.sleep(delay)
8181
end
8282

83-
local function suspend(scan_space, task)
83+
local function suspend(task)
8484
-- Return the number of tuples in the space
85-
local space_len = scan_space:len()
85+
local space_len = task.index:len()
8686
if space_len > 0 then
8787
suspend_basic(task, space_len)
8888
end
8989
end
9090

9191
local function default_do_worker_iteration(task)
9292
-- full index scan loop
93-
local scan_space = box.space[task.space_id]
9493
local space_len = task.vinyl_assumed_space_len
9594
local checked_tuples_count = 0
9695
local vinyl_checked_tuples_count = 0
97-
for _, tuple in scan_space.index[0]:pairs(nil, {iterator = box.index.ALL}) do
96+
for _, tuple in task.index:pairs(nil, {iterator = box.index.ALL}) do
9897
checked_tuples_count = checked_tuples_count + 1
9998
vinyl_checked_tuples_count = vinyl_checked_tuples_count + 1
10099
expiration_process(task, tuple)
101100
-- find out if the worker can go to sleep
102101
-- if the batch is full
103102
if checked_tuples_count >= task.tuples_per_iteration then
104103
checked_tuples_count = 0
105-
if scan_space.engine == "vinyl" then
104+
if box.space[task.space_id].engine == "vinyl" then
106105
if vinyl_checked_tuples_count > space_len then
107106
space_len = task.vinyl_assumed_space_len_factor * space_len
108107
end
109108
suspend_basic(task, space_len)
110109
else
111-
suspend(scan_space, task)
110+
suspend(task)
112111
end
113112
end
114113
end
115-
if scan_space.engine == "vinyl" then
114+
if box.space[task.space_id].engine == "vinyl" then
116115
task.vinyl_assumed_space_len = vinyl_checked_tuples_count
117116
end
118117
end
@@ -222,6 +221,7 @@ local function create_task(name)
222221
is_tuple_expired = nil,
223222
process_expired_tuple = nil,
224223
args = nil,
224+
index = nil,
225225
iteration_delay = constants.max_delay,
226226
full_scan_delay = constants.max_delay,
227227
tuples_per_iteration = constants.default_tuples_per_iteration,
@@ -271,6 +271,9 @@ end
271271
-- options = { -- (table with named options)
272272
-- * process_expired_tuple -- Applied to expired tuples, receives (space_id, args, tuple) as arguments;
273273
-- can be nil: by default, tuples are removed.
274+
-- * index -- Name or id of the index to iterate on; if omitted, will use the primary index;
275+
-- if there's no index with this name, will throw an error.
276+
-- supported index types are TREE and HASH, using other types will result in an error.
274277
-- * on_full_scan_start -- Function to call before starting a full scan iteration.
275278
-- * on_full_scan_complete -- Function to call after completing a full scan iteration.
276279
-- * on_full_scan_success -- Function to call after successfully completing a full scan iteration.
@@ -322,6 +325,25 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
322325
end
323326
task.process_expired_tuple = options.process_expired_tuple or default_tuple_drop
324327

328+
-- validate index
329+
local expire_index = box.space[space_id].index[0]
330+
if options.index then
331+
if box.space[space_id].index[options.index] == nil then
332+
if type(options.index) == "string" then
333+
error("Index with name " .. options.index .. " does not exist")
334+
elseif type(options.index) == "number" then
335+
error("Index with id " .. options.index .. " does not exist")
336+
else
337+
error("Invalid type of index, expected string or number")
338+
end
339+
end
340+
expire_index = box.space[space_id].index[options.index]
341+
if expire_index.type ~= "TREE" and expire_index.type ~= "HASH" then
342+
error("Not supported index type, expected TREE or HASH")
343+
end
344+
end
345+
task.index = expire_index
346+
325347
-- check expire and process after expiration handler's arguments
326348
task.args = options.args
327349

test/helper.lua

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
local t = require("luatest")
2+
local fio = require("fio")
3+
4+
local helpers = require("luatest.helpers")
5+
6+
t.before_suite(function()
7+
t.datadir = fio.tempdir()
8+
box.cfg{
9+
wal_dir = t.datadir,
10+
memtx_dir = t.datadir,
11+
vinyl_dir = t.datadir,
12+
}
13+
14+
local tree = box.schema.create_space("tree", { if_not_exists = true })
15+
tree:format({
16+
{name = "id", type = "number"},
17+
{name = "first_name", type = "string"},
18+
{name = "value", type = "number", is_nullable = true},
19+
{name = "count", type = "number", is_nullable = true},
20+
{name = "non_unique_id", type = "number", is_nullable = true, unique = false},
21+
{name = "json_path_field", is_nullable = true, unique = false},
22+
{name = "multikey_field", is_nullable = true},
23+
{name = "functional_field", is_nullable = true},
24+
})
25+
tree:create_index("primary", {type = "TREE", parts={ 1 }})
26+
tree:create_index("index_for_first_name", {type = "TREE", parts={ 2 }})
27+
tree:create_index("multipart_index", {type = "TREE", parts={ {3, is_nullable = true}, {4, is_nullable = true} }})
28+
tree:create_index("non_unique_index", {type = "TREE", parts={ {5, is_nullable = true} }, unique = false})
29+
30+
local hash = box.schema.create_space("hash", { if_not_exists = true })
31+
hash:format({
32+
{name = "id", type = "number"},
33+
{name = "first_name", type = "string"},
34+
{name = "value", type = "number", is_nullable = true},
35+
{name = "count", type = "number", is_nullable = true}
36+
})
37+
hash:create_index("primary", {type = "HASH", parts={ 1 }} )
38+
hash:create_index("index_for_first_name", {type = "HASH", parts={ 2 }} )
39+
hash:create_index("multipart_index", {type = "HASH", parts={ {1}, {2} }})
40+
41+
local vinyl = box.schema.create_space("vinyl", { if_not_exists = true, engine = "vinyl" })
42+
vinyl:format({
43+
{name = "id", type = "number"},
44+
{name = "first_name", type = "string"},
45+
{name = "value", type = "number", is_nullable = true},
46+
{name = "count", type = "number", is_nullable = true},
47+
{name = "non_unique_id", type = "number", is_nullable = true, unique = false},
48+
{name = "json_path_field", is_nullable = true, unique = false},
49+
{name = "multikey_field", is_nullable = true, unique = false},
50+
})
51+
vinyl:create_index("primary", {type = "TREE", parts={ 1 }})
52+
vinyl:create_index("index_for_first_name", {type = "TREE", parts={ 2 }})
53+
vinyl:create_index("multipart_index", {type = "TREE", parts={ {3, is_nullable = true}, {4, is_nullable = true} }})
54+
vinyl:create_index("non_unique_index", {type = "TREE", parts={ {5} }, unique = false })
55+
56+
if _TARANTOOL >= "2" then
57+
local tree_code = [[function(tuple)
58+
if tuple[8] then
59+
return {string.sub(tuple[8],2,2)}
60+
end
61+
return {tuple[2]}
62+
end]]
63+
box.schema.func.create("tree_func",
64+
{body = tree_code, is_deterministic = true, is_sandboxed = true})
65+
tree:create_index("json_path_index",
66+
{type = "TREE", parts = { {6, type = "scalar", path = "age", is_nullable = true} }})
67+
tree:create_index("multikey_index",
68+
{type = "TREE", parts = { {7, type = "str", path = "data[*].name"} }} )
69+
tree:create_index("functional_index",
70+
{type = "TREE", parts={ {1, type = "string"} }, func = "tree_func"})
71+
72+
vinyl:create_index("json_path_index",
73+
{type = "TREE", parts = { {6, type = "scalar", path = "age", is_nullable = true} }})
74+
vinyl:create_index("multikey_index",
75+
{type = "TREE", parts = { {7, type = "str", path = "data[*].name", is_nullable = true} }})
76+
end
77+
78+
local bitset = box.schema.create_space("bitset", { if_not_exists = true })
79+
bitset:create_index("primary", {type = "TREE", parts={ 1 }})
80+
bitset:create_index("index_for_first_name",
81+
{type = "BITSET", parts={ {field = 2, type = "string"} }, unique = false})
82+
end)
83+
84+
t.after_suite(function()
85+
fio.rmtree(t.datadir)
86+
end)
87+
88+
function helpers.init_spaces(g)
89+
g.tree = box.space.tree
90+
g.hash = box.space.hash
91+
g.vinyl = box.space.vinyl
92+
g.bitset = box.space.bitset
93+
end
94+
95+
function helpers.truncate_spaces(g)
96+
g.tree:truncate()
97+
g.hash:truncate()
98+
g.vinyl:truncate()
99+
end
100+
101+
function helpers.is_expired_true()
102+
return true
103+
end
104+
105+
helpers.iteration_result = {}
106+
function helpers.is_expired_debug(_, tuple)
107+
table.insert(helpers.iteration_result, tuple)
108+
return true
109+
end
110+
111+
return helpers

0 commit comments

Comments
 (0)