Skip to content

Commit 25021af

Browse files
authored
Transfer setup & teardown copy statement aux functions to executor (#999)
* Transfer setup & teardown copy statement aux functions to executor * Transfer ProcQuery logic to Executor * fix
1 parent 648be91 commit 25021af

File tree

7 files changed

+386
-351
lines changed

7 files changed

+386
-351
lines changed

router/frontend/frontend.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func Frontend(qr qrouter.QueryRouter, cl client.RouterClient, cmngr poolmgr.Pool
192192
// ok
193193
default:
194194
spqrlog.Zero.Error().
195-
Uint("client", rst.Client().ID()).Int("tx-status", int(rst.TxStatus())).Err(err).
195+
Uint("client", rst.Client().ID()).Int("tx-status", int(rst.QueryExecutor().TxStatus())).Err(err).
196196
Msg("client iteration done with error")
197197
if err := rst.UnRouteWithError(rst.ActiveShards(), fmt.Errorf("client processing error: %v, tx status %s", err, rst.TxStatus().String())); err != nil {
198198
return err

router/frontend/frontend_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,17 @@ func TestFrontendSimple(t *testing.T) {
5959
qr := mockqr.NewMockQueryRouter(ctrl)
6060
cmngr := mockcmgr.NewMockPoolMgr(ctrl)
6161

62+
mmgr := mockmgr.NewMockEntityMgr(ctrl)
63+
6264
frrule := &config.FrontendRule{
6365
DB: "db1",
6466
Usr: "user1",
6567
}
6668

6769
beRule := &config.BackendRule{}
6870

71+
qr.EXPECT().Mgr().Return(mmgr).AnyTimes()
72+
6973
srv.EXPECT().Datashards().AnyTimes().Return([]shard.Shard{})
7074
srv.EXPECT().Name().AnyTimes().Return("serv1")
7175

@@ -167,6 +171,8 @@ func TestFrontendXProto(t *testing.T) {
167171
qr := mockqr.NewMockQueryRouter(ctrl)
168172
cmngr := mockcmgr.NewMockPoolMgr(ctrl)
169173

174+
mmgr := mockmgr.NewMockEntityMgr(ctrl)
175+
170176
frrule := &config.FrontendRule{
171177
DB: "db1",
172178
Usr: "user1",
@@ -175,6 +181,8 @@ func TestFrontendXProto(t *testing.T) {
175181

176182
beRule := &config.BackendRule{}
177183

184+
qr.EXPECT().Mgr().Return(mmgr).AnyTimes()
185+
178186
sh.EXPECT().ID().AnyTimes()
179187
sh.EXPECT().Send(gomock.Any()).AnyTimes()
180188
sh.EXPECT().Receive().AnyTimes()

router/qrouter/local.go

+5
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/juju/errors"
77
"github.com/pg-sharding/lyx/lyx"
88
"github.com/pg-sharding/spqr/pkg/config"
9+
"github.com/pg-sharding/spqr/pkg/meta"
910
"github.com/pg-sharding/spqr/pkg/models/kr"
1011
"github.com/pg-sharding/spqr/pkg/models/topology"
1112
"github.com/pg-sharding/spqr/pkg/session"
@@ -88,3 +89,7 @@ func (l *LocalQrouter) DataShardsRoutes() []*kr.ShardKey {
8889
},
8990
}
9091
}
92+
93+
func (l *LocalQrouter) Mgr() meta.EntityMgr {
94+
return nil
95+
}

router/relay/phandler.go

+16
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,35 @@
11
package relay
22

33
import (
4+
"context"
5+
6+
"github.com/jackc/pgx/v5/pgproto3"
7+
"github.com/pg-sharding/lyx/lyx"
8+
"github.com/pg-sharding/spqr/pkg/meta"
49
"github.com/pg-sharding/spqr/pkg/txstatus"
10+
"github.com/pg-sharding/spqr/router/client"
511
"github.com/pg-sharding/spqr/router/parser"
12+
"github.com/pg-sharding/spqr/router/pgcopy"
613
)
714

815
// Execute requered command via
916
// some protoc-specific logic
1017
type QueryStateExecutor interface {
1118
txstatus.TxStatusMgr
1219

20+
Client() client.RouterClient
21+
1322
ExecBegin(rst RelayStateMgr, query string, st *parser.ParseStateTXBegin) error
1423
ExecCommit(rst RelayStateMgr, query string) error
1524
ExecRollback(rst RelayStateMgr, query string) error
1625

26+
/* Copy execution */
27+
ProcCopyPrepare(ctx context.Context, mgr meta.EntityMgr, stmt *lyx.Copy) (*pgcopy.CopyState, error)
28+
ProcCopy(ctx context.Context, data *pgproto3.CopyData, cps *pgcopy.CopyState) ([]byte, error)
29+
ProcCopyComplete(query pgproto3.FrontendMessage) error
30+
31+
ProcQuery(query pgproto3.FrontendMessage, stmt lyx.Node, mgr meta.EntityMgr, waitForResp bool, replyCl bool) ([]pgproto3.BackendMessage, bool, error)
32+
1733
ExecSet(rst RelayStateMgr, query, name, value string) error
1834
ExecSetLocal(rst RelayStateMgr, query, name, value string) error
1935
ExecReset(rst RelayStateMgr, query, name string) error

0 commit comments

Comments
 (0)