Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

relay.go: use config.RouterConfig() for easy config reload #917

Merged
merged 5 commits into from
Jan 10, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions router/frontend/frontend.go
Original file line number Diff line number Diff line change
@@ -42,7 +42,7 @@ func ProcessMessage(qr qrouter.QueryRouter, cmngr poolmgr.PoolMgr, rst relay.Rel
q = &cpQ
if err := relay.ProcQueryAdvanced(rst, q.Query, ph, func() error {
rst.AddQuery(q)
_, err := rst.ProcessMessageBuf(true, true, false, rst.ConnMgr())
_, err := rst.ProcessMessageBuf(true, true, false, cmngr)
return err
}, true); err != nil {
return err
@@ -71,7 +71,7 @@ func ProcessMessage(qr qrouter.QueryRouter, cmngr poolmgr.PoolMgr, rst relay.Rel
if err := relay.ProcQueryAdvanced(rst, q.String, ph, func() error {
rst.AddQuery(q)

_, err := rst.ProcessMessageBuf(true, true, false, rst.ConnMgr())
_, err := rst.ProcessMessageBuf(true, true, false, cmngr)
return err
}, false); err != nil {
return err
@@ -138,7 +138,7 @@ func ProcessMessage(qr qrouter.QueryRouter, cmngr poolmgr.PoolMgr, rst relay.Rel
if err := relay.ProcQueryAdvanced(rst, q.String, ph, func() error {
rst.AddQuery(q)
// this call compeletes relay, sends RFQ
_, err := rst.ProcessMessageBuf(true, true, false, rst.ConnMgr())
_, err := rst.ProcessMessageBuf(true, true, false, cmngr)
return err
}, false); err != nil {
return err
2 changes: 1 addition & 1 deletion router/relay/qstate.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ import (
)

func AdvancedPoolModeNeeded(rst RelayStateMgr) bool {
return rst.Client().Rule().PoolMode == config.PoolModeTransaction && rst.Client().Rule().PoolPreparedStatement || rst.RouterMode() == config.ProxyMode
return rst.Client().Rule().PoolMode == config.PoolModeTransaction && rst.Client().Rule().PoolPreparedStatement || config.RouterMode(config.RouterConfig().RouterMode) == config.ProxyMode
}

func deparseRouteHint(rst RelayStateMgr, params map[string]string) (routehint.RouteHint, error) {
60 changes: 15 additions & 45 deletions router/relay/relay.go
Original file line number Diff line number Diff line change
@@ -37,20 +37,14 @@ type RelayStateMgr interface {
route.RouteMgr
QueryRouter() qrouter.QueryRouter
Reset() error
StartTrace()
Flush()

ConnMgr() poolmgr.PoolMgr

ShouldRetry(err error) bool
Parse(query string, doCaching bool) (parser.ParseState, string, error)

AddQuery(q pgproto3.FrontendMessage)
AddSilentQuery(q pgproto3.FrontendMessage)
TxActive() bool

PgprotoDebug() bool

RelayStep(msg pgproto3.FrontendMessage, waitForResp bool, replyCl bool) (txstatus.TXStatus, []pgproto3.BackendMessage, error)

CompleteRelay(replyCl bool) error
@@ -78,8 +72,6 @@ type RelayStateMgr interface {

ProcCopyComplete(query *pgproto3.FrontendMessage) error

RouterMode() config.RouterMode

AddExtendedProtocMessage(q pgproto3.FrontendMessage)
ProcessExtendedBuffer(cmngr poolmgr.PoolMgr) error
}
@@ -128,14 +120,8 @@ type RelayStateImpl struct {
txStatus txstatus.TXStatus
CopyActive bool

activeShards []kr.ShardKey
TargetKeyRange kr.KeyRange

traceMsgs bool
WorldShardFallback bool
routerMode config.RouterMode

pgprotoDebug bool
traceMsgs bool
activeShards []kr.ShardKey

routingState routingstate.RoutingState

@@ -177,20 +163,17 @@ func (rst *RelayStateImpl) UnholdRouting() {

func NewRelayState(qr qrouter.QueryRouter, client client.RouterClient, manager poolmgr.PoolMgr, rcfg *config.Router) RelayStateMgr {
return &RelayStateImpl{
activeShards: nil,
txStatus: txstatus.TXIDLE,
msgBuf: nil,
traceMsgs: false,
Qr: qr,
Cl: client,
poolMgr: manager,
WorldShardFallback: rcfg.WorldShardFallback,
routerMode: config.RouterMode(rcfg.RouterMode),
maintain_params: rcfg.MaintainParams,
pgprotoDebug: rcfg.PgprotoDebug,
execute: nil,
savedPortalDesc: map[string]PortalDesc{},
parseCache: map[string]ParseCacheEntry{},
activeShards: nil,
txStatus: txstatus.TXIDLE,
msgBuf: nil,
traceMsgs: false,
Qr: qr,
Cl: client,
poolMgr: manager,
maintain_params: rcfg.MaintainParams,
execute: nil,
savedPortalDesc: map[string]PortalDesc{},
parseCache: map[string]ParseCacheEntry{},
}
}

@@ -217,10 +200,6 @@ func (rst *RelayStateImpl) SetTxStatus(status txstatus.TXStatus) {
rst.txStatus = status
}

func (rst *RelayStateImpl) PgprotoDebug() bool {
return rst.pgprotoDebug
}

func (rst *RelayStateImpl) Client() client.RouterClient {
return rst.Cl
}
@@ -311,10 +290,6 @@ func (rst *RelayStateImpl) PrepareStatement(hash uint64, d *prepstatement.Prepar
return rd, retMsg, nil
}

func (rst *RelayStateImpl) RouterMode() config.RouterMode {
return rst.routerMode
}

func (rst *RelayStateImpl) Close() error {
defer rst.Cl.Close()
defer rst.ActiveShardsReset()
@@ -343,7 +318,6 @@ func (rst *RelayStateImpl) Reset() error {
return rst.Cl.Unroute()
}

// TODO : unit tests
func (rst *RelayStateImpl) StartTrace() {
rst.traceMsgs = true
}
@@ -376,7 +350,7 @@ func (rst *RelayStateImpl) procRoutes(routes []*routingstate.DataShardRoute) err
rst.activeShards = append(rst.activeShards, shr.Shkey)
}

if rst.PgprotoDebug() {
if config.RouterConfig().PgprotoDebug {
if err := rst.Cl.ReplyDebugNoticef("matched datashard routes %+v", routes); err != nil {
return err
}
@@ -1243,7 +1217,7 @@ func (rst *RelayStateImpl) ProcessExtendedBuffer(cmngr poolmgr.PoolMgr) error {
Uint("client", rst.Client().ID()).
Msg("Parsing prepared statement")

if rst.PgprotoDebug() {
if config.RouterConfig().PgprotoDebug {
if err := rst.Client().ReplyDebugNoticef("name %v, query %v, hash %d", q.Name, q.Query, hash); err != nil {
return err
}
@@ -1746,7 +1720,3 @@ func (rst *RelayStateImpl) ProcessMessage(

return rst.CompleteRelay(replyCl)
}

func (rst *RelayStateImpl) ConnMgr() poolmgr.PoolMgr {
return rst.poolMgr
}
Loading