O modelo de concorrência do Elixir, baseado na BEAM VM, permite processar lotes de consultas de CPF em paralelo de forma eficiente usando GenServer para gerenciar estado e Task para execução assíncrona. Com poucos módulos, é possível alcançar alto throughput mantendo controle total sobre concorrência, timeouts e tolerância a falhas. Este guia mostra implementações completas com GenServer, Task.async_stream e Task.Supervisor.
Introdução
O modelo de concorrência do Elixir, baseado na BEAM VM, permite criar milhares de processos leves que executam em paralelo de forma eficiente. Utilizando GenServer para gerenciar estado e Task para execução assíncrona, é possível processar grandes volumes de consultas de CPF com alto throughput.
Entendendo GenServer e Task
Antes de implementar, é importante compreender o papel de cada componente no processamento paralelo:
| Componente | Papel | Características |
|---|---|---|
GenServer | Gerenciar estado e filas | Processo único, acesso serializado |
Task | Executar operações assíncronas | Processos efêmeros, execução paralela |
Task.Supervisor | Supervisionar tasks | Tolerância a falhas, reinicialização |
Task.async_stream | Stream de operações paralelas | Controle de concorrência, backpressure |
A documentação oficial da hexdocs.pm/elixir/Task detalha as garantias de cada abordagem e quando preferir uma sobre a outra.
Criando o GenServer de gerenciamento
O GenServer coordena as consultas, mantendo o estado do processamento e os resultados:
defmodule CpfProcessor do
use GenServer
defstruct [:api_key, pendentes: [], resultados: [], em_processamento: 0, max_concurrent: 5]
# API pública
def start_link(opts) do
api_key = Keyword.fetch!(opts, :api_key)
max_concurrent = Keyword.get(opts, :max_concurrent, 5)
GenServer.start_link(__MODULE__, %__MODULE__{api_key: api_key, max_concurrent: max_concurrent}, name: __MODULE__)
end
def processar_lote(cpfs) when is_list(cpfs) do
GenServer.call(__MODULE__, {:processar_lote, cpfs}, :infinity)
end
def status do
GenServer.call(__MODULE__, :status)
end
# Callbacks
@impl true
def init(state) do
{:ok, state}
end
@impl true
def handle_call({:processar_lote, cpfs}, from, state) do
state = %{state | pendentes: cpfs, resultados: [], em_processamento: 0}
state = disparar_tasks(state, from)
{:noreply, state}
end
@impl true
def handle_call(:status, _from, state) do
{:reply, %{
pendentes: length(state.pendentes),
em_processamento: state.em_processamento,
concluidos: length(state.resultados)
}, state}
end
@impl true
def handle_info({ref, resultado}, state) when is_reference(ref) do
Process.demonitor(ref, [:flush])
state = %{state | resultados: [resultado | state.resultados], em_processamento: state.em_processamento - 1}
state = disparar_tasks(state, nil)
{:noreply, state}
end
@impl true
def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
state = %{state | em_processamento: state.em_processamento - 1}
{:noreply, state}
end
defp disparar_tasks(state, from) do
slots_disponiveis = state.max_concurrent - state.em_processamento
{a_processar, restantes} = Enum.split(state.pendentes, slots_disponiveis)
Enum.each(a_processar, fn cpf ->
Task.async(fn -> consultar_cpf(cpf, state.api_key) end)
end)
new_state = %{state |
pendentes: restantes,
em_processamento: state.em_processamento + length(a_processar)
}
if new_state.pendentes == [] and new_state.em_processamento == 0 and from do
GenServer.reply(from, Enum.reverse(new_state.resultados))
end
new_state
end
defp consultar_cpf(cpf, api_key) do
headers = [{"x-api-key", api_key}]
url = "https://api.cpfhub.io/cpf/#{cpf}"
case HTTPoison.get(url, headers, recv_timeout: 10_000) do
{:ok, %{status_code: 200, body: body}} ->
dados = Jason.decode!(body)
{cpf, :ok, dados["data"]}
{:ok, %{status_code: status}} ->
{cpf, :erro, "Status #{status}"}
{:error, reason} ->
{cpf, :erro, inspect(reason)}
end
end
end
Processamento com Task.async_stream
Para cenários mais simples, Task.async_stream oferece processamento paralelo com controle de concorrência integrado:
defmodule CpfBatchProcessor do
@base_url "https://api.cpfhub.io"
def processar(cpfs, api_key, opts \\ []) do
max_concurrency = Keyword.get(opts, :max_concurrency, 5)
timeout = Keyword.get(opts, :timeout, 15_000)
cpfs
|> Task.async_stream(
fn cpf -> consultar(cpf, api_key) end,
max_concurrency: max_concurrency,
timeout: timeout,
on_timeout: :kill_task
)
|> Enum.map(fn
{:ok, resultado} -> resultado
{:exit, :timeout} -> {:erro, "Timeout na consulta"}
end)
end
defp consultar(cpf, api_key) do
headers = [{"x-api-key", api_key}]
case HTTPoison.get("#{@base_url}/cpf/#{cpf}", headers) do
{:ok, %{status_code: 200, body: body}} ->
%{"data" => data} = Jason.decode!(body)
{:ok, cpf, data}
{:ok, %{status_code: status}} ->
{:erro, cpf, "HTTP #{status}"}
{:error, %{reason: reason}} ->
{:erro, cpf, reason}
end
end
end
Supervisionando as Tasks
Para garantir tolerância a falhas, utilize um Task.Supervisor:
defmodule CpfSupervisedProcessor do
def processar_com_supervisor(cpfs, api_key) do
cpfs
|> Enum.map(fn cpf ->
Task.Supervisor.async_nolink(CpfTaskSupervisor, fn ->
consultar_com_retry(cpf, api_key, 3)
end)
end)
|> Task.yield_many(15_000)
|> Enum.map(fn
{_task, {:ok, resultado}} -> resultado
{task, nil} ->
Task.shutdown(task, :brutal_kill)
{:erro, "Timeout"}
{_task, {:exit, reason}} ->
{:erro, "Processo falhou: #{inspect(reason)}"}
end)
end
defp consultar_com_retry(cpf, api_key, tentativas_restantes) do
headers = [{"x-api-key", api_key}]
url = "https://api.cpfhub.io/cpf/#{cpf}"
case HTTPoison.get(url, headers) do
{:ok, %{status_code: 200, body: body}} ->
{:ok, cpf, Jason.decode!(body)}
{:ok, %{status_code: status}} when status in [500, 503] and tentativas_restantes > 0 ->
Process.sleep(1000 * (4 - tentativas_restantes))
consultar_com_retry(cpf, api_key, tentativas_restantes - 1)
{:error, _} when tentativas_restantes > 0 ->
Process.sleep(500)
consultar_com_retry(cpf, api_key, tentativas_restantes - 1)
other ->
{:erro, cpf, other}
end
end
end
Exemplo de uso completo
Integre o processador na sua aplicação:
# No Application supervisor
children = [
{Task.Supervisor, name: CpfTaskSupervisor},
{CpfProcessor, api_key: System.get_env("CPFHUB_API_KEY"), max_concurrent: 5}
]
Supervisor.start_link(children, strategy: :one_for_one)
# Uso com Task.async_stream
api_key = System.get_env("CPFHUB_API_KEY")
cpfs = ["12345678900", "98765432100", "11122233344"]
resultados = CpfBatchProcessor.processar(cpfs, api_key, max_concurrency: 3)
Enum.each(resultados, fn
{:ok, cpf, data} ->
IO.puts("[OK] #{cpf} -> #{data["name"]}")
{:erro, cpf, reason} ->
IO.puts("[ERRO] #{cpf} -> #{reason}")
{:erro, reason} ->
IO.puts("[ERRO] #{reason}")
end)
Perguntas frequentes
Qual é a diferença entre Task.async e Task.Supervisor.async_nolink para consultas de CPF?
Task.async vincula o processo da task ao processo chamador: se a task falhar, o chamador também falha. Task.Supervisor.async_nolink isola a task do chamador, permitindo que o sistema continue funcionando mesmo se uma task individual falhar. Para consultas de CPF em lote, prefira async_nolink com um supervisor dedicado — isso garante que a falha em um CPF não aborte o processamento dos demais.
Quantas consultas paralelas posso enviar para a API da CPFHub.io?
A API da CPFHub.io não impõe um limite rígido de requisições paralelas que resulte em bloqueio. O plano gratuito oferece 50 consultas mensais, e qualquer excedente é cobrado a R$0,15 por consulta — a API continua respondendo normalmente. Na prática, um max_concurrency de 5 a 10 é suficiente para a maioria dos casos e evita sobrecarregar a rede local.
Como lidar com timeouts individuais em Task.async_stream?
Configure o parâmetro timeout em milissegundos e use on_timeout: :kill_task. Assim, tasks que excedam o limite são encerradas sem afetar as demais. Para a CPFHub.io, com latência média de ~900ms, um timeout de 10.000ms por consulta é uma margem segura.
O GenServer é a melhor abordagem para processar grandes lotes de CPF em Elixir?
O GenServer é ideal quando você precisa de controle de estado centralizado, como monitorar progresso em tempo real ou pausar o processamento. Para lotes simples e pontuais sem necessidade de estado persistente, Task.async_stream é mais direto e igualmente eficiente. Escolha o GenServer quando o processamento precisar ser observável ou controlável externamente.
Conclusão
O processamento paralelo de consultas de CPF com GenServer e Task aproveita ao máximo o modelo de concorrência do Elixir. A combinação de processos supervisionados, controle de concorrência e retry automático resulta em um sistema resiliente e eficiente.
Cadastre-se em cpfhub.io — 50 consultas mensais gratuitas, sem cartão de crédito — e comece a processar consultas de CPF em escala com a infraestrutura certa para seus projetos Elixir.
CPFHub.io
Pronto para integrar a API?
50 consultas gratuitas para testar agora. Sem cartão de crédito. Acesso imediato à documentação.
Sobre a redação
Redação CPFHub.io
Time editorial especializado em APIs de CPF, identidade digital e compliance no mercado brasileiro. Produzimos guias técnicos, análises regulatórias e tutoriais sobre LGPD e KYC para desenvolvedores e líderes de produto.



