Skip to content
This repository has been archived by the owner on Aug 24, 2022. It is now read-only.

PMM-5194 Tunnels #227

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ services:

go:
- 1.15.x
- tip
- 1.16.x
# TODO enable when 1.15.x is removed
# - tip

matrix:
fast_finish: true
Expand Down
12 changes: 8 additions & 4 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ required = [

[[constraint]]
name = "github.com/percona/pmm"
branch = "PMM-2.0"
# branch = "PMM-2.0"
branch = "PMM-5194-tunnels"

[[constraint]]
name = "github.com/percona/go-mysql"
Expand Down
1 change: 1 addition & 0 deletions actions/concurrent_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (r *ConcurrentRunner) Start(a Action, timeout time.Duration) {
// 5. Add panics with "sync: WaitGroup misuse: Add called concurrently with Wait"
// See skipped test (run it in a loop with race detector).
// https://jira.percona.com/browse/PMM-4112
// https://jira.percona.com/browse/PMM-7206
r.runningActions.Add(1)
actionID, actionType := a.ID(), a.Type()

Expand Down
17 changes: 6 additions & 11 deletions agentlocal/agent_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/percona/pmm/api/agentlocalpb"
"github.com/percona/pmm/api/agentpb"
"github.com/percona/pmm/version"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
Expand All @@ -52,13 +51,11 @@ const (
shutdownTimeout = 1 * time.Second
)

// ErrReload is returned from Service.Run after request to reload configuration.
var ErrReload = errors.New("reload")

// Server represents local pmm-agent API server.
type Server struct {
cfg *config.Config
supervisor supervisor
registry registry
client client
configFilepath string

Expand All @@ -70,10 +67,11 @@ type Server struct {
// NewServer creates new server.
//
// Caller should call Run.
func NewServer(cfg *config.Config, supervisor supervisor, client client, configFilepath string) *Server {
func NewServer(cfg *config.Config, supervisor supervisor, registry registry, client client, configFilepath string) *Server {
return &Server{
cfg: cfg,
supervisor: supervisor,
registry: registry,
client: client,
configFilepath: configFilepath,
l: logrus.WithField("component", "local-server"),
Expand All @@ -84,8 +82,7 @@ func NewServer(cfg *config.Config, supervisor supervisor, client client, configF
// Run runs gRPC and JSON servers with API and debug endpoints until ctx is canceled.
//
// Run exits when ctx is canceled, or when a request to reload configuration is received.
// In the latter case, the returned error is ErrReload.
func (s *Server) Run(ctx context.Context) error {
func (s *Server) Run(ctx context.Context) {
defer s.l.Info("Done.")

serverCtx, serverCancel := context.WithCancel(ctx)
Expand All @@ -109,16 +106,12 @@ func (s *Server) Run(ctx context.Context) error {
s.runJSONServer(serverCtx, l.Addr().String())
}()

var res error
select {
case <-ctx.Done():
res = ctx.Err()
case <-s.reload:
res = ErrReload
}
serverCancel()
wg.Wait()
return res
}

// Status returns current pmm-agent status.
Expand Down Expand Up @@ -151,6 +144,7 @@ func (s *Server) Status(ctx context.Context, req *agentlocalpb.StatusRequest) (*
}

agentsInfo := s.supervisor.AgentsList()
tunnelsInfo := s.registry.TunnelsList()

return &agentlocalpb.StatusResponse{
AgentId: s.cfg.ID,
Expand All @@ -159,6 +153,7 @@ func (s *Server) Status(ctx context.Context, req *agentlocalpb.StatusRequest) (*
AgentsInfo: agentsInfo,
ConfigFilepath: s.configFilepath,
AgentVersion: version.Version,
TunnelsInfo: tunnelsInfo,
}, nil
}

Expand Down
83 changes: 50 additions & 33 deletions agentlocal/agent_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,38 +30,53 @@ import (
"github.com/percona/pmm-agent/config"
)

func TestServerStatus(t *testing.T) {
setup := func(t *testing.T) ([]*agentlocalpb.AgentInfo, *mockSupervisor, *mockClient, *config.Config) {
agentInfo := []*agentlocalpb.AgentInfo{{
AgentId: "/agent_id/00000000-0000-4000-8000-000000000002",
AgentType: inventorypb.AgentType_NODE_EXPORTER,
Status: inventorypb.AgentStatus_RUNNING,
}}
supervisor := new(mockSupervisor)
supervisor.Test(t)
supervisor.On("AgentsList").Return(agentInfo)
client := new(mockClient)
client.Test(t)
client.On("GetServerConnectMetadata").Return(&agentpb.ServerConnectMetadata{
AgentRunsOnNodeID: "/node_id/00000000-0000-4000-8000-000000000003",
ServerVersion: "2.0.0-dev",
})
cfg := &config.Config{
ID: "/agent_id/00000000-0000-4000-8000-000000000001",
Server: config.Server{
Address: "127.0.0.1:8443",
Username: "username",
Password: "password",
},
}
return agentInfo, supervisor, client, cfg
func setup(t *testing.T) ([]*agentlocalpb.AgentInfo, []*agentlocalpb.TunnelInfo, *mockSupervisor, *mockRegistry, *mockClient, *config.Config) {
t.Helper()

agentInfo := []*agentlocalpb.AgentInfo{{
AgentId: "/agent_id/00000000-0000-4000-8000-000000000002",
AgentType: inventorypb.AgentType_NODE_EXPORTER,
Status: inventorypb.AgentStatus_RUNNING,
}}
supervisor := new(mockSupervisor)
supervisor.Test(t)
supervisor.On("AgentsList").Return(agentInfo)
t.Cleanup(func() { supervisor.AssertExpectations(t) })

tunnelInfo := []*agentlocalpb.TunnelInfo{}
registry := new(mockRegistry)
registry.Test(t)
registry.On("TunnelsList").Return(tunnelInfo)
t.Cleanup(func() { registry.AssertExpectations(t) })

client := new(mockClient)
client.Test(t)
client.On("GetServerConnectMetadata").Return(&agentpb.ServerConnectMetadata{
AgentRunsOnNodeID: "/node_id/00000000-0000-4000-8000-000000000003",
ServerVersion: "2.0.0-dev",
})
t.Cleanup(func() { client.AssertExpectations(t) })

cfg := &config.Config{
ID: "/agent_id/00000000-0000-4000-8000-000000000001",
Server: config.Server{
Address: "127.0.0.1:8443",
Username: "username",
Password: "password",
},
}

return agentInfo, tunnelInfo, supervisor, registry, client, cfg
}

func TestServerStatus(t *testing.T) {
t.Parallel()

t.Run("without network info", func(t *testing.T) {
agentInfo, supervisor, client, cfg := setup(t)
defer supervisor.AssertExpectations(t)
defer client.AssertExpectations(t)
s := NewServer(cfg, supervisor, client, "/some/dir/pmm-agent.yaml")
t.Parallel()

agentInfo, tunnelInfo, supervisor, registry, client, cfg := setup(t)
s := NewServer(cfg, supervisor, registry, client, "/some/dir/pmm-agent.yaml")

// without network info
actual, err := s.Status(context.Background(), &agentlocalpb.StatusRequest{GetNetworkInfo: false})
Expand All @@ -76,18 +91,19 @@ func TestServerStatus(t *testing.T) {
},
AgentsInfo: agentInfo,
ConfigFilepath: "/some/dir/pmm-agent.yaml",
TunnelsInfo: tunnelInfo,
}
assert.Equal(t, expected, actual)
})

t.Run("with network info", func(t *testing.T) {
agentInfo, supervisor, client, cfg := setup(t)
t.Parallel()

agentInfo, tunnelInfo, supervisor, registry, client, cfg := setup(t)
latency := 5 * time.Millisecond
clockDrift := time.Second
client.On("GetNetworkInformation").Return(latency, clockDrift, nil)
defer supervisor.AssertExpectations(t)
defer client.AssertExpectations(t)
s := NewServer(cfg, supervisor, client, "/some/dir/pmm-agent.yaml")
s := NewServer(cfg, supervisor, registry, client, "/some/dir/pmm-agent.yaml")

// with network info
actual, err := s.Status(context.Background(), &agentlocalpb.StatusRequest{GetNetworkInfo: true})
Expand All @@ -104,6 +120,7 @@ func TestServerStatus(t *testing.T) {
},
AgentsInfo: agentInfo,
ConfigFilepath: "/some/dir/pmm-agent.yaml",
TunnelsInfo: tunnelInfo,
}
assert.Equal(t, expected, actual)
})
Expand Down
7 changes: 7 additions & 0 deletions agentlocal/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

//go:generate mockery -name=client -case=snake -inpkg -testonly
//go:generate mockery -name=registry -case=snake -inpkg -testonly
//go:generate mockery -name=supervisor -case=snake -inpkg -testonly

// client is a subset of methods of client.Client used by this package.
Expand All @@ -35,6 +36,12 @@ type client interface {
GetNetworkInformation() (latency, clockDrift time.Duration, err error)
}

// registry is a subset of methods of tunnels.Registry used by this package.
// We use it instead of real type for testing and to avoid dependency cycle.
type registry interface {
TunnelsList() []*agentlocalpb.TunnelInfo
}

// supervisor is a subset of methods of supervisor.Supervisor used by this package.
// We use it instead of real type for testing and to avoid dependency cycle.
type supervisor interface {
Expand Down
29 changes: 29 additions & 0 deletions agentlocal/mock_registry_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions client/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type AgentResponse struct {

// Channel encapsulates two-way communication channel between pmm-managed and pmm-agent.
//
// pmm-agent establishes a single gRPC connection with pmm-managed
// and then starts a single gRPC bidirectional stream, which is represented by this type.
//
// All exported methods are thread-safe.
type Channel struct { //nolint:maligned
s agentpb.Agent_ConnectClient
Expand All @@ -73,7 +76,7 @@ type Channel struct { //nolint:maligned
closeErr error
}

// New creates new two-way communication channel with given stream.
// New creates new two-way communication channel with given gRPC bidirectional stream.
//
// Stream should not be used by the caller after channel is created.
func New(stream agentpb.Agent_ConnectClient) *Channel {
Expand Down Expand Up @@ -137,7 +140,7 @@ func (c *Channel) Requests() <-chan *ServerRequest {
return c.requests
}

// SendResponse sends message to pmm-managed. It is no-op once channel is closed (see Wait).
// SendResponse sends response to pmm-managed. It is no-op once channel is closed (see Wait).
func (c *Channel) SendResponse(resp *AgentResponse) {
msg := &agentpb.AgentMessage{
Id: resp.ID,
Expand Down Expand Up @@ -235,6 +238,11 @@ func (c *Channel) runReceiver() {
ID: msg.Id,
Payload: p.CheckConnection,
}
case *agentpb.ServerMessage_TunnelData:
c.requests <- &ServerRequest{
ID: msg.Id,
Payload: p.TunnelData,
}

// responses
case *agentpb.ServerMessage_Pong:
Expand All @@ -245,6 +253,8 @@ func (c *Channel) runReceiver() {
c.publish(msg.Id, p.QanCollect)
case *agentpb.ServerMessage_ActionResult:
c.publish(msg.Id, p.ActionResult)
case *agentpb.ServerMessage_TunnelDataAck:
c.publish(msg.Id, p.TunnelDataAck)

case nil:
c.close(errors.Errorf("failed to handle received message %s", msg))
Expand Down
Loading