Skip to content

Commit

Permalink
fix: Fix flakes in Worker Tuner and Workflow Update tests (#1463)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjameswh authored Jul 11, 2024
1 parent 913e67e commit f756351
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 155 deletions.
17 changes: 13 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ env:
|| startsWith(github.ref, 'refs/heads/releases'))
&& github.event_name != 'pull_request'
}}
TESTS_CLI_VERSION: 'v0.12.0'
# Use these variables to force specific version of CLI/Time Skipping Server for SDK tests
# TESTS_CLI_VERSION: 'v0.13.2'
# TESTS_TIME_SKIPPING_SERVER_VERSION: 'v1.24.1'

jobs:
# Compile native bridge code for each target platform.
Expand Down Expand Up @@ -281,21 +283,28 @@ jobs:
if: matrix.server == 'cli'
shell: bash
run: |
temporal server start-dev --headless &
temporal server start-dev --headless &> /tmp/devserver.log &
- name: Run Tests
run: npm test
env:
RUN_INTEGRATION_TESTS: true
REUSE_V8_CONTEXT: ${{ matrix.reuse-v8-context }}

- name: Upload logs
- name: Upload NPM logs
uses: actions/upload-artifact@v4
if: failure()
if: failure() || cancelled()
with:
name: integration-tests-${{ matrix.platform }}-node${{ matrix.node }}-${{ matrix.server }}-${{ matrix.reuse-v8-context && 'reuse' || 'noreuse' }}-logs
path: ${{ startsWith(matrix.platform, 'windows') && 'C:\\npm\\_logs\\' || '~/.npm/_logs/' }}

- name: Upload Dev Server logs
uses: actions/upload-artifact@v4
if: failure() || cancelled()
with:
name: integration-tests-${{ matrix.platform }}-node${{ matrix.node }}-${{ matrix.server }}-${{ matrix.reuse-v8-context && 'reuse' || 'noreuse' }}-devserver-logs
path: /tmp/devserver.log

# Tests that npm init @temporalio results in a working worker and client
test-npm-init:
needs: build-packages
Expand Down
6 changes: 4 additions & 2 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -806,13 +806,15 @@ export class WorkflowClient extends BaseClient {
waitForStage: WorkflowUpdateStage,
input: WorkflowStartUpdateInput
): Promise<WorkflowStartUpdateOutput> {
waitForStage = waitForStage >= WorkflowUpdateStage.ACCEPTED ? waitForStage : WorkflowUpdateStage.ACCEPTED;
const waitForStageProto = workflowUpdateStage.toProtoEnum(waitForStage);
const updateId = input.options?.updateId ?? uuid4();
const req: temporal.api.workflowservice.v1.IUpdateWorkflowExecutionRequest = {
namespace: this.options.namespace,
workflowExecution: input.workflowExecution,
firstExecutionRunId: input.firstExecutionRunId,
waitPolicy: {
lifecycleStage: workflowUpdateStage.toProtoEnum(waitForStage),
lifecycleStage: waitForStageProto,
},
request: {
meta: {
Expand All @@ -834,7 +836,7 @@ export class WorkflowClient extends BaseClient {
try {
do {
response = await this.workflowService.updateWorkflowExecution(req);
} while (response.stage < waitForStage && response.stage < WorkflowUpdateStage.ACCEPTED);
} while (response.stage < waitForStageProto);
} catch (err) {
this.rethrowUpdateGrpcError(err, 'Workflow Update failed', input.workflowExecution);
}
Expand Down
22 changes: 16 additions & 6 deletions packages/test/src/helpers-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
WorkerOptions,
WorkflowBundle,
bundleWorkflowCode,
makeTelemetryFilterString,
} from '@temporalio/worker';
import * as workflow from '@temporalio/workflow';
import { ConnectionInjectorInterceptor } from './activities/interceptors';
Expand Down Expand Up @@ -52,8 +53,22 @@ export function makeTestFunction(opts: {
}): TestFn<Context> {
const test = anyTest as TestFn<Context>;
test.before(async (t) => {
const workflowBundle = await bundleWorkflowCode({
...bundlerOptions,
workflowInterceptorModules: [...defaultWorkflowInterceptorModules, ...(opts.workflowInterceptorModules ?? [])],
workflowsPath: opts.workflowsPath,
});
// Ignore invalid log levels
Runtime.install({ logger: new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel) });
Runtime.install({
logger: new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel),
telemetryOptions: {
logging: {
filter: makeTelemetryFilterString({
core: (process.env.TEST_LOG_LEVEL || 'INFO').toUpperCase() as LogLevel,
}),
},
},
});
const env = await TestWorkflowEnvironment.createLocal({
...opts.workflowEnvironmentOpts,
server: {
Expand All @@ -65,11 +80,6 @@ export function makeTestFunction(opts: {
},
});
await registerDefaultCustomSearchAttributes(env.connection);
const workflowBundle = await bundleWorkflowCode({
...bundlerOptions,
workflowInterceptorModules: [...defaultWorkflowInterceptorModules, ...(opts.workflowInterceptorModules ?? [])],
workflowsPath: opts.workflowsPath,
});
t.context = {
env,
workflowBundle,
Expand Down
16 changes: 7 additions & 9 deletions packages/test/src/test-worker-lifecycle.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
/* eslint-disable no-duplicate-imports */
// ^ needed for lint passing in CI
/**
* Test the various states of a Worker.
* Most tests use a mocked core, some tests run serially because they emit signals to the process
Expand All @@ -16,7 +14,7 @@ if (RUN_INTEGRATION_TESTS) {
test.serial('Worker shuts down gracefully', async (t) => {
const worker = await Worker.create({
...defaultOptions,
taskQueue: 'shutdown-test',
taskQueue: t.title.replace(/ /g, '_'),
});
t.is(worker.getState(), 'INITIALIZED');
const p = worker.run();
Expand All @@ -33,7 +31,7 @@ if (RUN_INTEGRATION_TESTS) {
test.serial('Worker shuts down gracefully if interrupted before running', async (t) => {
const worker = await Worker.create({
...defaultOptions,
taskQueue: 'shutdown-test',
taskQueue: t.title.replace(/ /g, '_'),
});
t.is(worker.getState(), 'INITIALIZED');
process.emit('SIGINT', 'SIGINT');
Expand All @@ -47,7 +45,7 @@ if (RUN_INTEGRATION_TESTS) {
await t.throwsAsync(
Worker.create({
...defaultOptions,
taskQueue: 'shutdown-test',
taskQueue: t.title.replace(/ /g, '_'),
namespace: 'oogabooga',
}),
{
Expand All @@ -61,7 +59,7 @@ if (RUN_INTEGRATION_TESTS) {
test.serial('Mocked run shuts down gracefully', async (t) => {
try {
const worker = isolateFreeWorker({
taskQueue: 'shutdown-test',
taskQueue: t.title.replace(/ /g, '_'),
});
t.is(worker.getState(), 'INITIALIZED');
const p = worker.run();
Expand All @@ -78,7 +76,7 @@ test.serial('Mocked run shuts down gracefully', async (t) => {
test.serial('Mocked run shuts down gracefully if interrupted before running', async (t) => {
try {
const worker = isolateFreeWorker({
taskQueue: 'shutdown-test',
taskQueue: t.title.replace(/ /g, '_'),
});
// worker.native.initiateShutdown = () => new Promise(() => undefined);
t.is(worker.getState(), 'INITIALIZED');
Expand All @@ -95,7 +93,7 @@ test.serial('Mocked run shuts down gracefully if interrupted before running', as
test.serial('Mocked run throws if not shut down gracefully', async (t) => {
const worker = isolateFreeWorker({
shutdownForceTime: '5ms',
taskQueue: 'shutdown-test',
taskQueue: t.title.replace(/ /g, '_'),
});
t.is(worker.getState(), 'INITIALIZED');
const p = worker.run();
Expand All @@ -113,7 +111,7 @@ test.serial('Mocked run throws if not shut down gracefully', async (t) => {
test.serial('Mocked throws combined error in runUntil', async (t) => {
const worker = isolateFreeWorker({
shutdownForceTime: '5ms',
taskQueue: 'shutdown-test',
taskQueue: t.title.replace(/ /g, '_'),
});
worker.native.initiateShutdown = () => new Promise(() => undefined);
const err = await t.throwsAsync(
Expand Down
Loading

0 comments on commit f756351

Please # to comment.