• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / f1f00ddb770b1eff4e47a0957ac3cb0ce13ea930-PR-1574

14 Oct 2025 11:17PM UTC coverage: 85.624% (-0.3%) from 85.908%
f1f00ddb770b1eff4e47a0957ac3cb0ce13ea930-PR-1574

Pull #1574

github

filipecabaco
move test out of integration testing
Pull Request #1574: fix: validate size of track message

8 of 9 new or added lines in 2 files covered. (88.89%)

7 existing lines in 2 files now uncovered.

2156 of 2518 relevant lines covered (85.62%)

2670.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.17
/lib/realtime/tenants/replication_connection.ex
1
defmodule Realtime.Tenants.ReplicationConnection do
  @moduledoc """
  Streams inserts on a tenant's `realtime.messages` table over PostgreSQL logical replication
  and re-broadcasts every inserted row as a Realtime Broadcast message.

  After connecting, the process checks that its temporary replication slot does not already
  exist, creates it, makes sure the publication for `realtime.messages` exists (creating it
  when missing) and then starts streaming. The connection disconnects when the monitored
  process goes down.

  ## Struct fields

  * `tenant_id` - The external id of the tenant whose database is replicated.
  * `opts` - Extra options. Not read by this module at the moment.
  * `step` - The current step of the replication handshake, i.e. the step whose query result
    is expected next. Set to `:disconnected` when the connection drops.
  * `publication_name` - The name of the publication to stream from. Set in `init/1`.
  * `replication_slot_name` - The name of the temporary replication slot. Set in `init/1`.
  * `output_plugin` - The logical decoding output plugin. Default is `"pgoutput"`.
  * `proto_version` - The protocol version passed to the output plugin. Default is `1`.
  * `relations` - Relation metadata (name, namespace, columns) received from the stream,
    keyed by relation id.
  * `buffer` - Not read by this module at the moment.
  * `monitored_pid` - The process whose termination makes this connection disconnect.
  * `latency_committed_at` - Milliseconds between the commit timestamp of the transaction
    currently being streamed and the moment its `Begin` message was handled.
  """
  use Postgrex.ReplicationConnection
  use Realtime.Logs

  import Realtime.Adapters.Postgres.Protocol
  import Realtime.Adapters.Postgres.Decoder

  alias Realtime.Adapters.Postgres.Decoder
  alias Realtime.Adapters.Postgres.Protocol.KeepAlive
  alias Realtime.Adapters.Postgres.Protocol.Write
  alias Realtime.Api.Tenant
  alias Realtime.Database
  alias Realtime.Telemetry
  alias Realtime.Tenants.BatchBroadcast
  alias Realtime.Tenants.Cache

  @type t :: %__MODULE__{
          tenant_id: String.t() | nil,
          opts: Keyword.t(),
          step:
            :disconnected
            | :check_replication_slot
            | :create_publication
            | :check_publication
            | :create_slot
            | :start_replication_slot
            | :streaming,
          publication_name: String.t() | nil,
          replication_slot_name: String.t() | nil,
          output_plugin: String.t(),
          proto_version: integer(),
          relations: map(),
          buffer: list(),
          monitored_pid: pid() | nil,
          latency_committed_at: integer() | nil
        }
  defstruct tenant_id: nil,
            opts: [],
            step: :disconnected,
            publication_name: nil,
            replication_slot_name: nil,
            output_plugin: "pgoutput",
            proto_version: 1,
            relations: %{},
            buffer: [],
            monitored_pid: nil,
            latency_committed_at: nil

  defmodule Wrapper do
    @moduledoc """
    Starts the replication connection from its own `init/1` so that the `:timeout` option of
    `GenServer.start_link/3` bounds how long the replication connection may take to start.

    The wrapper's state is the pid of the started replication connection.
    """
    use GenServer

    def start_link(args, init_timeout) do
      GenServer.start_link(__MODULE__, args, timeout: init_timeout)
    end

    @impl true
    def init(args) do
      case Realtime.Tenants.ReplicationConnection.start_link(args) do
        {:ok, pid} -> {:ok, pid}
        {:error, reason} -> {:stop, reason}
      end
    end
  end

  @default_init_timeout 30_000
  @table "messages"
  @schema "realtime"

  @doc """
  Starts the replication connection for a tenant and monitors `monitored_pid`; when that
  process goes down the replication connection disconnects.

  The connection is started (as a `:temporary` child, through `Wrapper`) under the tenant's
  partition of `Realtime.Tenants.ReplicationConnection.DynamicSupervisor`.

  `init_timeout` is the number of milliseconds the wrapper waits for the connection to start
  (default `30_000`).

  Returns `{:ok, pid}` (also when it is already running),
  `{:error, :max_wal_senders_reached}` when Postgres answers with error code `53300` or
  `53400`, and any other result from the supervisor as-is.
  """
  @spec start(Tenant.t(), pid()) :: {:ok, pid()} | {:error, any()}
  @spec start(Tenant.t(), pid(), timeout()) :: {:ok, pid()} | {:error, any()}
  def start(tenant, monitored_pid, init_timeout \\ @default_init_timeout) do
    Logger.info("Starting replication for Broadcast Changes")
    opts = %__MODULE__{tenant_id: tenant.external_id, monitored_pid: monitored_pid}
    supervisor_spec = supervisor_spec(tenant)

    child_spec = %{
      id: __MODULE__,
      start: {Wrapper, :start_link, [opts, init_timeout]},
      restart: :temporary,
      type: :worker
    }

    case DynamicSupervisor.start_child(supervisor_spec, child_spec) do
      {:ok, pid} ->
        {:ok, pid}

      {:error, {:already_started, pid}} ->
        {:ok, pid}

      {:error, {:bad_return_from_init, {:stop, error, _}}} ->
        {:error, error}

      # 53300 = too_many_connections, 53400 = configuration_limit_exceeded
      {:error, %Postgrex.Error{postgres: %{pg_code: pg_code}}} when pg_code in ~w(53300 53400) ->
        {:error, :max_wal_senders_reached}

      error ->
        error
    end
  end

  @doc """
  Finds replication connection by tenant_id
  """
  @spec whereis(String.t()) :: pid() | nil
  def whereis(tenant_id) do
    case Registry.lookup(Realtime.Registry.Unique, {__MODULE__, tenant_id}) do
      [{pid, _}] -> pid
      [] -> nil
    end
  end

  @doc """
  Starts the `Postgrex.ReplicationConnection` for `attrs.tenant_id`, registered in
  `Realtime.Registry.Unique` under `{#{inspect(__MODULE__)}, tenant_id}`.

  Connection settings come from the cached tenant. The connection connects synchronously
  and does not reconnect automatically.
  """
  @spec start_link(t()) :: {:ok, pid()} | {:error, any()}
  def start_link(%__MODULE__{tenant_id: tenant_id} = attrs) do
    tenant = Cache.get_tenant_by_external_id(tenant_id)
    connection_opts = Database.from_tenant(tenant, "realtime_broadcast_changes", :stop)

    connection_opts =
      [
        name: {:via, Registry, {Realtime.Registry.Unique, {__MODULE__, tenant_id}}},
        hostname: connection_opts.hostname,
        username: connection_opts.username,
        password: connection_opts.password,
        database: connection_opts.database,
        port: connection_opts.port,
        socket_options: connection_opts.socket_options,
        ssl: connection_opts.ssl,
        sync_connect: true,
        auto_reconnect: false,
        parameters: [application_name: "realtime_replication_connection"]
      ]

    case Postgrex.ReplicationConnection.start_link(__MODULE__, attrs, connection_opts) do
      {:ok, pid} -> {:ok, pid}
      {:error, {:already_started, pid}} -> {:ok, pid}
      {:error, {:bad_return_from_init, {:stop, error}}} -> {:error, error}
      {:error, error} -> {:error, error}
    end
  end

  @impl true
  def init(%__MODULE__{tenant_id: tenant_id, monitored_pid: monitored_pid} = state) do
    Logger.metadata(external_id: tenant_id, project: tenant_id)
    # The {:DOWN, ...} message from this monitor triggers a disconnect in handle_info/2
    Process.monitor(monitored_pid)

    state = %{
      state
      | publication_name: publication_name(@schema, @table),
        replication_slot_name: replication_slot_name(@schema, @table)
    }

    Logger.info("Initializing connection with the status: #{inspect(state, pretty: true)}")

    {:ok, state}
  end

  # Handshake entry point: first make sure the temporary slot is not already in use.
  @impl true
  def handle_connect(%__MODULE__{replication_slot_name: replication_slot_name} = state) do
    Logger.info("Checking if replication slot #{replication_slot_name} exists")

    query = "SELECT * FROM pg_replication_slots WHERE slot_name = '#{replication_slot_name}'"

    {:query, query, %{state | step: :check_replication_slot}}
  end

  # Handshake: each clause handles the result of the query issued by the previous step and
  # issues the next query, advancing `step` to the step whose result comes next.
  @impl true
  def handle_result([%Postgrex.Result{num_rows: 1}], %__MODULE__{step: :check_replication_slot}) do
    {:disconnect, {:shutdown, "Temporary Replication slot already exists and in use"}}
  end

  def handle_result([%Postgrex.Result{num_rows: 0}], %__MODULE__{step: :check_replication_slot} = state) do
    %__MODULE__{
      output_plugin: output_plugin,
      replication_slot_name: replication_slot_name,
      step: :check_replication_slot
    } = state

    Logger.info("Create replication slot #{replication_slot_name} using plugin #{output_plugin}")

    query = "CREATE_REPLICATION_SLOT #{replication_slot_name} TEMPORARY LOGICAL #{output_plugin} NOEXPORT_SNAPSHOT"

    {:query, query, %{state | step: :check_publication}}
  end

  def handle_result([%Postgrex.Result{}], %__MODULE__{step: :check_publication} = state) do
    %__MODULE__{publication_name: publication_name} = state

    Logger.info("Check publication #{publication_name} for table #{@schema}.#{@table} exists")
    query = "SELECT * FROM pg_publication WHERE pubname = '#{publication_name}'"

    {:query, query, %{state | step: :create_publication}}
  end

  def handle_result([%Postgrex.Result{num_rows: 0}], %__MODULE__{step: :create_publication} = state) do
    %__MODULE__{publication_name: publication_name} = state

    Logger.info("Create publication #{publication_name} for table #{@schema}.#{@table}")
    query = "CREATE PUBLICATION #{publication_name} FOR TABLE #{@schema}.#{@table}"

    {:query, query, %{state | step: :start_replication_slot}}
  end

  # The publication already exists: issue a no-op query so the next step still runs.
  def handle_result([%Postgrex.Result{num_rows: 1}], %__MODULE__{step: :create_publication} = state) do
    {:query, "SELECT 1", %{state | step: :start_replication_slot}}
  end

  def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do
    %__MODULE__{
      proto_version: proto_version,
      replication_slot_name: replication_slot_name,
      publication_name: publication_name
    } = state

    Logger.info(
      "Starting stream replication for slot #{replication_slot_name} using publication #{publication_name} and protocol version #{proto_version}"
    )

    query =
      "START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (proto_version '#{proto_version}', publication_names '#{publication_name}')"

    {:stream, query, [], %{state | step: :streaming}}
  end

  # %Postgrex.Error{message: nil, postgres: %{code: :configuration_limit_exceeded, line: "291", message: "all replication slots are in use", file: "slot.c", unknown: "ERROR", severity: "ERROR", hint: "Free one or increase max_replication_slots.", routine: "ReplicationSlotCreate", pg_code: "53400"}, connection_id: 217538, query: nil}
  def handle_result(%Postgrex.Error{postgres: %{pg_code: pg_code}}, _state) when pg_code in ~w(53300 53400) do
    {:disconnect, :max_wal_senders_reached}
  end

  def handle_result(%Postgrex.Error{postgres: %{message: message}}, _state) do
    {:disconnect, "Error starting replication: #{message}"}
  end

  # Client-side errors carry no `postgres` map (it is nil); without this clause they would
  # raise FunctionClauseError instead of disconnecting.
  def handle_result(%Postgrex.Error{} = error, _state) do
    {:disconnect, "Error starting replication: #{Exception.message(error)}"}
  end

  # Keepalives: answer with a standby status update only when the server asks for one now.
  @impl true
  def handle_data(data, state) when is_keep_alive(data) do
    %KeepAlive{reply: reply, wal_end: wal_end} = parse(data)
    wal_end = wal_end + 1

    message =
      case reply do
        :now -> standby_status(wal_end, wal_end, wal_end, reply)
        :later -> hold()
      end

    {:noreply, message, state}
  end

  # WAL data: decode it and process the decoded message asynchronously via handle_info/2.
  def handle_data(data, state) when is_write(data) do
    %Write{message: message} = parse(data)
    message |> decode_message() |> then(&send(self(), &1))
    {:noreply, [], state}
  end

  def handle_data(e, state) do
    log_error("UnexpectedMessageReceived", e)
    {:noreply, [], state}
  end

  @impl true
  def handle_info(%Decoder.Messages.Begin{commit_timestamp: commit_timestamp}, state) do
    latency_committed_at = NaiveDateTime.utc_now() |> NaiveDateTime.diff(commit_timestamp, :millisecond)
    {:noreply, %{state | latency_committed_at: latency_committed_at}}
  end

  # Remember relation metadata so later Insert messages can map tuple values to column names.
  def handle_info(%Decoder.Messages.Relation{} = msg, state) do
    %Decoder.Messages.Relation{id: id, namespace: namespace, name: name, columns: columns} = msg
    %{relations: relations} = state
    relation = %{name: name, columns: columns, namespace: namespace}
    relations = Map.put(relations, id, relation)
    {:noreply, %{state | relations: relations}}
  rescue
    e ->
      log_error("UnableToBroadcastChanges", e)
      {:noreply, state}
  catch
    e ->
      log_error("UnableToBroadcastChanges", e)
      {:noreply, state}
  end

  # Turn an inserted `realtime.messages` row into a broadcast and emit latency telemetry.
  # Failures are logged and the stream keeps going.
  def handle_info(%Decoder.Messages.Insert{} = msg, state) do
    %Decoder.Messages.Insert{relation_id: relation_id, tuple_data: tuple_data} = msg
    %{relations: relations, tenant_id: tenant_id, latency_committed_at: latency_committed_at} = state

    with %{columns: columns} <- Map.get(relations, relation_id),
         to_broadcast = tuple_to_map(tuple_data, columns),
         {:ok, payload} <- get_or_error(to_broadcast, "payload", :payload_missing),
         {:ok, inserted_at} <- get_or_error(to_broadcast, "inserted_at", :inserted_at_missing),
         {:ok, event} <- get_or_error(to_broadcast, "event", :event_missing),
         {:ok, id} <- get_or_error(to_broadcast, "id", :id_missing),
         {:ok, topic} <- get_or_error(to_broadcast, "topic", :topic_missing),
         {:ok, private} <- get_or_error(to_broadcast, "private", :private_missing),
         %Tenant{} = tenant <- Cache.get_tenant_by_external_id(tenant_id),
         broadcast_message = %{
           id: id,
           topic: topic,
           event: event,
           private: private,
           payload: Map.put_new(payload, "id", id)
         },
         :ok <- BatchBroadcast.broadcast(nil, tenant, %{messages: [broadcast_message]}, true) do
      inserted_at = NaiveDateTime.from_iso8601!(inserted_at)
      # NOTE(review): NaiveDateTime.diff/2 defaults to :second while latency_committed_at is
      # measured in :millisecond; confirm which unit telemetry consumers expect.
      latency_inserted_at = NaiveDateTime.utc_now() |> NaiveDateTime.diff(inserted_at)

      Telemetry.execute(
        [:realtime, :tenants, :broadcast_from_database],
        %{latency_committed_at: latency_committed_at, latency_inserted_at: latency_inserted_at},
        %{tenant: tenant_id}
      )

      {:noreply, state}
    else
      {:error, error} ->
        log_error("UnableToBroadcastChanges", error)
        {:noreply, state}

      other ->
        # Unknown relation id, tenant missing from the cache, or an unexpected broadcast result;
        # previously dropped silently, now logged so the lost message is visible.
        log_error("UnableToBroadcastChanges", {:unexpected_result, other})
        {:noreply, state}
    end
  rescue
    e ->
      log_error("UnableToBroadcastChanges", e)
      {:noreply, state}
  catch
    e ->
      log_error("UnableToBroadcastChanges", e)
      {:noreply, state}
  end

  # Only monitored_pid is monitored (see init/1), so any DOWN means it went away.
  def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :shutdown}
  def handle_info(_, state), do: {:noreply, state}

  @impl true
  def handle_disconnect(state) do
    Logger.warning("Disconnecting broadcast changes handler in the step : #{inspect(state.step)}")
    {:noreply, %{state | step: :disconnected}}
  end

  @doc """
  Returns the name of the tenant's partition of the replication connection supervisor.
  """
  @spec supervisor_spec(Tenant.t()) :: term()
  def supervisor_spec(%Tenant{external_id: tenant_id}) do
    {:via, PartitionSupervisor, {__MODULE__.DynamicSupervisor, tenant_id}}
  end

  @doc """
  Builds the publication name for `schema` and `table`.
  """
  @spec publication_name(String.t(), String.t()) :: String.t()
  def publication_name(schema, table) do
    "supabase_#{schema}_#{table}_publication"
  end

  @doc """
  Builds the replication slot name for `schema` and `table`, suffixed with the
  `:slot_name_suffix` application environment value of `:realtime`.
  """
  @spec replication_slot_name(String.t(), String.t()) :: String.t()
  def replication_slot_name(schema, table) do
    "supabase_#{schema}_#{table}_replication_slot_#{slot_suffix()}"
  end

  defp slot_suffix, do: Application.get_env(:realtime, :slot_name_suffix)

  # Zips tuple values with column metadata into a column-name => value map,
  # decoding jsonb values and mapping bool "t" to true.
  defp tuple_to_map(tuple_data, columns) do
    tuple_data
    |> Tuple.to_list()
    |> Enum.zip(columns)
    |> Map.new(fn
      {nil, %{name: name}} -> {name, nil}
      {value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
      {value, %{name: name, type: "bool"}} -> {name, value == "t"}
      {value, %{name: name}} -> {name, value}
    end)
  end

  # {:ok, value} when `key` holds a non-nil value, {:error, error_type} otherwise.
  defp get_or_error(map, key, error_type) do
    case Map.get(map, key) do
      nil -> {:error, error_type}
      value -> {:ok, value}
    end
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc