
Parking Lots Availability and Forecasting App

The goal of this project is to build a data pipeline for gathering real-time carpark lot availability and weather datasets from Data.gov.sg. The data are extracted via API and stored in an S3 bucket before being ingested into the Data Warehouse. The data will then be used to power the mechanics of the Parking Lots Availability and Forecasting App.

The objectives are to:

  1. Build an ETL pipeline using Apache Airflow to extract data via API and store them in AWS S3,
  2. Ingest the data from AWS S3 and stage them in Redshift, and
  3. Transform the data into a set of dimension and fact tables.

Although it is outside the scope of this project, the plan is to use the data for exploration and to train machine learning models. The data and models will then power the Parking Lots Availability and Forecasting web app, which runs on R Shiny.

Getting started

The data pipeline is developed using Docker containers, so the technology stack can be deployed locally or in the cloud. The technology stack used for building the pipeline is:

  • Docker Containers:
    • Postgres: Two instances are deployed, one for the Airflow metadata database and the other for initial development of the data warehouse.
    • pgAdmin: PostgreSQL database administration tool.
    • Jupyter Notebook: Development environment for automating the data warehouse deployment, developing the ETL code, and running data exploration.
    • Airflow: Designing and deploying code as workflows.
  • AWS S3: Object storage for building the Data Lake.
  • AWS Redshift: The Data Warehouse.

Data Pipeline

This is how the pipeline looks in Apache Airflow:

[Image: Airflow graph view of the carpark-sg data pipeline]
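
For orientation, the sketch below mirrors the task flow described above using placeholder DummyOperator tasks rather than the project's custom operators; the DAG id, schedule, and task names here are illustrative only, and the real DAG lives in dags/carparksg_dag.py.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Placeholder defaults; the real DAG defines its own arguments.
default_args = {
    "owner": "carpark-sg",
    "start_date": datetime(2019, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG("carpark_sg_dag_sketch",
         default_args=default_args,
         schedule_interval="@hourly",
         catchup=False) as dag:

    # Each placeholder stands in for one stage of the pipeline.
    extract_to_s3 = DummyOperator(task_id="extract_api_data_to_s3")      # getCarpark / getWeather helpers
    stage_to_redshift = DummyOperator(task_id="load_s3_to_staging")      # load_to_redshift.py
    load_dimensions = DummyOperator(task_id="load_dimension_tables")     # load_dimension.py
    load_facts = DummyOperator(task_id="load_fact_tables")               # load_fact.py
    quality_checks = DummyOperator(task_id="run_data_quality_checks")    # data_quality.py / has_rows.py

    extract_to_s3 >> stage_to_redshift >> load_dimensions >> load_facts >> quality_checks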

Prerequisites

  • You need to have Docker installed on your local machine or AWS EC2 instance before you begin.
  • You need to create an admin user in your AWS account, then include the key and secret in access.cfg.
  • You need to include all the parameters in dwh.cfg for connecting to your Redshift cluster and database.
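
The scripts and notebooks read these two files with configparser, roughly as sketched below; the section and key names shown here are assumptions and may differ from the actual files.

import configparser

# Assumed section/key names; check access.cfg and dwh.cfg for the real ones.
access = configparser.ConfigParser()
access.read("access.cfg")
aws_key = access["AWS"]["KEY"]        # admin user's access key (assumed key name)
aws_secret = access["AWS"]["SECRET"]  # admin user's secret key (assumed key name)

dwh = configparser.ConfigParser()
dwh.read("dwh.cfg")
cluster_id = dwh["DWH"]["DWH_CLUSTER_IDENTIFIER"]  # assumed key name in the DWH section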

Note: The DWH section of dwh.cfg includes the configuration that the create_redshift.py script uses to create the Redshift cluster automatically. You can modify the configuration to suit your needs.
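
Under the hood, create_redshift.py follows roughly the boto3 pattern sketched below; the concrete values come from dwh.cfg and access.cfg, and the placeholders here are only illustrative.

import boto3

# Placeholder credentials/values; the real script pulls these from the config files.
redshift = boto3.client(
    "redshift",
    region_name="us-west-2",
    aws_access_key_id="YOUR_KEY",
    aws_secret_access_key="YOUR_SECRET",
)

redshift.create_cluster(
    ClusterType="multi-node",
    NodeType="dc2.large",
    NumberOfNodes=4,
    ClusterIdentifier="dwhcluster",
    DBName="dwh",
    MasterUsername="dwhuser",
    MasterUserPassword="YOUR_PASSWORD",
    IamRoles=["arn:aws:iam::<account-id>:role/dwhRole"],
)

# Block until the cluster reports "available", as the script's output shows.
redshift.get_waiter("cluster_available").wait(ClusterIdentifier="dwhcluster")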

Note: docker-compose.yml contains the configuration of the tech stack used for building the pipelines. You can modify the file to customize your setup.

Folders and Files

  • dags: Contains all the Airflow DAGs.
    • carparksg_dag.py: The data pipeline for this project.
  • plugins:
    • helpers: Folder containing helper functions for the data pipeline.
      • getCarpark.py: Helper function to extract carpark availability data via API, transform it, and store the dataset in S3 buckets in CSV format (a hedged sketch of this kind of helper follows this file list).
      • getCarparkInfo.py: Helper function to extract information about each carpark via API, transform it, and store the dataset in S3 buckets in CSV format.
      • getWeather.py: Helper function to extract temperature and rainfall data via API, transform them, and store the datasets in S3 buckets in CSV format.
      • getWeatherStation.py: Helper function to extract information about the weather stations via API, transform it, and store the dataset in S3 buckets in CSV format.
      • sql_queries.py: Helper queries to transform data from staging tables into dimension and fact tables in AWS Redshift.
    • operators: Folder containing the Airflow custom operators for the data pipeline.
      • data_quality.py: The data quality operator that runs checks on the data stored in Redshift.
      • facts_calculator.py: The custom operator that computes a statistical summary of carpark availability with daily partitioning.
      • has_rows.py: The custom operator that checks that a table is not empty.
      • load_dimension.py: The custom operator that loads data from staging tables into dimension tables in AWS Redshift.
      • load_fact.py: The custom operator that loads data from staging and dimension tables into fact tables in AWS Redshift.
      • load_to_redshift.py: The custom operator that loads data from S3 into staging tables in AWS Redshift.
      • load_to_s3.py: The custom operator that extracts data via API calls, transforms it, and saves it to AWS S3 buckets.
  • logs: Folder for storing Airflow logs.
  • notebooks: Folder for storing the development code.
    • create_redshift.py: Python script for creating a Redshift cluster.
    • create_tables.py: Python script for creating tables in the Redshift data warehouse.
    • delete_redshift.py: Python script for deleting the Redshift cluster.
    • sql_stmt_create_tables.py: SQL helper functions for create_tables.py to create tables in Redshift.
    • access.cfg: Configuration file that contains the AWS access and secret keys.
    • dwh.cfg: Configuration file that contains the data warehouse configuration.
    • etl.ipynb: ETL development notebook.
    • redshift_connect.ipynb: Redshift configuration/connection notebook.
  • create_tables.sql: SQL script for creating tables in PostgreSQL or Redshift.
  • docker-compose.yml: Docker configuration file for deploying the containers (the technology stack).
  • requirements.txt: Packages to be installed in the Airflow container.
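
As promised above, here is a hedged sketch of a getCarpark-style helper: call the Data.gov.sg carpark availability endpoint, flatten the response, and write a CSV to S3. The bucket name, key layout, and JSON field names are assumptions, not the project's exact code.

import csv
import io

import boto3
import requests

def carpark_availability_to_s3(bucket="carpark-sg-data-lake", timestamp="2019-01-01T00:00:00"):
    # Fetch a snapshot of carpark availability for the given timestamp.
    resp = requests.get(
        "https://api.data.gov.sg/v1/transport/carpark-availability",
        params={"date_time": timestamp},
    )
    resp.raise_for_status()
    records = resp.json()["items"][0]["carpark_data"]  # assumed response layout

    # Flatten the nested JSON into CSV rows matching the staging table columns.
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["date_time", "carpark_id", "lot_type", "lots_available", "total_lots"])
    for carpark in records:
        for info in carpark["carpark_info"]:
            writer.writerow([
                carpark["update_datetime"],
                carpark["carpark_number"],
                info["lot_type"],
                info["lots_available"],
                info["total_lots"],
            ])

    # Store the CSV in the S3 data lake (assumed bucket and key layout).
    s3 = boto3.client("s3")
    s3.put_object(
        Bucket=bucket,
        Key=f"carpark_availability/{timestamp}.csv",
        Body=buffer.getvalue().encode("utf-8"),
    )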

Setup

Run the docker environment from the carpark-sg folder:

docker-compose up -d

> Creating network "carpark-sg_default" with the default driver
> Creating carpark-sg_postgres_1 ... done
> Creating carpark-sg_pg-data_1  ... done
> Creating carpark-sg_jupyter_1  ... done
> Creating carpark-sg_webserver_1 ... done
> Creating carpark-sg_pgadmin_1   ... done

Once the command completes, you can point your browser to:

  • localhost:8080 to access Airflow
  • localhost:8888 to access Jupyter Notebook
  • localhost:80 to access pgAdmin

From Jupyter Notebook, open a new terminal and run:

python create_redshift.py

> 1. Fetch params
> 2. Setup Clients and resources
> 3.1 Creating a new IAM Role
> 3.2 Attaching Policy
> 3.3 Get the IAM role ARN
> 4. Creating Redshift Cluster
> Redshift is creating
> ..
> Redshift is available
> ..
>                  Key                                              Value
> 0  ClusterIdentifier                                         dwhcluster
> 1           NodeType                                          dc2.large
> 2      ClusterStatus                                          available
> 3     MasterUsername                                            dwhuser
> 4             DBName                                                dwh
> 5           Endpoint  {'Address': 'dwhcluster.crttik8cimnv.us-west-2...
> 6              VpcId                                       vpc-789b3500
> 7      NumberOfNodes                                                  4
> ..
> 5. Setup incoming TCP port...
> ..
> DWH_ENDPOINT :: dwhcluster.crttik8cimnv.us-west-2.redshift.amazonaws.com
> DWH_ROLE_ARN :: arn:aws:iam::996990424048:role/dwhRole

The script will stop after the cluster is created. Then, you can move on to set up the database and tables:

python create_tables.py

> Creating Tables in Redshift
> Tables are created in Redshift
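
Under the hood, create_tables.py boils down to connecting to the cluster with the values from dwh.cfg and executing the DDL; a minimal sketch of that pattern follows, with the endpoint and password as placeholders and only one of the CREATE statements shown.

import psycopg2

# Placeholders; the real script reads host, database, user, password and port from dwh.cfg.
conn = psycopg2.connect(
    host="dwhcluster.<hash>.us-west-2.redshift.amazonaws.com",  # the DWH_ENDPOINT printed by create_redshift.py
    dbname="dwh",
    user="dwhuser",
    password="YOUR_PASSWORD",
    port=5439,
)
cur = conn.cursor()

# One example statement; the full DDL lives in sql_stmt_create_tables.py / create_tables.sql.
cur.execute("""
    CREATE TABLE IF NOT EXISTS staging_carpark_availability (
        date_time      TIMESTAMPTZ NOT NULL,
        carpark_id     VARCHAR,
        lot_type       VARCHAR,
        lots_available INTEGER,
        total_lots     INTEGER
    );
""")
conn.commit()
conn.close()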

After the tables are set up, you can access Airflow via localhost:8080 and start the data pipeline by switching on the carpark_sg_dag on the dashboard.

Delete the Redshift cluster

If you decide to stop using Redshift, you can delete the cluster by running:

python delete_redshift.py

> 1. Fetch params
> 2. Setup Clients
> 3. Deleting Redshift Clusters
> Redshift is deleting
> Redshift is deleting
> ..
> Redshift is deleted
> 4. Clean up Resources

The script will stop once the Redshift cluster is deleted.
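
delete_redshift.py is essentially a boto3 delete_cluster call followed by polling, along the lines of this sketch (credentials and the cluster identifier come from the config files):

import boto3

redshift = boto3.client("redshift", region_name="us-west-2")

# Skip the final snapshot since this is a throwaway development cluster.
redshift.delete_cluster(
    ClusterIdentifier="dwhcluster",
    SkipFinalClusterSnapshot=True,
)

# Block until the cluster is gone, mirroring the "Redshift is deleting" loop above.
redshift.get_waiter("cluster_deleted").wait(ClusterIdentifier="dwhcluster")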

Schema for staging tables

We gather carpark availability, carpark information, temperature, and rainfall data, and load all of them into the staging tables:

staging_temperature

NAME DATA TYPE
date_time TIMESTAMPTZ NOT NULL
station_id VARCHAR
temperature DOUBLE PRECISION

staging_rainfall

NAME DATA TYPE
date_time TIMESTAMPTZ NOT NULL
station_id VARCHAR
rainfall DOUBLE PRECISION

staging_carpark_availability

NAME DATA TYPE
date_time TIMESTAMPTZ NOT NULL
carpark_id VARCHAR
lot_type VARCHAR
lots_available INTEGER
total_lots INTEGER

staging_weather_station_info

NAME DATA TYPE
station_id VARCHAR
station_location VARCHAR
station_latitude DOUBLE PRECISION
station_longitude DOUBLE PRECISION

staging_carpark_info

NAME DATA TYPE
carpark_id VARCHAR
carpark_location VARCHAR
carpark_latitude DOUBLE PRECISION
carpark_longitude DOUBLE PRECISION
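
The staging tables above are filled from S3 by the load_to_redshift.py operator using a Redshift COPY; the statement below is a hedged sketch of that step, with the bucket, prefix, IAM role, and CSV options as assumptions.

# Hedged sketch of the S3-to-staging load; bucket, prefix, role and options are assumptions.
copy_staging_carpark_availability = """
    COPY staging_carpark_availability
    FROM 's3://carpark-sg-data-lake/carpark_availability/'
    IAM_ROLE 'arn:aws:iam::<account-id>:role/dwhRole'
    REGION 'us-west-2'
    TIMEFORMAT 'auto'
    IGNOREHEADER 1
    CSV;
"""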

Schema for fact and dimension tables

temperature_events

Temperature events. station_id is set as a FOREIGN KEY referencing weather_stations. On top of that, the distribution style is KEY, with a compound sort key on date_time and station_id to improve join and group-by performance.

NAME DATA TYPE
date_time TIMESTAMPTZ NOT NULL
station_id VARCHAR REFERENCES weather_stations (station_id)
temperature DOUBLE PRECISION
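
The DDL below is a hedged sketch matching that description; the choice of station_id as the DISTKEY is an assumption, and the project's actual statements live in create_tables.sql / sql_stmt_create_tables.py.

# Hedged DDL sketch; the DISTKEY column is an assumption.
create_temperature_events = """
    CREATE TABLE IF NOT EXISTS temperature_events (
        date_time   TIMESTAMPTZ NOT NULL,
        station_id  VARCHAR REFERENCES weather_stations (station_id),
        temperature DOUBLE PRECISION
    )
    DISTSTYLE KEY
    DISTKEY (station_id)
    COMPOUND SORTKEY (date_time, station_id);
"""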

rainfall_events

Rainfall events. station_id is set as a FOREIGN KEY referencing the weather_stations table. On top of that, the distribution style is KEY, with a compound sort key on date_time and station_id to improve join and group-by performance.

NAME DATA TYPE
date_time TIMESTAMPTZ NOT NULL
station_id VARCHAR REFERENCES weather_stations (station_id)
rainfall DOUBLE PRECISION

carpark_availability

Carpark availability events. carpark_id is set as a FOREIGN KEY referencing the carpark table. On top of that, the distribution style is KEY, with a compound sort key on date_time and carpark_id to improve join and group-by performance.

NAME DATA TYPE
date_time TIMESTAMPTZ NOT NULL
carpark_id VARCHAR REFERENCES carpark (carpark_id)
lots_available INTEGER

weather_stations

Weather stations in the weather events database. station_id is set as the PRIMARY KEY.

NAME DATA TYPE
station_id VARCHAR PRIMARY KEY
station_location VARCHAR
station_latitude DOUBLE PRECISION
station_longitude DOUBLE PRECISION

carpark

Carparks in the carpark availability database. carpark_id is set as the PRIMARY KEY.

NAME DATA TYPE
carpark_id VARCHAR PRIMARY KEY
carpark_location VARCHAR
carpark_latitude DOUBLE PRECISION
carpark_longitude DOUBLE PRECISION
total_lots INTEGER

time

Timestamps of records in carpark availability broken down into specific units. date_time is set as the PRIMARY KEY.

NAME DATA TYPE
date_time TIMESTAMPTZ PRIMARY KEY
hour INTEGER
day INTEGER
week INTEGER
month INTEGER
weekday INTEGER
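
The time table can be filled from the distinct timestamps already present in carpark availability; the query below is a hedged sketch of that kind of insert, not necessarily the exact statement in sql_queries.py.

# Hedged sketch of populating the time table from carpark availability timestamps.
time_table_insert = """
    INSERT INTO time (date_time, hour, day, week, month, weekday)
    SELECT DISTINCT
        date_time,
        EXTRACT(hour  FROM date_time),
        EXTRACT(day   FROM date_time),
        EXTRACT(week  FROM date_time),
        EXTRACT(month FROM date_time),
        EXTRACT(dow   FROM date_time)
    FROM carpark_availability;
"""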
