• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / a098b54181d4fa0ae32a74f35981d53f80169087-PR-1353

30 Apr 2025 02:05PM UTC coverage: 73.438% (-8.1%) from 81.555%
a098b54181d4fa0ae32a74f35981d53f80169087-PR-1353

Pull #1353

github

filipecabaco
reduce flakiness of replication connection test
Pull Request #1353: fix: Selectively run migrations

24 of 33 new or added lines in 5 files covered. (72.73%)

163 existing lines in 18 files now uncovered.

1551 of 2112 relevant lines covered (73.44%)

206.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.63
/lib/realtime/postgres_cdc.ex
1
defmodule Realtime.PostgresCdc do
2
  @moduledoc false
3

4
  require Logger
5

6
  alias Realtime.Api.Tenant
7

8
  @timeout 10_000
9
  @extensions Application.compile_env(:realtime, :extensions)
10

11
  defmodule Exception do
12
    defexception message: "PostgresCdc error!"
13
  end
14

15
  def connect(module, opts) do
16
    apply(module, :handle_connect, [opts])
8✔
17
  end
18

19
  def after_connect(module, connect_response, extension, params) do
20
    apply(module, :handle_after_connect, [connect_response, extension, params])
5✔
21
  end
22

23
  def subscribe(module, pg_change_params, tenant, metadata) do
24
    RealtimeWeb.Endpoint.subscribe("postgres_cdc_rls:" <> tenant)
4✔
25
    apply(module, :handle_subscribe, [pg_change_params, tenant, metadata])
4✔
26
  end
27

28
  @spec stop(module, Tenant.t(), pos_integer) :: :ok
29
  def stop(module, tenant, timeout \\ @timeout) do
30
    apply(module, :handle_stop, [tenant.external_id, timeout])
1✔
31
  end
32

33
  @doc """
34
  Stops all available drivers within a specified timeout.
35

36
  Expects all handle_stop calls to return `:ok` within the `stop_timeout`.
37

38
  We want all available drivers to stop within the `timeout`.
39
  """
40

41
  @spec stop_all(Tenant.t(), pos_integer) :: :ok | :error
42
  def stop_all(tenant, timeout \\ @timeout) do
UNCOV
43
    count = Enum.count(available_drivers())
×
UNCOV
44
    stop_timeout = Kernel.ceil(timeout / count)
×
45

UNCOV
46
    stops = Enum.map(available_drivers(), fn module -> stop(module, tenant, stop_timeout) end)
×
47

UNCOV
48
    case Enum.all?(stops, &(&1 == :ok)) do
×
UNCOV
49
      true -> :ok
×
50
      false -> :error
×
51
    end
52
  end
53

54
  @spec available_drivers :: list
55
  def available_drivers do
56
    @extensions
UNCOV
57
    |> Enum.filter(fn {_, e} -> e.type == :postgres_cdc end)
×
UNCOV
58
    |> Enum.map(fn {_, e} -> e.driver end)
×
59
  end
60

61
  @spec filter_settings(binary(), list()) :: map()
62
  def filter_settings(key, extensions) do
63
    [cdc] = Enum.filter(extensions, fn e -> e.type == key end)
553✔
64

65
    cdc.settings
553✔
66
  end
67

68
  @doc """
69
  Gets the extension module for a tenant.
70
  """
71

72
  @spec driver(String.t()) :: {:ok, module()} | {:error, String.t()}
73
  def driver(tenant_key) do
74
    @extensions
75
    |> Enum.filter(fn {_, %{key: key}} -> tenant_key == key end)
42✔
76
    |> case do
42✔
77
      [{_, %{driver: driver}}] -> {:ok, driver}
42✔
78
      _ -> {:error, "No driver found for key #{tenant_key}"}
×
79
    end
80
  end
81

82
  @callback handle_connect(any()) :: {:ok, any()} | nil
83
  @callback handle_after_connect(any(), any(), any()) :: {:ok, any()} | {:error, any()}
84
  @callback handle_subscribe(any(), any(), any()) :: :ok
85
  @callback handle_stop(any(), any()) :: any()
86
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc