• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 18663a33ebc83fcab1fcf3bae0ea13cfa0a317e3-PR-1363

07 May 2025 11:29AM UTC coverage: 82.676%. First build
18663a33ebc83fcab1fcf3bae0ea13cfa0a317e3-PR-1363

Pull #1363

github

filipecabaco
remove logger
Pull Request #1363: fix: handle regionless state in syn & non found process on register_process

11 of 14 new or added lines in 4 files covered. (78.57%)

1761 of 2130 relevant lines covered (82.68%)

1195.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.48
/lib/realtime/syn_handler.ex
1
defmodule Realtime.SynHandler do
  @moduledoc """
  Callback implementations used with `:syn`.

  Covers two events: a registered process being unregistered, and two processes
  ending up registered under the same name (a registry conflict).
  """
  require Logger
  alias RealtimeWeb.Endpoint

  @doc """
  Invoked when a process registered with `:syn` is unregistered, either manually or
  because it stopped.

  When `reason` is `:syn_conflict_resolution` only a warning is logged, so that it is
  visible when more than one process was started on the cluster and one of them was
  subsequently stopped by the conflict handling.

  For any other reason a `"<topic>_down"` event with a `nil` payload is broadcast
  locally on `"<topic>:<name>"`, where `<topic>` is derived from `mod`. Other
  processes can subscribe to that topic via PubSub to react to it.

  Always returns `:ok`.
  """
  def on_process_unregistered(mod, name, _pid, _meta, :syn_conflict_resolution) do
    Logger.warning("#{mod} terminated: #{inspect(name)} #{node()}")
    :ok
  end

  def on_process_unregistered(mod, name, _pid, _meta, _reason) do
    prefix = topic(mod)
    Endpoint.local_broadcast(prefix <> ":" <> name, prefix <> "_down", nil)
    :ok
  end

  @doc """
  Chooses which of two processes registered under the same `name` is kept.

  `process1` and `process2` are `{pid, state, time}` tuples. The region is read from
  the first state that has a truthy `:region`, translated with
  `Realtime.Nodes.platform_region_translator/1`, and used to look up the nodes that
  are members of the `RegionNodes` scope for that platform region.

    * If exactly one of the two pids runs on one of those nodes, that pid is kept.
    * Otherwise the pid with the lower `time` is kept (the second one on a tie).

  When the pid to stop lives on the current node, a process is spawned to stop it;
  otherwise only a warning is logged.

  Returns the pid that is kept.
  """
  def resolve_registry_conflict(mod, name, process1, process2) do
    {pid1, state1, time1} = process1
    {pid2, state2, time2} = process2

    region = Map.get(state1, :region) || Map.get(state2, :region)
    platform_region = Realtime.Nodes.platform_region_translator(region)

    members = :syn.members(RegionNodes, platform_region)
    # The member metadata is matched strictly as `[node: node]`; any other shape raises.
    region_nodes = Enum.map(members, fn {_member, [node: member_node]} -> member_node end)

    in_region = Enum.filter([pid1, pid2], &(node(&1) in region_nodes))

    {keep, stop} =
      case in_region do
        # Exactly one candidate runs in the platform region: it wins.
        [only] -> if only == pid1, do: {pid1, pid2}, else: {pid2, pid1}
        # Neither or both are in the region: fall back to comparing the times.
        _ when time1 < time2 -> {pid1, pid2}
        _ -> {pid2, pid1}
      end

    if node(stop) == node() do
      spawn(fn -> resolve_conflict(mod, stop, name) end)
    else
      Logger.warning("Resolving #{name} conflict, remote pid: #{inspect(stop)}")
    end

    keep
  end

  # Stops the local losing process (if still alive), broadcasts "<topic>_down" on
  # "<topic>:<name>" and logs the outcome of the stop attempt.
  defp resolve_conflict(mod, stop, name) do
    outcome = stop_local(stop)

    prefix = topic(mod)
    Endpoint.broadcast(prefix <> ":" <> name, prefix <> "_down", nil)

    Logger.warning("Resolving #{name} conflict, stop local pid: #{inspect(stop)}, response: #{inspect(outcome)}")
  end

  # Returns :not_alive when the pid is already gone; otherwise the result of
  # DynamicSupervisor.stop/3, or {:error, {kind, reason}} if that call throws,
  # exits or raises.
  defp stop_local(pid) do
    case Process.alive?(pid) do
      true ->
        try do
          DynamicSupervisor.stop(pid, :shutdown, 30_000)
        catch
          kind, reason -> {:error, {kind, reason}}
        end

      false ->
        :not_alive
    end
  end

  # Last segment of the underscored module name, e.g. the part after the final "/".
  defp topic(mod) do
    mod
    |> Macro.underscore()
    |> String.split("/")
    |> List.last()
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc