• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 26959250291

04 Jun 2026 02:46PM UTC coverage: 57.01% (+0.04%) from 56.971%
26959250291

Pull #2343

github

JFriel
tidy up
Pull Request #2343: Task/rdmp 376 extraction structural changes

11579 of 21845 branches covered (53.01%)

Branch coverage included in aggregate %.

31 of 61 new or added lines in 3 files covered. (50.82%)

152 existing lines in 5 files now uncovered.

32700 of 55824 relevant lines covered (58.58%)

9032.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.68
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/ExecuteFullExtractionToDatabaseMSSql.cs
1
// Copyright (c) The University of Dundee 2018-2024
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using Amazon.Auth.AccessControlPolicy;
8
using FAnsi.Discovery;
9
using NPOI.SS.Formula.Functions;
10
using Rdmp.Core.CommandExecution;
11
using Rdmp.Core.Curation.Data;
12
using Rdmp.Core.DataExport.Data;
13
using Rdmp.Core.DataExport.DataExtraction.Commands;
14
using Rdmp.Core.DataExport.DataExtraction.UserPicks;
15
using Rdmp.Core.DataExport.DataRelease.Pipeline;
16
using Rdmp.Core.DataExport.DataRelease.Potential;
17
using Rdmp.Core.DataFlowPipeline;
18
using Rdmp.Core.DataLoad.Engine.Job.Scheduling;
19
using Rdmp.Core.DataLoad.Engine.Pipeline.Destinations;
20
using Rdmp.Core.DataLoad.Triggers;
21
using Rdmp.Core.DataLoad.Triggers.Exceptions;
22
using Rdmp.Core.DataLoad.Triggers.Implementations;
23
using Rdmp.Core.MapsDirectlyToDatabaseTable;
24
using Rdmp.Core.QueryBuilding;
25
using Rdmp.Core.Repositories;
26
using Rdmp.Core.ReusableLibraryCode;
27
using Rdmp.Core.ReusableLibraryCode.Checks;
28
using Rdmp.Core.ReusableLibraryCode.DataAccess;
29
using Rdmp.Core.ReusableLibraryCode.Progress;
30
using System;
31
using System.Data;
32
using System.Diagnostics;
33
using System.IO;
34
using System.Linq;
35
using System.Text.RegularExpressions;
36
using YamlDotNet.Core;
37

38
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Destinations;
39

40
/// <summary>
41
/// Alternate extraction pipeline destination in which the DataTable containing the extracted dataset is written to an Sql Server database
42
/// </summary>
43
public class ExecuteFullExtractionToDatabaseMSSql : ExtractionDestination
44
{
45
    /// <summary>
    /// Server on which the extraction database is created (the database name comes from <see cref="DatabaseNamingPattern"/>).
    /// </summary>
    [DemandsInitialization(
        "External server to create the extraction into, a new database will be created for the project based on the naming pattern provided",
        Mandatory = true)]
    public IExternalDatabaseServer TargetDatabaseServer { get; set; }

    /// <summary>
    /// Token pattern used to name the destination database on <see cref="TargetDatabaseServer"/>.
    /// </summary>
    [DemandsInitialization(@"How do you want to name the destination database, use the following tokens if you need them:
         $p - Project Name (e.g. 'My Project')
         $n - Project Number (e.g. 234)
         $t - Master Ticket (e.g. 'LINK-1234')
         $r - Request Ticket (e.g. 'LINK-1234')
         $l - Release Ticket (e.g. 'LINK-1234')
         $e - Extraction Configuration Id (e.g. 14)
         ", Mandatory = true, DefaultValue = "Proj_$n_$l")]
    public string DatabaseNamingPattern { get; set; }

    /// <summary>
    /// Token pattern used to name each extracted table (ignored when the incoming DataTable is flagged as already named).
    /// </summary>
    [DemandsInitialization(@"How do you want to name the extracted tables, use the following tokens if you need them:
         $p - Project Name (e.g. 'My Project')
         $n - Project Number (e.g. 234)
         $c - Configuration Name (e.g. 'Cases')
         $d - Dataset name (e.g. 'Prescribing')
         $a - Dataset acronym (e.g. 'Presc')
         $e - Extraction Configuration Id (e.g. 14)
         You must have either $a or $d
         ", Mandatory = true, DefaultValue = "$c_$d")]
    public string TableNamingPattern { get; set; }

    /// <summary>
    /// When true, a table created by a failed extraction is dropped during Dispose.
    /// </summary>
    [DemandsInitialization(
        @"If the extraction fails half way through AND the destination table was created during the extraction then the table will be dropped from the destination rather than being left in a half loaded state ",
        defaultValue: true)]
    public bool DropTableIfLoadFails { get; set; }

    /// <summary>
    /// Timeout passed through to the upload destination's AlterTimeout.
    /// </summary>
    [DemandsInitialization(DataTableUploadDestination.AlterTimeout_Description, DefaultValue = 300)]
    public int AlterTimeout { get; set; }

    /// <summary>
    /// Passed through to the upload destination's WriteDataBeforeApplyingPrimaryKeys.
    /// </summary>
    [DemandsInitialization("By applying the primary keys after writing the data, it ensures all data is extracted. Disabling this configuration may improve performance but will quickly raise issues with poorly keyed data.", DefaultValue = true)]
    public bool WriteDataBeforeApplyingPrimaryKeys { get; set; }

    /// <summary>
    /// When true and source/destination database types match, CatalogueItem column collations are copied to the destination.
    /// </summary>
    [DemandsInitialization(
        "True to copy the column collations from the source database when creating the destination database.  Only works if both the source and destination have the same DatabaseType.  Excludes columns which feature a transform as part of extraction.",
        DefaultValue = false)]
    public bool CopyCollations { get; set; }

    /// <summary>
    /// When true, an existing destination table is dropped before loading.
    /// </summary>
    [DemandsInitialization(
        "True to always drop the destination database table(s) from the destination if they already existed",
        DefaultValue = false)]
    public bool AlwaysDropExtractionTables { get; set; }

    /// <summary>
    /// When true, an unkeyed destination table is made distinct after a successful batch-resume extraction.
    /// </summary>
    [DemandsInitialization(
        "True to apply a distincting operation to the final table when using an ExtractionProgress.  This prevents data duplication from failed batch resumes.",
        DefaultValue = true)]
    public bool MakeFinalTableDistinctWhenBatchResuming { get; set; } = true;

    /// <summary>
    /// Passed through to the upload destination's AppendDataIfTableExists.
    /// </summary>
    [DemandsInitialization("If this extraction has already been run, it will append the extraction data into the database. There is no duplication protection with this functionality.")]
    public bool AppendDataIfTableExists { get; set; } = false;

    /// <summary>
    /// Passed through to the upload destination's IncludeTimeStamp.
    /// </summary>
    [DemandsInitialization("If checked, a column named 'extraction_timestamp' will be included in the extraction that denotes the time the record was added to the extraction.")]
    public bool IncludeTimeStamp { get; set; } = false;

    /// <summary>
    /// Passed through to the upload destination's IndexTables.
    /// </summary>
    // NOTE(review): the initializer is true but the attribute declares no DefaultValue - confirm which default new pipelines should get.
    [DemandsInitialization("If checked, indexes will be created using the primary keys specified")]
    public bool IndexTables { get; set; } = true;

    /// <summary>
    /// Token pattern used to name the index created on the destination table.
    /// </summary>
    [DemandsInitialization(@"How do you want to name the created index, use the following tokens if you need them:
         $p - Project Name (e.g. 'My Project')
         $n - Project Number (e.g. 234)
         $c - Configuration Name (e.g. 'Cases')
         $d - Dataset name (e.g. 'Prescribing')
         $a - Dataset acronym (e.g. 'Presc')
         $e - Extraction Configuration Id (e.g. 14)
         You must have either $a or $d
         ", DefaultValue = "Index_$c_$d")]
    public string IndexNamingPattern { get; set; }

    /// <summary>
    /// Optional comma separated list of column names, passed to the upload destination's UserDefinedIndexes.
    /// </summary>
    [DemandsInitialization("An optional list of columns to index on e.g \"Column1, Column2\"")]
    public string UserDefinedIndex { get; set; }

    /// <summary>
    /// When true (and the existing table has primary keys), an archive trigger is maintained on the destination table.
    /// </summary>
    [DemandsInitialization("When writing to an existing database, will update records and store historical versions via a view", DefaultValue = false)]
    public bool UseArchiveTrigger { get; set; }
62✔
125

126
    // Database that extracted tables are written into; assigned in ProcessPipelineData.
    private DiscoveredDatabase _destinationDatabase;

    // The DataTable currently being extracted; captured in Open, which also renames it.
    private DataTable _toProcess;

    // Set in Open when the incoming DataTable is flagged "ProperlyNamed" (its TableName is then kept).
    private bool _isTableAlreadyNamed;

    // True when this run created the destination table (or dropped a pre-existing one); consulted by Dispose.
    private bool _tableDidNotExistAtStartOfLoad;

    // Upload component that performs the writes; built by PrepareDestination.
    private DataTableUploadDestination _destination;

    // Activator forwarded to the upload destination's PreInitialize.
    // NOTE(review): assignment is not visible here - presumably PreInitializeImpl; confirm.
    private IBasicActivateItems _activator;

    /// <summary>
    /// Creates the destination, passing <c>false</c> to the <see cref="ExtractionDestination"/> base constructor.
    /// </summary>
    // NOTE(review): the meaning of the base-constructor flag is defined in ExtractionDestination - confirm.
    public ExecuteFullExtractionToDatabaseMSSql()
        : base(false)
    {
    }
39✔
137

138
    /// <summary>
    /// Resolves the destination database before delegating the batch to the base extraction destination.
    /// </summary>
    /// <param name="toProcess">The batch of extracted rows.</param>
    /// <param name="job">Listener for progress notifications.</param>
    /// <param name="cancellationToken">Pipeline cancellation token.</param>
    /// <returns>Whatever the base implementation returns.</returns>
    public override DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener job,
        GracefulCancellationToken cancellationToken)
    {
        // Table naming (SanitizeNameForDatabase) and PrepareDestination both need the database, so resolve it first.
        var database = GetDestinationDatabase(job);
        _destinationDatabase = database;

        var result = base.ProcessPipelineData(toProcess, job, cancellationToken);
        return result;
    }
144

145
    /// <summary>
    /// Names the incoming table, builds the upload destination for it and records the table name as the output.
    /// </summary>
    /// <param name="toProcess">First batch of extracted data; its TableName is overwritten.</param>
    /// <param name="job">Listener passed to <see cref="PrepareDestination"/>.</param>
    /// <param name="cancellationToken">Unused here.</param>
    protected override void Open(DataTable toProcess, IDataLoadEventListener job,
        GracefulCancellationToken cancellationToken)
    {
        _toProcess = toProcess;

        // Upstream components can mark the table as already carrying its final name.
        var extended = _toProcess.ExtendedProperties;
        var flaggedAsNamed = extended.ContainsKey("ProperlyNamed") && extended["ProperlyNamed"].Equals(true);
        if (flaggedAsNamed)
            _isTableAlreadyNamed = true;

        _toProcess.TableName = GetTableName();

        _destination = PrepareDestination(job, _toProcess);
        OutputFile = _toProcess.TableName;
    }
17✔
160

161
    /// <summary>
    /// Forwards a batch of rows to the upload destination and adds its row count to <c>LinesWritten</c>.
    /// </summary>
    /// <param name="toProcess">The batch to write.</param>
    /// <param name="job">Listener for the upload destination.</param>
    /// <param name="cancellationToken">Pipeline cancellation token.</param>
    /// <param name="stopwatch">Unused here.</param>
    protected override void WriteRows(DataTable toProcess, IDataLoadEventListener job,
        GracefulCancellationToken cancellationToken, Stopwatch stopwatch)
    {
        if (_request.IsBatchResume)
        {
            // Resumed batches may legitimately be empty, and they load into a table that already has rows.
            if (toProcess.Rows.Count == 0)
                return;

            _destination.AllowLoadingPopulatedTables = true;
        }

        _destination.ProcessPipelineData(toProcess, job, cancellationToken);
        LinesWritten += toProcess.Rows.Count;
    }
16✔
173

174

175
    /// <summary>
    /// Returns true when the column names of <paramref name="source"/> and those discovered on
    /// <paramref name="destination"/> do not form the same set (ordinal comparison, duplicates ignored).
    /// </summary>
    /// <remarks>
    /// Every destination column counts, including any the destination added itself.
    /// NOTE(review): e.g. hic_ archive columns would register as a change - confirm intended.
    /// </remarks>
    private bool hasStructuralChanges(DataTable source, DiscoveredTable destination)
    {
        var incomingNames = source.Columns.Cast<DataColumn>().Select(c => c.ColumnName).ToHashSet();
        var serverNames = destination.DiscoverColumns().Select(c => c.GetRuntimeName());

        // identical membership in both directions means the structure is unchanged
        return !incomingNames.SetEquals(serverNames);
    }
181

182

183
    /// <summary>
    /// Ensures the destination database exists and reconciles any pre-existing table of the same name with
    /// <paramref name="toProcess"/> (primary keys, removed/added columns, archive trigger). It then builds and
    /// configures the <see cref="DataTableUploadDestination"/> that receives the rows.
    /// </summary>
    /// <param name="listener">Receives information, warning and error notifications.</param>
    /// <param name="toProcess">The first batch of extracted data (already named by <see cref="Open"/>).</param>
    /// <returns>
    /// The configured upload destination, or <c>null</c> in two cases: the request is an
    /// <see cref="ExtractGlobalsCommand"/>, or an existing table has different primary keys to the source.
    /// </returns>
    /// <exception cref="Exception">Thrown when <see cref="TargetDatabaseServer"/> has not been set.</exception>
    private DataTableUploadDestination PrepareDestination(IDataLoadEventListener listener, DataTable toProcess)
    {
        //see if the user has entered an extraction server/database
        if (TargetDatabaseServer == null)
            throw new Exception(
                "TargetDatabaseServer (the place you want to extract the project data to) property has not been set!");

        try
        {
            if (!_destinationDatabase.Exists())
                _destinationDatabase.Create();

            if (_request is ExtractGlobalsCommand)
                return null;

            var tblName = _toProcess.TableName;

            //See if table already exists on the server (likely to cause problems including duplication, schema changes in configuration etc)
            var existing = _destinationDatabase.ExpectTable(tblName);
            if (existing.Exists())
            {
                var hasPKs = existing.DiscoverColumns().Any(col => col.IsPrimaryKey);
                var triggerFactory = new TriggerImplementerFactory(FAnsi.DatabaseType.MicrosoftSQLServer);
                var implementor = triggerFactory.Create(existing);

                // True when the trigger seen by the current 'implementor' is enabled; a missing trigger counts as absent.
                bool IsTriggerEnabled()
                {
                    try
                    {
                        return implementor.GetTriggerStatus() == DataLoad.Triggers.TriggerStatus.Enabled;
                    }
                    catch (TriggerMissingException)
                    {
                        return false;
                    }
                }

                // Drops the trigger via the current 'implementor' and reports any problems as an Error.
                void DropTriggerReportingProblems()
                {
                    implementor.DropTrigger(out var triggerProblems, out _);
                    if (!string.IsNullOrEmpty(triggerProblems))
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, triggerProblems));
                }

                var triggerPresent = IsTriggerEnabled();

                if (!AlwaysDropExtractionTables)
                {
                    //check the PKs are the same
                    var remotePKs = existing.DiscoverColumns().Where(col => col.IsPrimaryKey).Select(col => col.GetRuntimeName()).ToList();
                    var rdmpPKs = toProcess.PrimaryKey.Cast<DataColumn>().Select(col => col.ColumnName).ToList();
                    if (!remotePKs.All(rdmpPKs.Contains) || remotePKs.Count != rdmpPKs.Count)
                    {
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error,
                            $"""
                        Table {existing.GetFullyQualifiedName()} already exists and has different PKs to the source table.                            
                        Source PKs: {string.Join(", ", rdmpPKs)}
                        Destination PKs: {string.Join(", ", remotePKs)}
                        """));
                        // TODO: this error could be better. NOTE(review): returning null leaves Open with no
                        // destination, so a non-throwing listener will hit a null dereference in WriteRows instead.
                        return null;
                    }

                    if (hasStructuralChanges(toProcess, existing))
                    {
                        var sourceColumns = toProcess.Columns.Cast<DataColumn>().Select(c => c.ColumnName).ToList();
                        var destinationColumns = existing.DiscoverColumns().Select(c => c.GetRuntimeName()).ToList();

                        // Non-hic_ columns present on the server but no longer part of the extraction.
                        var removedColumns = destinationColumns.Except(sourceColumns)
                            .Where(c => !SpecialFieldNames.IsHicPrefixed(c))
                            .ToList();

                        //only mess about with column removal if there is an archive trigger
                        if (triggerPresent && removedColumns.Any())
                        {
                            //move everything into the archive - do this by updating the HIC_validfrom
                            var sql = $"UPDATE {existing.GetFullyQualifiedName()} set {SpecialFieldNames.ValidFrom} = GETDATE()";
                            using (var con = _destinationDatabase.Server.GetConnection())
                            {
                                con.Open();
                                using var cmd = _destinationDatabase.Server.GetCommand(sql, con);
                                cmd.CommandTimeout = 30000;
                                cmd.ExecuteNonQuery();
                            }

                            foreach (var column in removedColumns)
                                existing.DropColumn(existing.DiscoverColumn(column));

                            // The trigger was built against the old column list.
                            // NOTE(review): it is only recreated below when UseArchiveTrigger && hasPKs.
                            DropTriggerReportingProblems();

                            existing = _destinationDatabase.ExpectTable(tblName);
                            implementor = triggerFactory.Create(existing);
                            triggerPresent = IsTriggerEnabled();
                        }
                    }
                }

                if (_request.IsBatchResume)
                {
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                        $"Table {existing.GetFullyQualifiedName()} already exists but it IsBatchResume so no problem."));
                }
                else if (AlwaysDropExtractionTables)
                {
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                        $"Table {existing.GetFullyQualifiedName()} already exists, dropping because setting {nameof(AlwaysDropExtractionTables)} is on"));
                    existing.Drop();

                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                        $"Table {existing.GetFullyQualifiedName()} was dropped"));

                    // since we dropped it we should treat it as if it was never there to begin with
                    _tableDidNotExistAtStartOfLoad = true;
                }
                else if (UseArchiveTrigger && hasPKs)
                {
                    //check the columns are correct, we might have added some
                    var existingColumnNames = existing.DiscoverColumns().Select(ec => ec.GetRuntimeName()).ToHashSet();
                    var newColumns = toProcess.Columns.Cast<DataColumn>()
                        .Select(col => col.ColumnName)
                        .Where(c => !existingColumnNames.Contains(c))
                        .ToList();

                    if (newColumns.Any())
                    {
                        var archiveTable = _destinationDatabase.ExpectTable(tblName + "_Archive");
                        if (archiveTable.Exists())
                        {
                            // The new columns are distinct from one another, so a single snapshot of the archive columns suffices.
                            var archiveColumnNames = archiveTable.DiscoverColumns().Select(col => col.GetRuntimeName()).ToHashSet();

                            foreach (var column in newColumns)
                            {
                                existing.AddColumn(column, new TypeGuesser.DatabaseTypeRequest(toProcess.Columns[column].DataType), true, 30000);
                                if (!archiveColumnNames.Contains(column))
                                    archiveTable.AddColumn(column, new TypeGuesser.DatabaseTypeRequest(toProcess.Columns[column].DataType), true, 30000);
                            }

                            // The trigger must be rebuilt so that it covers the new columns.
                            if (triggerPresent)
                            {
                                DropTriggerReportingProblems();

                                existing = _destinationDatabase.ExpectTable(tblName);
                                implementor = triggerFactory.Create(existing);
                                triggerPresent = false;
                            }
                        }
                    }

                    if (!triggerPresent)
                        implementor.CreateTrigger(ThrowImmediatelyCheckNotifier.Quiet);
                }
                else
                {
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                        $"A table called {tblName} already exists on server {TargetDatabaseServer}, data load might crash if it is populated and/or has an incompatible schema"));
                }
            }
            else
            {
                _tableDidNotExistAtStartOfLoad = true;
            }
        }
        catch (Exception e)
        {
            //Probably the database didn't exist or the credentials were wrong or something
            // NOTE(review): this also swallows failures from the column/trigger reconciliation above
            // when the listener does not throw on Error.
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error,
                    "Failed to inspect destination for already existing datatables", e));
        }

        _destination = new DataTableUploadDestination(((IExtractDatasetCommand)_request).ExtractableCohort.ExternalCohortTable);

        // Reads _destination, so it must run after the field is assigned.
        PrimeDestinationTypesBasedOnCatalogueTypes(listener, toProcess);

        _destination.AllowResizingColumnsAtUploadTime = true;
        _destination.AlterTimeout = AlterTimeout;
        _destination.WriteDataBeforeApplyingPrimaryKeys = WriteDataBeforeApplyingPrimaryKeys;
        _destination.AppendDataIfTableExists = AppendDataIfTableExists;
        _destination.IncludeTimeStamp = IncludeTimeStamp;
        _destination.IndexTables = IndexTables;
        _destination.UseTrigger = UseArchiveTrigger;
        _destination.IndexTableName = GetIndexName();

        // Blank entries (e.g. "" or "a, ,b") would otherwise become empty index column names.
        if (!string.IsNullOrWhiteSpace(UserDefinedIndex))
            _destination.UserDefinedIndexes = UserDefinedIndex.Split(',')
                .Select(i => i.Trim())
                .Where(i => i.Length > 0)
                .ToList();

        _destination.PreInitialize(_activator, _destinationDatabase, listener);

        return _destination;
    }
×
375

376
    /// <summary>
    /// Registers explicit destination column types with <c>_destination</c> for a dataset extraction.
    /// Catalogue columns get the type of their underlying ColumnInfo; release identifier substitutions get
    /// the cohort's release identifier type. Does nothing for requests that are not dataset extractions.
    /// </summary>
    /// <param name="listener">Receives a warning for each skipped column and an info message for each typed column.</param>
    /// <param name="toProcess">Source of the primary-key columns used to flag each added type.</param>
    private void PrimeDestinationTypesBasedOnCatalogueTypes(IDataLoadEventListener listener, DataTable toProcess)
    {
        if (_request is not IExtractDatasetCommand datasetCommand)
            return;

        var catalogueColumns = datasetCommand.ColumnsToExtract
            .OfType<ExtractableColumn>()
            .Select(ec => ec.CatalogueExtractionInformation)
            .Where(ei => ei != null);

        foreach (var extractionInformation in catalogueColumns)
        {
            var catItem = extractionInformation.CatalogueItem;

            // Columns with no known source type, or whose value is a transform, are skipped with a warning.
            var skipMessage =
                catItem == null
                    ? $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it had no associated CatalogueItem"
                    : catItem.ColumnInfo == null
                        ? $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it had no associated ColumnInfo"
                        : extractionInformation.IsProperTransform()
                            ? $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it is a Transform"
                            : null;

            if (skipMessage != null)
            {
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, skipMessage));
                continue;
            }

            var destinationType = GetDestinationDatabaseType(extractionInformation);

            // Start from the underlying ColumnInfo type so the destination is not created narrower than the
            // origin (e.g. varchar(10) when the Catalogue has varchar(100)); the destination may still adjust it.
            var columnName = extractionInformation.Alias ?? catItem.ColumnInfo.GetRuntimeName();
            var addedType = _destination.AddExplicitWriteType(columnName, destinationType);
            addedType.IsPrimaryKey = toProcess.PrimaryKey.Any(dc => dc.ColumnName == columnName);

            var sameEngine = CopyCollations && _destinationDatabase.Server.DatabaseType == catItem.ColumnInfo.TableInfo.DatabaseType;
            if (sameEngine)
                addedType.Collation = catItem.ColumnInfo.Collation;

            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information,
                    $"Set Type for {columnName} to {destinationType} (IsPrimaryKey={(addedType.IsPrimaryKey ? "true" : "false")}) to match the source table"));
        }

        var substitutions = datasetCommand.QueryBuilder.SelectColumns
            .Select(static sc => sc.IColumn)
            .OfType<ReleaseIdentifierSubstitution>();

        foreach (var substitution in substitutions)
        {
            var columnName = substitution.GetRuntimeName();
            var isPrimaryKey = toProcess.PrimaryKey.Any(dc => dc.ColumnName == columnName);

            var addedType = _destination.AddExplicitWriteType(columnName,
                datasetCommand.ExtractableCohort.GetReleaseIdentifierDataType());
            addedType.IsPrimaryKey = isPrimaryKey;
            addedType.AllowNulls = !isPrimaryKey;
        }
    }
17✔
449

450
    /// <summary>
    /// Returns the SQL data type the destination should use for <paramref name="col"/>.
    /// If the source table and the destination server share a DatabaseType, the ColumnInfo's
    /// <c>Data_type</c> is returned unchanged. Otherwise it is translated through the source
    /// TypeTranslater into a type request, then into the destination's SQL type.
    /// </summary>
    /// <param name="col">Column whose <c>ColumnInfo</c> supplies the source type.</param>
    /// <returns>The SQL type string for the destination server.</returns>
    private string GetDestinationDatabaseType(ConcreteColumn col)
    {
        // These names were previously swapped ("from" held the destination type); only equality was compared.
        var sourceDbType = col.ColumnInfo.TableInfo.DatabaseType;
        var destinationDbType = _destinationDatabase.Server.DatabaseType;

        if (sourceDbType == destinationDbType)
            return col.ColumnInfo.Data_type;

        var sourceSyntax = col.ColumnInfo.GetQuerySyntaxHelper();
        var destinationSyntax = _destinationDatabase.Server.GetQuerySyntaxHelper();

        var intermediate = sourceSyntax.TypeTranslater.GetDataTypeRequestForSQLDBType(col.ColumnInfo.Data_type);
        return destinationSyntax.TypeTranslater.GetSQLDBTypeForCSharpType(intermediate);
    }
466

467
    /// <summary>
    /// Builds the index name from <see cref="IndexNamingPattern"/>, with all spaces removed.
    /// </summary>
    /// <returns>The index name passed to the upload destination.</returns>
    private string GetIndexName() =>
        ReplaceExtractionNamingTokens(IndexNamingPattern).Replace(" ", "");

    /// <summary>
    /// Computes the destination table name. If the incoming table is flagged as already named, its current
    /// name is kept; otherwise the name comes from <see cref="TableNamingPattern"/>. Either way the result
    /// is sanitized for the destination server.
    /// </summary>
    /// <param name="suffix">Optional suffix appended as <c>_suffix</c> after sanitizing.</param>
    /// <returns>The sanitized table name.</returns>
    private string GetTableName(string suffix = null)
    {
        var baseName = _isTableAlreadyNamed
            ? _toProcess.TableName
            : ReplaceExtractionNamingTokens(TableNamingPattern);

        var tableName = SanitizeNameForDatabase(baseName);

        if (!string.IsNullOrWhiteSpace(suffix))
            tableName += $"_{suffix}";

        return tableName;
    }

    /// <summary>
    /// Replaces the tokens shared by the table and index naming patterns.
    /// $p, $n, $c and $e come from the request's configuration and project.
    /// $d and $a come from the dataset's Catalogue, or from the globals name and "G" for a globals extraction.
    /// </summary>
    /// <param name="pattern">The naming pattern to expand.</param>
    /// <returns>The pattern with known tokens replaced.</returns>
    private string ReplaceExtractionNamingTokens(string pattern)
    {
        var configuration = _request.Configuration;
        var project = configuration.Project;

        var name = pattern
            .Replace("$p", project.Name)
            .Replace("$n", project.ProjectNumber.ToString())
            .Replace("$c", configuration.Name)
            .Replace("$e", configuration.ID.ToString());

        if (_request is ExtractDatasetCommand extractDatasetCommand)
        {
            var catalogue = extractDatasetCommand.DatasetBundle.DataSet.Catalogue;
            name = name.Replace("$d", catalogue.Name).Replace("$a", catalogue.Acronym);
        }

        if (_request is ExtractGlobalsCommand)
            name = name.Replace("$d", ExtractionDirectory.GLOBALS_DATA_NAME).Replace("$a", "G");

        return name;
    }
530

531
    /// <summary>
    /// Converts <paramref name="tblName"/> into a table name acceptable to the destination server.
    /// Dots and brackets become underscores, the result is validated, and the server's syntax helper
    /// then produces the final entity name.
    /// </summary>
    /// <param name="tblName">Candidate table name.</param>
    /// <returns>The sanitized, non-blank table name.</returns>
    /// <exception cref="Exception">
    /// Thrown when the destination database is not yet known, or when sanitizing leaves an empty name.
    /// </exception>
    private string SanitizeNameForDatabase(string tblName)
    {
        if (_destinationDatabase == null)
            throw new Exception(
                "Cannot pick a TableName until we know what type of server it is going to, _server is null");

        var withoutPunctuation = Regex.Replace(tblName, "[.()]", "_");

        var syntaxHelper = _destinationDatabase.Server.GetQuerySyntaxHelper();
        syntaxHelper.ValidateTableName(withoutPunctuation);

        var sensibleName = syntaxHelper.GetSensibleEntityNameFromString(withoutPunctuation);
        if (string.IsNullOrWhiteSpace(sensibleName))
            throw new Exception(
                $"TableNamingPattern '{TableNamingPattern}' resulted in an empty string for request '{_request}'");

        return sensibleName;
    }
551

552
    /// <summary>
    /// Disposes the upload destination and tidies the destination table. After a failure, a table this run
    /// created is dropped when <see cref="DropTableIfLoadFails"/> is set. After a successful batch resume,
    /// an unkeyed table is made distinct. Finally the load log is closed and the cumulative extraction
    /// audit (if any) is completed.
    /// </summary>
    /// <param name="listener">Receives warnings and progress messages about the clean-up.</param>
    /// <param name="pipelineFailureExceptionIfAny">The exception that failed the pipeline, or <c>null</c> on success.</param>
    public override void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
    {
        var pipelineFailed = pipelineFailureExceptionIfAny != null;

        if (_destination != null)
        {
            _destination.Dispose(listener, pipelineFailureExceptionIfAny);

            // Failed load, and the table was created during this extraction, so don't leave it half loaded.
            if (pipelineFailed && _tableDidNotExistAtStartOfLoad && DropTableIfLoadFails && _destinationDatabase != null)
                DropTableCreatedByThisLoad();

            if (!pipelineFailed
                && _request.IsBatchResume
                && MakeFinalTableDistinctWhenBatchResuming
                && _destinationDatabase != null
                && _toProcess != null)
                MakeUnkeyedTableDistinct();
        }

        TableLoadInfo?.CloseAndArchive();

        // also close off the cumulative extraction result
        if (_request is ExtractDatasetCommand)
        {
            var result = ((IExtractDatasetCommand)_request).CumulativeExtractionResults;
            if (result != null && _toProcess != null)
                // TableLoadInfo may be null (see the null-conditional above); failing here would mask the pipeline's own exception.
                result.CompleteAudit(GetType(), GetDestinationDescription(), TableLoadInfo?.Inserts ?? 0,
                    _request.IsBatchResume, pipelineFailed);
        }

        // Drops the destination table if it exists.
        void DropTableCreatedByThisLoad()
        {
            var tbl = _destinationDatabase.ExpectTable(_toProcess.TableName);
            if (!tbl.Exists())
                return;

            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                $"DropTableIfLoadFails is true so about to drop table {tbl}"));
            tbl.Drop();
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, $"Dropped table {tbl}"));
        }

        // Without a primary key, failed batches may have introduced duplicate rows, so distinct the table.
        void MakeUnkeyedTableDistinct()
        {
            var tbl = _destinationDatabase.ExpectTable(_toProcess.TableName);
            if (!tbl.Exists() || tbl.DiscoverColumns().Any(p => p.IsPrimaryKey))
                return;

            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information,
                    $"Making {tbl} distinct in case there are duplicate rows from bad batch resumes"));
            tbl.MakeDistinct(50000000);
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information, $"Finished distincting {tbl}"));
        }
    }
34✔
605

606
    /// <summary>
    /// Passes an abort request on to the wrapped destination; does nothing if no destination has been created yet.
    /// </summary>
    /// <param name="listener">Listener forwarded to the wrapped destination.</param>
    public override void Abort(IDataLoadEventListener listener)
    {
        if (_destination == null)
            return;

        _destination.Abort(listener);
    }
×
610

611
    /// <summary>
    /// Stores <paramref name="activator"/> for later use; <paramref name="value"/> and
    /// <paramref name="listener"/> are not used by this method.
    /// </summary>
    protected override void PreInitializeImpl(IBasicActivateItems activator, IExtractCommand value,
        IDataLoadEventListener listener) => _activator = activator;
71✔
615

616

617
    /// <summary>
    /// Describes the extraction destination by delegating to the private overload with an empty suffix.
    /// </summary>
    public override string GetDestinationDescription()
    {
        return GetDestinationDescription(string.Empty);
    }
33✔
618

619
    /// <summary>
    /// Describes where the dataset currently being processed is written, as
    /// "{TargetDatabaseServer.ID}|{database name}|{table name}". When no dataset is being processed and the
    /// request is a globals extraction the description is simply "Globals".
    /// </summary>
    /// <param name="suffix">Not currently used by this method.</param>
    /// <exception cref="Exception">No dataset is being processed and the request is not an <see cref="ExtractGlobalsCommand"/>.</exception>
    private string GetDestinationDescription(string suffix = "")
    {
        if (_toProcess != null)
        {
            // evaluate table then database name before touching the server, matching the original ordering
            var table = _toProcess.TableName;
            var database = GetDatabaseName();
            return $"{TargetDatabaseServer.ID}|{database}|{table}";
        }

        if (_request is ExtractGlobalsCommand)
            return "Globals";

        throw new Exception("Could not describe destination because _toProcess was null");
    }
630

UNCOV
631
    /// <summary>
    /// Identifies this component as writing its output to a database.
    /// </summary>
    public static DestinationType GetDestinationType()
    {
        return DestinationType.Database;
    }
×
632

633
    /// <summary>
    /// Creates an <see cref="MsSqlExtractionReleasePotential"/> for the given selected dataset.
    /// </summary>
    public override ReleasePotential GetReleasePotential(IRDMPPlatformRepositoryServiceLocator repositoryLocator,
        ISelectedDataSets selectedDataSet)
    {
        var potential = new MsSqlExtractionReleasePotential(repositoryLocator, selectedDataSet);
        return potential;
    }
×
635

636
    /// <summary>
    /// Creates the <see cref="MsSqlReleaseSource"/> used when releasing data extracted by this destination.
    /// </summary>
    public override FixedReleaseSource<ReleaseAudit> GetReleaseSource(ICatalogueRepository catalogueRepository)
    {
        return new MsSqlReleaseSource(catalogueRepository);
    }
×
638

639
    /// <summary>
    /// Creates an <see cref="MsSqlGlobalsReleasePotential"/> for evaluating whether a global extraction can be released.
    /// </summary>
    public override GlobalReleasePotential GetGlobalReleasabilityEvaluator(
        IRDMPPlatformRepositoryServiceLocator repositoryLocator, ISupplementalExtractionResults globalResult,
        IMapsDirectlyToDatabaseTable globalToCheck)
    {
        var evaluator = new MsSqlGlobalsReleasePotential(repositoryLocator, globalResult, globalToCheck);
        return evaluator;
    }
×
643

644
    /// <summary>
    /// Runs the SQL of a global <see cref="SupportingSQLTable"/> against its own server and copies the resulting
    /// rows into a table of the destination database, replacing any existing table with the same name.
    /// </summary>
    /// <param name="sqlTable">The supporting SQL to execute.</param>
    /// <param name="directory">Not used by this method.</param>
    /// <param name="configuration">Not used by this method.</param>
    /// <param name="listener">Receives progress and warning notifications.</param>
    /// <param name="linesWritten">Number of rows fetched and written to the destination.</param>
    /// <param name="destinationDescription">"{TargetDatabaseServer.ID}|{database name}|{table name}" of the created table.</param>
    protected override void TryExtractSupportingSQLTableImpl(SupportingSQLTable sqlTable, DirectoryInfo directory,
        IExtractionConfiguration configuration, IDataLoadEventListener listener, out int linesWritten,
        out string destinationDescription)
    {
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
            $"About to download SQL for global SupportingSQL {sqlTable.SQL}"));

        using var dt = new DataTable();

        // The source connection is only needed for the fetch; release it before writing to the destination.
        using (var con = sqlTable.GetServer().GetConnection())
        {
            con.Open();

            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                $"Connection opened successfully, about to send SQL command {sqlTable.SQL}"));

            using var cmd = DatabaseCommandHelper.GetCommand(sqlTable.SQL, con);
            using var da = DatabaseCommandHelper.GetDataAdapter(cmd);

            dt.BeginLoadData();
            da.Fill(dt);
            dt.EndLoadData();
        }

        dt.TableName = GetTableName(_destinationDatabase.Server.GetQuerySyntaxHelper()
            .GetSensibleEntityNameFromString(sqlTable.Name));
        linesWritten = dt.Rows.Count;

        var destinationDb = GetDestinationDatabase(listener);
        var tbl = destinationDb.ExpectTable(dt.TableName);

        if (tbl.Exists())
        {
            // warn before dropping, consistent with lookup table extraction
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                $"Dropping existing SupportingSQL table '{tbl.GetFullyQualifiedName()}'"));
            tbl.Drop();
        }

        destinationDb.CreateTable(dt.TableName, dt);
        destinationDescription = $"{TargetDatabaseServer.ID}|{GetDatabaseName()}|{dt.TableName}";
    }
4✔
679

680

681
    /// <summary>
    /// Copies the contents of a bundled lookup table into the destination database. Any existing table with the
    /// same name is dropped first, with a warning.
    /// </summary>
    /// <param name="lookup">The lookup whose data is copied.</param>
    /// <param name="lookupDir">Not used by this method.</param>
    /// <param name="requestConfiguration">Not used by this method.</param>
    /// <param name="listener">Receives progress and warning notifications.</param>
    /// <param name="linesWritten">Number of lookup rows written.</param>
    /// <param name="destinationDescription">"{TargetDatabaseServer.ID}|{database name}|{table name}" of the created table.</param>
    protected override void TryExtractLookupTableImpl(BundledLookupTable lookup, DirectoryInfo lookupDir,
        IExtractionConfiguration requestConfiguration, IDataLoadEventListener listener, out int linesWritten,
        out string destinationDescription)
    {
        using var lookupData = lookup.GetDataTable();

        var syntaxHelper = _destinationDatabase.Server.GetQuerySyntaxHelper();
        lookupData.TableName = GetTableName(syntaxHelper.GetSensibleEntityNameFromString(lookup.TableInfo.Name));

        linesWritten = lookupData.Rows.Count;

        // description consumed by the abstract base class
        destinationDescription = $"{TargetDatabaseServer.ID}|{GetDatabaseName()}|{lookupData.TableName}";

        var targetDb = GetDestinationDatabase(listener);
        var previous = targetDb.ExpectTable(lookupData.TableName);

        if (!previous.Exists())
        {
            targetDb.CreateTable(lookupData.TableName, lookupData);
            return;
        }

        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
            $"Dropping existing Lookup table '{previous.GetFullyQualifiedName()}'"));
        previous.Drop();

        targetDb.CreateTable(lookupData.TableName, lookupData);
    }
2✔
706

707
    /// <summary>
    /// Resolves the destination database (named via <see cref="GetDatabaseName"/>) on
    /// <see cref="TargetDatabaseServer"/>, creating it when it does not already exist.
    /// </summary>
    /// <param name="listener">Told which server is about to be contacted.</param>
    /// <returns>The (now existing) destination database.</returns>
    private DiscoveredDatabase GetDestinationDatabase(IDataLoadEventListener listener)
    {
        // let the user know which server we are about to inspect
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
            $"About to open connection to {TargetDatabaseServer}"));

        var name = GetDatabaseName();

        var database = DataAccessPortal
            .ExpectServer(TargetDatabaseServer, DataAccessContext.DataExport, false)
            .ExpectDatabase(name);

        if (database.Exists())
            return database;

        database.Create();
        return database;
    }
723

724
    /// <summary>
    /// Builds the destination database name by substituting tokens in <see cref="DatabaseNamingPattern"/>, in this
    /// order: $p project name, $n project number, $t project master ticket, $r configuration request ticket,
    /// $l configuration release ticket, $e configuration ID.
    /// </summary>
    /// <exception cref="ProjectNumberException">The project has no ProjectNumber.</exception>
    /// <exception cref="Exception">No request was supplied, or the request has no Configuration.</exception>
    private string GetDatabaseName()
    {
        if (_project.ProjectNumber == null)
            throw new ProjectNumberException($"Project '{_project}' must have a ProjectNumber");

        if (_request == null)
            throw new Exception("No IExtractCommand Request was passed to this component");

        var config = _request.Configuration ??
                     throw new Exception($"Request did not specify any Configuration for Project '{_project}'");

        var name = DatabaseNamingPattern;
        name = name.Replace("$p", _project.Name);
        name = name.Replace("$n", _project.ProjectNumber.ToString());
        name = name.Replace("$t", _project.MasterTicket);
        name = name.Replace("$r", config.RequestTicket);
        name = name.Replace("$l", config.ReleaseTicket);
        name = name.Replace("$e", config.ID.ToString());

        return name;
    }
745

746
    /// <summary>
    /// Validates this component's configuration and then, where possible, connects to the destination.
    /// Configuration checks cover the target server, the naming patterns and their tokens, catalogue acronyms
    /// and archive-trigger prerequisites. The connection step reports whether the destination database or
    /// table already exists.
    /// </summary>
    /// <param name="notifier">Receives the outcome of each individual check.</param>
    public override void Check(ICheckNotifier notifier)
    {
        if (TargetDatabaseServer == null)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Target database server property has not been set (This component does not know where to extract data to!), " +
                "to fix this you must edit the pipeline and choose an ExternalDatabaseServer to extract to)",
                CheckResult.Fail));
            return;
        }

        if (string.IsNullOrWhiteSpace(TargetDatabaseServer.Server))
        {
            notifier.OnCheckPerformed(new CheckEventArgs("TargetDatabaseServer does not have a .Server specified",
                CheckResult.Fail));
            return;
        }

        if (!string.IsNullOrWhiteSpace(TargetDatabaseServer.Database))
            notifier.OnCheckPerformed(new CheckEventArgs(
                "TargetDatabaseServer has .Database specified but this will be ignored!", CheckResult.Warning));

        if (string.IsNullOrWhiteSpace(TableNamingPattern))
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "You must specify TableNamingPattern, this will tell the component how to name tables it generates in the remote destination",
                CheckResult.Fail));
            return;
        }

        if (string.IsNullOrWhiteSpace(DatabaseNamingPattern))
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "You must specify DatabaseNamingPattern, this will tell the component what database to create or use in the remote destination",
                CheckResult.Fail));
            return;
        }

        // Must list every token GetDatabaseName substitutes (including $e, the configuration ID), otherwise a
        // pattern that only uses $e would receive a spurious "no token" warning.
        var databaseTokens = new[] { "$p", "$n", "$t", "$r", "$l", "$e" };
        if (!databaseTokens.Any(token => DatabaseNamingPattern.Contains(token)))
            notifier.OnCheckPerformed(new CheckEventArgs(
                "DatabaseNamingPattern does not contain any token. The tables may be created alongside existing tables and Release would be impossible.",
                CheckResult.Warning));

        if (!TableNamingPattern.Contains("$d") && !TableNamingPattern.Contains("$a"))
            notifier.OnCheckPerformed(new CheckEventArgs(
                "TableNamingPattern must contain either $d or $a, the name/acronym of the dataset being extracted otherwise you will get collisions when you extract multiple tables at once",
                CheckResult.Warning));

        if (_request == ExtractDatasetCommand.EmptyCommand)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "Request is ExtractDatasetCommand.EmptyCommand, will not try to connect to Database",
                CheckResult.Warning));
            return;
        }

        // TableNamingPattern is known to be non-blank here (checked above)
        if (TableNamingPattern.Contains("$a")
            && _request is ExtractDatasetCommand acronymRequest
            && string.IsNullOrWhiteSpace(acronymRequest.Catalogue.Acronym))
            notifier.OnCheckPerformed(new CheckEventArgs(
                $"Catalogue '{acronymRequest.Catalogue}' does not have an Acronym but TableNamingPattern contains $a",
                CheckResult.Fail));

        base.Check(notifier);

        try
        {
            var server = DataAccessPortal.ExpectServer(TargetDatabaseServer, DataAccessContext.DataExport, false);
            var database = _destinationDatabase = server.ExpectDatabase(GetDatabaseName());

            // the archive trigger requires primary keys on an existing table
            if (UseArchiveTrigger && _request is ExtractDatasetCommand archiveRequest)
            {
                var existing = _destinationDatabase.ExpectTable(archiveRequest.Catalogue.Name);
                if (existing.Exists() && !existing.DiscoverColumns().Any(col => col.IsPrimaryKey))
                    notifier.OnCheckPerformed(new CheckEventArgs(
                        $"Catalogue does not have any PKS. Cannot apply the archive trigger",
                        CheckResult.Fail));
            }

            if (!database.Exists())
            {
                notifier.OnCheckPerformed(
                    new CheckEventArgs(
                        $"Database {database} does not exist on server... it will be created at runtime",
                        CheckResult.Success));
                return;
            }

            notifier.OnCheckPerformed(
                new CheckEventArgs(
                    $"Database {database} already exists! if an extraction has already been run you may have errors if you are re-extracting the same tables",
                    CheckResult.Warning));

            var tables = database.DiscoverTables(false);

            if (!tables.Any())
            {
                notifier.OnCheckPerformed(new CheckEventArgs($"Confirmed that database {database} is empty of tables",
                    CheckResult.Success));
                return;
            }

            string tableName;

            try
            {
                tableName = GetTableName();
            }
            catch (Exception ex)
            {
                notifier.OnCheckPerformed(
                    new CheckEventArgs("Could not determine table name", CheckResult.Fail, ex));
                return;
            }

            // if the expected table exists and we are not doing a batch resume or allowing data appending
            if (tables.Any(t => t.GetRuntimeName().Equals(tableName)) && !_request.IsBatchResume && !AppendDataIfTableExists)
                notifier.OnCheckPerformed(new CheckEventArgs(ErrorCodes.ExistingExtractionTableInDatabase,
                    tableName, database));
        }
        catch (Exception e)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                $"Could not connect to TargetDatabaseServer '{TargetDatabaseServer}'", CheckResult.Fail, e));
        }
    }
3✔
887
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc