• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 6a1d77a5420203dcc840c17c888d82dff37f69e6

10 Dec 2025 03:40AM UTC coverage: 80.087% (-7.8%) from 87.901%
6a1d77a5420203dcc840c17c888d82dff37f69e6

push

github

web-flow
fix: improve metrics handling (#1654)

* Allow 3x more max heap size than other processes
* Avoid compressing given that gen_rpc does this for us already
* fix: remove limit_concurrent metric
* fix: revert the way we fetched tenants to report on connected metric

2 of 18 new or added lines in 3 files covered. (11.11%)

203 existing lines in 24 files now uncovered.

2200 of 2747 relevant lines covered (80.09%)

2956.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.37
/lib/realtime/postgres_cdc.ex
1
defmodule Realtime.PostgresCdc do
  @moduledoc false

  require Logger

  alias Realtime.Api.Tenant

  # Default overall timeout (in the unit expected by the drivers' `handle_stop/2`).
  @timeout 10_000
  # Extensions are read from the `:realtime` application config at compile time.
  @extensions Application.compile_env(:realtime, :extensions)

  defmodule Exception do
    defexception message: "PostgresCdc error!"
  end

  @doc """
  Delegates to `module.handle_connect/1` with `opts` and returns its result.
  """
  def connect(module, opts) do
    module.handle_connect(opts)
  end

  @doc """
  Delegates to `module.handle_after_connect/4` and returns its result.
  """
  def after_connect(module, connect_response, extension, params, tenant) do
    module.handle_after_connect(connect_response, extension, params, tenant)
  end

  @doc """
  Subscribes the calling process to the `"postgres_cdc_rls:" <> tenant` topic on
  `RealtimeWeb.Endpoint` and then delegates to `module.handle_subscribe/3`.

  `tenant` must be a binary because it is concatenated into the topic name.
  """
  def subscribe(module, pg_change_params, tenant, metadata) do
    RealtimeWeb.Endpoint.subscribe("postgres_cdc_rls:" <> tenant)
    module.handle_subscribe(pg_change_params, tenant, metadata)
  end

  @doc """
  Stops a single driver by calling `module.handle_stop/2` with the tenant's
  `external_id` and the given `timeout`.
  """
  @spec stop(module, Tenant.t(), pos_integer) :: :ok
  def stop(module, tenant, timeout \\ @timeout) do
    module.handle_stop(tenant.external_id, timeout)
  end

  @doc """
  Stops all available drivers within a specified timeout.

  Expects all handle_stop calls to return `:ok` within the `stop_timeout`.

  We want all available drivers to stop within the `timeout`, so each driver
  gets an equal share of it (`timeout` divided by the number of drivers,
  rounded up).

  Returns `:ok` when every driver returned `:ok` (or when there are no drivers
  to stop) and `:error` otherwise.
  """
  @spec stop_all(Tenant.t(), pos_integer) :: :ok | :error
  def stop_all(tenant, timeout \\ @timeout) do
    case available_drivers() do
      [] ->
        # Nothing to stop. Returning early also avoids dividing by zero below.
        :ok

      drivers ->
        stop_timeout = ceil(timeout / length(drivers))

        # Stop every driver first; only then inspect the results, so a failing
        # driver never prevents the remaining ones from being stopped.
        results = Enum.map(drivers, &stop(&1, tenant, stop_timeout))

        if Enum.all?(results, &(&1 == :ok)), do: :ok, else: :error
    end
  end

  @doc """
  Lists the `driver` of every configured extension whose `type` is
  `:postgres_cdc`.
  """
  @spec available_drivers :: list
  def available_drivers do
    @extensions
    |> Enum.filter(fn {_, e} -> e.type == :postgres_cdc end)
    |> Enum.map(fn {_, e} -> e.driver end)
  end

  @doc """
  Returns the `settings` of the single entry in `extensions` whose `type`
  equals `key`.

  Raises `MatchError` unless exactly one entry matches.
  """
  @spec filter_settings(binary(), list()) :: map()
  def filter_settings(key, extensions) do
    [cdc] = Enum.filter(extensions, fn e -> e.type == key end)

    cdc.settings
  end

  @doc """
  Gets the extension module for a tenant.

  Returns `{:ok, driver}` when exactly one configured extension has a `key`
  equal to `tenant_key`, and `{:error, message}` otherwise.
  """
  @spec driver(String.t()) :: {:ok, module()} | {:error, String.t()}
  def driver(tenant_key) do
    @extensions
    |> Enum.filter(fn {_, %{key: key}} -> tenant_key == key end)
    |> case do
      [{_, %{driver: driver}}] -> {:ok, driver}
      _ -> {:error, "No driver found for key #{tenant_key}"}
    end
  end

  @callback handle_connect(any()) :: {:ok, any()} | nil
  @callback handle_after_connect(any(), any(), any(), tenant_id :: String.t()) ::
              {:ok, any()} | {:error, any()} | {:error, any(), any()}
  @callback handle_subscribe(any(), any(), any()) :: :ok
  @callback handle_stop(any(), any()) :: any()
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc