
HicServices / RDMP · build 13235779323 (push · github · web-flow)

10 Feb 2025 07:36AM UTC · coverage: 57.389% (-0.04%) from 57.433%

Commit: Spike/rdmp 282 cohort temp tables (#2131)

* interim
* add cohort temp table
* tidy up code
* handle all types of db
* tidy up
* tidy up
* add changelog
* add error handling

11306 of 21250 branches covered (53.2%)

Branch coverage included in aggregate %.

20 of 79 new or added lines in 2 files covered (25.32%)

2 existing lines in 1 file now uncovered.

32155 of 54480 relevant lines covered (59.02%)

17093.49 hits per line
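Since branch coverage is included in the aggregate percentage, the 57.389% figure above appears to pool covered lines and branches over all relevant lines and branches (an assumption inferred from the totals reported here): (32155 + 11306) / (54480 + 21250) = 43461 / 75730 ≈ 57.389%.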

Source File

/Rdmp.Core/DataLoad/Engine/Pipeline/Sources/DbDataCommandDataFlowSource.cs (69.03% covered)
// Copyright (c) The University of Dundee 2018-2019
// This file is part of the Research Data Management Platform (RDMP).
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.

using System;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using Rdmp.Core.DataFlowPipeline;
using Rdmp.Core.ReusableLibraryCode;
using Rdmp.Core.ReusableLibraryCode.Progress;

namespace Rdmp.Core.DataLoad.Engine.Pipeline.Sources;

/// <inheritdoc/>
public class DbDataCommandDataFlowSource : IDbDataCommandDataFlowSource
{
    public string Sql { get; }

    private DbDataReader _reader;
    private readonly DbConnectionStringBuilder _builder;
    private readonly int _timeout;
    private DbConnection _con;

    private readonly string _taskBeingPerformed;
    private Stopwatch timer = new();

    public int BatchSize { get; set; }

    public DbCommand cmd { get; private set; }

    public bool AllowEmptyResultSets { get; set; }
    public int TotalRowsRead { get; set; }

    /// <summary>
    /// Called after command sql has been set up, allows last minute changes by subscribers before it is executed
    /// </summary>
    public Action<DbCommand> CommandAdjuster { get; set; }

    public DbDataCommandDataFlowSource(string sql, string taskBeingPerformed, DbConnectionStringBuilder builder,
        int timeout)
    {
        Sql = sql;
        _taskBeingPerformed = taskBeingPerformed;
        _builder = builder;
        _timeout = timeout;

        BatchSize = 10000;
    }

    // New in this commit; not covered by tests in this build.
    public DbDataCommandDataFlowSource(string sql, string taskBeingPerformed, DbConnection con,
        int timeout)
    {
        Sql = sql;
        _taskBeingPerformed = taskBeingPerformed;
        _con = con;
        _timeout = timeout;

        BatchSize = 10000;
    }

    private int _numberOfColumns;

    private bool _firstChunk = true;

    private DataTable schema = null;

    public DataTable GetChunk(IDataLoadEventListener job, GracefulCancellationToken cancellationToken)
    {
        if (_reader == null)
        {
            _con = _con == null ? DatabaseCommandHelper.GetConnection(_builder) : _con;
            if (_con != null && _con.State == ConnectionState.Closed)
            {
                _con.Open();
            }

            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
                $"Running SQL:{Environment.NewLine}{Sql}"));

            cmd = DatabaseCommandHelper.GetCommand(Sql, _con);
            cmd.CommandTimeout = _timeout;
            CommandAdjuster?.Invoke(cmd);

            _reader = cmd.ExecuteReaderAsync(cancellationToken.AbortToken).Result;
            _numberOfColumns = _reader.FieldCount;

            schema = GetChunkSchema(_reader);
        }

        var readThisBatch = 0;
        timer.Start();
        try
        {
            var chunk = schema.Clone();
            chunk.BeginLoadData();

            while (_reader.HasRows && _reader.Read())
            {
                cancellationToken.ThrowIfCancellationRequested();

                AddRowToDataTable(chunk, _reader);
                readThisBatch++;

                // loop until we reach the batch limit
                if (readThisBatch != BatchSize) continue;

                chunk.EndLoadData();
                return chunk;
            }
            chunk.EndLoadData();

            //if data was read
            if (readThisBatch > 0)
            {
                chunk.EndLoadData();
                return chunk;
            }

            //data is exhausted

            //if data was exhausted on first read and we are allowing empty result sets
            if (_firstChunk && AllowEmptyResultSets)
            {
                chunk.EndLoadData();
                return chunk; //return the empty chunk
            }

            //data exhausted
            schema.Dispose();
            return null;
        }
        catch (Exception e)
        {
            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, "Source read failed", e));
            throw;
        }
        finally
        {
            _firstChunk = false;
            timer.Stop();
            job.OnProgress(this,
                new ProgressEventArgs(_taskBeingPerformed, new ProgressMeasurement(TotalRowsRead, ProgressType.Records),
                    timer.Elapsed));
        }
    }

    private DataRow AddRowToDataTable(DataTable chunk, DbDataReader reader)
    {
        var values = new object[_numberOfColumns];

        reader.GetValues(values);
        TotalRowsRead++;
        return chunk.LoadDataRow(values, LoadOption.Upsert);
    }

    /// <inheritdoc/>
    public DataRow ReadOneRow() =>
        //return null if there are no more records to read
        _reader.Read() ? AddRowToDataTable(GetChunkSchema(_reader), _reader) : null;

    private static DataTable GetChunkSchema(DbDataReader reader)
    {
        var toReturn = new DataTable("dt");

        //Retrieve column schema into a DataTable.
        var schemaTable = reader.GetSchemaTable() ??
                          throw new InvalidOperationException(
                              "Could not retrieve schema information from the DbDataReader");
        Debug.Assert(schemaTable.Columns[0].ColumnName.ToLower().Contains("name"));

        //For each field in the table...
        foreach (DataRow myField in schemaTable.Rows)
        {
            var t = Type.GetType(myField["DataType"].ToString()) ??
                    throw new NotSupportedException($"Type.GetType failed on SQL DataType:{myField["DataType"]}");

            //let's not mess around with floats, make everything a double please
            if (t == typeof(float))
                t = typeof(double);


            toReturn.Columns.Add(myField[0].ToString(), t); //0 should always be the column name
        }

        return toReturn;
    }

    public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
    {
        CloseReader(listener);
    }

    public void Abort(IDataLoadEventListener listener)
    {
        CloseReader(listener);
    }

    private void CloseReader(IDataLoadEventListener listener)
    {
        try
        {
            if (_con == null)
                return;

            if (_con.State != ConnectionState.Closed)
                _con.Close();

            _reader?.Dispose();
            cmd?.Dispose();

            //do not do this more than once! which could happen if they abort then it disposes
            _con = null;
        }
        catch (Exception e)
        {
            listener.OnNotify(this,
                new NotifyEventArgs(ProgressEventType.Warning, "Could not close Reader / Connection", e));
        }
    }

    public DataTable TryGetPreview()
    {
        var chunk = new DataTable();
        // New in this commit (uncovered in this build): lazily create and open the connection if needed
        _con = _con == null ? DatabaseCommandHelper.GetConnection(_builder) : _con;
        if (_con != null && _con.State == ConnectionState.Closed)
        {
            _con.Open();
        }
        using var da = DatabaseCommandHelper.GetDataAdapter(DatabaseCommandHelper.GetCommand(Sql, _con));
        // The two existing lines below are now uncovered in this build
        var read = da.Fill(0, 100, chunk);
        return read == 0 ? null : chunk;
    }
}
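For context on the new connection-reusing constructor added in this commit, the sketch below shows one way it might be exercised: a session-scoped temp table is created on an open connection, and the same connection is handed to DbDataCommandDataFlowSource so the query can see that table. This is a minimal sketch, not code from the RDMP repository; the Microsoft.Data.SqlClient connection, the connection string, the #cohort table, ThrowImmediatelyDataLoadEventListener.Quiet as the listener, and the default GracefulCancellationToken are all assumptions chosen for illustration.

// Minimal usage sketch (assumptions noted above); only the DbDataCommandDataFlowSource
// members shown in the file listing are relied upon.
using System;
using System.Data;
using Microsoft.Data.SqlClient;                     // assumed SQL Server client library
using Rdmp.Core.DataFlowPipeline;
using Rdmp.Core.DataLoad.Engine.Pipeline.Sources;
using Rdmp.Core.ReusableLibraryCode.Progress;

var con = new SqlConnection(
    "Server=localhost;Database=MyDb;Integrated Security=true;TrustServerCertificate=true"); // illustrative
con.Open();

// Create a session-scoped temp table; it is only visible on this connection.
using (var create = con.CreateCommand())
{
    create.CommandText = "SELECT chi INTO #cohort FROM MyCohortTable WHERE cohortDefinition_id = 5"; // illustrative
    create.ExecuteNonQuery();
}

// Pass the same open connection via the new constructor so the SELECT can read #cohort.
var source = new DbDataCommandDataFlowSource(
    "SELECT * FROM #cohort",
    "Read cohort temp table",
    con,
    timeout: 30)
{
    BatchSize = 5000,
    AllowEmptyResultSets = true,
    // CommandAdjuster runs after the command is built but before it is executed.
    CommandAdjuster = c => Console.WriteLine($"About to run: {c.CommandText}")
};

var listener = ThrowImmediatelyDataLoadEventListener.Quiet; // assumed listener, as used in RDMP tests
var token = new GracefulCancellationToken();

// GetChunk returns batches of up to BatchSize rows and null once the reader is exhausted.
DataTable chunk;
while ((chunk = source.GetChunk(listener, token)) != null)
    Console.WriteLine($"Read {chunk.Rows.Count} rows (running total {source.TotalRowsRead})");

source.Dispose(listener, null); // closes the reader, command and connection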