• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cybertec-postgresql / pg_timetable / 18006836801

25 Sep 2025 12:04PM UTC coverage: 85.871% (-0.2%) from 86.103%
18006836801

Pull #714

github

web-flow
Bump actions/upload-pages-artifact from 2 to 4

Bumps [actions/upload-pages-artifact](https://github.com/actions/upload-pages-artifact) from 2 to 4.
- [Release notes](https://github.com/actions/upload-pages-artifact/releases)
- [Commits](https://github.com/actions/upload-pages-artifact/compare/v2...v4)

---
updated-dependencies:
- dependency-name: actions/upload-pages-artifact
  dependency-version: '4'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #714: Bump actions/upload-pages-artifact from 2 to 4

1483 of 1727 relevant lines covered (85.87%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.59
/internal/pgengine/log_hook.go
1
package pgengine
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "time"
7

8
        pgx "github.com/jackc/pgx/v5"
9
        "github.com/sirupsen/logrus"
10
)
11

12
// LogHook is the implementation of the logrus hook for pgx
13
type LogHook struct {
14
        cacheLimit      int           // hold this number of entries before flush to database
15
        cacheTimeout    time.Duration // wait this amount of time before flush to database
16
        highLoadTimeout time.Duration // wait this amount of time before skip log entry
17
        db              PgxPoolIface
18
        input           chan logrus.Entry
19
        ctx             context.Context
20
        lastError       chan error
21
        pid             int32
22
        client          string
23
        level           string
24
}
25

26
// NewHook creates a LogHook to be added to an instance of logger
27
func NewHook(ctx context.Context, pge *PgEngine, level string) *LogHook {
1✔
28
        cacheLimit := 500
1✔
29
        l := &LogHook{
1✔
30
                cacheLimit:      cacheLimit,
1✔
31
                cacheTimeout:    2 * time.Second,
1✔
32
                highLoadTimeout: 200 * time.Millisecond,
1✔
33
                db:              pge.ConfigDb,
1✔
34
                input:           make(chan logrus.Entry, cacheLimit),
1✔
35
                lastError:       make(chan error),
1✔
36
                ctx:             ctx,
1✔
37
                pid:             pge.Getsid(),
1✔
38
                client:          pge.ClientName,
1✔
39
                level:           level,
1✔
40
        }
1✔
41
        go l.poll(l.input)
1✔
42
        return l
1✔
43
}
1✔
44

45
// Fire adds logrus log message to the internal queue for processing
46
func (hook *LogHook) Fire(entry *logrus.Entry) error {
1✔
47
        if hook.ctx.Err() != nil {
2✔
48
                return nil
1✔
49
        }
1✔
50
        select {
1✔
51
        case hook.input <- *entry:
1✔
52
                // entry sent
53
        case <-time.After(hook.highLoadTimeout):
1✔
54
                // entry dropped due to a huge load, check stdout or file for detailed log
55
        }
56
        select {
1✔
57
        case err := <-hook.lastError:
1✔
58
                return err
1✔
59
        default:
1✔
60
                return nil
1✔
61
        }
62
}
63

64
// Levels returns the available logging levels
65
func (hook *LogHook) Levels() []logrus.Level {
1✔
66
        switch hook.level {
1✔
67
        case "none":
1✔
68
                return []logrus.Level{}
1✔
69
        case "debug":
1✔
70
                return logrus.AllLevels
1✔
71
        case "info":
1✔
72
                return []logrus.Level{
1✔
73
                        logrus.PanicLevel,
1✔
74
                        logrus.FatalLevel,
1✔
75
                        logrus.ErrorLevel,
1✔
76
                        logrus.WarnLevel,
1✔
77
                        logrus.InfoLevel,
1✔
78
                }
1✔
79
        default:
1✔
80
                return []logrus.Level{
1✔
81
                        logrus.PanicLevel,
1✔
82
                        logrus.FatalLevel,
1✔
83
                        logrus.ErrorLevel,
1✔
84
                }
1✔
85
        }
86
}
87

88
// poll checks for incoming messages and caches them internally
89
// until either a maximum amount is reached, or a timeout occurs.
90
func (hook *LogHook) poll(input <-chan logrus.Entry) {
1✔
91
        cache := make([]logrus.Entry, 0, hook.cacheLimit)
1✔
92
        tick := time.NewTicker(hook.cacheTimeout)
1✔
93

1✔
94
        for {
2✔
95
                select {
1✔
96
                case <-hook.ctx.Done(): //check context with high priority
1✔
97
                        return
1✔
98
                default:
1✔
99
                        select {
1✔
100
                        case entry := <-input:
1✔
101
                                cache = append(cache, entry)
1✔
102
                                if len(cache) < hook.cacheLimit {
2✔
103
                                        break
1✔
104
                                }
105
                                tick.Stop()
×
106
                                hook.send(cache)
×
107
                                cache = cache[:0]
×
108
                                tick = time.NewTicker(hook.cacheTimeout)
×
109
                        case <-tick.C:
1✔
110
                                hook.send(cache)
1✔
111
                                cache = cache[:0]
1✔
112
                        case <-hook.ctx.Done():
×
113
                                return
×
114
                        }
115
                }
116
        }
117
}
118

119
func adaptEntryLevel(level logrus.Level) string {
1✔
120
        switch level {
1✔
121
        case logrus.TraceLevel, logrus.DebugLevel:
1✔
122
                return "DEBUG"
1✔
123
        case logrus.InfoLevel, logrus.WarnLevel:
1✔
124
                return "INFO"
1✔
125
        case logrus.ErrorLevel:
1✔
126
                return "ERROR"
1✔
127
        case logrus.FatalLevel, logrus.PanicLevel:
1✔
128
                return "PANIC"
1✔
129
        }
130
        return "UNKNOWN"
1✔
131
}
132

133
// send sends cached messages to the postgres server
134
func (hook *LogHook) send(cache []logrus.Entry) {
1✔
135
        if len(cache) == 0 {
2✔
136
                return // Nothing to do here.
1✔
137
        }
1✔
138
        _, err := hook.db.CopyFrom(
1✔
139
                hook.ctx,
1✔
140
                pgx.Identifier{"timetable", "log"},
1✔
141
                []string{"ts", "client_name", "pid", "log_level", "message", "message_data"},
1✔
142
                pgx.CopyFromSlice(len(cache),
1✔
143
                        func(i int) ([]interface{}, error) {
2✔
144
                                jsonData, err := json.Marshal(cache[i].Data)
1✔
145
                                if err != nil {
1✔
146
                                        return nil, err
×
147
                                }
×
148
                                return []interface{}{cache[i].Time,
1✔
149
                                        hook.client,
1✔
150
                                        hook.pid,
1✔
151
                                        adaptEntryLevel(cache[i].Level),
1✔
152
                                        cache[i].Message,
1✔
153
                                        jsonData}, nil
1✔
154
                        }),
155
        )
156
        if err != nil {
2✔
157
                select {
1✔
158
                case hook.lastError <- err:
×
159
                        //error sent to the logger
160
                default:
1✔
161
                        //there is unprocessed error already
162
                }
163
        }
164
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc