Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Small prelimitary refactorings #989

Merged
merged 1 commit into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions router/plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func Combine(p1, p2 Plan) Plan {
switch shq1 := p1.(type) {
case ScatterPlan:
return p1
case RandomMatchState:
case RandomDispatchPlan:
return p2
case ReferenceRelationState:
return p2
Expand Down Expand Up @@ -83,7 +83,11 @@ type SkipRoutingState struct {
Plan
}

type RandomMatchState struct {
type RandomDispatchPlan struct {
Plan
}

type VirtualPlan struct {
Plan
}

Expand Down
4 changes: 2 additions & 2 deletions router/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ func PlanDistributedQuery(ctx context.Context, rm *rmeta.RoutingMetadataContext,
/*
* SET x = y etc., do not dispatch any statement to shards, just process this in router
*/
return plan.RandomMatchState{}, nil
return plan.RandomDispatchPlan{}, nil

case *lyx.VariableShowStmt:
/*
if we want to reroute to execute this stmt, route to random shard
XXX: support intelegent show support, without direct query dispatch
*/
return plan.RandomMatchState{}, nil
return plan.RandomDispatchPlan{}, nil

// XXX: need alter table which renames sharding column to non-sharding column check
case *lyx.CreateTable:
Expand Down
22 changes: 12 additions & 10 deletions router/qrouter/proxy_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func (qr *ProxyQrouter) resolveValue(meta *rmeta.RoutingMetadataContext, rfqn rf
func (qr *ProxyQrouter) routeWithRules(ctx context.Context, rm *rmeta.RoutingMetadataContext, stmt lyx.Node) (plan.Plan, bool, error) {
if stmt == nil {
// empty statement
return plan.RandomMatchState{}, false, nil
return plan.RandomDispatchPlan{}, false, nil
}

rh, err := rm.ResolveRouteHint()
Expand Down Expand Up @@ -622,14 +622,14 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, rm *rmeta.RoutingMet
/*
* SET x = y etc., do not dispatch any statement to shards, just process this in router
*/
return plan.RandomMatchState{}, true, nil
return plan.RandomDispatchPlan{}, true, nil

case *lyx.VariableShowStmt:
/*
if we want to reroute to execute this stmt, route to random shard
XXX: support intelegent show support, without direct query dispatch
*/
return plan.RandomMatchState{}, true, nil
return plan.RandomDispatchPlan{}, true, nil

// XXX: need alter table which renames sharding column to non-sharding column check
case *lyx.CreateTable:
Expand Down Expand Up @@ -710,17 +710,19 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, rm *rmeta.RoutingMet
switch e := expr.(type) {
/* Special cases for SELECT current_schema(), SELECT set_config(...), and SELECT pg_is_in_recovery() */
case *lyx.FuncApplication:

/* for queries, that need to access data on shard, ignore these "virtual" func. */
if e.Name == "current_schema" || e.Name == "set_config" || e.Name == "pg_is_in_recovery" || e.Name == "version" {
return plan.RandomMatchState{}, ro, nil
return plan.RandomDispatchPlan{}, ro, nil
}
/* Expression like SELECT 1, SELECT 'a', SELECT 1.0, SELECT true, SELECT false */
case *lyx.AExprIConst, *lyx.AExprSConst, *lyx.AExprNConst, *lyx.AExprBConst:
return plan.RandomMatchState{}, ro, nil
return plan.RandomDispatchPlan{}, ro, nil

/* Special case for SELECT current_schema */
case *lyx.ColumnRef:
if e.ColName == "current_schema" {
return plan.RandomMatchState{}, ro, nil
return plan.RandomDispatchPlan{}, ro, nil
}
}
}
Expand Down Expand Up @@ -761,13 +763,13 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, rm *rmeta.RoutingMet
}

if onlyCatalog && anyCatalog {
return plan.RandomMatchState{}, ro, nil
return plan.RandomDispatchPlan{}, ro, nil
}
if hasInfSchema && hasOtherSchema {
return nil, false, rerrors.ErrInformationSchemaCombinedQuery
}
if hasInfSchema {
return plan.RandomMatchState{}, ro, nil
return plan.RandomDispatchPlan{}, ro, nil
}

if deparseError != nil {
Expand Down Expand Up @@ -905,11 +907,11 @@ func (qr *ProxyQrouter) Route(ctx context.Context, stmt lyx.Node, sph session.Se
switch v := route.(type) {
case plan.ShardMatchState:
return v, nil
case plan.RandomMatchState:
case plan.RandomDispatchPlan:
return v, nil
case plan.ReferenceRelationState:
/* check for unroutable here - TODO */
return plan.RandomMatchState{}, nil
return plan.RandomDispatchPlan{}, nil
case plan.DDLState:
return v, nil
case plan.CopyState:
Expand Down
44 changes: 22 additions & 22 deletions router/qrouter/proxy_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ func TestMultiShardRouting(t *testing.T) {
},
{
query: "select 42;",
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "select current_schema;",
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "select current_schema();",
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
Expand All @@ -105,18 +105,18 @@ func TestMultiShardRouting(t *testing.T) {
},
{
query: "SELECT * FROM pg_catalog.pg_type",
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},

{
query: "SELECT * FROM pg_class",
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: `SELECT count(*) FROM information_schema.tables WHERE table_schema = CURRENT_SCHEMA() AND table_name = 'people' AND table_type = 'BASE TABLE'`,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
},
} {
parserRes, err := lyx.Parse(tt.query)
Expand Down Expand Up @@ -1417,19 +1417,19 @@ func TestSetStmt(t *testing.T) {
{
query: "SET extra_float_digits = 3",
distribution: distribution1,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "SET application_name = 'jiofewjijiojioji';",
distribution: distribution2,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "SHOW TRANSACTION ISOLATION LEVEL;",
distribution: distribution1,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
} {
Expand Down Expand Up @@ -1520,14 +1520,14 @@ func TestRouteWithRules_Select(t *testing.T) {
{
query: "SELECT * FROM information_schema.columns;",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},

{
query: "SELECT * FROM information_schema.sequences;",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
Expand All @@ -1545,61 +1545,61 @@ func TestRouteWithRules_Select(t *testing.T) {
{
query: "SELECT * FROM pg_class JOIN users ON true;",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "SELECT * FROM pg_tables WHERE schemaname = 'information_schema'",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "SELECT current_schema;",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "SELECT current_schema();",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "SELECT pg_is_in_recovery();",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "SELECT set_config('log_statement_stats', 'off', false);",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "SELECT version()",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "SELECT 1;",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "SELECT true;",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
query: "SELECT 'Hello, world!'",
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},
{
Expand Down Expand Up @@ -1628,7 +1628,7 @@ SELECT NULL::pg_catalog.text, n.nspname FROM pg_catalog.pg_namespace n WHERE n.n
LIMIT 1000
`,
distribution: distribution.ID,
exp: plan.RandomMatchState{},
exp: plan.RandomDispatchPlan{},
err: nil,
},

Expand Down
8 changes: 2 additions & 6 deletions router/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func (rst *RelayStateImpl) Reroute() error {
return rst.procRoutes([]*kr.ShardKey{v.Route})
case plan.SkipRoutingState:
return ErrSkipQuery
case plan.RandomMatchState:
case plan.RandomDispatchPlan:
return rst.RerouteToRandomRoute()
default:
return fmt.Errorf("unexpected query plan %T", v)
Expand Down Expand Up @@ -1493,12 +1493,8 @@ func (rst *RelayStateImpl) ProcessExtendedBuffer() error {
rst.AddQuery(&pgproto3.Execute{})

rst.AddQuery(&pgproto3.Sync{})
if err := rst.RelayFlush(true, true); err != nil {
return err
}

// do not complete relay here yet
return nil
return rst.RelayFlush(true, true)
}

return nil
Expand Down
Loading