@@ -40,6 +40,7 @@ if is_http then
40
40
ngx_pipe = require (" ngx.pipe" )
41
41
events = require (" resty.worker.events" )
42
42
end
43
+ local resty_lock = require (" resty.lock" )
43
44
local resty_signal = require " resty.signal"
44
45
local bit = require (" bit" )
45
46
local band = bit .band
@@ -63,11 +64,18 @@ local type = type
63
64
64
65
65
66
local events_list
66
- local lrucache = core .lrucache .new ({
67
- type = " plugin" ,
68
- invalid_stale = true ,
69
- ttl = helper .get_conf_token_cache_time (),
70
- })
67
+
68
+ local function new_lrucache ()
69
+ return core .lrucache .new ({
70
+ type = " plugin" ,
71
+ invalid_stale = true ,
72
+ ttl = helper .get_conf_token_cache_time (),
73
+ })
74
+ end
75
+ local lrucache = new_lrucache ()
76
+
77
+ local shdict_name = " ext-plugin"
78
+ local shdict = ngx .shared [shdict_name ]
71
79
72
80
local schema = {
73
81
type = " object" ,
@@ -293,14 +301,74 @@ local function handle_extra_info(ctx, input)
293
301
end
294
302
295
303
304
+ local function fetch_token (key )
305
+ if shdict then
306
+ return shdict :get (key )
307
+ else
308
+ core .log .error (' shm "ext-plugin" not found' )
309
+ return nil
310
+ end
311
+ end
312
+
313
+
314
+ local function store_token (key , token )
315
+ if shdict then
316
+ local exp = helper .get_conf_token_cache_time ()
317
+ -- early expiry, lrucache in critical state sends prepare_conf_req as original behaviour
318
+ exp = exp * 0.9
319
+ local success , err , forcible = shdict :set (key , token , exp )
320
+ if not success then
321
+ core .log .error (" ext-plugin:failed to set conf token, err: " , err )
322
+ end
323
+ if forcible then
324
+ core .log .warn (" ext-plugin:set valid items forcibly overwritten" )
325
+ end
326
+ else
327
+ core .log .error (' shm "ext-plugin" not found' )
328
+ end
329
+ end
330
+
331
+
332
+ local function flush_token ()
333
+ if shdict then
334
+ core .log .warn (" flush conf token in shared dict" )
335
+ shdict :flush_all ()
336
+ else
337
+ core .log .error (' shm "ext-plugin" not found' )
338
+ end
339
+ end
340
+
341
+
296
342
local rpc_call
297
343
local rpc_handlers = {
298
344
nil ,
299
345
function (conf , ctx , sock , unique_key )
346
+ local token = fetch_token (unique_key )
347
+ if token then
348
+ core .log .info (" fetch token from shared dict, token: " , token )
349
+ return token
350
+ end
351
+
352
+ local lock , err = resty_lock :new (shdict_name )
353
+ if not lock then
354
+ return nil , " failed to create lock: " .. err
355
+ end
356
+
357
+ local elapsed , err = lock :lock (" prepare_conf" )
358
+ if not elapsed then
359
+ return nil , " failed to acquire the lock: " .. err
360
+ end
361
+
362
+ local token = fetch_token (unique_key )
363
+ if token then
364
+ lock :unlock ()
365
+ core .log .info (" fetch token from shared dict, token: " , token )
366
+ return token
367
+ end
368
+
300
369
builder :Clear ()
301
370
302
371
local key = builder :CreateString (unique_key )
303
-
304
372
local conf_vec
305
373
if conf .conf then
306
374
local len = # conf .conf
@@ -331,23 +399,30 @@ local rpc_handlers = {
331
399
332
400
local ok , err = send (sock , constants .RPC_PREPARE_CONF , builder :Output ())
333
401
if not ok then
402
+ lock :unlock ()
334
403
return nil , " failed to send RPC_PREPARE_CONF: " .. err
335
404
end
336
405
337
406
local ty , resp = receive (sock )
338
407
if ty == nil then
408
+ lock :unlock ()
339
409
return nil , " failed to receive RPC_PREPARE_CONF: " .. resp
340
410
end
341
411
342
412
if ty ~= constants .RPC_PREPARE_CONF then
413
+ lock :unlock ()
343
414
return nil , " failed to receive RPC_PREPARE_CONF: unexpected type " .. ty
344
415
end
345
416
346
417
local buf = flatbuffers .binaryArray .New (resp )
347
418
local pcr = prepare_conf_resp .GetRootAsResp (buf , 0 )
348
- local token = pcr :ConfToken ()
419
+ token = pcr :ConfToken ()
349
420
350
421
core .log .notice (" get conf token: " , token , " conf: " , core .json .delay_encode (conf .conf ))
422
+ store_token (unique_key , token )
423
+
424
+ lock :unlock ()
425
+
351
426
return token
352
427
end ,
353
428
function (conf , ctx , sock , entry )
@@ -471,7 +546,6 @@ local rpc_handlers = {
471
546
local buf = flatbuffers .binaryArray .New (resp )
472
547
local call_resp = http_req_call_resp .GetRootAsResp (buf , 0 )
473
548
local action_type = call_resp :ActionType ()
474
-
475
549
if action_type == http_req_call_action .Stop then
476
550
local action = call_resp :Action ()
477
551
local stop = http_req_call_stop .New ()
@@ -588,15 +662,14 @@ rpc_call = function (ty, conf, ctx, ...)
588
662
end
589
663
590
664
591
- local function create_lrucache ()
665
+ local function recreate_lrucache ()
666
+ flush_token ()
667
+
592
668
if lrucache then
593
669
core .log .warn (" flush conf token lrucache" )
594
670
end
595
671
596
- lrucache = core .lrucache .new ({
597
- type = " plugin" ,
598
- ttl = helper .get_conf_token_cache_time (),
599
- })
672
+ lrucache = new_lrucache ()
600
673
end
601
674
602
675
@@ -620,7 +693,7 @@ function _M.communicate(conf, ctx, plugin_name)
620
693
end
621
694
622
695
core .log .warn (" refresh cache and try again" )
623
- create_lrucache ()
696
+ recreate_lrucache ()
624
697
end
625
698
626
699
core .log .error (err )
@@ -717,7 +790,7 @@ function _M.init_worker()
717
790
)
718
791
719
792
-- flush cache when runner exited
720
- events .register (create_lrucache , events_list ._source , events_list .runner_exit )
793
+ events .register (recreate_lrucache , events_list ._source , events_list .runner_exit )
721
794
722
795
-- note that the runner is run under the same user as the Nginx master
723
796
if process .type () == " privileged agent" then
0 commit comments