• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 20057586828

09 Dec 2025 08:56AM UTC coverage: 57.193% (-0.2%) from 57.422%
20057586828

Pull #2182

github

JFriel
Merge branch 'develop' of https://github.com/HicServices/RDMP into task/RDMP-33-dataset-integration-interface
Pull Request #2182: [Datasets] Task/rdmp 33 dataset integration interface

11510 of 21615 branches covered (53.25%)

Branch coverage included in aggregate %.

1226 of 2024 new or added lines in 85 files covered. (60.57%)

35 existing lines in 15 files now uncovered.

32654 of 55604 relevant lines covered (58.73%)

8854.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.2
/Rdmp.Core/DataExport/DataExtraction/Pipeline/ExtractionPipelineUseCase.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Data;
9
using System.Linq;
10
using Rdmp.Core.CommandExecution;
11
using Rdmp.Core.Curation.Data.Pipelines;
12
using Rdmp.Core.DataExport.Data;
13
using Rdmp.Core.DataExport.DataExtraction.Commands;
14
using Rdmp.Core.DataExport.DataExtraction.Pipeline.Destinations;
15
using Rdmp.Core.DataExport.DataExtraction.Pipeline.Sources;
16
using Rdmp.Core.DataFlowPipeline;
17
using Rdmp.Core.DataFlowPipeline.Requirements;
18
using Rdmp.Core.Logging;
19
using Rdmp.Core.Reports.ExtractionTime;
20
using Rdmp.Core.Repositories;
21
using Rdmp.Core.ReusableLibraryCode;
22
using Rdmp.Core.ReusableLibraryCode.Progress;
23

24
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline;
25

26
/// <summary>
27
/// Use case for linking and extracting Project Extraction Configuration datasets and custom data (See IExtractCommand).
28
/// </summary>
29
public sealed class ExtractionPipelineUseCase : PipelineUseCase
30
{
31
    private readonly IPipeline _pipeline;
32
    private readonly DataLoadInfo _dataLoadInfo;
33

34
    public IExtractCommand ExtractCommand { get; set; }
580✔
35
    public ExecuteDatasetExtractionSource Source { get; private set; }
848✔
36

37
    public GracefulCancellationToken Token { get; set; }
203✔
38

39
    /// <summary>
40
    /// If Destination is an IExecuteDatasetExtractionDestination then it will be initialized properly with the configuration, cohort etc.; otherwise the destination will have to react properly
41
    /// dynamically, based on what comes down the pipeline, just like it would normally, e.g. SqlBulkInsertDestination would be a logically permissible destination for an ExtractionPipeline
42
    /// </summary>
43
    public IExecuteDatasetExtractionDestination Destination { get; private set; }
398✔
44

45
    /// <summary>
    /// Creates a use case that will run <paramref name="extractCommand"/> through <paramref name="pipeline"/>.
    /// The command is elevated to <see cref="ExtractCommandState.NotLaunched"/> and the command, project,
    /// <see cref="DataLoadInfo"/>, catalogue repository and activator are registered as initialization objects
    /// before the pipeline context is generated.
    /// </summary>
    /// <param name="activator">Registered as an initialization object</param>
    /// <param name="project">Registered as an initialization object; also supplies the catalogue repository</param>
    /// <param name="extractCommand">The command to execute; exposed afterwards as <see cref="ExtractCommand"/></param>
    /// <param name="pipeline">The pipeline from which the engine is built on each run</param>
    /// <param name="dataLoadInfo">Receives fatal error log entries when the pipeline crashes</param>
    public ExtractionPipelineUseCase(IBasicActivateItems activator, IProject project, IExtractCommand extractCommand,
        IPipeline pipeline, DataLoadInfo dataLoadInfo)
    {
        _pipeline = pipeline;
        _dataLoadInfo = dataLoadInfo;
        ExtractCommand = extractCommand;

        ExtractCommand.ElevateState(ExtractCommandState.NotLaunched);

        // registration order is kept as: command, project, load info, catalogue repository, activator
        AddInitializationObject(ExtractCommand);
        AddInitializationObject(project);
        AddInitializationObject(_dataLoadInfo);
        AddInitializationObject(project.DataExportRepository.CatalogueRepository);
        AddInitializationObject(activator);

        GenerateContext();
    }
66✔
62

63

64
    /// <summary>
    /// Builds the pipeline context for extractions: a <see cref="DataTable"/> flow created with
    /// <see cref="PipelineUsage.LogsToTableLoadInfo"/> that requires an
    /// <see cref="IExecuteDatasetExtractionDestination"/> destination and an
    /// <see cref="ExecuteDatasetExtractionSource"/> source.
    /// </summary>
    /// <returns>The adjusted context</returns>
    protected override IDataFlowPipelineContext GenerateContextImpl()
    {
        // start from the standard factory-built context...
        var context = new DataFlowPipelineContextFactory<DataTable>().Create(PipelineUsage.LogsToTableLoadInfo);

        // ...then pin both ends of the pipeline to the extraction specific types
        context.MustHaveDestination = typeof(IExecuteDatasetExtractionDestination);
        context.MustHaveSource = typeof(ExecuteDatasetExtractionSource);

        return context;
    }
77

78
    /// <summary>
    /// Runs the extraction.  Commands that are not an <see cref="ExtractDatasetCommand"/> are executed exactly once.
    /// Dataset commands are executed in a loop: after a successful run the loop continues while
    /// <see cref="IncrementProgressIfAny"/> reports more batches, and after a failed run it continues while
    /// <see cref="ShouldRetry"/> says to retry.  Stop/abort requests on <see cref="Token"/> are checked before each run.
    /// </summary>
    /// <param name="listener">Receives progress notifications</param>
    public void Execute(IDataLoadEventListener listener)
    {
        // anything other than a dataset extraction gets a single attempt, exceptions propagate to the caller
        if (ExtractCommand is not ExtractDatasetCommand datasetCommand)
        {
            ExecuteOnce(listener);
            return;
        }

        var failuresOverall = 0;
        var failuresInARow = 0;
        bool keepGoing;

        do
        {
            Token?.ThrowIfStopRequested();
            Token?.ThrowIfAbortRequested();

            bool succeeded;
            try
            {
                succeeded = ExecuteOnce(listener);
            }
            catch (Exception)
            {
                // an exception escaping the run is treated the same as the run reporting failure
                succeeded = false;
            }

            if (!succeeded)
            {
                failuresOverall++;
                failuresInARow++;

                keepGoing = ShouldRetry(datasetCommand, listener, failuresOverall, failuresInARow);

                if (keepGoing)
                {
                    listener.OnNotify(this,
                        new NotifyEventArgs(ProgressEventType.Information, "Retrying pipeline"));
                }

                continue;
            }

            keepGoing = IncrementProgressIfAny(datasetCommand, listener);

            // a success breaks any run of consecutive failures
            failuresInARow = 0;

            if (keepGoing)
            {
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Information,
                        "Running pipeline again for next batch in ExtractionProgress"));
            }
        } while (keepGoing);
    }
17✔
129

130
    /// <summary>
    /// Called after a successful run.  When <paramref name="extractDatasetCommand"/> has an extraction progress
    /// and a batch end date, the progress date is moved to that batch end and saved.  If the progress then
    /// reports more to fetch, the command is reset and true is returned (i.e. run again).
    /// </summary>
    /// <param name="extractDatasetCommand">The command that has just been executed</param>
    /// <param name="listener">Told about the newly saved progress date</param>
    /// <returns>True if another batch should be run, otherwise false</returns>
    private bool IncrementProgressIfAny(ExtractDatasetCommand extractDatasetCommand, IDataLoadEventListener listener)
    {
        var extractionProgress = extractDatasetCommand.SelectedDataSets.ExtractionProgressIfAny;

        // no progress record, or no batch end on the command: nothing to record and nothing more to run
        if (extractionProgress == null || extractDatasetCommand.BatchEnd == null)
            return false;

        // the run ended successfully and it is a batch load, so record how far we got
        extractionProgress.ProgressDate = extractDatasetCommand.BatchEnd.Value;
        extractionProgress.SaveToDatabase();
        listener.OnNotify(this,
            new NotifyEventArgs(ProgressEventType.Information,
                $"Saving batch extraction progress as {extractionProgress.ProgressDate}"));

        if (!extractionProgress.MoreToFetch())
            return false;

        // clear the query builder so it can be rebuilt for the new dates
        extractDatasetCommand.Reset();
        return true;
    }
166

167
    /// <summary>
    /// Decides whether a failed extraction should be attempted again by delegating to the retry wait strategy
    /// of the command's extraction progress.  This method may perform a wait operation before returning true.
    /// </summary>
    /// <param name="extractDatasetCommand">The command whose run failed</param>
    /// <param name="listener">Passed through to the retry wait strategy</param>
    /// <param name="totalFailureCount">Number of failed runs so far in this execution</param>
    /// <param name="consecutiveFailureCount">Number of failed runs since the last successful run</param>
    /// <returns>True to run the pipeline again; false when there is no extraction progress or the strategy declines</returns>
    private bool ShouldRetry(ExtractDatasetCommand extractDatasetCommand, IDataLoadEventListener listener,
        int totalFailureCount, int consecutiveFailureCount)
    {
        var extractionProgress = extractDatasetCommand.SelectedDataSets.ExtractionProgressIfAny;

        // no extraction progress means no retry
        if (extractionProgress is null)
            return false;

        return extractionProgress.ApplyRetryWaitStrategy(Token, listener, totalFailureCount,
            consecutiveFailureCount) == true;
    }
183

184

185
    /// <summary>
    /// Runs the extraction pipeline once.  On a pipeline crash the command is elevated to
    /// <see cref="ExtractCommandState.Crashed"/>, the error is logged to the <see cref="DataLoadInfo"/> and audited
    /// to the extraction results, and the listener is sent an Error notification.  If the source reports that it was
    /// cancelled, the command is elevated to <see cref="ExtractCommandState.UserAborted"/> and that is audited too.
    /// </summary>
    /// <param name="listener">Receives progress and error notifications (it may itself throw on Error)</param>
    /// <returns>
    /// True if, after the run, the command state was below <see cref="ExtractCommandState.WritingMetadata"/>
    /// (in which case metadata is written / the command is completed); false if it crashed or was aborted etc
    /// </returns>
    private bool ExecuteOnce(IDataLoadEventListener listener)
    {
        try
        {
            ExtractCommand.ElevateState(ExtractCommandState.WaitingToExecute);

            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information,
                    $"Running Extraction {ExtractCommand} with Pipeline {_pipeline.Name} (ID={_pipeline.ID})"));

            var engine = GetEngine(_pipeline, listener);

            try
            {
                engine.ExecutePipeline(Token ?? new GracefulCancellationToken());
                listener.OnNotify(Destination, new NotifyEventArgs(ProgressEventType.Information,
                    $"Extraction completed successfully into : {Destination.GetDestinationDescription()}"));
            }
            catch (Exception e)
            {
                ExtractCommand.ElevateState(ExtractCommandState.Crashed);

                var innerMessages = ExceptionHelper.ExceptionToListOfInnerMessages(e, true);
                _dataLoadInfo.LogFatalError("Execute extraction pipeline", innerMessages);

                //audit to extraction results
                AuditFailureToExtractionResults(innerMessages);

                //throw so it can be audited to UI (triple audit yay!)
                throw new Exception("An error occurred while executing pipeline", e);
            }

            if (Source == null)
                throw new Exception("Execute Pipeline completed without Exception but Source was null somehow?!");

            if (Source.WasCancelled)
            {
                Destination.TableLoadInfo.DataLoadInfoParent.LogFatalError(GetType().Name, "User Cancelled Extraction");
                ExtractCommand.ElevateState(ExtractCommandState.UserAborted);

                //audit to extraction results
                AuditFailureToExtractionResults("User Cancelled Extraction");
            }
        }
        catch (Exception ex)
        {
            // Elevate the state BEFORE notifying: a listener is free to throw when it receives an Error,
            // and if it does the command must still end up marked as Crashed.
            ExtractCommand.ElevateState(ExtractCommandState.Crashed);
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error, "Execute pipeline failed with Exception", ex));
        }

        // it crashed or was aborted etc
        if (ExtractCommand.State >= ExtractCommandState.WritingMetadata)
            return false;

        //if it didn't crash / get aborted etc
        if (ExtractCommand is ExtractDatasetCommand)
            WriteMetadata(listener);
        else
            ExtractCommand.ElevateState(ExtractCommandState.Completed);

        return true;
    }

    /// <summary>
    /// Writes <paramref name="failureMessage"/> into the Exception field of the extraction results belonging to
    /// <see cref="ExtractCommand"/> and saves them.  Handles <see cref="ExtractDatasetCommand"/> (cumulative
    /// results) and <see cref="ExtractGlobalsCommand"/> (each result); any other command type has no results
    /// known to this class so nothing is written.
    /// </summary>
    /// <param name="failureMessage">Text describing why the extraction did not complete</param>
    private void AuditFailureToExtractionResults(string failureMessage)
    {
        if (ExtractCommand is ExtractDatasetCommand datasetCommand)
        {
            var cumulativeResults = datasetCommand.CumulativeExtractionResults;
            cumulativeResults.Exception = failureMessage;
            cumulativeResults.SaveToDatabase();
        }
        else if (ExtractCommand is ExtractGlobalsCommand globalsCommand)
        {
            foreach (var globalResult in globalsCommand.ExtractionResults)
            {
                globalResult.Exception = failureMessage;
                globalResult.SaveToDatabase();
            }
        }

        // Deliberately no else: an unconditional 'as' cast here would throw a NullReferenceException
        // for other IExtractCommand implementations and hide the failure that is being audited.
    }
285

286
    /// <summary>
    /// Builds the engine via the base implementation and records the destination and source objects it created
    /// in <see cref="Destination"/> and <see cref="Source"/>.
    /// </summary>
    /// <param name="pipeline">The pipeline to build the engine from</param>
    /// <param name="listener">Passed through to the base implementation</param>
    /// <returns>The engine produced by the base implementation</returns>
    public override IDataFlowPipelineEngine GetEngine(IPipeline pipeline, IDataLoadEventListener listener)
    {
        var pipelineEngine = base.GetEngine(pipeline, listener);

        // record the components that were created as part of the Pipeline configured (destination first, then source)
        Destination = (IExecuteDatasetExtractionDestination)pipelineEngine.DestinationObject;
        Source = (ExecuteDatasetExtractionSource)pipelineEngine.SourceObject;

        return pipelineEngine;
    }
297

298
    /// <summary>
    /// Elevates the command to <see cref="ExtractCommandState.WritingMetadata"/> then generates the Word metadata
    /// document followed by the dataset variable report.  If either generator throws, the command is elevated to
    /// <see cref="ExtractCommandState.Warning"/>, the listener is sent an Error and the method returns.  Otherwise
    /// the command ends as Warning if the Word writer collected any exceptions, or Completed if it did not.
    /// </summary>
    /// <param name="listener">Receives notifications about documents that could not be created</param>
    private void WriteMetadata(IDataLoadEventListener listener)
    {
        ExtractCommand.ElevateState(ExtractCommandState.WritingMetadata);

        WordDataWriter wordWriter;
        try
        {
            wordWriter = new WordDataWriter(this);
            wordWriter.GenerateWordFile(); //run the report
        }
        catch (Exception wordFailure)
        {
            // something about the pipeline resulted in a known unsupported state (e.g. extracting to a database)
            // so we can't use WordDataWriter with this: tell the user that we could not run the report and set
            // the status to warning
            ExtractCommand.ElevateState(ExtractCommandState.Warning);

            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error, "Word metadata document NOT CREATED", wordFailure));
            return;
        }

        try
        {
            new DatasetVariableReportGenerator(this).GenerateDatasetVariableReport();
        }
        catch (Exception reportFailure)
        {
            ExtractCommand.ElevateState(ExtractCommandState.Warning);

            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error, "Dataset variable document NOT CREATED", reportFailure));
            return;
        }

        // both documents were produced and the Word writer collected no exceptions: we are done
        if (!wordWriter.ExceptionsGeneratingWordFile.Any())
        {
            ExtractCommand.ElevateState(ExtractCommandState.Completed);
            return;
        }

        // the Word file was generated but with problems, surface each one as a warning
        ExtractCommand.ElevateState(ExtractCommandState.Warning);

        foreach (var collected in wordWriter.ExceptionsGeneratingWordFile)
        {
            listener.OnNotify(wordWriter,
                new NotifyEventArgs(ProgressEventType.Warning, "Word metadata document creation caused exception",
                    collected));
        }
    }
46✔
348

349
    /// <summary>
    /// Design time constructor (see <see cref="DesignTime"/>).  No real objects are supplied; instead the base
    /// class is given the types of the initialization objects that the public constructor would register.
    /// </summary>
    private ExtractionPipelineUseCase()
        : base(new[]
        {
            // same five types, in the same order, as the objects registered by the public constructor
            typeof(IExtractCommand),
            typeof(IProject),
            typeof(DataLoadInfo),
            typeof(ICatalogueRepository),
            typeof(IBasicActivateItems)
        })
    {
        GenerateContext();
    }
658✔
361

362
    /// <summary>
    /// Creates an instance through the private parameterless constructor, i.e. one that knows the types of its
    /// initialization objects but holds no actual command, project or pipeline.
    /// </summary>
    /// <returns>A new design time use case</returns>
    public static ExtractionPipelineUseCase DesignTime() => new ExtractionPipelineUseCase();
658✔
363
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc