
Commit

Add type annotations
William Heyman Krill committed Jun 16, 2023
1 parent 4f1a6c6 commit e8cd27f
Showing 1 changed file with 65 additions and 57 deletions.
122 changes: 65 additions & 57 deletions watobs/datafarm.py
@@ -1,17 +1,20 @@
from __future__ import annotations

import base64
import datetime
import json
from functools import cached_property, wraps
import logging
import os
from typing import Dict, List, Optional, Union
import numpy as np
from functools import cached_property, wraps


import numpy as np
import pandas as pd
import pandas.io.json as pj
import requests
import pandera as pa
import requests

DateTime = str | datetime.datetime


def to_pandas_df(json_input: str) -> pd.DataFrame:
@@ -106,7 +109,7 @@ class DatafarmRepository:
}
)

def __init__(self, api_key):
def __init__(self, api_key: str):
self.api_key = api_key
self.session = requests.Session()
self.access_token = None
@@ -125,23 +128,23 @@ def list_time_series(self) -> pd.DataFrame:
@ensure_auth
def get_data(
self,
time_series_id,
start=datetime.datetime(1900, 1, 1, 0, 0, 0, 0),
end=datetime.datetime.now(),
fields=None,
qualities=None,
limit=0,
ascending=True,
):
time_series_id: str | list[str],
start: DateTime = datetime.datetime(1900, 1, 1, 0, 0, 0, 0),
end: DateTime = datetime.datetime.now(),
fields: list[str] | None = None,
qualities: list[str] | None = None,
limit: int = 0,
ascending: bool = True,
) -> pd.DataFrame:
"""Get data from Datafarm.
Parameters
==========
time_series_id : str or list of str
The time series to get data from.
start : str | datetime.datetime
start : DateTime, optional
The start of the range to get data from.
end : str | datetime.datetime, optional
end : DateTime, optional
The end of the range to get data from.
fields : list of str, optional
fields/columns to return
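
For orientation, a minimal usage sketch of the annotated get_data signature; the import path follows the file name in this diff, and the time series ID and dates are made-up placeholders:

import datetime

from watobs.datafarm import DatafarmRepository

repo = DatafarmRepository(api_key="...")  # placeholder API key
repo.connect()

# "MyStation/Waves/Hm0" is a hypothetical time series ID.
df = repo.get_data(
    time_series_id="MyStation/Waves/Hm0",
    start="2023-01-01",                  # DateTime accepts str or datetime.datetime
    end=datetime.datetime(2023, 2, 1),
    ascending=True,
)

repo.close()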
@@ -183,7 +186,7 @@ def get_data(
@ensure_auth
def insert_data(
self, time_series_id: str, data: pd.DataFrame, bulk_insert: bool = False
):
) -> requests.Response:
"""Insert data into a time series."""
body = self._get_insert_data_body(time_series_id, data, bulk_insert)
endpoint = "/TimeSeries/InsertData"
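
Continuing the sketch above, a hedged example of calling insert_data; the DataFrame column names here are assumptions, since the expected schema is not visible in this diff:

import pandas as pd

data = pd.DataFrame(
    {
        "Timestamp": ["2023-06-01T00:00:00Z"],  # assumed column name
        "Value": [1.23],                        # assumed column name
        "QualityLevel": ["ok"],                 # assumed column name and value
    }
)
response = repo.insert_data("MyStation/Waves/Hm0", data, bulk_insert=False)
response.raise_for_status()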
@@ -194,7 +197,7 @@ def insert_data(

def _get_insert_data_body(
self, time_series_id: str, data: pd.DataFrame, bulk_insert: bool = False
):
) -> dict:
"""Insert data into a time series.
Parameters
@@ -218,7 +221,7 @@ def _get_insert_data_body(
}
return body

def _prepare_insert_data(self, data: pd.DataFrame):
def _prepare_insert_data(self, data: pd.DataFrame) -> pd.DataFrame:
"""Prepare data for insertion. This includes converting the timestamp to ISO8601 format,
converting the quality level to an integer, and checking that the data is valid.
"""
@@ -298,18 +301,24 @@ def _prepare_insert_data(self, data: pd.DataFrame):
return insert_data

@ensure_auth
def delete_data(self, time_series_id, timestamps=None, start=None, end=None):
def delete_data(
self,
time_series_id: str,
timestamps: list[DateTime] | None = None,
start: DateTime | None = None,
end: DateTime | None = None,
) -> requests.Response:
"""Delete data from a time series. Either timestamps or start and end must be provided.
Parameters
----------
time_series_id : str
The time series to delete data from.
timestamps : list of str | datetime.datetime objects, optional
timestamps : list of DateTime objects, optional
The timestamps to delete.
start : str | datetime.datetime, optional
start : DateTime, optional
The start of the range to delete data from.
end : str | datetime.datetime, optional
end : DateTime, optional
The end of the range to delete data from.
Note that this is NOT inclusive.
"""
@@ -322,14 +331,16 @@ def delete_data(self, time_series_id, timestamps=None, start=None, end=None):

return self._delete_data_range(time_series_id, start, end)

def _delete_data_timestamps(self, time_series_id, timestamps):
def _delete_data_timestamps(
self, time_series_id: str, timestamps: list[DateTime]
) -> requests.Response:
"""Delete data from a time series by timestamps.
Parameters
----------
time_series_id : str
The time series to delete data from.
timestamps : list of str | datetime.datetime objects
timestamps : list of DateTime objects
The timestamps to delete.
"""
endpoint = "/TimeSeries/DeleteData"
@@ -342,16 +353,18 @@ def _delete_data_timestamps(self, time_series_id, timestamps):
response.raise_for_status()
return response

def _delete_data_range(self, time_series_id, start, end):
def _delete_data_range(
self, time_series_id: str, start: DateTime, end: DateTime
) -> requests.Response:
"""Delete data from a time series by range [start, end).
Parameters
----------
time_series_id : str
The time series to delete data from.
start : str | datetime.datetime
start : DateTime
The start of the range to delete data from.
end : str | datetime.datetime
end : DateTime
The end of the range to delete data from.
Note that this is NOT inclusive.
"""
@@ -366,7 +379,12 @@ def _delete_data_range(self, time_series_id, start, end):
response.raise_for_status()
return response

def update_data_quality(self, time_series_id: str, timestamps, quality):
def update_data_quality(
self,
time_series_id: str,
timestamps: list[DateTime],
quality: int | str | list[int] | list[str],
) -> requests.Response:
"""Update the quality of data in a time series.
Parameters
@@ -407,7 +425,7 @@ def update_data_quality(self, time_series_id: str, timestamps, quality):
return response
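
A hedged sketch of the newly annotated update_data_quality; per the annotation, quality may be an int level, a quality name, or a list of either ("ok" below is a hypothetical name):

repo.update_data_quality(
    "MyStation/Waves/Hm0",
    timestamps=["2023-06-01T00:00:00Z"],
    quality="ok",   # or an int level, or one value per timestamp
)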

@ensure_auth
def get_statistics(self, time_series_id: Union[str, List[str]]) -> pd.DataFrame:
def get_statistics(self, time_series_id: str | list[str]) -> pd.DataFrame:
"""Get statistics for a time series or a list of time series.
Parameters
@@ -426,84 +444,74 @@ def get_statistics(self, time_series_id: Union[str, List[str]]) -> pd.DataFrame:

return to_pandas_df(json.dumps(data))
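
Usage sketch for get_statistics, which accepts a single ID or a list (the IDs are placeholders):

stats = repo.get_statistics("MyStation/Waves/Hm0")
stats = repo.get_statistics(["MyStation/Waves/Hm0", "MyStation/Waves/Tp"])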

def _get_file_base64(self, file_paths):
file_exists = [os.path.exists(x) for x in file_paths]
if not all(file_exists):
raise ValueError(
f"File does not exist: {file_paths[~np.array(file_exists)]}"
)
file_base64 = [base64.b64encode(open(x, "rb").read()) for x in file_paths]
file_names = [os.path.basename(x) for x in file_paths]
return file_base64, file_names

@cached_property
@ensure_auth
def time_series_metadata(self):
def time_series_metadata(self) -> pd.DataFrame:
endpoint = "/MetaData/Entity"
params = {"aClassId": "Timeseries"}
return self._get_pandas_df(endpoint, params)

@cached_property
@ensure_auth
def time_series_source_descriptions(self):
def time_series_source_descriptions(self) -> pd.DataFrame:
endpoint = "/List/TimeSeriesSourceDescriptions"
return self._get_pandas_df(endpoint)

@cached_property
@ensure_auth
def units(self):
def units(self) -> pd.DataFrame:
endpoint = "/List/Units"
return self._get_pandas_df(endpoint)

@cached_property
@ensure_auth
def time_series_types(self):
def time_series_types(self) -> pd.DataFrame:
endpoint = "/List/TimeSeriesTypes"
return self._get_pandas_df(endpoint)

@cached_property
@ensure_auth
def time_series_status(self):
def time_series_status(self) -> pd.DataFrame:
endpoint = "/List/TimeSeriesStatus"
return self._get_pandas_df(endpoint)

@cached_property
@ensure_auth
def qualities(self):
def qualities(self) -> pd.DataFrame:
endpoint = "/List/Qualities"
return self._get_pandas_df(endpoint)

@cached_property
@ensure_auth
def parameters(self):
def parameters(self) -> pd.DataFrame:
endpoint = "/List/Parameters"
return self._get_pandas_df(endpoint)

@cached_property
@ensure_auth
def medias(self):
def medias(self) -> pd.DataFrame:
endpoint = "/List/Medias"
return self._get_pandas_df(endpoint)

@cached_property
@ensure_auth
def locations(self):
def locations(self) -> pd.DataFrame:
endpoint = "/List/Locations"
return self._get_pandas_df(endpoint)
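
Each of these cached properties now advertises a pd.DataFrame return; a brief sketch of how they read once connected (column layouts are not shown in this diff):

repo.units        # DataFrame from /List/Units
repo.parameters   # DataFrame from /List/Parameters
repo.locations    # DataFrame from /List/Locations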

@cached_property
@ensure_auth
def quality_level_to_name(self):
def quality_level_to_name(self) -> dict[int, str]:
df = self.qualities
return {df["Level"].iloc[i]: df["IDName"].iloc[i] for i in range(len(df))}

@cached_property
@ensure_auth
def quality_name_to_level(self):
def quality_name_to_level(self) -> dict[str, int]:
df = self.qualities
return {df["IDName"].iloc[i]: df["Level"].iloc[i] for i in range(len(df))}

def connect(self):
def connect(self) -> None:
"""Connect to the Datafarm API."""
url = self.API_URL + "/Login/Login"
data = {"Token": self.api_key}
Expand All @@ -518,7 +526,7 @@ def connect(self):
self.headers = {"Access-Token": self.access_token}
self._connected = True

def close(self):
def close(self) -> None:
"""Close the connection to the Datafarm API."""
url = self.API_URL + "/Login/Logoff"
response = self.session.post(url, headers=self.headers)
Expand All @@ -527,23 +535,23 @@ def close(self):
self.headers = None
self._connected = False

def _get_pandas_df(self, endpoint, params=None):
def _get_pandas_df(self, endpoint: str, params: dict | None = None) -> pd.DataFrame:
url = self.API_URL + endpoint
r = requests.get(url, headers=self.headers, params=params)
r.raise_for_status()
data = r.json()
return to_pandas_df(json.dumps(data))

@staticmethod
def _format_float(x: Optional[float]) -> Dict[str, Union[int, float]]:
def _format_float(x: float | None) -> dict[str, float]:
"""Format a float for JSON serialization."""
if x is None or np.isnan(x):
return {"N": 1, "V": 0.0}
return {"N": 0, "V": x}
return {"N": 0, "V": float(x)}

def __enter__(self):
def __enter__(self) -> DatafarmRepository:
self.connect()
return self

def __exit__(self, exc_type, exc_value, traceback):
def __exit__(self, exc_type, exc_value, traceback) -> None:
self.close()
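
With __enter__ and __exit__ annotated, the repository still works as a context manager, so connect/close are handled automatically; a minimal sketch with a placeholder API key:

with DatafarmRepository(api_key="...") as repo:
    ts = repo.list_time_series()   # DataFrame of available time series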
