Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add PubSub.withReplay api #3133

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/pubsub-replay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

feat(PubSub): implement PubSub.replay
8 changes: 8 additions & 0 deletions packages/effect/src/PubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ export const sliding: <A>(requestedCapacity: number) => Effect.Effect<PubSub<A>>
*/
export const unbounded: <A>() => Effect.Effect<PubSub<A>> = internal.unbounded

/**
* Creates a replay `PubSub`.
*
* @since 3.5.0
* @category constructors
*/
export const replay: <A>(bufferSize: number) => Effect.Effect<PubSub<A>> = internal.replay

/**
* Returns the number of elements the queue can hold.
*
Expand Down
217 changes: 217 additions & 0 deletions packages/effect/src/internal/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ export const unbounded = <A>(): Effect.Effect<PubSub.PubSub<A>> =>
core.flatMap((atomicPubSub) => makePubSub(atomicPubSub, new DroppingStrategy()))
)

export const replay = <A>(
bufferSize: number = Infinity
): Effect.Effect<PubSub.PubSub<A>> =>
pipe(
core.sync(() => makeReplayPubSub<A>(bufferSize)),
core.flatMap((atomicPubSub) => makePubSub(atomicPubSub, new DroppingStrategy()))
)

/** @internal */
export const capacity = <A>(self: PubSub.PubSub<A>): number => self.capacity()

Expand Down Expand Up @@ -154,6 +162,11 @@ const makeUnboundedPubSub = <A>(): AtomicPubSub<A> => {
return new UnboundedPubSub()
}

/** @internal */
const makeReplayPubSub = <A>(bufferSize: number = 0): AtomicPubSub<A> => {
return new ReplayPubSub(bufferSize)
}

/** @internal */
const makeSubscription = <A>(
pubsub: AtomicPubSub<A>,
Expand Down Expand Up @@ -819,6 +832,210 @@ class UnboundedPubSubSubscription<in out A> implements Subscription<A> {
}
}

/** @internal */
interface ReplayPubSubNode<out A> {
value: A | AbsentValue
subscribers: number
index: number
next: ReplayPubSubNode<A> | null
}

/** @internal */
class ReplayPubSub<in out A> implements AtomicPubSub<A> {
/**
* The oldest value that was unhandled by at least 1 subscriber
*/
head: ReplayPubSubNode<A> = {
value: AbsentValue,
subscribers: 0,
index: 0,
next: null
}
/**
* The most recently published value
*/
tail = this.head
/**
* The starting node for replaying values to new subscribers.
*/
replayStart = this.tail

constructor(readonly replayBufferSize: number = 0) {
}

readonly capacity = Number.MAX_SAFE_INTEGER

isEmpty(): boolean {
return this.head === this.tail
}

isFull(): boolean {
return false
}

size(): number {
return this.tail.index - this.head.index
}

publish(value: A): boolean {
const subscribers = this.tail.subscribers
if (subscribers === 0 && this.replayBufferSize === 0) {
return true
}
this.tail.next = {
value,
subscribers,
index: ++this.tail.index,
next: null
}
// Update replayStart if necessary
if (this.replayStart.index + this.replayBufferSize === this.tail.index) {
this.replayStart = this.replayStart.next!
}
this.tail = this.tail.next
this.cleanup()
return true
}

publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
for (const a of elements) {
this.publish(a)
}
return Chunk.empty()
}

slide(): void {
if (this.head !== this.tail) {
this.head = this.head.next!
if (this.head.index <= this.replayStart.index) {
this.head.value = AbsentValue
}
}
}

cleanup() {
while (
this.head.next &&
(this.head.subscribers === 0 || this.head.next.value === AbsentValue)
) {
if (this.head.index <= this.replayStart.index) {
this.head.value = AbsentValue
}
this.head = this.head.next
}
}

subscribe(): Subscription<A> {
let node: ReplayPubSubNode<A> | null = this.replayStart

// Increment subscriber count for all nodes from replayStart to tail
while (node !== null) {
node.subscribers += 1
node = node.next
}

return new ReplayPubSubSubscription(
this,
this.replayStart,
false
)
}
}

/** @internal */
class ReplayPubSubSubscription<in out A> implements Subscription<A> {
constructor(
private publisher: ReplayPubSub<A>,
/**
* The current value being handled by this subscription.
*/
private head: ReplayPubSubNode<A>,
private unsubscribed: boolean
) {
}

isEmpty(): boolean {
if (this.unsubscribed) {
return true
}
let empty = true
let loop = true
while (loop) {
if (this.head === this.publisher.tail) {
loop = false
} else {
if (this.head.next!.value === AbsentValue) {
this.head = this.head.next!
} else {
empty = false
loop = false
}
}
}
return empty
}

size() {
if (this.unsubscribed) {
return 0
}
return this.publisher.tail.index - Math.max(this.head.index, this.publisher.head.index)
}

poll<D>(default_: D): A | D {
if (this.unsubscribed) {
return default_
}
let loop = true
let polled: A | D = default_
while (loop) {
if (this.head === this.publisher.tail) {
loop = false
} else {
const next = this.head.next!
const elem = next.value
if (elem !== AbsentValue) {
polled = elem
this.head.subscribers -= 1
if (this.head.subscribers === 0) {
this.publisher.cleanup()
}
loop = false
}
this.head = next
}
}
return polled
}

pollUpTo(n: number): Chunk.Chunk<A> {
const builder: Array<A> = []
const default_ = AbsentValue
let i = 0
while (i !== n) {
const a = this.poll(default_ as unknown as A)
if (a === default_) {
i = n
} else {
builder.push(a)
i += 1
}
}
return Chunk.fromIterable(builder)
}

unsubscribe(): void {
if (!this.unsubscribed) {
this.unsubscribed = true
do {
this.head.subscribers -= 1
this.head = this.head?.next ?? this.publisher.tail
} while (this.head !== this.publisher.tail)
this.publisher.cleanup()
}
}
}

/** @internal */
class SubscriptionImpl<in out A> implements Queue.Dequeue<A> {
[queue.DequeueTypeId] = queue.dequeueVariance
Expand Down
19 changes: 19 additions & 0 deletions packages/effect/test/PubSub.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -645,4 +645,23 @@ describe("PubSub", () => {
)
)
})
it.effect("replay", () => {
const messages = [1, 2, 3, 4, 5]
return PubSub.replay<number>(3).pipe(
Effect.flatMap((pubsub) =>
Effect.scoped(
Effect.gen(function*() {
yield* PubSub.publishAll(pubsub, messages)

const dequeue1 = yield* PubSub.subscribe(pubsub)
const dequeue2 = yield* PubSub.subscribe(pubsub)
const takes1 = yield* Queue.takeAll(dequeue1)
const takes2 = yield* Queue.takeAll(dequeue2)
assert.deepStrictEqual([...takes1], [3, 4, 5])
assert.deepStrictEqual([...takes2], [3, 4, 5])
})
)
)
)
})
})