ENH: support writing and filtered reading from bbox columns in GeoParquet #3282

Merged
merged 43 commits into main from issue3252 on May 27, 2024

Commits (43)
b77f289
REF: frontload metadata validation
nicholas-ys-tan May 14, 2024
9a17f22
REF: rename function
nicholas-ys-tan May 14, 2024
0725190
REF: frontload metadata validation for _read_feather
nicholas-ys-tan May 14, 2024
196e67d
REF: remove duplicate metadata validation code from _arrow_to_pandas
nicholas-ys-tan May 14, 2024
3ac2483
REF: import parquet as optional dependency, remove redundant line
nicholas-ys-tan May 14, 2024
c618c30
ENH: write failing tests
nicholas-ys-tan May 14, 2024
20ae75b
ENH: add write bbox column to _to_parquet
nicholas-ys-tan May 15, 2024
74d4177
ENH: add bbox filtering to _read_parquet
nicholas-ys-tan May 16, 2024
29046e5
ENH: add read_bbox_column to _read_parquet
nicholas-ys-tan May 16, 2024
90560d5
ENH: update geometry bbox calculation method
nicholas-ys-tan May 18, 2024
ba31f04
ENH: fix docstring of bbox tuple order
nicholas-ys-tan May 18, 2024
381186a
ENH: update how bbox column gets appended to table
nicholas-ys-tan May 18, 2024
81e7b38
ENH: allow use of both bbox and filters kwarg in read_parquet
nicholas-ys-tan May 19, 2024
1b95a11
ENH: fix failing tests
nicholas-ys-tan May 19, 2024
5b0736b
ENH: change if write bbox column to predicate with default name 'bbox'
nicholas-ys-tan May 20, 2024
c7c52ea
ENH: add fallback in case user does not have ds.dataset
nicholas-ys-tan May 20, 2024
53fb78b
ENH: changelog and some comments
nicholas-ys-tan May 20, 2024
c9c71e2
ENH: clean up rebase linting
nicholas-ys-tan May 20, 2024
5a23676
ENH: clean up rebase linting in arrow
nicholas-ys-tan May 20, 2024
a2cc212
Remove minimum pyarrow version in tests
nicholas-ys-tan May 25, 2024
a494c52
Merge branch 'main' into issue3252
nicholas-ys-tan May 25, 2024
40e06b5
linting
nicholas-ys-tan May 25, 2024
79761b6
hard code 'bbox' as column name in tests
nicholas-ys-tan May 25, 2024
1597a66
Add tests for filters without a bbox
nicholas-ys-tan May 25, 2024
c565e6c
pass in geometadata to prevent repeated decoding
nicholas-ys-tan May 25, 2024
b55fcfb
refactor _arrow_to_pandas to take geo_metadata
nicholas-ys-tan May 25, 2024
1be2f60
Fix bbox to include all intersections
nicholas-ys-tan May 26, 2024
f503c22
Update docstring
nicholas-ys-tan May 27, 2024
7aeefac
Update docstring
nicholas-ys-tan May 27, 2024
611c6ba
Update test description
nicholas-ys-tan May 27, 2024
a4583d6
simplify line
nicholas-ys-tan May 27, 2024
248f68d
update bbox_filter line
nicholas-ys-tan May 27, 2024
7e40345
remove redundant kwarg in test
nicholas-ys-tan May 27, 2024
9662fd2
docstring formatting
nicholas-ys-tan May 27, 2024
d91a8ef
docstring formatting
nicholas-ys-tan May 27, 2024
fac0e7c
updated from review
nicholas-ys-tan May 27, 2024
163b7d7
linting geodataframe
nicholas-ys-tan May 27, 2024
7c1e1a6
remove read_covering_column
jorisvandenbossche May 27, 2024
cecb4d6
simplify tests a bit
jorisvandenbossche May 27, 2024
d06cfcd
Change read_parquet to read metadata upfront before reading the data
jorisvandenbossche May 27, 2024
4fb7e6d
fix for older pyarrow
jorisvandenbossche May 27, 2024
efc149c
move validation to existing validation function
jorisvandenbossche May 27, 2024
7291940
Merge remote-tracking branch 'upstream/main' into issue3252
jorisvandenbossche May 27, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -70,6 +70,8 @@ New features and improvements:
- `explore` now supports `GeoDataFrame`s with additional columns containing datetimes, uuids and
other non JSON serializable objects (#3261).
- The `GeoSeries.fillna` method now supports the `limit` keyword (#3290).
- Added support for the `bbox` covering encoding in GeoParquet: reading of Parquet files can now be
filtered by a bounding box, and a bounding box column can be written to Parquet files (#3282).

Backwards incompatible API changes:

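For context, a minimal usage sketch of the feature this PR adds, assuming a geopandas build that includes these changes; the file name and example data are made up:

import geopandas
from shapely.geometry import Point

# hypothetical example data
gdf = geopandas.GeoDataFrame(
    {"name": ["a", "b"]},
    geometry=[Point(1, 1), Point(20, 20)],
    crs="EPSG:4326",
)

# writes a per-row 'bbox' struct column plus a 'covering' entry in the geo metadata
gdf.to_parquet("points.parquet", write_covering_bbox=True)

# reads back only the rows whose bounding box intersects (xmin, ymin, xmax, ymax);
# the helper 'bbox' column is not included in the result by default
subset = geopandas.read_parquet("points.parquet", bbox=(0, 0, 10, 10))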
14 changes: 13 additions & 1 deletion geopandas/geodataframe.py
@@ -1238,7 +1238,13 @@ def to_arrow(
return ArrowTable(table)

def to_parquet(
self, path, index=None, compression="snappy", schema_version=None, **kwargs
self,
path,
index=None,
compression="snappy",
schema_version=None,
write_covering_bbox=False,
**kwargs,
):
"""Write a GeoDataFrame to the Parquet format.

@@ -1262,6 +1268,11 @@ def to_parquet(
schema_version : {'0.1.0', '0.4.0', '1.0.0', None}
GeoParquet specification version; if not provided will default to
latest supported version.
write_covering_bbox : bool, default False
Writes the bounding box column for each row entry with column
name 'bbox'. Writing a bbox column can be computationally
expensive, but allows you to specify a `bbox` in
:func:`read_parquet` for filtered reading.
kwargs
Additional keyword arguments passed to :func:`pyarrow.parquet.write_table`.

@@ -1294,6 +1305,7 @@
compression=compression,
index=index,
schema_version=schema_version,
write_covering_bbox=write_covering_bbox,
**kwargs,
)

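To show what `write_covering_bbox=True` stores in the file, a hedged inspection sketch using pyarrow, continuing the hypothetical 'points.parquet' example above; the expected output in the comments follows the covering structure written by `_create_metadata` in arrow.py below:

import json
import pyarrow.parquet as pq

schema = pq.read_schema("points.parquet")
# the per-row bounds live in a struct column named 'bbox'
print(schema.field("bbox").type)  # struct<xmin: double, ymin: double, xmax: double, ymax: double>

# the 'covering' entry in the geo metadata points readers at that column
geo = json.loads(schema.metadata[b"geo"])
print(geo["columns"]["geometry"]["covering"]["bbox"])
# {'xmin': ['bbox', 'xmin'], 'ymin': ['bbox', 'ymin'],
#  'xmax': ['bbox', 'xmax'], 'ymax': ['bbox', 'ymax']}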
189 changes: 160 additions & 29 deletions geopandas/io/arrow.py
@@ -110,7 +110,7 @@
return sorted([_geometry_type_names[idx] for idx in geometry_types])


def _create_metadata(df, schema_version=None):
def _create_metadata(df, schema_version=None, write_covering_bbox=False):
"""Create and encode geo metadata dict.

Parameters
@@ -119,6 +119,10 @@
schema_version : {'0.1.0', '0.4.0', '1.0.0-beta.1', '1.0.0', None}
GeoParquet specification version; if not provided will default to
latest supported version.
write_covering_bbox : bool, default False
Writes the bounding box column for each row entry with column
name 'bbox'. Writing a bbox column can be computationally
expensive, hence the default setting is False.

Returns
-------
@@ -164,6 +168,16 @@
# don't add bbox with NaNs for empty / all-NA geometry column
column_metadata[col]["bbox"] = bbox

if write_covering_bbox:
column_metadata[col]["covering"] = {
"bbox": {
"xmin": ["bbox", "xmin"],
"ymin": ["bbox", "ymin"],
"xmax": ["bbox", "xmax"],
"ymax": ["bbox", "ymax"],
},
}

return {
"primary_column": df._geometry_column_name,
"columns": column_metadata,
@@ -231,7 +245,7 @@
raise ValueError("Index level names must be strings")


def _validate_metadata(metadata):
def _validate_geo_metadata(metadata):
"""Validate geo metadata.
Must not be empty, and must contain the structure specified above.

Expand Down Expand Up @@ -292,22 +306,42 @@
stacklevel=4,
)

if "covering" in column_metadata:
covering = column_metadata["covering"]
if "bbox" in covering:
bbox = covering["bbox"]
for var in ["xmin", "ymin", "xmax", "ymax"]:
if var not in bbox.keys():
raise ValueError("Metadata for bbox column is malformed.")


def _geopandas_to_arrow(df, index=None, schema_version=None):
def _geopandas_to_arrow(df, index=None, schema_version=None, write_covering_bbox=None):
"""
Helper function with main, shared logic for to_parquet/to_feather.
"""
from pyarrow import StructArray

from geopandas.io.geoarrow import geopandas_to_arrow

_validate_dataframe(df)

# create geo metadata before altering incoming data frame
geo_metadata = _create_metadata(df, schema_version=schema_version)
geo_metadata = _create_metadata(
df, schema_version=schema_version, write_covering_bbox=write_covering_bbox
)

table = geopandas_to_arrow(
df, geometry_encoding="WKB", index=index, interleaved=True
)

if write_covering_bbox:
bounds = df.bounds
bbox_array = StructArray.from_arrays(
[bounds["minx"], bounds["miny"], bounds["maxx"], bounds["maxy"]],
names=["xmin", "ymin", "xmax", "ymax"],
)
table = table.append_column("bbox", bbox_array)

# Store geopandas specific file-level metadata
# This must be done AFTER creating the table or it is not persisted
metadata = table.schema.metadata
@@ -317,7 +351,13 @@


def _to_parquet(
df, path, index=None, compression="snappy", schema_version=None, **kwargs
df,
path,
index=None,
compression="snappy",
schema_version=None,
write_covering_bbox=False,
**kwargs,
):
"""
Write a GeoDataFrame to the Parquet format.
@@ -346,6 +386,10 @@
schema_version : {'0.1.0', '0.4.0', '1.0.0', None}
GeoParquet specification version; if not provided will default to
latest supported version.
write_covering_bbox : bool, default False
Writes the bounding box column for each row entry with column
name 'bbox'. Writing a bbox column can be computationally
expensive, hence the default setting is False.
**kwargs
Additional keyword arguments passed to pyarrow.parquet.write_table().
"""
@@ -365,7 +409,12 @@
schema_version = kwargs.pop("version")

path = _expand_user(path)
table = _geopandas_to_arrow(df, index=index, schema_version=schema_version)
table = _geopandas_to_arrow(
df,
index=index,
schema_version=schema_version,
write_covering_bbox=write_covering_bbox,
)
parquet.write_table(table, path, compression=compression, **kwargs)


@@ -426,31 +475,19 @@
feather.write_feather(table, path, compression=compression, **kwargs)


def _arrow_to_geopandas(table, metadata=None):
def _arrow_to_geopandas(table, geo_metadata=None):
"""
Helper function with main, shared logic for read_parquet/read_feather.
"""
df = table.to_pandas()

metadata = metadata or table.schema.metadata

if metadata is None or b"geo" not in metadata:
raise ValueError(
"""Missing geo metadata in Parquet/Feather file.
Use pandas.read_parquet/read_feather() instead."""
)

try:
metadata = _decode_metadata(metadata.get(b"geo", b""))

except (TypeError, json.decoder.JSONDecodeError):
raise ValueError("Missing or malformed geo metadata in Parquet/Feather file")

_validate_metadata(metadata)
geo_metadata = geo_metadata or _decode_metadata(
table.schema.metadata.get(b"geo", b"")
)

# Find all geometry columns that were read from the file. May
# be a subset if 'columns' parameter is used.
geometry_columns = df.columns.intersection(metadata["columns"])
geometry_columns = df.columns.intersection(geo_metadata["columns"])

if not len(geometry_columns):
raise ValueError(
@@ -459,7 +496,7 @@
use pandas.read_parquet/read_feather() instead."""
)

geometry = metadata["primary_column"]
geometry = geo_metadata["primary_column"]

# Missing geometry likely indicates a subset of columns was read;
# promote the first available geometry to the primary geometry.
@@ -476,7 +513,7 @@

# Convert the WKB columns that are present back to geometry.
for col in geometry_columns:
col_metadata = metadata["columns"][col]
col_metadata = geo_metadata["columns"][col]
if "crs" in col_metadata:
crs = col_metadata["crs"]
if isinstance(crs, dict):
@@ -562,6 +599,23 @@
return filesystem


def _validate_and_decode_metadata(metadata):
if metadata is None or b"geo" not in metadata:
raise ValueError(
"""Missing geo metadata in Parquet/Feather file.
Use pandas.read_parquet/read_feather() instead."""
)

# check for malformed metadata
try:
decoded_geo_metadata = _decode_metadata(metadata.get(b"geo", b""))
except (TypeError, json.decoder.JSONDecodeError):
raise ValueError("Missing or malformed geo metadata in Parquet/Feather file")

_validate_geo_metadata(decoded_geo_metadata)
return decoded_geo_metadata


def _read_parquet_schema_and_metadata(path, filesystem):
"""
Opening the Parquet file/dataset a first time to get the schema and metadata.
@@ -597,7 +651,7 @@
return schema, metadata


def _read_parquet(path, columns=None, storage_options=None, **kwargs):
def _read_parquet(path, columns=None, storage_options=None, bbox=None, **kwargs):
"""
Load a Parquet object from the file path, returning a GeoDataFrame.

@@ -641,6 +695,11 @@
both ``pyarrow.fs`` and ``fsspec`` (e.g. "s3://") then the ``pyarrow.fs``
filesystem is preferred. Provide the instantiated fsspec filesystem using
the ``filesystem`` keyword if you wish to use its implementation.
bbox : tuple, optional
Bounding box used to filter the rows read from the GeoParquet file. This
is only usable if the file was written with the bbox covering metadata.
Input is a tuple of the form (xmin, ymin, xmax, ymax).

**kwargs
Any additional kwargs passed to :func:`pyarrow.parquet.read_table`.

@@ -672,13 +731,35 @@
path, filesystem=filesystem, storage_options=storage_options
)
path = _expand_user(path)
schema, metadata = _read_parquet_schema_and_metadata(path, filesystem)

geo_metadata = _validate_and_decode_metadata(metadata)

bbox_filter = (
_get_parquet_bbox_filter(geo_metadata, bbox) if bbox is not None else None
)

if_bbox_column_exists = _check_if_covering_in_geo_metadata(geo_metadata)

_, metadata = _read_parquet_schema_and_metadata(path, filesystem)
# by default, bbox column is not read in, so must specify which
# columns are read in if it exists.
if not columns and if_bbox_column_exists:
columns = _get_non_bbox_columns(schema, geo_metadata)

# if both bbox and filters kwargs are used, must splice together.
if "filters" in kwargs:
filters_kwarg = kwargs.pop("filters")
filters = _splice_bbox_and_filters(filters_kwarg, bbox_filter)
else:
filters = bbox_filter

kwargs["use_pandas_metadata"] = True
table = parquet.read_table(path, columns=columns, filesystem=filesystem, **kwargs)

return _arrow_to_geopandas(table, metadata)
table = parquet.read_table(
path, columns=columns, filesystem=filesystem, filters=filters, **kwargs
)

return _arrow_to_geopandas(table, geo_metadata)


def _read_feather(path, columns=None, **kwargs):
@@ -745,5 +826,55 @@
raise ImportError("pyarrow >= 0.17 required for Feather support")

path = _expand_user(path)

table = feather.read_table(path, columns=columns, **kwargs)
_validate_and_decode_metadata(table.schema.metadata)
return _arrow_to_geopandas(table)


def _get_parquet_bbox_filter(geo_metadata, bbox):

if not _check_if_covering_in_geo_metadata(geo_metadata):
raise ValueError("No covering bbox in parquet file.")

bbox_column_name = _get_bbox_encoding_column_name(geo_metadata)
return _convert_bbox_to_parquet_filter(bbox, bbox_column_name)


def _convert_bbox_to_parquet_filter(bbox, bbox_column_name):
import pyarrow.compute as pc

return ~(
(pc.field((bbox_column_name, "xmin")) > bbox[2])
| (pc.field((bbox_column_name, "ymin")) > bbox[3])
| (pc.field((bbox_column_name, "xmax")) < bbox[0])
| (pc.field((bbox_column_name, "ymax")) < bbox[1])
)
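For intuition, the negated expression above is the usual "not disjoint" test: a row is dropped only when its bbox lies entirely to one side of the query bbox. A plain-Python sketch of the same predicate (the helper name is made up for illustration):

def bboxes_intersect(row_bbox, query_bbox):
    # both arguments are (xmin, ymin, xmax, ymax) tuples
    return not (
        row_bbox[0] > query_bbox[2]  # row entirely to the right of the query
        or row_bbox[1] > query_bbox[3]  # row entirely above the query
        or row_bbox[2] < query_bbox[0]  # row entirely to the left of the query
        or row_bbox[3] < query_bbox[1]  # row entirely below the query
    )

assert bboxes_intersect((0, 0, 1, 1), (0.5, 0.5, 2, 2))
assert not bboxes_intersect((0, 0, 1, 1), (2, 2, 3, 3))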


def _check_if_covering_in_geo_metadata(geo_metadata):
return "covering" in geo_metadata["columns"]["geometry"].keys()


def _get_bbox_encoding_column_name(geo_metadata):
return geo_metadata["columns"]["geometry"]["covering"]["bbox"]["xmin"][0]


def _get_non_bbox_columns(schema, geo_metadata):

bbox_column_name = _get_bbox_encoding_column_name(geo_metadata)
columns = schema.names
if bbox_column_name in columns:
columns.remove(bbox_column_name)
return columns


def _splice_bbox_and_filters(kwarg_filters, bbox_filter):
parquet = import_optional_dependency(
"pyarrow.parquet", extra="pyarrow is required for Parquet support."
)
if bbox_filter is None:
return kwarg_filters

filters_expression = parquet.filters_to_expression(kwarg_filters)
return bbox_filter & filters_expression
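Finally, a hedged sketch of how the bbox filter composes with a user-supplied `filters` keyword, which `_splice_bbox_and_filters` joins with a logical AND; the column name and values are made up:

import geopandas

# rows must intersect the bounding box AND satisfy the extra predicate
subset = geopandas.read_parquet(
    "points.parquet",
    bbox=(0, 0, 10, 10),
    filters=[("name", "=", "a")],
)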