Skip to content

Commit

Permalink
feat: add p-limit to manage concurrency requests in SimpleDirectoryRe…
Browse files Browse the repository at this point in the history
…ader
  • Loading branch information
KindOfAScam committed May 15, 2024
1 parent 0437967 commit 239ec5c
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .changeset/stale-dots-search.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"llamaindex": patch
---

add p-limit for enhanced concurrency management
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"mongodb": "^6.6.1",
"notion-md-crawler": "^1.0.0",
"openai": "^4.43.0",
"p-limit": "^5.0.0",
"papaparse": "^5.4.1",
"pathe": "^1.1.2",
"pdf2json": "3.0.5",
Expand Down
88 changes: 40 additions & 48 deletions packages/core/src/readers/SimpleDirectoryReader.edge.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { CompleteFileSystem } from "@llamaindex/env";
import { defaultFS, path } from "@llamaindex/env";
import pLimit from "p-limit";
import { Document, type Metadata } from "../Node.js";
import { walk } from "../storage/FileSystem.js";
import { TextFileReader } from "./TextFileReader.js";
Expand Down Expand Up @@ -59,7 +60,6 @@ export class SimpleDirectoryReader implements BaseReader {
overrideReader,
} = params;

// Check if LlamaParseReader is used as the defaultReader and if so checks if numWorkers is in the valid range
if (numWorkers < 1 || numWorkers > 9) {
throw new Error("The number of workers must be between 1 - 9.");
}
Expand All @@ -84,8 +84,9 @@ export class SimpleDirectoryReader implements BaseReader {
overrideReader,
};

const workerPromises = Array.from({ length: numWorkers }, () =>
this.processFiles(filePathQueue, processFilesParams),
const limit = pLimit(numWorkers);
const workerPromises = filePathQueue.map((filePath) =>
limit(() => this.processFiles(filePath, processFilesParams)),
);

const results: Document[][] = await Promise.all(workerPromises);
Expand All @@ -98,63 +99,54 @@ export class SimpleDirectoryReader implements BaseReader {
}

private async processFiles(
filePathQueue: string[],
filePath: string,
params: ProcessFilesParams,
): Promise<Document[]> {
const docs: Document[] = [];

while (filePathQueue.length > 0) {
const filePath = filePathQueue.shift()!;
try {
const fileExt = path.extname(filePath).slice(1).toLowerCase();

try {
const fileExt = path.extname(filePath).slice(1).toLowerCase();

// Observer can decide to skip each file
if (!this.doObserverCheck("file", filePath, ReaderStatus.STARTED)) {
// Skip this file
continue;
}

let reader: BaseReader;

if (params.overrideReader) {
reader = params.overrideReader;
} else if (
params.fileExtToReader &&
fileExt in params.fileExtToReader
) {
reader = params.fileExtToReader[fileExt];
} else if (params.defaultReader != null) {
reader = params.defaultReader;
} else {
const msg = `No reader for file extension of ${filePath}`;
console.warn(msg);

// In an error condition, observer's false cancels the whole process.
if (
!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)
) {
return [];
}

continue;
}
// Observer can decide to skip each file
if (!this.doObserverCheck("file", filePath, ReaderStatus.STARTED)) {
// Skip this file
return [];
}

const fileDocs = await reader.loadData(filePath, params.fs);
fileDocs.forEach(addMetaData(filePath));
let reader: BaseReader;

// Observer can still cancel addition of the resulting docs from this file
if (this.doObserverCheck("file", filePath, ReaderStatus.COMPLETE)) {
docs.push(...fileDocs);
}
} catch (e) {
const msg = `Error reading file ${filePath}: ${e}`;
console.error(msg);
if (params.overrideReader) {
reader = params.overrideReader;
} else if (params.fileExtToReader && fileExt in params.fileExtToReader) {
reader = params.fileExtToReader[fileExt];
} else if (params.defaultReader != null) {
reader = params.defaultReader;
} else {
const msg = `No reader for file extension of ${filePath}`;
console.warn(msg);

// In an error condition, observer's false cancels the whole process.
if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) {
return [];
}

return [];
}

const fileDocs = await reader.loadData(filePath, params.fs);
fileDocs.forEach(addMetaData(filePath));

// Observer can still cancel addition of the resulting docs from this file
if (this.doObserverCheck("file", filePath, ReaderStatus.COMPLETE)) {
docs.push(...fileDocs);
}
} catch (e) {
const msg = `Error reading file ${filePath}: ${e}`;
console.error(msg);

// In an error condition, observer's false cancels the whole process.
if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) {
return [];
}
}

Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 239ec5c

Please sign in to comment.