• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 95e65e20dbf37d801611cc277ce3ef3c47de5f02-PR-1555

02 Oct 2025 06:39PM UTC coverage: 85.17% (-0.03%) from 85.202%
95e65e20dbf37d801611cc277ce3ef3c47de5f02-PR-1555

Pull #1555

github

filipecabaco
ignore presence test for now
Pull Request #1555: fix: move connect rate limit to socket

9 of 15 new or added lines in 2 files covered. (60.0%)

2102 of 2468 relevant lines covered (85.17%)

2760.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.93
/lib/realtime_web/channels/user_socket.ex
1
defmodule RealtimeWeb.UserSocket do
2
  # This is defined up here before `use Phoenix.Socket` is called so that we can define `Phoenix.Socket.init/1`
3
  # It has to be overridden because we need to set the `max_heap_size` flag from the transport process context
4
  @impl true
5
  def init(state) when is_tuple(state) do
6
    Process.flag(:max_heap_size, max_heap_size())
117✔
7
    Phoenix.Socket.__init__(state)
117✔
8
  end
9

10
  use Phoenix.Socket
11
  use Realtime.Logs
12

13
  alias Realtime.Api.Tenant
14
  alias Realtime.Crypto
15
  alias Realtime.Database
16
  alias Realtime.PostgresCdc
17
  alias Realtime.Tenants
18

19
  alias RealtimeWeb.Channels.TenantRateLimiters
20
  alias RealtimeWeb.ChannelsAuthorization
21
  alias RealtimeWeb.RealtimeChannel
22
  alias RealtimeWeb.RealtimeChannel.Logging
23
  ## Channels
24
  channel "realtime:*", RealtimeChannel
25

26
  @default_log_level :error
27

28
  @impl true
29
  def id(%{assigns: %{tenant: tenant}}), do: subscribers_id(tenant)
117✔
30

31
  @spec subscribers_id(String.t()) :: String.t()
32
  def subscribers_id(tenant), do: "user_socket:" <> tenant
117✔
33

34
  @impl true
35
  def connect(params, socket, opts) do
36
    %{uri: %{host: host}, x_headers: headers} = opts
128✔
37

38
    {:ok, external_id} = Database.get_external_id(host)
128✔
39
    token = access_token(params, headers)
128✔
40
    log_level = log_level(params)
128✔
41

42
    Logger.metadata(external_id: external_id, project: external_id)
128✔
43
    Logger.put_process_level(self(), log_level)
128✔
44

45
    socket =
128✔
46
      socket
47
      |> assign(:tenant, external_id)
48
      |> assign(:log_level, log_level)
49
      |> assign(:access_token, token)
50

51
    with %Tenant{
128✔
52
           jwt_secret: jwt_secret,
53
           jwt_jwks: jwt_jwks,
54
           postgres_cdc_default: postgres_cdc_default,
55
           suspend: false
56
         } = tenant <- Tenants.Cache.get_tenant_by_external_id(external_id),
128✔
57
         token when is_binary(token) <- token,
127✔
58
         jwt_secret_dec <- Crypto.decrypt!(jwt_secret),
127✔
59
         {:ok, claims} <- ChannelsAuthorization.authorize_conn(token, jwt_secret_dec, jwt_jwks),
127✔
60
         :ok <- TenantRateLimiters.check_tenant(tenant),
117✔
61
         {:ok, postgres_cdc_module} <- PostgresCdc.driver(postgres_cdc_default) do
117✔
62
      %Tenant{
63
        extensions: extensions,
64
        max_concurrent_users: max_conn_users,
65
        max_events_per_second: max_events_per_second,
66
        max_bytes_per_second: max_bytes_per_second,
67
        max_joins_per_second: max_joins_per_second,
68
        max_channels_per_client: max_channels_per_client,
69
        postgres_cdc_default: postgres_cdc_default
70
      } = tenant
117✔
71

72
      assigns = %RealtimeChannel.Assigns{
117✔
73
        claims: claims,
74
        jwt_secret: jwt_secret,
75
        jwt_jwks: jwt_jwks,
76
        limits: %{
77
          max_concurrent_users: max_conn_users,
78
          max_events_per_second: max_events_per_second,
79
          max_bytes_per_second: max_bytes_per_second,
80
          max_joins_per_second: max_joins_per_second,
81
          max_channels_per_client: max_channels_per_client
82
        },
83
        postgres_extension: PostgresCdc.filter_settings(postgres_cdc_default, extensions),
84
        postgres_cdc_module: postgres_cdc_module,
85
        tenant: external_id,
86
        log_level: log_level,
87
        tenant_token: token,
88
        headers: opts.x_headers
117✔
89
      }
90

91
      assigns = Map.from_struct(assigns)
117✔
92

93
      {:ok, assign(socket, assigns)}
94
    else
95
      nil ->
96
        log_error("TenantNotFound", "Tenant not found: #{external_id}")
×
97
        {:error, :tenant_not_found}
98

99
      %Tenant{suspend: true} ->
100
        Logging.log_error(socket, "RealtimeDisabledForTenant", "Realtime disabled for this tenant")
1✔
101
        {:error, :tenant_suspended}
102

103
      {:error, :expired_token, msg} ->
104
        Logging.maybe_log_warning(socket, "InvalidJWTToken", msg)
5✔
105
        {:error, :expired_token}
106

107
      {:error, :missing_claims} ->
108
        msg = "Fields `role` and `exp` are required in JWT"
4✔
109
        Logging.maybe_log_warning(socket, "InvalidJWTToken", msg)
4✔
110
        {:error, :missing_claims}
111

112
      {:error, :token_malformed} ->
113
        log_error("MalformedJWT", "The token provided is not a valid JWT")
1✔
114
        {:error, :token_malformed}
115

116
      {:error, :too_many_connections} ->
NEW
117
        msg = "Too many connected users"
×
NEW
118
        Logging.log_error(socket, "ConnectionRateLimitReached", msg)
×
119
        {:error, :too_many_connections}
120

121
      {:error, :too_many_joins} ->
NEW
122
        msg = "Too many joins per second"
×
NEW
123
        Logging.log_error(socket, "JoinsRateLimitReached", msg)
×
124
        {:error, :too_many_joins}
125

126
      error ->
127
        log_error("ErrorConnectingToWebsocket", error)
×
128
        error
×
129
    end
130
  end
131

132
  defp access_token(params, headers) do
133
    case :proplists.lookup("x-api-key", headers) do
128✔
134
      :none -> Map.get(params, "apikey")
1✔
135
      {"x-api-key", token} -> token
127✔
136
    end
137
  end
138

139
  defp log_level(params) do
140
    case Map.get(params, "log_level") do
128✔
141
      level when level in ["info", "warning", "error"] -> String.to_existing_atom(level)
111✔
142
      _ -> @default_log_level
17✔
143
    end
144
  end
145

146
  defp max_heap_size(), do: Application.fetch_env!(:realtime, :websocket_max_heap_size)
117✔
147
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc