• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nshkrdotcom / ElixirScope / 87445e125cf1a04f5bcd3c5e00921afcd6013b1e

28 May 2025 02:48PM UTC coverage: 60.612% (+1.4%) from 59.229%
87445e125cf1a04f5bcd3c5e00921afcd6013b1e

push

github

NSHkr
Integration & Runtime Correlatio 1021 tests, 0 failures, 76 excluded

507 of 688 new or added lines in 5 files covered. (73.69%)

3 existing lines in 2 files now uncovered.

5466 of 9018 relevant lines covered (60.61%)

3689.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.71
/lib/elixir_scope/storage/data_access.ex
1
defmodule ElixirScope.Storage.DataAccess do
2
  @moduledoc """
3
  High-performance ETS-based storage for ElixirScope events.
4
  
5
  Provides multiple indexes for fast querying across different dimensions:
6
  - Primary index: Event ID -> Event
7
  - Temporal index: Timestamp -> Event ID
8
  - Process index: PID -> [Event IDs]
9
  - Function index: {Module, Function} -> [Event IDs]
10
  - Correlation index: Correlation ID -> [Event IDs]
11
  
12
  Designed for high write throughput and fast range queries.
13
  """
14

15
  alias ElixirScope.Events
16
  alias ElixirScope.Utils
17

18
  @type table_name :: atom()
19
  @type event_id :: binary()
20
  @type query_options :: keyword()
21

22
  defstruct [
23
    :name,
24
    :primary_table,
25
    :temporal_index,
26
    :process_index,
27
    :function_index,
28
    :correlation_index,
29
    :stats_table
30
  ]
31

32
  @type t :: %__MODULE__{
33
    name: table_name(),
34
    primary_table: :ets.tid(),
35
    temporal_index: :ets.tid(),
36
    process_index: :ets.tid(),
37
    function_index: :ets.tid(),
38
    correlation_index: :ets.tid(),
39
    stats_table: :ets.tid()
40
  }
41

42
  # Table configurations
43
  @primary_opts [:set, :public, {:read_concurrency, true}, {:write_concurrency, true}]
44
  @index_opts [:bag, :public, {:read_concurrency, true}, {:write_concurrency, true}]
45
  @stats_opts [:set, :public, {:read_concurrency, true}, {:write_concurrency, true}]
46

47
  @doc """
48
  Creates a new data access instance with ETS tables.
49
  
50
  ## Options
51
  - `:name` - Base name for the tables (default: generates unique name)
52
  - `:max_events` - Maximum number of events to store (default: 1_000_000)
53
  """
54
  @spec new(keyword()) :: {:ok, t()} | {:error, term()}
55
  def new(opts \\ []) do
56
    name = Keyword.get(opts, :name, generate_table_name())
62✔
57
    max_events = Keyword.get(opts, :max_events, 1_000_000)
62✔
58
    
59
    try do
62✔
60
      # Create primary event storage table
61
      primary_table = :ets.new(:"#{name}_events", @primary_opts)
62✔
62
      
63
      # Create index tables
64
      temporal_index = :ets.new(:"#{name}_temporal", @index_opts)
62✔
65
      process_index = :ets.new(:"#{name}_process", @index_opts)
62✔
66
      function_index = :ets.new(:"#{name}_function", @index_opts)
62✔
67
      correlation_index = :ets.new(:"#{name}_correlation", @index_opts)
62✔
68
      
69
      # Create stats table
70
      stats_table = :ets.new(:"#{name}_stats", @stats_opts)
62✔
71
      
72
      # Initialize stats
73
      :ets.insert(stats_table, [
62✔
74
        {:total_events, 0},
75
        {:max_events, max_events},
76
        {:oldest_timestamp, nil},
77
        {:newest_timestamp, nil},
78
        {:last_cleanup, Utils.monotonic_timestamp()}
79
      ])
80
      
81
      storage = %__MODULE__{
62✔
82
        name: name,
83
        primary_table: primary_table,
84
        temporal_index: temporal_index,
85
        process_index: process_index,
86
        function_index: function_index,
87
        correlation_index: correlation_index,
88
        stats_table: stats_table
89
      }
90
      
91
      {:ok, storage}
92
    rescue
93
      error -> {:error, {:table_creation_failed, error}}
×
94
    end
95
  end
96

97
  @doc """
98
  Stores an event in the data access layer.
99
  
100
  Automatically creates all necessary indexes for fast querying.
101
  """
102
  @spec store_event(t(), Events.event()) :: :ok | {:error, term()}
103
  def store_event(%__MODULE__{} = storage, event) do
104
    try do
2,506✔
105
      event_id = event.id
2,506✔
106
      timestamp = event.timestamp
2,505✔
107
      
108
      # Store in primary table
109
      :ets.insert(storage.primary_table, {event_id, event})
2,505✔
110
      
111
      # Update temporal index
112
      :ets.insert(storage.temporal_index, {timestamp, event_id})
2,505✔
113
      
114
      # Update process index if event has PID
115
      case extract_pid(event) do
2,505✔
116
        nil -> :ok
2,502✔
117
        pid -> :ets.insert(storage.process_index, {pid, event_id})
3✔
118
      end
119
      
120
      # Update function index if event has function info
121
      case extract_function_info(event) do
2,505✔
122
        nil -> :ok
2✔
123
        {module, function} -> :ets.insert(storage.function_index, {{module, function}, event_id})
2,503✔
124
      end
125
      
126
      # Update correlation index if event has correlation ID
127
      case extract_correlation_id(event) do
2,505✔
128
        nil -> :ok
2,505✔
129
        correlation_id -> :ets.insert(storage.correlation_index, {correlation_id, event_id})
×
130
      end
131
      
132
      # Update statistics
133
      update_stats(storage, timestamp)
2,505✔
134
      
135
      :ok
136
    rescue
137
      error -> {:error, {:storage_failed, error}}
1✔
138
    end
139
  end
140

141
  @doc """
142
  Stores multiple events in batch for better performance.
143
  """
144
  @spec store_events(t(), [Events.event()]) :: {:ok, non_neg_integer()} | {:error, term()}
145
  def store_events(%__MODULE__{} = storage, events) when is_list(events) do
146
    # Handle empty list case
147
    if events == [] do
23✔
148
      {:ok, 0}
149
    else
150
      try do
22✔
151
        # Prepare all inserts
152
        primary_inserts = Enum.map(events, &{&1.id, &1})
22✔
153
        temporal_inserts = Enum.map(events, &{&1.timestamp, &1.id})
22✔
154
        
155
        process_inserts = events
22✔
156
          |> Enum.map(&{extract_pid(&1), &1.id})
13,242✔
157
          |> Enum.reject(&(elem(&1, 0) == nil))
13,242✔
158
        
159
        function_inserts = events
22✔
160
          |> Enum.map(&{extract_function_info(&1), &1.id})
13,242✔
161
          |> Enum.reject(&(elem(&1, 0) == nil))
13,242✔
162
        
163
        correlation_inserts = events
22✔
164
          |> Enum.map(&{extract_correlation_id(&1), &1.id})
13,242✔
165
          |> Enum.reject(&(elem(&1, 0) == nil))
13,242✔
166
        
167
        # Batch insert into all tables
168
        :ets.insert(storage.primary_table, primary_inserts)
22✔
169
        :ets.insert(storage.temporal_index, temporal_inserts)
22✔
170
        
171
        if length(process_inserts) > 0 do
22✔
172
          :ets.insert(storage.process_index, process_inserts)
5✔
173
        end
174
        
175
        if length(function_inserts) > 0 do
22✔
176
          :ets.insert(storage.function_index, function_inserts)
22✔
177
        end
178
        
179
        if length(correlation_inserts) > 0 do
22✔
180
          :ets.insert(storage.correlation_index, correlation_inserts)
3✔
181
        end
182
        
183
        # Update stats with newest timestamp and event count
184
        newest_timestamp = events |> Enum.map(& &1.timestamp) |> Enum.max()
22✔
185
        update_stats_batch(storage, newest_timestamp, length(events))
22✔
186
        
187
        {:ok, length(events)}
188
      rescue
189
        error -> {:error, {:batch_storage_failed, error}}
×
190
      end
191
    end
192
  end
193

194
  @doc """
195
  Retrieves an event by its ID.
196
  """
197
  @spec get_event(t(), event_id()) :: {:ok, Events.event()} | {:error, :not_found}
198
  def get_event(%__MODULE__{} = storage, event_id) do
199
    case :ets.lookup(storage.primary_table, event_id) do
15✔
200
      [{^event_id, event}] -> {:ok, event}
14✔
201
      [] -> {:error, :not_found}
1✔
202
    end
203
  end
204

205
  @doc """
206
  Queries events by time range.
207
  
208
  ## Options
209
  - `:limit` - Maximum number of events to return (default: 1000)
210
  - `:order` - `:asc` or `:desc` (default: `:asc`)
211
  """
212
  @spec query_by_time_range(t(), non_neg_integer(), non_neg_integer(), query_options()) :: 
213
    {:ok, [Events.event()]} | {:error, term()}
214
  def query_by_time_range(%__MODULE__{} = storage, start_time, end_time, opts \\ []) do
215
    limit = Keyword.get(opts, :limit, 1000)
7✔
216
    order = Keyword.get(opts, :order, :asc)
7✔
217
    
218
    try do
7✔
219
      # Get event IDs in time range
220
      event_ids = get_events_in_time_range(storage.temporal_index, start_time, end_time, limit, order)
7✔
221
      
222
      # Fetch actual events
223
      events = Enum.map(event_ids, fn event_id ->
7✔
224
        [{^event_id, event}] = :ets.lookup(storage.primary_table, event_id)
1,023✔
225
        event
1,023✔
226
      end)
227
      
228
      {:ok, events}
229
    rescue
230
      error -> {:error, {:query_failed, error}}
×
231
    end
232
  end
233

234
  @doc """
235
  Queries events by process ID.
236
  """
237
  @spec query_by_process(t(), pid(), query_options()) :: {:ok, [Events.event()]} | {:error, term()}
238
  def query_by_process(%__MODULE__{} = storage, pid, opts \\ []) do
239
    limit = Keyword.get(opts, :limit, 1000)
9✔
240
    
241
    try do
9✔
242
      # Get event IDs for this process
243
      event_ids = :ets.lookup(storage.process_index, pid)
9✔
244
        |> Enum.map(&elem(&1, 1))
10,004✔
245
        |> Enum.take(limit)
246
      
247
      # Fetch actual events
248
      events = Enum.map(event_ids, fn event_id ->
9✔
249
        [{^event_id, event}] = :ets.lookup(storage.primary_table, event_id)
1,003✔
250
        event
1,003✔
251
      end)
252
      
253
      {:ok, events}
254
    rescue
255
      error -> {:error, {:query_failed, error}}
×
256
    end
257
  end
258

259
  @doc """
260
  Queries events by function.
261
  """
262
  @spec query_by_function(t(), module(), atom(), query_options()) :: {:ok, [Events.event()]} | {:error, term()}
263
  def query_by_function(%__MODULE__{} = storage, module, function, opts \\ []) do
264
    limit = Keyword.get(opts, :limit, 1000)
3✔
265
    
266
    try do
3✔
267
      # Get event IDs for this function
268
      event_ids = :ets.lookup(storage.function_index, {module, function})
3✔
269
        |> Enum.map(&elem(&1, 1))
4✔
270
        |> Enum.take(limit)
271
      
272
      # Fetch actual events
273
      events = Enum.map(event_ids, fn event_id ->
3✔
274
        [{^event_id, event}] = :ets.lookup(storage.primary_table, event_id)
3✔
275
        event
3✔
276
      end)
277
      
278
      {:ok, events}
279
    rescue
280
      error -> {:error, {:query_failed, error}}
×
281
    end
282
  end
283

284
  @doc """
285
  Queries events by correlation ID.
286
  """
287
  @spec query_by_correlation(t(), term(), query_options()) :: {:ok, [Events.event()]} | {:error, term()}
288
  def query_by_correlation(%__MODULE__{} = storage, correlation_id, opts \\ []) do
289
    limit = Keyword.get(opts, :limit, 1000)
3✔
290
    
291
    try do
3✔
292
      # Get event IDs for this correlation
293
      event_ids = :ets.lookup(storage.correlation_index, correlation_id)
3✔
294
        |> Enum.map(&elem(&1, 1))
4✔
295
        |> Enum.take(limit)
296
      
297
      # Fetch actual events
298
      events = Enum.map(event_ids, fn event_id ->
3✔
299
        [{^event_id, event}] = :ets.lookup(storage.primary_table, event_id)
3✔
300
        event
3✔
301
      end)
302
      
303
      {:ok, events}
304
    rescue
305
      error -> {:error, {:query_failed, error}}
×
306
    end
307
  end
308

309
  @doc """
310
  Gets storage statistics.
311
  """
312
  @spec get_stats(t()) :: %{
313
    total_events: non_neg_integer(),
314
    max_events: non_neg_integer(),
315
    oldest_timestamp: non_neg_integer() | nil,
316
    newest_timestamp: non_neg_integer() | nil,
317
    memory_usage: non_neg_integer()
318
  }
319
  def get_stats(%__MODULE__{} = storage) do
320
    stats = :ets.tab2list(storage.stats_table) |> Map.new()
10✔
321
    
322
    memory_usage = 
10✔
323
      :ets.info(storage.primary_table, :memory) +
10✔
324
      :ets.info(storage.temporal_index, :memory) +
10✔
325
      :ets.info(storage.process_index, :memory) +
10✔
326
      :ets.info(storage.function_index, :memory) +
10✔
327
      :ets.info(storage.correlation_index, :memory)
10✔
328
    
329
    Map.put(stats, :memory_usage, memory_usage * :erlang.system_info(:wordsize))
10✔
330
  end
331

332
  @doc """
333
  Cleans up old events to maintain memory bounds.
334
  
335
  Removes events older than the specified timestamp.
336
  """
337
  @spec cleanup_old_events(t(), non_neg_integer()) :: {:ok, non_neg_integer()} | {:error, term()}
338
  def cleanup_old_events(%__MODULE__{} = storage, cutoff_timestamp) do
339
    try do
3✔
340
      # Find old event IDs
341
      old_event_ids = get_events_before_timestamp(storage.temporal_index, cutoff_timestamp)
3✔
342
      
343
      # Remove from all tables
344
      Enum.each(old_event_ids, fn event_id ->
3✔
345
        # Get event to extract index keys
346
        case :ets.lookup(storage.primary_table, event_id) do
503✔
347
          [{^event_id, event}] ->
348
            # Remove from primary table
349
            :ets.delete(storage.primary_table, event_id)
503✔
350
            
351
            # Remove from indexes
352
            :ets.delete_object(storage.temporal_index, {event.timestamp, event_id})
503✔
353
            
354
            if pid = extract_pid(event) do
503✔
355
              :ets.delete_object(storage.process_index, {pid, event_id})
×
356
            end
357
            
358
            if function_info = extract_function_info(event) do
503✔
359
              :ets.delete_object(storage.function_index, {function_info, event_id})
503✔
360
            end
361
            
362
            if correlation_id = extract_correlation_id(event) do
503✔
363
              :ets.delete_object(storage.correlation_index, {correlation_id, event_id})
×
364
            end
365
            
366
          [] ->
×
367
            # Event already removed
368
            :ok
369
        end
370
      end)
371
      
372
      # Update stats
373
      :ets.update_counter(storage.stats_table, :total_events, -length(old_event_ids))
3✔
374
      :ets.insert(storage.stats_table, {:last_cleanup, Utils.monotonic_timestamp()})
3✔
375
      
376
      {:ok, length(old_event_ids)}
377
    rescue
378
      error -> {:error, {:cleanup_failed, error}}
×
379
    end
380
  end
381

382
  @doc """
383
  Destroys the storage and cleans up all ETS tables.
384
  """
385
  @spec destroy(t()) :: :ok
386
  def destroy(%__MODULE__{} = storage) do
387
    :ets.delete(storage.primary_table)
×
388
    :ets.delete(storage.temporal_index)
×
389
    :ets.delete(storage.process_index)
×
390
    :ets.delete(storage.function_index)
×
391
    :ets.delete(storage.correlation_index)
×
392
    :ets.delete(storage.stats_table)
×
393
    :ok
394
  end
395

396
  @doc """
397
  Queries events since a given timestamp.
398
  """
399
  @spec get_events_since(non_neg_integer()) :: [Events.event()]
400
  def get_events_since(since_timestamp) do
401
    # This assumes we have a global storage instance - in practice this would be managed by a GenServer
402
    case get_default_storage() do
×
403
      {:ok, storage} ->
404
        case query_by_time_range(storage, since_timestamp, Utils.monotonic_timestamp(), limit: 10_000) do
×
405
          {:ok, events} -> events
×
406
          {:error, _} -> []
×
407
        end
408
      {:error, _} -> []
×
409
    end
410
  end
411

412
  @doc """
413
  Checks if an event exists by ID.
414
  """
415
  @spec event_exists?(event_id()) :: boolean()
416
  def event_exists?(event_id) do
417
    case get_default_storage() do
×
418
      {:ok, storage} ->
419
        case get_event(storage, event_id) do
×
420
          {:ok, _} -> true
×
421
          {:error, :not_found} -> false
×
422
        end
423
      {:error, _} -> false
×
424
    end
425
  end
426

427
  @doc """
428
  Stores multiple events (simplified interface).
429
  """
430
  @spec store_events([Events.event()]) :: :ok | {:error, term()}
431
  def store_events(events) when is_list(events) do
432
    case get_default_storage() do
×
433
      {:ok, storage} ->
434
        case store_events(storage, events) do
×
435
          {:ok, _count} -> :ok
×
436
          error -> error
×
437
        end
438
      error -> error
×
439
    end
440
  end
441

442
  @doc """
443
  Gets the current instrumentation plan.
444
  """
445
  @spec get_instrumentation_plan() :: {:ok, map()} | {:error, :not_found}
446
  def get_instrumentation_plan() do
447
    case get_default_storage() do
11✔
448
      {:ok, storage} ->
449
        case :ets.lookup(storage.stats_table, :instrumentation_plan) do
11✔
450
          [{:instrumentation_plan, plan}] -> {:ok, plan}
9✔
451
          [] -> {:error, :not_found}
2✔
452
        end
453
      error -> error
×
454
    end
455
  end
456

457
  @doc """
458
  Stores an instrumentation plan.
459
  """
460
  @spec store_instrumentation_plan(map()) :: :ok | {:error, term()}
461
  def store_instrumentation_plan(plan) do
462
    case get_default_storage() do
8✔
463
      {:ok, storage} ->
464
        :ets.insert(storage.stats_table, {:instrumentation_plan, plan})
8✔
465
        :ok
466
      error -> error
×
467
    end
468
  end
469

470
  # Default storage management (simplified - in production this would be a GenServer)
471
  defp get_default_storage() do
472
    case :persistent_term.get(:elixir_scope_default_storage, nil) do
19✔
473
      nil ->
474
        case new(name: :elixir_scope_default) do
8✔
475
          {:ok, storage} ->
476
            :persistent_term.put(:elixir_scope_default_storage, storage)
8✔
477
            {:ok, storage}
478
          error -> error
×
479
        end
480
      storage -> {:ok, storage}
11✔
481
    end
482
  end
483

484
  # Private functions
485

486
  defp extract_pid(%Events.FunctionExecution{caller_pid: pid}), do: pid
16,245✔
487
  defp extract_pid(%Events.ProcessEvent{pid: pid}), do: pid
4✔
488
  defp extract_pid(%Events.MessageEvent{from_pid: pid}), do: pid
1✔
489
  # StateChange events are wrapped in base event structure, extract from wrapper
490
  defp extract_pid(%ElixirScope.Events{event_type: :state_change, pid: pid}), do: pid
×
491
  # ErrorEvent events are wrapped in base event structure, extract from wrapper
492
  defp extract_pid(%ElixirScope.Events{event_type: :error, pid: pid}), do: pid
×
UNCOV
493
  defp extract_pid(_), do: nil
×
494

495
  defp extract_function_info(%Events.FunctionExecution{module: module, function: function}) 
16,245✔
496
    when not is_nil(module) and not is_nil(function), do: {module, function}
497
  defp extract_function_info(_), do: nil
5✔
498

499
  defp extract_correlation_id(%Events.FunctionExecution{correlation_id: id}), do: id
16,245✔
500
  defp extract_correlation_id(_), do: nil
5✔
501

502
  defp update_stats(storage, timestamp) do
503
    :ets.update_counter(storage.stats_table, :total_events, 1)
2,505✔
504
    
505
    # Update oldest timestamp if this is the first event
506
    case :ets.lookup(storage.stats_table, :oldest_timestamp) do
2,505✔
507
      [{:oldest_timestamp, nil}] ->
508
        :ets.insert(storage.stats_table, {:oldest_timestamp, timestamp})
6✔
509
      _ ->
2,499✔
510
        :ok
511
    end
512
    
513
    # Always update newest timestamp
514
    :ets.insert(storage.stats_table, {:newest_timestamp, timestamp})
2,505✔
515
  end
516

517
  defp update_stats_batch(storage, timestamp, count) do
518
    :ets.update_counter(storage.stats_table, :total_events, count)
22✔
519
    
520
    # Update oldest timestamp if this is the first event
521
    case :ets.lookup(storage.stats_table, :oldest_timestamp) do
22✔
522
      [{:oldest_timestamp, nil}] ->
523
        :ets.insert(storage.stats_table, {:oldest_timestamp, timestamp})
22✔
524
      _ ->
×
525
        :ok
526
    end
527
    
528
    # Always update newest timestamp
529
    :ets.insert(storage.stats_table, {:newest_timestamp, timestamp})
22✔
530
  end
531

532
  defp get_events_in_time_range(temporal_table, start_time, end_time, limit, order) do
533
    # This is a simplified implementation
534
    # In a real system, you'd want more efficient range queries
535
    all_entries = :ets.tab2list(temporal_table)
7✔
536
    
537
    filtered_entries = all_entries
7✔
538
      |> Enum.filter(fn {timestamp, _} -> timestamp >= start_time and timestamp <= end_time end)
10,100✔
539
    
540
    sorted_entries = case order do
7✔
541
      :asc -> Enum.sort_by(filtered_entries, fn {timestamp, _} -> timestamp end)
6✔
542
      :desc -> Enum.sort_by(filtered_entries, fn {timestamp, _} -> timestamp end, :desc)
1✔
543
    end
544
    
545
    sorted_entries
546
      |> Enum.take(limit)
547
      |> Enum.map(&elem(&1, 1))
7✔
548
  end
549

550
  defp get_events_before_timestamp(temporal_table, cutoff_timestamp) do
551
    :ets.tab2list(temporal_table)
552
      |> Enum.filter(fn {timestamp, _} -> timestamp < cutoff_timestamp end)
1,020✔
553
      |> Enum.map(&elem(&1, 1))
3✔
554
  end
555

556
  defp generate_table_name do
557
    :"data_access_#{System.unique_integer([:positive])}"
62✔
558
  end
559
end 
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc