• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / 984a90d50e5214a5922f87eaa21b526a5950cda6-PR-43

06 Sep 2025 09:12PM UTC coverage: 63.695% (+0.02%) from 63.674%
984a90d50e5214a5922f87eaa21b526a5950cda6-PR-43

Pull #43

github

jallum
Merge remote-tracking branch 'origin/develop' into feature/rework_commit_proxy
Pull Request #43: Add conflict sharding with async resolver assignment

68 of 99 new or added lines in 8 files covered. (68.69%)

3 existing lines in 2 files now uncovered.

3265 of 5126 relevant lines covered (63.69%)

620.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.84
/lib/bedrock/data_plane/commit_proxy/finalization.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Finalization do
2
  @moduledoc """
3
  Transaction finalization pipeline that handles conflict resolution and log persistence.
4

5
  ## Version Chain Integrity
6

7
  CRITICAL: This module maintains the Lamport clock version chain established by the sequencer.
8
  The sequencer provides both `last_commit_version` and `commit_version` as a proper chain link:
9

10
  - `last_commit_version`: The actual last committed version from the sequencer
11
  - `commit_version`: The new version assigned to this batch
12

13
  Always use the exact version values provided by the sequencer through the batch to maintain
14
  proper MVCC conflict detection and transaction ordering. Version gaps can exist due to failed
15
  transactions, recovery scenarios, or system restarts.
16
  """
17

18
  import Bitwise, only: [<<<: 2]
19

20
  alias Bedrock.ControlPlane.Config.ServiceDescriptor
21
  alias Bedrock.ControlPlane.Config.StorageTeamDescriptor
22
  alias Bedrock.ControlPlane.Config.TransactionSystemLayout
23
  alias Bedrock.DataPlane.CommitProxy.Batch
24
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
25
  alias Bedrock.DataPlane.CommitProxy.Tracing
26
  alias Bedrock.DataPlane.Log
27
  alias Bedrock.DataPlane.Resolver
28
  alias Bedrock.DataPlane.Sequencer
29
  alias Bedrock.DataPlane.Transaction
30
  alias Bedrock.KeyRange
31

32
  @type resolver_fn() :: (Resolver.ref(),
33
                          Bedrock.epoch(),
34
                          Bedrock.version(),
35
                          Bedrock.version(),
36
                          [Resolver.transaction_summary()],
37
                          keyword() ->
38
                            {:ok, [non_neg_integer()]} | {:error, term()})
39

40
  @type log_push_batch_fn() :: (TransactionSystemLayout.t(),
41
                                last_commit_version :: Bedrock.version(),
42
                                transactions_by_tag :: %{
43
                                  Bedrock.range_tag() => Transaction.encoded()
44
                                },
45
                                commit_version :: Bedrock.version(),
46
                                opts :: [
47
                                  timeout: Bedrock.timeout_in_ms(),
48
                                  async_stream_fn: async_stream_fn()
49
                                ] ->
50
                                  :ok | {:error, log_push_error()})
51

52
  @type log_push_single_fn() :: (ServiceDescriptor.t(), binary(), Bedrock.version() ->
53
                                   :ok | {:error, :unavailable})
54

55
  @type async_stream_fn() :: (enumerable :: Enumerable.t(), fun :: (term() -> term()), opts :: keyword() ->
56
                                Enumerable.t())
57

58
  @type abort_reply_fn() :: ([Batch.reply_fn()] -> :ok)
59

60
  @type success_reply_fn() :: ([Batch.reply_fn()], Bedrock.version() -> :ok)
61

62
  @type timeout_fn() :: (non_neg_integer() -> non_neg_integer())
63

64
  @type sequencer_notify_fn() :: (Sequencer.ref(), Bedrock.version() -> :ok | {:error, term()})
65

66
  @type resolution_error() ::
67
          :timeout
68
          | :unavailable
69
          | {:resolver_unavailable, term()}
70

71
  @type storage_coverage_error() ::
72
          {:storage_team_coverage_error, binary()}
73

74
  @type log_push_error() ::
75
          {:log_failures, [{Log.id(), term()}]}
76
          | {:insufficient_acknowledgments, non_neg_integer(), non_neg_integer(), [{Log.id(), term()}]}
77
          | :log_push_failed
78

79
  @type finalization_error() ::
80
          resolution_error()
81
          | storage_coverage_error()
82
          | log_push_error()
83

84
  # ============================================================================
85
  # Data Structures
86
  # ============================================================================
87

88
  defmodule FinalizationPlan do
    @moduledoc """
    Pipeline state threaded through every finalization stage.

    Each stage pattern-matches on `:stage` and returns an updated plan. When a
    stage fails it sets `:error` and flips `:stage` to `:failed`; all later
    stages pass a failed plan through untouched so the error reaches
    `extract_result_or_handle_error/2`.
    """

    # Fields without defaults must be supplied when the plan is created.
    @enforce_keys [
      :transactions,
      :transaction_count,
      :commit_version,
      :last_commit_version,
      :storage_teams,
      :logs_by_id
    ]
    defstruct [
      :transactions,
      :transaction_count,
      :commit_version,
      :last_commit_version,
      :storage_teams,
      :logs_by_id,
      # Per-log encoded transactions, filled in by the log-preparation stage.
      transactions_by_log: %{},
      # Indices of transactions whose clients have already been replied to.
      replied_indices: MapSet.new(),
      aborted_count: 0,
      stage: :initialized,
      error: nil
    ]

    @type t :: %__MODULE__{
            transactions: %{
              non_neg_integer() => {non_neg_integer(), Batch.reply_fn(), Transaction.encoded(), Task.t()}
            },
            transaction_count: non_neg_integer(),
            commit_version: Bedrock.version(),
            last_commit_version: Bedrock.version(),
            storage_teams: [StorageTeamDescriptor.t()],
            logs_by_id: %{Log.id() => [Bedrock.range_tag()]},
            transactions_by_log: %{Log.id() => Transaction.encoded()},
            replied_indices: MapSet.t(non_neg_integer()),
            aborted_count: non_neg_integer(),
            stage: atom(),
            error: term() | nil
          }
  end
132

133
  # ============================================================================
134
  # Main Pipeline
135
  # ============================================================================
136

137
  @doc """
  Runs a batch of transactions through the complete finalization pipeline.

  The pipeline stages, in order:

  1. **Conflict resolution** — asks the resolvers which transactions must abort
  2. **Abort notification** — replies immediately to clients of aborted transactions
  3. **Log preparation** — routes surviving mutations to the logs that cover them
  4. **Log persistence** — pushes to ALL log servers and waits for acknowledgment
  5. **Sequencer notification** — reports the committed version back to the sequencer
  6. **Success notification** — replies to the remaining clients with the commit version

  ## Parameters

    - `batch`: the transaction batch, carrying the version chain from the sequencer
    - `transaction_system_layout`: resolvers, logs, services, and the sequencer ref
    - `opts`: dependency-injection hooks and overrides (primarily for testing)

  ## Returns

    - `{:ok, n_aborts, n_successes}` on success
    - `{:error, finalization_error()}` on failure; every client that has not yet
      been replied to is notified with an abort before the error is returned
  """
  @spec finalize_batch(
          Batch.t(),
          TransactionSystemLayout.t(),
          opts :: [
            epoch: Bedrock.epoch(),
            precomputed: LayoutOptimization.precomputed_layout() | nil,
            resolver_fn: resolver_fn(),
            batch_log_push_fn: log_push_batch_fn(),
            abort_reply_fn: abort_reply_fn(),
            success_reply_fn: success_reply_fn(),
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            sequencer_notify_fn: sequencer_notify_fn(),
            timeout: non_neg_integer()
          ]
        ) ::
          {:ok, n_aborts :: non_neg_integer(), n_oks :: non_neg_integer()}
          | {:error, finalization_error()}
  def finalize_batch(batch, transaction_system_layout, opts \\ []) do
    finished_plan =
      batch
      |> create_finalization_plan(transaction_system_layout)
      |> resolve_conflicts(transaction_system_layout, opts)
      |> prepare_for_logging()
      |> push_to_logs(transaction_system_layout, opts)
      |> notify_sequencer(transaction_system_layout.sequencer, opts)
      |> notify_successes(opts)

    extract_result_or_handle_error(finished_plan, opts)
  end
198

199
  # ============================================================================
200
  # Pipeline Initialization
201
  # ============================================================================
202

203
  @spec create_finalization_plan(Batch.t(), TransactionSystemLayout.t()) :: FinalizationPlan.t()
  def create_finalization_plan(batch, transaction_system_layout) do
    # Index buffered transactions by their position in the batch so later
    # stages can address each one individually (abort vs. success replies).
    indexed_transactions = Map.new(batch.buffer, fn entry -> {elem(entry, 0), entry} end)

    %FinalizationPlan{
      transactions: indexed_transactions,
      transaction_count: Batch.transaction_count(batch),
      # Version chain comes straight from the sequencer via the batch; it must
      # be used verbatim to preserve MVCC ordering (see moduledoc).
      commit_version: batch.commit_version,
      last_commit_version: batch.last_commit_version,
      storage_teams: transaction_system_layout.storage_teams,
      logs_by_id: transaction_system_layout.logs,
      stage: :ready_for_resolution
    }
  end
215

216
  # ============================================================================
217
  # Conflict Resolution
218
  # ============================================================================
219

220
  @doc """
  Resolves conflicts for the batch by collecting per-transaction resolver
  summaries and fanning them out to every resolver.

  On success, aborted transactions are notified immediately and the plan moves
  to `:aborts_notified`; on resolver failure the plan is marked `:failed`.
  """
  @spec resolve_conflicts(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) ::
          FinalizationPlan.t()
  def resolve_conflicts(%FinalizationPlan{stage: :ready_for_resolution} = plan, layout, opts) do
    epoch = Keyword.get(opts, :epoch) || raise "Missing epoch in finalization opts"

    # FIX: the summary tasks were previously awaited with a hard-coded 5000ms;
    # honor the same `:timeout` option every other stage reads (the default is
    # unchanged at 5_000ms, so behavior is identical unless a caller overrides).
    task_timeout = Keyword.get(opts, :timeout, 5_000)

    resolver_transaction_map =
      if plan.transaction_count == 0 do
        # Empty batch: every resolver still gets called with an empty list so
        # the version chain advances.
        Map.new(layout.resolvers, fn {_key, ref} -> {ref, []} end)
      else
        # Each transaction carries a task that produces a %{resolver_ref => summary}
        # map; await them in batch order so resolver inputs stay ordered.
        summaries =
          for idx <- 0..(plan.transaction_count - 1) do
            {_idx, _reply_fn, _transaction, task} = Map.fetch!(plan.transactions, idx)
            Task.await(task, task_timeout)
          end

        Map.new(layout.resolvers, fn {_key, ref} ->
          {ref, Enum.map(summaries, &Map.fetch!(&1, ref))}
        end)
      end

    case call_all_resolvers_with_map(
           resolver_transaction_map,
           epoch,
           plan.last_commit_version,
           plan.commit_version,
           layout.resolvers,
           opts
         ) do
      {:ok, aborted_set} ->
        split_and_notify_aborts_with_set(%{plan | stage: :conflicts_resolved}, aborted_set, opts)

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end
256

257
  @spec call_all_resolvers_with_map(
          %{Resolver.ref() => [Transaction.encoded()]},
          Bedrock.epoch(),
          Bedrock.version(),
          Bedrock.version(),
          [{start_key :: Bedrock.key(), Resolver.ref()}],
          keyword()
        ) :: {:ok, MapSet.t(non_neg_integer())} | {:error, term()}
  # Fans out to every resolver in parallel and unions the aborted-transaction
  # index sets. Halts on the first resolver error or crash.
  defp call_all_resolvers_with_map(resolver_transaction_map, epoch, last_version, commit_version, resolvers, opts) do
    async_stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
    timeout = Keyword.get(opts, :timeout, 5_000)

    resolve_one = fn {_start_key, ref} ->
      # Every resolver has an entry in the map by construction (see resolve_conflicts/3).
      transactions = Map.fetch!(resolver_transaction_map, ref)
      call_resolver_with_retry(ref, epoch, last_version, commit_version, transactions, opts)
    end

    resolvers
    |> async_stream_fn.(resolve_one, timeout: timeout)
    |> Enum.reduce_while({:ok, MapSet.new()}, fn
      {:ok, {:ok, aborted}}, {:ok, acc} ->
        {:cont, {:ok, Enum.into(aborted, acc)}}

      {:ok, {:error, reason}}, _acc ->
        {:halt, {:error, reason}}

      {:exit, reason}, _acc ->
        {:halt, {:error, {:resolver_exit, reason}}}
    end)
  end
289

290
  @spec call_resolver_with_retry(
          Resolver.ref(),
          Bedrock.epoch(),
          Bedrock.version(),
          Bedrock.version(),
          [Transaction.encoded()],
          keyword(),
          non_neg_integer()
        ) :: {:ok, [non_neg_integer()]} | {:error, term()}
  # Calls a single resolver, retrying on :timeout/:unavailable with an
  # escalating per-attempt timeout until max_attempts is exhausted. Other
  # errors are returned immediately without retry.
  defp call_resolver_with_retry(
         ref,
         epoch,
         last_version,
         commit_version,
         filtered_transactions,
         opts,
         attempts_used \\ 0
       ) do
    timeout_fn = Keyword.get(opts, :timeout_fn, &default_timeout_fn/1)
    resolver_fn = Keyword.get(opts, :resolver_fn, &Resolver.resolve_transactions/6)
    max_attempts = Keyword.get(opts, :max_attempts, 3)

    attempt_timeout = timeout_fn.(attempts_used)

    case resolver_fn.(ref, epoch, last_version, commit_version, filtered_transactions, timeout: attempt_timeout) do
      {:ok, _} = success ->
        success

      {:error, reason} when reason in [:timeout, :unavailable] and attempts_used < max_attempts - 1 ->
        # Transient failure with retries remaining: trace it and try again.
        Tracing.emit_resolver_retry(max_attempts - attempts_used - 2, attempts_used + 1, reason)
        call_resolver_with_retry(ref, epoch, last_version, commit_version, filtered_transactions, opts, attempts_used + 1)

      {:error, reason} when reason in [:timeout, :unavailable] ->
        # Transient failure but the retry budget is spent.
        Tracing.emit_resolver_max_retries_exceeded(attempts_used + 1, reason)
        {:error, {:resolver_unavailable, reason}}

      {:error, reason} ->
        {:error, reason}
    end
  end
339

340
  @spec default_timeout_fn(non_neg_integer()) :: non_neg_integer()
  # Exponential backoff for resolver retries: 500ms, 1s, 2s, 4s, ...
  def default_timeout_fn(attempts_used), do: 500 * Integer.pow(2, attempts_used)
342

343
  @spec split_and_notify_aborts_with_set(FinalizationPlan.t(), MapSet.t(non_neg_integer()), keyword()) ::
          FinalizationPlan.t()
  # Immediately replies :aborted to the clients of every conflicted
  # transaction, then records those indices so later stages skip them.
  defp split_and_notify_aborts_with_set(%FinalizationPlan{stage: :conflicts_resolved} = plan, aborted_set, opts) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    aborted_reply_fns =
      for idx <- aborted_set do
        {_idx, reply_fn, _binary, _task} = Map.fetch!(plan.transactions, idx)
        reply_fn
      end

    abort_reply_fn.(aborted_reply_fns)

    %{plan | replied_indices: aborted_set, aborted_count: MapSet.size(aborted_set), stage: :aborts_notified}
  end
360

361
  @spec reply_to_all_clients_with_aborted_transactions([Batch.reply_fn()]) :: :ok
  # Default abort notifier: sends `{:error, :aborted}` through each reply fn.
  def reply_to_all_clients_with_aborted_transactions([]), do: :ok

  def reply_to_all_clients_with_aborted_transactions(aborts) do
    Enum.each(aborts, fn reply_fn -> reply_fn.({:error, :aborted}) end)
  end
364

365
  # ============================================================================
366
  # Log Preparation
367
  # ============================================================================
368

369
  @spec prepare_for_logging(FinalizationPlan.t()) :: FinalizationPlan.t()
  # Failed plans pass straight through to error handling.
  def prepare_for_logging(%FinalizationPlan{stage: :failed} = plan), do: plan

  def prepare_for_logging(%FinalizationPlan{stage: :aborts_notified} = plan) do
    plan
    |> build_transactions_for_logs(plan.logs_by_id)
    |> case do
      {:ok, transactions_by_log} ->
        %{plan | transactions_by_log: transactions_by_log, stage: :ready_for_logging}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end
381

382
  @spec build_transactions_for_logs(FinalizationPlan.t(), %{Log.id() => [Bedrock.range_tag()]}) ::
          {:ok, %{Log.id() => Transaction.encoded()}} | {:error, term()}
  # Distributes every surviving transaction's mutations to the logs that cover
  # them, then encodes one transaction per log. Logs that receive no mutations
  # still get an (empty) encoded transaction so every log sees the batch.
  defp build_transactions_for_logs(plan, logs_by_id) do
    # Mutations are prepended as they are collected, so each list is reversed
    # once at encode time.
    empty_acc =
      logs_by_id
      |> Map.keys()
      |> Map.new(&{&1, []})

    collected =
      Enum.reduce_while(plan.transactions, {:ok, empty_acc}, fn {idx, entry}, {:ok, acc} ->
        process_transaction_for_logs({idx, entry}, plan, logs_by_id, acc)
      end)

    with {:ok, mutations_by_log} <- collected do
      encoded_by_log =
        Map.new(mutations_by_log, fn {log_id, reversed_mutations} ->
          encoded =
            Transaction.encode(%{
              mutations: Enum.reverse(reversed_mutations),
              commit_version: plan.commit_version
            })

          {log_id, encoded}
        end)

      {:ok, encoded_by_log}
    end
  end
417

418
  @spec process_transaction_for_logs(
          {non_neg_integer(), {non_neg_integer(), Batch.reply_fn(), Transaction.encoded(), Task.t()}},
          FinalizationPlan.t(),
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  # Aborted transactions have already been replied to and contribute nothing;
  # everything else has its mutations distributed to the covering logs.
  defp process_transaction_for_logs({idx, {_idx, _reply_fn, binary, _task}}, plan, logs_by_id, acc) do
    case MapSet.member?(plan.replied_indices, idx) do
      true -> {:cont, {:ok, acc}}
      false -> process_transaction_mutations(binary, plan.storage_teams, logs_by_id, acc)
    end
  end
434

435
  @spec process_transaction_mutations(
          binary(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}} | {:halt, {:error, term()}}
  # Extracts the mutation section from an encoded transaction and distributes
  # it. A missing mutations section is treated as an empty transaction
  # (nothing to distribute); any other extraction error halts the batch.
  defp process_transaction_mutations(binary_transaction, storage_teams, logs_by_id, acc) do
    case Transaction.mutations(binary_transaction) do
      {:ok, mutations_stream} ->
        mutations_stream
        |> process_mutations_for_transaction(storage_teams, logs_by_id, acc)
        |> case do
          {:ok, updated_acc} -> {:cont, {:ok, updated_acc}}
          {:error, reason} -> {:halt, {:error, reason}}
        end

      {:error, :section_not_found} ->
        {:cont, {:ok, acc}}

      {:error, reason} ->
        {:halt, {:error, {:mutation_extraction_failed, reason}}}
    end
  end
460

461
  @spec process_mutations_for_transaction(
          Enumerable.t(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:ok, %{Log.id() => [term()]}} | {:error, term()}
  # Folds each mutation into the per-log accumulator, stopping early on the
  # first distribution error (e.g. a key with no covering storage team).
  defp process_mutations_for_transaction(mutations_stream, storage_teams, logs_by_id, acc) do
    Enum.reduce_while(mutations_stream, {:ok, acc}, fn mutation, {:ok, current_acc} ->
      distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, current_acc)
    end)
  end
473

474
  @spec distribute_mutation_to_logs(
          term(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  # Prepends the mutation onto the list of every log whose tags cover the
  # mutation's key (or key range). A key covered by no storage team is a
  # configuration error that aborts the whole batch.
  defp distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, mutations_acc) do
    key_or_range = mutation_to_key_or_range(mutation)
    {:ok, affected_tags} = key_or_range_to_tags(key_or_range, storage_teams)

    case affected_tags do
      [] ->
        {:halt, {:error, {:storage_team_coverage_error, key_or_range}}}

      tags ->
        updated_acc =
          tags
          |> find_logs_for_tags(logs_by_id)
          |> Enum.reduce(mutations_acc, fn log_id, acc ->
            Map.update!(acc, log_id, &[mutation | &1])
          end)

        {:cont, {:ok, updated_acc}}
    end
  end
500

501
  @spec mutation_to_key_or_range(
          {:set, Bedrock.key(), Bedrock.value()}
          | {:clear, Bedrock.key()}
          | {:clear_range, Bedrock.key(), Bedrock.key()}
        ) ::
          Bedrock.key() | {Bedrock.key(), Bedrock.key()}
  # Point mutations resolve to their single key; range clears resolve to the
  # {start, end} pair used for storage-team intersection.
  def mutation_to_key_or_range(mutation) do
    case mutation do
      {:set, key, _value} -> key
      {:clear, key} -> key
      {:clear_range, start_key, end_key} -> {start_key, end_key}
    end
  end
510

511
  @spec key_or_range_to_tags(Bedrock.key() | Bedrock.key_range(), [StorageTeamDescriptor.t()]) ::
          {:ok, [Bedrock.range_tag()]}
  # Range form: collect the tag of every storage team whose key range
  # intersects the given {start, end} range.
  def key_or_range_to_tags({start_key, end_key}, storage_teams) do
    tags =
      Enum.flat_map(storage_teams, fn
        %{tag: tag, key_range: {team_start, team_end}} ->
          if ranges_intersect?(start_key, end_key, team_start, team_end), do: [tag], else: []

        _other ->
          []
      end)

    {:ok, tags}
  end

  # Point form: collect the tag of every storage team whose key range
  # contains the key.
  def key_or_range_to_tags(key, storage_teams) do
    tags =
      Enum.flat_map(storage_teams, fn
        %{tag: tag, key_range: {_min_key, _max_key_ex} = key_range} ->
          if KeyRange.contains?(key_range, key), do: [tag], else: []

        _other ->
          []
      end)

    {:ok, tags}
  end
532

533
  @spec find_logs_for_tags([Bedrock.range_tag()], %{Log.id() => [Bedrock.range_tag()]}) :: [Log.id()]
  # Returns the ids of all logs responsible for at least one of the given tags.
  def find_logs_for_tags(tags, logs_by_id) do
    wanted = MapSet.new(tags)

    for {log_id, log_tags} <- logs_by_id,
        Enum.any?(log_tags, &MapSet.member?(wanted, &1)) do
      log_id
    end
  end
543

544
  @spec ranges_intersect?(Bedrock.key(), Bedrock.key() | :end, Bedrock.key(), Bedrock.key() | :end) :: boolean()
  # Half-open interval intersection where `:end` marks an unbounded upper edge.
  defp ranges_intersect?(start1, end1, start2, end2) do
    case {end1, end2} do
      # Both unbounded above: they always overlap.
      {:end, :end} -> true
      {:end, _bounded} -> start1 < end2
      {_bounded, :end} -> end1 > start2
      _bounded_pair -> start1 < end2 and end1 > start2
    end
  end
549

550
  # ============================================================================
551
  # Log Distribution
552
  # ============================================================================
553

554
  @spec push_to_logs(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) :: FinalizationPlan.t()
  # Failed plans pass straight through to error handling.
  def push_to_logs(%FinalizationPlan{stage: :failed} = plan, _layout, _opts), do: plan

  def push_to_logs(%FinalizationPlan{stage: :ready_for_logging} = plan, layout, opts) do
    push_fn = Keyword.get(opts, :batch_log_push_fn, &push_transaction_to_logs_direct/5)

    result = push_fn.(layout, plan.last_commit_version, plan.transactions_by_log, plan.commit_version, opts)

    case result do
      :ok -> %{plan | stage: :logged}
      {:error, reason} -> %{plan | error: reason, stage: :failed}
    end
  end
568

569
  @spec resolve_log_descriptors(%{Log.id() => term()}, %{term() => ServiceDescriptor.t()}) :: %{
          Log.id() => ServiceDescriptor.t()
        }
  # Maps each log id to its service descriptor, silently dropping logs with no
  # known service (those show up later as missing acknowledgments).
  def resolve_log_descriptors(log_descriptors, services) do
    Enum.reduce(log_descriptors, %{}, fn {log_id, _descriptor}, acc ->
      case Map.get(services, log_id) do
        nil -> acc
        service -> Map.put(acc, log_id, service)
      end
    end)
  end
579

580
  @spec try_to_push_transaction_to_log(ServiceDescriptor.t(), binary(), Bedrock.version()) ::
          :ok | {:error, :unavailable}
  # Only an up log service can accept the push; anything else is unavailable.
  def try_to_push_transaction_to_log(%{kind: :log, status: {:up, log_server}}, transaction, last_commit_version) do
    Log.push(log_server, transaction, last_commit_version)
  end

  def try_to_push_transaction_to_log(_descriptor, _transaction, _last_commit_version), do: {:error, :unavailable}
587

588
  @doc """
  Pushes the pre-built per-log transactions to their log servers and waits for
  acknowledgment from ALL of them.

  Every log in the layout must acknowledge for the batch to be considered
  durable; a single failure or missing acknowledgment fails the whole push.

  ## Parameters

    - `transaction_system_layout`: provides the log ids and their service descriptors
    - `last_commit_version`: the previous committed version, used for log ordering
    - `transactions_by_log`: map of log id to that log's encoded transaction
      (possibly an empty transaction when everything was aborted)
    - `commit_version`: the sequencer-assigned version for this batch (unused here)
    - `opts`: overrides for testing

  ## Options

    - `:async_stream_fn` — parallel-processing function (default: `Task.async_stream/3`)
    - `:log_push_fn` — per-log push function (default: `try_to_push_transaction_to_log/3`)
    - `:timeout` — per-push timeout in ms (default: 5_000)

  ## Returns

    - `:ok` once every log server has acknowledged
    - `{:error, log_push_error()}` on any failure, crash, or shortfall in
      acknowledgments
  """
  @spec push_transaction_to_logs_direct(
          TransactionSystemLayout.t(),
          last_commit_version :: Bedrock.version(),
          %{Log.id() => Transaction.encoded()},
          commit_version :: Bedrock.version(),
          opts :: [
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            timeout: non_neg_integer()
          ]
        ) :: :ok | {:error, log_push_error()}
  def push_transaction_to_logs_direct(
        transaction_system_layout,
        last_commit_version,
        transactions_by_log,
        _commit_version,
        opts \\ []
      ) do
    async_stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
    log_push_fn = Keyword.get(opts, :log_push_fn, &try_to_push_transaction_to_log/3)
    timeout = Keyword.get(opts, :timeout, 5_000)

    logs_by_id = transaction_system_layout.logs
    # Acknowledgment is required from every log in the layout, not just the
    # ones we could resolve a service descriptor for.
    required_acks = map_size(logs_by_id)

    push_one = fn {log_id, service_descriptor} ->
      transaction = Map.get(transactions_by_log, log_id)
      {log_id, log_push_fn.(service_descriptor, transaction, last_commit_version)}
    end

    outcome =
      logs_by_id
      |> resolve_log_descriptors(transaction_system_layout.services)
      |> async_stream_fn.(push_one, timeout: timeout)
      |> Enum.reduce_while({0, []}, fn
        {:ok, {_log_id, :ok}}, {acked, errors} ->
          acked = acked + 1

          if acked == required_acks do
            {:halt, {:ok, acked}}
          else
            {:cont, {acked, errors}}
          end

        {:ok, {log_id, {:error, reason}}}, {_acked, errors} ->
          # Fail fast on the first log error.
          {:halt, {:error, [{log_id, reason} | errors]}}

        {:exit, {log_id, reason}}, {_acked, errors} ->
          {:halt, {:error, [{log_id, reason} | errors]}}
      end)

    case outcome do
      {:ok, ^required_acks} ->
        :ok

      {:error, errors} ->
        {:error, {:log_failures, errors}}

      {acked, errors} when acked < required_acks ->
        # The stream finished without enough acknowledgments (e.g. logs with
        # no resolvable service descriptor).
        {:error, {:insufficient_acknowledgments, acked, required_acks, errors}}

      _other ->
        # NOTE(review): a layout with zero logs lands here and returns an
        # error; confirm that an empty log set is indeed invalid.
        {:error, :log_push_failed}
    end
  end
681

682
  # ============================================================================
683
  # Sequencer Notification
684
  # ============================================================================
685

686
  @spec notify_sequencer(FinalizationPlan.t(), Sequencer.ref(), keyword()) :: FinalizationPlan.t()
  # Failed plans pass straight through to error handling.
  def notify_sequencer(%FinalizationPlan{stage: :failed} = plan, _sequencer, _opts), do: plan

  def notify_sequencer(%FinalizationPlan{stage: :logged} = plan, sequencer, opts) do
    notify_fn = Keyword.get(opts, :sequencer_notify_fn, &Sequencer.report_successful_commit/2)

    case notify_fn.(sequencer, plan.commit_version) do
      :ok -> %{plan | stage: :sequencer_notified}
      {:error, reason} -> %{plan | error: reason, stage: :failed}
    end
  end
700

701
  # ============================================================================
702
  # Success Notification
703
  # ============================================================================
704

705
  @spec notify_successes(FinalizationPlan.t(), keyword()) :: FinalizationPlan.t()
  # Failed plans pass straight through to error handling.
  def notify_successes(%FinalizationPlan{stage: :failed} = plan, _opts), do: plan

  def notify_successes(%FinalizationPlan{stage: :sequencer_notified} = plan, opts) do
    success_reply_fn = Keyword.get(opts, :success_reply_fn, &send_reply_with_commit_version/2)

    # Everything not already replied to (i.e. not aborted) gets a success
    # reply carrying the commit version.
    pending =
      Enum.reject(plan.transactions, fn {idx, _entry} -> MapSet.member?(plan.replied_indices, idx) end)

    reply_fns = Enum.map(pending, fn {_idx, {_tx_idx, reply_fn, _binary, _task}} -> reply_fn end)
    notified_indices = Enum.map(pending, fn {idx, _entry} -> idx end)

    success_reply_fn.(reply_fns, plan.commit_version)

    %{plan | replied_indices: MapSet.union(plan.replied_indices, MapSet.new(notified_indices)), stage: :completed}
  end
721

722
  @spec send_reply_with_commit_version([Batch.reply_fn()], Bedrock.version()) :: :ok
  # Default success notifier: sends `{:ok, commit_version}` through each reply fn.
  def send_reply_with_commit_version(reply_fns, commit_version) do
    Enum.each(reply_fns, fn reply_fn -> reply_fn.({:ok, commit_version}) end)
  end
724

725
  # ============================================================================
726
  # Result Extraction and Error Handling
727
  # ============================================================================
728

729
  @spec extract_result_or_handle_error(FinalizationPlan.t(), keyword()) ::
          {:ok, non_neg_integer(), non_neg_integer()} | {:error, finalization_error()}
  # A completed plan yields its abort/success counts; a failed plan is routed
  # to handle_error/2 so pending clients are notified before the error returns.
  def extract_result_or_handle_error(%FinalizationPlan{stage: :completed} = plan, _opts) do
    aborts = plan.aborted_count
    {:ok, aborts, plan.transaction_count - aborts}
  end

  def extract_result_or_handle_error(%FinalizationPlan{stage: :failed} = plan, opts), do: handle_error(plan, opts)
739

740
  @spec handle_error(FinalizationPlan.t(), keyword()) :: {:error, finalization_error()}
  # Aborts every transaction that has not yet been replied to, so no client is
  # left waiting after a pipeline failure, then surfaces the recorded error.
  defp handle_error(%FinalizationPlan{error: error} = plan, opts) when not is_nil(error) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    plan.transactions
    |> Enum.reject(fn {idx, _entry} -> MapSet.member?(plan.replied_indices, idx) end)
    |> Enum.map(fn {_idx, {_tx_idx, reply_fn, _binary, _task}} -> reply_fn end)
    |> abort_reply_fn.()

    {:error, plan.error}
  end
755
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc