• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jallum / bedrock / 8b8c459f55a92c8826fdfd5212844c818871bfa8

18 Sep 2025 03:40AM UTC coverage: 65.711% (+1.9%) from 63.832%
8b8c459f55a92c8826fdfd5212844c818871bfa8

push

github

jallum
Merge branch 'hotfix/0.3.0-rc3'

3896 of 5929 relevant lines covered (65.71%)

652.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.52
/lib/bedrock/data_plane/commit_proxy/batching.ex
1
defmodule Bedrock.DataPlane.CommitProxy.Batching do
2
  @moduledoc false
3

4
  import Bedrock.DataPlane.CommitProxy.Batch,
5
    only: [new_batch: 3, add_transaction: 4, set_finalized_at: 2]
6

7
  import Bedrock.DataPlane.Sequencer, only: [next_commit_version: 1]
8

9
  alias Bedrock.DataPlane.CommitProxy.Batch
10
  alias Bedrock.DataPlane.CommitProxy.ConflictSharding
11
  alias Bedrock.DataPlane.CommitProxy.LayoutOptimization
12
  alias Bedrock.DataPlane.CommitProxy.State
13
  alias Bedrock.DataPlane.Transaction
14

15
  # Reads the monotonic clock in milliseconds. The value is handed to
  # `new_batch/3` and `set_finalized_at/2` and compared against
  # `batch.started_at` by the latency check.
  @spec timestamp() :: Bedrock.timestamp_in_ms()
  defp timestamp do
    System.monotonic_time(:millisecond)
  end
17

18
  @doc """
  Builds a batch that holds exactly one transaction and finalizes it at once.

  A commit version pair is requested from the sequencer found at
  `state.transaction_system_layout.sequencer`. On success a new batch is
  created, `transaction` is added to it together with `reply_fn` (and `nil` in
  place of a task), and the batch's finalized-at timestamp is set.

  `reply_fn` defaults to a function that ignores its argument and returns `:ok`.

  Returns `{:ok, batch}`, or `{:error, :sequencer_unavailable}` when the layout
  has no sequencer (`nil`) or the sequencer call returns any `{:error, _}`.
  """
  @spec single_transaction_batch(
          state :: State.t(),
          transaction :: Transaction.encoded(),
          reply_fn :: Batch.reply_fn()
        ) ::
          {:ok, Batch.t()}
          | {:error, :sequencer_unavailable}
  def single_transaction_batch(state, transaction, reply_fn \\ fn _result -> :ok end)

  def single_transaction_batch(%{transaction_system_layout: %{sequencer: nil}}, _transaction, _reply_fn),
    do: {:error, :sequencer_unavailable}

  def single_transaction_batch(state, transaction, reply_fn) when is_binary(transaction) do
    case next_commit_version(state.transaction_system_layout.sequencer) do
      {:ok, last_commit_version, commit_version} ->
        batch =
          timestamp()
          |> new_batch(last_commit_version, commit_version)
          |> add_transaction(transaction, reply_fn, nil)
          |> set_finalized_at(timestamp())

        {:ok, batch}

      # Every sequencer failure is reported the same way, so an unexpected
      # error reason is returned per the spec instead of raising
      # CaseClauseError. This mirrors start_batch_if_needed/1, which also
      # accepts any `{:error, reason}`.
      {:error, _reason} ->
        {:error, :sequencer_unavailable}
    end
  end
43

44
  @doc """
  Starts a `Task` that maps a transaction's conflict sections onto resolvers.

  Inside the task the `:read_conflicts` and `:write_conflicts` sections are
  extracted from the encoded `transaction`.

    * When the layout lists exactly one resolver ref, the task result is a
      one-entry map from that ref to the extracted sections.
    * Otherwise the sections are passed, with `resolver_ends` and
      `resolver_refs`, to `ConflictSharding.shard_conflicts_across_resolvers/3`
      and its result is the task result.

  The work is started with `Task.async/1`, so the calling process owns the
  returned task.
  """
  @spec create_resolver_task(Transaction.encoded(), LayoutOptimization.precomputed_layout()) :: Task.t()
  def create_resolver_task(transaction, %{resolver_refs: [single_ref], resolver_ends: _ends}) do
    Task.async(fn -> %{single_ref => extract_conflict_sections(transaction)} end)
  end

  def create_resolver_task(transaction, %{resolver_refs: refs, resolver_ends: ends}) do
    Task.async(fn ->
      transaction
      |> extract_conflict_sections()
      |> ConflictSharding.shard_conflicts_across_resolvers(ends, refs)
    end)
  end

  # Pulls the read/write conflict sections out of an encoded transaction.
  # Called only from inside the task closures above, so the extraction still
  # runs in the task process.
  defp extract_conflict_sections(transaction),
    do: Transaction.extract_sections!(transaction, [:read_conflicts, :write_conflicts])
58

59
  @doc """
  Ensures the state has an open batch.

  When `batch` is `nil`, a commit version pair is requested from the sequencer
  at `t.transaction_system_layout.sequencer` and a new batch, stamped with the
  current timestamp, is stored in the state. If the sequencer returns
  `{:error, reason}`, the result is `{:error, {:sequencer_unavailable, reason}}`.

  When a batch is already present, the state is returned unchanged.
  """
  @spec start_batch_if_needed(State.t()) :: State.t() | {:error, term()}
  def start_batch_if_needed(%{batch: nil} = t) do
    t.transaction_system_layout.sequencer
    |> next_commit_version()
    |> case do
      {:ok, last_commit_version, commit_version} ->
        fresh_batch = new_batch(timestamp(), last_commit_version, commit_version)
        %{t | batch: fresh_batch}

      {:error, reason} ->
        {:error, {:sequencer_unavailable, reason}}
    end
  end

  def start_batch_if_needed(t) do
    t
  end
71

72
  @doc """
  Adds an encoded transaction to the state's current batch.

  `transaction`, `reply_fn` and `task` (a `Task` or `nil`) are forwarded to
  `add_transaction/4` along with `t.batch`. The returned batch replaces the
  one in the state.
  """
  @spec add_transaction_to_batch(State.t(), Transaction.encoded(), Batch.reply_fn(), Task.t() | nil) ::
          State.t()
  def add_transaction_to_batch(t, transaction, reply_fn, task) when is_binary(transaction) do
    updated_batch = add_transaction(t.batch, transaction, reply_fn, task)
    %{t | batch: updated_batch}
  end
76

77
  @doc """
  Decides whether the current batch should be finalized now.

  The batch is finalized when either it has been open longer than
  `t.max_latency_in_ms` or it holds at least `t.max_per_batch` transactions.
  In that case the result is `{state_without_batch, finalized_batch}`: the
  state's `batch` is reset to `nil` and the batch's finalized-at timestamp is
  set to the time of the check.

  Otherwise, or when the state has no open batch, the result is `{t, nil}`.
  """
  @spec apply_finalization_policy(State.t()) ::
          {State.t(), batch_to_finalize :: Batch.t()} | {State.t(), nil}
  # No open batch means nothing to finalize. This is also the state this
  # function returns after finalizing, so calling it again before a new batch
  # is started is a no-op rather than a crash on `nil.started_at`.
  def apply_finalization_policy(%{batch: nil} = t), do: {t, nil}

  def apply_finalization_policy(t) do
    now = timestamp()
    batch = t.batch

    # The same `now` drives both the latency check and the finalized-at stamp.
    if max_latency?(batch, now, t.max_latency_in_ms) or
         max_transactions?(batch, t.max_per_batch) do
      {%{t | batch: nil}, set_finalized_at(batch, now)}
    else
      {t, nil}
    end
  end
89

90
  # True once `now` is strictly past `batch.started_at + max_latency_in_ms`.
  # A batch that is exactly at its deadline is not yet considered late.
  @spec max_latency?(
          Batch.t(),
          now :: Bedrock.timestamp_in_ms(),
          max_latency_in_ms :: pos_integer()
        ) :: boolean()
  defp max_latency?(batch, now, max_latency_in_ms) do
    deadline = batch.started_at + max_latency_in_ms
    now > deadline
  end
96

97
  # True when the batch already holds `max_per_batch` transactions or more.
  @spec max_transactions?(Batch.t(), max_per_batch :: pos_integer()) :: boolean()
  defp max_transactions?(batch, max_per_batch) do
    max_per_batch <= batch.n_transactions
  end
99
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc