Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Prepared statements with multishard DDL #958

Merged
merged 21 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pkg/datashard/datashard.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pg-sharding/spqr/pkg/config"
"github.com/pg-sharding/spqr/pkg/conn"
"github.com/pg-sharding/spqr/pkg/models/kr"
"github.com/pg-sharding/spqr/pkg/models/spqrerror"
"github.com/pg-sharding/spqr/pkg/prepstatement"
"github.com/pg-sharding/spqr/pkg/shard"
"github.com/pg-sharding/spqr/pkg/spqrlog"
Expand Down Expand Up @@ -556,7 +557,10 @@ func (sh *Conn) TxStatus() txstatus.TXStatus {
// Returns:
// - bool: true if the prepared statement exists, false otherwise.
// - *shard.PreparedStatementDescriptor: the prepared statement descriptor, or nil if it does not exist.
func (srv *Conn) HasPrepareStatement(hash uint64) (bool, *prepstatement.PreparedStatementDescriptor) {
func (srv *Conn) HasPrepareStatement(hash uint64, shardId uint) (bool, *prepstatement.PreparedStatementDescriptor) {
if shardId != srv.ID() {
return false, nil
}
rd, ok := srv.stmtDesc[hash]
return ok, rd
}
Expand All @@ -571,9 +575,14 @@ func (srv *Conn) HasPrepareStatement(hash uint64) (bool, *prepstatement.Prepared
//
// Returns:
// - None.
func (srv *Conn) StorePrepareStatement(hash uint64, def *prepstatement.PreparedStatementDefinition, rd *prepstatement.PreparedStatementDescriptor) {
func (srv *Conn) StorePrepareStatement(hash uint64, shardId uint, def *prepstatement.PreparedStatementDefinition, rd *prepstatement.PreparedStatementDescriptor) error {
id := srv.ID()
if shardId != id {
return spqrerror.Newf(spqrerror.SPQR_ROUTING_ERROR, "Cannot store stmt for shard \"%d\" in shard \"%d\"", shardId, id)
}
srv.stmtDef[hash] = def
srv.stmtDesc[hash] = rd
return nil
}

// AuthRule returns the backend auth configuration of the Conn object.
Expand Down
18 changes: 10 additions & 8 deletions pkg/mock/shard/mock_shard.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/prepstatement/prepstatement.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type PreparedStatementDescriptor struct {
}

type PreparedStatementHolder interface {
HasPrepareStatement(hash uint64) (bool, *PreparedStatementDescriptor)
StorePrepareStatement(hash uint64, d *PreparedStatementDefinition, rd *PreparedStatementDescriptor)
HasPrepareStatement(hash uint64, shardId uint) (bool, *PreparedStatementDescriptor)
StorePrepareStatement(hash uint64, shardId uint, d *PreparedStatementDefinition, rd *PreparedStatementDescriptor) error
}

type PreparedStatementMapper interface {
Expand Down
10 changes: 3 additions & 7 deletions router/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,7 @@ func TestFrontendXProto(t *testing.T) {

/* query Router */

qr.EXPECT().DataShardsRoutes().AnyTimes().Return([]*kr.ShardKey{
&kr.ShardKey{
Name: "sh1",
},
})
qr.EXPECT().DataShardsRoutes().AnyTimes().Return([]*kr.ShardKey{{Name: "sh1"}})

cl.EXPECT().Server().AnyTimes().Return(srv)
cl.EXPECT().MaintainParams().AnyTimes().Return(false)
Expand Down Expand Up @@ -271,8 +267,8 @@ func TestFrontendXProto(t *testing.T) {
res := false
rd := &prepstatement.PreparedStatementDescriptor{}

srv.EXPECT().HasPrepareStatement(gomock.Any()).DoAndReturn(func(interface{}) (interface{}, interface{}) { return res, rd }).AnyTimes()
srv.EXPECT().StorePrepareStatement(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(interface{}, interface{}, interface{}) {
srv.EXPECT().HasPrepareStatement(gomock.Any(), gomock.Any()).DoAndReturn(func(interface{}, interface{}) (interface{}, interface{}) { return res, rd }).AnyTimes()
srv.EXPECT().StorePrepareStatement(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(func(interface{}, interface{}, interface{}, interface{}) {
res = true
rd.ParamDesc = &pgproto3.ParameterDescription{}
rd.RowDesc = &pgproto3.RowDescription{}
Expand Down
47 changes: 39 additions & 8 deletions router/mock/server/mock_server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading