• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DemoBytom / DemoEngine / 22808717869

07 Mar 2026 10:34PM UTC coverage: 33.499% (+0.4%) from 33.105%
22808717869

push

coveralls.net

DemoBytom
Clean up around the `WorkItem` object pooling

General code cleanup to the `WorkItem` object pooling within `StaThreadService`. Includes registering the pool in DI, as well as resolving it for the service.
Unit tests were updated to test both pooled and non-pooled behavior.

1279 of 3818 relevant lines covered (33.5%)

0.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.61
/src/Demo.Engine.Core/Features/StaThread/StaThreadService.cs
1
// Copyright © Michał Dembski and contributors.
2
// Distributed under MIT license. See LICENSE file in the root for more information.
3

4
using System.Collections.Concurrent;
5
using System.Diagnostics.CodeAnalysis;
6
using System.Runtime.InteropServices;
7
using System.Threading.Channels;
8
using Demo.Engine.Core.Interfaces;
9
using Demo.Engine.Core.Interfaces.Platform;
10
using Demo.Engine.Core.Interfaces.Rendering;
11
using Demo.Engine.Core.Services;
12
using Microsoft.Extensions.Hosting;
13
using Microsoft.Extensions.Logging;
14
using Microsoft.Extensions.ObjectPool;
15
using WorkItem = Demo.Engine.Core.Features.StaThread.StaThreadService.StaSingleThreadedSynchronizationContext.WorkItem;
16

17
namespace Demo.Engine.Core.Features.StaThread;
18

19
internal sealed class StaThreadService
20
    : IStaThreadService
21
{
22
    private readonly ILogger<StaThreadService> _logger;
23
    private readonly IHostApplicationLifetime _hostApplicationLifetime;
24
    private readonly ChannelReader<StaThreadRequests> _channelReader;
25
    private readonly IMainLoopLifetime _mainLoopLifetime;
26
    private readonly ObjectPool<WorkItem>? _workItemPool;
27

28
    public Task ExecutingTask { get; }
1✔
29
    public bool IsRunning { get; private set; }
1✔
30

31
    /// <summary>
    /// Creates the service and immediately starts the STA processing loop;
    /// the running loop is exposed through <see cref="ExecutingTask"/>.
    /// </summary>
    /// <param name="logger">Logger used by the service.</param>
    /// <param name="hostApplicationLifetime">Host lifetime whose <c>ApplicationStopping</c> token stops the loop.</param>
    /// <param name="renderingEngine">Rendering engine handed to every processed request.</param>
    /// <param name="osMessageHandler">OS message handler handed to every processed request.</param>
    /// <param name="channelReader">Channel from which the requests are read.</param>
    /// <param name="mainLoopLifetime">Main loop lifetime; its token stops the loop and it is cancelled when the loop ends.</param>
    /// <param name="workItemPool">Optional pool of work items; when <see langword="null"/> work items are created on demand.</param>
    public StaThreadService(
        ILogger<StaThreadService> logger,
        IHostApplicationLifetime hostApplicationLifetime,
        IRenderingEngine renderingEngine,
        IOSMessageHandler osMessageHandler,
        ChannelReader<StaThreadRequests> channelReader,
        IMainLoopLifetime mainLoopLifetime,
        ObjectPool<WorkItem>? workItemPool = null)
    {
        _workItemPool = workItemPool;
        _mainLoopLifetime = mainLoopLifetime;
        _channelReader = channelReader;
        _hostApplicationLifetime = hostApplicationLifetime;
        _logger = logger;

        // The flag has to be raised before the loop starts, because the loop checks it after every request.
        IsRunning = true;

        var staLoop = RunSTAThread(renderingEngine, osMessageHandler);
        ExecutingTask = staLoop;
    }
1✔
50

51
    /// <summary>
    /// Installs a <see cref="StaSingleThreadedSynchronizationContext"/>, runs the request loop on it
    /// and restores the previous synchronization context once the loop has finished.
    /// </summary>
    /// <param name="renderingEngine">Rendering engine forwarded to the request loop.</param>
    /// <param name="osMessageHandler">OS message handler forwarded to the request loop.</param>
    /// <returns>A task that completes when the loop has ended and the STA context has been disposed.</returns>
    private async Task RunSTAThread(
        IRenderingEngine renderingEngine,
        IOSMessageHandler osMessageHandler)
    {
        var previousContext = SynchronizationContext.Current;
        StaSingleThreadedSynchronizationContext? staContext = null;

        // The loop stops as soon as either the host is stopping or the main loop lifetime is cancelled.
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
            _hostApplicationLifetime.ApplicationStopping,
            _mainLoopLifetime.Token);

        try
        {
            staContext = new StaSingleThreadedSynchronizationContext(_workItemPool);
            SynchronizationContext.SetSynchronizationContext(staContext);

            // Yielding posts the continuation to the freshly installed context,
            // so everything below is expected to run on the STA thread (asserted in debug builds).
            await Task.Yield();
            System.Diagnostics.Debug.Assert(
                SynchronizationContext.Current == staContext,
                "SynchronizationContext should be set to the STA context.");

            try
            {
                staContext.OnError += OnStaContextError;

                await STAThread(renderingEngine, osMessageHandler, linkedCts.Token)
                    .ConfigureAwait(continueOnCapturedContext: true);
            }
            catch (OperationCanceledException)
            {
                // Cancellation is the regular way of stopping the loop - log it and fall through to the shutdown.
                _logger.LogStaThreadServiceWasCancelled();
            }
            catch (Exception)
            {
                SignalLoopStopped();
                throw;
            }

            SignalLoopStopped();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previousContext);

            // Yield once more so the continuation leaves the STA context before that context gets disposed.
            await Task.Yield();

            if (staContext is not null)
            {
                staContext.OnError -= OnStaContextError;
                staContext.Dispose();
            }
        }

        // Marks the service as stopped and cancels the main loop lifetime.
        void SignalLoopStopped()
        {
            IsRunning = false;
            _mainLoopLifetime.Cancel();
        }

        // Logs an error raised by the STA context and cancels the request loop.
        void OnStaContextError(object? sender, Exception ex)
        {
            _logger.LogCritical(ex,
                "An error occured within the STA thread synchronization context.");
            linkedCts.Cancel();
        }
    }
1✔
113

114
    /// <summary>
    /// Reads requests from the channel and invokes them one by one, resuming on the captured context
    /// after every await.
    /// </summary>
    /// <param name="renderingEngine">Rendering engine passed to every request.</param>
    /// <param name="osMessageHandler">OS message handler passed to every request.</param>
    /// <param name="cancellationToken">Token that stops reading and is passed to every request.</param>
    /// <returns>
    /// A task that completes when the channel has no more requests, a
    /// <see cref="StaThreadRequests.DoEventsOkRequest"/> yields <see langword="false"/>,
    /// the service is no longer running or cancellation has been requested.
    /// </returns>
    private async Task STAThread(
        IRenderingEngine renderingEngine,
        IOSMessageHandler osMessageHandler,
        CancellationToken cancellationToken)
    {
        var eventsOk = true;

        var requests = _channelReader
            .ReadAllAsync(cancellationToken)
            .ConfigureAwait(continueOnCapturedContext: true)
            .WithCancellation(cancellationToken);

        await foreach (var staAction in requests)
        {
            if (staAction is StaThreadRequests.DoEventsOkRequest doEventsOkRequest)
            {
                // Only this kind of request influences whether the loop keeps going.
                var requestResult = await doEventsOkRequest
                    .InvokeAsync(renderingEngine, osMessageHandler, cancellationToken)
                    .ConfigureAwait(continueOnCapturedContext: true);

                eventsOk &= requestResult;
            }
            else
            {
                // The result of any other request is not used by the loop.
                _ = await staAction
                    .InvokeAsync(renderingEngine, osMessageHandler, cancellationToken)
                    .ConfigureAwait(continueOnCapturedContext: true);
            }

            var keepProcessing = eventsOk
                && IsRunning
                && !cancellationToken.IsCancellationRequested;

            if (!keepProcessing)
            {
                break;
            }
        }
    }
147

148
    internal sealed class StaSingleThreadedSynchronizationContext
149
        : SynchronizationContext,
150
          IDisposable
151
    {
152
        private readonly BlockingCollection<WorkItem> _workQueue = [];
2✔
153

154
        private readonly Thread _thread;
155
        private readonly CancellationTokenSource _cancellationTokenSource = new();
2✔
156
        private readonly ObjectPool<WorkItem>? _workItemPool;
157
        private bool _isDisposed;
158

159
        public event EventHandler<Exception>? OnError;
160

161
        /// <inheritdoc/>
        /// <remarks>
        /// The callback is wrapped in an asynchronous work item and queued; the caller does not wait for it to run.
        /// </remarks>
        public override void Post(SendOrPostCallback d, object? state)
        {
            var queuedItem = GetAsynchronousWorkItem(d, state);
            _workQueue.Add(queuedItem);
        }
2✔
165

166
        /// <inheritdoc/>
        /// <remarks>
        /// When called from the context's own thread the callback is invoked inline. Otherwise the callback
        /// is queued and the caller blocks until the work item's event is signalled; an exception recorded on
        /// the work item is then re-thrown on the calling thread with its original stack trace preserved.
        /// </remarks>
        public override void Send(SendOrPostCallback d, object? state)
        {
            // If we're already on the STA thread, execute the callback directly to avoid deadlock.
            if (Environment.CurrentManagedThreadId == _thread.ManagedThreadId)
            {
                d.Invoke(state);
                return;
            }

            var workItem = GetSynchronousWorkItem(d, state);

            try
            {
                _workQueue.Add(workItem);
                workItem.SyncEvent!.Wait();

                if (workItem.Exception is { } exception)
                {
                    // A plain `throw exception;` would reset the stack trace to this line and hide
                    // where the callback actually failed; ExceptionDispatchInfo keeps the original trace.
                    System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(exception);
                }
            }
            finally
            {
                // The sender owns the synchronous work item, so it is released here rather than by the worker thread.
                DisposeWorkItem(workItem);
            }
        }
193

194
        /// <summary>
        /// Creates the synchronization context without a work item pool.
        /// </summary>
        public StaSingleThreadedSynchronizationContext()
            : this(workItemPool: null)
        {
        }
197

198
        /// <summary>
        /// Creates the synchronization context and starts its dedicated foreground thread.
        /// </summary>
        /// <param name="workItemPool">
        /// Optional pool used to obtain and return work items; when <see langword="null"/>
        /// work items are created and disposed individually.
        /// </param>
        internal StaSingleThreadedSynchronizationContext(
            ObjectPool<WorkItem>? workItemPool = null)
        {
            // Every field has to be assigned before the worker thread starts,
            // so that the thread never runs against a partially initialized instance.
            _workItemPool = workItemPool;

            _thread = new Thread(ThreadInner)
            {
                IsBackground = false,
            };

            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
            {
                // Can only be set on a Windows machine. Doesn't work on Linux/MacOS.
                _thread.SetApartmentState(ApartmentState.STA);
                _thread.Name = "Main STA thread";
            }
            else
            {
                _thread.Name = "Main thread";
            }

            _thread.Start();
        }
2✔
220

221
        /// <summary>
        /// Body of the dedicated thread: installs this context as the thread's synchronization context
        /// and executes queued work items until the queue is cancelled or completed.
        /// </summary>
        private void ThreadInner()
        {
            SetSynchronizationContext(this);
            try
            {
                foreach (var workItem in _workQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
                {
                    try
                    {
                        workItem.Callback.Invoke(workItem.State);
                    }
                    catch (Exception ex)
                    {
                        if (workItem.IsSynchronous)
                        {
                            // The sender is still waiting - hand the exception over, so it can be re-thrown there.
                            workItem.Exception = ex;
                        }
                        else if (OnError is { } errorHandlers)
                        {
                            /* An exception cannot be thrown to the caller from here,
                             * since the caller has already continued execution after posting the work item.
                             * Instead an event is raised if there are subscribers, or the process is terminated.
                             * If there are subscribers, but they don't cancel the sync context within 10 seconds,
                             * the process is terminated to avoid running in an unstable state.
                             *
                             * The event is read only once, into a local: handlers can be unsubscribed from
                             * another thread, and reading the event again for the invocation could hit null. */
                            errorHandlers.Invoke(this, ex);
                            _ = ExitAfter10Seconds(_cancellationTokenSource.Token);
                        }
                        else
                        {
                            Environment.Exit(-1);
                        }
                    }
                    finally
                    {
                        if (workItem.IsSynchronous)
                        {
                            // Wake up the sender; it releases the work item itself.
                            workItem.SyncEvent.Set();
                        }
                        else
                        {
                            DisposeWorkItem(workItem);
                        }
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Gracefully exit the thread when cancellation is requested.
            }
            catch (InvalidOperationException)
            {
                // The BlockingCollection has been marked as complete for adding, which means we're shutting down.
            }

            // Terminates the process unless the given token gets cancelled within 10 seconds.
            static async Task ExitAfter10Seconds(CancellationToken cancellationToken)
            {
                try
                {
                    await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken)
                        .ConfigureAwait(false);
                }
                catch (Exception)
                    when (cancellationToken.IsCancellationRequested)
                {
                    return;
                }

                Environment.Exit(-1);
            }
        }
2✔
294

295
        /// <summary>
        /// Provides a synchronous work item for the given callback, taken from the pool when one is configured
        /// and newly created otherwise.
        /// </summary>
        private WorkItem GetSynchronousWorkItem(SendOrPostCallback callback, object? state)
        {
            if (_workItemPool is { } pool)
            {
                return pool.Get().SetSynchronous(callback, state);
            }

            return WorkItem.Synchronous(callback, state);
        }
300

301
        /// <summary>
        /// Provides an asynchronous work item for the given callback, taken from the pool when one is configured
        /// and newly created otherwise.
        /// </summary>
        private WorkItem GetAsynchronousWorkItem(SendOrPostCallback callback, object? state)
        {
            if (_workItemPool is { } pool)
            {
                return pool.Get().SetAsynchronous(callback, state);
            }

            return WorkItem.Asynchronous(callback, state);
        }
2✔
306

307
        /// <summary>
        /// Releases a work item: returns it to the pool when one is configured, disposes it otherwise.
        /// </summary>
        private void DisposeWorkItem(WorkItem workItem)
        {
            if (_workItemPool is { } pool)
            {
                pool.Return(workItem);
                return;
            }

            workItem.Dispose();
        }
2✔
318

319
        /// <summary>
        /// Stops the worker thread, waits for it to finish and releases the queue and the cancellation source.
        /// Subsequent calls do nothing.
        /// </summary>
        public void Dispose()
        {
            if (!_isDisposed)
            {
                _isDisposed = true;

                // Ask the worker loop to stop: cancel its token and close the queue for new items.
                _cancellationTokenSource.Cancel();
                _workQueue.CompleteAdding();

                // The queue and the token source may only be released once the worker no longer uses them.
                _thread.Join();

                _workQueue.Dispose();
                _cancellationTokenSource.Dispose();
            }
        }
2✔
335

336
        /// <summary>
        /// A callback together with its state, queued for execution by the synchronization context.
        /// Synchronous items additionally carry an event used to signal completion to the waiting sender
        /// and a slot for an exception thrown by the callback.
        /// </summary>
        internal sealed class WorkItem
            : IDisposable,
              IResettable
        {
            private bool _isDisposed;

            /// <summary>Callback to execute.</summary>
            public SendOrPostCallback Callback { get; private set; }

            /// <summary>State object passed to <see cref="Callback"/>.</summary>
            public object? State { get; private set; }

            /// <summary>Whether a sender waits for this item; when <see langword="true"/>, <see cref="SyncEvent"/> is set.</summary>
            [MemberNotNullWhen(true, nameof(SyncEvent))]
            public bool IsSynchronous { get; private set; }

            /// <summary>Event signalled once a synchronous item has been executed.</summary>
            public ManualResetEventSlim? SyncEvent { get; private set; }

            /// <summary>Exception thrown by the callback of a synchronous item, if any.</summary>
            public Exception? Exception { get; set; }

            // Used by the static factories; creates the completion event only for synchronous items.
            private WorkItem(
                SendOrPostCallback callback,
                object? state,
                bool isSynchronous)
            {
                Callback = callback;
                State = state;
                IsSynchronous = isSynchronous;
                SyncEvent = isSynchronous
                    ? new ManualResetEventSlim()
                    : null;
            }

            /// <summary>
            /// Creates an empty work item; it has to be configured with
            /// <see cref="SetSynchronous"/> or <see cref="SetAsynchronous"/> before use.
            /// </summary>
            public WorkItem()
            {
                Callback = null!;
            }

            // Configures the instance for the given callback, reusing an already existing completion event.
            private WorkItem Set(
                SendOrPostCallback callback,
                object? state,
                bool isSynchronous)
            {
                Callback = callback;
                State = state;
                IsSynchronous = isSynchronous;

                if (isSynchronous)
                {
                    SyncEvent ??= new ManualResetEventSlim();
                    SyncEvent.Reset();
                }

                return this;
            }

            /// <summary>Configures this instance as a synchronous item and returns it.</summary>
            internal WorkItem SetSynchronous(SendOrPostCallback callback, object? state)
                => Set(callback, state, isSynchronous: true);

            /// <summary>Configures this instance as an asynchronous item and returns it.</summary>
            internal WorkItem SetAsynchronous(SendOrPostCallback callback, object? state)
                => Set(callback, state, isSynchronous: false);

            /// <summary>Creates a new synchronous work item.</summary>
            internal static WorkItem Synchronous(SendOrPostCallback callback, object? state)
                => new WorkItem(callback, state, isSynchronous: true);

            /// <summary>Creates a new asynchronous work item.</summary>
            internal static WorkItem Asynchronous(SendOrPostCallback callback, object? state)
                => new WorkItem(callback, state, isSynchronous: false);

            /// <summary>Disposes the completion event, if there is one. Subsequent calls do nothing.</summary>
            public void Dispose()
            {
                if (!_isDisposed)
                {
                    _isDisposed = true;

                    SyncEvent?.Dispose();
                    SyncEvent = null;
                }
            }

            /// <summary>
            /// Clears the callback, state and exception and resets the completion event.
            /// </summary>
            /// <returns>Always <see langword="true"/>.</returns>
            /// <exception cref="ObjectDisposedException">The work item has already been disposed.</exception>
            public bool TryReset()
            {
                ObjectDisposedException.ThrowIf(_isDisposed, this);

                SyncEvent?.Reset();
                Exception = null;
                Callback = null!;
                State = null;

                return true;
            }
        }
437
    }
438
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc