• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 27198235588

09 Jun 2026 09:53AM UTC coverage: 57.016% (+0.05%) from 56.971%
27198235588

Pull #2343

github

JFriel
update codeql
Pull Request #2343: Task/rdmp 376 extraction structural changes

11582 of 21847 branches covered (53.01%)

Branch coverage included in aggregate %.

31 of 60 new or added lines in 3 files covered. (51.67%)

152 existing lines in 5 files now uncovered.

32702 of 55823 relevant lines covered (58.58%)

18085.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.58
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/ExecuteFullExtractionToDatabaseMSSql.cs
1
// Copyright (c) The University of Dundee 2018-2024
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using Amazon.Auth.AccessControlPolicy;
8
using FAnsi.Discovery;
9
using NPOI.SS.Formula.Functions;
10
using Rdmp.Core.CommandExecution;
11
using Rdmp.Core.Curation.Data;
12
using Rdmp.Core.DataExport.Data;
13
using Rdmp.Core.DataExport.DataExtraction.Commands;
14
using Rdmp.Core.DataExport.DataExtraction.UserPicks;
15
using Rdmp.Core.DataExport.DataRelease.Pipeline;
16
using Rdmp.Core.DataExport.DataRelease.Potential;
17
using Rdmp.Core.DataFlowPipeline;
18
using Rdmp.Core.DataLoad.Engine.Job.Scheduling;
19
using Rdmp.Core.DataLoad.Engine.Pipeline.Destinations;
20
using Rdmp.Core.DataLoad.Triggers;
21
using Rdmp.Core.DataLoad.Triggers.Exceptions;
22
using Rdmp.Core.DataLoad.Triggers.Implementations;
23
using Rdmp.Core.MapsDirectlyToDatabaseTable;
24
using Rdmp.Core.QueryBuilding;
25
using Rdmp.Core.Repositories;
26
using Rdmp.Core.ReusableLibraryCode;
27
using Rdmp.Core.ReusableLibraryCode.Checks;
28
using Rdmp.Core.ReusableLibraryCode.DataAccess;
29
using Rdmp.Core.ReusableLibraryCode.Progress;
30
using System;
31
using System.Data;
32
using System.Diagnostics;
33
using System.IO;
34
using System.Linq;
35
using System.Text.RegularExpressions;
36
using YamlDotNet.Core;
37

38
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Destinations;
39

40
/// <summary>
41
/// Alternate extraction pipeline destination in which the DataTable containing the extracted dataset is written to an Sql Server database
42
/// </summary>
43
public class ExecuteFullExtractionToDatabaseMSSql : ExtractionDestination
44
{
45
    [DemandsInitialization(
46
        "External server to create the extraction into, a new database will be created for the project based on the naming pattern provided",
47
        Mandatory = true)]
48
    public IExternalDatabaseServer TargetDatabaseServer { get; set; }
370✔
49

50
    [DemandsInitialization(@"How do you want to name datasets, use the following tokens if you need them:   
51
         $p - Project Name ('e.g. My Project')
52
         $n - Project Number (e.g. 234)
53
         $t - Master Ticket (e.g. 'LINK-1234')
54
         $r - Request Ticket (e.g. 'LINK-1234')
55
         $l - Release Ticket (e.g. 'LINK-1234')
56
         $e - Extraction Configuration Id (e.g. 14)
57
         ", Mandatory = true, DefaultValue = "Proj_$n_$l")]
58
    public string DatabaseNamingPattern { get; set; }
256✔
59

60
    [DemandsInitialization(@"How do you want to name datasets, use the following tokens if you need them:   
61
         $p - Project Name ('e.g. My Project')
62
         $n - Project Number (e.g. 234)
63
         $c - Configuration Name (e.g. 'Cases')
64
         $d - Dataset name (e.g. 'Prescribing')
65
         $a - Dataset acronym (e.g. 'Presc') 
66
         $e - Extraction Configuration Id (e.g. 14)
67
         You must have either $a or $d
68
         ", Mandatory = true, DefaultValue = "$c_$d")]
69
    public string TableNamingPattern { get; set; }
138✔
70

71
    [DemandsInitialization(
72
        @"If the extraction fails half way through AND the destination table was created during the extraction then the table will be dropped from the destination rather than being left in a half loaded state ",
73
        defaultValue: true)]
74
    public bool DropTableIfLoadFails { get; set; }
68✔
75

76
    [DemandsInitialization(DataTableUploadDestination.AlterTimeout_Description, DefaultValue = 300)]
77
    public int AlterTimeout { get; set; }
102✔
78

79
    [DemandsInitialization("By applying the primary keys after writing the data, it ensures all data is extracted. Disabling this configuration may improve performance but will quickly raise issues with poorly keyed data.", DefaultValue = true)]
80
    public bool WriteDataBeforeApplyingPrimaryKeys { get; set; }
102✔
81

82
    [DemandsInitialization(
83
        "True to copy the column collations from the source database when creating the destination database.  Only works if both the source and destination have the same DatabaseType.  Excludes columns which feature a transform as part of extraction.",
84
        DefaultValue = false)]
85
    public bool CopyCollations { get; set; }
1,288✔
86

87
    [DemandsInitialization(
88
        "True to always drop the destination database table(s) from the destination if they already existed",
89
        DefaultValue = false)]
90
    public bool AlwaysDropExtractionTables { get; set; }
100✔
91

92
    [DemandsInitialization(
93
        "True to apply a distincting operation to the final table when using an ExtractionProgress.  This prevents data duplication from failed batch resumes.",
94
        DefaultValue = true)]
95
    public bool MakeFinalTableDistinctWhenBatchResuming { get; set; } = true;
146✔
96

97

98
    [DemandsInitialization("If this extraction has already been run, it will append the extraction data into the database. There is no duplication protection with this functionality.")]
99
    public bool AppendDataIfTableExists { get; set; } = false;
136✔
100

101
    [DemandsInitialization("If checked, a column names 'extraction_timestamp' will be included in the extraction that denotes the time the record was added to the extraction.")]
102
    public bool IncludeTimeStamp { get; set; } = false;
102✔
103

104

105
    [DemandsInitialization("If checked, indexed will be created using the primary keys specified")]
106
    public bool IndexTables { get; set; } = true;
180✔
107

108
    [DemandsInitialization(@"How do you want to name the created index, use the following tokens if you need them:   
109
         $p - Project Name ('e.g. My Project')
110
         $n - Project Number (e.g. 234)
111
         $c - Configuration Name (e.g. 'Cases')
112
         $d - Dataset name (e.g. 'Prescribing')
113
         $a - Dataset acronym (e.g. 'Presc') 
114
         $e - Extraction Configuration Id (e.g. 14)
115
         You must have either $a or $d
116
         ", DefaultValue = "Index_$c_$d")]
117
    public string IndexNamingPattern { get; set; }
102✔
118

119
    [DemandsInitialization("An optional list of columns to index on e.g \"Column1, Column2\"")]
120
    public string UserDefinedIndex { get; set; }
102✔
121

122

123
    [DemandsInitialization("When writing to an existing database, will update records and store historical versions via a view", DefaultValue = false)]
124
    public bool UseArchiveTrigger { get; set; }
124✔
125

126
    private DiscoveredDatabase _destinationDatabase;
127
    private DataTableUploadDestination _destination;
128

129
    private bool _tableDidNotExistAtStartOfLoad;
130
    private bool _isTableAlreadyNamed;
131
    private DataTable _toProcess;
132
    private IBasicActivateItems _activator;
133

134
    public ExecuteFullExtractionToDatabaseMSSql() : base(false)
78✔
135
    {
136
    }
78✔
137

138
    public override DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener job,
139
        GracefulCancellationToken cancellationToken)
140
    {
141
        _destinationDatabase = GetDestinationDatabase(job);
68✔
142
        return base.ProcessPipelineData(toProcess, job, cancellationToken);
68✔
143
    }
144

145
    protected override void Open(DataTable toProcess, IDataLoadEventListener job,
146
        GracefulCancellationToken cancellationToken)
147
    {
148
        _toProcess = toProcess;
34✔
149

150
        //give the data table the correct name
151
        if (_toProcess.ExtendedProperties.ContainsKey("ProperlyNamed") &&
34!
152
            _toProcess.ExtendedProperties["ProperlyNamed"].Equals(true))
34✔
153
            _isTableAlreadyNamed = true;
×
154

155
        _toProcess.TableName = GetTableName();
34✔
156

157
        _destination = PrepareDestination(job, _toProcess);
34✔
158
        OutputFile = _toProcess.TableName;
34✔
159
    }
34✔
160

161
    protected override void WriteRows(DataTable toProcess, IDataLoadEventListener job,
162
        GracefulCancellationToken cancellationToken, Stopwatch stopwatch)
163
    {
164
        // empty batches are allowed when using batch/resume
165
        if (toProcess.Rows.Count == 0 && _request.IsBatchResume) return;
34!
166

167
        if (_request.IsBatchResume) _destination.AllowLoadingPopulatedTables = true;
34!
168

169
        _destination.ProcessPipelineData(toProcess, job, cancellationToken);
34✔
170

171
        LinesWritten += toProcess.Rows.Count;
32✔
172
    }
32✔
173

174

175
    private bool hasStructuralChanges(DataTable source, DiscoveredTable destination)
176
    {
177
        var sourceColumns = source.Columns.Cast<DataColumn>().Select(c => c.ColumnName).ToList();
640✔
178
        var destinationColumns = destination.DiscoverColumns().Select(c => c.GetRuntimeName()).ToList();
646✔
179
        return !sourceColumns.All(destinationColumns.Contains) || !destinationColumns.All(sourceColumns.Contains);
16!
180
    }
181

182

183
    private DataTableUploadDestination PrepareDestination(IDataLoadEventListener listener, DataTable toProcess)
184
    {
185
        //see if the user has entered an extraction server/database
186
        if (TargetDatabaseServer == null)
34!
187
            throw new Exception(
×
188
                "TargetDatabaseServer (the place you want to extract the project data to) property has not been set!");
×
189

190
        try
191
        {
192
            if (!_destinationDatabase.Exists())
34!
193
                _destinationDatabase.Create();
×
194

195
            if (_request is ExtractGlobalsCommand)
34!
196
                return null;
×
197

198
            var tblName = _toProcess.TableName;
34✔
199

200
            //See if table already exists on the server (likely to cause problems including duplication, schema changes in configuration etc)
201
            var existing = _destinationDatabase.ExpectTable(tblName);
34✔
202
            if (existing.Exists())
34✔
203
            {
204
                var hasPKs = existing.DiscoverColumns().Any(col => col.IsPrimaryKey);
108✔
205
                TriggerImplementerFactory triggerFactory = new TriggerImplementerFactory(FAnsi.DatabaseType.MicrosoftSQLServer);
16✔
206
                var implementor = triggerFactory.Create(existing);
16✔
207
                bool triggerPresent;
208
                try
209
                {
210
                    triggerPresent = implementor.GetTriggerStatus() == DataLoad.Triggers.TriggerStatus.Enabled;
16✔
211
                }
16✔
NEW
212
                catch (TriggerMissingException)
×
213
                {
NEW
214
                    triggerPresent = false;
×
NEW
215
                }
×
216
                if (!AlwaysDropExtractionTables)
16✔
217
                {
218
                    //check the PKs are the same
219
                    var remotePKs = existing.DiscoverColumns().Where(col => col.IsPrimaryKey).Select(col => col.GetRuntimeName()).ToList();
662✔
220
                    var rdmpPKs = toProcess.PrimaryKey.Cast<DataColumn>().Select(col => col.ColumnName).ToList();
32✔
221
                    if (!remotePKs.All(rdmpPKs.Contains) || remotePKs.Count != rdmpPKs.Count)
16!
222
                    {
223
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error,
×
224
                            $"""
×
225
                        Table {existing.GetFullyQualifiedName()} already exists and has different PKs to the source table.                            
×
226
                        Source PKs: {string.Join(", ", rdmpPKs)}
×
227
                        Destination PKs: {string.Join(", ", remotePKs)}
×
228
                        """));
×
NEW
229
                        return null;//todo this error could be better
×
230
                    }
231
                    if (hasStructuralChanges(toProcess, existing))
16✔
232
                    {
233
                        var sourceColumns = toProcess.Columns.Cast<DataColumn>().Select(c => c.ColumnName).ToList();
240✔
234
                        var destinationColumns = existing.DiscoverColumns().Select(c => c.GetRuntimeName()).ToList();
246✔
235

236
                        if (triggerPresent && destinationColumns.Except(sourceColumns).Where(c => !SpecialFieldNames.IsHicPrefixed(c)).Any())//only mess about with column removal if there is an archive trigger
12!
237
                        {
238

239
                            //move everything into the archive - do this by updating the HIC_validfrom
NEW
240
                            var sql = $"UPDATE {existing.GetFullyQualifiedName()} set {SpecialFieldNames.ValidFrom} = GETDATE()";
×
NEW
241
                            using var con = _destinationDatabase.Server.GetConnection();
×
NEW
242
                            con.Open();
×
NEW
243
                            using var cmd = _destinationDatabase.Server.GetCommand(sql, con);
×
NEW
244
                            cmd.CommandTimeout = 30000;
×
NEW
245
                            cmd.ExecuteNonQuery();
×
246

NEW
247
                            var removedColumns = destinationColumns.Except(sourceColumns).Where(c => !SpecialFieldNames.IsHicPrefixed(c));
×
NEW
248
                            foreach (var column in removedColumns.Select(c => existing.DiscoverColumn(c)))
×
249
                            {
NEW
250
                                existing.DropColumn(column);
×
251
                            }
NEW
252
                            string triggerProblems = "";
×
NEW
253
                            string triggerOK = "";
×
NEW
254
                            implementor.DropTrigger(out triggerProblems, out triggerOK);
×
NEW
255
                            if (triggerProblems != "")
×
256
                            {
NEW
257
                                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, triggerProblems));
×
258
                            }
259

NEW
260
                            existing = _destinationDatabase.ExpectTable(tblName);
×
NEW
261
                            implementor = triggerFactory.Create(existing);
×
262
                            try
263
                            {
NEW
264
                                triggerPresent = implementor.GetTriggerStatus() == DataLoad.Triggers.TriggerStatus.Enabled;
×
NEW
265
                            }
×
NEW
266
                            catch (TriggerMissingException)
×
267
                            {
NEW
268
                                triggerPresent = false;
×
NEW
269
                            }
×
270
                        }
271
                    }
272

273
                }
274

275
                if (_request.IsBatchResume)
16!
276
                {
277
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
×
UNCOV
278
                        $"Table {existing.GetFullyQualifiedName()} already exists but it IsBatchResume so no problem."));
×
279
                }
280
                else if (AlwaysDropExtractionTables)
16!
281
                {
282
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
283
                        $"Table {existing.GetFullyQualifiedName()} already exists, dropping because setting {nameof(AlwaysDropExtractionTables)} is on"));
×
UNCOV
284
                    existing.Drop();
×
285

286
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
UNCOV
287
                        $"Table {existing.GetFullyQualifiedName()} was dropped"));
×
288

289
                    // since we dropped it we should treat it as if it was never there to begin with
UNCOV
290
                    _tableDidNotExistAtStartOfLoad = true;
×
291
                }
292
                else if (UseArchiveTrigger && hasPKs)
16✔
293
                {
294
                    //check the columns are correct, we might have added some
295
                    var existingColumns = existing.DiscoverColumns();
6✔
296
                    var existingColumnNames = existingColumns.Select(ec => ec.GetRuntimeName());
4,686✔
297
                    var toProcessColumnNames = toProcess.Columns.Cast<DataColumn>().Select(col => col.ColumnName);
240✔
298
                    var newColumns = toProcessColumnNames.Where(c => !existingColumnNames.Contains(c));
240✔
299
                    if (newColumns.Any())
6!
300
                    {
301
                        var archiveTable = _destinationDatabase.ExpectTable(tblName + "_Archive");
×
UNCOV
302
                        if (archiveTable.Exists())
×
303
                        {
UNCOV
304
                            foreach (var column in newColumns)
×
305
                            {
306
                                existing.AddColumn(column, new TypeGuesser.DatabaseTypeRequest(toProcess.Columns[column].DataType), true, 30000);
×
NEW
307
                                if (archiveTable.DiscoverColumns().All(col => col.GetRuntimeName() != column))
×
308
                                {
NEW
309
                                    archiveTable.AddColumn(column, new TypeGuesser.DatabaseTypeRequest(toProcess.Columns[column].DataType), true, 30000);
×
310
                                }
311
                            }
NEW
312
                            if (triggerPresent)
×
313
                            {
314
                                string triggerProblems = "";
×
315
                                string triggerOK = "";
×
316
                                implementor.DropTrigger(out triggerProblems, out triggerOK);
×
UNCOV
317
                                if (triggerProblems != "")
×
318
                                {
UNCOV
319
                                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, triggerProblems));
×
320
                                }
321

322
                                existing = _destinationDatabase.ExpectTable(tblName);
×
323
                                implementor = triggerFactory.Create(existing);
×
NEW
324
                                triggerPresent = false;
×
325
                            }
326
                        }
327
                    }
328

329
                    if (!triggerPresent)
6!
330
                    {
UNCOV
331
                        implementor.CreateTrigger(ThrowImmediatelyCheckNotifier.Quiet);
×
332
                    }
333

334
                }
335
                else
336
                {
337
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
10✔
338
                        $"A table called {tblName} already exists on server {TargetDatabaseServer}, data load might crash if it is populated and/or has an incompatible schema"));
10✔
339
                }
340
            }
341
            else
342
            {
343
                _tableDidNotExistAtStartOfLoad = true;
18✔
344
            }
345
        }
34✔
UNCOV
346
        catch (Exception e)
×
347
        {
348
            //Probably the database didn't exist or the credentials were wrong or something
349
            listener.OnNotify(this,
×
350
                new NotifyEventArgs(ProgressEventType.Error,
×
351
                    "Failed to inspect destination for already existing datatables", e));
×
UNCOV
352
        }
×
353

354
        _destination = new DataTableUploadDestination(((IExtractDatasetCommand)_request).ExtractableCohort.ExternalCohortTable);
34✔
355

356
        PrimeDestinationTypesBasedOnCatalogueTypes(listener, toProcess);
34✔
357

358
        _destination.AllowResizingColumnsAtUploadTime = true;
34✔
359
        _destination.AlterTimeout = AlterTimeout;
34✔
360
        _destination.WriteDataBeforeApplyingPrimaryKeys = WriteDataBeforeApplyingPrimaryKeys;
34✔
361
        _destination.AppendDataIfTableExists = AppendDataIfTableExists;
34✔
362
        _destination.IncludeTimeStamp = IncludeTimeStamp;
34✔
363
        _destination.UseTrigger = AppendDataIfTableExists;
34✔
364
        _destination.IndexTables = IndexTables;
34✔
365
        _destination.UseTrigger = UseArchiveTrigger;
34✔
366
        _destination.IndexTableName = GetIndexName();
34✔
367
        if (UserDefinedIndex is not null)
34!
UNCOV
368
            _destination.UserDefinedIndexes = UserDefinedIndex.Split(',').Select(i => i.Trim()).ToList();
×
369
        _destination.PreInitialize(_activator, _destinationDatabase, listener);
34✔
370

371

372
        return _destination;
34✔
UNCOV
373
    }
×
374

375
    private void PrimeDestinationTypesBasedOnCatalogueTypes(IDataLoadEventListener listener, DataTable toProcess)
376
    {
377
        //if the extraction is of a Catalogue
378

379
        if (_request is not IExtractDatasetCommand datasetCommand)
34!
UNCOV
380
            return;
×
381

382
        //for every extractable column in the Catalogue
383
        foreach (var extractionInformation in datasetCommand.ColumnsToExtract.OfType<ExtractableColumn>()
2,512✔
384
                     .Select(ec =>
34✔
385
                         ec.CatalogueExtractionInformation))
1,256✔
386
        {
387
            if (extractionInformation == null)
1,222✔
388
                continue;
389

390
            var catItem = extractionInformation.CatalogueItem;
1,222✔
391

392
            //if we do not know the data type or the ei is a transform
393
            if (catItem == null)
1,222!
394
            {
395
                listener.OnNotify(this,
×
396
                    new NotifyEventArgs(ProgressEventType.Warning,
×
397
                        $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it had no associated CatalogueItem"));
×
UNCOV
398
                continue;
×
399
            }
400

401
            if (catItem.ColumnInfo == null)
1,222!
402
            {
403
                listener.OnNotify(this,
×
404
                    new NotifyEventArgs(ProgressEventType.Warning,
×
405
                        $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it had no associated ColumnInfo"));
×
UNCOV
406
                continue;
×
407
            }
408

409
            if (extractionInformation.IsProperTransform())
1,222✔
410
            {
411
                listener.OnNotify(this,
2✔
412
                    new NotifyEventArgs(ProgressEventType.Warning,
2✔
413
                        $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it is a Transform"));
2✔
414
                continue;
2✔
415
            }
416

417
            var destinationType = GetDestinationDatabaseType(extractionInformation);
1,220✔
418

419
            //Tell the destination the datatype of the ColumnInfo that underlies the ExtractionInformation (this might be changed by the ExtractionInformation e.g. as a
420
            //transform but it is a good starting point.  We don't want to create a varchar(10) column in the destination if the origin dataset (Catalogue) is a varchar(100)
421
            //since it will just confuse the user.  Bear in mind these data types can be degraded later by the destination
422
            var columnName = extractionInformation.Alias ?? catItem.ColumnInfo.GetRuntimeName();
1,220✔
423
            var addedType = _destination.AddExplicitWriteType(columnName, destinationType);
1,220✔
424
            addedType.IsPrimaryKey = toProcess.PrimaryKey.Any(dc => dc.ColumnName == columnName);
2,436✔
425

426
            //if user wants to copy collation types and the destination server is the same type as the origin server
427
            if (CopyCollations && _destinationDatabase.Server.DatabaseType == catItem.ColumnInfo.TableInfo.DatabaseType)
1,220!
UNCOV
428
                addedType.Collation = catItem.ColumnInfo.Collation;
×
429

430
            listener.OnNotify(this,
1,220✔
431
                new NotifyEventArgs(ProgressEventType.Information,
1,220✔
432
                    $"Set Type for {columnName} to {destinationType} (IsPrimaryKey={(addedType.IsPrimaryKey ? "true" : "false")}) to match the source table"));
1,220✔
433
        }
434

435

436
        foreach (var sub in datasetCommand.QueryBuilder.SelectColumns.Select(static sc => sc.IColumn)
1,392✔
437
                     .OfType<ReleaseIdentifierSubstitution>())
34✔
438
        {
439
            var columnName = sub.GetRuntimeName();
34✔
440
            var isPk = toProcess.PrimaryKey.Any(dc => dc.ColumnName == columnName);
62✔
441

442
            var addedType = _destination.AddExplicitWriteType(columnName,
34✔
443
                datasetCommand.ExtractableCohort.GetReleaseIdentifierDataType());
34✔
444
            addedType.IsPrimaryKey = isPk;
34✔
445
            addedType.AllowNulls = !isPk;
34✔
446
        }
447
    }
34✔
448

449
    private string GetDestinationDatabaseType(ConcreteColumn col)
450
    {
451
        //Make sure we know if we are going between database types
452
        var fromDbType = _destinationDatabase.Server.DatabaseType;
1,220✔
453
        var toDbType = col.ColumnInfo.TableInfo.DatabaseType;
1,220✔
454
        if (fromDbType != toDbType)
1,220!
455
        {
456
            var fromSyntax = col.ColumnInfo.GetQuerySyntaxHelper();
×
UNCOV
457
            var toSyntax = _destinationDatabase.Server.GetQuerySyntaxHelper();
×
458

459
            var intermediate = fromSyntax.TypeTranslater.GetDataTypeRequestForSQLDBType(col.ColumnInfo.Data_type);
×
UNCOV
460
            return toSyntax.TypeTranslater.GetSQLDBTypeForCSharpType(intermediate);
×
461
        }
462

463
        return col.ColumnInfo.Data_type;
1,220✔
464
    }
465

466
    private string GetIndexName()
467
    {
468
        string indexName = IndexNamingPattern;
34✔
469
        var project = _request.Configuration.Project;
34✔
470
        indexName = indexName.Replace("$p", project.Name);
34✔
471
        indexName = indexName.Replace("$n", project.ProjectNumber.ToString());
34✔
472
        indexName = indexName.Replace("$c", _request.Configuration.Name);
34✔
473
        indexName = indexName.Replace("$e", _request.Configuration.ID.ToString());
34✔
474
        if (_request is ExtractDatasetCommand extractDatasetCommand)
34✔
475
        {
476
            indexName = indexName.Replace("$d", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Name);
34✔
477
            indexName = indexName.Replace("$a", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Acronym);
34✔
478
        }
479

480
        if (_request is ExtractGlobalsCommand)
34!
481
        {
482
            indexName = indexName.Replace("$d", ExtractionDirectory.GLOBALS_DATA_NAME);
×
UNCOV
483
            indexName = indexName.Replace("$a", "G");
×
484
        }
485

486

487
        return indexName.Replace(" ", "");
34✔
488
    }
489

490
    private string GetTableName(string suffix = null)
491
    {
492
        string tblName;
493
        if (_isTableAlreadyNamed)
40!
494
        {
UNCOV
495
            tblName = SanitizeNameForDatabase(_toProcess.TableName);
×
496

497
            if (!string.IsNullOrWhiteSpace(suffix))
×
UNCOV
498
                tblName += $"_{suffix}";
×
499

UNCOV
500
            return tblName;
×
501
        }
502

503
        tblName = TableNamingPattern;
40✔
504
        var project = _request.Configuration.Project;
40✔
505

506
        tblName = tblName.Replace("$p", project.Name);
40✔
507
        tblName = tblName.Replace("$n", project.ProjectNumber.ToString());
40✔
508
        tblName = tblName.Replace("$c", _request.Configuration.Name);
40✔
509
        tblName = tblName.Replace("$e", _request.Configuration.ID.ToString());
40✔
510

511
        if (_request is ExtractDatasetCommand extractDatasetCommand)
40✔
512
        {
513
            tblName = tblName.Replace("$d", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Name);
38✔
514
            tblName = tblName.Replace("$a", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Acronym);
38✔
515
        }
516

517
        if (_request is ExtractGlobalsCommand)
40✔
518
        {
519
            tblName = tblName.Replace("$d", ExtractionDirectory.GLOBALS_DATA_NAME);
2✔
520
            tblName = tblName.Replace("$a", "G");
2✔
521
        }
522

523
        var cachedGetTableNameAnswer = SanitizeNameForDatabase(tblName);
40✔
524
        if (!string.IsNullOrWhiteSpace(suffix))
40✔
525
            cachedGetTableNameAnswer += $"_{suffix}";
6✔
526

527
        return cachedGetTableNameAnswer;
40✔
528
    }
529

530
    private string SanitizeNameForDatabase(string tblName)
531
    {
532
        if (_destinationDatabase == null)
40!
533
            throw new Exception(
×
UNCOV
534
                "Cannot pick a TableName until we know what type of server it is going to, _server is null");
×
535

536
        //get rid of brackets and dots
537
        tblName = Regex.Replace(tblName, "[.()]", "_");
40✔
538

539
        var syntax = _destinationDatabase.Server.GetQuerySyntaxHelper();
40✔
540
        syntax.ValidateTableName(tblName);
40✔
541

542
        //otherwise, fetch and cache answer
543
        var cachedGetTableNameAnswer = syntax.GetSensibleEntityNameFromString(tblName);
40✔
544

545
        return string.IsNullOrWhiteSpace(cachedGetTableNameAnswer)
40!
546
            ? throw new Exception(
40✔
547
                $"TableNamingPattern '{TableNamingPattern}' resulted in an empty string for request '{_request}'")
40✔
548
            : cachedGetTableNameAnswer;
40✔
549
    }
550

551
    public override void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
552
    {
553
        if (_destination != null)
68✔
554
        {
555
            _destination.Dispose(listener, pipelineFailureExceptionIfAny);
34✔
556

557
            //if the extraction failed, the table didn't exist in the destination (i.e. the table was created during the extraction) and we are to DropTableIfLoadFails
558
            if (pipelineFailureExceptionIfAny != null && _tableDidNotExistAtStartOfLoad && DropTableIfLoadFails)
34!
UNCOV
559
                if (_destinationDatabase != null)
×
560
                {
UNCOV
561
                    var tbl = _destinationDatabase.ExpectTable(_toProcess.TableName);
×
562

UNCOV
563
                    if (tbl.Exists())
×
564
                    {
565
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
566
                            $"DropTableIfLoadFails is true so about to drop table {tbl}"));
×
567
                        tbl.Drop();
×
UNCOV
568
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, $"Dropped table {tbl}"));
×
569
                    }
570
                }
571

572
            if (pipelineFailureExceptionIfAny == null
34!
573
                && _request.IsBatchResume
34✔
574
                && MakeFinalTableDistinctWhenBatchResuming
34✔
575
                && _destinationDatabase != null
34✔
576
                && _toProcess != null)
34✔
577
            {
578
                var tbl = _destinationDatabase.ExpectTable(_toProcess.TableName);
×
UNCOV
579
                if (tbl.Exists())
×
580
                    // if there is no primary key then failed batches may have introduced duplication
UNCOV
581
                    if (!tbl.DiscoverColumns().Any(p => p.IsPrimaryKey))
×
582
                    {
583
                        listener.OnNotify(this,
×
584
                            new NotifyEventArgs(ProgressEventType.Information,
×
585
                                $"Making {tbl} distinct in case there are duplicate rows from bad batch resumes"));
×
586
                        tbl.MakeDistinct(50000000);
×
587
                        listener.OnNotify(this,
×
UNCOV
588
                            new NotifyEventArgs(ProgressEventType.Information, $"Finished distincting {tbl}"));
×
589
                    }
590
            }
591
        }
592

593
        TableLoadInfo?.CloseAndArchive();
68✔
594

595
        // also close off the cumulative extraction result
596
        if (_request is ExtractDatasetCommand)
68✔
597
        {
598
            var result = ((IExtractDatasetCommand)_request).CumulativeExtractionResults;
34✔
599
            if (result != null && _toProcess != null)
34✔
600
                result.CompleteAudit(GetType(), GetDestinationDescription(), TableLoadInfo.Inserts,
34✔
601
                    _request.IsBatchResume, pipelineFailureExceptionIfAny != null);
34✔
602
        }
603
    }
68✔
604

605
    public override void Abort(IDataLoadEventListener listener)
606
    {
607
        _destination?.Abort(listener);
×
UNCOV
608
    }
×
609

610
    protected override void PreInitializeImpl(IBasicActivateItems activator, IExtractCommand value, IDataLoadEventListener listener)
611
    {
612
        _activator = activator;
142✔
613
    }
142✔
614

615

616
    public override string GetDestinationDescription() => GetDestinationDescription("");
66✔
617

618
    private string GetDestinationDescription(string suffix = "")
619
    {
620
        if (_toProcess == null)
100✔
621
            return _request is ExtractGlobalsCommand
34!
622
                ? "Globals"
34✔
623
                : throw new Exception("Could not describe destination because _toProcess was null");
34✔
624

625
        var tblName = _toProcess.TableName;
66✔
626
        var dbName = GetDatabaseName();
66✔
627
        return $"{TargetDatabaseServer.ID}|{dbName}|{tblName}";
66✔
628
    }
629

UNCOV
630
    public static DestinationType GetDestinationType() => DestinationType.Database;
×
631

632
    public override ReleasePotential GetReleasePotential(IRDMPPlatformRepositoryServiceLocator repositoryLocator,
UNCOV
633
        ISelectedDataSets selectedDataSet) => new MsSqlExtractionReleasePotential(repositoryLocator, selectedDataSet);
×
634

635
    public override FixedReleaseSource<ReleaseAudit> GetReleaseSource(ICatalogueRepository catalogueRepository) =>
UNCOV
636
        new MsSqlReleaseSource(catalogueRepository);
×
637

638
    public override GlobalReleasePotential GetGlobalReleasabilityEvaluator(
639
        IRDMPPlatformRepositoryServiceLocator repositoryLocator, ISupplementalExtractionResults globalResult,
640
        IMapsDirectlyToDatabaseTable globalToCheck) =>
UNCOV
641
        new MsSqlGlobalsReleasePotential(repositoryLocator, globalResult, globalToCheck);
×
642

643
    protected override void TryExtractSupportingSQLTableImpl(SupportingSQLTable sqlTable, DirectoryInfo directory,
644
        IExtractionConfiguration configuration, IDataLoadEventListener listener, out int linesWritten,
645
        out string destinationDescription)
646
    {
647
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
4✔
648
            $"About to download SQL for global SupportingSQL {sqlTable.SQL}"));
4✔
649
        using var con = sqlTable.GetServer().GetConnection();
4✔
650
        con.Open();
4✔
651

652
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
4✔
653
            $"Connection opened successfully, about to send SQL command {sqlTable.SQL}"));
4✔
654

655
        using var dt = new DataTable();
4✔
656
        using (var cmd = DatabaseCommandHelper.GetCommand(sqlTable.SQL, con))
4✔
657
        using (var da = DatabaseCommandHelper.GetDataAdapter(cmd))
4✔
658
        {
659
            var sw = Stopwatch.StartNew();
4✔
660
            dt.BeginLoadData();
4✔
661
            da.Fill(dt);
4✔
662
            dt.EndLoadData();
4✔
663
        }
4✔
664

665
        dt.TableName = GetTableName(_destinationDatabase.Server.GetQuerySyntaxHelper()
4✔
666
            .GetSensibleEntityNameFromString(sqlTable.Name));
4✔
667
        linesWritten = dt.Rows.Count;
4✔
668

669
        var destinationDb = GetDestinationDatabase(listener);
4✔
670
        var tbl = destinationDb.ExpectTable(dt.TableName);
4✔
671

672
        if (tbl.Exists())
4!
UNCOV
673
            tbl.Drop();
×
674

675
        destinationDb.CreateTable(dt.TableName, dt);
4✔
676
        destinationDescription = $"{TargetDatabaseServer.ID}|{GetDatabaseName()}|{dt.TableName}";
4✔
677
    }
8✔
678

679

680
    protected override void TryExtractLookupTableImpl(BundledLookupTable lookup, DirectoryInfo lookupDir,
681
        IExtractionConfiguration requestConfiguration, IDataLoadEventListener listener, out int linesWritten,
682
        out string destinationDescription)
683
    {
684
        using var dt = lookup.GetDataTable();
2✔
685

686
        dt.TableName = GetTableName(_destinationDatabase.Server.GetQuerySyntaxHelper()
2✔
687
            .GetSensibleEntityNameFromString(lookup.TableInfo.Name));
2✔
688

689
        //describe the destination for the abstract base
690
        destinationDescription = $"{TargetDatabaseServer.ID}|{GetDatabaseName()}|{dt.TableName}";
2✔
691
        linesWritten = dt.Rows.Count;
2✔
692

693
        var destinationDb = GetDestinationDatabase(listener);
2✔
694
        var existing = destinationDb.ExpectTable(dt.TableName);
2✔
695

696
        if (existing.Exists())
2!
697
        {
698
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
699
                $"Dropping existing Lookup table '{existing.GetFullyQualifiedName()}'"));
×
UNCOV
700
            existing.Drop();
×
701
        }
702

703
        destinationDb.CreateTable(dt.TableName, dt);
2✔
704
    }
4✔
705

706
    private DiscoveredDatabase GetDestinationDatabase(IDataLoadEventListener listener)
707
    {
708
        //tell user we are about to inspect it
709
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
74✔
710
            $"About to open connection to {TargetDatabaseServer}"));
74✔
711

712
        var databaseName = GetDatabaseName();
74✔
713

714
        var discoveredServer = DataAccessPortal.ExpectServer(TargetDatabaseServer, DataAccessContext.DataExport, false);
74✔
715

716
        var db = discoveredServer.ExpectDatabase(databaseName);
74✔
717
        if (!db.Exists())
74✔
718
            db.Create();
10✔
719

720
        return db;
74✔
721
    }
722

723
    private string GetDatabaseName()
724
    {
725
        var dbName = DatabaseNamingPattern;
152✔
726

727
        if (_project.ProjectNumber == null)
152!
UNCOV
728
            throw new ProjectNumberException($"Project '{_project}' must have a ProjectNumber");
×
729

730
        if (_request == null)
152!
UNCOV
731
            throw new Exception("No IExtractCommand Request was passed to this component");
×
732

733
        if (_request.Configuration == null)
152!
UNCOV
734
            throw new Exception($"Request did not specify any Configuration for Project '{_project}'");
×
735

736
        dbName = dbName.Replace("$p", _project.Name)
152✔
737
            .Replace("$n", _project.ProjectNumber.ToString())
152✔
738
            .Replace("$t", _project.MasterTicket)
152✔
739
            .Replace("$r", _request.Configuration.RequestTicket)
152✔
740
            .Replace("$l", _request.Configuration.ReleaseTicket)
152✔
741
            .Replace("$e", _request.Configuration.ID.ToString());
152✔
742
        return dbName;
152✔
743
    }
744

745
    public override void Check(ICheckNotifier notifier)
746
    {
747
        if (TargetDatabaseServer == null)
10✔
748
        {
749
            notifier.OnCheckPerformed(new CheckEventArgs(
2✔
750
                "Target database server property has not been set (This component does not know where to extract data to!), " +
2✔
751
                "to fix this you must edit the pipeline and choose an ExternalDatabaseServer to extract to)",
2✔
752
                CheckResult.Fail));
2✔
753
            return;
2✔
754
        }
755

756
        if (string.IsNullOrWhiteSpace(TargetDatabaseServer.Server))
8✔
757
        {
758
            notifier.OnCheckPerformed(new CheckEventArgs("TargetDatabaseServer does not have a .Server specified",
2✔
759
                CheckResult.Fail));
2✔
760
            return;
2✔
761
        }
762

763
        if (!string.IsNullOrWhiteSpace(TargetDatabaseServer.Database))
6!
764
            notifier.OnCheckPerformed(new CheckEventArgs(
×
UNCOV
765
                "TargetDatabaseServer has .Database specified but this will be ignored!", CheckResult.Warning));
×
766

767
        if (string.IsNullOrWhiteSpace(TableNamingPattern))
6!
768
        {
769
            notifier.OnCheckPerformed(new CheckEventArgs(
×
770
                "You must specify TableNamingPattern, this will tell the component how to name tables it generates in the remote destination",
×
771
                CheckResult.Fail));
×
UNCOV
772
            return;
×
773
        }
774

775
        if (string.IsNullOrWhiteSpace(DatabaseNamingPattern))
6!
776
        {
777
            notifier.OnCheckPerformed(new CheckEventArgs(
×
778
                "You must specify DatabaseNamingPattern, this will tell the component what database to create or use in the remote destination",
×
779
                CheckResult.Fail));
×
UNCOV
780
            return;
×
781
        }
782

783
        if (!DatabaseNamingPattern.Contains("$p") && !DatabaseNamingPattern.Contains("$n") &&
6✔
784
            !DatabaseNamingPattern.Contains("$t") && !DatabaseNamingPattern.Contains("$r") &&
6✔
785
            !DatabaseNamingPattern.Contains("$l"))
6✔
786
            notifier.OnCheckPerformed(new CheckEventArgs(
4✔
787
                "DatabaseNamingPattern does not contain any token. The tables may be created alongside existing tables and Release would be impossible.",
4✔
788
                CheckResult.Warning));
4✔
789

790
        if (!TableNamingPattern.Contains("$d") && !TableNamingPattern.Contains("$a"))
6!
791
            notifier.OnCheckPerformed(new CheckEventArgs(
×
792
                "TableNamingPattern must contain either $d or $a, the name/acronym of the dataset being extracted otherwise you will get collisions when you extract multiple tables at once",
×
UNCOV
793
                CheckResult.Warning));
×
794

795
        if (_request == ExtractDatasetCommand.EmptyCommand)
6!
796
        {
797
            notifier.OnCheckPerformed(new CheckEventArgs(
×
798
                "Request is ExtractDatasetCommand.EmptyCommand, will not try to connect to Database",
×
799
                CheckResult.Warning));
×
UNCOV
800
            return;
×
801
        }
802

803
        if (TableNamingPattern != null && TableNamingPattern.Contains("$a"))
6!
804
            if (_request is ExtractDatasetCommand dsRequest && string.IsNullOrWhiteSpace(dsRequest.Catalogue.Acronym))
×
805
                notifier.OnCheckPerformed(new CheckEventArgs(
×
806
                    $"Catalogue '{dsRequest.Catalogue}' does not have an Acronym but TableNamingPattern contains $a",
×
UNCOV
807
                    CheckResult.Fail));
×
808

809

810

811
        base.Check(notifier);
6✔
812

813
        try
814
        {
815
            var server = DataAccessPortal.ExpectServer(TargetDatabaseServer, DataAccessContext.DataExport, false);
6✔
816
            var database = _destinationDatabase = server.ExpectDatabase(GetDatabaseName());
6✔
817

818
            if (UseArchiveTrigger)
6!
819
            {
UNCOV
820
                if (_request is ExtractDatasetCommand dsRequest)
×
821
                {
822
                    var existing = _destinationDatabase.ExpectTable(dsRequest.Catalogue.Name);
×
UNCOV
823
                    if (existing.Exists())
×
824
                    {
825
                        var hasPKs = existing.DiscoverColumns().Any(col => col.IsPrimaryKey);
×
UNCOV
826
                        if (!hasPKs)
×
827
                        {
828
                            notifier.OnCheckPerformed(new CheckEventArgs(
×
829
                               $"Catalogue does not have any PKS. Cannot apply the archive trigger",
×
UNCOV
830
                               CheckResult.Fail));
×
831
                        }
832
                    }
833
                }
834
            }
835

836
            if (database.Exists())
6✔
837
            {
838
                notifier.OnCheckPerformed(
2✔
839
                    new CheckEventArgs(
2✔
840
                        $"Database {database} already exists! if an extraction has already been run you may have errors if you are re-extracting the same tables",
2✔
841
                        CheckResult.Warning));
2✔
842
            }
843
            else
844
            {
845
                notifier.OnCheckPerformed(
4✔
846
                    new CheckEventArgs(
4✔
847
                        $"Database {database} does not exist on server... it will be created at runtime",
4✔
848
                        CheckResult.Success));
4✔
849
                return;
4✔
850
            }
851

852
            var tables = database.DiscoverTables(false);
2✔
853

854
            if (tables.Any())
2!
855
            {
856
                string tableName;
857

858
                try
859
                {
860
                    tableName = GetTableName();
×
861
                }
×
UNCOV
862
                catch (Exception ex)
×
863
                {
864
                    notifier.OnCheckPerformed(
×
865
                        new CheckEventArgs("Could not determine table name", CheckResult.Fail, ex));
×
UNCOV
866
                    return;
×
867
                }
868

869
                // if the expected table exists and we are not doing a batch resume or allowing data appending
870
                if (tables.Any(t => t.GetRuntimeName().Equals(tableName)) && !_request.IsBatchResume && !AppendDataIfTableExists)
×
871
                    notifier.OnCheckPerformed(new CheckEventArgs(ErrorCodes.ExistingExtractionTableInDatabase,
×
UNCOV
872
                        tableName, database));
×
873
            }
874
            else
875
            {
876
                notifier.OnCheckPerformed(new CheckEventArgs($"Confirmed that database {database} is empty of tables",
2✔
877
                    CheckResult.Success));
2✔
878
            }
879
        }
2✔
UNCOV
880
        catch (Exception e)
×
881
        {
882
            notifier.OnCheckPerformed(new CheckEventArgs(
×
883
                $"Could not connect to TargetDatabaseServer '{TargetDatabaseServer}'", CheckResult.Fail, e));
×
UNCOV
884
        }
×
885
    }
6✔
886
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc