• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 13324667571

14 Feb 2025 07:36AM UTC coverage: 57.416% (+0.02%) from 57.394%
13324667571

push

github

web-flow
Task/rdmp 276 catalogue metadata (#2120)

* add basic editable

* interim new ui

* smarter binding

* tab updates

* remove unused

* working dropdowns

* finish bindings

* update tests

* update chips

* use group boxes

* use group boxes

* better layout

* working ui

* update tests

* update tests

* add missing file

* tidy up

* add scroll

* fix up scroll bars

* downward scroll

* tidy up

* tidy up

* add purpose of dataset

* add additional info

* add missing file

* attempt to fix test

* add tooltips

* add missing file

* improve display

* add tooltips

* update helper text

* rename

* update data source parsing

* tidy up

* add other

* move other

* update chip

* fix data source types of catalogue

* fix copy to mem

11356 of 21336 branches covered (53.22%)

Branch coverage included in aggregate %.

54 of 64 new or added lines in 3 files covered. (84.38%)

2 existing lines in 1 file now uncovered.

32288 of 54678 relevant lines covered (59.05%)

8732.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.04
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Sources/ExecuteDatasetExtractionSource.cs
1
// Copyright (c) The University of Dundee 2018-2025
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data;
10
using System.Data.Common;
11
using System.Diagnostics;
12
using System.Linq;
13
using System.Threading.Tasks;
14
using FAnsi;
15
using FAnsi.Discovery.QuerySyntax;
16
using Rdmp.Core.Curation.Data;
17
using Rdmp.Core.DataExport.Data;
18
using Rdmp.Core.DataExport.DataExtraction.Commands;
19
using Rdmp.Core.DataFlowPipeline;
20
using Rdmp.Core.DataFlowPipeline.Requirements;
21
using Rdmp.Core.DataLoad.Engine.Pipeline.Components;
22
using Rdmp.Core.DataLoad.Engine.Pipeline.Sources;
23
using Rdmp.Core.QueryBuilding;
24
using Rdmp.Core.ReusableLibraryCode;
25
using Rdmp.Core.ReusableLibraryCode.Checks;
26
using Rdmp.Core.ReusableLibraryCode.DataAccess;
27
using Rdmp.Core.ReusableLibraryCode.Progress;
28
using IContainer = Rdmp.Core.Curation.Data.IContainer;
29

30
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Sources;
31

32
/// <summary>
/// Executes a single Dataset extraction by linking a cohort with a dataset (either core or custom data - See IExtractCommand).  Also calculates the number
/// of unique identifiers seen, records row validation failures etc.
/// </summary>
public class ExecuteDatasetExtractionSource : IPluginDataFlowSource<DataTable>, IPipelineRequirement<IExtractCommand>
{
    //Request is either for one of these (a dataset extraction or a globals extraction, never both)
    public ExtractDatasetCommand Request { get; protected set; }
    public ExtractGlobalsCommand GlobalsRequest { get; protected set; }

    public const string AuditTaskName = "DataExtraction";

    // Runtime names of the release identifier columns in the extraction query
    // (used by GetChunk to count distinct subjects written out)
    private readonly List<string> _extractionIdentifiersidx = new();

    // Set when a TaskCanceledException is observed while reading; subsequent GetChunk calls throw
    private bool _cancel;

    private ICatalogue _catalogue;

    // Name of the column appended to each chunk when extraction-time validation is enabled
    protected const string ValidationColumnName = "RowValidationResult";

    public ExtractionTimeValidator ExtractionTimeValidator { get; protected set; }

    // Populated if validation itself errored; validation is then disabled for the rest of the extraction
    public Exception ValidationFailureException { get; protected set; }

    public HashSet<object> UniqueReleaseIdentifiersEncountered { get; set; }

    public ExtractionTimeTimeCoverageAggregator ExtractionTimeTimeCoverageAggregator { get; set; }

    [DemandsInitialization(
        "Determines the systems behaviour when an extraction query returns 0 rows.  Default (false) is that an error is reported.  If set to true (ticked) then instead a DataTable with 0 rows but all the correct headers will be generated usually resulting in a headers only 0 line/empty extract file")]
    public bool AllowEmptyExtractions { get; set; }

    [DemandsInitialization(
        "Batch size, number of records to read from source before releasing it into the extraction pipeline",
        DefaultValue = 10000, Mandatory = true)]
    public int BatchSize { get; set; }

    [DemandsInitialization(
        "In seconds. Overrides the global timeout for SQL query execution. Use 0 for infinite timeout.",
        DefaultValue = 50000, Mandatory = true)]
    public int ExecutionTimeout { get; set; }

    [DemandsInitialization(@"Determines how the system achieves DISTINCT on extraction.  These include:
None - Do not DISTINCT the records, can result in duplication in your extract (not recommended)
SqlDistinct - Adds the DISTINCT keyword to the SELECT sql sent to the server
OrderByAndDistinctInMemory - Adds an ORDER BY statement to the query and applies the DISTINCT in memory as records are read from the server (this can help when extracting very large data sets where DISTINCT keyword blocks record streaming until all records are ready to go)"
        , DefaultValue = DistinctStrategy.SqlDistinct)]
    public DistinctStrategy DistinctStrategy { get; set; }


    [DemandsInitialization("When DBMS is SqlServer then HASH JOIN should be used instead of regular JOINs")]
    public bool UseHashJoins { get; set; }

    [DemandsInitialization(
        "When DBMS is SqlServer and the extraction is for any of these datasets then HASH JOIN should be used instead of regular JOINs")]
    public Catalogue[] UseHashJoinsForCatalogues { get; set; }

    [DemandsInitialization(
        "Exclusion list.  A collection of Catalogues which will never be considered for HASH JOIN even when UseHashJoins is enabled.  Being on this list takes precedence for a Catalogue even if it is on UseHashJoinsForCatalogues.")]
    public Catalogue[] DoNotUseHashJoinsForCatalogues { get; set; }

    [DemandsInitialization("When performing an extraction, copy the cohort into a temporary table to improve extraction speed", defaultValue: false)]
    public bool UseTempTablesWhenExtractingCohort { get; set; }


    /// <summary>
    /// This is a dictionary containing all the CatalogueItems used in the query, the underlying datatype in the origin database and the
    /// actual datatype that was output after the transform operation e.g. a varchar(10) could be converted into a bona fide DateTime which
    /// would be an sql Date.  Finally
    /// a recommended SqlDbType is passed back.
    /// </summary>
    public Dictionary<ExtractableColumn, ExtractTimeTransformationObserved> ExtractTimeTransformationsObserved;

    // Lazily created on first GetChunk; streams batches of the extraction query
    private DbDataCommandDataFlowSource _hostedSource;

    private IExternalCohortTable _externalCohortTable;
    private string _whereSQL;
    // Connection opened only when UseTempTablesWhenExtractingCohort is set (holds the temp table alive)
    private DbConnection _con;
    // Name of the cohort temp table (e.g. "#ABC..."), or null if none was created / creation failed
    private string _uuid;
    /// <summary>
    /// Captures the dataset extraction <paramref name="request"/> and primes all per-extraction state:
    /// cohort table and WHERE SQL, progress stopwatches, release identifier column names, validator and
    /// time coverage aggregator.  Does nothing beyond storing the request when given
    /// <see cref="ExtractDatasetCommand.EmptyCommand"/>.
    /// </summary>
    /// <param name="request">The dataset extraction command this source will service.</param>
    protected virtual void Initialize(ExtractDatasetCommand request)
    {
        Request = request;

        if (request == ExtractDatasetCommand.EmptyCommand)
            return;
        _externalCohortTable = request.ExtractableCohort.ExternalCohortTable;
        _whereSQL = request.ExtractableCohort.WhereSQL();
        _timeSpentValidating = new Stopwatch();
        _timeSpentCalculatingDISTINCT = new Stopwatch();
        _timeSpentBuckettingDates = new Stopwatch();

        Request.ColumnsToExtract.Sort(); //ensure they are in the right order so we can record the release identifiers

        //if we have a cached builder already
        if (request.QueryBuilder == null)
            request.GenerateQueryBuilder();

        //remember the runtime names of the release identifier columns so GetChunk can count distinct subjects
        foreach (var substitution in Request.ReleaseIdentifierSubstitutions)
            _extractionIdentifiersidx.Add(substitution.GetRuntimeName());

        UniqueReleaseIdentifiersEncountered = new HashSet<object>();

        _catalogue = request.Catalogue;

        //only set up row validation if the Catalogue actually has validation rules configured
        if (!string.IsNullOrWhiteSpace(_catalogue.ValidatorXML))
            ExtractionTimeValidator = new ExtractionTimeValidator(_catalogue, request.ColumnsToExtract);

        //if there is a time periodicity ExtractionInformation (AND! it is among the columns the user selected to be extracted)
        if (_catalogue.TimeCoverage_ExtractionInformation_ID != null && request.ColumnsToExtract
                .Cast<ExtractableColumn>().Any(c =>
                    c.CatalogueExtractionInformation_ID == _catalogue.TimeCoverage_ExtractionInformation_ID))
            ExtractionTimeTimeCoverageAggregator =
                new ExtractionTimeTimeCoverageAggregator(_catalogue, request.ExtractableCohort);
        else
            ExtractionTimeTimeCoverageAggregator = null;
    }
    /// <summary>
    /// Records the globals extraction <paramref name="request"/> so that <see cref="GetChunk"/>
    /// operates in globals mode instead of dataset mode.
    /// </summary>
    /// <param name="request">The globals extraction command to service.</param>
    private void Initialize(ExtractGlobalsCommand request) => GlobalsRequest = request;
    /// <summary>
    /// True if a TaskCanceledException was observed while reading (see GetChunk's AggregateException handler).
    /// </summary>
    public bool WasCancelled => _cancel;

    // Stopwatches/counters below exist purely for progress reporting to the listener
    private Stopwatch _timeSpentValidating;
    private int _rowsValidated;

    private Stopwatch _timeSpentCalculatingDISTINCT;
    private Stopwatch _timeSpentBuckettingDates;
    private int _rowsBucketted;

    // True until the first dataset/global chunk has been produced
    private bool firstChunk = true;
    private bool firstGlobalChunk = true;
    private int _rowsRead;

    // Buffers rows read ahead of the current chunk (used by the OrderByAndDistinctInMemory strategy)
    private RowPeeker _peeker = new();

    // NOTE(review): System.Random is neither thread safe nor cryptographically secure; here it is only
    // used to generate the cohort temp table name suffix (see RandomString)
    private static readonly Random random = new Random();
    /// <summary>
    /// Returns a random string of <paramref name="length"/> characters drawn from A-Z and 0-9
    /// (used to give the cohort temp table a unique name).
    /// </summary>
    private static string RandomString(int length)
    {
        const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        var buffer = new char[length];
        for (var i = 0; i < length; i++)
            buffer[i] = chars[random.Next(chars.Length)];
        return new string(buffer);
    }
    /// <summary>
    /// Copies the cohort (rows of the external cohort table matching the cohort WHERE SQL) into a temporary
    /// table on <paramref name="con"/>, recording its name in <see cref="_uuid"/>.  On any failure (or an
    /// unsupported DBMS) <see cref="_uuid"/> is left null so callers fall back to the original cohort table.
    /// </summary>
    /// <param name="con">An open connection on which the temp table should live.</param>
    /// <param name="listener">Receives progress/warning notifications.</param>
    private void CreateCohortTempTable(DbConnection con, IDataLoadEventListener listener)
    {
        // NOTE(review): the '#' prefix is SQL Server local temp table syntax; for MySql/Oracle/PostgreSql
        // the generated name still starts with '#' which may not be a legal identifier - confirm on those DBMS
        _uuid = $"#{RandomString(24)}";
        var sql = "";
        var db = _externalCohortTable.Discover();
        switch (db.Server.DatabaseType)
        {
            case DatabaseType.MicrosoftSQLServer:
                sql = $"""
                    SELECT *
                    INTO {_uuid}
                    FROM(
                    SELECT * FROM {_externalCohortTable.TableName}
                    WHERE {_whereSQL}
                    ) as cohortTempTable
                """;
                break;
            case DatabaseType.MySql:
                sql = $"""
                    CREATE TEMPORARY TABLE {_uuid} ENGINE=MEMORY
                    as (SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL})
                """;
                break;
            case DatabaseType.Oracle:
                // NOTE(review): Oracle has no plain 'CREATE TEMPORARY TABLE ... SELECT' statement; this will
                // likely fail and fall back via the catch below - consider CREATE PRIVATE TEMPORARY TABLE
                sql = $"""
                    CREATE TEMPORARY TABLE {_uuid} SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL}
                """;
                break;
            case DatabaseType.PostgreSql:
                sql = $"""
                    CREATE TEMP TABLE {_uuid} AS
                    SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL}
                """;
                break;
            default:
                // BUGFIX: previously _uuid was left non-null here despite warning that the original table
                // would be used, so GetCommandSQL would still substitute a nonexistent temp table name
                _uuid = null;
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, $"Unable to create temporary table for cohort. Original cohort table will be used"));
                return;


        }
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"About to copy the cohort into a temporary table using the SQL: {sql}"));

        using var cmd = db.Server.GetCommand(sql, con);
        cmd.CommandTimeout = ExecutionTimeout;
        try
        {
            cmd.ExecuteNonQuery();
        }
        catch (Exception ex)
        {
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, $"Unable to create temporary table for cohort. Original cohort table will be used", ex));
            _uuid = null;
            // BUGFIX: previously execution fell through and logged 'successfully copied' even after a failure
            return;
        }
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Cohort successfully copied to temporary table"));

    }
    /// <summary>
    /// Reads the next batch of extraction records.  In globals mode returns a single empty marker DataTable
    /// then null.  In dataset mode, on first call builds/executes the extraction SQL (optionally via a cohort
    /// temp table) and starts the audit; subsequent calls return successive chunks of <see cref="BatchSize"/>
    /// rows, tracking unique release identifiers, validation results and time coverage.  Returns null once
    /// the data is exhausted.
    /// </summary>
    public virtual DataTable GetChunk(IDataLoadEventListener listener, GracefulCancellationToken cancellationToken)
    {
        // we are in the Global Commands case, let's return an empty DataTable (not null)
        // so we can trigger the destination to extract the globals docs and sql
        if (GlobalsRequest != null)
        {
            GlobalsRequest.ElevateState(ExtractCommandState.WaitingForSQLServer);
            if (firstGlobalChunk)
            {
                //unless we are checking, start auditing
                StartAuditGlobals();

                firstGlobalChunk = false;
                return new DataTable(ExtractionDirectory.GLOBALS_DATA_NAME);
            }

            return null;
        }

        if (Request == null)
            throw new Exception("Component has not been initialized before being asked to GetChunk(s)");

        Request.ElevateState(ExtractCommandState.WaitingForSQLServer);

        if (_cancel)
            throw new Exception("User cancelled data extraction");

        //first call: build the SQL, start the audit and create the streaming source
        if (_hostedSource == null)
        {
            if (UseTempTablesWhenExtractingCohort)
            {
                _con = DatabaseCommandHelper.GetConnection(Request.GetDistinctLiveDatabaseServer().Builder);
                _con.Open();
                CreateCohortTempTable(_con, listener);
            }
            var cmdSql = GetCommandSQL(listener);
            StartAudit(cmdSql);

            if (Request.DatasetBundle.DataSet.DisableExtraction)
                throw new Exception(
                    $"Cannot extract {Request.DatasetBundle.DataSet} because DisableExtraction is set to true");

            //when a temp table was made the source must reuse _con (the temp table only exists on that connection)
            _hostedSource = UseTempTablesWhenExtractingCohort ? new DbDataCommandDataFlowSource(cmdSql,
                $"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}",
               _con,
                ExecutionTimeout)
            {
                AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume,
                BatchSize = BatchSize
            }
                : new DbDataCommandDataFlowSource(cmdSql,
                $"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}",
               Request.GetDistinctLiveDatabaseServer().Builder,
                ExecutionTimeout)
                {
                    // If we are running in batches then always allow empty extractions
                    AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume,
                    BatchSize = BatchSize
                };
        }

        DataTable chunk = null;

        try
        {
            chunk = _hostedSource.GetChunk(listener, cancellationToken);


            chunk = _peeker.AddPeekedRowsIfAny(chunk);

            if (Request != null && Request.DatasetBundle.DataSet is not null && chunk is not null)
                chunk.TableName = $"{Request.DatasetBundle.DataSet}";

            //if we are trying to distinct the records in memory based on release id
            if (DistinctStrategy == DistinctStrategy.OrderByAndDistinctInMemory)
            {
                var releaseIdentifierColumn = Request.ReleaseIdentifierSubstitutions.First().GetRuntimeName();

                if (chunk is { Rows.Count: > 0 })
                {
                    //last release id in the current chunk
                    var lastReleaseId = chunk.Rows[^1][releaseIdentifierColumn];

                    //keep reading until the release id changes so a subject's rows are never split across chunks
                    _peeker.AddWhile(_hostedSource, r => Equals(r[releaseIdentifierColumn], lastReleaseId), chunk);
                    chunk = MakeDistinct(chunk, listener, cancellationToken);
                }
            }
        }
        catch (AggregateException a)
        {
            if (a.GetExceptionIfExists<TaskCanceledException>() != null)
                _cancel = true;

            throw;
        }
        catch (Exception e)
        {
            // NOTE(review): this swallows the read error after notifying - chunk stays null and the method
            // proceeds into the 'no data'/'data exhausted' logic below rather than rethrowing
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, "Read from source failed", e));
        }

        if (cancellationToken.IsCancellationRequested)
            throw new Exception("Data read cancelled because our cancellationToken was set, aborting data reading");

        //if the first chunk is null
        if (firstChunk && chunk == null && !AllowEmptyExtractions)
            throw new Exception(
                $"There is no data to load, query returned no rows, query was:{Environment.NewLine}{_hostedSource.Sql ?? Request.QueryBuilder.SQL}");

        //not the first chunk anymore
        firstChunk = false;

        //data exhausted
        if (chunk == null)
        {
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                $"Data exhausted after reading {_rowsRead} rows of data ({UniqueReleaseIdentifiersEncountered.Count} unique release identifiers seen)"));
            if (Request != null)
                Request.CumulativeExtractionResults.DistinctReleaseIdentifiersEncountered =
                    Request.IsBatchResume ? -1 : UniqueReleaseIdentifiersEncountered.Count;
            return null;
        }

        _rowsRead += chunk.Rows.Count;
        //chunk will have datatypes for all the things in the buffer so we can populate our dictionary of facts about what columns/catalogue items have spontaneously changed name/type etc
        if (ExtractTimeTransformationsObserved == null)
            GenerateExtractionTransformObservations(chunk);


        //see if the SqlDataReader has a column with the same name as the ReleaseIdentifierSQL (if so then we can use it to count the number of distinct subjects written out to the csv)
        var includesReleaseIdentifier = _extractionIdentifiersidx.Count > 0;


        //first line - let's see what columns we wrote out
        //looks at the buffer and computes any transforms performed on the column


        _timeSpentValidating.Start();
        //build up the validation report (Missing/Wrong/Etc) - this has no mechanical effect on the extracted data just some metadata that goes into a flat file
        if (ExtractionTimeValidator != null && Request.IncludeValidation)
            try
            {
                chunk.Columns.Add(ValidationColumnName);

                ExtractionTimeValidator.Validate(chunk, ValidationColumnName);

                _rowsValidated += chunk.Rows.Count;
                listener.OnProgress(this,
                    new ProgressEventArgs("Validation", new ProgressMeasurement(_rowsValidated, ProgressType.Records),
                        _timeSpentValidating.Elapsed));
            }
            catch (Exception ex)
            {
                //validation errored: record the failure and disable validation for the rest of the extraction
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Error, "Could not validate data chunk", ex));
                ValidationFailureException = ex;
                ExtractionTimeValidator = null;
            }

        _timeSpentValidating.Stop();

        _timeSpentBuckettingDates.Start();
        if (ExtractionTimeTimeCoverageAggregator != null)
        {
            _rowsBucketted += chunk.Rows.Count;

            foreach (DataRow row in chunk.Rows)
                ExtractionTimeTimeCoverageAggregator.ProcessRow(row);

            // NOTE(review): this progress event reports _timeSpentCalculatingDISTINCT.Elapsed rather than
            // _timeSpentBuckettingDates.Elapsed - looks like a copy/paste slip, confirm intent
            listener.OnProgress(this,
                new ProgressEventArgs("Bucketting Dates", new ProgressMeasurement(_rowsBucketted, ProgressType.Records),
                    _timeSpentCalculatingDISTINCT.Elapsed));
        }

        _timeSpentBuckettingDates.Stop();

        _timeSpentCalculatingDISTINCT.Start();
        var pks = new List<DataColumn>();

        //record unique release identifiers found
        if (includesReleaseIdentifier)
            foreach (var idx in _extractionIdentifiersidx.Distinct().ToList())
            {
                var sub = Request.ReleaseIdentifierSubstitutions.FirstOrDefault(s => s.Alias == chunk.Columns[idx].ColumnName);
                if (sub?.ColumnInfo.ExtractionInformations.FirstOrDefault()?.IsPrimaryKey == true)
                {
                    pks.Add(chunk.Columns[idx]);
                }

                foreach (DataRow r in chunk.Rows)
                {
                    if (r[idx] == DBNull.Value)
                        if (_extractionIdentifiersidx.Count == 1)
                            throw new Exception(
                                $"Null release identifier found in extract of dataset {Request.DatasetBundle.DataSet}");
                        else
                            continue; //there are multiple extraction identifiers that's fine if one or two are null

                    UniqueReleaseIdentifiersEncountered.Add(r[idx]);
                }

                listener.OnProgress(this,
                    new ProgressEventArgs("Calculating Distinct Release Identifiers",
                        new ProgressMeasurement(UniqueReleaseIdentifiersEncountered.Count, ProgressType.Records),
                        _timeSpentCalculatingDISTINCT.Elapsed));
            }

        _timeSpentCalculatingDISTINCT.Stop();
        //also mark as primary key any extracted column whose Catalogue ExtractionInformation is flagged IsPrimaryKey
        pks.AddRange(Request.ColumnsToExtract.Where(static c => ((ExtractableColumn)c).CatalogueExtractionInformation.IsPrimaryKey).Select(static column => ((ExtractableColumn)column).CatalogueExtractionInformation.ToString()).Select(name => chunk.Columns[name]));
        chunk.PrimaryKey = pks.ToArray();

        return chunk;
    }
    /// <summary>
    /// Makes the current batch ONLY distinct.  This only works if you have a bounded batch (see OrderByAndDistinctInMemory)
    /// </summary>
    /// <param name="chunk">The batch to deduplicate.</param>
    /// <param name="listener">Receives progress notifications.</param>
    /// <param name="cancellationToken">Allows the deduplication to be aborted.</param>
    /// <returns>The batch with exact duplicate rows removed.</returns>
    private static DataTable MakeDistinct(DataTable chunk, IDataLoadEventListener listener,
        GracefulCancellationToken cancellationToken) =>
        new RemoveDuplicates { NoLogging = true }.ProcessPipelineData(chunk, listener, cancellationToken);
    /// <summary>
    /// Builds <see cref="ExtractTimeTransformationsObserved"/> from the first chunk read: for every column
    /// the user asked to extract, records what datatype the Catalogue says it should be and what datatype
    /// actually appeared in the runtime buffer.
    /// </summary>
    /// <param name="chunk">The first batch of extracted data (supplies the observed runtime datatypes).</param>
    private void GenerateExtractionTransformObservations(DataTable chunk)
    {
        ExtractTimeTransformationsObserved = new Dictionary<ExtractableColumn, ExtractTimeTransformationObserved>();

        foreach (ExtractableColumn column in Request.ColumnsToExtract)
        {
            var observation = new ExtractTimeTransformationObserved();
            ExtractTimeTransformationsObserved.Add(column, observation);

            //columns whose original ExtractionInformation no longer exists cannot be described
            if (column.HasOriginalExtractionInformationVanished())
                continue;

            var extractionInformation = column.CatalogueExtractionInformation;

            //what the catalogue says it is
            observation.DataTypeInCatalogue = extractionInformation.ColumnInfo.Data_type;
            observation.CatalogueItem = extractionInformation.CatalogueItem;

            //what it actually is (if the column made it into the buffer at all)
            var runtimeName = column.GetRuntimeName();
            if (chunk.Columns.Contains(runtimeName))
            {
                observation.FoundAtExtractTime = true;
                observation.DataTypeObservedInRuntimeBuffer = chunk.Columns[runtimeName].DataType;
            }
            else
            {
                observation.FoundAtExtractTime = false;
            }
        }
    }
    /// <summary>
    /// Produces the final extraction SQL: applies the configured <see cref="DistinctStrategy"/> to the
    /// QueryBuilder, lets subclasses adjust the SQL via <see cref="HackExtractionSQL"/>, optionally swaps
    /// JOIN for HASH JOIN and finally substitutes the cohort temp table name when one was created.
    /// </summary>
    /// <param name="listener">Receives notifications about the decisions made.</param>
    /// <returns>The SQL that will be executed against the live server.</returns>
    private string GetCommandSQL(IDataLoadEventListener listener)
    {
        //if the user wants some custom logic for removing identical duplicates
        switch (DistinctStrategy)
        {
            //user doesn't care about identical duplicates
            case DistinctStrategy.None:
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");
                break;

            //system default behaviour
            case DistinctStrategy.SqlDistinct:
                break;

            //user wants to run order by the release ID and resolve duplicates in batches as they are read
            case DistinctStrategy.OrderByAndDistinctInMemory:

                //remove the DISTINCT keyword from the query
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");

                //find the release identifier substitution (e.g. chi for PROCHI)
                var substitution = Request.ReleaseIdentifierSubstitutions.First();

                //add a line at the end of the query to ORDER BY the ReleaseId column (e.g. PROCHI)
                var orderBySql = $"ORDER BY {substitution.SelectSQL}";

                // don't add the line if it is already there (e.g. because of Retry)
                if (!Request.QueryBuilder.CustomLines.Any(l => string.Equals(l.Text, orderBySql)))
                    Request.QueryBuilder.AddCustomLine(orderBySql, QueryComponent.Postfix);

                break;
            default:
                throw new ArgumentOutOfRangeException();
        }

        var sql = Request.QueryBuilder.SQL;

        //extension point for subclasses to rewrite the SQL
        sql = HackExtractionSQL(sql, listener);

        if (ShouldUseHashedJoins())
        {
            //use hash joins!
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information, "Substituting JOIN for HASH JOIN"));
            sql = sql.Replace(" JOIN ", " HASH JOIN ");
        }

        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
            $"/*Decided on extraction SQL:*/{Environment.NewLine}{sql}"));
        // NOTE(review): the temp table substitution happens AFTER the SQL is logged above, so the logged
        // SQL shows the original cohort table name rather than the temp table actually queried
        if (UseTempTablesWhenExtractingCohort && _uuid is not null)
        {
            sql = sql.Replace(_externalCohortTable.TableName, _uuid);
        }
        return sql;
    }
    /// <summary>
    /// Decides whether the extraction SQL should have its JOINs replaced with HASH JOINs.  Only ever true
    /// for Microsoft Sql Server, and never for Catalogues on the <see cref="DoNotUseHashJoinsForCatalogues"/>
    /// exclusion list (which takes precedence over all opt-ins).
    /// </summary>
    private bool ShouldUseHashedJoins()
    {
        //must be sql server (a null DBMS also fails this pattern)
        if (Request?.QueryBuilder?.QuerySyntaxHelper?.DatabaseType is not DatabaseType.MicrosoftSQLServer)
            return false;

        //this Catalogue is explicitly marked as never hash join? i.e. its on the exclusion list
        if (DoNotUseHashJoinsForCatalogues?.Contains(Request.Catalogue) == true)
            return false;

        //global opt-in wins next
        if (UseHashJoins)
            return true;

        //otherwise only if this specific Catalogue is on the inclusion list
        return UseHashJoinsForCatalogues?.Contains(Request.Catalogue) ?? false;
    }
    /// <summary>
    /// Extension point allowing subclasses to adjust the extraction SQL before it is executed.
    /// This base implementation returns <paramref name="sql"/> unchanged.
    /// </summary>
    public virtual string HackExtractionSQL(string sql, IDataLoadEventListener listener) => sql;
    /// <summary>
    /// Sets up <see cref="ExtractDatasetCommand.CumulativeExtractionResults"/> for this run.  On a batch
    /// resume the existing audit record is located and reused; otherwise all previous audit records for the
    /// configuration/dataset pair are deleted and a fresh one is created recording <paramref name="sql"/>
    /// and the names of every filter applied.
    /// </summary>
    /// <param name="sql">The extraction SQL being executed (stored in the audit record).</param>
    private void StartAudit(string sql)
    {
        var dataExportRepo = Request.DataExportRepository;

        var previousAudit = dataExportRepo
            .GetAllCumulativeExtractionResultsFor(Request.Configuration, Request.DatasetBundle.DataSet).ToArray();

        if (Request.IsBatchResume)
        {
            //resuming: the previous audit record must exist and is carried forward
            var match =
                previousAudit.FirstOrDefault(a => a.ExtractableDataSet_ID == Request.DatasetBundle.DataSet.ID) ??
                throw new Exception(
                    $"Could not find previous CumulativeExtractionResults for dataset {Request.DatasetBundle.DataSet} despite the Request being marked as a batch resume");
            Request.CumulativeExtractionResults = match;
        }
        else
        {
            //delete old audit records
            foreach (var audit in previousAudit)
                audit.DeleteInDatabase();

            var extractionResults = new CumulativeExtractionResults(dataExportRepo, Request.Configuration,
                Request.DatasetBundle.DataSet, sql);

            var filterDescriptions =
                RecursivelyListAllFilterNames(
                    Request.Configuration.GetFilterContainerFor(Request.DatasetBundle.DataSet));

            //RecursivelyListAllFilterNames returns a trailing comma which is trimmed here
            extractionResults.FiltersUsed = filterDescriptions.TrimEnd(',');
            extractionResults.SaveToDatabase();

            Request.CumulativeExtractionResults = extractionResults;
        }
    }
    /// <summary>
    /// Deletes any previous globals-extraction audit records for the current configuration so that a fresh
    /// audit can be written for this run.
    /// </summary>
    private void StartAuditGlobals()
    {
        var repo = GlobalsRequest.RepositoryLocator.DataExportRepository;

        //globals audits are the SupplementalExtractionResults with no parent CumulativeExtractionResults
        var staleAudits = repo
            .GetAllObjectsWhere<SupplementalExtractionResults>("ExtractionConfiguration_ID",
                GlobalsRequest.Configuration.ID)
            .Where(static c => c.CumulativeExtractionResults_ID == null);

        //delete old audit records
        foreach (var staleAudit in staleAudits)
            staleAudit.DeleteInDatabase();
    }
    /// <summary>
    /// Returns a comma separated list (with trailing comma) of the names of every filter in
    /// <paramref name="filterContainer"/> and all of its subcontainers, or "" when the container is null.
    /// </summary>
    /// <param name="filterContainer">Root container to walk (may be null).</param>
    private string RecursivelyListAllFilterNames(IContainer filterContainer)
    {
        if (filterContainer == null)
            return "";

        var toReturn = "";

        // PERF: GetSubContainers()/GetFilters() were previously each called twice (once for the null
        // check, once for enumeration), doubling any repository round trips they perform
        var subContainers = filterContainer.GetSubContainers();
        if (subContainers != null)
            foreach (var subContainer in subContainers)
                toReturn += RecursivelyListAllFilterNames(subContainer);

        var filters = filterContainer.GetFilters();
        if (filters != null)
            foreach (var f in filters)
                toReturn += $"{f.Name},";

        return toReturn;
    }
    /// <summary>
    /// Releases the connection opened in GetChunk for the cohort temp table (when
    /// <see cref="UseTempTablesWhenExtractingCohort"/> was used); otherwise a no-op.
    /// </summary>
    public virtual void Dispose(IDataLoadEventListener job, Exception pipelineFailureExceptionIfAny)
    {
        // BUGFIX: _con (opened in GetChunk when UseTempTablesWhenExtractingCohort is set) was never closed,
        // leaking the connection (and keeping the temp table alive) after the extraction finished
        _con?.Dispose();
        _con = null;
    }
    /// <summary>
    /// No-op: cancellation is instead handled via the GracefulCancellationToken passed to <see cref="GetChunk"/>.
    /// </summary>
    public void Abort(IDataLoadEventListener listener)
    {
    }
    /// <summary>
    /// Fetches up to 1000 rows of the extraction query as a preview DataTable.  Returns an empty DataTable
    /// when the Request is <see cref="ExtractDatasetCommand.EmptyCommand"/>.
    /// </summary>
    public virtual DataTable TryGetPreview()
    {
        if (Request == ExtractDatasetCommand.EmptyCommand)
            return new DataTable();

        var toReturn = new DataTable();
        toReturn.BeginLoadData();
        var server = _catalogue.GetDistinctLiveDatabaseServer(DataAccessContext.DataExport, false);

        using var con = server.GetConnection();
        con.Open();

        // BUGFIX: DbDataAdapter is IDisposable (it owns a command) and was previously never disposed
        using var da = server.GetDataAdapter(Request.QueryBuilder.SQL, con);

        //get up to 1000 records
        da.Fill(0, 1000, toReturn);
        toReturn.EndLoadData();

        con.Close();

        return toReturn;
    }
    /// <summary>
    /// Pipeline requirement hook: routes the incoming command to the matching Initialize overload
    /// (dataset extraction or globals extraction).
    /// </summary>
    public void PreInitialize(IExtractCommand value, IDataLoadEventListener listener)
    {
        switch (value)
        {
            case ExtractDatasetCommand datasetCommand:
                Initialize(datasetCommand);
                break;
            case ExtractGlobalsCommand globalsCommand:
                Initialize(globalsCommand);
                break;
        }
    }
    /// <summary>
    /// Pre-flight checks for this source.  Warns (and skips checking) for the EmptyCommand, reports success
    /// for globals requests (nothing to check at source) and fails when no request has been set at all.
    /// </summary>
    public virtual void Check(ICheckNotifier notifier)
    {
        if (Request == ExtractDatasetCommand.EmptyCommand)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Request is ExtractDatasetCommand.EmptyCommand, checking will not be carried out",
                CheckResult.Warning));
            return;
        }

        if (GlobalsRequest != null)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Request is for Globals, checking will not be carried out at source", CheckResult.Success));
            return;
        }

        //checked last deliberately: a null Request is fine when GlobalsRequest was set above
        if (Request == null)
        {
            notifier.OnCheckPerformed(new CheckEventArgs("ExtractionRequest has not been set", CheckResult.Fail));
            return;
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc