Como consumir API de CPF em Prefect para orquestração de workflows

Aprenda a consumir a API de CPF da CPFHub.io em workflows Prefect com flows, tasks e concorrência assíncrona para pipelines de dados.

Redação CPFHub.io
Redação CPFHub.io
··7 min de leitura
Como consumir API de CPF em Prefect para orquestração de workflows

O Prefect é uma plataforma moderna de orquestração de workflows que se destaca pela simplicidade e pela abordagem "Pythonic" na definição de pipelines. Para consumir a API de CPF da CPFHub.io dentro de um flow Prefect, você define tasks assíncronas com retentativas automáticas e agrupa a lógica de consulta em um flow principal que controla concorrência e observabilidade. A API aceita uma requisição GET https://api.cpfhub.io/cpf/{CPF} com o header x-api-key e responde em ~900ms com os dados do titular.


1. Pré-requisitos

  • Python 3.10+ instalado.

  • Pacotes necessários: pip install prefect httpx pandas.

  • Uma conta gratuita na CPFHub.io


2. Configure os blocos do Prefect

O Prefect utiliza o conceito de Blocks para armazenar configurações e credenciais de forma segura. Consulte a documentação oficial do Prefect para detalhes sobre gerenciamento de secrets:

# setup_blocks.py
from prefect.blocks.system import Secret, JSON

# Armazenar a chave de API como secret
secret_block = Secret(value="SUA_CHAVE_DE_API")
secret_block.save("cpfhub-api-key", overwrite=True)

# Armazenar configurações gerais
config_block = JSON(value={
    "base_url": "https://api.cpfhub.io",
    "timeout": 5,
    "max_retries": 3,
})
config_block.save("cpfhub-config", overwrite=True)

print("Blocos configurados com sucesso!")

Execute o script uma vez para registrar os blocos:

python setup_blocks.py

3. Crie as tasks de consulta

Defina tasks Prefect com retentativas e tratamento de erros:

# tasks/cpf_tasks.py
import re
import httpx
from prefect import task, get_run_logger
from prefect.blocks.system import Secret, JSON

@task(
    name="consultar-cpf",
    retries=3,
    retry_delay_seconds=10,
    tags=["cpfhub", "api"],
)
async def consultar_cpf(cpf: str) -> dict:
    """
    Consulta dados de um CPF na API da CPFHub.io.
    Task com retentativas automáticas em caso de falha.
    """
    logger = get_run_logger()
    cpf_limpo = re.sub(r"\D", "", cpf)

    if len(cpf_limpo) != 11:
    logger.warning(f"CPF inválido: {cpf}")
    return {"cpf": cpf_limpo, "error": "CPF inválido", "success": False}

    # Carregar configurações dos blocos
    api_key = await Secret.load("cpfhub-api-key")
    config = await JSON.load("cpfhub-config")
    config_data = config.value

    url = f"{config_data['base_url']}/cpf/{cpf_limpo}"
    headers = {
    "x-api-key": api_key.get(),
    "Accept": "application/json",
    }

    logger.info(f"Consultando CPF: {cpf_limpo}")

    async with httpx.AsyncClient(timeout=config_data["timeout"]) as client:
    try:
    response = await client.get(url, headers=headers)
    except httpx.TimeoutException:
    logger.error(f"Timeout na consulta do CPF {cpf_limpo}")
    raise # Prefect fará retry automático
    except httpx.RequestError as e:
    logger.error(f"Erro de conexão: {str(e)}")
    raise

    if response.status_code == 200:
    data = response.json()
    if data.get("success"):
    logger.info(f"CPF {cpf_limpo} consultado com sucesso")
    return {**data["data"], "success": True, "error": None}

    error_map = {
    400: "Formato inválido",
    401: "API key inválida",
    404: "Não encontrado",
    }
    error_msg = error_map.get(response.status_code, f"HTTP {response.status_code}")
    logger.warning(f"Erro na consulta: {error_msg}")

    return {"cpf": cpf_limpo, "error": error_msg, "success": False}

@task(name="gerar-relatorio", tags=["relatorio"])
def gerar_relatorio(resultados: list[dict]) -> dict:
    """Gera relatório consolidado de validação."""
    logger = get_run_logger()

    total = len(resultados)
    sucesso = sum(1 for r in resultados if r.get("success"))
    falha = total - sucesso

    relatorio = {
    "total": total,
    "sucesso": sucesso,
    "falha": falha,
    "taxa_sucesso": f"{(sucesso/total*100):.1f}%" if total > 0 else "0%",
    }

    logger.info(f"Relatório: {total} CPFs | {sucesso} válidos | {falha} erros")
    return relatorio

4. Crie o flow principal

O flow orquestra as tasks e gerencia o fluxo de execução:

# flows/cpf_validation_flow.py
import asyncio
from datetime import timedelta
from prefect import flow, get_run_logger
from tasks.cpf_tasks import consultar_cpf, gerar_relatorio

@flow(
    name="cpf-validation-pipeline",
    description="Pipeline de validação de CPF via API CPFHub.io",
    timeout_seconds=600,
    retries=1,
    retry_delay_seconds=60,
)
async def cpf_validation_pipeline(cpfs: list[str]) -> dict:
    """
    Flow principal de validação de CPF.
    Processa uma lista de CPFs com concorrência controlada.
    """
    logger = get_run_logger()
    logger.info(f"Iniciando validação de {len(cpfs)} CPFs")

    # Consultar CPFs com concorrência controlada
    resultados = []
    batch_size = 5 # Processar 5 por vez

    for i in range(0, len(cpfs), batch_size):
    batch = cpfs[i:i + batch_size]
    logger.info(f"Processando lote {i // batch_size + 1}")

    # Executar batch de forma assíncrona
    tasks = [consultar_cpf(cpf) for cpf in batch]
    batch_results = await asyncio.gather(*tasks)
    resultados.extend(batch_results)

    # Delay entre lotes
    if i + batch_size < len(cpfs):
    await asyncio.sleep(1)

    # Gerar relatório
    relatorio = gerar_relatorio(resultados)

    logger.info("Pipeline concluído com sucesso")
    return relatorio

if __name__ == "__main__":
    cpfs_teste = [
    "12345678900",
    "98765432100",
    "11122233344",
    ]
    resultado = asyncio.run(cpf_validation_pipeline(cpfs_teste))
    print(resultado)

5. Agende o flow com deployments

Configure o deployment para execução agendada:

# deploy.py
from flows.cpf_validation_flow import cpf_validation_pipeline
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

deployment = Deployment.build_from_flow(
    flow=cpf_validation_pipeline,
    name="cpf-validation-semanal",
    schedule=CronSchedule(cron="0 8 * * 1"), # Segunda às 8h
    parameters={"cpfs": []}, # Parâmetros padrão
    tags=["cpfhub", "validacao"],
    description="Validação semanal de CPFs via API CPFHub.io",
)

if __name__ == "__main__":
    deployment.apply()
    print("Deployment criado com sucesso!")
python deploy.py

6. Adicione observabilidade com artifacts

Use Prefect Artifacts para registrar resultados persistentes:

# tasks/cpf_tasks.py (adição)
from prefect.artifacts import create_table_artifact, create_markdown_artifact

@task(name="publicar-resultados")
async def publicar_resultados(resultados: list[dict], relatorio: dict):
    """Publica resultados como artifacts do Prefect."""
    # Tabela com resultados detalhados
    table_data = [
    {"CPF": r.get("cpf", "N/A"), "Nome": r.get("name", "N/A"), "Status": "OK" if r.get("success") else r.get("error", "Erro")}
    for r in resultados
    ]
    await create_table_artifact(
    key="cpf-validation-results",
    table=table_data,
    description="Resultados da validação de CPF",
    )

    # Resumo em markdown
    markdown = f"""
## Relatório de Validação de CPF

| Métrica | Valor |
|---------|-------|
| Total | {relatorio['total']} |
| Sucesso | {relatorio['sucesso']} |
| Falha | {relatorio['falha']} |
| Taxa | {relatorio['taxa_sucesso']} |
"""
    await create_markdown_artifact(
    key="cpf-validation-summary",
    markdown=markdown,
    description="Resumo da validação de CPF",
    )

7. Boas práticas

  • Blocos -- Use Secret blocks para armazenar a chave de API. Nunca exponha credenciais diretamente no código dos flows.

  • Retentativas -- Configure retries nas tasks para lidar com falhas temporárias. O Prefect faz retry automático com backoff.

  • Concorrência -- Controle o número de consultas simultâneas com batch_size. Um valor de 5 é um bom ponto de partida para manter o fluxo estável.

  • Timeout -- Configure timeout tanto na task (via Prefect) quanto na requisição HTTP (via httpx) para evitar execuções travadas.

  • Artifacts -- Publique resultados como artifacts para manter histórico de execuções acessível no Prefect UI.

  • LGPD -- A API da CPFHub.io é 100% compatível com a LGPD. Certifique-se de que os dados processados pelo workflow respeitem as políticas de privacidade.


Perguntas frequentes

O que é necessário para integrar a API de CPF em um flow Prefect?

Para integrar a API da CPFHub.io em um workflow Prefect, você precisa de uma chave de API armazenada em um Secret Block, do pacote httpx para requisições assíncronas e de uma task decorada com @task que faça a chamada GET https://api.cpfhub.io/cpf/{CPF}. O Prefect gerencia retentativas, observabilidade e agendamento automaticamente, sem código adicional.

Como controlar a concorrência de consultas em pipelines de dados?

O padrão mais simples é processar CPFs em lotes com asyncio.gather, controlando o batch_size para limitar requisições simultâneas. A API da CPFHub.io responde em ~900ms, então lotes de 5 permitem processar um volume grande sem sobrecarregar a rede. O plano gratuito oferece 50 consultas/mês; ao ultrapassar, a API não bloqueia — cobra R$0,15 por consulta extra.

Como o Prefect lida com falhas temporárias na consulta de CPF?

O parâmetro retries=3 na task faz com que o Prefect reexecute automaticamente em caso de exceção, com o intervalo definido em retry_delay_seconds. Timeouts e erros de conexão devem ser relançados com raise para que o mecanismo de retry seja acionado. Erros de negócio (CPF inválido, chave incorreta) devem retornar um dicionário de erro sem lançar exceção.

Como garantir conformidade com a LGPD ao processar CPFs em workflows automatizados?

Use o CPF apenas para a finalidade declarada ao titular, armazene apenas o necessário nos artifacts e implemente controle de acesso aos logs de execução do Prefect. A ANPD orienta que dados de identificação devem ser tratados com o princípio da necessidade — documente a base legal para o tratamento em pipelines de dados.



Conclusão

Consumir a API da CPFHub.io

Cadastre-se em cpfhub.io

CPFHub.io

Pronto para integrar a API?

50 consultas gratuitas para testar agora. Sem cartão de crédito. Acesso imediato à documentação.

Redação CPFHub.io

Sobre a redação

Redação CPFHub.io

Time editorial especializado em APIs de CPF, identidade digital e compliance no mercado brasileiro. Produzimos guias técnicos, análises regulatórias e tutoriais sobre LGPD e KYC para desenvolvedores e líderes de produto.

WhatsAppFale conosco via WhatsApp