• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / 984a90d50e5214a5922f87eaa21b526a5950cda6-PR-43

06 Sep 2025 09:12PM UTC coverage: 63.695% (+0.02%) from 63.674%
984a90d50e5214a5922f87eaa21b526a5950cda6-PR-43

Pull #43

github

jallum
Merge remote-tracking branch 'origin/develop' into feature/rework_commit_proxy
Pull Request #43: Add conflict sharding with async resolver assignment

68 of 99 new or added lines in 8 files covered. (68.69%)

3 existing lines in 2 files now uncovered.

3265 of 5126 relevant lines covered (63.69%)

620.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.73
/lib/bedrock/data_plane/commit_proxy/server.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Server do
  @moduledoc """
  GenServer implementation of the Commit Proxy.

  ## Overview

  The Commit Proxy batches transaction requests from clients to optimize throughput while
  maintaining strict consistency guarantees. It coordinates with resolvers for conflict
  detection and logs for durable persistence.

  ## Lifecycle

  1. **Initialization**: Starts in `:locked` mode, waiting for recovery completion
  2. **Recovery**: Director calls `recover_from/3` to provide transaction system layout and unlock
  3. **Transaction Processing**: Accepts `:commit` calls, batches transactions, and finalizes
  4. **Empty Transaction Timeout**: Creates empty transactions during quiet periods to advance read versions

  ## Batching Strategy

  - **Size-based**: Batches finalize when reaching `max_per_batch` transactions
  - **Time-based**: Batches finalize after `max_latency_in_ms` milliseconds
  - **Immediate**: Single transactions may bypass batching for low-latency processing

  ## Timeout Mechanisms

  - **Fast timeout (0ms)**: Allows GenServer to process any queued `:commit` messages before
    finalizing the current batch, ensuring optimal batching efficiency
  - **Empty transaction timeout**: Creates empty transactions (`Transaction.empty_transaction/0`)
    during quiet periods to keep read versions advancing and provide system health checking

  A GenServer return without a timeout cancels any pending `:timeout`. Every callback therefore
  re-arms the appropriate one: 0ms while a batch is open, the empty-transaction interval
  otherwise.

  ## Error Handling

  Uses fail-fast recovery model where unrecoverable errors (sequencer unavailable, log failures)
  trigger process exit and Director-coordinated cluster recovery.
  """

  use GenServer

  import Bedrock.DataPlane.CommitProxy.Batching,
    only: [
      start_batch_if_needed: 1,
      add_transaction_to_batch: 4,
      apply_finalization_policy: 1,
      single_transaction_batch: 2,
      create_resolver_task: 2
    ]

  import Bedrock.DataPlane.CommitProxy.Finalization, only: [finalize_batch: 3]

  import Bedrock.DataPlane.CommitProxy.Telemetry,
    only: [
      trace_metadata: 1,
      trace_commit_proxy_batch_started: 3,
      trace_commit_proxy_batch_finished: 4,
      trace_commit_proxy_batch_failed: 3
    ]

  import Bedrock.Internal.GenServer.Replies

  alias Bedrock.Cluster
  alias Bedrock.DataPlane.CommitProxy.Batch
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
  alias Bedrock.DataPlane.CommitProxy.State
  alias Bedrock.DataPlane.Transaction
  alias Bedrock.Internal.Time

  @doc """
  Builds the supervisor child spec for one commit proxy instance.

  `:cluster`, `:director`, `:epoch`, `:lock_token` and `:instance` are required. Each raises
  when missing (or `nil`/`false`). Optional settings and their defaults:

  - `:max_latency_in_ms` — `1`
  - `:max_per_batch` — `10`
  - `:empty_transaction_timeout_ms` — `1_000`

  The child is `:temporary`, so its supervisor never restarts it.
  """
  @spec child_spec(
          opts :: [
            cluster: Cluster.t(),
            director: pid(),
            epoch: Bedrock.epoch(),
            lock_token: Bedrock.lock_token(),
            instance: non_neg_integer(),
            max_latency_in_ms: non_neg_integer(),
            max_per_batch: pos_integer(),
            empty_transaction_timeout_ms: non_neg_integer()
          ]
        ) :: Supervisor.child_spec() | no_return()
  def child_spec(opts) do
    cluster = opts[:cluster] || raise "Missing :cluster option"
    director = opts[:director] || raise "Missing :director option"
    epoch = opts[:epoch] || raise "Missing :epoch option"
    lock_token = opts[:lock_token] || raise "Missing :lock_token option"
    instance = opts[:instance] || raise "Missing :instance option"
    max_latency_in_ms = opts[:max_latency_in_ms] || 1
    max_per_batch = opts[:max_per_batch] || 10
    empty_transaction_timeout_ms = opts[:empty_transaction_timeout_ms] || 1_000

    %{
      id: {__MODULE__, cluster, epoch, instance},
      start:
        {GenServer, :start_link,
         [
           __MODULE__,
           {cluster, director, epoch, max_latency_in_ms, max_per_batch, empty_transaction_timeout_ms, lock_token}
         ]},
      restart: :temporary
    }
  end

  @impl true
  @spec init({module(), pid(), Bedrock.epoch(), non_neg_integer(), pos_integer(), non_neg_integer(), binary()}) ::
          {:ok, State.t(), timeout()}
  def init({cluster, director, epoch, max_latency_in_ms, max_per_batch, empty_transaction_timeout_ms, lock_token}) do
    # Tie our lifetime to the Director's: its :DOWN message (handled in handle_info/2) stops us.
    Process.monitor(director)

    trace_metadata(%{cluster: cluster, pid: self()})

    state = %State{
      cluster: cluster,
      director: director,
      epoch: epoch,
      max_latency_in_ms: max_latency_in_ms,
      max_per_batch: max_per_batch,
      empty_transaction_timeout_ms: empty_transaction_timeout_ms,
      lock_token: lock_token
    }

    # Arm the empty-transaction timer immediately; while locked it merely re-arms itself.
    {:ok, state, empty_transaction_timeout_ms}
  end

  @impl true
  @spec terminate(term(), State.t()) :: :ok
  def terminate(_reason, t) do
    # Reply `{:error, :abort}` to every caller still waiting in the open batch, so they get an
    # explicit error reply rather than only observing this process exit.
    abort_current_batch(t)
    :ok
  end

  @impl true
  @spec handle_call(
          {:recover_from, binary(), map()} | {:commit, Bedrock.transaction()},
          GenServer.from(),
          State.t()
        ) ::
          {:reply, term(), State.t()}
          | {:reply, term(), State.t(), timeout()}
          | {:noreply, State.t(), timeout() | {:continue, term()}}
  def handle_call({:recover_from, lock_token, transaction_system_layout}, _from, %{mode: :locked} = t) do
    if lock_token == t.lock_token do
      # Precompute expensive static structures once per recovery instead of once per commit.
      precomputed_layout = LayoutOptimization.precompute_from_layout(transaction_system_layout)

      unlocked = %{
        t
        | transaction_system_layout: transaction_system_layout,
          precomputed_layout: precomputed_layout,
          mode: :running
      }

      # Re-arm the timer: a plain reply would cancel it, and no empty transactions would be
      # produced until the first commit arrived.
      reply_with_idle_timeout(unlocked, :ok)
    else
      reply_with_idle_timeout(t, {:error, :unauthorized})
    end
  end

  def handle_call({:commit, transaction}, _from, %{mode: :running, transaction_system_layout: nil} = t)
      when is_binary(transaction),
      do: reply_with_idle_timeout(t, {:error, :no_transaction_system_layout})

  def handle_call({:commit, transaction}, from, %{mode: :running} = t) when is_binary(transaction) do
    case start_batch_if_needed(t) do
      {:error, reason} ->
        # Answer this caller before crashing; callers already in the open batch are answered
        # by terminate/2.
        GenServer.reply(from, {:error, :abort})
        exit(reason)

      updated_t ->
        # Kick off resolver assignment for this transaction as a task carried with the batch.
        task = create_resolver_task(transaction, updated_t.precomputed_layout)

        updated_t
        |> add_transaction_to_batch(transaction, reply_fn(from), task)
        |> apply_finalization_policy()
        |> case do
          # Batch still open: a 0ms timeout lets already-queued :commit messages join it first.
          {t, nil} -> noreply(t, timeout: 0)
          {t, batch} -> noreply(t, continue: {:finalize, batch})
        end
    end
  end

  def handle_call({:commit, _transaction}, _from, %{mode: :locked} = t),
    do: reply_with_idle_timeout(t, {:error, :locked})

  @impl true
  @spec handle_info(term(), State.t()) ::
          {:noreply, State.t()}
          | {:noreply, State.t(), timeout() | {:continue, term()}}
          | {:stop, :normal, State.t()}
  def handle_info(:timeout, %{batch: nil, mode: :running} = t) do
    # Quiet period while running: commit an empty transaction so read versions keep advancing.
    empty_transaction = Transaction.empty_transaction()

    case single_transaction_batch(t, empty_transaction) do
      {:ok, batch} ->
        noreply(t, continue: {:finalize, batch})

      {:error, :sequencer_unavailable} ->
        exit({:sequencer_unavailable, :timeout_empty_transaction})
    end
  end

  # Not running (e.g. still :locked): nothing to commit, just keep the timer alive.
  def handle_info(:timeout, %{batch: nil} = t), do: noreply(t, timeout: t.empty_transaction_timeout_ms)

  # The mailbox drained while a batch was open: finalize it now.
  def handle_info(:timeout, %{batch: batch} = t), do: noreply(%{t | batch: nil}, continue: {:finalize, batch})

  def handle_info({:DOWN, _ref, :process, director_pid, _reason}, %{director: director_pid} = t) do
    # The Director has died; this commit proxy stops with it.
    {:stop, :normal, t}
  end

  def handle_info(_msg, t) do
    # Ignore unexpected messages, but keep the pending timeout alive: returning a bare
    # {:noreply, t} would cancel it, stranding an open batch (and its blocked callers) or
    # halting empty transactions.
    #
    # NOTE(review): if create_resolver_task/2 is Task.async-based, its reply/:DOWN messages
    # land in this mailbox and could be consumed here before finalization awaits them —
    # confirm against the Batching/Finalization modules.
    {:noreply, t, idle_timeout(t)}
  end

  @impl true
  @spec handle_continue({:finalize, Batch.t()}, State.t()) ::
          {:noreply, State.t()} | {:noreply, State.t(), timeout()}
  def handle_continue({:finalize, batch}, t) do
    trace_commit_proxy_batch_started(batch.commit_version, length(batch.buffer), Time.now_in_ms())

    case :timer.tc(fn ->
           finalize_batch(batch, t.transaction_system_layout, epoch: t.epoch, precomputed: t.precomputed_layout)
         end) do
      {n_usec, {:ok, n_aborts, n_oks}} ->
        trace_commit_proxy_batch_finished(batch.commit_version, n_aborts, n_oks, n_usec)
        maybe_set_empty_transaction_timeout(t)

      {n_usec, {:error, reason}} ->
        # Fail fast: every finalization error is fatal; recovery is coordinated by the Director.
        trace_commit_proxy_batch_failed(batch, reason, n_usec)
        exit(finalization_exit_reason(reason))
    end
  end

  @doc """
  Returns a one-argument function that sends its argument as the reply to `from`.
  """
  @spec reply_fn(GenServer.from()) :: Batch.reply_fn()
  def reply_fn(from), do: &GenServer.reply(from, &1)

  # Maps a finalization error to the exit reason used when crashing. Known failure tuples are
  # used verbatim; anything else is wrapped as {:unknown_error, reason}.
  @spec finalization_exit_reason(term()) :: term()
  defp finalization_exit_reason({:log_failures, _errors} = reason), do: reason
  defp finalization_exit_reason({:insufficient_acknowledgments, _count, _required, _errors} = reason), do: reason
  defp finalization_exit_reason({:resolver_unavailable, _why} = reason), do: reason
  defp finalization_exit_reason({:storage_team_coverage_error, _key} = reason), do: reason
  defp finalization_exit_reason(reason), do: {:unknown_error, reason}

  # After a finalization, re-arm the timeout while running (0ms if a batch is somehow still
  # open, otherwise the empty-transaction interval); outside :running no timeout is set.
  @spec maybe_set_empty_transaction_timeout(State.t()) :: {:noreply, State.t()} | {:noreply, State.t(), timeout()}
  defp maybe_set_empty_transaction_timeout(%{mode: :running} = t),
    do: noreply(t, timeout: idle_timeout(t))

  defp maybe_set_empty_transaction_timeout(t), do: noreply(t)

  # The timeout to arm after handling a message: 0ms while a batch is open (finalize once the
  # mailbox drains), otherwise the empty-transaction interval.
  @spec idle_timeout(State.t()) :: timeout()
  defp idle_timeout(%{batch: nil} = t), do: t.empty_transaction_timeout_ms
  defp idle_timeout(_t), do: 0

  # Replies to a call while re-arming the idle timeout, which a plain reply would cancel.
  @spec reply_with_idle_timeout(State.t(), term()) :: {:reply, term(), State.t(), timeout()}
  defp reply_with_idle_timeout(t, result), do: {:reply, result, t, idle_timeout(t)}

  # Sends {:error, :abort} to every caller registered in the open batch, if there is one.
  @spec abort_current_batch(State.t()) :: :ok
  defp abort_current_batch(%{batch: nil}), do: :ok

  defp abort_current_batch(%{batch: batch}) do
    batch
    |> Batch.all_callers()
    |> Enum.each(fn reply_fn -> reply_fn.({:error, :abort}) end)
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc