• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 13235786180

10 Feb 2025 07:36AM UTC coverage: 57.403% (-0.05%) from 57.451%
13235786180

push

github

web-flow
Merge branch 'develop' into task/RDMP-265-version-data-loads

11339 of 21302 branches covered (53.23%)

Branch coverage included in aggregate %.

20 of 84 new or added lines in 3 files covered. (23.81%)

2 existing lines in 1 file now uncovered.

32238 of 54612 relevant lines covered (59.03%)

17062.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.88
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Sources/ExecuteDatasetExtractionSource.cs
1
// Copyright (c) The University of Dundee 2018-2025
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data;
10
using System.Data.Common;
11
using System.Diagnostics;
12
using System.Linq;
13
using System.Threading.Tasks;
14
using FAnsi;
15
using FAnsi.Discovery.QuerySyntax;
16
using Rdmp.Core.Curation.Data;
17
using Rdmp.Core.DataExport.Data;
18
using Rdmp.Core.DataExport.DataExtraction.Commands;
19
using Rdmp.Core.DataFlowPipeline;
20
using Rdmp.Core.DataFlowPipeline.Requirements;
21
using Rdmp.Core.DataLoad.Engine.Pipeline.Components;
22
using Rdmp.Core.DataLoad.Engine.Pipeline.Sources;
23
using Rdmp.Core.QueryBuilding;
24
using Rdmp.Core.ReusableLibraryCode;
25
using Rdmp.Core.ReusableLibraryCode.Checks;
26
using Rdmp.Core.ReusableLibraryCode.DataAccess;
27
using Rdmp.Core.ReusableLibraryCode.Progress;
28
using IContainer = Rdmp.Core.Curation.Data.IContainer;
29

30
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Sources;
31

32
/// <summary>
33
/// Executes a single Dataset extraction by linking a cohort with a dataset (either core or custom data - See IExtractCommand).  Also calculates the number
34
/// of unique identifiers seen, records row validation failures etc.
35
/// </summary>
36
public class ExecuteDatasetExtractionSource : IPluginDataFlowSource<DataTable>, IPipelineRequirement<IExtractCommand>
{
    // Exactly one of these is populated by PreInitialize depending on whether we are
    // extracting a dataset or the extraction configuration's global artefacts
    public ExtractDatasetCommand Request { get; protected set; }
    public ExtractGlobalsCommand GlobalsRequest { get; protected set; }

    public const string AuditTaskName = "DataExtraction";

    // Runtime names of the release identifier columns in the extraction query (populated in Initialize)
    private readonly List<string> _extractionIdentifiersidx = new();

    // Set when a TaskCanceledException is observed during chunk reading; subsequent GetChunk calls throw
    private bool _cancel;

    private ICatalogue _catalogue;

    protected const string ValidationColumnName = "RowValidationResult";

    public ExtractionTimeValidator ExtractionTimeValidator { get; protected set; }
    public Exception ValidationFailureException { get; protected set; }

    public HashSet<object> UniqueReleaseIdentifiersEncountered { get; set; }

    public ExtractionTimeTimeCoverageAggregator ExtractionTimeTimeCoverageAggregator { get; set; }

    [DemandsInitialization(
        "Determines the systems behaviour when an extraction query returns 0 rows.  Default (false) is that an error is reported.  If set to true (ticked) then instead a DataTable with 0 rows but all the correct headers will be generated usually resulting in a headers only 0 line/empty extract file")]
    public bool AllowEmptyExtractions { get; set; }

    [DemandsInitialization(
        "Batch size, number of records to read from source before releasing it into the extraction pipeline",
        DefaultValue = 10000, Mandatory = true)]
    public int BatchSize { get; set; }

    [DemandsInitialization(
        "In seconds. Overrides the global timeout for SQL query execution. Use 0 for infinite timeout.",
        DefaultValue = 50000, Mandatory = true)]
    public int ExecutionTimeout { get; set; }

    [DemandsInitialization(@"Determines how the system achieves DISTINCT on extraction.  These include:
None - Do not DISTINCT the records, can result in duplication in your extract (not recommended)
SqlDistinct - Adds the DISTINCT keyword to the SELECT sql sent to the server
OrderByAndDistinctInMemory - Adds an ORDER BY statement to the query and applies the DISTINCT in memory as records are read from the server (this can help when extracting very large data sets where DISTINCT keyword blocks record streaming until all records are ready to go)"
        , DefaultValue = DistinctStrategy.SqlDistinct)]
    public DistinctStrategy DistinctStrategy { get; set; }


    [DemandsInitialization("When DBMS is SqlServer then HASH JOIN should be used instead of regular JOINs")]
    public bool UseHashJoins { get; set; }

    [DemandsInitialization(
        "When DBMS is SqlServer and the extraction is for any of these datasets then HASH JOIN should be used instead of regular JOINs")]
    public Catalogue[] UseHashJoinsForCatalogues { get; set; }

    [DemandsInitialization(
        "Exclusion list.  A collection of Catalogues which will never be considered for HASH JOIN even when UseHashJoins is enabled.  Being on this list takes precedence for a Catalogue even if it is on UseHashJoinsForCatalogues.")]
    public Catalogue[] DoNotUseHashJoinsForCatalogues { get; set; }

    // Typo fix: "extracton" -> "extraction" in the user-visible help text
    [DemandsInitialization("When performing an extraction, copy the cohort into a temporary table to improve extraction speed", defaultValue: false)]
    public bool UseTempTablesWhenExtractingCohort { get; set; }


    /// <summary>
    /// This is a dictionary containing all the CatalogueItems used in the query, the underlying datatype in the origin database and the
    /// actual datatype that was output after the transform operation e.g. a varchar(10) could be converted into a bona fide DateTime which
    /// would be an sql Date.  Finally
    /// a recommended SqlDbType is passed back.
    /// </summary>
    public Dictionary<ExtractableColumn, ExtractTimeTransformationObserved> ExtractTimeTransformationsObserved;

    private DbDataCommandDataFlowSource _hostedSource;

    // State captured in Initialize / the temp-table path of GetChunk
    private IExternalCohortTable _externalCohortTable;
    private string _whereSQL;
    private DbConnection _con;
    // Name of the temporary cohort table (null when none was created / creation failed)
    private string _uuid;
110
    /// <summary>
    /// Captures the dataset extraction <paramref name="request"/> and prepares per-extraction state:
    /// cohort linkage details, timers, the query builder (if not already cached), release identifier
    /// column names, validation and (optionally) time coverage aggregation.
    /// No-op beyond storing the request when it is <see cref="ExtractDatasetCommand.EmptyCommand"/>.
    /// </summary>
    /// <param name="request">The dataset extraction command to service in subsequent GetChunk calls</param>
    protected virtual void Initialize(ExtractDatasetCommand request)
    {
        Request = request;

        // EmptyCommand is a placeholder (e.g. during checking); nothing further to set up
        if (request == ExtractDatasetCommand.EmptyCommand)
            return;
        _externalCohortTable = request.ExtractableCohort.ExternalCohortTable;
        _whereSQL = request.ExtractableCohort.WhereSQL();
        _timeSpentValidating = new Stopwatch();
        _timeSpentCalculatingDISTINCT = new Stopwatch();
        _timeSpentBuckettingDates = new Stopwatch();

        Request.ColumnsToExtract.Sort(); //ensure they are in the right order so we can record the release identifiers

        //if we have a cached builder already
        if (request.QueryBuilder == null)
            request.GenerateQueryBuilder();

        // record the runtime column names of the release identifier substitutions (e.g. PROCHI)
        foreach (var substitution in Request.ReleaseIdentifierSubstitutions)
            _extractionIdentifiersidx.Add(substitution.GetRuntimeName());

        UniqueReleaseIdentifiersEncountered = new HashSet<object>();

        _catalogue = request.Catalogue;

        // only build a validator when the Catalogue actually declares validation rules
        if (!string.IsNullOrWhiteSpace(_catalogue.ValidatorXML))
            ExtractionTimeValidator = new ExtractionTimeValidator(_catalogue, request.ColumnsToExtract);

        //if there is a time periodicity ExtractionInformation (AND! it is among the columns the user selected to be extracted)
        if (_catalogue.TimeCoverage_ExtractionInformation_ID != null && request.ColumnsToExtract
                .Cast<ExtractableColumn>().Any(c =>
                    c.CatalogueExtractionInformation_ID == _catalogue.TimeCoverage_ExtractionInformation_ID))
            ExtractionTimeTimeCoverageAggregator =
                new ExtractionTimeTimeCoverageAggregator(_catalogue, request.ExtractableCohort);
        else
            ExtractionTimeTimeCoverageAggregator = null;
    }
147

148
    /// <summary>
    /// Records the globals extraction request; a non-null <see cref="GlobalsRequest"/> switches
    /// <see cref="GetChunk"/> into its globals-marker behaviour.
    /// </summary>
    /// <param name="request">The globals extraction command</param>
    private void Initialize(ExtractGlobalsCommand request) => GlobalsRequest = request;
152

153
    // True once a TaskCanceledException has been observed while reading chunks
    public bool WasCancelled => _cancel;

    // Timer/counter pairs used purely for progress reporting
    private Stopwatch _timeSpentValidating;
    private int _rowsValidated;

    private Stopwatch _timeSpentCalculatingDISTINCT;
    private Stopwatch _timeSpentBuckettingDates;
    private int _rowsBucketted;

    // First-call flags: the first chunk triggers auditing / empty-result checks
    private bool firstChunk = true;
    private bool firstGlobalChunk = true;
    private int _rowsRead;

    // Holds rows read ahead of the current batch boundary for OrderByAndDistinctInMemory
    private RowPeeker _peeker = new();
167

NEW
168
    /// <summary>
    /// Generates a random string of <paramref name="length"/> characters drawn from uppercase letters
    /// and digits; used to build a unique temporary cohort table name.
    /// Uses <see cref="Random.Shared"/> (thread-safe, .NET 6+) instead of a hand-rolled static Random.
    /// </summary>
    /// <param name="length">Number of characters to generate</param>
    private static string RandomString(int length)
    {
        const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        return new string(Enumerable.Repeat(chars, length)
            .Select(s => s[Random.Shared.Next(s.Length)]).ToArray());
    }
176

177
    /// <summary>
    /// Copies the cohort (restricted by <see cref="_whereSQL"/>) into a DBMS-specific temporary table on
    /// <paramref name="con"/> so extraction queries can join against the (smaller) temp table instead of the
    /// full cohort table.  On success <see cref="_uuid"/> holds the temp table name; on any failure it is
    /// set to null so <see cref="GetCommandSQL"/> leaves the original cohort table in the query.
    /// </summary>
    /// <param name="con">An open connection on which to create the table (temp tables are connection scoped)</param>
    /// <param name="listener">Informed of progress and of fallback to the original cohort table</param>
    private void CreateCohortTempTable(DbConnection con, IDataLoadEventListener listener)
    {
        // NOTE(review): the '#' prefix is SQL Server temp-table syntax; an unquoted identifier starting
        // with '#' may not be legal on MySql/Oracle/PostgreSql - confirm against those DBMS
        _uuid = $"#{RandomString(24)}";
        var sql = "";
        var db = _externalCohortTable.Discover();
        switch (db.Server.DatabaseType)
        {
            case DatabaseType.MicrosoftSQLServer:
                sql = $"""
                    SELECT *
                    INTO {_uuid}
                    FROM(
                    SELECT * FROM {_externalCohortTable.TableName}
                    WHERE {_whereSQL}
                    ) as cohortTempTable
                """;
                break;
            case DatabaseType.MySql:
                sql = $"""
                    CREATE TEMPORARY TABLE {_uuid} ENGINE=MEMORY
                    as (SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL})
                """;
                break;
            case DatabaseType.Oracle:
                sql = $"""
                    CREATE TEMPORARY TABLE {_uuid} SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL}
                """;
                break;
            case DatabaseType.PostgreSql:
                sql = $"""
                    CREATE TEMP TABLE {_uuid} AS
                    SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL}
                """;
                break;
            default:
                // BUGFIX: clear _uuid before bailing out, otherwise GetCommandSQL would substitute
                // a temp table name that was never actually created
                _uuid = null;
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, "Unable to create temporary table for cohort. Original cohort table will be used"));
                return;
        }
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"About to copy the cohort into a temporary table using the SQL: {sql}"));

        using var cmd = db.Server.GetCommand(sql, con);
        cmd.CommandTimeout = ExecutionTimeout;
        try
        {
            cmd.ExecuteNonQuery();
        }
        catch (Exception ex)
        {
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, "Unable to create temporary table for cohort. Original cohort table will be used", ex));
            _uuid = null;
            // BUGFIX: return here so we do not report success after a failure
            return;
        }

        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, "Cohort successfully copied to temporary table"));
    }
232

233
    /// <summary>
    /// Reads the next batch of extraction data.  For a globals request returns a single empty marker table
    /// then null.  For a dataset request: builds the query and audit on first call, streams batches from the
    /// server, optionally de-duplicates in memory, validates rows, aggregates time coverage and tracks the
    /// distinct release identifiers seen.  Returns null when the data is exhausted.
    /// </summary>
    /// <param name="listener">Receives progress and error notifications</param>
    /// <param name="cancellationToken">Checked after each read; cancellation aborts with an Exception</param>
    /// <returns>The next populated chunk, or null when exhausted (or on the second+ globals call)</returns>
    public virtual DataTable GetChunk(IDataLoadEventListener listener, GracefulCancellationToken cancellationToken)
    {
        // we are in the Global Commands case, let's return an empty DataTable (not null)
        // so we can trigger the destination to extract the globals docs and sql
        if (GlobalsRequest != null)
        {
            GlobalsRequest.ElevateState(ExtractCommandState.WaitingForSQLServer);
            if (firstGlobalChunk)
            {
                //unless we are checking, start auditing
                StartAuditGlobals();

                firstGlobalChunk = false;
                return new DataTable(ExtractionDirectory.GLOBALS_DATA_NAME);
            }

            return null;
        }

        if (Request == null)
            throw new Exception("Component has not been initialized before being asked to GetChunk(s)");

        Request.ElevateState(ExtractCommandState.WaitingForSQLServer);

        if (_cancel)
            throw new Exception("User cancelled data extraction");

        // First call: build the SQL, start the audit and create the streaming source
        if (_hostedSource == null)
        {
            if (UseTempTablesWhenExtractingCohort)
            {
                // temp tables are connection scoped so keep this connection open for the whole extraction
                _con = DatabaseCommandHelper.GetConnection(Request.GetDistinctLiveDatabaseServer().Builder);
                _con.Open();
                CreateCohortTempTable(_con, listener);
            }
            var cmdSql = GetCommandSQL(listener);
            StartAudit(cmdSql);

            if (Request.DatasetBundle.DataSet.DisableExtraction)
                throw new Exception(
                    $"Cannot extract {Request.DatasetBundle.DataSet} because DisableExtraction is set to true");

            // BUGFIX: the object initializer previously applied only to the second branch of the ternary,
            // so the temp-table path ran with default BatchSize and AllowEmptyResultSets
            _hostedSource = UseTempTablesWhenExtractingCohort
                ? new DbDataCommandDataFlowSource(cmdSql,
                    $"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}",
                    _con,
                    ExecutionTimeout)
                {
                    // If we are running in batches then always allow empty extractions
                    AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume,
                    BatchSize = BatchSize
                }
                : new DbDataCommandDataFlowSource(cmdSql,
                    $"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}",
                    Request.GetDistinctLiveDatabaseServer().Builder,
                    ExecutionTimeout)
                {
                    // If we are running in batches then always allow empty extractions
                    AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume,
                    BatchSize = BatchSize
                };
        }

        DataTable chunk = null;

        try
        {
            chunk = _hostedSource.GetChunk(listener, cancellationToken);

            // prepend any rows read ahead of the previous batch boundary
            chunk = _peeker.AddPeekedRowsIfAny(chunk);

            if (Request != null && Request.DatasetBundle.DataSet is not null && chunk is not null)
                chunk.TableName = $"{Request.DatasetBundle.DataSet}";

            //if we are trying to distinct the records in memory based on release id
            if (DistinctStrategy == DistinctStrategy.OrderByAndDistinctInMemory)
            {
                var releaseIdentifierColumn = Request.ReleaseIdentifierSubstitutions.First().GetRuntimeName();

                if (chunk is { Rows.Count: > 0 })
                {
                    //last release id in the current chunk
                    var lastReleaseId = chunk.Rows[^1][releaseIdentifierColumn];

                    // keep reading until the release id changes so one person's records never span two batches
                    _peeker.AddWhile(_hostedSource, r => Equals(r[releaseIdentifierColumn], lastReleaseId), chunk);
                    chunk = MakeDistinct(chunk, listener, cancellationToken);
                }
            }
        }
        catch (AggregateException a)
        {
            if (a.GetExceptionIfExists<TaskCanceledException>() != null)
                _cancel = true;

            throw;
        }
        catch (Exception e)
        {
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, "Read from source failed", e));
        }

        if (cancellationToken.IsCancellationRequested)
            throw new Exception("Data read cancelled because our cancellationToken was set, aborting data reading");

        //if the first chunk is null
        if (firstChunk && chunk == null && !AllowEmptyExtractions)
            throw new Exception(
                $"There is no data to load, query returned no rows, query was:{Environment.NewLine}{_hostedSource.Sql ?? Request.QueryBuilder.SQL}");

        //not the first chunk anymore
        firstChunk = false;

        //data exhausted
        if (chunk == null)
        {
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                $"Data exhausted after reading {_rowsRead} rows of data ({UniqueReleaseIdentifiersEncountered.Count} unique release identifiers seen)"));
            if (Request != null)
                Request.CumulativeExtractionResults.DistinctReleaseIdentifiersEncountered =
                    Request.IsBatchResume ? -1 : UniqueReleaseIdentifiersEncountered.Count;
            return null;
        }

        _rowsRead += chunk.Rows.Count;
        //chunk will have datatypes for all the things in the buffer so we can populate our dictionary of facts about what columns/catalogue items have spontaneously changed name/type etc
        if (ExtractTimeTransformationsObserved == null)
            GenerateExtractionTransformObservations(chunk);


        //see if the SqlDataReader has a column with the same name as the ReleaseIdentifierSQL (if so then we can use it to count the number of distinct subjects written out to the csv)
        var includesReleaseIdentifier = _extractionIdentifiersidx.Count > 0;


        _timeSpentValidating.Start();
        //build up the validation report (Missing/Wrong/Etc) - this has no mechanical effect on the extracted data just some metadata that goes into a flat file
        if (ExtractionTimeValidator != null && Request.IncludeValidation)
            try
            {
                chunk.Columns.Add(ValidationColumnName);

                ExtractionTimeValidator.Validate(chunk, ValidationColumnName);

                _rowsValidated += chunk.Rows.Count;
                listener.OnProgress(this,
                    new ProgressEventArgs("Validation", new ProgressMeasurement(_rowsValidated, ProgressType.Records),
                        _timeSpentValidating.Elapsed));
            }
            catch (Exception ex)
            {
                // validation failure is non-fatal: record it and stop validating further chunks
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Error, "Could not validate data chunk", ex));
                ValidationFailureException = ex;
                ExtractionTimeValidator = null;
            }

        _timeSpentValidating.Stop();

        _timeSpentBuckettingDates.Start();
        if (ExtractionTimeTimeCoverageAggregator != null)
        {
            _rowsBucketted += chunk.Rows.Count;

            foreach (DataRow row in chunk.Rows)
                ExtractionTimeTimeCoverageAggregator.ProcessRow(row);

            // BUGFIX: was reporting _timeSpentCalculatingDISTINCT.Elapsed for the date bucketting progress
            listener.OnProgress(this,
                new ProgressEventArgs("Bucketting Dates", new ProgressMeasurement(_rowsBucketted, ProgressType.Records),
                    _timeSpentBuckettingDates.Elapsed));
        }

        _timeSpentBuckettingDates.Stop();

        _timeSpentCalculatingDISTINCT.Start();
        var pks = new List<DataColumn>();

        //record unique release identifiers found
        if (includesReleaseIdentifier)
            foreach (var idx in _extractionIdentifiersidx.Distinct().ToList())
            {
                var sub = Request.ReleaseIdentifierSubstitutions.FirstOrDefault(s => s.Alias == chunk.Columns[idx].ColumnName);
                if (sub?.ColumnInfo.ExtractionInformations.FirstOrDefault()?.IsPrimaryKey == true)
                {
                    pks.Add(chunk.Columns[idx]);
                }

                foreach (DataRow r in chunk.Rows)
                {
                    if (r[idx] == DBNull.Value)
                        if (_extractionIdentifiersidx.Count == 1)
                            throw new Exception(
                                $"Null release identifier found in extract of dataset {Request.DatasetBundle.DataSet}");
                        else
                            continue; //there are multiple extraction identifiers thats fine if one or two are null

                    UniqueReleaseIdentifiersEncountered.Add(r[idx]);
                }

                listener.OnProgress(this,
                    new ProgressEventArgs("Calculating Distinct Release Identifiers",
                        new ProgressMeasurement(UniqueReleaseIdentifiersEncountered.Count, ProgressType.Records),
                        _timeSpentCalculatingDISTINCT.Elapsed));
            }

        _timeSpentCalculatingDISTINCT.Stop();
        // any column marked IsPrimaryKey in the Catalogue also becomes part of the chunk's primary key
        pks.AddRange(Request.ColumnsToExtract.Where(static c => ((ExtractableColumn)c).CatalogueExtractionInformation.IsPrimaryKey).Select(static column => ((ExtractableColumn)column).CatalogueExtractionInformation.ToString()).Select(name => chunk.Columns[name]));
        chunk.PrimaryKey = pks.ToArray();

        return chunk;
    }
440

441
    /// <summary>
    /// Makes the current batch ONLY distinct.  This only works if you have a bounded batch (see OrderByAndDistinctInMemory)
    /// </summary>
    /// <param name="chunk">The batch to de-duplicate</param>
    /// <param name="listener">Receives progress notifications from the de-duplication component</param>
    /// <param name="cancellationToken">Forwarded to the de-duplication component</param>
    /// <returns>The batch with exact duplicate rows removed</returns>
    private static DataTable MakeDistinct(DataTable chunk, IDataLoadEventListener listener,
        GracefulCancellationToken cancellationToken) =>
        new RemoveDuplicates { NoLogging = true }.ProcessPipelineData(chunk, listener, cancellationToken);
454

455
    /// <summary>
    /// Populates <see cref="ExtractTimeTransformationsObserved"/> with one record per extractable column,
    /// recording the data type the Catalogue declares alongside the data type actually observed in the
    /// streamed <paramref name="chunk"/> buffer.
    /// </summary>
    /// <param name="chunk">The first batch read, whose column types reflect the runtime buffer</param>
    private void GenerateExtractionTransformObservations(DataTable chunk)
    {
        ExtractTimeTransformationsObserved = new Dictionary<ExtractableColumn, ExtractTimeTransformationObserved>();

        //create the Types dictionary
        foreach (ExtractableColumn column in Request.ColumnsToExtract)
        {
            var observed = new ExtractTimeTransformationObserved();
            ExtractTimeTransformationsObserved.Add(column, observed);

            // skip columns whose original ExtractionInformation no longer exists in the Catalogue
            if (column.HasOriginalExtractionInformationVanished())
                continue;

            var extractionInformation = column.CatalogueExtractionInformation;

            //what the catalogue says it is
            observed.DataTypeInCatalogue = extractionInformation.ColumnInfo.Data_type;
            observed.CatalogueItem = extractionInformation.CatalogueItem;

            //what it actually is in the buffer
            if (chunk.Columns.Contains(column.GetRuntimeName()))
            {
                observed.FoundAtExtractTime = true;
                observed.DataTypeObservedInRuntimeBuffer = chunk.Columns[column.GetRuntimeName()].DataType;
            }
            else
            {
                observed.FoundAtExtractTime = false;
            }
        }
    }
488

489
    /// <summary>
    /// Produces the final extraction SQL: applies the configured <see cref="DistinctStrategy"/> to the
    /// query builder, lets subclasses rewrite the SQL via <see cref="HackExtractionSQL"/>, optionally
    /// substitutes HASH JOINs (SqlServer only) and finally swaps the cohort table for the temporary
    /// cohort table when one was created.
    /// </summary>
    /// <param name="listener">Informed of the decided SQL and any JOIN substitution</param>
    /// <returns>The SQL to execute against the live server</returns>
    private string GetCommandSQL(IDataLoadEventListener listener)
    {
        //if the user wants some custom logic for removing identical duplicates
        switch (DistinctStrategy)
        {
            //user doesn't care about identical duplicates
            case DistinctStrategy.None:
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");
                break;

            //system default behaviour
            case DistinctStrategy.SqlDistinct:
                break;

            //user wants to run order by the release ID and resolve duplicates in batches as they are read
            case DistinctStrategy.OrderByAndDistinctInMemory:

                //remove the DISTINCT keyword from the query
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");

                //find the release identifier substitution (e.g. chi for PROCHI)
                var substitution = Request.ReleaseIdentifierSubstitutions.First();

                //add a line at the end of the query to ORDER BY the ReleaseId column (e.g. PROCHI)
                var orderBySql = $"ORDER BY {substitution.SelectSQL}";

                // don't add the line if it is already there (e.g. because of Retry)
                if (!Request.QueryBuilder.CustomLines.Any(l => string.Equals(l.Text, orderBySql)))
                    Request.QueryBuilder.AddCustomLine(orderBySql, QueryComponent.Postfix);

                break;
            default:
                throw new ArgumentOutOfRangeException();
        }

        var sql = Request.QueryBuilder.SQL;

        // subclass extension point - base implementation returns sql unchanged
        sql = HackExtractionSQL(sql, listener);

        if (ShouldUseHashedJoins())
        {
            //use hash joins!
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information, "Substituting JOIN for HASH JOIN"));
            sql = sql.Replace(" JOIN ", " HASH JOIN ");
        }

        // NOTE: logged before the temp-table substitution below, so the log shows the original cohort table name
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
            $"/*Decided on extraction SQL:*/{Environment.NewLine}{sql}"));
        // _uuid is null when temp table creation failed or the DBMS was unsupported - fall back to the real table
        if (UseTempTablesWhenExtractingCohort && _uuid is not null)
        {
            sql = sql.Replace(_externalCohortTable.TableName, _uuid);
        }
        return sql;
    }
544

545
    /// <summary>
    /// Decides whether the extraction SQL should have its JOINs rewritten as HASH JOINs.  Only ever true
    /// on Microsoft SQL Server; controlled by <see cref="UseHashJoins"/> and
    /// <see cref="UseHashJoinsForCatalogues"/>, with <see cref="DoNotUseHashJoinsForCatalogues"/>
    /// acting as an exclusion list that always takes precedence.
    /// </summary>
    private bool ShouldUseHashedJoins()
    {
        // HASH JOIN is SqlServer-specific syntax (null database type also falls out here)
        if (Request?.QueryBuilder?.QuerySyntaxHelper?.DatabaseType is not DatabaseType.MicrosoftSQLServer)
            return false;

        // this Catalogue is explicitly marked as never hash join, i.e. it is on the exclusion list
        if (DoNotUseHashJoinsForCatalogues?.Contains(Request.Catalogue) == true)
            return false;

        // globally enabled for all datasets
        if (UseHashJoins)
            return true;

        // otherwise only if this specific Catalogue is opted in (false when no list / not listed)
        return UseHashJoinsForCatalogues?.Contains(Request.Catalogue) ?? false;
    }
566

567
    /// <summary>
    /// Extension point allowing subclasses to rewrite the extraction SQL before execution;
    /// the base implementation returns <paramref name="sql"/> unchanged.
    /// </summary>
    public virtual string HackExtractionSQL(string sql, IDataLoadEventListener listener) => sql;
568

569
    /// <summary>
    /// Creates (or, for a batch resume, locates) the <see cref="CumulativeExtractionResults"/> audit record
    /// for this dataset extraction.  A fresh run deletes any previous audit records and stores the executed
    /// <paramref name="sql"/> and the names of all filters applied.
    /// </summary>
    /// <param name="sql">The extraction SQL being executed, recorded in the audit</param>
    /// <exception cref="Exception">If a batch resume is requested but no previous audit record exists</exception>
    private void StartAudit(string sql)
    {
        var dataExportRepo = Request.DataExportRepository;

        var previousAudit = dataExportRepo
            .GetAllCumulativeExtractionResultsFor(Request.Configuration, Request.DatasetBundle.DataSet).ToArray();

        if (Request.IsBatchResume)
        {
            // resuming: reuse the existing audit record rather than starting a new one
            var match =
                previousAudit.FirstOrDefault(a => a.ExtractableDataSet_ID == Request.DatasetBundle.DataSet.ID) ??
                throw new Exception(
                    $"Could not find previous CumulativeExtractionResults for dataset {Request.DatasetBundle.DataSet} despite the Request being marked as a batch resume");
            Request.CumulativeExtractionResults = match;
        }
        else
        {
            //delete old audit records
            foreach (var audit in previousAudit)
                audit.DeleteInDatabase();

            var extractionResults = new CumulativeExtractionResults(dataExportRepo, Request.Configuration,
                Request.DatasetBundle.DataSet, sql);

            // record all filter names (comma separated) applied to this dataset's extraction
            var filterDescriptions =
                RecursivelyListAllFilterNames(
                    Request.Configuration.GetFilterContainerFor(Request.DatasetBundle.DataSet));

            extractionResults.FiltersUsed = filterDescriptions.TrimEnd(',');
            extractionResults.SaveToDatabase();

            Request.CumulativeExtractionResults = extractionResults;
        }
    }
603

604
    /// <summary>
    /// Deletes stale audit records for globals extraction (SupplementalExtractionResults belonging to this
    /// ExtractionConfiguration that are not tied to any dataset's CumulativeExtractionResults).
    /// </summary>
    private void StartAuditGlobals()
    {
        var repo = GlobalsRequest.RepositoryLocator.DataExportRepository;

        // globals audit records are those with no CumulativeExtractionResults (i.e. not dataset specific)
        var staleAudits = repo
            .GetAllObjectsWhere<SupplementalExtractionResults>("ExtractionConfiguration_ID",
                GlobalsRequest.Configuration.ID)
            .Where(c => c.CumulativeExtractionResults_ID == null)
            .ToList();

        //delete old audit records
        staleAudits.ForEach(a => a.DeleteInDatabase());
    }
617

618
    /// <summary>
    /// Recursively walks <paramref name="filterContainer"/> and its subcontainers, returning the names of
    /// all filters found as a comma separated string (with a trailing comma; callers TrimEnd it).
    /// Returns "" for a null container.
    /// </summary>
    /// <param name="filterContainer">Root filter container, may be null</param>
    private string RecursivelyListAllFilterNames(IContainer filterContainer)
    {
        if (filterContainer == null)
            return "";

        var toReturn = "";

        // capture each collection once - the originals called GetSubContainers()/GetFilters() twice,
        // doubling whatever lookup work those calls perform
        var subContainers = filterContainer.GetSubContainers();
        if (subContainers != null)
            foreach (var subContainer in subContainers)
                toReturn += RecursivelyListAllFilterNames(subContainer);

        var filters = filterContainer.GetFilters();
        if (filters != null)
            foreach (var f in filters)
                toReturn += $"{f.Name},";

        return toReturn;
    }
635

636
    /// <summary>
    /// Releases the dedicated connection opened for the cohort temp table (if any); previously this was a
    /// no-op and the connection opened in GetChunk's UseTempTablesWhenExtractingCohort path leaked.
    /// Closing the connection also drops the connection-scoped temporary table.
    /// </summary>
    public virtual void Dispose(IDataLoadEventListener job, Exception pipelineFailureExceptionIfAny)
    {
        _con?.Dispose();
        _con = null;
    }
639

640
    /// <summary>
    /// No-op; cancellation is handled via the <see cref="GracefulCancellationToken"/> passed to GetChunk.
    /// </summary>
    public void Abort(IDataLoadEventListener listener)
    {
    }
643

644
    /// <summary>
    /// Runs the extraction query against the live server and returns up to the first 1000 records as a
    /// preview.  Returns an empty DataTable when the Request is <see cref="ExtractDatasetCommand.EmptyCommand"/>.
    /// </summary>
    /// <returns>A DataTable containing at most 1000 preview rows</returns>
    public virtual DataTable TryGetPreview()
    {
        if (Request == ExtractDatasetCommand.EmptyCommand)
            return new DataTable();

        var toReturn = new DataTable();
        toReturn.BeginLoadData();
        var server = _catalogue.GetDistinctLiveDatabaseServer(DataAccessContext.DataExport, false);

        using var con = server.GetConnection();
        con.Open();

        var da = server.GetDataAdapter(Request.QueryBuilder.SQL, con);

        //get up to 1000 records
        da.Fill(0, 1000, toReturn);
        toReturn.EndLoadData();

        con.Close();

        return toReturn;
    }
666

667
    /// <summary>
    /// Pipeline requirement entry point: routes the incoming command to the dataset or globals
    /// <c>Initialize</c> overload depending on its concrete type.
    /// </summary>
    public void PreInitialize(IExtractCommand value, IDataLoadEventListener listener)
    {
        if (value is ExtractDatasetCommand datasetCommand)
            Initialize(datasetCommand);
        if (value is ExtractGlobalsCommand command)
            Initialize(command);
    }
674

675
    /// <summary>
    /// Pre-flight checks: warns when the Request is the EmptyCommand placeholder, succeeds trivially for
    /// globals requests, and fails when no request has been set at all.
    /// </summary>
    public virtual void Check(ICheckNotifier notifier)
    {
        if (Request == ExtractDatasetCommand.EmptyCommand)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Request is ExtractDatasetCommand.EmptyCommand, checking will not be carried out",
                CheckResult.Warning));
            return;
        }

        if (GlobalsRequest != null)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Request is for Globals, checking will not be carried out at source", CheckResult.Success));
            return;
        }

        if (Request == null)
        {
            notifier.OnCheckPerformed(new CheckEventArgs("ExtractionRequest has not been set", CheckResult.Fail));
            return;
        }
    }
698
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc