• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / f61a44f4e10113c63b30f74946c462d773f5b222-PR-1291

31 Jan 2025 10:02PM UTC coverage: 80.406% (-0.03%) from 80.433%
f61a44f4e10113c63b30f74946c462d773f5b222-PR-1291

Pull #1291

github

filipecabaco
fix: apply formatting and style fixes; enable in CI
Pull Request #1291: fix: apply formatting and style fixes; enable in CI

25 of 35 new or added lines in 18 files covered. (71.43%)

5 existing lines in 2 files now uncovered.

1662 of 2067 relevant lines covered (80.41%)

329.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.0
/lib/realtime/syn_handler.ex
1
defmodule Realtime.SynHandler do
2
  @moduledoc """
3
  Custom defined Syn's callbacks
4
  """
5
  require Logger
6
  alias RealtimeWeb.Endpoint
7

8
  @doc """
  Callback invoked when a process registered with :syn is unregistered, either manually or
  because it stopped.

  Other processes can subscribe to these events via PubSub to respond to them.

  When the reason is `:syn_conflict_resolution` only a warning is logged, so we can tell when
  more than one process was started on the cluster and one was subsequently stopped because
  :syn handled the conflict. For any other reason a `"<topic>_down"` event with a `nil`
  payload is broadcast locally on `"<topic>:<name>"`, where `<topic>` is the underscored last
  segment of `mod`.

  Always returns `:ok`.
  """
  def on_process_unregistered(mod, name, _pid, _meta, :syn_conflict_resolution) do
    Logger.warning("#{mod} terminated: #{inspect(name)} #{node()}")
    :ok
  end

  def on_process_unregistered(mod, name, _pid, _meta, _reason) do
    down_topic = topic(mod)
    Endpoint.local_broadcast(down_topic <> ":" <> name, down_topic <> "_down", nil)
    :ok
  end
29

30
  @doc """
  Decides which of two processes registered under the same `name` is kept.

  Each conflicting entry is a `{pid, meta, time}` tuple; the first entry's meta must be a map
  with a `:region` key. That region is translated with
  `Realtime.Nodes.platform_region_translator/1` and the nodes returned for it by
  `:syn.members/2` on `RegionNodes` are collected.

  If exactly one of the two pids runs on one of those nodes, that pid is kept. Otherwise the
  pid with the smaller `time` is kept (`pid2` when the times are equal).

  When the discarded pid lives on the current node it is stopped from a spawned process;
  otherwise only a warning is logged here. Returns the pid to keep.
  """
  def resolve_registry_conflict(mod, name, {pid1, %{region: region}, time1}, {pid2, _, time2}) do
    members = :syn.members(RegionNodes, Realtime.Nodes.platform_region_translator(region))
    region_nodes = Enum.map(members, fn {_, [node: member_node]} -> member_node end)

    in_region? = fn pid -> node(pid) in region_nodes end

    {keep, stop} =
      case {in_region?.(pid1), in_region?.(pid2)} do
        # Exactly one candidate runs in the platform region: it wins.
        {true, false} -> {pid1, pid2}
        {false, true} -> {pid2, pid1}
        # Both or neither are in the region: fall back to comparing the times.
        _ when time1 < time2 -> {pid1, pid2}
        _ -> {pid2, pid1}
      end

    if node(stop) == node() do
      # Stop the loser asynchronously so this callback returns without waiting on it.
      spawn(fn -> resolve_conflict(mod, stop, name) end)
    else
      Logger.warning("Resolving #{name} conflict, remote pid: #{inspect(stop)}")
    end

    keep
  end
63

64
  # Stops the local process that lost a registry conflict, broadcasts a "<topic>_down" event
  # for `name` and logs the outcome of the stop attempt.
  defp resolve_conflict(mod, stop, name) do
    outcome =
      case Process.alive?(stop) do
        true ->
          try do
            DynamicSupervisor.stop(stop, :shutdown, 30_000)
          catch
            # Any throw/error/exit while stopping is captured so the broadcast and the log
            # line below still run.
            kind, reason -> {:error, {kind, reason}}
          end

        false ->
          :not_alive
      end

    down_topic = topic(mod)
    Endpoint.broadcast(down_topic <> ":" <> name, down_topic <> "_down", nil)

    Logger.warning(
      "Resolving #{name} conflict, stop local pid: #{inspect(stop)}, response: #{inspect(outcome)}"
    )
  end
81

82
  # Returns the underscored last segment of `mod`, e.g. `Foo.BarBaz` -> "bar_baz".
  defp topic(mod) do
    segments = String.split(Macro.underscore(mod), "/")
    # String.split/2 always returns at least one element, so there is always a last one.
    List.last(segments)
  end
89
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc