From 47d16a73d03caf415585a291e76c2a96ed735b16 Mon Sep 17 00:00:00 2001
From: Jamie
Date: Thu, 21 Dec 2023 14:20:01 +1300
Subject: [PATCH] try to decode once via all

---
 .../20231128_jellyfish_migration.go          | 30 ++++++++++++-------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff --git a/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go b/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go
index baed77ef7..e37813688 100644
--- a/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go
+++ b/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go
@@ -385,9 +385,9 @@ func (m *Migration) fetchAndUpdateBatch() bool {
 
 	if dataC := m.getDataCollection(); dataC != nil {
 		fetchStart := time.Now()
+		dataSet := []bson.M{}
 		dDataCursor, err := dataC.Find(m.ctx, selector,
 			&options.FindOptions{
-				Limit:     &m.config.readBatchSize,
 				Sort:      bson.M{"_id": 1},
 				BatchSize: &size,
 			},
@@ -398,27 +398,37 @@ func (m *Migration) fetchAndUpdateBatch() bool {
 			return false
 		}
 
+		err = dDataCursor.All(m.ctx, &dataSet)
+
+		if err != nil {
+			log.Printf("decoding data: %s", err)
+			return false
+		}
+
 		defer dDataCursor.Close(m.ctx)
 
-		log.Printf("fetch took %s", time.Since(fetchStart))
+		log.Printf("fetch took %s for %d items", time.Since(fetchStart), len(dataSet))
 		updateStart := time.Now()
 
-		for dDataCursor.Next(m.ctx) {
-			var dDataResult bson.M
-			if err = dDataCursor.Decode(&dDataResult); err != nil {
-				log.Printf("failed decoding data: %s", err)
-				return false
-			}
-			datumID, datumUpdates, err := utils.GetDatumUpdates(dDataResult)
+		for _, item := range dataSet {
+
+			//for dDataCursor.Next(m.ctx) {
+			//	var dDataResult bson.M
+			//	if err = dDataCursor.Decode(&dDataResult); err != nil {
+			//		log.Printf("failed decoding data: %s", err)
+			//		return false
+			//	}
+			datumID, datumUpdates, err := utils.GetDatumUpdates(item)
 			if err != nil {
 				m.onError(err, datumID, "failed getting updates")
 				continue
 			}
 			updateOp := mongo.NewUpdateOneModel()
-			updateOp.SetFilter(bson.M{"_id": datumID, "modifiedTime": dDataResult["modifiedTime"]})
+			updateOp.SetFilter(bson.M{"_id": datumID, "modifiedTime": item["modifiedTime"]})
			updateOp.SetUpdate(datumUpdates)
 			m.updates = append(m.updates, updateOp)
 			m.lastUpdatedId = datumID
 		}
+		//}
 		log.Printf("batch iteration took %s", time.Since(updateStart))
 		log.Printf("fetch and update took %s", time.Since(fetchAndUpdateStart))
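
Note on the approach (not part of the applied patch): the per-document Cursor.Next/Cursor.Decode loop is replaced by a single Cursor.All call, which decodes every document matched by the selector into an in-memory slice and closes the cursor itself, so the remaining deferred Close is redundant but harmless. Because All drains the whole result set, the removed Limit option no longer bounds the fetch; BatchSize only controls how many documents are returned per getMore round trip, not the total held in memory.

For reference, a minimal, self-contained sketch of the same pattern against the official mongo-driver follows. The collection name, the selector, and the $set document are illustrative assumptions only; the real migration derives its updates via utils.GetDatumUpdates and issues the bulk write elsewhere in fetchAndUpdateBatch.

package main

import (
	"context"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// fetchAndBuildUpdates fetches every document matching selector, decodes the
// whole result set with Cursor.All, and builds one UpdateOneModel per item.
func fetchAndBuildUpdates(ctx context.Context, coll *mongo.Collection, selector bson.M, batchSize int32) ([]mongo.WriteModel, error) {
	fetchStart := time.Now()

	cursor, err := coll.Find(ctx, selector,
		options.Find().
			SetSort(bson.M{"_id": 1}).
			SetBatchSize(batchSize),
	)
	if err != nil {
		return nil, err
	}

	// Cursor.All decodes every remaining document and closes the cursor,
	// so no explicit Next/Decode loop or deferred Close is needed.
	dataSet := []bson.M{}
	if err := cursor.All(ctx, &dataSet); err != nil {
		return nil, err
	}
	log.Printf("fetch took %s for %d items", time.Since(fetchStart), len(dataSet))

	updates := make([]mongo.WriteModel, 0, len(dataSet))
	for _, item := range dataSet {
		// Placeholder update document; the actual migration computes this
		// per datum with utils.GetDatumUpdates(item).
		updateOp := mongo.NewUpdateOneModel()
		updateOp.SetFilter(bson.M{"_id": item["_id"], "modifiedTime": item["modifiedTime"]})
		updateOp.SetUpdate(bson.M{"$set": bson.M{"_migrated": true}})
		updates = append(updates, updateOp)
	}
	return updates, nil
}

func main() {
	ctx := context.Background()
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(ctx)

	// Hypothetical database/collection and selector, for illustration only.
	coll := client.Database("data").Collection("deviceData")
	updates, err := fetchAndBuildUpdates(ctx, coll, bson.M{"_migrated": bson.M{"$exists": false}}, 1000)
	if err != nil {
		log.Fatal(err)
	}
	if len(updates) > 0 {
		if _, err := coll.BulkWrite(ctx, updates); err != nil {
			log.Fatal(err)
		}
	}
}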