jallum / bedrock

Commit: 84f2ec5d5a2517957ec3c4741969ffc8414a4994 (push, via github, by jallum)

13 Aug 2025 02:57AM UTC coverage: 58.423%. Coverage remained the same.

0.1.2

- Reworked and simplified documentation
- Increased test coverage
- Project housekeeping

1515 of 2481 new or added lines in 105 files covered. (61.06%)

424 existing lines in 63 files now uncovered.

2112 of 3615 relevant lines covered (58.42%)

1870.88 hits per line

Source File

/lib/bedrock/data_plane/commit_proxy/finalization.ex (95.15% covered)

defmodule Bedrock.DataPlane.CommitProxy.Finalization do
  @moduledoc """
  Transaction finalization pipeline that handles conflict resolution and log persistence.

  ## Version Chain Integrity

  CRITICAL: This module maintains the Lamport clock version chain established by the sequencer.
  The sequencer provides both `last_commit_version` and `commit_version` as a proper chain link:

  - `last_commit_version`: The actual last committed version from the sequencer
  - `commit_version`: The new version assigned to this batch

  Always use the exact version values provided by the sequencer through the batch to maintain
  proper MVCC conflict detection and transaction ordering. Version gaps can exist due to failed
  transactions, recovery scenarios, or system restarts.
  """

  alias Bedrock.ControlPlane.Config.ServiceDescriptor
  alias Bedrock.ControlPlane.Config.StorageTeamDescriptor
  alias Bedrock.ControlPlane.Config.TransactionSystemLayout
  alias Bedrock.DataPlane.CommitProxy.Batch
  alias Bedrock.DataPlane.Log
  alias Bedrock.DataPlane.Log.EncodedTransaction
  alias Bedrock.DataPlane.Log.Transaction
  alias Bedrock.DataPlane.Resolver
  alias Bedrock.DataPlane.Sequencer

  import Bedrock.DataPlane.CommitProxy.Batch,
    only: [transactions_in_order: 1]

  import Bitwise, only: [<<<: 2]

  @type resolver_fn() :: (resolvers :: [{start_key :: Bedrock.key(), Resolver.ref()}],
                          last_version :: Bedrock.version(),
                          commit_version :: Bedrock.version(),
                          transaction_summaries :: [Resolver.transaction_summary()],
                          resolver_opts :: [timeout: Bedrock.timeout_in_ms()] ->
                            {:ok, aborted :: [transaction_index :: non_neg_integer()]}
                            | {:error, resolution_error()})

  @type log_push_batch_fn() :: (TransactionSystemLayout.t(),
                                last_commit_version :: Bedrock.version(),
                                transactions_by_tag :: %{Bedrock.range_tag() => Transaction.t()},
                                commit_version :: Bedrock.version(),
                                opts :: [
                                  timeout: Bedrock.timeout_in_ms(),
                                  async_stream_fn: async_stream_fn()
                                ] ->
                                  :ok | {:error, log_push_error()})

  @type log_push_single_fn() :: (ServiceDescriptor.t(),
                                 EncodedTransaction.t(),
                                 Bedrock.version() ->
                                   :ok | {:error, :unavailable})

  @type async_stream_fn() :: (enumerable :: Enumerable.t(),
                              fun :: (term() -> term()),
                              opts :: keyword() ->
                                Enumerable.t())

  @type abort_reply_fn() :: ([Batch.reply_fn()] -> :ok)

  @type success_reply_fn() :: ([Batch.reply_fn()], Bedrock.version() -> :ok)

  @type timeout_fn() :: (non_neg_integer() -> non_neg_integer())

  @type resolution_error() ::
          :timeout
          | :unavailable
          | {:resolver_unavailable, term()}

  @type storage_coverage_error() ::
          {:storage_team_coverage_error, binary()}

  @type log_push_error() ::
          {:log_failures, [{Log.id(), term()}]}
          | {:insufficient_acknowledgments, non_neg_integer(), non_neg_integer(),
             [{Log.id(), term()}]}
          | :log_push_failed

  @type finalization_error() ::
          resolution_error()
          | storage_coverage_error()
          | log_push_error()

  defmodule FinalizationPlan do
    @moduledoc """
    Pipeline state for transaction finalization that accumulates information
    and tracks reply status to prevent double-replies.
    """

    @enforce_keys [:transactions, :commit_version, :last_commit_version, :storage_teams]
    defstruct [
      :transactions,
      :commit_version,
      :last_commit_version,
      :storage_teams,

      # Accumulated pipeline state
      resolver_data: [],
      aborted_indices: [],
      aborted_replies: [],
      successful_replies: [],
      transactions_by_tag: %{},

      # Reply tracking for safety
      replied_indices: MapSet.new(),

      # Pipeline stage status
      stage: :initialized,
      error: nil
    ]

    @type t :: %__MODULE__{
            transactions: [{Batch.reply_fn(), Bedrock.transaction()}],
            commit_version: Bedrock.version(),
            last_commit_version: Bedrock.version(),
            storage_teams: [StorageTeamDescriptor.t()],
            resolver_data: [Resolver.transaction_summary()],
            aborted_indices: [integer()],
            aborted_replies: [Batch.reply_fn()],
            successful_replies: [Batch.reply_fn()],
            transactions_by_tag: %{Bedrock.range_tag() => Transaction.t()},
            replied_indices: MapSet.t(),
            stage: atom(),
            error: term() | nil
          }
  end
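
  # Editorial summary, derived from the stage transitions implemented below
  # (:initialized is only the struct default; step 1 builds the plan at :created):
  #
  #   :created -> :ready_for_resolution -> :conflicts_resolved -> :aborts_notified
  #     -> :ready_for_logging -> :logged -> :sequencer_notified -> :completed
  #
  # Any step that encounters an error sets the stage to :failed, and every later
  # step passes a :failed plan through untouched until the error handler runs.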

  # ============================================================================
  # MAIN PIPELINE FUNCTION
  # ============================================================================

  @doc """
  Finalizes a batch of transactions by resolving conflicts, separating
  successful transactions from aborts, and pushing them to the log servers.

  This function processes a batch of transactions, first ensuring that any
  conflicts are resolved. After conflict resolution, it organizes the
  transactions into those that will be committed and those that will be aborted.

  Clients with aborted transactions are notified of the abort immediately.
  Successful transactions are pushed to the system's logs, and clients that
  submitted the transactions are notified once all of the log servers have
  acknowledged.

  ## Parameters

    - `batch`: A `Batch.t()` struct that contains the transactions to be finalized,
      along with the commit version details.
    - `transaction_system_layout`: Provides configuration and systemic details,
      including the available resolver and log servers.

  ## Returns
    - `{:ok, n_aborts, n_oks}` when the batch has been processed and all clients
      have been notified about the status of their transactions.
    - `{:error, finalization_error()}` when a pipeline step fails; any clients
      that have not yet been answered are sent an abort.
  """
  @spec finalize_batch(
          Batch.t(),
          TransactionSystemLayout.t(),
          opts :: [
            resolver_fn: resolver_fn(),
            batch_log_push_fn: log_push_batch_fn(),
            abort_reply_fn: abort_reply_fn(),
            success_reply_fn: success_reply_fn(),
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            timeout: non_neg_integer()
          ]
        ) ::
          {:ok, n_aborts :: non_neg_integer(), n_oks :: non_neg_integer()}
          | {:error, finalization_error()}
  def finalize_batch(batch, transaction_system_layout, opts \\ []) do
    batch
    |> create_finalization_plan(transaction_system_layout)
    |> prepare_for_resolution()
    |> resolve_conflicts(transaction_system_layout, opts)
    |> split_and_notify_aborts(opts)
    |> prepare_for_logging()
    |> push_to_logs(transaction_system_layout, opts)
    |> notify_sequencer(transaction_system_layout.sequencer)
    |> notify_successes(opts)
    |> extract_result_or_handle_error(opts)
  end
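
  # Illustrative only: a rough sketch of a call from the commit proxy's point of
  # view. The `batch` and `transaction_system_layout` values are assumed to come
  # from the commit proxy and control plane; the options shown mirror the
  # defaults and are not required.
  #
  #   {:ok, n_aborts, n_oks} =
  #     finalize_batch(batch, transaction_system_layout,
  #       timeout: 5_000,
  #       abort_reply_fn: fn replies -> Enum.each(replies, & &1.({:error, :aborted})) end
  #     )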

  # ============================================================================
  # STEP 1: CREATE FINALIZATION PLAN
  # ============================================================================

  @spec create_finalization_plan(Batch.t(), TransactionSystemLayout.t()) :: FinalizationPlan.t()
  defp create_finalization_plan(batch, transaction_system_layout) do
    %FinalizationPlan{
      transactions: transactions_in_order(batch),
      commit_version: batch.commit_version,
      last_commit_version: batch.last_commit_version,
      storage_teams: transaction_system_layout.storage_teams,
      stage: :created
    }
  end

  # ============================================================================
  # STEP 2: PREPARE FOR RESOLUTION
  # ============================================================================

  @spec prepare_for_resolution(FinalizationPlan.t()) :: FinalizationPlan.t()
  defp prepare_for_resolution(%FinalizationPlan{stage: :created} = plan) do
    resolver_data = transform_transactions_for_resolution(plan.transactions)

    %{plan | resolver_data: resolver_data, stage: :ready_for_resolution}
  end

  @doc """
  Transforms the list of transactions for resolution.

  Converts the transaction data to the format expected by the conflict
  resolution logic. For each transaction, it extracts the read version,
  the reads, and the keys of the writes, discarding the values of the writes
  as they are not needed for resolution.
  """
  @spec transform_transactions_for_resolution([{Batch.reply_fn(), Bedrock.transaction()}]) :: [
          Resolver.transaction_summary()
        ]
  def transform_transactions_for_resolution(transactions) do
    transactions
    |> Enum.map(fn
      {_from, {nil, writes}} ->
        {nil, writes |> Map.keys()}

      {_from, {{read_version, reads}, writes}} ->
        {{read_version, reads |> Enum.uniq()}, writes |> Map.keys()}
    end)
  end
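
  # Shape example (editorial, with made-up keys and versions): a write-only
  # transaction and a read-write transaction are summarized as
  #
  #   [{reply_fn, {nil, %{"a" => 1}}}, {reply_fn, {{12, ["a", "a"]}, %{"b" => 2}}}]
  #   |> transform_transactions_for_resolution()
  #   #=> [{nil, ["a"]}, {{12, ["a"]}, ["b"]}]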

  # ============================================================================
  # STEP 3: RESOLVE CONFLICTS
  # ============================================================================

  @spec resolve_conflicts(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) ::
          FinalizationPlan.t()
  defp resolve_conflicts(%FinalizationPlan{stage: :ready_for_resolution} = plan, layout, opts) do
    resolver_fn = Keyword.get(opts, :resolver_fn, &resolve_transactions/5)

    case resolver_fn.(
           layout.resolvers,
           plan.last_commit_version,
           plan.commit_version,
           plan.resolver_data,
           Keyword.put(opts, :timeout, 1_000)
         ) do
      {:ok, aborted_indices} ->
        %{plan | aborted_indices: aborted_indices, stage: :conflicts_resolved}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end

  @spec resolve_transactions(
          resolvers :: [{start_key :: Bedrock.key(), Resolver.ref()}],
          last_version :: Bedrock.version(),
          commit_version :: Bedrock.version(),
          [Resolver.transaction_summary()],
          opts :: [
            timeout: :infinity | non_neg_integer(),
            timeout_fn: timeout_fn(),
            attempts_remaining: non_neg_integer(),
            attempts_used: non_neg_integer()
          ]
        ) ::
          {:ok, aborted :: [index :: integer()]}
          | {:error, resolution_error()}
  def resolve_transactions(
        resolvers,
        last_version,
        commit_version,
        transaction_summaries,
        opts
      ) do
    timeout_fn = Keyword.get(opts, :timeout_fn, &default_timeout_fn/1)
    attempts_remaining = Keyword.get(opts, :attempts_remaining, 2)
    attempts_used = Keyword.get(opts, :attempts_used, 0)
    timeout = Keyword.get(opts, :timeout, timeout_fn.(attempts_used))

    ranges =
      resolvers
      |> Enum.map(&elem(&1, 0))
      |> Enum.concat([:end])
      |> Enum.chunk_every(2, 1, :discard)

    transaction_summaries_by_start_key =
      ranges
      |> Enum.map(fn
        [start_key, end_key] ->
          filtered_summaries =
            filter_transaction_summaries(
              transaction_summaries,
              filter_fn(start_key, end_key)
            )

          {start_key, filtered_summaries}
      end)
      |> Enum.into(%{})

    result =
      resolvers
      |> Enum.map(fn {start_key, ref} ->
        Resolver.resolve_transactions(
          ref,
          last_version,
          commit_version,
          Map.get(transaction_summaries_by_start_key, start_key, []),
          timeout: timeout
        )
      end)
      |> Enum.reduce({:ok, []}, fn
        {:ok, aborted}, {:ok, acc} ->
          {:ok, Enum.uniq(acc ++ aborted)}

        {:error, reason}, _ ->
          {:error, reason}
      end)

    case result do
      {:ok, _} = success ->
        success

      {:error, reason} when attempts_remaining > 0 ->
        # Emit telemetry for retry attempt (after this failure, before next retry)
        :telemetry.execute(
          [:bedrock, :commit_proxy, :resolver, :retry],
          %{attempts_remaining: attempts_remaining - 1, attempts_used: attempts_used + 1},
          %{reason: reason}
        )

        # Retry with updated attempt counters
        updated_opts =
          opts
          |> Keyword.put(:attempts_remaining, attempts_remaining - 1)
          |> Keyword.put(:attempts_used, attempts_used + 1)

        resolve_transactions(
          resolvers,
          last_version,
          commit_version,
          transaction_summaries,
          updated_opts
        )

      {:error, reason} ->
        # Emit telemetry for final failure
        :telemetry.execute(
          [:bedrock, :commit_proxy, :resolver, :max_retries_exceeded],
          %{total_attempts: attempts_used + 1},
          %{reason: reason}
        )

        # Max retries exceeded, return error to allow commit proxy to trigger recovery
        {:error, {:resolver_unavailable, reason}}
    end
  end

  @spec default_timeout_fn(non_neg_integer()) :: non_neg_integer()
  def default_timeout_fn(attempts_used), do: 500 * (1 <<< attempts_used)
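
  # Worked values for the default backoff (derived from the formula above):
  # attempt 0 waits 500ms, attempt 1 waits 1_000ms, attempt 2 waits 2_000ms.
  # Editorial note: resolve_conflicts/3 currently pins :timeout to 1_000ms, so
  # this schedule only applies when resolve_transactions/5 is called without an
  # explicit :timeout option.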

  # Helper functions for resolve_conflicts step

  defp filter_fn(start_key, :end), do: &(&1 >= start_key)
  defp filter_fn(start_key, end_key), do: &(&1 >= start_key and &1 < end_key)

  defp filter_transaction_summaries(transaction_summaries, filter_fn),
    do: Enum.map(transaction_summaries, &filter_transaction_summary(&1, filter_fn))

  defp filter_transaction_summary({nil, writes}, filter_fn),
    do: {nil, Enum.filter(writes, filter_fn)}

  defp filter_transaction_summary({{read_version, reads}, writes}, filter_fn),
    do: {{read_version, Enum.filter(reads, filter_fn)}, Enum.filter(writes, filter_fn)}

  # ============================================================================
  # STEP 4: SPLIT AND NOTIFY ABORTS
  # ============================================================================

  @spec split_and_notify_aborts(FinalizationPlan.t(), keyword()) :: FinalizationPlan.t()
  defp split_and_notify_aborts(%FinalizationPlan{stage: :failed} = plan, _opts), do: plan

  defp split_and_notify_aborts(%FinalizationPlan{stage: :conflicts_resolved} = plan, opts) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    {aborted_replies, successful_replies, aborted_indices_set} =
      split_transactions_by_abort_status(plan)

    new_aborted_indices = MapSet.difference(aborted_indices_set, plan.replied_indices)

    new_aborted_replies =
      plan.transactions
      |> Enum.with_index()
      |> Enum.filter(fn {_transaction, idx} -> MapSet.member?(new_aborted_indices, idx) end)
      |> Enum.map(fn {{reply_fn, _transaction}, _idx} -> reply_fn end)

    abort_reply_fn.(new_aborted_replies)

    updated_replied_indices = MapSet.union(plan.replied_indices, aborted_indices_set)

    %{
      plan
      | aborted_replies: aborted_replies,
        successful_replies: successful_replies,
        replied_indices: updated_replied_indices,
        stage: :aborts_notified
    }
  end

  @spec reply_to_all_clients_with_aborted_transactions([Batch.reply_fn()]) :: :ok
  def reply_to_all_clients_with_aborted_transactions([]), do: :ok

  def reply_to_all_clients_with_aborted_transactions(aborts),
    do: Enum.each(aborts, & &1.({:error, :aborted}))

  # Helper functions for split_and_notify_aborts step

  @spec split_transactions_by_abort_status(FinalizationPlan.t()) ::
          {[Batch.reply_fn()], [Batch.reply_fn()], MapSet.t()}
  defp split_transactions_by_abort_status(plan) do
    aborted_set = MapSet.new(plan.aborted_indices)

    {aborted_replies, successful_replies} =
      plan.transactions
      |> Enum.with_index()
      |> Enum.reduce({[], []}, fn {{reply_fn, _transaction}, idx}, {aborts_acc, success_acc} ->
        if MapSet.member?(aborted_set, idx) do
          {[reply_fn | aborts_acc], success_acc}
        else
          {aborts_acc, [reply_fn | success_acc]}
        end
      end)

    {Enum.reverse(aborted_replies), Enum.reverse(successful_replies), aborted_set}
  end

  # ============================================================================
  # STEP 5: PREPARE FOR LOGGING
  # ============================================================================

  @spec prepare_for_logging(FinalizationPlan.t()) :: FinalizationPlan.t()
  defp prepare_for_logging(%FinalizationPlan{stage: :failed} = plan), do: plan

  defp prepare_for_logging(%FinalizationPlan{stage: :aborts_notified} = plan) do
    case group_successful_transactions_by_tag(plan) do
      {:ok, transactions_by_tag} ->
        %{plan | transactions_by_tag: transactions_by_tag, stage: :ready_for_logging}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end

  # Helper functions for prepare_for_logging step

  @spec group_successful_transactions_by_tag(FinalizationPlan.t()) ::
          {:ok, %{Bedrock.range_tag() => Transaction.t()}} | {:error, term()}
  defp group_successful_transactions_by_tag(plan) do
    aborted_set = MapSet.new(plan.aborted_indices)

    with {:ok, combined_writes_by_tag} <-
           plan.transactions
           |> Enum.with_index()
           |> Enum.reduce_while({:ok, %{}}, fn transaction_with_idx, {:ok, acc} ->
             process_transaction_for_grouping(
               transaction_with_idx,
               aborted_set,
               plan.storage_teams,
               acc
             )
           end) do
      result =
        combined_writes_by_tag
        |> Enum.map(fn {tag, writes} -> {tag, Transaction.new(plan.commit_version, writes)} end)
        |> Map.new()

      {:ok, result}
    end
  end

  @spec process_transaction_for_grouping(
          {{function(), {[term()], %{Bedrock.key() => term()}}}, non_neg_integer()},
          MapSet.t(non_neg_integer()),
          [StorageTeamDescriptor.t()],
          %{Bedrock.range_tag() => %{Bedrock.key() => term()}}
        ) ::
          {:cont, {:ok, %{Bedrock.range_tag() => %{Bedrock.key() => term()}}}}
          | {:halt, {:error, term()}}
  defp process_transaction_for_grouping(
         {{_reply_fn, {_reads, writes}}, idx},
         aborted_set,
         storage_teams,
         acc
       ) do
    if MapSet.member?(aborted_set, idx) do
      {:cont, {:ok, acc}}
    else
      handle_non_aborted_transaction(writes, storage_teams, acc)
    end
  end

  @spec handle_non_aborted_transaction(
          %{Bedrock.key() => term()},
          [StorageTeamDescriptor.t()],
          %{Bedrock.range_tag() => %{Bedrock.key() => term()}}
        ) ::
          {:cont, {:ok, %{Bedrock.range_tag() => %{Bedrock.key() => term()}}}}
          | {:halt, {:error, term()}}
  defp handle_non_aborted_transaction(writes, storage_teams, acc) do
    case group_writes_by_tag(writes, storage_teams) do
      {:ok, tag_grouped_writes} ->
        {:cont, {:ok, merge_writes_by_tag(acc, tag_grouped_writes)}}

      {:error, _reason} = error ->
        {:halt, error}
    end
  end

  @doc """
  Groups a map of writes by their target storage team tags.

  For each key-value pair in the writes map, determines which storage team
  tag the key belongs to and groups the writes accordingly.

  ## Parameters
    - `writes`: Map of key -> value pairs to be written
    - `storage_teams`: List of storage team descriptors for tag mapping

  ## Returns
    - `{:ok, map}` where the map is tag -> %{key => value} for the writes
      belonging to each tag

  ## Failure Behavior
    - Returns `{:error, {:storage_team_coverage_error, key}}` if any key doesn't
      match any storage team. This indicates a critical configuration error
      where storage teams don't cover the full keyspace, requiring recovery.
  """
  @spec group_writes_by_tag(%{Bedrock.key() => term()}, [StorageTeamDescriptor.t()]) ::
          {:ok, %{Bedrock.range_tag() => %{Bedrock.key() => term()}}}
          | {:error, storage_coverage_error()}
  def group_writes_by_tag(writes, storage_teams) do
    result =
      writes
      |> Enum.reduce_while({:ok, %{}}, fn {key, value}, {:ok, acc} ->
        case key_to_tag(key, storage_teams) do
          {:ok, tag} ->
            {:cont, {:ok, Map.update(acc, tag, %{key => value}, &Map.put(&1, key, value))}}

          {:error, :no_matching_team} ->
            {:halt, {:error, {:storage_team_coverage_error, key}}}
        end
      end)

    result
  end
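
  # Illustrative only, reusing the hypothetical teams from the key_to_tag/2
  # doc below:
  #
  #   teams = [%{tag: :team1, key_range: {"a", "m"}}, %{tag: :team2, key_range: {"m", :end}}]
  #   group_writes_by_tag(%{"apple" => 1, "zebra" => 2}, teams)
  #   #=> {:ok, %{team1: %{"apple" => 1}, team2: %{"zebra" => 2}}}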

  @doc """
  Merges two maps of writes grouped by tag.

  Takes two maps where keys are tags and values are write maps,
  and merges the write maps for each tag.

  ## Parameters
    - `acc`: Accumulator map of tag -> writes
    - `new_writes`: New writes map to merge

  ## Returns
    - Merged map of tag -> combined writes
  """
  @spec merge_writes_by_tag(
          %{Bedrock.range_tag() => %{Bedrock.key() => term()}},
          %{Bedrock.range_tag() => %{Bedrock.key() => term()}}
        ) :: %{Bedrock.range_tag() => %{Bedrock.key() => term()}}
  def merge_writes_by_tag(acc, new_writes) do
    Map.merge(acc, new_writes, fn _tag, existing_writes, new_writes ->
      Map.merge(existing_writes, new_writes)
    end)
  end
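
  # Illustrative only, with hypothetical tags and keys: for a shared key within
  # a tag, the value from the second map wins, since Map.merge/2 prefers it.
  #
  #   merge_writes_by_tag(%{team1: %{"a" => 1}}, %{team1: %{"a" => 9, "b" => 2}})
  #   #=> %{team1: %{"a" => 9, "b" => 2}}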

  @doc """
  Maps a key to its corresponding storage team tag.

  Searches through storage teams to find which team's key range contains
  the given key. Uses lexicographic ordering where a key belongs to a team
  if it falls within [start_key, end_key) or [start_key, :end).

  ## Parameters
    - `key`: The key to map to a storage team
    - `storage_teams`: List of storage team descriptors

  ## Returns
    - `{:ok, tag}` if a matching storage team is found
    - `{:error, :no_matching_team}` if no team covers the key

  ## Examples
      iex> teams = [%{tag: :team1, key_range: {"a", "m"}}, %{tag: :team2, key_range: {"m", :end}}]
      iex> key_to_tag("hello", teams)
      {:ok, :team1}
  """
  @spec key_to_tag(Bedrock.key(), [StorageTeamDescriptor.t()]) ::
          {:ok, Bedrock.range_tag()} | {:error, :no_matching_team}
  def key_to_tag(key, storage_teams) do
    Enum.find_value(storage_teams, {:error, :no_matching_team}, fn
      %{tag: tag, key_range: {start_key, end_key}} ->
        if key_in_range?(key, start_key, end_key) do
          {:ok, tag}
        else
          nil
        end
    end)
  end

  @spec key_in_range?(Bedrock.key(), Bedrock.key(), Bedrock.key() | :end) :: boolean()
  defp key_in_range?(key, start_key, :end), do: key >= start_key
  defp key_in_range?(key, start_key, end_key), do: key >= start_key and key < end_key

  # ============================================================================
  # STEP 6: PUSH TO LOGS
  # ============================================================================

  @spec push_to_logs(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) ::
          FinalizationPlan.t()
  defp push_to_logs(%FinalizationPlan{stage: :failed} = plan, _layout, _opts), do: plan

  defp push_to_logs(%FinalizationPlan{stage: :ready_for_logging} = plan, layout, opts) do
    batch_log_push_fn = Keyword.get(opts, :batch_log_push_fn, &push_transaction_to_logs/5)

    case batch_log_push_fn.(
           layout,
           plan.last_commit_version,
           plan.transactions_by_tag,
           plan.commit_version,
           opts
         ) do
      :ok ->
        %{plan | stage: :logged}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end

  @doc """
  Builds the transaction that each log should receive based on tag coverage.

  Each log receives a transaction containing writes for all tags it covers.
  Logs that don't cover any tags in the transaction get an empty transaction
  to maintain version consistency.

  ## Parameters
    - `logs_by_id`: Map of log_id -> list of tags covered by that log
    - `transactions_by_tag`: Map of tag -> transaction shard for that tag
    - `commit_version`: The commit version for empty transactions

  ## Returns
    - Map of log_id -> transaction that log should receive

  ## Examples
      iex> logs_by_id = %{log1: [:tag1, :tag2], log2: [:tag3]}
      iex> transactions_by_tag = %{tag1: transaction1, tag3: transaction3}
      iex> build_log_transactions(logs_by_id, transactions_by_tag, 42)
      %{log1: combined_transaction, log2: transaction_for_tag3}
  """
  @spec build_log_transactions(
          %{Log.id() => [Bedrock.range_tag()]},
          %{Bedrock.range_tag() => Transaction.t()},
          Bedrock.version()
        ) :: %{Log.id() => Transaction.t()}
  def build_log_transactions(logs_by_id, transactions_by_tag, commit_version) do
    logs_by_id
    |> Enum.map(fn {log_id, tags_covered} ->
      # Collect writes for all tags this log covers
      combined_writes =
        tags_covered
        |> Enum.filter(&Map.has_key?(transactions_by_tag, &1))
        |> Enum.reduce(%{}, fn tag, acc ->
          transaction = Map.get(transactions_by_tag, tag)
          writes = Transaction.key_values(transaction)
          Map.merge(acc, writes)
        end)

      transaction = Transaction.new(commit_version, combined_writes)
      {log_id, transaction}
    end)
    |> Map.new()
  end

  # Maps each log id in `log_descriptors` to its entry in `services`, dropping
  # logs for which no service descriptor is known.
  def resolve_log_descriptors(log_descriptors, services) do
    log_descriptors
    |> Map.keys()
    |> Enum.map(&{&1, Map.get(services, &1)})
    |> Enum.reject(&is_nil(elem(&1, 1)))
    |> Map.new()
  end

  @spec try_to_push_transaction_to_log(
          ServiceDescriptor.t(),
          EncodedTransaction.t(),
          Bedrock.version()
        ) ::
          :ok | {:error, :unavailable}
  def try_to_push_transaction_to_log(
        %{kind: :log, status: {:up, log_server}},
        transaction,
        last_commit_version
      ) do
    Log.push(log_server, transaction, last_commit_version)
  end

  def try_to_push_transaction_to_log(_, _, _), do: {:error, :unavailable}

  @doc """
  Pushes transaction shards to logs based on tag coverage and waits for
  acknowledgement from ALL log servers.

  This function takes transaction shards grouped by storage team tags and
  routes them efficiently to logs. Each log receives only the transaction
  shards for tags it covers, plus empty transactions for version consistency.
  All logs must acknowledge to maintain durability guarantees.

  ## Parameters

    - `transaction_system_layout`: Contains configuration information about the
      transaction system, including available log servers and their tag coverage.
    - `last_commit_version`: The last known committed version; used to
      ensure consistency in log ordering.
    - `transactions_by_tag`: Map of storage team tag to transaction shard.
      May be empty if all transactions were aborted.
    - `commit_version`: The version assigned by the sequencer for this batch.
    - `opts`: Optional configuration for testing and customization.

  ## Options
    - `:async_stream_fn` - Function for parallel processing (default: Task.async_stream/3)
    - `:log_push_fn` - Function for pushing to individual logs (default: try_to_push_transaction_to_log/3)
    - `:timeout` - Timeout for log push operations (default: 5_000ms)

  ## Returns
    - `:ok` if acknowledgements have been received from ALL log servers.
    - `{:error, log_push_error()}` if any log has not successfully acknowledged the
      push within the timeout period or other errors occur.
  """
  @spec push_transaction_to_logs(
          TransactionSystemLayout.t(),
          last_commit_version :: Bedrock.version(),
          %{Bedrock.range_tag() => Transaction.t()},
          commit_version :: Bedrock.version(),
          opts :: [
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            timeout: non_neg_integer()
          ]
        ) :: :ok | {:error, log_push_error()}
  def push_transaction_to_logs(
        transaction_system_layout,
        last_commit_version,
        transactions_by_tag,
        commit_version,
        opts \\ []
      ) do
    # Extract configurable functions for testability
    async_stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
    log_push_fn = Keyword.get(opts, :log_push_fn, &try_to_push_transaction_to_log/3)
    timeout = Keyword.get(opts, :timeout, 5_000)

    logs_by_id = transaction_system_layout.logs
    required_acknowledgments = map_size(logs_by_id)

    # Build the transaction each log should receive
    log_transactions = build_log_transactions(logs_by_id, transactions_by_tag, commit_version)
    resolved_logs = resolve_log_descriptors(logs_by_id, transaction_system_layout.services)

    # Use configurable async stream function
    stream_result =
      async_stream_fn.(
        resolved_logs,
        fn {log_id, service_descriptor} ->
          transaction_for_log = Map.get(log_transactions, log_id)
          encoded_transaction = EncodedTransaction.encode(transaction_for_log)

          result = log_push_fn.(service_descriptor, encoded_transaction, last_commit_version)
          {log_id, result}
        end,
        timeout: timeout
      )

    stream_result
    |> Enum.reduce_while({0, []}, fn
      {:ok, {log_id, {:error, reason}}}, {_count, errors} ->
        {:halt, {:error, [{log_id, reason} | errors]}}

      {:ok, {_log_id, :ok}}, {count, errors} ->
        count = 1 + count

        if count == required_acknowledgments do
          {:halt, {:ok, count}}
        else
          {:cont, {count, errors}}
        end

      {:exit, {log_id, reason}}, {_count, errors} ->
        {:halt, {:error, [{log_id, reason} | errors]}}
    end)
    |> case do
      {:ok, ^required_acknowledgments} ->
        :ok

      {:error, errors} ->
        {:error, {:log_failures, errors}}

      {count, errors} when count < required_acknowledgments ->
        {:error, {:insufficient_acknowledgments, count, required_acknowledgments, errors}}

      _other ->
        {:error, :log_push_failed}
    end
  end
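
  # Illustrative only: the :log_push_fn and :async_stream_fn options exist so
  # tests can substitute the network calls. A hypothetical stub might look like:
  #
  #   push_transaction_to_logs(layout, last_version, tx_by_tag, commit_version,
  #     log_push_fn: fn _service, _encoded_tx, _last_version -> :ok end,
  #     async_stream_fn: fn enum, fun, _opts -> Enum.map(enum, &{:ok, fun.(&1)}) end
  #   )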

  # ============================================================================
  # STEP 7: NOTIFY SEQUENCER
  # ============================================================================

  @spec notify_sequencer(FinalizationPlan.t(), Bedrock.DataPlane.Sequencer.ref()) ::
          FinalizationPlan.t()
  defp notify_sequencer(%FinalizationPlan{stage: :failed} = plan, _sequencer), do: plan

  defp notify_sequencer(%FinalizationPlan{stage: :logged} = plan, sequencer) do
    :ok = Sequencer.report_successful_commit(sequencer, plan.commit_version)
    %{plan | stage: :sequencer_notified}
  end

  # ============================================================================
  # STEP 8: NOTIFY SUCCESSES
  # ============================================================================

  @spec notify_successes(FinalizationPlan.t(), keyword()) :: FinalizationPlan.t()
  defp notify_successes(%FinalizationPlan{stage: :failed} = plan, _opts), do: plan

  defp notify_successes(%FinalizationPlan{stage: :sequencer_notified} = plan, opts) do
    success_reply_fn = Keyword.get(opts, :success_reply_fn, &send_reply_with_commit_version/2)

    successful_indices = get_successful_indices(plan)
    unreplied_success_indices = MapSet.difference(successful_indices, plan.replied_indices)

    unreplied_successes =
      filter_replies_by_indices(plan.successful_replies, unreplied_success_indices)

    success_reply_fn.(unreplied_successes, plan.commit_version)

    updated_replied_indices = MapSet.union(plan.replied_indices, successful_indices)

    %{plan | replied_indices: updated_replied_indices, stage: :completed}
  end

  @spec send_reply_with_commit_version([Batch.reply_fn()], Bedrock.version()) ::
          :ok
  def send_reply_with_commit_version(oks, commit_version),
    do: Enum.each(oks, & &1.({:ok, commit_version}))

  # Helper functions for notify_successes step

  @spec get_successful_indices(FinalizationPlan.t()) :: MapSet.t()
  defp get_successful_indices(plan) do
    aborted_set = MapSet.new(plan.aborted_indices)
    transaction_count = length(plan.transactions)

    if transaction_count > 0 do
      all_indices = MapSet.new(0..(transaction_count - 1))
      MapSet.difference(all_indices, aborted_set)
    else
      MapSet.new()
    end
  end

  @spec filter_replies_by_indices([Batch.reply_fn()], MapSet.t()) :: [Batch.reply_fn()]
  defp filter_replies_by_indices(replies, indices_set) do
    replies
    |> Enum.with_index()
    |> Enum.filter(fn {_reply_fn, idx} -> MapSet.member?(indices_set, idx) end)
    |> Enum.map(fn {reply_fn, _idx} -> reply_fn end)
  end

  # ============================================================================
  # STEP 9: EXTRACT RESULT OR HANDLE ERROR
  # ============================================================================

  @spec extract_result_or_handle_error(FinalizationPlan.t(), keyword()) ::
          {:ok, non_neg_integer(), non_neg_integer()} | {:error, finalization_error()}
  defp extract_result_or_handle_error(%FinalizationPlan{stage: :completed} = plan, _opts) do
    n_aborts = length(plan.aborted_replies)
    n_successes = length(plan.successful_replies)
    {:ok, n_aborts, n_successes}
  end

  defp extract_result_or_handle_error(%FinalizationPlan{stage: :failed} = plan, opts) do
    handle_error(plan, opts)
  end

  # Error recovery: safely abort all unreplied transactions
  @spec handle_error(FinalizationPlan.t(), keyword()) :: {:error, finalization_error()}
  defp handle_error(%FinalizationPlan{error: error} = plan, opts) when not is_nil(error) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    all_indices = MapSet.new(0..(length(plan.transactions) - 1))
    unreplied_indices = MapSet.difference(all_indices, plan.replied_indices)

    unreplied_replies =
      plan.transactions
      |> Enum.with_index()
      |> Enum.filter(fn {_transaction, idx} -> MapSet.member?(unreplied_indices, idx) end)
      |> Enum.map(fn {{reply_fn, _transaction}, _idx} -> reply_fn end)

    abort_reply_fn.(unreplied_replies)

    {:error, plan.error}
  end
end