diff --git a/.github/workflows/all-docker-images.yaml b/.github/workflows/all-docker-images.yaml index 2bd34804..38b8ce4e 100644 --- a/.github/workflows/all-docker-images.yaml +++ b/.github/workflows/all-docker-images.yaml @@ -14,7 +14,7 @@ on: description: Python SDK ver to build. Skipped if not specified. Must start with v. type: string php-ver: - description: PHP SDK ver to build. Skipped if not specified. + description: PHP SDK ver to build. Skipped if not specified. Must start with v. type: string ts-ver: description: TypeScript SDK ver to build. Skipped if not specified. Must start with v. diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a154ce43..b793aed8 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -16,6 +16,9 @@ on: # rebuild any PRs and main branch changes java_sdk_version: default: '' type: string + php_sdk_version: + default: '' + type: string python_sdk_version: default: '' type: string @@ -39,6 +42,7 @@ jobs: go_latest: ${{ steps.latest_version.outputs.go_latest }} typescript_latest: ${{ steps.latest_version.outputs.typescript_latest }} java_latest: ${{ steps.latest_version.outputs.java_latest }} + php_latest: ${{ steps.latest_version.outputs.php_latest }} python_latest: ${{ steps.latest_version.outputs.python_latest }} csharp_latest: ${{ steps.latest_version.outputs.csharp_latest }} steps: @@ -75,6 +79,13 @@ jobs: fi echo "java_latest=$java_latest" >> $GITHUB_OUTPUT + php_latest="${{ github.event.inputs.php_sdk_version }}" + if [ -z "$php_latest" ]; then + php_latest=$(./temporal-features latest-sdk-version --lang php) + echo "Derived latest PHP SDK release version: $php_latest" + fi + echo "php_latest=$php_latest" >> $GITHUB_OUTPUT + python_latest="${{ github.event.inputs.python_sdk_version }}" if [ -z "$python_latest" ]; then python_latest=$(./temporal-features latest-sdk-version --lang py) @@ -122,6 +133,23 @@ jobs: - run: poetry install --no-root - run: poe lint + build-php: + strategy: + fail-fast: true + matrix: + os: [ubuntu-latest] + runs-on: ${{ matrix.os }} + steps: + - name: Print build information + run: 'echo head_ref: "$GITHUB_HEAD_REF", ref: "$GITHUB_REF", os: ${{ matrix.os }}' + - uses: actions/checkout@v4 + - name: Setup PHP 8.2 + uses: shivammathur/setup-php@v2 + with: + php-version: 8.2 + tools: composer:v2 + extensions: dom, sockets, grpc, curl, protobuf + build-java: strategy: fail-fast: true @@ -182,6 +210,15 @@ jobs: features-repo-ref: ${{ github.head_ref }} features-repo-path: ${{ github.event.pull_request.head.repo.full_name }} + feature-tests-php: + needs: build-go + uses: ./.github/workflows/php.yaml + with: + version: ${{ needs.build-go.outputs.php_latest }} + version-is-repo-ref: false + features-repo-ref: ${{ github.head_ref }} + features-repo-path: ${{ github.event.pull_request.head.repo.full_name }} + feature-tests-java: needs: build-go uses: ./.github/workflows/java.yaml @@ -209,6 +246,6 @@ jobs: go-ver: 'v${{ needs.build-go.outputs.go_latest }}' ts-ver: 'v${{ needs.build-go.outputs.typescript_latest }}' java-ver: 'v${{ needs.build-go.outputs.java_latest }}' - php-ver: '${{ needs.build-go.outputs.php_latest }}' + php-ver: 'v${{ needs.build-go.outputs.php_latest }}' py-ver: 'v${{ needs.build-go.outputs.python_latest }}' cs-ver: 'v${{ needs.build-go.outputs.csharp_latest }}' diff --git a/.github/workflows/docker-images.yaml b/.github/workflows/docker-images.yaml index 4f02c1a7..ed1fae1a 100644 --- a/.github/workflows/docker-images.yaml +++ b/.github/workflows/docker-images.yaml @@ -65,6 +65,8 @@ jobs: # This step will set the FEATURES_BUILT_IMAGE_TAG env key - name: Build docker image + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} run: | go run . build-image --lang ${{ inputs.lang }} \ ${{ inputs.sdk-repo-ref && format('--repo-ref {0}', inputs.sdk-repo-ref) || '' }} \ diff --git a/.github/workflows/php.yaml b/.github/workflows/php.yaml new file mode 100644 index 00000000..c9ef4a6e --- /dev/null +++ b/.github/workflows/php.yaml @@ -0,0 +1,101 @@ +name: PHP Features Testing +on: + workflow_call: + inputs: + php-repo-path: + type: string + default: 'temporal/sdk' + version: + required: true + type: string + # When true, the default version will be used (actually it's the latest tag) + version-is-repo-ref: + required: true + type: boolean + features-repo-path: + type: string + default: 'temporalio/features' + features-repo-ref: + type: string + default: 'main' + # If set, download the docker image for server from the provided artifact name + docker-image-artifact-name: + type: string + required: false + +jobs: + test: + runs-on: ubuntu-latest + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + defaults: + run: + working-directory: ./features + steps: + - name: Print git info + run: 'echo head_ref: "$GITHUB_HEAD_REF", ref: "$GITHUB_REF", PHP sdk version: ${{ inputs.version }}' + working-directory: '.' + + - name: Download docker artifacts + if: ${{ inputs.docker-image-artifact-name }} + uses: actions/download-artifact@v3 + with: + name: ${{ inputs.docker-image-artifact-name }} + path: /tmp/server-docker + + - name: Load server Docker image + if: ${{ inputs.docker-image-artifact-name }} + run: docker load --input /tmp/server-docker/temporal-autosetup.tar + working-directory: '.' + + - name: Override IMAGE_TAG environment variable + if: ${{ inputs.docker-image-artifact-name }} + run: | + image_tag=latest + # image_tag won't exist on older builds (like 1.22.0), so default to latest + if [ -f /tmp/server-docker/image_tag ]; then + image_tag=$(cat /tmp/server-docker/image_tag) + fi + echo "IMAGE_TAG=${image_tag}" >> $GITHUB_ENV + working-directory: '.' + + - name: Checkout SDK features repo + uses: actions/checkout@v4 + with: + path: features + repository: ${{ inputs.features-repo-path }} + ref: ${{ inputs.features-repo-ref }} + + - uses: actions/setup-go@v2 + with: + go-version: '^1.22' + - name: Setup PHP 8.2 + uses: shivammathur/setup-php@v2 + with: + php-version: 8.2 + tools: composer:v2 + extensions: dom, sockets, grpc, curl, protobuf + - name: Start containerized server and dependencies + if: inputs.docker-image-artifact-name + run: | + docker compose \ + -f ./dockerfiles/docker-compose.for-server-image.yaml \ + -f /tmp/server-docker/docker-compose.yml \ + up -d temporal-server cassandra elasticsearch + + - name: Run SDK-features tests directly + if: inputs.docker-image-artifact-name == '' + run: go run . run --lang php ${{ inputs.docker-image-artifact-name && '--server localhost:7233 --namespace default' || ''}} --version "${{ inputs.version-is-repo-ref && '' || inputs.version }}" + + # Running the tests in their own step keeps the logs readable + - name: Run containerized SDK-features tests + if: inputs.docker-image-artifact-name + run: | + docker compose \ + -f ./dockerfiles/docker-compose.for-server-image.yaml \ + -f /tmp/server-docker/docker-compose.yml \ + up --no-log-prefix --exit-code-from features-tests-php features-tests-php + + - name: Tear down docker compose + if: inputs.docker-image-artifact-name && (success() || failure()) + run: docker compose -f ./dockerfiles/docker-compose.for-server-image.yaml -f /tmp/server-docker/docker-compose.yml down -v diff --git a/README.md b/README.md index 1c1bc731..6db70f62 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ These features serve several purposes: With latest [Go](https://golang.org/) installed, run: ``` -go build -o temporal-features # or temporal-features.exec on Windows +go build -o temporal-features # or temporal-features.exe on Windows ``` ## Running @@ -31,6 +31,8 @@ Prerequisites: - [Poetry](https://python-poetry.org/): `poetry install` - `setuptools`: `python -m pip install -U setuptools` - [.NET](https://dotnet.microsoft.com) 7+ +- [PHP](https://www.php.net/) 8.1+ + - [Composer](https://getcomposer.org/) Command: @@ -38,7 +40,7 @@ Command: Note, `go run .` can be used in place of `go build` + `temporal-features` to save on the build step. -`LANG` can be `go`, `java`, `ts`, `py`, or `cs`. `VERSION` is per SDK and if left off, uses the latest version set for +`LANG` can be `go`, `java`, `ts`, `php`, `py`, or `cs`. `VERSION` is per SDK and if left off, uses the latest version set for the language in this repository. `PATTERN` must match either the features relative directory _or_ the relative directory + `/feature.` via diff --git a/cmd/prepare.go b/cmd/prepare.go index 08138d42..d08f204c 100644 --- a/cmd/prepare.go +++ b/cmd/prepare.go @@ -90,6 +90,8 @@ func (p *Preparer) Prepare(ctx context.Context) error { _, err = p.BuildJavaProgram(ctx, true) case "ts": _, err = p.BuildTypeScriptProgram(ctx) + case "php": + _, err = p.BuildPhpProgram(ctx) case "py": _, err = p.BuildPythonProgram(ctx) case "cs": diff --git a/cmd/run.go b/cmd/run.go index b0db3b09..0056301a 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -216,7 +216,7 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { dynamicConfigArgs := make([]string, 0, len(yamlValues)) for key, values := range yamlValues { for _, value := range values { - asJsonStr, err := json.Marshal(value) + asJsonStr, err := json.Marshal(value.Value) if err != nil { return fmt.Errorf("unable to marshal dynamic config value %s: %w", key, err) } @@ -312,6 +312,16 @@ func (r *Runner) Run(ctx context.Context, patterns []string) error { if err == nil { err = r.RunTypeScriptExternal(ctx, run) } + case "php": + if r.config.DirName != "" { + r.program, err = sdkbuild.PhpProgramFromDir( + filepath.Join(r.rootDir, r.config.DirName), + r.rootDir, + ) + } + if err == nil { + err = r.RunPhpExternal(ctx, run) + } case "py": if r.config.DirName != "" { r.program, err = sdkbuild.PythonProgramFromDir(filepath.Join(r.rootDir, r.config.DirName)) @@ -562,7 +572,7 @@ func (r *Runner) destroyTempDir() { func normalizeLangName(lang string) (string, error) { // Normalize to file extension switch lang { - case "go", "java", "ts", "py", "cs": + case "go", "java", "ts", "php", "py", "cs": case "typescript": lang = "ts" case "python": @@ -578,7 +588,7 @@ func normalizeLangName(lang string) (string, error) { func expandLangName(lang string) (string, error) { // Expand to lang name switch lang { - case "go", "java", "typescript", "python": + case "go", "java", "typescript", "php", "python": case "ts": lang = "typescript" case "py": diff --git a/cmd/run_php.go b/cmd/run_php.go new file mode 100644 index 00000000..e906e307 --- /dev/null +++ b/cmd/run_php.go @@ -0,0 +1,77 @@ +package cmd + +import ( + "context" + "fmt" + "path/filepath" + "github.com/temporalio/features/harness/go/cmd" + "github.com/temporalio/features/sdkbuild" +) + +// PreparePhpExternal prepares a PHP run without running it. The preparer +// config directory if present is expected to be a subdirectory name just +// beneath the root directory. +func (p *Preparer) BuildPhpProgram(ctx context.Context) (sdkbuild.Program, error) { + p.log.Info("Building PHP project", "DirName", p.config.DirName) + + prog, err := sdkbuild.BuildPhpProgram(ctx, sdkbuild.BuildPhpProgramOptions{ + DirName: p.config.DirName, + Version: p.config.Version, + RootDir: p.rootDir, + }) + if err != nil { + p.log.Error("failed preparing: %w", err) + return nil, fmt.Errorf("failed preparing: %w", err) + } + return prog, nil +} + +// RunPhpExternal runs the PHP run in an external process. This expects +// the server to already be started. +func (r *Runner) RunPhpExternal(ctx context.Context, run *cmd.Run) error { + // If program not built, build it + if r.program == nil { + var err error + if r.program, err = NewPreparer(r.config.PrepareConfig).BuildPhpProgram(ctx); err != nil { + return err + } + } + + // Compose RoadRunner command options + args := append( + []string{ + // Namespace + "namespace=" + r.config.Namespace, + // Server address + "address=" + r.config.Server, + }, + // Features + run.ToArgs()..., + ) + // TLS + if r.config.ClientCertPath != "" { + clientCertPath, err := filepath.Abs(r.config.ClientCertPath) + if err != nil { + return err + } + args = append(args, "tls.cert="+clientCertPath) + } + if r.config.ClientKeyPath != "" { + clientKeyPath, err := filepath.Abs(r.config.ClientKeyPath) + if err != nil { + return err + } + args = append(args, "tls.key="+clientKeyPath) + } + + // Run + cmd, err := r.program.NewCommand(ctx, args...) + if err == nil { + // r.log.Debug("Running PHP separately", "Args", cmd.Args) + err = cmd.Run() + } + if err != nil { + return fmt.Errorf("failed running: %w", err) + } + return nil +} diff --git a/dockerfiles/docker-compose.for-server-image.yaml b/dockerfiles/docker-compose.for-server-image.yaml index 4e5d8d7f..48566d62 100644 --- a/dockerfiles/docker-compose.for-server-image.yaml +++ b/dockerfiles/docker-compose.for-server-image.yaml @@ -69,3 +69,13 @@ services: - temporal-server networks: - temporal-dev-network + + features-tests-php: + image: temporaliotest/features:php + environment: + - WAIT_EXTRA_FOR_NAMESPACE + command: ['--server', 'temporal-server:7233', '--namespace', 'default'] + depends_on: + - temporal-server + networks: + - temporal-dev-network diff --git a/dockerfiles/dynamicconfig/docker.yaml b/dockerfiles/dynamicconfig/docker.yaml index e4fba6e9..a0eb514c 100644 --- a/dockerfiles/dynamicconfig/docker.yaml +++ b/dockerfiles/dynamicconfig/docker.yaml @@ -1,10 +1,10 @@ -frontend.forceSearchAttributesCacheRefreshOnRead: +system.forceSearchAttributesCacheRefreshOnRead: - value: true constraints: {} -frontend.enableActivityEagerExecution: +system.enableActivityEagerExecution: - value: true constraints: {} -frontend.enableEagerWorkflowStart: +system.enableEagerWorkflowStart: - value: true constraints: {} frontend.enableUpdateWorkflowExecution: diff --git a/dockerfiles/php.Dockerfile b/dockerfiles/php.Dockerfile index 8f2c2abd..83fbbafb 100644 --- a/dockerfiles/php.Dockerfile +++ b/dockerfiles/php.Dockerfile @@ -1,20 +1,16 @@ # Build in a full featured container -FROM php:8.2 as build +FROM php:8.2-cli as build # Install protobuf compiler RUN apt-get update \ && DEBIAN_FRONTEND=noninteractive \ apt-get install --no-install-recommends --assume-yes \ - protobuf-compiler=3.* libprotobuf-dev=3.* wget=* + protobuf-compiler=3.* libprotobuf-dev=3.* wget=* git=* # Get go compiler ARG PLATFORM=amd64 -RUN wget -q https://go.dev/dl/go1.22.4.linux-${PLATFORM}.tar.gz \ - && tar -C /usr/local -xzf go1.22.4.linux-${PLATFORM}.tar.gz -# Install Rust for compiling the core bridge - only required for installation from a repo but is cheap enough to install -# in the "build" container (-y is for non-interactive install) -# hadolint ignore=DL4006 -RUN wget -q -O - https://sh.rustup.rs | sh -s -- -y +RUN wget -q https://go.dev/dl/go1.22.5.linux-${PLATFORM}.tar.gz \ + && tar -C /usr/local -xzf go1.22.5.linux-${PLATFORM}.tar.gz # Install composer COPY --from=composer:2.3 /usr/bin/composer /usr/bin/composer @@ -22,6 +18,7 @@ COPY --from=composer:2.3 /usr/bin/composer /usr/bin/composer WORKDIR /app # Copy CLI build dependencies +COPY dockerfiles/dynamicconfig ./dockerfiles/dynamicconfig COPY features ./features COPY harness ./harness COPY sdkbuild ./sdkbuild @@ -45,6 +42,7 @@ RUN CGO_ENABLED=0 ./temporal-features prepare --lang php --dir prepared --versio # Copy the CLI and prepared feature to a smaller container for running FROM spiralscout/php-grpc:8.2 +COPY --from=build /app/dockerfiles/dynamicconfig /app/dockerfiles/dynamicconfig COPY --from=build /app/temporal-features /app/temporal-features COPY --from=build /app/features /app/features COPY --from=build /app/prepared /app/prepared diff --git a/features/activity/basic_no_workflow_timeout/feature.php b/features/activity/basic_no_workflow_timeout/feature.php new file mode 100644 index 00000000..15db6fca --- /dev/null +++ b/features/activity/basic_no_workflow_timeout/feature.php @@ -0,0 +1,53 @@ +withScheduleToCloseTimeout('1 minute'), + )->echo(); + + return yield Workflow::newActivityStub( + FeatureActivity::class, + ActivityOptions::new()->withStartToCloseTimeout('1 minute'), + )->echo(); + } +} + +#[ActivityInterface] +class FeatureActivity +{ + #[ActivityMethod('echo')] + public function echo(): string + { + return 'echo'; + } +} + +class FeatureChecker +{ + #[Check] + public static function check(#[Stub('Workflow')] WorkflowStubInterface $stub): void + { + Assert::same($stub->getResult(), 'echo'); + } +} diff --git a/features/activity/cancel_try_cancel/feature.php b/features/activity/cancel_try_cancel/feature.php new file mode 100644 index 00000000..21fc7bd2 --- /dev/null +++ b/features/activity/cancel_try_cancel/feature.php @@ -0,0 +1,109 @@ +withScheduleToCloseTimeout('1 minute') + ->withHeartbeatTimeout('5 seconds') + # Disable retry + ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(1)) + ->withCancellationType(Activity\ActivityCancellationType::TryCancel) + ); + + $scope = Workflow::async(static fn () => $activity->cancellableActivity()); + + # Sleep for short time (force task turnover) + yield Workflow::timer(2); + + try { + $scope->cancel(); + yield $scope; + } catch (CanceledFailure) { + # Expected + } + + # Wait for activity result + yield Workflow::awaitWithTimeout('5 seconds', fn() => $this->result !== ''); + + return $this->result; + } + + #[Workflow\SignalMethod('activity_result')] + public function activityResult(string $result) + { + $this->result = $result; + } +} + +#[ActivityInterface] +class FeatureActivity +{ + public function __construct( + private readonly WorkflowClientInterface $client, + ) {} + + /** + * @return PromiseInterface + */ + #[ActivityMethod('cancellable_activity')] + public function cancellableActivity() + { + # Heartbeat every second for a minute + $result = 'timeout'; + try { + for ($i = 0; $i < 5_0; $i++) { + \usleep(100_000); + Activity::heartbeat($i); + } + } catch (ActivityCanceledException $e) { + $result = 'cancelled'; + } catch (\Throwable $e) { + $result = 'unexpected'; + } + + # Send result as signal to workflow + $execution = Activity::getInfo()->workflowExecution; + $this->client + ->newRunningWorkflowStub(FeatureWorkflow::class, $execution->getID(), $execution->getRunID()) + ->activityResult($result); + } +} + +class FeatureChecker +{ + #[Check] + public static function check(#[Stub('Workflow')] WorkflowStubInterface $stub): void + { + Assert::same($stub->getResult(timeout: 10), 'cancelled'); + } +} diff --git a/features/activity/retry_on_error/feature.php b/features/activity/retry_on_error/feature.php new file mode 100644 index 00000000..68102f9a --- /dev/null +++ b/features/activity/retry_on_error/feature.php @@ -0,0 +1,76 @@ +withScheduleToCloseTimeout('1 minute') + ->withRetryOptions((new RetryOptions()) + ->withInitialInterval('1 millisecond') + # Do not increase retry backoff each time + ->withBackoffCoefficient(1) + # 5 total maximum attempts + ->withMaximumAttempts(5) + ), + )->alwaysFailActivity(); + } +} + +#[ActivityInterface] +class FeatureActivity +{ + #[ActivityMethod('always_fail_activity')] + public function alwaysFailActivity(): string + { + $attempt = Activity::getInfo()->attempt; + throw new ApplicationFailure( + message: "activity attempt {$attempt} failed", + type: "CustomError", + nonRetryable: false, + ); + } +} + +class FeatureChecker +{ + #[Check] + public static function check(#[Stub('Workflow')] WorkflowStubInterface $stub): void + { + try { + $stub->getResult(); + throw new \Exception('Expected WorkflowFailedException'); + } catch (WorkflowFailedException $e) { + Assert::isInstanceOf($e->getPrevious(), ActivityFailure::class); + /** @var ActivityFailure $failure */ + $failure = $e->getPrevious()->getPrevious(); + Assert::isInstanceOf($failure, ApplicationFailure::class); + Assert::contains($failure->getOriginalMessage(), 'activity attempt 5 failed'); + } + } +} diff --git a/features/child_workflow/result/feature.php b/features/child_workflow/result/feature.php new file mode 100644 index 00000000..acb73713 --- /dev/null +++ b/features/child_workflow/result/feature.php @@ -0,0 +1,46 @@ +withTaskQueue(Workflow::getInfo()->taskQueue), + )->run('Test'); + } +} + +#[WorkflowInterface] +class ChildWorkflow +{ + #[WorkflowMethod('ChildWorkflow')] + public function run(string $input) + { + return $input; + } +} + +class FeatureChecker +{ + #[Check] + public static function check(#[Stub('MainWorkflow')] WorkflowStubInterface $stub): void + { + Assert::same($stub->getResult(), 'Test'); + } +} diff --git a/features/child_workflow/signal/feature.php b/features/child_workflow/signal/feature.php new file mode 100644 index 00000000..55b558dc --- /dev/null +++ b/features/child_workflow/signal/feature.php @@ -0,0 +1,69 @@ +withTaskQueue(Workflow::getInfo()->taskQueue), + ); + $handle = $workflow->run(); + yield $workflow->signal('unblock'); + return yield $handle; + } +} + +/** + * A workflow that waits for a signal and returns the data received. + */ +#[WorkflowInterface] +class ChildWorkflow +{ + private ?string $message = null; + + #[WorkflowMethod('ChildWorkflow')] + public function run() + { + yield Workflow::await(fn(): bool => $this->message !== null); + return $this->message; + } + + /** + * @return PromiseInterface + */ + #[SignalMethod('signal')] + public function signal(string $message): void + { + $this->message = $message; + } +} + +class FeatureChecker +{ + #[Check] + public static function check(#[Stub('MainWorkflow')] WorkflowStubInterface $stub): void + { + Assert::same($stub->getResult(), 'unblock'); + } +} diff --git a/features/child_workflow/throws_on_execute/feature.php b/features/child_workflow/throws_on_execute/feature.php new file mode 100644 index 00000000..868f8144 --- /dev/null +++ b/features/child_workflow/throws_on_execute/feature.php @@ -0,0 +1,68 @@ +withTaskQueue(Workflow::getInfo()->taskQueue), + )->run(); + } +} + +#[WorkflowInterface] +class ChildWorkflow +{ + #[WorkflowMethod('ChildWorkflow')] + public function run() + { + throw new ApplicationFailure('Test message', 'TestError', true, EncodedValues::fromValues([['foo' => 'bar']])); + } +} + +class FeatureChecker +{ + #[Check] + public static function check(#[Stub('MainWorkflow')] WorkflowStubInterface $stub): void + { + try { + $stub->getResult(); + throw new \Exception('Expected exception'); + } catch (WorkflowFailedException $e) { + Assert::same($e->getWorkflowType(), 'MainWorkflow'); + + /** @var ChildWorkflowFailure $previous */ + $previous = $e->getPrevious(); + Assert::isInstanceOf($previous, ChildWorkflowFailure::class); + Assert::same($previous->getWorkflowType(), 'ChildWorkflow'); + + /** @var ApplicationFailure $failure */ + $failure = $previous->getPrevious(); + Assert::isInstanceOf($failure, ApplicationFailure::class); + Assert::contains($failure->getOriginalMessage(), 'Test message'); + Assert::same($failure->getType(), 'TestError'); + Assert::same($failure->isNonRetryable(), true); + Assert::same($failure->getDetails()->getValue(0, 'array'), ['foo' => 'bar']); + } + } +} diff --git a/features/continue_as_new/continue_as_same/feature.php b/features/continue_as_new/continue_as_same/feature.php new file mode 100644 index 00000000..2aa21fee --- /dev/null +++ b/features/continue_as_new/continue_as_same/feature.php @@ -0,0 +1,59 @@ +continuedExecutionRunId)) { + return $input; + } + + return yield Workflow::continueAsNew( + 'Workflow', + args: [$input], + options: Workflow\ContinueAsNewOptions::new() + // todo might be removed with https://github.com/temporalio/sdk-php/issues/453 + ->withTaskQueue(Workflow::getInfo()->taskQueue) + ); + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub( + type: 'Workflow', + workflowId: WORKFLOW_ID, + args: [INPUT_DATA], + memo: [MEMO_KEY => MEMO_VALUE], + )] + WorkflowStubInterface $stub + ): void { + Assert::same($stub->getResult(), INPUT_DATA); + # Workflow ID does not change after continue as new + Assert::same($stub->getExecution()->getID(), WORKFLOW_ID); + # Memos do not change after continue as new + $description = $stub->describe(); + Assert::same($description->info->memo->getValues(), [MEMO_KEY => MEMO_VALUE]); + } +} diff --git a/features/data_converter/binary/feature.php b/features/data_converter/binary/feature.php new file mode 100644 index 00000000..275504ce --- /dev/null +++ b/features/data_converter/binary/feature.php @@ -0,0 +1,97 @@ +startRequest = $arg; + return $next($method, $arg, $ctx); + } + + public function getResult(GetResultInput $input, callable $next): ?EncodedValues + { + return $this->result = $next($input); + } +} + +class FeatureChecker +{ + public function __construct( + private readonly Interceptor $interceptor = new Interceptor(), + ) {} + + public function pipelineProvider(): PipelineProvider + { + return new SimplePipelineProvider([$this->interceptor]); + } + + #[Check] + public function check( + #[Stub('Workflow', args: [INPUT])] + #[Client(pipelineProvider: [FeatureChecker::class, 'pipelineProvider'])] + WorkflowStubInterface $stub, + ): void { + /** @var Bytes $result */ + $result = $stub->getResult(Bytes::class); + + Assert::eq($result->getData(), EXPECTED_RESULT); + + # Check arguments + Assert::notNull($this->interceptor->startRequest); + Assert::notNull($this->interceptor->result); + + /** @var Payload $payload */ + $payload = $this->interceptor->startRequest->getInput()?->getPayloads()[0] ?? null; + Assert::notNull($payload); + + Assert::same($payload->getMetadata()['encoding'], CODEC_ENCODING); + + // Check result value from interceptor + /** @var Payload $resultPayload */ + $resultPayload = $this->interceptor->result->toPayloads()->getPayloads()[0]; + Assert::same($resultPayload->getMetadata()['encoding'], CODEC_ENCODING); + } +} diff --git a/features/data_converter/binary_protobuf/feature.php b/features/data_converter/binary_protobuf/feature.php new file mode 100644 index 00000000..93404459 --- /dev/null +++ b/features/data_converter/binary_protobuf/feature.php @@ -0,0 +1,89 @@ +setData(EXPECTED_RESULT)); + +#[WorkflowInterface] +class FeatureWorkflow +{ + #[WorkflowMethod('Workflow')] + public function run(DataBlob $data) + { + return $data; + } +} + +/** + * Catches {@see StartWorkflowExecutionRequest} from the gRPC calls. + */ +class GrpcCallInterceptor implements GrpcClientInterceptor +{ + public ?StartWorkflowExecutionRequest $startRequest = null; + + public function interceptCall(string $method, object $arg, ContextInterface $ctx, callable $next): object + { + $arg instanceof StartWorkflowExecutionRequest and $this->startRequest = $arg; + return $next($method, $arg, $ctx); + } +} + + +class FeatureChecker +{ + public function __construct( + private readonly GrpcCallInterceptor $interceptor = new GrpcCallInterceptor(), + ) {} + + public function pipelineProvider(): PipelineProvider + { + return new SimplePipelineProvider([$this->interceptor]); + } + + #[Check] + public function check( + #[Stub('Workflow', args: [INPUT])] + #[Client( + pipelineProvider: [FeatureChecker::class, 'pipelineProvider'], + payloadConverters: [ProtoConverter::class], + )] + WorkflowStubInterface $stub, + ): void { + /** @var DataBlob $result */ + $result = $stub->getResult(DataBlob::class); + + # Check that binary protobuf message was decoded in the Workflow and sent back. + # But we don't check the result Payload encoding, because we can't configure different Payload encoders + # on the server side for different Harness features. + # There `json/protobuf` converter is used for protobuf messages by default on the server side. + Assert::eq($result->getData(), EXPECTED_RESULT); + + # Check arguments + Assert::notNull($this->interceptor->startRequest); + /** @var Payload $payload */ + $payload = $this->interceptor->startRequest->getInput()?->getPayloads()[0] ?? null; + Assert::notNull($payload); + + Assert::same($payload->getMetadata()['encoding'], 'binary/protobuf'); + Assert::same($payload->getMetadata()['messageType'], 'temporal.api.common.v1.DataBlob'); + } +} diff --git a/features/data_converter/codec/feature.php b/features/data_converter/codec/feature.php new file mode 100644 index 00000000..9353655d --- /dev/null +++ b/features/data_converter/codec/feature.php @@ -0,0 +1,138 @@ +result = $next($input); + } + + public function start(StartInput $input, callable $next): WorkflowExecution + { + $this->start = $input->arguments; + return $next($input); + } +} + +#[\AllowDynamicProperties] +class DTO +{ + public function __construct(...$args) + { + foreach ($args as $key => $value) { + $this->{$key} = $value; + } + } +} + +class Base64PayloadCodec implements PayloadConverterInterface +{ + public function getEncodingType(): string + { + return CODEC_ENCODING; + } + + public function toPayload($value): ?Payload + { + return $value instanceof DTO + ? (new Payload()) + ->setData(\base64_encode(\json_encode($value, flags: \JSON_THROW_ON_ERROR))) + ->setMetadata(['encoding' => CODEC_ENCODING]) + : null; + } + + public function fromPayload(Payload $payload, Type $type): DTO + { + $values = \json_decode(\base64_decode($payload->getData()), associative: true, flags: \JSON_THROW_ON_ERROR); + $dto = new DTO(); + foreach ($values as $key => $value) { + $dto->{$key} = $value; + } + return $dto; + } +} + +class FeatureChecker +{ + public function __construct( + private ResultInterceptor $interceptor = new ResultInterceptor(), + ) {} + + public function pipelineProvider(): PipelineProvider + { + return new SimplePipelineProvider([$this->interceptor]); + } + + #[Check] + public function check( + #[Stub('Workflow', args: [EXPECTED_RESULT])] + #[Client( + pipelineProvider: [FeatureChecker::class, 'pipelineProvider'], + payloadConverters: [Base64PayloadCodec::class]), + ] + WorkflowStubInterface $stub, + ): void { + $result = $stub->getResult(); + + Assert::eq($result, EXPECTED_RESULT); + + $result = $this->interceptor->result; + $input = $this->interceptor->start; + Assert::notNull($result); + Assert::notNull($input); + + // Check result value from interceptor + /** @var Payload $resultPayload */ + $resultPayload = $result->toPayloads()->getPayloads()[0]; + Assert::same($resultPayload->getMetadata()['encoding'], CODEC_ENCODING); + Assert::same($resultPayload->getData(), \base64_encode('{"spec":true}')); + + // Check arguments from interceptor + /** @var Payload $inputPayload */ + $inputPayload = $input->toPayloads()->getPayloads()[0]; + Assert::same($inputPayload->getMetadata()['encoding'], CODEC_ENCODING); + Assert::same($inputPayload->getData(), \base64_encode('{"spec":true}')); + } +} diff --git a/features/data_converter/empty/feature.php b/features/data_converter/empty/feature.php new file mode 100644 index 00000000..9ffd32ec --- /dev/null +++ b/features/data_converter/empty/feature.php @@ -0,0 +1,85 @@ +withStartToCloseTimeout(10), + )->nullActivity(null); + } +} + +#[ActivityInterface] +class EmptyActivity +{ + /** + * @return PromiseInterface + */ + #[ActivityMethod('null_activity')] + public function nullActivity(?string $input): void + { + // check the null input is serialized correctly + if ($input !== null) { + throw new ApplicationFailure('Activity input should be null', 'BadResult', true); + } + } +} + +class FeatureChecker +{ + #[Check] + public function check( + #[Stub('Workflow')] + WorkflowStubInterface $stub, + WorkflowClientInterface $client, + ): void { + // verify the workflow returns nothing + $result = $stub->getResult(); + Assert::null($result); + + // get result payload of ActivityTaskScheduled event from workflow history + $found = false; + $event = null; + /** @var HistoryEvent $event */ + foreach ($client->getWorkflowHistory($stub->getExecution()) as $event) { + if ($event->getEventType() === EventType::EVENT_TYPE_ACTIVITY_TASK_SCHEDULED) { + $found = true; + break; + } + } + + Assert::true($found, 'Activity task scheduled event not found'); + $payload = $event->getActivityTaskScheduledEventAttributes()?->getInput()?->getPayloads()[0]; + Assert::isInstanceOf($payload, Payload::class); + \assert($payload instanceof Payload); + + // load JSON payload from `./payload.json` and compare it to JSON representation of result payload + $decoded = \json_decode(\file_get_contents(__DIR__ . '/payload.json'), true, 512, JSON_THROW_ON_ERROR); + Assert::eq(\json_decode($payload->serializeToJsonString(), true, 512, JSON_THROW_ON_ERROR), $decoded); + } +} diff --git a/features/data_converter/failure/feature.php b/features/data_converter/failure/feature.php new file mode 100644 index 00000000..8426dd62 --- /dev/null +++ b/features/data_converter/failure/feature.php @@ -0,0 +1,110 @@ +withStartToCloseTimeout(10), + )->failActivity(null); + } +} + +#[ActivityInterface] +class EmptyActivity +{ + #[ActivityMethod('fail_activity')] + public function failActivity(?string $input): never + { + throw new ApplicationFailure( + message: 'main error', + type: 'MainError', + nonRetryable: true, + previous: new ApplicationFailure( + message: 'cause error', + type: 'CauseError', + nonRetryable: true, + ) + ); + } +} + +class FeatureChecker +{ + #[Check] + public function check( + #[Stub('Workflow')] + WorkflowStubInterface $stub, + WorkflowClientInterface $client, + ): void { + try { + $stub->getResult(); + throw new \Exception('Expected WorkflowFailedException'); + } catch (WorkflowFailedException) { + // do nothing + } + + // get result payload of ActivityTaskScheduled event from workflow history + $found = false; + $event = null; + /** @var HistoryEvent $event */ + foreach ($client->getWorkflowHistory($stub->getExecution()) as $event) { + if ($event->getEventType() === EventType::EVENT_TYPE_ACTIVITY_TASK_FAILED) { + $found = true; + break; + } + } + + Assert::true($found, 'Activity task failed event not found'); + Assert::true($event->hasActivityTaskFailedEventAttributes()); + + $failure = $event->getActivityTaskFailedEventAttributes()?->getFailure(); + Assert::isInstanceOf($failure, Failure::class); + \assert($failure instanceof Failure); + + throw new SkipTest('SDK does not format Failure message as expected'); + $this->checkFailure($failure, 'main error'); + $this->checkFailure($failure->getCause(), 'cause error'); + } + + private function checkFailure(Failure $failure, string $message): void + { + Assert::same($failure->getMessage(), 'Encoded failure'); + Assert::isEmpty($failure->getStackTrace()); + + $payload = $failure->getEncodedAttributes(); + \assert($payload instanceof Payload); + Assert::isEmpty($payload->getMetadata()['encoding'], 'json/plain'); + + $data = DataConverter::createDefault()->fromPayload($payload, null); + Assert::same($data['message'], $message); + Assert::keyExists($data, 'stack_trace'); + } +} diff --git a/features/data_converter/json/feature.php b/features/data_converter/json/feature.php new file mode 100644 index 00000000..6aa7122c --- /dev/null +++ b/features/data_converter/json/feature.php @@ -0,0 +1,82 @@ + true]); + +#[WorkflowInterface] +class FeatureWorkflow +{ + #[WorkflowMethod('Workflow')] + public function run(object $data) + { + return $data; + } +} + +/** + * Catches raw Workflow result. + */ +class ResultInterceptor implements WorkflowClientCallsInterceptor +{ + use WorkflowClientCallsInterceptorTrait; + + public ?EncodedValues $result = null; + + public function getResult(GetResultInput $input, callable $next): ?EncodedValues + { + return $this->result = $next($input); + } +} + +class FeatureChecker +{ + private ResultInterceptor $interceptor; + + public function __construct() + { + $this->interceptor = new ResultInterceptor(); + } + + public function pipelineProvider(): PipelineProvider + { + return new SimplePipelineProvider([$this->interceptor]); + } + + #[Check] + public function check( + #[Stub('Workflow', args: [EXPECTED_RESULT])] + #[Client(pipelineProvider: [FeatureChecker::class, 'pipelineProvider'])] + WorkflowStubInterface $stub, + ): void { + $result = $stub->getResult(); + + Assert::eq($result, EXPECTED_RESULT); + + $result = $this->interceptor->result; + Assert::notNull($result); + + $payloads = $result->toPayloads(); + /** @var \Temporal\Api\Common\V1\Payload $payload */ + $payload = $payloads->getPayloads()[0]; + + Assert::same($payload->getMetadata()['encoding'], 'json/plain'); + Assert::same($payload->getData(), '{"spec":true}'); + } +} diff --git a/features/data_converter/json_protobuf/feature.php b/features/data_converter/json_protobuf/feature.php new file mode 100644 index 00000000..af57ea22 --- /dev/null +++ b/features/data_converter/json_protobuf/feature.php @@ -0,0 +1,86 @@ +setData(EXPECTED_RESULT)); + +#[WorkflowInterface] +class FeatureWorkflow +{ + #[WorkflowMethod('Workflow')] + public function run(DataBlob $data) + { + return $data; + } +} + +/** + * Catches raw Workflow result. + */ +class ResultInterceptor implements WorkflowClientCallsInterceptor +{ + use WorkflowClientCallsInterceptorTrait; + + public ?EncodedValues $result = null; + + public function getResult(GetResultInput $input, callable $next): ?EncodedValues + { + return $this->result = $next($input); + } +} + +class FeatureChecker +{ + private ResultInterceptor $interceptor; + + public function __construct() + { + $this->interceptor = new ResultInterceptor(); + } + + public function pipelineProvider(): PipelineProvider + { + return new SimplePipelineProvider([$this->interceptor]); + } + + #[Check] + public function check( + #[Stub('Workflow', args: [INPUT])] + #[Client(pipelineProvider: [FeatureChecker::class, 'pipelineProvider'])] + WorkflowStubInterface $stub, + ): void { + /** @var DataBlob $result */ + $result = $stub->getResult(DataBlob::class); + + Assert::eq($result->getData(), EXPECTED_RESULT); + + $result = $this->interceptor->result; + Assert::notNull($result); + + $payloads = $result->toPayloads(); + /** @var \Temporal\Api\Common\V1\Payload $payload */ + $payload = $payloads->getPayloads()[0]; + + Assert::same($payload->getMetadata()['encoding'], 'json/protobuf'); + Assert::same($payload->getMetadata()['messageType'], 'temporal.api.common.v1.DataBlob'); + Assert::same($payload->getData(), '{"data":"MzczNTkyODU1OQ=="}'); + } +} diff --git a/features/eager_activity/non_remote_activities_worker/feature.php b/features/eager_activity/non_remote_activities_worker/feature.php new file mode 100644 index 00000000..126f652b --- /dev/null +++ b/features/eager_activity/non_remote_activities_worker/feature.php @@ -0,0 +1,60 @@ +withStartToCloseTimeout(3), + )->dummy(); + } +} + +/** + * Not a local activity + */ +#[ActivityInterface] +class EmptyActivity +{ + #[ActivityMethod('dummy')] + public function dummy(): void + { + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub + ): void { + throw new SkipTest('Need to run worker with no_remote_activities=True'); + + try { + $stub->getResult(); + } catch (WorkflowFailedException $e) { + // todo check that previous exception is a timeout_error and not a schedule_to_start_error + } + + throw new \Exception('Test not completed'); + } +} diff --git a/features/eager_workflow/successful_start/feature.php b/features/eager_workflow/successful_start/feature.php new file mode 100644 index 00000000..22d177c9 --- /dev/null +++ b/features/eager_workflow/successful_start/feature.php @@ -0,0 +1,69 @@ +lastResponse = $result; + return $result; + } +} + +class FeatureChecker +{ + public function __construct( + private grpcCallInterceptor $interceptor = new grpcCallInterceptor(), + ) {} + + public function pipelineProvider(): PipelineProvider + { + return new SimplePipelineProvider([$this->interceptor]); + } + + #[Check] + public function check( + #[Stub('Workflow', eagerStart: true)] + #[Client(timeout:30, pipelineProvider: [FeatureChecker::class, 'pipelineProvider'])] + WorkflowStubInterface $stub, + ): void { + // Check the result and the eager workflow proof + Assert::same($stub->getResult(), EXPECTED_RESULT); + Assert::notNull($this->interceptor->lastResponse); + Assert::notNull($this->interceptor->lastResponse->getEagerWorkflowTask()); + } +} diff --git a/features/query/successful_query/feature.php b/features/query/successful_query/feature.php new file mode 100644 index 00000000..8e525da0 --- /dev/null +++ b/features/query/successful_query/feature.php @@ -0,0 +1,66 @@ + $this->beDone); + } + + #[QueryMethod('get_counter')] + public function getCounter(): int + { + return $this->counter; + } + + #[SignalMethod('inc_counter')] + public function incCounter(): void + { + ++$this->counter; + } + + #[SignalMethod('finish')] + public function finish(): void + { + $this->beDone = true; + } +} + +class FeatureChecker +{ + #[Check] + public static function check(#[Stub('Workflow')] WorkflowStubInterface $stub): void + { + Assert::same($stub->query('get_counter')?->getValue(0), 0); + + $stub->signal('inc_counter'); + Assert::same($stub->query('get_counter')?->getValue(0), 1); + + $stub->signal('inc_counter'); + $stub->signal('inc_counter'); + $stub->signal('inc_counter'); + Assert::same($stub->query('get_counter')?->getValue(0), 4); + + $stub->signal('finish'); + $stub->getResult(); + } +} diff --git a/features/query/timeout_due_to_no_active_workers/feature.php b/features/query/timeout_due_to_no_active_workers/feature.php new file mode 100644 index 00000000..5341f9ae --- /dev/null +++ b/features/query/timeout_due_to_no_active_workers/feature.php @@ -0,0 +1,73 @@ + $this->beDone); + } + + #[QueryMethod('simple_query')] + public function simpleQuery(): bool + { + return true; + } + + #[SignalMethod('finish')] + public function finish(): void + { + $this->beDone = true; + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + Runner $runner, + ): void { + # Stop worker + $runner->stop(); + + try { + $stub->query('simple_query')?->getValue(0); + throw new \Exception('Query must fail due to no active workers'); + } catch (WorkflowServiceException $e) { + // Can be cancelled or deadline exceeded depending on whether client or + // server hit timeout first in a racy way + $status = $e->getPrevious()?->getCode(); + Assert::inArray($status, [ + StatusCode::CANCELLED, + StatusCode::DEADLINE_EXCEEDED, // Deadline Exceeded + StatusCode::FAILED_PRECONDITION, // no poller seen for task queue recently + ], 'Error code must be DEADLINE_EXCEEDED or CANCELLED. Got ' . \print_r($status, true)); + } finally { + # Restart the worker and finish the wf + $runner->start(); + $stub->signal('finish'); + $stub->getResult(); + } + } +} diff --git a/features/query/unexpected_arguments/feature.php b/features/query/unexpected_arguments/feature.php new file mode 100644 index 00000000..ecf8a41f --- /dev/null +++ b/features/query/unexpected_arguments/feature.php @@ -0,0 +1,74 @@ + $this->beDone); + } + + #[QueryMethod('the_query')] + public function theQuery(int $arg): string + { + return "got $arg"; + } + + #[SignalMethod('finish')] + public function finish(): void + { + $this->beDone = true; + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + Assert::same('got 42', $stub->query('the_query', 42)?->getValue(0)); + + try { + $stub->query('the_query', true)?->getValue(0); + throw new \Exception('Query must fail due to unexpected argument type'); + } catch (WorkflowQueryException $e) { + Assert::contains( + $e->getPrevious()->getMessage(), + 'The passed value of type "bool" can not be converted to required type "int"', + ); + } + + # Silently drops extra arg + Assert::same('got 123', $stub->query('the_query', 123, true)?->getValue(0)); + + # Not enough arg + try { + $stub->query('the_query')?->getValue(0); + throw new \Exception('Query must fail due to missing argument'); + } catch (WorkflowQueryException $e) { + Assert::contains($e->getPrevious()->getMessage(), '0 passed and exactly 1 expected'); + } + + $stub->signal('finish'); + $stub->getResult(); + } +} diff --git a/features/query/unexpected_query_type_name/feature.php b/features/query/unexpected_query_type_name/feature.php new file mode 100644 index 00000000..a7195f67 --- /dev/null +++ b/features/query/unexpected_query_type_name/feature.php @@ -0,0 +1,54 @@ + $this->beDone); + } + + #[SignalMethod('finish')] + public function finish(): void + { + $this->beDone = true; + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + try { + $stub->query('nonexistent'); + throw new \Exception('Query must fail due to unknown queryType'); + } catch (WorkflowQueryException $e) { + Assert::contains( + $e->getPrevious()->getMessage(), + 'unknown queryType nonexistent', + ); + } + + $stub->signal('finish'); + $stub->getResult(); + } +} diff --git a/features/query/unexpected_return_type/feature.php b/features/query/unexpected_return_type/feature.php new file mode 100644 index 00000000..53df1972 --- /dev/null +++ b/features/query/unexpected_return_type/feature.php @@ -0,0 +1,61 @@ + $this->beDone); + } + + #[QueryMethod('the_query')] + public function theQuery(): string + { + return 'hi bob'; + } + + #[SignalMethod('finish')] + public function finish(): void + { + $this->beDone = true; + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + try { + $stub->query('the_query')?->getValue(0, 'int'); + throw new \Exception('Query must fail due to unexpected return type'); + } catch (DataConverterException $e) { + Assert::contains( + $e->getMessage(), + 'The passed value of type "string" can not be converted to required type "int"', + ); + } + + $stub->signal('finish'); + $stub->getResult(); + } +} diff --git a/features/schedule/backfill/feature.php b/features/schedule/backfill/feature.php new file mode 100644 index 00000000..a3adc00b --- /dev/null +++ b/features/schedule/backfill/feature.php @@ -0,0 +1,90 @@ +toString(); + $scheduleId = Uuid::uuid4()->toString(); + + $handle = $client->createSchedule( + schedule: Schedule::new() + ->withAction( + StartWorkflowAction::new('Workflow') + ->withWorkflowId($workflowId) + ->withTaskQueue($feature->taskQueue) + ->withInput(['arg1']) + )->withSpec( + ScheduleSpec::new() + ->withIntervalList(CarbonInterval::minute(1)) + )->withState( + ScheduleState::new() + ->withPaused(true) + ), + options: ScheduleOptions::new() + // todo: should namespace be inherited from Service Client options by default? + ->withNamespace($runtime->namespace), + scheduleId: $scheduleId, + ); + + try { + // Run backfill + $now = CarbonImmutable::now()->setSeconds(0); + $threeYearsAgo = $now->modify('-3 years'); + $thirtyMinutesAgo = $now->modify('-30 minutes'); + $handle->backfill([ + BackfillPeriod::new( + $threeYearsAgo->modify('-2 minutes'), + $threeYearsAgo, + ScheduleOverlapPolicy::AllowAll, + ), + BackfillPeriod::new( + $thirtyMinutesAgo->modify('-2 minutes'), + $thirtyMinutesAgo, + ScheduleOverlapPolicy::AllowAll, + ), + ]); + + // Confirm 6 executions + Assert::same($handle->describe()->info->numActions, 6); + } finally { + $handle->delete(); + } + } +} diff --git a/features/schedule/basic/feature.php b/features/schedule/basic/feature.php new file mode 100644 index 00000000..18e0bbe5 --- /dev/null +++ b/features/schedule/basic/feature.php @@ -0,0 +1,139 @@ +toString(); + $scheduleId = Uuid::uuid4()->toString(); + $interval = CarbonInterval::seconds(2); + + $handle = $client->createSchedule( + schedule: Schedule::new() + ->withAction( + StartWorkflowAction::new('Workflow') + ->withWorkflowId($workflowId) + ->withTaskQueue($feature->taskQueue) + ->withInput(['arg1']) + )->withSpec( + ScheduleSpec::new() + ->withIntervalList($interval) + )->withPolicies( + SchedulePolicies::new() + ->withOverlapPolicy(ScheduleOverlapPolicy::BufferOne) + ), + options: ScheduleOptions::new() + ->withNamespace($runtime->namespace), + scheduleId: $scheduleId, + ); + try { + $deadline = CarbonImmutable::now()->add($interval)->add($interval); + + // Confirm simple describe + $description = $handle->describe(); + Assert::same($handle->getID(), $scheduleId); + /** @var StartWorkflowAction $action */ + $action = $description->schedule->action; + Assert::isInstanceOf($action, StartWorkflowAction::class); + Assert::same($action->workflowId, $workflowId); + + // Confirm simple list + $found = false; + $findDeadline = \microtime(true) + 10; + find: + foreach ($client->listSchedules() as $schedule) { + if ($schedule->scheduleId === $scheduleId) { + $found = true; + break; + } + } + if (!$found and \microtime(true) < $findDeadline) { + goto find; + } + + $found or throw new \Exception('Schedule not found'); + + // Wait for first completion + while ($handle->describe()->info->numActions < 1) { + CarbonImmutable::now() < $deadline or throw new \Exception('Workflow did not execute'); + \usleep(100_000); + } + $handle->pause('Waiting for changes'); + + // Check result + $lastActions = $handle->describe()->info->recentActions; + $lastAction = $lastActions[\array_key_last($lastActions)]; + $result = $wfClient->newUntypedRunningWorkflowStub( + $lastAction->startWorkflowResult->getID(), + $lastAction->startWorkflowResult->getRunID(), + workflowType: 'Workflow' + )->getResult(); + Assert::same($result, 'arg1'); + + // Update and change arg + $handle->update( + $description->schedule->withAction( + $action->withInput(['arg2']) + ), + ); + $numActions = $handle->describe()->info->numActions; + $handle->unpause('Run again'); + + // Wait for second completion + $deadline = CarbonImmutable::now()->add($interval)->add($interval); + while ($handle->describe()->info->numActions <= $numActions) { + CarbonImmutable::now() < $deadline or throw new \Exception('Workflow did not execute'); + \usleep(100_000); + } + + // Check result 2 + $lastActions = $handle->describe()->info->recentActions; + $lastAction = $lastActions[\array_key_last($lastActions)]; + $result = $wfClient->newUntypedRunningWorkflowStub( + $lastAction->startWorkflowResult->getID(), + $lastAction->startWorkflowResult->getRunID(), + workflowType: 'Workflow' + )->getResult(); + Assert::same($result, 'arg2'); + } finally { + $handle->delete(); + } + } +} diff --git a/features/schedule/pause/feature.php b/features/schedule/pause/feature.php new file mode 100644 index 00000000..652712a9 --- /dev/null +++ b/features/schedule/pause/feature.php @@ -0,0 +1,81 @@ +createSchedule( + schedule: Schedule::new() + ->withAction( + StartWorkflowAction::new('Workflow') + ->withTaskQueue($feature->taskQueue) + ->withInput(['arg1']) + )->withSpec( + ScheduleSpec::new() + ->withIntervalList(CarbonInterval::minute(1)) + )->withState( + ScheduleState::new() + ->withPaused(true) + ->withNotes('initial note') + ), + options: ScheduleOptions::new() + ->withNamespace($runtime->namespace), + ); + + try { + // Confirm pause + $state = $handle->describe()->schedule->state; + Assert::true($state->paused); + Assert::same($state->notes, 'initial note'); + // Re-pause + $handle->pause('custom note1'); + $state = $handle->describe()->schedule->state; + Assert::true($state->paused); + Assert::same($state->notes, 'custom note1'); + // Unpause + $handle->unpause(); + $state = $handle->describe()->schedule->state; + Assert::false($state->paused); + Assert::same($state->notes, 'Unpaused via PHP SDK'); + // Pause + $handle->pause(); + $state = $handle->describe()->schedule->state; + Assert::true($state->paused); + Assert::same($state->notes, 'Paused via PHP SDK'); + } finally { + $handle->delete(); + } + } +} diff --git a/features/schedule/trigger/feature.php b/features/schedule/trigger/feature.php new file mode 100644 index 00000000..422cf8ad --- /dev/null +++ b/features/schedule/trigger/feature.php @@ -0,0 +1,67 @@ +createSchedule( + schedule: Schedule::new() + ->withAction(StartWorkflowAction::new('Workflow') + ->withTaskQueue($feature->taskQueue) + ->withInput(['arg1'])) + ->withSpec(ScheduleSpec::new()->withIntervalList(CarbonInterval::minute(1))) + ->withState(ScheduleState::new()->withPaused(true)), + options: ScheduleOptions::new()->withNamespace($runtime->namespace), + ); + + try { + $handle->trigger(); + // We have to wait before triggering again. See + // https://github.com/temporalio/temporal/issues/3614 + \sleep(2); + + $handle->trigger(); + + // Wait for completion + $deadline = CarbonImmutable::now()->addSeconds(10); + while ($handle->describe()->info->numActions < 2) { + CarbonImmutable::now() < $deadline or throw new \Exception('Workflow did not complete'); + \usleep(100_000); + } + } finally { + $handle->delete(); + } + } +} diff --git a/features/signal/activities/feature.php b/features/signal/activities/feature.php new file mode 100644 index 00000000..16aef554 --- /dev/null +++ b/features/signal/activities/feature.php @@ -0,0 +1,70 @@ + $this->total > 0); + return $this->total; + } + + #[SignalMethod('mySignal')] + public function mySignal() + { + $promises = []; + for ($i = 0; $i < ACTIVITY_COUNT; ++$i) { + $promises[] = Workflow::executeActivity( + 'result', + options: ActivityOptions::new()->withStartToCloseTimeout(10) + ); + } + + yield Promise::all($promises) + ->then(fn(array $results) => $this->total = \array_sum($results)); + } +} + +#[ActivityInterface] +class FeatureActivity +{ + #[ActivityMethod('result')] + public function result(): int + { + return ACTIVITY_RESULT; + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + $stub->signal('mySignal'); + Assert::same($stub->getResult(), ACTIVITY_COUNT * ACTIVITY_RESULT); + } +} diff --git a/features/signal/basic/feature.php b/features/signal/basic/feature.php new file mode 100644 index 00000000..c8163d85 --- /dev/null +++ b/features/signal/basic/feature.php @@ -0,0 +1,44 @@ + $this->value !== ''); + return $this->value; + } + + #[SignalMethod('my_signal')] + public function mySignal(string $arg) + { + $this->value = $arg; + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + $stub->signal('my_signal', 'arg'); + Assert::same($stub->getResult(), 'arg'); + } +} diff --git a/features/signal/child_workflow/feature.php b/features/signal/child_workflow/feature.php new file mode 100644 index 00000000..4c3a2b69 --- /dev/null +++ b/features/signal/child_workflow/feature.php @@ -0,0 +1,62 @@ +withTaskQueue(Workflow::getInfo()->taskQueue) + ); + $handle = $wf->run(); + + yield $wf->mySignal('child-wf-arg'); + return yield $handle; + } +} + +#[WorkflowInterface] +class ChildWorkflow +{ + private string $value = ''; + + #[WorkflowMethod('Child')] + public function run() + { + yield Workflow::await(fn(): bool => $this->value !== ''); + return $this->value; + } + + #[SignalMethod('my_signal')] + public function mySignal(string $arg) + { + $this->value = $arg; + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + Assert::same($stub->getResult(), 'child-wf-arg'); + } +} diff --git a/features/signal/external/feature.php b/features/signal/external/feature.php new file mode 100644 index 00000000..59a736f7 --- /dev/null +++ b/features/signal/external/feature.php @@ -0,0 +1,46 @@ + $this->result !== null); + return $this->result; + } + + #[SignalMethod('my_signal')] + public function mySignal(string $arg) + { + $this->result = $arg; + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + $stub->signal('my_signal', SIGNAL_DATA); + Assert::same($stub->getResult(), SIGNAL_DATA); + } +} diff --git a/features/signal/prevent_close/feature.php b/features/signal/prevent_close/feature.php new file mode 100644 index 00000000..622706a6 --- /dev/null +++ b/features/signal/prevent_close/feature.php @@ -0,0 +1,79 @@ + $this->values !== []); + + // Add some blocking lag 300ms + \usleep(300_000); + + return [$this->values, $replay]; + } + + #[SignalMethod('add')] + public function add(int $arg) + { + $this->values[] = $arg; + } +} + +class FeatureChecker +{ + #[Check] + public static function checkSignalOutOfExecution( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + $stub->signal('add', 1); + \usleep(1_500_000); // Wait 1.5s to workflow complete + try { + $stub->signal('add', 2); + throw new \Exception('Workflow is not completed after the first signal.'); + } catch (WorkflowNotFoundException) { + // false means the workflow was not replayed + Assert::same($stub->getResult()[0], [1]); + Assert::same($stub->getResult()[1], false, 'The workflow was not replayed'); + } + } + + #[Check] + public static function checkPreventClose( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + $stub->signal('add', 1); + + // Wait that the first signal is processed + \usleep(200_000); + + // Add signal while WF is completing + $stub->signal('add', 2); + + Assert::same($stub->getResult()[0], [1, 2], 'Both signals were processed'); + + // todo: Find a better way + // Assert::same($stub->getResult()[1], true, 'The workflow was replayed'); + } +} diff --git a/features/signal/signal_with_start/feature.php b/features/signal/signal_with_start/feature.php new file mode 100644 index 00000000..2052f941 --- /dev/null +++ b/features/signal/signal_with_start/feature.php @@ -0,0 +1,75 @@ +value += $arg; + + yield Workflow::await(fn() => $this->value > 0); + + return $this->value; + } + + #[SignalMethod('add')] + public function add(int $arg): void + { + $this->value += $arg; + } +} + +class FeatureChecker +{ + #[Check] + public static function checkSignalProcessedBeforeHandler( + WorkflowClientInterface $client, + Feature $feature, + ): void { + $stub = $client->newWorkflowStub( + FeatureWorkflow::class, + WorkflowOptions::new()->withTaskQueue($feature->taskQueue), + ); + $run = $client->startWithSignal($stub, 'add', [42], [1]); + + // See https://github.com/temporalio/sdk-php/issues/457 + Assert::same($run->getResult(), 43, 'Signal must be processed before WF handler. Result: ' . $run->getResult()); + } + + #[Check] + public static function checkSignalToExistingWorkflow( + #[Stub('Workflow', args: [-2])] WorkflowStubInterface $stub, + WorkflowClientInterface $client, + Feature $feature, + ): void { + $stub2 = $client->newWorkflowStub( + FeatureWorkflow::class, + WorkflowOptions::new() + ->withTaskQueue($feature->taskQueue) + // Reuse same ID + ->withWorkflowId($stub->getExecution()->getID()), + ); + $run = $client->startWithSignal($stub2, 'add', [42]); + + Assert::same($run->getResult(), 40, 'Existing WF must be reused. Result: ' . $run->getResult()); + } +} diff --git a/features/update/activities/feature.php b/features/update/activities/feature.php new file mode 100644 index 00000000..9c04f3f9 --- /dev/null +++ b/features/update/activities/feature.php @@ -0,0 +1,70 @@ + $this->total > 0); + return $this->total; + } + + #[Workflow\UpdateMethod('my_update')] + public function myUpdate() + { + $promises = []; + for ($i = 0; $i < ACTIVITY_COUNT; ++$i) { + $promises[] = Workflow::executeActivity( + 'result', + options: ActivityOptions::new()->withStartToCloseTimeout(10) + ); + } + + return yield Promise::all($promises) + ->then(fn(array $results) => $this->total = \array_sum($results)); + } +} + +#[ActivityInterface] +class FeatureActivity +{ + #[ActivityMethod('result')] + public function result(): int + { + return ACTIVITY_RESULT; + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + $updated = $stub->update('my_update')->getValue(0); + Assert::same($updated, ACTIVITY_COUNT * ACTIVITY_RESULT); + Assert::same($stub->getResult(), ACTIVITY_COUNT * ACTIVITY_RESULT); + } +} diff --git a/features/update/async_accepted/feature.php b/features/update/async_accepted/feature.php new file mode 100644 index 00000000..1e0de5d9 --- /dev/null +++ b/features/update/async_accepted/feature.php @@ -0,0 +1,110 @@ + $this->done); + return 'Hello, World!'; + } + + #[Workflow\SignalMethod('finish')] + public function finish() + { + $this->done = true; + } + + #[Workflow\SignalMethod('unblock')] + public function unblock() + { + $this->blocked = false; + } + + #[Workflow\UpdateMethod('my_update')] + public function myUpdate(bool $block) + { + if ($block) { + yield Workflow::await(fn(): bool => !$this->blocked); + $this->blocked = true; + return 123; + } + + throw new ApplicationFailure('Dying on purpose', 'my_update', true); + } +} + +class FeatureChecker +{ + #[Check] + public function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + $updateId = Uuid::uuid4()->toString(); + # Issue async update + $handle = $stub->startUpdate( + UpdateOptions::new('my_update', LifecycleStage::StageAccepted) + ->withUpdateId($updateId), + true, + ); + + $this->assertHandleIsBlocked($handle); + // Create a separate handle to the same update + $otherHandle = $stub->getUpdateHandle($updateId); + $this->assertHandleIsBlocked($otherHandle); + + # Unblock last update + $stub->signal('unblock'); + Assert::same($handle->getResult(), 123); + Assert::same($otherHandle->getResult(), 123); + + # issue an async update that should throw + $updateId = Uuid::uuid4()->toString(); + try { + $stub->startUpdate( + UpdateOptions::new('my_update', LifecycleStage::StageCompleted) + ->withUpdateId($updateId), + false, + ); + throw new \RuntimeException('Expected ApplicationFailure.'); + } catch (WorkflowUpdateException $e) { + Assert::contains($e->getPrevious()->getMessage(), 'Dying on purpose'); + Assert::same($e->getUpdateId(), $updateId); + Assert::same($e->getUpdateName(), 'my_update'); + } + } + + private function assertHandleIsBlocked(UpdateHandle $handle): void + { + try { + // Check there is no result + $handle->getEncodedValues(1.5); + throw new \RuntimeException('Expected Timeout Exception.'); + } catch (TimeoutException) { + // Expected + } + } +} diff --git a/features/update/basic/feature.php b/features/update/basic/feature.php new file mode 100644 index 00000000..cd039643 --- /dev/null +++ b/features/update/basic/feature.php @@ -0,0 +1,45 @@ + $this->done); + return 'Hello, world!'; + } + + #[Workflow\UpdateMethod('my_update')] + public function myUpdate() + { + $this->done = true; + return 'Updated'; + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + $updated = $stub->update('my_update')->getValue(0); + Assert::same($updated, 'Updated'); + Assert::same($stub->getResult(), 'Hello, world!'); + } +} diff --git a/features/update/basic_async/feature.php b/features/update/basic_async/feature.php new file mode 100644 index 00000000..99c90fa8 --- /dev/null +++ b/features/update/basic_async/feature.php @@ -0,0 +1,59 @@ + $this->state !== ''); + return $this->state; + } + + #[Workflow\UpdateMethod('my_update')] + public function myUpdate(string $arg): string + { + $this->state = $arg; + return 'update-result'; + } + + #[Workflow\UpdateValidatorMethod('my_update')] + public function myValidateUpdate(string $arg): void + { + $arg === 'bad-update-arg' and throw new \Exception('Invalid Update argument'); + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + try { + $stub->update('my_update', 'bad-update-arg'); + throw new \RuntimeException('Expected validation exception'); + } catch (WorkflowUpdateException $e) { + Assert::contains($e->getPrevious()?->getMessage(), 'Invalid Update argument'); + } + + $updated = $stub->update('my_update', 'foo-bar')->getValue(0); + Assert::same($updated, 'update-result'); + Assert::same($stub->getResult(), 'foo-bar'); + } +} diff --git a/features/update/client_interceptor/feature.php b/features/update/client_interceptor/feature.php new file mode 100644 index 00000000..09ff1ca5 --- /dev/null +++ b/features/update/client_interceptor/feature.php @@ -0,0 +1,76 @@ + $this->done); + return 'Hello, World!'; + } + + #[Workflow\UpdateMethod('my_update')] + public function myUpdate(int $arg): int + { + $this->done = true; + return $arg; + } +} + +class Interceptor implements WorkflowClientCallsInterceptor +{ + use WorkflowClientCallsInterceptorTrait; + + public function update(UpdateInput $input, callable $next): StartUpdateOutput + { + if ($input->updateName !== 'my_update') { + return $next($input); + } + + $rg = $input->arguments->getValue(0); + + return $next($input->with(arguments: EncodedValues::fromValues([$rg + 1]))); + } +} + +class FeatureChecker +{ + public function pipelineProvider(): PipelineProvider + { + return new SimplePipelineProvider([new Interceptor()]); + } + + #[Check] + public static function check( + #[Stub('Workflow')] + #[Client(pipelineProvider: [FeatureChecker::class, 'pipelineProvider'])] + WorkflowStubInterface $stub, + ): void { + $updated = $stub->update('my_update', 1)->getValue(0); + Assert::same($updated, 2); + $stub->getResult(); + } +} diff --git a/features/update/deduplication/feature.php b/features/update/deduplication/feature.php new file mode 100644 index 00000000..e8b61eec --- /dev/null +++ b/features/update/deduplication/feature.php @@ -0,0 +1,86 @@ + $this->counter >= 2); + return $this->counter; + } + + #[Workflow\SignalMethod('unblock')] + public function unblock() + { + $this->blocked = false; + } + + #[Workflow\UpdateMethod('my_update')] + public function myUpdate() + { + ++$this->counter; + # Verify that dedupe works pre-update-completion + yield Workflow::await(fn(): bool => !$this->blocked); + $this->blocked = true; + return $this->counter; + } +} + +class FeatureChecker +{ + #[Check] + public function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + WorkflowClientInterface $client, + ): void { + $updateId = 'incrementer'; + # Issue async update + + $handle1 = $stub->startUpdate( + UpdateOptions::new('my_update', LifecycleStage::StageAccepted) + ->withUpdateId($updateId), + ); + $handle2 = $stub->startUpdate( + UpdateOptions::new('my_update', LifecycleStage::StageAccepted) + ->withUpdateId($updateId), + ); + + $stub->signal('unblock'); + + Assert::same($handle1->getResult(1), 1); + Assert::same($handle2->getResult(1), 1); + + # This only needs to start to unblock the workflow + $stub->startUpdate('my_update'); + + # There should be two accepted updates, and only one of them should be completed with the set id + $totalUpdates = 0; + foreach ($client->getWorkflowHistory($stub->getExecution()) as $event) { + $event->hasWorkflowExecutionUpdateAcceptedEventAttributes() and ++$totalUpdates; + + $f = $event->getWorkflowExecutionUpdateCompletedEventAttributes(); + $f === null or Assert::same($f->getMeta()?->getUpdateId(), $updateId); + } + + Assert::same($totalUpdates, 2); + } +} diff --git a/features/update/non_durable_reject/feature.php b/features/update/non_durable_reject/feature.php new file mode 100644 index 00000000..7d783fdf --- /dev/null +++ b/features/update/non_durable_reject/feature.php @@ -0,0 +1,68 @@ + $this->counter === 5); + return $this->counter; + } + + #[Workflow\UpdateMethod('my_update')] + public function myUpdate(int $arg): int + { + $this->counter += $arg; + return $this->counter; + } + + #[Workflow\UpdateValidatorMethod('my_update')] + public function validateMyUpdate(int $arg): void + { + $arg < 0 and throw new \InvalidArgumentException('I *HATE* negative numbers!'); + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + WorkflowClientInterface $client, + ): void { + for ($i = 0; $i < 5; $i++) { + try { + $stub->update('my_update', -1); + throw new \RuntimeException('Expected exception'); + } catch (WorkflowUpdateException) { + # Expected + } + + $stub->update('my_update', 1); + } + + Assert::same($stub->getResult(), 5); + + # Verify no rejections were written to history since we failed in the validator + foreach ($client->getWorkflowHistory($stub->getExecution()) as $event) { + $event->hasWorkflowExecutionUpdateRejectedEventAttributes() and throw new \RuntimeException('Unexpected rejection event'); + } + } +} diff --git a/features/update/self/feature.php b/features/update/self/feature.php new file mode 100644 index 00000000..af1d2ad8 --- /dev/null +++ b/features/update/self/feature.php @@ -0,0 +1,70 @@ +withStartToCloseTimeout(2) + ); + + yield Workflow::await(fn(): bool => $this->done); + + return 'Hello, world!'; + } + + #[Workflow\UpdateMethod('my_update')] + public function myUpdate() + { + $this->done = true; + } +} + +#[ActivityInterface] +class FeatureActivity +{ + public function __construct( + private WorkflowClientInterface $client, + ) {} + + #[ActivityMethod('result')] + public function result(): void + { + $this->client->newUntypedRunningWorkflowStub( + workflowID: Activity::getInfo()->workflowExecution->getID(), + workflowType: Activity::getInfo()->workflowType->name, + )->update('my_update'); + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Harness_Update_Self')] WorkflowStubInterface $stub, + ): void { + Assert::same($stub->getResult(timeout: 10), 'Hello, world!'); + } +} diff --git a/features/update/task_failure/feature.php b/features/update/task_failure/feature.php new file mode 100644 index 00000000..e87838cb --- /dev/null +++ b/features/update/task_failure/feature.php @@ -0,0 +1,99 @@ + $this->done); + + return static::$fails; + } + + #[Workflow\UpdateMethod('do_update')] + public function doUpdate(): string + { + # Don't use static variables like this. We do here because we need to fail the task a + # controlled number of times. + if (static::$fails < 2) { + ++static::$fails; + throw new class extends \Error { + public function __construct() + { + parent::__construct("I'll fail task"); + } + }; + } + + throw new ApplicationFailure("I'll fail update", 'task-failure', true); + } + + #[Workflow\UpdateMethod('throw_or_done')] + public function throwOrDone(bool $doThrow): void + { + $this->done = true; + } + + #[Workflow\UpdateValidatorMethod('throw_or_done')] + public function validateThrowOrDone(bool $doThrow): void + { + $doThrow and throw new \RuntimeException('This will fail validation, not task'); + } +} + +class FeatureChecker +{ + #[Check] + public static function retryableException( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + throw new SkipTest('TODO: doesn\'t pass in some cases'); + + try { + $stub->update('do_update'); + throw new \RuntimeException('Expected validation exception'); + } catch (WorkflowUpdateException $e) { + Assert::contains($e->getPrevious()?->getMessage(), "I'll fail update"); + } finally { + # Finish Workflow + $stub->update('throw_or_done', doThrow: false); + } + + Assert::same($stub->getResult(), 2); + } + + #[Check] + public static function validationException( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + try { + $stub->update('throw_or_done', true); + throw new \RuntimeException('Expected validation exception'); + } catch (WorkflowUpdateException) { + # Expected + } finally { + # Finish Workflow + $stub->update('throw_or_done', doThrow: false); + } + } +} diff --git a/features/update/validation_replay/feature.php b/features/update/validation_replay/feature.php new file mode 100644 index 00000000..c2aaea9e --- /dev/null +++ b/features/update/validation_replay/feature.php @@ -0,0 +1,65 @@ + $this->done); + + return static::$validations; + } + + #[Workflow\UpdateMethod('do_update')] + public function doUpdate(): void + { + if (static::$validations === 0) { + ++static::$validations; + throw new class extends \Error { + public function __construct() + { + parent::__construct("I'll fail task"); + } + }; + } + + $this->done = true; + } + + #[Workflow\UpdateValidatorMethod('do_update')] + public function validateDoUpdate(): void + { + if (static::$validations > 1) { + throw new \RuntimeException('I would reject if I even ran :|'); + } + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ): void { + $stub->update('do_update'); + Assert::same($stub->getResult(), 1); + } +} diff --git a/features/update/worker_restart/feature.php b/features/update/worker_restart/feature.php new file mode 100644 index 00000000..d6314358 --- /dev/null +++ b/features/update/worker_restart/feature.php @@ -0,0 +1,107 @@ + $this->done); + + return 'Hello, World!'; + } + + #[Workflow\UpdateMethod('do_activities')] + public function doActivities() + { + yield Workflow::executeActivity( + 'blocks', + options: ActivityOptions::new()->withStartToCloseTimeout(10) + ); + $this->done = true; + } +} + +#[ActivityInterface] +class FeatureActivity +{ + public function __construct( + private StorageInterface $kv, + ) {} + + #[ActivityMethod('blocks')] + public function blocks(): string + { + $this->kv->set(KV_ACTIVITY_STARTED, true); + + do { + $blocked = $this->kv->get(KV_ACTIVITY_BLOCKED); + + \is_bool($blocked) or throw new ApplicationFailure('KV BLOCKED key not set', 'KvNotSet', true); + if (!$blocked) { + break; + } + + \usleep(100_000); + } while (true); + + return 'hi'; + } +} + +class FeatureChecker +{ + #[Check] + public static function check( + #[Stub('Workflow')] WorkflowStubInterface $stub, + ContainerInterface $c, + Runner $runner, + ): void { + $c->get(StorageInterface::class)->set(KV_ACTIVITY_BLOCKED, true); + $handle = $stub->startUpdate('do_activities'); + + # Wait for the activity to start. + $deadline = \microtime(true) + 20; + do { + if ($c->get(StorageInterface::class)->get(KV_ACTIVITY_STARTED, false)) { + break; + } + + \microtime(true) > $deadline and throw throw new \RuntimeException('Activity did not start'); + \usleep(100_000); + } while (true); + + # Restart the worker. + $runner->stop(); + $runner->start(); + # Unblocks the activity. + $c->get(StorageInterface::class)->set(KV_ACTIVITY_BLOCKED, false); + + # Wait for Temporal restarts the activity + $handle->getResult(30); + $stub->getResult(); + } +} diff --git a/harness/php/.gitignore b/harness/php/.gitignore new file mode 100644 index 00000000..d069ae6b --- /dev/null +++ b/harness/php/.gitignore @@ -0,0 +1,5 @@ +# PHP stuff +vendor +rr +rr.exe +composer.lock diff --git a/harness/php/.rr.yaml b/harness/php/.rr.yaml new file mode 100644 index 00000000..7aab70df --- /dev/null +++ b/harness/php/.rr.yaml @@ -0,0 +1,22 @@ +version: "3" +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php worker.php" + +# Workflow and activity mesh service +temporal: + address: ${TEMPORAL_ADDRESS:-localhost:7233} + namespace: ${TEMPORAL_NAMESPACE:-default} + activities: + num_workers: 2 + +kv: + harness: + driver: memory + config: { } + +logs: + mode: development + level: info diff --git a/harness/php/composer.json b/harness/php/composer.json new file mode 100644 index 00000000..a16e8a41 --- /dev/null +++ b/harness/php/composer.json @@ -0,0 +1,24 @@ +{ + "name": "temporal/harness", + "type": "project", + "description": "Temporal SDK Harness", + "keywords": ["temporal", "sdk", "harness"], + "license": "MIT", + "require": { + "buggregator/trap": "^1.9", + "spiral/core": "^3.13", + "symfony/process": ">=6.4", + "temporal/sdk": "^2.11.0", + "webmozart/assert": "^1.11" + }, + "autoload": { + "psr-4": { + "Harness\\": "src/" + } + }, + "scripts": { + "rr-get": "rr get" + }, + "prefer-stable": true, + "minimum-stability": "dev" +} diff --git a/harness/php/runner.php b/harness/php/runner.php new file mode 100644 index 00000000..b096b963 --- /dev/null +++ b/harness/php/runner.php @@ -0,0 +1,127 @@ +workflows(), false) !== [] || \iterator_to_array($runtime->activities(), false) !== []) { + $runner->start(); +} + +// Prepare and run checks + +// Prepare services to be injected + +$serviceClient = $runtime->command->tlsKey === null && $runtime->command->tlsCert === null + ? ServiceClient::create($runtime->address) + : ServiceClient::createSSL( + $runtime->address, + clientKey: $runtime->command->tlsKey, + clientPem: $runtime->command->tlsCert, + ); +echo "Connecting to Temporal service at {$runtime->address}... "; +try { + $serviceClient->getConnection()->connect(5); + echo "\e[1;32mOK\e[0m\n"; +} catch (\Throwable $e) { + echo "\e[1;31mFAILED\e[0m\n"; + Support::echoException($e); + return; +} + +// TODO if authKey is set +// $serviceClient->withAuthKey($authKey) + +$converter = DataConverter::createDefault(); + +$workflowClient = WorkflowClient::create( + serviceClient: $serviceClient, + options: (new ClientOptions())->withNamespace($runtime->namespace), + converter: $converter, +)->withTimeout(5); + +$scheduleClient = ScheduleClient::create( + serviceClient: $serviceClient, + options: (new ClientOptions())->withNamespace($runtime->namespace), + converter: $converter, +)->withTimeout(5); + +$container = new Spiral\Core\Container(); +$container->bindSingleton(State::class, $runtime); +$container->bindSingleton(Runner::class, $runner); +$container->bindSingleton(ServiceClientInterface::class, $serviceClient); +$container->bindSingleton(WorkflowClientInterface::class, $workflowClient); +$container->bindSingleton(ScheduleClientInterface::class, $scheduleClient); +$container->bindInjector(WorkflowStubInterface::class, WorkflowStubInjector::class); +$container->bindSingleton(DataConverterInterface::class, $converter); +$container->bind(RPCInterface::class, static fn() => RPC::create('tcp://127.0.0.1:6001')); +$container->bind( + StorageInterface::class, + fn (#[Proxy] ContainerInterface $c): StorageInterface => $c->get(Factory::class)->select('harness'), +); + +// Run checks +$errors = 0; +foreach ($runtime->checks() as $feature => $definition) { + try { + $container->runScope( + new Scope(name: 'feature',bindings: [ + Feature::class => $feature, + ]), + static function (Container $container) use ($definition) { + // todo modify services based on feature requirements + [$class, $method] = $definition; + $container->bindSingleton($class, $class); + echo "Running check \e[1;36m{$class}::{$method}\e[0m "; + $container->invoke($definition); + echo "\e[1;32mSUCCESS\e[0m\n"; + }, + ); + } catch (SkipTest $e) { + echo "\e[1;33mSKIPPED\e[0m\n"; + echo "\e[35m{$e->reason}\e[0m\n"; + } catch (\Throwable $e) { + echo "\e[1;31mFAILED\e[0m\n"; + + \trap($e); + ++$errors; + Support::echoException($e); + echo "\n"; + } finally { + $runner->start(); + } +} + +exit($errors === 0 ? 0 : 1); diff --git a/harness/php/src/Attribute/Check.php b/harness/php/src/Attribute/Check.php new file mode 100644 index 00000000..782b189f --- /dev/null +++ b/harness/php/src/Attribute/Check.php @@ -0,0 +1,13 @@ + $args + */ + public function __construct( + public string $type, + public bool $eagerStart = false, + public ?string $workflowId = null, + public array $args = [], + public array $memo = [], + ) { + } +} diff --git a/harness/php/src/ClassLocator.php b/harness/php/src/ClassLocator.php new file mode 100644 index 00000000..b5135036 --- /dev/null +++ b/harness/php/src/ClassLocator.php @@ -0,0 +1,38 @@ + + */ + public static function loadClasses(string $dir, string $namespace): iterable + { + $dir = \realpath($dir); + $files = new RecursiveIteratorIterator(new RecursiveDirectoryIterator($dir, FilesystemIterator::SKIP_DOTS)); + + /** @var SplFileInfo $_ */ + foreach ($files as $path => $_) { + if (!\is_file($path) || !\str_ends_with($path, '.php')) { + continue; + } + + include_once $path; + } + + yield from \array_filter( + \get_declared_classes(), + static fn (string $class): bool => \str_starts_with($class, $namespace), + ); + } +} diff --git a/harness/php/src/Exception/SkipTest.php b/harness/php/src/Exception/SkipTest.php new file mode 100644 index 00000000..e69d840e --- /dev/null +++ b/harness/php/src/Exception/SkipTest.php @@ -0,0 +1,13 @@ + + */ +#[Singleton] +final class ClientFactory +{ + public function __construct( + #[Proxy] private readonly ContainerInterface $container, + #[Proxy] private readonly InvokerInterface $invoker, + ) { + } + + public function workflowClient(\ReflectionParameter $context): WorkflowClientInterface + { + /** @var Client|null $attribute */ + $attribute = ($context->getAttributes(Client::class)[0] ?? null)?->newInstance(); + + /** @var WorkflowClientInterface $client */ + $client = $this->container->get(WorkflowClientInterface::class); + + if ($attribute === null) { + return $client; + } + + if ($attribute->payloadConverters !== []) { + $converters = [ + new NullConverter(), + new BinaryConverter(), + new ProtoConverter(), + new ProtoJsonConverter(), + new JsonConverter(), + ]; + // Collect converters from all features + foreach ($attribute->payloadConverters as $converterClass) { + \array_unshift($converters, $this->container->get($converterClass)); + } + $converter = new DataConverter(...$converters); + } else { + $converter = $this->container->get(DataConverterInterface::class); + } + + /** @var PipelineProvider|null $pipelineProvider */ + $pipelineProvider = $attribute->pipelineProvider === null + ? null + : $this->invoker->invoke($attribute->pipelineProvider); + + // Build custom WorkflowClient with gRPC interceptor + $serviceClient = $client->getServiceClient() + ->withInterceptorPipeline($pipelineProvider->getPipeline(GrpcClientInterceptor::class)); + + /** @var State $runtime */ + $runtime = $this->container->get(State::class); + $client = WorkflowClient::create( + serviceClient: $serviceClient, + options: (new ClientOptions())->withNamespace($runtime->namespace), + converter: $converter, + interceptorProvider: $pipelineProvider, + )->withTimeout(5); + + $attribute->timeout === null or $client = $client->withTimeout($attribute->timeout); + + return $client; + } +} diff --git a/harness/php/src/Feature/WorkflowStubInjector.php b/harness/php/src/Feature/WorkflowStubInjector.php new file mode 100644 index 00000000..b8bb864f --- /dev/null +++ b/harness/php/src/Feature/WorkflowStubInjector.php @@ -0,0 +1,64 @@ + + */ +final class WorkflowStubInjector implements InjectorInterface +{ + public function __construct( + #[Proxy] private readonly ContainerInterface $container, + private readonly ClientFactory $clientFactory, + ) { + } + + public function createInjection( + \ReflectionClass $class, + \ReflectionParameter|null|string $context = null, + ): WorkflowStubInterface { + if (!$context instanceof \ReflectionParameter) { + throw new \InvalidArgumentException('Context is not clear.'); + } + + /** @var Stub|null $attribute */ + $attribute = ($context->getAttributes(Stub::class)[0] ?? null)?->newInstance(); + if ($attribute === null) { + throw new \InvalidArgumentException(\sprintf('Attribute %s is not found.', Stub::class)); + } + + $client = $this->clientFactory->workflowClient($context); + + if ($attribute->eagerStart) { + // If the server does not support eager start, skip the test + $client->getServiceClient()->getServerCapabilities()->eagerWorkflowStart or throw new SkipTest( + 'Eager workflow start is not supported by the server.' + ); + } + + /** @var Feature $feature */ + $feature = $this->container->get(Feature::class); + $options = WorkflowOptions::new() + ->withTaskQueue($feature->taskQueue) + ->withEagerStart($attribute->eagerStart); + + $attribute->workflowId === null or $options = $options->withWorkflowId($attribute->workflowId); + $attribute->memo === [] or $options = $options->withMemo($attribute->memo); + + $stub = $client->newUntypedWorkflowStub($attribute->type, $options); + $client->start($stub, ...$attribute->args); + + return $stub; + } +} diff --git a/harness/php/src/Input/Command.php b/harness/php/src/Input/Command.php new file mode 100644 index 00000000..1c39a347 --- /dev/null +++ b/harness/php/src/Input/Command.php @@ -0,0 +1,88 @@ + */ + public array $features = []; + + /** @var non-empty-string|null */ + public ?string $tlsKey = null; + + /** @var non-empty-string|null */ + public ?string $tlsCert = null; + + public static function fromCommandLine(array $argv): self + { + $self = new self(); + + \array_shift($argv); // remove the script name (worker.php or runner.php) + foreach ($argv as $chunk) { + if (\str_starts_with($chunk, 'namespace=')) { + $self->namespace = \substr($chunk, 10); + continue; + } + + if (\str_starts_with($chunk, 'address=')) { + $self->address = \substr($chunk, 8); + continue; + } + + if (\str_starts_with($chunk, 'tls.cert=')) { + $self->tlsCert = \substr($chunk, 9); + continue; + } + + if (\str_starts_with($chunk, 'tls.key=')) { + $self->tlsKey = \substr($chunk, 8); + continue; + } + + if (!\str_contains($chunk, ':')) { + continue; + } + + [$dir, $taskQueue] = \explode(':', $chunk, 2); + $self->features[] = new Feature( + dir: $dir, + namespace: 'Harness\\Feature\\' . self::namespaceFromPath($dir), + taskQueue: $taskQueue, + ); + } + + return $self; + } + + /** + * @return list CLI arguments that can be parsed by `fromCommandLine` + */ + public function toCommandLineArguments(): array + { + $result = []; + $this->namespace === null or $result[] = "namespace=$this->namespace"; + $this->address === null or $result[] = "address=$this->address"; + $this->tlsCert === null or $result[] = "tls.cert=$this->tlsCert"; + $this->tlsKey === null or $result[] = "tls.key=$this->tlsKey"; + foreach ($this->features as $feature) { + $result[] = "{$feature->dir}:{$feature->taskQueue}"; + } + + return $result; + } + + private static function namespaceFromPath(string $dir): string + { + $normalized = \str_replace('/', '\\', \trim($dir, '/\\')) . '\\'; + // snake_case to PascalCase: + return \str_replace('_', '', \ucwords($normalized, '_\\')); + } +} diff --git a/harness/php/src/Input/Feature.php b/harness/php/src/Input/Feature.php new file mode 100644 index 00000000..b3cc2bff --- /dev/null +++ b/harness/php/src/Input/Feature.php @@ -0,0 +1,15 @@ + Workflow classes */ + public array $workflows = []; + + /** @var list Activity classes */ + public array $activities = []; + + /** @var list> Lazy callables */ + public array $checks = []; + + /** @var list> Lazy callables */ + public array $converters = []; + + public function __construct( + public readonly string $taskQueue, + ) { + } +} diff --git a/harness/php/src/Runtime/Runner.php b/harness/php/src/Runtime/Runner.php new file mode 100644 index 00000000..74f50ac9 --- /dev/null +++ b/harness/php/src/Runtime/Runner.php @@ -0,0 +1,63 @@ +environment = Environment::create(); + \register_shutdown_function(fn() => $this->stop()); + } + + public function start(): void + { + if ($this->started) { + return; + } + + $run = $this->runtime->command; + $rrCommand = [ + $this->runtime->workDir . DIRECTORY_SEPARATOR . 'rr', + 'serve', + '-w', + $this->runtime->workDir, + '-o', + "temporal.namespace={$this->runtime->namespace}", + '-o', + "temporal.address={$this->runtime->address}", + '-o', + 'server.command=' . \implode(',', [ + 'php', + $this->runtime->sourceDir . DIRECTORY_SEPARATOR . 'worker.php', + ...$run->toCommandLineArguments(), + ]), + ]; + $run->tlsKey === null or $rrCommand = [...$rrCommand, '-o', "tls.key={$run->tlsKey}"]; + $run->tlsCert === null or $rrCommand = [...$rrCommand, '-o', "tls.cert={$run->tlsCert}"]; + $command = \implode(' ', $rrCommand); + + // echo "\e[1;36mStart RoadRunner with command:\e[0m {$command}\n"; + $this->environment->startRoadRunner($command); + $this->started = true; + } + + public function stop(): void + { + if (!$this->started) { + return; + } + + // echo "\e[1;36mStop RoadRunner\e[0m\n"; + $this->environment->stop(); + $this->started = false; + } +} \ No newline at end of file diff --git a/harness/php/src/Runtime/State.php b/harness/php/src/Runtime/State.php new file mode 100644 index 00000000..122aabba --- /dev/null +++ b/harness/php/src/Runtime/State.php @@ -0,0 +1,127 @@ + */ + public array $features = []; + + /** @var non-empty-string */ + public string $namespace; + + /** @var non-empty-string */ + public string $address; + + /** + * @param non-empty-string $sourceDir Dir with rr.yaml, composer.json, etc + * @param non-empty-string $workDir Dir where tests are run + */ + public function __construct( + public readonly Command $command, + public readonly string $sourceDir, + public readonly string $workDir, + ) { + $this->namespace = $command->namespace ?? 'default'; + $this->address = $command->address ?? 'localhost:7233'; + } + + /** + * Iterate over all the Workflows. + * + * @return \Traversable + */ + public function workflows(): \Traversable + { + foreach ($this->features as $feature) { + foreach ($feature->workflows as $workflow) { + yield $feature => $workflow; + } + } + } + + /** + * Iterate over all the Activities. + * + * @return \Traversable + */ + public function activities(): \Traversable + { + foreach ($this->features as $feature) { + foreach ($feature->activities as $activity) { + yield $feature => $activity; + } + } + } + + /** + * Iterate over all the Payload Converters. + * + * @return \Traversable> + */ + public function converters(): \Traversable + { + foreach ($this->features as $feature) { + foreach ($feature->converters as $converter) { + yield $feature => $converter; + } + } + } + + /** + * Iterate over all the Checks. + * + * @return \Traversable + */ + public function checks(): \Traversable + { + foreach ($this->features as $feature) { + foreach ($feature->checks as $check) { + yield $feature => $check; + } + } + } + + /** + * @param class-string $class + */ + public function addConverter(\Harness\Input\Feature $inputFeature, string $class): void + { + $this->getFeature($inputFeature)->converters[] = $class; + } + + /** + * @param class-string $class + * @param non-empty-string $method + */ + public function addCheck(\Harness\Input\Feature $inputFeature, string $class, string $method): void + { + $this->getFeature($inputFeature)->checks[] = [$class, $method]; + } + + /** + * @param class-string $class + */ + public function addWorkflow(\Harness\Input\Feature $inputFeature, string $class): void + { + $this->getFeature($inputFeature)->workflows[] = $class; + } + + /** + * @param class-string $class + */ + public function addActivity(\Harness\Input\Feature $inputFeature, string $class): void + { + $this->getFeature($inputFeature)->activities[] = $class; + } + + private function getFeature(\Harness\Input\Feature $feature): Feature + { + return $this->features[$feature->namespace] ??= new Feature($feature->taskQueue); + } +} \ No newline at end of file diff --git a/harness/php/src/RuntimeBuilder.php b/harness/php/src/RuntimeBuilder.php new file mode 100644 index 00000000..cc7e3da3 --- /dev/null +++ b/harness/php/src/RuntimeBuilder.php @@ -0,0 +1,72 @@ + $class) { + # Register Workflow + $class->getAttributes(WorkflowInterface::class) === [] or $runtime + ->addWorkflow($feature, $class->getName()); + + # Register Activity + $class->getAttributes(ActivityInterface::class) === [] or $runtime + ->addActivity($feature, $class->getName()); + + # Register Converters + $class->implementsInterface(PayloadConverterInterface::class) and $runtime + ->addConverter($feature, $class->getName()); + + # Register Check + foreach ($class->getMethods() as $method) { + $method->getAttributes(Check::class) === [] or $runtime + ->addCheck($feature, $class->getName(), $method->getName()); + } + } + + return $runtime; + } + + public static function init(): void + { + \ini_set('display_errors', 'stderr'); + include 'vendor/autoload.php'; + + \spl_autoload_register(static function (string $class): void { + if (\str_starts_with($class, 'Harness\\')) { + $file = \str_replace('\\', '/', \substr($class, 8)) . '.php'; + $path = __DIR__ . '/' . $file; + \is_file($path) and require $path; + } + }); + } + + /** + * @param non-empty-string $featuresDir + * @return iterable + */ + private static function iterateClasses(string $featuresDir, Command $run): iterable + { + foreach ($run->features as $feature) { + foreach (ClassLocator::loadClasses($featuresDir . $feature->dir, $feature->namespace) as $class) { + yield $feature => new \ReflectionClass($class); + } + } + } +} \ No newline at end of file diff --git a/harness/php/src/Support.php b/harness/php/src/Support.php new file mode 100644 index 00000000..3cc9647c --- /dev/null +++ b/harness/php/src/Support.php @@ -0,0 +1,28 @@ +getTrace(), static fn(array $trace): bool => + isset($trace['file']) && + !\str_contains($trace['file'], DIRECTORY_SEPARATOR . 'vendor' . DIRECTORY_SEPARATOR), + ); + if ($trace !== []) { + $line = \reset($trace); + echo "-> \e[1;33m{$line['file']}:{$line['line']}\e[0m\n"; + } + + do { + /** @var \Throwable $err */ + $name = \ltrim(\strrchr($e::class, "\\") ?: $e::class, "\\"); + echo "\e[1;34m$name\e[0m\n"; + echo "\e[3m{$e->getMessage()}\e[0m\n"; + $e = $e->getPrevious(); + } while ($e !== null); + } +} diff --git a/harness/php/worker.php b/harness/php/worker.php new file mode 100644 index 00000000..9bbcd8ec --- /dev/null +++ b/harness/php/worker.php @@ -0,0 +1,104 @@ + $run */ +$workers = []; + +FeatureFlags::$workflowDeferredHandlerStart = true; + +try { + // Load runtime options + $runtime = RuntimeBuilder::createState($argv, \getcwd()); + $run = $runtime->command; + // Init container + $container = new Spiral\Core\Container(); + + $converters = [ + new NullConverter(), + new BinaryConverter(), + new ProtoJsonConverter(), + new ProtoConverter(), + new JsonConverter(), + ]; + // Collect converters from all features + foreach ($runtime->converters() as $feature => $converter) { + \array_unshift($converters, $container->get($converter)); + } + $converter = new DataConverter(...$converters); + $container->bindSingleton(DataConverter::class, $converter); + + $factory = WorkerFactory::create(converter: $converter); + $getWorker = static function (string $taskQueue) use (&$workers, $factory): WorkerInterface { + return $workers[$taskQueue] ??= $factory->newWorker( + $taskQueue, + WorkerOptions::new()->withMaxConcurrentActivityExecutionSize(10) + ); + }; + + // Create client services + $serviceClient = $runtime->command->tlsKey === null && $runtime->command->tlsCert === null + ? ServiceClient::create($runtime->address) + : ServiceClient::createSSL( + $runtime->address, + clientKey: $runtime->command->tlsKey, + clientPem: $runtime->command->tlsCert, + ); + $options = (new ClientOptions())->withNamespace($runtime->namespace); + $workflowClient = WorkflowClient::create(serviceClient: $serviceClient, options: $options, converter: $converter); + $scheduleClient = ScheduleClient::create(serviceClient: $serviceClient, options: $options, converter: $converter); + + // Bind services + $container->bindSingleton(State::class, $runtime); + $container->bindSingleton(ServiceClientInterface::class, $serviceClient); + $container->bindSingleton(WorkflowClientInterface::class, $workflowClient); + $container->bindSingleton(ScheduleClientInterface::class, $scheduleClient); + $container->bindSingleton(RPCInterface::class, RPC::create('tcp://127.0.0.1:6001')); + $container->bind( + StorageInterface::class, + fn (#[Proxy] ContainerInterface $c): StorageInterface => $c->get(Factory::class)->select('harness'), + ); + + // Register Workflows + foreach ($runtime->workflows() as $feature => $workflow) { + $getWorker($feature->taskQueue)->registerWorkflowTypes($workflow); + } + + // Register Activities + foreach ($runtime->activities() as $feature => $activity) { + $getWorker($feature->taskQueue)->registerActivityImplementations($container->make($activity)); + } + + $factory->run(); +} catch (\Throwable $e) { + \td($e); +} diff --git a/sdkbuild/php.go b/sdkbuild/php.go new file mode 100644 index 00000000..58e5ef91 --- /dev/null +++ b/sdkbuild/php.go @@ -0,0 +1,135 @@ +package sdkbuild + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "runtime" +) + +// BuildPhpProgramOptions are options for BuildPhpProgram. +type BuildPhpProgramOptions struct { + // If not set, the default version from composer.json is used. + Version string + // If present, this directory is expected to exist beneath base dir. Otherwise + // a temporary dir is created. + DirName string + RootDir string +} + +// PhpProgram is a PHP-specific implementation of Program. +type PhpProgram struct { + dir string + source string +} + +var _ Program = (*PhpProgram)(nil) + +// BuildPhpProgram builds a PHP program. If completed successfully, this +// can be stored and re-obtained via PhpProgramFromDir() with the Dir() value +func BuildPhpProgram(ctx context.Context, options BuildPhpProgramOptions) (*PhpProgram, error) { + // Working directory + // Create temp dir if needed that we will remove if creating is unsuccessful + var dir string + if options.DirName != "" { + dir = filepath.Join(options.RootDir, options.DirName) + } else { + var err error + dir, err = os.MkdirTemp(options.RootDir, "program-") + if err != nil { + return nil, fmt.Errorf("failed making temp dir: %w", err) + } + } + + sourceDir := GetSourceDir(options.RootDir) + + // Skip if installed + if st, err := os.Stat(filepath.Join(dir, "vendor")); err == nil && st.IsDir() { + return &PhpProgram{dir, sourceDir}, nil + } + + // Copy composer.json from sourceDir into dir + data, err := os.ReadFile(filepath.Join(sourceDir, "composer.json")) + if err != nil { + return nil, fmt.Errorf("failed reading composer.json file: %w", err) + } + err = os.WriteFile(filepath.Join(dir, "composer.json"), data, 0755) + if err != nil { + return nil, fmt.Errorf("failed writing composer.json file: %w", err) + } + + // Copy .rr.yaml from sourceDir into dir + data, err = os.ReadFile(filepath.Join(sourceDir, ".rr.yaml")) + if err != nil { + return nil, fmt.Errorf("failed reading .rr.yaml file: %w", err) + } + err = os.WriteFile(filepath.Join(dir, ".rr.yaml"), data, 0755) + if err != nil { + return nil, fmt.Errorf("failed writing .rr.yaml file: %w", err) + } + + var cmd *exec.Cmd + // Setup required SDK version if specified + if options.Version != "" { + cmd = exec.CommandContext(ctx, "composer", "req", "temporal/sdk", options.Version, "-W", "--no-install", "--ignore-platform-reqs") + cmd.Dir = dir + cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("failed installing SDK deps: %w", err) + } + } + + // Install dependencies via composer + cmd = exec.CommandContext(ctx, "composer", "i", "-n", "-o", "-q", "--no-scripts", "--ignore-platform-reqs") + cmd.Dir = dir + cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("failed installing SDK deps: %w", err) + } + + // Download RoadRunner + rrExe := filepath.Join(dir, "rr") + if runtime.GOOS == "windows" { + rrExe += ".exe" + } + _, err = os.Stat(rrExe) + if os.IsNotExist(err) { + cmd = exec.CommandContext(ctx, "composer", "run", "rr-get") + cmd.Dir = dir + cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("failed downloading RoadRunner: %w", err) + } + } + + return &PhpProgram{dir, sourceDir}, nil +} + +// PhpProgramFromDir recreates the Php program from a Dir() result of a +// BuildPhpProgram(). Note, the base directory of dir when it was built must +// also be present. +func PhpProgramFromDir(dir string, rootDir string) (*PhpProgram, error) { + // Quick sanity check on the presence of package.json here + if _, err := os.Stat(filepath.Join(dir, "composer.json")); err != nil { + return nil, fmt.Errorf("failed finding composer.json in dir: %w", err) + } + return &PhpProgram{dir, GetSourceDir(rootDir)}, nil +} + +func GetSourceDir(rootDir string) string { + return filepath.Join(rootDir, "harness", "php") +} + +// Dir is the directory to run in. +func (p *PhpProgram) Dir() string { return p.dir } + +// NewCommand makes a new RoadRunner run command +func (p *PhpProgram) NewCommand(ctx context.Context, args ...string) (*exec.Cmd, error) { + args = append([]string{filepath.Join(p.source, "runner.php")}, args...) + cmd := exec.CommandContext(ctx, "php", args...) + cmd.Dir = p.dir + cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr + return cmd, nil +}