• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / 84f2ec5d5a2517957ec3c4741969ffc8414a4994

13 Aug 2025 02:57AM UTC coverage: 58.423%. Remained the same
84f2ec5d5a2517957ec3c4741969ffc8414a4994

push

github

jallum
0.1.2

- Reworked and simplified documentation
- Increased test coverage
- Project housekeeping

1515 of 2481 new or added lines in 105 files covered. (61.06%)

424 existing lines in 63 files now uncovered.

2112 of 3615 relevant lines covered (58.42%)

1870.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.18
/lib/bedrock/data_plane/commit_proxy/server.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Server do
  @moduledoc """
  GenServer implementation of the Commit Proxy.

  ## Overview

  The Commit Proxy batches transaction requests from clients to optimize throughput while
  maintaining strict consistency guarantees. It coordinates with resolvers for conflict
  detection and logs for durable persistence.

  ## Lifecycle

  1. **Initialization**: Starts in `:locked` mode, waiting for recovery completion
  2. **Recovery**: Director calls `recover_from/3` to provide transaction system layout and unlock
  3. **Transaction Processing**: Accepts `:commit` calls, batches transactions, and finalizes
  4. **Empty Transaction Timeout**: Creates empty transactions during quiet periods to advance read versions

  ## Batching Strategy

  - **Size-based**: Batches finalize when reaching `max_per_batch` transactions
  - **Time-based**: Batches finalize after `max_latency_in_ms` milliseconds
  - **Immediate**: Single transactions may bypass batching for low-latency processing

  ## Timeout Mechanisms

  - **Fast timeout (0ms)**: Allows GenServer to process any queued `:commit` messages before
    finalizing the current batch, ensuring optimal batching efficiency
  - **Empty transaction timeout**: Creates empty `{nil, %{}}` transactions during quiet periods
    to keep read versions advancing and provide system health checking

  ## Error Handling

  Uses fail-fast recovery model where unrecoverable errors trigger process exit and
  Director-coordinated cluster recovery. Unrecoverable errors are: sequencer unavailable,
  log failures, insufficient log acknowledgments, resolver unavailable, and storage team
  coverage errors. Any other finalization error is traced, and the proxy keeps running with
  the empty transaction timeout re-armed.
  """

  alias Bedrock.Cluster
  alias Bedrock.DataPlane.CommitProxy.Batch
  alias Bedrock.DataPlane.CommitProxy.State
  alias Bedrock.Internal.Time

  import Bedrock.DataPlane.CommitProxy.Batching,
    only: [
      start_batch_if_needed: 1,
      add_transaction_to_batch: 3,
      apply_finalization_policy: 1,
      single_transaction_batch: 2
    ]

  import Bedrock.DataPlane.CommitProxy.Finalization, only: [finalize_batch: 2]

  import Bedrock.DataPlane.CommitProxy.Telemetry,
    only: [
      trace_metadata: 1,
      trace_commit_proxy_batch_started: 3,
      trace_commit_proxy_batch_finished: 4,
      trace_commit_proxy_batch_failed: 3
    ]

  use GenServer
  import Bedrock.Internal.GenServer.Replies

  # Write key under which a transaction may carry an encoded transaction system layout
  # (see `maybe_update_layout_from_transaction/2`).
  @layout_key "\xff/system/transaction_system_layout"

  @doc """
  Builds the child spec for a commit proxy process.

  `:cluster`, `:director`, `:epoch` and `:lock_token` are required; a missing one raises.
  `:max_latency_in_ms` defaults to `1`, `:max_per_batch` to `10`, and
  `:empty_transaction_timeout_ms` to `1_000`. The child is `restart: :temporary`.
  """
  @spec child_spec(
          opts :: [
            cluster: Cluster.t(),
            director: pid(),
            epoch: Bedrock.epoch(),
            lock_token: Bedrock.lock_token(),
            max_latency_in_ms: non_neg_integer(),
            max_per_batch: pos_integer(),
            empty_transaction_timeout_ms: non_neg_integer()
          ]
        ) :: Supervisor.child_spec()
  def child_spec(opts) do
    cluster = opts[:cluster] || raise "Missing :cluster option"
    director = opts[:director] || raise "Missing :director option"
    epoch = opts[:epoch] || raise "Missing :epoch option"
    lock_token = opts[:lock_token] || raise "Missing :lock_token option"
    max_latency_in_ms = opts[:max_latency_in_ms] || 1
    max_per_batch = opts[:max_per_batch] || 10
    empty_transaction_timeout_ms = opts[:empty_transaction_timeout_ms] || 1_000

    %{
      id: __MODULE__,
      start:
        {GenServer, :start_link,
         [
           __MODULE__,
           {cluster, director, epoch, max_latency_in_ms, max_per_batch,
            empty_transaction_timeout_ms, lock_token}
         ]},
      restart: :temporary
    }
  end

  @impl true
  @spec init(
          {module(), pid(), Bedrock.epoch(), non_neg_integer(), pos_integer(), non_neg_integer(),
           binary()}
        ) ::
          {:ok, State.t(), timeout()}
  def init(
        {cluster, director, epoch, max_latency_in_ms, max_per_batch, empty_transaction_timeout_ms,
         lock_token}
      ) do
    trace_metadata(%{cluster: cluster, pid: self()})

    # The initial timeout arms the empty-transaction heartbeat from the start.
    %State{
      cluster: cluster,
      director: director,
      epoch: epoch,
      max_latency_in_ms: max_latency_in_ms,
      max_per_batch: max_per_batch,
      empty_transaction_timeout_ms: empty_transaction_timeout_ms,
      lock_token: lock_token
    }
    |> then(&{:ok, &1, empty_transaction_timeout_ms})
  end

  @impl true
  @spec terminate(term(), State.t()) :: :ok
  def terminate(_reason, _t), do: :ok

  @impl true
  @spec handle_call(
          {:recover_from, binary(), map()} | {:commit, Bedrock.transaction()},
          GenServer.from(),
          State.t()
        ) ::
          {:reply, term(), State.t()} | {:noreply, State.t(), timeout() | {:continue, term()}}
  # Unlocks the proxy when the caller presents the lock token it was started with, installing
  # the transaction system layout. NOTE(review): there is no clause for `:recover_from` while
  # in `:running` mode, so such a call crashes the server — confirm that is intended.
  def handle_call(
        {:recover_from, lock_token, transaction_system_layout},
        _from,
        %{mode: :locked} = t
      ) do
    if lock_token == t.lock_token do
      %{t | transaction_system_layout: transaction_system_layout, mode: :running}
      |> reply(:ok)
    else
      t |> reply({:error, :unauthorized})
    end
  end

  # When a transaction is submitted, we check to see if we have a batch already
  # in progress. If we do, we add the transaction to the batch. If we don't, we
  # create a new batch and add the transaction to it. Once added, we check to
  # see if the batch meets the finalization policy. If it does, we finalize the
  # batch. If it doesn't, we wait for a short timeout to see if anything else
  # is submitted before re-considering finalizing the batch. The reply is sent
  # later through the batch's reply function, hence `:noreply`.
  def handle_call({:commit, transaction}, from, %{mode: :running} = t) do
    case t.transaction_system_layout do
      nil ->
        t |> reply({:error, :no_transaction_system_layout})

      _layout ->
        t
        |> start_batch_if_needed()
        |> add_transaction_to_batch(transaction, reply_fn(from))
        |> apply_finalization_policy()
        |> case do
          {t, nil} -> t |> noreply(timeout: 0)
          {t, batch} -> t |> noreply(continue: {:finalize, batch})
        end
    end
  end

  # Reject commit calls while locked; commits are only accepted after a successful
  # `:recover_from` call has switched the proxy to `:running` mode.
  def handle_call({:commit, _transaction}, _from, %{mode: :locked} = t),
    do: t |> reply({:error, :locked})

  @impl true
  @spec handle_info(:timeout, State.t()) ::
          {:noreply, State.t()}
          | {:noreply, State.t(), timeout()}
          | {:noreply, State.t(), {:continue, term()}}
  # Quiet period while running: finalize an empty transaction so read versions keep advancing.
  def handle_info(:timeout, %{batch: nil, mode: :running} = t) do
    case single_transaction_batch(t, {nil, %{}}) do
      {:ok, batch} ->
        t |> noreply(continue: {:finalize, batch})

      {:error, :sequencer_unavailable} ->
        exit({:sequencer_unavailable, :timeout_empty_transaction})
    end
  end

  # Quiet period while not running: just re-arm the timeout.
  def handle_info(:timeout, %{batch: nil} = t),
    do: t |> noreply(timeout: t.empty_transaction_timeout_ms)

  # A batch is pending and no further commits arrived in time: finalize it now.
  def handle_info(:timeout, %{batch: batch} = t),
    do: %{t | batch: nil} |> noreply(continue: {:finalize, batch})

  @impl true
  @spec handle_continue({:finalize, Batch.t()}, State.t()) ::
          {:noreply, State.t()} | {:noreply, State.t(), timeout()}
  def handle_continue({:finalize, batch}, t) do
    trace_commit_proxy_batch_started(batch.commit_version, length(batch.buffer), Time.now_in_ms())

    case :timer.tc(fn -> finalize_batch(batch, t.transaction_system_layout) end) do
      {n_usec, {:ok, n_aborts, n_oks}} ->
        trace_commit_proxy_batch_finished(batch.commit_version, n_aborts, n_oks, n_usec)
        maybe_set_empty_transaction_timeout(t)

      {n_usec, {:error, reason}} ->
        trace_commit_proxy_batch_failed(batch, reason, n_usec)
        handle_finalization_error(reason, t)
    end
  end

  @doc """
  Returns a one-argument function that replies to the caller identified by `from`.
  """
  @spec reply_fn(GenServer.from()) :: Batch.reply_fn()
  def reply_fn(from), do: &GenServer.reply(from, &1)

  # Fatal errors exit with the error itself as the exit reason so the Director can coordinate
  # recovery; any other error leaves the proxy running.
  @spec handle_finalization_error(term(), State.t()) ::
          {:noreply, State.t()} | {:noreply, State.t(), timeout()}
  defp handle_finalization_error(reason, t) do
    if fatal_finalization_error?(reason) do
      exit(reason)
    else
      # Re-arm the empty-transaction timeout exactly like the success path does. Returning a
      # bare `{:noreply, t}` would leave no timeout pending, stalling read-version advancement
      # until some other message happens to arrive.
      maybe_set_empty_transaction_timeout(t)
    end
  end

  # Finalization errors that require process exit and cluster recovery.
  @spec fatal_finalization_error?(term()) :: boolean()
  defp fatal_finalization_error?({:log_failures, _errors}), do: true

  defp fatal_finalization_error?({:insufficient_acknowledgments, _count, _required, _errors}),
    do: true

  defp fatal_finalization_error?({:resolver_unavailable, _reason}), do: true
  defp fatal_finalization_error?({:storage_team_coverage_error, _key}), do: true
  defp fatal_finalization_error?(_reason), do: false

  # While running, schedule the next empty-transaction timeout; otherwise no timeout.
  @spec maybe_set_empty_transaction_timeout(State.t()) ::
          {:noreply, State.t()} | {:noreply, State.t(), timeout()}
  defp maybe_set_empty_transaction_timeout(%{mode: :running} = t),
    do: t |> noreply(timeout: t.empty_transaction_timeout_ms)

  defp maybe_set_empty_transaction_timeout(t), do: t |> noreply()

  @doc """
  Updates the transaction system layout if the transaction contains system layout information.

  If the transaction's writes contain a binary value under the
  `"\\xff/system/transaction_system_layout"` key, that value is decoded with
  `:erlang.binary_to_term/1` and stored as the state's `transaction_system_layout`.
  The state is returned unchanged when the key is absent, the value is not a binary,
  the value cannot be decoded, or the transaction does not have a `{reads, writes}` shape
  with map writes.
  """
  @spec maybe_update_layout_from_transaction(State.t(), Bedrock.transaction()) :: State.t()
  def maybe_update_layout_from_transaction(state, {_reads, writes}) when is_map(writes) do
    case Map.get(writes, @layout_key) do
      encoded_layout when is_binary(encoded_layout) ->
        case decode_layout(encoded_layout) do
          {:ok, layout} -> %{state | transaction_system_layout: layout}
          :error -> state
        end

      _missing_or_not_binary ->
        state
    end
  end

  def maybe_update_layout_from_transaction(state, _transaction), do: state

  # Decodes an encoded layout; `:error` when the binary is not a valid external term.
  #
  # NOTE(review): `binary_to_term/1` without the `:safe` option can create new atoms and
  # decode arbitrary terms. Confirm that writes to the layout key come only from trusted
  # callers (or switch to `[:safe]` once it is known that the layout's atoms always exist).
  @spec decode_layout(binary()) :: {:ok, term()} | :error
  defp decode_layout(encoded_layout) do
    {:ok, :erlang.binary_to_term(encoded_layout)}
  rescue
    # `binary_to_term/1` raises ArgumentError (badarg) on a malformed encoding.
    ArgumentError -> :error
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc