• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ory / x / 15046509034

15 May 2025 01:40PM UTC coverage: 60.795% (-0.2%) from 61.006%
15046509034

Pull #857

github

web-flow
Merge 2689561d7 into 5c4b146a4
Pull Request #857: fix(jonnetsecure): prevent massive memory usage from misbehaving jsonnet script

23 of 47 new or added lines in 4 files covered. (48.94%)

2 existing lines in 1 file now uncovered.

7130 of 11728 relevant lines covered (60.79%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.42
/jsonnetsecure/jsonnet_pool.go
1
// Copyright © 2024 Ory Corp
2
// SPDX-License-Identifier: Apache-2.0
3

4
package jsonnetsecure
5

6
import (
7
        "bufio"
8
        "context"
9
        "encoding/json"
10
        "io"
11
        "math"
12
        "os/exec"
13
        "strconv"
14
        "strings"
15
        "time"
16

17
        "github.com/jackc/puddle/v2"
18
        "github.com/pkg/errors"
19
        "go.opentelemetry.io/otel/attribute"
20
        semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
21
        "go.opentelemetry.io/otel/trace"
22

23
        "github.com/ory/x/otelx"
24
)
25

26
const (
27
        chanSize           = 16
28
        KiB                = 1024
29
        jsonnetOutputLimit = 512 * KiB
30
        jsonnetErrLimit    = 1 * KiB
31
)
32

33
type (
34
        processPoolVM struct {
35
                path   string
36
                args   []string
37
                ctx    context.Context
38
                params processParameters
39
                pool   *pool
40
        }
41
        Pool interface {
42
                Close()
43
                private()
44
        }
45
        pool struct {
46
                puddle *puddle.Pool[worker]
47
        }
48
        worker struct {
49
                cmd    *exec.Cmd
50
                stdin  chan<- []byte
51
                stdout <-chan string
52
                stderr <-chan string
53
        }
54
        contextKeyType string
55
)
56

57
var (
58
        ErrProcessPoolClosed = errors.New("jsonnetsecure: process pool closed")
59

60
        _ VM   = (*processPoolVM)(nil)
61
        _ Pool = (*pool)(nil)
62

63
        contextValuePath contextKeyType = "argc"
64
        contextValueArgs contextKeyType = "argv"
65
)
66

67
func NewProcessPool(size int) Pool {
1✔
68
        size = max(5, min(size, math.MaxInt32))
1✔
69
        pud, err := puddle.NewPool(&puddle.Config[worker]{
1✔
70
                MaxSize:     int32(size), //nolint:gosec // disable G115 // because of the previous min/max, 5 <= size <= math.MaxInt32
1✔
71
                Constructor: newWorker,
1✔
72
                Destructor:  worker.destroy,
1✔
73
        })
1✔
74
        if err != nil {
1✔
75
                panic(err) // this should never happen, see implementation of puddle.NewPool
×
76
        }
77
        for range size {
2✔
78
                // warm pool
1✔
79
                go pud.CreateResource(context.Background())
1✔
80
        }
1✔
81
        go func() {
2✔
82
                for {
2✔
83
                        time.Sleep(10 * time.Second)
1✔
84
                        for _, proc := range pud.AcquireAllIdle() {
1✔
85
                                if proc.Value().cmd.ProcessState != nil {
×
86
                                        proc.Destroy()
×
87
                                } else {
×
88
                                        proc.Release()
×
89
                                }
×
90
                        }
91
                }
92
        }()
93
        return &pool{pud}
1✔
94
}
95

96
func (*pool) private() {}
×
97

98
func (p *pool) Close() {
1✔
99
        p.puddle.Close()
1✔
100
}
1✔
101

102
func newWorker(ctx context.Context) (_ worker, err error) {
1✔
103
        tracer := trace.SpanFromContext(ctx).TracerProvider().Tracer("")
1✔
104
        ctx, span := tracer.Start(ctx, "jsonnetsecure.newWorker")
1✔
105
        defer otelx.End(span, &err)
1✔
106

1✔
107
        path, _ := ctx.Value(contextValuePath).(string)
1✔
108
        if path == "" {
2✔
109
                return worker{}, errors.New("newWorker: missing binary path in context")
1✔
110
        }
1✔
111
        args, _ := ctx.Value(contextValueArgs).([]string)
1✔
112
        cmd := exec.Command(path, append(args, "-0")...)
1✔
113
        cmd.Env = []string{"GOMAXPROCS=1"}
1✔
114
        cmd.WaitDelay = 100 * time.Millisecond
1✔
115

1✔
116
        span.SetAttributes(semconv.ProcessCommand(cmd.Path), semconv.ProcessCommandArgs(cmd.Args...))
1✔
117

1✔
118
        stdin, err := cmd.StdinPipe()
1✔
119
        if err != nil {
1✔
120
                return worker{}, errors.Wrap(err, "newWorker: failed to create stdin pipe")
×
121
        }
×
122

123
        in := make(chan []byte, chanSize)
1✔
124
        go func(c <-chan []byte) {
2✔
125
                for input := range c {
2✔
126
                        if _, err := stdin.Write(append(input, 0)); err != nil {
1✔
127
                                stdin.Close()
×
128
                                return
×
129
                        }
×
130
                }
131
        }(in)
132

133
        stdout, err := cmd.StdoutPipe()
1✔
134
        if err != nil {
1✔
135
                return worker{}, errors.Wrap(err, "newWorker: failed to create stdout pipe")
×
136
        }
×
137
        stderr, err := cmd.StderrPipe()
1✔
138
        if err != nil {
1✔
139
                return worker{}, errors.Wrap(err, "newWorker: failed to create stderr pipe")
×
140
        }
×
141

142
        stdoutReader := io.LimitReader(stdout, int64(jsonnetOutputLimit))
1✔
143
        stderrReader := io.LimitReader(stderr, int64(jsonnetErrLimit))
1✔
144

1✔
145
        if err := cmd.Start(); err != nil {
1✔
146
                return worker{}, errors.Wrap(err, "newWorker: failed to start process")
×
147
        }
×
148

149
        span.SetAttributes(semconv.ProcessPID(cmd.Process.Pid))
1✔
150

1✔
151
        scan := func(c chan<- string, r io.Reader, limit int) {
2✔
152
                defer close(c)
1✔
153
                // NOTE: `bufio.Scanner` has its own internal limit of 64 KiB.
1✔
154
                scanner := bufio.NewScanner(r)
1✔
155

1✔
156
                scanner.Split(splitNull)
1✔
157
                for scanner.Scan() {
2✔
158
                        s := scanner.Text()
1✔
159
                        if len(s) >= limit {
1✔
NEW
160
                                c <- "ERROR: reached limits: " + strconv.FormatInt(int64(len(s)), 10)
×
161
                        } else {
1✔
162
                                c <- s
1✔
163
                        }
1✔
164
                }
165
                if err := scanner.Err(); err != nil {
2✔
166
                        c <- "ERROR: scan: " + err.Error()
1✔
167
                }
1✔
168
        }
169
        out := make(chan string, chanSize)
1✔
170
        go scan(out, stdoutReader, jsonnetOutputLimit)
1✔
171
        errs := make(chan string, chanSize)
1✔
172
        // No limit here because: too much data on stderr is not an error - simply having some already is.
1✔
173
        // And we already use a LimitedReader.
1✔
174
        go scan(errs, stderrReader, math.MaxInt)
1✔
175

1✔
176
        w := worker{
1✔
177
                cmd:    cmd,
1✔
178
                stdin:  in,
1✔
179
                stdout: out,
1✔
180
                stderr: errs,
1✔
181
        }
1✔
182
        _, err = w.eval(ctx, []byte("{}")) // warm up
1✔
183
        if err != nil {
1✔
184
                w.destroy()
×
185
                return worker{}, errors.Wrap(err, "newWorker: warm up failed")
×
186
        }
×
187

188
        return w, nil
1✔
189
}
190

191
func (w worker) destroy() {
1✔
192
        close(w.stdin)
1✔
193
        w.cmd.Process.Kill()
1✔
194
        w.cmd.Wait()
1✔
195
}
1✔
196

197
func (w worker) eval(ctx context.Context, processParams []byte) (output string, err error) {
1✔
198
        tracer := trace.SpanFromContext(ctx).TracerProvider().Tracer("")
1✔
199
        ctx, span := tracer.Start(ctx, "jsonnetsecure.worker.eval", trace.WithAttributes(
1✔
200
                semconv.ProcessPID(w.cmd.Process.Pid)))
1✔
201
        defer otelx.End(span, &err)
1✔
202

1✔
203
        select {
1✔
204
        case <-ctx.Done():
×
205
                return "", ctx.Err()
×
206
        case w.stdin <- processParams:
1✔
207
                break
1✔
208
        }
209

210
        select {
1✔
UNCOV
211
        case <-ctx.Done():
×
UNCOV
212
                return "", ctx.Err()
×
213
        case output := <-w.stdout:
1✔
214
                return output, nil
1✔
215
        case err := <-w.stderr:
1✔
216
                return "", errors.New(err)
1✔
217
        }
218
}
219

220
func (vm *processPoolVM) EvaluateAnonymousSnippet(filename string, snippet string) (_ string, err error) {
1✔
221
        tracer := trace.SpanFromContext(vm.ctx).TracerProvider().Tracer("")
1✔
222
        ctx, span := tracer.Start(vm.ctx, "jsonnetsecure.processPoolVM.EvaluateAnonymousSnippet", trace.WithAttributes(attribute.String("filename", filename)))
1✔
223
        defer otelx.End(span, &err)
1✔
224

1✔
225
        params := vm.params
1✔
226
        params.Filename = filename
1✔
227
        params.Snippet = snippet
1✔
228
        pp, err := json.Marshal(params)
1✔
229
        if err != nil {
1✔
230
                return "", errors.Wrap(err, "jsonnetsecure: marshal")
×
231
        }
×
232

233
        ctx = context.WithValue(ctx, contextValuePath, vm.path)
1✔
234
        ctx = context.WithValue(ctx, contextValueArgs, vm.args)
1✔
235
        worker, err := vm.pool.puddle.Acquire(ctx)
1✔
236
        if err != nil {
1✔
237
                return "", errors.Wrap(err, "jsonnetsecure: acquire")
×
238
        }
×
239

240
        ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
1✔
241
        defer cancel()
1✔
242
        result, err := worker.Value().eval(ctx, pp)
1✔
243
        if err != nil {
2✔
244
                worker.Destroy()
1✔
245
                return "", errors.Wrap(err, "jsonnetsecure: eval")
1✔
246
        } else {
2✔
247
                worker.Release()
1✔
248
        }
1✔
249

250
        if strings.HasPrefix(result, "ERROR: ") {
2✔
251
                return "", errors.New("jsonnetsecure: " + result)
1✔
252
        }
1✔
253

254
        return result, nil
1✔
255
}
256

257
func NewProcessPoolVM(opts *vmOptions) VM {
1✔
258
        ctx := opts.ctx
1✔
259
        if ctx == nil {
1✔
260
                ctx = context.Background()
×
261
        }
×
262
        return &processPoolVM{
1✔
263
                path: opts.jsonnetBinaryPath,
1✔
264
                args: opts.args,
1✔
265
                ctx:  ctx,
1✔
266
                pool: opts.pool,
1✔
267
        }
1✔
268
}
269

270
func (vm *processPoolVM) ExtCode(key string, val string) {
1✔
271
        vm.params.ExtCodes = append(vm.params.ExtCodes, kv{key, val})
1✔
272
}
1✔
273

274
func (vm *processPoolVM) ExtVar(key string, val string) {
1✔
275
        vm.params.ExtVars = append(vm.params.ExtVars, kv{key, val})
1✔
276
}
1✔
277

278
func (vm *processPoolVM) TLACode(key string, val string) {
1✔
279
        vm.params.TLACodes = append(vm.params.TLACodes, kv{key, val})
1✔
280
}
1✔
281

282
func (vm *processPoolVM) TLAVar(key string, val string) {
1✔
283
        vm.params.TLAVars = append(vm.params.TLAVars, kv{key, val})
1✔
284
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc