Skip to content
/ FegTec Public

FegTec é uma empresa fictícia que quer transferir arquivos parquet contendo dados dos clientes da nuvem AWS para a Google Cloud

Notifications You must be signed in to change notification settings

Ahbiels/FegTec

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 

Repository files navigation

Projeto de migração de Dados da AWS para GCP

Arquitetura de referência

Visão geral do projeto

Este projeto tem como objetivo migrar dados armazenados em formato Parquet (formato de arquivo em coluna com otimizações para acelerar as consultas mais eficiente do que CSV ou JSON) em buckets do Amazon S3 para o Google Cloud Storage (GCS) utilizando o serviço Storage Transfer Service da Google Cloud Platform (GCP). Após a transferência dos dados, uma pipeline será configurada para processar e analisar esses dados diariamente, resultando na visualização de dashboards através do Looker.

Proposta de solução

  • Configuração dos arquivos na aws
    • Criar arquivos com o formato .parquet
    • Criar uma bucket no S3 para armazenar os arquivos
    • Criar um usuário com permissão de "ListObject" e "GetObject" para realizar a transferência
    • Criar credenciais (public and secret key) do usuário criado
  • Transferir os dados da bucket do S3 na AWS para o BigQuery
    • Usar o storage transfer para transferir dados do S3 para o Cloud Storage
    • Configuração da pipeline
    • Transferência Do Cloud Storage para o BigQuery
  • Criação e desenvolvimento no Looker para análise de dados
    • Preparação do ambiente
    • Permissoes
    • Criacao de metricas
    • Integração com o git para o deploy
    • Criação do relatório de consumo
    • Ativação e configuração do Looker Embed
    • Disponibilizar as tabelas e relatórios

Descrição do serviços

Abaixo, uma descrição dos serviços utilizados nesse projeto

Storage Transfer Service

O Storage Transfer Service é uma ferramenta do Google Cloud que permite transferir dados de maneira rápida e segura (os dados em transferência são criptografados por padrão) entre um ambiente on-premise, recursos da web (http/s) ou outro provedor de nuvem (como a AWS e Azure) para dentro do cloud storage da Google Cloud, ou ainda entre buckets do Cloud Storage. Uma das principais características desse serviço, além da velocidade de transferência de dados, é a criação de agendamentos de transferência, o que automatiza o processo de transferência de dados, reduzindo a necessidade de intervenção manual.

Cloud Storage

O Cloud Storage é um serviço de Data Lake (repositório altamente flexível e escalável que armazena dados brutos em diversos formatos) gerenciado pelo Google Cloud com a finalidade de armazenar dados não estruturados chamados de objetos blob (uma coleção de dados binários armazenados como uma única entidade), como vídeos, imagens, arquivos de texto, etc.

Este serviço é altamente escalável, durável e seguro, projetado para armazenar e acessar grandes quantidades de dados - O armazenamento é ilimitado, sem um tamanho mínimo de objetos - de qualquer lugar do mundo.

O armazenamento pode ser regional ou multiregional (abrangendo várias regiões)

O Cloud Storage conta com 4 tipos de classe de armazenamento, são elas:

  • Standard: classe de armazenamento padrão. Os projetos criados são iniciados dentro dessa classe. É utilizado com arquivos que são acessados com frequência e é mais caro que as outras classes.
    • Ex: website que exibe vídeos, e esses vídeos são armazenados dentro do cloud storage
  • nearline: Nearline é uma classe de armazenamento que é melhor para dados que você não espera acessar mais de uma vez por mês. É mais barato do que a classe padrão.
    • ex: backups
  • coldline: Coldline é uma classe de armazenamento que é melhor para dados que você planeja acessar no máximo uma vez a cada 90 dias ou trimestre. É mais barato do que a classe nearline
    • Ex: Imagens de servidores ou máquinas para caso de um disaster recovery
  • Archive: O archive é uma classe de armazenamento que é melhor para dados que você planeja acessar no máximo uma vez por ano. Muito mais barato comparado com as outras classes de armazenamento
    • Ex: Arquivos que não podem ser deletados mas que não são acessados.

| OBS: O preço do Cloud Storage vai depender da classe de armazenamento escolhido e também da região em que a bucket for criada, e pode ficar mais caro ou mais barato

Cloud Schedule

O Cloud Scheduler é um serviço totalmente gerenciado da Google Cloud que permite agendar a execução de tarefas de forma programática. Ou seja, você pode usá-lo para disparar tarefas programadas que chamam serviços específicos em horários definidos.

O Cloud Scheduler permite a automação de praticamente qualquer tarefa por meio de gatilhos. É Possível "engatilhar" um app do App Engine, Workflows via HTTP, enviar uma mensagem para um tópico do Pub/Sub ou acessar um endpoint HTTP (como uma API REST ou endpoint de um serviço web)

Cloud Pub/Sub

O Cloud Pub/Sub é um serviço de mensagens assíncronas (que não ocorre em tempo real, fazendo com que as mensagens sejam entregues em ordem) do Google Cloud Platform que permite a comunicação entre sistemas de forma desacoplada.

A principal vantagem desse serviço é criar uma comunicação entre serviços ou entre serviços e determinados recursos dentro ou fora da gcp sem que esses serviços estejam diretamente ligados (por isso o termo desacoplado)

Cloud Function

O cloud Functions é uma solução FaaS (Function as a Service) que executa código por meio de chamadas de funções em diversas linguagens de programação. É um serviço sem servidor (gerenciado pelo GCP), com escalabilidade automática (escala com base no aumento das solicitações) e com pagamento por utilização (paga apenas pelo o que utilizar).

O cloud functions é baseado em triggers, ou seja, executa determinadas ações com base em eventos - tanto de usuários, como de serviços.

Há duas formas principais de estabelecer uma comunicação com o cloud functions:

  • Através da integração rápida entre serviços da gcp
    • Ex: executar uma ação sempre que um objeto dentro de uma bucket do cloud storage for deletada
  • Por meio da URL externa disponibilizado, permitindo que usuários de fora da gcp utilizem o cloud function como uma API
    • Ex: uma aplicação front-end utilizar o cloud functions para salvar usuários cadastrados em um banco de dados.

Além de poder se conectar facilmente com serviços da Google Cloud, o Cloud Functions pode servir como uma ponte entre diferentes APIs, atuando como o chamado webhook, o que permite processar requisições HTTP de serviços de terceiros. Dessa forma, eu poderia por exemplo enviar um e-mail de confirmação após um pagamento bem-sucedido realizado pelo Strip (um dos mais conhecidos meios de pagamentos online, com milhares de clientes empresariais ao redor do mundo)

Cloud BigQuery

O BigQuery é um data warehouse (um repositório central de informações de múltiplas fontes - como sistemas transacionais, bases de dados relacionais, entre outras -, usado para análise de dados e tomada de decisão com base nessa análise) corporativo totalmente gerenciado que ajuda a gerenciar e analisar dados com recursos integrados, como aprendizado de máquina, análise geoespacial e business intelligence. A arquitetura sem servidor do BigQuery permite usar consultas SQL de dados em tabela para responder às maiores questões de uma organização sem a necessidade de gerenciamento de infraestrutura.

O Bigquery pode analisar petabytes de dados usando velocidades incrivelmente rápidas e sem sobrecarga operacional. Em casos de manutenção e atualizações, não há tempo de inatividade dentro do bigquery.

A arquitetura do BigQuery consiste em duas partes: uma camada de armazenamento que ingere, armazena e otimiza dados e uma camada de computação que fornece recursos de análise.

Looker

O Locker é uma plataforma de business intelligence (BI) e análise de dados oferecido pelo Google cloud, que permite criar dashboards interativos, relatórios e análises detalhadas, autoatendimento ou gerenciamento de BI, aplicativos personalizados com métricas confiáveis, entre outros; facilitando a compreensão de dados complexos e ajudando na tomada de decisões baseada em dados.

Temos três conceitos principais quando mexemos com o Looker, são eles:

  • LookML
    • O Looker possui uma linguagem de modelagem de dados chamada LookML, que permite criar modelos de dados semânticos. É possível usar o LookML para descrever dimensões, agregados, cálculos e relações de dados em seu banco de dados SQL.
  • Model
    • Um model é a camada semântica do Looker que controla a lógica e impede o acesso aos dados para os usuários. A camada semântica da Looker assegura que os dados sejam apresentados de uma forma que seja relevante e compreensível para os usuários finais, permitindo que os usuários explorem os dados de forma interativa, fazendo perguntas complexas e obtendo respostas em tempo real. A principal vantagem é poder controlar os dados e a lógica que os usuários irão visualizar
  • Looker Embed
    • O embedding envolve o uso do código iframe para colocar um objeto (por exemplo: gráficos, dashboards ou partes de uma interface de usuário do Looker) em um site, uma planilha ou em outro local fora do Looker
    • Existe 3 principais looker embedding:
      • Private embed: é feito gerando uma URL de incorporação para um dashboard ou visualização específica, e então essa URL é usada dentro de um elemento iFrame em sua página web ou aplicação.
        • Abordagem mais fácil e direta
        • Permite que o conteúdo do Looker seja exibido como se fosse parte integrante de uma aplicação.
        • Os visitantes do sites verão uma uma página de login do Looker dentro do iframe, precisando de credenciais do looker para continuar
      • SSO Embed: O SSO Embed permite que os usuários acessem conteúdo incorporado do Looker sem precisar realizar um segundo login
        • Tipo mais comum
        • Os usuários serão autenticados por meio da própria aplicação.
        • Funciona criando um URL especial do Looker que você usará em um iframe, contendo informações que você deseja compartilhar, juntamente com as permissões que o usuário irá ter. Em seguida, você assinará o URL com uma chave secreta fornecida pelo Looker.
      • Public embed looks: não exige nenhuma autenticação. permite apenas a incorporação de Looks, não de dashboards ou qualquer outra página do Looker.

Etapas do desenvolvimento

1. Criação dos arquivos .parquet

Para esse projeto, vamos criar 6 arquivos parquet para a realização da transferência. Mas na prática, esses arquivos seriam dados reais de clientes ou usuários externos. Para a criação desses 6 arquivos, vamos executar o seguinte código:

#file in ./code/generate_file_parquet.py

import pandas as pd
from faker import Faker #lib para gerar dados aleatórios
from random import choice

Status = ('Silver',"Gold","Platinum") #Tupla para os status que serão selecionados aleatoriamente
Estado = ("RJ","MG","MT","PR","SP","PA","AM") #Tupla para os estados que serão selecionados aleatoriamente

faker = Faker() #Inicializa a biblioteca

df_data = []

schema = ("name","state","job","birthday","status") #Cria o schema que será utilizado no dataframe

#Um for dentro do outro fará executar 6 vezes a criação de um dataframe com 10 linhas
for client in range(6): #Esse for vai ser responsável por criar 6 arquivos
  df_data.clear() #Limpa os dados para não haver concatenação 
  for id in range(10): #Esse for vai ser responsável por criar um dataframe com 10 linhas de dados aleatórios
    data_client = [
        faker.name(),
        choice(Estado),
        faker.job(),
        faker.date(),
        choice(Status)
    ]
    df_data.append(data_client)
  if client == 0:
    df = pd.DataFrame(df_data,columns=schema)
  else:
    df = pd.DataFrame(df_data,index=None)
  #Cria o arquivo parquet
  df.to_parquet(f'client_{client+1}.parquet',index=None)

#Visualiza um arquivo parquet
df = pd.read_parquet("client_2.parquet")
print(df.info())
print(df.dtypes)
print(df)

Com esse código, vamos criar arquivos parquet que tenham os esquemas de dados consistentes e compatíveis entre esses arquivos, isso será útil quando formos concatenar (juntar todos esses arquivos em um) os arquivos parquet.

Possíveis problemas que podem ocorrer ao concatenar arquivos sem uma consistência nos esquemas

  • Usar um schema definido em apenas um arquivo, e nos outros não, e depois tentar juntar
    • Se você tentar criar arquivos onde um tem um esquema definido e outros não, o pd.concat poderá lançar um erro ou gerar um dataframe com colunas incoerentes se os esquemas não forem compatíveis. Nesse caso, irá gerar dados do tipo NaN
    • Isso é tecnicamente inviável porque cada arquivo Parquet precisa ter um esquema para ser válido.
  • Não usar um schema definido em nenhum dos arquivos
    • Se todos os arquivos Parquet forem criados sem um esquema explícito, eles ainda terão esquemas implícitos baseados nos dados que eles contêm, permitindo assim a concatenação entre os DataFrames dos arquivos parquet. Porém, vale ressaltar que, um DataFrames sem um esquema explícito se torna mais complexo o gerenciamento e a análise
  • Ao tentar concatenar a um arquivo principal suas cópias
    • Ao criar cópias de um arquivo principal, e tentar efetuar a concatenação, pode ocorrer um problema de consistência entre os schemas. Em resumo, os schemas dentro das cópias do arquivo principal não serão respeitadas e tratadas como schema, podendo prejudicar o DataFrame gerado pela concatenação

2. Configuração na AWS

Primeiro, vamos nos serviços da Amazon S3 e criar uma bucket

  • Precisamos criar um nome único. No meu caso, usarei o nome data-parquet-list-clients-fagtec
  • Todas as outras configurações deixaremos como padrão

Após criada a bucket, vamos selecioná-la, ir em "Carregar" e carregar os seis arquivos parquet que criamos

Agora, vamos criar o usuário que usaremos para termos permissão de acessar a bucket criada através do Google Cloud Platform

  • Vamos nos serviços de IAM (Identity and Access Management), no painel esquerdo vamos em Políticas e depois em Criar política
  • Em Serviço, vamos selecionar o S3
  • Em ações permitidas, vamos selecionar o ListObject e o GetObject
    • Poderíamos marcar a opção "Todas as ações do S3", permitindo assim que tenhamos acesso completo ao serviço, porém, isso não é uma boa prática
    • Em nome, vamos digitar "Allow-transfer" e depois vamos clicar em Criar política
  • Agora, ainda no IAM, no painel esquerdo, vamos clicar em Usuários e depois em Criar usuário
    • Vamos atribuir um nome a esse usuário
    • Vamos em "Anexar políticas diretamente" e selecionar a política "Allow-transfer" que criamos
    • Vamos em Próximo e depois em criar usuário
  • Após a criação do u suário, vamos criar suas credenciais de segurança
    • Vamos selecionar o usuário, ir em "credenciais de segurança" e depois em "Criar chave de acesso"
    • Vamos selecionar "Serviços de terceiros", confirmar o checkbox abaixo, e clicar em "Próximo" -Em tags, vamos deixar vazias e continuar
    • E por fim, teremos nossa chave de acesso e chave secreta. Vamos baixar o arquivo contendo as credenciais e depois clicar em 'Concluído'
      • Após sair dessa tela, se não tivermos baixado o arquivo, não teremos mais acesso às credenciais, tendo que criar novamente.

3. Configuração do Google Storage Transfer Service

Para configurar o Cloud Storage Transfer, primeiramente devemos criar uma bucket dentro do Google Cloud Platform

No servico de cloud Storage, vamos criar a bucket data-parquet-list-clients-fagtec

Após isso, no serviço de Storage Transfer, vamos criar um transfer job

Action Option
Source Type Amazon S3
Destination Type Google Cloud Storage
Scheduling mode Batch
Bucket or folder Nome da bucket do Amazon S3
Authentication options Access key
Access key ID Chave de acesso criada no final do passo 2
Secret Access key Chave secreta criada no final do passo 2
Bucket Or Folder Nome da bucket do Google Cloud Storage

Por fim, vamos configurar quando que o job do Cloud Transfer será executado

Option
Run Every Day
Starting on
d/MM/YY 08:00 Am

Essa configuração irá executar o job todos os dias, a partir do dia seguinte, às 8 horas da manhã

  • Substituir o dd/mm/yy pela data equivalente ao dia seguinte

Por fim, em Choose Setting, vamos na seção When to overwrite e marcar a opção Alwals. E depois vamos criar o job

4. Processamento e Armazenamento de Dados

Erro de quota

Ao trabalhar com o BigQuery, há uma limitação na frequência de atualizações que você pode fazer nos metadados de uma tabela. Onde, nesse caso, o limite é de 5 operações por 10 segundos por tabela. Para evitar esse problema, usaremos a seguinte arquitetura de serviço

  • Cloud Scheduler: Configurar o Cloud Scheduler para disparar um evento todos os dias às 8 da manhã, de segunda a sexta.
    • Usaremos nesse serviço o modelo de cron para especificar o disparo do evento, para isso, usaremos o seguinte modelo:
      • "0 8 * * 1-5"
  • Pub/Sub: Configurar um tópico do Pub/Sub que será acionado pelo Cloud Scheduler.
    • Criaremos um tópico para que o Cloud schedule possa utilizar para triggar o Cloud Function
    • Assim, todos os dias, as 8 da manhã, o Cloud Function será chamado
  • Cloud Function: Criar uma função no Cloud Functions que será acionada pelo evento do Pub/Sub. A função irá ler os dados do Cloud Storage, depois irá processar esses dados usando Python e Pandas, e por fim, irá armazenar os dados processados no BigQuery.

Essa configuração permite um controle mais refinado sobre a taxa de chamadas às APIs, evitando exceder os limites de frequência e consequentes erros. Além disso, as operações são desacopladas, o que significa que modificações na programação ou no processamento não afetam outros componentes diretamente.

Código do Cloud Function:

import pandas as pd
import pandas_gbq
from google.cloud import storage
from google.cloud import bigquery
from datetime import datetime

def cs_to_bq(event, context):
    today = datetime.now()

    today_year = today.year
    today_month = today.month
    today_day = today.day
    bucket_name = 'upload_objects_data_engineer_challenger'

    dataset_id = "dados_by_looker"
    project_id = "projeto-estudos-415711"
    table_id = "{}.{}.{}".format(project_id, dataset_id, "Clients")

    storage_client = storage.Client()
    client = bigquery.Client()

    # source_bucket = storage_client.bucket(bucket_name)
    blobs = storage_client.list_blobs(bucket_name)
    
    dfs = []
    print(0)
    for blob in blobs:
        uri = 'gs://' + bucket_name + "/" + blob.name
        df = pd.read_parquet(uri, engine='pyarrow')
        dfs.append(df)

    today = pd.Timestamp.now()
    timestamp_today = f'{today.year}-{today.month}-{today.day} {today.hour}:{today.minute}:{today.second}'

    combined_df = pd.concat(dfs, ignore_index=True)
    combined_df["birthday"] = pd.to_datetime(combined_df["birthday"])
    print("-"*10)
    print(combined_df["birthday"].dt.year)
    print("-"*10)
    combined_df['timestamp_processamento_gcp'] = pd.to_datetime(timestamp_today)
    
    year = combined_df["birthday"].dt.year
    month = combined_df["birthday"].dt.month
    day = combined_df["birthday"].dt.day

    age = today_year - year

    # Criando uma máscara para verificar se ainda não chegou o aniversário no ano corrente
    not_yet_birthday = (today_month < month) | ((today_month == month) & (today_day < day))
    # Aplicando a máscara: se não chegou ao aniversário, subtrai 1
    age = age - not_yet_birthday.astype(int)
    combined_df["age"] = pd.to_numeric(age)
    combined_df["year"] = pd.to_numeric(year)

    client.delete_table(table_id, not_found_ok=True)  # Make an API request.
    print("Deleted table '{}'.".format(table_id))
    pandas_gbq.to_gbq(combined_df, table_id, project_id=project_id)
    print(combined_df)

About

FegTec é uma empresa fictícia que quer transferir arquivos parquet contendo dados dos clientes da nuvem AWS para a Google Cloud

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages