Skip to content

Commit

Permalink
Merge pull request #6 from ing-bank/savepoint-on-update
Browse files Browse the repository at this point in the history
Use savepoint variable on update
  • Loading branch information
arnestaphorsius authored Mar 13, 2018
2 parents 5a091fa + d65b14d commit 211236b
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 22 deletions.
9 changes: 6 additions & 3 deletions cmd/cli/flink.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ func ListJobs() ([]byte, error) {
return commander.CombinedOutput("flink", "list")
}

func Savepoint(jobId string) ([]byte, error) {
func Savepoint(jobId string, savepointTargetDir string) ([]byte, error) {
if len(jobId) == 0 {
return nil, errors.New("unspecified argument 'jobId'")
}
log.Printf("Creating savepoint for job %v", jobId)
return commander.CombinedOutput("flink", "savepoint", jobId)
if len(savepointTargetDir) == 0 {
return nil, errors.New("unspecified argument 'savepointTargetDir'")
}
log.Printf("Creating savepoint for job %v at targetDir %v", jobId, savepointTargetDir)
return commander.CombinedOutput("flink", "savepoint", jobId, savepointTargetDir)
}
12 changes: 10 additions & 2 deletions cmd/cli/flink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,23 @@ func TestListJobsShouldReturnAnOverviewOfJobs(t *testing.T) {
func TestSavepointShouldReturnAnErrorForEmptyJobIds(t *testing.T) {
mockedStdout = ""
commander = TestCommander{}
_, err := Savepoint("")
_, err := Savepoint("", "/dir/")

assert.EqualError(t, err, "unspecified argument 'jobId'")
}

func TestSavepointShouldReturnAnErrorForEmptySavepointTargetDir(t *testing.T) {
mockedStdout = ""
commander = TestCommander{}
_, err := Savepoint("jobId", "")

assert.EqualError(t, err, "unspecified argument 'savepointTargetDir'")
}

func TestSavepointShouldCreateASavepoint(t *testing.T) {
mockedStdout = "Job saved"
commander = TestCommander{}
out, _ := Savepoint("jobid1")
out, _ := Savepoint("jobid1", "/dir/")

assert.Equal(t, mockedStdout, string(out))
}
1 change: 1 addition & 0 deletions cmd/cli/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func TestUpdateActionShouldThrowAnErrorWhenTheCommandFails(t *testing.T) {
set := flag.FlagSet{}
set.String("job-name-base", "Job A", "")
set.String("file-name", "file.jar", "")
set.String("savepoint-dir", "/savepoints", "")
context := cli.NewContext(&app, &set, nil)
err := UpdateAction(context)

Expand Down
17 changes: 12 additions & 5 deletions cmd/cli/updatejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ func RetrieveLatestSavepoint(dir string) (string, error) {
return "", err
}

if len(files) == 0 {
return "", errors.New("No savepoints present in directory: " + dir)
}

var newestFile string
var newestTime int64 = 0
for _, f := range files {
Expand All @@ -39,7 +43,7 @@ func RetrieveLatestSavepoint(dir string) (string, error) {
}

func ExtractSavepointPath(output string) (string, error) {
rgx := regexp.MustCompile("Savepoint completed. Path: file:(.*)\n")
rgx := regexp.MustCompile("Savepoint completed. Path: (.*)\n")
matches := rgx.FindAllStringSubmatch(output, -1)

switch len(matches) {
Expand All @@ -52,8 +56,8 @@ func ExtractSavepointPath(output string) (string, error) {
}
}

func CreateSavepoint(jobId string) (string, error) {
out, err := Savepoint(jobId)
func CreateSavepoint(jobId string, savepointTargetDir string) (string, error) {
out, err := Savepoint(jobId, savepointTargetDir)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -85,8 +89,11 @@ func (u UpdateJob) execute() ([]byte, error) {
if len(u.jobNameBase) == 0 {
return nil, errors.New("unspecified argument 'jobNameBase'")
}
if len(u.savepointDirectory) == 0 {
return nil, errors.New("unspecified argument 'savepointDirectory'")
}

log.Printf("starting job update for base name: %v\n", u.jobNameBase)
log.Printf("starting job update for base name: %v, and savepoint dir: %v\n", u.jobNameBase, u.savepointDirectory)

jobIds, err := RetrieveRunningJobIds(u.jobNameBase)
if err != nil {
Expand Down Expand Up @@ -123,7 +130,7 @@ func (u UpdateJob) execute() ([]byte, error) {
log.Printf("Found exactly 1 job with base name: %v\n", u.jobNameBase)
jobId := jobIds[0]

savepoint, err := CreateSavepoint(jobId)
savepoint, err := CreateSavepoint(jobId, u.savepointDirectory)
if err != nil {
return nil, err
}
Expand Down
36 changes: 24 additions & 12 deletions cmd/cli/updatejob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ func TestRetrieveLatestSavepointShouldRemoveTheTrailingSlashFromTheSavepointDire
assert.Nil(t, err)
}

func TestRetrieveLatestSavepointShouldReturnAnErrorWhenDirEmpty(t *testing.T) {
filesystem = afero.NewMemMapFs()
filesystem.Mkdir("/savepoints/", 0755)

files, err := RetrieveLatestSavepoint("/savepoints")

assert.Equal(t, "", files)
assert.EqualError(t, err, "No savepoints present in directory: /savepoints")
}

/*
* ExtractSavepointPath
*/
Expand All @@ -52,7 +62,7 @@ func TestExtractSavepointPathShouldExtractPath(t *testing.T) {
Using address flink-jobmanager/172.26.0.3:6123 to connect to JobManager.
Triggering savepoint for job 683b3f14d75c470de0aaf2b1e83a3158.
Waiting for response...
Savepoint completed. Path: file:/data/flink/savepoints/savepoint-683b3f-59401d30cfc4
Savepoint completed. Path: /data/flink/savepoints/savepoint-683b3f-59401d30cfc4
You can resume your program from this savepoint with the run command.
`)

Expand All @@ -77,8 +87,8 @@ func TestExtractSavepointPathShouldReturnAnErrorIfMultiplePathsAreExtracted(t *t
Using address flink-jobmanager/172.26.0.3:6123 to connect to JobManager.
Triggering savepoint for job 683b3f14d75c470de0aaf2b1e83a3158.
Waiting for response...
Savepoint completed. Path: file:/data/flink/savepoints/savepoint-683b3f-59401d30cfc4
Savepoint completed. Path: file:/data/flink/savepoints/savepoint-883b3f-59401d30cfc1
Savepoint completed. Path: /data/flink/savepoints/savepoint-683b3f-59401d30cfc4
Savepoint completed. Path: /data/flink/savepoints/savepoint-883b3f-59401d30cfc1
You can resume your program from this savepoint with the run command.
`)

Expand All @@ -94,7 +104,7 @@ func TestCreateSavepointShouldReturnAnErrorWhenCreatingTheSavepointFails(t *test
mockedExitStatus = -1
commander = TestCommander{}

out, err := CreateSavepoint("182b71aebf67191683b6917ce95a1f34")
out, err := CreateSavepoint("182b71aebf67191683b6917ce95a1f34", "/dir/")

assert.Equal(t, "", out)
assert.EqualError(t, err, "exit status 255")
Expand All @@ -106,14 +116,14 @@ func TestCreateSavepointShouldReturnAnErrorWhenExtractingTheSavepointPathFails(t
Using address flink-jobmanager/172.26.0.3:6123 to connect to JobManager.
Triggering savepoint for job 683b3f14d75c470de0aaf2b1e83a3158.
Waiting for response...
Savepoint completed. Path: file:/data/flink/savepoints/savepoint-683b3f-59401d30cfc4
Savepoint completed. Path: file:/data/flink/savepoints/savepoint-883b3f-59401d30cfc1
Savepoint completed. Path: /data/flink/savepoints/savepoint-683b3f-59401d30cfc4
Savepoint completed. Path: /data/flink/savepoints/savepoint-883b3f-59401d30cfc1
You can resume your program from this savepoint with the run command.
`
mockedExitStatus = 0
commander = TestCommander{}

out, err := CreateSavepoint("182b71aebf67191683b6917ce95a1f34")
out, err := CreateSavepoint("182b71aebf67191683b6917ce95a1f34", "/data/flink/savepoints")

assert.Equal(t, "", out)
assert.EqualError(t, err, "multiple matches for savepoint found")
Expand All @@ -125,14 +135,14 @@ func TestCreateSavepointShouldReturnTheSavepointPathIfAllGoesWell(t *testing.T)
Using address flink-jobmanager/172.26.0.3:6123 to connect to JobManager.
Triggering savepoint for job 683b3f14d75c470de0aaf2b1e83a3158.
Waiting for response...
Savepoint completed. Path: file:/data/flink/savepoints/savepoint-683b3f-59401d30cfc4
Savepoint completed. Path: /data/flink/savepoints/savepoint-683b3f-59401d30cfc4
You can resume your program from this savepoint with the run command.
`
mockedExitStatus = 0
commander = TestCommander{}
filesystem = afero.NewMemMapFs()

out, err := CreateSavepoint("182b71aebf67191683b6917ce95a1f34")
out, err := CreateSavepoint("182b71aebf67191683b6917ce95a1f34", "/data/flink/savepoints")

assert.Equal(t, "/data/flink/savepoints/savepoint-683b3f-59401d30cfc4", out)
assert.Nil(t, err)
Expand Down Expand Up @@ -167,7 +177,7 @@ func TestUpdateJobShouldReturnAnErrorWhenTheSavepointDirectoryIsUndefined(t *tes
out, err := update.execute()

assert.Nil(t, out)
assert.EqualError(t, err, "cannot retrieve the latest savepoint without specifying the savepoint directory")
assert.EqualError(t, err, "unspecified argument 'savepointDirectory'")
}

func TestUpdateJobShouldExecuteCorrectlyWhenEverythingGoesFine(t *testing.T) {
Expand All @@ -183,7 +193,7 @@ func TestUpdateJobShouldExecuteCorrectlyWhenEverythingGoesFine(t *testing.T) {
Using address flink-jobmanager/172.26.0.3:6123 to connect to JobManager.
Triggering savepoint for job 683b3f14d75c470de0aaf2b1e83a3158.
Waiting for response...
Savepoint completed. Path: file:/data/flink/savepoints/savepoint-683b3f-59401d30cfc4
Savepoint completed. Path: /data/flink/savepoints/savepoint-683b3f-59401d30cfc4
You can resume your program from this savepoint with the run command.
`
mockedExitStatus = 0
Expand All @@ -194,6 +204,7 @@ func TestUpdateJobShouldExecuteCorrectlyWhenEverythingGoesFine(t *testing.T) {
runArgs: "-p 1 -d",
localFilename: "file.jar",
jarArgs: "--kafka.bootstrapServers kafka:9092",
savepointDirectory: "/data/flink/savepoints",
allowNonRestorableState: false,
}

Expand All @@ -217,7 +228,7 @@ func TestUpdateJobShouldReturnAnErrorWhenMultipleRunningJobsAreFound(t *testing.
Using address flink-jobmanager/172.26.0.3:6123 to connect to JobManager.
Triggering savepoint for job 683b3f14d75c470de0aaf2b1e83a3158.
Waiting for response...
Savepoint completed. Path: file:/data/flink/savepoints/savepoint-683b3f-59401d30cfc4
Savepoint completed. Path: /data/flink/savepoints/savepoint-683b3f-59401d30cfc4
You can resume your program from this savepoint with the run command.
`
mockedExitStatus = 0
Expand All @@ -228,6 +239,7 @@ func TestUpdateJobShouldReturnAnErrorWhenMultipleRunningJobsAreFound(t *testing.
runArgs: "-p 1 -d",
localFilename: "file.jar",
jarArgs: "--kafka.bootstrapServers kafka:9092",
savepointDirectory: "/data/flink/savepoints",
allowNonRestorableState: false,
}

Expand Down

0 comments on commit 211236b

Please # to comment.