-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathclient.go
257 lines (207 loc) · 6.08 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
/*
* chronos-client
* Copyright (c) 2015 Yieldbot, Inc.
* For the full copyright and license information, please view the LICENSE.txt file.
*/
// Package client provides Chronos operations
package client
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
)
// Client represents the Chronos client interface
type Client struct {
	// Client is the HTTP client used to send requests. When it is nil (and
	// ProxyURL is nil) a zero-value http.Client is used instead.
	Client *http.Client
	// URL is the base URL of the Chronos API. One trailing "/" is trimmed
	// before endpoint paths are appended.
	URL string
	// ProxyURL, when non-nil, makes requests go through that proxy using a
	// freshly built http.Client; the Client field is not used in that case.
	ProxyURL *url.URL
}
// Jobs returns the Chronos jobs.
//
// It sends a GET to <URL>/scheduler/jobs and decodes the JSON response body
// into a slice of Job. A nil slice and a non-nil error are returned when the
// request cannot be built, the request fails, or the body is not valid JSON.
func (cl Client) Jobs() ([]Job, error) {
	// Get jobs
	req, err := http.NewRequest("GET", cl.chronosURL()+"/scheduler/jobs", nil)
	if err != nil {
		// A malformed URL yields a nil request; stop here instead of
		// handing it to doRequest.
		return nil, errors.New("failed to fetch jobs due to " + err.Error())
	}
	res, err := cl.doRequest(req)
	if err != nil {
		return nil, errors.New("failed to fetch jobs due to " + err.Error())
	}

	// Parse jobs
	var jobs []Job
	if err = json.Unmarshal(res, &jobs); err != nil {
		return nil, errors.New("failed to unmarshal JSON data due to " + err.Error())
	}
	return jobs, nil
}
// PrintJobs prints the Chronos jobs to standard output as JSON.
//
// When pretty is true the output is indented; otherwise it is compact. The
// error from fetching the jobs, from encoding them, or from writing to
// standard output is returned unchanged.
func (cl Client) PrintJobs(pretty bool) error {
	// Get jobs
	jobs, err := cl.Jobs()
	if err != nil {
		return err
	}

	// Encode jobs
	var buf []byte
	if pretty {
		buf, err = json.MarshalIndent(jobs, "", " ")
	} else {
		buf, err = json.Marshal(jobs)
	}
	if err != nil {
		return err
	}

	// Report a failed write (e.g. closed pipe) instead of dropping it.
	_, err = os.Stdout.Write(buf)
	return err
}
// AddJob adds a Chronos job.
//
// jsonContent must be a JSON document that decodes into a Job; it is posted
// unchanged to <URL>/scheduler/iso8601. It returns true on success, and false
// with an error when the JSON is invalid, the request cannot be built, or the
// request fails.
func (cl Client) AddJob(jsonContent string) (bool, error) {
	// Check job
	buf := []byte(jsonContent)
	var job Job
	if err := json.Unmarshal(buf, &job); err != nil {
		return false, errors.New("failed to unmarshal JSON data due to " + err.Error())
	}

	// Add job
	req, err := http.NewRequest("POST", cl.chronosURL()+"/scheduler/iso8601", bytes.NewBuffer(buf))
	if err != nil {
		// req is nil here; setting a header on it would panic.
		return false, errors.New("failed to add job due to " + err.Error())
	}
	req.Header.Set("Content-Type", "application/json")
	if _, err = cl.doRequest(req); err != nil {
		return false, errors.New("failed to add job due to " + err.Error())
	}
	return true, nil
}
// AddDepJob adds a Chronos dependent job.
//
// jsonContent must be a JSON document that decodes into a Job; it is posted
// unchanged to <URL>/scheduler/dependency. It returns true on success, and
// false with an error when the JSON is invalid, the request cannot be built,
// or the request fails.
func (cl Client) AddDepJob(jsonContent string) (bool, error) {
	// Check job
	buf := []byte(jsonContent)
	var job Job
	if err := json.Unmarshal(buf, &job); err != nil {
		return false, errors.New("failed to unmarshal JSON data due to " + err.Error())
	}

	// Add job
	req, err := http.NewRequest("POST", cl.chronosURL()+"/scheduler/dependency", bytes.NewBuffer(buf))
	if err != nil {
		// req is nil here; setting a header on it would panic.
		return false, errors.New("failed to add dependent job due to " + err.Error())
	}
	req.Header.Set("Content-Type", "application/json")
	if _, err = cl.doRequest(req); err != nil {
		return false, errors.New("failed to add dependent job due to " + err.Error())
	}
	return true, nil
}
// RunJob runs a Chronos job.
//
// It sends a PUT to <URL>/scheduler/job/<jobName>, adding "?arguments=<args>"
// when args is non-empty. jobName and args are placed into the URL verbatim,
// so callers must pass them already URL-encoded.
//
// Return values:
//   - (false, error) when jobName is empty, the request cannot be built, or
//     the request fails;
//   - (true, error) when the response body contains "not found", regardless
//     of the request error;
//   - (true, nil) otherwise.
func (cl Client) RunJob(jobName, args string) (bool, error) {
	// Check job
	if jobName == "" {
		return false, errors.New("invalid job name")
	}

	query := jobName
	if args != "" {
		query += "?arguments=" + args
	}

	// Run job
	req, err := http.NewRequest("PUT", cl.chronosURL()+"/scheduler/job/"+query, nil)
	if err != nil {
		// A malformed URL yields a nil request; stop here instead of
		// handing it to doRequest.
		return false, errors.New("failed to run job due to " + err.Error())
	}
	res, err := cl.doRequest(req)
	// The body is inspected before err because doRequest returns the body
	// alongside the error for non-2xx responses.
	if bytes.Contains(res, []byte("not found")) {
		return true, errors.New(jobName + " job couldn't be found")
	}
	if err != nil {
		return false, errors.New("failed to run job due to " + err.Error())
	}
	return true, nil
}
// KillJobTasks kills the Chronos job tasks.
//
// It sends a DELETE to <URL>/scheduler/task/kill/<jobName>; jobName is placed
// into the URL verbatim.
//
// Return values:
//   - (false, error) when jobName is empty, the request cannot be built, or
//     the request fails for a reason other than a bad response status;
//   - (true, error) when the request error mentions "bad response", which is
//     reported as the job not being found;
//   - (true, nil) otherwise.
func (cl Client) KillJobTasks(jobName string) (bool, error) {
	// Check job
	if jobName == "" {
		return false, errors.New("invalid job name")
	}

	// Kill job tasks
	req, err := http.NewRequest("DELETE", cl.chronosURL()+"/scheduler/task/kill/"+jobName, nil)
	if err != nil {
		// A malformed URL yields a nil request; stop here instead of
		// handing it to doRequest.
		return false, errors.New("failed to kill tasks due to " + err.Error())
	}
	if _, err = cl.doRequest(req); err != nil {
		// Any non-2xx status is treated as "job not found".
		if strings.Contains(err.Error(), "bad response") {
			return true, errors.New(jobName + " job couldn't be found")
		}
		return false, errors.New("failed to kill tasks due to " + err.Error())
	}
	return true, nil
}
// DeleteJob deletes a Chronos job.
//
// It sends a DELETE to <URL>/scheduler/job/<jobName>; jobName is placed into
// the URL verbatim.
//
// Return values:
//   - (false, error) when jobName is empty, the request cannot be built, or
//     the request fails;
//   - (true, error) when the request succeeds but the response body contains
//     "not found";
//   - (true, nil) otherwise.
func (cl Client) DeleteJob(jobName string) (bool, error) {
	// Check job
	if jobName == "" {
		return false, errors.New("invalid job name")
	}

	// Delete job
	req, err := http.NewRequest("DELETE", cl.chronosURL()+"/scheduler/job/"+jobName, nil)
	if err != nil {
		// A malformed URL yields a nil request; stop here instead of
		// handing it to doRequest.
		return false, errors.New("failed to delete job due to " + err.Error())
	}
	res, err := cl.doRequest(req)
	if err != nil {
		return false, errors.New("failed to delete job due to " + err.Error())
	}
	if bytes.Contains(res, []byte("not found")) {
		return true, errors.New(jobName + " job couldn't be found")
	}
	return true, nil
}
// UpdateJobTaskProgress updates a Chronos job task progress.
//
// It posts jsonContent unchanged, with a JSON content type, to
// <URL>/scheduler/job/<jobName>/task/<taskID>/progress. jobName and taskID are
// placed into the URL verbatim. It returns true on success, and false with an
// error when jobName or taskID is empty, the request cannot be built, or the
// request fails.
func (cl Client) UpdateJobTaskProgress(jobName, taskID, jsonContent string) (bool, error) {
	// Check job and task
	if jobName == "" {
		return false, errors.New("invalid job name")
	}
	if taskID == "" {
		return false, errors.New("invalid task id")
	}

	// Update job task progress
	endpoint := cl.chronosURL() + "/scheduler/job/" + jobName + "/task/" + taskID + "/progress"
	req, err := http.NewRequest("POST", endpoint, bytes.NewBufferString(jsonContent))
	if err != nil {
		// req is nil here; setting a header on it would panic.
		return false, errors.New("failed to update job task progress due to " + err.Error())
	}
	req.Header.Set("Content-Type", "application/json")
	if _, err = cl.doRequest(req); err != nil {
		return false, errors.New("failed to update job task progress due to " + err.Error())
	}
	return true, nil
}
// DepGraph returns the Chronos dependency graph in the dotfile format.
//
// It sends a GET to <URL>/scheduler/graph/dot and returns the response body
// as a string. An empty string and a non-nil error are returned when the
// request cannot be built or the request fails.
func (cl Client) DepGraph() (string, error) {
	// Get the graph
	req, err := http.NewRequest("GET", cl.chronosURL()+"/scheduler/graph/dot", nil)
	if err != nil {
		// A malformed URL yields a nil request; stop here instead of
		// handing it to doRequest.
		return "", errors.New("failed to fetch graph due to " + err.Error())
	}
	res, err := cl.doRequest(req)
	if err != nil {
		return "", errors.New("failed to fetch graph due to " + err.Error())
	}
	return string(res), nil
}
// doRequest makes a request to Chronos REST API.
//
// Client selection: when ProxyURL is set, a one-off client routed through that
// proxy is used (the Client field is ignored); otherwise the Client field is
// used, falling back to a zero-value http.Client when it is nil.
//
// It returns the full response body. For a status code outside 200-299 the
// body is returned together with a "bad response: <code>" error so callers
// can still inspect it. A nil request, a transport failure, or a body read
// failure yields a nil body and an error.
func (cl Client) doRequest(req *http.Request) ([]byte, error) {
	// Guard against callers that failed to build a request.
	if req == nil {
		return nil, errors.New("invalid request")
	}

	// Init a client
	client := cl.Client
	if cl.ProxyURL != nil {
		transport := &http.Transport{Proxy: http.ProxyURL(cl.ProxyURL)}
		// This transport is discarded after the call, so release the
		// connections it keeps alive. Registered before the body close
		// below so that it runs after it.
		defer transport.CloseIdleConnections()
		client = &http.Client{Transport: transport}
	} else if client == nil {
		client = &http.Client{}
	}

	// Do request
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Read data
	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return data, fmt.Errorf("bad response: %d", resp.StatusCode)
	}
	return data, nil
}
// chronosURL returns the configured Chronos base URL with one trailing "/"
// removed, so endpoint paths can be appended directly.
func (cl Client) chronosURL() string {
	base := cl.URL
	if strings.HasSuffix(base, "/") {
		base = base[:len(base)-1]
	}
	return base
}