• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 9988359965

18 Jul 2024 08:42AM UTC coverage: 57.299% (+0.6%) from 56.679%
9988359965

push

github

JFriel
Merge branch 'develop' of https://github.com/HicServices/RDMP

11072 of 20790 branches covered (53.26%)

Branch coverage included in aggregate %.

31313 of 53181 relevant lines covered (58.88%)

7885.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.23
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Sources/ExecuteDatasetExtractionSource.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data;
10
using System.Diagnostics;
11
using System.Linq;
12
using System.Threading.Tasks;
13
using FAnsi;
14
using FAnsi.Discovery.QuerySyntax;
15
using Rdmp.Core.Curation.Data;
16
using Rdmp.Core.DataExport.Data;
17
using Rdmp.Core.DataExport.DataExtraction.Commands;
18
using Rdmp.Core.DataFlowPipeline;
19
using Rdmp.Core.DataFlowPipeline.Requirements;
20
using Rdmp.Core.DataLoad.Engine.Pipeline.Components;
21
using Rdmp.Core.DataLoad.Engine.Pipeline.Sources;
22
using Rdmp.Core.QueryBuilding;
23
using Rdmp.Core.ReusableLibraryCode;
24
using Rdmp.Core.ReusableLibraryCode.Checks;
25
using Rdmp.Core.ReusableLibraryCode.DataAccess;
26
using Rdmp.Core.ReusableLibraryCode.Progress;
27
using IContainer = Rdmp.Core.Curation.Data.IContainer;
28

29
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Sources;
30

31
/// <summary>
/// Executes a single Dataset extraction by linking a cohort with a dataset (either core or custom data - See IExtractCommand).  Also calculates the number
/// of unique identifiers seen, records row validation failures etc.
/// </summary>
/// <remarks>
/// When the pipeline is run for an <see cref="ExtractGlobalsCommand"/> instead, this source yields a single empty table (and then null) so that the
/// destination is triggered to extract the globals docs and sql.
/// </remarks>
public class ExecuteDatasetExtractionSource : IPluginDataFlowSource<DataTable>, IPipelineRequirement<IExtractCommand>
{
    //Request is either for one of these

    /// <summary>
    /// The dataset extraction being run; set by <see cref="PreInitialize"/> when given an <see cref="ExtractDatasetCommand"/>.
    /// </summary>
    public ExtractDatasetCommand Request { get; protected set; }

    /// <summary>
    /// The globals extraction being run; set by <see cref="PreInitialize"/> when given an <see cref="ExtractGlobalsCommand"/>.
    /// </summary>
    public ExtractGlobalsCommand GlobalsRequest { get; protected set; }

    public const string AuditTaskName = "DataExtraction";

    // Runtime names of the release identifier columns, one entry per ReleaseIdentifierSubstitution seen by Initialize.
    // NOTE: never cleared, so it may contain repeats if Initialize runs more than once - always de-duplicate before use.
    private readonly List<string> _extractionIdentifiersidx = new();

    private bool _cancel;

    private ICatalogue _catalogue;

    protected const string ValidationColumnName = "RowValidationResult";

    /// <summary>
    /// Validator built from the Catalogue's ValidatorXML (null when there is none, or after validation has failed once).
    /// </summary>
    public ExtractionTimeValidator ExtractionTimeValidator { get; protected set; }

    /// <summary>
    /// The exception raised the last time chunk validation failed (validation is switched off after such a failure).
    /// </summary>
    public Exception ValidationFailureException { get; protected set; }

    /// <summary>
    /// Distinct non-null release identifier values seen so far in the extracted data.
    /// </summary>
    public HashSet<object> UniqueReleaseIdentifiersEncountered { get; set; }

    /// <summary>
    /// Buckets each extracted row by the Catalogue's time coverage column; null unless that column is among the columns being extracted.
    /// </summary>
    public ExtractionTimeTimeCoverageAggregator ExtractionTimeTimeCoverageAggregator { get; set; }

    [DemandsInitialization(
        "Determines the systems behaviour when an extraction query returns 0 rows.  Default (false) is that an error is reported.  If set to true (ticked) then instead a DataTable with 0 rows but all the correct headers will be generated usually resulting in a headers only 0 line/empty extract file")]
    public bool AllowEmptyExtractions { get; set; }

    [DemandsInitialization(
        "Batch size, number of records to read from source before releasing it into the extraction pipeline",
        DefaultValue = 10000, Mandatory = true)]
    public int BatchSize { get; set; }

    [DemandsInitialization(
        "In seconds. Overrides the global timeout for SQL query execution. Use 0 for infinite timeout.",
        DefaultValue = 50000, Mandatory = true)]
    public int ExecutionTimeout { get; set; }

    [DemandsInitialization(@"Determines how the system achieves DISTINCT on extraction.  These include:
None - Do not DISTINCT the records, can result in duplication in your extract (not recommended)
SqlDistinct - Adds the DISTINCT keyword to the SELECT sql sent to the server
OrderByAndDistinctInMemory - Adds an ORDER BY statement to the query and applies the DISTINCT in memory as records are read from the server (this can help when extracting very large data sets where DISTINCT keyword blocks record streaming until all records are ready to go)"
        , DefaultValue = DistinctStrategy.SqlDistinct)]
    public DistinctStrategy DistinctStrategy { get; set; }


    [DemandsInitialization("When DBMS is SqlServer then HASH JOIN should be used instead of regular JOINs")]
    public bool UseHashJoins { get; set; }

    [DemandsInitialization(
        "When DBMS is SqlServer and the extraction is for any of these datasets then HASH JOIN should be used instead of regular JOINs")]
    public Catalogue[] UseHashJoinsForCatalogues { get; set; }

    [DemandsInitialization(
        "Exclusion list.  A collection of Catalogues which will never be considered for HASH JOIN even when UseHashJoins is enabled.  Being on this list takes precedence for a Catalogue even if it is on UseHashJoinsForCatalogues.")]
    public Catalogue[] DoNotUseHashJoinsForCatalogues { get; set; }


    /// <summary>
    /// Built from the first non-null chunk read.  For every <see cref="ExtractableColumn"/> being extracted it records the datatype the
    /// catalogue says the column has, its CatalogueItem, whether a column with the same runtime name was present in the data read and, if so,
    /// the .NET type observed in that buffer (e.g. a varchar(10) in the origin database may have been transformed into a DateTime).
    /// Columns whose original extraction information has vanished get an entry with default values only.
    /// </summary>
    public Dictionary<ExtractableColumn, ExtractTimeTransformationObserved> ExtractTimeTransformationsObserved;

    // The query-running source that actually reads from the live database; created lazily on the first GetChunk.
    private DbDataCommandDataFlowSource _hostedSource;

    /// <summary>
    /// Prepares this source to run <paramref name="request"/>: sorts the columns, builds the query if it is not already cached,
    /// records the release identifier column names and sets up validation / time coverage bucketting where the Catalogue supports it.
    /// Does nothing beyond storing the request when it is <see cref="ExtractDatasetCommand.EmptyCommand"/>.
    /// </summary>
    protected virtual void Initialize(ExtractDatasetCommand request)
    {
        Request = request;

        if (request == ExtractDatasetCommand.EmptyCommand)
            return;

        _timeSpentValidating = new Stopwatch();
        _timeSpentCalculatingDISTINCT = new Stopwatch();
        _timeSpentBuckettingDates = new Stopwatch();

        Request.ColumnsToExtract.Sort(); //ensure they are in the right order so we can record the release identifiers

        //if we have a cached builder already
        if (request.QueryBuilder == null)
            request.GenerateQueryBuilder();

        foreach (var substitution in Request.ReleaseIdentifierSubstitutions)
            _extractionIdentifiersidx.Add(substitution.GetRuntimeName());

        UniqueReleaseIdentifiersEncountered = new HashSet<object>();

        _catalogue = request.Catalogue;

        if (!string.IsNullOrWhiteSpace(_catalogue.ValidatorXML))
            ExtractionTimeValidator = new ExtractionTimeValidator(_catalogue, request.ColumnsToExtract);

        //if there is a time periodicity ExtractionInformation (AND! it is among the columns the user selected to be extracted)
        if (_catalogue.TimeCoverage_ExtractionInformation_ID != null && request.ColumnsToExtract
                .Cast<ExtractableColumn>().Any(c =>
                    c.CatalogueExtractionInformation_ID == _catalogue.TimeCoverage_ExtractionInformation_ID))
            ExtractionTimeTimeCoverageAggregator =
                new ExtractionTimeTimeCoverageAggregator(_catalogue, request.ExtractableCohort);
        else
            ExtractionTimeTimeCoverageAggregator = null;
    }

    /// <summary>
    /// Stores the globals request; no other preparation is needed for globals.
    /// </summary>
    private void Initialize(ExtractGlobalsCommand request)
    {
        GlobalsRequest = request;
    }

    /// <summary>
    /// True once a read was aborted because the underlying task was cancelled.
    /// </summary>
    public bool WasCancelled => _cancel;

    private Stopwatch _timeSpentValidating;
    private int _rowsValidated;

    private Stopwatch _timeSpentCalculatingDISTINCT;
    private Stopwatch _timeSpentBuckettingDates;
    private int _rowsBucketted;

    private bool firstChunk = true;
    private bool firstGlobalChunk = true;
    private int _rowsRead;

    // Holds rows read ahead of the current batch so that all rows sharing a release identifier end up in the same batch
    // (used by DistinctStrategy.OrderByAndDistinctInMemory).
    private RowPeeker _peeker = new();

    /// <summary>
    /// Returns the next batch of extracted records, or null once the data is exhausted.  For a globals request a single empty
    /// table is returned on the first call and null thereafter.
    /// </summary>
    /// <param name="listener">Receives progress and notification events.</param>
    /// <param name="cancellationToken">Checked after each read; if set an exception is thrown.</param>
    /// <returns>The next chunk (with release identifiers recorded and primary keys set) or null when there is no more data.</returns>
    /// <exception cref="Exception">When the component is uninitialized, cancelled, extraction is disabled for the dataset,
    /// the first read returns no data and <see cref="AllowEmptyExtractions"/> is false, or a lone release identifier is null.</exception>
    public virtual DataTable GetChunk(IDataLoadEventListener listener, GracefulCancellationToken cancellationToken)
    {
        // we are in the Global Commands case, let's return an empty DataTable (not null)
        // so we can trigger the destination to extract the globals docs and sql
        if (GlobalsRequest != null)
            return GetGlobalsChunk();

        if (Request == null)
            throw new Exception("Component has not been initialized before being asked to GetChunk(s)");

        Request.ElevateState(ExtractCommandState.WaitingForSQLServer);

        if (_cancel)
            throw new Exception("User cancelled data extraction");

        _hostedSource ??= CreateHostedSource(listener);

        DataTable chunk = null;

        try
        {
            chunk = _hostedSource.GetChunk(listener, cancellationToken);

            chunk = _peeker.AddPeekedRowsIfAny(chunk);

            if (Request.DatasetBundle.DataSet is not null && chunk is not null)
                chunk.TableName = $"{Request.DatasetBundle.DataSet}";

            //if we are trying to distinct the records in memory based on release id
            if (DistinctStrategy == DistinctStrategy.OrderByAndDistinctInMemory)
            {
                var releaseIdentifierColumn = Request.ReleaseIdentifierSubstitutions.First().GetRuntimeName();

                if (chunk is { Rows.Count: > 0 })
                {
                    //last release id in the current chunk
                    var lastReleaseId = chunk.Rows[^1][releaseIdentifierColumn];

                    // pull in any following rows with the same release id so the in-memory DISTINCT sees them all together
                    _peeker.AddWhile(_hostedSource, r => Equals(r[releaseIdentifierColumn], lastReleaseId), chunk);
                    chunk = MakeDistinct(chunk, listener, cancellationToken);
                }
            }
        }
        catch (AggregateException a)
        {
            if (a.GetExceptionIfExists<TaskCanceledException>() != null)
                _cancel = true;

            throw;
        }
        catch (Exception e)
        {
            // NOTE(review): if the listener does not throw on Error, execution continues with whatever chunk was read (possibly null)
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, "Read from source failed", e));
        }

        if (cancellationToken.IsCancellationRequested)
            throw new Exception("Data read cancelled because our cancellationToken was set, aborting data reading");

        //if the first chunk is null
        if (firstChunk && chunk == null && !AllowEmptyExtractions)
            throw new Exception(
                $"There is no data to load, query returned no rows, query was:{Environment.NewLine}{_hostedSource.Sql ?? Request.QueryBuilder.SQL}");

        //not the first chunk anymore
        firstChunk = false;

        //data exhausted
        if (chunk == null)
        {
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                $"Data exhausted after reading {_rowsRead} rows of data ({UniqueReleaseIdentifiersEncountered.Count} unique release identifiers seen)"));

            // a resumed batch has only seen part of the data so the distinct count would be misleading
            Request.CumulativeExtractionResults.DistinctReleaseIdentifiersEncountered =
                Request.IsBatchResume ? -1 : UniqueReleaseIdentifiersEncountered.Count;
            return null;
        }

        _rowsRead += chunk.Rows.Count;

        //chunk will have datatypes for all the things in the buffer so we can populate our dictionary of facts about what columns/catalogue items have spontaneously changed name/type etc
        if (ExtractTimeTransformationsObserved == null)
            GenerateExtractionTransformObservations(chunk);

        ValidateChunk(chunk, listener);
        BucketDates(chunk, listener);

        var pks = RecordReleaseIdentifiers(chunk, listener);
        AddExtractedPrimaryKeyColumns(chunk, pks);
        chunk.PrimaryKey = pks.ToArray();

        return chunk;
    }

    /// <summary>
    /// Globals branch of <see cref="GetChunk"/>: starts the globals audit and returns an empty table on the first call, null afterwards.
    /// </summary>
    private DataTable GetGlobalsChunk()
    {
        GlobalsRequest.ElevateState(ExtractCommandState.WaitingForSQLServer);

        if (!firstGlobalChunk)
            return null;

        //unless we are checking, start auditing
        StartAuditGlobals();

        firstGlobalChunk = false;
        return new DataTable(ExtractionDirectory.GLOBALS_DATA_NAME);
    }

    /// <summary>
    /// Starts the audit and builds the source that runs the extraction SQL against the live server.
    /// </summary>
    /// <exception cref="Exception">When the dataset has DisableExtraction set.</exception>
    private DbDataCommandDataFlowSource CreateHostedSource(IDataLoadEventListener listener)
    {
        StartAudit(Request.QueryBuilder.SQL);

        if (Request.DatasetBundle.DataSet.DisableExtraction)
            throw new Exception(
                $"Cannot extract {Request.DatasetBundle.DataSet} because DisableExtraction is set to true");

        return new DbDataCommandDataFlowSource(GetCommandSQL(listener),
            $"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}",
            Request.GetDistinctLiveDatabaseServer().Builder,
            ExecutionTimeout)
        {
            // If we are running in batches then always allow empty extractions
            AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume,
            BatchSize = BatchSize
        };
    }

    /// <summary>
    /// When a validator exists and the request asks for validation, adds a <see cref="ValidationColumnName"/> column to
    /// <paramref name="chunk"/> and fills it.  A failure is reported, stored in <see cref="ValidationFailureException"/> and
    /// switches validation off for subsequent chunks; it does not stop the extraction.
    /// </summary>
    private void ValidateChunk(DataTable chunk, IDataLoadEventListener listener)
    {
        _timeSpentValidating.Start();

        //build up the validation report (Missing/Wrong/Etc) - this has no mechanical effect on the extracted data just some metadata that goes into a flat file
        if (ExtractionTimeValidator != null && Request.IncludeValidation)
        {
            try
            {
                chunk.Columns.Add(ValidationColumnName);

                ExtractionTimeValidator.Validate(chunk, ValidationColumnName);

                _rowsValidated += chunk.Rows.Count;
                listener.OnProgress(this,
                    new ProgressEventArgs("Validation", new ProgressMeasurement(_rowsValidated, ProgressType.Records),
                        _timeSpentValidating.Elapsed));
            }
            catch (Exception ex)
            {
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Error, "Could not validate data chunk", ex));
                ValidationFailureException = ex;
                ExtractionTimeValidator = null;
            }
        }

        _timeSpentValidating.Stop();
    }

    /// <summary>
    /// Feeds every row of <paramref name="chunk"/> to <see cref="ExtractionTimeTimeCoverageAggregator"/> (if there is one) and reports progress.
    /// </summary>
    private void BucketDates(DataTable chunk, IDataLoadEventListener listener)
    {
        _timeSpentBuckettingDates.Start();

        if (ExtractionTimeTimeCoverageAggregator != null)
        {
            _rowsBucketted += chunk.Rows.Count;

            foreach (DataRow row in chunk.Rows)
                ExtractionTimeTimeCoverageAggregator.ProcessRow(row);

            // report the time spent bucketting dates (not the DISTINCT stopwatch)
            listener.OnProgress(this,
                new ProgressEventArgs("Bucketting Dates", new ProgressMeasurement(_rowsBucketted, ProgressType.Records),
                    _timeSpentBuckettingDates.Elapsed));
        }

        _timeSpentBuckettingDates.Stop();
    }

    /// <summary>
    /// Adds every non-null release identifier value in <paramref name="chunk"/> to <see cref="UniqueReleaseIdentifiersEncountered"/>.
    /// </summary>
    /// <returns>The release identifier columns whose substitution's first extraction information is flagged IsPrimaryKey.</returns>
    /// <exception cref="Exception">When a release identifier is null and it is the only release identifier column.</exception>
    private List<DataColumn> RecordReleaseIdentifiers(DataTable chunk, IDataLoadEventListener listener)
    {
        _timeSpentCalculatingDISTINCT.Start();
        var pks = new List<DataColumn>();

        // _extractionIdentifiersidx can contain repeats, so decide "single identifier" on the distinct names
        var identifierNames = _extractionIdentifiersidx.Distinct().ToList();

        //see if the SqlDataReader has a column with the same name as the ReleaseIdentifierSQL (if so then we can use it to count the number of distinct subjects written out to the csv)
        foreach (var idx in identifierNames)
        {
            var column = chunk.Columns[idx];
            var sub = Request.ReleaseIdentifierSubstitutions.FirstOrDefault(s => s.Alias == column.ColumnName);
            var firstExtractionInformation = sub?.ColumnInfo.ExtractionInformations.FirstOrDefault();

            if (firstExtractionInformation is { IsPrimaryKey: true })
                AddPrimaryKeyColumn(pks, column);

            foreach (DataRow r in chunk.Rows)
            {
                if (r[idx] == DBNull.Value)
                {
                    if (identifierNames.Count == 1)
                        throw new Exception(
                            $"Null release identifier found in extract of dataset {Request.DatasetBundle.DataSet}");

                    continue; //there are multiple extraction identifiers thats fine if one or two are null
                }

                UniqueReleaseIdentifiersEncountered.Add(r[idx]);
            }

            listener.OnProgress(this,
                new ProgressEventArgs("Calculating Distinct Release Identifiers",
                    new ProgressMeasurement(UniqueReleaseIdentifiersEncountered.Count, ProgressType.Records),
                    _timeSpentCalculatingDISTINCT.Elapsed));
        }

        _timeSpentCalculatingDISTINCT.Stop();
        return pks;
    }

    /// <summary>
    /// Adds to <paramref name="pks"/> the chunk columns of every extracted column whose catalogue extraction information is flagged
    /// IsPrimaryKey (matched by the extraction information's ToString()).  Columns whose extraction information has vanished are skipped.
    /// </summary>
    private void AddExtractedPrimaryKeyColumns(DataTable chunk, List<DataColumn> pks)
    {
        var primaryKeyNames = Request.ColumnsToExtract
            .Cast<ExtractableColumn>()
            .Select(c => c.CatalogueExtractionInformation)
            .Where(ei => ei is { IsPrimaryKey: true })
            .Select(ei => ei.ToString());

        foreach (var name in primaryKeyNames)
            AddPrimaryKeyColumn(pks, chunk.Columns[name]);
    }

    /// <summary>
    /// Adds <paramref name="column"/> to <paramref name="pks"/> unless it is null (name not found in the chunk) or already present;
    /// either would make the subsequent DataTable.PrimaryKey assignment fail.
    /// </summary>
    private static void AddPrimaryKeyColumn(List<DataColumn> pks, DataColumn column)
    {
        if (column != null && !pks.Contains(column))
            pks.Add(column);
    }

    /// <summary>
    /// Makes the current batch ONLY distinct.  This only works if you have a bounded batch (see OrderByAndDistinctInMemory)
    /// </summary>
    /// <param name="chunk">The batch to de-duplicate.</param>
    /// <param name="listener">Passed through to the <see cref="RemoveDuplicates"/> component.</param>
    /// <param name="cancellationToken">Passed through to the <see cref="RemoveDuplicates"/> component.</param>
    /// <returns>The result of running <see cref="RemoveDuplicates"/> (with logging off) over <paramref name="chunk"/>.</returns>
    private static DataTable MakeDistinct(DataTable chunk, IDataLoadEventListener listener,
        GracefulCancellationToken cancellationToken)
    {
        var removeDuplicates = new RemoveDuplicates { NoLogging = true };
        return removeDuplicates.ProcessPipelineData(chunk, listener, cancellationToken);
    }

    /// <summary>
    /// Populates <see cref="ExtractTimeTransformationsObserved"/> by comparing what the catalogue says about each extracted column with
    /// the columns actually present in <paramref name="chunk"/>.
    /// </summary>
    private void GenerateExtractionTransformObservations(DataTable chunk)
    {
        ExtractTimeTransformationsObserved = new Dictionary<ExtractableColumn, ExtractTimeTransformationObserved>();

        //create the Types dictionary
        foreach (ExtractableColumn column in Request.ColumnsToExtract)
        {
            var observation = new ExtractTimeTransformationObserved();
            ExtractTimeTransformationsObserved.Add(column, observation);

            // nothing to compare against when the original extraction information has vanished
            if (column.HasOriginalExtractionInformationVanished())
                continue;

            //record catalogue information about what it is supposed to be.
            var extractionInformation = column.CatalogueExtractionInformation;

            //what the catalogue says it is
            observation.DataTypeInCatalogue = extractionInformation.ColumnInfo.Data_type;
            observation.CatalogueItem = extractionInformation.CatalogueItem;

            //what it actually is
            var runtimeName = column.GetRuntimeName();
            var found = chunk.Columns.Contains(runtimeName);
            observation.FoundAtExtractTime = found;

            if (found)
                observation.DataTypeObservedInRuntimeBuffer = chunk.Columns[runtimeName].DataType;
        }
    }

    /// <summary>
    /// Produces the final extraction SQL: applies <see cref="DistinctStrategy"/> to the query builder, runs <see cref="HackExtractionSQL"/>
    /// and substitutes HASH JOINs when <see cref="ShouldUseHashedJoins"/> says so.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">When <see cref="DistinctStrategy"/> is not a recognised value.</exception>
    private string GetCommandSQL(IDataLoadEventListener listener)
    {
        //if the user wants some custom logic for removing identical duplicates
        switch (DistinctStrategy)
        {
            //user doesn't care about identical duplicates
            case DistinctStrategy.None:
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");
                break;

            //system default behaviour
            case DistinctStrategy.SqlDistinct:
                break;

            //user wants to run order by the release ID and resolve duplicates in batches as they are read
            case DistinctStrategy.OrderByAndDistinctInMemory:
            {
                //remove the DISTINCT keyword from the query
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");

                //find the release identifier substitution (e.g. chi for PROCHI)
                var substitution = Request.ReleaseIdentifierSubstitutions.First();

                //add a line at the end of the query to ORDER BY the ReleaseId column (e.g. PROCHI)
                var orderBySql = $"ORDER BY {substitution.SelectSQL}";

                // don't add the line if it is already there (e.g. because of Retry)
                if (!Request.QueryBuilder.CustomLines.Any(l => string.Equals(l.Text, orderBySql)))
                    Request.QueryBuilder.AddCustomLine(orderBySql, QueryComponent.Postfix);

                break;
            }
            default:
                throw new ArgumentOutOfRangeException(nameof(DistinctStrategy), DistinctStrategy,
                    $"Unsupported {nameof(DistinctStrategy)}");
        }

        var sql = Request.QueryBuilder.SQL;

        sql = HackExtractionSQL(sql, listener);

        if (ShouldUseHashedJoins())
        {
            //use hash joins!
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information, "Substituting JOIN for HASH JOIN"));
            sql = sql.Replace(" JOIN ", " HASH JOIN ");
        }

        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
            $"/*Decided on extraction SQL:*/{Environment.NewLine}{sql}"));

        return sql;
    }

    /// <summary>
    /// Decides whether JOINs should be rewritten as HASH JOINs: only on Microsoft SQL Server, never for Catalogues on the exclusion
    /// list, always when <see cref="UseHashJoins"/> is set, otherwise only for Catalogues in <see cref="UseHashJoinsForCatalogues"/>.
    /// </summary>
    private bool ShouldUseHashedJoins()
    {
        //must be sql server
        if (Request?.QueryBuilder?.QuerySyntaxHelper?.DatabaseType != DatabaseType.MicrosoftSQLServer)
            return false;

        // this Catalogue is explicitly marked as never hash join? i.e. its on the exclusion list
        if (DoNotUseHashJoinsForCatalogues?.Contains(Request.Catalogue) ?? false)
            return false;

        if (UseHashJoins)
            return true;

        //user doesn't want to use hash joins unless this Catalogue is on the opt-in list
        return UseHashJoinsForCatalogues?.Contains(Request.Catalogue) ?? false;
    }

    /// <summary>
    /// Extension point allowing subclasses to rewrite the extraction SQL; the default returns <paramref name="sql"/> unchanged.
    /// </summary>
    public virtual string HackExtractionSQL(string sql, IDataLoadEventListener listener) => sql;

    /// <summary>
    /// Sets <c>Request.CumulativeExtractionResults</c>.  For a batch resume the existing record for this dataset is reused (an exception is
    /// thrown if there is none); otherwise previous records are deleted and a new one is created recording <paramref name="sql"/> and the
    /// names of the filters used.
    /// </summary>
    private void StartAudit(string sql)
    {
        var dataExportRepo = Request.DataExportRepository;

        var previousAudit = dataExportRepo
            .GetAllCumulativeExtractionResultsFor(Request.Configuration, Request.DatasetBundle.DataSet).ToArray();

        if (Request.IsBatchResume)
        {
            var match =
                previousAudit.FirstOrDefault(a => a.ExtractableDataSet_ID == Request.DatasetBundle.DataSet.ID) ??
                throw new Exception(
                    $"Could not find previous CumulativeExtractionResults for dataset {Request.DatasetBundle.DataSet} despite the Request being marked as a batch resume");
            Request.CumulativeExtractionResults = match;
            return;
        }

        //delete old audit records
        foreach (var audit in previousAudit)
            audit.DeleteInDatabase();

        var extractionResults = new CumulativeExtractionResults(dataExportRepo, Request.Configuration,
            Request.DatasetBundle.DataSet, sql);

        var filterDescriptions =
            RecursivelyListAllFilterNames(
                Request.Configuration.GetFilterContainerFor(Request.DatasetBundle.DataSet));

        extractionResults.FiltersUsed = filterDescriptions.TrimEnd(',');
        extractionResults.SaveToDatabase();

        Request.CumulativeExtractionResults = extractionResults;
    }

    /// <summary>
    /// Deletes this configuration's previous SupplementalExtractionResults that are not tied to any CumulativeExtractionResults.
    /// </summary>
    private void StartAuditGlobals()
    {
        var repo = GlobalsRequest.RepositoryLocator.DataExportRepository;

        var previousAudit = repo
            .GetAllObjectsWhere<SupplementalExtractionResults>("ExtractionConfiguration_ID",
                GlobalsRequest.Configuration.ID)
            .Where(c => c.CumulativeExtractionResults_ID == null);

        //delete old audit records
        foreach (var audit in previousAudit)
            audit.DeleteInDatabase();
    }

    /// <summary>
    /// Returns the names of all filters in <paramref name="filterContainer"/> and its sub containers (sub containers first),
    /// each followed by a comma.  Returns "" for a null container.
    /// </summary>
    private string RecursivelyListAllFilterNames(IContainer filterContainer)
    {
        if (filterContainer == null)
            return "";

        var toReturn = "";

        // fetch each collection once rather than once for the null check and again for the loop
        var subContainers = filterContainer.GetSubContainers();
        if (subContainers != null)
            foreach (var subContainer in subContainers)
                toReturn += RecursivelyListAllFilterNames(subContainer);

        var filters = filterContainer.GetFilters();
        if (filters != null)
            foreach (var f in filters)
                toReturn += $"{f.Name},";

        return toReturn;
    }

    /// <summary>
    /// No-op: this source holds nothing that needs releasing here.
    /// </summary>
    public virtual void Dispose(IDataLoadEventListener job, Exception pipelineFailureExceptionIfAny)
    {
    }

    /// <summary>
    /// No-op.
    /// </summary>
    public void Abort(IDataLoadEventListener listener)
    {
    }

    /// <summary>
    /// Runs the request's query against the Catalogue's live server (DataExport context) and returns up to the first 1000 rows.
    /// Returns an empty table for <see cref="ExtractDatasetCommand.EmptyCommand"/>.
    /// </summary>
    public virtual DataTable TryGetPreview()
    {
        if (Request == ExtractDatasetCommand.EmptyCommand)
            return new DataTable();

        var toReturn = new DataTable();
        toReturn.BeginLoadData();
        var server = _catalogue.GetDistinctLiveDatabaseServer(DataAccessContext.DataExport, false);

        using var con = server.GetConnection();
        con.Open();

        // the data adapter is disposable as well, release it together with the connection
        using var da = server.GetDataAdapter(Request.QueryBuilder.SQL, con);

        //get up to 1000 records
        da.Fill(0, 1000, toReturn);
        toReturn.EndLoadData();

        return toReturn;
    }

    /// <summary>
    /// Receives the command from the pipeline and initializes for either a dataset or a globals extraction.
    /// </summary>
    public void PreInitialize(IExtractCommand value, IDataLoadEventListener listener)
    {
        if (value is ExtractDatasetCommand datasetCommand)
            Initialize(datasetCommand);
        if (value is ExtractGlobalsCommand command)
            Initialize(command);
    }

    /// <summary>
    /// Reports a warning for the empty command, success for a globals request and a failure when no request has been set.
    /// </summary>
    public virtual void Check(ICheckNotifier notifier)
    {
        if (Request == ExtractDatasetCommand.EmptyCommand)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Request is ExtractDatasetCommand.EmptyCommand, checking will not be carried out",
                CheckResult.Warning));
            return;
        }

        if (GlobalsRequest != null)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Request is for Globals, checking will not be carried out at source", CheckResult.Success));
            return;
        }

        if (Request == null)
            notifier.OnCheckPerformed(new CheckEventArgs("ExtractionRequest has not been set", CheckResult.Fail));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc