diff --git a/go.mod b/go.mod index 9eff0323dba5..3611b7e90a4e 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.7 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497 + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34 github.com/minio/minio-go/v7 v7.0.61 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 @@ -56,6 +56,7 @@ require ( ) require ( + cloud.google.com/go/storage v1.43.0 github.com/bits-and-blooms/bitset v1.10.0 github.com/bytedance/sonic v1.12.2 github.com/cenkalti/backoff/v4 v4.2.1 @@ -70,6 +71,7 @@ require ( github.com/valyala/fastjson v1.6.4 github.com/zeebo/xxh3 v1.0.2 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 + google.golang.org/api v0.187.0 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -80,7 +82,6 @@ require ( cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect cloud.google.com/go/compute/metadata v0.3.0 // indirect cloud.google.com/go/iam v1.1.8 // indirect - cloud.google.com/go/storage v1.43.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/99designs/keyring v1.2.1 // indirect github.com/AthenZ/athenz v1.10.39 // indirect @@ -246,7 +247,6 @@ require ( golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gonum.org/v1/gonum v0.11.0 // indirect - google.golang.org/api v0.187.0 // indirect google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect diff --git a/go.sum b/go.sum index b5108edda424..8adc52e61522 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1 cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk= cloud.google.com/go/iam v1.1.8 h1:r7umDwhj+BQyz0ScZMp4QrGXjSTI3ZINnpgU2nlB/K0= cloud.google.com/go/iam v1.1.8/go.mod h1:GvE6lyMmfxXauzNq8NbgJbeVQNspG+tcdL/W8QO1+zE= +cloud.google.com/go/longrunning v0.5.7 h1:WLbHekDbjK1fVFD3ibpFFVoyizlLRl73I7YKuAKilhU= +cloud.google.com/go/longrunning v0.5.7/go.mod h1:8GClkudohy1Fxm3owmBGid8W0pSgodEMwEAztp38Xng= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= @@ -412,9 +414,12 @@ github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= +github.com/google/martian/v3 v3.3.3 h1:DIhPTQrbPkgs2yJYdXU/eNACCG5DVQjySNRNlflZ9Fc= +github.com/google/martian/v3 v3.3.3/go.mod h1:iEPrYcgCF7jA9OtScMFQyAlZZ4YXTKEtJ1E6RWzmBA0= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= @@ -620,8 +625,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497 h1:t4sQMbSy05p8qgMGvEGyLYYLoZ9fD1dushS1bj5X6+0= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34 h1:Fwxpg98128gfWRbQ1A3PMP9o2IfYZk7RSEy8rcoCWDA= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= @@ -1143,8 +1148,6 @@ golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo= -golang.org/x/oauth2 v0.20.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1417,12 +1420,8 @@ google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaE google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24= google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= -google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 h1:wpZ8pe2x1Q3f2KyT5f8oP/fa9rHAKgFPr/HZdNuS+PQ= -google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:J7XzRzVy1+IPwWHZUzoD0IccYZIrXILAQpc+Qy9CMhY= google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d h1:PksQg4dV6Sem3/HkBX+Ltq8T0ke0PKIRBNBatoDTVls= google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:s7iA721uChleev562UJO2OYB0PPT9CMFjV+Ce7VJH5M= -google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw= -google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 h1:MuYw1wJzT+ZkybKfaOXKp5hJiZDn2iHaXRw0mRYdHSc= google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4/go.mod h1:px9SlOOZBg1wM1zdnr8jEL4CNGUBZ+ZKYtNPApNQc4c= google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q= diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 49254d1a8be0..bb006b573217 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -23,8 +23,6 @@ import ( "strconv" "time" - "github.com/milvus-io/milvus/internal/storage" - "github.com/cockroachdb/errors" "github.com/samber/lo" "go.opentelemetry.io/otel/trace" @@ -35,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index 6a11089741df..f3cfdfaa5775 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -24,8 +24,6 @@ import ( "testing" "time" - "github.com/milvus-io/milvus/pkg/util/metautil" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "go.uber.org/atomic" @@ -41,6 +39,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metautil" ) func TestClusteringCompactionTaskSuite(t *testing.T) { diff --git a/internal/proxy/search_util.go b/internal/proxy/search_util.go index d499872b19ba..28276dc4145e 100644 --- a/internal/proxy/search_util.go +++ b/internal/proxy/search_util.go @@ -75,22 +75,29 @@ func (r *rankParams) String() string { return fmt.Sprintf("limit: %d, offset: %d, roundDecimal: %d", r.GetLimit(), r.GetOffset(), r.GetRoundDecimal()) } +type SearchInfo struct { + planInfo *planpb.QueryInfo + offset int64 + parseError error + isIterator bool +} + // parseSearchInfo returns QueryInfo and offset -func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb.CollectionSchema, rankParams *rankParams) (*planpb.QueryInfo, int64, error) { +func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb.CollectionSchema, rankParams *rankParams) *SearchInfo { var topK int64 isAdvanced := rankParams != nil externalLimit := rankParams.GetLimit() + rankParams.GetOffset() topKStr, err := funcutil.GetAttrByKeyFromRepeatedKV(TopKKey, searchParamsPair) if err != nil { if externalLimit <= 0 { - return nil, 0, fmt.Errorf("%s is required", TopKKey) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s is required", TopKKey)} } topK = externalLimit } else { topKInParam, err := strconv.ParseInt(topKStr, 0, 64) if err != nil { if externalLimit <= 0 { - return nil, 0, fmt.Errorf("%s [%s] is invalid", TopKKey, topKStr) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%s] is invalid", TopKKey, topKStr)} } topK = externalLimit } else { @@ -106,7 +113,7 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb // 2. GetAsInt64 has cached inside, no need to worry about cpu cost for parsing here topK = Params.QuotaConfig.TopKLimit.GetAsInt64() } else { - return nil, 0, fmt.Errorf("%s [%d] is invalid, %w", TopKKey, topK, err) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%d] is invalid, %w", TopKKey, topK, err)} } } @@ -117,12 +124,12 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb if err == nil { offset, err = strconv.ParseInt(offsetStr, 0, 64) if err != nil { - return nil, 0, fmt.Errorf("%s [%s] is invalid", OffsetKey, offsetStr) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%s] is invalid", OffsetKey, offsetStr)} } if offset != 0 { if err := validateLimit(offset); err != nil { - return nil, 0, fmt.Errorf("%s [%d] is invalid, %w", OffsetKey, offset, err) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%d] is invalid, %w", OffsetKey, offset, err)} } } } @@ -130,7 +137,7 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb queryTopK := topK + offset if err := validateLimit(queryTopK); err != nil { - return nil, 0, fmt.Errorf("%s+%s [%d] is invalid, %w", OffsetKey, TopKKey, queryTopK, err) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s+%s [%d] is invalid, %w", OffsetKey, TopKKey, queryTopK, err)} } // 2. parse metrics type @@ -147,11 +154,11 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb roundDecimal, err := strconv.ParseInt(roundDecimalStr, 0, 64) if err != nil { - return nil, 0, fmt.Errorf("%s [%s] is invalid, should be -1 or an integer in range [0, 6]", RoundDecimalKey, roundDecimalStr) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%s] is invalid, should be -1 or an integer in range [0, 6]", RoundDecimalKey, roundDecimalStr)} } if roundDecimal != -1 && (roundDecimal > 6 || roundDecimal < 0) { - return nil, 0, fmt.Errorf("%s [%s] is invalid, should be -1 or an integer in range [0, 6]", RoundDecimalKey, roundDecimalStr) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%s] is invalid, should be -1 or an integer in range [0, 6]", RoundDecimalKey, roundDecimalStr)} } // 4. parse search param str @@ -168,30 +175,35 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb } else { groupByInfo := parseGroupByInfo(searchParamsPair, schema) if groupByInfo.err != nil { - return nil, 0, groupByInfo.err + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: groupByInfo.err} } groupByFieldId, groupSize, groupStrictSize = groupByInfo.GetGroupByFieldId(), groupByInfo.GetGroupSize(), groupByInfo.GetGroupStrictSize() } // 6. parse iterator tag, prevent trying to groupBy when doing iteration or doing range-search if isIterator == "True" && groupByFieldId > 0 { - return nil, 0, merr.WrapErrParameterInvalid("", "", - "Not allowed to do groupBy when doing iteration") + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: merr.WrapErrParameterInvalid("", "", + "Not allowed to do groupBy when doing iteration")} } if strings.Contains(searchParamStr, radiusKey) && groupByFieldId > 0 { - return nil, 0, merr.WrapErrParameterInvalid("", "", - "Not allowed to do range-search when doing search-group-by") - } - - return &planpb.QueryInfo{ - Topk: queryTopK, - MetricType: metricType, - SearchParams: searchParamStr, - RoundDecimal: roundDecimal, - GroupByFieldId: groupByFieldId, - GroupSize: groupSize, - GroupStrictSize: groupStrictSize, - }, offset, nil + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: merr.WrapErrParameterInvalid("", "", + "Not allowed to do range-search when doing search-group-by")} + } + + return &SearchInfo{ + planInfo: &planpb.QueryInfo{ + Topk: queryTopK, + MetricType: metricType, + SearchParams: searchParamStr, + RoundDecimal: roundDecimal, + GroupByFieldId: groupByFieldId, + GroupSize: groupSize, + GroupStrictSize: groupStrictSize, + }, + offset: offset, + isIterator: isIterator == "True", + parseError: nil, + } } func getOutputFieldIDs(schema *schemaInfo, outputFields []string) (outputFieldIDs []UniqueID, err error) { diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index a79af7e7d752..8188274259de 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -79,6 +79,7 @@ type queryParams struct { limit int64 offset int64 reduceType reduce.IReduceType + isIterator bool } // translateToOutputFieldIDs translates output fields name to output fields id. @@ -178,7 +179,7 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e limitStr, err := funcutil.GetAttrByKeyFromRepeatedKV(LimitKey, queryParamsPair) // if limit is not provided if err != nil { - return &queryParams{limit: typeutil.Unlimited, reduceType: reduceType}, nil + return &queryParams{limit: typeutil.Unlimited, reduceType: reduceType, isIterator: isIterator}, nil } limit, err = strconv.ParseInt(limitStr, 0, 64) if err != nil { @@ -203,6 +204,7 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e limit: limit, offset: offset, reduceType: reduceType, + isIterator: isIterator, }, nil } @@ -461,6 +463,11 @@ func (t *queryTask) PreExecute(ctx context.Context) error { } } t.GuaranteeTimestamp = guaranteeTs + // need modify mvccTs and guaranteeTs for iterator specially + if t.queryParams.isIterator && t.request.GetGuaranteeTimestamp() > 0 { + t.MvccTimestamp = t.request.GetGuaranteeTimestamp() + t.GuaranteeTimestamp = t.request.GetGuaranteeTimestamp() + } deadline, ok := t.TraceCtx().Deadline() if ok { @@ -542,6 +549,10 @@ func (t *queryTask) PostExecute(ctx context.Context) error { t.result.OutputFields = t.userOutputFields metrics.ProxyReduceResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel).Observe(float64(tr.RecordSpan().Milliseconds())) + if t.queryParams.isIterator && t.request.GetGuaranteeTimestamp() == 0 { + // first page for iteration, need to set up sessionTs for iterator + t.result.SessionTs = t.BeginTs() + } log.Debug("Query PostExecute done") return nil } diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index ccd4e591ca81..341d0cd6ed9c 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -80,6 +80,8 @@ type searchTask struct { reScorers []reScorer rankParams *rankParams groupScorer func(group *Group) error + + isIterator bool } func (t *searchTask) CanSkipAllocTimestamp() bool { @@ -249,6 +251,10 @@ func (t *searchTask) PreExecute(ctx context.Context) error { } t.SearchRequest.GuaranteeTimestamp = guaranteeTs t.SearchRequest.ConsistencyLevel = consistencyLevel + if t.isIterator && t.request.GetGuaranteeTimestamp() > 0 { + t.MvccTimestamp = t.request.GetGuaranteeTimestamp() + t.GuaranteeTimestamp = t.request.GetGuaranteeTimestamp() + } if deadline, ok := t.TraceCtx().Deadline(); ok { t.SearchRequest.TimeoutTimestamp = tsoutil.ComposeTSByTime(deadline, 0) @@ -351,7 +357,7 @@ func (t *searchTask) initAdvancedSearchRequest(ctx context.Context) error { t.SearchRequest.SubReqs = make([]*internalpb.SubSearchRequest, len(t.request.GetSubReqs())) t.queryInfos = make([]*planpb.QueryInfo, len(t.request.GetSubReqs())) for index, subReq := range t.request.GetSubReqs() { - plan, queryInfo, offset, err := t.tryGeneratePlan(subReq.GetSearchParams(), subReq.GetDsl()) + plan, queryInfo, offset, _, err := t.tryGeneratePlan(subReq.GetSearchParams(), subReq.GetDsl()) if err != nil { return err } @@ -443,11 +449,12 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error { log := log.Ctx(ctx).With(zap.Int64("collID", t.GetCollectionID()), zap.String("collName", t.collectionName)) // fetch search_growing from search param - plan, queryInfo, offset, err := t.tryGeneratePlan(t.request.GetSearchParams(), t.request.GetDsl()) + plan, queryInfo, offset, isIterator, err := t.tryGeneratePlan(t.request.GetSearchParams(), t.request.GetDsl()) if err != nil { return err } + t.isIterator = isIterator t.SearchRequest.Offset = offset if t.partitionKeyMode { @@ -490,38 +497,38 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error { return nil } -func (t *searchTask) tryGeneratePlan(params []*commonpb.KeyValuePair, dsl string) (*planpb.PlanNode, *planpb.QueryInfo, int64, error) { +func (t *searchTask) tryGeneratePlan(params []*commonpb.KeyValuePair, dsl string) (*planpb.PlanNode, *planpb.QueryInfo, int64, bool, error) { annsFieldName, err := funcutil.GetAttrByKeyFromRepeatedKV(AnnsFieldKey, params) if err != nil || len(annsFieldName) == 0 { vecFields := typeutil.GetVectorFieldSchemas(t.schema.CollectionSchema) if len(vecFields) == 0 { - return nil, nil, 0, errors.New(AnnsFieldKey + " not found in schema") + return nil, nil, 0, false, errors.New(AnnsFieldKey + " not found in schema") } if enableMultipleVectorFields && len(vecFields) > 1 { - return nil, nil, 0, errors.New("multiple anns_fields exist, please specify a anns_field in search_params") + return nil, nil, 0, false, errors.New("multiple anns_fields exist, please specify a anns_field in search_params") } annsFieldName = vecFields[0].Name } - queryInfo, offset, parseErr := parseSearchInfo(params, t.schema.CollectionSchema, t.rankParams) - if parseErr != nil { - return nil, nil, 0, parseErr + searchInfo := parseSearchInfo(params, t.schema.CollectionSchema, t.rankParams) + if searchInfo.parseError != nil { + return nil, nil, 0, false, searchInfo.parseError } annField := typeutil.GetFieldByName(t.schema.CollectionSchema, annsFieldName) - if queryInfo.GetGroupByFieldId() != -1 && annField.GetDataType() == schemapb.DataType_BinaryVector { - return nil, nil, 0, errors.New("not support search_group_by operation based on binary vector column") + if searchInfo.planInfo.GetGroupByFieldId() != -1 && annField.GetDataType() == schemapb.DataType_BinaryVector { + return nil, nil, 0, false, errors.New("not support search_group_by operation based on binary vector column") } - plan, planErr := planparserv2.CreateSearchPlan(t.schema.schemaHelper, dsl, annsFieldName, queryInfo) + plan, planErr := planparserv2.CreateSearchPlan(t.schema.schemaHelper, dsl, annsFieldName, searchInfo.planInfo) if planErr != nil { log.Warn("failed to create query plan", zap.Error(planErr), zap.String("dsl", dsl), // may be very large if large term passed. - zap.String("anns field", annsFieldName), zap.Any("query info", queryInfo)) - return nil, nil, 0, merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", planErr) + zap.String("anns field", annsFieldName), zap.Any("query info", searchInfo.planInfo)) + return nil, nil, 0, false, merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", planErr) } log.Debug("create query plan", zap.String("dsl", t.request.Dsl), // may be very large if large term passed. - zap.String("anns field", annsFieldName), zap.Any("query info", queryInfo)) - return plan, queryInfo, offset, nil + zap.String("anns field", annsFieldName), zap.Any("query info", searchInfo.planInfo)) + return plan, searchInfo.planInfo, searchInfo.offset, searchInfo.isIterator, nil } func (t *searchTask) tryParsePartitionIDsFromPlan(plan *planpb.PlanNode) ([]int64, error) { @@ -714,6 +721,10 @@ func (t *searchTask) PostExecute(ctx context.Context) error { } t.result.Results.OutputFields = t.userOutputFields t.result.CollectionName = t.request.GetCollectionName() + if t.isIterator && t.request.GetGuaranteeTimestamp() == 0 { + // first page for iteration, need to set up sessionTs for iterator + t.result.SessionTs = t.BeginTs() + } metrics.ProxyReduceResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Observe(float64(tr.RecordSpan().Milliseconds())) diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index 70799c9fde53..1dcdf0fe2d82 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -2235,11 +2235,11 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - info, offset, err := parseSearchInfo(test.validParams, nil, nil) - assert.NoError(t, err) - assert.NotNil(t, info) + searchInfo := parseSearchInfo(test.validParams, nil, nil) + assert.NoError(t, searchInfo.parseError) + assert.NotNil(t, searchInfo.planInfo) if test.description == "offsetParam" { - assert.Equal(t, targetOffset, offset) + assert.Equal(t, targetOffset, searchInfo.offset) } }) } @@ -2256,11 +2256,11 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { limit: externalLimit, } - info, offset, err := parseSearchInfo(offsetParam, nil, rank) - assert.NoError(t, err) - assert.NotNil(t, info) - assert.Equal(t, int64(10), info.GetTopk()) - assert.Equal(t, int64(0), offset) + searchInfo := parseSearchInfo(offsetParam, nil, rank) + assert.NoError(t, searchInfo.parseError) + assert.NotNil(t, searchInfo.planInfo) + assert.Equal(t, int64(10), searchInfo.planInfo.GetTopk()) + assert.Equal(t, int64(0), searchInfo.offset) }) t.Run("parseSearchInfo groupBy info for hybrid search", func(t *testing.T) { @@ -2309,15 +2309,15 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { Value: "true", }) - info, _, err := parseSearchInfo(params, schema, testRankParams) - assert.NoError(t, err) - assert.NotNil(t, info) + searchInfo := parseSearchInfo(params, schema, testRankParams) + assert.NoError(t, searchInfo.parseError) + assert.NotNil(t, searchInfo.planInfo) // all group_by related parameters should be aligned to parameters // set by main request rather than inner sub request - assert.Equal(t, int64(101), info.GetGroupByFieldId()) - assert.Equal(t, int64(3), info.GetGroupSize()) - assert.False(t, info.GetGroupStrictSize()) + assert.Equal(t, int64(101), searchInfo.planInfo.GetGroupByFieldId()) + assert.Equal(t, int64(3), searchInfo.planInfo.GetGroupSize()) + assert.False(t, searchInfo.planInfo.GetGroupStrictSize()) }) t.Run("parseSearchInfo error", func(t *testing.T) { @@ -2399,12 +2399,12 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - info, offset, err := parseSearchInfo(test.invalidParams, nil, nil) - assert.Error(t, err) - assert.Nil(t, info) - assert.Zero(t, offset) + searchInfo := parseSearchInfo(test.invalidParams, nil, nil) + assert.Error(t, searchInfo.parseError) + assert.Nil(t, searchInfo.planInfo) + assert.Zero(t, searchInfo.offset) - t.Logf("err=%s", err.Error()) + t.Logf("err=%s", searchInfo.parseError) }) } }) @@ -2426,9 +2426,9 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { schema := &schemapb.CollectionSchema{ Fields: fields, } - info, _, err := parseSearchInfo(normalParam, schema, nil) - assert.Nil(t, info) - assert.ErrorIs(t, err, merr.ErrParameterInvalid) + searchInfo := parseSearchInfo(normalParam, schema, nil) + assert.Nil(t, searchInfo.planInfo) + assert.ErrorIs(t, searchInfo.parseError, merr.ErrParameterInvalid) }) t.Run("check range-search and groupBy", func(t *testing.T) { normalParam := getValidSearchParams() @@ -2445,9 +2445,9 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { schema := &schemapb.CollectionSchema{ Fields: fields, } - info, _, err := parseSearchInfo(normalParam, schema, nil) - assert.Nil(t, info) - assert.ErrorIs(t, err, merr.ErrParameterInvalid) + searchInfo := parseSearchInfo(normalParam, schema, nil) + assert.Nil(t, searchInfo.planInfo) + assert.ErrorIs(t, searchInfo.parseError, merr.ErrParameterInvalid) }) t.Run("check nullable and groupBy", func(t *testing.T) { normalParam := getValidSearchParams() @@ -2464,9 +2464,9 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { schema := &schemapb.CollectionSchema{ Fields: fields, } - info, _, err := parseSearchInfo(normalParam, schema, nil) - assert.Nil(t, info) - assert.ErrorIs(t, err, merr.ErrParameterInvalid) + searchInfo := parseSearchInfo(normalParam, schema, nil) + assert.Nil(t, searchInfo.planInfo) + assert.ErrorIs(t, searchInfo.parseError, merr.ErrParameterInvalid) }) t.Run("check iterator and topK", func(t *testing.T) { normalParam := getValidSearchParams() @@ -2483,10 +2483,10 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { schema := &schemapb.CollectionSchema{ Fields: fields, } - info, _, err := parseSearchInfo(normalParam, schema, nil) - assert.NotNil(t, info) - assert.NoError(t, err) - assert.Equal(t, Params.QuotaConfig.TopKLimit.GetAsInt64(), info.Topk) + searchInfo := parseSearchInfo(normalParam, schema, nil) + assert.NotNil(t, searchInfo.planInfo) + assert.NoError(t, searchInfo.parseError) + assert.Equal(t, Params.QuotaConfig.TopKLimit.GetAsInt64(), searchInfo.planInfo.GetTopk()) }) t.Run("check max group size", func(t *testing.T) { @@ -2503,15 +2503,15 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { schema := &schemapb.CollectionSchema{ Fields: fields, } - info, _, err := parseSearchInfo(normalParam, schema, nil) - assert.Nil(t, info) - assert.Error(t, err) - assert.True(t, strings.Contains(err.Error(), "exceeds configured max group size")) + searchInfo := parseSearchInfo(normalParam, schema, nil) + assert.Nil(t, searchInfo.planInfo) + assert.Error(t, searchInfo.parseError) + assert.True(t, strings.Contains(searchInfo.parseError.Error(), "exceeds configured max group size")) resetSearchParamsValue(normalParam, GroupSizeKey, `10`) - info, _, err = parseSearchInfo(normalParam, schema, nil) - assert.NotNil(t, info) - assert.NoError(t, err) + searchInfo = parseSearchInfo(normalParam, schema, nil) + assert.NotNil(t, searchInfo.planInfo) + assert.NoError(t, searchInfo.parseError) }) } diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index b04003ac12de..1d61206a26d1 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -402,7 +402,7 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq // wait tsafe waitTr := timerecord.NewTimeRecorder("wait tSafe") - tSafe, err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp) + tSafe, err := sd.waitTSafe(ctx, req.Req.GetGuaranteeTimestamp()) if err != nil { log.Warn("delegator query failed to wait tsafe", zap.Error(err)) return err @@ -473,7 +473,7 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest) // wait tsafe waitTr := timerecord.NewTimeRecorder("wait tSafe") - tSafe, err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp) + tSafe, err := sd.waitTSafe(ctx, req.Req.GetGuaranteeTimestamp()) if err != nil { log.Warn("delegator query failed to wait tsafe", zap.Error(err)) return nil, err