Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Make versioning data limits configurable by namespace #4843

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ type (
ForwarderMaxOutstandingTasks dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters
ForwarderMaxRatePerSecond dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters
ForwarderMaxChildrenPerNode dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters
VersionCompatibleSetLimitPerQueue dynamicconfig.IntPropertyFn
VersionBuildIdLimitPerQueue dynamicconfig.IntPropertyFn
TaskQueueLimitPerBuildId dynamicconfig.IntPropertyFn
VersionCompatibleSetLimitPerQueue dynamicconfig.IntPropertyFnWithNamespaceFilter
VersionBuildIdLimitPerQueue dynamicconfig.IntPropertyFnWithNamespaceFilter
TaskQueueLimitPerBuildId dynamicconfig.IntPropertyFnWithNamespaceFilter
GetUserDataLongPollTimeout dynamicconfig.DurationPropertyFn

// Time to hold a poll request before returning an empty response if there are no tasks
Expand Down Expand Up @@ -183,9 +183,9 @@ func NewConfig(
ForwarderMaxRatePerSecond: dc.GetIntPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingForwarderMaxRatePerSecond, 10),
ForwarderMaxChildrenPerNode: dc.GetIntPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingForwarderMaxChildrenPerNode, 20),
ShutdownDrainDuration: dc.GetDurationProperty(dynamicconfig.MatchingShutdownDrainDuration, 0*time.Second),
VersionCompatibleSetLimitPerQueue: dc.GetIntProperty(dynamicconfig.VersionCompatibleSetLimitPerQueue, 10),
VersionBuildIdLimitPerQueue: dc.GetIntProperty(dynamicconfig.VersionBuildIdLimitPerQueue, 100),
TaskQueueLimitPerBuildId: dc.GetIntProperty(dynamicconfig.TaskQueuesPerBuildIdLimit, 20),
VersionCompatibleSetLimitPerQueue: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.VersionCompatibleSetLimitPerQueue, 10),
VersionBuildIdLimitPerQueue: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.VersionBuildIdLimitPerQueue, 100),
TaskQueueLimitPerBuildId: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.TaskQueuesPerBuildIdLimit, 20),
GetUserDataLongPollTimeout: dc.GetDurationProperty(dynamicconfig.MatchingGetUserDataLongPollTimeout, 5*time.Minute),

AdminNamespaceToPartitionDispatchRate: dc.GetFloatPropertyFilteredByNamespace(dynamicconfig.AdminMatchingNamespaceToPartitionDispatchRate, 10000),
Expand Down
14 changes: 7 additions & 7 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,10 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility(
req *matchingservice.UpdateWorkerBuildIdCompatibilityRequest,
) (*matchingservice.UpdateWorkerBuildIdCompatibilityResponse, error) {
namespaceID := namespace.ID(req.GetNamespaceId())
ns, err := e.namespaceRegistry.GetNamespaceByID(namespaceID)
if err != nil {
return nil, err
}
taskQueueName := req.GetTaskQueue()
taskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_WORKFLOW)
if err != nil {
Expand All @@ -846,7 +850,7 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility(
switch req.GetOperation().(type) {
case *matchingservice.UpdateWorkerBuildIdCompatibilityRequest_ApplyPublicRequest_:
// Only apply the limit when request is initiated by a user.
updateOptions.TaskQueueLimitPerBuildId = e.config.TaskQueueLimitPerBuildId()
updateOptions.TaskQueueLimitPerBuildId = e.config.TaskQueueLimitPerBuildId(ns.Name().String())
case *matchingservice.UpdateWorkerBuildIdCompatibilityRequest_RemoveBuildIds_:
updateOptions.KnownVersion = req.GetRemoveBuildIds().GetKnownUserDataVersion()
}
Expand All @@ -866,17 +870,13 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility(
updatedClock,
data.GetVersioningData(),
req.GetApplyPublicRequest().GetRequest(),
e.config.VersionCompatibleSetLimitPerQueue(),
e.config.VersionBuildIdLimitPerQueue(),
e.config.VersionCompatibleSetLimitPerQueue(ns.Name().String()),
e.config.VersionBuildIdLimitPerQueue(ns.Name().String()),
)
if err != nil {
return nil, false, err
}
case *matchingservice.UpdateWorkerBuildIdCompatibilityRequest_RemoveBuildIds_:
ns, err := e.namespaceRegistry.GetNamespaceByID(namespaceID)
if err != nil {
return nil, false, err
}
versioningData = RemoveBuildIds(
updatedClock,
data.GetVersioningData(),
Expand Down