From d9f0a535c2a3f576ab80c7f73d2ab22b27795943 Mon Sep 17 00:00:00 2001 From: nhsmw Date: Mon, 23 Sep 2024 17:41:30 +0800 Subject: [PATCH] ticdc: Support Vector data type | tidb=master pd=master tikv=master (#11538) ref pingcap/tiflow#11530 --- cdc/entry/codec.go | 3 + cdc/entry/mounter.go | 5 ++ cdc/entry/mounter_test.go | 10 ++- cdc/entry/schema_storage_test.go | 5 -- cdc/entry/schema_test_helper.go | 1 - cdc/sink/ddlsink/mysql/format_ddl.go | 64 +++++++++++++++ cdc/sink/ddlsink/mysql/format_ddl_test.go | 47 +++++++++++ cdc/sink/ddlsink/mysql/mysql_ddl_sink.go | 38 +++++++++ cdc/sink/dmlsink/txn/mysql/dml.go | 17 ++-- cdc/sink/dmlsink/txn/mysql/dml_test.go | 50 ++++++++++++ cdc/sink/dmlsink/txn/mysql/mysql.go | 19 +++-- cdc/sink/dmlsink/txn/mysql/mysql_test.go | 81 +++++++++++++++++++ dm/pkg/conn/mockdb.go | 1 - dm/syncer/expr_filter_group_test.go | 3 +- .../download-compatibility-test-binaries.sh | 15 +++- .../download-integration-test-binaries.sh | 19 ++++- go.mod | 44 +++++----- go.sum | 73 +++++++++-------- pkg/filter/filter_test_helper.go | 1 - pkg/sink/codec/avro/avro.go | 61 ++++++++------ pkg/sink/codec/canal/canal_entry.go | 3 + pkg/sink/codec/canal/canal_entry_test.go | 12 ++- pkg/sink/codec/canal/canal_json_message.go | 1 + .../canal_json_row_event_encoder_test.go | 29 ++++++- pkg/sink/codec/common/verify_checksum.go | 4 + pkg/sink/codec/craft/message_decoder.go | 5 ++ pkg/sink/codec/craft/message_encoder.go | 4 + pkg/sink/codec/csv/csv_message.go | 6 +- pkg/sink/codec/csv/csv_message_test.go | 14 ++++ pkg/sink/codec/debezium/codec.go | 12 +++ pkg/sink/codec/internal/column.go | 5 +- pkg/sink/codec/internal/java.go | 3 + pkg/sink/codec/maxwell/maxwell_message.go | 12 +++ .../codec/open/open_protocol_encoder_test.go | 8 +- pkg/sink/codec/simple/encoder_test.go | 53 ++++++++++-- pkg/sink/codec/simple/message.go | 29 +++++-- pkg/sink/codec/utils/test_utils.go | 3 + pkg/sink/mysql/config.go | 12 +++ scripts/download-integration-test-binaries.sh | 27 +++++-- .../kafka_simple_basic/data/data.sql | 2 +- .../kafka_simple_basic_avro/data/data.sql | 2 +- tests/integration_tests/run_group.sh | 2 +- .../vector/conf/diff_config.toml | 30 +++++++ tests/integration_tests/vector/data/data.sql | 34 ++++++++ tests/integration_tests/vector/run.sh | 44 ++++++++++ 45 files changed, 769 insertions(+), 144 deletions(-) create mode 100644 cdc/sink/ddlsink/mysql/format_ddl.go create mode 100644 cdc/sink/ddlsink/mysql/format_ddl_test.go create mode 100644 tests/integration_tests/vector/conf/diff_config.toml create mode 100644 tests/integration_tests/vector/data/data.sql create mode 100755 tests/integration_tests/vector/run.sh diff --git a/cdc/entry/codec.go b/cdc/entry/codec.go index 7224957d4d1..7828b7a1665 100644 --- a/cdc/entry/codec.go +++ b/cdc/entry/codec.go @@ -210,6 +210,9 @@ func unflatten(datum types.Datum, ft *types.FieldType, loc *time.Location) (type byteSize := (ft.GetFlen() + 7) >> 3 datum.SetUint64(0) datum.SetMysqlBit(types.NewBinaryLiteralFromUint(val, byteSize)) + case mysql.TypeTiDBVectorFloat32: + datum.SetVectorFloat32(types.ZeroVectorFloat32) + return datum, nil } return datum, nil } diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 0e511e47a78..c2ea2319ad2 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -592,6 +592,8 @@ func newDatum(value interface{}, ft types.FieldType) (types.Datum, error) { return types.NewFloat32Datum(value.(float32)), nil case mysql.TypeDouble: return types.NewFloat64Datum(value.(float64)), nil + case mysql.TypeTiDBVectorFloat32: + return types.NewVectorFloat32Datum(value.(types.VectorFloat32)), nil default: log.Panic("unexpected mysql type found", zap.Any("type", ft.GetType())) } @@ -888,6 +890,9 @@ func formatColVal(datum types.Datum, col *timodel.ColumnInfo) ( } const sizeOfV = unsafe.Sizeof(v) return v, int(sizeOfV), warn, nil + case mysql.TypeTiDBVectorFloat32: + b := datum.GetVectorFloat32() + return b, b.Len(), "", nil default: // NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail // Make specified convert upper if you need diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index 739080d63b4..760481ebd80 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -269,7 +269,6 @@ func testMounterDisableOldValue(t *testing.T, tc struct { ticonfig.UpdateGlobal(func(conf *ticonfig.Config) { // we can update the tidb config here }) - session.SetSchemaLease(0) session.DisableStats4Test() domain, err := session.BootstrapSession(store) require.Nil(t, err) @@ -1682,4 +1681,13 @@ func TestFormatColVal(t *testing.T) { require.NoError(t, err) require.Equal(t, float32(0), value) require.NotZero(t, warn) + + vector, _ := types.ParseVectorFloat32("[1,2,3,4,5]") + ftTypeVector := types.NewFieldType(mysql.TypeTiDBVectorFloat32) + col = &timodel.ColumnInfo{FieldType: *ftTypeVector} + datum.SetVectorFloat32(vector) + value, _, warn, err = formatColVal(datum, col) + require.NoError(t, err) + require.Equal(t, vector, value) + require.Zero(t, warn) } diff --git a/cdc/entry/schema_storage_test.go b/cdc/entry/schema_storage_test.go index 4ae58289754..1e8e96e62dc 100644 --- a/cdc/entry/schema_storage_test.go +++ b/cdc/entry/schema_storage_test.go @@ -672,8 +672,6 @@ func TestCreateSnapFromMeta(t *testing.T) { store, err := mockstore.NewMockStore() require.Nil(t, err) defer store.Close() //nolint:errcheck - - session.SetSchemaLease(0) session.DisableStats4Test() domain, err := session.BootstrapSession(store) require.Nil(t, err) @@ -708,7 +706,6 @@ func TestExplicitTables(t *testing.T) { require.Nil(t, err) defer store.Close() //nolint:errcheck - session.SetSchemaLease(0) session.DisableStats4Test() domain, err := session.BootstrapSession(store) require.Nil(t, err) @@ -859,7 +856,6 @@ func TestSchemaStorage(t *testing.T) { ticonfig.UpdateGlobal(func(conf *ticonfig.Config) { conf.AlterPrimaryKey = true }) - session.SetSchemaLease(0) session.DisableStats4Test() domain, err := session.BootstrapSession(store) require.Nil(t, err) @@ -952,7 +948,6 @@ func TestHandleKey(t *testing.T) { require.Nil(t, err) defer store.Close() //nolint:errcheck - session.SetSchemaLease(0) session.DisableStats4Test() domain, err := session.BootstrapSession(store) require.Nil(t, err) diff --git a/cdc/entry/schema_test_helper.go b/cdc/entry/schema_test_helper.go index fba513ba08c..9d9e14faf45 100644 --- a/cdc/entry/schema_test_helper.go +++ b/cdc/entry/schema_test_helper.go @@ -61,7 +61,6 @@ func NewSchemaTestHelperWithReplicaConfig( ticonfig.UpdateGlobal(func(conf *ticonfig.Config) { conf.AlterPrimaryKey = true }) - session.SetSchemaLease(0) session.DisableStats4Test() domain, err := session.BootstrapSession(store) require.NoError(t, err) diff --git a/cdc/sink/ddlsink/mysql/format_ddl.go b/cdc/sink/ddlsink/mysql/format_ddl.go new file mode 100644 index 00000000000..8332d18ff28 --- /dev/null +++ b/cdc/sink/ddlsink/mysql/format_ddl.go @@ -0,0 +1,64 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "bytes" + + "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/format" + "github.com/pingcap/tidb/pkg/parser/mysql" + "go.uber.org/zap" +) + +type visiter struct{} + +func (f *visiter) Enter(n ast.Node) (node ast.Node, skipChildren bool) { + switch v := n.(type) { + case *ast.ColumnDef: + if v.Tp != nil { + switch v.Tp.GetType() { + case mysql.TypeTiDBVectorFloat32: + v.Tp.SetType(mysql.TypeLongBlob) + v.Tp.SetCharset("") + v.Tp.SetCollate("") + v.Tp.SetFlen(-1) + v.Options = []*ast.ColumnOption{} // clear COMMENT + } + } + } + return n, false +} + +func (f *visiter) Leave(n ast.Node) (node ast.Node, ok bool) { + return n, true +} + +func formatQuery(sql string) string { + p := parser.New() + stmt, err := p.ParseOneStmt(sql, "", "") + if err != nil { + log.Error("format query parse one stmt failed", zap.Error(err)) + } + stmt.Accept(&visiter{}) + + buf := new(bytes.Buffer) + restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf) + if err = stmt.Restore(restoreCtx); err != nil { + log.Error("format query restore failed", zap.Error(err)) + } + return buf.String() +} diff --git a/cdc/sink/ddlsink/mysql/format_ddl_test.go b/cdc/sink/ddlsink/mysql/format_ddl_test.go new file mode 100644 index 00000000000..2059877ce79 --- /dev/null +++ b/cdc/sink/ddlsink/mysql/format_ddl_test.go @@ -0,0 +1,47 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mysql + +import ( + "bytes" + "testing" + + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/format" + "github.com/stretchr/testify/require" +) + +func TestFormatQuery(t *testing.T) { + sql := "CREATE TABLE `test` (`id` INT PRIMARY KEY,`data` VECTOR(5))" + expectSQL := "CREATE TABLE `test` (`id` INT PRIMARY KEY,`data` LONGTEXT)" + p := parser.New() + stmt, err := p.ParseOneStmt(sql, "", "") + require.NoError(t, err) + stmt.Accept(&visiter{}) + + buf := new(bytes.Buffer) + restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, buf) + err = stmt.Restore(restoreCtx) + require.NoError(t, err) + require.Equal(t, buf.String(), expectSQL) +} + +func BenchmarkFormatQuery(b *testing.B) { + sql := "CREATE TABLE `test` (`id` INT PRIMARY KEY,`data` LONGTEXT)" + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + formatQuery(sql) + } +} diff --git a/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go index dfac1654577..ed5be1b8158 100644 --- a/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go +++ b/cdc/sink/ddlsink/mysql/mysql_ddl_sink.go @@ -20,9 +20,12 @@ import ( "net/url" "time" + "github.com/coreos/go-semver/semver" lru "github.com/hashicorp/golang-lru" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/version" + "github.com/pingcap/tidb/dumpling/export" timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink" @@ -42,6 +45,8 @@ const ( // networkDriftDuration is used to construct a context timeout for database operations. networkDriftDuration = 5 * time.Second + + defaultSupportVectorVersion = "8.3.0" ) // GetDBConnImpl is the implementation of pmysql.IDBConnectionFactory. @@ -66,6 +71,8 @@ type DDLSink struct { // is running in downstream. // map: model.TableName -> timodel.ActionType lastExecutedNormalDDLCache *lru.Cache + + needFormat bool } // NewDDLSink creates a new DDLSink. @@ -105,12 +112,14 @@ func NewDDLSink( if err != nil { return nil, err } + m := &DDLSink{ id: changefeedID, db: db, cfg: cfg, statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink), lastExecutedNormalDDLCache: lruCache, + needFormat: needFormatDDL(db, cfg), } log.Info("MySQL DDL sink is created", @@ -198,6 +207,14 @@ func (m *DDLSink) execDDL(pctx context.Context, ddl *model.DDLEvent) error { shouldSwitchDB := needSwitchDB(ddl) + // Convert vector type to string type for unsupport database + if m.needFormat { + if newQuery := formatQuery(ddl.Query); newQuery != ddl.Query { + log.Warn("format ddl query", zap.String("newQuery", newQuery), zap.String("query", ddl.Query), zap.String("collate", ddl.Collate), zap.String("charset", ddl.Charset)) + ddl.Query = newQuery + } + } + failpoint.Inject("MySQLSinkExecDDLDelay", func() { select { case <-ctx.Done(): @@ -271,6 +288,27 @@ func needSwitchDB(ddl *model.DDLEvent) bool { return true } +// needFormatDDL checks vector type support +func needFormatDDL(db *sql.DB, cfg *pmysql.Config) bool { + if !cfg.HasVectorType { + log.Warn("please set `has-vector-type` to be true if a column is vector type when the downstream is not TiDB or TiDB version less than specify version", + zap.Any("hasVectorType", cfg.HasVectorType), zap.Any("supportVectorVersion", defaultSupportVectorVersion)) + return false + } + versionInfo, err := export.SelectVersion(db) + if err != nil { + log.Warn("fail to get version", zap.Error(err), zap.Bool("isTiDB", cfg.IsTiDB)) + return false + } + serverInfo := version.ParseServerInfo(versionInfo) + version := semver.New(defaultSupportVectorVersion) + if !cfg.IsTiDB || serverInfo.ServerVersion.LessThan(*version) { + log.Error("downstream unsupport vector type. it will be converted to longtext", zap.String("version", serverInfo.ServerVersion.String()), zap.String("supportVectorVersion", defaultSupportVectorVersion), zap.Bool("isTiDB", cfg.IsTiDB)) + return true + } + return false +} + // WriteCheckpointTs does nothing. func (m *DDLSink) WriteCheckpointTs(_ context.Context, _ uint64, _ []*model.TableInfo) error { // Only for RowSink for now. diff --git a/cdc/sink/dmlsink/txn/mysql/dml.go b/cdc/sink/dmlsink/txn/mysql/dml.go index da29618908f..a82d7fc65c4 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml.go +++ b/cdc/sink/dmlsink/txn/mysql/dml.go @@ -17,6 +17,7 @@ import ( "strings" "github.com/pingcap/tidb/pkg/parser/charset" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/quotes" ) @@ -109,16 +110,16 @@ func prepareReplace( // will automatically set `_binary` charset for that column, which is not expected. // See https://github.com/go-sql-driver/mysql/blob/ce134bfc/connection.go#L267 func appendQueryArgs(args []interface{}, col *model.Column) []interface{} { - if col.Charset != "" && col.Charset != charset.CharsetBin { - colValBytes, ok := col.Value.([]byte) - if ok { - args = append(args, string(colValBytes)) - } else { - args = append(args, col.Value) + switch v := col.Value.(type) { + case []byte: + if col.Charset != "" && col.Charset != charset.CharsetBin { + args = append(args, string(v)) + return args } - } else { - args = append(args, col.Value) + case types.VectorFloat32: + col.Value = v.String() } + args = append(args, col.Value) return args } diff --git a/cdc/sink/dmlsink/txn/mysql/dml_test.go b/cdc/sink/dmlsink/txn/mysql/dml_test.go index dab7c4b104f..ffc3c846982 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml_test.go +++ b/cdc/sink/dmlsink/txn/mysql/dml_test.go @@ -18,7 +18,9 @@ import ( "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -248,6 +250,37 @@ func TestPrepareUpdate(t *testing.T) { expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? AND `b` = ? LIMIT 1", expectedArgs: []interface{}{2, "世界", 1, "你好"}, }, + { + quoteTable: "`test`.`t1`", + preCols: []*model.Column{ + { + Name: "a", + Type: mysql.TypeLong, + Flag: model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }, + { + Name: "b", + Type: mysql.TypeTiDBVectorFloat32, + Value: util.Must(types.ParseVectorFloat32("[1.0,-2,0.33,-4.4,55]")), + }, + }, + cols: []*model.Column{ + { + Name: "a", + Type: mysql.TypeLong, + Flag: model.MultipleKeyFlag | model.HandleKeyFlag, + Value: 1, + }, + { + Name: "b", + Type: mysql.TypeTiDBVectorFloat32, + Value: util.Must(types.ParseVectorFloat32("[1,2,3,4,5]")), + }, + }, + expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? LIMIT 1", + expectedArgs: []interface{}{1, "[1,2,3,4,5]", 1}, + }, } for _, tc := range testCases { query, args := prepareUpdate(tc.quoteTable, tc.preCols, tc.cols, false) @@ -709,6 +742,23 @@ func TestMapReplace(t *testing.T) { []byte("你好,世界"), }, }, + { + quoteTable: "`test`.`t1`", + cols: []*model.Column{ + { + Name: "a", + Type: mysql.TypeTiDBVectorFloat32, + Value: util.Must(types.ParseVectorFloat32("[1.0,-2,0.3,-4.4,55]")), + }, + { + Name: "b", + Type: mysql.TypeTiDBVectorFloat32, + Value: util.Must(types.ParseVectorFloat32("[1,2,3,4,5]")), + }, + }, + expectedQuery: "REPLACE INTO `test`.`t1` (`a`,`b`) VALUES ", + expectedArgs: []interface{}{"[1,-2,0.3,-4.4,55]", "[1,2,3,4,5]"}, + }, } for _, tc := range testCases { // multiple times to verify the stability of column sequence in query string diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index ac11a198014..60fd558f424 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/metrics" @@ -338,17 +339,19 @@ func convert2RowChanges( return res } -func convertBinaryToString(cols []*model.ColumnData, tableInfo *model.TableInfo) { +func convertValue(cols []*model.ColumnData, tableInfo *model.TableInfo) { for i, col := range cols { if col == nil { continue } - colInfo := tableInfo.ForceGetColumnInfo(col.ColumnID) - if colInfo.GetCharset() != "" && colInfo.GetCharset() != charset.CharsetBin { - colValBytes, ok := col.Value.([]byte) - if ok { - cols[i].Value = string(colValBytes) + switch v := col.Value.(type) { + case []byte: + colInfo := tableInfo.ForceGetColumnInfo(col.ColumnID) + if colInfo.GetCharset() != "" && colInfo.GetCharset() != charset.CharsetBin { + cols[i].Value = string(v) } + case types.VectorFloat32: + cols[i].Value = v.String() } } } @@ -367,8 +370,8 @@ func (s *mysqlBackend) groupRowsByType( deleteRow := make([]*sqlmodel.RowChange, 0, preAllocateSize) for _, row := range event.Event.Rows { - convertBinaryToString(row.Columns, tableInfo) - convertBinaryToString(row.PreColumns, tableInfo) + convertValue(row.Columns, tableInfo) + convertValue(row.PreColumns, tableInfo) if row.IsInsert() { insertRow = append( diff --git a/cdc/sink/dmlsink/txn/mysql/mysql_test.go b/cdc/sink/dmlsink/txn/mysql/mysql_test.go index ce0adb637aa..26ff9cc28d8 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql_test.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql_test.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink" "github.com/pingcap/tiflow/cdc/sink/metrics" @@ -41,6 +42,7 @@ import ( "github.com/pingcap/tiflow/pkg/sink" pmysql "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/pingcap/tiflow/pkg/sqlmodel" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" @@ -116,6 +118,17 @@ func TestPrepareDML(t *testing.T) { }, }, [][]int{{1, 2}}) + tableInfoVector := model.BuildTableInfo("common_1", "uk_without_pk", []*model.Column{ + nil, { + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + }, { + Name: "a3", + Type: mysql.TypeTiDBVectorFloat32, + }, + }, [][]int{{1, 2}}) + testCases := []struct { input []*model.RowChangedEvent expected *preparedDMLs @@ -185,6 +198,35 @@ func TestPrepareDML(t *testing.T) { approximateSize: 63, }, }, + // vector type + { + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813518, + CommitTs: 418658114257813519, + TableInfo: tableInfoVector, + Columns: model.Columns2ColumnDatas( + []*model.Column{ + nil, { + Name: "a1", + Type: mysql.TypeLong, + Value: 1, + }, { + Name: "a3", + Type: mysql.TypeTiDBVectorFloat32, + Value: util.Must(types.ParseVectorFloat32("[1.1,-2,3.33,-4.12,-5]")), + }, + }, tableInfoVector), + }, + }, + expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813518}, + sqls: []string{"INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)"}, + values: [][]interface{}{{1, "[1.1,-2,3.33,-4.12,-5]"}}, + rowCount: 1, + approximateSize: 63, + }, + }, } ctx, cancel := context.WithCancel(context.Background()) @@ -1111,6 +1153,14 @@ func TestPrepareBatchDMLs(t *testing.T) { Charset: charset.CharsetGBK, Flag: model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, }}, [][]int{{0, 1}}) + tableInfoWithVector := model.BuildTableInfo("common_1", "uk_without_pk", []*model.Column{{ + Name: "a1", + Type: mysql.TypeLong, + Flag: model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag | model.UniqueKeyFlag, + }, { + Name: "a3", + Type: mysql.TypeTiDBVectorFloat32, + }}, [][]int{{0, 1}}) testCases := []struct { isTiDB bool input []*model.RowChangedEvent @@ -1425,6 +1475,37 @@ func TestPrepareBatchDMLs(t *testing.T) { approximateSize: 204, }, }, + + // inser vector data + { + isTiDB: true, + input: []*model.RowChangedEvent{ + { + StartTs: 418658114257813516, + CommitTs: 418658114257813517, + TableInfo: tableInfoWithVector, + Columns: model.Columns2ColumnDatas([]*model.Column{{ + Name: "a1", + Value: 1, + }, { + Name: "a3", + Value: util.Must(types.ParseVectorFloat32("[1,2,3,4,5]")), + }}, tableInfoWithVector), + ApproximateDataSize: 10, + }, + }, + expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813516}, + sqls: []string{ + "INSERT INTO `common_1`.`uk_without_pk` (`a1`,`a3`) VALUES (?,?)", + }, + values: [][]interface{}{ + {1, "[1,2,3,4,5]"}, + }, + rowCount: 1, + approximateSize: 73, + }, + }, } ctx, cancel := context.WithCancel(context.Background()) diff --git a/dm/pkg/conn/mockdb.go b/dm/pkg/conn/mockdb.go index f9d943071b5..700f4d3e6b9 100644 --- a/dm/pkg/conn/mockdb.go +++ b/dm/pkg/conn/mockdb.go @@ -147,7 +147,6 @@ func NewCluster() (*Cluster, error) { } cluster.Storage = storage - session.SetSchemaLease(0) session.DisableStats4Test() dom, err := session.BootstrapSession(storage) if err != nil { diff --git a/dm/syncer/expr_filter_group_test.go b/dm/syncer/expr_filter_group_test.go index 7bb15b68f01..0cc161981f4 100644 --- a/dm/syncer/expr_filter_group_test.go +++ b/dm/syncer/expr_filter_group_test.go @@ -17,6 +17,7 @@ import ( "context" "testing" + "github.com/pingcap/errors" ddl2 "github.com/pingcap/tidb/pkg/ddl" context2 "github.com/pingcap/tidb/pkg/expression/context" "github.com/pingcap/tidb/pkg/parser/ast" @@ -440,7 +441,7 @@ create table t ( require.NoError(t, err) require.Len(t, exprs, 1) expr := exprs[0] - require.Equal(t, "0", expr.StringWithCtx(context2.EmptyParamValues)) + require.Equal(t, "0", expr.StringWithCtx(context2.EmptyParamValues, errors.RedactLogDisable)) // skip nothing skip, err := SkipDMLByExpression(sessCtx, []interface{}{0}, expr, ti.Columns) diff --git a/dm/tests/download-compatibility-test-binaries.sh b/dm/tests/download-compatibility-test-binaries.sh index cceb8c4432d..c4797084447 100755 --- a/dm/tests/download-compatibility-test-binaries.sh +++ b/dm/tests/download-compatibility-test-binaries.sh @@ -41,6 +41,19 @@ function download() { wget --no-verbose --retry-connrefused --waitretry=1 -t 3 -O "${file_path}" "${url}" } +function get_sha1() { + local repo="$1" + local branch="$2" + file_server_url="http://fileserver.pingcap.net" + sha1=$(curl -s "${file_server_url}/download/refs/pingcap/${repo}/${branch}/sha1") + if [ $? -ne 0 ] || echo "$sha1" | grep -q "Error"; then + echo "Failed to get sha1 with repo ${repo} branch ${branch}: $sha1. use branch master to instead" >&2 + branch=master + sha1=$(curl -s "${file_server_url}/download/refs/pingcap/${repo}/${branch}/sha1") + fi + echo $sha1 +} + # Specify the download branch. branch=$1 @@ -48,7 +61,7 @@ branch=$1 file_server_url="http://fileserver.pingcap.net" # Get sha1 based on branch name. -tidb_sha1=$(curl "${file_server_url}/download/refs/pingcap/tidb/${branch}/sha1") +tidb_sha1=$(get_sha1 "tidb" "$branch") # All download links. tidb_download_url="${file_server_url}/download/builds/pingcap/tidb/${tidb_sha1}/centos7/tidb-server.tar.gz" diff --git a/dm/tests/download-integration-test-binaries.sh b/dm/tests/download-integration-test-binaries.sh index 21a505664d9..34dab604e85 100755 --- a/dm/tests/download-integration-test-binaries.sh +++ b/dm/tests/download-integration-test-binaries.sh @@ -41,6 +41,19 @@ function download() { wget --no-verbose --retry-connrefused --waitretry=1 -t 3 -O "${file_path}" "${url}" } +function get_sha1() { + local repo="$1" + local branch="$2" + file_server_url="http://fileserver.pingcap.net" + sha1=$(curl -s "${file_server_url}/download/refs/pingcap/${repo}/${branch}/sha1") + if [ $? -ne 0 ] || echo "$sha1" | grep -q "Error"; then + echo "Failed to get sha1 with repo ${repo} branch ${branch}: $sha1. use branch master to instead" >&2 + branch=master + sha1=$(curl -s "${file_server_url}/download/refs/pingcap/${repo}/${branch}/sha1") + fi + echo $sha1 +} + # Specify the download branch. branch=$1 @@ -48,9 +61,9 @@ branch=$1 file_server_url="http://fileserver.pingcap.net" # Get sha1 based on branch name. -tidb_sha1=$(curl "${file_server_url}/download/refs/pingcap/tidb/${branch}/sha1") -tikv_sha1=$(curl "${file_server_url}/download/refs/pingcap/tikv/${branch}/sha1") -pd_sha1=$(curl "${file_server_url}/download/refs/pingcap/pd/${branch}/sha1") +tidb_sha1=$(get_sha1 "tidb" "$branch") +tikv_sha1=$(get_sha1 "tikv" "$branch") +pd_sha1=$(get_sha1 "pd" "$branch") tidb_tools_sha1=$(curl "${file_server_url}/download/refs/pingcap/tidb-tools/master/sha1") # All download links. diff --git a/go.mod b/go.mod index 3757a1ef1dd..0643b691dce 100644 --- a/go.mod +++ b/go.mod @@ -67,11 +67,11 @@ require ( github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 - github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 + github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d - github.com/pingcap/tidb v1.1.0-beta.0.20240722024203-504960d51b2a + github.com/pingcap/tidb v1.1.0-beta.0.20240822082843-d2b20bd898d1 github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 - github.com/pingcap/tidb/pkg/parser v0.0.0-20240722015532-8edd4ed54376 + github.com/pingcap/tidb/pkg/parser v0.0.0-20240822082843-d2b20bd898d1 github.com/prometheus/client_golang v1.19.1 github.com/prometheus/client_model v0.6.1 github.com/r3labs/diff v1.1.0 @@ -89,9 +89,9 @@ require ( github.com/swaggo/swag v1.16.3 github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 github.com/thanhpk/randstr v1.0.6 - github.com/tikv/client-go/v2 v2.0.8-0.20240703095801-d73cc1ed6503 + github.com/tikv/client-go/v2 v2.0.8-0.20240815020919-c810ed88fb02 github.com/tikv/pd v1.1.0-beta.0.20240407022249-7179657d129b - github.com/tikv/pd/client v0.0.0-20240717053728-5ec6af403019 + github.com/tikv/pd/client v0.0.0-20240805092608-838ee7983b78 github.com/tinylib/msgp v1.1.6 github.com/uber-go/atomic v1.4.0 github.com/vmihailenco/msgpack/v5 v5.3.5 @@ -111,19 +111,19 @@ require ( go.uber.org/ratelimit v0.2.0 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 - golang.org/x/net v0.27.0 + golang.org/x/net v0.28.0 golang.org/x/oauth2 v0.21.0 - golang.org/x/sync v0.7.0 - golang.org/x/sys v0.22.0 - golang.org/x/text v0.16.0 + golang.org/x/sync v0.8.0 + golang.org/x/sys v0.24.0 + golang.org/x/text v0.17.0 golang.org/x/time v0.5.0 google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 google.golang.org/grpc v1.64.0 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v2 v2.4.0 - gorm.io/driver/mysql v1.4.5 - gorm.io/gorm v1.24.5 + gorm.io/driver/mysql v1.5.7 + gorm.io/gorm v1.25.11 upper.io/db.v3 v3.7.1+incompatible ) @@ -309,7 +309,7 @@ require ( github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect github.com/mattn/go-sqlite3 v2.0.1+incompatible // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/mtibben/percent v0.2.1 // indirect @@ -321,14 +321,14 @@ require ( github.com/opentracing/basictracer-go v1.1.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect - github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect + github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect github.com/philhofer/fwd v1.1.1 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d // indirect github.com/pingcap/fn v1.0.0 // indirect github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 - github.com/pingcap/tipb v0.0.0-20240318032315-55a7867ddd50 // indirect + github.com/pingcap/tipb v0.0.0-20240703084358-e46e4632bd2b // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect @@ -339,7 +339,7 @@ require ( github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/rs/cors v1.7.0 // indirect - github.com/sasha-s/go-deadlock v0.3.1 // indirect + github.com/sasha-s/go-deadlock v0.3.5 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c // indirect github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect @@ -380,10 +380,10 @@ require ( go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect - golang.org/x/crypto v0.25.0 // indirect - golang.org/x/mod v0.18.0 // indirect - golang.org/x/term v0.22.0 - golang.org/x/tools v0.22.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/mod v0.20.0 // indirect + golang.org/x/term v0.23.0 + golang.org/x/tools v0.24.0 // indirect google.golang.org/api v0.170.0 // indirect gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect @@ -398,7 +398,7 @@ require ( ) // Fix https://github.com/pingcap/tiflow/issues/4961 -replace github.com/benbjohnson/clock v1.3.0 => github.com/benbjohnson/clock v1.1.0 +replace github.com/benbjohnson/clock v1.3.5 => github.com/benbjohnson/clock v1.1.0 // copy from TiDB replace go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac @@ -410,7 +410,9 @@ replace github.com/tildeleb/hashland => leb.io/hashland v0.1.5 replace github.com/chaos-mesh/go-sqlsmith => github.com/PingCAP-QE/go-sqlsmith v0.0.0-20231213065948-336e064b488d -replace gorm.io/driver/mysql v1.4.5 => gorm.io/driver/mysql v1.3.3 +replace gorm.io/driver/mysql v1.5.7 => gorm.io/driver/mysql v1.3.3 + +replace gorm.io/gorm v1.25.11 => gorm.io/gorm v1.24.5 // TODO: `sourcegraph.com/sourcegraph/appdash` has been archived, and the original host has been removed. // Please remove these dependencies. diff --git a/go.sum b/go.sum index dd74ef9a83f..c2926405a99 100644 --- a/go.sum +++ b/go.sum @@ -152,9 +152,9 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.20.1 h1:U7h9CPoyMfVoN5jUglB0LglCMP10 github.com/aws/aws-sdk-go-v2/service/sts v1.20.1/go.mod h1:BUHusg4cOA1TFGegj7x8/eoWrbdHzJfoMrXcbMQAG0k= github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= -github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bits-and-blooms/bitset v1.4.0 h1:+YZ8ePm+He2pU3dZlIZiOeAKfrBkXi1lSrXJ/Xzgbu8= @@ -773,8 +773,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= -github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= -github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-shellwords v1.0.12 h1:M2zGm7EW6UQJvDeQxo4T51eKPurbeFbe8WtebGE2xrk= github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= @@ -860,9 +860,8 @@ github.com/pborman/getopt v0.0.0-20180729010549-6fdd0a2c7117/go.mod h1:85jBQOZwp github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= -github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o= -github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 h1:jik8PHtAIsPlCRJjJzl4udgEf7hawInF9texMeO2jrU= -github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= +github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 h1:Dx7Ovyv/SFnMFw3fD4oEoeorXc6saIiQ23LrGLth0Gw= +github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc= github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE= github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ= @@ -892,8 +891,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc= +github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= @@ -901,14 +900,14 @@ github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfU github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE= github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530= -github.com/pingcap/tidb v1.1.0-beta.0.20240722024203-504960d51b2a h1:lIj1fFRvB1EwU5005B7vlAl5eu1LUg18FTw83X5Ed1A= -github.com/pingcap/tidb v1.1.0-beta.0.20240722024203-504960d51b2a/go.mod h1:2FO9Yzc8HOYk+aZRYrguuwN6OS12TE+OA2RtukAS+Z8= +github.com/pingcap/tidb v1.1.0-beta.0.20240822082843-d2b20bd898d1 h1:xsXvpphP/G3VVzicD5sU5iA5OBkLYxcGAjR9KmhIziE= +github.com/pingcap/tidb v1.1.0-beta.0.20240822082843-d2b20bd898d1/go.mod h1:fIHzhGkdtFsq29vDWpVLQZAUDCQYc5Khd3UF8J+vxd0= github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 h1:eFu98FbfJB7PKWOtkaV6YNXXJWqDhczQX56j/iucgU4= github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= -github.com/pingcap/tidb/pkg/parser v0.0.0-20240722015532-8edd4ed54376 h1:tz757zpST60fweJB5ZjVl74kQGbRrpmIiTtF6BdnMow= -github.com/pingcap/tidb/pkg/parser v0.0.0-20240722015532-8edd4ed54376/go.mod h1:c/4la2yfv1vBYvtIG8WCDyDinLMDIUC5+zLRHiafY+Y= -github.com/pingcap/tipb v0.0.0-20240318032315-55a7867ddd50 h1:fVNBE06Rjec+EIHaYAKAHa/bIt5lnu3Zh9O6kV7ZAdg= -github.com/pingcap/tipb v0.0.0-20240318032315-55a7867ddd50/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tidb/pkg/parser v0.0.0-20240822082843-d2b20bd898d1 h1:AdP6/kzxPTEXFjWCDl4beuRKo7lrOLW2kjWVccAOx74= +github.com/pingcap/tidb/pkg/parser v0.0.0-20240822082843-d2b20bd898d1/go.mod h1:c/4la2yfv1vBYvtIG8WCDyDinLMDIUC5+zLRHiafY+Y= +github.com/pingcap/tipb v0.0.0-20240703084358-e46e4632bd2b h1:tySAGYw21A3Xa8CcA9jBTfrgAB3+KQWyqyW7fUyokzk= +github.com/pingcap/tipb v0.0.0-20240703084358-e46e4632bd2b/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -974,8 +973,8 @@ github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfF github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46/go.mod h1:uAQ5PCi+MFsC7HjREoAz1BU+Mq60+05gifQSsHSDG/8= -github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0= -github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM= +github.com/sasha-s/go-deadlock v0.3.5 h1:tNCOEEDG6tBqrNDOX35j/7hL5FcFViG6awUGROb2NsU= +github.com/sasha-s/go-deadlock v0.3.5/go.mod h1:bugP6EGbdGYObIlx7pUZtWqlvo8k9H6vCBBsiChJQ5U= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= @@ -1104,12 +1103,12 @@ github.com/tidwall/rtree v0.0.0-20180113144539-6cd427091e0e/go.mod h1:/h+UnNGt0I github.com/tidwall/tinyqueue v0.0.0-20180302190814-1e39f5511563/go.mod h1:mLqSmt7Dv/CNneF2wfcChfN1rvapyQr01LGKnKex0DQ= github.com/tidwall/tinyqueue v0.1.1 h1:SpNEvEggbpyN5DIReaJ2/1ndroY8iyEGxPYxoSaymYE= github.com/tidwall/tinyqueue v0.1.1/go.mod h1:O/QNHwrnjqr6IHItYrzoHAKYhBkLI67Q096fQP5zMYw= -github.com/tikv/client-go/v2 v2.0.8-0.20240703095801-d73cc1ed6503 h1:0mUlg3+dA5LvwKs1U6i/ID/8RsYgLVLGyM8fSBMb630= -github.com/tikv/client-go/v2 v2.0.8-0.20240703095801-d73cc1ed6503/go.mod h1:4HDOAx8OXAJPtqhCZ03IhChXgaFs4B3+vSrPWmiPxjg= +github.com/tikv/client-go/v2 v2.0.8-0.20240815020919-c810ed88fb02 h1:XKZTb6ZyosZSkvOlmROlhGVHlGHEa3FmIip86cRI1TY= +github.com/tikv/client-go/v2 v2.0.8-0.20240815020919-c810ed88fb02/go.mod h1:4HDOAx8OXAJPtqhCZ03IhChXgaFs4B3+vSrPWmiPxjg= github.com/tikv/pd v1.1.0-beta.0.20240407022249-7179657d129b h1:t2XoZp4UHrkPpYPsxbRTRVExJnriWlh+ZsDIfpYyd98= github.com/tikv/pd v1.1.0-beta.0.20240407022249-7179657d129b/go.mod h1:7HJMdb0O5umNpZIFt8e/wKAcEmH99n2HsYgXX+vZj3k= -github.com/tikv/pd/client v0.0.0-20240717053728-5ec6af403019 h1:7VoatJKzIrsjepOaXQjpAcgxQrx2QBAI4HuZ0wFdinA= -github.com/tikv/pd/client v0.0.0-20240717053728-5ec6af403019/go.mod h1:QeMzXKDOW+GbbE+ckcVPBVS6vX3//QB99dXU+niYRq0= +github.com/tikv/pd/client v0.0.0-20240805092608-838ee7983b78 h1:PtW+yTvs9eGTMblulaCHmJ5OtifuE4SJXCACCtkd6ko= +github.com/tikv/pd/client v0.0.0-20240805092608-838ee7983b78/go.mod h1:TxrJRY949Vl14Lmarx6hTNP/HEDYzn4dP0KmjdzQ59w= github.com/tinylib/msgp v1.1.6 h1:i+SbKraHhnrf9M5MYmvQhFnbLhAXSDWF8WWsuyRdocw= github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= @@ -1297,8 +1296,8 @@ golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= -golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1354,8 +1353,8 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0= -golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0= +golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1399,8 +1398,8 @@ golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1419,8 +1418,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180816055513-1c9583448a9c/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1495,8 +1494,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1505,8 +1504,8 @@ golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= -golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= -golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= +golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= +golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1521,8 +1520,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1586,8 +1585,8 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= -golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/filter/filter_test_helper.go b/pkg/filter/filter_test_helper.go index e5b4afedb23..30f39233be1 100644 --- a/pkg/filter/filter_test_helper.go +++ b/pkg/filter/filter_test_helper.go @@ -46,7 +46,6 @@ func newTestHelper(t *testing.T) *testHelper { ticonfig.UpdateGlobal(func(conf *ticonfig.Config) { conf.AlterPrimaryKey = true }) - session.SetSchemaLease(0) session.DisableStats4Test() domain, err := session.BootstrapSession(store) require.Nil(t, err) diff --git a/pkg/sink/codec/avro/avro.go b/pkg/sink/codec/avro/avro.go index 10222296a06..368f4030471 100644 --- a/pkg/sink/codec/avro/avro.go +++ b/pkg/sink/codec/avro/avro.go @@ -359,30 +359,31 @@ const ( ) var type2TiDBType = map[byte]string{ - mysql.TypeTiny: "INT", - mysql.TypeShort: "INT", - mysql.TypeInt24: "INT", - mysql.TypeLong: "INT", - mysql.TypeLonglong: "BIGINT", - mysql.TypeFloat: "FLOAT", - mysql.TypeDouble: "DOUBLE", - mysql.TypeBit: "BIT", - mysql.TypeNewDecimal: "DECIMAL", - mysql.TypeTinyBlob: "TEXT", - mysql.TypeMediumBlob: "TEXT", - mysql.TypeBlob: "TEXT", - mysql.TypeLongBlob: "TEXT", - mysql.TypeVarchar: "TEXT", - mysql.TypeVarString: "TEXT", - mysql.TypeString: "TEXT", - mysql.TypeEnum: "ENUM", - mysql.TypeSet: "SET", - mysql.TypeJSON: "JSON", - mysql.TypeDate: "DATE", - mysql.TypeDatetime: "DATETIME", - mysql.TypeTimestamp: "TIMESTAMP", - mysql.TypeDuration: "TIME", - mysql.TypeYear: "YEAR", + mysql.TypeTiny: "INT", + mysql.TypeShort: "INT", + mysql.TypeInt24: "INT", + mysql.TypeLong: "INT", + mysql.TypeLonglong: "BIGINT", + mysql.TypeFloat: "FLOAT", + mysql.TypeDouble: "DOUBLE", + mysql.TypeBit: "BIT", + mysql.TypeNewDecimal: "DECIMAL", + mysql.TypeTinyBlob: "TEXT", + mysql.TypeMediumBlob: "TEXT", + mysql.TypeBlob: "TEXT", + mysql.TypeLongBlob: "TEXT", + mysql.TypeVarchar: "TEXT", + mysql.TypeVarString: "TEXT", + mysql.TypeString: "TEXT", + mysql.TypeEnum: "ENUM", + mysql.TypeSet: "SET", + mysql.TypeJSON: "JSON", + mysql.TypeDate: "DATE", + mysql.TypeDatetime: "DATETIME", + mysql.TypeTimestamp: "TIMESTAMP", + mysql.TypeDuration: "TIME", + mysql.TypeYear: "YEAR", + mysql.TypeTiDBVectorFloat32: "TiDBVECTORFloat32", } func getTiDBTypeFromColumn(col *model.Column) string { @@ -439,6 +440,8 @@ func mysqlTypeFromTiDBType(tidbType string) byte { result = mysql.TypeDuration case "YEAR": result = mysql.TypeYear + case "TiDBVECTORFloat32": + result = mysql.TypeTiDBVectorFloat32 default: log.Panic("this should not happen, unknown TiDB type", zap.String("type", tidbType)) } @@ -812,6 +815,11 @@ func (a *BatchEncoder) columnToAvroSchema( Type: "int", Parameters: map[string]string{tidbType: tt}, }, nil + case mysql.TypeTiDBVectorFloat32: + return avroSchema{ + Type: "string", + Parameters: map[string]string{tidbType: tt}, + }, nil default: log.Error("unknown mysql type", zap.Any("mysqlType", col.Type)) return nil, cerror.ErrAvroEncodeFailed.GenWithStack("unknown mysql type") @@ -971,6 +979,11 @@ func (a *BatchEncoder) columnToAvroData( return int32(n), "int", nil } return int32(col.Value.(int64)), "int", nil + case mysql.TypeTiDBVectorFloat32: + if vec, ok := col.Value.(types.VectorFloat32); ok { + return vec.String(), "string", nil + } + return nil, "", cerror.ErrAvroEncodeFailed default: log.Error("unknown mysql type", zap.Any("value", col.Value), zap.Any("mysqlType", col.Type)) return nil, "", cerror.ErrAvroEncodeFailed.GenWithStack("unknown mysql type") diff --git a/pkg/sink/codec/canal/canal_entry.go b/pkg/sink/codec/canal/canal_entry.go index e4724a525c1..ac58f8d1d7e 100644 --- a/pkg/sink/codec/canal/canal_entry.go +++ b/pkg/sink/codec/canal/canal_entry.go @@ -24,6 +24,7 @@ import ( mm "github.com/pingcap/tidb/pkg/parser/model" timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec/common" @@ -109,6 +110,8 @@ func (b *canalEntryBuilder) formatValue(value interface{}, isBinary bool) (resul } else { result = string(v) } + case types.VectorFloat32: + result = v.String() default: result = fmt.Sprintf("%v", v) } diff --git a/pkg/sink/codec/canal/canal_entry_test.go b/pkg/sink/codec/canal/canal_entry_test.go index 385c595dfcf..43ef22bec0c 100644 --- a/pkg/sink/codec/canal/canal_entry_test.go +++ b/pkg/sink/codec/canal/canal_entry_test.go @@ -35,10 +35,11 @@ func TestInsert(t *testing.T) { name varchar(32), tiny tinyint, comment text, - bb blob)` + bb blob, + vec vector(5))` _ = helper.DDL2Event(sql) - event := helper.DML2Event(`insert into test.t values(1, "Bob", 127, "测试", "测试blob")`, "test", "t") + event := helper.DML2Event(`insert into test.t values(1, "Bob", 127, "测试", "测试blob", '[1,2,3,4,5]')`, "test", "t") codecConfig := common.NewConfig(config.ProtocolCanalJSON) builder := newCanalEntryBuilder(codecConfig) @@ -97,6 +98,13 @@ func TestInsert(t *testing.T) { require.NoError(t, err) require.Equal(t, "测试blob", s) require.Equal(t, "blob", col.GetMysqlType()) + case "vec": + require.Equal(t, int32(internal.JavaSQLTypeVARCHAR), col.GetSqlType()) + require.False(t, col.GetIsKey()) + require.False(t, col.GetIsNull()) + require.NoError(t, err) + require.Equal(t, "[1,2,3,4,5]", col.GetValue()) + require.Equal(t, "vector", col.GetMysqlType()) } } } diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go index 3072af553dd..64e7b438188 100644 --- a/pkg/sink/codec/canal/canal_json_message.go +++ b/pkg/sink/codec/canal/canal_json_message.go @@ -287,6 +287,7 @@ func canalJSONFormatColumn(value interface{}, name string, mysqlTypeStr string) if err != nil { log.Panic("invalid column value for double", zap.Any("col", result), zap.Error(err)) } + case mysql.TypeTiDBVectorFloat32: } result.Value = value diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go index 9a6fafd22ba..d8682314733 100644 --- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go +++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "testing" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/compression" @@ -88,7 +89,12 @@ func TestDMLE2E(t *testing.T) { colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } err = encoder.AppendRowChangedEvent(ctx, "", updateEvent, func() {}) @@ -256,7 +262,12 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) { colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } } } @@ -667,7 +678,12 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) { colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := obtainedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } } @@ -722,7 +738,12 @@ func TestNewCanalJSONBatchDecoder4RowMessage(t *testing.T) { colName := insertEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } _, hasNext, _ = decoder.HasNext() diff --git a/pkg/sink/codec/common/verify_checksum.go b/pkg/sink/codec/common/verify_checksum.go index 2557b39ef8c..49c0ba37724 100644 --- a/pkg/sink/codec/common/verify_checksum.go +++ b/pkg/sink/codec/common/verify_checksum.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/log" timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" @@ -215,6 +216,9 @@ func buildChecksumBytes(buf []byte, value interface{}, mysqlType byte) ([]byte, // this should not happen, does not take into the checksum calculation. case mysql.TypeNull, mysql.TypeGeometry: // do nothing + case mysql.TypeTiDBVectorFloat32: + vec, _ := types.ParseVectorFloat32(value.(string)) + buf = vec.SerializeTo(buf) default: return buf, errors.New("invalid type for the checksum calculation") } diff --git a/pkg/sink/codec/craft/message_decoder.go b/pkg/sink/codec/craft/message_decoder.go index 81d7bd88509..016f8fd74f2 100644 --- a/pkg/sink/codec/craft/message_decoder.go +++ b/pkg/sink/codec/craft/message_decoder.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" ) @@ -353,6 +354,10 @@ func DecodeTiDBType(ty byte, flag model.ColumnFlagType, bits []byte) (interface{ fallthrough case mysql.TypeGeometry: return nil, nil + case mysql.TypeTiDBVectorFloat32: + if val, err := types.ParseVectorFloat32(string(bits)); err != nil { + return val, nil + } } return nil, nil } diff --git a/pkg/sink/codec/craft/message_encoder.go b/pkg/sink/codec/craft/message_encoder.go index 92fd3b3d5e4..8f887226bdc 100644 --- a/pkg/sink/codec/craft/message_encoder.go +++ b/pkg/sink/codec/craft/message_encoder.go @@ -19,6 +19,7 @@ import ( "unsafe" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" ) @@ -220,6 +221,9 @@ func EncodeTiDBType(allocator *SliceAllocator, ty byte, flag model.ColumnFlagTyp fallthrough case mysql.TypeGeometry: return nil + case mysql.TypeTiDBVectorFloat32: + vec := value.(types.VectorFloat32) + return []byte(vec.String()) } return nil } diff --git a/pkg/sink/codec/csv/csv_message.go b/pkg/sink/codec/csv/csv_message.go index 32486b2fe82..087daa9ce8a 100644 --- a/pkg/sink/codec/csv/csv_message.go +++ b/pkg/sink/codec/csv/csv_message.go @@ -362,6 +362,11 @@ func fromColValToCsvVal(csvConfig *common.Config, col *model.Column, ft *types.F return nil, cerror.WrapError(cerror.ErrCSVEncodeFailed, err) } return setVar.Name, nil + case mysql.TypeTiDBVectorFloat32: + if vec, ok := col.Value.(types.VectorFloat32); ok { + return vec.String(), nil + } + return nil, cerror.ErrCSVEncodeFailed default: return col.Value, nil } @@ -440,7 +445,6 @@ func csvMsg2RowChangedEvent(csvConfig *common.Config, csvMsg *csvMessage, tableI if err != nil { return nil, err } - return e, nil } diff --git a/pkg/sink/codec/csv/csv_message_test.go b/pkg/sink/codec/csv/csv_message_test.go index 57fe5f96010..3ce686bcb33 100644 --- a/pkg/sink/codec/csv/csv_message_test.go +++ b/pkg/sink/codec/csv/csv_message_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -629,6 +630,19 @@ var csvTestColumnsGroup = [][]*csvTestColumnTuple{ config.BinaryEncodingBase64, }, }, + { + { + model.Column{Name: "vectorfloat32", Value: util.Must(types.ParseVectorFloat32("[1,2,3,4,5]")), Type: mysql.TypeTiDBVectorFloat32}, + rowcodec.ColInfo{ + ID: 37, + IsPKHandle: false, + VirtualGenCol: false, + Ft: types.NewFieldType(mysql.TypeTiDBVectorFloat32), + }, + "[1,2,3,4,5]", + config.BinaryEncodingBase64, + }, + }, } func setBinChsClnFlag(ft *types.FieldType) *types.FieldType { diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go index 22a9bca7940..75c3ada818a 100644 --- a/pkg/sink/codec/debezium/codec.go +++ b/pkg/sink/codec/debezium/codec.go @@ -240,6 +240,13 @@ func (c *dbzCodec) writeDebeziumFieldSchema( writer.WriteStringField("field", col.Name) }) + case mysql.TypeTiDBVectorFloat32: + writer.WriteObjectElement(func() { + writer.WriteStringField("type", "string") + writer.WriteBoolField("optional", !mysql.HasNotNullFlag(ft.GetFlag())) + writer.WriteStringField("name", "io.debezium.data.TiDBVectorFloat32") + writer.WriteStringField("field", col.Name) + }) default: log.Warn( "meet unsupported field type", @@ -502,6 +509,11 @@ func (c *dbzCodec) writeDebeziumFieldValue( return nil } + case mysql.TypeTiDBVectorFloat32: + v := col.Value.(types.VectorFloat32).String() + writer.WriteStringField(col.Name, v) + return nil + // Note: Although Debezium's doc claims to use INT32 for INT, but it // actually uses INT64. Debezium also uses INT32 for SMALLINT. // So we only handle with TypeLonglong here. diff --git a/pkg/sink/codec/internal/column.go b/pkg/sink/codec/internal/column.go index 5aefe9d9a16..0070f3f138c 100644 --- a/pkg/sink/codec/internal/column.go +++ b/pkg/sink/codec/internal/column.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" "go.uber.org/zap" ) @@ -61,6 +62,8 @@ func (c *Column) FromRowChangeColumn(col *model.Column) { str = str[1 : len(str)-1] } c.Value = str + case mysql.TypeTiDBVectorFloat32: + c.Value = col.Value.(types.VectorFloat32).String() default: c.Value = col.Value } @@ -98,8 +101,8 @@ func (c *Column) ToRowChangeColumn(name string) *model.Column { zap.Any("col", c), zap.Error(err)) } col.Value = uint64(val) + case mysql.TypeTiDBVectorFloat32: default: - col.Value = c.Value } return col } diff --git a/pkg/sink/codec/internal/java.go b/pkg/sink/codec/internal/java.go index 38ec2e33f6c..b25421fb6ba 100644 --- a/pkg/sink/codec/internal/java.go +++ b/pkg/sink/codec/internal/java.go @@ -143,6 +143,9 @@ func MySQLType2JavaType(mysqlType byte, isBinary bool) JavaSQLType { case mysql.TypeJSON: return JavaSQLTypeVARCHAR + case mysql.TypeTiDBVectorFloat32: + return JavaSQLTypeVARCHAR + default: return JavaSQLTypeVARCHAR } diff --git a/pkg/sink/codec/maxwell/maxwell_message.go b/pkg/sink/codec/maxwell/maxwell_message.go index 9d0de05db53..6b48f5dd0e7 100644 --- a/pkg/sink/codec/maxwell/maxwell_message.go +++ b/pkg/sink/codec/maxwell/maxwell_message.go @@ -18,6 +18,7 @@ import ( model2 "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec/internal" @@ -84,6 +85,8 @@ func rowChangeToMaxwellMsg(e *model.RowChangedEvent, onlyHandleKeyColumns bool) } else { value.Old[colName] = string(v.Value.([]byte)) } + case mysql.TypeTiDBVectorFloat32: + value.Old[colName] = v.Value.(types.VectorFloat32).String() default: value.Old[colName] = v.Value } @@ -102,6 +105,8 @@ func rowChangeToMaxwellMsg(e *model.RowChangedEvent, onlyHandleKeyColumns bool) } else { value.Data[colName] = string(v.Value.([]byte)) } + case mysql.TypeTiDBVectorFloat32: + value.Data[colName] = v.Value.(types.VectorFloat32).String() default: value.Data[colName] = v.Value } @@ -129,6 +134,11 @@ func rowChangeToMaxwellMsg(e *model.RowChangedEvent, onlyHandleKeyColumns bool) value.Old[colName] = string(v.Value.([]byte)) } } + case mysql.TypeTiDBVectorFloat32: + val := v.Value.(types.VectorFloat32).String() + if value.Old[colName] != val { + value.Old[colName] = val + } default: if value.Data[colName] != v.Value { value.Old[colName] = v.Value @@ -282,6 +292,8 @@ func columnToMaxwellType(columnType byte) (string, error) { return "float", nil case mysql.TypeNewDecimal: return "decimal", nil + case mysql.TypeTiDBVectorFloat32: + return "string", nil default: return "", cerror.ErrMaxwellInvalidData.GenWithStack("unsupported column type - %v", columnType) } diff --git a/pkg/sink/codec/open/open_protocol_encoder_test.go b/pkg/sink/codec/open/open_protocol_encoder_test.go index f1833795999..5ad82506bd2 100644 --- a/pkg/sink/codec/open/open_protocol_encoder_test.go +++ b/pkg/sink/codec/open/open_protocol_encoder_test.go @@ -18,6 +18,7 @@ import ( "database/sql" "testing" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/compression" @@ -420,7 +421,12 @@ func TestE2EClaimCheckMessage(t *testing.T) { colName := insertEvent.TableInfo.ForceGetColumnName(column.ColumnID) decodedColumn, ok := decodedColumns[colName] require.True(t, ok) - require.Equal(t, column.Value, decodedColumn.Value, colName) + switch v := column.Value.(type) { + case types.VectorFloat32: + require.Equal(t, v.String(), decodedColumn.Value, colName) + default: + require.Equal(t, v, decodedColumn.Value, colName) + } } } diff --git a/pkg/sink/codec/simple/encoder_test.go b/pkg/sink/codec/simple/encoder_test.go index 5536bf4a8c4..59fb4fef7ea 100644 --- a/pkg/sink/codec/simple/encoder_test.go +++ b/pkg/sink/codec/simple/encoder_test.go @@ -27,6 +27,7 @@ import ( "github.com/golang/mock/gomock" timodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/compression" @@ -1387,7 +1388,12 @@ func TestEncodeLargeEventsNormal(t *testing.T) { colName := event.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } decodedPreviousColumns := make(map[string]*model.ColumnData, len(decodedRow.PreColumns)) @@ -1399,7 +1405,12 @@ func TestEncodeLargeEventsNormal(t *testing.T) { colName := event.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedPreviousColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value) + default: + require.EqualValues(t, v, decoded.Value) + } } } } @@ -1541,7 +1552,12 @@ func TestLargerMessageHandleClaimCheck(t *testing.T) { colName := updateEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value, colName) + default: + require.EqualValues(t, v, decoded.Value, colName) + } } for _, column := range decodedRow.PreColumns { @@ -1552,7 +1568,12 @@ func TestLargerMessageHandleClaimCheck(t *testing.T) { colName := updateEvent.TableInfo.ForceGetColumnName(col.ColumnID) decoded, ok := decodedColumns[colName] require.True(t, ok) - require.EqualValues(t, col.Value, decoded.Value) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.EqualValues(t, v.String(), decoded.Value, colName) + default: + require.EqualValues(t, v, decoded.Value, colName) + } } } } @@ -1723,11 +1744,18 @@ func TestLargeMessageHandleKeyOnly(t *testing.T) { case []byte: length := len(decoded.Value.([]uint8)) require.Equal(t, v[:length], decoded.Value, colName) + case types.VectorFloat32: + require.Equal(t, v.String(), decoded.Value, colName) default: - require.EqualValues(t, col.Value, decoded.Value, colName) + require.Equal(t, col.Value, decoded.Value, colName) } } else { - require.EqualValues(t, col.Value, decoded.Value, colName) + switch v := col.Value.(type) { + case []byte: + require.Equal(t, string(v), decoded.Value, colName) + default: + require.Equal(t, v, decoded.Value, colName) + } } } @@ -1746,11 +1774,20 @@ func TestLargeMessageHandleKeyOnly(t *testing.T) { case []byte: length := len(decoded.Value.([]uint8)) require.Equal(t, v[:length], decoded.Value, colName) + case types.VectorFloat32: + require.Equal(t, v.String(), decoded.Value, colName) default: - require.EqualValues(t, col.Value, decoded.Value, colName) + require.Equal(t, col.Value, decoded.Value, colName) } } else { - require.EqualValues(t, col.Value, decoded.Value, colName) + switch v := col.Value.(type) { + case types.VectorFloat32: + require.Equal(t, v.String(), decoded.Value, colName) + case []byte: + require.Equal(t, string(v), decoded.Value, colName) + default: + require.Equal(t, v, decoded.Value, colName) + } } } } diff --git a/pkg/sink/codec/simple/message.go b/pkg/sink/codec/simple/message.go index fe7ab25125b..4b0ff09d107 100644 --- a/pkg/sink/codec/simple/message.go +++ b/pkg/sink/codec/simple/message.go @@ -402,7 +402,7 @@ func buildRowChangedEvent( err := common.VerifyChecksum(result, db) if err != nil || msg.Checksum.Corrupted { log.Warn("consumer detect checksum corrupted", - zap.String("schema", msg.Schema), zap.String("table", msg.Table)) + zap.String("schema", msg.Schema), zap.String("table", msg.Table), zap.Error(err)) return nil, cerror.ErrDecodeFailed.GenWithStackByArgs("checksum corrupted") } @@ -638,6 +638,8 @@ func (a *avroMarshaller) encodeValue4Avro( return v, "double" case string: return v, "string" + case tiTypes.VectorFloat32: + return v.String(), "string" default: log.Panic("unexpected type for avro value", zap.Any("value", value)) } @@ -650,7 +652,6 @@ func encodeValue( if value == nil { return nil } - var err error switch ft.GetType() { case mysql.TypeBit: @@ -714,6 +715,8 @@ func encodeValue( } else { result = string(v) } + case tiTypes.VectorFloat32: + result = v.String() default: result = fmt.Sprintf("%v", v) } @@ -754,7 +757,18 @@ func decodeColumn(value interface{}, id int64, fieldType *types.FieldType) *mode case int64: value = uint64(v) } - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeInt24, mysql.TypeYear: + case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeInt24: + switch v := value.(type) { + case string: + if mysql.HasUnsignedFlag(fieldType.GetFlag()) { + value, err = strconv.ParseUint(v, 10, 64) + } else { + value, err = strconv.ParseInt(v, 10, 64) + } + default: + value = v + } + case mysql.TypeYear: switch v := value.(type) { case string: value, err = strconv.ParseInt(v, 10, 64) @@ -764,9 +778,10 @@ func decodeColumn(value interface{}, id int64, fieldType *types.FieldType) *mode case mysql.TypeLonglong: switch v := value.(type) { case string: - value, err = strconv.ParseInt(v, 10, 64) - if err != nil { + if mysql.HasUnsignedFlag(fieldType.GetFlag()) { value, err = strconv.ParseUint(v, 10, 64) + } else { + value, err = strconv.ParseInt(v, 10, 64) } case map[string]interface{}: value = uint64(v["value"].(int64)) @@ -776,7 +791,9 @@ func decodeColumn(value interface{}, id int64, fieldType *types.FieldType) *mode case mysql.TypeFloat: switch v := value.(type) { case string: - value, err = strconv.ParseFloat(v, 32) + var val float64 + val, err = strconv.ParseFloat(v, 32) + value = float32(val) default: value = v } diff --git a/pkg/sink/codec/utils/test_utils.go b/pkg/sink/codec/utils/test_utils.go index 4385b411d18..18c38c89186 100644 --- a/pkg/sink/codec/utils/test_utils.go +++ b/pkg/sink/codec/utils/test_utils.go @@ -81,6 +81,7 @@ func NewLargeEvent4Test(t *testing.T, replicaConfig *config.ReplicaConfig) (*mod enumT enum('a', 'b', 'c') default 'b', setT set('a', 'b', 'c') default 'c', bitT bit(10) default b'1010101010', + vectorT vector(5), jsonT json)` ddlEvent := helper.DDL2Event(sql) @@ -138,6 +139,7 @@ func NewLargeEvent4Test(t *testing.T, replicaConfig *config.ReplicaConfig) (*mod 'a', 'b', 65, + '[1,2,3,4,5]', '{"key1": "value1"}')` insert := helper.DML2Event(sql, "test", "t") @@ -217,5 +219,6 @@ var LargeTableColumns = map[string]interface{}{ "enumT": []uint8("a"), "setT": []uint8("b"), "bitT": []uint8{65}, + "vectorT": []uint8("[1,2,3,4,5]"), "jsonT": []uint8("{\"key1\": \"value1\"}"), } diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index 4adf68639a8..9e9f149eee3 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -77,6 +77,8 @@ const ( // defaultcachePrepStmts is the default value of cachePrepStmts defaultCachePrepStmts = true + + defaultHasVectorType = false ) type urlConfig struct { @@ -96,6 +98,7 @@ type urlConfig struct { EnableBatchDML *bool `form:"batch-dml-enable"` EnableMultiStatement *bool `form:"multi-stmt-enable"` EnableCachePreparedStatement *bool `form:"cache-prep-stmts"` + HasVectorType *bool `form:"has-vector-type"` } // Config is the configs for MySQL backend. @@ -117,6 +120,7 @@ type Config struct { // IsBDRModeSupported is true if the downstream is TiDB and write source is existed. // write source exists when the downstream is TiDB and version is greater than or equal to v6.5.0. IsWriteSourceExisted bool + HasVectorType bool // HasVectorType is true if the column is vector type SourceID uint64 BatchDMLEnable bool @@ -140,6 +144,7 @@ func NewConfig() *Config { MultiStmtEnable: defaultMultiStmtEnable, CachePrepStmts: defaultCachePrepStmts, SourceID: config.DefaultTiDBSourceID, + HasVectorType: defaultHasVectorType, } } @@ -197,6 +202,7 @@ func (c *Config) Apply( } getBatchDMLEnable(urlParameter, &c.BatchDMLEnable) + getHasVectorType(urlParameter, &c.HasVectorType) getMultiStmtEnable(urlParameter, &c.MultiStmtEnable) getCachePrepStmts(urlParameter, &c.CachePrepStmts) c.ForceReplicate = replicaConfig.ForceReplicate @@ -446,6 +452,12 @@ func getBatchDMLEnable(values *urlConfig, batchDMLEnable *bool) { } } +func getHasVectorType(values *urlConfig, hasVectorType *bool) { + if values.HasVectorType != nil { + *hasVectorType = *values.HasVectorType + } +} + func getMultiStmtEnable(values *urlConfig, multiStmtEnable *bool) { if values.EnableMultiStatement != nil { *multiStmtEnable = *values.EnableMultiStatement diff --git a/scripts/download-integration-test-binaries.sh b/scripts/download-integration-test-binaries.sh index 537dd260c1b..04b4aaf78b5 100755 --- a/scripts/download-integration-test-binaries.sh +++ b/scripts/download-integration-test-binaries.sh @@ -108,26 +108,41 @@ function download_community_binaries() { chmod a+x third_bin/* } +function get_sha1() { + local repo="$1" + local branch="$2" + file_server_url="http://fileserver.pingcap.net" + sha1=$(curl -s "${file_server_url}/download/refs/pingcap/${repo}/${branch}/sha1") + if [ $? -ne 0 ] || echo "$sha1" | grep -q "Error"; then + echo "Failed to get sha1 with repo ${repo} branch ${branch}: $sha1. use branch master to instead" >&2 + branch=master + sha1=$(curl -s "${file_server_url}/download/refs/pingcap/${repo}/${branch}/sha1") + fi + echo "$branch:$sha1" +} + function download_binaries() { color-green "Download binaries..." # PingCAP file server URL. file_server_url="http://fileserver.pingcap.net" # Get sha1 based on branch name. - tidb_sha1=$(curl "${file_server_url}/download/refs/pingcap/tidb/${branch}/sha1") - tikv_sha1=$(curl "${file_server_url}/download/refs/pingcap/tikv/${branch}/sha1") - pd_sha1=$(curl "${file_server_url}/download/refs/pingcap/pd/${branch}/sha1") - tiflash_sha1=$(curl "${file_server_url}/download/refs/pingcap/tiflash/${branch}/sha1") + tidb_sha1=$(get_sha1 "tidb" "$branch" | cut -d':' -f2) + tikv_sha1=$(get_sha1 "tikv" "$branch" | cut -d':' -f2) + pd_sha1=$(get_sha1 "pd" "$branch" | cut -d':' -f2) + tiflash_branch_sha1=$(get_sha1 "tiflash" "$branch") + tiflash_branch=$(echo "$tiflash_branch_sha1" | cut -d':' -f1) + tiflash_sha1=$(echo "$tiflash_branch_sha1" | cut -d':' -f2) # All download links. tidb_download_url="${file_server_url}/download/builds/pingcap/tidb/${tidb_sha1}/centos7/tidb-server.tar.gz" tikv_download_url="${file_server_url}/download/builds/pingcap/tikv/${tikv_sha1}/centos7/tikv-server.tar.gz" pd_download_url="${file_server_url}/download/builds/pingcap/pd/${pd_sha1}/centos7/pd-server.tar.gz" - tiflash_download_url="${file_server_url}/download/builds/pingcap/tiflash/${branch}/${tiflash_sha1}/centos7/tiflash.tar.gz" + tiflash_download_url="${file_server_url}/download/builds/pingcap/tiflash/${tiflash_branch}/${tiflash_sha1}/centos7/tiflash.tar.gz" minio_download_url="${file_server_url}/download/minio.tar.gz" go_ycsb_download_url="${file_server_url}/download/builds/pingcap/go-ycsb/test-br/go-ycsb" etcd_download_url="${file_server_url}/download/builds/pingcap/cdc/etcd-v3.4.7-linux-amd64.tar.gz" - sync_diff_inspector_url="${file_server_url}/download/builds/pingcap/cdc/sync_diff_inspector_hash-79f1fd1e_linux-amd64.tar.gz" + sync_diff_inspector_url="${file_server_url}/download/builds/pingcap/cdc/sync_diff_inspector_hash-a129f096_linux-amd64.tar.gz" jq_download_url="${file_server_url}/download/builds/pingcap/test/jq-1.6/jq-linux64" schema_registry_url="${file_server_url}/download/builds/pingcap/cdc/schema-registry.tar.gz" diff --git a/tests/integration_tests/kafka_simple_basic/data/data.sql b/tests/integration_tests/kafka_simple_basic/data/data.sql index 71709dbafed..426c352b589 100644 --- a/tests/integration_tests/kafka_simple_basic/data/data.sql +++ b/tests/integration_tests/kafka_simple_basic/data/data.sql @@ -179,7 +179,7 @@ alter table tp_time add column c_timestamp2 timestamp default now(); insert into tp_time(c_date, c_datetime, c_timestamp, c_time, c_year) values ('2024-03-09', '2022-02-22 22:22:22', '2020-02-20 02:20:20', '02:20:20', '2021'); -delete from tp_time where id in (2, 3, 4); +delete from tp_time where id in (1, 2, 3, 4, 5); alter table tp_time alter column c_timestamp2 drop default; diff --git a/tests/integration_tests/kafka_simple_basic_avro/data/data.sql b/tests/integration_tests/kafka_simple_basic_avro/data/data.sql index d52461d6c92..8317cf4ee32 100644 --- a/tests/integration_tests/kafka_simple_basic_avro/data/data.sql +++ b/tests/integration_tests/kafka_simple_basic_avro/data/data.sql @@ -231,7 +231,7 @@ alter table tp_time add column c_timestamp2 timestamp default now(); insert into tp_time(c_date, c_datetime, c_timestamp, c_time, c_year) values ('2024-03-09', '2022-02-22 22:22:22', '2020-02-20 02:20:20', '02:20:20', '2021'); -delete from tp_time where id in (2, 3, 4); +delete from tp_time where id in (1, 2, 3, 4, 5); alter table tp_time alter column c_timestamp2 drop default; diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 991ba3d188a..df2ab20ed4d 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -42,7 +42,7 @@ groups=( # G04 'foreign_key ddl_puller_lag ddl_only_block_related_table changefeed_auto_stop' # G05 - 'charset_gbk ddl_manager multi_source' + 'charset_gbk ddl_manager multi_source vector' # G06 'sink_retry changefeed_error ddl_sequence resourcecontrol' # G07 pulsar oauth2 authentication enabled diff --git a/tests/integration_tests/vector/conf/diff_config.toml b/tests/integration_tests/vector/conf/diff_config.toml new file mode 100644 index 00000000000..d37f42f32c8 --- /dev/null +++ b/tests/integration_tests/vector/conf/diff_config.toml @@ -0,0 +1,30 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/vector/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/vector/data/data.sql b/tests/integration_tests/vector/data/data.sql new file mode 100644 index 00000000000..daa94d721ee --- /dev/null +++ b/tests/integration_tests/vector/data/data.sql @@ -0,0 +1,34 @@ +DROP DATABASE IF EXISTS test; +CREATE DATABASE test; +use test; +DROP TABLE IF EXISTS test.simple1; +DROP TABLE IF EXISTS test.simple2; + +CREATE TABLE test.simple1(id int primary key, data VECTOR(5)); +-- CREATE VECTOR INDEX idx_name1 USING HNSW ON test.simple1(VEC_COSINE_DISTANCE(data)) ; +INSERT INTO test.simple1(id, data) VALUES (1, "[1,2,3,4,5]"); +INSERT INTO test.simple1(id, data) VALUES (2, '[2,3,4,5,6]'); +INSERT INTO test.simple1(id, data) VALUES (3, '[0.1,0.2,0.3,0.4,0.5]'); +INSERT INTO test.simple1(id, data) VALUES (4, '[0,-0.1,-2,2,0.1]'); + + +CREATE TABLE test.simple2(id int primary key, data VECTOR(5), embedding VECTOR(5) COMMENT "hnsw(distance=cosine)"); +INSERT INTO test.simple2(id, data, embedding) VALUES (1, '[1,2,3,4,5]','[1,2,3,4,5]'); +INSERT INTO test.simple2(id, data, embedding) VALUES (2, '[2,3,4,5,6]','[1,2,3,4,5]'); +INSERT INTO test.simple2(id, data, embedding) VALUES (3, '[0.1,0.2,0.3,0.4,0.5]','[1,2,3,4,5]'); +INSERT INTO test.simple2(id, data, embedding) VALUES (4, '[0,-0.1,-2,2,0.1]','[1,2,3,4,5]'); + +DELETE FROM test.simple1 where id=1; +DELETE FROM test.simple2 where id=1; +DELETE FROM test.simple1 where id=2; +DELETE FROM test.simple2 where id=2; + +UPDATE test.simple1 SET data = '[0,-0.1,-2,2.0,0.1]' WHERE id = 3; +UPDATE test.simple2 SET data = '[0,-0.1,-2,2.0,0.1]' WHERE id = 3; + +ALTER TABLE test.simple1 ADD column embedding VECTOR(3) COMMENT "hnsw(distance=cosine)"; +INSERT INTO test.simple1 (id, data, embedding) VALUES (5, '[1,2,3,4,5]', '[1,2,3]'); + +ALTER TABLE test.simple2 DROP column embedding; + +CREATE TABLE test.finish_mark(id int primary key); \ No newline at end of file diff --git a/tests/integration_tests/vector/run.sh b/tests/integration_tests/vector/run.sh new file mode 100755 index 00000000000..aedcd2bccec --- /dev/null +++ b/tests/integration_tests/vector/run.sh @@ -0,0 +1,44 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + TOPIC_NAME="vector-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&max-message-bytes=10485760" ;; + storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;; + esac + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + case $SINK_TYPE in + kafka) run_kafka_consumer $WORK_DIR $SINK_URI ;; + storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;; + esac + run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + sleep 3 + # sync_diff can't check non-exist table, so we check expected tables are created in downstream first + check_table_exists test.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"