Por que pipelines de dados importam para ML
Modelos de ML dependem de dados limpos, atualizados e rastreáveis. Em produção, os dados raramente chegam prontos — vêm de APIs, arquivos batch, streams de transações e bases legadas. Um pipeline de dados ETL (Extract, Transform, Load) garante que o que alimenta o treino e o scoring passa por limpeza, validação e tipagem consistente.
Na AWS, três serviços cobrem a maior parte dos casos: S3 para storage, Glue para ETL em escala, Lambda para processamento event-driven, e Redshift como data warehouse para analytics e feature engineering. Este post mostra como encaixar esses blocos em um fluxo coerente.
Arquitetura do pipeline
O fluxo típico para dados transacionais em fintech:
Arquivos brutos chegam no S3 (CSV, JSON, Parquet). Eventos disparam o próximo estágio via EventBridge ou schedule.
Cada estágio tem responsabilidade clara: S3 armazena, Glue transforma em batch, Lambda reage a eventos pontuais, Redshift serve SQL para analytics e treino.
Ingestão: S3 como data lake
S3 é o ponto de entrada. Organize por camadas:
s3://data-lake/
├── raw/ # dados brutos, imutáveis
│ └── transactions/year=2025/month=03/
├── staging/ # pós-limpeza Glue
└── curated/ # prontos para analytics/ML
Upload via SDK, SFTP, ou Kinesis Firehose para streaming:
import boto3
import json
from datetime import datetime
s3 = boto3.client("s3")
def ingest_transaction(event: dict):
now = datetime.utcnow()
key = (
f"raw/transactions/"
f"year={now.year}/month={now.month:02d}/"
f"{event['transaction_id']}.json"
)
s3.put_object(
Bucket="data-lake",
Key=key,
Body=json.dumps(event),
ContentType="application/json",
)
Particionar por data (year=/month=) acelera queries no Athena e Glue — o motor lê apenas as partições relevantes.
ETL com AWS Glue
Glue executa jobs Spark (PySpark) gerenciados — sem cluster para provisionar. O Glue Data Catalog registra schemas automaticamente.
Job PySpark de limpeza:
# glue_job_clean_transactions.py
import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Ler dados brutos do Catálogo
raw = glueContext.create_dynamic_frame.from_catalog(
database="data_lake",
table_name="raw_transactions",
)
# Tipagem e seleção de colunas
mapped = ApplyMapping.apply(
frame=raw,
mappings=[
("transaction_id", "string", "transaction_id", "string"),
("amount", "string", "amount", "double"),
("customer_id", "string", "customer_id", "string"),
("timestamp", "string", "event_time", "timestamp"),
],
)
# Deduplicação e filtro de qualidade
df = mapped.toDF().dropDuplicates(["transaction_id"])
df = df.filter(df.amount > 0)
# Escrever Parquet particionado em staging
glueContext.write_dynamic_frame.from_options(
frame=DynamicFrame.fromDF(df, glueContext, "clean"),
connection_type="s3",
connection_options={"path": "s3://data-lake/staging/transactions/"},
format="parquet",
)
Agende o job via Glue Triggers (cron) ou EventBridge quando novos arquivos chegam no S3.
Processamento event-driven com Lambda
Lambda complementa Glue em tarefas leves e event-driven — validação, enriquecimento, notificação de erro:
# lambda_validate_upload.py
import boto3
import json
s3 = boto3.client("s3")
sns = boto3.client("sns")
REQUIRED_FIELDS = ["transaction_id", "amount", "customer_id", "timestamp"]
def handler(event, context):
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = event["Records"][0]["s3"]["object"]["key"]
obj = s3.get_object(Bucket=bucket, Key=key)
records = json.loads(obj["Body"].read())
errors = []
for record in records:
missing = [f for f in REQUIRED_FIELDS if f not in record]
if missing:
errors.append({"id": record.get("transaction_id"), "missing": missing})
if errors:
sns.publish(
TopicArn="arn:aws:sns:us-east-1:123456789:data-quality-alerts",
Subject=f"Data quality failure: {key}",
Message=json.dumps(errors, indent=2),
)
return {"statusCode": 422, "body": json.dumps(errors)}
return {"statusCode": 200, "body": "valid"}
Configure o trigger S3 → Lambda no console ou Terraform. Para volumes altos, prefira SQS entre S3 e Lambda — evita throttling.
Analytics e features no Redshift
Redshift recebe dados curados para SQL analytics e feature engineering:
-- Criar tabela de fatos a partir do staging
COPY analytics.fact_transactions
FROM 's3://data-lake/staging/transactions/'
IAM_ROLE 'arn:aws:iam::123456789:role/RedshiftCopyRole'
FORMAT AS PARQUET;
-- Feature agregada para ML: gastos por cliente nos últimos 30 dias
CREATE TABLE ml_features.customer_tx_30d AS
SELECT
customer_id,
COUNT(*) AS tx_count_30d,
SUM(amount) AS tx_sum_30d,
AVG(amount) AS tx_mean_30d,
MAX(amount) AS tx_max_30d
FROM analytics.fact_transactions
WHERE event_time >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY customer_id;
Redshift Spectrum permite query direto no S3 sem COPY — útil para exploração. Para produção, COPY ou materialized views garantem performance previsível.
Exporte features para o treino:
import redshift_connector
import pandas as pd
conn = redshift_connector.connect(
host="my-cluster.region.redshift.amazonaws.com",
database="analytics",
user="ml_user",
password="...",
)
df = pd.read_sql("SELECT * FROM ml_features.customer_tx_30d", conn)
df.to_parquet("s3://data-lake/curated/features/customer_tx_30d.parquet")
Orquestração com Airflow ou EventBridge
Pipelines completos precisam de orquestração. Duas opções comuns:
Apache Airflow (MWAA na AWS) — DAGs Python com dependências explícitas:
from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
with DAG("daily_etl", schedule_interval="0 6 * * *") as dag:
run_glue = GlueJobOperator(
task_id="clean_transactions",
job_name="clean-transactions",
)
load_redshift = RedshiftSQLOperator(
task_id="copy_to_redshift",
sql="sql/copy_transactions.sql",
)
run_glue >> load_redshift
EventBridge — regras baseadas em eventos S3 para pipelines mais simples sem Airflow.
Além do básico: pontos de atenção
Boas práticas e armadilhas em pipelines AWS:
- Camadas raw/staging/curated — nunca sobrescreva raw; reprodutibilidade depende de dados imutáveis
- IAM least privilege — Glue, Lambda e Redshift com roles separadas e mínimas
- Custo Glue — jobs Spark têm custo por DPU-hora; otimize partições e evite scans completos
- Lambda timeout — máximo 15 min; jobs pesados vão para Glue, não Lambda
- Schema evolution — novos campos em raw devem ser tratados no Glue com defaults, não quebrar o pipeline
- Idempotência — jobs devem poder rerodar sem duplicar dados (dedup por chave natural)
Integração com MLOps:
- DVC remote no S3 aponta para
curated/—dvc pullno CI puxa features prontas - Glue job gera
metrics.jsonde qualidade (null rate, row count) — gate no CI - Features versionadas por partição de data — reprodutibilidade temporal
Conclusão
Um pipeline de dados na AWS segue o fluxo S3 → Glue → Lambda → Redshift: ingestão imutável, transformação em batch, validação event-driven e analytics SQL. Essa base alimenta feature engineering, treino e monitoramento de drift.
Comece com raw particionado no S3, um job Glue de limpeza, e COPY para Redshift. Adicione Lambda para qualidade e Airflow para orquestração conforme o volume cresce. Com dados confiáveis no lugar, o ciclo MLOps completo — versionamento, CI/CD e monitoramento — tem fundação sólida.