Skip to content

Commit

Permalink
Improvements to lock handling.
Browse files Browse the repository at this point in the history
Closes #7
Closes #20
  • Loading branch information
perk11 committed Feb 3, 2025
1 parent 577b038 commit 3655c9b
Showing 1 changed file with 46 additions and 20 deletions.
66 changes: 46 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"syscall"
"time"
)
import _ "net/http/pprof"

type Config struct {
ShutDownAfterInactivitySeconds time.Duration
Expand Down Expand Up @@ -75,32 +76,43 @@ type ModelContainingRequest struct {
Model string `json:"model"`
}

func (rm ResourceManager) getRunningService(name string) RunningService {
rm.serviceMutex.Lock()
defer rm.serviceMutex.Unlock()
return rm.runningServices[name]
// maybeGetRunningServiceNoLock Only use if serviceMutex is already locked.
func (rm ResourceManager) maybeGetRunningServiceNoLock(name string) (RunningService, bool) {
rs, ok := rm.runningServices[name]
return rs, ok
}

// maybeGetRunningService looks up name in runningServices and reports whether
// an entry was found.
//
// Normally the lookup is performed while holding serviceMutex. When the
// package-level interrupted flag is set, the mutex is only acquired
// opportunistically with TryLock so that the lookup cannot block; if the lock
// is unavailable the lookup proceeds without it.
//
// The mutex is released only when this call actually acquired it. Unlocking
// after a failed TryLock would release a lock owned by another goroutine and
// make that goroutine's own Unlock a fatal error.
func (rm ResourceManager) maybeGetRunningService(name string) (RunningService, bool) {
	if interrupted {
		// Best effort while interrupted: never block on the mutex. If it is
		// held elsewhere the read below is unsynchronized.
		if rm.serviceMutex.TryLock() {
			defer rm.serviceMutex.Unlock()
		}
	} else {
		rm.serviceMutex.Lock()
		defer rm.serviceMutex.Unlock()
	}
	return rm.maybeGetRunningServiceNoLock(name)
}

// storeRunningService records rs under name in runningServices, replacing any
// existing entry. serviceMutex is held for the duration of the write, so the
// caller must NOT already hold it.
func (rm ResourceManager) storeRunningService(name string, rs RunningService) {
	rm.serviceMutex.Lock()
	defer rm.serviceMutex.Unlock()

	// Same map write that storeRunningServiceNoLock performs, done inline
	// now that the lock is held.
	rm.runningServices[name] = rs
}

// storeRunningServiceNoLock writes rs into runningServices under name,
// replacing any existing entry. It performs no locking itself: only use it if
// serviceMutex is already locked by the caller.
func (rm ResourceManager) storeRunningServiceNoLock(name string, rs RunningService) {
	services := rm.runningServices
	services[name] = rs
}

// incrementConnection adds count to the activeConnections counter of the
// running service registered under name and stores the updated entry back.
// Pass a negative count to decrement.
//
// serviceMutex is held for the whole read-modify-write, so the update is
// atomic with respect to other users of that mutex. If no service is
// registered under name, a warning is logged and nothing is changed.
func (rm ResourceManager) incrementConnection(name string, count int) {
	rm.serviceMutex.Lock()
	defer rm.serviceMutex.Unlock()

	// Go through the receiver (not the package-level resourceManager) and use
	// the NoLock helpers, since the mutex is already held here.
	runningService, ok := rm.maybeGetRunningServiceNoLock(name)
	if !ok {
		log.Printf("[%s] Warning: Tried to increment connection numbers but couldn't get the running service", name)
		return
	}

	// RunningService is stored by value in the map, so the modified copy has
	// to be written back for the change to take effect.
	runningService.activeConnections += count
	rm.storeRunningServiceNoLock(name, runningService)
}

func (rm ResourceManager) createRunningService(serviceConfig ServiceConfig) RunningService {
Expand All @@ -121,6 +133,9 @@ var (
)

func main() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)

Expand Down Expand Up @@ -524,14 +539,12 @@ func startService(serviceConfig ServiceConfig) (net.Conn, error) {
return
}
resourceManager.serviceMutex.Lock()
defer resourceManager.serviceMutex.Unlock()

if !canBeStopped(serviceConfig.Name) {
log.Printf("[%s] Idle timeout %s reached, but service is busy, resetting idle time", serviceConfig.Name, idleTimeout)
runningService.idleTimer.Reset(getIdleTimeout(serviceConfig))
return
}

resourceManager.serviceMutex.Unlock()
log.Printf("[%s] Idle timeout %s reached, stopping service", serviceConfig.Name, idleTimeout)
stopService(serviceConfig.Name)
})
Expand Down Expand Up @@ -697,7 +710,11 @@ func findFirstMissingResource(resourceRequirements map[string]int, requestingSer
}

func trackServiceLastUsed(serviceConfig ServiceConfig) {
runningService := resourceManager.getRunningService(serviceConfig.Name)
runningService, ok := resourceManager.maybeGetRunningService(serviceConfig.Name)
if !ok {
log.Printf("[%s] Warning, tried to track service usage, but couldn't find it in the list of running services, it was probably stopped", serviceConfig.Name)
return
}
runningService.lastUsed = time.Now()
if runningService.idleTimer != nil {
runningService.idleTimer.Reset(getIdleTimeout(serviceConfig))
Expand All @@ -706,7 +723,12 @@ func trackServiceLastUsed(serviceConfig ServiceConfig) {
}

func canBeStopped(serviceName string) bool {
runningService := resourceManager.runningServices[serviceName]
//Using nolock version since both callers already lock the service mutex
runningService, ok := resourceManager.maybeGetRunningServiceNoLock(serviceName)
if !ok {
log.Printf("[%s] Warning: A check whether service can be stopped failed to find the service in the running services list, it is probably already being stopped. Assuming it can't be stopped", serviceName)
return false
}
if !runningService.manageMutex.TryLock() {
return false
}
Expand Down Expand Up @@ -778,13 +800,17 @@ func forwardConnection(clientConnection net.Conn, serviceConnection net.Conn, se
}

func stopService(serviceName string) {
runningService, ok := resourceManager.maybeGetRunningService(serviceName)
if !ok {
log.Printf("[%s] Warning: Failed to find a service in a list of running services while stopping it, probably multiple stops requested. Stop aborted.", serviceName)
return
}
if interrupted {
//Shouldn't be necessary, but there might be some locks causing issues
resourceManager.runningServices[serviceName].manageMutex.TryLock()
//If the process is being interrupted, we want to stop the service no matter what, even if it's currently locked
runningService.manageMutex.TryLock()
} else {
resourceManager.runningServices[serviceName].manageMutex.Lock()
runningService.manageMutex.Lock()
}
runningService := resourceManager.runningServices[serviceName]
if runningService.idleTimer != nil {
runningService.idleTimer.Stop()
}
Expand Down Expand Up @@ -815,7 +841,7 @@ func stopService(serviceName string) {

releaseResources(runningService.resourceRequirements)
if !interrupted {
resourceManager.runningServices[serviceName].manageMutex.Unlock()
runningService.manageMutex.Unlock()
}
delete(resourceManager.runningServices, serviceName)
}
Expand Down

0 comments on commit 3655c9b

Please sign in to comment.