From f590539ba0b4c0bbb800e1f0b81716c9ae7b107d Mon Sep 17 00:00:00 2001
From: Datendrache
Date: Thu, 12 Oct 2023 12:41:17 -0600
Subject: [PATCH] Datendrache; crawl and ingest

---
 crawl.py  | 91 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 ingest.py | 68 +++++++++++++++++++++++++++--------------
 2 files changed, 136 insertions(+), 23 deletions(-)
 create mode 100644 crawl.py

diff --git a/crawl.py b/crawl.py
new file mode 100644
index 00000000..2ab67e1e
--- /dev/null
+++ b/crawl.py
@@ -0,0 +1,91 @@
+import os
+import shutil
+import click
+import subprocess
+
+from constants import (
+    DOCUMENT_MAP,
+    SOURCE_DIRECTORY
+)
+
+def logToFile(logentry):
+    file1 = open("crawl.log","a")
+    file1.write(logentry + "\n")
+    file1.close()
+    print(logentry + "\n")
+
+@click.command()
+@click.option(
+    "--device_type",
+    default="cuda",
+    type=click.Choice(
+        [
+            "cpu",
+            "cuda",
+            "ipu",
+            "xpu",
+            "mkldnn",
+            "opengl",
+            "opencl",
+            "ideep",
+            "hip",
+            "ve",
+            "fpga",
+            "ort",
+            "xla",
+            "lazy",
+            "vulkan",
+            "mps",
+            "meta",
+            "hpu",
+            "mtia",
+        ],
+    ),
+    help="Device to run on. (Default is cuda)",
+)
+@click.option(
+    "--landing_directory",
+    default="./LANDING_DOCUMENTS"
+)
+@click.option(
+    "--processed_directory",
+    default="./PROCESSED_DOCUMENTS"
+)
+@click.option(
+    "--error_directory",
+    default="./ERROR_DOCUMENTS"
+)
+@click.option(
+    "--unsupported_directory",
+    default="./UNSUPPORTED_DOCUMENTS"
+)
+
+def main(device_type, landing_directory, processed_directory, error_directory, unsupported_directory):
+    paths = []
+
+    os.makedirs(processed_directory, exist_ok=True)
+    os.makedirs(error_directory, exist_ok=True)
+    os.makedirs(unsupported_directory, exist_ok=True)
+
+    for root, _, files in os.walk(landing_directory):
+        for file_name in files:
+            file_extension = os.path.splitext(file_name)[1]
+            short_filename = os.path.basename(file_name)
+
+            if not os.path.isdir(root + "/" + file_name):
+                if file_extension in DOCUMENT_MAP.keys():
+                    shutil.move(root + "/" + file_name, SOURCE_DIRECTORY + "/" + short_filename)
+                    logToFile("START: " + root + "/" + short_filename)
+                    process = subprocess.Popen("python ingest.py --device_type=" + device_type, shell=True, stdout=subprocess.PIPE)
+                    process.wait()
+                    if process.returncode > 0:
+                        shutil.move(SOURCE_DIRECTORY + "/" + short_filename, error_directory + "/" + short_filename)
+                        logToFile("ERROR: " + root + "/" + short_filename)
+                    else:
+                        logToFile("VALID: " + root + "/" + short_filename)
+                        shutil.move(SOURCE_DIRECTORY + "/" + short_filename, processed_directory + "/" + short_filename)
+            else:
+                shutil.move(root + "/" + file_name, unsupported_directory + "/" + short_filename)
+
+if __name__ == "__main__":
+    main()
diff --git a/ingest.py b/ingest.py
index 0cfe20bc..5819e0f8 100644
--- a/ingest.py
+++ b/ingest.py
@@ -18,17 +18,27 @@
     SOURCE_DIRECTORY,
 )
 
+def file_log(logentry):
+    file1 = open("file_ingest.log","a")
+    file1.write(logentry + "\n")
+    file1.close()
+    print(logentry + "\n")
 
 def load_single_document(file_path: str) -> Document:
     # Loads a single document from a file path
-    file_extension = os.path.splitext(file_path)[1]
-    loader_class = DOCUMENT_MAP.get(file_extension)
-    if loader_class:
-        loader = loader_class(file_path)
-    else:
-        raise ValueError("Document type is undefined")
-    return loader.load()[0]
-
+    try:
+        file_extension = os.path.splitext(file_path)[1]
+        loader_class = DOCUMENT_MAP.get(file_extension)
+        if loader_class:
+            file_log(file_path + ' loaded.')
+            loader = loader_class(file_path)
+        else:
+            file_log(file_path + ' document type is undefined.')
+            raise ValueError("Document type is undefined")
+        return loader.load()[0]
+    except Exception as ex:
+        file_log('%s loading error: \n%s' % (file_path, ex))
+        return None
 
 def load_document_batch(filepaths):
     logging.info("Loading document batch")
@@ -37,9 +47,13 @@ def load_document_batch(filepaths):
         # load files
         futures = [exe.submit(load_single_document, name) for name in filepaths]
         # collect data
-        data_list = [future.result() for future in futures]
-        # return data and file paths
-        return (data_list, filepaths)
+        if futures is None:
+            file_log(name + ' failed to submit')
+            return None
+        else:
+            data_list = [future.result() for future in futures]
+            # return data and file paths
+            return (data_list, filepaths)
 
 
 def load_documents(source_dir: str) -> list[Document]:
@@ -47,6 +61,7 @@ def load_documents(source_dir: str) -> list[Document]:
     paths = []
     for root, _, files in os.walk(source_dir):
         for file_name in files:
+            print('Importing: ' + file_name)
             file_extension = os.path.splitext(file_name)[1]
             source_file_path = os.path.join(root, file_name)
             if file_extension in DOCUMENT_MAP.keys():
@@ -63,14 +78,22 @@
             # select a chunk of filenames
             filepaths = paths[i : (i + chunksize)]
             # submit the task
-            future = executor.submit(load_document_batch, filepaths)
-            futures.append(future)
+            try:
+                future = executor.submit(load_document_batch, filepaths)
+            except Exception as ex:
+                file_log('executor task failed: %s' % (ex))
+                future = None
+            if future is not None:
+                futures.append(future)
         # process all results
         for future in as_completed(futures):
            # open the file and load the data
-            contents, _ = future.result()
-            docs.extend(contents)
-
+            try:
+                contents, _ = future.result()
+                docs.extend(contents)
+            except Exception as ex:
+                file_log('Exception: %s' % (ex))
+
     return docs
 
 
@@ -78,12 +101,12 @@ def split_documents(documents: list[Document]) -> tuple[list[Document], list[Doc
     # Splits documents for correct Text Splitter
     text_docs, python_docs = [], []
     for doc in documents:
-        file_extension = os.path.splitext(doc.metadata["source"])[1]
-        if file_extension == ".py":
-            python_docs.append(doc)
-        else:
-            text_docs.append(doc)
-
+        if doc is not None:
+            file_extension = os.path.splitext(doc.metadata["source"])[1]
+            if file_extension == ".py":
+                python_docs.append(doc)
+            else:
+                text_docs.append(doc)
     return text_docs, python_docs
 
 
@@ -147,7 +170,6 @@ def main(device_type):
         embeddings,
         persist_directory=PERSIST_DIRECTORY,
         client_settings=CHROMA_SETTINGS,
-
     )
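
Reviewer notes follow; none of the code below is part of the patch itself.

A minimal usage sketch for the new crawler, assuming crawl.py and ingest.py sit in the working directory and the default directory names from the click options above are kept:

```python
import os
import subprocess

# Assumed layout (defaults from the click options): documents to ingest are
# dropped into ./LANDING_DOCUMENTS before the crawler runs.
os.makedirs("./LANDING_DOCUMENTS", exist_ok=True)

# Run the crawler; it stages each supported file into SOURCE_DIRECTORY, calls
# ingest.py on it, and then files it under ./PROCESSED_DOCUMENTS or
# ./ERROR_DOCUMENTS depending on the ingest exit code.
subprocess.run(["python", "crawl.py", "--device_type=cpu"], check=True)
```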
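
Both logToFile in crawl.py and file_log in ingest.py open their log files without a context manager, so a failed write leaks the handle. A minimal alternative, sketched here rather than proposed as part of the patch, keeps the same append-and-echo behaviour:

```python
def file_log(logentry: str) -> None:
    # Append the entry to the ingest log and echo it; the with-block closes
    # the file even if the write raises.
    with open("file_ingest.log", "a") as log_file:
        log_file.write(logentry + "\n")
    print(logentry + "\n")
```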
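
In load_document_batch, the new `if futures is None:` branch can never be taken (`exe.submit` returns a Future, so `futures` is always a list) and would raise a NameError on `name` if it ever were. A possible follow-up, sketched below under the assumption that failed loads should simply be dropped from the batch, leans on the fact that load_single_document now returns None on error; it assumes it lives inside ingest.py where that function is in scope:

```python
from concurrent.futures import ThreadPoolExecutor
import logging

def load_document_batch(filepaths):
    # Sketch only: load the batch in parallel and drop files whose loader
    # returned None (load_single_document swallows its own exceptions).
    logging.info("Loading document batch")
    with ThreadPoolExecutor(len(filepaths)) as exe:
        futures = [exe.submit(load_single_document, path) for path in filepaths]
        results = [(future.result(), path) for future, path in zip(futures, filepaths)]
    data_list = [doc for doc, _ in results if doc is not None]
    kept_paths = [path for doc, path in results if doc is not None]
    # Same return shape as the patch: (documents, file paths).
    return (data_list, kept_paths)
```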