• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 555444ad51824f2345f4e4858c035d7ede919b7e-PR-1481

04 Aug 2025 12:49AM UTC coverage: 85.002% (+0.1%) from 84.858%
555444ad51824f2345f4e4858c035d7ede919b7e-PR-1481

Pull #1481

github

edgurgel
fix: rewrite SynHandler to pick the oldest process on conflicts

We try to keep the oldest process. If the time they were registered is exactly the same we use
their node names to decide.

The most important part is that both nodes must 100% of the time agree on the decision

Also refactored Clustered to allow for a peer to be started without
automatically connecting to the main node.
Pull Request #1481: fix: syn conflict resolution

17 of 19 new or added lines in 1 file covered. (89.47%)

1 existing line in 1 file now uncovered.

1995 of 2347 relevant lines covered (85.0%)

1307.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.0
/lib/realtime/syn_handler.ex
1
defmodule Realtime.SynHandler do
2
  @moduledoc """
3
  Custom defined Syn's callbacks
4
  """
5
  require Logger
6
  alias Extensions.PostgresCdcRls
7
  alias RealtimeWeb.Endpoint
8
  alias Realtime.Tenants.Connect
9

10
  @behaviour :syn_event_handler
11

12
  @doc """
  Announces locally that a process registered with `:syn` has updated its metadata.

    * `Connect` scope, reason `:normal`, metadata carrying a `conn` pid: broadcasts `"ready"`
      with `%{conn: conn}` on `Connect.syn_topic(tenant_id)`.
    * `PostgresCdcRls` scope: broadcasts `"ready"` with the registry metadata on
      `PostgresCdcRls.syn_topic(tenant_id)`.
    * Anything else is ignored and `:ok` is returned.

  Broadcasts go through `Endpoint.local_broadcast/3`.
  """
  @impl true
  def on_registry_process_updated(Connect, tenant_id, _pid, %{conn: conn}, :normal)
      when is_pid(conn) do
    # The tenant database connection is ready.
    tenant_id
    |> Connect.syn_topic()
    |> Endpoint.local_broadcast("ready", %{conn: conn})
  end

  def on_registry_process_updated(PostgresCdcRls, tenant_id, _pid, meta, _reason) do
    # The CDC RLS connection is ready.
    tenant_id
    |> PostgresCdcRls.syn_topic()
    |> Endpoint.local_broadcast("ready", meta)
  end

  def on_registry_process_updated(_scope, _name, _pid, _meta, _reason) do
    :ok
  end
4✔
24

25
  @doc """
  Callback invoked when a process registered with `:syn` is unregistered, either manually or
  because it stopped.

  A `"<topic>_down"` event with a `nil` payload is broadcast on `"<topic>:<name>"`, where
  `<topic>` is the underscored last segment of the scope module, so that other processes
  subscribed via PubSub can react.

  Unregistrations caused by `:syn_conflict_resolution` are also logged, which makes it visible
  when more than one process was started on the cluster and one of them was subsequently
  stopped because the conflict was resolved.

  Always returns `:ok`.
  """
  @impl true
  def on_process_unregistered(mod, name, pid, _meta, reason) do
    case reason do
      :syn_conflict_resolution ->
        log("#{mod} terminated due to syn conflict resolution: #{inspect(name)} #{inspect(pid)}")

      _other ->
        nil
    end

    prefix = topic(mod)
    channel = prefix <> ":" <> name
    event = prefix <> "_down"
    Endpoint.local_broadcast(channel, event, nil)

    :ok
  end
45

46
  @doc """
  Resolves a registry conflict between two processes registered under the same name.

  The process that was registered first is kept. When both registration times are exactly
  equal, the node names of the two pids break the tie.

  Both nodes involved must reach the same decision every single time, so the choice depends
  only on the pids and their registration times.

  Returns the pid to keep. When the pid to stop lives on the node running this callback, it
  is asked to stop; otherwise the decision is only logged.
  """
  @impl true
  def resolve_registry_conflict(mod, name, {pid1, _meta1, time1}, {pid2, _meta2, time2}) do
    {pid_to_keep, pid_to_stop} = decide(pid1, time1, pid2, time2)

    context =
      "Resolving conflict on scope #{inspect(mod)} for name #{inspect(name)} " <>
        "{#{inspect(pid1)}, #{time1}} vs {#{inspect(pid2)}, #{time2}}, "

    # Only act when this callback is running on the node that hosts the losing process.
    case node(pid_to_stop) == node() do
      true ->
        log(context <> "stop local process: #{inspect(pid_to_stop)}")
        stop(pid_to_stop)

      false ->
        log(context <> "remote process will be stopped: #{inspect(pid_to_stop)}")
    end

    pid_to_keep
  end
71

72
  # Sends `pid_to_stop` an exit signal with reason `{:shutdown, :syn_conflict_resolution}`
  # from a separate spawned process, so the caller is not blocked, then logs whether the
  # target went down within 30 seconds.
  defp stop(pid_to_stop) do
    timeout_ms = 30_000

    spawn(fn ->
      # Monitor before signalling so the :DOWN message cannot be missed.
      monitor_ref = Process.monitor(pid_to_stop)
      Process.exit(pid_to_stop, {:shutdown, :syn_conflict_resolution})

      receive do
        {:DOWN, ^monitor_ref, :process, ^pid_to_stop, reason} ->
          log("Successfully stopped #{inspect(pid_to_stop)}. Reason: #{inspect(reason)}")
      after
        timeout_ms ->
          log("Timed out while waiting for process #{inspect(pid_to_stop)} to stop")
      end
    end)
  end
86

87
  # Logs `message` at warning level, prefixed with the name of the local node.
  defp log(message) do
    Logger.warning("SynHandler(#{node()}): #{message}")
  end
5✔
88

89
  # Returns `{pid_to_keep, pid_to_stop}`.
  #
  # The process registered first wins. If both registration times are exactly the same,
  # the node names are compared instead so that a winner is still picked consistently
  # (node names are necessarily unique).
  defp decide(pid1, time1, pid2, time2) do
    first_wins? =
      cond do
        time1 == time2 -> node(pid1) < node(pid2)
        true -> time1 < time2
      end

    if first_wins?, do: {pid1, pid2}, else: {pid2, pid1}
  end
108

109
  # Derives the topic prefix for a scope module: the underscored form of the module name's
  # last segment.
  defp topic(mod) do
    segments =
      mod
      |> Macro.underscore()
      |> String.split("/")

    List.last(segments)
  end
116
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc