• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 13493861722

24 Feb 2025 08:35AM UTC coverage: 57.414% (-0.03%) from 57.446%
13493861722

push

github

web-flow
marge v8.4.3 into main (#2145)

* simple db update

* add changelog

* revert csproj

* Fix up some codeql/inspection code issues (#2087)

* Update OverviewModel.cs

Fix up some .Dispose/using issues, make finding most recent load ID more efficient

* LINQ tidying

* CodeQL fixups

* Update Catalogue.cs

Add Hashcode, Equals methods.

* Update Catalogue.cs

Tweak equality semantics for RDMP DB oddities

* Bump actions/setup-dotnet from 4.1.0 to 4.2.0

Bumps [actions/setup-dotnet](https://github.com/actions/setup-dotnet) from 4.1.0 to 4.2.0.
- [Release notes](https://github.com/actions/setup-dotnet/releases)
- [Commits](https://github.com/actions/setup-dotnet/compare/v4.1.0...v4.2.0)

---
updated-dependencies:
- dependency-name: actions/setup-dotnet
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump NUnit.Analyzers from 4.4.0 to 4.5.0

Bumps [NUnit.Analyzers](https://github.com/nunit/nunit.analyzers) from 4.4.0 to 4.5.0.
- [Release notes](https://github.com/nunit/nunit.analyzers/releases)
- [Changelog](https://github.com/nunit/nunit.analyzers/blob/master/CHANGES.md)
- [Commits](https://github.com/nunit/nunit.analyzers/compare/4.4.0...4.5.0)

---
updated-dependencies:
- dependency-name: NUnit.Analyzers
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump Minio from 6.0.3 to 6.0.4

Bumps [Minio](https://github.com/minio/minio-dotnet) from 6.0.3 to 6.0.4.
- [Release notes](https://github.com/minio/minio-dotnet/releases)
- [Commits](https://github.com/minio/minio-dotnet/compare/6.0.3...6.0.4)

---
updated-dependencies:
- dependency-name: Minio
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump the aws-sdk group across 1 directory with 4 updates

Bumps the aws-sdk grou... (continued)

11358 of 21338 branches covered (53.23%)

Branch coverage included in aggregate %.

1116 of 1482 new or added lines in 50 files covered. (75.3%)

19 existing lines in 10 files now uncovered.

32283 of 54673 relevant lines covered (59.05%)

17450.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.09
/Rdmp.Core/DataFlowPipeline/DataFlowPipelineEngine.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Collections.ObjectModel;
10
using System.Data;
11
using System.Linq;
12
using System.Threading;
13
using Rdmp.Core.CommandExecution;
14
using Rdmp.Core.Curation.Data.Pipelines;
15
using Rdmp.Core.DataFlowPipeline.Requirements;
16
using Rdmp.Core.ReusableLibraryCode.Checks;
17
using Rdmp.Core.ReusableLibraryCode.Progress;
18

19
namespace Rdmp.Core.DataFlowPipeline;
20

21
/// <summary>
22
/// Generic implementation of IDataFlowPipelineEngine (See IDataFlowPipelineEngine).  You can create a DataFlowPipelineEngine by manually constructing the context,
23
/// source, destination etc but more often you will want to use an IPipeline configured by the user and an IPipelineUseCase to stamp out the pipeline into an instance
24
/// of the engine (See IDataFlowPipelineEngineFactory).  IPipeline is the user configured set of components they think will achieve a given task.
25
/// </summary>
26
/// <typeparam name="T"></typeparam>
27
public class DataFlowPipelineEngine<T> : IDataFlowPipelineEngine
28
{
29
    private readonly DataFlowPipelineContext<T> _context;
30
    private readonly IDataLoadEventListener _listener;
31

32
    private bool initialized;
33
    private string _name;
34

35
    /// <summary>
36
    /// Readonly cast of <see cref="ComponentObjects"/>. If you need to add components, add them to <see cref="ComponentObjects"/> instead.
37
    /// </summary>
38
    public ReadOnlyCollection<IDataFlowComponent<T>> Components =>
39
        ComponentObjects.Cast<IDataFlowComponent<T>>().ToList().AsReadOnly();
1,176✔
40

41
    /// <summary>
42
    /// The last component in the pipeline, responsible for writing the chunks (of type {T}) to somewhere (E.g. to disk, to database etc)
43
    /// </summary>
44
    public IDataFlowDestination<T> Destination { get; private set; }
2,118✔
45

46
    /// <summary>
47
    /// The first component in the pipeline, responsible for iteratively generating chunks (of type {T}) for feeding to downstream pipeline components
48
    /// </summary>
49
    public IDataFlowSource<T> Source { get; private set; }
2,462✔
50

51
    /// <summary>
52
    /// Middle components of the pipeline, must be <see cref="IDataFlowComponent{T}"/> with T appropriate to the context.
53
    /// </summary>
54
    public List<object> ComponentObjects { get; set; }
1,714✔
55

56
    /// <inheritdoc cref="Destination"/>
57
    public object DestinationObject => Destination;
166✔
58

59
    /// <inheritdoc cref="Source"/>
60
    public object SourceObject => Source;
110✔
61

62
    /// <summary>
63
    /// Creates a new pipeline engine ready to run under the <paramref name="context"/> recording events that occur to <paramref name="listener"/>.
64
    /// </summary>
65
    /// <param name="context"></param>
66
    /// <param name="source"></param>
67
    /// <param name="destination"></param>
68
    /// <param name="listener"></param>
69
    /// <param name="pipelineSource"></param>
70
    public DataFlowPipelineEngine(DataFlowPipelineContext<T> context, IDataFlowSource<T> source,
408✔
71
        IDataFlowDestination<T> destination, IDataLoadEventListener listener,
408✔
72
        IPipeline pipelineSource = null)
408✔
73
    {
74
        Source = source;
408✔
75
        Destination = destination;
408✔
76
        _context = context;
408✔
77
        _listener = listener;
408✔
78
        ComponentObjects = new List<object>();
408✔
79

80
        _name = pipelineSource != null ? pipelineSource.Name : "Undefined pipeline";
408✔
81
    }
408✔
82

83
    /// <inheritdoc/>
84
    public void Initialize(params object[] initializationObjects)
85
    {
86
        _context.PreInitialize(_listener, Source, initializationObjects);
408✔
87

88
        foreach (var component in Components)
1,076✔
89
            _context.PreInitialize(_listener, component, initializationObjects);
130✔
90

91
        _context.PreInitialize(_listener, Destination, initializationObjects);
408✔
92

93
        initialized = true;
408✔
94
    }
408✔
95

96
    private void UIAlert(string alert, IBasicActivateItems activator)
97
    {
98
        if (!activator.IsInteractive) return;
×
UNCOV
99
        new Thread(() =>
×
100
        {
×
101
            // run as a separate thread to not halt the UI
×
NEW
102
            activator.ShowWarning(alert);
×
NEW
103

×
104
        })
×
105
        {
×
106
            IsBackground = true
×
107
        }.Start();
×
108
    }
×
109

110
    /// <inheritdoc/>
111
    public void ExecutePipeline(GracefulCancellationToken cancellationToken)
112
    {
113
        Exception exception = null;
368✔
114
        try
115
        {
116
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Debug, "Starting pipeline engine"));
368✔
117

118
            if (!initialized)
368!
119
                throw new Exception(
×
120
                    "Engine has not been initialized, call Initialize(DataFlowPipelineContext context, params object[] initializationObjects");
×
121

122
            var hasMoreData = true;
368✔
123
            List<Tuple<string, IBasicActivateItems>> uiAlerts = new();
368✔
124
            while (!cancellationToken.IsCancellationRequested && hasMoreData)
1,056✔
125
                hasMoreData = ExecuteSinglePass(cancellationToken, uiAlerts);
716✔
126

127
            if (cancellationToken.IsAbortRequested)
340!
128
            {
129
                Source.Abort(_listener);
×
130

131
                foreach (var c in Components)
×
132
                    c.Abort(_listener);
×
133

134
                Destination.Abort(_listener);
×
135

136
                _listener.OnNotify(this,
×
137
                    new NotifyEventArgs(ProgressEventType.Information, "Pipeline engine aborted"));
×
138
                return;
×
139
            }
140
            foreach (var alert in uiAlerts.Distinct().Where(static alert => alert is not null))
748!
141
                UIAlert(alert.Item1, alert.Item2);
×
142
        }
340✔
143
        catch (Exception e)
28✔
144
        {
145
            exception = e;
28✔
146
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, e.Message, e));
28✔
147
        }
28✔
148
        finally
149
        {
150
            _listener.OnNotify(this,
368✔
151
                new NotifyEventArgs(ProgressEventType.Debug,
368✔
152
                    "Preparing to Dispose of DataFlowPipelineEngine components"));
368✔
153

154
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace, $"About to Dispose {Source}"));
368✔
155
            try
156
            {
157
                Source.Dispose(_listener, exception);
368✔
158
            }
368✔
159
            catch (Exception e)
×
160
            {
161
                //dispose crashing is only a deal-breaker if there wasn't already an exception in the pipeline
162
                if (exception == null)
×
163
                    throw;
×
164

165
                _listener.OnNotify(Source,
×
166
                    new NotifyEventArgs(ProgressEventType.Error, "Error Disposing Source Component", e));
×
167
            }
×
168

169
            foreach (var dataLoadComponent in Components)
996✔
170
            {
171
                _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace,
130✔
172
                    $"About to Dispose {dataLoadComponent}"));
130✔
173
                try
174
                {
175
                    dataLoadComponent.Dispose(_listener, exception);
130✔
176
                }
130✔
177
                catch (Exception e)
×
178
                {
179
                    //dispose crashing is only a deal-breaker if there wasn't already an exception in the pipeline
180
                    if (exception == null)
×
181
                        throw;
×
182

183
                    _listener.OnNotify(dataLoadComponent,
×
184
                        new NotifyEventArgs(ProgressEventType.Error, "Error Disposing Component", e));
×
185
                }
×
186
            }
187

188
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace, $"About to Dispose {Destination}"));
368✔
189
            try
190
            {
191
                Destination.Dispose(_listener, exception);
368✔
192
            }
368✔
193
            catch (Exception e)
×
194
            {
195
                //dispose crashing is only a dealbreaker if there wasn't already an exception in the pipeline
196
                if (exception == null)
×
197
                    throw;
×
198

199
                _listener.OnNotify(Destination,
×
200
                    new NotifyEventArgs(ProgressEventType.Error, "Error Disposing Destination Component", e));
×
201
            }
×
202
        }
368✔
203

204
        if (exception != null)
368✔
205
            throw new PipelineCrashedException("Data Flow Pipeline Crashed", exception);
28✔
206
    }
340✔
207

208
    /// <inheritdoc/>
209
    public bool ExecuteSinglePass(GracefulCancellationToken cancellationToken, List<Tuple<string, IBasicActivateItems>> completionUIAlerts = null)
210
    {
211
        if (!initialized)
760!
212
            throw new Exception(
×
213
                "Engine has not been initialized, call Initialize(DataFlowPipelineContext context, params object[] initializationObjects");
×
214

215
        T currentChunk;
216
        try
217
        {
218
            currentChunk = Source.GetChunk(_listener, cancellationToken);
760✔
219
        }
732✔
220
        catch (OperationCanceledException)
×
221
        {
222
            throw;
×
223
        }
224
        catch (Exception e)
28✔
225
        {
226
            throw new InvalidOperationException(
28✔
227
                $"Error when attempting to get a chunk from the source component: {Source}", e);
28✔
228
        }
229
        if (currentChunk == null)
732✔
230
        {
231
            _listener.OnNotify(this,
344✔
232
                new NotifyEventArgs(ProgressEventType.Debug,
344✔
233
                    "Received null chunk from the Source component, stopping engine"));
344✔
234
            return false;
344✔
235
        }
236

237
        try {
238
          foreach (var component in Components)
1,036✔
239
          {
240
              if (cancellationToken.IsAbortRequested) break;
130!
241

242
              currentChunk = component.ProcessPipelineData(currentChunk, _listener, cancellationToken);
130✔
243
              if (completionUIAlerts is not null && currentChunk is DataTable dt)
130✔
244
              {
245
                  var uiAlert = (Tuple<string, IBasicActivateItems>)dt.ExtendedProperties["AlertUIAtEndOfProcess"];
128✔
246
                  completionUIAlerts.Add(uiAlert);
128✔
247
              }
248
          }
249

250
          if (cancellationToken.IsAbortRequested) return true;
388!
251

252
          Destination.ProcessPipelineData(currentChunk, _listener, cancellationToken);
388✔
253

254
          if (cancellationToken.IsAbortRequested) return true;
388!
255
        }
388✔
256
        finally {
257
          //if it is a DataTable call .Clear() because Dispose doesn't actually free up any memory
258
          if (currentChunk is DataTable dt2)
388✔
259
              dt2.Clear();
334✔
260

261
          //if the chunk is something that can be disposed, dispose it (e.g. DataTable - to free up memory)
262
          if (currentChunk is IDisposable junk)
388✔
263
  #pragma warning disable
264
              junk.Dispose();
334✔
265
        }
388✔
266

267
        return true;
388✔
268
    }
×
269

270
    /// <summary>
271
    /// Runs checks on all components in the pipeline that support <see cref="ICheckable"/>
272
    /// </summary>
273
    /// <param name="notifier"></param>
274
    public void Check(ICheckNotifier notifier)
275
    {
276
        try
277
        {
278
            if (Source is ICheckable checkableSource)
12!
279
            {
280
                notifier.OnCheckPerformed(new CheckEventArgs(
12✔
281
                    $"About to start checking Source component {checkableSource}",
12✔
282
                    CheckResult.Success));
12✔
283
                checkableSource.Check(notifier);
12✔
284
            }
285
            else
286
            {
287
                notifier.OnCheckPerformed(
×
288
                    new CheckEventArgs(
×
289
                        $"Source component {Source} does not support ICheckable so skipping checking it",
×
290
                        CheckResult.Warning));
×
291
            }
292

293
            foreach (var component in Components)
24!
294
                if (component is ICheckable checkable)
×
295
                {
296
                    notifier.OnCheckPerformed(new CheckEventArgs($"About to start checking component {component}",
×
297
                        CheckResult.Success));
×
298
                    checkable.Check(notifier);
×
299
                }
300
                else
301
                {
302
                    notifier.OnCheckPerformed(
×
303
                        new CheckEventArgs(
×
304
                            $"Component {component} does not support ICheckable so skipping checking it",
×
305
                            CheckResult.Warning));
×
306
                }
307

308
            if (Destination is ICheckable checkableDestination)
12!
309
            {
310
                notifier.OnCheckPerformed(new CheckEventArgs(
12✔
311
                    $"About to start checking Destination component {checkableDestination}",
12✔
312
                    CheckResult.Success));
12✔
313
                checkableDestination.Check(notifier);
12✔
314
            }
315
            else
316
            {
317
                notifier.OnCheckPerformed(
×
318
                    new CheckEventArgs(
×
319
                        $"Destination component {Destination} does not support ICheckable so skipping checking it",
×
320
                        CheckResult.Warning));
×
321
            }
322
        }
×
323
        catch (Exception e)
12✔
324
        {
325
            notifier.OnCheckPerformed(
12✔
326
                new CheckEventArgs(
12✔
327
                    $"{typeof(DataFlowPipelineEngine<T>).Name} Checking failed in an unexpected way",
12✔
328
                    CheckResult.Fail, e));
12✔
329
        }
×
330

331

332
        notifier.OnCheckPerformed(new CheckEventArgs("Finished checking all components", CheckResult.Success));
×
333
    }
×
334

335
    /// <inheritdoc/>
336
    public override string ToString() => _name;
×
337
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc