• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

orneryd / NornicDB / 22891108208

10 Mar 2026 07:04AM UTC coverage: 88.141% (+0.3%) from 87.89%
22891108208

push

github

orneryd
increasing test coverage

88098 of 99951 relevant lines covered (88.14%)

1.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.48
/pkg/temporal/query_load.go
1
// Package temporal - Query load prediction for resource scaling.
2
//
3
// QueryLoadPredictor tracks query volume trends and predicts future load
4
// using KalmanVelocity filters. This enables:
5
//   - Predicting upcoming query spikes
6
//   - Detecting load trends (increasing/decreasing)
7
//   - Triggering pre-emptive resource scaling
8
//   - Identifying query patterns (peak hours, burst events)
9
//
10
// Use cases:
11
//   - Auto-scale database connections
12
//   - Pre-warm caches before predicted spikes
13
//   - Alert on unusual load patterns
14
//   - Capacity planning
15
//
16
// Example usage:
17
//
18
//        predictor := temporal.NewQueryLoadPredictor(temporal.DefaultLoadConfig())
19
//
20
//        // Record each query
21
//        predictor.RecordQuery()
22
//
23
//        // Get current load prediction
24
//        prediction := predictor.GetPrediction()
25
//        fmt.Printf("Current QPS: %.1f, Predicted (5min): %.1f\n",
26
//            prediction.CurrentQPS, prediction.PredictedQPS5m)
27
//
28
//        // Check if scaling needed
29
//        if predictor.ShouldScaleUp(100) { // threshold QPS
30
//            triggerScaleUp()
31
//        }
32
//
33
// # ELI12 (Explain Like I'm 12)
34
//
35
// Imagine you're running a lemonade stand. You want to know:
36
//
37
//        🍋 "How many customers are coming right now?" (Current QPS)
38
//        🍋 "Will it get busier or slower?" (Trend)
39
//        🍋 "Should I make more lemonade NOW?" (Scale up prediction)
40
//
41
// The QueryLoadPredictor counts how many "questions" (queries) the database
42
// gets every second. It's like counting customers:
43
//
44
//        Second 1: 10 queries
45
//        Second 2: 12 queries
46
//        Second 3: 15 queries
47
//        Second 4: 20 queries
48
//        → "Whoa, we're getting BUSIER! Velocity is positive!" 📈
49
//
50
// The Kalman filter smooths out the bumps:
51
//
52
//        Raw counts:   10, 12, 50, 11, 13  (that 50 was a weird spike!)
53
//        Filtered:     10, 11, 15, 13, 13  (smoothed - ignores the spike)
54
//
55
// Why filter? Because ONE busy second doesn't mean you need to panic!
56
// The filter asks: "Is this a REAL trend or just random noise?"
57
//
58
// Predictions:
59
//
60
//        Current: 50 QPS (queries per second)
61
//        Velocity: +5 QPS/second (getting busier)
62
//        Predicted in 5 min: 50 + (5 × 300) = 1550 QPS! 😱
63
//        → "Better scale up NOW before we're overwhelmed!"
64
//
65
// Anomaly detection:
66
//
67
//        Normal: 50 QPS
68
//        Suddenly: 500 QPS ← "SPIKE! Something's happening!"
69
//        Suddenly: 5 QPS ← "DROP! Did something break?"
70
//
71
// Peak hour detection:
72
//
73
//        Hour    0  1  2  3  4  5  6  7  8  9  10 11 12 13 14 15 16 17 18 19 20 21 22 23
74
//        Count   2  1  1  1  2  5  15 30 50 45 40 30 35 40 50 45 40 35 25 20 15 10 5  3
75
//                                      ^^^^^^^^^^^^
76
//                            "Peak hours are 8-10am and 2-4pm"
77
//
78
// This lets us PREPARE before the rush! Pre-warm caches at 7:30am! 🚀
79
package temporal
80

81
import (
82
        "math"
83
        "sync"
84
        "time"
85

86
        "github.com/orneryd/nornicdb/pkg/filter"
87
)
88

89
// LoadPrediction is a point-in-time snapshot of observed and forecast query
// load, as returned by QueryLoadPredictor.GetPrediction.
type LoadPrediction struct {
	// Observed load.
	CurrentQPS   float64 // smoothed queries per second (filter state)
	CurrentQPM   float64 // smoothed queries per minute (CurrentQPS * 60)
	RawQPS       float64 // unfiltered rate of the bucket still being filled
	TotalQueries int64   // queries recorded so far

	// Direction of travel.
	Velocity float64 // rate of change of QPS; positive means load is growing
	Trend    string  // one of "increasing", "decreasing", "stable"

	// Forecasts, clamped so they are never negative.
	PredictedQPS5m  float64 // expected QPS 5 minutes ahead
	PredictedQPS15m float64 // expected QPS 15 minutes ahead
	PredictedQPS1h  float64 // expected QPS 1 hour ahead

	// Confidence grows towards 1 as more queries are observed.
	Confidence float64

	// Time-of-day pattern.
	PeakHour   int  // hour of day (0-23) with the most recorded queries
	IsNearPeak bool // whether the current hour is within 2 hours of PeakHour

	// Anomaly detection.
	IsAnomaly   bool
	AnomalyType string // "spike", "drop", "sustained_high", "sustained_low", or "" when IsAnomaly is false

	// Timestamp is the moment the prediction was computed.
	Timestamp time.Time
}
120

121
// LoadConfig holds configuration for query load prediction.
122
type LoadConfig struct {
123
        // FilterConfig for the underlying Kalman velocity filter
124
        FilterConfig filter.VelocityConfig
125

126
        // BucketDurationSeconds - duration of each measurement bucket
127
        BucketDurationSeconds float64
128

129
        // SpikeThreshold - QPS velocity above which is a "spike"
130
        SpikeThreshold float64
131

132
        // DropThreshold - QPS velocity below which is a "drop"
133
        DropThreshold float64
134

135
        // AnomalyStdDevs - number of standard deviations for anomaly detection
136
        AnomalyStdDevs float64
137

138
        // ScaleUpThreshold - relative increase that suggests scaling up
139
        ScaleUpThreshold float64
140

141
        // ScaleDownThreshold - relative decrease that suggests scaling down
142
        ScaleDownThreshold float64
143

144
        // PeakDetectionWindow - hours to track for peak detection
145
        PeakDetectionWindow int
146
}
147

148
// DefaultLoadConfig returns sensible defaults.
149
func DefaultLoadConfig() LoadConfig {
1✔
150
        return LoadConfig{
1✔
151
                FilterConfig: filter.VelocityConfig{
1✔
152
                        ProcessNoisePos:    0.5, // QPS can change quickly
1✔
153
                        ProcessNoiseVel:    0.1,
1✔
154
                        MeasurementNoise:   2.0, // Measurement has noise
1✔
155
                        InitialPosVariance: 100.0,
1✔
156
                        InitialVelVariance: 10.0,
1✔
157
                        Dt:                 1.0,
1✔
158
                },
1✔
159
                BucketDurationSeconds: 1.0, // 1-second buckets
1✔
160
                SpikeThreshold:        5.0, // 5 QPS/sec increase
1✔
161
                DropThreshold:         -5.0,
1✔
162
                AnomalyStdDevs:        3.0,
1✔
163
                ScaleUpThreshold:      0.5,  // 50% increase
1✔
164
                ScaleDownThreshold:    -0.3, // 30% decrease
1✔
165
                PeakDetectionWindow:   24,
1✔
166
        }
1✔
167
}
1✔
168

169
// HighSensitivityLoadConfig returns config for high-sensitivity detection.
170
func HighSensitivityLoadConfig() LoadConfig {
1✔
171
        cfg := DefaultLoadConfig()
1✔
172
        cfg.SpikeThreshold = 2.0
1✔
173
        cfg.AnomalyStdDevs = 2.0
1✔
174
        cfg.ScaleUpThreshold = 0.3
1✔
175
        return cfg
1✔
176
}
1✔
177

178
// QueryLoadPredictor predicts query load using velocity tracking.
179
type QueryLoadPredictor struct {
180
        mu     sync.RWMutex
181
        config LoadConfig
182

183
        // Kalman filter for QPS tracking
184
        qpsFilter *filter.KalmanVelocity
185

186
        // Current bucket
187
        currentBucket time.Time
188
        bucketCount   int64
189

190
        // Statistics
191
        totalQueries int64
192
        startTime    time.Time
193

194
        // Rolling window for variance calculation
195
        recentQPS    []float64
196
        recentQPSIdx int
197
        windowSize   int
198

199
        // Hour-of-day tracking
200
        hourCounts [24]int64
201
        hourSums   [24]float64
202

203
        // Baseline for anomaly detection
204
        baselineQPS    float64
205
        baselineStdDev float64
206
}
207

208
// NewQueryLoadPredictor creates a new query load predictor.
209
func NewQueryLoadPredictor(cfg LoadConfig) *QueryLoadPredictor {
1✔
210
        return &QueryLoadPredictor{
1✔
211
                config:        cfg,
1✔
212
                qpsFilter:     filter.NewKalmanVelocity(cfg.FilterConfig),
1✔
213
                startTime:     time.Now(),
1✔
214
                currentBucket: time.Now().Truncate(time.Duration(cfg.BucketDurationSeconds) * time.Second),
1✔
215
                recentQPS:     make([]float64, 60), // Track last 60 measurements
1✔
216
                windowSize:    60,
1✔
217
        }
1✔
218
}
1✔
219

220
// RecordQuery records a query event.
221
func (qlp *QueryLoadPredictor) RecordQuery() {
1✔
222
        qlp.RecordQueryAt(time.Now())
1✔
223
}
1✔
224

225
// RecordQueryAt records a query at a specific time.
226
func (qlp *QueryLoadPredictor) RecordQueryAt(timestamp time.Time) {
1✔
227
        qlp.mu.Lock()
1✔
228
        defer qlp.mu.Unlock()
1✔
229

1✔
230
        qlp.totalQueries++
1✔
231

1✔
232
        // Track by hour
1✔
233
        hour := timestamp.Hour()
1✔
234
        qlp.hourCounts[hour]++
1✔
235

1✔
236
        // Check if we've moved to a new bucket
1✔
237
        bucket := timestamp.Truncate(time.Duration(qlp.config.BucketDurationSeconds) * time.Second)
1✔
238
        if bucket.After(qlp.currentBucket) {
2✔
239
                // Flush the old bucket
1✔
240
                qlp.flushBucket()
1✔
241
                qlp.currentBucket = bucket
1✔
242
                qlp.bucketCount = 0
1✔
243
        }
1✔
244

245
        qlp.bucketCount++
1✔
246
}
247

248
// RecordQueries records multiple query events (batch recording).
249
func (qlp *QueryLoadPredictor) RecordQueries(count int) {
1✔
250
        qlp.mu.Lock()
1✔
251
        defer qlp.mu.Unlock()
1✔
252

1✔
253
        qlp.totalQueries += int64(count)
1✔
254
        qlp.bucketCount += int64(count)
1✔
255

1✔
256
        hour := time.Now().Hour()
1✔
257
        qlp.hourCounts[hour] += int64(count)
1✔
258
}
1✔
259

260
// flushBucket processes the completed bucket.
261
func (qlp *QueryLoadPredictor) flushBucket() {
1✔
262
        if qlp.bucketCount == 0 {
1✔
263
                return
×
264
        }
×
265

266
        // Calculate QPS for this bucket
267
        qps := float64(qlp.bucketCount) / qlp.config.BucketDurationSeconds
1✔
268

1✔
269
        // Feed to Kalman filter
1✔
270
        qlp.qpsFilter.Process(qps)
1✔
271

1✔
272
        // Track in rolling window
1✔
273
        qlp.recentQPS[qlp.recentQPSIdx] = qps
1✔
274
        qlp.recentQPSIdx = (qlp.recentQPSIdx + 1) % qlp.windowSize
1✔
275

1✔
276
        // Update hour tracking
1✔
277
        hour := qlp.currentBucket.Hour()
1✔
278
        qlp.hourSums[hour] += qps
1✔
279

1✔
280
        // Update baseline
1✔
281
        qlp.updateBaseline()
1✔
282
}
283

284
// updateBaseline updates the baseline QPS statistics.
285
func (qlp *QueryLoadPredictor) updateBaseline() {
1✔
286
        // Calculate mean and std dev from recent window
1✔
287
        var sum, sumSq float64
1✔
288
        count := 0
1✔
289

1✔
290
        for _, qps := range qlp.recentQPS {
2✔
291
                if qps > 0 {
2✔
292
                        sum += qps
1✔
293
                        sumSq += qps * qps
1✔
294
                        count++
1✔
295
                }
1✔
296
        }
297

298
        if count > 5 {
2✔
299
                mean := sum / float64(count)
1✔
300
                variance := sumSq/float64(count) - mean*mean
1✔
301
                if variance < 0 {
1✔
302
                        variance = 0
×
303
                }
×
304

305
                qlp.baselineQPS = mean
1✔
306
                qlp.baselineStdDev = math.Sqrt(variance)
1✔
307
        }
308
}
309

310
// GetPrediction returns the current load prediction.
311
func (qlp *QueryLoadPredictor) GetPrediction() LoadPrediction {
1✔
312
        qlp.mu.RLock()
1✔
313
        defer qlp.mu.RUnlock()
1✔
314

1✔
315
        now := time.Now()
1✔
316

1✔
317
        // Get filtered QPS and velocity
1✔
318
        currentQPS := qlp.qpsFilter.State()
1✔
319
        velocity := qlp.qpsFilter.Velocity()
1✔
320

1✔
321
        // Calculate raw QPS from current bucket
1✔
322
        elapsed := now.Sub(qlp.currentBucket).Seconds()
1✔
323
        rawQPS := 0.0
1✔
324
        if elapsed > 0 {
2✔
325
                rawQPS = float64(qlp.bucketCount) / elapsed
1✔
326
        }
1✔
327

328
        // Determine trend
329
        var trend string
1✔
330
        if velocity > qlp.config.SpikeThreshold/10 {
2✔
331
                trend = "increasing"
1✔
332
        } else if velocity < qlp.config.DropThreshold/10 {
3✔
333
                trend = "decreasing"
1✔
334
        } else {
2✔
335
                trend = "stable"
1✔
336
        }
1✔
337

338
        // Predictions
339
        pred5m := qlp.qpsFilter.Predict(300)  // 5 minutes
1✔
340
        pred15m := qlp.qpsFilter.Predict(900) // 15 minutes
1✔
341
        pred1h := qlp.qpsFilter.Predict(3600) // 1 hour
1✔
342

1✔
343
        // Clamp predictions to reasonable values
1✔
344
        if pred5m < 0 {
2✔
345
                pred5m = 0
1✔
346
        }
1✔
347
        if pred15m < 0 {
2✔
348
                pred15m = 0
1✔
349
        }
1✔
350
        if pred1h < 0 {
2✔
351
                pred1h = 0
1✔
352
        }
1✔
353

354
        // Find peak hour
355
        peakHour := 0
1✔
356
        maxCount := int64(0)
1✔
357
        for h, count := range qlp.hourCounts {
2✔
358
                if count > maxCount {
2✔
359
                        maxCount = count
1✔
360
                        peakHour = h
1✔
361
                }
1✔
362
        }
363

364
        // Check if near peak (within 2 hours)
365
        currentHour := now.Hour()
1✔
366
        isNearPeak := abs(currentHour-peakHour) <= 2 ||
1✔
367
                abs(currentHour-peakHour) >= 22 // Handle wrap-around
1✔
368

1✔
369
        // Anomaly detection
1✔
370
        isAnomaly := false
1✔
371
        anomalyType := ""
1✔
372
        if qlp.baselineStdDev > 0 {
2✔
373
                deviation := (currentQPS - qlp.baselineQPS) / qlp.baselineStdDev
1✔
374
                if deviation > qlp.config.AnomalyStdDevs {
1✔
375
                        isAnomaly = true
×
376
                        if velocity > qlp.config.SpikeThreshold {
×
377
                                anomalyType = "spike"
×
378
                        } else {
×
379
                                anomalyType = "sustained_high"
×
380
                        }
×
381
                } else if deviation < -qlp.config.AnomalyStdDevs {
1✔
382
                        isAnomaly = true
×
383
                        if velocity < qlp.config.DropThreshold {
×
384
                                anomalyType = "drop"
×
385
                        } else {
×
386
                                anomalyType = "sustained_low"
×
387
                        }
×
388
                }
389
        }
390

391
        // Calculate confidence
392
        confidence := float64(qlp.totalQueries) / float64(qlp.totalQueries+1000)
1✔
393

1✔
394
        return LoadPrediction{
1✔
395
                CurrentQPS:      currentQPS,
1✔
396
                CurrentQPM:      currentQPS * 60,
1✔
397
                RawQPS:          rawQPS,
1✔
398
                TotalQueries:    qlp.totalQueries,
1✔
399
                Velocity:        velocity,
1✔
400
                Trend:           trend,
1✔
401
                PredictedQPS5m:  pred5m,
1✔
402
                PredictedQPS15m: pred15m,
1✔
403
                PredictedQPS1h:  pred1h,
1✔
404
                Confidence:      confidence,
1✔
405
                PeakHour:        peakHour,
1✔
406
                IsNearPeak:      isNearPeak,
1✔
407
                IsAnomaly:       isAnomaly,
1✔
408
                AnomalyType:     anomalyType,
1✔
409
                Timestamp:       now,
1✔
410
        }
1✔
411
}
412

413
// abs returns the magnitude of x, i.e. x with its sign dropped.
func abs(x int) int {
	if x >= 0 {
		return x
	}
	return -x
}
420

421
// ShouldScaleUp checks if load is increasing and above threshold.
422
func (qlp *QueryLoadPredictor) ShouldScaleUp(thresholdQPS float64) bool {
1✔
423
        pred := qlp.GetPrediction()
1✔
424

1✔
425
        // Scale up if:
1✔
426
        // 1. Current QPS is near threshold
1✔
427
        // 2. Trend is increasing
1✔
428
        // 3. Predicted QPS exceeds threshold
1✔
429
        if pred.CurrentQPS > thresholdQPS*0.8 && pred.Trend == "increasing" {
1✔
430
                return true
×
431
        }
×
432
        if pred.PredictedQPS5m > thresholdQPS {
1✔
433
                return true
×
434
        }
×
435
        return false
1✔
436
}
437

438
// ShouldScaleDown checks if load is decreasing and below threshold.
439
func (qlp *QueryLoadPredictor) ShouldScaleDown(thresholdQPS float64, minQPS float64) bool {
1✔
440
        pred := qlp.GetPrediction()
1✔
441

1✔
442
        // Scale down if:
1✔
443
        // 1. Current QPS is well below threshold
1✔
444
        // 2. Trend is decreasing or stable
1✔
445
        // 3. Above minimum QPS
1✔
446
        if pred.CurrentQPS < thresholdQPS*0.5 &&
1✔
447
                pred.Trend != "increasing" &&
1✔
448
                pred.CurrentQPS > minQPS {
2✔
449
                return true
1✔
450
        }
1✔
451
        return false
1✔
452
}
453

454
// GetLoadLevel returns a simplified load level (0-5).
455
func (qlp *QueryLoadPredictor) GetLoadLevel(maxQPS float64) int {
1✔
456
        pred := qlp.GetPrediction()
1✔
457
        ratio := pred.CurrentQPS / maxQPS
1✔
458

1✔
459
        switch {
1✔
460
        case ratio < 0.1:
1✔
461
                return 0 // Idle
1✔
462
        case ratio < 0.3:
1✔
463
                return 1 // Low
1✔
464
        case ratio < 0.5:
1✔
465
                return 2 // Medium
1✔
466
        case ratio < 0.7:
1✔
467
                return 3 // High
1✔
468
        case ratio < 0.9:
1✔
469
                return 4 // Very High
1✔
470
        default:
1✔
471
                return 5 // Critical
1✔
472
        }
473
}
474

475
// PredictPeakTime predicts when the next peak will occur.
476
func (qlp *QueryLoadPredictor) PredictPeakTime() time.Time {
1✔
477
        qlp.mu.RLock()
1✔
478
        defer qlp.mu.RUnlock()
1✔
479

1✔
480
        now := time.Now()
1✔
481
        currentHour := now.Hour()
1✔
482

1✔
483
        // Find peak hour
1✔
484
        peakHour := 0
1✔
485
        maxCount := int64(0)
1✔
486
        for h, count := range qlp.hourCounts {
2✔
487
                if count > maxCount {
2✔
488
                        maxCount = count
1✔
489
                        peakHour = h
1✔
490
                }
1✔
491
        }
492

493
        // Calculate next occurrence of peak hour
494
        hoursUntilPeak := peakHour - currentHour
1✔
495
        if hoursUntilPeak <= 0 {
1✔
496
                hoursUntilPeak += 24
×
497
        }
×
498

499
        return now.Add(time.Duration(hoursUntilPeak) * time.Hour).Truncate(time.Hour)
1✔
500
}
501

502
// LoadStats is a summary of query load as returned by
// QueryLoadPredictor.GetStats.
type LoadStats struct {
	TotalQueries  int64   // queries recorded so far
	UptimeSeconds float64 // seconds since the predictor's start time
	AverageQPS    float64 // TotalQueries divided by UptimeSeconds
	CurrentQPS    float64 // smoothed QPS (filter state)
	PeakQPS       float64 // highest per-bucket QPS in the recent window
	PeakHour      int     // hour of day (0-23) with the most recorded queries
}
511

512
// GetStats returns query load statistics.
513
func (qlp *QueryLoadPredictor) GetStats() LoadStats {
1✔
514
        qlp.mu.RLock()
1✔
515
        defer qlp.mu.RUnlock()
1✔
516

1✔
517
        uptime := time.Since(qlp.startTime).Seconds()
1✔
518

1✔
519
        // Find peak QPS from recent window
1✔
520
        peakQPS := 0.0
1✔
521
        for _, qps := range qlp.recentQPS {
2✔
522
                if qps > peakQPS {
1✔
523
                        peakQPS = qps
×
524
                }
×
525
        }
526

527
        // Find peak hour
528
        peakHour := 0
1✔
529
        maxCount := int64(0)
1✔
530
        for h, count := range qlp.hourCounts {
2✔
531
                if count > maxCount {
2✔
532
                        maxCount = count
1✔
533
                        peakHour = h
1✔
534
                }
1✔
535
        }
536

537
        return LoadStats{
1✔
538
                TotalQueries:  qlp.totalQueries,
1✔
539
                UptimeSeconds: uptime,
1✔
540
                AverageQPS:    float64(qlp.totalQueries) / uptime,
1✔
541
                CurrentQPS:    qlp.qpsFilter.State(),
1✔
542
                PeakQPS:       peakQPS,
1✔
543
                PeakHour:      peakHour,
1✔
544
        }
1✔
545
}
546

547
// Reset clears all load data.
548
func (qlp *QueryLoadPredictor) Reset() {
1✔
549
        qlp.mu.Lock()
1✔
550
        defer qlp.mu.Unlock()
1✔
551

1✔
552
        qlp.qpsFilter = filter.NewKalmanVelocity(qlp.config.FilterConfig)
1✔
553
        qlp.totalQueries = 0
1✔
554
        qlp.startTime = time.Now()
1✔
555
        qlp.currentBucket = time.Now().Truncate(time.Duration(qlp.config.BucketDurationSeconds) * time.Second)
1✔
556
        qlp.bucketCount = 0
1✔
557
        qlp.recentQPS = make([]float64, qlp.windowSize)
1✔
558
        qlp.recentQPSIdx = 0
1✔
559
        qlp.hourCounts = [24]int64{}
1✔
560
        qlp.hourSums = [24]float64{}
1✔
561
        qlp.baselineQPS = 0
1✔
562
        qlp.baselineStdDev = 0
1✔
563
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc