• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

orneryd / NornicDB / 22803356950

07 Mar 2026 05:07PM UTC coverage: 72.629% (+2.7%) from 69.919%
22803356950

push

github

orneryd
adding docker deploy CD yaml, adding tests

74706 of 102860 relevant lines covered (72.63%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.44
/pkg/storage/async_engine.go
1
// Package storage - AsyncEngine provides write-behind caching for eventual consistency.
2
//
3
// AsyncEngine wraps a storage engine and provides:
4
//   - Immediate writes to in-memory cache (fast)
5
//   - Background writes to underlying engine (async)
6
//   - Reads check cache first, then engine (eventual consistency)
7
//
8
// Trade-offs:
9
//   - Much faster writes (returns immediately)
10
//   - Reads may see stale data briefly (eventual consistency)
11
//   - Data loss risk if crash before flush (use with WAL for durability)
12
package storage
13

14
import (
15
        "context"
16
        "errors"
17
        "fmt"
18
        "log"
19
        "reflect"
20
        "strings"
21
        "sync"
22
        "time"
23
)
24

25
// AsyncEngine wraps a storage engine with write-behind caching.
// Writes return immediately after updating the cache, and are
// flushed to the underlying engine asynchronously.
//
// Locking discipline: mu guards the caches and indexes, callbackMu guards
// the event callbacks, and flushMu serializes flushes and guards lastFlush.
type AsyncEngine struct {
	engine Engine // underlying durable engine; receives batched writes on flush

	// In-memory cache for pending writes
	nodeCache   map[NodeID]*Node  // pending node creates/updates, keyed by ID
	edgeCache   map[EdgeID]*Edge  // pending edge creates/updates, keyed by ID
	deleteNodes map[NodeID]bool   // node IDs queued for deletion
	deleteEdges map[EdgeID]bool   // edge IDs queued for deletion
	mu          sync.RWMutex      // guards the four cache maps above plus the index/tracking maps below

	// Event callbacks (optional): used to keep external services in sync when
	// operations are satisfied purely from the async cache (i.e., no inner engine
	// callback will fire because the data never hit persistent storage).
	onNodeCreated NodeEventCallback
	onNodeUpdated NodeEventCallback
	onNodeDeleted NodeDeleteCallback
	onEdgeCreated EdgeEventCallback
	onEdgeUpdated EdgeEventCallback
	onEdgeDeleted EdgeDeleteCallback
	callbackMu    sync.RWMutex // guards the six callback fields above

	// In-flight tracking: nodes/edges being written but not yet cleared from cache
	// This prevents double-counting in NodeCount/EdgeCount during flush
	inFlightNodes map[NodeID]bool
	inFlightEdges map[EdgeID]bool

	// Updates tracking: nodes/edges in cache that are UPDATES (not creates)
	// These exist in underlying engine and shouldn't be counted as pending creates
	updateNodes map[NodeID]bool
	updateEdges map[EdgeID]bool
	// Baseline snapshots captured when a node update is first queued.
	// Used to detect stale async updates and rebase onto newer committed state.
	nodeUpdateBaseline map[NodeID]*Node

	// Label index for fast lookups - maps normalized label to node IDs
	labelIndex map[string]map[NodeID]bool

	// Background flush
	flushInterval    time.Duration // fixed tick period when adaptiveFlush is off
	minFlushInterval time.Duration // adaptive mode: shortest gap between flushes (also the tick period)
	maxFlushInterval time.Duration // adaptive mode: longest gap between flushes
	targetFlushSize  int           // adaptive mode: pending-write count at which the gap reaches max
	adaptiveFlush    bool          // selects volume-based flush timing
	flushTicker      *time.Ticker  // drives flushLoop
	stopChan         chan struct{} // closed to stop flushLoop
	wg               sync.WaitGroup
	maxNodeCacheSize int // node cache cap; CreateNode flushes synchronously when reached (0 = unlimited)
	maxEdgeCacheSize int // edge cache cap; CreateEdge flushes synchronously when reached (0 = unlimited)

	lastFlush time.Time // time of last successful flush; guarded by flushMu

	// Stats
	pendingWrites int64
	totalFlushes  int64

	// Flush mutex prevents concurrent flushes (race condition fix)
	flushMu sync.RWMutex
}
86

87
// AsyncEngineConfig configures the async engine behavior.
// A zero value is usable: NewAsyncEngine substitutes the documented defaults
// for any unset (non-positive) field.
type AsyncEngineConfig struct {
	// FlushInterval controls how often pending writes are flushed.
	// Smaller = more consistent, larger = better throughput.
	// Default: 50ms
	FlushInterval time.Duration

	// AdaptiveFlush enables volume-based flush timing.
	// When enabled, the flush loop ticks at MinFlushInterval and only
	// flushes when the adaptive interval has elapsed.
	AdaptiveFlush bool

	// MinFlushInterval is the shortest interval between flushes when adaptive flush is enabled.
	// Default: 10ms
	MinFlushInterval time.Duration

	// MaxFlushInterval is the longest interval between flushes when adaptive flush is enabled.
	// Default: 200ms
	MaxFlushInterval time.Duration

	// TargetFlushSize is the pending write count at which we reach MaxFlushInterval.
	// Smaller batches flush more frequently; larger batches flush less frequently.
	// Default: 1000 pending writes
	TargetFlushSize int

	// MaxNodeCacheSize is the maximum number of nodes to buffer before forcing a flush.
	// When this limit is reached, CreateNode will block and flush synchronously.
	// This prevents unbounded memory growth during bulk inserts.
	// Set to 0 for unlimited (not recommended for bulk operations).
	// Default: 50000 (50K nodes, ~35MB assuming 700 bytes/node)
	MaxNodeCacheSize int

	// MaxEdgeCacheSize is the maximum number of edges to buffer before forcing a flush.
	// When this limit is reached, CreateEdge will block and flush synchronously.
	// This prevents unbounded memory growth during bulk inserts.
	// Set to 0 for unlimited (not recommended for bulk operations).
	// Default: 100000 (100K edges, ~50MB assuming 500 bytes/edge)
	MaxEdgeCacheSize int
}
126

127
// NewAsyncEngine wraps an engine with write-behind caching.
128
func NewAsyncEngine(engine Engine, config *AsyncEngineConfig) *AsyncEngine {
1✔
129
        if config == nil {
2✔
130
                config = DefaultAsyncEngineConfig()
1✔
131
        }
1✔
132
        if config.MinFlushInterval <= 0 {
2✔
133
                config.MinFlushInterval = 10 * time.Millisecond
1✔
134
        }
1✔
135
        if config.MaxFlushInterval <= 0 {
2✔
136
                config.MaxFlushInterval = 200 * time.Millisecond
1✔
137
        }
1✔
138
        if config.TargetFlushSize <= 0 {
2✔
139
                config.TargetFlushSize = 1000
1✔
140
        }
1✔
141
        if config.MinFlushInterval > config.MaxFlushInterval {
1✔
142
                config.MinFlushInterval = config.MaxFlushInterval
×
143
        }
×
144

145
        ae := &AsyncEngine{
1✔
146
                engine:             engine,
1✔
147
                nodeCache:          make(map[NodeID]*Node),
1✔
148
                edgeCache:          make(map[EdgeID]*Edge),
1✔
149
                deleteNodes:        make(map[NodeID]bool),
1✔
150
                deleteEdges:        make(map[EdgeID]bool),
1✔
151
                inFlightNodes:      make(map[NodeID]bool),
1✔
152
                inFlightEdges:      make(map[EdgeID]bool),
1✔
153
                updateNodes:        make(map[NodeID]bool),
1✔
154
                updateEdges:        make(map[EdgeID]bool),
1✔
155
                nodeUpdateBaseline: make(map[NodeID]*Node),
1✔
156
                labelIndex:         make(map[string]map[NodeID]bool),
1✔
157
                flushInterval:      config.FlushInterval,
1✔
158
                minFlushInterval:   config.MinFlushInterval,
1✔
159
                maxFlushInterval:   config.MaxFlushInterval,
1✔
160
                targetFlushSize:    config.TargetFlushSize,
1✔
161
                adaptiveFlush:      config.AdaptiveFlush,
1✔
162
                maxNodeCacheSize:   config.MaxNodeCacheSize,
1✔
163
                maxEdgeCacheSize:   config.MaxEdgeCacheSize,
1✔
164
                stopChan:           make(chan struct{}),
1✔
165
                lastFlush:          time.Now(),
1✔
166
        }
1✔
167

1✔
168
        // Start background flush goroutine
1✔
169
        if ae.adaptiveFlush {
2✔
170
                ae.flushTicker = time.NewTicker(config.MinFlushInterval)
1✔
171
        } else {
2✔
172
                ae.flushTicker = time.NewTicker(config.FlushInterval)
1✔
173
        }
1✔
174
        ae.wg.Add(1)
1✔
175
        go ae.flushLoop()
1✔
176

1✔
177
        return ae
1✔
178
}
179

180
// flushLoop periodically flushes pending writes to the underlying engine.
181
func (ae *AsyncEngine) flushLoop() {
1✔
182
        defer ae.wg.Done()
1✔
183

1✔
184
        for {
2✔
185
                select {
1✔
186
                case <-ae.flushTicker.C:
1✔
187
                        if ae.adaptiveFlush {
2✔
188
                                pending := ae.pendingWriteCount()
1✔
189
                                if pending == 0 {
2✔
190
                                        continue
1✔
191
                                }
192
                                interval := ae.adaptiveFlushInterval(pending)
1✔
193
                                ae.flushMu.RLock()
1✔
194
                                lastFlush := ae.lastFlush
1✔
195
                                ae.flushMu.RUnlock()
1✔
196
                                if time.Since(lastFlush) < interval {
2✔
197
                                        continue
1✔
198
                                }
199
                        }
200
                        if err := ae.Flush(); err != nil {
2✔
201
                                // Don't log storage closed during shutdown (engine closed by teardown order or race with stopChan).
1✔
202
                                if !errors.Is(err, ErrStorageClosed) && !strings.Contains(err.Error(), ErrStorageClosed.Error()) {
2✔
203
                                        log.Printf("async flush failed: %v", err)
1✔
204
                                }
1✔
205
                        }
206
                case <-ae.stopChan:
1✔
207
                        // Final flush on shutdown
1✔
208
                        ae.Flush()
1✔
209
                        return
1✔
210
                }
211
        }
212
}
213

214
// FlushResult tracks the outcome of a flush operation for observability.
// Counters are per-flush, not cumulative. Failed node/edge IDs remain in the
// async cache and will be retried on the next flush.
type FlushResult struct {
	NodesWritten     int // nodes successfully upserted into the underlying engine
	NodesFailed      int // nodes whose write failed (still cached for retry)
	EdgesWritten     int // edges successfully created/updated
	EdgesFailed      int // edges whose write failed (still cached for retry)
	NodesDeleted     int // nodes successfully deleted
	EdgesDeleted     int // edges successfully deleted
	DeletesFailed    int // individual delete failures (node or edge)
	FailedNodeIDs    []NodeID // IDs that failed - still in cache for retry
	FailedEdgeIDs    []EdgeID // IDs that failed - still in cache for retry
	FirstNodeError   string // first node-write error message, if any
	FirstEdgeError   string // first edge-write error message, if any
	FirstDeleteError string // first delete error message, if any
}
229

230
// HasErrors returns true if any flush operations failed.
231
func (r FlushResult) HasErrors() bool {
1✔
232
        return r.NodesFailed > 0 || r.EdgesFailed > 0 || r.DeletesFailed > 0
1✔
233
}
1✔
234

235
// isStorageClosedOnly returns true if the flush failed only due to storage closed (expected during shutdown).
236
func (r FlushResult) isStorageClosedOnly() bool {
1✔
237
        if !r.HasErrors() {
1✔
238
                return false
×
239
        }
×
240
        closed := ErrStorageClosed.Error()
1✔
241
        first := r.FirstNodeError
1✔
242
        if first == "" {
2✔
243
                first = r.FirstEdgeError
1✔
244
        }
1✔
245
        if first == "" {
1✔
246
                first = r.FirstDeleteError
×
247
        }
×
248
        return first == closed || strings.Contains(first, closed)
1✔
249
}
250

251
// Flush writes all pending changes to the underlying engine.
252
// Uses batched operations for better performance - all deletes in one transaction.
253
//
254
// CRITICAL FIX: Failed items are NOT removed from cache - they will be retried
255
// on the next flush. This prevents silent data loss.
256
//
257
// Design: Snapshot caches, clear them, UNLOCK, then write to engine.
258
// Reads during write see engine data (consistent since cache is empty).
259
// This avoids blocking reads during I/O which kills Mac M-series performance.
260
//
261
// Thread-safe: Uses flushMu to prevent concurrent flushes which can cause
262
// race conditions when cache limit is reached during concurrent writes.
263
func (ae *AsyncEngine) Flush() error {
1✔
264
        ae.flushMu.Lock()
1✔
265
        defer ae.flushMu.Unlock()
1✔
266

1✔
267
        result := ae.FlushWithResult()
1✔
268
        if result.HasErrors() {
2✔
269
                details := ""
1✔
270
                if result.FirstNodeError != "" {
2✔
271
                        details = result.FirstNodeError
1✔
272
                } else if result.FirstEdgeError != "" {
3✔
273
                        details = result.FirstEdgeError
1✔
274
                } else if result.FirstDeleteError != "" {
1✔
275
                        details = result.FirstDeleteError
×
276
                }
×
277
                if details != "" {
2✔
278
                        return fmt.Errorf("flush incomplete: %d nodes failed, %d edges failed, %d deletes failed (%s)",
1✔
279
                                result.NodesFailed, result.EdgesFailed, result.DeletesFailed, details)
1✔
280
                }
1✔
281
                return fmt.Errorf("flush incomplete: %d nodes failed, %d edges failed, %d deletes failed",
×
282
                        result.NodesFailed, result.EdgesFailed, result.DeletesFailed)
×
283
        }
284
        if result.NodesWritten+result.EdgesWritten+result.NodesDeleted+result.EdgesDeleted > 0 {
2✔
285
                ae.lastFlush = time.Now()
1✔
286
        }
1✔
287
        return nil
1✔
288
}
289

290
// FlushWithResult writes pending changes and returns detailed results.
// Use this for programmatic access to flush statistics.
//
// Protocol: (1) under mu, snapshot all pending writes/deletes and mark them
// in-flight; (2) with mu released, apply deletes then writes to the engine;
// (3) under mu again, clear only the items that succeeded AND are unchanged
// in the cache, so concurrent re-writes and failures are retried next flush.
//
// NOTE(review): unlike Flush, this method does not take flushMu itself;
// confirm external callers serialize flushes if they bypass Flush.
func (ae *AsyncEngine) FlushWithResult() FlushResult {
	result := FlushResult{
		FailedNodeIDs: make([]NodeID, 0),
		FailedEdgeIDs: make([]EdgeID, 0),
	}

	ae.mu.Lock()

	// Nothing to flush
	if len(ae.nodeCache) == 0 && len(ae.edgeCache) == 0 && len(ae.deleteNodes) == 0 && len(ae.deleteEdges) == 0 {
		ae.mu.Unlock()
		return result
	}

	ae.totalFlushes++

	// Snapshot pending changes
	nodesToWrite := make(map[NodeID]*Node, len(ae.nodeCache))
	for k, v := range ae.nodeCache {
		nodesToWrite[k] = v
	}
	// Copy any update baselines alongside the snapshot so rebasing can run
	// without holding mu.
	nodeBaselines := make(map[NodeID]*Node, len(nodesToWrite))
	for id := range nodesToWrite {
		if baseline, ok := ae.nodeUpdateBaseline[id]; ok {
			nodeBaselines[id] = CopyNode(baseline)
		}
	}
	edgesToWrite := make(map[EdgeID]*Edge, len(ae.edgeCache))
	for k, v := range ae.edgeCache {
		edgesToWrite[k] = v
	}
	nodesToDelete := make(map[NodeID]bool, len(ae.deleteNodes))
	for k, v := range ae.deleteNodes {
		nodesToDelete[k] = v
	}
	edgesToDelete := make(map[EdgeID]bool, len(ae.deleteEdges))
	for k, v := range ae.deleteEdges {
		edgesToDelete[k] = v
	}

	// RACE FIX: Mark nodes/edges as in-flight BEFORE releasing lock
	// This prevents NodeCount/EdgeCount from double-counting during the window
	// where items are written to underlying engine but not yet cleared from cache
	for k := range nodesToWrite {
		ae.inFlightNodes[k] = true
	}
	for k := range edgesToWrite {
		ae.inFlightEdges[k] = true
	}

	ae.mu.Unlock()

	// Track successful operations
	successfulNodeWrites := make(map[NodeID]bool)
	successfulEdgeWrites := make(map[EdgeID]bool)
	successfulNodeDeletes := make(map[NodeID]bool)
	successfulEdgeDeletes := make(map[EdgeID]bool)

	// Apply bulk deletes first
	if len(nodesToDelete) > 0 {
		nodeIDs := make([]NodeID, 0, len(nodesToDelete))
		for id := range nodesToDelete {
			nodeIDs = append(nodeIDs, id)
		}
		if err := ae.engine.BulkDeleteNodes(nodeIDs); err != nil {
			if result.FirstDeleteError == "" {
				result.FirstDeleteError = err.Error()
			}
			// Bulk failed - try individual deletes
			for _, id := range nodeIDs {
				if err := ae.engine.DeleteNode(id); err != nil {
					result.DeletesFailed++
					if result.FirstDeleteError == "" {
						result.FirstDeleteError = err.Error()
					}
				} else {
					successfulNodeDeletes[id] = true
					result.NodesDeleted++
				}
			}
		} else {
			for _, id := range nodeIDs {
				successfulNodeDeletes[id] = true
			}
			result.NodesDeleted = len(nodeIDs)
		}
	}

	if len(edgesToDelete) > 0 {
		edgeIDs := make([]EdgeID, 0, len(edgesToDelete))
		for id := range edgesToDelete {
			edgeIDs = append(edgeIDs, id)
		}
		if err := ae.engine.BulkDeleteEdges(edgeIDs); err != nil {
			if result.FirstDeleteError == "" {
				result.FirstDeleteError = err.Error()
			}
			// Bulk failed - try individual deletes
			for _, id := range edgeIDs {
				if err := ae.engine.DeleteEdge(id); err != nil {
					result.DeletesFailed++
					if result.FirstDeleteError == "" {
						result.FirstDeleteError = err.Error()
					}
				} else {
					successfulEdgeDeletes[id] = true
					result.EdgesDeleted++
				}
			}
		} else {
			for _, id := range edgeIDs {
				successfulEdgeDeletes[id] = true
			}
			result.EdgesDeleted = len(edgeIDs)
		}
	}

	// Apply creates/updates using UpdateNode (upsert) for each node
	// This handles both new nodes and updates to existing nodes
	if len(nodesToWrite) > 0 {
		for _, node := range nodesToWrite {
			// Skip nodes that are also queued for deletion: the delete wins.
			if !nodesToDelete[node.ID] {
				// UpdateNode now has upsert behavior - creates if not exists, updates if exists.
				// If this node was queued as an update, detect staleness and rebase before write.
				if err := ae.flushNodeWithRebase(node, nodeBaselines[node.ID]); err != nil {
					// CRITICAL FIX: Track failed node - DON'T remove from cache
					result.NodesFailed++
					result.FailedNodeIDs = append(result.FailedNodeIDs, node.ID)
					if result.FirstNodeError == "" {
						result.FirstNodeError = err.Error()
					}
				} else {
					successfulNodeWrites[node.ID] = true
					result.NodesWritten++
				}
			}
		}
	}

	if len(edgesToWrite) > 0 {
		edges := make([]*Edge, 0, len(edgesToWrite))
		for _, edge := range edgesToWrite {
			// Skip edges that are also queued for deletion: the delete wins.
			if !edgesToDelete[edge.ID] {
				edges = append(edges, edge)
			}
		}
		if len(edges) > 0 {
			if err := ae.engine.BulkCreateEdges(edges); err != nil {
				if result.FirstEdgeError == "" {
					result.FirstEdgeError = err.Error()
				}
				// Bulk failed - try individual creates
				for _, edge := range edges {
					if err := ae.engine.CreateEdge(edge); err != nil {
						// Try update if create fails (might already exist)
						if err := ae.engine.UpdateEdge(edge); err != nil {
							result.EdgesFailed++
							result.FailedEdgeIDs = append(result.FailedEdgeIDs, edge.ID)
							if result.FirstEdgeError == "" {
								result.FirstEdgeError = err.Error()
							}
						} else {
							successfulEdgeWrites[edge.ID] = true
							result.EdgesWritten++
						}
					} else {
						successfulEdgeWrites[edge.ID] = true
						result.EdgesWritten++
					}
				}
			} else {
				for _, edge := range edges {
					successfulEdgeWrites[edge.ID] = true
				}
				result.EdgesWritten = len(edges)
			}
		}
	}

	// CRITICAL FIX: Only clear SUCCESSFULLY flushed items
	ae.mu.Lock()
	for id := range nodesToWrite {
		// Only clear if successfully written AND still the same object in cache
		// (a pointer mismatch means a concurrent write replaced the entry and
		// that newer version must survive for the next flush).
		if successfulNodeWrites[id] && ae.nodeCache[id] == nodesToWrite[id] {
			delete(ae.nodeCache, id)
			delete(ae.updateNodes, id) // Clear update flag
			delete(ae.nodeUpdateBaseline, id)
		}
		// Always clear in-flight marker for this batch (success or fail)
		delete(ae.inFlightNodes, id)
	}
	for id := range edgesToWrite {
		if successfulEdgeWrites[id] && ae.edgeCache[id] == edgesToWrite[id] {
			delete(ae.edgeCache, id)
			delete(ae.updateEdges, id) // Clear update flag
		}
		// Always clear in-flight marker for this batch (success or fail)
		delete(ae.inFlightEdges, id)
	}
	for id := range nodesToDelete {
		if successfulNodeDeletes[id] && ae.deleteNodes[id] {
			delete(ae.deleteNodes, id)
			delete(ae.nodeUpdateBaseline, id)
		}
	}
	for id := range edgesToDelete {
		if successfulEdgeDeletes[id] && ae.deleteEdges[id] {
			delete(ae.deleteEdges, id)
		}
	}
	ae.mu.Unlock()

	return result
}
506

507
// asyncNodeRebaseMaxAttempts bounds the GetNode/rebase/UpdateNode retry loop
// in flushNodeWithRebase.
const asyncNodeRebaseMaxAttempts = 3

// flushNodeWithRebase writes a pending async node update to the underlying
// engine. If a baseline snapshot was captured when the update was queued and
// the committed row has since diverged from that baseline, the pending intent
// is rebased onto the latest committed state before writing. The
// read-rebase-write cycle is retried up to asyncNodeRebaseMaxAttempts times.
//
// A nil baseline (no committed state known at queue time, e.g. a create)
// writes the pending node directly via UpdateNode, which has upsert behavior.
// Returns ErrInvalidData for a nil pending node.
func (ae *AsyncEngine) flushNodeWithRebase(pending, baseline *Node) error {
	if pending == nil {
		return ErrInvalidData
	}
	if baseline == nil {
		return ae.engine.UpdateNode(pending)
	}

	candidate := CopyNode(pending)
	baselineSnapshot := CopyNode(baseline)

	for attempt := 0; attempt < asyncNodeRebaseMaxAttempts; attempt++ {
		// GetNode errors are ignored here: with no readable committed state
		// we fall through and let the upsert write the current candidate.
		latest, err := ae.engine.GetNode(candidate.ID)
		if err == nil && !nodesEquivalentForAsyncRebase(latest, baselineSnapshot) {
			// Underlying row changed since async queue accepted this update.
			// Rebase async intent onto latest committed state.
			candidate = rebaseNodeUpdate(baselineSnapshot, pending, latest)
			baselineSnapshot = CopyNode(latest)
		}

		if err := ae.engine.UpdateNode(candidate); err != nil {
			if attempt == asyncNodeRebaseMaxAttempts-1 {
				return err
			}
			continue
		}
		return nil
	}

	// Unreachable in practice (the loop returns on success or final failure),
	// kept as a defensive terminal error.
	return fmt.Errorf("failed to apply async node update for %s after %d attempts", pending.ID, asyncNodeRebaseMaxAttempts)
}
540

541
func nodesEquivalentForAsyncRebase(a, b *Node) bool {
1✔
542
        if a == nil || b == nil {
2✔
543
                return a == b
1✔
544
        }
1✔
545
        return reflect.DeepEqual(a.Labels, b.Labels) &&
1✔
546
                reflect.DeepEqual(a.Properties, b.Properties) &&
1✔
547
                reflect.DeepEqual(a.NamedEmbeddings, b.NamedEmbeddings) &&
1✔
548
                reflect.DeepEqual(a.ChunkEmbeddings, b.ChunkEmbeddings) &&
1✔
549
                reflect.DeepEqual(a.EmbedMeta, b.EmbedMeta)
1✔
550
}
551

552
// rebaseNodeUpdate performs a three-way merge of a pending async update onto
// the latest committed node state:
//
//   - base:    committed state when the update was queued
//   - pending: the state the async caller wants to write
//   - latest:  committed state now (may have diverged from base)
//
// Fields the pending update actually changed relative to base win over
// latest; fields it left untouched keep the latest committed values.
// Properties are merged key-by-key; keys removed by the pending update are
// deleted from the result.
func rebaseNodeUpdate(base, pending, latest *Node) *Node {
	if pending == nil {
		return CopyNode(latest)
	}
	if latest == nil || base == nil {
		// Without both sides of the three-way merge, fall back to writing
		// the pending state as-is.
		return CopyNode(pending)
	}

	rebased := CopyNode(latest)
	if rebased.Properties == nil {
		rebased.Properties = make(map[string]any)
	}

	// Labels are replaced wholesale, but only if the pending update changed them.
	if !reflect.DeepEqual(base.Labels, pending.Labels) {
		rebased.Labels = append([]string(nil), pending.Labels...)
	}

	baseProps := base.Properties
	if baseProps == nil {
		baseProps = map[string]any{}
	}
	pendingProps := pending.Properties
	if pendingProps == nil {
		pendingProps = map[string]any{}
	}

	// Keys added or modified by the pending update override latest.
	for key, pendingValue := range pendingProps {
		baseValue, existedInBase := baseProps[key]
		if !existedInBase || !reflect.DeepEqual(baseValue, pendingValue) {
			rebased.Properties[key] = pendingValue
		}
	}
	// Keys present in base but absent from pending were deleted by the
	// pending update; remove them from the result.
	for key := range baseProps {
		if _, existsInPending := pendingProps[key]; !existsInPending {
			delete(rebased.Properties, key)
		}
	}

	// Embedding fields are treated as atomic units and taken from pending only
	// when changed. Going through CopyNode obtains fresh field values
	// (presumably deep copies to avoid aliasing pending — confirm CopyNode's
	// copy depth).
	if !reflect.DeepEqual(base.NamedEmbeddings, pending.NamedEmbeddings) {
		rebased.NamedEmbeddings = CopyNode(pending).NamedEmbeddings
	}
	if !reflect.DeepEqual(base.ChunkEmbeddings, pending.ChunkEmbeddings) {
		rebased.ChunkEmbeddings = CopyNode(pending).ChunkEmbeddings
	}
	if !reflect.DeepEqual(base.EmbedMeta, pending.EmbedMeta) {
		rebased.EmbedMeta = CopyNode(pending).EmbedMeta
	}

	return rebased
}
602

603
// GetEngine returns the underlying storage engine.
// Used for transaction support which needs direct access.
//
// Note: operations performed directly on the returned engine bypass the
// async cache; data pending in the cache is not visible through it.
func (ae *AsyncEngine) GetEngine() Engine {
	return ae.engine
}
608

609
// CreateNode adds to cache and returns immediately.
// The node is persisted to the underlying engine on the next flush. When
// maxNodeCacheSize is configured and the cache is full, the call blocks
// while a synchronous Flush makes room. Returns the node's ID, or
// ErrInvalidData / a validation error without caching anything.
func (ae *AsyncEngine) CreateNode(node *Node) (NodeID, error) {
	if node == nil {
		return "", ErrInvalidData
	}
	if err := validatePropertiesForStorage(node.Properties); err != nil {
		return "", err
	}
	if err := ae.validateNodeConstraints(node); err != nil {
		return "", err
	}

	// Check cache size limit BEFORE acquiring lock to avoid deadlock
	// If cache is full, flush synchronously to make room
	// (check-then-flush is racy under concurrency, but the worst case is a
	// temporary overshoot of the limit, not corruption).
	if ae.maxNodeCacheSize > 0 {
		ae.mu.RLock()
		cacheSize := len(ae.nodeCache)
		ae.mu.RUnlock()
		if cacheSize >= ae.maxNodeCacheSize {
			if err := ae.Flush(); err != nil {
				return "", err
			}
		}
	}

	ae.mu.Lock()
	defer ae.mu.Unlock()

	// Remove from delete set if present (recreating a deleted node)
	wasDeleted := ae.deleteNodes[node.ID]
	delete(ae.deleteNodes, node.ID)

	// Check if this node already exists in cache (being updated, not created)
	_, existsInCache := ae.nodeCache[node.ID]

	// Mark as update only if it was pending delete OR already in cache
	// DO NOT check underlying engine - that causes race conditions and is slow
	// New nodes from CREATE always have fresh UUIDs that won't exist anywhere
	isUpdate := wasDeleted || existsInCache
	if isUpdate {
		ae.updateNodes[node.ID] = true
	} else {
		delete(ae.updateNodes, node.ID)
	}

	ae.nodeCache[node.ID] = node
	// Any previously captured update baseline is stale once the node is
	// re-cached wholesale.
	delete(ae.nodeUpdateBaseline, node.ID)

	// Update label index
	// NOTE(review): if this call replaces a cached node whose labels differ,
	// entries for the old labels are not removed here — confirm stale
	// labelIndex entries are filtered or pruned elsewhere.
	for _, label := range node.Labels {
		normalLabel := strings.ToLower(label)
		if ae.labelIndex[normalLabel] == nil {
			ae.labelIndex[normalLabel] = make(map[NodeID]bool)
		}
		ae.labelIndex[normalLabel][node.ID] = true
	}

	ae.pendingWrites++
	return node.ID, nil
}
669

670
// UpdateNode adds to cache and returns immediately.
671
func (ae *AsyncEngine) UpdateNode(node *Node) error {
1✔
672
        if node == nil {
1✔
673
                return ErrInvalidData
×
674
        }
×
675
        if err := validatePropertiesForStorage(node.Properties); err != nil {
1✔
676
                return err
×
677
        }
×
678
        if err := ae.validateNodeConstraints(node); err != nil {
1✔
679
                return err
×
680
        }
×
681

682
        var baseline *Node
1✔
683
        ae.mu.RLock()
1✔
684
        _, alreadyQueued := ae.nodeCache[node.ID]
1✔
685
        _, hasBaseline := ae.nodeUpdateBaseline[node.ID]
1✔
686
        ae.mu.RUnlock()
1✔
687
        if !alreadyQueued && !hasBaseline {
2✔
688
                if existing, err := ae.engine.GetNode(node.ID); err == nil {
2✔
689
                        baseline = CopyNode(existing)
1✔
690
                }
1✔
691
        }
692

693
        ae.mu.Lock()
1✔
694
        defer ae.mu.Unlock()
1✔
695

1✔
696
        if _, exists := ae.nodeUpdateBaseline[node.ID]; !exists {
2✔
697
                if _, existsInCache := ae.nodeCache[node.ID]; !existsInCache {
2✔
698
                        ae.nodeUpdateBaseline[node.ID] = baseline
1✔
699
                }
1✔
700
        }
701

702
        ae.nodeCache[node.ID] = node
1✔
703
        ae.pendingWrites++
1✔
704
        return nil
1✔
705
}
706

707
// UpdateNodeEmbedding updates an existing node with its embedding.
708
// Unlike UpdateNode, this MUST NOT create a new node; it returns ErrNotFound
709
// if the node does not exist (in cache, in-flight, or in the underlying engine).
710
func (ae *AsyncEngine) UpdateNodeEmbedding(node *Node) error {
1✔
711
        ae.mu.Lock()
1✔
712
        defer ae.mu.Unlock()
1✔
713

1✔
714
        if ae.deleteNodes[node.ID] {
1✔
715
                return ErrNotFound
×
716
        }
×
717

718
        // Exists in cache (including nodes created/updated but not yet flushed).
719
        if _, ok := ae.nodeCache[node.ID]; ok {
2✔
720
                // Important: do NOT mark this as an update here. If the node is a pending create
1✔
721
                // (not yet flushed), it must still count as a create for NodeCount/EdgeCount.
1✔
722
                ae.nodeCache[node.ID] = node
1✔
723
                ae.pendingWrites++
1✔
724
                return nil
1✔
725
        }
1✔
726

727
        // In-flight nodes will exist in the underlying engine after flush; allow update.
728
        if ae.inFlightNodes[node.ID] {
2✔
729
                // This is an update to an existing node (at minimum, it will exist after the in-flight write).
1✔
730
                // Mark as update so NodeCount doesn't temporarily treat it as a pending create.
1✔
731
                ae.updateNodes[node.ID] = true
1✔
732
                ae.nodeCache[node.ID] = node
1✔
733
                ae.pendingWrites++
1✔
734
                return nil
1✔
735
        }
1✔
736

737
        // Verify existence in underlying engine before accepting the update.
738
        if _, err := ae.engine.GetNode(node.ID); err != nil {
1✔
739
                return ErrNotFound
×
740
        }
×
741

742
        // Node exists in the underlying engine, so this is an update.
743
        // Mark as update so NodeCount doesn't temporarily treat it as a pending create.
744
        ae.updateNodes[node.ID] = true
1✔
745
        ae.nodeCache[node.ID] = node
1✔
746
        ae.pendingWrites++
1✔
747
        return nil
1✔
748
}
749

750
// DeleteNode marks for deletion and returns immediately.
751
// Optimized: if node was created in this transaction (still in cache),
752
// just remove it from cache - no need to delete from underlying engine.
753
// CRITICAL: If node is in-flight (being flushed), we must also mark for deletion
754
// because the flush will write it to the underlying engine.
755
func (ae *AsyncEngine) DeleteNode(id NodeID) error {
1✔
756
        for {
2✔
757
                ae.mu.Lock()
1✔
758

1✔
759
                // Check if already marked for deletion (idempotent)
1✔
760
                if ae.deleteNodes[id] {
2✔
761
                        ae.mu.Unlock()
1✔
762
                        return nil
1✔
763
                }
1✔
764

765
                // Check if node is being flushed right now (in-flight)
766
                isInFlight := ae.inFlightNodes[id]
1✔
767

1✔
768
                // Check if node was created/updated in this transaction (still in cache)
1✔
769
                if node, existsInCache := ae.nodeCache[id]; existsInCache {
2✔
770
                        // If this is a pending CREATE (not an update of an existing node), deleting it
1✔
771
                        // will never hit the inner engine, so no inner OnNodeDeleted callback will fire.
1✔
772
                        // Emit a best-effort delete notification so external services (search indexes,
1✔
773
                        // embedding counts) can drop any speculative state for this node.
1✔
774
                        shouldNotify := !ae.updateNodes[id]
1✔
775

1✔
776
                        // Remove from label index
1✔
777
                        for _, label := range node.Labels {
2✔
778
                                normalLabel := strings.ToLower(label)
1✔
779
                                if ae.labelIndex[normalLabel] != nil {
2✔
780
                                        delete(ae.labelIndex[normalLabel], id)
1✔
781
                                }
1✔
782
                        }
783
                        // Remove from cache
784
                        delete(ae.nodeCache, id)
1✔
785
                        delete(ae.nodeUpdateBaseline, id)
1✔
786

1✔
787
                        // CRITICAL FIX: If node is in-flight, the flush will still write it
1✔
788
                        // to the underlying engine, so we must also mark it for deletion
1✔
789
                        if isInFlight {
1✔
790
                                ae.deleteNodes[id] = true
×
791
                                ae.pendingWrites++
×
792
                        }
×
793

794
                        ae.mu.Unlock()
1✔
795
                        ae.MarkNodeEmbedded(id)
1✔
796
                        if shouldNotify {
2✔
797
                                ae.notifyNodeDeleted(id)
1✔
798
                        }
1✔
799
                        return nil
1✔
800
                }
801

802
                // If in-flight, it will exist in underlying engine after flush - mark for deletion
803
                if isInFlight {
2✔
804
                        ae.deleteNodes[id] = true
1✔
805
                        ae.pendingWrites++
1✔
806
                        ae.mu.Unlock()
1✔
807
                        ae.MarkNodeEmbedded(id)
1✔
808
                        return nil
1✔
809
                }
1✔
810

811
                ae.mu.Unlock()
1✔
812

1✔
813
                // Check if node actually exists in underlying engine before marking for deletion.
1✔
814
                // This prevents count going negative for non-existent nodes.
1✔
815
                if _, err := ae.engine.GetNode(id); err != nil {
2✔
816
                        // Node doesn't exist anywhere - nothing to delete
1✔
817
                        return ErrNotFound
1✔
818
                }
1✔
819

820
                // Re-check state under lock in case the node was recreated/updated concurrently.
821
                ae.mu.Lock()
1✔
822
                if ae.deleteNodes[id] {
1✔
823
                        ae.mu.Unlock()
×
824
                        return nil
×
825
                }
×
826
                if _, existsInCache := ae.nodeCache[id]; existsInCache {
1✔
827
                        ae.mu.Unlock()
×
828
                        continue
×
829
                }
830
                if ae.inFlightNodes[id] {
1✔
831
                        ae.deleteNodes[id] = true
×
832
                        ae.pendingWrites++
×
833
                        ae.mu.Unlock()
×
834
                        ae.MarkNodeEmbedded(id)
×
835
                        return nil
×
836
                }
×
837

838
                ae.deleteNodes[id] = true
1✔
839
                ae.pendingWrites++
1✔
840
                ae.mu.Unlock()
1✔
841
                ae.MarkNodeEmbedded(id)
1✔
842
                return nil
1✔
843
        }
844
}
845

846
// CreateEdge adds to cache and returns immediately.
847
func (ae *AsyncEngine) CreateEdge(edge *Edge) error {
1✔
848
        if edge == nil {
2✔
849
                return ErrInvalidData
1✔
850
        }
1✔
851
        if err := validatePropertiesForStorage(edge.Properties); err != nil {
1✔
852
                return err
×
853
        }
×
854
        // Check cache size limit BEFORE acquiring lock to avoid deadlock
855
        // If cache is full, flush synchronously to make room
856
        if ae.maxEdgeCacheSize > 0 {
2✔
857
                ae.mu.RLock()
1✔
858
                cacheSize := len(ae.edgeCache)
1✔
859
                ae.mu.RUnlock()
1✔
860
                if cacheSize >= ae.maxEdgeCacheSize {
2✔
861
                        ae.Flush() // Synchronous flush - blocks until complete
1✔
862
                }
1✔
863
        }
864

865
        ae.mu.Lock()
1✔
866
        defer ae.mu.Unlock()
1✔
867

1✔
868
        // Remove from delete set if present (recreating a deleted edge)
1✔
869
        wasDeleted := ae.deleteEdges[edge.ID]
1✔
870
        delete(ae.deleteEdges, edge.ID)
1✔
871

1✔
872
        // Check if this edge already exists in cache (being updated, not created)
1✔
873
        _, existsInCache := ae.edgeCache[edge.ID]
1✔
874

1✔
875
        // Mark as update only if it was pending delete OR already in cache
1✔
876
        // DO NOT check underlying engine - that causes race conditions and is slow
1✔
877
        if wasDeleted || existsInCache {
1✔
878
                ae.updateEdges[edge.ID] = true
×
879
        } else {
1✔
880
                delete(ae.updateEdges, edge.ID)
1✔
881
        }
1✔
882

883
        ae.edgeCache[edge.ID] = edge
1✔
884
        ae.pendingWrites++
1✔
885
        return nil
1✔
886
}
887

888
// UpdateEdge adds to cache and returns immediately.
889
func (ae *AsyncEngine) UpdateEdge(edge *Edge) error {
1✔
890
        if edge == nil {
2✔
891
                return ErrInvalidData
1✔
892
        }
1✔
893
        if err := validatePropertiesForStorage(edge.Properties); err != nil {
1✔
894
                return err
×
895
        }
×
896
        ae.mu.Lock()
1✔
897
        defer ae.mu.Unlock()
1✔
898

1✔
899
        ae.edgeCache[edge.ID] = edge
1✔
900
        ae.pendingWrites++
1✔
901
        return nil
1✔
902
}
903

904
// DeleteEdge marks for deletion and returns immediately.
905
// Optimized: if edge was created in this transaction (still in cache),
906
// just remove it from cache - no need to delete from underlying engine.
907
// CRITICAL: If edge is in-flight (being flushed), we must also mark for deletion
908
// because the flush will write it to the underlying engine.
909
func (ae *AsyncEngine) DeleteEdge(id EdgeID) error {
1✔
910
        ae.mu.Lock()
1✔
911
        defer ae.mu.Unlock()
1✔
912

1✔
913
        // Check if already marked for deletion (idempotent)
1✔
914
        if ae.deleteEdges[id] {
2✔
915
                return nil
1✔
916
        }
1✔
917

918
        // Check if edge is being flushed right now (in-flight)
919
        isInFlight := ae.inFlightEdges[id]
1✔
920

1✔
921
        // Check if edge was created in this transaction (still in cache)
1✔
922
        if _, existsInCache := ae.edgeCache[id]; existsInCache {
2✔
923
                // Edge was created but not flushed - just remove from cache
1✔
924
                delete(ae.edgeCache, id)
1✔
925

1✔
926
                // CRITICAL FIX: If edge is in-flight, the flush will still write it
1✔
927
                // to the underlying engine, so we must also mark it for deletion
1✔
928
                if isInFlight {
2✔
929
                        ae.deleteEdges[id] = true
1✔
930
                        ae.pendingWrites++
1✔
931
                }
1✔
932
                return nil
1✔
933
        }
934

935
        // If in-flight, it will exist in underlying engine after flush - mark for deletion
936
        if isInFlight {
2✔
937
                ae.deleteEdges[id] = true
1✔
938
                ae.pendingWrites++
1✔
939
                return nil
1✔
940
        }
1✔
941

942
        // Check if edge actually exists in underlying engine before marking for deletion
943
        // This prevents count going negative for non-existent edges
944
        if _, err := ae.engine.GetEdge(id); err != nil {
2✔
945
                // Edge doesn't exist anywhere - nothing to delete
1✔
946
                return ErrNotFound
1✔
947
        }
1✔
948

949
        // Edge exists in underlying engine - mark for deletion
950
        ae.deleteEdges[id] = true
1✔
951
        ae.pendingWrites++
1✔
952
        return nil
1✔
953
}
954

955
// GetNode checks cache first, then underlying engine.
956
func (ae *AsyncEngine) GetNode(id NodeID) (*Node, error) {
1✔
957
        ae.mu.RLock()
1✔
958
        // Check if deleted
1✔
959
        if ae.deleteNodes[id] {
2✔
960
                ae.mu.RUnlock()
1✔
961
                return nil, ErrNotFound
1✔
962
        }
1✔
963
        // Check cache
964
        if node, ok := ae.nodeCache[id]; ok {
2✔
965
                ae.mu.RUnlock()
1✔
966
                return node, nil
1✔
967
        }
1✔
968
        ae.mu.RUnlock()
1✔
969

1✔
970
        // Fall through to engine
1✔
971
        return ae.engine.GetNode(id)
1✔
972
}
973

974
// GetEdge checks cache first, then underlying engine.
975
func (ae *AsyncEngine) GetEdge(id EdgeID) (*Edge, error) {
1✔
976
        ae.mu.RLock()
1✔
977
        if ae.deleteEdges[id] {
2✔
978
                ae.mu.RUnlock()
1✔
979
                return nil, ErrNotFound
1✔
980
        }
1✔
981
        if edge, ok := ae.edgeCache[id]; ok {
2✔
982
                ae.mu.RUnlock()
1✔
983
                return edge, nil
1✔
984
        }
1✔
985
        ae.mu.RUnlock()
1✔
986

1✔
987
        return ae.engine.GetEdge(id)
1✔
988
}
989

990
// ForEachNodeIDByLabel streams node IDs for a label, combining cache + engine.
991
// Stops early when visit returns false.
992
func (ae *AsyncEngine) ForEachNodeIDByLabel(label string, visit func(NodeID) bool) error {
1✔
993
        if visit == nil {
2✔
994
                return nil
1✔
995
        }
1✔
996

997
        normalLabel := strings.ToLower(label)
1✔
998

1✔
999
        ae.mu.RLock()
1✔
1000
        deletedIDs := make(map[NodeID]bool, len(ae.deleteNodes))
1✔
1001
        for id := range ae.deleteNodes {
2✔
1002
                deletedIDs[id] = true
1✔
1003
        }
1✔
1004
        cachedIDs := make([]NodeID, 0, len(ae.labelIndex[normalLabel]))
1✔
1005
        for id := range ae.labelIndex[normalLabel] {
2✔
1006
                if !deletedIDs[id] {
2✔
1007
                        cachedIDs = append(cachedIDs, id)
1✔
1008
                }
1✔
1009
        }
1010
        ae.mu.RUnlock()
1✔
1011

1✔
1012
        seen := make(map[NodeID]struct{}, len(cachedIDs))
1✔
1013
        for _, id := range cachedIDs {
2✔
1014
                seen[id] = struct{}{}
1✔
1015
                if !visit(id) {
2✔
1016
                        return nil
1✔
1017
                }
1✔
1018
        }
1019

1020
        if lookup, ok := ae.engine.(LabelNodeIDLookupEngine); ok {
2✔
1021
                return lookup.ForEachNodeIDByLabel(label, func(id NodeID) bool {
2✔
1022
                        if deletedIDs[id] {
1✔
1023
                                return true
×
1024
                        }
×
1025
                        if _, ok := seen[id]; ok {
1✔
1026
                                return true
×
1027
                        }
×
1028
                        seen[id] = struct{}{}
1✔
1029
                        return visit(id)
1✔
1030
                })
1031
        }
1032

1033
        nodes, err := ae.engine.GetNodesByLabel(label)
1✔
1034
        if err != nil {
2✔
1035
                return err
1✔
1036
        }
1✔
1037
        for _, node := range nodes {
2✔
1038
                if node == nil {
1✔
1039
                        continue
×
1040
                }
1041
                if deletedIDs[node.ID] {
2✔
1042
                        continue
1✔
1043
                }
1044
                if _, ok := seen[node.ID]; ok {
2✔
1045
                        continue
1✔
1046
                }
1047
                seen[node.ID] = struct{}{}
×
1048
                if !visit(node.ID) {
×
1049
                        return nil
×
1050
                }
×
1051
        }
1052
        return nil
1✔
1053
}
1054

1055
// GetNodesByLabel checks cache and merges with engine results.
1056
// Uses case-insensitive label matching for Neo4j compatibility.
1057
// Snapshots cache state quickly, then releases lock before engine I/O.
1058
// GetFirstNodeByLabel returns the first node with the specified label.
1059
// Optimized for MATCH...LIMIT 1 patterns - uses label index for O(1) lookup.
1060
func (ae *AsyncEngine) GetFirstNodeByLabel(label string) (*Node, error) {
1✔
1061
        ae.mu.RLock()
1✔
1062
        normalLabel := strings.ToLower(label)
1✔
1063

1✔
1064
        // Use label index for O(1) lookup instead of scanning entire cache
1✔
1065
        if nodeIDs := ae.labelIndex[normalLabel]; len(nodeIDs) > 0 {
2✔
1066
                for id := range nodeIDs {
2✔
1067
                        if !ae.deleteNodes[id] {
2✔
1068
                                if node := ae.nodeCache[id]; node != nil {
2✔
1069
                                        ae.mu.RUnlock()
1✔
1070
                                        return node, nil
1✔
1071
                                }
1✔
1072
                        }
1073
                }
1074
        }
1075
        ae.mu.RUnlock()
1✔
1076

1✔
1077
        return ae.engine.GetFirstNodeByLabel(label)
1✔
1078
}
1079

1080
func (ae *AsyncEngine) GetNodesByLabel(label string) ([]*Node, error) {
1✔
1081
        ae.mu.RLock()
1✔
1082
        cachedNodes := make([]*Node, 0)
1✔
1083
        deletedIDs := make(map[NodeID]bool)
1✔
1084

1✔
1085
        // Normalize label for case-insensitive matching (Neo4j compatible)
1✔
1086
        normalLabel := strings.ToLower(label)
1✔
1087

1✔
1088
        for id := range ae.deleteNodes {
1✔
1089
                deletedIDs[id] = true
×
1090
        }
×
1091
        for _, node := range ae.nodeCache {
2✔
1092
                for _, l := range node.Labels {
2✔
1093
                        if strings.ToLower(l) == normalLabel { // Case-insensitive comparison
2✔
1094
                                cachedNodes = append(cachedNodes, node)
1✔
1095
                                break
1✔
1096
                        }
1097
                }
1098
        }
1099
        ae.mu.RUnlock()
1✔
1100

1✔
1101
        // Get from engine WITHOUT lock (I/O can be slow)
1✔
1102
        engineNodes, err := ae.engine.GetNodesByLabel(label)
1✔
1103
        if err != nil {
2✔
1104
                return cachedNodes, nil // Return cache-only on error
1✔
1105
        }
1✔
1106

1107
        // Merge: cache overrides engine
1108
        result := make([]*Node, 0, len(cachedNodes)+len(engineNodes))
1✔
1109

1✔
1110
        // Add cached nodes first
1✔
1111
        seenIDs := make(map[NodeID]bool)
1✔
1112
        for _, node := range cachedNodes {
1✔
1113
                result = append(result, node)
×
1114
                seenIDs[node.ID] = true
×
1115
        }
×
1116

1117
        // Add engine nodes not in cache or deleted
1118
        for _, node := range engineNodes {
2✔
1119
                if !seenIDs[node.ID] && !deletedIDs[node.ID] {
2✔
1120
                        result = append(result, node)
1✔
1121
                }
1✔
1122
        }
1123

1124
        return result, nil
1✔
1125
}
1126

1127
// BatchGetNodes fetches multiple nodes, checking cache first then engine.
1128
// Returns a map for O(1) lookup. Missing nodes are not included.
1129
func (ae *AsyncEngine) BatchGetNodes(ids []NodeID) (map[NodeID]*Node, error) {
1✔
1130
        if len(ids) == 0 {
2✔
1131
                return make(map[NodeID]*Node), nil
1✔
1132
        }
1✔
1133

1134
        ae.mu.RLock()
1✔
1135
        defer ae.mu.RUnlock()
1✔
1136

1✔
1137
        result := make(map[NodeID]*Node, len(ids))
1✔
1138
        var missingIDs []NodeID
1✔
1139

1✔
1140
        // Check cache and deleted set first
1✔
1141
        for _, id := range ids {
2✔
1142
                if id == "" {
2✔
1143
                        continue
1✔
1144
                }
1145

1146
                // Skip if marked for deletion
1147
                if ae.deleteNodes[id] {
2✔
1148
                        continue
1✔
1149
                }
1150

1151
                // Check cache first
1152
                if node, exists := ae.nodeCache[id]; exists {
2✔
1153
                        result[id] = node
1✔
1154
                        continue
1✔
1155
                }
1156

1157
                // Need to fetch from engine
1158
                missingIDs = append(missingIDs, id)
1✔
1159
        }
1160

1161
        // Batch fetch missing from engine
1162
        if len(missingIDs) > 0 {
2✔
1163
                engineNodes, err := ae.engine.BatchGetNodes(missingIDs)
1✔
1164
                if err != nil {
2✔
1165
                        return result, nil // Return what we have from cache
1✔
1166
                }
1✔
1167

1168
                // Add engine nodes not marked for deletion
1169
                for id, node := range engineNodes {
2✔
1170
                        if !ae.deleteNodes[id] {
2✔
1171
                                result[id] = node
1✔
1172
                        }
1✔
1173
                }
1174
        }
1175

1176
        return result, nil
1✔
1177
}
1178

1179
// AllNodes returns merged view of cache, in-flight nodes, and engine.
1180
// NOTE: We hold the read lock for the ENTIRE operation to prevent race conditions
1181
// with Flush() which clears the cache after writing to the engine.
1182
// CRITICAL: We must include in-flight nodes - these are being written to the engine
1183
// but haven't been cleared from tracking yet. Without this, nodes can become
1184
// "invisible" during the flush window, causing DETACH DELETE to miss them.
1185
func (ae *AsyncEngine) AllNodes() ([]*Node, error) {
1✔
1186
        ae.mu.RLock()
1✔
1187
        defer ae.mu.RUnlock()
1✔
1188

1✔
1189
        // Build set of deleted IDs (these should NOT appear in results)
1✔
1190
        deletedIDs := make(map[NodeID]bool)
1✔
1191
        for id := range ae.deleteNodes {
2✔
1192
                deletedIDs[id] = true
1✔
1193
        }
1✔
1194

1195
        // Collect nodes from cache (pending writes not yet flushed)
1196
        cachedNodes := make(map[NodeID]*Node, len(ae.nodeCache))
1✔
1197
        for id, node := range ae.nodeCache {
2✔
1198
                cachedNodes[id] = node
1✔
1199
        }
1✔
1200

1201
        // Get nodes from underlying engine
1202
        engineNodes, err := ae.engine.AllNodes()
1✔
1203
        if err != nil {
2✔
1204
                // If engine fails, return what we have in cache
1✔
1205
                result := make([]*Node, 0, len(cachedNodes))
1✔
1206
                for _, node := range cachedNodes {
2✔
1207
                        if !deletedIDs[node.ID] {
2✔
1208
                                result = append(result, node)
1✔
1209
                        }
1✔
1210
                }
1211
                return result, nil
1✔
1212
        }
1213

1214
        // Merge: cache takes precedence, then engine
1215
        // Track what we've seen to avoid duplicates
1216
        seenIDs := make(map[NodeID]bool)
1✔
1217
        result := make([]*Node, 0, len(cachedNodes)+len(engineNodes))
1✔
1218

1✔
1219
        // Add cached nodes first (they're the "freshest" view)
1✔
1220
        for id, node := range cachedNodes {
1✔
1221
                if !deletedIDs[id] {
×
1222
                        result = append(result, node)
×
1223
                        seenIDs[id] = true
×
1224
                }
×
1225
        }
1226

1227
        // Add engine nodes that aren't in cache and aren't deleted
1228
        for _, node := range engineNodes {
2✔
1229
                if !seenIDs[node.ID] && !deletedIDs[node.ID] {
2✔
1230
                        result = append(result, node)
1✔
1231
                        seenIDs[node.ID] = true
1✔
1232
                }
1✔
1233
        }
1234

1235
        return result, nil
1✔
1236
}
1237

1238
// AllEdges returns merged view of cache and engine.
1239
// NOTE: We hold the read lock for the ENTIRE operation to prevent race conditions
1240
// with Flush() which clears the cache after writing to the engine.
1241
func (ae *AsyncEngine) AllEdges() ([]*Edge, error) {
1✔
1242
        ae.mu.RLock()
1✔
1243
        defer ae.mu.RUnlock()
1✔
1244

1✔
1245
        cachedEdges := make([]*Edge, 0, len(ae.edgeCache))
1✔
1246
        deletedIDs := make(map[EdgeID]bool)
1✔
1247

1✔
1248
        for id := range ae.deleteEdges {
2✔
1249
                deletedIDs[id] = true
1✔
1250
        }
1✔
1251
        for _, edge := range ae.edgeCache {
2✔
1252
                cachedEdges = append(cachedEdges, edge)
1✔
1253
        }
1✔
1254

1255
        engineEdges, err := ae.engine.AllEdges()
1✔
1256
        if err != nil {
2✔
1257
                result := make([]*Edge, 0, len(cachedEdges))
1✔
1258
                for _, edge := range cachedEdges {
2✔
1259
                        if !deletedIDs[edge.ID] {
2✔
1260
                                result = append(result, edge)
1✔
1261
                        }
1✔
1262
                }
1263
                return result, nil
1✔
1264
        }
1265

1266
        result := make([]*Edge, 0, len(cachedEdges)+len(engineEdges))
1✔
1267
        seenIDs := make(map[EdgeID]bool)
1✔
1268

1✔
1269
        for _, edge := range cachedEdges {
1✔
1270
                result = append(result, edge)
×
1271
                seenIDs[edge.ID] = true
×
1272
        }
×
1273
        for _, edge := range engineEdges {
2✔
1274
                if !seenIDs[edge.ID] && !deletedIDs[edge.ID] {
2✔
1275
                        result = append(result, edge)
1✔
1276
                }
1✔
1277
        }
1278

1279
        return result, nil
1✔
1280
}
1281

1282
// GetEdgesByType returns all edges of a specific type, merging cache and engine.
1283
func (ae *AsyncEngine) GetEdgesByType(edgeType string) ([]*Edge, error) {
1✔
1284
        if edgeType == "" {
2✔
1285
                return ae.AllEdges()
1✔
1286
        }
1✔
1287

1288
        ae.mu.RLock()
1✔
1289
        normalizedType := strings.ToLower(edgeType)
1✔
1290
        cachedEdges := make([]*Edge, 0)
1✔
1291
        deletedIDs := make(map[EdgeID]bool)
1✔
1292

1✔
1293
        for id := range ae.deleteEdges {
2✔
1294
                deletedIDs[id] = true
1✔
1295
        }
1✔
1296
        for _, edge := range ae.edgeCache {
2✔
1297
                if strings.ToLower(edge.Type) == normalizedType {
2✔
1298
                        cachedEdges = append(cachedEdges, edge)
1✔
1299
                }
1✔
1300
        }
1301
        ae.mu.RUnlock()
1✔
1302

1✔
1303
        engineEdges, err := ae.engine.GetEdgesByType(edgeType)
1✔
1304
        if err != nil {
2✔
1305
                return cachedEdges, nil
1✔
1306
        }
1✔
1307

1308
        result := make([]*Edge, 0, len(cachedEdges)+len(engineEdges))
1✔
1309
        seenIDs := make(map[EdgeID]bool)
1✔
1310

1✔
1311
        for _, edge := range cachedEdges {
2✔
1312
                result = append(result, edge)
1✔
1313
                seenIDs[edge.ID] = true
1✔
1314
        }
1✔
1315
        for _, edge := range engineEdges {
2✔
1316
                if !seenIDs[edge.ID] && !deletedIDs[edge.ID] {
2✔
1317
                        result = append(result, edge)
1✔
1318
                }
1✔
1319
        }
1320

1321
        return result, nil
1✔
1322
}
1323

1324
// Delegate read-only methods to engine
1325

1326
func (ae *AsyncEngine) GetOutgoingEdges(nodeID NodeID) ([]*Edge, error) {
1✔
1327
        // Check cache first for this node's outgoing edges
1✔
1328
        ae.mu.RLock()
1✔
1329
        var cached []*Edge
1✔
1330
        for _, edge := range ae.edgeCache {
2✔
1331
                if edge.StartNode == nodeID && !ae.deleteEdges[edge.ID] {
2✔
1332
                        cached = append(cached, edge)
1✔
1333
                }
1✔
1334
        }
1335
        deletedIDs := make(map[EdgeID]bool)
1✔
1336
        for id := range ae.deleteEdges {
2✔
1337
                deletedIDs[id] = true
1✔
1338
        }
1✔
1339
        ae.mu.RUnlock()
1✔
1340

1✔
1341
        engineEdges, err := ae.engine.GetOutgoingEdges(nodeID)
1✔
1342
        if err != nil {
2✔
1343
                return cached, nil
1✔
1344
        }
1✔
1345

1346
        // Merge
1347
        seenIDs := make(map[EdgeID]bool)
1✔
1348
        result := make([]*Edge, 0, len(cached)+len(engineEdges))
1✔
1349
        for _, e := range cached {
2✔
1350
                result = append(result, e)
1✔
1351
                seenIDs[e.ID] = true
1✔
1352
        }
1✔
1353
        for _, e := range engineEdges {
2✔
1354
                if !seenIDs[e.ID] && !deletedIDs[e.ID] {
2✔
1355
                        result = append(result, e)
1✔
1356
                }
1✔
1357
        }
1358
        return result, nil
1✔
1359
}
1360

1361
func (ae *AsyncEngine) GetIncomingEdges(nodeID NodeID) ([]*Edge, error) {
1✔
1362
        ae.mu.RLock()
1✔
1363
        var cached []*Edge
1✔
1364
        for _, edge := range ae.edgeCache {
2✔
1365
                if edge.EndNode == nodeID && !ae.deleteEdges[edge.ID] {
2✔
1366
                        cached = append(cached, edge)
1✔
1367
                }
1✔
1368
        }
1369
        deletedIDs := make(map[EdgeID]bool)
1✔
1370
        for id := range ae.deleteEdges {
1✔
1371
                deletedIDs[id] = true
×
1372
        }
×
1373
        ae.mu.RUnlock()
1✔
1374

1✔
1375
        engineEdges, err := ae.engine.GetIncomingEdges(nodeID)
1✔
1376
        if err != nil {
1✔
1377
                return cached, nil
×
1378
        }
×
1379

1380
        seenIDs := make(map[EdgeID]bool)
1✔
1381
        result := make([]*Edge, 0, len(cached)+len(engineEdges))
1✔
1382
        for _, e := range cached {
2✔
1383
                result = append(result, e)
1✔
1384
                seenIDs[e.ID] = true
1✔
1385
        }
1✔
1386
        for _, e := range engineEdges {
2✔
1387
                if !seenIDs[e.ID] && !deletedIDs[e.ID] {
2✔
1388
                        result = append(result, e)
1✔
1389
                }
1✔
1390
        }
1391
        return result, nil
1✔
1392
}
1393

1394
func (ae *AsyncEngine) GetEdgesBetween(startID, endID NodeID) ([]*Edge, error) {
1✔
1395
        return ae.engine.GetEdgesBetween(startID, endID)
1✔
1396
}
1✔
1397

1398
func (ae *AsyncEngine) GetEdgeBetween(startID, endID NodeID, edgeType string) *Edge {
1✔
1399
        return ae.engine.GetEdgeBetween(startID, endID, edgeType)
1✔
1400
}
1✔
1401

1402
// GetAllNodes is the error-swallowing convenience form of AllNodes: it
// returns the merged cache+engine node view and discards any engine error
// (AllNodes already falls back to the cache-only view on failure).
func (ae *AsyncEngine) GetAllNodes() []*Node {
	nodes, _ := ae.AllNodes()
	return nodes
}
1406

1407
// GetInDegree delegates to the underlying engine.
// NOTE(review): the write-behind cache is not consulted here, so pending
// (unflushed) edges do not contribute to the count — confirm this is the
// intended eventual-consistency behavior.
func (ae *AsyncEngine) GetInDegree(nodeID NodeID) int {
	return ae.engine.GetInDegree(nodeID)
}
1410

1411
// GetOutDegree delegates to the underlying engine.
// NOTE(review): the write-behind cache is not consulted here, so pending
// (unflushed) edges do not contribute to the count — confirm this is the
// intended eventual-consistency behavior.
func (ae *AsyncEngine) GetOutDegree(nodeID NodeID) int {
	return ae.engine.GetOutDegree(nodeID)
}
1414

1415
// GetSchema returns the schema manager of the underlying engine; the async
// layer adds no schema state of its own.
func (ae *AsyncEngine) GetSchema() *SchemaManager {
	return ae.engine.GetSchema()
}
1418

1419
// GetSchemaForNamespace implements NamespaceSchemaProvider when the underlying engine supports it.
// For engines without namespace support it falls back to the engine's
// default schema manager.
func (ae *AsyncEngine) GetSchemaForNamespace(namespace string) *SchemaManager {
	if p, ok := ae.engine.(NamespaceSchemaProvider); ok {
		return p.GetSchemaForNamespace(namespace)
	}
	return ae.engine.GetSchema()
}
1426

1427
func (ae *AsyncEngine) NodeCount() (int64, error) {
1✔
1428
        // Prevent double-counting during flush I/O:
1✔
1429
        // Flush holds flushMu.Lock() across the entire flush (including engine writes).
1✔
1430
        // By taking a read lock, NodeCount sees either:
1✔
1431
        //   - pre-flush state (cache populated, engine not yet updated), or
1✔
1432
        //   - post-flush state (cache cleared, engine updated),
1✔
1433
        // but never the mixed mid-flush state where engine reflects writes while cache
1✔
1434
        // still contains in-flight items.
1✔
1435
        ae.flushMu.RLock()
1✔
1436
        defer ae.flushMu.RUnlock()
1✔
1437

1✔
1438
        // Snapshot cache state under lock, then release before engine I/O to avoid
1✔
1439
        // blocking writers/readers on potentially slow storage calls.
1✔
1440
        ae.mu.RLock()
1✔
1441

1✔
1442
        // Count pending creates, excluding:
1✔
1443
        // - update nodes (exist in engine, just being modified)
1✔
1444
        // NOTE: We DO count in-flight nodes because they are being written to engine
1✔
1445
        // but engine.NodeCount() won't include them until the write commits.
1✔
1446
        // During flush, nodes transition: cache -> inFlight -> engine
1✔
1447
        // If we skip inFlight nodes AND engine hasn't committed, count = 0 (BUG!)
1✔
1448
        pendingCreates := int64(0)
1✔
1449
        pendingUpdates := int64(0)
1✔
1450
        inFlightCreates := int64(0)
1✔
1451
        for id := range ae.nodeCache {
2✔
1452
                if ae.updateNodes[id] {
2✔
1453
                        pendingUpdates++ // Exists in engine, just updating
1✔
1454
                        continue
1✔
1455
                }
1456
                if ae.inFlightNodes[id] {
2✔
1457
                        inFlightCreates++ // Being written to engine right now
1✔
1458
                        continue
1✔
1459
                }
1460
                pendingCreates++
1✔
1461
        }
1462
        // Also count nodes that are in-flight but NOT updates (they're being created)
1463
        pendingDeletes := int64(len(ae.deleteNodes))
1✔
1464
        ae.mu.RUnlock()
1✔
1465

1✔
1466
        engineCount, err := ae.engine.NodeCount()
1✔
1467

1✔
1468
        if err != nil {
2✔
1469
                return 0, err
1✔
1470
        }
1✔
1471

1472
        // Adjust for pending creates and deletes
1473
        // Note: pendingUpdates don't change count (already counted in engineCount)
1474
        // Include inFlightCreates because they're being written but not yet in engineCount
1475
        count := engineCount + pendingCreates + inFlightCreates - pendingDeletes
1✔
1476

1✔
1477
        // Clamp to zero if negative (should never happen, log for debugging)
1✔
1478
        if count < 0 {
2✔
1479
                log.Printf("⚠️ [COUNT BUG] NodeCount went negative: engineCount=%d pendingCreates=%d pendingDeletes=%d result=%d (clamping to 0)",
1✔
1480
                        engineCount, pendingCreates, pendingDeletes, count)
1✔
1481
                return 0, nil
1✔
1482
        }
1✔
1483
        return count, nil
1✔
1484
}
1485

1486
func (ae *AsyncEngine) EdgeCount() (int64, error) {
1✔
1487
        ae.flushMu.RLock()
1✔
1488
        defer ae.flushMu.RUnlock()
1✔
1489

1✔
1490
        // Snapshot cache state under lock, then release before engine I/O to avoid
1✔
1491
        // blocking writers/readers on potentially slow storage calls.
1✔
1492
        ae.mu.RLock()
1✔
1493

1✔
1494
        // Count pending creates, excluding:
1✔
1495
        // - update edges (exist in engine, just being modified)
1✔
1496
        // NOTE: We DO count in-flight edges because they are being written to engine
1✔
1497
        // but engine.EdgeCount() won't include them until the write commits.
1✔
1498
        pendingCreates := int64(0)
1✔
1499
        inFlightCreates := int64(0)
1✔
1500
        for id := range ae.edgeCache {
2✔
1501
                if ae.updateEdges[id] {
2✔
1502
                        continue // Exists in engine, just updating
1✔
1503
                }
1504
                if ae.inFlightEdges[id] {
2✔
1505
                        inFlightCreates++ // Being written to engine right now
1✔
1506
                        continue
1✔
1507
                }
1508
                pendingCreates++
1✔
1509
        }
1510
        pendingDeletes := int64(len(ae.deleteEdges))
1✔
1511
        ae.mu.RUnlock()
1✔
1512

1✔
1513
        engineCount, err := ae.engine.EdgeCount()
1✔
1514

1✔
1515
        if err != nil {
2✔
1516
                return 0, err
1✔
1517
        }
1✔
1518

1519
        // Adjust for pending creates and deletes
1520
        // Note: updates don't change count (already counted in engineCount)
1521
        // Include inFlightCreates because they're being written but not yet in engineCount
1522
        count := engineCount + pendingCreates + inFlightCreates - pendingDeletes
1✔
1523

1✔
1524
        // Clamp to zero if negative (should never happen, log for debugging)
1✔
1525
        if count < 0 {
2✔
1526
                log.Printf("⚠️ [COUNT BUG] EdgeCount went negative: engineCount=%d pendingCreates=%d pendingDeletes=%d result=%d (clamping to 0)",
1✔
1527
                        engineCount, pendingCreates, pendingDeletes, count)
1✔
1528
                return 0, nil
1✔
1529
        }
1✔
1530
        return count, nil
1✔
1531
}
1532

1533
func (ae *AsyncEngine) NodeCountByPrefix(prefix string) (int64, error) {
1✔
1534
        locked := ae.flushMu.TryRLock()
1✔
1535
        if locked {
2✔
1536
                defer ae.flushMu.RUnlock()
1✔
1537
        }
1✔
1538

1539
        // Snapshot cache state under lock, then release before engine I/O.
1540
        ae.mu.RLock()
1✔
1541

1✔
1542
        pendingCreates := int64(0)
1✔
1543
        pendingUpdates := int64(0)
1✔
1544
        inFlightCreates := int64(0)
1✔
1545
        for id := range ae.nodeCache {
2✔
1546
                if !strings.HasPrefix(string(id), prefix) {
2✔
1547
                        continue
1✔
1548
                }
1549
                if ae.updateNodes[id] {
2✔
1550
                        pendingUpdates++
1✔
1551
                        continue
1✔
1552
                }
1553
                if ae.inFlightNodes[id] {
2✔
1554
                        inFlightCreates++
1✔
1555
                        continue
1✔
1556
                }
1557
                pendingCreates++
1✔
1558
        }
1559

1560
        pendingDeletes := int64(0)
1✔
1561
        for id := range ae.deleteNodes {
2✔
1562
                if strings.HasPrefix(string(id), prefix) {
2✔
1563
                        pendingDeletes++
1✔
1564
                }
1✔
1565
        }
1566
        ae.mu.RUnlock()
1✔
1567

1✔
1568
        var engineCount int64
1✔
1569
        var err error
1✔
1570
        if stats, ok := ae.engine.(PrefixStatsEngine); ok {
2✔
1571
                engineCount, err = stats.NodeCountByPrefix(prefix)
1✔
1572
        } else {
1✔
1573
                // Correctness fallback for uncommon engines (slower).
×
1574
                engineCount, err = countNodesInEngineByPrefix(ae.engine, prefix)
×
1575
        }
×
1576

1577
        if err != nil {
1✔
1578
                return 0, err
×
1579
        }
×
1580

1581
        count := engineCount + pendingCreates + inFlightCreates - pendingDeletes
1✔
1582
        if count < 0 {
2✔
1583
                log.Printf("⚠️ [COUNT BUG] NodeCountByPrefix went negative: prefix=%q engineCount=%d pendingCreates=%d pendingDeletes=%d result=%d (clamping to 0)",
1✔
1584
                        prefix, engineCount, pendingCreates, pendingDeletes, count)
1✔
1585
                return 0, nil
1✔
1586
        }
1✔
1587
        _ = pendingUpdates // no-op (kept for symmetry with NodeCount)
1✔
1588
        return count, nil
1✔
1589
}
1590

1591
func (ae *AsyncEngine) EdgeCountByPrefix(prefix string) (int64, error) {
1✔
1592
        locked := ae.flushMu.TryRLock()
1✔
1593
        if locked {
2✔
1594
                defer ae.flushMu.RUnlock()
1✔
1595
        }
1✔
1596

1597
        ae.mu.RLock()
1✔
1598

1✔
1599
        pendingCreates := int64(0)
1✔
1600
        inFlightCreates := int64(0)
1✔
1601
        for id := range ae.edgeCache {
2✔
1602
                if !strings.HasPrefix(string(id), prefix) {
2✔
1603
                        continue
1✔
1604
                }
1605
                if ae.updateEdges[id] {
2✔
1606
                        continue
1✔
1607
                }
1608
                if ae.inFlightEdges[id] {
2✔
1609
                        inFlightCreates++
1✔
1610
                        continue
1✔
1611
                }
1612
                pendingCreates++
1✔
1613
        }
1614

1615
        pendingDeletes := int64(0)
1✔
1616
        for id := range ae.deleteEdges {
2✔
1617
                if strings.HasPrefix(string(id), prefix) {
2✔
1618
                        pendingDeletes++
1✔
1619
                }
1✔
1620
        }
1621
        ae.mu.RUnlock()
1✔
1622

1✔
1623
        var engineCount int64
1✔
1624
        var err error
1✔
1625
        if stats, ok := ae.engine.(PrefixStatsEngine); ok {
2✔
1626
                engineCount, err = stats.EdgeCountByPrefix(prefix)
1✔
1627
        } else {
1✔
1628
                engineCount, err = countEdgesInEngineByPrefix(ae.engine, prefix)
×
1629
        }
×
1630

1631
        if err != nil {
1✔
1632
                return 0, err
×
1633
        }
×
1634

1635
        count := engineCount + pendingCreates + inFlightCreates - pendingDeletes
1✔
1636
        if count < 0 {
2✔
1637
                log.Printf("⚠️ [COUNT BUG] EdgeCountByPrefix went negative: prefix=%q engineCount=%d pendingCreates=%d pendingDeletes=%d result=%d (clamping to 0)",
1✔
1638
                        prefix, engineCount, pendingCreates, pendingDeletes, count)
1✔
1639
                return 0, nil
1✔
1640
        }
1✔
1641
        return count, nil
1✔
1642
}
1643

1644
func countNodesInEngineByPrefix(engine Engine, prefix string) (int64, error) {
1✔
1645
        if streamer, ok := engine.(StreamingEngine); ok {
2✔
1646
                var count int64
1✔
1647
                err := streamer.StreamNodes(context.Background(), func(node *Node) error {
2✔
1648
                        if strings.HasPrefix(string(node.ID), prefix) {
2✔
1649
                                count++
1✔
1650
                        }
1✔
1651
                        return nil
1✔
1652
                })
1653
                return count, err
1✔
1654
        }
1655

1656
        nodes, err := engine.AllNodes()
1✔
1657
        if err != nil {
2✔
1658
                return 0, err
1✔
1659
        }
1✔
1660
        var count int64
1✔
1661
        for _, node := range nodes {
2✔
1662
                if strings.HasPrefix(string(node.ID), prefix) {
2✔
1663
                        count++
1✔
1664
                }
1✔
1665
        }
1666
        return count, nil
1✔
1667
}
1668

1669
func countEdgesInEngineByPrefix(engine Engine, prefix string) (int64, error) {
1✔
1670
        if streamer, ok := engine.(StreamingEngine); ok {
2✔
1671
                var count int64
1✔
1672
                err := streamer.StreamEdges(context.Background(), func(edge *Edge) error {
2✔
1673
                        if strings.HasPrefix(string(edge.ID), prefix) {
2✔
1674
                                count++
1✔
1675
                        }
1✔
1676
                        return nil
1✔
1677
                })
1678
                return count, err
1✔
1679
        }
1680

1681
        edges, err := engine.AllEdges()
1✔
1682
        if err != nil {
2✔
1683
                return 0, err
1✔
1684
        }
1✔
1685
        var count int64
1✔
1686
        for _, edge := range edges {
2✔
1687
                if strings.HasPrefix(string(edge.ID), prefix) {
2✔
1688
                        count++
1✔
1689
                }
1✔
1690
        }
1691
        return count, nil
1✔
1692
}
1693

1694
// Close stops the background flush goroutine and flushes all pending data.
1695
// Returns an error if the final flush fails or if data remains unflushed.
1696
func (ae *AsyncEngine) Close() error {
1✔
1697
        // Stop flush loop
1✔
1698
        close(ae.stopChan)
1✔
1699
        ae.flushTicker.Stop()
1✔
1700
        ae.wg.Wait()
1✔
1701

1✔
1702
        // Final flush with error tracking
1✔
1703
        result := ae.FlushWithResult()
1✔
1704

1✔
1705
        // Check for unflushed data after final flush attempt
1✔
1706
        ae.mu.RLock()
1✔
1707
        pendingNodes := len(ae.nodeCache)
1✔
1708
        pendingEdges := len(ae.edgeCache)
1✔
1709
        pendingNodeDeletes := len(ae.deleteNodes)
1✔
1710
        pendingEdgeDeletes := len(ae.deleteEdges)
1✔
1711
        ae.mu.RUnlock()
1✔
1712

1✔
1713
        // Close underlying engine
1✔
1714
        engineErr := ae.engine.Close()
1✔
1715

1✔
1716
        // Treat "storage closed" as expected during shutdown (teardown closed engine before async, or ticker/stopChan race).
1✔
1717
        if result.HasErrors() && result.isStorageClosedOnly() {
1✔
1718
                result = FlushResult{}
×
1719
        }
×
1720

1721
        // Build error message if there are issues
1722
        if result.HasErrors() || pendingNodes > 0 || pendingEdges > 0 {
2✔
1723
                var errMsg string
1✔
1724
                if result.HasErrors() {
2✔
1725
                        errMsg = fmt.Sprintf("flush errors: %d nodes failed, %d edges failed, %d deletes failed",
1✔
1726
                                result.NodesFailed, result.EdgesFailed, result.DeletesFailed)
1✔
1727
                }
1✔
1728
                if pendingNodes > 0 || pendingEdges > 0 || pendingNodeDeletes > 0 || pendingEdgeDeletes > 0 {
2✔
1729
                        if errMsg != "" {
2✔
1730
                                errMsg += "; "
1✔
1731
                        }
1✔
1732
                        errMsg += fmt.Sprintf("unflushed: %d nodes, %d edges, %d node deletes, %d edge deletes (POTENTIAL DATA LOSS)",
1✔
1733
                                pendingNodes, pendingEdges, pendingNodeDeletes, pendingEdgeDeletes)
1✔
1734
                }
1735
                if engineErr != nil {
1✔
1736
                        return fmt.Errorf("%s; engine close: %w", errMsg, engineErr)
×
1737
                }
×
1738
                return fmt.Errorf("async engine close: %s", errMsg)
1✔
1739
        }
1740

1741
        return engineErr
1✔
1742
}
1743

1744
// BulkCreateNodes creates nodes in batch (async).
1745
func (ae *AsyncEngine) BulkCreateNodes(nodes []*Node) error {
1✔
1746
        if err := ae.validateBulkNodeConstraints(nodes); err != nil {
1✔
1747
                return err
×
1748
        }
×
1749
        for _, node := range nodes {
2✔
1750
                if node == nil {
1✔
1751
                        return ErrInvalidData
×
1752
                }
×
1753
                if err := validatePropertiesForStorage(node.Properties); err != nil {
1✔
1754
                        return err
×
1755
                }
×
1756
        }
1757

1758
        ae.mu.Lock()
1✔
1759
        defer ae.mu.Unlock()
1✔
1760

1✔
1761
        for _, node := range nodes {
2✔
1762
                delete(ae.deleteNodes, node.ID)
1✔
1763
                ae.nodeCache[node.ID] = node
1✔
1764
        }
1✔
1765
        ae.pendingWrites += int64(len(nodes))
1✔
1766
        return nil
1✔
1767
}
1768

1769
// validateBulkNodeConstraints validates every node in the batch against its
// namespace's schema constraints, and additionally detects UNIQUE / NODE KEY
// collisions *within the batch itself* (which per-node validation against the
// cache/engine cannot see). Returns the first violation found, or nil.
func (ae *AsyncEngine) validateBulkNodeConstraints(nodes []*Node) error {
	// Tracks constraint values already seen in this batch, keyed by
	// "<namespace>:<constraint-name>:<value-key>".
	seen := make(map[string]struct{})

	for _, node := range nodes {
		if node == nil {
			return ErrInvalidData
		}
		namespace, prefixRequired, err := ae.resolveNamespace(node.ID)
		if err != nil {
			return err
		}
		// Per-node validation against the async cache and the engine.
		if err := ae.validateNodeConstraintsWithNamespace(node, namespace, prefixRequired); err != nil {
			return err
		}

		schema := ae.GetSchemaForNamespace(namespace)
		if schema == nil {
			// No schema -> nothing further to enforce for this node.
			continue
		}

		// Intra-batch duplicate detection for uniqueness-style constraints.
		constraints := schema.GetConstraintsForLabels(node.Labels)
		for _, c := range constraints {
			switch c.Type {
			case ConstraintUnique:
				// Only single-property UNIQUE constraints are enforced here.
				if len(c.Properties) != 1 {
					continue
				}
				prop := c.Properties[0]
				value := node.Properties[prop]
				if value == nil {
					// Nil values are exempt from uniqueness.
					continue
				}
				key := fmt.Sprintf("%s:%s:%s", namespace, c.Name, constraintValueKey(value))
				if _, exists := seen[key]; exists {
					return &ConstraintViolationError{
						Type:       ConstraintUnique,
						Label:      c.Label,
						Properties: []string{prop},
						Message:    fmt.Sprintf("Node with %s=%v already exists in batch", prop, value),
					}
				}
				seen[key] = struct{}{}
			case ConstraintNodeKey:
				// Composite key: every property must be non-nil, and the
				// combination must be unique within the batch.
				values := make([]interface{}, len(c.Properties))
				for i, prop := range c.Properties {
					values[i] = node.Properties[prop]
					if values[i] == nil {
						return &ConstraintViolationError{
							Type:       ConstraintNodeKey,
							Label:      c.Label,
							Properties: c.Properties,
							Message:    fmt.Sprintf("NODE KEY property %s cannot be null", prop),
						}
					}
				}
				key := fmt.Sprintf("%s:%s:%s", namespace, c.Name, constraintCompositeKey(values))
				if _, exists := seen[key]; exists {
					return &ConstraintViolationError{
						Type:       ConstraintNodeKey,
						Label:      c.Label,
						Properties: c.Properties,
						Message:    fmt.Sprintf("Node with key %v=%v already exists in batch", c.Properties, values),
					}
				}
				seen[key] = struct{}{}
			}
		}
	}

	return nil
}
1840

1841
func (ae *AsyncEngine) validateNodeConstraints(node *Node) error {
1✔
1842
        if node == nil {
2✔
1843
                return ErrInvalidData
1✔
1844
        }
1✔
1845
        namespace, prefixRequired, err := ae.resolveNamespace(node.ID)
1✔
1846
        if err != nil {
1✔
1847
                return err
×
1848
        }
×
1849
        return ae.validateNodeConstraintsWithNamespace(node, namespace, prefixRequired)
1✔
1850
}
1851

1852
// validateNodeConstraintsWithNamespace applies all schema constraints for the
// given namespace to the node: UNIQUE, NODE KEY, EXISTS, and property-type
// constraints, in that order. Returns the first violation found, or nil.
func (ae *AsyncEngine) validateNodeConstraintsWithNamespace(node *Node, namespace string, prefixRequired bool) error {
	if node == nil {
		return ErrInvalidData
	}

	schema := ae.GetSchemaForNamespace(namespace)
	if schema == nil {
		// No schema for this namespace -> nothing to enforce.
		return nil
	}

	// Structural constraints (uniqueness / keys / required properties).
	constraints := schema.GetConstraintsForLabels(node.Labels)
	for _, constraint := range constraints {
		switch constraint.Type {
		case ConstraintUnique:
			if err := ae.checkUniqueConstraint(node, constraint, namespace, prefixRequired); err != nil {
				return err
			}
		case ConstraintNodeKey:
			if err := ae.checkNodeKeyConstraint(node, constraint, namespace, prefixRequired); err != nil {
				return err
			}
		case ConstraintExists:
			if err := ae.checkExistenceConstraint(node, constraint); err != nil {
				return err
			}
		}
	}

	// Property-type constraints (value must match the declared type).
	typeConstraints := schema.GetPropertyTypeConstraintsForLabels(node.Labels)
	for _, constraint := range typeConstraints {
		value := node.Properties[constraint.Property]
		if err := ValidatePropertyType(value, constraint.ExpectedType); err != nil {
			return &ConstraintViolationError{
				Type:       ConstraintPropertyType,
				Label:      constraint.Label,
				Properties: []string{constraint.Property},
				Message:    fmt.Sprintf("Property %s must be %s (%v)", constraint.Property, constraint.ExpectedType, err),
			}
		}
	}

	return nil
}
1895

1896
func (ae *AsyncEngine) checkUniqueConstraint(node *Node, c Constraint, namespace string, prefixRequired bool) error {
1✔
1897
        if len(c.Properties) != 1 {
2✔
1898
                return nil
1✔
1899
        }
1✔
1900
        prop := c.Properties[0]
1✔
1901
        if node.Properties == nil {
1✔
1902
                return nil
×
1903
        }
×
1904
        value := node.Properties[prop]
1✔
1905
        if value == nil {
2✔
1906
                return nil
1✔
1907
        }
1✔
1908

1909
        nsPrefix := namespace + ":"
1✔
1910
        ae.mu.RLock()
1✔
1911
        for id, n := range ae.nodeCache {
2✔
1912
                if id == node.ID || ae.deleteNodes[id] {
1✔
1913
                        continue
×
1914
                }
1915
                if prefixRequired && !strings.HasPrefix(string(id), nsPrefix) {
1✔
1916
                        continue
×
1917
                }
1918
                if hasLabel(n.Labels, c.Label) && compareValues(n.Properties[prop], value) {
2✔
1919
                        ae.mu.RUnlock()
1✔
1920
                        return &ConstraintViolationError{
1✔
1921
                                Type:       ConstraintUnique,
1✔
1922
                                Label:      c.Label,
1✔
1923
                                Properties: []string{prop},
1✔
1924
                                Message:    fmt.Sprintf("Node with %s=%v already exists in async cache", prop, value),
1✔
1925
                        }
1✔
1926
                }
1✔
1927
        }
1928
        ae.mu.RUnlock()
1✔
1929

1✔
1930
        nodes, err := ae.engine.GetNodesByLabel(c.Label)
1✔
1931
        if err != nil {
1✔
1932
                return nil
×
1933
        }
×
1934
        for _, existing := range nodes {
2✔
1935
                if existing.ID == node.ID {
1✔
1936
                        continue
×
1937
                }
1938
                if prefixRequired && !strings.HasPrefix(string(existing.ID), nsPrefix) {
1✔
1939
                        continue
×
1940
                }
1941
                if compareValues(existing.Properties[prop], value) {
2✔
1942
                        return &ConstraintViolationError{
1✔
1943
                                Type:       ConstraintUnique,
1✔
1944
                                Label:      c.Label,
1✔
1945
                                Properties: []string{prop},
1✔
1946
                                Message:    fmt.Sprintf("Node with %s=%v already exists (nodeID: %s)", prop, value, existing.ID),
1✔
1947
                        }
1✔
1948
                }
1✔
1949
        }
1950

1951
        return nil
1✔
1952
}
1953

1954
// checkNodeKeyConstraint enforces a composite NODE KEY constraint: every key
// property must be present and non-nil, and no other node with the same label
// may carry the same combination of key values. Both the async write cache
// and the underlying engine are consulted.
func (ae *AsyncEngine) checkNodeKeyConstraint(node *Node, c Constraint, namespace string, prefixRequired bool) error {
	// A key with no properties cannot be violated.
	if len(c.Properties) < 1 {
		return nil
	}
	// A nil Properties map means every key property is null -> violation.
	if node.Properties == nil {
		return &ConstraintViolationError{
			Type:       ConstraintNodeKey,
			Label:      c.Label,
			Properties: c.Properties,
			Message:    "NODE KEY properties cannot be null",
		}
	}

	// Collect the candidate's key values; all must be non-nil.
	values := make([]interface{}, len(c.Properties))
	for i, prop := range c.Properties {
		value := node.Properties[prop]
		if value == nil {
			return &ConstraintViolationError{
				Type:       ConstraintNodeKey,
				Label:      c.Label,
				Properties: c.Properties,
				Message:    fmt.Sprintf("NODE KEY property %s cannot be null", prop),
			}
		}
		values[i] = value
	}

	// Phase 1: scan pending nodes in the async cache.
	nsPrefix := namespace + ":"
	ae.mu.RLock()
	for id, n := range ae.nodeCache {
		// Skip the node itself and anything already marked for deletion.
		if id == node.ID || ae.deleteNodes[id] {
			continue
		}
		// When IDs are namespace-prefixed, only compare within this namespace.
		if prefixRequired && !strings.HasPrefix(string(id), nsPrefix) {
			continue
		}
		if !hasLabel(n.Labels, c.Label) {
			continue
		}
		// All key properties must match for a collision.
		match := true
		for i, prop := range c.Properties {
			if !compareValues(n.Properties[prop], values[i]) {
				match = false
				break
			}
		}
		if match {
			// Manual unlock (no defer): phase 2 must run without the lock held.
			ae.mu.RUnlock()
			return &ConstraintViolationError{
				Type:       ConstraintNodeKey,
				Label:      c.Label,
				Properties: c.Properties,
				Message:    fmt.Sprintf("Node with key %v=%v already exists in async cache", c.Properties, values),
			}
		}
	}
	ae.mu.RUnlock()

	// Phase 2: scan nodes already persisted in the underlying engine.
	nodes, err := ae.engine.GetNodesByLabel(c.Label)
	if err != nil {
		// Best-effort: an engine lookup failure skips persisted-side
		// validation rather than failing the write.
		return nil
	}
	for _, existing := range nodes {
		if existing.ID == node.ID {
			continue
		}
		if prefixRequired && !strings.HasPrefix(string(existing.ID), nsPrefix) {
			continue
		}
		match := true
		for i, prop := range c.Properties {
			if !compareValues(existing.Properties[prop], values[i]) {
				match = false
				break
			}
		}
		if match {
			return &ConstraintViolationError{
				Type:       ConstraintNodeKey,
				Label:      c.Label,
				Properties: c.Properties,
				Message:    fmt.Sprintf("Node with key %v=%v already exists (nodeID: %s)", c.Properties, values, existing.ID),
			}
		}
	}

	return nil
}
2042

2043
func (ae *AsyncEngine) resolveNamespace(nodeID NodeID) (string, bool, error) {
1✔
2044
        if namespace, _, ok := ParseDatabasePrefix(string(nodeID)); ok {
2✔
2045
                return namespace, true, nil
1✔
2046
        }
1✔
2047
        if provider, ok := ae.engine.(interface{ Namespace() string }); ok {
2✔
2048
                ns := provider.Namespace()
1✔
2049
                if ns != "" {
2✔
2050
                        return ns, false, nil
1✔
2051
                }
1✔
2052
        }
2053
        return "", false, fmt.Errorf("node ID must be prefixed with namespace (e.g., 'nornic:node-123'), got: %s", nodeID)
1✔
2054
}
2055

2056
func (ae *AsyncEngine) checkExistenceConstraint(node *Node, c Constraint) error {
1✔
2057
        if len(c.Properties) != 1 {
2✔
2058
                return nil
1✔
2059
        }
1✔
2060
        prop := c.Properties[0]
1✔
2061
        if node.Properties == nil {
2✔
2062
                return &ConstraintViolationError{
1✔
2063
                        Type:       ConstraintExists,
1✔
2064
                        Label:      c.Label,
1✔
2065
                        Properties: []string{prop},
1✔
2066
                        Message:    fmt.Sprintf("Required property %s is missing", prop),
1✔
2067
                }
1✔
2068
        }
1✔
2069
        if val, ok := node.Properties[prop]; !ok || val == nil {
2✔
2070
                return &ConstraintViolationError{
1✔
2071
                        Type:       ConstraintExists,
1✔
2072
                        Label:      c.Label,
1✔
2073
                        Properties: []string{prop},
1✔
2074
                        Message:    fmt.Sprintf("Required property %s is missing", prop),
1✔
2075
                }
1✔
2076
        }
1✔
2077
        return nil
1✔
2078
}
2079

2080
// BulkCreateEdges creates edges in batch (async).
2081
func (ae *AsyncEngine) BulkCreateEdges(edges []*Edge) error {
1✔
2082
        for _, edge := range edges {
2✔
2083
                if edge == nil {
1✔
2084
                        return ErrInvalidData
×
2085
                }
×
2086
                if err := validatePropertiesForStorage(edge.Properties); err != nil {
1✔
2087
                        return err
×
2088
                }
×
2089
        }
2090

2091
        ae.mu.Lock()
1✔
2092
        defer ae.mu.Unlock()
1✔
2093

1✔
2094
        for _, edge := range edges {
2✔
2095
                delete(ae.deleteEdges, edge.ID)
1✔
2096
                ae.edgeCache[edge.ID] = edge
1✔
2097
        }
1✔
2098
        ae.pendingWrites += int64(len(edges))
1✔
2099
        return nil
1✔
2100
}
2101

2102
// BulkDeleteNodes marks multiple nodes for deletion (async).
2103
func (ae *AsyncEngine) BulkDeleteNodes(ids []NodeID) error {
1✔
2104
        ae.mu.Lock()
1✔
2105
        notifyDeletes := make([]NodeID, 0)
1✔
2106

1✔
2107
        for _, id := range ids {
2✔
2108
                // If this is a pending create in cache, deleting it won’t hit the inner engine.
1✔
2109
                // Notify best-effort so external services can remove any speculative indexes.
1✔
2110
                if _, ok := ae.nodeCache[id]; ok && !ae.updateNodes[id] {
2✔
2111
                        notifyDeletes = append(notifyDeletes, id)
1✔
2112
                }
1✔
2113
                delete(ae.nodeCache, id)
1✔
2114
                ae.deleteNodes[id] = true
1✔
2115
        }
2116
        ae.pendingWrites += int64(len(ids))
1✔
2117
        ae.mu.Unlock()
1✔
2118

1✔
2119
        for _, id := range notifyDeletes {
2✔
2120
                ae.notifyNodeDeleted(id)
1✔
2121
        }
1✔
2122
        return nil
1✔
2123
}
2124

2125
// BulkDeleteEdges marks multiple edges for deletion (async).
2126
func (ae *AsyncEngine) BulkDeleteEdges(ids []EdgeID) error {
1✔
2127
        ae.mu.Lock()
1✔
2128
        defer ae.mu.Unlock()
1✔
2129

1✔
2130
        for _, id := range ids {
2✔
2131
                delete(ae.edgeCache, id)
1✔
2132
                ae.deleteEdges[id] = true
1✔
2133
        }
1✔
2134
        ae.pendingWrites += int64(len(ids))
1✔
2135
        return nil
1✔
2136
}
2137

2138
// FindNodeNeedingEmbedding returns a node that needs embedding.
// IMPORTANT: This checks the in-memory cache first to ensure we don't re-process
// nodes that have embeddings pending flush to the underlying engine.
//
// The algorithm:
// 1. Build set of node IDs that have embeddings in cache (pending flush)
// 2. First check nodes in our cache that need embedding
// 3. Then check underlying engine, skipping nodes we have in cache with embeddings
//
// Returns nil when no node currently needs embedding (or the underlying
// engine supports neither the dedicated finder nor ExportableEngine).
// The returned node is the live cached pointer, not a copy.
func (ae *AsyncEngine) FindNodeNeedingEmbedding() *Node {
	ae.mu.RLock()

	// Build set of node IDs in cache that already have embeddings.
	// Only the first chunk is inspected: a non-empty first chunk is taken
	// as evidence the node has already been embedded.
	cachedWithEmbedding := make(map[NodeID]bool)
	for id, node := range ae.nodeCache {
		if len(node.ChunkEmbeddings) > 0 && len(node.ChunkEmbeddings[0]) > 0 {
			cachedWithEmbedding[id] = true
		}
	}

	// First check nodes in our own cache that might need embedding
	for _, node := range ae.nodeCache {
		if ae.deleteNodes[node.ID] {
			continue
		}
		if !cachedWithEmbedding[node.ID] && NodeNeedsEmbedding(node) {
			ae.mu.RUnlock()
			return node
		}
	}
	ae.mu.RUnlock()

	// From here on cachedWithEmbedding is a point-in-time snapshot taken
	// under the read lock above; writes that land after the unlock are not
	// reflected in it (acceptable: callers treat this as best-effort).

	// Try dedicated finder on underlying engine
	if finder, ok := ae.engine.(interface{ FindNodeNeedingEmbedding() *Node }); ok {
		node := finder.FindNodeNeedingEmbedding()
		if node == nil {
			return nil
		}

		// Check if this node has an embedding in our cache
		if cachedWithEmbedding[node.ID] {
			// This node has embedding pending flush - no work to do
			return nil
		}
		if ae.isNodeMarkedDeleted(node.ID) {
			// If delete is pending in AsyncEngine, proactively remove this stale queue
			// entry from the underlying pending-embeddings index.
			ae.MarkNodeEmbedded(node.ID)
			return nil
		}

		return node
	}

	// Fallback: use AllNodes from ExportableEngine
	if exportable, ok := ae.engine.(ExportableEngine); ok {
		nodes, err := exportable.AllNodes()
		if err != nil {
			// Best-effort: an enumeration failure is treated as "nothing to do".
			return nil
		}
		for _, node := range nodes {
			// Skip if in cache with embedding
			if cachedWithEmbedding[node.ID] {
				continue
			}
			if ae.isNodeMarkedDeleted(node.ID) {
				continue
			}
			if NodeNeedsEmbedding(node) {
				return node
			}
		}
	}

	return nil
}
2213

2214
// RefreshPendingEmbeddingsIndex delegates to the underlying engine, if supported.
2215
// This keeps the pending-embeddings secondary index consistent even when AsyncEngine
2216
// is the outer-most storage layer.
2217
func (ae *AsyncEngine) RefreshPendingEmbeddingsIndex() int {
1✔
2218
        if mgr, ok := ae.engine.(interface{ RefreshPendingEmbeddingsIndex() int }); ok {
2✔
2219
                return mgr.RefreshPendingEmbeddingsIndex()
1✔
2220
        }
1✔
2221
        return 0
1✔
2222
}
2223

2224
// MarkNodeEmbedded delegates to the underlying engine, if supported.
2225
// This removes a node from the pending-embeddings secondary index once embedded.
2226
func (ae *AsyncEngine) MarkNodeEmbedded(nodeID NodeID) {
1✔
2227
        if mgr, ok := ae.engine.(interface{ MarkNodeEmbedded(NodeID) }); ok {
2✔
2228
                mgr.MarkNodeEmbedded(nodeID)
1✔
2229
        }
1✔
2230
}
2231

2232
// AddToPendingEmbeddings delegates to the underlying engine, if supported.
2233
// Call this to re-queue a node for embedding after a failed attempt (e.g. so another worker can retry).
2234
func (ae *AsyncEngine) AddToPendingEmbeddings(nodeID NodeID) {
1✔
2235
        // Don't allow re-queue while delete is pending in AsyncEngine.
1✔
2236
        if ae.isNodeMarkedDeleted(nodeID) {
2✔
2237
                return
1✔
2238
        }
1✔
2239
        if mgr, ok := ae.engine.(interface{ AddToPendingEmbeddings(NodeID) }); ok {
2✔
2240
                mgr.AddToPendingEmbeddings(nodeID)
1✔
2241
        }
1✔
2242
}
2243

2244
// PendingEmbeddingsCount delegates to the underlying engine, if supported.
2245
func (ae *AsyncEngine) PendingEmbeddingsCount() int {
1✔
2246
        if mgr, ok := ae.engine.(interface{ PendingEmbeddingsCount() int }); ok {
2✔
2247
                return mgr.PendingEmbeddingsCount()
1✔
2248
        }
1✔
2249
        return 0
1✔
2250
}
2251

2252
func (ae *AsyncEngine) isNodeMarkedDeleted(nodeID NodeID) bool {
1✔
2253
        ae.mu.RLock()
1✔
2254
        deleted := ae.deleteNodes[nodeID]
1✔
2255
        ae.mu.RUnlock()
1✔
2256
        return deleted
1✔
2257
}
1✔
2258

2259
// IterateNodes iterates through all nodes, checking cache first.
// fn is invoked once per node; returning false stops the iteration early
// (IterateNodes then returns nil). Cached nodes are visited before nodes
// from the underlying engine; nodes with a pending delete are skipped in
// both passes. If the inner engine does not support IterateNodes, only
// cached nodes are visited.
func (ae *AsyncEngine) IterateNodes(fn func(*Node) bool) error {
	// First iterate cache
	// We need to make copies since the callback may be called without locks held
	// and the node could be modified by other goroutines
	ae.mu.RLock()
	cachedIDs := make(map[NodeID]bool)
	cachedCopies := make([]*Node, 0, len(ae.nodeCache))
	for id, node := range ae.nodeCache {
		if ae.deleteNodes[id] {
			continue
		}
		cachedIDs[id] = true
		// Make a deep copy of the node to avoid concurrent access issues
		// NOTE(review): only the fields listed below are copied — confirm
		// this stays in sync with the Node struct definition.
		nodeCopy := &Node{
			ID:           node.ID,
			Labels:       append([]string(nil), node.Labels...),
			Properties:   make(map[string]any, len(node.Properties)),
			CreatedAt:    node.CreatedAt,
			UpdatedAt:    node.UpdatedAt,
			DecayScore:   node.DecayScore,
			LastAccessed: node.LastAccessed,
			AccessCount:  node.AccessCount,
			ChunkEmbeddings: func() [][]float32 {
				// Deep-copy each embedding chunk so callers cannot
				// mutate the cached vectors through the copy.
				chunks := make([][]float32, len(node.ChunkEmbeddings))
				for i, emb := range node.ChunkEmbeddings {
					chunks[i] = append([]float32(nil), emb...)
				}
				return chunks
			}(),
		}
		// Property VALUES are copied shallowly; mutable values (nested
		// slices/maps) are still shared with the cached node.
		for k, v := range node.Properties {
			nodeCopy.Properties[k] = v
		}
		cachedCopies = append(cachedCopies, nodeCopy)
	}
	ae.mu.RUnlock()

	// Call callback with copies (safe to do without lock)
	for _, nodeCopy := range cachedCopies {
		if !fn(nodeCopy) {
			return nil
		}
	}

	// Then iterate underlying engine, skipping cached nodes
	if iterator, ok := ae.engine.(interface{ IterateNodes(func(*Node) bool) error }); ok {
		return iterator.IterateNodes(func(node *Node) bool {
			if cachedIDs[node.ID] {
				return true // Skip, already visited from cache
			}
			// Re-check deletion per node: a delete may have been staged
			// after the snapshot above was taken.
			ae.mu.RLock()
			deleted := ae.deleteNodes[node.ID]
			ae.mu.RUnlock()

			if deleted {
				return true // Skip deleted
			}

			return fn(node)
		})
	}

	return nil
}
2324

2325
// ============================================================================
2326
// StreamingEngine Implementation
2327
// ============================================================================
2328

2329
// StreamNodes implements StreamingEngine.StreamNodes by delegating to the underlying engine.
2330
// It merges cached nodes with the underlying stream for consistency.
2331
func (ae *AsyncEngine) StreamNodes(ctx context.Context, fn func(node *Node) error) error {
1✔
2332
        ae.mu.RLock()
1✔
2333

1✔
2334
        // First, stream cached nodes (not yet flushed)
1✔
2335
        for id, node := range ae.nodeCache {
2✔
2336
                select {
1✔
2337
                case <-ctx.Done():
1✔
2338
                        ae.mu.RUnlock()
1✔
2339
                        return ctx.Err()
1✔
2340
                default:
1✔
2341
                }
2342
                if ae.deleteNodes[id] {
1✔
2343
                        continue // Skip if marked for deletion
×
2344
                }
2345
                ae.mu.RUnlock()
1✔
2346
                if err := fn(node); err != nil {
2✔
2347
                        if err == ErrIterationStopped {
2✔
2348
                                return nil // Normal early termination
1✔
2349
                        }
1✔
2350
                        return err
1✔
2351
                }
2352
                ae.mu.RLock()
1✔
2353
        }
2354

2355
        // Build set of cached node IDs to skip in underlying stream
2356
        cachedIDs := make(map[NodeID]bool, len(ae.nodeCache))
1✔
2357
        for id := range ae.nodeCache {
2✔
2358
                cachedIDs[id] = true
1✔
2359
        }
1✔
2360
        deletedIDs := make(map[NodeID]bool, len(ae.deleteNodes))
1✔
2361
        for id := range ae.deleteNodes {
2✔
2362
                deletedIDs[id] = true
1✔
2363
        }
1✔
2364
        ae.mu.RUnlock()
1✔
2365

1✔
2366
        // Then stream from underlying engine, skipping cached/deleted nodes
1✔
2367
        if streamer, ok := ae.engine.(StreamingEngine); ok {
2✔
2368
                return streamer.StreamNodes(ctx, func(node *Node) error {
2✔
2369
                        // Skip if we already returned this from cache or it's deleted
1✔
2370
                        if cachedIDs[node.ID] || deletedIDs[node.ID] {
2✔
2371
                                return nil
1✔
2372
                        }
1✔
2373
                        return fn(node)
1✔
2374
                })
2375
        }
2376

2377
        // Fallback: load all from underlying engine
2378
        nodes, err := ae.engine.AllNodes()
1✔
2379
        if err != nil {
2✔
2380
                return err
1✔
2381
        }
1✔
2382
        for _, node := range nodes {
2✔
2383
                select {
1✔
2384
                case <-ctx.Done():
×
2385
                        return ctx.Err()
×
2386
                default:
1✔
2387
                }
2388
                if cachedIDs[node.ID] || deletedIDs[node.ID] {
1✔
2389
                        continue
×
2390
                }
2391
                if err := fn(node); err != nil {
2✔
2392
                        if err == ErrIterationStopped {
2✔
2393
                                return nil
1✔
2394
                        }
1✔
2395
                        return err
1✔
2396
                }
2397
        }
2398
        return nil
×
2399
}
2400

2401
// StreamEdges implements StreamingEngine.StreamEdges by delegating to the underlying engine.
2402
func (ae *AsyncEngine) StreamEdges(ctx context.Context, fn func(edge *Edge) error) error {
1✔
2403
        ae.mu.RLock()
1✔
2404

1✔
2405
        // First, stream cached edges
1✔
2406
        for id, edge := range ae.edgeCache {
2✔
2407
                select {
1✔
2408
                case <-ctx.Done():
1✔
2409
                        ae.mu.RUnlock()
1✔
2410
                        return ctx.Err()
1✔
2411
                default:
1✔
2412
                }
2413
                if ae.deleteEdges[id] {
1✔
2414
                        continue
×
2415
                }
2416
                ae.mu.RUnlock()
1✔
2417
                if err := fn(edge); err != nil {
2✔
2418
                        if err == ErrIterationStopped {
2✔
2419
                                return nil
1✔
2420
                        }
1✔
2421
                        return err
1✔
2422
                }
2423
                ae.mu.RLock()
1✔
2424
        }
2425

2426
        // Build set of cached edge IDs
2427
        cachedIDs := make(map[EdgeID]bool, len(ae.edgeCache))
1✔
2428
        for id := range ae.edgeCache {
2✔
2429
                cachedIDs[id] = true
1✔
2430
        }
1✔
2431
        deletedIDs := make(map[EdgeID]bool, len(ae.deleteEdges))
1✔
2432
        for id := range ae.deleteEdges {
2✔
2433
                deletedIDs[id] = true
1✔
2434
        }
1✔
2435
        ae.mu.RUnlock()
1✔
2436

1✔
2437
        // Stream from underlying engine
1✔
2438
        if streamer, ok := ae.engine.(StreamingEngine); ok {
2✔
2439
                return streamer.StreamEdges(ctx, func(edge *Edge) error {
2✔
2440
                        if cachedIDs[edge.ID] || deletedIDs[edge.ID] {
2✔
2441
                                return nil
1✔
2442
                        }
1✔
2443
                        return fn(edge)
1✔
2444
                })
2445
        }
2446

2447
        // Fallback
2448
        edges, err := ae.engine.AllEdges()
1✔
2449
        if err != nil {
2✔
2450
                return err
1✔
2451
        }
1✔
2452
        for _, edge := range edges {
2✔
2453
                select {
1✔
2454
                case <-ctx.Done():
×
2455
                        return ctx.Err()
×
2456
                default:
1✔
2457
                }
2458
                if cachedIDs[edge.ID] || deletedIDs[edge.ID] {
1✔
2459
                        continue
×
2460
                }
2461
                if err := fn(edge); err != nil {
2✔
2462
                        if err == ErrIterationStopped {
2✔
2463
                                return nil
1✔
2464
                        }
1✔
2465
                        return err
1✔
2466
                }
2467
        }
2468
        return nil
×
2469
}
2470

2471
// StreamNodeChunks implements StreamingEngine.StreamNodeChunks by using StreamNodes.
2472
// We always use StreamNodes (not delegate) to properly merge cache + underlying engine.
2473
func (ae *AsyncEngine) StreamNodeChunks(ctx context.Context, chunkSize int, fn func(nodes []*Node) error) error {
1✔
2474
        // Always use our StreamNodes to properly handle cache + engine merging
1✔
2475
        chunk := make([]*Node, 0, chunkSize)
1✔
2476
        err := ae.StreamNodes(ctx, func(node *Node) error {
2✔
2477
                chunk = append(chunk, node)
1✔
2478
                if len(chunk) >= chunkSize {
2✔
2479
                        if err := fn(chunk); err != nil {
2✔
2480
                                return err
1✔
2481
                        }
1✔
2482
                        chunk = make([]*Node, 0, chunkSize)
1✔
2483
                }
2484
                return nil
1✔
2485
        })
2486
        if err != nil {
2✔
2487
                return err
1✔
2488
        }
1✔
2489
        // Final partial chunk
2490
        if len(chunk) > 0 {
1✔
2491
                return fn(chunk)
×
2492
        }
×
2493
        return nil
1✔
2494
}
2495

2496
// DeleteByPrefix delegates to the underlying engine.
// Pending async writes are flushed first so the inner engine's prefix scan
// also covers data that was still sitting in the write-behind cache.
// Returns the node/edge deletion counts reported by the inner engine.
func (ae *AsyncEngine) DeleteByPrefix(prefix string) (nodesDeleted int64, edgesDeleted int64, err error) {
	// Flush any pending writes first
	// NOTE(review): any result of Flush is discarded here — confirm a flush
	// failure should not abort the delete.
	ae.Flush()
	return ae.engine.DeleteByPrefix(prefix)
}
1✔
2502

2503
// LastWriteTime returns the last known write time from the underlying engine, if available.
2504
func (ae *AsyncEngine) LastWriteTime() time.Time {
1✔
2505
        if ae == nil {
2✔
2506
                return time.Time{}
1✔
2507
        }
1✔
2508
        if p, ok := ae.engine.(interface{ LastWriteTime() time.Time }); ok {
1✔
2509
                return p.LastWriteTime()
×
2510
        }
×
2511
        return time.Time{}
1✔
2512
}
2513

2514
// Compile-time check: AsyncEngine satisfies the Engine interface.
var _ Engine = (*AsyncEngine)(nil)

// Compile-time check: AsyncEngine satisfies the StreamingEngine interface.
var _ StreamingEngine = (*AsyncEngine)(nil)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc