• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / 984a90d50e5214a5922f87eaa21b526a5950cda6-PR-43

06 Sep 2025 09:12PM UTC coverage: 63.695% (+0.02%) from 63.674%
984a90d50e5214a5922f87eaa21b526a5950cda6-PR-43

Pull #43

github

jallum
Merge remote-tracking branch 'origin/develop' into feature/rework_commit_proxy
Pull Request #43: Add conflict sharding with async resolver assignment

68 of 99 new or added lines in 8 files covered. (68.69%)

3 existing lines in 2 files now uncovered.

3265 of 5126 relevant lines covered (63.69%)

620.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.7
/lib/bedrock/data_plane/commit_proxy/batching.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Batching do
2
  @moduledoc false
3

4
  import Bedrock.DataPlane.CommitProxy.Batch,
5
    only: [new_batch: 3, add_transaction: 4, set_finalized_at: 2]
6

7
  import Bedrock.DataPlane.Sequencer, only: [next_commit_version: 1]
8

9
  alias Bedrock.DataPlane.CommitProxy.Batch
10
  alias Bedrock.DataPlane.CommitProxy.ConflictSharding
11
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
12
  alias Bedrock.DataPlane.CommitProxy.State
13
  alias Bedrock.DataPlane.Transaction
14

15
  @spec timestamp() :: Bedrock.timestamp_in_ms()
16
  defp timestamp, do: :erlang.monotonic_time(:millisecond)
×
17

18
  @spec single_transaction_batch(
19
          state :: State.t(),
20
          transaction :: Transaction.encoded(),
21
          reply_fn :: Batch.reply_fn()
22
        ) ::
23
          {:ok, Batch.t()}
24
          | {:error, :sequencer_unavailable}
25
  def single_transaction_batch(t, transaction, reply_fn \\ fn _result -> :ok end)
1✔
26

27
  def single_transaction_batch(%{transaction_system_layout: %{sequencer: nil}}, _transaction, _reply_fn),
1✔
28
    do: {:error, :sequencer_unavailable}
29

30
  def single_transaction_batch(state, transaction, reply_fn) when is_binary(transaction) do
31
    case next_commit_version(state.transaction_system_layout.sequencer) do
×
32
      {:ok, last_commit_version, commit_version} ->
33
        # Create task for resolver assignment (empty transactions still need processing)
NEW
34
        task = create_resolver_task(transaction, state.precomputed_layout)
×
35

36
        {:ok,
37
         timestamp()
38
         |> new_batch(last_commit_version, commit_version)
39
         |> add_transaction(transaction, reply_fn, task)
40
         |> set_finalized_at(timestamp())}
41

42
      {:error, :unavailable} ->
×
43
        {:error, :sequencer_unavailable}
44
    end
45
  end
46

47
  @spec create_resolver_task(Transaction.encoded(), LayoutOptimization.precomputed_layout()) :: Task.t()
48
  def create_resolver_task(transaction, %{resolver_refs: [single_ref], resolver_ends: _ends}) do
NEW
49
    Task.async(fn ->
×
NEW
50
      sections = Transaction.extract_sections!(transaction, [:read_conflicts, :write_conflicts])
×
NEW
51
      %{single_ref => sections}
×
52
    end)
53
  end
54

55
  def create_resolver_task(transaction, %{resolver_refs: refs, resolver_ends: ends}) do
NEW
56
    Task.async(fn ->
×
NEW
57
      sections = Transaction.extract_sections!(transaction, [:read_conflicts, :write_conflicts])
×
NEW
58
      ConflictSharding.shard_conflicts_across_resolvers(sections, ends, refs)
×
59
    end)
60
  end
61

62
  @spec start_batch_if_needed(State.t()) :: State.t() | {:error, term()}
63
  def start_batch_if_needed(%{batch: nil} = t) do
64
    case next_commit_version(t.transaction_system_layout.sequencer) do
×
65
      {:ok, last_commit_version, commit_version} ->
66
        %{t | batch: new_batch(timestamp(), last_commit_version, commit_version)}
×
67

68
      {:error, reason} ->
×
69
        {:error, {:sequencer_unavailable, reason}}
70
    end
71
  end
72

73
  def start_batch_if_needed(t), do: t
×
74

75
  @spec add_transaction_to_batch(State.t(), Transaction.encoded(), Batch.reply_fn(), Task.t()) ::
76
          State.t()
77
  def add_transaction_to_batch(t, transaction, reply_fn, task) when is_binary(transaction),
NEW
78
    do: %{t | batch: add_transaction(t.batch, transaction, reply_fn, task)}
×
79

80
  @spec apply_finalization_policy(State.t()) ::
81
          {State.t(), batch_to_finalize :: Batch.t()} | {State.t(), nil}
82
  def apply_finalization_policy(t) do
83
    now = timestamp()
×
84

85
    if max_latency?(t.batch, now, t.max_latency_in_ms) or
×
86
         max_transactions?(t.batch, t.max_per_batch) do
×
87
      {%{t | batch: nil}, set_finalized_at(t.batch, now)}
×
88
    else
89
      {t, nil}
90
    end
91
  end
92

93
  @spec max_latency?(
94
          Batch.t(),
95
          now :: Bedrock.timestamp_in_ms(),
96
          max_latency_in_ms :: pos_integer()
97
        ) :: boolean()
98
  defp max_latency?(batch, now, max_latency_in_ms), do: batch.started_at + max_latency_in_ms < now
×
99

100
  @spec max_transactions?(Batch.t(), max_per_batch :: pos_integer()) :: boolean()
101
  defp max_transactions?(batch, max_per_batch), do: batch.n_transactions >= max_per_batch
×
102
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc