Skip to content

Commit

Permalink
Add support for handling multiple jobs in a flink cluster (#31)
Browse files Browse the repository at this point in the history
* Add support for handling multiple jobs in a flink cluster

* Fixed review comments and unit tests to support multiple jobs in the
same cluster

* Addressed review comments

* Fixed unit tests
  • Loading branch information
joshuavijay authored and Marc Rooding committed Mar 22, 2019
1 parent a6b4354 commit d5a9c82
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 21 deletions.
7 changes: 4 additions & 3 deletions cmd/cli/operations/update_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"log"
"strings"
"time"

"github.com/cenkalti/backoff"
Expand All @@ -24,9 +25,9 @@ type UpdateJob struct {
AllowNonRestoredState bool
}

func (o RealOperator) filterRunningJobs(jobs []flink.Job) (ret []flink.Job) {
func (o RealOperator) filterRunningJobsByName(jobs []flink.Job, jobNameBase string) (ret []flink.Job) {
for _, job := range jobs {
if job.Status == "RUNNING" {
if job.Status == "RUNNING" && strings.HasPrefix(job.Name, jobNameBase) {
ret = append(ret, job)
}
}
Expand Down Expand Up @@ -89,7 +90,7 @@ func (o RealOperator) Update(u UpdateJob) error {
return fmt.Errorf("retrieving jobs failed: %v", err)
}

runningJobs := o.filterRunningJobs(jobs)
runningJobs := o.filterRunningJobsByName(jobs, u.JobNameBase)

deploy := Deploy{
LocalFilename: u.LocalFilename,
Expand Down
71 changes: 53 additions & 18 deletions cmd/cli/operations/update_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,70 @@ import (
)

/*
* filterRunningJobs
* filterRunningJobsByName
*/
func TestFilterRunningJobsShouldReturnAnEmptySliceIfNoJobsWereProvided(t *testing.T) {
operator := RealOperator{}

res := operator.filterRunningJobs([]flink.Job{})
res := operator.filterRunningJobsByName([]flink.Job{}, "test")

assert.Len(t, res, 0)
}

func TestFilterRunningJobsShouldReturnAnEmptySliceIfNoJobsAreRunning(t *testing.T) {
operator := RealOperator{}

res := operator.filterRunningJobs([]flink.Job{
flink.Job{
Status: "STOPPED",
res := operator.filterRunningJobsByName(
[]flink.Job{
flink.Job{
Status: "STOPPED",
},
},
})

"test",
)
assert.Len(t, res, 0)
}

func TestFilterRunningJobsShouldReturnTheRunningJobs(t *testing.T) {
operator := RealOperator{}

res := operator.filterRunningJobs([]flink.Job{
flink.Job{
Status: "STOPPED",
res := operator.filterRunningJobsByName(
[]flink.Job{
flink.Job{
Status: "STOPPED",
Name: "test",
},
flink.Job{
Status: "RUNNING",
Name: "test",
},
},
flink.Job{
Status: "RUNNING",
"test",
)

assert.Len(t, res, 1)
}

func TestFilterRunningJobsShouldReturnTheRunningJobsMatchingTheJobBaseName(t *testing.T) {
operator := RealOperator{}

res := operator.filterRunningJobsByName(
[]flink.Job{
flink.Job{
Status: "STOPPED",
Name: "jobA v1.0",
},
flink.Job{
Status: "RUNNING",
Name: "JobB",
},
flink.Job{
Status: "RUNNING",
Name: "jobA v1.1",
},
},
})
"jobA",
)

assert.Len(t, res, 1)
}
Expand Down Expand Up @@ -158,7 +189,9 @@ func TestUpdateJobShouldReturnAnErrorWhenTheSavepointCannotBeCreated(t *testing.
}

err := operator.Update(UpdateJob{
JobNameBase: "WordCountStateful",
// Use the same job name as the mock job above
// operator.Update will filter running jobs by name to cancel.
JobNameBase: "WordCountStateful v1.0",
LocalFilename: "testdata/sample.jar",
SavepointDir: "/data/flink",
})
Expand Down Expand Up @@ -194,7 +227,7 @@ func TestUpdateJobShouldReturnAnErrorWhenTheJobCannotBeCanceled(t *testing.T) {
}

err := operator.Update(UpdateJob{
JobNameBase: "WordCountStateful",
JobNameBase: "WordCountStateful v1.0",
LocalFilename: "../testdata/sample.jar",
SavepointDir: "/data/flink",
})
Expand Down Expand Up @@ -239,7 +272,7 @@ func TestUpdateJobShouldReturnAnErrorWhenTheLatestSavepointCannotBeRetrieved(t *
}

err := operator.Update(UpdateJob{
JobNameBase: "WordCountStateful",
JobNameBase: "WordCountStateful v1.0",
LocalFilename: "../testdata/sample.jar",
SavepointDir: "/data/flink",
})
Expand Down Expand Up @@ -285,7 +318,7 @@ func TestUpdateJobShouldReturnNilWhenTheUpdateSucceeds(t *testing.T) {
}

err := operator.Update(UpdateJob{
JobNameBase: "WordCountStateful",
JobNameBase: "WordCountStateful v1.0",
LocalFilename: "../testdata/sample.jar",
SavepointDir: "/data/flink",
})
Expand Down Expand Up @@ -323,7 +356,7 @@ func TestUpdateJobShouldReturnAnErrorWhenMultipleRunningJobsAreFound(t *testing.
},
flink.Job{
ID: "Job-B",
Name: "WordCountStateful v1.1",
Name: "WordCountStateful v1.0",
Status: "RUNNING",
},
}
Expand All @@ -335,6 +368,8 @@ func TestUpdateJobShouldReturnAnErrorWhenMultipleRunningJobsAreFound(t *testing.
},
}

// flink-deployer wont want to update (stop/start) an undesired job
// when there are two running jobs with same name. So it must abort the update
err := operator.Update(UpdateJob{
JobNameBase: "WordCountStateful",
LocalFilename: "testdata/sample.jar",
Expand Down

0 comments on commit d5a9c82

Please # to comment.