diff --git a/Makefile b/Makefile index 3d11ed94c038..0c556559df95 100644 --- a/Makefile +++ b/Makefile @@ -382,8 +382,11 @@ install: runtime $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/ai-rag/vector-search $(ENV_INSTALL) apisix/plugins/ai-rag/vector-search/*.lua $(ENV_INST_LUADIR)/apisix/plugins/ai-rag/vector-search - $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/mcp + $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/mcp/broker + $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/mcp/transport $(ENV_INSTALL) apisix/plugins/mcp/*.lua $(ENV_INST_LUADIR)/apisix/plugins/mcp + $(ENV_INSTALL) apisix/plugins/mcp/broker/*.lua $(ENV_INST_LUADIR)/apisix/plugins/mcp/broker + $(ENV_INSTALL) apisix/plugins/mcp/transport/*.lua $(ENV_INST_LUADIR)/apisix/plugins/mcp/transport $(ENV_INSTALL) bin/apisix $(ENV_INST_BINDIR)/apisix diff --git a/apisix/plugins/mcp-bridge.lua b/apisix/plugins/mcp-bridge.lua index 0d241564f3eb..e2a851afc3ea 100644 --- a/apisix/plugins/mcp-bridge.lua +++ b/apisix/plugins/mcp-bridge.lua @@ -15,18 +15,14 @@ -- limitations under the License. -- local unpack = unpack -local type = type -local tostring = tostring local ngx = ngx -local re_match = ngx.re.match +local thread_spawn = ngx.thread.spawn +local thread_kill = ngx.thread.kill local resty_signal = require("resty.signal") local core = require("apisix.core") local pipe = require("ngx.pipe") -local mcp_session_manager = require("apisix.plugins.mcp.session") - -local V241105_ENDPOINT_SSE = "sse" -local V241105_ENDPOINT_MESSAGE = "message" +local mcp_server_wrapper = require("apisix.plugins.mcp.server_wrapper") local schema = { type = "object", @@ -68,171 +64,108 @@ function _M.check_schema(conf, schema_type) end -local function sse_send(id, event, data) - local ok, err = ngx.print((id and "id: " .. id .. "\n" or "") .. - "event: " .. event .. "\ndata: " .. data .. "\n\n") - if not ok then - return ok, "failed to write buffer: " .. err - end - return ngx.flush(true) -end - - -local function sse_handler(conf, ctx) - -- TODO: recover by Last-Event-ID - local session = mcp_session_manager.new() - - -- spawn subprocess - local proc, err = pipe.spawn({conf.command, unpack(conf.args or {})}) - if not proc then - core.log.error("failed to spawn mcp process: ", err) - return 500 - end - proc:set_timeouts(nil, 100, 100) - - core.response.set_header("Content-Type", "text/event-stream") - core.response.set_header("Cache-Control", "no-cache") - - -- send endpoint event to advertise the message endpoint - sse_send(nil, "endpoint", conf.base_uri .. "/message?sessionId=" .. session.id) - - local stdout_partial, stderr_partial - - -- enter loop - while true do - if session:session_need_ping() then - local next_ping_id, err = session:session_next_ping_id() - if not next_ping_id then - core.log.error("session ", session.id, " exit, failed to get next ping id: ", err) - break - end - local ok, err = sse_send(nil, "message", - '{"jsonrpc": "2.0","method": "ping","id":"ping:'..next_ping_id..'"}') - if not ok then - core.log.info("session ", session.id, " exit, failed to send ping message: ", err) - break - end +local function on_connect(conf, ctx) + return function(additional) + local proc, err = pipe.spawn({conf.command, unpack(conf.args or {})}) + if not proc then + core.log.error("failed to spawn mcp process: ", err) + return 500 end - if session:session_timed_out() then - core.log.info("session ", session.id, " exit, timed out") - break - end - - -- pop the message from client in the queue and send it to the mcp server - repeat - local queue_item, err = session:pop_message_queue() - if err then - core.log.info("session ", session.id, - " exit, failed to pop message from queue: ", err) - break - end - -- write task message to stdio - if queue_item and type(queue_item) == "string" then - core.log.info("session ", session.id, " send message to mcp server: ", queue_item) - proc:write(queue_item .. "\n") - end - until not queue_item - - -- read all the messages in stdout's pipe, line by line - -- if there is an incomplete message it is buffered and spliced before the next message - repeat - local line, _ - line, _, stdout_partial = proc:stdout_read_line() - if line then - local ok, err = sse_send(nil, "message", - stdout_partial and stdout_partial .. line or line) - if not ok then - core.log.info("session ", session.id, - " exit, failed to send response message: ", err) + proc:set_timeouts(nil, 100, 100) + ctx.mcp_bridge_proc = proc + + local server = additional.server + + -- ngx_pipe is a yield operation, so we no longer need + -- to explicitly yield to other threads by ngx_sleep + ctx.mcp_bridge_proc_event_loop = thread_spawn(function () + local stdout_partial, stderr_partial, need_exit + while true do + -- read all the messages in stdout's pipe, line by line + -- if there is an incomplete message it is buffered and + -- spliced before the next message + repeat + local line, _ + line, _, stdout_partial = proc:stdout_read_line() + if line then + local ok, err = server.transport:send( + stdout_partial and stdout_partial .. line or line + ) + if not ok then + core.log.info("session ", server.session_id, + " exit, failed to send response message: ", err) + need_exit = true + break + end + stdout_partial = nil -- luacheck: ignore + end + until not line + if need_exit then break end - stdout_partial = nil -- luacheck: ignore - end - until not line - - repeat - local line, _ - line, _, stderr_partial = proc:stderr_read_line() - if line then - local ok, err = sse_send(nil, "message", - '{"jsonrpc":"2.0","method":"notifications/stderr","params":{"content":"' - .. (stderr_partial and stderr_partial .. line or line) .. '"}}' - ) - if not ok then - core.log.info("session ", session.id, - " exit, failed to send response message: ", err) + + repeat + local line, _ + line, _, stderr_partial = proc:stderr_read_line() + if line then + local ok, err = server.transport:send( + '{"jsonrpc":"2.0","method":"notifications/stderr","params":{"content":"' + .. (stderr_partial and stderr_partial .. line or line) .. '"}}') + if not ok then + core.log.info("session ", server.session_id, + " exit, failed to send response message: ", err) + need_exit = true + break + end + stderr_partial = "" -- luacheck: ignore + end + until not line + if need_exit then break end - stderr_partial = "" -- luacheck: ignore end - until not line - end - - session:destroy() - - -- shutdown the subprocess - proc:shutdown("stdin") - proc:wait() - local _, err = proc:wait() -- check if process not exited then kill it - if err ~= "exited" then - proc:kill(resty_signal.signum("KILL") or 9) + end) end end -local function message_handler(conf, ctx) - local session_id = ctx.var.arg_sessionId - local session, err = mcp_session_manager.recover(session_id) - - if not session then - core.log.error("failed to recover session: ", err) - return 404 +local function on_client_message(conf, ctx) + return function(message, additional) + core.log.info("session ", additional.server.session_id, + " send message to mcp server: ", additional.raw) + ctx.mcp_bridge_proc:write(additional.raw .. "\n") end +end - local body = core.request.get_body(nil, ctx) - if not body then - return 400 - end - local body_json = core.json.decode(body) - if not body_json then - return 400 - end - if core.string.has_prefix(tostring(body_json.id), "ping") then --TODO check client pong - session:on_session_pong() - return 202 - end +local function on_disconnect(conf, ctx) + return function() + if ctx.mcp_bridge_proc_event_loop then + thread_kill(ctx.mcp_bridge_proc_event_loop) + ctx.mcp_bridge_proc_event_loop = nil + end - local ok, err = session:push_message_queue(body) - if not ok then - core.log.error("failed to add task to queue: ", err) - return 500 + local proc = ctx.mcp_bridge_proc + if proc then + proc:shutdown("stdin") + proc:wait() + local _, err = proc:wait() -- check if process not exited then kill it + if err ~= "exited" then + proc:kill(resty_signal.signum("KILL") or 9) + end + end end - - return 202 end function _M.access(conf, ctx) - local m, err = re_match(ctx.var.uri, "^" .. conf.base_uri .. "/(.*)", "jo") - if err then - core.log.info("failed to mcp base uri: ", err) - return 404 - end - local action = m and m[1] or false - if not action then - return 404 - end - - if action == V241105_ENDPOINT_SSE and core.request.get_method() == "GET" then - return sse_handler(conf, ctx) - end - - if action == V241105_ENDPOINT_MESSAGE and core.request.get_method() == "POST" then - return message_handler(conf, ctx) - end - - return 200 + return mcp_server_wrapper.access(conf, ctx, { + event_handler = { + on_connect = on_connect(conf, ctx), + on_client_message = on_client_message(conf, ctx), + on_disconnect = on_disconnect(conf, ctx), + }, + }) end diff --git a/apisix/plugins/mcp/broker/shared_dict.lua b/apisix/plugins/mcp/broker/shared_dict.lua new file mode 100644 index 000000000000..8e6bdb08ce5b --- /dev/null +++ b/apisix/plugins/mcp/broker/shared_dict.lua @@ -0,0 +1,89 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local type = type +local setmetatable = setmetatable +local ngx = ngx +local ngx_sleep = ngx.sleep +local thread_spawn = ngx.thread.spawn +local thread_kill = ngx.thread.kill +local shared_dict = ngx.shared["mcp-session"] -- TODO: rename to something like mcp-broker +local core = require("apisix.core") +local broker_utils = require("apisix.plugins.mcp.broker.utils") + +local _M = {} +local mt = { __index = _M } + + +local STORAGE_SUFFIX_QUEUE = ":queue" + + +function _M.new(opts) + return setmetatable({ + session_id = opts.session_id, + event_handler = {} + }, mt) +end + + +function _M.on(self, event, cb) + self.event_handler[event] = cb +end + + +function _M.push(self, message) + if not message then + return nil, "message is nil" + end + local ok, err = shared_dict:rpush(self.session_id .. STORAGE_SUFFIX_QUEUE, message) + if not ok then + return nil, "failed to push message to queue: " .. err + end + return true +end + + +function _M.start(self) + self.thread = thread_spawn(function() + while true do + local item, err = shared_dict:lpop(self.session_id .. STORAGE_SUFFIX_QUEUE) + if err then + core.log.info("session ", self.session_id, + " exit, failed to pop message from queue: ", err) + break + end + if item and type(item) == "string" + and type(self.event_handler[broker_utils.EVENT_MESSAGE]) == "function" then + self.event_handler[broker_utils.EVENT_MESSAGE]( + core.json.decode(item), { raw = item } + ) + end + + ngx_sleep(0.1) -- yield to other light threads + end + end) +end + + +function _M.close(self) + if self.thread then + thread_kill(self.thread) + self.thread = nil + end +end + + +return _M diff --git a/apisix/plugins/mcp/broker/utils.lua b/apisix/plugins/mcp/broker/utils.lua new file mode 100644 index 000000000000..ded12ae5156c --- /dev/null +++ b/apisix/plugins/mcp/broker/utils.lua @@ -0,0 +1,21 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local _M = {} + +_M.EVENT_MESSAGE = "message" + +return _M diff --git a/apisix/plugins/mcp/server.lua b/apisix/plugins/mcp/server.lua new file mode 100644 index 000000000000..995a795b069e --- /dev/null +++ b/apisix/plugins/mcp/server.lua @@ -0,0 +1,115 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local require = require +local setmetatable = setmetatable +local ngx = ngx +local ngx_sleep = ngx.sleep +local thread_spwan = ngx.thread.spawn +local thread_wait = ngx.thread.wait +local thread_kill = ngx.thread.kill +local core = require("apisix.core") +local broker_utils = require("apisix.plugins.mcp.broker.utils") + + +local _M = {} +local mt = { __index = _M } + + +_M.EVENT_CLIENT_MESSAGE = "event:client_message" + + +-- TODO: ping requester and handler +function _M.new(opts) + local session_id = opts.session_id or core.id.gen_uuid_v4() + + -- TODO: configurable broker type + local message_broker = require("apisix.plugins.mcp.broker.shared_dict").new({ + session_id = session_id, + }) + + -- TODO: configurable transport type + local transport = require("apisix.plugins.mcp.transport.sse").new() + + local obj = setmetatable({ + opts = opts, + session_id = session_id, + next_ping_id = 0, + transport = transport, + message_broker = message_broker, + event_handler = {}, + need_exit = false, + }, mt) + + message_broker:on(broker_utils.EVENT_MESSAGE, function (message, additional) + if obj.event_handler[_M.EVENT_CLIENT_MESSAGE] then + obj.event_handler[_M.EVENT_CLIENT_MESSAGE](message, additional) + end + end) + + return obj +end + + +function _M.on(self, event, cb) + self.event_handler[event] = cb +end + + +function _M.start(self) + self.message_broker:start() + + -- ping loop + local ping = thread_spwan(function() + while true do + if self.need_exit then + break + end + + self.next_ping_id = self.next_ping_id + 1 + local ok, err = self.transport:send( + '{"jsonrpc": "2.0","method": "ping","id":"ping:' .. self.next_ping_id .. '"}') + if not ok then + core.log.info("session ", self.session_id, + " exit, failed to send ping message: ", err) + self.need_exit = true + break + end + ngx_sleep(30) + end + end) + thread_wait(ping) + thread_kill(ping) +end + + +function _M.close(self) + if self.message_broker then + self.message_broker:close() + end +end + + +function _M.push_message(self, message) + local ok, err = self.message_broker:push(message) + if not ok then + return nil, "failed to push message to broker: " .. err + end + return true +end + + +return _M diff --git a/apisix/plugins/mcp/server_wrapper.lua b/apisix/plugins/mcp/server_wrapper.lua new file mode 100644 index 000000000000..5b0ed8831bd0 --- /dev/null +++ b/apisix/plugins/mcp/server_wrapper.lua @@ -0,0 +1,106 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local ngx = ngx +local ngx_exit = ngx.exit +local re_match = ngx.re.match +local core = require("apisix.core") +local mcp_server = require("apisix.plugins.mcp.server") + +local _M = {} + +local V241105_ENDPOINT_SSE = "sse" +local V241105_ENDPOINT_MESSAGE = "message" + + +local function sse_handler(conf, ctx, opts) + -- send SSE headers and first chunk + core.response.set_header("Content-Type", "text/event-stream") + core.response.set_header("Cache-Control", "no-cache") + + local server = opts.server + + -- send endpoint event to advertise the message endpoint + server.transport:send(conf.base_uri .. "/message?sessionId=" .. server.session_id, "endpoint") + + if opts.event_handler and opts.event_handler.on_client_message then + server:on(mcp_server.EVENT_CLIENT_MESSAGE, function(message, additional) + additional.server = server + opts.event_handler.on_client_message(message, additional) + end) + end + + if opts.event_handler and opts.event_handler.on_connect then + local code, body = opts.event_handler.on_connect({ server = server }) + if code then + return code, body + end + server:start() -- this is a sync call that only returns when the client disconnects + end + + if opts.event_handler.on_disconnect then + opts.event_handler.on_disconnect({ server = server }) + server:close() + end + + ngx_exit(0) -- exit current phase, skip the upstream module +end + + +local function message_handler(conf, ctx, opts) + local body = core.request.get_body(nil, ctx) + if not body then + return 400 + end + + local ok, err = opts.server:push_message(body) + if not ok then + core.log.error("failed to add task to queue: ", err) + return 500 + end + + return 202 +end + + +function _M.access(conf, ctx, opts) + local m, err = re_match(ctx.var.uri, "^" .. conf.base_uri .. "/(.*)", "jo") + if err then + core.log.info("failed to mcp base uri: ", err) + return core.response.exit(404) + end + local action = m and m[1] or false + if not action then + return core.response.exit(404) + end + + if action == V241105_ENDPOINT_SSE and core.request.get_method() == "GET" then + opts.server = mcp_server.new({}) + return sse_handler(conf, ctx, opts) + end + + if action == V241105_ENDPOINT_MESSAGE and core.request.get_method() == "POST" then + -- TODO: check ctx.var.arg_sessionId + -- recover server instead of create + opts.server = mcp_server.new({ session_id = ctx.var.arg_sessionId }) + return core.response.exit(message_handler(conf, ctx, opts)) + end + + return core.response.exit(404) +end + + +return _M diff --git a/apisix/plugins/mcp/session.lua b/apisix/plugins/mcp/session.lua deleted file mode 100644 index 4e751e86b240..000000000000 --- a/apisix/plugins/mcp/session.lua +++ /dev/null @@ -1,122 +0,0 @@ --- --- Licensed to the Apache Software Foundation (ASF) under one or more --- contributor license agreements. See the NOTICE file distributed with --- this work for additional information regarding copyright ownership. --- The ASF licenses this file to You under the Apache License, Version 2.0 --- (the "License"); you may not use this file except in compliance with --- the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. --- -local type = type -local rawget = rawget -local rawset = rawset -local setmetatable = setmetatable -local ngx = ngx -local shared_dict = ngx.shared["mcp-session"] -local core = require("apisix.core") - -local SESSION_LAST_ACTIVE_AT = "_last_active_at" -local SESSION_THRESHOLD_PING = 30000 --TODO allow customize -local SESSION_THRESHOLD_TIMEOUT = 60000 --TODO allow customize -local STORAGE_SUFFIX_LAST_ACTIVE_AT = ":last_active_at" -local STORAGE_SUFFIX_PING_ID = ":ping_id" -local STORAGE_SUFFIX_QUEUE = ":queue" - -local _M = {} -local mt = { - __index = function (table, key) - if key == SESSION_LAST_ACTIVE_AT then - return shared_dict:get(table.id .. STORAGE_SUFFIX_LAST_ACTIVE_AT) or 0 - end - return rawget(table, key) or _M[key] - end, - __newindex = function (table, key, value) - if key == SESSION_LAST_ACTIVE_AT then - shared_dict:set(table.id .. STORAGE_SUFFIX_LAST_ACTIVE_AT, value) - else - rawset(table, key, value) - end - end -} - -local function gen_session_id() - return core.id.gen_uuid_v4() -end - - -function _M.new() - local session = setmetatable({ - id = gen_session_id(), - }, mt) - shared_dict:set(session.id, core.json.encode(session)) - shared_dict:set(session.id .. STORAGE_SUFFIX_LAST_ACTIVE_AT, ngx.time()) - shared_dict:set(session.id .. STORAGE_SUFFIX_PING_ID, 0) - return session -end - --- for state machine -function _M.session_initialize(self, params) - self.protocol_version = params.protocolVersion - self.client_info = params.clientInfo - self.capabilities = params.capabilities - self.state = _M.STATE_INITIALIZED -end - - -function _M.session_need_ping(self) - return self[SESSION_LAST_ACTIVE_AT] + SESSION_THRESHOLD_PING / 1000 <= ngx.time() -end - - -function _M.session_timed_out(self) - return self[SESSION_LAST_ACTIVE_AT] + SESSION_THRESHOLD_TIMEOUT / 1000 <= ngx.time() -end - - -function _M.session_next_ping_id(self) - return shared_dict:incr(self.id .. STORAGE_SUFFIX_PING_ID, 1) -end - - -function _M.on_session_pong(self) - self[SESSION_LAST_ACTIVE_AT] = ngx.time() -end - - -function _M.push_message_queue(self, task) - return shared_dict:rpush(self.id .. STORAGE_SUFFIX_QUEUE, task) -end - - -function _M.pop_message_queue(self) - return shared_dict:lpop(self.id .. STORAGE_SUFFIX_QUEUE) -end - - -function _M.destroy(self) - shared_dict:delete(self.id) - shared_dict:delete(self.id .. STORAGE_SUFFIX_LAST_ACTIVE_AT) - shared_dict:delete(self.id .. STORAGE_SUFFIX_PING_ID) - shared_dict:delete(self.id .. STORAGE_SUFFIX_QUEUE) -end - - -function _M.recover(session_id) - local session, err = shared_dict:get(session_id) - if not session then - return nil, err - end - if type(session) ~= "string" then - return nil, "session data is invalid" - end - return setmetatable(core.json.decode(session), mt) -end - -return _M diff --git a/apisix/plugins/mcp/transport/sse.lua b/apisix/plugins/mcp/transport/sse.lua new file mode 100644 index 000000000000..83d72a189a83 --- /dev/null +++ b/apisix/plugins/mcp/transport/sse.lua @@ -0,0 +1,44 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local setmetatable = setmetatable +local type = type +local ngx = ngx +local ngx_print = ngx.print +local ngx_flush = ngx.flush +local core = require("apisix.core") + +local _M = {} +local mt = { __index = _M } + + +function _M.new() + return setmetatable({}, mt) +end + + +function _M.send(self, message, event_type) + local data = type(message) == "table" and core.json.encode(message) or message + local ok, err = ngx_print("event: " .. (event_type or "message") .. + "\ndata: " .. data .. "\n\n") + if not ok then + return ok, "failed to write buffer: " .. err + end + return ngx_flush(true) +end + + +return _M