-
Notifications
You must be signed in to change notification settings - Fork 904
GODRIVER-2388 Improved Bulk Write API. #1884
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
Conversation
API Change Report./v2/mongoincompatible changes(*ReplaceOneModel).SetSort: removed compatible changes(*Client).BulkWrite: added ./v2/mongo/optionsincompatible changes(*ReplaceOptionsBuilder).SetSort: removed compatible changesClientBulkWrite: added ./v2/x/mongo/driverincompatible changes(*Batches).AdvanceBatch: removed compatible changes(*Batches).AdvanceBatches: added ./v2/x/mongo/driver/sessionincompatible changesClient.RetryRead: removed |
@@ -398,7 +398,7 @@ func TestClientSideEncryptionCustomCrypt(t *testing.T) { | |||
"expected 0 calls to DecryptExplicit, got %v", cc.numDecryptExplicitCalls) | |||
assert.Equal(mt, cc.numCloseCalls, 0, | |||
"expected 0 calls to Close, got %v", cc.numCloseCalls) | |||
assert.Equal(mt, cc.numBypassAutoEncryptionCalls, 2, | |||
assert.Equal(mt, cc.numBypassAutoEncryptionCalls, 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only call it once after the operation.go refactoring.
mongo/errors.go
Outdated
// A top-level error that occurred when attempting to communicate with the server | ||
// or execute the bulk write. This value may not be populated if the exception was | ||
// thrown due to errors occurring on individual writes. | ||
TopLevelError *WriteError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cannot use Error as a field name because of the conflict with the conventional method name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional: Consider the more conventional Err
or WriteError
.
mongo/client_bulk_write_models.go
Outdated
} | ||
type clientWriteModel struct { | ||
namespace string | ||
model interface{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add stronger type constraints to this?
type clientBulkWriteModel interface {
ClientInsertOneModel // etc.
}
type clientWriteModel struct {
namespace string
model clientBulkWriteModel
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need an additional abstraction for an un-exported struct.
} | ||
|
||
// Error implements the error interface. | ||
func (bwe ClientBulkWriteException) Error() string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function doesn't return an error if the write is unacknowledged. The specifications required that users be able to discern whether a BulkWriteResult
contains acknowledged results. Either return an error indicating an unacknowledged result, or update ClientBulkWriteResult
in the spirit of GODRIVER-2821.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can test this with the following:
package main
import (
"context"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
)
func main() {
client, err := mongo.Connect()
if err != nil {
panic(err)
}
defer func() { _ = client.Disconnect(context.Background()) }()
pairs := &mongo.ClientWriteModels{}
insertOneModel := mongo.NewClientInsertOneModel().SetDocument(bson.D{{"x", 1}})
opts := options.ClientBulkWrite().SetWriteConcern(writeconcern.Unacknowledged()).SetOrdered(false)
pairs = pairs.AppendInsertOne("db", "k", insertOneModel)
_, err = client.BulkWrite(context.Background(), pairs, opts) // Should not panic
if err != nil {
panic(err)
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a unified spec test that covers this case? If not we should add one / add an integration test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mongo/bulk_write.go
Outdated
if filter == nil { | ||
return nil, fmt.Errorf("%w: filter is required", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update the error message when the filter is not given.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to nest this nil filter check in this error handling block? Can we move this check to the beginning of marshal
?
Optional: Consider a clearer error message, like
delete filter cannot be nil
mongo/bulk_write.go
Outdated
if doc.filter == nil { | ||
return nil, fmt.Errorf("%w: filter is required", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update the error message when the filter is not given.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to nest this nil filter check in this error handling block? Can we move this check to the beginning of marshal
?
Optional: Consider a clearer error message, like
update filter cannot be nil
4bc724e
to
dbd44c9
Compare
dbd44c9
to
d10eff2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@qingyang-hu There are still outstanding issues from the previous review.
@@ -13,6 +13,55 @@ import ( | |||
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation" | |||
) | |||
|
|||
// ClientBulkWriteResult is the result type returned by a client-level BulkWrite operation. | |||
type ClientBulkWriteResult struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The specifications say that "Users MUST be able to discern whether a [result] contains verbose results without inspecting the value provided for verboseResults
in [options]". Does this mean we should add a boolean value ClientBulkWriteResult
: HasVerboseResults
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The initial thought was to leave results maps as nil
when verboseResults
is false. However, I think you are right that an additional HasVerboseResults
field is more obvious.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, either solution sounds good to me.
mongo/client_bulk_write_test.go
Outdated
}, | ||
} | ||
var n int | ||
n, _, err = batches.AppendBatchSequence(nil, 4, 16_000, 16_000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the significance of 16_000
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is just a number big enough not to cut the document, so only the maxCount
regulates the output. Will add a comment there.
var idx int32 | ||
dst = wiremessage.AppendMsgSectionType(dst, wiremessage.DocumentSequence) | ||
idx, dst = bsoncore.ReserveLength(dst) | ||
dst = append(dst, identifier...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the specifications:
The first entry in each document has the name of the operation as its key and the index ini the
nsInfo
array of the namespace on which the operation should be performed as its value
When I do command monitoring for client bulk write with multiple pairs, I get the following:
2024/11/18 14:39:47 started: &{Command:{"bulkWrite": {"$numberInt":"1"},"errorsOnly": false,"ordered": true,"lsid": {"id": {"$binary":{"base64":"XTDtLVGhTx6MEIcFDhf0qw==","subType":"04"}}},"txnNumber": {"$numberLong":"1"},"$clusterTime": {"clusterTime": {"$timestamp":{"t":1731965987,"i":1}},"signature": {"hash": {"$binary":{"base64":"AAAAAAAAAAAAAAAAAAAAAAAAAAA=","subType":"00"}},"keyId": {"$numberLong":"0"}}},"$db": "admin","ops": [{"insert": {"$numberInt":"0"},"document": {"_id": {"$oid":"673bb423a86efe4126c4c585"},"x": {"$numberInt":"1"}}},{"insert": {"$numberInt":"0"},"document": {"_id": {"$oid":"673bb423a86efe4126c4c586"},"x": {"$numberInt":"2"}}}],"nsInfo": [{"ns": "db.coll"}]} DatabaseName:admin CommandName:bulkWrite RequestID:1 ConnectionID:localhost:27017[-4] ServerConnectionID:0x14000390250 ServiceID:<nil>}
Where the index value for each document in the sequence is {"$numberInt":"0"}
. Shouldn't this be {"$numberInt":"0"}
, then {"$numberInt":"1"}
, etc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same thing occurs with the other client bulk write operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean the Int32 value of the operation name such as "insert", "update", or "delete"?
...
"ops":[
{
"insert":{
"$numberInt":"0"
},
...
It is "the index in the nsInfo
array of the namespace on which the operation should be performed as its value".
The specs also require:
When constructing the
nsInfo
array for abulkWrite
batch, drivers MUST only include the namespaces that are referenced in theops
array for that batch.
and:
Drivers MUST NOT include duplicate namespaces in this list.
Therefore, if both operations perform on the same namespace, the nsInfo
array should contain only one item, and both operation indices are 0, pointing to "db.coll".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, thanks for the explanation!
dd218ab
to
3489ac8
Compare
x/mongo/driver/batches.go
Outdated
// AppendBatchSequence appends dst with document sequence of batches as long as the limits of max count, max | ||
// document size, or total size allows. It returns the number of batches appended, the new appended slice, and | ||
// any error raised. It returns the origenal input slice if nothing can be appends within the limits. | ||
func (b *Batches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, _ int) (int, []byte, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does maxMessageSizeBytes
not apply when not using bulk write? For example, when using collection.Insert
would we batch up to 48mb or 16mb? If I understand the documentation correctly, the batch has a limit of 48mb and individual documents in the batch have a limit of 16mb. Right now it appears we are limiting the entire batch to 16mb. The clientBulkWrite API does what I would expect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic derives from the original *Batches.AdvanceBatch()
, which takes maxDocumentSize
as maxMessageSizeBytes
. A test case covers the logic here.
@matthewdale, do you know any history of the logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated the logic in fc91428.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we open a ticket for this to avoid adding more complexity to this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, it doesn't make sense to review incorrect logic since we've already refactored that block. Moreover, fc91428 removed the maxBsonObjectSize
requirement from the client-level bulk write to simplify the PR.
However, I'm open to reverting and only keeping what is more relevant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIU the updated solution correctly it is now possible to put a single document over the 16mb limit on the wire message, relying on the server to validate the document size. From what I can tell, this is spec-compliant. But do we want to make data-heavy round trips that we know are going to fail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, both the client-level bulk write and legacy bulk write rely on the server to check the document size limit in fc91428.
But I agree the validation on the server side is inefficient. Pre-validating the size on the driver side is not a breach of the spec because it is not prohibited, either, especially for performance reasons. So I will commit another change to return driver side errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW IMO we can defer that work.
x/mongo/driver/operation.go
Outdated
@@ -268,7 +271,13 @@ type Operation struct { | |||
// has more documents than can fit in a single command. This should only be specified for | |||
// commands that are batch compatible. For more information, please refer to the definition of | |||
// Batches. | |||
Batches *Batches | |||
Batches interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest internalizing this interface so that we can create a compile check for mongo.modelBatches
and x/mongo.Batches
.
mongo/client_bulk_write.go
Outdated
return len(mb.models) - mb.offset | ||
} | ||
|
||
func (mb *modelBatches) AppendBatchSequence(dst []byte, maxCount, maxDocSize, totalSize int) (int, []byte, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have test coverage for this in the following two cases:
- One doc exceeds
maxBsonObjectSize
, I would expect an error. - Multiple documents <
maxBsonObjectSize
together exceedmaxMessageSizeBytes
. I would expect n > 1.
Here is a helpful gist I used: https://gist.github.com/prestonvasquez/40daa306abc5a21147235d2ea982d841
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've involved GODRIVER-3421 in fc91428, which removes the BSON document size validation requirement. So the first case is no longer needed. The second case is added in both client_bulk_write_test.go and client_test.go.
9958e3f
to
82df60a
Compare
mtOpts := mtest.NewOptions().MinServerVersion("8.0").AtlasDataLake(false).ClientType(mtest.Pinned) | ||
mt := mtest.New(t, mtOpts) | ||
|
||
mt.Run("bulkWrite batch splits a writeModels input with greater than maxWriteBatchSize operations", func(mt *mtest.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we label the prose tests? I.e., this should be
3. bulkWrite batch splits a writeModels input with greater than maxWriteBatchSize operations
cli, err := mongo.Connect(cliOptions) | ||
require.NoError(mt, err, "Connect error: %v", err) | ||
_, err = cli.BulkWrite(context.Background(), models) | ||
assert.ErrorContains(mt, err, "context deadline exceeded", "expected a timeout error, got: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest using asser.ErrorIs
and the package error context.DeadlineExceeded
:
assert.ErrorContains(mt, err, "context deadline exceeded", "expected a timeout error, got: %v", err) | |
assert.ErrorIs(mt, err, context.DeadlineExceeded, "expected a timeout error, got: %v", err) |
if res == nil || !res.Acknowledged { | ||
return newDocumentResult(emptyCoreDocument, err), nil | ||
} | ||
rawBuilder := bsoncore.NewDocumentBuilder(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we use bsoncore? Can we just populate mongo.ClientBulkWriteResult
and encode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The encoder encodes all fields in lowercase by default. An alternative may be a struct with tags. The current resolution follows the pattern in executeCreateChangeStream
and executeListDatabases
.
model.ArrayFilters = v.ArrayFilters | ||
} | ||
return v.Namespace, model, nil | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hint *bson.RawValue | ||
Upsert *bool | ||
} | ||
err := bson.Unmarshal(value, &v) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to define the extra fields? Can we unmarshal into an embedded mongo.ClientUpdateOneModel
?
var v struct {
Namespace string
mongo.ClientUpdateOneModel
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The spec tests put bulk write fields in parallel with the namespace. We cannot embed mongo.ClientUpdateOneModel
without a nested level.
bca2564
to
91dabcd
Compare
91dabcd
to
89947bd
Compare
mongo/bulk_write.go
Outdated
if doc.filter == nil { | ||
return nil, fmt.Errorf("%w: filter is required", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to nest this nil filter check in this error handling block? Can we move this check to the beginning of marshal
?
Optional: Consider a clearer error message, like
update filter cannot be nil
mongo/bulk_write.go
Outdated
if filter == nil { | ||
return nil, fmt.Errorf("%w: filter is required", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to nest this nil filter check in this error handling block? Can we move this check to the beginning of marshal
?
Optional: Consider a clearer error message, like
delete filter cannot be nil
mongo/errors.go
Outdated
// A top-level error that occurred when attempting to communicate with the server | ||
// or execute the bulk write. This value may not be populated if the exception was | ||
// thrown due to errors occurring on individual writes. | ||
TopLevelError *WriteError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional: Consider the more conventional Err
or WriteError
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few requests for comments on confusing code, but everything else looks good.
x/mongo/driver/operation.go
Outdated
@@ -2172,3 +2218,35 @@ func sessionsSupported(wireVersion *description.VersionRange) bool { | |||
func retryWritesSupported(s description.Server) bool { | |||
return s.SessionTimeoutMinutes != nil && s.Kind != description.ServerKindStandalone | |||
} | |||
|
|||
func documentSequenceToArray(src []byte) (bsoncore.Array, []byte, bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was mistaken about the return type of this function in my previous comment. It seems that this function actually returns a bsoncore.Element
, not a bsoncore.Array
.
I strongly recommend adding a comment that describes the expected inputs and outputs for this function to reduce confusion.
x/mongo/driver/operation.go
Outdated
cmd bsoncore.Document | ||
requestID int32 | ||
cmdName string | ||
docArray bsoncore.Array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was mistaken about the type of this value in my previous comment. It seems that this is actually a byte slice containing one or many BSON array elements (i.e. multiple concatenated bsoncore.Element
values), not a bsoncore.Array
. Since there's not an analogous bsoncore
type, I recommend making this a []byte
. I also recommend adding a comment to describe the expected contents of the byte slice to reduce confusion.
x/mongo/driver/operation.go
Outdated
for b := dst[batchOffset:]; len(b) > 0; /* nothing */ { | ||
var array bsoncore.Array | ||
var ok bool | ||
array, b, ok = documentSequenceToArray(b) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend adding a comment that describes what this block does, including the expected contents of b
.
cliOptions := options.Client(). | ||
SetTimeout(2 * time.Second). | ||
SetMonitor(cm). | ||
ApplyURI(mtest.ClusterURI()) | ||
integtest.AddTestServerAPIVersion(cliOptions) | ||
cli, err := mongo.Connect(cliOptions) | ||
require.NoError(mt, err, "Connect error: %v", err) | ||
_, err = cli.BulkWrite(context.Background(), writes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use mt.ResetClient
instead.
cliOptions := options.Client(). | |
SetTimeout(2 * time.Second). | |
SetMonitor(cm). | |
ApplyURI(mtest.ClusterURI()) | |
integtest.AddTestServerAPIVersion(cliOptions) | |
cli, err := mongo.Connect(cliOptions) | |
require.NoError(mt, err, "Connect error: %v", err) | |
_, err = cli.BulkWrite(context.Background(), writes) | |
cliOptions := options.Client(). | |
SetTimeout(2 * time.Second). | |
SetMonitor(cm) | |
mt.ResetClient(cliOptions) | |
_, err = mt.Client.BulkWrite(context.Background(), writes) |
mongo/client_bulk_write.go
Outdated
dec := bson.NewDecoder(bson.NewDocumentReader(bytes.NewReader(resp))) | ||
dec.SetRegistry(mb.client.registry) | ||
err := dec.Decode(&res) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using the user-defined registry is not necessary for decoding command documents (only for encoding/decoding user document data).
dec := bson.NewDecoder(bson.NewDocumentReader(bytes.NewReader(resp))) | |
dec.SetRegistry(mb.client.registry) | |
err := dec.Decode(&res) | |
err := bson.Unmarshal(resp, &res) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! 👍
GODRIVER-2388
GODRIVER-3348
GODRIVER-3349
GODRIVER-3364
GODRIVER-3421 (added in fc91428)
Summary
Improved Bulk Write API.
Background & Motivation
Refactor the
(Operation).createWireMessage()
to support the bulk write batching.