Skip to content

Commit

Permalink
Added additional logging for savepoint creation monitoring (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc Rooding authored Mar 18, 2019
1 parent b540b0a commit a6b4354
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
language: go

go:
- 1.9.3
- 1.11.5

env:
global:
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
FROM golang:1.10.4-alpine3.8 as build
FROM golang:1.11.5-alpine3.9 as build
WORKDIR /go/src/github.com/ing-bank/flink-deployer/
COPY . .
RUN go build ./cmd/cli

FROM alpine:3.8
FROM alpine:3.9
WORKDIR /flink-deployer
COPY --from=build /go/src/github.com/ing-bank/flink-deployer/cli .
VOLUME [ "/data/flink" ]
Expand Down
7 changes: 6 additions & 1 deletion cmd/cli/flink/savepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ type MonitorSavepointCreationResponse struct {
// MonitorSavepointCreation allows for monitoring the status of a savepoint creation
// identified by the job ID and request ID
func (c FlinkRestClient) MonitorSavepointCreation(jobID string, requestID string) (MonitorSavepointCreationResponse, error) {
res, err := c.Client.Get(c.constructURL(fmt.Sprintf("jobs/%v/savepoints/%v", jobID, requestID)))
req, err := c.newRequest("GET", c.constructURL(fmt.Sprintf("jobs/%v/savepoints/%v", jobID, requestID)), nil)
if err != nil {
return MonitorSavepointCreationResponse{}, err
}

res, err := c.Client.Do(req)
if err != nil {
return MonitorSavepointCreationResponse{}, err
}
Expand Down
9 changes: 7 additions & 2 deletions cmd/cli/operations/update_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,21 @@ func (o RealOperator) monitorSavepointCreation(jobID string, requestID string, m
log.Println("checking status of savepoint creation")
res, err := o.FlinkRestAPI.MonitorSavepointCreation(jobID, requestID)
if err != nil {
log.Println(err)
return err
}

switch res.Status.Id {
case "COMPLETED":
return nil
case "IN_PROGRESS":
return fmt.Errorf("savepoint creation for job \"%v\" is still pending", jobID)
err = fmt.Errorf("savepoint creation for job \"%v\" is still pending", jobID)
log.Println(err)
return err
default:
return fmt.Errorf("savepoint creation for job \"%v\" returned an unknown status \"%v\"", jobID, res.Status)
err = fmt.Errorf("savepoint creation for job \"%v\" returned an unknown status \"%v\"", jobID, res.Status)
log.Println(err)
return err
}
}
b := &backoff.ExponentialBackOff{
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3.4'
services:

jobmanager:
image: flink:1.6-alpine
image: flink:1.7.2-scala_2.11
ports:
- "8081:8081"
- "6123:6123"
Expand All @@ -13,7 +13,7 @@ services:
- /tmp/flink-deployer:/data/flink

taskmanager:
image: flink:1.6-alpine
image: flink:1.7.2-scala_2.11
depends_on:
- jobmanager
command: taskmanager
Expand Down

0 comments on commit a6b4354

Please # to comment.