diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 44b000bb..387e153c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,7 +13,7 @@ repos: - id: debug-statements - repo: https://github.com/psf/black - rev: 22.12.0 + rev: 23.1.0 hooks: - id: black language_version: python3 @@ -25,7 +25,7 @@ repos: name: isort (python) - repo: https://github.com/PyCQA/autoflake - rev: v2.0.0 + rev: v2.0.1 hooks: - id: autoflake args: diff --git a/examples/aws_request.ipynb b/examples/aws_request.ipynb index ad05cc49..6fe19516 100644 --- a/examples/aws_request.ipynb +++ b/examples/aws_request.ipynb @@ -32,7 +32,7 @@ "outputs": [], "source": [ "import os\n", - "from datetime import datetime as dt\n", + "from datetime import datetime\n", "\n", "from sentinelhub import CRS, BBox, DataCollection, SentinelHubCatalog, SHConfig\n", "from sentinelhub.aws import AwsDownloadClient\n", @@ -48,8 +48,8 @@ "metadata": {}, "outputs": [], "source": [ - "search_bbox = BBox(bbox=[46.16, -16.15, 46.51, -5.58], crs=CRS.WGS84)\n", - "search_time_interval = (dt(2022, 12, 11), dt(2022, 12, 17))\n", + "search_bbox = BBox(bbox=(46.16, -16.15, 46.51, -5.58), crs=CRS.WGS84)\n", + "search_time_interval = (datetime(2022, 12, 11), datetime(2022, 12, 17))\n", "data_collection = DataCollection.SENTINEL2_L1C # use DataCollection.SENTINEL2_L1C or DataCollection.SENTINEL2_L2A" ] }, @@ -260,7 +260,7 @@ } ], "source": [ - "search_bbox = BBox(bbox=[46.16, -16.15, 46.51, -15.58], crs=CRS.WGS84)\n", + "search_bbox = BBox(bbox=(46.16, -16.15, 46.51, -15.58), crs=CRS.WGS84)\n", "search_time_interval = (\"2017-12-01T00:00:00\", \"2017-12-15T23:59:59\")\n", "\n", "\n", diff --git a/examples/byoc_request.ipynb b/examples/byoc_request.ipynb index d55d05ff..f168fe24 100644 --- a/examples/byoc_request.ipynb +++ b/examples/byoc_request.ipynb @@ -1285,7 +1285,7 @@ "metadata": {}, "outputs": [], "source": [ - "caspian_sea_bbox = BBox([49.9604, 44.7176, 51.0481, 45.2324], crs=CRS.WGS84)" + "caspian_sea_bbox = BBox((49.9604, 44.7176, 51.0481, 45.2324), crs=CRS.WGS84)" ] }, { diff --git a/examples/data_collections.ipynb b/examples/data_collections.ipynb index a5d8b6a1..25f1ca86 100644 --- a/examples/data_collections.ipynb +++ b/examples/data_collections.ipynb @@ -138,7 +138,7 @@ "\n", "\n", "# Columbia Glacier, Alaska\n", - "glacier_bbox = BBox([-147.8, 60.96, -146.5, 61.38], crs=CRS.WGS84)\n", + "glacier_bbox = BBox((-147.8, 60.96, -146.5, 61.38), crs=CRS.WGS84)\n", "glacier_size = (700, 466)\n", "time_interval = \"2020-07-15\", \"2020-07-16\"\n", "\n", @@ -525,7 +525,7 @@ "source": [ "from sentinelhub import bbox_to_dimensions\n", "\n", - "slovenia_bbox = BBox([13.353882, 45.402307, 16.644287, 46.908998], crs=CRS.WGS84)\n", + "slovenia_bbox = BBox((13.353882, 45.402307, 16.644287, 46.908998), crs=CRS.WGS84)\n", "slovenia_size = bbox_to_dimensions(slovenia_bbox, resolution=240)\n", "\n", "evalscript_byoc = \"\"\"\n", diff --git a/examples/data_search.ipynb b/examples/data_search.ipynb index 1d115020..9daf705e 100644 --- a/examples/data_search.ipynb +++ b/examples/data_search.ipynb @@ -593,7 +593,7 @@ } ], "source": [ - "caspian_sea_bbox = BBox([49.9604, 44.7176, 51.0481, 45.2324], crs=CRS.WGS84)\n", + "caspian_sea_bbox = BBox((49.9604, 44.7176, 51.0481, 45.2324), crs=CRS.WGS84)\n", "time_interval = \"2020-12-10\", \"2021-02-01\"\n", "\n", "search_iterator = catalog.search(\n", @@ -858,7 +858,7 @@ } ], "source": [ - "caspian_sea_bbox = BBox([49.9604, 44.7176, 51.0481, 45.2324], crs=CRS.WGS84)\n", + "caspian_sea_bbox = 
BBox((49.9604, 44.7176, 51.0481, 45.2324), crs=CRS.WGS84)\n", "time_interval = \"2020-12-10\", \"2021-02-01\"\n", "\n", "wfs_iterator = WebFeatureService(\n", @@ -961,7 +961,7 @@ "source": [ "from sentinelhub import get_area_info\n", "\n", - "caspian_sea_bbox = BBox([49.9604, 44.7176, 51.0481, 45.2324], crs=CRS.WGS84)\n", + "caspian_sea_bbox = BBox((49.9604, 44.7176, 51.0481, 45.2324), crs=CRS.WGS84)\n", "time_interval = \"2020-12-10\", \"2021-02-01\"\n", "\n", "opensearch_iterator = get_area_info(caspian_sea_bbox, time_interval, maxcc=0.05)\n", diff --git a/examples/fis_request.ipynb b/examples/fis_request.ipynb index 5ec0051c..591f8bff 100644 --- a/examples/fis_request.ipynb +++ b/examples/fis_request.ipynb @@ -409,7 +409,7 @@ "metadata": {}, "outputs": [], "source": [ - "bbox1 = BBox([46.16, -16.15, 46.51, -15.58], CRS.WGS84)\n", + "bbox1 = BBox((46.16, -16.15, 46.51, -15.58), CRS.WGS84)\n", "bbox2 = BBox((1292344.0, 5195920.0, 1310615.0, 5214191.0), CRS.POP_WEB)\n", "\n", "geometry1 = Geometry(Polygon([(-5.13, 48), (-5.23, 48.09), (-5.13, 48.17), (-5.03, 48.08), (-5.13, 48)]), CRS.WGS84)\n", diff --git a/examples/ogc_request.ipynb b/examples/ogc_request.ipynb index 533c579d..007485eb 100644 --- a/examples/ogc_request.ipynb +++ b/examples/ogc_request.ipynb @@ -151,7 +151,7 @@ "metadata": {}, "outputs": [], "source": [ - "betsiboka_coords_wgs84 = [46.16, -16.15, 46.51, -15.58]" + "betsiboka_coords_wgs84 = (46.16, -16.15, 46.51, -15.58)" ] }, { @@ -977,7 +977,7 @@ "metadata": {}, "outputs": [], "source": [ - "betsiboka_bbox_large = BBox([45.88, -16.12, 47.29, -15.45], crs=CRS.WGS84)\n", + "betsiboka_bbox_large = BBox((45.88, -16.12, 47.29, -15.45), crs=CRS.WGS84)\n", "\n", "wms_true_color_request = WmsRequest(\n", " data_collection=DataCollection.SENTINEL2_L1C,\n", @@ -1492,7 +1492,7 @@ } ], "source": [ - "volcano_bbox = BBox(bbox=[(-2217485.0, 9228907.0), (-2150692.0, 9284045.0)], crs=CRS.POP_WEB)\n", + "volcano_bbox = BBox(bbox=(-2217485.0, 9228907.0, -2150692.0, 9284045.0), crs=CRS.POP_WEB)\n", "\n", "l2a_request = WmsRequest(\n", " data_collection=DataCollection.SENTINEL2_L2A,\n", @@ -1831,7 +1831,7 @@ "metadata": {}, "outputs": [], "source": [ - "byoc_bbox = BBox([13.82387, 45.85221, 13.83313, 45.85901], crs=CRS.WGS84)\n", + "byoc_bbox = BBox((13.82387, 45.85221, 13.83313, 45.85901), crs=CRS.WGS84)\n", "\n", "collection_id = \"\"\n", "layer = \"\"\n", @@ -1848,7 +1848,7 @@ "metadata": { "hide_input": false, "kernelspec": { - "display_name": "Python 3", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, diff --git a/examples/process_request.ipynb b/examples/process_request.ipynb index 18462c38..933bb9a4 100644 --- a/examples/process_request.ipynb +++ b/examples/process_request.ipynb @@ -111,7 +111,7 @@ "metadata": {}, "outputs": [], "source": [ - "betsiboka_coords_wgs84 = [46.16, -16.15, 46.51, -15.58]" + "betsiboka_coords_wgs84 = (46.16, -16.15, 46.51, -15.58)" ] }, { @@ -1173,7 +1173,7 @@ "metadata": { "hide_input": false, "kernelspec": { - "display_name": "Python 3", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, diff --git a/examples/reading_pu_from_headers.ipynb b/examples/reading_pu_from_headers.ipynb index 5eefad31..1c999f37 100644 --- a/examples/reading_pu_from_headers.ipynb +++ b/examples/reading_pu_from_headers.ipynb @@ -51,7 +51,7 @@ "metadata": {}, "outputs": [], "source": [ - "betsiboka_coords_wgs84 = [46.16, -16.15, 46.51, -15.58]\n", + "betsiboka_coords_wgs84 = (46.16, -16.15, 46.51, -15.58)\n", 
"resolution = 60\n", "betsiboka_bbox = BBox(bbox=betsiboka_coords_wgs84, crs=CRS.WGS84)\n", "betsiboka_size = bbox_to_dimensions(betsiboka_bbox, resolution=resolution)" diff --git a/examples/statistical_request.ipynb b/examples/statistical_request.ipynb index 9dc49fa9..6c582092 100644 --- a/examples/statistical_request.ipynb +++ b/examples/statistical_request.ipynb @@ -127,7 +127,7 @@ "metadata": {}, "outputs": [], "source": [ - "betsiboka_bbox = BBox([46.16, -16.15, 46.51, -15.58], CRS.WGS84)\n", + "betsiboka_bbox = BBox((46.16, -16.15, 46.51, -15.58), CRS.WGS84)\n", "\n", "rgb_evalscript = \"\"\"\n", "//VERSION=3\n", @@ -1836,7 +1836,7 @@ "\n", "for band in BANDS:\n", " for stat in STATISTICAL_QUANTITIES:\n", - " column_name = f\"bands_{band}_{stat}\"\n", + " column_name = f\"bands_{band.name}_{stat}\"\n", " column = features_df[column_name]\n", "\n", " column = column / 10000.0\n", diff --git a/pyproject.toml b/pyproject.toml index 3c846464..f8ca34e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,8 @@ disable = [ "unsubscriptable-object", "invalid-unary-operand-type", "unspecified-encoding", - "unnecessary-ellipsis" + "unnecessary-ellipsis", + "use-dict-literal" ] [tool.pylint.design] diff --git a/requirements-dev.txt b/requirements-dev.txt index bf806516..9717e5e6 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -10,6 +10,7 @@ pre-commit pylint>=2.14.0 pytest>=4.0.0 pytest-cov +pytest-dependency pytest-lazy-fixture pytest-mock ray[default] diff --git a/requirements.txt b/requirements.txt index e8f600c8..4d866193 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,5 +12,5 @@ shapely shared-memory38;python_version<"3.8" tifffile>=2020.9.30 tqdm -typing-extensions;python_version<"3.11" +typing-extensions utm diff --git a/sentinelhub/__init__.py b/sentinelhub/__init__.py index 60483969..76e0ba29 100644 --- a/sentinelhub/__init__.py +++ b/sentinelhub/__init__.py @@ -30,6 +30,8 @@ monitor_batch_statistical_job, opensearch, ) +from .api.fis import HistogramType +from .api.ogc import CustomUrlParam from .api.opensearch import get_area_dates, get_area_info, get_tile_info, get_tile_info_id from .areas import ( BatchSplitter, @@ -41,17 +43,7 @@ UtmZoneSplitter, ) from .config import SHConfig -from .constants import ( - CRS, - CustomUrlParam, - HistogramType, - MimeType, - MosaickingOrder, - ResamplingType, - ServiceType, - ServiceUrl, - SHConstants, -) +from .constants import CRS, MimeType, MosaickingOrder, ResamplingType, ServiceType, ServiceUrl, SHConstants from .data_collections import DataCollection from .data_collections_bands import Band, Unit from .download import ( diff --git a/sentinelhub/_version.py b/sentinelhub/_version.py index fa213ace..bb5d2ad1 100644 --- a/sentinelhub/_version.py +++ b/sentinelhub/_version.py @@ -1,3 +1,3 @@ """Version of the sentinelhub package.""" -__version__ = "3.8.2" +__version__ = "3.8.3" diff --git a/sentinelhub/api/base.py b/sentinelhub/api/base.py index 973fed5b..4880e575 100644 --- a/sentinelhub/api/base.py +++ b/sentinelhub/api/base.py @@ -1,7 +1,6 @@ """ Module implementing some utility functions not suitable for other utility modules """ -import sys from abc import ABCMeta, abstractmethod from dataclasses import dataclass, field from datetime import datetime @@ -11,6 +10,7 @@ from dataclasses_json import CatchAll, LetterCase, Undefined from dataclasses_json import config as dataclass_config from dataclasses_json import dataclass_json +from typing_extensions import Protocol from ..base import FeatureIterator from 
..config import SHConfig @@ -20,11 +20,6 @@ from ..types import JsonDict from .utils import datetime_config, remove_undefined -if sys.version_info < (3, 8): - from typing_extensions import Protocol -else: - from typing import Protocol # pylint: disable=ungrouped-imports - class SentinelHubService(metaclass=ABCMeta): """A base class for classes interacting with different Sentinel Hub APIs""" diff --git a/sentinelhub/api/batch/process.py b/sentinelhub/api/batch/process.py index 052c04db..68d7870f 100644 --- a/sentinelhub/api/batch/process.py +++ b/sentinelhub/api/batch/process.py @@ -533,28 +533,24 @@ def evalscript(self) -> str: return self.process_request["evalscript"] @property - def bbox(self) -> BBox: + def bbox(self) -> Optional[BBox]: """Provides a bounding box used by a batch request :return: An area bounding box together with CRS :raises: ValueError """ bbox, _, crs = self._parse_bounds_payload() - if bbox is None: - raise ValueError("Bounding box is not defined for this batch request") - return BBox(bbox, crs) + return None if bbox is None else BBox(bbox, crs) # type: ignore[arg-type] @property - def geometry(self) -> Geometry: + def geometry(self) -> Optional[Geometry]: """Provides a geometry used by a batch request :return: An area geometry together with CRS :raises: ValueError """ _, geometry, crs = self._parse_bounds_payload() - if geometry is None: - raise ValueError("Geometry is not defined for this batch request") - return Geometry(geometry, crs) + return None if geometry is None else Geometry(geometry, crs) def _parse_bounds_payload(self) -> Tuple[Optional[List[float]], Optional[list], CRS]: """Parses bbox, geometry and crs from batch request payload. If bbox or geometry don't exist it returns None diff --git a/sentinelhub/api/batch/statistical.py b/sentinelhub/api/batch/statistical.py index a60cb9f6..8051a4ee 100644 --- a/sentinelhub/api/batch/statistical.py +++ b/sentinelhub/api/batch/statistical.py @@ -4,13 +4,13 @@ """ import datetime as dt import logging -import sys from dataclasses import dataclass, field from typing import Any, Optional, Sequence, Union from dataclasses_json import CatchAll, LetterCase, Undefined from dataclasses_json import config as dataclass_config from dataclasses_json import dataclass_json +from typing_extensions import NotRequired, TypedDict from ...types import Json, JsonDict from ..base_request import InputDataDict @@ -18,11 +18,6 @@ from ..utils import datetime_config, enum_config, remove_undefined from .base import BaseBatchClient, BaseBatchRequest, BatchRequestStatus, BatchUserAction -if sys.version_info < (3, 11): - from typing_extensions import NotRequired, TypedDict -else: - from typing import NotRequired, TypedDict # pylint: disable=ungrouped-imports - LOGGER = logging.getLogger(__name__) BatchStatisticalRequestType = Union[str, dict, "BatchStatisticalRequest"] diff --git a/sentinelhub/api/catalog.py b/sentinelhub/api/catalog.py index abfee39b..621a3222 100644 --- a/sentinelhub/api/catalog.py +++ b/sentinelhub/api/catalog.py @@ -4,11 +4,13 @@ import datetime as dt from typing import Any, Dict, Iterable, List, Optional, Union +from typing_extensions import Literal + from ..base import FeatureIterator from ..data_collections import DataCollection, OrbitDirection from ..geometry import CRS, BBox, Geometry from ..time_utils import parse_time, parse_time_interval, serialize_time -from ..types import JsonDict, Literal, RawTimeIntervalType, RawTimeType +from ..types import JsonDict, RawTimeIntervalType, RawTimeType from .base import 
SentinelHubService from .utils import remove_undefined diff --git a/sentinelhub/api/fis.py b/sentinelhub/api/fis.py index 85d4b6e8..dfd98257 100644 --- a/sentinelhub/api/fis.py +++ b/sentinelhub/api/fis.py @@ -3,9 +3,10 @@ """ import datetime import warnings +from enum import Enum from typing import Any, List, Optional, Union -from ..constants import HistogramType, MimeType, RequestType, ServiceType +from ..constants import MimeType, RequestType, ServiceType from ..download import DownloadRequest from ..exceptions import SHDeprecationWarning from ..geometry import BBox, Geometry @@ -14,6 +15,17 @@ from .wfs import WebFeatureService +class HistogramType(Enum): + """Enum class for types of histogram supported by Sentinel Hub FIS service + + Supported histogram types are EQUALFREQUENCY, EQUIDISTANT and STREAMING + """ + + EQUALFREQUENCY = "equalfrequency" + EQUIDISTANT = "equidistant" + STREAMING = "streaming" + + class FisRequest(OgcRequest): """``Deprecated - use Statistical API instead!`` diff --git a/sentinelhub/api/ogc.py b/sentinelhub/api/ogc.py index e9999afc..54e7a9ef 100644 --- a/sentinelhub/api/ogc.py +++ b/sentinelhub/api/ogc.py @@ -2,16 +2,16 @@ Module for working with Sentinel Hub OGC services `Sentinel Hub OGC services `__. """ - import datetime import logging from base64 import b64encode +from enum import Enum from typing import Any, Dict, List, Optional, Tuple, Union from urllib.parse import urlencode from ..base import DataRequest from ..config import SHConfig -from ..constants import CRS, CustomUrlParam, MimeType, ResamplingType, ServiceType +from ..constants import CRS, MimeType, ResamplingType, ServiceType from ..data_collections import DataCollection from ..download import DownloadRequest, SentinelHubDownloadClient from ..geo_utils import get_image_dimension @@ -22,6 +22,44 @@ LOGGER = logging.getLogger(__name__) +class CustomUrlParam(Enum): + """Enum class to represent supported custom url parameters of OGC services + + Supported parameters are `SHOWLOGO`, `EVALSCRIPT`, `EVALSCRIPTURL`, `PREVIEW`, `QUALITY`, `UPSAMPLING`, + `DOWNSAMPLING`, `GEOMETRY` and `WARNINGS`. + + See `documentation `__ for more information. 
+ """ + + SHOWLOGO = "ShowLogo" + EVALSCRIPT = "EvalScript" + EVALSCRIPTURL = "EvalScriptUrl" + PREVIEW = "Preview" + QUALITY = "Quality" + UPSAMPLING = "Upsampling" + DOWNSAMPLING = "Downsampling" + GEOMETRY = "Geometry" + MINQA = "MinQA" + + @classmethod + def has_value(cls, value: str) -> bool: + """Tests whether CustomUrlParam contains a constant defined with a string `value` + + :param value: The string representation of the enum constant + :return: `True` if there exists a constant with a string value `value`, `False` otherwise + """ + return any(value.lower() == item.value.lower() for item in cls) + + @staticmethod + def get_string(param: Enum) -> str: + """Get custom url parameter name as string + + :param param: CustomUrlParam enum constant + :return: String describing the file format + """ + return param.value + + class OgcRequest(DataRequest): """The base class for OGC-type requests (WMS and WCS) where all common parameters are defined""" @@ -271,7 +309,7 @@ def __init__(self, config: Optional[SHConfig] = None): self.config = config or SHConfig() self.config.raise_for_missing_instance_id() - self._base_url = self.config.get_sh_ogc_url() + self._base_url = f"{self.config.sh_base_url}/ogc" self.wfs_iterator: Optional[WebFeatureService] = None def get_request(self, request: OgcRequest) -> List[DownloadRequest]: @@ -388,8 +426,10 @@ def _get_wms_wcs_url_parameters(request: OgcRequest, date: Optional[datetime.dat :param date: acquisition date or None :return: dictionary with parameters """ + bbox = request.bbox.reverse() if request.bbox.crs is CRS.WGS84 else request.bbox + params = { - "BBOX": str(request.bbox.reverse()) if request.bbox.crs is CRS.WGS84 else str(request.bbox), + "BBOX": ",".join(map(str, bbox)), "FORMAT": MimeType.get_string(request.image_format), "CRS": CRS.ogc_string(request.bbox.crs), } diff --git a/sentinelhub/api/opensearch.py b/sentinelhub/api/opensearch.py index fead3b4a..65de87f4 100644 --- a/sentinelhub/api/opensearch.py +++ b/sentinelhub/api/opensearch.py @@ -186,6 +186,6 @@ def _prepare_url_params( "startDate": serialize_time(start_date, use_tz=False) if start_date else None, "completionDate": serialize_time(end_date, use_tz=False) if end_date else None, "orbitNumber": absolute_orbit, - "box": bbox, + "box": ",".join(map(str, bbox)) if bbox else None, } return {key: str(value) for key, value in url_params.items() if value} diff --git a/sentinelhub/api/wfs.py b/sentinelhub/api/wfs.py index 4eb7b810..2d053d6e 100644 --- a/sentinelhub/api/wfs.py +++ b/sentinelhub/api/wfs.py @@ -69,7 +69,7 @@ def __init__( def _build_service_url(self) -> str: """Creates a base URL for WFS service""" - base_url = self.config.get_sh_ogc_url() + base_url = f"{self.config.sh_base_url}/ogc" if self.data_collection.service_url: base_url = base_url.replace(self.config.sh_base_url, self.data_collection.service_url) @@ -79,12 +79,13 @@ def _build_service_url(self) -> str: def _build_request_params(self) -> JsonDict: """Builds URL parameters for WFS service""" start_time, end_time = serialize_time(self.time_interval, use_tz=True) + bbox = self.bbox.reverse() if self.bbox.crs is CRS.WGS84 else self.bbox return { "SERVICE": ServiceType.WFS.value, "WARNINGS": False, "REQUEST": "GetFeature", "TYPENAMES": self.data_collection.wfs_id, - "BBOX": str(self.bbox.reverse()) if self.bbox.crs is CRS.WGS84 else str(self.bbox), + "BBOX": ",".join(map(str, bbox)), "OUTPUTFORMAT": MimeType.JSON.get_string(), "SRSNAME": self.bbox.crs.ogc_string(), "TIME": f"{start_time}/{end_time}", diff --git 
a/sentinelhub/areas.py b/sentinelhub/areas.py index 924d51ac..8904e395 100644 --- a/sentinelhub/areas.py +++ b/sentinelhub/areas.py @@ -19,7 +19,7 @@ from .constants import CRS from .data_collections import DataCollection from .geo_utils import transform_point -from .geometry import BBox, BBoxCollection, Geometry, _BaseGeometry +from .geometry import BBox, Geometry, _BaseGeometry from .types import JsonDict T = TypeVar("T", float, int) @@ -139,7 +139,7 @@ def get_area_bbox(self, crs: Optional[CRS] = None) -> BBox: area_min_y = min(bbox.lower_left[1] for bbox in bbox_list) area_max_x = max(bbox.upper_right[0] for bbox in bbox_list) area_max_y = max(bbox.upper_right[1] for bbox in bbox_list) - bbox = BBox([area_min_x, area_min_y, area_max_x, area_max_y], crs=self.crs) + bbox = BBox((area_min_x, area_min_y, area_max_x, area_max_y), crs=self.crs) if crs is None: return bbox return bbox.transform(crs) @@ -407,7 +407,7 @@ def __init__( self, shape_list: Iterable[Union[Polygon, MultiPolygon, _BaseGeometry]], crs: CRS, - bbox_grid: Union[List[BBox], BBoxCollection], + bbox_grid: Iterable[BBox], bbox_split_shape: Union[int, Tuple[int, int]] = 1, **kwargs: Any, ): @@ -422,21 +422,10 @@ def __init__( :param reduce_bbox_sizes: If `True` it will reduce the sizes of bounding boxes so that they will tightly fit the given geometry in `shape_list`. """ - self.bbox_grid = self._parse_bbox_grid(bbox_grid) + self.bbox_grid = list(bbox_grid) self.bbox_split_shape = bbox_split_shape super().__init__(shape_list, crs, **kwargs) - @staticmethod - def _parse_bbox_grid(bbox_grid: Union[List[BBox], BBoxCollection]) -> BBoxCollection: - """Helper method for parsing bounding box grid. It will try to parse it into `BBoxCollection`""" - if isinstance(bbox_grid, BBoxCollection): - return bbox_grid - - if isinstance(bbox_grid, list): - return BBoxCollection(bbox_grid) - - raise ValueError(f"Parameter 'bbox_grid' should be an instance of {BBoxCollection}") - def _make_split(self) -> Tuple[List[BBox], List[Dict[str, object]]]: bbox_list: List[BBox] = [] info_list: List[Dict[str, object]] = [] @@ -664,7 +653,10 @@ def __init__( self.tile_size = self._get_tile_size() self.tile_buffer = self._get_tile_buffer() - batch_geometry = batch_request.geometry + batch_geometry: Optional[_BaseGeometry] = batch_request.geometry or batch_request.bbox + if batch_geometry is None: + raise ValueError("Batch request has both `bbox` and `geometry` set to `None`, which is invalid.") + super().__init__([batch_geometry.geometry], batch_geometry.crs) def _get_tile_size(self) -> Tuple[float, float]: @@ -699,12 +691,12 @@ def _reconstruct_bbox(self, tile_info: JsonDict) -> BBox: width, height = self.tile_size return BBox( - [ + ( upper_left_corner[0] - self.tile_buffer[0], upper_left_corner[1] - height - self.tile_buffer[1], upper_left_corner[0] + width + self.tile_buffer[0], upper_left_corner[1] + self.tile_buffer[1], - ], + ), tile_crs, ) diff --git a/sentinelhub/config.py b/sentinelhub/config.py index 03e6d704..6161acd1 100644 --- a/sentinelhub/config.py +++ b/sentinelhub/config.py @@ -9,6 +9,8 @@ import os from typing import Any, Dict, Iterable, List, Optional, Union +from .exceptions import deprecated_function + ConfigDict = Dict[str, Union[str, int, float]] @@ -293,6 +295,7 @@ def _mask_credentials(self, param: str, value: object) -> object: hide_size = min(max(len(value) - 4, 10), len(value)) return "*" * hide_size + value[hide_size:] + @deprecated_function(message_suffix="Use `f'{config.sh_auth_base_url}/oauth/token'` instead.") def 
get_sh_oauth_url(self) -> str: """Provides URL for Sentinel Hub authentication endpoint @@ -300,6 +303,7 @@ def get_sh_oauth_url(self) -> str: """ return f"{self.sh_auth_base_url}/oauth/token" + @deprecated_function(message_suffix="Use `f'{config.sh_base_url}/api/v1/process'` instead.") def get_sh_process_api_url(self) -> str: """Provides URL for Sentinel Hub Process API endpoint @@ -307,6 +311,7 @@ def get_sh_process_api_url(self) -> str: """ return f"{self.sh_base_url}/api/v1/process" + @deprecated_function(message_suffix="Use `f'{config.sh_base_url}/ogc'` instead.") def get_sh_ogc_url(self) -> str: """Provides URL for Sentinel Hub OGC endpoint @@ -314,6 +319,7 @@ def get_sh_ogc_url(self) -> str: """ return f"{self.sh_base_url}/ogc" + @deprecated_function(message_suffix="Use `f'{config.sh_auth_base_url}/aux/ratelimit'` instead.") def get_sh_rate_limit_url(self) -> str: """Provides URL for Sentinel Hub rate limiting endpoint diff --git a/sentinelhub/constants.py b/sentinelhub/constants.py index 569fa027..c65a630a 100644 --- a/sentinelhub/constants.py +++ b/sentinelhub/constants.py @@ -271,55 +271,6 @@ def _get_pyproj_projection_def(self) -> str: return "+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs" if self is CRS.WGS84 else self.ogc_string() -class CustomUrlParam(Enum): - """Enum class to represent supported custom url parameters of OGC services - - Supported parameters are `SHOWLOGO`, `EVALSCRIPT`, `EVALSCRIPTURL`, `PREVIEW`, `QUALITY`, `UPSAMPLING`, - `DOWNSAMPLING`, `GEOMETRY` and `WARNINGS`. - - See `documentation `__ for more information. - """ - - SHOWLOGO = "ShowLogo" - EVALSCRIPT = "EvalScript" - EVALSCRIPTURL = "EvalScriptUrl" - PREVIEW = "Preview" - QUALITY = "Quality" - UPSAMPLING = "Upsampling" - DOWNSAMPLING = "Downsampling" - GEOMETRY = "Geometry" - MINQA = "MinQA" - - @classmethod - def has_value(cls, value: str) -> bool: - """Tests whether CustomUrlParam contains a constant defined with a string `value` - - :param value: The string representation of the enum constant - :return: `True` if there exists a constant with a string value `value`, `False` otherwise - """ - return any(value.lower() == item.value.lower() for item in cls) - - @staticmethod - def get_string(param: Enum) -> str: - """Get custom url parameter name as string - - :param param: CustomUrlParam enum constant - :return: String describing the file format - """ - return param.value - - -class HistogramType(Enum): - """Enum class for types of histogram supported by Sentinel Hub FIS service - - Supported histogram types are EQUALFREQUENCY, EQUIDISTANT and STREAMING - """ - - EQUALFREQUENCY = "equalfrequency" - EQUIDISTANT = "equidistant" - STREAMING = "streaming" - - class MimeType(Enum): """Enum class to represent supported file formats @@ -436,7 +387,7 @@ def get_expected_max_value(self) -> Union[float, int]: class RequestType(Enum): - """Enum constant class for GET/POST request type""" + """Enum constant class for GET/POST request type.""" GET = "GET" POST = "POST" @@ -446,10 +397,7 @@ class RequestType(Enum): class SHConstants: - """Initialisation of constants used by OGC request. 
- - Constants are LATEST - """ + """Common constants used in various requests.""" LATEST = "latest" HEADERS = {"User-Agent": f"sentinelhub-py/v{__version__}"} diff --git a/sentinelhub/data_collections.py b/sentinelhub/data_collections.py index 48c8f503..0b66bb72 100644 --- a/sentinelhub/data_collections.py +++ b/sentinelhub/data_collections.py @@ -649,7 +649,6 @@ def contains_orbit_direction(self, orbit_direction: str) -> bool: :param orbit_direction: An orbit direction :return: `True` if data collection contains the orbit direction - :return: bool """ defined_direction = self.orbit_direction if defined_direction is None or defined_direction.upper() == OrbitDirection.BOTH: diff --git a/sentinelhub/decoding.py b/sentinelhub/decoding.py index 703d598f..7ef13cf9 100644 --- a/sentinelhub/decoding.py +++ b/sentinelhub/decoding.py @@ -5,9 +5,9 @@ import struct import tarfile import warnings -from io import BytesIO, IOBase +from io import BytesIO from json import JSONDecodeError -from typing import Any, Dict, Union +from typing import IO, Any, Dict, Union from xml.etree import ElementTree import numpy as np @@ -71,7 +71,7 @@ def decode_image(data: bytes, image_type: MimeType) -> np.ndarray: return image -def decode_image_with_pillow(stream: Union[IOBase, str]) -> np.ndarray: +def decode_image_with_pillow(stream: Union[IO, str]) -> np.ndarray: """Decodes an image using `Pillow` package and handles potential warnings. :param stream: A binary stream format or a filename. @@ -82,7 +82,7 @@ def decode_image_with_pillow(stream: Union[IOBase, str]) -> np.ndarray: return np.array(Image.open(stream)) -def decode_jp2_image(stream: IOBase) -> np.ndarray: +def decode_jp2_image(stream: IO) -> np.ndarray: """Tries to decode a JPEG2000 image using the `Pillow` package. :param stream: A binary stream format. @@ -146,7 +146,7 @@ def decode_sentinelhub_err_msg(response: Response) -> str: return response.text -def get_jp2_bit_depth(stream: IOBase) -> int: +def get_jp2_bit_depth(stream: IO) -> int: """Reads a bit encoding depth of jpeg2000 file in binary stream format :param stream: binary stream format diff --git a/sentinelhub/download/handlers.py b/sentinelhub/download/handlers.py index a3ba7a3c..1405534f 100644 --- a/sentinelhub/download/handlers.py +++ b/sentinelhub/download/handlers.py @@ -3,22 +3,17 @@ """ import functools import logging -import sys import time from typing import Callable, Optional, TypeVar import requests +from typing_extensions import Protocol from ..config import SHConfig from ..decoding import decode_sentinelhub_err_msg from ..exceptions import DownloadFailedException from .models import DownloadRequest -if sys.version_info < (3, 8): - from typing_extensions import Protocol -else: - from typing import Protocol # pylint: disable=ungrouped-imports - class _HasConfig(Protocol): """Interface of objects with a config.""" diff --git a/sentinelhub/download/session.py b/sentinelhub/download/session.py index 15d5ec4f..1dd7cfe2 100644 --- a/sentinelhub/download/session.py +++ b/sentinelhub/download/session.py @@ -125,7 +125,7 @@ def _collect_new_token(self) -> JsonDict: Note that the `DownloadRequest` object is created only because retry decorators of `_fetch_token` method require it. 
""" - request = DownloadRequest(url=self.config.get_sh_oauth_url()) + request = DownloadRequest(url=f"{self.config.sh_auth_base_url}/oauth/token") return self._fetch_token(request) @retry_temporary_errors diff --git a/sentinelhub/geometry.py b/sentinelhub/geometry.py index ccc6973b..37ad5df4 100644 --- a/sentinelhub/geometry.py +++ b/sentinelhub/geometry.py @@ -1,23 +1,27 @@ -""" -Module implementing geometry classes -""" +"""Module implementing geometry classes.""" from __future__ import annotations import contextlib +import warnings from abc import ABCMeta, abstractmethod from math import ceil -from typing import Callable, Iterator, List, Optional, Tuple, TypeVar, Union +from typing import Callable, Dict, Iterator, List, Optional, Tuple, TypeVar, Union, cast import shapely.geometry +import shapely.geometry.base import shapely.ops import shapely.wkt from shapely.geometry import MultiPolygon, Polygon -from shapely.geometry.base import BaseGeometry +from typing_extensions import TypeAlias from .constants import CRS +from .exceptions import SHDeprecationWarning, deprecated_class from .geo_utils import transform_point Self = TypeVar("Self", bound="_BaseGeometry") +BBoxInputType: TypeAlias = Union[ + Tuple[float, float, float, float], Tuple[Tuple[float, float], Tuple[float, float]], Dict[str, float] +] class _BaseGeometry(metaclass=ABCMeta): @@ -86,39 +90,122 @@ def apply(self: Self, operation: Callable[[float, float], Tuple[float, float]]) class BBox(_BaseGeometry): """Class representing a bounding box in a given CRS. - Throughout the sentinelhub package this class serves as the canonical representation of a bounding - box. It can initialize itself from multiple representations: + Throughout the sentinelhub package this class serves as the canonical representation of a bounding box. It can be + initialized from multiple representations: - 1) ``((min_x,min_y),(max_x,max_y))``, - 2) ``(min_x,min_y,max_x,max_y)``, - 3) ``[min_x,min_y,max_x,max_y]``, - 4) ``[[min_x, min_y],[max_x,max_y]]``, - 5) ``[(min_x, min_y),(max_x,max_y)]``, - 6) ``([min_x, min_y],[max_x,max_y])``, - 7) ``'min_x,min_y,max_x,max_y'``, - 8) ``{'min_x':min_x, 'max_x':max_x, 'min_y':min_y, 'max_y':max_y}``, - 9) ``bbox``, where ``bbox`` is an instance of ``BBox``. + 1) `((min_x, min_y), (max_x, max_y))` + 2) `(min_x, min_y, max_x, max_y)` + 3) `{"min_x": min_x, "max_x": max_x, "min_y": min_y, "max_y": max_y}` - Note that BBox coordinate system depends on ``crs`` parameter: + In the above + Note that BBox coordinate system depends on `crs` parameter: - - In case of ``constants.CRS.WGS84`` axis x represents longitude and axis y represents latitude. - - In case of ``constants.CRS.POP_WEB`` axis x represents easting and axis y represents northing. - - In case of ``constants.CRS.UTM_*`` axis x represents easting and axis y represents northing. + - In case of `constants.CRS.WGS84` axis x represents longitude and axis y represents latitude. + - In case of `constants.CRS.POP_WEB` axis x represents easting and axis y represents northing. + - In case of `constants.CRS.UTM_*` axis x represents easting and axis y represents northing. 
""" - def __init__(self, bbox: Union[BBox, tuple, list, dict, str, BaseGeometry], crs: CRS): + def __init__(self, bbox: BBoxInputType, crs: CRS): """ :param bbox: A bbox in any valid representation :param crs: Coordinate reference system of the bounding box """ x_fst, y_fst, x_snd, y_snd = BBox._to_tuple(bbox) - self.min_x = min(x_fst, x_snd) - self.max_x = max(x_fst, x_snd) - self.min_y = min(y_fst, y_snd) - self.max_y = max(y_fst, y_snd) + self.min_x, self.max_x = min(x_fst, x_snd), max(x_fst, x_snd) + self.min_y, self.max_y = min(y_fst, y_snd), max(y_fst, y_snd) super().__init__(crs) + @staticmethod + def _to_tuple(bbox: BBoxInputType) -> Tuple[float, float, float, float]: + """Converts the input bbox representation (see the constructor docstring for a list of valid representations) + into a flat tuple. Also supports `list` objects in places where `tuple` is expected. + + :param bbox: A bbox in one of the forms listed in the class description. + :return: A flat tuple `(min_x, min_y, max_x, max_y)` + :raises: TypeError + """ + if isinstance(bbox, (tuple, list)): + return BBox._tuple_from_list_or_tuple(bbox) + if isinstance(bbox, str): # type: ignore[unreachable] + return BBox._tuple_from_str(bbox) # type: ignore[unreachable] + if isinstance(bbox, dict): + return BBox._tuple_from_dict(bbox) + if isinstance(bbox, BBox): # type: ignore[unreachable] + return BBox._tuple_from_bbox(bbox) + if isinstance(bbox, shapely.geometry.base.BaseGeometry): + warnings.warn( + ( + "Initializing `BBox` objects from `shapely` geometries will no longer be possible in future" + " versions. Use the `bounds` property of the `shapely` geometry to initialize the `BBox` instead." + ), + category=SHDeprecationWarning, + stacklevel=2, + ) + return bbox.bounds + raise TypeError( + "Unable to process `BBox` input. Provide `(min_x, min_y, max_x, max_y)` or check documentation for other" + " valid forms of input." + ) + + @staticmethod + def _tuple_from_list_or_tuple( + bbox: Union[Tuple[float, float, float, float], Tuple[Tuple[float, float], Tuple[float, float]]] + ) -> Tuple[float, float, float, float]: + """Converts a list or tuple representation of a bbox into a flat tuple representation. + + :param bbox: a list or tuple with 4 coordinates that is either flat or nested + :return: tuple (min_x, min_y, max_x, max_y) + :raises: TypeError + """ + if len(bbox) == 4: + min_x, min_y, max_x, max_y = cast(Tuple[float, float, float, float], bbox) + else: + (min_x, min_y), (max_x, max_y) = cast(Tuple[Tuple[float, float], Tuple[float, float]], bbox) + return float(min_x), float(min_y), float(max_x), float(max_y) + + @staticmethod + def _tuple_from_str(bbox: str) -> Tuple[float, float, float, float]: + """Parses a string of numbers separated by any combination of commas and spaces + + :param bbox: e.g. 
str of the form `min_x ,min_y max_x, max_y` + :return: tuple (min_x,min_y,max_x,max_y) + """ + warnings.warn( + "Initializing `BBox` objects from strings will no longer be possible in future versions.", + category=SHDeprecationWarning, + stacklevel=2, + ) + string_parts = bbox.replace(",", " ").split() + if len(string_parts) != 4: + raise ValueError(f"Input {bbox} is not a valid string representation of a BBox.") + min_x, min_y, max_x, max_y = map(float, string_parts) + return min_x, min_y, max_x, max_y + + @staticmethod + def _tuple_from_dict(bbox: dict) -> Tuple[float, float, float, float]: + """Converts a dictionary representation of a bbox into a flat tuple representation + + :param bbox: a dict with keys "min_x", "min_y", "max_x", and "max_y" + :return: tuple (min_x,min_y,max_x,max_y) + :raises: KeyError + """ + return bbox["min_x"], bbox["min_y"], bbox["max_x"], bbox["max_y"] + + @staticmethod + def _tuple_from_bbox(bbox: BBox) -> Tuple[float, float, float, float]: + """Converts a BBox instance into a tuple + + :param bbox: An instance of the BBox type + :return: tuple (min_x, min_y, max_x, max_y) + """ + warnings.warn( + "Initializing `BBox` objects from `BBox` objects will no longer be possible in future versions.", + category=SHDeprecationWarning, + stacklevel=2, + ) + return bbox.lower_left + bbox.upper_right + def __iter__(self) -> Iterator[float]: """This method enables iteration over coordinates of bounding box""" return iter(self.lower_left + self.upper_right) @@ -133,6 +220,11 @@ def __str__(self, reverse: bool = False) -> str: :param reverse: `True` if x and y coordinates should be switched and `False` otherwise :return: String of coordinates """ + warnings.warn( + "The string representation of `BBox` will change to match its `repr` representation.", + category=SHDeprecationWarning, + stacklevel=2, + ) if reverse: return f"{self.min_y},{self.min_x},{self.max_y},{self.max_x}" return f"{self.min_x},{self.min_y},{self.max_x},{self.max_y}" @@ -314,12 +406,10 @@ def get_partition( return [ [ BBox( - [ - self.min_x + i * size_x, - self.min_y + j * size_y, - self.min_x + (i + 1) * size_x, - self.min_y + (j + 1) * size_y, - ], + ( + (self.min_x + i * size_x, self.min_y + j * size_y), + (self.min_x + (i + 1) * size_x, self.min_y + (j + 1) * size_y), + ), crs=self.crs, ) for j in range(num_y) @@ -349,74 +439,6 @@ def _parse_resolution(res: Union[str, int, float]) -> float: raise TypeError(f"Resolution should be a float, got resolution of type {type(res)}") - @staticmethod - def _to_tuple(bbox: Union[BBox, tuple, list, dict, str, BaseGeometry]) -> Tuple[float, float, float, float]: - """Converts the input bbox representation (see the constructor docstring for a list of valid representations) - into a flat tuple - - :param bbox: A bbox in one of 7 forms listed in the class description. - :return: A flat tuple of size - :raises: TypeError - """ - if isinstance(bbox, (list, tuple)): - return BBox._tuple_from_list_or_tuple(bbox) - if isinstance(bbox, str): - return BBox._tuple_from_str(bbox) - if isinstance(bbox, dict): - return BBox._tuple_from_dict(bbox) - if isinstance(bbox, BBox): - return BBox._tuple_from_bbox(bbox) - if isinstance(bbox, BaseGeometry): - return bbox.bounds - raise TypeError("Invalid bbox representation") - - @staticmethod - def _tuple_from_list_or_tuple(bbox: Union[list, tuple]) -> Tuple[float, float, float, float]: - """Converts a list or tuple representation of a bbox into a flat tuple representation. 
- - :param bbox: a list or tuple with 4 coordinates that is either flat or nested - :return: tuple (min_x,min_y,max_x,max_y) - :raises: TypeError - """ - if len(bbox) == 4: - min_x, min_y, max_x, max_y = map(float, bbox) - return min_x, min_y, max_x, max_y - if len(bbox) == 2 and all(isinstance(point, (list, tuple)) for point in bbox): - return BBox._tuple_from_list_or_tuple(bbox[0] + bbox[1]) - raise TypeError("Expected a valid list or tuple representation of a bbox") - - @staticmethod - def _tuple_from_str(bbox: str) -> Tuple[float, float, float, float]: - """Parses a string of numbers separated by any combination of commas and spaces - - :param bbox: e.g. str of the form `min_x ,min_y max_x, max_y` - :return: tuple (min_x,min_y,max_x,max_y) - """ - string_parts = bbox.replace(",", " ").split() - if len(string_parts) != 4: - raise ValueError(f"Input {bbox} is not a valid string representation of a BBox.") - min_x, min_y, max_x, max_y = map(float, string_parts) - return min_x, min_y, max_x, max_y - - @staticmethod - def _tuple_from_dict(bbox: dict) -> Tuple[float, float, float, float]: - """Converts a dictionary representation of a bbox into a flat tuple representation - - :param bbox: a dict with keys "min_x, "min_y", "max_x", and "max_y" - :return: tuple (min_x,min_y,max_x,max_y) - :raises: KeyError - """ - return bbox["min_x"], bbox["min_y"], bbox["max_x"], bbox["max_y"] - - @staticmethod - def _tuple_from_bbox(bbox: BBox) -> Tuple[float, float, float, float]: - """Converts a BBox instance into a tuple - - :param bbox: An instance of the BBox type - :return: tuple (min_x, min_y, max_x, max_y) - """ - return bbox.lower_left + bbox.upper_right - class Geometry(_BaseGeometry): """A class that combines shapely geometry with coordinate reference system. 
It currently supports polygons and @@ -510,7 +532,7 @@ def bbox(self) -> BBox: :return: A bounding box, with same CRS """ - return BBox(self.geometry, self.crs) + return BBox(self.geometry.bounds, self.crs) @staticmethod def _parse_geometry(geometry: Union[Polygon, MultiPolygon, dict, str]) -> Union[Polygon, MultiPolygon]: @@ -524,7 +546,7 @@ def _parse_geometry(geometry: Union[Polygon, MultiPolygon, dict, str]) -> Union[ geometry = shapely.wkt.loads(geometry) elif isinstance(geometry, dict): geometry = shapely.geometry.shape(geometry) - elif not isinstance(geometry, BaseGeometry): + elif not isinstance(geometry, shapely.geometry.base.BaseGeometry): raise TypeError("Unsupported geometry representation") if not isinstance(geometry, (Polygon, MultiPolygon)): @@ -533,6 +555,7 @@ def _parse_geometry(geometry: Union[Polygon, MultiPolygon, dict, str]) -> Union[ return geometry +@deprecated_class(message_suffix="Use sequences of BBox objects instead.") class BBoxCollection(_BaseGeometry): """A collection of bounding boxes""" @@ -567,7 +590,6 @@ def __iter__(self) -> Iterator[BBox]: @property def bbox_list(self) -> List[BBox]: """Returns the list of bounding boxes from collection - :return: The list of bounding boxes """ return self._bbox_list @@ -575,7 +597,6 @@ def bbox_list(self) -> List[BBox]: @property def geometry(self) -> MultiPolygon: """Returns shapely object representing geometry - :return: A multipolygon of bounding boxes """ return self._geometry @@ -583,21 +604,18 @@ def geometry(self) -> MultiPolygon: @property def bbox(self) -> BBox: """Returns BBox object representing bounding box around the geometry - :return: A bounding box, with same CRS """ return BBox(self.geometry, self.crs) def reverse(self) -> BBoxCollection: """Returns a new BBoxCollection object where all x and y coordinates are switched - :return: New Geometry object with switched coordinates """ return BBoxCollection([bbox.reverse() for bbox in self.bbox_list]) def transform(self, crs: CRS, always_xy: bool = True) -> BBoxCollection: """Transforms BBoxCollection from current CRS to target CRS - :param crs: target CRS :param always_xy: Parameter that is passed to `pyproj.Transformer` object and defines axis order for transformation. The default value `True` is in most cases the correct one. 
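For reference, a minimal sketch (illustrative only) of how `BBox` construction looks after the geometry changes above: coordinate tuples, nested coordinate pairs, and dicts remain supported (lists are still accepted wherever a tuple is expected), while strings, other `BBox` instances, shapely geometries, and `BBoxCollection` are deprecated. The coordinates are reused from the notebook examples; the variable names are arbitrary.

    from sentinelhub import CRS, BBox

    # Preferred form: a flat tuple of coordinates, as in the updated example notebooks.
    betsiboka_bbox = BBox(bbox=(46.16, -16.15, 46.51, -15.58), crs=CRS.WGS84)

    # Equivalent nested-pair and dict forms remain valid inputs.
    bbox_from_pairs = BBox(((46.16, -16.15), (46.51, -15.58)), crs=CRS.WGS84)
    bbox_from_dict = BBox({"min_x": 46.16, "min_y": -16.15, "max_x": 46.51, "max_y": -15.58}, crs=CRS.WGS84)

    # Deprecated inputs (a string, another BBox, or a shapely geometry) now emit SHDeprecationWarning;
    # for a shapely geometry, pass its bounds instead, e.g. BBox(geometry.bounds, crs=CRS.WGS84).

The splitter utilities make the matching change: `CustomGridSplitter` now takes a plain `Iterable[BBox]` for `bbox_grid` instead of a `BBoxCollection`.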
diff --git a/sentinelhub/geopedia/core.py b/sentinelhub/geopedia/core.py index 84dffe74..42c3da86 100644 --- a/sentinelhub/geopedia/core.py +++ b/sentinelhub/geopedia/core.py @@ -9,6 +9,7 @@ from shapely.geometry import shape as geo_shape from shapely.geometry.base import BaseGeometry +from typing_extensions import Literal from ..api.ogc import OgcImageService, OgcRequest from ..base import FeatureIterator @@ -16,7 +17,7 @@ from ..constants import CRS, MimeType from ..download import DownloadClient, DownloadRequest from ..geometry import BBox -from ..types import JsonDict, Literal +from ..types import JsonDict if TYPE_CHECKING: from .request import GeopediaImageRequest @@ -371,7 +372,7 @@ def _build_request_params(self, bbox: Optional[BBox], query_filter: Optional[str if bbox.crs is not CRS.POP_WEB: bbox = bbox.transform(CRS.POP_WEB) - params[self.FILTER_EXPRESSION] = f'bbox({bbox},"EPSG:3857")' + params[self.FILTER_EXPRESSION] = f'bbox({",".join(map(str, bbox))},"EPSG:3857")' if query_filter is not None: if self.FILTER_EXPRESSION in params: diff --git a/sentinelhub/io_utils.py b/sentinelhub/io_utils.py index 4c2d3da8..916343f4 100644 --- a/sentinelhub/io_utils.py +++ b/sentinelhub/io_utils.py @@ -6,18 +6,16 @@ import json import logging import os -import warnings -from typing import Any, Callable, Dict, Optional +from typing import IO, Any, Callable, Dict, Optional from xml.etree import ElementTree import numpy as np import tifffile as tiff from PIL import Image +from typing_extensions import Literal from .constants import MimeType from .decoding import decode_image_with_pillow, decode_jp2_image, decode_tar, get_data_format -from .exceptions import SHUserWarning -from .types import Json LOGGER = logging.getLogger(__name__) @@ -55,78 +53,39 @@ def read_data(filename: str, data_format: Optional[MimeType] = None) -> Any: def _get_reader(data_format: MimeType) -> Callable[[str], Any]: """Provides a function for reading data in a given data format""" if data_format is MimeType.TIFF: - return read_tiff_image + return tiff.imread if data_format is MimeType.JP2: - return read_jp2_image + return _open_file_and_read(decode_jp2_image, "rb") if data_format.is_image_format(): - return read_image - try: - available_readers: Dict[MimeType, Callable[[str], Any]] = { - MimeType.TAR: read_tar, - MimeType.TXT: read_text, - MimeType.RAW: _read_binary, - MimeType.CSV: read_csv, - MimeType.JSON: read_json, - MimeType.XML: read_xml, - MimeType.GML: read_xml, - MimeType.SAFE: read_xml, - } - return available_readers[data_format] - except KeyError as exception: - raise ValueError(f"Reading data format {data_format} is not supported") from exception - - -def read_tar(filename: str) -> Dict[str, object]: - """Read a tar from file""" - with open(filename, "rb") as file: - return decode_tar(file) # type: ignore[arg-type] - - -def read_tiff_image(filename: str) -> Any: - """Read data from TIFF file - - :param filename: name of TIFF file to be read - :return: data stored in TIFF file - """ - return tiff.imread(filename) - - -def read_jp2_image(filename: str) -> np.ndarray: - """Read data from JPEG2000 file - - :param filename: name of JPEG2000 file to be read - :return: data stored in JPEG2000 file - """ - with open(filename, "rb") as file: - return decode_jp2_image(file) - - -def read_image(filename: str) -> np.ndarray: - """Read data from PNG or JPG file + return decode_image_with_pillow - :param filename: name of PNG or JPG file to be read - :return: data stored in JPG file - """ - return 
decode_image_with_pillow(filename) + available_readers: Dict[MimeType, Callable[[str], Any]] = { + MimeType.TAR: _open_file_and_read(decode_tar, "rb"), # type: ignore[arg-type] + MimeType.TXT: _open_file_and_read(lambda file: file.read(), "r"), + MimeType.RAW: _open_file_and_read(lambda file: file.read(), "rb"), + MimeType.CSV: _read_csv, + MimeType.JSON: _open_file_and_read(json.load, "rb"), + MimeType.XML: ElementTree.parse, + MimeType.GML: ElementTree.parse, + MimeType.SAFE: ElementTree.parse, + MimeType.NPY: np.load, + } + if data_format not in available_readers: + raise ValueError(f"Reading data format {data_format} is not supported.") -def read_text(filename: str) -> str: - """Read data from text file + return available_readers[data_format] - :param filename: name of text file to be read - :return: data stored in text file - """ - with open(filename, "r") as file: - return file.read() +def _open_file_and_read(reader: Callable[[IO], Any], mode: Literal["r", "rb"]) -> Callable[[str], Any]: + def new_reader(filename: str) -> Any: + with open(filename, mode) as file: + return reader(file) -def _read_binary(filename: str) -> bytes: - """Reads data in bytes""" - with open(filename, "rb") as file: - return file.read() + return new_reader -def read_csv(filename: str, delimiter: str = CSV_DELIMITER) -> list: +def _read_csv(filename: str, delimiter: str = CSV_DELIMITER) -> list: """Read data from CSV file :param filename: name of CSV file to be read @@ -137,40 +96,6 @@ def read_csv(filename: str, delimiter: str = CSV_DELIMITER) -> list: return list(csv.reader(file, delimiter=delimiter)) -def read_json(filename: str) -> Any: - """Read data from JSON file - - :param filename: name of JSON file to be read - :return: data stored in JSON file - """ - with open(filename, "rb") as file: - return json.load(file) - - -def read_xml(filename: str) -> ElementTree.ElementTree: - """Read data from XML or GML file - - :param filename: name of XML or GML file to be read - :return: data stored in XML file - """ - return ElementTree.parse(filename) - - -def read_numpy(filename: str) -> np.ndarray: - """Read data from numpy file - - :param filename: name of numpy file to be read - :return: data stored in file as numpy array - """ - return np.load(filename) - - -def _create_parent_folder(filename: str) -> None: - path = os.path.dirname(filename) - if path != "": - os.makedirs(path, exist_ok=True) - - def write_data( filename: str, data: Any, data_format: Optional[MimeType] = None, compress: bool = False, add: bool = False ) -> None: @@ -184,7 +109,7 @@ def write_data( :param data: image data to write to file :param data_format: format of output file. Default is `None` :param compress: whether to compress data or not. Default is `False` - :param add: whether to append to existing text file or not. Default is `False` + :param add: whether to append to existing file or not. Only supported for TXT. 
Default is `False` :raises: exception if numpy format is not supported or file cannot be written """ _create_parent_folder(filename) @@ -193,121 +118,40 @@ def write_data( data_format = get_data_format(filename) if data_format is MimeType.TIFF: - return write_tiff_image(filename, data, compress) - if data_format.is_image_format(): - return write_image(filename, data) - if data_format is MimeType.TXT: - return write_text(filename, data, add=add) + tiff.imwrite(filename, data, compression=("lzma" if compress else None)) - try: - available_writers: Dict[MimeType, Callable[[str, Any], None]] = { - MimeType.RAW: write_bytes, - MimeType.CSV: write_csv, - MimeType.JSON: write_json, - MimeType.XML: write_xml, - MimeType.GML: write_xml, - } - return available_writers[data_format](filename, data) - except KeyError as exception: - raise ValueError(f"Writing data format {data_format} is not supported") from exception + elif data_format.is_image_format(): + Image.fromarray(data).save(filename) + elif data_format is MimeType.NPY: + np.save(filename, data) -def write_tiff_image(filename: str, image: np.ndarray, compress: bool = False) -> None: - """Write image data to TIFF file - - :param filename: name of file to write data to - :param image: image data to write to file - :param compress: whether to compress data. If `True`, lzma compression is used. Default is `False` - """ - if compress: - return tiff.imwrite(filename, image, compression="lzma") # lossless compression, works very well on masks - return tiff.imwrite(filename, image) - - -def write_jp2_image(filename: str, image: np.ndarray) -> None: - """Write image data to JPEG2000 file - - :param filename: name of JPEG2000 file to write data to - :param image: image data to write to file - """ - # Other options: - # return glymur.Jp2k(filename, data=image) - # cv2.imwrite(filename, image) - return write_image(filename, image) - - -def write_image(filename: str, image: np.ndarray) -> None: - """Write image data to PNG, JPG file - - :param filename: name of PNG or JPG file to write data to - :param image: image data to write to file - """ - data_format = get_data_format(filename) - if data_format is MimeType.JPG: - warnings.warn("JPEG is a lossy format therefore saved data will be modified.", category=SHUserWarning) - return Image.fromarray(image).save(filename) - - -def write_text(filename: str, data: np.ndarray, add: bool = False) -> None: - """Write image data to text file - - :param filename: name of text file to write data to - :param data: image data to write to text file - :param add: whether to append to existing file or not. Default is `False` - """ - write_type = "a" if add else "w" - with open(filename, write_type) as file: - print(data, end="", file=file) + elif data_format in (MimeType.XML, MimeType.GML): + data.write(filename) + elif data_format is MimeType.TXT: + with open(filename, "a" if add else "w") as file: + print(data, end="", file=file) -def write_csv(filename: str, data: np.ndarray, delimiter: str = CSV_DELIMITER) -> None: - """Write image data to CSV file + elif data_format is MimeType.RAW: + with open(filename, "wb") as file: + file.write(data) - :param filename: name of CSV file to write data to - :param data: image data to write to CSV file - :param delimiter: delimiter used in CSV file. 
Default is ``;`` - """ - with open(filename, "w") as file: - csv_writer = csv.writer(file, delimiter=delimiter) - for line in data: - csv_writer.writerow(line) + elif data_format is MimeType.CSV: + with open(filename, "w") as file: + csv_writer = csv.writer(file, delimiter=CSV_DELIMITER) + for line in data: + csv_writer.writerow(line) + elif data_format is MimeType.JSON: + with open(filename, "w") as file: + json.dump(data, file, indent=4, sort_keys=True) -def write_json(filename: str, data: Json) -> None: - """Write data to JSON file + else: + raise ValueError(f"Writing data format {data_format} is not supported") - :param filename: name of JSON file to write data to - :param data: data to write to JSON file - """ - with open(filename, "w") as file: - json.dump(data, file, indent=4, sort_keys=True) - -def write_xml(filename: str, element_tree: ElementTree.ElementTree) -> None: - """Write data to XML or GML file - - :param filename: name of XML or GML file to write data to - :param element_tree: data as ElementTree object - """ - return element_tree.write(filename) - # this will write declaration tag in first line: - # return element_tree.write(filename, encoding='utf-8', xml_declaration=True) - - -def write_numpy(filename: str, data: np.ndarray) -> None: - """Write data as numpy file - - :param filename: name of numpy file to write data to - :param data: data to write to numpy file - """ - return np.save(filename, data) - - -def write_bytes(filename: str, data: bytes) -> None: - """Write binary data into a file - - :param filename: name of file to write the data to - :param data: binary data to write - """ - with open(filename, "wb") as file: - file.write(data) +def _create_parent_folder(filename: str) -> None: + path = os.path.dirname(filename) + if path != "": + os.makedirs(path, exist_ok=True) diff --git a/sentinelhub/time_utils.py b/sentinelhub/time_utils.py index 9d78307e..4ac08534 100644 --- a/sentinelhub/time_utils.py +++ b/sentinelhub/time_utils.py @@ -6,8 +6,9 @@ import dateutil.parser import dateutil.tz +from typing_extensions import Literal -from .types import Literal, RawTimeIntervalType, RawTimeType +from .types import RawTimeIntervalType, RawTimeType TimeType = TypeVar("TimeType", dt.date, dt.datetime) # pylint: disable=invalid-name diff --git a/sentinelhub/types.py b/sentinelhub/types.py index 083d5b32..5996050d 100644 --- a/sentinelhub/types.py +++ b/sentinelhub/types.py @@ -1,15 +1,8 @@ """Module with custom types and utilities used in sentinelhub-py.""" import datetime as dt -import sys from typing import Any, Dict, Tuple, Union RawTimeType = Union[None, str, dt.date] RawTimeIntervalType = Tuple[RawTimeType, RawTimeType] JsonDict = Dict[str, Any] Json = Union[JsonDict, list, str, float, int, None] - - -if sys.version_info < (3, 8): - from typing_extensions import Literal # pylint: disable=unused-import -else: - from typing import Literal # pylint: disable=ungrouped-imports # noqa: F401 diff --git a/setup.py b/setup.py index 4c2b3d14..1fc44806 100644 --- a/setup.py +++ b/setup.py @@ -88,8 +88,8 @@ def try_create_config_file() -> None: "Bug Tracker": "https://github.com/sentinel-hub/sentinelhub-py/issues", "Forum": "https://forum.sentinel-hub.com", }, - author="Sinergise ltd.", - author_email="info@sentinel-hub.com", + author="Sinergise EO research team", + author_email="eoresearch@sinergise.com", license="MIT", packages=find_packages(), package_data={ diff --git a/tests/aws/test_data.py b/tests/aws/test_data.py index aa3097ba..7ac34343 100644 --- a/tests/aws/test_data.py 
+++ b/tests/aws/test_data.py @@ -5,7 +5,7 @@ from sentinelhub import DataCollection from sentinelhub.aws import AwsProductRequest, AwsTileRequest -pytestmark = pytest.mark.aws_integration +pytestmark = [pytest.mark.aws_integration, pytest.mark.filterwarnings("ignore::DeprecationWarning")] def test_aws_tile(output_folder: str) -> None: diff --git a/tests/aws/test_data_safe.py b/tests/aws/test_data_safe.py index 8d63d680..e2d0c45a 100644 --- a/tests/aws/test_data_safe.py +++ b/tests/aws/test_data_safe.py @@ -6,6 +6,8 @@ from sentinelhub import DataCollection, read_data from sentinelhub.aws import AwsConstants, AwsProductRequest, AwsTileRequest +pytestmark = [pytest.mark.aws_integration, pytest.mark.filterwarnings("ignore::DeprecationWarning")] + @pytest.fixture(name="safe_folder", scope="session") def safe_folder_fixture(input_folder: str) -> str: diff --git a/tests/aws/test_request.py b/tests/aws/test_request.py index 0f26d403..9e00d7fc 100644 --- a/tests/aws/test_request.py +++ b/tests/aws/test_request.py @@ -4,6 +4,8 @@ from sentinelhub.aws import AwsProductRequest +pytestmark = pytest.mark.filterwarnings("ignore::DeprecationWarning") + @pytest.mark.aws_integration def test_saving_responses(output_folder: str) -> None: diff --git a/tests/test_config.py b/tests/test_config.py index b949b5b2..2f864997 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -3,6 +3,7 @@ """ import json import os +import shutil from typing import Any, Generator import pytest @@ -10,24 +11,61 @@ from sentinelhub import SHConfig -@pytest.fixture(name="restore_config") -def restore_config_fixture() -> Generator[None, None, None]: - """A fixture that makes sure original config is restored after a test is executed. It restores the config even if +@pytest.fixture(autouse=True, scope="module") +def mask_and_restore_config_fixture() -> Generator[None, None, None]: + """A fixture that makes sure original config is restored after tests are executed. It restores the config even if a test has failed. """ - original_config = SHConfig() + config_path = SHConfig.get_config_location() + cache_path = config_path.replace(".json", "_test_cache.json") + shutil.move(config_path, cache_path) + + # Create a mock config + config = SHConfig(use_defaults=True) + config.geopedia_wms_url = "zero-drama-llama.com" + config.download_timeout_seconds = 100 + config.max_download_attempts = 42 + config.save() + yield - original_config.save() + os.remove(config_path) + shutil.move(cache_path, config_path) + SHConfig._cache = None # makes sure the next invocation loads the SHConfig -def test_config_file() -> None: + +@pytest.fixture(name="restore_config_file") +def restore_config_file_fixture() -> Generator[None, None, None]: + """A fixture that ensures the config file is reset after the test.""" config = SHConfig() + yield + config.save() + + +@pytest.fixture(name="test_config") +def test_config_fixture() -> SHConfig: + config = SHConfig(use_defaults=True) + config.instance_id = "fake_instance_id" + config.sh_client_id = "tester" + config.sh_client_secret = "1_l1k3-p1n34ppl3*0n%p1224" + return config + +@pytest.mark.dependency() +def test_fake_config_during_tests() -> None: + config = SHConfig() + credentials_removed = all(config[field] == "" for field in config.CREDENTIALS) + assert credentials_removed, "Credentials not properly removed for testing. Aborting tests." 
+ + +@pytest.mark.dependency(depends=["test_fake_config_during_tests"]) +def test_config_file() -> None: + config = SHConfig() config_file = config.get_config_location() assert os.path.isfile(config_file), f"Config file does not exist: {os.path.abspath(config_file)}" - with open(config_file, "r") as fp: - config_dict = json.load(fp) + with open(config_file, "r") as file_handle: + config_dict = json.load(file_handle) for param, value in config_dict.items(): if param in config.CREDENTIALS: @@ -36,15 +74,16 @@ def test_config_file() -> None: if isinstance(value, str): value = value.rstrip("/") - assert config[param] == value + assert config[param] == value, f"Parameter {param} does not match its equivalent in the config.json." -def test_reset() -> None: +@pytest.mark.dependency(depends=["test_fake_config_during_tests"]) +def test_set_and_reset_value() -> None: config = SHConfig() - default_config = SHConfig(use_defaults=True) old_value = config.instance_id new_value = "new" + config.instance_id = new_value assert config.instance_id == new_value, "New value was not set" assert config["instance_id"] == new_value, "New value was not set" @@ -55,10 +94,11 @@ def test_reset() -> None: assert config.instance_id == new_value, "Instance ID should not reset yet" config.reset() - assert config.instance_id == default_config.instance_id, "Instance ID should reset" + assert config.instance_id == "", "Instance ID should reset" -def test_save(restore_config: None) -> None: +@pytest.mark.dependency(depends=["test_fake_config_during_tests"]) +def test_save(restore_config_file: None) -> None: config = SHConfig() old_value = config.download_timeout_seconds @@ -77,43 +117,47 @@ def test_save(restore_config: None) -> None: assert config.download_timeout_seconds == new_value, "Saved value should have changed" -def test_copy() -> None: - config = SHConfig(hide_credentials=True) +@pytest.mark.dependency(depends=["test_fake_config_during_tests"]) +@pytest.mark.parametrize("hide_credentials", [True, False]) +def test_copy(hide_credentials: bool) -> None: + config = SHConfig(hide_credentials=hide_credentials) config.instance_id = "a" copied_config = config.copy() - assert copied_config._hide_credentials + assert copied_config is not config + assert copied_config._hide_credentials == hide_credentials assert copied_config._cache is config._cache assert copied_config.instance_id == config.instance_id copied_config.instance_id = "b" - assert config.instance_id == "a" + assert config.instance_id == "a" and copied_config.instance_id == "b" -def test_config_equality() -> None: - assert SHConfig() != 42 +@pytest.mark.dependency(depends=["test_fake_config_during_tests"]) +def test_config_equality(test_config: SHConfig) -> None: + assert test_config != 42 + assert test_config != test_config.get_config_dict() - config1 = SHConfig(hide_credentials=False, use_defaults=True) - config2 = SHConfig(hide_credentials=True, use_defaults=True) + config1 = SHConfig(hide_credentials=False) + config2 = SHConfig(hide_credentials=True) assert config1 is not config2 assert config1 == config2 - config2.sh_client_id = "XXX" + config2.sh_client_id = "something_else" assert config1 != config2 -def test_raise_for_missing_instance_id() -> None: - config = SHConfig() - - config.instance_id = "xxx" - config.raise_for_missing_instance_id() +@pytest.mark.dependency(depends=["test_fake_config_during_tests"]) +def test_raise_for_missing_instance_id(test_config: SHConfig) -> None: + test_config.raise_for_missing_instance_id() - config.instance_id = "" + 
test_config.instance_id = "" with pytest.raises(ValueError): - config.raise_for_missing_instance_id() + test_config.raise_for_missing_instance_id() +@pytest.mark.dependency(depends=["test_fake_config_during_tests"]) @pytest.mark.parametrize("hide_credentials", [False, True]) def test_config_repr(hide_credentials: bool) -> None: config = SHConfig(hide_credentials=hide_credentials) @@ -130,6 +174,7 @@ def test_config_repr(hide_credentials: bool) -> None: assert f"{param}={repr(config[param])}" in config_repr +@pytest.mark.dependency(depends=["test_fake_config_during_tests"]) @pytest.mark.parametrize("hide_credentials", [False, True]) def test_get_config_dict(hide_credentials: bool) -> None: config = SHConfig(hide_credentials=hide_credentials) @@ -148,25 +193,24 @@ def test_get_config_dict(hide_credentials: bool) -> None: assert config_dict["aws_secret_access_key"] == config.aws_secret_access_key -def test_transfer_with_ray(ray: Any) -> None: +@pytest.mark.dependency(depends=["test_fake_config_during_tests"]) +def test_transfer_with_ray(test_config: SHConfig, ray: Any) -> None: """This test makes sure that the process of transferring SHConfig object to a Ray worker, working with it, and sending it back works correctly. """ - config = SHConfig() - config.instance_id = "x" def _remote_ray_testing(remote_config: SHConfig) -> SHConfig: """Makes a few checks and modifications to the config object""" assert repr(remote_config).startswith("SHConfig") assert isinstance(remote_config.get_config_dict(), dict) assert os.path.exists(remote_config.get_config_location()) - assert remote_config.instance_id == "x" + assert remote_config.instance_id == "fake_instance_id" - remote_config.instance_id = "y" + remote_config.instance_id = "new_fake_instance_id" return remote_config - config_future = ray.remote(_remote_ray_testing).remote(config) + config_future = ray.remote(_remote_ray_testing).remote(test_config) transferred_config = ray.get(config_future) - assert repr(config).startswith("SHConfig") - assert transferred_config.instance_id == "y" + assert repr(test_config).startswith("SHConfig") + assert transferred_config.instance_id == "new_fake_instance_id" diff --git a/tests/test_constants.py b/tests/test_constants.py index bdb4a4ff..e0946c66 100644 --- a/tests/test_constants.py +++ b/tests/test_constants.py @@ -1,35 +1,33 @@ """ Tests for constants.py module """ -from typing import Any, Type import numpy as np import pyproj import pytest from sentinelhub import CRS, MimeType -from sentinelhub.constants import RequestType +from sentinelhub.constants import RequestType, ResamplingType from sentinelhub.exceptions import SHUserWarning @pytest.mark.parametrize( - "lng, lat, epsg", + "lng, lat, expected_crs", [ - (13, 46, "32633"), - (13, 0, "32633"), - (13, -45, "32733"), - (13, 0, "32633"), - (13, -0.0001, "32733"), - (13, -46, "32733"), + (13, 46, CRS("32633")), + (13, 0, CRS("32633")), + (13, -45, CRS("32733")), + (13, 0, CRS("32633")), + (13, -0.0001, CRS("32733")), + (13, -46, CRS("32733")), ], ) -def test_utm(lng: float, lat: float, epsg: str) -> None: - crs = CRS.get_utm_from_wgs84(lng, lat) - assert epsg == crs.value +def test_utm_from_wgs84(lng: float, lat: float, expected_crs: CRS) -> None: + assert CRS.get_utm_from_wgs84(lng, lat) is expected_crs @pytest.mark.parametrize( - "parse_value, expected", + "crs_input, expected", [ (4326, CRS.WGS84), (np.int64(4326), CRS.WGS84), @@ -44,34 +42,32 @@ def test_utm(lng: float, lat: float, epsg: str) -> None: (pyproj.CRS(3857), CRS.POP_WEB), ], ) -def 
test_crs_parsing(parse_value: Any, expected: CRS) -> None: - parsed_result = CRS(parse_value) - assert parsed_result == expected +def test_crs_input(crs_input: object, expected: CRS) -> None: + assert CRS(crs_input) is expected -@pytest.mark.parametrize("parse_value, expected, warning", [(pyproj.CRS(4326), CRS.WGS84, SHUserWarning)]) -def test_crs_parsing_warn(parse_value: Any, expected: CRS, warning: Type[Warning]) -> None: - with pytest.warns(warning): - parsed_result = CRS(parse_value) - assert parsed_result == expected +def test_crs_input_warn() -> None: + with pytest.warns(SHUserWarning): + parsed_result = CRS(pyproj.CRS(4326)) + assert parsed_result == CRS.WGS84 + + +@pytest.mark.parametrize("bad_input", ["string", "12", -1, 999, None, 3035.5]) +def test_crs_faulty_input(bad_input: object) -> None: + with pytest.raises(ValueError): + CRS(bad_input) @pytest.mark.parametrize( "crs, epsg", - [ - (CRS.POP_WEB, "EPSG:3857"), - (CRS.WGS84, "EPSG:4326"), - (CRS.UTM_33N, "EPSG:32633"), - (CRS.UTM_33S, "EPSG:32733"), - ], + [(CRS.POP_WEB, "EPSG:3857"), (CRS.WGS84, "EPSG:4326"), (CRS.UTM_33N, "EPSG:32633"), (CRS.UTM_33S, "EPSG:32733")], ) def test_ogc_string(crs: CRS, epsg: str) -> None: - ogc_str = CRS.ogc_string(crs) - assert epsg == ogc_str + assert crs.ogc_string() == epsg @pytest.mark.parametrize( - "crs, crs_repr", + "crs, expected_repr", [ (CRS.POP_WEB, "CRS('3857')"), (CRS.WGS84, "CRS('4326')"), @@ -83,8 +79,8 @@ def test_ogc_string(crs: CRS, epsg: str) -> None: (CRS("32733"), "CRS('32733')"), ], ) -def test_crs_repr(crs: CRS, crs_repr: str) -> None: - assert crs_repr == repr(crs) +def test_crs_repr(crs: CRS, expected_repr: str) -> None: + assert repr(crs) == expected_repr @pytest.mark.parametrize("crs", CRS) @@ -92,19 +88,11 @@ def test_crs_has_value(crs: CRS) -> None: assert CRS.has_value(crs.value), f"Expected support for CRS {crs.value}" -@pytest.mark.parametrize( - "value, fails", - [("string", True), (-1, True), (999, True), (None, True), (3035, False), ("EPSG:3035", False), (10000, False)], -) -def test_custom_crs(value: Any, fails: bool) -> None: - if fails: - with pytest.raises(ValueError): - CRS(value) - else: - CRS(CRS(value)) - - new_enum_value = str(value).lower().strip("epsg: ") - assert CRS.has_value(new_enum_value) +@pytest.mark.parametrize("crs_input, crs_value", [(3035, "3035"), ("EPSG:3035", "3035"), (10000, "10000")]) +def test_crs_not_predefined(crs_input: object, crs_value: str) -> None: + crs = CRS(crs_input) + assert crs.value == crs_value + assert CRS.has_value(crs_value) @pytest.mark.parametrize("crs", [CRS.WGS84, CRS.POP_WEB, CRS.UTM_38N]) @@ -135,10 +123,16 @@ def test_mimetype_no_value_fail(faulty_arg: str) -> None: MimeType.from_string(faulty_arg) -@pytest.mark.parametrize("ext", ["tif", "tiff", "jpg", "jpeg", "png", "jp2"]) -def test_is_image_format(ext: str) -> None: - mime_type = MimeType.from_string(ext) - assert MimeType.is_image_format(mime_type) +@pytest.mark.parametrize("mime_type", MimeType) +def test_is_image_format(mime_type: MimeType) -> None: + expected_to_be_image = mime_type in {MimeType.TIFF, MimeType.PNG, MimeType.JP2, MimeType.JPG} + assert MimeType.is_image_format(mime_type) == expected_to_be_image + + +@pytest.mark.parametrize("mime_type", MimeType) +def test_is_api_format(mime_type: MimeType) -> None: + expected_to_be_api_format = mime_type in {MimeType.JPG, MimeType.PNG, MimeType.TIFF, MimeType.JSON} + assert MimeType.is_api_format(mime_type) == expected_to_be_api_format @pytest.mark.parametrize( @@ -163,7 +157,13 @@ def 
test_get_string(mime_type: MimeType, expected_string: str) -> None: @pytest.mark.parametrize( "mime_type, path, expected_answer", - [(MimeType.NPY, "some/path/file.npy", True), (MimeType.GPKG, "file.gpkg.gz", False)], + [ + (MimeType.NPY, "some/path/file.npy", True), + (MimeType.PNG, "./file.png", True), + (MimeType.PNG, "./file.PNG", False), + (MimeType.GPKG, "file.gpkg.gz", False), + (MimeType.JSON, "path/to/file.geojson", False), + ], ) def test_matches_extension(mime_type: MimeType, path: str, expected_answer: bool) -> None: assert mime_type.matches_extension(path) == expected_answer @@ -172,6 +172,8 @@ def test_matches_extension(mime_type: MimeType, path: str, expected_answer: bool def test_get_expected_max_value() -> None: assert MimeType.TIFF.get_expected_max_value() == 65535 assert MimeType.PNG.get_expected_max_value() == 255 + assert MimeType.JPG.get_expected_max_value() == 255 + assert MimeType.JP2.get_expected_max_value() == 10000 with pytest.raises(ValueError): MimeType.TAR.get_expected_max_value() @@ -187,3 +189,11 @@ def test_request_type() -> None: # check that this goes through without errors RequestType("POST") RequestType("GET") + + +def test_resampling_type_not_case_sensitive() -> None: + ResamplingType("nearest") + ResamplingType("Nearest") + ResamplingType("NEAREST") + with pytest.raises(ValueError): + ResamplingType("nyearest") diff --git a/tests/test_data_collections.py b/tests/test_data_collections.py index d722748e..a3040fe0 100644 --- a/tests/test_data_collections.py +++ b/tests/test_data_collections.py @@ -1,7 +1,7 @@ """ Unit tests for data_collections module """ -from typing import Any +from typing import Any, Dict import pytest @@ -9,49 +9,92 @@ from sentinelhub.data_collections import DataCollectionDefinition -def test_repr() -> None: - definition = DataCollection.SENTINEL1_IW.value - representation = repr(definition) - - assert isinstance(representation, str) - assert representation.count("\n") >= 5 - - -def test_derive() -> None: - definition = DataCollectionDefinition(api_id="X", wfs_id="Y") - derived_definition = definition.derive(wfs_id="Z") - - assert derived_definition.api_id == "X" - assert derived_definition.wfs_id == "Z" - assert derived_definition.collection_type is None - +@pytest.mark.parametrize( + "data_colection_def, derive_attributes, expected_attributes", + [ + (DataCollectionDefinition(), {}, {"api_id": None}), + ( + DataCollectionDefinition(api_id="X", wfs_id="Y"), + {"wfs_id": "Z"}, + {"api_id": "X", "wfs_id": "Z", "collection_type": None}, + ), + (DataCollection.LANDSAT_MSS_L1.value, {"api_id": None}, {"api_id": None, "wfs_id": "DSS14"}), + ], +) +def test_derive( + data_colection_def: DataCollectionDefinition, derive_attributes: Dict[str, Any], expected_attributes: Dict[str, Any] +) -> None: + derived_definition = data_colection_def.derive(**derive_attributes) + + for attribute, value in expected_attributes.items(): + assert value == getattr(derived_definition, attribute) + + +@pytest.mark.parametrize( + "definition_input, expected", + [ + ({}, "DataCollectionDefinition(\n is_timeless: False\n has_cloud_coverage: False\n)"), + ( + {"api_id": "X", "_name": "A"}, + "DataCollectionDefinition(\n api_id: X\n is_timeless: False\n has_cloud_coverage: False\n)", + ), + ( + {"api_id": "Y", "is_timeless": True, "has_cloud_coverage": True}, + "DataCollectionDefinition(\n api_id: Y\n is_timeless: True\n has_cloud_coverage: True\n)", + ), + ], +) +def test_collection_repr(definition_input: Dict[str, Any], expected: str) -> None: + assert 
repr(DataCollectionDefinition(**definition_input)) == expected + + +@pytest.mark.parametrize( + "test_definition, equal_definition", + [ + ({"api_id": "X", "_name": "A"}, {"api_id": "X", "_name": "A"}), + ({"api_id": "X", "_name": "A"}, {"api_id": "X", "_name": "B"}), + ({"api_id": "X", "is_timeless": False}, {"api_id": "X", "is_timeless": False, "_name": "B"}), + ({"api_id": "X", "is_timeless": False}, {"api_id": "X"}), + ], +) +def test_collection_definitions_equal(test_definition: Dict[str, Any], equal_definition: Dict[str, Any]) -> None: + def1 = DataCollectionDefinition(**test_definition) + def2 = DataCollectionDefinition(**equal_definition) + assert def1 == def2 -def test_compare() -> None: - def1 = DataCollectionDefinition(api_id="X", _name="A") - def2 = DataCollectionDefinition(api_id="X", _name="B") - assert def1 == def2 +@pytest.mark.parametrize( + "test_definition, equal_definition", + [ + ({"api_id": "X", "_name": "A"}, {"api_id": "Y", "_name": "A"}), + ({"api_id": "X", "is_timeless": True}, {"api_id": "X"}), + ({"api_id": "X", "wfs_id": 2132342143454364}, {"api_id": "X"}), + ], +) +def test_collection_definitions_not_equal(test_definition: Dict[str, Any], equal_definition: Dict[str, Any]) -> None: + def1 = DataCollectionDefinition(**test_definition) + def2 = DataCollectionDefinition(**equal_definition) + assert def1 != def2 def test_define() -> None: - for _ in range(3): - data_collection = DataCollection.define( - "NEW", api_id="X", sensor_type="Sensor", bands=("B01",), is_timeless=True - ) + data_collection = DataCollection.define("NEW", api_id="X", sensor_type="Sensor", bands=("B01",), is_timeless=True) assert data_collection == DataCollection.NEW + assert DataCollection.NEW.api_id == "X" + # Should fail because DataCollection with same api_id already exists. with pytest.raises(ValueError): DataCollection.define("NEW_NEW", api_id="X", sensor_type="Sensor", bands=("B01",), is_timeless=True) + # Should fail because DataCollection with same name already exists. 
with pytest.raises(ValueError): DataCollection.define("NEW", api_id="Y") def test_define_from() -> None: bands = ["B01", "XYZ"] - for _ in range(3): - data_collection = DataCollection.define_from(DataCollection.SENTINEL5P, "NEW_5P", api_id="X", bands=bands) + data_collection = DataCollection.define_from(DataCollection.SENTINEL5P, "NEW_5P", api_id="X", bands=bands) assert data_collection == DataCollection.NEW_5P assert data_collection.api_id == "X" @@ -59,27 +102,39 @@ def test_define_from() -> None: assert data_collection.bands == tuple(bands) -def test_define_byoc_and_batch() -> None: +def test_define_byoc() -> None: byoc_id = "0000d273-7e89-4f00-971e-9024f89a0000" byoc = DataCollection.define_byoc(byoc_id, name="MY_BYOC") - batch = DataCollection.define_batch(byoc_id, name="MY_BATCH") assert byoc == DataCollection.MY_BYOC + assert byoc.api_id.endswith(byoc_id) + assert byoc.collection_id == byoc_id + + assert DataCollection.MY_BYOC.is_byoc + assert not DataCollection.SENTINEL5P.is_byoc + + +def test_define_batch() -> None: + batch_id = "0000d273-7e89-4f00-971e-9024f89a0000" + batch = DataCollection.define_batch(batch_id, name="MY_BATCH") + assert batch == DataCollection.MY_BATCH + assert batch.api_id.endswith(batch_id) + assert batch.collection_id == batch_id - for data_collection in [byoc, batch]: - assert data_collection.api_id.endswith(byoc_id) - assert data_collection.collection_id == byoc_id + assert DataCollection.MY_BATCH.is_batch + assert not DataCollection.SENTINEL2_L2A.is_batch -def test_attributes() -> None: - data_collection = DataCollection.SENTINEL3_OLCI +@pytest.mark.parametrize("data_collection", [DataCollection.SENTINEL3_OLCI, DataCollection.SENTINEL2_L2A]) +@pytest.mark.parametrize("attribute", ["api_id", "catalog_id", "wfs_id", "service_url", "bands", "sensor_type"]) +def test_attributes(data_collection: DataCollection, attribute: str) -> None: + value = getattr(data_collection, attribute) + assert value is not None + assert value == getattr(data_collection.value, attribute) - for attr_name in ["api_id", "catalog_id", "wfs_id", "service_url", "bands", "sensor_type"]: - value = getattr(data_collection, attr_name) - assert value is not None - assert value == getattr(data_collection.value, attr_name) +def test_attributes_empty_fail() -> None: data_collection = DataCollection.define("EMPTY") for attr_name in ["api_id", "catalog_id", "wfs_id", "bands"]: @@ -89,25 +144,42 @@ def test_attributes() -> None: assert data_collection.service_url is None -def test_sentinel1_checks() -> None: - assert DataCollection.SENTINEL1_IW.is_sentinel1 - assert not DataCollection.SENTINEL2_L1C.is_sentinel1 - - assert DataCollection.SENTINEL1_IW_ASC.contains_orbit_direction("ascending") - assert not DataCollection.SENTINEL1_IW_DES.contains_orbit_direction("ascending") - - assert DataCollection.SENTINEL2_L2A.contains_orbit_direction("descending") +@pytest.mark.parametrize( + "test_collection, expected", + [ + (DataCollection.SENTINEL2_L1C, False), + (DataCollection.SENTINEL1_EW, True), + (DataCollection.LANDSAT_TM_L1, False), + ], +) +def test_is_sentinel1(test_collection: DataCollection, expected: bool) -> None: + assert test_collection.is_sentinel1 == expected + + +@pytest.mark.parametrize( + "collection, direction, expected", + [ + ("SENTINEL1_IW_ASC", "ascending", True), + ("SENTINEL1_IW_ASC", "descending", False), + ("SENTINEL1_IW_DES", "ascending", False), + ("SENTINEL2_L2A", "descending", True), + ("SENTINEL2_L2A", "ascending", True), + ], +) +def test_contains_orbit_direction(collection: 
str, direction: str, expected: bool) -> None: + data_collection = getattr(DataCollection, collection) + assert data_collection.contains_orbit_direction(direction) == expected def test_get_available_collections() -> None: + number_of_collection = len(DataCollection.get_available_collections()) + DataCollection.define("NEW_NEW", api_id="Z") + DataCollection.define_batch("batch_id", name="MY_NEW_BATCH") + DataCollection.define_byoc("byoc_id", name="MY_NEW_BYOC") collections = DataCollection.get_available_collections() - assert helper_check_collection_list(collections) - -def helper_check_collection_list(collection_list: Any) -> bool: - is_list = isinstance(collection_list, list) - contains_collections = all(isinstance(data_collection, DataCollection) for data_collection in collection_list) - return is_list and contains_collections + assert len(collections) == number_of_collection + 3 + assert all(isinstance(collection, DataCollection) for collection in collections) def test_transfer_with_ray(ray: Any) -> None: diff --git a/tests/test_geometry.py b/tests/test_geometry.py index 887b4949..326e4d51 100644 --- a/tests/test_geometry.py +++ b/tests/test_geometry.py @@ -1,222 +1,193 @@ import copy -from typing import Any, List, Tuple, TypeVar, Union +import warnings +from typing import Any, Tuple, TypeVar import pytest import shapely.geometry from pytest import approx -from sentinelhub import CRS, BBox, BBoxCollection, Geometry, get_utm_crs -from sentinelhub.geometry import _BaseGeometry +from sentinelhub import CRS, BBox, Geometry, get_utm_crs +from sentinelhub.exceptions import SHDeprecationWarning -GeoType = TypeVar("GeoType", bound=_BaseGeometry) +GeoType = TypeVar("GeoType", BBox, Geometry) WKT_STRING = ( "MULTIPOLYGON (((40 40, 20 45, 45 30, 40 40)), ((20 35, 10 30, 10 10, 30 5, 45 20, 20 35), " "(30 20, 20 15, 20 25, 30 20)))" ) -polygon = shapely.geometry.Polygon([(0, 0), (0, 1), (1, 1), (1, 0), (0, 0)]) -GEOMETRY1 = Geometry(polygon, CRS(32633)) +GEOMETRY1 = Geometry(shapely.geometry.Polygon([(0, 0), (0, 1), (1, 1), (1, 0), (0, 0)]), CRS(32633)) GEOMETRY2 = Geometry(WKT_STRING, CRS.WGS84) -BBOX = BBox(bbox=[14.00, 45.00, 14.03, 45.03], crs=CRS.WGS84) -BBOX_COLLECTION = BBoxCollection([BBOX, BBox("46,13,47,20", CRS.WGS84)]) +BBOX = BBox(bbox=(14.00, 45.00, 14.03, 45.03), crs=CRS.WGS84) -GEOMETRY_LIST = [GEOMETRY1, GEOMETRY2, BBOX_COLLECTION, BBOX] - - -def _round_point_coords(x: float, y: float, decimals: int = 1) -> Tuple[float, float]: - """Rounds coordinates of a point""" - return round(x, decimals), round(y, decimals) - - -def test_bbox_no_crs() -> None: - with pytest.raises(TypeError): - BBox("46,13,47,20") # type: ignore[call-arg] - - -def test_bbox_from_string() -> None: - bbox_str = "46.07, 13.23, 46.24, 13.57" - bbox = BBox(bbox_str, CRS.WGS84) - assert bbox.lower_left == (46.07, 13.23) - assert bbox.upper_right == (46.24, 13.57) - assert bbox.crs == CRS.WGS84 - - -def test_bbox_from_bad_string() -> None: - with pytest.raises(ValueError): - # Too few coordinates - BBox("46.07, 13.23, 46.24", CRS.WGS84) - - with pytest.raises(ValueError): - # Invalid string - BBox("46N,13E,45N,12E", CRS.WGS84) +GEOMETRY_LIST = [GEOMETRY1, GEOMETRY2, BBOX] @pytest.mark.parametrize( - "bbox_coords", + "coords, crs", [ - [46.07, 13.23, 46.24, 13.57], - [46.24, 13.23, 46.07, 13.57], - [46.07, 13.57, 46.24, 13.23], - [46.24, 13.57, 46.07, 13.23], + ([[46.07, 13.23], [46.24, 13.57]], CRS.WGS84), + ((46.07, 13.23, 46.24, 13.57), CRS.POP_WEB), + (((46.07, 13.23), (46.24, 13.57)), CRS(8687)), + ([(46.07, 
13.23), (46.24, 13.57)], CRS.WGS84), + ({"min_x": 46.07, "min_y": 13.23, "max_x": 46.24, "max_y": 13.57}, CRS.POP_WEB), ], ) -def test_bbox_from_flat_list(bbox_coords: List[float]) -> None: - bbox = BBox(bbox_coords, CRS.WGS84) - assert bbox.lower_left == (46.07, 13.23) +def test_bbox_different_input_options(coords: Any, crs: CRS) -> None: + bbox = BBox(coords, crs) assert bbox.upper_right == (46.24, 13.57) - assert bbox.crs == CRS.WGS84 + assert bbox.lower_left == (46.07, 13.23) + assert bbox.crs == crs @pytest.mark.parametrize( - "bbox_input", + "coords, crs", [ - [[46.07, 13.23], [46.24, 13.57]], - (46.07, 13.23, 46.24, 13.57), - ((46.07, 13.23), (46.24, 13.57)), - [(46.07, 13.23), (46.24, 13.57)], - {"min_x": 46.07, "min_y": 13.23, "max_x": 46.24, "max_y": 13.57}, - BBox({"min_x": 46.07, "min_y": 13.23, "max_x": 46.24, "max_y": 13.57}, CRS.WGS84), + ({"x1": 46.07, "y1": 13.23, "x2": 46.24, "y2": 13.57}, CRS.WGS84), + ((46.07, 13.23, 46.24, 13.57), None), + ((46.07, 13.23, (46.24, 13.57)), CRS.WGS84), ], ) -def test_bbox_different_input(bbox_input: Any) -> None: - bbox = BBox(bbox_input, CRS.WGS84) - assert bbox.upper_right == (46.24, 13.57) - assert bbox.lower_left == (46.07, 13.23) - assert bbox.crs == CRS.WGS84 +def test_bbox_bad_input_options(coords: Any, crs: CRS) -> None: + with pytest.raises((KeyError, ValueError)): + BBox(coords, crs) -def test_bbox_from_bad_dict() -> None: - bbox_dict = {"x1": 46.07, "y1": 13.23, "x2": 46.24, "y2": 13.57} - with pytest.raises(KeyError): - BBox(bbox_dict, CRS.WGS84) +def test_bbox_to_str() -> None: + with warnings.catch_warnings(): + warnings.simplefilter("ignore", SHDeprecationWarning) + bbox = BBox(((45.0, 12.0, 47.0, 14.0)), CRS.WGS84) + assert str(bbox) == "45.0,12.0,47.0,14.0" @pytest.mark.parametrize( - "bbox_input", + "coords, crs, expected", [ - shapely.geometry.LineString([(0, 0), (1, 1)]), - shapely.geometry.LinearRing([(1, 0), (1, 1), (0, 0)]), - shapely.geometry.Polygon([(1, 0), (1, 1), (0, 0)]), + ((46.07, 13.23, 46.24, 13.57), CRS(4326), "BBox(((46.07, 13.23), (46.24, 13.57)), crs=CRS('4326'))"), + (((42, 13.23), (47.453, 18.57)), CRS.POP_WEB, "BBox(((42.0, 13.23), (47.453, 18.57)), crs=CRS('3857'))"), ], ) -def test_bbox_from_shapely(bbox_input: Any) -> None: - assert BBox(bbox_input, CRS.WGS84) == BBox((0, 0, 1, 1), CRS.WGS84) +def test_bbox_repr(coords: Any, crs: CRS, expected: str) -> None: + assert repr(BBox(coords, crs)) == expected -def test_bbox_to_str() -> None: - x1, y1, x2, y2 = 45.0, 12.0, 47.0, 14.0 - crs = CRS.WGS84 - expect_str = f"{x1},{y1},{x2},{y2}" - bbox = BBox(((x1, y1), (x2, y2)), crs) - assert str(bbox) == expect_str +def test_bbox_iter() -> None: + assert tuple(BBOX) == (14.00, 45.00, 14.03, 45.03) + assert list(BBOX) == [14.00, 45.00, 14.03, 45.03] -def test_bbox_to_repr() -> None: - x1, y1, x2, y2 = 45.0, 12.0, 47.0, 14.0 - bbox = BBox(((x1, y1), (x2, y2)), crs=CRS("4326")) - expect_repr = f"BBox((({x1}, {y1}), ({x2}, {y2})), crs=CRS('4326'))" - assert repr(bbox) == expect_repr +@pytest.mark.parametrize( + "bbox1, bbox2", + [ + [BBOX, BBOX], + [BBox([46.07, 13.23, 46.24, 13.57], CRS.WGS84), BBox(((46.07, 13.23), (46.24, 13.57)), crs=CRS(4326))], + [BBox(((0, 0), (1, 1)), CRS(1234)), BBox({"min_x": 0, "min_y": 0, "max_x": 1, "max_y": 1}, CRS("epsg:1234"))], + ], +) +def test_bbox_eq_true(bbox1: BBox, bbox2: BBox) -> None: + assert bbox1 == bbox2 -def test_bbox_iter() -> None: - bbox_lst = [46.07, 13.23, 46.24, 13.57] - bbox = BBox(bbox_lst, CRS.WGS84) - list_from_bbox_iter = list(bbox) - assert 
list_from_bbox_iter == bbox_lst +@pytest.mark.parametrize( + "bbox1, bbox2", + [ + pytest.param(BBox((0, 0, 1, 1), CRS(1234)), (0, 0, 1, 1), id="different_types"), + pytest.param(BBox((0, 0, 1, 1), CRS(1234)), BBox((0, 0, 1, 1), CRS(4321)), id="different_CRS"), + pytest.param(BBox((0, 0, 1, 1), CRS(1234)), BBox((0, 0.00000001, 1, 1), CRS(1234)), id="different_coords"), + ], +) +def test_bbox_eq_false(bbox1: BBox, bbox2: BBox) -> None: + assert bbox1 != bbox2 -def test_bbox_eq() -> None: - bbox1 = BBox([46.07, 13.23, 46.24, 13.57], CRS.WGS84) - bbox2 = BBox(((46.24, 13.57), (46.07, 13.23)), 4326) - bbox3 = BBox([46.07, 13.23, 46.24, 13.57], CRS.POP_WEB) - bbox4 = BBox([46.07, 13.23, 46.24, 13.58], CRS.WGS84) - assert bbox1 == bbox2 - assert bbox1 != bbox3 - assert bbox1 != bbox4 - assert bbox1 is not None +def test_bbox_transform() -> None: + original_bbox = BBox((46.07, 13.23, 46.24, 13.57), CRS.WGS84) + transformed_bbox = original_bbox.transform(CRS.POP_WEB) + assert transformed_bbox.crs == CRS.POP_WEB + assert list(transformed_bbox) == approx([5128488.941, 1486021.486, 5147413.254, 1524929.4087], rel=1e-10) -def test_transform() -> None: - bbox1 = BBox([46.07, 13.23, 46.24, 13.57], CRS.WGS84) - bbox2 = bbox1.transform(CRS.POP_WEB).transform(CRS.WGS84) + reconstructed_bbox = transformed_bbox.transform(CRS.WGS84) - for coord1, coord2 in zip(bbox1, bbox2): - assert coord1 == approx(coord2, abs=1e-8) - assert bbox1.crs == bbox2.crs + assert list(original_bbox) == approx(list(reconstructed_bbox), rel=1e-10) + assert original_bbox.crs == reconstructed_bbox.crs -def test_transform_bounds() -> None: - bbox1 = BBox([46.07, 13.23, 46.24, 13.57], CRS.WGS84) - utm_crs = get_utm_crs(*bbox1.middle, source_crs=CRS.WGS84) - bbox2 = bbox1.transform_bounds(utm_crs).transform_bounds(CRS.WGS84) +def test_bbox_transform_bounds() -> None: + original_bbox = BBox((46.07, 13.23, 46.24, 13.57), CRS.WGS84) + utm_crs = get_utm_crs(*original_bbox.middle, source_crs=CRS.WGS84) + reconstructed_bbox = original_bbox.transform_bounds(utm_crs).transform_bounds(CRS.WGS84) - assert bbox2.geometry.contains(bbox1.geometry) - assert bbox2.geometry.difference(bbox1.geometry).area > 1e-4 + assert reconstructed_bbox.geometry.contains(original_bbox.geometry) + area_diff = reconstructed_bbox.geometry.difference(original_bbox.geometry).area + expected_diff = reconstructed_bbox.geometry.area / 20 # the area difference for this case is about 2.5% + assert area_diff < expected_diff -def test_geometry() -> None: - bbox = BBox([46.07, 13.23, 46.24, 13.57], CRS.WGS84) - assert isinstance(bbox.get_geojson(), dict) + +def test_bbox_geometry_attribute() -> None: + bbox = BBox((0, 0, 1, 1), CRS.WGS84) assert isinstance(bbox.geometry, shapely.geometry.Polygon) + assert bbox.geometry.equals(shapely.geometry.Polygon([[0, 0], [0, 1], [1, 1], [1, 0], [0, 0]])) -def test_buffer() -> None: - bbox = BBox([46.07, 13.23, 46.24, 13.57], CRS.WGS84) +@pytest.mark.parametrize( + "bbox, rel_buffered, abs_buffered", + [ + [BBox((10, 10, 20, 20), CRS.WGS84), (5, 5, 25, 25), (9.8, 9.8, 20.2, 20.2)], + [BBox((46.05, 13.21, 47.40, 13.41), CRS.POP_WEB), (45.375, 13.11, 48.075, 13.51), (45.85, 13.01, 47.6, 13.61)], + ], +) +def test_bbox_buffer(bbox, rel_buffered, abs_buffered) -> None: + for relative in (True, False): + assert bbox.buffer(3.7, relative=relative).crs == bbox.crs - assert bbox != bbox.buffer(42) - assert bbox == bbox.buffer(0) - assert bbox == bbox.buffer(1).buffer(-0.5, relative=True) - assert bbox == bbox.buffer((10, -0.1)).buffer((-10 / 11, 1 / 
9)) + assert bbox.buffer(0) is not bbox and bbox.buffer(0) == bbox - assert bbox != bbox.buffer(42, relative=False) - assert bbox == bbox.buffer(0, relative=False) - assert bbox == bbox.buffer(3, relative=False).buffer(-3, relative=False) + assert tuple(bbox.buffer(1)) == approx(rel_buffered) + assert tuple(bbox.buffer(0.2, relative=False)) == approx(abs_buffered) + + assert bbox == bbox.buffer((10, -0.1)).buffer((-10 / 11, 1 / 9)) assert bbox == bbox.buffer((-0.01, 0.2), relative=False).buffer((0.01, -0.2), relative=False) + +@pytest.mark.parametrize("buffer, relative", [(-1, True), ((1, -0.5), False)]) +def test_bbox_buffer_fault_input(buffer, relative) -> None: + bbox = BBox((46.05, 13.21, 47.40, 13.41), CRS.POP_WEB) with pytest.raises(ValueError): - bbox.buffer(-1) - with pytest.raises(ValueError): - bbox.buffer((1, -0.5), relative=False) + bbox.buffer(buffer, relative=relative) @pytest.mark.parametrize("geometry", GEOMETRY_LIST) -def test_repr(geometry: GeoType) -> None: +def test_geometry_repr(geometry: GeoType) -> None: assert isinstance(repr(geometry), str) @pytest.mark.parametrize("geometry", GEOMETRY_LIST) -def test_eq(geometry: GeoType) -> None: +def test_geometry_eq(geometry: GeoType) -> None: assert geometry == copy.deepcopy(geometry), "Deep copied object should be equal to the original" - assert geometry is not None + assert geometry != geometry.geometry @pytest.mark.parametrize("geometry", GEOMETRY_LIST) -def test_reverse(geometry: GeoType) -> None: +def test_geometry_reverse(geometry: GeoType) -> None: reversed_geometry = geometry.reverse() assert geometry != reversed_geometry assert geometry == reversed_geometry.reverse(), "Twice reversed geometry should equal the original" @pytest.mark.parametrize("geometry", GEOMETRY_LIST) -def test_transform_geometry(geometry: GeoType) -> None: - new_geometry = geometry.transform(CRS.POP_WEB) +@pytest.mark.parametrize("new_crs", [CRS.POP_WEB, CRS(32737)]) +def test_transform_geometry(new_crs: CRS, geometry: GeoType) -> None: + new_geometry = geometry.transform(new_crs) + assert new_geometry.crs == new_crs assert geometry != new_geometry, "Transformed geometry should be different" - original_geometry = geometry.transform(geometry.crs) - assert geometry.crs == original_geometry.crs, "CRS of twice transformed geometry should preserve" - assert geometry.geometry.area == approx(original_geometry.geometry.area, abs=1e-10), "Geometry area should be equal" - + reconstructed_geometry = new_geometry.transform(geometry.crs) + assert geometry.crs == reconstructed_geometry.crs + assert geometry.geometry.equals_exact(reconstructed_geometry.geometry, tolerance=1e-6) -@pytest.mark.parametrize("geometry", [GEOMETRY1, GEOMETRY2]) -def test_geojson(geometry: Geometry) -> None: - assert geometry == Geometry( - geometry.geojson, geometry.crs - ), "Transforming geometry to geojson and back should preserve it" - assert geometry == Geometry.from_geojson(geometry.geojson) - assert geometry == Geometry.from_geojson(geometry.get_geojson()) - -def test_geojson_parameter_with_crs() -> None: +def test_geometry_geojson_parameter_with_crs() -> None: expected_without_crs = { "type": "Polygon", "coordinates": (((0.0, 0.0), (0.0, 1.0), (1.0, 1.0), (1.0, 0.0), (0.0, 0.0)),), @@ -225,22 +196,27 @@ def test_geojson_parameter_with_crs() -> None: "crs": {"type": "name", "properties": {"name": "urn:ogc:def:crs:EPSG::32633"}}, **expected_without_crs, } + assert GEOMETRY1.geojson == GEOMETRY1.get_geojson() assert GEOMETRY1.get_geojson(with_crs=False) == expected_without_crs 
assert GEOMETRY1.get_geojson(with_crs=True) == expected_with_crs -def test_wkt() -> None: +@pytest.mark.parametrize("geometry", [GEOMETRY1, GEOMETRY2]) +def test_geometry_geojson_reconstructible(geometry: Geometry) -> None: + assert geometry == Geometry(geometry.geojson, geometry.crs) + assert geometry == Geometry.from_geojson(geometry.geojson) + + +def test_geometry_wkt() -> None: for geometry in [GEOMETRY1, GEOMETRY2]: - assert geometry == Geometry( - geometry.wkt, geometry.crs - ), "Transforming geometry to wkt and back should preserve it" + assert geometry == Geometry(geometry.wkt, geometry.crs) assert GEOMETRY2.wkt == WKT_STRING, "New WKT string does not match the original" -@pytest.mark.parametrize("geometry", [GEOMETRY1, GEOMETRY2, BBOX_COLLECTION]) -def test_bbox(geometry: Union[Geometry, BBoxCollection]) -> None: - assert geometry.bbox == BBox(geometry.geometry, geometry.crs), "Failed bbox property" +@pytest.mark.parametrize("geometry", [GEOMETRY1, GEOMETRY2]) +def test_bbox_of_geometry(geometry: Geometry) -> None: + assert geometry.bbox == BBox(geometry.geometry.bounds, geometry.crs) @pytest.mark.parametrize( @@ -251,13 +227,12 @@ def test_bbox(geometry: Union[Geometry, BBoxCollection]) -> None: Geometry("POLYGON ((0 0, 1.001 0.99, -0.1 0.45, 0 0))", crs=CRS.WGS84), Geometry("POLYGON ((0 0, 1.0 1.0, -0.1 0.5, 0 0))", crs=CRS.WGS84), ), - ( - BBoxCollection([BBox((1.11, 0, 0.999, 0.05), crs=CRS.WGS84) for _ in range(3)]), - BBoxCollection([BBox((1.1, 0, 1.0, 0.1), crs=CRS.WGS84) for _ in range(3)]), - ), ], ) -def test_apply_method(input_geometry: GeoType, expected_output_geometry: GeoType) -> None: +def test_geometry_apply_method(input_geometry: GeoType, expected_output_geometry: GeoType) -> None: + def _round_point_coords(x: float, y: float, decimals: int = 1) -> Tuple[float, float]: + return round(x, decimals), round(y, decimals) + rounded_geometry = input_geometry.apply(_round_point_coords) assert rounded_geometry is not input_geometry diff --git a/tests/test_io_utils.py b/tests/test_io_utils.py index 40911c12..9361cd4b 100644 --- a/tests/test_io_utils.py +++ b/tests/test_io_utils.py @@ -9,7 +9,8 @@ from pytest_lazyfixture import lazy_fixture from sentinelhub import read_data, write_data -from sentinelhub.exceptions import SHUserWarning + +BASIC_IMAGE = np.arange((5 * 6 * 3), dtype=np.uint8).reshape((5, 6, 3)) @pytest.mark.parametrize( @@ -31,6 +32,13 @@ def test_img_read(input_folder: str, filename: str, mean: float, shape: Tuple[in assert img.flags["WRITEABLE"], "Obtained numpy array is not writeable" +def test_read_tar_with_folder(input_folder: str) -> None: + path = os.path.join(input_folder, "tar-folder.tar") + data = read_data(path) + + assert data == {"tar-folder/simple.json": {"message": "test"}} + + @pytest.fixture def xml_testcase(): xml_root = ET.Element("EOPatch") @@ -43,11 +51,11 @@ def xml_testcase(): @pytest.mark.parametrize( "filename, data", [ - ("img.tif", np.arange(5 * 5 * 3).reshape((5, 5, 3))), - ("img.png", np.arange((5 * 5 * 3), dtype=np.uint8).reshape((5, 5, 3))), - ("img-8bit.jp2", np.arange((5 * 5 * 3), dtype=np.uint8).reshape((5, 5, 3))), - ("img-15bit.jp2", np.arange((5 * 5 * 3), dtype=np.uint8).reshape((5, 5, 3))), - ("img-16bit.jp2", np.arange((5 * 5 * 3), dtype=np.uint8).reshape((5, 5, 3))), + ("img.tif", np.arange(5 * 5 * 3).reshape((5, 5, 3))), # not restricting dtype + ("img.png", BASIC_IMAGE), + ("img-8bit.jp2", BASIC_IMAGE), + ("img-15bit.jp2", BASIC_IMAGE), + ("img-16bit.jp2", BASIC_IMAGE), ("test-string.txt", "sentinelhub-py is often 
shortened to sh-py"), ("test-xml.xml", lazy_fixture("xml_testcase")), ], @@ -68,16 +76,8 @@ def test_write_read(filename: str, data: Union[str, np.ndarray, ET.ElementTree]) @pytest.mark.parametrize("filename", ["img.jpg"]) -def test_img_write_jpeg(input_folder: str, filename: str) -> None: - img = read_data(os.path.join(input_folder, filename)) +def test_img_write_jpg(filename: str) -> None: + # Cannot verify that data is written correctly because JPG is not a lossless format with TempFS() as filesystem: file_path = filesystem.getsyspath(filename) - with pytest.warns(SHUserWarning): - write_data(file_path, img) - - -def test_read_tar_with_folder(input_folder: str) -> None: - path = os.path.join(input_folder, "tar-folder.tar") - data = read_data(path) - - assert data == {"tar-folder/simple.json": {"message": "test"}} + write_data(file_path, BASIC_IMAGE)
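

# --- Illustrative sketch (not part of the diff) ----------------------------------------------
# The write/read round trip exercised by test_write_read above boils down to the following,
# using only read_data and write_data as they appear in this diff. The temporary-directory
# handling uses the standard library instead of the TempFS helper from the tests, and the
# file name is illustrative.

import os
import tempfile

import numpy as np

from sentinelhub import read_data, write_data

image = np.arange(5 * 6 * 3, dtype=np.uint8).reshape((5, 6, 3))

with tempfile.TemporaryDirectory() as folder:
    path = os.path.join(folder, "image.png")
    write_data(path, image)  # the output format is inferred from the ".png" extension
    restored = read_data(path)  # image formats come back as a writeable numpy array

    assert np.array_equal(image, restored)  # PNG round-trips losslessly, unlike the JPG case
# ----------------------------------------------------------------------------------------------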
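
# --- Illustrative sketch (not part of the diff) ----------------------------------------------
# Stepping back to tests/test_geometry.py earlier in this diff: the sketch below summarises the
# BBox and Geometry behaviour those tests pin down. It uses only constructors and methods shown
# above; the concrete coordinates are illustrative.

import shapely.geometry

from sentinelhub import CRS, BBox, Geometry

bbox = BBox((46.07, 13.23, 46.24, 13.57), crs=CRS.WGS84)
assert bbox.lower_left == (46.07, 13.23) and bbox.upper_right == (46.24, 13.57)
assert list(bbox) == [46.07, 13.23, 46.24, 13.57]  # a BBox iterates over its four coordinates

# transform returns a new BBox in the target CRS and leaves the original untouched.
web_mercator_bbox = bbox.transform(CRS.POP_WEB)
assert web_mercator_bbox.crs == CRS.POP_WEB and bbox.crs == CRS.WGS84

# buffer is relative by default: per the (10, 10, 20, 20) -> (5, 5, 25, 25) case above,
# buffer(1) doubles the width and height around the same centre, while buffer(0) is a copy.
assert bbox.buffer(0) == bbox and bbox.buffer(0) is not bbox

# A Geometry pairs a shapely geometry with a CRS and exposes its bounding box as a BBox.
polygon = shapely.geometry.Polygon([(0, 0), (0, 1), (1, 1), (1, 0), (0, 0)])
geometry = Geometry(polygon, CRS(32633))
assert geometry.bbox == BBox(polygon.bounds, CRS(32633))
# ----------------------------------------------------------------------------------------------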