Skip to content

Commit

Permalink
Cosmos: Pruning test extensions as master #259
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 3, 2020
1 parent e8aa1e0 commit bd04865
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 38 deletions.
6 changes: 3 additions & 3 deletions samples/Store/Backend/ContactPreferences.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ type Service(log, resolve, ?maxAttempts) =
let command = Update { email = email; preferences = value }
stream.Transact(Commands.interpret command)

member __.Update email value =
update email value
member __.Update(id, value) =
update id value

member __.Read(email) =
let stream = resolve email
stream.Query id
stream.Query id
2 changes: 1 addition & 1 deletion samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Tests(testOutputHelper) =

let act (service : Backend.ContactPreferences.Service) (id,value) = async {
let (Domain.ContactPreferences.Id email) = id
do! service.Update email value
do! service.Update(email, value)

let! actual = service.Read email
test <@ value = actual @> }
Expand Down
9 changes: 5 additions & 4 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -523,11 +523,11 @@ function sync(req, expIndex, expEtag) {
|> match result with
| Result.Written pos ->
Log.prop "nextExpectedVersion" pos >> Log.event (Log.SyncSuccess (mkMetric ru))
| Result.ConflictUnknown pos ->
Log.prop "nextExpectedVersion" pos >> propConflict >> Log.event (Log.SyncConflict (mkMetric ru))
| Result.Conflict (pos, xs) ->
| Result.ConflictUnknown pos' ->
Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.SyncConflict (mkMetric ru))
| Result.Conflict (pos', xs) ->
(if verbose then Log.propData "conflicts" xs else id)
>> Log.prop "nextExpectedVersion" pos >> propConflict >> Log.event (Log.SyncResync(mkMetric ru))
>> Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.SyncResync (mkMetric ru))
log.Information("EqxCosmos {action:l} {stream} {count}+{ucount} {ms:f1}ms {ru}RU {bytes:n0}b {exp}",
"Sync", stream, count, req.u.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru, bytes, exp)
return result }
Expand Down Expand Up @@ -832,6 +832,7 @@ module Delete =
let q = SqlQuerySpec("SELECT c.id, c.i, c.n FROM c")
let qro = Client.FeedOptions(PartitionKey = PartitionKey stream, MaxItemCount=Nullable maxItems)
container.Client.CreateDocumentQuery<_>(container.CollectionUri, q, qro).AsDocumentQuery()
log.Debug("EqxCosmos Query {query} on {stream}", query, stream)
let tryReadNextPage (x : IDocumentQuery<_>) = async {
if not x.HasMoreResults then return None else

Expand Down
2 changes: 1 addition & 1 deletion tests/Equinox.Cosmos.Integration/AsyncBatchingGateTests.fs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Equinox.CosmosStore.Integration.AsyncBatchingGateTests
module Equinox.Cosmos.Integration.AsyncBatchingGateTests

open System
open System.Collections.Concurrent
Expand Down
51 changes: 33 additions & 18 deletions tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,20 @@ type Tests(testOutputHelper) =
let sx,sy = stringOfUtf8 x, stringOfUtf8 y
test <@ ignore i; blobEquals x y || "" = xmlDiff sx sy @>

let add6EventsIn2Batches ctx streamName = async {
let add6EventsIn2BatchesEx ctx streamName splitAt = async {
let index = 0L
let! res = Events.append ctx streamName index <| TestEvents.Create(0,1)
let! res = Events.append ctx streamName index <| TestEvents.Create(0, splitAt)

test <@ AppendResult.Ok 1L = res @>
let! res = Events.append ctx streamName 1L <| TestEvents.Create(1,5)
test <@ AppendResult.Ok (int64 splitAt) = res @>
let! res = Events.append ctx streamName (int64 splitAt) <| TestEvents.Create(splitAt, 6 - splitAt)
test <@ AppendResult.Ok 6L = res @>
// Only start counting RUs from here
capture.Clear()
return TestEvents.Create(0,6)
}

let add6EventsIn2Batches ctx streamName = add6EventsIn2BatchesEx ctx streamName 1

let verifyCorrectEventsEx direction baseIndex (expected: IEventData<_>[]) (xs: ITimelineEvent<byte[]>[]) =
let xs, baseIndex =
if direction = Equinox.Cosmos.Store.Direction.Forward then xs, baseIndex
Expand Down Expand Up @@ -246,8 +248,7 @@ type Tests(testOutputHelper) =
let! conn = connectToSpecifiedCosmosOrSimulator log
let ctx = mkContextWithItemLimit conn (Some 1)

let! expected = add6EventsIn2Batches ctx streamName
capture.Clear()
let! expected = add6EventsIn2BatchesEx ctx streamName 4

let! res = Events.getAll ctx streamName 0L 1 |> AsyncSeq.concatSeq |> AsyncSeq.takeWhileInclusive (fun _ -> false) |> AsyncSeq.toArrayAsync
let expected = expected |> Array.take 1
Expand Down Expand Up @@ -330,36 +331,50 @@ type Tests(testOutputHelper) =
(* Prune *)
[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let prune (TestStream streamName) = Async.RunSynchronously <| async {
capture.Clear()
let! conn = connectToSpecifiedCosmosOrSimulator log
let ctx = mkContextWithItemLimit conn None

let! expected = add6EventsIn2Batches ctx streamName
let! expected = add6EventsIn2BatchesEx ctx streamName 4

// We should still the correct high-water mark even if we don't delete anything
capture.Clear()
let! deleted, deferred, trimmedPos = Events.prune ctx streamName 0L
test <@ deleted = 0 && deferred = 0 && trimmedPos = 0L @>
test <@ [EqxAct.PruneResponse; EqxAct.Prune] = capture.ExternalCalls @>
verifyRequestChargesMax 3 // 2.86

// Trigger deletion of first batch
// Trigger deletion of first batch, but as we're in the middle of the next Batch...
capture.Clear()
let! deleted, deferred, trimmedPos = Events.prune ctx streamName 5L
test <@ deleted = 1 && deferred = 4 && trimmedPos = 1L @>
test <@ deleted = 4 && deferred = 1 && trimmedPos = 4L @>
test <@ [EqxAct.PruneResponse; EqxAct.Delete; EqxAct.Prune] = capture.ExternalCalls @>
verifyRequestChargesMax 17 // 13.33 + 2.9
verifyRequestChargesMax 17 // [13.33; 2.9]

let pos = 4L
let! res = Events.get ctx streamName 0L Int32.MaxValue
verifyCorrectEvents 1L (Array.skip 1 expected) res
verifyCorrectEvents pos (Array.skip (int pos) expected) res

// Repeat the process, but this time there should be no actual deletes
capture.Clear()
let! deleted, deferred, trimmedPos = Events.prune ctx streamName 4L
test <@ deleted = 0 && deferred = 3 && trimmedPos = 1L @>
let! deleted, deferred, trimmedPos = Events.prune ctx streamName 5L
test <@ deleted = 0 && deferred = 1 && trimmedPos = pos @>
test <@ [EqxAct.PruneResponse; EqxAct.Prune] = capture.ExternalCalls @>
verifyRequestChargesMax 3 // 2.86

// We should still get the high-water mark even if we asked for less
capture.Clear()
let! deleted, deferred, trimmedPos = Events.prune ctx streamName 3L
test <@ deleted = 0 && deferred = 0 (*BUG*) && trimmedPos = 6L @>
test <@ [EqxAct.PruneResponse; EqxAct.Prune] = capture.ExternalCalls @>
verifyRequestChargesMax 3 // 2.86

let! res = Events.get ctx streamName 0L Int32.MaxValue
verifyCorrectEvents 1L (Array.skip 1 expected) res
verifyCorrectEvents pos (Array.skip (int pos) expected) res

// Delete second batch
capture.Clear()
let! deleted, deferred, trimmedPos = Events.prune ctx streamName 6L
test <@ deleted = 5 && deferred = 0 && trimmedPos = 6L @>
test <@ deleted = 2 && deferred = 0 && trimmedPos = 6L @>
test <@ [EqxAct.PruneResponse; EqxAct.Delete; EqxAct.Prune] = capture.ExternalCalls @>
verifyRequestChargesMax 17 // 13.33 + 2.86

Expand All @@ -368,8 +383,8 @@ type Tests(testOutputHelper) =

// Attempt to repeat
capture.Clear()
let! deleted, deferred, trimmedPos = Events.prune ctx streamName 6L
let! deleted, deferred, trimmedPos = Events.prune ctx streamName 5L
test <@ deleted = 0 && deferred = 0 && trimmedPos = 6L @>
test <@ [EqxAct.PruneResponse; EqxAct.Prune] = capture.ExternalCalls @>
verifyRequestChargesMax 3 // 2.83
}
}
10 changes: 5 additions & 5 deletions tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,16 @@ type Tests(testOutputHelper) =

// We need to be sure every Update changes something as we rely on an expected number of events in the end
let value = if value <> ContactPreferences.Fold.initial then value else { value with manyPromotions = true }
let email = let g = System.Guid.NewGuid() in g.ToString "N"
let id = let g = System.Guid.NewGuid() in g.ToString "N"

let id = ContactPreferences.Id (let g = System.Guid.NewGuid() in g.ToString "N")
let id = (let g = System.Guid.NewGuid() in g.ToString "N")
// Ensure there will be something to be changed by the Update below
for i in 0..13 do
do! service.Update(id, if i % 2 = 0 then value else { value with quickSurveys = not value.quickSurveys })
capture.Clear()
do! service.Update email value
do! service.Update(id, value)

let! result = service.Read email
let! result = service.Read id
test <@ value = result @>

test <@ [EqxAct.Tip; EqxAct.Append; EqxAct.Tip] = capture.ExternalCalls @>
Expand All @@ -213,7 +213,7 @@ type Tests(testOutputHelper) =
let! conn = connectToSpecifiedCosmosOrSimulator log
let service = ContactPreferences.createServiceWithLatestKnownEvent (createCosmosContext conn) log CachingStrategy.NoCaching

let id = ContactPreferences.Id (let g = System.Guid.NewGuid() in g.ToString "N")
let id = let g = System.Guid.NewGuid() in g.ToString "N"
// Ensure there will be something to be changed by the Update below
for i in 1..13 do
do! service.Update(id, if i % 2 = 0 then value else { value with quickSurveys = not value.quickSurveys })
Expand Down
12 changes: 6 additions & 6 deletions tests/Equinox.EventStore.Integration/StoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -259,18 +259,18 @@ type Tests(testOutputHelper) =
let! conn = connectToLocalStore log
let service = ContactPreferences.createService log conn

let (Domain.ContactPreferences.Id email) = id
let (Domain.ContactPreferences.Id id) = id
// Feed some junk into the stream
for i in 0..11 do
let quickSurveysValue = i % 2 = 0
do! service.Update email { value with quickSurveys = quickSurveysValue }
do! service.Update(id, { value with quickSurveys = quickSurveysValue })
// Ensure there will be something to be changed by the Update below
do! service.Update email { value with quickSurveys = not value.quickSurveys }
do! service.Update(id, { value with quickSurveys = not value.quickSurveys })

capture.Clear()
do! service.Update email value
do! service.Update(id, value)

let! result = service.Read email
let! result = service.Read id
test <@ value = result @>

test <@ batchBackwardsAndAppend @ singleBatchBackwards = capture.ExternalCalls @>
Expand Down Expand Up @@ -382,4 +382,4 @@ type Tests(testOutputHelper) =
let! _ = service2.Read cartId
let suboptimalExtraSlice = [singleSliceForward]
test <@ singleBatchBackwards @ batchBackwardsAndAppend @ suboptimalExtraSlice @ singleBatchForward = capture.ExternalCalls @>
}
}

0 comments on commit bd04865

Please # to comment.