• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 9974847618

17 Jul 2024 01:09PM UTC coverage: 57.303% (+0.6%) from 56.673%
9974847618

Pull #1888

github

web-flow
Merge branch 'develop' into bugfix-pk-ei-issue
Pull Request #1888: Bugfix: Fix overeager PK setting for extraction Identifiers

11073 of 20788 branches covered (53.27%)

Branch coverage included in aggregate %.

3 of 3 new or added lines in 1 file covered. (100.0%)

39 existing lines in 7 files now uncovered.

31313 of 53180 relevant lines covered (58.88%)

7889.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.61
/Rdmp.Core/DataFlowPipeline/DataFlowPipelineEngine.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Collections.ObjectModel;
10
using System.Data;
11
using System.Linq;
12
using System.Threading;
13
using Rdmp.Core.CommandExecution;
14
using Rdmp.Core.Curation.Data.Pipelines;
15
using Rdmp.Core.DataFlowPipeline.Requirements;
16
using Rdmp.Core.ReusableLibraryCode.Checks;
17
using Rdmp.Core.ReusableLibraryCode.Progress;
18

19
namespace Rdmp.Core.DataFlowPipeline;
20

21
/// <summary>
22
/// Generic implementation of IDataFlowPipelineEngine (See IDataFlowPipelineEngine).  You can create a DataFlowPipelineEngine by manually constructing the context,
23
/// source, destination etc but more often you will want to use an IPipeline configured by the user and an IPipelineUseCase to stamp out the pipeline into an instance
24
/// of the engine (See IDataFlowPipelineEngineFactory).  IPipeline is the user configured set of components they think will achieve a given task.
25
/// </summary>
26
/// <typeparam name="T"></typeparam>
27
public class DataFlowPipelineEngine<T> : IDataFlowPipelineEngine
28
{
29
    private readonly DataFlowPipelineContext<T> _context;
30
    private readonly IDataLoadEventListener _listener;
31

32
    private bool initialized;
33
    private string _name;
34

35
    /// <summary>
36
    /// Readonly cast of <see cref="ComponentObjects"/>. If you need to add components, add them to <see cref="ComponentObjects"/> instead.
37
    /// </summary>
38
    public ReadOnlyCollection<IDataFlowComponent<T>> Components =>
39
        ComponentObjects.Cast<IDataFlowComponent<T>>().ToList().AsReadOnly();
1,046✔
40

41
    /// <summary>
42
    /// The last component in the pipeline, responsible for writing the chunks (of type {T}) to somewhere (E.g. to disk, to database etc)
43
    /// </summary>
44
    public IDataFlowDestination<T> Destination { get; private set; }
1,868✔
45

46
    /// <summary>
47
    /// The first component in the pipeline, responsible for iteratively generating chunks (of type {T}) for feeding to downstream pipeline components
48
    /// </summary>
49
    public IDataFlowSource<T> Source { get; private set; }
2,202✔
50

51
    /// <summary>
52
    /// Middle components of the pipeline, must be <see cref="IDataFlowComponent{T}"/> with T appropriate to the context.
53
    /// </summary>
54
    public List<object> ComponentObjects { get; set; }
1,510✔
55

56
    /// <inheritdoc cref="Destination"/>
57
    public object DestinationObject => Destination;
126✔
58

59
    /// <inheritdoc cref="Source"/>
60
    public object SourceObject => Source;
94✔
61

62
    /// <summary>
63
    /// Creates a new pipeline engine ready to run under the <paramref name="context"/> recording events that occur to <paramref name="listener"/>.
64
    /// </summary>
65
    /// <param name="context"></param>
66
    /// <param name="source"></param>
67
    /// <param name="destination"></param>
68
    /// <param name="listener"></param>
69
    /// <param name="pipelineSource"></param>
70
    public DataFlowPipelineEngine(DataFlowPipelineContext<T> context, IDataFlowSource<T> source,
362✔
71
        IDataFlowDestination<T> destination, IDataLoadEventListener listener,
362✔
72
        IPipeline pipelineSource = null)
362✔
73
    {
74
        Source = source;
362✔
75
        Destination = destination;
362✔
76
        _context = context;
362✔
77
        _listener = listener;
362✔
78
        ComponentObjects = new List<object>();
362✔
79

80
        _name = pipelineSource != null ? pipelineSource.Name : "Undefined pipeline";
362✔
81
    }
362✔
82

83
    /// <inheritdoc/>
84
    public void Initialize(params object[] initializationObjects)
85
    {
86
        _context.PreInitialize(_listener, Source, initializationObjects);
362✔
87

88
        foreach (var component in Components)
928✔
89
            _context.PreInitialize(_listener, component, initializationObjects);
102✔
90

91
        _context.PreInitialize(_listener, Destination, initializationObjects);
362✔
92

93
        initialized = true;
362✔
94
    }
362✔
95

96
    private void UIAlert(string alert, IBasicActivateItems activator)
97
    {
98
        if (!activator.IsInteractive) return;
×
99

100
        new Thread(() =>
×
101
        {
×
102
            // run as a separate thread to not halt the UI
×
103
            activator.Show(alert);
×
104
        })
×
105
        {
×
106
            IsBackground = true
×
107
        }.Start();
×
108
    }
×
109

110
    /// <inheritdoc/>
111
    public void ExecutePipeline(GracefulCancellationToken cancellationToken)
112
    {
113
        Exception exception = null;
334✔
114
        try
115
        {
116
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Debug, "Starting pipeline engine"));
334✔
117

118
            if (!initialized)
334!
119
                throw new Exception(
×
120
                    "Engine has not been initialized, call Initialize(DataFlowPipelineContext context, params object[] initializationObjects");
×
121

122
            var hasMoreData = true;
334✔
123
            List<Tuple<string, IBasicActivateItems>> uiAlerts = new();
334✔
124
            while (!cancellationToken.IsCancellationRequested && hasMoreData)
954✔
125
                hasMoreData = ExecuteSinglePass(cancellationToken, uiAlerts);
648✔
126

127
            if (cancellationToken.IsAbortRequested)
306!
128
            {
129
                Source.Abort(_listener);
×
130

131
                foreach (var c in Components)
×
132
                    c.Abort(_listener);
×
133

134
                Destination.Abort(_listener);
×
135

136
                _listener.OnNotify(this,
×
137
                    new NotifyEventArgs(ProgressEventType.Information, "Pipeline engine aborted"));
×
138
                return;
×
139
            }
140
            foreach (var alert in uiAlerts.Distinct().Where(static alert => alert is not null))
666!
141
                UIAlert(alert.Item1, alert.Item2);
×
142
        }
306✔
143
        catch (Exception e)
28✔
144
        {
145
            exception = e;
28✔
146
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, e.Message, e));
28✔
147
        }
28✔
148
        finally
149
        {
150
            _listener.OnNotify(this,
334✔
151
                new NotifyEventArgs(ProgressEventType.Debug,
334✔
152
                    "Preparing to Dispose of DataFlowPipelineEngine components"));
334✔
153

154
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace, $"About to Dispose {Source}"));
334✔
155
            try
156
            {
157
                Source.Dispose(_listener, exception);
334✔
158
            }
334✔
159
            catch (Exception e)
×
160
            {
161
                //dispose crashing is only a deal-breaker if there wasn't already an exception in the pipeline
162
                if (exception == null)
×
163
                    throw;
×
164

165
                _listener.OnNotify(Source,
×
166
                    new NotifyEventArgs(ProgressEventType.Error, "Error Disposing Source Component", e));
×
167
            }
×
168

169
            foreach (var dataLoadComponent in Components)
872✔
170
            {
171
                _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace,
102✔
172
                    $"About to Dispose {dataLoadComponent}"));
102✔
173
                try
174
                {
175
                    dataLoadComponent.Dispose(_listener, exception);
102✔
176
                }
102✔
177
                catch (Exception e)
×
178
                {
179
                    //dispose crashing is only a deal-breaker if there wasn't already an exception in the pipeline
180
                    if (exception == null)
×
181
                        throw;
×
182

183
                    _listener.OnNotify(dataLoadComponent,
×
184
                        new NotifyEventArgs(ProgressEventType.Error, "Error Disposing Component", e));
×
185
                }
×
186
            }
187

188
            _listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Trace, $"About to Dispose {Destination}"));
334✔
189
            try
190
            {
191
                Destination.Dispose(_listener, exception);
334✔
192
            }
334✔
193
            catch (Exception e)
×
194
            {
195
                //dispose crashing is only a dealbreaker if there wasn't already an exception in the pipeline
196
                if (exception == null)
×
197
                    throw;
×
198

199
                _listener.OnNotify(Destination,
×
200
                    new NotifyEventArgs(ProgressEventType.Error, "Error Disposing Destination Component", e));
×
201
            }
×
202
        }
334✔
203

204
        if (exception != null)
334✔
205
            throw new PipelineCrashedException("Data Flow Pipeline Crashed", exception);
28✔
206
    }
306✔
207

208
    /// <inheritdoc/>
209
    public bool ExecuteSinglePass(GracefulCancellationToken cancellationToken, List<Tuple<string, IBasicActivateItems>> completionUIAlerts = null)
210
    {
211
        if (!initialized)
688!
212
            throw new Exception(
×
213
                "Engine has not been initialized, call Initialize(DataFlowPipelineContext context, params object[] initializationObjects");
×
214

215
        T currentChunk;
216
        try
217
        {
218
            currentChunk = Source.GetChunk(_listener, cancellationToken);
688✔
219
        }
660✔
220
        catch (OperationCanceledException)
×
221
        {
222
            throw;
×
223
        }
224
        catch (Exception e)
28✔
225
        {
226
            throw new InvalidOperationException(
28✔
227
                $"Error when attempting to get a chunk from the source component: {Source}", e);
28✔
228
        }
229
        if (currentChunk == null)
660✔
230
        {
231
            _listener.OnNotify(this,
310✔
232
                new NotifyEventArgs(ProgressEventType.Debug,
310✔
233
                    "Received null chunk from the Source component, stopping engine"));
310✔
234
            return false;
310✔
235
        }
236

237
        try {
238
          foreach (var component in Components)
904✔
239
          {
240
              if (cancellationToken.IsAbortRequested) break;
102!
241

242
              currentChunk = component.ProcessPipelineData(currentChunk, _listener, cancellationToken);
102✔
243
              if (completionUIAlerts is not null && currentChunk is DataTable dt)
102✔
244
              {
245
                  var uiAlert = (Tuple<string, IBasicActivateItems>)dt.ExtendedProperties["AlertUIAtEndOfProcess"];
100✔
246
                  completionUIAlerts.Add(uiAlert);
100✔
247
              }
248
          }
249

250
          if (cancellationToken.IsAbortRequested) return true;
350!
251

252
          Destination.ProcessPipelineData(currentChunk, _listener, cancellationToken);
350✔
253

254
          if (cancellationToken.IsAbortRequested) return true;
350!
255
        }
350✔
256
        finally {
257
          //if it is a DataTable call .Clear() because Dispose doesn't actually free up any memory
258
          if (currentChunk is DataTable dt2)
350✔
259
              dt2.Clear();
304✔
260

261
          //if the chunk is something that can be disposed, dispose it (e.g. DataTable - to free up memory)
262
          if (currentChunk is IDisposable junk)
350✔
263
  #pragma warning disable
264
              junk.Dispose();
304✔
265
        }
350✔
266

267
        return true;
350✔
268
    }
×
269

270
    /// <summary>
271
    /// Runs checks on all components in the pipeline that support <see cref="ICheckable"/>
272
    /// </summary>
273
    /// <param name="notifier"></param>
274
    public void Check(ICheckNotifier notifier)
275
    {
276
        try
277
        {
278
            if (Source is ICheckable checkableSource)
×
279
            {
280
                notifier.OnCheckPerformed(new CheckEventArgs(
×
281
                    $"About to start checking Source component {checkableSource}",
×
282
                    CheckResult.Success));
×
283
                checkableSource.Check(notifier);
×
284
            }
285
            else
286
            {
287
                notifier.OnCheckPerformed(
×
288
                    new CheckEventArgs(
×
289
                        $"Source component {Source} does not support ICheckable so skipping checking it",
×
290
                        CheckResult.Warning));
×
291
            }
292

293
            foreach (var component in Components)
×
294
                if (component is ICheckable checkable)
×
295
                {
296
                    notifier.OnCheckPerformed(new CheckEventArgs($"About to start checking component {component}",
×
297
                        CheckResult.Success));
×
298
                    checkable.Check(notifier);
×
299
                }
300
                else
301
                {
302
                    notifier.OnCheckPerformed(
×
303
                        new CheckEventArgs(
×
304
                            $"Component {component} does not support ICheckable so skipping checking it",
×
305
                            CheckResult.Warning));
×
306
                }
307

308
            if (Destination is ICheckable checkableDestination)
×
309
            {
310
                notifier.OnCheckPerformed(new CheckEventArgs(
×
311
                    $"About to start checking Destination component {checkableDestination}",
×
312
                    CheckResult.Success));
×
313
                checkableDestination.Check(notifier);
×
314
            }
315
            else
316
            {
317
                notifier.OnCheckPerformed(
×
318
                    new CheckEventArgs(
×
319
                        $"Destination component {Destination} does not support ICheckable so skipping checking it",
×
320
                        CheckResult.Warning));
×
321
            }
322
        }
×
323
        catch (Exception e)
×
324
        {
325
            notifier.OnCheckPerformed(
×
326
                new CheckEventArgs(
×
327
                    $"{typeof(DataFlowPipelineEngine<T>).Name} Checking failed in an unexpected way",
×
328
                    CheckResult.Fail, e));
×
329
        }
×
330

331

332
        notifier.OnCheckPerformed(new CheckEventArgs("Finished checking all components", CheckResult.Success));
×
333
    }
×
334

335
    /// <inheritdoc/>
UNCOV
336
    public override string ToString() => _name;
×
337
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc