• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / ca26ce31cb47417eebd0556e6b6b89e5ee3d09c7-PR-43

07 Sep 2025 09:21PM UTC coverage: 63.935% (+0.3%) from 63.674%
ca26ce31cb47417eebd0556e6b6b89e5ee3d09c7-PR-43

Pull #43

github

jallum
Add periodic tree sweeping to Resolver for memory management

Implement automatic cleanup of old transaction versions in resolver interval trees:

- Add configurable sweep_interval_ms (1s default) and version_retention_ms (6s default)
- Extend State and Server to track sweep timing and configuration
- Sweep triggers when timeout is :infinity or sweep interval elapsed
- Use optimized bulk tree filtering with single rebalance for performance
- Update tests to handle new resolver initialization signature
- Add property tests validating filter correctness
Pull Request #43: Add conflict sharding with async resolver assignment

105 of 138 new or added lines in 12 files covered. (76.09%)

6 existing lines in 3 files now uncovered.

3292 of 5149 relevant lines covered (63.93%)

1277.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.0
/lib/bedrock/data_plane/commit_proxy/finalization.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Finalization do
2
  @moduledoc """
3
  Transaction finalization pipeline that handles conflict resolution and log persistence.
4

5
  ## Version Chain Integrity
6

7
  CRITICAL: This module maintains the Lamport clock version chain established by the sequencer.
8
  The sequencer provides both `last_commit_version` and `commit_version` as a proper chain link:
9

10
  - `last_commit_version`: The actual last committed version from the sequencer
11
  - `commit_version`: The new version assigned to this batch
12

13
  Always use the exact version values provided by the sequencer through the batch to maintain
14
  proper MVCC conflict detection and transaction ordering. Version gaps can exist due to failed
15
  transactions, recovery scenarios, or system restarts.
16
  """
17

18
  import Bitwise, only: [<<<: 2]
19

20
  alias Bedrock.ControlPlane.Config.ServiceDescriptor
21
  alias Bedrock.ControlPlane.Config.StorageTeamDescriptor
22
  alias Bedrock.ControlPlane.Config.TransactionSystemLayout
23
  alias Bedrock.DataPlane.CommitProxy.Batch
24
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
25
  alias Bedrock.DataPlane.CommitProxy.Tracing
26
  alias Bedrock.DataPlane.Log
27
  alias Bedrock.DataPlane.Resolver
28
  alias Bedrock.DataPlane.Sequencer
29
  alias Bedrock.DataPlane.Transaction
30
  alias Bedrock.KeyRange
31

32
  @type resolver_fn() :: (Resolver.ref(),
33
                          Bedrock.epoch(),
34
                          Bedrock.version(),
35
                          Bedrock.version(),
36
                          [Transaction.encoded()],
37
                          keyword() ->
38
                            {:ok, [non_neg_integer()]}
39
                            | {:error, term()}
40
                            | {:failure, :timeout, Resolver.ref()}
41
                            | {:failure, :unavailable, Resolver.ref()})
42

43
  @type log_push_batch_fn() :: (TransactionSystemLayout.t(),
44
                                last_commit_version :: Bedrock.version(),
45
                                transactions_by_tag :: %{
46
                                  Bedrock.range_tag() => Transaction.encoded()
47
                                },
48
                                commit_version :: Bedrock.version(),
49
                                opts :: [
50
                                  timeout: Bedrock.timeout_in_ms(),
51
                                  async_stream_fn: async_stream_fn()
52
                                ] ->
53
                                  :ok | {:error, log_push_error()})
54

55
  @type log_push_single_fn() :: (ServiceDescriptor.t(), binary(), Bedrock.version() ->
56
                                   :ok | {:error, :unavailable})
57

58
  @type async_stream_fn() :: (enumerable :: Enumerable.t(), fun :: (term() -> term()), opts :: keyword() ->
59
                                Enumerable.t())
60

61
  @type abort_reply_fn() :: ([Batch.reply_fn()] -> :ok)
62

63
  @type success_reply_fn() :: ([Batch.reply_fn()], Bedrock.version() -> :ok)
64

65
  @type timeout_fn() :: (non_neg_integer() -> non_neg_integer())
66

67
  @type sequencer_notify_fn() :: (Sequencer.ref(), Bedrock.version() -> :ok | {:error, term()})
68

69
  @type resolution_error() ::
70
          :timeout
71
          | :unavailable
72
          | {:resolver_unavailable, term()}
73

74
  @type storage_coverage_error() ::
75
          {:storage_team_coverage_error, binary()}
76

77
  @type log_push_error() ::
78
          {:log_failures, [{Log.id(), term()}]}
79
          | {:insufficient_acknowledgments, non_neg_integer(), non_neg_integer(), [{Log.id(), term()}]}
80
          | :log_push_failed
81

82
  @type finalization_error() ::
83
          resolution_error()
84
          | storage_coverage_error()
85
          | log_push_error()
86

87
  # ============================================================================
88
  # Data Structures
89
  # ============================================================================
90

91
  defmodule FinalizationPlan do
    @moduledoc """
    Pipeline state for transaction finalization using unified transaction storage
    for maximum efficiency and clarity.

    A plan is created by `create_finalization_plan/2` and threaded through each
    pipeline stage, which pattern-matches on `:stage` and advances it. Stages
    observed in this module, in order:

      :initialized -> :ready_for_resolution -> :conflicts_resolved ->
      :aborts_notified -> :ready_for_logging -> :logged ->
      :sequencer_notified -> :completed

    Any stage may instead transition to `:failed`, recording the cause in
    `:error`; downstream stages pass a `:failed` plan through untouched.
    """

    @enforce_keys [
      :transactions,
      :transaction_count,
      :commit_version,
      :last_commit_version,
      :storage_teams,
      :logs_by_id
    ]
    defstruct [
      # Index -> {index, reply_fn, encoded transaction, Task.t()} (see type below).
      :transactions,
      # Total number of transactions in the batch (aborted + successful).
      :transaction_count,
      # Version assigned to this batch by the sequencer.
      :commit_version,
      # Previous committed version, as supplied by the sequencer (chain link).
      :last_commit_version,
      # Storage team descriptors used to route mutations to range tags.
      :storage_teams,
      # Log id -> list of range tags that log covers.
      :logs_by_id,
      # Log id -> encoded transaction built for that log (filled in prepare stage).
      transactions_by_log: %{},
      # Indices of transactions whose clients have already been replied to.
      replied_indices: MapSet.new(),
      # Number of transactions aborted by conflict resolution.
      aborted_count: 0,
      stage: :initialized,
      error: nil
    ]

    @type t :: %__MODULE__{
            transactions: %{
              non_neg_integer() => {non_neg_integer(), Batch.reply_fn(), Transaction.encoded(), Task.t()}
            },
            transaction_count: non_neg_integer(),
            commit_version: Bedrock.version(),
            last_commit_version: Bedrock.version(),
            storage_teams: [StorageTeamDescriptor.t()],
            logs_by_id: %{Log.id() => [Bedrock.range_tag()]},
            transactions_by_log: %{Log.id() => Transaction.encoded()},
            replied_indices: MapSet.t(non_neg_integer()),
            aborted_count: non_neg_integer(),
            stage: atom(),
            error: term() | nil
          }
  end
135

136
  # ============================================================================
137
  # Main Pipeline
138
  # ============================================================================
139

140
  @doc """
  Runs the complete finalization pipeline for a batch of transactions.

  The batch moves through six stages in order: conflict resolution via the
  resolvers, immediate abort replies for conflicting transactions, per-log
  transaction preparation, durable push to ALL log servers, sequencer
  notification of the committed version, and finally success replies to the
  surviving clients.

  Version chain note: the `last_commit_version`/`commit_version` pair carried
  by the batch comes straight from the sequencer and is used verbatim at every
  stage to preserve MVCC conflict detection and transaction ordering.

  ## Parameters

    - `batch`: transaction batch with sequencer-assigned commit versions
    - `transaction_system_layout`: resolvers, logs, and service descriptors
    - `opts`: injectable functions and timeouts (primarily for testing)

  ## Returns

    - `{:ok, n_aborts, n_successes}` when the pipeline completes
    - `{:error, finalization_error()}` on failure; every client that has not
      yet received a reply is sent an abort before the error is returned
  """
  @spec finalize_batch(
          Batch.t(),
          TransactionSystemLayout.t(),
          opts :: [
            epoch: Bedrock.epoch(),
            precomputed: LayoutOptimization.precomputed_layout() | nil,
            resolver_fn: resolver_fn(),
            batch_log_push_fn: log_push_batch_fn(),
            abort_reply_fn: abort_reply_fn(),
            success_reply_fn: success_reply_fn(),
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            sequencer_notify_fn: sequencer_notify_fn(),
            timeout: non_neg_integer()
          ]
        ) ::
          {:ok, n_aborts :: non_neg_integer(), n_oks :: non_neg_integer()}
          | {:error, finalization_error()}
  def finalize_batch(batch, transaction_system_layout, opts \\ []) do
    # Each stage takes the plan produced by the previous one; stages after a
    # failure pass the failed plan through unchanged.
    plan = create_finalization_plan(batch, transaction_system_layout)
    resolved = resolve_conflicts(plan, transaction_system_layout, opts)
    prepared = prepare_for_logging(resolved)
    logged = push_to_logs(prepared, transaction_system_layout, opts)
    sequenced = notify_sequencer(logged, transaction_system_layout.sequencer, opts)

    sequenced
    |> notify_successes(opts)
    |> extract_result_or_handle_error(opts)
  end
201

202
  # ============================================================================
203
  # Pipeline Initialization
204
  # ============================================================================
205

206
  # Builds the initial pipeline state from the batch and the system layout,
  # indexing the batch buffer entries by their transaction index.
  @spec create_finalization_plan(Batch.t(), TransactionSystemLayout.t()) :: FinalizationPlan.t()
  def create_finalization_plan(batch, transaction_system_layout) do
    # Each buffer entry is a tuple whose first element is the transaction index.
    indexed_transactions = Map.new(batch.buffer, fn entry -> {elem(entry, 0), entry} end)

    %FinalizationPlan{
      transactions: indexed_transactions,
      transaction_count: Batch.transaction_count(batch),
      commit_version: batch.commit_version,
      last_commit_version: batch.last_commit_version,
      storage_teams: transaction_system_layout.storage_teams,
      logs_by_id: transaction_system_layout.logs,
      stage: :ready_for_resolution
    }
  end
218

219
  # ============================================================================
220
  # Conflict Resolution
221
  # ============================================================================
222

223
  # Awaits the per-transaction resolver-sharding tasks, fans the transactions
  # out to every resolver, and notifies aborted clients.
  #
  # FIX: `Task.await/2` previously hard-coded a 5000ms timeout while every
  # other wait in this pipeline honors `opts[:timeout]` (defaulting to 5_000).
  # The await now uses the same configurable timeout for consistency; the
  # default is unchanged.
  @spec resolve_conflicts(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) ::
          FinalizationPlan.t()
  def resolve_conflicts(%FinalizationPlan{stage: :ready_for_resolution} = plan, layout, opts) do
    epoch = Keyword.get(opts, :epoch) || raise "Missing epoch in finalization opts"
    task_timeout = Keyword.get(opts, :timeout, 5_000)

    resolver_transaction_map =
      if plan.transaction_count == 0 do
        # Empty batch: every resolver still gets called (with no transactions)
        # so that versions keep advancing.
        Map.new(layout.resolvers, fn {_key, ref} -> {ref, []} end)
      else
        # Each transaction carries a task whose result maps resolver ref ->
        # that transaction's share of the work (see the Map.fetch! below).
        maps =
          for idx <- 0..(plan.transaction_count - 1) do
            {_idx, _reply_fn, _transaction, task} = Map.fetch!(plan.transactions, idx)
            Task.await(task, task_timeout)
          end

        Map.new(layout.resolvers, fn {_key, ref} ->
          transactions = Enum.map(maps, &Map.fetch!(&1, ref))
          {ref, transactions}
        end)
      end

    case call_all_resolvers_with_map(
           resolver_transaction_map,
           epoch,
           plan.last_commit_version,
           plan.commit_version,
           layout.resolvers,
           opts
         ) do
      {:ok, aborted_set} ->
        split_and_notify_aborts_with_set(%{plan | stage: :conflicts_resolved}, aborted_set, opts)

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end
259

260
  # Calls every resolver in parallel (via the injectable async-stream fn) and
  # unions the aborted transaction indices they report. Halts on the first
  # resolver error or task exit; the partial accumulator is discarded.
  @spec call_all_resolvers_with_map(
          %{Resolver.ref() => [Transaction.encoded()]},
          Bedrock.epoch(),
          Bedrock.version(),
          Bedrock.version(),
          [{start_key :: Bedrock.key(), Resolver.ref()}],
          keyword()
        ) :: {:ok, MapSet.t(non_neg_integer())} | {:error, term()}
  defp call_all_resolvers_with_map(resolver_transaction_map, epoch, last_version, commit_version, resolvers, opts) do
    async_stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
    timeout = Keyword.get(opts, :timeout, 5_000)

    resolvers
    |> async_stream_fn.(
      fn {_start_key, ref} ->
        # Every resolver must have transactions after task processing
        filtered_transactions = Map.fetch!(resolver_transaction_map, ref)
        call_resolver_with_retry(ref, epoch, last_version, commit_version, filtered_transactions, opts)
      end,
      timeout: timeout
    )
    |> Enum.reduce_while({:ok, MapSet.new()}, fn
      # Successful resolver call: fold its aborted indices into the set.
      {:ok, {:ok, aborted}}, {:ok, acc} ->
        {:cont, {:ok, Enum.into(aborted, acc)}}

      # Resolver returned an error (retries already exhausted downstream).
      {:ok, {:error, reason}}, _ ->
        {:halt, {:error, reason}}

      # The async task itself exited (e.g. timeout with on_timeout: :kill_task).
      {:exit, reason}, _ ->
        {:halt, {:error, {:resolver_exit, reason}}}
    end)
  end
292

293
  # Calls a single resolver, retrying transient failures (`:timeout` /
  # `:unavailable`) up to `opts[:max_attempts]` (default 3) times with a
  # backoff computed by `opts[:timeout_fn]` (default `default_timeout_fn/1`).
  #
  # FIX: the `{:error, reason}` and `{:failure, reason, _ref}` retry and
  # retries-exhausted branches were duplicated verbatim; both shapes are now
  # normalized by `classify_resolver_result/1` so the retry logic exists once.
  @spec call_resolver_with_retry(
          Resolver.ref(),
          Bedrock.epoch(),
          Bedrock.version(),
          Bedrock.version(),
          [Transaction.encoded()],
          keyword(),
          non_neg_integer()
        ) :: {:ok, [non_neg_integer()]} | {:error, term()}
  defp call_resolver_with_retry(
         ref,
         epoch,
         last_version,
         commit_version,
         filtered_transactions,
         opts,
         attempts_used \\ 0
       ) do
    timeout_fn = Keyword.get(opts, :timeout_fn, &default_timeout_fn/1)
    resolver_fn = Keyword.get(opts, :resolver_fn, &Resolver.resolve_transactions/6)
    max_attempts = Keyword.get(opts, :max_attempts, 3)

    timeout_in_ms = timeout_fn.(attempts_used)

    result = resolver_fn.(ref, epoch, last_version, commit_version, filtered_transactions, timeout: timeout_in_ms)

    case classify_resolver_result(result) do
      {:ok, _} = success ->
        success

      {:retryable, reason} when attempts_used < max_attempts - 1 ->
        Tracing.emit_resolver_retry(max_attempts - attempts_used - 2, attempts_used + 1, reason)

        call_resolver_with_retry(
          ref,
          epoch,
          last_version,
          commit_version,
          filtered_transactions,
          opts,
          attempts_used + 1
        )

      {:retryable, reason} ->
        # Retries exhausted: surface as a resolver-unavailable error.
        Tracing.emit_resolver_max_retries_exceeded(attempts_used + 1, reason)
        {:error, {:resolver_unavailable, reason}}

      {:error, _reason} = error ->
        # Non-transient error: propagate as-is.
        error
    end
  end

  # Collapses the two transient shapes the resolver can return —
  # `{:error, :timeout | :unavailable}` and `{:failure, :timeout | :unavailable, ref}`
  # — into a single `{:retryable, reason}` tag; everything else passes through.
  @spec classify_resolver_result(term()) ::
          {:ok, term()} | {:retryable, :timeout | :unavailable} | {:error, term()}
  defp classify_resolver_result({:ok, _} = success), do: success
  defp classify_resolver_result({:error, reason}) when reason in [:timeout, :unavailable], do: {:retryable, reason}

  defp classify_resolver_result({:failure, reason, _ref}) when reason in [:timeout, :unavailable],
    do: {:retryable, reason}

  defp classify_resolver_result({:error, _reason} = error), do: error
359

360
  # Exponential backoff: 500ms doubled for each attempt already used
  # (500, 1000, 2000, ...).
  @spec default_timeout_fn(non_neg_integer()) :: non_neg_integer()
  def default_timeout_fn(attempts_used), do: 500 * Integer.pow(2, attempts_used)
362

363
  # Immediately replies `{:error, :aborted}` to every transaction the
  # resolvers marked as conflicting, then records those indices so later
  # stages skip them. Advances the plan to :aborts_notified.
  @spec split_and_notify_aborts_with_set(FinalizationPlan.t(), MapSet.t(non_neg_integer()), keyword()) ::
          FinalizationPlan.t()
  defp split_and_notify_aborts_with_set(%FinalizationPlan{stage: :conflicts_resolved} = plan, aborted_set, opts) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    # Reply to aborted transactions
    aborted_set
    |> Enum.map(fn idx ->
      {_idx, reply_fn, _binary, _task} = Map.fetch!(plan.transactions, idx)
      reply_fn
    end)
    |> abort_reply_fn.()

    # Track that we've replied to these transactions and count them as aborted
    %{plan | replied_indices: aborted_set, aborted_count: MapSet.size(aborted_set), stage: :aborts_notified}
  end
380

381
  # Sends `{:error, :aborted}` through every reply function. A no-op (still
  # returning :ok) when the list is empty.
  @spec reply_to_all_clients_with_aborted_transactions([Batch.reply_fn()]) :: :ok
  def reply_to_all_clients_with_aborted_transactions(reply_fns) do
    Enum.each(reply_fns, fn reply_fn -> reply_fn.({:error, :aborted}) end)
  end
384

385
  # ============================================================================
386
  # Log Preparation
387
  # ============================================================================
388

389
  # Builds the per-log encoded transactions for the surviving (non-aborted)
  # transactions. A failed plan passes through untouched.
  @spec prepare_for_logging(FinalizationPlan.t()) :: FinalizationPlan.t()
  def prepare_for_logging(%FinalizationPlan{stage: :failed} = plan), do: plan

  def prepare_for_logging(%FinalizationPlan{stage: :aborts_notified} = plan) do
    with {:ok, by_log} <- build_transactions_for_logs(plan, plan.logs_by_id) do
      %{plan | transactions_by_log: by_log, stage: :ready_for_logging}
    else
      {:error, reason} -> %{plan | error: reason, stage: :failed}
    end
  end
401

402
  # Distributes each surviving transaction's mutations to the logs whose tags
  # they touch, then encodes one transaction per log carrying the batch's
  # commit version. Mutations are accumulated reversed (prepend) and restored
  # with a single Enum.reverse/1 at encode time.
  @spec build_transactions_for_logs(FinalizationPlan.t(), %{Log.id() => [Bedrock.range_tag()]}) ::
          {:ok, %{Log.id() => Transaction.encoded()}} | {:error, term()}
  defp build_transactions_for_logs(plan, logs_by_id) do
    # Every log starts with an empty mutation list, so logs untouched by this
    # batch still receive an (empty) transaction.
    empty_acc =
      logs_by_id
      |> Map.keys()
      |> Map.new(&{&1, []})

    reduced =
      Enum.reduce_while(plan.transactions, {:ok, empty_acc}, fn {idx, entry}, {:ok, acc} ->
        process_transaction_for_logs({idx, entry}, plan, logs_by_id, acc)
      end)

    # `with` passes any {:error, reason} from the reduction through unchanged.
    with {:ok, mutations_by_log} <- reduced do
      encoded_by_log =
        Map.new(mutations_by_log, fn {log_id, reversed_mutations} ->
          encoded =
            Transaction.encode(%{
              mutations: Enum.reverse(reversed_mutations),
              commit_version: plan.commit_version
            })

          {log_id, encoded}
        end)

      {:ok, encoded_by_log}
    end
  end
437

438
  # Routes one transaction's mutations to the log accumulator, skipping
  # transactions that were already replied to (i.e. aborted).
  @spec process_transaction_for_logs(
          {non_neg_integer(), {non_neg_integer(), Batch.reply_fn(), Transaction.encoded(), Task.t()}},
          FinalizationPlan.t(),
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  defp process_transaction_for_logs({idx, {_idx, _reply_fn, binary, _task}}, plan, logs_by_id, acc) do
    case MapSet.member?(plan.replied_indices, idx) do
      # Already replied (aborted): leave the accumulator untouched.
      true -> {:cont, {:ok, acc}}
      false -> process_transaction_mutations(binary, plan.storage_teams, logs_by_id, acc)
    end
  end
454

455
  # Extracts the mutation stream from an encoded transaction and folds it into
  # the per-log accumulator. A transaction with no mutation section is treated
  # as empty and skipped; any other extraction failure halts the build.
  @spec process_transaction_mutations(
          binary(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}} | {:halt, {:error, term()}}
  defp process_transaction_mutations(binary_transaction, storage_teams, logs_by_id, acc) do
    case Transaction.mutations(binary_transaction) do
      {:ok, mutations_stream} ->
        mutations_stream
        |> process_mutations_for_transaction(storage_teams, logs_by_id, acc)
        |> case do
          {:ok, updated_acc} -> {:cont, {:ok, updated_acc}}
          {:error, reason} -> {:halt, {:error, reason}}
        end

      {:error, :section_not_found} ->
        # No mutations section at all — nothing to distribute.
        {:cont, {:ok, acc}}

      {:error, reason} ->
        {:halt, {:error, {:mutation_extraction_failed, reason}}}
    end
  end
480

481
  # Folds each mutation into the per-log accumulator, stopping at the first
  # distribution error (e.g. a key not covered by any storage team).
  @spec process_mutations_for_transaction(
          Enumerable.t(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:ok, %{Log.id() => [term()]}} | {:error, term()}
  defp process_mutations_for_transaction(mutations_stream, storage_teams, logs_by_id, initial_acc) do
    Enum.reduce_while(mutations_stream, {:ok, initial_acc}, fn mutation, {:ok, acc} ->
      distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, acc)
    end)
  end
493

494
  # Prepends a mutation onto the list of every log whose tags cover the
  # mutation's key (or key range). A mutation matching no storage team is a
  # coverage error and halts the fold.
  @spec distribute_mutation_to_logs(
          term(),
          [StorageTeamDescriptor.t()],
          %{Log.id() => [Bedrock.range_tag()]},
          %{Log.id() => [term()]}
        ) ::
          {:cont, {:ok, %{Log.id() => [term()]}}}
          | {:halt, {:error, term()}}
  defp distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, mutations_acc) do
    key_or_range = mutation_to_key_or_range(mutation)

    # key_or_range_to_tags/2 always returns {:ok, tags} per its spec.
    {:ok, tags} = key_or_range_to_tags(key_or_range, storage_teams)

    case tags do
      [] ->
        {:halt, {:error, {:storage_team_coverage_error, key_or_range}}}

      affected_tags ->
        updated_acc =
          affected_tags
          |> find_logs_for_tags(logs_by_id)
          |> Enum.reduce(mutations_acc, fn log_id, inner_acc ->
            Map.update!(inner_acc, log_id, &[mutation | &1])
          end)

        {:cont, {:ok, updated_acc}}
    end
  end
520

521
  # Extracts the key (for :set / :clear) or the {start, end} range
  # (for :clear_range) that a mutation touches.
  @spec mutation_to_key_or_range(
          {:set, Bedrock.key(), Bedrock.value()}
          | {:clear, Bedrock.key()}
          | {:clear_range, Bedrock.key(), Bedrock.key()}
        ) ::
          Bedrock.key() | {Bedrock.key(), Bedrock.key()}
  def mutation_to_key_or_range(mutation) do
    case mutation do
      {:set, key, _value} -> key
      {:clear, key} -> key
      {:clear_range, start_key, end_key} -> {start_key, end_key}
    end
  end
530

531
  # Maps a key or a {start, end} range to the tags of the storage teams that
  # cover it. The range clause uses half-open interval intersection; the key
  # clause delegates point containment to KeyRange.contains?/2.
  @spec key_or_range_to_tags(Bedrock.key() | Bedrock.key_range(), [StorageTeamDescriptor.t()]) ::
          {:ok, [Bedrock.range_tag()]}
  def key_or_range_to_tags({start_key, end_key}, storage_teams) do
    matching_tags =
      storage_teams
      |> Enum.filter(fn %{key_range: {team_start, team_end}} ->
        ranges_intersect?(start_key, end_key, team_start, team_end)
      end)
      |> Enum.map(& &1.tag)

    {:ok, matching_tags}
  end

  def key_or_range_to_tags(key, storage_teams) do
    matching_tags =
      storage_teams
      |> Enum.filter(fn %{key_range: range} -> KeyRange.contains?(range, key) end)
      |> Enum.map(& &1.tag)

    {:ok, matching_tags}
  end

  # Returns the ids of all logs whose tag list intersects the given tags.
  @spec find_logs_for_tags([Bedrock.range_tag()], %{Log.id() => [Bedrock.range_tag()]}) :: [Log.id()]
  def find_logs_for_tags(tags, logs_by_id) do
    tag_set = MapSet.new(tags)

    for {log_id, log_tags} <- logs_by_id,
        Enum.any?(log_tags, &MapSet.member?(tag_set, &1)) do
      log_id
    end
  end

  # Half-open interval intersection where :end means "unbounded above".
  @spec ranges_intersect?(Bedrock.key(), Bedrock.key() | :end, Bedrock.key(), Bedrock.key() | :end) :: boolean()
  defp ranges_intersect?(start1, end1, start2, end2) do
    case {end1, end2} do
      {:end, :end} -> true
      {:end, _} -> start1 < end2
      {_, :end} -> end1 > start2
      _ -> start1 < end2 and end1 > start2
    end
  end
569

570
  # ============================================================================
571
  # Log Distribution
572
  # ============================================================================
573

574
  # Pushes the prepared per-log transactions to the log servers via the
  # injectable batch push function. A failed plan passes through untouched.
  @spec push_to_logs(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) :: FinalizationPlan.t()
  def push_to_logs(%FinalizationPlan{stage: :failed} = plan, _layout, _opts), do: plan

  def push_to_logs(%FinalizationPlan{stage: :ready_for_logging} = plan, layout, opts) do
    push_fn = Keyword.get(opts, :batch_log_push_fn, &push_transaction_to_logs_direct/5)

    layout
    |> push_fn.(plan.last_commit_version, plan.transactions_by_log, plan.commit_version, opts)
    |> case do
      :ok -> %{plan | stage: :logged}
      {:error, reason} -> %{plan | stage: :failed, error: reason}
    end
  end
588

589
  # Resolves each log id to its service descriptor; log ids with no known
  # service are silently dropped (the acknowledgment count in
  # push_transaction_to_logs_direct/5 catches the shortfall).
  @spec resolve_log_descriptors(%{Log.id() => term()}, %{term() => ServiceDescriptor.t()}) :: %{
          Log.id() => ServiceDescriptor.t()
        }
  def resolve_log_descriptors(log_descriptors, services) do
    log_descriptors
    |> Map.keys()
    |> Enum.map(&{&1, Map.get(services, &1)})
    |> Enum.reject(&is_nil(elem(&1, 1)))
    |> Map.new()
  end

  # Pushes one encoded transaction to a single log, but only when the service
  # descriptor shows an up log server; anything else is :unavailable.
  @spec try_to_push_transaction_to_log(ServiceDescriptor.t(), binary(), Bedrock.version()) ::
          :ok | {:error, :unavailable}
  def try_to_push_transaction_to_log(%{kind: :log, status: {:up, log_server}}, transaction, last_commit_version) do
    Log.push(log_server, transaction, last_commit_version)
  end

  def try_to_push_transaction_to_log(_, _, _), do: {:error, :unavailable}

  @doc """
  Pushes transactions directly to logs and waits for acknowledgement from ALL log servers.

  This function takes transactions that have already been built per log and pushes them
  to the appropriate log servers. Each log receives its pre-built transaction.
  All logs must acknowledge to maintain durability guarantees.

  ## Parameters

    - `transaction_system_layout`: Contains configuration information about the
      transaction system, including available log servers.
    - `last_commit_version`: The last known committed version; used to
      ensure consistency in log ordering.
    - `transactions_by_log`: Map of log_id to transaction for that log.
      May be empty transactions if all transactions were aborted.
    - `commit_version`: The version assigned by the sequencer for this batch.
    - `opts`: Optional configuration for testing and customization.

  ## Options
    - `:async_stream_fn` - Function for parallel processing (default: Task.async_stream/3)
    - `:log_push_fn` - Function for pushing to individual logs (default: try_to_push_transaction_to_log/3)
    - `:timeout` - Timeout for log push operations (default: 5_000ms)

  ## Returns
    - `:ok` if acknowledgements have been received from ALL log servers.
    - `{:error, log_push_error()}` if any log has not successfully acknowledged the
       push within the timeout period or other errors occur.
  """
  @spec push_transaction_to_logs_direct(
          TransactionSystemLayout.t(),
          last_commit_version :: Bedrock.version(),
          %{Log.id() => Transaction.encoded()},
          commit_version :: Bedrock.version(),
          opts :: [
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            timeout: non_neg_integer()
          ]
        ) :: :ok | {:error, log_push_error()}
  def push_transaction_to_logs_direct(
        transaction_system_layout,
        last_commit_version,
        transactions_by_log,
        _commit_version,
        opts \\ []
      ) do
    async_stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
    log_push_fn = Keyword.get(opts, :log_push_fn, &try_to_push_transaction_to_log/3)
    timeout = Keyword.get(opts, :timeout, 5_000)

    logs_by_id = transaction_system_layout.logs
    # Acknowledgment is required from every configured log, including any
    # whose service descriptor could not be resolved below.
    required_acknowledgments = map_size(logs_by_id)

    logs_by_id
    |> resolve_log_descriptors(transaction_system_layout.services)
    |> async_stream_fn.(
      fn {log_id, service_descriptor} ->
        encoded_transaction = Map.get(transactions_by_log, log_id)
        result = log_push_fn.(service_descriptor, encoded_transaction, last_commit_version)
        {log_id, result}
      end,
      timeout: timeout
    )
    |> Enum.reduce_while({0, []}, fn
      {:ok, {log_id, {:error, reason}}}, {_count, errors} ->
        {:halt, {:error, [{log_id, reason} | errors]}}

      {:ok, {_log_id, :ok}}, {count, errors} ->
        count = 1 + count

        if count == required_acknowledgments do
          {:halt, {:ok, count}}
        else
          {:cont, {count, errors}}
        end

      # FIX: Task.async_stream emits task exits as `{:exit, reason}` (see the
      # matching clause in call_all_resolvers_with_map/6); the previous
      # pattern `{:exit, {log_id, reason}}` could never match a real exit
      # reason such as :timeout and would crash the reduce. The log id is not
      # recoverable from the exit, so it is tagged :task_exited.
      {:exit, reason}, {_count, errors} ->
        {:halt, {:error, [{:task_exited, reason} | errors]}}
    end)
    |> case do
      {:ok, ^required_acknowledgments} ->
        :ok

      {:error, errors} ->
        {:error, {:log_failures, errors}}

      {count, errors} when count < required_acknowledgments ->
        {:error, {:insufficient_acknowledgments, count, required_acknowledgments, errors}}

      _other ->
        {:error, :log_push_failed}
    end
  end
701

702
  # ============================================================================
703
  # Sequencer Notification
704
  # ============================================================================
705

706
  # Reports the batch's commit version back to the sequencer once logging has
  # succeeded. A failed plan passes through untouched.
  @spec notify_sequencer(FinalizationPlan.t(), Sequencer.ref(), keyword()) :: FinalizationPlan.t()
  def notify_sequencer(%FinalizationPlan{stage: :failed} = plan, _sequencer, _opts), do: plan

  def notify_sequencer(%FinalizationPlan{stage: :logged} = plan, sequencer, opts) do
    notify_fn = Keyword.get(opts, :sequencer_notify_fn, &Sequencer.report_successful_commit/2)

    case notify_fn.(sequencer, plan.commit_version) do
      :ok -> %{plan | stage: :sequencer_notified}
      {:error, reason} -> %{plan | stage: :failed, error: reason}
    end
  end
720

721
  # ============================================================================
722
  # Success Notification
723
  # ============================================================================
724

725
  # Replies with {:ok, commit_version} to every transaction that has not yet
  # been replied to (i.e. was not aborted), then marks the plan :completed.
  @spec notify_successes(FinalizationPlan.t(), keyword()) :: FinalizationPlan.t()
  def notify_successes(%FinalizationPlan{stage: :failed} = plan, _opts), do: plan

  def notify_successes(%FinalizationPlan{stage: :sequencer_notified} = plan, opts) do
    success_reply_fn = Keyword.get(opts, :success_reply_fn, &send_reply_with_commit_version/2)

    pending =
      for {idx, {_tx_idx, reply_fn, _binary, _task}} <- plan.transactions,
          not MapSet.member?(plan.replied_indices, idx) do
        {reply_fn, idx}
      end

    {successful_reply_fns, successful_indices} = Enum.unzip(pending)

    success_reply_fn.(successful_reply_fns, plan.commit_version)

    %{plan | replied_indices: MapSet.union(plan.replied_indices, MapSet.new(successful_indices)), stage: :completed}
  end
741

742
  # Sends {:ok, commit_version} through every reply function.
  @spec send_reply_with_commit_version([Batch.reply_fn()], Bedrock.version()) :: :ok
  def send_reply_with_commit_version(reply_fns, commit_version) do
    Enum.each(reply_fns, fn reply_fn -> reply_fn.({:ok, commit_version}) end)
  end
744

745
  # ============================================================================
746
  # Result Extraction and Error Handling
747
  # ============================================================================
748

749
  # Converts the terminal plan into the pipeline's public result: counts for a
  # completed plan, or error handling (with abort replies) for a failed one.
  @spec extract_result_or_handle_error(FinalizationPlan.t(), keyword()) ::
          {:ok, non_neg_integer(), non_neg_integer()} | {:error, finalization_error()}
  def extract_result_or_handle_error(%FinalizationPlan{stage: :completed} = plan, _opts) do
    aborts = plan.aborted_count
    {:ok, aborts, plan.transaction_count - aborts}
  end

  def extract_result_or_handle_error(%FinalizationPlan{stage: :failed} = plan, opts), do: handle_error(plan, opts)
759

760
  # On pipeline failure, sends an abort reply to every transaction that has
  # not yet been replied to, then surfaces the recorded error.
  @spec handle_error(FinalizationPlan.t(), keyword()) :: {:error, finalization_error()}
  defp handle_error(%FinalizationPlan{error: error} = plan, opts) when not is_nil(error) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    plan.transactions
    |> Enum.reject(fn {idx, _entry} -> MapSet.member?(plan.replied_indices, idx) end)
    |> Enum.map(fn {_idx, {_tx_idx, reply_fn, _binary, _task}} -> reply_fn end)
    |> abort_reply_fn.()

    {:error, error}
  end
775
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc