Skip to content

Commit

Permalink
FIX-modin-project#5723: Attempt to read list of parquet files in one go.
Browse files Browse the repository at this point in the history
Signed-off-by: mvashishtha <[email protected]>
  • Loading branch information
mvashishtha committed Mar 1, 2023
1 parent d724802 commit 74a6360
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 55 deletions.
133 changes: 78 additions & 55 deletions modin/core/io/column_stores/parquet_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from fsspec.spec import AbstractBufferedFile
import numpy as np
from packaging import version
from pandas.api.types import is_list_like
import s3fs

from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.config import NPartitions
Expand All @@ -40,7 +42,7 @@ class ColumnStoreDataset:
Attributes
----------
path : str, path object or file-like object
paths : list-like of str, path object or file-like object
The filepath of the parquet file in local filesystem or hdfs.
storage_options : dict
Parameters for specific storage engine.
Expand All @@ -60,10 +62,13 @@ class ColumnStoreDataset:
List that contains the full paths of the parquet files in the dataset.
"""

def __init__(self, path, storage_options): # noqa : PR01
self.path = path.__fspath__() if isinstance(path, os.PathLike) else path
def __init__(self, paths, storage_options): # noqa : PR01
self.paths = [
path.__fspath__() if isinstance(path, os.PathLike) else path
for path in paths
]
self.storage_options = storage_options
self._fs_path = None
self._fs_paths = None
self._fs = None
self.dataset = self._init_dataset()
self._row_groups_per_file = None
Expand Down Expand Up @@ -107,14 +112,18 @@ def fs(self):
Filesystem object.
"""
if self._fs is None:
if isinstance(self.path, AbstractBufferedFile):
self._fs = self.path.fs
if isinstance(self.paths[0], AbstractBufferedFile):
self._fs = self.paths[0].fs
else:
self._fs, self._fs_path = url_to_fs(self.path, **self.storage_options)
self._fs_paths = [
url_to_fs(path, **self.storage_options)[1] for path in self.paths
]
# If given list of paths, can assume they are all on the same filesystem
self._fs = url_to_fs(self.paths[0])[0]
return self._fs

@property
def fs_path(self):
def fs_paths(self):
"""
Return the filesystem-specific path or file handle.
Expand All @@ -123,12 +132,18 @@ def fs_path(self):
fs_path : str, path object or file-like object
String path specific to filesystem or a file handle.
"""
if self._fs_path is None:
if isinstance(self.path, AbstractBufferedFile):
self._fs_path = self.path
if self._fs_paths is None:
if len(self.paths) > 1:
self._fs_paths = self.paths
elif isinstance(self.paths[0], AbstractBufferedFile):
self._fs_paths = [self.paths[0]]
else:
self._fs, self._fs_path = url_to_fs(self.path, **self.storage_options)
return self._fs_path
self._fs_paths = [
url_to_fs(path, **self.storage_options)[1] for path in self.paths
]
# If given list of paths, can assume they are all on the same filesystem
self._fs = url_to_fs(self.paths[0])[0]
return self._fs_paths

def to_pandas_dataframe(self, columns):
"""
Expand Down Expand Up @@ -166,8 +181,8 @@ def _unstrip_protocol(protocol, path):
return path
return f"{protos[0]}://{path}"

if isinstance(self.path, AbstractBufferedFile):
return [self.path]
if len(self.paths) == 1 and isinstance(self.paths[0], AbstractBufferedFile):
return [self.paths[0]]
# version.parse() is expensive, so we can split this into two separate loops
if version.parse(fsspec.__version__) < version.parse("2022.5.0"):
fs_files = [_unstrip_protocol(self.fs.protocol, fpath) for fpath in files]
Expand All @@ -183,7 +198,7 @@ def _init_dataset(self): # noqa: GL08
from pyarrow.parquet import ParquetDataset

return ParquetDataset(
self.fs_path, filesystem=self.fs, use_legacy_dataset=False
self.fs_paths, filesystem=self.fs, use_legacy_dataset=False
)

@property
Expand Down Expand Up @@ -231,7 +246,7 @@ def to_pandas_dataframe(
from pyarrow.parquet import read_table

return read_table(
self._fs_path, columns=columns, filesystem=self.fs
self._fs_paths, columns=columns, filesystem=self.fs
).to_pandas()


Expand All @@ -240,7 +255,7 @@ class FastParquetDataset(ColumnStoreDataset):
def _init_dataset(self): # noqa: GL08
from fastparquet import ParquetFile

return ParquetFile(self.fs_path, fs=self.fs)
return ParquetFile(self.fs_paths, fs=self.fs)

@property
def pandas_metadata(self):
Expand Down Expand Up @@ -285,12 +300,13 @@ def _get_fastparquet_files(self): # noqa: GL08
# have to copy some of their logic here while we work on getting
# an easier method to get a list of valid files.
# See: https://github.com/dask/fastparquet/issues/795
if "*" in self.path:
if "*" in self.paths:
files = self.fs.glob(self.path)
else:
files = [
f
for f in self.fs.find(self.path)
for path in self.paths
for f in self.fs.find(path)
if f.endswith(".parquet") or f.endswith(".parq")
]
return files
Expand Down Expand Up @@ -612,41 +628,49 @@ def _read(cls, path, engine, columns, **kwargs):
ParquetFile API is used. Please refer to the documentation here
https://arrow.apache.org/docs/python/parquet.html
"""
if isinstance(path, str):
if os.path.isdir(path):
path_generator = os.walk(path)
else:
storage_options = kwargs.get("storage_options")
if storage_options is not None:
fs, fs_path = url_to_fs(path, **storage_options)
# s3fs.core.S3File is considered list-like
path_list = (
path
if is_list_like(path) and not isinstance(path, s3fs.core.S3File)
else [path]
)
for path in path_list:
if isinstance(path, str):
if os.path.isdir(path):
path_generator = os.walk(path)
else:
fs, fs_path = url_to_fs(path)
path_generator = fs.walk(fs_path)
partitioned_columns = set()
# We do a tree walk of the path directory because partitioned
# parquet directories have a unique column at each directory level.
# Thus, we can use os.walk(), which does a dfs search, to walk
# through the different columns that the data is partitioned on
for _, dir_names, files in path_generator:
if dir_names:
partitioned_columns.add(dir_names[0].split("=")[0])
if files:
# Metadata files, git files, .DSStore
# TODO: fix conditional for column partitioning, see issue #4637
if len(files[0]) > 0 and files[0][0] == ".":
continue
break
partitioned_columns = list(partitioned_columns)
if len(partitioned_columns):
return cls.single_worker_read(
path,
engine=engine,
columns=columns,
reason="Mixed partitioning columns in Parquet",
**kwargs,
)

dataset = cls.get_dataset(path, engine, kwargs.get("storage_options") or {})
storage_options = kwargs.get("storage_options")
if storage_options is not None:
fs, fs_path = url_to_fs(path, **storage_options)
else:
fs, fs_path = url_to_fs(path)
path_generator = fs.walk(fs_path)
partitioned_columns = set()
# We do a tree walk of the path directory because partitioned
# parquet directories have a unique column at each directory level.
# Thus, we can use os.walk(), which does a dfs search, to walk
# through the different columns that the data is partitioned on
for _, dir_names, files in path_generator:
if dir_names:
partitioned_columns.add(dir_names[0].split("=")[0])
if files:
# Metadata files, git files, .DSStore
# TODO: fix conditional for column partitioning, see issue #4637
if len(files[0]) > 0 and files[0][0] == ".":
continue
break
partitioned_columns = list(partitioned_columns)
if len(partitioned_columns):
return cls.single_worker_read(
path,
engine=engine,
columns=columns,
reason="Mixed partitioning columns in Parquet",
**kwargs,
)
dataset = cls.get_dataset(
path_list, engine, kwargs.get("storage_options") or {}
)
index_columns = (
dataset.pandas_metadata.get("index_columns", [])
if dataset.pandas_metadata
Expand All @@ -659,5 +683,4 @@ def _read(cls, path, engine, columns, **kwargs):
for c in column_names
if c not in index_columns and not cls.index_regex.match(c)
]

return cls.build_query_compiler(dataset, columns, index_columns, **kwargs)
14 changes: 14 additions & 0 deletions modin/pandas/test/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,20 @@ def test_read_parquet(
columns=columns,
)

def test_read_parquet_list_of_files_5698(self, engine, make_parquet_file):
with ensure_clean(".parquet") as f1, ensure_clean(
".parquet"
) as f2, ensure_clean(".parquet") as f3:
for f in [f1, f2, f3]:
make_parquet_file(filename=f)
eval_io(fn_name="read_parquet", path=[f1, f2, f3], engine=engine)

def test_empty_list(self, engine):
eval_io(fn_name="read_parquet", path=[], engine=engine)

def test_list_of_s3_file(self, engine):
raise NotImplementedError

@pytest.mark.xfail(
condition="config.getoption('--simulate-cloud').lower() != 'off'",
reason="The reason of tests fail in `cloud` mode is unknown for now - issue #3264",
Expand Down

0 comments on commit 74a6360

Please sign in to comment.