• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / 35c5908ec6142451bcdbeaf7780e6283a2552010-PR-43

04 Sep 2025 12:03AM UTC coverage: 63.455% (+0.03%) from 63.428%
35c5908ec6142451bcdbeaf7780e6283a2552010-PR-43

Pull #43

github

jallum
New and improved conflict sharding
Pull Request #43: Add conflict sharding with async resolver assignment

68 of 101 new or added lines in 10 files covered. (67.33%)

1 existing line in 1 file now uncovered.

3030 of 4775 relevant lines covered (63.46%)

609.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.06
/lib/bedrock/data_plane/commit_proxy/server.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Server do
2
  @moduledoc """
3
  GenServer implementation of the Commit Proxy.
4

5
  ## Overview
6

7
  The Commit Proxy batches transaction requests from clients to optimize throughput while
8
  maintaining strict consistency guarantees. It coordinates with resolvers for conflict
9
  detection and logs for durable persistence.
10

11
  ## Lifecycle
12

13
  1. **Initialization**: Starts in `:locked` mode, waiting for recovery completion
14
  2. **Recovery**: Director calls `recover_from/3` to provide transaction system layout and unlock
15
  3. **Transaction Processing**: Accepts `:commit` calls, batches transactions, and finalizes
16
  4. **Empty Transaction Timeout**: Creates empty transactions during quiet periods to advance read versions
17

18
  ## Batching Strategy
19

20
  - **Size-based**: Batches finalize when reaching `max_per_batch` transactions
21
  - **Time-based**: Batches finalize after `max_latency_in_ms` milliseconds
22
  - **Immediate**: Single transactions may bypass batching for low-latency processing
23

24
  ## Timeout Mechanisms
25

26
  - **Fast timeout (0ms)**: Allows GenServer to process any queued `:commit` messages before
27
    finalizing the current batch, ensuring optimal batching efficiency
28
  - **Empty transaction timeout**: Creates empty `{nil, %{}}` transactions during quiet periods
29
    to keep read versions advancing and provide system health checking
30

31
  ## Error Handling
32

33
  Uses a fail-fast recovery model in which unrecoverable errors (sequencer unavailable, log failures)
34
  trigger process exit and Director-coordinated cluster recovery.
35
  """
36

37
  use GenServer
38

39
  import Bedrock.DataPlane.CommitProxy.Batching,
40
    only: [
41
      start_batch_if_needed: 1,
42
      add_transaction_to_batch: 4,
43
      apply_finalization_policy: 1,
44
      single_transaction_batch: 2,
45
      create_resolver_task: 2
46
    ]
47

48
  import Bedrock.DataPlane.CommitProxy.Finalization, only: [finalize_batch: 3]
49

50
  import Bedrock.DataPlane.CommitProxy.Telemetry,
51
    only: [
52
      trace_metadata: 1,
53
      trace_commit_proxy_batch_started: 3,
54
      trace_commit_proxy_batch_finished: 4,
55
      trace_commit_proxy_batch_failed: 3
56
    ]
57

58
  import Bedrock.Internal.GenServer.Replies
59

60
  alias Bedrock.Cluster
61
  alias Bedrock.DataPlane.CommitProxy.Batch
62
  # ConflictSharding moved to Batching module
63
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
64
  alias Bedrock.DataPlane.CommitProxy.State
65
  alias Bedrock.DataPlane.Transaction
66
  alias Bedrock.Internal.Time
67

68
  @doc """
  Builds the supervisor child specification for a commit proxy.

  Required options: `:cluster`, `:director`, `:epoch`, `:lock_token` and `:instance`.
  A missing (or `nil`/`false`) value raises a `RuntimeError` naming the option.

  Optional options and their defaults: `:max_latency_in_ms` (1), `:max_per_batch` (10)
  and `:empty_transaction_timeout_ms` (1_000).

  `:instance` only contributes to the child id; it is not part of the init argument.
  The child is declared with `restart: :temporary`.
  """
  @spec child_spec(
          opts :: [
            cluster: Cluster.t(),
            director: pid(),
            epoch: Bedrock.epoch(),
            lock_token: Bedrock.lock_token(),
            instance: non_neg_integer(),
            max_latency_in_ms: non_neg_integer(),
            max_per_batch: pos_integer(),
            empty_transaction_timeout_ms: non_neg_integer()
          ]
        ) :: Supervisor.child_spec() | no_return()
  def child_spec(opts) do
    # Required options are checked in this order, so the first missing one is the one reported.
    cluster = require_option!(opts, :cluster)
    director = require_option!(opts, :director)
    epoch = require_option!(opts, :epoch)
    lock_token = require_option!(opts, :lock_token)
    instance = require_option!(opts, :instance)

    # Positional layout must match the tuple destructured by init/1.
    init_arg =
      {cluster, director, epoch, opts[:max_latency_in_ms] || 1, opts[:max_per_batch] || 10,
       opts[:empty_transaction_timeout_ms] || 1_000, lock_token}

    %{
      id: {__MODULE__, cluster, epoch, instance},
      start: {GenServer, :start_link, [__MODULE__, init_arg]},
      restart: :temporary
    }
  end

  # Fetches a mandatory option, raising with the option's name when it is absent.
  @spec require_option!(Access.t(), atom()) :: term() | no_return()
  defp require_option!(opts, key) do
    opts[key] || raise "Missing #{inspect(key)} option"
  end
101

102
  @impl true
  @spec init({module(), pid(), Bedrock.epoch(), non_neg_integer(), pos_integer(), non_neg_integer(), binary()}) ::
          {:ok, State.t(), timeout()}
  def init({cluster, director, epoch, max_latency_in_ms, max_per_batch, empty_transaction_timeout_ms, lock_token}) do
    # The director's lifetime bounds ours: its :DOWN message is handled in handle_info/2.
    Process.monitor(director)

    trace_metadata(%{cluster: cluster, pid: self()})

    state = %State{
      cluster: cluster,
      director: director,
      epoch: epoch,
      max_latency_in_ms: max_latency_in_ms,
      max_per_batch: max_per_batch,
      empty_transaction_timeout_ms: empty_transaction_timeout_ms,
      lock_token: lock_token
    }

    # The initial idle timeout is the empty-transaction timeout.
    {:ok, state, empty_transaction_timeout_ms}
  end
124

125
  @impl true
  @spec terminate(term(), State.t()) :: :ok
  def terminate(_reason, state) do
    # Callers still waiting on the in-flight batch are answered with {:error, :abort}
    # rather than being left to hit their own call timeout.
    abort_current_batch(state)
    :ok
  end
131

132
  @impl true
  @spec handle_call(
          {:recover_from, binary(), map()} | {:commit, Bedrock.transaction()},
          GenServer.from(),
          State.t()
        ) ::
          {:reply, term(), State.t()} | {:noreply, State.t(), timeout() | {:continue, term()}}
  # Recovery hand-off: only matched while locked, and only honoured for the matching lock token.
  def handle_call({:recover_from, lock_token, transaction_system_layout}, _from, %{mode: :locked} = t) do
    if lock_token != t.lock_token do
      reply(t, {:error, :unauthorized})
    else
      # Precompute expensive static structures once at boot
      precomputed_layout = LayoutOptimization.precompute_from_layout(transaction_system_layout)

      running_state = %{
        t
        | transaction_system_layout: transaction_system_layout,
          precomputed_layout: precomputed_layout,
          mode: :running
      }

      reply(running_state, :ok)
    end
  end

  # Running but no layout has been installed: the commit is rejected without batching.
  def handle_call({:commit, transaction}, _from, %{mode: :running, transaction_system_layout: nil} = t)
      when is_binary(transaction) do
    reply(t, {:error, :no_transaction_system_layout})
  end

  def handle_call({:commit, transaction}, from, %{mode: :running} = t) when is_binary(transaction) do
    case start_batch_if_needed(t) do
      {:error, reason} ->
        # Answer the caller first; the exit below takes the whole process down.
        GenServer.reply(from, {:error, :abort})
        exit(reason)

      state_with_batch ->
        # Start resolver assignment task
        resolver_task = create_resolver_task(transaction, state_with_batch.precomputed_layout)

        batched =
          add_transaction_to_batch(state_with_batch, transaction, reply_fn(from), resolver_task)

        case apply_finalization_policy(batched) do
          # Batch stays open: a zero timeout lets already-queued messages run before :timeout fires.
          {next_state, nil} -> noreply(next_state, timeout: 0)
          {next_state, batch} -> noreply(next_state, continue: {:finalize, batch})
        end
    end
  end

  def handle_call({:commit, _transaction}, _from, %{mode: :locked} = t), do: reply(t, {:error, :locked})
×
184

185
  @impl true
  @spec handle_info(:timeout | {:DOWN, reference(), :process, pid(), term()} | term(), State.t()) ::
          {:noreply, State.t()}
          | {:noreply, State.t(), timeout() | {:continue, term()}}
          | {:stop, :normal, State.t()}
  # Idle timeout while running with no open batch: push an empty transaction through
  # finalization. An unavailable sequencer is unrecoverable here, so the process exits.
  def handle_info(:timeout, %{batch: nil, mode: :running} = t) do
    empty_transaction = Transaction.empty_transaction()

    case single_transaction_batch(t, empty_transaction) do
      {:ok, batch} ->
        noreply(t, continue: {:finalize, batch})

      {:error, :sequencer_unavailable} ->
        exit({:sequencer_unavailable, :timeout_empty_transaction})
    end
  end

  # Idle timeout while not running (no batch): nothing to do except re-arm the timer.
  def handle_info(:timeout, %{batch: nil} = t), do: noreply(t, timeout: t.empty_transaction_timeout_ms)

  # Timeout with an open batch: detach it from the state and finalize it.
  def handle_info(:timeout, %{batch: batch} = t), do: noreply(%{t | batch: nil}, continue: {:finalize, batch})

  def handle_info({:DOWN, _ref, :process, director_pid, _reason}, %{director: director_pid} = t) do
    # Director has died - this commit proxy should terminate gracefully
    {:stop, :normal, t}
  end

  # Any other message is ignored, but the GenServer timeout MUST be re-armed: returning a
  # bare {:noreply, state} would cancel it, so a stray message could otherwise leave an open
  # batch unfinalized (or stop empty transactions) until the next commit arrives.
  #
  # No batch open: restart the idle timer. This resets the full interval rather than
  # resuming the remainder, which only delays the next empty transaction.
  def handle_info(_msg, %{batch: nil} = t), do: noreply(t, timeout: t.empty_transaction_timeout_ms)

  # Batch open: use the same zero timeout the commit path arms after adding a transaction.
  def handle_info(_msg, t), do: noreply(t, timeout: 0)
212

213
  @impl true
  @spec handle_continue({:finalize, Batch.t()}, State.t()) :: {:noreply, State.t()}
  def handle_continue({:finalize, batch}, t) do
    trace_commit_proxy_batch_started(batch.commit_version, length(batch.buffer), Time.now_in_ms())

    finalize = fn ->
      finalize_batch(batch, t.transaction_system_layout, epoch: t.epoch, precomputed: t.precomputed_layout)
    end

    # :timer.tc/1 returns {elapsed_microseconds, result}; the elapsed time feeds the traces.
    case :timer.tc(finalize) do
      {elapsed_usec, {:ok, n_aborts, n_oks}} ->
        trace_commit_proxy_batch_finished(batch.commit_version, n_aborts, n_oks, elapsed_usec)
        maybe_set_empty_transaction_timeout(t)

      {elapsed_usec, {:error, reason}} ->
        # Every finalization failure is fatal: trace it, then exit the process.
        trace_commit_proxy_batch_failed(batch, reason, elapsed_usec)
        exit(finalization_exit_reason(reason))
    end
  end

  # Maps a finalization error to the process exit reason: recognised errors are used
  # verbatim, anything else is wrapped as {:unknown_error, reason}.
  @spec finalization_exit_reason(term()) :: term()
  defp finalization_exit_reason({:log_failures, _errors} = reason), do: reason
  defp finalization_exit_reason({:insufficient_acknowledgments, _count, _required, _errors} = reason), do: reason
  defp finalization_exit_reason({:resolver_unavailable, _reason} = reason), do: reason
  defp finalization_exit_reason({:storage_team_coverage_error, _key} = reason), do: reason
  defp finalization_exit_reason(reason), do: {:unknown_error, reason}
251

252
  @doc """
  Wraps `from` in a one-argument function that delivers its argument to that caller
  via `GenServer.reply/2`.
  """
  @spec reply_fn(GenServer.from()) :: Batch.reply_fn()
  def reply_fn(from) do
    fn response -> GenServer.reply(from, response) end
  end
×
254

255
  # Moved to Batching module to avoid duplication
256

257
  # Builds the :noreply result used after a successful finalization: the empty-transaction
  # timeout is armed only while the proxy is in :running mode.
  @spec maybe_set_empty_transaction_timeout(State.t()) :: {:noreply, State.t()}
  defp maybe_set_empty_transaction_timeout(t) do
    if match?(%{mode: :running}, t) do
      noreply(t, timeout: t.empty_transaction_timeout_ms)
    else
      noreply(t)
    end
  end
×
262

263
  # Replies {:error, :abort} to every caller of the open batch; a no-op when no batch is open.
  @spec abort_current_batch(State.t()) :: :ok
  defp abort_current_batch(%{batch: nil}), do: :ok

  defp abort_current_batch(%{batch: batch}) do
    for notify <- Batch.all_callers(batch), do: notify.({:error, :abort})
    :ok
  end
271

272
  @doc """
  Updates the transaction system layout if the transaction contains system layout information.

  When `transaction` is a `{reads, writes}` tuple whose `writes` map holds a binary under
  the key `"\\xff/system/transaction_system_layout"`, that binary is decoded with
  `:erlang.binary_to_term/1` and stored as the state's `:transaction_system_layout`.

  The state is returned unchanged when the transaction has any other shape, the key is
  absent, its value is not a binary, or the binary cannot be decoded.
  """
  @spec maybe_update_layout_from_transaction(State.t(), Bedrock.transaction()) :: State.t()
  def maybe_update_layout_from_transaction(state, {_reads, writes}) when is_map(writes) do
    layout_key = "\xff/system/transaction_system_layout"

    with encoded_layout when is_binary(encoded_layout) <- Map.get(writes, layout_key),
         {:ok, layout} <- decode_layout(encoded_layout) do
      # NOTE(review): only :transaction_system_layout is replaced; :precomputed_layout is not
      # refreshed here - confirm whether callers need the two kept in sync.
      %{state | transaction_system_layout: layout}
    else
      _ -> state
    end
  end

  def maybe_update_layout_from_transaction(state, _transaction), do: state

  # Decodes an encoded layout, returning :error for bytes that are not a valid external term.
  # Only the decode is guarded, and only against ArgumentError (what binary_to_term raises on
  # bad input), so unrelated failures are no longer silently swallowed.
  #
  # SECURITY(review): binary_to_term/1 on transaction-supplied bytes can create atoms and other
  # resources; if these writes can originate from untrusted clients, consider
  # :erlang.binary_to_term(bin, [:safe]) plus a size bound.
  @spec decode_layout(binary()) :: {:ok, term()} | :error
  defp decode_layout(encoded_layout) do
    {:ok, :erlang.binary_to_term(encoded_layout)}
  rescue
    ArgumentError -> :error
  end
2✔
301
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc