Skip to content

Latest commit

 

History

History
235 lines (213 loc) · 6.4 KB

20. Software Transactional Memory: STM Data Structures.md

File metadata and controls

235 lines (213 loc) · 6.4 KB

20. Software Transactional Memory: STM Data Structures

20.1.1 TArray

// TArray.ts
/**
 * Transactional array of `A`. Its `chunk` field exposes the backing storage
 * as a plain array of `TRef`s.
 * NOTE(review): presumably one `TRef` per element — confirm against TArray.ts.
 */
interface TArray<in out A> extends TArray.Variance<A> {
    readonly chunk: Array<TRef.TRef<A>>
}
// https://effect.website/play/#02b649955010

/**
 * Builds an STM value that exchanges the elements at positions `i` and `j`.
 *
 * Both reads and both writes are composed into one `STM`, so they run as a
 * single transaction when the result is committed.
 *
 * @param array - The transactional array to modify.
 * @param i - Index of the first element.
 * @param j - Index of the second element.
 */
const swap = <A>(array: TArray.TArray<A>, i: number, j: number) =>
  STM.gen(function*() {
    // Capture both values before writing, otherwise the first write would
    // overwrite a value that is still needed.
    const atI = yield* TArray.get(array, i)
    const atJ = yield* TArray.get(array, j)
    yield* TArray.update(array, i, () => atJ)
    yield* TArray.update(array, j, () => atI)
  })
// https://effect.website/play/#2665d5e90ff2

/**
 * Creates a `TArray` holding 1, 2, 3, adds one to every element via
 * `TArray.transform`, and reads the contents back as a plain array. The whole
 * `STM` is passed to `STM.commit`.
 */
const transaction = STM.commit(
  STM.gen(function*() {
    const numbers = yield* TArray.make(1, 2, 3)
    yield* TArray.transform(numbers, (n) => n + 1)
    const snapshot = yield* TArray.toArray(numbers)
    return snapshot
  })
)

20.1.2 TMap

// TMap.ts
/**
 * Transactional map from keys `K` to values `V`. The interface body is empty
 * here; any members come from `TMap.Variance`.
 */
interface TMap<in out K, in out V> extends TMap.Variance<K, V> {}

/** Signature only: takes a map and a key and produces an `STM` with no result value. */
declare const remove: <K, V>(self: TMap<K, V>, key: K) => STM.STM<void>
/** Signature only: takes a map and a key and produces an `STM` of `Option<V>`. */
declare const get: <K, V>(self: TMap<K, V>, key: K) => STM.STM<Option<V>>
/** Signature only: takes a map, a key and a value and produces an `STM` with no result value. */
declare const set: <K, V>(self: TMap<K, V>, key: K, value: V) => STM.STM<void>
// https://effect.website/play/#383f36d9c0dc
/**
 * Looks up `k` in `map`. If a value is present it is returned as-is; if not,
 * `v` is stored under `k` and `v` itself is returned. The lookup and the
 * conditional write are part of the same `STM`.
 *
 * @param map - The transactional map to read and possibly update.
 * @param k - The key to look up.
 * @param v - The value to store and return when `k` has no entry.
 * @returns An `STM` producing the existing value or, failing that, `v`.
 */
const getOrElseUpdate = <K, V>(map: TMap.TMap<K, V>, k: K, v: V) =>
  TMap.get(map, k).pipe(
    STM.flatMap(Option.match({
      onSome: STM.succeed,
      // `v` is already a plain value, so there is no need to lift it into
      // `STM` first: write it, then replace the `void` result with `v`.
      onNone: () => TMap.set(map, k, v).pipe(STM.as(v))
    }))
  )

20.1.3 TPriorityQueue

// https://effect.website/play/#2f72f53cd4e3
/**
 * A scheduled event: an `action` paired with the `time` it belongs to.
 */
class Event extends Data.Class<{
  time: number
  action: Effect<void>
}> {
  /** Compares two events by their `time` field using `Order.number`. */
  static TimeOrder = Order.mapInput(
    Order.number,
    (event: Event) => event.time
  )
}

// Builds an empty priority queue of `Event`s ordered by `Event.TimeOrder`.
STM.gen(function*() {
  const eventQueue = yield* TPriorityQueue.empty(Event.TimeOrder)
  //    ^? TPriorityQueue<Event>
  return eventQueue
})

20.1.4 TPromise (TDeferred in Effect)

// TDeferred.ts
/**
 * Transactional deferred value with success type `A` and error type `E`.
 * Its state lives in a single `TRef` holding an `Option` of an `Either`.
 * NOTE(review): presumably `None` means "not completed yet" — confirm
 * against TDeferred.ts.
 */
interface TDeferred<in out A, in out E> {
  readonly ref: TRef.TRef<Option<Either<A, E>>>
}

20.1.5 TQueue

// TQueue.ts
/**
 * A transactional queue that combines the write side (`TEnqueue`) and the
 * read side (`TDequeue`); it declares no members of its own.
 */
declare interface TQueue<in out A> extends TEnqueue<A>, TDequeue<A> {}

/**
 * The base interface that all `TQueue`s must implement. It carries the
 * members shared by the read side (`TDequeue`) and the write side
 * (`TEnqueue`): capacity/size inspection and shutdown handling.
 */
interface BaseTQueue {
  /**
   * Returns the number of elements the queue can hold.
   */
  capacity(): number

  /**
   * Retrieves the size of the queue, which is equal to the number of elements
   * in the queue. This may be negative if fibers are suspended waiting for
   * elements to be added to the queue.
   */
  readonly size: STM.STM<number>

  /**
   * Returns `true` if the `TQueue` has reached its capacity (it holds as many
   * elements as `capacity()` allows), `false` otherwise.
   */
  readonly isFull: STM.STM<boolean>

  /**
   * Returns `true` if the `TQueue` contains zero elements, `false` otherwise.
   */
  readonly isEmpty: STM.STM<boolean>

  /**
   * Interrupts any fibers that are suspended on `offer` or `take`. Future calls
   * to `offer*` and `take*` will be interrupted immediately.
   */
  readonly shutdown: STM.STM<void>

  /**
   * Returns `true` if `shutdown` has been called, otherwise returns `false`.
   */
  readonly isShutdown: STM.STM<boolean>

  /**
   * Waits until the queue is shutdown. The `STM` returned by this method will
   * not resume until the queue has been shutdown. If the queue is already
   * shutdown, the `STM` will resume right away.
   */
  readonly awaitShutdown: STM.STM<void>
}

/**
 * The read side of a `TQueue`: operations for inspecting and removing
 * elements, in addition to everything in `BaseTQueue`.
 */
interface TDequeue<out A> extends TQueue.TDequeueVariance<A>, BaseTQueue {
  /**
   * Views the next element in the queue without removing it, retrying if the
   * queue is empty.
   */
  readonly peek: STM.STM<A>

  /**
   * Views the next element in the queue without removing it, returning `None`
   * if the queue is empty.
   */
  readonly peekOption: STM.STM<Option<A>>

  /**
   * Takes the oldest value in the queue. If the queue is empty, this will return
   * a computation that resumes when an item has been added to the queue.
   */
  readonly take: STM.STM<A>

  /**
   * Takes all the values in the queue and returns them. If the queue is
   * empty, returns an empty collection.
   */
  readonly takeAll: STM.STM<Array<A>>

  /**
   * Takes up to `max` values from the queue.
   */
  takeUpTo(max: number): STM.STM<Array<A>>
}

/**
 * The write side of a `TQueue`: operations for adding elements, in addition
 * to everything in `BaseTQueue`.
 */
interface TEnqueue<in A> extends TQueue.TEnqueueVariance<A>, BaseTQueue {
  /**
   * Places one value in the queue.
   */
  offer(value: A): STM.STM<boolean>

  /**
   * For a Bounded TQueue: uses the `BackPressure` strategy. Places the values
   * in the queue and always returns true. If the queue has reached capacity,
   * the fiber performing the `offerAll` will be suspended until there is room
   * in the queue.
   *
   * For an Unbounded TQueue: places all values in the queue and returns true.
   *
   * For a Sliding TQueue: uses the `Sliding` strategy. If there is room in the
   * queue, it places the values; otherwise it removes the old elements and
   * enqueues the new ones. Always returns true.
   *
   * For a Dropping TQueue: uses the `Dropping` strategy. It places the values
   * in the queue, but if there is no room it will not enqueue them and
   * returns false.
   */
  offerAll(iterable: Iterable<A>): STM.STM<boolean>
}

20.1.6 TReentrantLock

// TReentrantLock.ts
/**
 * Transactional read/write lock. The interface is shown without members; it
 * is operated on through the free functions declared below.
 */
interface TReentrantLock {}
// Each function takes the lock and produces an `STM` of `number`.
// NOTE(review): the number is presumably the count of locks the calling fiber
// holds after the operation — confirm against the Effect API docs.
/** Signature only: acquires the lock for reading. */
declare const acquireRead: (self: TReentrantLock) => STM.STM<number>
/** Signature only: acquires the lock for writing. */
declare const acquireWrite: (self: TReentrantLock) => STM.STM<number>
/** Signature only: releases a previously acquired read lock. */
declare const releaseRead: (self: TReentrantLock) => STM.STM<number>
/** Signature only: releases a previously acquired write lock. */
declare const releaseWrite: (self: TReentrantLock) => STM.STM<number>
// https://effect.website/play/#f6563033c9d3

// Increments a transactional counter while holding the write lock, then
// reads the counter back.
STM.gen(function*() {
  const rwLock = yield* TReentrantLock.make
  const counter = yield* TRef.make(0)
  // The write lock is held only around the mutation itself.
  yield* TReentrantLock.acquireWrite(rwLock)
  yield* TRef.update(counter, (n) => n + 1)
  yield* TReentrantLock.releaseWrite(rwLock)
  const current = yield* TRef.get(counter)
  return current
})
// https://effect.website/play/#4a556d79c9c2
// Plain mutable state shared by all deposits below; it is not transactional,
// so the deposits take the write lock before touching it.
let bankAccount = 0

/**
 * Runs three deposits (100, 50 and 25) concurrently, each one taking the
 * write lock of a shared `TReentrantLock` before adding to `bankAccount`,
 * and returns the balance once all of them have finished.
 */
const program = Effect.gen(function*() {
  const lock = yield* TReentrantLock.make

  // One deposit: take the write lock as a scoped resource, add `amount`,
  // then close the scope with `Effect.scoped`.
  // NOTE(review): closing the scope presumably releases the lock — confirm
  // against the `TReentrantLock.writeLock` documentation.
  const deposit = (amount: number) =>
    TReentrantLock.writeLock(lock).pipe(
      Effect.flatMap(() => Effect.sync(() => bankAccount += amount)),
      Effect.scoped
    )

  yield* Effect.all([deposit(100), deposit(50), deposit(25)], {
    concurrency: "unbounded",
    discard: true
  })

  return bankAccount
})
// 175