Skip to content

Commit

Permalink
[#28187] Add standalone prism validates runner precommit (#28487)
Browse files Browse the repository at this point in the history
* Add container using standalone prism go precommit.

* Add GoPrism precommit action.

* Don't serve prism UI by default.

---------

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
  • Loading branch information
lostluck and lostluck authored Sep 18, 2023
1 parent 91842e6 commit 0d37c6f
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 25 deletions.
89 changes: 89 additions & 0 deletions .github/workflows/beam_PreCommit_GoPrism.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: PreCommit GoPrism

on:
push:
tags: ['v*']
branches: ['master', 'release-*']
paths: ['model/**', 'sdks/go.**', 'release/**','.github/workflows/beam_PreCommit_GoPrism.yml']
pull_request_target:
branches: ['master', 'release-*']
paths: ['model/**', 'sdks/go.**', 'release/**']
issue_comment:
types: [created]
schedule:
- cron: '0 */6 * * *'
workflow_dispatch:

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.body || github.event.sender.login}}'
cancel-in-progress: true

env:
GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

# Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
permissions:
actions: write
pull-requests: read
checks: read
contents: read
deployments: read
id-token: none
issues: read
discussions: read
packages: read
pages: read
repository-projects: read
security-events: read
statuses: read

jobs:
beam_PreCommit_GoPrism:
name: ${{matrix.job_name}} (${{ matrix.job_phrase }})
runs-on: [self-hosted, ubuntu-20.04, main]
strategy:
matrix:
job_name: [beam_PreCommit_GoPrism]
job_phrase: [Run GoPrism PreCommit]
timeout-minutes: 120
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request_target' ||
github.event_name == 'schedule' ||
github.event_name == 'workflow_dispatch' ||
github.event.comment.body == 'Run GoPrism PreCommit'
steps:
- uses: actions/checkout@v3
- name: Setup repository
uses: ./.github/actions/setup-action
with:
comment_phrase: ${{ matrix.job_phrase }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
- name: Setup self-hosted
uses: ./.github/actions/setup-self-hosted-action
with:
requires-py-39: false
requires-go: false
- name: Run goPrismPreCommit script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :goPrismPreCommit
4 changes: 4 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ tasks.register("goPortablePreCommit") {
dependsOn(":sdks:go:test:ulrValidatesRunner")
}

tasks.register("goPrismPreCommit") {
dependsOn(":sdks:go:test:prismValidatesRunner")
}

tasks.register("goPostCommitDataflowARM") {
dependsOn(":sdks:go:test:dataflowValidatesRunnerARM64")
}
Expand Down
1 change: 1 addition & 0 deletions sdks/go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ Executing all unit tests for the SDK is possible from the `<beam root>\sdks\go`
To test your change as Jenkins would execute it from a PR, from the
beam root directory, run:
* `./gradlew :sdks:go:goTest` executes the unit tests.
* `./gradlew :sdks:go:test:prismValidatesRunner` validates the SDK against the Go Prism runner as a stand alone binary, with containers.
* `./gradlew :sdks:go:test:ulrValidatesRunner` validates the SDK against the Portable Python runner.
* `./gradlew :sdks:go:test:flinkValidatesRunner` validates the SDK against the Flink runner.

Expand Down
10 changes: 6 additions & 4 deletions sdks/go/cmd/prism/prism.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@ import (
)

var (
jobPort = flag.Int("job_port", 8073, "specify the job management service port")
webPort = flag.Int("web_port", 8074, "specify the web ui port")
jobManagerEndpoint = flag.String("jm_override", "", "set to only stand up a web ui that refers to a seperate JobManagement endpoint")
serveHTTP = flag.Bool("serve_http", true, "enable or disable the web ui")
)

func main() {
flag.Parse()
ctx := context.Background()
cli, err := makeJobClient(ctx, *jobManagerEndpoint)
cli, err := makeJobClient(ctx, prism.Options{Port: *jobPort}, *jobManagerEndpoint)
if err != nil {
log.Fatalf("error creating job server: %v", err)
}
if *serveHTTP {
if err := prism.CreateWebServer(ctx, cli, prism.Options{Port: 8074}); err != nil {
if err := prism.CreateWebServer(ctx, cli, prism.Options{Port: *webPort}); err != nil {
log.Fatalf("error creating web server: %v", err)
}
} else {
Expand All @@ -51,15 +53,15 @@ func main() {
}
}

func makeJobClient(ctx context.Context, endpoint string) (jobpb.JobServiceClient, error) {
func makeJobClient(ctx context.Context, opts prism.Options, endpoint string) (jobpb.JobServiceClient, error) {
if endpoint != "" {
clientConn, err := grpc.DialContext(ctx, endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
return nil, fmt.Errorf("error connecting to job server at %v: %v", endpoint, err)
}
return jobpb.NewJobServiceClient(clientConn), nil
}
cli, err := prism.CreateJobServer(ctx, prism.Options{Port: 8073})
cli, err := prism.CreateJobServer(ctx, opts)
if err != nil {
return nil, fmt.Errorf("error creating local job server: %v", err)
}
Expand Down
16 changes: 9 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,15 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
envs = append(envs, credEnv)
}
}

if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), dtyp.ImagePullOptions{}); err == nil {
// Copy the output, but discard it so we can wait until the image pull is finished.
io.Copy(io.Discard, rc)
rc.Close()
} else {
logger.Warn("unable to pull image", "error", err)
if _, _, err := cli.ImageInspectWithRaw(ctx, dp.GetContainerImage()); err != nil {
// We don't have a local image, so we should pull it.
if rc, err := cli.ImagePull(ctx, dp.GetContainerImage(), dtyp.ImagePullOptions{}); err == nil {
// Copy the output, but discard it so we can wait until the image pull is finished.
io.Copy(io.Discard, rc)
rc.Close()
} else {
logger.Warn("unable to pull image and it's not local", "error", err)
}
}

ccr, err := cli.ContainerCreate(ctx, &container.Config{
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,14 @@ func RunPipeline(j *jobservices.Job) {
// makeWorker creates a worker for that environment.
func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
wk := worker.New(j.String()+"_"+env, env)

wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
wk.PipelineOptions = j.PipelineOptions()
wk.JobKey = j.JobKey()
wk.ArtifactEndpoint = j.ArtifactEndpoint()

go wk.Serve()

if err := runEnvironment(j.RootCtx, j, env, wk); err != nil {
return nil, fmt.Errorf("failed to start environment %v for job %v: %w", env, j, err)
}
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func (j *Job) ArtifactEndpoint() string {
return j.artifactEndpoint
}

func (j *Job) PipelineOptions() *structpb.Struct {
return j.options
}

// ContributeTentativeMetrics returns the datachannel read index, and any unknown monitoring short ids.
func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) {
return j.metrics.ContributeTentativeMetrics(payloads)
Expand Down
11 changes: 6 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/types/known/structpb"
)

// A W manages worker environments, sending them work
Expand All @@ -59,6 +60,7 @@ type W struct {

JobKey, ArtifactEndpoint string
EnvPb *pipepb.Environment
PipelineOptions *structpb.Struct

// Server management
lis net.Listener
Expand Down Expand Up @@ -163,7 +165,6 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest
}
resp := &fnpb.GetProvisionInfoResponse{
Info: &fnpb.ProvisionInfo{
// TODO: Add the job's Pipeline options
// TODO: Include runner capabilities with the per job configuration.
RunnerCapabilities: []string{
urns.CapabilityMonitoringInfoShortIDs,
Expand All @@ -174,14 +175,14 @@ func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest
Url: wk.ArtifactEndpoint,
},

RetrievalToken: wk.JobKey,
Dependencies: wk.EnvPb.GetDependencies(),

// TODO add this job's artifact Dependencies
RetrievalToken: wk.JobKey,
Dependencies: wk.EnvPb.GetDependencies(),
PipelineOptions: wk.PipelineOptions,

Metadata: map[string]string{
"runner": "prism",
"runner_version": core.SdkVersion,
"variant": "test",
},
},
}
Expand Down
6 changes: 3 additions & 3 deletions sdks/go/pkg/beam/runners/prism/prism.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
s := jobservices.NewServer(0, internal.RunPipeline)
*jobopts.Endpoint = s.Endpoint()
go s.Serve()
}
if !jobopts.IsLoopback() {
*jobopts.EnvironmentType = "loopback"
if !jobopts.IsLoopback() {
*jobopts.EnvironmentType = "loopback"
}
}
return universal.Execute(ctx, p)
}
Expand Down
24 changes: 24 additions & 0 deletions sdks/go/test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,30 @@ tasks.register("ulrValidatesRunner") {
}
}

// ValidatesRunner tests for Prism. Runs tests in the integration directory
// with prism in docker mod to validate that the runner behaves as expected.
task prismValidatesRunner {
group = "Verification"

dependsOn ":sdks:go:test:goBuild"
dependsOn ":sdks:go:container:docker"
dependsOn ":sdks:java:container:java8:docker"
dependsOn ":sdks:java:testing:expansion-service:buildTestExpansionServiceJar"
doLast {
def pipelineOptions = [ // Pipeline options piped directly to Go SDK flags.
"--expansion_jar=test:${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
def options = [
"--runner prism",
"--pipeline_opts \"${pipelineOptions.join(' ')}\"",
]
exec {
executable "sh"
args "-c", "./run_validatesrunner_tests.sh ${options.join(' ')}"
}
}
}

// A method for configuring a cross-language validates runner test task,
// intended to be used in calls to createCrossLanguageValidatesRunnerTask.
ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts ->
Expand Down
4 changes: 3 additions & 1 deletion sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ package integration
import (
"fmt"
"math/rand"
"os"
"regexp"
"strings"
"testing"
"time"
"os"

// common runner flag.
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
Expand Down Expand Up @@ -140,6 +140,8 @@ var portableFilters = []string{
}

var prismFilters = []string{
// The prism runner does not yet support cross-language.
"TestXLang.*",
// The prism runner does not support the TestStream primitive
"TestTestStream.*",
// The trigger and pane tests uses TestStream
Expand Down
16 changes: 11 additions & 5 deletions sdks/go/test/run_validatesrunner_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,10 @@ print(s.getsockname()[1])
s.close()
"

TMPDIR=$(mktemp -d)

# Set up environment based on runner.
if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$RUNNER" == "portable" ]]; then
if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$RUNNER" == "portable" || "$RUNNER" == "prism" ]]; then
if [[ -z "$ENDPOINT" ]]; then
JOB_PORT=$(python3 -c "$SOCKET_SCRIPT")
ENDPOINT="localhost:$JOB_PORT"
Expand Down Expand Up @@ -288,6 +290,10 @@ if [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "samza" || "$
python3 \
-m apache_beam.runners.portability.local_job_service_main \
--port $JOB_PORT &
elif [[ "$RUNNER" == "prism" ]]; then
PRISMBIN=$TMPDIR/prismbin
./sdks/go/run_with_go_version.sh build -o $PRISMBIN sdks/go/cmd/prism/*.go
$PRISMBIN --job_port $JOB_PORT &
else
echo "Unknown runner: $RUNNER"
exit 1;
Expand Down Expand Up @@ -340,7 +346,6 @@ if [[ "$RUNNER" == "dataflow" ]]; then
gcloud --version

# ensure gcloud is version 186 or above
TMPDIR=$(mktemp -d)
gcloud_ver=$(gcloud -v | head -1 | awk '{print $4}')
if [[ "$gcloud_ver" < "186" ]]
then
Expand Down Expand Up @@ -402,6 +407,7 @@ fi
ARGS="$ARGS -p $SIMULTANEOUS"

# Assemble test arguments and pipeline options.
ARGS="$ARGS -v"
ARGS="$ARGS -timeout $TIMEOUT"
ARGS="$ARGS --runner=$RUNNER"
ARGS="$ARGS --project=$DATAFLOW_PROJECT"
Expand Down Expand Up @@ -449,9 +455,9 @@ if [[ "$RUNNER" == "dataflow" ]]; then
docker rmi $JAVA_CONTAINER:$JAVA_TAG || echo "Failed to remove container"
gcloud --quiet container images delete $JAVA_CONTAINER:$JAVA_TAG || echo "Failed to delete container"
fi

# Clean up tempdir
rm -rf $TMPDIR
fi

# Clean up tempdir
rm -rf $TMPDIR

exit $TEST_EXIT_CODE

0 comments on commit 0d37c6f

Please # to comment.