HicServices / RDMP / 9988359965 (push · github)

18 Jul 2024 08:42AM UTC coverage: 57.299% (+0.6%) from 56.679%

JFriel: Merge branch 'develop' of https://github.com/HicServices/RDMP

11072 of 20790 branches covered (53.26%)
Branch coverage included in aggregate %.
31313 of 53181 relevant lines covered (58.88%)
7885.96 hits per line
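Consistent with the note above, the aggregate figure combines lines and branches: (31313 + 11072) covered out of (53181 + 20790) measured gives 42385 / 73971 ≈ 57.299%.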

Source File

/Rdmp.Core/DataExport/DataExtraction/Pipeline/ExtractionPipelineUseCase.cs (58.95% covered)
// Copyright (c) The University of Dundee 2018-2019
// This file is part of the Research Data Management Platform (RDMP).
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.

using System;
using System.Data;
using System.Linq;
using Rdmp.Core.CommandExecution;
using Rdmp.Core.Curation.Data.Pipelines;
using Rdmp.Core.DataExport.Data;
using Rdmp.Core.DataExport.DataExtraction.Commands;
using Rdmp.Core.DataExport.DataExtraction.Pipeline.Destinations;
using Rdmp.Core.DataExport.DataExtraction.Pipeline.Sources;
using Rdmp.Core.DataFlowPipeline;
using Rdmp.Core.DataFlowPipeline.Requirements;
using Rdmp.Core.Logging;
using Rdmp.Core.Reports.ExtractionTime;
using Rdmp.Core.Repositories;
using Rdmp.Core.ReusableLibraryCode;
using Rdmp.Core.ReusableLibraryCode.Progress;

namespace Rdmp.Core.DataExport.DataExtraction.Pipeline;

/// <summary>
/// Use case for linking and extracting Project Extraction Configuration datasets and custom data (See IExtractCommand).
/// </summary>
public sealed class ExtractionPipelineUseCase : PipelineUseCase
{
    private readonly IPipeline _pipeline;
    private readonly DataLoadInfo _dataLoadInfo;

    public IExtractCommand ExtractCommand { get; set; }
    public ExecuteDatasetExtractionSource Source { get; private set; }

    public GracefulCancellationToken Token { get; set; }

    /// <summary>
    /// If Destination is an IExecuteDatasetExtractionDestination then it will be initialized properly with the configuration, cohort etc. Otherwise the destination will have to react
    /// properly/dynamically based on what comes down the pipeline just like it would normally, e.g. SqlBulkInsertDestination would be a logically permissible destination for an ExtractionPipeline.
    /// </summary>
    public IExecuteDatasetExtractionDestination Destination { get; private set; }

    public ExtractionPipelineUseCase(IBasicActivateItems activator, IProject project, IExtractCommand extractCommand,
        IPipeline pipeline, DataLoadInfo dataLoadInfo)
    {
        _dataLoadInfo = dataLoadInfo;
        ExtractCommand = extractCommand;
        _pipeline = pipeline;

        extractCommand.ElevateState(ExtractCommandState.NotLaunched);

        AddInitializationObject(ExtractCommand);
        AddInitializationObject(project);
        AddInitializationObject(_dataLoadInfo);
        AddInitializationObject(project.DataExportRepository.CatalogueRepository);
        AddInitializationObject(activator);

        GenerateContext();
    }


    protected override IDataFlowPipelineContext GenerateContextImpl()
    {
        //create the context using the standard context factory
        var contextFactory = new DataFlowPipelineContextFactory<DataTable>();
        var context = contextFactory.Create(PipelineUsage.LogsToTableLoadInfo);

        //adjust context: we want a destination requirement of IExecuteDatasetExtractionDestination
        context.MustHaveDestination =
            typeof(IExecuteDatasetExtractionDestination); //we want this freaky destination type
        context.MustHaveSource = typeof(ExecuteDatasetExtractionSource);

        return context;
    }

    public void Execute(IDataLoadEventListener listener)
    {
        if (ExtractCommand is ExtractDatasetCommand eds)
        {
            bool runAgain;
            var totalFailureCount = 0;
            var consecutiveFailureCount = 0;

            do
            {
                Token?.ThrowIfStopRequested();
                Token?.ThrowIfAbortRequested();

                bool runSuccessful;
                try
                {
                    runSuccessful = ExecuteOnce(listener);
                }
                catch (Exception)
                {
                    runSuccessful = false;
                }

                if (runSuccessful)
                {
                    runAgain = IncrementProgressIfAny(eds, listener);
                    consecutiveFailureCount = 0;

                    if (runAgain)
                        listener.OnNotify(this,
                            new NotifyEventArgs(ProgressEventType.Information,
                                "Running pipeline again for next batch in ExtractionProgress"));
                }
                else
                {
                    totalFailureCount++;
                    consecutiveFailureCount++;

                    runAgain = ShouldRetry(eds, listener, totalFailureCount, consecutiveFailureCount);

                    if (runAgain)
                        listener.OnNotify(this,
                            new NotifyEventArgs(ProgressEventType.Information, "Retrying pipeline"));
                }
            } while (runAgain);
        }
        else
        {
            ExecuteOnce(listener);
        }
    }

    /// <summary>
    /// Inspects the <paramref name="extractDatasetCommand"/> to see if it is a batch load that has
    /// only done part of its full execution. If so then progress will be recorded and true will be returned
    /// (i.e. run again).
    /// </summary>
    /// <returns></returns>
    /// <exception cref="Exception"></exception>
    private bool IncrementProgressIfAny(ExtractDatasetCommand extractDatasetCommand, IDataLoadEventListener listener)
    {
        var progress = extractDatasetCommand.SelectedDataSets.ExtractionProgressIfAny;

        if (progress == null)
            return false;

        // if load ended successfully and it is a batch load
        if (extractDatasetCommand.BatchEnd != null)
        {
            // update our progress
            progress.ProgressDate = extractDatasetCommand.BatchEnd.Value;
            progress.SaveToDatabase();
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information,
                    $"Saving batch extraction progress as {progress.ProgressDate}"));

            if (progress.MoreToFetch())
            {
                // clear the query builder so it can be rebuilt for the new dates
                extractDatasetCommand.Reset();
                return true;
            }

            return false;
        }

        return false;
    }

    /// <summary>
    /// Returns whether to retry the extraction. This method may perform a wait operation
    /// before returning true.
    /// </summary>
    /// <param name="extractDatasetCommand"></param>
    /// <param name="listener"></param>
    /// <param name="totalFailureCount"></param>
    /// <param name="consecutiveFailureCount"></param>
    /// <returns></returns>
    private bool ShouldRetry(ExtractDatasetCommand extractDatasetCommand, IDataLoadEventListener listener,
        int totalFailureCount, int consecutiveFailureCount)
    {
        var progress = extractDatasetCommand.SelectedDataSets.ExtractionProgressIfAny;

        return progress?.ApplyRetryWaitStrategy(Token, listener, totalFailureCount, consecutiveFailureCount) == true;
    }


    /// <summary>
    /// Runs the extraction once and returns true if it was successful, otherwise false
    /// </summary>
    /// <param name="listener"></param>
    /// <returns></returns>
    private bool ExecuteOnce(IDataLoadEventListener listener)
    {
        try
        {
            ExtractCommand.ElevateState(ExtractCommandState.WaitingToExecute);

            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information,
                    $"Running Extraction {ExtractCommand} with Pipeline {_pipeline.Name} (ID={_pipeline.ID})"));

            var engine = GetEngine(_pipeline, listener);

            try
            {
                engine.ExecutePipeline(Token ?? new GracefulCancellationToken());
                listener.OnNotify(Destination, new NotifyEventArgs(ProgressEventType.Information,
                    $"Extraction completed successfully into : {Destination.GetDestinationDescription()}"));
            }
            catch (Exception e)
            {
                ExtractCommand.ElevateState(ExtractCommandState.Crashed);
                _dataLoadInfo.LogFatalError("Execute extraction pipeline",
                    ExceptionHelper.ExceptionToListOfInnerMessages(e, true));

                if (ExtractCommand is ExtractDatasetCommand command)
                {
                    //audit to extraction results
                    var result = command.CumulativeExtractionResults;
                    result.Exception = ExceptionHelper.ExceptionToListOfInnerMessages(e, true);
                    result.SaveToDatabase();
                }
                else
                {
                    //audit to extraction results
                    var result = (ExtractCommand as ExtractGlobalsCommand).ExtractionResults;
                    foreach (var extractionResults in result)
                    {
                        extractionResults.Exception = ExceptionHelper.ExceptionToListOfInnerMessages(e, true);
                        extractionResults.SaveToDatabase();
                    }
                }

                //throw so it can be audited to UI (triple audit yay!)
                throw new Exception("An error occurred while executing pipeline", e);
            }

            if (Source == null)
                throw new Exception("Execute Pipeline completed without Exception but Source was null somehow?!");

            if (Source.WasCancelled)
            {
                Destination.TableLoadInfo.DataLoadInfoParent.LogFatalError(GetType().Name, "User Cancelled Extraction");
                ExtractCommand.ElevateState(ExtractCommandState.UserAborted);

                if (ExtractCommand is ExtractDatasetCommand command)
                {
                    //audit to extraction results
                    var result = command.CumulativeExtractionResults;
                    result.Exception = "User Cancelled Extraction";
                    result.SaveToDatabase();
                }
                else
                {
                    //audit to extraction results
                    var result = (ExtractCommand as ExtractGlobalsCommand).ExtractionResults;
                    foreach (var extractionResults in result)
                    {
                        extractionResults.Exception = "User Cancelled Extraction";
                        extractionResults.SaveToDatabase();
                    }
                }
            }
        }
        catch (Exception ex)
        {
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error, "Execute pipeline failed with Exception", ex));
            ExtractCommand.ElevateState(ExtractCommandState.Crashed);
        }

        //if it didn't crash / get aborted etc
        if (ExtractCommand.State < ExtractCommandState.WritingMetadata)
        {
            if (ExtractCommand is ExtractDatasetCommand)
                WriteMetadata(listener);
            else
                ExtractCommand.ElevateState(ExtractCommandState.Completed);
        }
        else
        {
            return false; // it crashed or was aborted etc
        }

        return true;
    }

    public override IDataFlowPipelineEngine GetEngine(IPipeline pipeline, IDataLoadEventListener listener)
    {
        var engine = base.GetEngine(pipeline, listener);

        Destination =
            (IExecuteDatasetExtractionDestination)engine
                .DestinationObject; //record the destination that was created as part of the Pipeline configured
        Source = (ExecuteDatasetExtractionSource)engine.SourceObject;

        return engine;
    }

    private void WriteMetadata(IDataLoadEventListener listener)
    {
        ExtractCommand.ElevateState(ExtractCommandState.WritingMetadata);
        WordDataWriter wordDataWriter;


        try
        {
            wordDataWriter = new WordDataWriter(this);
            wordDataWriter.GenerateWordFile(); //run the report
        }
        catch (Exception e)
        {
            //something about the pipeline resulted in a known unsupported state (e.g. extracting to a database) so we can't use WordDataWriter with this
            // tell user that we could not run the report and set the status to warning
            ExtractCommand.ElevateState(ExtractCommandState.Warning);

            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error, "Word metadata document NOT CREATED", e));
            return;
        }

        //if there were any exceptions
        if (wordDataWriter.ExceptionsGeneratingWordFile.Any())
        {
            ExtractCommand.ElevateState(ExtractCommandState.Warning);

            foreach (var e in wordDataWriter.ExceptionsGeneratingWordFile)
                listener.OnNotify(wordDataWriter,
                    new NotifyEventArgs(ProgressEventType.Warning, "Word metadata document creation caused exception",
                        e));
        }
        else
        {
            ExtractCommand.ElevateState(ExtractCommandState.Completed);
        }
    }

    private ExtractionPipelineUseCase()
        : base(new Type[]
        {
            typeof(IExtractCommand),
            typeof(IProject),
            typeof(DataLoadInfo),
            typeof(ICatalogueRepository),
            typeof(IBasicActivateItems)
        })
    {
        GenerateContext();
    }

    public static ExtractionPipelineUseCase DesignTime() => new();
}
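
For orientation, here is a minimal, hypothetical sketch of how this use case might be driven from calling code. The activator, project, extractCommand, pipeline, dataLoadInfo and listener instances are placeholders assumed to exist elsewhere; only the constructor, Token and Execute members shown in the file above are relied on.

// Hypothetical driver code (not part of the file above); all dependencies are assumed to be created elsewhere.
var useCase = new ExtractionPipelineUseCase(activator, project, extractCommand, pipeline, dataLoadInfo)
{
    Token = new GracefulCancellationToken() // optional: ExecuteOnce falls back to a fresh token when Token is null
};

useCase.Execute(listener);

// After Execute returns, ExtractCommand.State reflects the outcome
// (e.g. Completed, Warning, Crashed or UserAborted).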