Skip to content

Commit fa85887

Browse files
committed
Prepare for multishard tx
1 parent 5fba86c commit fa85887

File tree

4 files changed

+63
-20
lines changed

4 files changed

+63
-20
lines changed

router/frontend/frontend.go

+3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func ProcessMessage(qr qrouter.QueryRouter, rst relay.RelayStateMgr, msg pgproto
3838
rst.AddQuery(q)
3939
return rst.ProcessMessageBuf(true, true)
4040
}, true); err != nil {
41+
_ = rst.CompleteRelay(true)
4142
return err
4243
}
4344

@@ -66,6 +67,7 @@ func ProcessMessage(qr qrouter.QueryRouter, rst relay.RelayStateMgr, msg pgproto
6667

6768
return rst.ProcessMessageBuf(true, true)
6869
}, false); err != nil {
70+
_ = rst.CompleteRelay(true)
6971
return err
7072
}
7173

@@ -132,6 +134,7 @@ func ProcessMessage(qr qrouter.QueryRouter, rst relay.RelayStateMgr, msg pgproto
132134
// this call compeletes relay, sends RFQ
133135
return rst.ProcessMessageBuf(true, true)
134136
}, false); err != nil {
137+
_ = rst.CompleteRelay(true)
135138
return err
136139
}
137140

router/relay/phandler.go

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/pg-sharding/spqr/router/client"
1111
"github.com/pg-sharding/spqr/router/parser"
1212
"github.com/pg-sharding/spqr/router/pgcopy"
13+
"github.com/pg-sharding/spqr/router/server"
1314
)
1415

1516
// Execute requered command via
@@ -19,6 +20,8 @@ type QueryStateExecutor interface {
1920

2021
Client() client.RouterClient
2122

23+
Deploy(server server.Server) error
24+
2225
ExecBegin(rst RelayStateMgr, query string, st *parser.ParseStateTXBegin) error
2326
ExecCommit(rst RelayStateMgr, query string) error
2427
ExecRollback(rst RelayStateMgr, query string) error

router/relay/phs.go

+38-18
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/pg-sharding/spqr/router/parser"
1919
"github.com/pg-sharding/spqr/router/pgcopy"
2020
"github.com/pg-sharding/spqr/router/rmeta"
21+
"github.com/pg-sharding/spqr/router/server"
2122

2223
"github.com/pg-sharding/lyx/lyx"
2324
)
@@ -27,6 +28,24 @@ type QueryStateExecutorImpl struct {
2728

2829
txStatus txstatus.TXStatus
2930
cl client.RouterClient
31+
32+
savedBegin *pgproto3.Query
33+
}
34+
35+
// Deploy implements QueryStateExecutor.
36+
func (s *QueryStateExecutorImpl) Deploy(server server.Server) error {
37+
if s.txStatus == txstatus.TXIDLE {
38+
return nil
39+
}
40+
41+
for _, sh := range server.Datashards() {
42+
if err := sh.Send(s.savedBegin); err != nil {
43+
s.txStatus = txstatus.TXERR
44+
return err
45+
}
46+
}
47+
48+
return nil
3049
}
3150

3251
func (s *QueryStateExecutorImpl) SetTxStatus(status txstatus.TXStatus) {
@@ -41,12 +60,10 @@ func (s *QueryStateExecutorImpl) TxStatus() txstatus.TXStatus {
4160

4261
func (s *QueryStateExecutorImpl) ExecBegin(rst RelayStateMgr, query string, st *parser.ParseStateTXBegin) error {
4362
// explicitly set silent query message, as it can differ from query begin in xporot
44-
rst.AddSilentQuery(&pgproto3.Query{
45-
String: query,
46-
})
4763

4864
s.SetTxStatus(txstatus.TXACT)
4965
s.cl.StartTx()
66+
s.savedBegin = &pgproto3.Query{String: query}
5067

5168
spqrlog.Zero.Debug().Msg("start new transaction")
5269

@@ -59,7 +76,6 @@ func (s *QueryStateExecutorImpl) ExecBegin(rst RelayStateMgr, query string, st *
5976
}
6077
}
6178
return rst.Client().ReplyCommandComplete("BEGIN")
62-
6379
}
6480

6581
// query in commit query. maybe commit or commit `name`
@@ -73,14 +89,16 @@ func (s *QueryStateExecutorImpl) ExecCommit(rst RelayStateMgr, query string) err
7389
rst.Flush()
7490
return nil
7591
}
76-
rst.AddQuery(&pgproto3.Query{
77-
String: query,
78-
})
79-
err := rst.ProcessMessageBuf(true, true)
80-
if err == nil {
81-
rst.Client().CommitActiveSet()
92+
93+
for _, sh := range rst.Client().Server().Datashards() {
94+
if err := sh.Send(&pgproto3.Query{String: query}); err != nil {
95+
s.txStatus = txstatus.TXERR
96+
return err
97+
}
8298
}
83-
return err
99+
100+
rst.Client().CommitActiveSet()
101+
return nil
84102
}
85103

86104
/* TODO: proper support for rollback to savepoint */
@@ -94,14 +112,16 @@ func (s *QueryStateExecutorImpl) ExecRollback(rst RelayStateMgr, query string) e
94112
rst.Flush()
95113
return nil
96114
}
97-
rst.AddQuery(&pgproto3.Query{
98-
String: query,
99-
})
100-
err := rst.ProcessMessageBuf(true, true)
101-
if err == nil {
102-
s.cl.Rollback()
115+
116+
for _, sh := range rst.Client().Server().Datashards() {
117+
if err := sh.Send(&pgproto3.Query{String: query}); err != nil {
118+
s.txStatus = txstatus.TXERR
119+
return err
120+
}
103121
}
104-
return err
122+
123+
s.cl.Rollback()
124+
return nil
105125
}
106126

107127
func (s *QueryStateExecutorImpl) ExecSet(rst RelayStateMgr, query string, name, value string) error {

router/relay/relay.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ var ErrSkipQuery = fmt.Errorf("wait for a next query")
501501

502502
// TODO : unit tests
503503
func (rst *RelayStateImpl) procRoutes(routes []*kr.ShardKey) error {
504-
// if there is no routes configurted, there is nowhere to route to
504+
// if there is no routes configured, there is nowhere to route to
505505
if len(routes) == 0 {
506506
return qrouter.MatchShardError
507507
}
@@ -531,6 +531,15 @@ func (rst *RelayStateImpl) procRoutes(routes []*kr.ShardKey) error {
531531
Uint("client", rst.Client().ID()).
532532
Msg("client encounter while initialing server connection")
533533

534+
_ = rst.Unroute(rst.activeShards)
535+
_ = rst.Reset()
536+
return err
537+
}
538+
539+
/* if transaction is explicitly requested, deploy */
540+
541+
if err := rst.QueryExecutor().Deploy(rst.Client().Server()); err != nil {
542+
_ = rst.Unroute(rst.activeShards)
534543
_ = rst.Reset()
535544
return err
536545
}
@@ -756,6 +765,10 @@ func (rst *RelayStateImpl) CompleteRelay(replyCl bool) error {
756765
return nil
757766
}
758767

768+
_ = rst.Reset()
769+
770+
_ = rst.Unroute(rst.activeShards)
771+
759772
if replyCl {
760773
if err := rst.Cl.Send(&pgproto3.ReadyForQuery{
761774
TxStatus: byte(txstatus.TXIDLE),
@@ -793,7 +806,10 @@ func (rst *RelayStateImpl) CompleteRelay(replyCl bool) error {
793806
return err
794807
}
795808
}
796-
return nil
809+
810+
_ = rst.Reset()
811+
812+
return rst.Unroute(rst.activeShards)
797813
default:
798814
err := fmt.Errorf("unknown tx status %v", rst.qse.TxStatus())
799815
return err
@@ -1087,6 +1103,7 @@ func (rst *RelayStateImpl) ProcessExtendedBuffer() error {
10871103

10881104
return nil
10891105
}, true /* cache parsing for prep statement */); err != nil {
1106+
_ = rst.CompleteRelay(true)
10901107
return err
10911108
}
10921109

0 commit comments

Comments
 (0)