• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 9758065908

02 Jul 2024 09:03AM UTC coverage: 56.679% (-0.2%) from 56.914%
9758065908

push

github

web-flow
Release/8.2.0 (#1867)

* add extraction additions

* interim

* add test

* interim

* working dedupe

* improved checking

* add timestamp option

* fix extra looping

* add check

* start on tests

* tidy up code

* update link

* tidy up

* Rename executeFullExtractionToDatabaseMSSql.md to ExecuteFullExtractionToDatabaseMSSql.md

* fix typo

* add docs

* update

* update documentation

* attempt fix docs

* update docs

* tidy up code

* better tests

* add real test

* tidy up

* interim

* grab existing entity

* no new data

* add basic tests

* attempt to fix test

* interim

* interim commit

* working clash

* add test

* fix test

* improved clash checker

* tidy up

* update test

* fix up test

* update from codeql

* tidy up code

* fix bad merge

* fix typo

* skip over for now

* revert change

* Task/RDMP-180 Add instance settings table (#1820)

* working settings interface

* add documentation

* add missing files

* update namespace

* add icon

* update from run

* make key unique

* add tests

* update tests

* update for tests

* fix unique name issue

* tidy up

* tidy up from review

* works

* nested deprecations

* recursive deprecation

* tidy up

* add newline

* Task/rdmp 174 dqe improvements (#1849)

* working scalable graph

* add changelog

* add axis override

* interim

* working increments

* working ui refresh

* update changelog

* tidy up code

* add missing file

* tidy up

* Task/rdmp 155 migrate catalogue tables (#1805)

* start of UI

* interim

* working switch

* improved ui

* fix build

* rename duped file

* interim

* add checks

* start of tests

* local tests working

* add tests

* improved ui

* tidy up

* add single item use

* broken test

* updated tests

* tidy up imports

* add some documentation

* fix docume... (continued)

10912 of 20750 branches covered (52.59%)

Branch coverage included in aggregate %.

369 of 831 new or added lines in 38 files covered. (44.4%)

375 existing lines in 25 files now uncovered.

30965 of 53135 relevant lines covered (58.28%)

7845.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.39
/Rdmp.Core/DataLoad/Engine/Pipeline/Destinations/DataTableUploadDestination.cs
1
// Copyright (c) The University of Dundee 2018-2024
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data;
10
using System.Diagnostics;
11
using System.Globalization;
12
using System.Linq;
13
using System.Threading.Tasks;
14
using FAnsi.Connections;
15
using FAnsi.Discovery;
16
using FAnsi.Discovery.TableCreation;
17
using Rdmp.Core.Curation.Data;
18
using Rdmp.Core.DataExport.Data;
19
using Rdmp.Core.DataFlowPipeline;
20
using Rdmp.Core.DataFlowPipeline.Requirements;
21
using Rdmp.Core.DataLoad.Triggers.Implementations;
22
using Rdmp.Core.DataLoad.Triggers;
23
using Rdmp.Core.Logging;
24
using Rdmp.Core.Logging.Listeners;
25
using Rdmp.Core.Repositories.Construction;
26
using Rdmp.Core.ReusableLibraryCode;
27
using Rdmp.Core.ReusableLibraryCode.Checks;
28
using Rdmp.Core.ReusableLibraryCode.DataAccess;
29
using Rdmp.Core.ReusableLibraryCode.Progress;
30
using TypeGuesser;
31
using FAnsi;
32
using Terminal.Gui;
33
using TB.ComponentModel;
34
using Npgsql;
35
using MathNet.Numerics.LinearAlgebra;
36

37
namespace Rdmp.Core.DataLoad.Engine.Pipeline.Destinations;
38

39
/// <summary>
40
/// Pipeline component (destination) which commits the DataTable(s) (in batches) to the DiscoveredDatabase (PreInitialize argument).  Supports cross platform
41
/// targets (MySql , Sql Server etc).  Normally the SQL Data Types and column names will be computed from the DataTable and a table will be created with the
42
/// name of the DataTable being processed.  If a matching table already exists you can choose to load it anyway in which case a basic bulk insert will take
43
/// place.
44
/// </summary>
45
public class DataTableUploadDestination : IPluginDataFlowComponent<DataTable>, IDataFlowDestination<DataTable>,
46
    IPipelineRequirement<DiscoveredDatabase>
47
{
48
    /// <summary>Help text passed to the <see cref="DemandsInitializationAttribute"/> of <see cref="LoggingServer"/>.</summary>
    public const string LoggingServer_Description =
        "The logging server to log the upload to (leave blank to not bother auditing)";

    /// <summary>Help text passed to the <see cref="DemandsInitializationAttribute"/> of <see cref="AllowResizingColumnsAtUploadTime"/>.</summary>
    public const string AllowResizingColumnsAtUploadTime_Description =
        "If the target table being loaded has columns that are too small the destination will attempt to resize them";

    /// <summary>Help text passed to the <see cref="DemandsInitializationAttribute"/> of <see cref="AllowLoadingPopulatedTables"/>.</summary>
    public const string AllowLoadingPopulatedTables_Description =
        "Normally when DataTableUploadDestination encounters a table that already contains records it will abandon the insertion attempt.  Set this to true to instead continue with the load.";

    /// <summary>Help text passed to the <see cref="DemandsInitializationAttribute"/> of <see cref="AlterTimeout"/>.</summary>
    public const string AlterTimeout_Description =
        "Timeout to perform all ALTER TABLE operations (column resize and PK creation)";

    /// <summary>Optional logging server; when set, write failures are also recorded as fatal errors on the audit.</summary>
    [DemandsInitialization(LoggingServer_Description)]
    public ExternalDatabaseServer LoggingServer { get; set; }

    /// <summary>When true, existing destination columns are widened (ALTERed) if a batch no longer fits them.</summary>
    [DemandsInitialization(AllowResizingColumnsAtUploadTime_Description, DefaultValue = true)]
    public bool AllowResizingColumnsAtUploadTime { get; set; }

    /// <summary>When false, finding a non-empty destination table aborts the load (unless <see cref="AppendDataIfTableExists"/>).</summary>
    [DemandsInitialization(AllowLoadingPopulatedTables_Description, DefaultValue = false)]
    public bool AllowLoadingPopulatedTables { get; set; }

    /// <summary>Timeout handed to every column ALTER issued while resizing.</summary>
    [DemandsInitialization(AlterTimeout_Description, DefaultValue = 300)]
    public int AlterTimeout { get; set; }

    /// <summary>
    /// Optional <see cref="IDatabaseColumnRequestAdjuster"/> implementation, constructed per batch and handed to table creation.
    /// </summary>
    [DemandsInitialization("Optional - Change system behaviour when a new table is being created by the component",
        TypeOf = typeof(IDatabaseColumnRequestAdjuster))]
    public Type Adjuster { get; set; }

    /// <summary>
    /// When true, loading into an existing populated table is permitted and incoming rows are checked for primary key
    /// clashes against the rows already stored.
    /// </summary>
    public bool AppendDataIfTableExists { get; set; }

    /// <summary>When true, every uploaded row is stamped with an <c>extraction_timestamp</c> column.</summary>
    public bool IncludeTimeStamp { get; set; }

    // Backing field for Culture; null means "use the current culture"
    private CultureInfo _culture;

    /// <summary>
    /// Culture handed to the bulk insert; falls back to <see cref="CultureInfo.CurrentCulture"/> when nothing was configured.
    /// </summary>
    [DemandsInitialization("The culture to use for uploading (determines date format etc)")]
    public CultureInfo Culture
    {
        get { return _culture ?? CultureInfo.CurrentCulture; }
        set { _culture = value; }
    }

    /// <summary>Name of the destination table, fixed on the first batch processed.</summary>
    public string TargetTableName { get; private set; }

    /// <summary>
    /// True if a new table was created or re-created by the execution of this destination.  False if
    /// the table already existed e.g. data was simply added
    /// </summary>
    public bool CreatedTable { get; private set; }

    /// <summary>
    /// When true (and the batch has a primary key) a data load run id column is maintained and clashing rows that
    /// differ from the stored copy are applied as UPDATEs.
    /// </summary>
    public bool UseTrigger { get; set; }

    /// <summary>When true, an index named <see cref="IndexTableName"/> is created on the destination table.</summary>
    public bool IndexTables { get; set; }

    /// <summary>Name given to the index created when <see cref="IndexTables"/> is set.</summary>
    public string IndexTableName { get; set; }

    /// <summary>Columns to index; when empty the batch's primary key columns are indexed instead.</summary>
    public List<string> UserDefinedIndexes { get; set; } = new List<string>();

    private IBulkCopy _bulkcopy;
    private int _affectedRows;

    private Stopwatch swTimeSpentWriting = new Stopwatch();
    private Stopwatch swMeasuringStrings = new Stopwatch();

    private DiscoveredServer _loggingDatabaseSettings;

    private DiscoveredServer _server;
    private DiscoveredDatabase _database;

    private DataLoadInfo _dataLoadInfo;

    private IManagedConnection _managedConnection;
    private ToLoggingDatabaseDataLoadEventListener _loggingDatabaseListener;

    /// <summary>
    /// Explicit column type requests used when creating the destination table; their primary key flags are cleared
    /// (and remembered) before each batch is written.
    /// </summary>
    public List<DatabaseColumnRequest> ExplicitTypes { get; set; }

    private bool _firstTime = true;
    private HashSet<string> _primaryKey = new HashSet<string>(StringComparer.CurrentCultureIgnoreCase);
    private DiscoveredTable _discoveredTable;
    private readonly string _extractionTimeStamp = "extraction_timestamp";

    // Null unless constructed for an extraction against a cohort
    private readonly IExternalCohortTable _externalCohortTable;

    //All column values sent to server so far
    private Dictionary<string, Guesser> _dataTypeDictionary;

    /// <summary>
    /// Optional callback that supplies the destination table name; when set it takes precedence over the
    /// <see cref="DataTable.TableName"/> of incoming batches (e.g. names derived from file names upstream).
    /// </summary>
    public Func<string> TableNamerDelegate { get; set; }
135

136
    /// <summary>
    /// Creates a destination that is not associated with any external cohort table and has no explicit column types.
    /// </summary>
    public DataTableUploadDestination()
    {
        ExplicitTypes = [];
    }

    /// <summary>
    /// Creates a destination for an extraction against <paramref name="externalCohortTable"/>; the cohort's
    /// release/private identifier mapping is consulted when matching primary key clashes on the release identifier.
    /// </summary>
    /// <param name="externalCohortTable">The cohort source; may be null, which behaves like the parameterless constructor.</param>
    public DataTableUploadDestination(IExternalCohortTable externalCohortTable) : this()
    {
        _externalCohortTable = externalCohortTable;
    }
146

147
    /// <summary>
    /// Returns the elements of <paramref name="itemArray"/> whose positions are not listed in <paramref name="indexes"/>.
    /// Positions in <paramref name="indexes"/> that are out of range (e.g. -1 from a failed IndexOf) are simply ignored.
    /// </summary>
    /// <param name="itemArray">Source values (typically a DataRow's ItemArray).</param>
    /// <param name="indexes">Zero-based positions to drop.</param>
    /// <returns>The original array instance when there is nothing to drop, otherwise a new array.</returns>
    private static object[] FilterOutItemAtIndex(object[] itemArray, int[] indexes)
    {
        // Nothing to skip: hand back the caller's array as-is
        if (indexes.Length == 0)
            return itemArray;

        var kept = new List<object>(itemArray.Length);
        for (var position = 0; position < itemArray.Length; position++)
        {
            if (Array.IndexOf(indexes, position) < 0)
                kept.Add(itemArray[position]);
        }

        return kept.ToArray();
    }
152
    /// <summary>
    /// Builds the SQL predicate used in the WHERE clause of an UPDATE to find the destination row(s) that correspond
    /// to <paramref name="row"/> on the primary key column <paramref name="pkColumn"/>.
    /// </summary>
    /// <remarks>
    /// When extracting against a cohort and <paramref name="pkColumn"/> is the cohort's release identifier, the same
    /// private identifier may appear under several release IDs in the cohort table, so the predicate matches every
    /// release ID mapped to the same private identifier. Values are embedded as SQL string literals with embedded
    /// single quotes doubled so the generated statement stays well formed.
    /// </remarks>
    /// <param name="pkColumn">The primary key column being matched.</param>
    /// <param name="row">The incoming row supplying the key value.</param>
    /// <returns>Either <c>pk = 'value'</c> or <c>pk in ('id1','id2',...)</c>.</returns>
    private string GetPKValue(DataColumn pkColumn, DataRow row)
    {
        var pkName = pkColumn.ColumnName;
        var value = row[pkName];

        // Render a value as a quoted SQL string literal, doubling embedded single quotes
        static string Quote(object v)
        {
            var text = $"{v}";
            return $"'{text.Replace("'", "''")}'";
        }

        if (_externalCohortTable is not null)
        {
            var privateIdentifierField = _externalCohortTable.PrivateIdentifierField.Split('.').Last()[1..^1];//remove the "[]" from the identifier field
            var releaseIdentifierField = _externalCohortTable.ReleaseIdentifierField.Split('.').Last()[1..^1];//remove the "[]" from the identifier field
            if (pkName == releaseIdentifierField)
            {
                //going to have to look up the previous releaseID to match
                DiscoveredTable cohortTable = _externalCohortTable.DiscoverCohortTable();
                using var lookupDT = cohortTable.GetDataTable();
                var releaseIdIndex = lookupDT.Columns.IndexOf(releaseIdentifierField);
                var privateIdIndex = lookupDT.Columns.IndexOf(privateIdentifierField);
                var releaseValue = value.ToString();
                var foundRow = lookupDT.Rows.Cast<DataRow>()
                    .LastOrDefault(r => r.ItemArray[releaseIdIndex].ToString() == releaseValue);
                if (foundRow is not null)
                {
                    var originalValue = foundRow.ItemArray[privateIdIndex].ToString();
                    // materialize once rather than re-running the query for the emptiness check and the join
                    var existingIDsforReleaseID = lookupDT.Rows.Cast<DataRow>()
                        .Where(r => r.ItemArray[privateIdIndex].ToString() == originalValue)
                        .Select(s => s.ItemArray[releaseIdIndex].ToString())
                        .ToList();
                    if (existingIDsforReleaseID.Count > 0)
                    {
                        //we don't know what the current release ID is (there may be ones from multiple cohorts)
                        var ids = existingIDsforReleaseID.Select(Quote);
                        return $"{pkName} in ({string.Join(',', ids)})";
                    }
                }
            }
        }

        return $"{pkName} = {Quote(value)}";
    }
183

184

185
    /// <summary>
    /// Writes one batch (<paramref name="toProcess"/>) to the destination table.
    /// </summary>
    /// <remarks>
    /// <para>On the first call the destination table is looked up (or created from the batch schema) and a transacted
    /// bulk insert is opened; later calls reuse that table and transaction.</para>
    /// <para>When <see cref="AppendDataIfTableExists"/> is set and the batch has a primary key, incoming rows are compared
    /// with the rows already stored. With <see cref="UseTrigger"/> enabled, a clashing row that is identical (ignoring the
    /// data load run id / release identifier / valid-from columns) is dropped from the batch, and a clashing row that
    /// differs is applied with an UPDATE instead of being bulk inserted. NOTE(review): without <see cref="UseTrigger"/>,
    /// clashing rows are still passed to the bulk insert.</para>
    /// </remarks>
    /// <param name="toProcess">The batch to write; its schema and rows may be modified in place (invalid characters removed,
    /// primary key cleared, timestamp / data load run id columns added, clashing rows removed).</param>
    /// <param name="listener">Receives progress and notifications; forked to the logging database listener if one exists.</param>
    /// <param name="cancellationToken">Not consulted by this method.</param>
    /// <returns>Always <c>null</c>.</returns>
    /// <exception cref="Exception">Thrown when the table cannot be named, the database does not exist, the batch is empty,
    /// a populated table would be loaded without permission, or writing fails (the transaction is then abandoned).</exception>
    public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener listener,
        GracefulCancellationToken cancellationToken)
    {
        if (toProcess == null)
            return null;

        var rowsToModify = new List<DataRow>();
        // capture the key before ClearPrimaryKeyFromDataTableAndExplicitWriteTypes strips it from the DataTable
        var pkColumns = toProcess.PrimaryKey;
        RemoveInvalidCharactersInSchema(toProcess);

        IDatabaseColumnRequestAdjuster adjuster = null;
        if (Adjuster != null) adjuster = (IDatabaseColumnRequestAdjuster)ObjectConstructor.Construct(Adjuster);

        //work out the table name for the table we are going to create
        if (TargetTableName == null)
        {
            if (TableNamerDelegate != null)
            {
                TargetTableName = TableNamerDelegate();
                if (string.IsNullOrWhiteSpace(TargetTableName))
                    throw new Exception("No table name specified (TableNamerDelegate returned null)");
            }
            else if (string.IsNullOrWhiteSpace(toProcess.TableName))
            {
                throw new Exception(
                    "Chunk did not have a TableName, did not know what to call the newly created table");
            }
            else
            {
                TargetTableName = QuerySyntaxHelper.MakeHeaderNameSensible(toProcess.TableName);
            }
        }

        ClearPrimaryKeyFromDataTableAndExplicitWriteTypes(toProcess);

        StartAuditIfExists(TargetTableName);

        if (IncludeTimeStamp)
        {
            AddTimeStampToExtractionData(toProcess);
        }

        if (_loggingDatabaseListener != null)
            listener = new ForkDataLoadEventListener(listener, _loggingDatabaseListener);

        EnsureTableHasDataInIt(toProcess);

        CreatedTable = false;

        if (_firstTime)
        {
            var tableAlreadyExistsButEmpty = false;

            if (!_database.Exists())
                throw new Exception($"Database {_database} does not exist");

            _discoveredTable = _database.ExpectTable(TargetTableName);

            //table already exists
            if (_discoveredTable.Exists())
            {
                tableAlreadyExistsButEmpty = true;

                if (!AllowLoadingPopulatedTables)
                {
                    if (_discoveredTable.IsEmpty())
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                            $"Found table {TargetTableName} already, normally this would forbid you from loading it (data duplication / no primary key etc) but it is empty so we are happy to load it, it will not be created"));
                    else if (!AppendDataIfTableExists)
                        throw new Exception(
                            $"There is already a table called {TargetTableName} at the destination {_database}");
                }

                if (AllowResizingColumnsAtUploadTime)
                    _dataTypeDictionary = _discoveredTable.DiscoverColumns().ToDictionary(k => k.GetRuntimeName(),
                        v => v.GetGuesser(), StringComparer.CurrentCultureIgnoreCase);
            }
            else
            {
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                    $"Determined that the table name {TargetTableName} is unique at destination {_database}"));
            }

            //create the table if it did not already exist
            if (!tableAlreadyExistsButEmpty)
            {
                CreatedTable = true;

                if (AllowResizingColumnsAtUploadTime)
                    _database.CreateTable(out _dataTypeDictionary, TargetTableName, toProcess, ExplicitTypes.ToArray(),
                        true, adjuster);
                else
                    _database.CreateTable(TargetTableName, toProcess, ExplicitTypes.ToArray(), true, adjuster);

                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                    $"Created table {TargetTableName} successfully."));
            }

            //create connection to destination
            _managedConnection = _server.BeginNewTransactedConnection();
            _bulkcopy = _discoveredTable.BeginBulkInsert(Culture, _managedConnection.ManagedTransaction);
            _firstTime = false;
        }

        if (IncludeTimeStamp && !_discoveredTable.DiscoverColumns().Any(c => c.GetRuntimeName() == _extractionTimeStamp))
        {
            _discoveredTable.AddColumn(_extractionTimeStamp, new DatabaseTypeRequest(typeof(DateTime)), true, 30000);
        }

        if (IndexTables)
        {
            var indexes = UserDefinedIndexes.Count != 0 ? UserDefinedIndexes : pkColumns.Select(c => c.ColumnName);
            try
            {
                _discoveredTable.CreateIndex(IndexTableName, _discoveredTable.DiscoverColumns().Where(c => indexes.Contains(c.GetRuntimeName())).ToArray());
            }
            catch (Exception e)
            {
                //We only warn about not creating the index, as it's not critical
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, e.Message));
            }
        }

        if (UseTrigger && pkColumns.Length > 0)
        {
            if (listener.GetType() == typeof(ForkDataLoadEventListener)) //need to add special fields to the datatable if we are logging to a database
            {
                var job = (ForkDataLoadEventListener)listener;
                var listeners = job.GetToLoggingDatabaseDataLoadEventListenersIfany();
                foreach (var dleListener in listeners)
                {
                    IDataLoadInfo dataLoadInfo = dleListener.DataLoadInfo;
                    DataColumn newColumn = new(SpecialFieldNames.DataLoadRunID, typeof(int))
                    {
                        DefaultValue = dataLoadInfo.ID
                    };
                    try
                    {
                        _discoveredTable.DiscoverColumn(SpecialFieldNames.DataLoadRunID);
                    }
                    catch (Exception)
                    {
                        // DiscoverColumn failing is taken to mean the run id column is missing, so add it
                        _discoveredTable.AddColumn(SpecialFieldNames.DataLoadRunID, new DatabaseTypeRequest(typeof(int)), true, 30000);
                    }

                    if (!toProcess.Columns.Contains(SpecialFieldNames.DataLoadRunID))
                        toProcess.Columns.Add(newColumn);
                    foreach (DataRow dr in toProcess.Rows)
                        dr[SpecialFieldNames.DataLoadRunID] = dataLoadInfo.ID;
                }
            }
        }

        try
        {
            if (AllowResizingColumnsAtUploadTime && !CreatedTable)
                ResizeColumnsIfRequired(toProcess, listener);

            swTimeSpentWriting.Start();
            if (AppendDataIfTableExists && pkColumns.Length > 0) //assumes columns are the same
            {
                //drop any pk clashes
                using var existingData = _discoveredTable.GetDataTable();
                var rowsToDelete = new List<DataRow>();
                var releaseIdentifier = _externalCohortTable is not null ? _externalCohortTable.ReleaseIdentifierField.Split('.').Last()[1..^1] : "ReleaseId";
                //columns that may legitimately differ between an incoming row and its stored copy
                int[] toProcessIgnoreColumns = [toProcess.Columns.IndexOf(SpecialFieldNames.DataLoadRunID), toProcess.Columns.IndexOf(releaseIdentifier)];
                int[] existingDataIgnoreColumns = [existingData.Columns.IndexOf(SpecialFieldNames.DataLoadRunID), existingData.Columns.IndexOf(releaseIdentifier), existingData.Columns.IndexOf(SpecialFieldNames.ValidFrom)];

                // The cohort table is loop-invariant: read it at most once per batch (lazily, on the first
                // release identifier check) instead of once per row and primary key column
                DataTable cohortLookup = null;
                var cohortReleaseIdIndex = -1;
                var cohortPrivateIdIndex = -1;

                foreach (DataRow row in toProcess.Rows)
                {
                    foreach (DataColumn pkCol in pkColumns)
                    {
                        var clash = false;
                        if (_externalCohortTable is not null && pkCol.ColumnName == releaseIdentifier)
                        {
                            // If it's a cohort release identifier
                            // look up the original value and check we've not already extracted the same value under a different release ID
                            if (cohortLookup is null)
                            {
                                var privateIdentifierField = _externalCohortTable.PrivateIdentifierField.Split('.').Last()[1..^1];//remove the "[]" from the identifier field
                                cohortLookup = _externalCohortTable.DiscoverCohortTable().GetDataTable();
                                cohortReleaseIdIndex = cohortLookup.Columns.IndexOf(releaseIdentifier);
                                cohortPrivateIdIndex = cohortLookup.Columns.IndexOf(privateIdentifierField);
                            }

                            var releaseValue = row[pkCol.ColumnName].ToString();
                            var foundRow = cohortLookup.Rows.Cast<DataRow>()
                                .FirstOrDefault(r => r[cohortReleaseIdIndex].ToString() == releaseValue);
                            if (foundRow is not null)
                            {
                                var originalValue = foundRow[cohortPrivateIdIndex].ToString();
                                // materialized as a set: it is probed once per existing row below
                                var releaseIdsSharingPrivateId = cohortLookup.Rows.Cast<DataRow>()
                                    .Where(r => r[cohortPrivateIdIndex].ToString() == originalValue)
                                    .Select(r => r[cohortReleaseIdIndex].ToString())
                                    .ToHashSet();
                                clash = existingData.AsEnumerable().Any(r => releaseIdsSharingPrivateId.Contains(r[pkCol.ColumnName].ToString()));
                            }
                        }
                        else
                        {
                            var val = row[pkCol.ColumnName].ToString();
                            clash = existingData.AsEnumerable().Any(r => r[pkCol.ColumnName].ToString() == val);
                        }

                        if (clash && UseTrigger)
                        {
                            var incomingValues = FilterOutItemAtIndex(row.ItemArray, toProcessIgnoreColumns);
                            //do we have to worry about special field? what if the load ids are different?
                            if (existingData.AsEnumerable().Any(r => FilterOutItemAtIndex(r.ItemArray, existingDataIgnoreColumns).SequenceEqual(incomingValues)))
                            {
                                //the row is the exact same, so there is no clash - just don't load it again
                                rowsToDelete.Add(row);
                            }
                            else //row needs updated, but only if we're tracking history
                            {
                                rowsToModify.Add(row);//need to know what releaseId to replace
                                break;
                            }
                        }
                    }
                }

                foreach (DataRow row in rowsToDelete.Distinct())
                    toProcess.Rows.Remove(row);

                cohortLookup?.Dispose();
            }

            var distinctRowsToModify = rowsToModify.Distinct().ToList();
            if (distinctRowsToModify.Count > 0)
            {
                // Everything except the row values is identical for every UPDATE, so build it once
                var columns = toProcess.Columns.Cast<DataColumn>().Select(c => c.ColumnName).ToList();
                var pkNames = pkColumns.Select(pk => pk.ColumnName).ToList();

                //destination columns no longer supplied by the batch must be nulled out
                var columnsThatPreviouslyExisted = _discoveredTable.DiscoverColumns()
                    .Select(c => c.GetRuntimeName())
                    .Where(c => !pkNames.Contains(c) && !columns.Contains(c) &&
                                c != SpecialFieldNames.DataLoadRunID && c != SpecialFieldNames.ValidFrom);
                var nullEntries = string.Join(" ,", columnsThatPreviouslyExisted.Select(c => $"{c} = NULL"));
                var nullText = nullEntries.Length > 0 ? $" , {nullEntries}" : "";

                foreach (var row in distinctRowsToModify)
                {
                    //replace existing
                    var columnString = string.Join(" , ", columns.Select(col => $"{col} = {ToSqlLiteral(row[col])}"));
                    var pkMatch = string.Join(" AND ", pkColumns.Select(pk => GetPKValue(pk, row)));
                    var sql = $"update {_discoveredTable.GetFullyQualifiedName()} set {columnString} {nullText} where {pkMatch}";

                    // dispose the per-row connection and command (previously both were leaked for every modified row)
                    var args = new DatabaseOperationArgs();
                    using var connection = args.GetManagedConnection(_discoveredTable);
                    using var cmd = _discoveredTable.GetCommand(sql, connection.Connection);
                    cmd.ExecuteNonQuery();
                }

                foreach (var row in distinctRowsToModify)
                    toProcess.Rows.Remove(row);
            }

            if (toProcess.Rows.Count == 0 && rowsToModify.Count == 0)
            {
                // nothing left to write; don't leave the write timer running into the next batch
                swTimeSpentWriting.Stop();
                return null;
            }

            if (toProcess.Rows.Count > 0)
            {
                _affectedRows += _bulkcopy.Upload(toProcess);
            }

            swTimeSpentWriting.Stop();
            listener.OnProgress(this,
                new ProgressEventArgs($"Uploading to {TargetTableName}",
                    new ProgressMeasurement(_affectedRows, ProgressType.Records), swTimeSpentWriting.Elapsed));
        }
        catch (Exception e)
        {
            _managedConnection.ManagedTransaction.AbandonAndCloseConnection();

            if (LoggingServer != null)
                _dataLoadInfo.LogFatalError(GetType().Name, ExceptionHelper.ExceptionToListOfInnerMessages(e, true));

            throw new Exception($"Failed to write rows (in transaction) to table {TargetTableName}", e);
        }

        // NOTE(review): this runs after every batch, not only the last one - confirm that is intended
        _dataLoadInfo?.CloseAndMarkComplete();
        return null;

        // Renders a cell value as a SQL literal for the hand-built UPDATE: DBNull becomes NULL (rather than '',
        // which e.g. SQL Server silently converts to 0 / 1900-01-01 for numeric / date columns) and embedded
        // single quotes are doubled so the statement stays well formed. Formatting otherwise matches "{value}".
        static string ToSqlLiteral(object value)
        {
            if (value is null or DBNull)
                return "NULL";

            var text = $"{value}";
            return $"'{text.Replace("'", "''")}'";
        }
    }
453

454

455
    /// <summary>
    /// Strips characters that are not allowed in destination names (currently only '.') from the
    /// <see cref="DataTable.TableName"/> and from every <see cref="DataColumn.ColumnName"/> of <paramref name="toProcess"/>.
    /// Names that contain none of those characters are left untouched.
    /// </summary>
    /// <param name="toProcess">The batch whose schema is cleaned in place.</param>
    private static void RemoveInvalidCharactersInSchema(DataTable toProcess)
    {
        char[] disallowed = { '.' };

        bool NeedsCleaning(string name) =>
            !string.IsNullOrWhiteSpace(name) && name.IndexOfAny(disallowed) >= 0;

        string Clean(string name)
        {
            foreach (var symbol in disallowed)
                name = name.Replace(symbol.ToString(), "");
            return name;
        }

        if (NeedsCleaning(toProcess.TableName))
            toProcess.TableName = Clean(toProcess.TableName);

        foreach (var column in toProcess.Columns.Cast<DataColumn>().Where(c => NeedsCleaning(c.ColumnName)))
            column.ColumnName = Clean(column.ColumnName);
    }
468

469

470
    /// <summary>
    /// Clears the primary key status of the DataTable / <see cref="ExplicitTypes"/>.  These are recorded in <see cref="_primaryKey"/> and applied at Dispose time
    /// in order that primary key in the destination database table does not interfere with ALTER statements (see <see cref="ResizeColumnsIfRequired"/>)
    /// </summary>
    /// <param name="toProcess">Batch whose <see cref="DataTable.PrimaryKey"/> columns are recorded and then removed.</param>
    private void ClearPrimaryKeyFromDataTableAndExplicitWriteTypes(DataTable toProcess)
    {
        // remember the batch's key columns, then strip the key from the DataTable itself
        _primaryKey.UnionWith(toProcess.PrimaryKey.Select(keyColumn => keyColumn.ColumnName));
        toProcess.PrimaryKey = Array.Empty<DataColumn>();

        // same treatment for any explicitly requested column types flagged as primary keys
        foreach (var request in ExplicitTypes)
        {
            if (!request.IsPrimaryKey)
                continue;

            request.IsPrimaryKey = false;
            _primaryKey.Add(request.ColumnName);
        }
    }
490

491

492
    /// <summary>
    /// Ensures <paramref name="toProcess"/> has an <c>extraction_timestamp</c> column and stamps every row of the batch
    /// with the same local time (<see cref="DateTime.Now"/>, captured once at the start of the call).
    /// </summary>
    /// <param name="toProcess">The batch to stamp in place.</param>
    private void AddTimeStampToExtractionData(DataTable toProcess)
    {
        var timeStamp = DateTime.Now;

        // Type the column as DateTime: an untyped DataColumn is a string column, which would turn the timestamp
        // into a current-culture string that the bulk insert must then re-parse using the configured Culture.
        // Reuse an existing column instead of throwing DuplicateNameException if the batch already carries one.
        if (!toProcess.Columns.Contains(_extractionTimeStamp))
            toProcess.Columns.Add(_extractionTimeStamp, typeof(DateTime));

        foreach (DataRow row in toProcess.Rows)
        {
            row[_extractionTimeStamp] = timeStamp;
        }
    }
501

502
    /// <summary>
    /// Rejects a batch that has no columns or no rows; columns are checked first.
    /// </summary>
    /// <param name="toProcess">The batch to validate.</param>
    /// <exception cref="Exception">The batch has no columns, or has columns but no rows.</exception>
    private static void EnsureTableHasDataInIt(DataTable toProcess)
    {
        var failure = (toProcess.Columns.Count, toProcess.Rows.Count) switch
        {
            (0, _) => $"DataTable '{toProcess}' had no Columns!",
            (_, 0) => $"DataTable '{toProcess}' had no Rows!",
            _ => null
        };

        if (failure != null)
            throw new Exception(failure);
    }
510

511
    /// <summary>
    /// Widens destination columns whose current SQL type can no longer hold the values in <paramref name="toProcess"/>.
    /// The running <see cref="Guesser"/> of every column shared by the batch and the destination is fed this batch's
    /// values. Any column whose implied SQL type changes as a result is ALTERed inside the open transaction, unless
    /// <see cref="AbandonAlter"/> vetoes the change, in which case a warning is raised instead.
    /// </summary>
    /// <param name="toProcess">The batch about to be written.</param>
    /// <param name="listener">Receives resize warnings and a "Measuring DataType Sizes" progress event.</param>
    private void ResizeColumnsIfRequired(DataTable toProcess, IDataLoadEventListener listener)
    {
        swMeasuringStrings.Start();

        var destination = _database.ExpectTable(TargetTableName);
        var translater = destination.GetQuerySyntaxHelper().TypeTranslater;

        // Snapshot the SQL type each guesser implied before this batch was measured
        var typesBeforeBatch = new Dictionary<string, string>(StringComparer.CurrentCultureIgnoreCase);
        foreach (var (columnName, estimate) in _dataTypeDictionary)
            typesBeforeBatch.Add(columnName, translater.GetSQLDBTypeForCSharpType(estimate.Guess));

        // Destination columns that also appear in this batch
        var sharedColumns = _dataTypeDictionary.Keys
            .Where(columnName => toProcess.Columns.Contains(columnName))
            .ToList();

        // Each shared column has its own guesser, so the columns can be measured in parallel
        Parallel.ForEach(sharedColumns, columnName =>
        {
            var batchGuesser = _dataTypeDictionary[columnName];
            foreach (DataRow row in toProcess.Rows)
                batchGuesser.AdjustToCompensateForValue(row[columnName]);
        });

        foreach (DataColumn column in toProcess.Columns)
        {
            // the timestamp column is generated by this component, never resize it
            if (IncludeTimeStamp && column.ColumnName == _extractionTimeStamp)
                continue;

            typesBeforeBatch.TryGetValue(column.ColumnName, out var sqlTypeBefore);
            _dataTypeDictionary.TryGetValue(column.ColumnName, out var knownGuesser);
            var sqlTypeNow = knownGuesser is null ? null : translater.GetSQLDBTypeForCSharpType(knownGuesser.Guess);

            // unchanged estimate (e.g. still varchar(10)) means nothing to alter
            if (sqlTypeBefore == sqlTypeNow)
                continue;

            var liveColumn = destination.DiscoverColumn(column.ColumnName, _managedConnection.ManagedTransaction);

            if (AbandonAlter(liveColumn.DataType.SQLType, sqlTypeNow, out var reason))
            {
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Warning,
                        $"Considered resizing column '{column}' from '{liveColumn.DataType.SQLType}' to '{sqlTypeNow}' but decided not to because:{reason}"));
                continue;
            }

            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                $"Resizing column '{column}' from '{liveColumn.DataType.SQLType}' to '{sqlTypeNow}'"));

            //try changing the Type to the legit type
            liveColumn.DataType.AlterTypeTo(sqlTypeNow, _managedConnection.ManagedTransaction, AlterTimeout);

            // let the bulk copy know the destination schema has changed
            _bulkcopy.InvalidateTableSchema();
        }

        swMeasuringStrings.Stop();
        listener.OnProgress(this,
            new ProgressEventArgs("Measuring DataType Sizes",
                new ProgressMeasurement(_affectedRows + toProcess.Rows.Count, ProgressType.Records),
                swMeasuringStrings.Elapsed));
    }
587

588
    /// <summary>
    /// Decides whether a proposed column type change should be skipped.  Currently the only vetoed change is
    /// moving from a floating point style type (real/double/float/single) to a decimal type.
    /// </summary>
    /// <param name="oldSqlType">The database proprietary type the column currently has</param>
    /// <param name="newSqlType">The ANSI SQL type the column would be altered to</param>
    /// <param name="reason">null when the alter should go ahead, otherwise a human readable explanation</param>
    /// <returns>true when the alter is a bad idea and should not be attempted</returns>
    protected virtual bool AbandonAlter(string oldSqlType, string newSqlType, out string reason)
    {
        reason = null;

        string[] floatingPointTypes = { "real", "double", "float", "single" };

        //first floating point type name (in the order above) that appears anywhere in the current type
        var matchedType = Array.Find(floatingPointTypes,
            t => oldSqlType.Contains(t, StringComparison.InvariantCultureIgnoreCase));

        if (matchedType is null || !newSqlType.Contains("decimal", StringComparison.InvariantCultureIgnoreCase))
            return false;

        reason = $"Resizing from {matchedType} to decimal is a bad idea and likely to fail";
        return true;
    }
611

612
    /// <summary>
    /// Abandons the managed transaction and closes its connection.  Does nothing if no managed connection
    /// exists (the field can be null, as <c>Dispose</c> also allows for).
    /// </summary>
    /// <param name="listener">Not used.</param>
    public void Abort(IDataLoadEventListener listener)
    {
        //null-propagate so aborting before any connection was opened does not throw NullReferenceException
        _managedConnection?.ManagedTransaction.AbandonAndCloseConnection();
    }
×
616

617
    /// <summary>
    /// Completes the load in three phases:
    /// <list type="number">
    /// <item>It commits the managed transaction, or rolls it back if the pipeline failed.</item>
    /// <item>If the load succeeded, it creates any requested primary key on the destination table.</item>
    /// <item>It creates a trigger when <c>UseTrigger</c> is set and the table has a primary key.</item>
    /// </list>
    /// Any audit that was started is always finalized, even if one of the phases above throws.
    /// </summary>
    /// <param name="listener">Receives progress, warning and error notifications.</param>
    /// <param name="pipelineFailureExceptionIfAny">The exception that failed the pipeline, or null if it succeeded.</param>
    public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
    {
        try
        {
            CloseTransaction();

            //only alter the schema when the load succeeded
            if (pipelineFailureExceptionIfAny == null)
                CreatePrimaryKeyIfRequired();

            CreateTriggerIfRequired();
        }
        finally
        {
            //finish the audit even if primary key or trigger creation threw
            EndAuditIfExists();
        }

        // Commits or rolls back the managed transaction.  Failures are reported to the listener, not thrown.
        void CloseTransaction()
        {
            if (_managedConnection == null)
                return;

            var rollingBack = pipelineFailureExceptionIfAny != null;

            try
            {
                if (rollingBack)
                {
                    _managedConnection.ManagedTransaction.AbandonAndCloseConnection();

                    listener.OnNotify(this,
                        new NotifyEventArgs(ProgressEventType.Information, "Transaction rolled back successfully"));

                    _bulkcopy?.Dispose();
                }
                else
                {
                    _managedConnection.ManagedTransaction.CommitAndCloseConnection();

                    _bulkcopy?.Dispose();

                    listener.OnNotify(this,
                        new NotifyEventArgs(ProgressEventType.Information, "Transaction committed successfully"));
                }
            }
            catch (Exception e)
            {
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Error,
                        rollingBack
                            ? "Rollback failed on transaction"
                            : "Commit failed on transaction (probably there was a previous error?)", e));
            }
        }

        // Creates the user-requested primary key if the table exists and has no primary key yet.
        void CreatePrimaryKeyIfRequired()
        {
            if (_primaryKey?.Any() != true || _discoveredTable?.Exists() != true)
                return;

            var allColumns = _discoveredTable.DiscoverColumns();

            //never replace a primary key that is already there
            if (allColumns.Any(c => c.IsPrimaryKey))
                return;

            //find the columns the user decorated in his DataTable
            var pkColumnsToCreate = allColumns.Where(c =>
                    _primaryKey.Any(pk => pk.Equals(c.GetRuntimeName(), StringComparison.CurrentCultureIgnoreCase)))
                .ToArray();

            //make sure we found all of them
            if (pkColumnsToCreate.Length != _primaryKey.Count)
                throw new Exception(
                    $"Could not find primary key column(s) {string.Join(",", _primaryKey)} in table {_discoveredTable}");

            _discoveredTable.CreatePrimaryKey(AlterTimeout, pkColumnsToCreate);
        }

        // Creates the trigger if UseTrigger is set, the table exists and has a primary key (triggers need one),
        // and the trigger is currently missing.  Failures to create the trigger are reported as warnings.
        void CreateTriggerIfRequired()
        {
            //the table may be null or missing, e.g. if it was created inside a transaction that was rolled back
            if (!UseTrigger || _discoveredTable?.Exists() != true ||
                !_discoveredTable.DiscoverColumns().Any(c => c.IsPrimaryKey))
                return;

            var triggerImplementer = new TriggerImplementerFactory(_database.Server.DatabaseType)
                .Create(_discoveredTable);

            if (triggerImplementer.GetTriggerStatus() != TriggerStatus.Missing)
                return;

            try
            {
                triggerImplementer.CreateTrigger(ThrowImmediatelyCheckNotifier.Quiet);
            }
            catch (Exception e)
            {
                //keep the exception so the stack trace is not lost
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning, e.Message, e));
            }
        }
    }
256✔
693

694
    /// <summary>
    /// Finalizes the table load audit info if a logging listener was created; otherwise does nothing.
    /// </summary>
    private void EndAuditIfExists() => _loggingDatabaseListener?.FinalizeTableLoadInfos();
×
699

700
    /// <summary>
    /// Checks the logging server if one is configured.  Otherwise it reports success with a note that
    /// nothing will be audited.
    /// </summary>
    /// <param name="notifier">Receives the check results.</param>
    public void Check(ICheckNotifier notifier)
    {
        if (LoggingServer != null)
        {
            new LoggingDatabaseChecker(LoggingServer).Check(notifier);
            return;
        }

        var noLoggingResult = new CheckEventArgs(
            "There is no logging server so there will be no audit of this destinations activities",
            CheckResult.Success);
        notifier.OnCheckPerformed(noLoggingResult);
    }
×
710

711
    /// <summary>
    /// Starts an audit of the load into <paramref name="tableName"/> under the "Internal" logging task.
    /// It only does this when a logging server is configured and no audit has been started yet.
    /// </summary>
    /// <param name="tableName">Name of the table being loaded, used in the audit description.</param>
    private void StartAuditIfExists(string tableName)
    {
        //nothing to audit against, or the audit is already running
        if (LoggingServer == null || _dataLoadInfo != null)
            return;

        const string loggingTask = "Internal";

        _loggingDatabaseSettings = DataAccessPortal.ExpectServer(LoggingServer, DataAccessContext.Logging);

        var manager = new LogManager(_loggingDatabaseSettings);
        manager.CreateNewLoggingTaskIfNotExists(loggingTask);

        var description = $"Loading table {tableName}";
        _dataLoadInfo = (DataLoadInfo)manager.CreateDataLoadInfo(loggingTask, GetType().Name, description, "", false);

        _loggingDatabaseListener = new ToLoggingDatabaseDataLoadEventListener(manager, _dataLoadInfo);
    }
310✔
724

725
    /// <summary>
    /// Records the database the destination will write to, along with its server.
    /// </summary>
    /// <param name="value">The destination database; must not be null.</param>
    /// <param name="listener">Not used.</param>
    /// <exception cref="ArgumentNullException">When <paramref name="value"/> is null.</exception>
    public void PreInitialize(DiscoveredDatabase value, IDataLoadEventListener listener)
    {
        //fail fast with a descriptive exception instead of a NullReferenceException on value.Server
        _database = value ?? throw new ArgumentNullException(nameof(value));
        _server = value.Server;
    }
264✔
730

731
    /// <summary>
    /// Declares that the column named <paramref name="columnName"/> should always get the given database type
    /// (e.g. varchar(59)).  The column may or may not appear in the DataTables being uploaded, and the name is
    /// case insensitive.  If AllowResizingColumnsAtUploadTime is true, these types are only starting types and
    /// may be changed later to accommodate new data.
    /// </summary>
    /// <param name="columnName">Name of the column the type applies to.</param>
    /// <param name="explicitType">The database type to use for the column.</param>
    /// <param name="columnFlags">Optional primary key / auto increment / collation information. When it is null,
    /// no flags are applied to the request.</param>
    /// <returns>The Column Request that has been added to ExplicitTypes</returns>
    public DatabaseColumnRequest AddExplicitWriteType(string columnName, string explicitType,
        ISupplementalColumnInformation columnFlags = null)
    {
        var request = columnFlags == null
            ? new DatabaseColumnRequest(columnName, explicitType, true)
            : new DatabaseColumnRequest(columnName, explicitType,
                !columnFlags.IsPrimaryKey && !columnFlags.IsAutoIncrement)
            {
                IsPrimaryKey = columnFlags.IsPrimaryKey,
                IsAutoIncrement = columnFlags.IsAutoIncrement,
                Collation = columnFlags.Collation
            };

        ExplicitTypes.Add(request);
        return request;
    }
763
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc