• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / 3ad59827436aefbedeccac74876345f233e1cce2

05 Sep 2025 09:01PM UTC coverage: 63.832% (+5.4%) from 58.423%
3ad59827436aefbedeccac74876345f233e1cce2

push

github

jallum
Merge branch 'release/0.2.0'

1855 of 2728 new or added lines in 138 files covered. (68.0%)

49 existing lines in 22 files now uncovered.

3228 of 5057 relevant lines covered (63.83%)

597.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.98
/lib/bedrock/data_plane/commit_proxy/server.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Server do
2
  @moduledoc """
3
  GenServer implementation of the Commit Proxy.
4

5
  ## Overview
6

7
  The Commit Proxy batches transaction requests from clients to optimize throughput while
8
  maintaining strict consistency guarantees. It coordinates with resolvers for conflict
9
  detection and logs for durable persistence.
10

11
  ## Lifecycle
12

13
  1. **Initialization**: Starts in `:locked` mode, waiting for recovery completion
14
  2. **Recovery**: Director calls `recover_from/3` to provide transaction system layout and unlock
15
  3. **Transaction Processing**: Accepts `:commit` calls, batches transactions, and finalizes
16
  4. **Empty Transaction Timeout**: Creates empty transactions during quiet periods to advance read versions
17

18
  ## Batching Strategy
19

20
  - **Size-based**: Batches finalize when reaching `max_per_batch` transactions
21
  - **Time-based**: Batches finalize after `max_latency_in_ms` milliseconds
22
  - **Immediate**: Single transactions may bypass batching for low-latency processing
23

24
  ## Timeout Mechanisms
25

26
  - **Fast timeout (0ms)**: Allows GenServer to process any queued `:commit` messages before
27
    finalizing the current batch, ensuring optimal batching efficiency
28
  - **Empty transaction timeout**: Creates empty transactions (an encoded `%{mutations: []}`) during quiet periods
29
    to keep read versions advancing and provide system health checking
30

31
  ## Error Handling
32

33
  Uses fail-fast recovery model where unrecoverable errors (sequencer unavailable, log failures)
34
  trigger process exit and Director-coordinated cluster recovery.
35
  """
36

37
  use GenServer
38

39
  import Bedrock.DataPlane.CommitProxy.Batching,
40
    only: [
41
      start_batch_if_needed: 1,
42
      add_transaction_to_batch: 3,
43
      apply_finalization_policy: 1,
44
      single_transaction_batch: 2
45
    ]
46

47
  import Bedrock.DataPlane.CommitProxy.Finalization, only: [finalize_batch: 3]
48

49
  import Bedrock.DataPlane.CommitProxy.Telemetry,
50
    only: [
51
      trace_metadata: 1,
52
      trace_commit_proxy_batch_started: 3,
53
      trace_commit_proxy_batch_finished: 4,
54
      trace_commit_proxy_batch_failed: 3
55
    ]
56

57
  import Bedrock.Internal.GenServer.Replies
58

59
  alias Bedrock.Cluster
60
  alias Bedrock.DataPlane.CommitProxy.Batch
61
  alias Bedrock.DataPlane.CommitProxy.State
62
  alias Bedrock.DataPlane.Transaction
63
  alias Bedrock.Internal.Time
64

65
  # Builds the supervisor child specification for one commit proxy instance.
  #
  # Required options: :cluster, :director, :epoch, :lock_token and :instance;
  # a missing (nil/false) value raises a RuntimeError naming the option.
  # Optional options and their defaults: :max_latency_in_ms (1),
  # :max_per_batch (10) and :empty_transaction_timeout_ms (1_000).
  #
  # :instance only participates in the child id; it is not passed to init/1.
  @spec child_spec(
          opts :: [
            cluster: Cluster.t(),
            director: pid(),
            epoch: Bedrock.epoch(),
            lock_token: Bedrock.lock_token(),
            instance: non_neg_integer(),
            max_latency_in_ms: non_neg_integer(),
            max_per_batch: pos_integer(),
            empty_transaction_timeout_ms: non_neg_integer()
          ]
        ) :: Supervisor.child_spec() | no_return()
  def child_spec(opts) do
    # Required options are validated first, in a fixed order, so the first
    # missing one is the one reported.
    cluster = opts[:cluster] || raise "Missing :cluster option"
    director = opts[:director] || raise "Missing :director option"
    epoch = opts[:epoch] || raise "Missing :epoch option"
    lock_token = opts[:lock_token] || raise "Missing :lock_token option"
    instance = opts[:instance] || raise "Missing :instance option"

    # Positional argument tuple consumed by init/1.
    init_arg = {
      cluster,
      director,
      epoch,
      opts[:max_latency_in_ms] || 1,
      opts[:max_per_batch] || 10,
      opts[:empty_transaction_timeout_ms] || 1_000,
      lock_token
    }

    start_mfa = {GenServer, :start_link, [__MODULE__, init_arg]}

    %{
      id: {__MODULE__, cluster, epoch, instance},
      start: start_mfa,
      restart: :temporary
    }
  end
98

99
  # GenServer init callback.
  #
  # Receives the positional tuple assembled by child_spec/1, monitors the
  # Director, records trace metadata and returns the initial state together
  # with the empty-transaction timeout as the first GenServer timeout.
  @impl true
  @spec init({module(), pid(), Bedrock.epoch(), non_neg_integer(), pos_integer(), non_neg_integer(), binary()}) ::
          {:ok, State.t(), timeout()}
  def init({cluster, director, epoch, max_latency_in_ms, max_per_batch, empty_transaction_timeout_ms, lock_token}) do
    # Monitor the Director - if it dies, this commit proxy should terminate
    # (the resulting :DOWN message is handled in handle_info/2).
    Process.monitor(director)

    trace_metadata(%{cluster: cluster, pid: self()})

    initial_state = %State{
      cluster: cluster,
      director: director,
      epoch: epoch,
      max_latency_in_ms: max_latency_in_ms,
      max_per_batch: max_per_batch,
      empty_transaction_timeout_ms: empty_transaction_timeout_ms,
      lock_token: lock_token
    }

    {:ok, initial_state, empty_transaction_timeout_ms}
  end
121

122
  # GenServer terminate callback: notifies every caller still waiting in the
  # current batch (if any) with `{:error, :abort}` and always returns :ok.
  @impl true
  @spec terminate(term(), State.t()) :: :ok
  def terminate(_reason, state) do
    _ = abort_current_batch(state)
    :ok
  end
128

129
  # GenServer call handling.
  #
  #   * `{:recover_from, lock_token, layout}` (only matched while `:locked`):
  #     stores the layout and switches to `:running` when the token equals the
  #     one in state, otherwise replies `{:error, :unauthorized}`.
  #   * `{:commit, binary}` while `:running`: replies
  #     `{:error, :no_transaction_system_layout}` when no layout is set;
  #     otherwise adds the transaction to the current batch and either waits
  #     (timeout 0) or continues into `{:finalize, batch}`.
  #   * `{:commit, _}` while `:locked`: replies `{:error, :locked}`.
  @impl true
  @spec handle_call(
          {:recover_from, binary(), map()} | {:commit, Bedrock.transaction()},
          GenServer.from(),
          State.t()
        ) ::
          {:reply, term(), State.t()} | {:noreply, State.t(), timeout() | {:continue, term()}}
  def handle_call({:recover_from, lock_token, transaction_system_layout}, _from, %{mode: :locked} = t) do
    if lock_token != t.lock_token do
      reply(t, {:error, :unauthorized})
    else
      unlocked = %{t | transaction_system_layout: transaction_system_layout, mode: :running}
      reply(unlocked, :ok)
    end
  end

  def handle_call({:commit, transaction}, _from, %{mode: :running, transaction_system_layout: nil} = t)
      when is_binary(transaction),
      do: reply(t, {:error, :no_transaction_system_layout})

  def handle_call({:commit, transaction}, from, %{mode: :running} = t) when is_binary(transaction) do
    case start_batch_if_needed(t) do
      {:error, reason} ->
        # Answer the caller before exiting so it is not left waiting on a
        # process that is about to go away.
        GenServer.reply(from, {:error, :abort})
        exit(reason)

      batching_state ->
        with_transaction = add_transaction_to_batch(batching_state, transaction, reply_fn(from))

        case apply_finalization_policy(with_transaction) do
          # No batch handed back: wait with a zero timeout so any already
          # queued messages are processed before the :timeout fires.
          {next_state, nil} -> noreply(next_state, timeout: 0)
          {next_state, ready_batch} -> noreply(next_state, continue: {:finalize, ready_batch})
        end
    end
  end

  def handle_call({:commit, _transaction}, _from, %{mode: :locked} = t) do
    reply(t, {:error, :locked})
  end
×
167

168
  # GenServer info handling: timeouts, Director monitor messages, and stray
  # messages.
  #
  # The spec covers every clause below (not only `:timeout`), including the
  # `{:stop, :normal, state}` return used when the Director goes down.
  @impl true
  @spec handle_info(term(), State.t()) ::
          {:noreply, State.t()}
          | {:noreply, State.t(), timeout() | {:continue, term()}}
          | {:stop, :normal, State.t()}
  # Timeout while running with no open batch: build a single-transaction batch
  # from an encoded empty transaction and finalize it. An unavailable sequencer
  # exits the process.
  def handle_info(:timeout, %{batch: nil, mode: :running} = t) do
    empty_transaction = Transaction.encode(%{mutations: []})

    case single_transaction_batch(t, empty_transaction) do
      {:ok, batch} ->
        noreply(t, continue: {:finalize, batch})

      {:error, :sequencer_unavailable} ->
        exit({:sequencer_unavailable, :timeout_empty_transaction})
    end
  end

  # Timeout with no open batch in any other mode: just re-arm the timer.
  def handle_info(:timeout, %{batch: nil} = t), do: noreply(t, timeout: t.empty_transaction_timeout_ms)

  # Timeout with an open batch: detach it from state and finalize it.
  def handle_info(:timeout, %{batch: batch} = t), do: noreply(%{t | batch: nil}, continue: {:finalize, batch})

  def handle_info({:DOWN, _ref, :process, director_pid, _reason}, %{director: director_pid} = t) do
    # Director has died - this commit proxy should terminate gracefully
    {:stop, :normal, t}
  end

  # Any other message is ignored, but the timeout MUST be re-armed: a GenServer
  # timeout is cancelled as soon as any message arrives, so returning a bare
  # `{:noreply, t}` here would leave an open batch unfinalized (its callers
  # would never get a reply) or stop the empty-transaction timer until the
  # next call.
  #
  # With no open batch the idle timer is restarted from scratch; with an open
  # batch the zero timeout used after adding a transaction is restored.
  def handle_info(_msg, %{batch: nil} = t), do: noreply(t, timeout: t.empty_transaction_timeout_ms)

  def handle_info(_msg, t), do: noreply(t, timeout: 0)
195

196
  # Finalizes a batch.
  #
  # Emits a "batch started" trace, times finalize_batch/3, and then either
  # emits a "batch finished" trace and re-arms the empty-transaction timeout,
  # or emits a "batch failed" trace and exits the process.
  @impl true
  @spec handle_continue({:finalize, Batch.t()}, State.t()) :: {:noreply, State.t()}
  def handle_continue({:finalize, batch}, t) do
    trace_commit_proxy_batch_started(batch.commit_version, length(batch.buffer), Time.now_in_ms())

    case :timer.tc(fn -> finalize_batch(batch, t.transaction_system_layout, epoch: t.epoch) end) do
      {elapsed_usec, {:ok, aborted_count, committed_count}} ->
        trace_commit_proxy_batch_finished(batch.commit_version, aborted_count, committed_count, elapsed_usec)
        maybe_set_empty_transaction_timeout(t)

      {elapsed_usec, {:error, failure}} ->
        # The failure is always traced verbatim; only the exit reason depends
        # on whether the failure is one of the recognised shapes.
        trace_commit_proxy_batch_failed(batch, failure, elapsed_usec)
        exit(finalization_exit_reason(failure))
    end
  end

  # Maps a finalization failure to the process exit reason: recognised
  # failures are used as-is, anything else is wrapped in `{:unknown_error, _}`.
  @spec finalization_exit_reason(term()) :: term()
  defp finalization_exit_reason({:log_failures, _errors} = failure), do: failure

  defp finalization_exit_reason({:insufficient_acknowledgments, _count, _required, _errors} = failure),
    do: failure

  defp finalization_exit_reason({:resolver_unavailable, _reason} = failure), do: failure
  defp finalization_exit_reason({:storage_team_coverage_error, _key} = failure), do: failure
  defp finalization_exit_reason(other), do: {:unknown_error, other}
232

233
  # Wraps a GenServer `from` in a one-argument function that delivers its
  # argument to that caller via GenServer.reply/2.
  @spec reply_fn(GenServer.from()) :: Batch.reply_fn()
  def reply_fn(from) do
    fn response -> GenServer.reply(from, response) end
  end
×
235

236
  # After a batch completes: while `:running`, re-arm the empty-transaction
  # timeout; in any other mode return without a timeout.
  @spec maybe_set_empty_transaction_timeout(State.t()) :: {:noreply, State.t()}
  defp maybe_set_empty_transaction_timeout(t) do
    case t do
      %{mode: :running} -> noreply(t, timeout: t.empty_transaction_timeout_ms)
      _not_running -> noreply(t)
    end
  end
×
241

242
  # Sends `{:error, :abort}` to every caller registered in the current batch.
  # Does nothing when there is no batch. Always returns :ok.
  @spec abort_current_batch(State.t()) :: :ok
  defp abort_current_batch(%{batch: nil}), do: :ok

  defp abort_current_batch(%{batch: pending}) do
    for notify <- Batch.all_callers(pending) do
      notify.({:error, :abort})
    end

    :ok
  end
250

251
  @doc """
  Applies a transaction system layout carried by a transaction, if there is one.

  When the transaction is a `{reads, writes}` tuple whose `writes` map holds a
  binary under the system layout key, that binary is decoded and stored in the
  state's `:transaction_system_layout` field. In every other case (key absent,
  value not a binary, decoding fails, or the transaction has another shape) the
  state is returned unchanged.
  """
  @spec maybe_update_layout_from_transaction(State.t(), Bedrock.transaction()) :: State.t()
  def maybe_update_layout_from_transaction(state, {_reads, %{} = writes}) do
    layout_key = "\xff/system/transaction_system_layout"

    case writes do
      %{^layout_key => encoded} when is_binary(encoded) ->
        # NOTE(review): binary_to_term/1 without [:safe] can create atoms from
        # the payload, and the catch-all rescue hides any failure - confirm
        # that the value under this key can only come from trusted writers.
        try do
          %{state | transaction_system_layout: :erlang.binary_to_term(encoded)}
        rescue
          _ -> state
        end

      _absent_or_not_binary ->
        state
    end
  end

  def maybe_update_layout_from_transaction(state, _transaction), do: state
2✔
280
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc