From 0d37c6f45d810f01907bdbcc424b621185a0033f Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Mon, 18 Sep 2023 11:18:36 -0700 Subject: [PATCH] [#28187] Add standalone prism validates runner precommit (#28487) * Add container using standalone prism go precommit. * Add GoPrism precommit action. * Don't serve prism UI by default. --------- Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> --- .github/workflows/beam_PreCommit_GoPrism.yml | 89 +++++++++++++++++++ build.gradle.kts | 4 + sdks/go/README.md | 1 + sdks/go/cmd/prism/prism.go | 10 ++- .../runners/prism/internal/environments.go | 16 ++-- .../beam/runners/prism/internal/execute.go | 4 + .../runners/prism/internal/jobservices/job.go | 4 + .../runners/prism/internal/worker/worker.go | 11 +-- sdks/go/pkg/beam/runners/prism/prism.go | 6 +- sdks/go/test/build.gradle | 24 +++++ sdks/go/test/integration/integration.go | 4 +- sdks/go/test/run_validatesrunner_tests.sh | 16 ++-- 12 files changed, 164 insertions(+), 25 deletions(-) create mode 100644 .github/workflows/beam_PreCommit_GoPrism.yml diff --git a/.github/workflows/beam_PreCommit_GoPrism.yml b/.github/workflows/beam_PreCommit_GoPrism.yml new file mode 100644 index 000000000000..8091c23792c6 --- /dev/null +++ b/.github/workflows/beam_PreCommit_GoPrism.yml @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: PreCommit GoPrism + +on: + push: + tags: ['v*'] + branches: ['master', 'release-*'] + paths: ['model/**', 'sdks/go.**', 'release/**','.github/workflows/beam_PreCommit_GoPrism.yml'] + pull_request_target: + branches: ['master', 'release-*'] + paths: ['model/**', 'sdks/go.**', 'release/**'] + issue_comment: + types: [created] + schedule: + - cron: '0 */6 * * *' + workflow_dispatch: + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.body || github.event.sender.login}}' + cancel-in-progress: true + +env: + GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +# Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: read + checks: read + contents: read + deployments: read + id-token: none + issues: read + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +jobs: + beam_PreCommit_GoPrism: + name: ${{matrix.job_name}} (${{ matrix.job_phrase }}) + runs-on: [self-hosted, ubuntu-20.04, main] + strategy: + matrix: + job_name: [beam_PreCommit_GoPrism] + job_phrase: [Run GoPrism PreCommit] + timeout-minutes: 120 + if: | + github.event_name == 'push' || + github.event_name == 'pull_request_target' || + github.event_name == 'schedule' || + github.event_name == 'workflow_dispatch' || + github.event.comment.body == 'Run GoPrism PreCommit' + steps: + - uses: actions/checkout@v3 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Setup self-hosted + uses: ./.github/actions/setup-self-hosted-action + with: + requires-py-39: false + requires-go: false + - name: Run goPrismPreCommit script + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :goPrismPreCommit \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 0d23861a495b..7bd847895293 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -443,6 +443,10 @@ tasks.register("goPortablePreCommit") { dependsOn(":sdks:go:test:ulrValidatesRunner") } +tasks.register("goPrismPreCommit") { + dependsOn(":sdks:go:test:prismValidatesRunner") +} + tasks.register("goPostCommitDataflowARM") { dependsOn(":sdks:go:test:dataflowValidatesRunnerARM64") } diff --git a/sdks/go/README.md b/sdks/go/README.md index a3b03c2e6184..7734d58d9eb9 100644 --- a/sdks/go/README.md +++ b/sdks/go/README.md @@ -131,6 +131,7 @@ Executing all unit tests for the SDK is possible from the `\sdks\go` To test your change as Jenkins would execute it from a PR, from the beam root directory, run: * `./gradlew :sdks:go:goTest` executes the unit tests. + * `./gradlew :sdks:go:test:prismValidatesRunner` validates the SDK against the Go Prism runner as a stand alone binary, with containers. * `./gradlew :sdks:go:test:ulrValidatesRunner` validates the SDK against the Portable Python runner. * `./gradlew :sdks:go:test:flinkValidatesRunner` validates the SDK against the Flink runner. diff --git a/sdks/go/cmd/prism/prism.go b/sdks/go/cmd/prism/prism.go index f00a16c9b2d0..804ae0c2ab2d 100644 --- a/sdks/go/cmd/prism/prism.go +++ b/sdks/go/cmd/prism/prism.go @@ -30,6 +30,8 @@ import ( ) var ( + jobPort = flag.Int("job_port", 8073, "specify the job management service port") + webPort = flag.Int("web_port", 8074, "specify the web ui port") jobManagerEndpoint = flag.String("jm_override", "", "set to only stand up a web ui that refers to a seperate JobManagement endpoint") serveHTTP = flag.Bool("serve_http", true, "enable or disable the web ui") ) @@ -37,12 +39,12 @@ var ( func main() { flag.Parse() ctx := context.Background() - cli, err := makeJobClient(ctx, *jobManagerEndpoint) + cli, err := makeJobClient(ctx, prism.Options{Port: *jobPort}, *jobManagerEndpoint) if err != nil { log.Fatalf("error creating job server: %v", err) } if *serveHTTP { - if err := prism.CreateWebServer(ctx, cli, prism.Options{Port: 8074}); err != nil { + if err := prism.CreateWebServer(ctx, cli, prism.Options{Port: *webPort}); err != nil { log.Fatalf("error creating web server: %v", err) } } else { @@ -51,7 +53,7 @@ func main() { } } -func makeJobClient(ctx context.Context, endpoint string) (jobpb.JobServiceClient, error) { +func makeJobClient(ctx context.Context, opts prism.Options, endpoint string) (jobpb.JobServiceClient, error) { if endpoint != "" { clientConn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) if err != nil { @@ -59,7 +61,7 @@ func makeJobClient(ctx context.Context, endpoint string) (jobpb.JobServiceClient } return jobpb.NewJobServiceClient(clientConn), nil } - cli, err := prism.CreateJobServer(ctx, prism.Options{Port: 8073}) + cli, err := prism.CreateJobServer(ctx, opts) if err != nil { return nil, fmt.Errorf("error creating local job server: %v", err) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index 5830325bd054..d4fb6ad5b3e1 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -129,13 +129,15 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock envs = append(envs, credEnv) } } - - if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), dtyp.ImagePullOptions{}); err == nil { - // Copy the output, but discard it so we can wait until the image pull is finished. - io.Copy(io.Discard, rc) - rc.Close() - } else { - logger.Warn("unable to pull image", "error", err) + if _, _, err := cli.ImageInspectWithRaw(ctx, dp.GetContainerImage()); err != nil { + // We don't have a local image, so we should pull it. + if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), dtyp.ImagePullOptions{}); err == nil { + // Copy the output, but discard it so we can wait until the image pull is finished. + io.Copy(io.Discard, rc) + rc.Close() + } else { + logger.Warn("unable to pull image and it's not local", "error", err) + } } ccr, err := cli.ContainerCreate(ctx, &container.Config{ diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index e0c67105d451..cf04381b9cbe 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -81,10 +81,14 @@ func RunPipeline(j *jobservices.Job) { // makeWorker creates a worker for that environment. func makeWorker(env string, j *jobservices.Job) (*worker.W, error) { wk := worker.New(j.String()+"_"+env, env) + wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env] + wk.PipelineOptions = j.PipelineOptions() wk.JobKey = j.JobKey() wk.ArtifactEndpoint = j.ArtifactEndpoint() + go wk.Serve() + if err := runEnvironment(j.RootCtx, j, env, wk); err != nil { return nil, fmt.Errorf("failed to start environment %v for job %v: %w", env, j, err) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index 87b0ec007bfb..cd302a70fcc0 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -94,6 +94,10 @@ func (j *Job) ArtifactEndpoint() string { return j.artifactEndpoint } +func (j *Job) PipelineOptions() *structpb.Struct { + return j.options +} + // ContributeTentativeMetrics returns the datachannel read index, and any unknown monitoring short ids. func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) { return j.metrics.ContributeTentativeMetrics(payloads) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index 3a862a143b73..f33ff178c46d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -43,6 +43,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/types/known/structpb" ) // A W manages worker environments, sending them work @@ -59,6 +60,7 @@ type W struct { JobKey, ArtifactEndpoint string EnvPb *pipepb.Environment + PipelineOptions *structpb.Struct // Server management lis net.Listener @@ -163,7 +165,6 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest } resp := &fnpb.GetProvisionInfoResponse{ Info: &fnpb.ProvisionInfo{ - // TODO: Add the job's Pipeline options // TODO: Include runner capabilities with the per job configuration. RunnerCapabilities: []string{ urns.CapabilityMonitoringInfoShortIDs, @@ -174,14 +175,14 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest Url: wk.ArtifactEndpoint, }, - RetrievalToken: wk.JobKey, - Dependencies: wk.EnvPb.GetDependencies(), - - // TODO add this job's artifact Dependencies + RetrievalToken: wk.JobKey, + Dependencies: wk.EnvPb.GetDependencies(), + PipelineOptions: wk.PipelineOptions, Metadata: map[string]string{ "runner": "prism", "runner_version": core.SdkVersion, + "variant": "test", }, }, } diff --git a/sdks/go/pkg/beam/runners/prism/prism.go b/sdks/go/pkg/beam/runners/prism/prism.go index 0be35ad5cc33..bcb7a3fb689f 100644 --- a/sdks/go/pkg/beam/runners/prism/prism.go +++ b/sdks/go/pkg/beam/runners/prism/prism.go @@ -49,9 +49,9 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) s := jobservices.NewServer(0, internal.RunPipeline) *jobopts.Endpoint = s.Endpoint() go s.Serve() - } - if !jobopts.IsLoopback() { - *jobopts.EnvironmentType = "loopback" + if !jobopts.IsLoopback() { + *jobopts.EnvironmentType = "loopback" + } } return universal.Execute(ctx, p) } diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle index d53491194753..5b39cf81400f 100644 --- a/sdks/go/test/build.gradle +++ b/sdks/go/test/build.gradle @@ -173,6 +173,30 @@ tasks.register("ulrValidatesRunner") { } } +// ValidatesRunner tests for Prism. Runs tests in the integration directory +// with prism in docker mod to validate that the runner behaves as expected. +task prismValidatesRunner { + group = "Verification" + + dependsOn ":sdks:go:test:goBuild" + dependsOn ":sdks:go:container:docker" + dependsOn ":sdks:java:container:java8:docker" + dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar" + doLast { + def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags. + "--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + ] + def options = [ + "--runner prism", + "--pipeline_opts \"${pipelineOptions.join(' ')}\"", + ] + exec { + executable "sh" + args "-c", "./run_validatesrunner_tests.sh ${options.join(' ')}" + } + } +} + // A method for configuring a cross-language validates runner test task, // intended to be used in calls to createCrossLanguageValidatesRunnerTask. ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts -> diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index dee161dcb2af..bb7f5275a163 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -38,11 +38,11 @@ package integration import ( "fmt" "math/rand" + "os" "regexp" "strings" "testing" "time" - "os" // common runner flag. "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts" @@ -140,6 +140,8 @@ var portableFilters = []string{ } var prismFilters = []string{ + // The prism runner does not yet support cross-language. + "TestXLang.*", // The prism runner does not support the TestStream primitive "TestTestStream.*", // The trigger and pane tests uses TestStream diff --git a/sdks/go/test/run_validatesrunner_tests.sh b/sdks/go/test/run_validatesrunner_tests.sh index c25d59bd57b0..fd4856f25a0f 100755 --- a/sdks/go/test/run_validatesrunner_tests.sh +++ b/sdks/go/test/run_validatesrunner_tests.sh @@ -257,8 +257,10 @@ print(s.getsockname()[1]) s.close() " +TMPDIR=$(mktemp -d) + # Set up environment based on runner. -if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$RUNNER" == "portable" ]]; then +if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$RUNNER" == "portable" || "$RUNNER" == "prism" ]]; then if [[ -z "$ENDPOINT" ]]; then JOB_PORT=$(python3 -c "$SOCKET_SCRIPT") ENDPOINT="localhost:$JOB_PORT" @@ -288,6 +290,10 @@ if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$ python3 \ -m apache_beam.runners.portability.local_job_service_main \ --port $JOB_PORT & + elif [[ "$RUNNER" == "prism" ]]; then + PRISMBIN=$TMPDIR/prismbin + ./sdks/go/run_with_go_version.sh build -o $PRISMBIN sdks/go/cmd/prism/*.go + $PRISMBIN --job_port $JOB_PORT & else echo "Unknown runner: $RUNNER" exit 1; @@ -340,7 +346,6 @@ if [[ "$RUNNER" == "dataflow" ]]; then gcloud --version # ensure gcloud is version 186 or above - TMPDIR=$(mktemp -d) gcloud_ver=$(gcloud -v | head -1 | awk '{print $4}') if [[ "$gcloud_ver" < "186" ]] then @@ -402,6 +407,7 @@ fi ARGS="$ARGS -p $SIMULTANEOUS" # Assemble test arguments and pipeline options. +ARGS="$ARGS -v" ARGS="$ARGS -timeout $TIMEOUT" ARGS="$ARGS --runner=$RUNNER" ARGS="$ARGS --project=$DATAFLOW_PROJECT" @@ -449,9 +455,9 @@ if [[ "$RUNNER" == "dataflow" ]]; then docker rmi $JAVA_CONTAINER:$JAVA_TAG || echo "Failed to remove container" gcloud --quiet container images delete $JAVA_CONTAINER:$JAVA_TAG || echo "Failed to delete container" fi - - # Clean up tempdir - rm -rf $TMPDIR fi +# Clean up tempdir +rm -rf $TMPDIR + exit $TEST_EXIT_CODE