diff --git a/petastorm/arrow_reader_worker.py b/petastorm/arrow_reader_worker.py
index f03a7f456..d333b4376 100644
--- a/petastorm/arrow_reader_worker.py
+++ b/petastorm/arrow_reader_worker.py
@@ -23,7 +23,6 @@
from pyarrow.parquet import ParquetFile
from petastorm.cache import NullCache
-from petastorm.compat import compat_piece_read, compat_table_columns_gen, compat_column_data
from petastorm.workers_pool import EmptyResultError
from petastorm.workers_pool.worker_base import WorkerBase
@@ -45,11 +44,12 @@ def read_next(self, workers_pool, schema, ngram):
# Convert arrow table columns into numpy. Strings are handled differently since to_pandas() returns
# numpy array of dtype=object.
result_dict = dict()
- for column_name, column in compat_table_columns_gen(result_table):
+ for column_name in result_table.column_names:
+ column = result_table.column(column_name)
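+            # With pyarrow >= 0.15, `Table.column()` returns a ChunkedArray directly (the old Column wrapper is
+            # gone), so its chunks can be converted without going through a `.data` attribute.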
# Assume we get only one chunk since reader worker reads one rowgroup at a time
            # `to_pandas` is slower when called on the entire ChunkedArray rather than directly on a chunk.
- if compat_column_data(result_table.column(0)).num_chunks == 1:
+ if result_table.column(0).num_chunks == 1:
-                column_as_pandas = column.data.chunks[0].to_pandas()
+                column_as_pandas = column.chunks[0].to_pandas()
            else:
-                column_as_pandas = column.data.to_pandas()
+                column_as_pandas = column.to_pandas()
@@ -130,7 +130,7 @@ def process(self, piece_index, worker_predicate, shuffle_row_drop_partition):
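+            # Petastorm still relies on legacy ParquetDataset behaviour (row-group pieces and dataset.partitions),
+            # so the legacy implementation is requested explicitly via use_legacy_dataset=True.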
self._dataset = pq.ParquetDataset(
self._dataset_path_or_paths,
filesystem=self._filesystem,
- validate_schema=False, filters=self._arrow_filters)
+ validate_schema=False, filters=self._arrow_filters, use_legacy_dataset=True)
if self._dataset.partitions is None:
                # When reading from a list of parquet files, `dataset.partitions` will be None.
@@ -287,13 +287,13 @@ def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_
partition_names = self._dataset.partitions.partition_names
# pyarrow would fail if we request a column names that the dataset is partitioned by
- table = compat_piece_read(piece, lambda _: pq_file, columns=column_names - partition_names,
- partitions=self._dataset.partitions)
+ table = piece.read(columns=column_names - partition_names, partitions=self._dataset.partitions)
# Drop columns we did not explicitly request. This may happen when a table is partitioned. Besides columns
# requested, pyarrow will also return partition values. Having these unexpected fields will break some
# downstream code.
- loaded_column_names = set(column[0] for column in compat_table_columns_gen(table))
+        loaded_column_names = set(table.column_names)
unasked_for_columns = loaded_column_names - column_names
if unasked_for_columns:
table = table.drop(unasked_for_columns)
diff --git a/petastorm/compat.py b/petastorm/compat.py
deleted file mode 100644
index 1f7cad10b..000000000
--- a/petastorm/compat.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright (c) 2017-2018 Uber Technologies, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""0.15.0 cancelled previously deprecated functions. We still want to support 0.11 as it is being used by some users.
-This file implements compatibility interfaces. Once we drop support of 0.11, we can get rid of this file."""
-
-import pyarrow as pa
-from packaging import version
-from pyarrow import parquet as pq
-
-_PYARROW_BEFORE_013 = version.parse(pa.__version__) < version.parse('0.13.0')
-_PYARROW_BEFORE_014 = version.parse(pa.__version__) < version.parse('0.14.0')
-_PYARROW_BEFORE_015 = version.parse(pa.__version__) < version.parse('0.15.0')
-
-
-def compat_get_metadata(piece, open_func):
- if _PYARROW_BEFORE_013:
- arrow_metadata = piece.get_metadata(open_func)
- else:
- arrow_metadata = piece.get_metadata()
- return arrow_metadata
-
-
-def compat_piece_read(piece, open_file_func, **kwargs):
- if _PYARROW_BEFORE_013:
- table = piece.read(open_file_func=open_file_func, **kwargs)
- else:
- table = piece.read(**kwargs)
- return table
-
-
-def compat_table_columns_gen(table):
- if _PYARROW_BEFORE_014:
- for column in table.columns:
- name = column.name
- yield name, column
- else:
- for name in table.column_names:
- column = table.column(name)
- yield name, column
-
-
-def compat_column_data(column):
- if _PYARROW_BEFORE_015:
- return column.data
- else:
- return column
-
-
-def compat_make_parquet_piece(path, open_file_func, **kwargs):
- if _PYARROW_BEFORE_013:
- return pq.ParquetDatasetPiece(path, **kwargs)
- else:
- return pq.ParquetDatasetPiece(path, open_file_func=open_file_func, # pylint: disable=unexpected-keyword-arg
- **kwargs)
-
-
-def compat_with_metadata(schema, metadata):
- if _PYARROW_BEFORE_015:
- return schema.add_metadata(metadata)
- else:
- return schema.with_metadata(metadata)
-
-
-def compat_schema_field(schema, name):
- if _PYARROW_BEFORE_013:
- return schema.field_by_name(name)
- else:
- return schema.field(name)
diff --git a/petastorm/etl/dataset_metadata.py b/petastorm/etl/dataset_metadata.py
index 352286c29..94b590ea2 100644
--- a/petastorm/etl/dataset_metadata.py
+++ b/petastorm/etl/dataset_metadata.py
@@ -21,13 +21,13 @@
from urllib.parse import urlparse
from packaging import version
+from pyarrow import fs
from pyarrow import parquet as pq
from six.moves import cPickle as pickle
from petastorm import utils
-from petastorm.compat import compat_get_metadata, compat_make_parquet_piece
from petastorm.etl.legacy import depickle_legacy_package_name_compatible
-from petastorm.fs_utils import FilesystemResolver, get_filesystem_and_path_or_paths, get_dataset_path
+from petastorm.fs_utils import get_filesystem_and_path_or_paths, get_dataset_path
from petastorm.unischema import Unischema
logger = logging.getLogger(__name__)
@@ -74,15 +74,6 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
A user may provide their own recipe for creation of pyarrow filesystem object in ``filesystem_factory``
argument (otherwise, petastorm will create a default one based on the url).
- The following example shows how a custom pyarrow HDFS filesystem, instantiated using ``libhdfs`` driver can be used
- during Petastorm dataset generation:
-
- >>> resolver=FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
- >>> hdfs_driver='libhdfs')
- >>> with materialize_dataset(..., filesystem_factory=resolver.filesystem_factory()):
- >>> ...
-
-
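+    The following sketch shows how a custom pyarrow filesystem factory, built with ``pyarrow.fs``, could be
+    supplied during Petastorm dataset generation (it assumes ``dataset_url`` is defined as above):
+
+    >>> from pyarrow import fs
+    >>> factory = lambda: fs.FileSystem.from_uri(dataset_url)[0]
+    >>> with materialize_dataset(..., filesystem_factory=factory):
+    >>>     ...
+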
:param spark: The spark session you are using
:param dataset_url: The dataset url to output your dataset to (e.g. ``hdfs:///path/to/dataset``)
:param schema: The :class:`petastorm.unischema.Unischema` definition of your dataset
@@ -97,18 +88,18 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
yield
# After job completes, add the unischema metadata and check for the metadata summary file
if filesystem_factory is None:
- resolver = FilesystemResolver(dataset_url, spark.sparkContext._jsc.hadoopConfiguration(),
- user=spark.sparkContext.sparkUser())
- filesystem_factory = resolver.filesystem_factory()
- dataset_path = resolver.get_dataset_path()
+ filesystem, dataset_path = fs.FileSystem.from_uri(dataset_url)
+
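+        # Re-resolving the filesystem from the URL inside the factory, rather than capturing the instance,
+        # keeps the factory picklable so it can be shipped to Spark executors.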
+ def filesystem_factory():
+ return fs.FileSystem.from_uri(dataset_url)[0]
else:
dataset_path = get_dataset_path(urlparse(dataset_url))
- filesystem = filesystem_factory()
+ filesystem = filesystem_factory()
dataset = pq.ParquetDataset(
dataset_path,
filesystem=filesystem,
- validate_schema=False)
+ validate_schema=False, use_legacy_dataset=True)
_generate_unischema_metadata(dataset, schema)
if not use_summary_metadata:
@@ -118,7 +109,7 @@ def materialize_dataset(spark, dataset_url, schema, row_group_size_mb=None, use_
dataset = pq.ParquetDataset(
dataset_path,
filesystem=filesystem,
- validate_schema=False)
+ validate_schema=False, use_legacy_dataset=True)
try:
# Try to load the row groups, if it fails that means the metadata was not generated properly
load_row_groups(dataset)
@@ -229,7 +220,7 @@ def _generate_num_row_groups_per_file(dataset, spark_context, filesystem_factory
def get_row_group_info(path):
fs = filesystem_factory()
relative_path = os.path.relpath(path, base_path)
- pq_file = fs.open(path)
+ pq_file = fs.open_input_file(path)
num_row_groups = pq.read_metadata(pq_file).num_row_groups
pq_file.close()
return relative_path, num_row_groups
@@ -286,8 +277,8 @@ def load_row_groups(dataset):
# This is not a real "piece" and we won't have row_groups_per_file recorded for it.
if row_groups_key != ".":
for row_group in range(row_groups_per_file[row_groups_key]):
- rowgroups.append(compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group,
- partition_keys=piece.partition_keys))
+ rowgroups.append(pq.ParquetDatasetPiece(piece.path, open_file_func=dataset.fs.open, row_group=row_group,
+ partition_keys=piece.partition_keys))
return rowgroups
@@ -323,18 +314,18 @@ def _split_row_groups(dataset):
continue
for row_group in range(row_groups_per_file[relative_path]):
- split_piece = compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group,
- partition_keys=piece.partition_keys)
+ split_piece = pq.ParquetDatasetPiece(piece.path, open_file_func=dataset.fs.open, row_group=row_group,
+ partition_keys=piece.partition_keys)
split_pieces.append(split_piece)
return split_pieces
def _split_piece(piece, fs_open):
- metadata = compat_get_metadata(piece, fs_open)
- return [compat_make_parquet_piece(piece.path, fs_open,
- row_group=row_group,
- partition_keys=piece.partition_keys)
+ metadata = piece.get_metadata()
+ return [pq.ParquetDatasetPiece(piece.path, open_file_func=fs_open,
+ row_group=row_group,
+ partition_keys=piece.partition_keys)
for row_group in range(metadata.num_row_groups)]
@@ -396,7 +387,8 @@ def get_schema_from_dataset_url(dataset_url_or_urls, hdfs_driver='libhdfs3'):
"""
fs, path_or_paths = get_filesystem_and_path_or_paths(dataset_url_or_urls, hdfs_driver)
- dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10)
+ dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10,
+ use_legacy_dataset=True)
# Get a unischema stored in the dataset metadata.
stored_schema = get_schema(dataset)
diff --git a/petastorm/etl/metadata_util.py b/petastorm/etl/metadata_util.py
index 8530481a8..c9e252305 100644
--- a/petastorm/etl/metadata_util.py
+++ b/petastorm/etl/metadata_util.py
@@ -16,10 +16,11 @@
from __future__ import print_function
import argparse
+
+from pyarrow import fs
from pyarrow import parquet as pq
from petastorm.etl import dataset_metadata, rowgroup_indexing
-from petastorm.fs_utils import FilesystemResolver
if __name__ == "__main__":
@@ -46,9 +47,8 @@
args.dataset_url = args.dataset_url[:-1]
# Create pyarrow file system
- resolver = FilesystemResolver(args.dataset_url, hdfs_driver=args.hdfs_driver)
- dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
- validate_schema=False)
+    filesystem, path = fs.FileSystem.from_uri(args.dataset_url)
+    dataset = pq.ParquetDataset(path, filesystem=filesystem, validate_schema=False, use_legacy_dataset=True)
print_all = not args.schema and not args.index
if args.schema or print_all:
diff --git a/petastorm/etl/petastorm_generate_metadata.py b/petastorm/etl/petastorm_generate_metadata.py
index b33768dda..74b8c715b 100644
--- a/petastorm/etl/petastorm_generate_metadata.py
+++ b/petastorm/etl/petastorm_generate_metadata.py
@@ -18,12 +18,12 @@
import sys
from pydoc import locate
+from pyarrow import fs
from pyarrow import parquet as pq
from pyspark.sql import SparkSession
from petastorm.etl.dataset_metadata import materialize_dataset, get_schema, ROW_GROUPS_PER_FILE_KEY
from petastorm.etl.rowgroup_indexing import ROWGROUPS_INDEX_KEY
-from petastorm.fs_utils import FilesystemResolver
from petastorm.unischema import Unischema
from petastorm.utils import add_to_dataset_metadata
@@ -60,13 +60,12 @@ def generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_su
"""
sc = spark.sparkContext
- resolver = FilesystemResolver(dataset_url, sc._jsc.hadoopConfiguration(), hdfs_driver=hdfs_driver,
- user=spark.sparkContext.sparkUser())
- fs = resolver.filesystem()
+ filesystem, path = fs.FileSystem.from_uri(dataset_url)
dataset = pq.ParquetDataset(
- resolver.get_dataset_path(),
- filesystem=fs,
- validate_schema=False)
+ path,
+ filesystem=filesystem,
+ validate_schema=False,
+ use_legacy_dataset=True)
if unischema_class:
schema = locate(unischema_class)
@@ -85,8 +84,11 @@ def generate_petastorm_metadata(spark, dataset_url, unischema_class=None, use_su
# overwriting the metadata to keep row group indexes and the old row group per file index
arrow_metadata = dataset.common_metadata or None
+ def filesystem_factory():
+ return fs.FileSystem.from_uri(dataset_url)[0]
+
with materialize_dataset(spark, dataset_url, schema, use_summary_metadata=use_summary_metadata,
- filesystem_factory=resolver.filesystem_factory()):
+ filesystem_factory=filesystem_factory):
if use_summary_metadata:
# Inside the materialize dataset context we just need to write the metadata file as the schema will
# be written by the context manager.
diff --git a/petastorm/etl/rowgroup_indexing.py b/petastorm/etl/rowgroup_indexing.py
index b16d47ea5..c48f29e11 100644
--- a/petastorm/etl/rowgroup_indexing.py
+++ b/petastorm/etl/rowgroup_indexing.py
@@ -16,15 +16,14 @@
import time
from collections import namedtuple
+from pyarrow import fs
from pyarrow import parquet as pq
from six.moves import cPickle as pickle
from six.moves import range
from petastorm import utils
-from petastorm.compat import compat_piece_read, compat_make_parquet_piece
from petastorm.etl import dataset_metadata
from petastorm.etl.legacy import depickle_legacy_package_name_compatible
-from petastorm.fs_utils import FilesystemResolver
logger = logging.getLogger(__name__)
@@ -50,10 +49,8 @@ def build_rowgroup_index(dataset_url, spark_context, indexers, hdfs_driver='libh
dataset_url = dataset_url[:-1]
# Create pyarrow file system
- resolver = FilesystemResolver(dataset_url, spark_context._jsc.hadoopConfiguration(),
- hdfs_driver=hdfs_driver, user=spark_context.sparkUser())
- dataset = pq.ParquetDataset(resolver.get_dataset_path(), filesystem=resolver.filesystem(),
- validate_schema=False)
+ filesystem, path = fs.FileSystem.from_uri(dataset_url)
+ dataset = pq.ParquetDataset(path, filesystem=filesystem, validate_schema=False, use_legacy_dataset=True)
split_pieces = dataset_metadata.load_row_groups(dataset)
schema = dataset_metadata.get_schema(dataset)
@@ -93,13 +90,12 @@ def _index_columns(piece_info, dataset_url, partitions, indexers, schema, hdfs_d
libhdfs (java through JNI) or libhdfs3 (C++)
:return: list of indexers containing index data
"""
- # Resolver in executor context will get hadoop config from environment
- resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
- fs = resolver.filesystem()
+ filesystem, _ = fs.FileSystem.from_uri(dataset_url)
# Create pyarrow piece
- piece = compat_make_parquet_piece(piece_info.path, fs.open, row_group=piece_info.row_group,
- partition_keys=piece_info.partition_keys)
+ piece = pq.ParquetDatasetPiece(piece_info.path, open_file_func=filesystem.open_input_file,
+ row_group=piece_info.row_group,
+ partition_keys=piece_info.partition_keys)
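+    # open_input_file() is the pyarrow.fs counterpart of the legacy filesystem's open() used here previously.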
# Collect column names needed for indexing
column_names = set()
@@ -107,8 +103,7 @@ def _index_columns(piece_info, dataset_url, partitions, indexers, schema, hdfs_d
column_names.update(indexer.column_names)
# Read columns needed for indexing
- column_rows = compat_piece_read(piece, fs.open, columns=list(column_names),
- partitions=partitions).to_pandas().to_dict('records')
+ column_rows = piece.read(columns=list(column_names), partitions=partitions).to_pandas().to_dict('records')
# Decode columns values
decoded_rows = [utils.decode_row(row, schema) for row in column_rows]
diff --git a/petastorm/fs_utils.py b/petastorm/fs_utils.py
index a0374419b..9c09fed67 100644
--- a/petastorm/fs_utils.py
+++ b/petastorm/fs_utils.py
@@ -17,9 +17,6 @@
import six
from six.moves.urllib.parse import urlparse
+import pyarrow.fs
-from petastorm.gcsfs_helpers.gcsfs_wrapper import GCSFSWrapper
-from petastorm.hdfs.namenode import HdfsNamenodeResolver, HdfsConnector
-
logger = logging.getLogger(__name__)
@@ -36,168 +33,6 @@ def get_dataset_path(parsed_url):
return parsed_url.path
-class FilesystemResolver(object):
- """Resolves a dataset URL, makes a connection via pyarrow, and provides a filesystem object."""
-
- def __init__(self, dataset_url, hadoop_configuration=None, connector=HdfsConnector,
- hdfs_driver='libhdfs3', user=None, s3_config_kwargs=None):
- """
- Given a dataset URL and an optional hadoop configuration, parse and interpret the URL to
- instantiate a pyarrow filesystem.
-
- Interpretation of the URL ``scheme://hostname:port/path`` occurs in the following order:
-
- 1. If no ``scheme``, no longer supported, so raise an exception!
- 2. If ``scheme`` is ``file``, use local filesystem path.
- 3. If ``scheme`` is ``hdfs``:
- a. Try the ``hostname`` as a namespace and attempt to connect to a name node.
- 1. If that doesn't work, try connecting directly to namenode ``hostname:port``.
- b. If no host, connect to the default name node.
- 5. If ``scheme`` is ``s3``, use s3fs. The user must manually install s3fs before using s3
- 6. If ``scheme`` is ``gs``or ``gcs``, use gcsfs. The user must manually install gcsfs before using GCS
- 7. Fail otherwise.
-
- :param dataset_url: The hdfs URL or absolute path to the dataset
- :param hadoop_configuration: an optional hadoop configuration
- :param connector: the HDFS connector object to use (ONLY override for testing purposes)
- :param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
- libhdfs (java through JNI) or libhdfs3 (C++)
- :param user: String denoting username when connecting to HDFS. None implies login user.
- """
- # Cache both the original URL and the resolved, urlparsed dataset_url
- self._dataset_url = dataset_url
- self._parsed_dataset_url = None
- # Cache the instantiated filesystem object
- self._filesystem = None
-
- if isinstance(self._dataset_url, six.string_types):
- self._parsed_dataset_url = urlparse(self._dataset_url)
- else:
- self._parsed_dataset_url = self._dataset_url
-
- if not self._parsed_dataset_url.scheme:
- # Case 1
- raise ValueError('ERROR! A scheme-less dataset url ({}) is no longer supported. '
- 'Please prepend "file://" for local filesystem.'.format(self._parsed_dataset_url.scheme))
-
- elif self._parsed_dataset_url.scheme == 'file':
- # Case 2: definitely local
- self._filesystem = pyarrow.localfs
- self._filesystem_factory = lambda: pyarrow.localfs
-
- elif self._parsed_dataset_url.scheme == 'hdfs':
-
- if hdfs_driver == 'libhdfs3':
- # libhdfs3 does not do any namenode resolution itself so we do it manually. This is not necessary
- # if using libhdfs
-
- # Obtain singleton and force hadoop config evaluation
- namenode_resolver = HdfsNamenodeResolver(hadoop_configuration)
-
- # Since we can't tell for sure, first treat the URL as though it references a name service
- if self._parsed_dataset_url.netloc:
- # Case 3a: Use the portion of netloc before any port, which doesn't get lowercased
- nameservice = self._parsed_dataset_url.netloc.split(':')[0]
- namenodes = namenode_resolver.resolve_hdfs_name_service(nameservice)
- if namenodes:
- self._filesystem = connector.connect_to_either_namenode(namenodes, user=user)
- self._filesystem_factory = lambda: connector.connect_to_either_namenode(namenodes, user=user)
- if self._filesystem is None:
- # Case 3a1: That didn't work; try the URL as a namenode host
- self._filesystem = connector.hdfs_connect_namenode(self._parsed_dataset_url, user=user)
- self._filesystem_factory = \
- lambda url=self._dataset_url, user=user: \
- connector.hdfs_connect_namenode(urlparse(url), user=user)
- else:
- # Case 3b: No netloc, so let's try to connect to default namenode
- # HdfsNamenodeResolver will raise exception if it fails to connect.
- nameservice, namenodes = namenode_resolver.resolve_default_hdfs_service()
- filesystem = connector.connect_to_either_namenode(namenodes, user=user)
- self._filesystem_factory = lambda: connector.connect_to_either_namenode(namenodes, user=user)
- if filesystem is not None:
- # Properly replace the parsed dataset URL once default namenode is confirmed
- self._parsed_dataset_url = urlparse(
- 'hdfs://{}{}'.format(nameservice, self._parsed_dataset_url.path))
- self._filesystem = filesystem
- else:
- self._filesystem = connector.hdfs_connect_namenode(self._parsed_dataset_url, hdfs_driver, user=user)
- self._filesystem_factory = \
- lambda url=self._dataset_url, user=user: \
- connector.hdfs_connect_namenode(urlparse(url), hdfs_driver, user=user)
-
- elif self._parsed_dataset_url.scheme in ('s3', 's3a', 's3n'):
- # Case 5
- # S3 support requires s3fs to be installed
- try:
- import s3fs
- except ImportError:
- raise ValueError('Must have s3fs installed in order to use datasets on s3. '
- 'Please install s3fs and try again.')
-
- if not self._parsed_dataset_url.netloc:
- raise ValueError('URLs must be of the form s3://bucket/path')
-
- fs = s3fs.S3FileSystem(config_kwargs=s3_config_kwargs)
- self._filesystem = fs
- self._filesystem_factory = lambda: s3fs.S3FileSystem(
- config_kwargs=s3_config_kwargs
- )
-
- elif self._parsed_dataset_url.scheme in ['gs', 'gcs']:
- # Case 6
- # GCS support requires gcsfs to be installed
- try:
- import gcsfs
- except ImportError:
- raise ValueError('Must have gcsfs installed in order to use datasets on GCS. '
- 'Please install gcsfs and try again.')
-
- if not self._parsed_dataset_url.netloc:
- raise ValueError('URLs must be of the form gs://bucket/path or gcs://bucket/path')
-
- fs = gcsfs.GCSFileSystem()
- self._filesystem = GCSFSWrapper(fs)
- self._filesystem_factory = lambda: GCSFSWrapper(gcsfs.GCSFileSystem())
-
- else:
- # Case 7
- raise ValueError('Unsupported scheme in dataset url {}. '
- 'Currently, only "file" and "hdfs" are supported.'.format(self._parsed_dataset_url.scheme))
-
- def parsed_dataset_url(self):
- """
- :return: The urlparse'd dataset_url
- """
- return self._parsed_dataset_url
-
- def get_dataset_path(self):
- """
- The dataset path is different than the one in `_parsed_dataset_url` for some filesystems.
- For example s3fs expects the bucket name to be included in the path and doesn't support
- paths that start with a `/`
- """
- return get_dataset_path(self._parsed_dataset_url)
-
- def filesystem(self):
- """
- :return: The pyarrow filesystem object
- """
- return self._filesystem
-
- def filesystem_factory(self):
- """
- :return: A serializable function that can be used to recreate the filesystem
- object; useful for providing access to the filesystem object on distributed
- Spark executors.
- """
- return self._filesystem_factory
-
- def __getstate__(self):
- raise RuntimeError('Pickling FilesystemResolver is not supported as it may contain some '
- 'a file-system instance objects that do not support pickling but do not have '
- 'anti-pickling protection')
-
-
def get_filesystem_and_path_or_paths(url_or_urls, hdfs_driver='libhdfs3', s3_config_kwargs=None):
"""
Given a url or url list, return a tuple ``(filesystem, path_or_paths)``
@@ -219,7 +54,7 @@ def get_filesystem_and_path_or_paths(url_or_urls, hdfs_driver='libhdfs3', s3_con
if parsed_url.scheme != first_scheme or parsed_url.netloc != first_netloc:
raise ValueError('The dataset url list must contain url with the same scheme and netloc.')
- fs = FilesystemResolver(url_list[0], hdfs_driver=hdfs_driver, s3_config_kwargs=s3_config_kwargs).filesystem()
+ fs, _ = pyarrow.fs.FileSystem.from_uri(url_list[0])
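+    # FileSystem.from_uri() returns a (filesystem, path) tuple; the path is discarded here since each URL's
+    # dataset path is derived separately below.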
path_list = [get_dataset_path(parsed_url) for parsed_url in parsed_url_list]
if isinstance(url_or_urls, list):
diff --git a/petastorm/hdfs/__init__.py b/petastorm/hdfs/__init__.py
deleted file mode 100644
index 98b74f46e..000000000
--- a/petastorm/hdfs/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-# Copyright (c) 2017-2018 Uber Technologies, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Common HDFS functional modules.
-"""
diff --git a/petastorm/hdfs/namenode.py b/petastorm/hdfs/namenode.py
deleted file mode 100644
index 4d86e50a3..000000000
--- a/petastorm/hdfs/namenode.py
+++ /dev/null
@@ -1,319 +0,0 @@
-# Copyright (c) 2017-2018 Uber Technologies, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import functools
-import inspect
-import logging
-import os
-from distutils.version import LooseVersion
-from xml.etree import ElementTree as ET
-
-import pyarrow
-import six
-from pyarrow.hdfs import HadoopFileSystem
-from pyarrow.lib import ArrowIOError
-from six.moves.urllib.parse import urlparse
-
-logger = logging.getLogger(__name__)
-
-
-class HdfsNamenodeResolver(object):
- """This class embodies functionality to resolve HDFS namenodes: per default or a nameservice."""
-
- def __init__(self, hadoop_configuration=None):
- """
- Sets the given HadoopConfiguration object for the resolver; or check for and pull hadoop
- configuration from an environment variable, in below preferred order to check.
-
- :param hadoop_configuration: an optional ``HadoopConfiguration``
- """
- self._hadoop_env = None
- self._hadoop_path = None
- if hadoop_configuration is None:
- # Pull from environment variable, in this preferred order
- for env in ["HADOOP_HOME", "HADOOP_PREFIX", "HADOOP_INSTALL"]:
- # Use the first available
- if env in os.environ:
- self._hadoop_env = env
- self._hadoop_path = os.environ[env]
- hadoop_configuration = {}
- self._load_site_xml_into_dict(
- '{}/etc/hadoop/hdfs-site.xml'.format(self._hadoop_path),
- hadoop_configuration)
- self._load_site_xml_into_dict(
- '{}/etc/hadoop/core-site.xml'.format(self._hadoop_path),
- hadoop_configuration)
- break
- if hadoop_configuration is None:
- # ensures at least an empty dict so no further checks required in member functions
- logger.warning('Unable to populate a sensible HadoopConfiguration for namenode resolution!\n'
- 'Path of last environment var (%s) tried [%s]. Please set up your Hadoop and \n'
- 'define environment variable HADOOP_HOME to point to your Hadoop installation path.',
- self._hadoop_env, self._hadoop_path)
- hadoop_configuration = {}
- self._hadoop_configuration = hadoop_configuration
-
- def _load_site_xml_into_dict(self, xml_path, in_dict):
- assert in_dict is not None, 'A valid dictionary must be supplied to process site XML'
- try:
- for prop in ET.parse(xml_path).getroot().iter('property'):
- in_dict[prop.find('name').text] = prop.find('value').text
- except ET.ParseError as ex:
- logger.error(
- 'Unable to obtain a root node for the supplied XML in %s: %s', xml_path, ex)
-
- def _build_error_string(self, msg):
- if self._hadoop_path is not None:
- return msg + '\nHadoop path {} in environment variable {}!\n' \
- 'Please check your hadoop configuration!' \
- .format(self._hadoop_path, self._hadoop_env)
- else:
- return msg + ' the supplied Spark HadoopConfiguration'
-
- def resolve_hdfs_name_service(self, namespace):
- """
- Given the namespace of a name service, resolves the configured list of name nodes, and
- returns them as a list of URL strings.
-
- :param namespace: the HDFS name service to resolve
- :return: a list of URL strings of the name nodes for the given name service; or None of not
- properly configured.
- """
- list_of_namenodes = None
- namenodes = self._hadoop_configuration.get('dfs.ha.namenodes.' + namespace)
- if namenodes:
- # populate namenode_urls list for the given namespace
- list_of_namenodes = []
- for nn in namenodes.split(','):
- prop_key = 'dfs.namenode.rpc-address.{}.{}'.format(namespace, nn)
- namenode_url = self._hadoop_configuration.get(prop_key)
- if namenode_url:
- list_of_namenodes.append(namenode_url)
- else:
- raise RuntimeError(self._build_error_string('Failed to get property "{}" from'
- .format(prop_key)))
- # Don't raise and exception otherwise, because the supplied name could just be a hostname.
- # We don't have an easy way to tell at this point.
- return list_of_namenodes
-
- def resolve_default_hdfs_service(self):
- """
- Resolves the default namenode using the given, or environment-derived, hadoop configuration,
- by parsing the configuration for ``fs.defaultFS``.
-
- :return: a tuple of structure ``(nameservice, list of namenodes)``
- """
- default_fs = self._hadoop_configuration.get('fs.defaultFS')
- if default_fs:
- nameservice = urlparse(default_fs).netloc
- list_of_namenodes = self.resolve_hdfs_name_service(nameservice)
- if list_of_namenodes is None:
- raise IOError(self._build_error_string('Unable to get namenodes for '
- 'default service "{}" from'
- .format(default_fs)))
- return [nameservice, list_of_namenodes]
- else:
- raise RuntimeError(
- self._build_error_string('Failed to get property "fs.defaultFS" from'))
-
-
-class HdfsConnectError(IOError):
- pass
-
-
-class MaxFailoversExceeded(RuntimeError):
- def __init__(self, failed_exceptions, max_failover_attempts, func_name):
- self.failed_exceptions = failed_exceptions
- self.max_failover_attempts = max_failover_attempts
- self.__name__ = func_name
- message = 'Failover attempts exceeded maximum ({}) for action "{}". ' \
- 'Exceptions:\n{}'.format(self.max_failover_attempts, self.__name__,
- self.failed_exceptions)
- super(MaxFailoversExceeded, self).__init__(message)
-
-
-class namenode_failover(object):
- """
- This decorator class ensures seamless namenode failover and retry, when an HDFS call fails
- due to StandbyException, up to a maximum retry.
- """
- # Allow for 2 failovers to a different namenode (i.e., if 2 NNs, try back to the original)
- MAX_FAILOVER_ATTEMPTS = 2
-
- def __init__(self, func):
- # limit wrapper attributes updated to just name and doc string
- functools.update_wrapper(self, func, ('__name__', '__doc__'))
- # cache the function name, only because we don't need the function object in __call__
- self._func_name = func.__name__
-
- def __get__(self, obj, obj_type):
- """ Support usage of decorator on instance methods. """
- # This avoids needing to cache the `obj` as member variable
- return functools.partial(self.__call__, obj)
-
- def __call__(self, obj, *args, **kwargs):
- """
- Attempts the function call, catching exception, re-connecting, and retrying, up to a
- pre-configured maximum number of attempts.
-
- :param obj: calling class instance, the HDFS client object
- :param args: positional arguments to func
- :param kwargs: arbitrary keyword arguments to func
- :return: return of ``func`` call; if max retries exceeded, raise a RuntimeError; or raise
- any unexpected exception
- """
- failures = []
- while len(failures) <= self.MAX_FAILOVER_ATTEMPTS:
- try:
- # Invoke the filesystem function on the connected HDFS object
- return getattr(obj._hdfs, self._func_name)(*args, **kwargs)
- except ArrowIOError as e:
- # An HDFS IP error occurred, retry HDFS connect to failover
- obj._do_connect()
- failures.append(e)
- # Failover attempts exceeded at this point!
- raise MaxFailoversExceeded(failures, self.MAX_FAILOVER_ATTEMPTS, self._func_name)
-
-
-def failover_all_class_methods(decorator):
- """
- This decorator function wraps an entire class to decorate each member method, incl. inherited.
-
- Adapted from https://stackoverflow.com/a/6307868
- """
-
- # Convenience function to ensure `decorate` gets wrapper function attributes: name, docs, etc.
- @functools.wraps(decorator)
- def decorate(cls):
- all_methods = inspect.getmembers(cls, inspect.isbuiltin) \
- + inspect.getmembers(cls, inspect.ismethod) \
- + inspect.getmembers(cls, inspect.isroutine)
- for name, method in all_methods:
- if not name.startswith('_'):
- # It's safer to exclude all protected/private method from decoration
- setattr(cls, name, decorator(method))
- return cls
-
- return decorate
-
-
-@failover_all_class_methods(namenode_failover)
-class HAHdfsClient(HadoopFileSystem):
- def __init__(self, connector_cls, list_of_namenodes, user=None):
- """
- Attempt HDFS connection operation, storing the hdfs object for intercepted calls.
-
- :param connector_cls: HdfsConnector class, so connector logic resides in one place, and
- also facilitates testing.
- :param list_of_namenodes: List of name nodes to failover, cached to enable un-/pickling
- :param user: String denoting username when connecting to HDFS. None implies login user.
- :return: None
- """
- # Use protected attribute to prevent mistaken decorator application
- self._connector_cls = connector_cls
- self._list_of_namenodes = list_of_namenodes
- self._user = user
- # Ensure that a retry will attempt a different name node in the list
- self._index_of_nn = -1
- self._do_connect()
-
- def __reduce__(self):
- """ Returns object state for pickling. """
- return self.__class__, (self._connector_cls, self._list_of_namenodes, self._user)
-
- def _do_connect(self):
- """ Makes a new connection attempt, caching the new namenode index and HDFS connection. """
- self._index_of_nn, self._hdfs = \
- self._connector_cls._try_next_namenode(self._index_of_nn, self._list_of_namenodes, user=self._user)
-
-
-class HdfsConnector(object):
- """ HDFS connector class where failover logic is implemented. Facilitates testing. """
- # Refactored constant
- MAX_NAMENODES = 2
-
- @classmethod
- def hdfs_connect_namenode(cls, url, driver='libhdfs3', user=None):
- """
- Performs HDFS connect in one place, facilitating easy change of driver and test mocking.
-
- :param url: An parsed URL object to the HDFS end point
- :param driver: An optional driver identifier
- :param user: String denoting username when connecting to HDFS. None implies login user.
- :return: Pyarrow HDFS connection object.
- """
-
- # According to pyarrow.hdfs.connect:
- # host : NameNode. Set to "default" for fs.defaultFS from core-site.xml
- # So we pass 'default' as a host name if the url does not specify one (i.e. hdfs:///...)
- if LooseVersion(pyarrow.__version__) < LooseVersion('0.12.0'):
- hostname = url.hostname or 'default'
- driver = driver
- else:
- hostname = six.text_type(url.hostname or 'default')
- driver = six.text_type(driver)
-
- kwargs = dict(user=user)
- if LooseVersion(pyarrow.__version__) < LooseVersion('0.17.0'):
- # Support for libhdfs3 was removed in v0.17.0, we include it here for backwards
- # compatibility
- kwargs['driver'] = driver
- return pyarrow.hdfs.connect(hostname, url.port or 8020, **kwargs)
-
- @classmethod
- def connect_to_either_namenode(cls, list_of_namenodes, user=None):
- """
- Returns a wrapper HadoopFileSystem "high-availability client" object that enables
- name node failover.
-
- Raises a HdfsConnectError if no successful connection can be established.
-
- :param list_of_namenodes: a required list of name node URLs to connect to.
- :param user: String denoting username when connecting to HDFS. None implies login user.
- :return: the wrapped HDFS connection object
- """
- assert list_of_namenodes is not None and len(list_of_namenodes) <= cls.MAX_NAMENODES, \
- "Must supply a list of namenodes, but HDFS only supports up to {} namenode URLs" \
- .format(cls.MAX_NAMENODES)
- return HAHdfsClient(cls, list_of_namenodes, user=user)
-
- @classmethod
- def _try_next_namenode(cls, index_of_nn, list_of_namenodes, user=None):
- """
- Instead of returning an inline function, this protected class method implements the
- failover logic: circling between namenodes using the supplied index as the last
- index into the name nodes list.
-
- :param list_of_namenodes: a required list of name node URLs to connect to.
- :param user: String denoting username when connecting to HDFS. None implies login user.
- :return: a tuple of (new index into list, actual pyarrow HDFS connection object), or raise
- a HdfsConnectError if no successful connection can be established.
- """
- nn_len = len(list_of_namenodes)
- if nn_len > 0:
- for i in range(1, cls.MAX_NAMENODES + 1):
- # Use a modulo mechanism to hit the "next" name node, as opposed to always
- # starting from the first entry in the list
- idx = (index_of_nn + i) % nn_len
- host = list_of_namenodes[idx]
- try:
- return idx, \
- cls.hdfs_connect_namenode(urlparse('hdfs://' + str(host or 'default')), user=user)
- except ArrowIOError as e:
- # This is an expected error if the namenode we are trying to connect to is
- # not the active one
- logger.debug('Attempted to connect to namenode %s but failed: %e', host, str(e))
- # It is a problem if we cannot connect to either of the namenodes when tried back-to-back,
- # so better raise an error.
- raise HdfsConnectError("Unable to connect to HDFS cluster!")
diff --git a/petastorm/hdfs/tests/__init__.py b/petastorm/hdfs/tests/__init__.py
deleted file mode 100644
index 1d38bb322..000000000
--- a/petastorm/hdfs/tests/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright (c) 2017-2018 Uber Technologies, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/petastorm/hdfs/tests/test_hdfs_namenode.py b/petastorm/hdfs/tests/test_hdfs_namenode.py
deleted file mode 100644
index 8aad60f18..000000000
--- a/petastorm/hdfs/tests/test_hdfs_namenode.py
+++ /dev/null
@@ -1,515 +0,0 @@
-# Copyright (c) 2017-2018 Uber Technologies, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import pickle
-import textwrap
-import unittest
-from typing import Dict
-
-import pytest
-from pyarrow.lib import ArrowIOError
-
-from unittest import mock
-
-from petastorm.hdfs.namenode import HdfsNamenodeResolver, HdfsConnector, \
- HdfsConnectError, MaxFailoversExceeded, HAHdfsClient, namenode_failover
-
-
-class HC:
- """Hadoop constants for testing convenience"""
- WARP_TURTLE = 'WARP-TURTLE'
- FS_WARP_TURTLE = 'hdfs://{}'.format(WARP_TURTLE)
- DEFAULT_NN = 'default:8020'
- WARP_TURTLE_NN1 = 'some.domain.name.net:8020'
- WARP_TURTLE_NN2 = 'some.other.domain.name.net:8020'
- WARP_TURTLE_PATH = '{}/x/y/z'.format(FS_WARP_TURTLE)
- HADOOP_CONFIG_PATH = '/etc/hadoop'
-
-
-class MockHadoopConfiguration(object):
- def __init__(self):
- self._dict = {}
-
- def get(self, key):
- val = None
- if key in self._dict:
- val = self._dict[key]
- # print('MockHadoopConfiguration: "{}" == "{}"'.format(key, val))
- return val
-
- def set(self, key, val):
- self._dict[key] = val
-
-
-class HdfsNamenodeResolverTest(unittest.TestCase):
- def setUp(self):
- """Initializes a mock hadoop config and a namenode resolver instance, for convenience."""
- self._hadoop_configuration = MockHadoopConfiguration()
- self.suj = HdfsNamenodeResolver(self._hadoop_configuration)
-
- def test_default_hdfs_service_errors(self):
- """Check error cases with connecting to default namenode"""
- # No default yields RuntimeError
- with self.assertRaises(RuntimeError):
- self.suj.resolve_default_hdfs_service()
- # Bad default FS yields IOError
- self._hadoop_configuration.set('fs.defaultFS', 'invalidFS')
- with self.assertRaises(IOError):
- self.suj.resolve_default_hdfs_service()
- # Random FS host yields IOError
- self._hadoop_configuration.set('fs.defaultFS', 'hdfs://random')
- with self.assertRaises(IOError):
- self.suj.resolve_default_hdfs_service()
- # Valid FS host with no namenode defined yields IOError
- self._hadoop_configuration.set('fs.defaultFS', HC.FS_WARP_TURTLE)
- with self.assertRaises(IOError):
- self.suj.resolve_default_hdfs_service()
-
- def test_default_hdfs_service_typical(self):
- """Check typical cases resolving default namenode"""
- # One nn
- self._hadoop_configuration.set('fs.defaultFS', HC.FS_WARP_TURTLE)
- self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), 'nn1')
- self._hadoop_configuration.set(
- 'dfs.namenode.rpc-address.{}.nn1'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN1)
- nameservice, namenodes = self.suj.resolve_default_hdfs_service()
- self.assertEqual(HC.WARP_TURTLE, nameservice)
- self.assertEqual(HC.WARP_TURTLE_NN1, namenodes[0])
-
- # Second of two nns, when the first is undefined
- self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), 'nn2,nn1')
- with self.assertRaises(RuntimeError):
- self.suj.resolve_default_hdfs_service()
-
- # Two valid and defined nns
- self._hadoop_configuration.set(
- 'dfs.namenode.rpc-address.{}.nn2'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN2)
- nameservice, namenodes = self.suj.resolve_default_hdfs_service()
- self.assertEqual(HC.WARP_TURTLE, nameservice)
- self.assertEqual(HC.WARP_TURTLE_NN2, namenodes[0])
- self.assertEqual(HC.WARP_TURTLE_NN1, namenodes[1])
-
- def test_resolve_hdfs_name_service(self):
- """Check edge cases with resolving a nameservice"""
- # Most cases already covered by test_default_hdfs_service_ok above...
- # Empty config or no namespace yields None
- self.assertIsNone(HdfsNamenodeResolver({}).resolve_hdfs_name_service(''))
- self.assertIsNone(self.suj.resolve_hdfs_name_service(''))
-
- # Test a single undefined namenode case, as well as an unconventional multi-NN case;
- # both result in an exception raised
- self._hadoop_configuration.set('fs.defaultFS', HC.FS_WARP_TURTLE)
- self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), 'nn1')
- with self.assertRaises(RuntimeError):
- self.suj.resolve_hdfs_name_service(HC.WARP_TURTLE)
-
- # Test multiple undefined NNs, which will also throw HdfsConnectError
- nns = 'nn1,nn2,nn3,nn4,nn5,nn6,nn7,nn8'
- self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), nns)
- with self.assertRaises(RuntimeError):
- self.suj.resolve_hdfs_name_service(HC.WARP_TURTLE)
-
-
-@pytest.fixture()
-def mock_hadoop_home_directory(tmpdir):
- """Create hadoop site files once"""
- tmpdir_path = tmpdir.strpath
- os.makedirs('{}{}'.format(tmpdir_path, HC.HADOOP_CONFIG_PATH))
- with open('{}{}/core-site.xml'.format(tmpdir_path, HC.HADOOP_CONFIG_PATH), 'wt') as f:
- f.write(textwrap.dedent("""\
-
-
-
-
- fs.defaultFS
- hdfs://{0}
-
-
- """.format(HC.WARP_TURTLE)))
- with open('{}{}/hdfs-site.xml'.format(tmpdir_path, HC.HADOOP_CONFIG_PATH), 'wt') as f:
- f.write(textwrap.dedent("""\
-
-
-
-
- dfs.ha.namenodes.{0}
- nn2,nn1
-
-
- dfs.namenode.rpc-address.{0}.nn1
- {1}
-
-
- dfs.namenode.rpc-address.{0}.nn2
- {2}
-
-
- dfs.ha.namenodes.foobar
- nn
-
-
- """.format(HC.WARP_TURTLE, HC.WARP_TURTLE_NN1, HC.WARP_TURTLE_NN2)))
- return tmpdir_path
-
-
-def _test_default_hdfs_service(mock_hadoop_home_directory, env_var):
- # Trigger env var evaluation
- suj = HdfsNamenodeResolver()
- assert env_var == suj._hadoop_env
- assert mock_hadoop_home_directory == suj._hadoop_path
- # List of namenodes returned nominally
- nameservice, namenodes = suj.resolve_default_hdfs_service()
- assert HC.WARP_TURTLE == nameservice
- assert HC.WARP_TURTLE_NN2 == namenodes[0]
- assert HC.WARP_TURTLE_NN1 == namenodes[1]
- # Exception raised for badly defined nameservice (XML issue)
- with pytest.raises(RuntimeError):
- suj.resolve_hdfs_name_service('foobar')
- # None for nonexistent nameservice (intentional design)
- assert suj.resolve_hdfs_name_service('nonexistent') is None
-
-
-def test_env_hadoop_home_prefix_install(mock_hadoop_home_directory):
- # The second+third env vars won't cause an error
- with mock.patch.dict(os.environ, {'HADOOP_PREFIX': '{}/no/where/here'.format(mock_hadoop_home_directory),
- 'HADOOP_INSTALL': '{}/no/where/here'.format(mock_hadoop_home_directory),
- 'HADOOP_HOME': mock_hadoop_home_directory}, clear=True):
- _test_default_hdfs_service(mock_hadoop_home_directory, 'HADOOP_HOME')
-
-
-def test_env_hadoop_prefix_only(mock_hadoop_home_directory):
- with mock.patch.dict(os.environ, {'HADOOP_PREFIX': mock_hadoop_home_directory}, clear=True):
- _test_default_hdfs_service(mock_hadoop_home_directory, 'HADOOP_PREFIX')
-
-
-def test_env_hadoop_install_only(mock_hadoop_home_directory):
- with mock.patch.dict(os.environ, {'HADOOP_INSTALL': mock_hadoop_home_directory}, clear=True):
- _test_default_hdfs_service(mock_hadoop_home_directory, 'HADOOP_INSTALL')
-
-
-def test_env_bad_hadoop_home_with_hadoop_install(mock_hadoop_home_directory):
- with mock.patch.dict(os.environ, {'HADOOP_HOME': '{}/no/where/here'.format(mock_hadoop_home_directory),
- 'HADOOP_INSTALL': mock_hadoop_home_directory}, clear=True):
- with pytest.raises(IOError):
- # Trigger env var evaluation
- HdfsNamenodeResolver()
-
-
-def test_unmatched_env_var(mock_hadoop_home_directory):
- with mock.patch.dict(os.environ, {'HADOOP_HOME_X': mock_hadoop_home_directory}, clear=True):
- suj = HdfsNamenodeResolver()
- # No successful connection
- with pytest.raises(RuntimeError):
- suj.resolve_default_hdfs_service()
-
-
-def test_bad_hadoop_path(mock_hadoop_home_directory):
- with mock.patch.dict(os.environ, {'HADOOP_HOME': '{}/no/where/here'.format(mock_hadoop_home_directory)},
- clear=True):
- with pytest.raises(IOError):
- HdfsNamenodeResolver()
-
-
-def test_missing_or_empty_core_site(mock_hadoop_home_directory):
- with mock.patch.dict(os.environ, {'HADOOP_HOME': mock_hadoop_home_directory}):
- # Make core-site "disappear" and make sure we raise an error
- cur_path = '{}{}/core-site.xml'.format(mock_hadoop_home_directory, HC.HADOOP_CONFIG_PATH)
- new_path = '{}{}/core-site.xml.bak'.format(mock_hadoop_home_directory, HC.HADOOP_CONFIG_PATH)
- os.rename(cur_path, new_path)
- with pytest.raises(IOError):
- HdfsNamenodeResolver()
- # Make an empty file
- with open(cur_path, 'wt') as f:
- f.write('')
- # Re-trigger env var evaluation
- suj = HdfsNamenodeResolver()
- with pytest.raises(RuntimeError):
- suj.resolve_default_hdfs_service()
- # restore file for other tests to work
- os.rename(new_path, cur_path)
-
-
-class HdfsMockError(Exception):
- pass
-
-
-class MockHdfs(object):
- """
- Any operation in the mock class raises an exception for the first N failovers, and then returns
- True after those N calls.
- """
-
- def __init__(self, n_failovers=0, user=None):
- self._n_failovers = n_failovers
- self._user = user
-
- def __getattribute__(self, attr):
- """
- The Mock HDFS simply calls check_failover, regardless of the filesystem operator invoked.
- """
-
- def op(*args, **kwargs):
- """ Mock operator """
- return self._check_failovers()
-
- # Of course, exclude any protected/private method calls
- if not attr.startswith('_'):
- return op
- return object.__getattribute__(self, attr)
-
- def _check_failovers(self):
- if self._n_failovers == -1:
- # Special case to exercise the unhandled exception path
- raise HdfsMockError('Some random HDFS exception!')
-
- if self._n_failovers > 0:
- self._n_failovers -= 1
- raise ArrowIOError('org.apache.hadoop.ipc.RemoteException'
- '(org.apache.hadoop.ipc.StandbyException): '
- 'Operation category READ is not supported in state standby. '
- 'Visit https://s.apache.org/sbnn-error\n'
- '{} namenode failover(s) remaining!'.format(self._n_failovers))
- return True
-
- def __reduce__(self):
- raise AssertionError('A connection object can not be pickled. If we try to pickle it, it means '
- 'it leaks somehow with a closure that holds it and we need to make sure it '
- 'does not happen.')
-
-
-class MockHdfsConnector(HdfsConnector):
- # static member for static hdfs_connect_namenode to access
- _n_failovers = 0
- _fail_n_next_connect = 0
- _connect_attempted: Dict[str, int] = {}
-
- @classmethod
- def reset(cls):
- cls._n_failovers = 0
- cls._fail_n_next_connect = 0
- cls._connect_attempted = {}
-
- @classmethod
- def set_n_failovers(cls, failovers):
- cls._n_failovers = failovers
-
- @classmethod
- def set_fail_n_next_connect(cls, fails):
- cls._fail_n_next_connect = fails
-
- @classmethod
- def connect_attempted(cls, host):
- if host in cls._connect_attempted:
- return cls._connect_attempted[host]
- else:
- return 0
-
- @classmethod
- def hdfs_connect_namenode(cls, url, driver='libhdfs3', user=None):
- netloc = '{}:{}'.format(url.hostname or 'default', url.port or 8020)
- if netloc not in cls._connect_attempted:
- cls._connect_attempted[netloc] = 0
- cls._connect_attempted[netloc] += 1
- # We just want to check connection attempt, but also raise an error if
- # 'default' or fail counter
- if cls._fail_n_next_connect != 0 or netloc == HC.DEFAULT_NN:
- if cls._fail_n_next_connect != 0:
- cls._fail_n_next_connect -= 1
- raise ArrowIOError('ERROR! Mock pyarrow hdfs connect to {} using driver {}, '
- 'fail counter: {}'
- .format(netloc, driver, cls._fail_n_next_connect))
- # Return a mock HDFS object with optional failovers, so that this connector mock can
- # be shared for the HAHdfsClient failover tests below.
- hdfs = MockHdfs(cls._n_failovers, user=user)
- if cls._n_failovers > 0:
- cls._n_failovers -= 1
- return hdfs
-
-
-class HdfsConnectorTest(unittest.TestCase):
- """Check correctness of connecting to a list of namenodes. """
-
- @classmethod
- def setUpClass(cls):
- """Initializes a mock HDFS namenode connector to track connection attempts."""
- cls.NAMENODES = [HC.WARP_TURTLE_NN1, HC.WARP_TURTLE_NN2]
- cls.suj = MockHdfsConnector()
-
- def setUp(self):
- self.suj.reset()
-
- def test_connect_to_either_namenode_ok(self):
- """ Test connecting OK to first of name node URLs. """
- self.assertIsNotNone(self.suj.connect_to_either_namenode(self.NAMENODES))
- self.assertEqual(0, self.suj.connect_attempted(HC.DEFAULT_NN))
- self.assertEqual(1, self.suj.connect_attempted(HC.WARP_TURTLE_NN1))
- self.assertEqual(0, self.suj.connect_attempted(HC.WARP_TURTLE_NN2))
-
- def test_connect_to_either_with_user(self):
- mock_name = "mock-manager"
- mocked_hdfs = self.suj.connect_to_either_namenode(self.NAMENODES, user=mock_name)
- self.assertEqual(mocked_hdfs._user, mock_name)
-
- def test_connect_to_either_namenode_ok_one_failed(self):
- """ With one failver, test that both namenode URLS are attempted, with 2nd connected. """
- self.suj.set_fail_n_next_connect(1)
- self.assertIsNotNone(self.suj.connect_to_either_namenode(self.NAMENODES))
- self.assertEqual(0, self.suj.connect_attempted(HC.DEFAULT_NN))
- self.assertEqual(1, self.suj.connect_attempted(HC.WARP_TURTLE_NN1))
- self.assertEqual(1, self.suj.connect_attempted(HC.WARP_TURTLE_NN2))
-
- def test_connect_to_either_namenode_exception_two_failed(self):
- """ With 2 failvers, test no connection, and no exception is raised. """
- self.suj.set_fail_n_next_connect(2)
- with self.assertRaises(HdfsConnectError):
- self.suj.connect_to_either_namenode(self.NAMENODES)
- self.assertEqual(0, self.suj.connect_attempted(HC.DEFAULT_NN))
- self.assertEqual(1, self.suj.connect_attempted(HC.WARP_TURTLE_NN1))
- self.assertEqual(1, self.suj.connect_attempted(HC.WARP_TURTLE_NN2))
-
- def test_connect_to_either_namenode_exception_four_failed(self):
- """ With 4 failvers, test that exception is raised. """
- self.suj.set_fail_n_next_connect(4)
- with self.assertRaises(HdfsConnectError):
- self.suj.connect_to_either_namenode(self.NAMENODES)
- with self.assertRaises(HdfsConnectError):
- self.suj.connect_to_either_namenode(self.NAMENODES)
- self.assertEqual(0, self.suj.connect_attempted(HC.DEFAULT_NN))
- self.assertEqual(2, self.suj.connect_attempted(HC.WARP_TURTLE_NN1))
- self.assertEqual(2, self.suj.connect_attempted(HC.WARP_TURTLE_NN2))
-
-
-class HAHdfsClientTest(unittest.TestCase):
- """
- The HDFS testing functions are enumerated explicitly below for simplicity and clarity, but it
- should impose but a minute maintenance overhead, since MockHdfs class requires no enumeration.
- """
-
- @classmethod
- def setUpClass(cls):
- """Initializes namenodes list and mock HDFS namenode connector."""
- cls.NAMENODES = [HC.WARP_TURTLE_NN1, HC.WARP_TURTLE_NN2]
-
- def setUp(self):
- """Reset mock HDFS failover count."""
- MockHdfsConnector.reset()
-
- def test_unhandled_exception(self):
- """Exercise the unhandled exception execution path."""
- MockHdfsConnector.set_n_failovers(-1)
- with self.assertRaises(HdfsMockError) as e:
- getattr(HAHdfsClient(MockHdfsConnector, [HC.WARP_TURTLE_NN1]), 'ls')('random')
- self.assertTrue('random HDFS exception' in str(e.exception))
-
- def test_invalid_namenode_list(self):
- """Make sure robust to invalid namenode list."""
- MockHdfsConnector.set_n_failovers(-1)
- with self.assertRaises(HdfsConnectError) as e:
- getattr(HAHdfsClient(MockHdfsConnector, []), 'ls')('random')
- self.assertTrue('Unable to connect' in str(e.exception))
- with self.assertRaises(HdfsConnectError) as e:
- getattr(HAHdfsClient(MockHdfsConnector, [None]), 'ls')('random')
- self.assertTrue('Unable to connect' in str(e.exception))
-
- def test_client_pickles_correctly(self):
- """
- Does HAHdfsClient pickle properly?
-
- Check that all attributes are equal, with the exception of the HDFS object, which is fine
- as long as the types are the same.
- """
- mock_name = "mock-manager"
- client = HAHdfsClient(MockHdfsConnector, self.NAMENODES, user=mock_name)
- client_unpickled = pickle.loads(pickle.dumps(client))
- self.assertEqual(client._connector_cls, client_unpickled._connector_cls)
- self.assertEqual(client._list_of_namenodes, client_unpickled._list_of_namenodes)
- self.assertEqual(client._index_of_nn, client_unpickled._index_of_nn)
- self.assertEqual(client._user, client_unpickled._user)
- self.assertEqual(type(client._hdfs), type(client_unpickled._hdfs))
-
- def _try_failover_combos(self, func, *args, **kwargs):
- """Common tests for each of the known HDFS operators, with varying failover counts."""
- MockHdfsConnector.set_n_failovers(1)
- suj = HAHdfsClient(MockHdfsConnector, self.NAMENODES)
- self.assertTrue(getattr(suj, func)(*args, **kwargs))
-
- MockHdfsConnector.set_n_failovers(namenode_failover.MAX_FAILOVER_ATTEMPTS)
- suj = HAHdfsClient(MockHdfsConnector, self.NAMENODES)
- self.assertTrue(getattr(suj, func)(*args, **kwargs))
-
- MockHdfsConnector.set_n_failovers(namenode_failover.MAX_FAILOVER_ATTEMPTS + 1)
- suj = HAHdfsClient(MockHdfsConnector, self.NAMENODES)
- with self.assertRaises(MaxFailoversExceeded) as e:
- getattr(suj, func)(*args, **kwargs)
- self.assertEqual(len(e.exception.failed_exceptions),
- namenode_failover.MAX_FAILOVER_ATTEMPTS + 1)
- self.assertEqual(e.exception.max_failover_attempts, namenode_failover.MAX_FAILOVER_ATTEMPTS)
- self.assertEqual(e.exception.__name__, func)
- self.assertTrue('Failover attempts exceeded' in str(e.exception))
-
- def test_cat(self):
- self._try_failover_combos('cat', 'random')
-
- def test_chmod(self):
- self._try_failover_combos('chmod', 'random', 0)
-
- def test_chown(self):
- self._try_failover_combos('chown', 'random', 'user')
-
- def test_delete(self):
- self._try_failover_combos('delete', 'random', recursive=True)
-
- def test_df(self):
- self._try_failover_combos('df')
-
- def test_disk_usage(self):
- self._try_failover_combos('disk_usage', 'random')
-
- def test_download(self):
- self._try_failover_combos('download', 'random', None)
-
- def test_exists(self):
- self._try_failover_combos('exists', 'random')
-
- def test_get_capacity(self):
- self._try_failover_combos('get_capacity')
-
- def test_get_space_used(self):
- self._try_failover_combos('get_space_used')
-
- def test_info(self):
- self._try_failover_combos('info', 'random')
-
- def test_ls(self):
- self._try_failover_combos('ls', 'random', detail=True)
-
- def test_mkdir(self):
- self._try_failover_combos('mkdir', 'random', create_parents=False)
-
- def test_open(self):
- self._try_failover_combos('open', 'random', 'rb')
-
- def test_rename(self):
- self._try_failover_combos('rename', 'random', 'new_random')
-
- def test_rm(self):
- self._try_failover_combos('rm', 'random', recursive=True)
-
- def test_upload(self):
- self._try_failover_combos('upload', 'random', None)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/petastorm/py_dict_reader_worker.py b/petastorm/py_dict_reader_worker.py
index 86f9941e7..d74b9a4b4 100644
--- a/petastorm/py_dict_reader_worker.py
+++ b/petastorm/py_dict_reader_worker.py
@@ -22,7 +22,6 @@
from petastorm import utils
from petastorm.cache import NullCache
-from petastorm.compat import compat_piece_read
from petastorm.workers_pool import EmptyResultError
from petastorm.workers_pool.worker_base import WorkerBase
@@ -136,7 +135,7 @@ def process(self, piece_index, worker_predicate, shuffle_row_drop_partition):
self._dataset = pq.ParquetDataset(
self._dataset_path,
filesystem=self._filesystem,
- validate_schema=False, filters=self._arrow_filters)
+ validate_schema=False, filters=self._arrow_filters, use_legacy_dataset=True)
piece = self._split_pieces[piece_index]
@@ -255,8 +254,8 @@ def _load_rows_with_predicate(self, pq_file, piece, worker_predicate, shuffle_ro
def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_drop_partition):
# If integer_object_nulls is set to False, nullable integer fields are return as floats
# with nulls translated to nans
- data_frame = compat_piece_read(piece, lambda _: pq_file, columns=column_names,
- partitions=self._dataset.partitions).to_pandas(integer_object_nulls=True)
+ data_frame = piece.read(columns=column_names, partitions=self._dataset.partitions).to_pandas(
+ integer_object_nulls=True)
num_rows = len(data_frame)
num_partitions = shuffle_row_drop_partition[1]
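With the compat shim removed, the dict reader worker calls the legacy ParquetDatasetPiece.read() directly. A minimal sketch of that call path, assuming a hypothetical local dataset and column name:

    import pyarrow.parquet as pq

    # Sketch only: open the dataset the same way the worker does, with the
    # legacy implementation that still exposes .pieces and .partitions.
    dataset = pq.ParquetDataset('/tmp/example_dataset', validate_schema=False,
                                use_legacy_dataset=True)
    piece = dataset.pieces[0]
    # read() takes the requested columns plus the dataset partitioning;
    # integer_object_nulls=True keeps nullable ints from becoming floats.
    table = piece.read(columns=['id'], partitions=dataset.partitions)
    data_frame = table.to_pandas(integer_object_nulls=True)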
diff --git a/petastorm/pyarrow_helpers/tests/test_batch_buffer.py b/petastorm/pyarrow_helpers/tests/test_batch_buffer.py
index 2b6de8500..cadd19171 100644
--- a/petastorm/pyarrow_helpers/tests/test_batch_buffer.py
+++ b/petastorm/pyarrow_helpers/tests/test_batch_buffer.py
@@ -15,7 +15,6 @@
import numpy as np
import pyarrow as pa
-from petastorm.compat import compat_column_data
from petastorm.pyarrow_helpers.batching_table_queue import BatchingTableQueue
@@ -43,16 +42,16 @@ def test_single_table_of_10_rows_added_and_2_batches_of_4_read():
next_batch = batcher.get()
assert 4 == next_batch.num_rows
- np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(0, 4)))
- np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(0, 4)))
+ np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(0, 4)))
+ np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(0, 4)))
# Get second batch of 4
assert not batcher.empty()
next_batch = batcher.get()
assert 4 == next_batch.num_rows
- np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(4, 8)))
- np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(4, 8)))
+ np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(4, 8)))
+ np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(4, 8)))
# No more batches available
assert batcher.empty()
@@ -77,8 +76,8 @@ def test_two_tables_of_10_added_reading_5_batches_of_4():
assert 4 == next_batch.num_rows
expected_values = list(range(i * 4, i * 4 + 4))
- np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), expected_values)
- np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), expected_values)
+ np.testing.assert_equal(next_batch.column(0).to_pylist(), expected_values)
+ np.testing.assert_equal(next_batch.column(1).to_pylist(), expected_values)
def test_read_batches_larger_than_a_table_added():
@@ -94,15 +93,15 @@ def test_read_batches_larger_than_a_table_added():
next_batch = batcher.get()
assert 4 == next_batch.num_rows
- np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(0, 4)))
- np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(0, 4)))
+ np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(0, 4)))
+ np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(0, 4)))
assert not batcher.empty()
next_batch = batcher.get()
assert 4 == next_batch.num_rows
- np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(4, 8)))
- np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(4, 8)))
+ np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(4, 8)))
+ np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(4, 8)))
assert batcher.empty()
@@ -144,7 +143,7 @@ def test_random_table_size_and_random_batch_sizes():
for _ in range(next_read):
if not batcher.empty():
read_batch = batcher.get()
- for value in compat_column_data(read_batch.columns[0]):
+ for value in read_batch.columns[0]:
assert value.as_py() == read_seq
read_seq += 1
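The compat_column_data wrapper existed because older pyarrow returned Column objects from Table.column(); from pyarrow 0.15 on the call returns a ChunkedArray, which is why the tests above call to_pylist() and iterate values directly. A small self-contained illustration with a made-up table:

    import pyarrow as pa

    table = pa.Table.from_pydict({'a': list(range(10)), 'b': list(range(10))})
    # Table.column() returns a ChunkedArray, so no .data indirection is needed.
    assert table.column(0).to_pylist() == list(range(10))
    # Iterating a ChunkedArray yields pyarrow scalars; .as_py() unwraps them.
    assert [v.as_py() for v in table.columns[1]] == list(range(10))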
diff --git a/petastorm/reader.py b/petastorm/reader.py
index 9ca6d3c61..2b1cc79cb 100644
--- a/petastorm/reader.py
+++ b/petastorm/reader.py
@@ -386,7 +386,7 @@ def __init__(self, pyarrow_filesystem, dataset_path, schema_fields=None,
# 1. Resolve dataset path (hdfs://, file://) and open the parquet storage (dataset)
self.dataset = pq.ParquetDataset(dataset_path, filesystem=pyarrow_filesystem,
validate_schema=False, metadata_nthreads=10,
- filters=filters)
+ filters=filters, use_legacy_dataset=True)
if self.dataset.partitions is None:
# When read from parquet file list, the `dataset.partitions` will be None.
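pyarrow 1.0 switched ParquetDataset to a new implementation by default, which drops the pieces/partitions attributes the reader relies on; use_legacy_dataset=True pins the old behaviour while keeping partition filters working. An illustrative call (path and filter are hypothetical):

    import pyarrow.parquet as pq

    # Hypothetical path and filter, shown only to illustrate the flag.
    dataset = pq.ParquetDataset('/tmp/example_dataset',
                                validate_schema=False,
                                metadata_nthreads=10,
                                filters=[('year', '=', 2020)],
                                use_legacy_dataset=True)
    print(len(dataset.pieces), dataset.partitions)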
diff --git a/petastorm/reader_impl/arrow_table_serializer.py b/petastorm/reader_impl/arrow_table_serializer.py
index b7ce31c6a..b2ea143ac 100644
--- a/petastorm/reader_impl/arrow_table_serializer.py
+++ b/petastorm/reader_impl/arrow_table_serializer.py
@@ -28,6 +28,6 @@ def serialize(self, rows):
return sink.getvalue()
def deserialize(self, serialized_rows):
- reader = pa.open_stream(serialized_rows)
+ reader = pa.ipc.open_stream(serialized_rows)
table = reader.read_all()
return table
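pa.open_stream() was dropped from the top-level pyarrow namespace; the stream reader now lives under pa.ipc. A round-trip sketch of the same serialize/deserialize flow with made-up table contents:

    import pyarrow as pa

    table = pa.Table.from_pydict({'x': [1, 2, 3]})

    # Serialize the table as an IPC stream into an in-memory buffer.
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    serialized_rows = sink.getvalue()

    # Deserialize via the ipc namespace, as the serializer now does.
    reader = pa.ipc.open_stream(serialized_rows)
    assert reader.read_all().equals(table)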
diff --git a/petastorm/spark/spark_dataset_converter.py b/petastorm/spark/spark_dataset_converter.py
index 946b90709..7c3fd547b 100644
--- a/petastorm/spark/spark_dataset_converter.py
+++ b/petastorm/spark/spark_dataset_converter.py
@@ -25,14 +25,13 @@
from typing import List, Any
import pyspark
-from pyarrow import LocalFileSystem
+from pyarrow import fs
from pyspark.sql.session import SparkSession
from pyspark.sql.types import ArrayType, DoubleType, FloatType
from six.moves.urllib.parse import urlparse
from petastorm import make_batch_reader
-from petastorm.fs_utils import (FilesystemResolver,
- get_filesystem_and_path_or_paths, normalize_dir_url)
+from petastorm.fs_utils import get_filesystem_and_path_or_paths, normalize_dir_url
if LooseVersion(pyspark.__version__) < LooseVersion('3.0'):
def vector_to_array(_1, _2='float32'):
@@ -79,10 +78,9 @@ def _get_parent_cache_dir_url():
def _default_delete_dir_handler(dataset_url):
- resolver = FilesystemResolver(dataset_url)
- fs = resolver.filesystem()
+ filesystem, path = fs.FileSystem.from_uri(dataset_url)
parsed = urlparse(dataset_url)
- if isinstance(fs, LocalFileSystem):
+ if isinstance(filesystem, fs.LocalFileSystem):
# pyarrow has a bug: LocalFileSystem.delete() is not implemented.
# https://issues.apache.org/jira/browse/ARROW-7953
# We can remove this branch once ARROW-7953 is fixed.
@@ -90,8 +88,8 @@ def _default_delete_dir_handler(dataset_url):
if os.path.exists(local_path):
shutil.rmtree(local_path, ignore_errors=False)
else:
- if fs.exists(parsed.path):
- fs.delete(parsed.path, recursive=True)
+ if filesystem.get_file_info(parsed.path).type == fs.FileType.Directory:
+ filesystem.delete_dir(parsed.path)
_delete_dir_handler = _default_delete_dir_handler
@@ -455,9 +453,9 @@ def _check_url(dir_url):
def _check_parent_cache_dir_url(dir_url):
"""Check dir url whether is suitable to be used as parent cache directory."""
_check_url(dir_url)
- fs, dir_path = get_filesystem_and_path_or_paths(dir_url)
+ filesystem, dir_path = get_filesystem_and_path_or_paths(dir_url)
if 'DATABRICKS_RUNTIME_VERSION' in os.environ and not _is_spark_local_mode():
- if isinstance(fs, LocalFileSystem):
+ if isinstance(filesystem, fs.LocalFileSystem):
# User need to use dbfs fuse URL.
if not dir_path.startswith('/dbfs/'):
logger.warning(
@@ -597,13 +595,13 @@ def _wait_file_available(url_list):
all files are available for reading. This is useful in some filesystems, such as S3 which only
providing eventually consistency.
"""
- fs, path_list = get_filesystem_and_path_or_paths(url_list)
+ filesystem, path_list = get_filesystem_and_path_or_paths(url_list)
logger.debug('Waiting some seconds until all parquet-store files appear at urls %s', ','.join(url_list))
def wait_for_file(path):
end_time = time.time() + _FILE_AVAILABILITY_WAIT_TIMEOUT_SECS
while time.time() < end_time:
- if fs.exists(path):
+ if filesystem.get_file_info(path).type == fs.FileType.File:
return True
time.sleep(0.1)
return False
@@ -622,11 +620,11 @@ def wait_for_file(path):
def _check_dataset_file_median_size(url_list):
- fs, path_list = get_filesystem_and_path_or_paths(url_list)
+ filesystem, path_list = get_filesystem_and_path_or_paths(url_list)
RECOMMENDED_FILE_SIZE_BYTES = 50 * 1024 * 1024
# TODO: also check file size for other file system.
- if isinstance(fs, LocalFileSystem):
+ if isinstance(filesystem, fs.LocalFileSystem):
pool = ThreadPool(64)
try:
file_size_list = pool.map(os.path.getsize, path_list)
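The converter now goes through the new pyarrow.fs API: existence checks become FileInfo queries and recursive deletes become delete_dir(). A sketch of the delete-handler logic in isolation, assuming a local file:// URL:

    from pyarrow import fs

    # Sketch only: resolve a filesystem and path from a URL, then clean up
    # a cache directory that may or may not exist.
    filesystem, path = fs.FileSystem.from_uri('file:///tmp/example_cache')
    info = filesystem.get_file_info(path)
    if info.type == fs.FileType.Directory:
        filesystem.delete_dir(path)   # recursive removal of the directory
    elif info.type == fs.FileType.NotFound:
        pass                          # nothing to clean up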
diff --git a/petastorm/spark_utils.py b/petastorm/spark_utils.py
index ed4678d8e..08e669078 100644
--- a/petastorm/spark_utils.py
+++ b/petastorm/spark_utils.py
@@ -13,11 +13,10 @@
# limitations under the License.
"""A set of Spark specific helper functions for the petastorm dataset"""
-from six.moves.urllib.parse import urlparse
from petastorm import utils
from petastorm.etl.dataset_metadata import get_schema_from_dataset_url
-from petastorm.fs_utils import FilesystemResolver
+from petastorm.fs_utils import get_dataset_path
def dataset_as_rdd(dataset_url, spark_session, schema_fields=None, hdfs_driver='libhdfs3'):
@@ -33,12 +32,7 @@ def dataset_as_rdd(dataset_url, spark_session, schema_fields=None, hdfs_driver='
"""
schema = get_schema_from_dataset_url(dataset_url, hdfs_driver=hdfs_driver)
- dataset_url_parsed = urlparse(dataset_url)
-
- resolver = FilesystemResolver(dataset_url_parsed, spark_session.sparkContext._jsc.hadoopConfiguration(),
- hdfs_driver=hdfs_driver)
-
- dataset_df = spark_session.read.parquet(resolver.get_dataset_path())
+ dataset_df = spark_session.read.parquet(get_dataset_path(dataset_url))
if schema_fields is not None:
# If wanting a subset of fields, create the schema view and run a select on those fields
schema = schema.create_schema_view(schema_fields)
diff --git a/petastorm/tests/test_dataset_metadata.py b/petastorm/tests/test_dataset_metadata.py
index fb617a676..f3228b185 100644
--- a/petastorm/tests/test_dataset_metadata.py
+++ b/petastorm/tests/test_dataset_metadata.py
@@ -14,7 +14,7 @@
import numpy as np
import pytest
-import pyarrow
+from pyarrow import fs
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
@@ -33,9 +33,6 @@ def test_get_schema_from_dataset_url_bogus_url():
with pytest.raises(IOError):
get_schema_from_dataset_url('file:///non-existing-path')
- with pytest.raises(ValueError):
- get_schema_from_dataset_url('/invalid_url')
-
def test_serialize_filesystem_factory(tmpdir):
SimpleSchema = Unischema('SimpleSchema', [
@@ -43,7 +40,7 @@ def test_serialize_filesystem_factory(tmpdir):
UnischemaField('foo', np.int32, (), ScalarCodec(IntegerType()), False),
])
- class BogusFS(pyarrow.LocalFileSystem):
+ class BogusFS(fs.LocalFileSystem):
def __getstate__(self):
raise RuntimeError("can not serialize")
@@ -53,8 +50,8 @@ def __getstate__(self):
spark = SparkSession.builder.config('spark.driver.memory', '2g').master('local[2]').getOrCreate()
sc = spark.sparkContext
with materialize_dataset(spark, output_url, SimpleSchema, rowgroup_size_mb, filesystem_factory=BogusFS):
- rows_rdd = sc.parallelize(range(rows_count))\
- .map(lambda x: {'id': x, 'foo': x})\
+ rows_rdd = sc.parallelize(range(rows_count)) \
+ .map(lambda x: {'id': x, 'foo': x}) \
.map(lambda x: dict_to_spark_row(SimpleSchema, x))
spark.createDataFrame(rows_rdd, SimpleSchema.as_spark_schema()) \
diff --git a/petastorm/tests/test_end_to_end.py b/petastorm/tests/test_end_to_end.py
index 2af672a2e..c04531e68 100644
--- a/petastorm/tests/test_end_to_end.py
+++ b/petastorm/tests/test_end_to_end.py
@@ -12,20 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import tempfile
import operator
import os
+import tempfile
from concurrent.futures import ThreadPoolExecutor
from shutil import rmtree, copytree
-from six.moves.urllib.parse import urlparse
+from unittest import mock
import numpy as np
-import pyarrow.hdfs
import pytest
+from pyarrow import fs
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, ShortType, StringType
-
-from unittest import mock
+from six.moves.urllib.parse import urlparse
from petastorm import make_reader, make_batch_reader, TransformSpec
from petastorm.codecs import ScalarCodec, CompressedImageCodec
@@ -783,7 +782,7 @@ def test_pass_in_pyarrow_filesystem_to_materialize_dataset(synthetic_dataset, tm
a_moved_path = tmpdir.join('moved').strpath
copytree(synthetic_dataset.path, a_moved_path)
- local_fs = pyarrow.LocalFileSystem
+ local_fs = fs.LocalFileSystem
os.remove(a_moved_path + '/_common_metadata')
spark = SparkSession.builder.getOrCreate()
diff --git a/petastorm/tests/test_fs_utils.py b/petastorm/tests/test_fs_utils.py
deleted file mode 100644
index 423b6e36a..000000000
--- a/petastorm/tests/test_fs_utils.py
+++ /dev/null
@@ -1,219 +0,0 @@
-# Copyright (c) 2017-2018 Uber Technologies, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import unittest
-
-import dill
-import mock
-from pyarrow.filesystem import LocalFileSystem
-from pyarrow.lib import ArrowIOError
-from six.moves.urllib.parse import urlparse
-import s3fs
-
-from petastorm.fs_utils import FilesystemResolver, get_filesystem_and_path_or_paths
-from petastorm.gcsfs_helpers.gcsfs_wrapper import GCSFSWrapper
-from petastorm.hdfs.tests.test_hdfs_namenode import HC, MockHadoopConfiguration, \
- MockHdfs, MockHdfsConnector
-
-ABS_PATH = '/abs/path'
-
-
-class FilesystemResolverTest(unittest.TestCase):
- """
- Checks the full filesystem resolution functionality, exercising each URL interpretation case.
- """
-
- @classmethod
- def setUpClass(cls):
- cls.mock = MockHdfsConnector()
- cls.mock_name = "mock-manager"
-
- def setUp(self):
- """Initializes a mock hadoop config and populate with basic properties."""
- # Reset counters in mock connector
- self.mock.reset()
- self._hadoop_configuration = MockHadoopConfiguration()
- self._hadoop_configuration.set('fs.defaultFS', HC.FS_WARP_TURTLE)
- self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), 'nn2,nn1')
- self._hadoop_configuration.set('dfs.namenode.rpc-address.{}.nn1'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN1)
- self._hadoop_configuration.set('dfs.namenode.rpc-address.{}.nn2'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN2)
-
- def test_error_url_cases(self):
- """Various error cases that result in exception raised."""
- # Case 1: Schemeless path asserts
- with self.assertRaises(ValueError):
- FilesystemResolver(ABS_PATH, {})
-
- # Case 4b: HDFS default path case with NO defaultFS
- with self.assertRaises(RuntimeError):
- FilesystemResolver('hdfs:///some/path', {})
-
- # Case 4b: Using `default` as host, while apparently a pyarrow convention, is NOT valid
- with self.assertRaises(ArrowIOError):
- FilesystemResolver('hdfs://default', {})
-
- # Case 5: other schemes result in ValueError; urlparse to cover an else branch!
- with self.assertRaises(ValueError):
- FilesystemResolver(urlparse('http://foo/bar'), {})
- with self.assertRaises(ValueError):
- FilesystemResolver(urlparse('ftp://foo/bar'), {})
- with self.assertRaises(ValueError):
- FilesystemResolver(urlparse('ssh://foo/bar'), {})
-
- # s3 paths must have the bucket as the netloc
- with self.assertRaises(ValueError):
- FilesystemResolver(urlparse('s3:///foo/bar'), {})
-
- # GCS paths must have the bucket as the netloc
- with self.assertRaises(ValueError):
- FilesystemResolver(urlparse('gcs:///foo/bar'), {})
-
- def test_file_url(self):
- """ Case 2: File path, agnostic to content of hadoop configuration."""
- suj = FilesystemResolver('file://{}'.format(ABS_PATH), self._hadoop_configuration, connector=self.mock)
- self.assertTrue(isinstance(suj.filesystem(), LocalFileSystem))
- self.assertEqual('', suj.parsed_dataset_url().netloc)
- self.assertEqual(ABS_PATH, suj.get_dataset_path())
-
- # Make sure we did not capture FilesystemResolver in a closure by mistake
- dill.dumps(suj.filesystem_factory())
-
- def test_hdfs_url_with_nameservice(self):
- """ Case 3a: HDFS nameservice."""
- suj = FilesystemResolver(HC.WARP_TURTLE_PATH, self._hadoop_configuration, connector=self.mock,
- user=self.mock_name)
- self.assertEqual(MockHdfs, type(suj.filesystem()._hdfs))
- self.assertEqual(self.mock_name, suj.filesystem()._user)
- self.assertEqual(HC.WARP_TURTLE, suj.parsed_dataset_url().netloc)
- self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
- self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
- self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
-
- # Make sure we did not capture FilesystemResolver in a closure by mistake
- dill.dumps(suj.filesystem_factory())
-
- def test_hdfs_url_no_nameservice(self):
- """ Case 3b: HDFS with no nameservice should connect to default namenode."""
- suj = FilesystemResolver('hdfs:///some/path', self._hadoop_configuration, connector=self.mock,
- user=self.mock_name)
- self.assertEqual(MockHdfs, type(suj.filesystem()._hdfs))
- self.assertEqual(self.mock_name, suj.filesystem()._user)
- self.assertEqual(HC.WARP_TURTLE, suj.parsed_dataset_url().netloc)
- # ensure path is preserved in parsed URL
- self.assertEqual('/some/path', suj.get_dataset_path())
- self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
- self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
- self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
-
- # Make sure we did not capture FilesystemResolver in a closure by mistake
- dill.dumps(suj.filesystem_factory())
-
- def test_hdfs_url_direct_namenode(self):
- """ Case 4: direct namenode."""
- suj = FilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN1),
- self._hadoop_configuration,
- connector=self.mock,
- user=self.mock_name)
- self.assertEqual(MockHdfs, type(suj.filesystem()))
- self.assertEqual(self.mock_name, suj.filesystem()._user)
- self.assertEqual(HC.WARP_TURTLE_NN1, suj.parsed_dataset_url().netloc)
- self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
- self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
- self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
-
- # Make sure we did not capture FilesystemResolver in a closure by mistake
- dill.dumps(suj.filesystem_factory())
-
- def test_hdfs_url_direct_namenode_driver_libhdfs(self):
- suj = FilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN1),
- self._hadoop_configuration,
- connector=self.mock, hdfs_driver='libhdfs', user=self.mock_name)
- self.assertEqual(MockHdfs, type(suj.filesystem()))
- self.assertEqual(self.mock_name, suj.filesystem()._user)
- # Make sure we did not capture FilesystemResolver in a closure by mistake
- dill.dumps(suj.filesystem_factory())
-
- def test_hdfs_url_direct_namenode_retries(self):
- """ Case 4: direct namenode fails first two times thru, but 2nd retry succeeds."""
- self.mock.set_fail_n_next_connect(2)
- with self.assertRaises(ArrowIOError):
- suj = FilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
- self._hadoop_configuration,
- connector=self.mock, user=self.mock_name)
- self.assertEqual(1, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
- self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
- self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
- with self.assertRaises(ArrowIOError):
- suj = FilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
- self._hadoop_configuration,
- connector=self.mock)
- self.assertEqual(2, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
- self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
- self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
- # this one should connect "successfully"
- suj = FilesystemResolver('hdfs://{}/path'.format(HC.WARP_TURTLE_NN2),
- self._hadoop_configuration,
- connector=self.mock, user=self.mock_name)
- self.assertEqual(MockHdfs, type(suj.filesystem()))
- self.assertEqual(self.mock_name, suj.filesystem()._user)
- self.assertEqual(HC.WARP_TURTLE_NN2, suj.parsed_dataset_url().netloc)
- self.assertEqual(3, self.mock.connect_attempted(HC.WARP_TURTLE_NN2))
- self.assertEqual(0, self.mock.connect_attempted(HC.WARP_TURTLE_NN1))
- self.assertEqual(0, self.mock.connect_attempted(HC.DEFAULT_NN))
-
- def test_s3_without_s3fs(self):
- with mock.patch.dict('sys.modules', s3fs=None):
- # `import s3fs` will fail in this context
- with self.assertRaises(ValueError):
- FilesystemResolver(urlparse('s3://foo/bar'), {})
-
- def test_s3_url(self):
- suj = FilesystemResolver('s3://bucket{}'.format(ABS_PATH), self._hadoop_configuration, connector=self.mock)
- self.assertTrue(isinstance(suj.filesystem(), s3fs.S3FileSystem))
- self.assertEqual('bucket', suj.parsed_dataset_url().netloc)
- self.assertEqual('bucket' + ABS_PATH, suj.get_dataset_path())
-
- # Make sure we did not capture FilesystemResolver in a closure by mistake
- dill.dumps(suj.filesystem_factory())
-
- def test_gcs_without_gcsfs(self):
- with mock.patch.dict('sys.modules', gcsfs=None):
- # `import gcsfs` will fail in this context
- with self.assertRaises(ValueError):
- FilesystemResolver(urlparse('gcs://foo/bar'), {})
-
- def test_gcs_url(self):
- suj = FilesystemResolver('gcs://bucket{}'.format(ABS_PATH), self._hadoop_configuration, connector=self.mock)
- self.assertTrue(isinstance(suj.filesystem(), GCSFSWrapper))
- self.assertEqual('bucket', suj.parsed_dataset_url().netloc)
- self.assertEqual('bucket' + ABS_PATH, suj.get_dataset_path())
-
- # Make sure we did not capture FilesystemResolver in a closure by mistake
- dill.dumps(suj.filesystem_factory())
-
- def test_get_filesystem_and_path_or_paths(self):
- fs1, path1 = get_filesystem_and_path_or_paths('file:///some/path')
- assert isinstance(fs1, LocalFileSystem) and path1 == '/some/path'
-
- fs2, paths2 = get_filesystem_and_path_or_paths(
- ['file:///some/path/01.parquet', 'file:///some/path/02.parquet'])
- assert isinstance(fs2, LocalFileSystem) and \
- paths2 == ['/some/path/01.parquet', '/some/path/02.parquet']
-
- with self.assertRaises(ValueError):
- get_filesystem_and_path_or_paths(
- ['file:///some/path/01.parquet', 'hdfs:///some/path/02.parquet'])
-
-
-if __name__ == '__main__':
- unittest.main()
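test_fs_utils.py is deleted together with FilesystemResolver; URL-to-filesystem resolution in the remaining code goes through get_filesystem_and_path_or_paths and pyarrow.fs. A minimal sketch of the new-style resolution (URLs are illustrative):

    from pyarrow import fs

    # fs.FileSystem.from_uri picks the filesystem implementation from the URI
    # scheme itself, which is what made the resolver machinery redundant.
    local_fs, local_path = fs.FileSystem.from_uri('file:///some/path')
    assert isinstance(local_fs, fs.LocalFileSystem)
    assert local_path == '/some/path'
    # hdfs:// and s3:// URIs resolve the same way, provided the relevant
    # drivers and credentials are available (not exercised here).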
diff --git a/petastorm/tests/test_parquet_reader.py b/petastorm/tests/test_parquet_reader.py
index e149c8183..33ff89112 100644
--- a/petastorm/tests/test_parquet_reader.py
+++ b/petastorm/tests/test_parquet_reader.py
@@ -21,7 +21,6 @@
from petastorm import make_batch_reader
from petastorm.arrow_reader_worker import ArrowReaderWorker
# pylint: disable=unnecessary-lambda
-from petastorm.compat import compat_get_metadata
from petastorm.tests.test_common import create_test_scalar_dataset
from petastorm.transform import TransformSpec
from petastorm.unischema import UnischemaField
@@ -128,7 +127,7 @@ def test_asymetric_parquet_pieces(reader_factory, tmpdir):
# We verify we have pieces with different number of row-groups
dataset = pq.ParquetDataset(tmpdir.strpath)
- row_group_counts = set(compat_get_metadata(piece, dataset.fs.open).num_row_groups for piece in dataset.pieces)
+ row_group_counts = set(piece.get_metadata().num_row_groups for piece in dataset.pieces)
assert len(row_group_counts) > 1
# Make sure we are not missing any rows.
diff --git a/petastorm/tests/test_spark_dataset_converter.py b/petastorm/tests/test_spark_dataset_converter.py
index 0ae3bfb66..99690c10f 100644
--- a/petastorm/tests/test_spark_dataset_converter.py
+++ b/petastorm/tests/test_spark_dataset_converter.py
@@ -25,6 +25,7 @@
import pytest
import py4j
import tensorflow.compat.v1 as tf # pylint: disable=import-error
+from pyarrow import fs
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import (ArrayType, BinaryType, BooleanType, ByteType,
DoubleType, FloatType, IntegerType, LongType,
@@ -32,7 +33,6 @@
from six.moves.urllib.parse import urlparse
from petastorm import make_batch_reader
-from petastorm.fs_utils import FilesystemResolver
from petastorm.spark import (SparkDatasetConverter, make_spark_converter,
spark_dataset_converter)
from petastorm.spark.spark_dataset_converter import (
@@ -151,8 +151,8 @@ def test_atexit(spark_test_ctx):
with open(os.path.join(spark_test_ctx.tempdir, 'test_atexit.out')) as f:
cache_dir_url = f.read()
- fs = FilesystemResolver(cache_dir_url).filesystem()
- assert not fs.exists(urlparse(cache_dir_url).path)
+ filesystem, path = fs.FileSystem.from_uri(cache_dir_url)
+ assert filesystem.get_file_info(path).type == fs.FileType.NotFound
def test_set_delete_handler(spark_test_ctx):
diff --git a/petastorm/tests/test_unischema.py b/petastorm/tests/test_unischema.py
index a7904b54f..62201e917 100644
--- a/petastorm/tests/test_unischema.py
+++ b/petastorm/tests/test_unischema.py
@@ -33,7 +33,8 @@
def _mock_parquet_dataset(partitions, arrow_schema):
"""Creates a pyarrow.ParquetDataset mock capable of returning:
- parquet_dataset.pieces[0].get_metadata(parquet_dataset.fs.open).schema.to_arrow_schema() == schema
+ parquet_dataset.pieces[0].get_metadata().schema.to_arrow_schema() == schema
parquet_dataset.partitions = partitions
:param partitions: expected to be a list of pa.parquet.PartitionSet
diff --git a/petastorm/tools/copy_dataset.py b/petastorm/tools/copy_dataset.py
index 8d4c264ff..5d78fe221 100644
--- a/petastorm/tools/copy_dataset.py
+++ b/petastorm/tools/copy_dataset.py
@@ -25,10 +25,9 @@
from pyspark.sql import SparkSession
-from petastorm.unischema import match_unischema_fields
from petastorm.etl.dataset_metadata import materialize_dataset, get_schema_from_dataset_url
from petastorm.tools.spark_session_cli import add_configure_spark_arguments, configure_spark
-from petastorm.fs_utils import FilesystemResolver
+from petastorm.unischema import match_unischema_fields
def copy_dataset(spark, source_url, target_url, field_regex, not_null_fields, overwrite_output, partitions_count,
@@ -67,10 +66,7 @@ def copy_dataset(spark, source_url, target_url, field_regex, not_null_fields, ov
else:
subschema = schema
- resolver = FilesystemResolver(target_url, spark.sparkContext._jsc.hadoopConfiguration(),
- hdfs_driver=hdfs_driver, user=spark.sparkContext.sparkUser())
- with materialize_dataset(spark, target_url, subschema, row_group_size_mb,
- filesystem_factory=resolver.filesystem_factory()):
+ with materialize_dataset(spark, target_url, subschema, row_group_size_mb):
data_frame = spark.read \
.parquet(source_url)
diff --git a/petastorm/unischema.py b/petastorm/unischema.py
index 33cd662f3..9c2902407 100644
--- a/petastorm/unischema.py
+++ b/petastorm/unischema.py
@@ -30,8 +30,6 @@
from pyarrow.lib import StructType as pyStructType
from six import string_types
-from petastorm.compat import compat_get_metadata, compat_schema_field
-
# _UNISCHEMA_FIELD_ORDER available values are 'preserve_input_order' or 'alphabetical'
# Current default behavior is 'preserve_input_order', the legacy behavior is 'alphabetical', which is deprecated and
# will be removed in future versions.
@@ -316,7 +314,7 @@ def from_arrow_schema(cls, parquet_dataset, omit_unsupported_fields=False):
:param omit_unsupported_fields: :class:`Boolean`
:return: A :class:`Unischema` object.
"""
- meta = compat_get_metadata(parquet_dataset.pieces[0], parquet_dataset.fs.open)
+ meta = parquet_dataset.pieces[0].get_metadata()
arrow_schema = meta.schema.to_arrow_schema()
unischema_fields = []
@@ -333,7 +331,7 @@ def from_arrow_schema(cls, parquet_dataset, omit_unsupported_fields=False):
unischema_fields.append(UnischemaField(partition.name, numpy_dtype, (), None, False))
for column_name in arrow_schema.names:
- arrow_field = compat_schema_field(arrow_schema, column_name)
+ arrow_field = arrow_schema.field(column_name)
field_type = arrow_field.type
field_shape = ()
if isinstance(field_type, ListType):
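With compat_get_metadata and compat_schema_field gone, schema discovery calls the pyarrow accessors directly. A sketch of the equivalent calls against a hypothetical legacy dataset:

    import pyarrow.parquet as pq

    dataset = pq.ParquetDataset('/tmp/example_dataset', validate_schema=False,
                                use_legacy_dataset=True)
    meta = dataset.pieces[0].get_metadata()            # was compat_get_metadata(...)
    arrow_schema = meta.schema.to_arrow_schema()
    for column_name in arrow_schema.names:
        arrow_field = arrow_schema.field(column_name)  # was compat_schema_field(...)
        print(column_name, arrow_field.type, arrow_field.nullable)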
diff --git a/petastorm/utils.py b/petastorm/utils.py
index 3f6a005d2..89ea8881b 100644
--- a/petastorm/utils.py
+++ b/petastorm/utils.py
@@ -20,9 +20,7 @@
import numpy as np
import pyarrow
from future.utils import raise_with_traceback
-from pyarrow.filesystem import LocalFileSystem
-
-from petastorm.compat import compat_get_metadata, compat_with_metadata
+from pyarrow import fs
logger = logging.getLogger(__name__)
@@ -112,14 +110,14 @@ def add_to_dataset_metadata(dataset, key, value):
with dataset.fs.open(metadata_file_path) as f:
arrow_metadata = pyarrow.parquet.read_metadata(f)
else:
- arrow_metadata = compat_get_metadata(dataset.pieces[0], dataset.fs.open)
+ arrow_metadata = dataset.pieces[0].get_metadata()
base_schema = arrow_metadata.schema.to_arrow_schema()
# base_schema.metadata may be None, e.g.
metadata_dict = base_schema.metadata or dict()
metadata_dict[key] = value
- schema = compat_with_metadata(base_schema, metadata_dict)
+ schema = base_schema.with_metadata(metadata_dict)
with dataset.fs.open(common_metadata_file_path, 'wb') as metadata_file:
pyarrow.parquet.write_metadata(schema, metadata_file)
@@ -127,8 +125,9 @@ def add_to_dataset_metadata(dataset, key, value):
# We have just modified _common_metadata file, but the filesystem implementation used by pyarrow does not
# update the .crc value. We better delete the .crc to make sure there is no mismatch between _common_metadata
# content and the checksum.
- if isinstance(dataset.fs, LocalFileSystem) and dataset.fs.exists(common_metadata_file_crc_path):
- try:
- dataset.fs.rm(common_metadata_file_crc_path)
- except NotImplementedError:
- os.remove(common_metadata_file_crc_path)
+ if isinstance(dataset.fs, fs.LocalFileSystem):
+ if dataset.fs.get_file_info(common_metadata_file_crc_path).is_file:
+ try:
+ dataset.fs.delete_file(common_metadata_file_crc_path)
+ except NotImplementedError:
+ os.remove(common_metadata_file_crc_path)
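Schema.with_metadata() replaces compat_with_metadata for attaching key/value pairs before _common_metadata is rewritten. The step in isolation, with a made-up schema and key:

    import pyarrow as pa
    import pyarrow.parquet as pq

    base_schema = pa.schema([pa.field('id', pa.int64())])
    metadata_dict = dict(base_schema.metadata or {})
    metadata_dict[b'example.key'] = b'example value'   # hypothetical entry

    # with_metadata returns a new schema carrying the merged metadata;
    # write_metadata then emits a metadata-only Parquet footer file.
    schema = base_schema.with_metadata(metadata_dict)
    pq.write_metadata(schema, '/tmp/_common_metadata')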