Como processar consultas de CPF em paralelo usando GenServer e Task

Processe múltiplas consultas de CPF em paralelo usando GenServer e Task do Elixir para máximo throughput e resiliência.

Redação CPFHub.io
Redação CPFHub.io
··6 min de leitura
Como processar consultas de CPF em paralelo usando GenServer e Task

O modelo de concorrência do Elixir, baseado na BEAM VM, permite processar lotes de consultas de CPF em paralelo de forma eficiente usando GenServer para gerenciar estado e Task para execução assíncrona. Com poucos módulos, é possível alcançar alto throughput mantendo controle total sobre concorrência, timeouts e tolerância a falhas. Este guia mostra implementações completas com GenServer, Task.async_stream e Task.Supervisor.

Introdução

O modelo de concorrência do Elixir, baseado na BEAM VM, permite criar milhares de processos leves que executam em paralelo de forma eficiente. Utilizando GenServer para gerenciar estado e Task para execução assíncrona, é possível processar grandes volumes de consultas de CPF com alto throughput.


Entendendo GenServer e Task

Antes de implementar, é importante compreender o papel de cada componente no processamento paralelo:

ComponentePapelCaracterísticas
GenServerGerenciar estado e filasProcesso único, acesso serializado
TaskExecutar operações assíncronasProcessos efêmeros, execução paralela
Task.SupervisorSupervisionar tasksTolerância a falhas, reinicialização
Task.async_streamStream de operações paralelasControle de concorrência, backpressure

A documentação oficial da hexdocs.pm/elixir/Task detalha as garantias de cada abordagem e quando preferir uma sobre a outra.


Criando o GenServer de gerenciamento

O GenServer coordena as consultas, mantendo o estado do processamento e os resultados:

defmodule CpfProcessor do
    use GenServer

    defstruct [:api_key, pendentes: [], resultados: [], em_processamento: 0, max_concurrent: 5]

    # API pública
    def start_link(opts) do
    api_key = Keyword.fetch!(opts, :api_key)
    max_concurrent = Keyword.get(opts, :max_concurrent, 5)
    GenServer.start_link(__MODULE__, %__MODULE__{api_key: api_key, max_concurrent: max_concurrent}, name: __MODULE__)
    end

    def processar_lote(cpfs) when is_list(cpfs) do
    GenServer.call(__MODULE__, {:processar_lote, cpfs}, :infinity)
    end

    def status do
    GenServer.call(__MODULE__, :status)
    end

    # Callbacks
    @impl true
    def init(state) do
    {:ok, state}
    end

    @impl true
    def handle_call({:processar_lote, cpfs}, from, state) do
    state = %{state | pendentes: cpfs, resultados: [], em_processamento: 0}
    state = disparar_tasks(state, from)
    {:noreply, state}
    end

    @impl true
    def handle_call(:status, _from, state) do
    {:reply, %{
    pendentes: length(state.pendentes),
    em_processamento: state.em_processamento,
    concluidos: length(state.resultados)
    }, state}
    end

    @impl true
    def handle_info({ref, resultado}, state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    state = %{state | resultados: [resultado | state.resultados], em_processamento: state.em_processamento - 1}
    state = disparar_tasks(state, nil)
    {:noreply, state}
    end

    @impl true
    def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    state = %{state | em_processamento: state.em_processamento - 1}
    {:noreply, state}
    end

    defp disparar_tasks(state, from) do
    slots_disponiveis = state.max_concurrent - state.em_processamento
    {a_processar, restantes} = Enum.split(state.pendentes, slots_disponiveis)

    Enum.each(a_processar, fn cpf ->
    Task.async(fn -> consultar_cpf(cpf, state.api_key) end)
    end)

    new_state = %{state |
    pendentes: restantes,
    em_processamento: state.em_processamento + length(a_processar)
    }

    if new_state.pendentes == [] and new_state.em_processamento == 0 and from do
    GenServer.reply(from, Enum.reverse(new_state.resultados))
    end

    new_state
    end

    defp consultar_cpf(cpf, api_key) do
    headers = [{"x-api-key", api_key}]
    url = "https://api.cpfhub.io/cpf/#{cpf}"

    case HTTPoison.get(url, headers, recv_timeout: 10_000) do
    {:ok, %{status_code: 200, body: body}} ->
    dados = Jason.decode!(body)
    {cpf, :ok, dados["data"]}

    {:ok, %{status_code: status}} ->
    {cpf, :erro, "Status #{status}"}

    {:error, reason} ->
    {cpf, :erro, inspect(reason)}
    end
    end
end

Processamento com Task.async_stream

Para cenários mais simples, Task.async_stream oferece processamento paralelo com controle de concorrência integrado:

defmodule CpfBatchProcessor do
    @base_url "https://api.cpfhub.io"

    def processar(cpfs, api_key, opts \\ []) do
    max_concurrency = Keyword.get(opts, :max_concurrency, 5)
    timeout = Keyword.get(opts, :timeout, 15_000)

    cpfs
    |> Task.async_stream(
    fn cpf -> consultar(cpf, api_key) end,
    max_concurrency: max_concurrency,
    timeout: timeout,
    on_timeout: :kill_task
    )
    |> Enum.map(fn
    {:ok, resultado} -> resultado
    {:exit, :timeout} -> {:erro, "Timeout na consulta"}
    end)
    end

    defp consultar(cpf, api_key) do
    headers = [{"x-api-key", api_key}]

    case HTTPoison.get("#{@base_url}/cpf/#{cpf}", headers) do
    {:ok, %{status_code: 200, body: body}} ->
    %{"data" => data} = Jason.decode!(body)
    {:ok, cpf, data}

    {:ok, %{status_code: status}} ->
    {:erro, cpf, "HTTP #{status}"}

    {:error, %{reason: reason}} ->
    {:erro, cpf, reason}
    end
    end
end

Supervisionando as Tasks

Para garantir tolerância a falhas, utilize um Task.Supervisor:

defmodule CpfSupervisedProcessor do
    def processar_com_supervisor(cpfs, api_key) do
    cpfs
    |> Enum.map(fn cpf ->
    Task.Supervisor.async_nolink(CpfTaskSupervisor, fn ->
    consultar_com_retry(cpf, api_key, 3)
    end)
    end)
    |> Task.yield_many(15_000)
    |> Enum.map(fn
    {_task, {:ok, resultado}} -> resultado
    {task, nil} ->
    Task.shutdown(task, :brutal_kill)
    {:erro, "Timeout"}
    {_task, {:exit, reason}} ->
    {:erro, "Processo falhou: #{inspect(reason)}"}
    end)
    end

    defp consultar_com_retry(cpf, api_key, tentativas_restantes) do
    headers = [{"x-api-key", api_key}]
    url = "https://api.cpfhub.io/cpf/#{cpf}"

    case HTTPoison.get(url, headers) do
    {:ok, %{status_code: 200, body: body}} ->
    {:ok, cpf, Jason.decode!(body)}

    {:ok, %{status_code: status}} when status in [500, 503] and tentativas_restantes > 0 ->
    Process.sleep(1000 * (4 - tentativas_restantes))
    consultar_com_retry(cpf, api_key, tentativas_restantes - 1)

    {:error, _} when tentativas_restantes > 0 ->
    Process.sleep(500)
    consultar_com_retry(cpf, api_key, tentativas_restantes - 1)

    other ->
    {:erro, cpf, other}
    end
    end
end

Exemplo de uso completo

Integre o processador na sua aplicação:

# No Application supervisor
children = [
    {Task.Supervisor, name: CpfTaskSupervisor},
    {CpfProcessor, api_key: System.get_env("CPFHUB_API_KEY"), max_concurrent: 5}
]

Supervisor.start_link(children, strategy: :one_for_one)

# Uso com Task.async_stream
api_key = System.get_env("CPFHUB_API_KEY")
cpfs = ["12345678900", "98765432100", "11122233344"]

resultados = CpfBatchProcessor.processar(cpfs, api_key, max_concurrency: 3)

Enum.each(resultados, fn
    {:ok, cpf, data} ->
    IO.puts("[OK] #{cpf} -> #{data["name"]}")

    {:erro, cpf, reason} ->
    IO.puts("[ERRO] #{cpf} -> #{reason}")

    {:erro, reason} ->
    IO.puts("[ERRO] #{reason}")
end)

Perguntas frequentes

Task.async vincula o processo da task ao processo chamador: se a task falhar, o chamador também falha. Task.Supervisor.async_nolink isola a task do chamador, permitindo que o sistema continue funcionando mesmo se uma task individual falhar. Para consultas de CPF em lote, prefira async_nolink com um supervisor dedicado — isso garante que a falha em um CPF não aborte o processamento dos demais.

Quantas consultas paralelas posso enviar para a API da CPFHub.io?

A API da CPFHub.io não impõe um limite rígido de requisições paralelas que resulte em bloqueio. O plano gratuito oferece 50 consultas mensais, e qualquer excedente é cobrado a R$0,15 por consulta — a API continua respondendo normalmente. Na prática, um max_concurrency de 5 a 10 é suficiente para a maioria dos casos e evita sobrecarregar a rede local.

Como lidar com timeouts individuais em Task.async_stream?

Configure o parâmetro timeout em milissegundos e use on_timeout: :kill_task. Assim, tasks que excedam o limite são encerradas sem afetar as demais. Para a CPFHub.io, com latência média de ~900ms, um timeout de 10.000ms por consulta é uma margem segura.

O GenServer é a melhor abordagem para processar grandes lotes de CPF em Elixir?

O GenServer é ideal quando você precisa de controle de estado centralizado, como monitorar progresso em tempo real ou pausar o processamento. Para lotes simples e pontuais sem necessidade de estado persistente, Task.async_stream é mais direto e igualmente eficiente. Escolha o GenServer quando o processamento precisar ser observável ou controlável externamente.


Conclusão

O processamento paralelo de consultas de CPF com GenServer e Task aproveita ao máximo o modelo de concorrência do Elixir. A combinação de processos supervisionados, controle de concorrência e retry automático resulta em um sistema resiliente e eficiente.

Cadastre-se em cpfhub.io — 50 consultas mensais gratuitas, sem cartão de crédito — e comece a processar consultas de CPF em escala com a infraestrutura certa para seus projetos Elixir.

CPFHub.io

Pronto para integrar a API?

50 consultas gratuitas para testar agora. Sem cartão de crédito. Acesso imediato à documentação.

Redação CPFHub.io

Sobre a redação

Redação CPFHub.io

Time editorial especializado em APIs de CPF, identidade digital e compliance no mercado brasileiro. Produzimos guias técnicos, análises regulatórias e tutoriais sobre LGPD e KYC para desenvolvedores e líderes de produto.

WhatsAppFale conosco via WhatsApp