Skip to content

Commit

Permalink
Update cached collection schema by updatetime
Browse files Browse the repository at this point in the history
Signed-off-by: yhmo <yihua.mo@zilliz.com>
  • Loading branch information
yhmo committed Feb 6, 2025
1 parent b452c61 commit d4c5267
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.collection.request.DescribeCollectionReq;
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
import io.milvus.v2.service.index.IndexService;
import io.milvus.v2.service.vector.request.*;
import io.milvus.v2.service.vector.response.*;
import io.milvus.v2.utils.DataUtils;
Expand Down Expand Up @@ -72,10 +71,10 @@ private DescribeCollectionResponse describeCollection(MilvusServiceGrpc.MilvusSe
* If insert/upsert get server error, remove the cached collection info.
*/
private DescribeCollectionResponse getCollectionInfo(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
String databaseName, String collectionName) {
String databaseName, String collectionName, boolean forceUpdate) {
String key = combineCacheKey(databaseName, collectionName);
DescribeCollectionResponse info = cacheCollectionInfo.get(key);
if (info == null) {
if (info == null || forceUpdate) {
info = describeCollection(blockingStub, databaseName, collectionName);
cacheCollectionInfo.put(key, info);
}
Expand Down Expand Up @@ -110,14 +109,26 @@ private void cleanCacheIfFailed(Status status, String databaseName, String colle
}
}

private InsertRequest buildInsertRequest(InsertReq request, DescribeCollectionResponse descResp) {
DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
InsertRequest rpcRequest = requestBuilder.convertGrpcInsertRequest(request, descColl);
return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
}

public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, InsertReq request) {
String title = String.format("InsertRequest collectionName:%s", request.getCollectionName());

// TODO: set the database name
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
MutationResult response = blockingStub.insert(requestBuilder.convertGrpcInsertRequest(request, descColl));
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
InsertRequest rpcRequest = buildInsertRequest(request, descResp);
MutationResult response = blockingStub.insert(rpcRequest);
if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true);
rpcRequest = buildInsertRequest(request, descResp);
response = blockingStub.insert(rpcRequest);
}

cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
rpcUtils.handleResponse(title, response.getStatus());
GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
Expand All @@ -137,14 +148,26 @@ public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu
}
}

private UpsertRequest buildUpsertRequest(UpsertReq request, DescribeCollectionResponse descResp) {
DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
UpsertRequest rpcRequest = requestBuilder.convertGrpcUpsertRequest(request, descColl);
return rpcRequest.toBuilder().setSchemaTimestamp(descResp.getUpdateTimestamp()).build();
}

public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, UpsertReq request) {
String title = String.format("UpsertRequest collectionName:%s", request.getCollectionName());

// TODO: set the database name
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
DataUtils.InsertBuilderWrapper requestBuilder = new DataUtils.InsertBuilderWrapper();
DescribeCollectionResp descColl = convertUtils.convertDescCollectionResp(descResp);
MutationResult response = blockingStub.upsert(requestBuilder.convertGrpcUpsertRequest(request, descColl));
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
UpsertRequest rpcRequest = buildUpsertRequest(request, descResp);
MutationResult response = blockingStub.upsert(rpcRequest);
if (response.getStatus().getErrorCode() == io.milvus.grpc.ErrorCode.SchemaMismatch) {
descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), true);
rpcRequest = buildUpsertRequest(request, descResp);
response = blockingStub.upsert(rpcRequest);
}

cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName());
rpcUtils.handleResponse(title, response.getStatus());
GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp());
Expand Down Expand Up @@ -235,9 +258,9 @@ public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu
throw new MilvusClientException(ErrorCode.INVALID_PARAMS, "filter and ids can't be set at the same time");
}

DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName());
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
if (request.getFilter() == null) {
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, "", request.getCollectionName(), false);
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
request.setFilter(vectorUtils.getExprById(respR.getPrimaryFieldName(), request.getIds()));
}
DeleteRequest.Builder builder = DeleteRequest.newBuilder()
Expand Down

0 comments on commit d4c5267

Please # to comment.