Skip to content

Commit

Permalink
Fallback CP
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 30, 2020
1 parent f546b1b commit 4ca11f3
Show file tree
Hide file tree
Showing 11 changed files with 404 additions and 123 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Equinox does not focus on projection logic or wrapping thereof - each store brin

- `FsKafka` [![FsKafka NuGet](https://img.shields.io/nuget/v/FsKafka.svg)](https://www.nuget.org/packages/FsKafka/): Wraps `Confluent.Kafka` to provide efficient batched Kafka Producer and Consumer configurations, with basic logging instrumentation. Used in the [`propulsion project kafka`](https://github.com/jet/propulsion#dotnet-tool-provisioning--projections-test-tool) tool command; see [`dotnet new proProjector -k; dotnet new proConsumer` to generate a sample app](https://github.com/jet/dotnet-templates#propulsion-related) using it (see the `BatchedAsync` and `BatchedSync` modules in `Examples.fs`).
- `Propulsion` [![Propulsion NuGet](https://img.shields.io/nuget/v/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/): defines a canonical `Propulsion.Streams.StreamEvent` used to interop with `Propulsion.*` in processing pipelines for the `proProjector` and `proSync` templates in the [templates repo](https://github.com/jet/dotnet-templates), together with the `Ingestion`, `Streams`, `Progress` and `Parallel` modules that get composed into those processing pipelines. ([depends](https://www.fuget.org/packages/Propulsion) on `Serilog`)
- `Propulsion.Cosmos` [![Propulsion.Cosmos NuGet](https://img.shields.io/nuget/v/Propulsion.Cosmos.svg)](https://www.nuget.org/packages/Propulsion.Cosmos/): Wraps the [Microsoft .NET `ChangeFeedProcessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet) providing a [processor loop](DOCUMENTATION.md#change-feed-processors) that maintains a continuous query loop per CosmosDb Physical Partition (Range) yielding new or updated documents (optionally unrolling events written by `Equinox.CosmosStore` for processing or forwarding). Used in the [`propulsion project stats cosmos`](dotnet-tool-provisioning--benchmarking-tool) tool command; see [`dotnet new proProjector` to generate a sample app](#quickstart) using it. ([depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.CosmosStore`, `Microsoft.Azure.DocumentDb.ChangeFeedProcessor >= 2.2.5`)
- `Propulsion.Cosmos` [![Propulsion.Cosmos NuGet](https://img.shields.io/nuget/v/Propulsion.Cosmos.svg)](https://www.nuget.org/packages/Propulsion.Cosmos/): Wraps the [Microsoft .NET `ChangeFeedProcessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet) providing a [processor loop](DOCUMENTATION.md#change-feed-processors) that maintains a continuous query loop per CosmosDb Physical Partition (Range) yielding new or updated documents (optionally unrolling events written by `Equinox.CosmosStore` for processing or forwarding). Used in the [`propulsion project stats cosmos`](dotnet-tool-provisioning--benchmarking-tool) tool command; see [`dotnet new proProjector` to generate a sample app](#quickstart) using it. ([depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.Cosmos`, `Microsoft.Azure.DocumentDb.ChangeFeedProcessor >= 2.2.5`)
- `Propulsion.EventStore` [![Propulsion.EventStore NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/) Used in the [`propulsion project es`](dotnet-tool-provisioning--benchmarking-tool) tool command; see [`dotnet new proSync` to generate a sample app](#quickstart) using it. ([depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore`)
- `Propulsion.Kafka` [![Propulsion.Kafka NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/): Provides a canonical `RenderedSpan` that can be used as a default format when projecting events via e.g. the Producer/Consumer pair in `dotnet new proProjector -k; dotnet new proConsumer`. ([depends](https://www.fuget.org/packages/Propulsion.Kafka) on `Newtonsoft.Json >= 11.0.2`, `Propulsion`, `FsKafka`)

Expand Down
41 changes: 32 additions & 9 deletions samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ module Cosmos =
| [<AltCommandLine "-s">] Connection of string
| [<AltCommandLine "-d">] Database of string
| [<AltCommandLine "-c">] Container of string
| [<AltCommandLine "-s2">] Connection2 of string
| [<AltCommandLine "-d2">] Database2 of string
| [<AltCommandLine "-c2">] Container2 of string
interface IArgParserTemplate with
member a.Usage =
match a with
Expand All @@ -53,11 +56,20 @@ module Cosmos =
| Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)"
| Database _ -> "specify a database name for store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)"
| Container _ -> "specify a container name for store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)"
| Connection2 _ -> "specify a connection string for Secondary Cosmos account. Default: use same as Primary Connection"
| Database2 _ -> "specify a database name for Secondary store. Default: use same as Primary Database"
| Container2 _ -> "specify a container name for store. Default: use same as Primary Container"
type Info(args : ParseResults<Arguments>) =
member __.Mode = args.GetResult(ConnectionMode,Microsoft.Azure.Cosmos.ConnectionMode.Direct)
member __.Connection = args.TryGetResult Connection |> defaultWithEnvVar "EQUINOX_COSMOS_CONNECTION" "Connection"
member __.Database = args.TryGetResult Database |> defaultWithEnvVar "EQUINOX_COSMOS_DATABASE" "Database"
member __.Container = args.TryGetResult Container |> defaultWithEnvVar "EQUINOX_COSMOS_CONTAINER" "Container"
member private __.Connection2 = args.TryGetResult Connection2
member private __.Database2 = args.TryGetResult Database2 |> Option.defaultWith (fun () -> __.Database)
member private __.Container2 = args.TryGetResult Container2 |> Option.defaultWith (fun () -> __.Container)
member __.Secondary = if args.Contains Connection2 || args.Contains Database2 || args.Contains Container2
then Some (__.Connection2, __.Database2, __.Container2)
else None

member __.Timeout = args.GetResult(Timeout,5.) |> TimeSpan.FromSeconds
member __.Retries = args.GetResult(Retries,1)
Expand All @@ -70,17 +82,28 @@ module Cosmos =
open Equinox.CosmosStore
open Serilog

let logContainer (log: ILogger) name (mode, endpoint, db, container) =
log.Information("CosmosDb {name:l} {mode} {connection} Database {database} Container {container}", name, mode, endpoint, db, container)
let connect (a : Info) conn =
let discovery = Discovery.ConnectionString conn
CosmosStoreClientFactory(a.Timeout, a.Retries, a.MaxRetryWaitTime, mode=a.Mode).Create(discovery)
let conn (log: ILogger) (a : Info) =
let discovery = Discovery.ConnectionString a.Connection
let client = CosmosStoreClientFactory(a.Timeout, a.Retries, a.MaxRetryWaitTime, mode=a.Mode).Create(discovery)
log.Information("CosmosDb {mode} {connection} Database {database} Container {container}",
a.Mode, client.Endpoint, a.Database, a.Container)
log.Information("CosmosDb timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s",
(let t = a.Timeout in t.TotalSeconds), a.Retries, let x = a.MaxRetryWaitTime in x.TotalSeconds)
client, a.Database, a.Container
let (primaryClient, primaryDatabase, primaryContainer) as primary = connect a a.Connection, a.Database, a.Container
logContainer log "Primary" (a.Mode, primaryClient.Endpoint, primaryDatabase, primaryContainer)
let secondary =
match a.Secondary with
| Some (Some c2, db, container) -> Some (connect a c2, db, container)
| Some (None, db, container) -> Some (primaryClient, db, container)
| None -> None
secondary |> Option.iter (fun (client, db, container) -> logContainer log "Secondary" (a.Mode, client.Endpoint, db, container))
primary, secondary
let config (log: ILogger) (cache, unfolds, batchSize) info =
let client, databaseId, containerId = conn log info
let conn = CosmosStoreConnection(client, databaseId, containerId)
let conn =
match conn log info with
| (client, databaseId, containerId), None ->
CosmosStoreConnection(client, databaseId, containerId)
| (client, databaseId, containerId), Some (client2, db2, cont2) ->
CosmosStoreConnection(client, databaseId, containerId, client2=client2, databaseId2=db2, containerId2=cont2)
let ctx = CosmosStoreContext(conn, defaultMaxItems = batchSize)
let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching
StorageConfig.Cosmos (ctx, cacheStrategy, unfolds)
Expand Down
2 changes: 2 additions & 0 deletions samples/Tutorial/Tutorial.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
<ItemGroup>
<PackageReference Include="FSharp.Core" Version="4.3.4" />

<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.1.0" />

<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="4.0.0" />
</ItemGroup>
Expand Down
1 change: 1 addition & 0 deletions samples/Tutorial/Upload.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ module Events =
type Event =
| IdAssigned of IdAssigned
interface TypeShape.UnionContract.IUnionContract

let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module Fold =
Expand Down
2 changes: 2 additions & 0 deletions src/Equinox.Core/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ module Array =
elif predicate array.[i] then Some i
else loop (i - 1)
loop (array.Length - 1)
// let singleton v = Array.create 1 v

module Option =
let filter predicate option = match option with None -> None | Some x -> if predicate x then Some x else None
let toNullable option = match option with Some x -> Nullable x | None -> Nullable ()
let ofObj obj = match obj with null -> None | x -> Some x
let toObj option = match option with None -> null | Some x -> x
// let defaultWith f = function | Some v -> v | _ -> f()
#endif

type Async with
Expand Down
Loading

0 comments on commit 4ca11f3

Please # to comment.