• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 9859858140

09 Jul 2024 03:24PM UTC coverage: 56.679% (-0.2%) from 56.916%
9859858140

push

github

JFriel
update

10912 of 20750 branches covered (52.59%)

Branch coverage included in aggregate %.

30965 of 53135 relevant lines covered (58.28%)

7908.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.9
/Rdmp.Core/DataLoad/Modules/DataFlowOperations/Swapping/ColumnSwapper.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data;
10
using System.Globalization;
11
using System.Linq;
12
using Rdmp.Core.CohortCommitting.Pipeline;
13
using Rdmp.Core.Curation.Data;
14
using Rdmp.Core.Curation.Data.Spontaneous;
15
using Rdmp.Core.DataExport.Data;
16
using Rdmp.Core.DataExport.DataExtraction.Commands;
17
using Rdmp.Core.DataFlowPipeline;
18
using Rdmp.Core.DataFlowPipeline.Requirements;
19
using Rdmp.Core.DataLoad.Modules.DataFlowOperations.Aliases;
20
using Rdmp.Core.DataLoad.Modules.DataFlowOperations.Aliases.Exceptions;
21
using Rdmp.Core.QueryBuilding;
22
using Rdmp.Core.Repositories;
23
using Rdmp.Core.ReusableLibraryCode.Checks;
24
using Rdmp.Core.ReusableLibraryCode.DataAccess;
25
using Rdmp.Core.ReusableLibraryCode.Progress;
26
using TypeGuesser;
27

28
namespace Rdmp.Core.DataLoad.Modules.DataFlowOperations.Swapping;
29

30
/// <summary>
31
/// Swaps values stored in a given column for values found in a mapping table (e.g. swap ReleaseID for PrivateID)
32
/// </summary>
33
internal class ColumnSwapper : IPluginDataFlowComponent<DataTable>, IPipelineOptionalRequirement<IExtractCommand>,
34
    IPipelineOptionalRequirement<ICohortCreationRequest>
35
{
36
    /// <summary>
    /// Name of the pipeline column holding the values to be swapped.  When blank the runtime name of
    /// <see cref="MappingFromColumn"/> is used instead.
    /// </summary>
    [DemandsInitialization("The column in your pipeline containing input values you want swapped.  Leave null to use the same name as the MappingFromColumn")]
    public string InputFromColumn { get; set; }

    /// <summary>
    /// Name of the column that receives the mapped values.  When blank the runtime name of
    /// <see cref="MappingToColumn"/> is used instead.
    /// </summary>
    [DemandsInitialization("Name for the column you want to create in the output stream of this component containing the mapped values.  Leave null to use the same name as the MappingToColumn")]
    public string OutputToColumn { get; set; }

    /// <summary>
    /// Mapping table column whose values are matched against the input values (the lookup key).
    /// </summary>
    [DemandsInitialization("The column in your database which stores the input values you want mapped", Mandatory = true)]
    public ColumnInfo MappingFromColumn { get; set; }

    /// <summary>
    /// Mapping table column whose values are written to the output column.
    /// </summary>
    [DemandsInitialization("The column in your database which stores the output values you want emitted", Mandatory = true)]
    public ColumnInfo MappingToColumn { get; set; }

    /// <summary>
    /// Optional filter text applied when the mapping table is queried.  May contain the $p, $n, $t, $r and $l
    /// tokens described in the attribute text.
    /// </summary>
    [DemandsInitialization(@"Optional text to add when generating the mapping table. Should not start with WHERE.

If Pipeline execution environment contains a Project then the following replacements are available:
    $p - Project Name ('e.g. My Project')
    $n - Project Number (e.g. 234)
    $t - Master Ticket (e.g. 'LINK-1234')

If Pipeline execution environment contains an ExtractionConfiguration then the following additional replacements are available:
    $r - Request Ticket (e.g. 'LINK-1234')
    $l - Release Ticket (e.g. 'LINK-1234')", DemandType = DemandType.SQL, ContextText = "WHERE")]
    public virtual string WHERELogic { get; set; }

    /// <summary>
    /// What to do when a single input value has more than one output value in the mapping table.
    /// </summary>
    [DemandsInitialization("Determines behaviour when the same input value maps to multiple output values", DefaultValue = AliasResolutionStrategy.CrashIfAliasesFound)]
    public AliasResolutionStrategy AliasResolutionStrategy { get; set; }

    /// <summary>
    /// True to throw when an input value has no mapping; false to drop the row and raise a warning.
    /// </summary>
    [DemandsInitialization(@"Determines behaviour when no mapping is found for an input value:
True - Crash the load
False - Drop the row from the DataTable (and issue a warning)", DefaultValue = true)]
    public bool CrashIfNoMappingsFound { get; set; }

    /// <summary>
    /// Value assigned to the command timeout of the query that fetches the mapping table.
    /// </summary>
    [DemandsInitialization("Timeout to set on fetching the mapping table", DefaultValue = 30)]
    public int Timeout { get; set; }

    /// <summary>
    /// When true the input column is retained alongside the output column; when false it is removed
    /// (unless input and output share the same column).
    /// </summary>
    [DemandsInitialization(@"Setting this to true will leave the original input column in your DataTable (so your table will have both input and output columns instead of a substitution)", DefaultValue = true)]
    public bool KeepInputColumnToo { get; set; }
34✔
80

81
    // Explicitly configured culture; stays null until the Culture setter is called
    private CultureInfo _culture;

    /// <summary>
    /// The culture handed to the type decider factory when input values have to be converted to the
    /// mapping table key type.  Falls back to <see cref="CultureInfo.CurrentCulture"/> when none has been set.
    /// </summary>
    [DemandsInitialization("The culture to use e.g. when Type translations are required")]
    public CultureInfo Culture
    {
        get
        {
            return _culture ?? CultureInfo.CurrentCulture;
        }
        set
        {
            _culture = value;
        }
    }
89

90
    private Dictionary<object, List<object>> _mappingTable;
91

92
    /// <summary>
93
    /// The Type of objects that are stored in the Keys of <see cref="_mappingTable"/>.  For use when input types do not match the mapping table types
94
    /// </summary>
95
    private Type _keyType;
96

97

98
    protected IProject _project;
99
    protected IExtractionConfiguration _configuration;
100

101
    /// <summary>
    /// Replaces the values of the input column in <paramref name="toProcess"/> with the values they map to in the
    /// mapping table.  The mapping table is fetched via <see cref="BuildMappingTable"/> when no cached copy exists.
    /// </summary>
    /// <param name="toProcess">Batch to swap values in; it is modified in place and returned</param>
    /// <param name="listener">Receives the information messages and warnings raised while swapping</param>
    /// <param name="cancellationToken">Not read by this method</param>
    /// <returns><paramref name="toProcess"/> after substitution (aliases may add rows, unmapped values may remove rows)</returns>
    /// <exception cref="KeyNotFoundException">An input value has no mapping and <see cref="CrashIfNoMappingsFound"/> is true</exception>
    /// <exception cref="AliasException">An input value maps to several outputs and <see cref="AliasResolutionStrategy"/> is CrashIfAliasesFound</exception>
    public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener listener,
        GracefulCancellationToken cancellationToken)
    {
        var fromColumnName = string.IsNullOrWhiteSpace(InputFromColumn)
            ? MappingFromColumn.GetRuntimeName()
            : InputFromColumn;
        var toColumnName = string.IsNullOrWhiteSpace(OutputToColumn)
            ? MappingToColumn.GetRuntimeName()
            : OutputToColumn;

        // when input and output share a name the values are overwritten in the existing column
        var inPlace = string.Equals(fromColumnName, toColumnName);

        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, "About to build mapping table"));

        if (!toProcess.Columns.Contains(fromColumnName))
            throw new Exception($"DataTable did not contain a field called '{fromColumnName}'");

        if (!inPlace && toProcess.Columns.Contains(toColumnName))
            throw new Exception($"DataTable already contained a field '{toColumnName}'");

        if (_mappingTable == null)
            BuildMappingTable(listener);

        if (_mappingTable.Count == 0)
            throw new Exception("Mapping table was empty");

        if (_keyType == null)
            throw new Exception("Unable to determine key datatype for mapping table");

        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
            $"Mapping table resulted in {_mappingTable.Count} unique possible input values"));
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
            $"Mapping table resulted in {_mappingTable.Sum(kvp => kvp.Value.Count)} unique possible output values"));
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
            $"Mapping table Key is of Type {_keyType}"));

        //add the new column (the output column).  Unless we are just updating the same input column
        if (!inPlace)
            toProcess.Columns.Add(toColumnName);

        var idxFrom = toProcess.Columns.IndexOf(fromColumnName);
        var idxTo = toProcess.Columns.IndexOf(toColumnName);

        var numberOfElementsPerRow = toProcess.Columns.Count;

        // rows are added/removed only after the loop because the row collection cannot change while it is enumerated
        var newRows = new List<object[]>();
        var toDrop = new List<DataRow>();

        // Converter between the input data type and the mapping table key type (null when the types already match)
        Func<object, object> typeConversion = null;
        var inputType = toProcess.Columns[idxFrom].DataType;

        //if there is a difference between the input column datatype and the mapping table datatype
        if (inputType != _keyType)
        {
            //tell the user
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Warning,
                    $"Input DataTable column {fromColumnName} is of data type {inputType}, this differs from mapping table which is {_keyType}.  Type conversion will take place between these two Types when performing lookup"));

            //work out a suitable converter between the Types
            if (_keyType == typeof(string))
                typeConversion = a => a.ToString();
            else
                try
                {
                    var culture = Culture;
                    var decider = new TypeDeciderFactory(culture).Create(_keyType);

                    // format with the same culture the decider parses with, otherwise culture specific
                    // number/date text could be produced under one culture and read back under another
                    typeConversion = a => decider.Parse(Convert.ToString(a, culture));
                }
                catch (Exception ex)
                {
                    throw new Exception(
                        $"Error building Type conversion decider for the mapping table key type {_keyType}", ex);
                }
        }

        foreach (DataRow row in toProcess.Rows)
        {
            var fromValue = row[idxFrom];

            //ignore null inputs, pass them straight through
            if (fromValue == DBNull.Value)
            {
                row[idxTo] = DBNull.Value;
                continue;
            }

            // convert the input value to the mapping table key Type where required
            if (typeConversion != null)
                fromValue = typeConversion(fromValue);

            // A null key can never be looked up in a Dictionary (TryGetValue throws ArgumentNullException), so a
            // null conversion result is handled as "no mapping".
            // NOTE(review): presumably the decider yields null for blank text - confirm against TypeGuesser
            if (fromValue == null || !_mappingTable.TryGetValue(fromValue, out var results))
            {
                if (CrashIfNoMappingsFound)
                    throw new KeyNotFoundException($"Could not find mapping for {fromValue}");

                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                    $"No mapping for '{fromValue}' dropping row"));
                toDrop.Add(row);
                continue;
            }

            //exactly one mapping, simple substitution
            if (results.Count == 1)
            {
                row[idxTo] = results[0];
                continue;
            }

            //multiple mappings e.g. bob=>Frank and bob=>Jesus.  What does the user want to do about that
            switch (AliasResolutionStrategy)
            {
                case AliasResolutionStrategy.CrashIfAliasesFound:
                    throw new AliasException(
                        $"The value '{fromValue}' maps to multiple output values:{string.Join(",", results.Select(v => $"'{v}'"))}");

                case AliasResolutionStrategy.MultiplyInputDataRowsByAliases:

                    //substitute for the first alias (bob=>Frank)
                    row[idxTo] = results[0];

                    //then clone the row once per remaining alias (bob=>Jesus)
                    foreach (var next in results.Skip(1))
                    {
                        //Create a copy of the input row
                        var newRow = new object[numberOfElementsPerRow];
                        row.ItemArray.CopyTo(newRow, 0);

                        //Set the aliasable element to the alias
                        newRow[idxTo] = next;

                        //Add it to our new rows collection
                        newRows.Add(newRow);
                    }

                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

        //add any alias multiplication rows
        foreach (var newRow in newRows)
            toProcess.Rows.Add(newRow);

        //drop rows with missing identifiers
        foreach (var dropRow in toDrop)
            toProcess.Rows.Remove(dropRow);

        // drop column unless it is an inplace (no new columns) update or user wants to keep both
        if (!inPlace && !KeepInputColumnToo)
            toProcess.Columns.Remove(fromColumnName);

        return toProcess;
    }
261

262
    /// <summary>
    /// Runs the SQL from <see cref="GetMappingTableSql"/> against the server hosting <see cref="MappingFromColumn"/>
    /// and loads the result into <see cref="_mappingTable"/> (key value => all output values), recording the Type
    /// of the keys in <see cref="_keyType"/>.  Rows whose key is null are skipped and counted.
    /// </summary>
    /// <param name="listener">Receives a warning when null keys were discarded</param>
    /// <exception cref="Exception">The key values read from the database were not all of the same Type</exception>
    private void BuildMappingTable(IDataLoadEventListener listener)
    {
        // Build into locals and publish to the fields only once the whole result has been read.  Otherwise a
        // failure part way through (timeout, mixed key types etc) would leave an empty/partial non null table
        // behind, which callers that only build when the field is null would then reuse.
        var mappingTable = new Dictionary<object, List<object>>();
        Type keyType = null;

        //connect to server and run distinct query
        var server = MappingFromColumn.TableInfo.Discover(DataAccessContext.DataLoad).Database.Server;

        var fromColumnName = MappingFromColumn.GetRuntimeName();
        var toColumnName = MappingToColumn.GetRuntimeName();

        // The number of null key values found in the mapping table (these are ignored)
        var nulls = 0;

        //pull back all the data
        using (var con = server.GetConnection())
        {
            con.Open();
            var sql = GetMappingTableSql();

            using var cmd = server.GetCommand(sql, con);
            cmd.CommandTimeout = Timeout;

            using var r = cmd.ExecuteReader();
            while (r.Read())
            {
                var keyVal = r[fromColumnName];

                if (keyVal == DBNull.Value)
                {
                    nulls++;
                    continue;
                }

                // the first non null key decides the key Type, every later key must agree with it
                var currentType = keyVal.GetType();
                if (keyType == null)
                    keyType = currentType;
                else if (keyType != currentType)
                    throw new Exception(
                        $"Database mapping table Keys were of mixed Types {keyType} and {currentType}");

                // single lookup instead of ContainsKey + Add + indexer
                if (!mappingTable.TryGetValue(keyVal, out var outputs))
                {
                    outputs = new List<object>();
                    mappingTable.Add(keyVal, outputs);
                }

                outputs.Add(r[toColumnName]);
            }
        }

        // everything was read successfully, publish the result
        _mappingTable = mappingTable;
        _keyType = keyType;

        if (nulls > 0)
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Warning,
                    $"Discarded {nulls} Null key values read from mapping table"));
    }
30✔
321

322
    /// <summary>
    /// Builds the DISTINCT query that selects <see cref="MappingFromColumn"/> and <see cref="MappingToColumn"/>,
    /// filtered by <see cref="WHERELogic"/> when that is not blank, and passes the resulting SQL through
    /// <see cref="AdjustForProjectTokens"/>.
    /// </summary>
    /// <returns>The SQL used to fetch the mapping table</returns>
    protected virtual string GetMappingTableSql()
    {
        // throwaway in-memory repository that backs the objects invented below
        var memoryRepo = new MemoryCatalogueRepository();

        var builder = new QueryBuilder("DISTINCT", null, null);
        builder.AddColumn(new ColumnInfoToIColumn(memoryRepo, MappingFromColumn));
        builder.AddColumn(new ColumnInfoToIColumn(memoryRepo, MappingToColumn));

        // no filter configured, the plain two column query is all we need
        if (string.IsNullOrWhiteSpace(WHERELogic))
            return AdjustForProjectTokens(builder.SQL);

        // wrap the user supplied text in a single filter inside an AND container
        var rootContainer =
            new SpontaneouslyInventedFilterContainer(memoryRepo, null, null, FilterContainerOperation.AND);
        var whereFilter =
            new SpontaneouslyInventedFilter(memoryRepo, rootContainer, WHERELogic, "WHERELogic", null, null);
        rootContainer.AddChild(whereFilter);
        builder.RootFilterContainer = rootContainer;

        return AdjustForProjectTokens(builder.SQL);
    }
341

342
    /// <summary>
    /// Replaces the $p, $n, $t tokens (values read from <see cref="_project"/>) and the $r, $l tokens (values read
    /// from <see cref="_configuration"/>) in <paramref name="mappingTableSql"/>.  Tokens are processed in that
    /// order and a token that does not occur in the SQL is skipped without any checks.
    /// </summary>
    /// <param name="mappingTableSql">SQL that may contain replacement tokens</param>
    /// <returns>The SQL with every token that occurred in it replaced</returns>
    /// <exception cref="Exception">A token is present but the object it reads from is null, or the value it reads is null</exception>
    private string AdjustForProjectTokens(string mappingTableSql)
    {
        var sql = mappingTableSql;

        sql = Expand(sql, "$p", _project == null,
            "You cannot use $p in contexts where there is no Project available",
            () => _project.Name?.ToString(),
            () => "Project didn't have a Project Name");

        sql = Expand(sql, "$n", _project == null,
            "You cannot use $n in contexts where there is no Project available",
            () => _project.ProjectNumber?.ToString(),
            () => $"Project '{_project.Name}' didn't have a Project Number");

        sql = Expand(sql, "$t", _project == null,
            "You cannot use $t in contexts where there is no Project available",
            () => _project.MasterTicket?.ToString(),
            () => $"Project '{_project.Name}' didn't have a Master Ticket");

        sql = Expand(sql, "$r", _configuration == null,
            "You cannot use $r in contexts where there is no ExtractionConfiguration available",
            () => _configuration.RequestTicket?.ToString(),
            () => $"Extraction Configuration '{_configuration.Name}' didn't have a Request Ticket");

        sql = Expand(sql, "$l", _configuration == null,
            "You cannot use $l in contexts where there is no ExtractionConfiguration available",
            () => _configuration.ReleaseTicket?.ToString(),
            () => $"Extraction Configuration '{_configuration.Name}' didn't have a Release Ticket");

        return sql;

        // Handles a single token.  The value and the 'missing value' message are supplied as delegates so that
        // they are only evaluated once the token is known to be present and its source object is not null.
        static string Expand(string text, string token, bool sourceMissing, string sourceMissingMessage,
            Func<string> readValue, Func<string> valueMissingMessage)
        {
            if (!text.Contains(token))
                return text;

            if (sourceMissing)
                throw new Exception(sourceMissingMessage);

            return text.Replace(token, readValue() ?? throw new Exception(valueMissingMessage()));
        }
    }
398

399
    /// <summary>
    /// Empties and releases the cached mapping table (if there is one) to free up memory.
    /// </summary>
    /// <param name="listener">Not used</param>
    /// <param name="pipelineFailureExceptionIfAny">Not used</param>
    public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
    {
        // nothing cached, nothing to release
        if (_mappingTable == null)
            return;

        _mappingTable.Clear();
        _mappingTable = null;
    }
×
408

409
    /// <summary>
    /// Does nothing; this component takes no action when aborted.
    /// </summary>
    /// <param name="listener">Not used</param>
    public void Abort(IDataLoadEventListener listener)
    {
        // intentionally empty
    }
×
412

413
    /// <summary>
    /// Validates the configuration of the component and reports the SQL that will be used to fetch the
    /// mapping table.
    /// </summary>
    /// <param name="notifier">Receives a success message containing the mapping table SQL</param>
    /// <exception cref="Exception">
    /// <see cref="WHERELogic"/> starts with the WHERE keyword, a mapping column is missing, or the two mapping
    /// columns do not share the same TableInfo_ID
    /// </exception>
    public virtual void Check(ICheckNotifier notifier)
    {
        if (!string.IsNullOrWhiteSpace(WHERELogic) && StartsWithWhereKeyword(WHERELogic))
            throw new Exception("WHERE logic should not start with WHERE");

        if (MappingFromColumn == null || MappingToColumn == null)
            throw new Exception("Mapping From/To Column missing, these are Mandatory");

        if (MappingFromColumn.TableInfo_ID != MappingToColumn.TableInfo_ID)
            throw new Exception("MappingFromColumn and MappingToColumn must belong to the same table");

        notifier.OnCheckPerformed(new CheckEventArgs(
            $"Mapping table SQL is:{Environment.NewLine}{GetMappingTableSql()}", CheckResult.Success));
    }

    /// <summary>
    /// Decides whether filter text begins with the SQL WHERE keyword.
    /// </summary>
    /// <param name="whereLogic">Non blank filter text</param>
    /// <returns>True if the text starts with the keyword</returns>
    private static bool StartsWithWhereKeyword(string whereLogic)
    {
        const string keyword = "WHERE";

        // the historic rule (exact case, no trimming), kept so everything that was rejected before still is
        if (whereLogic.StartsWith(keyword, StringComparison.Ordinal))
            return true;

        // additionally catch 'where x = 1' and '  WHERE x = 1' which previously slipped through validation
        var trimmed = whereLogic.TrimStart();
        if (!trimmed.StartsWith(keyword, StringComparison.OrdinalIgnoreCase))
            return false;

        if (trimmed.Length == keyword.Length)
            return true;

        // require a word boundary so that identifiers which merely begin with 'where' are not mistaken for the keyword
        var following = trimmed[keyword.Length];
        return !(char.IsLetterOrDigit(following) || following == '_');
    }
30✔
428

429
    /// <summary>
    /// Records the extraction configuration of <paramref name="value"/> and that configuration's project
    /// (the project is left null when there is no configuration).
    /// </summary>
    /// <param name="value">The extraction command supplied by the pipeline</param>
    /// <param name="listener">Not used</param>
    public void PreInitialize(IExtractCommand value, IDataLoadEventListener listener)
    {
        var extractionConfiguration = value.Configuration;

        _project = extractionConfiguration?.Project;
        _configuration = extractionConfiguration;
    }
2✔
434

435
    /// <summary>
    /// Records the project of the cohort creation request.
    /// </summary>
    /// <param name="value">The cohort creation request supplied by the pipeline</param>
    /// <param name="listener">Not used</param>
    public void PreInitialize(ICohortCreationRequest value, IDataLoadEventListener listener) =>
        _project = value.Project;
×
439
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc