From 0f981e8572ed6c1a99eef3fcb08c6888a7478820 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 21 Oct 2023 20:12:07 +0800 Subject: [PATCH 1/3] update lock on trans --- dtmsvr/storage/sql/sql.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/dtmsvr/storage/sql/sql.go b/dtmsvr/storage/sql/sql.go index cf8583c8..8e444bc5 100644 --- a/dtmsvr/storage/sql/sql.go +++ b/dtmsvr/storage/sql/sql.go @@ -8,6 +8,8 @@ package sql import ( + "database/sql" + "errors" "fmt" "math" "time" @@ -159,23 +161,26 @@ func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalS db := dbGet() owner := shortuuid.New() nextCronTime := getTimeStr(int64(expireIn / time.Second)) - where := map[string]string{ - dtmimp.DBTypeMysql: fmt.Sprintf(`next_cron_time < '%s' and status in ('prepared', 'aborting', 'submitted') limit 1`, nextCronTime), - dtmimp.DBTypePostgres: fmt.Sprintf(`id in (select id from trans_global where next_cron_time < '%s' and status in ('prepared', 'aborting', 'submitted') limit 1 )`, nextCronTime), + where := fmt.Sprintf(`next_cron_time < '%s' and status in ('prepared', 'aborting', 'submitted')`, nextCronTime) + + order := map[string]string{ + dtmimp.DBTypeMysql: `order by rand()`, + dtmimp.DBTypePostgres: `order by random()`, }[conf.Store.Driver] - ssql := fmt.Sprintf(`select count(1) from trans_global where %s`, where) - var cnt int64 - err := db.ToSQLDB().QueryRow(ssql).Scan(&cnt) - dtmimp.PanicIf(err != nil, err) - if cnt == 0 { + ssql := fmt.Sprintf(`select id from trans_global where %s %s limit 1`, where, order) + var id int64 + err := db.ToSQLDB().QueryRow(ssql).Scan(&id) + if errors.Is(err, sql.ErrNoRows) { return nil } + dtmimp.PanicIf(err != nil, err) - sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s', owner='%s' WHERE %s`, + sql := fmt.Sprintf(`UPDATE trans_global SET update_time='%s',next_cron_time='%s', owner='%s' WHERE id=%d and %s`, getTimeStr(0), getTimeStr(conf.RetryInterval), owner, + id, where) affected, err := dtmimp.DBExec(conf.Store.Driver, db.ToSQLDB(), sql) From 3e59da2a752e48c29e37e860bc3708a304cd0c6e Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 21 Oct 2023 20:16:23 +0800 Subject: [PATCH 2/3] fix format --- dtmsvr/trans_status.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index 9879b875..742a0949 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -262,13 +262,15 @@ func (t *TransGlobal) execBranch(ctx context.Context, branch *TransBranch, branc func (t *TransGlobal) getNextCronInterval(ctype cronType) int64 { if ctype == cronBackoff { return t.NextCronInterval * 2 - } else if ctype == cronKeep { + } + if ctype == cronKeep { return t.NextCronInterval - } else if t.RetryInterval != 0 { + } + if t.RetryInterval != 0 { return t.RetryInterval - } else if t.TimeoutToFail > 0 && t.TimeoutToFail < conf.RetryInterval { + } + if t.TimeoutToFail > 0 && t.TimeoutToFail < conf.RetryInterval { return t.TimeoutToFail - } else { - return conf.RetryInterval } + return conf.RetryInterval } From a5f0ae139153eced9c864a1cf6ff4fa670d36544 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Sat, 21 Oct 2023 20:20:54 +0800 Subject: [PATCH 3/3] remove dot imports --- dtmsvr/storage/boltdb/boltdb_test.go | 92 ++++++++++++++-------------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/dtmsvr/storage/boltdb/boltdb_test.go b/dtmsvr/storage/boltdb/boltdb_test.go index e89a3e46..c13fab8f 100644 --- a/dtmsvr/storage/boltdb/boltdb_test.go +++ b/dtmsvr/storage/boltdb/boltdb_test.go @@ -11,7 +11,7 @@ import ( "testing" "time" - . "github.com/onsi/gomega" + ga "github.com/onsi/gomega" bolt "go.etcd.io/bbolt" "github.com/dtm-labs/dtm/client/dtmcli/dtmimp" @@ -20,13 +20,13 @@ import ( func TestInitializeBuckets(t *testing.T) { t.Run("normal test", func(t *testing.T) { - g := NewWithT(t) + g := ga.NewWithT(t) db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) defer db.Close() err = initializeBuckets(db) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) actualBuckets := [][]byte{} err = db.View(func(t *bolt.Tx) error { @@ -35,42 +35,42 @@ func TestInitializeBuckets(t *testing.T) { return nil }) }) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) - g.Expect(actualBuckets).To(Equal(allBuckets)) + g.Expect(actualBuckets).To(ga.Equal(allBuckets)) }) } func TestCleanupExpiredData(t *testing.T) { t.Run("negative expired seconds", func(t *testing.T) { - g := NewWithT(t) + g := ga.NewWithT(t) db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) defer db.Close() err = cleanupExpiredData(-1*time.Second, db) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) }) t.Run("nil global bucket", func(t *testing.T) { - g := NewWithT(t) + g := ga.NewWithT(t) db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) defer db.Close() err = cleanupExpiredData(time.Second, db) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) }) t.Run("normal test", func(t *testing.T) { - g := NewWithT(t) + g := ga.NewWithT(t) db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) defer db.Close() // Initialize data err = initializeBuckets(db) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) err = db.Update(func(t *bolt.Tx) error { doneTime := time.Now().Add(-10 * time.Minute) @@ -95,10 +95,10 @@ func TestCleanupExpiredData(t *testing.T) { return nil }) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) err = cleanupExpiredData(time.Minute, db) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) actualGids := []string{} err = db.View(func(t *bolt.Tx) error { @@ -108,29 +108,29 @@ func TestCleanupExpiredData(t *testing.T) { } return nil }) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(actualGids).To(Equal([]string{"gid0"})) + g.Expect(err).ToNot(ga.HaveOccurred()) + g.Expect(actualGids).To(ga.Equal([]string{"gid0"})) }) } func TestCleanupGlobalWithGids(t *testing.T) { t.Run("nil bucket", func(t *testing.T) { - g := NewWithT(t) + g := ga.NewWithT(t) db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) defer db.Close() err = db.Update(func(t *bolt.Tx) error { cleanupGlobalWithGids(t, nil) return nil }) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) }) t.Run("normal test", func(t *testing.T) { - g := NewWithT(t) + g := ga.NewWithT(t) db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) defer db.Close() // Initialize data @@ -151,7 +151,7 @@ func TestCleanupGlobalWithGids(t *testing.T) { return nil }) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) err = db.Update(func(t *bolt.Tx) error { cleanupGlobalWithGids(t, map[string]struct{}{ @@ -160,7 +160,7 @@ func TestCleanupGlobalWithGids(t *testing.T) { }) return nil }) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) actualGids := []string{} err = db.View(func(t *bolt.Tx) error { @@ -170,29 +170,29 @@ func TestCleanupGlobalWithGids(t *testing.T) { } return nil }) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(actualGids).To(Equal([]string{"k3"})) + g.Expect(err).ToNot(ga.HaveOccurred()) + g.Expect(actualGids).To(ga.Equal([]string{"k3"})) }) } func TestCleanupBranchWithGids(t *testing.T) { t.Run("nil bucket", func(t *testing.T) { - g := NewWithT(t) + g := ga.NewWithT(t) db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) defer db.Close() err = db.Update(func(t *bolt.Tx) error { cleanupBranchWithGids(t, nil) return nil }) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) }) t.Run("normal test", func(t *testing.T) { - g := NewWithT(t) + g := ga.NewWithT(t) db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) defer db.Close() // Initialize data @@ -232,7 +232,7 @@ func TestCleanupBranchWithGids(t *testing.T) { return nil }) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) err = db.Update(func(t *bolt.Tx) error { cleanupBranchWithGids(t, map[string]struct{}{ @@ -241,7 +241,7 @@ func TestCleanupBranchWithGids(t *testing.T) { }) return nil }) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) actualKeys := []string{} err = db.View(func(t *bolt.Tx) error { @@ -251,29 +251,29 @@ func TestCleanupBranchWithGids(t *testing.T) { } return nil }) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(actualKeys).To(Equal([]string{"a", "gid201", "z"})) + g.Expect(err).ToNot(ga.HaveOccurred()) + g.Expect(actualKeys).To(ga.Equal([]string{"a", "gid201", "z"})) }) } func TestCleanupIndexWithGids(t *testing.T) { t.Run("nil bucket", func(t *testing.T) { - g := NewWithT(t) + g := ga.NewWithT(t) db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) defer db.Close() err = db.Update(func(t *bolt.Tx) error { cleanupIndexWithGids(t, nil) return nil }) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) }) t.Run("normal test", func(t *testing.T) { - g := NewWithT(t) + g := ga.NewWithT(t) db, err := bolt.Open(path.Join(t.TempDir(), "./test.bolt"), 0666, &bolt.Options{Timeout: 1 * time.Second}) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) defer db.Close() // Initialize data @@ -313,7 +313,7 @@ func TestCleanupIndexWithGids(t *testing.T) { return nil }) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) err = db.Update(func(t *bolt.Tx) error { cleanupIndexWithGids(t, map[string]struct{}{ @@ -322,7 +322,7 @@ func TestCleanupIndexWithGids(t *testing.T) { }) return nil }) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(ga.HaveOccurred()) actualKeys := []string{} err = db.View(func(t *bolt.Tx) error { @@ -332,7 +332,7 @@ func TestCleanupIndexWithGids(t *testing.T) { } return nil }) - g.Expect(err).ToNot(HaveOccurred()) - g.Expect(actualKeys).To(Equal([]string{"3-gid2", "a", "z"})) + g.Expect(err).ToNot(ga.HaveOccurred()) + g.Expect(actualKeys).To(ga.Equal([]string{"3-gid2", "a", "z"})) }) }