Skip to content

Latest commit

 

History

History
146 lines (128 loc) · 5.01 KB

13. Concurrent Structures: Semaphore - Work Limiting.md

File metadata and controls

146 lines (128 loc) · 5.01 KB

13. Concurrent Structures: Semaphore - Work Limiting

13.1 Semaphore Interface

// Effect.ts
/**
 * A counting semaphore: a fixed pool of permits used to bound how much work
 * may run at the same time.
 */
interface Semaphore {
  /**
   * Wraps an effect so that it runs only once `permits` permits are available;
   * the permits are released again when the wrapped effect finishes.
   */
  withPermits(permits: number): <A, E, R>(self: Effect<A, E, R>) => Effect<A, E, R>
  /**
   * Like `withPermits`, but runs the effect only if the permits are available;
   * the result is wrapped in an `Option`.
   * NOTE(review): presumably `None` means the permits were unavailable and the effect was skipped - confirm.
   */
  withPermitsIfAvailable(permits: number): <A, E, R>(self: Effect<A, E, R>) => Effect<Option<A>, E, R>
  /**
   * Takes `permits` permits, suspending until that many are available.
   * NOTE(review): the meaning of the returned number is not shown here - confirm against the Effect API docs.
   */
  take(permits: number): Effect<number>
  /** Releases `permits` permits and returns the resulting number of available permits. */
  release(permits: number): Effect<number>
  /** Releases all taken permits and returns the resulting number of available permits. */
  releaseAll: Effect<number>
}

13.2 Using Semaphores to Limit Parallelism

// https://effect.website/play#2b313f26dc3e
/**
 * Simulates a database query that takes one second, logging when it starts
 * and when it finishes.
 *
 * The shared counter in `connections` is only ever incremented here, so the
 * logged number is the running total of queries started so far, not the
 * number of queries currently in flight. The "Closing" message prints the
 * value captured at start minus one; it does not re-read the counter.
 *
 * @param connections shared counter bumped once per query
 */
const queryDatabase = (connections: Ref.Ref<number>): Effect<void> =>
  Effect.gen(function*() {
    // Atomically bump the counter and remember the value this query observed.
    const opened = yield* Ref.updateAndGet(connections, (count) => count + 1)
    yield* Effect.log(`Aquiring, now ${opened} connections`)
    // Stand-in for the actual work.
    yield* Effect.sleep("1 second")
    yield* Effect.log(`Closing, now ${opened - 1} connections`)
  })

/**
 * Starts ten queries with unbounded concurrency, but guards each one with a
 * single permit from a four-permit semaphore, so at most four queries run
 * at any moment.
 */
const program = Effect.gen(function*() {
  const connections = yield* Ref.make(0)
  const limiter = yield* Effect.makeSemaphore(4)
  // Every query must hold one permit for its whole duration.
  const holdingOnePermit = limiter.withPermits(1)
  const guardedQuery = holdingOnePermit(queryDatabase(connections))
  yield* Effect.forEach(
    Array.range(1, 10),
    (_) => guardedQuery,
    { concurrency: "unbounded" }
  )
})
// [13:10:01.314] INFO (#9): Aquiring, now 1 connections
// [13:10:01.316] INFO (#10): Aquiring, now 2 connections
// [13:10:01.316] INFO (#11): Aquiring, now 3 connections
// [13:10:01.317] INFO (#12): Aquiring, now 4 connections
// [13:10:02.325] INFO (#9): Closing, now 0 connections
// [13:10:02.327] INFO (#13): Aquiring, now 5 connections
// [13:10:02.327] INFO (#10): Closing, now 1 connections
// [13:10:02.328] INFO (#14): Aquiring, now 6 connections
// [13:10:02.328] INFO (#11): Closing, now 2 connections
// [13:10:02.329] INFO (#15): Aquiring, now 7 connections
// [13:10:02.330] INFO (#12): Closing, now 3 connections
// [13:10:02.330] INFO (#16): Aquiring, now 8 connections
// [13:10:03.334] INFO (#13): Closing, now 4 connections
// [13:10:03.335] INFO (#17): Aquiring, now 9 connections
// [13:10:03.335] INFO (#14): Closing, now 5 connections
// [13:10:03.336] INFO (#18): Aquiring, now 10 connections
// [13:10:03.341] INFO (#15): Closing, now 6 connections
// [13:10:03.342] INFO (#16): Closing, now 7 connections
// [13:10:04.345] INFO (#17): Closing, now 8 connections
// [13:10:04.346] INFO (#18): Closing, now 9 connections

13.3 Using Semaphore to Implement Operators

// https://effect.website/play#7d9e91f586da
/**
 * Applies `f` to every element of `as` in parallel, with at most `n`
 * applications running at the same time.
 *
 * All elements are forked with unbounded concurrency; the limit comes from a
 * semaphore with `n` permits, of which each application holds one while it runs.
 *
 * @param n maximum number of `f` effects allowed to run concurrently
 * @param as the input elements
 * @param f effectful function applied to each element
 * @returns an effect producing the collected results of `f`
 */
const forEachParN = <A, B, E, R>(n: number, as: Iterable<A>, f: (a: A) => Effect<B, E, R>) =>
  Effect.gen(function*() {
    const limiter = yield* Effect.makeSemaphore(n)
    const results = yield* Effect.forEach(
      as,
      (item) => limiter.withPermits(1)(f(item)),
      { concurrency: "unbounded" }
    )
    return results
  })

// In Effect, however, this is already built in: `Effect.forEach` accepts a
// `concurrency` option, so a hand-written `forEachParN` is not needed.
Effect.forEach(Array.range(1, 10), (_) => query, {
    concurrency: 4 // <-- plays the role of the `n` argument of `forEachParN`
  })

13.4 Making a Data Structure Safe for Concurrent Access

// https://effect.website/play#25b8be85d5fd
/** A mutable cell holding a value of type `A` that can be modified effectfully. */
interface Synchronized<A> {
  /**
   * Runs `f` on the current value. `f` yields a tuple `[result, next]`:
   * `next` becomes the new stored value and `result` is returned to the caller.
   */
  modify: <B, E, R>(f: (a: A) => Effect<[B, A], E, R>) => Effect<B, E, R>
}

/**
 * Builds a `Synchronized<A>` backed by a plain `Ref`, without any locking.
 *
 * `modify` reads the ref, runs `f`, then writes the ref as three separate
 * steps, so nothing here stops two concurrent `modify` calls from
 * interleaving between the read and the write.
 *
 * @param a the initial value
 */
const make = <A>(a: A) =>
  Effect.gen(function*() {
    const state = yield* Ref.make(a)

    const cell: Synchronized<A> = {
      modify: <B, E, R>(f: (a: A) => Effect<[B, A], E, R>) =>
        Effect.gen(function*() {
          const current = yield* Ref.get(state)
          const [result, next] = yield* f(current)
          yield* Ref.set(state, next)
          return result
        })
    }

    return cell
  })
// https://effect.website/play#f2746a38240c
/** A mutable cell holding a value of type `A` that can be modified effectfully. */
interface Synchronized<A> {
  /**
   * Runs `f` on the current value. `f` yields a tuple `[result, next]`:
   * `next` becomes the new stored value and `result` is returned to the caller.
   */
  modify: <B, E, R>(f: (a: A) => Effect<[B, A], E, R>) => Effect<B, E, R>
}

/**
 * Builds a `Synchronized<A>` whose `modify` is guarded by a one-permit
 * semaphore, so the read / run `f` / write sequence of one call completes
 * before the next call may start.
 *
 * @param a the initial value
 */
const make = Effect.fn(function*<A>(a: A) {
  const state = yield* Ref.make(a)
  const mutex = yield* Effect.makeSemaphore(1)
  // Holding the only permit makes the wrapped effect mutually exclusive.
  const exclusively = mutex.withPermits(1)

  const synchronized: Synchronized<A> = {
    modify: Effect.fn(function*<B, E, R>(f: (a: A) => Effect<[B, A], E, R>) {
      const current = yield* Ref.get(state)
      const [result, next] = yield* f(current)

      yield* Ref.set(state, next)
      // but
      // const _temp = Ref.set(ref, v[1])
      // yield* _temp

      return result
    }, exclusively)
  }

  return synchronized
})

// same as
// https://effect.website/play#bbd01d01a409

/**
 * Builds a `Synchronized<A>` using plain `Effect.gen`: the state lives in a
 * `Ref`, and every `modify` runs while holding the single permit of a
 * one-permit semaphore, so calls cannot interleave.
 *
 * @param a the initial value
 */
const make = <A>(a: A) =>
  Effect.gen(function*() {
    const state = yield* Ref.make(a)
    const mutex = yield* Effect.makeSemaphore(1)
    // Holding the only permit makes the wrapped effect mutually exclusive.
    const exclusively = mutex.withPermits(1)

    const synchronized: Synchronized<A> = {
      modify: <B, E, R>(f: (a: A) => Effect<[B, A], E, R>) =>
        exclusively(
          Effect.gen(function*() {
            const current = yield* Ref.get(state)
            const [result, next] = yield* f(current)
            yield* Ref.set(state, next)
            return result
          })
        )
    }

    return synchronized
  })