
HicServices / RDMP / build 6003738864 (push, github, web-flow)

28 Aug 2023 06:50PM UTC coverage: 57.442% (+0.01%) from 57.432%
Feature/RDMP-28 Add BeginLoadData & EndLoadData to Datatables (#1598)

* partial fix

* add row peaker update

* fix up whitespace

* add a lot of data begin loads

* more data load

* fix typo
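
The pattern these commits introduce is the standard ADO.NET bulk-load idiom: DataTable.BeginLoadData() suspends change notifications, index maintenance and constraint checking while rows are appended, and EndLoadData() re-enables them and validates once at the end, which is considerably cheaper for large batches. A minimal standalone sketch of the idiom (illustrative only, not RDMP code):

using System;
using System.Data;

internal static class BeginLoadDataDemo
{
    private static void Main()
    {
        var dt = new DataTable("Demo");
        dt.Columns.Add("Id", typeof(int));
        dt.Columns.Add("Name", typeof(string));

        // Suspend change notifications, index maintenance and constraint
        // checking while rows are bulk-appended...
        dt.BeginLoadData();

        for (var i = 0; i < 100_000; i++)
            dt.LoadDataRow(new object[] { i, $"row{i}" }, fAcceptChanges: true);

        // ...then switch them back on; constraints are validated once, here.
        dt.EndLoadData();

        Console.WriteLine($"{dt.Rows.Count} rows loaded");
    }
}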

10908 of 20572 branches covered (53.02%)

Branch coverage included in aggregate %.

65 of 65 new or added lines in 20 files covered. (100.0%)

31683 of 53574 relevant lines covered (59.14%)

8443.95 hits per line

Source File
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Sources/ExecuteDatasetExtractionSource.cs (65.14% covered)
// Copyright (c) The University of Dundee 2018-2019
// This file is part of the Research Data Management Platform (RDMP).
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.

using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using FAnsi;
using FAnsi.Discovery.QuerySyntax;
using Rdmp.Core.Curation.Data;
using Rdmp.Core.DataExport.Data;
using Rdmp.Core.DataExport.DataExtraction.Commands;
using Rdmp.Core.DataFlowPipeline;
using Rdmp.Core.DataFlowPipeline.Requirements;
using Rdmp.Core.DataLoad.Engine.Pipeline.Components;
using Rdmp.Core.DataLoad.Engine.Pipeline.Sources;
using Rdmp.Core.QueryBuilding;
using Rdmp.Core.ReusableLibraryCode;
using Rdmp.Core.ReusableLibraryCode.Checks;
using Rdmp.Core.ReusableLibraryCode.DataAccess;
using Rdmp.Core.ReusableLibraryCode.Progress;
using IContainer = Rdmp.Core.Curation.Data.IContainer;

namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Sources;

/// <summary>
/// Executes a single Dataset extraction by linking a cohort with a dataset (either core or custom data - See IExtractCommand).  Also calculates the number
/// of unique identifiers seen, records row validation failures etc.
/// </summary>
public class ExecuteDatasetExtractionSource : IPluginDataFlowSource<DataTable>, IPipelineRequirement<IExtractCommand>
{
    //Request is exactly one of these two (dataset or globals extraction)
    public ExtractDatasetCommand Request { get; protected set; }
    public ExtractGlobalsCommand GlobalsRequest { get; protected set; }

    public const string AuditTaskName = "DataExtraction";

    private readonly List<string> _extractionIdentifiersidx = new();

    private bool _cancel = false;
    private ICatalogue _catalogue;

    protected const string ValidationColumnName = "RowValidationResult";

    public ExtractionTimeValidator ExtractionTimeValidator { get; protected set; }
    public Exception ValidationFailureException { get; protected set; }

    public HashSet<object> UniqueReleaseIdentifiersEncountered { get; set; }

    public ExtractionTimeTimeCoverageAggregator ExtractionTimeTimeCoverageAggregator { get; set; }

    [DemandsInitialization(
        "Determines the system's behaviour when an extraction query returns 0 rows.  Default (false) is that an error is reported.  If set to true (ticked) then instead a DataTable with 0 rows but all the correct headers will be generated, usually resulting in a headers-only, zero-line/empty extract file")]
    public bool AllowEmptyExtractions { get; set; }

    [DemandsInitialization(
        "Batch size, the number of records to read from the source before releasing them into the extraction pipeline",
        DefaultValue = 10000, Mandatory = true)]
    public int BatchSize { get; set; }

    [DemandsInitialization(
        "In seconds. Overrides the global timeout for SQL query execution. Use 0 for infinite timeout.",
        DefaultValue = 50000, Mandatory = true)]
    public int ExecutionTimeout { get; set; }

    [DemandsInitialization(@"Determines how the system achieves DISTINCT on extraction.  These include:
None - Do not DISTINCT the records, can result in duplication in your extract (not recommended)
SqlDistinct - Adds the DISTINCT keyword to the SELECT sql sent to the server
OrderByAndDistinctInMemory - Adds an ORDER BY statement to the query and applies the DISTINCT in memory as records are read from the server (this can help when extracting very large data sets where the DISTINCT keyword blocks record streaming until all records are ready to go)"
        , DefaultValue = DistinctStrategy.SqlDistinct)]
    public DistinctStrategy DistinctStrategy { get; set; }


    [DemandsInitialization("When DBMS is SqlServer then HASH JOIN should be used instead of regular JOINs")]
    public bool UseHashJoins { get; set; }

    [DemandsInitialization(
        "When DBMS is SqlServer and the extraction is for any of these datasets then HASH JOIN should be used instead of regular JOINs")]
    public Catalogue[] UseHashJoinsForCatalogues { get; set; }

    [DemandsInitialization(
        "Exclusion list.  A collection of Catalogues which will never be considered for HASH JOIN even when UseHashJoins is enabled.  Being on this list takes precedence for a Catalogue even if it is on UseHashJoinsForCatalogues.")]
    public Catalogue[] DoNotUseHashJoinsForCatalogues { get; set; }


    /// <summary>
    /// This is a dictionary containing all the CatalogueItems used in the query, the underlying datatype in the origin database and the
    /// actual datatype that was output after the transform operation e.g. a varchar(10) could be converted into a bona fide DateTime which
    /// would be an SQL Date.  Finally a recommended SqlDbType is passed back.
    /// </summary>
    public Dictionary<ExtractableColumn, ExtractTimeTransformationObserved> ExtractTimeTransformationsObserved;

    private DbDataCommandDataFlowSource _hostedSource;

    protected virtual void Initialize(ExtractDatasetCommand request)
    {
        Request = request;

        if (request == ExtractDatasetCommand.EmptyCommand)
            return;

        _timeSpentValidating = new Stopwatch();
        _timeSpentCalculatingDISTINCT = new Stopwatch();
        _timeSpentBuckettingDates = new Stopwatch();

        Request.ColumnsToExtract.Sort(); //ensure they are in the right order so we can record the release identifiers

        //only generate a query builder if the request does not already carry a cached one
        if (request.QueryBuilder == null)
            request.GenerateQueryBuilder();

        foreach (var substitution in Request.ReleaseIdentifierSubstitutions)
            _extractionIdentifiersidx.Add(substitution.GetRuntimeName());

        UniqueReleaseIdentifiersEncountered = new HashSet<object>();

        _catalogue = request.Catalogue;

        if (!string.IsNullOrWhiteSpace(_catalogue.ValidatorXML))
            ExtractionTimeValidator = new ExtractionTimeValidator(_catalogue, request.ColumnsToExtract);

        //if there is a time periodicity ExtractionInformation (AND! it is among the columns the user selected to be extracted)
        if (_catalogue.TimeCoverage_ExtractionInformation_ID != null && request.ColumnsToExtract
                .Cast<ExtractableColumn>().Any(c =>
                    c.CatalogueExtractionInformation_ID == _catalogue.TimeCoverage_ExtractionInformation_ID))
            ExtractionTimeTimeCoverageAggregator =
                new ExtractionTimeTimeCoverageAggregator(_catalogue, request.ExtractableCohort);
        else
            ExtractionTimeTimeCoverageAggregator = null;
    }

    private void Initialize(ExtractGlobalsCommand request)
    {
        GlobalsRequest = request;
    }

    public bool WasCancelled => _cancel;

    private Stopwatch _timeSpentValidating;
    private int _rowsValidated = 0;

    private Stopwatch _timeSpentCalculatingDISTINCT;
    private Stopwatch _timeSpentBuckettingDates;
    private int _rowsBucketted = 0;

    private bool firstChunk = true;
    private bool firstGlobalChunk = true;
    private int _rowsRead;

    private RowPeeker _peeker = new();

    public virtual DataTable GetChunk(IDataLoadEventListener listener, GracefulCancellationToken cancellationToken)
    {
        // we are in the Global Commands case, let's return an empty DataTable (not null)
        // so we can trigger the destination to extract the globals docs and sql
        if (GlobalsRequest != null)
        {
            GlobalsRequest.ElevateState(ExtractCommandState.WaitingForSQLServer);
            if (firstGlobalChunk)
            {
                //unless we are checking, start auditing
                StartAuditGlobals();

                firstGlobalChunk = false;
                return new DataTable(ExtractionDirectory.GLOBALS_DATA_NAME);
            }

            return null;
        }

        if (Request == null)
            throw new Exception("Component has not been initialized before being asked to GetChunk(s)");

        Request.ElevateState(ExtractCommandState.WaitingForSQLServer);

        if (_cancel)
            throw new Exception("User cancelled data extraction");

        if (_hostedSource == null)
        {
            StartAudit(Request.QueryBuilder.SQL);

            if (Request.DatasetBundle.DataSet.DisableExtraction)
                throw new Exception(
                    $"Cannot extract {Request.DatasetBundle.DataSet} because DisableExtraction is set to true");

            _hostedSource = new DbDataCommandDataFlowSource(GetCommandSQL(listener),
                $"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}",
                Request.GetDistinctLiveDatabaseServer().Builder,
                ExecutionTimeout)
            {
                // If we are running in batches then always allow empty extractions
                AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume,
                BatchSize = BatchSize
            };
        }

        DataTable chunk = null;

        try
        {
            chunk = _hostedSource.GetChunk(listener, cancellationToken);

            chunk = _peeker.AddPeekedRowsIfAny(chunk);

            //if we are trying to distinct the records in memory based on release id
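            // (DISTINCTing a single batch alone could miss duplicates straddling a batch boundary; the
            // ORDER BY on the release identifier added in GetCommandSQL, plus the peek-ahead below,
            // guarantees that all rows for a given release id arrive within the same batch)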
            if (DistinctStrategy == DistinctStrategy.OrderByAndDistinctInMemory)
            {
                var releaseIdentifierColumn = Request.ReleaseIdentifierSubstitutions.First().GetRuntimeName();

                if (chunk != null && chunk.Rows.Count > 0)
                {
                    //last release id in the current chunk
                    var lastReleaseId = chunk.Rows[^1][releaseIdentifierColumn];

                    _peeker.AddWhile(_hostedSource, r => Equals(r[releaseIdentifierColumn], lastReleaseId), chunk);
                    chunk = MakeDistinct(chunk, listener, cancellationToken);
                }
            }
        }
        catch (AggregateException a)
        {
            if (a.GetExceptionIfExists<TaskCanceledException>() != null)
                _cancel = true;

            throw;
        }
        catch (Exception e)
        {
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, "Read from source failed", e));
        }

        if (cancellationToken.IsCancellationRequested)
            throw new Exception("Data read cancelled because our cancellationToken was set, aborting data reading");

        //if the first chunk is null
        if (firstChunk && chunk == null && !AllowEmptyExtractions)
            throw new Exception(
                $"There is no data to load, query returned no rows, query was:{Environment.NewLine}{_hostedSource.Sql ?? Request.QueryBuilder.SQL}");

        //not the first chunk anymore
        firstChunk = false;

        //data exhausted
        if (chunk == null)
        {
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                $"Data exhausted after reading {_rowsRead} rows of data ({UniqueReleaseIdentifiersEncountered.Count} unique release identifiers seen)"));
            if (Request != null)
                Request.CumulativeExtractionResults.DistinctReleaseIdentifiersEncountered =
                    Request.IsBatchResume ? -1 : UniqueReleaseIdentifiersEncountered.Count;
            return null;
        }

        _rowsRead += chunk.Rows.Count;

        //chunk will have datatypes for all the things in the buffer so we can populate our dictionary of facts about what columns/catalogue items have spontaneously changed name/type etc
        if (ExtractTimeTransformationsObserved == null)
            GenerateExtractionTransformObservations(chunk);

        //see if the SqlDataReader has a column with the same name as the ReleaseIdentifierSQL (if so then we can use it to count the number of distinct subjects written out to the csv)
        var includesReleaseIdentifier = _extractionIdentifiersidx.Count > 0;

        //first line - let's see what columns we wrote out
        //looks at the buffer and computes any transforms performed on the column

        _timeSpentValidating.Start();
        //build up the validation report (Missing/Wrong/Etc) - this has no mechanical effect on the extracted data just some metadata that goes into a flat file
        if (ExtractionTimeValidator != null && Request.IncludeValidation)
            try
            {
                chunk.Columns.Add(ValidationColumnName);

                ExtractionTimeValidator.Validate(chunk, ValidationColumnName);

                _rowsValidated += chunk.Rows.Count;
                listener.OnProgress(this,
                    new ProgressEventArgs("Validation", new ProgressMeasurement(_rowsValidated, ProgressType.Records),
                        _timeSpentValidating.Elapsed));
            }
            catch (Exception ex)
            {
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Error, "Could not validate data chunk", ex));
                ValidationFailureException = ex;
                ExtractionTimeValidator = null;
            }

        _timeSpentValidating.Stop();

        _timeSpentBuckettingDates.Start();
        if (ExtractionTimeTimeCoverageAggregator != null)
        {
            _rowsBucketted += chunk.Rows.Count;

            foreach (DataRow row in chunk.Rows)
                ExtractionTimeTimeCoverageAggregator.ProcessRow(row);

            listener.OnProgress(this,
                new ProgressEventArgs("Bucketting Dates", new ProgressMeasurement(_rowsBucketted, ProgressType.Records),
                    _timeSpentCalculatingDISTINCT.Elapsed));
        }

        _timeSpentBuckettingDates.Stop();

        _timeSpentCalculatingDISTINCT.Start();
        //record unique release identifiers found
        if (includesReleaseIdentifier)
            foreach (var idx in _extractionIdentifiersidx)
            {
                foreach (DataRow r in chunk.Rows)
                {
                    if (r[idx] == DBNull.Value)
                        if (_extractionIdentifiersidx.Count == 1)
                            throw new Exception(
                                $"Null release identifier found in extract of dataset {Request.DatasetBundle.DataSet}");
                        else
                            continue; //there are multiple extraction identifiers so it's fine if one or two are null

                    if (!UniqueReleaseIdentifiersEncountered.Contains(r[idx]))
                        UniqueReleaseIdentifiersEncountered.Add(r[idx]);
                }

                listener.OnProgress(this,
                    new ProgressEventArgs("Calculating Distinct Release Identifiers",
                        new ProgressMeasurement(UniqueReleaseIdentifiersEncountered.Count, ProgressType.Records),
                        _timeSpentCalculatingDISTINCT.Elapsed));
            }

        _timeSpentCalculatingDISTINCT.Stop();

        return chunk;
    }

    /// <summary>
    /// Makes the current batch ONLY distinct.  This only works if you have a bounded batch (see OrderByAndDistinctInMemory)
    /// </summary>
    /// <param name="chunk"></param>
    /// <param name="listener"></param>
    /// <param name="cancellationToken"></param>
    /// <returns></returns>
    private static DataTable MakeDistinct(DataTable chunk, IDataLoadEventListener listener,
        GracefulCancellationToken cancellationToken)
    {
        var removeDuplicates = new RemoveDuplicates { NoLogging = true };
        return removeDuplicates.ProcessPipelineData(chunk, listener, cancellationToken);
    }

    private void GenerateExtractionTransformObservations(DataTable chunk)
    {
        ExtractTimeTransformationsObserved = new Dictionary<ExtractableColumn, ExtractTimeTransformationObserved>();

        //create the Types dictionary
        foreach (ExtractableColumn column in Request.ColumnsToExtract)
        {
            ExtractTimeTransformationsObserved.Add(column, new ExtractTimeTransformationObserved());

            //record catalogue information about what it is supposed to be.
            if (!column.HasOriginalExtractionInformationVanished())
            {
                var extractionInformation = column.CatalogueExtractionInformation;

                //what the catalogue says it is
                ExtractTimeTransformationsObserved[column].DataTypeInCatalogue =
                    extractionInformation.ColumnInfo.Data_type;
                ExtractTimeTransformationsObserved[column].CatalogueItem = extractionInformation.CatalogueItem;

                //what it actually is
                if (chunk.Columns.Contains(column.GetRuntimeName()))
                {
                    ExtractTimeTransformationsObserved[column].FoundAtExtractTime = true;
                    ExtractTimeTransformationsObserved[column].DataTypeObservedInRuntimeBuffer =
                        chunk.Columns[column.GetRuntimeName()].DataType;
                }
                else
                {
                    ExtractTimeTransformationsObserved[column].FoundAtExtractTime = false;
                }
            }
        }
    }

    private string GetCommandSQL(IDataLoadEventListener listener)
    {
        //if the user wants some custom logic for removing identical duplicates
        switch (DistinctStrategy)
        {
            //user doesn't care about identical duplicates
            case DistinctStrategy.None:
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");
                break;

            //system default behaviour
            case DistinctStrategy.SqlDistinct:
                break;

            //user wants to run order by the release ID and resolve duplicates in batches as they are read
            case DistinctStrategy.OrderByAndDistinctInMemory:

                //remove the DISTINCT keyword from the query
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");

                //find the release identifier substitution (e.g. chi for PROCHI)
                var substitution = Request.ReleaseIdentifierSubstitutions.First();

                //add a line at the end of the query to ORDER BY the ReleaseId column (e.g. PROCHI)
                var orderBySql = $"ORDER BY {substitution.SelectSQL}";

                // don't add the line if it is already there (e.g. because of Retry)
                if (!Request.QueryBuilder.CustomLines.Any(l => string.Equals(l.Text, orderBySql)))
                    Request.QueryBuilder.AddCustomLine(orderBySql, QueryComponent.Postfix);

                break;
            default:
                throw new ArgumentOutOfRangeException();
        }

        var sql = Request.QueryBuilder.SQL;

        sql = HackExtractionSQL(sql, listener);

        if (ShouldUseHashedJoins())
        {
            //use hash joins!
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information, "Substituting JOIN for HASH JOIN"));
            sql = sql.Replace(" JOIN ", " HASH JOIN ");
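            // (a plain string substitution: e.g. " INNER JOIN " becomes " INNER HASH JOIN ",
            // T-SQL's explicit join-hint form)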
        }

        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
            $"/*Decided on extraction SQL:*/{Environment.NewLine}{sql}"));

        return sql;
    }

    private bool ShouldUseHashedJoins()
    {
        var dbms = Request?.QueryBuilder?.QuerySyntaxHelper?.DatabaseType;

        //must be sql server
        if (dbms == null || dbms.Value != DatabaseType.MicrosoftSQLServer)
            return false;

        // this Catalogue is explicitly marked as never hash join? i.e. it's on the exclusion list
        if (DoNotUseHashJoinsForCatalogues?.Contains(Request.Catalogue) ?? false)
            return false;

        if (UseHashJoins)
            return true;

        if (UseHashJoinsForCatalogues != null)
            return UseHashJoinsForCatalogues.Contains(Request.Catalogue);

        //user doesn't want to use hash joins
        return false;
    }

    public virtual string HackExtractionSQL(string sql, IDataLoadEventListener listener) => sql;

    private void StartAudit(string sql)
    {
        var dataExportRepo = Request.DataExportRepository;

        var previousAudit = dataExportRepo
            .GetAllCumulativeExtractionResultsFor(Request.Configuration, Request.DatasetBundle.DataSet).ToArray();

        if (Request.IsBatchResume)
        {
            var match =
                previousAudit.FirstOrDefault(a => a.ExtractableDataSet_ID == Request.DatasetBundle.DataSet.ID) ??
                throw new Exception(
                    $"Could not find previous CumulativeExtractionResults for dataset {Request.DatasetBundle.DataSet} despite the Request being marked as a batch resume");
            Request.CumulativeExtractionResults = match;
        }
        else
        {
            //delete old audit records
            foreach (var audit in previousAudit)
                audit.DeleteInDatabase();

            var extractionResults = new CumulativeExtractionResults(dataExportRepo, Request.Configuration,
                Request.DatasetBundle.DataSet, sql);

            var filterDescriptions =
                RecursivelyListAllFilterNames(
                    Request.Configuration.GetFilterContainerFor(Request.DatasetBundle.DataSet));

            extractionResults.FiltersUsed = filterDescriptions.TrimEnd(',');
            extractionResults.SaveToDatabase();

            Request.CumulativeExtractionResults = extractionResults;
        }
    }

    private void StartAuditGlobals()
    {
        var repo = GlobalsRequest.RepositoryLocator.DataExportRepository;

        var previousAudit = repo
            .GetAllObjectsWhere<SupplementalExtractionResults>("ExtractionConfiguration_ID",
                GlobalsRequest.Configuration.ID)
            .Where(c => c.CumulativeExtractionResults_ID == null);

        //delete old audit records
        foreach (var audit in previousAudit)
            audit.DeleteInDatabase();
    }

    private string RecursivelyListAllFilterNames(IContainer filterContainer)
    {
        if (filterContainer == null)
            return "";

        var toReturn = "";

        if (filterContainer.GetSubContainers() != null)
            foreach (var subContainer in filterContainer.GetSubContainers())
                toReturn += RecursivelyListAllFilterNames(subContainer);

        if (filterContainer.GetFilters() != null)
            foreach (var f in filterContainer.GetFilters())
                toReturn += $"{f.Name},";

        return toReturn;
    }

    public virtual void Dispose(IDataLoadEventListener job, Exception pipelineFailureExceptionIfAny)
    {
    }

    public void Abort(IDataLoadEventListener listener)
    {
    }

    public virtual DataTable TryGetPreview()
    {
        if (Request == ExtractDatasetCommand.EmptyCommand)
            return new DataTable();

        var toReturn = new DataTable();
        var server = _catalogue.GetDistinctLiveDatabaseServer(DataAccessContext.DataExport, false);

        using (var con = server.GetConnection())
        {
            con.Open();

            var da = server.GetDataAdapter(Request.QueryBuilder.SQL, con);

            //get up to 1000 records
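            // BeginLoadData/EndLoadData suspend and restore notifications, index maintenance and
            // constraint checking around the adapter Fill (the DataTable pattern this PR, #1598, applies)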
            toReturn.BeginLoadData();
            da.Fill(0, 1000, toReturn);
            toReturn.EndLoadData();

            con.Close();
        }

        return toReturn;
    }

    public void PreInitialize(IExtractCommand value, IDataLoadEventListener listener)
    {
        if (value is ExtractDatasetCommand datasetCommand)
            Initialize(datasetCommand);
        if (value is ExtractGlobalsCommand command)
            Initialize(command);
    }

    public virtual void Check(ICheckNotifier notifier)
    {
        if (Request == ExtractDatasetCommand.EmptyCommand)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Request is ExtractDatasetCommand.EmptyCommand, checking will not be carried out",
                CheckResult.Warning));
            return;
        }

        if (GlobalsRequest != null)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Request is for Globals, checking will not be carried out at source", CheckResult.Success));
            return;
        }

        if (Request == null)
        {
            notifier.OnCheckPerformed(new CheckEventArgs("ExtractionRequest has not been set", CheckResult.Fail));
            return;
        }
    }
}
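
For context, a pipeline source like this one is pulled batch by batch until it signals exhaustion by returning null from GetChunk (the "data exhausted" branch above). A minimal sketch of such a driver, assuming the hosting pipeline supplies the source, listener and cancellation token (RDMP's real pipeline engine adds progress reporting and destinations; this only illustrates the GetChunk contract):

// Illustrative only: drain an RDMP data flow source, counting rows.
private static int DrainSource(IDataFlowSource<DataTable> source,
    IDataLoadEventListener listener, GracefulCancellationToken token)
{
    var rows = 0;
    DataTable chunk;

    // Keep requesting batches until the source reports data exhausted (null).
    while ((chunk = source.GetChunk(listener, token)) != null)
    {
        rows += chunk.Rows.Count;
        // ...a real pipeline would hand the chunk to the next component here...
        chunk.Dispose();
    }

    return rows;
}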