• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / realtime / 3d04303c3cf845547393948866fd484ab864bc82-PR-1261

14 Jan 2025 11:46AM UTC coverage: 74.988%. First build
3d04303c3cf845547393948866fd484ab864bc82-PR-1261

Pull #1261

github

filipecabaco
fix: Add test coverage to repo; delete cdc stream

* Attempt to send coverage to coveralls - https://coveralls.io/github/supabase/realtime
* Removes CDC Stream code
* Fix small compilation error with Gettext deprecation warnings
* Renamed Realtime.Tenants.BroadcastChanges.Handler to Realtime.Tenants.ReplicationConnection
* Reverses PID monitoring so Realtime.Tenants.ReplicationConnection and Realtime.Tenants.Listen are the ones tracking Realtime.Connect and die accordingly, reducing probability of process linking issues
* Fixed Realtime.Tenants.Connect.shutdown/1 that was killing the Postgrex connection instead of killing Realtime.Tenants.Connect potentially leading to issues in the state of the connection
Pull Request #1261: fix: Add test coverage to repo; delete cdc stream; improve ReplicationConnection lifecycle

54 of 61 new or added lines in 6 files covered. (88.52%)

1532 of 2043 relevant lines covered (74.99%)

304.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.64
/lib/realtime/tenants/replication_connection.ex
1
defmodule Realtime.Tenants.ReplicationConnection do
2
  @moduledoc """
3
  ReplicationConnection is the module that provides a way to stream data from a PostgreSQL database using logical replication.
4

5
  ## Struct parameters
6
  * `connection_opts` - The connection options to connect to the database.
7
  * `table` - The table to replicate. If `:all` is passed, it will replicate all tables.
8
  * `schema` - The schema of the table to replicate. If not provided, it will use the `public` schema. If `:all` is passed, this option is ignored.
9
  * `opts` - The options to pass to this module
10
  * `step` - The current step of the replication process
11
  * `publication_name` - The name of the publication to create. If not provided, it will use the schema and table name.
12
  * `replication_slot_name` - The name of the replication slot to create. If not provided, it will use the schema and table name.
13
  * `output_plugin` - The output plugin to use. Default is `pgoutput`.
14
  * `proto_version` - The protocol version to use. Default is `1`.
15
  * `handler_module` - The module that will handle the data received from the replication stream.
16
  * `metadata` - The metadata to pass to the handler module.
17

18
  """
19
  use Postgrex.ReplicationConnection
20
  require Logger
21

22
  import Realtime.Adapters.Postgres.Protocol
23
  import Realtime.Adapters.Postgres.Decoder
24
  import Realtime.Logs
25

26
  alias Realtime.Adapters.Postgres.Decoder
27
  alias Realtime.Adapters.Postgres.Protocol.KeepAlive
28
  alias Realtime.Adapters.Postgres.Protocol.Write
29
  alias Realtime.Api.Tenant
30
  alias Realtime.Database
31
  alias Realtime.Tenants.BatchBroadcast
32
  alias Realtime.Tenants.Cache
33

34
  # State carried by the replication connection process.
  #
  # `tenant_id` and `monitored_pid` are supplied by `start/2`; `table`, `schema`,
  # `publication_name` and `replication_slot_name` are filled in by `init/1`.
  # Until then they hold the `nil` defaults declared in `defstruct`, so the
  # typespec marks them as nullable.
  @type t :: %__MODULE__{
          tenant_id: String.t() | nil,
          table: String.t() | nil,
          schema: String.t(),
          opts: Keyword.t(),
          step:
            :disconnected
            | :check_replication_slot
            | :create_publication
            | :check_publication
            | :create_slot
            | :start_replication_slot
            | :streaming,
          publication_name: String.t() | nil,
          replication_slot_name: String.t() | nil,
          output_plugin: String.t(),
          proto_version: integer(),
          relations: map(),
          buffer: list(),
          monitored_pid: pid() | nil
        }
  defstruct tenant_id: nil,
            table: nil,
            schema: "public",
            opts: [],
            step: :disconnected,
            publication_name: nil,
            replication_slot_name: nil,
            output_plugin: "pgoutput",
            proto_version: 1,
            relations: %{},
            buffer: [],
            monitored_pid: nil
67

68
  @doc """
  Starts the replication connection for a tenant and monitors a given pid to stop the ReplicationConnection.

  The connection is started as a `:transient` child of the supervisor returned by
  `supervisor_spec/1`. `monitored_pid` is stored in the connection state and
  monitored in `init/1`; a `:DOWN` message for it makes the connection disconnect.

  Returns `{:ok, pid}` when the child was started or was already running, and
  `{:error, reason}` otherwise.
  """
  @spec start(Realtime.Api.Tenant.t(), pid()) :: {:ok, pid()} | {:error, any()}
  def start(tenant, monitored_pid) do
    Logger.info("Starting replication for Broadcast Changes")
    opts = %__MODULE__{tenant_id: tenant.external_id, monitored_pid: monitored_pid}
    supervisor_spec = supervisor_spec(tenant)

    child_spec = %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      restart: :transient,
      type: :worker
    }

    case DynamicSupervisor.start_child(supervisor_spec, child_spec) do
      {:ok, pid} ->
        {:ok, pid}

      {:error, {:already_started, pid}} ->
        {:ok, pid}

      {:error, {:bad_return_from_init, {:stop, error, _}}} ->
        {:error, error}

      # Already an error tuple: return it untouched. Wrapping it again would
      # hand callers `{:error, {:error, reason}}`.
      {:error, _reason} = error ->
        error

      # Any other reply (for example `:ignore`) is still reported as an error.
      other ->
        {:error, other}
    end
  end
91

92
  @doc """
  Looks up the replication connection registered for `tenant_id`.

  Returns the pid registered in `Realtime.Registry.Unique` under
  `{__MODULE__, tenant_id}`, or `nil` when nothing is registered.
  """
  @spec whereis(String.t()) :: pid() | nil
  def whereis(tenant_id) do
    Realtime.Registry.Unique
    |> Registry.lookup({__MODULE__, tenant_id})
    |> case do
      [] -> nil
      [{pid, _}] -> pid
    end
  end
102

103
  @doc """
  Starts and links the replication connection process for the tenant in `attrs`.

  Database settings are derived from the cached tenant, and the process is
  registered in `Realtime.Registry.Unique` under `{__MODULE__, tenant_id}`.
  An already registered process is reported as `{:ok, pid}`.
  """
  def start_link(%__MODULE__{tenant_id: tenant_id} = attrs) do
    settings =
      tenant_id
      |> Cache.get_tenant_by_external_id()
      |> Database.from_tenant("realtime_broadcast_changes", :stop, true)

    {:ok, ip_version} = Database.detect_ip_version(settings.host)

    # Certificate verification is disabled whenever SSL is enforced.
    ssl = if settings.ssl_enforced, do: [verify: :verify_none], else: false

    via_name = {:via, Registry, {Realtime.Registry.Unique, {__MODULE__, tenant_id}}}

    postgrex_opts = [
      name: via_name,
      hostname: settings.host,
      username: settings.user,
      password: settings.pass,
      database: settings.name,
      port: String.to_integer(settings.port),
      socket_options: [ip_version],
      backoff_type: :stop,
      sync_connect: true,
      parameters: [application_name: settings.application_name],
      ssl: ssl
    ]

    case Postgrex.ReplicationConnection.start_link(__MODULE__, attrs, postgrex_opts) do
      {:ok, pid} ->
        {:ok, pid}

      {:error, {:already_started, pid}} ->
        {:ok, pid}

      {:error, {:bad_return_from_init, {:stop, error}}} ->
        {:error, error}

      {:error, error} ->
        {:error, error}
    end
  end
134

135
  # Prepares the initial state: tags the logger with the tenant, monitors the
  # pid given to `start/2`, targets the `realtime.messages` table and derives
  # the publication and replication slot names from it.
  @impl true
  def init(%__MODULE__{tenant_id: tenant_id, monitored_pid: monitored_pid} = state) do
    Logger.metadata(external_id: tenant_id, project: tenant_id)
    Process.monitor(monitored_pid)

    # The name helpers read `table` and `schema`, so those are set first.
    targeted = %{state | table: "messages", schema: "realtime"}
    publication = publication_name(targeted)
    slot = replication_slot_name(targeted)
    state = %{targeted | publication_name: publication, replication_slot_name: slot}

    Logger.info("Initializing connection with the status: #{inspect(state, pretty: true)}")

    {:ok, state}
  end
151

152
  # Once connected, asks the database whether the replication slot for this
  # connection already exists. The answer is handled by `handle_result/2` at
  # step `:check_replication_slot`.
  @impl true
  def handle_connect(state) do
    slot = replication_slot_name(state)
    Logger.info("Checking if replication slot #{slot} exists")

    {
      :query,
      "SELECT * FROM pg_replication_slots WHERE slot_name = '#{slot}'",
      %{state | step: :check_replication_slot}
    }
  end
162

163
  # Drives the setup sequence, one query result at a time.
  #
  # `state.step` selects the clause that handles the result of the in-flight
  # query; each clause issues the next query and advances `step`:
  #
  #   :check_replication_slot -> slot lookup result; creates the temporary slot
  #   :check_publication      -> slot creation result; looks up the publication
  #   :create_publication     -> publication lookup result; creates it if missing
  #   :start_replication_slot -> starts streaming and moves to :streaming
  #
  # Any `Postgrex.Error` result disconnects the connection.
  @impl true
  def handle_result([%Postgrex.Result{num_rows: 1}], %__MODULE__{step: :check_replication_slot}) do
    {:disconnect, "Temporary Replication slot already exists and in use"}
  end

  def handle_result(
        [%Postgrex.Result{num_rows: 0}],
        %__MODULE__{step: :check_replication_slot} = state
      ) do
    %__MODULE__{output_plugin: output_plugin, replication_slot_name: replication_slot_name} =
      state

    Logger.info("Create replication slot #{replication_slot_name} using plugin #{output_plugin}")

    query =
      "CREATE_REPLICATION_SLOT #{replication_slot_name} TEMPORARY LOGICAL #{output_plugin} NOEXPORT_SNAPSHOT"

    {:query, query, %{state | step: :check_publication}}
  end

  def handle_result([%Postgrex.Result{}], %__MODULE__{step: :check_publication} = state) do
    %__MODULE__{table: table, schema: schema, publication_name: publication_name} = state

    Logger.info("Check publication #{publication_name} for table #{schema}.#{table} exists")
    query = "SELECT * FROM pg_publication WHERE pubname = '#{publication_name}'"

    {:query, query, %{state | step: :create_publication}}
  end

  def handle_result(
        [%Postgrex.Result{num_rows: 0}],
        %__MODULE__{step: :create_publication} = state
      ) do
    %__MODULE__{table: table, schema: schema, publication_name: publication_name} = state

    Logger.info("Create publication #{publication_name} for table #{schema}.#{table}")
    query = "CREATE PUBLICATION #{publication_name} FOR TABLE #{schema}.#{table}"

    {:query, query, %{state | step: :start_replication_slot}}
  end

  def handle_result(
        [%Postgrex.Result{num_rows: 1}],
        %__MODULE__{step: :create_publication} = state
      ) do
    # The publication already exists: run a no-op query so the next result
    # still reaches the `:start_replication_slot` clause.
    {:query, "SELECT 1", %{state | step: :start_replication_slot}}
  end

  def handle_result(
        [%Postgrex.Result{}],
        %__MODULE__{step: :start_replication_slot} = state
      ) do
    %__MODULE__{
      proto_version: proto_version,
      replication_slot_name: replication_slot_name,
      publication_name: publication_name
    } = state

    Logger.info(
      "Starting stream replication for slot #{replication_slot_name} using publication #{publication_name} and protocol version #{proto_version}"
    )

    query =
      "START_REPLICATION SLOT #{replication_slot_name} LOGICAL 0/0 (proto_version '#{proto_version}', publication_names '#{publication_name}')"

    {:stream, query, [], %{state | step: :streaming}}
  end

  def handle_result(%Postgrex.Error{postgres: %{message: message}}, _state) do
    {:disconnect, "Error starting replication: #{message}"}
  end

  # Errors without a `postgres` map carrying a message would otherwise raise a
  # FunctionClauseError; disconnect with the exception message instead.
  def handle_result(%Postgrex.Error{} = error, _state) do
    {:disconnect, "Error starting replication: #{Exception.message(error)}"}
  end
238

239
  # Logs the state held at the moment of disconnection and resets `step` to
  # `:disconnected`.
  @impl true
  def handle_disconnect(state) do
    Logger.warning("Disconnecting broadcast changes handler: #{inspect(state, pretty: true)}")

    disconnected = %{state | step: :disconnected}
    {:noreply, disconnected}
  end
244

245
  # Handles raw data from the replication stream.
  #
  # * keep-alive messages are answered with a standby status when the server
  #   asks for an immediate reply, and with `hold()` otherwise;
  # * write messages are decoded and sent to this same process, where
  #   `handle_info/2` picks them up;
  # * anything else is logged and ignored.
  @impl true
  def handle_data(data, state) when is_keep_alive(data) do
    %KeepAlive{reply: reply, wal_end: wal_end} = parse(data)
    next_wal = wal_end + 1

    response =
      case reply do
        :now -> standby_status(next_wal, next_wal, next_wal, reply)
        :later -> hold()
      end

    {:noreply, response, state}
  end

  def handle_data(data, state) when is_write(data) do
    %Write{message: message} = parse(data)
    send(self(), decode_message(message))

    {:noreply, [], state}
  end

  def handle_data(unexpected, state) do
    log_error("UnexpectedMessageReceived", unexpected)

    {:noreply, [], state}
  end
269

270
  # Handles decoded replication messages (sent to self by `handle_data/2`),
  # shutdown requests and monitor notifications.
  #
  # * `Relation` messages are cached in `state.relations` by relation id.
  # * `Insert` messages are turned into a column-name => value map using the
  #   cached relation; rows with a non-nil "payload" are broadcast through
  #   `BatchBroadcast.broadcast/4`.
  # * `:shutdown` and any `:DOWN` message disconnect with reason `:normal`.
  # * Everything else is ignored.
  #
  # Exceptions and throws raised while processing a message are logged and the
  # state is left unchanged.
  @impl true
  @spec handle_info(any(), any()) :: {:disconnect, :normal} | {:noreply, any()}
  def handle_info(%Decoder.Messages.Relation{} = msg, state) do
    %Decoder.Messages.Relation{id: id, namespace: namespace, name: name, columns: columns} = msg
    %{relations: relations} = state
    relation = %{name: name, columns: columns, namespace: namespace}
    relations = Map.put(relations, id, relation)
    {:noreply, %{state | relations: relations}}
  rescue
    e ->
      log_error("UnableToBroadcastChanges", e)
      {:noreply, state}
  catch
    e ->
      log_error("UnableToBroadcastChanges", e)
      {:noreply, state}
  end

  def handle_info(%Decoder.Messages.Insert{} = msg, state) do
    %Decoder.Messages.Insert{relation_id: relation_id, tuple_data: tuple_data} = msg
    %{relations: relations, tenant_id: tenant_id} = state

    case Map.get(relations, relation_id) do
      %{columns: columns} ->
        # Pair every value with its column and decode by column type:
        # nil stays nil, "jsonb" is JSON-decoded, "bool" is compared with "t".
        to_broadcast =
          tuple_data
          |> Tuple.to_list()
          |> Enum.zip(columns)
          |> Map.new(fn
            {nil, %{name: name}} -> {name, nil}
            {value, %{name: name, type: "jsonb"}} -> {name, Jason.decode!(value)}
            {value, %{name: name, type: "bool"}} -> {name, value == "t"}
            {value, %{name: name}} -> {name, value}
          end)

        payload = Map.get(to_broadcast, "payload")

        case payload do
          # Rows without a payload are not broadcast.
          nil ->
            {:noreply, state}

          payload ->
            id = Map.fetch!(to_broadcast, "id")

            to_broadcast =
              %{
                topic: Map.fetch!(to_broadcast, "topic"),
                event: Map.fetch!(to_broadcast, "event"),
                private: Map.fetch!(to_broadcast, "private"),
                payload: Map.put(payload, "id", id)
              }

            %Tenant{} = tenant = Cache.get_tenant_by_external_id(tenant_id)

            case BatchBroadcast.broadcast(nil, tenant, %{messages: [to_broadcast]}, true) do
              :ok -> :ok
              error -> log_error("UnableToBatchBroadcastChanges", error)
            end

            {:noreply, state}
        end

      _ ->
        log_error("UnknownBroadcastChangesRelation", "Relation ID not found: #{relation_id}")
        {:noreply, state}
    end
  rescue
    e ->
      log_error("UnableToBroadcastChanges", e)
      {:noreply, state}
  catch
    e ->
      log_error("UnableToBroadcastChanges", e)
      {:noreply, state}
  end

  def handle_info(:shutdown, _), do: {:disconnect, :normal}

  # The only monitor set up in this module is the one on `monitored_pid` in `init/1`.
  def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :normal}

  def handle_info(_, state), do: {:noreply, state}
1,146✔
351

352
  # Builds the `:via` tuple that routes to this module's partitioned
  # DynamicSupervisor, using the tenant's external id as the partition key.
  @spec supervisor_spec(Tenant.t()) :: term()
  def supervisor_spec(%Tenant{} = tenant) do
    partition_key = tenant.external_id
    {:via, PartitionSupervisor, {__MODULE__.DynamicSupervisor, partition_key}}
  end
356

357
  # Publication name: "<schema>_<table>_publication_<suffix>", where the suffix
  # is the configured `:slot_name_suffix`.
  def publication_name(%__MODULE__{table: table, schema: schema}) do
    Enum.join([schema, table, "publication", slot_suffix()], "_")
  end
360

361
  # Replication slot name: "<schema>_<table>_replication_slot_<suffix>", where
  # the suffix is the configured `:slot_name_suffix`.
  def replication_slot_name(%__MODULE__{table: table, schema: schema}) do
    Enum.join([schema, table, "replication_slot", slot_suffix()], "_")
  end
364

365
  # Reads the suffix appended to publication and slot names from the
  # `:realtime` application environment at call time (`nil` when unset).
  defp slot_suffix do
    Application.get_env(:realtime, :slot_name_suffix, nil)
  end
183✔
366
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc