• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

muesli / termenv / 6922800437

19 Nov 2023 08:34PM UTC coverage: 51.553%. First build
6922800437

push

github

maaslalani
feat: implement detection of kitty keyboard protocol

0 of 90 new or added lines in 1 file covered. (0.0%)

531 of 1030 relevant lines covered (51.55%)

7.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

5.57
/termenv_unix.go
1
//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris
2
// +build darwin dragonfly freebsd linux netbsd openbsd solaris
3

4
package termenv
5

6
import (
7
        "fmt"
8
        "io"
9
        "strconv"
10
        "strings"
11
        "time"
12

13
        "golang.org/x/sys/unix"
14
)
15

16
const (
17
        // OSCTimeout is how long to wait for the terminal to answer an OSC query.
18
        OSCTimeout = 5 * time.Second
19
)
20

21
// ColorProfile returns the supported color profile:
22
// Ascii, ANSI, ANSI256, or TrueColor.
23
func (o *Output) ColorProfile() Profile {
14✔
24
        if !o.isTTY() {
27✔
25
                return Ascii
13✔
26
        }
13✔
27

28
        if o.environ.Getenv("GOOGLE_CLOUD_SHELL") == "true" {
1✔
29
                return TrueColor
×
30
        }
×
31

32
        term := o.environ.Getenv("TERM")
1✔
33
        colorTerm := o.environ.Getenv("COLORTERM")
1✔
34

1✔
35
        switch strings.ToLower(colorTerm) {
1✔
36
        case "24bit":
×
37
                fallthrough
×
38
        case "truecolor":
×
39
                if strings.HasPrefix(term, "screen") {
×
40
                        // tmux supports TrueColor, screen only ANSI256
×
41
                        if o.environ.Getenv("TERM_PROGRAM") != "tmux" {
×
42
                                return ANSI256
×
43
                        }
×
44
                }
45
                return TrueColor
×
46
        case "yes":
×
47
                fallthrough
×
48
        case "true":
×
49
                return ANSI256
×
50
        }
51

52
        switch term {
1✔
53
        case "xterm-kitty", "wezterm":
×
54
                return TrueColor
×
55
        case "linux":
×
56
                return ANSI
×
57
        }
58

59
        if strings.Contains(term, "256color") {
1✔
60
                return ANSI256
×
61
        }
×
62
        if strings.Contains(term, "color") {
1✔
63
                return ANSI
×
64
        }
×
65
        if strings.Contains(term, "ansi") {
1✔
66
                return ANSI
×
67
        }
×
68

69
        return Ascii
1✔
70
}
71

72
func (o Output) foregroundColor() Color {
×
73
        s, err := o.termStatusReport(10)
×
74
        if err == nil {
×
75
                c, err := xTermColor(s)
×
76
                if err == nil {
×
77
                        return c
×
78
                }
×
79
        }
80

81
        colorFGBG := o.environ.Getenv("COLORFGBG")
×
82
        if strings.Contains(colorFGBG, ";") {
×
83
                c := strings.Split(colorFGBG, ";")
×
84
                i, err := strconv.Atoi(c[0])
×
85
                if err == nil {
×
86
                        return ANSIColor(i)
×
87
                }
×
88
        }
89

90
        // default gray
91
        return ANSIColor(7)
×
92
}
93

94
func (o Output) backgroundColor() Color {
×
95
        s, err := o.termStatusReport(11)
×
96
        if err == nil {
×
97
                c, err := xTermColor(s)
×
98
                if err == nil {
×
99
                        return c
×
100
                }
×
101
        }
102

103
        colorFGBG := o.environ.Getenv("COLORFGBG")
×
104
        if strings.Contains(colorFGBG, ";") {
×
105
                c := strings.Split(colorFGBG, ";")
×
106
                i, err := strconv.Atoi(c[len(c)-1])
×
107
                if err == nil {
×
108
                        return ANSIColor(i)
×
109
                }
×
110
        }
111

112
        // default black
113
        return ANSIColor(0)
×
114
}
115

116
func (o Output) kittyKeyboardProtocolSupport() byte {
×
NEW
117
        // screen/tmux can't support OSC, because they can be connected to multiple
×
NEW
118
        // terminals concurrently.
×
NEW
119
        term := o.environ.Getenv("TERM")
×
NEW
120
        if strings.HasPrefix(term, "screen") || strings.HasPrefix(term, "tmux") {
×
NEW
121
                return 0b00000
×
NEW
122
        }
×
123

NEW
124
        tty := o.TTY()
×
NEW
125
        if tty == nil {
×
NEW
126
                return 0b00000
×
NEW
127
        }
×
128

NEW
129
        if !o.unsafe {
×
NEW
130
                fd := int(tty.Fd())
×
NEW
131
                // if in background, we can't control the terminal
×
NEW
132
                if !isForeground(fd) {
×
NEW
133
                        return 0b00000
×
NEW
134
                }
×
135

NEW
136
                t, err := unix.IoctlGetTermios(fd, tcgetattr)
×
NEW
137
                if err != nil {
×
NEW
138
                        return 0b00000
×
NEW
139
                }
×
NEW
140
                defer unix.IoctlSetTermios(fd, tcsetattr, t) //nolint:errcheck
×
NEW
141

×
NEW
142
                noecho := *t
×
NEW
143
                noecho.Lflag = noecho.Lflag &^ unix.ECHO
×
NEW
144
                noecho.Lflag = noecho.Lflag &^ unix.ICANON
×
NEW
145
                if err := unix.IoctlSetTermios(fd, tcsetattr, &noecho); err != nil {
×
NEW
146
                        return 0b00000
×
NEW
147
                }
×
148
        }
149

150
        // first, send CSI query to see whether this terminal supports the
151
        // kitty keyboard protocol
NEW
152
        fmt.Fprintf(tty, CSI+"?u")
×
NEW
153

×
NEW
154
        // then, query primary device data, should be supported by all terminals
×
NEW
155
        // if we receive a response for the primary device data befor the kitty keyboard
×
NEW
156
        // protocol response, this terminal does not support kitty keyboard protocol.
×
NEW
157
        fmt.Fprintf(tty, CSI+"c")
×
NEW
158

×
NEW
159
        response, isAttrs, err := o.readNextResponseKittyKeyboardProtocol()
×
NEW
160

×
NEW
161
        // we queried for the kitty keyboard protocol current progressive enhancements
×
NEW
162
        // but received the primary device attributes response, therefore this terminal
×
NEW
163
        // does not support the kitty keyboard protocol.
×
NEW
164
        if err != nil || isAttrs {
×
NEW
165
                return 0
×
NEW
166
        }
×
167

168
        // read the primary attrs response and ignore it.
NEW
169
        _, _, err = o.readNextResponseKittyKeyboardProtocol()
×
NEW
170
        if err != nil {
×
NEW
171
                return 0
×
NEW
172
        }
×
173

174
        // we receive a valid response to the kitty keyboard protocol query, this
175
        // terminal supports the protocol.
176
        //
177
        // parse the response and return the flags supported.
178
        //
179
        //   0    1 2 3 4
180
        //   \x1b [ ? 1 u
181
        //
NEW
182
        if len(response) <= 3 {
×
NEW
183
                return 0
×
NEW
184
        }
×
185

NEW
186
        return response[3]
×
187
}
188

189
func (o *Output) waitForData(timeout time.Duration) error {
×
190
        fd := o.TTY().Fd()
×
191
        tv := unix.NsecToTimeval(int64(timeout))
×
192
        var readfds unix.FdSet
×
193
        readfds.Set(int(fd))
×
194

×
195
        for {
×
196
                n, err := unix.Select(int(fd)+1, &readfds, nil, nil, &tv)
×
197
                if err == unix.EINTR {
×
198
                        continue
×
199
                }
200
                if err != nil {
×
201
                        return err
×
202
                }
×
203
                if n == 0 {
×
204
                        return fmt.Errorf("timeout")
×
205
                }
×
206

207
                break
×
208
        }
209

210
        return nil
×
211
}
212

213
func (o *Output) readNextByte() (byte, error) {
×
214
        if !o.unsafe {
×
215
                if err := o.waitForData(OSCTimeout); err != nil {
×
216
                        return 0, err
×
217
                }
×
218
        }
219

220
        var b [1]byte
×
221
        n, err := o.TTY().Read(b[:])
×
222
        if err != nil {
×
223
                return 0, err
×
224
        }
×
225

226
        if n == 0 {
×
227
                panic("read returned no data")
×
228
        }
229

230
        return b[0], nil
×
231
}
232

233
// readNextResponseKittyKeyboardProtocol reads either a CSI response to the current
234
// progressive enhancement status or primary device attributes response.
235
//   - CSI response: "\x1b]?31u"
236
//   - primary device attributes response: "\x1b]?64;1;2;7;8;9;15;18;21;44;45;46c"
NEW
237
func (o *Output) readNextResponseKittyKeyboardProtocol() (response string, isAttrs bool, err error) {
×
NEW
238
        start, err := o.readNextByte()
×
NEW
239
        if err != nil {
×
NEW
240
                return "", false, ErrStatusReport
×
NEW
241
        }
×
242

243
        // first byte must be ESC
NEW
244
        for start != ESC {
×
NEW
245
                start, err = o.readNextByte()
×
NEW
246
                if err != nil {
×
NEW
247
                        return "", false, ErrStatusReport
×
NEW
248
                }
×
249
        }
250

NEW
251
        response += string(start)
×
NEW
252

×
NEW
253
        // next byte is [
×
NEW
254
        tpe, err := o.readNextByte()
×
NEW
255
        if err != nil {
×
NEW
256
                return "", false, ErrStatusReport
×
NEW
257
        }
×
NEW
258
        response += string(tpe)
×
NEW
259

×
NEW
260
        if tpe != '[' {
×
NEW
261
                return "", false, ErrStatusReport
×
NEW
262
        }
×
263

NEW
264
        for {
×
NEW
265
                b, err := o.readNextByte()
×
NEW
266
                if err != nil {
×
NEW
267
                        return "", false, ErrStatusReport
×
NEW
268
                }
×
NEW
269
                response += string(b)
×
NEW
270

×
NEW
271
                switch b {
×
NEW
272
                case 'u':
×
NEW
273
                        // kitty keyboard protocol response
×
NEW
274
                        return response, false, nil
×
NEW
275
                case 'c':
×
NEW
276
                        // primary device attributes response
×
NEW
277
                        return response, true, nil
×
278
                }
279

280
                // both responses have less than 38 bytes, so if we read more, that's an error
NEW
281
                if len(response) > 38 {
×
NEW
282
                        break
×
283
                }
284
        }
285

NEW
286
        return response, isAttrs, nil
×
287
}
288

289
// readNextResponse reads either an OSC response or a cursor position response:
290
//   - OSC response: "\x1b]11;rgb:1111/1111/1111\x1b\\"
291
//   - cursor position response: "\x1b[42;1R"
292
func (o *Output) readNextResponse() (response string, isOSC bool, err error) {
×
293
        start, err := o.readNextByte()
×
294
        if err != nil {
×
295
                return "", false, err
×
296
        }
×
297

298
        // first byte must be ESC
299
        for start != ESC {
×
300
                start, err = o.readNextByte()
×
301
                if err != nil {
×
302
                        return "", false, err
×
303
                }
×
304
        }
305

306
        response += string(start)
×
307

×
308
        // next byte is either '[' (cursor position response) or ']' (OSC response)
×
309
        tpe, err := o.readNextByte()
×
310
        if err != nil {
×
311
                return "", false, err
×
312
        }
×
313

314
        response += string(tpe)
×
315

×
316
        var oscResponse bool
×
317
        switch tpe {
×
318
        case '[':
×
319
                oscResponse = false
×
320
        case ']':
×
321
                oscResponse = true
×
322
        default:
×
323
                return "", false, ErrStatusReport
×
324
        }
325

326
        for {
×
327
                b, err := o.readNextByte()
×
328
                if err != nil {
×
329
                        return "", false, err
×
330
                }
×
331

332
                response += string(b)
×
333

×
334
                if oscResponse {
×
335
                        // OSC can be terminated by BEL (\a) or ST (ESC)
×
336
                        if b == BEL || strings.HasSuffix(response, string(ESC)) {
×
337
                                return response, true, nil
×
338
                        }
×
339
                } else {
×
340
                        // cursor position response is terminated by 'R'
×
341
                        if b == 'R' {
×
342
                                return response, false, nil
×
343
                        }
×
344
                }
345

346
                // both responses have less than 25 bytes, so if we read more, that's an error
347
                if len(response) > 25 {
×
348
                        break
×
349
                }
350
        }
351

352
        return "", false, ErrStatusReport
×
353
}
354

355
func (o Output) termStatusReport(sequence int) (string, error) {
×
356
        // screen/tmux can't support OSC, because they can be connected to multiple
×
357
        // terminals concurrently.
×
358
        term := o.environ.Getenv("TERM")
×
359
        if strings.HasPrefix(term, "screen") || strings.HasPrefix(term, "tmux") {
×
360
                return "", ErrStatusReport
×
361
        }
×
362

363
        tty := o.TTY()
×
364
        if tty == nil {
×
365
                return "", ErrStatusReport
×
366
        }
×
367

368
        if !o.unsafe {
×
369
                fd := int(tty.Fd())
×
370
                // if in background, we can't control the terminal
×
371
                if !isForeground(fd) {
×
372
                        return "", ErrStatusReport
×
373
                }
×
374

375
                t, err := unix.IoctlGetTermios(fd, tcgetattr)
×
376
                if err != nil {
×
377
                        return "", fmt.Errorf("%s: %s", ErrStatusReport, err)
×
378
                }
×
379
                defer unix.IoctlSetTermios(fd, tcsetattr, t) //nolint:errcheck
×
380

×
381
                noecho := *t
×
382
                noecho.Lflag = noecho.Lflag &^ unix.ECHO
×
383
                noecho.Lflag = noecho.Lflag &^ unix.ICANON
×
384
                if err := unix.IoctlSetTermios(fd, tcsetattr, &noecho); err != nil {
×
385
                        return "", fmt.Errorf("%s: %s", ErrStatusReport, err)
×
386
                }
×
387
        }
388

389
        // first, send OSC query, which is ignored by terminal which do not support it
390
        fmt.Fprintf(tty, OSC+"%d;?"+ST, sequence)
×
391

×
392
        // then, query cursor position, should be supported by all terminals
×
393
        fmt.Fprintf(tty, CSI+"6n")
×
394

×
395
        // read the next response
×
396
        res, isOSC, err := o.readNextResponse()
×
397
        if err != nil {
×
398
                return "", fmt.Errorf("%s: %s", ErrStatusReport, err)
×
399
        }
×
400

401
        // if this is not OSC response, then the terminal does not support it
402
        if !isOSC {
×
403
                return "", ErrStatusReport
×
404
        }
×
405

406
        // read the cursor query response next and discard the result
407
        _, _, err = o.readNextResponse()
×
408
        if err != nil {
×
409
                return "", err
×
410
        }
×
411

412
        // fmt.Println("Rcvd", res[1:])
413
        return res, nil
×
414
}
415

416
// EnableVirtualTerminalProcessing enables virtual terminal processing on
// Windows for w and returns a function that restores w to its previous state.
// This is the non-Windows implementation: there is nothing to enable, so it
// always returns a non-nil no-op restore function and a nil error.
func EnableVirtualTerminalProcessing(_ io.Writer) (func() error, error) {
	restore := func() error {
		return nil
	}
	return restore, nil
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc