• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 20714560420

05 Jan 2026 11:53AM UTC coverage: 57.198% (-0.2%) from 57.378%
20714560420

push

github

JFriel
update deps

11495 of 21585 branches covered (53.25%)

Branch coverage included in aggregate %.

32571 of 55456 relevant lines covered (58.73%)

17789.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.2
/Rdmp.Core/DataExport/DataExtraction/Pipeline/ExtractionPipelineUseCase.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Data;
9
using System.Linq;
10
using Rdmp.Core.CommandExecution;
11
using Rdmp.Core.Curation.Data.Pipelines;
12
using Rdmp.Core.DataExport.Data;
13
using Rdmp.Core.DataExport.DataExtraction.Commands;
14
using Rdmp.Core.DataExport.DataExtraction.Pipeline.Destinations;
15
using Rdmp.Core.DataExport.DataExtraction.Pipeline.Sources;
16
using Rdmp.Core.DataFlowPipeline;
17
using Rdmp.Core.DataFlowPipeline.Requirements;
18
using Rdmp.Core.Logging;
19
using Rdmp.Core.Reports.ExtractionTime;
20
using Rdmp.Core.Repositories;
21
using Rdmp.Core.ReusableLibraryCode;
22
using Rdmp.Core.ReusableLibraryCode.Progress;
23

24
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline;
25

26
/// <summary>
/// Use case for linking and extracting Project Extraction Configuration datasets and custom data (See <see cref="IExtractCommand"/>).
/// Builds the extraction pipeline engine, runs it (repeatedly for batched / retried dataset extractions), audits failures
/// and, for dataset extractions, writes the metadata documents.
/// </summary>
public sealed class ExtractionPipelineUseCase : PipelineUseCase
{
    private readonly IPipeline _pipeline;
    private readonly DataLoadInfo _dataLoadInfo;

    /// <summary>
    /// The extraction being performed.
    /// </summary>
    public IExtractCommand ExtractCommand { get; set; }

    /// <summary>
    /// The source component of the most recently built engine (recorded by <see cref="GetEngine"/>).
    /// </summary>
    public ExecuteDatasetExtractionSource Source { get; private set; }

    /// <summary>
    /// Optional cancellation token handed to the pipeline engine and checked before each run of a dataset extraction.
    /// When null, a fresh <see cref="GracefulCancellationToken"/> is used for each engine run.
    /// </summary>
    public GracefulCancellationToken Token { get; set; }

    /// <summary>
    /// The destination component of the most recently built engine (recorded by <see cref="GetEngine"/>).
    /// The pipeline context generated by this use case requires a destination of type
    /// <see cref="IExecuteDatasetExtractionDestination"/>, and <see cref="GetEngine"/> casts the engine's destination
    /// directly to that type, so it is initialized with the configuration, cohort etc like any other extraction destination.
    /// </summary>
    public IExecuteDatasetExtractionDestination Destination { get; private set; }

    /// <summary>
    /// Creates a use case that runs <paramref name="extractCommand"/> through <paramref name="pipeline"/>.
    /// </summary>
    /// <param name="activator">Offered to pipeline components as an initialization object.</param>
    /// <param name="project">The project being extracted; it and its catalogue repository are offered to pipeline components.</param>
    /// <param name="extractCommand">The extraction to run; <c>ElevateState(NotLaunched)</c> is called on it.</param>
    /// <param name="pipeline">The pipeline to execute.</param>
    /// <param name="dataLoadInfo">Logging context; pipeline failures are logged against it as fatal errors.</param>
    /// <exception cref="ArgumentNullException">If <paramref name="extractCommand"/> or <paramref name="project"/> is null.</exception>
    public ExtractionPipelineUseCase(IBasicActivateItems activator, IProject project, IExtractCommand extractCommand,
        IPipeline pipeline, DataLoadInfo dataLoadInfo)
    {
        // Both are dereferenced immediately below; fail with a clear argument error rather than a NullReferenceException.
        ArgumentNullException.ThrowIfNull(extractCommand);
        ArgumentNullException.ThrowIfNull(project);

        _dataLoadInfo = dataLoadInfo;
        ExtractCommand = extractCommand;
        _pipeline = pipeline;

        extractCommand.ElevateState(ExtractCommandState.NotLaunched);

        AddInitializationObject(ExtractCommand);
        AddInitializationObject(project);
        AddInitializationObject(_dataLoadInfo);
        AddInitializationObject(project.DataExportRepository.CatalogueRepository);
        AddInitializationObject(activator);

        GenerateContext();
    }

    /// <summary>
    /// Creates the pipeline context: a standard <see cref="DataTable"/> context that logs to table load info, and
    /// additionally requires an <see cref="ExecuteDatasetExtractionSource"/> source and an
    /// <see cref="IExecuteDatasetExtractionDestination"/> destination.
    /// </summary>
    protected override IDataFlowPipelineContext GenerateContextImpl()
    {
        //create the context using the standard context factory
        var contextFactory = new DataFlowPipelineContextFactory<DataTable>();
        var context = contextFactory.Create(PipelineUsage.LogsToTableLoadInfo);

        //adjust context: we want a destination requirement of IExecuteDatasetExtractionDestination
        context.MustHaveDestination =
            typeof(IExecuteDatasetExtractionDestination); //we want this freaky destination type
        context.MustHaveSource = typeof(ExecuteDatasetExtractionSource);

        return context;
    }

    /// <summary>
    /// Runs the extraction. For an <see cref="ExtractDatasetCommand"/> the pipeline is run in a loop: after a successful
    /// run the next batch of any ExtractionProgress is run, and after a failed run the ExtractionProgress retry strategy
    /// (if any) decides whether to try again. Any other command is run exactly once.
    /// </summary>
    /// <param name="listener">Receives progress and error notifications.</param>
    public void Execute(IDataLoadEventListener listener)
    {
        if (ExtractCommand is ExtractDatasetCommand eds)
        {
            bool runAgain;
            var totalFailureCount = 0;
            var consecutiveFailureCount = 0;

            do
            {
                Token?.ThrowIfStopRequested();
                Token?.ThrowIfAbortRequested();

                bool runSuccessful;
                try
                {
                    runSuccessful = ExecuteOnce(listener);
                }
                catch (Exception)
                {
                    // ExecuteOnce catches pipeline failures itself and reports them to the listener; an exception that
                    // still escapes presumably comes from the listener rethrowing that report, so it is treated as a
                    // failed run and the retry strategy decides what happens next.
                    runSuccessful = false;
                }

                if (runSuccessful)
                {
                    runAgain = IncrementProgressIfAny(eds, listener);
                    consecutiveFailureCount = 0;

                    if (runAgain)
                        listener.OnNotify(this,
                            new NotifyEventArgs(ProgressEventType.Information,
                                "Running pipeline again for next batch in ExtractionProgress"));
                }
                else
                {
                    totalFailureCount++;
                    consecutiveFailureCount++;

                    runAgain = ShouldRetry(eds, listener, totalFailureCount, consecutiveFailureCount);

                    if (runAgain)
                        listener.OnNotify(this,
                            new NotifyEventArgs(ProgressEventType.Information, "Retrying pipeline"));
                }
            } while (runAgain);
        }
        else
        {
            ExecuteOnce(listener);
        }
    }

    /// <summary>
    /// Inspects the <paramref name="extractDatasetCommand"/> to see if it is a batch load that has
    /// only done part of its full execution.  If so then progress will be recorded and true will be returned
    /// (i.e. run again).
    /// </summary>
    /// <param name="extractDatasetCommand">The dataset extraction that has just completed successfully.</param>
    /// <param name="listener">Notified when batch progress is saved.</param>
    /// <returns>True if the command was reset and another batch should be run, otherwise false.</returns>
    private bool IncrementProgressIfAny(ExtractDatasetCommand extractDatasetCommand, IDataLoadEventListener listener)
    {
        var progress = extractDatasetCommand.SelectedDataSets.ExtractionProgressIfAny;

        // not a batch extraction, or no batch end was recorded for this run
        if (progress == null || extractDatasetCommand.BatchEnd == null)
            return false;

        // update our progress
        progress.ProgressDate = extractDatasetCommand.BatchEnd.Value;
        progress.SaveToDatabase();
        listener.OnNotify(this,
            new NotifyEventArgs(ProgressEventType.Information,
                $"Saving batch extraction progress as {progress.ProgressDate}"));

        if (!progress.MoreToFetch())
            return false;

        // clear the query builder so it can be rebuilt for the new dates
        extractDatasetCommand.Reset();
        return true;
    }

    /// <summary>
    /// Returns whether to retry the extraction.  This method may perform a wait operation
    /// before returning true.
    /// </summary>
    /// <param name="extractDatasetCommand">The dataset extraction that has just failed.</param>
    /// <param name="listener">Passed to the retry strategy.</param>
    /// <param name="totalFailureCount">Number of failed runs so far in this <see cref="Execute"/> call.</param>
    /// <param name="consecutiveFailureCount">Number of failed runs since the last successful run.</param>
    /// <returns>False when there is no ExtractionProgress, otherwise the result of its retry wait strategy.</returns>
    private bool ShouldRetry(ExtractDatasetCommand extractDatasetCommand, IDataLoadEventListener listener,
        int totalFailureCount, int consecutiveFailureCount)
    {
        var progress = extractDatasetCommand.SelectedDataSets.ExtractionProgressIfAny;

        return progress?.ApplyRetryWaitStrategy(Token, listener, totalFailureCount, consecutiveFailureCount) == true;
    }

    /// <summary>
    /// Runs the extraction once and returns true if it was success otherwise false.
    /// Pipeline failures and user cancellation are logged and audited to the command's extraction results; failures are
    /// also reported to <paramref name="listener"/> as an Error (which the listener may choose to rethrow).
    /// </summary>
    /// <param name="listener">Receives progress and error notifications.</param>
    /// <returns>True if the run completed (metadata written / command completed), false if it crashed or was aborted.</returns>
    private bool ExecuteOnce(IDataLoadEventListener listener)
    {
        try
        {
            ExtractCommand.ElevateState(ExtractCommandState.WaitingToExecute);

            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information,
                    $"Running Extraction {ExtractCommand} with Pipeline {_pipeline.Name} (ID={_pipeline.ID})"));

            var engine = GetEngine(_pipeline, listener);

            try
            {
                engine.ExecutePipeline(Token ?? new GracefulCancellationToken());
                listener.OnNotify(Destination, new NotifyEventArgs(ProgressEventType.Information,
                    $"Extraction completed successfully into : {Destination.GetDestinationDescription()}"));
            }
            catch (Exception e)
            {
                ExtractCommand.ElevateState(ExtractCommandState.Crashed);

                var failureMessages = ExceptionHelper.ExceptionToListOfInnerMessages(e, true);
                _dataLoadInfo.LogFatalError("Execute extraction pipeline", failureMessages);

                //audit to extraction results
                AuditFailureToExtractionResults(failureMessages);

                //throw so it can be audited to UI (triple audit yay!)
                throw new Exception("An error occurred while executing pipeline", e);
            }

            if (Source == null)
                throw new Exception("Execute Pipeline completed without Exception but Source was null somehow?!");

            if (Source.WasCancelled)
            {
                Destination.TableLoadInfo.DataLoadInfoParent.LogFatalError(GetType().Name, "User Cancelled Extraction");
                ExtractCommand.ElevateState(ExtractCommandState.UserAborted);

                //audit to extraction results
                AuditFailureToExtractionResults("User Cancelled Extraction");
            }
        }
        catch (Exception ex)
        {
            // Mark the command as crashed BEFORE notifying: a listener may rethrow Error notifications, in which case
            // nothing after OnNotify runs and the command would otherwise never be marked as crashed.
            ExtractCommand.ElevateState(ExtractCommandState.Crashed);
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error, "Execute pipeline failed with Exception", ex));
        }

        //if it didn't crash / get aborted etc
        // NOTE(review): relies on the crash/abort states being ordered at or after WritingMetadata in ExtractCommandState.
        if (ExtractCommand.State >= ExtractCommandState.WritingMetadata)
            return false; // it crashed or was aborted etc

        if (ExtractCommand is ExtractDatasetCommand)
            WriteMetadata(listener);
        else
            ExtractCommand.ElevateState(ExtractCommandState.Completed);

        return true;
    }

    /// <summary>
    /// Records <paramref name="message"/> as the failure reason on the extraction audit records of
    /// <see cref="ExtractCommand"/> and saves them: the cumulative results of a dataset extraction, or every
    /// supplemental result of a globals extraction. Other command types have no audit records known to this use case
    /// and are left alone (previously they caused a NullReferenceException that masked the original failure).
    /// </summary>
    /// <param name="message">Text stored in each result's Exception field.</param>
    private void AuditFailureToExtractionResults(string message)
    {
        switch (ExtractCommand)
        {
            case ExtractDatasetCommand datasetCommand:
            {
                var result = datasetCommand.CumulativeExtractionResults;
                result.Exception = message;
                result.SaveToDatabase();
                break;
            }
            case ExtractGlobalsCommand globalsCommand:
            {
                foreach (var extractionResults in globalsCommand.ExtractionResults)
                {
                    extractionResults.Exception = message;
                    extractionResults.SaveToDatabase();
                }

                break;
            }
        }
    }

    /// <summary>
    /// Builds the engine via the base class and records its source and destination in <see cref="Source"/> and
    /// <see cref="Destination"/> (both cast directly to the types this use case's context requires).
    /// </summary>
    public override IDataFlowPipelineEngine GetEngine(IPipeline pipeline, IDataLoadEventListener listener)
    {
        var engine = base.GetEngine(pipeline, listener);

        Destination =
            (IExecuteDatasetExtractionDestination)engine
                .DestinationObject; //record the destination that was created as part of the Pipeline configured
        Source = (ExecuteDatasetExtractionSource)engine.SourceObject;

        return engine;
    }

    /// <summary>
    /// Generates the Word metadata document and the dataset variable report for a dataset extraction, then marks the
    /// command Completed, or Warning if either report failed or the Word writer recorded exceptions (each of which is
    /// reported to <paramref name="listener"/>).
    /// </summary>
    /// <param name="listener">Receives Error/Warning notifications about report generation problems.</param>
    private void WriteMetadata(IDataLoadEventListener listener)
    {
        ExtractCommand.ElevateState(ExtractCommandState.WritingMetadata);
        WordDataWriter wordDataWriter;

        try
        {
            wordDataWriter = new WordDataWriter(this);
            wordDataWriter.GenerateWordFile(); //run the report
        }
        catch (Exception e)
        {
            //something about the pipeline resulted in a known unsupported state (e.g. extracting to a database) so we can't use WordDataWriter with this
            // tell user that we could not run the report and set the status to warning
            ExtractCommand.ElevateState(ExtractCommandState.Warning);

            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error, "Word metadata document NOT CREATED", e));
            return;
        }

        try
        {
            var datasetVariableReportWriter = new DatasetVariableReportGenerator(this);
            datasetVariableReportWriter.GenerateDatasetVariableReport();
        }
        catch (Exception e)
        {
            ExtractCommand.ElevateState(ExtractCommandState.Warning);

            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error, "Dataset variable document NOT CREATED", e));
            return;
        }

        //if there were any exceptions
        if (wordDataWriter.ExceptionsGeneratingWordFile.Any())
        {
            ExtractCommand.ElevateState(ExtractCommandState.Warning);

            foreach (var e in wordDataWriter.ExceptionsGeneratingWordFile)
                listener.OnNotify(wordDataWriter,
                    new NotifyEventArgs(ProgressEventType.Warning, "Word metadata document creation caused exception",
                        e));
        }
        else
        {
            ExtractCommand.ElevateState(ExtractCommandState.Completed);
        }
    }

    /// <summary>
    /// Design-time constructor: declares the types of the initialization objects that the public constructor supplies
    /// at runtime, without providing any instances.
    /// </summary>
    private ExtractionPipelineUseCase()
        : base(new Type[]
        {
            typeof(IExtractCommand),
            typeof(IProject),
            typeof(DataLoadInfo),
            typeof(ICatalogueRepository),
            typeof(IBasicActivateItems)
        })
    {
        GenerateContext();
    }

    /// <summary>
    /// Creates a design-time instance of this use case (no command, project or pipeline).
    /// </summary>
    public static ExtractionPipelineUseCase DesignTime() => new();
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc