Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Rename grpc pipeline to workflow #2173

Merged
merged 3 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions agent/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
)

func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, work *rpc.Pipeline) pipeline.LogFunc {
func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, workflow *rpc.Workflow) pipeline.LogFunc {
return func(step *backend.Step, rc multipart.Reader) error {
loglogger := logger.With().
Str("image", step.Image).
Expand All @@ -41,7 +41,7 @@ func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, wo
uploads.Add(1)

var secrets []string
for _, secret := range work.Config.Secrets {
for _, secret := range workflow.Config.Secrets {
if secret.Mask {
secrets = append(secrets, secret.Value)
}
Expand Down
30 changes: 15 additions & 15 deletions agent/rpc/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func (c *client) Version(ctx context.Context) (*rpc.Version, error) {
}, nil
}

// Next returns the next pipeline in the queue.
func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error) {
// Next returns the next workflow in the queue.
func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error) {
var res *proto.NextResponse
var err error
req := new(proto.NextRequest)
Expand Down Expand Up @@ -106,17 +106,17 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Pipeline, error)
return nil, nil
}

p := new(rpc.Pipeline)
p.ID = res.GetPipeline().GetId()
p.Timeout = res.GetPipeline().GetTimeout()
p.Config = new(backend.Config)
if err := json.Unmarshal(res.GetPipeline().GetPayload(), p.Config); err != nil {
log.Error().Err(err).Msgf("could not unmarshal pipeline config of '%s'", p.ID)
w := new(rpc.Workflow)
w.ID = res.GetPipeline().GetId()
w.Timeout = res.GetPipeline().GetTimeout()
w.Config = new(backend.Config)
if err := json.Unmarshal(res.GetPipeline().GetPayload(), w.Config); err != nil {
log.Error().Err(err).Msgf("could not unmarshal workflow config of '%s'", w.ID)
}
return p, nil
return w, nil
}

// Wait blocks until the pipeline is complete.
// Wait blocks until the workflow is complete.
func (c *client) Wait(ctx context.Context, id string) (err error) {
req := new(proto.WaitRequest)
req.Id = id
Expand Down Expand Up @@ -144,7 +144,7 @@ func (c *client) Wait(ctx context.Context, id string) (err error) {
return nil
}

// Init signals the pipeline is initialized.
// Init signals the workflow is initialized.
func (c *client) Init(ctx context.Context, id string, state rpc.State) (err error) {
req := new(proto.InitRequest)
req.Id = id
Expand Down Expand Up @@ -179,7 +179,7 @@ func (c *client) Init(ctx context.Context, id string, state rpc.State) (err erro
return nil
}

// Done signals the pipeline is complete.
// Done signals the work is complete.
func (c *client) Done(ctx context.Context, id string, state rpc.State) (err error) {
req := new(proto.DoneRequest)
req.Id = id
Expand Down Expand Up @@ -214,7 +214,7 @@ func (c *client) Done(ctx context.Context, id string, state rpc.State) (err erro
return nil
}

// Extend extends the pipeline deadline
// Extend extends the workflow deadline
func (c *client) Extend(ctx context.Context, id string) (err error) {
req := new(proto.ExtendRequest)
req.Id = id
Expand Down Expand Up @@ -242,7 +242,7 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
return nil
}

// Update updates the pipeline state.
// Update updates the workflow state.
func (c *client) Update(ctx context.Context, id string, state rpc.State) (err error) {
req := new(proto.UpdateRequest)
req.Id = id
Expand Down Expand Up @@ -277,7 +277,7 @@ func (c *client) Update(ctx context.Context, id string, state rpc.State) (err er
return nil
}

// Log writes the pipeline log entry.
// Log writes the workflow log entry.
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
req := new(proto.LogRequest)
req.LogEntry = new(proto.LogEntry)
Expand Down
4 changes: 2 additions & 2 deletions agent/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
)

func (r *Runner) createTracer(ctxmeta context.Context, logger zerolog.Logger, work *rpc.Pipeline) pipeline.TraceFunc {
func (r *Runner) createTracer(ctxmeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
return func(state *pipeline.State) error {
steplogger := logger.With().
Str("image", state.Pipeline.Step.Image).
Expand All @@ -50,7 +50,7 @@ func (r *Runner) createTracer(ctxmeta context.Context, logger zerolog.Logger, wo
defer func() {
steplogger.Debug().Msg("update step status")

if uerr := r.client.Update(ctxmeta, work.ID, stepState); uerr != nil {
if uerr := r.client.Update(ctxmeta, workflow.ID, stepState); uerr != nil {
steplogger.Debug().
Err(uerr).
Msg("update step status error")
Expand Down
22 changes: 11 additions & 11 deletions pipeline/rpc/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type (
Labels map[string]string `json:"labels"`
}

// State defines the pipeline state.
// State defines the workflow state.
State struct {
Step string `json:"step"`
Exited bool `json:"exited"`
Expand All @@ -37,8 +37,8 @@ type (
Error string `json:"error"`
}

// Pipeline defines the pipeline execution details.
Pipeline struct {
// Workflow defines the workflow execution details.
Workflow struct {
ID string `json:"id"`
Config *backend.Config `json:"config"`
Timeout int64 `json:"timeout"`
Expand All @@ -55,25 +55,25 @@ type Peer interface {
// Version returns the server- & grpc-version
Version(c context.Context) (*Version, error)

// Next returns the next pipeline in the queue.
Next(c context.Context, f Filter) (*Pipeline, error)
// Next returns the next workflow in the queue
Next(c context.Context, f Filter) (*Workflow, error)

// Wait blocks until the pipeline is complete.
// Wait blocks until the workflow is complete
Wait(c context.Context, id string) error

// Init signals the pipeline is initialized.
// Init signals the workflow is initialized
Init(c context.Context, id string, state State) error

// Done signals the pipeline is complete.
// Done signals the workflow is complete
Done(c context.Context, id string, state State) error

// Extend extends the pipeline deadline
// Extend extends the workflow deadline
Extend(c context.Context, id string) error

// Update updates the pipeline state.
// Update updates the workflow state
Update(c context.Context, id string, state State) error

// Log writes the pipeline log entry.
// Log writes the workflow log entry
Log(c context.Context, logEntry *LogEntry) error

// RegisterAgent register our agent to the server
Expand Down
4 changes: 2 additions & 2 deletions pipeline/rpc/proto/woodpecker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ message Filter {
map<string, string> labels = 1;
}

message Pipeline {
6543 marked this conversation as resolved.
Show resolved Hide resolved
message Workflow {
string id = 1;
int64 timeout = 2;
bytes payload = 3;
Expand Down Expand Up @@ -126,7 +126,7 @@ message VersionResponse {
}

message NextResponse {
Pipeline pipeline = 1;
Workflow workflow = 1;
}

message RegisterAgentResponse {
Expand Down
8 changes: 4 additions & 4 deletions server/grpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type RPC struct {
}

// Next implements the rpc.Next function
func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Pipeline, error) {
func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, error) {
metadata, ok := grpcMetadata.FromIncomingContext(c)
if ok {
hostname, ok := metadata["hostname"]
Expand Down Expand Up @@ -82,9 +82,9 @@ func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Pipeline, er
}

if task.ShouldRun() {
pipeline := new(rpc.Pipeline)
err = json.Unmarshal(task.Data, pipeline)
return pipeline, err
workflow := new(rpc.Workflow)
err = json.Unmarshal(task.Data, workflow)
return workflow, err
}

if err := s.Done(c, task.ID, rpc.State{}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/pipeline/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func queuePipeline(repo *model.Repo, pipelineItems []*pipeline.Item) error {
task.RunOn = item.RunsOn
task.DepStatus = make(map[string]model.StatusValue)

task.Data, _ = json.Marshal(rpc.Pipeline{
task.Data, _ = json.Marshal(rpc.Workflow{
ID: fmt.Sprint(item.Workflow.ID),
Config: item.Config,
Timeout: repo.Timeout,
Expand Down