This repository has been archived by the owner on Apr 16, 2018. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathmicroservice.coffee
416 lines (364 loc) · 14.9 KB
/
microservice.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
###############################################################################
#
# SageMathCloud: A collaborative web-based interface to Sage, IPython, LaTeX and the Terminal.
#
# Copyright (C) 2014, William Stein
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
DEFAULT_PORT = 6000
DEFAULT_HOST = '127.0.0.1'
SECRET_TOKEN_LENGTH = 128
DEFAULT_TIMEOUT = 10
{EventEmitter} = require('events')
fs = require("fs")
net = require('net')
async = require('async')
program = require('commander')
daemon = require('start-stop-daemon')
winston = require('winston')
misc = require('misc')
misc_node = require('misc_node')
message = require('message')
uuid = require('node-uuid')
{defaults, required} = misc
{EventEmitter} = require('events')
if not process.env.SALVUS_TOKENS?
throw "Update and source salvus-env"
token_filename = (name) -> "#{process.env.SALVUS_TOKENS}/#{name}.token"
######################################################################
# CLIENT
######################################################################
exports.client = (opts) ->
opts = defaults opts,
cb : required # cb(err, instance)
port : DEFAULT_PORT
host : DEFAULT_HOST
debug : true
new exports.Client(opts)
class exports.Client extends EventEmitter
constructor : (opts) ->
opts = defaults opts,
port : DEFAULT_PORT
host : DEFAULT_HOST
name : 'microservice'
debug : false
cb : required
if opts.debug
winston.remove(winston.transports.Console)
winston.add(winston.transports.Console, {level: 'debug', timestamp:true, colorize:true})
@token_file = token_filename(opts.name)
@port = opts.port
@host = opts.host
@name = opts.name
misc.call_lock(obj:@)
t = misc.mswalltime()
async.series([
(cb) =>
misc.retry_until_success
f : @read_token
cb : cb
(cb) =>
@connect(cb)
], (err) =>
if err
@dbg("constructor", "failed to create: #{err}") # should be impossible
opts.cb(err)
else
@dbg("constructor", "connected Client in #{misc.mswalltime(t)}ms")
opts.cb(undefined, @)
)
dbg: (f, m) =>
winston.debug("#{@name}Client.#{f}: #{misc.trunc(misc.to_json(m),300)}")
read_token: (cb) =>
@dbg("read_token")
fs.readFile @token_file, (err, buf) =>
if err
@dbg("read_token", "error = #{err}")
cb(err)
else
@dbg("read_token", "got it")
@secret_token = buf.toString('base64')
cb()
connect: (cb) =>
@dbg("connect")
f = (cb) =>
misc.retry_until_success
start_delay : 1000
max_delay : 30000 # milliseconds -- stop increasing time at this point
factor : 1.4 # multiply delay by this each time
f : @_connect
cb : cb
@_call_with_lock(f, cb)
_connect: (cb) =>
@dbg("_connect")
@socket = undefined
misc_node.connect_to_locked_socket
port : @port
host : @host
token : @secret_token
timeout : 5
cb : (err, socket) =>
if err
@dbg("_connect", "error -- #{err}")
cb(err)
else
@dbg("_connect", "success -- now adding listeners")
misc_node.enable_mesg(socket, 'connection_to_syncstring_server')
@socket = socket
socket.on 'mesg', (type, mesg) =>
if type == 'json'
@dbg("receive mesg", misc.trunc(misc.to_json(mesg),300))
@emit("mesg_#{mesg.event}", mesg)
else
@dbg("receive mesg", "mesg of unknown type #{type} ignored")
reconnect = () =>
@emit('disconnect') # tell listeners that right now not connected
socket.removeAllListeners()
@connect()
socket.on('end', reconnect)
socket.on('close', reconnect)
socket.on('error', reconnect)
@on 'mesg_ping', (mesg) =>
@send_mesg
mesg : message.pong(id:mesg.id)
call : false
@emit('connect') # tell listeners that we are connected
cb()
send_mesg: (opts) =>
opts = defaults opts,
mesg : required
timeout : DEFAULT_TIMEOUT
call : false # if true, tags mesg with id and waits for a response with the same id
cb : undefined
if opts.call and not opts.mesg.id?
opts.mesg.id = uuid.v4()
@dbg("send_mesg", opts.mesg)
@socket.write_mesg('json', opts.mesg)
if opts.call
@socket.recv_mesg
type : 'json'
id : opts.mesg.id
timeout : opts.timeout
cb : (resp) =>
if resp.event == 'error'
opts.cb?(resp.error)
else
opts.cb?(undefined, resp)
else
opts.cb?()
call: (opts) =>
opts.call = true
@send_mesg(opts)
ping: (cb) =>
t0 = misc.mswalltime()
@call
mesg : message.ping()
cb : (err, resp) =>
if err
@dbg("ping", "error -- #{err}")
cb(err)
else
@dbg("ping", "pong: #{misc.mswalltime(t0)}ms")
cb()
test0: (cb) =>
@call
mesg : {event:'test0'}
cb : (err, resp) =>
@dbg('test0', misc.to_json(resp))
test1: (n, cb) =>
b = new Buffer(n)
b.fill('a')
big = b.toString()
t = misc.mswalltime()
@call
mesg : {event:'ping', big:big}
cb : (err, resp) =>
@dbg('test1', "totat time: #{misc.mswalltime(t)}ms")
######################################################################
# Network SERVER
######################################################################
class exports.Server extends EventEmitter
constructor : (opts) ->
opts = defaults opts,
port : DEFAULT_PORT
host : DEFAULT_HOST
name : 'microservice'
cb : undefined
@dbg("constructor")
@port = opts.port
@host = opts.host
@name = opts.name
@init_event_handlers()
token_file = token_filename(opts.name)
async.series([
(cb) =>
@dbg("constructor","reading token file")
fs.exists token_file, (exists) =>
if not exists
cb(); return
fs.readFile token_file, (err, buf) =>
if err
cb() # will generate in next step
else
@secret_token = buf.toString('base64')
cb()
(cb) =>
if @secret_token?
cb()
else
@dbg("constructor","generating token")
require('crypto').randomBytes SECRET_TOKEN_LENGTH, (ex, buf) =>
@secret_token = buf.toString('base64')
fs.writeFile(token_file, buf, cb)
(cb) =>
@dbg("constructor","starting tcp server")
@start_tcp_server(cb)
], (err) =>
if err
@dbg("constructor","Failed to start server: #{err}")
opts.cb?(err)
else
@dbg("constructor","Started server")
opts.cb?(undefined, @)
)
init_event_handlers: () =>
@on 'mesg_ping', (socket, mesg) =>
@send_mesg
socket : socket
mesg : message.pong(id:mesg.id)
@on 'mesg_test0', (socket, mesg) =>
@dbg("test0", "mesg_test0")
@send_mesg
socket : socket
mesg : message.pong(id:mesg.id)
f = () =>
@dbg("test0", "f")
@send_mesg
socket : socket
mesg : {event:'test0'}
setTimeout(f, 1000)
g = () =>
@dbg("test0", "g")
t0 = misc.mswalltime()
@call
socket : socket
mesg : message.ping()
cb : (err, resp) =>
if err
@dbg("test0", "error -- #{err}")
else
@dbg("test0", "pong: #{misc.mswalltime(t0)}ms")
setTimeout(g, 1000)
dbg: (f, m) =>
winston.debug("#{@name}Server.#{f}: #{misc.trunc(misc.to_json(m),300)}")
start_tcp_server: (cb) =>
server = net.createServer (socket) =>
@dbg("tcp_server", "received connection")
misc_node.unlock_socket socket, @secret_token, (err) =>
if err
@dbg("tcp_server", "error unlocking socket -- #{err}")
winston.debug(err)
else
socket.id = uuid.v4()
misc_node.enable_mesg(socket)
@dbg("tcp_server", "unlocked socket -- id=#{socket.id}")
socket.on 'mesg', (type, mesg) =>
if type == 'json'
@dbg("socket (id=#{socket.id})", misc.trunc(misc.to_json(mesg),200))
@emit("mesg_#{mesg.event}", socket, mesg)
else
@dbg("mesg", "mesg of unknown type #{type} ignored")
disconnect = () =>
@dbg("socket (id=#{socket.id})", "disconnect")
@emit('close', socket)
socket.removeAllListeners()
socket.on('end', disconnect)
socket.on('close', disconnect)
socket.on('error', disconnect)
try
server.listen @port, @host, () =>
@dbg("tcp_server", "listening on #{@host}:#{@port}")
cb()
catch err
cb(err)
send_mesg: (opts) =>
opts = defaults opts,
socket : required
mesg : required
timeout : DEFAULT_TIMEOUT
call : false # if true, tags mesg with id and waits for a response with the same id
cb : undefined
@dbg("send_mesg(id=#{opts.socket.id}, call=#{opts.call})", opts.mesg)
if opts.call and not opts.mesg.id?
opts.mesg.id = uuid.v4()
opts.socket.write_mesg('json', opts.mesg)
if opts.call
opts.socket.recv_mesg
type : 'json'
id : opts.mesg.id
timeout : opts.timeout
cb : (resp) =>
if resp.event == 'error'
opts.cb?(resp.error)
else
opts.cb?(undefined, resp)
else
opts.cb?(undefined)
call: (opts) =>
opts.call = true
@send_mesg(opts)
# Process command line arguments
exports.cli = (opts) ->
opts = defaults opts,
server_class : exports.Server
default_port : DEFAULT_PORT
default_host : DEFAULT_HOST
#console.log("exports.cli -- #{misc.to_json(opts)}")
program.usage('[start/stop/restart/status] [options]')
.option('--port <n>', "port to listen on (default: #{opts.default_port})", parseInt)
.option('--host [string]', 'host of interface to bind to (default: "#{opts.default_host}")', String, "127.0.0.1")
.option('--debug [string]', 'logging debug level (default: "debug"); "" for no debugging output)', String, 'debug')
.option('--pidfile [string]', 'store pid in this file', String)
.option('--logfile [string]', 'write log to this file (default: "data/logs/program._name.log")', String)
.option('--database_nodes <string,string,...>', 'comma separated list of ip addresses of all database nodes in the cluster', String, 'localhost')
.option('--keyspace [string]', 'Cassandra keyspace to use (default: "salvus")', String, 'salvus')
.parse(process.argv)
if program._name != 'undefined'
if not program.port?
program.port = opts.default_port
if not program.host?
program.host = opts.default_host
if not program.pidfile?
program.pidfile = "data/pids/#{program._name}.pid"
if not program.logfile?
program.logfile = "data/logs/#{program._name}.log"
# run as a server/daemon (otherwise, imported as a library)
if program.debug
winston.remove(winston.transports.Console)
winston.add(winston.transports.Console, {level: program.debug, timestamp:true, colorize:true})
process.addListener "uncaughtException", (err) ->
winston.debug("BUG ****************************************************************************")
winston.debug("Uncaught exception: " + err)
winston.debug(err.stack)
winston.debug("BUG ****************************************************************************")
console.log("#{program._name} #{program.args[0]} server on port #{program.port}...")
daemon({pidFile:program.pidfile, outFile:program.logfile, errFile:program.logfile},
() ->
winston.debug("start_server")
new opts.server_class(port : program.port, host : program.host)
)
if not module.parent?
exports.cli()