• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence / 0187fdd2-f4a4-4c9a-97b4-6604937bf7be

09 May 2023 12:23AM UTC coverage: 57.253% (-0.002%) from 57.255%
0187fdd2-f4a4-4c9a-97b4-6604937bf7be

Pull #5252

buildkite

David Porter
Merge branch 'master' into feature/zonal-partitioning
Pull Request #5252: Feature/zonal partitioning

1460 of 1460 new or added lines in 51 files covered. (100.0%)

86909 of 151799 relevant lines covered (57.25%)

2482.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.4
/common/rpc/middleware.go
1
// Copyright (c) 2021 Uber Technologies, Inc.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19
// THE SOFTWARE.
20

21
package rpc
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "io"
27

28
        "github.com/uber/cadence/common"
29
        "github.com/uber/cadence/common/config"
30
        "github.com/uber/cadence/common/metrics"
31
        "github.com/uber/cadence/common/partition"
32

33
        "go.uber.org/cadence/worker"
34
        "go.uber.org/yarpc"
35
        "go.uber.org/yarpc/api/transport"
36
)
37

38
type authOutboundMiddleware struct {
39
        authProvider worker.AuthorizationProvider
40
}
41

42
func (m *authOutboundMiddleware) Call(ctx context.Context, request *transport.Request, out transport.UnaryOutbound) (*transport.Response, error) {
3✔
43
        if m.authProvider == nil {
4✔
44
                return out.Call(ctx, request)
1✔
45
        }
1✔
46

47
        token, err := m.authProvider.GetAuthToken()
2✔
48
        if err != nil {
3✔
49
                return nil, err
1✔
50
        }
1✔
51
        request.Headers = request.Headers.
1✔
52
                With(common.AuthorizationTokenHeaderName, string(token))
1✔
53

1✔
54
        return out.Call(ctx, request)
1✔
55
}
56

57
type contextKey string
58

59
const _responseInfoContextKey = contextKey("response-info")
60

61
// ContextWithResponseInfo will create a child context that has ResponseInfo set as value.
62
// This value will get filled after the call is made and can be used later to retrieve some info of interest.
63
func ContextWithResponseInfo(parent context.Context) (context.Context, *ResponseInfo) {
2✔
64
        responseInfo := &ResponseInfo{}
2✔
65
        return context.WithValue(parent, _responseInfoContextKey, responseInfo), responseInfo
2✔
66
}
2✔
67

68
// ResponseInfo structure is filled with data after the RPC call.
69
// It can be obtained with rpc.ContextWithResponseInfo function.
70
type ResponseInfo struct {
71
        Size int
72
}
73

74
type countingReadCloser struct {
75
        reader    io.ReadCloser
76
        bytesRead *int
77
}
78

79
func (r *countingReadCloser) Read(p []byte) (n int, err error) {
2✔
80
        n, err = r.reader.Read(p)
2✔
81
        *r.bytesRead += n
2✔
82
        return n, err
2✔
83
}
2✔
84

85
func (r *countingReadCloser) Close() (err error) {
×
86
        return r.reader.Close()
×
87
}
×
88

89
// ResponseInfoMiddleware populates context with ResponseInfo structure which contains info about response that was received.
90
// In particular, it counts the size of the response in bytes. Such information can be useful down the line, where payload are deserialized and no longer have their size.
91
type ResponseInfoMiddleware struct{}
92

93
func (m *ResponseInfoMiddleware) Call(ctx context.Context, request *transport.Request, out transport.UnaryOutbound) (*transport.Response, error) {
25,696✔
94
        response, err := out.Call(ctx, request)
25,696✔
95

25,696✔
96
        if value := ctx.Value(_responseInfoContextKey); value != nil {
25,698✔
97
                if responseInfo, ok := value.(*ResponseInfo); ok && response != nil {
3✔
98
                        // We can not use response.BodySize here, because it is not set on all transports.
1✔
99
                        // Instead wrap body reader with counter, that increments responseInfo.Size as it is read.
1✔
100
                        response.Body = &countingReadCloser{reader: response.Body, bytesRead: &responseInfo.Size}
1✔
101
                }
1✔
102
        }
103

104
        return response, err
25,696✔
105
}
106

107
// InboundMetricsMiddleware tags context with additional metric tags from incoming request.
108
type InboundMetricsMiddleware struct{}
109

110
func (m *InboundMetricsMiddleware) Handle(ctx context.Context, req *transport.Request, resw transport.ResponseWriter, h transport.UnaryHandler) error {
1✔
111
        ctx = metrics.TagContext(ctx,
1✔
112
                metrics.CallerTag(req.Caller),
1✔
113
                metrics.TransportTag(req.Transport),
1✔
114
        )
1✔
115
        return h.Handle(ctx, req, resw)
1✔
116
}
1✔
117

118
type overrideCallerMiddleware struct {
119
        caller string
120
}
121

122
func (m *overrideCallerMiddleware) Call(ctx context.Context, request *transport.Request, out transport.UnaryOutbound) (*transport.Response, error) {
263✔
123
        request.Caller = m.caller
263✔
124
        return out.Call(ctx, request)
263✔
125
}
263✔
126

127
// HeaderForwardingMiddleware forwards headers from current inbound RPC call that is being handled to new outbound calls being made.
128
// As this does NOT differentiate between transports or purposes, it generally assumes we are not acting as a true proxy,
129
// so things like content lengths and encodings should not be forwarded - they will be provided by the outbound RPC library as needed.
130
//
131
// Duplicated headers retain the first value only, matching how browsers and Go (afaict) generally behave.
132
//
133
// This uses overly-simplified rules for choosing which headers are forwarded and which are not, intended to be lightly configurable.
134
// For a more in-depth logic review if it becomes needed, check:
135
//   - How Go's ReverseProxy deals with headers, e.g. per-protocol and a list of exclusions: https://cs.opensource.google/go/go/+/refs/tags/go1.20.1:src/net/http/httputil/reverseproxy.go;l=332
136
//   - HTTP's spec for headers, namely how duplicates and Connection work: https://www.rfc-editor.org/rfc/rfc9110.html#name-header-fields
137
//   - Many browsers prefer first-value-wins for unexpected duplicates: https://bugzilla.mozilla.org/show_bug.cgi?id=376756
138
//   - But there are MANY map-like implementations that choose last-value wins, and this mismatch is a source of frequent security problems.
139
//   - YARPC's `With` only retains the last call's value: https://github.com/yarpc/yarpc-go/blob/8ccd79a2ca696150213faac1d35011c5be52e5fb/api/transport/header.go#L69-L77
140
//   - Go's MIMEHeader's Get (used by YARPC) only returns the first value, and does not join duplicates: https://pkg.go.dev/net/textproto#MIMEHeader.Get
141
//
142
// There is likely no correct choice, as it depends on the recipients' behavior.
143
// If we need to support more complex logic, it's likely worth jumping to a fully-controllable thing.
144
// Middle-grounds will probably just need to be changed again later.
145
type HeaderForwardingMiddleware struct {
146
        // Rules are applied in order to add or remove headers by regex.
147
        //
148
        // There are no default rules, so by default no headers are copied.
149
        // To include headers by default, Add with a permissive regex and then remove specific ones.
150
        Rules []config.HeaderRule
151
}
152

153
func (m *HeaderForwardingMiddleware) Call(ctx context.Context, request *transport.Request, out transport.UnaryOutbound) (*transport.Response, error) {
3✔
154
        if inboundCall := yarpc.CallFromContext(ctx); inboundCall != nil {
5✔
155
                outboundHeaders := request.Headers
2✔
156
                pending := make(map[string][]string, len(inboundCall.HeaderNames()))
2✔
157
                names := inboundCall.HeaderNames()
2✔
158
                for _, rule := range m.Rules {
5✔
159
                        for _, key := range names {
12✔
160
                                if !rule.Match.MatchString(key) {
11✔
161
                                        continue
2✔
162
                                }
163
                                if _, ok := outboundHeaders.Get(key); ok {
9✔
164
                                        continue // do not overwrite existing headers
2✔
165
                                }
166
                                if rule.Add {
9✔
167
                                        pending[key] = append(pending[key], inboundCall.Header(key))
4✔
168
                                } else {
5✔
169
                                        delete(pending, key)
1✔
170
                                }
1✔
171
                        }
172
                }
173
                for k, vs := range pending {
5✔
174
                        // yarpc's Headers.With keeps the LAST value, but we (and browsers) prefer the FIRST,
3✔
175
                        // and we do not canonicalize duplicates.
3✔
176
                        request.Headers = request.Headers.With(k, vs[0])
3✔
177
                }
3✔
178
        }
179

180
        return out.Call(ctx, request)
3✔
181
}
182

183
// ForwardPartitionConfigMiddleware forwards the partition config to remote cluster
184
// The middleware should always be applied after any other middleware that inject partition config into the context
185
// so that it can overwrites the partition config into the context
186
// The purpose of this middleware is to make sure the partition config doesn't change when a request is forwarded from
187
// passive cluster to the active cluster
188
type ForwardPartitionConfigMiddleware struct{}
189

190
func (m *ForwardPartitionConfigMiddleware) Handle(ctx context.Context, req *transport.Request, resw transport.ResponseWriter, h transport.UnaryHandler) error {
4✔
191
        if _, ok := req.Headers.Get(common.AutoforwardingClusterHeaderName); ok {
7✔
192
                var partitionConfig map[string]string
3✔
193
                if blob, ok := req.Headers.Get(common.PartitionConfigHeaderName); ok && len(blob) > 0 {
5✔
194
                        if err := json.Unmarshal([]byte(blob), &partitionConfig); err != nil {
2✔
195
                                return err
×
196
                        }
×
197
                }
198
                ctx = partition.ContextWithConfig(ctx, partitionConfig)
3✔
199
                isolationGroup, _ := req.Headers.Get(common.IsolationGroupHeaderName)
3✔
200
                ctx = partition.ContextWithIsolationGroup(ctx, isolationGroup)
3✔
201
        }
202
        return h.Handle(ctx, req, resw)
4✔
203
}
204

205
func (m *ForwardPartitionConfigMiddleware) Call(ctx context.Context, request *transport.Request, out transport.UnaryOutbound) (*transport.Response, error) {
4✔
206
        if _, ok := request.Headers.Get(common.AutoforwardingClusterHeaderName); ok {
7✔
207
                partitionConfig := partition.ConfigFromContext(ctx)
3✔
208
                if len(partitionConfig) > 0 {
5✔
209
                        blob, err := json.Marshal(partitionConfig)
2✔
210
                        if err != nil {
2✔
211
                                return nil, err
×
212
                        }
×
213
                        request.Headers = request.Headers.With(common.PartitionConfigHeaderName, string(blob))
2✔
214
                } else {
1✔
215
                        request.Headers.Del(common.PartitionConfigHeaderName)
1✔
216
                }
1✔
217
                isolationGroup := partition.IsolationGroupFromContext(ctx)
3✔
218
                if isolationGroup != "" {
5✔
219
                        request.Headers = request.Headers.With(common.IsolationGroupHeaderName, isolationGroup)
2✔
220
                } else {
3✔
221
                        request.Headers.Del(common.IsolationGroupHeaderName)
1✔
222
                }
1✔
223
        }
224
        return out.Call(ctx, request)
4✔
225
}
226

227
// ZonalPartitionConfigMiddleware stores the partition config and isolation group of the request into the context
228
// It reads a header from client request which indicates the zone which the request is from and uses it as the isolation group
229
type ZonalPartitionConfigMiddleware struct{}
230

231
func (m *ZonalPartitionConfigMiddleware) Handle(ctx context.Context, req *transport.Request, resw transport.ResponseWriter, h transport.UnaryHandler) error {
2✔
232
        zone, _ := req.Headers.Get(common.ClientZoneHeaderName)
2✔
233
        if zone != "" {
3✔
234
                ctx = partition.ContextWithConfig(ctx, map[string]string{
1✔
235
                        partition.IsolationGroupKey: zone,
1✔
236
                })
1✔
237
                ctx = partition.ContextWithIsolationGroup(ctx, zone)
1✔
238
        }
1✔
239
        return h.Handle(ctx, req, resw)
2✔
240
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc