diff --git a/pkg/pool/dbpool.go b/pkg/pool/dbpool.go index 6fb46af3e..a67fde779 100644 --- a/pkg/pool/dbpool.go +++ b/pkg/pool/dbpool.go @@ -12,6 +12,7 @@ import ( "github.com/pg-sharding/spqr/pkg/shard" "github.com/pg-sharding/spqr/pkg/spqrlog" "github.com/pg-sharding/spqr/pkg/tsa" + "github.com/pg-sharding/spqr/pkg/txstatus" ) type InstancePoolImpl struct { @@ -178,6 +179,13 @@ func (s *InstancePoolImpl) Put(sh shard.Shard) error { Msg("discarding unsync connection") return s.pool.Discard(sh) } + if sh.TxStatus() != txstatus.TXIDLE { + spqrlog.Zero.Error(). + Uint("shard", spqrlog.GetPointer(sh)). + Str("txstatus", sh.TxStatus().String()). + Msg("discarding non-idle connection") + return s.pool.Discard(sh) + } return s.pool.Put(sh) } diff --git a/router/qrouter/proxy_routing.go b/router/qrouter/proxy_routing.go index 56f4055e7..5b695722a 100644 --- a/router/qrouter/proxy_routing.go +++ b/router/qrouter/proxy_routing.go @@ -213,6 +213,25 @@ func (qr *ProxyQrouter) RouteKeyWithRanges(ctx context.Context, expr lyx.Node, m } } +func (meta *RoutingMetadataContext) RecordShardingColumnValue(alias, colname, value string) { + if !meta.CheckColumnRls(colname) { + spqrlog.Zero.Debug(). + Str("colname", colname). + Msg("skip column due no rule mathing") + return + } + + resolvedRelation, err := meta.ResolveRelationByAlias(alias) + if err != nil { + // failed to relove relation, skip column + meta.unparsed_columns[colname] = struct{}{} + return + } + + // will not work not ints + meta.RecordConstExpr(resolvedRelation, colname, value) +} + // TODO : unit tests // deparse sharding column-value pair from query Where clause func (qr *ProxyQrouter) routeByClause(ctx context.Context, expr lyx.Node, meta *RoutingMetadataContext) error { @@ -232,48 +251,31 @@ func (qr *ProxyQrouter) routeByClause(ctx context.Context, expr lyx.Node, meta * alias, colname := lft.TableAlias, lft.ColName - if !meta.CheckColumnRls(colname) { - spqrlog.Zero.Debug(). - Str("colname", colname). - Msg("skip column due no rule mathing") - continue - } - - resolvedRelation, err := meta.ResolveRelationByAlias(alias) - if err != nil { - // failed to relove relation, skip column - meta.unparsed_columns[colname] = struct{}{} - continue - } - /* simple key-value pair */ switch rght := texpr.Right.(type) { case *lyx.ParamRef: - if rght.Number > len(meta.params) { - return ComplexQuery + if rght.Number <= len(meta.params) { + meta.RecordShardingColumnValue(alias, colname, string(meta.params[rght.Number-1])) } - - // will not work not ints - meta.RecordConstExpr(resolvedRelation, colname, string(meta.params[rght.Number-1])) - + // else error out? case *lyx.AExprSConst: // TBD: postpone routing from here to root of parsing tree - meta.RecordConstExpr(resolvedRelation, colname, rght.Value) + meta.RecordShardingColumnValue(alias, colname, rght.Value) case *lyx.AExprIConst: // TBD: postpone routing from here to root of parsing tree // maybe expimely inefficient. Will be fixed in SPQR-2.0 - meta.RecordConstExpr(resolvedRelation, colname, fmt.Sprintf("%d", rght.Value)) + meta.RecordShardingColumnValue(alias, colname, fmt.Sprintf("%d", rght.Value)) case *lyx.AExprList: if len(rght.List) != 0 { expr := rght.List[0] switch bexpr := expr.(type) { case *lyx.AExprSConst: // TBD: postpone routing from here to root of parsing tree - meta.RecordConstExpr(resolvedRelation, colname, bexpr.Value) + meta.RecordShardingColumnValue(alias, colname, bexpr.Value) case *lyx.AExprIConst: // TBD: postpone routing from here to root of parsing tree // maybe expimely inefficient. Will be fixed in SPQR-2.0 - meta.RecordConstExpr(resolvedRelation, colname, fmt.Sprintf("%d", bexpr.Value)) + meta.RecordShardingColumnValue(alias, colname, fmt.Sprintf("%d", bexpr.Value)) } } case *lyx.FuncApplication: @@ -342,8 +344,8 @@ func (qr *ProxyQrouter) DeparseSelectStmt(ctx context.Context, selectStmt lyx.No /* SELECT * FROM VALUES() ... */ case *lyx.ValueClause: - // TODO: process this meta.ValuesLists = s.Values + return nil } return ComplexQuery diff --git a/router/qrouter/proxy_routing_test.go b/router/qrouter/proxy_routing_test.go index 1f955a3ca..066a0a6a6 100644 --- a/router/qrouter/proxy_routing_test.go +++ b/router/qrouter/proxy_routing_test.go @@ -300,6 +300,34 @@ func TestSingleShard(t *testing.T) { assert.NoError(err) for _, tt := range []tcase{ + + { + query: ` + DELETE + FROM t + WHERE + j = + any(array(select * from t where i <= 2)) + /* __spqr__default_route_behaviour: BLOCK */ returning *; + `, + err: nil, + exp: routingstate.ShardMatchState{ + Route: &routingstate.DataShardRoute{ + Shkey: kr.ShardKey{ + Name: "sh1", + }, + Matchedkr: &kr.KeyRange{ + ShardID: "sh1", + ID: "id1", + Dataspace: dataspace, + LowerBound: []byte("1"), + UpperBound: []byte("11"), + }, + }, + TargetSessionAttrs: "any", + }, + }, + { query: ` DELETE