diff --git a/petastorm/arrow_reader_worker.py b/petastorm/arrow_reader_worker.py index f03a7f456..bdac668e7 100644 --- a/petastorm/arrow_reader_worker.py +++ b/petastorm/arrow_reader_worker.py @@ -23,7 +23,6 @@ from pyarrow.parquet import ParquetFile from petastorm.cache import NullCache -from petastorm.compat import compat_piece_read, compat_table_columns_gen, compat_column_data from petastorm.workers_pool import EmptyResultError from petastorm.workers_pool.worker_base import WorkerBase @@ -45,11 +44,12 @@ def read_next(self, workers_pool, schema, ngram): # Convert arrow table columns into numpy. Strings are handled differently since to_pandas() returns # numpy array of dtype=object. result_dict = dict() - for column_name, column in compat_table_columns_gen(result_table): + for column_name in result_table.column_names: + column = result_table.column(column_name) # Assume we get only one chunk since reader worker reads one rowgroup at a time # `to_pandas` works slower when called on the entire `data` rather directly on a chunk. - if compat_column_data(result_table.column(0)).num_chunks == 1: + if result_table.column(0).num_chunks == 1: column_as_pandas = column.data.chunks[0].to_pandas() else: column_as_pandas = column.data.to_pandas() @@ -287,13 +287,13 @@ def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_ partition_names = self._dataset.partitions.partition_names # pyarrow would fail if we request a column names that the dataset is partitioned by - table = compat_piece_read(piece, lambda _: pq_file, columns=column_names - partition_names, - partitions=self._dataset.partitions) + table = piece.read(columns=column_names - partition_names, partitions=self._dataset.partitions) # Drop columns we did not explicitly request. This may happen when a table is partitioned. Besides columns # requested, pyarrow will also return partition values. Having these unexpected fields will break some # downstream code. - loaded_column_names = set(column[0] for column in compat_table_columns_gen(table)) + + loaded_column_names = set(table.column_names) unasked_for_columns = loaded_column_names - column_names if unasked_for_columns: table = table.drop(unasked_for_columns) diff --git a/petastorm/compat.py b/petastorm/compat.py deleted file mode 100644 index 1f7cad10b..000000000 --- a/petastorm/compat.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright (c) 2017-2018 Uber Technologies, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""0.15.0 cancelled previously deprecated functions. We still want to support 0.11 as it is being used by some users. -This file implements compatibility interfaces. 
Once we drop support of 0.11, we can get rid of this file.""" - -import pyarrow as pa -from packaging import version -from pyarrow import parquet as pq - -_PYARROW_BEFORE_013 = version.parse(pa.__version__) < version.parse('0.13.0') -_PYARROW_BEFORE_014 = version.parse(pa.__version__) < version.parse('0.14.0') -_PYARROW_BEFORE_015 = version.parse(pa.__version__) < version.parse('0.15.0') - - -def compat_get_metadata(piece, open_func): - if _PYARROW_BEFORE_013: - arrow_metadata = piece.get_metadata(open_func) - else: - arrow_metadata = piece.get_metadata() - return arrow_metadata - - -def compat_piece_read(piece, open_file_func, **kwargs): - if _PYARROW_BEFORE_013: - table = piece.read(open_file_func=open_file_func, **kwargs) - else: - table = piece.read(**kwargs) - return table - - -def compat_table_columns_gen(table): - if _PYARROW_BEFORE_014: - for column in table.columns: - name = column.name - yield name, column - else: - for name in table.column_names: - column = table.column(name) - yield name, column - - -def compat_column_data(column): - if _PYARROW_BEFORE_015: - return column.data - else: - return column - - -def compat_make_parquet_piece(path, open_file_func, **kwargs): - if _PYARROW_BEFORE_013: - return pq.ParquetDatasetPiece(path, **kwargs) - else: - return pq.ParquetDatasetPiece(path, open_file_func=open_file_func, # pylint: disable=unexpected-keyword-arg - **kwargs) - - -def compat_with_metadata(schema, metadata): - if _PYARROW_BEFORE_015: - return schema.add_metadata(metadata) - else: - return schema.with_metadata(metadata) - - -def compat_schema_field(schema, name): - if _PYARROW_BEFORE_013: - return schema.field_by_name(name) - else: - return schema.field(name) diff --git a/petastorm/etl/dataset_metadata.py b/petastorm/etl/dataset_metadata.py index 352286c29..48c6f557c 100644 --- a/petastorm/etl/dataset_metadata.py +++ b/petastorm/etl/dataset_metadata.py @@ -25,7 +25,6 @@ from six.moves import cPickle as pickle from petastorm import utils -from petastorm.compat import compat_get_metadata, compat_make_parquet_piece from petastorm.etl.legacy import depickle_legacy_package_name_compatible from petastorm.fs_utils import FilesystemResolver, get_filesystem_and_path_or_paths, get_dataset_path from petastorm.unischema import Unischema @@ -286,8 +285,8 @@ def load_row_groups(dataset): # This is not a real "piece" and we won't have row_groups_per_file recorded for it. 
if row_groups_key != ".": for row_group in range(row_groups_per_file[row_groups_key]): - rowgroups.append(compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group, - partition_keys=piece.partition_keys)) + rowgroups.append(pq.ParquetDatasetPiece(piece.path, open_file_func=dataset.fs.open, row_group=row_group, + partition_keys=piece.partition_keys)) return rowgroups @@ -323,18 +322,18 @@ def _split_row_groups(dataset): continue for row_group in range(row_groups_per_file[relative_path]): - split_piece = compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group, - partition_keys=piece.partition_keys) + split_piece = pq.ParquetDatasetPiece(piece.path, open_file_func=dataset.fs.open, row_group=row_group, + partition_keys=piece.partition_keys) split_pieces.append(split_piece) return split_pieces def _split_piece(piece, fs_open): - metadata = compat_get_metadata(piece, fs_open) - return [compat_make_parquet_piece(piece.path, fs_open, - row_group=row_group, - partition_keys=piece.partition_keys) + metadata = piece.get_metadata() + return [pq.ParquetDatasetPiece(piece.path, open_file_func=fs_open, + row_group=row_group, + partition_keys=piece.partition_keys) for row_group in range(metadata.num_row_groups)] diff --git a/petastorm/etl/rowgroup_indexing.py b/petastorm/etl/rowgroup_indexing.py index b16d47ea5..f0f8b067f 100644 --- a/petastorm/etl/rowgroup_indexing.py +++ b/petastorm/etl/rowgroup_indexing.py @@ -21,7 +21,6 @@ from six.moves import range from petastorm import utils -from petastorm.compat import compat_piece_read, compat_make_parquet_piece from petastorm.etl import dataset_metadata from petastorm.etl.legacy import depickle_legacy_package_name_compatible from petastorm.fs_utils import FilesystemResolver @@ -98,8 +97,8 @@ def _index_columns(piece_info, dataset_url, partitions, indexers, schema, hdfs_d fs = resolver.filesystem() # Create pyarrow piece - piece = compat_make_parquet_piece(piece_info.path, fs.open, row_group=piece_info.row_group, - partition_keys=piece_info.partition_keys) + piece = pq.ParquetDatasetPiece(piece_info.path, open_file_func=fs.open, row_group=piece_info.row_group, + partition_keys=piece_info.partition_keys) # Collect column names needed for indexing column_names = set() @@ -107,8 +106,7 @@ def _index_columns(piece_info, dataset_url, partitions, indexers, schema, hdfs_d column_names.update(indexer.column_names) # Read columns needed for indexing - column_rows = compat_piece_read(piece, fs.open, columns=list(column_names), - partitions=partitions).to_pandas().to_dict('records') + column_rows = piece.read(columns=list(column_names), partitions=partitions).to_pandas().to_dict('records') # Decode columns values decoded_rows = [utils.decode_row(row, schema) for row in column_rows] diff --git a/petastorm/py_dict_reader_worker.py b/petastorm/py_dict_reader_worker.py index 86f9941e7..5f1edb120 100644 --- a/petastorm/py_dict_reader_worker.py +++ b/petastorm/py_dict_reader_worker.py @@ -22,7 +22,6 @@ from petastorm import utils from petastorm.cache import NullCache -from petastorm.compat import compat_piece_read from petastorm.workers_pool import EmptyResultError from petastorm.workers_pool.worker_base import WorkerBase @@ -255,8 +254,8 @@ def _load_rows_with_predicate(self, pq_file, piece, worker_predicate, shuffle_ro def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_drop_partition): # If integer_object_nulls is set to False, nullable integer fields are return as floats # with nulls translated to nans - 
data_frame = compat_piece_read(piece, lambda _: pq_file, columns=column_names, - partitions=self._dataset.partitions).to_pandas(integer_object_nulls=True) + data_frame = piece.read(columns=column_names, partitions=self._dataset.partitions).to_pandas( + integer_object_nulls=True) num_rows = len(data_frame) num_partitions = shuffle_row_drop_partition[1] diff --git a/petastorm/pyarrow_helpers/tests/test_batch_buffer.py b/petastorm/pyarrow_helpers/tests/test_batch_buffer.py index 2b6de8500..cadd19171 100644 --- a/petastorm/pyarrow_helpers/tests/test_batch_buffer.py +++ b/petastorm/pyarrow_helpers/tests/test_batch_buffer.py @@ -15,7 +15,6 @@ import numpy as np import pyarrow as pa -from petastorm.compat import compat_column_data from petastorm.pyarrow_helpers.batching_table_queue import BatchingTableQueue @@ -43,16 +42,16 @@ def test_single_table_of_10_rows_added_and_2_batches_of_4_read(): next_batch = batcher.get() assert 4 == next_batch.num_rows - np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(0, 4))) - np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(0, 4))) + np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(0, 4))) + np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(0, 4))) # Get second batch of 4 assert not batcher.empty() next_batch = batcher.get() assert 4 == next_batch.num_rows - np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(4, 8))) - np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(4, 8))) + np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(4, 8))) + np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(4, 8))) # No more batches available assert batcher.empty() @@ -77,8 +76,8 @@ def test_two_tables_of_10_added_reading_5_batches_of_4(): assert 4 == next_batch.num_rows expected_values = list(range(i * 4, i * 4 + 4)) - np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), expected_values) - np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), expected_values) + np.testing.assert_equal(next_batch.column(0).to_pylist(), expected_values) + np.testing.assert_equal(next_batch.column(1).to_pylist(), expected_values) def test_read_batches_larger_than_a_table_added(): @@ -94,15 +93,15 @@ def test_read_batches_larger_than_a_table_added(): next_batch = batcher.get() assert 4 == next_batch.num_rows - np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(0, 4))) - np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(0, 4))) + np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(0, 4))) + np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(0, 4))) assert not batcher.empty() next_batch = batcher.get() assert 4 == next_batch.num_rows - np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(4, 8))) - np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(4, 8))) + np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(4, 8))) + np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(4, 8))) assert batcher.empty() @@ -144,7 +143,7 @@ def test_random_table_size_and_random_batch_sizes(): for _ in range(next_read): if not batcher.empty(): read_batch = batcher.get() - for value in compat_column_data(read_batch.columns[0]): + for 
value in read_batch.columns[0]: assert value.as_py() == read_seq read_seq += 1 diff --git a/petastorm/tests/test_parquet_reader.py b/petastorm/tests/test_parquet_reader.py index e149c8183..33ff89112 100644 --- a/petastorm/tests/test_parquet_reader.py +++ b/petastorm/tests/test_parquet_reader.py @@ -21,7 +21,6 @@ from petastorm import make_batch_reader from petastorm.arrow_reader_worker import ArrowReaderWorker # pylint: disable=unnecessary-lambda -from petastorm.compat import compat_get_metadata from petastorm.tests.test_common import create_test_scalar_dataset from petastorm.transform import TransformSpec from petastorm.unischema import UnischemaField @@ -128,7 +127,7 @@ def test_asymetric_parquet_pieces(reader_factory, tmpdir): # We verify we have pieces with different number of row-groups dataset = pq.ParquetDataset(tmpdir.strpath) - row_group_counts = set(compat_get_metadata(piece, dataset.fs.open).num_row_groups for piece in dataset.pieces) + row_group_counts = set(piece.get_metadata().num_row_groups for piece in dataset.pieces) assert len(row_group_counts) > 1 # Make sure we are not missing any rows. diff --git a/petastorm/unischema.py b/petastorm/unischema.py index 33cd662f3..9c2902407 100644 --- a/petastorm/unischema.py +++ b/petastorm/unischema.py @@ -30,8 +30,6 @@ from pyarrow.lib import StructType as pyStructType from six import string_types -from petastorm.compat import compat_get_metadata, compat_schema_field - # _UNISCHEMA_FIELD_ORDER available values are 'preserve_input_order' or 'alphabetical' # Current default behavior is 'preserve_input_order', the legacy behavior is 'alphabetical', which is deprecated and # will be removed in future versions. @@ -316,7 +314,7 @@ def from_arrow_schema(cls, parquet_dataset, omit_unsupported_fields=False): :param omit_unsupported_fields: :class:`Boolean` :return: A :class:`Unischema` object. """ - meta = compat_get_metadata(parquet_dataset.pieces[0], parquet_dataset.fs.open) + meta = parquet_dataset.pieces[0].get_metadata() arrow_schema = meta.schema.to_arrow_schema() unischema_fields = [] @@ -333,7 +331,7 @@ def from_arrow_schema(cls, parquet_dataset, omit_unsupported_fields=False): unischema_fields.append(UnischemaField(partition.name, numpy_dtype, (), None, False)) for column_name in arrow_schema.names: - arrow_field = compat_schema_field(arrow_schema, column_name) + arrow_field = arrow_schema.field(column_name) field_type = arrow_field.type field_shape = () if isinstance(field_type, ListType): diff --git a/petastorm/utils.py b/petastorm/utils.py index 3f6a005d2..cb83bb85d 100644 --- a/petastorm/utils.py +++ b/petastorm/utils.py @@ -22,8 +22,6 @@ from future.utils import raise_with_traceback from pyarrow.filesystem import LocalFileSystem -from petastorm.compat import compat_get_metadata, compat_with_metadata - logger = logging.getLogger(__name__) @@ -112,14 +110,14 @@ def add_to_dataset_metadata(dataset, key, value): with dataset.fs.open(metadata_file_path) as f: arrow_metadata = pyarrow.parquet.read_metadata(f) else: - arrow_metadata = compat_get_metadata(dataset.pieces[0], dataset.fs.open) + arrow_metadata = dataset.pieces[0].get_metadata() base_schema = arrow_metadata.schema.to_arrow_schema() # base_schema.metadata may be None, e.g. 
metadata_dict = base_schema.metadata or dict() metadata_dict[key] = value - schema = compat_with_metadata(base_schema, metadata_dict) + schema = base_schema.with_metadata(metadata_dict) with dataset.fs.open(common_metadata_file_path, 'wb') as metadata_file: pyarrow.parquet.write_metadata(schema, metadata_file)
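Migration note (editorial addition, not part of the diff): a minimal, self-contained sketch of the direct pyarrow calling convention this change moves to, assuming a pyarrow version that still ships ParquetDatasetPiece and the legacy pyarrow.filesystem.LocalFileSystem (roughly 0.15 through 4.x). The temporary file, the 'id' column, and the metadata key below are illustrative placeholders, not names from the petastorm codebase.

    import os
    import tempfile

    import pyarrow as pa
    from pyarrow import parquet as pq
    from pyarrow.filesystem import LocalFileSystem

    # Write a tiny single-row-group Parquet file so the sketch runs end to end.
    path = os.path.join(tempfile.mkdtemp(), 'part-0.parquet')
    pq.write_table(pa.Table.from_pydict({'id': [1, 2, 3]}), path)
    fs = LocalFileSystem()

    # compat_make_parquet_piece(path, fs.open, row_group=0)
    #   -> pass open_file_func directly to ParquetDatasetPiece
    piece = pq.ParquetDatasetPiece(path, open_file_func=fs.open, row_group=0)

    # compat_get_metadata(piece, fs.open) -> piece.get_metadata()
    assert piece.get_metadata().num_row_groups == 1

    # compat_piece_read(piece, fs.open, columns=...) -> piece.read(columns=...)
    table = piece.read(columns=['id'])

    # compat_table_columns_gen(table) / compat_column_data(column)
    #   -> Table.column_names and Table.column(name), which returns a ChunkedArray
    for name in table.column_names:
        print(name, table.column(name).to_pandas().tolist())

    # compat_schema_field / compat_with_metadata -> Schema.field / Schema.with_metadata
    schema = table.schema.with_metadata({b'created_by': b'example'})
    print(schema.field('id').type)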