Skip to content

Commit

Permalink
Start a new test
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Newton committed Oct 13, 2024
1 parent 3e5f133 commit a1b6431
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 2 deletions.
44 changes: 44 additions & 0 deletions examples/fail-submission.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright 2024 The Kubeflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# A deliberately broken SparkApplication used by the e2e tests to exercise the
# submission-failure retry path: the main class, application jar, and driver
# service account all reference resources that do not exist, so spark-submit
# is expected to fail on every attempt.
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: fail-submission
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.2
  imagePullPolicy: IfNotPresent
  # Intentionally invalid: class and jar do not exist in the image.
  mainClass: non-existent
  mainApplicationFile: local:///non-existent.jar
  sparkVersion: 3.5.2
  restartPolicy:
    type: OnFailure
    # Retry failed submissions 3 times, 1 second apart, so the test can
    # assert SubmissionAttempts == retries + 1 without waiting long.
    onSubmissionFailureRetries: 3
    onSubmissionFailureRetryInterval: 1
  driver:
    # Intentionally invalid service account to force submission failure.
    serviceAccount: non-existent
    labels:
      version: 3.5.2
    cores: 1
    memory: 512m
  executor:
    labels:
      version: 3.5.2
    instances: 1
    cores: 1
    memory: 512m
77 changes: 77 additions & 0 deletions test/e2e/sparkapplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@ package e2e_test

import (
"context"
"fmt"
"os"
"path/filepath"
"strings"

// "time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/client"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/yaml"

Expand Down Expand Up @@ -236,6 +241,78 @@ var _ = Describe("Example SparkApplication", func() {
})
})

// fail-submission verifies the operator's behavior when spark-submit can
// never succeed: the application must end up in the Failed state after
// exhausting onSubmissionFailureRetries, and no driver pod may be created.
Context("fail-submission", func() {
	ctx := context.Background()
	path := filepath.Join("..", "..", "examples", "fail-submission.yaml")
	app := &v1beta2.SparkApplication{}

	BeforeEach(func() {
		By("Parsing SparkApplication from file")
		file, err := os.Open(path)
		Expect(err).NotTo(HaveOccurred())
		Expect(file).NotTo(BeNil())
		// Close the fixture file once it has been decoded; the original
		// leaked this handle.
		defer file.Close()

		decoder := yaml.NewYAMLOrJSONDecoder(file, 100)
		Expect(decoder).NotTo(BeNil())
		Expect(decoder.Decode(app)).NotTo(HaveOccurred())

		By("Creating SparkApplication")
		Expect(k8sClient.Create(ctx, app)).To(Succeed())
	})

	AfterEach(func() {
		key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name}
		Expect(k8sClient.Get(ctx, key, app)).To(Succeed())

		By("Deleting SparkApplication")
		Expect(k8sClient.Delete(ctx, app)).To(Succeed())
	})

	It("Fails submission and retries until retries are exhausted", func() {
		By("Waiting for SparkApplication to complete")
		key := types.NamespacedName{Namespace: app.Namespace, Name: app.Name}
		// The application can never complete; the wait helper is expected
		// to surface the Failed state as an error.
		Expect(waitForSparkApplicationCompleted(ctx, key)).To(HaveOccurred())

		// Fetch the latest status into a fresh object so assertions below
		// reflect the cluster state, not the object created in BeforeEach.
		failedApp := &v1beta2.SparkApplication{}
		Expect(k8sClient.Get(ctx, key, failedApp)).NotTo(HaveOccurred())
		Expect(failedApp.Status.AppState.State).To(Equal(v1beta2.ApplicationStateFailed))
		Expect(failedApp.Status.AppState.ErrorMessage).To(ContainSubstring("failed to run spark-submit"))
		// One initial attempt plus the configured number of retries.
		Expect(failedApp.Status.SubmissionAttempts).To(Equal(*failedApp.Spec.RestartPolicy.OnSubmissionFailureRetries + 1))

		By("Checking SparkApplication events")
		eventList := &corev1.EventList{}
		err := k8sClient.List(ctx, eventList, &client.ListOptions{
			Namespace: failedApp.Namespace,
			FieldSelector: fields.AndSelectors(
				fields.OneTermEqualSelector("involvedObject.kind", "SparkApplication"),
				fields.OneTermEqualSelector("involvedObject.name", failedApp.Name),
			),
		})
		Expect(err).NotTo(HaveOccurred())

		// Defensive re-filter in case the field selector is not honored by
		// the API server backing the test environment.
		var sparkAppEvents []corev1.Event
		for _, event := range eventList.Items {
			if event.InvolvedObject.Kind == "SparkApplication" && event.InvolvedObject.Name == failedApp.Name {
				sparkAppEvents = append(sparkAppEvents, event)
			}
		}

		By("Printing SparkApplication events")
		for _, event := range sparkAppEvents {
			fmt.Printf("Event: %v, Reason: %v, Message: %v\n", event.LastTimestamp, event.Reason, event.Message)
		}

		By("Checking driver does not exist")
		driverPodName := util.GetDriverPodName(failedApp)
		_, getDriverErr := clientset.CoreV1().Pods(failedApp.Namespace).Get(ctx, driverPodName, metav1.GetOptions{})
		Expect(getDriverErr).To(HaveOccurred())
		// TODO(tomnewton): Switch to apierrors.IsNotFound instead of matching
		// on the error string.
		Expect(strings.Contains(getDriverErr.Error(), "not found")).To(BeTrue())
	})
})

Context("spark-pi-python", func() {
ctx := context.Background()
path := filepath.Join("..", "..", "examples", "spark-pi-python.yaml")
Expand Down
5 changes: 3 additions & 2 deletions test/e2e/suit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,9 @@ func waitForSparkApplicationCompleted(ctx context.Context, key types.NamespacedN
return false, err
}
switch app.Status.AppState.State {
case v1beta2.ApplicationStateFailedSubmission, v1beta2.ApplicationStateFailed:
return false, errors.New(app.Status.AppState.ErrorMessage)
case v1beta2.ApplicationStateFailed:
// TODO: Try combining this case with the one below.
return true, errors.New(app.Status.AppState.ErrorMessage)
case v1beta2.ApplicationStateCompleted:
return true, nil
}
Expand Down

0 comments on commit a1b6431

Please # to comment.