Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

manager: manage capture and replay jobs #666

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/proxy/backend/mock_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,9 @@ func (mc *mockCapture) Capture(packet []byte, startTime time.Time, connID uint64
mc.connID = connID
}

// Progress is the mock implementation of the capture progress query.
// It always reports zero progress and a nil error.
func (mc *mockCapture) Progress() (float64, error) {
return 0, nil
}

// Close is a no-op in the mock.
func (mc *mockCapture) Close() {
}
11 changes: 11 additions & 0 deletions pkg/sqlreplay/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Capture interface {
Stop(err error)
// Capture captures traffic
Capture(packet []byte, startTime time.Time, connID uint64)
// Progress returns the progress of the capture job
Progress() (float64, error)
// Close closes the capture
Close()
}
Expand Down Expand Up @@ -202,6 +204,15 @@ func (c *capture) Capture(packet []byte, startTime time.Time, connID uint64) {
}
}

// Progress returns the progress of the capture job as the fraction of the
// configured duration that has elapsed since the start time, together with
// the current value of c.err.
//
// The fraction is clamped to [0, 1]. Without the clamp the value keeps
// growing past 1 once more than cfg.Duration has elapsed since startTime,
// and is negative when the configured duration is negative.
//
// Zero is returned when the start time is unset or no positive duration is
// configured, because there is nothing to measure against.
func (c *capture) Progress() (float64, error) {
	c.Lock()
	defer c.Unlock()
	if c.startTime.IsZero() || c.cfg.Duration <= 0 {
		return 0, c.err
	}
	progress := float64(time.Since(c.startTime)) / float64(c.cfg.Duration)
	switch {
	case progress > 1:
		progress = 1
	case progress < 0:
		// time.Since can only be negative if startTime lies in the future.
		progress = 0
	}
	return progress, c.err
}

// stopNoLock must be called after holding a lock.
func (c *capture) stopNoLock(err error) {
// already stopped
Expand Down
7 changes: 6 additions & 1 deletion pkg/sqlreplay/capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

"github.com/pingcap/tiproxy/lib/util/errors"
"github.com/pingcap/tiproxy/lib/util/logger"
"github.com/pingcap/tiproxy/lib/util/waitgroup"
pnet "github.com/pingcap/tiproxy/pkg/proxy/net"
Expand All @@ -34,7 +35,11 @@ func TestStartAndStop(t *testing.T) {
// start capture and the traffic should be outputted
require.NoError(t, cpt.Start(cfg))
cpt.Capture(packet, time.Now(), 100)
cpt.Stop(nil)
_, err := cpt.Progress()
require.NoError(t, err)
cpt.Stop(errors.Errorf("mock error"))
_, err = cpt.Progress()
require.ErrorContains(t, err, "mock error")
cpt.wg.Wait()
data := writer.getData()
require.Greater(t, len(data), 0)
Expand Down
72 changes: 72 additions & 0 deletions pkg/sqlreplay/manager/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package manager

import (
"encoding/json"
"time"

"github.com/siddontang/go/hack"
)

// jobType identifies the kind of a job.
type jobType int

const (
// Capture is the type reported by capture jobs.
Capture jobType = iota
// Replay is the type reported by replay jobs.
Replay
)

// Job is the common interface of the capture and replay jobs declared in
// this file.
type Job interface {
// Type returns the kind of the job (Capture or Replay).
Type() jobType
// String returns a textual representation of the job.
String() string
// SetProgress records the latest progress and error of the job.
SetProgress(progress float64, err error)
// IsRunning reports whether the job is still considered running.
IsRunning() bool
}

// job holds the state shared by all job kinds. It is embedded by captureJob
// and replayJob and is marshaled to JSON by String.
type job struct {
// StartTime is the time recorded for the start of the job.
StartTime time.Time
// Duration is the configured duration of the job; it may be zero.
Duration time.Duration
// Progress is the latest progress recorded through SetProgress.
Progress float64
// Error is the latest error recorded through SetProgress. A non-nil value
// means the job is no longer running (see IsRunning).
Error error
}

// IsRunning reports whether the job is still running, which is defined as
// no error having been recorded on it.
func (job *job) IsRunning() bool {
return job.Error == nil
}

// String renders the job as a JSON object with the keys StartTime, Duration,
// Progress and Error. If marshaling fails, the marshaling error's message is
// returned instead.
//
// Error is encoded as the error's message, or null when no error is set.
// Passing the error value straight to encoding/json would lose the message:
// error implementations normally have no exported fields and are encoded as
// "{}".
//
// TODO: refine the output
func (job *job) String() string {
	var errMsg interface{}
	if job.Error != nil {
		errMsg = job.Error.Error()
	}
	b, err := json.Marshal(struct {
		StartTime time.Time
		Duration  time.Duration
		Progress  float64
		Error     interface{}
	}{
		StartTime: job.StartTime,
		Duration:  job.Duration,
		Progress:  job.Progress,
		Error:     errMsg,
	})
	if err != nil {
		return err.Error()
	}
	return hack.String(b)
}

// SetProgress stores err as the job's error (overwriting any previous value,
// including with nil) and raises the recorded progress. Progress never moves
// backwards: a value that is not greater than the stored one is ignored.
func (job *job) SetProgress(progress float64, err error) {
	job.Error = err
	if job.Progress < progress {
		job.Progress = progress
	}
}

// Compile-time check that *captureJob implements Job.
var _ Job = (*captureJob)(nil)

// captureJob is a job of type Capture; its state lives in the embedded job.
type captureJob struct {
job
}

// Type returns Capture.
func (job *captureJob) Type() jobType {
return Capture
}

// Compile-time check that *replayJob implements Job.
var _ Job = (*replayJob)(nil)

// replayJob is a job of type Replay; its state lives in the embedded job.
type replayJob struct {
job
}

// Type returns Replay.
func (job *replayJob) Type() jobType {
return Replay
}
46 changes: 46 additions & 0 deletions pkg/sqlreplay/manager/job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package manager

import (
"testing"
"time"

"github.com/pingcap/tiproxy/lib/util/errors"
"github.com/stretchr/testify/require"
)

// TestJobs verifies, for both job kinds, that the job reports the expected
// type, starts out running, stops running once an error is recorded, and
// produces a non-empty string representation.
func TestJobs(t *testing.T) {
	cases := []struct {
		job Job
		tp  jobType
	}{
		{
			job: &captureJob{job: job{StartTime: time.Now(), Duration: 10 * time.Second}},
			tp:  Capture,
		},
		{
			job: &replayJob{job: job{StartTime: time.Now(), Duration: 10 * time.Second}},
			tp:  Replay,
		},
	}

	for idx := range cases {
		tc := cases[idx]
		require.Equal(t, tc.tp, tc.job.Type(), "case %d", idx)
		require.True(t, tc.job.IsRunning(), "case %d", idx)
		// Recording an error marks the job as no longer running.
		tc.job.SetProgress(0.5, errors.New("stopped manually"))
		require.False(t, tc.job.IsRunning(), "case %d", idx)
		require.NotEmpty(t, tc.job.String(), "case %d", idx)
	}
}
155 changes: 155 additions & 0 deletions pkg/sqlreplay/manager/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2024 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package manager

import (
"crypto/tls"
"encoding/json"
"time"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/errors"
"github.com/pingcap/tiproxy/pkg/proxy/backend"
"github.com/pingcap/tiproxy/pkg/sqlreplay/capture"
"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
"github.com/siddontang/go/hack"
"go.uber.org/zap"
)

// CertManager supplies the TLS configuration that the job manager passes to
// the replay when a replay job is started.
type CertManager interface {
// SQLTLS returns the TLS configuration handed to replay.Start.
SQLTLS() *tls.Config
}

// JobManager manages capture and replay jobs.
type JobManager interface {
// StartCapture starts a capture job with the given configuration.
StartCapture(capture.CaptureConfig) error
// StartReplay starts a replay job with the given configuration.
StartReplay(replay.ReplayConfig) error
// GetCapture returns the capture owned by the manager.
GetCapture() capture.Capture
// Stop stops the running job, if any, and returns a message describing
// the outcome.
Stop() string
// Jobs returns a textual representation of the job history.
Jobs() string
// Close releases the resources held by the manager.
Close()
}

var _ JobManager = (*jobManager)(nil)

// jobManager is the JobManager implementation. It owns one capture and one
// replay and records every started job in jobHistory.
type jobManager struct {
// jobHistory lists all jobs in start order; only the last entry is ever
// treated as possibly running (see runningJob).
jobHistory []Job
// capture runs capture jobs.
capture capture.Capture
// replay runs replay jobs.
replay replay.Replay
// hsHandler is passed to replay.Start.
hsHandler backend.HandshakeHandler
// certManager provides the TLS configuration passed to replay.Start.
certManager CertManager
// cfg supplies the proxy and security settings used to build the
// backend connection configuration for replay.
cfg *config.Config
lg *zap.Logger
}

// NewJobManager creates a job manager with a fresh capture and replay, each
// given a logger named after its role ("capture" and "replay").
func NewJobManager(lg *zap.Logger, cfg *config.Config, certMgr CertManager, hsHandler backend.HandshakeHandler) *jobManager {
	jm := &jobManager{
		lg:          lg,
		cfg:         cfg,
		certManager: certMgr,
		hsHandler:   hsHandler,
	}
	jm.capture = capture.NewCapture(lg.Named("capture"))
	jm.replay = replay.NewReplay(lg.Named("replay"))
	return jm
}

// runningJob returns the most recently started job if it is still running,
// or nil otherwise. Before answering, it refreshes that job's progress and
// error from the capture or replay, so a job that has ended since the last
// call is detected here.
func (jm *jobManager) runningJob() Job {
	count := len(jm.jobHistory)
	if count == 0 {
		return nil
	}
	latest := jm.jobHistory[count-1]
	if !latest.IsRunning() {
		return nil
	}
	// Pull the latest state from whichever component executes this job.
	if tp := latest.Type(); tp == Capture {
		latest.SetProgress(jm.capture.Progress())
	} else if tp == Replay {
		latest.SetProgress(jm.replay.Progress())
	}
	// The refresh may have recorded an error, which ends the job.
	if !latest.IsRunning() {
		return nil
	}
	return latest
}

// StartCapture starts a capture job with cfg. It fails without starting
// anything if another job is still running. The new job is appended to the
// job history even when starting the capture fails; in that case the start
// error is recorded on the job and returned wrapped.
func (jm *jobManager) StartCapture(cfg capture.CaptureConfig) error {
	if active := jm.runningJob(); active != nil {
		return errors.Errorf("a job is running: %s", active.String())
	}
	started := &captureJob{}
	started.StartTime = time.Now()
	started.Duration = cfg.Duration
	startErr := jm.capture.Start(cfg)
	if startErr != nil {
		started.SetProgress(0, startErr)
	}
	jm.jobHistory = append(jm.jobHistory, started)
	return errors.Wrapf(startErr, "start capture failed")
}

// StartReplay starts a replay job with cfg. It fails without starting
// anything if another job is still running. The new job is appended to the
// job history even when starting the replay fails; in that case the start
// error is recorded on the job and returned wrapped.
func (jm *jobManager) StartReplay(cfg replay.ReplayConfig) error {
	if active := jm.runningJob(); active != nil {
		return errors.Errorf("a job is running: %s", active.String())
	}
	started := &replayJob{}
	started.StartTime = time.Now()
	// TODO: support update configs online
	tlsCfg := jm.certManager.SQLTLS()
	bcCfg := &backend.BCConfig{
		ProxyProtocol:      jm.cfg.Proxy.ProxyProtocol != "",
		RequireBackendTLS:  jm.cfg.Security.RequireBackendTLS,
		HealthyKeepAlive:   jm.cfg.Proxy.BackendHealthyKeepalive,
		UnhealthyKeepAlive: jm.cfg.Proxy.BackendUnhealthyKeepalive,
		ConnBufferSize:     jm.cfg.Proxy.ConnBufferSize,
	}
	startErr := jm.replay.Start(cfg, tlsCfg, jm.hsHandler, bcCfg)
	if startErr != nil {
		started.SetProgress(0, startErr)
	}
	jm.jobHistory = append(jm.jobHistory, started)
	return errors.Wrapf(startErr, "start replay failed")
}

// GetCapture returns the capture owned by the manager.
func (jm *jobManager) GetCapture() capture.Capture {
return jm.capture
}

// Jobs returns the whole job history encoded as JSON, or the marshaling
// error's message if encoding fails.
func (jm *jobManager) Jobs() string {
	encoded, marshalErr := json.Marshal(jm.jobHistory)
	if marshalErr == nil {
		return hack.String(encoded)
	}
	return marshalErr.Error()
}

// Stop stops the running job, if any, with a "manually stopped" error and
// records the same kind of error on the job so that it no longer counts as
// running. It returns "no job running" when there is nothing to stop,
// otherwise "stopped: " followed by the job's string representation.
func (jm *jobManager) Stop() string {
	active := jm.runningJob()
	if active == nil {
		return "no job running"
	}
	// Stop whichever component is executing the job.
	switch tp := active.Type(); tp {
	case Replay:
		jm.replay.Stop(errors.Errorf("manually stopped"))
	case Capture:
		jm.capture.Stop(errors.Errorf("manually stopped"))
	}
	active.SetProgress(0, errors.Errorf("manually stopped"))
	summary := "stopped: " + active.String()
	return summary
}

// Close closes the capture and the replay, skipping whichever is nil.
func (jm *jobManager) Close() {
if jm.capture != nil {
jm.capture.Close()
}
if jm.replay != nil {
jm.replay.Close()
}
}
41 changes: 41 additions & 0 deletions pkg/sqlreplay/manager/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2024 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package manager

import (
"testing"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/pkg/sqlreplay/capture"
"github.com/pingcap/tiproxy/pkg/sqlreplay/replay"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

// TestStartAndStop drives the job manager through a capture job and then a
// replay job, using mock capture and replay implementations. It checks that
// only one job can run at a time, that rejected starts are not added to the
// history, and that Stop reports whether there was a job to stop.
func TestStartAndStop(t *testing.T) {
mgr := NewJobManager(zap.NewNop(), &config.Config{}, &mockCertMgr{}, nil)
defer mgr.Close()
// Replace the real capture and replay with mocks.
mgr.capture = &mockCapture{}
mgr.replay = &mockReplay{}

// Nothing has been started yet.
require.Contains(t, mgr.Stop(), "no job running")
require.NotNil(t, mgr.GetCapture())

// While a capture job is running, any further start is rejected and does
// not add a history entry.
require.NoError(t, mgr.StartCapture(capture.CaptureConfig{}))
require.Error(t, mgr.StartCapture(capture.CaptureConfig{}))
require.Error(t, mgr.StartReplay(replay.ReplayConfig{}))
require.Len(t, mgr.jobHistory, 1)
require.NotEmpty(t, mgr.Jobs())
// The first Stop ends the job; the second finds nothing to stop.
require.Contains(t, mgr.Stop(), "stopped")
require.Contains(t, mgr.Stop(), "no job running")
require.Len(t, mgr.jobHistory, 1)

// The same sequence for a replay job, which becomes the second entry.
require.NoError(t, mgr.StartReplay(replay.ReplayConfig{}))
require.Error(t, mgr.StartCapture(capture.CaptureConfig{}))
require.Error(t, mgr.StartReplay(replay.ReplayConfig{}))
require.Len(t, mgr.jobHistory, 2)
require.Contains(t, mgr.Stop(), "stopped")
require.Contains(t, mgr.Stop(), "no job running")
require.Len(t, mgr.jobHistory, 2)
}
Loading