Use new pyarrow.fs filesystem objects #619

Draft · wants to merge 3 commits into base: master
14 changes: 7 additions & 7 deletions petastorm/arrow_reader_worker.py
@@ -23,7 +23,6 @@
from pyarrow.parquet import ParquetFile

from petastorm.cache import NullCache
from petastorm.compat import compat_piece_read, compat_table_columns_gen, compat_column_data
from petastorm.workers_pool import EmptyResultError
from petastorm.workers_pool.worker_base import WorkerBase

@@ -45,11 +44,12 @@ def read_next(self, workers_pool, schema, ngram):
# Convert arrow table columns into numpy. Strings are handled differently since to_pandas() returns
# numpy array of dtype=object.
result_dict = dict()
for column_name, column in compat_table_columns_gen(result_table):
for column_name in result_table.column_names:
column = result_table.column(column_name)
# Assume we get only one chunk since reader worker reads one rowgroup at a time

# `to_pandas` is slower when called on the entire `data` than when called directly on a chunk.
if compat_column_data(result_table.column(0)).num_chunks == 1:
if result_table.column(0).num_chunks == 1:
column_as_pandas = column.data.chunks[0].to_pandas()
else:
column_as_pandas = column.data.to_pandas()
@@ -130,7 +130,7 @@ def process(self, piece_index, worker_predicate, shuffle_row_drop_partition):
self._dataset = pq.ParquetDataset(
self._dataset_path_or_paths,
filesystem=self._filesystem,
validate_schema=False, filters=self._arrow_filters)
validate_schema=False, filters=self._arrow_filters, use_legacy_dataset=True)

if self._dataset.partitions is None:
# When reading from a parquet file list, `dataset.partitions` will be None.
@@ -287,13 +287,13 @@ def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_
partition_names = self._dataset.partitions.partition_names

# pyarrow would fail if we request column names that the dataset is partitioned by
table = compat_piece_read(piece, lambda _: pq_file, columns=column_names - partition_names,
partitions=self._dataset.partitions)
table = piece.read(columns=column_names - partition_names, partitions=self._dataset.partitions)

# Drop columns we did not explicitly request. This may happen when a table is partitioned. Besides columns
# requested, pyarrow will also return partition values. Having these unexpected fields will break some
# downstream code.
loaded_column_names = set(column[0] for column in compat_table_columns_gen(table))

loaded_column_names = set(table.column_names)
unasked_for_columns = loaded_column_names - column_names
if unasked_for_columns:
table = table.drop(unasked_for_columns)
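For reference, a minimal sketch (not part of this PR) of the `pyarrow.Table` column API that the rewritten `read_next` loop relies on; the table and column names below are illustrative, and the single-chunk shortcut mirrors the comment in the diff above:

```python
import pyarrow as pa

table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})

for column_name in table.column_names:
    column = table.column(column_name)          # returns a ChunkedArray
    # With a single chunk, converting the chunk directly avoids the extra
    # concatenation work that to_pandas() does on the whole ChunkedArray.
    if column.num_chunks == 1:
        column_as_pandas = column.chunks[0].to_pandas()
    else:
        column_as_pandas = column.to_pandas()
    print(column_name, column_as_pandas.values)
```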
80 changes: 0 additions & 80 deletions petastorm/compat.py

This file was deleted.

48 changes: 20 additions & 28 deletions petastorm/etl/dataset_metadata.py
@@ -21,13 +21,13 @@
from urllib.parse import urlparse

from packaging import version
from pyarrow import fs
from pyarrow import parquet as pq
from six.moves import cPickle as pickle

from petastorm import utils
from petastorm.compat import compat_get_metadata, compat_make_parquet_piece
from petastorm.etl.legacy import depickle_legacy_package_name_compatible
from petastorm.fs_utils import FilesystemResolver, get_filesystem_and_path_or_paths, get_dataset_path
from petastorm.fs_utils import get_filesystem_and_path_or_paths, get_dataset_path
from petastorm.unischema import Unischema

logger = logging.getLogger(__name__)
@@ -74,15 +74,6 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
A user may provide their own recipe for creating the pyarrow filesystem object via the ``filesystem_factory``
argument (otherwise, petastorm will create a default one based on the url).

The following example shows how a custom pyarrow HDFS filesystem, instantiated using ``libhdfs`` driver can be used
during Petastorm dataset generation:

>>> resolver=FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
>>> hdfs_driver='libhdfs')
>>> with materialize_dataset(..., filesystem_factory=resolver.filesystem_factory()):
>>> ...


:param spark: The spark session you are using
:param dataset_url: The dataset url to output your dataset to (e.g. ``hdfs:///path/to/dataset``)
:param schema: The :class:`petastorm.unischema.Unischema` definition of your dataset
@@ -97,18 +88,18 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
yield
# After job completes, add the unischema metadata and check for the metadata summary file
if filesystem_factory is None:
resolver = FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
user=spark.sparkContext.sparkUser())
filesystem_factory = resolver.filesystem_factory()
dataset_path = resolver.get_dataset_path()
filesystem, dataset_path = fs.FileSystem.from_uri(dataset_url)

def filesystem_factory():
return fs.FileSystem.from_uri(dataset_url)[0]
else:
dataset_path = get_dataset_path(urlparse(dataset_url))
filesystem = filesystem_factory()
filesystem = filesystem_factory()

dataset = pq.ParquetDataset(
dataset_path,
filesystem=filesystem,
validate_schema=False)
validate_schema=False, use_legacy_dataset=True)

_generate_unischema_metadata(dataset, schema)
if not use_summary_metadata:
@@ -118,7 +109,7 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
dataset = pq.ParquetDataset(
dataset_path,
filesystem=filesystem,
validate_schema=False)
validate_schema=False, use_legacy_dataset=True)
try:
# Try to load the row groups, if it fails that means the metadata was not generated properly
load_row_groups(dataset)
@@ -229,7 +220,7 @@ def _generate_num_row_groups_per_file(dataset, spark_context, filesystem_factory
def get_row_group_info(path):
fs = filesystem_factory()
relative_path = os.path.relpath(path, base_path)
pq_file = fs.open(path)
pq_file = fs.open_input_file(path)
num_row_groups = pq.read_metadata(pq_file).num_row_groups
pq_file.close()
return relative_path, num_row_groups
@@ -286,8 +277,8 @@ def load_row_groups(dataset):
# This is not a real "piece" and we won't have row_groups_per_file recorded for it.
if row_groups_key != ".":
for row_group in range(row_groups_per_file[row_groups_key]):
rowgroups.append(compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group,
partition_keys=piece.partition_keys))
rowgroups.append(pq.ParquetDatasetPiece(piece.path, open_file_func=dataset.fs.open, row_group=row_group,
partition_keys=piece.partition_keys))
return rowgroups


@@ -323,18 +314,18 @@ def _split_row_groups(dataset):
continue

for row_group in range(row_groups_per_file[relative_path]):
split_piece = compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group,
partition_keys=piece.partition_keys)
split_piece = pq.ParquetDatasetPiece(piece.path, open_file_func=dataset.fs.open, row_group=row_group,
partition_keys=piece.partition_keys)
split_pieces.append(split_piece)

return split_pieces


def _split_piece(piece, fs_open):
metadata = compat_get_metadata(piece, fs_open)
return [compat_make_parquet_piece(piece.path, fs_open,
row_group=row_group,
partition_keys=piece.partition_keys)
metadata = piece.get_metadata()
return [pq.ParquetDatasetPiece(piece.path, open_file_func=fs_open,
row_group=row_group,
partition_keys=piece.partition_keys)
for row_group in range(metadata.num_row_groups)]


@@ -396,7 +387,8 @@ def get_schema_from_dataset_url(dataset_url_or_urls, hdfs_driver='libhdfs3'):
"""
fs, path_or_paths = get_filesystem_and_path_or_paths(dataset_url_or_urls, hdfs_driver)

dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10)
dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10,
use_legacy_dataset=True)

# Get a unischema stored in the dataset metadata.
stored_schema = get_schema(dataset)
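For reference, a minimal sketch (not part of this PR) of the `pyarrow.fs` resolution pattern that the reworked `dataset_metadata.py` relies on; the URI and file name below are illustrative:

```python
from pyarrow import fs
from pyarrow import parquet as pq

# from_uri() returns a (FileSystem, path) pair; the returned path has the
# scheme stripped, which is why the code above unpacks both values.
filesystem, dataset_path = fs.FileSystem.from_uri("file:///tmp/my_dataset")

# The new API opens files with open_input_file() rather than the legacy
# fs.open(), matching the change in _generate_num_row_groups_per_file.
with filesystem.open_input_file(dataset_path + "/part-00000.parquet") as f:
    print(pq.read_metadata(f).num_row_groups)
```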
8 changes: 4 additions & 4 deletions petastorm/etl/metadata_util.py
@@ -16,10 +16,11 @@
from __future__ import print_function

import argparse

from pyarrow import fs
from pyarrow import parquet as pq

from petastorm.etl import dataset_metadata, rowgroup_indexing
from petastorm.fs_utils import FilesystemResolver

if __name__ == "__main__":

@@ -46,9 +47,8 @@
args.dataset_url = args.dataset_url[:-1]

# Create pyarrow file system
resolver = FilesystemResolver(args.dataset_url, hdfs_driver=args.hdfs_driver)
dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
validate_schema=False)
fs, path = fs.FileSystem.from_uri(args.dataset_url)
dataset = pq.ParquetDataset(path, filesystem=fs, validate_schema=False, use_legacy_dataset=True)

print_all = not args.schema and not args.index
if args.schema or print_all:
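A small sketch (assumption, not part of the diff) of why the utility keeps `use_legacy_dataset=True`: keywords such as `validate_schema` and `metadata_nthreads` appear to exist only on the legacy `ParquetDataset` code path. The path below is illustrative:

```python
from pyarrow import parquet as pq

# Opening with the legacy implementation keeps validate_schema available and
# exposes legacy attributes such as dataset.pieces and dataset.partitions.
dataset = pq.ParquetDataset("/tmp/my_dataset",            # hypothetical local path
                            validate_schema=False,
                            use_legacy_dataset=True)
print(len(dataset.pieces), "file-level pieces")
```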
18 changes: 10 additions & 8 deletions petastorm/etl/petastorm_generate_metadata.py
@@ -18,12 +18,12 @@
import sys
from pydoc import locate

from pyarrow import fs
from pyarrow import parquet as pq
from pyspark.sql import SparkSession

from petastorm.etl.dataset_metadata import materialize_dataset, get_schema, ROW_GROUPS_PER_FILE_KEY
from petastorm.etl.rowgroup_indexing import ROWGROUPS_INDEX_KEY
from petastorm.fs_utils import FilesystemResolver
from petastorm.unischema import Unischema
from petastorm.utils import add_to_dataset_metadata

@@ -60,13 +60,12 @@ def generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_su
"""
sc = spark.sparkContext

resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration(), hdfs_driver=hdfs_driver,
user=spark.sparkContext.sparkUser())
fs = resolver.filesystem()
filesystem, path = fs.FileSystem.from_uri(dataset_url)
dataset = pq.ParquetDataset(
resolver.get_dataset_path(),
filesystem=fs,
validate_schema=False)
path,
filesystem=filesystem,
validate_schema=False,
use_legacy_dataset=True)

if unischema_class:
schema = locate(unischema_class)
@@ -85,8 +84,11 @@ def generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_su
# overwriting the metadata to keep row group indexes and the old row group per file index
arrow_metadata = dataset.common_metadata or None

def filesystem_factory():
return fs.FileSystem.from_uri(dataset_url)[0]

with materialize_dataset(spark, dataset_url, schema, use_summary_metadata=use_summary_metadata,
filesystem_factory=resolver.filesystem_factory()):
filesystem_factory=filesystem_factory):
if use_summary_metadata:
# Inside the materialize dataset context we just need to write the metadata file as the schema will
# be written by the context manager.
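A standalone sketch of the zero-argument factory pattern defined above (assumption: the closure exists so that each Spark executor re-resolves its own filesystem from the URI rather than receiving a shared filesystem instance); the URL is illustrative:

```python
from pyarrow import fs

def make_filesystem_factory(dataset_url):
    """Return a zero-argument callable that re-resolves the URI on each call."""
    def filesystem_factory():
        return fs.FileSystem.from_uri(dataset_url)[0]
    return filesystem_factory

factory = make_filesystem_factory("file:///tmp/my_dataset")
filesystem = factory()                       # a fresh FileSystem per call
print(type(filesystem).__name__)             # e.g. LocalFileSystem
```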
21 changes: 8 additions & 13 deletions petastorm/etl/rowgroup_indexing.py
@@ -16,15 +16,14 @@
import time
from collections import namedtuple

from pyarrow import fs
from pyarrow import parquet as pq
from six.moves import cPickle as pickle
from six.moves import range

from petastorm import utils
from petastorm.compat import compat_piece_read, compat_make_parquet_piece
from petastorm.etl import dataset_metadata
from petastorm.etl.legacy import depickle_legacy_package_name_compatible
from petastorm.fs_utils import FilesystemResolver

logger = logging.getLogger(__name__)

@@ -50,10 +49,8 @@ def build_rowgroup_index(dataset_url, spark_context, indexers, hdfs_driver='libh
dataset_url = dataset_url[:-1]

# Create pyarrow file system
resolver = FilesystemResolver(dataset_url, spark_context._jsc.hadoopConfiguration(),
hdfs_driver=hdfs_driver, user=spark_context.sparkUser())
dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
validate_schema=False)
filesystem, path = fs.FileSystem.from_uri(dataset_url)
dataset = pq.ParquetDataset(path, filesystem=filesystem, validate_schema=False, use_legacy_dataset=True)

split_pieces = dataset_metadata.load_row_groups(dataset)
schema = dataset_metadata.get_schema(dataset)
@@ -93,22 +90,20 @@ def _index_columns(piece_info, dataset_url, partitions, indexers, schema, hdfs_d
libhdfs (java through JNI) or libhdfs3 (C++)
:return: list of indexers containing index data
"""
# Resolver in executor context will get hadoop config from environment
resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
fs = resolver.filesystem()
filesystem, _ = fs.FileSystem.from_uri(dataset_url)

# Create pyarrow piece
piece = compat_make_parquet_piece(piece_info.path, fs.open, row_group=piece_info.row_group,
partition_keys=piece_info.partition_keys)
piece = pq.ParquetDatasetPiece(piece_info.path, open_file_func=filesystem.open_input_file,
row_group=piece_info.row_group,
partition_keys=piece_info.partition_keys)

# Collect column names needed for indexing
column_names = set()
for indexer in indexers:
column_names.update(indexer.column_names)

# Read columns needed for indexing
column_rows = compat_piece_read(piece, fs.open, columns=list(column_names),
partitions=partitions).to_pandas().to_dict('records')
column_rows = piece.read(columns=list(column_names), partitions=partitions).to_pandas().to_dict('records')

# Decode columns values
decoded_rows = [utils.decode_row(row, schema) for row in column_rows]
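A sketch (not from the PR) of reading a single row group through the legacy `ParquetDatasetPiece` API while opening files with the new filesystem's `open_input_file`, mirroring the change above; the path and column name are illustrative:

```python
from pyarrow import fs
from pyarrow import parquet as pq

filesystem, path = fs.FileSystem.from_uri("file:///tmp/my_dataset/part-00000.parquet")
piece = pq.ParquetDatasetPiece(path, open_file_func=filesystem.open_input_file,
                               row_group=0)
table = piece.read(columns=["id"])           # pyarrow.Table for that row group only
print(table.num_rows)
```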