• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ory / x / 16046219703

03 Jul 2025 09:04AM UTC coverage: 61.788% (+0.005%) from 61.783%
16046219703

Pull #874

github

web-flow
Merge 64abf75d1 into 219becb91
Pull Request #874: feat: add configurable list of sensitive HTTP headers for redaction

21 of 25 new or added lines in 2 files covered. (84.0%)

4 existing lines in 1 file now uncovered.

7422 of 12012 relevant lines covered (61.79%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.13
/jsonnetsecure/jsonnet_pool.go
1
// Copyright © 2024 Ory Corp
2
// SPDX-License-Identifier: Apache-2.0
3

4
package jsonnetsecure
5

6
// Known limitations/edge cases:
7
// - The child process exiting early (e.g. crashing) or getting killed (e.g. reaching some OS limit)
8
//   is not detected and no error will be returned in this case from `eval()`.
9
// - Misbehaving jsonnet scripts in the middle of a batch being passed to the child process for evaluation may result in
10
//   no error (as mentioned above), and other valid scripts in this batch may result
11
//   in an error (because the output from the child process is truncated).
12
//
13
// Possible remediations:
14
// - Do not pass a batch of scripts to a worker, only pass one script at a time (to isolate misbehaving scripts)
15
// - Validate that the output is valid JSON (to detect truncated output)
16
// - Detect the child process exiting (to return an error)
17

18
import (
19
        "bufio"
20
        "context"
21
        "encoding/json"
22
        "io"
23
        "math"
24
        "os/exec"
25
        "strings"
26
        "time"
27

28
        "github.com/jackc/puddle/v2"
29
        "github.com/pkg/errors"
30
        "go.opentelemetry.io/otel/attribute"
31
        semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
32
        "go.opentelemetry.io/otel/trace"
33

34
        "github.com/ory/x/otelx"
35
)
36

37
const (
38
        KiB                = 1024
39
        jsonnetOutputLimit = 512 * KiB
40
        jsonnetErrLimit    = 1 * KiB
41
)
42

43
type (
44
        processPoolVM struct {
45
                path   string
46
                args   []string
47
                ctx    context.Context
48
                params processParameters
49
                pool   *pool
50
        }
51
        Pool interface {
52
                Close()
53
                private()
54
        }
55
        pool struct {
56
                puddle *puddle.Pool[worker]
57
        }
58
        worker struct {
59
                cmd    *exec.Cmd
60
                stdin  chan<- []byte
61
                stdout <-chan string
62
                stderr <-chan string
63
        }
64
        contextKeyType string
65
)
66

67
var (
68
        ErrProcessPoolClosed = errors.New("jsonnetsecure: process pool closed")
69

70
        _ VM   = (*processPoolVM)(nil)
71
        _ Pool = (*pool)(nil)
72

73
        contextValuePath contextKeyType = "argc"
74
        contextValueArgs contextKeyType = "argv"
75
)
76

77
func NewProcessPool(size int) Pool {
1✔
78
        size = max(5, min(size, math.MaxInt32))
1✔
79
        pud, err := puddle.NewPool(&puddle.Config[worker]{
1✔
80
                MaxSize:     int32(size), //nolint:gosec // disable G115 // because of the previous min/max, 5 <= size <= math.MaxInt32
1✔
81
                Constructor: newWorker,
1✔
82
                Destructor:  worker.destroy,
1✔
83
        })
1✔
84
        if err != nil {
1✔
85
                panic(err) // this should never happen, see implementation of puddle.NewPool
×
86
        }
87
        for range size {
2✔
88
                // warm pool
1✔
89
                go pud.CreateResource(context.Background())
1✔
90
        }
1✔
91
        go func() {
2✔
92
                for {
2✔
93
                        time.Sleep(10 * time.Second)
1✔
94
                        for _, proc := range pud.AcquireAllIdle() {
1✔
UNCOV
95
                                if proc.Value().cmd.ProcessState != nil {
×
96
                                        proc.Destroy()
×
UNCOV
97
                                } else {
×
UNCOV
98
                                        proc.Release()
×
UNCOV
99
                                }
×
100
                        }
101
                }
102
        }()
103
        return &pool{pud}
1✔
104
}
105

106
func (*pool) private() {}
×
107

108
func (p *pool) Close() {
1✔
109
        p.puddle.Close()
1✔
110
}
1✔
111

112
func newWorker(ctx context.Context) (_ worker, err error) {
1✔
113
        tracer := trace.SpanFromContext(ctx).TracerProvider().Tracer("")
1✔
114
        ctx, span := tracer.Start(ctx, "jsonnetsecure.newWorker")
1✔
115
        defer otelx.End(span, &err)
1✔
116

1✔
117
        path, _ := ctx.Value(contextValuePath).(string)
1✔
118
        if path == "" {
2✔
119
                return worker{}, errors.New("newWorker: missing binary path in context")
1✔
120
        }
1✔
121
        args, _ := ctx.Value(contextValueArgs).([]string)
1✔
122
        cmd := exec.Command(path, append(args, "-0")...)
1✔
123
        cmd.Env = []string{"GOMAXPROCS=1"}
1✔
124
        cmd.WaitDelay = 100 * time.Millisecond
1✔
125

1✔
126
        span.SetAttributes(semconv.ProcessCommand(cmd.Path), semconv.ProcessCommandArgs(cmd.Args...))
1✔
127

1✔
128
        stdin, err := cmd.StdinPipe()
1✔
129
        if err != nil {
1✔
130
                return worker{}, errors.Wrap(err, "newWorker: failed to create stdin pipe")
×
131
        }
×
132

133
        in := make(chan []byte, 1)
1✔
134
        go func(c <-chan []byte) {
2✔
135
                for input := range c {
2✔
136
                        if _, err := stdin.Write(append(input, 0)); err != nil {
2✔
137
                                stdin.Close()
1✔
138
                                return
1✔
139
                        }
1✔
140
                }
141
        }(in)
142

143
        stdout, err := cmd.StdoutPipe()
1✔
144
        if err != nil {
1✔
145
                return worker{}, errors.Wrap(err, "newWorker: failed to create stdout pipe")
×
146
        }
×
147
        stderr, err := cmd.StderrPipe()
1✔
148
        if err != nil {
1✔
149
                return worker{}, errors.Wrap(err, "newWorker: failed to create stderr pipe")
×
150
        }
×
151

152
        if err := cmd.Start(); err != nil {
1✔
153
                return worker{}, errors.Wrap(err, "newWorker: failed to start process")
×
154
        }
×
155

156
        span.SetAttributes(semconv.ProcessPID(cmd.Process.Pid))
1✔
157

1✔
158
        scan := func(c chan<- string, r io.Reader) {
2✔
159
                defer close(c)
1✔
160
                // NOTE: `bufio.Scanner` has its own internal limit of 64 KiB.
1✔
161
                scanner := bufio.NewScanner(r)
1✔
162

1✔
163
                scanner.Split(splitNull)
1✔
164
                for scanner.Scan() {
2✔
165
                        c <- scanner.Text()
1✔
166
                }
1✔
167
                if err := scanner.Err(); err != nil {
2✔
168
                        c <- "ERROR: scan: " + err.Error()
1✔
169
                }
1✔
170
        }
171
        out := make(chan string, 1)
1✔
172
        go scan(out, stdout)
1✔
173
        errs := make(chan string, 1)
1✔
174
        go scan(errs, stderr)
1✔
175

1✔
176
        w := worker{
1✔
177
                cmd:    cmd,
1✔
178
                stdin:  in,
1✔
179
                stdout: out,
1✔
180
                stderr: errs,
1✔
181
        }
1✔
182
        _, err = w.eval(ctx, []byte("{}")) // warm up
1✔
183
        if err != nil {
1✔
184
                w.destroy()
×
185
                return worker{}, errors.Wrap(err, "newWorker: warm up failed")
×
186
        }
×
187

188
        return w, nil
1✔
189
}
190

191
func (w worker) destroy() {
1✔
192
        close(w.stdin)
1✔
193
        w.cmd.Process.Kill()
1✔
194
        w.cmd.Wait()
1✔
195
}
1✔
196

197
func (w worker) eval(ctx context.Context, processParams []byte) (output string, err error) {
1✔
198
        tracer := trace.SpanFromContext(ctx).TracerProvider().Tracer("")
1✔
199
        ctx, span := tracer.Start(ctx, "jsonnetsecure.worker.eval", trace.WithAttributes(
1✔
200
                semconv.ProcessPID(w.cmd.Process.Pid)))
1✔
201
        defer otelx.End(span, &err)
1✔
202

1✔
203
        select {
1✔
204
        case <-ctx.Done():
1✔
205
                return "", ctx.Err()
1✔
206
        case w.stdin <- processParams:
1✔
207
                break
1✔
208
        }
209

210
        select {
1✔
211
        case <-ctx.Done():
1✔
212
                return "", ctx.Err()
1✔
213
        case output := <-w.stdout:
1✔
214
                return output, nil
1✔
215
        case err := <-w.stderr:
1✔
216
                return "", errors.New(err)
1✔
217
        }
218
}
219

220
func (vm *processPoolVM) EvaluateAnonymousSnippet(filename string, snippet string) (_ string, err error) {
1✔
221
        tracer := trace.SpanFromContext(vm.ctx).TracerProvider().Tracer("")
1✔
222
        ctx, span := tracer.Start(vm.ctx, "jsonnetsecure.processPoolVM.EvaluateAnonymousSnippet", trace.WithAttributes(attribute.String("filename", filename)))
1✔
223
        defer otelx.End(span, &err)
1✔
224

1✔
225
        params := vm.params
1✔
226
        params.Filename = filename
1✔
227
        params.Snippet = snippet
1✔
228
        pp, err := json.Marshal(params)
1✔
229
        if err != nil {
1✔
230
                return "", errors.Wrap(err, "jsonnetsecure: marshal")
×
231
        }
×
232

233
        ctx = context.WithValue(ctx, contextValuePath, vm.path)
1✔
234
        ctx = context.WithValue(ctx, contextValueArgs, vm.args)
1✔
235
        worker, err := vm.pool.puddle.Acquire(ctx)
1✔
236
        if err != nil {
1✔
237
                return "", errors.Wrap(err, "jsonnetsecure: acquire")
×
238
        }
×
239

240
        ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
1✔
241
        defer cancel()
1✔
242
        result, err := worker.Value().eval(ctx, pp)
1✔
243
        if err != nil {
2✔
244
                worker.Destroy()
1✔
245
                return "", errors.Wrap(err, "jsonnetsecure: eval")
1✔
246
        } else {
2✔
247
                worker.Release()
1✔
248
        }
1✔
249

250
        if strings.HasPrefix(result, "ERROR: ") {
2✔
251
                return "", errors.New("jsonnetsecure: " + result)
1✔
252
        }
1✔
253

254
        return result, nil
1✔
255
}
256

257
func NewProcessPoolVM(opts *vmOptions) VM {
1✔
258
        ctx := opts.ctx
1✔
259
        if ctx == nil {
1✔
260
                ctx = context.Background()
×
261
        }
×
262
        return &processPoolVM{
1✔
263
                path: opts.jsonnetBinaryPath,
1✔
264
                args: opts.args,
1✔
265
                ctx:  ctx,
1✔
266
                pool: opts.pool,
1✔
267
        }
1✔
268
}
269

270
func (vm *processPoolVM) ExtCode(key string, val string) {
1✔
271
        vm.params.ExtCodes = append(vm.params.ExtCodes, kv{key, val})
1✔
272
}
1✔
273

274
func (vm *processPoolVM) ExtVar(key string, val string) {
1✔
275
        vm.params.ExtVars = append(vm.params.ExtVars, kv{key, val})
1✔
276
}
1✔
277

278
func (vm *processPoolVM) TLACode(key string, val string) {
1✔
279
        vm.params.TLACodes = append(vm.params.TLACodes, kv{key, val})
1✔
280
}
1✔
281

282
func (vm *processPoolVM) TLAVar(key string, val string) {
1✔
283
        vm.params.TLAVars = append(vm.params.TLAVars, kv{key, val})
1✔
284
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc