• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 13318089130

13 Feb 2025 10:13PM UTC coverage: 57.398% (+0.004%) from 57.394%
13318089130

Pull #2134

github

jas88
Update ChildProviderTests.cs

Fix up TestUpTo method
Pull Request #2134: CodeQL fixups

11346 of 21308 branches covered (53.25%)

Branch coverage included in aggregate %.

104 of 175 new or added lines in 45 files covered. (59.43%)

362 existing lines in 23 files now uncovered.

32218 of 54590 relevant lines covered (59.02%)

17091.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.09
/Rdmp.Core/DataFlowPipeline/DataFlowPipelineEngine.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Collections.ObjectModel;
10
using System.Data;
11
using System.Linq;
12
using System.Threading;
13
using Rdmp.Core.CommandExecution;
14
using Rdmp.Core.Curation.Data.Pipelines;
15
using Rdmp.Core.DataFlowPipeline.Requirements;
16
using Rdmp.Core.ReusableLibraryCode.Checks;
17
using Rdmp.Core.ReusableLibraryCode.Progress;
18
using Renci.SshNet.Messages;
19

20
namespace Rdmp.Core.DataFlowPipeline;
21

22
/// <summary>
23
/// Generic implementation of IDataFlowPipelineEngine (See IDataFlowPipelineEngine).  You can create a DataFlowPipelineEngine by manually constructing the context,
24
/// source, destination etc but more often you will want to use an IPipeline configured by the user and an IPipelineUseCase to stamp out the pipeline into an instance
25
/// of the engine (See IDataFlowPipelineEngineFactory).  IPipeline is the user configured set of components they think will achieve a given task.
26
/// </summary>
27
/// <typeparam name="T"></typeparam>
28
public class DataFlowPipelineEngine<T> : IDataFlowPipelineEngine
29
{
30
    private readonly DataFlowPipelineContext<T> _context;
31
    private readonly IDataLoadEventListener _listener;
32

33
    private bool initialized;
34
    private string _name;
35

36
    /// <summary>
37
    /// Readonly cast of <see cref="ComponentObjects"/>. If you need to add components, add them to <see cref="ComponentObjects"/> instead.
38
    /// </summary>
39
    public ReadOnlyCollection<IDataFlowComponent<T>> Components =>
40
        ComponentObjects.Cast<IDataFlowComponent<T>>().ToList().AsReadOnly();
1,172✔
41

42
    /// <summary>
43
    /// The last component in the pipeline, responsible for writing the chunks (of type {T}) to somewhere (E.g. to disk, to database etc)
44
    /// </summary>
45
    public IDataFlowDestination<T> Destination { get; private set; }
2,114✔
46

47
    /// <summary>
48
    /// The first component in the pipeline, responsible for iteratively generating chunks (of type {T}) for feeding to downstream pipeline components
49
    /// </summary>
50
    public IDataFlowSource<T> Source { get; private set; }
2,458✔
51

52
    /// <summary>
53
    /// Middle components of the pipeline, must be <see cref="IDataFlowComponent{T}"/> with T appropriate to the context.
54
    /// </summary>
55
    public List<object> ComponentObjects { get; set; }
1,710✔
56

57
    /// <inheritdoc cref="Destination"/>
58
    public object DestinationObject => Destination;
166✔
59

60
    /// <inheritdoc cref="Source"/>
61
    public object SourceObject => Source;
110✔
62

63
    /// <summary>
64
    /// Creates a new pipeline engine ready to run under the <paramref name="context"/> recording events that occur to <paramref name="listener"/>.
65
    /// </summary>
66
    /// <param name="context"></param>
67
    /// <param name="source"></param>
68
    /// <param name="destination"></param>
69
    /// <param name="listener"></param>
70
    /// <param name="pipelineSource"></param>
71
    public DataFlowPipelineEngine(DataFlowPipelineContext<T> context, IDataFlowSource<T> source,
408✔
72
        IDataFlowDestination<T> destination, IDataLoadEventListener listener,
408✔
73
        IPipeline pipelineSource = null)
408✔
74
    {
75
        Source = source;
408✔
76
        Destination = destination;
408✔
77
        _context = context;
408✔
78
        _listener = listener;
408✔
79
        ComponentObjects = new List<object>();
408✔
80

81
        _name = pipelineSource != null ? pipelineSource.Name : "Undefined pipeline";
408✔
82
    }
408✔
83

84
    /// <inheritdoc/>
85
    public void Initialize(params object[] initializationObjects)
86
    {
87
        _context.PreInitialize(_listener, Source, initializationObjects);
408✔
88

89
        foreach (var component in Components)
1,076✔
90
            _context.PreInitialize(_listener, component, initializationObjects);
130✔
91

92
        _context.PreInitialize(_listener, Destination, initializationObjects);
408✔
93

94
        initialized = true;
408✔
95
    }
408✔
96

97
    private void UIAlert(string alert, IBasicActivateItems activator)
98
    {
99
        if (!activator.IsInteractive) return;
×
100
        new Thread(() =>
×
101
        {
×
102
            // run as a separate thread to not halt the UI
×
103
            activator.ShowWarning(alert);
×
104

×
105
        })
×
106
        {
×
107
            IsBackground = true
×
108
        }.Start();
×
UNCOV
109
    }
×
110

111
    /// <inheritdoc/>
112
    public void ExecutePipeline(GracefulCancellationToken cancellationToken)
113
    {
114
        Exception exception = null;
368✔
115
        try
116
        {
117
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Debug, "Starting pipeline engine"));
368✔
118

119
            if (!initialized)
368!
120
                throw new Exception(
×
UNCOV
121
                    "Engine has not been initialized, call Initialize(DataFlowPipelineContext context, params object[] initializationObjects");
×
122

123
            var hasMoreData = true;
368✔
124
            List<Tuple<string, IBasicActivateItems>> uiAlerts = new();
368✔
125
            while (!cancellationToken.IsCancellationRequested && hasMoreData)
1,056✔
126
                hasMoreData = ExecuteSinglePass(cancellationToken, uiAlerts);
716✔
127

128
            if (cancellationToken.IsAbortRequested)
340!
129
            {
UNCOV
130
                Source.Abort(_listener);
×
131

132
                foreach (var c in Components)
×
UNCOV
133
                    c.Abort(_listener);
×
134

UNCOV
135
                Destination.Abort(_listener);
×
136

137
                _listener.OnNotify(this,
×
138
                    new NotifyEventArgs(ProgressEventType.Information, "Pipeline engine aborted"));
×
UNCOV
139
                return;
×
140
            }
141
            foreach (var alert in uiAlerts.Distinct().Where(static alert => alert is not null))
748!
UNCOV
142
                UIAlert(alert.Item1, alert.Item2);
×
143
        }
340✔
144
        catch (Exception e)
28✔
145
        {
146
            exception = e;
28✔
147
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, e.Message, e));
28✔
148
        }
28✔
149
        finally
150
        {
151
            _listener.OnNotify(this,
368✔
152
                new NotifyEventArgs(ProgressEventType.Debug,
368✔
153
                    "Preparing to Dispose of DataFlowPipelineEngine components"));
368✔
154

155
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace, $"About to Dispose {Source}"));
368✔
156
            try
157
            {
158
                Source.Dispose(_listener, exception);
368✔
159
            }
368✔
UNCOV
160
            catch (Exception e)
×
161
            {
162
                //dispose crashing is only a deal-breaker if there wasn't already an exception in the pipeline
163
                if (exception == null)
×
UNCOV
164
                    throw;
×
165

166
                _listener.OnNotify(Source,
×
167
                    new NotifyEventArgs(ProgressEventType.Error, "Error Disposing Source Component", e));
×
UNCOV
168
            }
×
169

170
            foreach (var dataLoadComponent in Components)
996✔
171
            {
172
                _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace,
130✔
173
                    $"About to Dispose {dataLoadComponent}"));
130✔
174
                try
175
                {
176
                    dataLoadComponent.Dispose(_listener, exception);
130✔
177
                }
130✔
UNCOV
178
                catch (Exception e)
×
179
                {
180
                    //dispose crashing is only a deal-breaker if there wasn't already an exception in the pipeline
181
                    if (exception == null)
×
UNCOV
182
                        throw;
×
183

184
                    _listener.OnNotify(dataLoadComponent,
×
185
                        new NotifyEventArgs(ProgressEventType.Error, "Error Disposing Component", e));
×
UNCOV
186
                }
×
187
            }
188

189
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace, $"About to Dispose {Destination}"));
368✔
190
            try
191
            {
192
                Destination.Dispose(_listener, exception);
368✔
193
            }
368✔
UNCOV
194
            catch (Exception e)
×
195
            {
196
                //dispose crashing is only a dealbreaker if there wasn't already an exception in the pipeline
197
                if (exception == null)
×
UNCOV
198
                    throw;
×
199

200
                _listener.OnNotify(Destination,
×
201
                    new NotifyEventArgs(ProgressEventType.Error, "Error Disposing Destination Component", e));
×
UNCOV
202
            }
×
203
        }
368✔
204

205
        if (exception != null)
368✔
206
            throw new PipelineCrashedException("Data Flow Pipeline Crashed", exception);
28✔
207
    }
340✔
208

209
    /// <inheritdoc/>
210
    public bool ExecuteSinglePass(GracefulCancellationToken cancellationToken, List<Tuple<string, IBasicActivateItems>> completionUIAlerts = null)
211
    {
212
        if (!initialized)
756!
213
            throw new Exception(
×
UNCOV
214
                "Engine has not been initialized, call Initialize(DataFlowPipelineContext context, params object[] initializationObjects");
×
215

216
        T currentChunk;
217
        try
218
        {
219
            currentChunk = Source.GetChunk(_listener, cancellationToken);
756✔
220
        }
728✔
UNCOV
221
        catch (OperationCanceledException)
×
222
        {
UNCOV
223
            throw;
×
224
        }
225
        catch (Exception e)
28✔
226
        {
227
            throw new InvalidOperationException(
28✔
228
                $"Error when attempting to get a chunk from the source component: {Source}", e);
28✔
229
        }
230
        if (currentChunk == null)
728✔
231
        {
232
            _listener.OnNotify(this,
344✔
233
                new NotifyEventArgs(ProgressEventType.Debug,
344✔
234
                    "Received null chunk from the Source component, stopping engine"));
344✔
235
            return false;
344✔
236
        }
237

238
        try {
239
          foreach (var component in Components)
1,028✔
240
          {
241
              if (cancellationToken.IsAbortRequested) break;
130!
242

243
              currentChunk = component.ProcessPipelineData(currentChunk, _listener, cancellationToken);
130✔
244
              if (completionUIAlerts is not null && currentChunk is DataTable dt)
130✔
245
              {
246
                  var uiAlert = (Tuple<string, IBasicActivateItems>)dt.ExtendedProperties["AlertUIAtEndOfProcess"];
128✔
247
                  completionUIAlerts.Add(uiAlert);
128✔
248
              }
249
          }
250

251
          if (cancellationToken.IsAbortRequested) return true;
384!
252

253
          Destination.ProcessPipelineData(currentChunk, _listener, cancellationToken);
384✔
254

255
          if (cancellationToken.IsAbortRequested) return true;
384!
256
        }
384✔
257
        finally {
258
          //if it is a DataTable call .Clear() because Dispose doesn't actually free up any memory
259
          if (currentChunk is DataTable dt2)
384✔
260
              dt2.Clear();
334✔
261

262
          //if the chunk is something that can be disposed, dispose it (e.g. DataTable - to free up memory)
263
          if (currentChunk is IDisposable junk)
384✔
264
  #pragma warning disable
265
              junk.Dispose();
334✔
266
        }
384✔
267

268
        return true;
384✔
UNCOV
269
    }
×
270

271
    /// <summary>
272
    /// Runs checks on all components in the pipeline that support <see cref="ICheckable"/>
273
    /// </summary>
274
    /// <param name="notifier"></param>
275
    public void Check(ICheckNotifier notifier)
276
    {
277
        try
278
        {
279
            if (Source is ICheckable checkableSource)
12!
280
            {
281
                notifier.OnCheckPerformed(new CheckEventArgs(
12✔
282
                    $"About to start checking Source component {checkableSource}",
12✔
283
                    CheckResult.Success));
12✔
284
                checkableSource.Check(notifier);
12✔
285
            }
286
            else
287
            {
288
                notifier.OnCheckPerformed(
×
289
                    new CheckEventArgs(
×
290
                        $"Source component {Source} does not support ICheckable so skipping checking it",
×
UNCOV
291
                        CheckResult.Warning));
×
292
            }
293

294
            foreach (var component in Components)
24!
UNCOV
295
                if (component is ICheckable checkable)
×
296
                {
297
                    notifier.OnCheckPerformed(new CheckEventArgs($"About to start checking component {component}",
×
298
                        CheckResult.Success));
×
UNCOV
299
                    checkable.Check(notifier);
×
300
                }
301
                else
302
                {
303
                    notifier.OnCheckPerformed(
×
304
                        new CheckEventArgs(
×
305
                            $"Component {component} does not support ICheckable so skipping checking it",
×
UNCOV
306
                            CheckResult.Warning));
×
307
                }
308

309
            if (Destination is ICheckable checkableDestination)
12!
310
            {
311
                notifier.OnCheckPerformed(new CheckEventArgs(
12✔
312
                    $"About to start checking Destination component {checkableDestination}",
12✔
313
                    CheckResult.Success));
12✔
314
                checkableDestination.Check(notifier);
12✔
315
            }
316
            else
317
            {
318
                notifier.OnCheckPerformed(
×
319
                    new CheckEventArgs(
×
320
                        $"Destination component {Destination} does not support ICheckable so skipping checking it",
×
UNCOV
321
                        CheckResult.Warning));
×
322
            }
UNCOV
323
        }
×
324
        catch (Exception e)
12✔
325
        {
326
            notifier.OnCheckPerformed(
12✔
327
                new CheckEventArgs(
12✔
328
                    $"{typeof(DataFlowPipelineEngine<T>).Name} Checking failed in an unexpected way",
12✔
329
                    CheckResult.Fail, e));
12✔
UNCOV
330
        }
×
331

332

333
        notifier.OnCheckPerformed(new CheckEventArgs("Finished checking all components", CheckResult.Success));
×
UNCOV
334
    }
×
335

336
    /// <inheritdoc/>
UNCOV
337
    public override string ToString() => _name;
×
338
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc