• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 6245535001

20 Sep 2023 07:44AM UTC coverage: 57.013%. First build
6245535001

push

github

web-flow
8.1.0 Release (#1628)

* Bump Newtonsoft.Json from 13.0.1 to 13.0.2

Bumps [Newtonsoft.Json](https://github.com/JamesNK/Newtonsoft.Json) from 13.0.1 to 13.0.2.
- [Release notes](https://github.com/JamesNK/Newtonsoft.Json/releases)
- [Commits](https://github.com/JamesNK/Newtonsoft.Json/compare/13.0.1...13.0.2)

---
updated-dependencies:
- dependency-name: Newtonsoft.Json
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump NLog from 5.0.5 to 5.1.0

Bumps [NLog](https://github.com/NLog/NLog) from 5.0.5 to 5.1.0.
- [Release notes](https://github.com/NLog/NLog/releases)
- [Changelog](https://github.com/NLog/NLog/blob/dev/CHANGELOG.md)
- [Commits](https://github.com/NLog/NLog/compare/v5.0.5...v5.1.0)

---
updated-dependencies:
- dependency-name: NLog
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump NLog from 5.0.5 to 5.1.0

* Fix -r flag - should have been --results-directory all along

* Bump Newtonsoft.Json from 13.0.1 to 13.0.2

* Bump YamlDotNet from 12.0.2 to 12.1.0

Bumps [YamlDotNet](https://github.com/aaubry/YamlDotNet) from 12.0.2 to 12.1.0.
- [Release notes](https://github.com/aaubry/YamlDotNet/releases)
- [Commits](https://github.com/aaubry/YamlDotNet/compare/v12.0.2...v12.1.0)

---
updated-dependencies:
- dependency-name: YamlDotNet
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump Moq from 4.18.2 to 4.18.3

Bumps [Moq](https://github.com/moq/moq4) from 4.18.2 to 4.18.3.
- [Release notes](https://github.com/moq/moq4/releases)
- [Changelog](https://github.com/moq/moq4/blob/main/CHANGELOG.md)
- [Commits](https://github.com/moq/moq4/compare/v4.18.2...v4.18.3)

---
updated-dependencies:
- dependency-name: Moq
... (continued)

10732 of 20257 branches covered (0.0%)

Branch coverage included in aggregate %.

48141 of 48141 new or added lines in 1086 files covered. (100.0%)

30685 of 52388 relevant lines covered (58.57%)

7387.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.13
/Rdmp.Core/DataFlowPipeline/DataFlowPipelineEngine.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Collections.ObjectModel;
10
using System.Data;
11
using System.Linq;
12
using Rdmp.Core.Curation.Data.Pipelines;
13
using Rdmp.Core.DataFlowPipeline.Requirements;
14
using Rdmp.Core.ReusableLibraryCode.Checks;
15
using Rdmp.Core.ReusableLibraryCode.Progress;
16

17
namespace Rdmp.Core.DataFlowPipeline;
18

19
/// <summary>
/// Generic implementation of IDataFlowPipelineEngine (See IDataFlowPipelineEngine).  You can create a DataFlowPipelineEngine by manually constructing the context,
/// source, destination etc but more often you will want to use an IPipeline configured by the user and an IPipelineUseCase to stamp out the pipeline into an instance
/// of the engine (See IDataFlowPipelineEngineFactory).  IPipeline is the user configured set of components they think will achieve a given task.
/// </summary>
/// <typeparam name="T">The type of chunk that flows from the <see cref="Source"/>, through each of the <see cref="Components"/>, into the <see cref="Destination"/></typeparam>
public class DataFlowPipelineEngine<T> : IDataFlowPipelineEngine
{
    /// <summary>
    /// Message used whenever the engine is asked to run before <see cref="Initialize"/> has been called
    /// </summary>
    private const string NotInitializedMessage =
        "Engine has not been initialized, call Initialize(params object[] initializationObjects)";

    private readonly DataFlowPipelineContext<T> _context;
    private readonly IDataLoadEventListener _listener;

    /// <summary>
    /// Human readable name returned by <see cref="ToString"/>, never null
    /// </summary>
    private readonly string _name;

    /// <summary>
    /// True once <see cref="Initialize"/> has run to completion
    /// </summary>
    private bool _initialized;

    /// <summary>
    /// Readonly cast of <see cref="ComponentObjects"/>. If you need to add components, add them to <see cref="ComponentObjects"/> instead.
    /// A new snapshot collection is built on every access.
    /// </summary>
    public ReadOnlyCollection<IDataFlowComponent<T>> Components =>
        ComponentObjects.Cast<IDataFlowComponent<T>>().ToList().AsReadOnly();

    /// <summary>
    /// The last component in the pipeline, responsible for writing the chunks (of type {T}) to somewhere (E.g. to disk, to database etc)
    /// </summary>
    public IDataFlowDestination<T> Destination { get; private set; }

    /// <summary>
    /// The first component in the pipeline, responsible for iteratively generating chunks (of type {T}) for feeding to downstream pipeline components
    /// </summary>
    public IDataFlowSource<T> Source { get; private set; }

    /// <summary>
    /// Middle components of the pipeline, must be <see cref="IDataFlowComponent{T}"/> with T appropriate to the context.
    /// </summary>
    public List<object> ComponentObjects { get; set; }

    /// <inheritdoc cref="Destination"/>
    public object DestinationObject => Destination;

    /// <inheritdoc cref="Source"/>
    public object SourceObject => Source;

    /// <summary>
    /// Creates a new pipeline engine ready to run under the <paramref name="context"/> recording events that occur to <paramref name="listener"/>.
    /// </summary>
    /// <param name="context">Context used to pre initialize the source, every middle component and the destination when <see cref="Initialize"/> is called</param>
    /// <param name="source">The component that produces chunks, exposed as <see cref="Source"/></param>
    /// <param name="destination">The component that receives fully processed chunks, exposed as <see cref="Destination"/></param>
    /// <param name="listener">Receives all progress, error and disposal notifications raised by the engine</param>
    /// <param name="pipelineSource">Optional pipeline whose Name is used as the engine name in <see cref="ToString"/>, when null (or unnamed) the engine is called "Undefined pipeline"</param>
    public DataFlowPipelineEngine(DataFlowPipelineContext<T> context, IDataFlowSource<T> source,
        IDataFlowDestination<T> destination, IDataLoadEventListener listener,
        IPipeline pipelineSource = null)
    {
        Source = source;
        Destination = destination;
        _context = context;
        _listener = listener;
        ComponentObjects = new List<object>();

        // fall back for an unnamed pipeline too so that ToString never returns null
        _name = pipelineSource?.Name ?? "Undefined pipeline";
    }

    /// <inheritdoc/>
    public void Initialize(params object[] initializationObjects)
    {
        _context.PreInitialize(_listener, Source, initializationObjects);

        foreach (var component in Components)
            _context.PreInitialize(_listener, component, initializationObjects);

        _context.PreInitialize(_listener, Destination, initializationObjects);

        _initialized = true;
    }

    /// <inheritdoc/>
    /// <exception cref="PipelineCrashedException">Thrown (wrapping the original error) when anything fails while the pipeline is running</exception>
    public void ExecutePipeline(GracefulCancellationToken cancellationToken)
    {
        Exception exception = null;
        try
        {
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Debug, "Starting pipeline engine"));

            // thrown inside the try so that it is reported and wrapped like any other pipeline failure
            ThrowIfNotInitialized();

            var hasMoreData = true;
            while (!cancellationToken.IsCancellationRequested && hasMoreData)
                hasMoreData = ExecuteSinglePass(cancellationToken);

            if (cancellationToken.IsAbortRequested)
            {
                Source.Abort(_listener);

                foreach (var c in Components)
                    c.Abort(_listener);

                Destination.Abort(_listener);

                _listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Information, "Pipeline engine aborted"));

                // the finally block below still disposes every component
                return;
            }
        }
        catch (Exception e)
        {
            // remember the failure before notifying, the listener itself may throw on Error events
            exception = e;
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, e.Message, e));
        }
        finally
        {
            DisposeAllComponents(exception);
        }

        if (exception != null)
            throw new PipelineCrashedException("Data Flow Pipeline Crashed", exception);
    }

    /// <inheritdoc/>
    /// <exception cref="InvalidOperationException">Thrown if the engine has not been initialized or the source fails to provide a chunk</exception>
    public bool ExecuteSinglePass(GracefulCancellationToken cancellationToken)
    {
        ThrowIfNotInitialized();

        T currentChunk;
        try
        {
            currentChunk = Source.GetChunk(_listener, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // cancellation is not a source failure, let it through unwrapped
            throw;
        }
        catch (Exception e)
        {
            throw new InvalidOperationException(
                $"Error when attempting to get a chunk from the source component: {Source}", e);
        }

        if (currentChunk == null)
        {
            _listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Debug,
                    "Received null chunk from the Source component, stopping engine"));
            return false;
        }

        foreach (var component in Components)
        {
            if (cancellationToken.IsAbortRequested) break;
            currentChunk = component.ProcessPipelineData(currentChunk, _listener, cancellationToken);
        }

        if (cancellationToken.IsAbortRequested) return true;
        Destination.ProcessPipelineData(currentChunk, _listener, cancellationToken);

        if (cancellationToken.IsAbortRequested) return true;

        // if it is a DataTable call .Clear() because Dispose doesn't actually free up any memory.
        // Pattern matching is used (rather than casting on typeof(T)) so that a null chunk handed
        // back by a middle component is skipped instead of causing a NullReferenceException
        if (currentChunk is DataTable table)
            table.Clear();

        // if the chunk is something that can be disposed, dispose it (e.g. DataTable - to free up memory)
        if (currentChunk is IDisposable disposable)
            disposable.Dispose();

        return true;
    }

    /// <summary>
    /// Runs checks on all components in the pipeline that support <see cref="ICheckable"/>
    /// </summary>
    /// <param name="notifier">Receives the outcome of each check, including a warning for every component that does not support <see cref="ICheckable"/></param>
    public void Check(ICheckNotifier notifier)
    {
        try
        {
            CheckComponent(notifier, Source, "Source");

            foreach (var component in Components)
                CheckComponent(notifier, component, null);

            CheckComponent(notifier, Destination, "Destination");
        }
        catch (Exception e)
        {
            notifier.OnCheckPerformed(
                new CheckEventArgs(
                    $"{typeof(DataFlowPipelineEngine<T>).Name} Checking failed in an unexpected way",
                    CheckResult.Fail, e));
        }

        notifier.OnCheckPerformed(new CheckEventArgs("Finished checking all components", CheckResult.Success));
    }

    /// <inheritdoc/>
    public override string ToString() => _name;

    /// <summary>
    /// Throws if <see cref="Initialize"/> has not yet been called on this engine
    /// </summary>
    /// <exception cref="InvalidOperationException">Thrown when the engine is not initialized</exception>
    private void ThrowIfNotInitialized()
    {
        if (!_initialized)
            throw new InvalidOperationException(NotInitializedMessage);
    }

    /// <summary>
    /// Checks a single pipeline component if it supports <see cref="ICheckable"/>, otherwise records a warning that it was skipped
    /// </summary>
    /// <param name="notifier">Receives the check events</param>
    /// <param name="component">The source, middle component or destination to check</param>
    /// <param name="role">"Source" or "Destination" for the end components, null for a middle component</param>
    private static void CheckComponent(ICheckNotifier notifier, object component, string role)
    {
        if (component is ICheckable checkable)
        {
            var startLabel = role == null ? "component" : $"{role} component";
            notifier.OnCheckPerformed(new CheckEventArgs(
                $"About to start checking {startLabel} {checkable}",
                CheckResult.Success));
            checkable.Check(notifier);
        }
        else
        {
            var skipLabel = role == null ? "Component" : $"{role} component";
            notifier.OnCheckPerformed(
                new CheckEventArgs(
                    $"{skipLabel} {component} does not support ICheckable so skipping checking it",
                    CheckResult.Warning));
        }
    }

    /// <summary>
    /// Disposes the source, then every middle component, then the destination.  Every component gets its chance to
    /// dispose even if an earlier one throws.  If the pipeline itself did not fail then the first dispose failure is
    /// rethrown (with its original type and stack trace) once all components have been visited.
    /// </summary>
    /// <param name="pipelineException">The exception that crashed the pipeline or null if it ran without error</param>
    private void DisposeAllComponents(Exception pipelineException)
    {
        _listener.OnNotify(this,
            new NotifyEventArgs(ProgressEventType.Debug,
                "Preparing to Dispose of DataFlowPipelineEngine components"));

        System.Runtime.ExceptionServices.ExceptionDispatchInfo firstDisposeFailure = null;

        DisposeComponent(Source, failure => Source.Dispose(_listener, failure),
            "Error Disposing Source Component", pipelineException, ref firstDisposeFailure);

        foreach (var dataLoadComponent in Components)
            DisposeComponent(dataLoadComponent, failure => dataLoadComponent.Dispose(_listener, failure),
                "Error Disposing Component", pipelineException, ref firstDisposeFailure);

        DisposeComponent(Destination, failure => Destination.Dispose(_listener, failure),
            "Error Disposing Destination Component", pipelineException, ref firstDisposeFailure);

        // dispose crashing is only a dealbreaker if there wasn't already an exception in the pipeline
        // (firstDisposeFailure is only ever captured when pipelineException is null)
        firstDisposeFailure?.Throw();
    }

    /// <summary>
    /// Disposes one component, announcing it to the listener first.  The first failure in an otherwise healthy
    /// pipeline is captured into <paramref name="firstDisposeFailure"/> for rethrow later, any other failure is
    /// reported to the listener as an Error against the component.
    /// </summary>
    /// <param name="component">The component being disposed (used for notifications)</param>
    /// <param name="dispose">Invokes the component's Dispose, passing the failure (if any) that the component should be told about</param>
    /// <param name="errorMessage">Message reported to the listener when the dispose failure is not the primary error</param>
    /// <param name="pipelineException">The exception that crashed the pipeline or null if it ran without error</param>
    /// <param name="firstDisposeFailure">The first dispose failure captured so far (null if none)</param>
    private void DisposeComponent(object component, Action<Exception> dispose, string errorMessage,
        Exception pipelineException, ref System.Runtime.ExceptionServices.ExceptionDispatchInfo firstDisposeFailure)
    {
        _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace, $"About to Dispose {component}"));
        try
        {
            // NOTE(review): when an upstream component failed to dispose, downstream components are handed that
            // failure so they do not treat the run as clean - confirm this suits all destinations
            dispose(pipelineException ?? firstDisposeFailure?.SourceException);
        }
        catch (Exception e)
        {
            if (pipelineException == null && firstDisposeFailure == null)
            {
                // primary failure of the run: keep it (with stack trace) and carry on disposing the rest
                firstDisposeFailure = System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(e);
                return;
            }

            _listener.OnNotify(component, new NotifyEventArgs(ProgressEventType.Error, errorMessage, e));
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc