Skip to content

Commit 7f32422

Browse files
authored
Merge pull request #48 from deploymenttheory/dev
Dev
2 parents ea689a0 + 88de4fc commit 7f32422

File tree

4 files changed

+318
-606
lines changed

4 files changed

+318
-606
lines changed

errors/http_error_handling.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,3 +130,17 @@ func IsTransientError(resp *http.Response) bool {
130130
}
131131
return resp != nil && transientStatusCodes[resp.StatusCode]
132132
}
133+
134+
// IsRetryableStatusCode checks if the provided HTTP status code is considered retryable.
135+
func IsRetryableStatusCode(statusCode int) bool {
136+
retryableStatusCodes := map[int]bool{
137+
http.StatusTooManyRequests: true, // 429
138+
http.StatusInternalServerError: true, // 500
139+
http.StatusBadGateway: true, // 502
140+
http.StatusServiceUnavailable: true, // 503
141+
http.StatusGatewayTimeout: true, // 504
142+
}
143+
144+
_, retryable := retryableStatusCodes[statusCode]
145+
return retryable
146+
}

httpclient/httpclient_concurrency_management.go

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// http_concurrency_management.go
22
// package httpclient provides utilities to manage HTTP client interactions, including concurrency control.
3-
// The Concurrency Manager ensures no more than a certain number of concurrent requests (e.g., 5 for Jamf Pro) are sent at the same time. This is managed using a semaphore
3+
// The Concurrency Manager ensures no more than a certain number of concurrent requests
4+
// (e.g., 5 for Jamf Pro) are sent at the same time. This is managed using a semaphore
45
package httpclient
56

67
import (
@@ -42,23 +43,47 @@ type requestIDKey struct{}
4243

4344
// Functions:
4445

45-
// AcquireConcurrencyToken attempts to acquire a token from the ConcurrencyManager to manage the number of concurrent requests.
46-
// This function is designed to ensure that the HTTP client adheres to predefined concurrency limits, preventing an excessive number of simultaneous requests.
47-
// It creates a new context with a timeout to avoid indefinite blocking in case the concurrency limit is reached.
48-
// Upon successfully acquiring a token, it records the time taken to acquire the token and updates performance metrics accordingly.
49-
// The function then adds the acquired request ID to the context, which can be used for tracking and managing individual requests.
46+
// NewConcurrencyManager initializes a new ConcurrencyManager with the given
47+
// concurrency limit, logger, and debug mode. The ConcurrencyManager ensures
48+
// no more than a certain number of concurrent requests are made.
49+
// It uses a semaphore to control concurrency.
50+
func NewConcurrencyManager(limit int, logger logger.Logger, debugMode bool) *ConcurrencyManager {
51+
return &ConcurrencyManager{
52+
sem: make(chan struct{}, limit),
53+
logger: logger,
54+
debugMode: debugMode,
55+
AcquisitionTimes: []time.Duration{},
56+
}
57+
}
58+
59+
// AcquireConcurrencyToken attempts to acquire a token from the ConcurrencyManager
60+
// to manage the number of concurrent requests. This function is designed to ensure
61+
// that the HTTP client adheres to predefined concurrency limits, preventing an
62+
// excessive number of simultaneous requests. It creates a new context with a timeout
63+
// to avoid indefinite blocking in case the concurrency limit is reached.
64+
// Upon successfully acquiring a token, it records the time taken to acquire the
65+
// token and updates performance metrics accordingly. The function then adds the
66+
// acquired request ID to the context, which can be used for tracking and managing
67+
// individual requests.
5068
//
5169
// Parameters:
52-
// - ctx: The parent context from which the new context with timeout will be derived. This allows for proper request cancellation and timeout handling.
53-
// - log: An instance of a logger (conforming to the logger.Logger interface), used to log relevant information and errors during the token acquisition process.
70+
// - ctx: The parent context from which the new context with timeout will be derived.
71+
// This allows for proper request cancellation and timeout handling.
72+
//
73+
// - log: An instance of a logger (conforming to the logger.Logger interface), used
74+
// to log relevant information and errors during the token acquisition process.
5475
//
5576
// Returns:
56-
// - A new context containing the acquired request ID, which should be passed to subsequent operations requiring concurrency control.
57-
// - An error if the token could not be acquired within the timeout period or due to any other issues encountered by the ConcurrencyManager.
77+
// - A new context containing the acquired request ID, which should be passed to
78+
// subsequent operations requiring concurrency control.
79+
//
80+
// - An error if the token could not be acquired within the timeout period or due to
81+
// any other issues encountered by the ConcurrencyManager.
5882
//
5983
// Usage:
60-
// This function should be called before making an HTTP request that needs to be controlled for concurrency.
61-
// The returned context should be used for the HTTP request to ensure it is associated with the acquired concurrency token.
84+
// This function should be called before making an HTTP request that needs to be
85+
// controlled for concurrency. The returned context should be used for the HTTP
86+
// request to ensure it is associated with the acquired concurrency token.
6287
func (c *Client) AcquireConcurrencyToken(ctx context.Context, log logger.Logger) (context.Context, error) {
6388
// Measure the token acquisition start time
6489
tokenAcquisitionStart := time.Now()
@@ -104,18 +129,6 @@ func (c *Client) updatePerformanceMetrics(duration time.Duration) {
104129
c.PerfMetrics.TotalRequests++
105130
}
106131

107-
// NewConcurrencyManager initializes a new ConcurrencyManager with the given concurrency limit, logger, and debug mode.
108-
// The ConcurrencyManager ensures no more than a certain number of concurrent requests are made.
109-
// It uses a semaphore to control concurrency.
110-
func NewConcurrencyManager(limit int, logger logger.Logger, debugMode bool) *ConcurrencyManager {
111-
return &ConcurrencyManager{
112-
sem: make(chan struct{}, limit),
113-
logger: logger,
114-
debugMode: debugMode,
115-
AcquisitionTimes: []time.Duration{},
116-
}
117-
}
118-
119132
// Min returns the smaller of the two integers.
120133
func Min(a, b int) int {
121134
if a < b {
@@ -179,8 +192,9 @@ func (c *ConcurrencyManager) Release(requestID uuid.UUID) {
179192

180193
//------ Metric-related Functions:
181194

182-
// AverageAcquisitionTime computes the average time taken to acquire a token from the semaphore.
183-
// It helps in understanding the contention for tokens and can be used to adjust concurrency limits.
195+
// AverageAcquisitionTime computes the average time taken to acquire a token
196+
// from the semaphore. It helps in understanding the contention for tokens
197+
// and can be used to adjust concurrency limits.
184198
func (c *ConcurrencyManager) AverageAcquisitionTime() time.Duration {
185199
c.lock.Lock()
186200
defer c.lock.Unlock()
@@ -196,8 +210,10 @@ func (c *ConcurrencyManager) AverageAcquisitionTime() time.Duration {
196210
return totalTime / time.Duration(len(c.AcquisitionTimes))
197211
}
198212

199-
// HistoricalAverageAcquisitionTime computes the average time taken to acquire a token from the semaphore over a historical period (e.g., the last 5 minutes).
200-
// It helps in understanding the historical contention for tokens and can be used to adjust concurrency limits.
213+
// HistoricalAverageAcquisitionTime computes the average time taken to acquire
214+
// a token from the semaphore over a historical period (e.g., the last 5 minutes).
215+
// It helps in understanding the historical contention for tokens and can be used
216+
// to adjust concurrency limits.
201217
func (c *ConcurrencyManager) HistoricalAverageAcquisitionTime() time.Duration {
202218
c.lock.Lock()
203219
defer c.lock.Unlock()
@@ -218,9 +234,11 @@ func (c *ConcurrencyManager) HistoricalAverageAcquisitionTime() time.Duration {
218234

219235
//------ Concurrency Adjustment Functions:
220236

221-
// AdjustConcurrencyLimit dynamically modifies the maximum concurrency limit based on the newLimit provided.
222-
// This function helps in adjusting the concurrency limit in real-time based on observed system performance and other metrics.
223-
// It transfers the tokens from the old semaphore to the new one, ensuring that there's no loss of tokens during the transition.
237+
// AdjustConcurrencyLimit dynamically modifies the maximum concurrency limit
238+
// based on the newLimit provided. This function helps in adjusting the concurrency
239+
// limit in real-time based on observed system performance and other metrics. It
240+
// transfers the tokens from the old semaphore to the new one, ensuring that there's
241+
// no loss of tokens during the transition.
224242
func (c *ConcurrencyManager) AdjustConcurrencyLimit(newLimit int) {
225243
c.lock.Lock()
226244
defer c.lock.Unlock()
@@ -240,9 +258,10 @@ func (c *ConcurrencyManager) AdjustConcurrencyLimit(newLimit int) {
240258
c.sem = newSem
241259
}
242260

243-
// AdjustConcurrencyBasedOnMetrics evaluates the current metrics and adjusts the concurrency limit if required.
244-
// It checks metrics like average token acquisition time and decides on a new concurrency limit.
245-
// The method ensures that the new limit respects the minimum and maximum allowed concurrency bounds.
261+
// AdjustConcurrencyBasedOnMetrics evaluates the current metrics and adjusts the
262+
// concurrency limit if required. It checks metrics like average token acquisition
263+
// time and decides on a new concurrency limit. The method ensures that the new
264+
// limit respects the minimum and maximum allowed concurrency bounds.
246265
func (c *Client) AdjustConcurrencyBasedOnMetrics() {
247266
// Get average acquisition time
248267
avgAcquisitionTime := c.ConcurrencyMgr.AverageAcquisitionTime()

0 commit comments

Comments
 (0)