diff --git a/common/namespace/registry.go b/common/namespace/registry.go index 1326697131c..57325464b50 100644 --- a/common/namespace/registry.go +++ b/common/namespace/registry.go @@ -129,8 +129,8 @@ type ( // Registry provides access to Namespace objects by name or by ID. Registry interface { common.Daemon - RegisterNamespaceChangeCallback(shard int32, initialNotificationVersion int64, prepareCallback PrepareCallbackFn, callback CallbackFn) - UnregisterNamespaceChangeCallback(shard int32) + RegisterNamespaceChangeCallback(listenerID string, initialNotificationVersion int64, prepareCallback PrepareCallbackFn, callback CallbackFn) + UnregisterNamespaceChangeCallback(listenerID string) GetNamespace(name Name) (*Namespace, error) GetNamespaceByID(id ID) (*Namespace, error) GetNamespaceID(name Name) (ID, error) @@ -162,8 +162,8 @@ type ( // cacheLock.Lock() (the other lock in this struct, above) while holding // this lock or you risk a deadlock. callbackLock sync.Mutex - prepareCallbacks map[int32]PrepareCallbackFn - callbacks map[int32]CallbackFn + prepareCallbacks map[string]PrepareCallbackFn + callbacks map[string]CallbackFn } ) @@ -184,8 +184,8 @@ func NewRegistry( logger: logger, cacheNameToID: cache.New(cacheMaxSize, &cacheOpts), cacheByID: cache.New(cacheMaxSize, &cacheOpts), - prepareCallbacks: make(map[int32]PrepareCallbackFn), - callbacks: make(map[int32]CallbackFn), + prepareCallbacks: make(map[string]PrepareCallbackFn), + callbacks: make(map[string]CallbackFn), } reg.lastRefreshTime.Store(time.Time{}) return reg @@ -244,15 +244,15 @@ func (r *registry) getAllNamespace() map[ID]*Namespace { // callback functions MUST NOT call back into this registry instance, either to // unregister themselves or to look up Namespaces. func (r *registry) RegisterNamespaceChangeCallback( - shard int32, + listenerID string, initialNotificationVersion int64, prepareCallback PrepareCallbackFn, callback CallbackFn, ) { r.callbackLock.Lock() - r.prepareCallbacks[shard] = prepareCallback - r.callbacks[shard] = callback + r.prepareCallbacks[listenerID] = prepareCallback + r.callbacks[listenerID] = callback r.callbackLock.Unlock() // this section is trying to make the shard catch up with namespace changes @@ -281,13 +281,13 @@ func (r *registry) RegisterNamespaceChangeCallback( // UnregisterNamespaceChangeCallback delete a namespace failover callback func (r *registry) UnregisterNamespaceChangeCallback( - shard int32, + listenerID string, ) { r.callbackLock.Lock() defer r.callbackLock.Unlock() - delete(r.prepareCallbacks, shard) - delete(r.callbacks, shard) + delete(r.prepareCallbacks, listenerID) + delete(r.callbacks, listenerID) } // GetNamespace retrieves the information from the cache if it exists, otherwise retrieves the information from metadata diff --git a/common/namespace/registry_mock.go b/common/namespace/registry_mock.go index b3bcb8a7f9f..21f683eab83 100644 --- a/common/namespace/registry_mock.go +++ b/common/namespace/registry_mock.go @@ -253,15 +253,15 @@ func (mr *MockRegistryMockRecorder) Refresh() *gomock.Call { } // RegisterNamespaceChangeCallback mocks base method. -func (m *MockRegistry) RegisterNamespaceChangeCallback(shard int32, initialNotificationVersion int64, prepareCallback PrepareCallbackFn, callback CallbackFn) { +func (m *MockRegistry) RegisterNamespaceChangeCallback(listenerID string, initialNotificationVersion int64, prepareCallback PrepareCallbackFn, callback CallbackFn) { m.ctrl.T.Helper() - m.ctrl.Call(m, "RegisterNamespaceChangeCallback", shard, initialNotificationVersion, prepareCallback, callback) + m.ctrl.Call(m, "RegisterNamespaceChangeCallback", listenerID, initialNotificationVersion, prepareCallback, callback) } // RegisterNamespaceChangeCallback indicates an expected call of RegisterNamespaceChangeCallback. -func (mr *MockRegistryMockRecorder) RegisterNamespaceChangeCallback(shard, initialNotificationVersion, prepareCallback, callback interface{}) *gomock.Call { +func (mr *MockRegistryMockRecorder) RegisterNamespaceChangeCallback(listenerID, initialNotificationVersion, prepareCallback, callback interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterNamespaceChangeCallback", reflect.TypeOf((*MockRegistry)(nil).RegisterNamespaceChangeCallback), shard, initialNotificationVersion, prepareCallback, callback) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterNamespaceChangeCallback", reflect.TypeOf((*MockRegistry)(nil).RegisterNamespaceChangeCallback), listenerID, initialNotificationVersion, prepareCallback, callback) } // Start mocks base method. @@ -289,13 +289,13 @@ func (mr *MockRegistryMockRecorder) Stop() *gomock.Call { } // UnregisterNamespaceChangeCallback mocks base method. -func (m *MockRegistry) UnregisterNamespaceChangeCallback(shard int32) { +func (m *MockRegistry) UnregisterNamespaceChangeCallback(listenerID string) { m.ctrl.T.Helper() - m.ctrl.Call(m, "UnregisterNamespaceChangeCallback", shard) + m.ctrl.Call(m, "UnregisterNamespaceChangeCallback", listenerID) } // UnregisterNamespaceChangeCallback indicates an expected call of UnregisterNamespaceChangeCallback. -func (mr *MockRegistryMockRecorder) UnregisterNamespaceChangeCallback(shard interface{}) *gomock.Call { +func (mr *MockRegistryMockRecorder) UnregisterNamespaceChangeCallback(listenerID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterNamespaceChangeCallback", reflect.TypeOf((*MockRegistry)(nil).UnregisterNamespaceChangeCallback), shard) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterNamespaceChangeCallback", reflect.TypeOf((*MockRegistry)(nil).UnregisterNamespaceChangeCallback), listenerID) } diff --git a/common/namespace/registry_test.go b/common/namespace/registry_test.go index a1b45ac01a6..cf4450feaf2 100644 --- a/common/namespace/registry_test.go +++ b/common/namespace/registry_test.go @@ -287,7 +287,7 @@ func (s *registrySuite) TestRegisterCallback_CatchUp() { // we are not testing catching up, so make this really large currentNamespaceNotificationVersion := int64(0) s.registry.RegisterNamespaceChangeCallback( - 0, + "0", currentNamespaceNotificationVersion, func() { prepareCallbackInvoked = true @@ -428,7 +428,7 @@ func (s *registrySuite) TestUpdateCache_TriggerCallBack() { // we are not testing catching up, so make this really large currentNamespaceNotificationVersion := int64(9999999) s.registry.RegisterNamespaceChangeCallback( - 0, + "0", currentNamespaceNotificationVersion, func() { prepareCallbackInvoked = true diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 7edaf41c31a..a2b9dc72b60 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -92,6 +92,7 @@ type ( historyEngineImpl struct { status int32 currentClusterName string + namespaceChangeListenerID string shard shard.Context timeSource clock.TimeSource workflowTaskHandler workflowTaskHandlerCallbacks @@ -157,6 +158,7 @@ func NewEngineWithShardContext( historyEngImpl := &historyEngineImpl{ status: common.DaemonStatusInitialized, currentClusterName: currentClusterName, + namespaceChangeListenerID: uuid.New(), shard: shard, clusterMetadata: shard.GetClusterMetadata(), timeSource: shard.GetTimeSource(), @@ -286,7 +288,7 @@ func (e *historyEngineImpl) Stop() { e.replicationTaskProcessorsLock.Unlock() // unset the failover callback - e.shard.GetNamespaceRegistry().UnregisterNamespaceChangeCallback(e.shard.GetShardID()) + e.shard.GetNamespaceRegistry().UnregisterNamespaceChangeCallback(e.namespaceChangeListenerID) } func (e *historyEngineImpl) registerNamespaceFailoverCallback() { @@ -323,7 +325,7 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() { // first set the failover callback e.shard.GetNamespaceRegistry().RegisterNamespaceChangeCallback( - e.shard.GetShardID(), + e.namespaceChangeListenerID, 0, /* always want callback so UpdateHandoverNamespaces() can be called after shard reload */ func() { for _, queueProcessor := range e.queueProcessors {