• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 26566925905

28 May 2026 09:37AM UTC coverage: 57.023% (-0.08%) from 57.101%
26566925905

push

github

JFriel
Merge branch 'bugfix/cohort-commit-issue' of https://github.com/HicServices/RDMP into bugfix/cohort-commit-issue

11562 of 21813 branches covered (53.01%)

Branch coverage included in aggregate %.

32688 of 55787 relevant lines covered (58.59%)

18057.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.14
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/ExecuteFullExtractionToDatabaseMSSql.cs
1
// Copyright (c) The University of Dundee 2018-2024
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using Amazon.Auth.AccessControlPolicy;
8
using FAnsi.Discovery;
9
using Rdmp.Core.CommandExecution;
10
using Rdmp.Core.Curation.Data;
11
using Rdmp.Core.DataExport.Data;
12
using Rdmp.Core.DataExport.DataExtraction.Commands;
13
using Rdmp.Core.DataExport.DataExtraction.UserPicks;
14
using Rdmp.Core.DataExport.DataRelease.Pipeline;
15
using Rdmp.Core.DataExport.DataRelease.Potential;
16
using Rdmp.Core.DataFlowPipeline;
17
using Rdmp.Core.DataLoad.Engine.Job.Scheduling;
18
using Rdmp.Core.DataLoad.Engine.Pipeline.Destinations;
19
using Rdmp.Core.DataLoad.Triggers.Exceptions;
20
using Rdmp.Core.DataLoad.Triggers.Implementations;
21
using Rdmp.Core.MapsDirectlyToDatabaseTable;
22
using Rdmp.Core.QueryBuilding;
23
using Rdmp.Core.Repositories;
24
using Rdmp.Core.ReusableLibraryCode;
25
using Rdmp.Core.ReusableLibraryCode.Checks;
26
using Rdmp.Core.ReusableLibraryCode.DataAccess;
27
using Rdmp.Core.ReusableLibraryCode.Progress;
28
using System;
29
using System.Data;
30
using System.Diagnostics;
31
using System.IO;
32
using System.Linq;
33
using System.Text.RegularExpressions;
34
using YamlDotNet.Core;
35

36
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Destinations;
37

38
/// <summary>
39
/// Alternate extraction pipeline destination in which the DataTable containing the extracted dataset is written to an Sql Server database
40
/// </summary>
41
public class ExecuteFullExtractionToDatabaseMSSql : ExtractionDestination
42
{
43
    [DemandsInitialization(
44
        "External server to create the extraction into, a new database will be created for the project based on the naming pattern provided",
45
        Mandatory = true)]
46
    public IExternalDatabaseServer TargetDatabaseServer { get; set; }
370✔
47

48
    [DemandsInitialization(@"How do you want to name datasets, use the following tokens if you need them:   
49
         $p - Project Name ('e.g. My Project')
50
         $n - Project Number (e.g. 234)
51
         $t - Master Ticket (e.g. 'LINK-1234')
52
         $r - Request Ticket (e.g. 'LINK-1234')
53
         $l - Release Ticket (e.g. 'LINK-1234')
54
         $e - Extraction Configuration Id (e.g. 14)
55
         ", Mandatory = true, DefaultValue = "Proj_$n_$l")]
56
    public string DatabaseNamingPattern { get; set; }
256✔
57

58
    [DemandsInitialization(@"How do you want to name datasets, use the following tokens if you need them:   
59
         $p - Project Name ('e.g. My Project')
60
         $n - Project Number (e.g. 234)
61
         $c - Configuration Name (e.g. 'Cases')
62
         $d - Dataset name (e.g. 'Prescribing')
63
         $a - Dataset acronym (e.g. 'Presc') 
64
         $e - Extraction Configuration Id (e.g. 14)
65
         You must have either $a or $d
66
         ", Mandatory = true, DefaultValue = "$c_$d")]
67
    public string TableNamingPattern { get; set; }
138✔
68

69
    [DemandsInitialization(
70
        @"If the extraction fails half way through AND the destination table was created during the extraction then the table will be dropped from the destination rather than being left in a half loaded state ",
71
        defaultValue: true)]
72
    public bool DropTableIfLoadFails { get; set; }
68✔
73

74
    [DemandsInitialization(DataTableUploadDestination.AlterTimeout_Description, DefaultValue = 300)]
75
    public int AlterTimeout { get; set; }
102✔
76

77
    [DemandsInitialization("By applying the primary keys after writing the data, it ensures all data is extracted. Disabling this configuration may improve performance but will quickly raise issues with poorly keyed data.", DefaultValue = true)]
78
    public bool WriteDataBeforeApplyingPrimaryKeys { get; set; }
102✔
79

80
    [DemandsInitialization(
81
        "True to copy the column collations from the source database when creating the destination database.  Only works if both the source and destination have the same DatabaseType.  Excludes columns which feature a transform as part of extraction.",
82
        DefaultValue = false)]
83
    public bool CopyCollations { get; set; }
1,288✔
84

85
    [DemandsInitialization(
86
        "True to always drop the destination database table(s) from the destination if they already existed",
87
        DefaultValue = false)]
88
    public bool AlwaysDropExtractionTables { get; set; }
100✔
89

90
    [DemandsInitialization(
91
        "True to apply a distincting operation to the final table when using an ExtractionProgress.  This prevents data duplication from failed batch resumes.",
92
        DefaultValue = true)]
93
    public bool MakeFinalTableDistinctWhenBatchResuming { get; set; } = true;
146✔
94

95

96
    [DemandsInitialization("If this extraction has already been run, it will append the extraction data into the database. There is no duplication protection with this functionality.")]
97
    public bool AppendDataIfTableExists { get; set; } = false;
136✔
98

99
    [DemandsInitialization("If checked, a column names 'extraction_timestamp' will be included in the extraction that denotes the time the record was added to the extraction.")]
100
    public bool IncludeTimeStamp { get; set; } = false;
102✔
101

102

103
    [DemandsInitialization("If checked, indexed will be created using the primary keys specified")]
104
    public bool IndexTables { get; set; } = true;
180✔
105

106
    [DemandsInitialization(@"How do you want to name the created index, use the following tokens if you need them:   
107
         $p - Project Name ('e.g. My Project')
108
         $n - Project Number (e.g. 234)
109
         $c - Configuration Name (e.g. 'Cases')
110
         $d - Dataset name (e.g. 'Prescribing')
111
         $a - Dataset acronym (e.g. 'Presc') 
112
         $e - Extraction Configuration Id (e.g. 14)
113
         You must have either $a or $d
114
         ", DefaultValue = "Index_$c_$d")]
115
    public string IndexNamingPattern { get; set; }
102✔
116

117
    [DemandsInitialization("An optional list of columns to index on e.g \"Column1, Column2\"")]
118
    public string UserDefinedIndex { get; set; }
102✔
119

120

121
    [DemandsInitialization("When writing to an existing database, will update records and store historical versions via a view", DefaultValue = false)]
122
    public bool UseArchiveTrigger { get; set; }
124✔
123

124
    private DiscoveredDatabase _destinationDatabase;
125
    private DataTableUploadDestination _destination;
126

127
    private bool _tableDidNotExistAtStartOfLoad;
128
    private bool _isTableAlreadyNamed;
129
    private DataTable _toProcess;
130
    private IBasicActivateItems _activator;
131

132
    public ExecuteFullExtractionToDatabaseMSSql() : base(false)
78✔
133
    {
134
    }
78✔
135

136
    public override DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener job,
137
        GracefulCancellationToken cancellationToken)
138
    {
139
        _destinationDatabase = GetDestinationDatabase(job);
68✔
140
        return base.ProcessPipelineData(toProcess, job, cancellationToken);
68✔
141
    }
142

143
    protected override void Open(DataTable toProcess, IDataLoadEventListener job,
144
        GracefulCancellationToken cancellationToken)
145
    {
146
        _toProcess = toProcess;
34✔
147

148
        //give the data table the correct name
149
        if (_toProcess.ExtendedProperties.ContainsKey("ProperlyNamed") &&
34!
150
            _toProcess.ExtendedProperties["ProperlyNamed"].Equals(true))
34✔
151
            _isTableAlreadyNamed = true;
×
152

153
        _toProcess.TableName = GetTableName();
34✔
154

155
        _destination = PrepareDestination(job, _toProcess);
34✔
156
        OutputFile = _toProcess.TableName;
34✔
157
    }
34✔
158

159
    protected override void WriteRows(DataTable toProcess, IDataLoadEventListener job,
160
        GracefulCancellationToken cancellationToken, Stopwatch stopwatch)
161
    {
162
        // empty batches are allowed when using batch/resume
163
        if (toProcess.Rows.Count == 0 && _request.IsBatchResume) return;
34!
164

165
        if (_request.IsBatchResume) _destination.AllowLoadingPopulatedTables = true;
34!
166

167
        _destination.ProcessPipelineData(toProcess, job, cancellationToken);
34✔
168

169
        LinesWritten += toProcess.Rows.Count;
32✔
170
    }
32✔
171

172
    private DataTableUploadDestination PrepareDestination(IDataLoadEventListener listener, DataTable toProcess)
173
    {
174
        //see if the user has entered an extraction server/database
175
        if (TargetDatabaseServer == null)
34!
176
            throw new Exception(
×
177
                "TargetDatabaseServer (the place you want to extract the project data to) property has not been set!");
×
178

179
        try
180
        {
181
            if (!_destinationDatabase.Exists())
34!
182
                _destinationDatabase.Create();
×
183

184
            if (_request is ExtractGlobalsCommand)
34!
185
                return null;
×
186

187
            var tblName = _toProcess.TableName;
34✔
188

189
            //See if table already exists on the server (likely to cause problems including duplication, schema changes in configuration etc)
190
            var existing = _destinationDatabase.ExpectTable(tblName);
34✔
191
            if (existing.Exists())
34✔
192
            {
193
                var hasPKs = existing.DiscoverColumns().Any(col => col.IsPrimaryKey);
108✔
194

195
                if (!AlwaysDropExtractionTables)
16✔
196
                {
197
                    //check the PKs are the same
198
                    var remotePKs = existing.DiscoverColumns().Where(col => col.IsPrimaryKey).Select(col => col.GetRuntimeName()).ToList();
662✔
199
                    var rdmpPKs = toProcess.PrimaryKey.Cast<DataColumn>().Select(col => col.ColumnName).ToList();
32✔
200
                    if (!remotePKs.All(rdmpPKs.Contains) || remotePKs.Count != rdmpPKs.Count)
16!
201
                    {
202
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error,
×
203
                            $"""
×
204
                        Table {existing.GetFullyQualifiedName()} already exists and has different PKs to the source table.                            
×
205
                        Source PKs: {string.Join(", ", rdmpPKs)}
×
206
                        Destination PKs: {string.Join(", ", remotePKs)}
×
207
                        """));
×
208
                        return null;
×
209
                    }
210

211
                }
212

213
                if (_request.IsBatchResume)
16!
214
                {
215
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
×
216
                        $"Table {existing.GetFullyQualifiedName()} already exists but it IsBatchResume so no problem."));
×
217
                }
218
                else if (AlwaysDropExtractionTables)
16!
219
                {
220
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
221
                        $"Table {existing.GetFullyQualifiedName()} already exists, dropping because setting {nameof(AlwaysDropExtractionTables)} is on"));
×
222
                    existing.Drop();
×
223

224
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
225
                        $"Table {existing.GetFullyQualifiedName()} was dropped"));
×
226

227
                    // since we dropped it we should treat it as if it was never there to begin with
228
                    _tableDidNotExistAtStartOfLoad = true;
×
229
                }
230
                else if (UseArchiveTrigger && hasPKs)
16✔
231
                {
232

233
                    TriggerImplementerFactory triggerFactory = new TriggerImplementerFactory(FAnsi.DatabaseType.MicrosoftSQLServer);
6✔
234
                    var implementor = triggerFactory.Create(existing);
6✔
235
                    bool present;
236
                    try
237
                    {
238
                        present = implementor.GetTriggerStatus() == DataLoad.Triggers.TriggerStatus.Enabled;
6✔
239
                    }
6✔
240
                    catch (TriggerMissingException)
×
241
                    {
242
                        present = false;
×
243
                    }
×
244
                    //check the columns are correct, we might have added some
245
                    var existingColumns = existing.DiscoverColumns();
6✔
246
                    var existingColumnNames = existingColumns.Select(ec => ec.GetRuntimeName());
4,686✔
247
                    var toProcessColumnNames = toProcess.Columns.Cast<DataColumn>().Select(col => col.ColumnName);
240✔
248
                    var newColumns = toProcessColumnNames.Where(c => !existingColumnNames.Contains(c));
240✔
249
                    if (newColumns.Any())
6!
250
                    {
251
                        var archiveTable = _destinationDatabase.ExpectTable(tblName + "_Archive");
×
252
                        if (archiveTable.Exists())
×
253
                        {
254
                            foreach (var column in newColumns)
×
255
                            {
256
                                existing.AddColumn(column, new TypeGuesser.DatabaseTypeRequest(toProcess.Columns[column].DataType), true, 30000);
×
257
                                archiveTable.AddColumn(column, new TypeGuesser.DatabaseTypeRequest(toProcess.Columns[column].DataType), true, 30000);
×
258
                            }
259
                            if (present)
×
260
                            {
261
                                string triggerProblems = "";
×
262
                                string triggerOK = "";
×
263
                                implementor.DropTrigger(out triggerProblems, out triggerOK);
×
264
                                if (triggerProblems != "")
×
265
                                {
266
                                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, triggerProblems));
×
267
                                }
268

269
                                existing = _destinationDatabase.ExpectTable(tblName);
×
270
                                implementor = triggerFactory.Create(existing);
×
271
                                present = false;
×
272
                            }
273
                        }
274
                    }
275

276
                    if (!present)
6!
277
                    {
278
                        implementor.CreateTrigger(ThrowImmediatelyCheckNotifier.Quiet);
×
279
                    }
280

281
                }
282
                else
283
                {
284
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
10✔
285
                        $"A table called {tblName} already exists on server {TargetDatabaseServer}, data load might crash if it is populated and/or has an incompatible schema"));
10✔
286
                }
287
            }
288
            else
289
            {
290
                _tableDidNotExistAtStartOfLoad = true;
18✔
291
            }
292
        }
34✔
293
        catch (Exception e)
×
294
        {
295
            //Probably the database didn't exist or the credentials were wrong or something
296
            listener.OnNotify(this,
×
297
                new NotifyEventArgs(ProgressEventType.Error,
×
298
                    "Failed to inspect destination for already existing datatables", e));
×
299
        }
×
300

301
        _destination = new DataTableUploadDestination(((IExtractDatasetCommand)_request).ExtractableCohort.ExternalCohortTable);
34✔
302

303
        PrimeDestinationTypesBasedOnCatalogueTypes(listener, toProcess);
34✔
304

305
        _destination.AllowResizingColumnsAtUploadTime = true;
34✔
306
        _destination.AlterTimeout = AlterTimeout;
34✔
307
        _destination.WriteDataBeforeApplyingPrimaryKeys = WriteDataBeforeApplyingPrimaryKeys;
34✔
308
        _destination.AppendDataIfTableExists = AppendDataIfTableExists;
34✔
309
        _destination.IncludeTimeStamp = IncludeTimeStamp;
34✔
310
        _destination.UseTrigger = AppendDataIfTableExists;
34✔
311
        _destination.IndexTables = IndexTables;
34✔
312
        _destination.UseTrigger = UseArchiveTrigger;
34✔
313
        _destination.IndexTableName = GetIndexName();
34✔
314
        if (UserDefinedIndex is not null)
34!
315
            _destination.UserDefinedIndexes = UserDefinedIndex.Split(',').Select(i => i.Trim()).ToList();
×
316
        _destination.PreInitialize(_activator, _destinationDatabase, listener);
34✔
317

318

319
        return _destination;
34✔
320
    }
×
321

322
    private void PrimeDestinationTypesBasedOnCatalogueTypes(IDataLoadEventListener listener, DataTable toProcess)
323
    {
324
        //if the extraction is of a Catalogue
325

326
        if (_request is not IExtractDatasetCommand datasetCommand)
34!
327
            return;
×
328

329
        //for every extractable column in the Catalogue
330
        foreach (var extractionInformation in datasetCommand.ColumnsToExtract.OfType<ExtractableColumn>()
2,512✔
331
                     .Select(ec =>
34✔
332
                         ec.CatalogueExtractionInformation))
1,256✔
333
        {
334
            if (extractionInformation == null)
1,222✔
335
                continue;
336

337
            var catItem = extractionInformation.CatalogueItem;
1,222✔
338

339
            //if we do not know the data type or the ei is a transform
340
            if (catItem == null)
1,222!
341
            {
342
                listener.OnNotify(this,
×
343
                    new NotifyEventArgs(ProgressEventType.Warning,
×
344
                        $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it had no associated CatalogueItem"));
×
345
                continue;
×
346
            }
347

348
            if (catItem.ColumnInfo == null)
1,222!
349
            {
350
                listener.OnNotify(this,
×
351
                    new NotifyEventArgs(ProgressEventType.Warning,
×
352
                        $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it had no associated ColumnInfo"));
×
353
                continue;
×
354
            }
355

356
            if (extractionInformation.IsProperTransform())
1,222✔
357
            {
358
                listener.OnNotify(this,
2✔
359
                    new NotifyEventArgs(ProgressEventType.Warning,
2✔
360
                        $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it is a Transform"));
2✔
361
                continue;
2✔
362
            }
363

364
            var destinationType = GetDestinationDatabaseType(extractionInformation);
1,220✔
365

366
            //Tell the destination the datatype of the ColumnInfo that underlies the ExtractionInformation (this might be changed by the ExtractionInformation e.g. as a
367
            //transform but it is a good starting point.  We don't want to create a varchar(10) column in the destination if the origin dataset (Catalogue) is a varchar(100)
368
            //since it will just confuse the user.  Bear in mind these data types can be degraded later by the destination
369
            var columnName = extractionInformation.Alias ?? catItem.ColumnInfo.GetRuntimeName();
1,220✔
370
            var addedType = _destination.AddExplicitWriteType(columnName, destinationType);
1,220✔
371
            addedType.IsPrimaryKey = toProcess.PrimaryKey.Any(dc => dc.ColumnName == columnName);
2,436✔
372

373
            //if user wants to copy collation types and the destination server is the same type as the origin server
374
            if (CopyCollations && _destinationDatabase.Server.DatabaseType == catItem.ColumnInfo.TableInfo.DatabaseType)
1,220!
375
                addedType.Collation = catItem.ColumnInfo.Collation;
×
376

377
            listener.OnNotify(this,
1,220✔
378
                new NotifyEventArgs(ProgressEventType.Information,
1,220✔
379
                    $"Set Type for {columnName} to {destinationType} (IsPrimaryKey={(addedType.IsPrimaryKey ? "true" : "false")}) to match the source table"));
1,220✔
380
        }
381

382

383
        foreach (var sub in datasetCommand.QueryBuilder.SelectColumns.Select(static sc => sc.IColumn)
1,392✔
384
                     .OfType<ReleaseIdentifierSubstitution>())
34✔
385
        {
386
            var columnName = sub.GetRuntimeName();
34✔
387
            var isPk = toProcess.PrimaryKey.Any(dc => dc.ColumnName == columnName);
62✔
388

389
            var addedType = _destination.AddExplicitWriteType(columnName,
34✔
390
                datasetCommand.ExtractableCohort.GetReleaseIdentifierDataType());
34✔
391
            addedType.IsPrimaryKey = isPk;
34✔
392
            addedType.AllowNulls = !isPk;
34✔
393
        }
394
    }
34✔
395

396
    private string GetDestinationDatabaseType(ConcreteColumn col)
397
    {
398
        //Make sure we know if we are going between database types
399
        var fromDbType = _destinationDatabase.Server.DatabaseType;
1,220✔
400
        var toDbType = col.ColumnInfo.TableInfo.DatabaseType;
1,220✔
401
        if (fromDbType != toDbType)
1,220!
402
        {
403
            var fromSyntax = col.ColumnInfo.GetQuerySyntaxHelper();
×
404
            var toSyntax = _destinationDatabase.Server.GetQuerySyntaxHelper();
×
405

406
            var intermediate = fromSyntax.TypeTranslater.GetDataTypeRequestForSQLDBType(col.ColumnInfo.Data_type);
×
407
            return toSyntax.TypeTranslater.GetSQLDBTypeForCSharpType(intermediate);
×
408
        }
409

410
        return col.ColumnInfo.Data_type;
1,220✔
411
    }
412

413
    private string GetIndexName()
414
    {
415
        string indexName = IndexNamingPattern;
34✔
416
        var project = _request.Configuration.Project;
34✔
417
        indexName = indexName.Replace("$p", project.Name);
34✔
418
        indexName = indexName.Replace("$n", project.ProjectNumber.ToString());
34✔
419
        indexName = indexName.Replace("$c", _request.Configuration.Name);
34✔
420
        indexName = indexName.Replace("$e", _request.Configuration.ID.ToString());
34✔
421
        if (_request is ExtractDatasetCommand extractDatasetCommand)
34✔
422
        {
423
            indexName = indexName.Replace("$d", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Name);
34✔
424
            indexName = indexName.Replace("$a", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Acronym);
34✔
425
        }
426

427
        if (_request is ExtractGlobalsCommand)
34!
428
        {
429
            indexName = indexName.Replace("$d", ExtractionDirectory.GLOBALS_DATA_NAME);
×
430
            indexName = indexName.Replace("$a", "G");
×
431
        }
432

433

434
        return indexName.Replace(" ", "");
34✔
435
    }
436

437
    private string GetTableName(string suffix = null)
438
    {
439
        string tblName;
440
        if (_isTableAlreadyNamed)
40!
441
        {
442
            tblName = SanitizeNameForDatabase(_toProcess.TableName);
×
443

444
            if (!string.IsNullOrWhiteSpace(suffix))
×
445
                tblName += $"_{suffix}";
×
446

447
            return tblName;
×
448
        }
449

450
        tblName = TableNamingPattern;
40✔
451
        var project = _request.Configuration.Project;
40✔
452

453
        tblName = tblName.Replace("$p", project.Name);
40✔
454
        tblName = tblName.Replace("$n", project.ProjectNumber.ToString());
40✔
455
        tblName = tblName.Replace("$c", _request.Configuration.Name);
40✔
456
        tblName = tblName.Replace("$e", _request.Configuration.ID.ToString());
40✔
457

458
        if (_request is ExtractDatasetCommand extractDatasetCommand)
40✔
459
        {
460
            tblName = tblName.Replace("$d", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Name);
38✔
461
            tblName = tblName.Replace("$a", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Acronym);
38✔
462
        }
463

464
        if (_request is ExtractGlobalsCommand)
40✔
465
        {
466
            tblName = tblName.Replace("$d", ExtractionDirectory.GLOBALS_DATA_NAME);
2✔
467
            tblName = tblName.Replace("$a", "G");
2✔
468
        }
469

470
        var cachedGetTableNameAnswer = SanitizeNameForDatabase(tblName);
40✔
471
        if (!string.IsNullOrWhiteSpace(suffix))
40✔
472
            cachedGetTableNameAnswer += $"_{suffix}";
6✔
473

474
        return cachedGetTableNameAnswer;
40✔
475
    }
476

477
    private string SanitizeNameForDatabase(string tblName)
478
    {
479
        if (_destinationDatabase == null)
40!
480
            throw new Exception(
×
481
                "Cannot pick a TableName until we know what type of server it is going to, _server is null");
×
482

483
        //get rid of brackets and dots
484
        tblName = Regex.Replace(tblName, "[.()]", "_");
40✔
485

486
        var syntax = _destinationDatabase.Server.GetQuerySyntaxHelper();
40✔
487
        syntax.ValidateTableName(tblName);
40✔
488

489
        //otherwise, fetch and cache answer
490
        var cachedGetTableNameAnswer = syntax.GetSensibleEntityNameFromString(tblName);
40✔
491

492
        return string.IsNullOrWhiteSpace(cachedGetTableNameAnswer)
40!
493
            ? throw new Exception(
40✔
494
                $"TableNamingPattern '{TableNamingPattern}' resulted in an empty string for request '{_request}'")
40✔
495
            : cachedGetTableNameAnswer;
40✔
496
    }
497

498
    public override void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
499
    {
500
        if (_destination != null)
68✔
501
        {
502
            _destination.Dispose(listener, pipelineFailureExceptionIfAny);
34✔
503

504
            //if the extraction failed, the table didn't exist in the destination (i.e. the table was created during the extraction) and we are to DropTableIfLoadFails
505
            if (pipelineFailureExceptionIfAny != null && _tableDidNotExistAtStartOfLoad && DropTableIfLoadFails)
34!
506
                if (_destinationDatabase != null)
×
507
                {
508
                    var tbl = _destinationDatabase.ExpectTable(_toProcess.TableName);
×
509

510
                    if (tbl.Exists())
×
511
                    {
512
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
513
                            $"DropTableIfLoadFails is true so about to drop table {tbl}"));
×
514
                        tbl.Drop();
×
515
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, $"Dropped table {tbl}"));
×
516
                    }
517
                }
518

519
            if (pipelineFailureExceptionIfAny == null
34!
520
                && _request.IsBatchResume
34✔
521
                && MakeFinalTableDistinctWhenBatchResuming
34✔
522
                && _destinationDatabase != null
34✔
523
                && _toProcess != null)
34✔
524
            {
525
                var tbl = _destinationDatabase.ExpectTable(_toProcess.TableName);
×
526
                if (tbl.Exists())
×
527
                    // if there is no primary key then failed batches may have introduced duplication
528
                    if (!tbl.DiscoverColumns().Any(p => p.IsPrimaryKey))
×
529
                    {
530
                        listener.OnNotify(this,
×
531
                            new NotifyEventArgs(ProgressEventType.Information,
×
532
                                $"Making {tbl} distinct in case there are duplicate rows from bad batch resumes"));
×
533
                        tbl.MakeDistinct(50000000);
×
534
                        listener.OnNotify(this,
×
535
                            new NotifyEventArgs(ProgressEventType.Information, $"Finished distincting {tbl}"));
×
536
                    }
537
            }
538
        }
539

540
        TableLoadInfo?.CloseAndArchive();
68✔
541

542
        // also close off the cumulative extraction result
543
        if (_request is ExtractDatasetCommand)
68✔
544
        {
545
            var result = ((IExtractDatasetCommand)_request).CumulativeExtractionResults;
34✔
546
            if (result != null && _toProcess != null)
34✔
547
                result.CompleteAudit(GetType(), GetDestinationDescription(), TableLoadInfo.Inserts,
34✔
548
                    _request.IsBatchResume, pipelineFailureExceptionIfAny != null);
34✔
549
        }
550
    }
68✔
551

552
    public override void Abort(IDataLoadEventListener listener)
553
    {
554
        _destination?.Abort(listener);
×
555
    }
×
556

557
    protected override void PreInitializeImpl(IBasicActivateItems activator, IExtractCommand value, IDataLoadEventListener listener)
558
    {
559
        _activator = activator;
142✔
560
    }
142✔
561

562

563
    public override string GetDestinationDescription() => GetDestinationDescription("");
66✔
564

565
    private string GetDestinationDescription(string suffix = "")
566
    {
567
        if (_toProcess == null)
100✔
568
            return _request is ExtractGlobalsCommand
34!
569
                ? "Globals"
34✔
570
                : throw new Exception("Could not describe destination because _toProcess was null");
34✔
571

572
        var tblName = _toProcess.TableName;
66✔
573
        var dbName = GetDatabaseName();
66✔
574
        return $"{TargetDatabaseServer.ID}|{dbName}|{tblName}";
66✔
575
    }
576

577
    public static DestinationType GetDestinationType() => DestinationType.Database;
×
578

579
    public override ReleasePotential GetReleasePotential(IRDMPPlatformRepositoryServiceLocator repositoryLocator,
580
        ISelectedDataSets selectedDataSet) => new MsSqlExtractionReleasePotential(repositoryLocator, selectedDataSet);
×
581

582
    public override FixedReleaseSource<ReleaseAudit> GetReleaseSource(ICatalogueRepository catalogueRepository) =>
583
        new MsSqlReleaseSource(catalogueRepository);
×
584

585
    public override GlobalReleasePotential GetGlobalReleasabilityEvaluator(
586
        IRDMPPlatformRepositoryServiceLocator repositoryLocator, ISupplementalExtractionResults globalResult,
587
        IMapsDirectlyToDatabaseTable globalToCheck) =>
588
        new MsSqlGlobalsReleasePotential(repositoryLocator, globalResult, globalToCheck);
×
589

590
    protected override void TryExtractSupportingSQLTableImpl(SupportingSQLTable sqlTable, DirectoryInfo directory,
591
        IExtractionConfiguration configuration, IDataLoadEventListener listener, out int linesWritten,
592
        out string destinationDescription)
593
    {
594
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
4✔
595
            $"About to download SQL for global SupportingSQL {sqlTable.SQL}"));
4✔
596
        using var con = sqlTable.GetServer().GetConnection();
4✔
597
        con.Open();
4✔
598

599
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
4✔
600
            $"Connection opened successfully, about to send SQL command {sqlTable.SQL}"));
4✔
601

602
        using var dt = new DataTable();
4✔
603
        using (var cmd = DatabaseCommandHelper.GetCommand(sqlTable.SQL, con))
4✔
604
        using (var da = DatabaseCommandHelper.GetDataAdapter(cmd))
4✔
605
        {
606
            var sw = Stopwatch.StartNew();
4✔
607
            dt.BeginLoadData();
4✔
608
            da.Fill(dt);
4✔
609
            dt.EndLoadData();
4✔
610
        }
4✔
611

612
        dt.TableName = GetTableName(_destinationDatabase.Server.GetQuerySyntaxHelper()
4✔
613
            .GetSensibleEntityNameFromString(sqlTable.Name));
4✔
614
        linesWritten = dt.Rows.Count;
4✔
615

616
        var destinationDb = GetDestinationDatabase(listener);
4✔
617
        var tbl = destinationDb.ExpectTable(dt.TableName);
4✔
618

619
        if (tbl.Exists())
4!
620
            tbl.Drop();
×
621

622
        destinationDb.CreateTable(dt.TableName, dt);
4✔
623
        destinationDescription = $"{TargetDatabaseServer.ID}|{GetDatabaseName()}|{dt.TableName}";
4✔
624
    }
8✔
625

626

627
    protected override void TryExtractLookupTableImpl(BundledLookupTable lookup, DirectoryInfo lookupDir,
628
        IExtractionConfiguration requestConfiguration, IDataLoadEventListener listener, out int linesWritten,
629
        out string destinationDescription)
630
    {
631
        using var dt = lookup.GetDataTable();
2✔
632

633
        dt.TableName = GetTableName(_destinationDatabase.Server.GetQuerySyntaxHelper()
2✔
634
            .GetSensibleEntityNameFromString(lookup.TableInfo.Name));
2✔
635

636
        //describe the destination for the abstract base
637
        destinationDescription = $"{TargetDatabaseServer.ID}|{GetDatabaseName()}|{dt.TableName}";
2✔
638
        linesWritten = dt.Rows.Count;
2✔
639

640
        var destinationDb = GetDestinationDatabase(listener);
2✔
641
        var existing = destinationDb.ExpectTable(dt.TableName);
2✔
642

643
        if (existing.Exists())
2!
644
        {
645
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
646
                $"Dropping existing Lookup table '{existing.GetFullyQualifiedName()}'"));
×
647
            existing.Drop();
×
648
        }
649

650
        destinationDb.CreateTable(dt.TableName, dt);
2✔
651
    }
4✔
652

653
    private DiscoveredDatabase GetDestinationDatabase(IDataLoadEventListener listener)
654
    {
655
        //tell user we are about to inspect it
656
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
74✔
657
            $"About to open connection to {TargetDatabaseServer}"));
74✔
658

659
        var databaseName = GetDatabaseName();
74✔
660

661
        var discoveredServer = DataAccessPortal.ExpectServer(TargetDatabaseServer, DataAccessContext.DataExport, false);
74✔
662

663
        var db = discoveredServer.ExpectDatabase(databaseName);
74✔
664
        if (!db.Exists())
74✔
665
            db.Create();
10✔
666

667
        return db;
74✔
668
    }
669

670
    private string GetDatabaseName()
671
    {
672
        var dbName = DatabaseNamingPattern;
152✔
673

674
        if (_project.ProjectNumber == null)
152!
675
            throw new ProjectNumberException($"Project '{_project}' must have a ProjectNumber");
×
676

677
        if (_request == null)
152!
678
            throw new Exception("No IExtractCommand Request was passed to this component");
×
679

680
        if (_request.Configuration == null)
152!
681
            throw new Exception($"Request did not specify any Configuration for Project '{_project}'");
×
682

683
        dbName = dbName.Replace("$p", _project.Name)
152✔
684
            .Replace("$n", _project.ProjectNumber.ToString())
152✔
685
            .Replace("$t", _project.MasterTicket)
152✔
686
            .Replace("$r", _request.Configuration.RequestTicket)
152✔
687
            .Replace("$l", _request.Configuration.ReleaseTicket)
152✔
688
            .Replace("$e", _request.Configuration.ID.ToString());
152✔
689
        return dbName;
152✔
690
    }
691

692
    public override void Check(ICheckNotifier notifier)
693
    {
694
        if (TargetDatabaseServer == null)
10✔
695
        {
696
            notifier.OnCheckPerformed(new CheckEventArgs(
2✔
697
                "Target database server property has not been set (This component does not know where to extract data to!), " +
2✔
698
                "to fix this you must edit the pipeline and choose an ExternalDatabaseServer to extract to)",
2✔
699
                CheckResult.Fail));
2✔
700
            return;
2✔
701
        }
702

703
        if (string.IsNullOrWhiteSpace(TargetDatabaseServer.Server))
8✔
704
        {
705
            notifier.OnCheckPerformed(new CheckEventArgs("TargetDatabaseServer does not have a .Server specified",
2✔
706
                CheckResult.Fail));
2✔
707
            return;
2✔
708
        }
709

710
        if (!string.IsNullOrWhiteSpace(TargetDatabaseServer.Database))
6!
711
            notifier.OnCheckPerformed(new CheckEventArgs(
×
712
                "TargetDatabaseServer has .Database specified but this will be ignored!", CheckResult.Warning));
×
713

714
        if (string.IsNullOrWhiteSpace(TableNamingPattern))
6!
715
        {
716
            notifier.OnCheckPerformed(new CheckEventArgs(
×
717
                "You must specify TableNamingPattern, this will tell the component how to name tables it generates in the remote destination",
×
718
                CheckResult.Fail));
×
719
            return;
×
720
        }
721

722
        if (string.IsNullOrWhiteSpace(DatabaseNamingPattern))
6!
723
        {
724
            notifier.OnCheckPerformed(new CheckEventArgs(
×
725
                "You must specify DatabaseNamingPattern, this will tell the component what database to create or use in the remote destination",
×
726
                CheckResult.Fail));
×
727
            return;
×
728
        }
729

730
        if (!DatabaseNamingPattern.Contains("$p") && !DatabaseNamingPattern.Contains("$n") &&
6✔
731
            !DatabaseNamingPattern.Contains("$t") && !DatabaseNamingPattern.Contains("$r") &&
6✔
732
            !DatabaseNamingPattern.Contains("$l"))
6✔
733
            notifier.OnCheckPerformed(new CheckEventArgs(
4✔
734
                "DatabaseNamingPattern does not contain any token. The tables may be created alongside existing tables and Release would be impossible.",
4✔
735
                CheckResult.Warning));
4✔
736

737
        if (!TableNamingPattern.Contains("$d") && !TableNamingPattern.Contains("$a"))
6!
738
            notifier.OnCheckPerformed(new CheckEventArgs(
×
739
                "TableNamingPattern must contain either $d or $a, the name/acronym of the dataset being extracted otherwise you will get collisions when you extract multiple tables at once",
×
740
                CheckResult.Warning));
×
741

742
        if (_request == ExtractDatasetCommand.EmptyCommand)
6!
743
        {
744
            notifier.OnCheckPerformed(new CheckEventArgs(
×
745
                "Request is ExtractDatasetCommand.EmptyCommand, will not try to connect to Database",
×
746
                CheckResult.Warning));
×
747
            return;
×
748
        }
749

750
        if (TableNamingPattern != null && TableNamingPattern.Contains("$a"))
6!
751
            if (_request is ExtractDatasetCommand dsRequest && string.IsNullOrWhiteSpace(dsRequest.Catalogue.Acronym))
×
752
                notifier.OnCheckPerformed(new CheckEventArgs(
×
753
                    $"Catalogue '{dsRequest.Catalogue}' does not have an Acronym but TableNamingPattern contains $a",
×
754
                    CheckResult.Fail));
×
755

756

757

758
        base.Check(notifier);
6✔
759

760
        try
761
        {
762
            var server = DataAccessPortal.ExpectServer(TargetDatabaseServer, DataAccessContext.DataExport, false);
6✔
763
            var database = _destinationDatabase = server.ExpectDatabase(GetDatabaseName());
6✔
764

765
            if (UseArchiveTrigger)
6!
766
            {
767
                if (_request is ExtractDatasetCommand dsRequest)
×
768
                {
769
                    var existing = _destinationDatabase.ExpectTable(dsRequest.Catalogue.Name);
×
770
                    if (existing.Exists())
×
771
                    {
772
                        var hasPKs = existing.DiscoverColumns().Any(col => col.IsPrimaryKey);
×
773
                        if (!hasPKs)
×
774
                        {
775
                            notifier.OnCheckPerformed(new CheckEventArgs(
×
776
                               $"Catalogue does not have any PKS. Cannot apply the archive trigger",
×
777
                               CheckResult.Fail));
×
778
                        }
779
                    }
780
                }
781
            }
782

783
            if (database.Exists())
6✔
784
            {
785
                notifier.OnCheckPerformed(
2✔
786
                    new CheckEventArgs(
2✔
787
                        $"Database {database} already exists! if an extraction has already been run you may have errors if you are re-extracting the same tables",
2✔
788
                        CheckResult.Warning));
2✔
789
            }
790
            else
791
            {
792
                notifier.OnCheckPerformed(
4✔
793
                    new CheckEventArgs(
4✔
794
                        $"Database {database} does not exist on server... it will be created at runtime",
4✔
795
                        CheckResult.Success));
4✔
796
                return;
4✔
797
            }
798

799
            var tables = database.DiscoverTables(false);
2✔
800

801
            if (tables.Any())
2!
802
            {
803
                string tableName;
804

805
                try
806
                {
807
                    tableName = GetTableName();
×
808
                }
×
809
                catch (Exception ex)
×
810
                {
811
                    notifier.OnCheckPerformed(
×
812
                        new CheckEventArgs("Could not determine table name", CheckResult.Fail, ex));
×
813
                    return;
×
814
                }
815

816
                // if the expected table exists and we are not doing a batch resume or allowing data appending
817
                if (tables.Any(t => t.GetRuntimeName().Equals(tableName)) && !_request.IsBatchResume && !AppendDataIfTableExists)
×
818
                    notifier.OnCheckPerformed(new CheckEventArgs(ErrorCodes.ExistingExtractionTableInDatabase,
×
819
                        tableName, database));
×
820
            }
821
            else
822
            {
823
                notifier.OnCheckPerformed(new CheckEventArgs($"Confirmed that database {database} is empty of tables",
2✔
824
                    CheckResult.Success));
2✔
825
            }
826
        }
2✔
827
        catch (Exception e)
×
828
        {
829
            notifier.OnCheckPerformed(new CheckEventArgs(
×
830
                $"Could not connect to TargetDatabaseServer '{TargetDatabaseServer}'", CheckResult.Fail, e));
×
831
        }
×
832
    }
6✔
833
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc