• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 23183960924

17 Mar 2026 07:51AM UTC coverage: 57.138% (+0.02%) from 57.121%
23183960924

Pull #2326

github

JFriel
tidy up
Pull Request #2326: Bugfix/fix merge issue

11549 of 21743 branches covered (53.12%)

Branch coverage included in aggregate %.

19 of 27 new or added lines in 7 files covered. (70.37%)

3 existing lines in 3 files now uncovered.

32673 of 55652 relevant lines covered (58.71%)

18185.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.02
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
1
using FAnsi.Discovery;
2
using FAnsi.Discovery.QuerySyntax;
3
using Microsoft.Data.SqlClient;
4
using Rdmp.Core.CommandExecution;
5
using Rdmp.Core.Curation.Data;
6
using Rdmp.Core.DataExport.Data;
7
using Rdmp.Core.DataExport.DataExtraction.Commands;
8
using Rdmp.Core.DataExport.DataRelease.Pipeline;
9
using Rdmp.Core.DataExport.DataRelease.Potential;
10
using Rdmp.Core.DataFlowPipeline;
11
using Rdmp.Core.DataLoad.Triggers;
12
using Rdmp.Core.DataLoad.Triggers.Exceptions;
13
using Rdmp.Core.DataLoad.Triggers.Implementations;
14
using Rdmp.Core.Logging;
15
using Rdmp.Core.MapsDirectlyToDatabaseTable;
16
using Rdmp.Core.Repositories;
17
using Rdmp.Core.ReusableLibraryCode.Checks;
18
using Rdmp.Core.ReusableLibraryCode.DataAccess;
19
using Rdmp.Core.ReusableLibraryCode.Progress;
20
using System;
21
using System.Collections.Generic;
22
using System.Data;
23
using System.Diagnostics;
24
using System.DirectoryServices.Protocols;
25
using System.Globalization;
26
using System.Linq;
27
using System.Text;
28
using System.Text.RegularExpressions;
29
using System.Threading.Tasks;
30
using TypeGuesser;
31

32
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Destinations
33
{
34
    /// <summary>
35
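    /// Extraction destination that merges extracted rows into a table on a Microsoft SQL Server database using a primary-key based MERGE statement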
36
    /// </summary>
37
    public class MSSqlMergeDestination : ExtractionDestination
38
    {
39
        /// <summary>
        /// Server that hosts the database the extraction is merged into.
        /// </summary>
        [DemandsInitialization(
            "External server to create the extraction into, a new database will be created for the project based on the naming pattern provided",
            Mandatory = true)]
        public IExternalDatabaseServer TargetDatabaseServer { get; set; }

        /// <summary>
        /// Pattern used to derive the destination database name; the tokens listed in the description
        /// are the ones expanded when rows are written.
        /// </summary>
        [DemandsInitialization(@"How do you want to name the database the extraction is merged into, use the following tokens if you need them:
         $p - Project Name (e.g. 'My Project')
         $n - Project Number (e.g. 234)
         $t - Project Master Ticket
         $r - Extraction Configuration Request Ticket
         $l - Extraction Configuration Release Ticket
         $e - Extraction Configuration Id (e.g. 14)
         The database is created if it does not already exist",
            Mandatory = true)]
        public string DatabaseNamingPattern { get; set; }

        /// <summary>
        /// When true the staging table created for the merge is dropped once the merge has completed.
        /// </summary>
        [DemandsInitialization("Delete the temporary table used to perform the merge", DefaultValue = true)]
        public bool DeleteMergeTempTable { get; set; }

        /// <summary>
        /// When true an archive trigger is created on the destination table if one is not already enabled.
        /// </summary>
        [DemandsInitialization("Ensure the destination table has an archive trigger", DefaultValue = true)]
        public bool UseArchiveTrigger { get; set; }

        /// <summary>
        /// When true the MERGE statement also deletes destination rows that have no match in the incoming data.
        /// </summary>
        [DemandsInitialization("Allow the merge to perform deletes when records are missing from the source.", DefaultValue = false)]
        public bool AllowMergeToPerformDeletes { get; set; }

        /// <summary>
        /// Pattern used to derive the destination table name.
        /// </summary>
        [DemandsInitialization(@"How do you want to name datasets, use the following tokens if you need them:   
         $p - Project Name ('e.g. My Project')
         $n - Project Number (e.g. 234)
         $c - Configuration Name (e.g. 'Cases')
         $d - Dataset name (e.g. 'Prescribing')
         $a - Dataset acronym (e.g. 'Presc') 
         $e - Extraction Configuration Id (e.g. 14)
         You must have either $a or $d
         ", Mandatory = true, DefaultValue = "$c_$d")]
        public string TableNamingPattern { get; set; }

        /// <summary>
        /// Timeout, in seconds, applied to both the bulk upload into the staging table and the MERGE command.
        /// </summary>
        [DemandsInitialization("How long should the merge command be allowed to run before timing out, in seconds", DefaultValue = 30000)]
        public int SQLMergeTimeout { get; set; }

        // destination database; assigned when rows are written
        private DiscoveredDatabase db;

        // most recent batch handed to Open / WriteRows
        private DataTable _toProcess;
74

75
        /// <summary>
        /// Creates the destination, passing <c>false</c> to the <see cref="ExtractionDestination"/> base constructor.
        /// </summary>
        public MSSqlMergeDestination()
            : base(false)
        {
        }
8✔
78

79
        /// <summary>
        /// Expands the tokens in <see cref="TableNamingPattern"/> using the current request and
        /// returns the result after sanitising it for the destination database.
        /// </summary>
        /// <param name="suffix">Optional text appended after an underscore; skipped when null or whitespace</param>
        /// <param name="dt">Not read by this method</param>
        /// <returns>The sanitised table name, plus the suffix when one was supplied</returns>
        private string GetTableName(string suffix, DataTable dt)
        {
            var project = _request?.Configuration.Project;

            // project / configuration level tokens
            var expanded = TableNamingPattern
                .Replace("$p", project?.Name)
                .Replace("$n", project?.ProjectNumber.ToString())
                .Replace("$c", _request?.Configuration.Name)
                .Replace("$e", _request?.Configuration.ID.ToString());

            // dataset level tokens come from the catalogue being extracted
            if (_request is ExtractDatasetCommand datasetCommand)
            {
                expanded = expanded
                    .Replace("$d", datasetCommand.DatasetBundle.DataSet.Catalogue.Name)
                    .Replace("$a", datasetCommand.DatasetBundle.DataSet.Catalogue.Acronym);
            }

            // globals have fixed dataset name / acronym substitutions
            if (_request is ExtractGlobalsCommand)
            {
                expanded = expanded
                    .Replace("$d", ExtractionDirectory.GLOBALS_DATA_NAME)
                    .Replace("$a", "G");
            }

            var sanitised = SanitizeNameForDatabase(expanded);

            return string.IsNullOrWhiteSpace(suffix)
                ? sanitised
                : sanitised + $"_{suffix}";
        }
107

108
        /// <summary>
        /// Turns a candidate table name into one acceptable to the destination database: brackets and
        /// dots become underscores, the name is validated, then reduced to a sensible entity name.
        /// </summary>
        /// <param name="tblName">Candidate name, typically the expanded naming pattern</param>
        /// <returns>The sanitised name</returns>
        /// <exception cref="Exception">The destination database is not yet known, or the sanitised name is empty</exception>
        private string SanitizeNameForDatabase(string tblName)
        {
            if (db == null)
            {
                throw new Exception(
                    "Cannot pick a TableName until we know what type of server it is going to, _server is null");
            }

            // replace brackets and dots before validating
            var withoutPunctuation = Regex.Replace(tblName, "[.()]", "_");

            var syntaxHelper = db.Server.GetQuerySyntaxHelper();
            syntaxHelper.ValidateTableName(withoutPunctuation);

            var sensibleName = syntaxHelper.GetSensibleEntityNameFromString(withoutPunctuation);
            if (string.IsNullOrWhiteSpace(sensibleName))
            {
                throw new Exception(
                    $"TableNamingPattern '{TableNamingPattern}' resulted in an empty string for request '{_request}'");
            }

            return sensibleName;
        }
128

129
        /// <summary>
        /// No-op: this destination performs no work when the pipeline is aborted.
        /// </summary>
        /// <param name="listener">Not used</param>
        public override void Abort(IDataLoadEventListener listener)
        {
            // nothing to do
        }
×
132

133
        /// <summary>
        /// No-op: this destination performs no work when the pipeline is disposed.
        /// </summary>
        /// <param name="listener">Not used</param>
        /// <param name="pipelineFailureExceptionIfAny">Not used</param>
        public override void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
        {
            // nothing to do
        }
×
136

137
        /// <summary>
        /// Describes where the data was written as "serverId|databaseName|tableName", or "Globals"
        /// when no batch has been seen and the request is a globals extraction.
        /// </summary>
        /// <returns>The destination description</returns>
        /// <exception cref="Exception">No batch has been seen and the request is not a globals extraction</exception>
        public override string GetDestinationDescription()
        {
            if (_toProcess != null)
            {
                var tblName = GetTableName(null, _toProcess);
                return $"{TargetDatabaseServer.ID}|{db.GetRuntimeName()}|{tblName}";
            }

            if (_request is ExtractGlobalsCommand)
            {
                return "Globals";
            }

            throw new Exception("Could not describe destination because _toProcess was null");
        }
147

148
        /// <summary>
        /// Creates the <see cref="MsSqlGlobalsReleasePotential"/> used to evaluate a global extraction result.
        /// </summary>
        /// <param name="repositoryLocator">Passed through to the evaluator</param>
        /// <param name="globalResult">Passed through to the evaluator</param>
        /// <param name="globalToCheck">Passed through to the evaluator</param>
        /// <returns>A new <see cref="MsSqlGlobalsReleasePotential"/></returns>
        public override GlobalReleasePotential GetGlobalReleasabilityEvaluator(
            IRDMPPlatformRepositoryServiceLocator repositoryLocator,
            ISupplementalExtractionResults globalResult,
            IMapsDirectlyToDatabaseTable globalToCheck)
            => new MsSqlGlobalsReleasePotential(repositoryLocator, globalResult, globalToCheck);
152

153
        /// <summary>
        /// Creates the <see cref="MsSqlExtractionReleasePotential"/> for a selected dataset.
        /// </summary>
        /// <param name="repositoryLocator">Passed through to the release potential</param>
        /// <param name="selectedDataSet">Passed through to the release potential</param>
        /// <returns>A new <see cref="MsSqlExtractionReleasePotential"/></returns>
        public override ReleasePotential GetReleasePotential(
            IRDMPPlatformRepositoryServiceLocator repositoryLocator,
            ISelectedDataSets selectedDataSet)
            => new MsSqlExtractionReleasePotential(repositoryLocator, selectedDataSet);
157

158
        /// <summary>
        /// Creates the <see cref="MsSqlReleaseSource"/> for this destination.
        /// </summary>
        /// <param name="catalogueRepository">Passed through to the release source</param>
        /// <returns>A new <see cref="MsSqlReleaseSource"/></returns>
        public override FixedReleaseSource<ReleaseAudit> GetReleaseSource(ICatalogueRepository catalogueRepository)
            => new MsSqlReleaseSource(catalogueRepository);
162

163
        /// <summary>
        /// Remembers the incoming batch; nothing else is done here.
        /// </summary>
        /// <param name="toProcess">Batch to remember</param>
        /// <param name="job">Not used</param>
        /// <param name="cancellationToken">Not used</param>
        protected override void Open(DataTable toProcess, IDataLoadEventListener job, GracefulCancellationToken cancellationToken)
            => _toProcess = toProcess;
×
167

168
        /// <summary>
        /// Merges a batch of extracted rows into the destination table.
        /// <para>
        /// The destination database and table are created when missing, an archive trigger is added when
        /// <see cref="UseArchiveTrigger"/> is set, the batch is bulk uploaded into a staging table and a
        /// MERGE statement keyed on the primary key copies it into the destination.
        /// </para>
        /// </summary>
        /// <param name="toProcess">Batch to merge; its primary key is used as the merge key when it declares one</param>
        /// <param name="job">Receives the generated SQL, the merged row count and any error notifications</param>
        /// <param name="cancellationToken">Not used</param>
        /// <param name="stopwatch">Not used</param>
        protected override void WriteRows(DataTable toProcess, IDataLoadEventListener job, GracefulCancellationToken cancellationToken, Stopwatch stopwatch)
        {
            _toProcess = toProcess;
            var discoveredServer = DataAccessPortal.ExpectServer(TargetDatabaseServer, DataAccessContext.DataExport, false);

            // sort out the naming
            var dbName = DatabaseNamingPattern
                .Replace("$p", _project?.Name)
                .Replace("$n", _project?.ProjectNumber.ToString())
                .Replace("$t", _project?.MasterTicket)
                .Replace("$r", _request?.Configuration.RequestTicket)
                .Replace("$l", _request?.Configuration.ReleaseTicket)
                .Replace("$e", _request?.Configuration.ID.ToString());

            // make sure the db exists
            db = discoveredServer.ExpectDatabase(dbName);
            if (!db.Exists())
                db.Create();

            var tableName = GetTableName(null, toProcess);
            var destinationTable = db.ExpectTable(tableName);

            // a new table takes its keys from the batch, an existing table must already have them
            bool hasPrimaryKeys;
            if (!destinationTable.Exists())
            {
                hasPrimaryKeys = toProcess.PrimaryKey is { Length: > 0 };
                destinationTable = db.CreateTable(out _, tableName, toProcess, null, true, null);
            }
            else
            {
                hasPrimaryKeys = destinationTable.DiscoverColumns().Any(col => col.IsPrimaryKey);
            }

            if (!hasPrimaryKeys)
            {
                job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, "No Primary Keys were found in the destination table or source configuration"));
                return;
            }

            if (UseArchiveTrigger)
            {
                TriggerImplementerFactory triggerFactory = new TriggerImplementerFactory(FAnsi.DatabaseType.MicrosoftSQLServer);
                var implementor = triggerFactory.Create(destinationTable, false, true);
                bool present;
                try
                {
                    present = implementor.GetTriggerStatus() == DataLoad.Triggers.TriggerStatus.Enabled;
                }
                catch (TriggerMissingException)
                {
                    present = false;
                }

                if (!present)
                {
                    implementor.CreateTrigger(ThrowImmediatelyCheckNotifier.Quiet);
                }
            }

            // discovered after the trigger so that any columns it added are included
            var destinationColumns = destinationTable.DiscoverColumns();

            // Prefer the keys declared on the batch. When the batch declares none (the destination already
            // existed) fall back to the destination's own key columns, otherwise the ON clause would be empty.
            var pkColumns = toProcess.PrimaryKey;
            if (pkColumns.Length == 0)
            {
                var destinationKeyNames = destinationColumns
                    .Where(col => col.IsPrimaryKey)
                    .Select(col => col.GetRuntimeName())
                    .ToArray();

                var missingKeys = destinationKeyNames.Where(name => !toProcess.Columns.Contains(name)).ToArray();
                if (missingKeys.Length > 0)
                {
                    job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error,
                        $"Cannot merge into {destinationTable.GetFullyQualifiedName()} because the following primary key column(s) are missing from the data being extracted: {string.Join(", ", missingKeys)}"));
                    return;
                }

                pkColumns = destinationKeyNames.Select(name => toProcess.Columns[name]).ToArray();
            }

            // columns eligible for update: everything that is neither a key nor a hic_ column
            var nonPkColumns = toProcess.Columns.Cast<DataColumn>()
                .Where(dc => !pkColumns.Contains(dc) && !dc.ColumnName.StartsWith("hic_", StringComparison.Ordinal))
                .ToArray();

            // the staging table reuses the destination's column types
            var columnTypes = destinationColumns
                .Select(column => new DatabaseColumnRequest(column.GetRuntimeName(), column.DataType.ToString(), column.AllowNulls))
                .ToArray();

            var tmpTbl = db.CreateTable(
                out _,
                $"mergeTempTable_{tableName}_{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture).Replace('.', '-')}",
                toProcess, columnTypes, true, null);

            // disposed on every exit path, including failures of the upload or the merge
            using var managedConnection = tmpTbl.Database.Server.GetManagedConnection();
            try
            {
                using (var bulkCopy = tmpTbl.BeginBulkInsert(CultureInfo.CurrentCulture, managedConnection.ManagedTransaction))
                {
                    bulkCopy.Timeout = SQLMergeTimeout;
                    bulkCopy.Upload(toProcess);
                }

                var mergeSql = BuildMergeSql(destinationTable, tmpTbl, pkColumns, nonPkColumns, toProcess, db.Server.GetQuerySyntaxHelper());
                job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, mergeSql));

                using var cmd = new SqlCommand(mergeSql, (SqlConnection)managedConnection.Connection);
                cmd.CommandTimeout = SQLMergeTimeout;
                var rowCount = cmd.ExecuteNonQuery();

                job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Merged {rowCount} rows into {destinationTable.GetFullyQualifiedName()}."));
            }
            finally
            {
                // also runs when the upload or merge failed so staging tables do not accumulate
                if (DeleteMergeTempTable)
                    tmpTbl.Drop();
            }
        }

        /// <summary>
        /// Builds the MERGE statement that copies the staging table into the destination table.
        /// </summary>
        /// <param name="destinationTable">Table being merged into (aliased as target)</param>
        /// <param name="stagingTable">Table holding the uploaded batch (aliased as source)</param>
        /// <param name="keyColumns">Columns matched in the ON clause</param>
        /// <param name="nonKeyColumns">Columns compared and updated for matched rows; may be empty</param>
        /// <param name="toProcess">Batch whose columns form the INSERT column list</param>
        /// <param name="syntax">Used to wrap every column name</param>
        /// <returns>The MERGE statement text</returns>
        private string BuildMergeSql(DiscoveredTable destinationTable, DiscoveredTable stagingTable, DataColumn[] keyColumns, DataColumn[] nonKeyColumns, DataTable toProcess, IQuerySyntaxHelper syntax)
        {
            string Wrap(DataColumn column) => syntax.EnsureWrapped(column.ColumnName);
            string Assign(DataColumn column) => $"target.{Wrap(column)} = source.{Wrap(column)}";

            var allColumns = toProcess.Columns.Cast<DataColumn>().ToArray();

            var sql = new StringBuilder();
            sql.AppendLine($"MERGE INTO {destinationTable.GetFullyQualifiedName()} WITH (HOLDLOCK) AS target");
            sql.AppendLine($"USING {stagingTable.GetFullyQualifiedName()} AS source");
            sql.AppendLine($"    ON {string.Join(" AND ", keyColumns.Select(Assign))}");

            // with nothing to compare or update the clause would be syntactically invalid, so leave it out
            if (nonKeyColumns.Length > 0)
            {
                sql.AppendLine("WHEN MATCHED AND(");
                sql.AppendLine(string.Join(" OR ", nonKeyColumns.Select(c => GetORLine(c, syntax))));
                sql.AppendLine(")");
                sql.AppendLine("THEN ");
                sql.AppendLine($"    UPDATE SET {string.Join(" , ", nonKeyColumns.Select(Assign))}");
            }

            sql.AppendLine("WHEN NOT MATCHED BY TARGET THEN");
            sql.AppendLine($"    INSERT ({string.Join(" , ", allColumns.Select(Wrap))})");
            sql.Append($"    VALUES ({string.Join(" , ", allColumns.Select(c => $"source.{Wrap(c)}"))})");

            if (AllowMergeToPerformDeletes)
            {
                sql.AppendLine();
                sql.AppendLine("WHEN NOT MATCHED BY SOURCE THEN");
                sql.Append("    DELETE;");
            }
            else
            {
                sql.Append(';');
            }

            return sql.ToString();
        }
16✔
273

274
        /// <summary>
        /// Builds the SQL predicate that is true when a column's value differs between the target and
        /// source rows of the merge.
        /// </summary>
        /// <param name="c">Column to compare; its name is wrapped using <paramref name="syntax"/></param>
        /// <param name="syntax">Used to wrap the column name</param>
        /// <returns>A parenthesised predicate comparing target and source for the column</returns>
        private static string GetORLine(DataColumn c, IQuerySyntaxHelper syntax)
        {
            var column = syntax.EnsureWrapped(c.ColumnName);

            // The two null checks cover a null on either side: target null / source not null, and
            // target not null / source null. Previously both checks tested the first case only.
            return
                $"(target.{column} <> source.{column}" +
                $" OR (target.{column} is null AND source.{column} is not null)" +
                $" OR (target.{column} is not null AND source.{column} is null))";
        }
280
        /// <summary>
        /// No-op: this destination performs no additional pre-initialisation.
        /// </summary>
        /// <param name="activator">Not used</param>
        /// <param name="request">Not used</param>
        /// <param name="listener">Not used</param>
        protected override void PreInitializeImpl(IBasicActivateItems activator, IExtractCommand request, IDataLoadEventListener listener)
        {
            // nothing to do
        }
×
283
    }
284
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc