• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

orneryd / NornicDB / 26850148459

02 Jun 2026 09:50PM UTC coverage: 87.248% (+0.004%) from 87.244%
26850148459

push

github

orneryd
fix(badger): Add sharded embedding storage to avoid oversized Badger values
Add backward-compatible size-aware sharding for embedding chunk payloads
Introduce shard marker and part-key helpers for chunk reads/writes
Update embedding decode path to support both legacy single-value chunks and sharded payloads
Wire sharded embedding writes through node, transaction, and bulk create paths
Batch shard KV writes to keep Badger transactions bounded
Add regression coverage for large embedding chunk roundtrips via update, bulk create, and transactional create paths

75 of 114 new or added lines in 4 files covered. (65.79%)

41 existing lines in 5 files now uncovered.

135211 of 154974 relevant lines covered (87.25%)

1.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.01
/pkg/server/server.go
1
// Package server provides a Neo4j-compatible HTTP REST API server for NornicDB.
2
//
3
// This package implements the Neo4j HTTP API specification, making NornicDB compatible
4
// with existing Neo4j tools, drivers, and browsers while adding NornicDB-specific
5
// extensions for memory decay, vector search, and compliance features.
6
//
7
// Neo4j Compatibility:
8
//   - Discovery endpoint (/) returns Neo4j-compatible service information
9
//   - Transaction API (/db/{name}/tx) supports implicit and explicit transactions
10
//   - Cypher query execution with Neo4j response format
11
//   - Basic Auth and Bearer token authentication
12
//   - Error codes follow Neo4j conventions (Neo.ClientError.*)
13
//
14
// NornicDB Extensions:
15
//   - JWT authentication with RBAC
16
//   - Vector search endpoints (/nornicdb/search, /nornicdb/similar)
17
//   - Memory decay information (/nornicdb/decay)
18
//   - GDPR compliance endpoints (/gdpr/export, /gdpr/delete)
19
//   - Admin endpoints (/admin/stats, /admin/config)
20
//   - GPU acceleration control (/admin/gpu/*)
21
//   - HTTP/2 support (always enabled, backwards compatible with HTTP/1.1)
22
//
23
// Example Usage:
24
//
25
//        // Create server
26
//        db, _ := nornicdb.Open("./data", nil)
27
//        auth, _ := auth.NewAuthenticator(auth.DefaultAuthConfig())
28
//        config := server.DefaultConfig()
29
//
30
//        server, err := server.New(db, auth, config)
31
//        if err != nil {
32
//                log.Fatal(err)
33
//        }
34
//
35
//        // Start server
36
//        if err := server.Start(); err != nil {
37
//                log.Fatal(err)
38
//        }
39
//
40
//        // Server listening on server.Addr()
41
//
42
//        // Use with Neo4j Browser
43
//        // Open: http://localhost:7474
44
//        // Connect URI: bolt://localhost:7687 (if Bolt server is running)
45
//        // Or use HTTP: http://localhost:7474/db/nornic/tx/commit
46
//
47
//        // Use with Neo4j drivers
48
//        driver := neo4j.NewDriver("http://localhost:7474", neo4j.BasicAuth("admin", "password"))
49
//        session := driver.NewSession(neo4j.SessionConfig{})
50
//        result, _ := session.Run("MATCH (n) RETURN count(n)", nil)
51
//
52
//        // Graceful shutdown
53
//        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
54
//        defer cancel()
55
//        server.Stop(ctx)
56
//
57
// Authentication:
58
//
59
// The server supports multiple authentication methods:
60
//
61
//  1. **Basic Auth** (Neo4j compatible):
62
//     Authorization: Basic base64(username:password)
63
//
64
//  2. **Bearer Token** (JWT):
65
//     Authorization: Bearer eyJhbGciOiJIUzI1NiIs...
66
//
67
//  3. **Cookie** (browser sessions):
68
//     Cookie: token=eyJhbGciOiJIUzI1NiIs...
69
//
70
//  4. **Query Parameter** (for SSE/WebSocket):
71
//     ?token=eyJhbGciOiJIUzI1NiIs...
72
//
73
// Neo4j HTTP API Endpoints:
74
//
75
//        GET  /                           - Discovery (service information)
76
//        GET  /db/{name}                  - Database information
77
//        POST /db/{name}/tx/commit       - Execute Cypher (implicit transaction)
78
//        POST /db/{name}/tx              - Begin explicit transaction
79
//        POST /db/{name}/tx/{id}         - Execute in transaction
80
//        POST /db/{name}/tx/{id}/commit  - Commit transaction
81
//        DELETE /db/{name}/tx/{id}       - Rollback transaction
82
//
83
// NornicDB Extension Endpoints:
84
//
85
//        Authentication:
86
//          POST /auth/token                - Get JWT token
87
//          POST /auth/logout               - Logout
88
//          GET  /auth/me                   - Current user info
89
//          POST /auth/api-token            - Generate API token (admin)
90
//          GET  /auth/oauth/redirect       - OAuth redirect
91
//          GET  /auth/oauth/callback        - OAuth callback
92
//          GET  /auth/users                 - List users (admin)
93
//          POST /auth/users                 - Create user (admin)
94
//          GET  /auth/users/{username}      - Get user (admin)
95
//          PUT  /auth/users/{username}      - Update user (admin)
96
//          DELETE /auth/users/{username}    - Delete user (admin)
97
//
98
//        Search & Embeddings:
99
//          POST /nornicdb/search           - Hybrid search (vector + BM25)
100
//          POST /nornicdb/similar           - Vector similarity search
101
//          GET  /nornicdb/decay             - Memory decay statistics
102
//          POST /nornicdb/embed/trigger     - Trigger embedding generation
103
//          GET  /nornicdb/embed/stats       - Embedding statistics
104
//          POST /nornicdb/embed/clear       - Clear all embeddings (admin)
105
//          POST /nornicdb/search/rebuild    - Rebuild search indexes
106
//
107
//        Admin & System:
108
//          GET  /admin/stats               - System statistics (admin)
109
//          GET  /admin/config               - Server configuration (admin)
110
//          POST /admin/backup               - Create backup (admin)
111
//          GET  /admin/gpu/status           - GPU status (admin)
112
//          POST /admin/gpu/enable           - Enable GPU (admin)
113
//          POST /admin/gpu/disable          - Disable GPU (admin)
114
//          POST /admin/gpu/test              - Test GPU (admin)
115
//
116
//        GDPR Compliance:
117
//          POST /gdpr/export                - GDPR data export (requires user_id and format in body)
118
//          POST /gdpr/delete                - GDPR erasure request
119
//
120
//        GraphQL & AI:
121
//          POST /graphql                    - GraphQL endpoint
122
//          GET  /graphql/playground         - GraphQL Playground
123
//          POST /mcp                        - MCP server endpoint
124
//          POST /api/bifrost/chat/completions - Heimdall AI chat
125
//
126
// For complete API documentation, see: docs/api-reference/openapi.yaml
127
//
128
// Security Features:
129
//
130
//   - CORS support with configurable origins
131
//   - Request size limits (default 10MB)
132
//   - IP-based rate limiting (configurable per-minute/per-hour limits)
133
//   - Audit logging integration
134
//   - Panic recovery middleware
135
//   - TLS/HTTPS support
136
//
137
// Compliance:
138
//   - GDPR Art.15 (right of access) via /gdpr/export
139
//   - GDPR Art.17 (right to erasure) via /gdpr/delete
140
//   - HIPAA audit logging for all data access
141
//   - SOC2 access controls via RBAC
142
//
143
// ELI12 (Explain Like I'm 12):
144
//
145
// Think of this server like a restaurant:
146
//
147
//  1. **Neo4j compatibility**: We speak the same "language" as Neo4j, so existing
148
//     customers (tools/drivers) can order from our menu without learning new words.
149
//
150
//  2. **Authentication**: Like checking IDs at the door - we make sure you're allowed
151
//     to be here and what you're allowed to do.
152
//
153
//  3. **Endpoints**: Different "counters" for different services - one for regular
154
//     food (Cypher queries), one for special orders (vector search), one for the
155
//     manager's office (admin functions).
156
//
157
//  4. **Middleware**: Like security guards, cashiers, and cleaners who help every
158
//     customer but do different jobs (logging, auth, error handling).
159
//
160
// The server makes sure everyone gets served safely and efficiently!
161
package server
162

163
import (
164
        "context"
165
        "errors"
166
        "fmt"
167
        "io"
168
        "log"
169
        "log/slog"
170
        "net"
171
        "net/http"
172
        "os"
173
        "path/filepath"
174
        "strconv"
175
        "strings"
176
        "sync"
177
        "sync/atomic"
178
        "time"
179

180
        "github.com/prometheus/client_golang/prometheus"
181
        "golang.org/x/net/http2"
182
        "golang.org/x/net/http2/h2c"
183

184
        "github.com/orneryd/nornicdb/pkg/audit"
185
        "github.com/orneryd/nornicdb/pkg/auth"
186
        "github.com/orneryd/nornicdb/pkg/buildinfo"
187
        nornicConfig "github.com/orneryd/nornicdb/pkg/config"
188
        "github.com/orneryd/nornicdb/pkg/config/dbconfig"
189
        "github.com/orneryd/nornicdb/pkg/cypher"
190
        "github.com/orneryd/nornicdb/pkg/embed"
191
        "github.com/orneryd/nornicdb/pkg/envutil"
192
        "github.com/orneryd/nornicdb/pkg/graphql"
193
        "github.com/orneryd/nornicdb/pkg/heimdall"
194
        "github.com/orneryd/nornicdb/pkg/localllm"
195
        "github.com/orneryd/nornicdb/pkg/mcp"
196
        "github.com/orneryd/nornicdb/pkg/multidb"
197
        "github.com/orneryd/nornicdb/pkg/nornicdb"
198
        "github.com/orneryd/nornicdb/pkg/observability"
199
        "github.com/orneryd/nornicdb/pkg/qdrantgrpc"
200
        "github.com/orneryd/nornicdb/pkg/search"
201
        "github.com/orneryd/nornicdb/pkg/storage"
202
        "github.com/orneryd/nornicdb/pkg/txsession"
203
)
204

205
// Sentinel errors for HTTP operations.
//
// Each value carries a fixed message with no formatted arguments, so they are
// created with errors.New. Callers should compare with errors.Is so that
// wrapped errors still match.
var (
	ErrServerClosed       = errors.New("server closed")
	ErrUnauthorized       = errors.New("unauthorized")
	ErrForbidden          = errors.New("forbidden")
	ErrBadRequest         = errors.New("bad request")
	ErrNotFound           = errors.New("not found")
	ErrMethodNotAllowed   = errors.New("method not allowed")
	ErrInternalError      = errors.New("internal server error")
	ErrServiceUnavailable = errors.New("service unavailable")
)
216

217
// embeddingCacheMemoryMB estimates the memory used by a full embedding cache,
// in whole mebibytes (truncated toward zero).
//
// The cache holds up to cacheSize vectors of the given dimensions, and every
// component is a float32 (4 bytes), so the total is
// cacheSize * dimensions * 4 bytes. The product is computed in int64 so that
// large caches do not overflow the native int on 32-bit platforms.
func embeddingCacheMemoryMB(cacheSize, dimensions int) int {
	const (
		bytesPerFloat32 = 4
		bytesPerMiB     = 1024 * 1024
	)
	totalBytes := int64(cacheSize) * int64(dimensions) * bytesPerFloat32
	return int(totalBytes / bytesPerMiB)
}
1✔
222

223
// waitForDurationOrServerClose sleeps for d and returns true only when the full
224
// duration elapsed. It returns false when the server is closing so background
225
// retry loops can exit promptly during shutdown.
226
func waitForDurationOrServerClose(s *Server, d time.Duration) bool {
1✔
227
        if d <= 0 {
2✔
228
                return true
1✔
229
        }
1✔
230
        if s == nil {
2✔
231
                time.Sleep(d)
1✔
232
                return true
1✔
233
        }
1✔
234

235
        timer := time.NewTimer(d)
1✔
236
        defer timer.Stop()
1✔
237
        poll := time.NewTicker(250 * time.Millisecond)
1✔
238
        defer poll.Stop()
1✔
239

1✔
240
        for {
2✔
241
                if s.closed.Load() {
2✔
242
                        return false
1✔
243
                }
1✔
244
                select {
1✔
245
                case <-timer.C:
1✔
246
                        return true
1✔
247
                case <-poll.C:
1✔
248
                        if s.closed.Load() {
2✔
249
                                return false
1✔
250
                        }
1✔
251
                }
252
        }
253
}
254

255
// buildEmbedConfigFromResolved builds an embed.Config from per-DB effective map and server config fallbacks.
256
// Used by the per-DB embedder registry when EmbedConfigForDB is set.
257
func buildEmbedConfigFromResolved(effective map[string]string, fallback *Config) *embed.Config {
1✔
258
        if fallback == nil {
2✔
259
                return nil
1✔
260
        }
1✔
261
        get := func(key, def string) string {
2✔
262
                if v := effective[key]; v != "" {
2✔
263
                        return strings.TrimSpace(v)
1✔
264
                }
1✔
265
                return def
1✔
266
        }
267
        getInt := func(key string, def int) int {
2✔
268
                if v := effective[key]; v != "" {
2✔
269
                        if i, err := strconv.Atoi(strings.TrimSpace(v)); err == nil {
2✔
270
                                return i
1✔
271
                        }
1✔
272
                }
273
                return def
1✔
274
        }
275
        provider := get("NORNICDB_EMBEDDING_PROVIDER", fallback.EmbeddingProvider)
1✔
276
        if provider == "" {
2✔
277
                provider = "openai"
1✔
278
        }
1✔
279
        model := get("NORNICDB_EMBEDDING_MODEL", fallback.EmbeddingModel)
1✔
280
        apiURL := get("NORNICDB_EMBEDDING_API_URL", fallback.EmbeddingAPIURL)
1✔
281
        apiKey := get("NORNICDB_EMBEDDING_API_KEY", fallback.EmbeddingAPIKey)
1✔
282
        dimensions := getInt("NORNICDB_EMBEDDING_DIMENSIONS", fallback.EmbeddingDimensions)
1✔
283
        if dimensions <= 0 {
2✔
284
                dimensions = 1024
1✔
285
        }
1✔
286
        gpuLayers := getInt("NORNICDB_EMBEDDING_GPU_LAYERS", 0)
1✔
287
        cfg := &embed.Config{
1✔
288
                Provider:      provider,
1✔
289
                APIURL:        apiURL,
1✔
290
                APIKey:        apiKey,
1✔
291
                Model:         model,
1✔
292
                Dimensions:    dimensions,
1✔
293
                ModelsDir:     fallback.ModelsDir,
1✔
294
                Timeout:       30 * time.Second,
1✔
295
                GPULayers:     gpuLayers,
1✔
296
                CtxType:       fallback.EmbeddingCtxType,
1✔
297
                PoolingType:   fallback.EmbeddingPoolingType,
1✔
298
                AttentionType: fallback.EmbeddingAttentionType,
1✔
299
                FlashAttn:     fallback.EmbeddingFlashAttn,
1✔
300
        }
1✔
301
        switch provider {
1✔
302
        case "ollama":
1✔
303
                cfg.APIPath = "/api/embeddings"
1✔
304
        case "openai":
1✔
305
                cfg.APIPath = "/v1/embeddings"
1✔
306
        case "local":
1✔
307
                // no APIPath
308
        default:
1✔
309
                cfg.APIPath = "/api/embeddings"
1✔
310
        }
311
        return cfg
1✔
312
}
313

314
// Config holds HTTP server configuration options.
//
// All settings have sensible defaults via DefaultConfig(). The server follows
// Neo4j conventions where applicable (default port 7474, timeouts, etc.).
// Defaults quoted on individual fields below are the values DefaultConfig()
// assigns; a zero-valued Config literal does not get them.
//
// Example:
//
//	// Production configuration
//	config := &server.Config{
//		Address:           "0.0.0.0",
//		Port:              7474,
//		ReadTimeout:       30 * time.Second,
//		WriteTimeout:      60 * time.Second,
//		MaxRequestSize:    50 * 1024 * 1024, // 50MB for large imports
//		EnableCORS:        true,
//		CORSOrigins:       []string{"https://myapp.com"},
//		EnableCompression: true,
//		TLSCertFile:       "/etc/ssl/server.crt",
//		TLSKeyFile:        "/etc/ssl/server.key",
//	}
//
//	// Development configuration with CORS for local UI
//	config = server.DefaultConfig()
//	config.Port = 8080
//	config.EnableCORS = true
//	config.CORSOrigins = []string{"http://localhost:3000"} // Local dev UI only
type Config struct {
	// Address to bind to (default: "127.0.0.1" - localhost only for security)
	// Set to "0.0.0.0" to listen on all interfaces (required for Docker/external access)
	Address string
	// PerDBYAMLOverrides carries the `databases:` map parsed from
	// nornicdb.yaml. Server.New consumes it during system-DB load to seed
	// dbconfig.Store on first boot via LoadWithYAMLDefaults — admin-API
	// edits remain authoritative across restarts. Must be set BEFORE
	// server.New runs; the post-construction setter is gone because the
	// store load happens inside New and won't see late assignments.
	PerDBYAMLOverrides map[string]map[string]string
	// Port to listen on (default: 7474)
	Port int
	// BoltPort is the port the Bolt protocol server listens on. Surfaced
	// in the discovery response so browser clients constructing
	// neo4j-driver Bolt-over-WS sessions know where to connect.
	// Default: 7687 when zero.
	BoltPort int
	// ReadTimeout for requests (default: 30s)
	ReadTimeout time.Duration
	// WriteTimeout for responses (default: 300s, sized for long-running
	// streamed responses)
	WriteTimeout time.Duration
	// IdleTimeout for keep-alive connections (default: 120s)
	IdleTimeout time.Duration
	// MaxRequestSize in bytes (default: 10MB)
	MaxRequestSize int64
	// EnableCORS for cross-origin requests (default: true)
	EnableCORS bool
	// CORSOrigins allowed origins (default: []string{"*"}, i.e. any origin;
	// configure specific origins for production)
	// WARNING: Never use "*" with credentials - this is a CSRF vulnerability
	CORSOrigins []string
	// EnableCompression for responses (default: true)
	EnableCompression bool

	// Rate Limiting Configuration (DoS protection)
	// RateLimitEnabled enables IP-based rate limiting (see DefaultConfig for
	// the shipped default)
	RateLimitEnabled bool
	// RateLimitPerMinute max requests per IP per minute (default: 10000)
	RateLimitPerMinute int
	// RateLimitPerHour max requests per IP per hour (default: 100000)
	RateLimitPerHour int
	// RateLimitBurst max burst size for short request spikes (default: 1000)
	RateLimitBurst int
	// TLSCertFile for HTTPS
	TLSCertFile string
	// TLSKeyFile for HTTPS
	TLSKeyFile string

	// HTTP/2 Configuration
	// HTTP/2 is always enabled (backwards compatible with HTTP/1.1)
	// HTTP/2 provides multiplexing, header compression, and improved performance
	// HTTP/1.1 clients continue to work normally
	// HTTP2MaxConcurrentStreams limits the number of concurrent streams per connection (default: 250)
	// - 250: Go's internal default, matches standard library behavior (default)
	// - 100: Lower memory usage, good for resource-constrained environments
	// - 500-1000: High concurrency scenarios, uses more memory per connection
	// - Very high values (>1000) are not recommended due to DoS attack risk
	HTTP2MaxConcurrentStreams uint32

	// MCP Configuration (Model Context Protocol)
	// MCPEnabled controls whether the MCP server is started (default: true)
	// Set to false to disable MCP tools entirely
	// Env: NORNICDB_MCP_ENABLED=true|false
	MCPEnabled bool

	// Embedding Configuration (for vector search)
	// EmbeddingEnabled turns on automatic embedding generation (default: true)
	EmbeddingEnabled bool
	// EmbeddingProvider: "ollama" or "openai" or "local" (default: "ollama")
	EmbeddingProvider string
	// EmbeddingAPIURL is the base URL (e.g., http://localhost:11434)
	EmbeddingAPIURL string
	// EmbeddingModel is the model name (e.g., bge-m3)
	EmbeddingModel string
	// EmbeddingDimensions is expected vector size (e.g., 1024)
	EmbeddingDimensions int
	// EmbeddingCacheSize is max embeddings to cache (0 = disabled, default: 10000)
	// Each cached embedding uses ~4KB (1024 dims × 4 bytes)
	EmbeddingCacheSize int
	// EmbeddingAPIKey is the API key for authenticated embedding providers (OpenAI, Cloudflare Workers AI, etc.)
	// Env: NORNICDB_EMBEDDING_API_KEY
	EmbeddingAPIKey string
	// ModelsDir is the directory containing local GGUF models
	// Env: NORNICDB_MODELS_DIR (default: ./models)
	ModelsDir string
	// Embedding llama.cpp context features (passthrough for local GGUF models)
	EmbeddingCtxType       int // Env: NORNICDB_EMBEDDING_CTX_TYPE
	EmbeddingPoolingType   int // Env: NORNICDB_EMBEDDING_POOLING_TYPE
	EmbeddingAttentionType int // Env: NORNICDB_EMBEDDING_ATTENTION_TYPE
	EmbeddingFlashAttn     int // Env: NORNICDB_EMBEDDING_FLASH_ATTN

	// Slow Query Logging Configuration
	// SlowQueryEnabled turns on slow query logging (see DefaultConfig for the
	// shipped default)
	SlowQueryEnabled bool
	// D-04d: SlowQueryThreshold and SlowQueryLogFile collapsed into
	// pkg/config.LoggingConfig (the single source of truth). Threaded into
	// the server via the Logging field below; readers go through
	// s.config.Logging.SlowQueryThreshold / .SlowQueryLogFile.
	//
	// Logging carries the runtime LoggingConfig snapshot. Populated by
	// cmd/nornicdb/main.go from cfg.Logging at server construction.
	Logging nornicConfig.LoggingConfig

	// Headless Mode Configuration
	// Headless disables the web UI and browser-related endpoints
	// Set to true for API-only deployments (e.g., embedded use, microservices)
	// Env: NORNICDB_HEADLESS=true|false
	Headless bool

	// BasePath for deployment behind a reverse proxy with URL prefix
	// Example: "/nornicdb" when deployed at https://example.com/nornicdb/
	// Leave empty for root deployment (default)
	// Env: NORNICDB_BASE_PATH
	BasePath string

	// Plugins Configuration
	// PluginsDir is the directory for APOC/function plugins
	// Env: NORNICDB_PLUGINS_DIR
	PluginsDir string
	// HeimdallPluginsDir is the directory for Heimdall plugins
	// Env: NORNICDB_HEIMDALL_PLUGINS_DIR
	HeimdallPluginsDir string

	// Features configuration (passed from main config loading)
	// This contains feature flags like HeimdallEnabled loaded from YAML/env
	Features *nornicConfig.FeatureFlagsConfig

	// Debug/Profiling Configuration
	// EnablePprof enables /debug/pprof endpoints for performance profiling
	// WARNING: Only enable in development/testing environments
	// Env: NORNICDB_ENABLE_PPROF=true|false
	EnablePprof bool

	// Logger is the structured-logging entrypoint per Phase 2 D-01.
	// If nil, a discard-handler fallback is installed at New() — graceful
	// degrade for the transitional period; ctors will be tightened post-M1
	// once all consumers are updated to pass an explicit logger via
	// observability.Provider.Logger().
	Logger *slog.Logger
}
480

481
// DefaultConfig returns Neo4j-compatible default server configuration.
482
//
483
// Defaults match Neo4j HTTP server settings:
484
//   - Port 7474 (Neo4j HTTP default)
485
//   - 30s read timeout
486
//   - 60s write timeout
487
//   - 120s idle timeout
488
//   - 10MB max request size
489
//   - CORS enabled for browser compatibility
490
//   - Compression enabled
491
//
492
// Embedding defaults (for MCP vector search):
493
//   - Enabled by default, connects to localhost:11434 (llama.cpp/Ollama)
494
//   - Model: bge-m3 (1024 dimensions)
495
//   - Falls back to text search if embeddings unavailable
496
//
497
// Environment Variables to override embedding config:
498
//
499
//        NORNICDB_EMBEDDING_ENABLED=true|false  - Enable/disable embeddings
500
//        NORNICDB_EMBEDDING_PROVIDER=openai     - API format: "openai" or "ollama"
501
//        NORNICDB_EMBEDDING_URL=http://...      - Embeddings API URL
502
//        NORNICDB_EMBEDDING_MODEL=bge-m3
503
//        NORNICDB_EMBEDDING_DIM=1024            - Vector dimensions
504
//
505
// Example:
506
//
507
//        config := server.DefaultConfig()
508
//        server, err := server.New(db, auth, config)
509
//
510
//        // Or customize
511
//        config = server.DefaultConfig()
512
//        config.Port = 8080
513
//        config.EnableCORS = false
514
//        server, err = server.New(db, auth, config)
515
func DefaultConfig() *Config {
1✔
516
        return &Config{
1✔
517
                // SECURITY: Bind to localhost only by default - prevents external access
1✔
518
                // Set Address to "0.0.0.0" for Docker/container deployments or external access
1✔
519
                Address:        "127.0.0.1",
1✔
520
                Port:           7474,
1✔
521
                ReadTimeout:    30 * time.Second,
1✔
522
                WriteTimeout:   300 * time.Second, // Bifrost agentic loops (many tool calls) can run 1–2 min; avoid closing stream early
1✔
523
                IdleTimeout:    120 * time.Second,
1✔
524
                MaxRequestSize: 10 * 1024 * 1024, // 10MB
1✔
525
                // CORS enabled by default for ease of use (allows all origins)
1✔
526
                // Override via: NORNICDB_CORS_ENABLED=false or NORNICDB_CORS_ORIGINS=https://myapp.com
1✔
527
                // WARNING: "*" allows any origin - configure specific origins for production
1✔
528
                EnableCORS:        true,
1✔
529
                CORSOrigins:       []string{"*"}, // Allow all origins by default
1✔
530
                EnableCompression: true,
1✔
531

1✔
532
                // Rate limiting enabled by default to prevent DoS attacks
1✔
533
                // High limits for high-performance local/development use
1✔
534
                RateLimitEnabled:   false,
1✔
535
                RateLimitPerMinute: 10000,  // 10,000 requests/minute per IP (166/sec)
1✔
536
                RateLimitPerHour:   100000, // 100,000 requests/hour per IP
1✔
537
                RateLimitBurst:     1000,   // Allow large bursts for batch operations
1✔
538

1✔
539
                // MCP server enabled by default
1✔
540
                // Override: NORNICDB_MCP_ENABLED=false
1✔
541
                MCPEnabled: true,
1✔
542

1✔
543
                // Embedding defaults - connects to local llama.cpp/Ollama server
1✔
544
                // Override via environment variables:
1✔
545
                //   NORNICDB_EMBEDDING_ENABLED=false     - Disable embeddings entirely
1✔
546
                //   NORNICDB_EMBEDDING_PROVIDER=ollama   - Use "ollama" or "openai" format
1✔
547
                //   NORNICDB_EMBEDDING_URL=http://...    - Embeddings API URL
1✔
548
                //   NORNICDB_EMBEDDING_MODEL=...         - Model name
1✔
549
                //   NORNICDB_EMBEDDING_DIM=1024          - Vector dimensions
1✔
550
                EmbeddingEnabled:    true,
1✔
551
                EmbeddingProvider:   "ollama", // default URL targets Ollama (port 11434)
1✔
552
                EmbeddingAPIURL:     "http://localhost:11434",
1✔
553
                EmbeddingModel:      "bge-m3",
1✔
554
                EmbeddingDimensions: 1024,
1✔
555
                EmbeddingCacheSize:  10000, // ~40MB cache for 1024-dim vectors
1✔
556

1✔
557
                // Slow query logging enabled by default
1✔
558
                // Override via:
1✔
559
                //   NORNICDB_SLOW_QUERY_ENABLED=false
1✔
560
                //   NORNICDB_SLOW_QUERY_THRESHOLD=200ms
1✔
561
                //   NORNICDB_SLOW_QUERY_LOG=/var/log/nornicdb/slow.log
1✔
562
                // D-04d: Threshold + LogFile defaults now live in
1✔
563
                // pkg/config.DefaultConfig().Logging (the single source of truth);
1✔
564
                // callers populate Logging from cfg.Logging.
1✔
565
                SlowQueryEnabled: false,
1✔
566
                Logging:          nornicConfig.LoggingConfig{SlowQueryThreshold: 100 * time.Millisecond},
1✔
567

1✔
568
                // Headless mode disabled by default (UI enabled)
1✔
569
                // Override via:
1✔
570
                //   NORNICDB_HEADLESS=true
1✔
571
                //   --headless flag
1✔
572
                Headless: false,
1✔
573

1✔
574
                // Pprof disabled by default (security: profiling endpoints expose internals)
1✔
575
                // Override via:
1✔
576
                //   NORNICDB_ENABLE_PPROF=true
1✔
577
                EnablePprof: envutil.GetBoolStrict("NORNICDB_ENABLE_PPROF", false),
1✔
578

1✔
579
                // HTTP/2 always enabled (backwards compatible with HTTP/1.1)
1✔
580
                // MaxConcurrentStreams: 250 matches Go's internal default
1✔
581
                // - Matches standard library http2.Server default (250)
1✔
582
                // - Good balance between performance and memory usage
1✔
583
                // - Can be reduced to 100 for lower-memory environments
1✔
584
                // - Can be increased to 500+ for high-concurrency scenarios
1✔
585
                HTTP2MaxConcurrentStreams: 250,
1✔
586
        }
1✔
587
}
1✔
588

589
// Server is the HTTP API server providing Neo4j-compatible endpoints.
//
// The server is thread-safe and handles concurrent requests. It maintains
// metrics, supports graceful shutdown, and integrates with audit logging.
//
// Lifecycle:
//  1. Create with New()
//  2. Optionally set audit logger with SetAuditLogger()
//  3. Start with Start()
//  4. Handle requests automatically
//  5. Stop with Stop() for graceful shutdown
//
// Example:
//
//	// New returns (*Server, error); always check the error.
//	srv, err := server.New(db, auth, config)
//	if err != nil {
//		log.Fatal(err)
//	}
//
//	// Set up audit logging
//	auditLogger, _ := audit.NewLogger(audit.DefaultConfig())
//	srv.SetAuditLogger(auditLogger)
//
//	// Start server
//	if err := srv.Start(); err != nil {
//		log.Fatal(err)
//	}
//
//	// Server is now handling requests
//	// (Listening on srv.Addr())
//
//	// Get metrics
//	stats := srv.Stats()
//	// stats.RequestCount, stats.ErrorCount expose request/error counts
//
//	// Graceful shutdown
//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
//	defer cancel()
//	srv.Stop(ctx)
type Server struct {
	// Core collaborators. config, db, dbManager and auth are populated by
	// New(); audit is attached later via SetAuditLogger() (see Lifecycle).
	config    *Config
	db        *nornicdb.DB
	dbManager *multidb.DatabaseManager // Manages multiple databases
	auth      *auth.Authenticator
	audit     *audit.Logger

	// log is the structured logger for operational events (Phase 2 D-01).
	// Tagged .With("component", "server") at construction so every record
	// carries component attribution. NEVER nil after New() returns
	// (discard-fallback handler installed when cfg.Logger == nil).
	log *slog.Logger

	// MCP server for LLM tool interface. Nil when MCP is disabled in Config.
	mcpServer *mcp.Server

	// Heimdall - AI assistant for database management
	heimdallHandler *heimdall.Handler

	// GraphQL handler for GraphQL API
	graphqlHandler *graphql.Handler

	// Qdrant-compatible gRPC server (optional; feature-flagged).
	qdrantGRPCServer      *qdrantgrpc.Server
	qdrantCollectionStore qdrantgrpc.CollectionStore

	// Underlying net/http server and the listener it serves on.
	httpServer *http.Server
	listener   net.Listener
	// requestsCtx is the parent context handed to every inbound HTTP request
	// via http.Server.BaseContext. Cancelled at the start of Stop() so any
	// long-running handler (Cypher traversals, etc.) can unwind promptly
	// instead of holding shutdown open until ReadTimeout/WriteTimeout fires.
	requestsCtx       context.Context
	cancelRequestsCtx context.CancelFunc

	// httpMetrics is the Plan-04-02 HTTP catalog bag (D-02 typed handle DI).
	// Populated by SetHTTPMetrics(...) AFTER observability.New runs in
	// cmd/nornicdb/main.go (Phase 2 D-08 two-phase bootstrap: server is
	// constructed BEFORE obs to keep the existing logger plumbing; metrics
	// bag is injected post-hoc and applied at Start() time when the http
	// handler is wrapped). Nil-safe: instrumentedMux is a pass-through
	// when nil, so test fixtures and pre-Phase-4 callers compile unchanged.
	httpMetrics *observability.HTTPMetrics

	// obsRegistry is the unified pkg/observability *prometheus.Registry,
	// injected post-construction via SetObsRegistry from cmd/nornicdb/main.go.
	// Used by handleMetrics (Phase 5 / Plan 05-04) to call
	// observability.RenderLegacy and produce the legacy :7474/metrics body
	// from the same registry that backs :9090/metrics — eliminating the
	// pre-Phase-5 hand-built second source of truth (ROADMAP SC #1).
	// Nil-safe: handleMetrics tolerates nil (RenderLegacy returns empty bytes).
	obsRegistry *prometheus.Registry

	// Rate limiter for DoS protection
	rateLimiter *IPRateLimiter

	// OAuth manager for OAuth 2.0 authentication
	oauthManager *auth.OAuthManager

	// Cache for Basic auth results to avoid bcrypt+JWT work on every request.
	// This materially improves throughput for Neo4j-compatible clients that
	// send Basic auth on each request.
	basicAuthCache *auth.BasicAuthCache

	// Lifecycle state.
	// NOTE(review): presumably mu guards mutable server state, closed marks
	// that shutdown has begun, and started records the start time; their use
	// sites are outside this section — confirm against Start()/Stop().
	mu      sync.RWMutex
	closed  atomic.Bool
	started time.Time

	// Metrics (lock-free counters). activeTxReqs is read by the embed-queue
	// yield callback installed in New(): while it is > 0, background embed
	// work yields to foreground transaction requests.
	requestCount   atomic.Int64
	errorCount     atomic.Int64
	activeRequests atomic.Int64
	activeTxReqs   atomic.Int64

	// Slow query logging
	slowQueryLogger *log.Logger
	slowQueryCount  atomic.Int64

	// Cached search services per database (namespace-aware indexes)
	searchServicesMu sync.RWMutex
	searchServices   map[string]*search.Service

	// Cached Cypher executors per database (thread-safe, reusable)
	executorsMu sync.RWMutex
	executors   map[string]*cypher.StorageExecutor

	// Explicit transaction sessions shared across transports.
	txSessions *txsession.Manager

	// Per-database access control (Neo4j-aligned). When auth disabled, Full is used.
	// When auth enabled, allowlistStore (if set) provides allowlist-based mode per principal.
	databaseAccessMode    auth.DatabaseAccessMode
	allowlistStore        *auth.AllowlistStore        // loaded from system DB when auth enabled
	roleStore             *auth.RoleStore             // user-defined roles when auth enabled
	privilegesStore       *auth.PrivilegesStore       // per-DB read/write (Phase 4) when auth enabled
	roleEntitlementsStore *auth.RoleEntitlementsStore // per-role global entitlements when auth enabled
	dbConfigStore         *dbconfig.Store             // per-DB config overrides (embedding, search, etc.)
	// perDBYAMLOverrides carries the `databases:` map from nornicdb.yaml,
	// passed through by the binary at construction time. Seeded into
	// dbConfigStore on first boot via LoadWithYAMLDefaults; subsequent
	// admin API edits are authoritative across restarts.
	perDBYAMLOverrides map[string]map[string]string
}
728

729
// ensureSearchBuildStartedForKnownDatabases reconciles search-service startup for
730
// databases known to DatabaseManager, including metadata-only empty databases.
731
// It is safe to call repeatedly; per-db build start is idempotent.
732
func (s *Server) ensureSearchBuildStartedForKnownDatabases() {
1✔
733
        if s == nil || s.db == nil || s.dbManager == nil {
2✔
734
                return
1✔
735
        }
1✔
736
        for _, info := range s.dbManager.ListDatabases() {
2✔
737
                if info == nil || info.Name == "" || info.Name == "system" {
2✔
738
                        continue
1✔
739
                }
740
                if s.dbManager.IsCompositeDatabase(info.Name) {
2✔
741
                        continue
1✔
742
                }
743
                status := s.db.GetDatabaseSearchStatus(info.Name)
1✔
744
                if status.Initialized {
2✔
745
                        continue
1✔
746
                }
747
                storageEngine, err := s.dbManager.GetStorage(info.Name)
1✔
748
                if err != nil {
1✔
749
                        s.log.Warn("startup search reconcile: storage unavailable", "subsystem", "search", "db", info.Name, "error", err)
×
750
                        continue
×
751
                }
752
                if _, err := s.db.EnsureSearchIndexesBuildStarted(info.Name, storageEngine); err != nil {
1✔
753
                        s.log.Warn("startup search reconcile failed", "subsystem", "search", "db", info.Name, "error", err)
×
754
                }
×
755
        }
756
}
757

758
// mcpToolRunnerAdapter adapts pkg/mcp.Server to heimdall.InMemoryToolRunner so the agentic loop
// can expose store, recall, discover, link, task, tasks to the LLM and execute them in process.
//
// Fields:
//   - s: the MCP server whose tool definitions are listed and whose tools are invoked.
//   - allowlist: nil = all tools, empty = no tools, non-empty = only those names.
type mcpToolRunnerAdapter struct {
	s         *mcp.Server
	allowlist []string
}
765

766
func (a *mcpToolRunnerAdapter) allowedNames() []string {
1✔
767
        if a.allowlist == nil {
2✔
768
                return mcp.AllTools()
1✔
769
        }
1✔
770
        return a.allowlist
1✔
771
}
772

773
func (a *mcpToolRunnerAdapter) allow(name string) bool {
1✔
774
        allowed := a.allowedNames()
1✔
775
        for _, n := range allowed {
2✔
776
                if n == name {
2✔
777
                        return true
1✔
778
                }
1✔
779
        }
780
        return false
1✔
781
}
782

783
func (a *mcpToolRunnerAdapter) ToolDefinitions() []heimdall.MCPTool {
1✔
784
        defs := a.s.ToolDefinitions()
1✔
785
        allowed := a.allowedNames()
1✔
786
        if len(allowed) == 0 {
2✔
787
                return nil
1✔
788
        }
1✔
789
        allowSet := make(map[string]struct{}, len(allowed))
1✔
790
        for _, n := range allowed {
2✔
791
                allowSet[n] = struct{}{}
1✔
792
        }
1✔
793
        var out []heimdall.MCPTool
1✔
794
        for _, t := range defs {
2✔
795
                if _, ok := allowSet[t.Name]; ok {
2✔
796
                        out = append(out, heimdall.MCPTool{Name: t.Name, Description: t.Description, InputSchema: t.InputSchema})
1✔
797
                }
1✔
798
        }
799
        return out
1✔
800
}
801

802
func (a *mcpToolRunnerAdapter) ToolNames() []string {
1✔
803
        return a.allowedNames()
1✔
804
}
1✔
805

806
func (a *mcpToolRunnerAdapter) CallTool(ctx context.Context, name string, args map[string]interface{}, dbName string) (interface{}, error) {
1✔
807
        if !a.allow(name) {
2✔
808
                return nil, fmt.Errorf("tool %q is not in the MCP allowlist", name)
1✔
809
        }
1✔
810
        // Ensure we always pass a concrete database so MCP uses DatabaseScopedExecutor when set.
811
        // Empty dbName would cause MCP to fall back to s.db, which can diverge from the default DB in multi-db setups.
812
        if dbName == "" {
2✔
813
                dbName = a.s.DefaultDatabaseName()
1✔
814
        }
1✔
815
        ctx = mcp.ContextWithDatabase(ctx, dbName)
1✔
816
        return a.s.CallTool(ctx, name, args)
1✔
817
}
818

819
// mcpDatabaseScopedExecutor returns a callback that provides a Cypher executor and node getter for the given database.
820
// Used so MCP tools (store, recall, link, etc.) run against the request's database when invoked from the agentic loop.
821
func (s *Server) mcpDatabaseScopedExecutor() func(dbName string) (*cypher.StorageExecutor, func(context.Context, string) (*nornicdb.Node, error), error) {
1✔
822
        return func(dbName string) (*cypher.StorageExecutor, func(context.Context, string) (*nornicdb.Node, error), error) {
2✔
823
                exec, err := s.getExecutorForDatabase(dbName)
1✔
824
                if err != nil {
2✔
825
                        return nil, nil, err
1✔
826
                }
1✔
827
                getNode := func(ctx context.Context, id string) (*nornicdb.Node, error) {
2✔
828
                        result, err := exec.Execute(ctx, "MATCH (n) WHERE elementId(n) = $id RETURN n", map[string]interface{}{"id": id})
1✔
829
                        if err != nil {
1✔
830
                                return nil, err
×
831
                        }
×
832
                        if len(result.Rows) == 0 || len(result.Rows[0]) == 0 {
2✔
833
                                return nil, nornicdb.ErrNotFound
1✔
834
                        }
1✔
835
                        v := result.Rows[0][0]
1✔
836
                        if snode, ok := v.(*storage.Node); ok {
2✔
837
                                props := make(map[string]interface{}, len(snode.Properties))
1✔
838
                                for k, val := range snode.Properties {
2✔
839
                                        props[k] = val
1✔
840
                                }
1✔
841
                                return &nornicdb.Node{
1✔
842
                                        ID:         string(snode.ID),
1✔
843
                                        Labels:     snode.Labels,
1✔
844
                                        Properties: props,
1✔
845
                                        CreatedAt:  snode.CreatedAt,
1✔
846
                                }, nil
1✔
847
                        }
848
                        return nil, nornicdb.ErrNotFound
×
849
                }
850
                return exec, getNode, nil
1✔
851
        }
852
}
853

854
// IPRateLimiter provides IP-based rate limiting to prevent DoS attacks.
//
// Every client IP gets its own pair of fixed-window counters (one per minute,
// one per hour). A request is admitted only while both counters are below
// their configured limits. A background goroutine started by NewIPRateLimiter
// periodically evicts idle entries; call Stop to terminate it.
//
// All methods are safe for concurrent use. Lock order is always
// IPRateLimiter.mu before ipRateLimitCounter.mu.
type IPRateLimiter struct {
	mu              sync.RWMutex
	counters        map[string]*ipRateLimitCounter
	perMinute       int
	perHour         int
	burst           int // stored from the constructor; not consulted by Allow
	cleanupInterval time.Duration
	stopCleanup     chan struct{}
	stopOnce        sync.Once // makes Stop idempotent
}

// ipRateLimitCounter tracks the request counts of a single IP together with
// the instants at which the current minute and hour windows expire.
type ipRateLimitCounter struct {
	mu          sync.Mutex
	minuteCount int
	hourCount   int
	minuteReset time.Time
	hourReset   time.Time
}

// NewIPRateLimiter creates a new IP-based rate limiter.
//
// perMinute and perHour are the maximum number of requests admitted per IP in
// each window. burst is recorded on the limiter. The constructor starts the
// background cleanup goroutine (10-minute interval); callers should invoke
// Stop when the limiter is no longer needed.
func NewIPRateLimiter(perMinute, perHour, burst int) *IPRateLimiter {
	rl := &IPRateLimiter{
		counters:        make(map[string]*ipRateLimitCounter),
		perMinute:       perMinute,
		perHour:         perHour,
		burst:           burst,
		cleanupInterval: 10 * time.Minute,
		stopCleanup:     make(chan struct{}),
	}
	// Start background cleanup of stale entries
	go rl.cleanupLoop()
	return rl
}

// Allow checks if a request from the given IP is allowed.
//
// It returns true and counts the request when the IP is below both its
// per-minute and per-hour limits; otherwise it returns false and leaves the
// counters untouched.
func (rl *IPRateLimiter) Allow(ip string) bool {
	rl.mu.Lock()
	now := time.Now() // sampled once so creation and window checks agree
	counter, exists := rl.counters[ip]
	if !exists {
		counter = &ipRateLimitCounter{
			minuteReset: now.Add(time.Minute),
			hourReset:   now.Add(time.Hour),
		}
		rl.counters[ip] = counter
	}
	// Take the counter lock BEFORE releasing the map lock. Otherwise cleanup
	// could evict this counter in the gap and the request would be counted
	// on an orphaned entry, letting the client slip past the limit.
	counter.mu.Lock()
	rl.mu.Unlock()
	defer counter.mu.Unlock()

	// Reset minute counter if needed
	if now.After(counter.minuteReset) {
		counter.minuteCount = 0
		counter.minuteReset = now.Add(time.Minute)
	}

	// Reset hour counter if needed
	if now.After(counter.hourReset) {
		counter.hourCount = 0
		counter.hourReset = now.Add(time.Hour)
	}

	// Check limits
	if counter.minuteCount >= rl.perMinute {
		return false
	}
	if counter.hourCount >= rl.perHour {
		return false
	}

	// Increment counters
	counter.minuteCount++
	counter.hourCount++

	return true
}

// cleanupLoop periodically removes stale IP entries to prevent memory leaks.
// It runs until Stop closes the stopCleanup channel.
func (rl *IPRateLimiter) cleanupLoop() {
	ticker := time.NewTicker(rl.cleanupInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			rl.cleanup()
		case <-rl.stopCleanup:
			return
		}
	}
}

// cleanup evicts every IP entry whose hour window has already expired.
func (rl *IPRateLimiter) cleanup() {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	now := time.Now()
	for ip, counter := range rl.counters {
		counter.mu.Lock()
		// An expired hour window means the minute window (which is shorter)
		// has expired too, so the entry carries no live state.
		if now.After(counter.hourReset) {
			delete(rl.counters, ip)
		}
		counter.mu.Unlock()
	}
}

// Stop stops the cleanup goroutine.
//
// It is safe to call Stop more than once and from multiple goroutines; only
// the first call closes the stop channel (closing a closed channel panics).
func (rl *IPRateLimiter) Stop() {
	rl.stopOnce.Do(func() {
		close(rl.stopCleanup)
	})
}
1✔
967

968
// New creates a new HTTP server with the given database, authenticator, and configuration.
969
//
970
// The server is created but not started. Call Start() to begin accepting connections.
971
//
972
// Parameters:
973
//   - db: NornicDB database instance (required)
974
//   - authenticator: Authentication handler (can be nil to disable auth)
975
//   - config: Server configuration (uses DefaultConfig() if nil)
976
//
977
// Returns:
978
//   - Server instance ready to start
979
//   - Error if database is nil or configuration is invalid
980
//
981
// Example:
982
//
983
//        // With authentication
984
//        db, _ := nornicdb.Open("./data", nil)
985
//        auth, _ := auth.NewAuthenticator(auth.DefaultAuthConfig())
986
//        server, err := server.New(db, auth, nil) // Uses default config
987
//
988
//        // Without authentication (development)
989
//        server, err = server.New(db, nil, nil)
990
//
991
//        // Custom configuration
992
//        config := &server.Config{
993
//                Port: 8080,
994
//                EnableCORS: false,
995
//        }
996
//        server, err = server.New(db, auth, config)
997
func New(db *nornicdb.DB, authenticator *auth.Authenticator, config *Config) (*Server, error) {
1✔
998
        if config == nil {
2✔
999
                config = DefaultConfig()
1✔
1000
        }
1✔
1001
        if db == nil {
2✔
1002
                return nil, fmt.Errorf("database required")
1✔
1003
        }
1✔
1004
        // Phase 2 D-01a: graceful-degrade discard fallback when caller did not
1005
        // thread observability.Provider.Logger() through Config.Logger. Keeps
1006
        // existing tests/callers compileable; tightens post-M1 once all
1007
        // consumers wire the logger explicitly.
1008
        if config.Logger == nil {
2✔
1009
                config.Logger = slog.New(slog.NewTextHandler(io.Discard, nil))
1✔
1010
        }
1✔
1011

1012
        // Note: GPU status is logged in main.go during GPU manager initialization
1013
        // This avoids duplicate logs and provides more detailed information
1014

1015
        // Load environment-backed global config once (used for multi-db + feature defaults).
1016
        globalConfig := nornicConfig.LoadFromEnv()
1✔
1017

1✔
1018
        // Create MCP server for LLM tool interface (if enabled)
1✔
1019
        var mcpServer *mcp.Server
1✔
1020
        if config.MCPEnabled {
2✔
1021
                mcpConfig := mcp.DefaultServerConfig()
1✔
1022
                mcpConfig.EmbeddingEnabled = config.EmbeddingEnabled
1✔
1023
                mcpConfig.EmbeddingModel = config.EmbeddingModel
1✔
1024
                mcpConfig.EmbeddingDimensions = config.EmbeddingDimensions
1✔
1025
                mcpConfig.DefaultNodeLabel = globalConfig.Memory.DefaultNodeLabel
1✔
1026
                mcpServer = mcp.NewServer(db, mcpConfig)
1✔
1027
        } else {
2✔
1028
                config.Logger.With("component", "server").Info("mcp server disabled via configuration")
1✔
1029
        }
1✔
1030

1031
        // Initialize DatabaseManager for multi-database support.
1032
        // IMPORTANT: This must happen before Heimdall/GraphQL so they can route per database.
1033
        //
1034
        // Get the base storage engine from the DB (unwraps the namespaced storage).
1035
        // DatabaseManager will create NamespacedEngines for each logical database.
1036
        storageEngine := db.GetBaseStorageForManager()
1✔
1037
        remoteCredentialEncryptionKey := ""
1✔
1038
        switch {
1✔
1039
        case strings.TrimSpace(os.Getenv("NORNICDB_REMOTE_CREDENTIALS_KEY")) != "":
1✔
1040
                remoteCredentialEncryptionKey = strings.TrimSpace(os.Getenv("NORNICDB_REMOTE_CREDENTIALS_KEY"))
1✔
1041
        case strings.TrimSpace(globalConfig.Database.EncryptionPassword) != "":
×
1042
                remoteCredentialEncryptionKey = strings.TrimSpace(globalConfig.Database.EncryptionPassword)
×
1043
                config.Logger.With("component", "server").Warn("remote credential encryption key fallback in use",
×
1044
                        "fallback", "database_encryption_password",
×
1045
                        "remediation", "set NORNICDB_REMOTE_CREDENTIALS_KEY for key separation")
×
1046
        case strings.TrimSpace(globalConfig.Auth.JWTSecret) != "":
1✔
1047
                remoteCredentialEncryptionKey = strings.TrimSpace(globalConfig.Auth.JWTSecret)
1✔
1048
                config.Logger.With("component", "server").Warn("remote credential encryption key fallback in use",
1✔
1049
                        "fallback", "jwt_signing_secret",
1✔
1050
                        "remediation", "set NORNICDB_REMOTE_CREDENTIALS_KEY for stronger key separation")
1✔
1051
        }
1052
        multiDBConfig := &multidb.Config{
1✔
1053
                DefaultDatabase:               globalConfig.Database.DefaultDatabase,
1✔
1054
                SystemDatabase:                "system",
1✔
1055
                MaxDatabases:                  0, // Unlimited
1✔
1056
                AllowDropDefault:              false,
1✔
1057
                RemoteCredentialEncryptionKey: remoteCredentialEncryptionKey,
1✔
1058
                RemoteEngineFactory: func(ref multidb.ConstituentRef, authToken string) (storage.Engine, error) {
2✔
1059
                        useUserPassword := strings.EqualFold(strings.TrimSpace(ref.AuthMode), "user_password")
1✔
1060
                        cfg := storage.RemoteEngineConfig{
1✔
1061
                                URI:       ref.URI,
1✔
1062
                                Database:  ref.DatabaseName,
1✔
1063
                                AuthToken: authToken,
1✔
1064
                        }
1✔
1065
                        if useUserPassword {
2✔
1066
                                cfg.User = ref.User
1✔
1067
                                cfg.Password = ref.Password
1✔
1068
                        }
1✔
1069
                        return storage.NewRemoteEngine(cfg)
1✔
1070
                },
1071
        }
1072
        dbManager, err := multidb.NewDatabaseManager(storageEngine, multiDBConfig)
1✔
1073
        if err != nil {
1✔
1074
                return nil, fmt.Errorf("failed to initialize database manager: %w", err)
×
1075
        }
×
1076

1077
        s := &Server{
1✔
1078
                config:             config,
1✔
1079
                db:                 db,
1✔
1080
                dbManager:          dbManager,
1✔
1081
                auth:               authenticator,
1✔
1082
                log:                config.Logger.With("component", "server"),
1✔
1083
                mcpServer:          mcpServer,
1✔
1084
                graphqlHandler:     graphql.NewHandler(db, dbManager),
1✔
1085
                basicAuthCache:     auth.NewBasicAuthCache(auth.DefaultAuthCacheEntries, auth.DefaultAuthCacheTTL),
1✔
1086
                searchServices:     make(map[string]*search.Service),
1✔
1087
                executors:          make(map[string]*cypher.StorageExecutor),
1✔
1088
                perDBYAMLOverrides: config.PerDBYAMLOverrides,
1✔
1089
        }
1✔
1090
        // Foreground-first policy: while tx requests are active, background embed work yields.
1✔
1091
        s.db.SetEmbedQueueShouldYield(func() bool {
1✔
1092
                return s.activeTxReqs.Load() > 0
×
1093
        })
×
1094
        s.txSessions = txsession.NewManager(30*time.Second, s.newExecutorForDatabase)
1✔
1095
        s.txSessions.SetTerminalErrorObserver(func(session *txsession.Session, err error) {
2✔
1096
                s.logMVCCSnapshotExpiration(session, err)
1✔
1097
        })
1✔
1098

1099
        // ==========================================================================
1100
        // Heimdall - AI Assistant for Database Management
1101
        // ==========================================================================
1102
        // Use features config passed from main.go (which loads from YAML + env)
1103
        // Fall back to LoadFromEnv() if not provided (for backwards compatibility)
1104
        var featuresConfig *nornicConfig.FeatureFlagsConfig
1✔
1105
        if config.Features != nil {
2✔
1106
                featuresConfig = config.Features
1✔
1107
        } else {
2✔
1108
                featuresConfig = &globalConfig.Features
1✔
1109
                config.Features = featuresConfig
1✔
1110
        }
1✔
1111
        var rerankerResolverMu sync.RWMutex
1✔
1112
        var globalRerankerResolver func(string) search.Reranker
1✔
1113
        perDBRerankerCache := make(map[string]search.Reranker)
1✔
1114
        setGlobalRerankerResolver := func(fn func(string) search.Reranker) {
2✔
1115
                rerankerResolverMu.Lock()
1✔
1116
                globalRerankerResolver = fn
1✔
1117
                rerankerResolverMu.Unlock()
1✔
1118
        }
1✔
1119
        getGlobalRerankerResolver := func() func(string) search.Reranker {
1✔
1120
                rerankerResolverMu.RLock()
×
1121
                defer rerankerResolverMu.RUnlock()
×
1122
                return globalRerankerResolver
×
1123
        }
×
1124
        resolveDBRerankConfig := func(dbName string) (enabled bool, provider, model, apiURL, apiKey string) {
2✔
1125
                enabled = featuresConfig.SearchRerankEnabled
1✔
1126
                provider = strings.TrimSpace(strings.ToLower(featuresConfig.SearchRerankProvider))
1✔
1127
                model = featuresConfig.SearchRerankModel
1✔
1128
                apiURL = strings.TrimSpace(featuresConfig.SearchRerankAPIURL)
1✔
1129
                apiKey = featuresConfig.SearchRerankAPIKey
1✔
1130
                if provider == "" {
2✔
1131
                        provider = "local"
1✔
1132
                }
1✔
1133
                if provider == "ollama" && apiURL == "" {
1✔
UNCOV
1134
                        apiURL = "http://localhost:11434/rerank"
×
UNCOV
1135
                }
×
1136
                if s.dbConfigStore == nil {
2✔
1137
                        return enabled, provider, model, apiURL, apiKey
1✔
1138
                }
1✔
1139
                overrides := s.dbConfigStore.GetOverrides(dbName)
1✔
1140
                resolved := dbconfig.Resolve(globalConfig, overrides)
1✔
1141
                if resolved == nil || resolved.Effective == nil {
1✔
1142
                        return enabled, provider, model, apiURL, apiKey
×
1143
                }
×
1144
                eff := resolved.Effective
1✔
1145
                if raw := strings.TrimSpace(strings.ToLower(eff["NORNICDB_SEARCH_RERANK_ENABLED"])); raw != "" {
2✔
1146
                        switch raw {
1✔
1147
                        case "1", "true", "yes", "on":
×
1148
                                enabled = true
×
1149
                        case "0", "false", "no", "off":
1✔
1150
                                enabled = false
1✔
1151
                        }
1152
                }
1153
                if v := strings.TrimSpace(strings.ToLower(eff["NORNICDB_SEARCH_RERANK_PROVIDER"])); v != "" {
2✔
1154
                        provider = v
1✔
1155
                }
1✔
1156
                if v := eff["NORNICDB_SEARCH_RERANK_MODEL"]; strings.TrimSpace(v) != "" {
2✔
1157
                        model = strings.TrimSpace(v)
1✔
1158
                }
1✔
1159
                if v := eff["NORNICDB_SEARCH_RERANK_API_URL"]; strings.TrimSpace(v) != "" {
1✔
1160
                        apiURL = strings.TrimSpace(v)
×
1161
                }
×
1162
                if v := eff["NORNICDB_SEARCH_RERANK_API_KEY"]; strings.TrimSpace(v) != "" {
1✔
1163
                        apiKey = v
×
1164
                }
×
1165
                if provider == "ollama" && apiURL == "" {
1✔
1166
                        apiURL = "http://localhost:11434/rerank"
×
1167
                }
×
1168
                return enabled, provider, model, apiURL, apiKey
1✔
1169
        }
1170
        getOrCreateExternalReranker := func(provider, model, apiURL, apiKey string) search.Reranker {
1✔
UNCOV
1171
                key := strings.Join([]string{provider, model, apiURL, apiKey}, "|")
×
UNCOV
1172
                rerankerResolverMu.RLock()
×
UNCOV
1173
                if cached, ok := perDBRerankerCache[key]; ok {
×
1174
                        rerankerResolverMu.RUnlock()
×
1175
                        return cached
×
1176
                }
×
UNCOV
1177
                rerankerResolverMu.RUnlock()
×
UNCOV
1178
                if apiURL == "" {
×
1179
                        return nil
×
1180
                }
×
UNCOV
1181
                ceConfig := &search.CrossEncoderConfig{
×
UNCOV
1182
                        Enabled:  true,
×
UNCOV
1183
                        APIURL:   apiURL,
×
UNCOV
1184
                        APIKey:   apiKey,
×
UNCOV
1185
                        Model:    model,
×
UNCOV
1186
                        TopK:     100,
×
UNCOV
1187
                        Timeout:  30 * time.Second,
×
UNCOV
1188
                        MinScore: 0.0,
×
UNCOV
1189
                }
×
UNCOV
1190
                if ceConfig.Model == "" && provider == "ollama" {
×
UNCOV
1191
                        ceConfig.Model = "reranker"
×
UNCOV
1192
                }
×
UNCOV
1193
                ce := search.NewCrossEncoder(ceConfig)
×
UNCOV
1194
                rerankerResolverMu.Lock()
×
UNCOV
1195
                perDBRerankerCache[key] = ce
×
UNCOV
1196
                rerankerResolverMu.Unlock()
×
UNCOV
1197
                return ce
×
1198
        }
1199
        // Install per-DB reranker resolver. It respects DB overrides and falls back to global resolver.
1200
        db.SetRerankerResolver(func(dbName string) search.Reranker {
2✔
1201
                enabled, provider, model, apiURL, apiKey := resolveDBRerankConfig(dbName)
1✔
1202
                if !enabled {
2✔
1203
                        return nil
1✔
1204
                }
1✔
UNCOV
1205
                if provider == "local" {
×
1206
                        if resolver := getGlobalRerankerResolver(); resolver != nil {
×
1207
                                return resolver(dbName)
×
1208
                        }
×
1209
                        return nil
×
1210
                }
UNCOV
1211
                if r := getOrCreateExternalReranker(provider, model, apiURL, apiKey); r != nil {
×
UNCOV
1212
                        return r
×
UNCOV
1213
                }
×
1214
                if resolver := getGlobalRerankerResolver(); resolver != nil {
×
1215
                        return resolver(dbName)
×
1216
                }
×
1217
                return nil
×
1218
        })
1219
        if featuresConfig.HeimdallEnabled {
2✔
1220
                // Derive a child logger once at goroutine entry so every record
1✔
1221
                // carries subsystem=heimdall (Phase 2 D-10a).
1✔
1222
                heimdallLog := s.log.With("subsystem", "heimdall")
1✔
1223
                heimdallLog.Info("heimdall AI assistant initializing asynchronously")
1✔
1224
                go func() {
2✔
1225
                        // Configure token budget from environment variables
1✔
1226
                        heimdall.SetTokenBudget(featuresConfig)
1✔
1227
                        heimdallCfg := heimdall.ConfigFromFeatureFlags(featuresConfig)
1✔
1228
                        // Log resolved provider so users can verify env overrides (openai/ollama/local)
1✔
1229
                        provider := strings.TrimSpace(strings.ToLower(heimdallCfg.Provider))
1✔
1230
                        if provider == "" {
1✔
1231
                                provider = "local"
×
1232
                        }
×
1233
                        heimdallLog.Info("heimdall provider resolved",
1✔
1234
                                "provider", provider,
1✔
1235
                                "override_env", "NORNICDB_HEIMDALL_PROVIDER")
1✔
1236
                        manager, err := heimdall.NewManager(heimdallCfg)
1✔
1237
                        if err != nil {
1✔
1238
                                heimdallLog.Warn("heimdall initialization failed",
×
1239
                                        "error", err,
×
1240
                                        "remediation", "check NORNICDB_HEIMDALL_MODEL and NORNICDB_MODELS_DIR")
×
1241
                                return
×
1242
                        }
×
1243
                        if baseExec := db.GetCypherExecutor(); baseExec != nil {
2✔
1244
                                baseExec.SetInferenceManager(manager)
1✔
1245
                        }
1✔
1246

1247
                        // Create database router wrapper for Heimdall (multi-db aware)
1248
                        dbRouter := newHeimdallDBRouter(db, dbManager, featuresConfig)
1✔
1249
                        metricsReader := &heimdallMetricsReader{}
1✔
1250
                        handler := heimdall.NewHandler(manager, heimdallCfg, dbRouter, metricsReader)
1✔
1251
                        // Expose MCP tools to the agentic loop only when enabled (default off to avoid context bloat)
1✔
1252
                        if mcpServer != nil && featuresConfig.HeimdallMCPEnable {
2✔
1253
                                handler.SetInMemoryToolRunner(&mcpToolRunnerAdapter{
1✔
1254
                                        s:         mcpServer,
1✔
1255
                                        allowlist: featuresConfig.HeimdallMCPTools,
1✔
1256
                                })
1✔
1257
                        }
1✔
1258

1259
                        // Initialize Heimdall plugin subsystem
1260
                        subsystemMgr := heimdall.GetSubsystemManager()
1✔
1261

1✔
1262
                        // Create the Heimdall invoker so plugins can call the LLM
1✔
1263
                        heimdallInvoker := heimdall.NewLiveHeimdallInvoker(
1✔
1264
                                subsystemMgr,
1✔
1265
                                manager, // Manager implements Generator interface
1✔
1266
                                handler.Bifrost(),
1✔
1267
                                dbRouter,
1✔
1268
                                metricsReader,
1✔
1269
                        )
1✔
1270

1✔
1271
                        subsystemCtx := heimdall.SubsystemContext{
1✔
1272
                                Config:   heimdallCfg,
1✔
1273
                                Bifrost:  handler.Bifrost(),
1✔
1274
                                Database: dbRouter,
1✔
1275
                                Metrics:  metricsReader,
1✔
1276
                                Heimdall: heimdallInvoker, // Now plugins can call p.ctx.Heimdall.SendPrompt()
1✔
1277
                        }
1✔
1278
                        subsystemMgr.SetContext(subsystemCtx)
1✔
1279

1✔
1280
                        // Load plugins from configured directories.
1✔
1281
                        if config.PluginsDir != "" {
2✔
1282
                                heimdallLog.Debug("loading APOC plugins", "dir", config.PluginsDir)
1✔
1283
                                if err := nornicdb.LoadPluginsFromDir(config.PluginsDir, &subsystemCtx); err != nil {
1✔
1284
                                        heimdallLog.Warn("failed to load APOC plugins", "dir", config.PluginsDir, "error", err)
×
1285
                                }
×
1286
                        }
1287
                        if config.HeimdallPluginsDir != "" && config.HeimdallPluginsDir != config.PluginsDir {
2✔
1288
                                heimdallLog.Debug("loading Heimdall plugins", "dir", config.HeimdallPluginsDir)
1✔
1289
                                if err := nornicdb.LoadPluginsFromDir(config.HeimdallPluginsDir, &subsystemCtx); err != nil {
1✔
1290
                                        heimdallLog.Warn("failed to load Heimdall plugins", "dir", config.HeimdallPluginsDir, "error", err)
×
1291
                                }
×
1292
                        } else if config.HeimdallPluginsDir == "" {
2✔
1293
                                heimdallLog.Debug("heimdall plugins dir is empty")
1✔
1294
                        } else {
1✔
1295
                                heimdallLog.Debug("heimdall plugins dir same as plugins dir; skipping",
×
1296
                                        "heimdall_dir", config.HeimdallPluginsDir,
×
1297
                                        "plugins_dir", config.PluginsDir)
×
1298
                        }
×
1299

1300
                        s.setHeimdallHandler(handler)
1✔
1301

1✔
1302
                        plugins := heimdall.ListHeimdallPlugins()
1✔
1303
                        actions := heimdall.ListHeimdallActions()
1✔
1304
                        heimdallLog.Info("heimdall AI assistant ready",
1✔
1305
                                "model", heimdallCfg.Model,
1✔
1306
                                "plugins_loaded", len(plugins),
1✔
1307
                                "actions_available", len(actions),
1✔
1308
                                "bifrost_chat_route", "/api/bifrost/chat/completions",
1✔
1309
                                "status_route", "/api/bifrost/status",
1✔
1310
                        )
1✔
1311
                        if len(plugins) == 0 {
2✔
1312
                                heimdallLog.Warn("no heimdall plugins loaded — watcher logs will be absent",
1✔
1313
                                        "remediation", "ensure a .so exists in HeimdallPluginsDir")
1✔
1314
                        }
1✔
1315
                        for _, actionName := range actions {
1✔
1316
                                heimdallLog.Debug("heimdall action registered", "action", actionName)
×
1317
                        }
×
1318
                }()
1319
        } else {
1✔
1320
                s.log.Info("heimdall AI assistant disabled",
1✔
1321
                        "subsystem", "heimdall",
1✔
1322
                        "override_env", "NORNICDB_HEIMDALL_ENABLED")
1✔
1323
        }
1✔
1324

1325
        // Independent search rerank (Stage-2 reranking, not tied to Heimdall).
1326
        // Supports local (GGUF, like embeddings) or external provider (ollama/openai/http),
1327
        // similar to Heimdall and embeddings.
1328
        if featuresConfig.SearchRerankEnabled {
2✔
1329
                provider := strings.TrimSpace(strings.ToLower(featuresConfig.SearchRerankProvider))
1✔
1330
                if provider == "" {
1✔
1331
                        provider = "local"
×
1332
                }
×
1333

1334
                if provider == "local" {
2✔
1335
                        // Load GGUF into memory (same pattern as embedding model).
1✔
1336
                        modelsDir := config.ModelsDir
1✔
1337
                        if modelsDir == "" {
1✔
1338
                                modelsDir = "./models"
×
1339
                        }
×
1340
                        modelName := featuresConfig.SearchRerankModel
1✔
1341
                        if modelName == "" {
1✔
1342
                                modelName = "bge-reranker-v2-m3-Q4_K_M.gguf"
×
1343
                        }
×
1344
                        if !strings.HasSuffix(modelName, ".gguf") {
2✔
1345
                                modelName = modelName + ".gguf"
1✔
1346
                        }
1✔
1347
                        modelPath := filepath.Join(modelsDir, modelName)
1✔
1348

1✔
1349
                        rerankLog := s.log.With("subsystem", "search_rerank")
1✔
1350
                        rerankLog.Info("loading search reranker model",
1✔
1351
                                "provider", "local",
1✔
1352
                                "model_path", modelPath,
1✔
1353
                                "note", "server starts immediately; reranking available after model loads")
1✔
1354

1✔
1355
                        go func() {
2✔
1356
                                opts := localllm.DefaultOptions(modelPath)
1✔
1357
                                opts.GPULayers = -1
1✔
1358
                                // Apply rerank context features from config
1✔
1359
                                if config.Features != nil {
2✔
1360
                                        if config.Features.RerankCtxType != 0 {
1✔
1361
                                                opts.Features.CtxType = config.Features.RerankCtxType
×
1362
                                        }
×
1363
                                        if config.Features.RerankPoolingType != 0 {
1✔
1364
                                                opts.Features.PoolingType = config.Features.RerankPoolingType
×
1365
                                        }
×
1366
                                        if config.Features.RerankAttentionType != 0 {
1✔
1367
                                                opts.Features.AttentionType = config.Features.RerankAttentionType
×
1368
                                        }
×
1369
                                        if config.Features.RerankFlashAttn != 0 {
1✔
1370
                                                opts.Features.FlashAttn = config.Features.RerankFlashAttn
×
1371
                                        }
×
1372
                                }
1373
                                rerankerModel, err := localllm.LoadRerankerModel(opts)
1✔
1374
                                if err != nil {
2✔
1375
                                        rerankLog.Warn("search reranker model unavailable; stage-2 reranking disabled, RRF order only",
1✔
1376
                                                "error", err)
1✔
1377
                                        return
1✔
1378
                                }
1✔
1379

1380
                                ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
×
1381
                                _, healthErr := rerankerModel.Score(ctx, "health", "check")
×
1382
                                cancel()
×
1383
                                if healthErr != nil {
×
1384
                                        rerankerModel.Close()
×
1385
                                        rerankLog.Warn("search reranker failed health check", "error", healthErr)
×
1386
                                        return
×
1387
                                }
×
1388

1389
                                cfg := search.DefaultLocalRerankerConfig()
×
1390
                                cfg.Enabled = true
×
1391
                                r := search.NewLocalReranker(rerankerModel, cfg)
×
1392
                                db.SetSearchReranker(r)
×
1393
                                setGlobalRerankerResolver(func(string) search.Reranker { return r })
×
1394
                                rerankLog.Info("search reranker ready (stage-2 reranking enabled)",
×
1395
                                        "model", modelName)
×
1396
                        }()
1397
                } else {
1✔
1398
                        // External provider: use HTTP rerank API (Cohere, HuggingFace TEI, Ollama adapter, etc.).
1✔
1399
                        apiURL := strings.TrimSpace(featuresConfig.SearchRerankAPIURL)
1✔
1400
                        if apiURL == "" {
2✔
1401
                                if provider == "ollama" {
2✔
1402
                                        apiURL = "http://localhost:11434/rerank"
1✔
1403
                                }
1✔
1404
                        }
1405
                        if apiURL == "" {
2✔
1406
                                s.log.Warn("search rerank enabled but API URL not set; stage-2 reranking disabled",
1✔
1407
                                        "subsystem", "search_rerank",
1✔
1408
                                        "provider", provider,
1✔
1409
                                        "required_env", "NORNICDB_SEARCH_RERANK_API_URL")
1✔
1410
                        } else {
2✔
1411
                                ceConfig := &search.CrossEncoderConfig{
1✔
1412
                                        Enabled:  true,
1✔
1413
                                        APIURL:   apiURL,
1✔
1414
                                        APIKey:   featuresConfig.SearchRerankAPIKey,
1✔
1415
                                        Model:    featuresConfig.SearchRerankModel,
1✔
1416
                                        TopK:     100,
1✔
1417
                                        Timeout:  30 * time.Second,
1✔
1418
                                        MinScore: 0.0,
1✔
1419
                                }
1✔
1420
                                if ceConfig.Model == "" && provider == "ollama" {
2✔
1421
                                        ceConfig.Model = "reranker"
1✔
1422
                                }
1✔
1423
                                ce := search.NewCrossEncoder(ceConfig)
1✔
1424
                                db.SetSearchReranker(ce)
1✔
1425
                                setGlobalRerankerResolver(func(string) search.Reranker { return ce })
1✔
1426
                                s.log.Info("search reranker ready (stage-2 reranking enabled)",
1✔
1427
                                        "subsystem", "search_rerank",
1✔
1428
                                        "provider", provider,
1✔
1429
                                        "url", apiURL)
1✔
1430
                        }
1431
                }
1432
        } else {
1✔
1433
                s.log.Info("search rerank disabled",
1✔
1434
                        "subsystem", "search_rerank",
1✔
1435
                        "override_env", "NORNICDB_SEARCH_RERANK_ENABLED")
1✔
1436
        }
1✔
1437

1438
        // Configure embeddings if enabled
1439
        // Local provider doesn't need API URL, others do
1440
        embeddingsReady := config.EmbeddingEnabled && (config.EmbeddingProvider == "local" || config.EmbeddingAPIURL != "")
1✔
1441
        if embeddingsReady {
2✔
1442
                embedConfig := &embed.Config{
1✔
1443
                        Provider:      config.EmbeddingProvider,
1✔
1444
                        APIURL:        config.EmbeddingAPIURL,
1✔
1445
                        APIKey:        config.EmbeddingAPIKey,
1✔
1446
                        Model:         config.EmbeddingModel,
1✔
1447
                        Dimensions:    config.EmbeddingDimensions,
1✔
1448
                        ModelsDir:     config.ModelsDir,
1✔
1449
                        Timeout:       30 * time.Second,
1✔
1450
                        CtxType:       config.EmbeddingCtxType,
1✔
1451
                        PoolingType:   config.EmbeddingPoolingType,
1✔
1452
                        AttentionType: config.EmbeddingAttentionType,
1✔
1453
                        FlashAttn:     config.EmbeddingFlashAttn,
1✔
1454
                }
1✔
1455

1✔
1456
                // Set API path based on provider (only for remote providers)
1✔
1457
                switch config.EmbeddingProvider {
1✔
1458
                case "ollama":
1✔
1459
                        embedConfig.APIPath = "/api/embeddings"
1✔
1460
                case "openai":
1✔
1461
                        embedConfig.APIPath = "/v1/embeddings"
1✔
1462
                case "local":
1✔
1463
                        // Local provider doesn't need API path
1464
                default:
×
1465
                        // Default to Ollama format
×
1466
                        embedConfig.APIPath = "/api/embeddings"
×
1467
                }
1468

1469
                // Initialize embeddings asynchronously to prevent startup blocking
1470
                // Local GGUF models can take 5-30 seconds to load (graph compilation)
1471
                embedInitLog := s.log.With("subsystem", "embed_init")
1✔
1472
                embedInitLog.Info("loading embedding model",
1✔
1473
                        "model", embedConfig.Model,
1✔
1474
                        "provider", embedConfig.Provider,
1✔
1475
                        "note", "server starts immediately; embeddings available after model loads")
1✔
1476

1✔
1477
                go func() {
2✔
1478
                        // Retry forever: exponential backoff to 5m, then fixed 5m interval.
1✔
1479
                        const (
1✔
1480
                                initialBackoff = 2 * time.Second
1✔
1481
                                maxBackoff     = 5 * time.Minute
1✔
1482
                        )
1✔
1483

1✔
1484
                        backoff := initialBackoff
1✔
1485
                        attempt := 0
1✔
1486

1✔
1487
                        for {
2✔
1488
                                if s.closed.Load() {
1✔
1489
                                        embedInitLog.Info("embedding init retry loop stopped: server shutting down")
×
1490
                                        return
×
1491
                                }
×
1492

1493
                                attempt++
1✔
1494

1✔
1495
                                // Use factory function for all providers.
1✔
1496
                                embedder, err := embed.NewEmbedder(embedConfig)
1✔
1497
                                if err == nil {
2✔
1498
                                        // Health check: test embedding before enabling.
1✔
1499
                                        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1✔
1500
                                        _, healthErr := embedder.Embed(ctx, "health check")
1✔
1501
                                        cancel()
1✔
1502
                                        if healthErr != nil {
2✔
1503
                                                err = fmt.Errorf("health check failed: %w", healthErr)
1✔
1504
                                        }
1✔
1505
                                }
1506

1507
                                if err == nil {
2✔
1508
                                        // Wrap with caching if enabled (default: 10K cache).
1✔
1509
                                        if config.EmbeddingCacheSize > 0 {
2✔
1510
                                                embedder = embed.NewCachedEmbedder(embedder, config.EmbeddingCacheSize)
1✔
1511
                                                embedInitLog.Info("embedding cache enabled",
1✔
1512
                                                        "entries", config.EmbeddingCacheSize,
1✔
1513
                                                        "memory_mb", embeddingCacheMemoryMB(config.EmbeddingCacheSize, config.EmbeddingDimensions))
1✔
1514
                                        }
1✔
1515

1516
                                        if config.EmbeddingProvider == "local" {
1✔
1517
                                                embedInitLog.Info("embeddings ready",
×
1518
                                                        "provider", "local_gguf",
×
1519
                                                        "model", config.EmbeddingModel,
×
1520
                                                        "dims", config.EmbeddingDimensions)
×
1521
                                        } else {
1✔
1522
                                                embedInitLog.Info("embeddings ready",
1✔
1523
                                                        "provider", config.EmbeddingProvider,
1✔
1524
                                                        "url", config.EmbeddingAPIURL,
1✔
1525
                                                        "model", config.EmbeddingModel,
1✔
1526
                                                        "dims", config.EmbeddingDimensions)
1✔
1527
                                        }
1✔
1528

1529
                                        if mcpServer != nil {
2✔
1530
                                                mcpServer.SetEmbedder(embedder)
1✔
1531
                                        }
1✔
1532
                                        // Share embedder with DB for auto-embed queue.
1533
                                        // The embed worker will wait for this to be set before processing.
1534
                                        db.SetEmbedder(embedder)
1✔
1535
                                        // Register as default for per-DB embedder registry (no-op if SetEmbedConfigForDB was not set).
1✔
1536
                                        db.SetDefaultEmbedConfig(embedConfig)
1✔
1537
                                        return
1✔
1538
                                }
1539

1540
                                if config.EmbeddingProvider == "local" {
2✔
1541
                                        embedInitLog.Warn("embedding init attempt failed",
1✔
1542
                                                "attempt", attempt,
1✔
1543
                                                "provider", "local",
1✔
1544
                                                "model", config.EmbeddingModel,
1✔
1545
                                                "error", err)
1✔
1546
                                } else {
2✔
1547
                                        embedInitLog.Warn("embedding init attempt failed",
1✔
1548
                                                "attempt", attempt,
1✔
1549
                                                "provider", config.EmbeddingProvider,
1✔
1550
                                                "model", config.EmbeddingModel,
1✔
1551
                                                "url", config.EmbeddingAPIURL,
1✔
1552
                                                "error", err)
1✔
1553
                                }
1✔
1554

1555
                                if backoff < maxBackoff {
2✔
1556
                                        embedInitLog.Info("retrying embedding init (exponential backoff)", "wait", backoff)
1✔
1557
                                        if !waitForDurationOrServerClose(s, backoff) {
2✔
1558
                                                embedInitLog.Info("embedding init retry interrupted by server shutdown")
1✔
1559
                                                return
1✔
1560
                                        }
1✔
1561
                                        backoff *= 2
1✔
1562
                                        if backoff > maxBackoff {
1✔
1563
                                                backoff = maxBackoff
×
1564
                                        }
×
1565
                                } else {
×
1566
                                        embedInitLog.Info("embedding init retry interval capped; continuing periodic retries",
×
1567
                                                "interval", maxBackoff)
×
1568
                                        if !waitForDurationOrServerClose(s, maxBackoff) {
×
1569
                                                embedInitLog.Info("embedding init retry interrupted by server shutdown")
×
1570
                                                return
×
1571
                                        }
×
1572
                                }
1573
                        }
1574
                }()
1575
        }
1576

1577
        // Log authentication status
1578
        if authenticator == nil || !authenticator.IsSecurityEnabled() {
2✔
1579
                s.log.Warn("authentication disabled")
1✔
1580
        }
1✔
1581

1582
        // Initialize rate limiter if enabled
1583
        var rateLimiter *IPRateLimiter
1✔
1584
        if config.RateLimitEnabled {
2✔
1585
                rateLimiter = NewIPRateLimiter(config.RateLimitPerMinute, config.RateLimitPerHour, config.RateLimitBurst)
1✔
1586
                s.log.Info("rate limiting enabled",
1✔
1587
                        "per_minute", config.RateLimitPerMinute,
1✔
1588
                        "per_hour", config.RateLimitPerHour,
1✔
1589
                        "scope", "per_ip")
1✔
1590
        }
1✔
1591
        s.rateLimiter = rateLimiter
1✔
1592

1✔
1593
        // So EmbeddingCount() aggregates across all databases (not just default)
1✔
1594
        s.db.SetAllDatabasesProvider(func() []nornicdb.DatabaseAndStorage {
2✔
1595
                var out []nornicdb.DatabaseAndStorage
1✔
1596
                for _, info := range s.dbManager.ListDatabases() {
2✔
1597
                        if info.Name == "system" {
2✔
1598
                                continue
1✔
1599
                        }
1600
                        isComposite := s.dbManager.IsCompositeDatabase(info.Name)
1✔
1601
                        if isComposite {
1✔
1602
                                continue
×
1603
                        }
1604
                        storageEngine, err := s.dbManager.GetStorage(info.Name)
1✔
1605
                        if err != nil {
1✔
1606
                                continue
×
1607
                        }
1608
                        out = append(out, nornicdb.DatabaseAndStorage{
1✔
1609
                                Name:        info.Name,
1✔
1610
                                Storage:     storageEngine,
1✔
1611
                                IsComposite: isComposite,
1✔
1612
                        })
1✔
1613
                }
1614
                return out
1✔
1615
        })
1616

1617
        // Reconcile search-service startup for metadata-only or late-created databases.
1618
        // DB.Open() warms namespaces present in storage; this loop ensures known DB metadata
1619
        // also gets initialized, and keeps doing so without requiring first-search triggers.
1620
        s.ensureSearchBuildStartedForKnownDatabases()
1✔
1621
        go func() {
2✔
1622
                ticker := time.NewTicker(2 * time.Second)
1✔
1623
                defer ticker.Stop()
1✔
1624
                for {
2✔
1625
                        if s.closed.Load() {
2✔
1626
                                return
1✔
1627
                        }
1✔
1628
                        s.ensureSearchBuildStartedForKnownDatabases()
1✔
1629
                        <-ticker.C
1✔
1630
                }
1631
        }()
1632

1633
        // Wire MCP to use per-database executors when invoked from the agentic loop (so link/store/recall use the request's database)
1634
        if mcpServer != nil && dbManager != nil {
2✔
1635
                mcpServer.SetDatabaseScopedExecutor(s.mcpDatabaseScopedExecutor())
1✔
1636
                mcpServer.SetDatabaseScopedStorage(func(dbName string) (storage.Engine, error) {
1✔
1637
                        return s.dbManager.GetStorage(dbName)
×
1638
                })
×
1639
        }
1640

1641
        // Initialize OAuth manager if authenticator is available
1642
        if authenticator != nil {
2✔
1643
                s.oauthManager = auth.NewOAuthManager(authenticator)
1✔
1644
        }
1✔
1645

1646
        // Per-database access: Full when auth disabled; when auth enabled, DenyAll until allowlist resolves.
1647
        if authenticator == nil || !authenticator.IsSecurityEnabled() {
2✔
1648
                s.databaseAccessMode = auth.FullDatabaseAccessMode
1✔
1649
        } else {
2✔
1650
                s.databaseAccessMode = auth.DenyAllDatabaseAccessMode
1✔
1651
        }
1✔
1652

1653
        // Load RBAC stores from system DB when available so roles/allowlist/privileges/entitlements APIs
1654
        // work even with --no-auth (e.g. fetch roles, configure RBAC before enabling auth).
1655
        if systemStorage, err := dbManager.GetStorage("system"); err == nil {
2✔
1656
                ctx := context.Background()
1✔
1657
                roleStore := auth.NewRoleStore(systemStorage)
1✔
1658
                if loadErr := roleStore.Load(ctx); loadErr != nil {
1✔
1659
                        s.log.Warn("failed to load RBAC roles", "subsystem", "rbac", "error", loadErr)
×
1660
                } else {
1✔
1661
                        s.roleStore = roleStore
1✔
1662
                }
1✔
1663
                allowlistStore := auth.NewAllowlistStore(systemStorage)
1✔
1664
                if loadErr := allowlistStore.Load(ctx); loadErr != nil {
1✔
1665
                        s.log.Warn("failed to load RBAC allowlist", "subsystem", "rbac", "error", loadErr)
×
1666
                } else {
1✔
1667
                        dbList := make([]string, 0, len(dbManager.ListDatabases()))
1✔
1668
                        for _, info := range dbManager.ListDatabases() {
2✔
1669
                                dbList = append(dbList, info.Name)
1✔
1670
                        }
1✔
1671
                        if seedErr := allowlistStore.SeedIfEmpty(ctx, dbList); seedErr != nil {
1✔
1672
                                s.log.Warn("failed to seed RBAC allowlist", "subsystem", "rbac", "error", seedErr)
×
1673
                        }
×
1674
                        s.allowlistStore = allowlistStore
1✔
1675
                }
1676
                privilegesStore := auth.NewPrivilegesStore(systemStorage)
1✔
1677
                if loadErr := privilegesStore.Load(ctx); loadErr != nil {
1✔
1678
                        s.log.Warn("failed to load RBAC privileges", "subsystem", "rbac", "error", loadErr)
×
1679
                } else {
1✔
1680
                        s.privilegesStore = privilegesStore
1✔
1681
                }
1✔
1682
                roleEntitlementsStore := auth.NewRoleEntitlementsStore(systemStorage)
1✔
1683
                if loadErr := roleEntitlementsStore.Load(ctx); loadErr != nil {
1✔
1684
                        s.log.Warn("failed to load RBAC role entitlements", "subsystem", "rbac", "error", loadErr)
×
1685
                } else {
1✔
1686
                        s.roleEntitlementsStore = roleEntitlementsStore
1✔
1687
                }
1✔
1688
                dbConfigStore := dbconfig.NewStore(systemStorage)
1✔
1689
                // Seed yaml-declared per-DB overrides on first boot. After the
1✔
1690
                // first successful seed, admin-API edits are authoritative across
1✔
1691
                // restarts (LoadWithYAMLDefaults skips keys already in the store).
1✔
1692
                // Falls back to plain Load when no yaml overrides are present.
1✔
1693
                yamlOverrides := s.perDBYAMLOverrides
1✔
1694
                if loadErr := dbConfigStore.LoadWithYAMLDefaults(ctx, yamlOverrides); loadErr != nil {
1✔
1695
                        s.log.Warn("failed to load per-DB config store", "subsystem", "dbconfig", "error", loadErr)
×
1696
                } else {
1✔
1697
                        s.dbConfigStore = dbConfigStore
1✔
1698
                        globalConfig := nornicConfig.LoadFromEnv()
1✔
1699
                        db.SetDbConfigResolver(func(dbName string) (int, float64, string) {
2✔
1700
                                overrides := dbConfigStore.GetOverrides(dbName)
1✔
1701
                                r := dbconfig.Resolve(globalConfig, overrides)
1✔
1702
                                if r == nil {
1✔
1703
                                        return 0, 0, ""
×
1704
                                }
×
1705
                                return r.EmbeddingDimensions, r.SearchMinSimilarity, r.BM25Engine
1✔
1706
                        })
1707
                        db.SetDbSearchFlagsResolver(func(dbName string) (bool, bool, string, string) {
2✔
1708
                                overrides := dbConfigStore.GetOverrides(dbName)
1✔
1709
                                r := dbconfig.Resolve(globalConfig, overrides)
1✔
1710
                                if r == nil {
1✔
1711
                                        return true, true, "startup", "startup"
×
1712
                                }
×
1713
                                return r.BM25Enabled, r.VectorEnabled, r.BM25Warming, r.VectorWarming
1✔
1714
                        })
1715
                        // Per-DB embedder registry: resolve embed config per database for EmbedQueryForDB.
1716
                        db.SetEmbedConfigForDB(func(dbName string) (*embed.Config, error) {
2✔
1717
                                overrides := dbConfigStore.GetOverrides(dbName)
1✔
1718
                                r := dbconfig.Resolve(globalConfig, overrides)
1✔
1719
                                if r == nil || r.Effective == nil {
1✔
1720
                                        return nil, nil
×
1721
                                }
×
1722
                                return buildEmbedConfigFromResolved(r.Effective, config), nil
1✔
1723
                        })
1724
                }
1725
        }
1726
        // Release the search-index warmup gate. nornicdb.Open holds the
1727
        // warmup goroutine at a sync point until this is called so it can
1728
        // resolve per-DB flags through the resolver wired above instead of
1729
        // racing with it. Default DB and named DBs go through the same
1730
        // resolution path; per-DB overrides (YAML databases: block, admin
1731
        // API store) apply uniformly. If wiring above failed (e.g. system
1732
        // storage unavailable) the gate is still released — warmup falls
1733
        // through to db.config.Memory.Search* + the (true,true,startup)
1734
        // final fallback, which matches today's behaviour for misconfigured
1735
        // systems.
1736
        db.MarkSearchWarmupReady()
1✔
1737

1✔
1738
        // Initialize slow query logger if file specified.
1✔
1739
        // D-04d collapse: threshold + log file path read from the canonical
1✔
1740
        // pkg/config.LoggingConfig snapshot threaded via Config.Logging.
1✔
1741
        if config.SlowQueryEnabled && config.Logging.SlowQueryLogFile != "" {
2✔
1742
                file, err := os.OpenFile(config.Logging.SlowQueryLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
1✔
1743
                if err != nil {
2✔
1744
                        s.log.Warn("failed to open slow query log file",
1✔
1745
                                "subsystem", "slow_query",
1✔
1746
                                "file", config.Logging.SlowQueryLogFile,
1✔
1747
                                "error", err)
1✔
1748
                } else {
2✔
1749
                        s.slowQueryLogger = log.New(file, "", log.LstdFlags)
1✔
1750
                        s.log.Info("slow query logging configured",
1✔
1751
                                "subsystem", "slow_query",
1✔
1752
                                "file", config.Logging.SlowQueryLogFile,
1✔
1753
                                "threshold", config.Logging.SlowQueryThreshold)
1✔
1754
                }
1✔
1755
        } else if config.SlowQueryEnabled {
1✔
1756
                s.log.Info("slow query logging enabled",
×
1757
                        "subsystem", "slow_query",
×
1758
                        "threshold", config.Logging.SlowQueryThreshold)
×
1759
        }
×
1760

1761
        return s, nil
1✔
1762
}
1763

1764
// SetHTTPMetrics injects the Plan-04-02 HTTP catalog bag (D-02 typed
1765
// handle DI). MUST be called BEFORE Start() — once the http.Server's
1766
// Handler is wired in Start(), the wrapper is fixed for the server
1767
// lifetime. Callers (cmd/nornicdb/main.go) inject after observability.New
1768
// returns the registry, then call Start().
1769
//
1770
// Nil-safe: passing nil is equivalent to never calling — instrumentedMux
1771
// is a pass-through. Test fixtures and pre-Phase-4 callers compile and
1772
// run unchanged.
1773
func (s *Server) SetHTTPMetrics(m *observability.HTTPMetrics) {
1✔
1774
        s.mu.Lock()
1✔
1775
        defer s.mu.Unlock()
1✔
1776
        s.httpMetrics = m
1✔
1777
}
1✔
1778

1779
// SetObsRegistry plumbs the unified prometheus registry from
1780
// observability.New into the server so handleMetrics can call
1781
// observability.RenderLegacy. Phase 5 / Plan 05-04. Mirrors the
1782
// SetHTTPMetrics pattern (mu.Lock + assign + unlock).
1783
//
1784
// Nil-safe: passing nil is equivalent to never calling — handleMetrics
1785
// tolerates a nil registry by emitting empty body bytes (RenderLegacy
1786
// contract). Test fixtures and pre-Phase-5 callers compile and run
1787
// unchanged.
1788
func (s *Server) SetObsRegistry(reg *prometheus.Registry) {
1✔
1789
        s.mu.Lock()
1✔
1790
        defer s.mu.Unlock()
1✔
1791
        s.obsRegistry = reg
1✔
1792
}
1✔
1793

1794
// SetAuditLogger sets the audit logger for compliance logging.
1795
func (s *Server) SetAuditLogger(logger *audit.Logger) {
1✔
1796
        s.mu.Lock()
1✔
1797
        defer s.mu.Unlock()
1✔
1798
        s.audit = logger
1✔
1799
        if s.db != nil {
2✔
1800
                s.db.SetRetentionAuditCallback(func(action, recordID, category string) {
1✔
1801
                        if s.audit == nil {
×
1802
                                return
×
1803
                        }
×
1804
                        _ = s.audit.LogDataAccess("system", "retention-manager", "node", recordID, action, true, category)
×
1805
                })
1806
        }
1807
}
1808

1809
func (s *Server) setHeimdallHandler(handler *heimdall.Handler) {
1✔
1810
        s.mu.Lock()
1✔
1811
        s.heimdallHandler = handler
1✔
1812
        s.mu.Unlock()
1✔
1813
}
1✔
1814

1815
func (s *Server) getHeimdallHandler() *heimdall.Handler {
1✔
1816
        s.mu.RLock()
1✔
1817
        defer s.mu.RUnlock()
1✔
1818
        return s.heimdallHandler
1✔
1819
}
1✔
1820

1821
// Start begins listening for HTTP connections on the configured address and port.
1822
//
1823
// The server starts in a separate goroutine, so this method returns immediately
1824
// after successfully binding to the port. Use Addr() to get the actual listening
1825
// address after starting.
1826
//
1827
// Returns:
1828
//   - nil if server started successfully
1829
//   - Error if failed to bind to port or server is already closed
1830
//
1831
// Example:
1832
//
1833
//        server := server.New(db, auth, config)
1834
//
1835
//        if err := server.Start(); err != nil {
1836
//                log.Fatalf("Failed to start server: %v", err)
1837
//        }
1838
//
1839
//        // Server started on server.Addr()
1840
//
1841
//        // Server is now accepting connections
1842
//        // Keep main goroutine alive
1843
//        select {}
1844
//
1845
// TLS Support:
1846
//
1847
//        If TLSCertFile and TLSKeyFile are configured, the server automatically
1848
//        starts with HTTPS. Otherwise, it uses HTTP.
1849
func (s *Server) Start() error {
1✔
1850
        if s.closed.Load() {
2✔
1851
                return ErrServerClosed
1✔
1852
        }
1✔
1853

1854
        addr := fmt.Sprintf("%s:%d", s.config.Address, s.config.Port)
1✔
1855
        listener, err := net.Listen("tcp", addr)
1✔
1856
        if err != nil {
1✔
1857
                return fmt.Errorf("failed to listen on %s: %w", addr, err)
×
1858
        }
×
1859

1860
        s.listener = listener
1✔
1861
        s.started = time.Now()
1✔
1862

1✔
1863
        // Build router
1✔
1864
        mux := s.buildRouter()
1✔
1865

1✔
1866
        // Plan 04-02 D-03: instrumentedMux is the SOLE HTTP observation
1✔
1867
        // chokepoint per AGENTS.md §7 DRY. It wraps mux.ServeHTTP, reading
1✔
1868
        // `r.Pattern` post-dispatch (Go 1.22+ stdlib field) so path_template
1✔
1869
        // values come from the closed route table — never from r.URL.Path
1✔
1870
        // (cardinality bomb). Nil-safe: when s.httpMetrics is nil (test
1✔
1871
        // fixtures, pre-Phase-4 callers), the wrapper is a pass-through.
1✔
1872
        // Panic-safe: handler panics still emit a 5xx observation before
1✔
1873
        // re-propagating (T-04-08).
1✔
1874
        instrumented := instrumentedMux(mux, s.httpMetrics)
1✔
1875

1✔
1876
        s.requestsCtx, s.cancelRequestsCtx = context.WithCancel(context.Background())
1✔
1877
        s.httpServer = &http.Server{
1✔
1878
                Handler:      instrumented,
1✔
1879
                ReadTimeout:  s.config.ReadTimeout,
1✔
1880
                WriteTimeout: s.config.WriteTimeout,
1✔
1881
                IdleTimeout:  s.config.IdleTimeout,
1✔
1882
                // BaseContext links every request's r.Context() to the server's
1✔
1883
                // shutdown signal. When Stop() cancels requestsCtx, all in-flight
1✔
1884
                // handlers see ctx.Err() != nil on their next cancellation probe and
1✔
1885
                // unwind, so http.Server.Shutdown returns instead of waiting on a
1✔
1886
                // long-running BFS.
1✔
1887
                BaseContext: func(net.Listener) context.Context { return s.requestsCtx },
2✔
1888
        }
1889

1890
        // Configure HTTP/2 (always enabled, backwards compatible with HTTP/1.1)
1891
        http2Config := &http2.Server{
1✔
1892
                MaxConcurrentStreams: s.config.HTTP2MaxConcurrentStreams,
1✔
1893
        }
1✔
1894

1✔
1895
        if s.config.TLSCertFile != "" && s.config.TLSKeyFile != "" {
1✔
1896
                // HTTPS mode: HTTP/2 is automatically enabled via ALPN
×
1897
                // Configure HTTP/2 settings for TLS connections
×
1898
                if err := http2.ConfigureServer(s.httpServer, http2Config); err != nil {
×
1899
                        return fmt.Errorf("failed to configure HTTP/2 for TLS: %w", err)
×
1900
                }
×
1901
                s.log.Info("HTTP/2 enabled", "mode", "https")
×
1902
        } else {
1✔
1903
                // HTTP mode: Use h2c (HTTP/2 cleartext) for backwards compatibility
1✔
1904
                // h2c allows HTTP/2 over plain TCP, falling back to HTTP/1.1 for older clients
1✔
1905
                // Wrap the INSTRUMENTED mux (not bare mux) so observation runs
1✔
1906
                // inside the h2c transport adapter (Plan 04-02 D-03).
1✔
1907
                s.httpServer.Handler = h2c.NewHandler(instrumented, http2Config)
1✔
1908
                s.log.Info("HTTP/2 enabled", "mode", "h2c_cleartext", "compat", "http/1.1")
1✔
1909
        }
1✔
1910

1911
        // Start serving
1912
        go func() {
2✔
1913
                var err error
1✔
1914
                if s.config.TLSCertFile != "" && s.config.TLSKeyFile != "" {
1✔
1915
                        err = s.httpServer.ServeTLS(listener, s.config.TLSCertFile, s.config.TLSKeyFile)
×
1916
                } else {
1✔
1917
                        err = s.httpServer.Serve(listener)
1✔
1918
                }
1✔
1919
                if err != nil && err != http.ErrServerClosed {
1✔
1920
                        // Log error but don't crash
×
1921
                        s.log.Error("http server error", "error", err)
×
1922
                }
×
1923
        }()
1924

1925
        // Optional gRPC endpoints (feature-flagged).
1926
        if err := s.startQdrantGRPC(); err != nil {
2✔
1927
                _ = s.httpServer.Shutdown(context.Background())
1✔
1928
                return err
1✔
1929
        }
1✔
1930

1931
        return nil
1✔
1932
}
1933

1934
// Stop gracefully shuts down the server.
1935
func (s *Server) Stop(ctx context.Context) error {
1✔
1936
        if !s.closed.CompareAndSwap(false, true) {
2✔
1937
                return nil // Already closed
1✔
1938
        }
1✔
1939

1940
        s.stopQdrantGRPC()
1✔
1941

1✔
1942
        // Stop rate limiter cleanup goroutine
1✔
1943
        if s.rateLimiter != nil {
2✔
1944
                s.rateLimiter.Stop()
1✔
1945
        }
1✔
1946

1947
        if s.httpServer == nil {
2✔
1948
                return nil
1✔
1949
        }
1✔
1950

1951
        // Cancel the BaseContext handed to all in-flight requests so handlers that
1952
        // honour ctx (Cypher BFS / shortestPath traversals, etc.) abandon work
1953
        // and let http.Server.Shutdown drain promptly. Without this, an unbounded
1954
        // shortestPath traversal could hold shutdown open for the duration of the
1955
        // configured WriteTimeout.
1956
        if s.cancelRequestsCtx != nil {
2✔
1957
                s.cancelRequestsCtx()
1✔
1958
        }
1✔
1959

1960
        // Hard-bound shutdown: even if net/http Shutdown fails to return at ctx deadline
1961
        // (e.g., a stuck handler or an internal deadlock), Stop must return so callers
1962
        // can exit deterministically.
1963
        shutdownDone := make(chan error, 1)
1✔
1964
        go func() {
2✔
1965
                shutdownDone <- s.httpServer.Shutdown(ctx)
1✔
1966
        }()
1✔
1967

1968
        select {
1✔
1969
        case err := <-shutdownDone:
1✔
1970
                if err != nil && (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) {
1✔
1971
                        _ = s.httpServer.Close()
×
1972
                }
×
1973
                return err
1✔
1974
        case <-ctx.Done():
1✔
1975
                _ = s.httpServer.Close()
1✔
1976
                return ctx.Err()
1✔
1977
        }
1978
}
1979

1980
// Addr returns the server's listen address.
1981
func (s *Server) Addr() string {
1✔
1982
        if s.listener != nil {
2✔
1983
                return s.listener.Addr().String()
1✔
1984
        }
1✔
1985
        return ""
1✔
1986
}
1987

1988
// Stats returns current server runtime statistics.
1989
//
1990
// Statistics are updated in real-time by middleware and include:
1991
//   - Uptime since server start
1992
//   - Total request count
1993
//   - Total error count
1994
//   - Currently active requests
1995
//
1996
// Example:
1997
//
1998
//        stats := server.Stats()
1999
//        // stats.Uptime: server uptime
2000
//        // stats.RequestCount: total requests
2001
//        // stats.ErrorCount / stats.RequestCount: error rate
2002
//        // stats.ActiveRequests: in-flight requests
2003
//
2004
//        // Use for monitoring/alerting
2005
//        if stats.ErrorCount > 1000 {
2006
//                alert("High error count detected")
2007
//        }
2008
//
2009
// Thread-safe: Can be called concurrently from multiple goroutines.
2010
func (s *Server) Stats() ServerStats {
1✔
2011
        return ServerStats{
1✔
2012
                Uptime:         time.Since(s.started),
1✔
2013
                RequestCount:   s.requestCount.Load(),
1✔
2014
                ErrorCount:     s.errorCount.Load(),
1✔
2015
                ActiveRequests: s.activeRequests.Load(),
1✔
2016
                Version:        buildinfo.Version(),
1✔
2017
                Commit:         buildinfo.ShortCommit(),
1✔
2018
                BuildTime:      buildinfo.BuildTime,
1✔
2019
        }
1✔
2020
}
1✔
2021

2022
// ServerStats is the snapshot of server metrics and build metadata
// returned by Stats. It serialises to JSON using snake_case keys.
type ServerStats struct {
	// Uptime is how long the server has been running. As a time.Duration
	// it is encoded in JSON as an integer number of nanoseconds.
	Uptime time.Duration `json:"uptime"`

	// RequestCount is the total number of requests counted so far.
	RequestCount int64 `json:"request_count"`

	// ErrorCount is the total number of errors counted so far.
	ErrorCount int64 `json:"error_count"`

	// ActiveRequests is the number of requests currently in flight.
	ActiveRequests int64 `json:"active_requests"`

	// Version is the build version string.
	Version string `json:"version"`

	// Commit is the short commit identifier of the build.
	Commit string `json:"commit"`

	// BuildTime is the build timestamp, carried as a string.
	BuildTime string `json:"build_time"`
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc