• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 20714560420

05 Jan 2026 11:53AM UTC coverage: 57.198% (-0.2%) from 57.378%
20714560420

push

github

JFriel
update deps

11495 of 21585 branches covered (53.25%)

Branch coverage included in aggregate %.

32571 of 55456 relevant lines covered (58.73%)

17789.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.37
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Sources/ExecuteDatasetExtractionSource.cs
1
// Copyright (c) The University of Dundee 2018-2025
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using FAnsi;
8
using FAnsi.Discovery.QuerySyntax;
9
using Rdmp.Core.Curation.Data;
10
using Rdmp.Core.DataExport.Data;
11
using Rdmp.Core.DataExport.DataExtraction.Commands;
12
using Rdmp.Core.DataFlowPipeline;
13
using Rdmp.Core.DataFlowPipeline.Requirements;
14
using Rdmp.Core.DataLoad.Engine.Pipeline.Components;
15
using Rdmp.Core.DataLoad.Engine.Pipeline.Sources;
16
using Rdmp.Core.QueryBuilding;
17
using Rdmp.Core.ReusableLibraryCode;
18
using Rdmp.Core.ReusableLibraryCode.Checks;
19
using Rdmp.Core.ReusableLibraryCode.DataAccess;
20
using Rdmp.Core.ReusableLibraryCode.Progress;
21
using System;
22
using System.Collections.Generic;
23
using System.Data;
24
using System.Data.Common;
25
using System.Diagnostics;
26
using System.Linq;
27
using System.Text;
28
using System.Threading.Tasks;
29
using IContainer = Rdmp.Core.Curation.Data.IContainer;
30

31
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Sources;
32

33
/// <summary>
34
/// Executes a single Dataset extraction by linking a cohort with a dataset (either core or custom data - See IExtractCommand).  Also calculates the number
35
/// of unique identifiers seen, records row validation failures etc.
36
/// </summary>
37
public class ExecuteDatasetExtractionSource : IPluginDataFlowSource<DataTable>, IPipelineRequirement<IExtractCommand>
38
{
39
    //Request is either for one of these
40
    public ExtractDatasetCommand Request { get; protected set; }
5,094✔
41
    public ExtractGlobalsCommand GlobalsRequest { get; protected set; }
488✔
42

43
    public const string AuditTaskName = "DataExtraction";
44

45
    private readonly List<string> _extractionIdentifiersidx = new();
148✔
46

47
    private bool _cancel;
48

49
    private ICatalogue _catalogue;
50

51
    protected const string ValidationColumnName = "RowValidationResult";
52

53
    public ExtractionTimeValidator ExtractionTimeValidator { get; protected set; }
200✔
54
    public Exception ValidationFailureException { get; protected set; }
×
55

56
    public HashSet<object> UniqueReleaseIdentifiersEncountered { get; set; }
3,906✔
57

58
    public ExtractionTimeTimeCoverageAggregator ExtractionTimeTimeCoverageAggregator { get; set; }
412✔
59

60
    [DemandsInitialization(
61
        "Determines the systems behaviour when an extraction query returns 0 rows.  Default (false) is that an error is reported.  If set to true (ticked) then instead a DataTable with 0 rows but all the correct headers will be generated usually resulting in a headers only 0 line/empty extract file")]
62
    public bool AllowEmptyExtractions { get; set; }
248✔
63

64
    [DemandsInitialization(
65
        "Batch size, number of records to read from source before releasing it into the extraction pipeline",
66
        DefaultValue = 10000, Mandatory = true)]
67
    public int BatchSize { get; set; }
242✔
68

69
    [DemandsInitialization(
70
        "In seconds. Overrides the global timeout for SQL query execution. Use 0 for infinite timeout.",
71
        DefaultValue = 50000, Mandatory = true)]
72
    public int ExecutionTimeout { get; set; }
242✔
73

74
    [DemandsInitialization(@"Determines how the system achieves DISTINCT on extraction.  These include:
75
None - Do not DISTINCT the records, can result in duplication in your extract (not recommended)
76
SqlDistinct - Adds the DISTINCT keyword to the SELECT sql sent to the server
77
OrderByAndDistinctInMemory - Adds an ORDER BY statement to the query and applies the DISTINCT in memory as records are read from the server (this can help when extracting very large data sets where DISTINCT keyword blocks record streaming until all records are ready to go)
78
DistinctByDestinationPKs - Performs a GROUP BY on each batch of records to ensure unique extraction primary key values in the batch"
79
        , DefaultValue = DistinctStrategy.SqlDistinct)]
80
    public DistinctStrategy DistinctStrategy { get; set; }
768✔
81

82

83
    [DemandsInitialization("When DBMS is SqlServer then HASH JOIN should be used instead of regular JOINs")]
84
    public bool UseHashJoins { get; set; }
244✔
85

86
    [DemandsInitialization(
87
        "When DBMS is SqlServer and the extraction is for any of these datasets then HASH JOIN should be used instead of regular JOINs")]
88
    public Catalogue[] UseHashJoinsForCatalogues { get; set; }
242✔
89

90
    [DemandsInitialization(
91
        "Exclusion list.  A collection of Catalogues which will never be considered for HASH JOIN even when UseHashJoins is enabled.  Being on this list takes precedence for a Catalogue even if it is on UseHashJoinsForCatalogues.")]
92
    public Catalogue[] DoNotUseHashJoinsForCatalogues { get; set; }
244✔
93

94
    // Fixed typo in the user-facing description ("extracton" -> "extraction")
    [DemandsInitialization("When performing an extraction, copy the cohort into a temporary table to improve extraction speed", defaultValue: false)]
    public bool UseTempTablesWhenExtractingCohort { get; set; }
96

97

98
    /// <summary>
99
    /// This is a dictionary containing all the CatalogueItems used in the query, the underlying datatype in the origin database and the
100
    /// actual datatype that was output after the transform operation e.g. a varchar(10) could be converted into a bona fide DateTime which
101
    /// would be an sql Date.  Finally
102
    /// a recommended SqlDbType is passed back.
103
    /// </summary>
104
    public Dictionary<ExtractableColumn, ExtractTimeTransformationObserved> ExtractTimeTransformationsObserved;
105

106
    private DbDataCommandDataFlowSource _hostedSource;
107

108
    private IExternalCohortTable _externalCohortTable;
109
    private string _whereSQL;
110
    private DbConnection _con;
111
    private string _uuid;
112
    private List<string> _knownPKs = new();
148✔
113
    /// <summary>
    /// Prepares this source to run a dataset extraction: captures the cohort join table / WHERE SQL,
    /// (re)builds the extraction query if no cached builder exists and sets up the per-extraction trackers
    /// (validation, distinct release identifier counting, time coverage aggregation).
    /// </summary>
    /// <param name="request">The dataset extraction command.  May be <see cref="ExtractDatasetCommand.EmptyCommand"/>, in which case no setup is performed.</param>
    protected virtual void Initialize(ExtractDatasetCommand request)
    {
        Request = request;

        // EmptyCommand is a placeholder (e.g. during checking) - nothing to set up
        if (request == ExtractDatasetCommand.EmptyCommand)
            return;
        _externalCohortTable = request.ExtractableCohort.ExternalCohortTable;
        _whereSQL = request.ExtractableCohort.WhereSQL();
        _timeSpentValidating = new Stopwatch();
        _timeSpentCalculatingDISTINCT = new Stopwatch();
        _timeSpentBuckettingDates = new Stopwatch();

        Request.ColumnsToExtract.Sort(); //ensure they are in the right order so we can record the release identifiers

        //if we have a cached builder already
        if (request.QueryBuilder == null)
            request.GenerateQueryBuilder();

        // remember the runtime column name(s) of the release identifier substitution(s) so GetChunk
        // can locate them in each batch and count distinct subjects
        foreach (var substitution in Request.ReleaseIdentifierSubstitutions)
            _extractionIdentifiersidx.Add(substitution.GetRuntimeName());

        UniqueReleaseIdentifiersEncountered = new HashSet<object>();

        _catalogue = request.Catalogue;

        // DistinctByDestinationPKs needs the primary key column names up front so each batch can be grouped in memory
        if (DistinctStrategy == DistinctStrategy.DistinctByDestinationPKs)
        {
            _knownPKs = _catalogue.CatalogueItems.Where(ci => ci.ExtractionInformation != null && ci.ExtractionInformation.IsPrimaryKey).Select(ci => ci.ColumnInfo.GetRuntimeName()).ToList();
        }
        // only build a row validator if the Catalogue actually has validation rules configured
        if (!string.IsNullOrWhiteSpace(_catalogue.ValidatorXML))
            ExtractionTimeValidator = new ExtractionTimeValidator(_catalogue, request.ColumnsToExtract);

        //if there is a time periodicity ExtractionInformation (AND! it is among the columns the user selected to be extracted)
        if (_catalogue.TimeCoverage_ExtractionInformation_ID != null && request.ColumnsToExtract
                .Cast<ExtractableColumn>().Any(c =>
                    c.CatalogueExtractionInformation_ID == _catalogue.TimeCoverage_ExtractionInformation_ID))
            ExtractionTimeTimeCoverageAggregator =
                new ExtractionTimeTimeCoverageAggregator(_catalogue, request.ExtractableCohort);
        else
            ExtractionTimeTimeCoverageAggregator = null;
    }
154

155
    /// <summary>
    /// Initializes this source for a globals-only extraction (no dataset query will be run;
    /// see the GlobalsRequest branch of <see cref="GetChunk"/>).
    /// </summary>
    /// <param name="request">The globals extraction command to service.</param>
    private void Initialize(ExtractGlobalsCommand request) => GlobalsRequest = request;
159

160
    public bool WasCancelled => _cancel;
218✔
161

162
    private Stopwatch _timeSpentValidating;
163
    private int _rowsValidated;
164

165
    private Stopwatch _timeSpentCalculatingDISTINCT;
166
    private Stopwatch _timeSpentBuckettingDates;
167
    private int _rowsBucketted;
168

169
    private bool firstChunk = true;
148✔
170
    private bool firstGlobalChunk = true;
148✔
171
    private int _rowsRead;
172

173
    private RowPeeker _peeker = new();
148✔
174

175
    private static readonly Random random = new Random();
×
176

177
    /// <summary>
    /// Generates a random string of <paramref name="length"/> characters drawn from A-Z and 0-9,
    /// used to build a unique temporary table name for the cohort copy.
    /// </summary>
    /// <param name="length">Number of characters to generate.</param>
    /// <returns>A random alphanumeric (upper case) string of the requested length.</returns>
    private static string RandomString(int length)
    {
        const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        // Random.Shared is thread-safe; the previous shared 'static readonly Random' instance was not,
        // and concurrent extractions could corrupt its internal state.
        return new string(Enumerable.Repeat(chars, length)
            .Select(s => s[Random.Shared.Next(s.Length)]).ToArray());
    }
183

184
    /// <summary>
    /// Copies the cohort (rows of <see cref="_externalCohortTable"/> matching <see cref="_whereSQL"/>) into a
    /// session-scoped temporary table named by <see cref="_uuid"/> on <paramref name="con"/>.  On any failure
    /// <see cref="_uuid"/> is reset to null so <see cref="GetCommandSQL"/> falls back to the original cohort table.
    /// </summary>
    /// <param name="con">An open connection on which to create the temp table (temp tables are session scoped, so the same connection must be used for extraction).</param>
    /// <param name="listener">Receives progress/warning messages.</param>
    private void CreateCohortTempTable(DbConnection con, IDataLoadEventListener listener)
    {
        _uuid = $"#{RandomString(24)}";
        var sql = "";
        var db = _externalCohortTable.Discover();
        switch (db.Server.DatabaseType)
        {
            case DatabaseType.MicrosoftSQLServer:
                sql = $"""
                    SELECT *
                    INTO {_uuid}
                    FROM(
                    SELECT * FROM {_externalCohortTable.TableName}
                    WHERE {_whereSQL}
                    ) as cohortTempTable
                """;
                break;
            case DatabaseType.MySql:
                // NOTE(review): the '#'-prefixed name is SQL Server temp-table syntax; confirm it is a legal
                // unquoted identifier on MySql/Oracle/Postgres before relying on these paths
                sql = $"""
                    CREATE TEMPORARY TABLE {_uuid} ENGINE=MEMORY
                    as (SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL})
                """;
                break;
            case DatabaseType.Oracle:
                sql = $"""
                    CREATE TEMPORARY TABLE {_uuid} SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL}
                """;
                break;
            case DatabaseType.PostgreSql:
                sql = $"""
                    CREATE TEMP TABLE {_uuid} AS
                    SELECT * FROM {_externalCohortTable.TableName} WHERE {_whereSQL}
                """;
                break;
            default:
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, "Unable to create temporary table for cohort. Original cohort table will be used"));
                // BUGFIX: previously _uuid stayed populated here, so GetCommandSQL would substitute in a
                // temp table name that was never created, producing broken extraction SQL
                _uuid = null;
                return;
        }
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"About to copy the cohort into a temporary table using the SQL: {sql}"));

        using var cmd = db.Server.GetCommand(sql, con);
        cmd.CommandTimeout = ExecutionTimeout;
        try
        {
            cmd.ExecuteNonQuery();
        }
        catch (Exception ex)
        {
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, "Unable to create temporary table for cohort. Original cohort table will be used", ex));
            _uuid = null;
            // BUGFIX: previously execution fell through and reported success even though creation failed
            return;
        }
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, "Cohort successfully copied to temporary table"));
    }
240

241
    /// <summary>
    /// Reads the next batch of extraction data.  For a globals request returns one empty DataTable then null.
    /// For a dataset request, the first call builds/audits the extraction SQL and opens the hosted source;
    /// subsequent calls stream batches of <see cref="BatchSize"/> rows, applying (depending on configuration)
    /// in-memory DISTINCT, row validation, time coverage bucketing and distinct release identifier counting.
    /// Returns null when the data is exhausted.
    /// </summary>
    /// <param name="listener">Receives progress and error notifications.</param>
    /// <param name="cancellationToken">Checked after each read; cancellation aborts with an Exception.</param>
    /// <returns>The next batch of rows, or null when there is no more data.</returns>
    public virtual DataTable GetChunk(IDataLoadEventListener listener, GracefulCancellationToken cancellationToken)
    {
        // we are in the Global Commands case, let's return an empty DataTable (not null)
        // so we can trigger the destination to extract the globals docs and sql
        if (GlobalsRequest != null)
        {
            GlobalsRequest.ElevateState(ExtractCommandState.WaitingForSQLServer);
            if (firstGlobalChunk)
            {
                //unless we are checking, start auditing
                StartAuditGlobals();

                firstGlobalChunk = false;
                return new DataTable(ExtractionDirectory.GLOBALS_DATA_NAME);
            }

            // second and subsequent calls: globals already signalled, nothing more to produce
            return null;
        }

        if (Request == null)
            throw new Exception("Component has not been initialized before being asked to GetChunk(s)");

        Request.ElevateState(ExtractCommandState.WaitingForSQLServer);

        // set by a previous TaskCanceledException during a read (see AggregateException handler below)
        if (_cancel)
            throw new Exception("User cancelled data extraction");

        // first call for a dataset: build the SQL, audit it and open the streaming source
        if (_hostedSource == null)
        {
            if (UseTempTablesWhenExtractingCohort)
            {
                // temp tables are session scoped so the same connection must be kept open for the whole extraction
                _con = DatabaseCommandHelper.GetConnection(Request.GetDistinctLiveDatabaseServer().Builder);
                _con.Open();
                CreateCohortTempTable(_con, listener);
            }
            var cmdSql = GetCommandSQL(listener);
            StartAudit(cmdSql);

            if (Request.DatasetBundle.DataSet.DisableExtraction)
                throw new Exception(
                    $"Cannot extract {Request.DatasetBundle.DataSet} because DisableExtraction is set to true");

            // when using temp tables we must reuse the already-open connection (_con); otherwise the
            // source opens its own connection from the server builder
            _hostedSource = UseTempTablesWhenExtractingCohort ? new DbDataCommandDataFlowSource(cmdSql,
                $"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}",
               _con,
                ExecutionTimeout)
            {
                AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume,
                BatchSize = BatchSize
            }
                : new DbDataCommandDataFlowSource(cmdSql,
                $"ExecuteDatasetExtraction {Request.DatasetBundle.DataSet}",
               Request.GetDistinctLiveDatabaseServer().Builder,
                ExecutionTimeout)
                {
                    // If we are running in batches then always allow empty extractions
                    AllowEmptyResultSets = AllowEmptyExtractions || Request.IsBatchResume,
                    BatchSize = BatchSize
                };
        }

        DataTable chunk = null;

        try
        {
            chunk = _hostedSource.GetChunk(listener, cancellationToken);


            // prepend any rows held back by the peeker on the previous batch (OrderByAndDistinctInMemory)
            chunk = _peeker.AddPeekedRowsIfAny(chunk);

            if (Request != null && Request.DatasetBundle.DataSet is not null && chunk is not null)
                chunk.TableName = $"{Request.DatasetBundle.DataSet}";

            //if we are trying to distinct the records in memory based on release id
            if (DistinctStrategy == DistinctStrategy.OrderByAndDistinctInMemory)
            {
                var releaseIdentifierColumn = Request.ReleaseIdentifierSubstitutions.First().GetRuntimeName();

                if (chunk is { Rows.Count: > 0 })
                {
                    //last release id in the current chunk
                    var lastReleaseId = chunk.Rows[^1][releaseIdentifierColumn];

                    // pull forward all further rows sharing that release id so the batch boundary never
                    // splits one subject's rows (otherwise duplicates could span batches)
                    _peeker.AddWhile(_hostedSource, r => Equals(r[releaseIdentifierColumn], lastReleaseId), chunk);
                    chunk = MakeDistinct(chunk, listener, cancellationToken);
                }
            }
        }
        catch (AggregateException a)
        {
            if (a.GetExceptionIfExists<TaskCanceledException>() != null)
                _cancel = true;

            throw;
        }
        catch (Exception e)
        {
            // NOTE(review): non-cancellation read errors are reported to the listener but swallowed; chunk
            // stays null and is treated below as end-of-data - confirm this best-effort behaviour is intended
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, "Read from source failed", e));
        }

        if (cancellationToken.IsCancellationRequested)
            throw new Exception("Data read cancelled because our cancellationToken was set, aborting data reading");

        //if the first chunk is null
        if (firstChunk && chunk == null && !AllowEmptyExtractions)
            throw new Exception(
                $"There is no data to load, query returned no rows, query was:{Environment.NewLine}{_hostedSource.Sql ?? Request.QueryBuilder.SQL}");

        //not the first chunk anymore
        firstChunk = false;

        //data exhausted
        if (chunk == null)
        {
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                $"Data exhausted after reading {_rowsRead} rows of data ({UniqueReleaseIdentifiersEncountered.Count} unique release identifiers seen)"));
            if (Request != null)
                Request.CumulativeExtractionResults.DistinctReleaseIdentifiersEncountered =
                    Request.IsBatchResume ? -1 : UniqueReleaseIdentifiersEncountered.Count;
            return null;
        }

        _rowsRead += chunk.Rows.Count;
        //chunk will have datatypes for all the things in the buffer so we can populate our dictionary of facts about what columns/catalogue items have spontaneously changed name/type etc
        if (ExtractTimeTransformationsObserved == null)
            GenerateExtractionTransformObservations(chunk);


        //see if the SqlDataReader has a column with the same name as the ReleaseIdentifierSQL (if so then we can use it to count the number of distinct subjects written out to the csv)
        var includesReleaseIdentifier = _extractionIdentifiersidx.Count > 0;


        //first line - let's see what columns we wrote out
        //looks at the buffer and computes any transforms performed on the column


        _timeSpentValidating.Start();
        //build up the validation report (Missing/Wrong/Etc) - this has no mechanical effect on the extracted data just some metadata that goes into a flat file
        if (ExtractionTimeValidator != null && Request.IncludeValidation)
            try
            {
                chunk.Columns.Add(ValidationColumnName);

                ExtractionTimeValidator.Validate(chunk, ValidationColumnName);

                _rowsValidated += chunk.Rows.Count;
                listener.OnProgress(this,
                    new ProgressEventArgs("Validation", new ProgressMeasurement(_rowsValidated, ProgressType.Records),
                        _timeSpentValidating.Elapsed));
            }
            catch (Exception ex)
            {
                // validation failure disables the validator for the rest of the extraction rather than aborting it
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Error, "Could not validate data chunk", ex));
                ValidationFailureException = ex;
                ExtractionTimeValidator = null;
            }

        _timeSpentValidating.Stop();

        _timeSpentBuckettingDates.Start();
        if (ExtractionTimeTimeCoverageAggregator != null)
        {
            _rowsBucketted += chunk.Rows.Count;

            foreach (DataRow row in chunk.Rows)
                ExtractionTimeTimeCoverageAggregator.ProcessRow(row);

            // NOTE(review): this progress event reports _timeSpentCalculatingDISTINCT.Elapsed rather than
            // _timeSpentBuckettingDates.Elapsed - looks like a copy/paste slip, confirm which was intended
            listener.OnProgress(this,
                new ProgressEventArgs("Bucketting Dates", new ProgressMeasurement(_rowsBucketted, ProgressType.Records),
                    _timeSpentCalculatingDISTINCT.Elapsed));
        }

        _timeSpentBuckettingDates.Stop();

        _timeSpentCalculatingDISTINCT.Start();


        // batch-level de-duplication: keep one row per unique combination of destination primary key values
        if (DistinctStrategy == DistinctStrategy.DistinctByDestinationPKs && _knownPKs.Any())
        {
            var columnNames = _knownPKs; // (unused local retained; grouping below reads _knownPKs directly)
            Func<DataRow, String> groupingFunction = (DataRow dr) => GroupData(dr, _knownPKs.ToArray());

            chunk = chunk.AsEnumerable()
                        .GroupBy(groupingFunction)
                        .Select(g => g.First())
                   .CopyToDataTable();
        }

        var pks = new List<DataColumn>();

        //record unique release identifiers found
        if (includesReleaseIdentifier)
            foreach (var idx in _extractionIdentifiersidx.Distinct().ToList())
            {
                var sub = Request.ReleaseIdentifierSubstitutions.FirstOrDefault(s => s.Alias == chunk.Columns[idx].ColumnName);
                if (sub?.ColumnInfo.ExtractionInformations.FirstOrDefault()?.IsPrimaryKey == true)
                {
                    pks.Add(chunk.Columns[idx]);
                }

                foreach (DataRow r in chunk.Rows)
                {
                    if (r[idx] == DBNull.Value)
                        if (_extractionIdentifiersidx.Count == 1)
                            throw new Exception(
                                $"Null release identifier found in extract of dataset {Request.DatasetBundle.DataSet}");
                        else
                            continue; //there are multiple extraction identifiers that's fine if one or two are null

                    UniqueReleaseIdentifiersEncountered.Add(r[idx]);
                }

                listener.OnProgress(this,
                    new ProgressEventArgs("Calculating Distinct Release Identifiers",
                        new ProgressMeasurement(UniqueReleaseIdentifiersEncountered.Count, ProgressType.Records),
                        _timeSpentCalculatingDISTINCT.Elapsed));
            }

        _timeSpentCalculatingDISTINCT.Stop();
        // also mark any extracted columns flagged IsPrimaryKey in the Catalogue as primary key columns on the batch
        pks.AddRange(Request.ColumnsToExtract.Where(static c => ((ExtractableColumn)c).CatalogueExtractionInformation.IsPrimaryKey).Select(static column => ((ExtractableColumn)column).CatalogueExtractionInformation.ToString()).Select(name => chunk.Columns[name]));
        chunk.PrimaryKey = pks.ToArray();

        return chunk;
    }
466

467
    /// <summary>
    /// Removes duplicate rows from the supplied batch only.  This yields a globally distinct extract only
    /// when batches are bounded such that duplicates never straddle a batch boundary
    /// (see <see cref="DistinctStrategy.OrderByAndDistinctInMemory"/>).
    /// </summary>
    /// <param name="chunk">Batch of rows to de-duplicate.</param>
    /// <param name="listener">Receives any events raised while de-duplicating.</param>
    /// <param name="cancellationToken">Token allowing the operation to be abandoned.</param>
    /// <returns>The batch with exact-duplicate rows removed.</returns>
    private static DataTable MakeDistinct(DataTable chunk, IDataLoadEventListener listener,
        GracefulCancellationToken cancellationToken) =>
        new RemoveDuplicates { NoLogging = true }.ProcessPipelineData(chunk, listener, cancellationToken);
480

481
    /// <summary>
    /// Populates <see cref="ExtractTimeTransformationsObserved"/> from the first batch read: for every column
    /// being extracted, records the datatype the Catalogue says it should be alongside the datatype actually
    /// observed in the runtime buffer (and whether the column was present at all).
    /// </summary>
    /// <param name="chunk">The first batch of extracted rows, whose column datatypes are inspected.</param>
    private void GenerateExtractionTransformObservations(DataTable chunk)
    {
        ExtractTimeTransformationsObserved = new Dictionary<ExtractableColumn, ExtractTimeTransformationObserved>();

        //one observation record per extracted column
        foreach (ExtractableColumn column in Request.ColumnsToExtract)
        {
            var observed = new ExtractTimeTransformationObserved();
            ExtractTimeTransformationsObserved.Add(column, observed);

            //if the original ExtractionInformation no longer exists we cannot say what it was supposed to be
            if (column.HasOriginalExtractionInformationVanished())
                continue;

            var extractionInformation = column.CatalogueExtractionInformation;

            //what the catalogue says it is
            observed.DataTypeInCatalogue = extractionInformation.ColumnInfo.Data_type;
            observed.CatalogueItem = extractionInformation.CatalogueItem;

            //what it actually is in the buffer
            var runtimeName = column.GetRuntimeName();
            if (chunk.Columns.Contains(runtimeName))
            {
                observed.FoundAtExtractTime = true;
                observed.DataTypeObservedInRuntimeBuffer = chunk.Columns[runtimeName].DataType;
            }
            else
            {
                observed.FoundAtExtractTime = false;
            }
        }
    }
514

515
    /// <summary>
    /// Produces the final extraction SQL: adjusts the query builder for the chosen <see cref="DistinctStrategy"/>,
    /// applies any subclass rewrites (<see cref="HackExtractionSQL"/>), optionally substitutes HASH JOIN hints
    /// (SQL Server) and, when a cohort temp table was created, swaps the cohort table name for the temp table.
    /// </summary>
    /// <param name="listener">Receives notifications about the SQL decisions made.</param>
    /// <returns>The SQL to execute for this extraction.</returns>
    private string GetCommandSQL(IDataLoadEventListener listener)
    {
        //if the user wants some custom logic for removing identical duplicates
        switch (DistinctStrategy)
        {
            //user doesn't care about identical duplicates
            case DistinctStrategy.None:
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");
                break;

            //system default behaviour
            case DistinctStrategy.DistinctByDestinationPKs:
            case DistinctStrategy.SqlDistinct:
                break;

            //user wants to run order by the release ID and resolve duplicates in batches as they are read
            case DistinctStrategy.OrderByAndDistinctInMemory:

                //remove the DISTINCT keyword from the query
                ((QueryBuilder)Request.QueryBuilder).SetLimitationSQL("");

                //find the release identifier substitution (e.g. chi for PROCHI)
                var substitution = Request.ReleaseIdentifierSubstitutions.First();

                //add a line at the end of the query to ORDER BY the ReleaseId column (e.g. PROCHI)
                var orderBySql = $"ORDER BY {substitution.SelectSQL}";

                // don't add the line if it is already there (e.g. because of Retry)
                if (!Request.QueryBuilder.CustomLines.Any(l => string.Equals(l.Text, orderBySql)))
                    Request.QueryBuilder.AddCustomLine(orderBySql, QueryComponent.Postfix);

                break;
            default:
                throw new ArgumentOutOfRangeException();
        }

        var sql = Request.QueryBuilder.SQL;

        // subclass extension point - base implementation returns sql unchanged
        sql = HackExtractionSQL(sql, listener);

        if (ShouldUseHashedJoins())
        {
            //use hash joins!
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information, "Substituting JOIN for HASH JOIN"));
            // note: this also rewrites "LEFT JOIN"/"RIGHT JOIN" to e.g. "LEFT HASH JOIN" (valid hint syntax on SQL Server)
            sql = sql.Replace(" JOIN ", " HASH JOIN ");
        }

        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
            $"/*Decided on extraction SQL:*/{Environment.NewLine}{sql}"));
        // _uuid is null when temp table creation failed or was skipped, in which case the original table is kept
        if (UseTempTablesWhenExtractingCohort && _uuid is not null)
        {
            sql = sql.Replace(_externalCohortTable.TableName, _uuid);
        }
        return sql;
    }
571

572
    /// <summary>
    /// Decides whether the extraction SQL should have its JOINs rewritten as HASH JOINs.  Requires SQL Server;
    /// the exclusion list (<see cref="DoNotUseHashJoinsForCatalogues"/>) always wins, then the global
    /// <see cref="UseHashJoins"/> flag, then the per-Catalogue opt-in list.
    /// </summary>
    /// <returns>True when HASH JOIN substitution should be applied to this extraction.</returns>
    private bool ShouldUseHashedJoins()
    {
        var databaseType = Request?.QueryBuilder?.QuerySyntaxHelper?.DatabaseType;

        //hash join hints are a SQL Server feature (null means we could not determine the DBMS)
        if (databaseType is not DatabaseType.MicrosoftSQLServer)
            return false;

        //exclusion list takes precedence over everything else
        if (DoNotUseHashJoinsForCatalogues != null && DoNotUseHashJoinsForCatalogues.Contains(Request.Catalogue))
            return false;

        //global opt-in
        if (UseHashJoins)
            return true;

        //otherwise only if this specific Catalogue is on the opt-in list
        return UseHashJoinsForCatalogues != null && UseHashJoinsForCatalogues.Contains(Request.Catalogue);
    }
593

594
    /// <summary>
    /// Extension point allowing subclasses to rewrite the extraction SQL before it is executed.
    /// The base implementation returns <paramref name="sql"/> unchanged.
    /// </summary>
    /// <param name="sql">The SQL generated by the query builder.</param>
    /// <param name="listener">Listener subclasses can use to report any rewrites they perform.</param>
    /// <returns>The (possibly rewritten) SQL to execute.</returns>
    public virtual string HackExtractionSQL(string sql, IDataLoadEventListener listener) => sql;
595

596
    /// <summary>
    /// Creates (or, for a batch resume, re-attaches to) the <see cref="CumulativeExtractionResults"/> audit
    /// record for this extraction.  A fresh run deletes any previous audit for the same configuration/dataset
    /// and records the SQL executed and the names of all filters applied.
    /// </summary>
    /// <param name="sql">The extraction SQL being executed, stored in the audit record.</param>
    private void StartAudit(string sql)
    {
        var dataExportRepo = Request.DataExportRepository;

        var previousAudit = dataExportRepo
            .GetAllCumulativeExtractionResultsFor(Request.Configuration, Request.DatasetBundle.DataSet).ToArray();

        if (Request.IsBatchResume)
        {
            // resuming: the previous audit MUST exist, we keep appending to it rather than starting over
            var match =
                previousAudit.FirstOrDefault(a => a.ExtractableDataSet_ID == Request.DatasetBundle.DataSet.ID) ??
                throw new Exception(
                    $"Could not find previous CumulativeExtractionResults for dataset {Request.DatasetBundle.DataSet} despite the Request being marked as a batch resume");
            Request.CumulativeExtractionResults = match;
        }
        else
        {
            //delete old audit records
            foreach (var audit in previousAudit)
                audit.DeleteInDatabase();

            var extractionResults = new CumulativeExtractionResults(dataExportRepo, Request.Configuration,
                Request.DatasetBundle.DataSet, sql);

            // flatten the filter container tree into a comma separated list of filter names for the audit
            var filterDescriptions =
                RecursivelyListAllFilterNames(
                    Request.Configuration.GetFilterContainerFor(Request.DatasetBundle.DataSet));

            extractionResults.FiltersUsed = filterDescriptions.TrimEnd(',');
            extractionResults.SaveToDatabase();

            Request.CumulativeExtractionResults = extractionResults;
        }
    }
630

631
    /// <summary>
    /// Deletes any previous globals-only audit records (<see cref="SupplementalExtractionResults"/> with no
    /// parent CumulativeExtractionResults) for this configuration so the current run starts a fresh audit.
    /// </summary>
    private void StartAuditGlobals()
    {
        var dataExportRepository = GlobalsRequest.RepositoryLocator.DataExportRepository;

        var staleAudits = dataExportRepository
            .GetAllObjectsWhere<SupplementalExtractionResults>("ExtractionConfiguration_ID",
                GlobalsRequest.Configuration.ID)
            .Where(static c => c.CumulativeExtractionResults_ID == null);

        //wipe the stale audit records
        foreach (var staleAudit in staleAudits)
            staleAudit.DeleteInDatabase();
    }
644

645
    /// <summary>
    /// Depth-first walks the filter container tree rooted at <paramref name="filterContainer"/> and returns
    /// all filter names concatenated with a trailing comma after each (subcontainer filters first), e.g.
    /// "f1,f2," - the caller trims the final comma.  Returns "" for a null container.
    /// </summary>
    /// <param name="filterContainer">Root of the (sub)tree to describe; may be null.</param>
    /// <returns>Comma-terminated list of every filter name in the tree, or "" if there are none.</returns>
    private string RecursivelyListAllFilterNames(IContainer filterContainer)
    {
        if (filterContainer == null)
            return "";

        var toReturn = new StringBuilder();

        // cache the results: previously GetSubContainers()/GetFilters() were each invoked twice
        // (null test + enumeration), potentially repeating repository fetches
        var subContainers = filterContainer.GetSubContainers();
        if (subContainers != null)
            foreach (var subContainer in subContainers)
                toReturn.Append(RecursivelyListAllFilterNames(subContainer));

        var filters = filterContainer.GetFilters();
        if (filters != null)
            foreach (var f in filters)
                toReturn.Append($"{f.Name},");

        return toReturn.ToString();
    }
662

663
    /// <summary>
    /// No-op teardown; virtual so subclasses can override to release resources.
    /// NOTE(review): when <see cref="UseTempTablesWhenExtractingCohort"/> is enabled, the connection (_con)
    /// opened in GetChunk is never closed or disposed here - confirm whether disposal should close it.
    /// </summary>
    public virtual void Dispose(IDataLoadEventListener job, Exception pipelineFailureExceptionIfAny)
    {
    }
666

667
    /// <summary>
    /// No-op: cancellation is handled inside <see cref="GetChunk"/> via the GracefulCancellationToken
    /// and the _cancel flag, so there is no additional abort work to do here.
    /// </summary>
    public void Abort(IDataLoadEventListener listener)
    {
    }
670

671
    /// <summary>
    /// Runs the extraction query against the live server and returns up to the first 1000 rows for preview
    /// purposes.  Returns an empty DataTable when the Request is the EmptyCommand placeholder.
    /// </summary>
    /// <returns>A DataTable containing at most 1000 rows of the extraction result.</returns>
    public virtual DataTable TryGetPreview()
    {
        if (Request == ExtractDatasetCommand.EmptyCommand)
            return new DataTable();

        var preview = new DataTable();
        preview.BeginLoadData();

        var liveServer = _catalogue.GetDistinctLiveDatabaseServer(DataAccessContext.DataExport, false);

        using var connection = liveServer.GetConnection();
        connection.Open();

        var adapter = liveServer.GetDataAdapter(Request.QueryBuilder.SQL, connection);

        //cap the preview at 1000 records
        adapter.Fill(0, 1000, preview);
        preview.EndLoadData();

        connection.Close();

        return preview;
    }
693

694
    /// <summary>
    /// Pipeline entry point: routes the incoming command to the matching Initialize overload depending on
    /// whether it is a dataset extraction or a globals extraction.
    /// </summary>
    /// <param name="value">The extraction command this source will service.</param>
    /// <param name="listener">Unused; required by the IPipelineRequirement signature.</param>
    public void PreInitialize(IExtractCommand value, IDataLoadEventListener listener)
    {
        if (value is ExtractDatasetCommand datasetRequest)
            Initialize(datasetRequest);

        if (value is ExtractGlobalsCommand globalsRequest)
            Initialize(globalsRequest);
    }
701

702
    /// <summary>
    /// Pre-extraction checking.  Only verifies that a Request has been supplied; no SQL or server checks are
    /// performed at the source for the EmptyCommand placeholder or for globals extractions.
    /// </summary>
    /// <param name="notifier">Receives the outcome of each check.</param>
    public virtual void Check(ICheckNotifier notifier)
    {
        // placeholder command (e.g. design-time checking) - nothing meaningful to check
        if (Request == ExtractDatasetCommand.EmptyCommand)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Request is ExtractDatasetCommand.EmptyCommand, checking will not be carried out",
                CheckResult.Warning));
            return;
        }

        // globals extraction has no dataset query for this source to validate
        if (GlobalsRequest != null)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Request is for Globals, checking will not be carried out at source", CheckResult.Success));
            return;
        }

        // neither a dataset request nor a globals request was ever provided
        if (Request == null)
        {
            notifier.OnCheckPerformed(new CheckEventArgs("ExtractionRequest has not been set", CheckResult.Fail));
            return;
        }
    }
725

726
    private static String GroupData(DataRow dataRow, String[] columnNames)
727
    {
728

729
        StringBuilder stringBuilder = new StringBuilder();
×
730
        stringBuilder.Remove(0, stringBuilder.Length);
×
731
        foreach (String column in columnNames)
×
732
        {
733
            stringBuilder.Append(dataRow[column].ToString());
×
734
        }
735
        return stringBuilder.ToString();
×
736

737
    }
738
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc