• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / fb3defeb951c4b0ad9552be01797f0e4e04d962f-PR-43

03 Sep 2025 03:45AM UTC coverage: 63.433% (+0.005%) from 63.428%
fb3defeb951c4b0ad9552be01797f0e4e04d962f-PR-43

Pull #43

github

jallum
Add conflict sharding with async resolver assignment

Move expensive conflict distribution off critical path by computing resolver
assignments asynchronously during batching instead of blocking transaction
acceptance. Improves throughput and reduces latency.

- Add ConflictSharding module to distribute conflicts by key range
- Implement async Task-based resolver assignment in batching
- Add LayoutOptimization for precomputed static structures
- Update Batch and finalization pipeline for Task integration
- Extract KeyRange.overlap? utility, remove ResolutionPlan
Pull Request #43: Add conflict sharding with async resolver assignment

63 of 95 new or added lines in 10 files covered. (66.32%)

2 existing lines in 2 files now uncovered.

3027 of 4772 relevant lines covered (63.43%)

615.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.36
/lib/bedrock/data_plane/commit_proxy/finalization.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Finalization do
2
  @moduledoc """
3
  Transaction finalization pipeline that handles conflict resolution and log persistence.
4

5
  ## Version Chain Integrity
6

7
  CRITICAL: This module maintains the Lamport clock version chain established by the sequencer.
8
  The sequencer provides both `last_commit_version` and `commit_version` as a proper chain link:
9

10
  - `last_commit_version`: The actual last committed version from the sequencer
11
  - `commit_version`: The new version assigned to this batch
12

13
  Always use the exact version values provided by the sequencer through the batch to maintain
14
  proper MVCC conflict detection and transaction ordering. Version gaps can exist due to failed
15
  transactions, recovery scenarios, or system restarts.
16
  """
17

18
  import Bedrock.KeyRange, only: [key_in_range?: 3]
19
  import Bitwise, only: [<<<: 2]
20

21
  alias Bedrock.ControlPlane.Config.ServiceDescriptor
22
  alias Bedrock.ControlPlane.Config.StorageTeamDescriptor
23
  alias Bedrock.ControlPlane.Config.TransactionSystemLayout
24
  alias Bedrock.DataPlane.CommitProxy.Batch
25
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
26
  alias Bedrock.DataPlane.CommitProxy.Tracing
27
  alias Bedrock.DataPlane.Log
28
  alias Bedrock.DataPlane.Resolver
29
  alias Bedrock.DataPlane.Sequencer
30
  alias Bedrock.DataPlane.Transaction
31

32
  @type resolver_fn() :: (Resolver.ref(),
33
                          Bedrock.epoch(),
34
                          Bedrock.version(),
35
                          Bedrock.version(),
36
                          [Resolver.transaction_summary()],
37
                          keyword() ->
38
                            {:ok, [non_neg_integer()]} | {:error, term()})
39

40
  @type log_push_batch_fn() :: (TransactionSystemLayout.t(),
41
                                last_commit_version :: Bedrock.version(),
42
                                transactions_by_tag :: %{
43
                                  Bedrock.range_tag() => Transaction.encoded()
44
                                },
45
                                commit_version :: Bedrock.version(),
46
                                opts :: [
47
                                  timeout: Bedrock.timeout_in_ms(),
48
                                  async_stream_fn: async_stream_fn()
49
                                ] ->
50
                                  :ok | {:error, log_push_error()})
51

52
  @type log_push_single_fn() :: (ServiceDescriptor.t(), binary(), Bedrock.version() ->
53
                                   :ok | {:error, :unavailable})
54

55
  @type async_stream_fn() :: (enumerable :: Enumerable.t(), fun :: (term() -> term()), opts :: keyword() ->
56
                                Enumerable.t())
57

58
  @type abort_reply_fn() :: ([Batch.reply_fn()] -> :ok)
59

60
  @type success_reply_fn() :: ([Batch.reply_fn()], Bedrock.version() -> :ok)
61

62
  @type timeout_fn() :: (non_neg_integer() -> non_neg_integer())
63

64
  @type sequencer_notify_fn() :: (Sequencer.ref(), Bedrock.version() -> :ok | {:error, term()})
65

66
  @type resolution_error() ::
67
          :timeout
68
          | :unavailable
69
          | {:resolver_unavailable, term()}
70

71
  @type storage_coverage_error() ::
72
          {:storage_team_coverage_error, binary()}
73

74
  @type log_push_error() ::
75
          {:log_failures, [{Log.id(), term()}]}
76
          | {:insufficient_acknowledgments, non_neg_integer(), non_neg_integer(), [{Log.id(), term()}]}
77
          | :log_push_failed
78

79
  @type finalization_error() ::
80
          resolution_error()
81
          | storage_coverage_error()
82
          | log_push_error()
83

84
  # ============================================================================
85
  # Data Structures
86
  # ============================================================================
87

88
  defmodule FinalizationPlan do
    @moduledoc """
    Pipeline state for transaction finalization using unified transaction storage
    for maximum efficiency and clarity.

    The plan is threaded through each finalization stage. `stage` records pipeline
    progress, and once a stage sets `stage: :failed` all subsequent stages
    short-circuit, leaving `error` with the first failure reason.
    """

    @enforce_keys [
      :transactions,
      :transaction_count,
      :commit_version,
      :last_commit_version,
      :storage_teams,
      :logs_by_id
    ]
    defstruct [
      :transactions,
      :transaction_count,
      :commit_version,
      :last_commit_version,
      :storage_teams,
      :logs_by_id,
      # Per-log encoded transaction, built during log preparation.
      transactions_by_log: %{},
      # Indices of transactions whose reply_fn has already been invoked
      # (aborts first, successes later) — prevents double replies.
      replied_indices: MapSet.new(),
      # Number of transactions aborted by conflict resolution.
      aborted_count: 0,
      # Pipeline progress marker; see the stage-matched function heads below.
      stage: :initialized,
      # First error encountered by any stage, nil while healthy.
      error: nil
    ]

    @type t :: %__MODULE__{
            transactions: %{
              non_neg_integer() => {non_neg_integer(), Batch.reply_fn(), Transaction.encoded(), Task.t()}
            },
            transaction_count: non_neg_integer(),
            commit_version: Bedrock.version(),
            last_commit_version: Bedrock.version(),
            storage_teams: [StorageTeamDescriptor.t()],
            logs_by_id: %{Log.id() => [Bedrock.range_tag()]},
            transactions_by_log: %{Log.id() => Transaction.encoded()},
            replied_indices: MapSet.t(non_neg_integer()),
            aborted_count: non_neg_integer(),
            stage: atom(),
            error: term() | nil
          }
  end
132

133
  # ============================================================================
134
  # Main Pipeline
135
  # ============================================================================
136

137
  @doc """
138
  Executes the complete transaction finalization pipeline for a batch of transactions.
139

140
  This function processes a batch through a multi-stage pipeline: conflict resolution,
141
  abort notification, log preparation, log persistence, sequencer notification, and
142
  success notification. The pipeline maintains transactional consistency by ensuring
143
  all operations complete successfully or all pending clients are notified of failure.
144

145
  ## Pipeline Stages
146

147
  1. **Conflict Resolution**: Calls resolvers to determine which transactions must be aborted
148
  2. **Abort Notification**: Immediately notifies clients of aborted transactions
149
  3. **Log Preparation**: Distributes successful transaction mutations to appropriate logs
150
  4. **Log Persistence**: Pushes transactions to ALL log servers and waits for acknowledgment
151
  5. **Sequencer Notification**: Reports successful commit version to the sequencer
152
  6. **Success Notification**: Notifies clients of successful transactions with commit version
153

154
  ## Parameters
155

156
    - `batch`: Transaction batch with commit version details from the sequencer
157
    - `transaction_system_layout`: System configuration including resolvers and log servers
158
    - `opts`: Optional functions for testing and configuration overrides
159

160
  ## Returns
161

162
    - `{:ok, n_aborts, n_successes}` - Pipeline completed successfully with counts
163
    - `{:error, finalization_error()}` - Pipeline failed; all pending clients notified of failure
164

165
  ## Error Handling
166

167
  On any pipeline failure, all transactions that haven't been replied to are automatically
168
  notified with abort responses before returning the error.
169
  """
170
  @spec finalize_batch(
          Batch.t(),
          TransactionSystemLayout.t(),
          opts :: [
            epoch: Bedrock.epoch(),
            precomputed: LayoutOptimization.precomputed_layout() | nil,
            resolver_fn: resolver_fn(),
            batch_log_push_fn: log_push_batch_fn(),
            abort_reply_fn: abort_reply_fn(),
            success_reply_fn: success_reply_fn(),
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            sequencer_notify_fn: sequencer_notify_fn(),
            timeout: non_neg_integer()
          ]
        ) ::
          {:ok, n_aborts :: non_neg_integer(), n_oks :: non_neg_integer()}
          | {:error, finalization_error()}
  def finalize_batch(batch, transaction_system_layout, opts \\ []) do
    # Thread the plan through each stage explicitly; a failing stage marks the
    # plan as :failed and every later stage passes it through untouched.
    plan = create_finalization_plan(batch, transaction_system_layout)
    plan = resolve_conflicts(plan, transaction_system_layout, opts)
    plan = prepare_for_logging(plan)
    plan = push_to_logs(plan, transaction_system_layout, opts)
    plan = notify_sequencer(plan, transaction_system_layout.sequencer, opts)
    plan = notify_successes(plan, opts)
    extract_result_or_handle_error(plan, opts)
  end
198

199
  # ============================================================================
200
  # Pipeline Initialization
201
  # ============================================================================
202

203
  @spec create_finalization_plan(Batch.t(), TransactionSystemLayout.t()) :: FinalizationPlan.t()
  def create_finalization_plan(batch, transaction_system_layout) do
    # Key each buffered entry by its transaction index (first tuple element)
    # so later stages can address transactions by index in O(1).
    indexed_transactions =
      Map.new(batch.buffer, fn entry -> {elem(entry, 0), entry} end)

    %FinalizationPlan{
      transactions: indexed_transactions,
      transaction_count: Batch.transaction_count(batch),
      commit_version: batch.commit_version,
      last_commit_version: batch.last_commit_version,
      storage_teams: transaction_system_layout.storage_teams,
      logs_by_id: transaction_system_layout.logs,
      stage: :ready_for_resolution
    }
  end
215

216
  # ============================================================================
217
  # Conflict Resolution
218
  # ============================================================================
219

220
  @spec resolve_conflicts(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) ::
          FinalizationPlan.t()
  def resolve_conflicts(%FinalizationPlan{stage: :ready_for_resolution} = plan, layout, opts) do
    epoch = Keyword.get(opts, :epoch) || raise "Missing epoch in finalization opts"

    # Timeout for awaiting the per-transaction resolver-assignment tasks started
    # during batching. Previously hard-coded to 5_000ms; made configurable to
    # match every other timeout in this module (backward compatible default).
    task_timeout_in_ms = Keyword.get(opts, :task_timeout_in_ms, 5_000)

    resolver_transaction_map =
      if plan.transaction_count == 0 do
        # Empty batch: every resolver still gets called with an empty list so
        # the version chain advances uniformly across all resolvers.
        Map.new(layout.resolvers, fn {_key, ref} -> {ref, []} end)
      else
        # Each transaction carries an async Task (started during batching) that
        # computed its per-resolver conflict summary; await them in index order.
        maps =
          for idx <- 0..(plan.transaction_count - 1) do
            {_idx, _reply_fn, _transaction, task} = Map.fetch!(plan.transactions, idx)
            Task.await(task, task_timeout_in_ms)
          end

        # Invert: per-transaction maps -> per-resolver transaction lists,
        # preserving transaction order for each resolver.
        Map.new(layout.resolvers, fn {_key, ref} ->
          transactions = Enum.map(maps, &Map.fetch!(&1, ref))
          {ref, transactions}
        end)
      end

    case call_all_resolvers_with_map(
           resolver_transaction_map,
           epoch,
           plan.last_commit_version,
           plan.commit_version,
           layout.resolvers,
           opts
         ) do
      {:ok, aborted_set} ->
        split_and_notify_aborts_with_set(%{plan | stage: :conflicts_resolved}, aborted_set, opts)

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end
256

257
  @spec call_all_resolvers_with_map(
          %{Resolver.ref() => [Transaction.encoded()]},
          Bedrock.epoch(),
          Bedrock.version(),
          Bedrock.version(),
          [{start_key :: Bedrock.key(), Resolver.ref()}],
          keyword()
        ) :: {:ok, MapSet.t(non_neg_integer())} | {:error, term()}
  defp call_all_resolvers_with_map(resolver_transaction_map, epoch, last_version, commit_version, resolvers, opts) do
    # Call resolvers in order, unioning the aborted transaction indices.
    # Short-circuit on the first resolver failure: the previous implementation
    # eagerly called every resolver (each with up to max_attempts retried RPCs
    # and exponential backoff) even after one had already failed, wasting work
    # on the commit critical path for a batch that is doomed anyway.
    Enum.reduce_while(resolvers, {:ok, MapSet.new()}, fn {_start_key, ref}, {:ok, acc} ->
      # Every resolver must have an entry after task processing.
      filtered_transactions = Map.fetch!(resolver_transaction_map, ref)

      case call_resolver_with_retry(ref, epoch, last_version, commit_version, filtered_transactions, opts) do
        {:ok, aborted} ->
          {:cont, {:ok, Enum.into(aborted, acc)}}

        {:error, reason} ->
          {:halt, {:error, reason}}
      end
    end)
  end
281

282
  @spec call_resolver_with_retry(
          Resolver.ref(),
          Bedrock.epoch(),
          Bedrock.version(),
          Bedrock.version(),
          [Transaction.encoded()],
          keyword(),
          non_neg_integer()
        ) :: {:ok, [non_neg_integer()]} | {:error, term()}
  # Calls a single resolver, retrying on :timeout/:unavailable with a growing
  # timeout (see default_timeout_fn/1) up to :max_attempts total attempts.
  # Any other error is returned immediately without retry.
  defp call_resolver_with_retry(
         ref,
         epoch,
         last_version,
         commit_version,
         filtered_transactions,
         opts,
         attempts_used \\ 0
       ) do
    timeout_fn = Keyword.get(opts, :timeout_fn, &default_timeout_fn/1)
    resolver_fn = Keyword.get(opts, :resolver_fn, &Resolver.resolve_transactions/6)
    max_attempts = Keyword.get(opts, :max_attempts, 3)

    # Timeout grows with the number of attempts already used (exponential backoff).
    timeout_in_ms = timeout_fn.(attempts_used)

    case resolver_fn.(ref, epoch, last_version, commit_version, filtered_transactions, timeout: timeout_in_ms) do
      {:ok, _} = success ->
        success

      {:error, reason} when reason in [:timeout, :unavailable] and attempts_used < max_attempts - 1 ->
        # NOTE(review): first argument reads as "retries remaining after the
        # upcoming attempt" (max_attempts - attempts_used - 2) — confirm this
        # matches what Tracing.emit_resolver_retry/3 expects.
        Tracing.emit_resolver_retry(max_attempts - attempts_used - 2, attempts_used + 1, reason)

        call_resolver_with_retry(
          ref,
          epoch,
          last_version,
          commit_version,
          filtered_transactions,
          opts,
          attempts_used + 1
        )

      {:error, reason} when reason in [:timeout, :unavailable] ->
        # Retries exhausted: surface as a resolver-availability failure.
        Tracing.emit_resolver_max_retries_exceeded(attempts_used + 1, reason)
        {:error, {:resolver_unavailable, reason}}

      {:error, reason} ->
        # Non-retryable errors propagate unchanged.
        {:error, reason}
    end
  end
331

332
  @spec default_timeout_fn(non_neg_integer()) :: non_neg_integer()
  # Exponential backoff: 500ms doubled once per attempt already used.
  def default_timeout_fn(attempts_used), do: 500 * Integer.pow(2, attempts_used)
16✔
334

335
  @spec split_and_notify_aborts_with_set(FinalizationPlan.t(), MapSet.t(non_neg_integer()), keyword()) ::
          FinalizationPlan.t()
  defp split_and_notify_aborts_with_set(%FinalizationPlan{stage: :conflicts_resolved} = plan, aborted_set, opts) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    # Collect the reply callback of every aborted transaction and notify the
    # waiting clients immediately — no need to wait for log persistence.
    aborted_reply_fns =
      for idx <- aborted_set do
        {_idx, reply_fn, _binary, _task} = Map.fetch!(plan.transactions, idx)
        reply_fn
      end

    abort_reply_fn.(aborted_reply_fns)

    # Record the replied indices so later stages skip these transactions.
    %{plan | replied_indices: aborted_set, aborted_count: MapSet.size(aborted_set), stage: :aborts_notified}
  end
352

353
  @spec reply_to_all_clients_with_aborted_transactions([Batch.reply_fn()]) :: :ok
  # Tells every waiting client its transaction was aborted; a no-op (still :ok)
  # for an empty list.
  def reply_to_all_clients_with_aborted_transactions(reply_fns) do
    Enum.each(reply_fns, fn reply_fn -> reply_fn.({:error, :aborted}) end)
  end
6✔
356

357
  # ============================================================================
358
  # Log Preparation
359
  # ============================================================================
360

361
  @spec prepare_for_logging(FinalizationPlan.t()) :: FinalizationPlan.t()
  # Failed plans pass through untouched.
  def prepare_for_logging(%FinalizationPlan{stage: :failed} = plan), do: plan

  # Builds the per-log encoded transactions from the surviving (non-aborted)
  # transactions and advances the plan to :ready_for_logging.
  def prepare_for_logging(%FinalizationPlan{stage: :aborts_notified} = plan) do
    plan
    |> build_transactions_for_logs(plan.logs_by_id)
    |> case do
      {:ok, transactions_by_log} ->
        %{plan | transactions_by_log: transactions_by_log, stage: :ready_for_logging}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end
373

374
  @spec build_transactions_for_logs(FinalizationPlan.t(), %{Log.id() => [Bedrock.range_tag()]}) ::
          {:ok, %{Log.id() => Transaction.encoded()}} | {:error, term()}
  # Distributes the mutations of every surviving transaction to the logs that
  # cover them, then encodes one transaction per log (possibly with an empty
  # mutation list when everything was aborted).
  defp build_transactions_for_logs(plan, logs_by_id) do
    # Start every log with an empty mutation list so logs that receive no
    # mutations still get an (empty) encoded transaction.
    # (Builds the map directly rather than via Map.keys/1's intermediate list.)
    initial_mutations_by_log = Map.new(logs_by_id, fn {log_id, _tags} -> {log_id, []} end)

    plan.transactions
    |> Enum.reduce_while(
      {:ok, initial_mutations_by_log},
      fn {idx, entry}, {:ok, acc} ->
        process_transaction_for_logs({idx, entry}, plan, logs_by_id, acc)
      end
    )
    |> case do
      {:ok, mutations_by_log} ->
        result =
          Map.new(mutations_by_log, fn {log_id, mutations_list} ->
            # Mutations were prepended for O(1) accumulation; reverse restores
            # the original application order before encoding.
            encoded =
              Transaction.encode(%{
                mutations: Enum.reverse(mutations_list),
                commit_version: plan.commit_version
              })

            {log_id, encoded}
          end)

        {:ok, result}

      # Pass the error tuple through unchanged (no re-wrapping needed).
      {:error, _reason} = error ->
        error
    end
  end
409

410
  @spec process_transaction_for_logs(
          {non_neg_integer(), {non_neg_integer(), Batch.reply_fn(), Transaction.encoded(), Task.t()}},
          FinalizationPlan.t(),
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  defp process_transaction_for_logs({idx, {_idx, _reply_fn, binary, _task}}, plan, logs_by_id, acc) do
    case MapSet.member?(plan.replied_indices, idx) do
      # Already replied (aborted): contributes no mutations to any log.
      true -> {:cont, {:ok, acc}}
      false -> process_transaction_mutations(binary, plan.storage_teams, logs_by_id, acc)
    end
  end
426

427
  @spec process_transaction_mutations(
          binary(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}} | {:halt, {:error, term()}}
  # Extracts the mutations section from an encoded transaction and distributes
  # each mutation to the logs covering its key range. The nested cases are
  # deliberate: errors from extraction and errors from distribution are tagged
  # differently and must not be conflated.
  defp process_transaction_mutations(binary_transaction, storage_teams, logs_by_id, acc) do
    case Transaction.mutations(binary_transaction) do
      {:ok, mutations_stream} ->
        case process_mutations_for_transaction(mutations_stream, storage_teams, logs_by_id, acc) do
          {:ok, updated_acc} ->
            {:cont, {:ok, updated_acc}}

          # Distribution errors (e.g. storage coverage) halt as-is, untagged.
          {:error, reason} ->
            {:halt, {:error, reason}}
        end

      # No mutations section (e.g. a read-only transaction): nothing to log.
      {:error, :section_not_found} ->
        {:cont, {:ok, acc}}

      # Any other extraction failure is tagged so callers can tell it apart
      # from distribution failures.
      {:error, reason} ->
        {:halt, {:error, {:mutation_extraction_failed, reason}}}
    end
  end
452

453
  @spec process_mutations_for_transaction(
          Enumerable.t(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:ok, %{Log.id() => [term()]}} | {:error, term()}
  # Folds every mutation into the per-log accumulator, stopping early on the
  # first distribution error.
  defp process_mutations_for_transaction(mutations_stream, storage_teams, logs_by_id, initial_acc) do
    Enum.reduce_while(mutations_stream, {:ok, initial_acc}, fn mutation, {:ok, current_acc} ->
      distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, current_acc)
    end)
  end
465

466
  @spec distribute_mutation_to_logs(
          term(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  defp distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, mutations_acc) do
    key_or_range = mutation_to_key_or_range(mutation)
    {:ok, affected_tags} = key_or_range_to_tags(key_or_range, storage_teams)

    if affected_tags == [] do
      # No storage team covers this key/range — a layout configuration error
      # that must abort the batch build.
      {:halt, {:error, {:storage_team_coverage_error, key_or_range}}}
    else
      # Prepend the mutation to every log that serves one of the affected tags
      # (lists are reversed once, later, during encoding).
      updated_acc =
        affected_tags
        |> find_logs_for_tags(logs_by_id)
        |> Enum.reduce(mutations_acc, fn log_id, acc ->
          Map.update!(acc, log_id, &[mutation | &1])
        end)

      {:cont, {:ok, updated_acc}}
    end
  end
492

493
  @spec mutation_to_key_or_range(
          {:set, Bedrock.key(), Bedrock.value()}
          | {:clear, Bedrock.key()}
          | {:clear_range, Bedrock.key(), Bedrock.key()}
        ) ::
          Bedrock.key() | {Bedrock.key(), Bedrock.key()}
  # Extracts the key (point mutations) or {start, end} range (range clears)
  # that a mutation touches, for storage-team routing.
  def mutation_to_key_or_range(mutation) do
    case mutation do
      {:set, key, _value} -> key
      {:clear, key} -> key
      {:clear_range, start_key, end_key} -> {start_key, end_key}
    end
  end
2✔
502

503
  @spec key_or_range_to_tags(Bedrock.key() | Bedrock.key_range(), [StorageTeamDescriptor.t()]) ::
          {:ok, [Bedrock.range_tag()]}
  # Range form: a team matches when its key range intersects the given range.
  # (Descriptors not shaped like %{tag: _, key_range: {_, _}} are skipped,
  # matching the original comprehension's pattern-filtering behavior.)
  def key_or_range_to_tags({start_key, end_key}, storage_teams) do
    tags =
      Enum.flat_map(storage_teams, fn
        %{tag: tag, key_range: {team_start, team_end}} ->
          if ranges_intersect?(start_key, end_key, team_start, team_end), do: [tag], else: []

        _other ->
          []
      end)

    {:ok, tags}
  end

  # Point form: a team matches when the key falls inside its half-open range.
  def key_or_range_to_tags(key, storage_teams) do
    tags =
      Enum.flat_map(storage_teams, fn
        %{tag: tag, key_range: {min_key, max_key_ex}} ->
          if key_in_range?(key, min_key, max_key_ex), do: [tag], else: []

        _other ->
          []
      end)

    {:ok, tags}
  end
524

525
  @spec find_logs_for_tags([Bedrock.range_tag()], %{Log.id() => [Bedrock.range_tag()]}) :: [Log.id()]
  # Returns the ids of all logs that serve at least one of the given tags.
  def find_logs_for_tags(tags, logs_by_id) do
    wanted_tags = MapSet.new(tags)

    for {log_id, log_tags} <- logs_by_id,
        Enum.any?(log_tags, &MapSet.member?(wanted_tags, &1)) do
      log_id
    end
  end
535

536
  @spec ranges_intersect?(Bedrock.key(), Bedrock.key() | :end, Bedrock.key(), Bedrock.key() | :end) :: boolean()
  # Half-open range intersection where `:end` means "unbounded above".
  # Both unbounded: they always intersect.
  defp ranges_intersect?(_start1, :end, _start2, :end), do: true
  # Range 1 unbounded above: intersects iff it starts before range 2 ends.
  defp ranges_intersect?(start1, :end, _start2, end2), do: start1 < end2
  # Range 2 unbounded above: intersects iff range 1 ends after range 2 starts.
  defp ranges_intersect?(_start1, end1, start2, :end), do: end1 > start2
  # Both bounded: standard [start, end) overlap test.
  defp ranges_intersect?(start1, end1, start2, end2), do: start1 < end2 and end1 > start2
8✔
541

542
  # ============================================================================
543
  # Log Distribution
544
  # ============================================================================
545

546
  @spec push_to_logs(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) :: FinalizationPlan.t()
  # Failed plans pass through untouched.
  def push_to_logs(%FinalizationPlan{stage: :failed} = plan, _layout, _opts), do: plan

  # Pushes the per-log transactions to every log server; all must acknowledge
  # before the plan advances to :logged.
  def push_to_logs(%FinalizationPlan{stage: :ready_for_logging} = plan, layout, opts) do
    push_fn = Keyword.get(opts, :batch_log_push_fn, &push_transaction_to_logs_direct/5)

    layout
    |> push_fn.(plan.last_commit_version, plan.transactions_by_log, plan.commit_version, opts)
    |> case do
      :ok -> %{plan | stage: :logged}
      {:error, reason} -> %{plan | error: reason, stage: :failed}
    end
  end
560

561
  @spec resolve_log_descriptors(%{Log.id() => term()}, %{term() => ServiceDescriptor.t()}) :: %{
          Log.id() => ServiceDescriptor.t()
        }
  # Looks up the service descriptor for each log id; ids with no registered
  # service are silently dropped from the result.
  def resolve_log_descriptors(log_descriptors, services) do
    log_descriptors
    |> Enum.flat_map(fn {log_id, _descriptor} ->
      case services do
        %{^log_id => service} -> [{log_id, service}]
        _ -> []
      end
    end)
    |> Map.new()
  end
571

572
  @spec try_to_push_transaction_to_log(ServiceDescriptor.t(), binary(), Bedrock.version()) ::
          :ok | {:error, :unavailable}
  # Pushes only when the descriptor is a log service that is currently up;
  # the push is delegated to the running log server.
  def try_to_push_transaction_to_log(%{kind: :log, status: {:up, log_server}}, transaction, last_commit_version) do
    Log.push(log_server, transaction, last_commit_version)
  end

  # Anything else (wrong kind, or a log that is down) is unavailable.
  def try_to_push_transaction_to_log(_, _, _), do: {:error, :unavailable}
1✔
579

580
  @doc """
581
  Pushes transactions directly to logs and waits for acknowledgement from ALL log servers.
582

583
  This function takes transactions that have already been built per log and pushes them
584
  to the appropriate log servers. Each log receives its pre-built transaction.
585
  All logs must acknowledge to maintain durability guarantees.
586

587
  ## Parameters
588

589
    - `transaction_system_layout`: Contains configuration information about the
590
      transaction system, including available log servers.
591
    - `last_commit_version`: The last known committed version; used to
592
      ensure consistency in log ordering.
593
    - `transactions_by_log`: Map of log_id to transaction for that log.
594
      May be empty transactions if all transactions were aborted.
595
    - `commit_version`: The version assigned by the sequencer for this batch.
596
    - `opts`: Optional configuration for testing and customization.
597

598
  ## Options
599
    - `:async_stream_fn` - Function for parallel processing (default: Task.async_stream/3)
600
    - `:log_push_fn` - Function for pushing to individual logs (default: try_to_push_transaction_to_log/3)
601
    - `:timeout` - Timeout for log push operations (default: 5_000ms)
602

603
  ## Returns
604
    - `:ok` if acknowledgements have been received from ALL log servers.
605
    - `{:error, log_push_error()}` if any log has not successfully acknowledged the
606
       push within the timeout period or other errors occur.
607
  """
608
  @spec push_transaction_to_logs_direct(
609
          TransactionSystemLayout.t(),
610
          last_commit_version :: Bedrock.version(),
611
          %{Log.id() => Transaction.encoded()},
612
          commit_version :: Bedrock.version(),
613
          opts :: [
614
            async_stream_fn: async_stream_fn(),
615
            log_push_fn: log_push_single_fn(),
616
            timeout: non_neg_integer()
617
          ]
618
        ) :: :ok | {:error, log_push_error()}
619
  def push_transaction_to_logs_direct(
620
        transaction_system_layout,
621
        last_commit_version,
622
        transactions_by_log,
623
        _commit_version,
624
        opts \\ []
×
625
      ) do
626
    async_stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
10✔
627
    log_push_fn = Keyword.get(opts, :log_push_fn, &try_to_push_transaction_to_log/3)
10✔
628
    timeout = Keyword.get(opts, :timeout, 5_000)
10✔
629

630
    logs_by_id = transaction_system_layout.logs
10✔
631
    required_acknowledgments = map_size(logs_by_id)
10✔
632

633
    logs_by_id
634
    |> resolve_log_descriptors(transaction_system_layout.services)
10✔
635
    |> async_stream_fn.(
636
      fn {log_id, service_descriptor} ->
637
        encoded_transaction = Map.get(transactions_by_log, log_id)
14✔
638
        result = log_push_fn.(service_descriptor, encoded_transaction, last_commit_version)
14✔
639
        {log_id, result}
640
      end,
641
      timeout: timeout
642
    )
643
    |> Enum.reduce_while({0, []}, fn
644
      {:ok, {log_id, {:error, reason}}}, {_count, errors} ->
2✔
645
        {:halt, {:error, [{log_id, reason} | errors]}}
646

647
      {:ok, {_log_id, :ok}}, {count, errors} ->
648
        count = 1 + count
13✔
649

650
        if count == required_acknowledgments do
13✔
651
          {:halt, {:ok, count}}
652
        else
653
          {:cont, {count, errors}}
654
        end
655

656
      {:exit, {log_id, reason}}, {_count, errors} ->
×
657
        {:halt, {:error, [{log_id, reason} | errors]}}
658
    end)
659
    |> case do
10✔
660
      {:ok, ^required_acknowledgments} ->
8✔
661
        :ok
662

663
      {:error, errors} ->
2✔
664
        {:error, {:log_failures, errors}}
665

666
      {count, errors} when count < required_acknowledgments ->
×
667
        {:error, {:insufficient_acknowledgments, count, required_acknowledgments, errors}}
668

669
      _other ->
×
670
        {:error, :log_push_failed}
671
    end
672
  end
673

674
  # ============================================================================
675
  # Sequencer Notification
676
  # ============================================================================
677

678
  @spec notify_sequencer(FinalizationPlan.t(), Sequencer.ref(), keyword()) :: FinalizationPlan.t()
  # Failed plans pass through untouched.
  def notify_sequencer(%FinalizationPlan{stage: :failed} = plan, _sequencer, _opts), do: plan

  # Reports the committed version back to the sequencer so the Lamport clock
  # chain advances; only then may clients be told of success.
  def notify_sequencer(%FinalizationPlan{stage: :logged} = plan, sequencer, opts) do
    notify_fn = Keyword.get(opts, :sequencer_notify_fn, &Sequencer.report_successful_commit/2)

    sequencer
    |> notify_fn.(plan.commit_version)
    |> case do
      :ok -> %{plan | stage: :sequencer_notified}
      {:error, reason} -> %{plan | error: reason, stage: :failed}
    end
  end
692

693
  # ============================================================================
694
  # Success Notification
695
  # ============================================================================
696

697
  @spec notify_successes(FinalizationPlan.t(), keyword()) :: FinalizationPlan.t()
  # Failed plans pass through untouched.
  def notify_successes(%FinalizationPlan{stage: :failed} = plan, _opts), do: plan

  # Every transaction not already replied to (i.e. not aborted) committed
  # successfully; reply with the commit version and mark the plan completed.
  def notify_successes(%FinalizationPlan{stage: :sequencer_notified} = plan, opts) do
    success_reply_fn = Keyword.get(opts, :success_reply_fn, &send_reply_with_commit_version/2)

    pending =
      Enum.reject(plan.transactions, fn {idx, _entry} -> MapSet.member?(plan.replied_indices, idx) end)

    successful_reply_fns = Enum.map(pending, fn {_idx, {_tx_idx, reply_fn, _binary, _task}} -> reply_fn end)
    successful_indices = Enum.map(pending, fn {idx, _entry} -> idx end)

    success_reply_fn.(successful_reply_fns, plan.commit_version)

    %{plan | replied_indices: MapSet.union(plan.replied_indices, MapSet.new(successful_indices)), stage: :completed}
  end
713

714
  @spec send_reply_with_commit_version([Batch.reply_fn()], Bedrock.version()) :: :ok
  # Delivers the assigned commit version to every successful transaction's client.
  def send_reply_with_commit_version(reply_fns, commit_version) do
    Enum.each(reply_fns, fn reply_fn -> reply_fn.({:ok, commit_version}) end)
  end
9✔
716

717
  # ============================================================================
718
  # Result Extraction and Error Handling
719
  # ============================================================================
720

721
  @spec extract_result_or_handle_error(FinalizationPlan.t(), keyword()) ::
          {:ok, non_neg_integer(), non_neg_integer()} | {:error, finalization_error()}
  # Completed pipeline: report abort/success counts.
  def extract_result_or_handle_error(%FinalizationPlan{stage: :completed} = plan, _opts) do
    n_aborts = plan.aborted_count
    {:ok, n_aborts, plan.transaction_count - n_aborts}
  end

  # Failed pipeline: notify pending clients before returning the error.
  def extract_result_or_handle_error(%FinalizationPlan{stage: :failed} = plan, opts), do: handle_error(plan, opts)
5✔
731

732
  @spec handle_error(FinalizationPlan.t(), keyword()) :: {:error, finalization_error()}
  defp handle_error(%FinalizationPlan{error: error} = plan, opts) when not is_nil(error) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    # Any client that never received an answer is told its transaction aborted,
    # so no caller is left waiting after a pipeline failure.
    plan.transactions
    |> Enum.reject(fn {idx, _entry} -> MapSet.member?(plan.replied_indices, idx) end)
    |> Enum.map(fn {_idx, {_tx_idx, reply_fn, _binary, _task}} -> reply_fn end)
    |> abort_reply_fn.()

    {:error, plan.error}
  end
747
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc