jallum / bedrock

01 Sep 2025 01:46PM UTC, coverage: 63.428% (+0.08%) from 63.351%
Commit ab2c6c3b5c2ca795449d60536259da9da850b9d0 (push, github, jallum): "Reworked tree balancing, Transaction processing"

54 of 66 new or added lines in 3 files covered (81.82%).
65 existing lines in 7 files now uncovered.
2990 of 4714 relevant lines covered (63.43%).
592.57 hits per line.

Source file: /lib/bedrock/data_plane/commit_proxy/finalization.ex, 92.36% covered
defmodule Bedrock.DataPlane.CommitProxy.Finalization do
  @moduledoc """
  Transaction finalization pipeline that handles conflict resolution and log persistence.

  ## Version Chain Integrity

  CRITICAL: This module maintains the Lamport clock version chain established by the sequencer.
  The sequencer provides both `last_commit_version` and `commit_version` as a proper chain link:

  - `last_commit_version`: The actual last committed version from the sequencer
  - `commit_version`: The new version assigned to this batch

  Always use the exact version values provided by the sequencer through the batch to maintain
  proper MVCC conflict detection and transaction ordering. Version gaps can exist due to failed
  transactions, recovery scenarios, or system restarts.
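
  For intuition only (real versions are opaque, sequencer-assigned values; the
  integers below are purely illustrative):

      batch N:   last_commit_version: 41, commit_version: 42
      batch N+1: last_commit_version: 42, commit_version: 43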
  """

  import Bitwise, only: [<<<: 2]

  alias __MODULE__.ResolutionPlan
  alias Bedrock.ControlPlane.Config.ServiceDescriptor
  alias Bedrock.ControlPlane.Config.StorageTeamDescriptor
  alias Bedrock.ControlPlane.Config.TransactionSystemLayout
  alias Bedrock.DataPlane.CommitProxy.Batch
  alias Bedrock.DataPlane.CommitProxy.Tracing
  alias Bedrock.DataPlane.Log
  alias Bedrock.DataPlane.Resolver
  alias Bedrock.DataPlane.Sequencer
  alias Bedrock.DataPlane.Transaction

  defguard key_in_range?(key, min_key, max_key_ex)
           when key >= min_key and (max_key_ex == :end or key < max_key_ex)

  # ============================================================================
  # Type Definitions
  # ============================================================================

  @type resolver_fn() :: (Resolver.ref(),
                          Bedrock.epoch(),
                          Bedrock.version(),
                          Bedrock.version(),
                          [Resolver.transaction_summary()],
                          keyword() ->
                            {:ok, [non_neg_integer()]} | {:error, term()})

  @type log_push_batch_fn() :: (TransactionSystemLayout.t(),
                                last_commit_version :: Bedrock.version(),
                                transactions_by_tag :: %{
                                  Bedrock.range_tag() => Transaction.encoded()
                                },
                                commit_version :: Bedrock.version(),
                                opts :: [
                                  timeout: Bedrock.timeout_in_ms(),
                                  async_stream_fn: async_stream_fn()
                                ] ->
                                  :ok | {:error, log_push_error()})

  @type log_push_single_fn() :: (ServiceDescriptor.t(), binary(), Bedrock.version() ->
                                   :ok | {:error, :unavailable})

  @type async_stream_fn() :: (enumerable :: Enumerable.t(), fun :: (term() -> term()), opts :: keyword() ->
                                Enumerable.t())

  @type abort_reply_fn() :: ([Batch.reply_fn()] -> :ok)

  @type success_reply_fn() :: ([Batch.reply_fn()], Bedrock.version() -> :ok)

  @type timeout_fn() :: (non_neg_integer() -> non_neg_integer())

  @type sequencer_notify_fn() :: (Sequencer.ref(), Bedrock.version() -> :ok | {:error, term()})

  @type resolution_error() ::
          :timeout
          | :unavailable
          | {:resolver_unavailable, term()}

  @type storage_coverage_error() ::
          {:storage_team_coverage_error, binary()}

  @type log_push_error() ::
          {:log_failures, [{Log.id(), term()}]}
          | {:insufficient_acknowledgments, non_neg_integer(), non_neg_integer(), [{Log.id(), term()}]}
          | :log_push_failed

  @type finalization_error() ::
          resolution_error()
          | storage_coverage_error()
          | log_push_error()

  # ============================================================================
  # Data Structures
  # ============================================================================

  defmodule ResolutionPlan do
    @moduledoc """
    Configuration and data for resolver conflict resolution operations.
    Encapsulates all parameters needed for calling resolvers with retry logic.
    """

    alias Bedrock.DataPlane.CommitProxy.Finalization

    @enforce_keys [:epoch, :last_version, :commit_version, :resolver_data_list, :resolvers]
    defstruct [
      :epoch,
      :last_version,
      :commit_version,
      :resolver_data_list,
      :resolvers,
      timeout_fn: &Finalization.default_timeout_fn/1,
      resolver_fn: &Resolver.resolve_transactions/6,
      max_attempts: 3
    ]

    @type t :: %__MODULE__{
            epoch: Bedrock.epoch(),
            last_version: Bedrock.version(),
            commit_version: Bedrock.version(),
            resolver_data_list: [Resolver.transaction_summary()],
            resolvers: [{start_key :: Bedrock.key(), Resolver.ref()}],
            timeout_fn: (non_neg_integer() -> non_neg_integer()),
            resolver_fn: (Resolver.ref(),
                          Bedrock.epoch(),
                          Bedrock.version(),
                          Bedrock.version(),
                          [Resolver.transaction_summary()],
                          keyword() ->
                            {:ok, [non_neg_integer()]} | {:error, term()}),
            max_attempts: non_neg_integer()
          }
  end

  defmodule FinalizationPlan do
    @moduledoc """
    Pipeline state for transaction finalization using unified transaction storage
    for maximum efficiency and clarity.
    """

    @enforce_keys [
      :transactions,
      :transaction_count,
      :commit_version,
      :last_commit_version,
      :storage_teams,
      :logs_by_id
    ]
    defstruct [
      :transactions,
      :transaction_count,
      :commit_version,
      :last_commit_version,
      :storage_teams,
      :logs_by_id,
      transactions_by_log: %{},
      replied_indices: MapSet.new(),
      aborted_count: 0,
      stage: :initialized,
      error: nil
    ]

    @type t :: %__MODULE__{
            transactions: %{non_neg_integer() => {non_neg_integer(), Batch.reply_fn(), Transaction.encoded()}},
            transaction_count: non_neg_integer(),
            commit_version: Bedrock.version(),
            last_commit_version: Bedrock.version(),
            storage_teams: [StorageTeamDescriptor.t()],
            logs_by_id: %{Log.id() => [Bedrock.range_tag()]},
            transactions_by_log: %{Log.id() => Transaction.encoded()},
            replied_indices: MapSet.t(non_neg_integer()),
            aborted_count: non_neg_integer(),
            stage: atom(),
            error: term() | nil
          }
  end
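
  # Stage progression through the pipeline below (derived from the functions in
  # this module): :initialized -> :ready_for_resolution -> :conflicts_resolved ->
  # :aborts_notified -> :ready_for_logging -> :logged -> :sequencer_notified ->
  # :completed, with any failure short-circuiting to :failed.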

  # ============================================================================
  # Main Pipeline
  # ============================================================================

  @doc """
  Executes the complete transaction finalization pipeline for a batch of transactions.

  This function processes a batch through a multi-stage pipeline: conflict resolution,
  abort notification, log preparation, log persistence, sequencer notification, and
  success notification. The pipeline maintains transactional consistency by ensuring
  all operations complete successfully or all pending clients are notified of failure.

  ## Pipeline Stages

  1. **Conflict Resolution**: Calls resolvers to determine which transactions must be aborted
  2. **Abort Notification**: Immediately notifies clients of aborted transactions
  3. **Log Preparation**: Distributes successful transaction mutations to appropriate logs
  4. **Log Persistence**: Pushes transactions to ALL log servers and waits for acknowledgment
  5. **Sequencer Notification**: Reports successful commit version to the sequencer
  6. **Success Notification**: Notifies clients of successful transactions with commit version

  ## Parameters

    - `batch`: Transaction batch with commit version details from the sequencer
    - `transaction_system_layout`: System configuration including resolvers and log servers
    - `opts`: Optional functions for testing and configuration overrides

  ## Returns

    - `{:ok, n_aborts, n_successes}` - Pipeline completed successfully with counts
    - `{:error, finalization_error()}` - Pipeline failed; all pending clients notified of failure

  ## Error Handling

  On any pipeline failure, all transactions that haven't been replied to are automatically
  notified with abort responses before returning the error.
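
  ## Example

  A minimal sketch of a call (the `batch`, `layout`, and `epoch` bindings are
  assumed to come from the commit proxy's own state; they are not defined here):

      {:ok, n_aborts, n_successes} =
        Finalization.finalize_batch(batch, layout, epoch: epoch)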
  """
  @spec finalize_batch(
          Batch.t(),
          TransactionSystemLayout.t(),
          opts :: [
            epoch: Bedrock.epoch(),
            resolver_fn: resolver_fn(),
            batch_log_push_fn: log_push_batch_fn(),
            abort_reply_fn: abort_reply_fn(),
            success_reply_fn: success_reply_fn(),
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            sequencer_notify_fn: sequencer_notify_fn(),
            timeout: non_neg_integer()
          ]
        ) ::
          {:ok, n_aborts :: non_neg_integer(), n_oks :: non_neg_integer()}
          | {:error, finalization_error()}
  def finalize_batch(batch, transaction_system_layout, opts \\ []) do
    batch
    |> create_finalization_plan(transaction_system_layout)
    |> resolve_conflicts(transaction_system_layout, opts)
    |> prepare_for_logging()
    |> push_to_logs(transaction_system_layout, opts)
    |> notify_sequencer(transaction_system_layout.sequencer, opts)
    |> notify_successes(opts)
    |> extract_result_or_handle_error(opts)
  end

  @doc """
  Legacy function for testing resolver calls independently.

  This function provides backwards compatibility for existing tests that test
  the resolver resolution logic in isolation.
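
  ## Example

  A sketch of an isolated call with a stubbed resolver that reports no conflicts
  (the stub and bindings are illustrative, not part of the real resolver API):

      {:ok, aborted_indices} =
        resolve_transactions(epoch, resolvers, last_version, commit_version, summaries,
          resolver_fn: fn _ref, _epoch, _last, _commit, _summaries, _opts -> {:ok, []} end,
          max_attempts: 1
        )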
  """
  @spec resolve_transactions(
          Bedrock.epoch(),
          [{start_key :: Bedrock.key(), Resolver.ref()}],
          last_version :: Bedrock.version(),
          commit_version :: Bedrock.version(),
          [Resolver.transaction_summary()],
          keyword()
        ) :: {:ok, MapSet.t(non_neg_integer())} | {:error, term()}
  def resolve_transactions(epoch, resolvers, last_version, commit_version, transaction_summaries, opts \\ []) do
    timeout_fn = Keyword.get(opts, :timeout_fn, &default_timeout_fn/1)
    resolver_fn = Keyword.get(opts, :resolver_fn, &Resolver.resolve_transactions/6)
    max_attempts = Keyword.get(opts, :max_attempts, 3)

    call_all_resolvers_with_retry(%ResolutionPlan{
      epoch: epoch,
      last_version: last_version,
      commit_version: commit_version,
      resolver_data_list: transaction_summaries,
      resolvers: resolvers,
      timeout_fn: timeout_fn,
      resolver_fn: resolver_fn,
      max_attempts: max_attempts
    })
  end

  # ============================================================================
  # Pipeline Initialization
  # ============================================================================

  @spec create_finalization_plan(Batch.t(), TransactionSystemLayout.t()) :: FinalizationPlan.t()
  def create_finalization_plan(batch, transaction_system_layout) do
    %FinalizationPlan{
      transactions: Map.new(batch.buffer, &{elem(&1, 0), &1}),
      transaction_count: Batch.transaction_count(batch),
      commit_version: batch.commit_version,
      last_commit_version: batch.last_commit_version,
      storage_teams: transaction_system_layout.storage_teams,
      logs_by_id: transaction_system_layout.logs,
      stage: :ready_for_resolution
    }
  end
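
  # Shape of the resulting `transactions` map (reply functions and encoded
  # transactions elided for brevity):
  #
  #   %{0 => {0, reply_fn_0, encoded_tx_0}, 1 => {1, reply_fn_1, encoded_tx_1}, ...}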

  # ============================================================================
  # Conflict Resolution
  # ============================================================================

  @spec resolve_conflicts(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) ::
          FinalizationPlan.t()
  def resolve_conflicts(%FinalizationPlan{stage: :ready_for_resolution} = plan, layout, opts) do
    epoch = Keyword.get(opts, :epoch) || raise "Missing epoch in finalization opts"
    timeout_fn = Keyword.get(opts, :timeout_fn, &default_timeout_fn/1)
    resolver_fn = Keyword.get(opts, :resolver_fn, &Resolver.resolve_transactions/6)
    max_attempts = Keyword.get(opts, :max_attempts, 3)

    # Extract conflict sections from transactions
    resolver_data_list =
      if plan.transaction_count == 0 do
        []
      else
        for idx <- 0..(plan.transaction_count - 1) do
          {_idx, _reply_fn, binary} = Map.fetch!(plan.transactions, idx)
          Transaction.extract_sections!(binary, [:read_conflicts, :write_conflicts])
        end
      end

    %ResolutionPlan{
      epoch: epoch,
      last_version: plan.last_commit_version,
      commit_version: plan.commit_version,
      resolver_data_list: resolver_data_list,
      resolvers: layout.resolvers,
      timeout_fn: timeout_fn,
      resolver_fn: resolver_fn,
      max_attempts: max_attempts
    }
    |> call_all_resolvers_with_retry()
    |> case do
      {:ok, aborted_set} ->
        split_and_notify_aborts_with_set(%{plan | stage: :conflicts_resolved}, aborted_set, opts)

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end

  @spec call_all_resolvers_with_retry(ResolutionPlan.t()) :: {:ok, MapSet.t(non_neg_integer())} | {:error, term()}
  defp call_all_resolvers_with_retry(%ResolutionPlan{} = resolution_plan) do
    resolution_plan.resolvers
    |> Enum.map(fn {start_key, ref} ->
      call_resolver_with_retry(ref, start_key, resolution_plan)
    end)
    |> Enum.reduce({:ok, MapSet.new()}, fn
      {:ok, aborted}, {:ok, acc} ->
        {:ok, Enum.into(aborted, acc)}

      {:error, reason}, _ ->
        {:error, reason}
    end)
  end

  @spec call_resolver_with_retry(
          Resolver.ref(),
          start_key :: Bedrock.key(),
          ResolutionPlan.t(),
          attempts_used :: non_neg_integer()
        ) :: {:ok, [non_neg_integer()]} | {:error, term()}
  defp call_resolver_with_retry(ref, start_key, %ResolutionPlan{} = plan, attempts_used \\ 0) do
    timeout_in_ms = plan.timeout_fn.(attempts_used)

    case plan.resolver_fn.(ref, plan.epoch, plan.last_version, plan.commit_version, plan.resolver_data_list,
           timeout: timeout_in_ms
         ) do
      {:ok, _} = success ->
        success

      {:error, reason} when reason in [:timeout, :unavailable] and attempts_used < plan.max_attempts - 1 ->
        Tracing.emit_resolver_retry(plan.max_attempts - attempts_used - 2, attempts_used + 1, reason)
        call_resolver_with_retry(ref, start_key, plan, attempts_used + 1)

      {:error, reason} when reason in [:timeout, :unavailable] ->
        Tracing.emit_resolver_max_retries_exceeded(attempts_used + 1, reason)
        {:error, {:resolver_unavailable, reason}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  @spec default_timeout_fn(non_neg_integer()) :: non_neg_integer()
  def default_timeout_fn(attempts_used), do: 500 * (1 <<< attempts_used)
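  # Backoff for the function above: with the default max_attempts of 3, attempts
  # use 500ms, 1_000ms, and 2_000ms timeouts (attempts_used = 0, 1, 2).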

  @spec split_and_notify_aborts_with_set(FinalizationPlan.t(), MapSet.t(non_neg_integer()), keyword()) ::
          FinalizationPlan.t()
  defp split_and_notify_aborts_with_set(%FinalizationPlan{stage: :conflicts_resolved} = plan, aborted_set, opts) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    # Reply to aborted transactions
    aborted_set
    |> Enum.map(fn idx ->
      {_idx, reply_fn, _binary} = Map.fetch!(plan.transactions, idx)
      reply_fn
    end)
    |> abort_reply_fn.()

    # Track that we've replied to these transactions and count them as aborted
    %{plan | replied_indices: aborted_set, aborted_count: MapSet.size(aborted_set), stage: :aborts_notified}
  end

  @spec reply_to_all_clients_with_aborted_transactions([Batch.reply_fn()]) :: :ok
  def reply_to_all_clients_with_aborted_transactions([]), do: :ok
  def reply_to_all_clients_with_aborted_transactions(aborts), do: Enum.each(aborts, & &1.({:error, :aborted}))

  # ============================================================================
  # Log Preparation
  # ============================================================================

  @spec prepare_for_logging(FinalizationPlan.t()) :: FinalizationPlan.t()
  def prepare_for_logging(%FinalizationPlan{stage: :failed} = plan), do: plan

  def prepare_for_logging(%FinalizationPlan{stage: :aborts_notified} = plan) do
    case build_transactions_for_logs(plan, plan.logs_by_id) do
      {:ok, transactions_by_log} ->
        %{plan | transactions_by_log: transactions_by_log, stage: :ready_for_logging}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end

  @spec build_transactions_for_logs(FinalizationPlan.t(), %{Log.id() => [Bedrock.range_tag()]}) ::
          {:ok, %{Log.id() => Transaction.encoded()}} | {:error, term()}
  defp build_transactions_for_logs(plan, logs_by_id) do
    initial_mutations_by_log =
      logs_by_id
      |> Map.keys()
      |> Map.new(&{&1, []})

    plan.transactions
    |> Enum.reduce_while(
      {:ok, initial_mutations_by_log},
      fn {idx, entry}, {:ok, acc} ->
        process_transaction_for_logs({idx, entry}, plan, logs_by_id, acc)
      end
    )
    |> case do
      {:ok, mutations_by_log} ->
        result =
          Map.new(mutations_by_log, fn {log_id, mutations_list} ->
            # Use the existing encode approach for transaction building
            encoded =
              Transaction.encode(%{
                mutations: Enum.reverse(mutations_list),
                commit_version: plan.commit_version
              })

            {log_id, encoded}
          end)

        {:ok, result}

      {:error, reason} ->
        {:error, reason}
    end
  end
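
  # Shape of the result from build_transactions_for_logs/2 (log ids and payloads
  # are illustrative): one encoded transaction per log, keyed by log id, e.g.
  #
  #   %{"log_1" => encoded_tx_for_log_1, "log_2" => encoded_tx_for_log_2}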

  @spec process_transaction_for_logs(
          {non_neg_integer(), {non_neg_integer(), Batch.reply_fn(), Transaction.encoded()}},
          FinalizationPlan.t(),
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  defp process_transaction_for_logs({idx, {_idx, _reply_fn, binary}}, plan, logs_by_id, acc) do
    if MapSet.member?(plan.replied_indices, idx) do
      # Skip transactions that were already replied to (aborted)
      {:cont, {:ok, acc}}
    else
      process_transaction_mutations(binary, plan.storage_teams, logs_by_id, acc)
    end
  end

  @spec process_transaction_mutations(
          binary(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}} | {:halt, {:error, term()}}
  defp process_transaction_mutations(binary_transaction, storage_teams, logs_by_id, acc) do
    case Transaction.mutations(binary_transaction) do
      {:ok, mutations_stream} ->
        case process_mutations_for_transaction(mutations_stream, storage_teams, logs_by_id, acc) do
          {:ok, updated_acc} ->
            {:cont, {:ok, updated_acc}}

          {:error, reason} ->
            {:halt, {:error, reason}}
        end

      {:error, reason} ->
        {:halt, {:error, {:mutation_extraction_failed, reason}}}
    end
  end

  @spec process_mutations_for_transaction(
          Enumerable.t(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:ok, %{Log.id() => [term()]}} | {:error, term()}
  defp process_mutations_for_transaction(mutations_stream, storage_teams, logs_by_id, acc) do
    Enum.reduce_while(mutations_stream, {:ok, acc}, fn mutation, {:ok, mutations_acc} ->
      distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, mutations_acc)
    end)
  end

  @spec distribute_mutation_to_logs(
          term(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  defp distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, mutations_acc) do
    key_or_range = mutation_to_key_or_range(mutation)

    case key_or_range_to_tags(key_or_range, storage_teams) do
      {:ok, []} ->
        {:halt, {:error, {:storage_team_coverage_error, key_or_range}}}

      {:ok, affected_tags} ->
        affected_logs = find_logs_for_tags(affected_tags, logs_by_id)

        updated_acc =
          Enum.reduce(affected_logs, mutations_acc, fn log_id, acc_inner ->
            Map.update!(acc_inner, log_id, &[mutation | &1])
          end)

        {:cont, {:ok, updated_acc}}
    end
  end

  @spec mutation_to_key_or_range(
          {:set, Bedrock.key(), Bedrock.value()}
          | {:clear, Bedrock.key()}
          | {:clear_range, Bedrock.key(), Bedrock.key()}
        ) ::
          Bedrock.key() | {Bedrock.key(), Bedrock.key()}
  def mutation_to_key_or_range({:set, key, _value}), do: key
  def mutation_to_key_or_range({:clear, key}), do: key
  def mutation_to_key_or_range({:clear_range, start_key, end_key}), do: {start_key, end_key}

  @spec key_or_range_to_tags(Bedrock.key() | Bedrock.key_range(), [StorageTeamDescriptor.t()]) ::
          {:ok, [Bedrock.range_tag()]}
  def key_or_range_to_tags({start_key, end_key}, storage_teams) do
    tags =
      for %{tag: tag, key_range: {team_start, team_end}} <- storage_teams,
          ranges_intersect?(start_key, end_key, team_start, team_end) do
        tag
      end

    {:ok, tags}
  end

  def key_or_range_to_tags(key, storage_teams) do
    tags =
      for %{tag: tag, key_range: {min_key, max_key_ex}} <- storage_teams,
          key_in_range?(key, min_key, max_key_ex) do
        tag
      end

    {:ok, tags}
  end

  @spec find_logs_for_tags([Bedrock.range_tag()], %{Log.id() => [Bedrock.range_tag()]}) :: [Log.id()]
  def find_logs_for_tags(tags, logs_by_id) do
    tag_set = MapSet.new(tags)

    logs_by_id
    |> Enum.filter(fn {_log_id, log_tags} ->
      Enum.any?(log_tags, &MapSet.member?(tag_set, &1))
    end)
    |> Enum.map(fn {log_id, _log_tags} -> log_id end)
  end
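
  # For example (tags and log ids are illustrative):
  #
  #   find_logs_for_tags([1, 3], %{"log_a" => [1, 2], "log_b" => [4]})
  #   #=> ["log_a"]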

  @spec ranges_intersect?(Bedrock.key(), Bedrock.key() | :end, Bedrock.key(), Bedrock.key() | :end) :: boolean()
  defp ranges_intersect?(_start1, :end, _start2, :end), do: true
  defp ranges_intersect?(start1, :end, _start2, end2), do: start1 < end2
  defp ranges_intersect?(_start1, end1, start2, :end), do: end1 > start2
  defp ranges_intersect?(start1, end1, start2, end2), do: start1 < end2 and end1 > start2
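
  # Illustrative semantics for key_in_range?/3 and ranges_intersect?/4
  # (plain string keys; :end marks a range with no upper bound):
  #
  #   key_in_range?("m", "a", "z")            #=> true   ("a" <= "m" < "z")
  #   key_in_range?("z", "a", "z")            #=> false  (max_key_ex is exclusive)
  #   ranges_intersect?("a", "m", "g", :end)  #=> true
  #   ranges_intersect?("a", "g", "g", "z")   #=> false  (half-open ranges that only touch)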

  # ============================================================================
  # Log Distribution
  # ============================================================================

  @spec push_to_logs(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) :: FinalizationPlan.t()
  def push_to_logs(%FinalizationPlan{stage: :failed} = plan, _layout, _opts), do: plan

  def push_to_logs(%FinalizationPlan{stage: :ready_for_logging} = plan, layout, opts) do
    batch_log_push_fn = Keyword.get(opts, :batch_log_push_fn, &push_transaction_to_logs_direct/5)

    case batch_log_push_fn.(layout, plan.last_commit_version, plan.transactions_by_log, plan.commit_version, opts) do
      :ok ->
        %{plan | stage: :logged}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end

  @spec resolve_log_descriptors(%{Log.id() => term()}, %{term() => ServiceDescriptor.t()}) :: %{
          Log.id() => ServiceDescriptor.t()
        }
  def resolve_log_descriptors(log_descriptors, services) do
    log_descriptors
    |> Map.keys()
    |> Enum.map(&{&1, Map.get(services, &1)})
    |> Enum.reject(&is_nil(elem(&1, 1)))
    |> Map.new()
  end

  @spec try_to_push_transaction_to_log(ServiceDescriptor.t(), binary(), Bedrock.version()) ::
          :ok | {:error, :unavailable}
  def try_to_push_transaction_to_log(%{kind: :log, status: {:up, log_server}}, transaction, last_commit_version) do
    Log.push(log_server, transaction, last_commit_version)
  end

  def try_to_push_transaction_to_log(_, _, _), do: {:error, :unavailable}

  @doc """
  Pushes transactions directly to logs and waits for acknowledgment from ALL log servers.

  This function takes transactions that have already been built per log and pushes them
  to the appropriate log servers. Each log receives its pre-built transaction.
  All logs must acknowledge to maintain durability guarantees.

  ## Parameters

    - `transaction_system_layout`: Contains configuration information about the
      transaction system, including available log servers.
    - `last_commit_version`: The last known committed version; used to
      ensure consistency in log ordering.
    - `transactions_by_log`: Map of log_id to the transaction built for that log.
      The per-log transactions may be empty if all transactions in the batch were aborted.
    - `commit_version`: The version assigned by the sequencer for this batch.
    - `opts`: Optional configuration for testing and customization.

  ## Options
    - `:async_stream_fn` - Function for parallel processing (default: Task.async_stream/3)
    - `:log_push_fn` - Function for pushing to individual logs (default: try_to_push_transaction_to_log/3)
    - `:timeout` - Timeout for log push operations (default: 5_000ms)

  ## Returns
    - `:ok` if acknowledgments have been received from ALL log servers.
    - `{:error, log_push_error()}` if any log has not successfully acknowledged the
       push within the timeout period or other errors occur.
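
  ## Example

  A sketch of a test-style call that stubs the per-log push (the stub and the
  `layout`/`transactions_by_log` bindings are illustrative):

      push_transaction_to_logs_direct(layout, last_commit_version, transactions_by_log, commit_version,
        log_push_fn: fn _service_descriptor, _encoded_transaction, _last_version -> :ok end,
        timeout: 1_000
      )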
  """
  @spec push_transaction_to_logs_direct(
          TransactionSystemLayout.t(),
          last_commit_version :: Bedrock.version(),
          %{Log.id() => Transaction.encoded()},
          commit_version :: Bedrock.version(),
          opts :: [
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            timeout: non_neg_integer()
          ]
        ) :: :ok | {:error, log_push_error()}
  def push_transaction_to_logs_direct(
        transaction_system_layout,
        last_commit_version,
        transactions_by_log,
        _commit_version,
        opts \\ []
      ) do
    async_stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
    log_push_fn = Keyword.get(opts, :log_push_fn, &try_to_push_transaction_to_log/3)
    timeout = Keyword.get(opts, :timeout, 5_000)

    logs_by_id = transaction_system_layout.logs
    required_acknowledgments = map_size(logs_by_id)

    logs_by_id
    |> resolve_log_descriptors(transaction_system_layout.services)
    |> async_stream_fn.(
      fn {log_id, service_descriptor} ->
        encoded_transaction = Map.get(transactions_by_log, log_id)
        result = log_push_fn.(service_descriptor, encoded_transaction, last_commit_version)
        {log_id, result}
      end,
      timeout: timeout
    )
    |> Enum.reduce_while({0, []}, fn
      {:ok, {log_id, {:error, reason}}}, {_count, errors} ->
        {:halt, {:error, [{log_id, reason} | errors]}}

      {:ok, {_log_id, :ok}}, {count, errors} ->
        count = 1 + count

        if count == required_acknowledgments do
          {:halt, {:ok, count}}
        else
          {:cont, {count, errors}}
        end

      {:exit, {log_id, reason}}, {_count, errors} ->
        {:halt, {:error, [{log_id, reason} | errors]}}
    end)
    |> case do
      {:ok, ^required_acknowledgments} ->
        :ok

      {:error, errors} ->
        {:error, {:log_failures, errors}}

      {count, errors} when count < required_acknowledgments ->
        {:error, {:insufficient_acknowledgments, count, required_acknowledgments, errors}}

      _other ->
        {:error, :log_push_failed}
    end
  end

  # ============================================================================
  # Sequencer Notification
  # ============================================================================

  @spec notify_sequencer(FinalizationPlan.t(), Sequencer.ref(), keyword()) :: FinalizationPlan.t()
  def notify_sequencer(%FinalizationPlan{stage: :failed} = plan, _sequencer, _opts), do: plan

  def notify_sequencer(%FinalizationPlan{stage: :logged} = plan, sequencer, opts) do
    sequencer_notify_fn = Keyword.get(opts, :sequencer_notify_fn, &Sequencer.report_successful_commit/2)

    case sequencer_notify_fn.(sequencer, plan.commit_version) do
      :ok ->
        %{plan | stage: :sequencer_notified}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end

  # ============================================================================
  # Success Notification
  # ============================================================================

  @spec notify_successes(FinalizationPlan.t(), keyword()) :: FinalizationPlan.t()
  def notify_successes(%FinalizationPlan{stage: :failed} = plan, _opts), do: plan

  def notify_successes(%FinalizationPlan{stage: :sequencer_notified} = plan, opts) do
    success_reply_fn = Keyword.get(opts, :success_reply_fn, &send_reply_with_commit_version/2)

    {successful_reply_fns, successful_indices} =
      plan.transactions
      |> Enum.reject(fn {idx, _entry} -> MapSet.member?(plan.replied_indices, idx) end)
      |> Enum.map(fn {idx, {_tx_idx, reply_fn, _binary}} -> {reply_fn, idx} end)
      |> Enum.unzip()

    success_reply_fn.(successful_reply_fns, plan.commit_version)

    %{plan | replied_indices: MapSet.union(plan.replied_indices, MapSet.new(successful_indices)), stage: :completed}
  end

  @spec send_reply_with_commit_version([Batch.reply_fn()], Bedrock.version()) :: :ok
  def send_reply_with_commit_version(oks, commit_version), do: Enum.each(oks, & &1.({:ok, commit_version}))

  # ============================================================================
  # Result Extraction and Error Handling
  # ============================================================================

  @spec extract_result_or_handle_error(FinalizationPlan.t(), keyword()) ::
          {:ok, non_neg_integer(), non_neg_integer()} | {:error, finalization_error()}
  def extract_result_or_handle_error(%FinalizationPlan{stage: :completed} = plan, _opts) do
    n_aborts = plan.aborted_count
    n_successes = plan.transaction_count - n_aborts

    {:ok, n_aborts, n_successes}
  end

  def extract_result_or_handle_error(%FinalizationPlan{stage: :failed} = plan, opts), do: handle_error(plan, opts)

  @spec handle_error(FinalizationPlan.t(), keyword()) :: {:error, finalization_error()}
  defp handle_error(%FinalizationPlan{error: error} = plan, opts) when not is_nil(error) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    # Notify all transactions that haven't been replied to yet
    pending_reply_fns =
      plan.transactions
      |> Enum.reject(fn {idx, _entry} -> MapSet.member?(plan.replied_indices, idx) end)
      |> Enum.map(fn {_idx, {_tx_idx, reply_fn, _binary}} -> reply_fn end)

    abort_reply_fn.(pending_reply_fns)

    {:error, plan.error}
  end
end