• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 23487875153

24 Mar 2026 11:48AM UTC coverage: 57.104% (-0.03%) from 57.133%
23487875153

Pull #2327

github

JFriel
add dnd
Pull Request #2327: Fix Cohort UI Issue

11543 of 21745 branches covered (53.08%)

Branch coverage included in aggregate %.

1 of 12 new or added lines in 1 file covered. (8.33%)

41 existing lines in 2 files now uncovered.

32665 of 55671 relevant lines covered (58.68%)

9092.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.1
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Destinations/MSSqlMergeDestination.cs
1
using FAnsi.Discovery;
2
using Microsoft.Data.SqlClient;
3
using Rdmp.Core.CommandExecution;
4
using Rdmp.Core.Curation.Data;
5
using Rdmp.Core.DataExport.Data;
6
using Rdmp.Core.DataExport.DataExtraction.Commands;
7
using Rdmp.Core.DataExport.DataRelease.Pipeline;
8
using Rdmp.Core.DataExport.DataRelease.Potential;
9
using Rdmp.Core.DataFlowPipeline;
10
using Rdmp.Core.DataLoad.Triggers;
11
using Rdmp.Core.DataLoad.Triggers.Exceptions;
12
using Rdmp.Core.DataLoad.Triggers.Implementations;
13
using Rdmp.Core.Logging;
14
using Rdmp.Core.MapsDirectlyToDatabaseTable;
15
using Rdmp.Core.Repositories;
16
using Rdmp.Core.ReusableLibraryCode.Checks;
17
using Rdmp.Core.ReusableLibraryCode.DataAccess;
18
using Rdmp.Core.ReusableLibraryCode.Progress;
19
using System;
20
using System.Collections.Generic;
21
using System.Data;
22
using System.Diagnostics;
23
using System.DirectoryServices.Protocols;
24
using System.Globalization;
25
using System.Linq;
26
using System.Text;
27
using System.Text.RegularExpressions;
28
using System.Threading.Tasks;
29
using TypeGuesser;
30

31
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Destinations
32
{
33
    /// <summary>
    /// Extraction destination that merges each batch of extracted rows into a table in a Microsoft SQL Server
    /// database. Every batch is bulk inserted into a temporary table and then applied to the destination table
    /// with a single T-SQL MERGE statement keyed on the primary key columns.
    /// </summary>
    public class MSSqlMergeDestination : ExtractionDestination
    {
        [DemandsInitialization(
            "External server to create the extraction into, a new database will be created for the project based on the naming pattern provided",
            Mandatory = true)]
        public IExternalDatabaseServer TargetDatabaseServer { get; set; }

        [DemandsInitialization(
            "Naming pattern for the database to extract into (it is created if it does not exist). Supported tokens: $p - Project Name, $n - Project Number, $t - Project Master Ticket, $r - Configuration Request Ticket, $l - Configuration Release Ticket, $e - Extraction Configuration Id",
            Mandatory = true)]
        public string DatabaseNamingPattern { get; set; }

        [DemandsInitialization("Delete the temporary table used to perform the merge", DefaultValue = true)]
        public bool DeleteMergeTempTable { get; set; }

        [DemandsInitialization("Ensure the destination table has an archive trigger", DefaultValue = true)]
        public bool UseArchiveTrigger { get; set; }

        [DemandsInitialization("Allow the merge to perform deletes when records are missing from the source.", DefaultValue = false)]
        public bool AllowMergeToPerformDeletes { get; set; }

        [DemandsInitialization(@"How do you want to name datasets, use the following tokens if you need them:   
         $p - Project Name ('e.g. My Project')
         $n - Project Number (e.g. 234)
         $c - Configuration Name (e.g. 'Cases')
         $d - Dataset name (e.g. 'Prescribing')
         $a - Dataset acronym (e.g. 'Presc') 
         $e - Extraction Configuration Id (e.g. 14)
         You must have either $a or $d
         ", Mandatory = true, DefaultValue = "$c_$d")]
        public string TableNamingPattern { get; set; }

        [DemandsInitialization("How long should the merge command be allowed to run before timing out, in seconds", DefaultValue = 30000)]
        public int SQLMergeTimeout { get; set; }

        /// <summary>Destination database; only resolved once <see cref="WriteRows"/> has run.</summary>
        private DiscoveredDatabase _db;

        /// <summary>The most recent batch handed to <see cref="Open"/> or <see cref="WriteRows"/>.</summary>
        private DataTable _toProcess;

        public MSSqlMergeDestination() : base(false)
        {
        }

        /// <summary>
        /// Expands the tokens of <see cref="TableNamingPattern"/> for the current request and sanitises the
        /// result for use as a table name.
        /// </summary>
        /// <param name="suffix">Optional text appended (after an underscore) to the sanitised name.</param>
        /// <param name="dt">Currently unused.</param>
        /// <returns>The table name to use in the destination database.</returns>
        private string GetTableName(string suffix, DataTable dt)
        {
            var project = _request?.Configuration.Project;

            // string.Replace treats a null replacement as "remove the token"
            var tblName = TableNamingPattern
                .Replace("$p", project?.Name)
                .Replace("$n", project?.ProjectNumber.ToString())
                .Replace("$c", _request?.Configuration.Name)
                .Replace("$e", _request?.Configuration.ID.ToString());

            if (_request is ExtractDatasetCommand extractDatasetCommand)
            {
                var catalogue = extractDatasetCommand.DatasetBundle.DataSet.Catalogue;
                tblName = tblName.Replace("$d", catalogue.Name);
                tblName = tblName.Replace("$a", catalogue.Acronym);
            }

            if (_request is ExtractGlobalsCommand)
            {
                tblName = tblName.Replace("$d", ExtractionDirectory.GLOBALS_DATA_NAME);
                tblName = tblName.Replace("$a", "G");
            }

            var sanitised = SanitizeNameForDatabase(tblName);

            return string.IsNullOrWhiteSpace(suffix) ? sanitised : $"{sanitised}_{suffix}";
        }

        /// <summary>
        /// Replaces dots and brackets in <paramref name="tblName"/> with underscores, validates it against the
        /// destination server's syntax rules and converts it into a sensible entity name.
        /// </summary>
        /// <exception cref="Exception">
        /// The destination database has not been resolved yet, or the resulting name is empty.
        /// </exception>
        private string SanitizeNameForDatabase(string tblName)
        {
            if (_db == null)
                throw new Exception(
                    "Cannot pick a TableName until we know what type of server it is going to, the destination database has not been resolved yet");

            //get rid of brackets and dots
            tblName = Regex.Replace(tblName, "[.()]", "_");

            var syntax = _db.Server.GetQuerySyntaxHelper();
            syntax.ValidateTableName(tblName);

            var sensibleName = syntax.GetSensibleEntityNameFromString(tblName);

            if (string.IsNullOrWhiteSpace(sensibleName))
                throw new Exception(
                    $"TableNamingPattern '{TableNamingPattern}' resulted in an empty string for request '{_request}'");

            return sensibleName;
        }

        /// <summary>Expands the tokens of <see cref="DatabaseNamingPattern"/> for the current project and request.</summary>
        private string GetDatabaseName()
        {
            return DatabaseNamingPattern
                .Replace("$p", _project?.Name)
                .Replace("$n", _project?.ProjectNumber.ToString())
                .Replace("$t", _project?.MasterTicket)
                .Replace("$r", _request?.Configuration.RequestTicket)
                .Replace("$l", _request?.Configuration.ReleaseTicket)
                .Replace("$e", _request?.Configuration.ID.ToString());
        }

        /// <summary>No-op: this destination has nothing to do on abort.</summary>
        public override void Abort(IDataLoadEventListener listener)
        {
        }

        /// <summary>No-op: connections are released at the end of every <see cref="WriteRows"/> call.</summary>
        public override void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
        {
        }

        /// <summary>
        /// Describes the destination as "server ID|database|table", or "Globals" when no batch has been seen
        /// and the request is an <see cref="ExtractGlobalsCommand"/>.
        /// </summary>
        /// <exception cref="Exception">No batch has been seen and the request is not a globals request.</exception>
        public override string GetDestinationDescription()
        {
            if (_toProcess == null)
            {
                if (_request is ExtractGlobalsCommand)
                    return "Globals";

                throw new Exception("Could not describe destination because _toProcess was null");
            }

            var tblName = GetTableName(null, _toProcess);
            return $"{TargetDatabaseServer.ID}|{_db.GetRuntimeName()}|{tblName}";
        }

        public override GlobalReleasePotential GetGlobalReleasabilityEvaluator(IRDMPPlatformRepositoryServiceLocator repositoryLocator, ISupplementalExtractionResults globalResult, IMapsDirectlyToDatabaseTable globalToCheck)
        {
            return new MsSqlGlobalsReleasePotential(repositoryLocator, globalResult, globalToCheck);
        }

        public override ReleasePotential GetReleasePotential(IRDMPPlatformRepositoryServiceLocator repositoryLocator, ISelectedDataSets selectedDataSet)
        {
            return new MsSqlExtractionReleasePotential(repositoryLocator, selectedDataSet);
        }

        public override FixedReleaseSource<ReleaseAudit> GetReleaseSource(ICatalogueRepository catalogueRepository)
        {
            return new MsSqlReleaseSource(catalogueRepository);
        }

        /// <summary>Remembers the batch so that <see cref="GetDestinationDescription"/> can name the table.</summary>
        protected override void Open(DataTable toProcess, IDataLoadEventListener job, GracefulCancellationToken cancellationToken)
        {
            _toProcess = toProcess;
        }

        /// <summary>
        /// Merges <paramref name="toProcess"/> into the destination table. The database and table are created
        /// when missing, the batch is bulk inserted into a temporary table and a MERGE statement keyed on the
        /// primary key columns is run against the destination.
        /// </summary>
        /// <remarks>
        /// Reports a <see cref="ProgressEventType.Error"/> to <paramref name="job"/> and returns without
        /// merging when no usable primary key can be determined.
        /// NOTE(review): with <see cref="AllowMergeToPerformDeletes"/> the MERGE deletes every destination row
        /// that is absent from this batch; if an extraction arrives in several batches that would remove rows
        /// written by earlier batches - confirm how the pipeline batches data before enabling it.
        /// </remarks>
        protected override void WriteRows(DataTable toProcess, IDataLoadEventListener job, GracefulCancellationToken cancellationToken, Stopwatch stopwatch)
        {
            _toProcess = toProcess;
            var discoveredServer = DataAccessPortal.ExpectServer(TargetDatabaseServer, DataAccessContext.DataExport, false);

            //make sure the db exist
            _db = discoveredServer.ExpectDatabase(GetDatabaseName());
            if (!_db.Exists())
                _db.Create();

            var tableName = GetTableName(null, toProcess);
            var destinationTable = _db.ExpectTable(tableName);

            //ensure there are some pks
            var keyColumnNames = toProcess.PrimaryKey.Select(pk => pk.ColumnName).ToArray();

            if (destinationTable.Exists())
            {
                var destinationKeys = destinationTable.DiscoverColumns()
                    .Where(col => col.IsPrimaryKey)
                    .Select(col => col.GetRuntimeName())
                    .ToArray();

                if (destinationKeys.Length == 0)
                    keyColumnNames = Array.Empty<string>(); // an existing destination must carry a primary key
                else if (keyColumnNames.Length == 0)
                    keyColumnNames = destinationKeys; // batch carries no key: match on the destination's key
            }
            else if (keyColumnNames.Length > 0)
            {
                // only create the table when the batch has a key, otherwise a key-less table would be left behind
                destinationTable = _db.CreateTable(out _, tableName, toProcess, null,
                    true, null);
            }

            if (keyColumnNames.Length == 0)
            {
                job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, "No Primary Keys were found in the destination table or source configuration"));
                return;
            }

            var missingKeys = keyColumnNames.Where(key => !toProcess.Columns.Contains(key)).ToArray();
            if (missingKeys.Length > 0)
            {
                job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error,
                    $"Primary key column(s) '{string.Join("', '", missingKeys)}' of the destination table are not present in the data being extracted"));
                return;
            }

            if (UseArchiveTrigger)
                EnsureArchiveTrigger(toProcess, destinationTable, job);

            //merge
            var columnTypes = destinationTable.DiscoverColumns()
                .Select(column => new DatabaseColumnRequest(column.GetRuntimeName(), column.DataType.ToString(), column.AllowNulls))
                .ToArray();

            var tmpTbl = _db.CreateTable(
                out Dictionary<string, Guesser> _,
                $"mergeTempTable_{tableName}_{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture).Replace('.', '-')}",
                toProcess, columnTypes, true, null);

            var managedConnection = tmpTbl.Database.Server.GetManagedConnection();
            try
            {
                try
                {
                    var bulkCopy = tmpTbl.BeginBulkInsert(CultureInfo.CurrentCulture, managedConnection.ManagedTransaction);
                    try
                    {
                        bulkCopy.Timeout = SQLMergeTimeout;
                        bulkCopy.Upload(toProcess);
                    }
                    finally
                    {
                        bulkCopy.Dispose();
                    }

                    var mergeSql = BuildMergeSql(destinationTable, tmpTbl, keyColumnNames, toProcess);
                    job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, mergeSql));

                    using var cmd = new SqlCommand(mergeSql, (SqlConnection)managedConnection.Connection);
                    cmd.CommandTimeout = SQLMergeTimeout;
                    var rowCount = cmd.ExecuteNonQuery();
                    job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Merged {rowCount} rows into {destinationTable.GetFullyQualifiedName()}."));
                }
                finally
                {
                    // also runs when the upload or merge failed so that temp tables do not pile up
                    if (DeleteMergeTempTable)
                        tmpTbl.Drop();
                }
            }
            finally
            {
                managedConnection.Dispose();
            }
        }

        /// <summary>
        /// Stamps the batch with the data load run ID of each logging listener (adding the column to the
        /// destination table when it is missing) and creates the archive trigger on the destination table
        /// unless it is already enabled.
        /// </summary>
        private void EnsureArchiveTrigger(DataTable toProcess, DiscoveredTable destinationTable, IDataLoadEventListener job)
        {
            // only a forking listener can expose logging listeners; any other listener simply has none
            if (job is ForkDataLoadEventListener fork)
            {
                foreach (var dleListener in fork.GetToLoggingDatabaseDataLoadEventListenersIfany())
                {
                    IDataLoadInfo dataLoadInfo = dleListener.DataLoadInfo;

                    var destinationHasRunId = destinationTable.DiscoverColumns().Any(col =>
                        string.Equals(col.GetRuntimeName(), SpecialFieldNames.DataLoadRunID, StringComparison.OrdinalIgnoreCase));
                    if (!destinationHasRunId)
                        destinationTable.AddColumn(SpecialFieldNames.DataLoadRunID, new DatabaseTypeRequest(typeof(int)), true, 30000);

                    if (!toProcess.Columns.Contains(SpecialFieldNames.DataLoadRunID))
                    {
                        toProcess.Columns.Add(new DataColumn(SpecialFieldNames.DataLoadRunID, typeof(int))
                        {
                            DefaultValue = dataLoadInfo.ID
                        });
                    }

                    foreach (DataRow dr in toProcess.Rows)
                        dr[SpecialFieldNames.DataLoadRunID] = dataLoadInfo.ID;
                }
            }

            var triggerFactory = new TriggerImplementerFactory(FAnsi.DatabaseType.MicrosoftSQLServer);
            var implementor = triggerFactory.Create(destinationTable);

            bool present;
            try
            {
                present = implementor.GetTriggerStatus() == DataLoad.Triggers.TriggerStatus.Enabled;
            }
            catch (TriggerMissingException)
            {
                present = false;
            }

            if (!present)
                implementor.CreateTrigger(ThrowImmediatelyCheckNotifier.Quiet);
        }

        /// <summary>
        /// Builds the MERGE statement that applies <paramref name="tmpTbl"/> to
        /// <paramref name="destinationTable"/>, matching rows on <paramref name="keyColumnNames"/>.
        /// Column names are wrapped by the server's syntax helper. The WHEN MATCHED clause is left out when
        /// every column is part of the key, because an empty UPDATE SET list is not valid SQL.
        /// </summary>
        private string BuildMergeSql(DiscoveredTable destinationTable, DiscoveredTable tmpTbl, string[] keyColumnNames, DataTable toProcess)
        {
            var syntax = _db.Server.GetQuerySyntaxHelper();

            var allColumns = toProcess.Columns.Cast<DataColumn>().Select(dc => dc.ColumnName).ToArray();
            var nonKeyColumns = allColumns
                .Where(name => !keyColumnNames.Contains(name, StringComparer.OrdinalIgnoreCase))
                .ToArray();

            var joinOn = string.Join(" AND ", keyColumnNames.Select(name => $"target.{syntax.EnsureWrapped(name)} = source.{syntax.EnsureWrapped(name)}"));
            var updateSet = string.Join(" , ", nonKeyColumns.Select(name => $"target.{syntax.EnsureWrapped(name)} = source.{syntax.EnsureWrapped(name)}"));
            var insertColumns = string.Join(" , ", allColumns.Select(name => syntax.EnsureWrapped(name)));
            var insertValues = string.Join(" , ", allColumns.Select(name => $"source.{syntax.EnsureWrapped(name)}"));

            var sql = new StringBuilder();
            sql.AppendLine($"MERGE INTO {destinationTable.GetFullyQualifiedName()} WITH (HOLDLOCK) AS target");
            sql.AppendLine($"USING {tmpTbl.GetFullyQualifiedName()} AS source");
            sql.AppendLine($"    ON {joinOn}");

            if (nonKeyColumns.Length > 0)
            {
                sql.AppendLine("WHEN MATCHED THEN");
                sql.AppendLine($"    UPDATE SET {updateSet}");
            }

            sql.AppendLine("WHEN NOT MATCHED BY TARGET THEN");
            sql.AppendLine($"    INSERT ({insertColumns})");
            sql.Append($"    VALUES ({insertValues})");

            if (AllowMergeToPerformDeletes)
            {
                sql.AppendLine();
                sql.AppendLine("WHEN NOT MATCHED BY SOURCE THEN");
                sql.Append("    DELETE");
            }

            // MERGE statements must be terminated with a semicolon
            sql.Append(';');
            return sql.ToString();
        }

        /// <summary>No-op: this destination needs no initialisation beyond its configured properties.</summary>
        protected override void PreInitializeImpl(IBasicActivateItems activator, IExtractCommand request, IDataLoadEventListener listener)
        {
        }
    }
300
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc