• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 6245535001

20 Sep 2023 07:44AM UTC coverage: 57.013%. First build
6245535001

push

github

web-flow
8.1.0 Release (#1628)

* Bump Newtonsoft.Json from 13.0.1 to 13.0.2

Bumps [Newtonsoft.Json](https://github.com/JamesNK/Newtonsoft.Json) from 13.0.1 to 13.0.2.
- [Release notes](https://github.com/JamesNK/Newtonsoft.Json/releases)
- [Commits](https://github.com/JamesNK/Newtonsoft.Json/compare/13.0.1...13.0.2)

---
updated-dependencies:
- dependency-name: Newtonsoft.Json
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump NLog from 5.0.5 to 5.1.0

Bumps [NLog](https://github.com/NLog/NLog) from 5.0.5 to 5.1.0.
- [Release notes](https://github.com/NLog/NLog/releases)
- [Changelog](https://github.com/NLog/NLog/blob/dev/CHANGELOG.md)
- [Commits](https://github.com/NLog/NLog/compare/v5.0.5...v5.1.0)

---
updated-dependencies:
- dependency-name: NLog
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump NLog from 5.0.5 to 5.1.0

* Fix -r flag - should have been --results-directory all along

* Bump Newtonsoft.Json from 13.0.1 to 13.0.2

* Bump YamlDotNet from 12.0.2 to 12.1.0

Bumps [YamlDotNet](https://github.com/aaubry/YamlDotNet) from 12.0.2 to 12.1.0.
- [Release notes](https://github.com/aaubry/YamlDotNet/releases)
- [Commits](https://github.com/aaubry/YamlDotNet/compare/v12.0.2...v12.1.0)

---
updated-dependencies:
- dependency-name: YamlDotNet
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump Moq from 4.18.2 to 4.18.3

Bumps [Moq](https://github.com/moq/moq4) from 4.18.2 to 4.18.3.
- [Release notes](https://github.com/moq/moq4/releases)
- [Changelog](https://github.com/moq/moq4/blob/main/CHANGELOG.md)
- [Commits](https://github.com/moq/moq4/compare/v4.18.2...v4.18.3)

---
updated-dependencies:
- dependency-name: Moq
... (continued)

10732 of 20257 branches covered (0.0%)

Branch coverage included in aggregate %.

48141 of 48141 new or added lines in 1086 files covered. (100.0%)

30685 of 52388 relevant lines covered (58.57%)

7387.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.94
/Rdmp.Core/DataLoad/Engine/Pipeline/Sources/DbDataCommandDataFlowSource.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Data;
9
using System.Data.Common;
10
using System.Diagnostics;
11
using Rdmp.Core.DataFlowPipeline;
12
using Rdmp.Core.ReusableLibraryCode;
13
using Rdmp.Core.ReusableLibraryCode.Progress;
14

15
namespace Rdmp.Core.DataLoad.Engine.Pipeline.Sources;
16

17
/// <inheritdoc/>
18
public class DbDataCommandDataFlowSource : IDbDataCommandDataFlowSource
19
{
20
    public string Sql { get; }
262✔
21
    private DbDataReader _reader;
22
    private readonly DbConnectionStringBuilder _builder;
23
    private readonly int _timeout;
24
    private DbConnection _con;
25

26
    private readonly string _taskBeingPerformed;
27
    private Stopwatch timer = new();
130✔
28

29
    public int BatchSize { get; set; }
4,676✔
30

31
    public DbCommand cmd { get; private set; }
458✔
32

33
    public bool AllowEmptyResultSets { get; set; }
66✔
34
    public int TotalRowsRead { get; set; }
9,272✔
35

36
    /// <summary>
37
    /// Called after command sql has been set up, allows last minute changes by subscribers before it is executed
38
    /// </summary>
39
    public Action<DbCommand> CommandAdjuster { get; set; }
130✔
40

41
    public DbDataCommandDataFlowSource(string sql, string taskBeingPerformed, DbConnectionStringBuilder builder,
130✔
42
        int timeout)
130✔
43
    {
44
        Sql = sql;
130✔
45
        _taskBeingPerformed = taskBeingPerformed;
130✔
46
        _builder = builder;
130✔
47
        _timeout = timeout;
130✔
48

49
        BatchSize = 10000;
130✔
50
    }
130✔
51

52
    private int _numberOfColumns;
53

54
    private bool _firstChunk = true;
130✔
55

56
    private DataTable schema = null;
57

58
    public DataTable GetChunk(IDataLoadEventListener job, GracefulCancellationToken cancellationToken)
59
    {
60
        if (_reader == null)
242✔
61
        {
62
            _con = DatabaseCommandHelper.GetConnection(_builder);
130✔
63
            _con.Open();
130✔
64

65
            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
130✔
66
                $"Running SQL:{Environment.NewLine}{Sql}"));
130✔
67

68
            cmd = DatabaseCommandHelper.GetCommand(Sql, _con);
130✔
69
            cmd.CommandTimeout = _timeout;
130✔
70
            CommandAdjuster?.Invoke(cmd);
130!
71

72
            _reader = cmd.ExecuteReaderAsync(cancellationToken.AbortToken).Result;
130✔
73
            _numberOfColumns = _reader.FieldCount;
128✔
74

75
            schema = GetChunkSchema(_reader);
128✔
76
        }
77

78
        var readThisBatch = 0;
240✔
79
        timer.Start();
240✔
80
        try
81
        {
82
            var chunk = schema.Clone();
240✔
83
            chunk.BeginLoadData();
240✔
84

85
            while (_reader.HasRows && _reader.Read())
4,726✔
86
            {
87
                cancellationToken.ThrowIfCancellationRequested();
4,486✔
88

89
                AddRowToDataTable(chunk, _reader);
4,486✔
90
                readThisBatch++;
4,486✔
91

92
                // loop until we reach the batch limit
93
                if (readThisBatch != BatchSize) continue;
4,486!
94

95
                chunk.EndLoadData();
×
96
                return chunk;
×
97
            }
98
            chunk.EndLoadData();
240✔
99

100
            //if data was read
101
            if (readThisBatch > 0)
240✔
102
            {
103
                chunk.EndLoadData();
122✔
104
                return chunk;
122✔
105
            }
106

107
            //data is exhausted
108

109
            //if data was exhausted on first read and we are allowing empty result sets
110
            if (_firstChunk && AllowEmptyResultSets)
118✔
111
            {
112
                chunk.EndLoadData();
4✔
113
                return chunk; //return the empty chunk
4✔
114
            }
115

116
            //data exhausted
117
            schema.Dispose();
114✔
118
            return null;
114✔
119
        }
120
        catch (Exception e)
×
121
        {
122
            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, "Source read failed", e));
×
123
            throw;
×
124
        }
125
        finally
126
        {
127
            _firstChunk = false;
240✔
128
            timer.Stop();
240✔
129
            job.OnProgress(this,
240✔
130
                new ProgressEventArgs(_taskBeingPerformed, new ProgressMeasurement(TotalRowsRead, ProgressType.Records),
240✔
131
                    timer.Elapsed));
240✔
132
        }
240✔
133
    }
240✔
134

135
    private DataRow AddRowToDataTable(DataTable chunk, DbDataReader reader)
136
    {
137
        var values = new object[_numberOfColumns];
4,486✔
138

139
        reader.GetValues(values);
4,486✔
140
        TotalRowsRead++;
4,486✔
141
        return chunk.LoadDataRow(values, LoadOption.Upsert);
4,486✔
142
    }
143

144
    /// <inheritdoc/>
145
    public DataRow ReadOneRow() =>
146
        //return null if there are no more records to read
147
        _reader.Read() ? AddRowToDataTable(GetChunkSchema(_reader), _reader) : null;
×
148

149
    private static DataTable GetChunkSchema(DbDataReader reader)
150
    {
151
        var toReturn = new DataTable("dt");
128✔
152

153
        //Retrieve column schema into a DataTable.
154
        var schemaTable = reader.GetSchemaTable() ??
128!
155
                          throw new InvalidOperationException(
128✔
156
                              "Could not retrieve schema information from the DbDataReader");
128✔
157
        Debug.Assert(schemaTable.Columns[0].ColumnName.ToLower().Contains("name"));
158

159
        //For each field in the table...
160
        foreach (DataRow myField in schemaTable.Rows)
1,924✔
161
        {
162
            var t = Type.GetType(myField["DataType"].ToString()) ??
834!
163
                    throw new NotSupportedException($"Type.GetType failed on SQL DataType:{myField["DataType"]}");
834✔
164

165
            //let's not mess around with floats, make everything a double please
166
            if (t == typeof(float))
834!
167
                t = typeof(double);
×
168

169

170
            toReturn.Columns.Add(myField[0].ToString(), t); //0 should always be the column name
834✔
171
        }
172

173
        return toReturn;
128✔
174
    }
175

176
    public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
177
    {
178
        CloseReader(listener);
68✔
179
    }
68✔
180

181
    public void Abort(IDataLoadEventListener listener)
182
    {
183
        CloseReader(listener);
×
184
    }
×
185

186
    private void CloseReader(IDataLoadEventListener listener)
187
    {
188
        try
189
        {
190
            if (_con == null)
68!
191
                return;
×
192

193
            if (_con.State != ConnectionState.Closed)
68✔
194
                _con.Close();
68✔
195

196
            _reader?.Dispose();
68✔
197
            cmd?.Dispose();
68!
198

199
            //do not do this more than once! which could happen if they abort then it disposes
200
            _con = null;
68✔
201
        }
68✔
202
        catch (Exception e)
×
203
        {
204
            listener.OnNotify(this,
×
205
                new NotifyEventArgs(ProgressEventType.Warning, "Could not close Reader / Connection", e));
×
206
        }
×
207
    }
68✔
208

209
    public DataTable TryGetPreview()
210
    {
211
        var chunk = new DataTable();
×
212
        using var con = DatabaseCommandHelper.GetConnection(_builder);
×
213
        con.Open();
×
214
        using var da = DatabaseCommandHelper.GetDataAdapter(DatabaseCommandHelper.GetCommand(Sql, con));
×
215
        var read = da.Fill(0, 100, chunk);
×
216

217
        return read == 0 ? null : chunk;
×
218
    }
×
219
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc