diff --git a/samples/Infrastructure/Storage.fs b/samples/Infrastructure/Storage.fs index 32ae868a7..1d65c93c7 100644 --- a/samples/Infrastructure/Storage.fs +++ b/samples/Infrastructure/Storage.fs @@ -42,6 +42,9 @@ module Cosmos = | [] Connection of string | [] Database of string | [] Container of string + | [] Connection2 of string + | [] Database2 of string + | [] Container2 of string interface IArgParserTemplate with member a.Usage = match a with @@ -53,11 +56,20 @@ module Cosmos = | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" | Database _ -> "specify a database name for store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" | Container _ -> "specify a container name for store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)" + | Connection2 _ -> "specify a connection string for a Secondary Cosmos account. Default: use same as Primary Connection" + | Database2 _ -> "specify a database name for Secondary store. Default: use same as Primary Database" + | Container2 _ -> "specify a container name for Secondary store. Default: use same as Primary Container" type Info(args : ParseResults) = member __.Mode = args.GetResult(ConnectionMode,Microsoft.Azure.Cosmos.ConnectionMode.Direct) member __.Connection = args.TryGetResult Connection |> defaultWithEnvVar "EQUINOX_COSMOS_CONNECTION" "Connection" member __.Database = args.TryGetResult Database |> defaultWithEnvVar "EQUINOX_COSMOS_DATABASE" "Database" member __.Container = args.TryGetResult Container |> defaultWithEnvVar "EQUINOX_COSMOS_CONTAINER" "Container" + member private __.Connection2 = args.TryGetResult Connection2 + member private __.Database2 = args.TryGetResult Database2 |> Option.defaultWith (fun () -> __.Database) + member private __.Container2 = args.TryGetResult Container2 |> Option.defaultWith (fun () -> __.Container) + member __.Secondary = if args.Contains Connection2 || args.Contains Database2 || args.Contains Container2 + then Some (__.Connection2, __.Database2, __.Container2) + else None member __.Timeout = args.GetResult(Timeout,5.)
|> TimeSpan.FromSeconds member __.Retries = args.GetResult(Retries,1) @@ -70,17 +82,28 @@ module Cosmos = open Equinox.CosmosStore open Serilog + let logContainer (log: ILogger) name (mode, endpoint, db, container) = + log.Information("CosmosDb {name:l} {mode} {connection} Database {database} Container {container}", name, mode, endpoint, db, container) + let connect (a : Info) conn = + let discovery = Discovery.ConnectionString conn + CosmosStoreClientFactory(a.Timeout, a.Retries, a.MaxRetryWaitTime, mode=a.Mode).Create(discovery) let conn (log: ILogger) (a : Info) = - let discovery = Discovery.ConnectionString a.Connection - let client = CosmosStoreClientFactory(a.Timeout, a.Retries, a.MaxRetryWaitTime, mode=a.Mode).Create(discovery) - log.Information("CosmosDb {mode} {connection} Database {database} Container {container}", - a.Mode, client.Endpoint, a.Database, a.Container) - log.Information("CosmosDb timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s", - (let t = a.Timeout in t.TotalSeconds), a.Retries, let x = a.MaxRetryWaitTime in x.TotalSeconds) - client, a.Database, a.Container + let (primaryClient, primaryDatabase, primaryContainer) as primary = connect a a.Connection, a.Database, a.Container + logContainer log "Primary" (a.Mode, primaryClient.Endpoint, primaryDatabase, primaryContainer) + let secondary = + match a.Secondary with + | Some (Some c2, db, container) -> Some (connect a c2, db, container) + | Some (None, db, container) -> Some (primaryClient, db, container) + | None -> None + secondary |> Option.iter (fun (client, db, container) -> logContainer log "Secondary" (a.Mode, client.Endpoint, db, container)) + primary, secondary let config (log: ILogger) (cache, unfolds, batchSize) info = - let client, databaseId, containerId = conn log info - let conn = CosmosStoreConnection(client, databaseId, containerId) + let conn = + match conn log info with + | (client, databaseId, containerId), None -> + CosmosStoreConnection(client, databaseId, containerId) + | (client, databaseId, containerId), Some (client2, db2, cont2) -> + CosmosStoreConnection(client, databaseId, containerId, client2=client2, databaseId2=db2, containerId2=cont2) let ctx = CosmosStoreContext(conn, defaultMaxItems = batchSize) let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) 
| None -> CachingStrategy.NoCaching StorageConfig.Cosmos (ctx, cacheStrategy, unfolds) diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index ccd07794d..7d9cbb362 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -171,10 +171,6 @@ module internal Position = /// Just Do It mode let fromAppendAtEnd = fromI -1L // sic - needs to yield -1 let fromEtag (value : string) = { fromI -2L with etag = Some value } - /// NB very inefficient compared to FromDocument or using one already returned to you - let fromMaxIndex (xs: ITimelineEvent[]) = - if Array.isEmpty xs then fromKnownEmpty - else fromI (1L + Seq.max (seq { for x in xs -> x.Index })) /// Create Position from Tip record context (facilitating 1 RU reads) let fromTip (x: Tip) = { index = x.n; etag = match x._etag with null -> None | x -> Some x } /// If we encounter the tip (id=-1) item/document, we're interested in its etag so we can re-sync for 1 RU @@ -186,27 +182,21 @@ module internal Position = type Direction = Forward | Backward override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward" type internal Enum() = - static member internal Events(b: Tip) : ITimelineEvent seq = - b.e |> Seq.mapi (fun offset x -> FsCodec.Core.TimelineEvent.Create(b.i + int64 offset, x.c, x.d, x.m, Guid.Empty, x.correlationId, x.causationId, x.t)) - static member Events(i: int64, e: Event[], startPos : Position option, direction) : ITimelineEvent seq = seq { - // If we're loading from a nominated position, we need to discard items in the batch before/after the start on the start page - let isValidGivenStartPos i = - match startPos with - | Some sp when direction = Direction.Backward -> i < sp.index - | Some sp -> i >= sp.index - | _ -> true + static member Events(i: int64, e: Event[], indexMin, indexMax) : ITimelineEvent seq = seq { for offset in 0..e.Length-1 do let index = i + int64 offset - if isValidGivenStartPos index then + // If we're loading from a nominated position, we need to discard items in the batch before/after the start on the start page + if index >= indexMin && index < indexMax then let x = e.[offset] yield FsCodec.Core.TimelineEvent.Create(index, x.c, x.d, x.m, Guid.Empty, x.correlationId, x.causationId, x.t) } - static member internal Events(b: Batch, startPos, direction) = - Enum.Events(b.i, b.e, startPos, direction) - |> if direction = Direction.Backward then System.Linq.Enumerable.Reverse else id + static member internal Events(t: Tip, ?minIndex, ?maxIndex) : ITimelineEvent seq = + Enum.Events(t.i, t.e, defaultArg minIndex 0L, defaultArg maxIndex Int64.MaxValue) + static member internal Events(b: Batch, ?minIndex, ?maxIndex) = + Enum.Events(b.i, b.e, defaultArg minIndex 0L, defaultArg maxIndex Int64.MaxValue) static member Unfolds(xs: Unfold[]) : ITimelineEvent seq = seq { for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, x.d, x.m, Guid.Empty, null, null, x.t, isUnfold=true) } - static member EventsAndUnfolds(x: Tip): ITimelineEvent seq = - Enum.Events x + static member EventsAndUnfolds(x: Tip, ?minIndex, ?maxIndex): ITimelineEvent seq = + Enum.Events(x, ?minIndex=minIndex, ?maxIndex=maxIndex) |> Seq.append (Enum.Unfolds x.u) // where Index is equal, unfolds get delivered after the events so the fold semantics can be 'idempotent' |> Seq.sortBy (fun x -> x.Index, x.IsUnfold) @@ -488,6 +478,7 @@ type internal SyncExp = Version of int64 | Etag of string | Any module internal Sync = + // NB don't nest in a private 
module, or serialization will fail miserably ;) [] type Result = | Written of Position @@ -510,7 +501,7 @@ module internal Sync = | null -> Result.Written newPos | [||] when newPos.index = 0L -> Result.Conflict (newPos, Array.empty) | [||] -> Result.ConflictUnknown newPos - | xs -> // stored proc can return unfolds with i >= req.i + | xs -> // stored proc can return events and/or unfolds with i >= req.i - no need to trim to a minIndex Result.Conflict (newPos, Enum.Unfolds xs |> Array.ofSeq) } let private logged (container,stream) (exp : SyncExp, req: Tip) (log : ILogger) @@ -628,10 +619,11 @@ module Initialization = return! createAuxContainerIfNotExists client (dName,cName) mode } /// Holds Container state, coordinating initialization activities - type internal ContainerInitializerGuard(container : Container, ?initContainer : Container -> Async) = + type internal ContainerInitializerGuard(container : Container, fallback : Container option, ?initContainer : Container -> Async) = let initGuard = initContainer |> Option.map (fun init -> AsyncCacheCell(init container)) member __.Container = container + member __.Fallback = fallback member internal __.InitializationGate = match initGuard with Some g when not (g.IsValid()) -> Some g.AwaitValue | _ -> None module internal Tip = @@ -659,13 +651,14 @@ module internal Tip = return ru, res } type [] Result = NotModified | NotFound | Found of Position * ITimelineEvent[] /// `pos` being Some implies that the caller holds a cached value and hence is ready to deal with Result.NotModified - let tryLoad (log : ILogger) retryPolicy containerStream (maybePos: Position option): Async = async { + let tryLoad (log : ILogger) retryPolicy containerStream (maybePos: Position option, maxIndex): Async = async { let! _rc, res = Log.withLoggedRetries retryPolicy "readAttempt" (loggedGet get containerStream maybePos) log match res with | ReadResult.NotModified -> return Result.NotModified | ReadResult.NotFound -> return Result.NotFound | ReadResult.Found tip -> - return Result.Found (Position.fromTip tip, Enum.EventsAndUnfolds tip |> Array.ofSeq) } + let minIndex = maybePos |> Option.map (fun x -> x.index) + return Result.Found (Position.fromTip tip, Enum.EventsAndUnfolds(tip, ?maxIndex=maxIndex, ?minIndex=minIndex) |> Array.ofSeq) } module internal Query = @@ -678,21 +671,25 @@ module internal Tip = if query.HasMoreResults then yield! 
loop (i + 1) } loop 0 - let private mkQuery (container : Container, stream: string) maxItems (direction: Direction) startPos : FeedIterator= + let private mkQuery (container : Container, stream: string) includeTip maxItems (direction: Direction, minIndex, maxIndex) : FeedIterator = + let order = if direction = Direction.Forward then "ASC" else "DESC" let query = - let root = sprintf "SELECT c.id, c.i, c._etag, c.n, c.e FROM c WHERE c.id!=\"%s\"" Tip.WellKnownDocumentId - let tail = sprintf "ORDER BY c.i %s" (if direction = Direction.Forward then "ASC" else "DESC") - match startPos with - | None -> QueryDefinition(sprintf "%s %s" root tail) - | Some { index = positionSoExclusiveWhenBackward } -> - let cond = if direction = Direction.Forward then "c.n > @startPos" else "c.i < @startPos" - QueryDefinition(sprintf "%s AND %s %s" root cond tail).WithParameter("@startPos", positionSoExclusiveWhenBackward) - let qro = QueryRequestOptions(PartitionKey = Nullable(PartitionKey stream), MaxItemCount=Nullable maxItems) - container.GetItemQueryIterator(query, requestOptions = qro) + let args = [ + match minIndex with None -> () | Some x -> yield "c.n > @minPos", fun (q : QueryDefinition) -> q.WithParameter("@minPos", x) + match maxIndex with None -> () | Some x -> yield "c.i < @maxPos", fun (q : QueryDefinition) -> q.WithParameter("@maxPos", x) ] + let whereClause = + let notTip = sprintf "c.id!=\"%s\"" Tip.WellKnownDocumentId // until tip-isa-batch, we have a guarantee there are no events in Tip + let conditions = Seq.map fst args + String.Join(" AND ", if includeTip then conditions else Seq.append conditions (Seq.singleton notTip)) + let queryString = sprintf "SELECT c.id, c.i, c._etag, c.n, c.e FROM c WHERE %s ORDER BY c.i %s" whereClause order + let prams = Seq.map snd args + (QueryDefinition queryString, prams) ||> Seq.fold (fun q wp -> q |> wp) + let qro = QueryRequestOptions(PartitionKey=Nullable (PartitionKey stream), MaxItemCount=Nullable maxItems) + container.GetItemQueryIterator(query, requestOptions=qro) // Unrolls the Batches in a response // NOTE when reading backwards, the events are emitted in reverse Index order to suit the takeWhile consumption - let private mapPage direction (streamName: string) startPos (maxRequests: int option) + let private mapPage direction (streamName: string) (minIndex, maxIndex) (maxRequests: int option) (log: ILogger) i t (res : FeedResponse) : ITimelineEvent[] * Position option * float = let log = log |> Log.prop "batchIndex" i @@ -700,14 +697,18 @@ module internal Tip = | Some mr when i >= mr -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded" | _ -> () let batches, ru = Array.ofSeq res, res.RequestCharge - let events = batches |> Seq.collect (fun b -> Enum.Events(b, startPos, direction)) |> Array.ofSeq + let unwrapBatch (b : Batch) = + Enum.Events(b, ?minIndex=minIndex, ?maxIndex=maxIndex) + |> if direction = Direction.Backward then System.Linq.Enumerable.Reverse else id + let events = batches |> Seq.collect unwrapBatch |> Array.ofSeq let (Log.BatchLen bytes), count = events, events.Length let reqMetric : Log.Measurement = { stream = streamName; interval = t; bytes = bytes; count = count; ru = ru } let log = let evt = Log.Response (direction, reqMetric) in log |> Log.event evt let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propEvents events let index = if count = 0 then Nullable () else Nullable <| Seq.min (seq { for x in batches -> x.i }) (log|> Log.prop "bytes" bytes - |> match startPos 
with Some pos -> Log.propStartPos pos | None -> id) + |> match minIndex with None -> id | Some i -> Log.prop "minIndex" i + |> match maxIndex with None -> id | Some i -> Log.prop "maxIndex" i) .Information("EqxCosmos {action:l} {count}/{batches} {direction} {ms}ms i={index} rc={ru}", "Response", count, batches.Length, direction, (let e = t.Elapsed in e.TotalMilliseconds), index, ru) let maybePosition = batches |> Array.tryPick Position.tryFromBatch @@ -732,10 +733,26 @@ module internal Tip = if x.Index = stopIndex then found <- true used, dropped - let walk<'event> (log : ILogger) (container,stream) maxItems maxRequests direction + [] + type ScanResult<'event> = { found : bool; index : int64; next : int64; maybeTipPos : Position option; events : 'event[] } + + let scanTip (tryDecode: #IEventData -> 'event option, isOrigin: 'event -> bool) (pos : Position, xs: #ITimelineEvent[]) : ScanResult<'event> = + let items = ResizeArray() + let isOrigin' e = + match tryDecode e with + | None -> false + | Some e -> + items.Insert(0, e) // WalkResult always renders events ordered correctly - here we're aiming to align with Enum.EventsAndUnfolds + isOrigin e + let f, e = xs |> Seq.tryFindBack isOrigin' |> Option.isSome, items.ToArray() + { found = f; maybeTipPos = Some pos; index = pos.index; next = pos.index + 1L; events = e } + + // Yields events in ascending Index order + let scan<'event> (log : ILogger) (container,stream) includeTip maxItems maxRequests direction (tryDecode : ITimelineEvent -> 'event option, isOrigin: 'event -> bool) - startPos - : Async = async { + (minIndex, maxIndex) + : Async option> = async { + let mutable found = false let mutable responseCount = 0 let mergeBatches (log : ILogger) (batchesBackward: AsyncSeq[] * Position option * float>) = async { let mutable lastResponse, maybeTipPos, ru = None, None, 0. @@ -749,6 +766,7 @@ module internal Tip = |> AsyncSeq.concatSeq |> AsyncSeq.takeWhileInclusive (function | x, Some e when isOrigin e -> + found <- true match lastResponse with | None -> log.Information("EqxCosmos Stop stream={stream} at={index} {case}", stream, x.Index, x.EventType) | Some batch -> @@ -762,24 +780,23 @@ module internal Tip = let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream let readLog = log |> Log.prop "direction" direction let batches : AsyncSeq[] * Position option * float> = - mkQuery (container,stream) maxItems direction startPos - |> feedIteratorMapTi (mapPage direction stream startPos maxRequests readLog) + mkQuery (container,stream) includeTip maxItems (direction, minIndex, maxIndex) + |> feedIteratorMapTi (mapPage direction stream (minIndex, maxIndex) maxRequests readLog) let! 
t, (events, maybeTipPos, ru) = mergeBatches log batches |> Stopwatch.Time let raws = Array.map fst events - let decoded = Array.choose snd events + let decoded = if direction = Direction.Forward then Array.choose snd events else Seq.choose snd events |> Seq.rev |> Array.ofSeq let minMax = (None, raws) ||> Array.fold (fun acc x -> let i = x.Index in Some (match acc with None -> i, i | Some (n, x) -> min n i, max x i)) let version = match minMax with Some (_, max) -> max + 1L | None -> 0L - let pos = match maybeTipPos with Some p -> p | None -> Position.fromMaxIndex raws log |> logQuery direction maxItems stream t (responseCount,raws) version ru - return pos, decoded } + return minMax |> Option.map (fun (i,m) -> { found = found; index = i; next = m + 1L; maybeTipPos = maybeTipPos; events = decoded }) } - let walkLazy<'event> (log : ILogger) (container,stream) maxItems maxRequests direction + let walkLazy<'event> (log : ILogger) (container,stream) maxItems maxRequests (tryDecode : ITimelineEvent -> 'event option, isOrigin: 'event -> bool) - startPos + (direction, minIndex, maxIndex) : AsyncSeq<'event[]> = asyncSeq { - let query = mkQuery (container,stream) maxItems direction startPos + let query = mkQuery (container,stream) true maxItems (direction, minIndex, maxIndex) - let readPage = mapPage direction stream startPos maxRequests + let readPage = mapPage direction stream (minIndex, maxIndex) maxRequests let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream let readLog = log |> Log.prop "direction" direction let query = query |> feedIteratorMapTi (readPage readLog) @@ -819,6 +836,55 @@ module internal Tip = let t = StopwatchInterval(startTicks, endTicks) log |> logQuery direction maxItems stream t (i, allEvents.ToArray()) -1L ru } + /// Manages coalescing of spans of events obtained from various sources: + /// 1) Tip Data and/or Conflicting events + /// 2) Querying Primary for predecessors of what's obtained from 1 + /// 3) Querying Secondary for predecessors of what's obtained from 2 + let load (log : ILogger) (minIndex, maxIndex) (tip : ScanResult<'event> option) + (primary : int64 option * int64 option -> Async option>) + (secondary : (int64 option * int64 option -> Async option>) option) + : Async = async { + let minI = defaultArg minIndex 0L + match tip with + | Some { index = i; maybeTipPos = Some p; events = e } when i <= minI -> return p, e + | Some { found = true; maybeTipPos = Some p; events = e } -> return p, e + | _ -> + + let i, events, pos = + match tip with + | Some { index = i; maybeTipPos = p; events = e } -> Some i, e, p + | None -> maxIndex, Array.empty, None + let!
primary = primary (minIndex, i) + let events, pos = + match primary with + | None -> events, pos |> Option.defaultValue Position.fromKnownEmpty + | Some p -> Array.append p.events events, pos |> Option.orElse p.maybeTipPos |> Option.defaultValue (Position.fromI p.next) + let inline logMissing (minIndex, maxIndex) message = + (log|> fun log -> match minIndex with None -> log | Some mi -> log |> Log.prop "minIndex" mi + |> fun log -> match maxIndex with None -> log | Some mi -> log |> Log.prop "maxIndex" mi) + .Information(message) + + match primary, secondary with + | Some { index = i }, _ when i <= minI -> return pos, events // primary had required earliest event Index, no need to look at secondary + | Some { found = true }, _ -> return pos, events // origin found in primary, no need to look in secondary + | None, _ when Option.isSome tip -> return pos, events // If there's no data in Tip or primary, there won't be any in secondary + | _, None -> + logMissing (minIndex, i) "Origin event not found; no secondary container supplied" + return pos, events + | _, Some secondary -> + + let maxIndex = match primary with Some p -> Some p.index | None -> maxIndex // if no batches in primary, high water mark from tip is max + let! secondary = secondary (minIndex, maxIndex) + let events = + match secondary with + | Some s -> Array.append s.events events + | None -> events + match secondary with + | Some { index = i } when i <= minI -> () + | Some { found = true } -> () + | _ -> logMissing (minIndex, maxIndex) "Origin event not found in secondary container" + return pos, events } + // Manages deletion of batches // Note: it's critical that we delete individually, in the correct order so as not to leave gaps // Note: public so BatchIndices can be deserialized into @@ -922,8 +988,7 @@ module Delete = } type [] Token = { stream: string; pos: Position } -module internal Token = - +module Token = let create stream pos : StreamToken = { value = box { stream = stream; pos = pos }; version = pos.index } let (|Unpack|) (token: StreamToken) : string*Position = let t = unbox token.value in t.stream,t.pos let supersedes (Unpack (_,currentPos)) (Unpack (_,xPos)) = @@ -935,7 +1000,7 @@ module internal Token = module Internal = [] - type InternalSyncResult = Written of StreamToken | ConflictUnknown of StreamToken | Conflict of StreamToken * ITimelineEvent[] + type InternalSyncResult = Written of StreamToken | ConflictUnknown of StreamToken | Conflict of Position * ITimelineEvent[] [] type LoadFromTokenResult<'event> = Unchanged | Found of StreamToken * 'event[] @@ -959,63 +1024,65 @@ type BatchingPolicy /// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxItems` member __.MaxRequests = maxRequests -type internal StoreClient(container : Container, batching : BatchingPolicy, retry : RetryPolicy) = - let (|FromUnfold|_|) (tryDecode: #IEventData<_> -> 'event option) (isOrigin: 'event -> bool) (xs:#IEventData<_>[]) : Option<'event[]> = - let items = ResizeArray() - let isOrigin' e = - match tryDecode e with - | None -> false - | Some e -> - items.Insert(0,e) - isOrigin e - match Array.tryFindIndexBack isOrigin' xs with - | None -> None - | Some _ -> items.ToArray() |> Some - member __.LoadBackwardsStopping(log, stream, (tryDecode,isOrigin)): Async = async { - let! 
pos, events = Query.walk log (container,stream) batching.MaxItems batching.MaxRequests Direction.Backward (tryDecode,isOrigin) None - Array.Reverse events - return Token.create stream pos, events } - member __.Read(log, stream, direction, (tryDecode,isOrigin), startPos) : Async = async { - let! pos, events = Query.walk log (container,stream) batching.MaxItems batching.MaxRequests direction (tryDecode,isOrigin) startPos +type StoreClient(container : Container, fallback : Container option, batching : BatchingPolicy, retry: RetryPolicy) = + + // Always yields events forward, regardless of direction + member internal __.Read(log, stream, direction, (tryDecode, isOrigin), ?minIndex, ?maxIndex, ?tip): Async = async { + let tip = tip |> Option.map (Query.scanTip (tryDecode,isOrigin)) + let walk log gateway = Query.scan log (gateway,stream) (Option.isNone tip) batching.MaxItems batching.MaxRequests direction (tryDecode, isOrigin) + let walkFallback = + match fallback with + | None -> None + | Some f -> walk (log |> Log.prop "Secondary" true) f |> Some + + let log = log |> Log.prop "stream" stream + let! pos, events = Query.load log (minIndex, maxIndex) tip (walk log container) walkFallback return Token.create stream pos, events } - member __.ReadLazy(log, batching: BatchingPolicy, stream, direction, startPos, (tryDecode,isOrigin)) : AsyncSeq<'event[]> = - Query.walkLazy log (container,stream) batching.MaxItems batching.MaxRequests direction (tryDecode,isOrigin) startPos - member __.LoadFromUnfoldsOrRollingSnapshots(log, (stream,maybePos), (tryDecode,isOrigin)): Async = async { - match! Tip.tryLoad log retry.TipRetryPolicy (container,stream) maybePos with - | Tip.Result.NotFound -> return Token.create stream Position.fromKnownEmpty, Array.empty - | Tip.Result.NotModified -> return invalidOp "Not handled" - | Tip.Result.Found (pos, FromUnfold tryDecode isOrigin span) -> return Token.create stream pos, span - | _ -> return! __.LoadBackwardsStopping(log, stream, (tryDecode,isOrigin)) } + + member con.Load(log, (stream, maybePos), (tryDecode, isOrigin), includeUnfolds): Async = + if not includeUnfolds then con.Read(log, stream, Direction.Backward, (tryDecode, isOrigin)) + else async { + match! Tip.tryLoad log retry.TipRetryPolicy (container,stream) (maybePos, None) with + | Tip.Result.NotFound -> return Token.create stream Position.fromKnownEmpty, Array.empty + | Tip.Result.NotModified -> return invalidOp "Not applicable" + | Tip.Result.Found (pos, xs) -> return! con.Read(log, stream, Direction.Backward, (tryDecode, isOrigin), tip=(pos, xs)) } + + member __.ReadLazy(log, batching: BatchingPolicy, stream, direction, (tryDecode,isOrigin), ?minIndex, ?maxIndex) : AsyncSeq<'event[]> = + Query.walkLazy log (container,stream) batching.MaxItems batching.MaxRequests (tryDecode,isOrigin) (direction, minIndex, maxIndex) + member __.GetPosition(log, stream, ?pos): Async = async { - match! Tip.tryLoad log retry.TipRetryPolicy (container,stream) pos with + match! Tip.tryLoad log retry.TipRetryPolicy (container,stream) (pos, None) with | Tip.Result.NotFound -> return Token.create stream Position.fromKnownEmpty | Tip.Result.NotModified -> return Token.create stream pos.Value | Tip.Result.Found (pos, _unfoldsAndEvents) -> return Token.create stream pos } - member __.LoadFromToken(log, (stream,pos), (tryDecode, isOrigin)): Async> = async { - match! 
Tip.tryLoad log retry.TipRetryPolicy (container,stream) (Some pos) with - | Tip.Result.NotFound -> return LoadFromTokenResult.Found (Token.create stream Position.fromKnownEmpty,Array.empty) - | Tip.Result.NotModified -> return LoadFromTokenResult.Unchanged - | Tip.Result.Found (pos, FromUnfold tryDecode isOrigin span) -> return LoadFromTokenResult.Found (Token.create stream pos, span) - | _ -> let! res = __.Read(log, stream, Direction.Forward, (tryDecode,isOrigin), (Some pos)) - return LoadFromTokenResult.Found res } - member __.Sync(log, stream, exp : SyncExp, batch: Tip): Async = async { + + member con.Reload(log, (stream, pos), (tryDecode, isOrigin), ?preview): Async> = + let query (pos, xs) = async { + let! res = con.Read(log, stream, Direction.Backward, (tryDecode, isOrigin), tip=(pos, xs), minIndex=pos.index) + return LoadFromTokenResult.Found res } + match preview with + | Some (pos, xs) -> query (pos, xs) + | None -> async { + match! Tip.tryLoad log retry.TipRetryPolicy (container,stream) (Some pos, None) with + | Tip.Result.NotFound -> return LoadFromTokenResult.Found (Token.create stream Position.fromKnownEmpty, Array.empty) + | Tip.Result.NotModified -> return LoadFromTokenResult.Unchanged + | Tip.Result.Found (pos,xs) -> return! query (pos, xs) } + + member internal __.Sync(log, stream, exp, batch: Tip): Async = async { if Array.isEmpty batch.e && Array.isEmpty batch.u then invalidOp "Must write either events or unfolds." match! Sync.batch log retry.WriteRetryPolicy (container,stream) (exp,batch) with - | Sync.Result.Conflict (pos',events) -> return InternalSyncResult.Conflict (Token.create stream pos',events) + | Sync.Result.Conflict (pos',events) -> return InternalSyncResult.Conflict (pos',events) | Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create stream pos') | Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create stream pos') } member __.Prune(log, stream, beforeIndex) = - Delete.pruneBefore log (container, stream) batching.MaxItems beforeIndex + Delete.pruneBefore log (container,stream) batching.MaxItems beforeIndex type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IEventCodec<'event,byte[],'context>) = - let (|TryDecodeFold|) (fold: 'state -> 'event seq -> 'state) initial (events: ITimelineEvent seq) : 'state = Seq.choose codec.TryDecode events |> fold initial member __.Load(log, stream, initial, includeUnfolds, fold, isOrigin): Async = async { - let! token, events = - if not includeUnfolds then store.LoadBackwardsStopping(log, stream, (codec.TryDecode,isOrigin)) - else store.LoadFromUnfoldsOrRollingSnapshots(log, (stream,None), (codec.TryDecode,isOrigin)) + let! token, events = store.Load(log, (stream, None), (codec.TryDecode,isOrigin), includeUnfolds) return token, fold initial events } - member __.LoadFromToken(log, (Token.Unpack (stream, pos) as streamToken), state: 'state, fold, isOrigin): Async = async { - match! store.LoadFromToken(log, (stream, pos), (codec.TryDecode,isOrigin)) with + member __.Reload(log, (Token.Unpack (stream, pos) as streamToken), state, fold, isOrigin, ?preloaded): Async = async { + match! 
store.Reload(log, (stream, pos), (codec.TryDecode,isOrigin), ?preview=preloaded) with | LoadFromTokenResult.Unchanged -> return streamToken, state | LoadFromTokenResult.Found (token', events) -> return token', fold state events } member cat.Sync(log, token, state, events, mapUnfolds, fold, isOrigin, context): Async> = async { @@ -1033,8 +1100,8 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE let projections = Sync.mkUnfold baseIndex projectionsEncoded let batch = Sync.mkBatch stream eventsEncoded projections match! store.Sync(log, stream, exp, batch) with - | InternalSyncResult.Conflict (token',TryDecodeFold fold state events') -> return SyncResult.Conflict (async { return token', events' }) - | InternalSyncResult.ConflictUnknown _token' -> return SyncResult.Conflict (cat.LoadFromToken(log, token, state, fold, isOrigin)) + | InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin, (pos', tipEvents))) + | InternalSyncResult.ConflictUnknown _token' -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin)) | InternalSyncResult.Written token' -> return SyncResult.Written (token', state') } module internal Caching = @@ -1083,7 +1150,7 @@ type internal Folder<'event, 'state, 'context> match! cache.TryGet(prefix + streamName) with | None -> return! batched log streamName | Some tokenAndState when opt = Some Equinox.AllowStale -> return tokenAndState - | Some (token, state) -> return! category.LoadFromToken(log, token, state, fold, isOrigin) } + | Some (token, state) -> return! category.Reload(log, token, state, fold, isOrigin) } member __.TrySync(log : ILogger, streamToken, state, events : 'event list, context) : Async> = async { match! category.Sync(log, streamToken, state, events, mapUnfolds, fold, isOrigin, context) with @@ -1107,22 +1174,34 @@ type CosmosStoreConnection ( /// Facilitates custom mapping of Stream Category Name to underlying Cosmos Database/Container names categoryAndStreamNameToDatabaseContainerStream : string * string -> string * string * string, createContainer : string * string -> Container, + createSecondaryContainer : string * string -> Container option, + []?primaryDatabaseAndContainerToSecondary : string * string -> string * string, /// Admits a hook to enable customization of how Equinox.CosmosStore handles the low level interactions with the underlying CosmosContainer. []?createGateway, /// Inhibit CreateStoredProcedureIfNotExists when a given Container is used for the first time []?disableInitialization) = let createGateway = match createGateway with Some creator -> creator | None -> id + let primaryDatabaseAndContainerToSecondary = defaultArg primaryDatabaseAndContainerToSecondary id // Index of database*collection -> Initialization Context let containerInitGuards = System.Collections.Concurrent.ConcurrentDictionary() new(client, databaseId : string, containerId : string, /// Inhibit CreateStoredProcedureIfNotExists when a given Container is used for the first time []?disableInitialization, /// Admits a hook to enable customization of how Equinox.CosmosStore handles the low level interactions with the underlying CosmosContainer. - []?createGateway : Container -> Container) = + []?createGateway : Container -> Container, + /// Client to use for fallback Containers. Default: use same as primary + ?client2 : CosmosClient, + /// Database to use for fallback Containers. 
Default: use same as databaseId + ?databaseId2, + /// Container to use for fallback Containers. Default: use same as containerId + ?containerId2) = let genStreamName (categoryName, streamId) = if categoryName = null then streamId else sprintf "%s-%s" categoryName streamId let catAndStreamToDatabaseContainerStream (categoryName, streamId) = databaseId, containerId, genStreamName (categoryName, streamId) let primaryContainer (d, c) = (client : CosmosClient).GetDatabase(d).GetContainer(c) - CosmosStoreConnection(catAndStreamToDatabaseContainerStream, primaryContainer, + let secondaryContainer = + if Option.isNone client2 && Option.isNone databaseId2 && Option.isNone containerId2 then fun (_, _) -> None + else fun (d, c) -> Some ((defaultArg client2 client).GetDatabase(defaultArg databaseId2 d).GetContainer(defaultArg containerId2 c)) + CosmosStoreConnection(catAndStreamToDatabaseContainerStream, primaryContainer, secondaryContainer, ?disableInitialization=disableInitialization, ?createGateway=createGateway) member internal __.ResolveContainerGuardAndStreamName(categoryName, streamId) : Initialization.ContainerInitializerGuard * string = let databaseId, containerId, streamName = categoryAndStreamNameToDatabaseContainerStream (categoryName, streamId) @@ -1130,8 +1209,9 @@ type CosmosStoreConnection let init = if Some true = disableInitialization then None else Some (fun cosmosContainer -> Initialization.createSyncStoredProcIfNotExists None cosmosContainer |> Async.Ignore) - let primaryContainer = createContainer (d, c) - Initialization.ContainerInitializerGuard(createGateway primaryContainer, ?initContainer=init) + let secondaryD, secondaryC = primaryDatabaseAndContainerToSecondary (d, c) + let primaryContainer, secondaryContainer = createContainer (d, c), createSecondaryContainer (secondaryD, secondaryC) + Initialization.ContainerInitializerGuard(createGateway primaryContainer, Option.map createGateway secondaryContainer, ?initContainer=init) let g = containerInitGuards.GetOrAdd((databaseId, containerId), createContainerInitializerGuard) g, streamName @@ -1145,7 +1225,7 @@ type CosmosStoreContext(connection : CosmosStoreConnection, batchingPolicy, retr member __.Retries = retryPolicy member internal __.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId) = let cg, streamId = connection.ResolveContainerGuardAndStreamName(categoryName, streamId) - let store = StoreClient(cg.Container, batchingPolicy, retryPolicy) + let store = StoreClient(cg.Container, cg.Fallback, batchingPolicy, retryPolicy) store, streamId, cg.InitializationGate [] @@ -1342,6 +1422,12 @@ type EventsContext internal let! 
(Token.Unpack (_,pos')), data = res return pos', data } + let getRange direction startPos = + let startPos = startPos |> Option.map (fun x -> x.index) + match direction with + | Direction.Forward -> startPos, None + | Direction.Backward -> None, startPos + new (context : Equinox.CosmosStore.CosmosStoreContext, log, ?defaultMaxItems, ?getDefaultMaxItems) = let storeClient, _streamId, _ = context.ResolveContainerClientAndStreamIdAndInit(null, null) EventsContext(context, storeClient, log, ?defaultMaxItems=defaultMaxItems, ?getDefaultMaxItems=getDefaultMaxItems) @@ -1351,10 +1437,10 @@ type EventsContext internal streamId, init member __.StreamId(streamName) : string = __.ResolveStream streamName |> fst - member internal __.GetLazy(stream, ?batchSize, ?direction, ?startPos) : AsyncSeq[]> = + member internal __.GetLazy(stream, ?batchSize, ?direction, ?minIndex, ?maxIndex) : AsyncSeq[]> = let direction = defaultArg direction Direction.Forward let batching = BatchingPolicy(defaultArg batchSize batching.MaxItems) - store.ReadLazy(log, batching, stream, direction, startPos, (Some,fun _ -> false)) + store.ReadLazy(log, batching, stream, direction, (Some,fun _ -> false), ?minIndex=minIndex, ?maxIndex=maxIndex) member internal __.GetInternal((stream, startPos), ?maxCount, ?direction) = async { let direction = defaultArg direction Direction.Forward @@ -1366,7 +1452,10 @@ type EventsContext internal match maxCount with | Some limit -> maxCountPredicate limit | None -> fun _ -> false - return! store.Read(log, stream, direction, (Some,isOrigin), startPos) } + let minIndex, maxIndex = getRange direction startPos + let! token, events = store.Read(log, stream, direction, (Some, isOrigin), ?minIndex=minIndex, ?maxIndex=maxIndex) + if direction = Direction.Backward then System.Array.Reverse events + return token, events } /// Establishes the current position of the stream in as efficient a manner as possible /// (The ideal situation is that the preceding token is supplied as input in order to avail of 1RU low latency state checks) @@ -1376,8 +1465,8 @@ type EventsContext internal /// Reads in batches of `batchSize` from the specified `Position`, allowing the reader to efficiently walk away from a running query /// ... NB as long as they Dispose! - member __.Walk(stream, batchSize, ?position, ?direction) : AsyncSeq[]> = - __.GetLazy(stream, batchSize, ?direction=direction, ?startPos=position) + member __.Walk(stream, batchSize, ?minIndex, ?maxIndex, ?direction) : AsyncSeq[]> = + __.GetLazy(stream, batchSize, ?direction=direction, ?minIndex=minIndex, ?maxIndex=maxIndex) /// Reads all Events from a `Position` in a given `direction` member __.Read(stream, ?position, ?maxCount, ?direction) : Async[]> = @@ -1395,7 +1484,7 @@ type EventsContext internal let batch = Sync.mkBatch stream events Seq.empty match! store.Sync(log, stream, SyncExp.Version position.index, batch) with | InternalSyncResult.Written (Token.Unpack (_,pos)) -> return AppendResult.Ok pos - | InternalSyncResult.Conflict (Token.Unpack (_,pos),events) -> return AppendResult.Conflict (pos, events) + | InternalSyncResult.Conflict (pos,events) -> return AppendResult.Conflict (pos, events) | InternalSyncResult.ConflictUnknown (Token.Unpack (_,pos)) -> return AppendResult.ConflictUnknown pos } /// Low level, non-idempotent call appending events to a stream without a concurrency control mechanism in play @@ -1439,8 +1528,8 @@ module Events = /// reading in batches of the specified size. 
/// Returns an empty sequence if the stream is empty or if the sequence number is larger than the largest /// sequence number in the stream. - let getAll (ctx: EventsContext) (streamName: string) (MinPosition index: int64) (batchSize: int): AsyncSeq[]> = - ctx.Walk(ctx.StreamId streamName, batchSize, ?position=index) + let getAll (ctx: EventsContext) (streamName: string) (index: int64) (batchSize: int) : AsyncSeq[]> = + ctx.Walk(ctx.StreamId streamName, batchSize, minIndex=index) /// Returns an async array of events in the stream starting at the specified sequence number, /// number of events to read is specified by batchSize @@ -1472,8 +1561,8 @@ module Events = /// reading in batches of the specified size. /// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest /// sequence number in the stream. - let getAllBackwards (ctx: EventsContext) (streamName: string) (MaxPosition index: int64) (batchSize: int): AsyncSeq[]> = - ctx.Walk(ctx.StreamId streamName, batchSize, ?position=index, direction=Direction.Backward) + let getAllBackwards (ctx: EventsContext) (streamName: string) (index: int64) (batchSize: int) : AsyncSeq[]> = + ctx.Walk(ctx.StreamId streamName, batchSize, maxIndex=index, direction=Direction.Backward) /// Returns an async array of events in the stream backwards starting from the specified sequence number, /// number of events to read is specified by batchSize diff --git a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs index 1fa353d8e..44658930b 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs @@ -358,3 +358,62 @@ type Tests(testOutputHelper) = test <@ [EqxAct.PruneResponse; EqxAct.Prune] = capture.ExternalCalls @> verifyRequestChargesMax 3 // 2.83 } + + (* Fallback *) + + [] + let fallback (TestStream streamName) = Async.RunSynchronously <| async { + let ctx1 = createPrimaryEventsContext log None + let ctx2 = createSecondaryEventsContext log None + let ctx12 = createFallbackEventsContext log None + + let! expected = add6EventsIn2Batches ctx1 streamName + // Add the same events to the secondary container + let! _ = add6EventsIn2Batches ctx2 streamName + + // Trigger deletion of first batch from primary + let! deleted, deferred, trimmedPos = Events.prune ctx1 streamName 5L + test <@ deleted = 1 && deferred = 4 && trimmedPos = 1L @> + + // Prove it's gone + capture.Clear() + let! res = Events.get ctx1 streamName 0L Int32.MaxValue + test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> + verifyCorrectEvents 1L (Array.skip 1 expected) res + verifyRequestChargesMax 4 // 3.04 + + // Prove the full set exists in the secondary + capture.Clear() + let! res = Events.get ctx2 streamName 0L Int32.MaxValue + test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> + verifyCorrectEvents 0L expected res + verifyRequestChargesMax 4 // 3.09 + + // Prove we can fallback with full set in secondary + capture.Clear() + let! res = Events.get ctx12 streamName 0L Int32.MaxValue + test <@ [EqxAct.ResponseForward; EqxAct.QueryForward; EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> + verifyCorrectEvents 0L expected res + verifyRequestChargesMax 7 // 3.04 + 3.06 + + // Delete second batch in primary + capture.Clear() + let! 
deleted, deferred, trimmedPos = Events.prune ctx1 streamName 6L + test <@ deleted = 5 && deferred = 0 && trimmedPos = 6L @> + + // Nothing left in primary + capture.Clear() + let! res = Events.get ctx1 streamName 0L Int32.MaxValue + test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> + test <@ [||] = res @> + verifyRequestChargesMax 3 // 2.99 + + // Fallback still does two queries (the first one is empty) // TODO demonstrate Primary read is only of Tip when using snapshots + capture.Clear() + let! res = Events.get ctx12 streamName 0L Int32.MaxValue +// test <@ [EqxAct.ResponseForward; EqxAct.QueryForward; EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> + verifyCorrectEvents 0L expected res + verifyRequestChargesMax 7 // 2.99+3.09 + + // NOTE lazy variants don't presently apply fallback logic + } diff --git a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs index 15d0bbf60..c518a736d 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs @@ -15,6 +15,7 @@ let (|Default|) def name = (tryRead name),def ||> defaultArg let private databaseId = tryRead "EQUINOX_COSMOS_DATABASE" |> Option.defaultValue "equinox-test" let private containerId = tryRead "EQUINOX_COSMOS_CONTAINER" |> Option.defaultValue "equinox-test" +let private containerId2 = tryRead "EQUINOX_COSMOS_CONTAINER2" |> Option.defaultValue "equinox-test2" let discoverConnection () = match tryRead "EQUINOX_COSMOS_CONNECTION" with @@ -32,13 +33,41 @@ let connectPrimary (log : Serilog.ILogger) = let client = createClient log name discovery CosmosStoreConnection(client, databaseId, containerId) +let connectSecondary (log : Serilog.ILogger) = + let name, discovery = discoverConnection () + let client = createClient log name discovery + CosmosStoreConnection(client, databaseId, containerId2) + +let connectWithFallback (log : Serilog.ILogger) = + let name, discovery = discoverConnection () + let client = createClient log name discovery + CosmosStoreConnection(client, databaseId, containerId, containerId2=containerId2) + let createPrimaryContext (log: Serilog.ILogger) batchSize = let conn = connectPrimary log CosmosStoreContext(conn, defaultMaxItems = batchSize) +let createSecondaryContext (log: Serilog.ILogger) batchSize = + let conn = connectSecondary log + CosmosStoreContext(conn, defaultMaxItems = batchSize) + +let createFallbackContext (log: Serilog.ILogger) batchSize = + let conn = connectWithFallback log + CosmosStoreContext(conn, defaultMaxItems = batchSize) + let defaultBatchSize = 500 let createPrimaryEventsContext log batchSize = let batchSize = defaultArg batchSize defaultBatchSize let context = createPrimaryContext log batchSize Equinox.CosmosStore.Core.EventsContext(context, log, defaultMaxItems = batchSize) + +let createSecondaryEventsContext log batchSize = + let batchSize = defaultArg batchSize defaultBatchSize + let ctx = createSecondaryContext log batchSize + Equinox.CosmosStore.Core.EventsContext(ctx, log, defaultMaxItems = batchSize) + +let createFallbackEventsContext log batchSize = + let batchSize = defaultArg batchSize defaultBatchSize + let ctx = createFallbackContext log batchSize + Equinox.CosmosStore.Core.EventsContext(ctx, log, defaultMaxItems = batchSize) diff --git a/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs index 120817f0c..4b6c4b30a 100644 --- 
a/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs @@ -208,6 +208,35 @@ type Tests(testOutputHelper) = test <@ value = result @> test <@ [EqxAct.Tip; EqxAct.Append; EqxAct.Tip] = capture.ExternalCalls @> + + (* Verify pruning does not affect the copies of the events maintained as Unfolds *) + + //let ctx = createPrimaryEventsContext log None + + // Needs to share the same client for the session key to be threaded through + // If we run on an independent context, we won't see (and hence prune) the full set of events + // TODO: explain why this sleep is still needed though! + do! Async.Sleep 1000 + let ctx = Core.EventsContext(context, log) + let streamName = ContactPreferences.streamName id |> FsCodec.StreamName.toString + + // Prune all the events + let! deleted, deferred, trimmedPos = Core.Events.prune ctx streamName 14L + test <@ deleted = 14 && deferred = 0 && trimmedPos = 14L @> + + // Prove they're gone + capture.Clear() + let! res = Core.Events.get ctx streamName 0L Int32.MaxValue + test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> + test <@ [||] = res @> + verifyRequestChargesMax 3 // 2.99 + + // But we can still read (there's no cache so we'll definitely be reading) + capture.Clear() + let! _ = service.Read id + test <@ value = result @> + test <@ [EqxAct.Tip] = capture.ExternalCalls @> + verifyRequestChargesMax 1 } [] @@ -332,6 +361,29 @@ type Tests(testOutputHelper) = capture.Clear() let! _ = service2.Read cartId test <@ [EqxAct.Tip] = capture.ExternalCalls @> + + (* Verify pruning does not affect snapshots, though Tip is re-read in this scenario due to lack of caching *) + + // TODO: explain why this sleep is still needed though! + do! Async.Sleep 1000 + let ctx = Core.EventsContext(context, log) + let streamName = Cart.streamName cartId |> FsCodec.StreamName.toString + // Prune all the events + let! deleted, deferred, trimmedPos = Core.Events.prune ctx streamName 12L + test <@ deleted = 12 && deferred = 0 && trimmedPos = 12L @> + + // Prove they're gone + capture.Clear() + let! res = Core.Events.get ctx streamName 0L Int32.MaxValue + test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> + test <@ [||] = res @> + verifyRequestChargesMax 3 // 2.99 + + // But we can still read (there's no cache so we'll definitely be reading) + capture.Clear() + let! _ = service2.Read cartId + test <@ [EqxAct.Tip] = capture.ExternalCalls @> + verifyRequestChargesMax 1 } [] @@ -370,4 +422,27 @@ type Tests(testOutputHelper) = capture.Clear() do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne cartContext cartId skuId service1 1 test <@ [EqxAct.Append] = capture.ExternalCalls @> + + (* Verify pruning does not affect snapshots, and does not touch the Tip *) + + // TODO: explain why this sleep is still needed though! + do! Async.Sleep 1000 + let ctx = Core.EventsContext(context, log) + let streamName = Cart.streamName cartId |> FsCodec.StreamName.toString + // Prune all the events + let! deleted, deferred, trimmedPos = Core.Events.prune ctx streamName 13L + test <@ deleted = 13 && deferred = 0 && trimmedPos = 13L @> + + // Prove they're gone + capture.Clear() + let! 
res = Core.Events.get ctx streamName 0L Int32.MaxValue + test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> + test <@ [||] = res @> + verifyRequestChargesMax 3 // 2.99 + + // But we can still read (service2 shares the cache so is aware of the last writes, and pruning does not invalidate the Tip) + capture.Clear() + let! _ = service2.Read cartId + test <@ [EqxAct.TipNotModified] = capture.ExternalCalls @> + verifyRequestChargesMax 1 } diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index af9fd40a7..b574430bd 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -320,7 +320,7 @@ module CosmosInit = open Equinox.CosmosStore.Core.Initialization let conn log (sargs : ParseResults) = - Storage.Cosmos.conn log (Storage.Cosmos.Info sargs) + Storage.Cosmos.conn log (Storage.Cosmos.Info sargs) |> fst let containerAndOrDb log (iargs: ParseResults) = async { match iargs.TryGetSubCommand() with
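
Usage sketch (illustrative, not part of the diff): the following shows how the secondary-container fallback introduced above is wired up, assembled from the new CosmosStoreConnection overload and the test fixtures in CosmosFixtures.fs. The connection string, database and container names are placeholders, and optional factory arguments such as mode are omitted.

    open System
    open Equinox.CosmosStore

    let connectWithFallback () =
        // Placeholder connection string; the samples source this from EQUINOX_COSMOS_CONNECTION
        let discovery = Discovery.ConnectionString "AccountEndpoint=https://...;AccountKey=..."
        let client = CosmosStoreClientFactory(TimeSpan.FromSeconds 5., 1, TimeSpan.FromSeconds 5.).Create(discovery)
        // Any of client2/databaseId2/containerId2 may be supplied; unspecified ones default to the primary's
        CosmosStoreConnection(client, "equinox-test", "equinox-test", containerId2 = "equinox-test2")

    let createEventsContext (log : Serilog.ILogger) =
        let context = CosmosStoreContext(connectWithFallback (), defaultMaxItems = 500)
        Equinox.CosmosStore.Core.EventsContext(context, log, defaultMaxItems = 500)

    // Reads that don't locate the origin event (or the requested minIndex) in the primary
    // container continue into the secondary, per Query.load above
    let readAll ctx streamName =
        Equinox.CosmosStore.Core.Events.get ctx streamName 0L Int32.MaxValue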
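
A second sketch (again illustrative; the stream name is hypothetical) of the revised lazy read surface: getAll now takes an inclusive lower bound as minIndex, and getAllBackwards an exclusive upper bound as maxIndex, matching the c.n > @minPos / c.i < @maxPos conditions in mkQuery. Note that, per the test above, the lazy variants don't presently apply fallback logic.

    // Walk forward from index 2 (inclusive), in batches of 100
    let forward (ctx : Equinox.CosmosStore.Core.EventsContext) =
        Equinox.CosmosStore.Core.Events.getAll ctx "Favorites-alice" 2L 100

    // Walk backward from index 10 (exclusive), in batches of 100
    let backward (ctx : Equinox.CosmosStore.Core.EventsContext) =
        Equinox.CosmosStore.Core.Events.getAllBackwards ctx "Favorites-alice" 10L 100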