forked from zerodha/dungbeetle
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjobber.go
212 lines (178 loc) · 5.72 KB
/
jobber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"sync"
"time"
machinery "github.com/RichardKnop/machinery/v1"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/knadh/sql-jobber/models"
uuid "github.com/satori/go.uuid"
)
// jobContexts holds, keyed by job ID, the cancel function of the query
// context of each job that executeTask is currently running. jobMutex
// guards access to jobContexts.
var (
	jobMutex    sync.RWMutex
	jobContexts = map[string]context.CancelFunc{}
)
// createJobSignature assembles the machinery tasks.Signature{} that queues the
// job request j against the registered task taskName.
//
// It returns an error if taskName is not registered on jobber, if the backend
// reports a job with the same ID that has not completed yet, or if j.ETA is
// set but does not match the "2006-01-02 15:04:05" layout.
func createJobSignature(j models.JobReq, taskName string, ttl int, jobber *Jobber) (tasks.Signature, error) {
	var none tasks.Signature

	registered, known := jobber.Tasks[taskName]
	if !known {
		return none, fmt.Errorf("unrecognized task: %s", taskName)
	}

	// Refuse to queue a job whose ID belongs to one that is still in progress.
	state, stateErr := jobber.Machinery.GetBackend().GetState(j.JobID)
	if stateErr == nil && !state.IsCompleted() {
		return none, fmt.Errorf("job '%s' is already running", j.JobID)
	}

	// The ID that Machinery generates is not made available inside the task
	// itself, so when the request carries no job_id one is generated here and
	// handed to the task as an argument.
	jobID := j.JobID
	if jobID == "" {
		jobID = fmt.Sprintf("job_%v", uuid.NewV4())
	}

	// The leading arguments are always jobID, taskName and ttl, in that order;
	// Machinery reflects on them and passes them to the callback registered
	// using RegisterTask() when the task is executed. The arguments from the
	// request follow.
	extra := sliceToTaskArgs(j.Args)
	args := make([]tasks.Arg, 0, 3+len(extra))
	args = append(args,
		tasks.Arg{Type: "string", Value: jobID},
		tasks.Arg{Type: "string", Value: taskName},
		tasks.Arg{Type: "int", Value: ttl},
	)
	args = append(args, extra...)

	// An ETA is optional; it stays nil unless the request specifies one.
	var eta *time.Time
	if j.ETA != "" {
		parsed, err := time.Parse("2006-01-02 15:04:05", j.ETA)
		if err != nil {
			return none, fmt.Errorf("error parsing ETA: %v", err)
		}
		eta = &parsed
	}

	// A queue named in the request wins; otherwise fall back to the queue
	// attached to the task, which may itself be empty.
	routingKey := j.Queue
	if routingKey == "" {
		routingKey = registered.Queue
	}

	return tasks.Signature{
		Name:       taskName,
		UUID:       jobID,
		RoutingKey: routingKey,
		RetryCount: j.Retries,
		Args:       args,
		ETA:        eta,
	}, nil
}
// executeTask runs the SQL query of the given task with args and hands the
// resulting rows to writeResults, which stores them in a result backend.
//
// While the query runs, the cancel function of its context is registered in
// jobContexts under jobID (presumably so that other code can abort the job);
// the entry is removed again when executeTask returns. taskName is accepted
// but not used here.
//
// It returns the number of rows written. An error is returned if the job's
// state cannot be fetched from the Machinery backend (treated as a canceled
// job), if the query was canceled or failed, or if writing the results failed.
func executeTask(jobID, taskName string, ttl time.Duration, args []interface{}, task *Task, jobber *Jobber) (int64, error) {
	// If the job's deleted, stop.
	if _, err := jobber.Machinery.GetBackend().GetState(jobID); err != nil {
		return 0, errors.New("the job was canceled")
	}

	// Create a cancelable context for the query and make sure that it is
	// released and unregistered no matter how this function returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer func() {
		cancel()

		jobMutex.Lock()
		delete(jobContexts, jobID)
		jobMutex.Unlock()
	}()

	jobMutex.Lock()
	jobContexts[jobID] = cancel
	jobMutex.Unlock()

	var (
		rows *sql.Rows
		err  error
	)

	// Use the prepared statement if the task has one, otherwise run the raw
	// query against one of the task's databases.
	if task.Stmt != nil {
		rows, err = task.Stmt.QueryContext(ctx, args...)
	} else {
		_, db := task.DBs.GetRandom()
		rows, err = db.QueryContext(ctx, task.Raw, args...)
	}
	if err != nil {
		// ctx has no parent and no deadline, so ctx.Err() is non-nil only if
		// the registered cancel function was called. Checking it as well
		// catches drivers that report cancellation with their own error
		// value rather than returning context.Canceled itself.
		if err == context.Canceled || ctx.Err() != nil {
			return 0, errors.New("the job was canceled")
		}

		return 0, fmt.Errorf("task SQL query execution failed: %v", err)
	}
	defer rows.Close()

	return writeResults(jobID, task, ttl, rows, jobber)
}
// writeResults streams the rows of an executed SQL query into one of the
// task's result backends.
//
// A result set is opened for jobID on a backend picked with GetRandom(), the
// column names (and, if not yet registered, the column types) are written,
// and then every row is scanned and written in order before the result set
// is flushed. The result set is closed when the function returns.
//
// It returns the number of rows written so far together with the first error
// encountered, if any. Results are only flushed when all rows were read
// successfully.
func writeResults(jobID string, task *Task, ttl time.Duration, rows *sql.Rows, jobber *Jobber) (int64, error) {
	var numRows int64

	// Result backend.
	name, backend := task.ResultBackends.GetRandom()
	jobber.Logger.Printf("sending results form '%s' to '%s'", jobID, name)

	w, err := backend.NewResultSet(jobID, task.Name, ttl)
	if err != nil {
		return numRows, fmt.Errorf("error creating result set in result backend: %v", err)
	}
	defer w.Close()

	// Get the columns in the results.
	cols, err := rows.Columns()
	if err != nil {
		return numRows, err
	}
	numCols := len(cols)

	// If the column types for this particular taskName
	// have not been registered, do it.
	if !w.IsColTypesRegistered() {
		colTypes, err := rows.ColumnTypes()
		if err != nil {
			return numRows, err
		}
		w.RegisterColTypes(cols, colTypes)
	}

	// Write the results columns / headers.
	if err := w.WriteCols(cols); err != nil {
		return numRows, fmt.Errorf("error writing columns to result backend: %v", err)
	}

	// Gymnastics to read arbitrary types from the row: Scan() needs pointers,
	// so resPointers[i] points at resCols[i] and both slices are reused for
	// every row.
	var (
		resCols     = make([]interface{}, numCols)
		resPointers = make([]interface{}, numCols)
	)
	for i := 0; i < numCols; i++ {
		resPointers[i] = &resCols[i]
	}

	// Scan each row and write to the results backend.
	for rows.Next() {
		if err := rows.Scan(resPointers...); err != nil {
			return numRows, err
		}
		if err := w.WriteRow(resCols); err != nil {
			return numRows, fmt.Errorf("error writing row to result backend: %v", err)
		}
		numRows++
	}

	// rows.Next() returns false both when the rows are exhausted and when
	// iteration was aborted (driver error, canceled context). Without this
	// check a truncated result set would be flushed and reported as success.
	if err := rows.Err(); err != nil {
		return numRows, fmt.Errorf("error reading rows from query results: %v", err)
	}

	if err := w.Flush(); err != nil {
		return numRows, fmt.Errorf("error flushing results to result backend: %v", err)
	}

	return numRows, nil
}
// connectJobServer creates a Machinery job server from cfg and registers
// every entry in queries as a task under its name.
//
// Each registered callback receives jobID, taskName, the TTL in seconds and
// the query arguments, checks that the job's state is still available in the
// backend, and then runs the query through executeTask.
//
// It returns the server, or an error if the server cannot be created or a
// task cannot be registered.
//
// NOTE(review): the callbacks read jobber.Machinery rather than the server
// created here, so presumably the caller assigns the returned server to
// jobber.Machinery before any task runs -- confirm.
func connectJobServer(jobber *Jobber, cfg *config.Config, queries Tasks) (*machinery.Server, error) {
	server, err := machinery.NewServer(cfg)
	if err != nil {
		return nil, err
	}

	// Register the tasks with the query names.
	for name, query := range queries {
		// Give every callback its own copy of the task so that taking its
		// address below never aliases the loop variable.
		q := query

		var fn taskFunc = func(jobID, taskName string, ttl int, args ...interface{}) (int64, error) {
			// Check if the job's been deleted.
			if _, err := jobber.Machinery.GetBackend().GetState(jobID); err != nil {
				return 0, fmt.Errorf("Skipping deleted job: %v", err)
			}

			return executeTask(jobID, taskName, time.Duration(ttl)*time.Second, args, &q, jobber)
		}

		// Surface registration failures instead of silently ending up with
		// a server that is missing a task.
		if err := server.RegisterTask(string(name), fn); err != nil {
			return nil, fmt.Errorf("error registering task '%s': %v", string(name), err)
		}
	}

	return server, nil
}