• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / 0e10e0b03cfd70673380309429572fda951bfef4

13 Sep 2025 04:13PM UTC coverage: 66.024% (-0.3%) from 66.298%
0e10e0b03cfd70673380309429572fda951bfef4

push

github

jallum
Added basic String.Chars for Directory

0 of 10 new or added lines in 1 file covered. (0.0%)

9 existing lines in 4 files now uncovered.

3766 of 5704 relevant lines covered (66.02%)

658.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.59
/lib/bedrock/data_plane/commit_proxy/server.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Server do
2
  @moduledoc """
3
  GenServer implementation of the Commit Proxy.
4

5
  ## Overview
6

7
  The Commit Proxy batches transaction requests from clients to optimize throughput while
8
  maintaining strict consistency guarantees. It coordinates with resolvers for conflict
9
  detection and logs for durable persistence.
10

11
  ## Lifecycle
12

13
  1. **Initialization**: Starts in `:locked` mode, waiting for recovery completion
14
  2. **Recovery**: Director calls `recover_from/3` to provide transaction system layout and unlock
15
  3. **Transaction Processing**: Accepts `:commit` calls, batches transactions, and finalizes
16
  4. **Empty Transaction Timeout**: Creates empty transactions during quiet periods to advance read versions
17

18
  ## Batching Strategy
19

20
  - **Size-based**: Batches finalize when reaching `max_per_batch` transactions
21
  - **Time-based**: Batches finalize after `max_latency_in_ms` milliseconds
22
  - **Immediate**: Single transactions may bypass batching for low-latency processing
23

24
  ## Timeout Mechanisms
25

26
  - **Fast timeout (0ms)**: Allows GenServer to process any queued `:commit` messages before
27
    finalizing the current batch, ensuring optimal batching efficiency
28
  - **Empty transaction timeout**: Creates empty `{nil, %{}}` transactions during quiet periods
29
    to keep read versions advancing and provide system health checking
30

31
  ## Error Handling
32

33
  Uses fail-fast recovery model where unrecoverable errors (sequencer unavailable, log failures)
34
  trigger process exit and Director-coordinated cluster recovery.
35
  """
36

37
  use GenServer
38

39
  import Bedrock.DataPlane.CommitProxy.Batching,
40
    only: [
41
      start_batch_if_needed: 1,
42
      apply_finalization_policy: 1,
43
      add_transaction_to_batch: 4,
44
      single_transaction_batch: 2
45
    ]
46

47
  import Bedrock.DataPlane.CommitProxy.Finalization, only: [finalize_batch: 3]
48

49
  import Bedrock.DataPlane.CommitProxy.Telemetry,
50
    only: [trace_metadata: 1]
51

52
  import Bedrock.Internal.GenServer.Replies
53

54
  alias Bedrock.Cluster
55
  alias Bedrock.DataPlane.CommitProxy.Batch
56
  # ConflictSharding moved to Batching module
57
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
58
  alias Bedrock.DataPlane.CommitProxy.State
59
  alias Bedrock.DataPlane.Transaction
60
  alias Bedrock.Internal.Time
61

62
  @spec child_spec(
63
          opts :: [
64
            cluster: Cluster.t(),
65
            director: pid(),
66
            epoch: Bedrock.epoch(),
67
            lock_token: Bedrock.lock_token(),
68
            instance: non_neg_integer(),
69
            max_latency_in_ms: non_neg_integer(),
70
            max_per_batch: pos_integer(),
71
            empty_transaction_timeout_ms: non_neg_integer()
72
          ]
73
        ) :: Supervisor.child_spec() | no_return()
74
  def child_spec(opts) do
75
    cluster = opts[:cluster] || raise "Missing :cluster option"
22✔
76
    director = opts[:director] || raise "Missing :director option"
22✔
77
    epoch = opts[:epoch] || raise "Missing :epoch option"
22✔
78
    lock_token = opts[:lock_token] || raise "Missing :lock_token option"
22✔
79
    instance = opts[:instance] || raise "Missing :instance option"
22✔
80
    max_latency_in_ms = opts[:max_latency_in_ms] || 4
22✔
81
    max_per_batch = opts[:max_per_batch] || 32
22✔
82
    empty_transaction_timeout_ms = opts[:empty_transaction_timeout_ms] || 1_000
22✔
83

84
    %{
22✔
85
      id: {__MODULE__, cluster, epoch, instance},
86
      start:
87
        {GenServer, :start_link,
88
         [
89
           __MODULE__,
90
           {cluster, director, epoch, max_latency_in_ms, max_per_batch, empty_transaction_timeout_ms, lock_token}
91
         ]},
92
      restart: :temporary
93
    }
94
  end
95

96
  @impl true
97
  @spec init({module(), pid(), Bedrock.epoch(), non_neg_integer(), pos_integer(), non_neg_integer(), binary()}) ::
98
          {:ok, State.t(), timeout()}
99
  def init({cluster, director, epoch, max_latency_in_ms, max_per_batch, empty_transaction_timeout_ms, lock_token}) do
100
    # Monitor the Director - if it dies, this commit proxy should terminate
101
    Process.monitor(director)
5✔
102

103
    trace_metadata(%{cluster: cluster, pid: self()})
5✔
104

105
    then(
5✔
106
      %State{
107
        cluster: cluster,
108
        director: director,
109
        epoch: epoch,
110
        max_latency_in_ms: max_latency_in_ms,
111
        max_per_batch: max_per_batch,
112
        empty_transaction_timeout_ms: empty_transaction_timeout_ms,
113
        lock_token: lock_token
114
      },
115
      &{:ok, &1, empty_transaction_timeout_ms}
5✔
116
    )
117
  end
118

119
  @impl true
120
  @spec terminate(term(), State.t()) :: :ok
121
  def terminate(_reason, %State{} = t) do
122
    abort_current_batch(t)
3✔
123
    :ok
124
  end
125

126
  @impl true
127
  @spec handle_call(
128
          {:recover_from, binary(), map()} | {:commit, Bedrock.transaction()},
129
          GenServer.from(),
130
          State.t()
131
        ) ::
132
          {:reply, term(), State.t()} | {:noreply, State.t(), timeout() | {:continue, term()}}
133
  def handle_call({:recover_from, lock_token, transaction_system_layout}, _from, %{mode: :locked} = t) do
134
    if lock_token == t.lock_token do
4✔
135
      precomputed_layout = LayoutOptimization.precompute_from_layout(transaction_system_layout)
4✔
136

137
      reply(
4✔
138
        %{
139
          t
140
          | transaction_system_layout: transaction_system_layout,
141
            precomputed_layout: precomputed_layout,
142
            mode: :running
143
        },
144
        :ok
145
      )
146
    else
147
      reply(t, {:error, :unauthorized})
×
148
    end
149
  end
150

151
  def handle_call({:commit, transaction}, _from, %{mode: :running, transaction_system_layout: nil} = t)
152
      when is_binary(transaction) do
153
    reply(t, {:error, :no_transaction_system_layout})
×
154
  end
155

156
  def handle_call({:commit, transaction}, from, %{mode: :running} = t) when is_binary(transaction) do
157
    case start_batch_if_needed(t) do
204✔
158
      {:error, reason} ->
159
        GenServer.reply(from, {:error, :abort})
×
160
        exit(reason)
×
161

162
      updated_t ->
163
        updated_t
164
        |> add_transaction_to_batch(transaction, reply_fn(from), nil)
165
        |> apply_finalization_policy()
166
        |> case do
204✔
167
          {t, nil} ->
168
            # Keep waiting - calculate remaining time until max latency
169
            now = Time.monotonic_now_in_ms()
165✔
170
            elapsed = now - t.batch.started_at
165✔
171
            remaining_time = max(0, t.max_latency_in_ms - elapsed)
165✔
172
            noreply(t, timeout: remaining_time)
165✔
173

174
          {t, batch} ->
175
            # Finalize asynchronously and reset for next batch
176
            finalize_batch_async(batch, t.transaction_system_layout, t.epoch, t.precomputed_layout)
39✔
177
            maybe_set_empty_transaction_timeout(t)
39✔
178
        end
179
    end
180
  end
181

182
  def handle_call({:commit, _transaction}, _from, %{mode: :locked} = t), do: reply(t, {:error, :locked})
×
183

184
  @impl true
185
  @spec handle_info(:timeout, State.t()) :: {:noreply, State.t(), timeout()}
186
  def handle_info(:timeout, %{batch: nil, mode: :running} = t) do
187
    empty_transaction = Transaction.empty_transaction()
1✔
188

189
    case single_transaction_batch(t, empty_transaction) do
1✔
190
      {:ok, batch} ->
191
        # Send empty batch asynchronously
UNCOV
192
        finalize_batch_async(batch, t.transaction_system_layout, t.epoch, t.precomputed_layout)
×
UNCOV
193
        maybe_set_empty_transaction_timeout(t)
×
194

195
      {:error, :sequencer_unavailable} ->
196
        exit({:sequencer_unavailable, :timeout_empty_transaction})
1✔
197
    end
198
  end
199

200
  def handle_info(:timeout, %{batch: nil} = t) do
201
    noreply(t, timeout: t.empty_transaction_timeout_ms)
1✔
202
  end
203

204
  def handle_info(:timeout, %{batch: batch} = t) do
205
    # Timeout reached - finalize current batch asynchronously
206
    finalize_batch_async(batch, t.transaction_system_layout, t.epoch, t.precomputed_layout)
5✔
207
    maybe_set_empty_transaction_timeout(%{t | batch: nil})
5✔
208
  end
209

210
  def handle_info({:DOWN, _ref, :process, director_pid, _reason}, %{director: director_pid} = t) do
211
    # Director has died - this commit proxy should terminate gracefully
212
    {:stop, :normal, t}
3✔
213
  end
214

215
  def handle_info(_msg, t) do
×
216
    {:noreply, t}
217
  end
218

219
  # Asynchronous batch finalization - creates its own resolver tasks
220
  defp finalize_batch_async(batch, transaction_system_layout, epoch, precomputed_layout) do
221
    Task.start_link(fn ->
44✔
222
      case finalize_batch(batch, transaction_system_layout, epoch: epoch, precomputed: precomputed_layout) do
43✔
223
        {:ok, _n_aborts, _n_oks} ->
42✔
224
          :ok
225

226
        {:error, reason} ->
227
          exit(reason)
1✔
228
      end
229
    end)
230
  end
231

232
  @spec reply_fn(GenServer.from()) :: Batch.reply_fn()
233
  def reply_fn(from), do: &GenServer.reply(from, &1)
204✔
234

235
  # Moved to Batching module to avoid duplication
236

237
  @spec maybe_set_empty_transaction_timeout(State.t()) :: {:noreply, State.t(), timeout()}
238
  defp maybe_set_empty_transaction_timeout(%{mode: :running} = t),
239
    do: noreply(t, timeout: t.empty_transaction_timeout_ms)
44✔
240

241
  defp maybe_set_empty_transaction_timeout(t), do: noreply(t)
×
242

243
  @spec abort_current_batch(State.t()) :: :ok
244
  defp abort_current_batch(%{batch: nil}), do: :ok
3✔
245

246
  defp abort_current_batch(%{batch: batch}) do
247
    batch
248
    |> Batch.all_callers()
249
    |> Enum.each(fn reply_fn -> reply_fn.({:error, :abort}) end)
×
250
  end
251
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc