
HicServices / RDMP / 20057586828

09 Dec 2025 08:56AM UTC coverage: 57.193% (-0.2%) from 57.422%

Pull Request #2182: [Datasets] Task/rdmp 33 dataset integration interface
JFriel: Merge branch 'develop' of https://github.com/HicServices/RDMP into task/RDMP-33-dataset-integration-interface (via github)

11510 of 21615 branches covered (53.25%)

Branch coverage included in aggregate %.

1226 of 2024 new or added lines in 85 files covered (60.57%).

35 existing lines in 15 files now uncovered.

32654 of 55604 relevant lines covered (58.73%)

8854.02 hits per line
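
As noted above, branch coverage is included in the aggregate %, so the headline 57.193% appears to pool lines and branches: (32654 + 11510) covered out of (55604 + 21615) relevant = 44164 / 77219 ≈ 57.19%, whereas the line-only figure is 32654 / 55604 ≈ 58.73%.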

Source File: /Rdmp.Core/DataLoad/Engine/Pipeline/Destinations/DataTableUploadDestination.cs (82.31% covered)

1
// Copyright (c) The University of Dundee 2018-2024
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using FAnsi;
8
using FAnsi.Connections;
9
using FAnsi.Discovery;
10
using FAnsi.Discovery.QuerySyntax;
11
using FAnsi.Discovery.TableCreation;
12
using Rdmp.Core.Curation.Data;
13
using Rdmp.Core.DataExport.Data;
14
using Rdmp.Core.DataFlowPipeline;
15
using Rdmp.Core.DataFlowPipeline.Requirements;
16
using Rdmp.Core.DataLoad.Triggers;
17
using Rdmp.Core.DataLoad.Triggers.Implementations;
18
using Rdmp.Core.Logging;
19
using Rdmp.Core.Logging.Listeners;
20
using Rdmp.Core.Repositories.Construction;
21
using Rdmp.Core.ReusableLibraryCode;
22
using Rdmp.Core.ReusableLibraryCode.Checks;
23
using Rdmp.Core.ReusableLibraryCode.DataAccess;
24
using Rdmp.Core.ReusableLibraryCode.Progress;
25
using System;
26
using System.Collections.Generic;
27
using System.Data;
28
using System.Diagnostics;
29
using System.Globalization;
30
using System.Linq;
31
using System.Threading.Tasks;
32
using TypeGuesser;
33

34
namespace Rdmp.Core.DataLoad.Engine.Pipeline.Destinations;
35

36
/// <summary>
37
/// Pipeline component (destination) which commits the DataTable(s) (in batches) to the DiscoveredDatabase (PreInitialize argument).  Supports cross platform
38
/// targets (MySql, Sql Server etc.).  Normally the SQL Data Types and column names will be computed from the DataTable and a table will be created with the
39
/// name of the DataTable being processed.  If a matching table already exists you can choose to load it anyway, in which case a basic bulk insert will take
40
/// place.
41
/// </summary>
42
public class DataTableUploadDestination : IPluginDataFlowComponent<DataTable>, IDataFlowDestination<DataTable>,
43
    IPipelineRequirement<DiscoveredDatabase>
44
{
45
    public const string LoggingServer_Description =
46
        "The logging server to log the upload to (leave blank to not bother auditing)";
47

48
    public const string AllowResizingColumnsAtUploadTime_Description =
49
        "If the target table being loaded has columns that are too small the destination will attempt to resize them";
50

51
    public const string AllowLoadingPopulatedTables_Description =
52
        "Normally when DataTableUploadDestination encounters a table that already contains records it will abandon the insertion attempt.  Set this to true to instead continue with the load.";
53

54
    public const string AlterTimeout_Description =
55
        "Timeout to perform all ALTER TABLE operations (column resize and PK creation)";
56

57
    [DemandsInitialization(LoggingServer_Description)]
58
    public ExternalDatabaseServer LoggingServer { get; set; }
200✔
59

60
    [DemandsInitialization(AllowResizingColumnsAtUploadTime_Description, DefaultValue = true)]
61
    public bool AllowResizingColumnsAtUploadTime { get; set; }
363✔
62

63
    [DemandsInitialization(AllowLoadingPopulatedTables_Description, DefaultValue = false)]
64
    public bool AllowLoadingPopulatedTables { get; set; }
47✔
65

66
    [DemandsInitialization(AlterTimeout_Description, DefaultValue = 300)]
67
    public int AlterTimeout { get; set; }
80✔
68

69
    [DemandsInitialization("Optional - Change system behaviour when a new table is being created by the component",
70
        TypeOf = typeof(IDatabaseColumnRequestAdjuster))]
71
    public Type Adjuster { get; set; }
181✔
72

73
    public bool AppendDataIfTableExists { get; set; }
196✔
74

75
    public bool IncludeTimeStamp { get; set; }
347✔
76

77
    private CultureInfo _culture;
78

79
    [DemandsInitialization("The culture to use for uploading (determines date format etc)")]
80
    public CultureInfo Culture
81
    {
82
        get => _culture ?? CultureInfo.CurrentCulture;
133✔
83
        set => _culture = value;
11✔
84
    }
85

86
    public string TargetTableName { get; private set; }
1,167✔
87

88
    /// <summary>
89
    /// True if a new table was created or re-created by the execution of this destination.  False if
90
    /// the table already existed e.g. data was simply added
91
    /// </summary>
92
    public bool CreatedTable { get; private set; }
355✔
93
    public bool UseTrigger { get; set; } = false;
355✔
94

95
    public bool IndexTables { get; set; } = false;
190✔
96
    public string IndexTableName { get; set; }
40✔
97
    public List<String> UserDefinedIndexes { get; set; } = new();
170✔
98

99
    private IBulkCopy _bulkcopy;
100
    private int _affectedRows;
101

102
    private Stopwatch swTimeSpentWriting = new();
135✔
103
    private Stopwatch swMeasuringStrings = new();
135✔
104

105
    private DiscoveredServer _loggingDatabaseSettings;
106

107
    private DiscoveredServer _server;
108
    private DiscoveredDatabase _database;
109

110
    private DataLoadInfo _dataLoadInfo;
111

112
    private IManagedConnection _managedConnection;
113
    private ToLoggingDatabaseDataLoadEventListener _loggingDatabaseListener;
114

115
    public List<DatabaseColumnRequest> ExplicitTypes { get; set; }
1,030✔
116

117
    private bool _firstTime = true;
135✔
118
    private HashSet<string> _primaryKey = new(StringComparer.CurrentCultureIgnoreCase);
135✔
119
    private DiscoveredTable _discoveredTable;
120
    private readonly string _extractionTimeStamp = "extraction_timestamp";
135✔
121

122
    private readonly IExternalCohortTable _externalCohortTable;
123

124
    //All column values sent to server so far
125
    private Dictionary<string, Guesser> _dataTypeDictionary;
126

127
    /// <summary>
128
    /// Optional function called when a name is needed for the table being uploaded (this overrides
129
    /// upstream components' naming of tables - e.g. from file names).
130
    /// </summary>
131
    public Func<string> TableNamerDelegate { get; set; }
135✔
132

133
    public DataTableUploadDestination()
118✔
134
    {
135
        ExplicitTypes = new List<DatabaseColumnRequest>();
118✔
136
    }
118✔
137

138
    public DataTableUploadDestination(IExternalCohortTable externalCohortTable)
17✔
139
    {
140
        ExplicitTypes = new List<DatabaseColumnRequest>();
17✔
141
        _externalCohortTable = externalCohortTable;
17✔
142
    }
17✔
143

144
    private static object[] FilterOutItemAtIndex(object[] itemArray, int[] indexes)
145
    {
146
        if (indexes.Length == 0) return itemArray;
12!
147
        return itemArray.Where((source, idx) => !indexes.Contains(idx)).ToArray();
267✔
148
    }
149

150
    public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener listener,
151
        GracefulCancellationToken cancellationToken)
152
    {
153
        if (toProcess == null)
166!
154
            return null;
×
155

156
        var rowsToModify = new List<DataRow>();
166✔
157
        var pkColumns = toProcess.PrimaryKey;
166✔
158
        RemoveInvalidCharactersInSchema(toProcess);
166✔
159

160
        IDatabaseColumnRequestAdjuster adjuster = null;
166✔
161
        if (Adjuster != null) adjuster = (IDatabaseColumnRequestAdjuster)ObjectConstructor.Construct(Adjuster);
168✔
162

163
        //work out the table name for the table we are going to create
164
        if (TargetTableName == null)
166✔
165
        {
166
            if (TableNamerDelegate != null)
135!
167
            {
168
                TargetTableName = TableNamerDelegate();
×
169
                if (string.IsNullOrWhiteSpace(TargetTableName))
×
170
                    throw new Exception("No table name specified (TableNamerDelegate returned null)");
×
171
            }
172
            else if (string.IsNullOrWhiteSpace(toProcess.TableName))
135!
173
            {
174
                throw new Exception(
×
175
                    "Chunk did not have a TableName, did not know what to call the newly created table");
×
176
            }
177
            else
178
            {
179
                TargetTableName = QuerySyntaxHelper.MakeHeaderNameSensible(toProcess.TableName);
135✔
180
            }
181
        }
182

183
        ClearPrimaryKeyFromDataTableAndExplicitWriteTypes(toProcess);
166✔
184

185
        StartAuditIfExists(TargetTableName);
166✔
186

187
        if (IncludeTimeStamp)
166!
188
        {
189
            AddTimeStampToExtractionData(toProcess);
×
190
        }
191

192
        if (_loggingDatabaseListener != null)
166!
193
            listener = new ForkDataLoadEventListener(listener, _loggingDatabaseListener);
×
194

195
        EnsureTableHasDataInIt(toProcess);
166✔
196

197
        CreatedTable = false;
164✔
198

199
        if (_firstTime)
164✔
200
        {
201
            var tableAlreadyExistsButEmpty = false;
133✔
202

203
            if (!_database.Exists())
133!
204
                throw new Exception($"Database {_database} does not exist");
×
205

206
            _discoveredTable = _database.ExpectTable(TargetTableName);
133✔
207

208
            //table already exists
209
            if (_discoveredTable.Exists())
133✔
210
            {
211
                tableAlreadyExistsButEmpty = true;
33✔
212

213
                if (!AllowLoadingPopulatedTables)
33✔
214
                    if (_discoveredTable.IsEmpty())
30✔
215
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
21✔
216
                            $"Found table {TargetTableName} already, normally this would forbid you from loading it (data duplication / no primary key etc) but it is empty so we are happy to load it, it will not be created"));
21✔
217
                    else if (!AppendDataIfTableExists)
9!
218
                        throw new Exception(
×
219
                            $"There is already a table called {TargetTableName} at the destination {_database}");
×
220

221
                if (AllowResizingColumnsAtUploadTime)
33✔
222
                    _dataTypeDictionary = _discoveredTable.DiscoverColumns().ToDictionary(k => k.GetRuntimeName(),
345✔
223
                        v => v.GetGuesser(), StringComparer.CurrentCultureIgnoreCase);
345✔
224
            }
225
            else
226
            {
227
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
100✔
228
                    $"Determined that the table name {TargetTableName} is unique at destination {_database}"));
100✔
229
            }
230

231
            //create connection to destination
232
            if (!tableAlreadyExistsButEmpty)
133✔
233
            {
234
                CreatedTable = true;
100✔
235

236
                if (AllowResizingColumnsAtUploadTime)
100✔
237
                    _database.CreateTable(out _dataTypeDictionary, TargetTableName, toProcess, ExplicitTypes.ToArray(),
53✔
238
                        true, adjuster);
53✔
239
                else
240
                    _database.CreateTable(TargetTableName, toProcess, ExplicitTypes.ToArray(), true, adjuster);
47✔
241

242
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
100✔
243
                    $"Created table {TargetTableName} successfully."));
100✔
244
            }
245

246
            _managedConnection = _server.BeginNewTransactedConnection();
133✔
247
            _bulkcopy = _discoveredTable.BeginBulkInsert(Culture, _managedConnection.ManagedTransaction);
133✔
248
            _firstTime = false;
133✔
249
        }
250

251
        if (IncludeTimeStamp && _discoveredTable.DiscoverColumns().All(c => c.GetRuntimeName() != _extractionTimeStamp))
164!
252
        {
253
            _discoveredTable.AddColumn(_extractionTimeStamp, new DatabaseTypeRequest(typeof(DateTime)), true, 30000);
×
254
        }
255

256
        if (IndexTables)
164✔
257
        {
258
            var indexes = UserDefinedIndexes.Count != 0 ? UserDefinedIndexes : pkColumns.Select(c => c.ColumnName);
14✔
259
            try
260
            {
261
                _discoveredTable.CreateIndex(IndexTableName, _discoveredTable.DiscoverColumns().Where(c => indexes.Contains(c.GetRuntimeName())).ToArray());
43✔
262
            }
7✔
263
            catch (Exception e)
7✔
264
            {
265
                //We only warn about not creating the index, as it's not critical
266
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, e.Message));
7✔
267
            }
7✔
268
        }
269
        if (UseTrigger && pkColumns.Length > 0)
164✔
270
        {
271
            if (listener.GetType() == typeof(ForkDataLoadEventListener)) //need to add special fields to the datatable if we are logging to a database
13✔
272
            {
273
                var job = (ForkDataLoadEventListener)listener;
6✔
274
                var listeners = job.GetToLoggingDatabaseDataLoadEventListenersIfany();
6✔
275
                foreach (var dleListener in listeners)
24✔
276
                {
277
                    IDataLoadInfo dataLoadInfo = dleListener.DataLoadInfo;
6✔
278
                    DataColumn newColumn = new(SpecialFieldNames.DataLoadRunID, typeof(int))
6✔
279
                    {
6✔
280
                        DefaultValue = dataLoadInfo.ID
6✔
281
                    };
6✔
282
                    try
283
                    {
284
                        _discoveredTable.DiscoverColumn(SpecialFieldNames.DataLoadRunID);
6✔
285
                    }
6✔
286
                    catch (Exception)
×
287
                    {
288
                        _discoveredTable.AddColumn(SpecialFieldNames.DataLoadRunID, new DatabaseTypeRequest(typeof(int)), true, 30000);
×
289

290
                    }
×
291
                    if (!toProcess.Columns.Contains(SpecialFieldNames.DataLoadRunID))
6!
292
                        toProcess.Columns.Add(newColumn);
×
293
                    foreach (DataRow dr in toProcess.Rows)
26✔
294
                        dr[SpecialFieldNames.DataLoadRunID] = dataLoadInfo.ID;
7✔
295

296
                }
297
            }
298
        }
299

300
        try
301
        {
302
            if (AllowResizingColumnsAtUploadTime && !CreatedTable)
164✔
303
                ResizeColumnsIfRequired(toProcess, listener);
38✔
304

305
            swTimeSpentWriting.Start();
164✔
306
            if (AppendDataIfTableExists && pkColumns.Length > 0) //assumes columns are the same
164✔
307
            {
308
                //drop any pk clashes
309
                var existingData = _discoveredTable.GetDataTable();
24✔
310
                var rowsToDelete = new List<DataRow>();
24✔
311
                var releaseIdentifier = _externalCohortTable is not null ? _externalCohortTable.ReleaseIdentifierField.Split('.').Last()[1..^1] : "ReleaseId";
24✔
312
                int[] toProcessIgnoreColumns = [toProcess.Columns.IndexOf(SpecialFieldNames.DataLoadRunID), toProcess.Columns.IndexOf(releaseIdentifier)];
24✔
313
                int[] existingDataIgnoreColumns = [existingData.Columns.IndexOf(SpecialFieldNames.DataLoadRunID), existingData.Columns.IndexOf(releaseIdentifier), existingData.Columns.IndexOf(SpecialFieldNames.ValidFrom)];
24✔
314
                foreach (DataRow row in toProcess.Rows)
104✔
315
                {
316

317
                    foreach (DataColumn pkCol in pkColumns)
115✔
318
                    {
319
                        bool clash = false;
31✔
320
                        if (_externalCohortTable is not null && pkCol.ColumnName == _externalCohortTable.ReleaseIdentifierField.Split('.').Last()[1..^1])
31✔
321
                        {
322
                            // If it's a cohort release identifier
323
                            // look up the original value and check we've not already extracted the same value under a different release ID
324
                            var privateIdentifierField = _externalCohortTable.PrivateIdentifierField.Split('.').Last()[1..^1];//remove the "[]" from the identifier field
18✔
325
                            var releaseIdentifierField = _externalCohortTable.ReleaseIdentifierField.Split('.').Last()[1..^1];//remove the "[]" from the identifier field
18✔
326
                            var cohortTable = _externalCohortTable.DiscoverCohortTable();
18✔
327
                            using var lookupDT = cohortTable.GetDataTable();
18✔
328
                            var releaseIdIndex = lookupDT.Columns.IndexOf(releaseIdentifierField);
18✔
329
                            var privateIdIndex = lookupDT.Columns.IndexOf(privateIdentifierField);
18✔
330
                            var foundRow = lookupDT.Rows.Cast<DataRow>().FirstOrDefault(r => r.ItemArray[releaseIdIndex].ToString() == row[pkCol.ColumnName].ToString());
44✔
331
                            if (foundRow is not null)
18✔
332
                            {
333
                                var originalValue = foundRow.ItemArray[privateIdIndex];
18✔
334
                                var existingIDsforReleaseID = lookupDT.Rows.Cast<DataRow>().Where(r => r.ItemArray[privateIdIndex].ToString() == originalValue.ToString()).Select(s => s.ItemArray[releaseIdIndex].ToString());
52✔
335
                                clash = existingData.AsEnumerable().Any(r => existingIDsforReleaseID.Contains(r[pkCol.ColumnName].ToString()));
29✔
336
                            }
337
                        }
338
                        else
339
                        {
340
                            var val = row[pkCol.ColumnName];
13✔
341
                            clash = existingData.AsEnumerable().Any(r => r[pkCol.ColumnName].ToString() == val.ToString());
20✔
342

343
                        }
344
                        if (clash && UseTrigger)
31✔
345
                        {
346
                            if (existingData.AsEnumerable().Any(r => FilterOutItemAtIndex(r.ItemArray, existingDataIgnoreColumns).ToList().SequenceEqual(FilterOutItemAtIndex(row.ItemArray, toProcessIgnoreColumns).ToList()))) //do we have to worry about special field? what if the load ids are different?
12✔
347
                            {
348
                                //the row is exactly the same, so there is no clash
349
                                clash = false;
3✔
350
                                rowsToDelete.Add(row);
3✔
351
                            }
352
                            else //row needs updating, but only if we're tracking history
353
                            {
354
                                rowsToModify.Add(row);//need to know what releaseId to replace
3✔
355
                                break;
3✔
356
                            }
357
                        }
358
                    }
359
                }
360
                foreach (DataRow row in rowsToDelete.Distinct())
54✔
361
                    toProcess.Rows.Remove(row);
3✔
362

363
            }
364

365
            if (rowsToModify.Any())
164✔
366
            {
367
                var tmpDt = toProcess.Clone();
3✔
368
                foreach (var row in rowsToModify)
12✔
369
                {
370
                    tmpDt.ImportRow(row);
3✔
371
                }
372
                var tblName = $"tmpTable_{DateTime.Now.Ticks / TimeSpan.TicksPerMillisecond}";
3✔
373
                var tmpTable = _discoveredTable.Database.CreateTable(tblName, tmpDt);
3✔
374

375
                var customLines = new List<CustomLine>() { };
3✔
376
                foreach (var col in tmpDt.Columns.Cast<DataColumn>())
92✔
377
                {
378
                    if (!pkColumns.Select(c => c.ColumnName).Contains(col.ColumnName))
86✔
379
                    {
380
                        customLines.Add(new CustomLine($"t1.{col.ColumnName} = t2.{col.ColumnName}", QueryComponent.SET));
40✔
381
                    }
382
                    else
383
                    {
384
                        customLines.Add(new CustomLine($"t1.{col.ColumnName} = t2.{col.ColumnName}", QueryComponent.JoinInfoJoin));
3✔
385
                        customLines.Add(new CustomLine($"t1.{col.ColumnName} = t2.{col.ColumnName}", QueryComponent.WHERE));
3✔
386
                    }
387
                }
388
                var existingColumns = _discoveredTable.DiscoverColumns().Select(c => c.GetRuntimeName()).Where(c => c != SpecialFieldNames.DataLoadRunID && c != SpecialFieldNames.ValidFrom);
99✔
389
                var columnsThatPreviouslyExisted = existingColumns.Where(ec => !toProcess.Columns.Cast<DataColumn>().ToList().Select(c => c.ColumnName).Contains(ec));
792✔
390
                foreach (var col in columnsThatPreviouslyExisted)
6!
391
                {
                {
392
                    customLines.Add(new CustomLine($"t1.{col} = t2.{col}", QueryComponent.SET));
×
393

394
                }
395

396
                var str = _discoveredTable.Database.Server.GetQuerySyntaxHelper().UpdateHelper.BuildUpdate(_discoveredTable, tmpTable, customLines);
3✔
397
                using (var connection = _discoveredTable.Database.Server.GetConnection())
3✔
398
                {
399
                    connection.Open();
3✔
400
                    var cmd = _discoveredTable.Database.Server.GetCommand(str, connection);
3✔
401
                    cmd.ExecuteNonQuery();
3✔
402
                    tmpTable.Drop();
3✔
403
                }
3✔
404
            }
405

406
            foreach (DataRow row in rowsToModify.Distinct())
334✔
407
            {
408
                toProcess.Rows.Remove(row);
3✔
409
            }
410
            if (toProcess.Rows.Count == 0 && !rowsToModify.Any()) return null;
166✔
411
            if (toProcess.Rows.Count > 0)
162✔
412
            {
413
                _affectedRows += _bulkcopy.Upload(toProcess);
159✔
414
            }
415

416
            swTimeSpentWriting.Stop();
139✔
417
            listener.OnProgress(this,
139✔
418
                new ProgressEventArgs($"Uploading to {TargetTableName}",
139✔
419
                    new ProgressMeasurement(_affectedRows, ProgressType.Records), swTimeSpentWriting.Elapsed));
139✔
420
        }
139✔
421
        catch (Exception e)
23✔
422
        {
423
            _managedConnection.ManagedTransaction.AbandonAndCloseConnection();
23✔
424

425
            if (LoggingServer != null)
23!
426
                _dataLoadInfo.LogFatalError(GetType().Name, ExceptionHelper.ExceptionToListOfInnerMessages(e, true));
×
427

428
            throw new Exception($"Failed to write rows (in transaction) to table {TargetTableName}", e);
23✔
429
        }
430

431

432
        _dataLoadInfo?.CloseAndMarkComplete();
139!
433
        return null;
139✔
434
    }
2✔
435

436

437
    private static void RemoveInvalidCharactersInSchema(DataTable toProcess)
438
    {
439
        var invalidSymbols = new[] { '.' };
166✔
440

441
        if (!string.IsNullOrWhiteSpace(toProcess.TableName) && invalidSymbols.Any(c => toProcess.TableName.Contains(c)))
329!
442
            foreach (var symbol in invalidSymbols)
×
443
                toProcess.TableName = toProcess.TableName.Replace(symbol.ToString(), "");
×
444

445
        foreach (DataColumn col in toProcess.Columns)
2,766✔
446
            if (!string.IsNullOrWhiteSpace(col.ColumnName) && invalidSymbols.Any(c => col.ColumnName.Contains(c)))
2,434✔
447
                foreach (var symbol in invalidSymbols)
8✔
448
                    col.ColumnName = col.ColumnName.Replace(symbol.ToString(), "");
2✔
449
    }
166✔
450

451

452
    /// <summary>
453
    /// Clears the primary key status of the DataTable / <see cref="ExplicitTypes"/>.  These are recorded in <see cref="_primaryKey"/> and applied at Dispose time
454
    /// so that the primary key in the destination database table does not interfere with ALTER statements (see <see cref="ResizeColumnsIfRequired"/>)
455
    /// </summary>
456
    /// <param name="toProcess"></param>
457
    private void ClearPrimaryKeyFromDataTableAndExplicitWriteTypes(DataTable toProcess)
458
    {
459
        //handle primary key status by removing it until the Dispose step
460
        foreach (var pkCol in toProcess.PrimaryKey.Select(dc => dc.ColumnName))
443✔
461
            _primaryKey.Add(pkCol);
37✔
462

463
        toProcess.PrimaryKey = Array.Empty<DataColumn>();
166✔
464

465
        //also get rid of any ExplicitTypes primary keys
466
        foreach (var dcr in ExplicitTypes.Where(dcr => dcr.IsPrimaryKey))
995✔
467
        {
468
            dcr.IsPrimaryKey = false;
16✔
469
            _primaryKey.Add(dcr.ColumnName);
16✔
470
        }
471
    }
166✔
472

473

474
    private void AddTimeStampToExtractionData(DataTable toProcess)
475
    {
476
        var timeStamp = DateTime.Now;
×
477
        toProcess.Columns.Add(_extractionTimeStamp);
×
478
        foreach (DataRow row in toProcess.Rows)
×
479
        {
480
            row[_extractionTimeStamp] = timeStamp;
×
481
        }
482
    }
×
483

484
    private static void EnsureTableHasDataInIt(DataTable toProcess)
485
    {
486
        if (toProcess.Columns.Count == 0)
166✔
487
            throw new Exception($"DataTable '{toProcess}' had no Columns!");
1✔
488

489
        if (toProcess.Rows.Count == 0)
165✔
490
            throw new Exception($"DataTable '{toProcess}' had no Rows!");
1✔
491
    }
164✔
492

493
    private void ResizeColumnsIfRequired(DataTable toProcess, IDataLoadEventListener listener)
494
    {
495
        swMeasuringStrings.Start();
38✔
496

497
        var tbl = _database.ExpectTable(TargetTableName);
38✔
498
        var typeTranslater = tbl.GetQuerySyntaxHelper().TypeTranslater;
38✔
499

500
        //Get the current estimates from the datatype computer
501
        var oldTypes = _dataTypeDictionary.ToDictionary(k => k.Key,
409✔
502
            v => typeTranslater.GetSQLDBTypeForCSharpType(v.Value.Guess), StringComparer.CurrentCultureIgnoreCase);
409✔
503

504
        //columns present in both the destination table and the DataTable being processed
505
        var sharedColumns = new List<string>();
38✔
506

507
        //for each destination column
508
        foreach (var col in _dataTypeDictionary.Keys)
818✔
509
            //if it appears in the toProcess DataTable
510
            if (toProcess.Columns.Contains(col))
371✔
511
                sharedColumns.Add(col); //it is a shared column
366✔
512

513
        //for each shared column adjust the corresponding computer for all rows
514
        Parallel.ForEach(sharedColumns, col =>
38✔
515
        {
38✔
516
            var guesser = _dataTypeDictionary[col];
366✔
517
            foreach (DataRow row in toProcess.Rows)
1,808✔
518
                guesser.AdjustToCompensateForValue(row[col]);
538✔
519
        });
404✔
520

521
        //see if any have changed
522
        foreach (DataColumn column in toProcess.Columns)
808✔
523
        {
524
            if (column.ColumnName == _extractionTimeStamp && IncludeTimeStamp)
366!
525
            {
526
                continue; //skip internally generated columns
527
            }
528
            //get what is required for the current batch and the current type that is configured in the live table
529
            oldTypes.TryGetValue(column.ColumnName, out var oldSqlType);
366✔
530
            _dataTypeDictionary.TryGetValue(column.ColumnName, out var knownType);
366✔
531

532
            var newSqlType = knownType is not null ? typeTranslater.GetSQLDBTypeForCSharpType(knownType.Guess) : null;
366!
533

534
            var changesMade = false;
366✔
535

536
            //if the SQL data type has degraded e.g. varchar(10) to varchar(50) or datetime to varchar(20)
537
            if (oldSqlType != newSqlType)
366✔
538
            {
539
                var col = tbl.DiscoverColumn(column.ColumnName, _managedConnection.ManagedTransaction);
32✔
540

541

542
                if (AbandonAlter(col.DataType.SQLType, newSqlType, out var reason))
32✔
543
                {
544
                    listener.OnNotify(this,
1✔
545
                        new NotifyEventArgs(ProgressEventType.Warning,
1✔
546
                            $"Considered resizing column '{column}' from '{col.DataType.SQLType}' to '{newSqlType}' but decided not to because:{reason}"));
1✔
547
                    continue;
1✔
548
                }
549

550
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
31✔
551
                    $"Resizing column '{column}' from '{col.DataType.SQLType}' to '{newSqlType}'"));
31✔
552

553
                //try changing the Type to the legit type
554
                col.DataType.AlterTypeTo(newSqlType, _managedConnection.ManagedTransaction, AlterTimeout);
31✔
555

556
                changesMade = true;
31✔
557
            }
558

559
            if (changesMade)
365✔
560
                _bulkcopy.InvalidateTableSchema();
31✔
561
        }
562

563
        swMeasuringStrings.Stop();
38✔
564
        listener.OnProgress(this,
38✔
565
            new ProgressEventArgs("Measuring DataType Sizes",
38✔
566
                new ProgressMeasurement(_affectedRows + toProcess.Rows.Count, ProgressType.Records),
38✔
567
                swMeasuringStrings.Elapsed));
38✔
568
    }
38✔
569

570
    /// <summary>
571
    /// Returns true if we should not be trying to do this alter after all
572
    /// </summary>
573
    /// <param name="oldSqlType">The database proprietary type you are considering altering from</param>
574
    /// <param name="newSqlType">The ANSI SQL type you are considering altering to</param>
575
    /// <param name="reason">Null or the reason we are returning true</param>
576
    /// <returns>True if the proposed alter is a bad idea and shouldn't be attempted</returns>
577
    protected virtual bool AbandonAlter(string oldSqlType, string newSqlType, out string reason)
578
    {
579
        var basicallyDecimalAlready = new List<string> { "real", "double", "float", "single" };
32✔
580

581
        var first = basicallyDecimalAlready.FirstOrDefault(c =>
32✔
582
            oldSqlType.Contains(c, StringComparison.InvariantCultureIgnoreCase));
157✔
583

584
        if (first != null && newSqlType.Contains("decimal", StringComparison.InvariantCultureIgnoreCase))
32✔
585
        {
586
            reason = $"Resizing from {first} to decimal is a bad idea and likely to fail";
1✔
587
            return true;
1✔
588
        }
589

590
        reason = null;
31✔
591
        return false;
31✔
592
    }
593

594
    public void Abort(IDataLoadEventListener listener)
595
    {
596
        _managedConnection.ManagedTransaction.AbandonAndCloseConnection();
×
597
    }
×
598

599
    public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
600
    {
601
        try
602
        {
603
            if (_managedConnection != null)
139✔
604
            {
605
                //if there was an error
606
                if (pipelineFailureExceptionIfAny != null)
137✔
607
                {
608
                    _managedConnection.ManagedTransaction.AbandonAndCloseConnection();
21✔
609

610
                    listener.OnNotify(this,
21✔
611
                        new NotifyEventArgs(ProgressEventType.Information, "Transaction rolled back successfully"));
21✔
612

613
                    _bulkcopy?.Dispose();
21!
614
                }
615
                else
616
                {
617
                    _managedConnection.ManagedTransaction.CommitAndCloseConnection();
116✔
618

619
                    _bulkcopy?.Dispose();
116!
620

621
                    listener.OnNotify(this,
116✔
622
                        new NotifyEventArgs(ProgressEventType.Information, "Transaction committed successfully"));
116✔
623
                }
624
            }
625
        }
139✔
626
        catch (Exception e)
×
627
        {
628
            listener.OnNotify(this,
×
629
                new NotifyEventArgs(ProgressEventType.Error,
×
630
                    "Commit failed on transaction (probably there was a previous error?)", e));
×
631
        }
×
632

633
        //if we have a primary key to create
634
        if (pipelineFailureExceptionIfAny == null && _primaryKey?.Any() == true && _discoveredTable?.Exists() == true)
139!
635
        {
636
            //Find the columns in the destination
637
            var allColumns = _discoveredTable.DiscoverColumns();
32✔
638

639
            //if there are not yet any primary keys
640
            if (allColumns.All(c => !c.IsPrimaryKey))
335✔
641
            {
642
                //find the columns the user decorated in their DataTable
643
                var pkColumnsToCreate = allColumns.Where(c =>
21✔
644
                        _primaryKey.Any(pk => pk.Equals(c.GetRuntimeName(), StringComparison.CurrentCultureIgnoreCase)))
622✔
645
                    .ToArray();
21✔
646

647
                //make sure we found all of them
648
                if (pkColumnsToCreate.Length != _primaryKey.Count)
21!
649
                    throw new Exception(
×
650
                        $"Could not find primary key column(s) {string.Join(",", _primaryKey)} in table {_discoveredTable}");
×
651

652
                //create the primary key to match user provided columns
653
                _discoveredTable.CreatePrimaryKey(AlterTimeout, pkColumnsToCreate);
21✔
654
            }
655
        }
656

657
        if (UseTrigger && _discoveredTable?.DiscoverColumns().Any(static col => col.IsPrimaryKey) == true) //can't use triggers without a PK
231!
658
        {
659
            var factory = new TriggerImplementerFactory(_database.Server.DatabaseType);
13✔
660
            var triggerImplementer = factory.Create(_discoveredTable);
13✔
661
            var currentStatus = triggerImplementer.GetTriggerStatus();
13✔
662
            if (currentStatus == TriggerStatus.Missing)
13✔
663
                try
664
                {
665
                    triggerImplementer.CreateTrigger(ThrowImmediatelyCheckNotifier.Quiet);
7✔
666
                }
7✔
667
                catch (Exception e)
×
668
                {
669
                    listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, e.Message));
×
670
                }
×
671
        }
672

673
        EndAuditIfExists();
139✔
674
    }
139✔
675

676
    private void EndAuditIfExists()
677
    {
678
        //user is auditing
679
        _loggingDatabaseListener?.FinalizeTableLoadInfos();
139!
680
    }
×
681

682
    public void Check(ICheckNotifier notifier)
683
    {
684
        if (LoggingServer != null)
×
685
            new LoggingDatabaseChecker(LoggingServer).Check(notifier);
×
686
        else
687
            notifier.OnCheckPerformed(
×
688
                new CheckEventArgs(
×
689
                    "There is no logging server so there will be no audit of this destinations activities",
×
690
                    CheckResult.Success));
×
691
    }
×
692

693
    private void StartAuditIfExists(string tableName)
694
    {
695
        if (LoggingServer != null && _dataLoadInfo == null)
166!
696
        {
697
            _loggingDatabaseSettings = DataAccessPortal.ExpectServer(LoggingServer, DataAccessContext.Logging);
×
698
            var logManager = new LogManager(_loggingDatabaseSettings);
×
699
            logManager.CreateNewLoggingTaskIfNotExists("Internal");
×
700

701
            _dataLoadInfo = (DataLoadInfo)logManager.CreateDataLoadInfo("Internal", GetType().Name,
×
702
                $"Loading table {tableName}", "", false);
×
703
            _loggingDatabaseListener = new ToLoggingDatabaseDataLoadEventListener(logManager, _dataLoadInfo);
×
704
        }
705
    }
166✔
706

707
    public void PreInitialize(DiscoveredDatabase value, IDataLoadEventListener listener)
708
    {
709
        _database = value;
146✔
710
        _server = value.Server;
146✔
711
    }
146✔
712

713
    /// <summary>
714
    /// Declare that the column of name columnName (which might or might not appear in DataTables being uploaded) should always have the associated database type (e.g. varchar(59))
715
    /// The columnName is case insensitive.  Note that if AllowResizingColumnsAtUploadTime is true then these datatypes are only the starting types and might get changed later to
716
    /// accommodate new data.
717
    /// </summary>
718
    /// <param name="columnName"></param>
719
    /// <param name="explicitType"></param>
720
    /// <param name="columnFlags"></param>
721
    /// <returns>The Column Request that has been added to the array</returns>
722
    public DatabaseColumnRequest AddExplicitWriteType(string columnName, string explicitType,
723
        ISupplementalColumnInformation columnFlags = null)
724
    {
725
        DatabaseColumnRequest columnRequest;
726

727
        if (columnFlags == null)
629!
728
        {
729
            columnRequest = new DatabaseColumnRequest(columnName, explicitType, true);
629✔
730
            ExplicitTypes.Add(columnRequest);
629✔
731
            return columnRequest;
629✔
732
        }
733

734
        columnRequest = new DatabaseColumnRequest(columnName, explicitType,
×
735
            !columnFlags.IsPrimaryKey && !columnFlags.IsAutoIncrement)
×
736
        {
×
737
            IsPrimaryKey = columnFlags.IsPrimaryKey,
×
738
            IsAutoIncrement = columnFlags.IsAutoIncrement,
×
739
            Collation = columnFlags.Collation
×
740
        };
×
741

742
        ExplicitTypes.Add(columnRequest);
×
743
        return columnRequest;
×
744
    }
745
}
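
For orientation, here is a minimal, hypothetical usage sketch of the component above, based only on the public members visible in this file (the constructor, AddExplicitWriteType, PreInitialize, ProcessPipelineData and Dispose). The connection details, database name, table contents and the ThrowImmediatelyDataLoadEventListener / GracefulCancellationToken helpers are illustrative assumptions, not taken from this file; in practice the component is normally driven by an RDMP pipeline rather than called directly.

// Hypothetical driver code; server and database names are placeholders.
var db = new DiscoveredServer(
        "Server=localhost;Integrated Security=true;TrustServerCertificate=true",
        DatabaseType.MicrosoftSQLServer)
    .ExpectDatabase("MyDb"); // FAnsi database to upload into (assumed to exist)

using var dt = new DataTable("MyTable");
dt.Columns.Add("Name");
dt.Columns.Add("Age", typeof(int));
dt.Rows.Add("Frank", 31);

var destination = new DataTableUploadDestination
{
    AllowResizingColumnsAtUploadTime = true // widen column types if later batches need it
};
destination.AddExplicitWriteType("Name", "varchar(100)"); // optional explicit SQL type for a column

IDataLoadEventListener listener = ThrowImmediatelyDataLoadEventListener.Quiet; // assumed helper; any IDataLoadEventListener works
destination.PreInitialize(db, listener);

try
{
    destination.ProcessPipelineData(dt, listener, new GracefulCancellationToken());
    destination.Dispose(listener, null); // commits the transaction and applies any primary key
}
catch (Exception ex)
{
    destination.Dispose(listener, ex); // rolls the transaction back
    throw;
}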