From f9f05634562595579b1f1c9b67ef84d11ea55e0c Mon Sep 17 00:00:00 2001 From: Raj Sinha Date: Thu, 20 Jun 2024 07:38:43 -0700 Subject: [PATCH] Add preprocessing Colab to Github, with appropriate License header. PiperOrigin-RevId: 645026401 --- .../colabs/spade_preprocessing.ipynb | 1079 +++++++++++++++++ 1 file changed, 1079 insertions(+) create mode 100644 spade_anomaly_detection/colabs/spade_preprocessing.ipynb diff --git a/spade_anomaly_detection/colabs/spade_preprocessing.ipynb b/spade_anomaly_detection/colabs/spade_preprocessing.ipynb new file mode 100644 index 0000000..9d923b0 --- /dev/null +++ b/spade_anomaly_detection/colabs/spade_preprocessing.ipynb @@ -0,0 +1,1079 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "uRwZNJphFFcA", + "metadata": { + "id": "uRwZNJphFFcA" + }, + "source": [ + "# Copyright 2023-2024 Google LLC\n", + "\n", + "Licensed under the Apache License, Version 2.0 (the \"License\");\n", + "you may not use this file except in compliance with the License.\n", + "You may obtain a copy of the License at\n", + "\n", + " https://www.apache.org/licenses/LICENSE-2.0\n", + "\n", + "Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", + "See the License for the specific language governing permissions and limitations under the License." + ] + }, + { + "cell_type": "markdown", + "id": "6b18ee64-adc0-4376-89d1-23ec89e34cb7", + "metadata": { + "id": "6b18ee64-adc0-4376-89d1-23ec89e34cb7" + }, + "source": [ + "# Preprocessing for SPADE Anomaly Detection\n", + "\n", + "### This is an example of preprocessing an input dataset so that it is suitable for SPADE. Different use cases may require different types of preprocessing.\n", + "\n", + "\n", + "### Some use cases require a Colab runtime with at least 64GB of RAM and up to 256GB of RAM (for some preprocessing configurations). GPUs are not required." + ] + }, + { + "cell_type": "markdown", + "id": "f6a8b64f-6e8e-4f20-a2f5-7fcfe956692b", + "metadata": { + "id": "f6a8b64f-6e8e-4f20-a2f5-7fcfe956692b" + }, + "source": [ + "This Notebook will read your BigQuery table and preprocess the features for SPADE training. In summary:\n", + "\n", + "1. It will verify that the label column exists and is of type boolean, integer or float. It will verify its cardinality to be 2 (Positive+Negative samples) or 3 (Positive+Negative+Unlabeled samples).\n", + "2. All NaNs (Pandas and Numpy) will be replaced with an appropriate fill value for that type of column.\n", + "3. All datetime columns will be converted to string columns.\n", + "4. All string columns will be converted to categorical columns. *Optionally*, their maximum cardinality will be reduced to 5 categories (including an 'infrequent' category). All values with frequency less than 5% will be assigned to the 'infrequent' category. Then they will be [one-hot encoded](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OneHotEncoder.html).\n", + "5. Then all columns except the label column (integer, float, one-hot encoded) will be normalized in the range [0, 1] with [min-max scaling](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html).\n", + "6. The resulting table will be uploaded to the same GCP Project and BigQuery Dataset as the input table.\n", + "7. 
The resulting table is then randomly split into a `train` and `test` table specified by the fractional size of the `test` table.\n",
+ "8. A specified fraction of the labels in the `train` table are discarded. This is for unsupervised training in Stage 1 of SPADE.\n",
+ "9. Both the `train` and `test` tables are uploaded to the same GCP Project and BigQuery Dataset as the input table."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "83521ede-3890-449c-a696-08b9ac7c9600",
+ "metadata": {
+ "id": "83521ede-3890-449c-a696-08b9ac7c9600",
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "# @title Install packages for preprocessing - it is safe to ignore any `subprocess` errors in this cell\n",
+ "\n",
+ "!pip install -qq -U pandas-gbq pyarrow google-cloud-bigquery-storage\n",
+ "!pip install -qq -U dask dask-bigquery\n",
+ "\n",
+ "# Upgrade scikit-learn to the latest version - needed for preprocessing.\n",
+ "!pip install -qq -U scikit-learn"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "Bj5O0S5RTxzY",
+ "metadata": {
+ "id": "Bj5O0S5RTxzY"
+ },
+ "source": [
+ "### Restart the kernel\n",
+ "Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.\n",
+ "\n",
+ "\n",
+ "**Note: Once this cell has finished running, continue on. You do not need to re-run any of the cells above.**\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "023DMKUaTypt",
+ "metadata": {
+ "id": "023DMKUaTypt"
+ },
+ "outputs": [],
+ "source": [
+ "import os\n",
+ "\n",
+ "if not os.getenv(\"IS_TESTING\"):\n",
+ " # Automatically restart kernel after installs\n",
+ " import IPython\n",
+ "\n",
+ " app = IPython.Application.instance()\n",
+ " app.kernel.do_shutdown(True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "95cb7ffd6895",
+ "metadata": {
+ "id": "95cb7ffd6895"
+ },
+ "source": [
+ "### Set your project ID\n",
+ "\n",
+ "Set your project ID below. If you don't know your project ID, leave the field blank and the following cells may be able to find it. Optionally, you may also set a service account in the cell below.",
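+ "\n",
+ "For example: `PROJECT_ID = \"my-sample-project\"` (a hypothetical project ID)."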
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "cd85f5c794e5",
+ "metadata": {
+ "id": "cd85f5c794e5"
+ },
+ "outputs": [],
+ "source": [
+ "PROJECT_ID = \"[your-project-id]\" # @param {type:\"string\", default:\"[your-project-id]\"}\n",
+ "\n",
+ "if PROJECT_ID == \"\" or PROJECT_ID is None or PROJECT_ID == \"[your-project-id]\":\n",
+ " # Get your GCP project id from gcloud\n",
+ " shell_output = !gcloud config list --format 'value(core.project)' 2\u003e/dev/null\n",
+ " PROJECT_ID = shell_output[0]\n",
+ "print(\"Project ID:\", PROJECT_ID)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "SxBpIJ4n1A5B",
+ "metadata": {
+ "id": "SxBpIJ4n1A5B"
+ },
+ "outputs": [],
+ "source": [
+ "!gcloud auth application-default login"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "FKE3bpwGHM47",
+ "metadata": {
+ "id": "FKE3bpwGHM47"
+ },
+ "outputs": [],
+ "source": [
+ "from google.colab import auth as google_auth\n",
+ "google_auth.authenticate_user()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "903c5382-06aa-446b-9551-b86a1acc858b",
+ "metadata": {
+ "id": "903c5382-06aa-446b-9551-b86a1acc858b",
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "# @title Imports\n",
+ "\n",
+ "from datetime import datetime\n",
+ "from typing import Any, Dict, List, Optional, Tuple, Union\n",
+ "\n",
+ "import numpy as np\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "3439d22a-9d07-406f-9a6f-501c0645774f",
+ "metadata": {
+ "id": "3439d22a-9d07-406f-9a6f-501c0645774f",
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "\n",
+ "import pandas as pd\n",
+ "import pandas_gbq\n",
+ "\n",
+ "import dask\n",
+ "import dask_bigquery\n",
+ "\n",
+ "from sklearn import model_selection, preprocessing\n",
+ "from google.cloud import bigquery\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "687bdfff-255d-4e3e-ac96-90b2257c7442",
+ "metadata": {
+ "id": "687bdfff-255d-4e3e-ac96-90b2257c7442",
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "!gcloud auth application-default set-quota-project $PROJECT_ID\n",
+ "!gcloud config set project $PROJECT_ID"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "21357542-29ec-4e62-944e-b1eb17add6d2",
+ "metadata": {
+ "id": "21357542-29ec-4e62-944e-b1eb17add6d2",
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "# @title The BigQuery location of your source and preprocessed data tables.\n",
+ "\n",
+ "dataset_id = '[your-dataset]' # @param {type:\"string\"}\n",
+ "source_table_id = '[your-source-table]' # @param {type:\"string\"}\n",
+ "label_column = '[your-label-column]' # @param {type:\"string\"}\n",
+ "preprocessed_table_id = '[your-preprocessed-table]' # @param {type:\"string\"}\n",
+ "\n",
+ "location = 'us-central1' # @param {type:\"string\"}\n",
+ "\n",
+ "client = bigquery.Client(project=PROJECT_ID, location=location)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "6c3e7145-4e8f-4c79-b92b-305cfe4ab5f1",
+ "metadata": {
+ "cellView": "form",
+ "id": "6c3e7145-4e8f-4c79-b92b-305cfe4ab5f1",
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "# @title Preprocessing Functions - do not edit if not needed\n",
+ "\n",
+ "def get_number_of_rows(\n",
+ " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame]\n",
+ ") -\u003e int:\n",
+ " if isinstance(df, dask.dataframe.core.DataFrame):\n",
+ " return df.shape[0].compute()\n",
+ " else:\n",
+ " return len(df)\n",
+ "\n",
+ "def 
get_column_unique_counts(\n", + " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame]\n", + ") -\u003e Dict[str, int]:\n", + " # Get a column-to-number-of-unique-values dict, as it is expensive to compute.\n", + " # Reuse the dict as needed.\n", + " nuniques = df.nunique()\n", + " if isinstance(df, dask.dataframe.core.DataFrame):\n", + " nuniques = nuniques.compute()\n", + " return nuniques.to_dict()\n", + " # return {c: df[c].nunique() for c in df.columns}\n", + "\n", + "def get_columns_to_drop(\n", + " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n", + " columns_to_nuniques: Dict[str, int],\n", + " len_df: int,\n", + ") -\u003e list[str]:\n", + " df_columns = df.columns.to_list()\n", + " columns_with_all_nan = [c for c in df_columns if columns_to_nuniques[c] == 0]\n", + " columns_with_cardinality_1 = [c for c in df_columns if columns_to_nuniques[c] == 1]\n", + " columns_with_all_uniques = [c for c in df_columns if columns_to_nuniques[c] == len_df]\n", + " return columns_with_all_nan + columns_with_cardinality_1 + columns_with_all_uniques\n", + "\n", + "def set_datetime_on_timestamp_columns(\n", + " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n", + " timestamp_columns: List[str],\n", + ") -\u003e Union[pd.DataFrame, dask.dataframe.core.DataFrame]:\n", + " for c in timestamp_columns:\n", + " # 'utc' may need to be True for your use case.\n", + " df[c] = pd.to_datetime(df[c], utc=False)\n", + " return df\n", + "\n", + "def convert_datetime_columns_to_string(\n", + " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n", + ") -\u003e Union[pd.DataFrame, dask.dataframe.core.DataFrame]:\n", + " columns_datetime = df.select_dtypes(\n", + " include=['datetime', 'datetimetz', 'datetime64', np.datetime64]).columns\n", + " # Convert all datetime columns to strings.\n", + " for c in columns_datetime:\n", + " # df[c] = pd.to_datetime(df[c])\n", + " df[c] = df[c].dt.strftime(\"%Y-%m-%d %H:%M:%S\")\n", + " df[c] = df[c].astype(str)\n", + " return df\n", + "\n", + "def replace_na(\n", + " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n", + ") -\u003e Union[pd.DataFrame, dask.dataframe.core.DataFrame]:\n", + " \"\"\"Replace NaN values with an appropriate fill value determined by column type.\"\"\"\n", + " columns_datetime = df.select_dtypes(\n", + " include=['datetime', 'datetimetz', 'datetime64', np.datetime64,\n", + " ]\n", + " ).columns\n", + " columns_numeric = df.select_dtypes(include='number').columns\n", + " columns_string = df.select_dtypes(include='object').columns\n", + " dt_nan_replacement = str(pd.to_datetime(datetime.fromtimestamp(0.0)))\n", + " cols_dt_map = dict(\n", + " zip(columns_datetime, [dt_nan_replacement] * len(columns_datetime))\n", + " )\n", + " cols_num_map = dict(zip(columns_numeric, [0.0] * len(columns_numeric)))\n", + " cols_str_map = dict(zip(columns_string, [''] * len(columns_string)))\n", + " cols_nan_map = {**cols_num_map, **cols_str_map, **cols_dt_map}\n", + " return df.fillna(cols_nan_map)\n", + "\n", + "def get_numeric_columns(\n", + " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n", + ") -\u003e list[str]:\n", + " return df.select_dtypes(include=['number']).columns\n", + "\n", + "def get_string_columns(\n", + " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n", + ") -\u003e list[str]:\n", + " return df.select_dtypes(include=['object']).columns\n", + "\n", + "def select_categorical_columns(\n", + " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n", + " columns_to_nuniques: Dict[str, int],\n", + " 
cat_cutoff: int,\n",
+ ") -\u003e list[str]:\n",
+ " columns_string = get_string_columns(df)\n",
+ " return [c for c in columns_string if columns_to_nuniques[c] \u003c= cat_cutoff]\n",
+ "\n",
+ "def get_binary_columns(\n",
+ " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n",
+ " columns_to_nuniques: Dict[str, int],\n",
+ ") -\u003e list[str]:\n",
+ " return [col for col, c in columns_to_nuniques.items() if c == 2]\n",
+ "\n",
+ "def remap_to_binary_columns(\n",
+ " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n",
+ " binary_columns: list[str],\n",
+ ") -\u003e Union[pd.DataFrame, dask.dataframe.core.DataFrame]:\n",
+ " if binary_columns:\n",
+ " if isinstance(df, dask.dataframe.core.DataFrame):\n",
+ " # Find the pairs of unique values in the binary columns.\n",
+ " uniques = [df[c].unique().compute().to_list() for c in binary_columns]\n",
+ " # Need to call categorize() first in Dask.\n",
+ " df = df.categorize(columns=binary_columns)\n",
+ " # get_dummies replaces each binary column with one new\n",
+ " # '\u003ccolumn\u003e_\u003cvalue\u003e' column per unique value.\n",
+ " df = dask.dataframe.get_dummies(data=df, columns=binary_columns)\n",
+ " # At this point there are 2 new columns per binary column. We\n",
+ " # arbitrarily drop the column for the first value and keep the\n",
+ " # second, renaming it back to the original column name.\n",
+ " cols_to_drop = [f'{c}_{u[0]}' for c, u in zip(binary_columns, uniques)]\n",
+ " df = df.drop(cols_to_drop, axis=1)\n",
+ " df = df.rename(\n",
+ " columns={f'{c}_{u[1]}': c for c, u in zip(binary_columns, uniques)})\n",
+ " elif isinstance(df, pd.DataFrame):\n",
+ " uniques = [df[c].unique().to_list() for c in binary_columns]\n",
+ " for column, unique_values in zip(binary_columns, uniques):\n",
+ " df_bin = pd.get_dummies(df[column])\n",
+ " df = pd.concat((df, df_bin), axis=1)\n",
+ " # At this point there are 2 new columns per binary column. Each new\n",
+ " # column has a name equal to one of the 2 binary values from the\n",
+ " # original column. We arbitrarily drop the first value and keep the\n",
+ " # second.\n",
+ " cols_to_drop = [unique_values[0], column]\n",
+ " df = df.drop(cols_to_drop, axis=1)\n",
+ " df = df.rename(columns={unique_values[1]: column})\n",
+ " return df\n",
+ "\n",
+ "def drop_duplicate_columns(\n",
+ " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n",
+ ") -\u003e pd.DataFrame:\n",
+ " return df.iloc[:, ~df.columns.duplicated()]\n",
+ "\n",
+ "# One-Hot Encoding\n",
+ "def _one_hot_encode_categorical_features(\n",
+ " data: pd.DataFrame,\n",
+ " original_columns: list[str],\n",
+ " categorical_columns: list[str],\n",
+ " ohe: preprocessing.OneHotEncoder = None,\n",
+ " # How to handle unknown categories: one of ‘ignore’, ‘infrequent_if_exist’\n",
+ " handle_unknown: str = 'infrequent_if_exist',\n",
+ " # Minimum fraction of samples below which a category will be considered\n",
+ " # infrequent.\n",
+ " min_frequency: float = 0.05,\n",
+ " # Upper limit to the number of output features for each input feature,\n",
+ " # including an infrequent category.\n",
+ " max_categories: int = 5,\n",
+ ") -\u003e Tuple[pd.DataFrame, preprocessing.OneHotEncoder]:\n",
+ " \"\"\"One-hot encodes categorical feature columns in the input array.\"\"\"\n",
+ " # Setup the One-Hot Encoder. If it is already provided, that means that the\n",
+ " # encoder was previously fitted to training data. 
In that case we will\n",
+ " # just apply the transform using the fitted encoder.\n",
+ " if ohe is None:\n",
+ " # Need to fit a new One-Hot Encoder.\n",
+ " ohe = preprocessing.OneHotEncoder(\n",
+ " sparse_output=False,\n",
+ " drop='first',\n",
+ " handle_unknown=handle_unknown,\n",
+ " min_frequency=min_frequency,\n",
+ " max_categories=max_categories)\n",
+ "\n",
+ " # One-hot-encode the categorical columns using the newly fitted encoder.\n",
+ " data_onehot_encoded = ohe.fit_transform(data)\n",
+ " else:\n",
+ " # One-hot-encode the categorical columns using the previously fitted encoder.\n",
+ " data_onehot_encoded = ohe.transform(data)\n",
+ "\n",
+ " # Wrap the encoded array in a DataFrame, preserving the input index so\n",
+ " # that downstream concatenation aligns rows correctly.\n",
+ " data_encoded = pd.DataFrame(\n",
+ " data=data_onehot_encoded,\n",
+ " columns=ohe.get_feature_names_out(),\n",
+ " index=data.index)\n",
+ " return data_encoded, ohe\n",
+ "\n",
+ "def one_hot_encode_categorical_features(\n",
+ " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n",
+ " categorical_columns: list[str],\n",
+ " label_column: str,\n",
+ " ohe: preprocessing.OneHotEncoder = None,\n",
+ " # How to handle unknown categories: one of ‘ignore’, ‘infrequent_if_exist’\n",
+ " handle_unknown: str = 'infrequent_if_exist',\n",
+ " # Minimum fraction of samples below which a category will be considered\n",
+ " # infrequent.\n",
+ " min_frequency: float = 0.05,\n",
+ " # Upper limit to the number of output features for each input feature,\n",
+ " # including an infrequent category.\n",
+ " max_categories: int = 5,\n",
+ ") -\u003e Tuple[pd.DataFrame, preprocessing.OneHotEncoder]:\n",
+ " \"\"\"One-hot encodes categorical feature columns in the input DataFrame.\"\"\"\n",
+ " # We pass the categorical columns to the scikit-learn OneHotEncoder because\n",
+ " # the Dask DummyEncoder does not support setting the minimum frequency or\n",
+ " # the maximum categories.\n",
+ " df_columns = df.columns.to_list()\n",
+ " cols_excl_label = [c for c in df_columns if c != label_column]\n",
+ " cols_excl_cat_cols = [c for c in cols_excl_label if c not in categorical_columns]\n",
+ " # Extract the categorical array for this dataframe.\n",
+ " if isinstance(df, dask.dataframe.core.DataFrame):\n",
+ " df_cat = df.loc[:, categorical_columns].compute()\n",
+ " df_noncat = df.loc[:, cols_excl_cat_cols].compute()\n",
+ " df_label = df.loc[:, label_column].compute()\n",
+ " elif isinstance(df, pd.DataFrame):\n",
+ " df_cat = df.loc[:, categorical_columns]\n",
+ " df_noncat = df.loc[:, cols_excl_cat_cols]\n",
+ " df_label = df.loc[:, label_column]\n",
+ " # Encode the array\n",
+ " df_encoded, ohe = _one_hot_encode_categorical_features(\n",
+ " data=df_cat,\n",
+ " original_columns=df_columns,\n",
+ " categorical_columns=categorical_columns,\n",
+ " ohe=ohe,\n",
+ " handle_unknown=handle_unknown,\n",
+ " min_frequency=min_frequency,\n",
+ " max_categories=max_categories,\n",
+ " )\n",
+ " result = pd.concat([df_noncat, df_encoded, df_label], axis=1)\n",
+ " return result, ohe\n",
+ "\n",
+ "# Min-Max Scaling\n",
+ "def _min_max_scale_features(\n",
+ " data: pd.DataFrame,\n",
+ " mms: preprocessing.MinMaxScaler = None,\n",
+ " # Whether to clip transformed data to the same range as the fitted data\n",
+ " clip: bool = True,\n",
+ ") -\u003e Tuple[pd.DataFrame, preprocessing.MinMaxScaler]:\n",
+ " \"\"\"Min-max scales feature columns in the input array.\"\"\"\n",
+ " columns = data.columns.to_list()\n",
+ " # Setup the Min-Max Scaler. 
If it is already provided, that means that the\n",
+ " # scaler was previously fitted to training data. In that case we will\n",
+ " # just apply the transform using the fitted scaler.\n",
+ " if mms is None:\n",
+ " # Need to fit a new Min-Max Scaler.\n",
+ " mms = preprocessing.MinMaxScaler(clip=clip)\n",
+ "\n",
+ " # Min-max-scale the feature columns using the newly fitted scaler.\n",
+ " data_minmax_scaled = mms.fit_transform(data)\n",
+ " else:\n",
+ " # Min-max-scale the feature columns using the previously fitted scaler.\n",
+ " data_minmax_scaled = mms.transform(data)\n",
+ "\n",
+ " # Create dataframe, preserving the input index for correct alignment.\n",
+ " data_scaled = pd.DataFrame(\n",
+ " data=data_minmax_scaled, columns=columns, index=data.index)\n",
+ " return data_scaled, mms\n",
+ "\n",
+ "def min_max_scale_features(\n",
+ " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame],\n",
+ " label_column: str,\n",
+ " mms: preprocessing.MinMaxScaler = None,\n",
+ " # Whether to clip transformed data to the same range as the fitted data\n",
+ " clip: bool = True,\n",
+ ") -\u003e Tuple[pd.DataFrame, preprocessing.MinMaxScaler]:\n",
+ " \"\"\"Min-max scales feature columns in the input DataFrame.\"\"\"\n",
+ " df_columns = df.columns.to_list()\n",
+ " cols_excl_label = [c for c in df_columns if c != label_column]\n",
+ " # Extract the feature columns for this dataframe.\n",
+ " if isinstance(df, dask.dataframe.core.DataFrame):\n",
+ " df_features = df.loc[:, cols_excl_label].compute()\n",
+ " df_label = df.loc[:, label_column].compute()\n",
+ " elif isinstance(df, pd.DataFrame):\n",
+ " df_features = df.loc[:, cols_excl_label]\n",
+ " df_label = df.loc[:, label_column]\n",
+ " # Scale the features.\n",
+ " df_scaled, mms = _min_max_scale_features(\n",
+ " data=df_features,\n",
+ " mms=mms,\n",
+ " clip=clip,\n",
+ " )\n",
+ " df_label = df_label.astype(np.float64)\n",
+ " result = pd.concat([df_scaled, df_label], axis=1)\n",
+ " return result, mms\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "81f59f4d-1597-4962-be68-416554cbe260",
+ "metadata": {
+ "id": "81f59f4d-1597-4962-be68-416554cbe260",
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "# @title Load the data using Pandas or Dask - Dask is not as well tested\n",
+ "\n",
+ "%%time\n",
+ "\n",
+ "use_pandas = True # @param {type:\"boolean\"}\n",
+ "\n",
+ "if use_pandas:\n",
+ " df = pandas_gbq.read_gbq(\n",
+ " query_or_table=f'{dataset_id}.{source_table_id}',\n",
+ " project_id=PROJECT_ID,\n",
+ " use_bqstorage_api=True,\n",
+ " )\n",
+ "else:\n",
+ " df = dask_bigquery.read_gbq(\n",
+ " project_id=PROJECT_ID,\n",
+ " dataset_id=dataset_id,\n",
+ " table_id=source_table_id,\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "218e8170-3426-49b2-ae73-1b600a66e67a",
+ "metadata": {
+ "id": "218e8170-3426-49b2-ae73-1b600a66e67a",
+ "tags": []
+ },
+ "outputs": [],
+ "source": [
+ "df.info()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "ZjDeoXtvI7Ie",
+ "metadata": {
+ "id": "ZjDeoXtvI7Ie"
+ },
+ "outputs": [],
+ "source": [
+ "df.head(3)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "9appjtcnI9SB",
+ "metadata": {
+ "id": "9appjtcnI9SB"
+ },
+ "outputs": [],
+ "source": [
+ "# @title Get counts of unique values for each column\n",
+ "\n",
+ "# Warning: this cell can take a long time to run.\n",
+ "\n",
+ "%%time\n",
+ "\n",
+ "column_nuniques = get_column_unique_counts(df=df)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "JmqCUGt0NN0d",
+ "metadata": {
+ 
"id": "JmqCUGt0NN0d" + }, + "outputs": [], + "source": [ + "# @title Get length of dataframe\n", + "\n", + "# Warning: this cell can take a long time to run.\n", + "\n", + "%%time\n", + "\n", + "len_df = get_number_of_rows(df)\n", + "print(f'Number of rows: {len_df}')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "itmm39B60JyO", + "metadata": { + "id": "itmm39B60JyO" + }, + "outputs": [], + "source": [ + "# @title Verify the type and cardinality of the label column\n", + "\n", + "label_col_cardinality = column_nuniques.get(label_column, -1)\n", + "if label_col_cardinality == -1:\n", + " raise ValueError(\n", + " f'Label column \"{label_column}\" is not found in input table '\n", + " f'{PROJECT_ID}.{dataset_id}.{source_table_id}.'\n", + " )\n", + "if label_col_cardinality \u003c 2 or label_col_cardinality \u003e 3:\n", + " raise ValueError(\n", + " f'Label column \"{label_column}\" must have either 2 (Positive-Negative) '\n", + " 'or 3 (Positive-Negative-Unlabeled) unique values.'\n", + " )\n", + "\n", + "label_col_type = df[label_column].dtype\n", + "if label_col_type not in [pd.BooleanDtype(), np.dtypes.Float64DType(), pd.Int64Dtype()]:\n", + " raise TypeError(\n", + " f'The type {label_col_type} of label column \"{label_column}\" is not in '\n", + " 'the allowed types [int, float].'\n", + " )\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "rrowAkX3Ifao", + "metadata": { + "id": "rrowAkX3Ifao" + }, + "outputs": [], + "source": [ + "# @title Drop the columns that we want to exclude\n", + "\n", + "%%time\n", + "\n", + "# Columns with all NaNs, with cardinality=1 (only 1 unique value) or\n", + "# cardinality=number of rows (all unique values) should be dropped.\n", + "\n", + "columns_to_drop = get_columns_to_drop(df, column_nuniques, len_df)\n", + "df = df.drop(columns_to_drop, axis='columns')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "IVA0tyl4Y2Sb", + "metadata": { + "id": "IVA0tyl4Y2Sb" + }, + "outputs": [], + "source": [ + "# @title Set datetime on timestamp columns\n", + "\n", + "%%time\n", + "\n", + "df = set_datetime_on_timestamp_columns(\n", + " df=df, timestamp_columns=[c for c in df.columns if 'timestamp' in c]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "NZ5_epDwWJoG", + "metadata": { + "id": "NZ5_epDwWJoG" + }, + "outputs": [], + "source": [ + "# @title Replace all NaNs with zero.\n", + "\n", + "%%time\n", + "\n", + "df = replace_na(df=df)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2zWL2S6FfxEJ", + "metadata": { + "id": "2zWL2S6FfxEJ" + }, + "outputs": [], + "source": [ + "# @title Convert datetime columns to string columns.\n", + "\n", + "# Warning: this cell can take a long time to run.\n", + "\n", + "%%time\n", + "\n", + "df = convert_datetime_columns_to_string(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8lUWZ_76SfNJ", + "metadata": { + "id": "8lUWZ_76SfNJ" + }, + "outputs": [], + "source": [ + "# @title Handle categorical columns\n", + "\n", + "# Warning: this cell can take a long time to run.\n", + "\n", + "%%time\n", + "\n", + "# @markdown ### Set minimum frequency and maximum categories here.\n", + "min_frequency = 0.01\n", + "# Set maximum categories to None if no upper limit to the number of categories.\n", + "max_categories = None\n", + "\n", + "df_dtypes = {c: df[c].dtype for c in df.columns}\n", + "categorical_columns = [c for c, t in df_dtypes.items() if t == np.dtypes.ObjectDType]\n", 
+ "\n", + "df_final, ohe = one_hot_encode_categorical_features(\n", + " df=df,\n", + " categorical_columns=categorical_columns,\n", + " label_column=label_column,\n", + " ohe=None,\n", + " # How to handle unknown categories: one of ‘ignore’, ‘infrequent_if_exist’\n", + " handle_unknown='infrequent_if_exist',\n", + " # Minimum fraction of samples below which a category will be considered\n", + " # infrequent.\n", + " min_frequency=min_frequency,\n", + " # Upper limit to the number of output features for each input feature,\n", + " # including an infrequent category.\n", + " max_categories=max_categories)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "qOEN3MDDunMw", + "metadata": { + "id": "qOEN3MDDunMw" + }, + "outputs": [], + "source": [ + "# @title Min-Max scale all features columns.\n", + "\n", + "# Warning: this cell can take a long time to run.\n", + "\n", + "%%time\n", + "\n", + "df_final, mms = min_max_scale_features(\n", + " df=df_final,\n", + " label_column=label_column,\n", + " mms=None,\n", + " clip=True,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "620cWZ8fhaEa", + "metadata": { + "id": "620cWZ8fhaEa" + }, + "outputs": [], + "source": [ + "# @title Column names are mangled so rename for BigQuery compatibility.\n", + "\n", + "# This means that only alphanumeric characters and underscores are allowed.\n", + "# Remap special characters to underscores.\n", + "\n", + "%%time\n", + "\n", + "replace_chars = '/\\\\.,:;@#{}()[]!?$%^\u0026*+-=\u003c\u003e|\\'\"`~ '\n", + "replace_with = ''.join(['_'] * len(replace_chars))\n", + "translation = str.maketrans(replace_chars, replace_with)\n", + "\n", + "\n", + "df_final = df_final.rename(\n", + " columns={\n", + " c: c.translate(translation)\n", + " for c in df_final.columns.to_list()\n", + " }\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "zd-Ajzs4h55B", + "metadata": { + "id": "zd-Ajzs4h55B" + }, + "outputs": [], + "source": [ + "df_final.columns" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5e90245d-8afc-48d4-b855-fb06dff5e9ef", + "metadata": { + "id": "5e90245d-8afc-48d4-b855-fb06dff5e9ef", + "tags": [] + }, + "outputs": [], + "source": [ + "df_final.head(3)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "KzVPDSp45Dxx", + "metadata": { + "id": "KzVPDSp45Dxx" + }, + "outputs": [], + "source": [ + "df_final_dtypes = {c: df_final[c].dtype for c in df_final.columns}\n", + "df_final_dtypes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6985889b-76c8-4ee2-b7b5-da61d9e6d58e", + "metadata": { + "id": "6985889b-76c8-4ee2-b7b5-da61d9e6d58e", + "tags": [] + }, + "outputs": [], + "source": [ + "# @title Upload preprocessed table to BigQuery\n", + "\n", + "%%time\n", + "\n", + "def _get_schema_from_dataframe(\n", + " df: Union[pd.DataFrame, dask.dataframe.core.DataFrame]\n", + ") -\u003e List[bigquery.SchemaField]:\n", + " \"\"\"Gets the BigQuery schema from a dataframe.\"\"\"\n", + " df_dtypes = {c: df[c].dtype for c in df.columns}\n", + " # df_dtypes_updt = dict()\n", + " # for c, dt in df_dtypes.items():\n", + " # if dt == pd.BooleanDtype(): df_dtypes_updt[c] = 'boolean'\n", + " # elif dt == pd.Int64Dtype(): df_dtypes_updt[c] = 'Int64'\n", + " # else: df_dtypes_updt[c] = dt\n", + " # BOOL, FLOAT64, INT64 are the only allowed dtypes after preprocessing.\n", + " # allowed_dtypes = [\"boolean\", np.dtypes.Float64DType, \"Int64\"]\n", + " allowed_dtypes = [pd.BooleanDtype(), 
np.dtypes.Float64DType(), pd.Int64Dtype()]\n",
+ " df_dtypes_filt = {c: dt for c, dt in df_dtypes.items() if dt not in allowed_dtypes}\n",
+ " if df_dtypes_filt:\n",
+ " raise ValueError(f'Preprocessing did not handle all dtypes: {df_dtypes_filt}')\n",
+ " pd_to_bq_dtype_map = {\n",
+ " pd.BooleanDtype(): bigquery.enums.SqlTypeNames.BOOL,\n",
+ " np.dtypes.Float64DType(): bigquery.enums.SqlTypeNames.FLOAT64,\n",
+ " pd.Int64Dtype(): bigquery.enums.SqlTypeNames.INT64,\n",
+ " }\n",
+ " schema = [\n",
+ " bigquery.SchemaField(\n",
+ " c, pd_to_bq_dtype_map[dt], mode='REQUIRED'\n",
+ " ) for c, dt in df_dtypes.items()\n",
+ " ]\n",
+ " return schema\n",
+ "\n",
+ "def load_dataframe_to_bq_table(\n",
+ " df: pd.DataFrame,\n",
+ " table_id: str,\n",
+ "):\n",
+ " \"\"\"Loads a dataframe into a BigQuery table.\"\"\"\n",
+ " job_config = bigquery.LoadJobConfig(\n",
+ " schema=_get_schema_from_dataframe(df),\n",
+ " write_disposition='WRITE_TRUNCATE',\n",
+ " )\n",
+ "\n",
+ " job = client.load_table_from_dataframe(\n",
+ " df, table_id, job_config=job_config\n",
+ " ) # Make an API request.\n",
+ " job.result() # Wait for the job to complete.\n",
+ "\n",
+ " table = client.get_table(table_id) # Make an API request.\n",
+ " print(\n",
+ " \"Loaded {} rows and {} columns to {}\".format(\n",
+ " table.num_rows, len(table.schema), table_id\n",
+ " )\n",
+ " )\n",
+ "\n",
+ "\n",
+ "table_id = f'{PROJECT_ID}.{dataset_id}.{preprocessed_table_id}'\n",
+ "\n",
+ "load_dataframe_to_bq_table(df=df_final, table_id=table_id)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "77d7a1a8-b93c-4294-8fcd-1b2a525ae8e4",
+ "metadata": {
+ "id": "77d7a1a8-b93c-4294-8fcd-1b2a525ae8e4"
+ },
+ "outputs": [],
+ "source": [
+ "# @title Split the final dataframe into a train and test set\n",
+ "\n",
+ "%%time\n",
+ "\n",
+ "test_fraction = 0.2 # @param {type:\"number\"}\n",
+ "assert 0.0 \u003c test_fraction \u003c= 0.5, \"'test_fraction' should be between 0 and 0.5\"\n",
+ "\n",
+ "df_train, df_test = model_selection.train_test_split(\n",
+ " df_final,\n",
+ " test_size=test_fraction,\n",
+ " random_state=42,\n",
+ " shuffle=True,\n",
+ " stratify=None,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "Pcrmeen9NtTV",
+ "metadata": {
+ "id": "Pcrmeen9NtTV"
+ },
+ "outputs": [],
+ "source": [
+ "df_train.reset_index(drop=True, inplace=True)\n",
+ "df_test.reset_index(drop=True, inplace=True)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "D5MgP2KsGcVy",
+ "metadata": {
+ "id": "D5MgP2KsGcVy"
+ },
+ "outputs": [],
+ "source": [
+ "# @title For the training set, set a large fraction of labels to 'unlabeled'.\n",
+ "\n",
+ "%%time\n",
+ "\n",
+ "# Set the same value for the unlabeled data here as you use in the training script.\n",
+ "unlabeled_data_value = -1 # @param {type:\"integer\"}\n",
+ "\n",
+ "# This fraction of labels will be discarded for unsupervised training.\n",
+ "unlabeled_data_fraction = 0.98 # @param {type:\"number\"}\n",
+ "\n",
+ "unlabeled_size = int(len(df_train) * unlabeled_data_fraction) + 1\n",
+ "unlabeled_size = min(len(df_train), unlabeled_size)\n",
+ "print(f'Setting {unlabeled_size} rows to unlabeled.')\n",
+ "\n",
+ "unlabeled_idxs = np.random.choice(np.arange(len(df_train)), size=unlabeled_size, replace=False)\n",
+ "\n",
+ "df_train.loc[unlabeled_idxs, label_column] = unlabeled_data_value"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "vfCTzu2mL5cu",
+ "metadata": {
+ "id": "vfCTzu2mL5cu"
+ },
+ 
"outputs": [], + "source": [ + "df_train.head(3)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "F-XooSE0L5aU", + "metadata": { + "id": "F-XooSE0L5aU" + }, + "outputs": [], + "source": [ + "df_test.head(3)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "OINfm7iRNrtF", + "metadata": { + "id": "OINfm7iRNrtF" + }, + "outputs": [], + "source": [ + "# @title Upload both train and test tables to BigQuery\n", + "\n", + "%%time\n", + "\n", + "# Train table\n", + "train_table_id = f'{PROJECT_ID}.{dataset_id}.{preprocessed_table_id}_train'\n", + "load_dataframe_to_bq_table(df=df_train, table_id=train_table_id)\n", + "\n", + "# Test table\n", + "test_table_id = f'{PROJECT_ID}.{dataset_id}.{preprocessed_table_id}_test'\n", + "load_dataframe_to_bq_table(df=df_test, table_id=test_table_id)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "z3OMjEUdZfbu", + "metadata": { + "id": "z3OMjEUdZfbu" + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "accelerator": "TPU", + "colab": { + "gpuType": "V28", + "private_outputs": true, + "provenance": [ + { + "file_id": "1wZ7A-CJitZFcKDwasbX9Fg56dUJe3Ab7", + "timestamp": 1714667887851 + }, + { + "file_id": "12lJ4F4pksHQvwyPbaJh_x0fruP0KcDol", + "timestamp": 1713895750845 + } + ], + "toc_visible": true + }, + "environment": { + "kernel": "conda-root-py", + "name": "workbench-notebooks.m119", + "type": "gcloud", + "uri": "us-docker.pkg.dev/deeplearning-platform-release/gcr.io/workbench-notebooks:m119" + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel) (Local)", + "language": "python", + "name": "conda-root-py" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.14" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}