• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 1ade961f479f27895cfee27bf5fa17f58a8aa15f-PR-1403

05 Jun 2025 02:12PM UTC coverage: 83.763% (+0.7%) from 83.027%
1ade961f479f27895cfee27bf5fa17f58a8aa15f-PR-1403

Pull #1403

github

filipecabaco
fix tests
Pull Request #1403: fix: handle tuple errors

5 of 9 new or added lines in 2 files covered. (55.56%)

7 existing lines in 3 files now uncovered.

1821 of 2174 relevant lines covered (83.76%)

4367.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.64
/lib/realtime/tenants/replication_connection.ex
1
defmodule Realtime.Tenants.ReplicationConnection do
2
  @moduledoc """
3
  ReplicationConnection it's the module that provides a way to stream data from a PostgreSQL database using logical replication.
4

5
  ## Struct parameters
6
  * `connection_opts` - The connection options to connect to the database.
7
  * `table` - The table to replicate. If `:all` is passed, it will replicate all tables.
8
  * `schema` - The schema of the table to replicate. If not provided, it will use the `public` schema. If `:all` is passed, this option is ignored.
9
  * `opts` - The options to pass to this module
10
  * `step` - The current step of the replication process
11
  * `publication_name` - The name of the publication to create. If not provided, it will use the schema and table name.
12
  * `replication_slot_name` - The name of the replication slot to create. If not provided, it will use the schema and table name.
13
  * `output_plugin` - The output plugin to use. Default is `pgoutput`.
14
  * `proto_version` - The protocol version to use. Default is `1`.
15
  * `handler_module` - The module that will handle the data received from the replication stream.
16
  * `metadata` - The metadata to pass to the handler module.
17

18
  """
19
  use Postgrex.ReplicationConnection
20
  use Realtime.Logs
21

22
  import Realtime.Adapters.Postgres.Protocol
23
  import Realtime.Adapters.Postgres.Decoder
24

25
  alias Realtime.Adapters.Postgres.Decoder
26
  alias Realtime.Adapters.Postgres.Protocol.KeepAlive
27
  alias Realtime.Adapters.Postgres.Protocol.Write
28
  alias Realtime.Api.Tenant
29
  alias Realtime.Database
30
  alias Realtime.Tenants.BatchBroadcast
31
  alias Realtime.Tenants.Cache
32

33
  @type t :: %__MODULE__{
34
          tenant_id: String.t(),
35
          table: String.t(),
36
          schema: String.t(),
37
          opts: Keyword.t(),
38
          step:
39
            :disconnected
40
            | :check_replication_slot
41
            | :create_publication
42
            | :check_publication
43
            | :create_slot
44
            | :start_replication_slot
45
            | :streaming,
46
          publication_name: String.t(),
47
          replication_slot_name: String.t(),
48
          output_plugin: String.t(),
49
          proto_version: integer(),
50
          relations: map(),
51
          buffer: list(),
52
          monitored_pid: pid()
53
        }
54
  defstruct tenant_id: nil,
55
            table: nil,
56
            schema: "public",
57
            opts: [],
58
            step: :disconnected,
59
            publication_name: nil,
60
            replication_slot_name: nil,
61
            output_plugin: "pgoutput",
62
            proto_version: 1,
63
            relations: %{},
64
            buffer: [],
65
            monitored_pid: nil
66

67
  @doc """
68
  Starts the replication connection for a tenant and monitors a given pid to stop the ReplicationConnection.
69
  """
70
  @spec start(Realtime.Api.Tenant.t(), pid()) :: {:ok, pid()} | {:error, any()}
71
  def start(tenant, monitored_pid) do
72
    Logger.info("Starting replication for Broadcast Changes")
211✔
73
    opts = %__MODULE__{tenant_id: tenant.external_id, monitored_pid: monitored_pid}
211✔
74
    supervisor_spec = supervisor_spec(tenant)
211✔
75

76
    child_spec = %{
211✔
77
      id: __MODULE__,
78
      start: {__MODULE__, :start_link, [opts]},
79
      restart: :transient,
80
      type: :worker
81
    }
82

83
    case DynamicSupervisor.start_child(supervisor_spec, child_spec) do
211✔
84
      {:ok, pid} -> {:ok, pid}
166✔
85
      {:error, {:already_started, pid}} -> {:ok, pid}
×
86
      {:error, {:bad_return_from_init, {:stop, error, _}}} -> {:error, error}
×
87
      {:error, %Postgrex.Error{postgres: %{pg_code: "53300"}}} -> {:error, :max_wal_senders_reached}
4✔
88
      error -> error
41✔
89
    end
90
  end
91

92
  @doc """
93
  Finds replication connection by tenant_id
94
  """
95
  @spec whereis(String.t()) :: pid() | nil
96
  def whereis(tenant_id) do
97
    case Registry.lookup(Realtime.Registry.Unique, {__MODULE__, tenant_id}) do
16✔
98
      [{pid, _}] -> pid
14✔
99
      [] -> nil
2✔
100
    end
101
  end
102

103
  def start_link(%__MODULE__{tenant_id: tenant_id} = attrs) do
104
    tenant = Cache.get_tenant_by_external_id(tenant_id)
315✔
105
    connection_opts = Database.from_tenant(tenant, "realtime_broadcast_changes", :stop)
313✔
106

107
    connection_opts =
302✔
108
      [
109
        name: {:via, Registry, {Realtime.Registry.Unique, {__MODULE__, tenant_id}}},
110
        hostname: connection_opts.hostname,
302✔
111
        username: connection_opts.username,
302✔
112
        password: connection_opts.password,
302✔
113
        database: connection_opts.database,
302✔
114
        port: connection_opts.port,
302✔
115
        socket_options: connection_opts.socket_options,
302✔
116
        ssl: connection_opts.ssl,
302✔
117
        backoff_type: :stop,
118
        sync_connect: true,
119
        parameters: [
120
          application_name: "realtime_replication_connection"
121
        ]
122
      ]
123

124
    case Postgrex.ReplicationConnection.start_link(__MODULE__, attrs, connection_opts) do
302✔
125
      {:ok, pid} -> {:ok, pid}
261✔
UNCOV
126
      {:error, {:already_started, pid}} -> {:ok, pid}
×
127
      {:error, {:bad_return_from_init, {:stop, error}}} -> {:error, error}
×
128
      {:error, error} -> {:error, error}
41✔
129
    end
130
  end
131

132
  @impl true
133
  def init(%__MODULE__{tenant_id: tenant_id, monitored_pid: monitored_pid} = state) do
134
    Logger.metadata(external_id: tenant_id, project: tenant_id)
302✔
135
    Process.monitor(monitored_pid)
302✔
136
    state = %{state | table: "messages", schema: "realtime"}
302✔
137

138
    state = %{
302✔
139
      state
140
      | publication_name: publication_name(state),
141
        replication_slot_name: replication_slot_name(state)
142
    }
143

144
    Logger.info("Initializing connection with the status: #{inspect(state, pretty: true)}")
302✔
145

146
    {:ok, state}
147
  end
148

149
  @impl true
150
  def handle_connect(state) do
151
    replication_slot_name = replication_slot_name(state)
296✔
152
    Logger.info("Checking if replication slot #{replication_slot_name} exists")
296✔
153

154
    query =
296✔
155
      "SELECT * FROM pg_replication_slots WHERE slot_name = '#{replication_slot_name}'"
296✔
156

157
    {:query, query, %{state | step: :check_replication_slot}}
296✔
158
  end
159

160
  @impl true
161
  def handle_result([%Postgrex.Result{num_rows: 1}], %__MODULE__{step: :check_replication_slot}) do
2✔
162
    {:disconnect, "Temporary Replication slot already exists and in use"}
163
  end
164

165
  def handle_result(
166
        [%Postgrex.Result{num_rows: 0}],
167
        %__MODULE__{step: :check_replication_slot} = state
168
      ) do
169
    %__MODULE__{
170
      output_plugin: output_plugin,
171
      replication_slot_name: replication_slot_name,
172
      step: :check_replication_slot
173
    } = state
291✔
174

175
    Logger.info("Create replication slot #{replication_slot_name} using plugin #{output_plugin}")
291✔
176

177
    query =
291✔
178
      "CREATE_REPLICATION_SLOT #{replication_slot_name} TEMPORARY LOGICAL #{output_plugin} NOEXPORT_SNAPSHOT"
291✔
179

180
    {:query, query, %{state | step: :check_publication}}
291✔
181
  end
182

183
  def handle_result([%Postgrex.Result{}], %__MODULE__{step: :check_publication} = state) do
184
    %__MODULE__{table: table, schema: schema, publication_name: publication_name} = state
276✔
185

186
    Logger.info("Check publication #{publication_name} for table #{schema}.#{table} exists")
276✔
187
    query = "SELECT * FROM pg_publication WHERE pubname = '#{publication_name}'"
276✔
188

189
    {:query, query, %{state | step: :create_publication}}
276✔
190
  end
191

192
  def handle_result(
193
        [%Postgrex.Result{num_rows: 0}],
194
        %__MODULE__{step: :create_publication} = state
195
      ) do
196
    %__MODULE__{table: table, schema: schema, publication_name: publication_name} = state
175✔
197

198
    Logger.info("Create publication #{publication_name} for table #{schema}.#{table}")
175✔
199
    query = "CREATE PUBLICATION #{publication_name} FOR TABLE #{schema}.#{table}"
175✔
200

201
    {:query, query, %{state | step: :start_replication_slot}}
175✔
202
  end
203

204
  def handle_result(
205
        [%Postgrex.Result{num_rows: 1}],
206
        %__MODULE__{step: :create_publication} = state
207
      ) do
208
    {:query, "SELECT 1", %{state | step: :start_replication_slot}}
101✔
209
  end
210

211
  @impl true
212
  def handle_result(
213
        [%Postgrex.Result{}],
214
        %__MODULE__{step: :start_replication_slot} = state
215
      ) do
216
    %__MODULE__{
217
      proto_version: proto_version,
218
      replication_slot_name: replication_slot_name,
219
      publication_name: publication_name
220
    } = state
264✔
221

222
    Logger.info(
264✔
223
      "Starting stream replication for slot #{replication_slot_name} using publication #{publication_name} and protocol version #{proto_version}"
×
224
    )
225

226
    query =
264✔
227
      "START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (proto_version '#{proto_version}', publication_names '#{publication_name}')"
264✔
228

229
    {:stream, query, [], %{state | step: :streaming}}
264✔
230
  end
231

232
  def handle_result(%Postgrex.Error{postgres: %{message: message}}, _state) do
×
233
    {:disconnect, "Error starting replication: #{message}"}
×
234
  end
235

236
  @impl true
237
  def handle_data(data, state) when is_keep_alive(data) do
238
    %KeepAlive{reply: reply, wal_end: wal_end} = parse(data)
1,175✔
239
    wal_end = wal_end + 1
1,175✔
240

241
    message =
1,175✔
242
      case reply do
243
        :now -> standby_status(wal_end, wal_end, wal_end, reply)
10✔
244
        :later -> hold()
1,165✔
245
      end
246

247
    {:noreply, message, state}
1,175✔
248
  end
249

250
  def handle_data(data, state) when is_write(data) do
251
    %Write{message: message} = parse(data)
228✔
252
    message |> decode_message() |> then(&send(self(), &1))
228✔
253
    {:noreply, [], state}
228✔
254
  end
255

256
  def handle_data(e, state) do
257
    log_error("UnexpectedMessageReceived", e)
×
258
    {:noreply, [], state}
×
259
  end
260

261
  @impl true
262
  def handle_info(%Decoder.Messages.Relation{} = msg, state) do
22✔
263
    %Decoder.Messages.Relation{id: id, namespace: namespace, name: name, columns: columns} = msg
22✔
264
    %{relations: relations} = state
22✔
265
    relation = %{name: name, columns: columns, namespace: namespace}
22✔
266
    relations = Map.put(relations, id, relation)
22✔
267
    {:noreply, %{state | relations: relations}}
268
  rescue
269
    e ->
×
270
      log_error("UnableToBroadcastChanges", e)
×
271
      {:noreply, state}
272
  catch
273
    e ->
274
      log_error("UnableToBroadcastChanges", e)
×
275
      {:noreply, state}
276
  end
277

278
  def handle_info(%Decoder.Messages.Insert{} = msg, state) do
74✔
279
    %Decoder.Messages.Insert{relation_id: relation_id, tuple_data: tuple_data} = msg
74✔
280
    %{relations: relations, tenant_id: tenant_id} = state
74✔
281

282
    case Map.get(relations, relation_id) do
74✔
283
      %{columns: columns} ->
284
        to_broadcast =
74✔
285
          tuple_data
286
          |> Tuple.to_list()
287
          |> Enum.zip(columns)
288
          |> Map.new(fn
289
            {nil, %{name: name}} -> {name, nil}
72✔
290
            {value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
38✔
291
            {value, %{name: name, type: "bool"}} -> {name, value == "t"}
74✔
292
            {value, %{name: name}} -> {name, value}
408✔
293
          end)
294

295
        payload = Map.get(to_broadcast, "payload")
74✔
296

297
        case payload do
74✔
298
          nil ->
36✔
299
            {:noreply, state}
300

301
          payload ->
302
            id = Map.fetch!(to_broadcast, "id")
38✔
303

304
            to_broadcast =
38✔
305
              %{
306
                topic: Map.fetch!(to_broadcast, "topic"),
307
                event: Map.fetch!(to_broadcast, "event"),
308
                private: Map.fetch!(to_broadcast, "private"),
309
                # Avoid overriding user provided id
310
                payload: Map.put_new(payload, "id", id)
311
              }
312

313
            %Tenant{} = tenant = Cache.get_tenant_by_external_id(tenant_id)
38✔
314

315
            case BatchBroadcast.broadcast(nil, tenant, %{messages: [to_broadcast]}, true) do
38✔
316
              :ok -> :ok
38✔
317
              error -> log_error("UnableToBatchBroadcastChanges", error)
×
318
            end
319

320
            {:noreply, state}
321
        end
322

323
      _ ->
324
        log_error("UnknownBroadcastChangesRelation", "Relation ID not found: #{relation_id}")
×
325
        {:noreply, state}
326
    end
327
  rescue
328
    e ->
×
329
      log_error("UnableToBroadcastChanges", e)
×
330
      {:noreply, state}
331
  catch
332
    e ->
333
      log_error("UnableToBroadcastChanges", e)
×
334
      {:noreply, state}
335
  end
336

337
  def handle_info(:shutdown, _), do: {:disconnect, :normal}
×
338
  def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal}
160✔
339
  def handle_info(_, state), do: {:noreply, state}
140✔
340

341
  @impl true
342
  def handle_disconnect(state) do
343
    Logger.warning("Disconnecting broadcast changes handler in the step : #{inspect(state.step)}")
287✔
344
    {:noreply, %{state | step: :disconnected}}
345
  end
346

347
  @spec supervisor_spec(Tenant.t()) :: term()
348
  def supervisor_spec(%Tenant{external_id: tenant_id}) do
349
    {:via, PartitionSupervisor, {__MODULE__.DynamicSupervisor, tenant_id}}
211✔
350
  end
351

352
  def publication_name(%__MODULE__{table: table, schema: schema}) do
353
    "supabase_#{schema}_#{table}_publication"
302✔
354
  end
355

356
  def replication_slot_name(%__MODULE__{table: table, schema: schema}) do
357
    "supabase_#{schema}_#{table}_replication_slot_#{slot_suffix()}"
598✔
358
  end
359

360
  defp slot_suffix, do: Application.get_env(:realtime, :slot_name_suffix)
598✔
361
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc