Skip to content

Commit

Permalink
Merge pull request #580 from datendrache/crawl
Browse files Browse the repository at this point in the history
Datendrache; crawl and ingest
  • Loading branch information
PromtEngineer committed Oct 20, 2023
2 parents 573c903 + f590539 commit 1aa6d14
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 23 deletions.
91 changes: 91 additions & 0 deletions crawl.py
@@ -0,0 +1,91 @@
import os
import shutil
import click
import subprocess

from constants import (
DOCUMENT_MAP,
SOURCE_DIRECTORY
)

def logToFile(logentry):
file1 = open("crawl.log","a")
file1.write(logentry + "\n")
file1.close()
print(logentry + "\n")

@click.command()
@click.option(
"--device_type",
default="cuda",
type=click.Choice(
[
"cpu",
"cuda",
"ipu",
"xpu",
"mkldnn",
"opengl",
"opencl",
"ideep",
"hip",
"ve",
"fpga",
"ort",
"xla",
"lazy",
"vulkan",
"mps",
"meta",
"hpu",
"mtia",
],
),
help="Device to run on. (Default is cuda)",
)
@click.option(
"--landing_directory",
default="./LANDING_DOCUMENTS"
)
@click.option(
"--processed_directory",
default="./PROCESSED_DOCUMENTS"
)
@click.option(
"--error_directory",
default="./ERROR_DOCUMENTS"
)
@click.option(
"--unsupported_directory",
default="./UNSUPPORTED_DOCUMENTS"
)

def main(device_type, landing_directory, processed_directory, error_directory, unsupported_directory):
paths = []

os.makedirs(processed_directory, exist_ok=True)
os.makedirs(error_directory, exist_ok=True)
os.makedirs(unsupported_directory, exist_ok=True)

for root, _, files in os.walk(landing_directory):
for file_name in files:
file_extension = os.path.splitext(file_name)[1]
short_filename = os.path.basename(file_name)

if not os.path.isdir(root + "/" + file_name):
if file_extension in DOCUMENT_MAP.keys():
shutil.move(root + "/" + file_name, SOURCE_DIRECTORY+ "/" + short_filename)
logToFile("START: " + root + "/" + short_filename)
process = subprocess.Popen("python ingest.py --device_type=" + device_type, shell=True, stdout=subprocess.PIPE)
process.wait()
if process.returncode > 0:
shutil.move(SOURCE_DIRECTORY + "/" + short_filename, error_directory + "/" + short_filename)
logToFile("ERROR: " + root + "/" + short_filename)
else:
logToFile("VALID: " + root + "/" + short_filename)
shutil.move(SOURCE_DIRECTORY + "/" + short_filename, processed_directory+ "/" + short_filename)
else:
shutil.move(root + "/" + file_name, unsupported_directory+ "/" + short_filename)

if __name__ == "__main__":
main()
68 changes: 45 additions & 23 deletions ingest.py
Expand Up @@ -18,17 +18,27 @@
SOURCE_DIRECTORY,
)

def file_log(logentry):
file1 = open("file_ingest.log","a")
file1.write(logentry + "\n")
file1.close()
print(logentry + "\n")

def load_single_document(file_path: str) -> Document:
# Loads a single document from a file path
file_extension = os.path.splitext(file_path)[1]
loader_class = DOCUMENT_MAP.get(file_extension)
if loader_class:
loader = loader_class(file_path)
else:
raise ValueError("Document type is undefined")
return loader.load()[0]

try:
file_extension = os.path.splitext(file_path)[1]
loader_class = DOCUMENT_MAP.get(file_extension)
if loader_class:
file_log(file_path + ' loaded.')
loader = loader_class(file_path)
else:
file_log(file_path + ' document type is undefined.')
raise ValueError("Document type is undefined")
return loader.load()[0]
except Exception as ex:
file_log('%s loading error: \n%s' % (file_path, ex))
return None

def load_document_batch(filepaths):
logging.info("Loading document batch")
Expand All @@ -37,16 +47,21 @@ def load_document_batch(filepaths):
# load files
futures = [exe.submit(load_single_document, name) for name in filepaths]
# collect data
data_list = [future.result() for future in futures]
# return data and file paths
return (data_list, filepaths)
if futures is None:
file_log(name + ' failed to submit')
return None
else:
data_list = [future.result() for future in futures]
# return data and file paths
return (data_list, filepaths)


def load_documents(source_dir: str) -> list[Document]:
# Loads all documents from the source documents directory, including nested folders
paths = []
for root, _, files in os.walk(source_dir):
for file_name in files:
print('Importing: ' + file_name)
file_extension = os.path.splitext(file_name)[1]
source_file_path = os.path.join(root, file_name)
if file_extension in DOCUMENT_MAP.keys():
Expand All @@ -63,27 +78,35 @@ def load_documents(source_dir: str) -> list[Document]:
# select a chunk of filenames
filepaths = paths[i : (i + chunksize)]
# submit the task
future = executor.submit(load_document_batch, filepaths)
futures.append(future)
try:
future = executor.submit(load_document_batch, filepaths)
except Exception as ex:
file_log('executor task failed: %s' % (ex))
future = None
if future is not None:
futures.append(future)
# process all results
for future in as_completed(futures):
# open the file and load the data
contents, _ = future.result()
docs.extend(contents)

try:
contents, _ = future.result()
docs.extend(contents)
except Exception as ex:
file_log('Exception: %s' % (ex))

return docs


def split_documents(documents: list[Document]) -> tuple[list[Document], list[Document]]:
# Splits documents for correct Text Splitter
text_docs, python_docs = [], []
for doc in documents:
file_extension = os.path.splitext(doc.metadata["source"])[1]
if file_extension == ".py":
python_docs.append(doc)
else:
text_docs.append(doc)

if doc is not None:
file_extension = os.path.splitext(doc.metadata["source"])[1]
if file_extension == ".py":
python_docs.append(doc)
else:
text_docs.append(doc)
return text_docs, python_docs


Expand Down Expand Up @@ -147,7 +170,6 @@ def main(device_type):
embeddings,
persist_directory=PERSIST_DIRECTORY,
client_settings=CHROMA_SETTINGS,

)


Expand Down

0 comments on commit 1aa6d14

Please sign in to comment.