• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

loresoft / FluentCommand / 24791405613

22 Apr 2026 04:57PM UTC coverage: 54.862% (+0.3%) from 54.554%
24791405613

push

github

pwelter34
Merge branch 'master' of https://github.com/loresoft/FluentCommand

1399 of 3215 branches covered (43.51%)

Branch coverage included in aggregate %.

4119 of 6843 relevant lines covered (60.19%)

315.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.13
/src/FluentCommand.SqlServer/Merge/DataMerge.cs
1
using System.ComponentModel.DataAnnotations;
2
using System.Data;
3
using System.Data.Common;
4

5
using FluentCommand.Extensions;
6

7
using Microsoft.Data.SqlClient;
8

9
namespace FluentCommand.Merge;
10

11
/// <summary>
12
/// Provides a fluent API for configuring and executing SQL Server data merge operations, supporting insert, update, delete, and output of changes.
13
/// </summary>
14
public class DataMerge : DisposableBase, IDataMerge
15
{
16
    private readonly IDataSession _dataSession;
17
    private readonly DataMergeDefinition _mergeDefinition;
18

19
    private int _commandTimeout = 0;
20

21
    /// <summary>
    /// Initializes a new instance of the <see cref="DataMerge"/> class.
    /// </summary>
    /// <param name="dataSession">The data session used for the merge operation.</param>
    /// <param name="mergeDefinition">The data merge definition containing mapping and configuration.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown if <paramref name="dataSession"/> or <paramref name="mergeDefinition"/> is <c>null</c>.
    /// </exception>
    public DataMerge(IDataSession dataSession, DataMergeDefinition mergeDefinition)
    {
        // Fail at construction time rather than with a NullReferenceException on first use.
        if (dataSession == null)
            throw new ArgumentNullException(nameof(dataSession));

        if (mergeDefinition == null)
            throw new ArgumentNullException(nameof(mergeDefinition));

        _dataSession = dataSession;
        _mergeDefinition = mergeDefinition;
    }
31

32
    /// <inheritdoc/>
    public IDataMerge IncludeInsert(bool value = true)
    {
        // Record the flag on the merge definition, then return this instance so calls can be chained.
        var definition = _mergeDefinition;
        definition.IncludeInsert = value;

        return this;
    }
38

39
    /// <inheritdoc/>
    public IDataMerge IncludeUpdate(bool value = true)
    {
        // Record the flag on the merge definition, then return this instance so calls can be chained.
        var definition = _mergeDefinition;
        definition.IncludeUpdate = value;

        return this;
    }
45

46
    /// <inheritdoc/>
    public IDataMerge IncludeDelete(bool value = true)
    {
        // Record the flag on the merge definition, then return this instance so calls can be chained.
        var definition = _mergeDefinition;
        definition.IncludeDelete = value;

        return this;
    }
52

53
    /// <inheritdoc/>
    public IDataMerge IdentityInsert(bool value = true)
    {
        // Record the flag on the merge definition, then return this instance so calls can be chained.
        var definition = _mergeDefinition;
        definition.IdentityInsert = value;

        return this;
    }
59

60
    /// <inheritdoc/>
    public IDataMerge TargetTable(string value)
    {
        // The name is stored as given; Validate rejects a null or empty target table when the merge runs.
        var definition = _mergeDefinition;
        definition.TargetTable = value;

        return this;
    }
66

67
    /// <inheritdoc/>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="builder"/> is <c>null</c>.</exception>
    public IDataMerge Map(Action<DataMergeMapping> builder)
    {
        // Guard up front so a missing delegate surfaces as an argument error, not a NullReferenceException.
        if (builder == null)
            throw new ArgumentNullException(nameof(builder));

        // The mapping is constructed over the shared merge definition and handed to the caller to configure.
        var dataMapping = new DataMergeMapping(_mergeDefinition);
        builder(dataMapping);

        return this;
    }
75

76
    /// <inheritdoc/>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="builder"/> is <c>null</c>.</exception>
    public IDataMerge Map<TEntity>(Action<DataMergeMapping<TEntity>> builder)
        where TEntity : class
    {
        // Guard up front so a missing delegate surfaces as an argument error, not a NullReferenceException.
        if (builder == null)
            throw new ArgumentNullException(nameof(builder));

        // The typed mapping is constructed over the shared merge definition and handed to the caller to configure.
        var dataMapping = new DataMergeMapping<TEntity>(_mergeDefinition);
        builder(dataMapping);

        return this;
    }
85

86
    /// <inheritdoc/>
    public IDataMerge Mode(DataMergeMode mergeMode)
    {
        // The mode is read when the merge runs to decide between bulk copy and an inline merge statement.
        var definition = _mergeDefinition;
        definition.Mode = mergeMode;

        return this;
    }
92

93
    /// <inheritdoc/>
    public IDataMerge CommandTimeout(int timeout)
    {
        // Stored as given; the merge only applies it to the command when the value is greater than zero.
        _commandTimeout = timeout;

        return this;
    }
99

100
    /// <inheritdoc/>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="data"/> is <c>null</c>.</exception>
    public int Execute<TEntity>(IEnumerable<TEntity> data)
        where TEntity : class
    {
        if (data == null)
            throw new ArgumentNullException(nameof(data));

        // Source column names flagged as ignored; passed to the reader together with the data.
        var ignoreNames = _mergeDefinition.Columns
            .Where(c => c.IsIgnored)
            .Select(c => c.SourceColumn);

        // Materialize once: counting and then reading a lazy or one-shot sequence
        // would enumerate it twice and could yield a different (or empty) second pass.
        var items = data as ICollection<TEntity> ?? data.ToList();
        var rows = items.Count;

        using var listDataReader = new ListDataReader<TEntity>(items, ignoreNames);

        // The merge helper builds the command; the callback runs it and captures the affected-row count.
        int result = 0;
        Merge(listDataReader, rows, command => result = command.ExecuteNonQuery());

        return result;
    }
119

120
    /// <inheritdoc/>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="tableData"/> is <c>null</c>.</exception>
    public int Execute(DataTable tableData)
    {
        if (tableData == null)
            throw new ArgumentNullException(nameof(tableData));

        var rows = tableData.Rows.Count;

        // The table reader is disposable; release it once the merge has consumed it.
        using var reader = tableData.CreateDataReader();

        // The merge helper builds the command; the callback runs it and captures the affected-row count.
        int result = 0;
        Merge(reader, rows, command => result = command.ExecuteNonQuery());

        return result;
    }
134

135
    /// <inheritdoc/>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="data"/> is <c>null</c>.</exception>
    public async Task<int> ExecuteAsync<TEntity>(IEnumerable<TEntity> data, CancellationToken cancellationToken = default)
        where TEntity : class
    {
        // Same guard as the synchronous Execute overload.
        if (data == null)
            throw new ArgumentNullException(nameof(data));

        // Source column names flagged as ignored; passed to the reader together with the data.
        var ignoreNames = _mergeDefinition.Columns
            .Where(c => c.IsIgnored)
            .Select(c => c.SourceColumn);

        // Materialize once: counting and then reading a lazy or one-shot sequence
        // would enumerate it twice and could yield a different (or empty) second pass.
        var items = data as ICollection<TEntity> ?? data.ToList();
        var rows = items.Count;

        using var listDataReader = new ListDataReader<TEntity>(items, ignoreNames);

        // The merge helper builds the command; the callback runs it and captures the affected-row count.
        int result = 0;
        await MergeAsync(listDataReader, rows, cancellationToken, async (command, token) =>
        {
            result = await command
                .ExecuteNonQueryAsync(token)
                .ConfigureAwait(false);

        }).ConfigureAwait(false);

        return result;
    }
157

158
    /// <inheritdoc/>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="tableData"/> is <c>null</c>.</exception>
    public async Task<int> ExecuteAsync(DataTable tableData, CancellationToken cancellationToken = default)
    {
        // Same guard as the synchronous Execute overload.
        if (tableData == null)
            throw new ArgumentNullException(nameof(tableData));

        var rows = tableData.Rows.Count;

        // The table reader is disposable; release it once the merge has consumed it.
        using var reader = tableData.CreateDataReader();

        // The merge helper builds the command; the callback runs it and captures the affected-row count.
        int result = 0;
        await MergeAsync(reader, rows, cancellationToken, async (command, token) =>
        {
            result = await command
                .ExecuteNonQueryAsync(token)
                .ConfigureAwait(false);

        }).ConfigureAwait(false);

        return result;
    }
175

176
    /// <inheritdoc/>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="data"/> is <c>null</c>.</exception>
    public IEnumerable<DataMergeOutputRow> ExecuteOutput<TEntity>(IEnumerable<TEntity> data)
        where TEntity : class
    {
        // Validate before touching the definition so a bad call leaves it unchanged.
        if (data == null)
            throw new ArgumentNullException(nameof(data));

        // update definition to include output
        _mergeDefinition.IncludeOutput = true;

        var results = Enumerable.Empty<DataMergeOutputRow>();

        // Source column names flagged as ignored; passed to the reader together with the data.
        var ignoreNames = _mergeDefinition.Columns
            .Where(c => c.IsIgnored)
            .Select(c => c.SourceColumn);

        // Columns that take part in the merge; these are the ones read back from the output.
        var columns = _mergeDefinition.Columns
            .Where(c => c.IsIgnored == false)
            .ToList();

        // Materialize once: counting and then reading a lazy or one-shot sequence
        // would enumerate it twice and could yield a different (or empty) second pass.
        var items = data as ICollection<TEntity> ?? data.ToList();
        var rows = items.Count;

        using var listDataReader = new ListDataReader<TEntity>(items, ignoreNames);

        // run merge capturing output
        Merge(listDataReader, rows, command =>
        {
            // CaptureOutput copies every row into a list, so the reader can be disposed here.
            using var reader = command.ExecuteReader();
            results = CaptureOutput(reader, columns);
        });

        return results;
    }
205

206
    /// <inheritdoc/>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="table"/> is <c>null</c>.</exception>
    public IEnumerable<DataMergeOutputRow> ExecuteOutput(DataTable table)
    {
        // Validate before touching the definition so a bad call leaves it unchanged.
        if (table == null)
            throw new ArgumentNullException(nameof(table));

        // update definition to include output
        _mergeDefinition.IncludeOutput = true;

        var results = Enumerable.Empty<DataMergeOutputRow>();

        // Columns that take part in the merge; these are the ones read back from the output.
        var columns = _mergeDefinition.Columns
            .Where(c => c.IsIgnored == false)
            .ToList();

        var rows = table.Rows.Count;

        // The table reader is disposable; release it once the merge has consumed it.
        using var dataTableReader = table.CreateDataReader();

        // run merge capturing output
        Merge(dataTableReader, rows, command =>
        {
            // CaptureOutput copies every row into a list, so the reader can be disposed here.
            using var reader = command.ExecuteReader();
            results = CaptureOutput(reader, columns);
        });

        return results;
    }
229

230
    /// <inheritdoc/>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="data"/> is <c>null</c>.</exception>
    public async Task<IEnumerable<DataMergeOutputRow>> ExecuteOutputAsync<TEntity>(IEnumerable<TEntity> data, CancellationToken cancellationToken = default)
        where TEntity : class
    {
        // Validate before touching the definition so a bad call leaves it unchanged.
        if (data == null)
            throw new ArgumentNullException(nameof(data));

        // update definition to include output
        _mergeDefinition.IncludeOutput = true;

        var results = Enumerable.Empty<DataMergeOutputRow>();

        // Source column names flagged as ignored; passed to the reader together with the data.
        var ignoreNames = _mergeDefinition.Columns
            .Where(c => c.IsIgnored)
            .Select(c => c.SourceColumn);

        // Columns that take part in the merge; these are the ones read back from the output.
        var columns = _mergeDefinition.Columns
            .Where(c => c.IsIgnored == false)
            .ToList();

        // Materialize once: counting and then reading a lazy or one-shot sequence
        // would enumerate it twice and could yield a different (or empty) second pass.
        var items = data as ICollection<TEntity> ?? data.ToList();
        var rows = items.Count;

        using var listDataReader = new ListDataReader<TEntity>(items, ignoreNames);

        // run merge capturing output
        await MergeAsync(listDataReader, rows, cancellationToken, async (command, token) =>
        {
            // CaptureOutputAsync copies every row into a list, so the reader can be disposed here.
            using var reader = await command.ExecuteReaderAsync(token).ConfigureAwait(false);
            results = await CaptureOutputAsync(reader, columns, token).ConfigureAwait(false);
        }).ConfigureAwait(false);

        return results;
    }
259

260
    /// <inheritdoc/>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="table"/> is <c>null</c>.</exception>
    public async Task<IEnumerable<DataMergeOutputRow>> ExecuteOutputAsync(DataTable table, CancellationToken cancellationToken = default)
    {
        // Validate before touching the definition so a bad call leaves it unchanged.
        if (table == null)
            throw new ArgumentNullException(nameof(table));

        // update definition to include output
        _mergeDefinition.IncludeOutput = true;

        var results = Enumerable.Empty<DataMergeOutputRow>();

        // Columns that take part in the merge; these are the ones read back from the output.
        var columns = _mergeDefinition.Columns
            .Where(c => c.IsIgnored == false)
            .ToList();

        var rows = table.Rows.Count;

        // The table reader is disposable; release it once the merge has consumed it.
        using var dataTableReader = table.CreateDataReader();

        // run merge capturing output
        await MergeAsync(dataTableReader, rows, cancellationToken, async (command, token) =>
        {
            // CaptureOutputAsync copies every row into a list, so the reader can be disposed here.
            using var reader = await command.ExecuteReaderAsync(token).ConfigureAwait(false);
            results = await CaptureOutputAsync(reader, columns, token).ConfigureAwait(false);
        }).ConfigureAwait(false);

        return results;
    }
283

284
    /// <summary>
    /// Validates the specified merge definition for correctness and required configuration.
    /// </summary>
    /// <remarks>
    /// Validation also fills in defaults on the definition: a temporary table name is generated when none is set,
    /// a leading <c>#</c> is added to the temporary table name when missing, and a column without a target name
    /// gets its source name as the target.
    /// </remarks>
    /// <param name="mergeDefinition">The merge definition to validate.</param>
    /// <param name="isBulk"><c>true</c> if the merge mode is bulk copy; otherwise, <c>false</c>.</param>
    /// <returns><c>true</c> if the definition is valid; otherwise, an exception is thrown.</returns>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="mergeDefinition"/> is <c>null</c>.</exception>
    /// <exception cref="ValidationException">
    /// Thrown if required properties are missing or invalid in the merge definition.
    /// </exception>
    public static bool Validate(DataMergeDefinition mergeDefinition, bool isBulk)
    {
        if (mergeDefinition == null)
            throw new ArgumentNullException(nameof(mergeDefinition));

        if (mergeDefinition.TargetTable.IsNullOrEmpty())
            throw new ValidationException("TargetTable is required for the merge definition.");

        // generate temporary name if not set; UTC ticks avoid repeated values across local clock changes
        if (mergeDefinition.TemporaryTable.IsNullOrEmpty())
            mergeDefinition.TemporaryTable = "#Merge" + DateTime.UtcNow.Ticks;

        // make sure it starts with # (ordinal comparison: this is an identifier, not linguistic text)
        if (!mergeDefinition.TemporaryTable.StartsWith("#", StringComparison.Ordinal))
            mergeDefinition.TemporaryTable = "#" + mergeDefinition.TemporaryTable;

        // filter ignored columns
        var mergeColumns = mergeDefinition.Columns
            .Where(c => !c.IsIgnored)
            .ToList();

        if (mergeColumns.Count == 0)
            throw new ValidationException("At least one column is required for the merge definition.");

        if (!mergeColumns.Any(c => c.IsKey))
            throw new ValidationException("At least one column is required to be marked as a key for the merge definition.");

        for (int i = 0; i < mergeColumns.Count; i++)
        {
            var column = mergeColumns[i];

            if (column.SourceColumn.IsNullOrEmpty())
                throw new ValidationException("SourceColumn is required for column index {0} merge definition.".FormatWith(i));

            // use source if no target
            if (column.TargetColumn.IsNullOrEmpty())
                column.TargetColumn = column.SourceColumn;

            // the native type is only mandatory in bulk mode
            if (isBulk && column.NativeType.IsNullOrEmpty())
                throw new ValidationException("NativeType is required for column '{0}' merge definition.".FormatWith(column.SourceColumn));
        }

        return true;
    }
334

335
    // Private helpers (not part of public API, so no XML docs needed)
336
    // Runs the merge synchronously: validates the definition, optionally stages the rows in a
    // temporary table via bulk copy, builds the merge SQL, and passes the prepared command to
    // executeFactory. The session connection is released in all cases once it has been requested.
    private void Merge(IDataReader reader, int rows, Action<DbCommand> executeFactory)
    {
        // Bulk copy is used when requested explicitly, or in Auto mode for more than 1000 rows.
        var isBulk = _mergeDefinition.Mode == DataMergeMode.BulkCopy
                     || (_mergeDefinition.Mode == DataMergeMode.Auto && rows > 1000);

        // Step 1, validate definition
        if (!Validate(_mergeDefinition, isBulk))
            return;

        try
        {
            _dataSession.EnsureConnection();

            if (_dataSession.Connection is not SqlConnection sqlConnection)
                throw new InvalidOperationException(
                    "Bulk-Copy only supported by SQL Server.  Make sure DataSession was create with a valid SqlConnection.");

            var sqlTransaction = _dataSession.Transaction as SqlTransaction;
            string mergeSql;

            if (isBulk)
            {
                // Step 2, create temp table
                string tableSql = DataMergeGenerator.BuildTable(_mergeDefinition);
                using (var tableCommand = _dataSession.Connection.CreateCommand())
                {
                    tableCommand.CommandText = tableSql;
                    tableCommand.CommandType = CommandType.Text;
                    tableCommand.Transaction = sqlTransaction;

                    // honor the configured timeout on every command issued by the merge
                    if (_commandTimeout > 0)
                        tableCommand.CommandTimeout = _commandTimeout;

                    tableCommand.ExecuteNonQuery();
                }

                // Step 3, bulk copy into temp table
                using (var bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.Default, sqlTransaction))
                {
                    bulkCopy.DestinationTableName = _mergeDefinition.TemporaryTable;
                    bulkCopy.BatchSize = 1000;

                    // without this the bulk copy keeps its own default timeout regardless of CommandTimeout(...)
                    if (_commandTimeout > 0)
                        bulkCopy.BulkCopyTimeout = _commandTimeout;

                    foreach (var mergeColumn in _mergeDefinition.Columns.Where(c => !c.IsIgnored && c.CanBulkCopy))
                        bulkCopy.ColumnMappings.Add(mergeColumn.SourceColumn, mergeColumn.SourceColumn);

                    bulkCopy.WriteToServer(reader);
                }

                // Step 4, merge sql
                mergeSql = DataMergeGenerator.BuildMerge(_mergeDefinition);
            }
            else
            {
                // build merge from data
                mergeSql = DataMergeGenerator.BuildMerge(_mergeDefinition, reader);
            }

            // run merge statement
            using var mergeCommand = _dataSession.Connection.CreateCommand();

            mergeCommand.CommandText = mergeSql;
            mergeCommand.CommandType = CommandType.Text;
            mergeCommand.Transaction = sqlTransaction;

            if (_commandTimeout > 0)
                mergeCommand.CommandTimeout = _commandTimeout;

            // run merge with factory
            executeFactory(mergeCommand);
        }
        finally
        {
            _dataSession.ReleaseConnection();
        }
    }
408

409
    // Asynchronous counterpart of Merge: validates the definition, optionally stages the rows in a
    // temporary table via bulk copy, builds the merge SQL, and passes the prepared command to
    // executeFactory. The session connection is released in all cases once it has been requested.
    private async Task MergeAsync(IDataReader reader, int rows, CancellationToken cancellationToken, Func<DbCommand, CancellationToken, Task> executeFactory)
    {
        // Bulk copy is used when requested explicitly, or in Auto mode for more than 1000 rows.
        var isBulk = _mergeDefinition.Mode == DataMergeMode.BulkCopy
                     || (_mergeDefinition.Mode == DataMergeMode.Auto && rows > 1000);

        // Step 1, validate definition
        if (!Validate(_mergeDefinition, isBulk))
            return;

        try
        {
            await _dataSession
                .EnsureConnectionAsync(cancellationToken)
                .ConfigureAwait(false);

            if (_dataSession.Connection is not SqlConnection sqlConnection)
                throw new InvalidOperationException(
                    "Bulk-Copy only supported by SQL Server.  Make sure DataSession was create with a valid SqlConnection.");

            var sqlTransaction = _dataSession.Transaction as SqlTransaction;
            string mergeSql;

            if (isBulk)
            {
                // Step 2, create temp table
                string tableSql = DataMergeGenerator.BuildTable(_mergeDefinition);
                using (var tableCommand = _dataSession.Connection.CreateCommand())
                {
                    tableCommand.CommandText = tableSql;
                    tableCommand.CommandType = CommandType.Text;
                    tableCommand.Transaction = sqlTransaction;

                    // honor the configured timeout on every command issued by the merge
                    if (_commandTimeout > 0)
                        tableCommand.CommandTimeout = _commandTimeout;

                    await tableCommand
                        .ExecuteNonQueryAsync(cancellationToken)
                        .ConfigureAwait(false);
                }

                // Step 3, bulk copy into temp table
                using (var bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.Default, sqlTransaction))
                {
                    bulkCopy.DestinationTableName = _mergeDefinition.TemporaryTable;
                    bulkCopy.BatchSize = 1000;

                    // without this the bulk copy keeps its own default timeout regardless of CommandTimeout(...)
                    if (_commandTimeout > 0)
                        bulkCopy.BulkCopyTimeout = _commandTimeout;

                    foreach (var mergeColumn in _mergeDefinition.Columns.Where(c => !c.IsIgnored && c.CanBulkCopy))
                        bulkCopy.ColumnMappings.Add(mergeColumn.SourceColumn, mergeColumn.SourceColumn);

                    await bulkCopy
                        .WriteToServerAsync(reader, cancellationToken)
                        .ConfigureAwait(false);
                }

                // Step 4, merge sql
                mergeSql = DataMergeGenerator.BuildMerge(_mergeDefinition);
            }
            else
            {
                // build merge from data
                mergeSql = DataMergeGenerator.BuildMerge(_mergeDefinition, reader);
            }

            // run merge statement
            using var mergeCommand = _dataSession.Connection.CreateCommand();

            mergeCommand.CommandText = mergeSql;
            mergeCommand.CommandType = CommandType.Text;
            mergeCommand.Transaction = sqlTransaction;

            if (_commandTimeout > 0)
                mergeCommand.CommandTimeout = _commandTimeout;

            // run merge with factory
            await executeFactory(mergeCommand, cancellationToken)
                .ConfigureAwait(false);
        }
        finally
        {
#if NETCOREAPP3_0_OR_GREATER
            // library code: do not capture the caller's context, matching the other awaits in this method
            await _dataSession.ReleaseConnectionAsync().ConfigureAwait(false);
#else
            _dataSession.ReleaseConnection();
#endif
        }
    }
492

493
    // Reads every row from the merge output reader into a list of DataMergeOutputRow,
    // one entry per row with one output column per merge column.
    private static IEnumerable<DataMergeOutputRow> CaptureOutput(IDataReader reader, List<DataMergeColumn> columns)
    {
        var captured = new List<DataMergeOutputRow>();

        // Two wrappers over the same reader, one per prefix
        // (presumably used to resolve the prefixed original/current output columns by name).
        var beforeValues = new DataReaderWrapper(reader, DataMergeGenerator.OriginalPrefix);
        var afterValues = new DataReaderWrapper(reader, DataMergeGenerator.CurrentPrefix);

        while (reader.Read())
        {
            var row = new DataMergeOutputRow
            {
                Action = reader.GetString("Action")
            };

            foreach (var mergeColumn in columns)
            {
                var columnName = mergeColumn.SourceColumn;

                row.Columns.Add(new DataMergeOutputColumn
                {
                    Name = columnName,
                    Original = beforeValues.GetValue(columnName),
                    Current = afterValues.GetValue(columnName),
                    Type = afterValues.GetFieldType(columnName)
                });
            }

            captured.Add(row);
        }

        return captured;
    }
525

526
    // Asynchronously reads every row from the merge output reader into a list of DataMergeOutputRow,
    // one entry per row with one output column per merge column.
    private static async Task<IEnumerable<DataMergeOutputRow>> CaptureOutputAsync(DbDataReader reader, List<DataMergeColumn> columns, CancellationToken cancellationToken)
    {
        var captured = new List<DataMergeOutputRow>();

        // Two wrappers over the same reader, one per prefix
        // (presumably used to resolve the prefixed original/current output columns by name).
        var beforeValues = new DataReaderWrapper(reader, DataMergeGenerator.OriginalPrefix);
        var afterValues = new DataReaderWrapper(reader, DataMergeGenerator.CurrentPrefix);

        while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
        {
            var row = new DataMergeOutputRow
            {
                Action = reader.GetString("Action")
            };

            foreach (var mergeColumn in columns)
            {
                string columnName = mergeColumn.SourceColumn;

                row.Columns.Add(new DataMergeOutputColumn
                {
                    Name = columnName,
                    Original = beforeValues.GetValue(columnName),
                    Current = afterValues.GetValue(columnName),
                    Type = afterValues.GetFieldType(columnName)
                });
            }

            captured.Add(row);
        }

        return captured;
    }
558
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc