From 13b3d398a68d5924e9d5365a302b16c2804a1124 Mon Sep 17 00:00:00 2001
From: Daijiro Fukuda <fukuda@clear-code.com>
Date: Mon, 21 Oct 2024 12:22:29 +0900
Subject: [PATCH] socket_manager: add feature to take over another server

Another process can take over UDP/TCP sockets without downtime.

    server = ServerEngine::SocketManager::Server.take_over_another_server(path)

This starts a new server that has all UDP/TCP sockets of the
existing server.
The old process should stop without removing the file for the
socket after the new process starts.

This may not be the primary use case assumed by ServerEngine, but
we need this feature to replace both the server and the workers
with a new process without downtime.
Currently, ServerEngine does not provide this feature for
network servers.

At the moment, I assume that the application side uses this
feature ad hoc, but, in the future, this could be used to support
live reload for entire network servers.

ref: https://github.com/fluent/fluentd/issues/4622

Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
Co-authored-by: Shizuo Fujita <fujita@clear-code.com>
---
 README.md                               |   8 +-
 lib/serverengine/socket_manager.rb      |  16 ++-
 lib/serverengine/socket_manager_unix.rb | 129 +++++++++++++-------
 lib/serverengine/socket_manager_win.rb  |  40 +++----
 spec/socket_manager_spec.rb             | 151 ++++++++++++++++++++++++
 5 files changed, 279 insertions(+), 65 deletions(-)

diff --git a/README.md b/README.md
index 3edf48b..6031c42 100644
--- a/README.md
+++ b/README.md
@@ -413,8 +413,14 @@ se = ServerEngine.create(MyServer, MyWorker, {
 se.run
 ```
 
-See also [examples](https://github.com/fluent/serverengine/tree/master/examples).
+Other features:
+
+- `socket_manager_server = SocketManager::Server.take_over_another_server(path)`
+  - It starts a new manager server that has all UDP/TCP sockets of the existing manager.
+  - It means that another process can take over UDP/TCP sockets without downtime.
+  - The old process should stop without removing the file for the socket after the new process starts.
 
+See also [examples](https://github.com/fluent/serverengine/tree/master/examples).
 
 ## Module API
 
diff --git a/lib/serverengine/socket_manager.rb b/lib/serverengine/socket_manager.rb
index fe0e484..83d5f42 100644
--- a/lib/serverengine/socket_manager.rb
+++ b/lib/serverengine/socket_manager.rb
@@ -96,14 +96,22 @@ def self.open(path = nil)
         end
       end
 
-      def initialize(path)
+      def self.take_over_another_server(path)
+        raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows?
+        server = new(path, start: false)
+        server.take_over_another_server
+        server
+      end
+
+      def initialize(path, start: true)
         @tcp_sockets = {}
         @udp_sockets = {}
         @mutex = Mutex.new
-        @path = start_server(path)
+        @path = start ? start_server(path) : path
       end
 
       attr_reader :path
+      attr_reader :tcp_sockets, :udp_sockets # for tests
 
       def new_client
         Client.new(@path)
@@ -159,9 +167,9 @@ def process_peer(peer)
           res = SocketManager.recv_peer(peer)
           return if res.nil?
 
-          pid, method, bind, port = *res
+          pid, method, *opts = res
           begin
-            send_socket(peer, pid, method, bind, port)
+            send_socket(peer, pid, method, *opts)
           rescue => e
             SocketManager.send_peer(peer, e)
           end
diff --git a/lib/serverengine/socket_manager_unix.rb b/lib/serverengine/socket_manager_unix.rb
index 625a831..addfb70 100644
--- a/lib/serverengine/socket_manager_unix.rb
+++ b/lib/serverengine/socket_manager_unix.rb
@@ -47,6 +47,67 @@ def recv_udp(family, peer, sent)
     end
 
     module ServerModule
+      def start_server(path)
+        unless @server
+          # return absolute path so that client can connect to this path
+          # when client changed working directory
+          path = File.expand_path(path)
+
+          begin
+            old_umask = File.umask(0077) # Protect unix socket from other users
+            @server = UNIXServer.new(path)
+          ensure
+            File.umask(old_umask)
+          end
+        end
+
+        @thread = Thread.new do
+          begin
+            while peer = @server.accept
+              Thread.new(peer, &method(:process_peer))  # process_peer calls send_socket
+            end
+          rescue => e
+            unless @server.closed?
+              ServerEngine.dump_uncaught_error(e)
+            end
+          end
+        end
+
+        return path
+      end
+
+      def take_over_another_server
+        another_server = UNIXSocket.new(@path)
+        begin
+          idx = 0
+          while true
+            SocketManager.send_peer(another_server, [Process.pid, :get_listening_tcp, idx])
+            key = SocketManager.recv_peer(another_server)
+            break if key.nil?
+            @tcp_sockets[key] = another_server.recv_io TCPServer
+            idx += 1
+          end
+
+          idx = 0
+          while true
+            SocketManager.send_peer(another_server, [Process.pid, :get_listening_udp, idx])
+            key = SocketManager.recv_peer(another_server)
+            break if key.nil?
+            @udp_sockets[key] = another_server.recv_io UDPSocket
+            idx += 1
+          end
+
+          SocketManager.send_peer(another_server, [Process.pid, :get_unix])
+          res = SocketManager.recv_peer(another_server)
+          raise res if res.is_a?(Exception)
+          @server = another_server.recv_io UNIXServer
+
+          start_server(@path)
+        ensure
+          another_server.close
+        end
+      end
+
       private
 
       def listen_tcp_new(bind_ip, port)
@@ -76,33 +137,6 @@ def listen_udp_new(bind_ip, port)
         UDPSocket.for_fd(usock.fileno)
       end
 
-      def start_server(path)
-        # return absolute path so that client can connect to this path
-        # when client changed working directory
-        path = File.expand_path(path)
-
-        begin
-          old_umask = File.umask(0077) # Protect unix socket from other users
-          @server = UNIXServer.new(path)
-        ensure
-          File.umask(old_umask)
-        end
-
-        @thread = Thread.new do
-          begin
-            while peer = @server.accept
-              Thread.new(peer, &method(:process_peer))  # process_peer calls send_socket
-            end
-          rescue => e
-            unless @server.closed?
-              ServerEngine.dump_uncaught_error(e)
-            end
-          end
-        end
-
-        return path
-      end
-
       def stop_server
         @tcp_sockets.reject! {|key,lsock| lsock.close; true }
         @udp_sockets.reject! {|key,usock| usock.close; true }
@@ -111,19 +145,34 @@ def stop_server
         @thread.join if RUBY_VERSION >= "2.2"
       end
 
-      def send_socket(peer, pid, method, bind, port)
-        sock = case method
-               when :listen_tcp
-                 listen_tcp(bind, port)
-               when :listen_udp
-                 listen_udp(bind, port)
-               else
-                 raise ArgumentError, "Unknown method: #{method.inspect}"
-               end
-
-        SocketManager.send_peer(peer, nil)
-
-        peer.send_io sock
+      def send_socket(peer, pid, method, *opts)
+        case method
+        when :listen_tcp
+          bind, port = opts
+          sock = listen_tcp(bind, port)
+          SocketManager.send_peer(peer, nil)
+          peer.send_io sock
+        when :listen_udp
+          bind, port = opts
+          sock = listen_udp(bind, port)
+          SocketManager.send_peer(peer, nil)
+          peer.send_io sock
+        when :get_listening_tcp
+          idx, = opts
+          key = @tcp_sockets.keys[idx]
+          SocketManager.send_peer(peer, key)
+          peer.send_io(@tcp_sockets.values[idx]) if key
+        when :get_listening_udp
+          idx, = opts
+          key = @udp_sockets.keys[idx]
+          SocketManager.send_peer(peer, key)
+          peer.send_io(@udp_sockets.values[idx]) if key
+        when :get_unix
+          SocketManager.send_peer(peer, nil)
+          peer.send_io @server
+        else
+          raise ArgumentError, "Unknown method: #{method.inspect}"
+        end
       end
     end
 
diff --git a/lib/serverengine/socket_manager_win.rb b/lib/serverengine/socket_manager_win.rb
index f7a7e26..42acaa6 100644
--- a/lib/serverengine/socket_manager_win.rb
+++ b/lib/serverengine/socket_manager_win.rb
@@ -58,6 +58,26 @@ def recv_udp(family, peer, sent)
     end
 
     module ServerModule
+      def start_server(addr)
+        # We need to take care about selecting an available port.
+        # By passing `nil` or `0` as `addr`, an available port is automatically selected.
+        # However, we should consider using NamedPipe instead of TCPServer.
+        @server = TCPServer.new("127.0.0.1", addr)
+        @thread = Thread.new do
+          begin
+            while peer = @server.accept
+              Thread.new(peer, &method(:process_peer))  # process_peer calls send_socket
+            end
+          rescue => e
+            unless @server.closed?
+              ServerEngine.dump_uncaught_error(e)
+            end
+          end
+        end
+
+        return @server.addr[1]
+      end
+
       private
 
       TCP_OPTIONS = [Socket::SOCK_STREAM, Socket::IPPROTO_TCP, TCPServer, true]
@@ -107,26 +127,6 @@ def htons(h)
         [h].pack("S").unpack("n")[0]
       end
 
-      def start_server(addr)
-        # We need to take care about selecting an available port.
-        # By passing `nil` or `0` as `addr`, an available port is automatically selected.
-        # However, we should consider using NamedPipe instead of TCPServer.
-        @server = TCPServer.new("127.0.0.1", addr)
-        @thread = Thread.new do
-          begin
-            while peer = @server.accept
-              Thread.new(peer, &method(:process_peer))  # process_peer calls send_socket
-            end
-          rescue => e
-            unless @server.closed?
-              ServerEngine.dump_uncaught_error(e)
-            end
-          end
-        end
-
-        return @server.addr[1]
-      end
-
       def stop_server
         @tcp_sockets.reject! {|key,lsock| lsock.close; true }
         @udp_sockets.reject! {|key,usock| usock.close; true }
diff --git a/spec/socket_manager_spec.rb b/spec/socket_manager_spec.rb
index c74e877..d8f04df 100644
--- a/spec/socket_manager_spec.rb
+++ b/spec/socket_manager_spec.rb
@@ -55,6 +55,15 @@
         expect(server.path).to be_between(49152, 65535)
       end
     end
+
+    context 'Server.take_over_another_server' do
+      it 'not supported' do
+        server = SocketManager::Server.open(server_path)
+        expect { SocketManager::Server.take_over_another_server(server_path) }.to raise_error(NotImplementedError)
+      ensure
+        server.close
+      end
+    end
   else
     context 'Server.generate_path' do
       it 'returns socket path under /tmp' do
@@ -76,6 +85,148 @@
         expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_')
       end
     end
+
+    context 'Server.take_over_another_server' do
+      it 'takes over listen sockets to another server' do
+        server = SocketManager::Server.open(server_path)
+
+        client = SocketManager::Client.new(server_path)
+        tcp1 = client.listen_tcp('127.0.0.1', 55551)
+        udp1 = client.listen_udp('127.0.0.1', 55561)
+        udp2 = client.listen_udp('127.0.0.1', 55562)
+
+        another_server = SocketManager::Server.take_over_another_server(server_path)
+
+        expect(another_server.tcp_sockets.size).to eq(1)
+        expect(another_server.tcp_sockets['localhost:55551'].addr).to eq(['AF_INET', 55551, '127.0.0.1', '127.0.0.1'])
+
+        expect(another_server.udp_sockets.size).to eq(2)
+        expect(another_server.udp_sockets['localhost:55561'].addr).to eq(['AF_INET', 55561, '127.0.0.1', '127.0.0.1'])
+        expect(another_server.udp_sockets['localhost:55562'].addr).to eq(['AF_INET', 55562, '127.0.0.1', '127.0.0.1'])
+      ensure
+        tcp1&.close
+        udp1&.close
+        udp2&.close
+        server&.close
+        another_server&.close
+      end
+
+      it 'takes over TCP sockets without downtime' do
+        manager_server = SocketManager::Server.open(server_path)
+        manager_client = SocketManager::Client.new(server_path)
+
+        has_server_started = false
+        thread_server = Thread.new do
+          server = manager_client.listen_tcp('127.0.0.1', test_port)
+          has_server_started = true
+          while socket = server.accept
+            incr_test_state(:count)
+            socket.close
+          end
+        ensure
+          server&.close
+        end
+
+        sleep 0.1 until has_server_started
+
+        thread_client = Thread.new do
+          100.times do |i|
+            socket = TCPSocket.new('127.0.0.1', test_port)
+            begin
+              socket.write("Hello #{i}\n")
+            ensure
+              socket.close
+            end
+            sleep 0.01
+          end
+        end
+
+        sleep 0.5
+
+        thread_new_server = Thread.new do
+          new_manager_server = SocketManager::Server.take_over_another_server(server_path)
+          server = manager_client.listen_tcp('127.0.0.1', test_port)
+          while socket = server.accept
+            incr_test_state(:count)
+            socket.close
+          end
+        ensure
+          new_manager_server&.close
+          server&.close
+        end
+
+        thread_server.kill
+        thread_server.join
+
+        thread_client.join
+        wait_for_stop
+
+        expect(test_state(:count)).to eq(100)
+      ensure
+        manager_server&.close
+        thread_server&.kill
+        thread_new_server&.kill
+        thread_server&.join
+        thread_new_server&.join
+      end
+
+      it 'takes over UDP sockets without downtime' do
+        manager_server = SocketManager::Server.open(server_path)
+        manager_client = SocketManager::Client.new(server_path)
+
+        has_server_started = false
+        thread_server = Thread.new do
+          server = manager_client.listen_udp('127.0.0.1', test_port)
+          has_server_started = true
+          while server.recv(10)
+            incr_test_state(:count)
+          end
+        ensure
+          server&.close
+        end
+
+        sleep 0.1 until has_server_started
+
+        thread_client = Thread.new do
+          100.times do |i|
+            socket = UDPSocket.new
+            begin
+              socket.send("Hello #{i}\n", 0, "127.0.0.1", test_port)
+            ensure
+              socket.close
+            end
+            sleep 0.01
+          end
+        end
+
+        sleep 0.5
+
+        thread_new_server = Thread.new do
+          new_manager_server = SocketManager::Server.take_over_another_server(server_path)
+          server = manager_client.listen_udp('127.0.0.1', test_port)
+          while server.recv(10)
+            incr_test_state(:count)
+          end
+        ensure
+          new_manager_server&.close
+          server&.close
+        end
+
+        thread_server.kill
+        thread_server.join
+
+        thread_client.join
+        wait_for_stop
+
+        expect(test_state(:count)).to eq(100)
+      ensure
+        manager_server&.close
+        thread_server&.kill
+        thread_new_server&.kill
+        thread_server&.join
+        thread_new_server&.join
+      end
+    end
   end
 
   context 'with thread' do