• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 8451022258

27 Mar 2024 11:29AM UTC coverage: 56.272% (-0.6%) from 56.886%
8451022258

Pull #1779

github

web-flow
Merge branch 'develop' into task/RDMP-122-update-a-database-extraction
Pull Request #1779: Task/rdmp 122 update a database extraction

10712 of 20527 branches covered (52.18%)

Branch coverage included in aggregate %.

29 of 40 new or added lines in 3 files covered. (72.5%)

370 existing lines in 21 files now uncovered.

30518 of 52742 relevant lines covered (57.86%)

7355.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.82
/Rdmp.Core/DataLoad/Engine/Pipeline/Destinations/DataTableUploadDestination.cs
1
// Copyright (c) The University of Dundee 2018-2024
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data;
10
using System.Diagnostics;
11
using System.Globalization;
12
using System.Linq;
13
using System.Threading.Tasks;
14
using FAnsi.Connections;
15
using FAnsi.Discovery;
16
using FAnsi.Discovery.TableCreation;
17
using Rdmp.Core.Curation.Data;
18
using Rdmp.Core.DataFlowPipeline;
19
using Rdmp.Core.DataFlowPipeline.Requirements;
20
using Rdmp.Core.Logging;
21
using Rdmp.Core.Logging.Listeners;
22
using Rdmp.Core.Repositories.Construction;
23
using Rdmp.Core.ReusableLibraryCode;
24
using Rdmp.Core.ReusableLibraryCode.Checks;
25
using Rdmp.Core.ReusableLibraryCode.DataAccess;
26
using Rdmp.Core.ReusableLibraryCode.Progress;
27
using TypeGuesser;
28

29
namespace Rdmp.Core.DataLoad.Engine.Pipeline.Destinations;
30

31
/// <summary>
32
/// Pipeline component (destination) which commits the DataTable(s) (in batches) to the DiscoveredDatabase (PreInitialize argument).  Supports cross platform
33
/// targets (MySql , Sql Server etc).  Normally the SQL Data Types and column names will be computed from the DataTable and a table will be created with the
34
/// name of the DataTable being processed.  If a matching table already exists you can choose to load it anyway in which case a basic bulk insert will take
35
/// place.
36
/// </summary>
37
public class DataTableUploadDestination : IPluginDataFlowComponent<DataTable>, IDataFlowDestination<DataTable>,
38
    IPipelineRequirement<DiscoveredDatabase>
39
{
40
    public const string LoggingServer_Description =
41
        "The logging server to log the upload to (leave blank to not bother auditing)";
42

43
    public const string AllowResizingColumnsAtUploadTime_Description =
44
        "If the target table being loaded has columns that are too small the destination will attempt to resize them";
45

46
    public const string AllowLoadingPopulatedTables_Description =
47
        "Normally when DataTableUploadDestination encounters a table that already contains records it will abandon the insertion attempt.  Set this to true to instead continue with the load.";
48

49
    public const string AlterTimeout_Description =
50
        "Timeout to perform all ALTER TABLE operations (column resize and PK creation)";
51

52
    [DemandsInitialization(LoggingServer_Description)]
53
    public ExternalDatabaseServer LoggingServer { get; set; }
288✔
54

55
    [DemandsInitialization(AllowResizingColumnsAtUploadTime_Description, DefaultValue = true)]
56
    public bool AllowResizingColumnsAtUploadTime { get; set; }
522✔
57

58
    [DemandsInitialization(AllowLoadingPopulatedTables_Description, DefaultValue = false)]
59
    public bool AllowLoadingPopulatedTables { get; set; }
55✔
60

61
    [DemandsInitialization(AlterTimeout_Description, DefaultValue = 300)]
62
    public int AlterTimeout { get; set; }
84✔
63

64
    [DemandsInitialization("Optional - Change system behaviour when a new table is being created by the component",
65
        TypeOf = typeof(IDatabaseColumnRequestAdjuster))]
66
    public Type Adjuster { get; set; }
259✔
67

68
    public bool AppendDataIfTableExists { get; set; }
253✔
69

70
    public bool IncludeTimeStamp { get; set; }
496✔
71

72
    private CultureInfo _culture;
73

74
    [DemandsInitialization("The culture to use for uploading (determines date format etc)")]
75
    public CultureInfo Culture
76
    {
77
        get => _culture ?? CultureInfo.CurrentCulture;
193✔
78
        set => _culture = value;
2✔
79
    }
80

81
    public string TargetTableName { get; private set; }
1,724✔
82

83
    /// <summary>
84
    /// True if a new table was created or re-created by the execution of this destination.  False if
85
    /// the table already existed e.g. data was simply added
86
    /// </summary>
87
    public bool CreatedTable { get; private set; }
525✔
88

89
    private IBulkCopy _bulkcopy;
90
    private int _affectedRows;
91

92
    private Stopwatch swTimeSpentWriting = new();
197✔
93
    private Stopwatch swMeasuringStrings = new();
197✔
94

95
    private DiscoveredServer _loggingDatabaseSettings;
96

97
    private DiscoveredServer _server;
98
    private DiscoveredDatabase _database;
99

100
    private DataLoadInfo _dataLoadInfo;
101

102
    private IManagedConnection _managedConnection;
103
    private ToLoggingDatabaseDataLoadEventListener _loggingDatabaseListener;
104

105
    public List<DatabaseColumnRequest> ExplicitTypes { get; set; }
602✔
106

107
    private bool _firstTime = true;
197✔
108
    private HashSet<string> _primaryKey = new(StringComparer.CurrentCultureIgnoreCase);
197✔
109
    private DiscoveredTable _discoveredTable;
110
    private readonly string _extractionTimeStamp = "extraction_timestamp";
197✔
111

112
    //All column values sent to server so far
113
    private Dictionary<string, Guesser> _dataTypeDictionary;
114

115
    /// <summary>
116
    /// Optional function called when a name is needed for the table being uploaded (this overrides
117
    /// upstream components naming of tables - e.g. from file names).
118
    /// </summary>
119
    public Func<string> TableNamerDelegate { get; set; }
197✔
120

121
    /// <summary>
    /// Creates the destination with an empty <see cref="ExplicitTypes"/> list.
    /// </summary>
    public DataTableUploadDestination() => ExplicitTypes = new();
197✔
125

126
    /// <summary>
    /// Uploads one batch of rows to the destination table.  On the first batch the target table is
    /// resolved (and created if it does not exist) and a transacted bulk insert is opened; later
    /// batches reuse that bulk insert.
    /// </summary>
    /// <param name="toProcess">The batch to upload; a null batch is ignored</param>
    /// <param name="listener">Receives progress and notification messages</param>
    /// <param name="cancellationToken">Not consulted by this method</param>
    /// <returns>Always null</returns>
    /// <exception cref="Exception">
    /// Thrown when no table name can be determined, the database does not exist, the table is already
    /// populated and loading it is not permitted, or the upload itself fails (the transaction is
    /// abandoned before the exception is rethrown wrapped).
    /// </exception>
    public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener listener,
        GracefulCancellationToken cancellationToken)
    {
        if (toProcess == null)
            return null;

        //capture the primary key now, it is stripped from the DataTable further down
        var pkColumns = toProcess.PrimaryKey;
        RemoveInvalidCharactersInSchema(toProcess);

        IDatabaseColumnRequestAdjuster adjuster = null;
        if (Adjuster != null) adjuster = (IDatabaseColumnRequestAdjuster)ObjectConstructor.Construct(Adjuster);

        //work out the table name for the table we are going to create
        if (TargetTableName == null)
        {
            if (TableNamerDelegate != null)
            {
                TargetTableName = TableNamerDelegate();
                if (string.IsNullOrWhiteSpace(TargetTableName))
                    throw new Exception("No table name specified (TableNamerDelegate returned null)");
            }
            else if (string.IsNullOrWhiteSpace(toProcess.TableName))
            {
                throw new Exception(
                    "Chunk did not have a TableName, did not know what to call the newly created table");
            }
            else
            {
                TargetTableName = QuerySyntaxHelper.MakeHeaderNameSensible(toProcess.TableName);
            }
        }

        ClearPrimaryKeyFromDataTableAndExplicitWriteTypes(toProcess);

        StartAuditIfExists(TargetTableName);

        if (IncludeTimeStamp)
        {
            AddTimeStampToExtractionData(toProcess);
        }

        //when auditing, messages go to both the caller's listener and the logging database
        if (_loggingDatabaseListener != null)
            listener = new ForkDataLoadEventListener(listener, _loggingDatabaseListener);

        EnsureTableHasDataInIt(toProcess);

        CreatedTable = false;

        if (_firstTime)
        {
            if (!_database.Exists())
                throw new Exception($"Database {_database} does not exist");

            _discoveredTable = _database.ExpectTable(TargetTableName);

            if (_discoveredTable.Exists())
            {
                //table already exists: only load it if it is empty or the user explicitly allowed it
                if (!AllowLoadingPopulatedTables)
                {
                    if (_discoveredTable.IsEmpty())
                        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                            $"Found table {TargetTableName} already, normally this would forbid you from loading it (data duplication / no primary key etc) but it is empty so we are happy to load it, it will not be created"));
                    else if (!AppendDataIfTableExists)
                        throw new Exception(
                            $"There is already a table called {TargetTableName} at the destination {_database}");
                }

                if (AllowResizingColumnsAtUploadTime)
                    _dataTypeDictionary = _discoveredTable.DiscoverColumns().ToDictionary(k => k.GetRuntimeName(),
                        v => v.GetGuesser(), StringComparer.CurrentCultureIgnoreCase);
            }
            else
            {
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                    $"Determined that the table name {TargetTableName} is unique at destination {_database}"));

                CreatedTable = true;

                if (AllowResizingColumnsAtUploadTime)
                    _database.CreateTable(out _dataTypeDictionary, TargetTableName, toProcess, ExplicitTypes.ToArray(),
                        true, adjuster);
                else
                    _database.CreateTable(TargetTableName, toProcess, ExplicitTypes.ToArray(), true, adjuster);

                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                    $"Created table {TargetTableName} successfully."));
            }

            //create connection to destination
            _managedConnection = _server.BeginNewTransactedConnection();
            _bulkcopy = _discoveredTable.BeginBulkInsert(Culture, _managedConnection.ManagedTransaction);

            _firstTime = false;
        }

        //make sure a pre-existing destination table has somewhere to put the timestamp
        if (IncludeTimeStamp &&
            !_discoveredTable.DiscoverColumns().Any(c => c.GetRuntimeName() == _extractionTimeStamp))
        {
            _discoveredTable.AddColumn(_extractionTimeStamp, new DatabaseTypeRequest(typeof(DateTime)), true, 30000);
        }

        try
        {
            if (AllowResizingColumnsAtUploadTime && !CreatedTable)
                ResizeColumnsIfRequired(toProcess, listener);

            //push the data
            swTimeSpentWriting.Start();

            //when appending and the batch declared a primary key, skip rows already at the destination
            if (AppendDataIfTableExists && pkColumns.Length > 0) //assumes columns are the same
            {
                var existingData = _discoveredTable.GetDataTable();

                //a row only clashes when ALL of its primary key columns match the same existing row
                var existingKeys = new HashSet<string>(
                    existingData.AsEnumerable().Select(r => BuildPrimaryKeyLookupValue(r, pkColumns)));

                //materialise before removing so the Rows collection is not modified while enumerating
                var rowsToDelete = toProcess.Rows.Cast<DataRow>()
                    .Where(row => existingKeys.Contains(BuildPrimaryKeyLookupValue(row, pkColumns)))
                    .ToList();

                foreach (var row in rowsToDelete)
                    toProcess.Rows.Remove(row);

                if (toProcess.Rows.Count == 0)
                {
                    //nothing left to upload; do not leave the stopwatch running until the next batch
                    swTimeSpentWriting.Stop();
                    return null;
                }
            }

            _affectedRows += _bulkcopy.Upload(toProcess);

            swTimeSpentWriting.Stop();
            listener.OnProgress(this,
                new ProgressEventArgs($"Uploading to {TargetTableName}",
                    new ProgressMeasurement(_affectedRows, ProgressType.Records), swTimeSpentWriting.Elapsed));
        }
        catch (Exception e)
        {
            _managedConnection.ManagedTransaction.AbandonAndCloseConnection();

            if (LoggingServer != null)
                _dataLoadInfo.LogFatalError(GetType().Name, ExceptionHelper.ExceptionToListOfInnerMessages(e, true));

            throw new Exception($"Failed to write rows (in transaction) to table {TargetTableName}", e);
        }

        _dataLoadInfo?.CloseAndMarkComplete();
        return null;
    }

    /// <summary>
    /// Builds a single lookup string from the values of all primary key columns in a row.  Each value
    /// is length-prefixed so that different value combinations cannot produce the same string.
    /// </summary>
    private static string BuildPrimaryKeyLookupValue(DataRow row, DataColumn[] pkColumns)
    {
        return string.Concat(pkColumns.Select(pkCol =>
        {
            var text = row[pkCol.ColumnName].ToString();
            return $"{text.Length}:{text}";
        }));
    }
×
288

289
    /// <summary>
    /// Strips characters that are not wanted in schema names (currently only '.') from the name of
    /// the DataTable and from each of its column names.  Names are changed in place.
    /// </summary>
    /// <param name="toProcess">The table whose name and column names are cleaned</param>
    private static void RemoveInvalidCharactersInSchema(DataTable toProcess)
    {
        char[] forbidden = { '.' };

        //true when the name has content and contains at least one forbidden character
        bool NeedsCleaning(string name) =>
            !string.IsNullOrWhiteSpace(name) && name.IndexOfAny(forbidden) >= 0;

        //removes every forbidden character from the name
        string Clean(string name) =>
            forbidden.Aggregate(name, (current, symbol) => current.Replace(symbol.ToString(), ""));

        if (NeedsCleaning(toProcess.TableName))
            toProcess.TableName = Clean(toProcess.TableName);

        foreach (DataColumn column in toProcess.Columns)
        {
            if (NeedsCleaning(column.ColumnName))
                column.ColumnName = Clean(column.ColumnName);
        }
    }
249✔
302

303

304
    /// <summary>
305
    /// Clears the primary key status of the DataTable / <see cref="ExplicitTypes"/>.  These are recorded in <see cref="_primaryKey"/> and applied at Dispose time
306
    /// in order that primary key in the destination database table does not interfere with ALTER statements (see <see cref="ResizeColumnsIfRequired"/>)
307
    /// </summary>
308
    /// <param name="toProcess"></param>
309
    private void ClearPrimaryKeyFromDataTableAndExplicitWriteTypes(DataTable toProcess)
    {
        //remember which columns the DataTable flagged as primary key...
        foreach (DataColumn keyColumn in toProcess.PrimaryKey)
            _primaryKey.Add(keyColumn.ColumnName);

        //...then strip the flag from the DataTable until the Dispose step
        toProcess.PrimaryKey = Array.Empty<DataColumn>();

        //same treatment for any ExplicitTypes that were requested as primary key
        foreach (var request in ExplicitTypes)
        {
            if (!request.IsPrimaryKey)
                continue;

            request.IsPrimaryKey = false;
            _primaryKey.Add(request.ColumnName);
        }
    }
249✔
324

325

326
    /// <summary>
    /// Adds the <see cref="_extractionTimeStamp"/> column to the batch and stamps every row with the
    /// same time (taken once, when this method is called).
    /// </summary>
    /// <param name="toProcess">The batch that receives the extra column</param>
    private void AddTimeStampToExtractionData(DataTable toProcess)
    {
        var timeStamp = DateTime.Now;

        //Declare the column as DateTime: the name-only overload creates a string column, which would
        //store the value as text formatted with the current culture and lose sub-second precision.
        //This also matches the DateTime type requested when the column is added to the destination table.
        var stampColumn = toProcess.Columns.Add(_extractionTimeStamp, typeof(DateTime));

        foreach (DataRow row in toProcess.Rows)
            row[stampColumn] = timeStamp;
    }
×
335

336
    /// <summary>
    /// Rejects batches that cannot be uploaded because they have no columns or no rows.
    /// </summary>
    /// <param name="toProcess">The batch to validate</param>
    /// <exception cref="Exception">Thrown when the batch has no columns, or has columns but no rows</exception>
    private static void EnsureTableHasDataInIt(DataTable toProcess)
    {
        //columns are checked first, so a completely empty table reports the missing columns
        var complaint = toProcess.Columns.Count == 0
            ? $"DataTable '{toProcess}' had no Columns!"
            : toProcess.Rows.Count == 0
                ? $"DataTable '{toProcess}' had no Rows!"
                : null;

        if (complaint != null)
            throw new Exception(complaint);
    }
245✔
344

345
    /// <summary>
    /// Feeds the values of the batch into the per-column guessers and, for every batch column whose
    /// resulting SQL type differs from the type recorded before the batch, alters the destination
    /// column (inside the current transaction) unless <see cref="AbandonAlter"/> vetoes the change.
    /// </summary>
    /// <param name="toProcess">The batch about to be uploaded</param>
    /// <param name="listener">Receives resize warnings and a progress message</param>
    private void ResizeColumnsIfRequired(DataTable toProcess, IDataLoadEventListener listener)
    {
        swMeasuringStrings.Start();

        var destinationTable = _database.ExpectTable(TargetTableName);
        var translater = destinationTable.GetQuerySyntaxHelper().TypeTranslater;

        //snapshot the SQL types implied by the guessers before this batch is measured
        var typesBeforeBatch = _dataTypeDictionary.ToDictionary(kvp => kvp.Key,
            kvp => translater.GetSQLDBTypeForCSharpType(kvp.Value.Guess), StringComparer.CurrentCultureIgnoreCase);

        //destination columns that also appear in the batch
        var sharedColumns = _dataTypeDictionary.Keys
            .Where(name => toProcess.Columns.Contains(name))
            .ToList();

        //let each shared column's guesser see every value in the batch
        Parallel.ForEach(sharedColumns, name =>
        {
            var guesser = _dataTypeDictionary[name];
            foreach (DataRow row in toProcess.Rows)
                guesser.AdjustToCompensateForValue(row[name]);
        });

        //see if any have changed
        foreach (DataColumn column in toProcess.Columns)
        {
            //skip internally generated columns
            if (IncludeTimeStamp && column.ColumnName == _extractionTimeStamp)
                continue;

            //type configured before the batch vs type now required by the batch
            var sqlTypeBefore = typesBeforeBatch[column.ColumnName];
            var sqlTypeAfter = translater.GetSQLDBTypeForCSharpType(_dataTypeDictionary[column.ColumnName].Guess);

            //nothing to do unless the required type changed, e.g. varchar(10) to varchar(50) or datetime to varchar(20)
            if (sqlTypeBefore == sqlTypeAfter)
                continue;

            var liveColumn = destinationTable.DiscoverColumn(column.ColumnName, _managedConnection.ManagedTransaction);

            if (AbandonAlter(liveColumn.DataType.SQLType, sqlTypeAfter, out var reason))
            {
                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Warning,
                        $"Considered resizing column '{column}' from '{liveColumn.DataType.SQLType}' to '{sqlTypeAfter}' but decided not to because:{reason}"));
                continue;
            }

            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                $"Resizing column '{column}' from '{liveColumn.DataType.SQLType}' to '{sqlTypeAfter}'"));

            //change the live column to the newly required type
            liveColumn.DataType.AlterTypeTo(sqlTypeAfter, _managedConnection.ManagedTransaction, AlterTimeout);

            //the bulk copy must notice the altered column
            _bulkcopy.InvalidateTableSchema();
        }

        swMeasuringStrings.Stop();
        listener.OnProgress(this,
            new ProgressEventArgs("Measuring DataType Sizes",
                new ProgressMeasurement(_affectedRows + toProcess.Rows.Count, ProgressType.Records),
                swMeasuringStrings.Elapsed));
    }
62✔
419

420
    /// <summary>
421
    /// Returns true if we should not be trying to do this alter after all
422
    /// </summary>
423
    /// <param name="oldSqlType">The database proprietary type you are considering altering from</param>
424
    /// <param name="newSqlType">The ANSI SQL type you are considering altering to</param>
425
    /// <param name="reason">Null or the reason we are returning true</param>
426
    /// <returns>True if the proposed alter is a bad idea and shouldn't be attempted</returns>
427
    protected virtual bool AbandonAlter(string oldSqlType, string newSqlType, out string reason)
    {
        //type names that already hold fractional numbers
        string[] basicallyDecimalAlready = { "real", "double", "float", "single" };

        foreach (var candidate in basicallyDecimalAlready)
        {
            if (!oldSqlType.Contains(candidate, StringComparison.InvariantCultureIgnoreCase))
                continue;

            //only the first matching name decides the outcome
            if (newSqlType.Contains("decimal", StringComparison.InvariantCultureIgnoreCase))
            {
                reason = $"Resizing from {candidate} to decimal is a bad idea and likely to fail";
                return true;
            }

            break;
        }

        reason = null;
        return false;
    }
443

444
    /// <summary>
    /// Abandons the upload transaction and closes its connection, if one has been opened.
    /// </summary>
    /// <param name="listener">Not used by this method</param>
    public void Abort(IDataLoadEventListener listener)
    {
        //the connection is only opened when the first batch is processed, so there may be nothing to abandon
        _managedConnection?.ManagedTransaction.AbandonAndCloseConnection();
    }
×
448

449
    /// <summary>
    /// Finishes the upload: rolls the transaction back when the pipeline failed, otherwise commits it,
    /// then (on success) creates the primary key recorded from the uploaded batches and finalizes
    /// any audit.
    /// </summary>
    /// <param name="listener">Receives notification of the commit/rollback outcome</param>
    /// <param name="pipelineFailureExceptionIfAny">The exception that stopped the pipeline, or null on success</param>
    /// <exception cref="Exception">Thrown when a recorded primary key column cannot be found in the destination table</exception>
    public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
    {
        try
        {
            if (_managedConnection != null)
            {
                var rollingBack = pipelineFailureExceptionIfAny != null;

                try
                {
                    if (rollingBack)
                        _managedConnection.ManagedTransaction.AbandonAndCloseConnection();
                    else
                        _managedConnection.ManagedTransaction.CommitAndCloseConnection();
                }
                finally
                {
                    //release the bulk copy even when the commit/rollback itself throws
                    _bulkcopy?.Dispose();
                }

                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Information,
                        rollingBack ? "Transaction rolled back successfully" : "Transaction committed successfully"));
            }
        }
        catch (Exception e)
        {
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error,
                    "Commit failed on transaction (probably there was a previous error?)", e));
        }

        //if we have a primary key to create
        if (pipelineFailureExceptionIfAny == null && _primaryKey?.Any() == true && _discoveredTable?.Exists() == true)
        {
            //Find the columns in the destination
            var allColumns = _discoveredTable.DiscoverColumns();

            //if there are not yet any primary keys
            if (allColumns.All(c => !c.IsPrimaryKey))
            {
                //find the columns that were flagged as primary key (the set itself compares names case insensitively)
                var pkColumnsToCreate = allColumns
                    .Where(c => _primaryKey.Contains(c.GetRuntimeName()))
                    .ToArray();

                //make sure we found all of them
                if (pkColumnsToCreate.Length != _primaryKey.Count)
                    throw new Exception(
                        $"Could not find primary key column(s) {string.Join(",", _primaryKey)} in table {_discoveredTable}");

                //create the primary key to match user provided columns
                _discoveredTable.CreatePrimaryKey(AlterTimeout, pkColumnsToCreate);
            }
        }

        EndAuditIfExists();
    }
197✔
509

510
    /// <summary>
    /// Finalizes the table load infos of the logging listener, when auditing was started.
    /// </summary>
    private void EndAuditIfExists()
    {
        //no listener means the user is not auditing
        if (_loggingDatabaseListener == null)
            return;

        _loggingDatabaseListener.FinalizeTableLoadInfos();
    }
×
515

516
    /// <summary>
    /// Checks the logging server when one is configured; otherwise reports (as a success) that no
    /// audit will take place.
    /// </summary>
    /// <param name="notifier">Receives the check results</param>
    public void Check(ICheckNotifier notifier)
    {
        if (LoggingServer == null)
        {
            var noAudit = new CheckEventArgs(
                "There is no logging server so there will be no audit of this destinations activities",
                CheckResult.Success);

            notifier.OnCheckPerformed(noAudit);
            return;
        }

        var checker = new LoggingDatabaseChecker(LoggingServer);
        checker.Check(notifier);
    }
×
526

527
    /// <summary>
    /// Starts auditing against <see cref="LoggingServer"/> the first time it is called with a logging
    /// server configured; does nothing when there is no logging server or auditing already started.
    /// </summary>
    /// <param name="tableName">Name of the table being loaded, used in the audit description</param>
    private void StartAuditIfExists(string tableName)
    {
        //nothing to do without a logging server, or once the audit record exists
        if (LoggingServer == null || _dataLoadInfo != null)
            return;

        _loggingDatabaseSettings = DataAccessPortal.ExpectServer(LoggingServer, DataAccessContext.Logging);

        var manager = new LogManager(_loggingDatabaseSettings);
        manager.CreateNewLoggingTaskIfNotExists("Internal");

        var description = $"Loading table {tableName}";
        _dataLoadInfo = (DataLoadInfo)manager.CreateDataLoadInfo("Internal", GetType().Name, description, "", false);

        _loggingDatabaseListener = new ToLoggingDatabaseDataLoadEventListener(manager, _dataLoadInfo);
    }
249✔
540

541
    /// <summary>
    /// Records the database, and its server, that batches will be uploaded to.
    /// </summary>
    /// <param name="value">The destination database</param>
    /// <param name="listener">Not used by this method</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="value"/> is null</exception>
    public void PreInitialize(DiscoveredDatabase value, IDataLoadEventListener listener)
    {
        //fail with a named argument instead of a NullReferenceException, and before any field is overwritten
        _database = value ?? throw new ArgumentNullException(nameof(value));
        _server = value.Server;
    }
199✔
546

547
    /// <summary>
548
    /// Declare that the column of name columnName (which might or might not appear in DataTables being uploaded) should always have the associated database type (e.g. varchar(59))
549
    /// The columnName is Case insensitive.  Note that if AllowResizingColumnsAtUploadTime is true then these datatypes are only the starting types and might get changed later to
550
    /// accommodate new data.
551
    /// </summary>
552
    /// <param name="columnName"></param>
553
    /// <param name="explicitType"></param>
554
    /// <param name="columnFlags"></param>
555
    /// <returns>The Column Request that has been added to the array</returns>
556
    public DatabaseColumnRequest AddExplicitWriteType(string columnName, string explicitType,
        ISupplementalColumnInformation columnFlags = null)
    {
        //without flags the third constructor argument is simply true; with flags it is true only
        //when the column is neither primary key nor auto increment, and the flags are copied over
        var request = columnFlags == null
            ? new DatabaseColumnRequest(columnName, explicitType, true)
            : new DatabaseColumnRequest(columnName, explicitType,
                !(columnFlags.IsPrimaryKey || columnFlags.IsAutoIncrement))
            {
                IsPrimaryKey = columnFlags.IsPrimaryKey,
                IsAutoIncrement = columnFlags.IsAutoIncrement,
                Collation = columnFlags.Collation
            };

        ExplicitTypes.Add(request);
        return request;
    }
579
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc