diff --git a/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java b/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java index bd7c27a79..d9b1ef4c1 100644 --- a/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java +++ b/sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java @@ -30,7 +30,6 @@ import io.milvus.v2.service.collection.request.CreateCollectionReq; import io.milvus.v2.service.collection.request.DescribeCollectionReq; import io.milvus.v2.service.collection.response.DescribeCollectionResp; -import io.milvus.v2.service.index.IndexService; import io.milvus.v2.service.vector.request.*; import io.milvus.v2.service.vector.response.*; import io.milvus.v2.utils.DataUtils; @@ -72,10 +71,10 @@ private DescribeCollectionResponse describeCollection(MilvusServiceGrpc.MilvusSe * If insert/upsert get server error, remove the cached collection info. */ private DescribeCollectionResponse getCollectionInfo(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, - String databaseName, String collectionName) { + String databaseName, String collectionName, boolean forceUpdate) { String key = combineCacheKey(databaseName, collectionName); DescribeCollectionResponse info = cacheCollectionInfo.get(key); - if (info == null) { + if (info == null || forceUpdate) { info = describeCollection(blockingStub, databaseName, collectionName); cacheCollectionInfo.put(key, info); } @@ -110,14 +109,26 @@ private void cleanCacheIfFailed(Status status, String databaseName, String colle } } + private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionResponse descResp) { + DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper(); + DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp); + InsertRequest rpcRequest = requestBuilder.convertGrpcInsertRequest(request, descColl); + return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build(); + } + public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) { String title = String.format("InsertRequest collectionName:%s", request.getCollectionName()); // TODO: set the database name - DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName()); - DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper(); - DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp); - MutationResult response = blockingStub.insert(requestBuilder.convertGrpcInsertRequest(request, descColl)); + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false); + InsertRequest rpcRequest = buildInsertRequest(request, descResp); + MutationResult response = blockingStub.insert(rpcRequest); + if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) { + descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true); + rpcRequest = buildInsertRequest(request, descResp); + response = blockingStub.insert(rpcRequest); + } + cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName()); rpcUtils.handleResponse(title, response.getStatus()); GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp()); @@ -137,14 +148,26 @@ public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu } } + private UpsertRequest buildUpsertRequest(UpsertReq request, DescribeCollectionResponse descResp) { + DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper(); + DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp); + UpsertRequest rpcRequest = requestBuilder.convertGrpcUpsertRequest(request, descColl); + return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build(); + } + public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) { String title = String.format("UpsertRequest collectionName:%s", request.getCollectionName()); // TODO: set the database name - DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName()); - DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper(); - DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp); - MutationResult response = blockingStub.upsert(requestBuilder.convertGrpcUpsertRequest(request, descColl)); + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false); + UpsertRequest rpcRequest = buildUpsertRequest(request, descResp); + MutationResult response = blockingStub.upsert(rpcRequest); + if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) { + descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true); + rpcRequest = buildUpsertRequest(request, descResp); + response = blockingStub.upsert(rpcRequest); + } + cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName()); rpcUtils.handleResponse(title, response.getStatus()); GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp()); @@ -235,9 +258,9 @@ public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "filter and ids can't be set at the same time"); } - DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName()); - DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp); if (request.getFilter() == null) { + DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false); + DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp); request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds())); } DeleteRequest.Builder builder = DeleteRequest.newBuilder() diff --git a/sdk-core/src/main/milvus-proto b/sdk-core/src/main/milvus-proto index 66c87c7e9..62dd88a09 160000 --- a/sdk-core/src/main/milvus-proto +++ b/sdk-core/src/main/milvus-proto @@ -1 +1 @@ -Subproject commit 66c87c7e94889993d4493cb77c6354dac8c1047e +Subproject commit 62dd88a09b7ea452606a21bc8b34fbbe7a7c7c1c