• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

orneryd / NornicDB / 25766380596

12 May 2026 10:38PM UTC coverage: 83.772% (-0.008%) from 83.78%
25766380596

push

github

orneryd
fix(macos, package): fix Macos CI

124603 of 148741 relevant lines covered (83.77%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.77
/pkg/server/server.go
1
// Package server provides a Neo4j-compatible HTTP REST API server for NornicDB.
2
//
3
// This package implements the Neo4j HTTP API specification, making NornicDB compatible
4
// with existing Neo4j tools, drivers, and browsers while adding NornicDB-specific
5
// extensions for memory decay, vector search, and compliance features.
6
//
7
// Neo4j Compatibility:
8
//   - Discovery endpoint (/) returns Neo4j-compatible service information
9
//   - Transaction API (/db/{name}/tx) supports implicit and explicit transactions
10
//   - Cypher query execution with Neo4j response format
11
//   - Basic Auth and Bearer token authentication
12
//   - Error codes follow Neo4j conventions (Neo.ClientError.*)
13
//
14
// NornicDB Extensions:
15
//   - JWT authentication with RBAC
16
//   - Vector search endpoints (/nornicdb/search, /nornicdb/similar)
17
//   - Memory decay information (/nornicdb/decay)
18
//   - GDPR compliance endpoints (/gdpr/export, /gdpr/delete)
19
//   - Admin endpoints (/admin/stats, /admin/config)
20
//   - GPU acceleration control (/admin/gpu/*)
21
//   - HTTP/2 support (always enabled, backwards compatible with HTTP/1.1)
22
//
23
// Example Usage:
24
//
25
//        // Create server
26
//        db, _ := nornicdb.Open("./data", nil)
27
//        auth, _ := auth.NewAuthenticator(auth.DefaultAuthConfig())
28
//        config := server.DefaultConfig()
29
//
30
//        server, err := server.New(db, auth, config)
31
//        if err != nil {
32
//                log.Fatal(err)
33
//        }
34
//
35
//        // Start server
36
//        if err := server.Start(); err != nil {
37
//                log.Fatal(err)
38
//        }
39
//
40
//        // Server listening on server.Addr()
41
//
42
//        // Use with Neo4j Browser
43
//        // Open: http://localhost:7474
44
//        // Connect URI: bolt://localhost:7687 (if Bolt server is running)
45
//        // Or use HTTP: http://localhost:7474/db/nornic/tx/commit
46
//
47
//        // Use with Neo4j drivers
48
//        driver := neo4j.NewDriver("http://localhost:7474", neo4j.BasicAuth("admin", "password"))
49
//        session := driver.NewSession(neo4j.SessionConfig{})
50
//        result, _ := session.Run("MATCH (n) RETURN count(n)", nil)
51
//
52
//        // Graceful shutdown
53
//        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
54
//        defer cancel()
55
//        server.Stop(ctx)
56
//
57
// Authentication:
58
//
59
// The server supports multiple authentication methods:
60
//
61
//  1. **Basic Auth** (Neo4j compatible):
62
//     Authorization: Basic base64(username:password)
63
//
64
//  2. **Bearer Token** (JWT):
65
//     Authorization: Bearer eyJhbGciOiJIUzI1NiIs...
66
//
67
//  3. **Cookie** (browser sessions):
68
//     Cookie: token=eyJhbGciOiJIUzI1NiIs...
69
//
70
//  4. **Query Parameter** (for SSE/WebSocket):
71
//     ?token=eyJhbGciOiJIUzI1NiIs...
72
//
73
// Neo4j HTTP API Endpoints:
74
//
75
//        GET  /                           - Discovery (service information)
76
//        GET  /db/{name}                  - Database information
77
//        POST /db/{name}/tx/commit       - Execute Cypher (implicit transaction)
78
//        POST /db/{name}/tx              - Begin explicit transaction
79
//        POST /db/{name}/tx/{id}         - Execute in transaction
80
//        POST /db/{name}/tx/{id}/commit  - Commit transaction
81
//        DELETE /db/{name}/tx/{id}       - Rollback transaction
82
//
83
// NornicDB Extension Endpoints:
84
//
85
//        Authentication:
86
//          POST /auth/token                - Get JWT token
87
//          POST /auth/logout               - Logout
88
//          GET  /auth/me                   - Current user info
89
//          POST /auth/api-token            - Generate API token (admin)
90
//          GET  /auth/oauth/redirect       - OAuth redirect
91
//          GET  /auth/oauth/callback        - OAuth callback
92
//          GET  /auth/users                 - List users (admin)
93
//          POST /auth/users                 - Create user (admin)
94
//          GET  /auth/users/{username}      - Get user (admin)
95
//          PUT  /auth/users/{username}      - Update user (admin)
96
//          DELETE /auth/users/{username}    - Delete user (admin)
97
//
98
//        Search & Embeddings:
99
//          POST /nornicdb/search           - Hybrid search (vector + BM25)
100
//          POST /nornicdb/similar           - Vector similarity search
101
//          GET  /nornicdb/decay             - Memory decay statistics
102
//          POST /nornicdb/embed/trigger     - Trigger embedding generation
103
//          GET  /nornicdb/embed/stats       - Embedding statistics
104
//          POST /nornicdb/embed/clear       - Clear all embeddings (admin)
105
//          POST /nornicdb/search/rebuild    - Rebuild search indexes
106
//
107
//        Admin & System:
108
//          GET  /admin/stats               - System statistics (admin)
109
//          GET  /admin/config               - Server configuration (admin)
110
//          POST /admin/backup               - Create backup (admin)
111
//          GET  /admin/gpu/status           - GPU status (admin)
112
//          POST /admin/gpu/enable           - Enable GPU (admin)
113
//          POST /admin/gpu/disable          - Disable GPU (admin)
114
//          POST /admin/gpu/test              - Test GPU (admin)
115
//
116
//        GDPR Compliance:
117
//          POST /gdpr/export                - GDPR data export (requires user_id and format in body)
118
//          POST /gdpr/delete                - GDPR erasure request
119
//
120
//        GraphQL & AI:
121
//          POST /graphql                    - GraphQL endpoint
122
//          GET  /graphql/playground         - GraphQL Playground
123
//          POST /mcp                        - MCP server endpoint
124
//          POST /api/bifrost/chat/completions - Heimdall AI chat
125
//
126
// For complete API documentation, see: docs/api-reference/openapi.yaml
127
//
128
// Security Features:
129
//
130
//   - CORS support with configurable origins
131
//   - Request size limits (default 10MB)
132
//   - IP-based rate limiting (configurable per-minute/per-hour limits)
133
//   - Audit logging integration
134
//   - Panic recovery middleware
135
//   - TLS/HTTPS support
136
//
137
// Compliance:
138
//   - GDPR Art.15 (right of access) via /gdpr/export
139
//   - GDPR Art.17 (right to erasure) via /gdpr/delete
140
//   - HIPAA audit logging for all data access
141
//   - SOC2 access controls via RBAC
142
//
143
// ELI12 (Explain Like I'm 12):
144
//
145
// Think of this server like a restaurant:
146
//
147
//  1. **Neo4j compatibility**: We speak the same "language" as Neo4j, so existing
148
//     customers (tools/drivers) can order from our menu without learning new words.
149
//
150
//  2. **Authentication**: Like checking IDs at the door - we make sure you're allowed
151
//     to be here and what you're allowed to do.
152
//
153
//  3. **Endpoints**: Different "counters" for different services - one for regular
154
//     food (Cypher queries), one for special orders (vector search), one for the
155
//     manager's office (admin functions).
156
//
157
//  4. **Middleware**: Like security guards, cashiers, and cleaners who help every
158
//     customer but do different jobs (logging, auth, error handling).
159
//
160
// The server makes sure everyone gets served safely and efficiently!
161
package server
162

163
import (
164
        "context"
165
        "errors"
166
        "fmt"
167
        "io"
168
        "log"
169
        "log/slog"
170
        "net"
171
        "net/http"
172
        "os"
173
        "path/filepath"
174
        "strconv"
175
        "strings"
176
        "sync"
177
        "sync/atomic"
178
        "time"
179

180
        "github.com/prometheus/client_golang/prometheus"
181
        "golang.org/x/net/http2"
182
        "golang.org/x/net/http2/h2c"
183

184
        "github.com/orneryd/nornicdb/pkg/audit"
185
        "github.com/orneryd/nornicdb/pkg/auth"
186
        "github.com/orneryd/nornicdb/pkg/buildinfo"
187
        nornicConfig "github.com/orneryd/nornicdb/pkg/config"
188
        "github.com/orneryd/nornicdb/pkg/config/dbconfig"
189
        "github.com/orneryd/nornicdb/pkg/cypher"
190
        "github.com/orneryd/nornicdb/pkg/embed"
191
        "github.com/orneryd/nornicdb/pkg/envutil"
192
        "github.com/orneryd/nornicdb/pkg/graphql"
193
        "github.com/orneryd/nornicdb/pkg/heimdall"
194
        "github.com/orneryd/nornicdb/pkg/localllm"
195
        "github.com/orneryd/nornicdb/pkg/mcp"
196
        "github.com/orneryd/nornicdb/pkg/multidb"
197
        "github.com/orneryd/nornicdb/pkg/nornicdb"
198
        "github.com/orneryd/nornicdb/pkg/observability"
199
        "github.com/orneryd/nornicdb/pkg/qdrantgrpc"
200
        "github.com/orneryd/nornicdb/pkg/search"
201
        "github.com/orneryd/nornicdb/pkg/storage"
202
        "github.com/orneryd/nornicdb/pkg/txsession"
203
)
204

205
// Sentinel errors for HTTP operations.
//
// Each value is a distinct, fixed error created once at package init; compare
// against them with errors.Is (or ==). They are built with errors.New because
// the messages contain no formatting verbs.
var (
	ErrServerClosed       = errors.New("server closed")
	ErrUnauthorized       = errors.New("unauthorized")
	ErrForbidden          = errors.New("forbidden")
	ErrBadRequest         = errors.New("bad request")
	ErrNotFound           = errors.New("not found")
	ErrMethodNotAllowed   = errors.New("method not allowed")
	ErrInternalError      = errors.New("internal server error")
	ErrServiceUnavailable = errors.New("service unavailable")
)
216

217
// embeddingCacheMemoryMB estimates the memory footprint, in whole mebibytes,
// of an embedding cache holding cacheSize vectors of the given dimensions.
//
// The estimate is cacheSize * dimensions * 4 bytes (one float32 per
// dimension), truncated toward zero when converted to MB. The product is
// computed in int64 so that large caches do not overflow on platforms where
// int is 32 bits wide.
func embeddingCacheMemoryMB(cacheSize, dimensions int) int {
	const (
		bytesPerFloat32 = 4
		bytesPerMB      = 1024 * 1024
	)
	totalBytes := int64(cacheSize) * int64(dimensions) * bytesPerFloat32
	return int(totalBytes / bytesPerMB)
}
1✔
222

223
// waitForDurationOrServerClose sleeps for d and returns true only when the full
224
// duration elapsed. It returns false when the server is closing so background
225
// retry loops can exit promptly during shutdown.
226
func waitForDurationOrServerClose(s *Server, d time.Duration) bool {
1✔
227
        if d <= 0 {
2✔
228
                return true
1✔
229
        }
1✔
230
        if s == nil {
2✔
231
                time.Sleep(d)
1✔
232
                return true
1✔
233
        }
1✔
234

235
        timer := time.NewTimer(d)
1✔
236
        defer timer.Stop()
1✔
237
        poll := time.NewTicker(250 * time.Millisecond)
1✔
238
        defer poll.Stop()
1✔
239

1✔
240
        for {
2✔
241
                if s.closed.Load() {
2✔
242
                        return false
1✔
243
                }
1✔
244
                select {
1✔
245
                case <-timer.C:
1✔
246
                        return true
1✔
247
                case <-poll.C:
1✔
248
                        if s.closed.Load() {
2✔
249
                                return false
1✔
250
                        }
1✔
251
                }
252
        }
253
}
254

255
// buildEmbedConfigFromResolved builds an embed.Config from per-DB effective map and server config fallbacks.
256
// Used by the per-DB embedder registry when EmbedConfigForDB is set.
257
func buildEmbedConfigFromResolved(effective map[string]string, fallback *Config) *embed.Config {
1✔
258
        if fallback == nil {
2✔
259
                return nil
1✔
260
        }
1✔
261
        get := func(key, def string) string {
2✔
262
                if v := effective[key]; v != "" {
2✔
263
                        return strings.TrimSpace(v)
1✔
264
                }
1✔
265
                return def
1✔
266
        }
267
        getInt := func(key string, def int) int {
2✔
268
                if v := effective[key]; v != "" {
2✔
269
                        if i, err := strconv.Atoi(strings.TrimSpace(v)); err == nil {
2✔
270
                                return i
1✔
271
                        }
1✔
272
                }
273
                return def
1✔
274
        }
275
        provider := get("NORNICDB_EMBEDDING_PROVIDER", fallback.EmbeddingProvider)
1✔
276
        if provider == "" {
2✔
277
                provider = "openai"
1✔
278
        }
1✔
279
        model := get("NORNICDB_EMBEDDING_MODEL", fallback.EmbeddingModel)
1✔
280
        apiURL := get("NORNICDB_EMBEDDING_API_URL", fallback.EmbeddingAPIURL)
1✔
281
        apiKey := get("NORNICDB_EMBEDDING_API_KEY", fallback.EmbeddingAPIKey)
1✔
282
        dimensions := getInt("NORNICDB_EMBEDDING_DIMENSIONS", fallback.EmbeddingDimensions)
1✔
283
        if dimensions <= 0 {
2✔
284
                dimensions = 1024
1✔
285
        }
1✔
286
        gpuLayers := getInt("NORNICDB_EMBEDDING_GPU_LAYERS", 0)
1✔
287
        cfg := &embed.Config{
1✔
288
                Provider:   provider,
1✔
289
                APIURL:     apiURL,
1✔
290
                APIKey:     apiKey,
1✔
291
                Model:      model,
1✔
292
                Dimensions: dimensions,
1✔
293
                ModelsDir:  fallback.ModelsDir,
1✔
294
                Timeout:    30 * time.Second,
1✔
295
                GPULayers:  gpuLayers,
1✔
296
        }
1✔
297
        switch provider {
1✔
298
        case "ollama":
1✔
299
                cfg.APIPath = "/api/embeddings"
1✔
300
        case "openai":
1✔
301
                cfg.APIPath = "/v1/embeddings"
1✔
302
        case "local":
1✔
303
                // no APIPath
304
        default:
1✔
305
                cfg.APIPath = "/api/embeddings"
1✔
306
        }
307
        return cfg
1✔
308
}
309

310
// Config holds HTTP server configuration options.
311
//
312
// All settings have sensible defaults via DefaultConfig(). The server follows
313
// Neo4j conventions where applicable (default port 7474, timeouts, etc.).
314
//
315
// Example:
316
//
317
//        // Production configuration
318
//        config := &server.Config{
319
//                Address:           "0.0.0.0",
320
//                Port:              7474,
321
//                ReadTimeout:       30 * time.Second,
322
//                WriteTimeout:      60 * time.Second,
323
//                MaxRequestSize:    50 * 1024 * 1024, // 50MB for large imports
324
//                EnableCORS:        true,
325
//                CORSOrigins:       []string{"https://myapp.com"},
326
//                EnableCompression: true,
327
//                TLSCertFile:       "/etc/ssl/server.crt",
328
//                TLSKeyFile:        "/etc/ssl/server.key",
329
//        }
330
//
331
//        // Development configuration with CORS for local UI
332
//        config = server.DefaultConfig()
333
//        config.Port = 8080
334
//        config.EnableCORS = true
335
//        config.CORSOrigins = []string{"http://localhost:3000"} // Local dev UI only
336
type Config struct {
337
        // Address to bind to (default: "127.0.0.1" - localhost only for security)
338
        // Set to "0.0.0.0" to listen on all interfaces (required for Docker/external access)
339
        Address string
340
        // Port to listen on (default: 7474)
341
        Port int
342
        // ReadTimeout for requests (DefaultConfig: 30s)
343
        ReadTimeout time.Duration
344
        // WriteTimeout for responses (DefaultConfig: 300s, sized for long-running Bifrost streams)
345
        WriteTimeout time.Duration
346
        // IdleTimeout for keep-alive connections (DefaultConfig: 120s)
347
        IdleTimeout time.Duration
348
        // MaxRequestSize in bytes (default: 10MB)
349
        MaxRequestSize int64
350
        // EnableCORS for cross-origin requests (DefaultConfig: true)
351
        EnableCORS bool
352
        // CORSOrigins allowed origins (DefaultConfig: []string{"*"}, i.e. all origins)
353
        // WARNING: Never use "*" with credentials - this is a CSRF vulnerability.
        // Configure specific origins for production.
354
        CORSOrigins []string
355
        // EnableCompression for responses (DefaultConfig: true)
356
        EnableCompression bool
357

358
        // Rate Limiting Configuration (DoS protection)
359
        // RateLimitEnabled enables IP-based rate limiting (DefaultConfig: false)
360
        RateLimitEnabled bool
361
        // RateLimitPerMinute max requests per IP per minute (DefaultConfig: 10000)
362
        RateLimitPerMinute int
363
        // RateLimitPerHour max requests per IP per hour (DefaultConfig: 100000)
364
        RateLimitPerHour int
365
        // RateLimitBurst max burst size for short request spikes (DefaultConfig: 1000)
366
        RateLimitBurst int
367
        // TLSCertFile for HTTPS
368
        TLSCertFile string
369
        // TLSKeyFile for HTTPS
370
        TLSKeyFile string
371

372
        // HTTP/2 Configuration
373
        // HTTP/2 is always enabled (backwards compatible with HTTP/1.1)
374
        // HTTP/2 provides multiplexing, header compression, and improved performance
375
        // HTTP/1.1 clients continue to work normally
376
        // HTTP2MaxConcurrentStreams limits the number of concurrent streams per connection (default: 250)
377
        // - 250: Go's internal default, matches standard library behavior (default)
378
        // - 100: Lower memory usage, good for resource-constrained environments
379
        // - 500-1000: High concurrency scenarios, uses more memory per connection
380
        // - Very high values (>1000) are not recommended due to DoS attack risk
381
        HTTP2MaxConcurrentStreams uint32
382

383
        // MCP Configuration (Model Context Protocol)
384
        // MCPEnabled controls whether the MCP server is started (default: true)
385
        // Set to false to disable MCP tools entirely
386
        // Env: NORNICDB_MCP_ENABLED=true|false
387
        MCPEnabled bool
388

389
        // Embedding Configuration (for vector search)
390
        // EmbeddingEnabled turns on automatic embedding generation (DefaultConfig: true)
391
        EmbeddingEnabled bool
392
        // EmbeddingProvider: "ollama" or "openai" or "local" (DefaultConfig: "ollama")
393
        EmbeddingProvider string
394
        // EmbeddingAPIURL is the base URL (e.g., http://localhost:11434)
395
        EmbeddingAPIURL string
396
        // EmbeddingModel is the model name (e.g., bge-m3)
397
        EmbeddingModel string
398
        // EmbeddingDimensions is expected vector size (e.g., 1024)
399
        EmbeddingDimensions int
400
        // EmbeddingCacheSize is max embeddings to cache (0 = disabled, default: 10000)
401
        // Each cached embedding uses ~4KB (1024 dims × 4 bytes)
402
        EmbeddingCacheSize int
403
        // EmbeddingAPIKey is the API key for authenticated embedding providers (OpenAI, Cloudflare Workers AI, etc.)
404
        // Env: NORNICDB_EMBEDDING_API_KEY
405
        EmbeddingAPIKey string
406
        // ModelsDir is the directory containing local GGUF models
407
        // Env: NORNICDB_MODELS_DIR (default: ./models)
408
        ModelsDir string
409

410
        // Slow Query Logging Configuration
411
        // SlowQueryEnabled turns on slow query logging (DefaultConfig: false)
412
        SlowQueryEnabled bool
413
        // D-04d: SlowQueryThreshold and SlowQueryLogFile collapsed into
414
        // pkg/config.LoggingConfig (the single source of truth). Threaded into
415
        // the server via the Logging field below; readers go through
416
        // s.config.Logging.SlowQueryThreshold / .SlowQueryLogFile.
417
        //
418
        // Logging carries the runtime LoggingConfig snapshot. Populated by
419
        // cmd/nornicdb/main.go from cfg.Logging at server construction.
        // DefaultConfig sets only SlowQueryThreshold (100ms).
420
        Logging nornicConfig.LoggingConfig
421

422
        // Headless Mode Configuration
423
        // Headless disables the web UI and browser-related endpoints
424
        // Set to true for API-only deployments (e.g., embedded use, microservices)
425
        // Env: NORNICDB_HEADLESS=true|false
426
        Headless bool
427

428
        // BasePath for deployment behind a reverse proxy with URL prefix
429
        // Example: "/nornicdb" when deployed at https://example.com/nornicdb/
430
        // Leave empty for root deployment (default)
431
        // Env: NORNICDB_BASE_PATH
432
        BasePath string
433

434
        // Plugins Configuration
435
        // PluginsDir is the directory for APOC/function plugins
436
        // Env: NORNICDB_PLUGINS_DIR
437
        PluginsDir string
438
        // HeimdallPluginsDir is the directory for Heimdall plugins
439
        // Env: NORNICDB_HEIMDALL_PLUGINS_DIR
440
        HeimdallPluginsDir string
441

442
        // Features configuration (passed from main config loading)
443
        // This contains feature flags like HeimdallEnabled loaded from YAML/env
444
        Features *nornicConfig.FeatureFlagsConfig
445

446
        // Debug/Profiling Configuration
447
        // EnablePprof enables /debug/pprof endpoints for performance profiling
448
        // WARNING: Only enable in development/testing environments
449
        // Env: NORNICDB_ENABLE_PPROF=true|false (read by DefaultConfig)
450
        EnablePprof bool
451

452
        // Logger is the structured-logging entrypoint per Phase 2 D-01.
453
        // If nil, a discard-handler fallback is installed at New() — graceful
454
        // degrade for the transitional period; ctors will be tightened post-M1
455
        // once all consumers are updated to pass an explicit logger via
456
        // observability.Provider.Logger().
457
        Logger *slog.Logger
458
}
459

460
// DefaultConfig returns Neo4j-compatible default server configuration.
461
//
462
// Defaults match Neo4j HTTP server settings:
463
//   - Port 7474 (Neo4j HTTP default)
464
//   - 30s read timeout
465
//   - 60s write timeout
466
//   - 120s idle timeout
467
//   - 10MB max request size
468
//   - CORS enabled for browser compatibility
469
//   - Compression enabled
470
//
471
// Embedding defaults (for MCP vector search):
472
//   - Enabled by default, connects to localhost:11434 (llama.cpp/Ollama)
473
//   - Model: bge-m3 (1024 dimensions)
474
//   - Falls back to text search if embeddings unavailable
475
//
476
// Environment Variables to override embedding config:
477
//
478
//        NORNICDB_EMBEDDING_ENABLED=true|false  - Enable/disable embeddings
479
//        NORNICDB_EMBEDDING_PROVIDER=openai     - API format: "openai" or "ollama"
480
//        NORNICDB_EMBEDDING_URL=http://...      - Embeddings API URL
481
//        NORNICDB_EMBEDDING_MODEL=bge-m3
482
//        NORNICDB_EMBEDDING_DIM=1024            - Vector dimensions
483
//
484
// Example:
485
//
486
//        config := server.DefaultConfig()
487
//        server, err := server.New(db, auth, config)
488
//
489
//        // Or customize
490
//        config = server.DefaultConfig()
491
//        config.Port = 8080
492
//        config.EnableCORS = false
493
//        server, err = server.New(db, auth, config)
494
func DefaultConfig() *Config {
1✔
495
        return &Config{
1✔
496
                // SECURITY: Bind to localhost only by default - prevents external access
1✔
497
                // Set Address to "0.0.0.0" for Docker/container deployments or external access
1✔
498
                Address:        "127.0.0.1",
1✔
499
                Port:           7474,
1✔
500
                ReadTimeout:    30 * time.Second,
1✔
501
                WriteTimeout:   300 * time.Second, // Bifrost agentic loops (many tool calls) can run 1–2 min; avoid closing stream early
1✔
502
                IdleTimeout:    120 * time.Second,
1✔
503
                MaxRequestSize: 10 * 1024 * 1024, // 10MB
1✔
504
                // CORS enabled by default for ease of use (allows all origins)
1✔
505
                // Override via: NORNICDB_CORS_ENABLED=false or NORNICDB_CORS_ORIGINS=https://myapp.com
1✔
506
                // WARNING: "*" allows any origin - configure specific origins for production
1✔
507
                EnableCORS:        true,
1✔
508
                CORSOrigins:       []string{"*"}, // Allow all origins by default
1✔
509
                EnableCompression: true,
1✔
510

1✔
511
                // Rate limiting enabled by default to prevent DoS attacks
1✔
512
                // High limits for high-performance local/development use
1✔
513
                RateLimitEnabled:   false,
1✔
514
                RateLimitPerMinute: 10000,  // 10,000 requests/minute per IP (166/sec)
1✔
515
                RateLimitPerHour:   100000, // 100,000 requests/hour per IP
1✔
516
                RateLimitBurst:     1000,   // Allow large bursts for batch operations
1✔
517

1✔
518
                // MCP server enabled by default
1✔
519
                // Override: NORNICDB_MCP_ENABLED=false
1✔
520
                MCPEnabled: true,
1✔
521

1✔
522
                // Embedding defaults - connects to local llama.cpp/Ollama server
1✔
523
                // Override via environment variables:
1✔
524
                //   NORNICDB_EMBEDDING_ENABLED=false     - Disable embeddings entirely
1✔
525
                //   NORNICDB_EMBEDDING_PROVIDER=ollama   - Use "ollama" or "openai" format
1✔
526
                //   NORNICDB_EMBEDDING_URL=http://...    - Embeddings API URL
1✔
527
                //   NORNICDB_EMBEDDING_MODEL=...         - Model name
1✔
528
                //   NORNICDB_EMBEDDING_DIM=1024          - Vector dimensions
1✔
529
                EmbeddingEnabled:    true,
1✔
530
                EmbeddingProvider:   "ollama", // default URL targets Ollama (port 11434)
1✔
531
                EmbeddingAPIURL:     "http://localhost:11434",
1✔
532
                EmbeddingModel:      "bge-m3",
1✔
533
                EmbeddingDimensions: 1024,
1✔
534
                EmbeddingCacheSize:  10000, // ~40MB cache for 1024-dim vectors
1✔
535

1✔
536
                // Slow query logging enabled by default
1✔
537
                // Override via:
1✔
538
                //   NORNICDB_SLOW_QUERY_ENABLED=false
1✔
539
                //   NORNICDB_SLOW_QUERY_THRESHOLD=200ms
1✔
540
                //   NORNICDB_SLOW_QUERY_LOG=/var/log/nornicdb/slow.log
1✔
541
                // D-04d: Threshold + LogFile defaults now live in
1✔
542
                // pkg/config.DefaultConfig().Logging (the single source of truth);
1✔
543
                // callers populate Logging from cfg.Logging.
1✔
544
                SlowQueryEnabled: false,
1✔
545
                Logging:          nornicConfig.LoggingConfig{SlowQueryThreshold: 100 * time.Millisecond},
1✔
546

1✔
547
                // Headless mode disabled by default (UI enabled)
1✔
548
                // Override via:
1✔
549
                //   NORNICDB_HEADLESS=true
1✔
550
                //   --headless flag
1✔
551
                Headless: false,
1✔
552

1✔
553
                // Pprof disabled by default (security: profiling endpoints expose internals)
1✔
554
                // Override via:
1✔
555
                //   NORNICDB_ENABLE_PPROF=true
1✔
556
                EnablePprof: envutil.GetBoolStrict("NORNICDB_ENABLE_PPROF", false),
1✔
557

1✔
558
                // HTTP/2 always enabled (backwards compatible with HTTP/1.1)
1✔
559
                // MaxConcurrentStreams: 250 matches Go's internal default
1✔
560
                // - Matches standard library http2.Server default (250)
1✔
561
                // - Good balance between performance and memory usage
1✔
562
                // - Can be reduced to 100 for lower-memory environments
1✔
563
                // - Can be increased to 500+ for high-concurrency scenarios
1✔
564
                HTTP2MaxConcurrentStreams: 250,
1✔
565
        }
1✔
566
}
1✔
567

568
// Server is the HTTP API server providing Neo4j-compatible endpoints.
569
//
570
// The server is thread-safe and handles concurrent requests. It maintains
571
// metrics, supports graceful shutdown, and integrates with audit logging.
572
//
573
// Lifecycle:
574
//  1. Create with New()
575
//  2. Optionally set audit logger with SetAuditLogger()
576
//  3. Start with Start()
577
//  4. Handle requests automatically
578
//  5. Stop with Stop() for graceful shutdown
579
//
580
// Example:
581
//
582
//        server, err := server.New(db, auth, config)
//        if err != nil {
//                log.Fatal(err)
//        }
583
//
584
//        // Set up audit logging
585
//        auditLogger, _ := audit.NewLogger(audit.DefaultConfig())
586
//        server.SetAuditLogger(auditLogger)
587
//
588
//        // Start server
589
//        if err := server.Start(); err != nil {
590
//                log.Fatal(err)
591
//        }
592
//
593
//        // Server is now handling requests
594
//        // (Listening on server.Addr())
595
//
596
//        // Get metrics
597
//        stats := server.Stats()
598
//        // stats.RequestCount, stats.ErrorCount expose request/error counts
599
//
600
//        // Graceful shutdown
601
//        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
602
//        defer cancel()
603
//        server.Stop(ctx)
604
type Server struct {
605
        // config is the server configuration (see Config).
        config    *Config
606
        // db is the primary database handle; startup search reconciliation
        // queries it for per-database search status and index builds.
        db        *nornicdb.DB
607
        dbManager *multidb.DatabaseManager // Manages multiple databases
608
        // auth is the authenticator. NOTE(review): presumably nil when
        // authentication is disabled — confirm against New().
        auth      *auth.Authenticator
609
        // audit is the audit logger; per the type's lifecycle notes it is set
        // optionally via SetAuditLogger().
        audit     *audit.Logger
610

611
        // log is the structured logger for operational events (Phase 2 D-01).
612
        // Tagged .With("component", "server") at construction so every record
613
        // carries component attribution. NEVER nil after New() returns
614
        // (discard-fallback handler installed when cfg.Logger == nil).
615
        log *slog.Logger
616

617
        // MCP server for LLM tool interface
618
        mcpServer *mcp.Server
619

620
        // Heimdall - AI assistant for database management
621
        heimdallHandler *heimdall.Handler
622

623
        // GraphQL handler for GraphQL API
624
        graphqlHandler *graphql.Handler
625

626
        // Qdrant-compatible gRPC server (optional; feature-flagged).
627
        qdrantGRPCServer      *qdrantgrpc.Server
628
        qdrantCollectionStore qdrantgrpc.CollectionStore
629

630
        // httpServer and listener are the underlying net/http server and the
        // socket it serves on. NOTE(review): presumably populated by Start() —
        // confirm.
        httpServer *http.Server
631
        listener   net.Listener
632

633
        // httpMetrics is the Plan-04-02 HTTP catalog bag (D-02 typed handle DI).
634
        // Populated by SetHTTPMetrics(...) AFTER observability.New runs in
635
        // cmd/nornicdb/main.go (Phase 2 D-08 two-phase bootstrap: server is
636
        // constructed BEFORE obs to keep the existing logger plumbing; metrics
637
        // bag is injected post-hoc and applied at Start() time when the http
638
        // handler is wrapped). Nil-safe: instrumentedMux is a pass-through
639
        // when nil, so test fixtures and pre-Phase-4 callers compile unchanged.
640
        httpMetrics *observability.HTTPMetrics
641

642
        // obsRegistry is the unified pkg/observability *prometheus.Registry,
643
        // injected post-construction via SetObsRegistry from cmd/nornicdb/main.go.
644
        // Used by handleMetrics (Phase 5 / Plan 05-04) to call
645
        // observability.RenderLegacy and produce the legacy :7474/metrics body
646
        // from the same registry that backs :9090/metrics — eliminating the
647
        // pre-Phase-5 hand-built second source of truth (ROADMAP SC #1).
648
        // Nil-safe: handleMetrics tolerates nil (RenderLegacy returns empty bytes).
649
        obsRegistry *prometheus.Registry
650

651
        // Rate limiter for DoS protection
652
        rateLimiter *IPRateLimiter
653

654
        // OAuth manager for OAuth 2.0 authentication
655
        oauthManager *auth.OAuthManager
656

657
        // Cache for Basic auth results to avoid bcrypt+JWT work on every request.
658
        // This materially improves throughput for Neo4j-compatible clients that
659
        // send Basic auth on each request.
660
        basicAuthCache *auth.BasicAuthCache
661

662
        // mu is a general-purpose lock. NOTE(review): the fields it guards are
        // not visible in this section — confirm before relying on it.
        mu      sync.RWMutex
663
        // closed is polled by waitForDurationOrServerClose so background
        // retry loops can exit during shutdown.
        closed  atomic.Bool
664
        // started is a timestamp; presumably the server start time — TODO confirm.
        started time.Time
665

666
        // Metrics
667
        requestCount   atomic.Int64
668
        errorCount     atomic.Int64
669
        activeRequests atomic.Int64
670
        activeTxReqs   atomic.Int64
671

672
        // Slow query logging
673
        slowQueryLogger *log.Logger
674
        slowQueryCount  atomic.Int64
675

676
        // Cached search services per database (namespace-aware indexes)
677
        searchServicesMu sync.RWMutex
678
        searchServices   map[string]*search.Service
679

680
        // Cached Cypher executors per database (thread-safe, reusable)
681
        executorsMu sync.RWMutex
682
        executors   map[string]*cypher.StorageExecutor
683

684
        // Explicit transaction sessions shared across transports.
685
        txSessions *txsession.Manager
686

687
        // Per-database access control (Neo4j-aligned). When auth disabled, Full is used.
688
        // When auth enabled, allowlistStore (if set) provides allowlist-based mode per principal.
689
        databaseAccessMode    auth.DatabaseAccessMode
690
        allowlistStore        *auth.AllowlistStore        // loaded from system DB when auth enabled
691
        roleStore             *auth.RoleStore             // user-defined roles when auth enabled
692
        privilegesStore       *auth.PrivilegesStore       // per-DB read/write (Phase 4) when auth enabled
693
        roleEntitlementsStore *auth.RoleEntitlementsStore // per-role global entitlements when auth enabled
694
        dbConfigStore         *dbconfig.Store             // per-DB config overrides (embedding, search, etc.)
695
}
696

697
// ensureSearchBuildStartedForKnownDatabases reconciles search-service startup for
698
// databases known to DatabaseManager, including metadata-only empty databases.
699
// It is safe to call repeatedly; per-db build start is idempotent.
700
func (s *Server) ensureSearchBuildStartedForKnownDatabases() {
1✔
701
        if s == nil || s.db == nil || s.dbManager == nil {
2✔
702
                return
1✔
703
        }
1✔
704
        for _, info := range s.dbManager.ListDatabases() {
2✔
705
                if info == nil || info.Name == "" || info.Name == "system" {
2✔
706
                        continue
1✔
707
                }
708
                if s.dbManager.IsCompositeDatabase(info.Name) {
2✔
709
                        continue
1✔
710
                }
711
                status := s.db.GetDatabaseSearchStatus(info.Name)
1✔
712
                if status.Initialized {
2✔
713
                        continue
1✔
714
                }
715
                storageEngine, err := s.dbManager.GetStorage(info.Name)
1✔
716
                if err != nil {
1✔
717
                        s.log.Warn("startup search reconcile: storage unavailable", "subsystem", "search", "db", info.Name, "error", err)
×
718
                        continue
×
719
                }
720
                if _, err := s.db.EnsureSearchIndexesBuildStarted(info.Name, storageEngine); err != nil {
1✔
721
                        s.log.Warn("startup search reconcile failed", "subsystem", "search", "db", info.Name, "error", err)
×
722
                }
×
723
        }
724
}
725

726
// mcpToolRunnerAdapter adapts pkg/mcp.Server to heimdall.InMemoryToolRunner so the agentic loop
727
// can expose store, recall, discover, link, task, tasks to the LLM and execute them in process.
728
// allowlist: nil = all tools, empty = no tools, non-empty = only those names.
729
type mcpToolRunnerAdapter struct {
730
        s         *mcp.Server
731
        allowlist []string
732
}
733

734
func (a *mcpToolRunnerAdapter) allowedNames() []string {
1✔
735
        if a.allowlist == nil {
2✔
736
                return mcp.AllTools()
1✔
737
        }
1✔
738
        return a.allowlist
1✔
739
}
740

741
func (a *mcpToolRunnerAdapter) allow(name string) bool {
1✔
742
        allowed := a.allowedNames()
1✔
743
        for _, n := range allowed {
2✔
744
                if n == name {
2✔
745
                        return true
1✔
746
                }
1✔
747
        }
748
        return false
1✔
749
}
750

751
func (a *mcpToolRunnerAdapter) ToolDefinitions() []heimdall.MCPTool {
1✔
752
        defs := a.s.ToolDefinitions()
1✔
753
        allowed := a.allowedNames()
1✔
754
        if len(allowed) == 0 {
2✔
755
                return nil
1✔
756
        }
1✔
757
        allowSet := make(map[string]struct{}, len(allowed))
1✔
758
        for _, n := range allowed {
2✔
759
                allowSet[n] = struct{}{}
1✔
760
        }
1✔
761
        var out []heimdall.MCPTool
1✔
762
        for _, t := range defs {
2✔
763
                if _, ok := allowSet[t.Name]; ok {
2✔
764
                        out = append(out, heimdall.MCPTool{Name: t.Name, Description: t.Description, InputSchema: t.InputSchema})
1✔
765
                }
1✔
766
        }
767
        return out
1✔
768
}
769

770
func (a *mcpToolRunnerAdapter) ToolNames() []string {
1✔
771
        return a.allowedNames()
1✔
772
}
1✔
773

774
func (a *mcpToolRunnerAdapter) CallTool(ctx context.Context, name string, args map[string]interface{}, dbName string) (interface{}, error) {
1✔
775
        if !a.allow(name) {
2✔
776
                return nil, fmt.Errorf("tool %q is not in the MCP allowlist", name)
1✔
777
        }
1✔
778
        // Ensure we always pass a concrete database so MCP uses DatabaseScopedExecutor when set.
779
        // Empty dbName would cause MCP to fall back to s.db, which can diverge from the default DB in multi-db setups.
780
        if dbName == "" {
2✔
781
                dbName = a.s.DefaultDatabaseName()
1✔
782
        }
1✔
783
        ctx = mcp.ContextWithDatabase(ctx, dbName)
1✔
784
        return a.s.CallTool(ctx, name, args)
1✔
785
}
786

787
// mcpDatabaseScopedExecutor returns a callback that provides a Cypher executor and node getter for the given database.
788
// Used so MCP tools (store, recall, link, etc.) run against the request's database when invoked from the agentic loop.
789
func (s *Server) mcpDatabaseScopedExecutor() func(dbName string) (*cypher.StorageExecutor, func(context.Context, string) (*nornicdb.Node, error), error) {
1✔
790
        return func(dbName string) (*cypher.StorageExecutor, func(context.Context, string) (*nornicdb.Node, error), error) {
2✔
791
                exec, err := s.getExecutorForDatabase(dbName)
1✔
792
                if err != nil {
2✔
793
                        return nil, nil, err
1✔
794
                }
1✔
795
                getNode := func(ctx context.Context, id string) (*nornicdb.Node, error) {
2✔
796
                        result, err := exec.Execute(ctx, "MATCH (n) WHERE elementId(n) = $id RETURN n", map[string]interface{}{"id": id})
1✔
797
                        if err != nil {
1✔
798
                                return nil, err
×
799
                        }
×
800
                        if len(result.Rows) == 0 || len(result.Rows[0]) == 0 {
2✔
801
                                return nil, nornicdb.ErrNotFound
1✔
802
                        }
1✔
803
                        v := result.Rows[0][0]
1✔
804
                        if snode, ok := v.(*storage.Node); ok {
2✔
805
                                props := make(map[string]interface{}, len(snode.Properties))
1✔
806
                                for k, val := range snode.Properties {
2✔
807
                                        props[k] = val
1✔
808
                                }
1✔
809
                                return &nornicdb.Node{
1✔
810
                                        ID:         string(snode.ID),
1✔
811
                                        Labels:     snode.Labels,
1✔
812
                                        Properties: props,
1✔
813
                                        CreatedAt:  snode.CreatedAt,
1✔
814
                                }, nil
1✔
815
                        }
816
                        return nil, nornicdb.ErrNotFound
×
817
                }
818
                return exec, getNode, nil
1✔
819
        }
820
}
821

822
// IPRateLimiter provides IP-based rate limiting to prevent DoS attacks.
//
// Every client IP gets its own pair of fixed windows (one minute and one
// hour). A request is admitted only while both window counts are below their
// configured limits. NewIPRateLimiter starts a background goroutine that
// evicts idle entries; call Stop to terminate it.
type IPRateLimiter struct {
	mu              sync.RWMutex // guards counters
	counters        map[string]*ipRateLimitCounter
	perMinute       int
	perHour         int
	burst           int // stored by the constructor; not read by any method in this block
	cleanupInterval time.Duration
	stopCleanup     chan struct{}
	stopOnce        sync.Once // lets Stop be called more than once
}

// ipRateLimitCounter holds the per-IP window state. mu guards all other fields.
type ipRateLimitCounter struct {
	mu          sync.Mutex
	minuteCount int
	hourCount   int
	minuteReset time.Time // instant after which minuteCount starts over
	hourReset   time.Time // instant after which hourCount starts over
}

// NewIPRateLimiter creates a new IP-based rate limiter that admits at most
// perMinute requests per minute window and perHour requests per hour window
// for each IP. It starts a cleanup goroutine that runs every 10 minutes until
// Stop is called.
func NewIPRateLimiter(perMinute, perHour, burst int) *IPRateLimiter {
	rl := &IPRateLimiter{
		counters:        make(map[string]*ipRateLimitCounter),
		perMinute:       perMinute,
		perHour:         perHour,
		burst:           burst,
		cleanupInterval: 10 * time.Minute,
		stopCleanup:     make(chan struct{}),
	}
	// Start background cleanup of stale entries
	go rl.cleanupLoop()
	return rl
}

// counterFor returns the counter for ip, creating it with freshly opened
// windows on first sight.
func (rl *IPRateLimiter) counterFor(ip string) *ipRateLimitCounter {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	counter, exists := rl.counters[ip]
	if !exists {
		// One timestamp for both windows so they open at the same instant.
		now := time.Now()
		counter = &ipRateLimitCounter{
			minuteReset: now.Add(time.Minute),
			hourReset:   now.Add(time.Hour),
		}
		rl.counters[ip] = counter
	}
	return counter
}

// Allow checks if a request from the given IP is allowed. It returns true and
// counts the request when both the minute and the hour window are below their
// limits; otherwise it returns false and leaves the counts unchanged.
func (rl *IPRateLimiter) Allow(ip string) bool {
	counter := rl.counterFor(ip)

	counter.mu.Lock()
	defer counter.mu.Unlock()

	now := time.Now()

	// Reset minute counter if needed
	if now.After(counter.minuteReset) {
		counter.minuteCount = 0
		counter.minuteReset = now.Add(time.Minute)
	}

	// Reset hour counter if needed
	if now.After(counter.hourReset) {
		counter.hourCount = 0
		counter.hourReset = now.Add(time.Hour)
	}

	// Check limits
	if counter.minuteCount >= rl.perMinute {
		return false
	}
	if counter.hourCount >= rl.perHour {
		return false
	}

	// Increment counters
	counter.minuteCount++
	counter.hourCount++

	return true
}

// cleanupLoop periodically removes stale IP entries to prevent memory leaks.
// It returns when stopCleanup is closed.
func (rl *IPRateLimiter) cleanupLoop() {
	ticker := time.NewTicker(rl.cleanupInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			rl.cleanup()
		case <-rl.stopCleanup:
			return
		}
	}
}

// cleanup evicts every entry whose minute AND hour windows have both expired.
//
// Checking only the hour window would drop an entry that was hit shortly
// before its hour boundary while its minute window is still open, handing the
// client a fresh counter and letting it bypass the per-minute limit.
func (rl *IPRateLimiter) cleanup() {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	now := time.Now()
	for ip, counter := range rl.counters {
		counter.mu.Lock()
		expired := now.After(counter.minuteReset) && now.After(counter.hourReset)
		counter.mu.Unlock()
		if expired {
			delete(rl.counters, ip)
		}
	}
}

// Stop stops the cleanup goroutine. It is safe to call more than once; only
// the first call closes the stop channel.
func (rl *IPRateLimiter) Stop() {
	rl.stopOnce.Do(func() {
		close(rl.stopCleanup)
	})
}
1✔
935

936
// New creates a new HTTP server with the given database, authenticator, and configuration.
937
//
938
// The server is created but not started. Call Start() to begin accepting connections.
939
//
940
// Parameters:
941
//   - db: NornicDB database instance (required)
942
//   - authenticator: Authentication handler (can be nil to disable auth)
943
//   - config: Server configuration (uses DefaultConfig() if nil)
944
//
945
// Returns:
946
//   - Server instance ready to start
947
//   - Error if database is nil or configuration is invalid
948
//
949
// Example:
950
//
951
//        // With authentication
952
//        db, _ := nornicdb.Open("./data", nil)
953
//        auth, _ := auth.NewAuthenticator(auth.DefaultAuthConfig())
954
//        server, err := server.New(db, auth, nil) // Uses default config
955
//
956
//        // Without authentication (development)
957
//        server, err = server.New(db, nil, nil)
958
//
959
//        // Custom configuration
960
//        config := &server.Config{
961
//                Port: 8080,
962
//                EnableCORS: false,
963
//        }
964
//        server, err = server.New(db, auth, config)
965
func New(db *nornicdb.DB, authenticator *auth.Authenticator, config *Config) (*Server, error) {
1✔
966
        if config == nil {
2✔
967
                config = DefaultConfig()
1✔
968
        }
1✔
969
        if db == nil {
2✔
970
                return nil, fmt.Errorf("database required")
1✔
971
        }
1✔
972
        // Phase 2 D-01a: graceful-degrade discard fallback when caller did not
973
        // thread observability.Provider.Logger() through Config.Logger. Keeps
974
        // existing tests/callers compileable; tightens post-M1 once all
975
        // consumers wire the logger explicitly.
976
        if config.Logger == nil {
2✔
977
                config.Logger = slog.New(slog.NewTextHandler(io.Discard, nil))
1✔
978
        }
1✔
979

980
        // Note: GPU status is logged in main.go during GPU manager initialization
981
        // This avoids duplicate logs and provides more detailed information
982

983
        // Load environment-backed global config once (used for multi-db + feature defaults).
984
        globalConfig := nornicConfig.LoadFromEnv()
1✔
985

1✔
986
        // Create MCP server for LLM tool interface (if enabled)
1✔
987
        var mcpServer *mcp.Server
1✔
988
        if config.MCPEnabled {
2✔
989
                mcpConfig := mcp.DefaultServerConfig()
1✔
990
                mcpConfig.EmbeddingEnabled = config.EmbeddingEnabled
1✔
991
                mcpConfig.EmbeddingModel = config.EmbeddingModel
1✔
992
                mcpConfig.EmbeddingDimensions = config.EmbeddingDimensions
1✔
993
                mcpConfig.DefaultNodeLabel = globalConfig.Memory.DefaultNodeLabel
1✔
994
                mcpServer = mcp.NewServer(db, mcpConfig)
1✔
995
        } else {
2✔
996
                config.Logger.With("component", "server").Info("mcp server disabled via configuration")
1✔
997
        }
1✔
998

999
        // Initialize DatabaseManager for multi-database support.
1000
        // IMPORTANT: This must happen before Heimdall/GraphQL so they can route per database.
1001
        //
1002
        // Get the base storage engine from the DB (unwraps the namespaced storage).
1003
        // DatabaseManager will create NamespacedEngines for each logical database.
1004
        storageEngine := db.GetBaseStorageForManager()
1✔
1005
        remoteCredentialEncryptionKey := ""
1✔
1006
        switch {
1✔
1007
        case strings.TrimSpace(os.Getenv("NORNICDB_REMOTE_CREDENTIALS_KEY")) != "":
1✔
1008
                remoteCredentialEncryptionKey = strings.TrimSpace(os.Getenv("NORNICDB_REMOTE_CREDENTIALS_KEY"))
1✔
1009
        case strings.TrimSpace(globalConfig.Database.EncryptionPassword) != "":
×
1010
                remoteCredentialEncryptionKey = strings.TrimSpace(globalConfig.Database.EncryptionPassword)
×
1011
                config.Logger.With("component", "server").Warn("remote credential encryption key fallback in use",
×
1012
                        "fallback", "database_encryption_password",
×
1013
                        "remediation", "set NORNICDB_REMOTE_CREDENTIALS_KEY for key separation")
×
1014
        case strings.TrimSpace(globalConfig.Auth.JWTSecret) != "":
1✔
1015
                remoteCredentialEncryptionKey = strings.TrimSpace(globalConfig.Auth.JWTSecret)
1✔
1016
                config.Logger.With("component", "server").Warn("remote credential encryption key fallback in use",
1✔
1017
                        "fallback", "jwt_signing_secret",
1✔
1018
                        "remediation", "set NORNICDB_REMOTE_CREDENTIALS_KEY for stronger key separation")
1✔
1019
        }
1020
        multiDBConfig := &multidb.Config{
1✔
1021
                DefaultDatabase:               globalConfig.Database.DefaultDatabase,
1✔
1022
                SystemDatabase:                "system",
1✔
1023
                MaxDatabases:                  0, // Unlimited
1✔
1024
                AllowDropDefault:              false,
1✔
1025
                RemoteCredentialEncryptionKey: remoteCredentialEncryptionKey,
1✔
1026
                RemoteEngineFactory: func(ref multidb.ConstituentRef, authToken string) (storage.Engine, error) {
2✔
1027
                        useUserPassword := strings.EqualFold(strings.TrimSpace(ref.AuthMode), "user_password")
1✔
1028
                        cfg := storage.RemoteEngineConfig{
1✔
1029
                                URI:       ref.URI,
1✔
1030
                                Database:  ref.DatabaseName,
1✔
1031
                                AuthToken: authToken,
1✔
1032
                        }
1✔
1033
                        if useUserPassword {
2✔
1034
                                cfg.User = ref.User
1✔
1035
                                cfg.Password = ref.Password
1✔
1036
                        }
1✔
1037
                        return storage.NewRemoteEngine(cfg)
1✔
1038
                },
1039
        }
1040
        dbManager, err := multidb.NewDatabaseManager(storageEngine, multiDBConfig)
1✔
1041
        if err != nil {
1✔
1042
                return nil, fmt.Errorf("failed to initialize database manager: %w", err)
×
1043
        }
×
1044

1045
        s := &Server{
1✔
1046
                config:         config,
1✔
1047
                db:             db,
1✔
1048
                dbManager:      dbManager,
1✔
1049
                auth:           authenticator,
1✔
1050
                log:            config.Logger.With("component", "server"),
1✔
1051
                mcpServer:      mcpServer,
1✔
1052
                graphqlHandler: graphql.NewHandler(db, dbManager),
1✔
1053
                basicAuthCache: auth.NewBasicAuthCache(auth.DefaultAuthCacheEntries, auth.DefaultAuthCacheTTL),
1✔
1054
                searchServices: make(map[string]*search.Service),
1✔
1055
                executors:      make(map[string]*cypher.StorageExecutor),
1✔
1056
        }
1✔
1057
        // Foreground-first policy: while tx requests are active, background embed work yields.
1✔
1058
        s.db.SetEmbedQueueShouldYield(func() bool {
1✔
1059
                return s.activeTxReqs.Load() > 0
×
1060
        })
×
1061
        s.txSessions = txsession.NewManager(30*time.Second, s.newExecutorForDatabase)
1✔
1062
        s.txSessions.SetTerminalErrorObserver(func(session *txsession.Session, err error) {
2✔
1063
                s.logMVCCSnapshotExpiration(session, err)
1✔
1064
        })
1✔
1065

1066
        // ==========================================================================
1067
        // Heimdall - AI Assistant for Database Management
1068
        // ==========================================================================
1069
        // Use features config passed from main.go (which loads from YAML + env)
1070
        // Fall back to LoadFromEnv() if not provided (for backwards compatibility)
1071
        var featuresConfig *nornicConfig.FeatureFlagsConfig
1✔
1072
        if config.Features != nil {
2✔
1073
                featuresConfig = config.Features
1✔
1074
        } else {
2✔
1075
                featuresConfig = &globalConfig.Features
1✔
1076
                config.Features = featuresConfig
1✔
1077
        }
1✔
1078
        var rerankerResolverMu sync.RWMutex
1✔
1079
        var globalRerankerResolver func(string) search.Reranker
1✔
1080
        perDBRerankerCache := make(map[string]search.Reranker)
1✔
1081
        setGlobalRerankerResolver := func(fn func(string) search.Reranker) {
2✔
1082
                rerankerResolverMu.Lock()
1✔
1083
                globalRerankerResolver = fn
1✔
1084
                rerankerResolverMu.Unlock()
1✔
1085
        }
1✔
1086
        getGlobalRerankerResolver := func() func(string) search.Reranker {
1✔
1087
                rerankerResolverMu.RLock()
×
1088
                defer rerankerResolverMu.RUnlock()
×
1089
                return globalRerankerResolver
×
1090
        }
×
1091
        resolveDBRerankConfig := func(dbName string) (enabled bool, provider, model, apiURL, apiKey string) {
2✔
1092
                enabled = featuresConfig.SearchRerankEnabled
1✔
1093
                provider = strings.TrimSpace(strings.ToLower(featuresConfig.SearchRerankProvider))
1✔
1094
                model = featuresConfig.SearchRerankModel
1✔
1095
                apiURL = strings.TrimSpace(featuresConfig.SearchRerankAPIURL)
1✔
1096
                apiKey = featuresConfig.SearchRerankAPIKey
1✔
1097
                if provider == "" {
2✔
1098
                        provider = "local"
1✔
1099
                }
1✔
1100
                if provider == "ollama" && apiURL == "" {
1✔
1101
                        apiURL = "http://localhost:11434/rerank"
×
1102
                }
×
1103
                if s.dbConfigStore == nil {
2✔
1104
                        return enabled, provider, model, apiURL, apiKey
1✔
1105
                }
1✔
1106
                overrides := s.dbConfigStore.GetOverrides(dbName)
1✔
1107
                resolved := dbconfig.Resolve(globalConfig, overrides)
1✔
1108
                if resolved == nil || resolved.Effective == nil {
1✔
1109
                        return enabled, provider, model, apiURL, apiKey
×
1110
                }
×
1111
                eff := resolved.Effective
1✔
1112
                if raw := strings.TrimSpace(strings.ToLower(eff["NORNICDB_SEARCH_RERANK_ENABLED"])); raw != "" {
2✔
1113
                        switch raw {
1✔
1114
                        case "1", "true", "yes", "on":
×
1115
                                enabled = true
×
1116
                        case "0", "false", "no", "off":
1✔
1117
                                enabled = false
1✔
1118
                        }
1119
                }
1120
                if v := strings.TrimSpace(strings.ToLower(eff["NORNICDB_SEARCH_RERANK_PROVIDER"])); v != "" {
2✔
1121
                        provider = v
1✔
1122
                }
1✔
1123
                if v := eff["NORNICDB_SEARCH_RERANK_MODEL"]; strings.TrimSpace(v) != "" {
2✔
1124
                        model = strings.TrimSpace(v)
1✔
1125
                }
1✔
1126
                if v := eff["NORNICDB_SEARCH_RERANK_API_URL"]; strings.TrimSpace(v) != "" {
1✔
1127
                        apiURL = strings.TrimSpace(v)
×
1128
                }
×
1129
                if v := eff["NORNICDB_SEARCH_RERANK_API_KEY"]; strings.TrimSpace(v) != "" {
1✔
1130
                        apiKey = v
×
1131
                }
×
1132
                if provider == "ollama" && apiURL == "" {
1✔
1133
                        apiURL = "http://localhost:11434/rerank"
×
1134
                }
×
1135
                return enabled, provider, model, apiURL, apiKey
1✔
1136
        }
1137
        getOrCreateExternalReranker := func(provider, model, apiURL, apiKey string) search.Reranker {
1✔
1138
                key := strings.Join([]string{provider, model, apiURL, apiKey}, "|")
×
1139
                rerankerResolverMu.RLock()
×
1140
                if cached, ok := perDBRerankerCache[key]; ok {
×
1141
                        rerankerResolverMu.RUnlock()
×
1142
                        return cached
×
1143
                }
×
1144
                rerankerResolverMu.RUnlock()
×
1145
                if apiURL == "" {
×
1146
                        return nil
×
1147
                }
×
1148
                ceConfig := &search.CrossEncoderConfig{
×
1149
                        Enabled:  true,
×
1150
                        APIURL:   apiURL,
×
1151
                        APIKey:   apiKey,
×
1152
                        Model:    model,
×
1153
                        TopK:     100,
×
1154
                        Timeout:  30 * time.Second,
×
1155
                        MinScore: 0.0,
×
1156
                }
×
1157
                if ceConfig.Model == "" && provider == "ollama" {
×
1158
                        ceConfig.Model = "reranker"
×
1159
                }
×
1160
                ce := search.NewCrossEncoder(ceConfig)
×
1161
                rerankerResolverMu.Lock()
×
1162
                perDBRerankerCache[key] = ce
×
1163
                rerankerResolverMu.Unlock()
×
1164
                return ce
×
1165
        }
1166
        // Install per-DB reranker resolver. It respects DB overrides and falls back to global resolver.
1167
        db.SetRerankerResolver(func(dbName string) search.Reranker {
2✔
1168
                enabled, provider, model, apiURL, apiKey := resolveDBRerankConfig(dbName)
1✔
1169
                if !enabled {
2✔
1170
                        return nil
1✔
1171
                }
1✔
1172
                if provider == "local" {
×
1173
                        if resolver := getGlobalRerankerResolver(); resolver != nil {
×
1174
                                return resolver(dbName)
×
1175
                        }
×
1176
                        return nil
×
1177
                }
1178
                if r := getOrCreateExternalReranker(provider, model, apiURL, apiKey); r != nil {
×
1179
                        return r
×
1180
                }
×
1181
                if resolver := getGlobalRerankerResolver(); resolver != nil {
×
1182
                        return resolver(dbName)
×
1183
                }
×
1184
                return nil
×
1185
        })
1186
        if featuresConfig.HeimdallEnabled {
2✔
1187
                // Derive a child logger once at goroutine entry so every record
1✔
1188
                // carries subsystem=heimdall (Phase 2 D-10a).
1✔
1189
                heimdallLog := s.log.With("subsystem", "heimdall")
1✔
1190
                heimdallLog.Info("heimdall AI assistant initializing asynchronously")
1✔
1191
                go func() {
2✔
1192
                        // Configure token budget from environment variables
1✔
1193
                        heimdall.SetTokenBudget(featuresConfig)
1✔
1194
                        heimdallCfg := heimdall.ConfigFromFeatureFlags(featuresConfig)
1✔
1195
                        // Log resolved provider so users can verify env overrides (openai/ollama/local)
1✔
1196
                        provider := strings.TrimSpace(strings.ToLower(heimdallCfg.Provider))
1✔
1197
                        if provider == "" {
1✔
1198
                                provider = "local"
×
1199
                        }
×
1200
                        heimdallLog.Info("heimdall provider resolved",
1✔
1201
                                "provider", provider,
1✔
1202
                                "override_env", "NORNICDB_HEIMDALL_PROVIDER")
1✔
1203
                        manager, err := heimdall.NewManager(heimdallCfg)
1✔
1204
                        if err != nil {
1✔
1205
                                heimdallLog.Warn("heimdall initialization failed",
×
1206
                                        "error", err,
×
1207
                                        "remediation", "check NORNICDB_HEIMDALL_MODEL and NORNICDB_MODELS_DIR")
×
1208
                                return
×
1209
                        }
×
1210
                        if baseExec := db.GetCypherExecutor(); baseExec != nil {
2✔
1211
                                baseExec.SetInferenceManager(manager)
1✔
1212
                        }
1✔
1213

1214
                        // Create database router wrapper for Heimdall (multi-db aware)
1215
                        dbRouter := newHeimdallDBRouter(db, dbManager, featuresConfig)
1✔
1216
                        metricsReader := &heimdallMetricsReader{}
1✔
1217
                        handler := heimdall.NewHandler(manager, heimdallCfg, dbRouter, metricsReader)
1✔
1218
                        // Expose MCP tools to the agentic loop only when enabled (default off to avoid context bloat)
1✔
1219
                        if mcpServer != nil && featuresConfig.HeimdallMCPEnable {
2✔
1220
                                handler.SetInMemoryToolRunner(&mcpToolRunnerAdapter{
1✔
1221
                                        s:         mcpServer,
1✔
1222
                                        allowlist: featuresConfig.HeimdallMCPTools,
1✔
1223
                                })
1✔
1224
                        }
1✔
1225

1226
                        // Initialize Heimdall plugin subsystem
1227
                        subsystemMgr := heimdall.GetSubsystemManager()
1✔
1228

1✔
1229
                        // Create the Heimdall invoker so plugins can call the LLM
1✔
1230
                        heimdallInvoker := heimdall.NewLiveHeimdallInvoker(
1✔
1231
                                subsystemMgr,
1✔
1232
                                manager, // Manager implements Generator interface
1✔
1233
                                handler.Bifrost(),
1✔
1234
                                dbRouter,
1✔
1235
                                metricsReader,
1✔
1236
                        )
1✔
1237

1✔
1238
                        subsystemCtx := heimdall.SubsystemContext{
1✔
1239
                                Config:   heimdallCfg,
1✔
1240
                                Bifrost:  handler.Bifrost(),
1✔
1241
                                Database: dbRouter,
1✔
1242
                                Metrics:  metricsReader,
1✔
1243
                                Heimdall: heimdallInvoker, // Now plugins can call p.ctx.Heimdall.SendPrompt()
1✔
1244
                        }
1✔
1245
                        subsystemMgr.SetContext(subsystemCtx)
1✔
1246

1✔
1247
                        // Load plugins from configured directories.
1✔
1248
                        if config.PluginsDir != "" {
2✔
1249
                                heimdallLog.Debug("loading APOC plugins", "dir", config.PluginsDir)
1✔
1250
                                if err := nornicdb.LoadPluginsFromDir(config.PluginsDir, &subsystemCtx); err != nil {
1✔
1251
                                        heimdallLog.Warn("failed to load APOC plugins", "dir", config.PluginsDir, "error", err)
×
1252
                                }
×
1253
                        }
1254
                        if config.HeimdallPluginsDir != "" && config.HeimdallPluginsDir != config.PluginsDir {
2✔
1255
                                heimdallLog.Debug("loading Heimdall plugins", "dir", config.HeimdallPluginsDir)
1✔
1256
                                if err := nornicdb.LoadPluginsFromDir(config.HeimdallPluginsDir, &subsystemCtx); err != nil {
1✔
1257
                                        heimdallLog.Warn("failed to load Heimdall plugins", "dir", config.HeimdallPluginsDir, "error", err)
×
1258
                                }
×
1259
                        } else if config.HeimdallPluginsDir == "" {
2✔
1260
                                heimdallLog.Debug("heimdall plugins dir is empty")
1✔
1261
                        } else {
1✔
1262
                                heimdallLog.Debug("heimdall plugins dir same as plugins dir; skipping",
×
1263
                                        "heimdall_dir", config.HeimdallPluginsDir,
×
1264
                                        "plugins_dir", config.PluginsDir)
×
1265
                        }
×
1266

1267
                        s.setHeimdallHandler(handler)
1✔
1268

1✔
1269
                        plugins := heimdall.ListHeimdallPlugins()
1✔
1270
                        actions := heimdall.ListHeimdallActions()
1✔
1271
                        heimdallLog.Info("heimdall AI assistant ready",
1✔
1272
                                "model", heimdallCfg.Model,
1✔
1273
                                "plugins_loaded", len(plugins),
1✔
1274
                                "actions_available", len(actions),
1✔
1275
                                "bifrost_chat_route", "/api/bifrost/chat/completions",
1✔
1276
                                "status_route", "/api/bifrost/status",
1✔
1277
                        )
1✔
1278
                        if len(plugins) == 0 {
2✔
1279
                                heimdallLog.Warn("no heimdall plugins loaded — watcher logs will be absent",
1✔
1280
                                        "remediation", "ensure a .so exists in HeimdallPluginsDir")
1✔
1281
                        }
1✔
1282
                        for _, actionName := range actions {
1✔
1283
                                heimdallLog.Debug("heimdall action registered", "action", actionName)
×
1284
                        }
×
1285
                }()
1286
        } else {
1✔
1287
                s.log.Info("heimdall AI assistant disabled",
1✔
1288
                        "subsystem", "heimdall",
1✔
1289
                        "override_env", "NORNICDB_HEIMDALL_ENABLED")
1✔
1290
        }
1✔
1291

1292
        // Independent search rerank (Stage-2 reranking, not tied to Heimdall).
1293
        // Supports local (GGUF, like embeddings) or external provider (ollama/openai/http),
1294
        // similar to Heimdall and embeddings.
1295
        if featuresConfig.SearchRerankEnabled {
2✔
1296
                provider := strings.TrimSpace(strings.ToLower(featuresConfig.SearchRerankProvider))
1✔
1297
                if provider == "" {
1✔
1298
                        provider = "local"
×
1299
                }
×
1300

1301
                if provider == "local" {
2✔
1302
                        // Load GGUF into memory (same pattern as embedding model).
1✔
1303
                        modelsDir := config.ModelsDir
1✔
1304
                        if modelsDir == "" {
1✔
1305
                                modelsDir = "./models"
×
1306
                        }
×
1307
                        modelName := featuresConfig.SearchRerankModel
1✔
1308
                        if modelName == "" {
1✔
1309
                                modelName = "bge-reranker-v2-m3-Q4_K_M.gguf"
×
1310
                        }
×
1311
                        if !strings.HasSuffix(modelName, ".gguf") {
2✔
1312
                                modelName = modelName + ".gguf"
1✔
1313
                        }
1✔
1314
                        modelPath := filepath.Join(modelsDir, modelName)
1✔
1315

1✔
1316
                        rerankLog := s.log.With("subsystem", "search_rerank")
1✔
1317
                        rerankLog.Info("loading search reranker model",
1✔
1318
                                "provider", "local",
1✔
1319
                                "model_path", modelPath,
1✔
1320
                                "note", "server starts immediately; reranking available after model loads")
1✔
1321

1✔
1322
                        go func() {
2✔
1323
                                opts := localllm.DefaultOptions(modelPath)
1✔
1324
                                opts.GPULayers = -1
1✔
1325
                                rerankerModel, err := localllm.LoadRerankerModel(opts)
1✔
1326
                                if err != nil {
2✔
1327
                                        rerankLog.Warn("search reranker model unavailable; stage-2 reranking disabled, RRF order only",
1✔
1328
                                                "error", err)
1✔
1329
                                        return
1✔
1330
                                }
1✔
1331

1332
                                ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
×
1333
                                _, healthErr := rerankerModel.Score(ctx, "health", "check")
×
1334
                                cancel()
×
1335
                                if healthErr != nil {
×
1336
                                        rerankerModel.Close()
×
1337
                                        rerankLog.Warn("search reranker failed health check", "error", healthErr)
×
1338
                                        return
×
1339
                                }
×
1340

1341
                                cfg := search.DefaultLocalRerankerConfig()
×
1342
                                cfg.Enabled = true
×
1343
                                r := search.NewLocalReranker(rerankerModel, cfg)
×
1344
                                db.SetSearchReranker(r)
×
1345
                                setGlobalRerankerResolver(func(string) search.Reranker { return r })
×
1346
                                rerankLog.Info("search reranker ready (stage-2 reranking enabled)",
×
1347
                                        "model", modelName)
×
1348
                        }()
1349
                } else {
1✔
1350
                        // External provider: use HTTP rerank API (Cohere, HuggingFace TEI, Ollama adapter, etc.).
1✔
1351
                        apiURL := strings.TrimSpace(featuresConfig.SearchRerankAPIURL)
1✔
1352
                        if apiURL == "" {
2✔
1353
                                if provider == "ollama" {
2✔
1354
                                        apiURL = "http://localhost:11434/rerank"
1✔
1355
                                }
1✔
1356
                        }
1357
                        if apiURL == "" {
2✔
1358
                                s.log.Warn("search rerank enabled but API URL not set; stage-2 reranking disabled",
1✔
1359
                                        "subsystem", "search_rerank",
1✔
1360
                                        "provider", provider,
1✔
1361
                                        "required_env", "NORNICDB_SEARCH_RERANK_API_URL")
1✔
1362
                        } else {
2✔
1363
                                ceConfig := &search.CrossEncoderConfig{
1✔
1364
                                        Enabled:  true,
1✔
1365
                                        APIURL:   apiURL,
1✔
1366
                                        APIKey:   featuresConfig.SearchRerankAPIKey,
1✔
1367
                                        Model:    featuresConfig.SearchRerankModel,
1✔
1368
                                        TopK:     100,
1✔
1369
                                        Timeout:  30 * time.Second,
1✔
1370
                                        MinScore: 0.0,
1✔
1371
                                }
1✔
1372
                                if ceConfig.Model == "" && provider == "ollama" {
2✔
1373
                                        ceConfig.Model = "reranker"
1✔
1374
                                }
1✔
1375
                                ce := search.NewCrossEncoder(ceConfig)
1✔
1376
                                db.SetSearchReranker(ce)
1✔
1377
                                setGlobalRerankerResolver(func(string) search.Reranker { return ce })
1✔
1378
                                s.log.Info("search reranker ready (stage-2 reranking enabled)",
1✔
1379
                                        "subsystem", "search_rerank",
1✔
1380
                                        "provider", provider,
1✔
1381
                                        "url", apiURL)
1✔
1382
                        }
1383
                }
1384
        } else {
1✔
1385
                s.log.Info("search rerank disabled",
1✔
1386
                        "subsystem", "search_rerank",
1✔
1387
                        "override_env", "NORNICDB_SEARCH_RERANK_ENABLED")
1✔
1388
        }
1✔
1389

1390
        // Configure embeddings if enabled
1391
        // Local provider doesn't need API URL, others do
1392
        embeddingsReady := config.EmbeddingEnabled && (config.EmbeddingProvider == "local" || config.EmbeddingAPIURL != "")
1✔
1393
        if embeddingsReady {
2✔
1394
                embedConfig := &embed.Config{
1✔
1395
                        Provider:   config.EmbeddingProvider,
1✔
1396
                        APIURL:     config.EmbeddingAPIURL,
1✔
1397
                        APIKey:     config.EmbeddingAPIKey,
1✔
1398
                        Model:      config.EmbeddingModel,
1✔
1399
                        Dimensions: config.EmbeddingDimensions,
1✔
1400
                        ModelsDir:  config.ModelsDir,
1✔
1401
                        Timeout:    30 * time.Second,
1✔
1402
                }
1✔
1403

1✔
1404
                // Set API path based on provider (only for remote providers)
1✔
1405
                switch config.EmbeddingProvider {
1✔
1406
                case "ollama":
1✔
1407
                        embedConfig.APIPath = "/api/embeddings"
1✔
1408
                case "openai":
1✔
1409
                        embedConfig.APIPath = "/v1/embeddings"
1✔
1410
                case "local":
1✔
1411
                        // Local provider doesn't need API path
1412
                default:
×
1413
                        // Default to Ollama format
×
1414
                        embedConfig.APIPath = "/api/embeddings"
×
1415
                }
1416

1417
                // Initialize embeddings asynchronously to prevent startup blocking
1418
                // Local GGUF models can take 5-30 seconds to load (graph compilation)
1419
                embedInitLog := s.log.With("subsystem", "embed_init")
1✔
1420
                embedInitLog.Info("loading embedding model",
1✔
1421
                        "model", embedConfig.Model,
1✔
1422
                        "provider", embedConfig.Provider,
1✔
1423
                        "note", "server starts immediately; embeddings available after model loads")
1✔
1424

1✔
1425
                go func() {
2✔
1426
                        // Retry forever: exponential backoff to 5m, then fixed 5m interval.
1✔
1427
                        const (
1✔
1428
                                initialBackoff = 2 * time.Second
1✔
1429
                                maxBackoff     = 5 * time.Minute
1✔
1430
                        )
1✔
1431

1✔
1432
                        backoff := initialBackoff
1✔
1433
                        attempt := 0
1✔
1434

1✔
1435
                        for {
2✔
1436
                                if s.closed.Load() {
1✔
1437
                                        embedInitLog.Info("embedding init retry loop stopped: server shutting down")
×
1438
                                        return
×
1439
                                }
×
1440

1441
                                attempt++
1✔
1442

1✔
1443
                                // Use factory function for all providers.
1✔
1444
                                embedder, err := embed.NewEmbedder(embedConfig)
1✔
1445
                                if err == nil {
2✔
1446
                                        // Health check: test embedding before enabling.
1✔
1447
                                        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1✔
1448
                                        _, healthErr := embedder.Embed(ctx, "health check")
1✔
1449
                                        cancel()
1✔
1450
                                        if healthErr != nil {
2✔
1451
                                                err = fmt.Errorf("health check failed: %w", healthErr)
1✔
1452
                                        }
1✔
1453
                                }
1454

1455
                                if err == nil {
2✔
1456
                                        // Wrap with caching if enabled (default: 10K cache).
1✔
1457
                                        if config.EmbeddingCacheSize > 0 {
2✔
1458
                                                embedder = embed.NewCachedEmbedder(embedder, config.EmbeddingCacheSize)
1✔
1459
                                                embedInitLog.Info("embedding cache enabled",
1✔
1460
                                                        "entries", config.EmbeddingCacheSize,
1✔
1461
                                                        "memory_mb", embeddingCacheMemoryMB(config.EmbeddingCacheSize, config.EmbeddingDimensions))
1✔
1462
                                        }
1✔
1463

1464
                                        if config.EmbeddingProvider == "local" {
1✔
1465
                                                embedInitLog.Info("embeddings ready",
×
1466
                                                        "provider", "local_gguf",
×
1467
                                                        "model", config.EmbeddingModel,
×
1468
                                                        "dims", config.EmbeddingDimensions)
×
1469
                                        } else {
1✔
1470
                                                embedInitLog.Info("embeddings ready",
1✔
1471
                                                        "provider", config.EmbeddingProvider,
1✔
1472
                                                        "url", config.EmbeddingAPIURL,
1✔
1473
                                                        "model", config.EmbeddingModel,
1✔
1474
                                                        "dims", config.EmbeddingDimensions)
1✔
1475
                                        }
1✔
1476

1477
                                        if mcpServer != nil {
2✔
1478
                                                mcpServer.SetEmbedder(embedder)
1✔
1479
                                        }
1✔
1480
                                        // Share embedder with DB for auto-embed queue.
1481
                                        // The embed worker will wait for this to be set before processing.
1482
                                        db.SetEmbedder(embedder)
1✔
1483
                                        // Register as default for per-DB embedder registry (no-op if SetEmbedConfigForDB was not set).
1✔
1484
                                        db.SetDefaultEmbedConfig(embedConfig)
1✔
1485
                                        return
1✔
1486
                                }
1487

1488
                                if config.EmbeddingProvider == "local" {
2✔
1489
                                        embedInitLog.Warn("embedding init attempt failed",
1✔
1490
                                                "attempt", attempt,
1✔
1491
                                                "provider", "local",
1✔
1492
                                                "model", config.EmbeddingModel,
1✔
1493
                                                "error", err)
1✔
1494
                                } else {
2✔
1495
                                        embedInitLog.Warn("embedding init attempt failed",
1✔
1496
                                                "attempt", attempt,
1✔
1497
                                                "provider", config.EmbeddingProvider,
1✔
1498
                                                "model", config.EmbeddingModel,
1✔
1499
                                                "url", config.EmbeddingAPIURL,
1✔
1500
                                                "error", err)
1✔
1501
                                }
1✔
1502

1503
                                if backoff < maxBackoff {
2✔
1504
                                        embedInitLog.Info("retrying embedding init (exponential backoff)", "wait", backoff)
1✔
1505
                                        if !waitForDurationOrServerClose(s, backoff) {
2✔
1506
                                                embedInitLog.Info("embedding init retry interrupted by server shutdown")
1✔
1507
                                                return
1✔
1508
                                        }
1✔
1509
                                        backoff *= 2
1✔
1510
                                        if backoff > maxBackoff {
1✔
1511
                                                backoff = maxBackoff
×
1512
                                        }
×
1513
                                } else {
×
1514
                                        embedInitLog.Info("embedding init retry interval capped; continuing periodic retries",
×
1515
                                                "interval", maxBackoff)
×
1516
                                        if !waitForDurationOrServerClose(s, maxBackoff) {
×
1517
                                                embedInitLog.Info("embedding init retry interrupted by server shutdown")
×
1518
                                                return
×
1519
                                        }
×
1520
                                }
1521
                        }
1522
                }()
1523
        }
1524

1525
        // Log authentication status
1526
        if authenticator == nil || !authenticator.IsSecurityEnabled() {
2✔
1527
                s.log.Warn("authentication disabled")
1✔
1528
        }
1✔
1529

1530
        // Initialize rate limiter if enabled
1531
        var rateLimiter *IPRateLimiter
1✔
1532
        if config.RateLimitEnabled {
2✔
1533
                rateLimiter = NewIPRateLimiter(config.RateLimitPerMinute, config.RateLimitPerHour, config.RateLimitBurst)
1✔
1534
                s.log.Info("rate limiting enabled",
1✔
1535
                        "per_minute", config.RateLimitPerMinute,
1✔
1536
                        "per_hour", config.RateLimitPerHour,
1✔
1537
                        "scope", "per_ip")
1✔
1538
        }
1✔
1539
        s.rateLimiter = rateLimiter
1✔
1540

1✔
1541
        // So EmbeddingCount() aggregates across all databases (not just default)
1✔
1542
        s.db.SetAllDatabasesProvider(func() []nornicdb.DatabaseAndStorage {
2✔
1543
                var out []nornicdb.DatabaseAndStorage
1✔
1544
                for _, info := range s.dbManager.ListDatabases() {
2✔
1545
                        if info.Name == "system" {
2✔
1546
                                continue
1✔
1547
                        }
1548
                        isComposite := s.dbManager.IsCompositeDatabase(info.Name)
1✔
1549
                        if isComposite {
1✔
1550
                                continue
×
1551
                        }
1552
                        storageEngine, err := s.dbManager.GetStorage(info.Name)
1✔
1553
                        if err != nil {
1✔
1554
                                continue
×
1555
                        }
1556
                        out = append(out, nornicdb.DatabaseAndStorage{
1✔
1557
                                Name:        info.Name,
1✔
1558
                                Storage:     storageEngine,
1✔
1559
                                IsComposite: isComposite,
1✔
1560
                        })
1✔
1561
                }
1562
                return out
1✔
1563
        })
1564

1565
        // Reconcile search-service startup for metadata-only or late-created databases.
1566
        // DB.Open() warms namespaces present in storage; this loop ensures known DB metadata
1567
        // also gets initialized, and keeps doing so without requiring first-search triggers.
1568
        s.ensureSearchBuildStartedForKnownDatabases()
1✔
1569
        go func() {
2✔
1570
                ticker := time.NewTicker(2 * time.Second)
1✔
1571
                defer ticker.Stop()
1✔
1572
                for {
2✔
1573
                        if s.closed.Load() {
2✔
1574
                                return
1✔
1575
                        }
1✔
1576
                        s.ensureSearchBuildStartedForKnownDatabases()
1✔
1577
                        <-ticker.C
1✔
1578
                }
1579
        }()
1580

1581
        // Wire MCP to use per-database executors when invoked from the agentic loop (so link/store/recall use the request's database)
1582
        if mcpServer != nil && dbManager != nil {
2✔
1583
                mcpServer.SetDatabaseScopedExecutor(s.mcpDatabaseScopedExecutor())
1✔
1584
                mcpServer.SetDatabaseScopedStorage(func(dbName string) (storage.Engine, error) {
1✔
1585
                        return s.dbManager.GetStorage(dbName)
×
1586
                })
×
1587
        }
1588

1589
        // Initialize OAuth manager if authenticator is available
1590
        if authenticator != nil {
2✔
1591
                s.oauthManager = auth.NewOAuthManager(authenticator)
1✔
1592
        }
1✔
1593

1594
        // Per-database access: Full when auth disabled; when auth enabled, DenyAll until allowlist resolves.
1595
        if authenticator == nil || !authenticator.IsSecurityEnabled() {
2✔
1596
                s.databaseAccessMode = auth.FullDatabaseAccessMode
1✔
1597
        } else {
2✔
1598
                s.databaseAccessMode = auth.DenyAllDatabaseAccessMode
1✔
1599
        }
1✔
1600

1601
        // Load RBAC stores from system DB when available so roles/allowlist/privileges/entitlements APIs
1602
        // work even with --no-auth (e.g. fetch roles, configure RBAC before enabling auth).
1603
        if systemStorage, err := dbManager.GetStorage("system"); err == nil {
2✔
1604
                ctx := context.Background()
1✔
1605
                roleStore := auth.NewRoleStore(systemStorage)
1✔
1606
                if loadErr := roleStore.Load(ctx); loadErr != nil {
1✔
1607
                        s.log.Warn("failed to load RBAC roles", "subsystem", "rbac", "error", loadErr)
×
1608
                } else {
1✔
1609
                        s.roleStore = roleStore
1✔
1610
                }
1✔
1611
                allowlistStore := auth.NewAllowlistStore(systemStorage)
1✔
1612
                if loadErr := allowlistStore.Load(ctx); loadErr != nil {
1✔
1613
                        s.log.Warn("failed to load RBAC allowlist", "subsystem", "rbac", "error", loadErr)
×
1614
                } else {
1✔
1615
                        dbList := make([]string, 0, len(dbManager.ListDatabases()))
1✔
1616
                        for _, info := range dbManager.ListDatabases() {
2✔
1617
                                dbList = append(dbList, info.Name)
1✔
1618
                        }
1✔
1619
                        if seedErr := allowlistStore.SeedIfEmpty(ctx, dbList); seedErr != nil {
1✔
1620
                                s.log.Warn("failed to seed RBAC allowlist", "subsystem", "rbac", "error", seedErr)
×
1621
                        }
×
1622
                        s.allowlistStore = allowlistStore
1✔
1623
                }
1624
                privilegesStore := auth.NewPrivilegesStore(systemStorage)
1✔
1625
                if loadErr := privilegesStore.Load(ctx); loadErr != nil {
1✔
1626
                        s.log.Warn("failed to load RBAC privileges", "subsystem", "rbac", "error", loadErr)
×
1627
                } else {
1✔
1628
                        s.privilegesStore = privilegesStore
1✔
1629
                }
1✔
1630
                roleEntitlementsStore := auth.NewRoleEntitlementsStore(systemStorage)
1✔
1631
                if loadErr := roleEntitlementsStore.Load(ctx); loadErr != nil {
1✔
1632
                        s.log.Warn("failed to load RBAC role entitlements", "subsystem", "rbac", "error", loadErr)
×
1633
                } else {
1✔
1634
                        s.roleEntitlementsStore = roleEntitlementsStore
1✔
1635
                }
1✔
1636
                dbConfigStore := dbconfig.NewStore(systemStorage)
1✔
1637
                if loadErr := dbConfigStore.Load(ctx); loadErr != nil {
1✔
1638
                        s.log.Warn("failed to load per-DB config store", "subsystem", "dbconfig", "error", loadErr)
×
1639
                } else {
1✔
1640
                        s.dbConfigStore = dbConfigStore
1✔
1641
                        globalConfig := nornicConfig.LoadFromEnv()
1✔
1642
                        db.SetDbConfigResolver(func(dbName string) (int, float64, string) {
2✔
1643
                                overrides := dbConfigStore.GetOverrides(dbName)
1✔
1644
                                r := dbconfig.Resolve(globalConfig, overrides)
1✔
1645
                                if r == nil {
1✔
1646
                                        return 0, 0, ""
×
1647
                                }
×
1648
                                return r.EmbeddingDimensions, r.SearchMinSimilarity, r.BM25Engine
1✔
1649
                        })
1650
                        // Per-DB embedder registry: resolve embed config per database for EmbedQueryForDB.
1651
                        db.SetEmbedConfigForDB(func(dbName string) (*embed.Config, error) {
2✔
1652
                                overrides := dbConfigStore.GetOverrides(dbName)
1✔
1653
                                r := dbconfig.Resolve(globalConfig, overrides)
1✔
1654
                                if r == nil || r.Effective == nil {
1✔
1655
                                        return nil, nil
×
1656
                                }
×
1657
                                return buildEmbedConfigFromResolved(r.Effective, config), nil
1✔
1658
                        })
1659
                }
1660
        }
1661

1662
        // Initialize slow query logger if file specified.
1663
        // D-04d collapse: threshold + log file path read from the canonical
1664
        // pkg/config.LoggingConfig snapshot threaded via Config.Logging.
1665
        if config.SlowQueryEnabled && config.Logging.SlowQueryLogFile != "" {
2✔
1666
                file, err := os.OpenFile(config.Logging.SlowQueryLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
1✔
1667
                if err != nil {
2✔
1668
                        s.log.Warn("failed to open slow query log file",
1✔
1669
                                "subsystem", "slow_query",
1✔
1670
                                "file", config.Logging.SlowQueryLogFile,
1✔
1671
                                "error", err)
1✔
1672
                } else {
2✔
1673
                        s.slowQueryLogger = log.New(file, "", log.LstdFlags)
1✔
1674
                        s.log.Info("slow query logging configured",
1✔
1675
                                "subsystem", "slow_query",
1✔
1676
                                "file", config.Logging.SlowQueryLogFile,
1✔
1677
                                "threshold", config.Logging.SlowQueryThreshold)
1✔
1678
                }
1✔
1679
        } else if config.SlowQueryEnabled {
1✔
1680
                s.log.Info("slow query logging enabled",
×
1681
                        "subsystem", "slow_query",
×
1682
                        "threshold", config.Logging.SlowQueryThreshold)
×
1683
        }
×
1684

1685
        return s, nil
1✔
1686
}
1687

1688
// SetHTTPMetrics injects the Plan-04-02 HTTP catalog bag (D-02 typed
1689
// handle DI). MUST be called BEFORE Start() — once the http.Server's
1690
// Handler is wired in Start(), the wrapper is fixed for the server
1691
// lifetime. Callers (cmd/nornicdb/main.go) inject after observability.New
1692
// returns the registry, then call Start().
1693
//
1694
// Nil-safe: passing nil is equivalent to never calling — instrumentedMux
1695
// is a pass-through. Test fixtures and pre-Phase-4 callers compile and
1696
// run unchanged.
1697
func (s *Server) SetHTTPMetrics(m *observability.HTTPMetrics) {
×
1698
        s.mu.Lock()
×
1699
        defer s.mu.Unlock()
×
1700
        s.httpMetrics = m
×
1701
}
×
1702

1703
// SetObsRegistry plumbs the unified prometheus registry from
1704
// observability.New into the server so handleMetrics can call
1705
// observability.RenderLegacy. Phase 5 / Plan 05-04. Mirrors the
1706
// SetHTTPMetrics pattern (mu.Lock + assign + unlock).
1707
//
1708
// Nil-safe: passing nil is equivalent to never calling — handleMetrics
1709
// tolerates a nil registry by emitting empty body bytes (RenderLegacy
1710
// contract). Test fixtures and pre-Phase-5 callers compile and run
1711
// unchanged.
1712
func (s *Server) SetObsRegistry(reg *prometheus.Registry) {
1✔
1713
        s.mu.Lock()
1✔
1714
        defer s.mu.Unlock()
1✔
1715
        s.obsRegistry = reg
1✔
1716
}
1✔
1717

1718
// SetAuditLogger sets the audit logger for compliance logging.
1719
func (s *Server) SetAuditLogger(logger *audit.Logger) {
1✔
1720
        s.mu.Lock()
1✔
1721
        defer s.mu.Unlock()
1✔
1722
        s.audit = logger
1✔
1723
        if s.db != nil {
2✔
1724
                s.db.SetRetentionAuditCallback(func(action, recordID, category string) {
1✔
1725
                        if s.audit == nil {
×
1726
                                return
×
1727
                        }
×
1728
                        _ = s.audit.LogDataAccess("system", "retention-manager", "node", recordID, action, true, category)
×
1729
                })
1730
        }
1731
}
1732

1733
func (s *Server) setHeimdallHandler(handler *heimdall.Handler) {
1✔
1734
        s.mu.Lock()
1✔
1735
        s.heimdallHandler = handler
1✔
1736
        s.mu.Unlock()
1✔
1737
}
1✔
1738

1739
func (s *Server) getHeimdallHandler() *heimdall.Handler {
1✔
1740
        s.mu.RLock()
1✔
1741
        defer s.mu.RUnlock()
1✔
1742
        return s.heimdallHandler
1✔
1743
}
1✔
1744

1745
// Start begins listening for HTTP connections on the configured address and port.
1746
//
1747
// The server starts in a separate goroutine, so this method returns immediately
1748
// after successfully binding to the port. Use Addr() to get the actual listening
1749
// address after starting.
1750
//
1751
// Returns:
1752
//   - nil if server started successfully
1753
//   - Error if failed to bind to port or server is already closed
1754
//
1755
// Example:
1756
//
1757
//        server := server.New(db, auth, config)
1758
//
1759
//        if err := server.Start(); err != nil {
1760
//                log.Fatalf("Failed to start server: %v", err)
1761
//        }
1762
//
1763
//        // Server started on server.Addr()
1764
//
1765
//        // Server is now accepting connections
1766
//        // Keep main goroutine alive
1767
//        select {}
1768
//
1769
// TLS Support:
1770
//
1771
//        If TLSCertFile and TLSKeyFile are configured, the server automatically
1772
//        starts with HTTPS. Otherwise, it uses HTTP.
1773
func (s *Server) Start() error {
1✔
1774
        if s.closed.Load() {
2✔
1775
                return ErrServerClosed
1✔
1776
        }
1✔
1777

1778
        addr := fmt.Sprintf("%s:%d", s.config.Address, s.config.Port)
1✔
1779
        listener, err := net.Listen("tcp", addr)
1✔
1780
        if err != nil {
1✔
1781
                return fmt.Errorf("failed to listen on %s: %w", addr, err)
×
1782
        }
×
1783

1784
        s.listener = listener
1✔
1785
        s.started = time.Now()
1✔
1786

1✔
1787
        // Build router
1✔
1788
        mux := s.buildRouter()
1✔
1789

1✔
1790
        // Plan 04-02 D-03: instrumentedMux is the SOLE HTTP observation
1✔
1791
        // chokepoint per AGENTS.md §7 DRY. It wraps mux.ServeHTTP, reading
1✔
1792
        // `r.Pattern` post-dispatch (Go 1.22+ stdlib field) so path_template
1✔
1793
        // values come from the closed route table — never from r.URL.Path
1✔
1794
        // (cardinality bomb). Nil-safe: when s.httpMetrics is nil (test
1✔
1795
        // fixtures, pre-Phase-4 callers), the wrapper is a pass-through.
1✔
1796
        // Panic-safe: handler panics still emit a 5xx observation before
1✔
1797
        // re-propagating (T-04-08).
1✔
1798
        instrumented := instrumentedMux(mux, s.httpMetrics)
1✔
1799

1✔
1800
        s.httpServer = &http.Server{
1✔
1801
                Handler:      instrumented,
1✔
1802
                ReadTimeout:  s.config.ReadTimeout,
1✔
1803
                WriteTimeout: s.config.WriteTimeout,
1✔
1804
                IdleTimeout:  s.config.IdleTimeout,
1✔
1805
        }
1✔
1806

1✔
1807
        // Configure HTTP/2 (always enabled, backwards compatible with HTTP/1.1)
1✔
1808
        http2Config := &http2.Server{
1✔
1809
                MaxConcurrentStreams: s.config.HTTP2MaxConcurrentStreams,
1✔
1810
        }
1✔
1811

1✔
1812
        if s.config.TLSCertFile != "" && s.config.TLSKeyFile != "" {
1✔
1813
                // HTTPS mode: HTTP/2 is automatically enabled via ALPN
×
1814
                // Configure HTTP/2 settings for TLS connections
×
1815
                if err := http2.ConfigureServer(s.httpServer, http2Config); err != nil {
×
1816
                        return fmt.Errorf("failed to configure HTTP/2 for TLS: %w", err)
×
1817
                }
×
1818
                s.log.Info("HTTP/2 enabled", "mode", "https")
×
1819
        } else {
1✔
1820
                // HTTP mode: Use h2c (HTTP/2 cleartext) for backwards compatibility
1✔
1821
                // h2c allows HTTP/2 over plain TCP, falling back to HTTP/1.1 for older clients
1✔
1822
                // Wrap the INSTRUMENTED mux (not bare mux) so observation runs
1✔
1823
                // inside the h2c transport adapter (Plan 04-02 D-03).
1✔
1824
                s.httpServer.Handler = h2c.NewHandler(instrumented, http2Config)
1✔
1825
                s.log.Info("HTTP/2 enabled", "mode", "h2c_cleartext", "compat", "http/1.1")
1✔
1826
        }
1✔
1827

1828
        // Start serving
1829
        go func() {
2✔
1830
                var err error
1✔
1831
                if s.config.TLSCertFile != "" && s.config.TLSKeyFile != "" {
1✔
1832
                        err = s.httpServer.ServeTLS(listener, s.config.TLSCertFile, s.config.TLSKeyFile)
×
1833
                } else {
1✔
1834
                        err = s.httpServer.Serve(listener)
1✔
1835
                }
1✔
1836
                if err != nil && err != http.ErrServerClosed {
1✔
1837
                        // Log error but don't crash
×
1838
                        s.log.Error("http server error", "error", err)
×
1839
                }
×
1840
        }()
1841

1842
        // Optional gRPC endpoints (feature-flagged).
1843
        if err := s.startQdrantGRPC(); err != nil {
2✔
1844
                _ = s.httpServer.Shutdown(context.Background())
1✔
1845
                return err
1✔
1846
        }
1✔
1847

1848
        return nil
1✔
1849
}
1850

1851
// Stop gracefully shuts down the server.
1852
func (s *Server) Stop(ctx context.Context) error {
1✔
1853
        if !s.closed.CompareAndSwap(false, true) {
2✔
1854
                return nil // Already closed
1✔
1855
        }
1✔
1856

1857
        s.stopQdrantGRPC()
1✔
1858

1✔
1859
        // Stop rate limiter cleanup goroutine
1✔
1860
        if s.rateLimiter != nil {
2✔
1861
                s.rateLimiter.Stop()
1✔
1862
        }
1✔
1863

1864
        if s.httpServer == nil {
2✔
1865
                return nil
1✔
1866
        }
1✔
1867

1868
        // Hard-bound shutdown: even if net/http Shutdown fails to return at ctx deadline
1869
        // (e.g., a stuck handler or an internal deadlock), Stop must return so callers
1870
        // can exit deterministically.
1871
        shutdownDone := make(chan error, 1)
1✔
1872
        go func() {
2✔
1873
                shutdownDone <- s.httpServer.Shutdown(ctx)
1✔
1874
        }()
1✔
1875

1876
        select {
1✔
1877
        case err := <-shutdownDone:
1✔
1878
                if err != nil && (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
1✔
1879
                        _ = s.httpServer.Close()
×
1880
                }
×
1881
                return err
1✔
1882
        case <-ctx.Done():
1✔
1883
                _ = s.httpServer.Close()
1✔
1884
                return ctx.Err()
1✔
1885
        }
1886
}
1887

1888
// Addr returns the server's listen address.
1889
func (s *Server) Addr() string {
1✔
1890
        if s.listener != nil {
2✔
1891
                return s.listener.Addr().String()
1✔
1892
        }
1✔
1893
        return ""
1✔
1894
}
1895

1896
// Stats returns current server runtime statistics.
1897
//
1898
// Statistics are updated in real-time by middleware and include:
1899
//   - Uptime since server start
1900
//   - Total request count
1901
//   - Total error count
1902
//   - Currently active requests
1903
//
1904
// Example:
1905
//
1906
//        stats := server.Stats()
1907
//        // stats.Uptime: server uptime
1908
//        // stats.RequestCount: total requests
1909
//        // stats.ErrorCount / stats.RequestCount: error rate
1910
//        // stats.ActiveRequests: in-flight requests
1911
//
1912
//        // Use for monitoring/alerting
1913
//        if stats.ErrorCount > 1000 {
1914
//                alert("High error count detected")
1915
//        }
1916
//
1917
// Thread-safe: Can be called concurrently from multiple goroutines.
1918
func (s *Server) Stats() ServerStats {
1✔
1919
        return ServerStats{
1✔
1920
                Uptime:         time.Since(s.started),
1✔
1921
                RequestCount:   s.requestCount.Load(),
1✔
1922
                ErrorCount:     s.errorCount.Load(),
1✔
1923
                ActiveRequests: s.activeRequests.Load(),
1✔
1924
                Version:        buildinfo.Version(),
1✔
1925
                Commit:         buildinfo.ShortCommit(),
1✔
1926
                BuildTime:      buildinfo.BuildTime,
1✔
1927
        }
1✔
1928
}
1✔
1929

1930
// ServerStats is a point-in-time snapshot of server metrics and build
// metadata, serializable to JSON using the field tags below.
type ServerStats struct {
	// Uptime is how long the server has been running. As a time.Duration
	// it is encoded in JSON as an integer number of nanoseconds.
	Uptime time.Duration `json:"uptime"`

	// RequestCount is the total number of requests counted so far.
	RequestCount int64 `json:"request_count"`

	// ErrorCount is the total number of errors counted so far.
	ErrorCount int64 `json:"error_count"`

	// ActiveRequests is the number of requests currently in flight.
	ActiveRequests int64 `json:"active_requests"`

	// Version is the build version string.
	Version string `json:"version"`

	// Commit is the (short) commit identifier of the build.
	Commit string `json:"commit"`

	// BuildTime is the build timestamp, kept as an opaque string.
	BuildTime string `json:"build_time"`
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc