
jallum / bedrock / 022bf889f44fabf5818028244ac5e8e953e1a798

18 Aug 2025 02:00AM UTC coverage: 62.424% (+2.5%) from 59.888%
push · github · web-flow
Feature/31 (#35)

# Major Transaction System Overhaul

This branch is a **fundamental rearchitecture of Bedrock's transaction
processing system**, carried out in three phases spanning performance,
format unification, reliability, and architectural improvements.

## Phase 1: Transaction Format Unification (fd66e2d)
**Rationale**: Replace the fragmented, inconsistent transaction encoding
system with a unified, robust approach.

**Key Changes**:
- **Consolidated Transaction Format**: Merged `read_conflicts` and
`read_version` into a single tuple format,
  eliminating redundancy
- **New BedrockTransaction Module**: Replaced the limited
`EncodedTransaction` with a comprehensive 971-line module
  featuring:
    - Tagged binary encoding with self-describing sections
    - CRC validation for data integrity
    - Order-independent sections for better extensibility
    - Efficient partial decoding capabilities
- **Removed Legacy Code**: Eliminated obsolete resolver recovery logic
and transaction fragmentation
- **Comprehensive Testing**: Added extensive test coverage including
binary integration tests
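The tagged, self-describing, order-independent section layout with CRC validation described above can be sketched roughly as follows. This is a minimal illustration only, not Bedrock's actual wire format — the tag values, the 1-byte-tag/4-byte-size framing, and the trailing CRC32 placement are all assumptions:

```elixir
defmodule TaggedEncodingSketch do
  # Hypothetical section tags; Bedrock's real tag values are not shown here.
  @tags %{mutations: 0x01, read_conflicts: 0x02, write_conflicts: 0x03}

  # Encode a map of named sections as [tag | size | payload]* plus a CRC32 trailer.
  def encode(sections) do
    body =
      for {name, payload} <- sections, into: <<>> do
        <<Map.fetch!(@tags, name)::8, byte_size(payload)::32, payload::binary>>
      end

    <<body::binary, :erlang.crc32(body)::32>>
  end

  # Verify the CRC, then scan sections; they may appear in any order.
  def decode(binary) when byte_size(binary) >= 4 do
    body_size = byte_size(binary) - 4
    <<body::binary-size(body_size), crc::32>> = binary

    if :erlang.crc32(body) == crc do
      {:ok, decode_sections(body, %{})}
    else
      {:error, :crc_mismatch}
    end
  end

  defp decode_sections(<<>>, acc), do: acc

  defp decode_sections(<<tag::8, size::32, payload::binary-size(size), rest::binary>>, acc) do
    {name, _tag} = Enum.find(@tags, fn {_name, t} -> t == tag end)
    decode_sections(rest, Map.put(acc, name, payload))
  end
end
```

Because every section is self-describing, a reader that only needs, say, the write conflicts can skip the other payloads by their declared sizes — the "efficient partial decoding" the changelog mentions.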

## Phase 2: Enhanced Transaction Processing (f96a1ed)
**Rationale**: Address reliability and robustness gaps in transaction
handling.

**Key Improvements**:
- **Transaction Validation**: Added validation in resolver to ensure
proper transaction format before processing
- **Timeout Management**: Implemented `WaitingList` module for
systematic timeout handling across components
- **Error Resilience**: Enhanced error handling for Director
notifications and unavailable states
- **Standardization**: Moved Logger.require statements to module level
for consistency
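The `WaitingList` idea — park each caller with a deadline, then sweep expired entries in one pass — might look something like this sketch. The function names, the map-based storage, and the `{:error, :timeout}` reply shape are assumptions, not Bedrock's actual module:

```elixir
defmodule WaitingListSketch do
  # Callers are parked under a reference with an absolute deadline and a
  # reply function; components sweep the list periodically to time them out.
  def new, do: %{}

  def insert(waiting, ref, reply_fn, timeout_ms, now \\ System.monotonic_time(:millisecond)) do
    Map.put(waiting, ref, {now + timeout_ms, reply_fn})
  end

  # Reply to a specific caller and drop its entry.
  def resolve(waiting, ref, result) do
    case Map.pop(waiting, ref) do
      {nil, waiting} ->
        {:error, waiting}

      {{_deadline, reply_fn}, waiting} ->
        reply_fn.(result)
        {:ok, waiting}
    end
  end

  # Sweep everything past its deadline, replying {:error, :timeout} to each.
  def expire(waiting, now \\ System.monotonic_time(:millisecond)) do
    {expired, live} =
      Enum.split_with(waiting, fn {_ref, {deadline, _reply_fn}} -> deadline <= now end)

    Enum.each(expired, fn {_ref, {_deadline, reply_fn}} -> reply_fn.({:error, :timeout}) end)
    Map.new(live)
  end
end
```

Centralizing the deadline bookkeeping this way means each component only decides *when* to sweep, not *how* to track who is waiting.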

## Phase 3: Commit Proxy Rework & System-Wide Improvements (dc6a048)
**Rationale**: Optimize transaction distribution and improve
observability across the data plane.

**Major Architectural Changes**:
- **Single Transaction Per Log**: Reworked commit proxy to produce one
transaction per ... (continued)

817 of 1213 new or added lines in 110 files covered. (67.35%)

40 existing lines in 21 files now uncovered.

2369 of 3795 relevant lines covered (62.42%)

871.02 hits per line

## Source File

`/lib/bedrock/data_plane/commit_proxy/finalization.ex` — 91.01% covered
1
defmodule Bedrock.DataPlane.CommitProxy.Finalization do
2
  @moduledoc """
3
  Transaction finalization pipeline that handles conflict resolution and log persistence.
4

5
  ## Version Chain Integrity
6

7
  CRITICAL: This module maintains the Lamport clock version chain established by the sequencer.
8
  The sequencer provides both `last_commit_version` and `commit_version` as a proper chain link:
9

10
  - `last_commit_version`: The actual last committed version from the sequencer
11
  - `commit_version`: The new version assigned to this batch
12

13
  Always use the exact version values provided by the sequencer through the batch to maintain
14
  proper MVCC conflict detection and transaction ordering. Version gaps can exist due to failed
15
  transactions, recovery scenarios, or system restarts.
16
  """
17

18
  import Bedrock.DataPlane.CommitProxy.Batch,
19
    only: [transactions_in_order: 1]
20

21
  import Bitwise, only: [<<<: 2]
22

23
  alias Bedrock.ControlPlane.Config.ServiceDescriptor
24
  alias Bedrock.ControlPlane.Config.StorageTeamDescriptor
25
  alias Bedrock.ControlPlane.Config.TransactionSystemLayout
26
  alias Bedrock.DataPlane.CommitProxy.Batch
27
  alias Bedrock.DataPlane.Log
28
  alias Bedrock.DataPlane.Resolver
29
  alias Bedrock.DataPlane.Sequencer
30
  alias Bedrock.DataPlane.Transaction
31

32
  @type resolver_fn() :: (resolvers :: [{start_key :: Bedrock.key(), Resolver.ref()}],
33
                          last_version :: Bedrock.version(),
34
                          commit_version :: Bedrock.version(),
35
                          transaction_summaries :: [Resolver.transaction_summary()],
36
                          resolver_opts :: [timeout: Bedrock.timeout_in_ms()] ->
37
                            {:ok, aborted :: [transaction_index :: non_neg_integer()]}
38
                            | {:error, resolution_error()})
39

40
  @type log_push_batch_fn() :: (TransactionSystemLayout.t(),
41
                                last_commit_version :: Bedrock.version(),
42
                                transactions_by_tag :: %{
43
                                  Bedrock.range_tag() => Transaction.encoded()
44
                                },
45
                                commit_version :: Bedrock.version(),
46
                                opts :: [
47
                                  timeout: Bedrock.timeout_in_ms(),
48
                                  async_stream_fn: async_stream_fn()
49
                                ] ->
50
                                  :ok | {:error, log_push_error()})
51

52
  @type log_push_single_fn() :: (ServiceDescriptor.t(), binary(), Bedrock.version() ->
53
                                   :ok | {:error, :unavailable})
54

55
  @type async_stream_fn() :: (enumerable :: Enumerable.t(), fun :: (term() -> term()), opts :: keyword() ->
56
                                Enumerable.t())
57

58
  @type abort_reply_fn() :: ([Batch.reply_fn()] -> :ok)
59

60
  @type success_reply_fn() :: ([Batch.reply_fn()], Bedrock.version() -> :ok)
61

62
  @type timeout_fn() :: (non_neg_integer() -> non_neg_integer())
63

64
  @type resolution_error() ::
65
          :timeout
66
          | :unavailable
67
          | {:resolver_unavailable, term()}
68

69
  @type storage_coverage_error() ::
70
          {:storage_team_coverage_error, binary()}
71

72
  @type log_push_error() ::
73
          {:log_failures, [{Log.id(), term()}]}
74
          | {:insufficient_acknowledgments, non_neg_integer(), non_neg_integer(), [{Log.id(), term()}]}
75
          | :log_push_failed
76

77
  @type finalization_error() ::
78
          resolution_error()
79
          | storage_coverage_error()
80
          | log_push_error()
81

82
  defmodule FinalizationPlan do
83
    @moduledoc """
84
    Pipeline state for transaction finalization that accumulates information
85
    and tracks reply status to prevent double-replies.
86
    """
87

88
    @enforce_keys [
89
      :transactions,
90
      :commit_version,
91
      :last_commit_version,
92
      :storage_teams,
93
      :logs_by_id
94
    ]
95
    defstruct [
96
      :transactions,
97
      :commit_version,
98
      :last_commit_version,
99
      :storage_teams,
100
      :logs_by_id,
101
      resolver_data: [],
102
      aborted_indices: [],
103
      aborted_replies: [],
104
      successful_replies: [],
105
      transactions_by_log: %{},
106
      replied_indices: MapSet.new(),
107
      stage: :initialized,
108
      error: nil
109
    ]
110

111
    @type t :: %__MODULE__{
112
            transactions: [{Batch.reply_fn(), binary()}],
113
            commit_version: Bedrock.version(),
114
            last_commit_version: Bedrock.version(),
115
            storage_teams: [StorageTeamDescriptor.t()],
116
            logs_by_id: %{Log.id() => [Bedrock.range_tag()]},
117
            resolver_data: [Resolver.transaction_summary()],
118
            aborted_indices: [integer()],
119
            aborted_replies: [Batch.reply_fn()],
120
            successful_replies: [Batch.reply_fn()],
121
            transactions_by_log: %{Log.id() => Transaction.encoded()},
122
            replied_indices: MapSet.t(),
123
            stage: atom(),
124
            error: term() | nil
125
          }
126
  end
127

128
  @doc """
129
  Finalizes a batch of transactions by resolving conflicts, separating
130
  successful transactions from aborts, and pushing them to the log servers.
131

132
  This function processes a batch of transactions, first ensuring that any
133
  conflicts are resolved. After conflict resolution, it organizes the
134
  transactions into those that will be committed and those that will be aborted.
135

136
  Clients with aborted transactions are notified of the abort immediately.
137
  Successful transactions are pushed to the system's logs, and clients that
138
  submitted the transactions are notified when a majority of the log servers
139
  have acknowledged.
140

141
  ## Parameters
142

143
    - `batch`: A `Batch.t()` struct that contains the transactions to be finalized,
144
      along with the commit version details.
145
    - `transaction_system_layout`: Provides configuration and systemic details,
146
      including the available resolver and log servers.
147

148
  ## Returns
149
    - `:ok` when the batch has been processed, and all clients have been
150
      notified about the status of their transactions.
151
  """
152
  @spec finalize_batch(
153
          Batch.t(),
154
          TransactionSystemLayout.t(),
155
          opts :: [
156
            epoch: Bedrock.epoch(),
157
            resolver_fn: resolver_fn(),
158
            batch_log_push_fn: log_push_batch_fn(),
159
            abort_reply_fn: abort_reply_fn(),
160
            success_reply_fn: success_reply_fn(),
161
            async_stream_fn: async_stream_fn(),
162
            log_push_fn: log_push_single_fn(),
163
            timeout: non_neg_integer()
164
          ]
165
        ) ::
166
          {:ok, n_aborts :: non_neg_integer(), n_oks :: non_neg_integer()}
167
          | {:error, finalization_error()}
168
  def finalize_batch(batch, transaction_system_layout, opts \\ []) do
169
    batch
170
    |> create_finalization_plan(transaction_system_layout)
171
    |> prepare_for_resolution()
172
    |> resolve_conflicts(transaction_system_layout, opts)
173
    |> split_and_notify_aborts(opts)
174
    |> prepare_for_logging()
175
    |> push_to_logs(transaction_system_layout, opts)
176
    |> notify_sequencer(transaction_system_layout.sequencer)
13✔
177
    |> notify_successes(opts)
178
    |> extract_result_or_handle_error(opts)
13✔
179
  end
180

181
  @spec create_finalization_plan(Batch.t(), TransactionSystemLayout.t()) :: FinalizationPlan.t()
182
  defp create_finalization_plan(batch, transaction_system_layout) do
183
    %FinalizationPlan{
13✔
184
      transactions: transactions_in_order(batch),
185
      commit_version: batch.commit_version,
13✔
186
      last_commit_version: batch.last_commit_version,
13✔
187
      storage_teams: transaction_system_layout.storage_teams,
13✔
188
      logs_by_id: transaction_system_layout.logs,
13✔
189
      stage: :created
190
    }
191
  end
192

193
  @spec prepare_for_resolution(FinalizationPlan.t()) :: FinalizationPlan.t()
194
  defp prepare_for_resolution(%FinalizationPlan{stage: :created} = plan) do
195
    resolver_data = transform_transactions_for_resolution(plan.transactions)
13✔
196

197
    %{plan | resolver_data: resolver_data, stage: :ready_for_resolution}
13✔
198
  end
199

200
  @doc """
201
  Transforms the list of transactions for resolution.
202

203
  Converts the transaction data to the format expected by the conflict
204
  resolution logic. For each transaction, it extracts the read version,
205
  read conflicts, and write conflicts. Handles both map format (Bedrock.transaction())
206
  and binary encoded format for backward compatibility.
207
  """
208
  @spec transform_transactions_for_resolution([
209
          {Batch.reply_fn(), Bedrock.transaction() | binary()}
210
        ]) :: [
211
          Resolver.transaction_summary()
212
        ]
213
  def transform_transactions_for_resolution(transactions) do
214
    Enum.map(transactions, &transform_single_transaction_for_resolution/1)
18✔
215
  end
216

217
  @spec transform_single_transaction_for_resolution({Batch.reply_fn(), Bedrock.transaction() | binary()}) ::
218
          Resolver.transaction_summary()
219
  defp transform_single_transaction_for_resolution({_reply_fn, transaction}) when is_map(transaction) do
NEW
220
    read_version = Map.get(transaction, :read_version)
×
NEW
221
    read_conflicts = Map.get(transaction, :read_conflicts, [])
×
NEW
222
    write_conflicts = Map.get(transaction, :write_conflicts, [])
×
223

NEW
224
    transform_version_and_conflicts(read_version, read_conflicts, write_conflicts)
×
225
  end
226

227
  defp transform_single_transaction_for_resolution({_reply_fn, binary_transaction})
228
       when is_binary(binary_transaction) do
229
    {:ok, read_version} = Transaction.extract_read_version(binary_transaction)
19✔
230

231
    {:ok, {_read_version, read_conflicts}} =
19✔
232
      Transaction.extract_read_conflicts(binary_transaction)
233

234
    {:ok, write_conflicts} = Transaction.extract_write_conflicts(binary_transaction)
19✔
235

236
    transform_version_and_conflicts(read_version, read_conflicts, write_conflicts)
19✔
237
  end
238

239
  @spec transform_version_and_conflicts(nil | Bedrock.version(), [Bedrock.key()], [Bedrock.key()]) ::
240
          Resolver.transaction_summary()
241
  defp transform_version_and_conflicts(nil, _read_conflicts, write_conflicts) do
17✔
242
    {nil, write_conflicts}
243
  end
244

245
  defp transform_version_and_conflicts(version, read_conflicts, write_conflicts) do
2✔
246
    {{version, Enum.uniq(read_conflicts)}, write_conflicts}
247
  end
248

249
  @spec resolve_conflicts(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) ::
250
          FinalizationPlan.t()
251
  defp resolve_conflicts(%FinalizationPlan{stage: :ready_for_resolution} = plan, layout, opts) do
252
    resolver_fn = Keyword.get(opts, :resolver_fn, &resolve_transactions/6)
13✔
253
    epoch = Keyword.get(opts, :epoch) || raise "Missing epoch in finalization opts"
13✔
254

255
    case resolver_fn.(
13✔
256
           epoch,
257
           layout.resolvers,
13✔
258
           plan.last_commit_version,
13✔
259
           plan.commit_version,
13✔
260
           plan.resolver_data,
13✔
261
           Keyword.put(opts, :timeout, 1_000)
262
         ) do
263
      {:ok, aborted_indices} ->
264
        %{plan | aborted_indices: aborted_indices, stage: :conflicts_resolved}
11✔
265

266
      {:error, reason} ->
267
        %{plan | error: reason, stage: :failed}
2✔
268
    end
269
  end
270

271
  @spec resolve_transactions(
272
          epoch :: Bedrock.epoch(),
273
          resolvers :: [{start_key :: Bedrock.key(), Resolver.ref()}],
274
          last_version :: Bedrock.version(),
275
          commit_version :: Bedrock.version(),
276
          [Resolver.transaction_summary()],
277
          opts :: [
278
            timeout: :infinity | non_neg_integer(),
279
            timeout_fn: timeout_fn(),
280
            attempts_remaining: non_neg_integer(),
281
            attempts_used: non_neg_integer()
282
          ]
283
        ) ::
284
          {:ok, aborted :: [index :: integer()]}
285
          | {:error, resolution_error()}
286
  def resolve_transactions(epoch, resolvers, last_version, commit_version, transaction_summaries, opts) do
287
    timeout_fn = Keyword.get(opts, :timeout_fn, &default_timeout_fn/1)
17✔
288
    attempts_remaining = Keyword.get(opts, :attempts_remaining, 2)
17✔
289
    attempts_used = Keyword.get(opts, :attempts_used, 0)
17✔
290
    timeout = Keyword.get(opts, :timeout, timeout_fn.(attempts_used))
17✔
291

292
    ranges =
17✔
293
      resolvers
294
      |> Enum.map(&elem(&1, 0))
17✔
295
      |> Enum.concat([:end])
296
      |> Enum.chunk_every(2, 1, :discard)
297

298
    transaction_summaries_by_start_key =
17✔
299
      Map.new(ranges, fn
300
        [start_key, end_key] ->
301
          filtered_summaries =
17✔
302
            filter_transaction_summaries(
303
              transaction_summaries,
304
              filter_fn(start_key, end_key)
305
            )
306

307
          {start_key, filtered_summaries}
308
      end)
309

310
    result =
17✔
311
      resolvers
312
      |> Enum.map(fn {start_key, ref} ->
313
        Resolver.resolve_transactions(
17✔
314
          ref,
315
          epoch,
316
          last_version,
317
          commit_version,
318
          Map.get(transaction_summaries_by_start_key, start_key, []),
319
          timeout: timeout
320
        )
321
      end)
322
      |> Enum.reduce({:ok, []}, fn
323
        {:ok, aborted}, {:ok, acc} ->
×
324
          {:ok, Enum.uniq(acc ++ aborted)}
325

326
        {:error, reason}, _ ->
17✔
327
          {:error, reason}
328
      end)
329

330
    case result do
17✔
331
      {:ok, _} = success ->
332
        success
×
333

334
      {:error, reason} when attempts_remaining > 0 ->
335
        :telemetry.execute(
11✔
336
          [:bedrock, :commit_proxy, :resolver, :retry],
337
          %{attempts_remaining: attempts_remaining - 1, attempts_used: attempts_used + 1},
338
          %{reason: reason}
339
        )
340

341
        updated_opts =
11✔
342
          opts
343
          |> Keyword.put(:attempts_remaining, attempts_remaining - 1)
344
          |> Keyword.put(:attempts_used, attempts_used + 1)
345

346
        resolve_transactions(
11✔
347
          epoch,
348
          resolvers,
349
          last_version,
350
          commit_version,
351
          transaction_summaries,
352
          updated_opts
353
        )
354

355
      {:error, reason} ->
356
        :telemetry.execute(
6✔
357
          [:bedrock, :commit_proxy, :resolver, :max_retries_exceeded],
358
          %{total_attempts: attempts_used + 1},
359
          %{reason: reason}
360
        )
361

362
        {:error, {:resolver_unavailable, reason}}
363
    end
364
  end
365

366
  @spec default_timeout_fn(non_neg_integer()) :: non_neg_integer()
367
  def default_timeout_fn(attempts_used), do: 500 * (1 <<< attempts_used)
12✔
368

369
  @spec filter_fn(Bedrock.key(), :end | Bedrock.key()) :: (Bedrock.key() -> boolean())
370
  defp filter_fn(start_key, :end), do: &(&1 >= start_key)
17✔
371
  defp filter_fn(start_key, end_key), do: &(&1 >= start_key and &1 < end_key)
×
372

373
  @spec filter_transaction_summaries([Resolver.transaction_summary()], (Bedrock.key() ->
374
                                                                          boolean())) :: [
375
          Resolver.transaction_summary()
376
        ]
377
  defp filter_transaction_summaries(transaction_summaries, filter_fn),
378
    do: Enum.map(transaction_summaries, &filter_transaction_summary(&1, filter_fn))
17✔
379

380
  @spec filter_transaction_summary(Resolver.transaction_summary(), (Bedrock.key() -> boolean())) ::
381
          Resolver.transaction_summary()
382
  defp filter_transaction_summary({nil, writes}, filter_fn), do: {nil, Enum.filter(writes, filter_fn)}
17✔
383

384
  defp filter_transaction_summary({{read_version, reads}, writes}, filter_fn),
×
385
    do: {{read_version, Enum.filter(reads, filter_fn)}, Enum.filter(writes, filter_fn)}
386

387
  @spec split_and_notify_aborts(FinalizationPlan.t(), keyword()) :: FinalizationPlan.t()
388
  defp split_and_notify_aborts(%FinalizationPlan{stage: :failed} = plan, _opts), do: plan
2✔
389

390
  defp split_and_notify_aborts(%FinalizationPlan{stage: :conflicts_resolved} = plan, opts) do
391
    abort_reply_fn =
11✔
392
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)
393

394
    {aborted_replies, successful_replies, aborted_indices_set} =
11✔
395
      split_transactions_by_abort_status(plan)
396

397
    new_aborted_indices = MapSet.difference(aborted_indices_set, plan.replied_indices)
11✔
398

399
    new_aborted_replies =
11✔
400
      plan.transactions
11✔
401
      |> Enum.with_index()
402
      |> Enum.filter(fn {_transaction, idx} -> MapSet.member?(new_aborted_indices, idx) end)
12✔
403
      |> Enum.map(fn {{reply_fn, _transaction}, _idx} -> reply_fn end)
3✔
404

405
    abort_reply_fn.(new_aborted_replies)
11✔
406

407
    updated_replied_indices = MapSet.union(plan.replied_indices, aborted_indices_set)
11✔
408

409
    %{
410
      plan
411
      | aborted_replies: aborted_replies,
11✔
412
        successful_replies: successful_replies,
413
        replied_indices: updated_replied_indices,
414
        stage: :aborts_notified
415
    }
416
  end
417

418
  @spec reply_to_all_clients_with_aborted_transactions([Batch.reply_fn()]) :: :ok
419
  def reply_to_all_clients_with_aborted_transactions([]), do: :ok
9✔
420

421
  def reply_to_all_clients_with_aborted_transactions(aborts), do: Enum.each(aborts, & &1.({:error, :aborted}))
5✔
422

423
  @spec split_transactions_by_abort_status(FinalizationPlan.t()) ::
424
          {[Batch.reply_fn()], [Batch.reply_fn()], MapSet.t()}
425
  defp split_transactions_by_abort_status(plan) do
426
    aborted_set = MapSet.new(plan.aborted_indices)
11✔
427

428
    {aborted_replies, successful_replies} =
11✔
429
      plan.transactions
11✔
430
      |> Enum.with_index()
431
      |> Enum.reduce({[], []}, fn {{reply_fn, _transaction}, idx}, {aborts_acc, success_acc} ->
432
        if MapSet.member?(aborted_set, idx) do
12✔
433
          {[reply_fn | aborts_acc], success_acc}
434
        else
435
          {aborts_acc, [reply_fn | success_acc]}
436
        end
437
      end)
438

439
    {Enum.reverse(aborted_replies), Enum.reverse(successful_replies), aborted_set}
11✔
440
  end
441

442
  @spec prepare_for_logging(FinalizationPlan.t()) :: FinalizationPlan.t()
443
  defp prepare_for_logging(%FinalizationPlan{stage: :failed} = plan), do: plan
2✔
444

445
  defp prepare_for_logging(%FinalizationPlan{stage: :aborts_notified} = plan) do
446
    case build_transactions_for_logs(plan, plan.logs_by_id) do
11✔
447
      {:ok, transactions_by_log} ->
448
        %{plan | transactions_by_log: transactions_by_log, stage: :ready_for_logging}
11✔
449

450
      {:error, reason} ->
451
        %{plan | error: reason, stage: :failed}
×
452
    end
453
  end
454

455
  @spec build_transactions_for_logs(FinalizationPlan.t(), %{Log.id() => [Bedrock.range_tag()]}) ::
456
          {:ok, %{Log.id() => Transaction.encoded()}} | {:error, term()}
457
  defp build_transactions_for_logs(plan, logs_by_id) do
458
    aborted_set = MapSet.new(plan.aborted_indices)
11✔
459

460
    initial_mutations_by_log =
11✔
461
      logs_by_id
462
      |> Map.keys()
463
      |> Map.new(&{&1, []})
11✔
464

465
    case plan.transactions
11✔
466
         |> Enum.with_index()
467
         |> Enum.reduce_while(
468
           {:ok, initial_mutations_by_log},
469
           fn transaction_with_idx, {:ok, acc} ->
470
             process_transaction_for_logs(
12✔
471
               transaction_with_idx,
472
               aborted_set,
473
               plan.storage_teams,
12✔
474
               logs_by_id,
475
               acc
476
             )
477
           end
478
         ) do
479
      {:ok, mutations_by_log} ->
480
        result =
11✔
481
          Map.new(mutations_by_log, fn {log_id, mutations_list} ->
482
            encoded =
11✔
483
              Transaction.encode(%{
484
                mutations: Enum.reverse(mutations_list),
485
                commit_version: plan.commit_version
11✔
486
              })
487

488
            {log_id, encoded}
489
          end)
490

491
        {:ok, result}
492

NEW
493
      {:error, reason} ->
×
494
        {:error, reason}
495
    end
496
  end
497

498
  @spec process_transaction_for_logs(
499
          {{function(), binary()}, non_neg_integer()},
500
          MapSet.t(non_neg_integer()),
501
          [StorageTeamDescriptor.t()],
502
          %{Log.id() => [Bedrock.range_tag()]},
503
          %{Log.id() => [term()]}
504
        ) ::
505
          {:cont, {:ok, %{Log.id() => [term()]}}}
506
          | {:halt, {:error, term()}}
507
  defp process_transaction_for_logs({{_reply_fn, binary_transaction}, idx}, aborted_set, storage_teams, logs_by_id, acc) do
508
    if MapSet.member?(aborted_set, idx) do
12✔
509
      {:cont, {:ok, acc}}
510
    else
511
      process_transaction_mutations(binary_transaction, storage_teams, logs_by_id, acc)
9✔
512
    end
513
  end
514

515
  @spec process_transaction_mutations(
516
          binary(),
517
          [StorageTeamDescriptor.t()],
518
          %{Log.id() => [Bedrock.range_tag()]},
519
          %{Log.id() => [term()]}
520
        ) ::
521
          {:cont, {:ok, %{Log.id() => [term()]}}} | {:halt, {:error, term()}}
522
  defp process_transaction_mutations(binary_transaction, storage_teams, logs_by_id, acc) do
523
    case Transaction.stream_mutations(binary_transaction) do
9✔
524
      {:ok, mutations_stream} ->
525
        case process_mutations_for_transaction(mutations_stream, storage_teams, logs_by_id, acc) do
9✔
526
          {:ok, updated_acc} ->
9✔
527
            {:cont, {:ok, updated_acc}}
528

NEW
529
          {:error, reason} ->
×
530
            {:halt, {:error, reason}}
531
        end
532

NEW
533
      {:error, reason} ->
×
534
        {:halt, {:error, {:mutation_extraction_failed, reason}}}
535
    end
536
  end
537

538
  @spec process_mutations_for_transaction(
539
          Enumerable.t(),
540
          [StorageTeamDescriptor.t()],
541
          %{Log.id() => [Bedrock.range_tag()]},
542
          %{Log.id() => [term()]}
543
        ) ::
544
          {:ok, %{Log.id() => [term()]}} | {:error, term()}
545
  defp process_mutations_for_transaction(mutations_stream, storage_teams, logs_by_id, acc) do
546
    Enum.reduce_while(mutations_stream, {:ok, acc}, fn mutation, {:ok, mutations_acc} ->
9✔
547
      distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, mutations_acc)
9✔
548
    end)
549
  end
550

551
  @spec distribute_mutation_to_logs(
552
          term(),
553
          [StorageTeamDescriptor.t()],
554
          %{Log.id() => [Bedrock.range_tag()]},
555
          %{Log.id() => [term()]}
556
        ) ::
557
          {:cont, {:ok, %{Log.id() => [term()]}}}
558
          | {:halt, {:error, term()}}
559
  defp distribute_mutation_to_logs(mutation, storage_teams, logs_by_id, mutations_acc) do
560
    key_or_range = mutation_to_key_or_range(mutation)
9✔
561

562
    case key_or_range_to_tags(key_or_range, storage_teams) do
9✔
NEW
563
      {:ok, []} ->
×
564
        {:halt, {:error, {:storage_team_coverage_error, key_or_range}}}
565

566
      {:ok, affected_tags} ->
567
        affected_logs = find_logs_for_tags(affected_tags, logs_by_id)
9✔
568

569
        updated_acc =
9✔
570
          Enum.reduce(affected_logs, mutations_acc, fn log_id, acc_inner ->
571
            Map.update!(acc_inner, log_id, &[mutation | &1])
10✔
572
          end)
573

574
        {:cont, {:ok, updated_acc}}
575
    end
576
  end
577

578
  @doc """
579
  Extracts the key or range affected by a mutation using pattern matching.
580

581
  ## Parameters
582
    - `mutation`: A tuple representing the mutation operation
583

584
  ## Returns
585
    - For `{:set, key, value}` -> `key`
586
    - For `{:clear, key}` -> `key`
587
    - For `{:clear_range, start_key, end_key}` -> `{start_key, end_key}`
588

589
  ## Examples
590
      iex> mutation_to_key_or_range({:set, "hello", "world"})
591
      "hello"
592

593
      iex> mutation_to_key_or_range({:clear_range, "a", "z"})
594
      {"a", "z"}
595
  """
596
  @spec mutation_to_key_or_range(
597
          {:set, Bedrock.key(), Bedrock.value()}
598
          | {:clear, Bedrock.key()}
599
          | {:clear_range, Bedrock.key(), Bedrock.key()}
600
        ) ::
601
          Bedrock.key() | {Bedrock.key(), Bedrock.key()}
602
  def mutation_to_key_or_range({:set, key, _value}), do: key
9✔
603
  def mutation_to_key_or_range({:clear, key}), do: key
1✔
604
  def mutation_to_key_or_range({:clear_range, start_key, end_key}), do: {start_key, end_key}
2✔
605

606
  @doc """
607
  Maps a key or range to all storage team tags that are affected by it.
608

609
  For single keys, uses existing `key_to_tags/2` logic.
610
  For ranges, finds all storage teams that intersect with the range.
611

612
  ## Parameters
613
    - `key_or_range`: Either a single key or a `{start_key, end_key}` tuple
614
    - `storage_teams`: List of storage team descriptors
615

616
  ## Returns
617
    - `{:ok, [tag]}` with list of affected tags
618

619
  ## Examples
620
      iex> key_or_range_to_tags("hello", storage_teams)
621
      {:ok, [:team1]}
622

623
      iex> key_or_range_to_tags({"a", "z"}, storage_teams)
624
      {:ok, [:team1, :team2]}
625
  """
626
  @spec key_or_range_to_tags(Bedrock.key() | {Bedrock.key(), Bedrock.key()}, [
627
          StorageTeamDescriptor.t()
628
        ]) ::
629
          {:ok, [Bedrock.range_tag()]}
630
  def key_or_range_to_tags(key, storage_teams) when is_binary(key), do: key_to_tags(key, storage_teams)
16✔
631

632
  def key_or_range_to_tags({start_key, end_key}, storage_teams) do
633
    tags =
5✔
634
      storage_teams
635
      |> Enum.filter(fn %{key_range: {team_start, team_end}} ->
636
        ranges_intersect?(start_key, end_key, team_start, team_end)
14✔
637
      end)
638
      |> Enum.map(fn %{tag: tag} -> tag end)
11✔
639

640
    {:ok, tags}
641
  end
642

643
  @doc """
644
  Finds all logs whose tag sets intersect with the given tags.
645

646
  ## Parameters
647
    - `tags`: List of storage team tags
648
    - `logs_by_id`: Map of log_id -> list of tags covered by that log
649

650
  ## Returns
651
    - List of log IDs whose tag sets intersect with the given tags
652

653
  ## Examples
654
      iex> find_logs_for_tags([:tag1, :tag2], %{log1: [:tag1, :tag3], log2: [:tag2, :tag4]})
655
      [:log1, :log2]
656
  """
657
  @spec find_logs_for_tags([Bedrock.range_tag()], %{Log.id() => [Bedrock.range_tag()]}) ::
658
          [Log.id()]
659
  def find_logs_for_tags(tags, logs_by_id) do
660
    tag_set = MapSet.new(tags)
14✔
661

662
    logs_by_id
663
    |> Enum.filter(fn {_log_id, log_tags} ->
664
      log_tag_set = MapSet.new(log_tags)
29✔
665
      not MapSet.disjoint?(tag_set, log_tag_set)
29✔
666
    end)
667
    |> Enum.map(fn {log_id, _log_tags} -> log_id end)
14✔
668
  end
669

670
  @spec ranges_intersect?(
671
          Bedrock.key(),
672
          Bedrock.key() | :end,
673
          Bedrock.key(),
674
          Bedrock.key() | :end
675
        ) ::
676
          boolean()
677
  defp ranges_intersect?(_start1, :end, _start2, :end), do: true
1✔
678
  defp ranges_intersect?(start1, :end, _start2, end2), do: start1 < end2
2✔
679
  defp ranges_intersect?(_start1, end1, start2, :end), do: end1 > start2
3✔
680
  defp ranges_intersect?(start1, end1, start2, end2), do: start1 < end2 and end1 > start2
8✔
681

  @doc """
  Maps a key to all storage team tags that contain it.

  Searches through storage teams to find all teams whose key ranges contain
  the given key. A key can belong to multiple overlapping teams.
  Uses lexicographic ordering where a key belongs to a team
  if it falls within [start_key, end_key) or [start_key, :end).

  ## Parameters
    - `key`: The key to map to storage teams
    - `storage_teams`: List of storage team descriptors

  ## Returns
    - `{:ok, [tag]}` with list of matching tags (may be empty)

  ## Examples
      iex> teams = [%{tag: :team1, key_range: {"a", "m"}}, %{tag: :team2, key_range: {"h", "z"}}]
      iex> key_to_tags("hello", teams)
      {:ok, [:team1, :team2]}
  """
  @spec key_to_tags(Bedrock.key(), [StorageTeamDescriptor.t()]) ::
          {:ok, [Bedrock.range_tag()]}
  def key_to_tags(key, storage_teams) do
    tags =
      storage_teams
      |> Enum.filter(fn %{key_range: {start_key, end_key}} ->
        key_in_range?(key, start_key, end_key)
      end)
      |> Enum.map(fn %{tag: tag} -> tag end)

    {:ok, tags}
  end

  @doc """
  Maps a key to its corresponding storage team tag (legacy function).

  Searches through storage teams to find which team's key range contains
  the given key. Uses lexicographic ordering where a key belongs to a team
  if it falls within [start_key, end_key) or [start_key, :end).

  This function returns the first matching tag for backward compatibility.
  Consider using `key_to_tags/2` for multi-tag support.

  ## Parameters
    - `key`: The key to map to a storage team
    - `storage_teams`: List of storage team descriptors

  ## Returns
    - `{:ok, tag}` if a matching storage team is found
    - `{:error, :no_matching_team}` if no team covers the key

  ## Examples
      iex> teams = [%{tag: :team1, key_range: {"a", "m"}}, %{tag: :team2, key_range: {"m", :end}}]
      iex> key_to_tag("hello", teams)
      {:ok, :team1}
  """
  @spec key_to_tag(Bedrock.key(), [StorageTeamDescriptor.t()]) ::
          {:ok, Bedrock.range_tag()} | {:error, :no_matching_team}
  def key_to_tag(key, storage_teams) do
    case key_to_tags(key, storage_teams) do
      {:ok, [tag | _]} -> {:ok, tag}
      {:ok, []} -> {:error, :no_matching_team}
    end
  end

  @spec key_in_range?(Bedrock.key(), Bedrock.key(), Bedrock.key() | :end) :: boolean()
  defp key_in_range?(key, start_key, :end), do: key >= start_key
  defp key_in_range?(key, start_key, end_key), do: key >= start_key and key < end_key

  @spec push_to_logs(FinalizationPlan.t(), TransactionSystemLayout.t(), keyword()) ::
          FinalizationPlan.t()
  defp push_to_logs(%FinalizationPlan{stage: :failed} = plan, _layout, _opts), do: plan

  defp push_to_logs(%FinalizationPlan{stage: :ready_for_logging} = plan, layout, opts) do
    batch_log_push_fn = Keyword.get(opts, :batch_log_push_fn, &push_transaction_to_logs_direct/5)

    case batch_log_push_fn.(
           layout,
           plan.last_commit_version,
           plan.transactions_by_log,
           plan.commit_version,
           opts
         ) do
      :ok ->
        %{plan | stage: :logged}

      {:error, reason} ->
        %{plan | error: reason, stage: :failed}
    end
  end
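
  # resolve_log_descriptors/2 (below) keeps only the logs that have a known
  # service descriptor; log ids missing from `services` are dropped silently.
  # Illustrative call (the descriptor value `desc` is a placeholder):
  #
  #   resolve_log_descriptors(%{"log_1" => [], "log_2" => []}, %{"log_1" => desc})
  #   #=> %{"log_1" => desc}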

  @spec resolve_log_descriptors(%{Log.id() => term()}, %{term() => ServiceDescriptor.t()}) :: %{
          Log.id() => ServiceDescriptor.t()
        }
  def resolve_log_descriptors(log_descriptors, services) do
    log_descriptors
    |> Map.keys()
    |> Enum.map(&{&1, Map.get(services, &1)})
    |> Enum.reject(&is_nil(elem(&1, 1)))
    |> Map.new()
  end

  @spec try_to_push_transaction_to_log(
          ServiceDescriptor.t(),
          binary(),
          Bedrock.version()
        ) ::
          :ok | {:error, :unavailable}
  def try_to_push_transaction_to_log(%{kind: :log, status: {:up, log_server}}, transaction, last_commit_version) do
    Log.push(log_server, transaction, last_commit_version)
  end

  def try_to_push_transaction_to_log(_, _, _), do: {:error, :unavailable}

  @doc """
  Pushes transactions directly to logs and waits for acknowledgement from ALL log servers.

  This function takes transactions that have already been built per log and pushes them
  to the appropriate log servers. Each log receives its pre-built transaction.
  All logs must acknowledge to maintain durability guarantees.

  ## Parameters

    - `transaction_system_layout`: Contains configuration information about the
      transaction system, including available log servers.
    - `last_commit_version`: The last known committed version; used to
      ensure consistency in log ordering.
    - `transactions_by_log`: Map of log_id to transaction for that log.
      The transactions may be empty if every transaction in the batch was aborted.
    - `commit_version`: The version assigned by the sequencer for this batch.
    - `opts`: Optional configuration for testing and customization.

  ## Options
    - `:async_stream_fn` - Function for parallel processing (default: Task.async_stream/3)
    - `:log_push_fn` - Function for pushing to individual logs (default: try_to_push_transaction_to_log/3)
    - `:timeout` - Timeout for log push operations (default: 5_000 ms)

  ## Returns
    - `:ok` if acknowledgements have been received from ALL log servers.
    - `{:error, log_push_error()}` if any log has not successfully acknowledged the
       push within the timeout period or other errors occur.
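
  ## Example

  A sketch of a test invocation with a stubbed single-log push; `layout`,
  `txn`, and the version values here are illustrative placeholders:

      stub_push = fn _service_descriptor, _transaction, _last_commit_version -> :ok end

      push_transaction_to_logs_direct(
        layout,
        last_commit_version,
        %{"log_1" => txn},
        commit_version,
        log_push_fn: stub_push
      )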
  """
  @spec push_transaction_to_logs_direct(
          TransactionSystemLayout.t(),
          last_commit_version :: Bedrock.version(),
          %{Log.id() => Transaction.encoded()},
          commit_version :: Bedrock.version(),
          opts :: [
            async_stream_fn: async_stream_fn(),
            log_push_fn: log_push_single_fn(),
            timeout: non_neg_integer()
          ]
        ) :: :ok | {:error, log_push_error()}
  def push_transaction_to_logs_direct(
        transaction_system_layout,
        last_commit_version,
        transactions_by_log,
        _commit_version,
        opts \\ []
      ) do
    async_stream_fn = Keyword.get(opts, :async_stream_fn, &Task.async_stream/3)
    log_push_fn = Keyword.get(opts, :log_push_fn, &try_to_push_transaction_to_log/3)
    timeout = Keyword.get(opts, :timeout, 5_000)

    logs_by_id = transaction_system_layout.logs
    required_acknowledgments = map_size(logs_by_id)

    resolved_logs = resolve_log_descriptors(logs_by_id, transaction_system_layout.services)

    stream_result =
      async_stream_fn.(
        resolved_logs,
        fn {log_id, service_descriptor} ->
          encoded_transaction = Map.get(transactions_by_log, log_id)
          result = log_push_fn.(service_descriptor, encoded_transaction, last_commit_version)
          {log_id, result}
        end,
        timeout: timeout
      )

    stream_result
    |> Enum.reduce_while({0, []}, fn
      {:ok, {log_id, {:error, reason}}}, {_count, errors} ->
        {:halt, {:error, [{log_id, reason} | errors]}}

      {:ok, {_log_id, :ok}}, {count, errors} ->
        count = 1 + count

        if count == required_acknowledgments do
          {:halt, {:ok, count}}
        else
          {:cont, {count, errors}}
        end

      {:exit, {log_id, reason}}, {_count, errors} ->
        {:halt, {:error, [{log_id, reason} | errors]}}
    end)
    |> case do
      {:ok, ^required_acknowledgments} ->
        :ok

      {:error, errors} ->
        {:error, {:log_failures, errors}}

      {count, errors} when count < required_acknowledgments ->
        {:error, {:insufficient_acknowledgments, count, required_acknowledgments, errors}}

      _other ->
        {:error, :log_push_failed}
    end
  end

  @spec notify_sequencer(FinalizationPlan.t(), Sequencer.ref()) ::
          FinalizationPlan.t()
  defp notify_sequencer(%FinalizationPlan{stage: :failed} = plan, _sequencer), do: plan

  defp notify_sequencer(%FinalizationPlan{stage: :logged} = plan, sequencer) do
    :ok = Sequencer.report_successful_commit(sequencer, plan.commit_version)
    %{plan | stage: :sequencer_notified}
  end

  @spec notify_successes(FinalizationPlan.t(), keyword()) :: FinalizationPlan.t()
  defp notify_successes(%FinalizationPlan{stage: :failed} = plan, _opts), do: plan

  defp notify_successes(%FinalizationPlan{stage: :sequencer_notified} = plan, opts) do
    success_reply_fn = Keyword.get(opts, :success_reply_fn, &send_reply_with_commit_version/2)

    successful_indices = get_successful_indices(plan)
    unreplied_success_indices = MapSet.difference(successful_indices, plan.replied_indices)

    unreplied_successes =
      filter_replies_by_indices(plan.successful_replies, unreplied_success_indices)

    success_reply_fn.(unreplied_successes, plan.commit_version)

    updated_replied_indices = MapSet.union(plan.replied_indices, successful_indices)

    %{plan | replied_indices: updated_replied_indices, stage: :completed}
  end

  @spec send_reply_with_commit_version([Batch.reply_fn()], Bedrock.version()) ::
          :ok
  def send_reply_with_commit_version(oks, commit_version), do: Enum.each(oks, & &1.({:ok, commit_version}))

  @spec get_successful_indices(FinalizationPlan.t()) :: MapSet.t()
  defp get_successful_indices(plan) do
    aborted_set = MapSet.new(plan.aborted_indices)
    transaction_count = length(plan.transactions)

    if transaction_count > 0 do
      all_indices = MapSet.new(0..(transaction_count - 1))
      MapSet.difference(all_indices, aborted_set)
    else
      MapSet.new()
    end
  end

  @spec filter_replies_by_indices([Batch.reply_fn()], MapSet.t()) :: [Batch.reply_fn()]
  defp filter_replies_by_indices(replies, indices_set) do
    replies
    |> Enum.with_index()
    |> Enum.filter(fn {_reply_fn, idx} -> MapSet.member?(indices_set, idx) end)
    |> Enum.map(fn {reply_fn, _idx} -> reply_fn end)
  end

  @spec extract_result_or_handle_error(FinalizationPlan.t(), keyword()) ::
          {:ok, non_neg_integer(), non_neg_integer()} | {:error, finalization_error()}
  defp extract_result_or_handle_error(%FinalizationPlan{stage: :completed} = plan, _opts) do
    n_aborts = length(plan.aborted_replies)
    n_successes = length(plan.successful_replies)
    {:ok, n_aborts, n_successes}
  end

  defp extract_result_or_handle_error(%FinalizationPlan{stage: :failed} = plan, opts) do
    handle_error(plan, opts)
  end

  @spec handle_error(FinalizationPlan.t(), keyword()) :: {:error, finalization_error()}
  defp handle_error(%FinalizationPlan{error: error} = plan, opts) when not is_nil(error) do
    abort_reply_fn =
      Keyword.get(opts, :abort_reply_fn, &reply_to_all_clients_with_aborted_transactions/1)

    all_indices = MapSet.new(0..(length(plan.transactions) - 1))
    unreplied_indices = MapSet.difference(all_indices, plan.replied_indices)

    unreplied_replies =
      plan.transactions
      |> Enum.with_index()
      |> Enum.filter(fn {_transaction, idx} -> MapSet.member?(unreplied_indices, idx) end)
      |> Enum.map(fn {{reply_fn, _transaction}, _idx} -> reply_fn end)

    abort_reply_fn.(unreplied_replies)

    {:error, plan.error}
  end
end