
Serverless-data-stream-with-kinesis-data-stream-kinesis-firehouse-S3-and-snowflake

Kinesis_ApiGateway_Snowflake

Use Postman to make an API call; Amazon API Gateway triggers a Lambda function, the function writes the data to an S3 bucket, and Snowpipe then loads the data into Snowflake. To avoid firing Snowpipe on every single API call and to optimize the process, we first collect the records in an Amazon Kinesis Data Stream and then use Kinesis Data Firehose to batch them (flushing based on buffer size or time) and deliver them to S3.

Step 1: Create the Lambda Role

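The role is created in the IAM console. As a rough boto3 equivalent, the sketch below assumes a hypothetical role name (project3_lambda_role) and attaches broad AWS managed policies so the function can write to Kinesis and to CloudWatch Logs; scope these down for anything beyond a demo.

		import json
		import boto3

		iam = boto3.client("iam")

		# Trust policy that lets the Lambda service assume the role
		trust_policy = {
		    "Version": "2012-10-17",
		    "Statement": [{
		        "Effect": "Allow",
		        "Principal": {"Service": "lambda.amazonaws.com"},
		        "Action": "sts:AssumeRole",
		    }],
		}

		iam.create_role(
		    RoleName="project3_lambda_role",  # hypothetical name
		    AssumeRolePolicyDocument=json.dumps(trust_policy),
		)

		# Demo-grade managed policies for Kinesis writes and CloudWatch logging
		for policy_arn in [
		    "arn:aws:iam::aws:policy/AmazonKinesisFullAccess",
		    "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
		]:
		    iam.attach_role_policy(RoleName="project3_lambda_role", PolicyArn=policy_arn)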

Step 2: Create the Lambda function that reads the data from API Gateway and puts it into the Kinesis Data Stream

		import json
		import boto3

		# Reuse the Kinesis client across invocations
		client = boto3.client('kinesis')

		def lambda_handler(event, context):
		    # event['body'] holds the raw request body passed through by API Gateway;
		    # json.dumps keeps it as a single JSON string value
		    data = json.dumps(event['body'])
		    # Write the record to the Kinesis Data Stream created in Step 4
		    client.put_record(StreamName="project3_kinesis_apigateway", Data=data, PartitionKey="1")
		    print("Data Inserted")
		    return {"statusCode": 200, "body": "Data Inserted"}

Step 3: Create the API Gateway and integrate it with the Lambda function created in Step 2

  • Create a simple HTTP API and integrate it with the Lambda function.
  • Configure the route: POST method.

Get the API endpoint: https://fwpwl5qova.execute-api.us-east-1.amazonaws.com/dev/project3_lambda1_kinesis_apigateway
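
The same call can be made from a script instead of Postman; a minimal sketch using the requests library against the endpoint above:

		import requests

		url = "https://fwpwl5qova.execute-api.us-east-1.amazonaws.com/dev/project3_lambda1_kinesis_apigateway"

		# POST one record; API Gateway forwards the body to the Lambda function
		response = requests.post(url, json={"Name": "wang", "Age": 4})
		print(response.status_code, response.text)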

Step 4: Create a Kinesis Data Stream to receive the data from the Lambda function created in Step 2

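The stream is created in the Kinesis console; the equivalent boto3 call is sketched below, assuming a single shard and the stream name hard-coded in the Step 2 Lambda function.

		import boto3

		kinesis = boto3.client("kinesis")

		# The name must match StreamName in the Step 2 Lambda function
		kinesis.create_stream(StreamName="project3_kinesis_apigateway", ShardCount=1)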

Step 5: Create a second Lambda function to process the data before the S3 dump


This second Lambda function transforms the data so it is easy to load into Snowflake: it decodes each record from base64 into a string, appends a \n delimiter so that every record sits on its own line, and re-encodes the result before handing it back to Firehose.

		import base64

		def lambda_handler(event, context):
		    output = []  # build a fresh list on every invocation

		    for record in event['records']:
		        # Firehose delivers each record base64-encoded; decode it to text
		        payload = base64.b64decode(record['data']).decode('utf-8')
		        print('payload:', payload)

		        # Add a newline so each record lands on its own line in S3
		        row_w_newline = payload + "\n"

		        # Firehose expects the transformed data back as a base64 string
		        output_record = {
		            'recordId': record['recordId'],
		            'result': 'Ok',
		            'data': base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')
		        }
		        output.append(output_record)

		    print('Processed {} records.'.format(len(event['records'])))

		    return {'records': output}
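
To see the transformation in isolation, the handler can be invoked locally with one Firehose-style record (a hypothetical test, not part of the original setup):

		import base64

		# One record shaped the way Firehose delivers it: base64-encoded JSON
		test_event = {
		    "records": [{
		        "recordId": "1",
		        "data": base64.b64encode(b'{"Name": "wang", "Age": 4}').decode("utf-8"),
		    }]
		}
		print(lambda_handler(test_event, None))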

Step 6: Create an S3 bucket that will be the Kinesis Firehose destination

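The bucket can also be created with boto3; the name below is the one referenced later by the Snowflake storage integration, and the call assumes the us-east-1 region (no LocationConstraint needed there).

		import boto3

		s3 = boto3.client("s3")

		# Same bucket the storage integration in Step 8 points at
		s3.create_bucket(Bucket="gakas-project3-kinesis-apigateway")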

Step 7: Create Kinesis Firehose

  • Create the Kinesis Data Firehose delivery stream, using the Kinesis Data Stream from Step 4 as source and the S3 bucket from Step 6 as destination.
  • Activate "Transform source records with AWS Lambda" and select the processing function from Step 5 (a boto3 sketch of the full configuration follows this list).
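
The same configuration can be expressed with boto3. This is only a sketch: the delivery-stream name and every value in angle brackets are placeholders, not values taken from this project.

		import boto3

		firehose = boto3.client("firehose")

		firehose.create_delivery_stream(
		    DeliveryStreamName="project3_firehose",  # hypothetical name
		    DeliveryStreamType="KinesisStreamAsSource",
		    KinesisStreamSourceConfiguration={
		        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:<account-id>:stream/project3_kinesis_apigateway",
		        "RoleARN": "arn:aws:iam::<account-id>:role/<firehose-role>",
		    },
		    ExtendedS3DestinationConfiguration={
		        "RoleARN": "arn:aws:iam::<account-id>:role/<firehose-role>",
		        "BucketARN": "arn:aws:s3:::gakas-project3-kinesis-apigateway",
		        # Buffer by size or time before writing a file to S3
		        "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},
		        # Route every batch through the Step 5 transformation Lambda
		        "ProcessingConfiguration": {
		            "Enabled": True,
		            "Processors": [{
		                "Type": "Lambda",
		                "Parameters": [{
		                    "ParameterName": "LambdaArn",
		                    "ParameterValue": "arn:aws:lambda:us-east-1:<account-id>:function:<transform-lambda>",
		                }],
		            }],
		        },
		    },
		)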

Step 8: Create an IAM role for Snowflake (used by the storage integration to read from the S3 bucket).


Storage integration creation: copy the ARN of the Snowflake IAM role and the S3 bucket location into the Snowflake console; both are used in the CREATE STORAGE INTEGRATION statement below.

		create warehouse s3_to_snowflake_wh;
		use s3_to_snowflake_wh;
		--Specify the role
		use role ACCOUNTADMIN;
		
		drop database if exists s3_to_snowflake;
		
		--Database Creation 
		create database if not exists s3_to_snowflake;
		
		--Specify the active/current database for the session.
		use s3_to_snowflake;
		
		
		--Storage Integration Creation
		create or replace storage integration s3_int
		TYPE = EXTERNAL_STAGE
		STORAGE_PROVIDER = S3
		ENABLED = TRUE 
		STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::200105849428:role/project3_snwoflake_role'
		STORAGE_ALLOWED_LOCATIONS = ('s3://gakas-project3-kinesis-apigateway')
		COMMENT = 'Testing Snowflake getting refresh or not';
		
		--Describe the Integration Object
		DESC INTEGRATION  s3_int;
		
		
		--External Stage Creation
		
		create stage mystage
		  url = 's3://gakas-project3-kinesis-apigateway'
		  storage_integration = s3_int;
		
		list @mystage;
		
		--File Format Creation
		create or replace file format my_json_format
		type = json;
		
		 
		--Table Creation
		create or replace external table s3_to_snowflake.PUBLIC.Person with location = @mystage file_format ='my_json_format';
		show external tables;
		
		select * from s3_to_snowflake.public.person;
		 
		 
		--Query the table
		select parse_json(VALUE):Age as Age  , trim(parse_json(VALUE):Name,'"') as Name from  s3_to_snowflake.PUBLIC.Person;

Copy the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values from the DESC INTEGRATION output in Snowflake and update the trust policy of the Snowflake IAM role accordingly.


Create an event notification on the S3 bucket so that Snowflake is notified whenever new files arrive.

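One way to wire up the notification programmatically is sketched below; the SQS queue ARN is a placeholder for the notification channel Snowflake exposes for the external table, which is not shown in this walkthrough.

		import boto3

		s3 = boto3.client("s3")

		# Placeholder: use the queue ARN provided by Snowflake for auto-refresh
		snowflake_queue_arn = "arn:aws:sqs:us-east-1:<snowflake-account-id>:<queue-name>"

		s3.put_bucket_notification_configuration(
		    Bucket="gakas-project3-kinesis-apigateway",
		    NotificationConfiguration={
		        "QueueConfigurations": [{
		            "QueueArn": snowflake_queue_arn,
		            "Events": ["s3:ObjectCreated:*"],
		        }]
		    },
		)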

Step 9: Test the pipeline

  • Open Postman and pass a series of records {"Name": "wang", "Age":4} {"Name": "ali", "Age":32} {"Name": "li", "Age":54} {"Name": "Moctar", "Age":44} {"Name": "she", "Age":86} {"Name": "Abdoul", "Age":22} {"Name": "lie", "Age":34} {"Name": "Cheng", "Age":55} {"Name": "Karim", "Age":23} {"Name": "ram", "Age":34} {"Name": "li", "Age":23} {"Name": "she", "Age":36}

  • Check the Kinesis Data Stream and Kinesis Firehose metrics.

  • Check S3
  • Check the data in Snowflake (a query sketch follows this list).
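
The final check can also be scripted with the snowflake-connector-python package instead of the Snowflake worksheet; the credentials below are placeholders.

		import snowflake.connector

		# Placeholder credentials -- fill in your own account details
		conn = snowflake.connector.connect(
		    user="<user>",
		    password="<password>",
		    account="<account_identifier>",
		    warehouse="s3_to_snowflake_wh",
		    database="s3_to_snowflake",
		    schema="PUBLIC",
		)

		cur = conn.cursor()
		cur.execute(
		    "select parse_json(VALUE):Age as Age, trim(parse_json(VALUE):Name, '\"') as Name "
		    "from s3_to_snowflake.PUBLIC.Person"
		)
		for age, name in cur.fetchall():
		    print(name, age)

		conn.close()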
