diff --git a/petastorm/fs_utils.py b/petastorm/fs_utils.py index 6cc45862f..9c09fed67 100644 --- a/petastorm/fs_utils.py +++ b/petastorm/fs_utils.py @@ -17,9 +17,6 @@ import six from six.moves.urllib.parse import urlparse -from petastorm.gcsfs_helpers.gcsfs_wrapper import GCSFSWrapper -from petastorm.hdfs.namenode import HdfsNamenodeResolver, HdfsConnector - logger = logging.getLogger(__name__) @@ -36,168 +33,6 @@ def get_dataset_path(parsed_url): return parsed_url.path -class FilesystemResolver(object): - """Resolves a dataset URL, makes a connection via pyarrow, and provides a filesystem object.""" - - def __init__(self, dataset_url, hadoop_configuration=None, connector=HdfsConnector, - hdfs_driver='libhdfs3', user=None, s3_config_kwargs=None): - """ - Given a dataset URL and an optional hadoop configuration, parse and interpret the URL to - instantiate a pyarrow filesystem. - - Interpretation of the URL ``scheme://hostname:port/path`` occurs in the following order: - - 1. If no ``scheme``, no longer supported, so raise an exception! - 2. If ``scheme`` is ``file``, use local filesystem path. - 3. If ``scheme`` is ``hdfs``: - a. Try the ``hostname`` as a namespace and attempt to connect to a name node. - 1. If that doesn't work, try connecting directly to namenode ``hostname:port``. - b. If no host, connect to the default name node. - 5. If ``scheme`` is ``s3``, use s3fs. The user must manually install s3fs before using s3 - 6. If ``scheme`` is ``gs``or ``gcs``, use gcsfs. The user must manually install gcsfs before using GCS - 7. Fail otherwise. - - :param dataset_url: The hdfs URL or absolute path to the dataset - :param hadoop_configuration: an optional hadoop configuration - :param connector: the HDFS connector object to use (ONLY override for testing purposes) - :param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are - libhdfs (java through JNI) or libhdfs3 (C++) - :param user: String denoting username when connecting to HDFS. None implies login user. - """ - # Cache both the original URL and the resolved, urlparsed dataset_url - self._dataset_url = dataset_url - self._parsed_dataset_url = None - # Cache the instantiated filesystem object - self._filesystem = None - - if isinstance(self._dataset_url, six.string_types): - self._parsed_dataset_url = urlparse(self._dataset_url) - else: - self._parsed_dataset_url = self._dataset_url - - if not self._parsed_dataset_url.scheme: - # Case 1 - raise ValueError('ERROR! A scheme-less dataset url ({}) is no longer supported. ' - 'Please prepend "file://" for local filesystem.'.format(self._parsed_dataset_url.scheme)) - - elif self._parsed_dataset_url.scheme == 'file': - # Case 2: definitely local - self._filesystem = pyarrow.localfs - self._filesystem_factory = lambda: pyarrow.localfs - - elif self._parsed_dataset_url.scheme == 'hdfs': - - if hdfs_driver == 'libhdfs3': - # libhdfs3 does not do any namenode resolution itself so we do it manually. 
This is not necessary - # if using libhdfs - - # Obtain singleton and force hadoop config evaluation - namenode_resolver = HdfsNamenodeResolver(hadoop_configuration) - - # Since we can't tell for sure, first treat the URL as though it references a name service - if self._parsed_dataset_url.netloc: - # Case 3a: Use the portion of netloc before any port, which doesn't get lowercased - nameservice = self._parsed_dataset_url.netloc.split(':')[0] - namenodes = namenode_resolver.resolve_hdfs_name_service(nameservice) - if namenodes: - self._filesystem = connector.connect_to_either_namenode(namenodes, user=user) - self._filesystem_factory = lambda: connector.connect_to_either_namenode(namenodes, user=user) - if self._filesystem is None: - # Case 3a1: That didn't work; try the URL as a namenode host - self._filesystem = connector.hdfs_connect_namenode(self._parsed_dataset_url, user=user) - self._filesystem_factory = \ - lambda url=self._dataset_url, user=user: \ - connector.hdfs_connect_namenode(urlparse(url), user=user) - else: - # Case 3b: No netloc, so let's try to connect to default namenode - # HdfsNamenodeResolver will raise exception if it fails to connect. - nameservice, namenodes = namenode_resolver.resolve_default_hdfs_service() - filesystem = connector.connect_to_either_namenode(namenodes, user=user) - self._filesystem_factory = lambda: connector.connect_to_either_namenode(namenodes, user=user) - if filesystem is not None: - # Properly replace the parsed dataset URL once default namenode is confirmed - self._parsed_dataset_url = urlparse( - 'hdfs://{}{}'.format(nameservice, self._parsed_dataset_url.path)) - self._filesystem = filesystem - else: - self._filesystem = connector.hdfs_connect_namenode(self._parsed_dataset_url, hdfs_driver, user=user) - self._filesystem_factory = \ - lambda url=self._dataset_url, user=user: \ - connector.hdfs_connect_namenode(urlparse(url), hdfs_driver, user=user) - - elif self._parsed_dataset_url.scheme in ('s3', 's3a', 's3n'): - # Case 5 - # S3 support requires s3fs to be installed - try: - import s3fs - except ImportError: - raise ValueError('Must have s3fs installed in order to use datasets on s3. ' - 'Please install s3fs and try again.') - - if not self._parsed_dataset_url.netloc: - raise ValueError('URLs must be of the form s3://bucket/path') - - fs = s3fs.S3FileSystem(config_kwargs=s3_config_kwargs) - self._filesystem = fs - self._filesystem_factory = lambda: s3fs.S3FileSystem( - config_kwargs=s3_config_kwargs - ) - - elif self._parsed_dataset_url.scheme in ['gs', 'gcs']: - # Case 6 - # GCS support requires gcsfs to be installed - try: - import gcsfs - except ImportError: - raise ValueError('Must have gcsfs installed in order to use datasets on GCS. ' - 'Please install gcsfs and try again.') - - if not self._parsed_dataset_url.netloc: - raise ValueError('URLs must be of the form gs://bucket/path or gcs://bucket/path') - - fs = gcsfs.GCSFileSystem() - self._filesystem = GCSFSWrapper(fs) - self._filesystem_factory = lambda: GCSFSWrapper(gcsfs.GCSFileSystem()) - - else: - # Case 7 - raise ValueError('Unsupported scheme in dataset url {}. ' - 'Currently, only "file" and "hdfs" are supported.'.format(self._parsed_dataset_url.scheme)) - - def parsed_dataset_url(self): - """ - :return: The urlparse'd dataset_url - """ - return self._parsed_dataset_url - - def get_dataset_path(self): - """ - The dataset path is different than the one in `_parsed_dataset_url` for some filesystems. 
- For example s3fs expects the bucket name to be included in the path and doesn't support - paths that start with a `/` - """ - return get_dataset_path(self._parsed_dataset_url) - - def filesystem(self): - """ - :return: The pyarrow filesystem object - """ - return self._filesystem - - def filesystem_factory(self): - """ - :return: A serializable function that can be used to recreate the filesystem - object; useful for providing access to the filesystem object on distributed - Spark executors. - """ - return self._filesystem_factory - - def __getstate__(self): - raise RuntimeError('Pickling FilesystemResolver is not supported as it may contain some ' - 'a file-system instance objects that do not support pickling but do not have ' - 'anti-pickling protection') - - def get_filesystem_and_path_or_paths(url_or_urls, hdfs_driver='libhdfs3', s3_config_kwargs=None): """ Given a url or url list, return a tuple ``(filesystem, path_or_paths)`` diff --git a/petastorm/hdfs/__init__.py b/petastorm/hdfs/__init__.py deleted file mode 100644 index 98b74f46e..000000000 --- a/petastorm/hdfs/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright (c) 2017-2018 Uber Technologies, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Common HDFS functional modules. -""" diff --git a/petastorm/hdfs/namenode.py b/petastorm/hdfs/namenode.py deleted file mode 100644 index 4d86e50a3..000000000 --- a/petastorm/hdfs/namenode.py +++ /dev/null @@ -1,319 +0,0 @@ -# Copyright (c) 2017-2018 Uber Technologies, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import functools -import inspect -import logging -import os -from distutils.version import LooseVersion -from xml.etree import ElementTree as ET - -import pyarrow -import six -from pyarrow.hdfs import HadoopFileSystem -from pyarrow.lib import ArrowIOError -from six.moves.urllib.parse import urlparse - -logger = logging.getLogger(__name__) - - -class HdfsNamenodeResolver(object): - """This class embodies functionality to resolve HDFS namenodes: per default or a nameservice.""" - - def __init__(self, hadoop_configuration=None): - """ - Sets the given HadoopConfiguration object for the resolver; or check for and pull hadoop - configuration from an environment variable, in below preferred order to check. 
- - :param hadoop_configuration: an optional ``HadoopConfiguration`` - """ - self._hadoop_env = None - self._hadoop_path = None - if hadoop_configuration is None: - # Pull from environment variable, in this preferred order - for env in ["HADOOP_HOME", "HADOOP_PREFIX", "HADOOP_INSTALL"]: - # Use the first available - if env in os.environ: - self._hadoop_env = env - self._hadoop_path = os.environ[env] - hadoop_configuration = {} - self._load_site_xml_into_dict( - '{}/etc/hadoop/hdfs-site.xml'.format(self._hadoop_path), - hadoop_configuration) - self._load_site_xml_into_dict( - '{}/etc/hadoop/core-site.xml'.format(self._hadoop_path), - hadoop_configuration) - break - if hadoop_configuration is None: - # ensures at least an empty dict so no further checks required in member functions - logger.warning('Unable to populate a sensible HadoopConfiguration for namenode resolution!\n' - 'Path of last environment var (%s) tried [%s]. Please set up your Hadoop and \n' - 'define environment variable HADOOP_HOME to point to your Hadoop installation path.', - self._hadoop_env, self._hadoop_path) - hadoop_configuration = {} - self._hadoop_configuration = hadoop_configuration - - def _load_site_xml_into_dict(self, xml_path, in_dict): - assert in_dict is not None, 'A valid dictionary must be supplied to process site XML' - try: - for prop in ET.parse(xml_path).getroot().iter('property'): - in_dict[prop.find('name').text] = prop.find('value').text - except ET.ParseError as ex: - logger.error( - 'Unable to obtain a root node for the supplied XML in %s: %s', xml_path, ex) - - def _build_error_string(self, msg): - if self._hadoop_path is not None: - return msg + '\nHadoop path {} in environment variable {}!\n' \ - 'Please check your hadoop configuration!' \ - .format(self._hadoop_path, self._hadoop_env) - else: - return msg + ' the supplied Spark HadoopConfiguration' - - def resolve_hdfs_name_service(self, namespace): - """ - Given the namespace of a name service, resolves the configured list of name nodes, and - returns them as a list of URL strings. - - :param namespace: the HDFS name service to resolve - :return: a list of URL strings of the name nodes for the given name service; or None of not - properly configured. - """ - list_of_namenodes = None - namenodes = self._hadoop_configuration.get('dfs.ha.namenodes.' + namespace) - if namenodes: - # populate namenode_urls list for the given namespace - list_of_namenodes = [] - for nn in namenodes.split(','): - prop_key = 'dfs.namenode.rpc-address.{}.{}'.format(namespace, nn) - namenode_url = self._hadoop_configuration.get(prop_key) - if namenode_url: - list_of_namenodes.append(namenode_url) - else: - raise RuntimeError(self._build_error_string('Failed to get property "{}" from' - .format(prop_key))) - # Don't raise and exception otherwise, because the supplied name could just be a hostname. - # We don't have an easy way to tell at this point. - return list_of_namenodes - - def resolve_default_hdfs_service(self): - """ - Resolves the default namenode using the given, or environment-derived, hadoop configuration, - by parsing the configuration for ``fs.defaultFS``. 
- - :return: a tuple of structure ``(nameservice, list of namenodes)`` - """ - default_fs = self._hadoop_configuration.get('fs.defaultFS') - if default_fs: - nameservice = urlparse(default_fs).netloc - list_of_namenodes = self.resolve_hdfs_name_service(nameservice) - if list_of_namenodes is None: - raise IOError(self._build_error_string('Unable to get namenodes for ' - 'default service "{}" from' - .format(default_fs))) - return [nameservice, list_of_namenodes] - else: - raise RuntimeError( - self._build_error_string('Failed to get property "fs.defaultFS" from')) - - -class HdfsConnectError(IOError): - pass - - -class MaxFailoversExceeded(RuntimeError): - def __init__(self, failed_exceptions, max_failover_attempts, func_name): - self.failed_exceptions = failed_exceptions - self.max_failover_attempts = max_failover_attempts - self.__name__ = func_name - message = 'Failover attempts exceeded maximum ({}) for action "{}". ' \ - 'Exceptions:\n{}'.format(self.max_failover_attempts, self.__name__, - self.failed_exceptions) - super(MaxFailoversExceeded, self).__init__(message) - - -class namenode_failover(object): - """ - This decorator class ensures seamless namenode failover and retry, when an HDFS call fails - due to StandbyException, up to a maximum retry. - """ - # Allow for 2 failovers to a different namenode (i.e., if 2 NNs, try back to the original) - MAX_FAILOVER_ATTEMPTS = 2 - - def __init__(self, func): - # limit wrapper attributes updated to just name and doc string - functools.update_wrapper(self, func, ('__name__', '__doc__')) - # cache the function name, only because we don't need the function object in __call__ - self._func_name = func.__name__ - - def __get__(self, obj, obj_type): - """ Support usage of decorator on instance methods. """ - # This avoids needing to cache the `obj` as member variable - return functools.partial(self.__call__, obj) - - def __call__(self, obj, *args, **kwargs): - """ - Attempts the function call, catching exception, re-connecting, and retrying, up to a - pre-configured maximum number of attempts. - - :param obj: calling class instance, the HDFS client object - :param args: positional arguments to func - :param kwargs: arbitrary keyword arguments to func - :return: return of ``func`` call; if max retries exceeded, raise a RuntimeError; or raise - any unexpected exception - """ - failures = [] - while len(failures) <= self.MAX_FAILOVER_ATTEMPTS: - try: - # Invoke the filesystem function on the connected HDFS object - return getattr(obj._hdfs, self._func_name)(*args, **kwargs) - except ArrowIOError as e: - # An HDFS IP error occurred, retry HDFS connect to failover - obj._do_connect() - failures.append(e) - # Failover attempts exceeded at this point! - raise MaxFailoversExceeded(failures, self.MAX_FAILOVER_ATTEMPTS, self._func_name) - - -def failover_all_class_methods(decorator): - """ - This decorator function wraps an entire class to decorate each member method, incl. inherited. - - Adapted from https://stackoverflow.com/a/6307868 - """ - - # Convenience function to ensure `decorate` gets wrapper function attributes: name, docs, etc. 
- @functools.wraps(decorator) - def decorate(cls): - all_methods = inspect.getmembers(cls, inspect.isbuiltin) \ - + inspect.getmembers(cls, inspect.ismethod) \ - + inspect.getmembers(cls, inspect.isroutine) - for name, method in all_methods: - if not name.startswith('_'): - # It's safer to exclude all protected/private method from decoration - setattr(cls, name, decorator(method)) - return cls - - return decorate - - -@failover_all_class_methods(namenode_failover) -class HAHdfsClient(HadoopFileSystem): - def __init__(self, connector_cls, list_of_namenodes, user=None): - """ - Attempt HDFS connection operation, storing the hdfs object for intercepted calls. - - :param connector_cls: HdfsConnector class, so connector logic resides in one place, and - also facilitates testing. - :param list_of_namenodes: List of name nodes to failover, cached to enable un-/pickling - :param user: String denoting username when connecting to HDFS. None implies login user. - :return: None - """ - # Use protected attribute to prevent mistaken decorator application - self._connector_cls = connector_cls - self._list_of_namenodes = list_of_namenodes - self._user = user - # Ensure that a retry will attempt a different name node in the list - self._index_of_nn = -1 - self._do_connect() - - def __reduce__(self): - """ Returns object state for pickling. """ - return self.__class__, (self._connector_cls, self._list_of_namenodes, self._user) - - def _do_connect(self): - """ Makes a new connection attempt, caching the new namenode index and HDFS connection. """ - self._index_of_nn, self._hdfs = \ - self._connector_cls._try_next_namenode(self._index_of_nn, self._list_of_namenodes, user=self._user) - - -class HdfsConnector(object): - """ HDFS connector class where failover logic is implemented. Facilitates testing. """ - # Refactored constant - MAX_NAMENODES = 2 - - @classmethod - def hdfs_connect_namenode(cls, url, driver='libhdfs3', user=None): - """ - Performs HDFS connect in one place, facilitating easy change of driver and test mocking. - - :param url: An parsed URL object to the HDFS end point - :param driver: An optional driver identifier - :param user: String denoting username when connecting to HDFS. None implies login user. - :return: Pyarrow HDFS connection object. - """ - - # According to pyarrow.hdfs.connect: - # host : NameNode. Set to "default" for fs.defaultFS from core-site.xml - # So we pass 'default' as a host name if the url does not specify one (i.e. hdfs:///...) - if LooseVersion(pyarrow.__version__) < LooseVersion('0.12.0'): - hostname = url.hostname or 'default' - driver = driver - else: - hostname = six.text_type(url.hostname or 'default') - driver = six.text_type(driver) - - kwargs = dict(user=user) - if LooseVersion(pyarrow.__version__) < LooseVersion('0.17.0'): - # Support for libhdfs3 was removed in v0.17.0, we include it here for backwards - # compatibility - kwargs['driver'] = driver - return pyarrow.hdfs.connect(hostname, url.port or 8020, **kwargs) - - @classmethod - def connect_to_either_namenode(cls, list_of_namenodes, user=None): - """ - Returns a wrapper HadoopFileSystem "high-availability client" object that enables - name node failover. - - Raises a HdfsConnectError if no successful connection can be established. - - :param list_of_namenodes: a required list of name node URLs to connect to. - :param user: String denoting username when connecting to HDFS. None implies login user. 
- :return: the wrapped HDFS connection object - """ - assert list_of_namenodes is not None and len(list_of_namenodes) <= cls.MAX_NAMENODES, \ - "Must supply a list of namenodes, but HDFS only supports up to {} namenode URLs" \ - .format(cls.MAX_NAMENODES) - return HAHdfsClient(cls, list_of_namenodes, user=user) - - @classmethod - def _try_next_namenode(cls, index_of_nn, list_of_namenodes, user=None): - """ - Instead of returning an inline function, this protected class method implements the - failover logic: circling between namenodes using the supplied index as the last - index into the name nodes list. - - :param list_of_namenodes: a required list of name node URLs to connect to. - :param user: String denoting username when connecting to HDFS. None implies login user. - :return: a tuple of (new index into list, actual pyarrow HDFS connection object), or raise - a HdfsConnectError if no successful connection can be established. - """ - nn_len = len(list_of_namenodes) - if nn_len > 0: - for i in range(1, cls.MAX_NAMENODES + 1): - # Use a modulo mechanism to hit the "next" name node, as opposed to always - # starting from the first entry in the list - idx = (index_of_nn + i) % nn_len - host = list_of_namenodes[idx] - try: - return idx, \ - cls.hdfs_connect_namenode(urlparse('hdfs://' + str(host or 'default')), user=user) - except ArrowIOError as e: - # This is an expected error if the namenode we are trying to connect to is - # not the active one - logger.debug('Attempted to connect to namenode %s but failed: %e', host, str(e)) - # It is a problem if we cannot connect to either of the namenodes when tried back-to-back, - # so better raise an error. - raise HdfsConnectError("Unable to connect to HDFS cluster!") diff --git a/petastorm/hdfs/tests/__init__.py b/petastorm/hdfs/tests/__init__.py deleted file mode 100644 index 1d38bb322..000000000 --- a/petastorm/hdfs/tests/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2017-2018 Uber Technologies, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/petastorm/hdfs/tests/test_hdfs_namenode.py b/petastorm/hdfs/tests/test_hdfs_namenode.py deleted file mode 100644 index 8aad60f18..000000000 --- a/petastorm/hdfs/tests/test_hdfs_namenode.py +++ /dev/null @@ -1,515 +0,0 @@ -# Copyright (c) 2017-2018 Uber Technologies, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import os -import pickle -import textwrap -import unittest -from typing import Dict - -import pytest -from pyarrow.lib import ArrowIOError - -from unittest import mock - -from petastorm.hdfs.namenode import HdfsNamenodeResolver, HdfsConnector, \ - HdfsConnectError, MaxFailoversExceeded, HAHdfsClient, namenode_failover - - -class HC: - """Hadoop constants for testing convenience""" - WARP_TURTLE = 'WARP-TURTLE' - FS_WARP_TURTLE = 'hdfs://{}'.format(WARP_TURTLE) - DEFAULT_NN = 'default:8020' - WARP_TURTLE_NN1 = 'some.domain.name.net:8020' - WARP_TURTLE_NN2 = 'some.other.domain.name.net:8020' - WARP_TURTLE_PATH = '{}/x/y/z'.format(FS_WARP_TURTLE) - HADOOP_CONFIG_PATH = '/etc/hadoop' - - -class MockHadoopConfiguration(object): - def __init__(self): - self._dict = {} - - def get(self, key): - val = None - if key in self._dict: - val = self._dict[key] - # print('MockHadoopConfiguration: "{}" == "{}"'.format(key, val)) - return val - - def set(self, key, val): - self._dict[key] = val - - -class HdfsNamenodeResolverTest(unittest.TestCase): - def setUp(self): - """Initializes a mock hadoop config and a namenode resolver instance, for convenience.""" - self._hadoop_configuration = MockHadoopConfiguration() - self.suj = HdfsNamenodeResolver(self._hadoop_configuration) - - def test_default_hdfs_service_errors(self): - """Check error cases with connecting to default namenode""" - # No default yields RuntimeError - with self.assertRaises(RuntimeError): - self.suj.resolve_default_hdfs_service() - # Bad default FS yields IOError - self._hadoop_configuration.set('fs.defaultFS', 'invalidFS') - with self.assertRaises(IOError): - self.suj.resolve_default_hdfs_service() - # Random FS host yields IOError - self._hadoop_configuration.set('fs.defaultFS', 'hdfs://random') - with self.assertRaises(IOError): - self.suj.resolve_default_hdfs_service() - # Valid FS host with no namenode defined yields IOError - self._hadoop_configuration.set('fs.defaultFS', HC.FS_WARP_TURTLE) - with self.assertRaises(IOError): - self.suj.resolve_default_hdfs_service() - - def test_default_hdfs_service_typical(self): - """Check typical cases resolving default namenode""" - # One nn - self._hadoop_configuration.set('fs.defaultFS', HC.FS_WARP_TURTLE) - self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), 'nn1') - self._hadoop_configuration.set( - 'dfs.namenode.rpc-address.{}.nn1'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN1) - nameservice, namenodes = self.suj.resolve_default_hdfs_service() - self.assertEqual(HC.WARP_TURTLE, nameservice) - self.assertEqual(HC.WARP_TURTLE_NN1, namenodes[0]) - - # Second of two nns, when the first is undefined - self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), 'nn2,nn1') - with self.assertRaises(RuntimeError): - self.suj.resolve_default_hdfs_service() - - # Two valid and defined nns - self._hadoop_configuration.set( - 'dfs.namenode.rpc-address.{}.nn2'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN2) - nameservice, namenodes = self.suj.resolve_default_hdfs_service() - self.assertEqual(HC.WARP_TURTLE, nameservice) - self.assertEqual(HC.WARP_TURTLE_NN2, namenodes[0]) - self.assertEqual(HC.WARP_TURTLE_NN1, namenodes[1]) - - def test_resolve_hdfs_name_service(self): - """Check edge cases with resolving a nameservice""" - # Most cases already covered by test_default_hdfs_service_ok above... 
- # Empty config or no namespace yields None - self.assertIsNone(HdfsNamenodeResolver({}).resolve_hdfs_name_service('')) - self.assertIsNone(self.suj.resolve_hdfs_name_service('')) - - # Test a single undefined namenode case, as well as an unconventional multi-NN case; - # both result in an exception raised - self._hadoop_configuration.set('fs.defaultFS', HC.FS_WARP_TURTLE) - self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), 'nn1') - with self.assertRaises(RuntimeError): - self.suj.resolve_hdfs_name_service(HC.WARP_TURTLE) - - # Test multiple undefined NNs, which will also throw HdfsConnectError - nns = 'nn1,nn2,nn3,nn4,nn5,nn6,nn7,nn8' - self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), nns) - with self.assertRaises(RuntimeError): - self.suj.resolve_hdfs_name_service(HC.WARP_TURTLE) - - -@pytest.fixture() -def mock_hadoop_home_directory(tmpdir): - """Create hadoop site files once""" - tmpdir_path = tmpdir.strpath - os.makedirs('{}{}'.format(tmpdir_path, HC.HADOOP_CONFIG_PATH)) - with open('{}{}/core-site.xml'.format(tmpdir_path, HC.HADOOP_CONFIG_PATH), 'wt') as f: - f.write(textwrap.dedent("""\ - - - - - fs.defaultFS - hdfs://{0} - - - """.format(HC.WARP_TURTLE))) - with open('{}{}/hdfs-site.xml'.format(tmpdir_path, HC.HADOOP_CONFIG_PATH), 'wt') as f: - f.write(textwrap.dedent("""\ - - - - - dfs.ha.namenodes.{0} - nn2,nn1 - - - dfs.namenode.rpc-address.{0}.nn1 - {1} - - - dfs.namenode.rpc-address.{0}.nn2 - {2} - - - dfs.ha.namenodes.foobar - nn - - - """.format(HC.WARP_TURTLE, HC.WARP_TURTLE_NN1, HC.WARP_TURTLE_NN2))) - return tmpdir_path - - -def _test_default_hdfs_service(mock_hadoop_home_directory, env_var): - # Trigger env var evaluation - suj = HdfsNamenodeResolver() - assert env_var == suj._hadoop_env - assert mock_hadoop_home_directory == suj._hadoop_path - # List of namenodes returned nominally - nameservice, namenodes = suj.resolve_default_hdfs_service() - assert HC.WARP_TURTLE == nameservice - assert HC.WARP_TURTLE_NN2 == namenodes[0] - assert HC.WARP_TURTLE_NN1 == namenodes[1] - # Exception raised for badly defined nameservice (XML issue) - with pytest.raises(RuntimeError): - suj.resolve_hdfs_name_service('foobar') - # None for nonexistent nameservice (intentional design) - assert suj.resolve_hdfs_name_service('nonexistent') is None - - -def test_env_hadoop_home_prefix_install(mock_hadoop_home_directory): - # The second+third env vars won't cause an error - with mock.patch.dict(os.environ, {'HADOOP_PREFIX': '{}/no/where/here'.format(mock_hadoop_home_directory), - 'HADOOP_INSTALL': '{}/no/where/here'.format(mock_hadoop_home_directory), - 'HADOOP_HOME': mock_hadoop_home_directory}, clear=True): - _test_default_hdfs_service(mock_hadoop_home_directory, 'HADOOP_HOME') - - -def test_env_hadoop_prefix_only(mock_hadoop_home_directory): - with mock.patch.dict(os.environ, {'HADOOP_PREFIX': mock_hadoop_home_directory}, clear=True): - _test_default_hdfs_service(mock_hadoop_home_directory, 'HADOOP_PREFIX') - - -def test_env_hadoop_install_only(mock_hadoop_home_directory): - with mock.patch.dict(os.environ, {'HADOOP_INSTALL': mock_hadoop_home_directory}, clear=True): - _test_default_hdfs_service(mock_hadoop_home_directory, 'HADOOP_INSTALL') - - -def test_env_bad_hadoop_home_with_hadoop_install(mock_hadoop_home_directory): - with mock.patch.dict(os.environ, {'HADOOP_HOME': '{}/no/where/here'.format(mock_hadoop_home_directory), - 'HADOOP_INSTALL': mock_hadoop_home_directory}, clear=True): - with pytest.raises(IOError): - # Trigger env var 
evaluation - HdfsNamenodeResolver() - - -def test_unmatched_env_var(mock_hadoop_home_directory): - with mock.patch.dict(os.environ, {'HADOOP_HOME_X': mock_hadoop_home_directory}, clear=True): - suj = HdfsNamenodeResolver() - # No successful connection - with pytest.raises(RuntimeError): - suj.resolve_default_hdfs_service() - - -def test_bad_hadoop_path(mock_hadoop_home_directory): - with mock.patch.dict(os.environ, {'HADOOP_HOME': '{}/no/where/here'.format(mock_hadoop_home_directory)}, - clear=True): - with pytest.raises(IOError): - HdfsNamenodeResolver() - - -def test_missing_or_empty_core_site(mock_hadoop_home_directory): - with mock.patch.dict(os.environ, {'HADOOP_HOME': mock_hadoop_home_directory}): - # Make core-site "disappear" and make sure we raise an error - cur_path = '{}{}/core-site.xml'.format(mock_hadoop_home_directory, HC.HADOOP_CONFIG_PATH) - new_path = '{}{}/core-site.xml.bak'.format(mock_hadoop_home_directory, HC.HADOOP_CONFIG_PATH) - os.rename(cur_path, new_path) - with pytest.raises(IOError): - HdfsNamenodeResolver() - # Make an empty file - with open(cur_path, 'wt') as f: - f.write('') - # Re-trigger env var evaluation - suj = HdfsNamenodeResolver() - with pytest.raises(RuntimeError): - suj.resolve_default_hdfs_service() - # restore file for other tests to work - os.rename(new_path, cur_path) - - -class HdfsMockError(Exception): - pass - - -class MockHdfs(object): - """ - Any operation in the mock class raises an exception for the first N failovers, and then returns - True after those N calls. - """ - - def __init__(self, n_failovers=0, user=None): - self._n_failovers = n_failovers - self._user = user - - def __getattribute__(self, attr): - """ - The Mock HDFS simply calls check_failover, regardless of the filesystem operator invoked. - """ - - def op(*args, **kwargs): - """ Mock operator """ - return self._check_failovers() - - # Of course, exclude any protected/private method calls - if not attr.startswith('_'): - return op - return object.__getattribute__(self, attr) - - def _check_failovers(self): - if self._n_failovers == -1: - # Special case to exercise the unhandled exception path - raise HdfsMockError('Some random HDFS exception!') - - if self._n_failovers > 0: - self._n_failovers -= 1 - raise ArrowIOError('org.apache.hadoop.ipc.RemoteException' - '(org.apache.hadoop.ipc.StandbyException): ' - 'Operation category READ is not supported in state standby. ' - 'Visit https://s.apache.org/sbnn-error\n' - '{} namenode failover(s) remaining!'.format(self._n_failovers)) - return True - - def __reduce__(self): - raise AssertionError('A connection object can not be pickled. 
If we try to pickle it, it means ' - 'it leaks somehow with a closure that holds it and we need to make sure it ' - 'does not happen.') - - -class MockHdfsConnector(HdfsConnector): - # static member for static hdfs_connect_namenode to access - _n_failovers = 0 - _fail_n_next_connect = 0 - _connect_attempted: Dict[str, int] = {} - - @classmethod - def reset(cls): - cls._n_failovers = 0 - cls._fail_n_next_connect = 0 - cls._connect_attempted = {} - - @classmethod - def set_n_failovers(cls, failovers): - cls._n_failovers = failovers - - @classmethod - def set_fail_n_next_connect(cls, fails): - cls._fail_n_next_connect = fails - - @classmethod - def connect_attempted(cls, host): - if host in cls._connect_attempted: - return cls._connect_attempted[host] - else: - return 0 - - @classmethod - def hdfs_connect_namenode(cls, url, driver='libhdfs3', user=None): - netloc = '{}:{}'.format(url.hostname or 'default', url.port or 8020) - if netloc not in cls._connect_attempted: - cls._connect_attempted[netloc] = 0 - cls._connect_attempted[netloc] += 1 - # We just want to check connection attempt, but also raise an error if - # 'default' or fail counter - if cls._fail_n_next_connect != 0 or netloc == HC.DEFAULT_NN: - if cls._fail_n_next_connect != 0: - cls._fail_n_next_connect -= 1 - raise ArrowIOError('ERROR! Mock pyarrow hdfs connect to {} using driver {}, ' - 'fail counter: {}' - .format(netloc, driver, cls._fail_n_next_connect)) - # Return a mock HDFS object with optional failovers, so that this connector mock can - # be shared for the HAHdfsClient failover tests below. - hdfs = MockHdfs(cls._n_failovers, user=user) - if cls._n_failovers > 0: - cls._n_failovers -= 1 - return hdfs - - -class HdfsConnectorTest(unittest.TestCase): - """Check correctness of connecting to a list of namenodes. """ - - @classmethod - def setUpClass(cls): - """Initializes a mock HDFS namenode connector to track connection attempts.""" - cls.NAMENODES = [HC.WARP_TURTLE_NN1, HC.WARP_TURTLE_NN2] - cls.suj = MockHdfsConnector() - - def setUp(self): - self.suj.reset() - - def test_connect_to_either_namenode_ok(self): - """ Test connecting OK to first of name node URLs. """ - self.assertIsNotNone(self.suj.connect_to_either_namenode(self.NAMENODES)) - self.assertEqual(0, self.suj.connect_attempted(HC.DEFAULT_NN)) - self.assertEqual(1, self.suj.connect_attempted(HC.WARP_TURTLE_NN1)) - self.assertEqual(0, self.suj.connect_attempted(HC.WARP_TURTLE_NN2)) - - def test_connect_to_either_with_user(self): - mock_name = "mock-manager" - mocked_hdfs = self.suj.connect_to_either_namenode(self.NAMENODES, user=mock_name) - self.assertEqual(mocked_hdfs._user, mock_name) - - def test_connect_to_either_namenode_ok_one_failed(self): - """ With one failver, test that both namenode URLS are attempted, with 2nd connected. """ - self.suj.set_fail_n_next_connect(1) - self.assertIsNotNone(self.suj.connect_to_either_namenode(self.NAMENODES)) - self.assertEqual(0, self.suj.connect_attempted(HC.DEFAULT_NN)) - self.assertEqual(1, self.suj.connect_attempted(HC.WARP_TURTLE_NN1)) - self.assertEqual(1, self.suj.connect_attempted(HC.WARP_TURTLE_NN2)) - - def test_connect_to_either_namenode_exception_two_failed(self): - """ With 2 failvers, test no connection, and no exception is raised. 
""" - self.suj.set_fail_n_next_connect(2) - with self.assertRaises(HdfsConnectError): - self.suj.connect_to_either_namenode(self.NAMENODES) - self.assertEqual(0, self.suj.connect_attempted(HC.DEFAULT_NN)) - self.assertEqual(1, self.suj.connect_attempted(HC.WARP_TURTLE_NN1)) - self.assertEqual(1, self.suj.connect_attempted(HC.WARP_TURTLE_NN2)) - - def test_connect_to_either_namenode_exception_four_failed(self): - """ With 4 failvers, test that exception is raised. """ - self.suj.set_fail_n_next_connect(4) - with self.assertRaises(HdfsConnectError): - self.suj.connect_to_either_namenode(self.NAMENODES) - with self.assertRaises(HdfsConnectError): - self.suj.connect_to_either_namenode(self.NAMENODES) - self.assertEqual(0, self.suj.connect_attempted(HC.DEFAULT_NN)) - self.assertEqual(2, self.suj.connect_attempted(HC.WARP_TURTLE_NN1)) - self.assertEqual(2, self.suj.connect_attempted(HC.WARP_TURTLE_NN2)) - - -class HAHdfsClientTest(unittest.TestCase): - """ - The HDFS testing functions are enumerated explicitly below for simplicity and clarity, but it - should impose but a minute maintenance overhead, since MockHdfs class requires no enumeration. - """ - - @classmethod - def setUpClass(cls): - """Initializes namenodes list and mock HDFS namenode connector.""" - cls.NAMENODES = [HC.WARP_TURTLE_NN1, HC.WARP_TURTLE_NN2] - - def setUp(self): - """Reset mock HDFS failover count.""" - MockHdfsConnector.reset() - - def test_unhandled_exception(self): - """Exercise the unhandled exception execution path.""" - MockHdfsConnector.set_n_failovers(-1) - with self.assertRaises(HdfsMockError) as e: - getattr(HAHdfsClient(MockHdfsConnector, [HC.WARP_TURTLE_NN1]), 'ls')('random') - self.assertTrue('random HDFS exception' in str(e.exception)) - - def test_invalid_namenode_list(self): - """Make sure robust to invalid namenode list.""" - MockHdfsConnector.set_n_failovers(-1) - with self.assertRaises(HdfsConnectError) as e: - getattr(HAHdfsClient(MockHdfsConnector, []), 'ls')('random') - self.assertTrue('Unable to connect' in str(e.exception)) - with self.assertRaises(HdfsConnectError) as e: - getattr(HAHdfsClient(MockHdfsConnector, [None]), 'ls')('random') - self.assertTrue('Unable to connect' in str(e.exception)) - - def test_client_pickles_correctly(self): - """ - Does HAHdfsClient pickle properly? - - Check that all attributes are equal, with the exception of the HDFS object, which is fine - as long as the types are the same. 
- """ - mock_name = "mock-manager" - client = HAHdfsClient(MockHdfsConnector, self.NAMENODES, user=mock_name) - client_unpickled = pickle.loads(pickle.dumps(client)) - self.assertEqual(client._connector_cls, client_unpickled._connector_cls) - self.assertEqual(client._list_of_namenodes, client_unpickled._list_of_namenodes) - self.assertEqual(client._index_of_nn, client_unpickled._index_of_nn) - self.assertEqual(client._user, client_unpickled._user) - self.assertEqual(type(client._hdfs), type(client_unpickled._hdfs)) - - def _try_failover_combos(self, func, *args, **kwargs): - """Common tests for each of the known HDFS operators, with varying failover counts.""" - MockHdfsConnector.set_n_failovers(1) - suj = HAHdfsClient(MockHdfsConnector, self.NAMENODES) - self.assertTrue(getattr(suj, func)(*args, **kwargs)) - - MockHdfsConnector.set_n_failovers(namenode_failover.MAX_FAILOVER_ATTEMPTS) - suj = HAHdfsClient(MockHdfsConnector, self.NAMENODES) - self.assertTrue(getattr(suj, func)(*args, **kwargs)) - - MockHdfsConnector.set_n_failovers(namenode_failover.MAX_FAILOVER_ATTEMPTS + 1) - suj = HAHdfsClient(MockHdfsConnector, self.NAMENODES) - with self.assertRaises(MaxFailoversExceeded) as e: - getattr(suj, func)(*args, **kwargs) - self.assertEqual(len(e.exception.failed_exceptions), - namenode_failover.MAX_FAILOVER_ATTEMPTS + 1) - self.assertEqual(e.exception.max_failover_attempts, namenode_failover.MAX_FAILOVER_ATTEMPTS) - self.assertEqual(e.exception.__name__, func) - self.assertTrue('Failover attempts exceeded' in str(e.exception)) - - def test_cat(self): - self._try_failover_combos('cat', 'random') - - def test_chmod(self): - self._try_failover_combos('chmod', 'random', 0) - - def test_chown(self): - self._try_failover_combos('chown', 'random', 'user') - - def test_delete(self): - self._try_failover_combos('delete', 'random', recursive=True) - - def test_df(self): - self._try_failover_combos('df') - - def test_disk_usage(self): - self._try_failover_combos('disk_usage', 'random') - - def test_download(self): - self._try_failover_combos('download', 'random', None) - - def test_exists(self): - self._try_failover_combos('exists', 'random') - - def test_get_capacity(self): - self._try_failover_combos('get_capacity') - - def test_get_space_used(self): - self._try_failover_combos('get_space_used') - - def test_info(self): - self._try_failover_combos('info', 'random') - - def test_ls(self): - self._try_failover_combos('ls', 'random', detail=True) - - def test_mkdir(self): - self._try_failover_combos('mkdir', 'random', create_parents=False) - - def test_open(self): - self._try_failover_combos('open', 'random', 'rb') - - def test_rename(self): - self._try_failover_combos('rename', 'random', 'new_random') - - def test_rm(self): - self._try_failover_combos('rm', 'random', recursive=True) - - def test_upload(self): - self._try_failover_combos('upload', 'random', None) - - -if __name__ == '__main__': - unittest.main() diff --git a/petastorm/reader_impl/arrow_table_serializer.py b/petastorm/reader_impl/arrow_table_serializer.py index b7ce31c6a..b2ea143ac 100644 --- a/petastorm/reader_impl/arrow_table_serializer.py +++ b/petastorm/reader_impl/arrow_table_serializer.py @@ -28,6 +28,6 @@ def serialize(self, rows): return sink.getvalue() def deserialize(self, serialized_rows): - reader = pa.open_stream(serialized_rows) + reader = pa.ipc.open_stream(serialized_rows) table = reader.read_all() return table diff --git a/petastorm/spark/spark_dataset_converter.py b/petastorm/spark/spark_dataset_converter.py index 
ed2767168..7c3fd547b 100644 --- a/petastorm/spark/spark_dataset_converter.py +++ b/petastorm/spark/spark_dataset_converter.py @@ -88,8 +88,8 @@ def _default_delete_dir_handler(dataset_url): if os.path.exists(local_path): shutil.rmtree(local_path, ignore_errors=False) else: - if fs.exists(parsed.path): - fs.delete(parsed.path, recursive=True) + if filesystem.get_file_info(parsed.path).type == fs.FileType.Directory: + filesystem.delete_dir(parsed.path) _delete_dir_handler = _default_delete_dir_handler @@ -620,11 +620,11 @@ def wait_for_file(path): def _check_dataset_file_median_size(url_list): - fs, path_list = get_filesystem_and_path_or_paths(url_list) + filesystem, path_list = get_filesystem_and_path_or_paths(url_list) RECOMMENDED_FILE_SIZE_BYTES = 50 * 1024 * 1024 # TODO: also check file size for other file system. - if isinstance(fs, fs.LocalFileSystem): + if isinstance(filesystem, fs.LocalFileSystem): pool = ThreadPool(64) try: file_size_list = pool.map(os.path.getsize, path_list) diff --git a/petastorm/tests/test_fs_utils.py b/petastorm/tests/test_fs_utils.py deleted file mode 100644 index 3e77c7b25..000000000 --- a/petastorm/tests/test_fs_utils.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright (c) 2017-2018 Uber Technologies, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import unittest - -from pyarrow import fs - -from petastorm.fs_utils import get_filesystem_and_path_or_paths -from petastorm.hdfs.tests.test_hdfs_namenode import HC, MockHadoopConfiguration, \ - MockHdfsConnector - -ABS_PATH = '/abs/path' - - -class FilesystemResolverTest(unittest.TestCase): - """ - Checks the full filesystem resolution functionality, exercising each URL interpretation case. - """ - - @classmethod - def setUpClass(cls): - cls.mock = MockHdfsConnector() - cls.mock_name = "mock-manager" - - def setUp(self): - """Initializes a mock hadoop config and populate with basic properties.""" - # Reset counters in mock connector - self.mock.reset() - self._hadoop_configuration = MockHadoopConfiguration() - self._hadoop_configuration.set('fs.defaultFS', HC.FS_WARP_TURTLE) - self._hadoop_configuration.set('dfs.ha.namenodes.{}'.format(HC.WARP_TURTLE), 'nn2,nn1') - self._hadoop_configuration.set('dfs.namenode.rpc-address.{}.nn1'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN1) - self._hadoop_configuration.set('dfs.namenode.rpc-address.{}.nn2'.format(HC.WARP_TURTLE), HC.WARP_TURTLE_NN2) - - def test_get_filesystem_and_path_or_paths(self): - fs1, path1 = get_filesystem_and_path_or_paths('file:///some/path') - assert isinstance(fs1, fs.LocalFileSystem) and path1 == '/some/path' - - fs2, paths2 = get_filesystem_and_path_or_paths(['file:///some/path/01.parquet', 'file:///some/path/02.parquet']) - assert isinstance(fs2, fs.LocalFileSystem) and paths2 == ['/some/path/01.parquet', '/some/path/02.parquet'] - - with self.assertRaises(ValueError): - get_filesystem_and_path_or_paths( - ['file:///some/path/01.parquet', 'hdfs:///some/path/02.parquet']) - - -if __name__ == '__main__': - unittest.main()
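
For context, the added lines above rely on the modern pyarrow.fs and pyarrow.ipc APIs instead of the removed FilesystemResolver / HdfsConnector machinery. Below is a minimal, hedged sketch of those calls; the dataset URL and the sample table are made-up examples, and petastorm itself is not required to run it.

import pyarrow as pa
from pyarrow import fs

# Resolve a filesystem and a path from a URL in one call. Which schemes are
# available (s3://, gs://, hdfs://, ...) depends on how pyarrow was built;
# file:// always works. The URL below is a hypothetical example.
dataset_url = 'file:///tmp/example_dataset'
filesystem, path = fs.FileSystem.from_uri(dataset_url)

# Directory deletion in the style of the new _default_delete_dir_handler:
# check the path type with get_file_info() and remove it with delete_dir(),
# rather than the legacy exists()/delete(recursive=True) pair.
if filesystem.get_file_info(path).type == fs.FileType.Directory:
    filesystem.delete_dir(path)

# IPC stream round trip. pa.open_stream() was dropped from the top-level
# namespace in recent pyarrow releases, which is why the serializer change
# above switches to pa.ipc.open_stream(). The sample table is arbitrary.
table = pa.table({'x': [1, 2, 3]})
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
serialized_rows = sink.getvalue()

reader = pa.ipc.open_stream(serialized_rows)
assert reader.read_all().equals(table)

Since FileSystem.from_uri handles scheme dispatch internally, the per-scheme resolution logic (namenode lookup, s3fs, gcsfs wrappers) that FilesystemResolver carried appears to be what this change makes unnecessary.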