← Blog

Pipelines de dados na AWS: Glue, Lambda e Redshift na prática

2025-03-12AWSETLData Engineering

Por que pipelines de dados importam para ML

Modelos de ML dependem de dados limpos, atualizados e rastreáveis. Em produção, os dados raramente chegam prontos — vêm de APIs, arquivos batch, streams de transações e bases legadas. Um pipeline de dados ETL (Extract, Transform, Load) garante que o que alimenta o treino e o scoring passa por limpeza, validação e tipagem consistente.

Na AWS, três serviços cobrem a maior parte dos casos: S3 para storage, Glue para ETL em escala, Lambda para processamento event-driven, e Redshift como data warehouse para analytics e feature engineering. Este post mostra como encaixar esses blocos em um fluxo coerente.

Arquitetura do pipeline

O fluxo típico para dados transacionais em fintech:

Pipeline de dados AWS

Arquivos brutos chegam no S3 (CSV, JSON, Parquet). Eventos disparam o próximo estágio via EventBridge ou schedule.

ins3://raw/transactions/
outs3://staging/

Cada estágio tem responsabilidade clara: S3 armazena, Glue transforma em batch, Lambda reage a eventos pontuais, Redshift serve SQL para analytics e treino.

Ingestão: S3 como data lake

S3 é o ponto de entrada. Organize por camadas:

s3://data-lake/
├── raw/           # dados brutos, imutáveis
│   └── transactions/year=2025/month=03/
├── staging/       # pós-limpeza Glue
└── curated/       # prontos para analytics/ML

Upload via SDK, SFTP, ou Kinesis Firehose para streaming:

import boto3
import json
from datetime import datetime

s3 = boto3.client("s3")

def ingest_transaction(event: dict):
    now = datetime.utcnow()
    key = (
        f"raw/transactions/"
        f"year={now.year}/month={now.month:02d}/"
        f"{event['transaction_id']}.json"
    )
    s3.put_object(
        Bucket="data-lake",
        Key=key,
        Body=json.dumps(event),
        ContentType="application/json",
    )

Particionar por data (year=/month=) acelera queries no Athena e Glue — o motor lê apenas as partições relevantes.

ETL com AWS Glue

Glue executa jobs Spark (PySpark) gerenciados — sem cluster para provisionar. O Glue Data Catalog registra schemas automaticamente.

Job PySpark de limpeza:

# glue_job_clean_transactions.py
import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Ler dados brutos do Catálogo
raw = glueContext.create_dynamic_frame.from_catalog(
    database="data_lake",
    table_name="raw_transactions",
)

# Tipagem e seleção de colunas
mapped = ApplyMapping.apply(
    frame=raw,
    mappings=[
        ("transaction_id", "string", "transaction_id", "string"),
        ("amount", "string", "amount", "double"),
        ("customer_id", "string", "customer_id", "string"),
        ("timestamp", "string", "event_time", "timestamp"),
    ],
)

# Deduplicação e filtro de qualidade
df = mapped.toDF().dropDuplicates(["transaction_id"])
df = df.filter(df.amount > 0)

# Escrever Parquet particionado em staging
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(df, glueContext, "clean"),
    connection_type="s3",
    connection_options={"path": "s3://data-lake/staging/transactions/"},
    format="parquet",
)

Agende o job via Glue Triggers (cron) ou EventBridge quando novos arquivos chegam no S3.

Processamento event-driven com Lambda

Lambda complementa Glue em tarefas leves e event-driven — validação, enriquecimento, notificação de erro:

# lambda_validate_upload.py
import boto3
import json

s3 = boto3.client("s3")
sns = boto3.client("sns")

REQUIRED_FIELDS = ["transaction_id", "amount", "customer_id", "timestamp"]

def handler(event, context):
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]

    obj = s3.get_object(Bucket=bucket, Key=key)
    records = json.loads(obj["Body"].read())

    errors = []
    for record in records:
        missing = [f for f in REQUIRED_FIELDS if f not in record]
        if missing:
            errors.append({"id": record.get("transaction_id"), "missing": missing})

    if errors:
        sns.publish(
            TopicArn="arn:aws:sns:us-east-1:123456789:data-quality-alerts",
            Subject=f"Data quality failure: {key}",
            Message=json.dumps(errors, indent=2),
        )
        return {"statusCode": 422, "body": json.dumps(errors)}

    return {"statusCode": 200, "body": "valid"}

Configure o trigger S3 → Lambda no console ou Terraform. Para volumes altos, prefira SQS entre S3 e Lambda — evita throttling.

Analytics e features no Redshift

Redshift recebe dados curados para SQL analytics e feature engineering:

-- Criar tabela de fatos a partir do staging
COPY analytics.fact_transactions
FROM 's3://data-lake/staging/transactions/'
IAM_ROLE 'arn:aws:iam::123456789:role/RedshiftCopyRole'
FORMAT AS PARQUET;

-- Feature agregada para ML: gastos por cliente nos últimos 30 dias
CREATE TABLE ml_features.customer_tx_30d AS
SELECT
    customer_id,
    COUNT(*) AS tx_count_30d,
    SUM(amount) AS tx_sum_30d,
    AVG(amount) AS tx_mean_30d,
    MAX(amount) AS tx_max_30d
FROM analytics.fact_transactions
WHERE event_time >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY customer_id;

Redshift Spectrum permite query direto no S3 sem COPY — útil para exploração. Para produção, COPY ou materialized views garantem performance previsível.

Exporte features para o treino:

import redshift_connector
import pandas as pd

conn = redshift_connector.connect(
    host="my-cluster.region.redshift.amazonaws.com",
    database="analytics",
    user="ml_user",
    password="...",
)

df = pd.read_sql("SELECT * FROM ml_features.customer_tx_30d", conn)
df.to_parquet("s3://data-lake/curated/features/customer_tx_30d.parquet")

Orquestração com Airflow ou EventBridge

Pipelines completos precisam de orquestração. Duas opções comuns:

Apache Airflow (MWAA na AWS) — DAGs Python com dependências explícitas:

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

with DAG("daily_etl", schedule_interval="0 6 * * *") as dag:
    run_glue = GlueJobOperator(
        task_id="clean_transactions",
        job_name="clean-transactions",
    )
    load_redshift = RedshiftSQLOperator(
        task_id="copy_to_redshift",
        sql="sql/copy_transactions.sql",
    )
    run_glue >> load_redshift

EventBridge — regras baseadas em eventos S3 para pipelines mais simples sem Airflow.

Além do básico: pontos de atenção

Boas práticas e armadilhas em pipelines AWS:

  • Camadas raw/staging/curated — nunca sobrescreva raw; reprodutibilidade depende de dados imutáveis
  • IAM least privilege — Glue, Lambda e Redshift com roles separadas e mínimas
  • Custo Glue — jobs Spark têm custo por DPU-hora; otimize partições e evite scans completos
  • Lambda timeout — máximo 15 min; jobs pesados vão para Glue, não Lambda
  • Schema evolution — novos campos em raw devem ser tratados no Glue com defaults, não quebrar o pipeline
  • Idempotência — jobs devem poder rerodar sem duplicar dados (dedup por chave natural)

Integração com MLOps:

  • DVC remote no S3 aponta para curated/dvc pull no CI puxa features prontas
  • Glue job gera metrics.json de qualidade (null rate, row count) — gate no CI
  • Features versionadas por partição de data — reprodutibilidade temporal

Conclusão

Um pipeline de dados na AWS segue o fluxo S3 → Glue → Lambda → Redshift: ingestão imutável, transformação em batch, validação event-driven e analytics SQL. Essa base alimenta feature engineering, treino e monitoramento de drift.

Comece com raw particionado no S3, um job Glue de limpeza, e COPY para Redshift. Adicione Lambda para qualidade e Airflow para orquestração conforme o volume cresce. Com dados confiáveis no lugar, o ciclo MLOps completo — versionamento, CI/CD e monitoramento — tem fundação sólida.

Referencias

  1. AWS Glue — Documentação
  2. AWS Glue — PySpark jobs
  3. AWS Lambda — Documentação
  4. Amazon S3 — Best practices
  5. Amazon Redshift — COPY command
  6. Amazon EventBridge — Documentação
  7. Apache Airflow — AWS providers