• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 19505116989

19 Nov 2025 02:38PM UTC coverage: 57.211% (+0.06%) from 57.148%
19505116989

push

github

web-flow
Task/rdmp 328 extraction archive triggers (#2254)

* add extraction db triggers

* fix test

* add tests

* add missing test

* update tests

* move checks

11495 of 21577 branches covered (53.27%)

Branch coverage included in aggregate %.

32 of 45 new or added lines in 2 files covered. (71.11%)

2 existing lines in 1 file now uncovered.

32571 of 55446 relevant lines covered (58.74%)

17747.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.53
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/ExecuteFullExtractionToDatabaseMSSql.cs
1
// Copyright (c) The University of Dundee 2018-2024
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using FAnsi.Discovery;
8
using Rdmp.Core.Curation.Data;
9
using Rdmp.Core.DataExport.Data;
10
using Rdmp.Core.DataExport.DataExtraction.Commands;
11
using Rdmp.Core.DataExport.DataExtraction.UserPicks;
12
using Rdmp.Core.DataExport.DataRelease.Pipeline;
13
using Rdmp.Core.DataExport.DataRelease.Potential;
14
using Rdmp.Core.DataFlowPipeline;
15
using Rdmp.Core.DataLoad.Engine.Job.Scheduling;
16
using Rdmp.Core.DataLoad.Engine.Pipeline.Destinations;
17
using Rdmp.Core.DataLoad.Triggers.Exceptions;
18
using Rdmp.Core.DataLoad.Triggers.Implementations;
19
using Rdmp.Core.MapsDirectlyToDatabaseTable;
20
using Rdmp.Core.QueryBuilding;
21
using Rdmp.Core.Repositories;
22
using Rdmp.Core.ReusableLibraryCode;
23
using Rdmp.Core.ReusableLibraryCode.Checks;
24
using Rdmp.Core.ReusableLibraryCode.DataAccess;
25
using Rdmp.Core.ReusableLibraryCode.Progress;
26
using System;
27
using System.Data;
28
using System.Diagnostics;
29
using System.IO;
30
using System.Linq;
31
using System.Text.RegularExpressions;
32
using YamlDotNet.Core;
33

34
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Destinations;
35

36
/// <summary>
37
/// Alternate extraction pipeline destination in which the DataTable containing the extracted dataset is written to an Sql Server database
38
/// </summary>
39
public class ExecuteFullExtractionToDatabaseMSSql : ExtractionDestination
40
{
41
    [DemandsInitialization(
42
        "External server to create the extraction into, a new database will be created for the project based on the naming pattern provided",
43
        Mandatory = true)]
44
    public IExternalDatabaseServer TargetDatabaseServer { get; set; }
370✔
45

46
    [DemandsInitialization(@"How do you want to name datasets, use the following tokens if you need them:   
47
         $p - Project Name ('e.g. My Project')
48
         $n - Project Number (e.g. 234)
49
         $t - Master Ticket (e.g. 'LINK-1234')
50
         $r - Request Ticket (e.g. 'LINK-1234')
51
         $l - Release Ticket (e.g. 'LINK-1234')
52
         $e - Extraction Configuration Id (e.g. 14)
53
         ", Mandatory = true, DefaultValue = "Proj_$n_$l")]
54
    public string DatabaseNamingPattern { get; set; }
256✔
55

56
    [DemandsInitialization(@"How do you want to name datasets, use the following tokens if you need them:   
57
         $p - Project Name ('e.g. My Project')
58
         $n - Project Number (e.g. 234)
59
         $c - Configuration Name (e.g. 'Cases')
60
         $d - Dataset name (e.g. 'Prescribing')
61
         $a - Dataset acronym (e.g. 'Presc') 
62
         $e - Extraction Configuration Id (e.g. 14)
63
         You must have either $a or $d
64
         ", Mandatory = true, DefaultValue = "$c_$d")]
65
    public string TableNamingPattern { get; set; }
138✔
66

67
    [DemandsInitialization(
68
        @"If the extraction fails half way through AND the destination table was created during the extraction then the table will be dropped from the destination rather than being left in a half loaded state ",
69
        defaultValue: true)]
70
    public bool DropTableIfLoadFails { get; set; }
68✔
71

72
    [DemandsInitialization(DataTableUploadDestination.AlterTimeout_Description, DefaultValue = 300)]
73
    public int AlterTimeout { get; set; }
102✔
74

75
    [DemandsInitialization(
76
        "True to copy the column collations from the source database when creating the destination database.  Only works if both the source and destination have the same DatabaseType.  Excludes columns which feature a transform as part of extraction.",
77
        DefaultValue = false)]
78
    public bool CopyCollations { get; set; }
1,288✔
79

80
    [DemandsInitialization(
81
        "True to always drop the destination database table(s) from the destination if they already existed",
82
        DefaultValue = false)]
83
    public bool AlwaysDropExtractionTables { get; set; }
84✔
84

85
    [DemandsInitialization(
86
        "True to apply a distincting operation to the final table when using an ExtractionProgress.  This prevents data duplication from failed batch resumes.",
87
        DefaultValue = true)]
88
    public bool MakeFinalTableDistinctWhenBatchResuming { get; set; } = true;
146✔
89

90

91
    [DemandsInitialization("If this extraction has already been run, it will append the extraction data into the database. There is no duplication protection with this functionality.")]
92
    public bool AppendDataIfTableExists { get; set; } = false;
136✔
93

94
    [DemandsInitialization("If checked, a column names 'extraction_timestamp' will be included in the extraction that denotes the time the record was added to the extraction.")]
95
    public bool IncludeTimeStamp { get; set; } = false;
102✔
96

97

98
    [DemandsInitialization("If checked, indexed will be created using the primary keys specified")]
99
    public bool IndexTables { get; set; } = true;
180✔
100

101
    [DemandsInitialization(@"How do you want to name the created index, use the following tokens if you need them:   
102
         $p - Project Name ('e.g. My Project')
103
         $n - Project Number (e.g. 234)
104
         $c - Configuration Name (e.g. 'Cases')
105
         $d - Dataset name (e.g. 'Prescribing')
106
         $a - Dataset acronym (e.g. 'Presc') 
107
         $e - Extraction Configuration Id (e.g. 14)
108
         You must have either $a or $d
109
         ", DefaultValue = "Index_$c_$d")]
110
    public string IndexNamingPattern { get; set; }
102✔
111

112
    [DemandsInitialization("An optional list of columns to index on e.g \"Column1, Column2\"")]
113
    public string UserDefinedIndex { get; set; }
102✔
114

115

116
    [DemandsInitialization("When writing to an existing database, will update records and store historical versions via a view", DefaultValue = false)]
117
    public bool UseArchiveTrigger { get; set; }
124✔
118

119
    private DiscoveredDatabase _destinationDatabase;
120
    private DataTableUploadDestination _destination;
121

122
    private bool _tableDidNotExistAtStartOfLoad;
123
    private bool _isTableAlreadyNamed;
124
    private DataTable _toProcess;
125

126
    public ExecuteFullExtractionToDatabaseMSSql() : base(false)
78✔
127
    {
128
    }
78✔
129

130
    public override DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener job,
131
        GracefulCancellationToken cancellationToken)
132
    {
133
        _destinationDatabase = GetDestinationDatabase(job);
68✔
134
        return base.ProcessPipelineData(toProcess, job, cancellationToken);
68✔
135
    }
136

137
    protected override void Open(DataTable toProcess, IDataLoadEventListener job,
138
        GracefulCancellationToken cancellationToken)
139
    {
140
        _toProcess = toProcess;
34✔
141

142
        //give the data table the correct name
143
        if (_toProcess.ExtendedProperties.ContainsKey("ProperlyNamed") &&
34!
144
            _toProcess.ExtendedProperties["ProperlyNamed"].Equals(true))
34✔
145
            _isTableAlreadyNamed = true;
×
146

147
        _toProcess.TableName = GetTableName();
34✔
148

149
        _destination = PrepareDestination(job, _toProcess);
34✔
150
        OutputFile = _toProcess.TableName;
34✔
151
    }
34✔
152

153
    protected override void WriteRows(DataTable toProcess, IDataLoadEventListener job,
154
        GracefulCancellationToken cancellationToken, Stopwatch stopwatch)
155
    {
156
        // empty batches are allowed when using batch/resume
157
        if (toProcess.Rows.Count == 0 && _request.IsBatchResume) return;
34!
158

159
        if (_request.IsBatchResume) _destination.AllowLoadingPopulatedTables = true;
34!
160

161
        _destination.ProcessPipelineData(toProcess, job, cancellationToken);
34✔
162

163
        LinesWritten += toProcess.Rows.Count;
32✔
164
    }
32✔
165

166
    private DataTableUploadDestination PrepareDestination(IDataLoadEventListener listener, DataTable toProcess)
167
    {
168
        //see if the user has entered an extraction server/database
169
        if (TargetDatabaseServer == null)
34!
170
            throw new Exception(
×
171
                "TargetDatabaseServer (the place you want to extract the project data to) property has not been set!");
×
172

173
        try
174
        {
175
            if (!_destinationDatabase.Exists())
34!
176
                _destinationDatabase.Create();
×
177

178
            if (_request is ExtractGlobalsCommand)
34!
179
                return null;
×
180

181
            var tblName = _toProcess.TableName;
34✔
182

183
            //See if table already exists on the server (likely to cause problems including duplication, schema changes in configuration etc)
184
            var existing = _destinationDatabase.ExpectTable(tblName);
34✔
185
            if (existing.Exists())
34✔
186
            {
187
                var hasPKs = existing.DiscoverColumns().Any(col => col.IsPrimaryKey);
108✔
188
                if (_request.IsBatchResume)
16!
189
                {
190
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
×
191
                        $"Table {existing.GetFullyQualifiedName()} already exists but it IsBatchResume so no problem."));
×
192
                }
193
                else if (AlwaysDropExtractionTables)
16!
194
                {
195
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
196
                        $"Table {existing.GetFullyQualifiedName()} already exists, dropping because setting {nameof(AlwaysDropExtractionTables)} is on"));
×
197
                    existing.Drop();
×
198

199
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
200
                        $"Table {existing.GetFullyQualifiedName()} was dropped"));
×
201

202
                    // since we dropped it we should treat it as if it was never there to begin with
203
                    _tableDidNotExistAtStartOfLoad = true;
×
204
                }
205
                else if (UseArchiveTrigger && hasPKs)
16✔
206
                {
207

208
                    TriggerImplementerFactory triggerFactory = new TriggerImplementerFactory(FAnsi.DatabaseType.MicrosoftSQLServer);
6✔
209
                    var implementor = triggerFactory.Create(existing);
6✔
210
                    bool present;
211
                    try
212
                    {
213
                        present = implementor.GetTriggerStatus() == DataLoad.Triggers.TriggerStatus.Enabled;
6✔
214
                    }
6✔
NEW
215
                    catch (TriggerMissingException)
×
216
                    {
NEW
217
                        present = false;
×
NEW
218
                    }
×
219
                    if (!present)
6!
220
                    {
NEW
221
                        implementor.CreateTrigger(ThrowImmediatelyCheckNotifier.Quiet);
×
222
                    }
223

224
                }
225
                else
226
                {
227
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
10✔
228
                        $"A table called {tblName} already exists on server {TargetDatabaseServer}, data load might crash if it is populated and/or has an incompatible schema"));
10✔
229
                }
230
            }
231
            else
232
            {
233
                _tableDidNotExistAtStartOfLoad = true;
18✔
234
            }
235
        }
34✔
236
        catch (Exception e)
×
237
        {
238
            //Probably the database didn't exist or the credentials were wrong or something
239
            listener.OnNotify(this,
×
240
                new NotifyEventArgs(ProgressEventType.Error,
×
241
                    "Failed to inspect destination for already existing datatables", e));
×
242
        }
×
243

244
        _destination = new DataTableUploadDestination(((IExtractDatasetCommand)_request).ExtractableCohort.ExternalCohortTable);
34✔
245

246
        PrimeDestinationTypesBasedOnCatalogueTypes(listener, toProcess);
34✔
247

248
        _destination.AllowResizingColumnsAtUploadTime = true;
34✔
249
        _destination.AlterTimeout = AlterTimeout;
34✔
250
        _destination.AppendDataIfTableExists = AppendDataIfTableExists;
34✔
251
        _destination.IncludeTimeStamp = IncludeTimeStamp;
34✔
252
        _destination.UseTrigger = AppendDataIfTableExists;
34✔
253
        _destination.IndexTables = IndexTables;
34✔
254
        _destination.UseTrigger = UseArchiveTrigger;
34✔
255
        _destination.IndexTableName = GetIndexName();
34✔
256
        if (UserDefinedIndex is not null)
34!
257
            _destination.UserDefinedIndexes = UserDefinedIndex.Split(',').Select(i => i.Trim()).ToList();
×
258
        _destination.PreInitialize(_destinationDatabase, listener);
34✔
259

260

261
        return _destination;
34✔
262
    }
×
263

264
    private void PrimeDestinationTypesBasedOnCatalogueTypes(IDataLoadEventListener listener, DataTable toProcess)
265
    {
266
        //if the extraction is of a Catalogue
267

268
        if (_request is not IExtractDatasetCommand datasetCommand)
34!
269
            return;
×
270

271
        //for every extractable column in the Catalogue
272
        foreach (var extractionInformation in datasetCommand.ColumnsToExtract.OfType<ExtractableColumn>()
2,512✔
273
                     .Select(ec =>
34✔
274
                         ec.CatalogueExtractionInformation))
1,256✔
275
        {
276
            if (extractionInformation == null)
1,222✔
277
                continue;
278

279
            var catItem = extractionInformation.CatalogueItem;
1,222✔
280

281
            //if we do not know the data type or the ei is a transform
282
            if (catItem == null)
1,222!
283
            {
284
                listener.OnNotify(this,
×
285
                    new NotifyEventArgs(ProgressEventType.Warning,
×
286
                        $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it had no associated CatalogueItem"));
×
287
                continue;
×
288
            }
289

290
            if (catItem.ColumnInfo == null)
1,222!
291
            {
292
                listener.OnNotify(this,
×
293
                    new NotifyEventArgs(ProgressEventType.Warning,
×
294
                        $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it had no associated ColumnInfo"));
×
295
                continue;
×
296
            }
297

298
            if (extractionInformation.IsProperTransform())
1,222✔
299
            {
300
                listener.OnNotify(this,
2✔
301
                    new NotifyEventArgs(ProgressEventType.Warning,
2✔
302
                        $"Did not copy Types for ExtractionInformation {extractionInformation} (ID={extractionInformation.ID}) because it is a Transform"));
2✔
303
                continue;
2✔
304
            }
305

306
            var destinationType = GetDestinationDatabaseType(extractionInformation);
1,220✔
307

308
            //Tell the destination the datatype of the ColumnInfo that underlies the ExtractionInformation (this might be changed by the ExtractionInformation e.g. as a
309
            //transform but it is a good starting point.  We don't want to create a varchar(10) column in the destination if the origin dataset (Catalogue) is a varchar(100)
310
            //since it will just confuse the user.  Bear in mind these data types can be degraded later by the destination
311
            var columnName = extractionInformation.Alias ?? catItem.ColumnInfo.GetRuntimeName();
1,220✔
312
            var addedType = _destination.AddExplicitWriteType(columnName, destinationType);
1,220✔
313
            addedType.IsPrimaryKey = toProcess.PrimaryKey.Any(dc => dc.ColumnName == columnName);
2,436✔
314

315
            //if user wants to copy collation types and the destination server is the same type as the origin server
316
            if (CopyCollations && _destinationDatabase.Server.DatabaseType == catItem.ColumnInfo.TableInfo.DatabaseType)
1,220!
317
                addedType.Collation = catItem.ColumnInfo.Collation;
×
318

319
            listener.OnNotify(this,
1,220✔
320
                new NotifyEventArgs(ProgressEventType.Information,
1,220✔
321
                    $"Set Type for {columnName} to {destinationType} (IsPrimaryKey={(addedType.IsPrimaryKey ? "true" : "false")}) to match the source table"));
1,220✔
322
        }
323

324

325
        foreach (var sub in datasetCommand.QueryBuilder.SelectColumns.Select(static sc => sc.IColumn)
1,392✔
326
                     .OfType<ReleaseIdentifierSubstitution>())
34✔
327
        {
328
            var columnName = sub.GetRuntimeName();
34✔
329
            var isPk = toProcess.PrimaryKey.Any(dc => dc.ColumnName == columnName);
62✔
330

331
            var addedType = _destination.AddExplicitWriteType(columnName,
34✔
332
                datasetCommand.ExtractableCohort.GetReleaseIdentifierDataType());
34✔
333
            addedType.IsPrimaryKey = isPk;
34✔
334
            addedType.AllowNulls = !isPk;
34✔
335
        }
336
    }
34✔
337

338
    private string GetDestinationDatabaseType(ConcreteColumn col)
339
    {
340
        //Make sure we know if we are going between database types
341
        var fromDbType = _destinationDatabase.Server.DatabaseType;
1,220✔
342
        var toDbType = col.ColumnInfo.TableInfo.DatabaseType;
1,220✔
343
        if (fromDbType != toDbType)
1,220!
344
        {
345
            var fromSyntax = col.ColumnInfo.GetQuerySyntaxHelper();
×
346
            var toSyntax = _destinationDatabase.Server.GetQuerySyntaxHelper();
×
347

348
            var intermediate = fromSyntax.TypeTranslater.GetDataTypeRequestForSQLDBType(col.ColumnInfo.Data_type);
×
349
            return toSyntax.TypeTranslater.GetSQLDBTypeForCSharpType(intermediate);
×
350
        }
351

352
        return col.ColumnInfo.Data_type;
1,220✔
353
    }
354

355
    private string GetIndexName()
356
    {
357
        string indexName = IndexNamingPattern;
34✔
358
        var project = _request.Configuration.Project;
34✔
359
        indexName = indexName.Replace("$p", project.Name);
34✔
360
        indexName = indexName.Replace("$n", project.ProjectNumber.ToString());
34✔
361
        indexName = indexName.Replace("$c", _request.Configuration.Name);
34✔
362
        indexName = indexName.Replace("$e", _request.Configuration.ID.ToString());
34✔
363
        if (_request is ExtractDatasetCommand extractDatasetCommand)
34✔
364
        {
365
            indexName = indexName.Replace("$d", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Name);
34✔
366
            indexName = indexName.Replace("$a", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Acronym);
34✔
367
        }
368

369
        if (_request is ExtractGlobalsCommand)
34!
370
        {
371
            indexName = indexName.Replace("$d", ExtractionDirectory.GLOBALS_DATA_NAME);
×
372
            indexName = indexName.Replace("$a", "G");
×
373
        }
374

375

376
        return indexName.Replace(" ", "");
34✔
377
    }
378

379
    private string GetTableName(string suffix = null)
380
    {
381
        string tblName;
382
        if (_isTableAlreadyNamed)
40!
383
        {
384
            tblName = SanitizeNameForDatabase(_toProcess.TableName);
×
385

386
            if (!string.IsNullOrWhiteSpace(suffix))
×
387
                tblName += $"_{suffix}";
×
388

389
            return tblName;
×
390
        }
391

392
        tblName = TableNamingPattern;
40✔
393
        var project = _request.Configuration.Project;
40✔
394

395
        tblName = tblName.Replace("$p", project.Name);
40✔
396
        tblName = tblName.Replace("$n", project.ProjectNumber.ToString());
40✔
397
        tblName = tblName.Replace("$c", _request.Configuration.Name);
40✔
398
        tblName = tblName.Replace("$e", _request.Configuration.ID.ToString());
40✔
399

400
        if (_request is ExtractDatasetCommand extractDatasetCommand)
40✔
401
        {
402
            tblName = tblName.Replace("$d", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Name);
38✔
403
            tblName = tblName.Replace("$a", extractDatasetCommand.DatasetBundle.DataSet.Catalogue.Acronym);
38✔
404
        }
405

406
        if (_request is ExtractGlobalsCommand)
40✔
407
        {
408
            tblName = tblName.Replace("$d", ExtractionDirectory.GLOBALS_DATA_NAME);
2✔
409
            tblName = tblName.Replace("$a", "G");
2✔
410
        }
411

412
        var cachedGetTableNameAnswer = SanitizeNameForDatabase(tblName);
40✔
413
        if (!string.IsNullOrWhiteSpace(suffix))
40✔
414
            cachedGetTableNameAnswer += $"_{suffix}";
6✔
415

416
        return cachedGetTableNameAnswer;
40✔
417
    }
418

419
    private string SanitizeNameForDatabase(string tblName)
420
    {
421
        if (_destinationDatabase == null)
40!
422
            throw new Exception(
×
423
                "Cannot pick a TableName until we know what type of server it is going to, _server is null");
×
424

425
        //get rid of brackets and dots
426
        tblName = Regex.Replace(tblName, "[.()]", "_");
40✔
427

428
        var syntax = _destinationDatabase.Server.GetQuerySyntaxHelper();
40✔
429
        syntax.ValidateTableName(tblName);
40✔
430

431
        //otherwise, fetch and cache answer
432
        var cachedGetTableNameAnswer = syntax.GetSensibleEntityNameFromString(tblName);
40✔
433

434
        return string.IsNullOrWhiteSpace(cachedGetTableNameAnswer)
40!
435
            ? throw new Exception(
40✔
436
                $"TableNamingPattern '{TableNamingPattern}' resulted in an empty string for request '{_request}'")
40✔
437
            : cachedGetTableNameAnswer;
40✔
438
    }
439

440
    public override void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
441
    {
442
        if (_destination != null)
68✔
443
        {
444
            _destination.Dispose(listener, pipelineFailureExceptionIfAny);
34✔
445

446
            //if the extraction failed, the table didn't exist in the destination (i.e. the table was created during the extraction) and we are to DropTableIfLoadFails
447
            if (pipelineFailureExceptionIfAny != null && _tableDidNotExistAtStartOfLoad && DropTableIfLoadFails)
34!
448
                if (_destinationDatabase != null)
×
449
                {
450
                    var tbl = _destinationDatabase.ExpectTable(_toProcess.TableName);
×
451

452
                    if (tbl.Exists())
×
453
                    {
454
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
455
                            $"DropTableIfLoadFails is true so about to drop table {tbl}"));
×
456
                        tbl.Drop();
×
457
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, $"Dropped table {tbl}"));
×
458
                    }
459
                }
460

461
            if (pipelineFailureExceptionIfAny == null
34!
462
                && _request.IsBatchResume
34✔
463
                && MakeFinalTableDistinctWhenBatchResuming
34✔
464
                && _destinationDatabase != null
34✔
465
                && _toProcess != null)
34✔
466
            {
467
                var tbl = _destinationDatabase.ExpectTable(_toProcess.TableName);
×
468
                if (tbl.Exists())
×
469
                    // if there is no primary key then failed batches may have introduced duplication
470
                    if (!tbl.DiscoverColumns().Any(p => p.IsPrimaryKey))
×
471
                    {
472
                        listener.OnNotify(this,
×
473
                            new NotifyEventArgs(ProgressEventType.Information,
×
474
                                $"Making {tbl} distinct in case there are duplicate rows from bad batch resumes"));
×
475
                        tbl.MakeDistinct(50000000);
×
476
                        listener.OnNotify(this,
×
477
                            new NotifyEventArgs(ProgressEventType.Information, $"Finished distincting {tbl}"));
×
478
                    }
479
            }
480
        }
481

482
        TableLoadInfo?.CloseAndArchive();
68✔
483

484
        // also close off the cumulative extraction result
485
        if (_request is ExtractDatasetCommand)
68✔
486
        {
487
            var result = ((IExtractDatasetCommand)_request).CumulativeExtractionResults;
34✔
488
            if (result != null && _toProcess != null)
34✔
489
                result.CompleteAudit(GetType(), GetDestinationDescription(), TableLoadInfo.Inserts,
34✔
490
                    _request.IsBatchResume, pipelineFailureExceptionIfAny != null);
34✔
491
        }
492
    }
68✔
493

494
    public override void Abort(IDataLoadEventListener listener)
495
    {
496
        _destination?.Abort(listener);
×
497
    }
×
498

499
    protected override void PreInitializeImpl(IExtractCommand value, IDataLoadEventListener listener)
500
    {
501
    }
142✔
502

503

504
    public override string GetDestinationDescription() => GetDestinationDescription("");
66✔
505

506
    private string GetDestinationDescription(string suffix = "")
507
    {
508
        if (_toProcess == null)
100✔
509
            return _request is ExtractGlobalsCommand
34!
510
                ? "Globals"
34✔
511
                : throw new Exception("Could not describe destination because _toProcess was null");
34✔
512

513
        var tblName = _toProcess.TableName;
66✔
514
        var dbName = GetDatabaseName();
66✔
515
        return $"{TargetDatabaseServer.ID}|{dbName}|{tblName}";
66✔
516
    }
517

518
    public static DestinationType GetDestinationType() => DestinationType.Database;
×
519

520
    public override ReleasePotential GetReleasePotential(IRDMPPlatformRepositoryServiceLocator repositoryLocator,
521
        ISelectedDataSets selectedDataSet) => new MsSqlExtractionReleasePotential(repositoryLocator, selectedDataSet);
×
522

523
    public override FixedReleaseSource<ReleaseAudit> GetReleaseSource(ICatalogueRepository catalogueRepository) =>
524
        new MsSqlReleaseSource(catalogueRepository);
×
525

526
    public override GlobalReleasePotential GetGlobalReleasabilityEvaluator(
527
        IRDMPPlatformRepositoryServiceLocator repositoryLocator, ISupplementalExtractionResults globalResult,
528
        IMapsDirectlyToDatabaseTable globalToCheck) =>
529
        new MsSqlGlobalsReleasePotential(repositoryLocator, globalResult, globalToCheck);
×
530

531
    protected override void TryExtractSupportingSQLTableImpl(SupportingSQLTable sqlTable, DirectoryInfo directory,
532
        IExtractionConfiguration configuration, IDataLoadEventListener listener, out int linesWritten,
533
        out string destinationDescription)
534
    {
535
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
4✔
536
            $"About to download SQL for global SupportingSQL {sqlTable.SQL}"));
4✔
537
        using var con = sqlTable.GetServer().GetConnection();
4✔
538
        con.Open();
4✔
539

540
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
4✔
541
            $"Connection opened successfully, about to send SQL command {sqlTable.SQL}"));
4✔
542

543
        using var dt = new DataTable();
4✔
544
        using (var cmd = DatabaseCommandHelper.GetCommand(sqlTable.SQL, con))
4✔
545
        using (var da = DatabaseCommandHelper.GetDataAdapter(cmd))
4✔
546
        {
547
            var sw = Stopwatch.StartNew();
4✔
548
            dt.BeginLoadData();
4✔
549
            da.Fill(dt);
4✔
550
            dt.EndLoadData();
4✔
551
        }
4✔
552

553
        dt.TableName = GetTableName(_destinationDatabase.Server.GetQuerySyntaxHelper()
4✔
554
            .GetSensibleEntityNameFromString(sqlTable.Name));
4✔
555
        linesWritten = dt.Rows.Count;
4✔
556

557
        var destinationDb = GetDestinationDatabase(listener);
4✔
558
        var tbl = destinationDb.ExpectTable(dt.TableName);
4✔
559

560
        if (tbl.Exists())
4!
561
            tbl.Drop();
×
562

563
        destinationDb.CreateTable(dt.TableName, dt);
4✔
564
        destinationDescription = $"{TargetDatabaseServer.ID}|{GetDatabaseName()}|{dt.TableName}";
4✔
565
    }
8✔
566

567

568
    protected override void TryExtractLookupTableImpl(BundledLookupTable lookup, DirectoryInfo lookupDir,
569
        IExtractionConfiguration requestConfiguration, IDataLoadEventListener listener, out int linesWritten,
570
        out string destinationDescription)
571
    {
572
        using var dt = lookup.GetDataTable();
2✔
573

574
        dt.TableName = GetTableName(_destinationDatabase.Server.GetQuerySyntaxHelper()
2✔
575
            .GetSensibleEntityNameFromString(lookup.TableInfo.Name));
2✔
576

577
        //describe the destination for the abstract base
578
        destinationDescription = $"{TargetDatabaseServer.ID}|{GetDatabaseName()}|{dt.TableName}";
2✔
579
        linesWritten = dt.Rows.Count;
2✔
580

581
        var destinationDb = GetDestinationDatabase(listener);
2✔
582
        var existing = destinationDb.ExpectTable(dt.TableName);
2✔
583

584
        if (existing.Exists())
2!
585
        {
586
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
×
587
                $"Dropping existing Lookup table '{existing.GetFullyQualifiedName()}'"));
×
588
            existing.Drop();
×
589
        }
590

591
        destinationDb.CreateTable(dt.TableName, dt);
2✔
592
    }
4✔
593

594
    private DiscoveredDatabase GetDestinationDatabase(IDataLoadEventListener listener)
595
    {
596
        //tell user we are about to inspect it
597
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
74✔
598
            $"About to open connection to {TargetDatabaseServer}"));
74✔
599

600
        var databaseName = GetDatabaseName();
74✔
601

602
        var discoveredServer = DataAccessPortal.ExpectServer(TargetDatabaseServer, DataAccessContext.DataExport, false);
74✔
603

604
        var db = discoveredServer.ExpectDatabase(databaseName);
74✔
605
        if (!db.Exists())
74✔
606
            db.Create();
10✔
607

608
        return db;
74✔
609
    }
610

611
    private string GetDatabaseName()
612
    {
613
        var dbName = DatabaseNamingPattern;
152✔
614

615
        if (_project.ProjectNumber == null)
152!
616
            throw new ProjectNumberException($"Project '{_project}' must have a ProjectNumber");
×
617

618
        if (_request == null)
152!
619
            throw new Exception("No IExtractCommand Request was passed to this component");
×
620

621
        if (_request.Configuration == null)
152!
622
            throw new Exception($"Request did not specify any Configuration for Project '{_project}'");
×
623

624
        dbName = dbName.Replace("$p", _project.Name)
152✔
625
            .Replace("$n", _project.ProjectNumber.ToString())
152✔
626
            .Replace("$t", _project.MasterTicket)
152✔
627
            .Replace("$r", _request.Configuration.RequestTicket)
152✔
628
            .Replace("$l", _request.Configuration.ReleaseTicket)
152✔
629
            .Replace("$e", _request.Configuration.ID.ToString());
152✔
630
        return dbName;
152✔
631
    }
632

633
    public override void Check(ICheckNotifier notifier)
634
    {
635
        if (TargetDatabaseServer == null)
10✔
636
        {
637
            notifier.OnCheckPerformed(new CheckEventArgs(
2✔
638
                "Target database server property has not been set (This component does not know where to extract data to!), " +
2✔
639
                "to fix this you must edit the pipeline and choose an ExternalDatabaseServer to extract to)",
2✔
640
                CheckResult.Fail));
2✔
641
            return;
2✔
642
        }
643

644
        if (string.IsNullOrWhiteSpace(TargetDatabaseServer.Server))
8✔
645
        {
646
            notifier.OnCheckPerformed(new CheckEventArgs("TargetDatabaseServer does not have a .Server specified",
2✔
647
                CheckResult.Fail));
2✔
648
            return;
2✔
649
        }
650

651
        if (!string.IsNullOrWhiteSpace(TargetDatabaseServer.Database))
6!
652
            notifier.OnCheckPerformed(new CheckEventArgs(
×
653
                "TargetDatabaseServer has .Database specified but this will be ignored!", CheckResult.Warning));
×
654

655
        if (string.IsNullOrWhiteSpace(TableNamingPattern))
6!
656
        {
657
            notifier.OnCheckPerformed(new CheckEventArgs(
×
658
                "You must specify TableNamingPattern, this will tell the component how to name tables it generates in the remote destination",
×
659
                CheckResult.Fail));
×
660
            return;
×
661
        }
662

663
        if (string.IsNullOrWhiteSpace(DatabaseNamingPattern))
6!
664
        {
665
            notifier.OnCheckPerformed(new CheckEventArgs(
×
666
                "You must specify DatabaseNamingPattern, this will tell the component what database to create or use in the remote destination",
×
667
                CheckResult.Fail));
×
668
            return;
×
669
        }
670

671
        if (!DatabaseNamingPattern.Contains("$p") && !DatabaseNamingPattern.Contains("$n") &&
6✔
672
            !DatabaseNamingPattern.Contains("$t") && !DatabaseNamingPattern.Contains("$r") &&
6✔
673
            !DatabaseNamingPattern.Contains("$l"))
6✔
674
            notifier.OnCheckPerformed(new CheckEventArgs(
4✔
675
                "DatabaseNamingPattern does not contain any token. The tables may be created alongside existing tables and Release would be impossible.",
4✔
676
                CheckResult.Warning));
4✔
677

678
        if (!TableNamingPattern.Contains("$d") && !TableNamingPattern.Contains("$a"))
6!
679
            notifier.OnCheckPerformed(new CheckEventArgs(
×
680
                "TableNamingPattern must contain either $d or $a, the name/acronym of the dataset being extracted otherwise you will get collisions when you extract multiple tables at once",
×
681
                CheckResult.Warning));
×
682

683
        if (_request == ExtractDatasetCommand.EmptyCommand)
6!
684
        {
685
            notifier.OnCheckPerformed(new CheckEventArgs(
×
686
                "Request is ExtractDatasetCommand.EmptyCommand, will not try to connect to Database",
×
687
                CheckResult.Warning));
×
688
            return;
×
689
        }
690

691
        if (TableNamingPattern != null && TableNamingPattern.Contains("$a"))
6!
692
            if (_request is ExtractDatasetCommand dsRequest && string.IsNullOrWhiteSpace(dsRequest.Catalogue.Acronym))
×
693
                notifier.OnCheckPerformed(new CheckEventArgs(
×
694
                    $"Catalogue '{dsRequest.Catalogue}' does not have an Acronym but TableNamingPattern contains $a",
×
695
                    CheckResult.Fail));
×
696

697
       
698

699
        base.Check(notifier);
6✔
700

701
        try
702
        {
703
            var server = DataAccessPortal.ExpectServer(TargetDatabaseServer, DataAccessContext.DataExport, false);
6✔
704
            var database = _destinationDatabase = server.ExpectDatabase(GetDatabaseName());
6✔
705

706
            if (UseArchiveTrigger)
6!
707
            {
NEW
708
                if (_request is ExtractDatasetCommand dsRequest)
×
709
                {
NEW
710
                    var existing = _destinationDatabase.ExpectTable(dsRequest.Catalogue.Name);
×
NEW
711
                    if (existing.Exists())
×
712
                    {
NEW
713
                        var hasPKs = existing.DiscoverColumns().Any(col => col.IsPrimaryKey);
×
NEW
714
                        if (!hasPKs)
×
715
                        {
NEW
716
                            notifier.OnCheckPerformed(new CheckEventArgs(
×
NEW
717
                               $"Catalogue does not have any PKS. Cannot apply the archive trigger",
×
NEW
718
                               CheckResult.Fail));
×
719
                        }
720
                    }
721
                }
722
            }
723

724
            if (database.Exists())
6✔
725
            {
726
                notifier.OnCheckPerformed(
2✔
727
                    new CheckEventArgs(
2✔
728
                        $"Database {database} already exists! if an extraction has already been run you may have errors if you are re-extracting the same tables",
2✔
729
                        CheckResult.Warning));
2✔
730
            }
731
            else
732
            {
733
                notifier.OnCheckPerformed(
4✔
734
                    new CheckEventArgs(
4✔
735
                        $"Database {database} does not exist on server... it will be created at runtime",
4✔
736
                        CheckResult.Success));
4✔
737
                return;
4✔
738
            }
739

740
            var tables = database.DiscoverTables(false);
2✔
741

742
            if (tables.Any())
2!
743
            {
744
                string tableName;
745

746
                try
747
                {
748
                    tableName = GetTableName();
×
749
                }
×
750
                catch (Exception ex)
×
751
                {
752
                    notifier.OnCheckPerformed(
×
753
                        new CheckEventArgs("Could not determine table name", CheckResult.Fail, ex));
×
754
                    return;
×
755
                }
756

757
                // if the expected table exists and we are not doing a batch resume or allowing data appending
758
                if (tables.Any(t => t.GetRuntimeName().Equals(tableName)) && !_request.IsBatchResume && !AppendDataIfTableExists)
×
759
                    notifier.OnCheckPerformed(new CheckEventArgs(ErrorCodes.ExistingExtractionTableInDatabase,
×
760
                        tableName, database));
×
761
            }
762
            else
763
            {
764
                notifier.OnCheckPerformed(new CheckEventArgs($"Confirmed that database {database} is empty of tables",
2✔
765
                    CheckResult.Success));
2✔
766
            }
767
        }
2✔
768
        catch (Exception e)
×
769
        {
770
            notifier.OnCheckPerformed(new CheckEventArgs(
×
771
                $"Could not connect to TargetDatabaseServer '{TargetDatabaseServer}'", CheckResult.Fail, e));
×
772
        }
×
773
    }
6✔
774
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc