• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / JavaSpiNNaker / 13989614444

20 Mar 2025 11:43AM UTC coverage: 38.216% (-0.4%) from 38.579%
13989614444

Pull #1222

github

rowleya
Fix issues from review
Pull Request #1222: More spalloc rest calls

70 of 815 new or added lines in 33 files covered. (8.59%)

29 existing lines in 13 files now uncovered.

9181 of 24024 relevant lines covered (38.22%)

1.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/TxrxProcess.java
1
/*
2
 * Copyright (c) 2018 The University of Manchester
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     https://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
package uk.ac.manchester.spinnaker.transceiver;
17

18
import static java.lang.Integer.getInteger;
19
import static java.lang.String.format;
20
import static java.lang.System.nanoTime;
21
import static java.lang.Thread.sleep;
22
import static java.util.Collections.synchronizedMap;
23
import static java.util.Objects.requireNonNull;
24
import static org.slf4j.LoggerFactory.getLogger;
25
import static uk.ac.manchester.spinnaker.messages.Constants.SCP_RETRY_DEFAULT;
26
import static uk.ac.manchester.spinnaker.messages.Constants.SCP_TIMEOUT_DEFAULT;
27
import static uk.ac.manchester.spinnaker.messages.scp.SequenceNumberSource.SEQUENCE_LENGTH;
28
import static uk.ac.manchester.spinnaker.transceiver.ProcessException.makeInstance;
29
import static uk.ac.manchester.spinnaker.utils.UnitConstants.MSEC_PER_SEC;
30
import static uk.ac.manchester.spinnaker.utils.WaitUtils.waitUntil;
31

32
import java.io.IOException;
33
import java.io.InterruptedIOException;
34
import java.net.SocketTimeoutException;
35
import java.nio.ByteBuffer;
36
import java.util.ArrayList;
37
import java.util.BitSet;
38
import java.util.HashMap;
39
import java.util.List;
40
import java.util.Map;
41
import java.util.Objects;
42
import java.util.Set;
43
import java.util.WeakHashMap;
44
import java.util.function.Consumer;
45

46
import org.slf4j.Logger;
47

48
import com.google.errorprone.annotations.concurrent.GuardedBy;
49

50
import uk.ac.manchester.spinnaker.connections.ConnectionSelector;
51
import uk.ac.manchester.spinnaker.connections.SCPConnection;
52
import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
53
import uk.ac.manchester.spinnaker.messages.scp.CheckOKResponse;
54
import uk.ac.manchester.spinnaker.messages.scp.CommandCode;
55
import uk.ac.manchester.spinnaker.messages.scp.ConnectionAwareMessage;
56
import uk.ac.manchester.spinnaker.messages.scp.EmptyResponse;
57
import uk.ac.manchester.spinnaker.messages.scp.NoResponse;
58
import uk.ac.manchester.spinnaker.messages.scp.PayloadedResponse;
59
import uk.ac.manchester.spinnaker.messages.scp.SCPRequest;
60
import uk.ac.manchester.spinnaker.messages.scp.SCPResponse;
61
import uk.ac.manchester.spinnaker.messages.scp.SCPResult;
62
import uk.ac.manchester.spinnaker.messages.scp.SCPResultMessage;
63
import uk.ac.manchester.spinnaker.messages.sdp.SDPLocation;
64
import uk.ac.manchester.spinnaker.utils.ValueHolder;
65

66
/**
67
 * A process for talking to SpiNNaker efficiently that uses multiple connections
68
 * in communication (if appropriate).
69
 */
70
public class TxrxProcess {
71
        /** The default for the number of parallel channels. */
72
        protected static final int DEFAULT_NUM_CHANNELS = 8;
73

74
        /** The default for the number of instantaneously active channels. */
75
        protected static final int DEFAULT_INTERMEDIATE_CHANNEL_WAITS = 7;
76

77
        /**
78
         * The name of a <em>system property</em> that can override the default
79
         * timeouts. If specified as an integer, it gives the number of
80
         * milliseconds to wait before timing out a communication.
81
         */
82
        private static final String TIMEOUT_PROPERTY = "spinnaker.scp_timeout";
83

84
        /**
85
         * The name of a <em>system property</em> that can override the default
86
         * retries. If specified as an integer, it gives the number of retries
87
         * to perform (on timeout of receiving a reply) before timing out a
88
         * communication.
89
         */
90
        private static final String RETRY_PROPERTY = "spinnaker.scp_retries";
91

92
        private static final Logger log = getLogger(RequestPipeline.class);
×
93

94
        /**
95
         * The default number of outstanding responses to wait for before
96
         * continuing sending requests.
97
         */
98
        protected static final int DEFAULT_INTERMEDIATE_TIMEOUT_WAITS = 0;
99

100
        /**
101
         * The default number of times to resend any packet for any reason
102
         * before an error is triggered.
103
         */
104
        protected static final int SCP_RETRIES;
105

106
        /**
107
         * How long to wait between retries, in milliseconds.
108
         */
109
        protected static final int RETRY_DELAY_MS = 1;
110

111
        private static final String REASON_TIMEOUT = "timeout";
112

113
        /**
114
         * Packet minimum send interval, in <em>nanoseconds</em>.
115
         */
116
        private static final int INTER_SEND_INTERVAL_NS = 60000;
117

118
        /** The default for the timeout (in ms). */
119
        protected static final int SCP_TIMEOUT;
120

121
        static {
122
                // Read system properties
123
                SCP_TIMEOUT = getInteger(TIMEOUT_PROPERTY, SCP_TIMEOUT_DEFAULT);
×
124
                SCP_RETRIES = getInteger(RETRY_PROPERTY, SCP_RETRY_DEFAULT);
×
125
        }
126

127
        /**
128
         * The number of outstanding responses to wait for before continuing
129
         * sending requests.
130
         */
131
        final int numWaits;
132

133
        /** The number of requests to send before checking for responses. */
134
        final int numChannels;
135

136
        /**
137
         * The number of times to resend any packet for any reason before an
138
         * error is triggered.
139
         */
140
        final int numRetries;
141

142
        /**
143
         * The number of elapsed milliseconds after sending a packet before it
144
         * is considered a timeout.
145
         */
146
        final int packetTimeout;
147

148
        /**
149
         * How to select how to communicate.
150
         */
151
        private final ConnectionSelector<? extends SCPConnection> selector;
152

153
        private final Map<SCPConnection, RequestPipeline> requestPipelines;
154

155
        /**
156
         * The API of a single request.
157
         *
158
         * @author Donal Fellows
159
         */
160
        private interface Req {
161
                /**
162
                 * Tests whether the reasons for resending are consistently
163
                 * timeouts.
164
                 *
165
                 * @return True if all reasons are timeouts.
166
                 */
167
                boolean allTimeoutFailures();
168

169
                /**
170
                 * Get the command being sent in the request.
171
                 *
172
                 * @return The request's SCP command.
173
                 */
174
                CommandCode getCommand();
175

176
                /**
177
                 * Which core is the destination of the request?
178
                 *
179
                 * @return The core location.
180
                 */
181
                HasCoreLocation getDestination();
182

183
                /**
184
                 * Number of retries remaining for the packet.
185
                 *
186
                 * @return The number of retries that may still be performed.
187
                 */
188
                int getRetries();
189

190
                /**
191
                 * Get the list of reasons why a message was retried.
192
                 *
193
                 * @return The retry reasons.
194
                 */
195
                List<String> getRetryReasons();
196

197
                /**
198
                 * Report a failure to the process.
199
                 *
200
                 * @param exception
201
                 *            The problem that is being reported.
202
                 */
203
                void handleError(Exception exception);
204

205
                /**
206
                 * Handle the reception of a message.
207
                 *
208
                 * @param msg
209
                 *            the content of the message, in a little-endian buffer.
210
                 * @throws Exception
211
                 *             If something goes wrong.
212
                 */
213
                void parseReceivedResponse(SCPResultMessage msg) throws Exception;
214

215
                /**
216
                 * Send the request again.
217
                 *
218
                 * @param reason
219
                 *            Why the request is being sent again.
220
                 * @throws IOException
221
                 *             If the connection throws.
222
                 */
223
                void resend(Object reason) throws IOException;
224

225
                /**
226
                 * Send the request.
227
                 *
228
                 * @throws IOException
229
                 *             If the connection throws.
230
                 */
231
                void send() throws IOException;
232
        }
233

234
        /**
235
         * The maps of outstanding requests on each connection.
236
         */
237
        private static final Map<SCPConnection,
238
                        Map<Integer, Req>> OUTSTANDING_REQUESTS = new WeakHashMap<>();
×
239

240
        /**
241
         * An object used to track how many retries have been done, or
242
         * {@code null} if no such tracking is required.
243
         */
244
        private final RetryTracker retryTracker;
245

246
        // TODO handle multiple failures
247
        private Failure failure;
248

249
        /**
250
         * @param <Conn>
251
         *            The type of connection.
252
         * @param connectionSelector
253
         *            How to select how to communicate.
254
         * @param retryTracker
255
         *            Object used to track how many retries were used in an
256
         *            operation. May be {@code null} if no such tracking is
257
         *            required.
258
         */
259
        public <Conn extends SCPConnection> TxrxProcess(
260
                        ConnectionSelector<Conn> connectionSelector,
261
                        RetryTracker retryTracker) {
262
                this(connectionSelector, SCP_RETRIES, SCP_TIMEOUT, DEFAULT_NUM_CHANNELS,
×
263
                                DEFAULT_INTERMEDIATE_CHANNEL_WAITS, retryTracker);
264
        }
×
265

266
        /**
267
         * @param <Conn>
268
         *            The type of connection.
269
         * @param connectionSelector
270
         *            How to select how to communicate.
271
         * @param numRetries
272
         *            The number of times to retry a communication.
273
         * @param timeout
274
         *            The timeout (in ms) for the communication.
275
         * @param numChannels
276
         *            The number of parallel communications to support
277
         * @param intermediateChannelWaits
278
         *            How many parallel communications to launch at once. (??)
279
         * @param retryTracker
280
         *            Object used to track how many retries were used in an
281
         *            operation. May be {@code null} if no such tracking is
282
         *            required.
283
         */
284
        protected <Conn extends SCPConnection> TxrxProcess(
285
                        ConnectionSelector<Conn> connectionSelector,
286
                        int numRetries, int timeout, int numChannels,
287
                        int intermediateChannelWaits, RetryTracker retryTracker) {
×
288
                this.requestPipelines = new HashMap<>();
×
289
                this.numRetries = numRetries;
×
290
                this.packetTimeout = timeout;
×
291
                this.numChannels = numChannels;
×
292
                this.numWaits = intermediateChannelWaits;
×
293
                this.selector = Objects.requireNonNull(connectionSelector);
×
NEW
294
                this.retryTracker = retryTracker;
×
295
        }
×
296

297
        /**
298
         * Manufacture a pipeline to handle a request using the configured pipeline
299
         * parameters. Reuses an existing pipeline if it can.
300
         *
301
         * @param request
302
         *            The request it will handle.
303
         * @return The pipeline instance.
304
         */
305
        private RequestPipeline pipeline(SCPRequest<?> request) {
NEW
306
                var connection = selector.getNextConnection(request);
×
NEW
307
                if (!requestPipelines.containsKey(connection)) {
×
NEW
308
                        var pipeline = new RequestPipeline(connection);
×
NEW
309
                        requestPipelines.put(connection, pipeline);
×
NEW
310
                        connection.setInUse();
×
311
                }
NEW
312
                return requestPipelines.get(connection);
×
313
        }
314

315
        /**
316
         * Put the state in such a way that it definitely isn't recording an error.
317
         */
318
        private void resetFailureState() {
319
                this.failure = null;
×
320
        }
×
321

322
        /**
323
         * Wait for all outstanding requests sent by this process to receive replies
324
         * or time out. Then test if an error occurred on the SpiNNaker side, and
325
         * throw a process exception if it did.
326
         *
327
         * @throws IOException
328
         *             If communications fail.
329
         * @throws ProcessException
330
         *             an exception that wraps the original exception that occurred.
331
         * @throws InterruptedException
332
         *             If the communications were interrupted.
333
         */
334
        protected final void finishBatch()
335
                        throws ProcessException, IOException, InterruptedException {
336
                for (var pipe : requestPipelines.values()) {
×
337
                        pipe.finish();
×
338
                }
×
NEW
339
                requestPipelines.clear();
×
340
                if (failure != null) {
×
341
                        var hdr = failure.req.sdpHeader;
×
342
                        throw makeInstance(hdr.getDestination(), failure.exn);
×
343
                }
344
        }
×
345

346
        /**
347
         * Send a request. The actual payload of the response to this request is to
348
         * be considered to be uninteresting provided it doesn't indicate a failure.
349
         * In particular, the response is an {@link EmptyResponse}.
350
         *
351
         * @param request
352
         *            The request to send.
353
         * @throws IOException
354
         *             If sending fails.
355
         * @throws InterruptedException
356
         *             If communications are interrupted while preparing to send.
357
         */
358
        protected final void sendRequest(SCPRequest<EmptyResponse> request)
359
                        throws IOException, InterruptedException {
360
                pipeline(request).send(request, null);
×
361
        }
×
362

363
        /**
364
         * Send a request and handle the response.
365
         *
366
         * @param <Resp>
367
         *            The type of response expected to the request.
368
         * @param request
369
         *            The request to send.
370
         * @param callback
371
         *            The callback that handles the request's response.
372
         * @throws IOException
373
         *             If sending fails.
374
         * @throws InterruptedException
375
         *             If communications are interrupted.
376
         */
377
        protected final <Resp extends CheckOKResponse> void sendRequest(
378
                        SCPRequest<Resp> request, Consumer<Resp> callback)
379
                        throws IOException, InterruptedException {
380
                pipeline(request).send(request,
×
381
                                requireNonNull(callback, "callback must be non-null"));
×
382
        }
×
383

384
        /**
385
         * Send a request for a response with a payload.
386
         *
387
         * @param <T>
388
         *            The type of parsed payload expected.
389
         * @param <R>
390
         *            The type of response expected to the request.
391
         * @param request
392
         *            The request to send.
393
         * @param callback
394
         *            The callback that handles the parsed payload.
395
         * @throws IOException
396
         *             If sending fails.
397
         * @throws InterruptedException
398
         *             If communications are interrupted.
399
         */
400
        protected final <T, R extends PayloadedResponse<T, ?>> void sendGet(
401
                        SCPRequest<R> request, Consumer<T> callback)
402
                        throws IOException, InterruptedException {
403
                pipeline(request).send(request, resp -> callback.accept(resp.get()));
×
404
        }
×
405

406
        /**
407
         * Do a synchronous call of an SCP operation, sending the given message and
408
         * completely processing the interaction before returning its response.
409
         *
410
         * @param request
411
         *            The request to send
412
         * @throws IOException
413
         *             If the communications fail
414
         * @throws ProcessException
415
         *             If the other side responds with a failure code
416
         * @throws InterruptedException
417
         *             If the communications were interrupted.
418
         */
419
        public final void call(SCPRequest<EmptyResponse> request)
420
                                        throws IOException, ProcessException, InterruptedException {
421
                var holder = new ValueHolder<EmptyResponse>();
×
422
                resetFailureState();
×
423
                sendRequest(request, holder::setValue);
×
424
                finishBatch();
×
425
                assert holder.getValue().result == SCPResult.RC_OK;
×
426
        }
×
427

428
        /**
429
         * Do a synchronous call of an SCP operation, sending the given message and
430
         * completely processing the interaction before returning its parsed
431
         * payload.
432
         *
433
         * @param <T>
434
         *            The type of the payload of the response.
435
         * @param <R>
436
         *            The type of the response; implicit in the type of the request.
437
         * @param request
438
         *            The request to send
439
         * @return The successful response to the request
440
         * @throws IOException
441
         *             If the communications fail
442
         * @throws ProcessException
443
         *             If the other side responds with a failure code
444
         * @throws InterruptedException
445
         *             If the communications were interrupted.
446
         */
447
        public final <T,
448
                        R extends PayloadedResponse<T, ?>> T retrieve(SCPRequest<R> request)
449
                                        throws IOException, ProcessException, InterruptedException {
450
                var holder = new ValueHolder<T>();
×
451
                resetFailureState();
×
452
                sendGet(request, holder::setValue);
×
453
                finishBatch();
×
454
                return holder.getValue();
×
455
        }
456

457
        /**
458
         * Send a one-way request. One-way requests do not need to be finished.
459
         *
460
         * @param request
461
         *            The request to send. <em>Must</em> be a one-way request!
462
         * @throws IOException
463
         *             If sending fails.
464
         * @throws InterruptedException
465
         *             If communications are interrupted while preparing to send.
466
         */
467
        protected final void sendOneWayRequest(SCPRequest<NoResponse> request)
468
                        throws IOException, InterruptedException {
469
                pipeline(request).send(request);
×
470
        }
×
471

472
        /**
473
         * States that a particular request failed with a particular exception. The
474
         * request should not be retried once this has been generated.
475
         */
476
        // TODO make into a record once on a new enough language profile
477
        private static class Failure {
478
                private final SCPRequest<?> req;
479

480
                private final Exception exn;
481

482
                Failure(SCPRequest<?> req, Exception exn) {
×
483
                        this.req = req;
×
484
                        this.exn = exn;
×
485
                }
×
486
        }
487

488
        /**
489
         * Allows a set of SCP requests to be grouped together in a communication
490
         * across a number of channels for a given connection.
491
         * <p>
492
         * This class implements SCP windowing, first suggested by Andrew Mundy.
493
         * This extends the idea by having both send and receive windows. These are
494
         * represented by the {@link TxrxProcess#numChannels} and the
495
         * {@link TxrxProcess#numWaits} fields of the enclosing class respectively.
496
         * This seems to help with the timeout issue; when a timeout is received,
497
         * all requests for which a reply has not been received can also timeout.
498
         *
499
         * @author Andrew Mundy
500
         * @author Andrew Rowley
501
         * @author Donal Fellows
502
         */
503
        class RequestPipeline {
504
                /** The connection over which the communication is to take place. */
505
                private final SCPConnection connection;
506

507
                /** The number of requests issued to this pipeline. */
508
                private int numRequests;
509

510
                /** The number of packets that have been resent. */
511
                private int numResent;
512

513
                /** The number of retries due to restartable errors. */
514
                private int numRetryCodeResent;
515

516
                /** The number of timeouts that occurred. */
517
                private int numTimeouts;
518

519
                /** A dictionary of sequence number &rarr; requests in progress. */
520
                @GuardedBy("itself")
521
                private final Map<Integer, Req> outstandingRequests;
522

523
                private long nextSendTime = 0;
×
524

525
                /**
526
                 * Per message record.
527
                 *
528
                 * @param <T>
529
                 *            The type of response expected to the request in the
530
                 *            message.
531
                 */
532
                private final class Request<T extends SCPResponse> implements Req {
533
                        /** Request in progress. */
534
                        private final SCPRequest<T> request;
535

536
                        /** Payload of request in progress. */
537
                        private final ByteBuffer requestData;
538

539
                        private int seq;
540

541
                        /** Callback function for response. */
542
                        private final Consumer<T> callback;
543

544
                        /** Retry reasons. */
545
                        private final List<String> retryReason;
546

547
                        /** Number of retries remaining for the packet. */
548
                        private int retries;
549

550
                        /**
551
                         * Make a record.
552
                         *
553
                         * @param request
554
                         *            The request.
555
                         * @param callback
556
                         *            The success callback.
557
                         */
558
                        private Request(SCPRequest<T> request, Consumer<T> callback) {
×
559
                                this.request = request;
×
560
                                this.requestData = request.getMessageData(connection.getChip());
×
561
                                this.seq = request.scpRequestHeader.getSequence();
×
562
                                this.callback = callback;
×
563
                                retryReason = new ArrayList<>();
×
564
                                retries = numRetries;
×
565
                        }
×
566

567
                        @Override
568
                        public void send() throws IOException {
569
                                if (waitUntil(nextSendTime)) {
×
570
                                        throw new InterruptedIOException(
×
571
                                                        "interrupted while waiting to send");
572
                                }
573
                                log.debug("Sending request {} with connection {}", request,
×
574
                                                connection);
575
                                switch (request.sdpHeader.getFlags()) {
×
576
                                case REPLY_EXPECTED:
577
                                case REPLY_EXPECTED_NO_P2P:
578
                                        connection.send(requestData, seq);
×
579
                                        break;
×
580
                                default:
581
                                        connection.send(requestData);
×
582
                                }
583
                                nextSendTime = nanoTime() + INTER_SEND_INTERVAL_NS;
×
584
                        }
×
585

586
                        @Override
587
                        public void resend(Object reason) throws IOException {
588
                                retries--;
×
589
                                retryReason.add(reason.toString());
×
590
                                if (retryTracker != null) {
×
591
                                        retryTracker.retryNeeded();
×
592
                                }
593
                                requestData.rewind();
×
594
                                send();
×
595
                        }
×
596

597
                        @Override
598
                        public boolean allTimeoutFailures() {
599
                                return retryReason.stream().allMatch(REASON_TIMEOUT::equals);
×
600
                        }
601

602
                        @Override
603
                        public void parseReceivedResponse(SCPResultMessage msg)
604
                                        throws Exception {
605
                                var response = msg.parsePayload(request);
×
606
                                if (callback != null) {
×
607
                                        callback.accept(response);
×
608
                                }
609
                        }
×
610

611
                        @Override
612
                        public SDPLocation getDestination() {
613
                                return request.sdpHeader.getDestination();
×
614
                        }
615

616
                        @Override
617
                        public void handleError(Exception exception) {
618
                                failure = new Failure(request, exception);
×
619
                        }
×
620

621
                        @Override
622
                        public CommandCode getCommand() {
623
                                return request.scpRequestHeader.command;
×
624
                        }
625

626
                        @Override
627
                        public int getRetries() {
628
                                return retries;
×
629
                        }
630

631
                        @Override
632
                        public List<String> getRetryReasons() {
633
                                return retryReason;
×
634
                        }
635
                }
636

637
                /**
638
                 * Create a request handling pipeline.
639
                 *
640
                 * @param connection
641
                 *            The connection over which the communication is to take
642
                 *            place
643
                 */
644
                RequestPipeline(SCPConnection connection) {
×
645
                        log.debug("Request pipeline {} using connection {}", this,
×
646
                                        connection);
647
                        this.connection = connection;
×
648
                        synchronized (OUTSTANDING_REQUESTS) {
×
649
                                outstandingRequests = OUTSTANDING_REQUESTS.computeIfAbsent(
×
650
                                                connection, __ -> synchronizedMap(new HashMap<>()));
×
651
                        }
×
652
                }
×
653

654
                // Various accessors for outstandingRequests to handle locking right
655

656
                private int numOutstandingRequests() {
657
                        synchronized (outstandingRequests) {
×
658
                                return outstandingRequests.size();
×
659
                        }
660
                }
661

662
                private Req getRequestForResult(SCPResultMessage msg) {
663
                        synchronized (outstandingRequests) {
×
664
                                return msg.pickRequest(outstandingRequests);
×
665
                        }
666
                }
667

668
                private void removeRequest(SCPResultMessage msg) {
669
                        synchronized (outstandingRequests) {
×
670
                                msg.removeRequest(outstandingRequests);
×
671
                        }
×
672
                }
×
673

674
                private Req getRequestForSeq(int seq) {
675
                        synchronized (outstandingRequests) {
×
676
                                return outstandingRequests.get(seq);
×
677
                        }
678
                }
679

680
                private List<Integer> listOutstandingSeqs() {
681
                        synchronized (outstandingRequests) {
×
682
                                return List.copyOf(outstandingRequests.keySet());
×
683
                        }
684
                }
685

686
                private void removeManySeqs(BitSet toRemove) {
687
                        synchronized (outstandingRequests) {
×
688
                                for (var seq : toRemove.stream().toArray()) {
×
689
                                        outstandingRequests.remove(seq);
×
690
                                }
691
                        }
×
692
                }
×
693

694
                /**
695
                 * Add an SCP request to the set to be sent. The request expects a
696
                 * reply.
697
                 *
698
                 * @param <T>
699
                 *            The type of response expected to the request.
700
                 * @param request
701
                 *            The SCP request to be sent
702
                 * @param callback
703
                 *            A callback function to call when the response has been
704
                 *            received; takes an SCPResponse as a parameter, or a
705
                 *            {@code null} if the response doesn't need to be processed.
706
                 * @throws IOException
707
                 *             If things go really wrong.
708
                 * @throws InterruptedException
709
                 *             If communications are interrupted (prior to sending).
710
                 */
711
                <T extends CheckOKResponse> void send(SCPRequest<T> request,
712
                                Consumer<T> callback) throws IOException, InterruptedException {
713
                        // If all the channels are used, start to receive packets
714
                        while (numOutstandingRequests() >= numChannels) {
×
715
                                multiRetrieve(numWaits);
×
716
                        }
717

718
                        // Send the request
719
                        registerRequest(request, callback).send();
×
720
                }
×
721

722
                /**
723
                 * Update the packet and store required details.
724
                 *
725
                 * @param <T>
726
                 *            The type of response expected to the request.
727
                 * @param request
728
                 *            The SCP request to be sent
729
                 * @param callback
730
                 *            A callback function to call when the response has been
731
                 *            received; takes an SCPResponse as a parameter, or a
732
                 *            {@code null} if the response doesn't need to be processed.
733
                 * @return The prepared and registered (but unsent) request.
734
                 * @throws DuplicateSequenceNumberException
735
                 *             If we couldn't mint a sequence number. Really shouldn't
736
                 *             happen!
737
                 */
738
                private <T extends CheckOKResponse> Request<T> registerRequest(
739
                                SCPRequest<T> request, Consumer<T> callback) {
740
                        if (request instanceof ConnectionAwareMessage) {
×
741
                                ConnectionAwareMessage cam = (ConnectionAwareMessage) request;
×
742
                                cam.setConnection(connection);
×
743
                        }
744
                        synchronized (outstandingRequests) {
×
745
                                int sequence = request.scpRequestHeader
×
746
                                                .issueSequenceNumber(outstandingRequests.keySet());
×
747

748
                                var req = new Request<>(request, callback);
×
749
                                log.debug("{}: sending message with sequence {}", this,
×
750
                                                sequence);
×
751
                                if (outstandingRequests.put(sequence, req) != null) {
×
752
                                        throw new DuplicateSequenceNumberException();
×
753
                                }
754
                                numRequests++;
×
755
                                return req;
×
756
                        }
757
                }
758

759
                /**
760
                 * Update the packet but don't remember it; it's a one-shot.
761
                 *
762
                 * @param request
763
                 *            The one-way SCP request to be sent
764
                 * @return The prepared (but unsent) request.
765
                 */
766
                private Request<NoResponse> unregisteredRequest(
767
                                SCPRequest<NoResponse> request) {
768
                        // Update the packet with a (non-valuable) sequence number
769
                        request.scpRequestHeader.issueSequenceNumber(Set.of());
×
770
                        log.debug("sending one-way message");
×
771
                        return new Request<>(request, null);
×
772
                }
773

774
                /**
775
                 * Send a one-way request.
776
                 *
777
                 * @param request
778
                 *            The one-way SCP request to be sent.
779
                 * @throws IOException
780
                 *             If things go really wrong.
781
                 * @throws InterruptedException
782
                 *             If communications are interrupted (prior to sending).
783
                 */
784
                void send(SCPRequest<NoResponse> request)
785
                                throws IOException, InterruptedException {
786
                        // Wait for all current in-flight responses to be received
787
                        finish();
×
788

789
                        // Send the request without registering it
790
                        unregisteredRequest(request).send();
×
791
                }
×
792

793
                /**
794
                 * Indicate the end of the packets to be sent. This must be called to
795
                 * ensure that all responses are received and handled.
796
                 *
797
                 * @throws IOException
798
                 *             If anything goes wrong with communications.
799
                 * @throws InterruptedException
800
                 *             If communications are interrupted.
801
                 */
802
                void finish() throws IOException, InterruptedException {
803
                        while (numOutstandingRequests() > 0) {
×
804
                                multiRetrieve(0);
×
805
                        }
806
                        log.debug("Finished called on {} with connection {}", this,
×
807
                                        connection);
NEW
808
                        this.connection.setNotInUse();
×
UNCOV
809
                }
×
810

811
                /**
812
                 * Receives responses until there are only numPackets responses left.
813
                 *
814
                 * @param numPacketsOutstanding
815
                 *            The number of packets that can remain after running.
816
                 * @throws IOException
817
                 *             If anything goes wrong with receiving a packet.
818
                 * @throws InterruptedException
819
                 *             If communications are interrupted.
820
                 */
821
                private void multiRetrieve(int numPacketsOutstanding)
822
                                throws IOException, InterruptedException {
823
                        // While there are still more packets in progress than some
824
                        // threshold
825
                        while (numOutstandingRequests() > numPacketsOutstanding) {
×
826
                                try {
827
                                        // Receive the next response
828
                                        singleRetrieve();
×
829
                                } catch (SocketTimeoutException e) {
×
830
                                        handleReceiveTimeout();
×
831
                                }
×
832
                        }
833
                }
×
834

835
                private void singleRetrieve() throws IOException, InterruptedException {
836
                        // Receive the next response
837
                        log.debug("{}: Connection {} waiting for message... timeout of {}",
×
838
                                        this, connection, packetTimeout);
×
839
                        var msg = connection.receiveSCPResponse(packetTimeout);
×
840
                        if (log.isDebugEnabled()) {
×
841
                                log.debug(
×
842
                                                "{}, Connection {} received message {} with seq num {}",
843
                                                this, connection, msg.getResult(),
×
844
                                                msg.getSequenceNumber());
×
845
                        }
846
                        var req = getRequestForResult(msg);
×
847

848
                        // Only process responses which have matching requests
849
                        if (req == null) {
×
850
                                log.info("discarding message with unknown sequence number: {}",
×
851
                                                msg.getSequenceNumber());
×
852
                                if (log.isDebugEnabled()) {
×
853
                                        log.debug("current waiting on requests with seq's ");
×
854
                                        for (int seq : listOutstandingSeqs()) {
×
855
                                                log.debug("{}", seq);
×
856
                                        }
×
857
                                }
858
                                return;
×
859
                        }
860

861
                        // If the response can be retried, retry it
862
                        if (msg.isRetriable()) {
×
863
                                try {
864
                                        resend(req, msg.getResult(), msg.getSequenceNumber());
×
865
                                        numRetryCodeResent++;
×
866
                                } catch (SocketTimeoutException e) {
×
867
                                        throw e;
×
868
                                } catch (Exception e) {
×
869
                                        log.debug("throwing away request {} coz of {}",
×
870
                                                        msg.getSequenceNumber(), e);
×
871
                                        req.handleError(e);
×
872
                                        removeRequest(msg);
×
873
                                }
×
874
                        } else {
875
                                // No retry is possible - try constructing the result
876
                                try {
877
                                        req.parseReceivedResponse(msg);
×
878
                                } catch (Exception e) {
×
879
                                        req.handleError(e);
×
880
                                } finally {
881
                                        // Remove the sequence from the outstanding responses
882
                                        removeRequest(msg);
×
883
                                }
884
                        }
885
                }
×
886

887
                private void handleReceiveTimeout() {
888
                        numTimeouts++;
×
889

890
                        // If there is a timeout, all packets remaining are resent
891
                        var toRemove = new BitSet(SEQUENCE_LENGTH);
×
892
                        for (int seq : listOutstandingSeqs()) {
×
893
                                log.debug("resending seq {}", seq);
×
894
                                var req = getRequestForSeq(seq);
×
895
                                if (req == null) {
×
896
                                        // Shouldn't happen, but if it does we should nuke it.
897
                                        toRemove.set(seq);
×
898
                                        continue;
×
899
                                }
900

901
                                try {
902
                                        resend(req, REASON_TIMEOUT, seq);
×
903
                                } catch (Exception e) {
×
904
                                        log.debug("removing seq {}", seq);
×
905
                                        req.handleError(e);
×
906
                                        toRemove.set(seq);
×
907
                                }
×
908
                        }
×
909
                        log.debug("finish resending");
×
910

911
                        removeManySeqs(toRemove);
×
912
                }
×
913

914
                private void resend(Req req, Object reason, int seq)
915
                                throws IOException, InterruptedException {
916
                        if (req.getRetries() <= 0) {
×
917
                                // Report timeouts as timeout exception
918
                                if (req.allTimeoutFailures()) {
×
919
                                        throw new SendTimedOutException(req, packetTimeout, seq);
×
920
                                }
921

922
                                // Report any other exception
923
                                throw new SendFailedException(req, numRetries);
×
924
                        }
925

926
                        // If the request can be retried, retry it, sleep for 1ms seems
927
                        // to protect against weird errors. So don't remove this sleep.
928
                        sleep(RETRY_DELAY_MS);
×
929
                        req.resend(reason);
×
930
                        numResent++;
×
931
                }
×
932

933
                @Override
934
                public String toString() {
935
                        synchronized (outstandingRequests) {
×
936
                                return format(
×
937
                                                "%s(req=%d,outstanding=%d,resent=%d,"
938
                                                                + "restart=%d,timeouts=%d)",
939
                                                super.toString(), numRequests,
×
940
                                                outstandingRequests.size(),
×
941
                                                numResent, numRetryCodeResent, numTimeouts);
×
942
                        }
943
                }
944
        }
945

946
        /**
947
         * Indicates that a request timed out.
948
         */
949
        static class SendTimedOutException extends SocketTimeoutException {
950
                private static final long serialVersionUID = -7911020002602751941L;
951

952
                /**
953
                 * @param req
954
                 *            The request that timed out.
955
                 * @param timeout
956
                 *            The length of timeout, in milliseconds.
957
                 */
958
                SendTimedOutException(Req req, int timeout, int seqNum) {
959
                        super(format(
×
960
                                        "Operation %s timed out after %f seconds with seq num %d",
961
                                        req.getCommand(), timeout / (double) MSEC_PER_SEC, seqNum));
×
962
                }
×
963
        }
964

965
        /**
966
         * Indicates that a request could not be sent.
967
         */
968
        static class SendFailedException extends IOException {
969
                private static final long serialVersionUID = -5555562816486761027L;
970

971
                /**
972
                 * @param req
973
                 *            The request that timed out.
974
                 * @param numRetries
975
                 *            How many attempts to send it were made.
976
                 */
977
                SendFailedException(Req req, int numRetries) {
978
                        super(format(
×
979
                                        "Errors sending request %s to %s over %d retries: %s",
980
                                        req.getCommand(), req.getDestination(),
×
981
                                        numRetries, req.getRetryReasons()));
×
982
                }
×
983
        }
984

985
        /**
986
         * There's a duplicate sequence number! This really shouldn't happen.
987
         *
988
         * @author Donal Fellows
989
         */
990
        static class DuplicateSequenceNumberException
991
                        extends IllegalThreadStateException {
992
                private static final long serialVersionUID = -4033792283948201730L;
993

994
                DuplicateSequenceNumberException() {
995
                        super("duplicate sequence number catastrophe");
×
996
                }
×
997
        }
998
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc