Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

relay.go: use config.RouterConfig() for easy config reload #917

Merged
merged 5 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions router/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func ProcessMessage(qr qrouter.QueryRouter, cmngr poolmgr.PoolMgr, rst relay.Rel
q = &cpQ
if err := relay.ProcQueryAdvanced(rst, q.Query, ph, func() error {
rst.AddQuery(q)
_, err := rst.ProcessMessageBuf(true, true, false, rst.ConnMgr())
_, err := rst.ProcessMessageBuf(true, true, false, cmngr)
return err
}, true); err != nil {
return err
Expand Down Expand Up @@ -71,7 +71,7 @@ func ProcessMessage(qr qrouter.QueryRouter, cmngr poolmgr.PoolMgr, rst relay.Rel
if err := relay.ProcQueryAdvanced(rst, q.String, ph, func() error {
rst.AddQuery(q)

_, err := rst.ProcessMessageBuf(true, true, false, rst.ConnMgr())
_, err := rst.ProcessMessageBuf(true, true, false, cmngr)
return err
}, false); err != nil {
return err
Expand Down Expand Up @@ -138,7 +138,7 @@ func ProcessMessage(qr qrouter.QueryRouter, cmngr poolmgr.PoolMgr, rst relay.Rel
if err := relay.ProcQueryAdvanced(rst, q.String, ph, func() error {
rst.AddQuery(q)
// this call compeletes relay, sends RFQ
_, err := rst.ProcessMessageBuf(true, true, false, rst.ConnMgr())
_, err := rst.ProcessMessageBuf(true, true, false, cmngr)
return err
}, false); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion router/relay/qstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

func AdvancedPoolModeNeeded(rst RelayStateMgr) bool {
return rst.Client().Rule().PoolMode == config.PoolModeTransaction && rst.Client().Rule().PoolPreparedStatement || rst.RouterMode() == config.ProxyMode
return rst.Client().Rule().PoolMode == config.PoolModeTransaction && rst.Client().Rule().PoolPreparedStatement || config.RouterMode(config.RouterConfig().RouterMode) == config.ProxyMode
}

func deparseRouteHint(rst RelayStateMgr, params map[string]string) (routehint.RouteHint, error) {
Expand Down
60 changes: 15 additions & 45 deletions router/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,14 @@ type RelayStateMgr interface {
route.RouteMgr
QueryRouter() qrouter.QueryRouter
Reset() error
StartTrace()
Flush()

ConnMgr() poolmgr.PoolMgr

ShouldRetry(err error) bool
Parse(query string, doCaching bool) (parser.ParseState, string, error)

AddQuery(q pgproto3.FrontendMessage)
AddSilentQuery(q pgproto3.FrontendMessage)
TxActive() bool

PgprotoDebug() bool

RelayStep(msg pgproto3.FrontendMessage, waitForResp bool, replyCl bool) (txstatus.TXStatus, []pgproto3.BackendMessage, error)

CompleteRelay(replyCl bool) error
Expand Down Expand Up @@ -78,8 +72,6 @@ type RelayStateMgr interface {

ProcCopyComplete(query *pgproto3.FrontendMessage) error

RouterMode() config.RouterMode

AddExtendedProtocMessage(q pgproto3.FrontendMessage)
ProcessExtendedBuffer(cmngr poolmgr.PoolMgr) error
}
Expand Down Expand Up @@ -128,14 +120,8 @@ type RelayStateImpl struct {
txStatus txstatus.TXStatus
CopyActive bool

activeShards []kr.ShardKey
TargetKeyRange kr.KeyRange

traceMsgs bool
WorldShardFallback bool
routerMode config.RouterMode

pgprotoDebug bool
traceMsgs bool
activeShards []kr.ShardKey

routingState routingstate.RoutingState

Expand Down Expand Up @@ -177,20 +163,17 @@ func (rst *RelayStateImpl) UnholdRouting() {

func NewRelayState(qr qrouter.QueryRouter, client client.RouterClient, manager poolmgr.PoolMgr, rcfg *config.Router) RelayStateMgr {
return &RelayStateImpl{
activeShards: nil,
txStatus: txstatus.TXIDLE,
msgBuf: nil,
traceMsgs: false,
Qr: qr,
Cl: client,
poolMgr: manager,
WorldShardFallback: rcfg.WorldShardFallback,
routerMode: config.RouterMode(rcfg.RouterMode),
maintain_params: rcfg.MaintainParams,
pgprotoDebug: rcfg.PgprotoDebug,
execute: nil,
savedPortalDesc: map[string]PortalDesc{},
parseCache: map[string]ParseCacheEntry{},
activeShards: nil,
txStatus: txstatus.TXIDLE,
msgBuf: nil,
traceMsgs: false,
Qr: qr,
Cl: client,
poolMgr: manager,
maintain_params: rcfg.MaintainParams,
execute: nil,
savedPortalDesc: map[string]PortalDesc{},
parseCache: map[string]ParseCacheEntry{},
}
}

Expand All @@ -217,10 +200,6 @@ func (rst *RelayStateImpl) SetTxStatus(status txstatus.TXStatus) {
rst.txStatus = status
}

func (rst *RelayStateImpl) PgprotoDebug() bool {
return rst.pgprotoDebug
}

func (rst *RelayStateImpl) Client() client.RouterClient {
return rst.Cl
}
Expand Down Expand Up @@ -311,10 +290,6 @@ func (rst *RelayStateImpl) PrepareStatement(hash uint64, d *prepstatement.Prepar
return rd, retMsg, nil
}

func (rst *RelayStateImpl) RouterMode() config.RouterMode {
return rst.routerMode
}

func (rst *RelayStateImpl) Close() error {
defer rst.Cl.Close()
defer rst.ActiveShardsReset()
Expand Down Expand Up @@ -343,7 +318,6 @@ func (rst *RelayStateImpl) Reset() error {
return rst.Cl.Unroute()
}

// TODO : unit tests
func (rst *RelayStateImpl) StartTrace() {
rst.traceMsgs = true
}
Expand Down Expand Up @@ -376,7 +350,7 @@ func (rst *RelayStateImpl) procRoutes(routes []*routingstate.DataShardRoute) err
rst.activeShards = append(rst.activeShards, shr.Shkey)
}

if rst.PgprotoDebug() {
if config.RouterConfig().PgprotoDebug {
if err := rst.Cl.ReplyDebugNoticef("matched datashard routes %+v", routes); err != nil {
return err
}
Expand Down Expand Up @@ -1243,7 +1217,7 @@ func (rst *RelayStateImpl) ProcessExtendedBuffer(cmngr poolmgr.PoolMgr) error {
Uint("client", rst.Client().ID()).
Msg("Parsing prepared statement")

if rst.PgprotoDebug() {
if config.RouterConfig().PgprotoDebug {
if err := rst.Client().ReplyDebugNoticef("name %v, query %v, hash %d", q.Name, q.Query, hash); err != nil {
return err
}
Expand Down Expand Up @@ -1746,7 +1720,3 @@ func (rst *RelayStateImpl) ProcessMessage(

return rst.CompleteRelay(replyCl)
}

func (rst *RelayStateImpl) ConnMgr() poolmgr.PoolMgr {
return rst.poolMgr
}
Loading