From 4219b480aa4bc3f88b6fc615338641a74d5ca440 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Tue, 18 Feb 2025 00:20:47 +0800 Subject: [PATCH 1/2] fix(server): fix deadlock caused by session close --- server/session.go | 9 ++------- server/session_manager.go | 7 ++----- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/server/session.go b/server/session.go index 2b66f61b..226b707a 100644 --- a/server/session.go +++ b/server/session.go @@ -76,6 +76,8 @@ func startSession(sessionId SessionId, sessionMetadata *proto.SessionMetadata, s } func (s *session) closeChannels() { + s.Lock() + defer s.Unlock() s.cancel() if s.heartbeatCh != nil { close(s.heartbeatCh) @@ -84,11 +86,6 @@ func (s *session) closeChannels() { s.log.Debug("Session channels closed") } -func (s *session) close() error { - s.log.Info("Session closing") - return s.delete() -} - func (s *session) delete() error { // Delete ephemeral data associated with this session sessionKey := SessionKey(s.id) @@ -171,7 +168,6 @@ func (s *session) waitForHeartbeats() { case <-timeoutCh: s.log.Warn("Session expired") - s.Lock() s.closeChannels() err := s.delete() @@ -181,7 +177,6 @@ func (s *session) waitForHeartbeats() { slog.Any("error", err), ) } - s.Unlock() s.sm.Lock() s.sm.sessions.Remove(s.id) diff --git a/server/session_manager.go b/server/session_manager.go index 8d9bd134..717bac62 100644 --- a/server/session_manager.go +++ b/server/session_manager.go @@ -202,10 +202,9 @@ func (sm *sessionManager) CloseSession(request *proto.CloseSessionRequest) (*pro } sm.sessions.Remove(s.id) sm.Unlock() - s.Lock() - defer s.Unlock() s.closeChannels() - err = s.close() + s.log.Info("Session closing") + err = s.delete() if err != nil { return nil, err } @@ -295,9 +294,7 @@ func (sm *sessionManager) Close() error { sm.cancel() for _, s := range sm.sessions.Values() { sm.sessions.Remove(s.id) - s.Lock() s.closeChannels() - s.Unlock() } sm.activeSessions.Unregister() From 695f1ebd74dce5fb44ff12d28cd52307bbedab40 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Tue, 18 Feb 2025 00:36:42 +0800 Subject: [PATCH 2/2] use interface --- server/session.go | 6 ++++-- server/session_manager.go | 5 +++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/server/session.go b/server/session.go index 226b707a..e3a4dfc4 100644 --- a/server/session.go +++ b/server/session.go @@ -17,6 +17,7 @@ package server import ( "context" "fmt" + "io" "log/slog" "net/url" "sync" @@ -30,6 +31,7 @@ import ( // --- Session type session struct { + io.Closer sync.Mutex id SessionId clientIdentity string @@ -75,7 +77,7 @@ func startSession(sessionId SessionId, sessionMetadata *proto.SessionMetadata, s return s } -func (s *session) closeChannels() { +func (s *session) Close() { s.Lock() defer s.Unlock() s.cancel() @@ -168,7 +170,7 @@ func (s *session) waitForHeartbeats() { case <-timeoutCh: s.log.Warn("Session expired") - s.closeChannels() + s.Close() err := s.delete() if err != nil { diff --git a/server/session_manager.go b/server/session_manager.go index 717bac62..648c9210 100644 --- a/server/session_manager.go +++ b/server/session_manager.go @@ -202,8 +202,9 @@ func (sm *sessionManager) CloseSession(request *proto.CloseSessionRequest) (*pro } sm.sessions.Remove(s.id) sm.Unlock() - s.closeChannels() + s.log.Info("Session closing") + s.Close() err = s.delete() if err != nil { return nil, err @@ -294,7 +295,7 @@ func (sm *sessionManager) Close() error { sm.cancel() for _, s := range sm.sessions.Values() { sm.sessions.Remove(s.id) - s.closeChannels() + s.Close() } sm.activeSessions.Unregister()