• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 6237307473

19 Sep 2023 04:02PM UTC coverage: 57.015% (-0.4%) from 57.44%
6237307473

push

github

web-flow
Feature/rc4 (#1570)

* Syntax tidying
* Dependency updates
* Event handling singletons (ThrowImmediately and co)

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: James A Sutherland <>
Co-authored-by: James Friel <jfriel001@dundee.ac.uk>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

10734 of 20259 branches covered (0.0%)

Branch coverage included in aggregate %.

5922 of 5922 new or added lines in 565 files covered. (100.0%)

30687 of 52390 relevant lines covered (58.57%)

7361.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.92
/Rdmp.Core/DataLoad/Engine/LoadExecution/Components/Standard/MigrateRAWTableToStaging.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Data;
9
using System.Linq;
10
using FAnsi.Discovery;
11
using Rdmp.Core.Curation.Data;
12
using Rdmp.Core.Curation.Data.DataLoad;
13
using Rdmp.Core.DataFlowPipeline;
14
using Rdmp.Core.DataFlowPipeline.Requirements;
15
using Rdmp.Core.DataLoad.Engine.DatabaseManagement.EntityNaming;
16
using Rdmp.Core.DataLoad.Engine.Job;
17
using Rdmp.Core.DataLoad.Engine.Pipeline.Components;
18
using Rdmp.Core.DataLoad.Engine.Pipeline.Components.Anonymisation;
19
using Rdmp.Core.DataLoad.Engine.Pipeline.Destinations;
20
using Rdmp.Core.DataLoad.Engine.Pipeline.Sources;
21
using Rdmp.Core.Logging;
22
using Rdmp.Core.ReusableLibraryCode.Progress;
23

24
namespace Rdmp.Core.DataLoad.Engine.LoadExecution.Components.Standard;
25

26
/// <summary>
/// Streams records from a single table in the RAW database and writes it to the corresponding table in the STAGING database during data load.  RAW is an
/// unconstrained identifiable version of the LIVE table created at the start of an RDMP data load (the RAW=>STAGING=>LIVE model).  STAGING is a constrained
/// (has primary keys / not nulls etc) version of the LIVE table.  This class uses a DataFlowPipelineEngine to stream the records and this includes (optionally)
/// any anonymisation operations (dropping columns, substituting identifiers etc) configured on the TableInfo (See BasicAnonymisationEngine).
/// </summary>
public class MigrateRAWTableToStaging : DataLoadComponent
{
    private readonly ITableInfo _tableInfo;
    private readonly bool _isLookupTable;
    private readonly HICDatabaseConfiguration _databaseConfiguration;

    /// <summary>
    /// The pipeline built by <see cref="Run"/>.  It is non-null once <see cref="Run"/> has built it.  That is used to
    /// refuse a second execution of the same component instance.
    /// </summary>
    private DataFlowPipelineEngine<DataTable> _pipeline;

    /// <summary>
    /// Creates a component that migrates the RAW copy of <paramref name="tableInfo"/> into its STAGING copy.
    /// </summary>
    /// <param name="tableInfo">The table to migrate; also supplies the pre-load discarded column configuration.</param>
    /// <param name="isLookupTable">When true, a table missing from RAW is reported as a warning and skipped instead of being an error.</param>
    /// <param name="databaseConfiguration">Supplies the RAW/STAGING databases (DeployInfo) and the namer used to resolve runtime table names.</param>
    public MigrateRAWTableToStaging(ITableInfo tableInfo, bool isLookupTable,
        HICDatabaseConfiguration databaseConfiguration)
    {
        _tableInfo = tableInfo;
        _isLookupTable = isLookupTable;
        _databaseConfiguration = databaseConfiguration;
    }

    /// <summary>
    /// Removes fully blank/null rows from the RAW table.  Then streams the table's distinct rows into STAGING through a
    /// pipeline containing <see cref="CleanStrings"/> and <see cref="BasicAnonymisationEngine"/>.  The transfer is
    /// audited as a table load on <paramref name="job"/>.
    /// </summary>
    /// <param name="job">The data load job; receives notifications and provides the DataLoadInfo used for auditing.</param>
    /// <param name="cancellationToken">Passed to the pipeline execution.</param>
    /// <returns>
    /// <c>Success</c> when the data was migrated, or when a lookup table was absent from RAW.
    /// <c>Error</c> when a non-lookup table was absent from RAW.
    /// </returns>
    /// <exception cref="InvalidOperationException">This instance has already built and executed its pipeline.</exception>
    public override ExitCodeType Run(IDataLoadJob job, GracefulCancellationToken cancellationToken)
    {
        if (_pipeline != null)
            throw new InvalidOperationException("Pipeline already executed once");

        var contextFactory = new DataFlowPipelineContextFactory<DataTable>();
        var context = contextFactory.Create(PipelineUsage.LoadsSingleTableInfo | PipelineUsage.FixedDestination |
                                            PipelineUsage.LogsToTableLoadInfo);

        // where we are coming from (source)
        var sourceConvention = LoadBubble.Raw;
        var sourceDatabase = _databaseConfiguration.DeployInfo[sourceConvention];
        var sourceTableName = _tableInfo.GetRuntimeName(sourceConvention, _databaseConfiguration.DatabaseNamer);

        // What to do if where we are coming from does not have the table existing on it
        if (!sourceDatabase.ExpectTable(sourceTableName).Exists())
        {
            if (_isLookupTable)
            {
                job.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Warning,
                        $"Lookup table {sourceTableName} did not exist on RAW so was not migrated to STAGING"));
                return ExitCodeType.Success;
            }

            job.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Error,
                    $"Table {sourceTableName} did not exist in RAW database {sourceDatabase} when it came time to migrate RAW to STAGING (and the table is not a lookup)"));

            // Nothing can be migrated from a table that does not exist.  Stop here rather than go on to delete from,
            // select from and audit a missing table.
            return ExitCodeType.Error;
        }

        // where we are going to (destination)
        var destinationConvention = LoadBubble.Staging;
        var destinationDatabase = _databaseConfiguration.DeployInfo[destinationConvention];
        var destinationTableName =
            _tableInfo.GetRuntimeName(destinationConvention, _databaseConfiguration.DatabaseNamer);

        DeleteFullyNullRecords(sourceTableName, sourceDatabase, job);

        // audit
        var tableLoadInfo = job.DataLoadInfo.CreateTableLoadInfo(
            "None required, if fails then simply drop Staging database and reload dataset",
            $"STAGING:{destinationTableName}",
            new DataSource[] { new($"RAW:{sourceTableName}", DateTime.Now) }, -1);

        var syntax = sourceDatabase.Server.GetQuerySyntaxHelper();

        // Source: reads the DISTINCT rows of the RAW table, so exact duplicate rows are collapsed on the way to STAGING.
        // NOTE(review): 50000 looks like the batch size of DbDataCommandDataFlowSource - confirm.
        var source = new DbDataCommandDataFlowSource(
            $"Select distinct * from {syntax.EnsureWrapped(sourceTableName)}",
            $"Fetch data from {syntax.EnsureWrapped(sourceTableName)}",
            sourceDatabase.Server.Builder, 50000);

        // Ignore pre-load discarded columns, unless they are marked Dilute.  Diluted columns are passed through in
        // their diluted state instead of being dropped entirely (these fields will still be in ANODump in their
        // pristine state).
        var columnNamesToIgnoreForBulkInsert = _tableInfo.PreLoadDiscardedColumns
            .Where(c => c.Destination != DiscardedColumnDestination.Dilute)
            .Select(c => c.RuntimeColumnName)
            .ToList();

        // Destination: bulk insert into STAGING, skipping the pre-load discarded columns selected above
        var destination = new SqlBulkInsertDestination(destinationDatabase, destinationTableName,
            columnNamesToIgnoreForBulkInsert);

        // engine that will move data
        _pipeline = new DataFlowPipelineEngine<DataTable>(context, source, destination, job);

        // add clean strings component
        _pipeline.ComponentObjects.Add(new CleanStrings());

        // add dropping of preload discard columns
        _pipeline.ComponentObjects.Add(new BasicAnonymisationEngine());

        _pipeline.Initialize(tableLoadInfo, _tableInfo);

        // tell it to move data
        _pipeline.ExecutePipeline(cancellationToken);

        return ExitCodeType.Success;
    }

    /// <summary>
    /// Best-effort deletion of rows in the RAW table where every column is NULL, or the empty string for text columns.
    /// Any failure is reported to <paramref name="job"/> as a warning and does not stop the data load.
    /// </summary>
    /// <param name="sourceTableName">Runtime name of the table in <paramref name="dbInfo"/>.</param>
    /// <param name="dbInfo">The RAW database containing the table.</param>
    /// <param name="job">Receives the SQL being run, the number of rows deleted and any failure.</param>
    private void DeleteFullyNullRecords(string sourceTableName, DiscoveredDatabase dbInfo, IDataLoadJob job)
    {
        try
        {
            var syntax = dbInfo.Server.GetQuerySyntaxHelper();

            // Names are wrapped so that columns/tables needing quoting (spaces, reserved words) do not break the SQL.
            // Only text columns are compared with ''.  Comparing '' with other types is either an error or an implicit
            // conversion (e.g. SQL Server turns '' into 0 for int and 1900-01-01 for datetime).  That conversion would
            // delete rows holding genuine 0 / 1900-01-01 values.
            var blankTests = dbInfo.ExpectTable(sourceTableName).DiscoverColumns()
                .Select(c =>
                {
                    var wrapped = syntax.EnsureWrapped(c.GetRuntimeName());
                    return c.DataType.GetCSharpDataType() == typeof(string)
                        ? $"({wrapped} IS NULL OR {wrapped}='')"
                        : $"({wrapped} IS NULL)";
                })
                .ToList();

            // No columns means nothing to delete (and an empty WHERE clause would be invalid SQL)
            if (blankTests.Count == 0)
                return;

            using var con = dbInfo.Server.GetConnection();
            con.Open();

            // Delete rows where ALL columns are blank/null
            using var cmd = dbInfo.Server.GetCommand(
                $"delete from {syntax.EnsureWrapped(sourceTableName)} WHERE {string.Join(" AND ", blankTests)}", con);

            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                $"About to delete fully null records using SQL:{cmd.CommandText}"));

            cmd.CommandTimeout = 500000;

            var affectedRows = cmd.ExecuteNonQuery();

            // ExecuteNonQuery can return -1 when no row count is available.  Only report real deletions.
            if (affectedRows > 0)
                job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                    $"Deleted {affectedRows} fully blank/null rows from RAW database"));
        }
        catch (Exception e)
        {
            // Deliberately best-effort: the migration itself does not depend on this cleanup succeeding
            job.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Warning,
                    "Could not delete fully null records, this will not prevent the data load occurring", e));
        }
    }

    /// <summary>
    /// No-op: this component holds nothing that needs releasing once the load completes.
    /// </summary>
    public override void LoadCompletedSoDispose(ExitCodeType exitCode, IDataLoadEventListener postLoadEventListener)
    {
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc