diff --git a/package.json b/package.json index f36c6cd..859fc56 100644 --- a/package.json +++ b/package.json @@ -169,12 +169,12 @@ }, "dependencies": { "get-iterator": "^2.0.0", - "it-stream-types": "^1.0.3" + "it-stream-types": "^2.0.1" }, "devDependencies": { "aegir": "^38.1.7", "delay": "^5.0.0", - "it-drain": "^2.0.1", - "it-pipe": "^2.0.2" + "it-drain": "^3.0.1", + "it-pipe": "^3.0.1" } } diff --git a/src/index.ts b/src/index.ts index 404dfa2..5d2fb0e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -56,8 +56,10 @@ export interface Options { returnOnAbort?: boolean } -// Wrap an iterator to make it abortable, allow cleanup when aborted via onAbort -export function abortableSource (source: Source, signal: AbortSignal, options?: Options): AsyncGenerator, void, unknown> { +/** + * Wrap an iterator to make it abortable, allow cleanup when aborted via onAbort + */ +export function abortableSource (source: Source, signal: AbortSignal, options?: Options): AsyncGenerator { const opts: Options = options ?? {} const iterator = getIterator(source) @@ -137,11 +139,11 @@ export function abortableSource (source: Source, signal: AbortSignal, opt return abortable() } -export function abortableSink (sink: Sink, signal: AbortSignal, options?: Options): Sink { +export function abortableSink > (sink: Sink, R>, signal: AbortSignal, options?: Options): Sink, R> { return (source: Source) => sink(abortableSource(source, signal, options)) } -export function abortableDuplex > (duplex: Duplex, signal: AbortSignal, options?: Options): Duplex, TSink, RSink> { +export function abortableDuplex > (duplex: Duplex, AsyncIterable, RSink>, signal: AbortSignal, options?: Options): Duplex, AsyncIterable, RSink> { return { sink: abortableSink(duplex.sink, signal, { ...options, diff --git a/test/index.spec.ts b/test/index.spec.ts index 551f62b..dbec5b4 100644 --- a/test/index.spec.ts +++ b/test/index.spec.ts @@ -3,7 +3,7 @@ import { abortableDuplex, abortableSink, abortableSource, abortableTransform } f import drain from 'it-drain' import delay from 'delay' import { pipe } from 'it-pipe' -import type { Sink, Transform, Duplex } from 'it-stream-types' +import type { Sink, Transform, Duplex, Source } from 'it-stream-types' async function * forever (interval = 1): AsyncGenerator { // Never ends! @@ -142,7 +142,7 @@ describe('abortable-iterator', () => { it('should abort a sink', async () => { const controller = new AbortController() - const sink: Sink = async (source) => { + const sink: Sink, Promise> = async (source) => { await drain(source) } @@ -151,14 +151,14 @@ describe('abortable-iterator', () => { await expect(pipe( forever(), - abortableSink(sink, controller.signal) + async (source) => { await abortableSink(sink, controller.signal)(source) } )) .to.eventually.be.rejected.with.property('type', 'aborted') }) it('should abort a transform', async () => { const controller = new AbortController() - const transform: Transform = async function * (source) { + const transform: Transform, Source> = async function * (source) { yield * source } @@ -175,9 +175,9 @@ describe('abortable-iterator', () => { it('should abort a duplex used as a source', async () => { const controller = new AbortController() - const duplex: Duplex = { + const duplex: Duplex> = { source: forever(), - sink: drain + sink: async (source) => { await drain(source) } } // Abort after 10ms @@ -192,7 +192,7 @@ describe('abortable-iterator', () => { it('should abort a duplex used as a transform', async () => { const controller = new AbortController() - const duplex: Duplex = { + const duplex: Duplex> = { source: forever(), sink: drain } @@ -210,7 +210,7 @@ describe('abortable-iterator', () => { it('should abort a duplex used as a sink', async () => { const controller = new AbortController() - const duplex: Duplex = { + const duplex: Duplex> = { source: forever(), sink: drain }