Remove petastorm.hdfs
Yevgeni Litvin committed Nov 7, 2020
1 parent 6d9892a commit 0d5a755
Showing 8 changed files with 5 additions and 1,092 deletions.
petastorm/fs_utils.py (165 changes: 0 additions & 165 deletions)
@@ -17,9 +17,6 @@
import six
from six.moves.urllib.parse import urlparse

from petastorm.gcsfs_helpers.gcsfs_wrapper import GCSFSWrapper
from petastorm.hdfs.namenode import HdfsNamenodeResolver, HdfsConnector

logger = logging.getLogger(__name__)


@@ -36,168 +33,6 @@ def get_dataset_path(parsed_url):
return parsed_url.path


class FilesystemResolver(object):
"""Resolves a dataset URL, makes a connection via pyarrow, and provides a filesystem object."""

def __init__(self, dataset_url, hadoop_configuration=None, connector=HdfsConnector,
hdfs_driver='libhdfs3', user=None, s3_config_kwargs=None):
"""
Given a dataset URL and an optional hadoop configuration, parse and interpret the URL to
instantiate a pyarrow filesystem.
Interpretation of the URL ``scheme://hostname:port/path`` occurs in the following order:
1. If there is no ``scheme``, raise an exception: scheme-less dataset URLs are no longer supported.
2. If ``scheme`` is ``file``, use the local filesystem path.
3. If ``scheme`` is ``hdfs``:
a. Try the ``hostname`` as a namespace and attempt to connect to a name node.
1. If that doesn't work, try connecting directly to namenode ``hostname:port``.
b. If no host, connect to the default name node.
4. If ``scheme`` is ``s3``, use s3fs. The user must install s3fs manually before using s3.
5. If ``scheme`` is ``gs`` or ``gcs``, use gcsfs. The user must install gcsfs manually before using GCS.
6. Fail otherwise.
:param dataset_url: The dataset URL (``file://``, ``hdfs://``, ``s3://``, or ``gs://``/``gcs://``)
:param hadoop_configuration: an optional hadoop configuration
:param connector: the HDFS connector object to use (ONLY override for testing purposes)
:param hdfs_driver: A string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
libhdfs (java through JNI) or libhdfs3 (C++)
:param user: String denoting username when connecting to HDFS. None implies login user.
:param s3_config_kwargs: Optional dict passed to ``s3fs.S3FileSystem`` as ``config_kwargs``.
"""
# Cache both the original URL and the resolved, urlparsed dataset_url
self._dataset_url = dataset_url
self._parsed_dataset_url = None
# Cache the instantiated filesystem object
self._filesystem = None

if isinstance(self._dataset_url, six.string_types):
self._parsed_dataset_url = urlparse(self._dataset_url)
else:
self._parsed_dataset_url = self._dataset_url

if not self._parsed_dataset_url.scheme:
# Case 1
raise ValueError('ERROR! A scheme-less dataset url ({}) is no longer supported. '
'Please prepend "file://" for local filesystem.'.format(self._dataset_url))

elif self._parsed_dataset_url.scheme == 'file':
# Case 2: definitely local
self._filesystem = pyarrow.localfs
self._filesystem_factory = lambda: pyarrow.localfs

elif self._parsed_dataset_url.scheme == 'hdfs':

if hdfs_driver == 'libhdfs3':
# libhdfs3 does not do any namenode resolution itself, so we do it manually. This is not
# necessary when using libhdfs.

# Obtain singleton and force hadoop config evaluation
namenode_resolver = HdfsNamenodeResolver(hadoop_configuration)

# Since we can't tell for sure, first treat the URL as though it references a name service
if self._parsed_dataset_url.netloc:
# Case 3a: Use the portion of netloc before any port, which doesn't get lowercased
nameservice = self._parsed_dataset_url.netloc.split(':')[0]
namenodes = namenode_resolver.resolve_hdfs_name_service(nameservice)
if namenodes:
self._filesystem = connector.connect_to_either_namenode(namenodes, user=user)
self._filesystem_factory = lambda: connector.connect_to_either_namenode(namenodes, user=user)
if self._filesystem is None:
# Case 3a1: That didn't work; try the URL as a namenode host
self._filesystem = connector.hdfs_connect_namenode(self._parsed_dataset_url, user=user)
self._filesystem_factory = \
lambda url=self._dataset_url, user=user: \
connector.hdfs_connect_namenode(urlparse(url), user=user)
else:
# Case 3b: No netloc, so let's try to connect to default namenode
# HdfsNamenodeResolver will raise exception if it fails to connect.
nameservice, namenodes = namenode_resolver.resolve_default_hdfs_service()
filesystem = connector.connect_to_either_namenode(namenodes, user=user)
self._filesystem_factory = lambda: connector.connect_to_either_namenode(namenodes, user=user)
if filesystem is not None:
# Properly replace the parsed dataset URL once default namenode is confirmed
self._parsed_dataset_url = urlparse(
'hdfs://{}{}'.format(nameservice, self._parsed_dataset_url.path))
self._filesystem = filesystem
else:
self._filesystem = connector.hdfs_connect_namenode(self._parsed_dataset_url, hdfs_driver, user=user)
self._filesystem_factory = \
lambda url=self._dataset_url, user=user: \
connector.hdfs_connect_namenode(urlparse(url), hdfs_driver, user=user)

elif self._parsed_dataset_url.scheme in ('s3', 's3a', 's3n'):
# Case 4
# S3 support requires s3fs to be installed
try:
import s3fs
except ImportError:
raise ValueError('Must have s3fs installed in order to use datasets on s3. '
'Please install s3fs and try again.')

if not self._parsed_dataset_url.netloc:
raise ValueError('URLs must be of the form s3://bucket/path')

fs = s3fs.S3FileSystem(config_kwargs=s3_config_kwargs)
self._filesystem = fs
self._filesystem_factory = lambda: s3fs.S3FileSystem(
config_kwargs=s3_config_kwargs
)

elif self._parsed_dataset_url.scheme in ['gs', 'gcs']:
# Case 5
# GCS support requires gcsfs to be installed
try:
import gcsfs
except ImportError:
raise ValueError('Must have gcsfs installed in order to use datasets on GCS. '
'Please install gcsfs and try again.')

if not self._parsed_dataset_url.netloc:
raise ValueError('URLs must be of the form gs://bucket/path or gcs://bucket/path')

fs = gcsfs.GCSFileSystem()
self._filesystem = GCSFSWrapper(fs)
self._filesystem_factory = lambda: GCSFSWrapper(gcsfs.GCSFileSystem())

else:
# Case 6
raise ValueError('Unsupported scheme in dataset url {}. '
'Currently supported schemes are "file", "hdfs", "s3", "s3a", "s3n", '
'"gs" and "gcs".'.format(self._parsed_dataset_url.scheme))

def parsed_dataset_url(self):
"""
:return: The urlparse'd dataset_url
"""
return self._parsed_dataset_url

def get_dataset_path(self):
"""
The dataset path differs from the one in `_parsed_dataset_url` for some filesystems.
For example, s3fs expects the bucket name to be included in the path and does not
support paths that start with a `/`.
"""
return get_dataset_path(self._parsed_dataset_url)

def filesystem(self):
"""
:return: The pyarrow filesystem object
"""
return self._filesystem

def filesystem_factory(self):
"""
:return: A serializable function that can be used to recreate the filesystem
object; useful for providing access to the filesystem object on distributed
Spark executors.
"""
return self._filesystem_factory

def __getstate__(self):
raise RuntimeError('Pickling FilesystemResolver is not supported as it may contain '
'file-system instance objects that do not support pickling but lack '
'anti-pickling protection')


def get_filesystem_and_path_or_paths(url_or_urls, hdfs_driver='libhdfs3', s3_config_kwargs=None):
"""
Given a url or url list, return a tuple ``(filesystem, path_or_paths)``
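For context, here is a minimal usage sketch of FilesystemResolver as it existed before this commit, reconstructed only from the docstrings in the diff above; the file:// dataset path is hypothetical. The module-level get_filesystem_and_path_or_paths helper, whose docstring is shown above and which remains in fs_utils.py, collapses the same resolution into a single call.

# Minimal sketch of the pre-commit API, assuming a hypothetical local dataset at /tmp/my_dataset.
from petastorm.fs_utils import FilesystemResolver, get_filesystem_and_path_or_paths

resolver = FilesystemResolver('file:///tmp/my_dataset')
fs = resolver.filesystem()                   # pyarrow filesystem object (local filesystem for file://)
path = resolver.get_dataset_path()           # path adjusted for the target filesystem
fs_factory = resolver.filesystem_factory()   # serializable factory, usable on Spark executors

# The surviving helper returns the (filesystem, path_or_paths) tuple directly:
fs, path_or_paths = get_filesystem_and_path_or_paths('file:///tmp/my_dataset')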
petastorm/hdfs/__init__.py (17 changes: 0 additions & 17 deletions)

This file was deleted.

