• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 13235788803

10 Feb 2025 07:36AM UTC coverage: 57.402% (+0.01%) from 57.389%
13235788803

push

github

web-flow
Task/rdmp 265 version data loads (#2125)

* basic versions

* add basic versioning

* add versioning

* rename file

* update changelog

* fix catalogue

* fix sql

* fix up sql

* update delete

* add tests

* tidy up

* fiz clone

* fix up tree structure

* clone version

* tidy up

11338 of 21302 branches covered (53.23%)

Branch coverage included in aggregate %.

966 of 1168 new or added lines in 8 files covered. (82.71%)

3 existing lines in 3 files now uncovered.

32238 of 54612 relevant lines covered (59.03%)

17067.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.52
/Rdmp.Core/Curation/Data/DataLoad/ProcessTask.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Collections.Generic;
9
using System.Data.Common;
10
using System.IO;
11
using System.Linq;
12
using System.Text.RegularExpressions;
13
using Rdmp.Core.Curation.Data.Cohort;
14
using Rdmp.Core.Curation.Data.ImportExport;
15
using Rdmp.Core.Curation.Data.Serialization;
16
using Rdmp.Core.MapsDirectlyToDatabaseTable;
17
using Rdmp.Core.MapsDirectlyToDatabaseTable.Attributes;
18
using Rdmp.Core.Repositories;
19
using Rdmp.Core.ReusableLibraryCode;
20
using Rdmp.Core.ReusableLibraryCode.Annotations;
21
using Rdmp.Core.ReusableLibraryCode.Checks;
22

23
namespace Rdmp.Core.Curation.Data.DataLoad;
24

25
/// <summary>
/// Describes a specific operation carried out at a specific step of a LoadMetadata.  This could be 'unzip all *.zip files awaiting loading' or
/// 'after loading the data to live, call sp_clean_table1' or 'Connect to webservice X and download 1,000,000 records which will be serialized into XML'
/// 
/// <para>The class achieves this wide ranging functionality through the interaction of ProcessTaskType and Path.  e.g. when ProcessTaskType is Attacher then
/// Path functions as the Type name of a class that implements IAttacher e.g. 'LoadModules.Generic.Attachers.AnySeparatorFileAttacher'.  </para>
/// 
/// <para>Each ProcessTask can have one or more strongly typed arguments (see entity ProcessTaskArgument), these are discovered at design time by using
/// reflection to query the Path e.g. 'AnySeparatorFileAttacher' for all properties marked with [DemandsInitialization] attribute.  This allows for 3rd party developers
/// to write plugin classes to easily handle proprietary/bespoke source file types or complex data load requirements.</para>
/// </summary>
public class ProcessTask : DatabaseEntity, IProcessTask, IOrderable, INamed, ICheckable
{
    #region Database Properties

    private int _loadMetadataID;
    private int? _relatesSolelyToCatalogueID;
    private int _order;
    private string _path;
    private string _name;
    private LoadStage _loadStage;
    private ProcessTaskType _processTaskType;
    private bool _isDisabled;
#nullable enable
    private string? _SerialisableConfiguration;
#nullable disable

    /// <summary>
    /// The load the process task exists as part of
    /// </summary>
    [Relationship(typeof(LoadMetadata), RelationshipType.SharedObject)]
    public int LoadMetadata_ID
    {
        get => _loadMetadataID;
        set => SetField(ref _loadMetadataID, value);
    }

    /// <inheritdoc/>
    [Obsolete(
        "Since you can't change which Catalogues are loaded by a LoadMetadata at runtime, this property is now obsolete")]
    public int? RelatesSolelyToCatalogue_ID
    {
        get => _relatesSolelyToCatalogueID;
        set => SetField(ref _relatesSolelyToCatalogueID, value);
    }

    /// <inheritdoc/>
    public int Order
    {
        get => _order;
        set => SetField(ref _order, value);
    }

    /// <inheritdoc/>
    [AdjustableLocation]
    public string Path
    {
        get => _path;
        set => SetField(ref _path, value);
    }

    /// <inheritdoc cref="IProcessTask.Name"/>
    [NotNull]
    public string Name
    {
        get => _name;
        set => SetField(ref _name, value);
    }

    /// <inheritdoc/>
    public LoadStage LoadStage
    {
        get => _loadStage;
        set => SetField(ref _loadStage, value);
    }

    /// <inheritdoc/>
    public ProcessTaskType ProcessTaskType
    {
        get => _processTaskType;
        set => SetField(ref _processTaskType, value);
    }

    /// <inheritdoc/>
    public bool IsDisabled
    {
        get => _isDisabled;
        set => SetField(ref _isDisabled, value);
    }

    /// <inheritdoc/>
#nullable enable
    public string? SerialisableConfiguration
    {
        get => _SerialisableConfiguration;
        set => SetField(ref _SerialisableConfiguration, value);
    }
#nullable disable

    #endregion

    #region Relationships

    /// <inheritdoc cref="LoadMetadata_ID"/>
    [NoMappingToDatabase]
    public LoadMetadata LoadMetadata => Repository.GetObjectByID<LoadMetadata>(LoadMetadata_ID);

    /// <inheritdoc/>
    [NoMappingToDatabase]
    public IEnumerable<ProcessTaskArgument> ProcessTaskArguments =>
        Repository.GetAllObjectsWithParent<ProcessTaskArgument>(this);

    /// <summary>
    /// All <see cref="ILoadProgress"/> (if any) that can be advanced by executing this load.  This allows batch execution of large loads
    /// </summary>
    [NoMappingToDatabase]
    public ILoadProgress[] LoadProgresses => LoadMetadata.LoadProgresses;

    #endregion

    /// <summary>
    /// Creates an uninitialised instance that is not associated with (or saved to) any repository.
    /// </summary>
    public ProcessTask()
    {
    }

    /// <summary>
    /// Creates a new operation in the data load (e.g. copy files from A to B, load all CSV files to RAW table B etc)
    /// and inserts it into <paramref name="repository"/> as the last task of <paramref name="parent"/>.
    /// </summary>
    /// <param name="repository">Repository the new task is inserted into</param>
    /// <param name="parent">The load the task belongs to (the task is recorded against its root load if it has one)</param>
    /// <param name="stage">The stage of the load in which the task runs</param>
    public ProcessTask(ICatalogueRepository repository, ILoadMetadata parent, LoadStage stage)
    {
        repository.InsertAndHydrate(this, BuildInsertValues(repository, parent, stage));
    }

    /// <summary>
    /// Creates a new operation in the data load (e.g. copy files from A to B, load all CSV files to RAW table B etc)
    /// and inserts it into <paramref name="repository"/> as the last task of <paramref name="parent"/>, with an initial
    /// <see cref="SerialisableConfiguration"/>.
    /// </summary>
    /// <remarks>A call with exactly three arguments binds to the three-parameter overload instead (C# prefers the
    /// candidate that needs no optional parameters filling in), in which case no SerialisableConfiguration value is inserted.</remarks>
    /// <param name="repository">Repository the new task is inserted into</param>
    /// <param name="parent">The load the task belongs to (the task is recorded against its root load if it has one)</param>
    /// <param name="stage">The stage of the load in which the task runs</param>
    /// <param name="serialisableConfiguration">Initial value of <see cref="SerialisableConfiguration"/> (inserted even when null)</param>
    public ProcessTask(ICatalogueRepository repository, ILoadMetadata parent, LoadStage stage, string serialisableConfiguration = null)
    {
        var values = BuildInsertValues(repository, parent, stage);
        values.Add("SerialisableConfiguration", serialisableConfiguration);
        repository.InsertAndHydrate(this, values);
    }

    /// <summary>
    /// Builds the column values used to insert a brand new task at the end of <paramref name="parent"/>'s task list.
    /// Shared by the public creation constructors so both compute Order / LoadMetadata_ID identically.
    /// </summary>
    private static Dictionary<string, object> BuildInsertValues(ICatalogueRepository repository, ILoadMetadata parent,
        LoadStage stage)
    {
        // Append after the existing tasks; DefaultIfEmpty() makes the first task of a load get Order 1
        var order =
            repository.GetAllObjectsWithParent<ProcessTask>(parent).Select(t => t.Order).DefaultIfEmpty().Max() + 1;

        return new Dictionary<string, object>
        {
            // Prefer the root load when there is one (presumably set when parent is a version of another load - confirm)
            { "LoadMetadata_ID", parent.RootLoadMetadata_ID ?? parent.ID },
            { "ProcessTaskType", ProcessTaskType.Executable.ToString() },
            { "LoadStage", stage },
            { "Name", $"New Process{Guid.NewGuid()}" },
            { "Order", order },
        };
    }

    /// <summary>
    /// Hydrates an existing task from the current row of <paramref name="r"/>.
    /// </summary>
    /// <exception cref="Exception">Thrown if the ProcessTaskType or LoadStage column cannot be parsed into its enum</exception>
    internal ProcessTask(ICatalogueRepository repository, DbDataReader r)
        : base(repository, r)
    {
        LoadMetadata_ID = int.Parse(r["LoadMetaData_ID"].ToString());

        var relatesSolelyToCatalogueId = r["RelatesSolelyToCatalogue_ID"];
        if (relatesSolelyToCatalogueId != DBNull.Value)
            _relatesSolelyToCatalogueID = int.Parse(relatesSolelyToCatalogueId.ToString());

        Path = r["Path"] as string;
        Name = r["Name"] as string;
        Order = int.Parse(r["Order"].ToString());

        if (Enum.TryParse(r["ProcessTaskType"] as string, out ProcessTaskType processTaskType))
            ProcessTaskType = processTaskType;
        else
            throw new Exception($"Could not parse ProcessTaskType:{r["ProcessTaskType"]}");

        if (Enum.TryParse(r["LoadStage"] as string, out LoadStage loadStage))
            LoadStage = loadStage;
        else
            throw new Exception($"Could not parse LoadStage:{r["LoadStage"]}");

        IsDisabled = Convert.ToBoolean(r["IsDisabled"]);

        // Database NULLs come back as DBNull.Value (not a CLR null). Without this check a NULL configuration would be
        // hydrated as DBNull.Value.ToString() i.e. "" rather than null.
        var serialisableConfiguration = r["SerialisableConfiguration"];
        if (serialisableConfiguration is not null && serialisableConfiguration != DBNull.Value)
            SerialisableConfiguration = serialisableConfiguration.ToString();
    }

    /// <summary>
    /// Creates (or updates) a task from a <see cref="ShareDefinition"/> via <paramref name="shareManager"/>.
    /// </summary>
    internal ProcessTask(ShareManager shareManager, ShareDefinition shareDefinition)
    {
        shareManager.UpsertAndHydrate(this, shareDefinition);
    }

    /// <inheritdoc/>
    public override string ToString() => Name;

    /// <summary>
    /// Checks this task's configuration.  Executable, SQLFile and SQLBakFile tasks must have a Path to a file that exists
    /// (with warnings for other tasks sharing the same file); SQLFile tasks additionally have their SQL scanned for
    /// references to tables in other load stages.  Attacher, DataProvider and MutilateDataTable tasks are not checked here.
    /// </summary>
    /// <param name="notifier">Receives the results of each check</param>
    /// <exception cref="ArgumentOutOfRangeException">Thrown if <see cref="ProcessTaskType"/> is not a known value</exception>
    public void Check(ICheckNotifier notifier)
    {
        switch (ProcessTaskType)
        {
            case ProcessTaskType.Executable:
            case ProcessTaskType.SQLBakFile:
                CheckFileExistenceAndUniqueness(notifier);
                break;
            case ProcessTaskType.SQLFile:
                CheckFileExistenceAndUniqueness(notifier);
                CheckForProblemsInSQLFile(notifier);
                break;
            case ProcessTaskType.Attacher:
            case ProcessTaskType.DataProvider:
            case ProcessTaskType.MutilateDataTable:
                // Path is a plugin class name rather than a file, nothing to check here
                break;
            default:
                throw new ArgumentOutOfRangeException(nameof(ProcessTaskType), ProcessTaskType,
                    "Unknown ProcessTaskType");
        }
    }

    /// <summary>
    /// Warns if the SQL file at <see cref="Path"/> references (fully qualified) any loaded table as it exists in a load stage
    /// other than the one this task runs in (e.g. touching STAGING from a RAW script).  Failure to read or analyse the
    /// file is reported as a failed check rather than thrown.
    /// </summary>
    private void CheckForProblemsInSQLFile(ICheckNotifier notifier)
    {
        try
        {
            var sql = File.ReadAllText(Path);

            // A task in the Mounting stage is treated as belonging to AdjustRaw for the purposes of this check
            var ownStage = LoadStage == LoadStage.Mounting ? LoadStage.AdjustRaw : LoadStage;

            foreach (var tableInfo in LoadMetadata.GetDistinctTableInfoList(false))
            {
                var syntaxHelper = tableInfo.GetQuerySyntaxHelper();

                //for each stage get all the object names that are in that stage
                foreach (var stage in new[] { LoadStage.AdjustRaw, LoadStage.AdjustStaging, LoadStage.PostLoad })
                {
                    //process task belongs in that stage anyway so nothing is prohibited
                    if (stage == ownStage)
                        continue;

                    //figure out what is prohibited
                    var prohibitedSql = syntaxHelper.EnsureFullyQualified(tableInfo.GetDatabaseRuntimeName(stage),
                        null, tableInfo.GetRuntimeName(stage));

                    //if we reference it, complain
                    if (sql.Contains(prohibitedSql))
                        notifier.OnCheckPerformed(
                            new CheckEventArgs(
                                $"Sql in file '{Path}' contains a reference to '{prohibitedSql}' which is prohibited since the ProcessTask ('{Name}') runs in LoadStage {LoadStage}",
                                CheckResult.Warning));
                }
            }
        }
        catch (Exception e)
        {
            notifier.OnCheckPerformed(new CheckEventArgs($"Failed to check the contents of the SQL file '{Path}'",
                CheckResult.Fail, e));
        }
    }

    /// <summary>
    /// Checks that <see cref="Path"/> is set and exists, warns about other tasks using the same Path, and fails if
    /// <see cref="Name"/> mentions a quoted .exe/.sql file whose name differs from the file in <see cref="Path"/>.
    /// </summary>
    private void CheckFileExistenceAndUniqueness(ICheckNotifier notifier)
    {
        if (string.IsNullOrWhiteSpace(Path))
        {
            notifier.OnCheckPerformed(new CheckEventArgs($"No Path specified for ProcessTask '{Name}'",
                CheckResult.Fail));
            return;
        }

        notifier.OnCheckPerformed(!File.Exists(Path)
            ? new CheckEventArgs($"Could not find File '{Path}' for ProcessTask '{Name}'", CheckResult.Fail)
            : new CheckEventArgs($"Found File '{Path}'", CheckResult.Success));

        var actualFile = System.IO.Path.GetFileName(Path);

        // Other tasks may have no Path at all, so compare null-safely (ordinal, like string.Equals(string))
        var matchingPaths = Repository.GetAllObjects<ProcessTask>().Where(pt => string.Equals(pt.Path, Path));
        foreach (var duplicate in matchingPaths.Except(new[] { this }))
            notifier.OnCheckPerformed(
                new CheckEventArgs(
                    $"ProcessTask '{duplicate}' (ID={duplicate.ID}) also uses file '{actualFile}'",
                    CheckResult.Warning));

        // Name may be null when hydrated from the database; Regex.Matches would throw on it
        if (string.IsNullOrEmpty(Name))
            return;

        //conflicting tokens in Name string
        foreach (Match match in Regex.Matches(Name, @"'.*((\.exe')|(\.sql'))"))
        {
            var referencedFile = System.IO.Path.GetFileName(match.Value.Trim('\''));

            if (referencedFile != actualFile)
                notifier.OnCheckPerformed(
                    new CheckEventArgs(
                        $"Name of ProcessTask '{Name}' (ID={ID}) references file '{match.Value}' but the Path of the ProcessTask is '{Path}'",
                        CheckResult.Fail));
        }
    }

    /// <summary>
    /// Returns all tables loaded by the parent <see cref="LoadMetadata"/>
    /// </summary>
    /// <returns></returns>
    public IEnumerable<TableInfo> GetTableInfos() => LoadMetadata.GetDistinctTableInfoList(true);

    /// <inheritdoc/>
    public IEnumerable<IArgument> GetAllArguments() => ProcessTaskArguments;

    /// <inheritdoc/>
    public IArgument CreateNewArgument() => new ProcessTaskArgument((ICatalogueRepository)Repository, this);

    /// <inheritdoc/>
    public string GetClassNameWhoArgumentsAreFor() => Path;

    /// <summary>
    /// Creates a new copy of the processTask and all its arguments in the database, this clone is then hooked up to the
    /// new LoadMetadata at the specified stage
    /// </summary>
    /// <remarks>All of the work is done inside a single repository transaction; if anything fails the transaction is ended
    /// with <c>false</c> and the exception is rethrown.</remarks>
    /// <param name="loadMetadata">The new LoadMetadata parent for the clone</param>
    /// <param name="loadStage">The new load stage to put the clone in </param>
    /// <returns>the new ProcessTask (the clone has a different ID to the parent)</returns>
    public ProcessTask CloneToNewLoadMetadataStage(LoadMetadata loadMetadata, LoadStage loadStage)
    {
        var cataRepository = (ICatalogueRepository)Repository;

        using (cataRepository.BeginNewTransaction())
        {
            try
            {
                // Snapshot our arguments before the clone exists so the (lazy) query is only run once
                var toCloneArguments = ProcessTaskArguments.ToArray();

                var clone = new ProcessTask(CatalogueRepository, LoadMetadata, loadStage);
                CopyShallowValuesTo(clone);

                // copy each argument but rewire it to the clone as its parent
                foreach (var argument in toCloneArguments)
                    argument.ShallowClone(clone);

                // the shallow copy carried our own parent/stage across; point the clone at the requested ones instead
                clone.LoadMetadata_ID = loadMetadata.ID;
                clone.LoadStage = loadStage;
                clone.SaveToDatabase();

                cataRepository.EndTransaction(true);
                return clone;
            }
            catch (Exception)
            {
                cataRepository.EndTransaction(false);
                throw;
            }
        }
    }

    /// <inheritdoc/>
    public IArgument[] CreateArgumentsForClassIfNotExists(Type t) =>
        ArgumentFactory.CreateArgumentsForClassIfNotExistsGeneric(
                t,
                // new arguments are created as children of this task (see CreateNewArgument)
                this,
                // arguments that already exist, so they are not created twice
                GetAllArguments().ToArray())
            .ToArray();

    /// <inheritdoc/>
    public IArgument[] CreateArgumentsForClassIfNotExists<T>() => CreateArgumentsForClassIfNotExists(typeof(T));

    /// <summary>
    /// Returns true if the <see cref="ProcessTaskType"/> is allowed to happen during the given <see cref="LoadStage"/>  (e.g. you can't use an IAttacher to
    /// load data into STAGING/LIVE - only RAW).
    /// </summary>
    /// <param name="type">The kind of task</param>
    /// <param name="stage">The stage the task would run in</param>
    /// <returns>True if <paramref name="type"/> may run in <paramref name="stage"/></returns>
    /// <exception cref="ArgumentOutOfRangeException">Thrown for an unknown <paramref name="type"/></exception>
    public static bool IsCompatibleStage(ProcessTaskType type, LoadStage stage)
    {
        return type switch
        {
            ProcessTaskType.Executable => true,
            ProcessTaskType.SQLFile => stage != LoadStage.GetFiles,
            ProcessTaskType.SQLBakFile => stage != LoadStage.GetFiles,
            ProcessTaskType.Attacher => stage == LoadStage.Mounting,
            ProcessTaskType.DataProvider => true,
            ProcessTaskType.MutilateDataTable => stage != LoadStage.GetFiles,
            _ => throw new ArgumentOutOfRangeException(nameof(type))
        };
    }

    /// <summary>
    /// True if <see cref="Path"/> is the name of a C# class (as opposed to the path to an executable or SQL file etc)
    /// </summary>
    /// <returns></returns>
    public bool IsPluginType() => ProcessTaskType == ProcessTaskType.Attacher ||
                                  ProcessTaskType == ProcessTaskType.MutilateDataTable ||
                                  ProcessTaskType == ProcessTaskType.DataProvider;

    /// <summary>
    /// Sets the value of the corresponding <see cref="IArgument"/> (which must already exist) to the given value and saves it.  If your argument doesn't exist yet you
    /// can call <see cref="CreateArgumentsForClassIfNotExists"/>
    /// </summary>
    /// <param name="parameterName">Name of the existing argument (case sensitive)</param>
    /// <param name="o">The new value</param>
    /// <exception cref="Exception">Thrown if no argument called <paramref name="parameterName"/> exists</exception>
    /// <exception cref="InvalidOperationException">Thrown (by SingleOrDefault) if more than one argument has that name</exception>
    public void SetArgumentValue(string parameterName, object o)
    {
        // null-safe comparison: an argument without a Name must not crash the lookup
        var matchingArgument =
            ProcessTaskArguments.SingleOrDefault(p => string.Equals(p.Name, parameterName)) ??
            throw new Exception(
                $"Could not find a ProcessTaskArgument called '{parameterName}', have you called CreateArgumentsForClassIfNotExists<T> yet?");

        matchingArgument.SetValue(o);
        matchingArgument.SaveToDatabase();
    }

    /// <summary>
    /// Creates and saves a copy of this task (in the same <see cref="LoadStage"/>, with the same type, order, disabled state,
    /// configuration, path and name) belonging to <paramref name="lmd"/>, along with copies of all its arguments.
    /// </summary>
    /// <param name="lmd">The load the copy should belong to</param>
    /// <returns>The newly created copy</returns>
    public ProcessTask Clone(LoadMetadata lmd)
    {
        var pt = new ProcessTask(this.CatalogueRepository, lmd, this.LoadStage)
        {
            ProcessTaskType = this.ProcessTaskType,
            Order = this.Order,
            IsDisabled = this.IsDisabled,
            SerialisableConfiguration = this.SerialisableConfiguration,
            Path = this.Path,
            Name = this.Name,
        };

        // the constructor records the task against lmd's root load (if any); the copy must belong to lmd itself
        pt.LoadMetadata_ID = lmd.ID;
        pt.SaveToDatabase();

        foreach (var pta in ProcessTaskArguments)
            pta.ShallowClone(pt);

        return pt;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc