• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / c5c8f617a3ad63a2c3506a1b61ecd513ebad1ed3

07 May 2025 08:44PM UTC coverage: 82.461% (+0.4%) from 82.079%
c5c8f617a3ad63a2c3506a1b61ecd513ebad1ed3

push

github

web-flow
fix: remove region from syn conflict handling & non found process on register_process (#1363)

7 of 10 new or added lines in 4 files covered. (70.0%)

8 existing lines in 3 files now uncovered.

1749 of 2121 relevant lines covered (82.46%)

1349.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.61
/lib/realtime/tenants/listen.ex
1
defmodule Realtime.Tenants.Listen do
2
  @moduledoc """
3
  Listen for Postgres notifications to identify issues with the functions that are being called in tenants database
4
  """
5
  use GenServer, restart: :transient
6
  require Logger
7
  alias Realtime.Api.Tenant
8
  alias Realtime.Database
9
  alias Realtime.Logs
10
  alias Realtime.Registry.Unique
11
  alias Realtime.Tenants.Cache
12

13
  @type t :: %__MODULE__{
14
          tenant_id: binary,
15
          listen_conn: pid(),
16
          monitored_pid: pid()
17
        }
18
  defstruct tenant_id: nil, listen_conn: nil, monitored_pid: nil
19

20
  @topic "realtime:system"
21
  def start_link(%__MODULE__{tenant_id: tenant_id} = state) do
22
    name = {:via, Registry, {Unique, {__MODULE__, :tenant_id, tenant_id}}}
139✔
23
    GenServer.start_link(__MODULE__, state, name: name)
139✔
24
  end
25

26
  def init(%__MODULE__{tenant_id: tenant_id, monitored_pid: monitored_pid}) do
139✔
27
    Logger.metadata(external_id: tenant_id, project: tenant_id)
139✔
28
    Process.monitor(monitored_pid)
139✔
29

30
    tenant = Cache.get_tenant_by_external_id(tenant_id)
139✔
31
    connection_opts = Database.from_tenant(tenant, "realtime_listen", :stop)
139✔
32

33
    name =
139✔
34
      {:via, Registry, {Realtime.Registry.Unique, {Postgrex.Notifications, :tenant_id, tenant_id}}}
35

36
    settings =
139✔
37
      [
38
        hostname: connection_opts.hostname,
139✔
39
        database: connection_opts.database,
139✔
40
        password: connection_opts.password,
139✔
41
        username: connection_opts.username,
139✔
42
        port: connection_opts.port,
139✔
43
        ssl: connection_opts.ssl,
139✔
44
        socket_options: connection_opts.socket_options,
139✔
45
        sync_connect: true,
46
        auto_reconnect: false,
47
        backoff_type: :stop,
48
        max_restarts: 0,
49
        name: name,
50
        parameters: [application_name: "realtime_listen"]
51
      ]
52

53
    Logger.info("Listening for notifications on #{@topic}")
139✔
54

55
    case Postgrex.Notifications.start_link(settings) do
139✔
56
      {:ok, conn} ->
57
        Postgrex.Notifications.listen!(conn, @topic)
139✔
58
        {:ok, %{tenant_id: tenant.external_id, listen_conn: conn}}
139✔
59

60
      {:error, {:already_started, conn}} ->
61
        Postgrex.Notifications.listen!(conn, @topic)
×
62
        {:ok, %{tenant_id: tenant.external_id, listen_conn: conn}}
×
63

64
      {:error, reason} ->
×
65
        {:stop, reason}
66
    end
67
  catch
68
    e -> {:stop, e}
×
69
  end
70

71
  @spec start(Realtime.Api.Tenant.t(), pid()) :: {:ok, pid()} | {:error, any()}
72
  def start(%Tenant{} = tenant, pid) do
97✔
73
    supervisor = {:via, PartitionSupervisor, {Realtime.Tenants.Listen.DynamicSupervisor, self()}}
97✔
74
    spec = {__MODULE__, %__MODULE__{tenant_id: tenant.external_id, monitored_pid: pid}}
97✔
75

76
    case DynamicSupervisor.start_child(supervisor, spec) do
97✔
77
      {:ok, pid} -> {:ok, pid}
97✔
78
      {:error, {:already_started, pid}} -> {:ok, pid}
×
UNCOV
79
      error -> {:error, error}
×
80
    end
81
  catch
82
    e -> {:error, e}
×
83
  end
84

85
  @doc """
86
  Finds replication connection by tenant_id
87
  """
88
  @spec whereis(String.t()) :: pid() | nil
89
  def whereis(tenant_id) do
90
    case Registry.lookup(Realtime.Registry.Unique, {Postgrex.Notifications, :tenant_id, tenant_id}) do
6✔
91
      [{pid, _}] -> pid
4✔
92
      [] -> nil
2✔
93
    end
94
  end
95

96
  def handle_info({:notification, _, _, @topic, payload}, state) do
97
    case Jason.decode(payload) do
1✔
98
      {:ok, %{"function" => "realtime.send"} = parsed} when is_map_key(parsed, "error") ->
99
        Logs.log_error("FailedSendFromDatabase", parsed)
1✔
100

101
      {:error, _} ->
102
        Logs.log_error("FailedToParseDiagnosticMessage", payload)
×
103

104
      _ ->
×
105
        :ok
106
    end
107

108
    {:noreply, state}
109
  end
110

111
  def handle_info({:DOWN, _, :process, _, _}, state), do: {:stop, :normal, state}
96✔
112
  def handle_info(_, state), do: {:noreply, state}
×
113
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc