Skip to content

Commit

Permalink
Always persist guessed set id on unknown build id (#4800)
Browse files Browse the repository at this point in the history
**What changed?**
- In PersistUnknownBuildId, if the build id is already present but the
guessed set id is not a set id of that set, add it to the set
(previously it would leave the data unchanged).
- Get rid of the first unknown operation check in
UpdateWorkerBuildIdCompatibility (unknown operations will error out in
the mutation function anyway).

**Why?**
- The goal of PersistUnknownBuildId is to record that we spooled a task
using a guessed set id. In the propagation lag case,
PersistUnknownBuildId would see that a build id is present and not
modify the data. If the build id was not the first one in its set, then
we'd lose that guessed set id.
- We only need to check for unknown operations once, instead of updating
the options switch with each new operation, we can just update the main
switch in the mutation function.

**How did you test it?**
unit tests
  • Loading branch information
dnr authored and rodrigozhou committed Aug 28, 2023
1 parent c13afdc commit bba21ca
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 14 deletions.
2 changes: 0 additions & 2 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,8 +849,6 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility(
updateOptions.TaskQueueLimitPerBuildId = e.config.TaskQueueLimitPerBuildId()
case *matchingservice.UpdateWorkerBuildIdCompatibilityRequest_RemoveBuildIds_:
updateOptions.KnownVersion = req.GetRemoveBuildIds().GetKnownUserDataVersion()
default:
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("invalid operation: %v", req.GetOperation()))
}

err = tqMgr.UpdateUserData(ctx, updateOptions, func(data *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error) {
Expand Down
32 changes: 20 additions & 12 deletions service/matching/version_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,17 @@ func hashBuildId(buildID string) string {
}

func shallowCloneVersioningData(data *persistencespb.VersioningData) *persistencespb.VersioningData {
clone := persistencespb.VersioningData{
VersionSets: make([]*persistencespb.CompatibleVersionSet, len(data.GetVersionSets())),
return &persistencespb.VersioningData{
VersionSets: slices.Clone(data.GetVersionSets()),
}
copy(clone.VersionSets, data.GetVersionSets())
return &clone
}

func shallowCloneVersionSet(set *persistencespb.CompatibleVersionSet) *persistencespb.CompatibleVersionSet {
clone := &persistencespb.CompatibleVersionSet{
SetIds: set.SetIds,
BuildIds: make([]*persistencespb.BuildId, len(set.BuildIds)),
return &persistencespb.CompatibleVersionSet{
SetIds: slices.Clone(set.SetIds),
BuildIds: slices.Clone(set.BuildIds),
BecameDefaultTimestamp: set.BecameDefaultTimestamp,
}
copy(clone.BuildIds, set.BuildIds)
return clone
}

// UpdateVersionSets updates version sets given existing versioning data and an update request. The request is expected
Expand Down Expand Up @@ -487,9 +483,21 @@ func ClearTombstones(versioningData *persistencespb.VersioningData) *persistence
}

func PersistUnknownBuildId(clock hlc.Clock, data *persistencespb.VersioningData, buildId string) *persistencespb.VersioningData {
guessedSetId := hashBuildId(buildId)

if foundSetId, _ := worker_versioning.FindBuildId(data, buildId); foundSetId >= 0 {
// it's already there
return data
// it's already there. make sure its set id is present.
set := data.VersionSets[foundSetId]
if slices.Contains(set.SetIds, guessedSetId) {
return data
}

// if not, add the guessed set id
newSet := shallowCloneVersionSet(set)
newSet.SetIds = append(newSet.SetIds, guessedSetId)
newData := shallowCloneVersioningData(data)
newData.VersionSets[foundSetId] = newSet
return newData
}

// insert unknown build id with zero time so that if merged with any other set, the other
Expand All @@ -498,7 +506,7 @@ func PersistUnknownBuildId(clock hlc.Clock, data *persistencespb.VersioningData,

newData := shallowCloneVersioningData(data)
newData.VersionSets = slices.Insert(newData.VersionSets, 0, &persistencespb.CompatibleVersionSet{
SetIds: []string{hashBuildId(buildId)},
SetIds: []string{guessedSetId},
BuildIds: []*persistencespb.BuildId{{
Id: buildId,
State: persistencespb.STATE_ACTIVE,
Expand Down
31 changes: 31 additions & 0 deletions service/matching/version_sets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,3 +863,34 @@ func TestPersistUnknownBuildId(t *testing.T) {
assert.Equal(t, 1, len(newSet.BuildIds))
assert.Equal(t, "new-build-id", newSet.BuildIds[0].Id)
}

func TestPersistUnknownBuildIdAlreadyThere(t *testing.T) {
t.Parallel()
clock := hlc.Next(hlc.Zero(1), commonclock.NewRealTimeSource())

initial := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{
{
SetIds: []string{hashBuildId("1")},
BuildIds: []*persistencespb.BuildId{mkBuildId("1", clock), mkBuildId("2", clock)},
BecameDefaultTimestamp: &clock,
},
},
}

actual := PersistUnknownBuildId(clock, initial, "1")
assert.Equal(t, initial, actual)

// build id is already there but adds set id
actual = PersistUnknownBuildId(clock, initial, "2")
expected := &persistencespb.VersioningData{
VersionSets: []*persistencespb.CompatibleVersionSet{
{
SetIds: []string{hashBuildId("1"), hashBuildId("2")},
BuildIds: []*persistencespb.BuildId{mkBuildId("1", clock), mkBuildId("2", clock)},
BecameDefaultTimestamp: &clock,
},
},
}
assert.Equal(t, expected, actual)
}

0 comments on commit bba21ca

Please # to comment.