Skip to content

Commit 3b459a2

Browse files
ArtDuligurio
authored andcommitted
Iterate_with and process_while
For more flexible functionality, added the ability to create a custom iterator that will be created at the selected index (iterate_with). You can also pass a predicate that will stop the fullscan process, if required(process_while). Needed for: #50
1 parent 569ab85 commit 3b459a2

File tree

4 files changed

+136
-1
lines changed

4 files changed

+136
-1
lines changed

README.md

+4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ Run a scheduled task to check and process (expire) tuples in a given space.
5050
The index value may be a single value, if the index consists of one field, a tuple with the index key parts, or a function which returns such value.
5151
If omitted or nil, all tuples will be checked.
5252
* `tuples_per_iteration` - Number of tuples to check in one batch (iteration). Default is 1024.
53+
* `process_while` - Function to call before checking each tuple.
54+
If it returns false, the current tuple scan task finishes.
55+
* `iterate_with` - Function which returns an iterator object which provides tuples to check, considering the start_key, process_while and other options.
56+
There's a default function which can be overriden with this parameter.
5357
* `on_full_scan_start` - Function to call before starting a tuple scan.
5458
* `on_full_scan_complete` - Function to call after completing a full scan.
5559
* `on_full_scan_success` - Function to call after successfully completing a full scan.

expirationd.lua

+36-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ local constants = {
4848
default_on_full_scan = function() end,
4949
-- default function for start_key
5050
start_key = function() return nil end,
51+
-- default function for process_while
52+
process_while = function() return true end,
5153
-- default iterating over the loop will go in ascending index
5254
iterator_type = "ALL",
5355
}
@@ -97,7 +99,7 @@ local function default_do_worker_iteration(task)
9799
local space_len = task.vinyl_assumed_space_len
98100
local checked_tuples_count = 0
99101
local vinyl_checked_tuples_count = 0
100-
for _, tuple in task.index:pairs(task.start_key(), {iterator = task.iterator_type}) do
102+
for _, tuple in task:iterate_with() do
101103
checked_tuples_count = checked_tuples_count + 1
102104
vinyl_checked_tuples_count = vinyl_checked_tuples_count + 1
103105
expiration_process(task, tuple)
@@ -226,6 +228,7 @@ local function create_task(name)
226228
process_expired_tuple = nil,
227229
args = nil,
228230
index = nil,
231+
iterate_with = nil,
229232
iteration_delay = constants.max_delay,
230233
full_scan_delay = constants.max_delay,
231234
tuples_per_iteration = constants.default_tuples_per_iteration,
@@ -237,6 +240,7 @@ local function create_task(name)
237240
on_full_scan_start = constants.default_on_full_scan,
238241
on_full_scan_complete = constants.default_on_full_scan,
239242
start_key = constants.start_key,
243+
process_while = constants.process_while,
240244
iterator_type = constants.iterator_type,
241245
}, { __index = Task_methods })
242246
return task
@@ -262,6 +266,16 @@ local function default_tuple_drop(space_id, args, tuple)
262266
end
263267

264268

269+
-- default iterate_with function
270+
local function default_iterate_with(task)
271+
return task.index:pairs(task.start_key(), { iterator = task.iterator_type })
272+
:take_while(
273+
function()
274+
return task:process_while()
275+
end
276+
)
277+
end
278+
265279
-- ========================================================================= --
266280
-- Expiration daemon management functions
267281
-- ========================================================================= --
@@ -289,6 +303,11 @@ end
289303
-- or a function which returns such value;
290304
-- if omitted or nil, all tuples will be checked.
291305
-- * tuples_per_iteration -- Number of tuples to check in one batch (iteration); default is 1024.
306+
-- * process_while -- Function to call before checking each tuple;
307+
-- if it returns false, the task will stop until next full scan.
308+
-- * iterate_with -- Function which returns an iterator object which provides tuples to check,
309+
-- considering the start_key, process_while and other options.
310+
-- There's a default function which can be overriden with this parameter.
292311
-- * on_full_scan_start -- Function to call before starting a full scan iteration.
293312
-- * on_full_scan_complete -- Function to call after completing a full scan iteration.
294313
-- * on_full_scan_success -- Function to call after successfully completing a full scan iteration.
@@ -375,6 +394,22 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
375394
-- check valid of iterator_type and start key
376395
task.index:pairs( task.start_key(), { iterator = task.iterator_type })
377396

397+
-- check process_while
398+
if options.process_while ~= nil then
399+
if type(options.process_while) ~= "function" then
400+
error("Invalid type of process_while, expected function")
401+
end
402+
task.process_while = options.process_while
403+
end
404+
405+
-- check iterate_with
406+
if options.iterate_with ~= nil then
407+
if type(options.iterate_with) ~= "function" then
408+
error("Invalid type of iterate_with, expected function")
409+
end
410+
end
411+
task.iterate_with = options.iterate_with or default_iterate_with
412+
378413
-- check expire and process after expiration handler's arguments
379414
task.args = options.args
380415

test/unit/iterate_with_test.lua

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
local expirationd = require("expirationd")
2+
local t = require("luatest")
3+
local g = t.group("iterate_with")
4+
5+
local helpers = require("test.helper")
6+
7+
g.before_all(function()
8+
helpers.init_spaces(g)
9+
end)
10+
11+
g.after_each(function()
12+
helpers.truncate_spaces(g)
13+
end)
14+
15+
function g.test_passing()
16+
local task = expirationd.start("clean_all", g.tree.id, helpers.is_expired_true,
17+
{ iterate_with = helpers.is_expired_true })
18+
-- default process_while always return false, iterations never stopped by this function
19+
t.assert_equals(task.iterate_with, helpers.is_expired_true)
20+
task:kill()
21+
22+
-- errors
23+
t.assert_error_msg_content_equals("Invalid type of iterate_with, expected function",
24+
expirationd.start, "clean_all", g.tree.id, helpers.is_expired_true,
25+
{ iterate_with = "" })
26+
end

test/unit/process_while_test.lua

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
local expirationd = require("expirationd")
2+
local t = require("luatest")
3+
local g = t.group("process_while")
4+
5+
local helpers = require("test.helper")
6+
7+
g.before_all(function()
8+
helpers.init_spaces(g)
9+
end)
10+
11+
g.after_each(function()
12+
helpers.truncate_spaces(g)
13+
end)
14+
15+
function g.test_passing()
16+
local task = expirationd.start("clean_all", g.tree.id, helpers.is_expired_true)
17+
-- default process_while always return false, iterations never stopped by this function
18+
t.assert_equals(task.process_while(), true)
19+
task:kill()
20+
21+
local function process_while()
22+
return false
23+
end
24+
25+
task = expirationd.start("clean_all", g.tree.id, helpers.is_expired_true,
26+
{process_while = process_while})
27+
t.assert_equals(task.process_while(), false)
28+
task:kill()
29+
30+
-- errors
31+
t.assert_error_msg_content_equals("Invalid type of process_while, expected function",
32+
expirationd.start, "clean_all", g.tree.id, helpers.is_expired_true,
33+
{ process_while = "" })
34+
end
35+
36+
local function process_while(task)
37+
if task.checked_tuples_count >= 1 then return false end
38+
return true
39+
end
40+
41+
function g.test_tree_index()
42+
for _, space in pairs({g.tree, g.vinyl}) do
43+
helpers.iteration_result = {}
44+
space:insert({1, "3"})
45+
space:insert({2, "2"})
46+
space:insert({3, "1"})
47+
local task = expirationd.start("clean_all", space.id, helpers.is_expired_debug,
48+
{process_while = process_while})
49+
-- wait for tuples expired
50+
helpers.retrying({}, function()
51+
t.assert_equals(helpers.iteration_result, {{1, "3"}})
52+
end)
53+
task:kill()
54+
end
55+
end
56+
57+
function g.test_hash_index()
58+
helpers.iteration_result = {}
59+
g.hash:insert({1, "3"})
60+
g.hash:insert({2, "2"})
61+
g.hash:insert({3, "1"})
62+
63+
local task = expirationd.start("clean_all", g.hash.id, helpers.is_expired_debug,
64+
{process_while = process_while})
65+
-- wait for tuples expired
66+
helpers.retrying({}, function()
67+
t.assert_equals(helpers.iteration_result, {{3, "1"}})
68+
end)
69+
task:kill()
70+
end

0 commit comments

Comments
 (0)