Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add parallel processing to SimpleDirectoryReader #830

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stale-dots-search.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"llamaindex": patch
---

add concurrency managment for SimpleDirectoryReader
Binary file added examples/data/parallel/brk-2022-001-024.pdf
Binary file not shown.
Binary file added examples/data/parallel/brk-2022-025-048.pdf
Binary file not shown.
Binary file added examples/data/parallel/brk-2022-049-072.pdf
Binary file not shown.
Binary file added examples/data/parallel/brk-2022-073-096.pdf
Binary file not shown.
Binary file added examples/data/parallel/brk-2022-097-120.pdf
Binary file not shown.
Binary file added examples/data/parallel/brk-2022-121-144.pdf
Binary file not shown.
2 changes: 1 addition & 1 deletion examples/readers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"start:pdf": "node --import tsx ./src/pdf.ts",
"start:llamaparse": "node --import tsx ./src/llamaparse.ts",
"start:notion": "node --import tsx ./src/notion.ts",
"start:llamaparse2": "node --import tsx ./src/llamaparse_2.ts"
"start:llamaparse-dir": "node --import tsx ./src/simple-directory-reader-with-llamaparse.ts"
},
"dependencies": {
"llamaindex": "*"
Expand Down
26 changes: 0 additions & 26 deletions examples/readers/src/llamaparse_2.ts

This file was deleted.

35 changes: 35 additions & 0 deletions examples/readers/src/simple-directory-reader-with-llamaparse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import {
LlamaParseReader,
SimpleDirectoryReader,
VectorStoreIndex,
} from "llamaindex";

async function main() {
const reader = new SimpleDirectoryReader();

const docs = await reader.loadData({
directoryPath: "../data/parallel", // brk-2022.pdf split into 6 parts
numWorkers: 2,
// set LlamaParse as the default reader for all file types. Set apiKey here or in environment variable LLAMA_CLOUD_API_KEY
overrideReader: new LlamaParseReader({
language: "en",
resultType: "markdown",
parsingInstruction:
"The provided files is Berkshire Hathaway's 2022 Annual Report. They contain figures, tables and raw data. Capture the data in a structured format. Mathematical equation should be put out as LATEX markdown (between $$).",
}),
});

const index = await VectorStoreIndex.fromDocuments(docs);

// Query the index
const queryEngine = index.asQueryEngine();
const response = await queryEngine.query({
query:
"What is the general strategy for shareholder safety outlined in the report? Use a concrete example with numbers",
});

// Output response
console.log(response.toString());
}

main().catch(console.error);
3 changes: 2 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
"md-utils-ts": "^2.0.0",
"mongodb": "^6.6.1",
"notion-md-crawler": "^1.0.0",
"openai": "^4.46.0",
"openai": "^4.43.0",
"p-limit": "^5.0.0",
"papaparse": "^5.4.1",
"pathe": "^1.1.2",
"pdf2json": "3.0.5",
Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/readers/LlamaParseReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export class LlamaParseReader implements FileReader {
language: Language = "en";
// The parsing instruction for the parser.
parsingInstruction: string = "";
// numWorkers is implemented in SimpleDirectoryReader

constructor(params: Partial<LlamaParseReader> = {}) {
Object.assign(this, params);
Expand All @@ -86,6 +87,10 @@ export class LlamaParseReader implements FileReader {
const data = await fs.readFile(file);
const mimeType = await this.getMimeType(data);

if (this.verbose) {
console.log(`Starting load for file: ${file}`);
}

const body = new FormData();
body.set("file", new Blob([data], { type: mimeType }), file);
body.append("language", this.language);
Expand Down
119 changes: 82 additions & 37 deletions packages/core/src/readers/SimpleDirectoryReader.edge.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { fs, path } from "@llamaindex/env";
import pLimit from "p-limit";
import { Document, type Metadata } from "../Node.js";
import { walk } from "../storage/FileSystem.js";
import { TextFileReader } from "./TextFileReader.js";
Expand All @@ -18,10 +19,21 @@ enum ReaderStatus {

export type SimpleDirectoryReaderLoadDataParams = {
directoryPath: string;
// Fallback Reader, defaults to TextFileReader
defaultReader?: BaseReader | null;
// Overrides the reader for specific file extensions
fileExtToReader?: Record<string, BaseReader>;
// Defines the amount of "workers" to use. Must be between 1 and 9
numWorkers?: number;
// Overrides the reader for all file extension
overrideReader?: BaseReader;
};

type ProcessFileParams = Omit<
SimpleDirectoryReaderLoadDataParams,
"directoryPath"
>;

/**
* Read all the documents in a directory.
* By default, supports the list of file types
Expand All @@ -45,67 +57,100 @@ export class SimpleDirectoryReader implements BaseReader {
directoryPath,
defaultReader = new TextFileReader(),
fileExtToReader,
numWorkers = 1,
overrideReader,
} = params;

if (numWorkers < 1 || numWorkers > 9) {
throw new Error("The number of workers must be between 1 - 9.");
}

// Observer can decide to skip the directory
if (
!this.doObserverCheck("directory", directoryPath, ReaderStatus.STARTED)
) {
return [];
}

const docs: Document[] = [];
// Creates a Queue of filePaths to be accessed by each individual worker
const filePathQueue: string[] = [];

for await (const filePath of walk(directoryPath)) {
try {
const fileExt = path.extname(filePath).slice(1).toLowerCase();
filePathQueue.push(filePath);
}

// Observer can decide to skip each file
if (!this.doObserverCheck("file", filePath, ReaderStatus.STARTED)) {
// Skip this file
continue;
}
const processFileParams: ProcessFileParams = {
defaultReader,
fileExtToReader,
overrideReader,
};

let reader: BaseReader;
// Uses pLimit to control the number of parallel requests
const limit = pLimit(numWorkers);
const workerPromises = filePathQueue.map((filePath) =>
limit(() => this.processFile(filePath, processFileParams)),
);

if (fileExtToReader && fileExt in fileExtToReader) {
reader = fileExtToReader[fileExt];
} else if (defaultReader != null) {
reader = defaultReader;
} else {
const msg = `No reader for file extension of ${filePath}`;
console.warn(msg);
const results: Document[][] = await Promise.all(workerPromises);

// In an error condition, observer's false cancels the whole process.
if (
!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)
) {
return [];
}
// After successful import of all files, directory completion
// is only a notification for observer, cannot be cancelled.
this.doObserverCheck("directory", directoryPath, ReaderStatus.COMPLETE);

continue;
}
return results.flat();
}

private async processFile(
filePath: string,
params: ProcessFileParams,
): Promise<Document[]> {
const docs: Document[] = [];

const fileDocs = await reader.loadData(filePath, fs);
fileDocs.forEach(addMetaData(filePath));
try {
const fileExt = path.extname(filePath).slice(1).toLowerCase();

// Observer can still cancel addition of the resulting docs from this file
if (this.doObserverCheck("file", filePath, ReaderStatus.COMPLETE)) {
docs.push(...fileDocs);
}
} catch (e) {
const msg = `Error reading file ${filePath}: ${e}`;
console.error(msg);
// Observer can decide to skip each file
if (!this.doObserverCheck("file", filePath, ReaderStatus.STARTED)) {
// Skip this file
return [];
}

let reader: BaseReader;

if (params.overrideReader) {
reader = params.overrideReader;
} else if (params.fileExtToReader && fileExt in params.fileExtToReader) {
reader = params.fileExtToReader[fileExt];
} else if (params.defaultReader != null) {
reader = params.defaultReader;
} else {
const msg = `No reader for file extension of ${filePath}`;
console.warn(msg);

// In an error condition, observer's false cancels the whole process.
if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) {
return [];
}

return [];
}
}

// After successful import of all files, directory completion
// is only a notification for observer, cannot be cancelled.
this.doObserverCheck("directory", directoryPath, ReaderStatus.COMPLETE);
const fileDocs = await reader.loadData(filePath, fs);
fileDocs.forEach(addMetaData(filePath));

// Observer can still cancel addition of the resulting docs from this file
if (this.doObserverCheck("file", filePath, ReaderStatus.COMPLETE)) {
docs.push(...fileDocs);
}
} catch (e) {
const msg = `Error reading file ${filePath}: ${e}`;
console.error(msg);

// In an error condition, observer's false cancels the whole process.
if (!this.doObserverCheck("file", filePath, ReaderStatus.ERROR, msg)) {
return [];
}
}

return docs;
}
Expand Down
7 changes: 5 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.