
jallum / bedrock / b6edf2721597b2e3285bded8c2b796c010f396d2-PR-49

12 Sep 2025 07:47PM UTC coverage: 65.004% (+0.7%) from 64.284%

Pull #49 via github, by jallum: Refactor transaction error handling and modernize system architecture

Core Changes:
- Error handling modernization: Replaced exception-based control flow with structured error patterns
  - Converted HCARetryException to {__MODULE__, :retry} throw pattern in HCA allocation
  - Enhanced storage racing with detailed failure categorization and proper error propagation
  - Added StorageError and TransactionError exception types for proper error classification
- Transaction builder enhancements:
  - Renamed Committing module to Finalization for better semantic clarity
  - Added nested transaction support with proper stack management
  - Enhanced failure handling with structured error reasons across all operations
  - Added support for atomic operations and individual key clearing
- Storage layer improvements:
  - Refined error propagation with detailed failure reasons (timeout, unavailable, layout failures)
  - Enhanced storage racing with proper server failure tracking
  - Removed timeout parameter from async streams for better reliability
- Repository layer overhaul:
  - Simplified transaction management with proper retry logic
  - Enhanced error classification between retryable and non-retryable failures
  - Removed complex TransactionManager in favor of direct repo transaction handling
  - Added comprehensive error handling tests
- Directory layer enhancements:
  - Added range() operation to Directory protocol
  - Improved HCA integration with proper transaction context
  - Enhanced prefix allocation with better error handling
  - Fixed root directory handling semantics
- Testing & tooling:
  - Added StorageTelemetry debug module for operation monitoring
  - Comprehensive test coverage for all error handling scenarios
  - Added storage retry tests and error handling validation
Pull Request #49: Implement FoundationDB-compatible directory layer with HCA and subspace support
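The changelog above mentions converting `HCARetryException` into a `{__MODULE__, :retry}` throw pattern in HCA allocation. As a rough illustration of that style of control flow (the module, function names, and retry limit below are hypothetical, not the actual Bedrock code):

```elixir
# Hypothetical sketch of a throw-based retry pattern. Names and the
# retry limit are illustrative; only the {__MODULE__, :retry} shape
# comes from the changelog above.
defmodule HCA.Sketch do
  @max_attempts 3

  # The caller catches the tagged throw and turns it into a retry loop,
  # instead of rescuing a dedicated exception type.
  def allocate(candidate_fn, attempt \\ 1) do
    try do
      do_allocate(candidate_fn)
    catch
      :throw, {__MODULE__, :retry} when attempt < @max_attempts ->
        allocate(candidate_fn, attempt + 1)

      :throw, {__MODULE__, :retry} ->
        {:error, :retries_exhausted}
    end
  end

  defp do_allocate(candidate_fn) do
    case candidate_fn.() do
      {:ok, prefix} -> {:ok, prefix}
      # Losing an allocation race signals "try again" with a tagged
      # throw rather than an exception hierarchy.
      :conflict -> throw({__MODULE__, :retry})
    end
  end
end
```

Tagging the throw with `__MODULE__` keeps it local: only this module's catch clauses match it, so unrelated throws still propagate.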

276 of 409 new or added lines in 18 files covered. (67.48%)

4 existing lines in 3 files now uncovered.

3687 of 5672 relevant lines covered (65.0%)

653.38 hits per line

Source File: /lib/bedrock/data_plane/commit_proxy/finalization.ex (91.39% covered)
defmodule Bedrock.DataPlane.CommitProxy.Finalization do
  @moduledoc """
  Transaction finalization pipeline that handles conflict resolution and log persistence.

  ## Version Chain Integrity

  CRITICAL: This module maintains the Lamport clock version chain established by the sequencer.
  The sequencer provides both `last_commit_version` and `commit_version` as a proper chain link:

  - `last_commit_version`: The actual last committed version from the sequencer
  - `commit_version`: The new version assigned to this batch

  Always use the exact version values provided by the sequencer through the batch to maintain
  proper MVCC conflict detection and transaction ordering. Version gaps can exist due to failed
  transactions, recovery scenarios, or system restarts.
  """

  import Bitwise, only: [<<<: 2]

  alias Bedrock.ControlPlane.Config.ServiceDescriptor
  alias Bedrock.ControlPlane.Config.StorageTeamDescriptor
  alias Bedrock.ControlPlane.Config.TransactionSystemLayout
  alias Bedrock.DataPlane.CommitProxy.Batch
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
  alias Bedrock.DataPlane.CommitProxy.Tracing
  alias Bedrock.DataPlane.Log
  alias Bedrock.DataPlane.Resolver
  alias Bedrock.DataPlane.Sequencer
  alias Bedrock.DataPlane.Transaction
  alias Bedrock.KeyRange

  @type resolver_fn() :: (Resolver.ref(),
                          Bedrock.epoch(),
                          Bedrock.version(),
                          Bedrock.version(),
                          [Transaction.encoded()],
                          keyword() ->
                            {:ok, [non_neg_integer()]}
                            | {:error, term()}
                            | {:failure, :timeout, Resolver.ref()}
                            | {:failure, :unavailable, Resolver.ref()})

  @type log_push_batch_fn() :: (TransactionSystemLayout.t(),
                                last_commit_version :: Bedrock.version(),
                                transactions_by_tag :: %{
                                  Bedrock.range_tag() => Transaction.encoded()
                                },
                                commit_version :: Bedrock.version(),
                                opts :: [
                                  timeout: Bedrock.timeout_in_ms(),
                                  async_stream_fn: async_stream_fn()
                                ] ->
                                  :ok | {:error, log_push_error()})

  @type log_push_single_fn() :: (ServiceDescriptor.t(), binary(), Bedrock.version() ->
                                   :ok | {:error, :unavailable})

  @type async_stream_fn() :: (enumerable :: Enumerable.t(), fun :: (term() -> term()), opts :: keyword() ->
                                Enumerable.t())

  @type abort_reply_fn() :: ([Batch.reply_fn()] -> :ok)

  @type success_reply_fn() :: ([Batch.reply_fn()], Bedrock.version() -> :ok)

  @type timeout_fn() :: (non_neg_integer() -> non_neg_integer())

  @type sequencer_notify_fn() :: (Sequencer.ref(), Bedrock.version() -> :ok | {:error, term()})

  @type resolution_error() ::
          :timeout
          | :unavailable
          | {:resolver_unavailable, term()}

  @type storage_coverage_error() ::
          {:storage_team_coverage_error, binary()}

  @type log_push_error() ::
          {:log_failures, [{Log.id(), term()}]}
          | {:insufficient_acknowledgments, non_neg_integer(), non_neg_integer(), [{Log.id(), term()}]}
          | :log_push_failed

  @type finalization_error() ::
          resolution_error()
          | storage_coverage_error()
          | log_push_error()

  # ============================================================================
  # Data Structures
  # ============================================================================

  defmodule FinalizationPlan do
    @moduledoc """
    Pipeline state for transaction finalization using unified transaction storage
    for maximum efficiency and clarity.
    """

    @enforce_keys [
      :transactions,
      :transaction_count,
      :commit_version,
      :last_commit_version,
      :storage_teams,
      :logs_by_id
    ]
    defstruct [
      :transactions,
      :transaction_count,
      :commit_version,
      :last_commit_version,
      :storage_teams,
      :logs_by_id,
      transactions_by_log: %{},
      replied_indices: MapSet.new(),
      aborted_count: 0,
      stage: :initialized,
      error: nil
    ]

    @type t :: %__MODULE__{
            transactions: %{
              non_neg_integer() => {non_neg_integer(), Batch.reply_fn(), Transaction.encoded(), Task.t()}
            },
            transaction_count: non_neg_integer(),
            commit_version: Bedrock.version(),
            last_commit_version: Bedrock.version(),
            storage_teams: [StorageTeamDescriptor.t()],
            logs_by_id: %{Log.id() => [Bedrock.range_tag()]},
            transactions_by_log: %{Log.id() => Transaction.encoded()},
            replied_indices: MapSet.t(non_neg_integer()),
            aborted_count: non_neg_integer(),
            stage: atom(),
            error: term() | nil
          }
  end

  # ============================================================================
  # Main Pipeline
  # ============================================================================

  @doc """
  Executes the complete transaction finalization pipeline for a batch of transactions.

  This function processes a batch through a multi-stage pipeline: conflict resolution,
  abort notification, log preparation, log persistence, sequencer notification, and
  success notification. The pipeline maintains transactional consistency by ensuring
  all operations complete successfully or all pending clients are notified of failure.

  ## Pipeline Stages

  1. **Conflict Resolution**: Calls resolvers to determine which transactions must be aborted
  2. **Abort Notification**: Immediately notifies clients of aborted transactions
  3. **Log Preparation**: Distributes successful transaction mutations to appropriate logs
  4. **Log Persistence**: Pushes transactions to ALL log servers and waits for acknowledgment
  5. **Sequencer Notification**: Reports successful commit version to the sequencer
  6. **Success Notification**: Notifies clients of successful transactions with commit version

  ## Parameters

    - `batch`: Transaction batch with commit version details from the sequencer
    - `transaction_system_layout`: System configuration including resolvers and log servers
    - `opts`: Optional functions for testing and configuration overrides

  ## Returns

    - `{:ok, n_aborts, n_successes}` - Pipeline completed successfully with counts
    - `{:error, finalization_error()}` - Pipeline failed; all pending clients notified of failure

  ## Error Handling

  On any pipeline failure, all transactions that haven't been replied to are automatically
  notified with abort responses before returning the error.
  """
  @spec finalize_batch(
          Batch.t(),
          TransactionSystemLayout.t(),
          opts :: [
            epoch: Bedrock.epoch(),
            precomputed: LayoutOptimization.precomputed_layout() | nil,
            resolver_fn: resolver_fn(),
            batch_log_push_fn: log_push_batch_fn(),
            abort_reply_fn: abort_reply_fn(),
            success_reply_fn: success_reply_fn(),
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            sequencer_notify_fn: sequencer_notify_fn(),
            timeout: non_neg_integer()
          ]
        ) ::
          {:ok, n_aborts :: non_neg_integer(), n_oks :: non_neg_integer()}
          | {:error, finalization_error()}
  def finalize_batch(batch, transaction_system_layout, opts \\ []) do
    batch
    |> create_finalization_plan(transaction_system_layout)
    |> resolve_conflicts(transaction_system_layout, opts)
    |> prepare_for_logging()
    |> push_to_logs(transaction_system_layout, opts)
    |> notify_sequencer(transaction_system_layout.sequencer, opts)
    |> notify_successes(opts)
    |> extract_result_or_handle_error(opts)
  end

  # ============================================================================
  # Pipeline Initialization
  # ============================================================================

  @spec create_finalization_plan(Batch.t(), TransactionSystemLayout.t()) :: FinalizationPlan.t()
  def create_finalization_plan(batch, transaction_system_layout) do
    %FinalizationPlan{
      transactions: Map.new(batch.buffer, &{elem(&1, 0), &1}),
      transaction_count: Batch.transaction_count(batch),
      commit_version: batch.commit_version,
      last_commit_version: batch.last_commit_version,
      storage_teams: transaction_system_layout.storage_teams,
      logs_by_id: transaction_system_layout.logs,
      stage: :ready_for_resolution
    }
  end

  # ============================================================================
  # Conflict Resolution
  # ============================================================================

  @spec resolve_conflicts(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) ::
          FinalizationPlan.t()
  def resolve_conflicts(%FinalizationPlan{stage: :ready_for_resolution} = plan, layout, opts) do
    epoch = Keyword.get(opts, :epoch) || raise "Missing epoch in finalization opts"

    resolver_transaction_map =
      if plan.transaction_count == 0 do
        Map.new(layout.resolvers, fn {_key, ref} -> {ref, []} end)
      else
        maps =
          for idx <- 0..(plan.transaction_count - 1) do
            {_idx, _reply_fn, _transaction, task} = Map.fetch!(plan.transactions, idx)
            Task.await(task, 5000)
          end

        Map.new(layout.resolvers, fn {_key, ref} ->
          transactions = Enum.map(maps, &Map.fetch!(&1, ref))
          {ref, transactions}
        end)
      end

    case call_all_resolvers_with_map(
           resolver_transaction_map,
           epoch,
           plan.last_commit_version,
           plan.commit_version,
           layout.resolvers,
           opts
         ) do
      {:ok, aborted_set} ->
        split_and_notify_aborts_with_set(%{plan | stage: :conflicts_resolved}, aborted_set, opts)

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end

  @spec call_all_resolvers_with_map(
          %{Resolver.ref() => [Transaction.encoded()]},
          Bedrock.epoch(),
          Bedrock.version(),
          Bedrock.version(),
          [{start_key :: Bedrock.key(), Resolver.ref()}],
          keyword()
        ) :: {:ok, MapSet.t(non_neg_integer())} | {:error, term()}
  defp call_all_resolvers_with_map(resolver_transaction_map, epoch, last_version, commit_version, resolvers, opts) do
    async_stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
    timeout = Keyword.get(opts, :timeout, 5_000)

    resolvers
    |> async_stream_fn.(
      fn {_start_key, ref} ->
        # Every resolver must have transactions after task processing
        filtered_transactions = Map.fetch!(resolver_transaction_map, ref)
        call_resolver_with_retry(ref, epoch, last_version, commit_version, filtered_transactions, opts)
      end,
      timeout: timeout
    )
    |> Enum.reduce_while({:ok, MapSet.new()}, fn
      {:ok, {:ok, aborted}}, {:ok, acc} ->
        {:cont, {:ok, Enum.into(aborted, acc)}}

      {:ok, {:error, reason}}, _ ->
        {:halt, {:error, reason}}

      {:exit, reason}, _ ->
        {:halt, {:error, {:resolver_exit, reason}}}
    end)
  end

  @spec call_resolver_with_retry(
          Resolver.ref(),
          Bedrock.epoch(),
          Bedrock.version(),
          Bedrock.version(),
          [Transaction.encoded()],
          keyword(),
          non_neg_integer()
        ) :: {:ok, [non_neg_integer()]} | {:error, term()}
  defp call_resolver_with_retry(
         ref,
         epoch,
         last_version,
         commit_version,
         filtered_transactions,
         opts,
         attempts_used \\ 0
       ) do
    timeout_fn = Keyword.get(opts, :timeout_fn, &default_timeout_fn/1)
    resolver_fn = Keyword.get(opts, :resolver_fn, &Resolver.resolve_transactions/6)
    max_attempts = Keyword.get(opts, :max_attempts, 3)

    timeout_in_ms = timeout_fn.(attempts_used)

    case resolver_fn.(ref, epoch, last_version, commit_version, filtered_transactions, timeout: timeout_in_ms) do
      {:ok, _} = success ->
        success

      {:error, reason} when reason in [:timeout, :unavailable] and attempts_used < max_attempts - 1 ->
        Tracing.emit_resolver_retry(max_attempts - attempts_used - 2, attempts_used + 1, reason)

        call_resolver_with_retry(
          ref,
          epoch,
          last_version,
          commit_version,
          filtered_transactions,
          opts,
          attempts_used + 1
        )

      {:failure, reason, _ref} when reason in [:timeout, :unavailable] and attempts_used < max_attempts - 1 ->
        Tracing.emit_resolver_retry(max_attempts - attempts_used - 2, attempts_used + 1, reason)

        call_resolver_with_retry(
          ref,
          epoch,
          last_version,
          commit_version,
          filtered_transactions,
          opts,
          attempts_used + 1
        )

      {:error, reason} when reason in [:timeout, :unavailable] ->
        Tracing.emit_resolver_max_retries_exceeded(attempts_used + 1, reason)
        {:error, {:resolver_unavailable, reason}}

      {:failure, reason, _ref} when reason in [:timeout, :unavailable] ->
        Tracing.emit_resolver_max_retries_exceeded(attempts_used + 1, reason)
        {:error, {:resolver_unavailable, reason}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  @spec default_timeout_fn(non_neg_integer()) :: non_neg_integer()
  def default_timeout_fn(attempts_used), do: 500 * (1 <<< attempts_used)

  @spec split_and_notify_aborts_with_set(FinalizationPlan.t(), MapSet.t(non_neg_integer()), keyword()) ::
          FinalizationPlan.t()
  defp split_and_notify_aborts_with_set(%FinalizationPlan{stage: :conflicts_resolved} = plan, aborted_set, opts) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    # Reply to aborted transactions
    aborted_set
    |> Enum.map(fn idx ->
      {_idx, reply_fn, _binary, _task} = Map.fetch!(plan.transactions, idx)
      reply_fn
    end)
    |> abort_reply_fn.()

    # Track that we've replied to these transactions and count them as aborted
    %{plan | replied_indices: aborted_set, aborted_count: MapSet.size(aborted_set), stage: :aborts_notified}
  end

  @spec reply_to_all_clients_with_aborted_transactions([Batch.reply_fn()]) :: :ok
  def reply_to_all_clients_with_aborted_transactions([]), do: :ok
  def reply_to_all_clients_with_aborted_transactions(aborts), do: Enum.each(aborts, & &1.({:error, :aborted}))

  # ============================================================================
  # Log Preparation
  # ============================================================================

  @spec prepare_for_logging(FinalizationPlan.t()) :: FinalizationPlan.t()
  def prepare_for_logging(%FinalizationPlan{stage: :failed} = plan), do: plan

  def prepare_for_logging(%FinalizationPlan{stage: :aborts_notified} = plan) do
    case build_transactions_for_logs(plan, plan.logs_by_id) do
      {:ok, transactions_by_log} ->
        %{plan | transactions_by_log: transactions_by_log, stage: :ready_for_logging}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end

  @spec build_transactions_for_logs(FinalizationPlan.t(), %{Log.id() => [Bedrock.range_tag()]}) ::
          {:ok, %{Log.id() => Transaction.encoded()}} | {:error, term()}
  defp build_transactions_for_logs(plan, logs_by_id) do
    initial_mutations_by_log =
      logs_by_id
      |> Map.keys()
      |> Map.new(&{&1, []})

    plan.transactions
    |> Enum.reduce_while(
      {:ok, initial_mutations_by_log},
      fn {idx, entry}, {:ok, acc} ->
        process_transaction_for_logs({idx, entry}, plan, logs_by_id, acc)
      end
    )
    |> case do
      {:ok, mutations_by_log} ->
        result =
          Map.new(mutations_by_log, fn {log_id, mutations_list} ->
            # Use the existing encode approach for transaction building
            encoded =
              Transaction.encode(%{
                mutations: Enum.reverse(mutations_list),
                commit_version: plan.commit_version
              })

            {log_id, encoded}
          end)

        {:ok, result}

      {:error, reason} ->
        {:error, reason}
    end
  end

  @spec process_transaction_for_logs(
          {non_neg_integer(), {non_neg_integer(), Batch.reply_fn(), Transaction.encoded(), Task.t()}},
          FinalizationPlan.t(),
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  defp process_transaction_for_logs({idx, {_idx, _reply_fn, binary, _task}}, plan, logs_by_id, acc) do
    if MapSet.member?(plan.replied_indices, idx) do
      # Skip transactions that were already replied to (aborted)
      {:cont, {:ok, acc}}
    else
      process_transaction_mutations(binary, plan.storage_teams, logs_by_id, acc)
    end
  end

  @spec process_transaction_mutations(
          binary(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}} | {:halt, {:error, term()}}
  defp process_transaction_mutations(binary_transaction, storage_teams, logs_by_id, acc) do
    case Transaction.mutations(binary_transaction) do
      {:ok, mutations_stream} ->
        case process_mutations_for_transaction(mutations_stream, storage_teams, logs_by_id, acc) do
          {:ok, updated_acc} ->
            {:cont, {:ok, updated_acc}}

          {:error, reason} ->
            {:halt, {:error, reason}}
        end

      {:error, :section_not_found} ->
        {:cont, {:ok, acc}}

      {:error, reason} ->
        {:halt, {:error, {:mutation_extraction_failed, reason}}}
    end
  end

  @spec process_mutations_for_transaction(
          Enumerable.t(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:ok, %{Log.id() => [term()]}} | {:error, term()}
  defp process_mutations_for_transaction(mutations_stream, storage_teams, logs_by_id, acc) do
    Enum.reduce_while(mutations_stream, {:ok, acc}, fn mutation, {:ok, mutations_acc} ->
      distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, mutations_acc)
    end)
  end

  @spec distribute_mutation_to_logs(
          term(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  defp distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, mutations_acc) do
    key_or_range = mutation_to_key_or_range(mutation)

    case key_or_range_to_tags(key_or_range, storage_teams) do
      {:ok, []} ->
        {:halt, {:error, {:storage_team_coverage_error, key_or_range}}}

      {:ok, affected_tags} ->
        affected_logs = find_logs_for_tags(affected_tags, logs_by_id)

        updated_acc =
          Enum.reduce(affected_logs, mutations_acc, fn log_id, acc_inner ->
            Map.update!(acc_inner, log_id, &[mutation | &1])
          end)

        {:cont, {:ok, updated_acc}}
    end
  end

  @spec mutation_to_key_or_range(
          {:set, Bedrock.key(), Bedrock.value()}
          | {:clear, Bedrock.key()}
          | {:clear_range, Bedrock.key(), Bedrock.key()}
          | {:atomic, atom(), Bedrock.key(), Bedrock.value()}
        ) ::
          Bedrock.key() | {Bedrock.key(), Bedrock.key()}
  def mutation_to_key_or_range({:set, key, _value}), do: key
  def mutation_to_key_or_range({:clear, key}), do: key
  def mutation_to_key_or_range({:clear_range, start_key, end_key}), do: {start_key, end_key}
  def mutation_to_key_or_range({:atomic, _op, key, _value}), do: key

  @spec key_or_range_to_tags(Bedrock.key() | Bedrock.key_range(), [StorageTeamDescriptor.t()]) ::
          {:ok, [Bedrock.range_tag()]}
  def key_or_range_to_tags({start_key, end_key}, storage_teams) do
    tags =
      for %{tag: tag, key_range: {team_start, team_end}} <- storage_teams,
          ranges_intersect?(start_key, end_key, team_start, team_end) do
        tag
      end

    {:ok, tags}
  end

  def key_or_range_to_tags(key, storage_teams) do
    tags =
      for %{tag: tag, key_range: {min_key, max_key_ex}} <- storage_teams,
          KeyRange.contains?({min_key, max_key_ex}, key) do
        tag
      end

    {:ok, tags}
  end

  @spec find_logs_for_tags([Bedrock.range_tag()], %{Log.id() => [Bedrock.range_tag()]}) :: [Log.id()]
  def find_logs_for_tags(tags, logs_by_id) do
    tag_set = MapSet.new(tags)

    logs_by_id
    |> Enum.filter(fn {_log_id, log_tags} ->
      Enum.any?(log_tags, &MapSet.member?(tag_set, &1))
    end)
    |> Enum.map(fn {log_id, _log_tags} -> log_id end)
  end

  @spec ranges_intersect?(Bedrock.key(), Bedrock.key() | :end, Bedrock.key(), Bedrock.key() | :end) :: boolean()
  defp ranges_intersect?(_start1, :end, _start2, :end), do: true
  defp ranges_intersect?(start1, :end, _start2, end2), do: start1 < end2
  defp ranges_intersect?(_start1, end1, start2, :end), do: end1 > start2
  defp ranges_intersect?(start1, end1, start2, end2), do: start1 < end2 and end1 > start2

  # ============================================================================
  # Log Distribution
  # ============================================================================

  @spec push_to_logs(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) :: FinalizationPlan.t()
  def push_to_logs(%FinalizationPlan{stage: :failed} = plan, _layout, _opts), do: plan

  def push_to_logs(%FinalizationPlan{stage: :ready_for_logging} = plan, layout, opts) do
    batch_log_push_fn = Keyword.get(opts, :batch_log_push_fn, &push_transaction_to_logs_direct/5)

    case batch_log_push_fn.(layout, plan.last_commit_version, plan.transactions_by_log, plan.commit_version, opts) do
      :ok ->
        %{plan | stage: :logged}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end

  @spec resolve_log_descriptors(%{Log.id() => term()}, %{term() => ServiceDescriptor.t()}) :: %{
          Log.id() => ServiceDescriptor.t()
        }
  def resolve_log_descriptors(log_descriptors, services) do
    log_descriptors
    |> Map.keys()
    |> Enum.map(&{&1, Map.get(services, &1)})
    |> Enum.reject(&is_nil(elem(&1, 1)))
    |> Map.new()
  end

  @spec try_to_push_transaction_to_log(ServiceDescriptor.t(), binary(), Bedrock.version()) ::
          :ok | {:error, :unavailable}
  def try_to_push_transaction_to_log(%{kind: :log, status: {:up, log_server}}, transaction, last_commit_version) do
    Log.push(log_server, transaction, last_commit_version)
  end

  def try_to_push_transaction_to_log(_, _, _), do: {:error, :unavailable}

  @doc """
  Pushes transactions directly to logs and waits for acknowledgement from ALL log servers.

  This function takes transactions that have already been built per log and pushes them
  to the appropriate log servers. Each log receives its pre-built transaction.
  All logs must acknowledge to maintain durability guarantees.

  ## Parameters

    - `transaction_system_layout`: Contains configuration information about the
      transaction system, including available log servers.
    - `last_commit_version`: The last known committed version; used to
      ensure consistency in log ordering.
    - `transactions_by_log`: Map of log_id to transaction for that log.
      May contain empty transactions if all transactions were aborted.
    - `commit_version`: The version assigned by the sequencer for this batch.
    - `opts`: Optional configuration for testing and customization.

  ## Options
    - `:async_stream_fn` - Function for parallel processing (default: Task.async_stream/3)
    - `:log_push_fn` - Function for pushing to individual logs (default: try_to_push_transaction_to_log/3)
    - `:timeout` - Timeout for log push operations (default: 5_000ms)

  ## Returns
    - `:ok` if acknowledgements have been received from ALL log servers.
    - `{:error, log_push_error()}` if any log has not successfully acknowledged the
       push within the timeout period or other errors occur.
  """
  @spec push_transaction_to_logs_direct(
          TransactionSystemLayout.t(),
          last_commit_version :: Bedrock.version(),
          %{Log.id() => Transaction.encoded()},
          commit_version :: Bedrock.version(),
          opts :: [
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            timeout: non_neg_integer()
          ]
        ) :: :ok | {:error, log_push_error()}
  def push_transaction_to_logs_direct(
        transaction_system_layout,
        last_commit_version,
        transactions_by_log,
        _commit_version,
        opts \\ []
      ) do
    async_stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
    log_push_fn = Keyword.get(opts, :log_push_fn, &try_to_push_transaction_to_log/3)
    timeout = Keyword.get(opts, :timeout, 5_000)

    logs_by_id = transaction_system_layout.logs
    required_acknowledgments = map_size(logs_by_id)

    logs_by_id
    |> resolve_log_descriptors(transaction_system_layout.services)
    |> async_stream_fn.(
      fn {log_id, service_descriptor} ->
        encoded_transaction = Map.get(transactions_by_log, log_id)
        result = log_push_fn.(service_descriptor, encoded_transaction, last_commit_version)
        {log_id, result}
      end,
      timeout: timeout
    )
    |> Enum.reduce_while({0, []}, fn
      {:ok, {log_id, {:error, reason}}}, {_count, errors} ->
        {:halt, {:error, [{log_id, reason} | errors]}}

      {:ok, {_log_id, :ok}}, {count, errors} ->
        count = 1 + count

        if count == required_acknowledgments do
          {:halt, {:ok, count}}
        else
          {:cont, {count, errors}}
        end

      {:exit, {log_id, reason}}, {_count, errors} ->
        {:halt, {:error, [{log_id, reason} | errors]}}
    end)
    |> case do
      {:ok, ^required_acknowledgments} ->
        :ok

      {:error, errors} ->
        {:error, {:log_failures, errors}}

      {count, errors} when count < required_acknowledgments ->
        {:error, {:insufficient_acknowledgments, count, required_acknowledgments, errors}}

      _other ->
        {:error, :log_push_failed}
    end
  end

  # ============================================================================
  # Sequencer Notification
  # ============================================================================

  @spec notify_sequencer(FinalizationPlan.t(), Sequencer.ref(), keyword()) :: FinalizationPlan.t()
  def notify_sequencer(%FinalizationPlan{stage: :failed} = plan, _sequencer, _opts), do: plan

  def notify_sequencer(%FinalizationPlan{stage: :logged} = plan, sequencer, opts) do
    sequencer_notify_fn = Keyword.get(opts, :sequencer_notify_fn, &Sequencer.report_successful_commit/2)

    case sequencer_notify_fn.(sequencer, plan.commit_version) do
      :ok ->
        %{plan | stage: :sequencer_notified}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end

  # ============================================================================
  # Success Notification
  # ============================================================================

  @spec notify_successes(FinalizationPlan.t(), keyword()) :: FinalizationPlan.t()
  def notify_successes(%FinalizationPlan{stage: :failed} = plan, _opts), do: plan

  def notify_successes(%FinalizationPlan{stage: :sequencer_notified} = plan, opts) do
    success_reply_fn = Keyword.get(opts, :success_reply_fn, &send_reply_with_commit_version/2)

    {successful_reply_fns, successful_indices} =
9✔
734
      plan.transactions
9✔
735
      |> Enum.reject(fn {idx, _entry} -> MapSet.member?(plan.replied_indices, idx) end)
13✔
736
      |> Enum.map(fn {idx, {_tx_idx, reply_fn, _binary, _task}} -> {reply_fn, idx} end)
9✔
737
      |> Enum.unzip()
738

739
    success_reply_fn.(successful_reply_fns, plan.commit_version)
9✔
740

741
    %{plan | replied_indices: MapSet.union(plan.replied_indices, MapSet.new(successful_indices)), stage: :completed}
9✔
742
  end
743

744
  @spec send_reply_with_commit_version([Batch.reply_fn()], Bedrock.version()) :: :ok
745
  def send_reply_with_commit_version(oks, commit_version), do: Enum.each(oks, & &1.({:ok, commit_version}))
9✔
746

747
  # ============================================================================
748
  # Result Extraction and Error Handling
749
  # ============================================================================
750

751
  @spec extract_result_or_handle_error(FinalizationPlan.t(), keyword()) ::
752
          {:ok, non_neg_integer(), non_neg_integer()} | {:error, finalization_error()}
753
  def extract_result_or_handle_error(%FinalizationPlan{stage: :completed} = plan, _opts) do
754
    n_aborts = plan.aborted_count
9✔
755
    n_successes = plan.transaction_count - n_aborts
9✔
756

757
    {:ok, n_aborts, n_successes}
9✔
758
  end
759

760
  def extract_result_or_handle_error(%FinalizationPlan{stage: :failed} = plan, opts), do: handle_error(plan, opts)
5✔
761

762
  @spec handle_error(FinalizationPlan.t(), keyword()) :: {:error, finalization_error()}
763
  defp handle_error(%FinalizationPlan{error: error} = plan, opts) when not is_nil(error) do
764
    abort_reply_fn =
5✔
765
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)
766

767
    # Notify all transactions that haven't been replied to yet
768
    pending_reply_fns =
5✔
769
      plan.transactions
5✔
770
      |> Enum.reject(fn {idx, _entry} -> MapSet.member?(plan.replied_indices, idx) end)
4✔
771
      |> Enum.map(fn {_idx, {_tx_idx, reply_fn, _binary, _task}} -> reply_fn end)
4✔
772

773
    abort_reply_fn.(pending_reply_fns)
5✔
774

775
    {:error, plan.error}
5✔
776
  end
777
end
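
Taken together, the stage-gated functions above form a short finalization pipeline: each function passes a `:failed` plan through unchanged, so an error at any stage short-circuits the rest without extra branching. A minimal sketch of how a caller might chain them (the `plan`, `sequencer`, and `opts` bindings here are assumptions for illustration, not part of this module):

```elixir
# Hypothetical driver: `plan` is a %FinalizationPlan{stage: :logged},
# `sequencer` a Sequencer.ref(), and `opts` the usual injection keywords.
plan
|> notify_sequencer(sequencer, opts)
|> notify_successes(opts)
|> extract_result_or_handle_error(opts)
# => {:ok, n_aborts, n_successes} on success, or {:error, reason} if any
#    stage marked the plan as :failed along the way.
```

The injectable `*_fn` options (`sequencer_notify_fn`, `success_reply_fn`, `abort_reply_fn`) exist so tests can substitute stubs for the real sequencer and client replies.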