Skip to content

Commit e28f2d9

Browse files
committed
Extract upsert from sink strategies as configurable option
1 parent 94e23a8 commit e28f2d9

File tree

8 files changed

+118
-13
lines changed

8 files changed

+118
-13
lines changed

config/MongoSinkConnector.properties

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
3131
# Write configuration
3232
delete.on.null.values=false
3333
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
34+
writemodel.strategy.upsert=true
3435

3536
max.batch.size = 0
3637
rate.limiting.timeout=0

src/main/java/com/mongodb/kafka/connect/sink/MongoSinkTopicConfig.java

+20
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ public String value() {
173173
static final String WRITEMODEL_STRATEGY_DEFAULT =
174174
"com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy";
175175

176+
public static final String WRITEMODEL_STRATEGY_UPSERT_CONFIG = "writemodel.strategy.upsert";
177+
private static final String WRITEMODEL_STRATEGY_UPSERT_DISPLAY = "The upsert writeModel strategy";
178+
private static final String WRITEMODEL_STRATEGY_UPSERT_DOC =
179+
"Whether or not to use upserts for the write model strategy";
180+
static final boolean WRITEMODEL_STRATEGY_UPSERT_DEFAULT = true;
181+
176182
public static final String DELETE_WRITEMODEL_STRATEGY_CONFIG = "delete.writemodel.strategy";
177183
private static final String DELETE_WRITEMODEL_STRATEGY_DISPLAY = "The delete writeModel strategy";
178184
private static final String DELETE_WRITEMODEL_STRATEGY_DOC =
@@ -518,6 +524,10 @@ public IdStrategy getIdStrategy() {
518524
return idStrategy;
519525
}
520526

527+
public boolean isUpsertEnabled() {
528+
return getBoolean(WRITEMODEL_STRATEGY_UPSERT_CONFIG);
529+
}
530+
521531
PostProcessors getPostProcessors() {
522532
if (postProcessors == null) {
523533
postProcessors = new PostProcessors(this, getList(POST_PROCESSOR_CHAIN_CONFIG));
@@ -832,6 +842,16 @@ private static ConfigDef createConfigDef() {
832842
++orderInGroup,
833843
ConfigDef.Width.MEDIUM,
834844
WRITEMODEL_STRATEGY_DISPLAY);
845+
configDef.define(
846+
WRITEMODEL_STRATEGY_UPSERT_CONFIG,
847+
ConfigDef.Type.BOOLEAN,
848+
WRITEMODEL_STRATEGY_UPSERT_DEFAULT,
849+
ConfigDef.Importance.LOW,
850+
WRITEMODEL_STRATEGY_UPSERT_DOC,
851+
group,
852+
++orderInGroup,
853+
ConfigDef.Width.MEDIUM,
854+
WRITEMODEL_STRATEGY_UPSERT_DISPLAY);
835855
configDef.define(
836856
DELETE_WRITEMODEL_STRATEGY_CONFIG,
837857
ConfigDef.Type.STRING,

src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/ReplaceOneBusinessKeyStrategy.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@
3939

4040
public class ReplaceOneBusinessKeyStrategy implements WriteModelStrategy, Configurable {
4141

42-
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
4342
private boolean isPartialId = false;
43+
private boolean isUpsertEnabled = true;
4444

4545
@Override
4646
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
47+
ReplaceOptions replaceOptions = new ReplaceOptions().upsert(isUpsertEnabled);
4748
BsonDocument vd =
4849
document
4950
.getValueDoc()
@@ -65,12 +66,13 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
6566
if (isPartialId) {
6667
businessKey = flattenKeys(businessKey);
6768
}
68-
return new ReplaceOneModel<>(businessKey, vd, REPLACE_OPTIONS);
69+
return new ReplaceOneModel<>(businessKey, vd, replaceOptions);
6970
}
7071

7172
@Override
7273
public void configure(final MongoSinkTopicConfig configuration) {
7374
IdStrategy idStrategy = configuration.getIdStrategy();
75+
isUpsertEnabled = configuration.isUpsertEnabled();
7476
isPartialId =
7577
idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy;
7678
}

src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/ReplaceOneDefaultStrategy.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,17 @@
2929
import com.mongodb.client.model.ReplaceOptions;
3030
import com.mongodb.client.model.WriteModel;
3131

32+
import com.mongodb.kafka.connect.sink.Configurable;
33+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
3234
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
3335

34-
public class ReplaceOneDefaultStrategy implements WriteModelStrategy {
36+
public class ReplaceOneDefaultStrategy implements WriteModelStrategy, Configurable {
3537

36-
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
38+
private boolean isUpsertEnabled = true;
3739

3840
@Override
3941
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
42+
ReplaceOptions replaceOptions = new ReplaceOptions().upsert(isUpsertEnabled);
4043
BsonDocument vd =
4144
document
4245
.getValueDoc()
@@ -51,6 +54,11 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
5154
"Could not build the WriteModel,the `_id` field was missing unexpectedly");
5255
}
5356

54-
return new ReplaceOneModel<>(new BsonDocument(ID_FIELD, idValue), vd, REPLACE_OPTIONS);
57+
return new ReplaceOneModel<>(new BsonDocument(ID_FIELD, idValue), vd, replaceOptions);
58+
}
59+
60+
@Override
61+
public void configure(final MongoSinkTopicConfig configuration) {
62+
isUpsertEnabled = configuration.isUpsertEnabled();
5563
}
5664
}

src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneBusinessKeyTimestampStrategy.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,14 @@
3838

3939
public class UpdateOneBusinessKeyTimestampStrategy implements WriteModelStrategy, Configurable {
4040

41-
private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true);
4241
static final String FIELD_NAME_MODIFIED_TS = "_modifiedTS";
4342
static final String FIELD_NAME_INSERTED_TS = "_insertedTS";
4443
private boolean isPartialId = false;
44+
private boolean isUpsertEnabled = true;
4545

4646
@Override
4747
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
48+
UpdateOptions updateOptions = new UpdateOptions().upsert(isUpsertEnabled);
4849
BsonDocument vd =
4950
document
5051
.getValueDoc()
@@ -71,12 +72,13 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
7172
businessKey,
7273
new BsonDocument("$set", vd.append(FIELD_NAME_MODIFIED_TS, dateTime))
7374
.append("$setOnInsert", new BsonDocument(FIELD_NAME_INSERTED_TS, dateTime)),
74-
UPDATE_OPTIONS);
75+
updateOptions);
7576
}
7677

7778
@Override
7879
public void configure(final MongoSinkTopicConfig configuration) {
7980
IdStrategy idStrategy = configuration.getIdStrategy();
81+
isUpsertEnabled = configuration.isUpsertEnabled();
8082
isPartialId =
8183
idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy;
8284
}

src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneDefaultStrategy.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,17 @@
2929
import com.mongodb.client.model.UpdateOptions;
3030
import com.mongodb.client.model.WriteModel;
3131

32+
import com.mongodb.kafka.connect.sink.Configurable;
33+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
3234
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
3335

34-
public class UpdateOneDefaultStrategy implements WriteModelStrategy {
36+
public class UpdateOneDefaultStrategy implements WriteModelStrategy, Configurable {
3537

36-
private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true);
38+
private boolean isUpsertEnabled = true;
3739

3840
@Override
3941
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
42+
UpdateOptions updateOptions = new UpdateOptions().upsert(isUpsertEnabled);
4043
BsonDocument vd =
4144
document
4245
.getValueDoc()
@@ -52,6 +55,11 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
5255
}
5356
vd.remove(ID_FIELD);
5457
return new UpdateOneModel<>(
55-
new BsonDocument(ID_FIELD, idValue), new BsonDocument("$set", vd), UPDATE_OPTIONS);
58+
new BsonDocument(ID_FIELD, idValue), new BsonDocument("$set", vd), updateOptions);
59+
}
60+
61+
@Override
62+
public void configure(final MongoSinkTopicConfig configuration) {
63+
isUpsertEnabled = configuration.isUpsertEnabled();
5664
}
5765
}

src/main/java/com/mongodb/kafka/connect/sink/writemodel/strategy/UpdateOneTimestampsStrategy.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,18 @@
3030
import com.mongodb.client.model.UpdateOptions;
3131
import com.mongodb.client.model.WriteModel;
3232

33+
import com.mongodb.kafka.connect.sink.Configurable;
34+
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
3335
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
3436

35-
public class UpdateOneTimestampsStrategy implements WriteModelStrategy {
36-
private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true);
37+
public class UpdateOneTimestampsStrategy implements WriteModelStrategy, Configurable {
3738
static final String FIELD_NAME_MODIFIED_TS = "_modifiedTS";
3839
static final String FIELD_NAME_INSERTED_TS = "_insertedTS";
40+
private boolean isUpsertEnabled = true;
3941

4042
@Override
4143
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
44+
UpdateOptions updateOptions = new UpdateOptions().upsert(isUpsertEnabled);
4245
BsonDocument vd =
4346
document
4447
.getValueDoc()
@@ -58,6 +61,11 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
5861
new BsonDocument(ID_FIELD, idValue),
5962
new BsonDocument("$set", vd.append(FIELD_NAME_MODIFIED_TS, dateTime))
6063
.append("$setOnInsert", new BsonDocument(FIELD_NAME_INSERTED_TS, dateTime)),
61-
UPDATE_OPTIONS);
64+
updateOptions);
65+
}
66+
67+
@Override
68+
public void configure(final MongoSinkTopicConfig configuration) {
69+
isUpsertEnabled = configuration.isUpsertEnabled();
6270
}
6371
}

src/test/java/com/mongodb/kafka/connect/sink/writemodel/strategy/WriteModelStrategyTest.java

+56
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static com.mongodb.kafka.connect.sink.SinkTestHelper.createTopicConfig;
2424
import static org.junit.jupiter.api.Assertions.assertAll;
2525
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.assertFalse;
2627
import static org.junit.jupiter.api.Assertions.assertNotNull;
2728
import static org.junit.jupiter.api.Assertions.assertThrows;
2829
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -42,6 +43,7 @@
4243
import com.mongodb.client.model.UpdateOneModel;
4344
import com.mongodb.client.model.WriteModel;
4445

46+
import com.mongodb.kafka.connect.sink.Configurable;
4547
import com.mongodb.kafka.connect.sink.MongoSinkConfig;
4648
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
4749
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
@@ -81,6 +83,7 @@ class WriteModelStrategyTest {
8183
MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_CONFIG, PartialKeyStrategy.class.getName());
8284
configMap.put(
8385
MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_PARTIAL_KEY_PROJECTION_TYPE_CONFIG, "AllowList");
86+
configMap.put(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "true");
8487

8588
MongoSinkTopicConfig partialKeyConfig =
8689
new MongoSinkConfig(configMap).getMongoSinkTopicConfig(TEST_TOPIC);
@@ -130,6 +133,59 @@ void testDefaultWriteModelStrategy() {
130133
defaultWriteModelStrategy.getWriteModelStrategy() instanceof InsertOneDefaultStrategy);
131134
}
132135

136+
@Test
137+
@DisplayName("Ensure upsert config is correctly set")
138+
void testWriteModelStrategyUpsertConfig() {
139+
MongoSinkTopicConfig topicConfig;
140+
ReplaceOneModel<BsonDocument> replaceOneModel;
141+
UpdateOneModel<BsonDocument> updateOneModel;
142+
WriteModel<BsonDocument> result;
143+
144+
Object[] replaceStrategies = {REPLACE_ONE_BUSINESS_KEY_STRATEGY, REPLACE_ONE_DEFAULT_STRATEGY};
145+
for (Object strategy : replaceStrategies) {
146+
Configurable configurableStrategy = (Configurable) strategy;
147+
WriteModelStrategy writeStrategy = (WriteModelStrategy) strategy;
148+
149+
topicConfig =
150+
createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "false");
151+
configurableStrategy.configure(topicConfig);
152+
result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone()));
153+
replaceOneModel = (ReplaceOneModel<BsonDocument>) result;
154+
assertFalse(replaceOneModel.getReplaceOptions().isUpsert());
155+
156+
topicConfig =
157+
createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "true");
158+
configurableStrategy.configure(topicConfig);
159+
result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone()));
160+
replaceOneModel = (ReplaceOneModel<BsonDocument>) result;
161+
assertTrue(replaceOneModel.getReplaceOptions().isUpsert());
162+
}
163+
164+
Object[] updateStrategies = {
165+
UPDATE_ONE_BUSINESS_KEY_TIMESTAMPS_STRATEGY,
166+
UPDATE_ONE_DEFAULT_STRATEGY,
167+
UPDATE_ONE_TIMESTAMPS_STRATEGY,
168+
};
169+
for (Object strategy : updateStrategies) {
170+
Configurable configurableStrategy = (Configurable) strategy;
171+
WriteModelStrategy writeStrategy = (WriteModelStrategy) strategy;
172+
173+
topicConfig =
174+
createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "false");
175+
configurableStrategy.configure(topicConfig);
176+
result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone()));
177+
updateOneModel = (UpdateOneModel<BsonDocument>) result;
178+
assertFalse(updateOneModel.getOptions().isUpsert());
179+
180+
topicConfig =
181+
createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "true");
182+
configurableStrategy.configure(topicConfig);
183+
result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone()));
184+
updateOneModel = (UpdateOneModel<BsonDocument>) result;
185+
assertTrue(updateOneModel.getOptions().isUpsert());
186+
}
187+
}
188+
133189
@Test
134190
@DisplayName(
135191
"when sink document is valid for InsertOneDefaultStrategy then correct InsertOneModel")

0 commit comments

Comments
 (0)