• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Avanade / NTangle / 11759370733

09 Nov 2024 08:54PM UTC coverage: 74.159% (+9.7%) from 64.469%
11759370733

Pull #52

github

web-flow
Merge 6df4947c1 into 5cd46db5b
Pull Request #52: v3.0.0

256 of 368 branches covered (69.57%)

Branch coverage included in aggregate %.

830 of 978 new or added lines in 40 files covered. (84.87%)

70 existing lines in 1 file now uncovered.

1199 of 1594 relevant lines covered (75.22%)

28.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.36
/src/NTangle/Cdc/EntityOrchestratorBase.cs
1
// Copyright (c) Avanade. Licensed under the MIT License. See https://github.com/Avanade/NTangle
2

3
using CoreEx;
4
using CoreEx.Configuration;
5
using CoreEx.Database;
6
using CoreEx.Events;
7
using CoreEx.Json;
8
using CoreEx.Results;
9
using Microsoft.Extensions.Logging;
10
using System;
11
using System.Collections.Generic;
12
using System.Diagnostics;
13
using System.Threading;
14
using System.Threading.Tasks;
15

16
namespace NTangle.Cdc
17
{
18
    /// <summary>
19
    /// Provides the base (internal) Change Data Capture (CDC) entity (aggregate root) orchestration capability.
20
    /// </summary>
21
    /// <typeparam name="TEntity">The root entity <see cref="Type"/>.</typeparam>
22
    /// <typeparam name="TEntityEnvelopeColl">The <typeparamref name="TEntityEnvelope"/> collection <see cref="Type"/>.</typeparam>
23
    /// <typeparam name="TEntityEnvelope">The <typeparamref name="TEntity"/> envelope <see cref="Type"/>.</typeparam>
24
    public abstract class EntityOrchestratorBase<TEntity, TEntityEnvelopeColl, TEntityEnvelope> : EntityOrchestratorCore<TEntity, TEntityEnvelopeColl, TEntityEnvelope>, IEntityOrchestrator
25
        where TEntity : class, IEntity, new()
26
        where TEntityEnvelopeColl : List<TEntityEnvelope>, new()
27
        where TEntityEnvelope : class, TEntity, IEntityEnvelope, new()
28
    {
29
        /// <summary>
30
        /// Initializes a new instance of the <see cref="EntityOrchestratorBase{TEntity, TEntityEnvelopeColl, TEntityEnvelope}"/> class with no identifier mapping support.
31
        /// </summary>
32
        /// <param name="database">The <see cref="IDatabase"/>.</param>
33
        /// <param name="eventPublisher">The <see cref="IEventPublisher"/>.</param>
34
        /// <param name="jsonSerializer">The <see cref="IJsonSerializer"/>.</param>
35
        /// <param name="settings">The <see cref="SettingsBase"/>.</param>
36
        /// <param name="logger">The <see cref="ILogger"/>.</param>
37
        internal EntityOrchestratorBase(IDatabase database, IEventPublisher eventPublisher, IJsonSerializer jsonSerializer, SettingsBase settings, ILogger logger)
38
            : base(eventPublisher, jsonSerializer, settings, logger) => Database = database.ThrowIfNull(nameof(database));
36✔
39

40
        /// <summary>
41
        /// Gets the <see cref="IDatabase"/>.
42
        /// </summary>
43
        public IDatabase Database { get; }
53✔
44

45
        /// <summary>
46
        /// Gets the name of the batch <b>execute</b> stored procedure.
47
        /// </summary>
48
        protected abstract string ExecuteStoredProcedureName { get; }
49

50
        /// <summary>
51
        /// Gets the name of the batch <b>complete</b> stored procedure.
52
        /// </summary>
53
        protected abstract string CompleteStoredProcedureName { get; }
54

55
        /// <inheritdoc/>
56
        public override Task<EntityOrchestratorResult> CompleteAsync(long batchTrackerId, List<VersionTracker> versionTracking, CancellationToken cancellationToken = default)
57
            => CompleteAsync(Database, CompleteStoredProcedureName, batchTrackerId, versionTracking, cancellationToken);
20✔
58

59
        /// <inheritdoc/>
60
        public override async Task<EntityOrchestratorResult> ExecuteAsync(CancellationToken cancellationToken = default)
61
        {
29✔
62
            try
63
            {
29✔
64
                var result = new EntityOrchestratorResult<TEntityEnvelopeColl, TEntityEnvelope> { ExecutionId = ExecutionId = Guid.NewGuid() };
29✔
65
                return await EntityOrchestratorInvoker.Current.InvokeAsync(this, result, async (_, result, ct) => await ExecuteInternalAsync(result, ct).ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
58✔
66
            }
67
            finally
68
            {
29✔
69
                ExecutionId = Guid.Empty;
29✔
70
            }
29✔
71
        }
29✔
72

73
        /// <summary>
74
        /// Performs the actual execution.
75
        /// </summary>
76
        private async Task<EntityOrchestratorResult> ExecuteInternalAsync(EntityOrchestratorResult<TEntityEnvelopeColl, TEntityEnvelope> result, CancellationToken cancellationToken = default)
77
        {
29✔
78
            try
79
            {
29✔
80
                // Get the requested CDC batch data.
81
                LogQueryStart();
29✔
82
                var stopwatch = Stopwatch.StartNew();
29✔
83

84
                await CdcBatchQueryAsync(result, cancellationToken).ConfigureAwait(false);
29✔
85

86
                stopwatch.Stop();
27✔
87
                LogQueryComplete(result, stopwatch);
27✔
88
                if (result.BatchTracker == null)
27✔
89
                    return result;
7✔
90

91
                // Consolidate the results.
92
                await ConsolidateAsync(result, cancellationToken).ConfigureAwait(false);
20✔
93

94
                // Post-consolidation processing.
95
                await PostConsolidationAsync(result, cancellationToken).ConfigureAwait(false);
20✔
96

97
                // Additional processing.
98
                await AdditionalAsync(result, cancellationToken).ConfigureAwait(false);
20✔
99

100
                // Determine whether anything may have been sent before (version tracking) and exclude (i.e. do not send again).
101
                var tracking = await VersionAsync(result, cancellationToken).ConfigureAwait(false);
20✔
102

103
                // Publish & send the events.
104
                await PublishAndSendEventsAsync(result, cancellationToken).ConfigureAwait(false);
20✔
105

106
                // Complete the batch.
107
                var cresult = await CompleteAsync(result.BatchTracker.Id, tracking, cancellationToken).ConfigureAwait(false);
20✔
108
                cresult.ExecuteStatus = result.ExecuteStatus;
20✔
109
                return cresult;
20✔
110
            }
111
            catch (Exception ex)
2✔
112
            {
2✔
113
                var eresult = ExceptionHandler(result, ex);
2✔
114
                if (eresult is not null)
2!
115
                    return eresult;
2✔
116

NEW
117
                throw;
×
118
            }
119
        }
29✔
120

121
        /// <summary>
122
        /// Queries the CDC (Change Data Capture) for the batch and gets the resulting changes.
123
        /// </summary>
124
        private Task CdcBatchQueryAsync(EntityOrchestratorResult<TEntityEnvelopeColl, TEntityEnvelope> result, CancellationToken cancellationToken)
125
        {
29✔
126
            return EntityOrchestratorInvoker.Current.InvokeAsync(this, result, async (_, result, ct) =>
29✔
127
            {
29✔
128
                await GetBatchEntityDataAsync(result, cancellationToken).ConfigureAwait(false);
29✔
129
                result.ExecuteStatus = new EntityOrchestratorExecuteStatus { InitialCount = result.Result.Count };
27✔
130
            }, cancellationToken);
56✔
131
        }
29✔
132

133
        /// <summary>
134
        /// Executes a multi-dataset query command with one or more <paramref name="multiSetArgs"/>; whilst also outputing the resulting return value.
135
        /// </summary>
136
        /// <param name="result">The <see cref="EntityOrchestratorResult{TEntityEnvelopeColl, TEntityEnvelope}"/>.</param>
137
        /// <param name="multiSetArgs">One or more <see cref="IMultiSetArgs"/>.</param>
138
        /// <param name="cancellationToken">The <see cref="CancellationToken"/>.</param>
139
        protected async Task SelectQueryMultiSetAsync(EntityOrchestratorResult<TEntityEnvelopeColl, TEntityEnvelope> result, IEnumerable<IMultiSetArgs> multiSetArgs, CancellationToken cancellationToken = default)
140
        {
29✔
141
            var msa = new List<IMultiSetArgs> { new MultiSetSingleArgs<BatchTracker>(BatchTrackerMapper, r => result.BatchTracker = r, isMandatory: false, stopOnNull: true) };
49✔
142
            msa.AddRange(multiSetArgs);
29✔
143

144
            await Database.StoredProcedure(ExecuteStoredProcedureName).Params(p =>
29✔
145
            {
29✔
146
                p.AddParameter(MaxQuerySizeParamName, MaxQuerySize);
29✔
147
                p.AddParameter(ContinueWithDataLossParamName, ContinueWithDataLoss);
29✔
148
            }).SelectMultiSetAsync(msa, cancellationToken).ConfigureAwait(false);
58✔
149
        }
27✔
150

151
        /// <summary>
152
        /// Gets the batch entity data from the database.
153
        /// </summary>
154
        /// <param name="result">The <see cref="EntityOrchestratorResult{TEntityEnvelopeColl, TEntityEnvelope}"/>.</param>
155
        /// <param name="cancellationToken">The <see cref="CancellationToken"/>.</param>
156
        /// <returns>A <typeparamref name="TEntityEnvelopeColl"/>.</returns>
157
        protected abstract Task GetBatchEntityDataAsync(EntityOrchestratorResult<TEntityEnvelopeColl, TEntityEnvelope> result, CancellationToken cancellationToken);
158
    }
159
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc