From d3eadd125717f90bb89bfebc75bf5ec92cd4c5d0 Mon Sep 17 00:00:00 2001 From: MrPresent-Han Date: Thu, 26 Sep 2024 07:20:20 -0400 Subject: [PATCH] enhance: add ts support for iterator(#36599) Signed-off-by: MrPresent-Han --- go.mod | 2 +- go.sum | 2 + .../datacoord/compaction_task_clustering.go | 3 +- .../compaction_task_clustering_test.go | 3 +- internal/proxy/search_util.go | 62 ++++++++------ internal/proxy/task_query.go | 13 ++- internal/proxy/task_search.go | 41 ++++++---- internal/proxy/task_search_test.go | 80 +++++++++---------- internal/querynodev2/delegator/delegator.go | 4 +- 9 files changed, 122 insertions(+), 88 deletions(-) diff --git a/go.mod b/go.mod index 0522e82e76e5e..ef6ddede5ba15 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.7 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497 + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34 github.com/minio/minio-go/v7 v7.0.61 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index 7f995530510a9..c5bde3620c528 100644 --- a/go.sum +++ b/go.sum @@ -619,6 +619,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd h1:x0b0+foTe23sKcVFseR1DE8+BB08EH6ViiRHaz8PEik= github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240909041258-8f8ca67816cd/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34 h1:Fwxpg98128gfWRbQ1A3PMP9o2IfYZk7RSEy8rcoCWDA= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240930043709-0c23514e4c34/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.11.1 h1:5jiRP5j93CrgqcC20XVn68DX27htZdhedP1NyoIwkVg= github.com/milvus-io/pulsar-client-go v0.11.1/go.mod h1:cipLojlpUzs3i3cDNrK2MdOVs4HWPD7MQsAoOUqWcec= github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240923125106-ef9b8fd69497 h1:t4sQMbSy05p8qgMGvEGyLYYLoZ9fD1dushS1bj5X6+0= diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 49254d1a8be05..bb006b5732176 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -23,8 +23,6 @@ import ( "strconv" "time" - "github.com/milvus-io/milvus/internal/storage" - "github.com/cockroachdb/errors" "github.com/samber/lo" "go.opentelemetry.io/otel/trace" @@ -35,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" diff --git a/internal/datacoord/compaction_task_clustering_test.go b/internal/datacoord/compaction_task_clustering_test.go index 6a11089741df5..f3cfdfaa57758 100644 --- a/internal/datacoord/compaction_task_clustering_test.go +++ b/internal/datacoord/compaction_task_clustering_test.go @@ -24,8 +24,6 @@ import ( "testing" "time" - "github.com/milvus-io/milvus/pkg/util/metautil" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "go.uber.org/atomic" @@ -41,6 +39,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metautil" ) func TestClusteringCompactionTaskSuite(t *testing.T) { diff --git a/internal/proxy/search_util.go b/internal/proxy/search_util.go index d499872b19ba4..28276dc4145e0 100644 --- a/internal/proxy/search_util.go +++ b/internal/proxy/search_util.go @@ -75,22 +75,29 @@ func (r *rankParams) String() string { return fmt.Sprintf("limit: %d, offset: %d, roundDecimal: %d", r.GetLimit(), r.GetOffset(), r.GetRoundDecimal()) } +type SearchInfo struct { + planInfo *planpb.QueryInfo + offset int64 + parseError error + isIterator bool +} + // parseSearchInfo returns QueryInfo and offset -func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb.CollectionSchema, rankParams *rankParams) (*planpb.QueryInfo, int64, error) { +func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb.CollectionSchema, rankParams *rankParams) *SearchInfo { var topK int64 isAdvanced := rankParams != nil externalLimit := rankParams.GetLimit() + rankParams.GetOffset() topKStr, err := funcutil.GetAttrByKeyFromRepeatedKV(TopKKey, searchParamsPair) if err != nil { if externalLimit <= 0 { - return nil, 0, fmt.Errorf("%s is required", TopKKey) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s is required", TopKKey)} } topK = externalLimit } else { topKInParam, err := strconv.ParseInt(topKStr, 0, 64) if err != nil { if externalLimit <= 0 { - return nil, 0, fmt.Errorf("%s [%s] is invalid", TopKKey, topKStr) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%s] is invalid", TopKKey, topKStr)} } topK = externalLimit } else { @@ -106,7 +113,7 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb // 2. GetAsInt64 has cached inside, no need to worry about cpu cost for parsing here topK = Params.QuotaConfig.TopKLimit.GetAsInt64() } else { - return nil, 0, fmt.Errorf("%s [%d] is invalid, %w", TopKKey, topK, err) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%d] is invalid, %w", TopKKey, topK, err)} } } @@ -117,12 +124,12 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb if err == nil { offset, err = strconv.ParseInt(offsetStr, 0, 64) if err != nil { - return nil, 0, fmt.Errorf("%s [%s] is invalid", OffsetKey, offsetStr) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%s] is invalid", OffsetKey, offsetStr)} } if offset != 0 { if err := validateLimit(offset); err != nil { - return nil, 0, fmt.Errorf("%s [%d] is invalid, %w", OffsetKey, offset, err) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%d] is invalid, %w", OffsetKey, offset, err)} } } } @@ -130,7 +137,7 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb queryTopK := topK + offset if err := validateLimit(queryTopK); err != nil { - return nil, 0, fmt.Errorf("%s+%s [%d] is invalid, %w", OffsetKey, TopKKey, queryTopK, err) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s+%s [%d] is invalid, %w", OffsetKey, TopKKey, queryTopK, err)} } // 2. parse metrics type @@ -147,11 +154,11 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb roundDecimal, err := strconv.ParseInt(roundDecimalStr, 0, 64) if err != nil { - return nil, 0, fmt.Errorf("%s [%s] is invalid, should be -1 or an integer in range [0, 6]", RoundDecimalKey, roundDecimalStr) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%s] is invalid, should be -1 or an integer in range [0, 6]", RoundDecimalKey, roundDecimalStr)} } if roundDecimal != -1 && (roundDecimal > 6 || roundDecimal < 0) { - return nil, 0, fmt.Errorf("%s [%s] is invalid, should be -1 or an integer in range [0, 6]", RoundDecimalKey, roundDecimalStr) + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: fmt.Errorf("%s [%s] is invalid, should be -1 or an integer in range [0, 6]", RoundDecimalKey, roundDecimalStr)} } // 4. parse search param str @@ -168,30 +175,35 @@ func parseSearchInfo(searchParamsPair []*commonpb.KeyValuePair, schema *schemapb } else { groupByInfo := parseGroupByInfo(searchParamsPair, schema) if groupByInfo.err != nil { - return nil, 0, groupByInfo.err + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: groupByInfo.err} } groupByFieldId, groupSize, groupStrictSize = groupByInfo.GetGroupByFieldId(), groupByInfo.GetGroupSize(), groupByInfo.GetGroupStrictSize() } // 6. parse iterator tag, prevent trying to groupBy when doing iteration or doing range-search if isIterator == "True" && groupByFieldId > 0 { - return nil, 0, merr.WrapErrParameterInvalid("", "", - "Not allowed to do groupBy when doing iteration") + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: merr.WrapErrParameterInvalid("", "", + "Not allowed to do groupBy when doing iteration")} } if strings.Contains(searchParamStr, radiusKey) && groupByFieldId > 0 { - return nil, 0, merr.WrapErrParameterInvalid("", "", - "Not allowed to do range-search when doing search-group-by") - } - - return &planpb.QueryInfo{ - Topk: queryTopK, - MetricType: metricType, - SearchParams: searchParamStr, - RoundDecimal: roundDecimal, - GroupByFieldId: groupByFieldId, - GroupSize: groupSize, - GroupStrictSize: groupStrictSize, - }, offset, nil + return &SearchInfo{planInfo: nil, offset: 0, isIterator: false, parseError: merr.WrapErrParameterInvalid("", "", + "Not allowed to do range-search when doing search-group-by")} + } + + return &SearchInfo{ + planInfo: &planpb.QueryInfo{ + Topk: queryTopK, + MetricType: metricType, + SearchParams: searchParamStr, + RoundDecimal: roundDecimal, + GroupByFieldId: groupByFieldId, + GroupSize: groupSize, + GroupStrictSize: groupStrictSize, + }, + offset: offset, + isIterator: isIterator == "True", + parseError: nil, + } } func getOutputFieldIDs(schema *schemaInfo, outputFields []string) (outputFieldIDs []UniqueID, err error) { diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index a79af7e7d752f..8188274259de0 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -79,6 +79,7 @@ type queryParams struct { limit int64 offset int64 reduceType reduce.IReduceType + isIterator bool } // translateToOutputFieldIDs translates output fields name to output fields id. @@ -178,7 +179,7 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e limitStr, err := funcutil.GetAttrByKeyFromRepeatedKV(LimitKey, queryParamsPair) // if limit is not provided if err != nil { - return &queryParams{limit: typeutil.Unlimited, reduceType: reduceType}, nil + return &queryParams{limit: typeutil.Unlimited, reduceType: reduceType, isIterator: isIterator}, nil } limit, err = strconv.ParseInt(limitStr, 0, 64) if err != nil { @@ -203,6 +204,7 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e limit: limit, offset: offset, reduceType: reduceType, + isIterator: isIterator, }, nil } @@ -461,6 +463,11 @@ func (t *queryTask) PreExecute(ctx context.Context) error { } } t.GuaranteeTimestamp = guaranteeTs + // need modify mvccTs and guaranteeTs for iterator specially + if t.queryParams.isIterator && t.request.GetGuaranteeTimestamp() > 0 { + t.MvccTimestamp = t.request.GetGuaranteeTimestamp() + t.GuaranteeTimestamp = t.request.GetGuaranteeTimestamp() + } deadline, ok := t.TraceCtx().Deadline() if ok { @@ -542,6 +549,10 @@ func (t *queryTask) PostExecute(ctx context.Context) error { t.result.OutputFields = t.userOutputFields metrics.ProxyReduceResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel).Observe(float64(tr.RecordSpan().Milliseconds())) + if t.queryParams.isIterator && t.request.GetGuaranteeTimestamp() == 0 { + // first page for iteration, need to set up sessionTs for iterator + t.result.SessionTs = t.BeginTs() + } log.Debug("Query PostExecute done") return nil } diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index ccd4e591ca81c..341d0cd6ed9ce 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -80,6 +80,8 @@ type searchTask struct { reScorers []reScorer rankParams *rankParams groupScorer func(group *Group) error + + isIterator bool } func (t *searchTask) CanSkipAllocTimestamp() bool { @@ -249,6 +251,10 @@ func (t *searchTask) PreExecute(ctx context.Context) error { } t.SearchRequest.GuaranteeTimestamp = guaranteeTs t.SearchRequest.ConsistencyLevel = consistencyLevel + if t.isIterator && t.request.GetGuaranteeTimestamp() > 0 { + t.MvccTimestamp = t.request.GetGuaranteeTimestamp() + t.GuaranteeTimestamp = t.request.GetGuaranteeTimestamp() + } if deadline, ok := t.TraceCtx().Deadline(); ok { t.SearchRequest.TimeoutTimestamp = tsoutil.ComposeTSByTime(deadline, 0) @@ -351,7 +357,7 @@ func (t *searchTask) initAdvancedSearchRequest(ctx context.Context) error { t.SearchRequest.SubReqs = make([]*internalpb.SubSearchRequest, len(t.request.GetSubReqs())) t.queryInfos = make([]*planpb.QueryInfo, len(t.request.GetSubReqs())) for index, subReq := range t.request.GetSubReqs() { - plan, queryInfo, offset, err := t.tryGeneratePlan(subReq.GetSearchParams(), subReq.GetDsl()) + plan, queryInfo, offset, _, err := t.tryGeneratePlan(subReq.GetSearchParams(), subReq.GetDsl()) if err != nil { return err } @@ -443,11 +449,12 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error { log := log.Ctx(ctx).With(zap.Int64("collID", t.GetCollectionID()), zap.String("collName", t.collectionName)) // fetch search_growing from search param - plan, queryInfo, offset, err := t.tryGeneratePlan(t.request.GetSearchParams(), t.request.GetDsl()) + plan, queryInfo, offset, isIterator, err := t.tryGeneratePlan(t.request.GetSearchParams(), t.request.GetDsl()) if err != nil { return err } + t.isIterator = isIterator t.SearchRequest.Offset = offset if t.partitionKeyMode { @@ -490,38 +497,38 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error { return nil } -func (t *searchTask) tryGeneratePlan(params []*commonpb.KeyValuePair, dsl string) (*planpb.PlanNode, *planpb.QueryInfo, int64, error) { +func (t *searchTask) tryGeneratePlan(params []*commonpb.KeyValuePair, dsl string) (*planpb.PlanNode, *planpb.QueryInfo, int64, bool, error) { annsFieldName, err := funcutil.GetAttrByKeyFromRepeatedKV(AnnsFieldKey, params) if err != nil || len(annsFieldName) == 0 { vecFields := typeutil.GetVectorFieldSchemas(t.schema.CollectionSchema) if len(vecFields) == 0 { - return nil, nil, 0, errors.New(AnnsFieldKey + " not found in schema") + return nil, nil, 0, false, errors.New(AnnsFieldKey + " not found in schema") } if enableMultipleVectorFields && len(vecFields) > 1 { - return nil, nil, 0, errors.New("multiple anns_fields exist, please specify a anns_field in search_params") + return nil, nil, 0, false, errors.New("multiple anns_fields exist, please specify a anns_field in search_params") } annsFieldName = vecFields[0].Name } - queryInfo, offset, parseErr := parseSearchInfo(params, t.schema.CollectionSchema, t.rankParams) - if parseErr != nil { - return nil, nil, 0, parseErr + searchInfo := parseSearchInfo(params, t.schema.CollectionSchema, t.rankParams) + if searchInfo.parseError != nil { + return nil, nil, 0, false, searchInfo.parseError } annField := typeutil.GetFieldByName(t.schema.CollectionSchema, annsFieldName) - if queryInfo.GetGroupByFieldId() != -1 && annField.GetDataType() == schemapb.DataType_BinaryVector { - return nil, nil, 0, errors.New("not support search_group_by operation based on binary vector column") + if searchInfo.planInfo.GetGroupByFieldId() != -1 && annField.GetDataType() == schemapb.DataType_BinaryVector { + return nil, nil, 0, false, errors.New("not support search_group_by operation based on binary vector column") } - plan, planErr := planparserv2.CreateSearchPlan(t.schema.schemaHelper, dsl, annsFieldName, queryInfo) + plan, planErr := planparserv2.CreateSearchPlan(t.schema.schemaHelper, dsl, annsFieldName, searchInfo.planInfo) if planErr != nil { log.Warn("failed to create query plan", zap.Error(planErr), zap.String("dsl", dsl), // may be very large if large term passed. - zap.String("anns field", annsFieldName), zap.Any("query info", queryInfo)) - return nil, nil, 0, merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", planErr) + zap.String("anns field", annsFieldName), zap.Any("query info", searchInfo.planInfo)) + return nil, nil, 0, false, merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", planErr) } log.Debug("create query plan", zap.String("dsl", t.request.Dsl), // may be very large if large term passed. - zap.String("anns field", annsFieldName), zap.Any("query info", queryInfo)) - return plan, queryInfo, offset, nil + zap.String("anns field", annsFieldName), zap.Any("query info", searchInfo.planInfo)) + return plan, searchInfo.planInfo, searchInfo.offset, searchInfo.isIterator, nil } func (t *searchTask) tryParsePartitionIDsFromPlan(plan *planpb.PlanNode) ([]int64, error) { @@ -714,6 +721,10 @@ func (t *searchTask) PostExecute(ctx context.Context) error { } t.result.Results.OutputFields = t.userOutputFields t.result.CollectionName = t.request.GetCollectionName() + if t.isIterator && t.request.GetGuaranteeTimestamp() == 0 { + // first page for iteration, need to set up sessionTs for iterator + t.result.SessionTs = t.BeginTs() + } metrics.ProxyReduceResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SearchLabel).Observe(float64(tr.RecordSpan().Milliseconds())) diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index 70799c9fde539..1dcdf0fe2d828 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -2235,11 +2235,11 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - info, offset, err := parseSearchInfo(test.validParams, nil, nil) - assert.NoError(t, err) - assert.NotNil(t, info) + searchInfo := parseSearchInfo(test.validParams, nil, nil) + assert.NoError(t, searchInfo.parseError) + assert.NotNil(t, searchInfo.planInfo) if test.description == "offsetParam" { - assert.Equal(t, targetOffset, offset) + assert.Equal(t, targetOffset, searchInfo.offset) } }) } @@ -2256,11 +2256,11 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { limit: externalLimit, } - info, offset, err := parseSearchInfo(offsetParam, nil, rank) - assert.NoError(t, err) - assert.NotNil(t, info) - assert.Equal(t, int64(10), info.GetTopk()) - assert.Equal(t, int64(0), offset) + searchInfo := parseSearchInfo(offsetParam, nil, rank) + assert.NoError(t, searchInfo.parseError) + assert.NotNil(t, searchInfo.planInfo) + assert.Equal(t, int64(10), searchInfo.planInfo.GetTopk()) + assert.Equal(t, int64(0), searchInfo.offset) }) t.Run("parseSearchInfo groupBy info for hybrid search", func(t *testing.T) { @@ -2309,15 +2309,15 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { Value: "true", }) - info, _, err := parseSearchInfo(params, schema, testRankParams) - assert.NoError(t, err) - assert.NotNil(t, info) + searchInfo := parseSearchInfo(params, schema, testRankParams) + assert.NoError(t, searchInfo.parseError) + assert.NotNil(t, searchInfo.planInfo) // all group_by related parameters should be aligned to parameters // set by main request rather than inner sub request - assert.Equal(t, int64(101), info.GetGroupByFieldId()) - assert.Equal(t, int64(3), info.GetGroupSize()) - assert.False(t, info.GetGroupStrictSize()) + assert.Equal(t, int64(101), searchInfo.planInfo.GetGroupByFieldId()) + assert.Equal(t, int64(3), searchInfo.planInfo.GetGroupSize()) + assert.False(t, searchInfo.planInfo.GetGroupStrictSize()) }) t.Run("parseSearchInfo error", func(t *testing.T) { @@ -2399,12 +2399,12 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - info, offset, err := parseSearchInfo(test.invalidParams, nil, nil) - assert.Error(t, err) - assert.Nil(t, info) - assert.Zero(t, offset) + searchInfo := parseSearchInfo(test.invalidParams, nil, nil) + assert.Error(t, searchInfo.parseError) + assert.Nil(t, searchInfo.planInfo) + assert.Zero(t, searchInfo.offset) - t.Logf("err=%s", err.Error()) + t.Logf("err=%s", searchInfo.parseError) }) } }) @@ -2426,9 +2426,9 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { schema := &schemapb.CollectionSchema{ Fields: fields, } - info, _, err := parseSearchInfo(normalParam, schema, nil) - assert.Nil(t, info) - assert.ErrorIs(t, err, merr.ErrParameterInvalid) + searchInfo := parseSearchInfo(normalParam, schema, nil) + assert.Nil(t, searchInfo.planInfo) + assert.ErrorIs(t, searchInfo.parseError, merr.ErrParameterInvalid) }) t.Run("check range-search and groupBy", func(t *testing.T) { normalParam := getValidSearchParams() @@ -2445,9 +2445,9 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { schema := &schemapb.CollectionSchema{ Fields: fields, } - info, _, err := parseSearchInfo(normalParam, schema, nil) - assert.Nil(t, info) - assert.ErrorIs(t, err, merr.ErrParameterInvalid) + searchInfo := parseSearchInfo(normalParam, schema, nil) + assert.Nil(t, searchInfo.planInfo) + assert.ErrorIs(t, searchInfo.parseError, merr.ErrParameterInvalid) }) t.Run("check nullable and groupBy", func(t *testing.T) { normalParam := getValidSearchParams() @@ -2464,9 +2464,9 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { schema := &schemapb.CollectionSchema{ Fields: fields, } - info, _, err := parseSearchInfo(normalParam, schema, nil) - assert.Nil(t, info) - assert.ErrorIs(t, err, merr.ErrParameterInvalid) + searchInfo := parseSearchInfo(normalParam, schema, nil) + assert.Nil(t, searchInfo.planInfo) + assert.ErrorIs(t, searchInfo.parseError, merr.ErrParameterInvalid) }) t.Run("check iterator and topK", func(t *testing.T) { normalParam := getValidSearchParams() @@ -2483,10 +2483,10 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { schema := &schemapb.CollectionSchema{ Fields: fields, } - info, _, err := parseSearchInfo(normalParam, schema, nil) - assert.NotNil(t, info) - assert.NoError(t, err) - assert.Equal(t, Params.QuotaConfig.TopKLimit.GetAsInt64(), info.Topk) + searchInfo := parseSearchInfo(normalParam, schema, nil) + assert.NotNil(t, searchInfo.planInfo) + assert.NoError(t, searchInfo.parseError) + assert.Equal(t, Params.QuotaConfig.TopKLimit.GetAsInt64(), searchInfo.planInfo.GetTopk()) }) t.Run("check max group size", func(t *testing.T) { @@ -2503,15 +2503,15 @@ func TestTaskSearch_parseSearchInfo(t *testing.T) { schema := &schemapb.CollectionSchema{ Fields: fields, } - info, _, err := parseSearchInfo(normalParam, schema, nil) - assert.Nil(t, info) - assert.Error(t, err) - assert.True(t, strings.Contains(err.Error(), "exceeds configured max group size")) + searchInfo := parseSearchInfo(normalParam, schema, nil) + assert.Nil(t, searchInfo.planInfo) + assert.Error(t, searchInfo.parseError) + assert.True(t, strings.Contains(searchInfo.parseError.Error(), "exceeds configured max group size")) resetSearchParamsValue(normalParam, GroupSizeKey, `10`) - info, _, err = parseSearchInfo(normalParam, schema, nil) - assert.NotNil(t, info) - assert.NoError(t, err) + searchInfo = parseSearchInfo(normalParam, schema, nil) + assert.NotNil(t, searchInfo.planInfo) + assert.NoError(t, searchInfo.parseError) }) } diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index b04003ac12de5..1d61206a26d18 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -402,7 +402,7 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq // wait tsafe waitTr := timerecord.NewTimeRecorder("wait tSafe") - tSafe, err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp) + tSafe, err := sd.waitTSafe(ctx, req.Req.GetGuaranteeTimestamp()) if err != nil { log.Warn("delegator query failed to wait tsafe", zap.Error(err)) return err @@ -473,7 +473,7 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest) // wait tsafe waitTr := timerecord.NewTimeRecorder("wait tSafe") - tSafe, err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp) + tSafe, err := sd.waitTSafe(ctx, req.Req.GetGuaranteeTimestamp()) if err != nil { log.Warn("delegator query failed to wait tsafe", zap.Error(err)) return nil, err