Skip to content

Commit

Permalink
Add wait flag for jobs, fix go proto path for dataset service (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhilingc authored and feast-ci-bot committed Feb 20, 2019
1 parent 5dbb66e commit 316eac4
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions cli/feast/cmd/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ import (
"errors"
"fmt"
"io/ioutil"
"time"

"github.com/gojek/feast/cli/feast/pkg/parse"
"github.com/gojek/feast/protos/generated/go/feast/core"

"github.com/spf13/cobra"
)

var (
waitJobComplete = false
)

// jobsCmd represents the jobs command
var jobsCmd = &cobra.Command{
Use: "jobs",
Expand Down Expand Up @@ -63,6 +68,7 @@ var jobsAbortCmd = &cobra.Command{
}

func init() {
jobsRunCmd.Flags().BoolVar(&waitJobComplete, "wait", false, "wait for job to run to completion")
jobsCmd.AddCommand(jobsRunCmd)
jobsCmd.AddCommand(jobsAbortCmd)
rootCmd.AddCommand(jobsCmd)
Expand All @@ -86,9 +92,35 @@ func runJob(ctx context.Context, path string) error {
return fmt.Errorf("[jobs] failed to start job: %v", err)
}
fmt.Printf("[jobs] started job with ID: %s", out.GetJobId())
if waitJobComplete {
return waitJob(ctx, jobsClient, out.GetJobId())
}
return nil
}

func waitJob(ctx context.Context, jobsClient core.JobServiceClient, jobID string) error {
for {
response, err := jobsClient.GetJob(ctx, &core.JobServiceTypes_GetJobRequest{
Id: jobID,
})
if err != nil {
return fmt.Errorf("[jobs] error while querying job id %s: %v", jobID, err)
}

status := response.GetJob().GetStatus()
fmt.Printf("\r[jobs] job id %s is currently: %s\n", jobID, status)
switch status {
case "COMPLETED":
return nil
case "ABORTED":
return fmt.Errorf("[jobs] job id %s failed: Job was aborted", jobID)
case "ERROR":
return fmt.Errorf("[jobs] job id %s failed: Job terminated with error. For more information, refer to job logs", jobID)
}
time.Sleep(5 * time.Second)
}
}

func abortJob(ctx context.Context, id string) error {
initConn()
jobsClient := core.NewJobServiceClient(coreConn)
Expand Down

0 comments on commit 316eac4

Please # to comment.