• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

orneryd / NornicDB / 25766380596

12 May 2026 10:38PM UTC coverage: 83.772% (-0.008%) from 83.78%
25766380596

push

github

orneryd
fix(macos, package): fix Macos CI

124603 of 148741 relevant lines covered (83.77%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.34
/pkg/nornicdb/embed_queue.go
1
// Package nornicdb provides async embedding worker for background embedding generation.
2
package nornicdb
3

4
import (
5
        "context"
6
        "encoding/json"
7
        "fmt"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/orneryd/nornicdb/pkg/embed"
13
        "github.com/orneryd/nornicdb/pkg/embeddingutil"
14
        "github.com/orneryd/nornicdb/pkg/storage"
15
)
16

17
// deterministicTextChunker abstracts a text splitter. ChunkText receives the
// text, a per-chunk token budget (maxTokens) and an overlap size, and returns
// the resulting chunks or an error.
// NOTE(review): the name suggests implementations must return identical chunks
// for identical input — confirm against the implementations.
type deterministicTextChunker interface {
	ChunkText(input string, maxTokens int, overlap int) (chunks []string, err error)
}
20

21
// EmbedWorker manages async embedding generation using a pull-based model.
22
// On each cycle, it scans for nodes without embeddings and processes them.
23
type EmbedWorker struct {
24
        embedder embed.Embedder
25
        storage  storage.Engine
26
        config   *EmbedWorkerConfig
27

28
        ctx         context.Context
29
        cancel      context.CancelFunc
30
        wg          sync.WaitGroup
31
        lifecycleMu sync.Mutex
32

33
        // Trigger channel to wake up worker immediately
34
        trigger chan struct{}
35
        // Debounced external-trigger state (write-lull signaling).
36
        triggerMu            sync.Mutex
37
        triggerDebounceTimer *time.Timer
38
        triggerDebounceSeq   atomic.Uint64
39

40
        // Callback after embedding a node (for search index update)
41
        onEmbedded func(node *storage.Node)
42

43
        // Callback when queue becomes empty (for triggering k-means clustering)
44
        onQueueEmpty func(processedCount int)
45

46
        // Stats
47
        mu sync.Mutex
48
        // Atomic stats fields so /embed/stats never waits on worker map locks.
49
        processed atomic.Int64
50
        failed    atomic.Int64
51
        running   atomic.Bool
52
        closed    atomic.Bool // Set to true when Close() is called
53

54
        // Recently processed node IDs to prevent re-processing before DB commit is visible
55
        // This prevents the same node being processed multiple times in quick succession
56
        recentlyProcessed map[string]time.Time
57

58
        // Track nodes we've already logged as skipped (to avoid log spam)
59
        loggedSkip map[string]bool
60

61
        // Debounce state for k-means clustering trigger
62
        clusterDebounceTimer   *time.Timer
63
        clusterDebounceMu      sync.Mutex
64
        pendingClusterCount    int  // Accumulated count for debounced callback
65
        clusterDebounceRunning bool // Whether a debounce timer is active
66

67
        // claimMu serializes find+claim so only one worker can take a node at a time (prevents double-processing).
68
        claimMu sync.Mutex
69

70
        // workersStarted is true once StartWorkers() has been called (used when DeferWorkerStart is true).
71
        workersStarted bool
72

73
        // initialScanDone ensures startup refresh/index scan runs once for the whole worker pool,
74
        // not once per worker goroutine.
75
        initialScanDone bool
76

77
        // refreshMu/lastRefreshAt throttle full pending-index refresh scans, which are
78
        // potentially expensive on large datasets and can contend with write traffic.
79
        refreshMu     sync.Mutex
80
        lastRefreshAt time.Time
81

82
        // shouldYield reports whether foreground request pressure is high enough that
83
        // background embedding should pause. This keeps tx/commit hot path prioritized.
84
        shouldYield func() bool
85
}
86

87
// minRefreshOnEmptyInterval is the throttle window for full pending-index
// refresh scans. By its name, it applies to the non-forced refreshes issued
// when a worker finds the queue empty.
// NOTE(review): confirm against refreshEmbeddingIndexIfDue.
const minRefreshOnEmptyInterval = time.Second * 30
88

89
// EmbedWorkerConfig holds configuration for the embedding worker.
// The defaults quoted below are the values DefaultEmbedWorkerConfig sets.
type EmbedWorkerConfig struct {
	// Worker settings.
	NumWorkers   int           // Number of concurrent workers (default: 1; NewEmbedWorker raises values < 1 to 1). Use more for network/parallel processing.
	ScanInterval time.Duration // How often to rescan for nodes without embeddings (default: 15m). Should be > 0: it drives a time.Ticker.
	BatchDelay   time.Duration // Delay between processing nodes (default: 500ms)
	MaxRetries   int           // Max retry attempts per node (default: 3)
	// TriggerDebounceDelay delays trigger-initiated scans until writes lull.
	// Each Trigger call restarts the countdown. A value <= 0 signals the
	// workers immediately instead.
	TriggerDebounceDelay time.Duration // default: 2s

	// Text chunking settings.
	ChunkSize    int // Max tokens per chunk (default: 512)
	ChunkOverlap int // Tokens to overlap between chunks (default: 50)
	// EmbedBatchSize caps chunks per EmbedBatch call to avoid oversized requests.
	EmbedBatchSize int // Max chunks per batch request (default: 32; NewEmbedWorker raises values < 1 to 32)

	// Debounce settings for the k-means clustering trigger. A value of 0 falls
	// back to the default shown.
	ClusterDebounceDelay time.Duration // How long to wait after the last embedding before triggering k-means (default: 30s)
	ClusterMinBatchSize  int           // Minimum embeddings processed before triggering k-means (default: 10)

	// Property include/exclude for embedding text (optional).
	// PropertiesInclude: if non-empty, only these property keys are used when building embedding text.
	PropertiesInclude []string
	// PropertiesExclude: these property keys are never used (in addition to built-in metadata skips).
	PropertiesExclude []string
	// IncludeLabels: if true, node labels are prepended to the embedding text.
	// DefaultEmbedWorkerConfig sets it to true. The zero value (false) omits
	// labels, so hand-built configs must opt in.
	IncludeLabels bool

	// DeferWorkerStart, when true, creates the queue but does not start worker goroutines.
	// Call StartWorkers() after the database has warmed up (e.g. after search index build).
	DeferWorkerStart bool
}

// DefaultEmbedWorkerConfig returns a newly allocated EmbedWorkerConfig filled
// with sensible defaults. Every call returns a fresh value, so callers may
// modify the result without affecting other users.
func DefaultEmbedWorkerConfig() *EmbedWorkerConfig {
	return &EmbedWorkerConfig{
		NumWorkers:           1,                      // Single worker by default
		ScanInterval:         15 * time.Minute,       // Scan for missed nodes every 15 minutes
		BatchDelay:           500 * time.Millisecond, // Delay between processing nodes
		MaxRetries:           3,
		TriggerDebounceDelay: 2 * time.Second,
		ChunkSize:            512,
		ChunkOverlap:         50,
		EmbedBatchSize:       32,
		ClusterDebounceDelay: 30 * time.Second, // Wait 30s after last embedding before k-means
		ClusterMinBatchSize:  10,               // Need at least 10 embeddings to trigger k-means
		PropertiesInclude:    nil,
		PropertiesExclude:    nil,
		IncludeLabels:        true,
	}
}
141

142
// NewEmbedWorker creates a new async embedding worker pool.
143
// If embedder is nil, the worker will wait for SetEmbedder() to be called.
144
// NumWorkers controls how many concurrent workers process embeddings in parallel.
145
// Use more workers for network-based embedders (OpenAI, etc.) or when you have
146
// multiple GPUs/CPUs available for local embedding generation.
147
func NewEmbedWorker(embedder embed.Embedder, storage storage.Engine, config *EmbedWorkerConfig) *EmbedWorker {
1✔
148
        if config == nil {
2✔
149
                config = DefaultEmbedWorkerConfig()
1✔
150
        }
1✔
151

152
        // Ensure at least 1 worker
153
        if config.NumWorkers < 1 {
2✔
154
                config.NumWorkers = 1
1✔
155
        }
1✔
156
        if config.EmbedBatchSize < 1 {
2✔
157
                config.EmbedBatchSize = 32
1✔
158
        }
1✔
159

160
        ctx, cancel := context.WithCancel(context.Background())
1✔
161

1✔
162
        ew := &EmbedWorker{
1✔
163
                embedder:          embedder,
1✔
164
                storage:           storage,
1✔
165
                config:            config,
1✔
166
                ctx:               ctx,
1✔
167
                cancel:            cancel,
1✔
168
                trigger:           make(chan struct{}, 1),
1✔
169
                recentlyProcessed: make(map[string]time.Time),
1✔
170
                loggedSkip:        make(map[string]bool),
1✔
171
        }
1✔
172

1✔
173
        // Start N workers unless deferred until after DB warmup
1✔
174
        if !config.DeferWorkerStart {
2✔
175
                numWorkers := config.NumWorkers
1✔
176
                for i := 0; i < numWorkers; i++ {
2✔
177
                        ew.wg.Add(1)
1✔
178
                        go ew.worker()
1✔
179
                }
1✔
180
                if numWorkers > 1 {
1✔
181
                        fmt.Printf("🧠 Started %d embedding workers for parallel processing\n", numWorkers)
×
182
                }
×
183
        }
184

185
        return ew
1✔
186
}
187

188
// StartWorkers starts the embedding worker goroutines. It is used when the queue was
189
// created with DeferWorkerStart=true (e.g. to avoid competing with DB warmup). Idempotent.
190
func (ew *EmbedWorker) StartWorkers() {
1✔
191
        ew.lifecycleMu.Lock()
1✔
192
        defer ew.lifecycleMu.Unlock()
1✔
193
        ew.mu.Lock()
1✔
194
        defer ew.mu.Unlock()
1✔
195
        if ew.closed.Load() || ew.workersStarted {
2✔
196
                return
1✔
197
        }
1✔
198
        ew.workersStarted = true
1✔
199
        numWorkers := ew.config.NumWorkers
1✔
200
        if numWorkers < 1 {
1✔
201
                numWorkers = 1
×
202
        }
×
203
        for i := 0; i < numWorkers; i++ {
2✔
204
                ew.wg.Add(1)
1✔
205
                go ew.worker()
1✔
206
        }
1✔
207
        if numWorkers > 1 {
1✔
208
                fmt.Printf("🧠 Started %d embedding workers for parallel processing\n", numWorkers)
×
209
        } else {
1✔
210
                fmt.Println("🧠 Embed queue workers started (after DB warmup)")
1✔
211
        }
1✔
212
}
213

214
// SetEmbedder sets or updates the embedder (for async initialization).
215
// This allows the worker to start before the model is loaded.
216
func (ew *EmbedWorker) SetEmbedder(embedder embed.Embedder) {
1✔
217
        ew.mu.Lock()
1✔
218
        ew.embedder = embedder
1✔
219
        ew.mu.Unlock()
1✔
220
        // Trigger immediate processing now that embedder is available
1✔
221
        ew.TriggerImmediate()
1✔
222
}
1✔
223

224
// SetOnEmbedded sets a callback to be called after a node is embedded.
225
// Use this to update search indexes.
226
func (ew *EmbedWorker) SetOnEmbedded(fn func(node *storage.Node)) {
1✔
227
        ew.onEmbedded = fn
1✔
228
}
1✔
229

230
// SetOnQueueEmpty sets a callback to be called when the queue becomes empty.
231
// Use this to trigger k-means clustering after batch embedding completes.
232
// The callback receives the total number of embeddings processed in this batch.
233
func (ew *EmbedWorker) SetOnQueueEmpty(fn func(processedCount int)) {
1✔
234
        ew.onQueueEmpty = fn
1✔
235
}
1✔
236

237
// SetShouldYield configures an optional pressure probe used to pause background
238
// embedding while foreground request traffic is active.
239
func (ew *EmbedWorker) SetShouldYield(fn func() bool) {
1✔
240
        ew.mu.Lock()
1✔
241
        ew.shouldYield = fn
1✔
242
        ew.mu.Unlock()
1✔
243
}
1✔
244

245
// Trigger wakes up the worker to check for nodes without embeddings.
246
// Call this after creating a new node.
247
func (ew *EmbedWorker) Trigger() {
1✔
248
        if ew.closed.Load() {
2✔
249
                return
1✔
250
        }
1✔
251
        delay := time.Duration(0)
1✔
252
        if ew.config != nil {
2✔
253
                delay = ew.config.TriggerDebounceDelay
1✔
254
        }
1✔
255
        if delay <= 0 {
2✔
256
                ew.signalTrigger()
1✔
257
                return
1✔
258
        }
1✔
259

260
        seq := ew.triggerDebounceSeq.Add(1)
1✔
261
        ew.triggerMu.Lock()
1✔
262
        if ew.triggerDebounceTimer != nil {
1✔
263
                ew.triggerDebounceTimer.Stop()
×
264
        }
×
265
        ew.triggerDebounceTimer = time.AfterFunc(delay, func() {
1✔
266
                if ew.closed.Load() {
×
267
                        return
×
268
                }
×
269
                // Debounce reset behavior: only latest schedule fires.
270
                if ew.triggerDebounceSeq.Load() != seq {
×
271
                        return
×
272
                }
×
273
                ew.signalTrigger()
×
274
        })
275
        ew.triggerMu.Unlock()
1✔
276
}
277

278
// TriggerImmediate bypasses debounce and signals worker immediately.
279
// Use for explicit/manual triggers, not high-frequency write-path enqueue.
280
func (ew *EmbedWorker) TriggerImmediate() {
1✔
281
        ew.signalTrigger()
1✔
282
}
1✔
283

284
func (ew *EmbedWorker) signalTrigger() {
1✔
285
        if ew.closed.Load() {
2✔
286
                return
1✔
287
        }
1✔
288
        select {
1✔
289
        case ew.trigger <- struct{}{}:
1✔
290
        default:
1✔
291
                // Already triggered
292
        }
293
}
294

295
// WorkerStats is a point-in-time snapshot of embedding worker activity, as
// returned by (*EmbedWorker).Stats. The JSON tags define its wire format.
type WorkerStats struct {
	// Running is true while a worker is inside processNextBatch. With several
	// workers this is approximate, because each worker clears it on exit.
	Running bool `json:"running"`
	// Processed counts handled nodes since creation or the last Reset.
	// NOTE(review): presumably incremented once per successfully embedded node.
	Processed int `json:"processed"`
	// Failed counts failed embedding attempts since creation or the last Reset.
	Failed int `json:"failed"`
}
301

302
// Stats returns current worker statistics.
303
// QueueLen returns the current pending-embedding queue depth for the
304
// observability nornicdb_embed_queue_depth GaugeFunc (Plan 04-05 D-15b).
305
//
306
// EmbedWorker is pull-based: there is no in-memory queue of work items
307
// waiting to be processed. Instead, the worker periodically scans
308
// storage for nodes lacking embeddings (the storage-side
309
// "pending-embedding index" is the durable queue). For the M1 scrape
310
// surface we report the trigger channel buffer depth (a coarse upper
311
// bound on outstanding wake-up signals); deeper visibility into the
312
// storage-side pending count is deferred to a future plan that wires
313
// the AddToPendingEmbeddings counter through here.
314
//
315
// The metric value is therefore a lower-bound on actual outstanding
316
// work — when alerts trigger SREs should also check the storage-side
317
// pending-embeddings index. Documented in CONTEXT D-15b.
318
func (ew *EmbedWorker) QueueLen() int {
×
319
        if ew == nil {
×
320
                return 0
×
321
        }
×
322
        return len(ew.trigger)
×
323
}
324

325
func (ew *EmbedWorker) Stats() WorkerStats {
1✔
326
        return WorkerStats{
1✔
327
                Running:   ew.running.Load(),
1✔
328
                Processed: int(ew.processed.Load()),
1✔
329
                Failed:    int(ew.failed.Load()),
1✔
330
        }
1✔
331
}
1✔
332

333
// Reset stops the current worker and restarts it fresh.
334
// This clears processed counts and the recently-processed cache,
335
// which is necessary when regenerating all embeddings.
336
func (ew *EmbedWorker) Reset() {
1✔
337
        ew.lifecycleMu.Lock()
1✔
338
        defer ew.lifecycleMu.Unlock()
1✔
339

1✔
340
        ew.mu.Lock()
1✔
341
        if ew.closed.Load() {
1✔
342
                ew.mu.Unlock()
×
343
                return
×
344
        }
×
345
        // Mark as resetting to prevent Trigger() from sending during reset
346
        wasRunning := ew.running.Load()
1✔
347
        ew.mu.Unlock()
1✔
348

1✔
349
        fmt.Println("🔄 Resetting embed worker for regeneration...")
1✔
350

1✔
351
        // Cancel context to stop current processing
1✔
352
        ew.cancel()
1✔
353

1✔
354
        // Stop pending debounced trigger timer.
1✔
355
        ew.triggerMu.Lock()
1✔
356
        if ew.triggerDebounceTimer != nil {
1✔
357
                ew.triggerDebounceTimer.Stop()
×
358
                ew.triggerDebounceTimer = nil
×
359
        }
×
360
        ew.triggerMu.Unlock()
1✔
361

1✔
362
        // Wait synchronously for previous workers to exit before reusing the WaitGroup.
1✔
363
        // This avoids "WaitGroup is reused before previous Wait has returned" panics
1✔
364
        // when Reset and Close overlap under load.
1✔
365
        ew.wg.Wait()
1✔
366
        if ew.closed.Load() {
1✔
367
                return
×
368
        }
×
369

370
        // Reset state under lock
371
        ew.mu.Lock()
1✔
372
        ew.initialScanDone = false
1✔
373
        ew.recentlyProcessed = make(map[string]time.Time)
1✔
374
        ew.loggedSkip = make(map[string]bool)
1✔
375
        ew.mu.Unlock()
1✔
376
        ew.processed.Store(0)
1✔
377
        ew.failed.Store(0)
1✔
378
        ew.running.Store(false)
1✔
379

1✔
380
        // Create new context (don't recreate trigger channel - just drain it)
1✔
381
        ew.ctx, ew.cancel = context.WithCancel(context.Background())
1✔
382

1✔
383
        // Drain any pending triggers
1✔
384
        select {
1✔
385
        case <-ew.trigger:
1✔
386
        default:
×
387
        }
388

389
        // Restart worker
390
        ew.wg.Add(1)
1✔
391
        go ew.worker()
1✔
392

1✔
393
        _ = wasRunning // suppress unused warning
1✔
394
        fmt.Println("✅ Embed worker reset complete, starting fresh scan")
1✔
395
}
396

397
// Close gracefully shuts down the worker.
398
func (ew *EmbedWorker) Close() {
1✔
399
        ew.lifecycleMu.Lock()
1✔
400
        defer ew.lifecycleMu.Unlock()
1✔
401

1✔
402
        ew.closed.Store(true)
1✔
403

1✔
404
        // Stop pending debounced trigger timer.
1✔
405
        ew.triggerMu.Lock()
1✔
406
        if ew.triggerDebounceTimer != nil {
2✔
407
                ew.triggerDebounceTimer.Stop()
1✔
408
                ew.triggerDebounceTimer = nil
1✔
409
        }
1✔
410
        ew.triggerMu.Unlock()
1✔
411

1✔
412
        // Stop any pending debounce timer
1✔
413
        ew.clusterDebounceMu.Lock()
1✔
414
        if ew.clusterDebounceTimer != nil {
2✔
415
                ew.clusterDebounceTimer.Stop()
1✔
416
                ew.clusterDebounceTimer = nil
1✔
417
        }
1✔
418
        ew.clusterDebounceMu.Unlock()
1✔
419

1✔
420
        ew.cancel()
1✔
421
        // Do NOT close trigger channel: Trigger() can still race and send, which would panic.
1✔
422
        // Context cancellation is enough to stop workers.
1✔
423
        // Wait synchronously for worker shutdown to complete.
1✔
424
        ew.wg.Wait()
1✔
425
}
426

427
// scheduleClusteringDebounced accumulates embedding counts and debounces the k-means trigger.
428
// This prevents constant re-clustering when embeddings trickle in one at a time.
429
// The callback will fire after ClusterDebounceDelay of inactivity, if MinBatchSize is met.
430
func (ew *EmbedWorker) scheduleClusteringDebounced(processedCount int) {
1✔
431
        ew.clusterDebounceMu.Lock()
1✔
432
        defer ew.clusterDebounceMu.Unlock()
1✔
433

1✔
434
        // Accumulate the count
1✔
435
        ew.pendingClusterCount += processedCount
1✔
436

1✔
437
        // Cancel existing timer if any
1✔
438
        if ew.clusterDebounceTimer != nil {
2✔
439
                ew.clusterDebounceTimer.Stop()
1✔
440
        }
1✔
441

442
        // Get debounce delay from config (default 30s)
443
        delay := ew.config.ClusterDebounceDelay
1✔
444
        if delay == 0 {
1✔
445
                delay = 30 * time.Second
×
446
        }
×
447

448
        // Get minimum batch size from config (default 10)
449
        minBatch := ew.config.ClusterMinBatchSize
1✔
450
        if minBatch == 0 {
1✔
451
                minBatch = 10
×
452
        }
×
453

454
        // Schedule new timer
455
        ew.clusterDebounceRunning = true
1✔
456
        ew.clusterDebounceTimer = time.AfterFunc(delay, func() {
2✔
457
                ew.clusterDebounceMu.Lock()
1✔
458
                count := ew.pendingClusterCount
1✔
459
                ew.pendingClusterCount = 0
1✔
460
                ew.clusterDebounceRunning = false
1✔
461
                ew.clusterDebounceTimer = nil
1✔
462
                ew.clusterDebounceMu.Unlock()
1✔
463

1✔
464
                // Only trigger if we have enough embeddings
1✔
465
                if count >= minBatch && ew.onQueueEmpty != nil {
2✔
466
                        fmt.Printf("🔬 Debounced k-means trigger: %d embeddings processed (waited %.0fs for more)\n", count, delay.Seconds())
1✔
467
                        ew.onQueueEmpty(count)
1✔
468
                } else if count > 0 && count < minBatch {
1✔
469
                        fmt.Printf("⏸️  Skipping k-means: only %d embeddings (min batch: %d)\n", count, minBatch)
×
470
                }
×
471
        })
472

473
        fmt.Printf("⏳ K-means debounce: %d pending embeddings, will trigger in %.0fs if no more arrive\n",
1✔
474
                ew.pendingClusterCount, delay.Seconds())
1✔
475
}
476

477
// worker runs the embedding loop.
478
func (ew *EmbedWorker) worker() {
1✔
479
        defer ew.wg.Done()
1✔
480

1✔
481
        fmt.Println("🧠 Embed worker started")
1✔
482

1✔
483
        // Wait for embedder to be set (async model loading)
1✔
484
        if ew.embedder == nil {
2✔
485
                fmt.Println("⏳ Waiting for embedding model to load...")
1✔
486
                for {
2✔
487
                        ew.mu.Lock()
1✔
488
                        hasEmbedder := ew.embedder != nil
1✔
489
                        ew.mu.Unlock()
1✔
490

1✔
491
                        if hasEmbedder {
1✔
492
                                fmt.Println("✅ Embedding model loaded, worker active")
×
493
                                break
×
494
                        }
495
                        if ew.closed.Load() {
2✔
496
                                return
1✔
497
                        }
1✔
498

499
                        select {
×
500
                        case <-ew.ctx.Done():
×
501
                                return
×
502
                        case <-time.After(1 * time.Second):
×
503
                                // Check again
504
                        }
505
                }
506
        }
507

508
        // Short initial delay to let server start
509
        time.Sleep(500 * time.Millisecond)
1✔
510

1✔
511
        // Refresh the pending embeddings index on startup to catch any nodes
1✔
512
        // that need embedding (e.g., after restart, bulk import, or cleared embeddings).
1✔
513
        // Run this once for the whole worker pool to avoid duplicate startup scans/logs.
1✔
514
        ew.mu.Lock()
1✔
515
        doInitialScan := !ew.initialScanDone
1✔
516
        if doInitialScan {
2✔
517
                ew.initialScanDone = true
1✔
518
        }
1✔
519
        ew.mu.Unlock()
1✔
520
        if doInitialScan {
2✔
521
                fmt.Println("🔍 Initial scan for nodes needing embeddings...")
1✔
522
                // Refresh index to clean up stale entries from deleted nodes
1✔
523
                ew.refreshEmbeddingIndexIfDue(true)
1✔
524
        }
1✔
525

526
        ew.processUntilEmpty()
1✔
527

1✔
528
        ticker := time.NewTicker(ew.config.ScanInterval)
1✔
529
        defer ticker.Stop()
1✔
530

1✔
531
        for {
2✔
532
                select {
1✔
533
                case <-ew.ctx.Done():
1✔
534
                        fmt.Println("🧠 Embed worker stopped")
1✔
535
                        return
1✔
536

537
                case <-ew.trigger:
1✔
538
                        // Immediate trigger - process until queue is empty
1✔
539
                        ew.processUntilEmpty()
1✔
540

541
                case <-ticker.C:
×
542
                        // Regular interval scan
×
543
                        ew.processNextBatch()
×
544
                }
545
        }
546
}
547

548
// processUntilEmpty keeps processing nodes until no more need embeddings.
549
// When the queue becomes empty, it schedules a debounced k-means clustering trigger.
550
func (ew *EmbedWorker) processUntilEmpty() {
1✔
551
        batchProcessed := 0
1✔
552
        consecutiveEmptyCount := 0
1✔
553
        maxConsecutiveEmpty := 3 // Stop after 3 consecutive empty checks
1✔
554

1✔
555
        for {
2✔
556
                select {
1✔
557
                case <-ew.ctx.Done():
1✔
558
                        return
1✔
559
                default:
1✔
560
                        // processNextBatch returns true if it actually processed or skipped a node
1✔
561
                        // It returns false if there was nothing to process
1✔
562
                        didWork := ew.processNextBatch()
1✔
563
                        if !didWork {
2✔
564
                                consecutiveEmptyCount++
1✔
565
                                if consecutiveEmptyCount == 1 {
2✔
566
                                        // First empty check - refresh index to catch any new nodes and clean up stale entries
1✔
567
                                        removed := ew.refreshEmbeddingIndexIfDue(false)
1✔
568
                                        if removed > 0 {
1✔
569
                                                fmt.Printf("🧹 Cleaned up %d stale entries from pending embeddings index\n", removed)
×
570
                                                // Reset counter since we found and cleaned stale entries - try again
×
571
                                                consecutiveEmptyCount = 0
×
572
                                                continue
×
573
                                        }
574
                                } else if consecutiveEmptyCount >= maxConsecutiveEmpty {
2✔
575
                                        // Multiple empty checks - we're done
1✔
576
                                        // Queue is empty - schedule debounced k-means callback if we processed anything
1✔
577
                                        if batchProcessed > 0 && ew.onQueueEmpty != nil {
1✔
578
                                                ew.scheduleClusteringDebounced(batchProcessed)
×
579
                                        }
×
580
                                        return // No more nodes to process
1✔
581
                                }
582
                                // Small delay before next check
583
                                time.Sleep(100 * time.Millisecond)
1✔
584
                        } else {
1✔
585
                                // Successfully processed - reset counter
1✔
586
                                consecutiveEmptyCount = 0
1✔
587
                                batchProcessed++
1✔
588
                                // Small delay between batches to avoid CPU spin
1✔
589
                                time.Sleep(50 * time.Millisecond)
1✔
590
                        }
1✔
591
                }
592
        }
593
}
594

595
// processNextBatch finds and processes nodes without embeddings.
596
// Returns true if it did useful work (processed or permanently skipped a node).
597
// Returns false if there was nothing to process or if a node was temporarily skipped.
598
func (ew *EmbedWorker) processNextBatch() bool {
1✔
599
        // Check for cancellation at the start
1✔
600
        select {
1✔
601
        case <-ew.ctx.Done():
×
602
                return false
×
603
        default:
1✔
604
        }
605

606
        // Foreground-first policy: if tx load is active, pause background embedding.
607
        ew.mu.Lock()
1✔
608
        shouldYield := ew.shouldYield
1✔
609
        ew.mu.Unlock()
1✔
610
        if shouldYield != nil && shouldYield() {
2✔
611
                time.Sleep(25 * time.Millisecond)
1✔
612
                ew.signalTrigger()
1✔
613
                return false
1✔
614
        }
1✔
615

616
        ew.running.Store(true)
1✔
617

1✔
618
        defer func() {
2✔
619
                ew.running.Store(false)
1✔
620
        }()
1✔
621

622
        // Serialize find+claim so only one worker can take a node at a time (prevents double-processing).
623
        ew.claimMu.Lock()
1✔
624
        node := ew.findNodeWithoutEmbedding()
1✔
625
        if node == nil {
2✔
626
                ew.claimMu.Unlock()
1✔
627
                return false // Nothing to process
1✔
628
        }
1✔
629

630
        // Check for cancellation before processing
631
        select {
1✔
632
        case <-ew.ctx.Done():
×
633
                ew.claimMu.Unlock()
×
634
                return false
×
635
        default:
1✔
636
        }
637

638
        // CRITICAL: Verify node still exists before processing
639
        // Node might have been deleted between index lookup and now
640
        // This prevents trying to embed deleted nodes
641
        existingNode, err := ew.storage.GetNode(node.ID)
1✔
642
        if err != nil {
2✔
643
                // Node was deleted - remove from pending index and skip
1✔
644
                fmt.Printf("⚠️  Node %s from pending index doesn't exist - removing stale entry\n", node.ID)
1✔
645
                ew.markNodeEmbedded(node.ID)
1✔
646
                ew.claimMu.Unlock()
1✔
647
                return false // Skip this node, try next one
1✔
648
        }
1✔
649

650
        // DEBUG: Verify the node we found matches what's in storage
651
        if existingNode == nil {
2✔
652
                fmt.Printf("⚠️  Node %s from pending index is nil - removing stale entry\n", node.ID)
1✔
653
                ew.markNodeEmbedded(node.ID)
1✔
654
                ew.claimMu.Unlock()
1✔
655
                return false
1✔
656
        }
1✔
657

658
        // Update node with latest data from storage
659
        node = existingNode
1✔
660

1✔
661
        // Check if this node was recently processed (prevents re-processing before DB commit is visible)
1✔
662
        ew.mu.Lock()
1✔
663
        if ew.recentlyProcessed == nil {
2✔
664
                ew.recentlyProcessed = make(map[string]time.Time)
1✔
665
        }
1✔
666
        if ew.loggedSkip == nil {
2✔
667
                ew.loggedSkip = make(map[string]bool)
1✔
668
        }
1✔
669
        if lastProcessed, ok := ew.recentlyProcessed[string(node.ID)]; ok {
1✔
670
                if time.Since(lastProcessed) < 30*time.Second {
×
671
                        if !ew.loggedSkip[string(node.ID)] {
×
672
                                ew.loggedSkip[string(node.ID)] = true
×
673
                                fmt.Printf("⏭️  Skipping node %s: recently processed (waiting for DB sync)\n", node.ID)
×
674
                        }
×
675
                        ew.mu.Unlock()
×
676
                        ew.claimMu.Unlock()
×
677
                        return false // Temporary skip - don't continue looping
×
678
                }
679
                delete(ew.loggedSkip, string(node.ID))
×
680
        }
681
        // Clean up old entries (older than 1 minute)
682
        for id, t := range ew.recentlyProcessed {
2✔
683
                if time.Since(t) > time.Minute {
2✔
684
                        delete(ew.recentlyProcessed, id)
1✔
685
                        delete(ew.loggedSkip, id)
1✔
686
                }
1✔
687
        }
688
        ew.mu.Unlock()
1✔
689

1✔
690
        // Claim the node so no other worker can pick it (remove from pending index now; re-queue on failure).
1✔
691
        ew.markNodeEmbedded(node.ID)
1✔
692
        ew.claimMu.Unlock()
1✔
693

1✔
694
        fmt.Printf("🔄 Processing node %s for embedding...\n", node.ID)
1✔
695

1✔
696
        // IMPORTANT: Deep copy properties to avoid race conditions
1✔
697
        // The node from storage may be accessed by other goroutines (e.g., HTTP handlers)
1✔
698
        // Modifying the Properties map directly causes "concurrent map iteration and map write"
1✔
699
        node = copyNodeForEmbedding(node)
1✔
700

1✔
701
        // Build text for embedding (labels and properties per config include/exclude)
1✔
702
        opts := embeddingutil.EmbedTextOptionsFromFields(ew.config.PropertiesInclude, ew.config.PropertiesExclude, ew.config.IncludeLabels)
1✔
703
        text := embeddingutil.BuildText(node.Properties, node.Labels, opts)
1✔
704

1✔
705
        chunker, ok := ew.embedder.(deterministicTextChunker)
1✔
706
        if !ok {
1✔
707
                fmt.Printf("⚠️  Failed to chunk node %s: embedder %T does not support deterministic token chunking\n", node.ID, ew.embedder)
×
708
                ew.addNodeToPendingEmbeddings(node.ID)
×
709
                ew.failed.Add(1)
×
710
                return true
×
711
        }
×
712

713
        // Chunk text using the embedder's tokenizer so every chunk respects the true token cap.
714
        chunks, err := chunker.ChunkText(text, ew.config.ChunkSize, ew.config.ChunkOverlap)
1✔
715
        if err != nil {
1✔
716
                fmt.Printf("⚠️  Failed to chunk node %s: %v\n", node.ID, err)
×
717
                ew.addNodeToPendingEmbeddings(node.ID)
×
718
                ew.failed.Add(1)
×
719
                return true
×
720
        }
×
721

722
        // Embed chunks in micro-batches to avoid oversized single requests for large files.
723
        embeddings, err := ew.embedChunksInBatches(chunks, node.ID)
1✔
724
        if err != nil {
2✔
725
                fmt.Printf("⚠️  Failed to embed node %s: %v\n", node.ID, err)
1✔
726
                ew.addNodeToPendingEmbeddings(node.ID) // Re-queue so another worker can retry
1✔
727
                ew.failed.Add(1)
1✔
728
                return true
1✔
729
        }
1✔
730

731
        // Validate embeddings were generated
732
        if len(embeddings) == 0 || embeddings[0] == nil || len(embeddings[0]) == 0 {
2✔
733
                fmt.Printf("⚠️  Failed to generate embedding for node %s: empty embedding\n", node.ID)
1✔
734
                ew.addNodeToPendingEmbeddings(node.ID) // Re-queue so another worker can retry
1✔
735
                ew.failed.Add(1)
1✔
736
                return true // Failed but we tried - continue to next node
1✔
737
        }
1✔
738

739
        // Persist worker-managed embedding fields in a shared canonical shape.
740
        embeddingutil.ApplyManagedEmbedding(node, embeddings, ew.embedder.Model(), ew.embedder.Dimensions(), time.Now())
1✔
741

1✔
742
        // CRITICAL: Double-check node still exists before updating
1✔
743
        // This prevents creating orphaned nodes if the node was deleted between
1✔
744
        // the initial check and now. Reload from storage to get latest version.
1✔
745
        // BUT: Preserve the embeddings we just generated!
1✔
746
        chunkEmbeddingsToSave := node.ChunkEmbeddings // Save the chunk embeddings we just generated
1✔
747
        embedMetaToSave := make(map[string]any)
1✔
748
        if node.EmbedMeta != nil {
2✔
749
                // Save embedding metadata
1✔
750
                for k, v := range node.EmbedMeta {
2✔
751
                        embedMetaToSave[k] = v
1✔
752
                }
1✔
753
        }
754

755
        existingNode, err = ew.storage.GetNode(node.ID)
1✔
756
        if err != nil {
2✔
757
                // Node was deleted - remove from pending index and skip
1✔
758
                fmt.Printf("⚠️  Node %s was deleted before embedding could be saved - skipping\n", node.ID)
1✔
759
                ew.markNodeEmbedded(node.ID)
1✔
760
                return false // Skip this node, try next one
1✔
761
        }
1✔
762

763
        // CRITICAL: Preserve the embeddings we just generated!
764
        // Don't overwrite node with existingNode - that would lose the embeddings
765
        // Instead, update the existing node's embedding field while preserving other fields
766
        node = existingNode                          // Get latest data from storage
1✔
767
        node.ChunkEmbeddings = chunkEmbeddingsToSave // Restore chunk embeddings (struct field, opaque to users)
1✔
768
        node.UpdatedAt = time.Now()                  // Update timestamp
1✔
769

1✔
770
        // Restore embedding metadata (in EmbedMeta, not Properties)
1✔
771
        node.EmbedMeta = embedMetaToSave
1✔
772

1✔
773
        // Save the parent node (either with embedding for single chunk, or metadata for chunked files)
1✔
774
        // CRITICAL: Use UpdateNodeEmbedding if available (only updates existing nodes, doesn't create)
1✔
775
        // This prevents creating orphaned nodes when the pending index has stale entries
1✔
776
        var updateErr error
1✔
777
        if embedUpdater, ok := ew.storage.(interface{ UpdateNodeEmbedding(*storage.Node) error }); ok {
2✔
778
                // UpdateNodeEmbedding only updates existing nodes - returns ErrNotFound if node doesn't exist
1✔
779
                updateErr = embedUpdater.UpdateNodeEmbedding(node)
1✔
780
                if updateErr == storage.ErrNotFound {
2✔
781
                        // Node was deleted - remove from pending index and skip
1✔
782
                        fmt.Printf("⚠️  Node %s was deleted - skipping update to prevent orphaned node\n", node.ID)
1✔
783
                        ew.markNodeEmbedded(node.ID)
1✔
784
                        return false
1✔
785
                }
1✔
786
        } else {
1✔
787
                // Fallback: UpdateNode has upsert behavior which can create orphaned nodes
1✔
788
                // This should only happen if the storage engine doesn't support UpdateNodeEmbedding
1✔
789
                // For safety, we've already verified the node exists above
1✔
790
                updateErr = ew.storage.UpdateNode(node)
1✔
791
        }
1✔
792
        if updateErr != nil {
1✔
793
                // If update failed because node doesn't exist, skip it (already claimed, don't re-queue)
×
794
                if updateErr == storage.ErrNotFound {
×
795
                        fmt.Printf("⚠️  Node %s doesn't exist - skipping update to prevent orphaned node\n", node.ID)
×
796
                        return false
×
797
                }
×
798
                fmt.Printf("⚠️  Failed to update node %s: %v\n", node.ID, updateErr)
×
799
                ew.addNodeToPendingEmbeddings(node.ID) // Re-queue so another worker can retry
×
800
                ew.failed.Add(1)
×
801
                return true // Failed but we tried - continue to next node
×
802
        }
803

804
        // Call callback to update search index
805
        if ew.onEmbedded != nil {
2✔
806
                ew.onEmbedded(node)
1✔
807
        }
1✔
808

809
        // Remove from pending embeddings index (O(1) operation)
810
        ew.markNodeEmbedded(node.ID)
1✔
811

1✔
812
        ew.processed.Add(1)
1✔
813
        // Track this node as recently processed to prevent re-processing before DB commit is visible
1✔
814
        ew.mu.Lock()
1✔
815
        if ew.recentlyProcessed == nil {
1✔
816
                ew.recentlyProcessed = make(map[string]time.Time)
×
817
        }
×
818
        ew.recentlyProcessed[string(node.ID)] = time.Now()
1✔
819
        ew.mu.Unlock()
1✔
820

1✔
821
        // Log success with appropriate message
1✔
822
        if len(node.ChunkEmbeddings) > 0 {
2✔
823
                dims := 0
1✔
824
                if len(node.ChunkEmbeddings[0]) > 0 {
2✔
825
                        dims = len(node.ChunkEmbeddings[0])
1✔
826
                }
1✔
827
                if len(node.ChunkEmbeddings) > 1 {
2✔
828
                        fmt.Printf("✅ Embedded %s (%d dims, %d chunks)\n", node.ID, dims, len(node.ChunkEmbeddings))
1✔
829
                } else {
2✔
830
                        fmt.Printf("✅ Embedded %s (%d dims)\n", node.ID, dims)
1✔
831
                }
1✔
832
        }
833

834
        // Small delay before next
835
        time.Sleep(ew.config.BatchDelay)
1✔
836

1✔
837
        // Trigger another check immediately if there might be more.
1✔
838
        // Internal chaining should not be debounced.
1✔
839
        ew.signalTrigger()
1✔
840

1✔
841
        return true // Successfully processed
1✔
842
}
843

844
// EmbeddingFinder interface for efficient node lookup
845
type EmbeddingFinder interface {
846
        FindNodeNeedingEmbedding() *storage.Node
847
}
848

849
// EmbeddingIndexManager is an optional interface for storage engines
850
// that support efficient pending embeddings tracking via Badger secondary index.
851
type EmbeddingIndexManager interface {
852
        RefreshPendingEmbeddingsIndex() int
853
        MarkNodeEmbedded(nodeID storage.NodeID)
854
}
855

856
// findNodeWithoutEmbedding finds a single node that needs embedding.
857
// Uses efficient streaming iteration if available, falls back to AllNodes.
858
func (ew *EmbedWorker) findNodeWithoutEmbedding() *storage.Node {
1✔
859
        // Try efficient streaming method first (BadgerEngine, WALEngine)
1✔
860
        if finder, ok := ew.storage.(EmbeddingFinder); ok {
2✔
861
                return finder.FindNodeNeedingEmbedding()
1✔
862
        }
1✔
863

864
        // Fallback: use storage helper
865
        return storage.FindNodeNeedingEmbedding(ew.storage)
1✔
866
}
867

868
// refreshEmbeddingIndex refreshes the pending embeddings index
869
// to catch any nodes that were added during processing.
870
// Returns the number of stale entries removed.
871
func (ew *EmbedWorker) refreshEmbeddingIndex() int {
1✔
872
        if mgr, ok := ew.storage.(EmbeddingIndexManager); ok {
2✔
873
                return mgr.RefreshPendingEmbeddingsIndex()
1✔
874
        }
1✔
875
        return 0
1✔
876
}
877

878
// refreshEmbeddingIndexIfDue runs a full pending-index refresh at most once per
879
// minRefreshOnEmptyInterval unless forced. This avoids repeated full scans when
880
// the worker is repeatedly triggered by bursty writes.
881
func (ew *EmbedWorker) refreshEmbeddingIndexIfDue(force bool) int {
1✔
882
        ew.refreshMu.Lock()
1✔
883
        if !force && !ew.lastRefreshAt.IsZero() && time.Since(ew.lastRefreshAt) < minRefreshOnEmptyInterval {
2✔
884
                ew.refreshMu.Unlock()
1✔
885
                return 0
1✔
886
        }
1✔
887
        ew.lastRefreshAt = time.Now()
1✔
888
        ew.refreshMu.Unlock()
1✔
889
        return ew.refreshEmbeddingIndex()
1✔
890
}
891

892
// markNodeEmbedded removes a node from the pending embeddings index.
893
func (ew *EmbedWorker) markNodeEmbedded(nodeID storage.NodeID) {
1✔
894
        if mgr, ok := ew.storage.(EmbeddingIndexManager); ok {
2✔
895
                mgr.MarkNodeEmbedded(nodeID)
1✔
896
        }
1✔
897
}
898

899
// addNodeToPendingEmbeddings re-queues a node for embedding (e.g. after a failed attempt so another worker can retry).
900
func (ew *EmbedWorker) addNodeToPendingEmbeddings(nodeID storage.NodeID) {
1✔
901
        if adder, ok := ew.storage.(interface{ AddToPendingEmbeddings(storage.NodeID) }); ok {
2✔
902
                adder.AddToPendingEmbeddings(nodeID)
1✔
903
        }
1✔
904
}
905

906
// embedChunksInBatches embeds chunks using bounded request sizes.
907
// This avoids sending massive single EmbedBatch requests for large files.
908
func (ew *EmbedWorker) embedChunksInBatches(chunks []string, nodeID storage.NodeID) ([][]float32, error) {
1✔
909
        if len(chunks) == 0 {
1✔
910
                return nil, nil
×
911
        }
×
912
        batchSize := ew.config.EmbedBatchSize
1✔
913
        if batchSize < 1 {
2✔
914
                batchSize = 32
1✔
915
        }
1✔
916
        allEmbeddings := make([][]float32, 0, len(chunks))
1✔
917
        for start := 0; start < len(chunks); start += batchSize {
2✔
918
                end := start + batchSize
1✔
919
                if end > len(chunks) {
2✔
920
                        end = len(chunks)
1✔
921
                }
1✔
922
                batch := chunks[start:end]
1✔
923
                batchEmbeddings, err := ew.embedBatchWithRetry(batch)
1✔
924
                if err != nil {
2✔
925
                        return nil, fmt.Errorf("batch %d-%d/%d failed for %s: %w", start+1, end, len(chunks), nodeID, err)
1✔
926
                }
1✔
927
                if len(batchEmbeddings) != len(batch) {
2✔
928
                        return nil, fmt.Errorf("embedding count mismatch for %s: got %d, expected %d", nodeID, len(batchEmbeddings), len(batch))
1✔
929
                }
1✔
930
                allEmbeddings = append(allEmbeddings, batchEmbeddings...)
1✔
931
        }
932
        return allEmbeddings, nil
1✔
933
}
934

935
// embedBatchWithRetry retries a single micro-batch with backoff.
936
func (ew *EmbedWorker) embedBatchWithRetry(chunks []string) ([][]float32, error) {
1✔
937
        var embeddings [][]float32
1✔
938
        var err error
1✔
939
        for attempt := 1; attempt <= ew.config.MaxRetries; attempt++ {
2✔
940
                type embedResult struct {
1✔
941
                        embeddings [][]float32
1✔
942
                        err        error
1✔
943
                }
1✔
944
                resultCh := make(chan embedResult, 1)
1✔
945
                go func() {
2✔
946
                        embs, embedErr := ew.embedder.EmbedBatch(ew.ctx, chunks)
1✔
947
                        resultCh <- embedResult{embeddings: embs, err: embedErr}
1✔
948
                }()
1✔
949
                select {
1✔
950
                case <-ew.ctx.Done():
1✔
951
                        return nil, ew.ctx.Err()
1✔
952
                case result := <-resultCh:
1✔
953
                        embeddings, err = result.embeddings, result.err
1✔
954
                }
955
                if err == nil {
2✔
956
                        return embeddings, nil
1✔
957
                }
1✔
958
                if attempt < ew.config.MaxRetries {
2✔
959
                        backoff := time.Duration(attempt) * 2 * time.Second
1✔
960
                        fmt.Printf("   ⚠️  Embed batch attempt %d failed (batch_size=%d), retrying in %v\n", attempt, len(chunks), backoff)
1✔
961
                        select {
1✔
962
                        case <-ew.ctx.Done():
×
963
                                return nil, ew.ctx.Err()
×
964
                        case <-time.After(backoff):
1✔
965
                        }
966
                        continue
1✔
967
                }
968
        }
969
        return nil, err
1✔
970
}
971

972
// averageEmbeddings returns the element-wise mean of embeddings.
//
// The output width is the length of the first embedding. Components past that
// width are ignored, and shorter embeddings contribute nothing to the missing
// positions but still count in the divisor. A nil result is returned for no
// input. A single input is returned as-is (the same slice, not a copy).
func averageEmbeddings(embeddings [][]float32) []float32 {
	switch len(embeddings) {
	case 0:
		return nil
	case 1:
		return embeddings[0]
	}

	width := len(embeddings[0])
	sum := make([]float32, width)
	for _, vec := range embeddings {
		limit := len(vec)
		if limit > width {
			limit = width
		}
		for j := 0; j < limit; j++ {
			sum[j] += vec[j]
		}
	}

	count := float32(len(embeddings))
	for j := range sum {
		sum[j] /= count
	}
	return sum
}
999

1000
// MarshalJSON for worker stats.
1001
func (s WorkerStats) MarshalJSON() ([]byte, error) {
1✔
1002
        return json.Marshal(map[string]interface{}{
1✔
1003
                "running":   s.Running,
1✔
1004
                "processed": s.Processed,
1✔
1005
                "failed":    s.Failed,
1✔
1006
        })
1✔
1007
}
1✔
1008

1009
// copyNodeForEmbedding creates a deep copy of a node to avoid race conditions.
1010
// The original node from storage may be accessed by other goroutines (HTTP handlers,
1011
// search service, etc.) Modifying the Properties map directly while another goroutine
1012
// iterates over it causes "concurrent map iteration and map write" panic.
1013
//
1014
// This function copies:
1015
//   - All scalar fields (ID, Labels, Embedding, etc.)
1016
//   - Deep copy of Properties map
1017
func copyNodeForEmbedding(src *storage.Node) *storage.Node {
1✔
1018
        if src == nil {
2✔
1019
                return nil
1✔
1020
        }
1✔
1021

1022
        // Create a new node with copied scalar fields
1023
        dst := &storage.Node{
1✔
1024
                ID:        src.ID,
1✔
1025
                Labels:    make([]string, len(src.Labels)),
1✔
1026
                CreatedAt: src.CreatedAt,
1✔
1027
                UpdatedAt: src.UpdatedAt,
1✔
1028
        }
1✔
1029

1✔
1030
        // Copy labels
1✔
1031
        copy(dst.Labels, src.Labels)
1✔
1032

1✔
1033
        // Copy chunk embeddings if present (always stored in ChunkEmbeddings, even single chunk = array of 1)
1✔
1034
        if len(src.ChunkEmbeddings) > 0 {
2✔
1035
                dst.ChunkEmbeddings = make([][]float32, len(src.ChunkEmbeddings))
1✔
1036
                for i, emb := range src.ChunkEmbeddings {
2✔
1037
                        dst.ChunkEmbeddings[i] = make([]float32, len(emb))
1✔
1038
                        copy(dst.ChunkEmbeddings[i], emb)
1✔
1039
                }
1✔
1040
        }
1041

1042
        // Deep copy Properties map - this is the critical part to avoid race condition
1043
        if src.Properties != nil {
2✔
1044
                dst.Properties = make(map[string]any, len(src.Properties))
1✔
1045
                for k, v := range src.Properties {
2✔
1046
                        dst.Properties[k] = v // Shallow copy of values is OK for our use case
1✔
1047
                }
1✔
1048
        }
1049

1050
        return dst
1✔
1051
}
1052

1053
// Legacy aliases for compatibility with existing code
1054
type EmbedQueue = EmbedWorker
1055
type EmbedQueueConfig = EmbedWorkerConfig
1056
type QueueStats = WorkerStats
1057

1058
func DefaultEmbedQueueConfig() *EmbedQueueConfig {
1✔
1059
        return DefaultEmbedWorkerConfig()
1✔
1060
}
1✔
1061

1062
func NewEmbedQueue(embedder embed.Embedder, storage storage.Engine, config *EmbedQueueConfig) *EmbedQueue {
1✔
1063
        return NewEmbedWorker(embedder, storage, config)
1✔
1064
}
1✔
1065

1066
// Enqueue is now just a trigger - tells worker to check for work.
1067
func (ew *EmbedWorker) Enqueue(nodeID string) {
1✔
1068
        ew.Trigger()
1✔
1069
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc