diff --git a/README.md b/README.md index b2b7e2a..e922708 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,61 @@ Supports ESM and CommonJS modules. [![Coverage Status][codecov-image]][codecov-url] [![Maintainability][codeclimate-image]][codeclimate-url] +## Installation + +Npm installation + +```bash +npm install asygen +``` + +Yarn installation + +```bash +yarn add asygen +``` + +## Utilities + +#### Deferred + +- Represents a deferred operation. +- Provides methods resolve and reject to control the wrapped promise. +- Exposes properties promise and status to get the underlying promise and its current status. + +#### Queue + +- A queue system for handling asynchronous tasks. +- Offers methods `push`, `pull`, and `done` to manage tasks. + +#### Generatorify + +- Convert a task into an asynchronous iterable. +- The iterable can be used in `for await...of` loops to process values as they're produced. + +#### Combine + +- Combine multiple asynchronous iterables into a single iterable. +- The resulting iterable will yield values from all input iterables and complete when all of them are done. + ## Usage -#### Create deferred token +#### Create deferred operation + +```typescript +import { defer, Status } from 'asygen'; + +const deferred = defer(); +console.log(deferred.status); // Status.PENDING + +deferred.resolve(42); +deferred.promise.then((value) => { + console.log(value); // 42 + console.log(deferred.status); // Status.RESOLVED +}); +``` + +#### Create a deferred operation from events ```typescript import { defer } from 'asygen'; @@ -28,7 +80,43 @@ await result.promise; console.log(result.status); // resolved or rejected ``` +#### Task queue + +```typescript +import { createQueue } from 'asygen'; + +const queue = createQueue(); + +queue.push(1); +queue.push(2); +queue.push(3); + +queue.pull().promise.then((value) => console.log(value)); // 1 +queue.pull().promise.then((value) => console.log(value)); // 2 +``` + +#### Generatorify + +```typescript +import { generatorify } from 'asygen'; + +const task = async (callback) => { + await callback('Hello'); + await callback('World'); + return 'Done!'; +}; + +const iterable = generatorify(task); + +(async () => { + for await (const value of iterable) { + console.log(value); // "Hello", then "World" + } +})(); +``` + #### Convert events to asyncGenerator + ```typescript import { once } from 'node:events'; import { generatorify, Task } from 'asygen'; @@ -44,11 +132,40 @@ for await (const data of generatorify(task)) { } ``` +#### Combine tasks + +```typescript +import { generatorify, combine } from 'asygen'; + +const task1 = async (callback) => { + await callback('Task1 - Hello'); + await callback('Task1 - World'); +}; + +const task2 = async (callback) => { + await callback('Task2 - Foo'); + await callback('Task2 - Bar'); +}; + +const iterable1 = generatorify(task1); +const iterable2 = generatorify(task2); + +const combined = combine(iterable1, iterable2); + +(async () => { + for await (const value of combined) { + console.log(value); // Logs values from both task1 and task2 + } +})(); +``` + #### Combine generators + ```typescript import { combine } from 'asygen'; -const sleep = (timeout: number) => new Promise(resolve => setTimeout(resolve, timeout)); +const sleep = (timeout: number) => + new Promise((resolve) => setTimeout(resolve, timeout)); async function* generate(timeout: number, count: number) { for (let index = 0; index < count; index++) { @@ -70,7 +187,6 @@ for await (const data of combine(generate(100, 5), generate(500, 2))) { License [Apache-2.0](http://www.apache.org/licenses/LICENSE-2.0) Copyright (c) 2023-present Ivan Zakharchanka - [npm-url]: https://www.npmjs.com/package/asygen [downloads-image]: https://img.shields.io/npm/dw/asygen.svg?maxAge=43200 [npm-image]: https://img.shields.io/npm/v/asygen.svg?maxAge=43200 diff --git a/package.json b/package.json index deb6faf..bb64d0f 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,7 @@ "build", "src/combine.ts", "src/defer.ts", - "src/deferredPoll.ts", + "src/queue.ts", "src/generatorify.ts", "src/index.ts" ], @@ -42,6 +42,7 @@ "defer", "deferred", "generator", + "generatorify", "asyncgenerator", "iterate", "promise", @@ -49,7 +50,9 @@ "await", "yield", "flow", - "control" + "control", + "queue", + "task" ], "license": "Apache-2.0", "bugs": { diff --git a/src/__tests__/generatorify.ts b/src/__tests__/generatorify.ts index 73a2a22..b6d69bd 100644 --- a/src/__tests__/generatorify.ts +++ b/src/__tests__/generatorify.ts @@ -2,22 +2,23 @@ import { generatorify, Task } from '../index'; describe('Generatorify test suite', () => { it('should iterate an array syncronously', async () => { - const array = [0,1,2,3,4]; + const array = [0, 1, 2, 3, 4]; const handler = jest.fn(); for await (const value of generatorify(array.forEach.bind(array))) { handler(value); } expect(handler).toBeCalledTimes(array.length); - for(const value of array) { + for (const value of array) { expect(handler).toBeCalledWith(value); } }); + it('should iterate an array asyncronously', async () => { - const array = [0,1,2,3,4]; + const array = [0, 1, 2, 3, 4]; const handler = jest.fn(); const task: Task = async (send) => { - for(const value of array) { - await new Promise(r => setTimeout(r, value * 10)); + for (const value of array) { + await new Promise((r) => setTimeout(r, value * 10)); await send(value); } }; @@ -25,7 +26,7 @@ describe('Generatorify test suite', () => { handler(value); } expect(handler).toBeCalledTimes(array.length); - for(const value of array) { + for (const value of array) { expect(handler).toBeCalledWith(value); } }); diff --git a/src/__tests__/index.cjs b/src/__tests__/index.cjs index a4c5df0..7e9f5f5 100644 --- a/src/__tests__/index.cjs +++ b/src/__tests__/index.cjs @@ -1,10 +1,16 @@ -const { strict: assert } = require('node:assert') -const { defer, Deferred, generatorify, createPoll, combine } = require('../../build/index.cjs'); +const { strict: assert } = require('node:assert'); +const { + defer, + Deferred, + generatorify, + createQueue, + combine, +} = require('../../build/index.cjs'); assert(typeof defer === 'function'); assert(typeof Deferred === 'function'); assert(typeof generatorify === 'function'); -assert(typeof createPoll === 'function'); +assert(typeof createQueue === 'function'); assert(typeof combine === 'function'); console.log('CJS import test passed'); diff --git a/src/__tests__/index.mjs b/src/__tests__/index.mjs index 93d52fc..19f661d 100644 --- a/src/__tests__/index.mjs +++ b/src/__tests__/index.mjs @@ -1,10 +1,16 @@ import { strict as assert } from 'node:assert'; -import { defer, Deferred, generatorify, createPoll, combine } from '../../build/index.js'; +import { + defer, + Deferred, + generatorify, + createQueue, + combine, +} from '../../build/index.js'; assert(typeof defer === 'function'); assert(typeof Deferred === 'function'); assert(typeof generatorify === 'function'); -assert(typeof createPoll === 'function'); +assert(typeof createQueue === 'function'); assert(typeof combine === 'function'); console.log('MJS import test passed'); diff --git a/src/__tests__/queue.ts b/src/__tests__/queue.ts new file mode 100644 index 0000000..e7be593 --- /dev/null +++ b/src/__tests__/queue.ts @@ -0,0 +1,87 @@ +import { createQueue, Deferred, Status } from '../index'; + +const wait = (timeout: number, value: unknown) => + new Promise((resolve) => setTimeout(resolve, timeout, value)); + +describe('createQueue test suite', () => { + it('should create a queue', async () => { + createQueue(); + }); + + it('should push a value to the queue', async () => { + const value = Symbol('value'); + const queue = createQueue(); + const result = queue.push(value); + + expect(result.status).toBe(Status.PENDING); + await expect(result.promise).resolves.toBe(value); + expect(result).toBeInstanceOf(Deferred); + expect(result.status).toBe(Status.RESOLVED); + }); + + it('should pull a value from the queue', async () => { + const value = Symbol('value'); + const queue = createQueue(); + const result = queue.pull(); + await expect(Promise.race([wait(1, value), result.promise])).resolves.toBe( + value + ); + expect(result).toBeInstanceOf(Deferred); + expect(result.status).toBe(Status.PENDING); + }); + + it('should pull a pushed value from the queue', async () => { + const value = Symbol('value'); + const queue = createQueue(); + queue.push(wait(1, value)); + const result = queue.pull(); + await expect(result.promise).resolves.toBe(value); + expect(result).toBeInstanceOf(Deferred); + expect(result.status).toBe(Status.RESOLVED); + expect(queue.size).toBe(0); + }); + + it('should pull a few pushed values from the queue', async () => { + const value1 = Symbol('value1'); + const value2 = Symbol('value2'); + const queue = createQueue(); + queue.push(wait(10, value2)); + queue.push(wait(0, value1)); + const result1 = queue.pull(); + const result2 = queue.pull(); + + expect(result1.status).toBe(Status.PENDING); + await expect(result1.promise).resolves.toBe(value1); + expect(result1.status).toBe(Status.RESOLVED); + + expect(result2.status).toBe(Status.PENDING); + await expect(result2.promise).resolves.toBe(value2); + expect(result2.status).toBe(Status.RESOLVED); + + expect(queue.size).toBe(0); + }); + + it('should pull a few not pushed values from the queue and wait them', async () => { + const value1 = Symbol('value1'); + const value2 = Symbol('value2'); + const queue = createQueue(); + + const result1 = queue.pull(); + const result2 = queue.pull(); + + setTimeout(() => { + queue.push(wait(10, value2)); + queue.push(wait(0, value1)); + }, 100); + + expect(result1.status).toBe(Status.PENDING); + await expect(result1.promise).resolves.toBe(value1); + expect(result1.status).toBe(Status.RESOLVED); + + expect(result2.status).toBe(Status.PENDING); + await expect(result2.promise).resolves.toBe(value2); + expect(result2.status).toBe(Status.RESOLVED); + + expect(queue.size).toBe(0); + }); +}); diff --git a/src/combine.ts b/src/combine.ts index 51f59db..0f8cee8 100644 --- a/src/combine.ts +++ b/src/combine.ts @@ -1,21 +1,21 @@ -import { createPoll } from './deferredPoll.js'; +import { createQueue } from './queue.js'; export const combine = (...iterables: AsyncIterable[]) => { - const poll = createPoll>(); + const queue = createQueue>(); Promise.all( iterables.map(async (iterable) => { for await (const value of iterable) { - await poll.push({ value, done: false }).promise; + await queue.push({ value, done: false }).promise; } }) - ).then(async () => poll.done({ value: null, done: true })); + ).then(async () => queue.done({ value: null, done: true })); return { [Symbol.asyncIterator]() { return { next() { - return poll.pull().promise; + return queue.pull().promise; }, }; }, diff --git a/src/defer.ts b/src/defer.ts index e0c0891..9de6989 100644 --- a/src/defer.ts +++ b/src/defer.ts @@ -14,22 +14,36 @@ export const getId = () => { return (counter[0] + counter[1]++).toString(16); }; -export class Deferred { +export class Deferred { private txts = getId(); private _promise: Promise; - private _resolve?: (value: T) => void; + private _resolve?: (value: T | PromiseLike) => void; private _reject?: (error: E) => void; private _status: Status = Status.PENDING; + public state: S; + constructor() { this._promise = new Promise((resolve, reject) => { this._resolve = resolve; this._reject = reject; - }); + }) + .catch((error) => { + this._status = Status.REJECTED; + throw error; + }) + .then((value) => { + this._status = Status.RESOLVED; + return value; + }); this.resolve = this.resolve.bind(this); this.reject = this.reject.bind(this); } + get id() { + return this.txts; + } + get [Symbol.toStringTag]() { return `Deferred ${this.txts} ${this._status}`; } @@ -42,20 +56,15 @@ export class Deferred { return this._status; } - resolve(value: T) { - if (this._status === Status.PENDING) { - this._status = Status.RESOLVED; - this._resolve(value); - } + resolve(value: T | PromiseLike) { + this._resolve(value); return this; } reject(error: E) { - if (this._status === Status.PENDING) { - this._status = Status.REJECTED; - this._reject(error); - } + this._reject(error); + return this; } } diff --git a/src/deferredPoll.ts b/src/deferredPoll.ts deleted file mode 100644 index f1e7e67..0000000 --- a/src/deferredPoll.ts +++ /dev/null @@ -1,25 +0,0 @@ -import { defer, Deferred } from './defer.js'; - -export const createPoll = () => { - const dPoll: Deferred[] = [defer()]; - - return { - push(value: T) { - const next = defer(); - const prev = dPoll.push(next) - 2; - const prevDeferred = dPoll[prev] as Deferred; - prevDeferred.resolve(value); - - return prevDeferred; - }, - pull() { - const current = dPoll[0]; - current.promise.then(() => dPoll.shift()); - return current; - }, - done(value: T) { - dPoll[dPoll.length - 1].resolve(value); - return Promise.all(dPoll.map((d) => d.promise)); - }, - }; -}; diff --git a/src/generatorify.ts b/src/generatorify.ts index 571f7a8..910059d 100644 --- a/src/generatorify.ts +++ b/src/generatorify.ts @@ -1,4 +1,4 @@ -import { createPoll } from './deferredPoll.js'; +import { createQueue } from './queue.js'; export interface TaskCallback { (value: T): Promise; @@ -9,19 +9,19 @@ export interface Task { } export const generatorify = (task: Task): AsyncIterable => { - const poll = createPoll>(); + const queue = createQueue>(); Promise.resolve( task(async (value) => { - await poll.push({ value, done: false }).promise; + await queue.push({ value, done: false }).promise; }) - ).then(async (value) => poll.done({ value, done: true })); + ).then(async (value) => queue.done({ value, done: true })); return { [Symbol.asyncIterator]() { return { next() { - return poll.pull().promise; + return queue.pull().promise; }, }; }, diff --git a/src/index.ts b/src/index.ts index efe1af6..2ac8511 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,4 @@ export * from './defer.js'; export * from './generatorify.js'; -export * from './deferredPoll.js'; +export * from './queue.js'; export * from './combine.js'; diff --git a/src/queue.ts b/src/queue.ts new file mode 100644 index 0000000..57179d3 --- /dev/null +++ b/src/queue.ts @@ -0,0 +1,53 @@ +import { defer, Deferred, Status } from './defer.js'; + +export const createQueue = () => { + const pendingQueue: Deferred[] = []; + const resolvedQueue: Deferred[] = []; + const requestQueue: Deferred[] = []; + + const push = (value: T): Deferred => { + const task = defer(); + pendingQueue.push(task); + + Promise.resolve(value) + .then(task.resolve) + .catch(task.reject) + .finally(() => { + pendingQueue.splice(pendingQueue.indexOf(task), 1); + if (requestQueue.length > 0) { + const request = requestQueue.shift(); + request.resolve(task.promise); + } else { + resolvedQueue.push(task); + } + }); + + return task; + }; + + const pull = (): Deferred => { + if (resolvedQueue.length > 0) { + return resolvedQueue.shift(); + } + const task = defer(); + requestQueue.push(task); + + return task; + }; + + return { + push, + pull, + done(value: T) { + push(value); + return Promise.all( + [...pendingQueue, ...resolvedQueue, ...requestQueue].map( + (d) => d.promise + ) + ); + }, + get size() { + return pendingQueue.length + resolvedQueue.length + requestQueue.length; + }, + }; +};