supabase / realtime / 2b7e23adbed4cef8a99fa4edff4892f30a4d0b73-PR-1576

12 Nov 2025 03:49PM UTC coverage: 86.989% (+0.03%) from 86.957%
Pull #1576

filipecabaco
fix: provide microsecond granularity by using binary pgoutput mode
4 of 4 new or added lines in 2 files covered. (100.0%)

4 existing lines in 2 files now uncovered.

2340 of 2690 relevant lines covered (86.99%)

8909.47 hits per line
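
The headline percentage follows directly from the two line counts above. As a minimal sketch (the `CoverageMath` module and its `percent/3` function are hypothetical names for illustration, not part of Coveralls or of this repository), the arithmetic is:

```elixir
defmodule CoverageMath do
  # Hypothetical helper: percentage of relevant lines that were hit,
  # rounded to `digits` decimal places.
  def percent(covered, relevant, digits \\ 2) when relevant > 0 do
    Float.round(covered / relevant * 100, digits)
  end
end

IO.inspect(CoverageMath.percent(2340, 2690))    # 86.99
IO.inspect(CoverageMath.percent(2340, 2690, 3)) # 86.989
```

The three-decimal result matches the 86.989% reported for this build, and the two-decimal result matches the 86.99% shown next to the line counts.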

Source File
68.75% covered
/lib/extensions/postgres_cdc_rls/message_dispatcher.ex
# Trailing comments give the hit count for each relevant line;
# "newly uncovered" marks lines the report flags as UNCOV.

# This file draws from https://github.com/phoenixframework/phoenix/blob/9941711736c8464b27b40914a4d954ed2b4f5958/lib/phoenix/channel/server.ex
# License: https://github.com/phoenixframework/phoenix/blob/518a4640a70aa4d1370a64c2280d598e5b928168/LICENSE.md

defmodule Extensions.PostgresCdcRls.MessageDispatcher do
  @moduledoc """
  Hook invoked by Phoenix.PubSub dispatch.
  """

  alias Phoenix.Socket.Broadcast

  def dispatch([_ | _] = topic_subscriptions, _from, {type, payload, sub_ids}) do
    _ =  # 80 hits
      Enum.reduce(topic_subscriptions, %{}, fn
        {_pid, {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, is_new_api}}, cache ->
          for {bin_id, id} <- ids, reduce: [] do  # 82 hits
            acc ->
              if MapSet.member?(sub_ids, bin_id) do  # 82 hits
                [id | acc]
              else
                acc  # 0 hits
              end
          end
          |> case do  # 80 hits
            [_ | _] = valid_ids ->
              new_payload =  # 80 hits
                if is_new_api do
                  %Broadcast{  # 80 hits
                    topic: join_topic,
                    event: "postgres_changes",
                    payload: %{ids: valid_ids, data: Jason.Fragment.new(payload)}
                  }
                else
                  %Broadcast{topic: join_topic, event: type, payload: Jason.Fragment.new(payload)}  # 0 hits
                end

              broadcast_message(cache, fastlane_pid, new_payload, serializer)  # 80 hits

            _ ->
              cache  # 0 hits
          end
      end)

    :ok
  end

  defp broadcast_message(cache, fastlane_pid, msg, serializer) do
    case cache do  # 80 hits
      %{^msg => encoded_msg} ->
        send(fastlane_pid, encoded_msg)  # 0 hits, newly uncovered
        cache  # 0 hits, newly uncovered

      %{} ->
        encoded_msg = serializer.fastlane!(msg)  # 80 hits
        send(fastlane_pid, encoded_msg)  # 80 hits
        Map.put(cache, msg, encoded_msg)  # 80 hits
    end
  end
end
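
The cache-hit branch of `broadcast_message/4` is the part of this file with no hits in this build. The sketch below isolates that encode-once pattern so the branch can be seen in action. It is a standalone illustration, not the project's test code: `EncodeOnceSketch`, `CountingSerializer`, and the plain-map message are hypothetical stand-ins, and only `fastlane!/1` mirrors a name used in the file above. It assumes the branch is reached when the same message is built for more than one subscriber within a single dispatch.

```elixir
defmodule EncodeOnceSketch do
  # Hypothetical stand-in for a serializer. It reports every encode call to
  # the calling process so the number of encodes can be counted afterwards.
  defmodule CountingSerializer do
    def fastlane!(msg) do
      send(self(), :encoded)
      {:encoded, msg}
    end
  end

  # Same shape as broadcast_message/4 above: reuse the encoded form when the
  # exact message is already a key in the cache, otherwise encode and store it.
  def broadcast_message(cache, pid, msg, serializer) do
    case cache do
      %{^msg => encoded_msg} ->
        send(pid, encoded_msg)
        cache

      %{} ->
        encoded_msg = serializer.fastlane!(msg)
        send(pid, encoded_msg)
        Map.put(cache, msg, encoded_msg)
    end
  end

  # Delivers the same message twice through one cache, as a reduce over two
  # subscribers would, and returns {encode_calls, deliveries, cache_size}.
  def run do
    msg = %{event: "postgres_changes", payload: %{ids: [1]}}

    cache =
      Enum.reduce(1..2, %{}, fn _subscriber, cache ->
        broadcast_message(cache, self(), msg, CountingSerializer)
      end)

    {count(:encodes, 0), count(:deliveries, 0), map_size(cache)}
  end

  # Drains matching messages from the current process mailbox.
  defp count(:encodes, n) do
    receive do
      :encoded -> count(:encodes, n + 1)
    after
      0 -> n
    end
  end

  defp count(:deliveries, n) do
    receive do
      {:encoded, _msg} -> count(:deliveries, n + 1)
    after
      0 -> n
    end
  end
end

IO.inspect(EncodeOnceSketch.run()) # {1, 2, 1}
```

Two deliveries with a single encode show the second call taking the cache-hit branch. A test that covers lines 49 and 50 of the real module would similarly need two subscriptions that produce an identical message in one `dispatch/3` call.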