• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 1ade961f479f27895cfee27bf5fa17f58a8aa15f-PR-1403

05 Jun 2025 02:12PM UTC coverage: 83.763% (+0.7%) from 83.027%
1ade961f479f27895cfee27bf5fa17f58a8aa15f-PR-1403

Pull #1403

github

filipecabaco
fix tests
Pull Request #1403: fix: handle tuple errors

5 of 9 new or added lines in 2 files covered. (55.56%)

7 existing lines in 3 files now uncovered.

1821 of 2174 relevant lines covered (83.76%)

4367.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.78
/lib/extensions/postgres_cdc_rls/cdc_rls.ex
1
defmodule Extensions.PostgresCdcRls do
2
  @moduledoc """
3
  Callbacks for initiating a Postgres connection and creating a Realtime subscription for database changes.
4
  """
5

6
  @behaviour Realtime.PostgresCdc
7
  use Realtime.Logs
8

9
  alias RealtimeWeb.Endpoint
10
  alias Extensions.PostgresCdcRls, as: Rls
11
  alias Rls.Subscriptions
12
  alias Realtime.Rpc
13

14
  @spec handle_connect(map()) :: {:ok, {pid(), pid()}} | nil
15
  def handle_connect(args) do
16
    case get_manager_conn(args["id"]) do
20✔
17
      {:error, nil} ->
18
        start_distributed(args)
12✔
19
        nil
20

21
      {:error, :wait} ->
×
22
        nil
23

24
      {:ok, pid, conn} ->
8✔
25
        {:ok, {pid, conn}}
26
    end
27
  end
28

29
  def handle_after_connect({manager_pid, conn}, settings, params) do
30
    publication = settings["publication"]
8✔
31
    opts = [conn, publication, params, manager_pid, self()]
8✔
32
    conn_node = node(conn)
8✔
33

34
    if conn_node !== node() do
8✔
35
      Rpc.call(conn_node, Subscriptions, :create, opts, timeout: 15_000)
×
36
    else
37
      apply(Subscriptions, :create, opts)
8✔
38
    end
39
  end
40

41
  def handle_subscribe(_, tenant, metadata) do
42
    Endpoint.subscribe("realtime:postgres:" <> tenant, metadata)
14✔
43
  end
44

45
  @doc """
46
  Stops the Supervision tree for a tenant.
47

48
  Expects an `external_id` as the `tenant`.
49
  """
50

51
  @spec handle_stop(String.t(), non_neg_integer()) :: :ok
52
  def handle_stop(tenant, timeout) when is_binary(tenant) do
53
    case :syn.whereis_name({__MODULE__, tenant}) do
10✔
54
      :undefined ->
55
        Logger.warning("Database supervisor not found for tenant #{tenant}")
8✔
56
        :ok
57

58
      pid ->
59
        DynamicSupervisor.stop(pid, :shutdown, timeout)
2✔
60
    end
61
  end
62

63
  ## Internal functions
64

65
  def start_distributed(%{"region" => region, "id" => tenant} = args) do
66
    platform_region = Realtime.Nodes.platform_region_translator(region)
12✔
67
    launch_node = Realtime.Nodes.launch_node(tenant, platform_region, node())
12✔
68

69
    Logger.warning(
12✔
70
      "Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
71
    )
72

73
    case Rpc.call(launch_node, __MODULE__, :start, [args], timeout: 30_000, tenant: tenant) do
12✔
74
      {:ok, _pid} = ok ->
75
        ok
12✔
76

77
      {:error, {:already_started, _pid}} = error ->
78
        Logger.info("Postgres Extension already started on node #{inspect(launch_node)}")
×
79
        error
×
80

81
      error ->
82
        log_error("ErrorStartingPostgresCDC", error)
×
83
        error
×
84
    end
85
  end
86

87
  @doc """
88
  Start db poller. Expects an `external_id` as a `tenant`.
89
  """
90

91
  @spec start(map()) :: :ok | {:error, :already_started | :reserved}
92
  def start(%{"id" => tenant} = args) when is_binary(tenant) do
93
    args = Map.merge(args, %{"subs_pool_size" => Map.get(args, "subcriber_pool_size", 4)})
12✔
94

95
    Logger.debug("Starting #{__MODULE__} extension with args: #{inspect(args, pretty: true)}")
12✔
96

97
    DynamicSupervisor.start_child(
12✔
98
      {:via, PartitionSupervisor, {Rls.DynamicSupervisor, tenant}},
99
      %{
100
        id: tenant,
101
        start: {Rls.WorkerSupervisor, :start_link, [args]},
102
        restart: :transient
103
      }
104
    )
105
  end
106

107
  @spec get_manager_conn(String.t()) :: {:error, nil | :wait} | {:ok, pid(), pid()}
108
  def get_manager_conn(id) do
109
    case :syn.lookup(__MODULE__, id) do
25✔
UNCOV
110
      {_, %{manager: nil, subs_pool: nil}} -> {:error, :wait}
1✔
111
      {_, %{manager: manager, subs_pool: conn}} -> {:ok, manager, conn}
12✔
112
      _ -> {:error, nil}
12✔
113
    end
114
  end
115

116
  @spec supervisor_id(String.t(), String.t()) :: {atom(), String.t(), map()}
117
  def supervisor_id(tenant, region) do
118
    {__MODULE__, tenant, %{region: region, manager: nil, subs_pool: nil}}
16✔
119
  end
120

121
  @spec update_meta(String.t(), pid(), pid()) :: {:ok, {pid(), term()}} | {:error, term()}
122
  def update_meta(tenant, manager_pid, subs_pool) do
123
    :syn.update_registry(__MODULE__, tenant, fn pid, meta ->
11✔
124
      if node(pid) == node(manager_pid) do
11✔
125
        %{meta | manager: manager_pid, subs_pool: subs_pool}
11✔
126
      else
127
        Logger.warning("Node mismatch for tenant #{tenant} #{inspect(node(pid))} #{inspect(node(manager_pid))}")
×
128

129
        meta
×
130
      end
131
    end)
132
  end
133
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc