• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ThreeMammals / Ocelot / 18870980298

28 Oct 2025 10:00AM UTC coverage: 91.723% (-0.04%) from 91.759%
18870980298

Pull #2328

github

web-flow
Merge 46252ad9a into e3ad51c9d
Pull Request #2328: #2248 Ensure correct mapping of `RouteKeysConfig` arrays in aggregates

6383 of 6959 relevant lines covered (91.72%)

3884.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.52
src/Ocelot/Multiplexer/MultiplexingMiddleware.cs
1
using Microsoft.AspNetCore.Http;
2
using Microsoft.Extensions.Primitives;
3
using Newtonsoft.Json.Linq;
4
using Ocelot.Configuration;
5
using Ocelot.Configuration.File;
6
using Ocelot.DownstreamRouteFinder.UrlMatcher;
7
using Ocelot.Logging;
8
using Ocelot.Middleware;
9
using System.Collections;
10
using Route = Ocelot.Configuration.Route;
11

12
namespace Ocelot.Multiplexer;
13

14
public class MultiplexingMiddleware : OcelotMiddleware
15
{
16
    private readonly RequestDelegate _next;
17
    private readonly IResponseAggregatorFactory _factory;
18
    private const string RequestIdString = "RequestId";
19

20
    public MultiplexingMiddleware(RequestDelegate next,
21
        IOcelotLoggerFactory loggerFactory,
22
        IResponseAggregatorFactory factory)
23
        : base(loggerFactory.CreateLogger<MultiplexingMiddleware>())
41✔
24
    {
25
        _factory = factory;
41✔
26
        _next = next;
41✔
27
    }
41✔
28

29
    public async Task Invoke(HttpContext httpContext)
30
    {
31
        var downstreamRouteHolder = httpContext.Items.DownstreamRouteHolder();
19✔
32
        var route = downstreamRouteHolder.Route;
19✔
33
        var downstreamRoutes = route.DownstreamRoute;
19✔
34

35
        // Case 1: if websocket request or single downstream route
36
        if (ShouldProcessSingleRoute(httpContext, downstreamRoutes))
19✔
37
        {
38
            await ProcessSingleRouteAsync(httpContext, downstreamRoutes[0]);
4✔
39
            return;
4✔
40
        }
41

42
        // Case 2: if no downstream routes
43
        if (downstreamRoutes.Count == 0)
15✔
44
        {
45
            return;
1✔
46
        }
47

48
        // Case 3: if multiple downstream routes
49
        var routeKeysConfigs = route.DownstreamRouteConfig;
14✔
50
        if (routeKeysConfigs == null || routeKeysConfigs.Count == 0)
14✔
51
        {
52
            await ProcessRoutesAsync(httpContext, route);
13✔
53
            return;
13✔
54
        }
55

56
        // Case 4: if multiple downstream routes with route keys
57
        var mainResponseContext = await ProcessMainRouteAsync(httpContext, downstreamRoutes[0]);
1✔
58
        if (mainResponseContext == null)
1✔
59
        {
60
            return;
×
61
        }
62

63
        var responsesContexts = await ProcessRoutesWithRouteKeysAsync(httpContext, downstreamRoutes, routeKeysConfigs, mainResponseContext);
1✔
64
        if (responsesContexts.Length == 0)
1✔
65
        {
66
            return;
×
67
        }
68

69
        await MapResponsesAsync(httpContext, route, mainResponseContext, responsesContexts);
1✔
70
    }
19✔
71

72
    /// <summary>
73
    /// Helper method to determine if only the first downstream route should be processed.
74
    /// It is the case if the request is a websocket request or if there is only one downstream route.
75
    /// </summary>
76
    /// <param name="context">The http context.</param>
77
    /// <param name="routes">The downstream routes.</param>
78
    /// <returns>True if only the first downstream route should be processed.</returns>
79
    private static bool ShouldProcessSingleRoute(HttpContext context, ICollection routes)
80
        => context.WebSockets.IsWebSocketRequest || routes.Count == 1;
19✔
81

82
    /// <summary>
83
    /// Processing a single downstream route (no route keys).
84
    /// In that case, no need to make copies of the http context.
85
    /// </summary>
86
    /// <param name="context">The http context.</param>
87
    /// <param name="route">The downstream route.</param>
88
    /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
89
    protected virtual Task ProcessSingleRouteAsync(HttpContext context, DownstreamRoute route)
90
    {
91
        context.Items.UpsertDownstreamRoute(route);
2✔
92
        return _next.Invoke(context);
2✔
93
    }
94

95
    /// <summary>
96
    /// Processing the downstream routes (no route keys).
97
    /// </summary>
98
    /// <param name="context">The main http context.</param>
99
    /// <param name="route">The route.</param>
100
    private async Task ProcessRoutesAsync(HttpContext context, Route route)
101
    {
102
        var tasks = route.DownstreamRoute
13✔
103
            .Select(downstreamRoute => ProcessRouteAsync(context, downstreamRoute))
41✔
104
            .ToArray();
13✔
105
        var contexts = await Task.WhenAll(tasks);
13✔
106
        await MapAsync(context, route, new(contexts));
13✔
107
    }
13✔
108

109
    /// <summary>
110
    /// When using route keys, the first route is the main route and the rest are additional routes.
111
    /// Since we need to break if the main route response is null, we must process the main route first.
112
    /// </summary>
113
    /// <param name="context">The http context.</param>
114
    /// <param name="route">The first route, the main route.</param>
115
    /// <returns>The updated http context.</returns>
116
    private async Task<HttpContext> ProcessMainRouteAsync(HttpContext context, DownstreamRoute route)
117
    {
118
        context.Items.UpsertDownstreamRoute(route);
1✔
119
        await _next.Invoke(context);
1✔
120
        return context;
1✔
121
    }
1✔
122

123
    /// <summary>
124
    /// Processing the downstream routes with route keys except the main route that has already been processed.
125
    /// </summary>
126
    /// <param name="context">The main http context.</param>
127
    /// <param name="routes">The downstream routes.</param>
128
    /// <param name="routeKeysConfigs">The route keys config.</param>
129
    /// <param name="mainResponse">The response from the main route.</param>
130
    /// <returns>A list of the tasks' http contexts.</returns>
131
    protected virtual async Task<HttpContext[]> ProcessRoutesWithRouteKeysAsync(HttpContext context, IEnumerable<DownstreamRoute> routes, IReadOnlyCollection<AggregateRouteConfig> routeKeysConfigs, HttpContext mainResponse)
132
    {
133
        var processing = new List<Task<HttpContext>>();
1✔
134
        var content = await mainResponse.Items.DownstreamResponse().Content.ReadAsStringAsync();
1✔
135
        var jObject = JToken.Parse(content);
1✔
136

137
        foreach (var downstreamRoute in routes.Skip(1))
6✔
138
        {
139
            var matchAdvancedAgg = routeKeysConfigs.FirstOrDefault(q => q.RouteKey == downstreamRoute.Key);
5✔
140
            if (matchAdvancedAgg != null)
2✔
141
            {
142
                processing.AddRange(ProcessRouteWithComplexAggregation(matchAdvancedAgg, jObject, context, downstreamRoute));
2✔
143
                continue;
2✔
144
            }
145

146
            processing.Add(ProcessRouteAsync(context, downstreamRoute));
×
147
        }
148

149
        return await Task.WhenAll(processing);
1✔
150
    }
1✔
151

152
    /// <summary>
153
    /// Mapping responses.
154
    /// </summary>
155
    private Task MapResponsesAsync(HttpContext context, Route route, HttpContext mainResponseContext, IEnumerable<HttpContext> responsesContexts)
156
    {
157
        var contexts = new List<HttpContext> { mainResponseContext };
1✔
158
        contexts.AddRange(responsesContexts);
1✔
159
        return MapAsync(context, route, contexts);
1✔
160
    }
161

162
    /// <summary>
163
    /// Processing a route with aggregation.
164
    /// </summary>
165
    private IEnumerable<Task<HttpContext>> ProcessRouteWithComplexAggregation(AggregateRouteConfig matchAdvancedAgg,
166
        JToken jObject, HttpContext httpContext, DownstreamRoute downstreamRoute)
167
    {
168
        var processing = new List<Task<HttpContext>>();
2✔
169
        var values = jObject.SelectTokens(matchAdvancedAgg.JsonPath).Select(s => s.ToString()).Distinct();
6✔
170
        foreach (var value in values)
8✔
171
        {
172
            var tPnv = new List<PlaceholderNameAndValue>(httpContext.Items.TemplatePlaceholderNameAndValues())
2✔
173
            {
2✔
174
                new('{' + matchAdvancedAgg.Parameter + '}', value),
2✔
175
            };
2✔
176
            processing.Add(ProcessRouteAsync(httpContext, downstreamRoute, tPnv));
2✔
177
        }
178

179
        return processing;
2✔
180
    }
181

182
    /// <summary>
183
    /// Process a downstream route asynchronously.
184
    /// </summary>
185
    /// <returns>The cloned Http context.</returns>
186
    private async Task<HttpContext> ProcessRouteAsync(HttpContext sourceContext, DownstreamRoute route, List<PlaceholderNameAndValue> placeholders = null)
187
    {
188
        var newHttpContext = await CreateThreadContextAsync(sourceContext, route);
43✔
189
        CopyItemsToNewContext(newHttpContext, sourceContext, placeholders);
43✔
190
        newHttpContext.Items.UpsertDownstreamRoute(route);
43✔
191

192
        await _next.Invoke(newHttpContext);
43✔
193
        return newHttpContext;
43✔
194
    }
43✔
195

196
    /// <summary>
197
    /// Copying some needed parameters to the Http context items.
198
    /// </summary>
199
    private static void CopyItemsToNewContext(HttpContext target, HttpContext source, List<PlaceholderNameAndValue> placeholders = null)
200
    {
201
        target.Items.Add(RequestIdString, source.Items[RequestIdString]);
43✔
202
        target.Items.SetIInternalConfiguration(source.Items.IInternalConfiguration());
43✔
203
        target.Items.UpsertTemplatePlaceholderNameAndValues(placeholders ??
43✔
204
                                                            source.Items.TemplatePlaceholderNameAndValues());
43✔
205
    }
43✔
206

207
    /// <summary>
208
    /// Creates a new HttpContext based on the source.
209
    /// </summary>
210
    /// <param name="source">The base http context.</param>
211
    /// <param name="route">Downstream route.</param>
212
    /// <returns>The cloned context.</returns>
213
    protected virtual async Task<HttpContext> CreateThreadContextAsync(HttpContext source, DownstreamRoute route)
214
    {
215
        var from = source.Request;
44✔
216
        var bodyStream = await CloneRequestBodyAsync(from, route, source.RequestAborted);
44✔
217
        var target = new DefaultHttpContext
44✔
218
        {
44✔
219
            Request =
44✔
220
            {
44✔
221
                Body = bodyStream,
44✔
222
                ContentLength = from.ContentLength,
44✔
223
                ContentType = from.ContentType,
44✔
224
                Host = from.Host,
44✔
225
                Method = from.Method,
44✔
226
                Path = from.Path,
44✔
227
                PathBase = from.PathBase,
44✔
228
                Protocol = from.Protocol,
44✔
229
                QueryString = from.QueryString,
44✔
230
                Scheme = from.Scheme,
44✔
231
                IsHttps = from.IsHttps,
44✔
232
                Query = new QueryCollection(new Dictionary<string, StringValues>(from.Query)),
44✔
233
                RouteValues = new(from.RouteValues),
44✔
234
            },
44✔
235
            Connection =
44✔
236
            {
44✔
237
                RemoteIpAddress = source.Connection.RemoteIpAddress,
44✔
238
            },
44✔
239
            RequestServices = source.RequestServices,
44✔
240
            RequestAborted = source.RequestAborted,
44✔
241
            User = source.User,
44✔
242
        };
44✔
243
        foreach (var header in from.Headers)
88✔
244
        {
245
            target.Request.Headers[header.Key] = header.Value.ToArray();
×
246
        }
247

248
        // Once the downstream request is completed and the downstream response has been read, the downstream response object can dispose of the body's Stream object
249
        target.Response.RegisterForDisposeAsync(bodyStream); // manage Stream lifetime by HttpResponse object
44✔
250
        return target;
44✔
251
    }
44✔
252

253
    protected virtual Task MapAsync(HttpContext httpContext, Route route, List<HttpContext> contexts)
254
    {
255
        if (route.DownstreamRoute.Count == 1)
2✔
256
        {
257
            return Task.CompletedTask;
×
258
        }
259

260
        // ensure each context retains its correct aggregate key for proper response mapping
261
        if (route.DownstreamRouteConfig != null && route.DownstreamRouteConfig.Count > 0)
2✔
262
        {
263
            for (int i = 0; i < contexts.Count && i < route.DownstreamRouteConfig.Count; i++)
×
264
            {
265
                var key = route.DownstreamRouteConfig[i].RouteKey;
×
266
                contexts[i].Items["CurrentAggregateRouteKey"] = key;
×
267
            }
268
        }
269

270
        var aggregator = _factory.Get(route);
2✔
271
        return aggregator.Aggregate(route, httpContext, contexts);
2✔
272
    }
273

274
    protected virtual async Task<Stream> CloneRequestBodyAsync(HttpRequest request, DownstreamRoute route, CancellationToken aborted)
275
    {
276
        request.EnableBuffering();
44✔
277
        if (request.Body.Position != 0)
44✔
278
        {
279
            Logger.LogWarning(() => $"Ocelot does not support body copy without stream in initial position 0 for the route {route.Name()}.");
×
280
            return request.Body;
×
281
        }
282

283
        var targetBuffer = new MemoryStream();
44✔
284
        if (request.ContentLength is not null)
44✔
285
        {
286
            await request.Body.CopyToAsync(targetBuffer, (int)request.ContentLength, aborted);
×
287
            targetBuffer.Position = 0;
×
288
            request.Body.Position = 0;
×
289
        }
290
        else
291
        {
292
            Logger.LogInformation(() => $"Aggregation does not support body copy without Content-Length header, skipping body copy for the route {route.Name()}.");
44✔
293
        }
294

295
        return targetBuffer;
44✔
296
    }
44✔
297
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc