Skip to content

Commit

Permalink
proper exit/handling with immediate cancel() instead of sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
ldemailly committed Feb 13, 2024
1 parent 25972f6 commit d452eb0
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 19 deletions.
53 changes: 34 additions & 19 deletions mstore/dnsWatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mstore
import (
"context"
"net"
"sync"
"time"

"fortio.org/dflag"
Expand All @@ -14,40 +15,46 @@ var (
peers sets.Set[string]
cancel context.CancelFunc
DNSWatchSleepTime = dflag.New(15*time.Second, "Sleep time between DNS resolution")
wg sync.WaitGroup
)

// Resolve the service name to a list of IPs.
func checkDNS(serviceName string) {
ips, err := net.LookupHost(serviceName)
if err != nil {
log.Errf("Error resolving service %q: %v", serviceName, err)
return
}
newIPs := sets.FromSlice(ips)
log.LogVf("Resolved %q to %v", serviceName, newIPs)
// If the list changes, update the peers list
if newIPs.Equals(peers) {
log.LogVf("No change in peers: %v", peers)
return
}
peers = newIPs.Clone()
log.Infof("Updated peers: %v", peers)
_ = Peers.SetV(peers)

Check warning on line 37 in mstore/dnsWatch.go

View check run for this annotation

Codecov / codecov/patch

mstore/dnsWatch.go#L22-L37

Added lines #L22 - L37 were not covered by tests
}

func dnsWatcher(ctx context.Context, serviceName string) {
defer wg.Done()
for {
checkDNS(serviceName) // first time, without waiting or cancel check
select {
case <-ctx.Done():
log.Warnf("DNS Watcher for %q exiting", serviceName)
return
default:
// Resolve the service name to a list of IPs
ips, err := net.LookupHost(serviceName)
if err != nil {
log.Errf("Error resolving service %q: %v", serviceName, err)
time.Sleep(DNSWatchSleepTime.Get()) // Sleep for a minute before resolving again
continue
}
newIPs := sets.FromSlice(ips)
log.LogVf("Resolved %q to %v", serviceName, newIPs)
// If the list changes, update the peers list
if !newIPs.Equals(peers) {
peers = newIPs.Clone()
log.Infof("Updated peers: %v", peers)
_ = Peers.SetV(peers)
} else {
log.LogVf("No change in peers: %v", peers)
}
time.Sleep(DNSWatchSleepTime.Get()) // Sleep for a minute before resolving again
case <-time.After(DNSWatchSleepTime.Get()):
checkDNS(serviceName)

Check warning on line 49 in mstore/dnsWatch.go

View check run for this annotation

Codecov / codecov/patch

mstore/dnsWatch.go#L40-L49

Added lines #L40 - L49 were not covered by tests
}
}
}

func DNSWatcher(serviceName string) context.CancelFunc {
ctx := context.Background()
ctx, cancel = context.WithCancel(ctx)
wg.Add(1)
go dnsWatcher(ctx, serviceName)
return cancel

Check warning on line 59 in mstore/dnsWatch.go

View check run for this annotation

Codecov / codecov/patch

mstore/dnsWatch.go#L54-L59

Added lines #L54 - L59 were not covered by tests
}
Expand All @@ -58,3 +65,11 @@ func StartDNSWatch(serviceName string) {
}
cancel = DNSWatcher(serviceName)

Check warning on line 66 in mstore/dnsWatch.go

View check run for this annotation

Codecov / codecov/patch

mstore/dnsWatch.go#L62-L66

Added lines #L62 - L66 were not covered by tests
}

func StopDNSWatch() {
if cancel != nil {
cancel()
cancel = nil
wg.Wait()
}

Check warning on line 74 in mstore/dnsWatch.go

View check run for this annotation

Codecov / codecov/patch

mstore/dnsWatch.go#L69-L74

Added lines #L69 - L74 were not covered by tests
}
5 changes: 5 additions & 0 deletions mstore/mstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,8 @@ func Start() {
log.Infof("memstore Start()")
// peerChange does get call for even initial flag value
}

func Stop() {
StopDNSWatch()
log.Infof("memstore stopped")

Check warning on line 52 in mstore/mstore.go

View check run for this annotation

Codecov / codecov/patch

mstore/mstore.go#L50-L52

Added lines #L50 - L52 were not covered by tests
}
1 change: 1 addition & 0 deletions proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ func main() {
log.Infof("Starting memstore prototype...")

Check warning on line 19 in proto.go

View check run for this annotation

Codecov / codecov/patch

proto.go#L19

Added line #L19 was not covered by tests
mstore.Start()
scli.UntilInterrupted()
mstore.Stop()

Check warning on line 22 in proto.go

View check run for this annotation

Codecov / codecov/patch

proto.go#L22

Added line #L22 was not covered by tests
}

0 comments on commit d452eb0

Please # to comment.