jallum/bedrock · commit 022bf889f44fabf5818028244ac5e8e953e1a798

18 Aug 2025 02:00AM UTC coverage: 62.424% (+2.5%) from 59.888%

push · github · web-flow

Feature/31 (#35)

# Major Transaction System Overhaul

This branch represents a **fundamental rearchitecture of Bedrock's
transaction processing system**, developed in three main phases that
span performance, format unification, reliability, and architectural
improvements.

## Phase 1: Transaction Format Unification (fd66e2d)
**Rationale**: Replace the fragmented, inconsistent transaction encoding
system with a unified, robust approach.

**Key Changes**:
- **Consolidated Transaction Format**: Merged `read_conflicts` and
  `read_version` into a single tuple format, eliminating redundancy
- **New BedrockTransaction Module**: Replaced the limited
  `EncodedTransaction` with a comprehensive 971-line module featuring:
  - Tagged binary encoding with self-describing sections
  - CRC validation for data integrity
  - Order-independent sections for better extensibility
  - Efficient partial decoding capabilities
- **Removed Legacy Code**: Eliminated obsolete resolver recovery logic
  and transaction fragmentation
- **Comprehensive Testing**: Added extensive test coverage, including
  binary integration tests
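Here is a small sketch of the tagged, CRC-checked, order-independent sections with partial decoding. Every detail is an assumption for illustration and does not come from the real `BedrockTransaction` wire format: the module name `TaggedSections`, one-byte tags, the 32-bit length prefix, and the trailing CRC-32 computed with `:erlang.crc32/1`.

```elixir
defmodule TaggedSections do
  @moduledoc """
  Hypothetical tagged-section binary format (NOT Bedrock's actual encoding).

  Layout: one or more `<<tag::8, len::32, payload::binary-size(len)>>`
  sections, followed by a CRC-32 of all section bytes.
  """

  @doc "Encodes `%{tag => payload}` (tags 0..255) and appends a CRC-32 trailer."
  @spec encode(%{optional(0..255) => binary()}) :: binary()
  def encode(sections) when is_map(sections) do
    body =
      for {tag, payload} <- sections, into: <<>> do
        <<tag::8, byte_size(payload)::32, payload::binary>>
      end

    <<body::binary, :erlang.crc32(body)::32>>
  end

  @doc "Checks the CRC, then scans section headers for `tag`, skipping other payloads."
  @spec fetch(binary(), 0..255) :: {:ok, binary()} | :error | {:error, :crc_mismatch | :truncated}
  def fetch(encoded, tag) when is_binary(encoded) and byte_size(encoded) >= 4 do
    body_size = byte_size(encoded) - 4
    <<body::binary-size(body_size), crc::32>> = encoded

    if :erlang.crc32(body) == crc, do: find(body, tag), else: {:error, :crc_mismatch}
  end

  def fetch(_encoded, _tag), do: {:error, :truncated}

  # Matching section: return its payload without decoding anything after it.
  defp find(<<t::8, len::32, payload::binary-size(len), _rest::binary>>, tag) when t == tag,
    do: {:ok, payload}

  # Any other well-formed section: jump over its payload using the length prefix.
  defp find(<<_t::8, len::32, _skipped::binary-size(len), rest::binary>>, tag), do: find(rest, tag)

  # End of input (or a malformed tail): the tag is not present.
  defp find(_rest, _tag), do: :error
end
```

`fetch/2` skips sections using their length prefixes, so a reader that needs one section never decodes the others. Because of that, the sections can appear in any order.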

## Phase 2: Enhanced Transaction Processing (f96a1ed)
**Rationale**: Address reliability and robustness gaps in transaction
handling.

**Key Improvements**:
- **Transaction Validation**: Added validation in the resolver to ensure
  proper transaction format before processing
- **Timeout Management**: Implemented a `WaitingList` module for
  systematic timeout handling across components
- **Error Resilience**: Enhanced error handling for Director
  notifications and unavailable states
- **Standardization**: Moved `require Logger` statements to module level
  for consistency
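This report describes `WaitingList` only as systematic timeout handling. One common way to do that is to store each pending request with an absolute deadline, expire overdue entries by replying `{:error, :timeout}`, and derive the next GenServer timeout from the earliest deadline. The sketch below takes that approach. The module `DeadlineList` and all of its functions are hypothetical, and the real `WaitingList` API may differ.

```elixir
defmodule DeadlineList do
  @moduledoc """
  Hypothetical deadline tracker in the spirit of a `WaitingList`
  (NOT Bedrock's API). Entries map an id to `{deadline_ms, reply_fn}`.
  """

  @type reply_fn :: (term() -> any())
  @type t :: %{optional(term()) => {integer(), reply_fn()}}

  @spec new() :: t()
  def new, do: %{}

  @doc "Registers `id`, due `timeout_ms` after `now_ms`."
  @spec insert(t(), term(), non_neg_integer(), integer(), reply_fn()) :: t()
  def insert(waiting, id, timeout_ms, now_ms, reply_fn),
    do: Map.put(waiting, id, {now_ms + timeout_ms, reply_fn})

  @doc "Removes `id` (e.g. when its response arrives) and returns its reply function."
  @spec remove(t(), term()) :: {reply_fn() | nil, t()}
  def remove(waiting, id) do
    case Map.pop(waiting, id) do
      {{_deadline, reply_fn}, rest} -> {reply_fn, rest}
      {nil, rest} -> {nil, rest}
    end
  end

  @doc "Replies `{:error, :timeout}` to every entry whose deadline has passed and drops it."
  @spec expire(t(), integer()) :: t()
  def expire(waiting, now_ms) do
    {expired, live} = Enum.split_with(waiting, fn {_id, {deadline, _}} -> deadline <= now_ms end)
    Enum.each(expired, fn {_id, {_deadline, reply_fn}} -> reply_fn.({:error, :timeout}) end)
    Map.new(live)
  end

  @doc "Milliseconds until the earliest deadline, usable as a GenServer timeout."
  @spec next_timeout(t(), integer()) :: timeout()
  def next_timeout(waiting, _now_ms) when map_size(waiting) == 0, do: :infinity

  def next_timeout(waiting, now_ms) do
    earliest = waiting |> Map.values() |> Enum.map(&elem(&1, 0)) |> Enum.min()
    max(earliest - now_ms, 0)
  end
end
```

A GenServer could return `next_timeout/2` as its callback timeout so that it wakes when the earliest request is due. Any incoming message cancels a pending GenServer timeout, so the value would be recomputed in every callback.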

## Phase 3: Commit Proxy Rework & System-Wide Improvements (dc6a048)
**Rationale**: Optimize transaction distribution and improve
observability across the data plane.

**Major Architectural Changes**:
- **Single Transaction Per Log**: Reworked commit proxy to produce one
  transaction per ... (continued)

817 of 1213 new or added lines in 110 files covered (67.35%).

40 existing lines in 21 files now uncovered.

2369 of 3795 relevant lines covered (62.42%)

871.02 hits per line

Source File

/lib/bedrock/data_plane/commit_proxy/server.ex (40.35% covered)
defmodule Bedrock.DataPlane.CommitProxy.Server do
  @moduledoc """
  GenServer implementation of the Commit Proxy.

  ## Overview

  The Commit Proxy batches transaction requests from clients to optimize throughput while
  maintaining strict consistency guarantees. It coordinates with resolvers for conflict
  detection and logs for durable persistence.

  ## Lifecycle

  1. **Initialization**: Starts in `:locked` mode, waiting for recovery completion
  2. **Recovery**: Director calls `recover_from/3` to provide transaction system layout and unlock
  3. **Transaction Processing**: Accepts `:commit` calls, batches transactions, and finalizes
  4. **Empty Transaction Timeout**: Creates empty transactions during quiet periods to advance read versions

  ## Batching Strategy

  - **Size-based**: Batches finalize when reaching `max_per_batch` transactions
  - **Time-based**: Batches finalize after `max_latency_in_ms` milliseconds
  - **Immediate**: Single transactions may bypass batching for low-latency processing

  ## Timeout Mechanisms

  - **Fast timeout (0ms)**: Lets the GenServer process any already-queued `:commit` messages
    before finalizing the current batch, so concurrent commits share a batch
  - **Empty transaction timeout**: Creates empty transactions (no mutations) during quiet periods
    to keep read versions advancing and to serve as a system health check

  ## Error Handling

  Uses a fail-fast recovery model in which unrecoverable errors (sequencer unavailable, log
  failures) trigger a process exit and Director-coordinated cluster recovery.
  """

  use GenServer

  import Bedrock.DataPlane.CommitProxy.Batching,
    only: [
      start_batch_if_needed: 1,
      add_transaction_to_batch: 3,
      apply_finalization_policy: 1,
      single_transaction_batch: 2
    ]

  import Bedrock.DataPlane.CommitProxy.Finalization, only: [finalize_batch: 3]

  import Bedrock.DataPlane.CommitProxy.Telemetry,
    only: [
      trace_metadata: 1,
      trace_commit_proxy_batch_started: 3,
      trace_commit_proxy_batch_finished: 4,
      trace_commit_proxy_batch_failed: 3
    ]

  import Bedrock.Internal.GenServer.Replies

  alias Bedrock.Cluster
  alias Bedrock.DataPlane.CommitProxy.Batch
  alias Bedrock.DataPlane.CommitProxy.State
  alias Bedrock.DataPlane.Transaction
  alias Bedrock.Internal.Time

  @spec child_spec(
          opts :: [
            cluster: Cluster.t(),
            director: pid(),
            epoch: Bedrock.epoch(),
            lock_token: Bedrock.lock_token(),
            max_latency_in_ms: non_neg_integer(),
            max_per_batch: pos_integer(),
            empty_transaction_timeout_ms: non_neg_integer()
          ]
        ) :: Supervisor.child_spec()
  def child_spec(opts) do
    cluster = opts[:cluster] || raise "Missing :cluster option"
    director = opts[:director] || raise "Missing :director option"
    epoch = opts[:epoch] || raise "Missing :epoch option"
    lock_token = opts[:lock_token] || raise "Missing :lock_token option"
    max_latency_in_ms = opts[:max_latency_in_ms] || 1
    max_per_batch = opts[:max_per_batch] || 10
    empty_transaction_timeout_ms = opts[:empty_transaction_timeout_ms] || 1_000

    %{
      id: __MODULE__,
      start:
        {GenServer, :start_link,
         [
           __MODULE__,
           {cluster, director, epoch, max_latency_in_ms, max_per_batch, empty_transaction_timeout_ms, lock_token}
         ]},
      restart: :temporary
    }
  end

  @impl true
  @spec init({module(), pid(), Bedrock.epoch(), non_neg_integer(), pos_integer(), non_neg_integer(), binary()}) ::
          {:ok, State.t(), timeout()}
  def init({cluster, director, epoch, max_latency_in_ms, max_per_batch, empty_transaction_timeout_ms, lock_token}) do
    trace_metadata(%{cluster: cluster, pid: self()})

    then(
      %State{
        cluster: cluster,
        director: director,
        epoch: epoch,
        max_latency_in_ms: max_latency_in_ms,
        max_per_batch: max_per_batch,
        empty_transaction_timeout_ms: empty_transaction_timeout_ms,
        lock_token: lock_token
      },
      &{:ok, &1, empty_transaction_timeout_ms}
    )
  end

  @impl true
  @spec terminate(term(), State.t()) :: :ok
  def terminate(_reason, t) do
    abort_current_batch(t)
    :ok
  end

  @impl true
  @spec handle_call(
          {:recover_from, binary(), map()} | {:commit, Bedrock.transaction()},
          GenServer.from(),
          State.t()
        ) ::
          {:reply, term(), State.t()} | {:noreply, State.t(), timeout() | {:continue, term()}}
  def handle_call({:recover_from, lock_token, transaction_system_layout}, _from, %{mode: :locked} = t) do
    if lock_token == t.lock_token do
      reply(%{t | transaction_system_layout: transaction_system_layout, mode: :running}, :ok)
    else
      reply(t, {:error, :unauthorized})
    end
  end

  def handle_call({:commit, transaction}, _from, %{mode: :running, transaction_system_layout: nil} = t)
      when is_binary(transaction) do
    reply(t, {:error, :no_transaction_system_layout})
  end

  def handle_call({:commit, transaction}, from, %{mode: :running} = t) when is_binary(transaction) do
    case start_batch_if_needed(t) do
      {:error, reason} ->
        GenServer.reply(from, {:error, :abort})
        exit(reason)

      updated_t ->
        updated_t
        |> add_transaction_to_batch(transaction, reply_fn(from))
        |> apply_finalization_policy()
        |> case do
          {t, nil} -> noreply(t, timeout: 0)
          {t, batch} -> noreply(t, continue: {:finalize, batch})
        end
    end
  end

  def handle_call({:commit, _transaction}, _from, %{mode: :locked} = t), do: reply(t, {:error, :locked})

  @impl true
  @spec handle_info(:timeout, State.t()) ::
          {:noreply, State.t()} | {:noreply, State.t(), {:continue, term()}}
  def handle_info(:timeout, %{batch: nil, mode: :running} = t) do
    empty_transaction = Transaction.encode(%{mutations: []})

    case single_transaction_batch(t, empty_transaction) do
      {:ok, batch} ->
        noreply(t, continue: {:finalize, batch})

      {:error, :sequencer_unavailable} ->
        exit({:sequencer_unavailable, :timeout_empty_transaction})
    end
  end

  def handle_info(:timeout, %{batch: nil} = t), do: noreply(t, timeout: t.empty_transaction_timeout_ms)

  def handle_info(:timeout, %{batch: batch} = t), do: noreply(%{t | batch: nil}, continue: {:finalize, batch})

  @impl true
  @spec handle_continue({:finalize, Batch.t()}, State.t()) :: {:noreply, State.t()}
  def handle_continue({:finalize, batch}, t) do
    trace_commit_proxy_batch_started(batch.commit_version, length(batch.buffer), Time.now_in_ms())

    case :timer.tc(fn -> finalize_batch(batch, t.transaction_system_layout, epoch: t.epoch) end) do
      {n_usec, {:ok, n_aborts, n_oks}} ->
        trace_commit_proxy_batch_finished(batch.commit_version, n_aborts, n_oks, n_usec)
        maybe_set_empty_transaction_timeout(t)

      {n_usec, {:error, {:log_failures, errors}}} ->
        trace_commit_proxy_batch_failed(batch, {:log_failures, errors}, n_usec)
        exit({:log_failures, errors})

      {n_usec, {:error, {:insufficient_acknowledgments, count, required, errors}}} ->
        trace_commit_proxy_batch_failed(
          batch,
          {:insufficient_acknowledgments, count, required, errors},
          n_usec
        )

        exit({:insufficient_acknowledgments, count, required, errors})

      {n_usec, {:error, {:resolver_unavailable, reason}}} ->
        trace_commit_proxy_batch_failed(batch, {:resolver_unavailable, reason}, n_usec)
        exit({:resolver_unavailable, reason})

      {n_usec, {:error, {:storage_team_coverage_error, key}}} ->
        trace_commit_proxy_batch_failed(batch, {:storage_team_coverage_error, key}, n_usec)
        exit({:storage_team_coverage_error, key})

      {n_usec, {:error, reason}} ->
        trace_commit_proxy_batch_failed(batch, reason, n_usec)
        exit({:unknown_error, reason})
    end
  end

  @spec reply_fn(GenServer.from()) :: Batch.reply_fn()
  def reply_fn(from), do: &GenServer.reply(from, &1)

  @spec maybe_set_empty_transaction_timeout(State.t()) :: {:noreply, State.t()}
  defp maybe_set_empty_transaction_timeout(%{mode: :running} = t),
    do: noreply(t, timeout: t.empty_transaction_timeout_ms)

  defp maybe_set_empty_transaction_timeout(t), do: noreply(t)

  @spec abort_current_batch(State.t()) :: :ok
  defp abort_current_batch(%{batch: nil}), do: :ok

  defp abort_current_batch(%{batch: batch}) do
    batch
    |> Batch.all_callers()
    |> Enum.each(fn reply_fn -> reply_fn.({:error, :abort}) end)
  end

  @doc """
  Updates the transaction system layout if the transaction contains system layout information.

  This function extracts the transaction system layout from system transactions that contain
  the layout key and updates the commit proxy's state accordingly.
  """
  @spec maybe_update_layout_from_transaction(State.t(), Bedrock.transaction()) :: State.t()
  def maybe_update_layout_from_transaction(state, {_reads, writes}) when is_map(writes) do
    layout_key = "\xff/system/transaction_system_layout"

    case Map.get(writes, layout_key) do
      nil ->
        state

      encoded_layout when is_binary(encoded_layout) ->
        try do
          layout = :erlang.binary_to_term(encoded_layout)
          %{state | transaction_system_layout: layout}
        rescue
          _ ->
            state
        end

      _ ->
        state
    end
  end

  def maybe_update_layout_from_transaction(state, _transaction), do: state
end
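The batching strategy and timeout mechanisms in the moduledoc above can be illustrated with a toy GenServer. This is a sketch under stated assumptions, not the commit proxy itself. `ToyBatcher`, its options, and its `{:ok, batch_size}` replies are hypothetical, and sequencing, conflict resolution, and logging are omitted. The sketch shows three behaviors:

- size-based finalization;
- the 0 ms fast timeout, which lets already-queued commits join the current batch;
- an idle timeout that records an empty batch during quiet periods.

The time-based `max_latency_in_ms` limit is not modeled.

```elixir
defmodule ToyBatcher do
  @moduledoc """
  Hypothetical batcher illustrating size-based finalization, the 0 ms
  "fast timeout", and an idle heartbeat. NOT Bedrock code: finalizing a batch
  just replies `{:ok, batch_size}` to every caller in it.
  """
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def commit(pid, tx), do: GenServer.call(pid, {:commit, tx})
  def batch_sizes(pid), do: GenServer.call(pid, :batch_sizes)

  @impl true
  def init(opts) do
    state = %{
      buffer: [],
      finalized: [],
      max_per_batch: Keyword.get(opts, :max_per_batch, 10),
      idle_timeout_ms: Keyword.get(opts, :idle_timeout_ms, 1_000)
    }

    {:ok, state, state.idle_timeout_ms}
  end

  @impl true
  def handle_call({:commit, tx}, from, state) do
    state = %{state | buffer: [{from, tx} | state.buffer]}

    if length(state.buffer) >= state.max_per_batch do
      # Size-based: the batch is full, so finalize immediately.
      {:noreply, finalize(state), state.idle_timeout_ms}
    else
      # Fast timeout: :timeout fires only if no other message is already waiting,
      # so commits that are queued in the mailbox join this batch first.
      {:noreply, state, 0}
    end
  end

  def handle_call(:batch_sizes, _from, state),
    do: {:reply, Enum.reverse(state.finalized), state, timeout_for(state)}

  @impl true
  def handle_info(:timeout, %{buffer: []} = state) do
    # Quiet period: record an empty batch, standing in for an empty transaction.
    {:noreply, %{state | finalized: [0 | state.finalized]}, state.idle_timeout_ms}
  end

  def handle_info(:timeout, state), do: {:noreply, finalize(state), state.idle_timeout_ms}

  # Replies to every buffered caller with the batch size and records the batch.
  defp finalize(%{buffer: buffer} = state) do
    size = length(buffer)
    Enum.each(buffer, fn {from, _tx} -> GenServer.reply(from, {:ok, size}) end)
    %{state | buffer: [], finalized: [size | state.finalized]}
  end

  # Keep the fast timeout armed while a batch is pending.
  defp timeout_for(%{buffer: []} = state), do: state.idle_timeout_ms
  defp timeout_for(_state), do: 0
end
```

Replies are sent from `finalize/1` rather than directly from `handle_call/3`. This is loosely similar to how the server above stores a `reply_fn` for each caller and later answers every caller in the batch.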