• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 19715527181

26 Nov 2025 07:41PM UTC coverage: 74.358% (+13.1%) from 61.246%
19715527181

Pull #744

github

web-flow
Merge 9e08c361d into 0224a24c8
Pull Request #744: fix(defrag): improve statems, caching, logs, circuit breaking

771 of 916 new or added lines in 23 files covered. (84.17%)

3 existing lines in 2 files now uncovered.

2404 of 3233 relevant lines covered (74.36%)

4254.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.44
/lib/supavisor/client_handler.ex
1
defmodule Supavisor.ClientHandler do
2
  @moduledoc """
3
  This module is responsible for handling incoming connections to the Supavisor server. It is
4
  implemented as a Ranch protocol behavior and a gen_statem behavior. It handles SSL negotiation,
5
  user authentication, tenant subscription, and dispatching of messages to the appropriate tenant
6
  supervisor. Each client connection is assigned to a specific tenant supervisor.
7
  """
8

9
  require Logger
10

11
  @behaviour :ranch_protocol
12
  @behaviour :gen_statem
13
  @proto [:tcp, :ssl]
14
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
15
  @subscribe_retries Application.compile_env(:supavisor, :subscribe_retries)
16
  @timeout_subscribe 500
17
  @clients_registry Supavisor.Registry.TenantClients
18
  @proxy_clients_registry Supavisor.Registry.TenantProxyClients
19

20
  alias Supavisor.{
21
    DbHandler,
22
    FeatureFlag,
23
    HandlerHelpers,
24
    Helpers,
25
    Monitoring.Telem,
26
    Protocol.Client,
27
    Protocol.Debug,
28
    Protocol.FrontendMessageHandler,
29
    Protocol.MessageStreamer,
30
    Tenants
31
  }
32

33
  require Supavisor.Protocol.Server, as: Server
34
  require Supavisor.Protocol.PreparedStatements, as: PreparedStatements
35

36
  # Ranch protocol entry point: spawns a linked process that runs `init/3`
  # with the listener ref, transport module and protocol options.
  @impl true
  def start_link(ref, transport, opts) do
    {:ok, :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts])}
  end
41

42
  # gen_statem callback: every event is routed through `handle_event/4`.
  @impl true
  def callback_mode do
    [:handle_event_function]
  end
634✔
44

45
  # Asynchronously reports a database status to the client handler `pid`
  # by casting `{:db_status, status}` to it.
  @spec db_status(pid(), :ready_for_query | :read_sql_error) :: :ok
  def db_status(pid, status) do
    :gen_statem.cast(pid, {:db_status, status})
  end
4,589✔
47

48
  # gen_statem `init/1` callback stub. The state machine is entered through
  # `init/3` and `:gen_statem.enter_loop/4`, so this always answers `:ignore`.
  @impl true
  def init(_) do
    :ignore
  end
50

51
  # Connection process body, spawned by `start_link/3`.
  #
  # Completes the Ranch handshake, switches the socket to active mode, builds
  # the initial connection data and enters the gen_statem loop in the
  # `:exchange` state. `opts` must provide `:mode` and may provide `:local`.
  def init(ref, trans, opts) do
    Process.flag(:trap_exit, true)
    Helpers.set_max_heap_size(90)

    {:ok, client_sock} = :ranch.handshake(ref)
    peer = Helpers.peer_ip(client_sock)
    is_local = opts[:local] || false

    Logger.metadata(peer_ip: peer, local: is_local, state: :init)

    # Only `active: true` is set on the socket here.
    :ok = trans.setopts(client_sock, active: true)

    Logger.debug("ClientHandler is: #{inspect(self())}")

    initial_data = %{
      # identity / tenant information, filled in during the exchange
      id: nil,
      tenant: nil,
      tenant_feature_flags: nil,
      tenant_availability_zone: nil,
      user: nil,
      db_name: nil,
      app_name: nil,
      # client socket and transport
      sock: {:gen_tcp, client_sock},
      trans: trans,
      ssl: false,
      peer_ip: peer,
      local: is_local,
      # pool / database side
      db_pid: nil,
      pool: nil,
      manager: nil,
      mode: opts.mode,
      proxy_type: nil,
      ps: nil,
      # authentication
      auth_secrets: nil,
      auth: %{},
      # timers, counters and bookkeeping
      query_start: nil,
      timeout: nil,
      stats: %{},
      idle_timeout: 0,
      last_query: nil,
      heartbeat_interval: 0,
      connection_start: System.monotonic_time(),
      log_level: nil,
      active_count: 0,
      subscribe_retries: 0,
      stream_state: MessageStreamer.new_stream_state(FrontendMessageHandler)
    }

    :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :exchange, initial_data)
  end
117

118
  # A plain HTTP GET arrived while still in `:exchange`: answer with a 204
  # carrying the application version header and stop the connection.
  @impl true
  def handle_event(:info, {_proto, _, "GET" <> _rest}, :exchange, data) do
    Logger.metadata(state: :exchange)
    Logger.debug("ClientHandler: Client is trying to request HTTP")

    response = "HTTP/1.1 204 OK\r\nx-app-version: #{Application.spec(:supavisor, :vsn)}\r\n\r\n"
    HandlerHelpers.sock_send(data.sock, response)

    {:stop, {:shutdown, :http_request}}
  end
130

131
  # Cancel request received from a client (any state): forward it through
  # `HandlerHelpers.send_cancel_query/2` and close this connection.
  def handle_event(:info, {_, _, Server.cancel_message(backend_pid, secret)}, state, _) do
    Logger.metadata(state: state)

    target = {backend_pid, secret}
    Logger.debug("ClientHandler: Got cancel query for #{inspect(target)}")

    :ok = HandlerHelpers.send_cancel_query(backend_pid, secret)
    {:stop, {:shutdown, :cancel_query}}
  end
138

139
  # `:cancel_query` while a query is running: look up the metadata of the
  # checked-out db process and send the cancel request to the database.
  # The state is kept whether or not the metadata was found.
  def handle_event(:info, :cancel_query, :busy, data) do
    Logger.metadata(state: :busy)

    lookup_key = {data.tenant, data.db_pid}
    Logger.debug("ClientHandler: Cancel query for #{inspect(lookup_key)}")

    {_pool, handler_pid, _db_sock} = data.db_pid
    found = db_pid_meta(lookup_key)

    case found do
      [{^handler_pid, meta}] ->
        :ok = HandlerHelpers.cancel_query(meta.host, meta.port, meta.ip_ver, meta.pid, meta.key)

      unexpected ->
        Logger.error(
          "ClientHandler: Received cancel but no proc was found #{inspect(lookup_key)} #{inspect(unexpected)}"
        )
    end

    :keep_state_and_data
  end
158

159
  # `:cancel_query` while idle: there is nothing to cancel, so it is ignored.
  def handle_event(:info, :cancel_query, :idle, _data), do: :keep_state_and_data
162

163
  # SSLRequest during `:exchange`.
  #
  # When both a downstream certificate and key are configured, replies "S",
  # performs the TLS handshake on the raw socket and continues with the
  # upgraded socket. Otherwise replies "N" and keeps waiting on the plain one.
  def handle_event(
        :info,
        {:tcp, _, Server.ssl_request_message()},
        :exchange,
        %{sock: sock} = data
      ) do
    Logger.metadata(state: :exchange)
    Logger.debug("ClientHandler: Client is trying to connect with SSL")

    cert = Helpers.downstream_cert()
    key = Helpers.downstream_key()

    # SSL negotiation, S/N/Error
    if cert && key do
      # Go passive so the handshake bytes are read by :ssl, not delivered as messages.
      :ok = HandlerHelpers.setopts(sock, active: false)
      :ok = HandlerHelpers.sock_send(sock, "S")

      tls_opts = [verify: :verify_none, certfile: cert, keyfile: key]
      handshake = :ssl.handshake(elem(sock, 1), tls_opts)

      case handshake do
        {:ok, tls_sock} ->
          upgraded = {:ssl, tls_sock}
          :ok = HandlerHelpers.setopts(upgraded, active: true)
          {:keep_state, %{data | sock: upgraded, ssl: true}}

        failure ->
          Logger.error("ClientHandler: SSL handshake error: #{inspect(failure)}")
          Telem.client_join(:fail, data.id)
          {:stop, {:shutdown, :ssl_handshake_error}}
      end
    else
      Logger.error(
        "ClientHandler: User requested SSL connection but no downstream cert/key found"
      )

      :ok = HandlerHelpers.sock_send(data.sock, "N")
      :keep_state_and_data
    end
  end
206

73✔
207
  # Startup packets larger than 1024 bytes are rejected before decoding.
  def handle_event(:info, {_, _, packet}, :exchange, _) when byte_size(packet) > 1024 do
    Logger.metadata(state: :exchange)

    size = byte_size(packet)
    Logger.error("ClientHandler: Startup packet too large #{size}")

    {:stop, {:shutdown, :startup_packet_too_large}}
  end
73✔
212

213
  # Startup packet during `:exchange`.
  #
  # Decodes the packet, extracts user / tenant / database names, validates the
  # user and database names and, on success, queues an internal `:hello` event.
  # Invalid names or an undecodable packet stop the connection.
  def handle_event(:info, {_, _, packet}, :exchange, data) do
    Logger.metadata(state: :exchange)

    case Server.decode_startup_packet(packet) do
      {:error, decode_error} ->
        Logger.error("ClientHandler: Client startup message error: #{inspect(decode_error)}")

        Telem.client_join(:fail, data.id)
        {:stop, {:shutdown, :startup_packet_error}}

      {:ok, hello} ->
        Logger.debug("ClientHandler: Client startup message: #{inspect(hello)}")
        {type, {user, tenant_or_alias, db_name}} = HandlerHelpers.parse_user_info(hello.payload)

        if Helpers.validate_name(user) and Helpers.validate_name(db_name) do
          level = maybe_change_log(hello)
          search_path = get_in(hello.payload, ["options", "--search_path"])
          hello_event = {:hello, {type, {user, tenant_or_alias, db_name, search_path}}}
          client_app = app_name(hello.payload["application_name"])

          updated = %{data | log_level: level, app_name: client_app}
          {:keep_state, updated, {:next_event, :internal, hello_event}}
        else
          reason = "Invalid format for user or db_name"

          Logger.error("ClientHandler: #{inspect(reason)} #{inspect({user, db_name})}")

          Telem.client_join(:fail, tenant_or_alias)

          HandlerHelpers.send_error(
            data.sock,
            "XX000",
            "Authentication error, reason: #{inspect(reason)}"
          )

          {:stop, {:shutdown, :invalid_format}}
        end
    end
  end
72✔
252

253
  # Internal `:hello` event, queued after a valid startup packet.
  #
  # Looks up the tenant/user, derives the connection id and mode, enforces the
  # tenant's SSL requirement and CIDR allow list, then fetches the auth
  # secrets and queues the `:handle` event that runs the auth exchange.
  # Every failure path sends an error to the client and stops the connection.
  def handle_event(
        :internal,
        {:hello, {type, {user, tenant_or_alias, db_name, search_path}}},
        :exchange,
        %{sock: sock} = data
      ) do
    Logger.metadata(state: :exchange)
    sni_hostname = HandlerHelpers.try_get_sni(sock)

    case Tenants.get_user_cache(type, user, tenant_or_alias, sni_hostname) do
      {:ok, info} ->
        # Fall back to the tenant's configured database when none was requested.
        db_name = db_name || info.tenant.db_database

        id =
          Supavisor.id(
            {type, tenant_or_alias},
            user,
            data.mode,
            info.user.mode_type,
            db_name,
            search_path
          )

        mode = Supavisor.mode(id)

        Logger.metadata(
          project: tenant_or_alias,
          user: user,
          mode: mode,
          type: type,
          db_name: db_name,
          app_name: data.app_name
        )

        {:ok, addr} = HandlerHelpers.addr_from_sock(sock)

        cond do
          !data.local and info.tenant.enforce_ssl and !data.ssl ->
            Logger.error(
              "ClientHandler: Tenant is not allowed to connect without SSL, user #{user}"
            )

            :ok = HandlerHelpers.send_error(sock, "XX000", "SSL connection is required")
            Telem.client_join(:fail, id)
            {:stop, {:shutdown, :ssl_required}}

          HandlerHelpers.filter_cidrs(info.tenant.allow_list, addr) == [] ->
            message = "Address not in tenant allow_list: " <> inspect(addr)
            Logger.error("ClientHandler: #{message}")
            :ok = HandlerHelpers.send_error(sock, "XX000", message)

            Telem.client_join(:fail, id)
            {:stop, {:shutdown, :address_not_allowed}}

          true ->
            new_data = update_user_data(data, info, user, id, db_name, mode)

            key = {:secrets, tenant_or_alias, user}

            # `data.id` is still nil here (it is only set on `new_data`), so the
            # freshly computed `id` is passed, matching the retry path which
            # calls the secrets helpers with a populated id.
            case cached_get_secrets(id, info, user, key, :timer.hours(24)) do
              {:ok, auth_secrets} ->
                Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}")

                {:keep_state, new_data, {:next_event, :internal, {:handle, auth_secrets, info}}}

              {:error, reason} ->
                Logger.error(
                  "ClientHandler: Authentication auth_secrets error: #{inspect(reason)}"
                )

                :ok =
                  HandlerHelpers.send_error(
                    sock,
                    "XX000",
                    "Authentication error, reason: #{inspect(reason)}"
                  )

                Telem.client_join(:fail, id)
                {:stop, {:shutdown, :auth_secrets_error}}
            end
        end

      {:error, reason} ->
        Logger.error(
          "ClientHandler: User not found: #{inspect(reason)} #{inspect({type, user, tenant_or_alias})}"
        )

        :ok = HandlerHelpers.send_error(sock, "XX000", "Tenant or user not found")
        # No id could be derived for an unknown tenant/user; `data.id` is nil here.
        Telem.client_join(:fail, data.id)
        {:stop, {:shutdown, :user_not_found}}
    end
  end
345

58✔
346
  # Internal `:handle` event: runs the authentication exchange with the
  # client. On failure it first gives `maybe_retry_with_fresh_secrets/5` a
  # chance to retry before reporting the failure.
  def handle_event(
        :internal,
        {:handle, {method, secrets}, info},
        state,
        %{sock: sock} = data
      ) do
    Logger.metadata(state: state)
    Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(method)}")

    credentials = {method, secrets}

    case handle_exchange(sock, credentials) do
      {:ok, client_key} ->
        handle_auth_success(sock, credentials, client_key, data)

      {:error, reason} ->
        Logger.error(
          "ClientHandler: Exchange error: #{inspect(reason)} when method #{inspect(method)}"
        )

        # Try cache invalidation and retry if conditions are met
        retry = maybe_retry_with_fresh_secrets(sock, credentials, reason, info, data)

        case retry do
          :no_retry -> handle_auth_failure(sock, reason, data)
          {:retry_success, auth_result} -> auth_result
        end
    end
  end
371

24✔
372
  # Internal `:subscribe` event.
  #
  # Starts (or finds) the tenant pool supervisor and subscribes this client to
  # it. When the supervisor lives on another node and the mode is
  # `:transaction` or `:session`, switches to the proxy path instead. Pool
  # limits stop the connection; any other error goes through
  # `timeout_subscribe_or_terminate/1`.
  def handle_event(:internal, :subscribe, state, data) do
    Logger.metadata(state: state)
    Logger.debug("ClientHandler: Subscribe to tenant #{inspect(data.id)}")

    start_opts = [
      log_level: data.log_level,
      availability_zone: data.tenant_availability_zone
    ]

    with {:ok, sup} <- Supavisor.start_dist(data.id, data.auth_secrets, start_opts),
         remote_pool? = node(sup) != node() and data.mode in [:transaction, :session],
         true <- if(remote_pool?, do: :proxy, else: true),
         {:ok, sub} <- Supavisor.subscribe(sup, data.id) do
      monitor_ref = Process.monitor(sub.workers.manager)

      with_workers = Map.merge(data, sub.workers)
      checked_out = db_checkout(:both, :on_connect, with_workers)

      subscribed = %{
        with_workers
        | manager: monitor_ref,
          db_pid: checked_out,
          idle_timeout: sub.idle_timeout
      }

      Registry.register(@clients_registry, subscribed.id, started_at: System.monotonic_time())

      # Without parameter statuses yet, wait up to 10s for them before greeting.
      next_action =
        case sub.ps do
          [] -> {:timeout, 10_000, :wait_ps}
          ps -> {:next_event, :internal, {:greetings, ps}}
        end

      {:keep_state, subscribed, next_action}
    else
      {:error, limit} when limit in [:max_clients_reached, :max_pools_reached] ->
        msg =
          if limit == :max_clients_reached,
            do: "Max client connections reached",
            else: "Max pools count reached"

        Logger.error("ClientHandler: #{msg}")
        :ok = HandlerHelpers.send_error(data.sock, "XX000", msg)
        Telem.client_join(:fail, data.id)
        {:stop, {:shutdown, limit}}

      :proxy ->
        case Supavisor.get_pool_ranch(data.id) do
          {:ok, %{port: port, host: host}} ->
            proxy_target = %{
              port: port,
              host: to_charlist(host),
              ip_version: :inet,
              upstream_ssl: false,
              upstream_tls_ca: nil,
              upstream_verify: nil
            }

            proxy_auth = Map.merge(data.auth, proxy_target)

            Logger.metadata(proxy: true)
            Registry.register(@proxy_clients_registry, data.id, [])

            {:keep_state, %{data | auth: proxy_auth}, {:next_event, :internal, :connect_db}}

          other ->
            Logger.error("ClientHandler: Subscribe proxy error: #{inspect(other)}")
            timeout_subscribe_or_terminate(data)
        end

      error ->
        Logger.error("ClientHandler: Subscribe error: #{inspect(error)}")
        timeout_subscribe_or_terminate(data)
    end
  end
444

445
  # Internal `:connect_db` event (proxy path): starts a dedicated linked
  # DbHandler for this client, checks it out and switches the mode to `:proxy`.
  def handle_event(:internal, :connect_db, state, data) do
    Logger.metadata(state: state)
    Logger.debug("ClientHandler: Trying to connect to DB")

    client_sock = data.sock
    owner = self()

    handler_args = %{
      id: data.id,
      auth: data.auth,
      user: data.user,
      tenant: {:single, data.tenant},
      tenant_feature_flags: data.tenant_feature_flags,
      replica_type: :write,
      mode: :proxy,
      proxy: true,
      log_level: data.log_level,
      caller: owner,
      client_sock: client_sock
    }

    {:ok, handler} = DbHandler.start_link(handler_args)
    upstream_sock = DbHandler.checkout(handler, client_sock, self())

    # There is no pool in proxy mode, hence the nil in the first slot.
    {:keep_state, %{data | db_pid: {nil, handler, upstream_sock}, mode: :proxy}}
  end
467

468
  # Internal `:greetings` event: sends the parameter statuses, BackendKeyData
  # and ReadyForQuery to the client, registers the cancel key, records the
  # connection time and moves to `:idle`.
  def handle_event(:internal, {:greetings, ps}, state, %{sock: sock} = data) do
    Logger.metadata(state: state)

    {header, <<backend_pid::32, secret::32>> = payload} = Server.backend_key_data()
    greeting = [ps, [header, payload], Server.ready_for_query()]

    :ok = HandlerHelpers.listen_cancel_query(backend_pid, secret)
    :ok = HandlerHelpers.sock_send(sock, greeting)

    Telem.client_connection_time(data.connection_start, data.id)

    {:next_state, :idle, data, handle_actions(data)}
  end
477

29✔
478
  # `:subscribe` timeout fired: re-queue the internal `:subscribe` event.
  def handle_event(:timeout, :subscribe, state, _) do
    Logger.metadata(state: state)

    retry = {:next_event, :internal, :subscribe}
    {:keep_state_and_data, retry}
  end
482

1✔
483
  # `:wait_ps` timeout fired: no parameter statuses arrived from the pool in
  # time, so the ones stored in `data.ps` are encoded and used for the
  # greeting instead.
  def handle_event(:timeout, :wait_ps, state, data) do
    Logger.metadata(state: state)

    # The original message ended with a stray closing brace ("...}}").
    Logger.error(
      "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}"
    )

    ps = Server.encode_parameter_status(data.ps)
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
  end
493

494
  # `:idle_terminate` timeout fired: the connection stayed idle for
  # `data.idle_timeout`, so it is shut down.
  def handle_event(:timeout, :idle_terminate, state, %{idle_timeout: idle_timeout}) do
    Logger.metadata(state: state)
    Logger.warning("ClientHandler: Terminate an idle connection by #{idle_timeout} timeout")

    {:stop, {:shutdown, :idle_terminate}}
  end
499

500
  # `:heartbeat_check` timeout fired: sends `Server.application_name()` to the
  # client as a heartbeat and re-arms the timer with `data.heartbeat_interval`.
  def handle_event(:timeout, :heartbeat_check, state, data) do
    Logger.metadata(state: state)
    Logger.debug("ClientHandler: Send heartbeat to client")

    heartbeat = Server.application_name()
    HandlerHelpers.sock_send(data.sock, heartbeat)

    rearm = {:timeout, data.heartbeat_interval, :heartbeat_check}
    {:keep_state_and_data, rearm}
  end
506

507
  # Data received on the client socket (`:tcp` / `:ssl`), or re-injected as an
  # internal event with the same shape.
  #
  # Delegates to `handle_data/4`. When it answers with a `:next_state` tuple,
  # every `:next_event` action carrying a binary or a list is re-wrapped as a
  # `{proto, socket, payload}` message so it comes back through this clause.
  # Any other result of `handle_data/4` is returned untouched.
  def handle_event(kind, {proto, socket, msg}, state, data) when proto in @proto do
    Logger.metadata(state: state)

    case handle_data(kind, msg, state, data) do
      {:next_state, next_state, new_data, actions} ->
        rewrapped =
          for action <- List.wrap(actions) do
            case action do
              {:next_event, type, payload} when is_binary(payload) or is_list(payload) ->
                {:next_event, type, {proto, socket, payload}}

              untouched ->
                untouched
            end
          end

        {:next_state, next_state, new_data, rewrapped}

      passthrough ->
        passthrough
    end
  end
525

1✔
526
  # Parameter statuses arrived while in `:exchange`: greet the client with them.
  def handle_event(:info, {:parameter_status, ps}, :exchange, _) do
    Logger.metadata(state: :exchange)

    greet = {:next_event, :internal, {:greetings, ps}}
    {:keep_state_and_data, greet}
  end
530

531
  # TLS error reported for the client's own socket (the socket in the message
  # must equal the one stored in `data.sock`): logged and otherwise ignored.
  #
  # The clause matches in any state, so the actual state is recorded in the
  # Logger metadata instead of a hard-coded `:exchange`.
  def handle_event(:info, {:ssl_error, sock, reason}, state, %{sock: {_, sock}}) do
    Logger.metadata(state: state)
    Logger.error("ClientHandler: TLS error #{inspect(reason)}")
    :keep_state_and_data
  end
56✔
536

56✔
537
  # The client closed its socket (TCP or SSL): stop the connection process.
  def handle_event(_, {closed, _}, state, data)
      when closed == :tcp_closed or closed == :ssl_closed do
    Logger.metadata(state: state)

    tenant = data.tenant
    Logger.debug("ClientHandler: #{closed} socket closed for #{inspect(tenant)}")

    {:stop, {:shutdown, :socket_closed}}
  end
544

545
  # Exit signal from a linked process (exits are trapped in `init/3`); logged
  # as a DbHandler exit. The client gets an error and the connection stops.
  def handle_event(:info, {:EXIT, db_pid, reason}, state, data) do
    Logger.metadata(state: state)
    Logger.error("ClientHandler: DbHandler #{inspect(db_pid)} exited #{inspect(reason)}")

    notice = Server.error_message("XX000", "DbHandler exited")
    HandlerHelpers.sock_send(data.sock, notice)

    {:stop, {:shutdown, :db_handler_exit}}
  end
3✔
552

553
  # The monitored pool manager went down (the monitor ref must equal
  # `data.manager`).
  #
  #   * reason `:shutdown`  -> stop, whatever the state
  #   * `:idle`             -> subscribe again
  #   * `:busy` and others  -> stop
  #
  # The monitor is created while handling `:subscribe`, i.e. before the state
  # leaves `:exchange`, so states other than `:idle`/`:busy` can reach this
  # clause. They previously raised a CaseClauseError; they now stop cleanly.
  def handle_event(:info, {:DOWN, ref, _, _, reason}, state, %{manager: ref} = data) do
    Logger.metadata(state: state)

    Logger.error(
      "ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}"
    )

    case {state, reason} do
      {_, :shutdown} -> {:stop, {:shutdown, :manager_shutdown}}
      {:idle, _} -> {:keep_state_and_data, {:next_event, :internal, :subscribe}}
      {:busy, _} -> {:stop, {:shutdown, :manager_down}}
      {_other_state, _} -> {:stop, {:shutdown, :manager_down}}
    end
  end
567

568
  # Explicit disconnect request: log the reason and stop with it.
  def handle_event(:info, {:disconnect, reason}, state, _data) do
    Logger.metadata(state: state)
    Logger.warning("ClientHandler: Disconnected due to #{inspect(reason)}")

    stop_reason = {:shutdown, {:disconnect, reason}}
    {:stop, stop_reason}
  end
573

574
  # Database status reported through `db_status/2` while `:busy`.
  #
  #   * `:ready_for_query` - the query finished: hand the db process to
  #     `handle_db_pid/3`, record network usage (skipped for local
  #     connections) and query time, then go `:idle`.
  #   * `:read_sql_error`  - check out a write connection and replay
  #     `data.last_query` on it.
  def handle_event(:cast, {:db_status, status}, :busy, data) do
    Logger.metadata(state: :busy)

    case status do
      :ready_for_query ->
        Logger.debug("ClientHandler: Client is ready")

        released = handle_db_pid(data.mode, data.pool, data.db_pid)

        {_, usage} =
          if data.local do
            {nil, data.stats}
          else
            Telem.network_usage(:client, data.sock, data.id, data.stats)
          end

        Telem.client_query_time(data.query_start, data.id)

        idle_data = %{
          data
          | db_pid: released,
            stats: usage,
            active_count: reset_active_count(data)
        }

        {:next_state, :idle, idle_data, handle_actions(data)}

      :read_sql_error ->
        Logger.error(
          "ClientHandler: read only sql transaction, rerunning the query to write pool"
        )

        # release the read pool
        _ = handle_db_pid(data.mode, data.pool, data.db_pid)

        restarted_at = System.monotonic_time()
        write_pid = db_checkout(:write, :on_query, data)

        # Replay the last query as if it had just arrived from the client.
        replay = {:next_event, :internal, {:tcp, nil, data.last_query}}
        {:keep_state, %{data | db_pid: write_pid, query_start: restarted_at}, replay}
    end
  end
610

611
  # Socket-level error (TCP or SSL) not handled by an earlier clause: stop.
  def handle_event(:info, {sock_error, _sock, msg}, state, _data)
      when sock_error == :tcp_error or sock_error == :ssl_error do
    Logger.metadata(state: state)
    Logger.error("ClientHandler: Socket error: #{inspect(msg)}")

    stop_reason = {:shutdown, {:socket_error, msg}}
    {:stop, stop_reason}
  end
618

619
  # Catch-all: any event no other clause matched is logged and ignored.
  def handle_event(type, content, state, _data) do
    Logger.metadata(state: state)

    described = [{"type", type}, {"content", content}]
    Logger.error("ClientHandler: Undefined msg: #{inspect(described, pretty: true)}")

    :keep_state_and_data
  end
70✔
631

70✔
632
  # Termination caused by a timed-out `:checkout` call: tell the client why
  # before the process goes away.
  #
  # Session mode gets the dedicated "max clients" message; every other mode
  # gets the generic checkout-timeout message. The fallback matters because
  # `data.mode` can also be `:proxy`, which previously raised a
  # CaseClauseError here and left the client without any error message.
  @impl true
  def terminate(
        {:timeout, {_, _, [_, {:checkout, _, _}, _]}},
        state,
        data
      ) do
    Logger.metadata(state: state)

    msg =
      case data.mode do
        :session ->
          "MaxClientsInSessionMode: max clients reached - in Session mode max clients are limited to pool_size"

        _transaction_or_other ->
          "Unable to check out process from the pool due to timeout"
      end

    Logger.error("ClientHandler: #{msg}")
    HandlerHelpers.sock_send(data.sock, Server.error_message("XX000", msg))
    :ok
  end
653

477✔
654
  # Termination while a DbHandler is checked out: ask it for its state and
  # mode and stop it when it is busy or in session mode. Whatever
  # `DbHandler.get_state_and_mode/1` answered ends up in the debug log.
  def terminate(reason, state, %{db_pid: {_, pid, _}}) do
    Logger.metadata(state: state)

    db_info =
      case DbHandler.get_state_and_mode(pid) do
        {:ok, {db_state, db_mode} = state_and_mode} ->
          if db_state == :busy or db_mode == :session, do: DbHandler.stop(pid)
          state_and_mode

        other ->
          other
      end

    Logger.debug(
      "ClientHandler: socket closed with reason #{inspect(reason)}, DbHandler #{inspect({pid, db_info})}"
    )

    :ok
  end
669

254✔
670
  # Termination with no DbHandler checked out: only log the reason.
  def terminate(reason, state, _data) do
    Logger.metadata(state: state)

    details = inspect(reason)
    Logger.debug("ClientHandler: socket closed with reason #{details}")

    :ok
  end
675

254✔
676
  ## Internal functions
677

678
  # Compares the secrets used for the failed exchange with freshly obtained
  # ones (ignoring any `:client_key` on the old ones). If the method or the
  # secrets differ, the exchange is run once more with the fresh secrets and
  # its outcome is returned as `{:retry_success, result}`, whether it
  # succeeded or failed. Otherwise returns `:no_retry`.
  defp maybe_retry_if_secrets_changed(
         sock,
         {method, secrets},
         {method2, secrets2},
         data
       ) do
    # `or` short-circuits: the secret funs are only invoked for equal methods.
    changed? = method != method2 or Map.delete(secrets.(), :client_key) != secrets2.()

    if changed? do
      Logger.warning("ClientHandler: Update secrets and retry authentication")

      fresh = {method2, secrets2}

      # Retry authentication with fresh secrets
      outcome =
        case handle_exchange(sock, fresh) do
          {:ok, client_key} ->
            handle_auth_success(sock, fresh, client_key, data)

          {:error, retry_reason} ->
            Logger.error("ClientHandler: Authentication retry failed: #{inspect(retry_reason)}")
            handle_auth_failure(sock, retry_reason, data)
        end

      {:retry_success, outcome}
    else
      Logger.debug("ClientHandler: Secrets are the same")
      :no_retry
    end
  end
4✔
701

702
  # Decides whether a failed exchange deserves a retry with refreshed secrets.
  #
  # Only non-`:password` methods that failed with `:wrong_password` qualify.
  # When the cache refresh limiter is active for this id, the short-lived
  # cached secrets are used; otherwise the secrets are fetched directly.
  defp maybe_retry_with_fresh_secrets(sock, {method, secrets}, reason, info, data) do
    cond do
      method == :password or reason != :wrong_password ->
        Logger.debug("ClientHandler: No retry condition met")
        :no_retry

      Supavisor.CacheRefreshLimiter.cache_refresh_limited?(data.id) ->
        check_key = {:secrets_check, data.tenant, data.user}
        retry_with_cached_secrets(sock, {method, secrets}, data, info, check_key)

      true ->
        check_key = {:secrets_check, data.tenant, data.user}
        retry_with_get_secrets(sock, {method, secrets}, data, info, check_key)
    end
  end
716

58✔
717
  # Retry path used while cache refreshes are rate limited: reads the secrets
  # through `cached_get_secrets/5` (called with 15_000) and retries only if
  # they differ from the ones that just failed.
  defp retry_with_cached_secrets(sock, {method, secrets}, data, info, key) do
    lookup = cached_get_secrets(data.id, info, data.user, key, 15_000)

    case lookup do
      {:error, reason} ->
        Logger.error("ClientHandler: Auth secrets error: #{inspect(reason)}")
        :no_retry

      {:ok, {method2, secrets2}} ->
        maybe_retry_if_secrets_changed(sock, {method, secrets}, {method2, secrets2}, data)
    end
  end
727

728
  # Retry path that fetches the secrets directly with `get_secrets/3`.
  # When the fetched secrets differ from the failed ones (a retry happened),
  # the caches are refreshed with the fetched value.
  defp retry_with_get_secrets(sock, {method, secrets}, data, info, key) do
    case get_secrets(data.id, info, data.user) do
      {:ok, {method2, secrets2}} = fetched ->
        outcome =
          maybe_retry_if_secrets_changed(sock, {method, secrets}, {method2, secrets2}, data)

        # Update cache only if we're retrying (secrets changed)
        case outcome do
          :no_retry ->
            :ok

          {:retry_success, _} ->
            update_secrets_cache(key, fetched, data.tenant, data.user)
        end

        outcome

      other ->
        Logger.error("ClientHandler: Auth secrets check error: #{inspect(other)}")
        :no_retry
    end
  end
750

751
  # Stores freshly fetched secrets under the short-lived check `key` and
  # updates the main `{:secrets, tenant, user}` entry with the same value.
  defp update_secrets_cache(key, value, tenant, user) do
    entry = {:cached, value}
    main_key = {:secrets, tenant, user}

    Cachex.put(Supavisor.Cache, key, entry, ttl: 5_000)
    Cachex.update(Supavisor.Cache, main_key, entry)
  end
755

109✔
756
  # Finishes a successful exchange: sends AuthenticationOk, records the join,
  # stores the (possibly client-key-enriched) secrets and queues the next
  # step - `:connect_db` in proxy mode, `:subscribe` otherwise.
  defp handle_auth_success(sock, {method, secrets}, client_key, data) do
    # With a client key, wrap the secrets fun so the key is part of its result.
    final_secrets =
      case client_key do
        absent when absent in [nil, false] -> secrets
        present -> fn -> Map.put(secrets.(), :client_key, present) end
      end

    Logger.debug("ClientHandler: Exchange success")
    :ok = HandlerHelpers.sock_send(sock, Server.authentication_ok())
    Telem.client_join(:ok, data.id)

    auth =
      data.auth
      |> Map.put(:secrets, final_secrets)
      |> Map.put(:method, method)

    next_step =
      case data.mode do
        :proxy -> :connect_db
        _ -> :subscribe
      end

    updated = %{data | auth_secrets: {method, final_secrets}, auth: auth}
    {:keep_state, updated, {:next_event, :internal, next_step}}
  end
57✔
776

57✔
777
  # Reports a failed exchange to the client, records the failed join and
  # stops the connection.
  defp handle_auth_failure(sock, reason, data) do
    error_packet = postgres_auth_error(reason, data.user)
    HandlerHelpers.sock_send(sock, error_packet)

    Telem.client_join(:fail, data.id)

    {:stop, {:shutdown, :exchange_error}}
  end
57✔
783

784
  # Maps an exchange failure reason to the error message sent to the client:
  #
  #   * `:wrong_password`           -> 28P01, naming the user
  #   * `{:timeout, _}`             -> 08006
  #   * `{:unexpected_message, _}`  -> 08P01
  @spec postgres_auth_error(
          :wrong_password | {:timeout, String.t()} | {:unexpected_message, term()},
          String.t()
        ) :: iodata()
  defp postgres_auth_error(reason, user) do
    {sqlstate, text} =
      case reason do
        :wrong_password ->
          {"28P01", "password authentication failed for user \"#{user}\""}

        {:timeout, _message} ->
          {"08006", "connection failure during authentication"}

        {:unexpected_message, _details} ->
          {"08P01", "protocol violation during authentication"}
      end

    Server.error_message(sqlstate, text)
  end
13✔
800

13✔
801
  # Runs the authentication exchange with the client on `sock`.
  #
  # This clause handles `:auth_query_md5`: it sends an MD5 request with a
  # random 4-byte salt, waits for the client's password message and verifies
  # it against the stored secret.
  #
  # Returns `{:ok, key}` or `{:error, reason}`. Anything that is neither the
  # expected password message nor an `{:error, _}` tuple is reported as
  # `{:unexpected_message, other}` (as the @spec promises) rather than as a
  # string, which `postgres_auth_error/2` has no clause for.
  @spec handle_exchange(Supavisor.sock(), {atom(), fun()}) ::
          {:ok, binary() | nil}
          | {:error, :wrong_password | {:timeout, String.t()} | {:unexpected_message, term()}}
  def handle_exchange({_, socket} = sock, {:auth_query_md5 = method, secrets}) do
    salt = :crypto.strong_rand_bytes(4)
    :ok = HandlerHelpers.sock_send(sock, Server.md5_request(salt))

    with {:ok,
          %{
            tag: :password_message,
            payload: {:md5, client_md5}
          }, _} <- receive_next(socket, "Timeout while waiting for the md5 exchange"),
         {:ok, key} <- authenticate_exchange(method, client_md5, secrets.().secret, salt) do
      {:ok, key}
    else
      {:error, message} -> {:error, message}
      other -> {:error, {:unexpected_message, other}}
    end
  end
54✔
820

821
  # SCRAM exchange, used for every method other than `:auth_query_md5`.
  #
  # Sends the SCRAM request, answers the client's first message through
  # `reply_first_exchange/6`, verifies the proof in the client's second
  # message and, on success, sends the final server signature.
  #
  # As in the MD5 clause, an unexpected (non-error) reply is reported as
  # `{:error, {:unexpected_message, other}}` so that the reason stays within
  # what `postgres_auth_error/2` can translate.
  def handle_exchange({_, socket} = sock, {method, secrets}) do
    :ok = HandlerHelpers.sock_send(sock, Server.scram_request())

    with {:ok,
          %{
            tag: :password_message,
            payload: {:scram_sha_256, %{"n" => user, "r" => nonce, "c" => channel}}
          },
          _} <-
           receive_next(
             socket,
             "Timeout while waiting for the first password message"
           ),
         {:ok, signatures} = reply_first_exchange(sock, method, secrets, channel, nonce, user),
         {:ok,
          %{
            tag: :password_message,
            payload: {:first_msg_response, %{"p" => p}}
          },
          _} <-
           receive_next(
             socket,
             "Timeout while waiting for the second password message"
           ),
         {:ok, key} <- authenticate_exchange(method, secrets, signatures, p) do
      message = "v=#{Base.encode64(signatures.server)}"
      :ok = HandlerHelpers.sock_send(sock, Server.exchange_message(:final, message))
      {:ok, key}
    else
      {:error, message} -> {:error, message}
      other -> {:error, {:unexpected_message, other}}
    end
  end
854

56✔
855
  # Waits up to 15 seconds for the next message in this process' mailbox.
  #
  # A `{_, socket, binary}` message for the given socket is decoded with
  # `Server.decode_pkt/1`; any other message is consumed and returned as
  # `{:error, {:unexpected_message, msg}}`. When nothing arrives in time the result
  # is `{:error, {:timeout, timeout_message}}`.
  defp receive_next(socket, timeout_message) do
    receive do
      {_transport, ^socket, packet} -> Server.decode_pkt(packet)
      unexpected -> {:error, {:unexpected_message, unexpected}}
    after
      15_000 -> {:error, {:timeout, timeout_message}}
    end
  end
92✔
866

867
  # Computes the server-first message for the exchange, sends it to the client and
  # returns `{:ok, signatures}` for the later proof verification. Raises if the
  # send does not return `:ok`.
  defp reply_first_exchange(sock, method, secrets, channel, nonce, user) do
    {server_first, signatures} = exchange_first(method, secrets, nonce, user, channel)
    first_reply = Server.exchange_message(:first, server_first)
    :ok = HandlerHelpers.sock_send(sock, first_reply)
    {:ok, signatures}
  end
872

873
  # Verifies the proof supplied by the client for the given auth method.
  #
  # Returns `{:ok, key}` when the proof is valid (`key` is the recovered client key
  # for `:auth_query`, `nil` otherwise) and `{:error, :wrong_password}` when it is
  # not.

  # :password - the client proof must equal the one we computed ourselves.
  defp authenticate_exchange(:password, _secrets, signatures, p) do
    if p == signatures.client, do: {:ok, nil}, else: {:error, :wrong_password}
  end

  # :auth_query - XOR the client proof with the client signature and compare the
  # hash of the result with the stored key.
  #
  # `p` comes straight from the client, so it may not be valid Base64 or may decode
  # to the wrong length. Both cases are reported as a wrong password instead of
  # letting `Base.decode64!/1` or `:crypto.exor/2` raise.
  defp authenticate_exchange(:auth_query, secrets, signatures, p) do
    with {:ok, proof} <- Base.decode64(p),
         true <- byte_size(proof) == IO.iodata_length(signatures.client),
         client_key = :crypto.exor(proof, signatures.client),
         true <- Helpers.hash(client_key) == secrets.().stored_key do
      {:ok, client_key}
    else
      _ -> {:error, :wrong_password}
    end
  end

  # :auth_query_md5 - the client hash must equal "md5" <> md5(server_hash <> salt).
  defp authenticate_exchange(:auth_query_md5, client_hash, server_hash, salt) do
    if "md5" <> Helpers.md5([server_hash, salt]) == client_hash,
      do: {:ok, nil},
      else: {:error, :wrong_password}
  end
92✔
894

895
  # Returns the DB connection to use for the client as `{pool, db_pid, db_sock}`,
  # or `nil` when nothing has to be checked out yet.
  #
  # * session/proxy mode with an existing connection: that connection is reused;
  # * transaction mode on connect: no checkout happens (`nil`);
  # * read/write pools (`:read` / `:write`): a random pool of that type is used;
  # * otherwise: `data.pool` is used.
  #
  # Every checkout goes through `checkout_and_link/2`, so all clauses return the
  # same 3-tuple shape. The read/write clause previously returned `{pool, db_pid}`
  # without a socket, which does not match the spec nor the code that reads the
  # third element of `data.db_pid`.
  @spec db_checkout(:write | :read | :both, :on_connect | :on_query, map) ::
          {pid, pid, Supavisor.sock()} | nil
  defp db_checkout(_, _, %{mode: mode, db_pid: {pool, db_pid, db_sock}})
       when is_pid(db_pid) and mode in [:session, :proxy] do
    {pool, db_pid, db_sock}
  end

  defp db_checkout(_, :on_connect, %{mode: :transaction}), do: nil

  defp db_checkout(query_type, _, data) when query_type in [:write, :read] do
    data.pool[query_type]
    |> Enum.random()
    |> checkout_and_link(data)
  end

  defp db_checkout(_, _, data), do: checkout_and_link(data.pool, data)

  # Checks a worker out of `pool`, links it to this process, asks the DbHandler for
  # its socket and reports how long the whole checkout took (in microseconds).
  defp checkout_and_link(pool, data) do
    start = System.monotonic_time(:microsecond)
    db_pid = :poolboy.checkout(pool, true, data.timeout)
    Process.link(db_pid)
    db_sock = DbHandler.checkout(db_pid, data.sock, self())
    same_box = if node(db_pid) == node(), do: :local, else: :remote
    Telem.pool_checkout_time(System.monotonic_time(:microsecond) - start, data.id, same_box)
    {pool, db_pid, db_sock}
  end
925

926
  # Decides what to do with the checked-out DB connection depending on the mode.
  #
  # * `:transaction` - the worker (second element of the connection tuple) is
  #   unlinked and checked back into `pool`; the result is always `nil`.
  # * `:session` / `:proxy` - the connection is returned untouched.
  #
  # The specs describe the third argument as the clauses actually use it: a
  # 3-tuple (or `nil`) in transaction mode, an opaque pass-through value otherwise.
  @spec handle_db_pid(:transaction, pid(), {term(), pid(), term()} | nil) :: nil
  @spec handle_db_pid(:session | :proxy, term(), db_conn) :: db_conn when db_conn: term()
  defp handle_db_pid(:transaction, _pool, nil), do: nil

  defp handle_db_pid(:transaction, pool, {_, db_pid, _}) do
    Process.unlink(db_pid)
    :poolboy.checkin(pool, db_pid)
    nil
  end

  defp handle_db_pid(:session, _, db_pid), do: db_pid
  defp handle_db_pid(:proxy, _, db_pid), do: db_pid
939

72✔
940
  # Fills the handler data with everything derived from the tenant/user `info`:
  # identifiers, timeouts, the proxy type and the `auth` map holding the upstream
  # connection settings. Only keys already present in `data` are updated.
  defp update_user_data(data, info, user, id, db_name, mode) do
    tenant = info.tenant
    tenant_user = info.user

    # Tenants that require a user authenticate by password, the rest by auth query.
    proxy_type = if tenant.require_user, do: :password, else: :auth_query

    application_name = data[:app_name] || "Supavisor"
    host = to_charlist(tenant.db_host)

    sni_hostname =
      case tenant.sni_hostname do
        nil -> nil
        hostname -> to_charlist(hostname)
      end

    auth = %{
      application_name: application_name,
      database: db_name,
      host: host,
      sni_hostname: sni_hostname,
      port: tenant.db_port,
      user: user,
      password: tenant_user.db_password,
      require_user: tenant.require_user,
      upstream_ssl: tenant.upstream_ssl,
      upstream_tls_ca: tenant.upstream_tls_ca,
      upstream_verify: tenant.upstream_verify
    }

    %{
      data
      | tenant: tenant.external_id,
        tenant_feature_flags: tenant.feature_flags,
        user: user,
        timeout: tenant_user.pool_checkout_timeout,
        ps: tenant.default_parameter_status,
        proxy_type: proxy_type,
        id: id,
        # the tenant setting is multiplied by 1000 before being stored
        heartbeat_interval: tenant.client_heartbeat_interval * 1000,
        db_name: db_name,
        mode: mode,
        auth: auth,
        tenant_availability_zone: tenant.availability_zone
    }
  end
977

73✔
978
  # Returns the secrets used to authenticate a client.
  #
  # Tenants with `require_user: true` get `:password` secrets built directly from
  # the configured user. Everyone else gets the result of `get_secrets/3`, looked up
  # through `Supavisor.Cache` under `key`: successful results are committed to the
  # cache with the given `ttl`, errors are returned without being cached.
  @spec cached_get_secrets(Supavisor.id(), map, String.t(), term(), non_neg_integer()) ::
          {:ok, Supavisor.secrets()} | {:error, term()}
  ## password secrets
  defp cached_get_secrets(_id, %{user: user, tenant: %{require_user: true}}, _, _, _) do
    password_secrets = %{
      db_user: user.db_user,
      password: user.db_password,
      alias: user.db_user_alias
    }

    {:ok, {:password, fn -> password_secrets end}}
  end

  ## auth_query secrets
  defp cached_get_secrets(id, info, db_user, key, ttl) do
    loader = fn _key ->
      case get_secrets(id, info, db_user) do
        {:ok, _} = found -> {:commit, {:cached, found}, ttl: ttl}
        {:error, _} = failure -> {:ignore, failure}
      end
    end

    case Cachex.fetch(Supavisor.Cache, key, loader) do
      {:ok, {:cached, secrets}} -> secrets
      {:commit, {:cached, secrets}, _opts} -> secrets
      {:ignore, failure} -> failure
    end
  end
1007

1008
  # Fetches the auth-query secrets for `db_user`.
  #
  # The `Supavisor.SecretChecker` is asked first; whatever it answers is returned
  # as-is, except for `{:error, :not_started}`, in which case the secrets are read
  # from the tenant database over a short-lived Postgrex connection.
  @spec get_secrets(Supavisor.id(), map, String.t()) ::
          {:ok, {:auth_query, fun()}} | {:error, term()}
  defp get_secrets(id, %{user: user, tenant: tenant}, db_user) do
    case Supavisor.SecretChecker.get_secrets(id) do
      {:error, :not_started} -> query_secrets_with_temp_conn(user, tenant, db_user)
      secrets -> secrets
    end
  end

  # Opens a temporary connection to the tenant database, runs the tenant's auth
  # query for `db_user` and always disposes of the connection afterwards.
  defp query_secrets_with_temp_conn(user, tenant, db_user) do
    ssl_opts =
      if tenant.upstream_ssl and tenant.upstream_verify == :peer do
        [
          verify: :verify_peer,
          cacerts: [Helpers.upstream_cert(tenant.upstream_tls_ca)],
          server_name_indication: String.to_charlist(tenant.db_host),
          customize_hostname_check: [{:match_fun, fn _, _ -> true end}]
        ]
      else
        [verify: :verify_none]
      end

    {:ok, conn} =
      Postgrex.start_link(
        hostname: tenant.db_host,
        port: tenant.db_port,
        database: tenant.db_database,
        password: user.db_password,
        username: user.db_user,
        parameters: [application_name: "Supavisor (auth query)"],
        ssl: tenant.upstream_ssl,
        socket_options: [Helpers.ip_version(tenant.ip_version, tenant.db_host)],
        queue_target: 1_000,
        queue_interval: 5_000,
        ssl_opts: ssl_opts
      )

    try do
      Logger.debug(
        "ClientHandler: Connected to db #{tenant.db_host} #{tenant.db_port} #{tenant.db_database} #{user.db_user}"
      )

      result =
        case Helpers.get_user_secret(conn, tenant.auth_query, db_user) do
          {:ok, secret} ->
            # md5 digests use their own auth method, everything else is :auth_query
            method = if secret.digest == :md5, do: :auth_query_md5, else: :auth_query
            {:ok, {method, fn -> Map.put(secret, :alias, user.db_user_alias) end}}

          other ->
            other
        end

      Logger.info("ClientHandler: Get secrets finished")
      result
    rescue
      exception ->
        Logger.error("ClientHandler: Couldn't fetch user secrets from #{tenant.db_host}")
        reraise exception, __STACKTRACE__
    after
      # Unlink first so stopping the connection cannot take this process down,
      # then stop it from a separate process; kill it if the stop exits.
      Process.unlink(conn)

      spawn(fn ->
        try do
          GenServer.stop(conn, :normal, 5_000)
        catch
          :exit, _ -> Process.exit(conn, :kill)
        end
      end)
    end
  end
1077

1078
  # Builds the server-first message of the exchange and the signatures needed to
  # verify the client's final message. Returns `{server_first_message, signatures}`
  # where `signatures` has the keys `:client` and `:server` in the `:password` case
  # and is whatever `Helpers.signatures/6` returns in the `:auth_query` case.
  @spec exchange_first(:password | :auth_query, fun(), binary(), binary(), binary()) ::
          {binary(), map()}
  defp exchange_first(:password, secret, nonce, user, channel) do
    server_first = Server.exchange_first_message(nonce)
    parsed_first = Helpers.parse_server_first(server_first, nonce)
    password = secret.().password

    {client_final, server_proof} =
      Helpers.get_client_final(:password, password, parsed_first, nonce, user, channel)

    # the expected client proof is the last element of the client-final message
    {server_first, %{client: List.last(client_final), server: server_proof}}
  end

  defp exchange_first(:auth_query, secret, nonce, user, channel) do
    stored = secret.()
    server_first = Server.exchange_first_message(nonce, stored.salt)
    parsed_first = Helpers.parse_server_first(server_first, nonce)

    signatures =
      Helpers.signatures(
        stored.stored_key,
        stored.server_key,
        parsed_first,
        nonce,
        user,
        channel
      )

    {server_first, signatures}
  end
1119

1120
  @doc """
  Returns the SNI hostname of an `{:ssl, socket}` connection as a string.

  Returns `nil` for any other kind of socket, or when the connection information
  does not come back as `{:ok, [sni_hostname: hostname]}`.
  """
  @spec try_get_sni(Supavisor.sock()) :: String.t() | nil
  def try_get_sni({:ssl, ssl_socket}) do
    with {:ok, [sni_hostname: hostname]} <-
           :ssl.connection_information(ssl_socket, [:sni_hostname]) do
      List.to_string(hostname)
    else
      _ -> nil
    end
  end

  def try_get_sni(_sock), do: nil
1129

1130
  # Looks up `pid` in `Supavisor.Registry.PoolPids` on the node that owns it:
  # directly when it is the local node, through `:erpc` (15 s timeout) otherwise.
  defp db_pid_meta({_, {_, pid, _}} = _key) do
    registry = Supavisor.Registry.PoolPids

    case node(pid) do
      owner when owner == node() -> Registry.lookup(registry, pid)
      owner -> :erpc.call(owner, Registry, :lookup, [registry, pid], 15_000)
    end
  end
1140

1141
  @spec handle_data(kind :: atom(), data :: binary(), state, data) ::
          :gen_statem.event_handler_result(data)
        when state: atom() | term(),
             data: term()

  # Terminate ('X') while idle, coming from a proxy (local) client: only logged.
  defp handle_data(:info, <<?X, 4::32>>, :idle, %{local: true}) do
    Logger.info("ClientHandler: Terminate received from proxy client")
    :keep_state_and_data
  end

  # Terminate ('X') while idle from any other client: stop the handler.
  defp handle_data(:info, <<?X, 4::32>>, :idle, _data) do
    Logger.info("ClientHandler: Terminate received from client")
    {:stop, {:shutdown, :terminate_received}}
  end

  # Sync ('S') while idle.
  defp handle_data(:info, <<?S, 4::32, _::binary>> = msg, :idle, data) do
    Logger.debug("ClientHandler: Receive sync")

    # db_pid can be nil in transaction mode; in that case ready_for_query is sent
    # back to the client without checking out a connection. When a db_pid is
    # present the message is forwarded to it instead.
    :ok =
      if is_nil(data.db_pid) do
        HandlerHelpers.sock_send(data.sock, Server.ready_for_query())
      else
        sock_send_binary_maybe_active_once(msg, data)
      end

    active_count = reset_active_count(data)
    {:keep_state, %{data | active_count: active_count}, handle_actions(data)}
  end

  # Sync ('S') in any other state: always forwarded.
  defp handle_data(:info, <<?S, 4::32, _::binary>> = msg, _, data) do
    Logger.debug("ClientHandler: Receive sync while not idle")
    :ok = sock_send_binary_maybe_active_once(msg, data)

    active_count = reset_active_count(data)
    {:keep_state, %{data | active_count: active_count}, handle_actions(data)}
  end

  # Flush ('H') in any state: always forwarded.
  defp handle_data(:info, <<?H, 4::32, _::binary>> = msg, _, data) do
    Logger.debug("ClientHandler: Receive flush while not idle")
    :ok = sock_send_binary_maybe_active_once(msg, data)

    active_count = reset_active_count(data)
    {:keep_state, %{data | active_count: active_count}, handle_actions(data)}
  end
1186

1187
  # Query arriving while idle when `data.pool` is a single pool pid: check out a
  # connection, switch to :busy and replay the packet as an internal event.
  defp handle_data(:info, bin, :idle, %{pool: pid} = data) when is_pid(pid) do
    Logger.debug("ClientHandler: Receive query #{Debug.packet_to_string(bin, :frontend)}")

    checked_out = db_checkout(:both, :on_query, data)
    busy_data = %{data | db_pid: checked_out, query_start: System.monotonic_time()}

    {:next_state, :busy, busy_data, {:next_event, :internal, bin}}
  end

  # Proxy mode, any state: no checkout here, just switch to :busy and replay.
  defp handle_data(:info, bin, _, %{mode: :proxy} = data) do
    busy_data = %{data | query_start: System.monotonic_time()}
    {:next_state, :busy, busy_data, {:next_event, :internal, bin}}
  end

  # Query arriving while idle with read/write pools: the statement list decides
  # which pool type is used; anything that cannot be parsed is treated as a write.
  defp handle_data(:info, bin, :idle, data) do
    query_type =
      with {:ok, payload} <- Client.get_payload(bin),
           {:ok, statements} <- Supavisor.PgParser.statements(payload) do
        Logger.debug(
          "ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}"
        )

        # naive check for read only queries
        if statements == ["SelectStmt"], do: :read, else: :write
      else
        other ->
          Logger.error("ClientHandler: Receive query error: #{inspect(other)}")
          :write
      end

    started_at = System.monotonic_time()
    checked_out = db_checkout(query_type, :on_query, data)
    busy_data = %{data | db_pid: checked_out, query_start: started_at, last_query: bin}

    {:next_state, :busy, busy_data, {:next_event, :internal, bin}}
  end
1232

1233
  # Busy state: forward the client packets to the database.
  #
  # The packets are first run through `handle_client_pkts/2`; an error from that
  # step is handed to `handle_error/2`. A failure while sending is logged, reported
  # to the client with an XX000 error and stops the handler.
  defp handle_data(_, data_to_send, :busy, data) do
    Logger.debug(
      "ClientHandler: Forward query to db #{Debug.packet_to_string(data_to_send, :frontend)} #{inspect(data.db_pid)}"
    )

    case handle_client_pkts(data_to_send, data) do
      {:ok, new_stream_state, pkts} ->
        send_result = sock_send_maybe_active_once(pkts, data)

        if send_result == :ok do
          {:keep_state,
           %{data | stream_state: new_stream_state, active_count: data.active_count + 1}}
        else
          Logger.error("ClientHandler: error while sending query: #{inspect(send_result)}")

          HandlerHelpers.sock_send(
            data.sock,
            Server.error_message("XX000", "Error while sending query")
          )

          {:stop, {:shutdown, :send_query_error}}
        end

      error ->
        handle_error(data.sock, error)
    end
  end
1265

1266
  # Prepares client packets before they are forwarded to the database.
  #
  # In transaction mode, when the tenant has the "named_prepared_statements" flag
  # enabled, the binary goes through `MessageStreamer.handle_packets/2`. In every
  # other case the binary is passed through unchanged together with the current
  # stream state.
  @spec handle_client_pkts(binary, map) ::
          {:ok, Supavisor.Protocol.MessageStreamer.stream_state(),
           [PreparedStatements.handled_pkt()] | binary}
          | {:error, :max_prepared_statements}
          | {:error, :max_prepared_statements_memory}
          | {:error, :prepared_statement_on_simple_query}
          | {:error, :duplicate_prepared_statement, PreparedStatements.statement_name()}
          | {:error, :prepared_statement_not_found, PreparedStatements.statement_name()}
  defp handle_client_pkts(bin, %{mode: :transaction, tenant_feature_flags: flags} = data) do
    named_statements? = flags && FeatureFlag.enabled?(flags, "named_prepared_statements")

    if named_statements? do
      MessageStreamer.handle_packets(data.stream_state, bin)
    else
      {:ok, data.stream_state, bin}
    end
  end

  defp handle_client_pkts(bin, %{stream_state: stream_state}), do: {:ok, stream_state, bin}
5,983✔
1287

1288
  # Builds the list of gen_statem timeout actions for the current data: an idle
  # timeout (when `idle_timeout > 0`) followed by a heartbeat check (when
  # `heartbeat_interval > 0`).
  #
  # NOTE(review): both entries use the plain `{:timeout, time, content}` form, which
  # looks like the gen_statem *event* timeout - confirm that returning both in the
  # same action list behaves as intended.
  @spec handle_actions(map) :: [{:timeout, non_neg_integer, atom}]
  defp handle_actions(%{} = data) do
    heartbeat_ms = data.heartbeat_interval

    heartbeat_actions =
      if heartbeat_ms > 0, do: [{:timeout, heartbeat_ms, :heartbeat_check}], else: []

    idle_ms = data.idle_timeout
    idle_actions = if idle_ms > 0, do: [{:timeout, idle_ms, :idle_timeout}], else: []

    idle_actions ++ heartbeat_actions
  end
1299

1300
  @doc """
  Normalizes the application name sent by the client.

  A binary is returned unchanged; `nil` and any other value fall back to
  `"Supavisor"` (non-nil invalid values are also logged at debug level).
  """
  @spec app_name(any()) :: String.t()
  def app_name(name) do
    cond do
      is_binary(name) ->
        name

      is_nil(name) ->
        "Supavisor"

      true ->
        Logger.debug("ClientHandler: Invalid application name #{inspect(name)}")
        "Supavisor"
    end
  end
1309

1310
  @doc """
  Applies the log level requested in the client's startup options, if any.

  When `options["log_level"]` names one of `debug`, `info`, `notice`, `warning`
  or `error`, that level is set through `Helpers.set_log_level/1` and returned as
  an atom. Any other value (missing, unknown or not a string) is ignored and `nil`
  is returned. Messages without a `"payload" => %{"options" => _}` entry return
  `:ok`.

  The requested value comes from the client, so it is matched against the allowed
  levels instead of being converted with `String.to_existing_atom/1`, which raises
  for strings that are not existing atoms.
  """
  @spec maybe_change_log(map()) :: atom() | nil
  def maybe_change_log(%{"payload" => %{"options" => options}}) do
    requested = options["log_level"]

    level =
      Enum.find(
        [:debug, :info, :notice, :warning, :error],
        &(Atom.to_string(&1) == requested)
      )

    if level do
      Helpers.set_log_level(level)
      level
    end
  end

  def maybe_change_log(_), do: :ok
531✔
1321

1322
  # Dispatches on the shape of what has to be sent to the database: a raw binary
  # goes to `sock_send_binary_maybe_active_once/2`, a list of packets goes to
  # `sock_send_pkts_maybe_active_once/2`.
  defp sock_send_maybe_active_once(bin, data) when is_binary(bin),
    do: sock_send_binary_maybe_active_once(bin, data)

  defp sock_send_maybe_active_once(pkts, data) when is_list(pkts),
    do: sock_send_pkts_maybe_active_once(pkts, data)
1329

1330
  # Sends `bin` to the socket stored as the third element of `data.db_pid`.
  #
  # Before sending, the client socket is set to active-once when the number of
  # forwarded messages (`data.active_count`) exceeds the configured
  # `switch_active_count`.
  @spec sock_send_binary_maybe_active_once(binary(), map()) :: :ok | {:error, term()}
  defp sock_send_binary_maybe_active_once(bin, data) when is_binary(bin) do
    Logger.debug("ClientHandler: Send maybe active once")
    forwarded = data.active_count

    if forwarded > @switch_active_count do
      Logger.debug("ClientHandler: Activate socket #{inspect(forwarded)}")
      HandlerHelpers.active_once(data.sock)
    end

    data.db_pid
    |> elem(2)
    |> HandlerHelpers.sock_send(bin)
  end
1342

1343
  # Sends a list of handled packets to the database.
  #
  # Consecutive packets of the same kind are grouped so that bigger chunks are sent
  # at once: runs of tuples go to the DbHandler as prepared-statement packets, runs
  # of anything else are written to the DB socket. Sending stops at the first
  # result that is not `:ok`, and that result is returned.
  @spec sock_send_pkts_maybe_active_once([PreparedStatements.handled_pkt()], map()) ::
          :ok | {:error, term()}
  defp sock_send_pkts_maybe_active_once(pkts, data) do
    Logger.debug(
      "ClientHandler: send packets: #{Enum.map(pkts, &Debug.packet_to_string(&1, :frontend))}"
    )

    {_pool, db_handler, db_sock} = data.db_pid
    forwarded = data.active_count

    if forwarded > @switch_active_count do
      Logger.debug("ClientHandler: Activate socket #{inspect(forwarded)}")
      HandlerHelpers.active_once(data.sock)
    end

    pkts
    |> Enum.chunk_by(&is_tuple/1)
    |> Enum.reduce_while(:ok, fn chunk, _acc ->
      result =
        case chunk do
          [first | _] when is_tuple(first) ->
            Supavisor.DbHandler.handle_prepared_statement_pkts(db_handler, chunk)

          _ ->
            HandlerHelpers.sock_send(db_sock, chunk)
        end

      if result == :ok, do: {:cont, :ok}, else: {:halt, result}
    end)
  end
1375

1376
  @doc """
  Decides whether the subscription should be retried.

  While `subscribe_retries` is below the configured limit, the counter is
  incremented and a `:subscribe` timeout is scheduled; once the limit is reached
  the handler is stopped with `{:shutdown, :subscribe_retries}`.
  """
  @spec timeout_subscribe_or_terminate(map()) :: :gen_statem.handle_event_result()
  def timeout_subscribe_or_terminate(%{subscribe_retries: attempts} = data)
      when attempts < @subscribe_retries do
    Logger.warning("ClientHandler: Retry subscribe #{inspect(attempts)}")

    retry_data = %{data | subscribe_retries: attempts + 1}
    {:keep_state, retry_data, {:timeout, @timeout_subscribe, :subscribe}}
  end

  def timeout_subscribe_or_terminate(%{subscribe_retries: _attempts}) do
    Logger.error("ClientHandler: Terminate after retries")
    {:stop, {:shutdown, :subscribe_retries}}
  end
1388

1389
  @doc """
  Calls `HandlerHelpers.activate/1` on the client socket and returns `0`, the
  value callers store as the new `active_count`.
  """
  @spec reset_active_count(map()) :: 0
  def reset_active_count(data) do
    client_sock = data.sock
    HandlerHelpers.activate(client_sock)
    0
  end
1394

1395
  # Reports a packet-handling error to the client and stops the handler.
  #
  # The error is rendered with `error_message/1`. For
  # `:prepared_statement_on_simple_query` a ready_for_query message is appended to
  # the reply. The shutdown reason is the second element of the error tuple.
  defp handle_error(sock, error) do
    message = error_message(error)

    reply =
      case error do
        {:error, :prepared_statement_on_simple_query} -> [message, Server.ready_for_query()]
        _ -> message
      end

    HandlerHelpers.sock_send(sock, reply)

    {:stop, {:shutdown, elem(error, 1)}}
  end
1411

1412
  # Renders a packet-handling error as the PostgreSQL error message sent to the
  # client. Limit errors and the simple-query error use code XX000, an unknown
  # prepared statement uses 26000 and a duplicate one uses 42P05.
  defp error_message({:error, :max_prepared_statements}) do
    Server.error_message(
      "XX000",
      "max prepared statements limit reached. Limit: #{PreparedStatements.client_limit()} per connection"
    )
  end

  defp error_message({:error, :prepared_statement_on_simple_query}) do
    Server.error_message(
      "XX000",
      "Supavisor transaction mode only supports prepared statements using the Extended Query Protocol"
    )
  end

  defp error_message({:error, :max_prepared_statements_memory}) do
    # bytes -> megabytes; the division yields a float
    limit_mb = PreparedStatements.client_memory_limit_bytes() / 1_000_000

    Server.error_message(
      "XX000",
      "max prepared statements memory limit reached. Limit: #{limit_mb}MB per connection"
    )
  end

  defp error_message({:error, :prepared_statement_not_found, name}) do
    Server.error_message("26000", "prepared statement #{inspect(name)} does not exist")
  end

  defp error_message({:error, :duplicate_prepared_statement, name}) do
    Server.error_message("42P05", "prepared statement #{inspect(name)} already exists")
  end
1443
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc