• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 9283297040

29 May 2024 09:04AM UTC coverage: 56.516% (-0.4%) from 56.871%
9283297040

push

github

JFriel
Merge branch 'release/8.2.0' of https://github.com/HicServices/RDMP into bugfix/RDMP-153-improve-plugin-ui

10783 of 20572 branches covered (52.42%)

Branch coverage included in aggregate %.

116 of 435 new or added lines in 16 files covered. (26.67%)

373 existing lines in 24 files now uncovered.

30701 of 52830 relevant lines covered (58.11%)

7669.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.85
/Rdmp.Core/DataLoad/Engine/Pipeline/Destinations/DataTableUploadDestination.cs
1
// Copyright (c) The University of Dundee 2018-2024
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data;
10
using System.Diagnostics;
11
using System.Globalization;
12
using System.Linq;
13
using System.Threading.Tasks;
14
using FAnsi.Connections;
15
using FAnsi.Discovery;
16
using FAnsi.Discovery.TableCreation;
17
using Org.BouncyCastle.Security.Certificates;
18
using Rdmp.Core.Curation.Data;
19
using Rdmp.Core.DataExport.Data;
20
using Rdmp.Core.DataFlowPipeline;
21
using Rdmp.Core.DataFlowPipeline.Requirements;
22
using Rdmp.Core.Logging;
23
using Rdmp.Core.Logging.Listeners;
24
using Rdmp.Core.Repositories.Construction;
25
using Rdmp.Core.ReusableLibraryCode;
26
using Rdmp.Core.ReusableLibraryCode.Checks;
27
using Rdmp.Core.ReusableLibraryCode.DataAccess;
28
using Rdmp.Core.ReusableLibraryCode.Progress;
29
using TypeGuesser;
30

31
namespace Rdmp.Core.DataLoad.Engine.Pipeline.Destinations;
32

33
/// <summary>
34
/// Pipeline component (destination) which commits the DataTable(s) (in batches) to the DiscoveredDatabase (PreInitialize argument).  Supports cross platform
35
/// targets (MySql , Sql Server etc).  Normally the SQL Data Types and column names will be computed from the DataTable and a table will be created with the
36
/// name of the DataTable being processed.  If a matching table already exists you can choose to load it anyway in which case a basic bulk insert will take
37
/// place.
38
/// </summary>
39
public class DataTableUploadDestination : IPluginDataFlowComponent<DataTable>, IDataFlowDestination<DataTable>,
40
    IPipelineRequirement<DiscoveredDatabase>
41
{
42
    public const string LoggingServer_Description =
43
        "The logging server to log the upload to (leave blank to not bother auditing)";
44

45
    public const string AllowResizingColumnsAtUploadTime_Description =
46
        "If the target table being loaded has columns that are too small the destination will attempt to resize them";
47

48
    public const string AllowLoadingPopulatedTables_Description =
49
        "Normally when DataTableUploadDestination encounters a table that already contains records it will abandon the insertion attempt.  Set this to true to instead continue with the load.";
50

51
    public const string AlterTimeout_Description =
52
        "Timeout to perform all ALTER TABLE operations (column resize and PK creation)";
53

54
    [DemandsInitialization(LoggingServer_Description)]
55
    public ExternalDatabaseServer LoggingServer { get; set; }
318✔
56

57
    [DemandsInitialization(AllowResizingColumnsAtUploadTime_Description, DefaultValue = true)]
58
    public bool AllowResizingColumnsAtUploadTime { get; set; }
584✔
59

60
    [DemandsInitialization(AllowLoadingPopulatedTables_Description, DefaultValue = false)]
61
    public bool AllowLoadingPopulatedTables { get; set; }
70✔
62

63
    [DemandsInitialization(AlterTimeout_Description, DefaultValue = 300)]
64
    public int AlterTimeout { get; set; }
116✔
65

66
    [DemandsInitialization("Optional - Change system behaviour when a new table is being created by the component",
67
        TypeOf = typeof(IDatabaseColumnRequestAdjuster))]
68
    public Type Adjuster { get; set; }
288✔
69

70
    public bool AppendDataIfTableExists { get; set; }
292✔
71

72
    public bool IncludeTimeStamp { get; set; }
554✔
73

74
    private CultureInfo _culture;
75

76
    /// <summary>
    /// The culture handed to the bulk insert when uploading.  Falls back to <see cref="CultureInfo.CurrentCulture"/>
    /// whenever no explicit culture has been assigned.
    /// </summary>
    [DemandsInitialization("The culture to use for uploading (determines date format etc)")]
    public CultureInfo Culture
    {
        get { return _culture ?? CultureInfo.CurrentCulture; }
        set { _culture = value; }
    }
82

83
    public string TargetTableName { get; private set; }
1,892✔
84

85
    /// <summary>
86
    /// True if a new table was created or re-created by the execution of this destination.  False if
87
    /// the table already existed e.g. data was simply added
88
    /// </summary>
89
    public bool CreatedTable { get; private set; }
580✔
90

91
    private IBulkCopy _bulkcopy;
92
    private int _affectedRows;
93

94
    private Stopwatch swTimeSpentWriting = new();
218✔
95
    private Stopwatch swMeasuringStrings = new();
218✔
96

97
    private DiscoveredServer _loggingDatabaseSettings;
98

99
    private DiscoveredServer _server;
100
    private DiscoveredDatabase _database;
101

102
    private DataLoadInfo _dataLoadInfo;
103

104
    private IManagedConnection _managedConnection;
105
    private ToLoggingDatabaseDataLoadEventListener _loggingDatabaseListener;
106

107
    public List<DatabaseColumnRequest> ExplicitTypes { get; set; }
1,282✔
108

109
    private bool _firstTime = true;
218✔
110
    private HashSet<string> _primaryKey = new(StringComparer.CurrentCultureIgnoreCase);
218✔
111
    private DiscoveredTable _discoveredTable;
112
    private readonly string _extractionTimeStamp = "extraction_timestamp";
218✔
113

114
    private readonly IExternalCohortTable _externalCohortTable;
115

116
    //All column values sent to server so far
117
    private Dictionary<string, Guesser> _dataTypeDictionary;
118

119
    /// <summary>
120
    /// Optional function called when a name is needed for the table being uploaded (this overrides
121
    /// upstream components naming of tables - e.g. from file names).
122
    /// </summary>
123
    public Func<string> TableNamerDelegate { get; set; }
218✔
124

125
    /// <summary>
    /// Creates a destination with an empty <see cref="ExplicitTypes"/> list and no cohort table.
    /// </summary>
    public DataTableUploadDestination() => ExplicitTypes = new List<DatabaseColumnRequest>();
129

130
    /// <summary>
    /// Creates a destination with an empty <see cref="ExplicitTypes"/> list which remembers
    /// <paramref name="externalCohortTable"/> for use when checking primary key clashes on release identifiers.
    /// </summary>
    /// <param name="externalCohortTable">Cohort table consulted during primary key clash detection</param>
    public DataTableUploadDestination(IExternalCohortTable externalCohortTable) : this()
    {
        _externalCohortTable = externalCohortTable;
    }
135

136
    /// <summary>
    /// Uploads one batch of data to the destination database.  On the first batch the destination table is
    /// located (or created from the batch schema), a transacted connection is opened and a bulk insert is started;
    /// later batches reuse that connection and bulk insert.
    /// </summary>
    /// <param name="toProcess">
    /// The batch to upload.  The instance is modified: invalid characters are stripped from its table/column names,
    /// its primary key is cleared (and remembered for later), a timestamp column is added when
    /// <see cref="IncludeTimeStamp"/> is set and clashing rows are removed when <see cref="AppendDataIfTableExists"/>
    /// is set and the batch had a primary key.
    /// </param>
    /// <param name="listener">Receives progress and notification messages (forked to the logging database listener when auditing)</param>
    /// <param name="cancellationToken">Not consulted by this method</param>
    /// <returns>Always null</returns>
    /// <exception cref="Exception">
    /// Thrown when no table name can be determined, the batch has no columns/rows, the database does not exist,
    /// a populated table already exists and loading it is not permitted, or the upload itself fails (in which case
    /// the transaction is abandoned and the original exception is the inner exception).
    /// </exception>
    public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener listener,
        GracefulCancellationToken cancellationToken)
    {
        if (toProcess == null)
            return null;

        //grab the primary key columns now because the key is cleared from the DataTable further down
        var pkColumns = toProcess.PrimaryKey;
        RemoveInvalidCharactersInSchema(toProcess);

        IDatabaseColumnRequestAdjuster adjuster = null;
        if (Adjuster != null) adjuster = (IDatabaseColumnRequestAdjuster)ObjectConstructor.Construct(Adjuster);

        //work out the table name for the table we are going to create
        if (TargetTableName == null)
        {
            if (TableNamerDelegate != null)
            {
                TargetTableName = TableNamerDelegate();
                if (string.IsNullOrWhiteSpace(TargetTableName))
                    throw new Exception("No table name specified (TableNamerDelegate returned null)");
            }
            else if (string.IsNullOrWhiteSpace(toProcess.TableName))
            {
                throw new Exception(
                    "Chunk did not have a TableName, did not know what to call the newly created table");
            }
            else
            {
                TargetTableName = QuerySyntaxHelper.MakeHeaderNameSensible(toProcess.TableName);
            }
        }

        ClearPrimaryKeyFromDataTableAndExplicitWriteTypes(toProcess);

        StartAuditIfExists(TargetTableName);

        if (IncludeTimeStamp)
            AddTimeStampToExtractionData(toProcess);

        if (_loggingDatabaseListener != null)
            listener = new ForkDataLoadEventListener(listener, _loggingDatabaseListener);

        EnsureTableHasDataInIt(toProcess);

        CreatedTable = false;

        if (_firstTime)
        {
            var tableAlreadyExists = false;

            if (!_database.Exists())
                throw new Exception($"Database {_database} does not exist");

            _discoveredTable = _database.ExpectTable(TargetTableName);

            //table already exists
            if (_discoveredTable.Exists())
            {
                tableAlreadyExists = true;

                if (!AllowLoadingPopulatedTables)
                {
                    if (_discoveredTable.IsEmpty())
                    {
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                            $"Found table {TargetTableName} already, normally this would forbid you from loading it (data duplication / no primary key etc) but it is empty so we are happy to load it, it will not be created"));
                    }
                    else if (!AppendDataIfTableExists)
                    {
                        throw new Exception(
                            $"There is already a table called {TargetTableName} at the destination {_database}");
                    }
                }

                //seed the type guessers from the live table so later batches can be compared against it
                if (AllowResizingColumnsAtUploadTime)
                    _dataTypeDictionary = _discoveredTable.DiscoverColumns().ToDictionary(k => k.GetRuntimeName(),
                        v => v.GetGuesser(), StringComparer.CurrentCultureIgnoreCase);
            }
            else
            {
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                    $"Determined that the table name {TargetTableName} is unique at destination {_database}"));
            }

            if (!tableAlreadyExists)
            {
                CreatedTable = true;

                if (AllowResizingColumnsAtUploadTime)
                    _database.CreateTable(out _dataTypeDictionary, TargetTableName, toProcess, ExplicitTypes.ToArray(),
                        true, adjuster);
                else
                    _database.CreateTable(TargetTableName, toProcess, ExplicitTypes.ToArray(), true, adjuster);

                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                    $"Created table {TargetTableName} successfully."));
            }

            //create connection to destination
            _managedConnection = _server.BeginNewTransactedConnection();
            _bulkcopy = _discoveredTable.BeginBulkInsert(Culture, _managedConnection.ManagedTransaction);

            _firstTime = false;
        }

        //make sure the destination has somewhere to put the timestamp values added to the batch above
        if (IncludeTimeStamp &&
            _discoveredTable.DiscoverColumns().All(c => c.GetRuntimeName() != _extractionTimeStamp))
        {
            _discoveredTable.AddColumn(_extractionTimeStamp, new DatabaseTypeRequest(typeof(DateTime)), true, 30000);
        }

        try
        {
            if (AllowResizingColumnsAtUploadTime && !CreatedTable)
                ResizeColumnsIfRequired(toProcess, listener);

            swTimeSpentWriting.Start();
            if (AppendDataIfTableExists && pkColumns.Length > 0) //assumes columns are the same
            {
                //drop any pk clashes
                RemoveRowsClashingWithExistingPrimaryKeys(toProcess, pkColumns);

                if (toProcess.Rows.Count == 0)
                {
                    //nothing left to upload, do not leave the stopwatch running until the next batch
                    swTimeSpentWriting.Stop();
                    return null;
                }
            }

            _affectedRows += _bulkcopy.Upload(toProcess);

            swTimeSpentWriting.Stop();
            listener.OnProgress(this,
                new ProgressEventArgs($"Uploading to {TargetTableName}",
                    new ProgressMeasurement(_affectedRows, ProgressType.Records), swTimeSpentWriting.Elapsed));
        }
        catch (Exception e)
        {
            //Stop is a no-op if the stopwatch never started
            swTimeSpentWriting.Stop();

            _managedConnection.ManagedTransaction.AbandonAndCloseConnection();

            if (LoggingServer != null)
                _dataLoadInfo.LogFatalError(GetType().Name, ExceptionHelper.ExceptionToListOfInnerMessages(e, true));

            throw new Exception($"Failed to write rows (in transaction) to table {TargetTableName}", e);
        }

        // NOTE(review): this runs after every batch, not only the last one - confirm that is intended
        _dataLoadInfo?.CloseAndMarkComplete();
        return null;
    }

    /// <summary>
    /// Removes from <paramref name="toProcess"/> every row whose primary key value is already represented in the
    /// destination table.  For a column matching the cohort release identifier the cohort table is used to find all
    /// release identifiers that share the row's private identifier, and the row is dropped if any of those are
    /// already in the destination.  For any other primary key column a plain value match is used.
    /// </summary>
    /// <param name="toProcess">Batch to prune (modified in place)</param>
    /// <param name="pkColumns">Primary key columns of the batch captured before the key was cleared</param>
    private void RemoveRowsClashingWithExistingPrimaryKeys(DataTable toProcess, DataColumn[] pkColumns)
    {
        var existingData = _discoveredTable.GetDataTable();

        //values already in the destination, indexed lazily per primary key column
        var existingValuesByColumn = new Dictionary<string, HashSet<string>>();

        HashSet<string> GetExistingValues(string columnName)
        {
            if (!existingValuesByColumn.TryGetValue(columnName, out var values))
            {
                values = new HashSet<string>(existingData.AsEnumerable().Select(r => r[columnName].ToString()));
                existingValuesByColumn.Add(columnName, values);
            }

            return values;
        }

        //last section of the qualified name with its first and last character (the wrapping) removed
        var releaseIdentifierField = _externalCohortTable?.ReleaseIdentifierField.Split('.').Last()[1..^1];

        //the cohort table is only downloaded if a release identifier key column is met, and then only once
        DataTable cohortLookup = null;
        var releaseIdIndex = -1;
        var privateIdIndex = -1;

        var rowsToDelete = new List<DataRow>();

        foreach (DataRow row in toProcess.Rows)
        {
            foreach (DataColumn pkCol in pkColumns)
            {
                var clash = false;
                var value = row[pkCol.ColumnName].ToString();

                if (_externalCohortTable is not null && pkCol.ColumnName == releaseIdentifierField)
                {
                    // If it's a cohort release identifier
                    // look up the original value and check we've not already extracted the same value under a different release ID
                    if (cohortLookup is null)
                    {
                        var privateIdentifierField =
                            _externalCohortTable.PrivateIdentifierField.Split('.').Last()[1..^1];
                        cohortLookup = _externalCohortTable.DiscoverCohortTable().GetDataTable();
                        releaseIdIndex = cohortLookup.Columns.IndexOf(releaseIdentifierField);
                        privateIdIndex = cohortLookup.Columns.IndexOf(privateIdentifierField);
                    }

                    var cohortRow = cohortLookup.AsEnumerable()
                        .FirstOrDefault(r => r[releaseIdIndex].ToString() == value);

                    if (cohortRow is not null)
                    {
                        var privateId = cohortRow[privateIdIndex].ToString();
                        var existingValues = GetExistingValues(pkCol.ColumnName);

                        clash = cohortLookup.AsEnumerable()
                            .Where(r => r[privateIdIndex].ToString() == privateId)
                            .Any(r => existingValues.Contains(r[releaseIdIndex].ToString()));
                    }
                }
                else
                {
                    clash = GetExistingValues(pkCol.ColumnName).Contains(value);
                }

                if (clash)
                {
                    rowsToDelete.Add(row);
                    break;
                }
            }
        }

        //removal is deferred so the Rows collection is not modified while it is being enumerated
        foreach (var row in rowsToDelete)
            toProcess.Rows.Remove(row);
    }
318

319
    /// <summary>
    /// Strips the '.' character from the table name and from every column name of <paramref name="toProcess"/>.
    /// Names that are null/whitespace or contain no invalid character are left untouched (never reassigned).
    /// </summary>
    /// <param name="toProcess">Table whose schema names are cleaned in place</param>
    private static void RemoveInvalidCharactersInSchema(DataTable toProcess)
    {
        char[] forbidden = { '.' };

        //true (with the cleaned name) only when the name actually contains a forbidden character
        bool NeedsCleaning(string name, out string cleaned)
        {
            cleaned = name;

            if (string.IsNullOrWhiteSpace(name) || name.IndexOfAny(forbidden) < 0)
                return false;

            cleaned = string.Concat(name.Where(ch => !forbidden.Contains(ch)));
            return true;
        }

        if (NeedsCleaning(toProcess.TableName, out var cleanTableName))
            toProcess.TableName = cleanTableName;

        foreach (DataColumn column in toProcess.Columns)
        {
            if (NeedsCleaning(column.ColumnName, out var cleanColumnName))
                column.ColumnName = cleanColumnName;
        }
    }
332

333

334
    /// <summary>
    /// Moves primary key information out of the batch and out of <see cref="ExplicitTypes"/> into
    /// <see cref="_primaryKey"/>: the column names are remembered, the DataTable's key is emptied and every explicit
    /// column request flagged as a key has the flag switched off.  The remembered names are turned into a real
    /// primary key at Dispose time so that the key does not get in the way of ALTER statements
    /// (see <see cref="ResizeColumnsIfRequired"/>).
    /// </summary>
    /// <param name="toProcess">Batch whose primary key is recorded and then cleared</param>
    private void ClearPrimaryKeyFromDataTableAndExplicitWriteTypes(DataTable toProcess)
    {
        //remember the key columns of the batch, then take the key off the DataTable
        _primaryKey.UnionWith(toProcess.PrimaryKey.Select(keyColumn => keyColumn.ColumnName));
        toProcess.PrimaryKey = Array.Empty<DataColumn>();

        //same treatment for any explicitly typed columns that were declared as keys
        foreach (var request in ExplicitTypes)
        {
            if (!request.IsPrimaryKey)
                continue;

            request.IsPrimaryKey = false;
            _primaryKey.Add(request.ColumnName);
        }
    }
354

355

356
    /// <summary>
    /// Adds the extraction timestamp column to <paramref name="toProcess"/> and stamps every row with the same
    /// local time value (captured once, before the rows are visited).
    /// </summary>
    /// <param name="toProcess">Batch that receives the extra column (modified in place)</param>
    private void AddTimeStampToExtractionData(DataTable toProcess)
    {
        var stampedAt = DateTime.Now;

        // NOTE(review): the column is added without an explicit data type - confirm the stored values
        // round-trip correctly into the DateTime column created at the destination
        toProcess.Columns.Add(_extractionTimeStamp);

        for (var i = 0; i < toProcess.Rows.Count; i++)
            toProcess.Rows[i][_extractionTimeStamp] = stampedAt;
    }
365

366
    /// <summary>
    /// Rejects batches that cannot be uploaded because they are structurally empty.
    /// </summary>
    /// <param name="toProcess">Batch to validate</param>
    /// <exception cref="Exception">Thrown when the batch has no columns, or has columns but no rows</exception>
    private static void EnsureTableHasDataInIt(DataTable toProcess)
    {
        var hasColumns = toProcess.Columns.Count != 0;
        if (!hasColumns)
        {
            throw new Exception($"DataTable '{toProcess}' had no Columns!");
        }

        var hasRows = toProcess.Rows.Count != 0;
        if (!hasRows)
        {
            throw new Exception($"DataTable '{toProcess}' had no Rows!");
        }
    }
374

375
    /// <summary>
    /// Feeds every value of the batch into the per-column type guessers and, where the resulting SQL type differs
    /// from the one held before the batch was examined, alters the destination column (inside the open transaction)
    /// unless <see cref="AbandonAlter"/> vetoes the change.  The bulk insert's cached schema is invalidated after
    /// each alter.
    /// </summary>
    /// <param name="toProcess">Batch about to be uploaded</param>
    /// <param name="listener">Receives a warning per considered/performed resize and a progress message at the end</param>
    private void ResizeColumnsIfRequired(DataTable toProcess, IDataLoadEventListener listener)
    {
        swMeasuringStrings.Start();

        var destinationTable = _database.ExpectTable(TargetTableName);
        var translater = destinationTable.GetQuerySyntaxHelper().TypeTranslater;

        //snapshot of the SQL types implied by the guessers before this batch is taken into account
        var typesBeforeBatch = _dataTypeDictionary.ToDictionary(entry => entry.Key,
            entry => translater.GetSQLDBTypeForCSharpType(entry.Value.Guess), StringComparer.CurrentCultureIgnoreCase);

        //destination columns that also appear in the batch
        var columnsInBoth = _dataTypeDictionary.Keys
            .Where(name => toProcess.Columns.Contains(name))
            .ToList();

        //let each guesser see every value of its column (one guesser per column so columns can run side by side)
        Parallel.ForEach(columnsInBoth, name =>
        {
            var guesser = _dataTypeDictionary[name];
            foreach (DataRow row in toProcess.Rows)
                guesser.AdjustToCompensateForValue(row[name]);
        });

        //see if any have changed
        foreach (DataColumn column in toProcess.Columns)
        {
            //skip internally generated columns
            if (IncludeTimeStamp && column.ColumnName == _extractionTimeStamp)
                continue;

            //what the column needed before this batch versus what it needs now
            var typeBefore = typesBeforeBatch[column.ColumnName];
            var typeNow = translater.GetSQLDBTypeForCSharpType(_dataTypeDictionary[column.ColumnName].Guess);

            //unchanged, nothing to alter
            if (typeBefore == typeNow)
                continue;

            //the requirement has widened e.g. varchar(10) to varchar(50) or datetime to varchar(20)
            var liveColumn = destinationTable.DiscoverColumn(column.ColumnName, _managedConnection.ManagedTransaction);

            if (AbandonAlter(liveColumn.DataType.SQLType, typeNow, out var reason))
            {
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Warning,
                        $"Considered resizing column '{column}' from '{liveColumn.DataType.SQLType}' to '{typeNow}' but decided not to because:{reason}"));
                continue;
            }

            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                $"Resizing column '{column}' from '{liveColumn.DataType.SQLType}' to '{typeNow}'"));

            //try changing the Type to the legit type
            liveColumn.DataType.AlterTypeTo(typeNow, _managedConnection.ManagedTransaction, AlterTimeout);

            //the bulk insert must not keep using the schema it saw before the alter
            _bulkcopy.InvalidateTableSchema();
        }

        swMeasuringStrings.Stop();
        listener.OnProgress(this,
            new ProgressEventArgs("Measuring DataType Sizes",
                new ProgressMeasurement(_affectedRows + toProcess.Rows.Count, ProgressType.Records),
                swMeasuringStrings.Elapsed));
    }
449

450
    /// <summary>
    /// Decides whether a proposed column alter should be skipped.  The only case vetoed here is a column whose
    /// current type mentions real/double/float/single being changed to a type that mentions decimal.
    /// </summary>
    /// <param name="oldSqlType">The database proprietary type you are considering altering from</param>
    /// <param name="newSqlType">The ANSI SQL type you are considering altering to</param>
    /// <param name="reason">Null or the reason we are returning true</param>
    /// <returns>True if the proposed alter is a bad idea and shouldn't be attempted</returns>
    protected virtual bool AbandonAlter(string oldSqlType, string newSqlType, out string reason)
    {
        reason = null;

        string[] floatingPointTypeNames = { "real", "double", "float", "single" };

        //first (in list order) floating point name that appears anywhere in the current type
        var matched = Array.Find(floatingPointTypeNames,
            name => oldSqlType.Contains(name, StringComparison.InvariantCultureIgnoreCase));

        if (matched == null || !newSqlType.Contains("decimal", StringComparison.InvariantCultureIgnoreCase))
            return false;

        reason = $"Resizing from {matched} to decimal is a bad idea and likely to fail";
        return true;
    }
473

474
    /// <summary>
    /// Abandons the open transaction (if any) and closes its connection.  Safe to call before any batch has been
    /// processed, in which case there is no connection yet and nothing happens.
    /// </summary>
    /// <param name="listener">Not used</param>
    public void Abort(IDataLoadEventListener listener)
    {
        //the connection is only created when the first batch arrives
        _managedConnection?.ManagedTransaction?.AbandonAndCloseConnection();
    }
478

479
    /// <summary>
    /// Finishes the upload: commits the transaction (or abandons it when the pipeline failed), releases the bulk
    /// insert, creates the primary key that was stripped from the incoming batches (only after a successful run and
    /// only if the destination table has no primary key yet) and finalizes the audit.
    /// </summary>
    /// <param name="listener">Receives commit/rollback notifications; a failure to commit/rollback is reported as an Error notification rather than thrown</param>
    /// <param name="pipelineFailureExceptionIfAny">Null when the pipeline succeeded, otherwise the failure that ended it</param>
    /// <exception cref="Exception">Thrown when a remembered primary key column cannot be found in the destination table</exception>
    public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
    {
        try
        {
            try
            {
                if (_managedConnection != null)
                {
                    try
                    {
                        //if there was an error
                        if (pipelineFailureExceptionIfAny != null)
                        {
                            _managedConnection.ManagedTransaction.AbandonAndCloseConnection();

                            listener.OnNotify(this,
                                new NotifyEventArgs(ProgressEventType.Information,
                                    "Transaction rolled back successfully"));
                        }
                        else
                        {
                            _managedConnection.ManagedTransaction.CommitAndCloseConnection();

                            listener.OnNotify(this,
                                new NotifyEventArgs(ProgressEventType.Information,
                                    "Transaction committed successfully"));
                        }
                    }
                    finally
                    {
                        //release the bulk insert even when the commit / rollback throws
                        _bulkcopy?.Dispose();
                    }
                }
            }
            catch (Exception e)
            {
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Error,
                        "Commit failed on transaction (probably there was a previous error?)", e));
            }

            //if we have a primary key to create
            if (pipelineFailureExceptionIfAny == null && _primaryKey?.Any() == true &&
                _discoveredTable?.Exists() == true)
            {
                //Find the columns in the destination
                var allColumns = _discoveredTable.DiscoverColumns();

                //if there are not yet any primary keys
                if (allColumns.All(c => !c.IsPrimaryKey))
                {
                    //find the columns the user decorated in his DataTable (the set itself is case insensitive)
                    var pkColumnsToCreate = allColumns.Where(c => _primaryKey.Contains(c.GetRuntimeName()))
                        .ToArray();

                    //make sure we found all of them
                    if (pkColumnsToCreate.Length != _primaryKey.Count)
                        throw new Exception(
                            $"Could not find primary key column(s) {string.Join(",", _primaryKey)} in table {_discoveredTable}");

                    //create the primary key to match user provided columns
                    _discoveredTable.CreatePrimaryKey(AlterTimeout, pkColumnsToCreate);
                }
            }
        }
        finally
        {
            //always close off the audit, even when primary key creation fails
            EndAuditIfExists();
        }
    }
539

540
    /// <summary>
    /// Finalizes the table load records of the logging listener.  Does nothing when no audit was started.
    /// </summary>
    private void EndAuditIfExists()
    {
        //no listener means the user is not auditing
        if (_loggingDatabaseListener == null)
            return;

        _loggingDatabaseListener.FinalizeTableLoadInfos();
    }
545

546
    /// <summary>
    /// Checks the configured <see cref="LoggingServer"/>.  When none is configured a successful check is reported
    /// explaining that the destination will not be audited.
    /// </summary>
    /// <param name="notifier">Receives the outcome of the checks</param>
    public void Check(ICheckNotifier notifier)
    {
        if (LoggingServer == null)
        {
            var noAudit = new CheckEventArgs(
                "There is no logging server so there will be no audit of this destinations activities",
                CheckResult.Success);

            notifier.OnCheckPerformed(noAudit);
            return;
        }

        var checker = new LoggingDatabaseChecker(LoggingServer);
        checker.Check(notifier);
    }
556

557
    /// <summary>
    /// Starts auditing of the upload the first time it is called with a <see cref="LoggingServer"/> configured:
    /// ensures the "Internal" logging task exists, creates the data load record and the listener that writes to it.
    /// Later calls (or calls without a logging server) do nothing.
    /// </summary>
    /// <param name="tableName">Name of the table being loaded, used in the audit description</param>
    private void StartAuditIfExists(string tableName)
    {
        //not auditing, or audit already underway
        if (LoggingServer == null || _dataLoadInfo != null)
            return;

        _loggingDatabaseSettings = DataAccessPortal.ExpectServer(LoggingServer, DataAccessContext.Logging);

        var manager = new LogManager(_loggingDatabaseSettings);
        manager.CreateNewLoggingTaskIfNotExists("Internal");

        var description = $"Loading table {tableName}";
        _dataLoadInfo = (DataLoadInfo)manager.CreateDataLoadInfo("Internal", GetType().Name, description, "", false);

        _loggingDatabaseListener = new ToLoggingDatabaseDataLoadEventListener(manager, _dataLoadInfo);
    }
570

571
    /// <summary>
    /// Supplies the database that batches will be uploaded to (and, through it, the server used to open the
    /// transacted connection).
    /// </summary>
    /// <param name="value">Destination database, must not be null</param>
    /// <param name="listener">Not used</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="value"/> is null</exception>
    public void PreInitialize(DiscoveredDatabase value, IDataLoadEventListener listener)
    {
        //fail with a meaningful exception before any state is assigned
        ArgumentNullException.ThrowIfNull(value);

        _database = value;
        _server = value.Server;
    }
576

577
    /// <summary>
    /// Declares that the column called <paramref name="columnName"/> (which may or may not be present in the
    /// DataTables being uploaded) should be created with the database type <paramref name="explicitType"/>
    /// (e.g. varchar(59)).  The columnName is case insensitive.  Note that if AllowResizingColumnsAtUploadTime is
    /// true then these datatypes are only the starting types and might get changed later to accommodate new data.
    /// </summary>
    /// <param name="columnName">Name of the column the type applies to</param>
    /// <param name="explicitType">Database type to create the column with</param>
    /// <param name="columnFlags">
    /// Optional primary key / auto increment / collation information.  When omitted the column is nullable;
    /// when supplied the column is nullable only if it is neither a primary key nor auto increment.
    /// </param>
    /// <returns>The Column Request that has been added to the array</returns>
    public DatabaseColumnRequest AddExplicitWriteType(string columnName, string explicitType,
        ISupplementalColumnInformation columnFlags = null)
    {
        var request = columnFlags == null
            ? new DatabaseColumnRequest(columnName, explicitType, true)
            : new DatabaseColumnRequest(columnName, explicitType,
                !(columnFlags.IsPrimaryKey || columnFlags.IsAutoIncrement))
            {
                IsPrimaryKey = columnFlags.IsPrimaryKey,
                IsAutoIncrement = columnFlags.IsAutoIncrement,
                Collation = columnFlags.Collation
            };

        ExplicitTypes.Add(request);
        return request;
    }
609
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc