• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agentjido / jido / e1279ae55e92f7e5c813f99ddc4b8f682d1e39d8-PR-19

07 Apr 2025 11:08PM UTC coverage: 74.129% (-0.1%) from 74.224%
e1279ae55e92f7e5c813f99ddc4b8f682d1e39d8-PR-19

Pull #19

github

web-flow
deps(deps-dev): bump mimic from 1.11.1 to 1.11.2

Bumps [mimic](https://github.com/edgurgel/mimic) from 1.11.1 to 1.11.2.
- [Release notes](https://github.com/edgurgel/mimic/releases)
- [Commits](https://github.com/edgurgel/mimic/compare/v1.11.1...v1.11.2)

---
updated-dependencies:
- dependency-name: mimic
  dependency-version: 1.11.2
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #19: deps(deps-dev): bump mimic from 1.11.1 to 1.11.2

2341 of 3158 relevant lines covered (74.13%)

24381.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.46
/lib/jido/signal/journal.ex
1
defmodule Jido.Signal.Journal do
2
  @moduledoc """
3
  The Signal Journal tracks and manages signals between agents, maintaining causality
4
  and conversation relationships. It provides a directed graph of signals that captures
5
  temporal ordering and causal relationships.
6
  """
7
  use TypedStruct
8
  alias Jido.Signal
9

10
  typedstruct do
1✔
11
    @typedoc "The journal maintains the graph of signals and their relationships"
12
    field(:adapter, module(), enforce: true)
13
    field(:adapter_pid, pid())
14
  end
15

16
  @type query_opts :: [
17
          type: String.t() | nil,
18
          source: String.t() | nil,
19
          after: DateTime.t() | nil,
20
          before: DateTime.t() | nil
21
        ]
22

23
  @doc """
24
  Creates a new journal with the specified persistence adapter.
25
  """
26
  @spec new(module()) :: t()
27
  def new(adapter \\ Jido.Signal.Journal.Adapters.InMemory) do
28
    case adapter.init() do
19✔
29
      {:ok, pid} when adapter == Jido.Signal.Journal.Adapters.ETS ->
30
        %__MODULE__{adapter: adapter, adapter_pid: pid}
×
31

32
      :ok ->
33
        %__MODULE__{adapter: adapter}
19✔
34

35
      error ->
36
        error
×
37
    end
38
  end
39

40
  @doc """
41
  Records a new signal in the journal.
42

43
  ## Parameters
44
    * journal - The current journal state
45
    * signal - The signal to record
46
    * cause_id - Optional ID of the signal that caused this one
47

48
  Returns `{:ok, journal}` or `{:error, reason}`
49
  """
50
  @spec record(t(), Signal.t(), String.t() | nil) :: {:ok, t()} | {:error, atom()}
51
  def record(journal, %Signal{} = signal, cause_id \\ nil) do
52
    with :ok <- validate_causality(journal, signal, cause_id),
34✔
53
         :ok <- put_signal(journal, signal),
31✔
54
         :ok <- add_to_conversation(journal, signal),
31✔
55
         :ok <- maybe_add_causality(journal, signal, cause_id) do
31✔
56
      {:ok, journal}
57
    end
58
  end
59

60
  @doc """
61
  Gets all signals in a conversation.
62

63
  ## Parameters
64
    * journal - The current journal state
65
    * conversation_id - The ID of the conversation to fetch
66

67
  Returns a list of signals in chronological order
68
  """
69
  @spec get_conversation(t(), String.t()) :: [Signal.t()]
70
  def get_conversation(%__MODULE__{} = journal, conversation_id) do
71
    case call_adapter(journal, :get_conversation, [conversation_id]) do
4✔
72
      {:ok, signal_ids} ->
73
        signal_ids
74
        |> MapSet.to_list()
75
        |> Task.async_stream(fn id ->
76
          case call_adapter(journal, :get_signal, [id]) do
5✔
77
            {:ok, signal} -> signal
5✔
78
            _ -> nil
×
79
          end
80
        end)
81
        |> Stream.map(fn {:ok, signal} -> signal end)
5✔
82
        |> Stream.reject(&is_nil/1)
5✔
83
        |> Enum.sort_by(& &1.time, &sort_time_compare/2)
4✔
84

85
      _ ->
×
86
        []
87
    end
88
  end
89

90
  @doc """
91
  Gets all effects (signals caused by) of a given signal.
92

93
  ## Parameters
94
    * journal - The current journal state
95
    * signal_id - The ID of the signal to get effects for
96

97
  Returns a list of signals in chronological order
98
  """
99
  @spec get_effects(t(), String.t()) :: [Signal.t()]
100
  def get_effects(%__MODULE__{} = journal, signal_id) do
101
    case call_adapter(journal, :get_effects, [signal_id]) do
11✔
102
      {:ok, effect_ids} ->
103
        effect_ids
104
        |> MapSet.to_list()
105
        |> Task.async_stream(fn id ->
106
          case call_adapter(journal, :get_signal, [id]) do
6✔
107
            {:ok, signal} -> signal
6✔
108
            _ -> nil
×
109
          end
110
        end)
111
        |> Stream.map(fn {:ok, signal} -> signal end)
6✔
112
        |> Stream.reject(&is_nil/1)
6✔
113
        |> Enum.sort_by(& &1.time, &sort_time_compare/2)
11✔
114

115
      _ ->
×
116
        []
117
    end
118
  end
119

120
  @doc """
121
  Gets the cause of a given signal.
122

123
  ## Parameters
124
    * journal - The current journal state
125
    * signal_id - The ID of the signal to get the cause for
126

127
  Returns the causing signal or nil if none exists
128
  """
129
  @spec get_cause(t(), String.t()) :: Signal.t() | nil
130
  def get_cause(%__MODULE__{} = journal, signal_id) do
131
    with {:ok, cause_id} <- call_adapter(journal, :get_cause, [signal_id]),
26✔
132
         {:ok, signal} <- call_adapter(journal, :get_signal, [cause_id]) do
10✔
133
      signal
10✔
134
    else
135
      _ -> nil
136
    end
137
  end
138

139
  @doc """
140
  Traces the complete causal chain starting from a signal.
141

142
  ## Parameters
143
    * journal - The current journal state
144
    * signal_id - The ID of the signal to trace from
145
    * direction - :forward for effects chain, :backward for causes chain
146

147
  Returns a list of signals in causal order
148
  """
149
  @spec trace_chain(t(), String.t(), :forward | :backward) :: [Signal.t()]
150
  def trace_chain(journal, signal_id, direction \\ :forward) do
151
    case call_adapter(journal, :get_signal, [signal_id]) do
15✔
152
      {:ok, signal} ->
153
        do_trace_chain(journal, [signal], direction, MapSet.new([signal_id]))
15✔
154

155
      _ ->
×
156
        []
157
    end
158
  end
159

160
  @doc """
161
  Queries signals based on criteria.
162

163
  ## Options
164
    * type - Filter by signal type
165
    * source - Filter by signal source
166
    * after - Filter signals after this time
167
    * before - Filter signals before this time
168

169
  Returns a list of signals matching all criteria, in chronological order
170
  """
171
  @spec query(t(), query_opts()) :: [Signal.t()]
172
  def query(%__MODULE__{} = journal, opts \\ []) do
173
    # Note: This is inefficient for large datasets as it loads all signals
174
    # A real implementation would push filtering down to the persistence layer
175
    get_all_signals(journal)
176
    |> Enum.filter(&matches_criteria?(&1, opts))
8✔
177
    |> Enum.sort_by(& &1.time, &sort_time_compare/2)
4✔
178
  end
179

180
  # Private helpers
181

182
  defp validate_causality(_journal, _signal, nil), do: :ok
21✔
183

184
  defp validate_causality(journal, signal, cause_id) do
185
    case call_adapter(journal, :get_signal, [cause_id]) do
13✔
186
      {:ok, cause} ->
187
        cond do
12✔
188
          # Would create a cycle
189
          would_create_cycle?(journal, signal.id, cause_id) ->
12✔
190
            {:error, :causality_cycle}
191

192
          # Cause is chronologically after the effect
193
          time_compare(signal.time, cause.time) == :lt ->
11✔
194
            {:error, :invalid_temporal_order}
195

196
          true ->
10✔
197
            :ok
198
        end
199

200
      {:error, :not_found} ->
1✔
201
        {:error, :cause_not_found}
202

203
      error ->
204
        error
×
205
    end
206
  end
207

208
  defp would_create_cycle?(journal, effect_id, cause_id) do
209
    # Check if the effect is already in the cause's chain
210
    cause_chain = trace_chain(journal, cause_id, :backward)
12✔
211
    Enum.any?(cause_chain, &(&1.id == effect_id))
12✔
212
  end
213

214
  defp add_to_conversation(journal, signal) do
215
    conversation_id = signal.subject || "default"
31✔
216
    call_adapter(journal, :put_conversation, [conversation_id, signal.id])
31✔
217
  end
218

219
  defp maybe_add_causality(_journal, _signal, nil), do: :ok
21✔
220

221
  defp maybe_add_causality(journal, signal, cause_id) do
222
    call_adapter(journal, :put_cause, [cause_id, signal.id])
10✔
223
  end
224

225
  defp put_signal(journal, signal) do
226
    call_adapter(journal, :put_signal, [signal])
31✔
227
  end
228

229
  defp get_all_signals(%__MODULE__{adapter: Jido.Signal.Journal.Adapters.ETS} = journal) do
230
    call_adapter(journal, :get_all_signals, [])
×
231
  end
232

233
  defp get_all_signals(%__MODULE__{adapter: adapter}) do
234
    adapter.get_all_signals()
4✔
235
  end
236

237
  defp call_adapter({:error, {:already_started, pid}}, function, args) do
238
    apply(Jido.Signal.Journal.Adapters.ETS, function, args ++ [pid])
×
239
  end
240

241
  defp call_adapter(
242
         %__MODULE__{adapter: Jido.Signal.Journal.Adapters.ETS, adapter_pid: pid} = _journal,
243
         function,
244
         args
245
       ) do
246
    apply(Jido.Signal.Journal.Adapters.ETS, function, args ++ [pid])
×
247
  end
248

249
  defp call_adapter(%__MODULE__{adapter: adapter} = _journal, function, args) do
250
    apply(adapter, function, args)
162✔
251
  end
252

253
  defp do_trace_chain(_journal, chain, _direction, _visited) when length(chain) > 100 do
254
    # Prevent infinite recursion by limiting chain length
255
    chain
×
256
  end
257

258
  defp do_trace_chain(journal, chain, direction, visited) do
259
    current = List.last(chain)
27✔
260

261
    next_signals =
27✔
262
      case direction do
263
        :forward -> get_effects(journal, current.id)
6✔
264
        :backward -> [get_cause(journal, current.id)]
21✔
265
      end
266
      |> Enum.reject(&is_nil/1)
25✔
267
      |> Enum.reject(&MapSet.member?(visited, &1.id))
12✔
268

269
    case next_signals do
27✔
270
      [] ->
271
        chain
16✔
272

273
      signals ->
274
        new_visited = Enum.reduce(signals, visited, &MapSet.put(&2, &1.id))
11✔
275

276
        Enum.reduce(signals, chain, fn signal, acc ->
11✔
277
          do_trace_chain(journal, acc ++ [signal], direction, new_visited)
12✔
278
        end)
279
    end
280
  end
281

282
  defp matches_criteria?(signal, opts) do
283
    Enum.all?([
8✔
284
      matches_type?(signal, opts[:type]),
285
      matches_source?(signal, opts[:source]),
286
      matches_time_range?(signal, opts[:after], opts[:before])
287
    ])
288
  end
289

290
  defp matches_type?(_signal, nil), do: true
5✔
291
  defp matches_type?(signal, type), do: signal.type == type
3✔
292

293
  defp matches_source?(_signal, nil), do: true
3✔
294
  defp matches_source?(signal, source), do: signal.source == source
5✔
295

296
  defp matches_time_range?(signal, after_time, before_time) do
297
    matches_after?(signal, after_time) and matches_before?(signal, before_time)
8✔
298
  end
299

300
  defp matches_after?(_signal, nil), do: true
6✔
301
  defp matches_after?(signal, after_time), do: time_compare(signal.time, after_time) in [:gt, :eq]
2✔
302

303
  defp matches_before?(_signal, nil), do: true
7✔
304
  defp matches_before?(signal, before_time), do: time_compare(signal.time, before_time) == :lt
×
305

306
  # Time comparison helpers
307
  defp time_compare(time1, time2) when is_binary(time1) and is_binary(time2) do
308
    {:ok, dt1, _} = DateTime.from_iso8601(time1)
14✔
309
    {:ok, dt2, _} = DateTime.from_iso8601(time2)
14✔
310
    DateTime.compare(dt1, dt2)
14✔
311
  end
312

313
  defp time_compare(time1, %DateTime{} = time2) when is_binary(time1) do
314
    {:ok, dt1, _} = DateTime.from_iso8601(time1)
2✔
315
    DateTime.compare(dt1, time2)
2✔
316
  end
317

318
  defp time_compare(%DateTime{} = time1, time2) when is_binary(time2) do
319
    {:ok, dt2, _} = DateTime.from_iso8601(time2)
×
320
    DateTime.compare(time1, dt2)
×
321
  end
322

323
  defp time_compare(%DateTime{} = time1, %DateTime{} = time2) do
324
    DateTime.compare(time1, time2)
×
325
  end
326

327
  # Sorting comparison function
328
  defp sort_time_compare(time1, time2) do
329
    case time_compare(time1, time2) do
3✔
330
      :lt -> true
2✔
331
      :eq -> true
×
332
      :gt -> false
1✔
333
    end
334
  end
335
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc