• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 9859858140

09 Jul 2024 03:24PM UTC coverage: 56.679% (-0.2%) from 56.916%
9859858140

push

github

JFriel
update

10912 of 20750 branches covered (52.59%)

Branch coverage included in aggregate %.

30965 of 53135 relevant lines covered (58.28%)

7908.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.58
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Sources/ExecuteDatasetExtractionSource.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data;
10
using System.Diagnostics;
11
using System.Linq;
12
using System.Threading.Tasks;
13
using FAnsi;
14
using FAnsi.Discovery.QuerySyntax;
15
using Rdmp.Core.Curation.Data;
16
using Rdmp.Core.DataExport.Data;
17
using Rdmp.Core.DataExport.DataExtraction.Commands;
18
using Rdmp.Core.DataFlowPipeline;
19
using Rdmp.Core.DataFlowPipeline.Requirements;
20
using Rdmp.Core.DataLoad.Engine.Pipeline.Components;
21
using Rdmp.Core.DataLoad.Engine.Pipeline.Sources;
22
using Rdmp.Core.QueryBuilding;
23
using Rdmp.Core.ReusableLibraryCode;
24
using Rdmp.Core.ReusableLibraryCode.Checks;
25
using Rdmp.Core.ReusableLibraryCode.DataAccess;
26
using Rdmp.Core.ReusableLibraryCode.Progress;
27
using IContainer = Rdmp.Core.Curation.Data.IContainer;
28

29
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Sources;
30

31
/// <summary>
32
/// Executes a single Dataset extraction by linking a cohort with a dataset (either core or custom data - See IExtractCommand).  Also calculates the number
33
/// of unique identifiers seen, records row validation failures etc.
34
/// </summary>
35
public class ExecuteDatasetExtractionSource : IPluginDataFlowSource<DataTable>, IPipelineRequirement<IExtractCommand>
36
{
37
    //Request is either for one of these
38
    public ExtractDatasetCommand Request { get; protected set; }
3,176✔
39
    public ExtractGlobalsCommand GlobalsRequest { get; protected set; }
312✔
40

41
    public const string AuditTaskName = "DataExtraction";
42

43
    private readonly List<string> _extractionIdentifiersidx = new();
106✔
44

45
    private bool _cancel;
46

47
    private ICatalogue _catalogue;
48

49
    protected const string ValidationColumnName = "RowValidationResult";
50

51
    public ExtractionTimeValidator ExtractionTimeValidator { get; protected set; }
120✔
52
    public Exception ValidationFailureException { get; protected set; }
×
53

54
    public HashSet<object> UniqueReleaseIdentifiersEncountered { get; set; }
3,636✔
55

56
    public ExtractionTimeTimeCoverageAggregator ExtractionTimeTimeCoverageAggregator { get; set; }
272✔
57

58
    [DemandsInitialization(
59
        "Determines the systems behaviour when an extraction query returns 0 rows.  Default (false) is that an error is reported.  If set to true (ticked) then instead a DataTable with 0 rows but all the correct headers will be generated usually resulting in a headers only 0 line/empty extract file")]
60
    public bool AllowEmptyExtractions { get; set; }
176✔
61

62
    [DemandsInitialization(
63
        "Batch size, number of records to read from source before releasing it into the extraction pipeline",
64
        DefaultValue = 10000, Mandatory = true)]
65
    public int BatchSize { get; set; }
170✔
66

67
    [DemandsInitialization(
68
        "In seconds. Overrides the global timeout for SQL query execution. Use 0 for infinite timeout.",
69
        DefaultValue = 50000, Mandatory = true)]
70
    public int ExecutionTimeout { get; set; }
170✔
71

72
    [DemandsInitialization(@"Determines how the system achieves DISTINCT on extraction.  These include:
73
None - Do not DISTINCT the records, can result in duplication in your extract (not recommended)
74
SqlDistinct - Adds the DISTINCT keyword to the SELECT sql sent to the server
75
OrderByAndDistinctInMemory - Adds an ORDER BY statement to the query and applies the DISTINCT in memory as records are read from the server (this can help when extracting very large data sets where DISTINCT keyword blocks record streaming until all records are ready to go)"
76
        , DefaultValue = DistinctStrategy.SqlDistinct)]
77
    public DistinctStrategy DistinctStrategy { get; set; }
294✔
78

79

80
    [DemandsInitialization("When DBMS is SqlServer then HASH JOIN should be used instead of regular JOINs")]
81
    public bool UseHashJoins { get; set; }
170✔
82

83
    [DemandsInitialization(
84
        "When DBMS is SqlServer and the extraction is for any of these datasets then HASH JOIN should be used instead of regular JOINs")]
85
    public Catalogue[] UseHashJoinsForCatalogues { get; set; }
168✔
86

87
    [DemandsInitialization(
88
        "Exclusion list.  A collection of Catalogues which will never be considered for HASH JOIN even when UseHashJoins is enabled.  Being on this list takes precedence for a Catalogue even if it is on UseHashJoinsForCatalogues.")]
89
    public Catalogue[] DoNotUseHashJoinsForCatalogues { get; set; }
170✔
90

91

92
    /// <summary>
93
    /// This is a dictionary containing all the CatalogueItems used in the query, the underlying datatype in the origin database and the
94
    /// actual datatype that was output after the transform operation e.g. a varchar(10) could be converted into a bona fide DateTime which
95
    /// would be an sql Date.  Finally
96
    /// a recommended SqlDbType is passed back.
97
    /// </summary>
98
    public Dictionary<ExtractableColumn, ExtractTimeTransformationObserved> ExtractTimeTransformationsObserved;
99

100
    private DbDataCommandDataFlowSource _hostedSource;
101

102
    protected virtual void Initialize(ExtractDatasetCommand request)
103
    {
104
        Request = request;
152✔
105

106
        if (request == ExtractDatasetCommand.EmptyCommand)
152!
107
            return;
×
108

109
        _timeSpentValidating = new Stopwatch();
152✔
110
        _timeSpentCalculatingDISTINCT = new Stopwatch();
152✔
111
        _timeSpentBuckettingDates = new Stopwatch();
152✔
112

113
        Request.ColumnsToExtract.Sort(); //ensure they are in the right order so we can record the release identifiers
152✔
114

115
        //if we have a cached builder already
116
        if (request.QueryBuilder == null)
152✔
117
            request.GenerateQueryBuilder();
80✔
118

119
        foreach (var substitution in Request.ReleaseIdentifierSubstitutions)
608✔
120
            _extractionIdentifiersidx.Add(substitution.GetRuntimeName());
152✔
121

122
        UniqueReleaseIdentifiersEncountered = new HashSet<object>();
152✔
123

124
        _catalogue = request.Catalogue;
152✔
125

126
        if (!string.IsNullOrWhiteSpace(_catalogue.ValidatorXML))
152!
127
            ExtractionTimeValidator = new ExtractionTimeValidator(_catalogue, request.ColumnsToExtract);
×
128

129
        //if there is a time periodicity ExtractionInformation (AND! it is among the columns the user selected to be extracted)
130
        if (_catalogue.TimeCoverage_ExtractionInformation_ID != null && request.ColumnsToExtract
152!
131
                .Cast<ExtractableColumn>().Any(c =>
152✔
132
                    c.CatalogueExtractionInformation_ID == _catalogue.TimeCoverage_ExtractionInformation_ID))
152✔
133
            ExtractionTimeTimeCoverageAggregator =
×
134
                new ExtractionTimeTimeCoverageAggregator(_catalogue, request.ExtractableCohort);
×
135
        else
136
            ExtractionTimeTimeCoverageAggregator = null;
152✔
137
    }
152✔
138

139
    private void Initialize(ExtractGlobalsCommand request)
140
    {
141
        GlobalsRequest = request;
44✔
142
    }
44✔
143

144
    public bool WasCancelled => _cancel;
106✔
145

146
    private Stopwatch _timeSpentValidating;
147
    private int _rowsValidated;
148

149
    private Stopwatch _timeSpentCalculatingDISTINCT;
150
    private Stopwatch _timeSpentBuckettingDates;
151
    private int _rowsBucketted;
152

153
    private bool firstChunk = true;
106✔
154
    private bool firstGlobalChunk = true;
106✔
155
    private int _rowsRead;
156

157
    private RowPeeker _peeker = new();
106✔
158

159
    public virtual DataTable GetChunk(IDataLoadEventListener listener, GracefulCancellationToken cancellationToken)
160
    {
161
        // we are in the Global Commands case, let's return an empty DataTable (not null)
162
        // so we can trigger the destination to extract the globals docs and sql
163
        if (GlobalsRequest != null)
170✔
164
        {
165
            GlobalsRequest.ElevateState(ExtractCommandState.WaitingForSQLServer);
44✔
166
            if (firstGlobalChunk)
44✔
167
            {
168
                //unless we are checking, start auditing
169
                StartAuditGlobals();
22✔
170

171
                firstGlobalChunk = false;
22✔
172
                return new DataTable(ExtractionDirectory.GLOBALS_DATA_NAME);
22✔
173
            }
174

175
            return null;
22✔
176
        }
177

178
        if (Request == null)
126!
179
            throw new Exception("Component has not been initialized before being asked to GetChunk(s)");
×
180

181
        Request.ElevateState(ExtractCommandState.WaitingForSQLServer);
126✔
182

183
        if (_cancel)
126!
184
            throw new Exception("User cancelled data extraction");
×
185

186
        if (_hostedSource == null)
126✔
187
        {
188
            StartAudit(Request.QueryBuilder.SQL);
82✔
189

190
            if (Request.DatasetBundle.DataSet.DisableExtraction)
82✔
191
                throw new Exception(
2✔
192
                    $"Cannot extract {Request.DatasetBundle.DataSet} because DisableExtraction is set to true");
2✔
193

194
            _hostedSource = new DbDataCommandDataFlowSource(GetCommandSQL(listener),
80✔
195
                $"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}",
80✔
196
                Request.GetDistinctLiveDatabaseServer().Builder,
80✔
197
                ExecutionTimeout)
80✔
198
            {
80✔
199
                // If we are running in batches then always allow empty extractions
80✔
200
                AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume,
80✔
201
                BatchSize = BatchSize
80✔
202
            };
80✔
203
        }
204

205
        DataTable chunk = null;
124✔
206

207
        try
208
        {
209
            chunk = _hostedSource.GetChunk(listener, cancellationToken);
124✔
210

211

212
            chunk = _peeker.AddPeekedRowsIfAny(chunk);
124✔
213

214
            if (Request != null && Request.DatasetBundle.DataSet is not null && chunk is not null)
124✔
215
                chunk.TableName = $"{Request.DatasetBundle.DataSet}";
78✔
216

217
            //if we are trying to distinct the records in memory based on release id
218
            if (DistinctStrategy == DistinctStrategy.OrderByAndDistinctInMemory)
124!
219
            {
220
                var releaseIdentifierColumn = Request.ReleaseIdentifierSubstitutions.First().GetRuntimeName();
×
221

222
                if (chunk is { Rows.Count: > 0 })
×
223
                {
224
                    //last release id in the current chunk
225
                    var lastReleaseId = chunk.Rows[^1][releaseIdentifierColumn];
×
226

227
                    _peeker.AddWhile(_hostedSource, r => Equals(r[releaseIdentifierColumn], lastReleaseId), chunk);
×
228
                    chunk = MakeDistinct(chunk, listener, cancellationToken);
×
229
                }
230
            }
231
        }
124✔
232
        catch (AggregateException a)
×
233
        {
234
            if (a.GetExceptionIfExists<TaskCanceledException>() != null)
×
235
                _cancel = true;
×
236

237
            throw;
×
238
        }
239
        catch (Exception e)
×
240
        {
241
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, "Read from source failed", e));
×
242
        }
×
243

244
        if (cancellationToken.IsCancellationRequested)
124!
245
            throw new Exception("Data read cancelled because our cancellationToken was set, aborting data reading");
×
246

247
        //if the first chunk is null
248
        if (firstChunk && chunk == null && !AllowEmptyExtractions)
124✔
249
            throw new Exception(
2!
250
                $"There is no data to load, query returned no rows, query was:{Environment.NewLine}{_hostedSource.Sql ?? Request.QueryBuilder.SQL}");
2✔
251

252
        //not the first chunk anymore
253
        firstChunk = false;
122✔
254

255
        //data exhausted
256
        if (chunk == null)
122✔
257
        {
258
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
44✔
259
                $"Data exhausted after reading {_rowsRead} rows of data ({UniqueReleaseIdentifiersEncountered.Count} unique release identifiers seen)"));
44✔
260
            if (Request != null)
44✔
261
                Request.CumulativeExtractionResults.DistinctReleaseIdentifiersEncountered =
44!
262
                    Request.IsBatchResume ? -1 : UniqueReleaseIdentifiersEncountered.Count;
44✔
263
            return null;
44✔
264
        }
265

266
        _rowsRead += chunk.Rows.Count;
78✔
267
        //chunk will have datatypes for all the things in the buffer so we can populate our dictionary of facts about what columns/catalogue items have spontaneously changed name/type etc
268
        if (ExtractTimeTransformationsObserved == null)
78✔
269
            GenerateExtractionTransformObservations(chunk);
78✔
270

271

272
        //see if the SqlDataReader has a column with the same name as the ReleaseIdentifierSQL (if so then we can use it to count the number of distinct subjects written out to the csv)
273
        var includesReleaseIdentifier = _extractionIdentifiersidx.Count > 0;
78✔
274

275

276
        //first line - let's see what columns we wrote out
277
        //looks at the buffer and computes any transforms performed on the column
278

279

280
        _timeSpentValidating.Start();
78✔
281
        //build up the validation report (Missing/Wrong/Etc) - this has no mechanical effect on the extracted data just some metadata that goes into a flat file
282
        if (ExtractionTimeValidator != null && Request.IncludeValidation)
78!
283
            try
284
            {
285
                chunk.Columns.Add(ValidationColumnName);
×
286

287
                ExtractionTimeValidator.Validate(chunk, ValidationColumnName);
×
288

289
                _rowsValidated += chunk.Rows.Count;
×
290
                listener.OnProgress(this,
×
291
                    new ProgressEventArgs("Validation", new ProgressMeasurement(_rowsValidated, ProgressType.Records),
×
292
                        _timeSpentValidating.Elapsed));
×
293
            }
×
294
            catch (Exception ex)
×
295
            {
296
                listener.OnNotify(this,
×
297
                    new NotifyEventArgs(ProgressEventType.Error, "Could not validate data chunk", ex));
×
298
                ValidationFailureException = ex;
×
299
                ExtractionTimeValidator = null;
×
300
            }
×
301

302
        _timeSpentValidating.Stop();
78✔
303

304
        _timeSpentBuckettingDates.Start();
78✔
305
        if (ExtractionTimeTimeCoverageAggregator != null)
78!
306
        {
307
            _rowsBucketted += chunk.Rows.Count;
×
308

309
            foreach (DataRow row in chunk.Rows)
×
310
                ExtractionTimeTimeCoverageAggregator.ProcessRow(row);
×
311

312
            listener.OnProgress(this,
×
313
                new ProgressEventArgs("Bucketting Dates", new ProgressMeasurement(_rowsBucketted, ProgressType.Records),
×
314
                    _timeSpentCalculatingDISTINCT.Elapsed));
×
315
        }
316

317
        _timeSpentBuckettingDates.Stop();
78✔
318

319
        _timeSpentCalculatingDISTINCT.Start();
78✔
320
        var pks = new List<DataColumn>();
78✔
321

322
        //record unique release identifiers found
323
        if (includesReleaseIdentifier)
78✔
324
            foreach (var idx in _extractionIdentifiersidx.Distinct().ToList())
312✔
325
            {
326
                pks.Add(chunk.Columns[idx]);
78✔
327
                foreach (DataRow r in chunk.Rows)
6,708✔
328
                {
329
                    if (r[idx] == DBNull.Value)
3,276!
330
                        if (_extractionIdentifiersidx.Count == 1)
×
331
                            throw new Exception(
×
332
                                $"Null release identifier found in extract of dataset {Request.DatasetBundle.DataSet}");
×
333
                        else
334
                            continue; //there are multiple extraction identifiers thats fine if one or two are null
335

336
                    UniqueReleaseIdentifiersEncountered.Add(r[idx]);
3,276✔
337
                }
338

339
                listener.OnProgress(this,
78✔
340
                    new ProgressEventArgs("Calculating Distinct Release Identifiers",
78✔
341
                        new ProgressMeasurement(UniqueReleaseIdentifiersEncountered.Count, ProgressType.Records),
78✔
342
                        _timeSpentCalculatingDISTINCT.Elapsed));
78✔
343
            }
344

345
        _timeSpentCalculatingDISTINCT.Stop();
78✔
346
        foreach (string name in Request.ColumnsToExtract.Where(c => ((ExtractableColumn)(c)).CatalogueExtractionInformation.IsPrimaryKey).Select(column => ((ExtractableColumn)column).CatalogueExtractionInformation.ToString()))
1,304✔
347
        {
348
            pks.Add(chunk.Columns[name]);
8✔
349
        }
350
        chunk.PrimaryKey = pks.ToArray();
78✔
351

352
        return chunk;
56✔
353
    }
354

355
    /// <summary>
356
    /// Makes the current batch ONLY distinct.  This only works if you have a bounded batch (see OrderByAndDistinctInMemory)
357
    /// </summary>
358
    /// <param name="chunk"></param>
359
    /// <param name="listener"></param>
360
    /// <param name="cancellationToken"></param>
361
    /// <returns></returns>
362
    private static DataTable MakeDistinct(DataTable chunk, IDataLoadEventListener listener,
363
        GracefulCancellationToken cancellationToken)
364
    {
365
        var removeDuplicates = new RemoveDuplicates { NoLogging = true };
×
366
        return removeDuplicates.ProcessPipelineData(chunk, listener, cancellationToken);
×
367
    }
368

369
    private void GenerateExtractionTransformObservations(DataTable chunk)
370
    {
371
        ExtractTimeTransformationsObserved = new Dictionary<ExtractableColumn, ExtractTimeTransformationObserved>();
78✔
372

373
        //create the Types dictionary
374
        foreach (ExtractableColumn column in Request.ColumnsToExtract)
2,404✔
375
        {
376
            ExtractTimeTransformationsObserved.Add(column, new ExtractTimeTransformationObserved());
1,124✔
377

378
            //record catalogue information about what it is supposed to be.
379
            if (!column.HasOriginalExtractionInformationVanished())
1,124✔
380
            {
381
                var extractionInformation = column.CatalogueExtractionInformation;
1,124✔
382

383
                //what the catalogue says it is
384
                ExtractTimeTransformationsObserved[column].DataTypeInCatalogue =
1,124✔
385
                    extractionInformation.ColumnInfo.Data_type;
1,124✔
386
                ExtractTimeTransformationsObserved[column].CatalogueItem = extractionInformation.CatalogueItem;
1,124✔
387

388
                //what it actually is
389
                if (chunk.Columns.Contains(column.GetRuntimeName()))
1,124!
390
                {
391
                    ExtractTimeTransformationsObserved[column].FoundAtExtractTime = true;
1,124✔
392
                    ExtractTimeTransformationsObserved[column].DataTypeObservedInRuntimeBuffer =
1,124✔
393
                        chunk.Columns[column.GetRuntimeName()].DataType;
1,124✔
394
                }
395
                else
396
                {
397
                    ExtractTimeTransformationsObserved[column].FoundAtExtractTime = false;
×
398
                }
399
            }
400
        }
401
    }
78✔
402

403
    private string GetCommandSQL(IDataLoadEventListener listener)
404
    {
405
        //if the user wants some custom logic for removing identical duplicates
406
        switch (DistinctStrategy)
407
        {
408
            //user doesn't care about identical duplicates
409
            case DistinctStrategy.None:
410
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");
12!
411
                break;
12✔
412

413
            //system default behaviour
414
            case DistinctStrategy.SqlDistinct:
415
                break;
416

417
            //user wants to run order by the release ID and resolve duplicates in batches as they are read
418
            case DistinctStrategy.OrderByAndDistinctInMemory:
419

420
                //remove the DISTINCT keyword from the query
421
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");
×
422

423
                //find the release identifier substitution (e.g. chi for PROCHI)
424
                var substitution = Request.ReleaseIdentifierSubstitutions.First();
×
425

426
                //add a line at the end of the query to ORDER BY the ReleaseId column (e.g. PROCHI)
427
                var orderBySql = $"ORDER BY {substitution.SelectSQL}";
×
428

429
                // don't add the line if it is already there (e.g. because of Retry)
430
                if (!Request.QueryBuilder.CustomLines.Any(l => string.Equals(l.Text, orderBySql)))
×
431
                    Request.QueryBuilder.AddCustomLine(orderBySql, QueryComponent.Postfix);
×
432

433
                break;
×
434
            default:
435
                throw new ArgumentOutOfRangeException();
×
436
        }
437

438
        var sql = Request.QueryBuilder.SQL;
80✔
439

440
        sql = HackExtractionSQL(sql, listener);
80✔
441

442
        if (ShouldUseHashedJoins())
80✔
443
        {
444
            //use hash joins!
445
            listener.OnNotify(this,
2✔
446
                new NotifyEventArgs(ProgressEventType.Information, "Substituting JOIN for HASH JOIN"));
2✔
447
            sql = sql.Replace(" JOIN ", " HASH JOIN ");
2✔
448
        }
449

450
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
80✔
451
            $"/*Decided on extraction SQL:*/{Environment.NewLine}{sql}"));
80✔
452

453
        return sql;
80✔
454
    }
455

456
    private bool ShouldUseHashedJoins()
457
    {
458
        var dbms = Request?.QueryBuilder?.QuerySyntaxHelper?.DatabaseType;
80!
459

460
        //must be sql server
461
        if (dbms == null || dbms.Value != DatabaseType.MicrosoftSQLServer)
80!
462
            return false;
×
463

464
        // this Catalogue is explicilty marked as never hash join? i.e. its on the exclusion list
465
        if (DoNotUseHashJoinsForCatalogues?.Contains(Request.Catalogue) ?? false)
80!
466
            return false;
×
467

468
        if (UseHashJoins)
80✔
469
            return true;
2✔
470

471
        if (UseHashJoinsForCatalogues != null)
78!
472
            return UseHashJoinsForCatalogues.Contains(Request.Catalogue);
×
473

474
        //user doesn't want to use hash joins
475
        return false;
78✔
476
    }
477

478
    public virtual string HackExtractionSQL(string sql, IDataLoadEventListener listener) => sql;
72✔
479

480
    private void StartAudit(string sql)
481
    {
482
        var dataExportRepo = Request.DataExportRepository;
82✔
483

484
        var previousAudit = dataExportRepo
82✔
485
            .GetAllCumulativeExtractionResultsFor(Request.Configuration, Request.DatasetBundle.DataSet).ToArray();
82✔
486

487
        if (Request.IsBatchResume)
82!
488
        {
489
            var match =
×
490
                previousAudit.FirstOrDefault(a => a.ExtractableDataSet_ID == Request.DatasetBundle.DataSet.ID) ??
×
491
                throw new Exception(
×
492
                    $"Could not find previous CumulativeExtractionResults for dataset {Request.DatasetBundle.DataSet} despite the Request being marked as a batch resume");
×
493
            Request.CumulativeExtractionResults = match;
×
494
        }
495
        else
496
        {
497
            //delete old audit records
498
            foreach (var audit in previousAudit)
184✔
499
                audit.DeleteInDatabase();
10✔
500

501
            var extractionResults = new CumulativeExtractionResults(dataExportRepo, Request.Configuration,
82✔
502
                Request.DatasetBundle.DataSet, sql);
82✔
503

504
            var filterDescriptions =
82✔
505
                RecursivelyListAllFilterNames(
82✔
506
                    Request.Configuration.GetFilterContainerFor(Request.DatasetBundle.DataSet));
82✔
507

508
            extractionResults.FiltersUsed = filterDescriptions.TrimEnd(',');
82✔
509
            extractionResults.SaveToDatabase();
82✔
510

511
            Request.CumulativeExtractionResults = extractionResults;
82✔
512
        }
513
    }
82✔
514

515
    private void StartAuditGlobals()
516
    {
517
        var repo = GlobalsRequest.RepositoryLocator.DataExportRepository;
22✔
518

519
        var previousAudit = repo
22✔
520
            .GetAllObjectsWhere<SupplementalExtractionResults>("ExtractionConfiguration_ID",
22✔
521
                GlobalsRequest.Configuration.ID)
22✔
522
            .Where(c => c.CumulativeExtractionResults_ID == null);
22✔
523

524
        //delete old audit records
525
        foreach (var audit in previousAudit)
44!
526
            audit.DeleteInDatabase();
×
527
    }
22✔
528

529
    private string RecursivelyListAllFilterNames(IContainer filterContainer)
530
    {
531
        if (filterContainer == null)
82✔
532
            return "";
80✔
533

534
        var toReturn = "";
2✔
535

536
        if (filterContainer.GetSubContainers() != null)
2✔
537
            foreach (var subContainer in filterContainer.GetSubContainers())
4!
538
                toReturn += RecursivelyListAllFilterNames(subContainer);
×
539

540
        if (filterContainer.GetFilters() != null)
2✔
541
            foreach (var f in filterContainer.GetFilters())
8✔
542
                toReturn += $"{f.Name},";
2✔
543

544
        return toReturn;
2✔
545
    }
546

547
    public virtual void Dispose(IDataLoadEventListener job, Exception pipelineFailureExceptionIfAny)
548
    {
549
    }
86✔
550

551
    public void Abort(IDataLoadEventListener listener)
552
    {
553
    }
×
554

555
    public virtual DataTable TryGetPreview()
556
    {
557
        if (Request == ExtractDatasetCommand.EmptyCommand)
×
558
            return new DataTable();
×
559

560
        var toReturn = new DataTable();
×
561
        toReturn.BeginLoadData();
×
562
        var server = _catalogue.GetDistinctLiveDatabaseServer(DataAccessContext.DataExport, false);
×
563

564
        using var con = server.GetConnection();
×
565
        con.Open();
×
566

567
        var da = server.GetDataAdapter(Request.QueryBuilder.SQL, con);
×
568

569
        //get up to 1000 records
570
        da.Fill(0, 1000, toReturn);
×
571
        toReturn.EndLoadData();
×
572

573
        con.Close();
×
574

575
        return toReturn;
×
576
    }
×
577

578
    public void PreInitialize(IExtractCommand value, IDataLoadEventListener listener)
579
    {
580
        if (value is ExtractDatasetCommand datasetCommand)
196✔
581
            Initialize(datasetCommand);
152✔
582
        if (value is ExtractGlobalsCommand command)
196✔
583
            Initialize(command);
44✔
584
    }
196✔
585

586
    public virtual void Check(ICheckNotifier notifier)
587
    {
588
        if (Request == ExtractDatasetCommand.EmptyCommand)
×
589
        {
590
            notifier.OnCheckPerformed(new CheckEventArgs(
×
591
                "Request is ExtractDatasetCommand.EmptyCommand, checking will not be carried out",
×
592
                CheckResult.Warning));
×
593
            return;
×
594
        }
595

596
        if (GlobalsRequest != null)
×
597
        {
598
            notifier.OnCheckPerformed(new CheckEventArgs(
×
599
                "Request is for Globals, checking will not be carried out at source", CheckResult.Success));
×
600
            return;
×
601
        }
602

603
        if (Request == null)
×
604
        {
605
            notifier.OnCheckPerformed(new CheckEventArgs("ExtractionRequest has not been set", CheckResult.Fail));
×
606
            return;
×
607
        }
608
    }
×
609
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc