diff --git a/petastorm/arrow_reader_worker.py b/petastorm/arrow_reader_worker.py
index bdac668e7..d333b4376 100644
--- a/petastorm/arrow_reader_worker.py
+++ b/petastorm/arrow_reader_worker.py
@@ -130,7 +130,7 @@ def process(self, piece_index, worker_predicate, shuffle_row_drop_partition):
             self._dataset = pq.ParquetDataset(
                 self._dataset_path_or_paths,
                 filesystem=self._filesystem,
-                validate_schema=False, filters=self._arrow_filters)
+                validate_schema=False, filters=self._arrow_filters, use_legacy_dataset=True)
 
         if self._dataset.partitions is None:
             # When read from parquet file list, the `dataset.partitions` will be None.
diff --git a/petastorm/etl/dataset_metadata.py b/petastorm/etl/dataset_metadata.py
index 48c6f557c..94b590ea2 100644
--- a/petastorm/etl/dataset_metadata.py
+++ b/petastorm/etl/dataset_metadata.py
@@ -21,12 +21,13 @@
 from urllib.parse import urlparse
 
 from packaging import version
+from pyarrow import fs
 from pyarrow import parquet as pq
 from six.moves import cPickle as pickle
 
 from petastorm import utils
 from petastorm.etl.legacy import depickle_legacy_package_name_compatible
-from petastorm.fs_utils import FilesystemResolver, get_filesystem_and_path_or_paths, get_dataset_path
+from petastorm.fs_utils import get_filesystem_and_path_or_paths, get_dataset_path
 from petastorm.unischema import Unischema
 
 logger = logging.getLogger(__name__)
@@ -73,15 +74,6 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
     A user may provide their own recipe for creation of pyarrow filesystem object in ``filesystem_factory``
     argument (otherwise, petastorm will create a default one based on the url).
 
-    The following example shows how a custom pyarrow HDFS filesystem, instantiated using ``libhdfs`` driver can be used
-    during Petastorm dataset generation:
-
-    >>> resolver=FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
-    >>>                             hdfs_driver='libhdfs')
-    >>> with materialize_dataset(..., filesystem_factory=resolver.filesystem_factory()):
-    >>>     ...
-
-
     :param spark: The spark session you are using
     :param dataset_url: The dataset url to output your dataset to (e.g. ``hdfs:///path/to/dataset``)
     :param schema: The :class:`petastorm.unischema.Unischema` definition of your dataset
@@ -96,18 +88,18 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
     yield
     # After job completes, add the unischema metadata and check for the metadata summary file
     if filesystem_factory is None:
-        resolver = FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
-                                      user=spark.sparkContext.sparkUser())
-        filesystem_factory = resolver.filesystem_factory()
-        dataset_path = resolver.get_dataset_path()
+        filesystem, dataset_path = fs.FileSystem.from_uri(dataset_url)
+
+        def filesystem_factory():
+            return fs.FileSystem.from_uri(dataset_url)[0]
     else:
         dataset_path = get_dataset_path(urlparse(dataset_url))
-    filesystem = filesystem_factory()
+        filesystem = filesystem_factory()
 
     dataset = pq.ParquetDataset(
         dataset_path,
         filesystem=filesystem,
-        validate_schema=False)
+        validate_schema=False, use_legacy_dataset=True)
 
     _generate_unischema_metadata(dataset, schema)
     if not use_summary_metadata:
@@ -117,7 +109,7 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
         dataset = pq.ParquetDataset(
             dataset_path,
             filesystem=filesystem,
-            validate_schema=False)
+            validate_schema=False, use_legacy_dataset=True)
         try:
             # Try to load the row groups, if it fails that means the metadata was not generated properly
             load_row_groups(dataset)
@@ -228,7 +220,7 @@ def _generate_num_row_groups_per_file(dataset, spark_context, filesystem_factory
     def get_row_group_info(path):
         fs = filesystem_factory()
         relative_path = os.path.relpath(path, base_path)
-        pq_file = fs.open(path)
+        pq_file = fs.open_input_file(path)
         num_row_groups = pq.read_metadata(pq_file).num_row_groups
         pq_file.close()
         return relative_path, num_row_groups
@@ -395,7 +387,8 @@ def get_schema_from_dataset_url(dataset_url_or_urls, hdfs_driver='libhdfs3'):
     """
     fs, path_or_paths = get_filesystem_and_path_or_paths(dataset_url_or_urls, hdfs_driver)
 
-    dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10)
+    dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10,
+                                use_legacy_dataset=True)
 
     # Get a unischema stored in the dataset metadata.
     stored_schema = get_schema(dataset)
diff --git a/petastorm/etl/metadata_util.py b/petastorm/etl/metadata_util.py
index 8530481a8..c9e252305 100644
--- a/petastorm/etl/metadata_util.py
+++ b/petastorm/etl/metadata_util.py
@@ -16,10 +16,11 @@
 from __future__ import print_function
 
 import argparse
+
+from pyarrow import fs
 from pyarrow import parquet as pq
 
 from petastorm.etl import dataset_metadata, rowgroup_indexing
-from petastorm.fs_utils import FilesystemResolver
 
 
 if __name__ == "__main__":
@@ -46,9 +47,8 @@
         args.dataset_url = args.dataset_url[:-1]
 
     # Create pyarrow file system
-    resolver = FilesystemResolver(args.dataset_url, hdfs_driver=args.hdfs_driver)
-    dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
-                                validate_schema=False)
+    filesystem, path = fs.FileSystem.from_uri(args.dataset_url)
+    dataset = pq.ParquetDataset(path, filesystem=filesystem, validate_schema=False, use_legacy_dataset=True)
 
     print_all = not args.schema and not args.index
     if args.schema or print_all:
diff --git a/petastorm/etl/petastorm_generate_metadata.py b/petastorm/etl/petastorm_generate_metadata.py
index b33768dda..74b8c715b 100644
--- a/petastorm/etl/petastorm_generate_metadata.py
+++ b/petastorm/etl/petastorm_generate_metadata.py
@@ -18,12 +18,12 @@ import sys
 
 from pydoc import locate
 
+from pyarrow import fs
 from pyarrow import parquet as pq
 from pyspark.sql import SparkSession
 
 from petastorm.etl.dataset_metadata import materialize_dataset, get_schema, ROW_GROUPS_PER_FILE_KEY
 from petastorm.etl.rowgroup_indexing import ROWGROUPS_INDEX_KEY
-from petastorm.fs_utils import FilesystemResolver
 from petastorm.unischema import Unischema
 from petastorm.utils import add_to_dataset_metadata
 
@@ -60,13 +60,12 @@ def generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_su
     """
     sc = spark.sparkContext
 
-    resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration(), hdfs_driver=hdfs_driver,
-                                  user=spark.sparkContext.sparkUser())
-    fs = resolver.filesystem()
+    filesystem, path = fs.FileSystem.from_uri(dataset_url)
     dataset = pq.ParquetDataset(
-        resolver.get_dataset_path(),
-        filesystem=fs,
-        validate_schema=False)
+        path,
+        filesystem=filesystem,
+        validate_schema=False,
+        use_legacy_dataset=True)
 
     if unischema_class:
         schema = locate(unischema_class)
@@ -85,8 +84,11 @@ def generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_su
         # overwriting the metadata to keep row group indexes and the old row group per file index
         arrow_metadata = dataset.common_metadata or None
 
+    def filesystem_factory():
+        return fs.FileSystem.from_uri(dataset_url)[0]
+
     with materialize_dataset(spark, dataset_url, schema, use_summary_metadata=use_summary_metadata,
-                             filesystem_factory=resolver.filesystem_factory()):
+                             filesystem_factory=filesystem_factory):
         if use_summary_metadata:
             # Inside the materialize dataset context we just need to write the metadata file as the schema will
             # be written by the context manager.
diff --git a/petastorm/etl/rowgroup_indexing.py b/petastorm/etl/rowgroup_indexing.py
index f0f8b067f..c48f29e11 100644
--- a/petastorm/etl/rowgroup_indexing.py
+++ b/petastorm/etl/rowgroup_indexing.py
@@ -16,6 +16,7 @@
 import time
 from collections import namedtuple
 
+from pyarrow import fs
 from pyarrow import parquet as pq
 from six.moves import cPickle as pickle
 from six.moves import range
@@ -23,7 +24,6 @@
 from petastorm import utils
 from petastorm.etl import dataset_metadata
 from petastorm.etl.legacy import depickle_legacy_package_name_compatible
-from petastorm.fs_utils import FilesystemResolver
 
 logger = logging.getLogger(__name__)
 
@@ -49,10 +49,8 @@ def build_rowgroup_index(dataset_url, spark_context, indexers, hdfs_driver='libh
         dataset_url = dataset_url[:-1]
 
     # Create pyarrow file system
-    resolver = FilesystemResolver(dataset_url, spark_context._jsc.hadoopConfiguration(),
-                                  hdfs_driver=hdfs_driver, user=spark_context.sparkUser())
-    dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
-                                validate_schema=False)
+    filesystem, path = fs.FileSystem.from_uri(dataset_url)
+    dataset = pq.ParquetDataset(path, filesystem=filesystem, validate_schema=False, use_legacy_dataset=True)
 
     split_pieces = dataset_metadata.load_row_groups(dataset)
     schema = dataset_metadata.get_schema(dataset)
@@ -92,12 +90,11 @@ def _index_columns(piece_info, dataset_url, partitions, indexers, schema, hdfs_d
                        libhdfs (java through JNI) or libhdfs3 (C++)
     :return: list of indexers containing index data
     """
-    # Resolver in executor context will get hadoop config from environment
-    resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
-    fs = resolver.filesystem()
+    filesystem, _ = fs.FileSystem.from_uri(dataset_url)
 
     # Create pyarrow piece
-    piece = pq.ParquetDatasetPiece(piece_info.path, open_file_func=fs.open, row_group=piece_info.row_group,
+    piece = pq.ParquetDatasetPiece(piece_info.path, open_file_func=filesystem.open_input_file,
+                                   row_group=piece_info.row_group,
                                    partition_keys=piece_info.partition_keys)
 
     # Collect column names needed for indexing
diff --git a/petastorm/fs_utils.py b/petastorm/fs_utils.py
index a0374419b..6cc45862f 100644
--- a/petastorm/fs_utils.py
+++ b/petastorm/fs_utils.py
@@ -219,7 +219,7 @@ def get_filesystem_and_path_or_paths(url_or_urls, hdfs_driver='libhdfs3', s3_con
         if parsed_url.scheme != first_scheme or parsed_url.netloc != first_netloc:
             raise ValueError('The dataset url list must contain url with the same scheme and netloc.')
 
-    fs = FilesystemResolver(url_list[0], hdfs_driver=hdfs_driver, s3_config_kwargs=s3_config_kwargs).filesystem()
+    fs, _ = pyarrow.fs.FileSystem.from_uri(url_list[0])
     path_list = [get_dataset_path(parsed_url) for parsed_url in parsed_url_list]
 
     if isinstance(url_or_urls, list):
diff --git a/petastorm/py_dict_reader_worker.py b/petastorm/py_dict_reader_worker.py
index 5f1edb120..d74b9a4b4 100644
--- a/petastorm/py_dict_reader_worker.py
+++ b/petastorm/py_dict_reader_worker.py
@@ -135,7 +135,7 @@ def process(self, piece_index, worker_predicate, shuffle_row_drop_partition):
             self._dataset = pq.ParquetDataset(
                 self._dataset_path,
                 filesystem=self._filesystem,
-                validate_schema=False, filters=self._arrow_filters)
+                validate_schema=False, filters=self._arrow_filters, use_legacy_dataset=True)
 
         piece = self._split_pieces[piece_index]
 
diff --git a/petastorm/reader.py b/petastorm/reader.py
index 9ca6d3c61..2b1cc79cb 100644
--- a/petastorm/reader.py
+++ b/petastorm/reader.py
@@ -386,7 +386,7 @@ def __init__(self, pyarrow_filesystem, dataset_path, schema_fields=None,
         # 1. Resolve dataset path (hdfs://, file://) and open the parquet storage (dataset)
         self.dataset = pq.ParquetDataset(dataset_path, filesystem=pyarrow_filesystem,
                                          validate_schema=False, metadata_nthreads=10,
-                                         filters=filters)
+                                         filters=filters, use_legacy_dataset=True)
 
         if self.dataset.partitions is None:
             # When read from parquet file list, the `dataset.partitions` will be None.
diff --git a/petastorm/spark/spark_dataset_converter.py b/petastorm/spark/spark_dataset_converter.py
index 946b90709..ed2767168 100644
--- a/petastorm/spark/spark_dataset_converter.py
+++ b/petastorm/spark/spark_dataset_converter.py
@@ -25,14 +25,13 @@
 from typing import List, Any
 
 import pyspark
-from pyarrow import LocalFileSystem
+from pyarrow import fs
 from pyspark.sql.session import SparkSession
 from pyspark.sql.types import ArrayType, DoubleType, FloatType
 from six.moves.urllib.parse import urlparse
 
 from petastorm import make_batch_reader
-from petastorm.fs_utils import (FilesystemResolver,
-                                get_filesystem_and_path_or_paths, normalize_dir_url)
+from petastorm.fs_utils import get_filesystem_and_path_or_paths, normalize_dir_url
 
 if LooseVersion(pyspark.__version__) < LooseVersion('3.0'):
     def vector_to_array(_1, _2='float32'):
@@ -79,10 +78,9 @@ def _get_parent_cache_dir_url():
 
 
 def _default_delete_dir_handler(dataset_url):
-    resolver = FilesystemResolver(dataset_url)
-    fs = resolver.filesystem()
+    filesystem, path = fs.FileSystem.from_uri(dataset_url)
     parsed = urlparse(dataset_url)
-    if isinstance(fs, LocalFileSystem):
+    if isinstance(filesystem, fs.LocalFileSystem):
         # pyarrow has a bug: LocalFileSystem.delete() is not implemented.
         # https://issues.apache.org/jira/browse/ARROW-7953
         # We can remove this branch once ARROW-7953 is fixed.
@@ -455,9 +453,9 @@ def _check_url(dir_url):
 
 def _check_parent_cache_dir_url(dir_url):
     """Check dir url whether is suitable to be used as parent cache directory."""
     _check_url(dir_url)
-    fs, dir_path = get_filesystem_and_path_or_paths(dir_url)
+    filesystem, dir_path = get_filesystem_and_path_or_paths(dir_url)
     if 'DATABRICKS_RUNTIME_VERSION' in os.environ and not _is_spark_local_mode():
-        if isinstance(fs, LocalFileSystem):
+        if isinstance(filesystem, fs.LocalFileSystem):
             # User need to use dbfs fuse URL.
             if not dir_path.startswith('/dbfs/'):
                 logger.warning(
@@ -597,13 +595,13 @@ def _wait_file_available(url_list):
     all files are available for reading. This is useful in some filesystems, such as S3 which only
     providing eventually consistency.
     """
-    fs, path_list = get_filesystem_and_path_or_paths(url_list)
+    filesystem, path_list = get_filesystem_and_path_or_paths(url_list)
     logger.debug('Waiting some seconds until all parquet-store files appear at urls %s', ','.join(url_list))
 
     def wait_for_file(path):
         end_time = time.time() + _FILE_AVAILABILITY_WAIT_TIMEOUT_SECS
         while time.time() < end_time:
-            if fs.exists(path):
+            if filesystem.get_file_info(path).type == fs.FileType.File:
                 return True
             time.sleep(0.1)
         return False
@@ -626,7 +624,7 @@ def _check_dataset_file_median_size(url_list):
     RECOMMENDED_FILE_SIZE_BYTES = 50 * 1024 * 1024
 
     # TODO: also check file size for other file system.
-    if isinstance(fs, LocalFileSystem):
+    if fs.type_name == 'local':  # `fs` here is the local FileSystem object, which shadows the pyarrow.fs module
         pool = ThreadPool(64)
         try:
             file_size_list = pool.map(os.path.getsize, path_list)
diff --git a/petastorm/spark_utils.py b/petastorm/spark_utils.py
index ed4678d8e..08e669078 100644
--- a/petastorm/spark_utils.py
+++ b/petastorm/spark_utils.py
@@ -13,11 +13,11 @@
 # limitations under the License.
"""A set of Spark specific helper functions for the petastorm dataset""" -from six.moves.urllib.parse import urlparse from petastorm import utils from petastorm.etl.dataset_metadata import get_schema_from_dataset_url -from petastorm.fs_utils import FilesystemResolver +from petastorm.fs_utils import get_dataset_path def dataset_as_rdd(dataset_url, spark_session, schema_fields=None, hdfs_driver='libhdfs3'): @@ -33,12 +32,7 @@ def dataset_as_rdd(dataset_url, spark_session, schema_fields=None, hdfs_driver=' """ schema = get_schema_from_dataset_url(dataset_url, hdfs_driver=hdfs_driver) - dataset_url_parsed = urlparse(dataset_url) - - resolver = FilesystemResolver(dataset_url_parsed, spark_session.sparkContext._jsc.hadoopConfiguration(), - hdfs_driver=hdfs_driver) - - dataset_df = spark_session.read.parquet(resolver.get_dataset_path()) + dataset_df = spark_session.read.parquet(get_dataset_path(dataset_url)) if schema_fields is not None: # If wanting a subset of fields, create the schema view and run a select on those fields schema = schema.create_schema_view(schema_fields) diff --git a/petastorm/tests/test_dataset_metadata.py b/petastorm/tests/test_dataset_metadata.py index fb617a676..f3228b185 100644 --- a/petastorm/tests/test_dataset_metadata.py +++ b/petastorm/tests/test_dataset_metadata.py @@ -14,7 +14,7 @@ import numpy as np import pytest -import pyarrow +from pyarrow import fs from pyspark.sql import SparkSession from pyspark.sql.types import IntegerType @@ -33,9 +33,6 @@ def test_get_schema_from_dataset_url_bogus_url(): with pytest.raises(IOError): get_schema_from_dataset_url('file:///non-existing-path') - with pytest.raises(ValueError): - get_schema_from_dataset_url('/invalid_url') - def test_serialize_filesystem_factory(tmpdir): SimpleSchema = Unischema('SimpleSchema', [ @@ -43,7 +40,7 @@ def test_serialize_filesystem_factory(tmpdir): UnischemaField('foo', np.int32, (), ScalarCodec(IntegerType()), False), ]) - class BogusFS(pyarrow.LocalFileSystem): + class BogusFS(fs.LocalFileSystem): def __getstate__(self): raise RuntimeError("can not serialize") @@ -53,8 +50,8 @@ def __getstate__(self): spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate() sc = spark.sparkContext with materialize_dataset(spark, output_url, SimpleSchema, rowgroup_size_mb, filesystem_factory=BogusFS): - rows_rdd = sc.parallelize(range(rows_count))\ - .map(lambda x: {'id': x, 'foo': x})\ + rows_rdd = sc.parallelize(range(rows_count)) \ + .map(lambda x: {'id': x, 'foo': x}) \ .map(lambda x: dict_to_spark_row(SimpleSchema, x)) spark.createDataFrame(rows_rdd, SimpleSchema.as_spark_schema()) \ diff --git a/petastorm/tests/test_end_to_end.py b/petastorm/tests/test_end_to_end.py index 2af672a2e..c04531e68 100644 --- a/petastorm/tests/test_end_to_end.py +++ b/petastorm/tests/test_end_to_end.py @@ -12,20 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import tempfile
 import operator
 import os
+import tempfile
 from concurrent.futures import ThreadPoolExecutor
 from shutil import rmtree, copytree
-from six.moves.urllib.parse import urlparse
+from unittest import mock
 
 import numpy as np
-import pyarrow.hdfs
 import pytest
+from pyarrow import fs
 from pyspark.sql import SparkSession
 from pyspark.sql.types import LongType, ShortType, StringType
-
-from unittest import mock
+from six.moves.urllib.parse import urlparse
 
 from petastorm import make_reader, make_batch_reader, TransformSpec
 from petastorm.codecs import ScalarCodec, CompressedImageCodec
@@ -783,7 +782,7 @@ def test_pass_in_pyarrow_filesystem_to_materialize_dataset(synthetic_dataset, tm
     a_moved_path = tmpdir.join('moved').strpath
     copytree(synthetic_dataset.path, a_moved_path)
 
-    local_fs = pyarrow.LocalFileSystem
+    local_fs = fs.LocalFileSystem
     os.remove(a_moved_path + '/_common_metadata')
 
     spark = SparkSession.builder.getOrCreate()
diff --git a/petastorm/tests/test_fs_utils.py b/petastorm/tests/test_fs_utils.py
index 423b6e36a..3e77c7b25 100644
--- a/petastorm/tests/test_fs_utils.py
+++ b/petastorm/tests/test_fs_utils.py
@@ -13,17 +13,11 @@
 # limitations under the License.
 
 import unittest
-import dill
-import mock
-from pyarrow.filesystem import LocalFileSystem
-from pyarrow.lib import ArrowIOError
-from six.moves.urllib.parse import urlparse
-import s3fs
+from pyarrow import fs
 
-from petastorm.fs_utils import FilesystemResolver, get_filesystem_and_path_or_paths
-from petastorm.gcsfs_helpers.gcsfs_wrapper import GCSFSWrapper
+from petastorm.fs_utils import get_filesystem_and_path_or_paths
 from petastorm.hdfs.tests.test_hdfs_namenode import HC, MockHadoopConfiguration, \
-    MockHdfs, MockHdfsConnector
+    MockHdfsConnector
 
 ABS_PATH = '/abs/path'
 
@@ -48,167 +42,12 @@ def setUp(self):
         self._hadoop_configuration.set('dfs.namenode.rpc-address.{}.nn1'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN1)
         self._hadoop_configuration.set('dfs.namenode.rpc-address.{}.nn2'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN2)
 
-    def test_error_url_cases(self):
-        """Various error cases that result in exception raised."""
-        # Case 1: Schemeless path asserts
-        with self.assertRaises(ValueError):
-            FilesystemResolver(ABS_PATH, {})
-
-        # Case 4b: HDFS default path case with NO defaultFS
-        with self.assertRaises(RuntimeError):
-            FilesystemResolver('hdfs:///some/path', {})
-
-        # Case 4b: Using `default` as host, while apparently a pyarrow convention, is NOT valid
-        with self.assertRaises(ArrowIOError):
-            FilesystemResolver('hdfs://default', {})
-
-        # Case 5: other schemes result in ValueError; urlparse to cover an else branch!
-        with self.assertRaises(ValueError):
-            FilesystemResolver(urlparse('http://foo/bar'), {})
-        with self.assertRaises(ValueError):
-            FilesystemResolver(urlparse('ftp://foo/bar'), {})
-        with self.assertRaises(ValueError):
-            FilesystemResolver(urlparse('ssh://foo/bar'), {})
-
-        # s3 paths must have the bucket as the netloc
-        with self.assertRaises(ValueError):
-            FilesystemResolver(urlparse('s3:///foo/bar'), {})
-
-        # GCS paths must have the bucket as the netloc
-        with self.assertRaises(ValueError):
-            FilesystemResolver(urlparse('gcs:///foo/bar'), {})
-
-    def test_file_url(self):
-        """ Case 2: File path, agnostic to content of hadoop configuration."""
-        suj = FilesystemResolver('file://{}'.format(ABS_PATH), self._hadoop_configuration, connector=self.mock)
-        self.assertTrue(isinstance(suj.filesystem(), LocalFileSystem))
-        self.assertEqual('', suj.parsed_dataset_url().netloc)
-        self.assertEqual(ABS_PATH, suj.get_dataset_path())
-
-        # Make sure we did not capture FilesystemResolver in a closure by mistake
-        dill.dumps(suj.filesystem_factory())
-
-    def test_hdfs_url_with_nameservice(self):
-        """ Case 3a: HDFS nameservice."""
-        suj = FilesystemResolver(HC.WARP_TURTLE_PATH, self._hadoop_configuration, connector=self.mock,
-                                 user=self.mock_name)
-        self.assertEqual(MockHdfs, type(suj.filesystem()._hdfs))
-        self.assertEqual(self.mock_name, suj.filesystem()._user)
-        self.assertEqual(HC.WARP_TURTLE, suj.parsed_dataset_url().netloc)
-        self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
-        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
-        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
-
-        # Make sure we did not capture FilesystemResolver in a closure by mistake
-        dill.dumps(suj.filesystem_factory())
-
-    def test_hdfs_url_no_nameservice(self):
-        """ Case 3b: HDFS with no nameservice should connect to default namenode."""
-        suj = FilesystemResolver('hdfs:///some/path', self._hadoop_configuration, connector=self.mock,
-                                 user=self.mock_name)
-        self.assertEqual(MockHdfs, type(suj.filesystem()._hdfs))
-        self.assertEqual(self.mock_name, suj.filesystem()._user)
-        self.assertEqual(HC.WARP_TURTLE, suj.parsed_dataset_url().netloc)
-        # ensure path is preserved in parsed URL
-        self.assertEqual('/some/path', suj.get_dataset_path())
-        self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
-        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
-        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
-
-        # Make sure we did not capture FilesystemResolver in a closure by mistake
-        dill.dumps(suj.filesystem_factory())
-
-    def test_hdfs_url_direct_namenode(self):
-        """ Case 4: direct namenode."""
-        suj = FilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN1),
-                                 self._hadoop_configuration,
-                                 connector=self.mock,
-                                 user=self.mock_name)
-        self.assertEqual(MockHdfs, type(suj.filesystem()))
-        self.assertEqual(self.mock_name, suj.filesystem()._user)
-        self.assertEqual(HC.WARP_TURTLE_NN1, suj.parsed_dataset_url().netloc)
-        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
-        self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
-        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
-
-        # Make sure we did not capture FilesystemResolver in a closure by mistake
-        dill.dumps(suj.filesystem_factory())
-
-    def test_hdfs_url_direct_namenode_driver_libhdfs(self):
-        suj = FilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN1),
-                                 self._hadoop_configuration,
-                                 connector=self.mock,
-                                 hdfs_driver='libhdfs', user=self.mock_name)
-        self.assertEqual(MockHdfs, type(suj.filesystem()))
-        self.assertEqual(self.mock_name, suj.filesystem()._user)
-        # Make sure we did not capture FilesystemResolver in a closure by mistake
-        dill.dumps(suj.filesystem_factory())
-
-    def test_hdfs_url_direct_namenode_retries(self):
-        """ Case 4: direct namenode fails first two times thru, but 2nd retry succeeds."""
-        self.mock.set_fail_n_next_connect(2)
-        with self.assertRaises(ArrowIOError):
-            suj = FilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
-                                     self._hadoop_configuration,
-                                     connector=self.mock, user=self.mock_name)
-        self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
-        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
-        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
-        with self.assertRaises(ArrowIOError):
-            suj = FilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
-                                     self._hadoop_configuration,
-                                     connector=self.mock)
-        self.assertEqual(2, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
-        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
-        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
-        # this one should connect "successfully"
-        suj = FilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
-                                 self._hadoop_configuration,
-                                 connector=self.mock, user=self.mock_name)
-        self.assertEqual(MockHdfs, type(suj.filesystem()))
-        self.assertEqual(self.mock_name, suj.filesystem()._user)
-        self.assertEqual(HC.WARP_TURTLE_NN2, suj.parsed_dataset_url().netloc)
-        self.assertEqual(3, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
-        self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
-        self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
-
-    def test_s3_without_s3fs(self):
-        with mock.patch.dict('sys.modules', s3fs=None):
-            # `import s3fs` will fail in this context
-            with self.assertRaises(ValueError):
-                FilesystemResolver(urlparse('s3://foo/bar'), {})
-
-    def test_s3_url(self):
-        suj = FilesystemResolver('s3://bucket{}'.format(ABS_PATH), self._hadoop_configuration, connector=self.mock)
-        self.assertTrue(isinstance(suj.filesystem(), s3fs.S3FileSystem))
-        self.assertEqual('bucket', suj.parsed_dataset_url().netloc)
-        self.assertEqual('bucket' + ABS_PATH, suj.get_dataset_path())
-
-        # Make sure we did not capture FilesystemResolver in a closure by mistake
-        dill.dumps(suj.filesystem_factory())
-
-    def test_gcs_without_gcsfs(self):
-        with mock.patch.dict('sys.modules', gcsfs=None):
-            # `import gcsfs` will fail in this context
-            with self.assertRaises(ValueError):
-                FilesystemResolver(urlparse('gcs://foo/bar'), {})
-
-    def test_gcs_url(self):
-        suj = FilesystemResolver('gcs://bucket{}'.format(ABS_PATH), self._hadoop_configuration, connector=self.mock)
-        self.assertTrue(isinstance(suj.filesystem(), GCSFSWrapper))
-        self.assertEqual('bucket', suj.parsed_dataset_url().netloc)
-        self.assertEqual('bucket' + ABS_PATH, suj.get_dataset_path())
-
-        # Make sure we did not capture FilesystemResolver in a closure by mistake
-        dill.dumps(suj.filesystem_factory())
-
     def test_get_filesystem_and_path_or_paths(self):
         fs1, path1 = get_filesystem_and_path_or_paths('file:///some/path')
-        assert isinstance(fs1, LocalFileSystem) and path1 == '/some/path'
+        assert isinstance(fs1, fs.LocalFileSystem) and path1 == '/some/path'
 
-        fs2, paths2 = get_filesystem_and_path_or_paths(
-            ['file:///some/path/01.parquet', 'file:///some/path/02.parquet'])
-        assert isinstance(fs2, LocalFileSystem) and \
-            paths2 == ['/some/path/01.parquet', '/some/path/02.parquet']
+        fs2, paths2 = get_filesystem_and_path_or_paths(['file:///some/path/01.parquet', 'file:///some/path/02.parquet'])
+        assert isinstance(fs2, fs.LocalFileSystem) and paths2 == ['/some/path/01.parquet', '/some/path/02.parquet']
 
         with self.assertRaises(ValueError):
             get_filesystem_and_path_or_paths(
diff --git a/petastorm/tests/test_spark_dataset_converter.py b/petastorm/tests/test_spark_dataset_converter.py
index 0ae3bfb66..99690c10f 100644
--- a/petastorm/tests/test_spark_dataset_converter.py
+++ b/petastorm/tests/test_spark_dataset_converter.py
@@ -25,6 +25,7 @@
 import pytest
 import py4j
 import tensorflow.compat.v1 as tf  # pylint: disable=import-error
+from pyarrow import fs
 from pyspark.sql.functions import pandas_udf
 from pyspark.sql.types import (ArrayType, BinaryType, BooleanType, ByteType,
                                DoubleType, FloatType, IntegerType, LongType,
@@ -32,7 +33,6 @@
 from six.moves.urllib.parse import urlparse
 
 from petastorm import make_batch_reader
-from petastorm.fs_utils import FilesystemResolver
 from petastorm.spark import (SparkDatasetConverter, make_spark_converter,
                              spark_dataset_converter)
 from petastorm.spark.spark_dataset_converter import (
@@ -151,8 +151,8 @@ def test_atexit(spark_test_ctx):
     with open(os.path.join(spark_test_ctx.tempdir, 'test_atexit.out')) as f:
         cache_dir_url = f.read()
 
-    fs = FilesystemResolver(cache_dir_url).filesystem()
-    assert not fs.exists(urlparse(cache_dir_url).path)
+    filesystem, path = fs.LocalFileSystem.from_uri(cache_dir_url)
+    assert filesystem.get_file_info(path).type == fs.FileType.NotFound
 
 
 def test_set_delete_handler(spark_test_ctx):
diff --git a/petastorm/tests/test_unischema.py b/petastorm/tests/test_unischema.py
index a7904b54f..62201e917 100644
--- a/petastorm/tests/test_unischema.py
+++ b/petastorm/tests/test_unischema.py
@@ -33,7 +33,8 @@ def _mock_parquet_dataset(partitions, arrow_schema):
     """Creates a pyarrow.ParquetDataset mock capable of returning:
 
-        parquet_dataset.pieces[0].get_metadata(parquet_dataset.fs.open).schema.to_arrow_schema() == schema
+        parquet_dataset.pieces[0].get_metadata(parquet_dataset.fs.open
+                                               ).schema.to_arrow_schema() == schema
         parquet_dataset.partitions = partitions
 
     :param partitions: expected to be a list of pa.parquet.PartitionSet
diff --git a/petastorm/tools/copy_dataset.py b/petastorm/tools/copy_dataset.py
index 8d4c264ff..5d78fe221 100644
--- a/petastorm/tools/copy_dataset.py
+++ b/petastorm/tools/copy_dataset.py
@@ -25,10 +25,9 @@
 
 from pyspark.sql import SparkSession
 
-from petastorm.unischema import match_unischema_fields
 from petastorm.etl.dataset_metadata import materialize_dataset, get_schema_from_dataset_url
 from petastorm.tools.spark_session_cli import add_configure_spark_arguments, configure_spark
-from petastorm.fs_utils import FilesystemResolver
+from petastorm.unischema import match_unischema_fields
 
 
 def copy_dataset(spark, source_url, target_url, field_regex, not_null_fields, overwrite_output, partitions_count,
@@ -67,10 +66,7 @@ def copy_dataset(spark, source_url, target_url, field_regex, not_null_fields, ov
     else:
         subschema = schema
 
-    resolver = FilesystemResolver(target_url, spark.sparkContext._jsc.hadoopConfiguration(),
-                                  hdfs_driver=hdfs_driver, user=spark.sparkContext.sparkUser())
-    with materialize_dataset(spark, target_url, subschema, row_group_size_mb,
-                             filesystem_factory=resolver.filesystem_factory()):
+    with materialize_dataset(spark, target_url, subschema, row_group_size_mb):
         data_frame = spark.read \
             .parquet(source_url)
 
diff --git a/petastorm/utils.py b/petastorm/utils.py
index cb83bb85d..89ea8881b 100644
--- a/petastorm/utils.py
+++ b/petastorm/utils.py
@@ -20,7 +20,7 @@
 import numpy as np
 import pyarrow
 from future.utils import raise_with_traceback
-from pyarrow.filesystem import LocalFileSystem
+from pyarrow import fs
 
 logger = logging.getLogger(__name__)
 
@@ -125,8 +125,9 @@ def add_to_dataset_metadata(dataset, key, value):
     # We have just modified _common_metadata file, but the filesystem implementation used by pyarrow does not
     # update the .crc value. We better delete the .crc to make sure there is no mismatch between _common_metadata
     # content and the checksum.
-    if isinstance(dataset.fs, LocalFileSystem) and dataset.fs.exists(common_metadata_file_crc_path):
-        try:
-            dataset.fs.rm(common_metadata_file_crc_path)
-        except NotImplementedError:
-            os.remove(common_metadata_file_crc_path)
+    if isinstance(dataset.fs, fs.LocalFileSystem):
+        if dataset.fs.get_file_info(common_metadata_file_crc_path).is_file:
+            try:
+                dataset.fs.delete_file(common_metadata_file_crc_path)
+            except NotImplementedError:
+                os.remove(common_metadata_file_crc_path)