• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / d7ac8959925c4bae173515eb4fdfcb27517b289f-PR-51

16 Sep 2025 03:11AM UTC coverage: 65.411% (+0.001%) from 65.41%
d7ac8959925c4bae173515eb4fdfcb27517b289f-PR-51

Pull #51

github

jallum
Repo calls are all implicit, like ecto.
Pull Request #51: Integrating feedback on the API

81 of 103 new or added lines in 3 files covered. (78.64%)

6 existing lines in 3 files now uncovered.

3856 of 5895 relevant lines covered (65.41%)

668.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.92
/lib/bedrock/data_plane/commit_proxy/server.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Server do
2
  @moduledoc """
3
  GenServer implementation of the Commit Proxy.
4

5
  ## Overview
6

7
  The Commit Proxy batches transaction requests from clients to optimize throughput while
8
  maintaining strict consistency guarantees. It coordinates with resolvers for conflict
9
  detection and logs for durable persistence.
10

11
  ## Lifecycle
12

13
  1. **Initialization**: Starts in `:locked` mode, waiting for recovery completion
14
  2. **Recovery**: Director calls `recover_from/3` to provide transaction system layout and unlock
15
  3. **Transaction Processing**: Accepts `:commit` calls, batches transactions, and finalizes
16
  4. **Empty Transaction Timeout**: Creates empty transactions during quiet periods to advance read versions
17

18
  ## Batching Strategy
19

20
  - **Size-based**: Batches finalize when reaching `max_per_batch` transactions
21
  - **Time-based**: Batches finalize after `max_latency_in_ms` milliseconds
22
  - **Immediate**: Single transactions may bypass batching for low-latency processing
23

24
  ## Timeout Mechanisms
25

26
  - **Fast timeout (0ms)**: Allows GenServer to process any queued `:commit` messages before
27
    finalizing the current batch, ensuring optimal batching efficiency
28
  - **Empty transaction timeout**: Creates empty `{nil, %{}}` transactions during quiet periods
29
    to keep read versions advancing and provide system health checking
30

31
  ## Error Handling
32

33
  Uses a fail-fast recovery model in which unrecoverable errors (sequencer unavailable, log failures)
34
  trigger process exit and Director-coordinated cluster recovery.
35
  """
36

37
  use GenServer
38

39
  import Bedrock.DataPlane.CommitProxy.Batching,
40
    only: [
41
      start_batch_if_needed: 1,
42
      apply_finalization_policy: 1,
43
      add_transaction_to_batch: 4,
44
      single_transaction_batch: 2
45
    ]
46

47
  import Bedrock.DataPlane.CommitProxy.Finalization, only: [finalize_batch: 3]
48

49
  import Bedrock.DataPlane.CommitProxy.Telemetry,
50
    only: [trace_metadata: 0, trace_metadata: 1]
51

52
  import Bedrock.Internal.GenServer.Replies
53

54
  alias Bedrock.Cluster
55
  alias Bedrock.DataPlane.CommitProxy.Batch
56
  # ConflictSharding moved to Batching module
57
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
58
  alias Bedrock.DataPlane.CommitProxy.State
59
  alias Bedrock.DataPlane.Transaction
60

61
  @spec child_spec(
62
          opts :: [
63
            cluster: Cluster.t(),
64
            director: pid(),
65
            epoch: Bedrock.epoch(),
66
            lock_token: Bedrock.lock_token(),
67
            instance: non_neg_integer(),
68
            max_latency_in_ms: non_neg_integer(),
69
            max_per_batch: pos_integer(),
70
            empty_transaction_timeout_ms: non_neg_integer()
71
          ]
72
        ) :: Supervisor.child_spec() | no_return()
73
  def child_spec(opts) do
74
    cluster = opts[:cluster] || raise "Missing :cluster option"
21✔
75
    director = opts[:director] || raise "Missing :director option"
21✔
76
    epoch = opts[:epoch] || raise "Missing :epoch option"
21✔
77
    lock_token = opts[:lock_token] || raise "Missing :lock_token option"
21✔
78
    instance = opts[:instance] || raise "Missing :instance option"
21✔
79
    max_latency_in_ms = opts[:max_latency_in_ms] || 4
21✔
80
    max_per_batch = opts[:max_per_batch] || 32
21✔
81
    empty_transaction_timeout_ms = opts[:empty_transaction_timeout_ms] || 1_000
21✔
82

83
    %{
21✔
84
      id: {__MODULE__, cluster, epoch, instance},
85
      start:
86
        {GenServer, :start_link,
87
         [
88
           __MODULE__,
89
           {cluster, director, epoch, max_latency_in_ms, max_per_batch, empty_transaction_timeout_ms, lock_token}
90
         ]},
91
      restart: :temporary
92
    }
93
  end
94

95
  @impl true
  @spec init({module(), pid(), Bedrock.epoch(), non_neg_integer(), pos_integer(), non_neg_integer(), binary()}) ::
          {:ok, State.t(), timeout()}
  def init({cluster, director, epoch, max_latency_in_ms, max_per_batch, empty_transaction_timeout_ms, lock_token}) do
    # Watch the Director: once it goes down this commit proxy is expected to terminate.
    Process.monitor(director)

    trace_metadata(%{cluster: cluster, pid: self()})

    initial_state = %State{
      cluster: cluster,
      director: director,
      epoch: epoch,
      max_latency_in_ms: max_latency_in_ms,
      max_per_batch: max_per_batch,
      empty_transaction_timeout_ms: empty_transaction_timeout_ms,
      lock_token: lock_token
    }

    # The third element arms the GenServer idle timeout, so a :timeout message is
    # delivered if nothing else arrives within empty_transaction_timeout_ms.
    {:ok, initial_state, empty_transaction_timeout_ms}
  end
117

118
  @impl true
  @spec terminate(term(), State.t()) :: :ok
  def terminate(_reason, %State{} = state) do
    # Callers still parked in the open batch (if any) are told their commit was aborted.
    _ = abort_current_batch(state)
    :ok
  end
124

125
  @impl true
  @spec handle_call(
          {:recover_from, binary(), map()} | {:commit, Bedrock.transaction()},
          GenServer.from(),
          State.t()
        ) ::
          {:reply, term(), State.t()}
          | {:reply, term(), State.t(), timeout()}
          | {:noreply, State.t(), timeout() | {:continue, term()}}
  # :recover_from while locked — adopt the supplied transaction system layout and
  # switch to :running, but only when the caller presents the lock token this
  # proxy was started with. A mismatching token yields {:error, :unauthorized}
  # and leaves the state untouched.
  def handle_call({:recover_from, lock_token, transaction_system_layout}, _from, %{mode: :locked} = t) do
    if lock_token == t.lock_token do
      running_t = %{
        t
        | transaction_system_layout: transaction_system_layout,
          precomputed_layout: LayoutOptimization.precompute_from_layout(transaction_system_layout),
          mode: :running
      }

      # Receiving this call cancelled the idle timeout armed by init/1. Re-arm it
      # with the reply; otherwise no :timeout is delivered — and therefore no
      # empty transaction is created — until the first :commit arrives.
      {:reply, :ok, running_t, running_t.empty_transaction_timeout_ms}
    else
      reply(t, {:error, :unauthorized})
    end
  end
149

150
  # :commit while running with no transaction system layout set: refuse the
  # request without touching the batch.
  def handle_call({:commit, transaction}, _from, %{mode: :running, transaction_system_layout: nil} = t)
      when is_binary(transaction),
      do: reply(t, {:error, :no_transaction_system_layout})

  # :commit while running: queue the transaction on the current batch (opening one
  # if necessary). The caller is answered later through its reply function, so
  # every successful path returns a :noreply tuple.
  def handle_call({:commit, transaction}, from, %{mode: :running} = t) when is_binary(transaction) do
    case start_batch_if_needed(t) do
      {:error, reason} ->
        # Answer the caller before exiting so it is not left waiting on a dead process.
        GenServer.reply(from, {:error, :abort})
        exit(reason)

      started_t ->
        batched_t = add_transaction_to_batch(started_t, transaction, reply_fn(from), nil)

        case apply_finalization_policy(batched_t) do
          {pending_t, nil} ->
            # Nothing to finalize yet. A zero timeout lets already-queued messages
            # be handled before the :timeout fires.
            noreply(pending_t, timeout: 0)

          {next_t, ready_batch} ->
            # The policy handed back a batch: finalize it in a separate task and
            # carry on with the state it returned.
            finalize_batch_async(
              ready_batch,
              next_t.transaction_system_layout,
              next_t.epoch,
              next_t.precomputed_layout
            )

            maybe_set_empty_transaction_timeout(next_t)
        end
    end
  end

  # :commit while still locked is rejected outright.
  def handle_call({:commit, _transaction}, _from, %{mode: :locked} = t) do
    reply(t, {:error, :locked})
  end
×
179

180
  @impl true
  @spec handle_info(:timeout | {:DOWN, reference(), :process, pid(), term()} | term(), State.t()) ::
          {:noreply, State.t()} | {:noreply, State.t(), timeout()} | {:stop, :normal, State.t()}
  # Idle timeout while running with no open batch: push an empty transaction
  # through as a single-transaction batch.
  def handle_info(:timeout, %{batch: nil, mode: :running} = t) do
    empty_transaction = Transaction.empty_transaction()

    case single_transaction_batch(t, empty_transaction) do
      {:ok, batch} ->
        # Send empty batch asynchronously
        finalize_batch_async(batch, t.transaction_system_layout, t.epoch, t.precomputed_layout)
        maybe_set_empty_transaction_timeout(t)

      {:error, :sequencer_unavailable} ->
        exit({:sequencer_unavailable, :timeout_empty_transaction})
    end
  end

  # Idle timeout with no open batch while not running: nothing to do except keep
  # the timer ticking.
  def handle_info(:timeout, %{batch: nil} = t) do
    noreply(t, timeout: t.empty_transaction_timeout_ms)
  end

  # Timeout with an open batch: finalize it asynchronously and clear it from the state.
  def handle_info(:timeout, %{batch: batch} = t) do
    finalize_batch_async(batch, t.transaction_system_layout, t.epoch, t.precomputed_layout)
    maybe_set_empty_transaction_timeout(%{t | batch: nil})
  end

  # The monitored Director has died - this commit proxy terminates gracefully.
  def handle_info({:DOWN, _ref, :process, director_pid, _reason}, %{director: director_pid} = t) do
    {:stop, :normal, t}
  end

  # Any other message is ignored, but its arrival cancelled whatever timeout was
  # pending. Returning without a timeout would leave an open batch unfinalized
  # (and the empty-transaction timer stopped) until the next :commit, so the
  # appropriate timeout is re-armed here.
  def handle_info(_msg, %{batch: nil} = t) do
    noreply(t, timeout: t.empty_transaction_timeout_ms)
  end

  def handle_info(_msg, t) do
    # An open batch is waiting: fire :timeout as soon as the mailbox is drained,
    # mirroring the zero timeout used after a :commit is queued.
    noreply(t, timeout: 0)
  end
214

215
  # Finalizes `batch` in a linked Task so the server is not blocked while the
  # batch is finalized. Returns whatever Task.start_link/1 returns.
  defp finalize_batch_async(batch, transaction_system_layout, epoch, precomputed_layout) do
    # Capture the caller's trace metadata now so it can be reinstated inside the task.
    caller_metadata = trace_metadata()
    finalize_opts = [epoch: epoch, precomputed: precomputed_layout]

    Task.start_link(fn ->
      trace_metadata(caller_metadata)

      case finalize_batch(batch, transaction_system_layout, finalize_opts) do
        {:ok, _aborted_count, _committed_count} -> :ok
        # The task is linked, so exiting with the failure reason propagates it to the server.
        {:error, reason} -> exit(reason)
      end
    end)
  end
230

231
  @doc """
  Returns a one-argument function that delivers its argument to `from` via
  `GenServer.reply/2`.
  """
  @spec reply_fn(GenServer.from()) :: Batch.reply_fn()
  def reply_fn(from) do
    fn response -> GenServer.reply(from, response) end
  end
208✔
233

234
  # Moved to Batching module to avoid duplication
235

236
  # Builds the :noreply return for a callback: while running, the
  # empty-transaction timeout is attached; in any other mode no timeout is requested.
  @spec maybe_set_empty_transaction_timeout(State.t()) :: {:noreply, State.t(), timeout()}
  defp maybe_set_empty_transaction_timeout(t) do
    case t do
      %{mode: :running} -> noreply(t, timeout: t.empty_transaction_timeout_ms)
      _not_running -> noreply(t)
    end
  end
×
241

242
  # Sends {:error, :abort} through the reply function of every caller recorded in
  # the open batch. A state without a batch is a no-op. Always returns :ok.
  @spec abort_current_batch(State.t()) :: :ok
  defp abort_current_batch(%{batch: batch}) do
    if not is_nil(batch) do
      for notify <- Batch.all_callers(batch), do: notify.({:error, :abort})
    end

    :ok
  end
250
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc