Deprecate custom FileSystemResolver and use pyarrow.fs from_uri implementation.
Yevgeni Litvin committed Nov 7, 2020
1 parent 675fed3 commit 6d9892a
Showing 17 changed files with 76 additions and 259 deletions.
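
The same migration pattern recurs throughout the files below: petastorm's custom FilesystemResolver is replaced by pyarrow's own URI resolution. A minimal before/after sketch of that pattern, assuming a hypothetical HDFS dataset URL (not taken verbatim from any one file):

    from pyarrow import fs

    dataset_url = 'hdfs://namenode:8020/path/to/dataset'  # hypothetical URL

    # Before: petastorm's custom resolver (deprecated by this commit)
    #   resolver = FilesystemResolver(dataset_url, hdfs_driver='libhdfs')
    #   filesystem = resolver.filesystem()
    #   dataset_path = resolver.get_dataset_path()

    # After: pyarrow resolves both the filesystem and the bare path from the URI
    filesystem, dataset_path = fs.FileSystem.from_uri(dataset_url)
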
2 changes: 1 addition & 1 deletion petastorm/arrow_reader_worker.py
@@ -130,7 +130,7 @@ def process(self, piece_index, worker_predicate, shuffle_row_drop_partition):
self._dataset = pq.ParquetDataset(
self._dataset_path_or_paths,
filesystem=self._filesystem,
validate_schema=False, filters=self._arrow_filters)
validate_schema=False, filters=self._arrow_filters, use_legacy_dataset=True)

if self._dataset.partitions is None:
# When read from parquet file list, the `dataset.partitions` will be None.
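
The newly added use_legacy_dataset=True keyword, which this commit also threads through the other pq.ParquetDataset call sites below, pins pyarrow's legacy ParquetDataset implementation, the one that still exposes the attributes this worker depends on (such as dataset.partitions and per-row-group pieces). A minimal sketch, with a hypothetical local path:

    from pyarrow import parquet as pq

    # Pin the legacy implementation so legacy-only attributes stay available.
    dataset = pq.ParquetDataset('/tmp/example_dataset',  # hypothetical path
                                validate_schema=False,
                                use_legacy_dataset=True)
    partitions = dataset.partitions  # exposed only by the legacy API
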
31 changes: 12 additions & 19 deletions petastorm/etl/dataset_metadata.py
@@ -21,12 +21,13 @@
from urllib.parse import urlparse

from packaging import version
from pyarrow import fs
from pyarrow import parquet as pq
from six.moves import cPickle as pickle

from petastorm import utils
from petastorm.etl.legacy import depickle_legacy_package_name_compatible
from petastorm.fs_utils import FilesystemResolver, get_filesystem_and_path_or_paths, get_dataset_path
from petastorm.fs_utils import get_filesystem_and_path_or_paths, get_dataset_path
from petastorm.unischema import Unischema

logger = logging.getLogger(__name__)
@@ -73,15 +74,6 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
A user may provide their own recipe for creation of pyarrow filesystem object in ``filesystem_factory``
argument (otherwise, petastorm will create a default one based on the url).
The following example shows how a custom pyarrow HDFS filesystem, instantiated using ``libhdfs`` driver can be used
during Petastorm dataset generation:
>>> resolver=FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
>>> hdfs_driver='libhdfs')
>>> with materialize_dataset(..., filesystem_factory=resolver.filesystem_factory()):
>>> ...
:param spark: The spark session you are using
:param dataset_url: The dataset url to output your dataset to (e.g. ``hdfs:///path/to/dataset``)
:param schema: The :class:`petastorm.unischema.Unischema` definition of your dataset
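
With the resolver-based recipe dropped from this docstring, a caller who still needs a custom filesystem can build the factory directly on pyarrow.fs. A hedged sketch in the same doctest style (the S3 region, bucket and schema name are placeholders, not part of this change):

    >>> from pyarrow import fs
    >>> def filesystem_factory():
    >>>     return fs.S3FileSystem(region='us-west-2')  # hypothetical region
    >>> with materialize_dataset(spark, 's3://my-bucket/my-dataset', MySchema,
    >>>                          filesystem_factory=filesystem_factory):
    >>>     ...
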
@@ -96,18 +88,18 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
yield
# After job completes, add the unischema metadata and check for the metadata summary file
if filesystem_factory is None:
resolver = FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
user=spark.sparkContext.sparkUser())
filesystem_factory = resolver.filesystem_factory()
dataset_path = resolver.get_dataset_path()
filesystem, dataset_path = fs.FileSystem.from_uri(dataset_url)

def filesystem_factory():
return fs.FileSystem.from_uri(dataset_url)[0]
else:
dataset_path = get_dataset_path(urlparse(dataset_url))
filesystem = filesystem_factory()
filesystem = filesystem_factory()

dataset = pq.ParquetDataset(
dataset_path,
filesystem=filesystem,
validate_schema=False)
validate_schema=False, use_legacy_dataset=True)

_generate_unischema_metadata(dataset, schema)
if not use_summary_metadata:
@@ -117,7 +109,7 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
dataset = pq.ParquetDataset(
dataset_path,
filesystem=filesystem,
validate_schema=False)
validate_schema=False, use_legacy_dataset=True)
try:
# Try to load the row groups, if it fails that means the metadata was not generated properly
load_row_groups(dataset)
@@ -228,7 +220,7 @@ def _generate_num_row_groups_per_file(dataset, spark_context, filesystem_factory
def get_row_group_info(path):
fs = filesystem_factory()
relative_path = os.path.relpath(path, base_path)
pq_file = fs.open(path)
pq_file = fs.open_input_file(path)
num_row_groups = pq.read_metadata(pq_file).num_row_groups
pq_file.close()
return relative_path, num_row_groups
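
The fs.open(path) call becomes fs.open_input_file(path) because the factory now returns a pyarrow.fs filesystem, whose reading entry points are open_input_file/open_input_stream rather than the legacy open(). A minimal sketch with a hypothetical local file:

    from pyarrow import fs, parquet as pq

    filesystem = fs.LocalFileSystem()
    with filesystem.open_input_file('/tmp/part-00000.parquet') as f:  # hypothetical path
        num_row_groups = pq.read_metadata(f).num_row_groups
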
@@ -395,7 +387,8 @@ def get_schema_from_dataset_url(dataset_url_or_urls, hdfs_driver='libhdfs3'):
"""
fs, path_or_paths = get_filesystem_and_path_or_paths(dataset_url_or_urls, hdfs_driver)

dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10)
dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10,
use_legacy_dataset=True)

# Get a unischema stored in the dataset metadata.
stored_schema = get_schema(dataset)
8 changes: 4 additions & 4 deletions petastorm/etl/metadata_util.py
@@ -16,10 +16,11 @@
from __future__ import print_function

import argparse

from pyarrow import fs
from pyarrow import parquet as pq

from petastorm.etl import dataset_metadata, rowgroup_indexing
from petastorm.fs_utils import FilesystemResolver

if __name__ == "__main__":

@@ -46,9 +47,8 @@
args.dataset_url = args.dataset_url[:-1]

# Create pyarrow file system
resolver = FilesystemResolver(args.dataset_url, hdfs_driver=args.hdfs_driver)
dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
validate_schema=False)
fs, path = fs.FileSystem.from_uri(args.dataset_url)
dataset = pq.ParquetDataset(path, filesystem=fs, validate_schema=False, use_legacy_dataset=True)

print_all = not args.schema and not args.index
if args.schema or print_all:
18 changes: 10 additions & 8 deletions petastorm/etl/petastorm_generate_metadata.py
@@ -18,12 +18,12 @@
import sys
from pydoc import locate

from pyarrow import fs
from pyarrow import parquet as pq
from pyspark.sql import SparkSession

from petastorm.etl.dataset_metadata import materialize_dataset, get_schema, ROW_GROUPS_PER_FILE_KEY
from petastorm.etl.rowgroup_indexing import ROWGROUPS_INDEX_KEY
from petastorm.fs_utils import FilesystemResolver
from petastorm.unischema import Unischema
from petastorm.utils import add_to_dataset_metadata

@@ -60,13 +60,12 @@ def generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_su
"""
sc = spark.sparkContext

resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration(), hdfs_driver=hdfs_driver,
user=spark.sparkContext.sparkUser())
fs = resolver.filesystem()
filesystem, path = fs.FileSystem.from_uri(dataset_url)
dataset = pq.ParquetDataset(
resolver.get_dataset_path(),
filesystem=fs,
validate_schema=False)
path,
filesystem=filesystem,
validate_schema=False,
use_legacy_dataset=True)

if unischema_class:
schema = locate(unischema_class)
@@ -85,8 +84,11 @@ def generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_su
# overwriting the metadata to keep row group indexes and the old row group per file index
arrow_metadata = dataset.common_metadata or None

def filesystem_factory():
return fs.FileSystem.from_uri(dataset_url)[0]

with materialize_dataset(spark, dataset_url, schema, use_summary_metadata=use_summary_metadata,
filesystem_factory=resolver.filesystem_factory()):
filesystem_factory=filesystem_factory):
if use_summary_metadata:
# Inside the materialize dataset context we just need to write the metadata file as the schema will
# be written by the context manager.
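
The dataset_url-capturing closure replaces resolver.filesystem_factory() because the factory gets shipped to Spark executors: only the URL string is pickled, and each worker rebuilds its own FileSystem locally (the test_serialize_filesystem_factory test further down guards this by materializing with a filesystem that refuses to pickle). A sketch of the idea, assuming a generic dataset URL:

    from pyarrow import fs

    dataset_url = 'hdfs:///path/to/dataset'  # hypothetical URL

    def filesystem_factory():
        # The closure captures only a plain string, so it pickles cheaply;
        # the FileSystem itself is re-created on every executor that calls it.
        return fs.FileSystem.from_uri(dataset_url)[0]
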
15 changes: 6 additions & 9 deletions petastorm/etl/rowgroup_indexing.py
@@ -16,14 +16,14 @@
import time
from collections import namedtuple

from pyarrow import fs
from pyarrow import parquet as pq
from six.moves import cPickle as pickle
from six.moves import range

from petastorm import utils
from petastorm.etl import dataset_metadata
from petastorm.etl.legacy import depickle_legacy_package_name_compatible
from petastorm.fs_utils import FilesystemResolver

logger = logging.getLogger(__name__)

@@ -49,10 +49,8 @@ def build_rowgroup_index(dataset_url, spark_context, indexers, hdfs_driver='libh
dataset_url = dataset_url[:-1]

# Create pyarrow file system
resolver = FilesystemResolver(dataset_url, spark_context._jsc.hadoopConfiguration(),
hdfs_driver=hdfs_driver, user=spark_context.sparkUser())
dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
validate_schema=False)
filesystem, path = fs.FileSystem.from_uri(dataset_url)
dataset = pq.ParquetDataset(path, filesystem=filesystem, validate_schema=False, use_legacy_dataset=True)

split_pieces = dataset_metadata.load_row_groups(dataset)
schema = dataset_metadata.get_schema(dataset)
@@ -92,12 +90,11 @@ def _index_columns(piece_info, dataset_url, partitions, indexers, schema, hdfs_d
libhdfs (java through JNI) or libhdfs3 (C++)
:return: list of indexers containing index data
"""
# Resolver in executor context will get hadoop config from environment
resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
fs = resolver.filesystem()
filesystem, _ = fs.FileSystem.from_uri(dataset_url)

# Create pyarrow piece
piece = pq.ParquetDatasetPiece(piece_info.path, open_file_func=fs.open, row_group=piece_info.row_group,
piece = pq.ParquetDatasetPiece(piece_info.path, open_file_func=filesystem.open_input_file,
row_group=piece_info.row_group,
partition_keys=piece_info.partition_keys)

# Collect column names needed for indexing
2 changes: 1 addition & 1 deletion petastorm/fs_utils.py
@@ -219,7 +219,7 @@ def get_filesystem_and_path_or_paths(url_or_urls, hdfs_driver='libhdfs3', s3_con
if parsed_url.scheme != first_scheme or parsed_url.netloc != first_netloc:
raise ValueError('The dataset url list must contain url with the same scheme and netloc.')

fs = FilesystemResolver(url_list[0], hdfs_driver=hdfs_driver, s3_config_kwargs=s3_config_kwargs).filesystem()
fs, _ = pyarrow.fs.FileSystem.from_uri(url_list[0])
path_list = [get_dataset_path(parsed_url) for parsed_url in parsed_url_list]

if isinstance(url_or_urls, list):
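
After this one-line change the helper's contract is unchanged apart from the filesystem type: it returns a pyarrow.fs filesystem plus the bare path(s), mirroring the shape of its input. An illustrative call with hypothetical local URLs:

    from petastorm.fs_utils import get_filesystem_and_path_or_paths

    filesystem, paths = get_filesystem_and_path_or_paths(
        ['file:///tmp/ds/part-00000.parquet', 'file:///tmp/ds/part-00001.parquet'])
    # filesystem is a pyarrow.fs.LocalFileSystem; paths is a list of plain paths
    # (a single URL in would yield a single path out).
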
2 changes: 1 addition & 1 deletion petastorm/py_dict_reader_worker.py
@@ -135,7 +135,7 @@ def process(self, piece_index, worker_predicate, shuffle_row_drop_partition):
self._dataset = pq.ParquetDataset(
self._dataset_path,
filesystem=self._filesystem,
validate_schema=False, filters=self._arrow_filters)
validate_schema=False, filters=self._arrow_filters, use_legacy_dataset=True)

piece = self._split_pieces[piece_index]

2 changes: 1 addition & 1 deletion petastorm/reader.py
@@ -386,7 +386,7 @@ def __init__(self, pyarrow_filesystem, dataset_path, schema_fields=None,
# 1. Resolve dataset path (hdfs://, file://) and open the parquet storage (dataset)
self.dataset = pq.ParquetDataset(dataset_path, filesystem=pyarrow_filesystem,
validate_schema=False, metadata_nthreads=10,
filters=filters)
filters=filters, use_legacy_dataset=True)

if self.dataset.partitions is None:
# When read from parquet file list, the `dataset.partitions` will be None.
20 changes: 9 additions & 11 deletions petastorm/spark/spark_dataset_converter.py
@@ -25,14 +25,13 @@
from typing import List, Any

import pyspark
from pyarrow import LocalFileSystem
from pyarrow import fs
from pyspark.sql.session import SparkSession
from pyspark.sql.types import ArrayType, DoubleType, FloatType
from six.moves.urllib.parse import urlparse

from petastorm import make_batch_reader
from petastorm.fs_utils import (FilesystemResolver,
get_filesystem_and_path_or_paths, normalize_dir_url)
from petastorm.fs_utils import get_filesystem_and_path_or_paths, normalize_dir_url

if LooseVersion(pyspark.__version__) < LooseVersion('3.0'):
def vector_to_array(_1, _2='float32'):
@@ -79,10 +78,9 @@ def _get_parent_cache_dir_url():


def _default_delete_dir_handler(dataset_url):
resolver = FilesystemResolver(dataset_url)
fs = resolver.filesystem()
filesystem, path = fs.FileSystem.from_uri(dataset_url)
parsed = urlparse(dataset_url)
if isinstance(fs, LocalFileSystem):
if isinstance(filesystem, fs.LocalFileSystem):
# pyarrow has a bug: LocalFileSystem.delete() is not implemented.
# https://issues.apache.org/jira/browse/ARROW-7953
# We can remove this branch once ARROW-7953 is fixed.
@@ -455,9 +453,9 @@ def _check_url(dir_url):
def _check_parent_cache_dir_url(dir_url):
"""Check dir url whether is suitable to be used as parent cache directory."""
_check_url(dir_url)
fs, dir_path = get_filesystem_and_path_or_paths(dir_url)
filesystem, dir_path = get_filesystem_and_path_or_paths(dir_url)
if 'DATABRICKS_RUNTIME_VERSION' in os.environ and not _is_spark_local_mode():
if isinstance(fs, LocalFileSystem):
if isinstance(filesystem, fs.LocalFileSystem):
# User need to use dbfs fuse URL.
if not dir_path.startswith('/dbfs/'):
logger.warning(
@@ -597,13 +595,13 @@ def _wait_file_available(url_list):
all files are available for reading. This is useful in some filesystems, such as S3 which only
providing eventually consistency.
"""
fs, path_list = get_filesystem_and_path_or_paths(url_list)
filesystem, path_list = get_filesystem_and_path_or_paths(url_list)
logger.debug('Waiting some seconds until all parquet-store files appear at urls %s', ','.join(url_list))

def wait_for_file(path):
end_time = time.time() + _FILE_AVAILABILITY_WAIT_TIMEOUT_SECS
while time.time() < end_time:
if fs.exists(path):
if filesystem.get_file_info(path).type == fs.FileType.File:
return True
time.sleep(0.1)
return False
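
pyarrow.fs filesystems have no exists() method, so the availability probe switches to get_file_info(): its .type is fs.FileType.NotFound for a missing path, and the loop above only succeeds once the path resolves to a regular file. A minimal sketch with a hypothetical path:

    from pyarrow import fs

    filesystem = fs.LocalFileSystem()
    info = filesystem.get_file_info('/tmp/ds/part-00000.parquet')  # hypothetical path
    file_is_ready = (info.type == fs.FileType.File)  # NotFound/Directory otherwise
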
@@ -626,7 +624,7 @@ def _check_dataset_file_median_size(url_list):
RECOMMENDED_FILE_SIZE_BYTES = 50 * 1024 * 1024

# TODO: also check file size for other file system.
if isinstance(fs, LocalFileSystem):
if isinstance(fs, fs.LocalFileSystem):
pool = ThreadPool(64)
try:
file_size_list = pool.map(os.path.getsize, path_list)
10 changes: 2 additions & 8 deletions petastorm/spark_utils.py
@@ -13,11 +13,10 @@
# limitations under the License.

"""A set of Spark specific helper functions for the petastorm dataset"""
from six.moves.urllib.parse import urlparse

from petastorm import utils
from petastorm.etl.dataset_metadata import get_schema_from_dataset_url
from petastorm.fs_utils import FilesystemResolver
from petastorm.fs_utils import get_dataset_path


def dataset_as_rdd(dataset_url, spark_session, schema_fields=None, hdfs_driver='libhdfs3'):
Expand All @@ -33,12 +32,7 @@ def dataset_as_rdd(dataset_url, spark_session, schema_fields=None, hdfs_driver='
"""
schema = get_schema_from_dataset_url(dataset_url, hdfs_driver=hdfs_driver)

dataset_url_parsed = urlparse(dataset_url)

resolver = FilesystemResolver(dataset_url_parsed, spark_session.sparkContext._jsc.hadoopConfiguration(),
hdfs_driver=hdfs_driver)

dataset_df = spark_session.read.parquet(resolver.get_dataset_path())
dataset_df = spark_session.read.parquet(get_dataset_path(dataset_url))
if schema_fields is not None:
# If wanting a subset of fields, create the schema view and run a select on those fields
schema = schema.create_schema_view(schema_fields)
11 changes: 4 additions & 7 deletions petastorm/tests/test_dataset_metadata.py
@@ -14,7 +14,7 @@

import numpy as np
import pytest
import pyarrow
from pyarrow import fs
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

@@ -33,17 +33,14 @@ def test_get_schema_from_dataset_url_bogus_url():
with pytest.raises(IOError):
get_schema_from_dataset_url('file:///non-existing-path')

with pytest.raises(ValueError):
get_schema_from_dataset_url('/invalid_url')


def test_serialize_filesystem_factory(tmpdir):
SimpleSchema = Unischema('SimpleSchema', [
UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),
UnischemaField('foo', np.int32, (), ScalarCodec(IntegerType()), False),
])

class BogusFS(pyarrow.LocalFileSystem):
class BogusFS(fs.LocalFileSystem):
def __getstate__(self):
raise RuntimeError("can not serialize")

Expand All @@ -53,8 +50,8 @@ def __getstate__(self):
spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
sc = spark.sparkContext
with materialize_dataset(spark, output_url, SimpleSchema, rowgroup_size_mb, filesystem_factory=BogusFS):
rows_rdd = sc.parallelize(range(rows_count))\
.map(lambda x: {'id': x, 'foo': x})\
rows_rdd = sc.parallelize(range(rows_count)) \
.map(lambda x: {'id': x, 'foo': x}) \
.map(lambda x: dict_to_spark_row(SimpleSchema, x))

spark.createDataFrame(rows_rdd, SimpleSchema.as_spark_schema()) \
(Diffs for the 6 remaining changed files are not shown.)
