• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 6245535001

20 Sep 2023 07:44AM UTC coverage: 57.013%. First build
6245535001

push

github

web-flow
8.1.0 Release (#1628)

* Bump Newtonsoft.Json from 13.0.1 to 13.0.2

Bumps [Newtonsoft.Json](https://github.com/JamesNK/Newtonsoft.Json) from 13.0.1 to 13.0.2.
- [Release notes](https://github.com/JamesNK/Newtonsoft.Json/releases)
- [Commits](https://github.com/JamesNK/Newtonsoft.Json/compare/13.0.1...13.0.2)

---
updated-dependencies:
- dependency-name: Newtonsoft.Json
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump NLog from 5.0.5 to 5.1.0

Bumps [NLog](https://github.com/NLog/NLog) from 5.0.5 to 5.1.0.
- [Release notes](https://github.com/NLog/NLog/releases)
- [Changelog](https://github.com/NLog/NLog/blob/dev/CHANGELOG.md)
- [Commits](https://github.com/NLog/NLog/compare/v5.0.5...v5.1.0)

---
updated-dependencies:
- dependency-name: NLog
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump NLog from 5.0.5 to 5.1.0

* Fix -r flag - should have been --results-directory all along

* Bump Newtonsoft.Json from 13.0.1 to 13.0.2

* Bump YamlDotNet from 12.0.2 to 12.1.0

Bumps [YamlDotNet](https://github.com/aaubry/YamlDotNet) from 12.0.2 to 12.1.0.
- [Release notes](https://github.com/aaubry/YamlDotNet/releases)
- [Commits](https://github.com/aaubry/YamlDotNet/compare/v12.0.2...v12.1.0)

---
updated-dependencies:
- dependency-name: YamlDotNet
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump Moq from 4.18.2 to 4.18.3

Bumps [Moq](https://github.com/moq/moq4) from 4.18.2 to 4.18.3.
- [Release notes](https://github.com/moq/moq4/releases)
- [Changelog](https://github.com/moq/moq4/blob/main/CHANGELOG.md)
- [Commits](https://github.com/moq/moq4/compare/v4.18.2...v4.18.3)

---
updated-dependencies:
- dependency-name: Moq
... (continued)

10732 of 20257 branches covered (0.0%)

Branch coverage included in aggregate %.

48141 of 48141 new or added lines in 1086 files covered. (100.0%)

30685 of 52388 relevant lines covered (58.57%)

7387.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.72
/Rdmp.Core/DataExport/DataExtraction/Pipeline/Sources/ExecuteCrossServerDatasetExtractionSource.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data;
10
using System.Threading;
11
using FAnsi.Discovery;
12
using FAnsi.Discovery.QuerySyntax;
13
using Rdmp.Core.Curation.Data;
14
using Rdmp.Core.DataFlowPipeline;
15
using Rdmp.Core.DataLoad.Engine.Pipeline.Destinations;
16
using Rdmp.Core.ReusableLibraryCode.Checks;
17
using Rdmp.Core.ReusableLibraryCode.Progress;
18

19
namespace Rdmp.Core.DataExport.DataExtraction.Pipeline.Sources;
20

21
/// <summary>
/// Data Extraction Source which can fulfill the IExtractCommand even when the dataset in the command is on a different server from the cohort.  This is done
/// by copying the Cohort from the cohort database into a temporary database on the data server (<see cref="TemporaryDatabaseName"/>, tempdb by default) for
/// the duration of the pipeline execution and doing the linkage against that copy instead of the original cohort table.
/// <para>
/// When the cohort and the dataset are already on the same server (see <see cref="AreOnSameServer"/>) no copy is made and the extraction SQL is not rewritten.
/// Copying is serialized across all instances in the process by <see cref="OneCrossServerExtractionAtATime"/>.
/// </para>
/// </summary>
public class ExecuteCrossServerDatasetExtractionSource : ExecuteDatasetExtractionSource
{
    /// <summary>
    /// Shared by all instances so that only one cross server cohort copy is in use at a time.  Acquired before the cohort is
    /// copied to the data server and released (after the copied tables have been dropped) when this component is disposed.
    /// </summary>
    public static Semaphore OneCrossServerExtractionAtATime = new(1, 1);

    [DemandsInitialization("Database to upload the cohort to prior to linking", defaultValue: "tempdb",
        mandatory: true)]
    public string TemporaryDatabaseName { get; set; }

    [DemandsInitialization(
        "Determines behaviour if TemporaryDatabaseName is not found on the dataset server.  True to create it as a new database, False to crash",
        defaultValue: true)]
    public bool CreateTemporaryDatabaseIfNotExists { get; set; }

    [DemandsInitialization(
        "Determines behaviour if TemporaryDatabaseName already contains a Cohort table.  True to drop it, False to crash",
        defaultValue: true)]
    public bool DropExistingCohortTableIfExists { get; set; }

    [DemandsInitialization(
        "Naming pattern for the temporary cohort database table created on the data server(s) for extraction. Use $g for a guid.",
        defaultValue: "$g")]
    public string TemporaryTableName { get; set; }

    /// <summary>Tables created in <see cref="_tempDb"/> that must be dropped when this component is disposed.</summary>
    private readonly List<DiscoveredTable> _tablesToCleanup = new();

    /// <summary>Guards the lazy generation of <see cref="_tablename"/>.</summary>
    private readonly object _tableNameLock = new();

    /// <summary>
    /// Temporary table name, generated once and then cached so that the SQL rewrite and the cohort upload use the same name.
    /// </summary>
    private string _tablename;

    /// <summary>The server hosting the dataset(s) being extracted.</summary>
    private DiscoveredServer _server;

    /// <summary>The database on <see cref="_server"/> that the cohort is copied into.</summary>
    private DiscoveredDatabase _tempDb;

    /// <summary>True while this instance holds <see cref="OneCrossServerExtractionAtATime"/>.</summary>
    private bool _semaphoreObtained;

    /// <summary>True once the cohort has been uploaded; prevents re-uploading on subsequent chunks.</summary>
    private bool _haveCopiedCohortAndAdjustedSql;

    /// <summary>
    /// True if we decided not to move the cohort after all (e.g. if one or more datasets being extracted are already on the same server).
    /// </summary>
    private bool _doNotMigrate;

    /// <summary>
    /// Copies the cohort to the data server the first time a chunk is requested (unless no migration is needed) and then
    /// defers to the base class to read the data.
    /// </summary>
    public override DataTable GetChunk(IDataLoadEventListener listener, GracefulCancellationToken cancellationToken)
    {
        SetServer();

        if (!_haveCopiedCohortAndAdjustedSql && Request != null && !_doNotMigrate)
            CopyCohortToDataServer(listener, cancellationToken);

        return base.GetChunk(listener, cancellationToken);
    }

    /// <summary>
    /// Applies the base class SQL adjustments and then, unless cohort and data share a server, rewrites every fully qualified
    /// reference to the cohort table (and its private id / release id / cohort definition id columns) to point at the copy
    /// in <see cref="_tempDb"/>.
    /// </summary>
    /// <param name="sql">The extraction SQL to adjust.</param>
    /// <param name="listener">Receives the original SQL, each replacement, warnings for replacements that matched nothing, and the final SQL.</param>
    /// <returns>The adjusted SQL.</returns>
    public override string HackExtractionSQL(string sql, IDataLoadEventListener listener)
    {
        SetServer();

        //call base hacks
        sql = base.HackExtractionSQL(sql, listener);

        if (_doNotMigrate)
        {
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Information,
                    "Cohort and Data are on same server so no migration will occur"));
            return sql;
        }

        listener.OnNotify(this,
            new NotifyEventArgs(ProgressEventType.Information, $"Original (unhacked) SQL was {sql}", null));

        var extractableCohort = Request.ExtractableCohort;
        var extractableCohortSource = extractableCohort.ExternalCohortTable;

        var syntaxHelperFactory = new QuerySyntaxHelperFactory();
        var sourceSyntax = syntaxHelperFactory.Create(extractableCohortSource.DatabaseType);
        var destinationSyntax = syntaxHelperFactory.Create(_server.DatabaseType);

        var sourceDb = sourceSyntax.GetRuntimeName(extractableCohortSource.Database);
        var sourceTable = sourceSyntax.GetRuntimeName(extractableCohortSource.TableName);
        var destinationTable = GetTableName() ?? sourceTable;
        var sourcePrivateId = sourceSyntax.GetRuntimeName(extractableCohort.GetPrivateIdentifier());
        var sourceReleaseId = sourceSyntax.GetRuntimeName(extractableCohort.GetReleaseIdentifier());
        var sourceCohortDefinitionId =
            sourceSyntax.GetRuntimeName(extractableCohortSource.DefinitionTableForeignKeyField);

        // Replace [CohortDatabase]..[CohortTable] references with [TemporaryDatabase]..[TemporaryTable], in this order:
        //   cohort database.table.privateId
        //   cohort database.table.releaseId (only if it differs from the private id)
        //   cohort database.table.cohortdefinitionId
        //   cohort database.table
        // The bare table form is expected to be a prefix of the column forms, so it must come last or the column
        // replacements would no longer match.  A List (unlike a Dictionary) guarantees this enumeration order.
        var replacements = new List<(string Find, string Replace)>();

        AddReplacement(replacements, sourceDb, sourceTable, destinationTable, sourcePrivateId, sourceSyntax,
            destinationSyntax);

        // If it is not an identifiable extraction (private and release are different)
        if (!string.Equals(sourcePrivateId, sourceReleaseId))
            AddReplacement(replacements, sourceDb, sourceTable, destinationTable, sourceReleaseId, sourceSyntax,
                destinationSyntax);

        AddReplacement(replacements, sourceDb, sourceTable, destinationTable, sourceCohortDefinitionId,
            sourceSyntax, destinationSyntax);
        AddReplacement(replacements, sourceDb, sourceTable, destinationTable, sourceSyntax, destinationSyntax);

        foreach (var (find, replace) in replacements)
        {
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                $"Replacing '{find}' with '{replace}'", null));

            if (!sql.Contains(find))
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                    $"SQL extraction query string did not contain the text '{find}' (which we expected to replace with '{replace}')"));

            sql = sql.Replace(find, replace);
        }

        listener.OnNotify(this,
            new NotifyEventArgs(ProgressEventType.Information, $"Adjusted (hacked) SQL was {sql}", null));

        return sql;
    }

    /// <summary>
    /// Returns the temporary table name built from <see cref="TemporaryTableName"/> (with each "$g" replaced by a guid), or
    /// null if no pattern is configured.  The name is generated once and cached for the lifetime of this instance.
    /// </summary>
    private string GetTableName()
    {
        lock (_tableNameLock)
        {
            if (_tablename != null)
                return _tablename;

            if (string.IsNullOrWhiteSpace(TemporaryTableName))
                return null;

            // add a g to avoid creating a table name that starts with a number (can cause problems and always requires wrapping etc... just bad)
            var guid = $"g{Guid.NewGuid():N}";

            return _tablename = TemporaryTableName.Replace("$g", guid);
        }
    }

    /// <summary>
    /// Adds a replacement that swaps the fully qualified source column <paramref name="col"/> for the same column on the
    /// destination table in <see cref="_tempDb"/>.
    /// </summary>
    private void AddReplacement(List<(string Find, string Replace)> replacements, string sourceDb, string sourceTable,
        string destinationTable, string col, IQuerySyntaxHelper sourceSyntax, IQuerySyntaxHelper destinationSyntax) =>
        replacements.Add((
            sourceSyntax.EnsureFullyQualified(sourceDb, null, sourceTable, col),
            destinationSyntax.EnsureFullyQualified(_tempDb.GetRuntimeName(), null, destinationTable, col)));

    /// <summary>
    /// Adds a replacement that swaps the fully qualified source table for the destination table in <see cref="_tempDb"/>.
    /// </summary>
    private void AddReplacement(List<(string Find, string Replace)> replacements, string sourceDb, string sourceTable,
        string destinationTable, IQuerySyntaxHelper sourceSyntax, IQuerySyntaxHelper destinationSyntax) =>
        replacements.Add((
            sourceSyntax.EnsureFullyQualified(sourceDb, null, sourceTable),
            destinationSyntax.EnsureFullyQualified(_tempDb.GetRuntimeName(), null, destinationTable)));

    /// <summary>
    /// Lazily resolves the data server, the temporary database on it, and whether a migration is needed at all.
    /// Does nothing once resolved, or while there is no Request.
    /// </summary>
    private void SetServer()
    {
        if (_server != null || Request == null)
            return;

        //it's a legit dataset being extracted?
        _server = Request.GetDistinctLiveDatabaseServer();

        //expect a database called tempdb (or whatever TemporaryDatabaseName says)
        _tempDb = _server.ExpectDatabase(TemporaryDatabaseName);

        var cohortServer = Request.ExtractableCohort.ExternalCohortTable.Discover();
        if (AreOnSameServer(_server, cohortServer.Server))
            _doNotMigrate = true;
    }

    /// <summary>
    /// Returns true if the two databases are on the same server (do not have to be on the same database).  Also confirms that the access
    /// credentials are compatible.
    /// </summary>
    /// <param name="a">First server.</param>
    /// <param name="b">Second server.</param>
    /// <returns>True if server name (case-insensitive, ordinal), database type, explicit username and explicit password all match.</returns>
    protected static bool AreOnSameServer(DiscoveredServer a, DiscoveredServer b) =>
        string.Equals(a.Name, b.Name, StringComparison.OrdinalIgnoreCase) &&
        a.DatabaseType == b.DatabaseType &&
        a.ExplicitUsernameIfAny == b.ExplicitUsernameIfAny &&
        a.ExplicitPasswordIfAny == b.ExplicitPasswordIfAny;

    /// <summary>
    /// Downloads the entire cohort and uploads it into <see cref="_tempDb"/> on the data server.  The created table is
    /// registered for removal in Dispose.
    /// </summary>
    /// <exception cref="Exception">If the cohort cannot be fetched, the temporary database is missing and may not be
    /// created, a table with the target name exists and may not be dropped, or the table is missing after the upload.</exception>
    private void CopyCohortToDataServer(IDataLoadEventListener listener, GracefulCancellationToken cancellationToken)
    {
        SetServer();

        WaitForCrossServerSemaphore(listener);

        DataTable cohortDataTable;
        try
        {
            cohortDataTable = Request.ExtractableCohort.FetchEntireCohort();
        }
        catch (Exception e)
        {
            throw new Exception(
                "An error occurred while trying to download the cohort from the Cohort server (in preparation for transferring it to the data server for linkage and extraction)",
                e);
        }

        //make sure tempdb exists (this covers you for servers where it doesn't exist e.g. mysql or when user has specified a different database name)
        if (!_tempDb.Exists())
        {
            if (!CreateTemporaryDatabaseIfNotExists)
                throw new Exception(
                    $"Database '{_tempDb}' did not exist on server '{_server}' and {nameof(CreateTemporaryDatabaseIfNotExists)} was false");

            _tempDb.Create();
        }

        var tbl = _tempDb.ExpectTable(GetTableName() ?? cohortDataTable.TableName);

        if (tbl.Exists())
            HandleExistingCohortTable(tbl, listener);

        // ensures the uploaded table has the correct name
        cohortDataTable.TableName = tbl.GetRuntimeName();

        TrySetCohortPrimaryKey(cohortDataTable, listener);

        // Register before uploading so that a table left behind by a failed upload is still dropped in Dispose
        // (Dispose skips tables that do not exist).
        _tablesToCleanup.Add(tbl);

        UploadCohortTable(cohortDataTable, listener, cancellationToken);

        if (!tbl.Exists())
            throw new Exception(
                $"Table '{tbl}' did not exist despite DataTableUploadDestination completing Successfully!");

        //table will now be in tempdb
        _haveCopiedCohortAndAdjustedSql = true;
    }

    /// <summary>
    /// Blocks until <see cref="OneCrossServerExtractionAtATime"/> is available.  Does nothing if this instance already holds
    /// it: the semaphore is not re-entrant, so waiting again would deadlock.
    /// </summary>
    private void WaitForCrossServerSemaphore(IDataLoadEventListener listener)
    {
        if (_semaphoreObtained)
            return;

        listener.OnNotify(this,
            new NotifyEventArgs(ProgressEventType.Information,
                "About to wait for Semaphore OneCrossServerExtractionAtATime to become available"));
        OneCrossServerExtractionAtATime.WaitOne(Timeout.Infinite);
        _semaphoreObtained = true;
        listener.OnNotify(this,
            new NotifyEventArgs(ProgressEventType.Information, "Captured Semaphore OneCrossServerExtractionAtATime"));
    }

    /// <summary>
    /// Releases <see cref="OneCrossServerExtractionAtATime"/> if this instance holds it.  Resets the flag so that repeated
    /// calls cannot over-release the semaphore (which has a maximum count of 1).
    /// </summary>
    private void ReleaseCrossServerSemaphore(IDataLoadEventListener listener)
    {
        if (!_semaphoreObtained)
            return;

        listener.OnNotify(this,
            new NotifyEventArgs(ProgressEventType.Information,
                "About to release Semaphore OneCrossServerExtractionAtATime"));
        _semaphoreObtained = false;
        OneCrossServerExtractionAtATime.Release(1);
        listener.OnNotify(this,
            new NotifyEventArgs(ProgressEventType.Information, "Released Semaphore OneCrossServerExtractionAtATime"));
    }

    /// <summary>
    /// Deals with a pre-existing table of the same name in <see cref="_tempDb"/>.  Throws when
    /// <see cref="DropExistingCohortTableIfExists"/> is false, as its documentation promises.  Otherwise it attempts to drop
    /// the table and only warns if the drop fails.
    /// </summary>
    private void HandleExistingCohortTable(DiscoveredTable tbl, IDataLoadEventListener listener)
    {
        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
            $"Found existing table called '{tbl}' in '{_tempDb}'"));

        if (!DropExistingCohortTableIfExists)
            throw new Exception(
                $"'{_tempDb}' contains a table called '{tbl}' and {nameof(DropExistingCohortTableIfExists)} is false");

        listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
            $"About to drop existing table '{tbl}'"));

        try
        {
            tbl.Drop();
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                $"Dropped existing table '{tbl}'"));
        }
        catch (Exception ex)
        {
            // best effort: report and carry on with the upload
            listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                $"Warning dropping '{tbl}' failed", ex));
        }
    }

    /// <summary>
    /// Attempts to make the private identifier column the primary key of the copied cohort to improve query
    /// performance (e.g. chi).  A failure is reported as a warning and is not fatal.
    /// </summary>
    private void TrySetCohortPrimaryKey(DataTable cohortDataTable, IDataLoadEventListener listener)
    {
        try
        {
            cohortDataTable.PrimaryKey = new[]
                { cohortDataTable.Columns[Request.ExtractableCohort.GetPrivateIdentifier(true)] };
        }
        catch (Exception ex)
        {
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Warning,
                    "Failed to set primary key on cross server copied cohort.  Query performance may be slow", ex));
        }
    }

    /// <summary>
    /// Uploads <paramref name="cohortDataTable"/> into <see cref="_tempDb"/>.  The destination is always disposed.  If the
    /// upload throws, the exception is passed to the destination's Dispose and then rethrown.
    /// </summary>
    private void UploadCohortTable(DataTable cohortDataTable, IDataLoadEventListener listener,
        GracefulCancellationToken cancellationToken)
    {
        var destination = new DataTableUploadDestination();
        destination.PreInitialize(_tempDb, listener);

        try
        {
            destination.ProcessPipelineData(cohortDataTable, listener, cancellationToken);
        }
        catch (Exception ex)
        {
            destination.Dispose(listener, ex);
            throw;
        }

        destination.Dispose(listener, null);
    }

    /// <summary>
    /// Drops the temporary cohort tables, disposes the base source, and only then releases
    /// <see cref="OneCrossServerExtractionAtATime"/>.  Releasing last stops a waiting extraction from recreating a
    /// same-named table that this instance would then drop.  Each step runs even if an earlier one fails.
    /// </summary>
    public override void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
    {
        try
        {
            DropTemporaryCohortTables(listener);
        }
        finally
        {
            try
            {
                base.Dispose(listener, pipelineFailureExceptionIfAny);
            }
            finally
            {
                ReleaseCrossServerSemaphore(listener);
            }
        }
    }

    /// <summary>
    /// Drops every table registered in <see cref="_tablesToCleanup"/> that still exists, then clears the list.  A failed
    /// drop is reported as a warning so that the remaining tables are still processed.
    /// </summary>
    private void DropTemporaryCohortTables(IDataLoadEventListener listener)
    {
        foreach (var table in _tablesToCleanup)
        {
            try
            {
                if (!table.Exists())
                    continue;

                listener.OnNotify(this,
                    new NotifyEventArgs(ProgressEventType.Information, $"About to drop table '{table}'"));
                table.Drop();
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information, $"Dropped table '{table}'"));
            }
            catch (Exception ex)
            {
                listener.OnNotify(this, new NotifyEventArgs(ProgressEventType.Warning,
                    $"Failed to drop temporary table '{table}', it may need to be removed manually", ex));
            }
        }

        _tablesToCleanup.Clear();
    }

    /// <summary>
    /// Performs no checks.  Note that this override does not call base.Check.
    /// </summary>
    public override void Check(ICheckNotifier notifier)
    {
    }

    /// <summary>Previews are not supported because they would require shipping the cohort to the data server.</summary>
    public override DataTable TryGetPreview() => throw new NotSupportedException(
        "Previews are not supported for Cross Server extraction since it involves shipping off the cohort into tempdb.");
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc