• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pomerium / pomerium / 19475484620

18 Nov 2025 05:36PM UTC coverage: 54.799% (-0.04%) from 54.843%
19475484620

push

github

web-flow
fix: databroker client updates should propagate to ssh codes (#5935)

## Summary

The SSH auth code flow is preventing Pomerium from starting cleanly and
from updating properly.

Databroker gRPC client changes now propagate to the SSH code manager.

## Related issues

N/A, slack thread.

## User Explanation

N/A

## Checklist

- [X] reference any related issues
- [X] updated unit tests
- [X] add appropriate label (`enhancement`, `bug`, `breaking`,
`dependencies`, `ci`)
- [X] ready for review

4 of 66 new or added lines in 7 files covered. (6.06%)

25 existing lines in 7 files now uncovered.

28697 of 52368 relevant lines covered (54.8%)

93.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/ssh/code/issuer.go
1
package code
2

3
import (
4
        "context"
5
        "crypto/rand"
6
        "encoding/base64"
7
        "fmt"
8
        "sync"
9
        "sync/atomic"
10
        "time"
11

12
        "github.com/cenkalti/backoff/v4"
13
        "golang.org/x/sync/errgroup"
14
        "google.golang.org/grpc/codes"
15
        "google.golang.org/grpc/status"
16

17
        "github.com/pomerium/pomerium/pkg/grpc/databroker"
18
        "github.com/pomerium/pomerium/pkg/grpc/session"
19
        "github.com/pomerium/pomerium/pkg/grpcutil"
20
        "github.com/pomerium/pomerium/pkg/protoutil"
21
)
22

23
type issuer struct {
24
        done chan struct{}
25

26
        setupDone *atomic.Uint32
27
        setupF    *sync.Once
28
        // CodeAcessor
29

30
        mgr *codeManager
31
        Reader
32
        Revoker
33

34
        clientB databroker.ClientGetter
35
}
36

NEW
37
func NewIssuer(ctx context.Context, client databroker.ClientGetter) Issuer {
×
38
        doneC := make(chan struct{})
×
39
        initVal := &atomic.Uint32{}
×
40
        initVal.Store(0)
×
41
        i := &issuer{
×
NEW
42
                clientB:   client,
×
43
                done:      doneC,
×
44
                setupDone: initVal,
×
45
                setupF:    &sync.Once{},
×
46
                mgr:       newCodeManager(client),
×
47
                Reader:    NewReader(client),
×
48
                Revoker:   NewRevoker(client),
×
49
        }
×
50

×
51
        eg, ctxca := errgroup.WithContext(ctx)
×
52

×
53
        eg.Go(func() error {
×
54
                syncer := databroker.NewSyncer(
×
55
                        ctxca,
×
56
                        "session-biding-request-mgr",
×
57
                        i.mgr,
×
58
                        databroker.WithTypeURL("type.googleapis.com/session.SessionBindingRequest"),
×
59
                )
×
60
                return syncer.Run(ctxca)
×
61
        })
×
62
        go func() {
×
63
                defer close(i.done)
×
64
                _ = eg.Wait()
×
65
        }()
×
66
        return i
×
67
}
68

69
// Compile-time check that *issuer satisfies the Issuer interface.
var _ Issuer = (*issuer)(nil)
70

71
func (i *issuer) waitForSetup() error {
×
72
        // FIXME: this needs to run once everywhere we query SessionBindingRequest and SessionBinding's
×
73
        // we want to avoid sharing a sync.Once for coordination across packages, and run this only once
×
74
        // per pomerium instance.
×
75
        i.setupF.Do(func() {
×
76
                ctxT, ca := context.WithTimeout(context.Background(), 5*time.Minute)
×
77
                defer ca()
×
78
                if err := i.setup(ctxT); err != nil {
×
79
                        panic(err)
×
80
                }
81
        })
82

83
        if i.setupDone.Load() == 0 {
×
84
                return fmt.Errorf("not yet initialized")
×
85
        }
×
86
        return nil
×
87
}
88

89
func (i *issuer) IssueCode() CodeID {
×
90
        code := [16]byte{}
×
91
        _, _ = rand.Read(code[:])
×
92
        codeStr := base64.RawURLEncoding.EncodeToString(code[:])
×
93
        return CodeID(codeStr)
×
94
}
×
95

96
func (i *issuer) OnCodeDecision(ctx context.Context, code CodeID) <-chan Status {
×
97
        ret := make(chan Status, 1)
×
98

×
99
        go func() {
×
100
                defer close(ret)
×
101
                t := time.NewTicker(time.Millisecond * 150)
×
102
                defer t.Stop()
×
103
                id := string(code)
×
104

×
105
                for {
×
106
                RETRY:
×
107
                        select {
×
108
                        case <-ctx.Done():
×
109
                                return
×
110
                        case <-t.C:
×
111
                                st, ok := i.mgr.GetByCodeID(id)
×
112
                                if !ok {
×
113
                                        goto RETRY
×
114
                                }
115
                                if st.ExpiresAt.Before(time.Now()) {
×
116
                                        return
×
117
                                }
×
118
                                if st.State != session.SessionBindingRequestState_InFlight {
×
119
                                        ret <- st
×
120
                                        return
×
121
                                }
×
122
                        }
123
                }
124
        }()
125
        return ret
×
126
}
127

128
func (i *issuer) setup(ctx context.Context) error {
×
129
        reqCap := uint64(50000)
×
130

×
131
        b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
×
132
        if err := backoff.Retry(func() error {
×
NEW
133
                _, err := i.clientB.GetDataBrokerServiceClient().SetOptions(ctx, &databroker.SetOptionsRequest{
×
134
                        Type: "type.googleapis.com/session.SessionBindingRequest",
×
135
                        Options: &databroker.Options{
×
136
                                Capacity:        &reqCap,
×
137
                                IndexableFields: []string{"key"},
×
138
                        },
×
139
                })
×
140
                return err
×
141
        }, b); err != nil {
×
142
                return err
×
143
        }
×
144

145
        if err := backoff.Retry(func() error {
×
NEW
146
                _, err := i.clientB.GetDataBrokerServiceClient().SetOptions(ctx, &databroker.SetOptionsRequest{
×
147
                        Type: "type.googleapis.com/session.SessionBinding",
×
148
                        Options: &databroker.Options{
×
149
                                IndexableFields: []string{
×
150
                                        "session_id",
×
151
                                        "user_id",
×
152
                                },
×
153
                        },
×
154
                })
×
155
                return err
×
156
        }, b); err != nil {
×
157
                return err
×
158
        }
×
159

160
        if err := backoff.Retry(func() error {
×
NEW
161
                _, err := i.clientB.GetDataBrokerServiceClient().SetOptions(ctx, &databroker.SetOptionsRequest{
×
162
                        Type: "type.googleapis.com/session.IdentityBinding",
×
163
                        Options: &databroker.Options{
×
164
                                IndexableFields: []string{
×
165
                                        "user_id",
×
166
                                },
×
167
                        },
×
168
                })
×
169
                return err
×
170
        }, b); err != nil {
×
171
                return err
×
172
        }
×
173
        i.setupDone.CompareAndSwap(0, 1)
×
174
        return nil
×
175
}
176

177
func (i *issuer) AssociateCode(
178
        ctx context.Context,
179
        code CodeID,
180
        sbr *session.SessionBindingRequest,
181
) (CodeID, error) {
×
182
        if err := i.waitForSetup(); err != nil {
×
183
                return "", err
×
184
        }
×
185
        b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
×
186
        maybeCode, err := backoff.RetryWithData(func() (CodeID, error) {
×
NEW
187
                maybeCode, err := getCodeByBindingKey(ctx, i.clientB.GetDataBrokerServiceClient(), sbr.Key)
×
188
                if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
×
189
                        return "", nil
×
190
                }
×
191
                return maybeCode, nil
×
192
        }, b)
193
        if err != nil {
×
194
                return "", err
×
195
        }
×
196
        if maybeCode == "" {
×
NEW
197
                if _, err := i.clientB.GetDataBrokerServiceClient().Put(ctx, &databroker.PutRequest{
×
198
                        Records: []*databroker.Record{
×
199
                                {
×
200
                                        Type: grpcutil.GetTypeURL(sbr),
×
201
                                        Id:   string(code),
×
202
                                        Data: protoutil.NewAny(sbr),
×
203
                                },
×
204
                        },
×
205
                }); err != nil {
×
206
                        return "", err
×
207
                }
×
208
        }
209
        maybeCode, err = backoff.RetryWithData(func() (CodeID, error) {
×
NEW
210
                maybeCode, err := getCodeByBindingKey(ctx, i.clientB.GetDataBrokerServiceClient(), sbr.Key)
×
211
                if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
×
212
                        return "", nil
×
213
                }
×
214
                if err != nil {
×
215
                        return "", err
×
216
                }
×
217
                if maybeCode == "" {
×
218
                        return "", fmt.Errorf("failed to resolve code")
×
219
                }
×
220
                return maybeCode, nil
×
221
        }, b)
222
        if err != nil {
×
223
                return "", err
×
224
        }
×
225
        return maybeCode, nil
×
226
}
227

228
func (i *issuer) Done() chan struct{} {
×
229
        return i.done
×
230
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc