Skip to content

Commit

Permalink
expose task generation interval, add task latency quantiles to output
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir committed Jul 31, 2024
1 parent aaf400f commit e749bd4
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 17 deletions.
29 changes: 23 additions & 6 deletions host/matching_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,12 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
generatedTasksCounter := int32(0)
lastTaskScheduleID := int32(0)
numGenerators := getNumGenerators(s.testClusterConfig.MatchingConfig.SimulationConfig.NumTaskGenerators)
numTasksToGenerate := 10000
taskGenerateInterval := getTaskGenerateInterval(s.testClusterConfig.MatchingConfig.SimulationConfig.TaskGeneratorTickInterval)
maxTasksToGenerate := getMaxTaskstoGenerate(s.testClusterConfig.MatchingConfig.SimulationConfig.MaxTaskToGenerate)
var generatorWG sync.WaitGroup
for i := 1; i <= numGenerators; i++ {
generatorWG.Add(1)
go s.generate(ctx, matchingClient, domainID, tasklist, numTasksToGenerate, &generatedTasksCounter, &lastTaskScheduleID, &generatorWG)
go s.generate(ctx, matchingClient, domainID, tasklist, maxTasksToGenerate, taskGenerateInterval, &generatedTasksCounter, &lastTaskScheduleID, &generatorWG)
}

// Start pollers
Expand Down Expand Up @@ -215,6 +216,7 @@ func (s *MatchingSimulationSuite) TestMatchingSimulation() {
testSummary = append(testSummary, fmt.Sprintf("Num of Pollers: %d", numPollers))
testSummary = append(testSummary, fmt.Sprintf("Poll Timeout: %v", pollDuration))
testSummary = append(testSummary, fmt.Sprintf("Num of Task Generators: %d", numGenerators))
testSummary = append(testSummary, fmt.Sprintf("Task generated every: %d", taskGenerateInterval))
testSummary = append(testSummary, fmt.Sprintf("Num of Write Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistWritePartitions]))
testSummary = append(testSummary, fmt.Sprintf("Num of Read Partitions: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingNumTasklistReadPartitions]))
testSummary = append(testSummary, fmt.Sprintf("Forwarder Max Outstanding Polls: %d", s.testClusterConfig.MatchingDynamicConfigOverrides[dynamicconfig.MatchingForwarderMaxOutstandingPolls]))
Expand Down Expand Up @@ -249,13 +251,14 @@ func (s *MatchingSimulationSuite) generate(
ctx context.Context,
matchingClient MatchingClient,
domainID, tasklist string,
numTasks int,
maxTasksToGenerate int,
taskGenerateInterval time.Duration,
generatedTasksCounter *int32,
lastTaskScheduleID *int32,
wg *sync.WaitGroup) {
defer wg.Done()

t := time.NewTicker(50 * time.Millisecond)
t := time.NewTicker(taskGenerateInterval)
defer t.Stop()

for {
Expand All @@ -265,8 +268,8 @@ func (s *MatchingSimulationSuite) generate(
return
case <-t.C:
scheduleID := int(atomic.AddInt32(lastTaskScheduleID, 1))
if scheduleID > numTasks {
s.log("Generated %d tasks so generator will stop", numTasks)
if scheduleID > maxTasksToGenerate {
s.log("Generated %d tasks so generator will stop", maxTasksToGenerate)
return
}
decisionTask := newDecisionTask(domainID, tasklist, scheduleID)
Expand Down Expand Up @@ -394,6 +397,20 @@ func newDecisionTask(domainID, tasklist string, i int) *types.AddDecisionTaskReq
}
}

// getMaxTaskstoGenerate resolves the configured cap on the total number of
// tasks to generate. A zero value means "not configured" and yields the
// default of 2000; any other value is returned unchanged.
func getMaxTaskstoGenerate(i int) int {
	const defaultMaxTasks = 2000
	if i != 0 {
		return i
	}
	return defaultMaxTasks
}

// getTaskGenerateInterval resolves the tick interval used by each task
// generator. A non-positive value (unset or misconfigured) falls back to the
// 50ms default: the result is handed to time.NewTicker, which panics when
// given a duration that is not strictly positive.
func getTaskGenerateInterval(i time.Duration) time.Duration {
	if i <= 0 {
		return 50 * time.Millisecond
	}
	return i
}

func getNumGenerators(i int) int {
if i == 0 {
return 1
Expand Down
25 changes: 16 additions & 9 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,31 +154,38 @@ type (
}

// MatchingSimulationConfig groups the tunable parameters of the matching simulation.
// Zero-valued fields are replaced by the defaults noted on each field.
MatchingSimulationConfig struct {
// Number of task list write partitions. Defaults to 1
TaskListWritePartitions int

// Number of task list read partitions. Defaults to 1
TaskListReadPartitions int

// Number of pollers. Defaults to 10
NumPollers int

// Number of task generators. Defaults to 1
NumTaskGenerators int

// Each generator will produce a new task every TaskGeneratorTickInterval. Defaults to 50ms
TaskGeneratorTickInterval time.Duration

// Upper limit of tasks to generate. Task generators will stop once the total number of
// tasks generated across all generators reaches MaxTaskToGenerate during the simulation.
// Defaults to 2k
MaxTaskToGenerate int

// Poll request timeout. Defaults to 1 second
PollTimeout time.Duration

// At most N polls will be forwarded at a time. Defaults to 20
ForwarderMaxOutstandingPolls int

// At most N tasks will be forwarded at a time. Defaults to 1
ForwarderMaxOutstandingTasks int

// Forwarder rps limit. Defaults to 10
ForwarderMaxRatePerSecond int

// Children per node. Defaults to 20
ForwarderMaxChildrenPerNode int
}

Expand Down
2 changes: 2 additions & 0 deletions host/testdata/matching_simulation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ matchingconfig:
tasklistreadpartitions: 2
numpollers: 10
numtaskgenerators: 2
taskgeneratortickinterval: 50ms
maxtasktogenerate: 3000
polltimeout: 5s
forwardermaxoutstandingpolls: 20
forwardermaxoutstandingtasks: 1
Expand Down
20 changes: 18 additions & 2 deletions scripts/run_matching_simulator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,32 @@ cat test.log | sed -n '/Simulation Summary/,/End of Simulation Summary/p' | grep

tmp=$(cat "$eventLogsFile" \
| jq -c 'select(.EventName == "PollForDecisionTask returning task")' \
| jq .Payload.Latency | awk '{s+=$1}END{print s/NR}' RS=" ")
| jq .Payload.Latency | awk '{s+=$0}END{print s/NR}')
echo "Avg Task latency (ms): $tmp" | tee -a $testSummaryFile

# P75 task latency via the nearest-rank method: the value at position
# ceil(0.75 * N) of the ascending-sorted latencies. Rounding up (instead of
# truncating) keeps the index at 1 or above, so small samples no longer read
# the non-existent element 0 and report an empty value. No samples -> empty.
tmp=$(jq 'select(.EventName == "PollForDecisionTask returning task") | .Payload.Latency' "$eventLogsFile" \
    | sort -n \
    | awk -v p=0.75 '{ a[NR] = $0 } END { if (NR == 0) exit; idx = int(NR * p); if (NR * p > idx) idx++; print a[idx] }')
echo "P75 Task latency (ms): $tmp" | tee -a "$testSummaryFile"

# P95 task latency via the nearest-rank method: the value at position
# ceil(0.95 * N) of the ascending-sorted latencies. Rounding up (instead of
# truncating) keeps the index at 1 or above, so small samples no longer read
# the non-existent element 0 and report an empty value. No samples -> empty.
tmp=$(jq 'select(.EventName == "PollForDecisionTask returning task") | .Payload.Latency' "$eventLogsFile" \
    | sort -n \
    | awk -v p=0.95 '{ a[NR] = $0 } END { if (NR == 0) exit; idx = int(NR * p); if (NR * p > idx) idx++; print a[idx] }')
echo "P95 Task latency (ms): $tmp" | tee -a "$testSummaryFile"


# P99 task latency via the nearest-rank method: the value at position
# ceil(0.99 * N) of the ascending-sorted latencies. Rounding up (instead of
# truncating) keeps the index at 1 or above, so small samples no longer read
# the non-existent element 0 and report an empty value. No samples -> empty.
tmp=$(jq 'select(.EventName == "PollForDecisionTask returning task") | .Payload.Latency' "$eventLogsFile" \
    | sort -n \
    | awk -v p=0.99 '{ a[NR] = $0 } END { if (NR == 0) exit; idx = int(NR * p); if (NR * p > idx) idx++; print a[idx] }')
echo "P99 Task latency (ms): $tmp" | tee -a "$testSummaryFile"


# Max task latency: the last value of the ascending-sorted latencies.
# jq reads the event log directly and filters/projects in a single pass; the
# summary file path is quoted so a path containing spaces is not word-split.
tmp=$(jq 'select(.EventName == "PollForDecisionTask returning task") | .Payload.Latency' "$eventLogsFile" \
    | sort -n \
    | tail -n 1)
echo "Max Task latency (ms): $tmp" | tee -a "$testSummaryFile"



tmp=$(cat "$eventLogsFile" \
| jq -c 'select(.EventName == "PollForDecisionTask returning task")' \
| jq '{ScheduleID,TaskListName,EventName,Payload}' \
Expand Down

0 comments on commit e749bd4

Please # to comment.