diff --git a/cmd/cli/operations/update_job.go b/cmd/cli/operations/update_job.go index 3789f77..d2634cf 100644 --- a/cmd/cli/operations/update_job.go +++ b/cmd/cli/operations/update_job.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "log" + "strings" "time" "github.com/cenkalti/backoff" @@ -24,9 +25,9 @@ type UpdateJob struct { AllowNonRestoredState bool } -func (o RealOperator) filterRunningJobs(jobs []flink.Job) (ret []flink.Job) { +func (o RealOperator) filterRunningJobsByName(jobs []flink.Job, jobNameBase string) (ret []flink.Job) { for _, job := range jobs { - if job.Status == "RUNNING" { + if job.Status == "RUNNING" && strings.HasPrefix(job.Name, jobNameBase) { ret = append(ret, job) } } @@ -89,7 +90,7 @@ func (o RealOperator) Update(u UpdateJob) error { return fmt.Errorf("retrieving jobs failed: %v", err) } - runningJobs := o.filterRunningJobs(jobs) + runningJobs := o.filterRunningJobsByName(jobs, u.JobNameBase) deploy := Deploy{ LocalFilename: u.LocalFilename, diff --git a/cmd/cli/operations/update_job_test.go b/cmd/cli/operations/update_job_test.go index 39c600c..99fb754 100644 --- a/cmd/cli/operations/update_job_test.go +++ b/cmd/cli/operations/update_job_test.go @@ -11,12 +11,12 @@ import ( ) /* - * filterRunningJobs + * filterRunningJobsByName */ func TestFilterRunningJobsShouldReturnAnEmptySliceIfNoJobsWereProvided(t *testing.T) { operator := RealOperator{} - res := operator.filterRunningJobs([]flink.Job{}) + res := operator.filterRunningJobsByName([]flink.Job{}, "test") assert.Len(t, res, 0) } @@ -24,26 +24,57 @@ func TestFilterRunningJobsShouldReturnAnEmptySliceIfNoJobsWereProvided(t *testin func TestFilterRunningJobsShouldReturnAnEmptySliceIfNoJobsAreRunning(t *testing.T) { operator := RealOperator{} - res := operator.filterRunningJobs([]flink.Job{ - flink.Job{ - Status: "STOPPED", + res := operator.filterRunningJobsByName( + []flink.Job{ + flink.Job{ + Status: "STOPPED", + }, }, - }) - + "test", + ) assert.Len(t, res, 0) } func TestFilterRunningJobsShouldReturnTheRunningJobs(t *testing.T) { operator := RealOperator{} - res := operator.filterRunningJobs([]flink.Job{ - flink.Job{ - Status: "STOPPED", + res := operator.filterRunningJobsByName( + []flink.Job{ + flink.Job{ + Status: "STOPPED", + Name: "test", + }, + flink.Job{ + Status: "RUNNING", + Name: "test", + }, }, - flink.Job{ - Status: "RUNNING", + "test", + ) + + assert.Len(t, res, 1) +} + +func TestFilterRunningJobsShouldReturnTheRunningJobsMatchingTheJobBaseName(t *testing.T) { + operator := RealOperator{} + + res := operator.filterRunningJobsByName( + []flink.Job{ + flink.Job{ + Status: "STOPPED", + Name: "jobA v1.0", + }, + flink.Job{ + Status: "RUNNING", + Name: "JobB", + }, + flink.Job{ + Status: "RUNNING", + Name: "jobA v1.1", + }, }, - }) + "jobA", + ) assert.Len(t, res, 1) } @@ -158,7 +189,9 @@ func TestUpdateJobShouldReturnAnErrorWhenTheSavepointCannotBeCreated(t *testing. } err := operator.Update(UpdateJob{ - JobNameBase: "WordCountStateful", + // Use the same job name as the mock job above + // operator.Update will filter running jobs by name to cancel. + JobNameBase: "WordCountStateful v1.0", LocalFilename: "testdata/sample.jar", SavepointDir: "/data/flink", }) @@ -194,7 +227,7 @@ func TestUpdateJobShouldReturnAnErrorWhenTheJobCannotBeCanceled(t *testing.T) { } err := operator.Update(UpdateJob{ - JobNameBase: "WordCountStateful", + JobNameBase: "WordCountStateful v1.0", LocalFilename: "../testdata/sample.jar", SavepointDir: "/data/flink", }) @@ -239,7 +272,7 @@ func TestUpdateJobShouldReturnAnErrorWhenTheLatestSavepointCannotBeRetrieved(t * } err := operator.Update(UpdateJob{ - JobNameBase: "WordCountStateful", + JobNameBase: "WordCountStateful v1.0", LocalFilename: "../testdata/sample.jar", SavepointDir: "/data/flink", }) @@ -285,7 +318,7 @@ func TestUpdateJobShouldReturnNilWhenTheUpdateSucceeds(t *testing.T) { } err := operator.Update(UpdateJob{ - JobNameBase: "WordCountStateful", + JobNameBase: "WordCountStateful v1.0", LocalFilename: "../testdata/sample.jar", SavepointDir: "/data/flink", }) @@ -323,7 +356,7 @@ func TestUpdateJobShouldReturnAnErrorWhenMultipleRunningJobsAreFound(t *testing. }, flink.Job{ ID: "Job-B", - Name: "WordCountStateful v1.1", + Name: "WordCountStateful v1.0", Status: "RUNNING", }, } @@ -335,6 +368,8 @@ func TestUpdateJobShouldReturnAnErrorWhenMultipleRunningJobsAreFound(t *testing. }, } + // flink-deployer wont want to update (stop/start) an undesired job + // when there are two running jobs with same name. So it must abort the update err := operator.Update(UpdateJob{ JobNameBase: "WordCountStateful", LocalFilename: "testdata/sample.jar",