• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 6233274834

19 Sep 2023 08:46AM UTC coverage: 36.409% (-0.6%) from 36.982%
6233274834

Pull #658

github

dkfellows
Merge branch 'master' into java-17
Pull Request #658: Update Java version to 17

1656 of 1656 new or added lines in 260 files covered. (100.0%)

8373 of 22997 relevant lines covered (36.41%)

0.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/TxrxProcess.java
1
/*
2
 * Copyright (c) 2018 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.transceiver;
17

18
import static java.lang.Integer.getInteger;
19
import static java.lang.String.format;
20
import static java.lang.System.nanoTime;
21
import static java.lang.Thread.sleep;
22
import static java.util.Collections.synchronizedMap;
23
import static java.util.Objects.requireNonNull;
24
import static org.slf4j.LoggerFactory.getLogger;
25
import static uk.ac.manchester.spinnaker.messages.Constants.SCP_RETRY_DEFAULT;
26
import static uk.ac.manchester.spinnaker.messages.Constants.SCP_TIMEOUT_DEFAULT;
27
import static uk.ac.manchester.spinnaker.messages.scp.SequenceNumberSource.SEQUENCE_LENGTH;
28
import static uk.ac.manchester.spinnaker.transceiver.ProcessException.makeInstance;
29
import static uk.ac.manchester.spinnaker.utils.UnitConstants.MSEC_PER_SEC;
30
import static uk.ac.manchester.spinnaker.utils.WaitUtils.waitUntil;
31

32
import java.io.IOException;
33
import java.io.InterruptedIOException;
34
import java.io.Serial;
35
import java.net.SocketTimeoutException;
36
import java.nio.ByteBuffer;
37
import java.util.ArrayList;
38
import java.util.BitSet;
39
import java.util.HashMap;
40
import java.util.List;
41
import java.util.Map;
42
import java.util.Objects;
43
import java.util.Set;
44
import java.util.WeakHashMap;
45
import java.util.function.Consumer;
46

47
import org.slf4j.Logger;
48

49
import com.google.errorprone.annotations.concurrent.GuardedBy;
50

51
import uk.ac.manchester.spinnaker.connections.ConnectionSelector;
52
import uk.ac.manchester.spinnaker.connections.SCPConnection;
53
import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
54
import uk.ac.manchester.spinnaker.messages.scp.CheckOKResponse;
55
import uk.ac.manchester.spinnaker.messages.scp.CommandCode;
56
import uk.ac.manchester.spinnaker.messages.scp.ConnectionAwareMessage;
57
import uk.ac.manchester.spinnaker.messages.scp.EmptyResponse;
58
import uk.ac.manchester.spinnaker.messages.scp.NoResponse;
59
import uk.ac.manchester.spinnaker.messages.scp.PayloadedResponse;
60
import uk.ac.manchester.spinnaker.messages.scp.SCPRequest;
61
import uk.ac.manchester.spinnaker.messages.scp.SCPResponse;
62
import uk.ac.manchester.spinnaker.messages.scp.SCPResult;
63
import uk.ac.manchester.spinnaker.messages.scp.SCPResultMessage;
64
import uk.ac.manchester.spinnaker.messages.sdp.SDPLocation;
65
import uk.ac.manchester.spinnaker.utils.ValueHolder;
66

67
/**
68
 * A process for talking to SpiNNaker efficiently that uses multiple connections
69
 * in communication (if appropriate).
70
 */
71
public class TxrxProcess {
72
        /** The default for the number of parallel channels. */
73
        protected static final int DEFAULT_NUM_CHANNELS = 8;
74

75
        /** The default for the number of instantaneously active channels. */
76
        protected static final int DEFAULT_INTERMEDIATE_CHANNEL_WAITS = 7;
77

78
        /**
79
         * The name of a <em>system property</em> that can override the default
80
         * timeouts. If specified as an integer, it gives the number of
81
         * milliseconds to wait before timing out a communication.
82
         */
83
        private static final String TIMEOUT_PROPERTY = "spinnaker.scp_timeout";
84

85
        /**
86
         * The name of a <em>system property</em> that can override the default
87
         * retries. If specified as an integer, it gives the number of retries
88
         * to perform (on timeout of receiving a reply) before timing out a
89
         * communication.
90
         */
91
        private static final String RETRY_PROPERTY = "spinnaker.scp_retries";
92

93
        private static final Logger log = getLogger(RequestPipeline.class);
×
94

95
        /**
96
         * The default number of outstanding responses to wait for before
97
         * continuing sending requests.
98
         */
99
        protected static final int DEFAULT_INTERMEDIATE_TIMEOUT_WAITS = 0;
100

101
        /**
102
         * The default number of times to resend any packet for any reason
103
         * before an error is triggered.
104
         */
105
        protected static final int SCP_RETRIES;
106

107
        /**
108
         * How long to wait between retries, in milliseconds.
109
         */
110
        protected static final int RETRY_DELAY_MS = 1;
111

112
        private static final String REASON_TIMEOUT = "timeout";
113

114
        /**
115
         * Packet minimum send interval, in <em>nanoseconds</em>.
116
         */
117
        private static final int INTER_SEND_INTERVAL_NS = 60000;
118

119
        /** The default for the timeout (in ms). */
120
        protected static final int SCP_TIMEOUT;
121

122
        static {
123
                // Read system properties
124
                SCP_TIMEOUT = getInteger(TIMEOUT_PROPERTY, SCP_TIMEOUT_DEFAULT);
×
125
                SCP_RETRIES = getInteger(RETRY_PROPERTY, SCP_RETRY_DEFAULT);
×
126
        }
127

128
        /**
129
         * The number of outstanding responses to wait for before continuing
130
         * sending requests.
131
         */
132
        final int numWaits;
133

134
        /** The number of requests to send before checking for responses. */
135
        final int numChannels;
136

137
        /**
138
         * The number of times to resend any packet for any reason before an
139
         * error is triggered.
140
         */
141
        final int numRetries;
142

143
        /**
144
         * The number of elapsed milliseconds after sending a packet before it
145
         * is considered a timeout.
146
         */
147
        final int packetTimeout;
148

149
        /**
150
         * How to select how to communicate.
151
         */
152
        private final ConnectionSelector<? extends SCPConnection> selector;
153

154
        private final Map<SCPConnection, RequestPipeline> requestPipelines;
155

156
        /**
157
         * The API of a single request.
158
         *
159
         * @author Donal Fellows
160
         */
161
        private interface Req {
162
                /**
163
                 * Tests whether the reasons for resending are consistently
164
                 * timeouts.
165
                 *
166
                 * @return True if all reasons are timeouts.
167
                 */
168
                boolean allTimeoutFailures();
169

170
                /**
171
                 * Get the command being sent in the request.
172
                 *
173
                 * @return The request's SCP command.
174
                 */
175
                CommandCode getCommand();
176

177
                /**
178
                 * Which core is the destination of the request?
179
                 *
180
                 * @return The core location.
181
                 */
182
                HasCoreLocation getDestination();
183

184
                /**
185
                 * Number of retries remaining for the packet.
186
                 *
187
                 * @return The number of retries that may still be performed.
188
                 */
189
                int getRetries();
190

191
                /**
192
                 * Get the list of reasons why a message was retried.
193
                 *
194
                 * @return The retry reasons.
195
                 */
196
                List<String> getRetryReasons();
197

198
                /**
199
                 * Report a failure to the process.
200
                 *
201
                 * @param exception
202
                 *            The problem that is being reported.
203
                 */
204
                void handleError(Exception exception);
205

206
                /**
207
                 * Handle the reception of a message.
208
                 *
209
                 * @param msg
210
                 *            the content of the message, in a little-endian buffer.
211
                 * @throws Exception
212
                 *             If something goes wrong.
213
                 */
214
                void parseReceivedResponse(SCPResultMessage msg) throws Exception;
215

216
                /**
217
                 * Send the request again.
218
                 *
219
                 * @param reason
220
                 *            Why the request is being sent again.
221
                 * @throws IOException
222
                 *             If the connection throws.
223
                 */
224
                void resend(Object reason) throws IOException;
225

226
                /**
227
                 * Send the request.
228
                 *
229
                 * @throws IOException
230
                 *             If the connection throws.
231
                 */
232
                void send() throws IOException;
233
        }
234

235
        /**
236
         * The maps of outstanding requests on each connection.
237
         */
238
        private static final Map<SCPConnection,
239
                        Map<Integer, Req>> OUTSTANDING_REQUESTS = new WeakHashMap<>();
×
240

241
        /**
242
         * An object used to track how many retries have been done, or
243
         * {@code null} if no such tracking is required.
244
         */
245
        private final RetryTracker retryTracker;
246

247
        // TODO handle multiple failures
248
        private Failure failure;
249

250
        /**
         * Create a process that uses the default retry count
         * ({@link #SCP_RETRIES}), timeout ({@link #SCP_TIMEOUT}), channel count
         * ({@link #DEFAULT_NUM_CHANNELS}) and intermediate wait count
         * ({@link #DEFAULT_INTERMEDIATE_CHANNEL_WAITS}).
         *
         * @param <Conn>
         *            The type of connection.
         * @param connectionSelector
         *            How to select how to communicate.
         * @param retryTracker
         *            Object used to track how many retries were used in an
         *            operation. Passed unchanged to the six-argument
         *            constructor, which decides whether {@code null} (meaning
         *            no such tracking) is acceptable.
         */
        protected <Conn extends SCPConnection> TxrxProcess(
                        ConnectionSelector<Conn> connectionSelector,
                        RetryTracker retryTracker) {
                this(connectionSelector,
                                SCP_RETRIES,
                                SCP_TIMEOUT,
                                DEFAULT_NUM_CHANNELS,
                                DEFAULT_INTERMEDIATE_CHANNEL_WAITS,
                                retryTracker);
        }
×
266

267
        /**
         * Create a process with explicitly chosen communication parameters.
         *
         * @param <Conn>
         *            The type of connection.
         * @param connectionSelector
         *            How to select how to communicate. Must not be
         *            {@code null}.
         * @param numRetries
         *            The number of times to retry a communication.
         * @param timeout
         *            The timeout (in ms) for the communication.
         * @param numChannels
         *            The number of parallel communications to support
         * @param intermediateChannelWaits
         *            How many parallel communications to launch at once. (??)
         * @param retryTracker
         *            Object used to track how many retries were used in an
         *            operation. May be {@code null} if no such tracking is
         *            required.
         * @throws NullPointerException
         *             If {@code connectionSelector} is {@code null}.
         */
        protected <Conn extends SCPConnection> TxrxProcess(
                        ConnectionSelector<Conn> connectionSelector,
                        int numRetries, int timeout, int numChannels,
                        int intermediateChannelWaits, RetryTracker retryTracker) {
                this.requestPipelines = new HashMap<>();
                this.numRetries = numRetries;
                this.packetTimeout = timeout;
                this.numChannels = numChannels;
                this.numWaits = intermediateChannelWaits;
                this.selector = requireNonNull(connectionSelector,
                                "connectionSelector must be non-null");
                // Deliberately NOT null-checked: the documented contract is that
                // null means "do not track retries", and the resend code already
                // tests for null before using the tracker.
                this.retryTracker = retryTracker;
        }
×
297

298
        /**
299
         * Manufacture a pipeline to handle a request using the configured pipeline
300
         * parameters. Reuses an existing pipeline if it can.
301
         *
302
         * @param request
303
         *            The request it will handle.
304
         * @return The pipeline instance.
305
         */
306
        private RequestPipeline pipeline(SCPRequest<?> request) {
307
                return requestPipelines.computeIfAbsent(
×
308
                                selector.getNextConnection(request), RequestPipeline::new);
×
309
        }
310

311
        /**
312
         * Put the state in such a way that it definitely isn't recording an error.
313
         */
314
        private void resetFailureState() {
315
                this.failure = null;
×
316
        }
×
317

318
        /**
319
         * Wait for all outstanding requests sent by this process to receive replies
320
         * or time out. Then test if an error occurred on the SpiNNaker side, and
321
         * throw a process exception it if it did.
322
         *
323
         * @throws IOException
324
         *             If communications fail.
325
         * @throws ProcessException
326
         *             an exception that wraps the original exception that occurred.
327
         * @throws InterruptedException
328
         *             If the communications were interrupted.
329
         */
330
        protected final void finishBatch()
331
                        throws ProcessException, IOException, InterruptedException {
332
                for (var pipe : requestPipelines.values()) {
×
333
                        pipe.finish();
×
334
                }
×
335
                if (failure != null) {
×
336
                        var hdr = failure.req.sdpHeader;
×
337
                        throw makeInstance(hdr.getDestination(), failure.exn);
×
338
                }
339
        }
×
340

341
        /**
342
         * Send a request. The actual payload of the response to this request is to
343
         * be considered to be uninteresting provided it doesn't indicate a failure.
344
         * In particular, the response is a {@link EmptyResponse}.
345
         *
346
         * @param request
347
         *            The request to send.
348
         * @throws IOException
349
         *             If sending fails.
350
         * @throws InterruptedException
351
         *             If communications are interrupted while preparing to send.
352
         */
353
        protected final void sendRequest(SCPRequest<EmptyResponse> request)
354
                        throws IOException, InterruptedException {
355
                pipeline(request).send(request, null);
×
356
        }
×
357

358
        /**
359
         * Send a request and handle the response.
360
         *
361
         * @param <Resp>
362
         *            The type of response expected to the request.
363
         * @param request
364
         *            The request to send.
365
         * @param callback
366
         *            The callback that handles the request's response.
367
         * @throws IOException
368
         *             If sending fails.
369
         * @throws InterruptedException
370
         *             If communications are interrupted.
371
         */
372
        protected final <Resp extends CheckOKResponse> void sendRequest(
373
                        SCPRequest<Resp> request, Consumer<Resp> callback)
374
                        throws IOException, InterruptedException {
375
                pipeline(request).send(request,
×
376
                                requireNonNull(callback, "callback must be non-null"));
×
377
        }
×
378

379
        /**
380
         * Send a request for a response with a payload.
381
         *
382
         * @param <T>
383
         *            The type of parsed payload expected.
384
         * @param <R>
385
         *            The type of response expected to the request.
386
         * @param request
387
         *            The request to send.
388
         * @param callback
389
         *            The callback that handles the parsed payload.
390
         * @throws IOException
391
         *             If sending fails.
392
         * @throws InterruptedException
393
         *             If communications are interrupted.
394
         */
395
        protected final <T, R extends PayloadedResponse<T, ?>> void sendGet(
396
                        SCPRequest<R> request, Consumer<T> callback)
397
                        throws IOException, InterruptedException {
398
                pipeline(request).send(request, resp -> callback.accept(resp.get()));
×
399
        }
×
400

401
        /**
402
         * Do a synchronous call of an SCP operation, sending the given message and
403
         * completely processing the interaction before returning its response.
404
         *
405
         * @param request
406
         *            The request to send
407
         * @throws IOException
408
         *             If the communications fail
409
         * @throws ProcessException
410
         *             If the other side responds with a failure code
411
         * @throws InterruptedException
412
         *             If the communications were interrupted.
413
         */
414
        protected final void call(SCPRequest<EmptyResponse> request)
415
                                        throws IOException, ProcessException, InterruptedException {
416
                var holder = new ValueHolder<EmptyResponse>();
×
417
                resetFailureState();
×
418
                sendRequest(request, holder::setValue);
×
419
                finishBatch();
×
420
                assert holder.getValue().result == SCPResult.RC_OK;
×
421
        }
×
422

423
        /**
424
         * Do a synchronous call of an SCP operation, sending the given message and
425
         * completely processing the interaction before returning its parsed
426
         * payload.
427
         *
428
         * @param <T>
429
         *            The type of the payload of the response.
430
         * @param <R>
431
         *            The type of the response; implicit in the type of the request.
432
         * @param request
433
         *            The request to send
434
         * @return The successful response to the request
435
         * @throws IOException
436
         *             If the communications fail
437
         * @throws ProcessException
438
         *             If the other side responds with a failure code
439
         * @throws InterruptedException
440
         *             If the communications were interrupted.
441
         */
442
        protected final <T,
443
                        R extends PayloadedResponse<T, ?>> T retrieve(SCPRequest<R> request)
444
                                        throws IOException, ProcessException, InterruptedException {
445
                var holder = new ValueHolder<T>();
×
446
                resetFailureState();
×
447
                sendGet(request, holder::setValue);
×
448
                finishBatch();
×
449
                return holder.getValue();
×
450
        }
451

452
        /**
453
         * Send a one-way request. One way requests do not need to be finished.
454
         *
455
         * @param request
456
         *            The request to send. <em>Must</em> be a one-way request!
457
         * @throws IOException
458
         *             If sending fails.
459
         * @throws InterruptedException
460
         *             If communications are interrupted while preparing to send.
461
         */
462
        protected final void sendOneWayRequest(SCPRequest<NoResponse> request)
463
                        throws IOException, InterruptedException {
464
                pipeline(request).send(request);
×
465
        }
×
466

467
        /**
468
         * States that a particular request failed with a particular exception. The
469
         * request should not be retried once this has been generated.
470
         */
471
        private record Failure(SCPRequest<?> req, Exception exn) {
×
472
        }
473

474
        /**
475
         * Allows a set of SCP requests to be grouped together in a communication
476
         * across a number of channels for a given connection.
477
         * <p>
478
         * This class implements an SCP windowing, first suggested by Andrew Mundy.
479
         * This extends the idea by having both send and receive windows. These are
480
         * represented by the {@link TxrxProcess#numChannels} and the
481
         * {@link TxrxProcess#numWaits} fields of the enclosing class respectively.
482
         * This seems to help with the timeout issue; when a timeout is received,
483
         * all requests for which a reply has not been received can also timeout.
484
         *
485
         * @author Andrew Mundy
486
         * @author Andrew Rowley
487
         * @author Donal Fellows
488
         */
489
        class RequestPipeline {
490
                /** The connection over which the communication is to take place. */
491
                private final SCPConnection connection;
492

493
                /** The number of requests issued to this pipeline. */
494
                private int numRequests;
495

496
                /** The number of packets that have been resent. */
497
                private int numResent;
498

499
                /** The number of retries due to restartable errors. */
500
                private int numRetryCodeResent;
501

502
                /** The number of timeouts that occurred. */
503
                private int numTimeouts;
504

505
                /** A dictionary of sequence number &rarr; requests in progress. */
506
                @GuardedBy("itself")
507
                private final Map<Integer, Req> outstandingRequests;
508

509
                private long nextSendTime = 0;
×
510

511
                /**
512
                 * Per message record.
513
                 *
514
                 * @param <T>
515
                 *            The type of response expected to the request in the
516
                 *            message.
517
                 */
518
                private final class Request<T extends SCPResponse> implements Req {
519
                        /** Request in progress. */
520
                        private final SCPRequest<T> request;
521

522
                        /** Payload of request in progress. */
523
                        private final ByteBuffer requestData;
524

525
                        private int seq;
526

527
                        /** Callback function for response. */
528
                        private final Consumer<T> callback;
529

530
                        /** Retry reasons. */
531
                        private final List<String> retryReason;
532

533
                        /** Number of retries remaining for the packet. */
534
                        private int retries;
535

536
                        /**
537
                         * Make a record.
538
                         *
539
                         * @param request
540
                         *            The request.
541
                         * @param callback
542
                         *            The success callback.
543
                         */
544
                        private Request(SCPRequest<T> request, Consumer<T> callback) {
×
545
                                this.request = request;
×
546
                                this.requestData = request.getMessageData(connection.getChip());
×
547
                                this.seq = request.scpRequestHeader.getSequence();
×
548
                                this.callback = callback;
×
549
                                retryReason = new ArrayList<>();
×
550
                                retries = numRetries;
×
551
                        }
×
552

553
                        @Override
554
                        public void send() throws IOException {
555
                                if (waitUntil(nextSendTime)) {
×
556
                                        throw new InterruptedIOException(
×
557
                                                        "interrupted while waiting to send");
558
                                }
559
                                log.debug("Sending request {} with connection {}", request,
×
560
                                                connection);
561
                                switch (request.sdpHeader.getFlags()) {
×
562
                                case REPLY_EXPECTED, REPLY_EXPECTED_NO_P2P ->
563
                                        connection.send(requestData, seq);
×
564
                                default -> connection.send(requestData);
×
565
                                }
566
                                nextSendTime = nanoTime() + INTER_SEND_INTERVAL_NS;
×
567
                        }
×
568

569
                        @Override
570
                        public void resend(Object reason) throws IOException {
571
                                retries--;
×
572
                                retryReason.add(reason.toString());
×
573
                                if (retryTracker != null) {
×
574
                                        retryTracker.retryNeeded();
×
575
                                }
576
                                requestData.rewind();
×
577
                                send();
×
578
                        }
×
579

580
                        @Override
581
                        public boolean allTimeoutFailures() {
582
                                return retryReason.stream().allMatch(REASON_TIMEOUT::equals);
×
583
                        }
584

585
                        @Override
586
                        public void parseReceivedResponse(SCPResultMessage msg)
587
                                        throws Exception {
588
                                var response = msg.parsePayload(request);
×
589
                                if (callback != null) {
×
590
                                        callback.accept(response);
×
591
                                }
592
                        }
×
593

594
                        @Override
595
                        public SDPLocation getDestination() {
596
                                return request.sdpHeader.getDestination();
×
597
                        }
598

599
                        @Override
600
                        public void handleError(Exception exception) {
601
                                failure = new Failure(request, exception);
×
602
                        }
×
603

604
                        @Override
605
                        public CommandCode getCommand() {
606
                                return request.scpRequestHeader.command;
×
607
                        }
608

609
                        @Override
610
                        public int getRetries() {
611
                                return retries;
×
612
                        }
613

614
                        @Override
615
                        public List<String> getRetryReasons() {
616
                                return retryReason;
×
617
                        }
618
                }
619

620
                /**
                 * Create a request handling pipeline. The map of outstanding
                 * requests is shared, via {@link TxrxProcess#OUTSTANDING_REQUESTS},
                 * with any other pipeline made for the same connection.
                 *
                 * @param connection
                 *            The connection over which the communication is to take
                 *            place
                 */
                RequestPipeline(SCPConnection connection) {
                        log.debug("Request pipeline {} using connection {}", this,
                                        connection);
                        this.connection = connection;
                        synchronized (OUTSTANDING_REQUESTS) {
                                var shared = OUTSTANDING_REQUESTS.get(connection);
                                if (shared == null) {
                                        // First pipeline for this connection
                                        shared = synchronizedMap(new HashMap<>());
                                        OUTSTANDING_REQUESTS.put(connection, shared);
                                }
                                outstandingRequests = shared;
                        }
                }
×
636

637
                // Various accessors for outstandingRequests to handle locking right
638

639
                private int numOutstandingRequests() {
640
                        synchronized (outstandingRequests) {
×
641
                                return outstandingRequests.size();
×
642
                        }
643
                }
644

645
                private Req getRequestForResult(SCPResultMessage msg) {
646
                        synchronized (outstandingRequests) {
×
647
                                return msg.pickRequest(outstandingRequests);
×
648
                        }
649
                }
650

651
                private void removeRequest(SCPResultMessage msg) {
652
                        synchronized (outstandingRequests) {
×
653
                                msg.removeRequest(outstandingRequests);
×
654
                        }
×
655
                }
×
656

657
                private Req getRequestForSeq(int seq) {
658
                        synchronized (outstandingRequests) {
×
659
                                return outstandingRequests.get(seq);
×
660
                        }
661
                }
662

663
                private List<Integer> listOutstandingSeqs() {
664
                        synchronized (outstandingRequests) {
×
665
                                return List.copyOf(outstandingRequests.keySet());
×
666
                        }
667
                }
668

669
                private void removeManySeqs(BitSet toRemove) {
670
                        synchronized (outstandingRequests) {
×
671
                                for (var seq : toRemove.stream().toArray()) {
×
672
                                        outstandingRequests.remove(seq);
×
673
                                }
674
                        }
×
675
                }
×
676

677
                /**
678
                 * Add an SCP request to the set to be sent. The request expects a
679
                 * reply.
680
                 *
681
                 * @param <T>
682
                 *            The type of response expected to the request.
683
                 * @param request
684
                 *            The SCP request to be sent
685
                 * @param callback
686
                 *            A callback function to call when the response has been
687
                 *            received; takes an SCPResponse as a parameter, or a
688
                 *            {@code null} if the response doesn't need to be processed.
689
                 * @throws IOException
690
                 *             If things go really wrong.
691
                 * @throws InterruptedException
692
                 *             If communications are interrupted (prior to sending).
693
                 */
694
                <T extends CheckOKResponse> void send(SCPRequest<T> request,
695
                                Consumer<T> callback) throws IOException, InterruptedException {
696
                        // If all the channels are used, start to receive packets
697
                        while (numOutstandingRequests() >= numChannels) {
×
698
                                multiRetrieve(numWaits);
×
699
                        }
700

701
                        // Send the request
702
                        registerRequest(request, callback).send();
×
703
                }
×
704

705
                /**
706
                 * Update the packet and store required details.
707
                 *
708
                 * @param <T>
709
                 *            The type of response expected to the request.
710
                 * @param request
711
                 *            The SCP request to be sent
712
                 * @param callback
713
                 *            A callback function to call when the response has been
714
                 *            received; takes an SCPResponse as a parameter, or a
715
                 *            {@code null} if the response doesn't need to be processed.
716
                 * @return The prepared and registered (but unsent) request.
717
                 * @throws DuplicateSequenceNumberException
718
                 *             If we couldn't mint a sequence number. Really shouldn't
719
                 *             happen!
720
                 */
721
                private <T extends CheckOKResponse> Request<T> registerRequest(
722
                                SCPRequest<T> request, Consumer<T> callback) {
723
                        if (request instanceof ConnectionAwareMessage cam) {
×
724
                                cam.setConnection(connection);
×
725
                        }
726
                        synchronized (outstandingRequests) {
×
727
                                int sequence = request.scpRequestHeader
×
728
                                                .issueSequenceNumber(outstandingRequests.keySet());
×
729

730
                                var req = new Request<>(request, callback);
×
731
                                log.debug("{}: sending message with sequence {}", this,
×
732
                                                sequence);
×
733
                                if (outstandingRequests.put(sequence, req) != null) {
×
734
                                        throw new DuplicateSequenceNumberException();
×
735
                                }
736
                                numRequests++;
×
737
                                return req;
×
738
                        }
739
                }
740

741
                /**
742
                 * Update the packet but don't remember it; it's a one-shot.
743
                 *
744
                 * @param request
745
                 *            The one-way SCP request to be sent
746
                 * @return The prepared (but unsent) request.
747
                 */
748
                private Request<NoResponse> unregisteredRequest(
749
                                SCPRequest<NoResponse> request) {
750
                        // Update the packet with a (non-valuable) sequence number
751
                        request.scpRequestHeader.issueSequenceNumber(Set.of());
×
752
                        log.debug("sending one-way message");
×
753
                        return new Request<>(request, null);
×
754
                }
755

756
                /**
757
                 * Send a one-way request.
758
                 *
759
                 * @param request
760
                 *            The one-way SCP request to be sent.
761
                 * @throws IOException
762
                 *             If things go really wrong.
763
                 * @throws InterruptedException
764
                 *             If communications are interrupted (prior to sending).
765
                 */
766
                void send(SCPRequest<NoResponse> request)
767
                                throws IOException, InterruptedException {
768
                        // Wait for all current in-flight responses to be received
769
                        finish();
×
770

771
                        // Send the request without registering it
772
                        unregisteredRequest(request).send();
×
773
                }
×
774

775
                /**
776
                 * Indicate the end of the packets to be sent. This must be called to
777
                 * ensure that all responses are received and handled.
778
                 *
779
                 * @throws IOException
780
                 *             If anything goes wrong with communications.
781
                 * @throws InterruptedException
782
                 *             If communications are interrupted.
783
                 */
784
                void finish() throws IOException, InterruptedException {
785
                        while (numOutstandingRequests() > 0) {
×
786
                                multiRetrieve(0);
×
787
                        }
788
                        log.debug("Finished called on {} with connection {}", this,
×
789
                                        connection);
790
                }
×
791

792
                /**
793
                 * Receives responses until there are only numPackets responses left.
794
                 *
795
                 * @param numPacketsOutstanding
796
                 *            The number of packets that can remain after running.
797
                 * @throws IOException
798
                 *             If anything goes wrong with receiving a packet.
799
                 * @throws InterruptedException
800
                 *             If communications are interrupted.
801
                 */
802
                private void multiRetrieve(int numPacketsOutstanding)
803
                                throws IOException, InterruptedException {
804
                        // While there are still more packets in progress than some
805
                        // threshold
806
                        while (numOutstandingRequests() > numPacketsOutstanding) {
×
807
                                try {
808
                                        // Receive the next response
809
                                        singleRetrieve();
×
810
                                } catch (SocketTimeoutException e) {
×
811
                                        handleReceiveTimeout();
×
812
                                }
×
813
                        }
814
                }
×
815

816
                private void singleRetrieve() throws IOException, InterruptedException {
817
                        // Receive the next response
818
                        log.debug("{}: Connection {} waiting for message... timeout of {}",
×
819
                                        this, connection, packetTimeout);
×
820
                        var msg = connection.receiveSCPResponse(packetTimeout);
×
821
                        if (log.isDebugEnabled()) {
×
822
                                log.debug(
×
823
                                                "{}, Connection {} received message {} with seq num {}",
824
                                                this, connection, msg.getResult(),
×
825
                                                msg.getSequenceNumber());
×
826
                        }
827
                        var req = getRequestForResult(msg);
×
828

829
                        // Only process responses which have matching requests
830
                        if (req == null) {
×
831
                                log.info("discarding message with unknown sequence number: {}",
×
832
                                                msg.getSequenceNumber());
×
833
                                if (log.isDebugEnabled()) {
×
834
                                        log.debug("current waiting on requests with seq's ");
×
835
                                        for (int seq : listOutstandingSeqs()) {
×
836
                                                log.debug("{}", seq);
×
837
                                        }
×
838
                                }
839
                                return;
×
840
                        }
841

842
                        // If the response can be retried, retry it
843
                        if (msg.isRetriable()) {
×
844
                                try {
845
                                        resend(req, msg.getResult(), msg.getSequenceNumber());
×
846
                                        numRetryCodeResent++;
×
847
                                } catch (SocketTimeoutException e) {
×
848
                                        throw e;
×
849
                                } catch (Exception e) {
×
850
                                        log.debug("throwing away request {} coz of {}",
×
851
                                                        msg.getSequenceNumber(), e);
×
852
                                        req.handleError(e);
×
853
                                        removeRequest(msg);
×
854
                                }
×
855
                        } else {
856
                                // No retry is possible - try constructing the result
857
                                try {
858
                                        req.parseReceivedResponse(msg);
×
859
                                } catch (Exception e) {
×
860
                                        req.handleError(e);
×
861
                                } finally {
862
                                        // Remove the sequence from the outstanding responses
863
                                        removeRequest(msg);
×
864
                                }
865
                        }
866
                }
×
867

868
                private void handleReceiveTimeout() {
869
                        numTimeouts++;
×
870

871
                        // If there is a timeout, all packets remaining are resent
872
                        var toRemove = new BitSet(SEQUENCE_LENGTH);
×
873
                        for (int seq : listOutstandingSeqs()) {
×
874
                                log.debug("resending seq {}", seq);
×
875
                                var req = getRequestForSeq(seq);
×
876
                                if (req == null) {
×
877
                                        // Shouldn't happen, but if it does we should nuke it.
878
                                        toRemove.set(seq);
×
879
                                        continue;
×
880
                                }
881

882
                                try {
883
                                        resend(req, REASON_TIMEOUT, seq);
×
884
                                } catch (Exception e) {
×
885
                                        log.debug("removing seq {}", seq);
×
886
                                        req.handleError(e);
×
887
                                        toRemove.set(seq);
×
888
                                }
×
889
                        }
×
890
                        log.debug("finish resending");
×
891

892
                        removeManySeqs(toRemove);
×
893
                }
×
894

895
                private void resend(Req req, Object reason, int seq)
896
                                throws IOException, InterruptedException {
897
                        if (req.getRetries() <= 0) {
×
898
                                // Report timeouts as timeout exception
899
                                if (req.allTimeoutFailures()) {
×
900
                                        throw new SendTimedOutException(req, packetTimeout, seq);
×
901
                                }
902

903
                                // Report any other exception
904
                                throw new SendFailedException(req, numRetries);
×
905
                        }
906

907
                        // If the request can be retried, retry it, sleep for 1ms seems
908
                        // to protect against weird errors. So don't remove this sleep.
909
                        sleep(RETRY_DELAY_MS);
×
910
                        req.resend(reason);
×
911
                        numResent++;
×
912
                }
×
913

914
                @Override
915
                public String toString() {
916
                        synchronized (outstandingRequests) {
×
917
                                return format(
×
918
                                                "%s(req=%d,outstanding=%d,resent=%d,"
919
                                                                + "restart=%d,timeouts=%d)",
920
                                                super.toString(), numRequests,
×
921
                                                outstandingRequests.size(),
×
922
                                                numResent, numRetryCodeResent, numTimeouts);
×
923
                        }
924
                }
925
        }
926

927
        /**
928
         * Indicates that a request timed out.
929
         */
930
        static class SendTimedOutException extends SocketTimeoutException {
931
                @Serial
932
                private static final long serialVersionUID = -7911020002602751941L;
933

934
                /**
935
                 * @param req
936
                 *            The request that timed out.
937
                 * @param timeout
938
                 *            The length of timeout, in milliseconds.
939
                 */
940
                SendTimedOutException(Req req, int timeout, int seqNum) {
941
                        super(format(
×
942
                                        "Operation %s timed out after %f seconds with seq num %d",
943
                                        req.getCommand(), timeout / (double) MSEC_PER_SEC, seqNum));
×
944
                }
×
945
        }
946

947
        /**
948
         * Indicates that a request could not be sent.
949
         */
950
        static class SendFailedException extends IOException {
951
                @Serial
952
                private static final long serialVersionUID = -5555562816486761027L;
953

954
                /**
955
                 * @param req
956
                 *            The request that timed out.
957
                 * @param numRetries
958
                 *            How many attempts to send it were made.
959
                 */
960
                SendFailedException(Req req, int numRetries) {
961
                        super(format(
×
962
                                        "Errors sending request %s to %s over %d retries: %s",
963
                                        req.getCommand(), req.getDestination(),
×
964
                                        numRetries, req.getRetryReasons()));
×
965
                }
×
966
        }
967

968
        /**
969
         * There's a duplicate sequence number! This really shouldn't happen.
970
         *
971
         * @author Donal Fellows
972
         */
973
        static class DuplicateSequenceNumberException
974
                        extends IllegalThreadStateException {
975
                @Serial
976
                private static final long serialVersionUID = -4033792283948201730L;
977

978
                DuplicateSequenceNumberException() {
979
                        super("duplicate sequence number catastrophe");
×
980
                }
×
981
        }
982
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc