• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HicServices / RDMP / 6003738864

28 Aug 2023 06:50PM UTC coverage: 57.442% (+0.01%) from 57.432%
6003738864

push

github

web-flow
Feature/RDMP-28 Add BeginLoadData & EndLoadData to Datatables (#1598)

* partial fix

* add row peaker update

* fix up whitespace

* add a lot of daat begin loads

* more data load

* fix typo

10908 of 20572 branches covered (0.0%)

Branch coverage included in aggregate %.

65 of 65 new or added lines in 20 files covered. (100.0%)

31683 of 53574 relevant lines covered (59.14%)

8443.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.48
/Rdmp.Core/DataLoad/Engine/Pipeline/Sources/DbDataCommandDataFlowSource.cs
1
// Copyright (c) The University of Dundee 2018-2019
2
// This file is part of the Research Data Management Platform (RDMP).
3
// RDMP is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
4
// RDMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
5
// You should have received a copy of the GNU General Public License along with RDMP. If not, see <https://www.gnu.org/licenses/>.
6

7
using System;
8
using System.Data;
9
using System.Data.Common;
10
using System.Diagnostics;
11
using Rdmp.Core.DataFlowPipeline;
12
using Rdmp.Core.ReusableLibraryCode;
13
using Rdmp.Core.ReusableLibraryCode.Progress;
14

15
namespace Rdmp.Core.DataLoad.Engine.Pipeline.Sources;
16

17
/// <inheritdoc/>
18
public class DbDataCommandDataFlowSource : IDbDataCommandDataFlowSource
19
{
20
    public string Sql { get; }
262✔
21
    private DbDataReader _reader;
22
    private readonly DbConnectionStringBuilder _builder;
23
    private readonly int _timeout;
24
    private DbConnection _con;
25

26
    private readonly string _taskBeingPerformed;
27
    private Stopwatch timer = new();
130✔
28

29
    public int BatchSize { get; set; }
4,676✔
30

31
    public DbCommand cmd { get; private set; }
458✔
32

33
    public bool AllowEmptyResultSets { get; set; }
66✔
34
    public int TotalRowsRead { get; set; }
9,272✔
35

36
    /// <summary>
37
    /// Called after command sql has been set up, allows last minute changes by subscribers before it is executed
38
    /// </summary>
39
    public Action<DbCommand> CommandAdjuster { get; set; }
130✔
40

41
    public DbDataCommandDataFlowSource(string sql, string taskBeingPerformed, DbConnectionStringBuilder builder,
130✔
42
        int timeout)
130✔
43
    {
44
        Sql = sql;
130✔
45
        _taskBeingPerformed = taskBeingPerformed;
130✔
46
        _builder = builder;
130✔
47
        _timeout = timeout;
130✔
48

49
        BatchSize = 10000;
130✔
50
    }
130✔
51

52
    private int _numberOfColumns;
53

54
    private bool firstChunk = true;
130✔
55

56
    public DataTable GetChunk(IDataLoadEventListener job, GracefulCancellationToken cancellationToken)
57
    {
58
        if (_reader == null)
242✔
59
        {
60
            _con = DatabaseCommandHelper.GetConnection(_builder);
130✔
61
            _con.Open();
130✔
62

63
            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
130✔
64
                $"Running SQL:{Environment.NewLine}{Sql}"));
130✔
65

66
            cmd = DatabaseCommandHelper.GetCommand(Sql, _con);
130✔
67
            cmd.CommandTimeout = _timeout;
130✔
68
            CommandAdjuster?.Invoke(cmd);
130!
69

70
            _reader = cmd.ExecuteReaderAsync(cancellationToken.AbortToken).Result;
130✔
71
            _numberOfColumns = _reader.FieldCount;
128✔
72
        }
73

74
        var readThisBatch = 0;
240✔
75
        timer.Start();
240✔
76
        try
77
        {
78
            DataTable chunk = GetChunkSchema(_reader);
240✔
79
            chunk.BeginLoadData();
240✔
80
            while (_reader.HasRows && _reader.Read())
4,726✔
81
            {
82
                cancellationToken.ThrowIfCancellationRequested();
4,486✔
83

84
                AddRowToDataTable(chunk, _reader);
4,486✔
85
                readThisBatch++;
4,486✔
86

87
                //we reached batch limit
88
                if (readThisBatch == BatchSize)
4,486!
89
                {
90
                    chunk.EndLoadData();
×
91
                    return chunk;
×
92
                }
93
            }
94
            chunk.EndLoadData();
240✔
95

96
            //if data was read
97
            if (readThisBatch > 0)
240✔
98
                return chunk;
122✔
99

100
            //data is exhausted
101

102
            //if data was exhausted on first read and we are allowing empty result sets
103
            if (firstChunk && AllowEmptyResultSets)
118✔
104
                return chunk; //return the empty chunk
4✔
105

106
            //data exhausted
107
            return null;
114✔
108
        }
109
        catch (Exception e)
×
110
        {
111
            job.OnNotify(this, new NotifyEventArgs(ProgressEventType.Error, "Source read failed", e));
×
112
            throw;
×
113
        }
114
        finally
115
        {
116
            firstChunk = false;
240✔
117
            timer.Stop();
240✔
118
            job.OnProgress(this,
240✔
119
                new ProgressEventArgs(_taskBeingPerformed, new ProgressMeasurement(TotalRowsRead, ProgressType.Records),
240✔
120
                    timer.Elapsed));
240✔
121
        }
240✔
122
    }
240✔
123

124
    private DataRow AddRowToDataTable(DataTable chunk, DbDataReader reader)
125
    {
126
        var values = new object[_numberOfColumns];
4,486✔
127

128
        reader.GetValues(values);
4,486✔
129
        TotalRowsRead++;
4,486✔
130
        return chunk.LoadDataRow(values, LoadOption.Upsert);
4,486✔
131
    }
132

133
    /// <inheritdoc/>
134
    public DataRow ReadOneRow() =>
135
        //return null if there are no more records to read
136
        _reader.Read() ? AddRowToDataTable(GetChunkSchema(_reader), _reader) : null;
×
137

138
    private static DataTable GetChunkSchema(DbDataReader reader)
139
    {
140
        var toReturn = new DataTable("dt");
240✔
141

142
        //Retrieve column schema into a DataTable.
143
        var schemaTable = reader.GetSchemaTable() ??
240!
144
                          throw new InvalidOperationException(
240✔
145
                              "Could not retrieve schema information from the DbDataReader");
240✔
146
        Debug.Assert(schemaTable.Columns[0].ColumnName.ToLower().Contains("name"));
147

148
        //For each field in the table...
149
        foreach (DataRow myField in schemaTable.Rows)
3,548✔
150
        {
151
            var t = Type.GetType(myField["DataType"].ToString()) ??
1,534!
152
                    throw new NotSupportedException($"Type.GetType failed on SQL DataType:{myField["DataType"]}");
1,534✔
153

154
            //let's not mess around with floats, make everything a double please
155
            if (t == typeof(float))
1,534!
156
                t = typeof(double);
×
157

158

159
            toReturn.Columns.Add(myField[0].ToString(), t); //0 should always be the column name
1,534✔
160
        }
161

162
        return toReturn;
240✔
163
    }
164

165
    public void Dispose(IDataLoadEventListener listener, Exception pipelineFailureExceptionIfAny)
166
    {
167
        CloseReader(listener);
68✔
168
    }
68✔
169

170
    public void Abort(IDataLoadEventListener listener)
171
    {
172
        CloseReader(listener);
×
173
    }
×
174

175
    private void CloseReader(IDataLoadEventListener listener)
176
    {
177
        try
178
        {
179
            if (_con == null)
68!
180
                return;
×
181

182
            if (_con.State != ConnectionState.Closed)
68✔
183
                _con.Close();
68✔
184

185
            _reader?.Dispose();
68✔
186
            cmd?.Dispose();
68!
187

188
            //do not do this more than once! which could happen if they abort then it disposes
189
            _con = null;
68✔
190
        }
68✔
191
        catch (Exception e)
×
192
        {
193
            listener.OnNotify(this,
×
194
                new NotifyEventArgs(ProgressEventType.Warning, "Could not close Reader / Connection", e));
×
195
        }
×
196
    }
68✔
197

198
    public DataTable TryGetPreview()
199
    {
200
        var chunk = new DataTable();
×
201
        using (var con = DatabaseCommandHelper.GetConnection(_builder))
×
202
        {
203
            con.Open();
×
204
            using (var da = DatabaseCommandHelper.GetDataAdapter(DatabaseCommandHelper.GetCommand(Sql, con)))
×
205
            {
206
                var read = da.Fill(0, 100, chunk);
×
207

208
                if (read == 0)
×
209
                    return null;
×
210
            }
×
211

212
            return chunk;
×
213
        }
214
    }
×
215
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc