Skip to content

Commit

Permalink
add PubSub.withReplay api
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Jul 1, 2024
1 parent 35017e9 commit 24d4b44
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 13 deletions.
20 changes: 20 additions & 0 deletions .changeset/seven-ghosts-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
"effect": minor
---

add PubSub.withReplay api

This api adds a replay buffer in front of the given PubSub. The buffer will
replay the last `n` messages to any new subscriber.

```ts
Effect.gen(function*() {
const messages = [1, 2, 3, 4, 5]
const pubsub = yield* PubSub.unbounded<number>().pipe(
Effect.map(PubSub.withReplay(3))
)
yield* PubSub.publishAll(pubsub, messages)
const sub = yield* PubSub.subscribe(pubsub)
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub)), [3, 4, 5])
}))
```
12 changes: 12 additions & 0 deletions packages/effect/src/PubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,15 @@ export const publishAll: {
* @category utils
*/
export const subscribe: <A>(self: PubSub<A>) => Effect.Effect<Queue.Dequeue<A>, never, Scope.Scope> = internal.subscribe

/**
* Add a replay buffer to the `PubSub`. The replay buffer will replay the last
* `n` messages to new subscribers.
*
* @since 3.5.0
* @category utils
*/
export const withReplay: {
(n: number): <A>(self: PubSub<A>) => PubSub<A>
<A>(self: PubSub<A>, n: number): PubSub<A>
} = internal.withReplay
160 changes: 147 additions & 13 deletions packages/effect/src/internal/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ export const publishAll = dual<
export const subscribe = <A>(self: PubSub.PubSub<A>): Effect.Effect<Queue.Dequeue<A>, never, Scope.Scope> =>
self.subscribe

/** @internal */
export const withReplay = dual<
(n: number) => <A>(self: PubSub.PubSub<A>) => PubSub.PubSub<A>,
<A>(self: PubSub.PubSub<A>, n: number) => PubSub.PubSub<A>
>(2, <A>(self: PubSub.PubSub<A>, n: number) => new ReplayPubSubImpl(self as PubSubImpl<A>, n))

/** @internal */
const makeBoundedPubSub = <A>(requestedCapacity: number): AtomicPubSub<A> => {
ensureCapacity(requestedCapacity)
Expand All @@ -158,7 +164,8 @@ const makeUnboundedPubSub = <A>(): AtomicPubSub<A> => {
const makeSubscription = <A>(
pubsub: AtomicPubSub<A>,
subscribers: Subscribers<A>,
strategy: PubSubStrategy<A>
strategy: PubSubStrategy<A>,
replayBuffer: Chunk.Chunk<A>
): Effect.Effect<Queue.Dequeue<A>> =>
core.map(core.deferredMake<void>(), (deferred) =>
unsafeMakeSubscription(
Expand All @@ -168,7 +175,8 @@ const makeSubscription = <A>(
MutableQueue.unbounded<Deferred.Deferred<A>>(),
deferred,
MutableRef.make(false),
strategy
strategy,
replayBuffer
))

/** @internal */
Expand All @@ -179,18 +187,19 @@ export const unsafeMakeSubscription = <A>(
pollers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,
shutdownHook: Deferred.Deferred<void>,
shutdownFlag: MutableRef.MutableRef<boolean>,
strategy: PubSubStrategy<A>
): Queue.Dequeue<A> => {
return new SubscriptionImpl(
strategy: PubSubStrategy<A>,
replayBuffer: Chunk.Chunk<A>
): Queue.Dequeue<A> =>
new SubscriptionImpl(
pubsub,
subscribers,
subscription,
pollers,
shutdownHook,
shutdownFlag,
strategy
strategy,
replayBuffer
)
}

/** @internal */
class BoundedPubSubArb<in out A> implements AtomicPubSub<A> {
Expand Down Expand Up @@ -830,7 +839,8 @@ class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
readonly pollers: MutableQueue.MutableQueue<Deferred.Deferred<A>>,
readonly shutdownHook: Deferred.Deferred<void>,
readonly shutdownFlag: MutableRef.MutableRef<boolean>,
readonly strategy: PubSubStrategy<A>
readonly strategy: PubSubStrategy<A>,
public replayBuffer: Chunk.Chunk<A>
) {
}

Expand All @@ -850,19 +860,23 @@ class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
return core.suspend(() =>
MutableRef.get(this.shutdownFlag)
? core.interrupt
: core.succeed(this.subscription.size())
: core.succeed(this.subscription.size() + this.replayBuffer.length)
)
}

unsafeSize(): Option.Option<number> {
if (MutableRef.get(this.shutdownFlag)) {
return Option.none()
}
return Option.some(this.subscription.size())
return Option.some(this.subscription.size() + this.replayBuffer.length)
}

get isFull(): Effect.Effect<boolean> {
return core.map(this.size, (size) => size === this.capacity())
return core.suspend(() =>
MutableRef.get(this.shutdownFlag)
? core.interrupt
: core.succeed(this.subscription.size() === this.capacity())
)
}

get isEmpty(): Effect.Effect<boolean> {
Expand Down Expand Up @@ -904,6 +918,11 @@ class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
if (MutableRef.get(this.shutdownFlag)) {
return core.interrupt
}
if (Chunk.isNonEmpty(this.replayBuffer)) {
const message = Chunk.headNonEmpty(this.replayBuffer)
this.replayBuffer = Chunk.drop(this.replayBuffer, 1)
return core.succeed(message)
}
const message = MutableQueue.isEmpty(this.pollers)
? this.subscription.poll(MutableQueue.EmptyMutableQueue)
: MutableQueue.EmptyMutableQueue
Expand Down Expand Up @@ -939,6 +958,11 @@ class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
? unsafePollAllSubscription(this.subscription)
: Chunk.empty()
this.strategy.unsafeOnPubSubEmptySpace(this.pubsub, this.subscribers)
if (Chunk.isNonEmpty(this.replayBuffer)) {
const replay = this.replayBuffer
this.replayBuffer = Chunk.empty()
return core.succeed(Chunk.appendAll(replay, as))
}
return core.succeed(as)
})
}
Expand All @@ -948,11 +972,22 @@ class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
if (MutableRef.get(this.shutdownFlag)) {
return core.interrupt
}
const replayLen = this.replayBuffer.length
let replay: Chunk.Chunk<A> | undefined = undefined
if (replayLen >= max) {
const as = Chunk.take(this.replayBuffer, max)
this.replayBuffer = Chunk.drop(this.replayBuffer, max)
return core.succeed(as)
} else if (replayLen > 0) {
replay = this.replayBuffer
max = max - replayLen
this.replayBuffer = Chunk.empty()
}
const as = MutableQueue.isEmpty(this.pollers)
? unsafePollN(this.subscription, max)
: Chunk.empty()
this.strategy.unsafeOnPubSubEmptySpace(this.pubsub, this.subscribers)
return core.succeed(as)
return replay ? core.succeed(Chunk.appendAll(replay, as)) : core.succeed(as)
})
}

Expand Down Expand Up @@ -1118,7 +1153,7 @@ class PubSubImpl<in out A> implements PubSub.PubSub<A> {
const acquire = core.tap(
fiberRuntime.all([
this.scope.fork(executionStrategy.sequential),
makeSubscription(this.pubsub, this.subscribers, this.strategy)
makeSubscription(this.pubsub, this.subscribers, this.strategy, Chunk.empty())
]),
(tuple) => tuple[0].addFinalizer(() => tuple[1].shutdown)
)
Expand All @@ -1141,6 +1176,105 @@ class PubSubImpl<in out A> implements PubSub.PubSub<A> {
}
}

/** @internal */
class ReplayPubSubImpl<in out A> implements PubSub.PubSub<A> {
readonly [queue.EnqueueTypeId] = queue.enqueueVariance
readonly [queue.DequeueTypeId] = queue.dequeueVariance

constructor(
readonly backing: PubSubImpl<A>,
readonly replayBufferSize: number
) {}

replayBuffer = Chunk.empty<A>()
offerReplay(value: A): void {
this.replayBuffer = Chunk.append(this.replayBuffer, value)
if (this.replayBuffer.length > this.replayBufferSize) {
this.replayBuffer = Chunk.drop(this.replayBuffer, 1)
}
}
offerAllReplay(value: Chunk.Chunk<A>): void {
this.replayBuffer = Chunk.appendAll(this.replayBuffer, value)
if (this.replayBuffer.length > this.replayBufferSize) {
this.replayBuffer = Chunk.drop(this.replayBuffer, this.replayBuffer.length - this.replayBufferSize)
}
}

capacity(): number {
return this.backing.capacity()
}
get size(): Effect.Effect<number> {
return this.backing.size
}
unsafeSize(): Option.Option<number> {
return this.backing.unsafeSize()
}
get isFull(): Effect.Effect<boolean> {
return this.backing.isFull
}
get isEmpty(): Effect.Effect<boolean> {
return this.backing.isEmpty
}
get awaitShutdown(): Effect.Effect<void> {
return this.backing.awaitShutdown
}
get isShutdown(): Effect.Effect<boolean> {
return this.backing.isShutdown
}
get shutdown(): Effect.Effect<void> {
return this.backing.shutdown
}
publish(value: A): Effect.Effect<boolean> {
return core.suspend(() => {
this.offerReplay(value)
return this.backing.publish(value)
})
}
isActive(): boolean {
return this.backing.isActive()
}
unsafeOffer(value: A): boolean {
this.offerReplay(value)
return this.backing.unsafeOffer(value)
}
publishAll(elements: Iterable<A>): Effect.Effect<boolean> {
return core.suspend(() => {
this.offerAllReplay(Chunk.fromIterable(elements))
return this.backing.publishAll(elements)
})
}
offer(value: A): Effect.Effect<boolean> {
return core.suspend(() => {
this.offerReplay(value)
return this.backing.offer(value)
})
}
offerAll(elements: Iterable<A>): Effect.Effect<boolean> {
return core.suspend(() => {
this.offerAllReplay(Chunk.fromIterable(elements))
return this.backing.offerAll(elements)
})
}
pipe() {
return pipeArguments(this, arguments)
}

get subscribe(): Effect.Effect<Queue.Dequeue<A>, never, Scope.Scope> {
const acquire = this.backing.scope.fork(executionStrategy.sequential).pipe(
core.zip(
core.suspend(() =>
makeSubscription(this.backing.pubsub, this.backing.subscribers, this.backing.strategy, this.replayBuffer)
)
),
core.tap(([scope, sub]) => scope.addFinalizer(() => sub.shutdown))
)
return core.map(
fiberRuntime.acquireRelease(acquire, ([scope], exit) => scope.close(exit)),
([, sub]) => sub
)
}
}

/** @internal */
export const makePubSub = <A>(
pubsub: AtomicPubSub<A>,
Expand Down
34 changes: 34 additions & 0 deletions packages/effect/test/PubSub.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Chunk } from "effect"
import * as Array from "effect/Array"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
Expand Down Expand Up @@ -645,4 +646,37 @@ describe("PubSub", () => {
)
)
})
it.scoped("withReplay", () =>
Effect.gen(function*() {
const messages = [1, 2, 3, 4, 5]
const pubsub = yield* PubSub.unbounded<number>().pipe(
Effect.map(PubSub.withReplay(3))
)
yield* PubSub.publishAll(pubsub, messages)
const sub = yield* PubSub.subscribe(pubsub)
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub)), [3, 4, 5])
}))
it.effect("withReplay takeUpTo", () => {
const messages = [1, 2, 3, 4, 5]
return PubSub.unbounded<number>().pipe(
Effect.map(PubSub.withReplay(3)),
Effect.flatMap((pubsub) =>
Effect.scoped(
Effect.gen(function*() {
yield* PubSub.publishAll(pubsub, messages)

const dequeue1 = yield* PubSub.subscribe(pubsub)
yield* PubSub.publish(pubsub, 6)
const dequeue2 = yield* PubSub.subscribe(pubsub)

assert.strictEqual(yield* Queue.size(dequeue1), 4)
assert.strictEqual(yield* Queue.size(dequeue2), 3)
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeUpTo(dequeue1, 2)), [3, 4])
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeUpTo(dequeue1, 2)), [5, 6])
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeUpTo(dequeue2, 3)), [4, 5, 6])
})
)
)
)
})
})

0 comments on commit 24d4b44

Please sign in to comment.