@@ -3,6 +3,7 @@ package provider
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ "github.com/pg-sharding/spqr/pkg/models/spqrerror"
6
7
"net"
7
8
"time"
8
9
@@ -97,19 +98,19 @@ func (ci grpcConnectionIterator) ClientPoolForeach(cb func(client client.ClientI
97
98
// TODO : implement
98
99
// TODO : unit tests
99
100
func (ci grpcConnectionIterator ) Put (client client.Client ) error {
100
- return fmt . Errorf ( "grpcConnectionIterator put not implemented" )
101
+ return spqrerror . New ( spqrerror . SPQR_NOT_IMPLEMENTED , "grpcConnectionIterator put not implemented" )
101
102
}
102
103
103
104
// TODO : implement
104
105
// TODO : unit tests
105
106
func (ci grpcConnectionIterator ) Pop (id uint ) (bool , error ) {
106
- return true , fmt . Errorf ( "grpcConnectionIterator pop not implemented" )
107
+ return true , spqrerror . New ( spqrerror . SPQR_NOT_IMPLEMENTED , "grpcConnectionIterator pop not implemented" )
107
108
}
108
109
109
110
// TODO : implement
110
111
// TODO : unit tests
111
112
func (ci grpcConnectionIterator ) Shutdown () error {
112
- return fmt . Errorf ( "grpcConnectionIterator shutdown not implemented" )
113
+ return spqrerror . New ( spqrerror . SPQR_NOT_IMPLEMENTED , "grpcConnectionIterator shutdown not implemented" )
113
114
}
114
115
115
116
// TODO : unit tests
@@ -374,7 +375,7 @@ func (qc *qdbCoordinator) traverseRouters(ctx context.Context, cb func(cc *grpc.
374
375
for _ , rtr := range rtrs {
375
376
if err := func () error {
376
377
if rtr .State != qdb .OPENED {
377
- return fmt . Errorf ( "router is closed" )
378
+ return spqrerror . New ( spqrerror . SPQR_ROUTER_ERROR , "router is closed" )
378
379
}
379
380
380
381
// TODO: run cb`s async
@@ -665,7 +666,7 @@ func (qc *qdbCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) erro
665
666
Msg ("split request is" )
666
667
667
668
if _ , err := qc .db .GetKeyRange (ctx , req .Krid ); err == nil {
668
- return fmt . Errorf ( "key range %v already present in qdb" , req .Krid )
669
+ return spqrerror . Newf ( spqrerror . SPQR_KEYRANGE_ERROR , "key range %v already present in qdb" , req .Krid )
669
670
}
670
671
671
672
krOld , err := qc .db .LockKeyRange (ctx , req .SourceID )
@@ -680,10 +681,10 @@ func (qc *qdbCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) erro
680
681
}()
681
682
682
683
if kr .CmpRangesEqual (req .Bound , krOld .LowerBound ) || kr .CmpRangesEqual (req .Bound , krOld .UpperBound ) {
683
- return fmt . Errorf ( "failed to split because bound equals lower or upper bound of the key range" )
684
+ return spqrerror . New ( spqrerror . SPQR_KEYRANGE_ERROR , "failed to split because bound equals lower or upper bound of the key range" )
684
685
}
685
686
if kr .CmpRangesLess (req .Bound , krOld .LowerBound ) || ! kr .CmpRangesLess (req .Bound , krOld .UpperBound ) {
686
- return fmt . Errorf ( "failed to split because bound is out of key range" )
687
+ return spqrerror . New ( spqrerror . SPQR_KEYRANGE_ERROR , "failed to split because bound is out of key range" )
687
688
}
688
689
689
690
krNew := kr .KeyRangeFromDB (
@@ -709,7 +710,7 @@ func (qc *qdbCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) erro
709
710
}
710
711
711
712
if err := ops .AddKeyRangeWithChecks (ctx , qc .db , krNew ); err != nil {
712
- return fmt . Errorf ( "failed to add a new key range: %w " , err )
713
+ return spqrerror . Newf ( spqrerror . SPQR_KEYRANGE_ERROR , "failed to add a new key range: %s " , err . Error () )
713
714
}
714
715
715
716
if err := qc .traverseRouters (ctx , func (cc * grpc.ClientConn ) error {
@@ -819,23 +820,23 @@ func (qc *qdbCoordinator) Unite(ctx context.Context, uniteKeyRange *kr.UniteKeyR
819
820
}()
820
821
821
822
if krLeft .ShardID != krRight .ShardID {
822
- return fmt . Errorf ( "failed to unite key ranges routing different shards" )
823
+ return spqrerror . New ( spqrerror . SPQR_KEYRANGE_ERROR , "failed to unite key ranges routing different shards" )
823
824
}
824
825
if ! kr .CmpRangesEqual (krLeft .UpperBound , krRight .LowerBound ) {
825
826
if ! kr .CmpRangesEqual (krLeft .LowerBound , krRight .UpperBound ) {
826
- return fmt . Errorf ( "failed to unite non-adjacent key ranges" )
827
+ return spqrerror . New ( spqrerror . SPQR_KEYRANGE_ERROR , "failed to unite non-adjacent key ranges" )
827
828
}
828
829
krLeft , krRight = krRight , krLeft
829
830
}
830
831
831
832
krLeft .UpperBound = krRight .UpperBound
832
833
833
834
if err := qc .db .DropKeyRange (ctx , krRight .KeyRangeID ); err != nil {
834
- return fmt . Errorf ( "failed to drop an old key range: %w " , err )
835
+ return spqrerror . Newf ( spqrerror . SPQR_KEYRANGE_ERROR , "failed to drop an old key range: %s " , err . Error () )
835
836
}
836
837
837
838
if err := ops .ModifyKeyRangeWithChecks (ctx , qc .db , kr .KeyRangeFromDB (krLeft )); err != nil {
838
- return fmt . Errorf ( "failed to update a new key range: %w " , err )
839
+ return spqrerror . Newf ( spqrerror . SPQR_KEYRANGE_ERROR , "failed to update a new key range: %s " , err . Error () )
839
840
}
840
841
841
842
if err := qc .traverseRouters (ctx , func (cc * grpc.ClientConn ) error {
@@ -1054,13 +1055,13 @@ func (qc *qdbCoordinator) RegisterRouter(ctx context.Context, r *topology.Router
1054
1055
// ping router
1055
1056
conn , err := DialRouter (r )
1056
1057
if err != nil {
1057
- return fmt . Errorf ( "failed to ping router: %s" , err )
1058
+ return spqrerror . Newf ( spqrerror . SPQR_CONNECTION_ERROR , "failed to ping router: %s" , err )
1058
1059
}
1059
1060
defer conn .Close ()
1060
1061
cl := routerproto .NewTopologyServiceClient (conn )
1061
1062
_ , err = cl .GetRouterStatus (ctx , & routerproto.GetRouterStatusRequest {})
1062
1063
if err != nil {
1063
- return fmt . Errorf ( "failed to ping router: %s" , err )
1064
+ return spqrerror . Newf ( spqrerror . SPQR_CONNECTION_ERROR , "failed to ping router: %s" , err )
1064
1065
}
1065
1066
1066
1067
return qc .db .AddRouter (ctx , qdb .NewRouter (r .Address , r .ID , qdb .OPENED ))
@@ -1157,7 +1158,7 @@ func (qc *qdbCoordinator) ProcClient(ctx context.Context, nconn net.Conn) error
1157
1158
spqrlog .Zero .Debug ().Msg ("processed OK" )
1158
1159
}
1159
1160
default :
1160
- return cli . ReportError ( fmt . Errorf ( "unsupported msg type %T" , msg ) )
1161
+ return spqrerror . Newf ( spqrerror . SPQR_COMPLEX_QUERY , "unsupported msg type %T" , msg )
1161
1162
}
1162
1163
}
1163
1164
}
0 commit comments