• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 13318089130

13 Feb 2025 10:13PM UTC coverage: 57.398% (+0.004%) from 57.394%
13318089130

Pull #2134

github

jas88
Update ChildProviderTests.cs

Fix up TestUpTo method
Pull Request #2134: CodeQL fixups

11346 of 21308 branches covered (53.25%)

Branch coverage included in aggregate %.

104 of 175 new or added lines in 45 files covered. (59.43%)

362 existing lines in 23 files now uncovered.

32218 of 54590 relevant lines covered (59.02%)

17091.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.24
/Rdmp.Core/DataLoad/Engine/Pipeline/Destinations/DataTableUploadDestination.cs
1
// Copyright (c) The University of Dundee 2018-2024
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data;
10
using System.Diagnostics;
11
using System.Globalization;
12
using System.Linq;
13
using System.Threading.Tasks;
14
using FAnsi.Connections;
15
using FAnsi.Discovery;
16
using FAnsi.Discovery.TableCreation;
17
using Rdmp.Core.Curation.Data;
18
using Rdmp.Core.DataExport.Data;
19
using Rdmp.Core.DataFlowPipeline;
20
using Rdmp.Core.DataFlowPipeline.Requirements;
21
using Rdmp.Core.DataLoad.Triggers.Implementations;
22
using Rdmp.Core.DataLoad.Triggers;
23
using Rdmp.Core.Logging;
24
using Rdmp.Core.Logging.Listeners;
25
using Rdmp.Core.Repositories.Construction;
26
using Rdmp.Core.ReusableLibraryCode;
27
using Rdmp.Core.ReusableLibraryCode.Checks;
28
using Rdmp.Core.ReusableLibraryCode.DataAccess;
29
using Rdmp.Core.ReusableLibraryCode.Progress;
30
using TypeGuesser;
31
using FAnsi;
32

33
namespace Rdmp.Core.DataLoad.Engine.Pipeline.Destinations;
34

35
/// <summary>
36
/// Pipeline component (destination) which commits the DataTable(s) (in batches) to the DiscoveredDatabase (PreInitialize argument).  Supports cross platform
37
/// targets (MySql , Sql Server etc).  Normally the SQL Data Types and column names will be computed from the DataTable and a table will be created with the
38
/// name of the DataTable being processed.  If a matching table already exists you can choose to load it anyway in which case a basic bulk insert will take
39
/// place.
40
/// </summary>
41
public class DataTableUploadDestination : IPluginDataFlowComponent<DataTable>, IDataFlowDestination<DataTable>,
42
    IPipelineRequirement<DiscoveredDatabase>
43
{
44
    public const string LoggingServer_Description =
        "The logging server to log the upload to (leave blank to not bother auditing)";

    public const string AllowResizingColumnsAtUploadTime_Description =
        "If the target table being loaded has columns that are too small the destination will attempt to resize them";

    public const string AllowLoadingPopulatedTables_Description =
        "Normally when DataTableUploadDestination encounters a table that already contains records it will abandon the insertion attempt.  Set this to true to instead continue with the load.";

    public const string AlterTimeout_Description =
        "Timeout to perform all ALTER TABLE operations (column resize and PK creation)";

    [DemandsInitialization(LoggingServer_Description)]
    public ExternalDatabaseServer LoggingServer { get; set; }

    [DemandsInitialization(AllowResizingColumnsAtUploadTime_Description, DefaultValue = true)]
    public bool AllowResizingColumnsAtUploadTime { get; set; }

    [DemandsInitialization(AllowLoadingPopulatedTables_Description, DefaultValue = false)]
    public bool AllowLoadingPopulatedTables { get; set; }

    [DemandsInitialization(AlterTimeout_Description, DefaultValue = 300)]
    public int AlterTimeout { get; set; }

    [DemandsInitialization("Optional - Change system behaviour when a new table is being created by the component",
        TypeOf = typeof(IDatabaseColumnRequestAdjuster))]
    public Type Adjuster { get; set; }

    /// <summary>
    /// When true, a destination table that already contains records is loaded into instead of causing the upload to
    /// fail.  If the batch also has a primary key and <see cref="UseTrigger"/> is set, incoming rows whose key already
    /// exists are skipped when identical to an existing record and otherwise applied as UPDATE statements.
    /// </summary>
    public bool AppendDataIfTableExists { get; set; }

    /// <summary>
    /// When true, every batch is stamped with the current time in an <c>extraction_timestamp</c> column, which is also
    /// added to the destination table if it is missing.
    /// </summary>
    public bool IncludeTimeStamp { get; set; }

    private CultureInfo _culture;

    /// <summary>
    /// Culture handed to the bulk insert; falls back to <see cref="CultureInfo.CurrentCulture"/> when not set.
    /// </summary>
    [DemandsInitialization("The culture to use for uploading (determines date format etc)")]
    public CultureInfo Culture
    {
        get => _culture ?? CultureInfo.CurrentCulture;
        set => _culture = value;
    }

    /// <summary>
    /// Name of the destination table.  Resolved on the first batch from <see cref="TableNamerDelegate"/> if set,
    /// otherwise from the (sanitised) name of the incoming DataTable.
    /// </summary>
    public string TargetTableName { get; private set; }

    /// <summary>
    /// True if a new table was created or re-created by the execution of this destination.  False if
    /// the table already existed e.g. data was simply added
    /// </summary>
    public bool CreatedTable { get; private set; }

    /// <summary>
    /// When true (and the batch has a primary key), rows are tagged with the data load run ID whenever the listener
    /// logs to a database, and key clashes found under <see cref="AppendDataIfTableExists"/> are resolved as updates.
    /// </summary>
    public bool UseTrigger { get; set; }

    /// <summary>
    /// When true, an index named <see cref="IndexTableName"/> is created over <see cref="UserDefinedIndexes"/> (or, if
    /// that list is empty, the batch's primary key columns).  Failing to create it only raises a warning.
    /// </summary>
    public bool IndexTables { get; set; }

    /// <summary>
    /// Name given to the index created when <see cref="IndexTables"/> is true.
    /// </summary>
    public string IndexTableName { get; set; }

    /// <summary>
    /// Column names to index when <see cref="IndexTables"/> is true; when empty the primary key columns are used.
    /// </summary>
    public List<string> UserDefinedIndexes { get; set; } = new();

    private IBulkCopy _bulkcopy;
    private int _affectedRows;

    //timers whose elapsed time is reported through listener progress events
    private readonly Stopwatch swTimeSpentWriting = new();
    private readonly Stopwatch swMeasuringStrings = new();

    private DiscoveredServer _loggingDatabaseSettings;

    private DiscoveredServer _server;
    private DiscoveredDatabase _database;

    private DataLoadInfo _dataLoadInfo;

    private IManagedConnection _managedConnection;
    private ToLoggingDatabaseDataLoadEventListener _loggingDatabaseListener;

    /// <summary>
    /// Explicit column definitions passed to table creation.  Any primary key flags on them are cleared before upload
    /// (the key column names are remembered separately).
    /// </summary>
    public List<DatabaseColumnRequest> ExplicitTypes { get; set; }

    private bool _firstTime = true;

    //primary key column names stripped from the DataTable / ExplicitTypes before upload
    private readonly HashSet<string> _primaryKey = new(StringComparer.CurrentCultureIgnoreCase);
    private DiscoveredTable _discoveredTable;
    private const string ExtractionTimeStamp = "extraction_timestamp";

    private readonly IExternalCohortTable _externalCohortTable;

    //All column values sent to server so far
    private Dictionary<string, Guesser> _dataTypeDictionary;

    /// <summary>
    /// Optional function called when a name is needed for the table being uploaded (this overrides
    /// upstream components naming of tables - e.g. from file names).
    /// </summary>
    public Func<string> TableNamerDelegate { get; set; }
131

132
    /// <summary>
    /// Creates a destination with an empty list of <see cref="ExplicitTypes"/>.
    /// </summary>
    public DataTableUploadDestination()
    {
        ExplicitTypes = new List<DatabaseColumnRequest>();
    }

    /// <summary>
    /// Creates a destination that knows about <paramref name="externalCohortTable"/>, so that primary keys which are the
    /// cohort release identifier can be matched against previously extracted release IDs.
    /// </summary>
    /// <param name="externalCohortTable">The cohort table consulted when resolving release identifiers.</param>
    public DataTableUploadDestination(IExternalCohortTable externalCohortTable) : this()
    {
        _externalCohortTable = externalCohortTable;
    }
142

143
    /// <summary>
    /// Returns the values of <paramref name="itemArray"/> whose positions are not listed in <paramref name="indexes"/>,
    /// preserving order.  Indexes that do not exist (e.g. -1) are simply ignored.
    /// </summary>
    /// <param name="itemArray">Source values (typically a <see cref="DataRow.ItemArray"/>).</param>
    /// <param name="indexes">Zero-based positions to drop.</param>
    /// <returns>The original array when there is nothing to drop, otherwise a new filtered array.</returns>
    private static object[] FilterOutItemAtIndex(object[] itemArray, int[] indexes)
    {
        // nothing to exclude - hand back the original array untouched
        if (indexes.Length == 0)
            return itemArray;

        var kept = new List<object>(itemArray.Length);
        for (var position = 0; position < itemArray.Length; position++)
        {
            if (Array.IndexOf(indexes, position) < 0)
                kept.Add(itemArray[position]);
        }

        return kept.ToArray();
    }
148
    /// <summary>
    /// Builds the WHERE-clause predicate that matches the existing destination record for <paramref name="row"/> on the
    /// primary key column <paramref name="pkColumn"/>.
    /// </summary>
    /// <param name="pkColumn">A primary key column of the batch being uploaded.</param>
    /// <param name="row">The incoming row whose key value is matched.</param>
    /// <returns>
    /// <c>pk = 'value'</c>; or, when the column is the cohort release identifier and the value is found in the cohort
    /// table, <c>pk in ('id1','id2',...)</c> listing every release ID that maps to the same private identifier.  All
    /// values are emitted as SQL string literals with embedded single quotes doubled.
    /// </returns>
    private string GetPKValue(DataColumn pkColumn, DataRow row)
    {
        //quote text as a SQL string literal, doubling embedded single quotes so values like O'Brien stay intact
        static string Quote(string text) => $"'{text.Replace("'", "''")}'";

        var pkName = pkColumn.ColumnName;
        var value = row[pkName];
        if (_externalCohortTable is not null)
        {
            //remove the surrounding "[]" from the identifier field
            var releaseIdentifierField = _externalCohortTable.ReleaseIdentifierField.Split('.').Last()[1..^1];
            if (pkName == releaseIdentifierField)
            {
                var privateIdentifierField = _externalCohortTable.PrivateIdentifierField.Split('.').Last()[1..^1];

                //going to have to look up the previous release ID to match
                var cohortTable = _externalCohortTable.DiscoverCohortTable();
                using var lookupDT = cohortTable.GetDataTable();
                var releaseIdIndex = lookupDT.Columns.IndexOf(releaseIdentifierField);
                var privateIdIndex = lookupDT.Columns.IndexOf(privateIdentifierField);
                var releaseId = value.ToString();
                var foundRow = lookupDT.Rows.Cast<DataRow>()
                    .LastOrDefault(r => r[releaseIdIndex].ToString() == releaseId);
                if (foundRow is not null)
                {
                    var originalValue = foundRow[privateIdIndex].ToString();
                    var existingIDsforReleaseID = lookupDT.Rows.Cast<DataRow>()
                        .Where(r => r[privateIdIndex].ToString() == originalValue)
                        .Select(r => r[releaseIdIndex].ToString())
                        .ToList();
                    if (existingIDsforReleaseID.Count != 0)
                    {
                        //we don't know what the current release ID is (there may be ones from multiple cohorts)
                        return $"{pkName} in ({string.Join(',', existingIDsforReleaseID.Select(Quote))})";
                    }
                }
            }
        }

        var literal = Quote($"{value}");
        return $"{pkName} = {literal}";
    }
179

180

181
    /// <summary>
    /// Uploads one batch to the destination table.  On the first batch the target table name is resolved, the table is
    /// created if it does not already exist and a transacted bulk insert is opened; later batches reuse that bulk insert.
    /// Existing columns are widened as required when <see cref="AllowResizingColumnsAtUploadTime"/> is set.  When
    /// <see cref="AppendDataIfTableExists"/> is set and the batch has a primary key, rows clashing with existing records
    /// are dropped (identical) or applied as UPDATE statements (changed) when <see cref="UseTrigger"/> is also set.
    /// </summary>
    /// <param name="toProcess">The batch to upload; null is ignored.</param>
    /// <param name="listener">Receives progress and notification events (forked to the logging database when auditing).</param>
    /// <param name="cancellationToken">Not consulted by this method.</param>
    /// <returns>Always null - this component is the end of the pipeline.</returns>
    /// <exception cref="Exception">Thrown if no table name can be determined, the batch has no columns/rows, the
    /// database does not exist, a populated table may not be loaded, or writing the rows fails (in which case the
    /// transaction is abandoned first).</exception>
    public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener listener,
        GracefulCancellationToken cancellationToken)
    {
        if (toProcess == null)
            return null;

        var rowsToModify = new List<DataRow>();
        //capture the key before ClearPrimaryKeyFromDataTableAndExplicitWriteTypes strips it from the DataTable
        var pkColumns = toProcess.PrimaryKey;
        RemoveInvalidCharactersInSchema(toProcess);

        IDatabaseColumnRequestAdjuster adjuster = null;
        if (Adjuster != null) adjuster = (IDatabaseColumnRequestAdjuster)ObjectConstructor.Construct(Adjuster);

        //work out the table name for the table we are going to create
        if (TargetTableName == null)
        {
            if (TableNamerDelegate != null)
            {
                TargetTableName = TableNamerDelegate();
                if (string.IsNullOrWhiteSpace(TargetTableName))
                    throw new Exception("No table name specified (TableNamerDelegate returned null)");
            }
            else if (string.IsNullOrWhiteSpace(toProcess.TableName))
            {
                throw new Exception(
                    "Chunk did not have a TableName, did not know what to call the newly created table");
            }
            else
            {
                TargetTableName = QuerySyntaxHelper.MakeHeaderNameSensible(toProcess.TableName);
            }
        }

        ClearPrimaryKeyFromDataTableAndExplicitWriteTypes(toProcess);

        StartAuditIfExists(TargetTableName);

        if (IncludeTimeStamp)
        {
            AddTimeStampToExtractionData(toProcess);
        }

        if (_loggingDatabaseListener != null)
            listener = new ForkDataLoadEventListener(listener, _loggingDatabaseListener);

        EnsureTableHasDataInIt(toProcess);

        CreatedTable = false;

        if (_firstTime)
        {
            var tableAlreadyExistsButEmpty = false;

            if (!_database.Exists())
                throw new Exception($"Database {_database} does not exist");

            _discoveredTable = _database.ExpectTable(TargetTableName);

            //table already exists
            if (_discoveredTable.Exists())
            {
                tableAlreadyExistsButEmpty = true;

                if (!AllowLoadingPopulatedTables)
                {
                    if (_discoveredTable.IsEmpty())
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                            $"Found table {TargetTableName} already, normally this would forbid you from loading it (data duplication / no primary key etc) but it is empty so we are happy to load it, it will not be created"));
                    else if (!AppendDataIfTableExists)
                        throw new Exception(
                            $"There is already a table called {TargetTableName} at the destination {_database}");
                }

                if (AllowResizingColumnsAtUploadTime)
                    _dataTypeDictionary = _discoveredTable.DiscoverColumns().ToDictionary(k => k.GetRuntimeName(),
                        v => v.GetGuesser(), StringComparer.CurrentCultureIgnoreCase);
            }
            else
            {
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                    $"Determined that the table name {TargetTableName} is unique at destination {_database}"));
            }

            //create the table if it is not already there
            if (!tableAlreadyExistsButEmpty)
            {
                CreatedTable = true;

                if (AllowResizingColumnsAtUploadTime)
                    _database.CreateTable(out _dataTypeDictionary, TargetTableName, toProcess, ExplicitTypes.ToArray(),
                        true, adjuster);
                else
                    _database.CreateTable(TargetTableName, toProcess, ExplicitTypes.ToArray(), true, adjuster);

                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                    $"Created table {TargetTableName} successfully."));
            }

            //create connection to destination
            _managedConnection = _server.BeginNewTransactedConnection();
            _bulkcopy = _discoveredTable.BeginBulkInsert(Culture, _managedConnection.ManagedTransaction);
            _firstTime = false;
        }

        if (IncludeTimeStamp && _discoveredTable.DiscoverColumns().All(c => c.GetRuntimeName() != ExtractionTimeStamp))
        {
            _discoveredTable.AddColumn(ExtractionTimeStamp, new DatabaseTypeRequest(typeof(DateTime)), true, 30000);
        }

        if (IndexTables)
        {
            var indexes = UserDefinedIndexes.Count != 0 ? UserDefinedIndexes : pkColumns.Select(c => c.ColumnName);
            try
            {
                _discoveredTable.CreateIndex(IndexTableName, _discoveredTable.DiscoverColumns().Where(c => indexes.Contains(c.GetRuntimeName())).ToArray());
            }
            catch (Exception e)
            {
                //We only warn about not creating the index, as it's not critical
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, e.Message));
            }
        }

        //need to add special fields to the datatable if we are logging to a database
        if (UseTrigger && pkColumns.Length > 0 && listener.GetType() == typeof(ForkDataLoadEventListener))
        {
            var job = (ForkDataLoadEventListener)listener;
            foreach (var dleListener in job.GetToLoggingDatabaseDataLoadEventListenersIfany())
            {
                IDataLoadInfo dataLoadInfo = dleListener.DataLoadInfo;
                DataColumn newColumn = new(SpecialFieldNames.DataLoadRunID, typeof(int))
                {
                    DefaultValue = dataLoadInfo.ID
                };
                try
                {
                    _discoveredTable.DiscoverColumn(SpecialFieldNames.DataLoadRunID);
                }
                catch (Exception)
                {
                    //the live table has no run ID column yet
                    _discoveredTable.AddColumn(SpecialFieldNames.DataLoadRunID, new DatabaseTypeRequest(typeof(int)), true, 30000);
                }

                if (!toProcess.Columns.Contains(SpecialFieldNames.DataLoadRunID))
                    toProcess.Columns.Add(newColumn);
                foreach (DataRow dr in toProcess.Rows)
                    dr[SpecialFieldNames.DataLoadRunID] = dataLoadInfo.ID;
            }
        }

        try
        {
            if (AllowResizingColumnsAtUploadTime && !CreatedTable)
                ResizeColumnsIfRequired(toProcess, listener);

            swTimeSpentWriting.Start();
            if (AppendDataIfTableExists && pkColumns.Length > 0) //assumes columns are the same
            {
                //drop any pk clashes
                using var existingData = _discoveredTable.GetDataTable();
                var rowsToDelete = new List<DataRow>();
                var releaseIdentifier = _externalCohortTable is not null ? _externalCohortTable.ReleaseIdentifierField.Split('.').Last()[1..^1] : "ReleaseId";
                int[] toProcessIgnoreColumns = [toProcess.Columns.IndexOf(SpecialFieldNames.DataLoadRunID), toProcess.Columns.IndexOf(releaseIdentifier)];
                int[] existingDataIgnoreColumns = [existingData.Columns.IndexOf(SpecialFieldNames.DataLoadRunID), existingData.Columns.IndexOf(releaseIdentifier), existingData.Columns.IndexOf(SpecialFieldNames.ValidFrom)];

                //The cohort table is only consulted when a key column is the cohort release identifier.  Fetch it once
                //per batch rather than once per row.
                var releaseIdentifierIsPk = _externalCohortTable is not null && pkColumns.Any(pk => pk.ColumnName == releaseIdentifier);
                using var cohortLookup = releaseIdentifierIsPk ? _externalCohortTable.DiscoverCohortTable().GetDataTable() : null;
                //?. short-circuits the whole expression (arguments included) when the lookup was not loaded
                var cohortReleaseIdIndex = cohortLookup?.Columns.IndexOf(releaseIdentifier) ?? -1;
                var cohortPrivateIdIndex = cohortLookup?.Columns.IndexOf(_externalCohortTable.PrivateIdentifierField.Split('.').Last()[1..^1]) ?? -1;

                foreach (DataRow row in toProcess.Rows)
                {
                    foreach (var pkCol in pkColumns)
                    {
                        bool clash;
                        if (_externalCohortTable is not null && pkCol.ColumnName == releaseIdentifier)
                        {
                            // If it's a cohort release identifier, look up the original value and check we've not
                            // already extracted the same value under a different release ID
                            clash = false;
                            var releaseId = row[pkCol.ColumnName].ToString();
                            var foundRow = cohortLookup.Rows.Cast<DataRow>()
                                .FirstOrDefault(r => r[cohortReleaseIdIndex].ToString() == releaseId);
                            if (foundRow is not null)
                            {
                                var originalValue = foundRow[cohortPrivateIdIndex].ToString();
                                var existingIDsforReleaseID = cohortLookup.Rows.Cast<DataRow>()
                                    .Where(r => r[cohortPrivateIdIndex].ToString() == originalValue)
                                    .Select(r => r[cohortReleaseIdIndex].ToString())
                                    .ToHashSet();
                                clash = existingData.AsEnumerable().Any(r => existingIDsforReleaseID.Contains(r[pkCol.ColumnName].ToString()));
                            }
                        }
                        else
                        {
                            var val = row[pkCol.ColumnName].ToString();
                            clash = existingData.AsEnumerable().Any(r => r[pkCol.ColumnName].ToString() == val);
                        }

                        if (!clash || !UseTrigger)
                            continue;

                        var incomingValues = FilterOutItemAtIndex(row.ItemArray, toProcessIgnoreColumns);
                        if (existingData.AsEnumerable().Any(r => FilterOutItemAtIndex(r.ItemArray, existingDataIgnoreColumns).SequenceEqual(incomingValues))) //do we have to worry about special field? what if the load ids are different?
                        {
                            //the row is the exact same, so there is no clash
                            rowsToDelete.Add(row);
                        }
                        else
                        {
                            //row needs updated, but only if we're tracking history
                            rowsToModify.Add(row);
                            break;
                        }
                    }
                }

                foreach (DataRow row in rowsToDelete.Distinct())
                    toProcess.Rows.Remove(row);
            }

            var distinctRowsToModify = rowsToModify.Distinct().ToList();
            foreach (var row in distinctRowsToModify)
            {
                //replace the existing record with the incoming values
                var args = new DatabaseOperationArgs();
                var columns = toProcess.Columns.Cast<DataColumn>().Select(c => c.ColumnName).ToList();

                //columns that exist in the live table but not in this batch are nulled out (key and special fields excepted)
                var pkNames = pkColumns.Select(pk => pk.ColumnName).ToList();
                var columnsThatPreviouslyExisted = _discoveredTable.DiscoverColumns()
                    .Select(c => c.GetRuntimeName())
                    .Where(c => !pkNames.Contains(c) && !columns.Contains(c) && c != SpecialFieldNames.DataLoadRunID && c != SpecialFieldNames.ValidFrom);
                var nullEntries = string.Join(" ,", columnsThatPreviouslyExisted.Select(c => $"{c} = NULL"));
                var nullText = nullEntries.Length > 0 ? $" , {nullEntries}" : "";
                var columnString = string.Join(" , ", columns.Select(col => $"{col} = {ToSqlLiteral(row[col])}"));
                var pkMatch = string.Join(" AND ", pkColumns.Select(pk => GetPKValue(pk, row)));
                var sql = $"update {_discoveredTable.GetFullyQualifiedName()} set {columnString} {nullText} where {pkMatch}";

                //release the connection and command once the statement has run
                using var con = args.GetManagedConnection(_discoveredTable);
                using var cmd = _discoveredTable.GetCommand(sql, con.Connection);
                cmd.ExecuteNonQuery();
            }

            foreach (var row in distinctRowsToModify)
                toProcess.Rows.Remove(row);

            if (toProcess.Rows.Count == 0 && rowsToModify.Count == 0)
            {
                //nothing left to write; stop the clock so idle time between batches is not counted as writing
                swTimeSpentWriting.Stop();
                return null;
            }

            if (toProcess.Rows.Count > 0)
            {
                _affectedRows += _bulkcopy.Upload(toProcess);
            }

            swTimeSpentWriting.Stop();
            listener.OnProgress(this,
                new ProgressEventArgs($"Uploading to {TargetTableName}",
                    new ProgressMeasurement(_affectedRows, ProgressType.Records), swTimeSpentWriting.Elapsed));
        }
        catch (Exception e)
        {
            _managedConnection.ManagedTransaction.AbandonAndCloseConnection();

            if (LoggingServer != null)
                _dataLoadInfo.LogFatalError(GetType().Name, ExceptionHelper.ExceptionToListOfInnerMessages(e, true));

            throw new Exception($"Failed to write rows (in transaction) to table {TargetTableName}", e);
        }

        _dataLoadInfo?.CloseAndMarkComplete();
        return null;

        // Renders a cell value for the UPDATE statement: DBNull becomes NULL, anything else becomes a quoted string
        // literal with embedded single quotes doubled.
        static string ToSqlLiteral(object value)
        {
            if (value is null or DBNull)
                return "NULL";

            var text = $"{value}".Replace("'", "''");
            return $"'{text}'";
        }
    }
450

451

452
    /// <summary>
    /// Strips characters that are not allowed in destination object names (currently '.') from the table name and from
    /// every column name of <paramref name="toProcess"/>.  Names that are blank are left alone.
    /// </summary>
    /// <param name="toProcess">The batch whose schema is sanitised in place.</param>
    private static void RemoveInvalidCharactersInSchema(DataTable toProcess)
    {
        var forbidden = new[] { '.' };

        bool NeedsCleaning(string name) =>
            !string.IsNullOrWhiteSpace(name) && name.IndexOfAny(forbidden) >= 0;

        string Strip(string name)
        {
            foreach (var ch in forbidden)
                name = name.Replace(ch.ToString(), "");
            return name;
        }

        if (NeedsCleaning(toProcess.TableName))
            toProcess.TableName = Strip(toProcess.TableName);

        foreach (DataColumn column in toProcess.Columns)
        {
            if (NeedsCleaning(column.ColumnName))
                column.ColumnName = Strip(column.ColumnName);
        }
    }
465

466

467
    /// <summary>
    /// Removes primary key status from the DataTable and from any <see cref="ExplicitTypes"/>, remembering the key
    /// column names in <see cref="_primaryKey"/> so they can be applied at Dispose time.  This keeps the destination
    /// table's primary key from interfering with ALTER statements (see <see cref="ResizeColumnsIfRequired"/>).
    /// </summary>
    /// <param name="toProcess">The batch whose primary key is cleared in place.</param>
    private void ClearPrimaryKeyFromDataTableAndExplicitWriteTypes(DataTable toProcess)
    {
        //remember the DataTable's key columns, then strip the key until the Dispose step
        foreach (var keyColumn in toProcess.PrimaryKey)
            _primaryKey.Add(keyColumn.ColumnName);

        toProcess.PrimaryKey = Array.Empty<DataColumn>();

        //do the same for any explicitly typed columns flagged as primary key
        foreach (var request in ExplicitTypes)
        {
            if (!request.IsPrimaryKey)
                continue;

            request.IsPrimaryKey = false;
            _primaryKey.Add(request.ColumnName);
        }
    }
487

488

489
    /// <summary>
    /// Adds the extraction timestamp column to <paramref name="toProcess"/> (unless it is already present) and stamps
    /// every row with the same local time.
    /// </summary>
    /// <param name="toProcess">The batch to stamp in place.</param>
    private void AddTimeStampToExtractionData(DataTable toProcess)
    {
        var timeStamp = DateTime.Now;

        // Typed as DateTime so the value is not converted to a culture-specific string here and then re-parsed by the
        // bulk insert using a (possibly different) upload Culture.  The Contains guard avoids DuplicateNameException
        // if the batch already carries the column.
        if (!toProcess.Columns.Contains(ExtractionTimeStamp))
            toProcess.Columns.Add(ExtractionTimeStamp, typeof(DateTime));

        foreach (DataRow row in toProcess.Rows)
        {
            row[ExtractionTimeStamp] = timeStamp;
        }
    }
498

499
    /// <summary>
    /// Rejects a batch that has no columns or no rows, since there would be nothing to create or upload.
    /// </summary>
    /// <param name="toProcess">The batch to validate.</param>
    /// <exception cref="Exception">Thrown when the batch has no columns, or has columns but no rows.</exception>
    private static void EnsureTableHasDataInIt(DataTable toProcess)
    {
        if (toProcess.Columns.Count == 0)
            throw new Exception($"DataTable '{toProcess}' had no Columns!");

        if (toProcess.Rows.Count != 0)
            return;

        throw new Exception($"DataTable '{toProcess}' had no Rows!");
    }
507

508
    /// <summary>
    /// Widens destination columns whose SQL type no longer fits the values in <paramref name="toProcess"/>.  The
    /// per-column guessers in <see cref="_dataTypeDictionary"/> are fed this batch's values.  Any column whose resulting
    /// SQL type differs from before is altered inside the open transaction, unless <see cref="AbandonAlter"/> vetoes
    /// it, and the bulk copy is told to invalidate its table schema.
    /// </summary>
    /// <param name="toProcess">The batch about to be uploaded.</param>
    /// <param name="listener">Receives a warning per resize (or vetoed resize) and a progress event at the end.</param>
    private void ResizeColumnsIfRequired(DataTable toProcess, IDataLoadEventListener listener)
    {
        swMeasuringStrings.Start();

        var liveTable = _database.ExpectTable(TargetTableName);
        var translater = liveTable.GetQuerySyntaxHelper().TypeTranslater;

        //snapshot the SQL types implied by the guessers before this batch is considered
        var typesBefore = _dataTypeDictionary.ToDictionary(kvp => kvp.Key,
            kvp => translater.GetSQLDBTypeForCSharpType(kvp.Value.Guess), StringComparer.CurrentCultureIgnoreCase);

        //destination columns that also appear in the incoming batch
        var sharedColumns = _dataTypeDictionary.Keys.Where(name => toProcess.Columns.Contains(name)).ToList();

        //let each shared column's guesser see every value in the batch (one column per worker)
        Parallel.ForEach(sharedColumns, name =>
        {
            var guesser = _dataTypeDictionary[name];
            foreach (DataRow row in toProcess.Rows)
                guesser.AdjustToCompensateForValue(row[name]);
        });

        foreach (DataColumn column in toProcess.Columns)
        {
            //the extraction timestamp is generated internally so is never resized
            if (IncludeTimeStamp && column.ColumnName == ExtractionTimeStamp)
                continue;

            typesBefore.TryGetValue(column.ColumnName, out var oldSqlType);
            var newSqlType =
                _dataTypeDictionary.TryGetValue(column.ColumnName, out var columnGuesser) && columnGuesser is not null
                    ? translater.GetSQLDBTypeForCSharpType(columnGuesser.Guess)
                    : null;

            //the type has not degraded (e.g. varchar(10) to varchar(50) or datetime to varchar(20)) - nothing to do
            if (oldSqlType == newSqlType)
                continue;

            var liveColumn = liveTable.DiscoverColumn(column.ColumnName, _managedConnection.ManagedTransaction);

            if (AbandonAlter(liveColumn.DataType.SQLType, newSqlType, out var reason))
            {
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Warning,
                        $"Considered resizing column '{column}' from '{liveColumn.DataType.SQLType}' to '{newSqlType}' but decided not to because:{reason}"));
                continue;
            }

            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                $"Resizing column '{column}' from '{liveColumn.DataType.SQLType}' to '{newSqlType}'"));

            //try changing the Type to the legit type
            liveColumn.DataType.AlterTypeTo(newSqlType, _managedConnection.ManagedTransaction, AlterTimeout);

            _bulkcopy.InvalidateTableSchema();
        }

        swMeasuringStrings.Stop();
        listener.OnProgress(this,
            new ProgressEventArgs("Measuring DataType Sizes",
                new ProgressMeasurement(_affectedRows + toProcess.Rows.Count, ProgressType.Records),
                swMeasuringStrings.Elapsed));
    }
584

585
    /// <summary>
    /// Decides whether a proposed column alteration should be skipped.  The only veto applied here is altering a
    /// floating point column (real/double/float/single) to a decimal type.
    /// </summary>
    /// <param name="oldSqlType">The database proprietary type you are considering altering from</param>
    /// <param name="newSqlType">The ANSI SQL type you are considering altering to</param>
    /// <param name="reason">Null, or a description of why the alter was vetoed</param>
    /// <returns>True if the proposed alter is a bad idea and shouldn't be attempted</returns>
    protected virtual bool AbandonAlter(string oldSqlType, string newSqlType, out string reason)
    {
        reason = null;

        //find the first floating point family name mentioned by the current type, if any
        string floatingPointFamily = null;
        foreach (var candidate in new[] { "real", "double", "float", "single" })
        {
            if (oldSqlType.Contains(candidate, StringComparison.InvariantCultureIgnoreCase))
            {
                floatingPointFamily = candidate;
                break;
            }
        }

        if (floatingPointFamily is null ||
            !newSqlType.Contains("decimal", StringComparison.InvariantCultureIgnoreCase))
            return false;

        reason = $"Resizing from {floatingPointFamily} to decimal is a bad idea and likely to fail";
        return true;
    }
608

609
    /// <summary>
    /// Rolls back and closes the destination's transacted connection.  If no connection has been opened yet,
    /// this does nothing instead of throwing.
    /// </summary>
    /// <param name="listener">Not used</param>
    public void Abort(IDataLoadEventListener listener)
    {
        // _managedConnection may still be null here (Dispose guards against the same case)
        _managedConnection?.ManagedTransaction?.AbandonAndCloseConnection();
    }
×
613

614
    /// <summary>
    /// Finishes the upload.  Steps:
    /// <list type="number">
    /// <item>Roll back the transaction if the pipeline failed, otherwise commit it.</item>
    /// <item>Dispose the bulk insert.</item>
    /// <item>If the pipeline succeeded, create the requested primary key when the table has none yet.</item>
    /// <item>If UseTrigger is set and the table exists with a primary key, create a trigger when it is missing.</item>
    /// <item>Finalize any logging audit.</item>
    /// </list>
    /// Errors from the commit/rollback are reported to <paramref name="listener"/> rather than thrown.
    /// </summary>
    /// <param name="listener">Receives information, warning and error notifications</param>
    /// <param name="pipelineFailureExceptionIfAny">The pipeline error, or null if the pipeline succeeded</param>
    /// <exception cref="Exception">If a requested primary key column cannot be found in the destination table</exception>
    public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
    {
        try
        {
            if (_managedConnection != null)
            {
                try
                {
                    //if there was an error
                    if (pipelineFailureExceptionIfAny != null)
                    {
                        _managedConnection.ManagedTransaction.AbandonAndCloseConnection();

                        listener.OnNotify(this,
                            new NotifyEventArgs(ProgressEventType.Information, "Transaction rolled back successfully"));
                    }
                    else
                    {
                        _managedConnection.ManagedTransaction.CommitAndCloseConnection();

                        listener.OnNotify(this,
                            new NotifyEventArgs(ProgressEventType.Information, "Transaction committed successfully"));
                    }
                }
                finally
                {
                    // release the bulk insert even when the commit/rollback throws, otherwise it is leaked
                    _bulkcopy?.Dispose();
                }
            }
        }
        catch (Exception e)
        {
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error,
                    "Commit failed on transaction (probably there was a previous error?)", e));
        }

        try
        {
            //if we have a primary key to create
            if (pipelineFailureExceptionIfAny == null && _primaryKey?.Any() == true &&
                _discoveredTable?.Exists() == true)
            {
                //Find the columns in the destination
                var allColumns = _discoveredTable.DiscoverColumns();

                //if there are not yet any primary keys
                if (allColumns.All(static c => !c.IsPrimaryKey))
                {
                    // column names are identifiers: compare culture-invariantly so e.g. "ID"/"id" still match
                    // when the current culture has special casing rules (Turkish dotless i)
                    var pkColumnsToCreate = allColumns.Where(c =>
                            _primaryKey.Any(pk =>
                                pk.Equals(c.GetRuntimeName(), StringComparison.InvariantCultureIgnoreCase)))
                        .ToArray();

                    //make sure we found all of them
                    if (pkColumnsToCreate.Length != _primaryKey.Count)
                        throw new Exception(
                            $"Could not find primary key column(s) {string.Join(",", _primaryKey)} in table {_discoveredTable}");

                    //create the primary key to match user provided columns
                    _discoveredTable.CreatePrimaryKey(AlterTimeout, pkColumnsToCreate);
                }
            }

            // can't use triggers without a PK; also skip tables that don't exist (e.g. creation was rolled back),
            // since discovering columns of a missing table is not meaningful
            if (UseTrigger && _discoveredTable?.Exists() == true &&
                _discoveredTable.DiscoverColumns().Any(static col => col.IsPrimaryKey))
            {
                var factory = new TriggerImplementerFactory(_database.Server.DatabaseType);
                var triggerImplementer = factory.Create(_discoveredTable);
                var currentStatus = triggerImplementer.GetTriggerStatus();

                if (currentStatus == TriggerStatus.Missing)
                {
                    try
                    {
                        triggerImplementer.CreateTrigger(ThrowImmediatelyCheckNotifier.Quiet);
                    }
                    catch (Exception e)
                    {
                        // trigger creation is best-effort: report and carry on
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, e.Message));
                    }
                }
            }
        }
        finally
        {
            // close out any audit even if primary key creation failed
            EndAuditIfExists();
        }
    }
258✔
690

691
    /// <summary>
    /// Finalizes the table load infos of the logging listener, if one was created (i.e. the user is auditing).
    /// </summary>
    private void EndAuditIfExists()
    {
        if (_loggingDatabaseListener != null)
        {
            _loggingDatabaseListener.FinalizeTableLoadInfos();
        }
    }
×
696

697
    /// <summary>
    /// Checks the logging configuration.  When a LoggingServer is configured it is checked using a
    /// LoggingDatabaseChecker.  Otherwise a Success result is reported, noting that no audit will happen.
    /// </summary>
    /// <param name="notifier">Receives the check results</param>
    public void Check(ICheckNotifier notifier)
    {
        if (LoggingServer == null)
        {
            notifier.OnCheckPerformed(new CheckEventArgs(
                "There is no logging server so there will be no audit of this destinations activities",
                CheckResult.Success));
            return;
        }

        new LoggingDatabaseChecker(LoggingServer).Check(notifier);
    }
×
707

708
    /// <summary>
    /// Starts database logging for this upload.  This only happens when a LoggingServer is configured and
    /// logging has not already been started.  It ensures the "Internal" logging task exists, then opens a new
    /// data load entry describing the load of <paramref name="tableName"/>.
    /// </summary>
    /// <param name="tableName">Table being loaded, used in the data load description</param>
    private void StartAuditIfExists(string tableName)
    {
        // nothing to do without a logging server, or when the audit has already been started
        if (LoggingServer == null || _dataLoadInfo != null)
            return;

        const string loggingTask = "Internal";

        _loggingDatabaseSettings = DataAccessPortal.ExpectServer(LoggingServer, DataAccessContext.Logging);

        var manager = new LogManager(_loggingDatabaseSettings);
        manager.CreateNewLoggingTaskIfNotExists(loggingTask);

        var description = $"Loading table {tableName}";
        _dataLoadInfo = (DataLoadInfo)manager.CreateDataLoadInfo(loggingTask, GetType().Name, description, "", false);

        _loggingDatabaseListener = new ToLoggingDatabaseDataLoadEventListener(manager, _dataLoadInfo);
    }
312✔
721

722
    /// <summary>
    /// Records the database that data will be uploaded into, along with its server.
    /// </summary>
    /// <param name="value">The destination database</param>
    /// <param name="listener">Not used</param>
    public void PreInitialize(DiscoveredDatabase value, IDataLoadEventListener listener)
    {
        _database = value;
        _server = _database.Server;
    }
266✔
727

728
    /// <summary>
    /// Declares that the column <paramref name="columnName"/> should always have the given database type
    /// (e.g. varchar(59)).  The column might or might not appear in the DataTables being uploaded, and the
    /// name is case insensitive.  If AllowResizingColumnsAtUploadTime is true, these types are only starting
    /// types and might be widened later to accommodate new data.
    /// </summary>
    /// <param name="columnName">Name of the column the type applies to</param>
    /// <param name="explicitType">The database type to use for the column</param>
    /// <param name="columnFlags">Optional extra information about the column.  When supplied, its primary key,
    /// auto increment and collation settings are copied across.  The column is then nullable only if it is
    /// neither a primary key nor auto increment.  When omitted, the column is nullable.</param>
    /// <returns>The Column Request that has been added to ExplicitTypes</returns>
    public DatabaseColumnRequest AddExplicitWriteType(string columnName, string explicitType,
        ISupplementalColumnInformation columnFlags = null)
    {
        var request = columnFlags == null
            ? new DatabaseColumnRequest(columnName, explicitType, true)
            : new DatabaseColumnRequest(columnName, explicitType,
                !columnFlags.IsPrimaryKey && !columnFlags.IsAutoIncrement)
            {
                IsPrimaryKey = columnFlags.IsPrimaryKey,
                IsAutoIncrement = columnFlags.IsAutoIncrement,
                Collation = columnFlags.Collation
            };

        ExplicitTypes.Add(request);
        return request;
    }
760
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc