diff --git a/.gitignore b/.gitignore
index f3d762b4fe5..401c21679ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,6 +24,7 @@ test_eventsV2
 test_eventsV2.log
 test_eventsV2_xdc
 test_eventsV2_xdc.log
+matching-simulator-output/
 
 # Executables produced by cadence repo
 /cadence
diff --git a/codecov.yml b/codecov.yml
index be0c328b48b..94b67901630 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -19,7 +19,7 @@ coverage:
       if_ci_failed: ignore # require the CI to pass before setting the status
     patch:
       default:
-        target: 80% # specify the target coverage for each commit status
+        target: 75% # specify the target coverage for each commit status
                     # option: "auto" (compare against parent commit or pull request base)
                     # option: "X%" a static target percentage to hit
         threshold: 0% # allow the coverage drop by x% before marking as failure
diff --git a/common/archiver/provider/noop_provider.go b/common/archiver/provider/noop_provider.go
new file mode 100644
index 00000000000..3b5b5d4f148
--- /dev/null
+++ b/common/archiver/provider/noop_provider.go
@@ -0,0 +1,77 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package provider
+
+import (
+	"context"
+
+	"github.com/uber/cadence/common/archiver"
+)
+
+type noOpArchiverProvider struct{}
+
+func NewNoOpArchiverProvider() ArchiverProvider {
+	return &noOpArchiverProvider{}
+}
+
+func (*noOpArchiverProvider) RegisterBootstrapContainer(
+	serviceName string,
+	historyContainer *archiver.HistoryBootstrapContainer,
+	visibilityContainter *archiver.VisibilityBootstrapContainer,
+) error {
+	return nil
+}
+
+func (*noOpArchiverProvider) GetHistoryArchiver(scheme, serviceName string) (archiver.HistoryArchiver, error) {
+	return &noOpHistoryArchiver{}, nil
+}
+
+func (*noOpArchiverProvider) GetVisibilityArchiver(scheme, serviceName string) (archiver.VisibilityArchiver, error) {
+	return &noOpVisibilityArchiver{}, nil
+}
+
+type noOpHistoryArchiver struct{}
+
+func (*noOpHistoryArchiver) Archive(context.Context, archiver.URI, *archiver.ArchiveHistoryRequest, ...archiver.ArchiveOption) error {
+	return nil
+}
+
+func (*noOpHistoryArchiver) Get(context.Context, archiver.URI, *archiver.GetHistoryRequest) (*archiver.GetHistoryResponse, error) {
+	return &archiver.GetHistoryResponse{}, nil
+}
+
+func (*noOpHistoryArchiver) ValidateURI(archiver.URI) error {
+	return nil
+}
+
+type noOpVisibilityArchiver struct{}
+
+func (*noOpVisibilityArchiver) Archive(context.Context, archiver.URI, *archiver.ArchiveVisibilityRequest, ...archiver.ArchiveOption) error {
+	return nil
+}
+
+func (*noOpVisibilityArchiver) Query(context.Context, archiver.URI, *archiver.QueryVisibilityRequest) (*archiver.QueryVisibilityResponse, error) {
+	return &archiver.QueryVisibilityResponse{}, nil
+}
+
+func (*noOpVisibilityArchiver) ValidateURI(archiver.URI) error {
+	return nil
+}
diff --git a/common/resource/params.go b/common/resource/params.go
index e298f6bfe48..523ad2e480e 100644
--- a/common/resource/params.go
+++ b/common/resource/params.go
@@ -24,6 +24,7 @@ import (
 	"github.com/uber-go/tally"
 	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
 
+	"github.com/uber/cadence/client/history"
 	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/archiver"
 	"github.com/uber/cadence/common/archiver/provider"
@@ -80,5 +81,7 @@
 		PinotClient                pinot.GenericClient
 		AsyncWorkflowQueueProvider queue.Provider
 		TimeSource                 clock.TimeSource
+		// HistoryClientFn is used by integration tests to mock a history client
+		HistoryClientFn func() history.Client
 	}
 )
diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go
index 0b79fe88fbd..ab842b113f7 100644
--- a/common/resource/resourceImpl.go
+++ b/common/resource/resourceImpl.go
@@ -239,7 +239,14 @@ func New(
 		serviceConfig.IsErrorRetryableFunction,
 	)
 
-	historyRawClient := clientBean.GetHistoryClient()
+	var historyRawClient history.Client
+	if params.HistoryClientFn != nil {
+		logger.Debug("Using history client from HistoryClientFn")
+		historyRawClient = params.HistoryClientFn()
+	} else {
+		logger.Debug("Using history client from bean")
+		historyRawClient = clientBean.GetHistoryClient()
+	}
 	historyClient := retryable.NewHistoryClient(
 		historyRawClient,
 		common.CreateHistoryServiceRetryPolicy(),
diff --git a/docker/buildkite/docker-compose-local-matching-simulation.yml b/docker/buildkite/docker-compose-local-matching-simulation.yml
new file mode 100644
index 00000000000..89a6eb8b6a9
--- /dev/null
+++ b/docker/buildkite/docker-compose-local-matching-simulation.yml
@@ -0,0 +1,57 @@
+version: "3.5"
+
+services:
+  cassandra:
+    image: cassandra:4.1.1
+    environment:
+      - "MAX_HEAP_SIZE=256M"
+      - "HEAP_NEWSIZE=128M"
+    expose:
+      - "9042"
+    networks:
+      services-network:
+        aliases:
+          - cassandra
+    healthcheck:
+      test: ["CMD", "cqlsh", "-u cassandra", "-p cassandra" ,"-e describe keyspaces"]
+      interval: 15s
+      timeout: 30s
+      retries: 10
+
+  matching-simulator:
+    build:
+      context: ../../
+      dockerfile: ./docker/buildkite/Dockerfile
+    command:
+      - /bin/sh
+      - -e
+      - -c
+      - >
+        go test -timeout 180s
+        -run ^TestMatchingSimulationSuite$
+        -count 1
+        -v
+        -tags matchingsim
+        github.com/uber/cadence/host
+        | tee test.log
+    environment:
+      - "MATCHING_LOG_EVENTS=true"
+      - "CASSANDRA_HOST=cassandra"
+      - "CASSANDRA=1"
+      - "CASSANDRA_SEEDS=cassandra"
+    depends_on:
+      cassandra:
+        condition: service_healthy
+    volumes:
+      - ../../:/cadence
+      - /cadence/.build/ # ensure we don't mount the build directory
+      - /cadence/.bin/ # ensure we don't mount the bin directory
+    networks:
+      services-network:
+        aliases:
+          - integration-test
+
+networks:
+  services-network:
+    name: services-network
+    driver: bridge
diff --git a/host/client.go b/host/client.go
index 8520a9b9fdf..c2fcdf0ab48 100644
--- a/host/client.go
+++ b/host/client.go
@@ -29,6 +29,7 @@ import (
 	"github.com/uber/cadence/client/admin"
 	"github.com/uber/cadence/client/frontend"
 	"github.com/uber/cadence/client/history"
+	"github.com/uber/cadence/client/matching"
 	"github.com/uber/cadence/client/wrappers/thrift"
 	"github.com/uber/cadence/common/service"
 )
@@ -48,6 +49,10 @@ type HistoryClient interface {
 	history.Client
 }
 
+type MatchingClient interface {
+	matching.Client
+}
+
 // NewAdminClient creates a client to cadence admin client
 func NewAdminClient(d *yarpc.Dispatcher) AdminClient {
 	return thrift.NewAdminClient(adminserviceclient.New(d.ClientConfig(testOutboundName(service.Frontend))))
diff --git a/host/integrationbase.go b/host/integrationbase.go
index f48dc7020d1..ddb6c594b65 100644
--- a/host/integrationbase.go
+++ b/host/integrationbase.go
@@ -23,7 +23,6 @@ package host
 import (
 	"context"
 	"fmt"
-	"io/ioutil"
 	"os"
 	"testing"
 	"time"
@@ -148,7 +147,6 @@ func (s *IntegrationBase) setupLogger() {
 
 // GetTestClusterConfig return test cluster config
 func GetTestClusterConfig(configFile string) (*TestClusterConfig, error) {
-
 	if err := environment.SetupEnv(); err != nil {
 		return nil, err
 	}
@@ -159,7 +157,7 @@
 	}
 	// This is just reading a config so it's less of a security concern
 	// #nosec
-	confContent, err := ioutil.ReadFile(configLocation)
+	confContent, err := os.ReadFile(configLocation)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read test cluster config file %v: %v", configLocation, err)
 	}
@@ -190,7 +188,7 @@
 		fileName = TestFlags.TestClusterConfigFile
 	}
 
-	confContent, err := ioutil.ReadFile(fileName)
+	confContent, err := os.ReadFile(fileName)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read test cluster config file %v: %v", fileName, err)
 	}
diff --git a/host/matching_simulation_test.go b/host/matching_simulation_test.go
new file mode 100644
index 00000000000..ef6dc1104b0
--- /dev/null
+++ b/host/matching_simulation_test.go
@@ -0,0 +1,472 @@
+// Copyright (c) 2018 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+//go:build !race && matchingsim
+// +build !race,matchingsim
+
+/*
+To run locally:
+
+1. Change the matchingconfig in host/testdata/matching_simulation.yaml as you wish
+
+2. Run `./scripts/run_matching_simulator.sh`
+
+Full test logs can be found in the test.log file. Event json logs can be found in matching-simulator-output.json.
+See the run_matching_simulator.sh script for more details about how to parse events.
+*/
+package host
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"reflect"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/golang/mock/gomock"
+	"github.com/pborman/uuid"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+	"go.uber.org/yarpc"
+
+	"github.com/uber/cadence/client/history"
+	"github.com/uber/cadence/common/dynamicconfig"
+	"github.com/uber/cadence/common/persistence"
+	pt "github.com/uber/cadence/common/persistence/persistence-tests"
+	"github.com/uber/cadence/common/types"
+
+	_ "github.com/uber/cadence/common/asyncworkflow/queue/kafka" // needed to load kafka asyncworkflow queue
+)
+
+type operation string
+
+const (
+	operationPollForDecisionTask operation = "PollForDecisionTask"
+)
+
+type operationStats struct {
+	op  operation
+	dur time.Duration
+	err error
+}
+
+type operationAggStats struct {
+	successCnt    int
+	failCnt       int
+	totalDuration time.Duration
+	maxDuration   time.Duration
+	lastUpdated   time.Time
+}
+
+func TestMatchingSimulationSuite(t *testing.T) {
+	flag.Parse()
+
+	confPath := "testdata/matching_simulation.yaml"
+	clusterConfig, err := GetTestClusterConfig(confPath)
+	if err != nil {
+		t.Fatalf("failed creating cluster config from %s, err: %v", confPath, err)
+	}
+
+	clusterConfig.MatchingDynamicConfigOverrides = map[dynamicconfig.Key]interface{}{
+		dynamicconfig.MatchingNumTasklistWritePartitions:    getPartitions(clusterConfig.MatchingConfig.SimulationConfig.TaskListWritePartitions),
+		dynamicconfig.MatchingNumTasklistReadPartitions:     getPartitions(clusterConfig.MatchingConfig.SimulationConfig.TaskListReadPartitions),
+		dynamicconfig.MatchingForwarderMaxOutstandingPolls:  getForwarderMaxOutstandingPolls(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxOutstandingPolls),
+		dynamicconfig.MatchingForwarderMaxOutstandingTasks:  getForwarderMaxOutstandingTasks(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxOutstandingTasks),
+		dynamicconfig.MatchingForwarderMaxRatePerSecond:     getForwarderMaxRPS(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxRatePerSecond),
+		dynamicconfig.MatchingForwarderMaxChildrenPerNode:   getForwarderMaxChildPerNode(clusterConfig.MatchingConfig.SimulationConfig.ForwarderMaxChildrenPerNode),
+	}
+
+	ctrl := gomock.NewController(t)
+	mockHistoryCl := history.NewMockClient(ctrl)
+	mockHistoryCl.EXPECT().RecordDecisionTaskStarted(gomock.Any(), gomock.Any()).
+		DoAndReturn(func(ctx context.Context, req *types.RecordDecisionTaskStartedRequest, opts ...yarpc.CallOption) (*types.RecordDecisionTaskStartedResponse, error) {
+			return &types.RecordDecisionTaskStartedResponse{
+				ScheduledEventID: req.ScheduleID,
+			}, nil
+		}).AnyTimes()
+	clusterConfig.HistoryConfig.MockClient = mockHistoryCl
+
+	testCluster := NewPersistenceTestCluster(t, clusterConfig)
+
+	s := new(MatchingSimulationSuite)
+	params := IntegrationBaseParams{
+		DefaultTestCluster:    testCluster,
+		VisibilityTestCluster: testCluster,
+		TestClusterConfig:     clusterConfig,
+	}
+	s.IntegrationBase = NewIntegrationBase(params)
+	suite.Run(t, s)
+}
+
+func (s *MatchingSimulationSuite) SetupSuite() {
+	s.setupLogger()
+
+	s.Logger.Info("Running integration test against test cluster")
+	clusterMetadata := NewClusterMetadata(s.T(), s.testClusterConfig)
+	dc := persistence.DynamicConfiguration{
+		EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
+		PersistenceSampleLoggingRate:             dynamicconfig.GetIntPropertyFn(100),
+		EnableShardIDMetrics:                     dynamicconfig.GetBoolPropertyFn(true),
+	}
+	params := pt.TestBaseParams{
+		DefaultTestCluster:    s.defaultTestCluster,
+		VisibilityTestCluster: s.visibilityTestCluster,
+		ClusterMetadata:       clusterMetadata,
+		DynamicConfiguration:  dc,
+	}
+	cluster, err := NewCluster(s.T(), s.testClusterConfig, s.Logger, params)
+	s.Require().NoError(err)
+	s.testCluster = cluster
+	s.engine = s.testCluster.GetFrontendClient()
+	s.adminClient = s.testCluster.GetAdminClient()
+
+	s.domainName = s.randomizeStr("integration-test-domain")
+	s.Require().NoError(s.registerDomain(s.domainName, 1, types.ArchivalStatusDisabled, "", types.ArchivalStatusDisabled, ""))
+	s.secondaryDomainName = s.randomizeStr("unused-test-domain")
+	s.Require().NoError(s.registerDomain(s.secondaryDomainName, 1, types.ArchivalStatusDisabled, "", types.ArchivalStatusDisabled, ""))
+
+	time.Sleep(2 * time.Second)
+}
+
+func (s *MatchingSimulationSuite) SetupTest() {
+	s.Assertions = require.New(s.T())
+}
+
+func (s *MatchingSimulationSuite) TearDownSuite() {
+	s.tearDownSuite()
+}
+
+func (s *MatchingSimulationSuite) TestMatchingSimulation() {
+	matchingClient := s.testCluster.GetMatchingClient()
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	domainID := s.domainID(ctx)
+	tasklist := "my-tasklist"
+
+	// Start stat collector
+	statsCh := make(chan *operationStats, 10000)
+	aggStats := make(map[operation]*operationAggStats)
+	var collectorWG sync.WaitGroup
+	collectorWG.Add(1)
+	go s.collectStats(statsCh, aggStats, &collectorWG)
+
+	// Start pollers
+	numPollers := getNumPollers(s.testClusterConfig.MatchingConfig.SimulationConfig.NumPollers)
+	pollDuration := getPollDuration(s.testClusterConfig.MatchingConfig.SimulationConfig.PollTimeout)
+	polledTasksCounter := int32(0)
+	var pollerWG sync.WaitGroup
+	for i := 0; i < numPollers; i++ {
+		pollerWG.Add(1)
+		go s.poll(ctx, matchingClient, domainID, tasklist, &polledTasksCounter, &pollerWG, pollDuration, statsCh)
+	}
+
+	// wait a bit for pollers to start.
+	time.Sleep(300 * time.Millisecond)
+
+	// Start task generators
+	generatedTasksCounter := int32(0)
+	lastTaskScheduleID := int32(0)
+	numGenerators := getNumGenerators(s.testClusterConfig.MatchingConfig.SimulationConfig.NumTaskGenerators)
+	taskGenerateInterval := getTaskGenerateInterval(s.testClusterConfig.MatchingConfig.SimulationConfig.TaskGeneratorTickInterval)
+	maxTasksToGenerate := getMaxTaskstoGenerate(s.testClusterConfig.MatchingConfig.SimulationConfig.MaxTaskToGenerate)
+	var generatorWG sync.WaitGroup
+	for i := 1; i <= numGenerators; i++ {
+		generatorWG.Add(1)
+		go s.generate(ctx, matchingClient, domainID, tasklist, maxTasksToGenerate, taskGenerateInterval, &generatedTasksCounter, &lastTaskScheduleID, &generatorWG)
+	}
+
+	// Let it run for a while
+	sleepDuration := 60 * time.Second
+	s.log("Wait %v for simulation to run", sleepDuration)
+	time.Sleep(sleepDuration)
+	s.log("Canceling context to stop pollers and task generators")
+	cancel()
+	pollerWG.Wait()
+	s.log("Pollers stopped")
+	generatorWG.Wait()
+	s.log("Generators stopped")
+	s.log("Stopping stats collector")
+	close(statsCh)
+	collectorWG.Wait()
+	s.log("Stats collector stopped")
+
+	// Print the test summary.
+	// Don't change the start/end line format as it is used by scripts to parse the summary info
+	testSummary := []string{}
+	testSummary = append(testSummary, "Simulation Summary:")
+	testSummary = append(testSummary, fmt.Sprintf("Simulation Duration: %v", sleepDuration))
+	testSummary = append(testSummary, fmt.Sprintf("Num of Pollers: %d", numPollers))
+	testSummary = append(testSummary, fmt.Sprintf("Poll Timeout: %v", pollDuration))
+	testSummary = append(testSummary, fmt.Sprintf("Num of Task Generators: %d", numGenerators))
+	testSummary = append(testSummary, fmt.Sprintf("Task generated every: %v", taskGenerateInterval))
+	testSummary = append(testSummary, fmt.Sprintf("Num of Write Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistWritePartitions]))
+	testSummary = append(testSummary, fmt.Sprintf("Num of Read Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistReadPartitions]))
+	testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Polls: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingPolls]))
+	testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Tasks: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingTasks]))
+	testSummary = append(testSummary, fmt.Sprintf("Forwarder Max RPS: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxRatePerSecond]))
+	testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Children per Node: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxChildrenPerNode]))
+	testSummary = append(testSummary, fmt.Sprintf("Tasks generated: %d", generatedTasksCounter))
+	testSummary = append(testSummary, fmt.Sprintf("Tasks polled: %d", polledTasksCounter))
+	totalPollCnt := 0
+	if pollStats, ok := aggStats[operationPollForDecisionTask]; ok {
+		totalPollCnt = pollStats.successCnt + pollStats.failCnt
+	}
+
+	if totalPollCnt == 0 {
+		testSummary = append(testSummary, "No poll requests were made")
+	} else {
+		testSummary = append(testSummary, fmt.Sprintf("Total poll requests: %d", totalPollCnt))
%%: %d", 100*aggStats[operationPollForDecisionTask].failCnt/totalPollCnt)) + testSummary = append(testSummary, fmt.Sprintf("Avg Poll latency (ms): %d", (aggStats[operationPollForDecisionTask].totalDuration/time.Duration(totalPollCnt)).Milliseconds())) + testSummary = append(testSummary, fmt.Sprintf("Max Poll latency (ms): %d", aggStats[operationPollForDecisionTask].maxDuration.Milliseconds())) + } + testSummary = append(testSummary, "End of Simulation Summary") + fmt.Println(strings.Join(testSummary, "\n")) +} + +func (s *MatchingSimulationSuite) log(msg string, args ...interface{}) { + msg = time.Now().Format(time.RFC3339Nano) + "\t" + msg + s.T().Logf(msg, args...) +} + +func (s *MatchingSimulationSuite) generate( + ctx context.Context, + matchingClient MatchingClient, + domainID, tasklist string, + maxTasksToGenerate int, + taskGenerateInterval time.Duration, + generatedTasksCounter *int32, + lastTaskScheduleID *int32, + wg *sync.WaitGroup) { + defer wg.Done() + + t := time.NewTicker(taskGenerateInterval) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + s.log("Generator done") + return + case <-t.C: + scheduleID := int(atomic.AddInt32(lastTaskScheduleID, 1)) + if scheduleID > maxTasksToGenerate { + s.log("Generated %d tasks so generator will stop", maxTasksToGenerate) + return + } + decisionTask := newDecisionTask(domainID, tasklist, scheduleID) + reqCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + err := matchingClient.AddDecisionTask(reqCtx, decisionTask) + cancel() + if err != nil { + s.log("Error when adding decision task, err: %v", err) + continue + } + + s.log("Decision task %d added", scheduleID) + atomic.AddInt32(generatedTasksCounter, 1) + } + } +} + +func (s *MatchingSimulationSuite) poll( + ctx context.Context, + matchingClient MatchingClient, + domainID, tasklist string, + polledTasksCounter *int32, + wg *sync.WaitGroup, + pollDuration time.Duration, + statsCh chan *operationStats, +) { + defer wg.Done() + t := time.NewTicker(50 * time.Millisecond) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + s.log("Poller done") + return + case <-t.C: + s.log("Poller will initiate a poll") + reqCtx, cancel := context.WithTimeout(ctx, pollDuration) + start := time.Now() + resp, err := matchingClient.PollForDecisionTask(reqCtx, &types.MatchingPollForDecisionTaskRequest{ + DomainUUID: domainID, + PollRequest: &types.PollForDecisionTaskRequest{ + TaskList: &types.TaskList{ + Name: tasklist, + Kind: types.TaskListKindNormal.Ptr(), + }, + }, + }) + cancel() + + statsCh <- &operationStats{ + op: operationPollForDecisionTask, + dur: time.Since(start), + err: err, + } + + if err != nil { + s.log("PollForDecisionTask failed: %v", err) + continue + } + + empty := &types.MatchingPollForDecisionTaskResponse{} + + if reflect.DeepEqual(empty, resp) { + s.log("PollForDecisionTask response is empty") + continue + } + + atomic.AddInt32(polledTasksCounter, 1) + s.log("PollForDecisionTask got a task with startedid: %d. 
resp: %+v", resp.StartedEventID, resp) + } + } +} + +func (s *MatchingSimulationSuite) collectStats(statsCh chan *operationStats, aggStats map[operation]*operationAggStats, wg *sync.WaitGroup) { + defer wg.Done() + for stat := range statsCh { + opAggStats, ok := aggStats[stat.op] + if !ok { + opAggStats = &operationAggStats{} + aggStats[stat.op] = opAggStats + } + + opAggStats.lastUpdated = time.Now() + if stat.err != nil { + opAggStats.failCnt++ + } else { + opAggStats.successCnt++ + } + + opAggStats.totalDuration += stat.dur + if stat.dur > opAggStats.maxDuration { + opAggStats.maxDuration = stat.dur + } + } + + s.log("Stats collector done") +} + +func (s *MatchingSimulationSuite) domainID(ctx context.Context) string { + reqCtx, cancel := context.WithTimeout(ctx, 250*time.Millisecond) + defer cancel() + domainDesc, err := s.testCluster.GetFrontendClient().DescribeDomain(reqCtx, &types.DescribeDomainRequest{ + Name: &s.domainName, + }) + s.Require().NoError(err, "Error when describing domain") + + domainID := domainDesc.GetDomainInfo().UUID + s.T().Logf("DomainID: %s", domainID) + return domainID +} + +func newDecisionTask(domainID, tasklist string, i int) *types.AddDecisionTaskRequest { + return &types.AddDecisionTaskRequest{ + DomainUUID: domainID, + Execution: &types.WorkflowExecution{ + WorkflowID: "test-workflow-id", + RunID: uuid.New(), + }, + TaskList: &types.TaskList{ + Name: tasklist, + Kind: types.TaskListKindNormal.Ptr(), + }, + ScheduleID: int64(i), + } +} + +func getMaxTaskstoGenerate(i int) int { + if i == 0 { + return 2000 + } + return i +} + +func getTaskGenerateInterval(i time.Duration) time.Duration { + if i == 0 { + return 50 * time.Millisecond + } + return i +} + +func getNumGenerators(i int) int { + if i == 0 { + return 1 + } + return i +} + +func getNumPollers(i int) int { + if i == 0 { + return 10 + } + return i +} + +func getPollDuration(d time.Duration) time.Duration { + if d == 0 { + return 1 * time.Second + } + return d +} + +func getPartitions(i int) int { + if i == 0 { + return 1 + } + return i +} + +func getForwarderMaxOutstandingPolls(i int) int { + if i == 0 { + return 20 + } + return i +} + +func getForwarderMaxOutstandingTasks(i int) int { + if i == 0 { + return 1 + } + return i +} + +func getForwarderMaxRPS(i int) int { + if i == 0 { + return 10 + } + return i +} + +func getForwarderMaxChildPerNode(i int) int { + if i == 0 { + return 20 + } + return i +} diff --git a/host/onebox.go b/host/onebox.go index 09b82a94c37..3988226493b 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -40,6 +40,7 @@ import ( adminClient "github.com/uber/cadence/client/admin" frontendClient "github.com/uber/cadence/client/frontend" historyClient "github.com/uber/cadence/client/history" + matchingClient "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common" carchiver "github.com/uber/cadence/common/archiver" "github.com/uber/cadence/common/archiver/provider" @@ -83,18 +84,20 @@ type Cadence interface { GetFrontendClient() frontendClient.Client FrontendHost() membership.HostInfo GetHistoryClient() historyClient.Client + GetMatchingClient() matchingClient.Client GetExecutionManagerFactory() persistence.ExecutionManagerFactory } type ( cadenceImpl struct { - frontendService common.Daemon - matchingService common.Daemon - historyServices []common.Daemon + frontendService common.Daemon + matchingServices []common.Daemon + historyServices []common.Daemon adminClient adminClient.Client frontendClient frontendClient.Client historyClient historyClient.Client + 
+		matchingClient matchingClient.Client
 		logger         log.Logger
 		clusterMetadata cluster.Metadata
 		persistenceConfig config.Persistence
@@ -112,6 +115,7 @@
 		archiverMetadata carchiver.ArchivalMetadata
 		archiverProvider provider.ArchiverProvider
 		historyConfig    *HistoryConfig
+		matchingConfig   *MatchingConfig
 		esConfig         *config.ElasticSearchConfig
 		esClient         elasticsearch.GenericClient
 		workerConfig     *WorkerConfig
@@ -132,12 +136,59 @@
 
 	// HistoryConfig contains configs for history service
 	HistoryConfig struct {
+		// When MockClient is set, rest of the configs are ignored, history service is not started
+		// and mock history client is passed to other services
+		MockClient HistoryClient
+
 		NumHistoryShards       int
 		NumHistoryHosts        int
 		HistoryCountLimitError int
 		HistoryCountLimitWarn  int
 	}
 
+	MatchingConfig struct {
+		// Number of matching hosts can be at most 4 due to existing static port assignments in onebox.go.
+		// This can be changed easily.
+		NumMatchingHosts int
+		SimulationConfig MatchingSimulationConfig
+	}
+
+	MatchingSimulationConfig struct {
+		// Number of task list write partitions defaults to 1
+		TaskListWritePartitions int
+
+		// Number of task list read partitions defaults to 1
+		TaskListReadPartitions int
+
+		// Number of pollers defaults to 10
+		NumPollers int
+
+		// Number of task generators defaults to 1
+		NumTaskGenerators int
+
+		// Each generator will produce a new task every TaskGeneratorTickInterval. Defaults to 50ms
+		TaskGeneratorTickInterval time.Duration
+
+		// Upper limit of tasks to generate. Task generators will stop if total number of tasks generated reaches MaxTaskToGenerate during simulation
+		// Defaults to 2k
+		MaxTaskToGenerate int
+
+		// Poll request timeout defaults to 1 second
+		PollTimeout time.Duration
+
+		// At most N polls will be forwarded at a time. defaults to 20
+		ForwarderMaxOutstandingPolls int
+
+		// At most N tasks will be forwarded at a time. defaults to 1
+		ForwarderMaxOutstandingTasks int
+
+		// Forwarder rps limit defaults to 10
+		ForwarderMaxRatePerSecond int
+
+		// Children per node. defaults to 20
+		ForwarderMaxChildrenPerNode int
+	}
+
 	// CadenceParams contains everything needed to bootstrap Cadence
 	CadenceParams struct {
 		ClusterMetadata cluster.Metadata
@@ -153,6 +204,7 @@
 		ArchiverProvider provider.ArchiverProvider
 		EnableReadHistoryFromArchival bool
 		HistoryConfig *HistoryConfig
+		MatchingConfig *MatchingConfig
 		ESConfig *config.ElasticSearchConfig
 		ESClient elasticsearch.GenericClient
 		WorkerConfig *WorkerConfig
@@ -189,6 +241,7 @@ func NewCadence(params *CadenceParams) Cadence {
 		archiverMetadata: params.ArchiverMetadata,
 		archiverProvider: params.ArchiverProvider,
 		historyConfig:    params.HistoryConfig,
+		matchingConfig:   params.MatchingConfig,
 		workerConfig:     params.WorkerConfig,
 		mockAdminClient:  params.MockAdminClient,
 		domainReplicationTaskExecutor: params.DomainReplicationTaskExecutor,
@@ -211,7 +264,7 @@ func (c *cadenceImpl) enableWorker() bool {
 func (c *cadenceImpl) Start() error {
 	hosts := make(map[string][]membership.HostInfo)
 	hosts[service.Frontend] = []membership.HostInfo{c.FrontendHost()}
-	hosts[service.Matching] = []membership.HostInfo{c.MatchingServiceHost()}
+	hosts[service.Matching] = c.MatchingHosts()
 	hosts[service.History] = c.HistoryHosts()
 	if c.enableWorker() {
 		hosts[service.Worker] = []membership.HostInfo{c.WorkerServiceHost()}
@@ -224,8 +277,11 @@
 	}
 
 	var startWG sync.WaitGroup
-	startWG.Add(2)
+	startWG.Add(1)
 	go c.startHistory(hosts, &startWG)
+	startWG.Wait()
+
+	startWG.Add(1)
 	go c.startMatching(hosts, &startWG)
 	startWG.Wait()
 
@@ -243,16 +299,20 @@ func (c *cadenceImpl) Stop() {
+	serviceCount := 3
 	if c.enableWorker() {
-		c.shutdownWG.Add(4)
-	} else {
-		c.shutdownWG.Add(3)
+		serviceCount++
 	}
+
+	c.shutdownWG.Add(serviceCount)
+
 	c.frontendService.Stop()
 	for _, historyService := range c.historyServices {
 		historyService.Stop()
 	}
-	c.matchingService.Stop()
+	for _, matchingService := range c.matchingServices {
+		matchingService.Stop()
+	}
+
 	if c.workerConfig.EnableReplicator {
 		c.replicator.Stop()
 	}
@@ -334,7 +394,7 @@ func (c *cadenceImpl) HistoryHosts() []membership.HostInfo {
 	return hosts
 }
 
-func (c *cadenceImpl) HistoryPProfPort() []int {
+func (c *cadenceImpl) HistoryPProfPorts() []int {
 	var ports []int
 	var startPort int
 	switch c.clusterNo {
@@ -358,7 +418,8 @@
 	return ports
 }
 
-func (c *cadenceImpl) MatchingServiceHost() membership.HostInfo {
+func (c *cadenceImpl) MatchingHosts() []membership.HostInfo {
+	var hosts []membership.HostInfo
 	var tchan uint16
 	switch c.clusterNo {
 	case 0:
@@ -373,23 +434,39 @@
 		tchan = 7106
 	}
 
-	return newHost(tchan)
+	for i := 0; i < c.matchingConfig.NumMatchingHosts; i++ {
+		port := tchan + uint16(i)
+		hosts = append(hosts, newHost(uint16(port)))
+	}
+	c.logger.Info("Matching hosts", tag.Value(hosts))
+
+	return hosts
 }
 
-func (c *cadenceImpl) MatchingPProfPort() int {
+func (c *cadenceImpl) MatchingPProfPorts() []int {
+	var ports []int
+	var startPort int
 	switch c.clusterNo {
 	case 0:
-		return 7107
+		startPort = 7206
 	case 1:
-		return 8107
+		startPort = 8206
 	case 2:
-		return 9107
+		startPort = 9206
 	case 3:
-		return 10107
+		startPort = 10206
 	default:
-		return 7107
+		startPort = 7206
+	}
+
+	for i := 0; i < c.matchingConfig.NumMatchingHosts; i++ {
+		port := startPort + i
+		ports = append(ports, port)
 	}
+
+	c.logger.Info("Matching pprof ports", tag.Value(ports))
+	return ports
 }
 
 func (c *cadenceImpl) WorkerServiceHost() membership.HostInfo {
@@ -436,6 +513,10 @@ func (c *cadenceImpl) GetHistoryClient() historyClient.Client {
 	return c.historyClient
 }
 
+func (c *cadenceImpl) GetMatchingClient() matchingClient.Client {
+	return c.matchingClient
+}
+
 func (c *cadenceImpl) startFrontend(hosts map[string][]membership.HostInfo, startWG *sync.WaitGroup) {
 	params := new(resource.Params)
 	params.ClusterRedirectionPolicy = &config.ClusterRedirectionPolicy{}
@@ -509,17 +590,17 @@
 	c.adminClient = NewAdminClient(frontendService.GetDispatcher())
 	go frontendService.Start()
 
+	c.logger.Info("Started frontend service")
 	startWG.Done()
+
 	<-c.shutdownCh
 	c.shutdownWG.Done()
 }
 
-func (c *cadenceImpl) startHistory(
-	hosts map[string][]membership.HostInfo,
-	startWG *sync.WaitGroup,
-) {
-	pprofPorts := c.HistoryPProfPort()
-	for i, hostport := range c.HistoryHosts() {
+func (c *cadenceImpl) startHistory(hosts map[string][]membership.HostInfo, startWG *sync.WaitGroup) {
+	pprofPorts := c.HistoryPProfPorts()
+	historyHosts := c.HistoryHosts()
+	for i, hostport := range historyHosts {
 		params := new(resource.Params)
 		params.Name = service.History
 		params.Logger = c.logger
@@ -588,49 +669,69 @@
 	}
 
 	startWG.Done()
+	c.logger.Info(fmt.Sprintf("Started %d history services", len(c.historyServices)))
+
 	<-c.shutdownCh
 	c.shutdownWG.Done()
 }
 
 func (c *cadenceImpl) startMatching(hosts map[string][]membership.HostInfo, startWG *sync.WaitGroup) {
+	pprofPorts := c.MatchingPProfPorts()
+	for i, hostport := range c.MatchingHosts() {
+		hostport.Identity()
+		matchingHost := fmt.Sprintf("matching-host-%d:%s", i, hostport.Identity())
+		params := new(resource.Params)
+		params.Name = service.Matching
+		params.Logger = c.logger.WithTags(tag.Dynamic("matching-host", matchingHost))
+		params.ThrottledLogger = c.logger
+		params.TimeSource = c.timeSource
+		params.PProfInitializer = newPProfInitializerImpl(c.logger, pprofPorts[i])
+		params.RPCFactory = c.newRPCFactory(service.Matching, hostport)
+		params.MetricScope = tally.NewTestScope(service.Matching, map[string]string{"matching-host": matchingHost})
+		params.MembershipResolver = newMembershipResolver(params.Name, hosts)
+		params.ClusterMetadata = c.clusterMetadata
+		params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
+		params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient(), c.matchingDynCfgOverrides)
+		params.ArchivalMetadata = c.archiverMetadata
+		params.ArchiverProvider = c.archiverProvider
 
-	params := new(resource.Params)
-	params.Name = service.Matching
-	params.Logger = c.logger
-	params.ThrottledLogger = c.logger
-	params.TimeSource = c.timeSource
-	params.PProfInitializer = newPProfInitializerImpl(c.logger, c.MatchingPProfPort())
-	params.RPCFactory = c.newRPCFactory(service.Matching, c.MatchingServiceHost())
-	params.MetricScope = tally.NewTestScope(service.Matching, make(map[string]string))
-	params.MembershipResolver = newMembershipResolver(params.Name, hosts)
-	params.ClusterMetadata = c.clusterMetadata
-	params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
-	params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient(), c.matchingDynCfgOverrides)
-	params.ArchivalMetadata = c.archiverMetadata
-	params.ArchiverProvider = c.archiverProvider
+		var err error
+		params.PersistenceConfig, err = copyPersistenceConfig(c.persistenceConfig)
+		if err != nil {
+			c.logger.Fatal("Failed to copy persistence config for matching", tag.Error(err))
+		}
 
-	var err error
-	params.PersistenceConfig, err = copyPersistenceConfig(c.persistenceConfig)
-	if err != nil {
-		c.logger.Fatal("Failed to copy persistence config for matching", tag.Error(err))
-	}
+		if c.historyConfig.MockClient != nil {
+			params.HistoryClientFn = func() historyClient.Client {
+				return c.historyConfig.MockClient
+			}
+		}
+
+		matchingService, err := matching.NewService(params)
+		if err != nil {
+			params.Logger.Fatal("unable to start matching service", tag.Error(err))
+		}
 
-	matchingService, err := matching.NewService(params)
-	if err != nil {
-		params.Logger.Fatal("unable to start matching service", tag.Error(err))
-	}
-	if c.mockAdminClient != nil {
 		clientBean := matchingService.GetClientBean()
-		if clientBean != nil {
+		if c.mockAdminClient != nil {
 			for serviceName, client := range c.mockAdminClient {
 				clientBean.SetRemoteAdminClient(serviceName, client)
 			}
 		}
+
+		// When there are multiple matching hosts, the last client will overwrite previous ones.
+		// It should be fine because the underlying client bean logic should still pick the right destination.
+		c.matchingClient, err = clientBean.GetMatchingClient(matchingService.GetDomainCache().GetDomainName)
+		if err != nil {
+			params.Logger.Fatal("unable to get matching client", tag.Error(err))
+		}
+		c.matchingServices = append(c.matchingServices, matchingService)
+		go matchingService.Start()
 	}
 
-	c.matchingService = matchingService
-	go c.matchingService.Start()
 	startWG.Done()
+	c.logger.Info(fmt.Sprintf("Started %d matching services", len(c.matchingServices)))
+
 	<-c.shutdownCh
 	c.shutdownWG.Done()
 }
@@ -717,7 +818,9 @@
 		defer cm.Stop()
 	}
 
+	c.logger.Info("Started worker service")
 	startWG.Done()
+
 	<-c.shutdownCh
 	if c.workerConfig.EnableReplicator {
 		replicatorDomainCache.Stop()
@@ -725,7 +828,6 @@
 	if c.workerConfig.EnableArchiver {
 		clientWorkerDomainCache.Stop()
 	}
-
 }
 
 func (c *cadenceImpl) startWorkerReplicator(svc Service) {
diff --git a/host/test_suites.go b/host/test_suites.go
index 055a85b4f01..c3911427f04 100644
--- a/host/test_suites.go
+++ b/host/test_suites.go
@@ -61,6 +61,11 @@ type (
 		*IntegrationBase
 	}
 
+	MatchingSimulationSuite struct {
+		*require.Assertions
+		*IntegrationBase
+	}
+
 	WorkflowIDRateLimitIntegrationSuite struct {
 		*require.Assertions
 		*IntegrationBase
diff --git a/host/testcluster.go b/host/testcluster.go
index 6acf89034aa..1594a0bdf63 100644
--- a/host/testcluster.go
+++ b/host/testcluster.go
@@ -89,6 +89,7 @@
 		MessagingClientConfig *MessagingClientConfig
 		Persistence           persistencetests.TestBaseOptions
 		HistoryConfig         *HistoryConfig
+		MatchingConfig        *MatchingConfig
 		ESConfig              *config.ElasticSearchConfig
 		WorkerConfig          *WorkerConfig
 		MockAdminClient       map[string]adminClient.Client
@@ -168,6 +169,7 @@ func NewCluster(t *testing.T, options *TestClusterConfig, logger log.Logger, par
 		ArchiverMetadata: archiverBase.metadata,
 		ArchiverProvider: archiverBase.provider,
 		HistoryConfig:    options.HistoryConfig,
+		MatchingConfig:   options.MatchingConfig,
 		WorkerConfig:     options.WorkerConfig,
 		MockAdminClient:  options.MockAdminClient,
 		DomainReplicationTaskExecutor: domain.NewReplicationTaskExecutor(testBase.DomainManager, clock.NewRealTimeSource(), logger),
@@ -238,6 +240,7 @@ func NewPinotTestCluster(t *testing.T, options *TestClusterConfig, logger log.Lo
 		ArchiverMetadata: archiverBase.metadata,
 		ArchiverProvider: archiverBase.provider,
 		HistoryConfig:    options.HistoryConfig,
+		MatchingConfig:   options.MatchingConfig,
 		WorkerConfig:     options.WorkerConfig,
 		MockAdminClient:  options.MockAdminClient,
 		DomainReplicationTaskExecutor: domain.NewReplicationTaskExecutor(testBase.DomainManager, clock.NewRealTimeSource(), logger),
@@ -347,7 +350,7 @@ func newArchiverBase(enabled bool, logger log.Logger) *ArchiverBase {
 	if !enabled {
 		return &ArchiverBase{
 			metadata: archiver.NewArchivalMetadata(dcCollection, "", false, "", false, &config.ArchivalDomainDefaults{}),
-			provider: provider.NewArchiverProvider(nil, nil),
+			provider: provider.NewNoOpArchiverProvider(),
 		}
 	}
 
@@ -423,6 +426,11 @@ func (tc *TestCluster) GetHistoryClient() HistoryClient {
 	return tc.host.GetHistoryClient()
 }
 
+// GetMatchingClient returns a matching client from the test cluster
+func (tc *TestCluster) GetMatchingClient() MatchingClient {
+	return tc.host.GetMatchingClient()
+}
+
 // GetExecutionManagerFactory returns an execution manager factory from the test cluster
 func (tc *TestCluster) GetExecutionManagerFactory() persistence.ExecutionManagerFactory {
 	return tc.host.GetExecutionManagerFactory()
diff --git a/host/testdata/clientintegrationtestcluster.yaml b/host/testdata/clientintegrationtestcluster.yaml
index 404c6bd9533..d0ebe1f06c0 100644
--- a/host/testdata/clientintegrationtestcluster.yaml
+++ b/host/testdata/clientintegrationtestcluster.yaml
@@ -5,6 +5,8 @@ messagingclientconfig:
 historyconfig:
   numhistoryshards: 4
   numhistoryhosts: 1
+matchingconfig:
+  nummatchinghosts: 1
 workerconfig:
   enablearchiver: false
   enablereplicator: false
diff --git a/host/testdata/integration_async_wf_with_kafka_cluster.yaml b/host/testdata/integration_async_wf_with_kafka_cluster.yaml
index 3b1db5da96f..c266d42ab41 100644
--- a/host/testdata/integration_async_wf_with_kafka_cluster.yaml
+++ b/host/testdata/integration_async_wf_with_kafka_cluster.yaml
@@ -16,5 +16,7 @@ messagingclientconfig:
 historyconfig:
   numhistoryshards: 4
   numhistoryhosts: 1
+matchingconfig:
+  nummatchinghosts: 1
 workerconfig:
   enableasyncwfconsumer: true
diff --git a/host/testdata/integration_elasticsearch_os2_cluster.yaml b/host/testdata/integration_elasticsearch_os2_cluster.yaml
index bde26abc241..14622979aea 100644
--- a/host/testdata/integration_elasticsearch_os2_cluster.yaml
+++ b/host/testdata/integration_elasticsearch_os2_cluster.yaml
@@ -19,6 +19,8 @@ messagingclientconfig:
 historyconfig:
   numhistoryshards: 4
   numhistoryhosts: 1
+matchingconfig:
+  nummatchinghosts: 1
 workerconfig:
   enablearchiver: false
   enablereplicator: false
diff --git a/host/testdata/integration_elasticsearch_v6_cluster.yaml b/host/testdata/integration_elasticsearch_v6_cluster.yaml
index 9e10ce38b29..455eda0f7a5 100644
--- a/host/testdata/integration_elasticsearch_v6_cluster.yaml
+++ b/host/testdata/integration_elasticsearch_v6_cluster.yaml
@@ -19,6 +19,8 @@ messagingclientconfig:
 historyconfig:
   numhistoryshards: 4
   numhistoryhosts: 1
+matchingconfig:
+  nummatchinghosts: 1
 workerconfig:
   enablearchiver: false
   enablereplicator: false
diff --git a/host/testdata/integration_elasticsearch_v7_cluster.yaml b/host/testdata/integration_elasticsearch_v7_cluster.yaml
index 9182b99697d..a5e67ab120f 100644
--- a/host/testdata/integration_elasticsearch_v7_cluster.yaml
+++ b/host/testdata/integration_elasticsearch_v7_cluster.yaml
@@ -19,6 +19,8 @@ messagingclientconfig:
 historyconfig:
   numhistoryshards: 4
   numhistoryhosts: 1
+matchingconfig:
+  nummatchinghosts: 1
 workerconfig:
   enablearchiver: false
   enablereplicator: false
diff --git a/host/testdata/integration_pinot_cluster.yaml b/host/testdata/integration_pinot_cluster.yaml
index 4b1df849ab6..f0a6d185492 100644
--- a/host/testdata/integration_pinot_cluster.yaml
+++ b/host/testdata/integration_pinot_cluster.yaml
@@ -26,6 +26,8 @@ messagingclientconfig:
 historyconfig:
   numhistoryshards: 4
   numhistoryhosts: 1
+matchingconfig:
+  nummatchinghosts: 1
 workerconfig:
   enablearchiver: false
   enablereplicator: false
diff --git a/host/testdata/integration_sizelimit_cluster.yaml b/host/testdata/integration_sizelimit_cluster.yaml
index 2ba6b765162..b63215b11f0 100644
--- a/host/testdata/integration_sizelimit_cluster.yaml
+++ b/host/testdata/integration_sizelimit_cluster.yaml
@@ -7,6 +7,8 @@ historyconfig:
   numhistoryhosts: 1
   historycountlimiterror: 20
   historycountlimitwarn: 10
+matchingconfig:
+  nummatchinghosts: 1
 workerconfig:
   enablearchiver: false
   enablereplicator: false
diff --git a/host/testdata/integration_test_cluster.yaml b/host/testdata/integration_test_cluster.yaml
index d42b43e354f..a8ebfee513b 100644
--- a/host/testdata/integration_test_cluster.yaml
+++ b/host/testdata/integration_test_cluster.yaml
@@ -5,7 +5,9 @@ messagingclientconfig:
 historyconfig:
   numhistoryshards: 4
   numhistoryhosts: 1
+matchingconfig:
+  nummatchinghosts: 1
 workerconfig:
   enablearchiver: true
   enablereplicator: true
-  enableindexer: false
\ No newline at end of file
+  enableindexer: false
diff --git a/host/testdata/integration_wfidratelimit_cluster.yaml b/host/testdata/integration_wfidratelimit_cluster.yaml
index 6ea9ec56fec..5f983983a8f 100644
--- a/host/testdata/integration_wfidratelimit_cluster.yaml
+++ b/host/testdata/integration_wfidratelimit_cluster.yaml
@@ -5,6 +5,8 @@ messagingclientconfig:
 historyconfig:
   numhistoryshards: 4
   numhistoryhosts: 1
+matchingconfig:
+  nummatchinghosts: 1
 workerconfig:
   enablearchiver: false
   enablereplicator: false
diff --git a/host/testdata/matching_simulation.yaml b/host/testdata/matching_simulation.yaml
new file mode 100644
index 00000000000..4a0da782f2c
--- /dev/null
+++ b/host/testdata/matching_simulation.yaml
@@ -0,0 +1,23 @@
+enablearchival: false
+clusterno: 1
+messagingclientconfig:
+  usemock: true
+historyconfig:
+  numhistoryshards: 4
+  numhistoryhosts: 1
+matchingconfig:
+  nummatchinghosts: 4
+  simulationconfig:
+    tasklistwritepartitions: 2
+    tasklistreadpartitions: 2
+    numpollers: 10
+    numtaskgenerators: 2
+    taskgeneratortickinterval: 50ms
+    maxtasktogenerate: 1500
+    polltimeout: 5s
+    forwardermaxoutstandingpolls: 20
+    forwardermaxoutstandingtasks: 1
+    forwardermaxratepersecond: 10
+    forwardermaxchildrenpernode: 20
+workerconfig:
+  enableasyncwfconsumer: false
diff --git a/host/testdata/ndc_integration_test_clusters.yaml b/host/testdata/ndc_integration_test_clusters.yaml
index 423ee6fb8c5..19bef3e5ce8 100644
--- a/host/testdata/ndc_integration_test_clusters.yaml
+++ b/host/testdata/ndc_integration_test_clusters.yaml
@@ -32,6 +32,8 @@
   historyconfig:
     numhistoryshards: 1
     numhistoryhosts: 1
+  matchingconfig:
+    nummatchinghosts: 1
   messagingclientconfig:
     usemock: false
     kafkaconfig:
@@ -97,6 +99,8 @@
   historyconfig:
     numhistoryshards: 1
     numhistoryhosts: 1
+  matchingconfig:
+    nummatchinghosts: 1
   messagingclientconfig:
     usemock: false
     kafkaconfig:
@@ -162,6 +166,8 @@
   historyconfig:
     numhistoryshards: 1
     numhistoryhosts: 1
+  matchingconfig:
+    nummatchinghosts: 1
   messagingclientconfig:
     usemock: false
     kafkaconfig:
diff --git a/host/testdata/xdc_integration_es_clusters.yaml b/host/testdata/xdc_integration_es_clusters.yaml
index 28afa8d448b..f1ea0b53b5c 100644
--- a/host/testdata/xdc_integration_es_clusters.yaml
+++ b/host/testdata/xdc_integration_es_clusters.yaml
@@ -26,6 +26,8 @@
   historyconfig:
     numhistoryshards: 1
     numhistoryhosts: 1
+  matchingconfig:
+    nummatchinghosts: 1
   messagingclientconfig:
     usemock: false
     kafkaconfig:
@@ -92,6 +94,8 @@
   historyconfig:
     numhistoryshards: 1
     numhistoryhosts: 1
+  matchingconfig:
+    nummatchinghosts: 1
   messagingclientconfig:
     usemock: false
     kafkaconfig:
diff --git a/host/testdata/xdc_integration_test_clusters.yaml b/host/testdata/xdc_integration_test_clusters.yaml
index f3e88ce3250..aaea4597106 100644
--- a/host/testdata/xdc_integration_test_clusters.yaml
+++ b/host/testdata/xdc_integration_test_clusters.yaml
@@ -26,6 +26,8 @@
   historyconfig:
     numhistoryshards: 1
     numhistoryhosts: 1
+  matchingconfig:
+    nummatchinghosts: 1
   messagingclientconfig:
     usemock: false
     kafkaconfig:
@@ -78,6 +80,8 @@
   historyconfig:
     numhistoryshards: 1
     numhistoryhosts: 1
+  matchingconfig:
+    nummatchinghosts: 1
   messagingclientconfig:
     usemock: false
     kafkaconfig:
diff --git a/scripts/run_matching_simulator.sh b/scripts/run_matching_simulator.sh
new file mode 100755
index 00000000000..a3af1ee2f19
--- /dev/null
+++ b/scripts/run_matching_simulator.sh
@@ -0,0 +1,179 @@
+#!/bin/bash
+
+# This script can be used to run the matching simulator and check the critical flow via logs
+#
+
+set -eo pipefail
+
+testName="test-$(date '+%Y-%m-%d-%H-%M-%S')"
+resultFolder="matching-simulator-output"
+mkdir -p "$resultFolder"
+eventLogsFile="$resultFolder/events.json"
+testSummaryFile="$resultFolder/$testName-summary.txt"
+
+
+echo "Building test image"
+docker-compose -f docker/buildkite/docker-compose-local-matching-simulation.yml \
+  build matching-simulator
+
+echo "Running the test"
+docker-compose \
+  -f docker/buildkite/docker-compose-local-matching-simulation.yml \
+  run --rm matching-simulator \
+  | grep -a --line-buffered "Matching New Event" \
+  | sed "s/Matching New Event: //" \
> "$eventLogsFile" + +if cat test.log | grep -a "FAIL: TestMatchingSimulationSuite"; then + echo "Test failed" + exit 1 +fi + +echo "---- Simulation Summary ----" +cat test.log \ + | sed -n '/Simulation Summary/,/End of Simulation Summary/p' \ + | grep -v "Simulation Summary" \ + | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "PollForDecisionTask returning task")' \ + | jq .Payload.Latency | awk '{s+=$0}END{print s/NR}') +echo "Avg Task latency (ms): $tmp" | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "PollForDecisionTask returning task")' \ + | jq .Payload.Latency | sort -n | awk '{a[NR]=$0}END{print a[int(NR*0.75)]}') +echo "P75 Task latency (ms): $tmp" | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "PollForDecisionTask returning task")' \ + | jq .Payload.Latency | sort -n | awk '{a[NR]=$0}END{print a[int(NR*0.95)]}') +echo "P95 Task latency (ms): $tmp" | tee -a $testSummaryFile + + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "PollForDecisionTask returning task")' \ + | jq .Payload.Latency | sort -n | awk '{a[NR]=$0}END{print a[int(NR*0.99)]}') +echo "P99 Task latency (ms): $tmp" | tee -a $testSummaryFile + + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "PollForDecisionTask returning task")' \ + | jq .Payload.Latency | sort -n | tail -n 1) +echo "Max Task latency (ms): $tmp" | tee -a $testSummaryFile + + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "PollForDecisionTask returning task")' \ + | jq '{ScheduleID,TaskListName,EventName,Payload}' \ + | jq -c '.' | wc -l) +echo "Worker Polls that returned a task: $tmp" | tee -a $testSummaryFile + + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "PollForDecisionTask returning task" and .Payload.TaskIsForwarded == true)' \ + | jq '{ScheduleID,TaskListName,EventName,Payload}' \ + | jq -c '.' | wc -l) +echo "Worker Polls that returned a forwarded task: $tmp" | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "PollForDecisionTask returned no tasks")' \ + | jq '{ScheduleID,TaskListName,EventName,Payload}' \ + | jq -c '.' | wc -l) +echo "Worker Polls that returned NO task: $tmp" | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "Matcher Falling Back to Non-Local Polling")' \ + | jq '{ScheduleID,TaskListName,EventName,Payload}' \ + | jq -c '.' | wc -l) +echo "Worker Polls that falled back to non-local polling: $tmp" | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "Attempting to Forward Poll")' \ + | jq '{ScheduleID,TaskListName,EventName,Payload}' \ + | jq -c '.' | wc -l) +echo "Poll forward attempts: $tmp" | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "Forwarded Poll returned task")' \ + | jq '{ScheduleID,TaskListName,EventName,Payload}' \ + | jq -c '.' | wc -l) +echo "Forwarded poll returned task: $tmp" | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "Task Written to DB")' \ + | jq '{ScheduleID,TaskListName,EventName,Payload}' \ + | jq -c '.' | wc -l) +echo "Tasks Written to DB: $tmp" | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "Attempting to Forward Task")' \ + | jq '{ScheduleID,TaskListName,EventName,Payload}' \ + | jq -c '.' 
| wc -l) +echo "Task forward attempts: $tmp" | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName | contains("Matched Task"))' \ + | jq -c 'select((.Payload.SyncMatched == true) and (.Payload.TaskIsForwarded == true))' \ + | jq '{ScheduleID,TaskListName}' \ + | jq -c '.' | wc -l) +echo "Sync matches - task is forwarded: $tmp" | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName | contains("Matched Task"))' \ + | jq -c 'select((.Payload.SyncMatched == true) and (.Payload.TaskIsForwarded == false))' \ + | jq '{ScheduleID,TaskListName}' \ + | jq -c '.' | wc -l) +echo "Sync matches - task is not forwarded: $tmp" | tee -a $testSummaryFile + + +echo "Per tasklist sync matches:" | tee -a $testSummaryFile +cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "SyncMatched so not persisted")' \ + | jq '.TaskListName' \ + | jq -c '.' | sort -n | uniq -c | sed -e 's/^/ /' | tee -a $testSummaryFile + + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "Could not SyncMatched Forwarded Task so not persisted")' \ + | jq '{ScheduleID,TaskListName}' \ + | jq -c '.' | wc -l) +echo "Forwarded Task failed to sync match: $tmp" | tee -a $testSummaryFile + +tmp=$(cat "$eventLogsFile" \ + | jq -c 'select(.EventName | contains("Matched Task"))' \ + | jq -c 'select(.Payload.SyncMatched != true)' \ + | jq '{ScheduleID,TaskListName,Payload}' \ + | jq -c '.' | wc -l) +echo "Async matches: $tmp" | tee -a $testSummaryFile + + +echo "AddDecisionTask request per tasklist (excluding forwarded):" | tee -a $testSummaryFile +cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "Received AddDecisionTask" and .Payload.RequestForwardedFrom == "")' \ + | jq '.TaskListName' \ + | jq -c '.' | sort -n | uniq -c | sed -e 's/^/ /' | tee -a $testSummaryFile + +echo "AddDecisionTask request per tasklist (forwarded):" | tee -a $testSummaryFile +cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "Received AddDecisionTask" and .Payload.RequestForwardedFrom != "")' \ + | jq '.TaskListName' \ + | jq -c '.' | sort -n | uniq -c | sed -e 's/^/ /' | tee -a $testSummaryFile + + +echo "PollForDecisionTask request per tasklist (excluding forwarded):" | tee -a $testSummaryFile +cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "Received PollForDecisionTask" and .Payload.RequestForwardedFrom == "")' \ + | jq '.TaskListName' \ + | jq -c '.' | sort -n | uniq -c | sed -e 's/^/ /' | tee -a $testSummaryFile + + +echo "PollForDecisionTask request per tasklist (forwarded):" | tee -a $testSummaryFile +cat "$eventLogsFile" \ + | jq -c 'select(.EventName == "Received PollForDecisionTask" and .Payload.RequestForwardedFrom != "")' \ + | jq '.TaskListName' \ + | jq -c '.' | sort -n | uniq -c | sed -e 's/^/ /' | tee -a $testSummaryFile + + +printf "\nResults are saved in $testSummaryFile\n" +printf "For further analysis, please check $eventLogsFile via jq queries\n" diff --git a/service/matching/event/logger.go b/service/matching/event/logger.go new file mode 100644 index 00000000000..8f2290b1eb5 --- /dev/null +++ b/service/matching/event/logger.go @@ -0,0 +1,66 @@ +// Copyright (c) 2017 Uber Technologies, Inc. 
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package event
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/uber/cadence/common/persistence"
+	"github.com/uber/cadence/common/types"
+)
+
+var enabled = false
+
+func init() {
+	enabled = os.Getenv("MATCHING_LOG_EVENTS") == "true"
+}
+
+type E struct {
+	persistence.TaskInfo
+	TaskListName string
+	TaskListKind *types.TaskListKind
+	TaskListType int // persistence.TaskListTypeDecision or persistence.TaskListTypeActivity
+
+	EventTime time.Time
+
+	// EventName describes the event. It is used to query events in simulations so don't change existing event names.
+	EventName string
+	Host      string
+	Payload   map[string]any
+}
+
+func Log(events ...E) {
+	if !enabled {
+		return
+	}
+	for _, e := range events {
+		e.EventTime = time.Now()
+		data, err := json.Marshal(e)
+		if err != nil {
+			fmt.Printf("failed to marshal event: %v", err)
+		}
+
+		fmt.Printf("Matching New Event: %s\n", data)
+	}
+}
diff --git a/service/matching/handler/engine.go b/service/matching/handler/engine.go
index 552fca58e8b..81788977f03 100644
--- a/service/matching/handler/engine.go
+++ b/service/matching/handler/engine.go
@@ -50,6 +50,7 @@ import (
 	"github.com/uber/cadence/common/service"
 	"github.com/uber/cadence/common/types"
 	"github.com/uber/cadence/service/matching/config"
+	"github.com/uber/cadence/service/matching/event"
 	"github.com/uber/cadence/service/matching/tasklist"
 )
 
@@ -246,6 +247,16 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t
 		return nil, err
 	}
 	logger.Info("Task list manager state changed", tag.LifeCycleStarted)
+	event.Log(event.E{
+		TaskListName: taskList.GetName(),
+		TaskListKind: taskListKind,
+		TaskListType: taskList.GetType(),
+		TaskInfo: persistence.TaskInfo{
+			DomainID: taskList.GetDomainID(),
+		},
+		EventName: "TaskListManager Started",
+		Host:      e.config.HostName,
+	})
 	return mgr, nil
 }
 
@@ -299,6 +310,20 @@ func (e *matchingEngineImpl) AddDecisionTask(
 	taskListKind := request.GetTaskList().Kind
 	taskListType := persistence.TaskListTypeDecision
 
+	event.Log(event.E{
+		TaskListName: taskListName,
+		TaskListKind: taskListKind,
+		TaskListType: taskListType,
+		TaskInfo: persistence.TaskInfo{
+			DomainID:   domainID,
+			ScheduleID: request.GetScheduleID(),
+		},
+		EventName: "Received AddDecisionTask",
+		Host:      e.config.HostName,
+		Payload: map[string]any{
+			"RequestForwardedFrom": request.GetForwardedFrom(),
+		},
+	})
 	e.emitInfoOrDebugLog(
 		domainID,
 		"Received AddDecisionTask",
-441,6 +466,19 @@ func (e *matchingEngineImpl) PollForDecisionTask( tag.WorkflowTaskListName(taskListName), tag.WorkflowDomainID(domainID), ) + event.Log(event.E{ + TaskListName: taskListName, + TaskListKind: taskListKind, + TaskListType: persistence.TaskListTypeDecision, + TaskInfo: persistence.TaskInfo{ + DomainID: domainID, + }, + EventName: "Received PollForDecisionTask", + Host: e.config.HostName, + Payload: map[string]any{ + "RequestForwardedFrom": req.GetForwardedFrom(), + }, + }) pollLoop: for { if err := common.IsValidContext(hCtx.Context); err != nil { @@ -466,12 +504,33 @@ pollLoop: tag.WorkflowDomainID(domainID), tag.Error(err), ) + event.Log(event.E{ + TaskListName: taskListName, + TaskListKind: taskListKind, + TaskListType: persistence.TaskListTypeDecision, + TaskInfo: persistence.TaskInfo{ + DomainID: domainID, + }, + EventName: "PollForDecisionTask returned no tasks", + Host: e.config.HostName, + Payload: map[string]any{ + "RequestForwardedFrom": req.GetForwardedFrom(), + }, + }) return emptyPollForDecisionTaskResponse, nil } return nil, fmt.Errorf("couldn't get task: %w", err) } if task.IsStarted() { + event.Log(event.E{ + TaskListName: taskListName, + TaskListKind: taskListKind, + TaskListType: persistence.TaskListTypeDecision, + TaskInfo: task.Info(), + EventName: "PollForDecisionTask returning already started task", + Host: e.config.HostName, + }) return task.PollForDecisionResponse(), nil // TODO: Maybe add history expose here? } @@ -540,7 +599,22 @@ pollLoop: continue pollLoop } + task.Finish(nil) + event.Log(event.E{ + TaskListName: taskListName, + TaskListKind: taskListKind, + TaskListType: persistence.TaskListTypeDecision, + TaskInfo: task.Info(), + EventName: "PollForDecisionTask returning task", + Host: e.config.HostName, + Payload: map[string]any{ + "TaskIsForwarded": task.IsForwarded(), + "RequestForwardedFrom": req.GetForwardedFrom(), + "Latency": time.Since(task.Info().CreatedTime).Milliseconds(), + }, + }) + return e.createPollForDecisionTaskResponse(task, resp, hCtx.scope), nil } } diff --git a/service/matching/tasklist/matcher.go b/service/matching/tasklist/matcher.go index 243a0b36d84..17c4f1db8fb 100644 --- a/service/matching/tasklist/matcher.go +++ b/service/matching/tasklist/matcher.go @@ -33,6 +33,7 @@ import ( "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/matching/config" + "github.com/uber/cadence/service/matching/event" ) // TaskMatcher matches a task producer with a task consumer @@ -58,6 +59,9 @@ type TaskMatcher struct { fwdr *Forwarder scope metrics.Scope // domain metric scope numPartitions func() int // number of task list partitions + + tasklist *Identifier + tasklistKind types.TaskListKind } // ErrTasklistThrottled implies a tasklist was throttled @@ -66,7 +70,14 @@ var ErrTasklistThrottled = errors.New("tasklist limit exceeded") // newTaskMatcher returns an task matcher instance. The returned instance can be // used by task producers and consumers to find a match. 
Both sync matches and non-sync // matches should use this implementation -func newTaskMatcher(config *config.TaskListConfig, fwdr *Forwarder, scope metrics.Scope, isolationGroups []string, log log.Logger) *TaskMatcher { +func newTaskMatcher( + config *config.TaskListConfig, + fwdr *Forwarder, + scope metrics.Scope, + isolationGroups []string, + log log.Logger, + tasklist *Identifier, + tasklistKind types.TaskListKind) *TaskMatcher { dPtr := config.TaskDispatchRPS limiter := quotas.NewRateLimiter(&dPtr, config.TaskDispatchRPSTTL, config.MinTaskThrottlingBurstSize()) isolatedTaskC := make(map[string]chan *InternalTask) @@ -82,6 +93,8 @@ func newTaskMatcher(config *config.TaskListConfig, fwdr *Forwarder, scope metric isolatedTaskC: isolatedTaskC, queryTaskC: make(chan *InternalTask), numPartitions: config.NumReadPartitions, + tasklist: tasklist, + tasklistKind: tasklistKind, } } @@ -123,6 +136,9 @@ func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, err } } + // TODO: Pollers are aggressively forwarded to parent partitions so it's unlikely + // to have a poller available to pick up the task below for sub-partitions. + // Try adding some wait here. select { case tm.getTaskC(task) <- task: // poller picked up the task if task.ResponseC != nil { @@ -137,6 +153,12 @@ func (tm *TaskMatcher) Offer(ctx context.Context, task *InternalTask) (bool, err // root partition if possible select { case token := <-tm.fwdrAddReqTokenC(): + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + EventName: "Attempting to Forward Task", + }) err := tm.fwdr.ForwardTask(ctx, task) token.release("") if err == nil { @@ -217,7 +239,15 @@ func (tm *TaskMatcher) OfferQuery(ctx context.Context, task *InternalTask) (*typ // MustOffer blocks until a consumer is found to handle this task // Returns error only when context is canceled, expired or the ratelimit is set to zero (allow nothing) func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error { + e := event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + TaskInfo: task.Info(), + } if err := tm.ratelimit(ctx); err != nil { + e.EventName = "Throttled While Dispatching" + event.Log(e) return fmt.Errorf("rate limit error dispatching: %w", err) } @@ -229,8 +259,12 @@ func (tm *TaskMatcher) MustOffer(ctx context.Context, task *InternalTask) error case taskC <- task: // poller picked up the task tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList) tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT)) + e.EventName = "Dispatched to Local Poller" + event.Log(e) return nil case <-ctx.Done(): + e.EventName = "Context Done While Dispatching to Local Poller" + event.Log(e) return fmt.Errorf("context done when trying to forward local task: %w", ctx.Err()) default: } @@ -240,11 +274,15 @@ forLoop: for { select { case taskC <- task: // poller picked up the task + e.EventName = "Dispatched to Local Poller" + event.Log(e) tm.scope.IncCounter(metrics.AsyncMatchLocalPollCounterPerTaskList) tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt)) tm.scope.RecordTimer(metrics.AsyncMatchLocalPollLatencyPerTaskList, time.Since(startT)) return nil case token := <-tm.fwdrAddReqTokenC(): + e.EventName = "Attempting to Forward Task" + event.Log(e) childCtx, cancel := context.WithTimeout(ctx, time.Second*2) err := 
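MustOffer (and the task reader further below) follow one pattern: build a single base event.E carrying the task list identity, then stamp EventName, and occasionally a transient Payload, right before each Log call; because E is passed by value, every call logs a snapshot, and clearing Payload afterwards keeps later events clean. A small self-contained sketch of that pattern, using only the event package; the task list name and the forward callback are illustrative, while the event names mirror the ones added in MustOffer:

// Sketch (illustrative) of the base-event pattern used in TaskMatcher.MustOffer.
package example

import (
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/service/matching/event"
)

func forwardWithEvents(forward func() error) error {
	kind := types.TaskListKindNormal
	e := event.E{
		TaskListName: "example-tasklist", // illustrative
		TaskListKind: kind.Ptr(),
		TaskListType: persistence.TaskListTypeDecision,
	}

	e.EventName = "Attempting to Forward Task"
	event.Log(e) // E is passed by value, so Log sees a snapshot of e

	if err := forward(); err != nil {
		e.EventName = "Task Forwarding Failed"
		e.Payload = map[string]any{"error": err.Error()}
		event.Log(e)
		e.Payload = nil // don't let the error payload leak into later events
		return err
	}

	e.EventName = "Task Forwarded"
	event.Log(e)
	return nil
}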
tm.fwdr.ForwardTask(childCtx, task) token.release("") @@ -252,6 +290,10 @@ forLoop: if errors.Is(err, ErrForwarderSlowDown) { tm.scope.IncCounter(metrics.AsyncMatchForwardTaskThrottleErrorPerTasklist) } + e.EventName = "Task Forwarding Failed" + e.Payload = map[string]any{"error": err.Error()} + event.Log(e) + e.Payload = nil tm.log.Debug("failed to forward task", tag.Error(err), tag.TaskID(task.Event.TaskID), @@ -262,6 +304,8 @@ forLoop: // hoping for a local poller match select { case taskC <- task: // poller picked up the task + e.EventName = "Dispatched to Local Poller (after failed forward)" + event.Log(e) cancel() tm.scope.IncCounter(metrics.AsyncMatchLocalPollAfterForwardFailedCounterPerTaskList) tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt)) @@ -277,15 +321,21 @@ forLoop: continue forLoop } cancel() + + e.EventName = "Task Forwarded" + event.Log(e) tm.scope.IncCounter(metrics.AsyncMatchForwardPollCounterPerTaskList) tm.scope.RecordTimer(metrics.AsyncMatchAttemptPerTaskList, time.Duration(attempt)) tm.scope.RecordTimer(metrics.AsyncMatchForwardPollLatencyPerTaskList, time.Since(startT)) + // at this point, we forwarded the task to a parent partition which // in turn dispatched the task to a poller. Make sure we delete the // task from the database task.Finish(nil) return nil case <-ctx.Done(): + e.EventName = "Context Done While Dispatching to Local or Forwarding" + event.Log(e) return fmt.Errorf("failed to offer task: %w", ctx.Err()) } } @@ -313,6 +363,12 @@ func (tm *TaskMatcher) Poll(ctx context.Context, isolationGroup string) (*Intern tag.Dynamic("isolated channel", len(isolatedTaskC)), tag.Dynamic("fallback channel", len(tm.taskC)), ) + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + EventName: "Matcher Falling Back to Non-Local Polling", + }) return tm.pollOrForward(ctx, isolationGroup, isolatedTaskC, tm.taskC, tm.queryTaskC) } @@ -361,12 +417,38 @@ func (tm *TaskMatcher) pollOrForward( tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter) } tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter) + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + TaskInfo: task.Info(), + EventName: "Matched Task (pollOrForward)", + Payload: map[string]any{ + "TaskIsForwarded": task.IsForwarded(), + "SyncMatched": task.ResponseC != nil, + "FromIsolatedTaskC": true, + "IsolationGroup": task.isolationGroup, + }, + }) return task, nil case task := <-taskC: if task.ResponseC != nil { tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter) } tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter) + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + TaskInfo: task.Info(), + EventName: "Matched Task (pollOrForward)", + Payload: map[string]any{ + "TaskIsForwarded": task.IsForwarded(), + "SyncMatched": task.ResponseC != nil, + "FromIsolatedTaskC": false, + "IsolationGroup": task.isolationGroup, + }, + }) return task, nil case task := <-queryTaskC: tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter) @@ -374,10 +456,28 @@ func (tm *TaskMatcher) pollOrForward( return task, nil case <-ctx.Done(): tm.scope.IncCounter(metrics.PollTimeoutPerTaskListCounter) + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: 
tm.tasklistKind.Ptr(), + EventName: "Poll Timeout", + }) return nil, ErrNoTasks case token := <-tm.fwdrPollReqTokenC(isolationGroup): + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + EventName: "Attempting to Forward Poll", + }) if task, err := tm.fwdr.ForwardPoll(ctx); err == nil { token.release(isolationGroup) + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + EventName: "Forwarded Poll returned task", + }) return task, nil } token.release(isolationGroup) @@ -397,12 +497,38 @@ func (tm *TaskMatcher) poll( tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter) } tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter) + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + TaskInfo: task.Info(), + EventName: "Matched Task (poll)", + Payload: map[string]any{ + "TaskIsForwarded": task.IsForwarded(), + "SyncMatched": task.ResponseC != nil, + "FromIsolatedTaskC": true, + "IsolationGroup": task.isolationGroup, + }, + }) return task, nil case task := <-taskC: if task.ResponseC != nil { tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter) } tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter) + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + TaskInfo: task.Info(), + EventName: "Matched Task (poll)", + Payload: map[string]any{ + "TaskIsForwarded": task.IsForwarded(), + "SyncMatched": task.ResponseC != nil, + "FromIsolatedTaskC": false, + "IsolationGroup": task.isolationGroup, + }, + }) return task, nil case task := <-queryTaskC: tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter) @@ -410,6 +536,12 @@ func (tm *TaskMatcher) poll( return task, nil case <-ctx.Done(): tm.scope.IncCounter(metrics.PollTimeoutPerTaskListCounter) + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + EventName: "Poll Timeout", + }) return nil, ErrNoTasks } } @@ -426,18 +558,50 @@ func (tm *TaskMatcher) pollNonBlocking( tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter) } tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter) + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + TaskInfo: task.Info(), + EventName: "Matched Task Nonblocking", + Payload: map[string]any{ + "TaskIsForwarded": task.IsForwarded(), + "SyncMatched": task.ResponseC != nil, + "FromIsolatedTaskC": true, + "IsolationGroup": task.isolationGroup, + }, + }) return task, nil case task := <-taskC: if task.ResponseC != nil { tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter) } tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter) + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + TaskInfo: task.Info(), + EventName: "Matched Task Nonblocking", + Payload: map[string]any{ + "TaskIsForwarded": task.IsForwarded(), + "SyncMatched": task.ResponseC != nil, + "FromIsolatedTaskC": false, + "IsolationGroup": task.isolationGroup, + }, + }) return task, nil case task := <-queryTaskC: tm.scope.IncCounter(metrics.PollSuccessWithSyncPerTaskListCounter) 
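The Matched Task events logged from pollOrForward, poll, and pollNonBlocking carry a small payload (TaskIsForwarded, SyncMatched, FromIsolatedTaskC, IsolationGroup) that lets a simulation break matches down by how they happened. A sketch of grouping parsed events by those flags; the struct and sample input are illustrative and assume events were decoded as in the earlier scanning example:

// Sketch (illustrative): break down "Matched Task" events by their payload flags.
package main

import (
	"fmt"
	"strings"
)

type parsedEvent struct {
	EventName string
	Payload   map[string]any
}

func matchBreakdown(events []parsedEvent) map[string]int {
	out := map[string]int{}
	for _, e := range events {
		if !strings.HasPrefix(e.EventName, "Matched Task") {
			continue
		}
		// JSON booleans decode to Go bool inside map[string]any.
		sync, _ := e.Payload["SyncMatched"].(bool)
		forwarded, _ := e.Payload["TaskIsForwarded"].(bool)
		out[fmt.Sprintf("sync=%t forwarded=%t", sync, forwarded)]++
	}
	return out
}

func main() {
	events := []parsedEvent{ // sample input, not real output
		{EventName: "Matched Task (poll)", Payload: map[string]any{"SyncMatched": true, "TaskIsForwarded": false}},
		{EventName: "Matched Task (pollOrForward)", Payload: map[string]any{"SyncMatched": false, "TaskIsForwarded": true}},
	}
	fmt.Println(matchBreakdown(events))
}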
tm.scope.IncCounter(metrics.PollSuccessPerTaskListCounter) return task, nil default: + event.Log(event.E{ + TaskListName: tm.tasklist.GetName(), + TaskListType: tm.tasklist.GetType(), + TaskListKind: tm.tasklistKind.Ptr(), + EventName: "Matcher Found No Tasks Nonblocking", + }) return nil, ErrNoTasks } } diff --git a/service/matching/tasklist/matcher_test.go b/service/matching/tasklist/matcher_test.go index cd8f6a788f8..a2433717c12 100644 --- a/service/matching/tasklist/matcher_test.go +++ b/service/matching/tasklist/matcher_test.go @@ -84,11 +84,11 @@ func (t *MatcherTestSuite) SetupTest() { t.cfg = tlCfg t.isolationGroups = []string{"dca1", "dca2"} t.fwdr = newForwarder(&t.cfg.ForwarderConfig, t.taskList, types.TaskListKindNormal, t.client, []string{"dca1", "dca2"}) - t.matcher = newTaskMatcher(tlCfg, t.fwdr, metrics.NoopScope(metrics.Matching), []string{"dca1", "dca2"}, loggerimpl.NewNopLogger()) + t.matcher = newTaskMatcher(tlCfg, t.fwdr, metrics.NoopScope(metrics.Matching), []string{"dca1", "dca2"}, loggerimpl.NewNopLogger(), t.taskList, types.TaskListKindNormal) rootTaskList := NewTestTaskListID(t.T(), t.taskList.GetDomainID(), t.taskList.Parent(20), persistence.TaskListTypeDecision) rootTasklistCfg := newTaskListConfig(rootTaskList, cfg, testDomainName) - t.rootMatcher = newTaskMatcher(rootTasklistCfg, nil, metrics.NoopScope(metrics.Matching), []string{"dca1", "dca2"}, loggerimpl.NewNopLogger()) + t.rootMatcher = newTaskMatcher(rootTasklistCfg, nil, metrics.NoopScope(metrics.Matching), []string{"dca1", "dca2"}, loggerimpl.NewNopLogger(), t.taskList, types.TaskListKindNormal) } func (t *MatcherTestSuite) TearDownTest() { diff --git a/service/matching/tasklist/task.go b/service/matching/tasklist/task.go index 60a61f4adac..c27c27780d8 100644 --- a/service/matching/tasklist/task.go +++ b/service/matching/tasklist/task.go @@ -123,6 +123,14 @@ func (task *InternalTask) IsSyncMatch() bool { return task.ResponseC != nil } +func (task *InternalTask) Info() persistence.TaskInfo { + if task == nil || task.Event == nil || task.Event.TaskInfo == nil { + return persistence.TaskInfo{} + } + + return *task.Event.TaskInfo +} + func (task *InternalTask) WorkflowExecution() *types.WorkflowExecution { switch { case task.Event != nil: diff --git a/service/matching/tasklist/task_list_manager.go b/service/matching/tasklist/task_list_manager.go index eee07eaf2e7..fbbb3cea4fb 100644 --- a/service/matching/tasklist/task_list_manager.go +++ b/service/matching/tasklist/task_list_manager.go @@ -46,6 +46,7 @@ import ( "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/matching/config" + "github.com/uber/cadence/service/matching/event" "github.com/uber/cadence/service/matching/liveness" "github.com/uber/cadence/service/matching/poller" ) @@ -195,7 +196,7 @@ func NewManager( if tlMgr.isFowardingAllowed(taskList, *taskListKind) { fwdr = newForwarder(&taskListConfig.ForwarderConfig, taskList, *taskListKind, matchingClient, isolationGroups) } - tlMgr.matcher = newTaskMatcher(taskListConfig, fwdr, tlMgr.scope, isolationGroups, tlMgr.logger) + tlMgr.matcher = newTaskMatcher(taskListConfig, fwdr, tlMgr.scope, isolationGroups, tlMgr.logger, taskList, *taskListKind) tlMgr.taskWriter = newTaskWriter(tlMgr) tlMgr.taskReader = newTaskReader(tlMgr, isolationGroups) tlMgr.startWG.Add(1) @@ -259,6 +260,12 @@ func (c *taskListManagerImpl) AddTask(ctx context.Context, params AddTaskParams) c.liveness.MarkAlive() } var syncMatch bool + e := event.E{ + TaskListName: 
c.taskListID.GetName(), + TaskListKind: &c.taskListKind, + TaskListType: c.taskListID.GetType(), + TaskInfo: *params.TaskInfo, + } _, err := c.executeWithRetry(func() (interface{}, error) { if err := ctx.Err(); err != nil { return nil, err @@ -289,6 +296,8 @@ func (c *taskListManagerImpl) AddTask(ctx context.Context, params AddTaskParams) // active task, try sync match first syncMatch, err = c.trySyncMatch(ctx, params, isolationGroup) if syncMatch { + e.EventName = "SyncMatched so not persisted" + event.Log(e) return &persistence.CreateTasksResponse{}, err } if params.ActivityTaskDispatchInfo != nil { @@ -298,9 +307,13 @@ func (c *taskListManagerImpl) AddTask(ctx context.Context, params AddTaskParams) if isForwarded { // forwarded from child partition - only do sync match // child partition will persist the task when sync match fails + e.EventName = "Could not SyncMatched Forwarded Task so not persisted" + event.Log(e) return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed } + e.EventName = "Task Sent to Writer" + event.Log(e) return c.taskWriter.appendTask(params.TaskInfo) }) diff --git a/service/matching/tasklist/task_list_manager_test.go b/service/matching/tasklist/task_list_manager_test.go index 7ff64337e13..187f827f257 100644 --- a/service/matching/tasklist/task_list_manager_test.go +++ b/service/matching/tasklist/task_list_manager_test.go @@ -89,6 +89,15 @@ func TestDeliverBufferTasks(t *testing.T) { } } +func TestTaskListString(t *testing.T) { + controller := gomock.NewController(t) + logger := testlogger.New(t) + tlm := createTestTaskListManager(t, logger, controller) + got := tlm.String() + want := "Activity task list tl\nRangeID=0\nTaskIDBlock={start:-99999 end:0}\nAckLevel=-1\nMaxReadLevel=-1\n" + assert.Equal(t, want, got) +} + func TestDeliverBufferTasks_NoPollers(t *testing.T) { controller := gomock.NewController(t) logger := testlogger.New(t) diff --git a/service/matching/tasklist/task_reader.go b/service/matching/tasklist/task_reader.go index 4a64e25924c..4d85ac11ffc 100644 --- a/service/matching/tasklist/task_reader.go +++ b/service/matching/tasklist/task_reader.go @@ -41,6 +41,7 @@ import ( "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/matching/config" + "github.com/uber/cadence/service/matching/event" ) var epochStartTime = time.Unix(0, 0) @@ -174,6 +175,13 @@ dispatchLoop: if !ok { // Task list getTasks pump is shutdown break dispatchLoop } + event.Log(event.E{ + TaskListName: tr.taskListID.GetName(), + TaskListType: tr.taskListID.GetType(), + TaskListKind: &tr.tlMgr.taskListKind, + TaskInfo: *taskInfo, + EventName: "Attempting to Dispatch Buffered Task", + }) breakDispatchLoop := tr.dispatchSingleTaskFromBufferWithRetries(isolationGroup, taskInfo) if breakDispatchLoop { // shutting down @@ -414,11 +422,22 @@ func (tr *taskReader) dispatchSingleTaskFromBuffer(isolationGroup string, taskIn timerScope.Stop() cancel() + e := event.E{ + TaskListName: tr.taskListID.GetName(), + TaskListType: tr.taskListID.GetType(), + TaskListKind: &tr.tlMgr.taskListKind, + TaskInfo: *taskInfo, + } + if err == nil { + e.EventName = "Dispatched Buffered Task" + event.Log(e) return false, true } if errors.Is(err, context.Canceled) { + e.EventName = "Dispatch Failed because Context Cancelled" + event.Log(e) tr.logger.Info("Tasklist manager context is cancelled, shutting down") return true, true } @@ -436,6 +455,8 @@ func (tr *taskReader) dispatchSingleTaskFromBuffer(isolationGroup string, taskIn tag.Error(err), 
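With the AddTask events above, a simulation can derive a sync-match rate for a partition straight from event counts: tasks matched to a poller in memory log "SyncMatched so not persisted", while tasks that fell through to persistence log "Task Sent to Writer". A small sketch of that derivation, assuming counts were collected as in the earlier scanning example; the helper and sample numbers are illustrative, and forwarded tasks that fail sync match are reported under a separate event name and not included here:

// Sketch (illustrative): sync-match rate from per-EventName counts.
package main

import "fmt"

// syncMatchRate returns the fraction of locally added tasks that were sync
// matched rather than written to the task store.
func syncMatchRate(counts map[string]int) float64 {
	matched := counts["SyncMatched so not persisted"]
	written := counts["Task Sent to Writer"]
	if matched+written == 0 {
		return 0
	}
	return float64(matched) / float64(matched+written)
}

func main() {
	counts := map[string]int{ // sample numbers, not real output
		"SyncMatched so not persisted": 90,
		"Task Sent to Writer":          10,
	}
	fmt.Printf("sync match rate: %.2f\n", syncMatchRate(counts)) // prints 0.90
}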
tag.WorkflowDomainID(taskInfo.DomainID), ) + e.EventName = "Dispatch Timed Out" + event.Log(e) tr.scope.IncCounter(metrics.AsyncMatchDispatchTimeoutCounterPerTaskList) // the idea here is that by re-fetching the isolation-groups, if something has shifted @@ -448,12 +469,22 @@ func (tr *taskReader) dispatchSingleTaskFromBuffer(isolationGroup string, taskIn // and let the decision get timed out and rescheduled to non-sticky tasklist if err == _stickyPollerUnavailableError { tr.completeTask(taskInfo, nil) + e.EventName = "Dispatch Failed because StickyPollerUnavailable" + event.Log(e) return false, true } // it should never happen, unless there is a bug in 'getIsolationGroupForTask' method tr.logger.Error("taskReader: unexpected error getting isolation group", tag.Error(err), tag.IsolationGroup(group)) + + e.EventName = "Dispatch Failed due to unexpected error getting isolation group" + e.Payload = map[string]any{ + "error": err, + } + event.Log(e) + e.Payload = nil + tr.completeTask(taskInfo, err) return false, true } @@ -482,12 +513,18 @@ func (tr *taskReader) dispatchSingleTaskFromBuffer(isolationGroup string, taskIn select { case <-tr.cancelCtx.Done(): // the task reader is shutting down + e.EventName = "Dispatch Failed because task reader is shutting down" + event.Log(e) return true, true case tr.taskBuffers[defaultTaskBufferIsolationGroup] <- taskInfo: // task successfully rerouted to default tasklist + e.EventName = "Task rerouted to default isolation group" + event.Log(e) return false, true default: // couldn't redirect, loop and try again + e.EventName = "Task is not rerouted to default isolation group. Will retry dispatch" + event.Log(e) return false, false } } @@ -498,6 +535,8 @@ func (tr *taskReader) dispatchSingleTaskFromBuffer(isolationGroup string, taskIn select { case <-tr.cancelCtx.Done(): // the task reader is shutting down + e.EventName = "Dispatch Failed because task reader is shutting down" + event.Log(e) return true, true case tr.taskBuffers[group] <- taskInfo: // successful redirect @@ -510,9 +549,19 @@ func (tr *taskReader) dispatchSingleTaskFromBuffer(isolationGroup string, taskIn tag.TaskID(taskInfo.TaskID), tag.WorkflowDomainID(taskInfo.DomainID), ) + + e.EventName = "Task forwarded to another isolation group" + e.Payload = map[string]any{ + "redirection-from-isolation-group": isolationGroup, + "redirection-to-isolation-group": group, + } + event.Log(e) + e.Payload = nil return false, true default: tr.scope.IncCounter(metrics.BufferIsolationGroupRedirectFailureCounter) + e.EventName = "Task is not rerouted to another isolation group. Will retry dispatch" + event.Log(e) tr.logger.Error("some tasks could not be redirected to another isolation group as the buffer's already full", tag.WorkflowRunID(taskInfo.RunID), tag.Dynamic("redirection-from-isolation-group", isolationGroup), @@ -527,11 +576,20 @@ func (tr *taskReader) dispatchSingleTaskFromBuffer(isolationGroup string, taskIn } if errors.Is(err, ErrTasklistThrottled) { + e.EventName = "Dispatch failed because throttled. Will retry dispatch" + event.Log(e) tr.scope.IncCounter(metrics.BufferThrottlePerTaskListCounter) runtime.Gosched() return false, false } + e.EventName = "Dispatch failed because of unknown error. 
Will retry dispatch" + e.Payload = map[string]any{ + "error": err, + } + event.Log(e) + e.Payload = nil + tr.scope.IncCounter(metrics.BufferUnknownTaskDispatchError) tr.logger.Error("unknown error while dispatching task", tag.Error(err), diff --git a/service/matching/tasklist/task_writer.go b/service/matching/tasklist/task_writer.go index 02914f6164a..d8943fefb6b 100644 --- a/service/matching/tasklist/task_writer.go +++ b/service/matching/tasklist/task_writer.go @@ -33,7 +33,9 @@ import ( "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/matching/config" + "github.com/uber/cadence/service/matching/event" ) type ( @@ -218,11 +220,20 @@ writerLoop: } tasks := []*persistence.CreateTaskInfo{} + events := []event.E{} for i, req := range reqs { tasks = append(tasks, &persistence.CreateTaskInfo{ TaskID: taskIDs[i], Data: req.taskInfo, }) + kind := types.TaskListKind(w.db.taskListKind) + events = append(events, event.E{ + TaskListName: w.db.taskListName, + TaskListKind: &kind, + TaskListType: w.db.taskType, + TaskInfo: *req.taskInfo, + EventName: "Task Written to DB", + }) maxReadLevel = taskIDs[i] } @@ -235,6 +246,8 @@ writerLoop: tag.Number(taskIDs[0]), tag.NextNumber(taskIDs[batchSize-1]), ) + } else { + event.Log(events...) } // Update the maxReadLevel after the writes are completed. if maxReadLevel > 0 {
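The task writer batches its events the same way it batches the tasks themselves: one event.E is prepared per task while the create request is assembled, and the whole slice is handed to the variadic event.Log only when the write succeeds, so a failed batch produces no "Task Written to DB" events. A self-contained sketch of that shape, using only the event package; the write callback, task slice type, and task list fields are illustrative:

// Sketch (illustrative): prepare one event per task, log the batch only after
// a successful write, mirroring the task writer change above.
package example

import (
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/service/matching/event"
)

func writeTasksWithEvents(write func([]persistence.TaskInfo) error, tasks []persistence.TaskInfo) error {
	kind := types.TaskListKindNormal // illustrative; the writer derives this from w.db.taskListKind
	events := make([]event.E, 0, len(tasks))
	for _, t := range tasks {
		events = append(events, event.E{
			TaskInfo:     t,
			TaskListName: "example-tasklist", // illustrative
			TaskListKind: kind.Ptr(),
			TaskListType: persistence.TaskListTypeDecision,
			EventName:    "Task Written to DB",
		})
	}
	if err := write(tasks); err != nil {
		return err // nothing is logged for a failed write
	}
	event.Log(events...) // variadic: the whole batch in one call
	return nil
}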