Skip to content

Commit fa67dcf

Browse files
wip! to be completed
1 parent b9a591c commit fa67dcf

File tree

7 files changed

+200
-23
lines changed

7 files changed

+200
-23
lines changed

lib/taskinator/create_process_worker.rb

+4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ def perform
3131
process_args << { :uuid => uuid }
3232
end
3333

34+
# generate the process for the given definition and arguments
35+
# and enqueue the processes tasks
36+
# -> sequential processes - enqueues the first task
37+
# -> concurrent processes - enqueues all the tasks
3438
@definition._create_process_(false, *process_args).enqueue!
3539

3640
end

lib/taskinator/executor.rb

+6
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,15 @@ def options
2626
end
2727

2828
# helpers
29+
2930
def logger
3031
Taskinator.logger
3132
end
3233

34+
def error
35+
# task.process.error
36+
raise NoMethodError
37+
end
38+
3339
end
3440
end

lib/taskinator/process.rb

+22-5
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ def parent=(value)
5151
@key = nil # NB: invalidate memoized key
5252
end
5353

54+
def sub_process?
55+
defined?(@parent)
56+
end
57+
5458
def tasks
5559
@tasks ||= Tasks.new
5660
end
@@ -155,9 +159,6 @@ def complete!
155159
after_completed_tasks.each(&:enqueue!)
156160
end
157161

158-
# TODO: add retry method - to pick up from a failed task
159-
# e.g. like retrying a failed job in Resque Web
160-
161162
def tasks_completed?
162163
# TODO: optimize this
163164
tasks.all?(&:completed?)
@@ -185,8 +186,25 @@ def fail!(error)
185186
after_failed_tasks.each(&:enqueue!)
186187
end
187188

189+
#--------------------------------------------------
190+
191+
# TODO: add retry method - to pick up from a failed task
192+
# e.g. like retrying a failed job in Resque Web
193+
194+
def task_started(task)
195+
return if processing? || sub_process?
196+
197+
transition(:processing) do
198+
# enqueue before started tasks independently
199+
before_started_tasks.each(&:enqueue!)
200+
end
201+
end
202+
203+
def task_cancelled(task)
204+
cancel!
205+
end
206+
188207
def task_failed(task, error)
189-
# for now, fail this process
190208
fail!(error)
191209
end
192210

@@ -271,7 +289,6 @@ def enqueue
271289
end
272290
end
273291

274-
# this method only called in-process (usually from the console)
275292
def start
276293
if tasks.empty?
277294
complete! # weren't any tasks to start with

lib/taskinator/task.rb

+14-3
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ def start!
8383

8484
transition(:processing) do
8585
instrument('taskinator.task.processing', processing_payload) do
86+
# notify the process that this task has started
87+
process.task_started(self) if notify_process?
88+
8689
start
8790
end
8891
end
@@ -99,21 +102,27 @@ def paused?
99102
end
100103

101104
def complete!
105+
self.incr_completed if incr_count?
106+
102107
transition(:completed) do
103-
self.incr_completed if incr_count?
104108
instrument('taskinator.task.completed', completed_payload) do
105109
complete if respond_to?(:complete)
110+
106111
# notify the process that this task has completed
107112
process.task_completed(self) if notify_process?
108113
end
109114
end
110115
end
111116

112117
def cancel!
118+
self.incr_cancelled if incr_count?
119+
113120
transition(:cancelled) do
114-
self.incr_cancelled if incr_count?
115121
instrument('taskinator.task.cancelled', cancelled_payload) do
116122
cancel if respond_to?(:cancel)
123+
124+
# notify the process that this task has cancelled
125+
process.task_cancelled(self) if notify_process?
117126
end
118127
end
119128
end
@@ -123,10 +132,12 @@ def cancelled?
123132
end
124133

125134
def fail!(error)
135+
self.incr_failed if incr_count?
136+
126137
transition(:failed) do
127-
self.incr_failed if incr_count?
128138
instrument('taskinator.task.failed', failed_payload(error)) do
129139
fail(error) if respond_to?(:fail)
140+
130141
# notify the process that this task has failed
131142
process.task_failed(self, error) if notify_process?
132143
end

spec/support/test_definitions.rb

+57-15
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ def self.perform(*args)
66
end
77
end
88

9+
class TestTaskFailed < StandardError
10+
end
11+
912
module Support
1013

1114
def iterator(task_count, *args)
@@ -22,6 +25,22 @@ def iterator(task_count, *args)
2225
end
2326
end
2427

28+
def task_fail(*args)
29+
raise TestTaskFailed
30+
end
31+
32+
def task_before_started(*args)
33+
Taskinator.logger.info(">>> Executing before started task #{__method__} [#{uuid}]...")
34+
end
35+
36+
def task_after_completed(*args)
37+
Taskinator.logger.info(">>> Executing after completed task #{__method__} [#{uuid}]...")
38+
end
39+
40+
def task_after_failed(*args)
41+
Taskinator.logger.info(">>> Executing after failed task #{__method__} [#{uuid}]...")
42+
end
43+
2544
end
2645

2746
module Definition
@@ -46,12 +65,19 @@ module TaskBeforeStarted
4665
extend Taskinator::Definition
4766
include Support
4867

49-
define_process :task_count do
50-
before_started :task1, :queue => :foo
68+
define_process do
69+
before_started :task_before_started
5170

52-
for_each :iterator do
53-
task :task2, :queue => :foo
54-
end
71+
task :task1
72+
end
73+
end
74+
75+
module TaskBeforeStartedSubProcess
76+
extend Taskinator::Definition
77+
include Support
78+
79+
define_process do
80+
sub_process TaskBeforeStarted
5581
end
5682

5783
end
@@ -60,12 +86,20 @@ module TaskAfterCompleted
6086
extend Taskinator::Definition
6187
include Support
6288

63-
define_process :task_count do
64-
for_each :iterator do
65-
task :task1, :queue => :foo
66-
end
89+
define_process do
90+
task :task1
91+
92+
after_completed :task_after_completed
93+
end
94+
95+
end
96+
97+
module TaskAfterCompletedSubProcess
98+
extend Taskinator::Definition
99+
include Support
67100

68-
after_completed :task2, :queue => :foo
101+
define_process do
102+
sub_process TaskAfterCompleted
69103
end
70104

71105
end
@@ -74,12 +108,20 @@ module TaskAfterFailed
74108
extend Taskinator::Definition
75109
include Support
76110

77-
define_process :task_count do
78-
for_each :iterator do
79-
task :task1, :queue => :foo
80-
end
111+
define_process do
112+
task :task_fail
113+
114+
after_failed :task_after_failed
115+
end
81116

82-
after_failed :task2, :queue => :foo
117+
end
118+
119+
module TaskAfterFailedSubProcess
120+
extend Taskinator::Definition
121+
include Support
122+
123+
define_process do
124+
sub_process TaskAfterFailed
83125
end
84126

85127
end

spec/taskinator/persistence_spec.rb

+3
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,9 @@ def initialize
501501
TestDefinitions::EmptySequentialProcessTest,
502502
TestDefinitions::EmptyConcurrentProcessTest,
503503
TestDefinitions::NestedTask,
504+
TestDefinitions::TaskBeforeStarted,
505+
TestDefinitions::TaskAfterCompleted,
506+
TestDefinitions::TaskAfterFailed,
504507
].each do |definition|
505508

506509
describe "#{definition.name} expire immediately" do

spec/taskinator/test_definitions_spec.rb

+94
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,52 @@
9292
Taskinator.queue_adapter = :test_queue_worker
9393
end
9494

95+
context "before_started" do
96+
let(:definition) { TestDefinitions::TaskBeforeStarted }
97+
subject { definition.create_process }
98+
99+
it "invokes before_started task" do
100+
expect(subject.before_started_tasks.count).to eq(1)
101+
expect_any_instance_of(definition).to receive(:task_before_started)
102+
103+
expect {
104+
subject.enqueue!
105+
}.to change { Taskinator.queue.tasks.length }.by(2)
106+
end
107+
end
108+
109+
context "after_completed" do
110+
let(:definition) { TestDefinitions::TaskAfterCompleted }
111+
subject { definition.create_process }
112+
113+
it "invokes after_completed task" do
114+
expect(subject.after_completed_tasks.count).to eq(1)
115+
expect_any_instance_of(definition).to receive(:task_after_completed)
116+
117+
expect {
118+
subject.enqueue!
119+
}.to change { Taskinator.queue.tasks.length }.by(2)
120+
end
121+
end
122+
123+
context "after_failed" do
124+
let(:definition) { TestDefinitions::TaskAfterFailed }
125+
subject { definition.create_process }
126+
127+
it "invokes after_failed task" do
128+
expect(subject.after_failed_tasks.count).to eq(1)
129+
expect_any_instance_of(definition).to receive(:task_after_failed)
130+
131+
expect {
132+
begin
133+
subject.enqueue!
134+
rescue TestDefinitions::TestTaskFailed
135+
# ignore error
136+
end
137+
}.to change { Taskinator.queue.tasks.length }.by(2)
138+
end
139+
end
140+
95141
context "empty subprocesses" do
96142

97143
context "sequential" do
@@ -133,6 +179,54 @@
133179
end
134180

135181
end
182+
183+
context "subprocesses" do
184+
185+
context "before_started" do
186+
let(:definition) { TestDefinitions::TaskBeforeStartedSubProcess }
187+
subject { definition.create_process }
188+
189+
it "invokes before_started task" do
190+
expect_any_instance_of(definition).to receive(:task_before_started)
191+
192+
expect {
193+
subject.enqueue!
194+
}.to change { Taskinator.queue.tasks.length }.by(1)
195+
end
196+
end
197+
198+
context "after_completed" do
199+
let(:definition) { TestDefinitions::TaskAfterCompletedSubProcess }
200+
subject { definition.create_process }
201+
202+
it "invokes after_completed task" do
203+
expect_any_instance_of(definition).to receive(:task_after_completed)
204+
205+
expect {
206+
subject.enqueue!
207+
}.to change { Taskinator.queue.tasks.length }.by(2)
208+
end
209+
end
210+
211+
context "after_failed" do
212+
let(:definition) { TestDefinitions::TaskAfterFailedSubProcess }
213+
subject { definition.create_process }
214+
215+
it "invokes after_failed task" do
216+
expect_any_instance_of(definition).to receive(:task_after_failed)
217+
218+
expect {
219+
begin
220+
subject.enqueue!
221+
rescue TestDefinitions::TestTaskFailed
222+
# ignore error
223+
end
224+
}.to change { Taskinator.queue.tasks.length }.by(2)
225+
end
226+
end
227+
228+
end
229+
136230
end
137231

138232
describe "statuses" do

0 commit comments

Comments
 (0)