Skip to content

Commit

Permalink
fix: replaces poll by queue
Browse files Browse the repository at this point in the history
  • Loading branch information
3axap4eHko committed Aug 8, 2023
1 parent 4e0e51e commit b03f045
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 64 deletions.
122 changes: 119 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,61 @@ Supports ESM and CommonJS modules.
[![Coverage Status][codecov-image]][codecov-url]
[![Maintainability][codeclimate-image]][codeclimate-url]

## Installation

Npm installation

```bash
npm install asygen
```

Yarn installation

```bash
yarn add asygen
```

## Utilities

#### Deferred

- Represents a deferred operation.
- Provides methods resolve and reject to control the wrapped promise.
- Exposes properties promise and status to get the underlying promise and its current status.

#### Queue

- A queue system for handling asynchronous tasks.
- Offers methods `push`, `pull`, and `done` to manage tasks.

#### Generatorify

- Convert a task into an asynchronous iterable.
- The iterable can be used in `for await...of` loops to process values as they're produced.

#### Combine

- Combine multiple asynchronous iterables into a single iterable.
- The resulting iterable will yield values from all input iterables and complete when all of them are done.

## Usage

#### Create deferred token
#### Create deferred operation

```typescript
import { defer, Status } from 'asygen';

const deferred = defer<number>();
console.log(deferred.status); // Status.PENDING

deferred.resolve(42);
deferred.promise.then((value) => {
console.log(value); // 42
console.log(deferred.status); // Status.RESOLVED
});
```

#### Create a deferred operation from events

```typescript
import { defer } from 'asygen';
Expand All @@ -28,7 +80,43 @@ await result.promise;
console.log(result.status); // resolved or rejected
```

#### Task queue

```typescript
import { createQueue } from 'asygen';

const queue = createQueue<number>();

queue.push(1);
queue.push(2);
queue.push(3);

queue.pull().promise.then((value) => console.log(value)); // 1
queue.pull().promise.then((value) => console.log(value)); // 2
```

#### Generatorify

```typescript
import { generatorify } from 'asygen';

const task = async (callback) => {
await callback('Hello');
await callback('World');
return 'Done!';
};

const iterable = generatorify(task);

(async () => {
for await (const value of iterable) {
console.log(value); // "Hello", then "World"
}
})();
```

#### Convert events to asyncGenerator

```typescript
import { once } from 'node:events';
import { generatorify, Task } from 'asygen';
Expand All @@ -44,11 +132,40 @@ for await (const data of generatorify(task)) {
}
```

#### Combine tasks

```typescript
import { generatorify, combine } from 'asygen';

const task1 = async (callback) => {
await callback('Task1 - Hello');
await callback('Task1 - World');
};

const task2 = async (callback) => {
await callback('Task2 - Foo');
await callback('Task2 - Bar');
};

const iterable1 = generatorify(task1);
const iterable2 = generatorify(task2);

const combined = combine(iterable1, iterable2);

(async () => {
for await (const value of combined) {
console.log(value); // Logs values from both task1 and task2
}
})();
```

#### Combine generators

```typescript
import { combine } from 'asygen';

const sleep = (timeout: number) => new Promise(resolve => setTimeout(resolve, timeout));
const sleep = (timeout: number) =>
new Promise((resolve) => setTimeout(resolve, timeout));

async function* generate(timeout: number, count: number) {
for (let index = 0; index < count; index++) {
Expand All @@ -70,7 +187,6 @@ for await (const data of combine(generate(100, 5), generate(500, 2))) {
License [Apache-2.0](http://www.apache.org/licenses/LICENSE-2.0)
Copyright (c) 2023-present Ivan Zakharchanka


[npm-url]: https://www.npmjs.com/package/asygen
[downloads-image]: https://img.shields.io/npm/dw/asygen.svg?maxAge=43200
[npm-image]: https://img.shields.io/npm/v/asygen.svg?maxAge=43200
Expand Down
7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"build",
"src/combine.ts",
"src/defer.ts",
"src/deferredPoll.ts",
"src/queue.ts",
"src/generatorify.ts",
"src/index.ts"
],
Expand Down Expand Up @@ -42,14 +42,17 @@
"defer",
"deferred",
"generator",
"generatorify",
"asyncgenerator",
"iterate",
"promise",
"async",
"await",
"yield",
"flow",
"control"
"control",
"queue",
"task"
],
"license": "Apache-2.0",
"bugs": {
Expand Down
13 changes: 7 additions & 6 deletions src/__tests__/generatorify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,31 @@ import { generatorify, Task } from '../index';

describe('Generatorify test suite', () => {
it('should iterate an array syncronously', async () => {
const array = [0,1,2,3,4];
const array = [0, 1, 2, 3, 4];
const handler = jest.fn();
for await (const value of generatorify(array.forEach.bind(array))) {
handler(value);
}
expect(handler).toBeCalledTimes(array.length);
for(const value of array) {
for (const value of array) {
expect(handler).toBeCalledWith(value);
}
});

it('should iterate an array asyncronously', async () => {
const array = [0,1,2,3,4];
const array = [0, 1, 2, 3, 4];
const handler = jest.fn();
const task: Task<number> = async (send) => {
for(const value of array) {
await new Promise(r => setTimeout(r, value * 10));
for (const value of array) {
await new Promise((r) => setTimeout(r, value * 10));
await send(value);
}
};
for await (const value of generatorify(task)) {
handler(value);
}
expect(handler).toBeCalledTimes(array.length);
for(const value of array) {
for (const value of array) {
expect(handler).toBeCalledWith(value);
}
});
Expand Down
12 changes: 9 additions & 3 deletions src/__tests__/index.cjs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
const { strict: assert } = require('node:assert')
const { defer, Deferred, generatorify, createPoll, combine } = require('../../build/index.cjs');
const { strict: assert } = require('node:assert');
const {
defer,
Deferred,
generatorify,
createQueue,
combine,
} = require('../../build/index.cjs');

assert(typeof defer === 'function');
assert(typeof Deferred === 'function');
assert(typeof generatorify === 'function');
assert(typeof createPoll === 'function');
assert(typeof createQueue === 'function');
assert(typeof combine === 'function');

console.log('CJS import test passed');
10 changes: 8 additions & 2 deletions src/__tests__/index.mjs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { strict as assert } from 'node:assert';
import { defer, Deferred, generatorify, createPoll, combine } from '../../build/index.js';
import {
defer,
Deferred,
generatorify,
createQueue,
combine,
} from '../../build/index.js';

assert(typeof defer === 'function');
assert(typeof Deferred === 'function');
assert(typeof generatorify === 'function');
assert(typeof createPoll === 'function');
assert(typeof createQueue === 'function');
assert(typeof combine === 'function');

console.log('MJS import test passed');
87 changes: 87 additions & 0 deletions src/__tests__/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { createQueue, Deferred, Status } from '../index';

const wait = (timeout: number, value: unknown) =>
new Promise((resolve) => setTimeout(resolve, timeout, value));

describe('createQueue test suite', () => {
it('should create a queue', async () => {
createQueue();
});

it('should push a value to the queue', async () => {
const value = Symbol('value');
const queue = createQueue();
const result = queue.push(value);

expect(result.status).toBe(Status.PENDING);
await expect(result.promise).resolves.toBe(value);
expect(result).toBeInstanceOf(Deferred);
expect(result.status).toBe(Status.RESOLVED);
});

it('should pull a value from the queue', async () => {
const value = Symbol('value');
const queue = createQueue();
const result = queue.pull();
await expect(Promise.race([wait(1, value), result.promise])).resolves.toBe(
value
);
expect(result).toBeInstanceOf(Deferred);
expect(result.status).toBe(Status.PENDING);
});

it('should pull a pushed value from the queue', async () => {
const value = Symbol('value');
const queue = createQueue();
queue.push(wait(1, value));
const result = queue.pull();
await expect(result.promise).resolves.toBe(value);
expect(result).toBeInstanceOf(Deferred);
expect(result.status).toBe(Status.RESOLVED);
expect(queue.size).toBe(0);
});

it('should pull a few pushed values from the queue', async () => {
const value1 = Symbol('value1');
const value2 = Symbol('value2');
const queue = createQueue();
queue.push(wait(10, value2));
queue.push(wait(0, value1));
const result1 = queue.pull();
const result2 = queue.pull();

expect(result1.status).toBe(Status.PENDING);
await expect(result1.promise).resolves.toBe(value1);
expect(result1.status).toBe(Status.RESOLVED);

expect(result2.status).toBe(Status.PENDING);
await expect(result2.promise).resolves.toBe(value2);
expect(result2.status).toBe(Status.RESOLVED);

expect(queue.size).toBe(0);
});

it('should pull a few not pushed values from the queue and wait them', async () => {
const value1 = Symbol('value1');
const value2 = Symbol('value2');
const queue = createQueue();

const result1 = queue.pull();
const result2 = queue.pull();

setTimeout(() => {
queue.push(wait(10, value2));
queue.push(wait(0, value1));
}, 100);

expect(result1.status).toBe(Status.PENDING);
await expect(result1.promise).resolves.toBe(value1);
expect(result1.status).toBe(Status.RESOLVED);

expect(result2.status).toBe(Status.PENDING);
await expect(result2.promise).resolves.toBe(value2);
expect(result2.status).toBe(Status.RESOLVED);

expect(queue.size).toBe(0);
});
});
10 changes: 5 additions & 5 deletions src/combine.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import { createPoll } from './deferredPoll.js';
import { createQueue } from './queue.js';

export const combine = <T, R = unknown>(...iterables: AsyncIterable<T>[]) => {
const poll = createPoll<IteratorResult<T, R>>();
const queue = createQueue<IteratorResult<T, R>>();

Promise.all(
iterables.map(async (iterable) => {
for await (const value of iterable) {
await poll.push({ value, done: false }).promise;
await queue.push({ value, done: false }).promise;
}
})
).then(async () => poll.done({ value: null, done: true }));
).then(async () => queue.done({ value: null, done: true }));

return {
[Symbol.asyncIterator]() {
return {
next() {
return poll.pull().promise;
return queue.pull().promise;
},
};
},
Expand Down
Loading

0 comments on commit b03f045

Please sign in to comment.