• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 16470769502

23 Jul 2025 12:34PM UTC coverage: 57.067% (+1.7%) from 55.355%
16470769502

Pull #694

github

web-flow
Merge 78a9c0b2c into deaa48192
Pull Request #694: feat: improved named prepared statements support

175 of 217 new or added lines in 11 files covered. (80.65%)

16 existing lines in 4 files now uncovered.

1292 of 2264 relevant lines covered (57.07%)

1126.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.68
/lib/supavisor/client_handler.ex
1
defmodule Supavisor.ClientHandler do
2
  @moduledoc """
3
  This module is responsible for handling incoming connections to the Supavisor server. It is
4
  implemented as a Ranch protocol behavior and a gen_statem behavior. It handles SSL negotiation,
5
  user authentication, tenant subscription, and dispatching of messages to the appropriate tenant
6
  supervisor. Each client connection is assigned to a specific tenant supervisor.
7
  """
8

9
  require Logger
10

11
  @behaviour :ranch_protocol
12
  @behaviour :gen_statem
13
  @proto [:tcp, :ssl]
14
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
15
  @subscribe_retries Application.compile_env(:supavisor, :subscribe_retries)
16
  @timeout_subscribe 500
17
  @clients_registry Supavisor.Registry.TenantClients
18
  @proxy_clients_registry Supavisor.Registry.TenantProxyClients
19

20
  alias Supavisor.{
21
    DbHandler,
22
    HandlerHelpers,
23
    Helpers,
24
    Monitoring.Telem,
25
    Protocol,
26
    Protocol.Client,
27
    Protocol.PreparedStatements,
28
    Tenants
29
  }
30

31
  require Supavisor.Protocol.Server, as: Server
32

33
  @impl true
34
  # Ranch protocol entry point: spawns the connection process (linked to the
  # caller) running init/3 and returns its pid.
  def start_link(ref, transport, opts) do
    {:ok, :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts])}
  end
38

39
  @impl true
40
  # gen_statem callback mode: all events are dispatched to handle_event/4.
  def callback_mode do
    [:handle_event_function]
  end
375✔
41

42
  @doc """
  Asynchronously tells the client handler `pid` about the database `status`
  by casting `{:db_status, status}` to it.
  """
  @spec db_status(pid(), :ready_for_query | :read_sql_error) :: :ok
  def db_status(pid, status) do
    :gen_statem.cast(pid, {:db_status, status})
  end
3,425✔
44

45
  @impl true
46
  # init/1 is never used to start this process; start-up goes through init/3,
  # which enters the gen_statem loop itself.
  def init(_args) do
    :ignore
  end
×
47

48
  # Body of the spawned connection process: completes the Ranch handshake,
  # puts the socket in active mode and enters the gen_statem loop in the
  # :exchange state with a freshly built data map.
  def init(ref, trans, opts) do
    Process.flag(:trap_exit, true)
    Helpers.set_max_heap_size(90)

    {:ok, sock} = :ranch.handshake(ref)
    peer_ip = Helpers.peer_ip(sock)
    local = opts[:local] || false

    Logger.metadata(peer_ip: peer_ip, local: local, state: :init)

    # only the `active` option is changed here
    :ok = trans.setopts(sock, active: true)

    Logger.debug("ClientHandler is: #{inspect(self())}")

    initial_data = %{
      # connection / socket
      id: nil,
      sock: {:gen_tcp, sock},
      trans: trans,
      ssl: false,
      local: local,
      peer_ip: peer_ip,
      pending: "",
      # tenant / user / pool
      db_pid: nil,
      tenant: nil,
      user: nil,
      pool: nil,
      manager: nil,
      ps: nil,
      auth_secrets: nil,
      proxy_type: nil,
      mode: opts.mode,
      db_name: nil,
      auth: %{},
      tenant_availability_zone: nil,
      app_name: nil,
      log_level: nil,
      # timing / counters
      query_start: nil,
      timeout: nil,
      stats: %{},
      idle_timeout: 0,
      last_query: nil,
      heartbeat_interval: 0,
      connection_start: System.monotonic_time(),
      active_count: 0,
      subscribe_retries: 0,
      prepared_statements: PreparedStatements.Storage.new()
    }

    :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :exchange, initial_data)
  end
114

115
  @impl true
116
  # A plain HTTP GET during the exchange phase: answer with a 204 carrying the
  # application version, then stop.
  def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :exchange, %{sock: sock}) do
    Logger.metadata(state: :exchange)
    Logger.debug("ClientHandler: Client is trying to request HTTP")

    version = Application.spec(:supavisor, :vsn)
    HandlerHelpers.sock_send(sock, "HTTP/1.1 204 OK\r\nx-app-version: #{version}\r\n\r\n")

    {:stop, {:shutdown, :http_request}}
  end
127

128
  # cancel request
129
  # The client sent a cancel request packet: forward it and stop this handler.
  def handle_event(:info, {_, _, Server.cancel_message(pid, key)}, state, _) do
    Logger.metadata(state: state)

    target = {pid, key}
    Logger.debug("ClientHandler: Got cancel query for #{inspect(target)}")

    :ok = HandlerHelpers.send_cancel_query(pid, key)
    {:stop, {:shutdown, :cancel_query}}
  end
135

136
  # send cancel request to db
137
  # A cancel request arrived while a query is running: look up the metadata of
  # the checked-out DbHandler and send the cancel to the database.
  def handle_event(:info, :cancel_query, :busy, %{tenant: tenant, db_pid: checked_out}) do
    Logger.metadata(state: :busy)

    key = {tenant, checked_out}
    Logger.debug("ClientHandler: Cancel query for #{inspect(key)}")
    {_pool, db_pid, _db_sock} = checked_out

    case db_pid_meta(key) do
      [{^db_pid, meta}] ->
        :ok = HandlerHelpers.cancel_query(meta.host, meta.port, meta.ip_ver, meta.pid, meta.key)

      unexpected ->
        Logger.error(
          "ClientHandler: Received cancel but no proc was found #{inspect(key)} #{inspect(unexpected)}"
        )
    end

    :keep_state_and_data
  end
155

156
  # Nothing is running while idle, so a cancel request is a no-op.
  def handle_event(:info, :cancel_query, :idle, _data), do: :keep_state_and_data
159

160
  # SSLRequest during the exchange phase. Replies "S" and upgrades the socket
  # when a downstream cert and key are available, otherwise replies "N" and
  # keeps the plain TCP socket.
  def handle_event(
        :info,
        {:tcp, _, Server.ssl_request_message()},
        :exchange,
        %{sock: sock} = data
      ) do
    Logger.metadata(state: :exchange)
    Logger.debug("ClientHandler: Client is trying to connect with SSL")

    cert = Helpers.downstream_cert()
    key = Helpers.downstream_key()

    if cert && key do
      # passive mode while the TLS handshake owns the socket
      :ok = HandlerHelpers.setopts(sock, active: false)
      :ok = HandlerHelpers.sock_send(sock, "S")

      ssl_opts = [verify: :verify_none, certfile: cert, keyfile: key]

      case :ssl.handshake(elem(sock, 1), ssl_opts) do
        {:ok, ssl_sock} ->
          upgraded = {:ssl, ssl_sock}
          :ok = HandlerHelpers.setopts(upgraded, active: true)
          {:keep_state, %{data | sock: upgraded, ssl: true}}

        failure ->
          Logger.error("ClientHandler: SSL handshake error: #{inspect(failure)}")
          Telem.client_join(:fail, data.id)
          {:stop, {:shutdown, :ssl_handshake_error}}
      end
    else
      Logger.error(
        "ClientHandler: User requested SSL connection but no downstream cert/key found"
      )

      :ok = HandlerHelpers.sock_send(sock, "N")
      :keep_state_and_data
    end
  end
203

204
  # Reject startup packets larger than 1024 bytes.
  def handle_event(:info, {_, _, bin}, :exchange, _) when byte_size(bin) > 1024 do
    Logger.metadata(state: :exchange)

    size = byte_size(bin)
    Logger.error("ClientHandler: Startup packet too large #{size}")

    {:stop, {:shutdown, :startup_packet_too_large}}
  end
209

210
  # Decodes the startup packet, validates the user and database names, and
  # queues an internal :hello event carrying the parsed connection info.
  def handle_event(:info, {_, _, bin}, :exchange, data) do
    Logger.metadata(state: :exchange)

    case Server.decode_startup_packet(bin) do
      {:error, error} ->
        Logger.error("ClientHandler: Client startup message error: #{inspect(error)}")

        Telem.client_join(:fail, data.id)
        {:stop, {:shutdown, :startup_packet_error}}

      {:ok, hello} ->
        Logger.debug("ClientHandler: Client startup message: #{inspect(hello)}")
        {type, {user, tenant_or_alias, db_name}} = HandlerHelpers.parse_user_info(hello.payload)

        names_valid? = Helpers.validate_name(user) and Helpers.validate_name(db_name)

        if names_valid? do
          log_level = maybe_change_log(hello)
          search_path = hello.payload["options"]["--search_path"]
          hello_event = {:hello, {type, {user, tenant_or_alias, db_name, search_path}}}
          app_name = app_name(hello.payload["application_name"])

          new_data = %{data | log_level: log_level, app_name: app_name}
          {:keep_state, new_data, {:next_event, :internal, hello_event}}
        else
          reason = "Invalid format for user or db_name"

          Logger.error("ClientHandler: #{inspect(reason)} #{inspect({user, db_name})}")

          Telem.client_join(:fail, tenant_or_alias)

          HandlerHelpers.send_error(
            data.sock,
            "XX000",
            "Authentication error, reason: #{inspect(reason)}"
          )

          {:stop, {:shutdown, :invalid_format}}
        end
    end
  end
249

250
  # Resolves the tenant/user for the parsed startup info, enforces the SSL and
  # allow-list policies, fetches the auth secrets and queues the :handle event
  # that runs the authentication exchange.
  def handle_event(
        :internal,
        {:hello, {type, {user, tenant_or_alias, db_name, search_path}}},
        :exchange,
        %{sock: sock} = data
      ) do
    Logger.metadata(state: :exchange)
    sni_hostname = HandlerHelpers.try_get_sni(sock)

    case Tenants.get_user_cache(type, user, tenant_or_alias, sni_hostname) do
      {:error, reason} ->
        Logger.error(
          "ClientHandler: User not found: #{inspect(reason)} #{inspect({type, user, tenant_or_alias})}"
        )

        :ok = HandlerHelpers.send_error(sock, "XX000", "Tenant or user not found")
        Telem.client_join(:fail, data.id)
        {:stop, {:shutdown, :user_not_found}}

      {:ok, info} ->
        # fall back to the tenant's database when the client named none
        db_name = db_name || info.tenant.db_database

        id =
          Supavisor.id(
            {type, tenant_or_alias},
            user,
            data.mode,
            info.user.mode_type,
            db_name,
            search_path
          )

        mode = Supavisor.mode(id)

        Logger.metadata(
          project: tenant_or_alias,
          user: user,
          mode: mode,
          type: type,
          db_name: db_name,
          app_name: data.app_name
        )

        {:ok, addr} = HandlerHelpers.addr_from_sock(sock)

        cond do
          !data.local and info.tenant.enforce_ssl and !data.ssl ->
            Logger.error(
              "ClientHandler: Tenant is not allowed to connect without SSL, user #{user}"
            )

            :ok = HandlerHelpers.send_error(sock, "XX000", "SSL connection is required")
            Telem.client_join(:fail, id)
            {:stop, {:shutdown, :ssl_required}}

          HandlerHelpers.filter_cidrs(info.tenant.allow_list, addr) == [] ->
            message = "Address not in tenant allow_list: #{inspect(addr)}"
            Logger.error("ClientHandler: #{message}")
            :ok = HandlerHelpers.send_error(sock, "XX000", message)

            Telem.client_join(:fail, id)
            {:stop, {:shutdown, :address_not_allowed}}

          true ->
            new_data = update_user_data(data, info, user, id, db_name, mode)
            cache_key = {:secrets, tenant_or_alias, user}

            case auth_secrets(info, user, cache_key, :timer.hours(24)) do
              {:ok, auth_secrets} ->
                Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}")

                {:keep_state, new_data, {:next_event, :internal, {:handle, auth_secrets, info}}}

              {:error, secrets_error} ->
                Logger.error(
                  "ClientHandler: Authentication auth_secrets error: #{inspect(secrets_error)}"
                )

                :ok =
                  HandlerHelpers.send_error(
                    sock,
                    "XX000",
                    "Authentication error, reason: #{inspect(secrets_error)}"
                  )

                Telem.client_join(:fail, id)
                {:stop, {:shutdown, :auth_secrets_error}}
            end
        end
    end
  end
342

343
  # Runs the authentication exchange with the client. On success it stores the
  # secrets and moves on to :connect_db (proxy mode) or :subscribe. On failure
  # it may re-check the cached secrets, reports the error to the client and
  # stops.
  def handle_event(
        :internal,
        {:handle, {method, secrets}, info},
        state,
        %{sock: sock} = data
      ) do
    Logger.metadata(state: state)
    Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(method)}")

    case handle_exchange(sock, {method, secrets}) do
      {:ok, client_key} ->
        # keep the client key alongside the secrets when the exchange produced one
        secrets =
          if client_key do
            fn -> Map.put(secrets.(), :client_key, client_key) end
          else
            secrets
          end

        Logger.debug("ClientHandler: Exchange success")
        :ok = HandlerHelpers.sock_send(sock, Server.authentication_ok())
        Telem.client_join(:ok, data.id)

        auth = Map.merge(data.auth, %{secrets: secrets, method: method})

        next_step =
          case data.mode do
            :proxy -> :connect_db
            _ -> :subscribe
          end

        new_data = %{data | auth_secrets: {method, secrets}, auth: auth}
        {:keep_state, new_data, {:next_event, :internal, next_step}}

      {:error, reason} ->
        Logger.error(
          "ClientHandler: Exchange error: #{inspect(reason)} when method #{inspect(method)}"
        )

        msg =
          case method do
            :auth_query_md5 -> Server.error_message("XX000", reason)
            _ -> Server.exchange_message(:final, "e=#{reason}")
          end

        key = {:secrets_check, data.tenant, data.user}

        # re-check only for non-:password methods, on "Wrong password", and
        # when no recent check is cached under `key`
        recheck? =
          method != :password and reason == "Wrong password" and
            Cachex.get(Supavisor.Cache, key) == {:ok, nil}

        if recheck? do
          case auth_secrets(info, data.user, key, 15_000) do
            {:ok, {fresh_method, fresh_secrets}} = value ->
              if method != fresh_method or
                   Map.delete(secrets.(), :client_key) != fresh_secrets.() do
                Logger.warning("ClientHandler: Update secrets and terminate pool")

                Cachex.update(
                  Supavisor.Cache,
                  {:secrets, data.tenant, data.user},
                  {:cached, value}
                )

                Supavisor.stop(data.id)
              else
                Logger.debug("ClientHandler: Cache the same #{inspect(key)}")
              end

            other ->
              Logger.error("ClientHandler: Auth secrets check error: #{inspect(other)}")
          end
        else
          Logger.debug("ClientHandler: Cache hit for #{inspect(key)}")
        end

        HandlerHelpers.sock_send(sock, msg)
        Telem.client_join(:fail, data.id)
        {:stop, {:shutdown, :exchange_error}}
    end
  end
415

416
  # Starts (or finds) the tenant pool and subscribes to it. When the pool runs
  # on another node in transaction/session mode, the connection is switched to
  # proxying through that node's ranch listener instead.
  def handle_event(:internal, :subscribe, state, data) do
    Logger.metadata(state: state)
    Logger.debug("ClientHandler: Subscribe to tenant #{inspect(data.id)}")

    start_opts = [
      log_level: data.log_level,
      availability_zone: data.tenant_availability_zone
    ]

    with {:ok, sup} <- Supavisor.start_dist(data.id, data.auth_secrets, start_opts),
         remote_pool? = node(sup) != node() and data.mode in [:transaction, :session],
         true <- if(remote_pool?, do: :proxy, else: true),
         {:ok, opts} <- Supavisor.subscribe(sup, data.id) do
      manager_ref = Process.monitor(opts.workers.manager)

      data = Map.merge(data, opts.workers)
      db_pid = db_checkout(:both, :on_connect, data)
      data = %{data | manager: manager_ref, db_pid: db_pid, idle_timeout: opts.idle_timeout}

      Registry.register(@clients_registry, data.id, started_at: System.monotonic_time())

      # without parameter statuses yet, wait up to 10s for them to arrive
      next =
        case opts.ps do
          [] -> {:timeout, 10_000, :wait_ps}
          ps -> {:next_event, :internal, {:greetings, ps}}
        end

      {:keep_state, data, next}
    else
      {:error, limit} when limit in [:max_clients_reached, :max_pools_reached] ->
        msg =
          if limit == :max_clients_reached,
            do: "Max client connections reached",
            else: "Max pools count reached"

        Logger.error("ClientHandler: #{msg}")
        :ok = HandlerHelpers.send_error(data.sock, "XX000", msg)
        Telem.client_join(:fail, data.id)
        {:stop, {:shutdown, limit}}

      :proxy ->
        case Supavisor.get_pool_ranch(data.id) do
          {:ok, %{port: port, host: host}} ->
            proxy_target = %{
              port: port,
              host: to_charlist(host),
              ip_version: :inet,
              upstream_ssl: false,
              upstream_tls_ca: nil,
              upstream_verify: nil
            }

            auth = Map.merge(data.auth, proxy_target)

            Logger.metadata(proxy: true)
            Registry.register(@proxy_clients_registry, data.id, [])

            {:keep_state, %{data | auth: auth}, {:next_event, :internal, :connect_db}}

          other ->
            Logger.error("ClientHandler: Subscribe proxy error: #{inspect(other)}")
            timeout_subscribe_or_terminate(data)
        end

      error ->
        Logger.error("ClientHandler: Subscribe error: #{inspect(error)}")
        timeout_subscribe_or_terminate(data)
    end
  end
488

489
  # Proxy mode: start a dedicated DbHandler for this client and check it out.
  def handle_event(:internal, :connect_db, state, data) do
    Logger.metadata(state: state)
    Logger.debug("ClientHandler: Trying to connect to DB")

    handler_args = %{
      id: data.id,
      auth: data.auth,
      user: data.user,
      tenant: {:single, data.tenant},
      replica_type: :write,
      mode: :proxy,
      proxy: true,
      log_level: data.log_level,
      caller: self(),
      client_sock: data.sock
    }

    {:ok, db_pid} = DbHandler.start_link(handler_args)
    db_sock = :gen_statem.call(db_pid, {:checkout, data.sock, self()})

    # there is no pool in proxy mode, hence the nil first element
    {:keep_state, %{data | db_pid: {nil, db_pid, db_sock}, mode: :proxy}}
  end
510

511
  # Finishes the startup: sends the parameter statuses, backend key data and
  # ReadyForQuery to the client, registers for cancel requests, and goes :idle.
  def handle_event(:internal, {:greetings, ps}, state, %{sock: sock} = data) do
    Logger.metadata(state: state)

    {header, <<pid::32, key::32>> = payload} = Server.backend_key_data()
    greeting = [ps, [header, payload], Server.ready_for_query()]

    :ok = HandlerHelpers.listen_cancel_query(pid, key)
    :ok = HandlerHelpers.sock_send(sock, greeting)
    Telem.client_connection_time(data.connection_start, data.id)

    {:next_state, :idle, data, handle_actions(data)}
  end
520

521
  # Subscribe retry timer fired: run the :subscribe step again.
  def handle_event(:timeout, :subscribe, state, _data) do
    Logger.metadata(state: state)

    retry = {:next_event, :internal, :subscribe}
    {:keep_state_and_data, retry}
  end
525

526
  # No parameter status arrived before the :wait_ps timeout: fall back to the
  # default parameter status stored in `data.ps` and continue with the greeting.
  def handle_event(:timeout, :wait_ps, state, data) do
    Logger.metadata(state: state)

    # the message previously ended with a stray "}" after the interpolation
    Logger.error(
      "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}"
    )

    ps = Server.encode_parameter_status(data.ps)
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
  end
536

537
  # The idle timer expired: log it and stop the connection.
  def handle_event(:timeout, :idle_terminate, state, data) do
    Logger.metadata(state: state)

    Logger.warning("ClientHandler: Terminate an idle connection by #{data.idle_timeout} timeout")

    {:stop, {:shutdown, :idle_terminate}}
  end
542

543
  # Heartbeat timer fired: send the application_name message to the client and
  # re-arm the timer with the configured interval.
  def handle_event(
        :timeout,
        :heartbeat_check,
        state,
        %{sock: sock, heartbeat_interval: interval}
      ) do
    Logger.metadata(state: state)
    Logger.debug("ClientHandler: Send heartbeat to client")

    HandlerHelpers.sock_send(sock, Server.application_name())

    {:keep_state_and_data, {:timeout, interval, :heartbeat_check}}
  end
549

550
  # Socket data from the client. Delegates to handle_data/4. When it answers
  # with a state transition, any :next_event action carrying a bare binary or
  # list is re-wrapped as a socket message so it re-enters this clause.
  def handle_event(kind, {proto, socket, msg}, state, data) when proto in @proto do
    Logger.metadata(state: state)

    case handle_data(kind, msg, state, data) do
      {:next_state, next_state, new_data, actions} ->
        rewrapped =
          for action <- List.wrap(actions) do
            case action do
              {:next_event, type, payload} when is_binary(payload) or is_list(payload) ->
                {:next_event, type, {proto, socket, payload}}

              untouched ->
                untouched
            end
          end

        {:next_state, next_state, new_data, rewrapped}

      # any other result is returned to gen_statem unchanged
      passthrough ->
        passthrough
    end
  end
568

569
  # Parameter statuses arrived while still in :exchange: proceed to the greeting.
  def handle_event(:info, {:parameter_status, ps}, :exchange, _data) do
    Logger.metadata(state: :exchange)

    greet = {:next_event, :internal, {:greetings, ps}}
    {:keep_state_and_data, greet}
  end
573

574
  # TLS error reported for this connection's own socket: log it and carry on.
  # The clause matches in every state, so the Logger metadata records the
  # actual state rather than a hard-coded :exchange.
  def handle_event(:info, {:ssl_error, sock, reason}, state, %{sock: {_, sock}}) do
    Logger.metadata(state: state)
    Logger.error("ClientHandler: TLS error #{inspect(reason)}")
    :keep_state_and_data
  end
579

580
  # client closed connection
581
  # The peer closed the TCP or SSL socket: stop the handler.
  def handle_event(_, {closed, _}, state, %{tenant: tenant})
      when closed in [:tcp_closed, :ssl_closed] do
    Logger.metadata(state: state)
    Logger.debug("ClientHandler: #{closed} socket closed for #{inspect(tenant)}")

    {:stop, {:shutdown, :socket_closed}}
  end
587

588
  # linked DbHandler went down
589
  # A linked process exited (exits are trapped): report it to the client and stop.
  def handle_event(:info, {:EXIT, db_pid, reason}, state, %{sock: sock}) do
    Logger.metadata(state: state)
    Logger.error("ClientHandler: DbHandler #{inspect(db_pid)} exited #{inspect(reason)}")

    error_packet = Server.error_message("XX000", "DbHandler exited")
    HandlerHelpers.sock_send(sock, error_packet)

    {:stop, {:shutdown, :db_handler_exit}}
  end
595

596
  # pool's manager went down
597
  # The monitored pool manager went down.
  #
  #   * reason :shutdown      -> stop with :manager_shutdown
  #   * crashed while :idle   -> re-run the :subscribe step
  #   * crashed in any other state (:busy, or :exchange while waiting for the
  #     greeting) -> stop with :manager_down
  #
  # The last clause used to match :busy only, so a manager crash in :exchange
  # raised a CaseClauseError instead of stopping cleanly.
  def handle_event(:info, {:DOWN, ref, _, _, reason}, state, %{manager: ref} = data) do
    Logger.metadata(state: state)

    Logger.error(
      "ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}"
    )

    case {state, reason} do
      {_, :shutdown} -> {:stop, {:shutdown, :manager_shutdown}}
      {:idle, _} -> {:keep_state_and_data, {:next_event, :internal, :subscribe}}
      {_, _} -> {:stop, {:shutdown, :manager_down}}
    end
  end
610

611
  # A {:disconnect, reason} message was received: log it and stop.
  def handle_event(:info, {:disconnect, reason}, state, _data) do
    Logger.metadata(state: state)
    Logger.warning("ClientHandler: Disconnected due to #{inspect(reason)}")

    stop_reason = {:shutdown, {:disconnect, reason}}
    {:stop, stop_reason}
  end
616

617
  # emulate handle_cast
618
  # Status updates cast by the DbHandler while a query is in flight.
  #
  #   * :ready_for_query - release the db handler (per mode), record the
  #     network usage and query time, and go back to :idle
  #   * :read_sql_error  - release the current handler, check out one for
  #     writes and replay the last query
  def handle_event(:cast, {:db_status, status}, :busy, data) do
    Logger.metadata(state: :busy)

    case status do
      :ready_for_query ->
        Logger.debug("ClientHandler: Client is ready")

        released = handle_db_pid(data.mode, data.pool, data.db_pid)

        stats =
          if data.local do
            data.stats
          else
            {_, updated} = Telem.network_usage(:client, data.sock, data.id, data.stats)
            updated
          end

        Telem.client_query_time(data.query_start, data.id)

        new_data = %{
          data
          | db_pid: released,
            stats: stats,
            active_count: reset_active_count(data)
        }

        {:next_state, :idle, new_data, handle_actions(data)}

      :read_sql_error ->
        Logger.error(
          "ClientHandler: read only sql transaction, rerunning the query to write pool"
        )

        # release the read pool
        _ = handle_db_pid(data.mode, data.pool, data.db_pid)

        started_at = System.monotonic_time()
        write_db_pid = db_checkout(:write, :on_query, data)

        replay = {:next_event, :internal, {:tcp, nil, data.last_query}}
        {:keep_state, %{data | db_pid: write_db_pid, query_start: started_at}, replay}
    end
  end
653

654
  # Any other TCP/SSL socket error: log it and stop.
  def handle_event(:info, {sock_error, _sock, msg}, state, _data)
      when sock_error in [:tcp_error, :ssl_error] do
    Logger.metadata(state: state)
    Logger.error("ClientHandler: Socket error: #{inspect(msg)}")

    stop_reason = {:shutdown, {:socket_error, msg}}
    {:stop, stop_reason}
  end
661

662
  # Catch-all: log events that no other clause handled and keep running.
  def handle_event(type, content, state, _data) do
    Logger.metadata(state: state)

    details = [{"type", type}, {"content", content}]
    Logger.error("ClientHandler: Undefined msg: #{inspect(details, pretty: true)}")

    :keep_state_and_data
  end
674

675
  @impl true
676
  # Termination caused by a timed-out {:checkout, _, _} call: tell the client
  # why before the process goes away.
  #
  # The mode lookup has a fallback clause so that an unexpected mode (for
  # example :proxy) yields the generic message instead of raising a
  # CaseClauseError inside terminate/3, which would skip the client-facing
  # error.
  def terminate(
        {:timeout, {_, _, [_, {:checkout, _, _}, _]}},
        state,
        data
      ) do
    Logger.metadata(state: state)

    msg =
      case data.mode do
        :session ->
          "MaxClientsInSessionMode: max clients reached - in Session mode max clients are limited to pool_size"

        # :transaction and any other mode
        _ ->
          "Unable to check out process from the pool due to timeout"
      end

    Logger.error("ClientHandler: #{msg}")
    HandlerHelpers.sock_send(data.sock, Server.error_message("XX000", msg))
    :ok
  end
696

697
  # Termination while a DbHandler is checked out: stop the handler when it is
  # busy or running in session mode, then log what happened.
  def terminate(reason, state, %{db_pid: {_, pid, _}}) do
    Logger.metadata(state: state)

    db_info =
      case DbHandler.get_state_and_mode(pid) do
        {:ok, {db_state, db_mode} = resp} ->
          if db_state == :busy or db_mode == :session, do: DbHandler.stop(pid)
          resp

        # anything else is only logged below
        other ->
          other
      end

    Logger.debug(
      "ClientHandler: socket closed with reason #{inspect(reason)}, DbHandler #{inspect({pid, db_info})}"
    )

    :ok
  end
712

713
  # Termination without a checked-out DbHandler: just log the reason.
  def terminate(reason, state, _data) do
    Logger.metadata(state: state)

    Logger.debug("ClientHandler: socket closed with reason #{inspect(reason)}")

    :ok
  end
718

719
  ## Internal functions
720

721
  @spec handle_exchange(Supavisor.sock(), {atom(), fun()}) ::
722
          {:ok, binary() | nil} | {:error, String.t()}
723
  # MD5 exchange: send a random 4-byte salt, wait for the client's password
  # message and verify the hash against the stored secret.
  def handle_exchange({_, socket} = sock, {:auth_query_md5 = method, secrets}) do
    salt = :crypto.strong_rand_bytes(4)
    :ok = HandlerHelpers.sock_send(sock, Server.md5_request(salt))

    reply = receive_next(socket, "Timeout while waiting for the md5 exchange")

    with {:ok, %{tag: :password_message, payload: {:md5, client_md5}}, _} <- reply,
         {:ok, key} <- authenticate_exchange(method, client_md5, secrets.().secret, salt) do
      {:ok, key}
    else
      {:error, message} -> {:error, message}
      other -> {:error, "Unexpected message #{inspect(other)}"}
    end
  end
739

740
  # SCRAM-SHA-256 exchange: request SCRAM, answer the client's first message,
  # verify the proof from its second message and send the server signature.
  def handle_exchange({_, socket} = sock, {method, secrets}) do
    :ok = HandlerHelpers.sock_send(sock, Server.scram_request())

    first_reply = receive_next(socket, "Timeout while waiting for the first password message")

    with {:ok,
          %{
            tag: :password_message,
            payload: {:scram_sha_256, %{"n" => user, "r" => nonce, "c" => channel}}
          }, _} <- first_reply,
         {:ok, signatures} = reply_first_exchange(sock, method, secrets, channel, nonce, user),
         {:ok,
          %{
            tag: :password_message,
            payload: {:first_msg_response, %{"p" => proof}}
          },
          _} <-
           receive_next(socket, "Timeout while waiting for the second password message"),
         {:ok, key} <- authenticate_exchange(method, secrets, signatures, proof) do
      final_message = "v=#{Base.encode64(signatures.server)}"
      :ok = HandlerHelpers.sock_send(sock, Server.exchange_message(:final, final_message))
      {:ok, key}
    else
      {:error, message} -> {:error, message}
      other -> {:error, "Unexpected message #{inspect(other)}"}
    end
  end
773

774
  # Waits up to 15 seconds for the next message. A packet from `socket` is
  # decoded; any other message is consumed and reported as an error; on
  # timeout `timeout_message` is returned as the error.
  defp receive_next(socket, timeout_message) do
    receive do
      {_proto, ^socket, payload} ->
        Server.decode_pkt(payload)

      unexpected ->
        {:error, "Unexpected message in receive_next/2 #{inspect(unexpected)}"}
    after
      15_000 -> {:error, timeout_message}
    end
  end
785

786
  # Builds and sends the server's first SCRAM message; returns the signatures
  # needed to verify the client's proof.
  defp reply_first_exchange(sock, method, secrets, channel, nonce, user) do
    {message, signatures} = exchange_first(method, secrets, nonce, user, channel)

    first_reply = Server.exchange_message(:first, message)
    :ok = HandlerHelpers.sock_send(sock, first_reply)

    {:ok, signatures}
  end
791

792
  # Verifies the client's proof for the given auth method.
  #
  # Returns `{:ok, client_key | nil}` on success and
  # `{:error, "Wrong password"}` otherwise.
  #
  # NOTE(review): the comparisons below use `==`, which is not constant-time;
  # consider a constant-time compare if one is available in the project.

  # :password - the proof must equal the expected client signature.
  defp authenticate_exchange(:password, _secrets, signatures, p) do
    if p == signatures.client,
      do: {:ok, nil},
      else: {:error, "Wrong password"}
  end

  # :auth_query - recover the client key from the base64 proof and compare its
  # hash with the stored key. The proof is client-controlled, so invalid base64
  # or a length that differs from the client signature (which would make
  # :crypto.exor/2 raise) is treated as a failed authentication rather than
  # crashing the handler.
  defp authenticate_exchange(:auth_query, secrets, signatures, p) do
    with {:ok, proof} <- Base.decode64(p),
         true <- byte_size(proof) == byte_size(signatures.client),
         client_key = :crypto.exor(proof, signatures.client),
         true <- Helpers.hash(client_key) == secrets.().stored_key do
      {:ok, client_key}
    else
      _ -> {:error, "Wrong password"}
    end
  end

  # :auth_query_md5 - the client hash must equal "md5" <> md5(server_hash, salt).
  defp authenticate_exchange(:auth_query_md5, client_hash, server_hash, salt) do
    if "md5" <> Helpers.md5([server_hash, salt]) == client_hash,
      do: {:ok, nil},
      else: {:error, "Wrong password"}
  end
813

814
  # Checks out a DbHandler for the client.
  #
  #   * session/proxy mode with a handler already held -> reuse it
  #   * transaction mode on connect                    -> nothing is checked
  #     out yet (nil)
  #   * :write / :read                                 -> pick a random pool
  #     for that query type from `data.pool` and check out from it
  #   * otherwise                                      -> check out from
  #     `data.pool`
  #
  # Every successful checkout returns `{pool, db_pid, db_sock}`. The
  # :write/:read clause used to return `{pool, db_pid}` without checking out
  # the DB socket, which did not match the spec or the 3-tuple the other
  # clauses return.
  @spec db_checkout(:write | :read | :both, :on_connect | :on_query, map) ::
          {pid, pid, Supavisor.sock()} | nil
  defp db_checkout(_, _, %{mode: mode, db_pid: {pool, db_pid, db_sock}})
       when is_pid(db_pid) and mode in [:session, :proxy] do
    {pool, db_pid, db_sock}
  end

  defp db_checkout(_, :on_connect, %{mode: :transaction}), do: nil

  defp db_checkout(query_type, _, data) when query_type in [:write, :read] do
    data.pool[query_type]
    |> Enum.random()
    |> do_db_checkout(data)
  end

  defp db_checkout(_, _, data), do: do_db_checkout(data.pool, data)

  # Checks a worker out of `pool`, links to it, checks out its socket for this
  # client and reports how long the whole checkout took.
  defp do_db_checkout(pool, data) do
    start = System.monotonic_time(:microsecond)
    db_pid = :poolboy.checkout(pool, true, data.timeout)
    Process.link(db_pid)
    db_sock = DbHandler.checkout(db_pid, data.sock, self())
    same_box = if node(db_pid) == node(), do: :local, else: :remote
    Telem.pool_checkout_time(System.monotonic_time(:microsecond) - start, data.id, same_box)
    {pool, db_pid, db_sock}
  end
844

845
  # Decides what happens to the checked-out connection once it is no longer
  # needed for the current exchange.
  #
  # In transaction mode the worker is unlinked and returned to `pool`, and nil is
  # returned so that no connection stays attached. In session and proxy mode the
  # value is handed back untouched.
  #
  # The third argument is the `{pool, db_pid, db_sock}` tuple (or nil), not a
  # bare pid: the transaction clause destructures a 3-tuple.
  # NOTE(review): the session/proxy specs assume the same tuple shape based on
  # how `db_checkout/3` matches `data.db_pid` — confirm against the callers.
  @spec handle_db_pid(:transaction, pid(), {pid(), pid(), Supavisor.sock()} | nil) :: nil
  @spec handle_db_pid(:session, pid(), {pid(), pid(), Supavisor.sock()}) ::
          {pid(), pid(), Supavisor.sock()}
  @spec handle_db_pid(:proxy, pid(), {pid(), pid(), Supavisor.sock()}) ::
          {pid(), pid(), Supavisor.sock()}
  defp handle_db_pid(:transaction, _pool, nil), do: nil

  defp handle_db_pid(:transaction, pool, {_, db_pid, _}) do
    Process.unlink(db_pid)
    :poolboy.checkin(pool, db_pid)
    nil
  end

  defp handle_db_pid(:session, _, db_pid), do: db_pid
  defp handle_db_pid(:proxy, _, db_pid), do: db_pid
×
858

859
  # Fills the handler data with everything derived from the tenant/user lookup:
  # the upstream connection settings (`auth`), the proxy type (`:password` when
  # the tenant requires a user, `:auth_query` otherwise), timeouts and
  # identifiers. The heartbeat interval is multiplied by 1000 before storing.
  defp update_user_data(data, info, user, id, db_name, mode) do
    tenant = info.tenant

    proxy_type = if tenant.require_user, do: :password, else: :auth_query

    sni_hostname =
      case tenant.sni_hostname do
        nil -> nil
        sni -> to_charlist(sni)
      end

    auth = %{
      application_name: data[:app_name] || "Supavisor",
      database: db_name,
      host: to_charlist(tenant.db_host),
      sni_hostname: sni_hostname,
      port: tenant.db_port,
      user: user,
      password: info.user.db_password,
      require_user: tenant.require_user,
      upstream_ssl: tenant.upstream_ssl,
      upstream_tls_ca: tenant.upstream_tls_ca,
      upstream_verify: tenant.upstream_verify
    }

    %{
      data
      | tenant: tenant.external_id,
        user: user,
        timeout: info.user.pool_checkout_timeout,
        ps: tenant.default_parameter_status,
        proxy_type: proxy_type,
        id: id,
        heartbeat_interval: tenant.client_heartbeat_interval * 1000,
        db_name: db_name,
        mode: mode,
        auth: auth,
        tenant_availability_zone: tenant.availability_zone
    }
  end
895

896
  @doc """
  Returns the secrets used to authenticate a client.

  When the tenant requires a user, the secrets are built directly from the
  stored user record and tagged `:password`. Otherwise they are fetched with
  `get_secrets/2` through the `Supavisor.Cache` Cachex cache under `key`;
  successful lookups are committed with the given `ttl`, failed lookups are
  returned without being cached.

  An `{:error, reason}` coming back from the cache itself is returned as is
  instead of raising a `CaseClauseError`.
  """
  @spec auth_secrets(map, String.t(), term(), non_neg_integer()) ::
          {:ok, Supavisor.secrets()} | {:error, term()}
  ## password secrets
  def auth_secrets(%{user: user, tenant: %{require_user: true}}, _, _, _) do
    secrets = %{db_user: user.db_user, password: user.db_password, alias: user.db_user_alias}

    {:ok, {:password, fn -> secrets end}}
  end

  ## auth_query secrets
  def auth_secrets(info, db_user, key, ttl) do
    fetch = fn _key ->
      case get_secrets(info, db_user) do
        {:ok, _} = resp -> {:commit, {:cached, resp}, ttl: ttl}
        {:error, _} = resp -> {:ignore, resp}
      end
    end

    case Cachex.fetch(Supavisor.Cache, key, fetch) do
      {:ok, {:cached, value}} -> value
      {:commit, {:cached, value}, _opts} -> value
      # two-element commit form, in case the cache reports it without options
      {:commit, {:cached, value}} -> value
      {:ignore, resp} -> resp
      # the cache could not run the fallback (for example it raised)
      {:error, _reason} = error -> error
    end
  end
920

921
  @doc """
  Fetches the secrets of `db_user` from the tenant database.

  Opens a temporary Postgrex connection with the tenant's stored user, runs
  the tenant's `auth_query` through `Helpers.get_user_secret/3` and always
  stops the connection afterwards. On success the result is tagged
  `:auth_query_md5` when the secret digest is `:md5` and `:auth_query`
  otherwise; the wrapped function returns the secret with the user alias
  added. Any other result of the lookup is returned unchanged.

  Exceptions raised during the lookup are logged and re-raised.
  """
  @spec get_secrets(map, String.t()) :: {:ok, {:auth_query, fun()}} | {:error, term()}
  def get_secrets(%{user: user, tenant: tenant}, db_user) do
    ssl_opts =
      case tenant.upstream_ssl and tenant.upstream_verify == :peer do
        true ->
          [
            verify: :verify_peer,
            cacerts: [Helpers.upstream_cert(tenant.upstream_tls_ca)],
            server_name_indication: String.to_charlist(tenant.db_host),
            # the hostname check is customized to accept every name
            customize_hostname_check: [{:match_fun, fn _, _ -> true end}]
          ]

        false ->
          [verify: :verify_none]
      end

    connection_opts = [
      hostname: tenant.db_host,
      port: tenant.db_port,
      database: tenant.db_database,
      password: user.db_password,
      username: user.db_user,
      parameters: [application_name: "Supavisor auth_query"],
      ssl: tenant.upstream_ssl,
      socket_options: [Helpers.ip_version(tenant.ip_version, tenant.db_host)],
      queue_target: 1_000,
      queue_interval: 5_000,
      ssl_opts: ssl_opts
    ]

    {:ok, conn} = Postgrex.start_link(connection_opts)

    try do
      Logger.debug(
        "ClientHandler: Connected to db #{tenant.db_host} #{tenant.db_port} #{tenant.db_database} #{user.db_user}"
      )

      result =
        case Helpers.get_user_secret(conn, tenant.auth_query, db_user) do
          {:ok, secret} ->
            kind = if secret.digest == :md5, do: :auth_query_md5, else: :auth_query
            {:ok, {kind, fn -> Map.put(secret, :alias, user.db_user_alias) end}}

          other ->
            other
        end

      Logger.info("ClientHandler: Get secrets finished")
      result
    rescue
      exception ->
        Logger.error("ClientHandler: Couldn't fetch user secrets from #{tenant.db_host}")
        reraise exception, __STACKTRACE__
    after
      # the temporary connection is closed on success and on failure alike
      GenServer.stop(conn, :normal, 5_000)
    end
  end
975

976
  # Builds the server-first message of the authentication exchange and the
  # signatures later used to verify the client's answer.
  #
  # Returns `{message, %{client: ..., server: ...}}` for `:password`, and
  # `{message, signatures}` as produced by `Helpers.signatures/6` for
  # `:auth_query`. `secret` is a zero-arity function returning the secrets map.
  @spec exchange_first(:password | :auth_query, fun(), binary(), binary(), binary()) ::
          {binary(), map()}
  defp exchange_first(:password, secret, nonce, user, channel) do
    server_first = Server.exchange_first_message(nonce)
    parsed = Helpers.parse_server_first(server_first, nonce)

    {client_final, server_proof} =
      Helpers.get_client_final(:password, secret.().password, parsed, nonce, user, channel)

    # the last element of the client-final message is kept as the client part
    signatures = %{client: List.last(client_final), server: server_proof}

    {server_first, signatures}
  end

  defp exchange_first(:auth_query, secret, nonce, user, channel) do
    stored = secret.()
    server_first = Server.exchange_first_message(nonce, stored.salt)
    parsed = Helpers.parse_server_first(server_first, nonce)

    signatures =
      Helpers.signatures(stored.stored_key, stored.server_key, parsed, nonce, user, channel)

    {server_first, signatures}
  end
1017

1018
  @doc """
  Returns the SNI hostname of an `{:ssl, sock}` connection as a string.

  Returns `nil` when the socket is not an `:ssl` socket or when the connection
  information does not consist of exactly the `:sni_hostname` item.
  """
  @spec try_get_sni(Supavisor.sock()) :: String.t() | nil
  def try_get_sni({:ssl, sock}) do
    with {:ok, [sni_hostname: sni]} <- :ssl.connection_information(sock, [:sni_hostname]) do
      List.to_string(sni)
    else
      _ -> nil
    end
  end

  def try_get_sni(_), do: nil
×
1027

1028
  # Looks the db handler pid up in the `Supavisor.Registry.PoolPids` registry.
  # The lookup runs locally when the pid lives on this node, otherwise it is
  # performed on the pid's node through `:erpc` with a 15 second timeout.
  defp db_pid_meta({_, {_, pid, _}} = _key) do
    registry = Supavisor.Registry.PoolPids

    case node(pid) do
      owner when owner == node() ->
        Registry.lookup(registry, pid)

      owner ->
        :erpc.call(owner, Registry, :lookup, [registry, pid], 15_000)
    end
  end
1038

1039
  @spec handle_data(kind :: atom(), data :: binary(), state, data) ::
          :gen_statem.event_handler_result(data)
        when state: atom() | term(),
             data: term()

  # handle Terminate message
  #
  # A Terminate (`X` with length 4) received while idle stops the handler,
  # except when the handler data is flagged `local: true`, where it is only
  # logged and the state is kept.
  defp handle_data(:info, <<?X, 4::32>>, :idle, data) do
    case data do
      %{local: true} ->
        Logger.info("ClientHandler: Terminate received from proxy client")
        :keep_state_and_data

      _ ->
        Logger.info("ClientHandler: Terminate received from client")
        {:stop, {:shutdown, :terminate_received}}
    end
  end
1054

1055
  # handle Sync message while idle
  defp handle_data(:info, <<?S, 4::32>> <> _ = msg, :idle, data) do
    Logger.debug("ClientHandler: Receive sync")

    # db_pid can be nil in transaction mode: answer with ready_for_query
    # ourselves without checking out a connection. When a db_pid is attached
    # the message is forwarded to it instead.
    case data.db_pid do
      nil -> :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query())
      _attached -> :ok = sock_send_binary_maybe_active_once(msg, data)
    end

    {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
  end

  # handle Sync message in any other state: always forwarded to the database
  defp handle_data(:info, <<?S, 4::32, _::binary>> = msg, _, data) do
    Logger.debug("ClientHandler: Receive sync while not idle")
    :ok = sock_send_binary_maybe_active_once(msg, data)

    active_count = reset_active_count(data)
    {:keep_state, %{data | active_count: active_count}, handle_actions(data)}
  end
1073

1074
  # handle Flush message
  #
  # Forwards the Flush to the checked-out database socket. This clause matches
  # every state, and `db_pid` can be nil when no connection is checked out
  # (transaction mode); a bare Flush then has no backend to go to, so it is
  # skipped instead of crashing on the missing connection tuple. Any other
  # combination keeps the previous forwarding behaviour.
  defp handle_data(:info, <<?H, 4::32, _::binary>> = msg, _, data) do
    Logger.debug("ClientHandler: Receive flush while not idle")

    case {data.db_pid, msg} do
      {nil, <<?H, 4::32>>} -> :ok
      _ -> :ok = sock_send_binary_maybe_active_once(msg, data)
    end

    {:keep_state, %{data | active_count: reset_active_count(data)}, handle_actions(data)}
  end
1080

1081
  # incoming query with a single pool
  #
  # Checks a connection out, records when the query started and moves to
  # `:busy`, re-injecting the received bytes as an internal event.
  defp handle_data(:info, bin, :idle, %{pool: pid} = data) when is_pid(pid) do
    Logger.debug("ClientHandler: Receive query #{inspect(bin)}")

    checked_out = db_checkout(:both, :on_query, data)
    busy_data = %{data | db_pid: checked_out, query_start: System.monotonic_time()}

    {:next_state, :busy, busy_data, {:next_event, :internal, bin}}
  end
1093

1094
  # incoming data in proxy mode: no checkout happens here, only the query start
  # time is recorded before the bytes are re-injected as an internal event
  defp handle_data(:info, bin, _, %{mode: :proxy} = data) do
    started_at = System.monotonic_time()
    forward = {:next_event, :internal, bin}

    {:next_state, :busy, %{data | query_start: started_at}, forward}
  end
1098

1099
  # incoming query with read/write pools
  #
  # The payload is parsed to choose a pool: a query made of a single
  # `SelectStmt` goes to `:read`, everything else (including payloads that
  # cannot be read or parsed) goes to `:write`.
  defp handle_data(:info, bin, :idle, data) do
    query_type =
      with {:ok, payload} <- Client.get_payload(bin),
           {:ok, statements} <- Supavisor.PgParser.statements(payload) do
        Logger.debug(
          "ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}"
        )

        # naive check for read only queries
        if statements == ["SelectStmt"], do: :read, else: :write
      else
        other ->
          Logger.error("ClientHandler: Receive query error: #{inspect(other)}")
          :write
      end

    started_at = System.monotonic_time()
    checked_out = db_checkout(query_type, :on_query, data)
    busy_data = %{data | db_pid: checked_out, query_start: started_at, last_query: bin}

    {:next_state, :busy, busy_data, {:next_event, :internal, bin}}
  end
1125

1126
  # forward query to db
  #
  # Prepends any bytes left pending from the previous packet, runs the result
  # through `handle_client_pkts/2` and sends it to the database. On success the
  # prepared statements, the leftover bytes and the active count are updated.
  # A send failure is reported to the client and stops the handler; a packet
  # handling error is delegated to `handle_error/2`.
  defp handle_data(_, data_to_send, :busy, data) do
    Logger.debug(
      "ClientHandler: Forward query to db #{inspect(data_to_send)} #{inspect(data.db_pid)}"
    )

    buffered = data.pending <> data_to_send

    case handle_client_pkts(buffered, data) do
      {:ok, outgoing, statements, leftover} ->
        send_result = sock_send_maybe_active_once(outgoing, data)

        if send_result == :ok do
          updated = %{
            data
            | prepared_statements: statements,
              pending: leftover,
              active_count: data.active_count + 1
          }

          {:keep_state, updated}
        else
          Logger.error("ClientHandler: error while sending query: #{inspect(send_result)}")

          HandlerHelpers.sock_send(
            data.sock,
            Server.error_message("XX000", "Error while sending query")
          )

          {:stop, {:shutdown, :send_query_error}}
        end

      failure ->
        handle_error(data.sock, failure)
    end
  end
1159

1160
  # Prepares client bytes before they are forwarded to the database.
  #
  # In transaction mode the binary is split into protocol packets and each one
  # goes through `PreparedStatements.handle_pkt/2`, threading the statements
  # along. The result is `{:ok, handled_pkts, statements, rest}`, where `rest`
  # is whatever `Protocol.split_pkts/1` left over; the first error returned by
  # `handle_pkt/2` aborts the processing and is returned unchanged.
  #
  # In every other mode the binary is passed through untouched. The caller
  # builds `bin` as the pending bytes followed by the new data, so all of it is
  # consumed and the leftover is empty. (Returning `data.pending` here would
  # make a non-empty pending buffer be sent again with the next packet.)
  @spec handle_client_pkts(binary, map) ::
          {:ok, [PreparedStatements.handled_pkt()] | binary, map, binary}
          | {:error, :max_prepared_statements}
          | {:error, :max_prepared_statements_memory}
          | {:error, :prepared_statement_on_simple_query}
          | {:error, :duplicate_prepared_statement, PreparedStatements.statement_name()}
          | {:error, :prepared_statement_not_found, PreparedStatements.statement_name()}
  defp handle_client_pkts(bin, %{mode: :transaction} = data) do
    {pkts, rest} = Protocol.split_pkts(bin)

    result =
      Enum.reduce_while(pkts, {:ok, data.prepared_statements, []}, fn pkt,
                                                                      {:ok, stmts, handled} ->
        case PreparedStatements.handle_pkt(stmts, pkt) do
          {:ok, stmts, pkt} -> {:cont, {:ok, stmts, [pkt | handled]}}
          error -> {:halt, error}
        end
      end)

    case result do
      # packets were accumulated by prepending, restore the original order
      {:ok, stmts, handled} -> {:ok, Enum.reverse(handled), stmts, rest}
      error -> error
    end
  end

  defp handle_client_pkts(bin, data), do: {:ok, bin, data.prepared_statements, <<>>}
2,233✔
1194

1195
  # Builds the timeout actions returned alongside a state change: the idle
  # timeout first, then the heartbeat check, each included only when its
  # interval is greater than zero.
  #
  # NOTE(review): both entries use the `{:timeout, time, content}` form. In
  # gen_statem that looks like the single event timeout, so when both are set
  # the later one may replace the earlier — confirm against the event handlers.
  @spec handle_actions(map) :: [{:timeout, non_neg_integer, atom}]
  defp handle_actions(%{} = data) do
    heartbeat_interval = data.heartbeat_interval
    idle_timeout = data.idle_timeout

    for {interval, event} <- [
          {idle_timeout, :idle_timeout},
          {heartbeat_interval, :heartbeat_check}
        ],
        interval > 0 do
      {:timeout, interval, event}
    end
  end
1206

1207
  @doc """
  Normalizes the application name reported by a client.

  Binaries are returned unchanged. `nil` and any other value fall back to
  `"Supavisor"`; values that are neither a binary nor `nil` are also logged
  at debug level.
  """
  @spec app_name(any()) :: String.t()
  def app_name(name) do
    case name do
      binary when is_binary(binary) ->
        binary

      nil ->
        "Supavisor"

      invalid ->
        Logger.debug("ClientHandler: Invalid application name #{inspect(invalid)}")
        "Supavisor"
    end
  end
1216

1217
  @doc """
  Applies the log level requested in the `"log_level"` connection option.

  When the payload options carry one of `"debug"`, `"info"`, `"notice"`,
  `"warning"` or `"error"`, the level is set through `Helpers.set_log_level/1`
  and returned as an atom. Any other value (missing, unknown or not a string)
  returns `nil`. Input without a `"payload"`/`"options"` structure returns
  `:ok`.

  The option is supplied by the client, so it is compared against the list of
  accepted names before being converted to an atom. An unexpected value
  therefore yields `nil` instead of raising from `String.to_existing_atom/1`.
  """
  @spec maybe_change_log(map()) :: atom() | nil
  def maybe_change_log(%{"payload" => %{"options" => options}}) do
    case options["log_level"] do
      name when name in ["debug", "info", "notice", "warning", "error"] ->
        level = String.to_existing_atom(name)
        Helpers.set_log_level(level)
        level

      _ ->
        nil
    end
  end

  def maybe_change_log(_), do: :ok
280✔
1228

1229
  # Dispatches on the payload shape: a binary is sent as is, a list is treated
  # as handled packets and sent chunk by chunk.
  defp sock_send_maybe_active_once(payload, data) when is_binary(payload) or is_list(payload) do
    if is_binary(payload) do
      sock_send_binary_maybe_active_once(payload, data)
    else
      sock_send_pkts_maybe_active_once(payload, data)
    end
  end
1236

1237
  # Sends a binary to the database socket (third element of `data.db_pid`).
  # Once the active count exceeds the configured switch threshold, the client
  # socket is first re-armed with `HandlerHelpers.active_once/1`.
  @spec sock_send_binary_maybe_active_once(binary(), map()) :: :ok | {:error, term()}
  defp sock_send_binary_maybe_active_once(bin, data) when is_binary(bin) do
    Logger.debug("ClientHandler: Send maybe active once")

    if data.active_count > @switch_active_count do
      Logger.debug("ClientHandler: Activate socket #{inspect(data.active_count)}")
      HandlerHelpers.active_once(data.sock)
    end

    data.db_pid
    |> elem(2)
    |> HandlerHelpers.sock_send(bin)
  end
1249

1250
  # Chunking to ensure we send bigger packets
  #
  # Consecutive packets of the same kind are grouped: runs of tuples go to the
  # db handler through `handle_prepared_statement_pkts/2`, runs of anything
  # else are written to the database socket in one send. Sending stops at the
  # first result that is not `:ok`, and that result is returned.
  @spec sock_send_pkts_maybe_active_once([PreparedStatements.handled_pkt()], map()) ::
          :ok | {:error, term()}
  defp sock_send_pkts_maybe_active_once(pkts, data) do
    {_pool, db_handler, db_sock} = data.db_pid

    if data.active_count > @switch_active_count do
      Logger.debug("ClientHandler: Activate socket #{inspect(data.active_count)}")
      HandlerHelpers.active_once(data.sock)
    end

    pkts
    |> Enum.chunk_by(&is_tuple/1)
    |> Enum.reduce_while(:ok, fn chunk, _acc ->
      # chunks are never empty and homogeneous, so the head identifies the kind
      sent =
        if is_tuple(hd(chunk)) do
          Supavisor.DbHandler.handle_prepared_statement_pkts(db_handler, chunk)
        else
          HandlerHelpers.sock_send(db_sock, chunk)
        end

      if sent == :ok, do: {:cont, :ok}, else: {:halt, sent}
    end)
  end
1278

1279
  @doc """
  Schedules another subscribe attempt or gives up.

  While the retry counter is below the configured number of subscribe
  retries, the counter is incremented and a `:subscribe` timeout is
  scheduled. Otherwise the handler stops with
  `{:shutdown, :subscribe_retries}`.
  """
  @spec timeout_subscribe_or_terminate(map()) :: :gen_statem.handle_event_result()
  def timeout_subscribe_or_terminate(%{subscribe_retries: subscribe_retries} = data)
      when subscribe_retries < @subscribe_retries do
    Logger.warning("ClientHandler: Retry subscribe #{inspect(subscribe_retries)}")

    retry = {:timeout, @timeout_subscribe, :subscribe}
    {:keep_state, %{data | subscribe_retries: subscribe_retries + 1}, retry}
  end

  def timeout_subscribe_or_terminate(%{subscribe_retries: _exhausted}) do
    Logger.error("ClientHandler: Terminate after retries")
    {:stop, {:shutdown, :subscribe_retries}}
  end
1291

1292
  @doc """
  Re-activates the client socket through `HandlerHelpers.activate/1` and
  returns `0`, the value callers store as the new active count.
  """
  @spec reset_active_count(map()) :: 0
  def reset_active_count(data) do
    _ = HandlerHelpers.activate(data.sock)
    0
  end
1297

1298
  # Reports a packet handling error to the client and stops the handler with
  # `{:shutdown, reason}`, where the reason is the second element of the error
  # tuple. For a prepared statement sent through a simple query, a
  # ready_for_query is sent together with the error message.
  defp handle_error(sock, error) do
    message = error_message(error)

    payload =
      if error == {:error, :prepared_statement_on_simple_query},
        do: [message, Server.ready_for_query()],
        else: message

    HandlerHelpers.sock_send(sock, payload)

    {:stop, {:shutdown, elem(error, 1)}}
  end
1314

1315
  # Translates a packet handling error into the error message sent to the
  # client. Limit errors and the simple-query case use code "XX000"; a missing
  # statement uses "26000" and a duplicate one "42P05".
  defp error_message({:error, :max_prepared_statements}) do
    Server.error_message(
      "XX000",
      "max prepared statements limit reached. Limit: #{PreparedStatements.client_limit()} per connection"
    )
  end

  defp error_message({:error, :max_prepared_statements_memory}) do
    # the limit is kept in bytes and reported in megabytes
    limit_mb = PreparedStatements.client_memory_limit_bytes() / 1_000_000

    Server.error_message(
      "XX000",
      "max prepared statements memory limit reached. Limit: #{limit_mb}MB per connection"
    )
  end

  defp error_message({:error, :prepared_statement_not_found, name}) do
    Server.error_message("26000", "prepared statement #{inspect(name)} does not exist")
  end

  defp error_message({:error, :duplicate_prepared_statement, name}) do
    text = "prepared statement #{inspect(name)} already exists"
    Server.error_message("42P05", text)
  end

  defp error_message({:error, :prepared_statement_on_simple_query}) do
    Server.error_message(
      "XX000",
      "Supavisor transaction mode only supports prepared statements using the Extended Query Protocol"
    )
  end
1346
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc