Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Futher split Qrouter logic into reusable parts #973

Merged
merged 1 commit into from
Jan 27, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions router/plan/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package plan

import (
"bytes"
"encoding/binary"
"strconv"

"github.com/pg-sharding/spqr/qdb"
"github.com/pg-sharding/spqr/router/xproto"
)

func ParseResolveParamValue(paramCode int16, ind int, tp string, bindParams [][]byte) (interface{}, bool) {

switch paramCode {
case xproto.FormatCodeBinary:
switch tp {
case qdb.ColumnTypeVarcharDeprecated:
fallthrough
case qdb.ColumnTypeVarcharHashed:
fallthrough
case qdb.ColumnTypeVarchar:
return []interface{}{string(bindParams[ind])}, true
case qdb.ColumnTypeInteger:

var num int64
var err error

buf := bytes.NewBuffer(bindParams[ind])

if len(bindParams[ind]) == 4 {
var tmpnum int32
err = binary.Read(buf, binary.BigEndian, &tmpnum)
num = int64(tmpnum)
} else {
err = binary.Read(buf, binary.BigEndian, &num)
}
if err != nil {
return nil, false
}

return num, true
case qdb.ColumnTypeUinteger:

var num uint64
var err error

buf := bytes.NewBuffer(bindParams[ind])

if len(bindParams[ind]) == 4 {
var tmpnum uint32
err = binary.Read(buf, binary.BigEndian, &tmpnum)
num = uint64(tmpnum)
} else {
err = binary.Read(buf, binary.BigEndian, &num)
}
if err != nil {
return nil, false
}

return num, true
}
case xproto.FormatCodeText:
switch tp {
case qdb.ColumnTypeVarcharDeprecated:
fallthrough
case qdb.ColumnTypeVarcharHashed:
fallthrough
case qdb.ColumnTypeVarchar:
return []interface{}{string(bindParams[ind])}, true
case qdb.ColumnTypeInteger:
num, err := strconv.ParseInt(string(bindParams[ind]), 10, 64)
if err != nil {
return nil, false
}
return num, true
case qdb.ColumnTypeUinteger:
num, err := strconv.ParseUint(string(bindParams[ind]), 10, 64)
if err != nil {
return nil, false
}
return num, true
}
default:
// ??? protoc violation

}

return nil, false
}
2 changes: 1 addition & 1 deletion router/planner/planner.go
Original file line number Diff line number Diff line change
@@ -126,7 +126,7 @@ func PlanDistributedQuery(ctx context.Context, rm *rmeta.RoutingMetadataContext,
SchemaName: q.SchemaName,
}

if ds, err := rm.GetRelationDistribution(ctx, rm.Mgr, rfqn); err != nil {
if ds, err := rm.GetRelationDistribution(ctx, rfqn); err != nil {
return nil, rerrors.ErrComplexQuery
} else if ds.Id != distributions.REPLICATED {
return nil, rerrors.ErrComplexQuery
113 changes: 9 additions & 104 deletions router/qrouter/proxy_routing.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package qrouter

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"strconv"
"strings"
@@ -39,28 +37,6 @@ func (qr *ProxyQrouter) DeparseExprShardingEntries(expr lyx.Node, meta *rmeta.Ro
}
}

func (qr *ProxyQrouter) GetDistributionKeyOffsetType(meta *rmeta.RoutingMetadataContext, resolvedRelation rfqn.RelationFQN, colname string) (int, string) {
/* do not process non-distributed relations or columns not from relation distribution key */

ds, err := meta.GetRelationDistribution(context.TODO(), qr.Mgr(), resolvedRelation)
if err != nil {
return -1, ""
} else if ds.Id == distributions.REPLICATED {
return -1, ""
}
// TODO: optimize
relation, exists := ds.Relations[resolvedRelation.RelationName]
if !exists {
return -1, ""
}
for ind, c := range relation.DistributionKey {
if c.Column == colname {
return ind, ds.ColTypes[ind]
}
}
return -1, ""
}

func (qr *ProxyQrouter) processConstExpr(alias, colname string, expr lyx.Node, meta *rmeta.RoutingMetadataContext) error {
resolvedRelation, err := meta.ResolveRelationByAlias(alias)
if err != nil {
@@ -72,7 +48,7 @@ func (qr *ProxyQrouter) processConstExpr(alias, colname string, expr lyx.Node, m
}

func (qr *ProxyQrouter) processConstExprOnRFQN(resolvedRelation rfqn.RelationFQN, colname string, exprs []lyx.Node, meta *rmeta.RoutingMetadataContext) error {
off, tp := qr.GetDistributionKeyOffsetType(meta, resolvedRelation, colname)
off, tp := meta.GetDistributionKeyOffsetType(resolvedRelation, colname)
if off == -1 {
// column not from distr key
return nil
@@ -288,7 +264,7 @@ func (qr *ProxyQrouter) deparseFromNode(ctx context.Context, node lyx.FromClause
return nil
}

if _, err := meta.GetRelationDistribution(ctx, qr.Mgr(), rqdn); err != nil {
if _, err := meta.GetRelationDistribution(ctx, rqdn); err != nil {
return err
}

@@ -372,7 +348,7 @@ func (qr *ProxyQrouter) processInsertFromSelectOffsets(ctx context.Context, stmt
var ds *distributions.Distribution
var err error

if ds, err = meta.GetRelationDistribution(ctx, qr.Mgr(), curr_rfqn); err != nil {
if ds, err = meta.GetRelationDistribution(ctx, curr_rfqn); err != nil {
return nil, rfqn.RelationFQN{}, nil, err
}

@@ -522,7 +498,7 @@ func (qr *ProxyQrouter) planQueryV1(

rqdn := rfqn.RelationFQNFromRangeRangeVar(q)

if d, err := meta.GetRelationDistribution(ctx, qr.Mgr(), rqdn); err != nil {
if d, err := meta.GetRelationDistribution(ctx, rqdn); err != nil {
return nil, err
} else if d.Id == distributions.REPLICATED {
if meta.SPH.EnhancedMultiShardProcessing() {
@@ -547,7 +523,7 @@ func (qr *ProxyQrouter) planQueryV1(

rqdn := rfqn.RelationFQNFromRangeRangeVar(q)

if d, err := meta.GetRelationDistribution(ctx, qr.Mgr(), rqdn); err != nil {
if d, err := meta.GetRelationDistribution(ctx, rqdn); err != nil {
return nil, err
} else if d.Id == distributions.REPLICATED {
if meta.SPH.EnhancedMultiShardProcessing() {
@@ -638,7 +614,7 @@ func (qr *ProxyQrouter) resolveValue(meta *rmeta.RoutingMetadataContext, rfqn rf
return nil, false
}

off, tp := qr.GetDistributionKeyOffsetType(meta, rfqn, col)
off, tp := meta.GetDistributionKeyOffsetType(rfqn, col)
if off == -1 {
// column not from distr key
return nil, false
@@ -649,80 +625,9 @@ func (qr *ProxyQrouter) resolveValue(meta *rmeta.RoutingMetadataContext, rfqn rf
ind := inds[0]
fc := paramResCodes[ind]

switch fc {
case xproto.FormatCodeBinary:
switch tp {
case qdb.ColumnTypeVarcharDeprecated:
fallthrough
case qdb.ColumnTypeVarcharHashed:
fallthrough
case qdb.ColumnTypeVarchar:
return []interface{}{string(bindParams[ind])}, true
case qdb.ColumnTypeInteger:

var num int64
var err error

buf := bytes.NewBuffer(bindParams[ind])

if len(bindParams[ind]) == 4 {
var tmpnum int32
err = binary.Read(buf, binary.BigEndian, &tmpnum)
num = int64(tmpnum)
} else {
err = binary.Read(buf, binary.BigEndian, &num)
}
if err != nil {
return nil, false
}

return []interface{}{num}, true
case qdb.ColumnTypeUinteger:

var num uint64
var err error

buf := bytes.NewBuffer(bindParams[ind])

if len(bindParams[ind]) == 4 {
var tmpnum uint32
err = binary.Read(buf, binary.BigEndian, &tmpnum)
num = uint64(tmpnum)
} else {
err = binary.Read(buf, binary.BigEndian, &num)
}
if err != nil {
return nil, false
}

return []interface{}{num}, true
}
case xproto.FormatCodeText:
switch tp {
case qdb.ColumnTypeVarcharDeprecated:
fallthrough
case qdb.ColumnTypeVarcharHashed:
fallthrough
case qdb.ColumnTypeVarchar:
return []interface{}{string(bindParams[ind])}, true
case qdb.ColumnTypeInteger:
num, err := strconv.ParseInt(string(bindParams[ind]), 10, 64)
if err != nil {
return nil, false
}
return []interface{}{num}, true
case qdb.ColumnTypeUinteger:
num, err := strconv.ParseUint(string(bindParams[ind]), 10, 64)
if err != nil {
return nil, false
}
return []interface{}{num}, true
}
default:
// ??? protoc violation
}
singleVal, res := plan.ParseResolveParamValue(fc, ind, tp, bindParams)

return nil, false
return []interface{}{singleVal}, res
}

// Returns state, is read-only flag and err if any
@@ -976,7 +881,7 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, rm *rmeta.RoutingMet

for rfqn := range rm.Rels {
// TODO: check by whole RFQN
ds, err := rm.GetRelationDistribution(ctx, qr.Mgr(), rfqn)
ds, err := rm.GetRelationDistribution(ctx, rfqn)
if err != nil {
return nil, false, err
} else if ds.Id == distributions.REPLICATED {
26 changes: 24 additions & 2 deletions router/rmeta/rmeta.go
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ var CatalogDistribution = distributions.Distribution{
ColTypes: nil,
}

func (rm *RoutingMetadataContext) GetRelationDistribution(ctx context.Context, mgr meta.EntityMgr, resolvedRelation rfqn.RelationFQN) (*distributions.Distribution, error) {
func (rm *RoutingMetadataContext) GetRelationDistribution(ctx context.Context, resolvedRelation rfqn.RelationFQN) (*distributions.Distribution, error) {
if res, ok := rm.Distributions[resolvedRelation]; ok {
return res, nil
}
@@ -77,7 +77,7 @@ func (rm *RoutingMetadataContext) GetRelationDistribution(ctx context.Context, m
return &CatalogDistribution, nil
}

ds, err := mgr.GetRelationDistribution(ctx, resolvedRelation.RelationName)
ds, err := rm.Mgr.GetRelationDistribution(ctx, resolvedRelation.RelationName)

if err != nil {
return nil, err
@@ -222,3 +222,25 @@ func (rm *RoutingMetadataContext) ResolveRouteHint() (routehint.RouteHint, error

return &routehint.EmptyRouteHint{}, nil
}

func (rm *RoutingMetadataContext) GetDistributionKeyOffsetType(resolvedRelation rfqn.RelationFQN, colname string) (int, string) {
/* do not process non-distributed relations or columns not from relation distribution key */

ds, err := rm.GetRelationDistribution(context.TODO(), resolvedRelation)
if err != nil {
return -1, ""
} else if ds.Id == distributions.REPLICATED {
return -1, ""
}
// TODO: optimize
relation, exists := ds.Relations[resolvedRelation.RelationName]
if !exists {
return -1, ""
}
for ind, c := range relation.DistributionKey {
if c.Column == colname {
return ind, ds.ColTypes[ind]
}
}
return -1, ""
}
Loading