• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

whitfin / cachex / 64ffdb79131a3eb9e15e483ea81eedd63160a005-PR-426

30 Oct 2025 04:39AM UTC coverage: 99.042% (-1.0%) from 100.0%
64ffdb79131a3eb9e15e483ea81eedd63160a005-PR-426

Pull #426

github

whitfin
Remove tagging from inspection and commands
Pull Request #426: Simplify and naturalize API signatures and return types

68 of 74 new or added lines in 25 files covered. (91.89%)

2 existing lines in 2 files now uncovered.

827 of 835 relevant lines covered (99.04%)

1054.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.25
/lib/cachex/router.ex
1
defmodule Cachex.Router do
2
  @moduledoc """
3
  Module controlling routing behaviour definitions.
4

5
  This module defines the router implementations for Cachex, allowing the user
6
  to route commands between nodes in a cache cluster. This means that users
7
  can provide their own routing and rebalancing logic without having to depend
8
  on it being included in Cachex.
9
  """
10
  alias Cachex.Router
11
  alias Cachex.Services
12

13
  # add some service aliases
14
  alias Services.Informant
15
  alias Services.Overseer
16

17
  # import macro stuff
18
  import Cachex.Error
19
  import Cachex.Spec
20

21
  #############
22
  # Behaviour #
23
  #############
24

25
  @doc """
26
  Initialize a routing state for a cache.
27

28
  Please see all child implementations for supported options.
29
  """
30
  @callback init(cache :: Cachex.t(), options :: Keyword.t()) :: any
31

32
  @doc """
33
  Retrieve the list of nodes from a routing state.
34
  """
35
  @callback nodes(state :: any) :: [atom]
36

37
  @doc """
38
  Route a key to a node in a routing state.
39
  """
40
  @callback route(state :: any, key :: any) :: atom
41

42
  @doc """
43
  Create a child specification to back a routing state.
44
  """
45
  @callback children(cache :: Cachex.t(), options :: Keyword.t()) ::
46
              [Supervisor.child_spec()]
47

48
  ##################
49
  # Implementation #
50
  ##################
51

52
  @doc false
53
  defmacro __using__(_) do
54
    quote location: :keep, generated: true do
55
      @behaviour Cachex.Router
56

57
      @doc false
58
      def init(cache, options \\ []),
59
        do: nil
60

61
      @doc false
62
      def children(cache, options),
63
        do: []
64

65
      # state modifiers are overridable
66
      defoverridable init: 2, children: 2
67
    end
68
  end
69

70
  ##############
71
  # Public API #
72
  ##############
73

74
  @doc """
75
  Retrieve all currently connected nodes (including this one).
76
  """
77
  @spec connected() :: [atom]
78
  def connected(),
40✔
79
    do: [node() | :erlang.nodes(:connected)]
80

81
  @doc """
82
  Retrieve all routable nodes for a cache.
83
  """
84
  @spec nodes(cache :: Cachex.t()) :: {:ok, [atom]}
85
  def nodes(cache(router: router(module: module, state: state))),
14✔
86
    do: {:ok, module.nodes(state)}
87

88
  @doc """
89
  Executes a previously dispatched action..
90
  """
91
  # The first match short circuits local-only caches
92
  @spec route(Cachex.t(), atom, {atom, [any]}) :: any
93
  def route(cache(router: router(module: Router.Local)) = cache, module, call),
94
    do: route_local(cache, module, call)
20,874✔
95

96
  def route(cache() = cache, module, call),
97
    do: route_cluster(cache, module, call)
272✔
98

99
  @doc """
100
  Dispatches a call to an appropriate execution environment.
101

102
  This acts as a macro just to avoid the overhead of slicing up module
103
  names at runtime, when they can be guaranteed at compile time much
104
  more easily.
105
  """
106
  defmacro route(cache, {action, _arguments} = call) do
107
    # coveralls-ignore-start
108
    act_name =
109
      action
110
      |> Kernel.to_string()
111
      |> String.replace_trailing("?", "")
112
      |> Macro.camelize()
113

114
    act_join = :"Elixir.Cachex.Actions.#{act_name}"
115
    # coveralls-ignore-stop
116

117
    quote do
118
      Overseer.with(unquote(cache), fn cache ->
119
        call = unquote(call)
120
        module = unquote(act_join)
121

122
        Router.route(cache, module, call)
123
      end)
124
    end
125
  end
126

127
  ###############
128
  # Private API #
129
  ###############
130

131
  # Results merging for distributed cache results.
132
  #
133
  # Follows these rules:
134
  #
135
  # - Lists are always concatenated.
136
  # - Numbers are always summed.
137
  # - Booleans are always AND-ed.
138
  # - Maps are always merged (recursively).
139
  #
140
  # This has to be public due to scopes, but we hide the docs
141
  # because we don't really care for anybody else calling it.
142
  defp result_merge(left, right) when is_list(left),
143
    do: left ++ right
9✔
144

145
  defp result_merge(left, right) when is_number(left),
146
    do: left + right
25✔
147

148
  defp result_merge(left, right) when is_boolean(left),
149
    do: left && right
4✔
150

151
  # coveralls-ignore-start
152
  defp result_merge(left, right) when is_map(left) do
153
    Map.merge(left, right, fn _, left, right ->
154
      result_merge(left, right)
155
    end)
156
  end
157

158
  # coveralls-ignore-stop
159

160
  # Provides handling for local actions on this node.
161
  #
162
  # This will provide handling of notifications across hooks before and after
163
  # the execution of an action. This is taken from code formerly in the old
164
  # `Cachex.Actions` module, but has been moved here as it's more appropriate.
165
  #
166
  # If `notify` is set to false, notifications are disabled and the call is
167
  # simply executed as is. If `via` is provided, you can override the handle
168
  # passed to the hooks (useful for re-use of functions). An example of this
169
  # is `decr/4` which simply calls `incr/4` with `via: { :decr, arguments }`.
170
  defp route_local(cache, module, {_action, arguments} = call) do
171
    option = List.last(arguments)
21,093✔
172
    notify = Keyword.get(option, :notify, true)
21,093✔
173

174
    message =
21,093✔
175
      notify &&
527✔
176
        case option[:via] do
20,566✔
177
          msg when not is_tuple(msg) -> call
20,543✔
178
          msg -> msg
23✔
179
        end
180

181
    notify && Informant.broadcast(cache, message)
21,093✔
182
    result = apply(module, :execute, [cache | arguments])
21,093✔
183

184
    if notify do
21,092✔
185
      Informant.broadcast(
20,566✔
186
        cache,
187
        message,
188
        Keyword.get(option, :result, result)
189
      )
190
    end
191

192
    result
21,092✔
193
  end
194

195
  # actions based on a key
196
  @keyed_actions [
197
    :del,
198
    :exists?,
199
    :expire,
200
    :fetch,
201
    :get,
202
    :get_and_update,
203
    :incr,
204
    :invoke,
205
    :put,
206
    :refresh,
207
    :take,
208
    :touch,
209
    :ttl,
210
    :update
211
  ]
212

213
  # Provides handling to key-based actions distributed to remote nodes.
214
  #
215
  # The algorithm here is simple; hash the key and slot the value using JCH into
216
  # the total number of slots available (i.e. the count of the nodes). If it comes
217
  # out to the local node, just execute the local code, otherwise RPC the base call
218
  # to the remote node, and just assume that it'll correctly handle it.
219
  defp route_cluster(cache, module, {action, [key | _]} = call) when action in @keyed_actions do
220
    cache(router: router(module: router, state: nodes)) = cache
102✔
221
    route_node(cache, module, call, router.route(nodes, key))
102✔
222
  end
223

224
  # actions which merge outputs
225
  @merge_actions [
226
    :clear,
227
    :count,
228
    :empty?,
229
    :export,
230
    :import,
231
    :keys,
232
    :purge,
233
    :reset,
234
    :size
235
  ]
236

237
  # Provides handling of cross-node actions distributed over remote nodes.
238
  #
239
  # This will do an RPC call across all nodes to fetch their results and merge
240
  # them with the results on the local node. The hooks will only be notified
241
  # on the local node, due to an annoying recursion issue when handling the
242
  # same across all nodes - seems to provide better logic though.
243
  defp route_cluster(cache, module, {action, arguments} = call) when action in @merge_actions do
244
    # fetch the nodes from the cluster state
245
    cache(router: router(module: router, state: state)) = cache
65✔
246

247
    # all calls have options we can use
248
    options = List.last(arguments)
65✔
249

250
    # can force local node setting local: true
251
    results =
65✔
252
      case Keyword.get(options, :local) do
253
        true ->
27✔
254
          []
255

256
        _any ->
257
          # don't want to execute on the local node
258
          other_nodes =
38✔
259
            state
260
            |> router.nodes()
261
            |> List.delete(node())
262

263
          # execute the call on all other nodes
264
          {results, _} =
38✔
265
            :rpc.multicall(
266
              other_nodes,
267
              module,
268
              :execute,
269
              [cache | arguments]
270
            )
271

272
          results
38✔
273
      end
274

275
    # execution on the local node, using the local macros and then unpack
276
    result = route_local(cache, module, call)
65✔
277
    tagged = match?({:ok, _}, result)
65✔
278

279
    mapped = fn
65✔
NEW
280
      {:ok, v} -> v
×
281
      v -> v
103✔
282
    end
283

284
    # TODO: patched for migration
285
    result = mapped.(result)
65✔
286
    results = Enum.map(results, mapped)
65✔
287

288
    # results merge
289
    merge_result = Enum.reduce(results, result, &result_merge/2)
65✔
290

291
    # return after merge
292
    if tagged do
65✔
293
      {:ok, merge_result}
294
    else
295
      merge_result
65✔
296
    end
297
  end
298

299
  # actions which always run locally
300
  @local_actions [
301
    :inspect,
302
    :prune,
303
    :restore,
304
    :save,
305
    :stats,
306
    :warm
307
  ]
308

309
  # Provides handling of `:inspect` operations.
310
  #
311
  # These operations are guaranteed to run on the local nodes.
312
  defp route_cluster(cache, module, {action, _arguments} = call)
313
       when action in @local_actions,
314
       do: route_local(cache, module, call)
90✔
315

316
  # Provides handling of `:put_many` operations.
317
  #
318
  # These operations can only execute if their keys slot to the same remote nodes.
319
  defp route_cluster(cache, module, {:put_many, _arguments} = call),
320
    do: route_batch(cache, module, call, &elem(&1, 0))
2✔
321

322
  # Provides handling of `:transaction` operations.
323
  #
324
  # These operations can only execute if their keys slot to the same remote nodes.
325
  defp route_cluster(cache, module, {:transaction, [[] | _]} = call),
326
    do: route_local(cache, module, call)
1✔
327

328
  defp route_cluster(cache, module, {:transaction, [_keys | _]} = call),
329
    do: route_batch(cache, module, call, & &1)
2✔
330

331
  # Any other actions are only available with local: true in the call
332
  defp route_cluster(cache, module, {_action, arguments} = call) do
333
    # all calls have options we can use
334
    options = List.last(arguments)
10✔
335

336
    # can force local node setting local: true
337
    case Keyword.get(options, :local) do
10✔
338
      true -> route_local(cache, module, call)
9✔
339
      _any -> error(:non_distributed)
1✔
340
    end
341
  end
342

343
  # coveralls-ignore-start
344
  # Catch-all just in case we missed something...
345
  defp route_cluster(_cache, _module, _call),
346
    do: error(:non_distributed)
347

348
  # coveralls-ignore-stop
349

350
  # Calls a slot for the provided cache action if all keys slot to the same node.
351
  #
352
  # This is a delegate handler for `route_node/4`, but ensures that all keys slot to the
353
  # same node to avoid the case where we have to fork a call out internally.
354
  defp route_batch(cache, module, {_action, [keys | _]} = call, mapper) do
355
    # map all keys to a slot in the nodes list
356
    cache(router: router(module: router, state: state)) = cache
4✔
357
    slots = Enum.map(keys, &router.route(state, mapper.(&1)))
4✔
358

359
    # unique to avoid dups
360
    case Enum.uniq(slots) do
4✔
361
      # if there's a single slot it's safe to continue with the call to the remote
362
      [slot] ->
363
        route_node(cache, module, call, slot)
2✔
364

365
      # otherwise, cross_slot errors!
366
      _disable ->
2✔
367
        error(:cross_slot)
368
    end
369
  end
370

371
  # Calls a node for the provided cache action.
372
  #
373
  # This will determine a local slot and delegate locally if so, bypassing
374
  # any RPC calls in order to gain a slight bit of performance.
375
  defp route_node(cache, module, {action, arguments} = call, node) do
376
    current = node()
104✔
377
    cache(name: name) = cache
104✔
378

379
    case node do
104✔
380
      ^current ->
381
        route_local(cache, module, call)
54✔
382

383
      targeted ->
384
        result =
50✔
385
          :rpc.call(
386
            targeted,
387
            Cachex,
388
            action,
389
            [name | arguments]
390
          )
391

392
        with {:badrpc, reason} <- result do
50✔
393
          {:error, reason}
394
        end
395
    end
396
  end
397
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc