• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / b4dd368c22b9bd0976b45bcdc78267068366369a-PR-1415

10 Jun 2025 08:56AM UTC coverage: 84.384% (+0.3%) from 84.059%
b4dd368c22b9bd0976b45bcdc78267068366369a-PR-1415

Pull #1415

github

edgurgel
chore: bump version
Pull Request #1415: Fix unexpected errors returned on realtime channel

1821 of 2158 relevant lines covered (84.38%)

2441.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.23
/lib/extensions/postgres_cdc_rls/replications.ex
1
defmodule Extensions.PostgresCdcRls.Replications do
2
  @moduledoc """
3
  SQL queries that use PostgresCdcRls.ReplicationPoller to create a temporary slot and poll the write-ahead log.
4
  """
5

6
  import Postgrex, only: [query: 3]
7

8
  @doc """
  Ensures a logical replication slot named `slot_name` exists.

  Runs a single statement on `conn` that looks `slot_name` up in
  `pg_replication_slots`. When no such slot exists it calls
  `pg_create_logical_replication_slot/3` with the `wal2json` output plugin and
  `'true'` as the third argument; otherwise it just selects `1`.

  The result of the query is returned unchanged.
  """
  @spec prepare_replication(pid(), String.t()) ::
          {:ok, Postgrex.Result.t()} | {:error, Postgrex.Error.t()}
  def prepare_replication(conn, slot_name) do
    # `$1` is used both for the existence check and for the slot creation, so
    # the slot name is the only bind parameter.
    ensure_slot_sql =
      "select
        case when not exists (
          select 1
          from pg_replication_slots
          where slot_name = $1
        )
        then (
          select 1 from pg_create_logical_replication_slot($1, 'wal2json', 'true')
        )
        else 1
        end;"

    query(conn, ensure_slot_sql, [slot_name])
  end
27

28
  @doc """
  Terminates the backend process attached to the replication slot `slot_name`.

  Reads `active_pid` for the slot from `pg_replication_slots` and passes it to
  `pg_terminate_backend/1`.

  Returns:

    * `{:ok, :terminated}` when the lookup yields one row and the
      `pg_terminate_backend` query itself succeeds (its boolean result is not
      inspected)
    * `{:error, :slot_not_found}` when the lookup yields no rows
    * `{:error, error}` when either query fails
  """
  @spec terminate_backend(pid(), String.t()) ::
          {:ok, :terminated} | {:error, :slot_not_found | Postgrex.Error.t()}
  def terminate_backend(conn, slot_name) do
    lookup_sql = "select active_pid from pg_replication_slots where slot_name = $1"

    case query(conn, lookup_sql, [slot_name]) do
      # NOTE(review): a slot row whose active_pid is NULL also matches here, with
      # `backend_pid` bound to nil — confirm that calling pg_terminate_backend
      # with NULL and reporting :terminated is the intended outcome.
      {:ok, %Postgrex.Result{rows: [[backend_pid]]}} ->
        # An `{:error, _}` from the second query falls through `with` untouched.
        with {:ok, _result} <- query(conn, "select pg_terminate_backend($1)", [backend_pid]) do
          {:ok, :terminated}
        end

      {:ok, %Postgrex.Result{num_rows: 0}} ->
        {:error, :slot_not_found}

      {:error, _reason} = failure ->
        failure
    end
  end
48

49
  @doc """
  Returns how many seconds have passed since the last `state_change` of the
  backend `db_pid`.

  Only rows of `pg_stat_activity` whose `application_name` is `'realtime_rls'`
  and whose `pid` equals `db_pid` are considered. The difference
  `now() - state_change` is converted to epoch seconds and cast to `int`.

  Returns `{:ok, diff}` for a single-row result and `{:error, error}` when the
  query fails. Any other successful result (for example zero rows, when no
  matching backend is listed) is not matched and raises `CaseClauseError`.
  """
  @spec get_pg_stat_activity_diff(pid(), integer()) ::
          {:ok, integer()} | {:error, Postgrex.Error.t()}
  def get_pg_stat_activity_diff(conn, db_pid) do
    elapsed_sql =
      "select
         extract(
          epoch from (now() - state_change)
         )::int as diff
         from pg_stat_activity where application_name = 'realtime_rls' and pid = $1"

    case query(conn, elapsed_sql, [db_pid]) do
      {:ok, %{rows: [[seconds]]}} -> {:ok, seconds}
      {:error, _reason} = failure -> failure
    end
  end
71

72
  @doc """
  Calls the `realtime.list_changes` SQL function on `conn` and returns the
  query result unchanged.

  The bind parameters are passed in the order `publication`, `slot_name`,
  `max_changes`, `max_record_bytes`. Note that `publication` is bound to `$1`
  and `slot_name` to `$2`, which is the reverse of this function's own
  parameter order.
  """
  def list_changes(conn, slot_name, publication, max_changes, max_record_bytes) do
    bind_params = [publication, slot_name, max_changes, max_record_bytes]

    query(conn, "select * from realtime.list_changes($1, $2, $3, $4)", bind_params)
  end
84
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc