From 14a99a8693ec4265ce5d6cf54986edb63cd59c9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 29 Jan 2025 13:25:03 +0100 Subject: [PATCH 01/13] Remove the on reconnect callback from client layer. This event will be managed by guard. --- relay/client/client.go | 18 ---------- relay/client/guard.go | 79 +++++++++++++++++++++++++++++------------ relay/client/manager.go | 10 +++++- 3 files changed, 65 insertions(+), 42 deletions(-) diff --git a/relay/client/client.go b/relay/client/client.go index 3c23b70d27d..9e7e54393d4 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -141,7 +141,6 @@ type Client struct { muInstanceURL sync.Mutex onDisconnectListener func(string) - onConnectedListener func() listenerMutex sync.Mutex } @@ -190,7 +189,6 @@ func (c *Client) Connect() error { c.wgReadLoop.Add(1) go c.readLoop(c.relayConn) - go c.notifyConnected() return nil } @@ -238,12 +236,6 @@ func (c *Client) SetOnDisconnectListener(fn func(string)) { c.onDisconnectListener = fn } -func (c *Client) SetOnConnectedListener(fn func()) { - c.listenerMutex.Lock() - defer c.listenerMutex.Unlock() - c.onConnectedListener = fn -} - // HasConns returns true if there are connections. func (c *Client) HasConns() bool { c.mu.Lock() @@ -559,16 +551,6 @@ func (c *Client) notifyDisconnected() { go c.onDisconnectListener(c.connectionURL) } -func (c *Client) notifyConnected() { - c.listenerMutex.Lock() - defer c.listenerMutex.Unlock() - - if c.onConnectedListener == nil { - return - } - go c.onConnectedListener() -} - func (c *Client) writeCloseMsg() { msg := messages.MarshalCloseMsg() _, err := c.relayConn.Write(msg) diff --git a/relay/client/guard.go b/relay/client/guard.go index b971363a878..457c5d3ae76 100644 --- a/relay/client/guard.go +++ b/relay/client/guard.go @@ -14,8 +14,9 @@ var ( // Guard manage the reconnection tries to the Relay server in case of disconnection event. type Guard struct { - // OnNewRelayClient is a channel that is used to notify the relay client about a new relay client instance. + // OnNewRelayClient is a channel that is used to notify the relay manager about a new relay client instance. OnNewRelayClient chan *Client + OnReconnected chan struct{} serverPicker *ServerPicker } @@ -23,6 +24,7 @@ type Guard struct { func NewGuard(sp *ServerPicker) *Guard { g := &Guard{ OnNewRelayClient: make(chan *Client, 1), + OnReconnected: make(chan struct{}, 1), serverPicker: sp, } return g @@ -39,14 +41,13 @@ func NewGuard(sp *ServerPicker) *Guard { // - relayClient: The relay client instance that was disconnected. // todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) { - if relayClient == nil { - goto RETRY - } - if g.isServerURLStillValid(relayClient) && g.quickReconnect(ctx, relayClient) { + // try to reconnect to the same server + if ok := g.tryToQuickReconnect(ctx, relayClient); ok { + g.notifyReconnected() return } -RETRY: + // start a ticker to pick a new server ticker := exponentTicker(ctx) defer ticker.Stop() @@ -64,28 +65,19 @@ RETRY: } } -func (g *Guard) retry(ctx context.Context) error { - log.Infof("try to pick up a new Relay server") - relayClient, err := g.serverPicker.PickServer(ctx) - if err != nil { - return err +func (g *Guard) tryToQuickReconnect(parentCtx context.Context, rc *Client) bool { + if rc == nil { + return false } - // prevent to work with a deprecated Relay client instance - g.drainRelayClientChan() - - g.OnNewRelayClient <- relayClient - return nil -} - -func (g *Guard) quickReconnect(parentCtx context.Context, rc *Client) bool { - ctx, cancel := context.WithTimeout(parentCtx, 1500*time.Millisecond) - defer cancel() - <-ctx.Done() + if !g.isServerURLStillValid(rc) { + return false + } - if parentCtx.Err() != nil { + if cancelled := waiteBeforeRetry(parentCtx); !cancelled { return false } + log.Infof("try to reconnect to Relay server: %s", rc.connectionURL) if err := rc.Connect(); err != nil { @@ -95,6 +87,20 @@ func (g *Guard) quickReconnect(parentCtx context.Context, rc *Client) bool { return true } +func (g *Guard) retry(ctx context.Context) error { + log.Infof("try to pick up a new Relay server") + relayClient, err := g.serverPicker.PickServer(ctx) + if err != nil { + return err + } + + // prevent to work with a deprecated Relay client instance + g.drainRelayClientChan() + + g.OnNewRelayClient <- relayClient + return nil +} + func (g *Guard) drainRelayClientChan() { select { case <-g.OnNewRelayClient: @@ -111,6 +117,21 @@ func (g *Guard) isServerURLStillValid(rc *Client) bool { return false } +func (g *Guard) notifyReconnected() { + select { + case g.OnReconnected <- struct{}{}: + default: + } +} + +func (g *Guard) isReadyToQuickReconnect(relayClient *Client) bool { + if relayClient == nil { + return false + } + + return g.isServerURLStillValid(relayClient) +} + func exponentTicker(ctx context.Context) *backoff.Ticker { bo := backoff.WithContext(&backoff.ExponentialBackOff{ InitialInterval: 2 * time.Second, @@ -121,3 +142,15 @@ func exponentTicker(ctx context.Context) *backoff.Ticker { return backoff.NewTicker(bo) } + +func waiteBeforeRetry(ctx context.Context) bool { + timer := time.NewTimer(1500 * time.Millisecond) + defer timer.Stop() + + select { + case <-timer.C: + return true + case <-ctx.Done(): + return false + } +} diff --git a/relay/client/manager.go b/relay/client/manager.go index d847bb879f1..26b11305058 100644 --- a/relay/client/manager.go +++ b/relay/client/manager.go @@ -165,6 +165,9 @@ func (m *Manager) Ready() bool { } func (m *Manager) SetOnReconnectedListener(f func()) { + m.listenerLock.Lock() + defer m.listenerLock.Unlock() + m.onReconnectedListenerFn = f } @@ -284,6 +287,9 @@ func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) { } func (m *Manager) onServerConnected() { + m.listenerLock.Lock() + defer m.listenerLock.Unlock() + if m.onReconnectedListenerFn == nil { return } @@ -304,8 +310,11 @@ func (m *Manager) onServerDisconnected(serverAddress string) { func (m *Manager) listenGuardEvent(ctx context.Context) { for { select { + case <-m.reconnectGuard.OnReconnected: + m.onServerConnected() case rc := <-m.reconnectGuard.OnNewRelayClient: m.storeClient(rc) + m.onServerConnected() case <-ctx.Done(): return } @@ -317,7 +326,6 @@ func (m *Manager) storeClient(client *Client) { defer m.relayClientMu.Unlock() m.relayClient = client - m.relayClient.SetOnConnectedListener(m.onServerConnected) m.relayClient.SetOnDisconnectListener(m.onServerDisconnected) } From 5d403a79ba285d09ee4c132daf6c8c73e885f78e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 29 Jan 2025 14:54:24 +0100 Subject: [PATCH 02/13] Remove unused function --- relay/client/guard.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/relay/client/guard.go b/relay/client/guard.go index 457c5d3ae76..554330ea318 100644 --- a/relay/client/guard.go +++ b/relay/client/guard.go @@ -124,14 +124,6 @@ func (g *Guard) notifyReconnected() { } } -func (g *Guard) isReadyToQuickReconnect(relayClient *Client) bool { - if relayClient == nil { - return false - } - - return g.isServerURLStillValid(relayClient) -} - func exponentTicker(ctx context.Context) *backoff.Ticker { bo := backoff.WithContext(&backoff.ExponentialBackOff{ InitialInterval: 2 * time.Second, From 759544f2c37569413aa2e3a349e03356bae7cf6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Mon, 27 Jan 2025 19:02:04 +0100 Subject: [PATCH 03/13] Move wg watcher code to separated file and add tests --- client/internal/peer/wg_watcher.go | 123 ++++++++++++++++++++++++ client/internal/peer/wg_watcher_test.go | 98 +++++++++++++++++++ client/internal/peer/worker_relay.go | 108 +++------------------ 3 files changed, 235 insertions(+), 94 deletions(-) create mode 100644 client/internal/peer/wg_watcher.go create mode 100644 client/internal/peer/wg_watcher_test.go diff --git a/client/internal/peer/wg_watcher.go b/client/internal/peer/wg_watcher.go new file mode 100644 index 00000000000..a05f4f83ab6 --- /dev/null +++ b/client/internal/peer/wg_watcher.go @@ -0,0 +1,123 @@ +package peer + +import ( + "context" + "sync" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/iface/configurer" +) + +const ( + wgHandshakePeriod = 3 * time.Minute +) + +var ( + wgHandshakeOvertime = 30 * time.Second + wgReadErrorRetry = 5 * time.Second + checkPeriod = wgHandshakePeriod + wgHandshakeOvertime +) + +type WGInterfaceStater interface { + GetStats(key string) (configurer.WGStats, error) +} + +type WGWatcher struct { + log *log.Entry + wgIfaceStater WGInterfaceStater + peerKey string + + ctx context.Context + ctxCancel context.CancelFunc + ctxLock sync.Mutex +} + +func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string) *WGWatcher { + return &WGWatcher{ + log: log, + wgIfaceStater: wgIfaceStater, + peerKey: peerKey, + } +} + +func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn func()) { + w.log.Debugf("enable WireGuard watcher") + w.ctxLock.Lock() + defer w.ctxLock.Unlock() + + if w.ctx != nil && w.ctx.Err() == nil { + return + } + + ctx, ctxCancel := context.WithCancel(parentCtx) + w.ctx = ctx + w.ctxCancel = ctxCancel + + w.wgStateCheck(ctx, ctxCancel, onDisconnectedFn) +} + +func (w *WGWatcher) DisableWgWatcher() { + w.ctxLock.Lock() + defer w.ctxLock.Unlock() + + if w.ctxCancel == nil { + return + } + + w.log.Debugf("disable WireGuard watcher") + + w.ctxCancel() +} + +// wgStateCheck help to check the state of the WireGuard handshake and relay connection +func (w *WGWatcher) wgStateCheck(ctx context.Context, ctxCancel context.CancelFunc, onDisconnectedFn func()) { + w.log.Debugf("WireGuard watcher started") + lastHandshake, err := w.wgState() + if err != nil { + w.log.Warnf("failed to read wg stats: %v", err) + lastHandshake = time.Time{} + } + + go func(lastHandshake time.Time) { + timer := time.NewTimer(wgHandshakeOvertime) + defer timer.Stop() + defer ctxCancel() + + for { + select { + case <-timer.C: + handshake, err := w.wgState() + if err != nil { + w.log.Errorf("failed to read wg stats: %v", err) + timer.Reset(wgReadErrorRetry) + continue + } + + w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, handshake) + + if handshake.Equal(lastHandshake) { + w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake) + onDisconnectedFn() + return + } + + resetTime := time.Until(handshake.Add(checkPeriod)) + lastHandshake = handshake + timer.Reset(resetTime) + case <-ctx.Done(): + w.log.Debugf("WireGuard watcher stopped") + return + } + } + }(lastHandshake) +} + +func (w *WGWatcher) wgState() (time.Time, error) { + wgState, err := w.wgIfaceStater.GetStats(w.peerKey) + if err != nil { + return time.Time{}, err + } + return wgState.LastHandshake, nil +} diff --git a/client/internal/peer/wg_watcher_test.go b/client/internal/peer/wg_watcher_test.go new file mode 100644 index 00000000000..a5b9026adb1 --- /dev/null +++ b/client/internal/peer/wg_watcher_test.go @@ -0,0 +1,98 @@ +package peer + +import ( + "context" + "testing" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/iface/configurer" +) + +type MocWgIface struct { + initial bool + lastHandshake time.Time + stop bool +} + +func (m *MocWgIface) GetStats(key string) (configurer.WGStats, error) { + if !m.initial { + m.initial = true + return configurer.WGStats{}, nil + } + + if !m.stop { + m.lastHandshake = time.Now() + } + + stats := configurer.WGStats{ + LastHandshake: m.lastHandshake, + } + + return stats, nil +} + +func (m *MocWgIface) disconnect() { + m.stop = true +} + +func TestWGWatcher_EnableWgWatcher(t *testing.T) { + checkPeriod = 5 * time.Second + wgHandshakeOvertime = 1 * time.Second + + mlog := log.WithField("peer", "tet") + mocWgIface := &MocWgIface{} + watcher := NewWGWatcher(mlog, mocWgIface, "") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + onDisconnected := make(chan struct{}, 1) + watcher.EnableWgWatcher(ctx, func() { + mlog.Infof("onDisconnectedFn") + onDisconnected <- struct{}{} + }) + + // wait for initial reading + time.Sleep(2 * time.Second) + mocWgIface.disconnect() + + select { + case <-onDisconnected: + case <-time.After(10 * time.Second): + t.Errorf("timeout") + } + watcher.DisableWgWatcher() +} + +func TestWGWatcher_ReEnable(t *testing.T) { + checkPeriod = 5 * time.Second + wgHandshakeOvertime = 1 * time.Second + + mlog := log.WithField("peer", "tet") + mocWgIface := &MocWgIface{} + watcher := NewWGWatcher(mlog, mocWgIface, "") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + onDisconnected := make(chan struct{}, 1) + + watcher.EnableWgWatcher(ctx, func() {}) + watcher.DisableWgWatcher() + + watcher.EnableWgWatcher(ctx, func() { + onDisconnected <- struct{}{} + }) + + time.Sleep(2 * time.Second) + mocWgIface.disconnect() + + select { + case <-onDisconnected: + case <-time.After(10 * time.Second): + t.Errorf("timeout") + } + watcher.DisableWgWatcher() +} diff --git a/client/internal/peer/worker_relay.go b/client/internal/peer/worker_relay.go index c22dcdeda5d..d2df8cd360e 100644 --- a/client/internal/peer/worker_relay.go +++ b/client/internal/peer/worker_relay.go @@ -6,18 +6,12 @@ import ( "net" "sync" "sync/atomic" - "time" log "github.com/sirupsen/logrus" relayClient "github.com/netbirdio/netbird/relay/client" ) -var ( - wgHandshakePeriod = 3 * time.Minute - wgHandshakeOvertime = 30 * time.Second -) - type RelayConnInfo struct { relayedConn net.Conn rosenpassPubKey []byte @@ -36,13 +30,12 @@ type WorkerRelay struct { relayManager relayClient.ManagerService callBacks WorkerRelayCallbacks - relayedConn net.Conn - relayLock sync.Mutex - ctxWgWatch context.Context - ctxCancelWgWatch context.CancelFunc - ctxLock sync.Mutex + relayedConn net.Conn + relayLock sync.Mutex relaySupportedOnRemotePeer atomic.Bool + + wgWatcher *WGWatcher } func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay { @@ -52,6 +45,7 @@ func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager r config: config, relayManager: relayManager, callBacks: callbacks, + wgWatcher: NewWGWatcher(log, config.WgConfig.WgInterface, config.Key), } return r } @@ -103,32 +97,11 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { } func (w *WorkerRelay) EnableWgWatcher(ctx context.Context) { - w.log.Debugf("enable WireGuard watcher") - w.ctxLock.Lock() - defer w.ctxLock.Unlock() - - if w.ctxWgWatch != nil && w.ctxWgWatch.Err() == nil { - return - } - - ctx, ctxCancel := context.WithCancel(ctx) - w.ctxWgWatch = ctx - w.ctxCancelWgWatch = ctxCancel - - w.wgStateCheck(ctx, ctxCancel) + w.wgWatcher.EnableWgWatcher(ctx, w.disconnected) } func (w *WorkerRelay) DisableWgWatcher() { - w.ctxLock.Lock() - defer w.ctxLock.Unlock() - - if w.ctxCancelWgWatch == nil { - return - } - - w.log.Debugf("disable WireGuard watcher") - - w.ctxCancelWgWatch() + w.wgWatcher.DisableWgWatcher() } func (w *WorkerRelay) RelayInstanceAddress() (string, error) { @@ -150,57 +123,17 @@ func (w *WorkerRelay) CloseConn() { return } - err := w.relayedConn.Close() - if err != nil { + if err := w.relayedConn.Close(); err != nil { w.log.Warnf("failed to close relay connection: %v", err) } } -// wgStateCheck help to check the state of the WireGuard handshake and relay connection -func (w *WorkerRelay) wgStateCheck(ctx context.Context, ctxCancel context.CancelFunc) { - w.log.Debugf("WireGuard watcher started") - lastHandshake, err := w.wgState() - if err != nil { - w.log.Warnf("failed to read wg stats: %v", err) - lastHandshake = time.Time{} - } - - go func(lastHandshake time.Time) { - timer := time.NewTimer(wgHandshakeOvertime) - defer timer.Stop() - defer ctxCancel() - - for { - select { - case <-timer.C: - handshake, err := w.wgState() - if err != nil { - w.log.Errorf("failed to read wg stats: %v", err) - timer.Reset(wgHandshakeOvertime) - continue - } - - w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, handshake) - - if handshake.Equal(lastHandshake) { - w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake) - w.relayLock.Lock() - _ = w.relayedConn.Close() - w.relayLock.Unlock() - w.callBacks.OnDisconnected() - return - } - - resetTime := time.Until(handshake.Add(wgHandshakePeriod + wgHandshakeOvertime)) - lastHandshake = handshake - timer.Reset(resetTime) - case <-ctx.Done(): - w.log.Debugf("WireGuard watcher stopped") - return - } - } - }(lastHandshake) +func (w *WorkerRelay) disconnected() { + w.relayLock.Lock() + _ = w.relayedConn.Close() + w.relayLock.Unlock() + w.callBacks.OnDisconnected() } func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool { @@ -217,20 +150,7 @@ func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress st return remoteRelayAddress } -func (w *WorkerRelay) wgState() (time.Time, error) { - wgState, err := w.config.WgConfig.WgInterface.GetStats(w.config.Key) - if err != nil { - return time.Time{}, err - } - return wgState.LastHandshake, nil -} - func (w *WorkerRelay) onRelayMGDisconnected() { - w.ctxLock.Lock() - defer w.ctxLock.Unlock() - - if w.ctxCancelWgWatch != nil { - w.ctxCancelWgWatch() - } + w.wgWatcher.DisableWgWatcher() go w.callBacks.OnDisconnected() } From ef5e417cb7d9346d00fbd36a885f53dca68a2a61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 30 Jan 2025 13:37:00 +0100 Subject: [PATCH 04/13] If can not read WireGuard state then trigger reconnection --- client/internal/peer/wg_watcher.go | 83 ++++++++++++++++-------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/client/internal/peer/wg_watcher.go b/client/internal/peer/wg_watcher.go index a05f4f83ab6..a43301e73bd 100644 --- a/client/internal/peer/wg_watcher.go +++ b/client/internal/peer/wg_watcher.go @@ -16,7 +16,6 @@ const ( var ( wgHandshakeOvertime = 30 * time.Second - wgReadErrorRetry = 5 * time.Second checkPeriod = wgHandshakePeriod + wgHandshakeOvertime ) @@ -51,11 +50,14 @@ func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn return } - ctx, ctxCancel := context.WithCancel(parentCtx) - w.ctx = ctx - w.ctxCancel = ctxCancel + w.ctx, w.ctxCancel = context.WithCancel(parentCtx) - w.wgStateCheck(ctx, ctxCancel, onDisconnectedFn) + initialHandshake, err := w.wgState() + if err != nil { + w.log.Warnf("failed to read wg stats: %v", err) + } + + go w.periodicHandshakeCheck(w.ctx, w.ctxCancel, onDisconnectedFn, initialHandshake) } func (w *WGWatcher) DisableWgWatcher() { @@ -72,46 +74,30 @@ func (w *WGWatcher) DisableWgWatcher() { } // wgStateCheck help to check the state of the WireGuard handshake and relay connection -func (w *WGWatcher) wgStateCheck(ctx context.Context, ctxCancel context.CancelFunc, onDisconnectedFn func()) { +func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel context.CancelFunc, onDisconnectedFn func(), initialHandshake time.Time) { w.log.Debugf("WireGuard watcher started") - lastHandshake, err := w.wgState() - if err != nil { - w.log.Warnf("failed to read wg stats: %v", err) - lastHandshake = time.Time{} - } - go func(lastHandshake time.Time) { - timer := time.NewTimer(wgHandshakeOvertime) - defer timer.Stop() - defer ctxCancel() - - for { - select { - case <-timer.C: - handshake, err := w.wgState() - if err != nil { - w.log.Errorf("failed to read wg stats: %v", err) - timer.Reset(wgReadErrorRetry) - continue - } - - w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, handshake) - - if handshake.Equal(lastHandshake) { - w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake) - onDisconnectedFn() - return - } - - resetTime := time.Until(handshake.Add(checkPeriod)) - lastHandshake = handshake - timer.Reset(resetTime) - case <-ctx.Done(): - w.log.Debugf("WireGuard watcher stopped") + timer := time.NewTimer(wgHandshakeOvertime) + defer timer.Stop() + defer ctxCancel() + + lastHandshake := initialHandshake + + for { + select { + case <-timer.C: + handshake, ok := w.handshakeCheck(lastHandshake) + if !ok { + onDisconnectedFn() return } + timer.Reset(time.Until(handshake.Add(checkPeriod))) + lastHandshake = *handshake + case <-ctx.Done(): + w.log.Debugf("WireGuard watcher stopped") + return } - }(lastHandshake) + } } func (w *WGWatcher) wgState() (time.Time, error) { @@ -121,3 +107,20 @@ func (w *WGWatcher) wgState() (time.Time, error) { } return wgState.LastHandshake, nil } + +func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) { + handshake, err := w.wgState() + if err != nil { + w.log.Errorf("failed to read wg stats: %v", err) + return nil, false + } + + w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, handshake) + + if handshake.Equal(lastHandshake) { + w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake) + return nil, false + } + + return &handshake, true +} From b7cf77d3241c024e1394a0412e4eab18ee7bcb69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 30 Jan 2025 14:06:42 +0100 Subject: [PATCH 05/13] Wait DisableWgWatcher to exit from periodic check loop --- client/internal/peer/wg_watcher.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/client/internal/peer/wg_watcher.go b/client/internal/peer/wg_watcher.go index a43301e73bd..ff94e352d17 100644 --- a/client/internal/peer/wg_watcher.go +++ b/client/internal/peer/wg_watcher.go @@ -28,9 +28,9 @@ type WGWatcher struct { wgIfaceStater WGInterfaceStater peerKey string - ctx context.Context ctxCancel context.CancelFunc ctxLock sync.Mutex + waitGroup sync.WaitGroup } func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string) *WGWatcher { @@ -41,25 +41,30 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin } } +// EnableWgWatcher starts the WireGuard watcher. If it is already enabled, it will return immediately and do nothing. func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn func()) { w.log.Debugf("enable WireGuard watcher") w.ctxLock.Lock() defer w.ctxLock.Unlock() - if w.ctx != nil && w.ctx.Err() == nil { + if w.ctxCancel != nil { + w.log.Errorf("WireGuard watcher already enabled") return } - w.ctx, w.ctxCancel = context.WithCancel(parentCtx) + ctx, ctxCancel := context.WithCancel(parentCtx) + w.ctxCancel = ctxCancel initialHandshake, err := w.wgState() if err != nil { w.log.Warnf("failed to read wg stats: %v", err) } - go w.periodicHandshakeCheck(w.ctx, w.ctxCancel, onDisconnectedFn, initialHandshake) + w.waitGroup.Add(1) + go w.periodicHandshakeCheck(ctx, w.ctxCancel, onDisconnectedFn, initialHandshake) } +// DisableWgWatcher stops the WireGuard watcher and wait for the watcher to exit func (w *WGWatcher) DisableWgWatcher() { w.ctxLock.Lock() defer w.ctxLock.Unlock() @@ -71,11 +76,14 @@ func (w *WGWatcher) DisableWgWatcher() { w.log.Debugf("disable WireGuard watcher") w.ctxCancel() + w.ctxCancel = nil + w.waitGroup.Wait() } // wgStateCheck help to check the state of the WireGuard handshake and relay connection func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel context.CancelFunc, onDisconnectedFn func(), initialHandshake time.Time) { w.log.Debugf("WireGuard watcher started") + defer w.waitGroup.Done() timer := time.NewTimer(wgHandshakeOvertime) defer timer.Stop() From 961a942339144fe0ab3c26280fd0ab01d1ff1923 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 30 Jan 2025 14:21:36 +0100 Subject: [PATCH 06/13] Code cleaning in callbacks --- client/internal/peer/conn.go | 28 +++++++++------------------- client/internal/peer/worker_ice.go | 15 +++++---------- client/internal/peer/worker_relay.go | 25 ++++++++++--------------- 3 files changed, 24 insertions(+), 44 deletions(-) diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index b8cb2582fb9..c24ee8dd431 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -135,21 +135,11 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu semaphore: semaphore, } - rFns := WorkerRelayCallbacks{ - OnConnReady: conn.relayConnectionIsReady, - OnDisconnected: conn.onWorkerRelayStateDisconnected, - } - - wFns := WorkerICECallbacks{ - OnConnReady: conn.iCEConnectionIsReady, - OnStatusChanged: conn.onWorkerICEStateDisconnected, - } - ctrl := isController(config) - conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager, rFns) + conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager) relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally() - conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally, wFns) + conn.workerICE, err = NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally) if err != nil { return nil, err } @@ -304,7 +294,7 @@ func (conn *Conn) GetKey() string { } // configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected -func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) { +func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) { conn.mu.Lock() defer conn.mu.Unlock() @@ -376,7 +366,7 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon } // todo review to make sense to handle connecting and disconnected status also? -func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) { +func (conn *Conn) onICEStateDisconnected() { conn.mu.Lock() defer conn.mu.Unlock() @@ -384,7 +374,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) { return } - conn.log.Tracef("ICE connection state changed to %s", newState) + conn.log.Tracef("ICE connection state changed to disconnected") if conn.wgProxyICE != nil { if err := conn.wgProxyICE.CloseConn(); err != nil { @@ -404,8 +394,8 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) { conn.currentConnPriority = connPriorityRelay } - changed := conn.statusICE.Get() != newState && newState != StatusConnecting - conn.statusICE.Set(newState) + changed := conn.statusICE.Get() != StatusDisconnected + conn.statusICE.Set(StatusDisconnected) conn.guard.SetICEConnDisconnected(changed) @@ -422,7 +412,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) { } } -func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { +func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) { conn.mu.Lock() defer conn.mu.Unlock() @@ -474,7 +464,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr) } -func (conn *Conn) onWorkerRelayStateDisconnected() { +func (conn *Conn) onRelayDisconnected() { conn.mu.Lock() defer conn.mu.Unlock() diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 00831849295..244028bdd3e 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -31,20 +31,15 @@ type ICEConnInfo struct { RelayedOnLocal bool } -type WorkerICECallbacks struct { - OnConnReady func(ConnPriority, ICEConnInfo) - OnStatusChanged func(ConnStatus) -} - type WorkerICE struct { ctx context.Context log *log.Entry config ConnConfig + conn *Conn signaler *Signaler iFaceDiscover stdnet.ExternalIFaceDiscover statusRecorder *Status hasRelayOnLocally bool - conn WorkerICECallbacks agent *ice.Agent muxAgent sync.Mutex @@ -60,16 +55,16 @@ type WorkerICE struct { lastKnownState ice.ConnectionState } -func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool, callBacks WorkerICECallbacks) (*WorkerICE, error) { +func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *Conn, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) { w := &WorkerICE{ ctx: ctx, log: log, config: config, + conn: conn, signaler: signaler, iFaceDiscover: ifaceDiscover, statusRecorder: statusRecorder, hasRelayOnLocally: hasRelayOnLocally, - conn: callBacks, } localUfrag, localPwd, err := icemaker.GenerateICECredentials() @@ -155,7 +150,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { RelayedOnLocal: isRelayCandidate(pair.Local), } w.log.Debugf("on ICE conn read to use ready") - go w.conn.OnConnReady(selectedPriority(pair), ci) + go w.conn.onICEConnectionIsReady(selectedPriority(pair), ci) } // OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. @@ -220,7 +215,7 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected: if w.lastKnownState != ice.ConnectionStateDisconnected { w.lastKnownState = ice.ConnectionStateDisconnected - w.conn.OnStatusChanged(StatusDisconnected) + w.conn.onICEStateDisconnected() } w.closeAgent(agentCancel) default: diff --git a/client/internal/peer/worker_relay.go b/client/internal/peer/worker_relay.go index d2df8cd360e..56c19cd1e3c 100644 --- a/client/internal/peer/worker_relay.go +++ b/client/internal/peer/worker_relay.go @@ -18,17 +18,12 @@ type RelayConnInfo struct { rosenpassAddr string } -type WorkerRelayCallbacks struct { - OnConnReady func(RelayConnInfo) - OnDisconnected func() -} - type WorkerRelay struct { log *log.Entry isController bool config ConnConfig + conn *Conn relayManager relayClient.ManagerService - callBacks WorkerRelayCallbacks relayedConn net.Conn relayLock sync.Mutex @@ -38,13 +33,13 @@ type WorkerRelay struct { wgWatcher *WGWatcher } -func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay { +func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager relayClient.ManagerService) *WorkerRelay { r := &WorkerRelay{ log: log, isController: ctrl, config: config, + conn: conn, relayManager: relayManager, - callBacks: callbacks, wgWatcher: NewWGWatcher(log, config.WgConfig.WgInterface, config.Key), } return r @@ -81,7 +76,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { w.relayedConn = relayedConn w.relayLock.Unlock() - err = w.relayManager.AddCloseListener(srv, w.onRelayMGDisconnected) + err = w.relayManager.AddCloseListener(srv, w.onRelayClientDisconnected) if err != nil { log.Errorf("failed to add close listener: %s", err) _ = relayedConn.Close() @@ -89,7 +84,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { } w.log.Debugf("peer conn opened via Relay: %s", srv) - go w.callBacks.OnConnReady(RelayConnInfo{ + go w.conn.onRelayConnectionIsReady(RelayConnInfo{ relayedConn: relayedConn, rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey, rosenpassAddr: remoteOfferAnswer.RosenpassAddr, @@ -97,7 +92,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { } func (w *WorkerRelay) EnableWgWatcher(ctx context.Context) { - w.wgWatcher.EnableWgWatcher(ctx, w.disconnected) + w.wgWatcher.EnableWgWatcher(ctx, w.onWGDisconnected) } func (w *WorkerRelay) DisableWgWatcher() { @@ -128,12 +123,12 @@ func (w *WorkerRelay) CloseConn() { } } -func (w *WorkerRelay) disconnected() { +func (w *WorkerRelay) onWGDisconnected() { w.relayLock.Lock() _ = w.relayedConn.Close() w.relayLock.Unlock() - w.callBacks.OnDisconnected() + w.conn.onRelayDisconnected() } func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool { @@ -150,7 +145,7 @@ func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress st return remoteRelayAddress } -func (w *WorkerRelay) onRelayMGDisconnected() { +func (w *WorkerRelay) onRelayClientDisconnected() { w.wgWatcher.DisableWgWatcher() - go w.callBacks.OnDisconnected() + go w.conn.onRelayDisconnected() } From dd13b8f27eabb649952754071e5165515887d3ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 31 Jan 2025 10:32:21 +0100 Subject: [PATCH 07/13] Code cleaning --- client/internal/peer/conn.go | 9 +++++--- client/internal/peer/guard/guard.go | 36 ++++++++++------------------- client/internal/peer/worker_ice.go | 2 +- 3 files changed, 19 insertions(+), 28 deletions(-) diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index c24ee8dd431..501db65d97c 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -395,10 +395,11 @@ func (conn *Conn) onICEStateDisconnected() { } changed := conn.statusICE.Get() != StatusDisconnected + if changed { + conn.guard.SetICEConnDisconnected() + } conn.statusICE.Set(StatusDisconnected) - conn.guard.SetICEConnDisconnected(changed) - peerState := State{ PubKey: conn.config.Key, ConnStatus: conn.evalStatus(), @@ -487,8 +488,10 @@ func (conn *Conn) onRelayDisconnected() { } changed := conn.statusRelay.Get() != StatusDisconnected + if changed { + conn.guard.SetRelayedConnDisconnected() + } conn.statusRelay.Set(StatusDisconnected) - conn.guard.SetRelayedConnDisconnected(changed) peerState := State{ PubKey: conn.config.Key, diff --git a/client/internal/peer/guard/guard.go b/client/internal/peer/guard/guard.go index bf3527a6264..1fc2b4a4a90 100644 --- a/client/internal/peer/guard/guard.go +++ b/client/internal/peer/guard/guard.go @@ -29,8 +29,8 @@ type Guard struct { isConnectedOnAllWay isConnectedFunc timeout time.Duration srWatcher *SRWatcher - relayedConnDisconnected chan bool - iCEConnDisconnected chan bool + relayedConnDisconnected chan struct{} + iCEConnDisconnected chan struct{} } func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard { @@ -41,8 +41,8 @@ func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc, isConnectedOnAllWay: isConnectedFn, timeout: timeout, srWatcher: srWatcher, - relayedConnDisconnected: make(chan bool, 1), - iCEConnDisconnected: make(chan bool, 1), + relayedConnDisconnected: make(chan struct{}, 1), + iCEConnDisconnected: make(chan struct{}, 1), } } @@ -54,16 +54,16 @@ func (g *Guard) Start(ctx context.Context) { } } -func (g *Guard) SetRelayedConnDisconnected(changed bool) { +func (g *Guard) SetRelayedConnDisconnected() { select { - case g.relayedConnDisconnected <- changed: + case g.relayedConnDisconnected <- struct{}{}: default: } } -func (g *Guard) SetICEConnDisconnected(changed bool) { +func (g *Guard) SetICEConnDisconnected() { select { - case g.iCEConnDisconnected <- changed: + case g.iCEConnDisconnected <- struct{}{}: default: } } @@ -96,19 +96,13 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) { g.triggerOfferSending() } - case changed := <-g.relayedConnDisconnected: - if !changed { - continue - } + case <-g.relayedConnDisconnected: g.log.Debugf("Relay connection changed, reset reconnection ticker") ticker.Stop() ticker = g.prepareExponentTicker(ctx) tickerChannel = ticker.C - case changed := <-g.iCEConnDisconnected: - if !changed { - continue - } + case <-g.iCEConnDisconnected: g.log.Debugf("ICE connection changed, reset reconnection ticker") ticker.Stop() ticker = g.prepareExponentTicker(ctx) @@ -138,16 +132,10 @@ func (g *Guard) listenForDisconnectEvents(ctx context.Context) { g.log.Infof("start listen for reconnect events...") for { select { - case changed := <-g.relayedConnDisconnected: - if !changed { - continue - } + case <-g.relayedConnDisconnected: g.log.Debugf("Relay connection changed, triggering reconnect") g.triggerOfferSending() - case changed := <-g.iCEConnDisconnected: - if !changed { - continue - } + case <-g.iCEConnDisconnected: g.log.Debugf("ICE state changed, try to send new offer") g.triggerOfferSending() case <-srReconnectedChan: diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 244028bdd3e..7dd84a98e56 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -149,7 +149,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { Relayed: isRelayed(pair), RelayedOnLocal: isRelayCandidate(pair.Local), } - w.log.Debugf("on ICE conn read to use ready") + w.log.Debugf("on ICE conn is ready to use") go w.conn.onICEConnectionIsReady(selectedPriority(pair), ci) } From a747f0bd3b58ddc9e5417872a3024e29f36cad30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Tue, 4 Feb 2025 13:50:47 +0100 Subject: [PATCH 08/13] Fix context cancellation check in EnableWgWatcher function --- client/internal/peer/wg_watcher.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/client/internal/peer/wg_watcher.go b/client/internal/peer/wg_watcher.go index ff94e352d17..c2d0c6d1223 100644 --- a/client/internal/peer/wg_watcher.go +++ b/client/internal/peer/wg_watcher.go @@ -28,6 +28,7 @@ type WGWatcher struct { wgIfaceStater WGInterfaceStater peerKey string + ctx context.Context ctxCancel context.CancelFunc ctxLock sync.Mutex waitGroup sync.WaitGroup @@ -47,12 +48,13 @@ func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn w.ctxLock.Lock() defer w.ctxLock.Unlock() - if w.ctxCancel != nil { + if w.ctx != nil && w.ctx.Err() == nil { w.log.Errorf("WireGuard watcher already enabled") return } ctx, ctxCancel := context.WithCancel(parentCtx) + w.ctx = ctx w.ctxCancel = ctxCancel initialHandshake, err := w.wgState() @@ -61,7 +63,7 @@ func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn } w.waitGroup.Add(1) - go w.periodicHandshakeCheck(ctx, w.ctxCancel, onDisconnectedFn, initialHandshake) + go w.periodicHandshakeCheck(ctx, ctxCancel, onDisconnectedFn, initialHandshake) } // DisableWgWatcher stops the WireGuard watcher and wait for the watcher to exit From 47aa0c2140d4337c0ad31f0b9cdda4536fbefdba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Tue, 4 Feb 2025 14:08:06 +0100 Subject: [PATCH 09/13] Add logs --- client/internal/peer/wg_watcher.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/client/internal/peer/wg_watcher.go b/client/internal/peer/wg_watcher.go index c2d0c6d1223..430b99182a3 100644 --- a/client/internal/peer/wg_watcher.go +++ b/client/internal/peer/wg_watcher.go @@ -101,7 +101,9 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel contex onDisconnectedFn() return } - timer.Reset(time.Until(handshake.Add(checkPeriod))) + resetTime := time.Until(handshake.Add(checkPeriod)) + w.log.Debugf("WireGuard watcher reset timer: %v", resetTime) + timer.Reset(resetTime) lastHandshake = *handshake case <-ctx.Done(): w.log.Debugf("WireGuard watcher stopped") @@ -125,7 +127,7 @@ func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) { return nil, false } - w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, handshake) + w.log.Debugf("previous handshake, handshake: %v, %v", lastHandshake, handshake) if handshake.Equal(lastHandshake) { w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake) From 8dafb61cf731fdd837ab8a8ea0227bb54a3a4dab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 5 Feb 2025 09:43:58 +0100 Subject: [PATCH 10/13] After suspend immediately timeout the wg watcher --- client/internal/peer/wg_watcher.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/client/internal/peer/wg_watcher.go b/client/internal/peer/wg_watcher.go index 430b99182a3..3ae010be707 100644 --- a/client/internal/peer/wg_watcher.go +++ b/client/internal/peer/wg_watcher.go @@ -15,7 +15,7 @@ const ( ) var ( - wgHandshakeOvertime = 30 * time.Second + wgHandshakeOvertime = 30 * time.Second // allowed delay in network checkPeriod = wgHandshakePeriod + wgHandshakeOvertime ) @@ -101,10 +101,12 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel contex onDisconnectedFn() return } + lastHandshake = *handshake + resetTime := time.Until(handshake.Add(checkPeriod)) - w.log.Debugf("WireGuard watcher reset timer: %v", resetTime) timer.Reset(resetTime) - lastHandshake = *handshake + + w.log.Debugf("WireGuard watcher reset timer: %v", resetTime) case <-ctx.Done(): w.log.Debugf("WireGuard watcher stopped") return @@ -120,6 +122,7 @@ func (w *WGWatcher) wgState() (time.Time, error) { return wgState.LastHandshake, nil } +// handshakeCheck checks the WireGuard handshake and return the new handshake time if it is different from the previous one func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) { handshake, err := w.wgState() if err != nil { @@ -134,5 +137,11 @@ func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) { return nil, false } + // in case if the machine is suspended, the handshake time will be in the past + if handshake.Add(checkPeriod).Before(time.Now()) { + w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake) + return nil, false + } + return &handshake, true } From c1f67dfa6adf6cd8d9a992847d4dd491317ef071 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 5 Feb 2025 10:45:56 +0100 Subject: [PATCH 11/13] Add edge case error handling --- client/internal/peer/wg_watcher.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/client/internal/peer/wg_watcher.go b/client/internal/peer/wg_watcher.go index 3ae010be707..7f633820383 100644 --- a/client/internal/peer/wg_watcher.go +++ b/client/internal/peer/wg_watcher.go @@ -114,14 +114,6 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel contex } } -func (w *WGWatcher) wgState() (time.Time, error) { - wgState, err := w.wgIfaceStater.GetStats(w.peerKey) - if err != nil { - return time.Time{}, err - } - return wgState.LastHandshake, nil -} - // handshakeCheck checks the WireGuard handshake and return the new handshake time if it is different from the previous one func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) { handshake, err := w.wgState() @@ -132,6 +124,7 @@ func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) { w.log.Debugf("previous handshake, handshake: %v, %v", lastHandshake, handshake) + // the current know handshake did not change if handshake.Equal(lastHandshake) { w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake) return nil, false @@ -143,5 +136,19 @@ func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) { return nil, false } + // error handling for handshake time in the future + if handshake.After(time.Now()) { + w.log.Infof("WireGuard handshake is in the future, closing relay connection: %v", handshake) + return nil, false + } + return &handshake, true } + +func (w *WGWatcher) wgState() (time.Time, error) { + wgState, err := w.wgIfaceStater.GetStats(w.peerKey) + if err != nil { + return time.Time{}, err + } + return wgState.LastHandshake, nil +} From fb7fcbd17cbad3a62007db87f7d3c9416c2678c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 7 Feb 2025 22:14:07 +0100 Subject: [PATCH 12/13] Fix race between ICE TURN and Relayed connection --- client/internal/peer/conn.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 501db65d97c..7caafa53d31 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -32,8 +32,8 @@ const ( defaultWgKeepAlive = 25 * time.Second connPriorityRelay ConnPriority = 1 - connPriorityICETurn ConnPriority = 1 - connPriorityICEP2P ConnPriority = 2 + connPriorityICETurn ConnPriority = 2 + connPriorityICEP2P ConnPriority = 3 ) type WgConfig struct { @@ -66,14 +66,6 @@ type ConnConfig struct { ICEConfig icemaker.Config } -type WorkerCallbacks struct { - OnRelayReadyCallback func(info RelayConnInfo) - OnRelayStatusChanged func(ConnStatus) - - OnICEConnReadyCallback func(ConnPriority, ICEConnInfo) - OnICEStatusChanged func(ConnStatus) -} - type Conn struct { log *log.Entry mu sync.Mutex From 123c5cf4402c78cf61863b4f5ee784beef849d69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Mon, 10 Feb 2025 09:10:24 +0100 Subject: [PATCH 13/13] Change logging levels --- client/internal/peer/wg_watcher.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/client/internal/peer/wg_watcher.go b/client/internal/peer/wg_watcher.go index 7f633820383..6670c6517e7 100644 --- a/client/internal/peer/wg_watcher.go +++ b/client/internal/peer/wg_watcher.go @@ -59,7 +59,7 @@ func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn initialHandshake, err := w.wgState() if err != nil { - w.log.Warnf("failed to read wg stats: %v", err) + w.log.Warnf("failed to read initial wg stats: %v", err) } w.waitGroup.Add(1) @@ -84,7 +84,7 @@ func (w *WGWatcher) DisableWgWatcher() { // wgStateCheck help to check the state of the WireGuard handshake and relay connection func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel context.CancelFunc, onDisconnectedFn func(), initialHandshake time.Time) { - w.log.Debugf("WireGuard watcher started") + w.log.Infof("WireGuard watcher started") defer w.waitGroup.Done() timer := time.NewTimer(wgHandshakeOvertime) @@ -108,7 +108,7 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel contex w.log.Debugf("WireGuard watcher reset timer: %v", resetTime) case <-ctx.Done(): - w.log.Debugf("WireGuard watcher stopped") + w.log.Infof("WireGuard watcher stopped") return } } @@ -122,23 +122,23 @@ func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) { return nil, false } - w.log.Debugf("previous handshake, handshake: %v, %v", lastHandshake, handshake) + w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, handshake) // the current know handshake did not change if handshake.Equal(lastHandshake) { - w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake) + w.log.Warnf("WireGuard handshake timed out, closing relay connection: %v", handshake) return nil, false } // in case if the machine is suspended, the handshake time will be in the past if handshake.Add(checkPeriod).Before(time.Now()) { - w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake) + w.log.Warnf("WireGuard handshake timed out, closing relay connection: %v", handshake) return nil, false } // error handling for handshake time in the future if handshake.After(time.Now()) { - w.log.Infof("WireGuard handshake is in the future, closing relay connection: %v", handshake) + w.log.Warnf("WireGuard handshake is in the future, closing relay connection: %v", handshake) return nil, false }