• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

satoren / y_ex / c3d6e4faa0a084600cbce820b1eaa0d8ca8f37b0

08 Apr 2026 04:59PM UTC coverage: 98.837% (+0.2%) from 98.675%
c3d6e4faa0a084600cbce820b1eaa0d8ca8f37b0

push

github

web-flow
perf: fast-path decode for v1 sync/awareness messages (#225)

route process_message_v1 directly to Yex.Nif.sync_message_decode_v1

split sync_step2/sync_update handling into dedicated function heads

add Rust fast-path parser for v1 tags with fallback to DecoderV1

verified with mix test doc_server_test.exs

2 of 2 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

595 of 602 relevant lines covered (98.84%)

26.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.22
/lib/server/doc_server_worker.ex
1
defmodule Yex.DocServer.Worker do
2
  @moduledoc false
3
  use GenServer, restart: :temporary
4
  require Logger
5

6
  alias Yex.DocServer.State
7

8
  alias Yex.{Doc, Awareness}
9

10
  # MSG_QUERY_AWARENESS (<<3>>) — bypass message_decode NIF
11
  @query_awareness_call :__yex_query_awareness
12
  @sync_step1_raw_call :__yex_sync_step1_raw
13

14
  @query_awareness_message <<3>>
15
  # Entry point for v1 protocol messages. The two most common messages are
  # recognised by their raw bytes so they never go through the decode NIF.
  def process_message_v1(server, @query_awareness_message, _origin),
    do: GenServer.call(server, @query_awareness_call)

  # MSG_SYNC (0) + MSG_SYNC_STEP_1 (0): the remaining bytes (the state-vector
  # payload) are handed to the server process as-is.
  def process_message_v1(server, <<0, 0, sv_payload::binary>>, _origin),
    do: GenServer.call(server, {@sync_step1_raw_call, sv_payload})

  # Every other message is decoded first; anything other than `{:ok, decoded}`
  # coming back from the decoder is returned to the caller unchanged.
  def process_message_v1(server, message, origin) do
    with {:ok, decoded} <- Yex.Nif.sync_message_decode_v1(message) do
      message_v1(server, decoded, origin)
    end
  end
33

34
  #  defp message_v1(server, {:sync, {:sync_step1, encoded_state_vector}}, origin) do
35
  #    GenServer.call(server, {__MODULE__, :document_sync_step1, encoded_state_vector, origin})
36
  #  end
37

38
  # Dispatches an already-decoded message to the server process.
  # Sync step 2 and sync update both carry an encoded diff and are forwarded
  # as the same :document_update cast.
  defp message_v1(server, {:sync, {:sync_step2, diff}}, origin),
    do: GenServer.cast(server, {__MODULE__, :document_update, diff, origin})

  defp message_v1(server, {:sync, {:sync_update, diff}}, origin),
    do: GenServer.cast(server, {__MODULE__, :document_update, diff, origin})

  defp message_v1(server, {:awareness, payload}, origin),
    do: GenServer.cast(server, {__MODULE__, :awareness_update, payload, origin})

  #  defp message_v1(server, :query_awareness, _origin) do
  #    GenServer.call(server, @query_awareness_call)
  #  end

  # Any other decoded message is rejected.
  defp message_v1(_server, _message, _origin), do: {:error, :unknown_message}
57

58
  ## Callbacks
59

60
  # Builds the document (and, when the callback module wants it, the awareness
  # instance), then delegates to the callback module's own init/2 with the
  # assembled state.
  @impl true
  def init(arg) do
    module = Keyword.fetch!(arg, :module)
    doc_option = Keyword.get(arg, :doc_option)
    assigns = Keyword.get(arg, :assigns, %{})

    # A missing or falsy :doc_option means a document with default options.
    doc =
      if doc_option do
        Doc.with_options(doc_option)
      else
        Doc.new()
      end

    # Subscribe to document updates only when the callback module handles them.
    if function_exported?(module, :handle_update_v1, 4),
      do: Doc.monitor_update_v1(doc, metadata: __MODULE__)

    initial_state = %State{
      assigns: assigns,
      doc: doc,
      awareness: setup_awareness(doc, module),
      module: module
    }

    module.init(arg, initial_state)
  end
80

81
  # Creates an awareness instance for `doc` when the callback module exports
  # at least one awareness callback; returns nil otherwise.
  defp setup_awareness(doc, module) do
    awareness_callbacks = [:handle_awareness_change, :handle_awareness_update]

    if Enum.any?(awareness_callbacks, &function_exported?(module, &1, 4)) do
      case Awareness.new(doc) do
        {:ok, created} ->
          monitor_awareness_events(created, module)
          created
      end
    end
  end
92

93
  # Subscribes to exactly those awareness events for which the callback module
  # exports a handler.
  defp monitor_awareness_events(awareness, module) do
    subscription = [metadata: __MODULE__]

    if function_exported?(module, :handle_awareness_change, 4),
      do: Awareness.monitor_change(awareness, subscription)

    if function_exported?(module, :handle_awareness_update, 4),
      do: Awareness.monitor_update(awareness, subscription)
  end
102

103
  # Raw sync step 1: the reply is built by the NIF straight from the
  # undecoded state-vector payload.
  @impl true
  def handle_call(
        {@sync_step1_raw_call, sv_payload},
        _from,
        %{doc: doc, awareness: awareness} = state
      ) do
    reply = Yex.Nif.encode_sync_step1_response_v1(doc, nil, sv_payload, awareness)
    {:reply, reply, state}
  end

  # Awareness query: with no awareness instance there is nothing to send back.
  def handle_call(@query_awareness_call, _from, %{awareness: awareness} = state) do
    reply =
      case awareness do
        nil -> {:ok, []}
        _ -> Yex.Nif.encode_awareness_reply_v1(awareness)
      end

    {:reply, reply, state}
  end

  # Runs the given zero-arity function inside the server process.
  def handle_call({Yex.Doc, :run, fun}, _from, state), do: {:reply, fun.(), state}

  # Anything else belongs to the callback module.
  def handle_call(request, from, %{module: module} = state),
    do: module.handle_call(request, from, state)
143

144
  # Applies a remote document update inside a transaction tagged with `origin`.
  @impl true
  def handle_cast({__MODULE__, :document_update, update, origin}, %{doc: doc} = state) do
    Doc.transaction(doc, origin, fn ->
      case Yex.apply_update(doc, update) do
        :ok ->
          :ok

        {:error, reason} ->
          # A rejected update is logged; the transaction still returns :ok.
          Logger.log(:warning, inspect(reason))
          :ok
      end
    end)

    # Handle the resulting update messages now rather than on a later
    # handle_info turn.
    handle_update_v1_immediately(state)
  end

  # No awareness instance in the state: the awareness message is dropped
  # silently.
  def handle_cast(
        {__MODULE__, :awareness_update, _message, _origin},
        %{awareness: nil} = state
      ),
      do: {:noreply, state}

  def handle_cast(
        {__MODULE__, :awareness_update, message, origin},
        %{awareness: awareness} = state
      ) do
    Awareness.apply_update(awareness, message, origin)

    # Handle the resulting awareness events now rather than on a later
    # handle_info turn.
    handle_awareness_event_immediately(state)
  end

  # Anything else belongs to the callback module.
  def handle_cast(request, %{module: module} = state),
    do: module.handle_cast(request, state)
188

189
  # Monitor notifications tagged with this module as metadata are routed to the
  # matching callback; every other message goes to the callback module's
  # handle_info/2.
  @impl true
  def handle_info({:update_v1, update, origin, __MODULE__}, %{module: module, doc: doc} = state),
    do: module.handle_update_v1(doc, update, origin, state)

  def handle_info(
        {:awareness_change, change, origin, __MODULE__},
        %{module: module, awareness: awareness} = state
      ),
      do: module.handle_awareness_change(awareness, change, origin, state)

  def handle_info(
        {:awareness_update, change, origin, __MODULE__},
        %{module: module, awareness: awareness} = state
      ),
      do: module.handle_awareness_update(awareness, change, origin, state)

  def handle_info(msg, %{module: module} = state), do: module.handle_info(msg, state)
214

215
  # Forwards termination to the callback module when it exports terminate/2;
  # otherwise returns :ok.
  @impl true
  def terminate(reason, %{module: module} = state) do
    case function_exported?(module, :terminate, 2) do
      true -> module.terminate(reason, state)
      false -> :ok
    end
  end
223

224
  # Drains :update_v1 messages already waiting in the mailbox, passing each to
  # the callback module. Draining continues only while the callback answers
  # `{:noreply, state}`; any other answer is returned as the final result.
  defp handle_update_v1_immediately(%{doc: doc, module: module} = state) do
    receive do
      {:update_v1, update, origin, __MODULE__} ->
        with {:noreply, next_state} <- module.handle_update_v1(doc, update, origin, state) do
          handle_update_v1_immediately(next_state)
        end
    after
      # Zero timeout: never wait for messages that have not arrived yet.
      0 -> {:noreply, state}
    end
  end
239

240
  # Drains awareness change/update messages already waiting in the mailbox,
  # passing each to the matching callback. Draining continues only while the
  # callback answers `{:noreply, state}`; any other answer is returned as the
  # final result.
  defp handle_awareness_event_immediately(%{awareness: awareness, module: module} = state) do
    receive do
      {:awareness_change, change, origin, __MODULE__} ->
        with {:noreply, next_state} <-
               module.handle_awareness_change(awareness, change, origin, state) do
          handle_awareness_event_immediately(next_state)
        end

      {:awareness_update, change, origin, __MODULE__} ->
        with {:noreply, next_state} <-
               module.handle_awareness_update(awareness, change, origin, state) do
          handle_awareness_event_immediately(next_state)
        end
    after
      # Zero timeout: never wait for messages that have not arrived yet.
      0 -> {:noreply, state}
    end
  end
264
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc