• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 11230100662

08 Oct 2024 06:49AM UTC coverage: 57.35% (-0.04%) from 57.39%
11230100662

Pull #2019

github

JFriel
rename
Pull Request #2019: Task/rdmp 254 ordering insert

11210 of 21052 branches covered (53.25%)

Branch coverage included in aggregate %.

3 of 66 new or added lines in 5 files covered. (4.55%)

4 existing lines in 2 files now uncovered.

31727 of 53816 relevant lines covered (58.95%)

8216.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.45
/Rdmp.Core/DataLoad/Engine/Migration/QueryBuilding/OverwriteMigrationStrategy.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data;
10
using System.Linq;
11
using System.Text;
12
using FAnsi;
13
using FAnsi.Connections;
14
using FAnsi.Discovery;
15
using FAnsi.Discovery.QuerySyntax;
16
using MongoDB.Driver;
17
using NPOI.SS.Formula.Functions;
18
using Rdmp.Core.DataFlowPipeline;
19
using Rdmp.Core.DataLoad.Engine.Job;
20
using Rdmp.Core.DataLoad.Triggers;
21
using Rdmp.Core.ReusableLibraryCode.Progress;
22

23
namespace Rdmp.Core.DataLoad.Engine.Migration.QueryBuilding;
24

25
/// <summary>
/// Migrates a single table (described by a MigrationColumnSet) from STAGING to LIVE as an UPSERT (new replaces old).
/// This is done with two SQL statements issued through the same managed connection: an INSERT ... SELECT ... LEFT JOIN
/// that copies records whose primary key does not yet exist in the destination, followed by an UPDATE (built by the
/// server's UpdateHelper) that overwrites destination records whose non primary key values differ from the source.
/// Both tables must be on the same server.
/// NOTE(review): whether the statements run inside a transaction depends on the managed connection supplied to the
/// constructor; that is not visible in this class.
/// </summary>
public class OverwriteMigrationStrategy : DatabaseMigrationStrategy
{
    /// <summary>
    /// Creates a strategy that issues all of its commands through <paramref name="managedConnection"/>.
    /// </summary>
    /// <param name="managedConnection">Connection handed to the base class and used for every command created here.</param>
    public OverwriteMigrationStrategy(IManagedConnection managedConnection)
        : base(managedConnection)
    {
    }

    /// <summary>
    /// Inserts records that are novel by primary key and then updates records whose diffable columns have changed.
    /// </summary>
    /// <param name="job">Load job; supplies the LoadMetadata flags and receives progress notifications.</param>
    /// <param name="columnsToMigrate">Source/destination tables plus the primary key, update and diff column lists.</param>
    /// <param name="dataLoadInfoID">Value written to the data load run ID column unless the trigger is ignored.</param>
    /// <param name="cancellationToken">Checked immediately before the INSERT and before the UPDATE are executed.</param>
    /// <param name="inserts">Set to the row count reported by the INSERT.</param>
    /// <param name="updates">Set to the row count reported by the UPDATE (left untouched when no UPDATE is run).</param>
    /// <exception cref="OperationCanceledException">Rethrown unchanged when raised inside the migration.</exception>
    /// <exception cref="Exception">Any other failure, wrapped with the original as the inner exception.</exception>
    public override void MigrateTable(IDataLoadJob job, MigrationColumnSet columnsToMigrate, int dataLoadInfoID,
        GracefulCancellationToken cancellationToken, ref int inserts, ref int updates)
    {
        var server = columnsToMigrate.DestinationTable.Database.Server;
        var syntax = server.GetQuerySyntaxHelper();

        var insertSql = BuildInsertSql(job, columnsToMigrate, dataLoadInfoID, syntax);

        using var cmd = server.GetCommand(insertSql, _managedConnection);
        cmd.CommandTimeout = Timeout;

        job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
            $"INSERT query: {Environment.NewLine}{insertSql}"));

        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            inserts = cmd.ExecuteNonQuery();

            // t1.Name = t2.Name, t1.Age = t2.Age etc (primary keys are the join, never assigned)
            var toSet = columnsToMigrate.FieldsToUpdate
                .Where(c => !c.IsPrimaryKey)
                .Select(c => string.Format("t1.{0} = t2.{0}", syntax.EnsureWrapped(c.GetRuntimeName())))
                .ToArray();

            if (toSet.Length == 0)
            {
                job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                    $"Table {columnsToMigrate.DestinationTable} is entirely composed of PrimaryKey columns or hic_ columns so UPDATE will NOT take place"));
                return;
            }

            var toDiff = columnsToMigrate.FieldsToDiff.Where(c => !c.IsPrimaryKey).ToArray();

            if (toDiff.Length == 0)
            {
                job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                    $"Table {columnsToMigrate.DestinationTable} is entirely composed of PrimaryKey columns or hic_ columns/ other non DIFF columns that will not result in an UPDATE will NOT take place"));
                return;
            }

            var sqlLines = new List<CustomLine>
            {
                new CustomLine(string.Join(",", toSet), QueryComponent.SET)
            };

            //also update the hic_dataLoadRunID field
            if (!job.LoadMetadata.IgnoreTrigger)
                sqlLines.Add(new CustomLine(
                    $"t1.{syntax.EnsureWrapped(SpecialFieldNames.DataLoadRunID)}={dataLoadInfoID}",
                    QueryComponent.SET));

            // only touch rows where at least one diffable column differs (see GetORLine for the null handling)
            sqlLines.Add(new CustomLine(string.Join(" OR ", toDiff.Select(c => GetORLine(c, syntax))),
                QueryComponent.WHERE));

            //the join
            sqlLines.AddRange(columnsToMigrate.PrimaryKeys.Select(p =>
                new CustomLine(string.Format("t1.{0} = t2.{0}", syntax.EnsureWrapped(p.GetRuntimeName())),
                    QueryComponent.JoinInfoJoin)));

            var updateQuery = syntax.UpdateHelper.BuildUpdate(
                columnsToMigrate.DestinationTable,
                columnsToMigrate.SourceTable,
                sqlLines);

            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                $"Update query:{Environment.NewLine}{updateQuery}"));

            using var updateCmd = server.GetCommand(updateQuery, _managedConnection);
            updateCmd.CommandTimeout = Timeout;
            cancellationToken.ThrowIfCancellationRequested();

            try
            {
                updates = updateCmd.ExecuteNonQuery();
            }
            catch (Exception e)
            {
                job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error,
                    $"Did not successfully perform the update queries: {updateQuery}", e));

                // message kept as before; the original is additionally preserved as InnerException
                throw new Exception($"Did not successfully perform the update queries: {updateQuery} - {e}", e);
            }
        }
        catch (OperationCanceledException)
        {
            throw; // have to catch and rethrow this because of the catch-all below
        }
        catch (Exception e)
        {
            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error,
                $"Failed to migrate {columnsToMigrate.SourceTable} to {columnsToMigrate.DestinationTable}", e));

            // message kept as before; the original is additionally preserved as InnerException
            throw new Exception(
                $"Failed to migrate {columnsToMigrate.SourceTable} to {columnsToMigrate.DestinationTable}: {e}", e);
        }
    }

    /// <summary>
    /// Builds the INSERT statement that copies into the destination every source record whose primary key
    /// has no match in the destination.
    /// </summary>
    private string BuildInsertSql(IDataLoadJob job, MigrationColumnSet columnsToMigrate, int dataLoadInfoID,
        IQuerySyntaxHelper syntax)
    {
        //see CrossDatabaseMergeCommandTest

        /*          ------------MIGRATE NEW RECORDS (novel by primary key)--------
         *
        INSERT INTO CrossDatabaseMergeCommandTo..ToTable (Name,Age,Postcode,hic_dataLoadRunID)
        SELECT
        [CrossDatabaseMergeCommandFrom]..CrossDatabaseMergeCommandTo_ToTable_STAGING.Name,
        [CrossDatabaseMergeCommandFrom]..CrossDatabaseMergeCommandTo_ToTable_STAGING.Age,
        [CrossDatabaseMergeCommandFrom]..CrossDatabaseMergeCommandTo_ToTable_STAGING.Postcode,
        1
        FROM
        [CrossDatabaseMergeCommandFrom]..CrossDatabaseMergeCommandTo_ToTable_STAGING
        left join
        CrossDatabaseMergeCommandTo..ToTable
        on
        [CrossDatabaseMergeCommandFrom]..CrossDatabaseMergeCommandTo_ToTable_STAGING.Age = CrossDatabaseMergeCommandTo..ToTable.Age
        AND
        [CrossDatabaseMergeCommandFrom]..CrossDatabaseMergeCommandTo_ToTable_STAGING.Name = CrossDatabaseMergeCommandTo..ToTable.Name
        WHERE
        CrossDatabaseMergeCommandTo..ToTable.Age is null
        */

        var sourceName = columnsToMigrate.SourceTable.GetFullyQualifiedName();
        var destinationName = columnsToMigrate.DestinationTable.GetFullyQualifiedName();
        var recordRunId = !job.LoadMetadata.IgnoreTrigger;

        var sbInsert = new StringBuilder();

        sbInsert.AppendLine(
            $"INSERT INTO {destinationName} ({string.Join(",", columnsToMigrate.FieldsToUpdate.Select(c => syntax.EnsureWrapped(c.GetRuntimeName())))}");

        //if we are not ignoring the trigger then we should record the data load run ID
        if (recordRunId)
            sbInsert.AppendLine($",{syntax.EnsureWrapped(SpecialFieldNames.DataLoadRunID)}");

        sbInsert.AppendLine(")");

        sbInsert.AppendLine("SELECT");

        // Add the columns we are migrating
        sbInsert.AppendLine(string.Join($",{Environment.NewLine}",
            columnsToMigrate.FieldsToUpdate.Select(c => c.GetFullyQualifiedName())));

        // If we are using trigger also add the run ID e.g. ",50"
        if (recordRunId)
            sbInsert.AppendLine($",{dataLoadInfoID}");

        sbInsert.AppendLine("FROM");
        sbInsert.AppendLine(sourceName);
        sbInsert.AppendLine("LEFT JOIN");
        sbInsert.AppendLine(destinationName);
        sbInsert.AppendLine("ON");

        sbInsert.AppendLine(
            string.Join($" AND {Environment.NewLine}",
                columnsToMigrate.PrimaryKeys.Select(pk =>
                    string.Format("{0}.{1}={2}.{1}", sourceName, syntax.EnsureWrapped(pk.GetRuntimeName()),
                        destinationName))));

        // a null destination key after the LEFT JOIN means the source record has no match, i.e. it is new
        sbInsert.AppendLine("WHERE");
        sbInsert.AppendLine(
            $"{destinationName}.{syntax.EnsureWrapped(columnsToMigrate.PrimaryKeys.First().GetRuntimeName())} IS NULL");

        if (job.LoadMetadata.OrderInsertsByPrimaryKey && columnsToMigrate.PrimaryKeys.Any())
            sbInsert.Append(GetPrimaryKeyOrderByClause(job, columnsToMigrate, syntax));

        //right at the end of the SELECT
        if (columnsToMigrate.DestinationTable.Database.Server.DatabaseType == DatabaseType.MySql)
            sbInsert.Append(" FOR UPDATE");

        return sbInsert.ToString();
    }

    /// <summary>
    /// Queries the destination's primary key constraint for its column order and sort direction and returns a
    /// matching "ORDER BY ..." clause over the source table's columns, or an empty string (after a Warning)
    /// when the lookup returns no rows.
    /// </summary>
    private string GetPrimaryKeyOrderByClause(IDataLoadJob job, MigrationColumnSet columnsToMigrate,
        IQuerySyntaxHelper syntax)
    {
        var server = columnsToMigrate.DestinationTable.Database.Server;

        // the name is embedded in a SQL string literal so any single quotes must be doubled
        var tableNameLiteral = columnsToMigrate.DestinationTable.GetRuntimeName().Replace("'", "''");

        // NOTE(review): sys.indexes / sys.index_columns / COL_NAME are SQL Server catalog objects, so this lookup
        // looks SQL Server specific - confirm behaviour for other DatabaseTypes.
        // NOTE(review): the filter is on TABLE_NAME only (no schema) - confirm this is sufficient.
        var orderSQL = $@"
SELECT KU.ORDINAL_POSITION AS ORDINAL_POSITION
    , COLUMN_NAME, TC.CONSTRAINT_NAME as name, is_descending_key
FROM
    INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
INNER JOIN
    INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU ON TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
        AND TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
        AND KU.TABLE_NAME = '{tableNameLiteral}'
LEFT JOIN(select COL_NAME(ic.object_id,ic.column_id) as cn, i.name, is_descending_key FROM sys.indexes i
INNER JOIN sys.data_spaces ds ON i.data_space_id = ds.data_space_id
INNER JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
where is_primary_key=1) SY  ON SY.cn = KU.COLUMN_NAME AND SY.name = KU.CONSTRAINT_NAME
ORDER BY ORDINAL_POSITION asc
";

        using var dt = new DataTable();
        dt.BeginLoadData();

        using (var orderCmd = server.GetCommand(orderSQL, _managedConnection))
        {
            orderCmd.CommandTimeout = Timeout;
            using var da = server.GetDataAdapter(orderCmd);
            da.Fill(dt);
        }

        dt.EndLoadData();

        var sourceName = columnsToMigrate.SourceTable.GetFullyQualifiedName();

        // is_descending_key comes from the LEFT JOINed side so it can be DBNull; anything but true sorts ascending.
        // Columns are wrapped and qualified with the source table, the same form used in the join condition.
        var orderTerms = dt.AsEnumerable()
            .Select(row =>
                $"{sourceName}.{syntax.EnsureWrapped(row["COLUMN_NAME"].ToString())} {(row["is_descending_key"] is true ? "DESC" : "ASC")}")
            .ToArray();

        if (orderTerms.Length == 0)
        {
            // an "ORDER BY" with no terms is invalid SQL, so insert unordered instead
            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                $"Could not determine primary key ordering for {columnsToMigrate.DestinationTable} so INSERT will not be ordered"));
            return string.Empty;
        }

        return $"ORDER BY {string.Join(", ", orderTerms)}";
    }

    /// <summary>
    /// Returns a bracketed predicate that is true when column <paramref name="c"/> differs between t1 and t2,
    /// including the case where exactly one side is null (which a plain &lt;&gt; comparison would not catch).
    /// </summary>
    private static string GetORLine(DiscoveredColumn c, IQuerySyntaxHelper syntax) => string.Format(
        "(t1.{0} <> t2.{0} OR (t1.{0} is null AND t2.{0} is not null) OR (t2.{0} is null AND t1.{0} is not null))",
        syntax.EnsureWrapped(c.GetRuntimeName()));
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc