O Prefect é uma plataforma moderna de orquestração de workflows que se destaca pela simplicidade e pela abordagem "Pythonic" na definição de pipelines. Para consumir a API de CPF da CPFHub.io dentro de um flow Prefect, você define tasks assíncronas com retentativas automáticas e agrupa a lógica de consulta em um flow principal que controla concorrência e observabilidade. A API aceita uma requisição GET https://api.cpfhub.io/cpf/{CPF} com o header x-api-key e responde em ~900ms com os dados do titular.
1. Pré-requisitos
-
Python 3.10+ instalado.
-
Pacotes necessários:
pip install prefect httpx pandas. -
Uma conta gratuita na CPFHub.io
2. Configure os blocos do Prefect
O Prefect utiliza o conceito de Blocks para armazenar configurações e credenciais de forma segura. Consulte a documentação oficial do Prefect para detalhes sobre gerenciamento de secrets:
# setup_blocks.py
from prefect.blocks.system import Secret, JSON
# Armazenar a chave de API como secret
secret_block = Secret(value="SUA_CHAVE_DE_API")
secret_block.save("cpfhub-api-key", overwrite=True)
# Armazenar configurações gerais
config_block = JSON(value={
"base_url": "https://api.cpfhub.io",
"timeout": 5,
"max_retries": 3,
})
config_block.save("cpfhub-config", overwrite=True)
print("Blocos configurados com sucesso!")
Execute o script uma vez para registrar os blocos:
python setup_blocks.py
3. Crie as tasks de consulta
Defina tasks Prefect com retentativas e tratamento de erros:
# tasks/cpf_tasks.py
import re
import httpx
from prefect import task, get_run_logger
from prefect.blocks.system import Secret, JSON
@task(
name="consultar-cpf",
retries=3,
retry_delay_seconds=10,
tags=["cpfhub", "api"],
)
async def consultar_cpf(cpf: str) -> dict:
"""
Consulta dados de um CPF na API da CPFHub.io.
Task com retentativas automáticas em caso de falha.
"""
logger = get_run_logger()
cpf_limpo = re.sub(r"\D", "", cpf)
if len(cpf_limpo) != 11:
logger.warning(f"CPF inválido: {cpf}")
return {"cpf": cpf_limpo, "error": "CPF inválido", "success": False}
# Carregar configurações dos blocos
api_key = await Secret.load("cpfhub-api-key")
config = await JSON.load("cpfhub-config")
config_data = config.value
url = f"{config_data['base_url']}/cpf/{cpf_limpo}"
headers = {
"x-api-key": api_key.get(),
"Accept": "application/json",
}
logger.info(f"Consultando CPF: {cpf_limpo}")
async with httpx.AsyncClient(timeout=config_data["timeout"]) as client:
try:
response = await client.get(url, headers=headers)
except httpx.TimeoutException:
logger.error(f"Timeout na consulta do CPF {cpf_limpo}")
raise # Prefect fará retry automático
except httpx.RequestError as e:
logger.error(f"Erro de conexão: {str(e)}")
raise
if response.status_code == 200:
data = response.json()
if data.get("success"):
logger.info(f"CPF {cpf_limpo} consultado com sucesso")
return {**data["data"], "success": True, "error": None}
error_map = {
400: "Formato inválido",
401: "API key inválida",
404: "Não encontrado",
}
error_msg = error_map.get(response.status_code, f"HTTP {response.status_code}")
logger.warning(f"Erro na consulta: {error_msg}")
return {"cpf": cpf_limpo, "error": error_msg, "success": False}
@task(name="gerar-relatorio", tags=["relatorio"])
def gerar_relatorio(resultados: list[dict]) -> dict:
"""Gera relatório consolidado de validação."""
logger = get_run_logger()
total = len(resultados)
sucesso = sum(1 for r in resultados if r.get("success"))
falha = total - sucesso
relatorio = {
"total": total,
"sucesso": sucesso,
"falha": falha,
"taxa_sucesso": f"{(sucesso/total*100):.1f}%" if total > 0 else "0%",
}
logger.info(f"Relatório: {total} CPFs | {sucesso} válidos | {falha} erros")
return relatorio
4. Crie o flow principal
O flow orquestra as tasks e gerencia o fluxo de execução:
# flows/cpf_validation_flow.py
import asyncio
from datetime import timedelta
from prefect import flow, get_run_logger
from tasks.cpf_tasks import consultar_cpf, gerar_relatorio
@flow(
name="cpf-validation-pipeline",
description="Pipeline de validação de CPF via API CPFHub.io",
timeout_seconds=600,
retries=1,
retry_delay_seconds=60,
)
async def cpf_validation_pipeline(cpfs: list[str]) -> dict:
"""
Flow principal de validação de CPF.
Processa uma lista de CPFs com concorrência controlada.
"""
logger = get_run_logger()
logger.info(f"Iniciando validação de {len(cpfs)} CPFs")
# Consultar CPFs com concorrência controlada
resultados = []
batch_size = 5 # Processar 5 por vez
for i in range(0, len(cpfs), batch_size):
batch = cpfs[i:i + batch_size]
logger.info(f"Processando lote {i // batch_size + 1}")
# Executar batch de forma assíncrona
tasks = [consultar_cpf(cpf) for cpf in batch]
batch_results = await asyncio.gather(*tasks)
resultados.extend(batch_results)
# Delay entre lotes
if i + batch_size < len(cpfs):
await asyncio.sleep(1)
# Gerar relatório
relatorio = gerar_relatorio(resultados)
logger.info("Pipeline concluído com sucesso")
return relatorio
if __name__ == "__main__":
cpfs_teste = [
"12345678900",
"98765432100",
"11122233344",
]
resultado = asyncio.run(cpf_validation_pipeline(cpfs_teste))
print(resultado)
5. Agende o flow com deployments
Configure o deployment para execução agendada:
# deploy.py
from flows.cpf_validation_flow import cpf_validation_pipeline
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
deployment = Deployment.build_from_flow(
flow=cpf_validation_pipeline,
name="cpf-validation-semanal",
schedule=CronSchedule(cron="0 8 * * 1"), # Segunda às 8h
parameters={"cpfs": []}, # Parâmetros padrão
tags=["cpfhub", "validacao"],
description="Validação semanal de CPFs via API CPFHub.io",
)
if __name__ == "__main__":
deployment.apply()
print("Deployment criado com sucesso!")
python deploy.py
6. Adicione observabilidade com artifacts
Use Prefect Artifacts para registrar resultados persistentes:
# tasks/cpf_tasks.py (adição)
from prefect.artifacts import create_table_artifact, create_markdown_artifact
@task(name="publicar-resultados")
async def publicar_resultados(resultados: list[dict], relatorio: dict):
"""Publica resultados como artifacts do Prefect."""
# Tabela com resultados detalhados
table_data = [
{"CPF": r.get("cpf", "N/A"), "Nome": r.get("name", "N/A"), "Status": "OK" if r.get("success") else r.get("error", "Erro")}
for r in resultados
]
await create_table_artifact(
key="cpf-validation-results",
table=table_data,
description="Resultados da validação de CPF",
)
# Resumo em markdown
markdown = f"""
## Relatório de Validação de CPF
| Métrica | Valor |
|---------|-------|
| Total | {relatorio['total']} |
| Sucesso | {relatorio['sucesso']} |
| Falha | {relatorio['falha']} |
| Taxa | {relatorio['taxa_sucesso']} |
"""
await create_markdown_artifact(
key="cpf-validation-summary",
markdown=markdown,
description="Resumo da validação de CPF",
)
7. Boas práticas
-
Blocos -- Use Secret blocks para armazenar a chave de API. Nunca exponha credenciais diretamente no código dos flows.
-
Retentativas -- Configure
retriesnas tasks para lidar com falhas temporárias. O Prefect faz retry automático com backoff. -
Concorrência -- Controle o número de consultas simultâneas com
batch_size. Um valor de 5 é um bom ponto de partida para manter o fluxo estável. -
Timeout -- Configure timeout tanto na task (via Prefect) quanto na requisição HTTP (via httpx) para evitar execuções travadas.
-
Artifacts -- Publique resultados como artifacts para manter histórico de execuções acessível no Prefect UI.
-
LGPD -- A API da CPFHub.io é 100% compatível com a LGPD. Certifique-se de que os dados processados pelo workflow respeitem as políticas de privacidade.
Perguntas frequentes
O que é necessário para integrar a API de CPF em um flow Prefect?
Para integrar a API da CPFHub.io em um workflow Prefect, você precisa de uma chave de API armazenada em um Secret Block, do pacote httpx para requisições assíncronas e de uma task decorada com @task que faça a chamada GET https://api.cpfhub.io/cpf/{CPF}. O Prefect gerencia retentativas, observabilidade e agendamento automaticamente, sem código adicional.
Como controlar a concorrência de consultas em pipelines de dados?
O padrão mais simples é processar CPFs em lotes com asyncio.gather, controlando o batch_size para limitar requisições simultâneas. A API da CPFHub.io responde em ~900ms, então lotes de 5 permitem processar um volume grande sem sobrecarregar a rede. O plano gratuito oferece 50 consultas/mês; ao ultrapassar, a API não bloqueia — cobra R$0,15 por consulta extra.
Como o Prefect lida com falhas temporárias na consulta de CPF?
O parâmetro retries=3 na task faz com que o Prefect reexecute automaticamente em caso de exceção, com o intervalo definido em retry_delay_seconds. Timeouts e erros de conexão devem ser relançados com raise para que o mecanismo de retry seja acionado. Erros de negócio (CPF inválido, chave incorreta) devem retornar um dicionário de erro sem lançar exceção.
Como garantir conformidade com a LGPD ao processar CPFs em workflows automatizados?
Use o CPF apenas para a finalidade declarada ao titular, armazene apenas o necessário nos artifacts e implemente controle de acesso aos logs de execução do Prefect. A ANPD orienta que dados de identificação devem ser tratados com o princípio da necessidade — documente a base legal para o tratamento em pipelines de dados.
Conclusão
Consumir a API da CPFHub.io
Cadastre-se em cpfhub.io
CPFHub.io
Pronto para integrar a API?
50 consultas gratuitas para testar agora. Sem cartão de crédito. Acesso imediato à documentação.
Sobre a redação
Redação CPFHub.io
Time editorial especializado em APIs de CPF, identidade digital e compliance no mercado brasileiro. Produzimos guias técnicos, análises regulatórias e tutoriais sobre LGPD e KYC para desenvolvedores e líderes de produto.



