• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 32f0f41ede54b9ba80cde49a5ef9d4db0d07fb49-PR-1403

05 Jun 2025 02:12PM UTC coverage: 83.625% (+0.6%) from 83.027%
32f0f41ede54b9ba80cde49a5ef9d4db0d07fb49-PR-1403

Pull #1403

github

filipecabaco
fix tests
Pull Request #1403: fix: handle tuple errors

5 of 9 new or added lines in 2 files covered. (55.56%)

6 existing lines in 2 files now uncovered.

1818 of 2174 relevant lines covered (83.62%)

2186.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.45
/lib/realtime/tenants/batch_broadcast.ex
1
defmodule Realtime.Tenants.BatchBroadcast do
2
  @moduledoc """
3
  Virtual schema with a representation of a batched broadcast.
4
  """
5
  use Ecto.Schema
6
  import Ecto.Changeset
7

8
  alias Realtime.Api.Tenant
9
  alias Realtime.GenCounter
10
  alias Realtime.RateCounter
11
  alias Realtime.Tenants
12
  alias Realtime.Tenants.Authorization
13
  alias Realtime.Tenants.Authorization.Policies
14
  alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
15
  alias Realtime.Tenants.Connect
16

17
  alias RealtimeWeb.Endpoint
18

19
  # Virtual schema: a batch is a list of embedded messages.
  embedded_schema do
    embeds_many :messages, Message do
      # Event name; sent under the "event" key of the broadcast payload.
      field :event, :string
      # Topic used to build the tenant topic the message is broadcast to.
      field :topic, :string
      # Message body; sent under the "payload" key of the broadcast payload.
      field :payload, :map
      # Private messages are only sent for super users or for callers whose
      # policies allow broadcast writes on the topic.
      field :private, :boolean, default: false
    end
  end
27

28
  @doc """
  Validates a batch of messages and broadcasts every message the caller may send.

  `auth_params` is either a `Plug.Conn` (tenant id, request headers, JWT, claims
  and role are read from it) or authorization params that are passed on as given
  (a `nil` value resolves to no permissions).

  Public messages are always sent. Private messages are grouped by topic and sent
  when `super_user` is truthy or when the write policies resolved for that topic
  allow broadcasting; otherwise they are skipped without an error.

  Returns `:ok` once the batch has been processed, the invalid `Ecto.Changeset`
  when validation fails, the `{:error, :too_many_requests, message}` tuple from
  the rate limit check, or `{:error, :tenant_not_found}` when `tenant` is `nil`.
  """
  def broadcast(auth_params, tenant, messages, super_user \\ false)

  def broadcast(
        %Plug.Conn{assigns: assigns, req_headers: req_headers},
        %Tenant{} = tenant,
        messages,
        super_user
      ) do
    # Collect from the connection everything the authorization lookup needs.
    auth_params = %{
      tenant_id: tenant.external_id,
      headers: req_headers,
      jwt: assigns.jwt,
      claims: assigns.claims,
      role: assigns.role
    }

    broadcast(auth_params, tenant, messages, super_user)
  end

  def broadcast(auth_params, %Tenant{} = tenant, messages, super_user) do
    with %Ecto.Changeset{valid?: true} = batch <- changeset(%__MODULE__{}, messages),
         %Ecto.Changeset{changes: %{messages: entries}} = batch,
         rate_key = Tenants.events_per_second_key(tenant),
         :ok <- check_rate_limit(rate_key, tenant, length(entries)) do
      # Work on the raw changes of each message changeset, split by the
      # `:private` flag (absent flag counts as public).
      by_visibility =
        Enum.group_by(entries, &Map.get(&1.changes, :private, false), & &1.changes)

      # Public messages need no authorization.
      by_visibility
      |> Map.get(false, [])
      |> send_events(tenant, true)

      # Private messages are authorized once per topic, not once per message.
      by_visibility
      |> Map.get(true, [])
      |> Enum.group_by(&Map.get(&1, :topic))
      |> Enum.each(fn {topic, topic_events} ->
        if super_user || write_allowed?(tenant, auth_params, topic) do
          send_events(topic_events, tenant, false)
        end
      end)

      :ok
    end
  end

  def broadcast(_auth_params, nil, _messages, _super_user), do: {:error, :tenant_not_found}

  # Sends every event of the list to its own topic, passing `public?` through
  # to send_message_and_count/5.
  defp send_events(events, tenant, public?) do
    Enum.each(events, fn %{topic: sub_topic, payload: payload, event: event} ->
      send_message_and_count(tenant, sub_topic, event, payload, public?)
    end)
  end

  # True only when the permissions resolved for the topic are policies that
  # grant broadcast write access; nil or error results count as not allowed.
  defp write_allowed?(tenant, auth_params, topic) do
    match?(
      %Policies{broadcast: %BroadcastPolicies{write: true}},
      permissions_for_message(tenant, auth_params, topic)
    )
  end
×
86

87
  @doc """
  Builds the changeset for a whole batch.

  No top-level fields are cast; only the required `:messages` embed is, and each
  entry is validated with `message_changeset/2`.
  """
  def changeset(payload, attrs) do
    batch = cast(payload, attrs, [])
    cast_embed(batch, :messages, required: true, with: &message_changeset/2)
  end
92

93
  @doc """
  Builds the changeset for a single message of the batch.

  Casts `:topic`, `:payload`, `:event` and `:private`, applies the `:private`
  fallback and requires `:topic`, `:payload` and `:event` to be present.
  """
  def message_changeset(message, attrs) do
    casted = cast(message, attrs, [:topic, :payload, :event, :private])

    casted
    |> maybe_put_private_change()
    |> validate_required([:topic, :payload, :event])
  end
99

100
  # Puts `private: false` on the changeset when it carries no (or a nil) change
  # for `:private`; any other change is left untouched.
  # NOTE(review): Ecto's put_change/3 is documented to skip values equal to the
  # current data, so with the schema default of `false` this may not actually
  # record a change - confirm against the Ecto version in use.
  defp maybe_put_private_change(changeset) do
    if is_nil(get_change(changeset, :private)) do
      put_change(changeset, :private, false)
    else
      changeset
    end
  end
106

107
  # Adds to the tenant's events-per-second counter and then broadcasts the
  # message on the tenant topic built from `topic` and `public?`, excluding the
  # calling process (broadcast_from/4 with self()).
  defp send_message_and_count(tenant, topic, event, payload, public?) do
    counter_key = Tenants.events_per_second_key(tenant)
    destination = Tenants.tenant_topic(tenant, topic, public?)

    GenCounter.add(counter_key)

    Endpoint.broadcast_from(self(), destination, "broadcast", %{
      "payload" => payload,
      "event" => event,
      "type" => "broadcast"
    })
  end
115

NEW
116
  # Resolves the write policies for `topic` using the tenant's database
  # connection.
  #
  # Returns the policies on success, nil when there are no auth params or the
  # authorization lookup answers {:error, :not_found}, and any other
  # non-matching result (from the connection lookup or the authorization call)
  # unchanged.
  defp permissions_for_message(_tenant, nil, _topic), do: nil

  defp permissions_for_message(tenant, auth_params, topic) do
    case Connect.lookup_or_start_connection(tenant.external_id) do
      {:ok, db_conn} ->
        authorization_params =
          auth_params
          |> Map.put(:topic, topic)
          |> Authorization.build_authorization_params()

        case Authorization.get_write_authorizations(db_conn, authorization_params) do
          {:ok, policies} -> policies
          {:error, :not_found} -> nil
          error -> error
        end

      # Anything other than {:ok, conn} is handed back as is.
      connection_result ->
        connection_result
    end
  end
132

133
  # Compares the tenant's current average events per second (read from the rate
  # counter) with its `max_events_per_second`.
  #
  # Returns :ok when the batch fits, or an {:error, :too_many_requests, message}
  # tuple when the tenant is already over the limit or the batch would push it
  # over. The match on RateCounter.get/1 is assertive: anything other than
  # {:ok, %{avg: _}} raises.
  defp check_rate_limit(
         events_per_second_key,
         %Tenant{max_events_per_second: limit},
         total_messages_to_broadcast
       ) do
    {:ok, %{avg: current_rate}} = RateCounter.get(events_per_second_key)

    cond do
      # Already above the limit before sending anything.
      current_rate > limit ->
        {:error, :too_many_requests, "You have exceeded your rate limit"}

      # Under the limit now, but this batch would exceed it.
      total_messages_to_broadcast + current_rate > limit ->
        {:error, :too_many_requests, "Too many messages to broadcast, please reduce the batch size"}

      true ->
        :ok
    end
  end
148
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc