• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / 8b8c459f55a92c8826fdfd5212844c818871bfa8

18 Sep 2025 03:40AM UTC coverage: 65.711% (+1.9%) from 63.832%
8b8c459f55a92c8826fdfd5212844c818871bfa8

push

github

jallum
Merge branch 'hotfix/0.3.0-rc3'

3896 of 5929 relevant lines covered (65.71%)

652.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.82
/lib/bedrock/data_plane/commit_proxy/finalization.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Finalization do
2
  @moduledoc """
3
  Transaction finalization pipeline that handles conflict resolution and log persistence.
4

5
  ## Version Chain Integrity
6

7
  CRITICAL: This module maintains the Lamport clock version chain established by the sequencer.
8
  The sequencer provides both `last_commit_version` and `commit_version` as a proper chain link:
9

10
  - `last_commit_version`: The actual last committed version from the sequencer
11
  - `commit_version`: The new version assigned to this batch
12

13
  Always use the exact version values provided by the sequencer through the batch to maintain
14
  proper MVCC conflict detection and transaction ordering. Version gaps can exist due to failed
15
  transactions, recovery scenarios, or system restarts.
16
  """
17

18
  import Bedrock.DataPlane.CommitProxy.Telemetry,
19
    only: [
20
      trace_commit_proxy_batch_started: 3,
21
      trace_commit_proxy_batch_finished: 4,
22
      trace_commit_proxy_batch_failed: 3
23
    ]
24

25
  import Bitwise, only: [<<<: 2]
26

27
  alias Bedrock.ControlPlane.Config.ServiceDescriptor
28
  alias Bedrock.ControlPlane.Config.StorageTeamDescriptor
29
  alias Bedrock.ControlPlane.Config.TransactionSystemLayout
30
  alias Bedrock.DataPlane.CommitProxy.Batch
31
  alias Bedrock.DataPlane.CommitProxy.ConflictSharding
32
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
33
  alias Bedrock.DataPlane.CommitProxy.Tracing
34
  alias Bedrock.DataPlane.Log
35
  alias Bedrock.DataPlane.Resolver
36
  alias Bedrock.DataPlane.Sequencer
37
  alias Bedrock.DataPlane.Transaction
38
  alias Bedrock.Internal.Time
39
  alias Bedrock.KeyRange
40

41
  @type resolver_fn() :: (Resolver.ref(),
42
                          Bedrock.epoch(),
43
                          Bedrock.version(),
44
                          Bedrock.version(),
45
                          [Transaction.encoded()],
46
                          keyword() ->
47
                            {:ok, [non_neg_integer()]}
48
                            | {:error, term()}
49
                            | {:failure, :timeout, Resolver.ref()}
50
                            | {:failure, :unavailable, Resolver.ref()})
51

52
  @type log_push_batch_fn() :: (TransactionSystemLayout.t(),
53
                                last_commit_version :: Bedrock.version(),
54
                                transactions_by_tag :: %{
55
                                  Bedrock.range_tag() => Transaction.encoded()
56
                                },
57
                                commit_version :: Bedrock.version(),
58
                                opts :: [
59
                                  timeout: Bedrock.timeout_in_ms(),
60
                                  async_stream_fn: async_stream_fn()
61
                                ] ->
62
                                  :ok | {:error, log_push_error()})
63

64
  @type log_push_single_fn() :: (ServiceDescriptor.t(), binary(), Bedrock.version() ->
65
                                   :ok | {:error, :unavailable})
66

67
  @type async_stream_fn() :: (enumerable :: Enumerable.t(), fun :: (term() -> term()), opts :: keyword() ->
68
                                Enumerable.t())
69

70
  @type abort_reply_fn() :: ([Batch.reply_fn()] -> :ok)
71

72
  @type success_reply_fn() :: ([{Batch.reply_fn(), non_neg_integer(), non_neg_integer()}], Bedrock.version() -> :ok)
73

74
  @type timeout_fn() :: (non_neg_integer() -> non_neg_integer())
75

76
  @type sequencer_notify_fn() :: (Sequencer.ref(), Bedrock.version() -> :ok | {:error, term()})
77

78
  @type resolution_error() ::
79
          :timeout
80
          | :unavailable
81
          | {:resolver_unavailable, term()}
82

83
  @type storage_coverage_error() ::
84
          {:storage_team_coverage_error, binary()}
85

86
  @type log_push_error() ::
87
          {:log_failures, [{Log.id(), term()}]}
88
          | {:insufficient_acknowledgments, non_neg_integer(), non_neg_integer(), [{Log.id(), term()}]}
89
          | :log_push_failed
90

91
  @type finalization_error() ::
92
          resolution_error()
93
          | storage_coverage_error()
94
          | log_push_error()
95

96
  # ============================================================================
97
  # Data Structures
98
  # ============================================================================
99

100
  defmodule FinalizationPlan do
    @moduledoc """
    Pipeline state for transaction finalization using unified transaction storage
    for maximum efficiency and clarity.

    The plan is threaded through the stages of `finalize_batch/3`. Each stage
    function matches on `:stage` and either advances it or parks the plan in
    `:failed`, recording the reason in `:error`.
    """

    # Fields with no sensible default; supplied by `create_finalization_plan/2`.
    @enforce_keys [
      :transactions,
      :transaction_count,
      :commit_version,
      :last_commit_version,
      :storage_teams,
      :logs_by_id
    ]
    defstruct [
      # index => {index, reply_fn, encoded transaction, task-or-nil}
      :transactions,
      :transaction_count,
      :commit_version,
      :last_commit_version,
      :storage_teams,
      :logs_by_id,
      # Filled in by the log-preparation stage: one encoded transaction per log.
      transactions_by_log: %{},
      # Indices already answered (aborts), so later stages skip them.
      replied_indices: MapSet.new(),
      aborted_count: 0,
      stage: :initialized,
      error: nil
    ]

    @type t :: %__MODULE__{
            transactions: %{
              non_neg_integer() => {non_neg_integer(), Batch.reply_fn(), Transaction.encoded(), Task.t() | nil}
            },
            transaction_count: non_neg_integer(),
            commit_version: Bedrock.version(),
            last_commit_version: Bedrock.version(),
            storage_teams: [StorageTeamDescriptor.t()],
            logs_by_id: %{Log.id() => [Bedrock.range_tag()]},
            transactions_by_log: %{Log.id() => Transaction.encoded()},
            replied_indices: MapSet.t(non_neg_integer()),
            aborted_count: non_neg_integer(),
            stage: atom(),
            error: term() | nil
          }
  end
144

145
  # ============================================================================
146
  # Main Pipeline
147
  # ============================================================================
148

149
  @doc """
  Runs the full finalization pipeline for a batch and reports the outcome.

  Stages, in order: conflict resolution via the resolvers, immediate abort
  replies, per-log transaction preparation, durable push to ALL logs,
  sequencer notification, and finally success replies carrying the commit
  version. If any stage fails, every transaction that has not yet been
  answered is notified of the failure before the error is returned.

  ## Parameters

    - `batch`: Transaction batch carrying the sequencer-assigned versions
    - `transaction_system_layout`: Resolver/log/service configuration
    - `opts`: Injection points for testing and tuning (see the spec)

  ## Returns

    - `{:ok, n_aborts, n_successes}` on success
    - `{:error, finalization_error()}` on failure (clients already notified)
  """
  @spec finalize_batch(
          Batch.t(),
          TransactionSystemLayout.t(),
          opts :: [
            epoch: Bedrock.epoch(),
            precomputed: LayoutOptimization.precomputed_layout() | nil,
            resolver_fn: resolver_fn(),
            batch_log_push_fn: log_push_batch_fn(),
            abort_reply_fn: abort_reply_fn(),
            success_reply_fn: success_reply_fn(),
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            sequencer_notify_fn: sequencer_notify_fn(),
            timeout: non_neg_integer()
          ]
        ) ::
          {:ok, n_aborts :: non_neg_integer(), n_oks :: non_neg_integer()}
          | {:error, finalization_error()}
  def finalize_batch(batch, transaction_system_layout, opts \\ []) do
    trace_commit_proxy_batch_started(batch.commit_version, length(batch.buffer), Time.now_in_ms())

    run_pipeline = fn ->
      batch
      |> create_finalization_plan(transaction_system_layout)
      |> resolve_conflicts(transaction_system_layout, opts)
      |> prepare_for_logging()
      |> push_to_logs(transaction_system_layout, opts)
      |> notify_sequencer(transaction_system_layout.sequencer, opts)
      |> notify_successes(opts)
      |> extract_result_or_handle_error(opts)
    end

    # Time the whole pipeline so the telemetry events carry its duration.
    {elapsed_usec, result} = :timer.tc(run_pipeline)

    case result do
      {:ok, n_aborts, n_oks} ->
        trace_commit_proxy_batch_finished(batch.commit_version, n_aborts, n_oks, elapsed_usec)
        {:ok, n_aborts, n_oks}

      {:error, reason} ->
        trace_commit_proxy_batch_failed(batch, reason, elapsed_usec)
        {:error, reason}
    end
  end
224

225
  # ============================================================================
226
  # Pipeline Initialization
227
  # ============================================================================
228

229
  @spec create_finalization_plan(Batch.t(), TransactionSystemLayout.t()) :: FinalizationPlan.t()
  # Builds the initial pipeline state from the batch and the system layout,
  # indexing each buffered transaction entry by its own index (element 0).
  def create_finalization_plan(batch, transaction_system_layout) do
    indexed_transactions =
      Map.new(batch.buffer, fn entry -> {elem(entry, 0), entry} end)

    %FinalizationPlan{
      transactions: indexed_transactions,
      transaction_count: Batch.transaction_count(batch),
      commit_version: batch.commit_version,
      last_commit_version: batch.last_commit_version,
      storage_teams: transaction_system_layout.storage_teams,
      logs_by_id: transaction_system_layout.logs,
      stage: :ready_for_resolution
    }
  end
241

242
  # ============================================================================
243
  # Conflict Resolution
244
  # ============================================================================
245

246
  @spec resolve_conflicts(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) ::
          FinalizationPlan.t()
  # Asks every resolver which transactions in the batch must be aborted, then
  # replies to those clients immediately. On resolver failure the plan moves to
  # `:failed` with the reason recorded.
  #
  # Requires `:epoch` and `:precomputed` in `opts`; both raise when absent.
  def resolve_conflicts(%FinalizationPlan{stage: :ready_for_resolution} = plan, layout, opts) do
    epoch = Keyword.get(opts, :epoch) || raise "Missing epoch in finalization opts"
    precomputed_layout = Keyword.get(opts, :precomputed) || raise "Missing precomputed layout in finalization opts"

    resolver_transaction_map =
      if plan.transaction_count == 0 do
        # Empty batch: every resolver still gets called with an empty list so
        # the version chain advances.
        Map.new(layout.resolvers, fn {_key, ref} -> {ref, []} end)
      else
        # Spawn ALL conflict-sharding tasks up front, then await them together.
        # (Previously each task was awaited immediately after being spawned,
        # which serialized the work and made the tasks pointless.)
        maps =
          0..(plan.transaction_count - 1)
          |> Enum.map(fn idx ->
            {_idx, _reply_fn, transaction, _task} = Map.fetch!(plan.transactions, idx)
            create_resolver_task_in_finalization(transaction, precomputed_layout)
          end)
          |> Task.await_many(5000)

        # Regroup the per-transaction shard maps into per-resolver lists,
        # preserving batch order.
        Map.new(layout.resolvers, fn {_key, ref} ->
          {ref, Enum.map(maps, &Map.fetch!(&1, ref))}
        end)
      end

    case call_all_resolvers_with_map(
           resolver_transaction_map,
           epoch,
           plan.last_commit_version,
           plan.commit_version,
           layout.resolvers,
           opts
         ) do
      {:ok, aborted_set} ->
        split_and_notify_aborts_with_set(%{plan | stage: :conflicts_resolved}, aborted_set, opts)

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end
285

286
  @spec call_all_resolvers_with_map(
          %{Resolver.ref() => [Transaction.encoded()]},
          Bedrock.epoch(),
          Bedrock.version(),
          Bedrock.version(),
          [{start_key :: Bedrock.key(), Resolver.ref()}],
          keyword()
        ) :: {:ok, MapSet.t(non_neg_integer())} | {:error, term()}
  # Calls every resolver concurrently (with retry) and unions the aborted
  # indices they report. The first resolver error or task exit aborts the
  # whole collection.
  defp call_all_resolvers_with_map(resolver_transaction_map, epoch, last_version, commit_version, resolvers, opts) do
    stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
    stream_timeout = Keyword.get(opts, :timeout, 5_000)

    collect = fn
      {:ok, {:ok, aborted}}, {:ok, acc} ->
        {:cont, {:ok, Enum.into(aborted, acc)}}

      {:ok, {:error, reason}}, _acc ->
        {:halt, {:error, reason}}

      {:exit, reason}, _acc ->
        {:halt, {:error, {:resolver_exit, reason}}}
    end

    resolvers
    |> stream_fn.(
      fn {_start_key, ref} ->
        # Every resolver must have an entry after task processing.
        transactions_for_ref = Map.fetch!(resolver_transaction_map, ref)
        call_resolver_with_retry(ref, epoch, last_version, commit_version, transactions_for_ref, opts)
      end,
      timeout: stream_timeout
    )
    |> Enum.reduce_while({:ok, MapSet.new()}, collect)
  end
318

319
  @spec call_resolver_with_retry(
          Resolver.ref(),
          Bedrock.epoch(),
          Bedrock.version(),
          Bedrock.version(),
          [Transaction.encoded()],
          keyword(),
          non_neg_integer()
        ) :: {:ok, [non_neg_integer()]} | {:error, term()}
  # Calls a single resolver, retrying on :timeout/:unavailable with exponential
  # backoff (see `default_timeout_fn/1`) up to `:max_attempts` tries.
  #
  # The resolver can signal transient trouble either as `{:error, reason}` or as
  # `{:failure, reason, ref}`; both are normalized to `{:error, reason}` first so
  # the retry logic is written once. (This also means a `{:failure, other, _ref}`
  # with an unexpected reason now returns `{:error, other}` instead of crashing
  # with a CaseClauseError.)
  defp call_resolver_with_retry(
         ref,
         epoch,
         last_version,
         commit_version,
         filtered_transactions,
         opts,
         attempts_used \\ 0
       ) do
    timeout_fn = Keyword.get(opts, :timeout_fn, &default_timeout_fn/1)
    resolver_fn = Keyword.get(opts, :resolver_fn, &Resolver.resolve_transactions/6)
    max_attempts = Keyword.get(opts, :max_attempts, 3)

    timeout_in_ms = timeout_fn.(attempts_used)

    result = resolver_fn.(ref, epoch, last_version, commit_version, filtered_transactions, timeout: timeout_in_ms)

    case normalize_resolver_result(result) do
      {:ok, _} = success ->
        success

      {:error, reason} when reason in [:timeout, :unavailable] and attempts_used < max_attempts - 1 ->
        Tracing.emit_resolver_retry(max_attempts - attempts_used - 2, attempts_used + 1, reason)

        call_resolver_with_retry(
          ref,
          epoch,
          last_version,
          commit_version,
          filtered_transactions,
          opts,
          attempts_used + 1
        )

      {:error, reason} when reason in [:timeout, :unavailable] ->
        # Retries exhausted on a transient error.
        Tracing.emit_resolver_max_retries_exceeded(attempts_used + 1, reason)
        {:error, {:resolver_unavailable, reason}}

      {:error, reason} ->
        # Non-transient error: fail immediately, no retry.
        {:error, reason}
    end
  end

  # Folds the resolver's `{:failure, reason, ref}` shape into the common
  # `{:error, reason}` shape; everything else passes through untouched.
  defp normalize_resolver_result({:failure, reason, _ref}), do: {:error, reason}
  defp normalize_resolver_result(result), do: result
385

386
  @spec default_timeout_fn(non_neg_integer()) :: non_neg_integer()
  # Exponential backoff: 500ms on the first attempt, doubling each retry.
  def default_timeout_fn(attempts_used), do: 500 * Integer.pow(2, attempts_used)
42✔
388

389
  @spec create_resolver_task_in_finalization(Transaction.encoded(), LayoutOptimization.precomputed_layout()) :: Task.t()
  # Single-resolver fast path: no sharding needed, every conflict section goes
  # to the one resolver.
  defp create_resolver_task_in_finalization(transaction, %{resolver_refs: [single_ref], resolver_ends: _ends}) do
    Task.async(fn ->
      conflicts = Transaction.extract_sections!(transaction, [:read_conflicts, :write_conflicts])
      %{single_ref => conflicts}
    end)
  end

  # General path: shard the conflict sections across the resolvers by key range.
  defp create_resolver_task_in_finalization(transaction, %{resolver_refs: refs, resolver_ends: ends}) do
    Task.async(fn ->
      transaction
      |> Transaction.extract_sections!([:read_conflicts, :write_conflicts])
      |> ConflictSharding.shard_conflicts_across_resolvers(ends, refs)
    end)
  end
403

404
  @spec split_and_notify_aborts_with_set(FinalizationPlan.t(), MapSet.t(non_neg_integer()), keyword()) ::
          FinalizationPlan.t()
  # Replies `{:error, :aborted}` to every transaction the resolvers rejected,
  # then records those indices so later stages skip them.
  defp split_and_notify_aborts_with_set(%FinalizationPlan{stage: :conflicts_resolved} = plan, aborted_set, opts) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    aborted_reply_fns =
      for idx <- aborted_set do
        {_idx, reply_fn, _binary, _task} = Map.fetch!(plan.transactions, idx)
        reply_fn
      end

    abort_reply_fn.(aborted_reply_fns)

    %{plan | replied_indices: aborted_set, aborted_count: MapSet.size(aborted_set), stage: :aborts_notified}
  end
421

422
  @spec reply_to_all_clients_with_aborted_transactions([Batch.reply_fn()]) :: :ok
  # Sends `{:error, :aborted}` through each reply function; no-op for the
  # (common) empty list.
  def reply_to_all_clients_with_aborted_transactions([]), do: :ok

  def reply_to_all_clients_with_aborted_transactions(aborts) do
    Enum.each(aborts, fn reply_fn -> reply_fn.({:error, :aborted}) end)
  end
7✔
425

426
  # ============================================================================
427
  # Log Preparation
428
  # ============================================================================
429

430
  @spec prepare_for_logging(FinalizationPlan.t()) :: FinalizationPlan.t()
  # An already-failed plan passes straight through.
  def prepare_for_logging(%FinalizationPlan{stage: :failed} = plan), do: plan

  # Builds one encoded transaction per log from the surviving mutations.
  def prepare_for_logging(%FinalizationPlan{stage: :aborts_notified} = plan) do
    plan
    |> build_transactions_for_logs(plan.logs_by_id)
    |> case do
      {:ok, by_log} -> %{plan | transactions_by_log: by_log, stage: :ready_for_logging}
      {:error, reason} -> %{plan | error: reason, stage: :failed}
    end
  end
442

443
  @spec build_transactions_for_logs(FinalizationPlan.t(), %{Log.id() => [Bedrock.range_tag()]}) ::
          {:ok, %{Log.id() => Transaction.encoded()}} | {:error, term()}
  # Distributes every non-aborted transaction's mutations to the logs that own
  # their key ranges, then encodes one transaction per log. Mutation lists are
  # accumulated in reverse (prepend) and flipped once before encoding.
  defp build_transactions_for_logs(plan, logs_by_id) do
    empty_acc = Map.new(logs_by_id, fn {log_id, _tags} -> {log_id, []} end)

    plan.transactions
    |> Enum.reduce_while({:ok, empty_acc}, fn {idx, entry}, {:ok, acc} ->
      process_transaction_for_logs({idx, entry}, plan, logs_by_id, acc)
    end)
    |> case do
      {:ok, mutations_by_log} ->
        encoded_by_log =
          Map.new(mutations_by_log, fn {log_id, reversed_mutations} ->
            {log_id,
             Transaction.encode(%{
               mutations: Enum.reverse(reversed_mutations),
               commit_version: plan.commit_version
             })}
          end)

        {:ok, encoded_by_log}

      {:error, _reason} = error ->
        error
    end
  end
478

479
  @spec process_transaction_for_logs(
          {non_neg_integer(), {non_neg_integer(), Batch.reply_fn(), Transaction.encoded(), Task.t() | nil}},
          FinalizationPlan.t(),
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  # Skips transactions already replied to (aborted); otherwise fans the
  # transaction's mutations out to the accumulating per-log lists.
  defp process_transaction_for_logs({idx, {_idx, _reply_fn, binary, _task}}, plan, logs_by_id, acc) do
    case MapSet.member?(plan.replied_indices, idx) do
      true -> {:cont, {:ok, acc}}
      false -> process_transaction_mutations(binary, plan.storage_teams, logs_by_id, acc)
    end
  end
495

496
  @spec process_transaction_mutations(
          binary(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}} | {:halt, {:error, term()}}
  # Extracts the mutation stream from an encoded transaction and distributes
  # it; a transaction with no mutation section is simply skipped, while any
  # other extraction error halts the whole build.
  defp process_transaction_mutations(binary_transaction, storage_teams, logs_by_id, acc) do
    case Transaction.mutations(binary_transaction) do
      {:ok, mutations_stream} ->
        mutations_stream
        |> process_mutations_for_transaction(storage_teams, logs_by_id, acc)
        |> case do
          {:ok, updated_acc} -> {:cont, {:ok, updated_acc}}
          {:error, reason} -> {:halt, {:error, reason}}
        end

      {:error, :section_not_found} ->
        # No mutation section at all — nothing to distribute for this one.
        {:cont, {:ok, acc}}

      {:error, reason} ->
        {:halt, {:error, {:mutation_extraction_failed, reason}}}
    end
  end
521

522
  @spec process_mutations_for_transaction(
          Enumerable.t(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:ok, %{Log.id() => [term()]}} | {:error, term()}
  # Folds each mutation into the per-log accumulator, stopping early on the
  # first distribution error (e.g. a key no storage team covers).
  defp process_mutations_for_transaction(mutations_stream, storage_teams, logs_by_id, acc) do
    Enum.reduce_while(mutations_stream, {:ok, acc}, fn mutation, {:ok, current_acc} ->
      distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, current_acc)
    end)
  end
534

535
  @spec distribute_mutation_to_logs(
          term(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  # Prepends the mutation onto the list of every log whose tags cover the
  # mutation's key (or key range). A key covered by NO storage team is a
  # configuration error and halts the fold.
  defp distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, mutations_acc) do
    key_or_range = mutation_to_key_or_range(mutation)

    case key_or_range_to_tags(key_or_range, storage_teams) do
      {:ok, []} ->
        {:halt, {:error, {:storage_team_coverage_error, key_or_range}}}

      {:ok, affected_tags} ->
        updated_acc =
          affected_tags
          |> find_logs_for_tags(logs_by_id)
          |> Enum.reduce(mutations_acc, fn log_id, acc ->
            Map.update!(acc, log_id, &[mutation | &1])
          end)

        {:cont, {:ok, updated_acc}}
    end
  end
561

562
  @spec mutation_to_key_or_range(
          {:set, Bedrock.key(), Bedrock.value()}
          | {:clear, Bedrock.key()}
          | {:clear_range, Bedrock.key(), Bedrock.key()}
          | {:atomic, atom(), Bedrock.key(), Bedrock.value()}
        ) ::
          Bedrock.key() | {Bedrock.key(), Bedrock.key()}
  # Extracts the key (or key range, for :clear_range) a mutation touches.
  def mutation_to_key_or_range({:clear_range, range_start, range_end}), do: {range_start, range_end}
  def mutation_to_key_or_range({:set, key, _value}), do: key
  def mutation_to_key_or_range({:clear, key}), do: key
  def mutation_to_key_or_range({:atomic, _op, key, _value}), do: key
×
573

574
  @spec key_or_range_to_tags(Bedrock.key() | Bedrock.key_range(), [StorageTeamDescriptor.t()]) ::
          {:ok, [Bedrock.range_tag()]}
  # Range form: collect the tag of every storage team whose key range
  # intersects the given range.
  def key_or_range_to_tags({start_key, end_key}, storage_teams) do
    intersecting_tags =
      storage_teams
      |> Enum.filter(fn %{key_range: {team_start, team_end}} ->
        ranges_intersect?(start_key, end_key, team_start, team_end)
      end)
      |> Enum.map(& &1.tag)

    {:ok, intersecting_tags}
  end

  # Point form: collect the tag of every storage team whose (half-open) key
  # range contains the key.
  def key_or_range_to_tags(key, storage_teams) do
    containing_tags =
      storage_teams
      |> Enum.filter(fn %{key_range: {min_key, max_key_ex}} ->
        KeyRange.contains?({min_key, max_key_ex}, key)
      end)
      |> Enum.map(& &1.tag)

    {:ok, containing_tags}
  end
595

596
  @spec find_logs_for_tags([Bedrock.range_tag()], %{Log.id() => [Bedrock.range_tag()]}) :: [Log.id()]
  # Returns the ids of all logs responsible for at least one of the given tags.
  def find_logs_for_tags(tags, logs_by_id) do
    wanted = MapSet.new(tags)

    for {log_id, log_tags} <- logs_by_id,
        Enum.any?(log_tags, &(&1 in wanted)) do
      log_id
    end
  end
606

607
  @spec ranges_intersect?(Bedrock.key(), Bedrock.key() | :end, Bedrock.key(), Bedrock.key() | :end) :: boolean()
  # Half-open ranges intersect when each one starts before the other ends;
  # an `:end` bound is unbounded on the right, so that side's check is
  # vacuously true. (Equivalent to the usual four-clause pattern split.)
  defp ranges_intersect?(start1, end1, start2, end2) do
    starts_before_other_ends? = end2 == :end or start1 < end2
    other_starts_before_end? = end1 == :end or end1 > start2
    starts_before_other_ends? and other_starts_before_end?
  end
8✔
612

613
  # ============================================================================
614
  # Log Distribution
615
  # ============================================================================
616

617
  @spec push_to_logs(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) :: FinalizationPlan.t()
  # An already-failed plan passes straight through.
  def push_to_logs(%FinalizationPlan{stage: :failed} = plan, _layout, _opts), do: plan

  # Pushes the prepared per-log transactions; only a fully acknowledged push
  # advances the plan to `:logged`.
  def push_to_logs(%FinalizationPlan{stage: :ready_for_logging} = plan, layout, opts) do
    push_fn = Keyword.get(opts, :batch_log_push_fn, &push_transaction_to_logs_direct/5)

    layout
    |> push_fn.(plan.last_commit_version, plan.transactions_by_log, plan.commit_version, opts)
    |> case do
      :ok -> %{plan | stage: :logged}
      {:error, reason} -> %{plan | error: reason, stage: :failed}
    end
  end
631

632
  @spec resolve_log_descriptors(%{Log.id() => term()}, %{term() => ServiceDescriptor.t()}) :: %{
          Log.id() => ServiceDescriptor.t()
        }
  # Maps each log id to its service descriptor, silently dropping logs that
  # have no (or a nil) descriptor. `Map.take/2` replaces the hand-rolled
  # keys/map/reject pipeline: it already skips missing keys, and the explicit
  # nil-reject preserves the old behavior for keys present with a nil value.
  def resolve_log_descriptors(log_descriptors, services) do
    services
    |> Map.take(Map.keys(log_descriptors))
    |> Map.reject(fn {_log_id, descriptor} -> is_nil(descriptor) end)
  end
642

643
  @spec try_to_push_transaction_to_log(ServiceDescriptor.t(), binary(), Bedrock.version()) ::
          :ok | {:error, :unavailable}
  # A log service that is currently up can accept the push: forward the encoded
  # transaction (and the expected previous version) to the live server.
  def try_to_push_transaction_to_log(%{kind: :log, status: {:up, log_server}}, transaction, last_commit_version) do
    Log.push(log_server, transaction, last_commit_version)
  end

  # Anything else — wrong service kind, or a log that is not up — is unavailable.
  def try_to_push_transaction_to_log(_, _, _), do: {:error, :unavailable}
1✔
650

651
  @doc """
  Pushes transactions directly to logs and waits for acknowledgement from ALL log servers.

  This function takes transactions that have already been built per log and pushes them
  to the appropriate log servers. Each log receives its pre-built transaction.
  All logs must acknowledge to maintain durability guarantees.

  ## Parameters

    - `transaction_system_layout`: Contains configuration information about the
      transaction system, including available log servers.
    - `last_commit_version`: The last known committed version; used to
      ensure consistency in log ordering.
    - `transactions_by_log`: Map of log_id to transaction for that log.
      May be empty transactions if all transactions were aborted.
    - `commit_version`: The version assigned by the sequencer for this batch.
    - `opts`: Optional configuration for testing and customization.

  ## Options
    - `:async_stream_fn` - Function for parallel processing (default: Task.async_stream/3)
    - `:log_push_fn` - Function for pushing to individual logs (default: try_to_push_transaction_to_log/3)
    - `:timeout` - Timeout for log push operations (default: 5_000ms)

  ## Returns
    - `:ok` if acknowledgements have been received from ALL log servers.
    - `{:error, log_push_error()}` if any log has not successfully acknowledged the
       push within the timeout period or other errors occur.
  """
  @spec push_transaction_to_logs_direct(
          TransactionSystemLayout.t(),
          last_commit_version :: Bedrock.version(),
          %{Log.id() => Transaction.encoded()},
          commit_version :: Bedrock.version(),
          opts :: [
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            timeout: non_neg_integer()
          ]
        ) :: :ok | {:error, log_push_error()}
  def push_transaction_to_logs_direct(
        transaction_system_layout,
        last_commit_version,
        transactions_by_log,
        _commit_version,
        opts \\ []
      ) do
    async_stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
    log_push_fn = Keyword.get(opts, :log_push_fn, &try_to_push_transaction_to_log/3)
    timeout = Keyword.get(opts, :timeout, 5_000)

    # Every configured log must acknowledge, even those with no resolvable
    # service descriptor — such logs never produce an :ok, so the final count
    # check below fails the push.
    logs_by_id = transaction_system_layout.logs
    required_acknowledgments = map_size(logs_by_id)

    logs_by_id
    |> resolve_log_descriptors(transaction_system_layout.services)
    |> async_stream_fn.(
      fn {log_id, service_descriptor} ->
        # The log's pre-built transaction; may be nil/absent for this log.
        encoded_transaction = Map.get(transactions_by_log, log_id)
        result = log_push_fn.(service_descriptor, encoded_transaction, last_commit_version)
        {log_id, result}
      end,
      timeout: timeout
    )
    |> Enum.reduce_while({0, []}, fn
      # First push error halts immediately.
      {:ok, {log_id, {:error, reason}}}, {_count, errors} ->
        {:halt, {:error, [{log_id, reason} | errors]}}

      # Successful ack: stop as soon as every required log has answered.
      {:ok, {_log_id, :ok}}, {count, errors} ->
        count = 1 + count

        if count == required_acknowledgments do
          {:halt, {:ok, count}}
        else
          {:cont, {count, errors}}
        end

      # NOTE(review): Task.async_stream emits `{:exit, reason}` (no log_id)
      # and only when `on_timeout: :kill_task` is set — confirm this clause's
      # `{log_id, reason}` shape can ever match; coverage shows it never ran.
      {:exit, {log_id, reason}}, {_count, errors} ->
        {:halt, {:error, [{log_id, reason} | errors]}}
    end)
    |> case do
      {:ok, ^required_acknowledgments} ->
        :ok

      {:error, errors} ->
        {:error, {:log_failures, errors}}

      # Stream ended before every log acknowledged (e.g. missing descriptors).
      {count, errors} when count < required_acknowledgments ->
        {:error, {:insufficient_acknowledgments, count, required_acknowledgments, errors}}

      _other ->
        {:error, :log_push_failed}
    end
  end
744

745
  # ============================================================================
746
  # Sequencer Notification
747
  # ============================================================================
748

749
  # Reports this batch's commit version back to the sequencer once the logs
  # have made it durable. A plan that already failed passes through untouched;
  # a notification failure marks the plan as failed so downstream stages can
  # reply with aborts instead of successes.
  @spec notify_sequencer(FinalizationPlan.t(), Sequencer.ref(), keyword()) :: FinalizationPlan.t()
  def notify_sequencer(%FinalizationPlan{stage: :failed} = plan, _sequencer, _opts), do: plan

  def notify_sequencer(%FinalizationPlan{stage: :logged} = plan, sequencer, opts) do
    # Injectable for tests; defaults to the real sequencer call.
    notify_fn = Keyword.get(opts, :sequencer_notify_fn, &Sequencer.report_successful_commit/2)

    sequencer
    |> notify_fn.(plan.commit_version)
    |> case do
      :ok -> %{plan | stage: :sequencer_notified}
      {:error, reason} -> %{plan | stage: :failed, error: reason}
    end
  end
763

764
  # ============================================================================
765
  # Success Notification
766
  # ============================================================================
767

768
  # Sends a success reply (commit version plus per-transaction index) to every
  # client in the plan that has not already been replied to, then marks the
  # plan completed. Failed plans pass through unchanged.
  @spec notify_successes(FinalizationPlan.t(), keyword()) :: FinalizationPlan.t()
  def notify_successes(%FinalizationPlan{stage: :failed} = plan, _opts), do: plan

  def notify_successes(%FinalizationPlan{stage: :sequencer_notified} = plan, opts) do
    # Injectable for tests; defaults to the real client-reply path.
    reply_fn = Keyword.get(opts, :success_reply_fn, &send_reply_with_commit_version_and_index/2)

    # Collect {client_reply_fn, tx_idx, plan_idx} for every transaction whose
    # plan index has not yet received a reply.
    pending_entries =
      for {idx, {tx_idx, client_reply_fn, _binary, _task}} <- plan.transactions,
          not MapSet.member?(plan.replied_indices, idx) do
        {client_reply_fn, tx_idx, idx}
      end

    reply_fn.(pending_entries, plan.commit_version)

    newly_replied = MapSet.new(pending_entries, fn {_reply_fn, _tx_idx, idx} -> idx end)

    %{plan | stage: :completed, replied_indices: MapSet.union(plan.replied_indices, newly_replied)}
  end
785

786
  # Invokes each client reply function with the same {:ok, commit_version}
  # message. Always returns :ok.
  @spec send_reply_with_commit_version([Batch.reply_fn()], Bedrock.version()) :: :ok
  def send_reply_with_commit_version(oks, commit_version) do
    reply = {:ok, commit_version}
    Enum.each(oks, fn reply_fn -> reply_fn.(reply) end)
  end
789
  # Invokes each client reply function with {:ok, commit_version, tx_idx},
  # where tx_idx is the transaction's own index within its batch. The plan
  # index in each entry is ignored here. Always returns :ok.
  @spec send_reply_with_commit_version_and_index(
          [{Batch.reply_fn(), non_neg_integer(), non_neg_integer()}],
          Bedrock.version()
        ) :: :ok
  def send_reply_with_commit_version_and_index(entries, commit_version) do
    for {reply_fn, tx_idx, _plan_idx} <- entries do
      reply_fn.({:ok, commit_version, tx_idx})
    end

    :ok
  end
798

799
  # ============================================================================
800
  # Result Extraction and Error Handling
801
  # ============================================================================
802

803
  # Terminal stage of the finalization pipeline: a completed plan yields
  # {:ok, aborted_count, successful_count}; a failed plan is routed to the
  # error path, which replies to any pending clients and returns the error.
  @spec extract_result_or_handle_error(FinalizationPlan.t(), keyword()) ::
          {:ok, non_neg_integer(), non_neg_integer()} | {:error, finalization_error()}
  def extract_result_or_handle_error(
        %FinalizationPlan{stage: :completed, aborted_count: aborted, transaction_count: total},
        _opts
      ) do
    {:ok, aborted, total - aborted}
  end

  def extract_result_or_handle_error(%FinalizationPlan{stage: :failed} = plan, opts), do: handle_error(plan, opts)
813

814
  # Aborts every transaction that has not yet been replied to and surfaces the
  # plan's error to the caller. Only matches plans that carry a non-nil error.
  @spec handle_error(FinalizationPlan.t(), keyword()) :: {:error, finalization_error()}
  defp handle_error(%FinalizationPlan{error: error} = plan, opts) when not is_nil(error) do
    # Injectable for tests; defaults to the real abort-reply path.
    abort_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    # Reply functions for every transaction whose plan index has not yet
    # received a response.
    pending_reply_fns =
      for {idx, {_tx_idx, reply_fn, _binary, _task}} <- plan.transactions,
          not MapSet.member?(plan.replied_indices, idx),
          do: reply_fn

    abort_fn.(pending_reply_fns)

    {:error, error}
  end
829
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc