jallum / bedrock, build fb3defeb951c4b0ad9552be01797f0e4e04d962f-PR-43
03 Sep 2025 03:45AM UTC coverage: 63.433% (+0.005%) from 63.428%
Pull #43 · github · jallum
Add conflict sharding with async resolver assignment

Move expensive conflict distribution off the critical path by computing resolver
assignments asynchronously during batching instead of blocking transaction
acceptance. This improves throughput and reduces latency; a sketch of the general
pattern follows the change list below.

- Add ConflictSharding module to distribute conflicts by key range
- Implement async Task-based resolver assignment in batching
- Add LayoutOptimization for precomputed static structures
- Update Batch and finalization pipeline for Task integration
- Extract KeyRange.overlap? utility, remove ResolutionPlan
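
The sketch below illustrates the general pattern only; AsyncAssignmentSketch and
expensive_assignment/1 are hypothetical names, not code from this PR. The expensive
work is started as a Task when a transaction is accepted, batching continues, and
the result is awaited only at finalization.

defmodule AsyncAssignmentSketch do
  # Minimal sketch of off-critical-path work using the standard Task module.
  # expensive_assignment/1 stands in for the real resolver-assignment computation.

  # Accept a transaction: start the assignment concurrently and keep batching.
  def accept(batch, transaction) do
    task = Task.async(fn -> expensive_assignment(transaction) end)
    [{transaction, task} | batch]
  end

  # Finalize the batch: block on the precomputed assignments only now.
  def finalize(batch) do
    Enum.map(batch, fn {transaction, task} -> {transaction, Task.await(task)} end)
  end

  # Placeholder for the costly per-transaction computation.
  defp expensive_assignment(transaction), do: {:assigned, byte_size(transaction)}
end

# Usage:
#   []
#   |> AsyncAssignmentSketch.accept("txn-a")
#   |> AsyncAssignmentSketch.accept("txn-b")
#   |> AsyncAssignmentSketch.finalize()
#   #=> [{"txn-b", {:assigned, 5}}, {"txn-a", {:assigned, 5}}]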

63 of 95 new or added lines in 10 files covered (66.32%).
2 existing lines in 2 files are now uncovered.
3027 of 4772 relevant lines covered (63.43%).
615.91 hits per line.

Source File: /lib/bedrock/data_plane/commit_proxy/batching.ex (8.7% covered)
Per-line coverage from the report is shown as trailing comments: covered, uncovered, or new.
defmodule Bedrock.DataPlane.CommitProxy.Batching do
  @moduledoc false

  import Bedrock.DataPlane.CommitProxy.Batch,
    only: [new_batch: 3, add_transaction: 4, set_finalized_at: 2]

  import Bedrock.DataPlane.Sequencer, only: [next_commit_version: 1]

  alias Bedrock.DataPlane.CommitProxy.Batch
  alias Bedrock.DataPlane.CommitProxy.ConflictSharding
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
  alias Bedrock.DataPlane.CommitProxy.State
  alias Bedrock.DataPlane.Transaction

  @spec timestamp() :: Bedrock.timestamp_in_ms()
  defp timestamp, do: :erlang.monotonic_time(:millisecond)  # uncovered

  @spec single_transaction_batch(
          state :: State.t(),
          transaction :: Transaction.encoded(),
          reply_fn :: Batch.reply_fn()
        ) ::
          {:ok, Batch.t()}
          | {:error, :sequencer_unavailable}
  def single_transaction_batch(t, transaction, reply_fn \\ fn _result -> :ok end)  # covered, 1 hit

  def single_transaction_batch(%{transaction_system_layout: %{sequencer: nil}}, _transaction, _reply_fn),  # covered, 1 hit
    do: {:error, :sequencer_unavailable}

  def single_transaction_batch(state, transaction, reply_fn) when is_binary(transaction) do
    case next_commit_version(state.transaction_system_layout.sequencer) do  # uncovered
      {:ok, last_commit_version, commit_version} ->
        # Create task for resolver assignment (empty transactions still need processing)
        task = create_resolver_task(transaction, state.precomputed_layout)  # new, uncovered

        {:ok,
         timestamp()
         |> new_batch(last_commit_version, commit_version)
         |> add_transaction(transaction, reply_fn, task)
         |> set_finalized_at(timestamp())}

      {:error, :unavailable} ->  # uncovered
        {:error, :sequencer_unavailable}
    end
  end

  @spec create_resolver_task(Transaction.encoded(), LayoutOptimization.precomputed_layout()) :: Task.t()
  def create_resolver_task(transaction, %{resolver_refs: [single_ref], resolver_ends: _ends}) do
    Task.async(fn ->  # new, uncovered
      sections = Transaction.extract_sections!(transaction, [:read_conflicts, :write_conflicts])  # new, uncovered
      %{single_ref => sections}  # new, uncovered
    end)
  end

  def create_resolver_task(transaction, %{resolver_refs: refs, resolver_ends: ends}) do
    Task.async(fn ->  # new, uncovered
      sections = Transaction.extract_sections!(transaction, [:read_conflicts, :write_conflicts])  # new, uncovered
      ConflictSharding.shard_conflicts_across_resolvers(sections, ends, refs)  # new, uncovered
    end)
  end

  @spec start_batch_if_needed(State.t()) :: State.t() | {:error, term()}
  def start_batch_if_needed(%{batch: nil} = t) do
    case next_commit_version(t.transaction_system_layout.sequencer) do  # uncovered
      {:ok, last_commit_version, commit_version} ->
        %{t | batch: new_batch(timestamp(), last_commit_version, commit_version)}  # uncovered

      {:error, reason} ->  # uncovered
        {:error, {:sequencer_unavailable, reason}}
    end
  end

  def start_batch_if_needed(t), do: t  # uncovered

  @spec add_transaction_to_batch(State.t(), Transaction.encoded(), Batch.reply_fn(), Task.t()) ::
          State.t()
  def add_transaction_to_batch(t, transaction, reply_fn, task) when is_binary(transaction),
    do: %{t | batch: add_transaction(t.batch, transaction, reply_fn, task)}  # new, uncovered

  @spec apply_finalization_policy(State.t()) ::
          {State.t(), batch_to_finalize :: Batch.t()} | {State.t(), nil}
  def apply_finalization_policy(t) do
    now = timestamp()  # uncovered

    if max_latency?(t.batch, now, t.max_latency_in_ms) or  # uncovered
         max_transactions?(t.batch, t.max_per_batch) do  # uncovered
      {%{t | batch: nil}, set_finalized_at(t.batch, now)}  # uncovered
    else
      {t, nil}
    end
  end

  @spec max_latency?(
          Batch.t(),
          now :: Bedrock.timestamp_in_ms(),
          max_latency_in_ms :: pos_integer()
        ) :: boolean()
  defp max_latency?(batch, now, max_latency_in_ms), do: batch.started_at + max_latency_in_ms < now  # uncovered

  @spec max_transactions?(Batch.t(), max_per_batch :: pos_integer()) :: boolean()
  defp max_transactions?(batch, max_per_batch), do: batch.n_transactions >= max_per_batch  # uncovered
end
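
The batching module delegates the actual distribution to
ConflictSharding.shard_conflicts_across_resolvers/3, which is defined outside this
file. As a hedged illustration of key-range sharding (the module name, data shapes,
and comparison rule below are assumptions, not the real implementation), conflicts
can be grouped by the first resolver whose end key sorts after the conflict key:

defmodule ConflictShardingSketch do
  # Illustrative key-range sharding: resolver_ends is assumed to be a sorted list
  # of exclusive end keys, and resolver_refs the parallel list of resolver refs.
  def shard(conflict_keys, resolver_ends, resolver_refs) do
    ranges = Enum.zip(resolver_ends, resolver_refs)

    # Route each conflict key to the resolver owning its key range.
    Enum.group_by(conflict_keys, fn key ->
      Enum.find_value(ranges, fn {end_key, ref} -> if key < end_key, do: ref end)
    end)
  end
end

# Usage:
#   ConflictShardingSketch.shard(["apple", "melon"], ["g", "z"], [:r1, :r2])
#   #=> %{r1: ["apple"], r2: ["melon"]}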