Skip to content

Commit

Permalink
Split GRPC code of related contexts into separate files (#798)
Browse files Browse the repository at this point in the history
  • Loading branch information
6543 authored Feb 26, 2022
1 parent 1cc8122 commit 4607cf5
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 168 deletions.
48 changes: 48 additions & 0 deletions server/grpc/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc

import (
"github.com/woodpecker-ci/expr"

"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
"github.com/woodpecker-ci/woodpecker/server/queue"
)

func createFilterFunc(filter rpc.Filter) (queue.Filter, error) {
var st *expr.Selector
var err error

if filter.Expr != "" {
st, err = expr.ParseString(filter.Expr)
if err != nil {
return nil, err
}
}

return func(task *queue.Task) bool {
if st != nil {
match, _ := st.Eval(expr.NewRow(task.Labels))
return match
}

for k, v := range filter.Labels {
if task.Labels[k] != v {
return false
}
}
return true
}, nil
}
168 changes: 0 additions & 168 deletions server/grpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@ import (
"github.com/rs/zerolog/log"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
grpcMetadata "google.golang.org/grpc/metadata"

"github.com/woodpecker-ci/expr"
"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
"github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto"
"github.com/woodpecker-ci/woodpecker/server"
"github.com/woodpecker-ci/woodpecker/server/logging"
"github.com/woodpecker-ci/woodpecker/server/model"
Expand Down Expand Up @@ -440,168 +437,3 @@ func (s *RPC) notify(c context.Context, repo *model.Repo, build *model.Build, pr
}
return nil
}

func createFilterFunc(filter rpc.Filter) (queue.Filter, error) {
var st *expr.Selector
var err error

if filter.Expr != "" {
st, err = expr.ParseString(filter.Expr)
if err != nil {
return nil, err
}
}

return func(task *queue.Task) bool {
if st != nil {
match, _ := st.Eval(expr.NewRow(task.Labels))
return match
}

for k, v := range filter.Labels {
if task.Labels[k] != v {
return false
}
}
return true
}, nil
}

//
//
//

// WoodpeckerServer is a grpc server implementation.
type WoodpeckerServer struct {
proto.UnimplementedWoodpeckerServer
peer RPC
}

func NewWoodpeckerServer(remote remote.Remote, queue queue.Queue, logger logging.Log, pubsub pubsub.Publisher, store store.Store, host string) *WoodpeckerServer {
buildTime := promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "build_time",
Help: "Build time.",
}, []string{"repo", "branch", "status", "pipeline"})
buildCount := promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "woodpecker",
Name: "build_count",
Help: "Build count.",
}, []string{"repo", "branch", "status", "pipeline"})
peer := RPC{
remote: remote,
store: store,
queue: queue,
pubsub: pubsub,
logger: logger,
host: host,
buildTime: buildTime,
buildCount: buildCount,
}
return &WoodpeckerServer{peer: peer}
}

func (s *WoodpeckerServer) Next(c context.Context, req *proto.NextRequest) (*proto.NextReply, error) {
filter := rpc.Filter{
Labels: req.GetFilter().GetLabels(),
Expr: req.GetFilter().GetExpr(),
}

res := new(proto.NextReply)
pipeline, err := s.peer.Next(c, filter)
if err != nil {
return res, err
}
if pipeline == nil {
return res, err
}

res.Pipeline = new(proto.Pipeline)
res.Pipeline.Id = pipeline.ID
res.Pipeline.Timeout = pipeline.Timeout
res.Pipeline.Payload, _ = json.Marshal(pipeline.Config)

return res, err
}

func (s *WoodpeckerServer) Init(c context.Context, req *proto.InitRequest) (*proto.Empty, error) {
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Finished: req.GetState().GetFinished(),
Started: req.GetState().GetStarted(),
Proc: req.GetState().GetName(),
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := s.peer.Init(c, req.GetId(), state)
return res, err
}

func (s *WoodpeckerServer) Update(c context.Context, req *proto.UpdateRequest) (*proto.Empty, error) {
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Finished: req.GetState().GetFinished(),
Started: req.GetState().GetStarted(),
Proc: req.GetState().GetName(),
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := s.peer.Update(c, req.GetId(), state)
return res, err
}

func (s *WoodpeckerServer) Upload(c context.Context, req *proto.UploadRequest) (*proto.Empty, error) {
file := &rpc.File{
Data: req.GetFile().GetData(),
Mime: req.GetFile().GetMime(),
Name: req.GetFile().GetName(),
Proc: req.GetFile().GetProc(),
Size: int(req.GetFile().GetSize()),
Time: req.GetFile().GetTime(),
Meta: req.GetFile().GetMeta(),
}

res := new(proto.Empty)
err := s.peer.Upload(c, req.GetId(), file)
return res, err
}

func (s *WoodpeckerServer) Done(c context.Context, req *proto.DoneRequest) (*proto.Empty, error) {
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Finished: req.GetState().GetFinished(),
Started: req.GetState().GetStarted(),
Proc: req.GetState().GetName(),
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := s.peer.Done(c, req.GetId(), state)
return res, err
}

func (s *WoodpeckerServer) Wait(c context.Context, req *proto.WaitRequest) (*proto.Empty, error) {
res := new(proto.Empty)
err := s.peer.Wait(c, req.GetId())
return res, err
}

func (s *WoodpeckerServer) Extend(c context.Context, req *proto.ExtendRequest) (*proto.Empty, error) {
res := new(proto.Empty)
err := s.peer.Extend(c, req.GetId())
return res, err
}

func (s *WoodpeckerServer) Log(c context.Context, req *proto.LogRequest) (*proto.Empty, error) {
line := &rpc.Line{
Out: req.GetLine().GetOut(),
Pos: int(req.GetLine().GetPos()),
Time: req.GetLine().GetTime(),
Proc: req.GetLine().GetProc(),
}
res := new(proto.Empty)
err := s.peer.Log(c, req.GetId(), line)
return res, err
}
166 changes: 166 additions & 0 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc

import (
"context"
"encoding/json"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
"github.com/woodpecker-ci/woodpecker/pipeline/rpc/proto"
"github.com/woodpecker-ci/woodpecker/server/logging"
"github.com/woodpecker-ci/woodpecker/server/pubsub"
"github.com/woodpecker-ci/woodpecker/server/queue"
"github.com/woodpecker-ci/woodpecker/server/remote"
"github.com/woodpecker-ci/woodpecker/server/store"
)

// WoodpeckerServer is a grpc server implementation.
type WoodpeckerServer struct {
proto.UnimplementedWoodpeckerServer
peer RPC
}

func NewWoodpeckerServer(remote remote.Remote, queue queue.Queue, logger logging.Log, pubsub pubsub.Publisher, store store.Store, host string) *WoodpeckerServer {
buildTime := promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "woodpecker",
Name: "build_time",
Help: "Build time.",
}, []string{"repo", "branch", "status", "pipeline"})
buildCount := promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "woodpecker",
Name: "build_count",
Help: "Build count.",
}, []string{"repo", "branch", "status", "pipeline"})
peer := RPC{
remote: remote,
store: store,
queue: queue,
pubsub: pubsub,
logger: logger,
host: host,
buildTime: buildTime,
buildCount: buildCount,
}
return &WoodpeckerServer{peer: peer}
}

func (s *WoodpeckerServer) Next(c context.Context, req *proto.NextRequest) (*proto.NextReply, error) {
filter := rpc.Filter{
Labels: req.GetFilter().GetLabels(),
Expr: req.GetFilter().GetExpr(),
}

res := new(proto.NextReply)
pipeline, err := s.peer.Next(c, filter)
if err != nil {
return res, err
}
if pipeline == nil {
return res, err
}

res.Pipeline = new(proto.Pipeline)
res.Pipeline.Id = pipeline.ID
res.Pipeline.Timeout = pipeline.Timeout
res.Pipeline.Payload, _ = json.Marshal(pipeline.Config)

return res, err
}

func (s *WoodpeckerServer) Init(c context.Context, req *proto.InitRequest) (*proto.Empty, error) {
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Finished: req.GetState().GetFinished(),
Started: req.GetState().GetStarted(),
Proc: req.GetState().GetName(),
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := s.peer.Init(c, req.GetId(), state)
return res, err
}

func (s *WoodpeckerServer) Update(c context.Context, req *proto.UpdateRequest) (*proto.Empty, error) {
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Finished: req.GetState().GetFinished(),
Started: req.GetState().GetStarted(),
Proc: req.GetState().GetName(),
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := s.peer.Update(c, req.GetId(), state)
return res, err
}

func (s *WoodpeckerServer) Upload(c context.Context, req *proto.UploadRequest) (*proto.Empty, error) {
file := &rpc.File{
Data: req.GetFile().GetData(),
Mime: req.GetFile().GetMime(),
Name: req.GetFile().GetName(),
Proc: req.GetFile().GetProc(),
Size: int(req.GetFile().GetSize()),
Time: req.GetFile().GetTime(),
Meta: req.GetFile().GetMeta(),
}

res := new(proto.Empty)
err := s.peer.Upload(c, req.GetId(), file)
return res, err
}

func (s *WoodpeckerServer) Done(c context.Context, req *proto.DoneRequest) (*proto.Empty, error) {
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Finished: req.GetState().GetFinished(),
Started: req.GetState().GetStarted(),
Proc: req.GetState().GetName(),
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := s.peer.Done(c, req.GetId(), state)
return res, err
}

func (s *WoodpeckerServer) Wait(c context.Context, req *proto.WaitRequest) (*proto.Empty, error) {
res := new(proto.Empty)
err := s.peer.Wait(c, req.GetId())
return res, err
}

func (s *WoodpeckerServer) Extend(c context.Context, req *proto.ExtendRequest) (*proto.Empty, error) {
res := new(proto.Empty)
err := s.peer.Extend(c, req.GetId())
return res, err
}

func (s *WoodpeckerServer) Log(c context.Context, req *proto.LogRequest) (*proto.Empty, error) {
line := &rpc.Line{
Out: req.GetLine().GetOut(),
Pos: int(req.GetLine().GetPos()),
Time: req.GetLine().GetTime(),
Proc: req.GetLine().GetProc(),
}
res := new(proto.Empty)
err := s.peer.Log(c, req.GetId(), line)
return res, err
}

0 comments on commit 4607cf5

Please # to comment.