Commit

Deprecate compat library since we no longer support pre pyarrow 0.17
Yevgeni Litvin committed Nov 7, 2020
1 parent a38a283 commit 675fed3
Showing 9 changed files with 35 additions and 125 deletions.
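
Every call-site change below follows the same pattern: a petastorm.compat wrapper is replaced with the equivalent direct pyarrow (>= 0.17) call. A rough mapping, inferred from the call sites in this diff rather than from the deleted compat.py itself:

    compat_piece_read(piece, open_fn, columns=..., partitions=...)  ->  piece.read(columns=..., partitions=...)
    compat_table_columns_gen(table)                                 ->  table.column_names + table.column(name)
    compat_column_data(column)                                      ->  column (already a ChunkedArray)
    compat_get_metadata(piece, open_fn)                             ->  piece.get_metadata()
    compat_make_parquet_piece(path, open_fn, ...)                   ->  pq.ParquetDatasetPiece(path, open_file_func=open_fn, ...)
    compat_schema_field(schema, name)                               ->  schema.field(name)
    compat_with_metadata(schema, metadata)                          ->  schema.with_metadata(metadata)
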
12 changes: 6 additions & 6 deletions petastorm/arrow_reader_worker.py
@@ -23,7 +23,6 @@
from pyarrow.parquet import ParquetFile

from petastorm.cache import NullCache
from petastorm.compat import compat_piece_read, compat_table_columns_gen, compat_column_data
from petastorm.workers_pool import EmptyResultError
from petastorm.workers_pool.worker_base import WorkerBase

@@ -45,11 +44,12 @@ def read_next(self, workers_pool, schema, ngram):
# Convert arrow table columns into numpy. Strings are handled differently since to_pandas() returns
# numpy array of dtype=object.
result_dict = dict()
for column_name, column in compat_table_columns_gen(result_table):
for column_name in result_table.column_names:
column = result_table.column(column_name)
# Assume we get only one chunk since reader worker reads one rowgroup at a time

# `to_pandas` is slower when called on the entire `data` rather than directly on a chunk.
if compat_column_data(result_table.column(0)).num_chunks == 1:
if result_table.column(0).num_chunks == 1:
column_as_pandas = column.data.chunks[0].to_pandas()
else:
column_as_pandas = column.data.to_pandas()
@@ -287,13 +287,13 @@ def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_
partition_names = self._dataset.partitions.partition_names

# pyarrow would fail if we request column names that the dataset is partitioned by
table = compat_piece_read(piece, lambda _: pq_file, columns=column_names - partition_names,
partitions=self._dataset.partitions)
table = piece.read(columns=column_names - partition_names, partitions=self._dataset.partitions)

# Drop columns we did not explicitly request. This may happen when a table is partitioned. Besides columns
# requested, pyarrow will also return partition values. Having these unexpected fields will break some
# downstream code.
loaded_column_names = set(column[0] for column in compat_table_columns_gen(table))

loaded_column_names = set(table.column_names)
unasked_for_columns = loaded_column_names - column_names
if unasked_for_columns:
table = table.drop(unasked_for_columns)
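
For illustration, a minimal sketch of the column-iteration idiom the worker now relies on, assuming pyarrow >= 0.17 where Table.column(name) returns a ChunkedArray directly. The toy table stands in for a row group and is not petastorm code:

import pyarrow as pa

table = pa.table({'id': [1, 2, 3], 'label': ['a', 'b', 'c']})

result_dict = dict()
for column_name in table.column_names:
    column = table.column(column_name)  # a ChunkedArray; no .data indirection needed
    if column.num_chunks == 1:
        # Converting a single chunk directly avoids concatenating chunks first
        result_dict[column_name] = column.chunks[0].to_pandas()
    else:
        result_dict[column_name] = column.to_pandas()
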
80 changes: 0 additions & 80 deletions petastorm/compat.py

This file was deleted.
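
The deleted file is not reproduced on this page. As a heavily hedged illustration only, a wrapper of this kind typically dispatched on the installed pyarrow version, roughly like the sketch below; this is reconstructed from the call sites in this commit, not from the actual deleted code, and the exact version cutoff may have differed:

from distutils.version import LooseVersion

import pyarrow as pa

# The Column wrapper class was removed around pyarrow 0.15.
_PRE_COLUMN_REMOVAL = LooseVersion(pa.__version__) < LooseVersion('0.15.0')

def compat_column_data(column):
    # pre-0.15: Table.column() returned a Column whose .data held the ChunkedArray;
    # later versions return the ChunkedArray directly, so pass it through unchanged.
    return column.data if _PRE_COLUMN_REMOVAL else column
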

17 changes: 8 additions & 9 deletions petastorm/etl/dataset_metadata.py
@@ -25,7 +25,6 @@
from six.moves import cPickle as pickle

from petastorm import utils
from petastorm.compat import compat_get_metadata, compat_make_parquet_piece
from petastorm.etl.legacy import depickle_legacy_package_name_compatible
from petastorm.fs_utils import FilesystemResolver, get_filesystem_and_path_or_paths, get_dataset_path
from petastorm.unischema import Unischema
@@ -286,8 +285,8 @@ def load_row_groups(dataset):
# This is not a real "piece" and we won't have row_groups_per_file recorded for it.
if row_groups_key != ".":
for row_group in range(row_groups_per_file[row_groups_key]):
rowgroups.append(compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group,
partition_keys=piece.partition_keys))
rowgroups.append(pq.ParquetDatasetPiece(piece.path, open_file_func=dataset.fs.open, row_group=row_group,
partition_keys=piece.partition_keys))
return rowgroups


@@ -323,18 +322,18 @@ def _split_row_groups(dataset):
continue

for row_group in range(row_groups_per_file[relative_path]):
split_piece = compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group,
partition_keys=piece.partition_keys)
split_piece = pq.ParquetDatasetPiece(piece.path, open_file_func=dataset.fs.open, row_group=row_group,
partition_keys=piece.partition_keys)
split_pieces.append(split_piece)

return split_pieces


def _split_piece(piece, fs_open):
metadata = compat_get_metadata(piece, fs_open)
return [compat_make_parquet_piece(piece.path, fs_open,
row_group=row_group,
partition_keys=piece.partition_keys)
metadata = piece.get_metadata()
return [pq.ParquetDatasetPiece(piece.path, open_file_func=fs_open,
row_group=row_group,
partition_keys=piece.partition_keys)
for row_group in range(metadata.num_row_groups)]


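
A hedged sketch of the per-row-group splitting now done directly above, using the legacy ParquetDataset API that pyarrow 0.17-era releases still ship. The dataset path is a placeholder:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset('/tmp/example_dataset')  # placeholder path
piece = dataset.pieces[0]

split_pieces = [
    pq.ParquetDatasetPiece(piece.path,
                           open_file_func=dataset.fs.open,
                           row_group=row_group,
                           partition_keys=piece.partition_keys)
    for row_group in range(piece.get_metadata().num_row_groups)
]
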
8 changes: 3 additions & 5 deletions petastorm/etl/rowgroup_indexing.py
@@ -21,7 +21,6 @@
from six.moves import range

from petastorm import utils
from petastorm.compat import compat_piece_read, compat_make_parquet_piece
from petastorm.etl import dataset_metadata
from petastorm.etl.legacy import depickle_legacy_package_name_compatible
from petastorm.fs_utils import FilesystemResolver
@@ -98,17 +97,16 @@ def _index_columns(piece_info, dataset_url, partitions, indexers, schema, hdfs_d
fs = resolver.filesystem()

# Create pyarrow piece
piece = compat_make_parquet_piece(piece_info.path, fs.open, row_group=piece_info.row_group,
partition_keys=piece_info.partition_keys)
piece = pq.ParquetDatasetPiece(piece_info.path, open_file_func=fs.open, row_group=piece_info.row_group,
partition_keys=piece_info.partition_keys)

# Collect column names needed for indexing
column_names = set()
for indexer in indexers:
column_names.update(indexer.column_names)

# Read columns needed for indexing
column_rows = compat_piece_read(piece, fs.open, columns=list(column_names),
partitions=partitions).to_pandas().to_dict('records')
column_rows = piece.read(columns=list(column_names), partitions=partitions).to_pandas().to_dict('records')

# Decode columns values
decoded_rows = [utils.decode_row(row, schema) for row in column_rows]
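
A minimal sketch of reading only the columns an indexer needs from a single piece, matching the direct piece.read call above. The dataset path and column names are placeholders:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset('/tmp/example_dataset')  # placeholder path
piece = dataset.pieces[0]

column_names = {'user_id', 'timestamp'}  # placeholder indexer columns
rows = (piece.read(columns=list(column_names), partitions=dataset.partitions)
        .to_pandas()
        .to_dict('records'))
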
5 changes: 2 additions & 3 deletions petastorm/py_dict_reader_worker.py
@@ -22,7 +22,6 @@

from petastorm import utils
from petastorm.cache import NullCache
from petastorm.compat import compat_piece_read
from petastorm.workers_pool import EmptyResultError
from petastorm.workers_pool.worker_base import WorkerBase

@@ -255,8 +254,8 @@ def _load_rows_with_predicate(self, pq_file, piece, worker_predicate, shuffle_ro
def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_drop_partition):
# If integer_object_nulls is set to False, nullable integer fields are returned as floats
# with nulls translated to nans
data_frame = compat_piece_read(piece, lambda _: pq_file, columns=column_names,
partitions=self._dataset.partitions).to_pandas(integer_object_nulls=True)
data_frame = piece.read(columns=column_names, partitions=self._dataset.partitions).to_pandas(
integer_object_nulls=True)

num_rows = len(data_frame)
num_partitions = shuffle_row_drop_partition[1]
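
The integer_object_nulls=True flag kept in the direct call above is what prevents nullable integer columns from being coerced to float; a small illustration, not petastorm code:

import pyarrow as pa

t = pa.table({'x': pa.array([1, None, 3], type=pa.int64())})

print(t.to_pandas()['x'].dtype)                           # float64 -- nulls become NaN
print(t.to_pandas(integer_object_nulls=True)['x'].dtype)  # object  -- values stay int, nulls stay None
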
23 changes: 11 additions & 12 deletions petastorm/pyarrow_helpers/tests/test_batch_buffer.py
@@ -15,7 +15,6 @@
import numpy as np
import pyarrow as pa

from petastorm.compat import compat_column_data
from petastorm.pyarrow_helpers.batching_table_queue import BatchingTableQueue


@@ -43,16 +42,16 @@ def test_single_table_of_10_rows_added_and_2_batches_of_4_read():
next_batch = batcher.get()

assert 4 == next_batch.num_rows
np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(0, 4)))
np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(0, 4)))
np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(0, 4)))
np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(0, 4)))

# Get second batch of 4
assert not batcher.empty()
next_batch = batcher.get()

assert 4 == next_batch.num_rows
np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(4, 8)))
np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(4, 8)))
np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(4, 8)))
np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(4, 8)))

# No more batches available
assert batcher.empty()
@@ -77,8 +76,8 @@ def test_two_tables_of_10_added_reading_5_batches_of_4():

assert 4 == next_batch.num_rows
expected_values = list(range(i * 4, i * 4 + 4))
np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), expected_values)
np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), expected_values)
np.testing.assert_equal(next_batch.column(0).to_pylist(), expected_values)
np.testing.assert_equal(next_batch.column(1).to_pylist(), expected_values)


def test_read_batches_larger_than_a_table_added():
@@ -94,15 +93,15 @@ def test_read_batches_larger_than_a_table_added():
next_batch = batcher.get()

assert 4 == next_batch.num_rows
np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(0, 4)))
np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(0, 4)))
np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(0, 4)))
np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(0, 4)))

assert not batcher.empty()
next_batch = batcher.get()

assert 4 == next_batch.num_rows
np.testing.assert_equal(compat_column_data(next_batch.column(0)).to_pylist(), list(range(4, 8)))
np.testing.assert_equal(compat_column_data(next_batch.column(1)).to_pylist(), list(range(4, 8)))
np.testing.assert_equal(next_batch.column(0).to_pylist(), list(range(4, 8)))
np.testing.assert_equal(next_batch.column(1).to_pylist(), list(range(4, 8)))

assert batcher.empty()

@@ -144,7 +143,7 @@ def test_random_table_size_and_random_batch_sizes():
for _ in range(next_read):
if not batcher.empty():
read_batch = batcher.get()
for value in compat_column_data(read_batch.columns[0]):
for value in read_batch.columns[0]:
assert value.as_py() == read_seq
read_seq += 1

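
The test changes rely on two behaviors of post-0.17 pyarrow, shown in a tiny sketch: table columns are ChunkedArrays that expose to_pylist(), and iterating one yields pyarrow scalars convertible with as_py():

import pyarrow as pa

batch = pa.table({'a': list(range(4)), 'b': list(range(4))})

assert batch.column(0).to_pylist() == [0, 1, 2, 3]
assert [value.as_py() for value in batch.columns[0]] == [0, 1, 2, 3]
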
3 changes: 1 addition & 2 deletions petastorm/tests/test_parquet_reader.py
@@ -21,7 +21,6 @@
from petastorm import make_batch_reader
from petastorm.arrow_reader_worker import ArrowReaderWorker
# pylint: disable=unnecessary-lambda
from petastorm.compat import compat_get_metadata
from petastorm.tests.test_common import create_test_scalar_dataset
from petastorm.transform import TransformSpec
from petastorm.unischema import UnischemaField
@@ -128,7 +127,7 @@ def test_asymetric_parquet_pieces(reader_factory, tmpdir):

# We verify we have pieces with different number of row-groups
dataset = pq.ParquetDataset(tmpdir.strpath)
row_group_counts = set(compat_get_metadata(piece, dataset.fs.open).num_row_groups for piece in dataset.pieces)
row_group_counts = set(piece.get_metadata().num_row_groups for piece in dataset.pieces)
assert len(row_group_counts) > 1

# Make sure we are not missing any rows.
6 changes: 2 additions & 4 deletions petastorm/unischema.py
@@ -30,8 +30,6 @@
from pyarrow.lib import StructType as pyStructType
from six import string_types

from petastorm.compat import compat_get_metadata, compat_schema_field

# _UNISCHEMA_FIELD_ORDER available values are 'preserve_input_order' or 'alphabetical'
# Current default behavior is 'preserve_input_order', the legacy behavior is 'alphabetical', which is deprecated and
# will be removed in future versions.
@@ -316,7 +314,7 @@ def from_arrow_schema(cls, parquet_dataset, omit_unsupported_fields=False):
:param omit_unsupported_fields: :class:`Boolean`
:return: A :class:`Unischema` object.
"""
meta = compat_get_metadata(parquet_dataset.pieces[0], parquet_dataset.fs.open)
meta = parquet_dataset.pieces[0].get_metadata()
arrow_schema = meta.schema.to_arrow_schema()
unischema_fields = []

@@ -333,7 +331,7 @@ def from_arrow_schema(cls, parquet_dataset, omit_unsupported_fields=False):
unischema_fields.append(UnischemaField(partition.name, numpy_dtype, (), None, False))

for column_name in arrow_schema.names:
arrow_field = compat_schema_field(arrow_schema, column_name)
arrow_field = arrow_schema.field(column_name)
field_type = arrow_field.type
field_shape = ()
if isinstance(field_type, ListType):
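
A small sketch of the schema accessors used above in pyarrow >= 0.17: Schema.field(name) replaces the wrapped field lookup, and list-typed fields can be detected via their type class. The schema is a toy example:

import pyarrow as pa
from pyarrow.lib import ListType

schema = pa.schema([('id', pa.int64()), ('tags', pa.list_(pa.string()))])

arrow_field = schema.field('tags')
if isinstance(arrow_field.type, ListType):
    element_type = arrow_field.type.value_type  # string
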
6 changes: 2 additions & 4 deletions petastorm/utils.py
@@ -22,8 +22,6 @@
from future.utils import raise_with_traceback
from pyarrow.filesystem import LocalFileSystem

from petastorm.compat import compat_get_metadata, compat_with_metadata

logger = logging.getLogger(__name__)


@@ -112,14 +110,14 @@ def add_to_dataset_metadata(dataset, key, value):
with dataset.fs.open(metadata_file_path) as f:
arrow_metadata = pyarrow.parquet.read_metadata(f)
else:
arrow_metadata = compat_get_metadata(dataset.pieces[0], dataset.fs.open)
arrow_metadata = dataset.pieces[0].get_metadata()

base_schema = arrow_metadata.schema.to_arrow_schema()

# base_schema.metadata may be None, e.g.
metadata_dict = base_schema.metadata or dict()
metadata_dict[key] = value
schema = compat_with_metadata(base_schema, metadata_dict)
schema = base_schema.with_metadata(metadata_dict)

with dataset.fs.open(common_metadata_file_path, 'wb') as metadata_file:
pyarrow.parquet.write_metadata(schema, metadata_file)
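
A minimal sketch of the metadata round-trip the direct calls above rely on: Schema.metadata is a possibly-None bytes-to-bytes mapping, and with_metadata returns a new schema carrying the updated dictionary. The key and value are placeholders:

import pyarrow as pa

base_schema = pa.schema([('id', pa.int64())])

metadata_dict = dict(base_schema.metadata or {})
metadata_dict[b'example_key'] = b'example_value'  # placeholder key/value
schema = base_schema.with_metadata(metadata_dict)

assert schema.metadata[b'example_key'] == b'example_value'
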
