• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rsocket / rsocket-cpp / 1373

pending completion
1373

push

travis-ci

lehecka
Fix duplicate def of FromPublisherOperator for Observable (#428)

* Fix duplicate def of FromPublisherOperator for Observable

* Use make_ref

* 80 column lint

2 of 2 new or added lines in 1 file covered. (100.0%)

8280 of 10092 relevant lines covered (82.05%)

245814.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.44
/src/ConnectionAutomaton.cpp
1
// Copyright 2004-present Facebook. All Rights Reserved.
2

3
#include "src/ConnectionAutomaton.h"
4

5
#include <folly/ExceptionWrapper.h>
6
#include <folly/MoveWrapper.h>
7
#include <folly/Optional.h>
8
#include <folly/String.h>
9
#include <folly/io/async/EventBase.h>
10
#include "src/ClientResumeStatusCallback.h"
11
#include "src/ConnectionSetupPayload.h"
12
#include "src/DuplexConnection.h"
13
#include "src/FrameTransport.h"
14
#include "src/RequestHandler.h"
15
#include "src/ResumeCache.h"
16
#include "src/Stats.h"
17
#include "src/StreamState.h"
18
#include "src/automata/ChannelResponder.h"
19
#include "src/automata/StreamAutomatonBase.h"
20

21
namespace reactivesocket {
22

23
// Constructs the automaton without attaching any transport.
//
// Stores the socket pointer, stats and mode, takes ownership of the request
// handler and keepalive timer, and creates a fresh ResumeCache and StreamState.
// `stats` is dereferenced while building StreamState, so it must be non-null.
ConnectionAutomaton::ConnectionAutomaton(
384✔
24
    folly::Executor& executor,
25
    ReactiveSocket* reactiveSocket,
26
    std::shared_ptr<RequestHandler> requestHandler,
27
    std::shared_ptr<Stats> stats,
28
    std::unique_ptr<KeepaliveTimer> keepaliveTimer,
29
    ReactiveSocketMode mode)
154 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On and GCC_VERSION=5 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
30
    : ExecutorBase(executor),
31
      reactiveSocket_(reactiveSocket),
32
      stats_(stats),
33
      mode_(mode),
34
      resumeCache_(std::make_shared<ResumeCache>(stats)),
35
      streamState_(std::make_shared<StreamState>(*stats)),
384✔
36
      requestHandler_(std::move(requestHandler)),
384✔
37
      keepaliveTimer_(std::move(keepaliveTimer)),
384✔
38
      streamsFactory_(*this, mode) {
1,536✔
39
  // We deliberately do not "open" input or output to avoid having the c'tor on
40
  // the stack when processing any signals from the connection. See ::connect
41
  // and ::onSubscribe.
42
  CHECK(streamState_);
384✔
43
}
384✔
44

45
// Destructor. Performs no cleanup of its own; in debug builds it only verifies
// that no resume callback is pending and that no frame transport is attached.
ConnectionAutomaton::~ConnectionAutomaton() {
740✔
46
  // This destructor can be called from a different thread because the stream
47
  // automatons destroyed on different threads can be the last ones referencing
48
  // this.
49

50
  VLOG(6) << "~ConnectionAutomaton";
370✔
51
  // We rely on SubscriptionPtr and SubscriberPtr to dispatch appropriate
52
  // terminal signals.
53
  DCHECK(!resumeCallback_);
370✔
54
  DCHECK(isDisconnectedOrClosed()); // the instance should have been closed via
370✔
55
  // the close method
56
}
370✔
57

58
// Sets both the local (isResumable_) and remote (remoteResumeable_) resumability
// flags to `resumable`. Must be called on the automaton's executor.
void ConnectionAutomaton::setResumable(bool resumable) {
359✔
59
  debugCheckCorrectExecutor();
359✔
60
  DCHECK(isDisconnectedOrClosed()); // this flag may only be set while we are
359✔
61
  // not connected
62
  remoteResumeable_ = isResumable_ = resumable;
359✔
63
}
359✔
64

65
// Attaches `frameTransport` to this automaton and starts processing its frames.
//
// frameTransport       - must be non-null and not closed (CHECKed).
// sendingPendingFrames - when true, frames queued in streamState_ are flushed
//                        and the keepalive timer (if any) is started.
// protocolVersion      - when not Unknown, it must match the existing frame
//                        serializer, or a serializer is created for it.
//
// Returns false (after closing `frameTransport` with an error) if the protocol
// version mismatches the current serializer or no serializer can be created
// for it; returns true otherwise.
bool ConnectionAutomaton::connect(
394✔
66
    std::shared_ptr<FrameTransport> frameTransport,
67
    bool sendingPendingFrames,
68
    ProtocolVersion protocolVersion) {
69
  debugCheckCorrectExecutor();
394✔
70
  CHECK(isDisconnectedOrClosed());
394✔
71
  CHECK(frameTransport);
394✔
72
  CHECK(!frameTransport->isClosed());
394✔
73
  if (protocolVersion != ProtocolVersion::Unknown) {
394✔
74
    if (frameSerializer_) {
55✔
75
      if (frameSerializer_->protocolVersion() != protocolVersion) {
15✔
76
        DCHECK(false);
×
77
        frameTransport->close(std::runtime_error("protocol version mismatch"));
×
78
        return false;
×
79
      }
80
    } else {
81
      frameSerializer_ =
32 all except GCC_VERSION=6 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
82
          FrameSerializer::createFrameSerializer(protocolVersion);
56✔
83
      if (!frameSerializer_) {
40✔
84
        DCHECK(false);
×
85
        // NOTE(review): "invaid" below is a typo for "invalid"; it is a runtime
        // string, so it is left unchanged here.
        frameTransport->close(std::runtime_error("invaid protocol version"));
×
86
        return false;
×
87
      }
88
    }
89
  }
90

91
  frameTransport_ = std::move(frameTransport);
394✔
92

93
  for (auto& callback : onConnectListeners_) {
414✔
94
    callback();
20✔
95
  }
96

97
  requestHandler_->socketOnConnected();
394✔
98

99
  // We need to create a hard reference to frameTransport_ to make sure the
100
  // instance survives until the setFrameProcessor returns.  There can be
101
  // terminating signals processed in that call which will nullify
102
  // frameTransport_.
103
  auto frameTransportCopy = frameTransport_;
710✔
104

105
  // Keep a reference to this, as processing frames might close the
106
  // ReactiveSocket instance.
107
  auto copyThis = shared_from_this();
788✔
108
  frameTransport_->setFrameProcessor(copyThis);
394✔
109

110
  if (sendingPendingFrames) {
394✔
111
    DCHECK(!resumeCallback_);
384✔
112
    // We are free to try to send frames again.
113
    // Not all frames might be sent if the connection breaks; the rest of them
114
    // will queue up again.
115
    auto outputFrames = streamState_->moveOutputPendingFrames();
692✔
116
    for (auto& frame : outputFrames) {
384✔
117
      outputFrameOrEnqueue(std::move(frame));
×
118
    }
119

120
    // TODO: turn on only after setup frame was received
121
    if (keepaliveTimer_) {
384✔
122
      keepaliveTimer_->start(shared_from_this());
15✔
123
    }
76 only GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
124
  }
125

126
  return true;
472✔
127
}
128

129
// Detaches and returns the current frame transport.
//
// Returns nullptr when no transport is attached. Otherwise unregisters this
// object as the transport's frame processor and hands the transport to the
// caller.
std::shared_ptr<FrameTransport> ConnectionAutomaton::detachFrameTransport() {
×
130
  debugCheckCorrectExecutor();
×
131
  if (isDisconnectedOrClosed()) {
×
132
    return nullptr;
×
133
  }
134

135
  frameTransport_->setFrameProcessor(nullptr);
×
136
  // frameTransport_ is a member, so the explicit move is needed to transfer
  // ownership; it leaves the member empty.
  return std::move(frameTransport_);
×
137
}
138

139
// Disconnects the transport while keeping the automaton and its streams alive.
//
// Does nothing when no transport is attached. Otherwise notifies the
// disconnect listeners and the request handler with `ex`, closes the frame
// transport with the CONNECTION_END signal, pauses all streams and records the
// disconnect in stats.
void ConnectionAutomaton::disconnect(folly::exception_wrapper ex) {
40✔
140
  debugCheckCorrectExecutor();
40✔
141
  VLOG(6) << "disconnect";
40✔
142
  if (isDisconnectedOrClosed()) {
40✔
143
    return;
18✔
144
  }
145

146
  for (auto& callback : onDisconnectListeners_) {
45✔
147
    callback(ex);
15✔
148
  }
149

150
  requestHandler_->socketOnDisconnected(ex);
30✔
151

152
  closeFrameTransport(std::move(ex), StreamCompletionSignal::CONNECTION_END);
30✔
153
  pauseStreams();
30✔
154
  stats_->socketDisconnected();
30✔
155
}
156

157
// Permanently closes the automaton.
//
// Only the first call has an effect; later calls return immediately because
// isClosed_ is already set. The first call clears the socket pointer, records
// the close in stats, fails a pending resume callback, drops the connect and
// disconnect listeners, invokes the close listeners and the request handler
// with `ex`, ends all streams with `signal`, and finally closes the frame
// transport.
void ConnectionAutomaton::close(
623✔
158
    folly::exception_wrapper ex,
159
    StreamCompletionSignal signal) {
160
  debugCheckCorrectExecutor();
623✔
161

162
  if (isClosed_) {
623✔
163
    return;
362✔
164
  }
165
  isClosed_ = true;
384✔
166
  reactiveSocket_ = nullptr;
384✔
167
  stats_->socketClosed(signal);
384✔
168

169
  VLOG(6) << "close";
384✔
170

171
  if (resumeCallback_) {
384✔
172
    resumeCallback_->onResumeError(
×
173
        std::runtime_error(ex ? ex.what().c_str() : "RS closing"));
×
174
    resumeCallback_.reset();
×
175
  }
176

177
  onConnectListeners_.clear();
384✔
178
  onDisconnectListeners_.clear();
384✔
179
  // The close listeners are moved into a local before being invoked, so the
  // member container is no longer the one being iterated while callbacks run.
  auto onCloseListeners = std::move(onCloseListeners_);
692✔
180
  for (auto& callback : onCloseListeners) {
443✔
181
    callback(ex);
59✔
182
  }
183

184
  requestHandler_->socketOnClosed(ex);
384✔
185

186
  closeStreams(signal);
384✔
187
  closeFrameTransport(std::move(ex), signal);
384✔
188
}
189

190
// Closes and releases the attached frame transport, if any.
//
// When no transport is attached this only asserts (debug) that no resume
// callback is pending. Otherwise it stops the keepalive timer, reports a
// connection error to a pending resume callback, closes the transport and
// resets frameTransport_ to null.
void ConnectionAutomaton::closeFrameTransport(
414✔
191
    folly::exception_wrapper ex,
192
    StreamCompletionSignal signal) {
193
  if (isDisconnectedOrClosed()) {
414✔
194
    DCHECK(!resumeCallback_);
20✔
195
    return;
102✔
196
  }
197

198
  // Stop scheduling keepalives since the socket is now disconnected
199
  if (keepaliveTimer_) {
394✔
200
    keepaliveTimer_->stop();
20✔
201
  }
202

203
  if (resumeCallback_) {
394✔
204
    resumeCallback_->onConnectionError(
×
205
        std::runtime_error(ex ? ex.what().c_str() : "connection closing"));
×
206
    resumeCallback_.reset();
×
207
  }
208

209
  // Echo the exception to the frameTransport only if the frameTransport
210
  // started closing with an error (signal == CONNECTION_ERROR).
211
  // Otherwise we sent some error frame over the wire and we are closing the
212
  // transport cleanly, so an empty exception_wrapper is passed.
213
  frameTransport_->close(
870✔
214
      signal == StreamCompletionSignal::CONNECTION_ERROR
215
          ? std::move(ex)
319✔
216
          : folly::exception_wrapper());
711✔
217
  frameTransport_ = nullptr;
394✔
218
}
219

220
// Reacts to a connection-level error according to resumability.
//
// When isResumable_ is set, the automaton is only disconnected, using the
// error frame's payload data as the exception message; the error frame itself
// is not sent on this path. Otherwise closeWithError() is called with the
// frame.
void ConnectionAutomaton::disconnectOrCloseWithError(Frame_ERROR&& errorFrame) {
5✔
221
  debugCheckCorrectExecutor();
5✔
222
  if (isResumable_) {
5✔
223
    disconnect(std::runtime_error(errorFrame.payload_.data->cloneAsValue()
×
224
                                      .moveToFbString()
×
225
                                      .toStdString()));
×
226
  } else {
227
    closeWithError(std::move(errorFrame));
5✔
228
  }
229
}
5✔
230

231
// Sends `error` to the peer (when a frame serializer exists) and closes the
// automaton.
//
// The error code is mapped to a StreamCompletionSignal: the three *_SETUP
// codes map to their matching signals, everything else maps to ERROR. The
// exception passed to close() carries the error payload's data as its message.
// NOTE(review): error.payload_.data is dereferenced unconditionally —
// presumably every Frame_ERROR built by callers carries data; confirm.
void ConnectionAutomaton::closeWithError(Frame_ERROR&& error) {
30✔
232
  debugCheckCorrectExecutor();
30✔
233
  VLOG(3) << "closeWithError "
30✔
234
          << error.payload_.data->cloneAsValue().moveToFbString();
60✔
235

236
  StreamCompletionSignal signal;
237
  switch (error.errorCode_) {
30✔
238
    case ErrorCode::INVALID_SETUP:
239
      signal = StreamCompletionSignal::INVALID_SETUP;
×
240
      break;
×
241
    case ErrorCode::UNSUPPORTED_SETUP:
242
      signal = StreamCompletionSignal::UNSUPPORTED_SETUP;
×
243
      break;
×
244
    case ErrorCode::REJECTED_SETUP:
245
      signal = StreamCompletionSignal::REJECTED_SETUP;
×
246
      break;
×
247

248
    case ErrorCode::CONNECTION_ERROR:
249
    // StreamCompletionSignal::CONNECTION_ERROR is reserved for
250
    // frameTransport errors.
251
    // ErrorCode::CONNECTION_ERROR is a normal Frame_ERROR error code which has
252
    // nothing to do with frameTransport.
253
    case ErrorCode::APPLICATION_ERROR:
254
    case ErrorCode::REJECTED:
255
    case ErrorCode::RESERVED:
256
    case ErrorCode::CANCELED:
257
    case ErrorCode::INVALID:
258
    default:
259
      signal = StreamCompletionSignal::ERROR;
30✔
260
  }
261

262
  // Build the exception before `error` is moved into the serializer below.
  auto exception = std::runtime_error(
263
      error.payload_.data->cloneAsValue().moveToFbString().toStdString());
54✔
264

265
  if (frameSerializer_) {
30✔
266
    outputFrameOrEnqueue(frameSerializer_->serializeOut(std::move(error)));
30✔
267
  }
268
  close(std::move(exception), signal);
30✔
269
}
30✔
270

271
// Attaches a new transport as part of a client-side resumption.
//
// Requires (CHECKed): non-null transport and callback, no resumption already
// in progress, isResumable_ set, and CLIENT mode. Stores the callback in
// resumeCallback_ and connects without flushing pending frames.
void ConnectionAutomaton::reconnect(
5✔
272
    std::shared_ptr<FrameTransport> newFrameTransport,
273
    std::unique_ptr<ClientResumeStatusCallback> resumeCallback) {
274
  debugCheckCorrectExecutor();
5✔
275
  CHECK(newFrameTransport);
5✔
276
  CHECK(resumeCallback);
5✔
277
  CHECK(!resumeCallback_);
5✔
278
  CHECK(isResumable_);
5✔
279
  CHECK(mode_ == ReactiveSocketMode::CLIENT);
5✔
280

281
  // TODO: output frame buffer should not be written to the new connection until
282
  // we receive resume ok
283
  resumeCallback_ = std::move(resumeCallback);
5✔
284
  // ProtocolVersion::Unknown makes connect() skip its protocol-version check;
  // the bool result of connect() is not inspected here.
  connect(std::move(newFrameTransport), false, ProtocolVersion::Unknown);
5✔
285
}
5✔
286

287
// Registers `automaton` under `streamId` in the stream map.
//
// A duplicate streamId is only detected by the assert (i.e. in builds where
// assert is active); emplace leaves an existing entry untouched.
void ConnectionAutomaton::addStream(
349✔
288
    StreamId streamId,
289
    std::shared_ptr<StreamAutomatonBase> automaton) {
290
  debugCheckCorrectExecutor();
349✔
291
  auto result = streamState_->streams_.emplace(streamId, std::move(automaton));
349✔
292
  // Silences the unused-variable warning when assert is compiled out.
  (void)result;
293
  assert(result.second);
349✔
294
}
349✔
295

296
// Ends the stream `streamId` with `signal`.
//
// Returns silently when the stream is not registered. In debug builds the
// signal of a stream that was actually ended is checked to be one of CANCEL,
// COMPLETE, APPLICATION_ERROR or ERROR.
void ConnectionAutomaton::endStream(
259✔
297
    StreamId streamId,
298
    StreamCompletionSignal signal) {
299
  debugCheckCorrectExecutor();
259✔
300
  VLOG(6) << "endStream";
259✔
301
  // The signal must be idempotent.
302
  if (!endStreamInternal(streamId, signal)) {
259✔
303
    return;
47 only GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
304
  }
305
  DCHECK(
271✔
306
      signal == StreamCompletionSignal::CANCEL ||
307
      signal == StreamCompletionSignal::COMPLETE ||
308
      signal == StreamCompletionSignal::APPLICATION_ERROR ||
309
      signal == StreamCompletionSignal::ERROR);
365✔
310
}
311

312
// Removes the stream `streamId` from the stream map and notifies its automaton.
//
// Returns false when no such stream is registered, true after the automaton's
// endStream(signal) has been called.
bool ConnectionAutomaton::endStreamInternal(
349✔
313
    StreamId streamId,
314
    StreamCompletionSignal signal) {
315
  VLOG(6) << "endStreamInternal";
349✔
316
  auto it = streamState_->streams_.find(streamId);
349✔
317
  if (it == streamState_->streams_.end()) {
349✔
318
    // Unsubscribe handshake initiated by the connection, we're done.
319
    return false;
×
320
  }
321
  // Remove from the map before notifying the automaton. The local shared_ptr
  // keeps the automaton alive for the endStream() call.
322
  auto automaton = std::move(it->second);
633✔
323
  streamState_->streams_.erase(it);
349✔
324
  automaton->endStream(signal);
349✔
325
  return true;
349✔
326
}
327

328
// Ends every registered stream with `signal`, one at a time, until the stream
// map is empty. The asserts check that each iteration removes exactly one
// stream.
void ConnectionAutomaton::closeStreams(StreamCompletionSignal signal) {
456✔
329
  // Close all streams.
330
  while (!streamState_->streams_.empty()) {
622✔
331
    auto oldSize = streamState_->streams_.size();
90✔
332
    auto result =
333
        endStreamInternal(streamState_->streams_.begin()->first, signal);
90✔
334
    // The casts silence unused-variable warnings when assert is compiled out.
    (void)oldSize;
335
    (void)result;
336
    // TODO(stupaq): what kind of a user action could violate these
337
    // assertions?
338
    assert(result);
90✔
339
    assert(streamState_->streams_.size() == oldSize - 1);
90✔
340
  }
341
}
384✔
342

343
// Calls pauseStream() on every registered stream automaton, passing the
// request handler.
void ConnectionAutomaton::pauseStreams() {
30✔
344
  for (auto& streamKV : streamState_->streams_) {
60✔
345
    streamKV.second->pauseStream(*requestHandler_);
30✔
346
  }
347
}
30✔
348

349
// Calls resumeStream() on every registered stream automaton, passing the
// request handler.
void ConnectionAutomaton::resumeStreams() {
10✔
350
  for (auto& streamKV : streamState_->streams_) {
20✔
351
    streamKV.second->resumeStream(*requestHandler_);
10✔
352
  }
353
}
10✔
354

355
// Hands an incoming serialized frame to processFrameImpl() via runInExecutor().
//
// The lambda captures a shared_ptr to this object, so the automaton stays
// alive until the scheduled work has run, and takes ownership of `frame`.
void ConnectionAutomaton::processFrame(std::unique_ptr<folly::IOBuf> frame) {
855✔
356
  auto thisPtr = this->shared_from_this();
1,547✔
357
  runInExecutor([ thisPtr, frame = std::move(frame) ]() mutable {
9,837✔
358
    thisPtr->processFrameImpl(std::move(frame));
855✔
359
  });
1,873✔
360
}
855✔
361

362
// Processes one incoming serialized frame.
//
// Frames are dropped once the automaton is closed. Otherwise the frame type is
// recorded in stats and the resume cache, and the frame is routed by stream
// id: id 0 goes to handleConnectionFrame(), any other id to
// handleStreamFrame(). The automaton is closed with an invalid-frame error
// when the serializer cannot be determined, the stream id cannot be read, or a
// stream frame arrives while a resumption is pending.
void ConnectionAutomaton::processFrameImpl(
855✔
363
    std::unique_ptr<folly::IOBuf> frame) {
364
  if (isClosed()) {
855✔
365
    return;
204✔
366
  }
367

368
  if (!ensureOrAutodetectFrameSerializer(*frame)) {
855✔
369
    DLOG(FATAL) << "frame serializer is not set";
×
370
    // Failed to autodetect protocol version
371
    closeWithError(Frame_ERROR::invalidFrame());
372
    return;
373
  }
374

375
  auto frameType = frameSerializer_->peekFrameType(*frame);
855✔
376
  stats_->frameRead(frameType);
855✔
377

378
  // TODO(tmont): If a frame is invalid, it will still be tracked. However, we
379
  // actually want that. We want to keep
380
  // each side in sync, even if a frame is invalid.
381
  resumeCache_->trackReceivedFrame(*frame, frameType);
855✔
382

383
  auto streamIdPtr = frameSerializer_->peekStreamId(*frame);
855✔
384
  if (!streamIdPtr) {
855✔
385
    // Failed to deserialize the frame.
386
    closeWithError(Frame_ERROR::invalidFrame());
10✔
387
    return;
10✔
388
  }
389
  auto streamId = *streamIdPtr;
845✔
390
  if (streamId == 0) {
845✔
391
    handleConnectionFrame(frameType, std::move(frame));
194✔
392
    return;
194✔
393
  }
394

395
  // While we are resuming we must not receive anything other than
396
  // connection-level frames, which drive the resumption.
397
  // TODO(lehecka): this assertion should be handled more elegantly using
398
  // different state machine
399
  if (resumeCallback_) {
651✔
400
    LOG(ERROR) << "received stream frames during resumption";
×
401
    closeWithError(Frame_ERROR::invalidFrame());
×
402
    return;
×
403
  }
404

405
  handleStreamFrame(streamId, frameType, std::move(frame));
651✔
406
}
407

408
// Hands a terminal signal from the transport to onTerminalImpl() via
// runInExecutor().
//
// The lambda captures a shared_ptr to this object; `ex` is wrapped in a
// folly::MoveWrapper so it can be carried in the lambda and moved out when the
// lambda runs.
void ConnectionAutomaton::onTerminal(folly::exception_wrapper ex) {
199✔
409
  auto thisPtr = this->shared_from_this();
359✔
410
  auto movedEx = folly::makeMoveWrapper(ex);
398✔
411
  runInExecutor([thisPtr, movedEx]() mutable {
1,116✔
412
    thisPtr->onTerminalImpl(movedEx.move());
199✔
413
  });
437✔
414
}
199✔
415

416
// Handles a terminal signal from the transport.
//
// A resumable automaton is only disconnected. Otherwise it is closed with
// CONNECTION_ERROR when `ex` holds an exception and CONNECTION_END when it is
// empty.
void ConnectionAutomaton::onTerminalImpl(folly::exception_wrapper ex) {
199✔
417
  if (isResumable_) {
199✔
418
    disconnect(std::move(ex));
10✔
419
  } else {
420
    auto termSignal = ex ? StreamCompletionSignal::CONNECTION_ERROR
76 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On and GCC_VERSION=5 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
421
                         : StreamCompletionSignal::CONNECTION_END;
265✔
422
    close(std::move(ex), termSignal);
189✔
423
  }
424
}
199✔
425

426
// Handles a frame addressed to the connection itself (stream id 0).
//
// frameType - type of the frame, as peeked by the caller.
// payload   - the serialized frame; it is deserialized here per frame type.
//
// KEEPALIVE, SETUP, METADATA_PUSH, RESUME, RESUME_OK and ERROR are handled;
// every other frame type closes the connection with an unexpected-frame
// error. Every case returns early when deserializeFrameOrError() reports
// failure.
void ConnectionAutomaton::handleConnectionFrame(
194✔
427
    FrameType frameType,
428
    std::unique_ptr<folly::IOBuf> payload) {
429
  switch (frameType) {
194✔
430
    case FrameType::KEEPALIVE: {
431
      Frame_KEEPALIVE frame;
×
432
      if (!deserializeFrameOrError(
×
433
              remoteResumeable_, frame, std::move(payload))) {
×
434
        return;
×
435
      }
436

437
      resumeCache_->resetUpToPosition(frame.position_);
×
438
      // A server answers keepalives that carry the RESPOND flag and rejects
      // ones without it; a client rejects keepalives that carry the flag.
      if (mode_ == ReactiveSocketMode::SERVER) {
×
439
        if (!!(frame.header_.flags_ & FrameFlags::KEEPALIVE_RESPOND)) {
×
440
          sendKeepalive(FrameFlags::EMPTY, std::move(frame.data_));
×
441
        } else {
442
          closeWithError(
×
443
              Frame_ERROR::connectionError("keepalive without flag"));
×
444
        }
445
      } else {
446
        if (!!(frame.header_.flags_ & FrameFlags::KEEPALIVE_RESPOND)) {
×
447
          closeWithError(Frame_ERROR::connectionError(
×
448
              "client received keepalive with respond flag"));
×
449
        } else if (keepaliveTimer_) {
×
450
          keepaliveTimer_->keepaliveReceived();
×
451
        }
452
      }
453
      return;
×
454
    }
455
    case FrameType::SETUP: {
456
      Frame_SETUP frame;
314✔
457
      if (!deserializeFrameOrError(frame, std::move(payload))) {
174✔
458
        return;
×
459
      }
460
      // remoteResumeable_ mirrors the RESUME_ENABLE flag of the setup frame.
      if (!!(frame.header_.flags_ & FrameFlags::RESUME_ENABLE)) {
174✔
461
        remoteResumeable_ = true;
×
462
      } else {
463
        remoteResumeable_ = false;
174✔
464
      }
465
      if (!!(frame.header_.flags_ & FrameFlags::LEASE)) {
174✔
466
        // TODO(yschimke) We don't have the correct lease and wait logic above
467
        // yet
468
        LOG(ERROR) << "ignoring setup frame with lease";
×
469
      }
470

471
      ConnectionSetupPayload setupPayload;
348✔
472
      frame.moveToSetupPayload(setupPayload);
174✔
473

474
      // this should be already set to the correct version
475
      if (frameSerializer_->protocolVersion() != setupPayload.protocolVersion) {
174✔
476
        closeWithError(Frame_ERROR::badSetupFrame("invalid protocol version"));
×
477
        return;
×
478
      }
479

480
      requestHandler_->handleSetupPayload(
174✔
481
          *reactiveSocket_, std::move(setupPayload));
174✔
482
      return;
208✔
483
    }
484
    case FrameType::METADATA_PUSH: {
485
      Frame_METADATA_PUSH frame;
18✔
486
      if (deserializeFrameOrError(frame, std::move(payload))) {
10✔
487
        requestHandler_->handleMetadataPush(std::move(frame.metadata_));
5✔
488
      }
489
      return;
10✔
490
    }
491
    case FrameType::RESUME: {
492
      // RESUME is only accepted by a resumable automaton in SERVER mode.
      if (mode_ == ReactiveSocketMode::SERVER && isResumable_) {
×
493
        Frame_RESUME frame;
×
494
        if (!deserializeFrameOrError(frame, std::move(payload))) {
×
495
          return;
×
496
        }
497
        auto resumed = requestHandler_->handleResume(
×
498
            *reactiveSocket_,
×
499
            ResumeParameters(
×
500
                frame.token_,
501
                frame.lastReceivedServerPosition_,
502
                frame.clientPosition_,
503
                ProtocolVersion(frame.versionMajor_, frame.versionMinor_)));
×
504
        if (!resumed) {
×
505
          closeWithError(Frame_ERROR::connectionError("can not resume"));
×
506
        }
×
507
      } else {
508
        closeWithError(
×
509
            Frame_ERROR::connectionError("RS not resumable. Can not resume"));
×
510
      }
511
      return;
×
512
    }
513
    case FrameType::RESUME_OK: {
514
      Frame_RESUME_OK frame;
5✔
515
      if (!deserializeFrameOrError(frame, std::move(payload))) {
5✔
516
        return;
×
517
      }
518
      // RESUME_OK is only valid while a resumption is pending.
      if (resumeCallback_) {
5✔
519
        if (resumeCache_->isPositionAvailable(frame.position_)) {
5✔
520
          resumeCallback_->onResumeOk();
5✔
521
          resumeCallback_.reset();
5✔
522
          resumeFromPosition(frame.position_);
5✔
523
        } else {
524
          closeWithError(Frame_ERROR::connectionError(folly::to<std::string>(
×
525
              "Client cannot resume, server position ",
526
              frame.position_,
527
              " is not available.")));
×
528
        }
529
      } else {
530
        closeWithError(Frame_ERROR::invalidFrame());
×
531
      }
532
      return;
5✔
533
    }
534
    case FrameType::ERROR: {
535
      Frame_ERROR frame;
9✔
536
      if (!deserializeFrameOrError(frame, std::move(payload))) {
5✔
537
        return;
×
538
      }
539

540
      // TODO: handle INVALID_SETUP, UNSUPPORTED_SETUP, REJECTED_SETUP
541

542
      if (frame.errorCode_ == ErrorCode::CONNECTION_ERROR && resumeCallback_) {
5✔
543
        resumeCallback_->onResumeError(
×
544
            std::runtime_error(frame.payload_.moveDataToString()));
×
545
        resumeCallback_.reset();
×
546
        // fall through to the close() call below
547
      }
548

549
      // NOTE(review): when the branch above ran, payload_.moveDataToString()
      // has already been called once on this frame; if that call moves the
      // data out (as its name suggests), the message passed to close() here
      // will be empty — confirm.
      close(
8 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
550
          std::runtime_error(frame.payload_.moveDataToString()),
8 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
551
          StreamCompletionSignal::ERROR);
5✔
552
      return;
5✔
553
    }
554
    case FrameType::RESERVED:
555
    case FrameType::LEASE:
556
    case FrameType::REQUEST_RESPONSE:
557
    case FrameType::REQUEST_FNF:
558
    case FrameType::REQUEST_STREAM:
559
    case FrameType::REQUEST_CHANNEL:
560
    case FrameType::REQUEST_N:
561
    case FrameType::CANCEL:
562
    case FrameType::PAYLOAD:
563
    case FrameType::EXT:
564
    default:
565
      closeWithError(Frame_ERROR::unexpectedFrame());
×
566
      return;
×
567
  }
568
}
569

570
// Dispatches a frame with a non-zero stream id.
//
// Frames for ids not present in the stream map are forwarded to
// handleUnknownStream(). For a known stream, REQUEST_N, CANCEL, PAYLOAD and
// ERROR are passed to the stream's automaton; the other listed frame types
// close the connection with an unexpected-frame error; frame types not listed
// at all are ignored.
void ConnectionAutomaton::handleStreamFrame(
651✔
571
    StreamId streamId,
572
    FrameType frameType,
573
    std::unique_ptr<folly::IOBuf> serializedFrame) {
574
  auto it = streamState_->streams_.find(streamId);
651✔
575
  if (it == streamState_->streams_.end()) {
651✔
576
    handleUnknownStream(streamId, frameType, std::move(serializedFrame));
194✔
577
    return;
388✔
578
  }
579
  // Reference into the stream map entry; no copy of the shared_ptr is made.
  auto &automaton = it->second;
457✔
580

581
  switch (frameType) {
457✔
582
    case FrameType::REQUEST_N: {
583
      Frame_REQUEST_N frameRequestN;
54✔
584
      if (!deserializeFrameOrError(frameRequestN,
64✔
585
                                   std::move(serializedFrame))) {
64✔
586
        return;
×
587
      }
588
      automaton->handleRequestN(frameRequestN.requestN_);
54✔
589
      break;
54✔
590
    }
591
    case FrameType::CANCEL: {
592
      // The CANCEL frame is not deserialized; only its type is used.
      automaton->handleCancel();
33✔
593
      break;
33✔
594
    }
595
    case FrameType::PAYLOAD: {
596
      Frame_PAYLOAD framePayload;
340✔
597
      if (!deserializeFrameOrError(framePayload,
408✔
598
                                   std::move(serializedFrame))) {
408✔
599
        return;
×
600
      }
601
      automaton->handlePayload(std::move(framePayload.payload_),
952✔
602
                               framePayload.header_.flagsComplete(),
340✔
603
                               framePayload.header_.flagsNext());
748✔
604
      break;
340✔
605
    }
606
    case FrameType::ERROR: {
607
      Frame_ERROR frameError;
30✔
608
      if (!deserializeFrameOrError(frameError,
36✔
609
                                   std::move(serializedFrame))) {
36✔
610
        return;
×
611
      }
612
      automaton->handleError(
78✔
613
          std::runtime_error(frameError.payload_.moveDataToString()));
78✔
614
      break;
30✔
615
    }
616
    case FrameType::REQUEST_CHANNEL:
617
    case FrameType::REQUEST_RESPONSE:
618
    case FrameType::RESERVED:
619
    case FrameType::SETUP:
620
    case FrameType::LEASE:
621
    case FrameType::KEEPALIVE:
622
    case FrameType::REQUEST_FNF:
623
    case FrameType::REQUEST_STREAM:
624
    case FrameType::METADATA_PUSH:
625
    case FrameType::RESUME:
626
    case FrameType::RESUME_OK:
627
    case FrameType::EXT:
628
      closeWithError(Frame_ERROR::unexpectedFrame());
×
629
      break;
×
630
    default:
631
      // For compatibility with future frame types we just ignore
632
      // unknown frames.
633
      break;
×
634
  }
635
}
636

637
// Handles a frame whose stream id is not in the stream map.
//
// The four REQUEST_* frame types create the matching responder (none for
// fire-and-forget) and pass the request to the request handler. Any other
// frame type closes the connection with an unexpected-frame error. For
// protocol versions above {0, 0}, the frame is dropped without any response
// when streamsFactory_.registerNewPeerStreamId() rejects the id.
void ConnectionAutomaton::handleUnknownStream(
194✔
638
    StreamId streamId,
639
    FrameType frameType,
640
    std::unique_ptr<folly::IOBuf> serializedFrame) {
641
  DCHECK(streamId != 0);
194✔
642
  // TODO: comparing string versions is odd because from version
643
  // 10.0 the lexicographic comparison doesn't work
644
  // we should change the version to struct
  // NOTE(review): the comparison below already uses a ProtocolVersion value
  // rather than a string, so this TODO may be stale — confirm.
645
  if (frameSerializer_->protocolVersion() > ProtocolVersion{0, 0} &&
388✔
646
      !streamsFactory_.registerNewPeerStreamId(streamId)) {
194✔
647
    return;
10✔
648
  }
649

650
  switch (frameType) {
184✔
651
    case FrameType::REQUEST_CHANNEL: {
652
      Frame_REQUEST_CHANNEL frame;
25✔
653
      if (!deserializeFrameOrError(frame, std::move(serializedFrame))) {
25✔
654
        return;
×
655
      }
656
      auto automaton = streamsFactory_.createChannelResponder(
657
          frame.requestN_, streamId, executor());
50✔
658
      // The sink returned by the handler is subscribed to the responder.
      auto requestSink = requestHandler_->handleRequestChannel(
25✔
659
          std::move(frame.payload_), streamId, automaton);
50✔
660
      automaton->subscribe(requestSink);
25✔
661
      break;
25✔
662
    }
663
    case FrameType::REQUEST_STREAM: {
664
      Frame_REQUEST_STREAM frame;
87✔
665
      if (!deserializeFrameOrError(frame, std::move(serializedFrame))) {
87✔
666
        return;
×
667
      }
668
      auto automaton = streamsFactory_.createStreamResponder(
669
          frame.requestN_, streamId, executor());
174✔
670
      requestHandler_->handleRequestStream(
231✔
671
          std::move(frame.payload_), streamId, automaton);
159✔
672
      break;
87✔
673
    }
674
    case FrameType::REQUEST_RESPONSE: {
675
      Frame_REQUEST_RESPONSE frame;
57✔
676
      if (!deserializeFrameOrError(frame, std::move(serializedFrame))) {
57✔
677
        return;
×
678
      }
679
      auto automaton =
680
          streamsFactory_.createRequestResponseResponder(streamId, executor());
114✔
681
      requestHandler_->handleRequestResponse(
153✔
682
          std::move(frame.payload_), streamId, automaton);
105✔
683
      break;
57✔
684
    }
685
    case FrameType::REQUEST_FNF: {
686
      Frame_REQUEST_FNF frame;
10✔
687
      if (!deserializeFrameOrError(frame, std::move(serializedFrame))) {
10✔
688
        return;
×
689
      }
690
      // no stream tracking is necessary
691
      requestHandler_->handleFireAndForgetRequest(
26✔
692
          std::move(frame.payload_), streamId);
18✔
693
      break;
10✔
694
    }
695

696
    case FrameType::RESUME:
697
    case FrameType::SETUP:
698
    case FrameType::METADATA_PUSH:
699
    case FrameType::LEASE:
700
    case FrameType::KEEPALIVE:
701
    case FrameType::RESERVED:
702
    case FrameType::REQUEST_N:
703
    case FrameType::CANCEL:
704
    case FrameType::PAYLOAD:
705
    case FrameType::ERROR:
706
    case FrameType::RESUME_OK:
707
    case FrameType::EXT:
708
    default:
709
      DLOG(ERROR) << "unknown stream frame (streamId=" << streamId
10✔
710
                  << " frameType=" << frameType << ")";
5✔
711
      closeWithError(Frame_ERROR::unexpectedFrame());
5✔
712
  }
713
}
714
/// @}
715

716
// Sends a KEEPALIVE frame carrying `data` with the KEEPALIVE_RESPOND flag set,
// by delegating to the two-argument overload.
void ConnectionAutomaton::sendKeepalive(std::unique_ptr<folly::IOBuf> data) {
5✔
717
  sendKeepalive(FrameFlags::KEEPALIVE_RESPOND, std::move(data));
5✔
718
}
5✔
719

720
// Builds a KEEPALIVE frame with the given `flags`, the resume cache's implied
// position and `data`, then sends it or queues it via outputFrameOrEnqueue().
// remoteResumeable_ is forwarded to the serializer.
void ConnectionAutomaton::sendKeepalive(
5✔
721
    FrameFlags flags,
722
    std::unique_ptr<folly::IOBuf> data) {
723
  debugCheckCorrectExecutor();
5✔
724
  Frame_KEEPALIVE pingFrame(
725
      flags, resumeCache_->impliedPosition(), std::move(data));
9✔
726
  outputFrameOrEnqueue(
4 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
727
      frameSerializer_->serializeOut(std::move(pingFrame), remoteResumeable_));
9✔
728
}
5✔
729

730
// Starts a client-side resumption on `frameTransport`.
//
// Writes a RESUME frame for `token` directly to the new transport, disconnects
// the current transport (if any), marks the automaton resumable and then
// reconnects on the new transport with `resumeCallback` pending.
// NOTE(review): frameSerializer_ is used without a null check — presumably a
// prior connection has already established it; confirm.
void ConnectionAutomaton::tryClientResume(
5✔
731
    const ResumeIdentificationToken& token,
732
    std::shared_ptr<FrameTransport> frameTransport,
733
    std::unique_ptr<ClientResumeStatusCallback> resumeCallback) {
734
  frameTransport->outputFrameOrEnqueue(frameSerializer_->serializeOut(
18✔
735
      createResumeFrame(token)));
18✔
736

737
  // If the client was still connected we will disconnect the old connection
738
  // with a clear error message.
739
  disconnect(
8 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
740
      std::runtime_error("resuming client on a different connection"));
9✔
741
  setResumable(true);
5✔
742
  reconnect(std::move(frameTransport), std::move(resumeCallback));
5✔
743
}
5✔
744

745
// Builds a RESUME frame for `token` from the resume cache's implied position
// and last reset position and the current serializer's protocol version.
Frame_RESUME ConnectionAutomaton::createResumeFrame(
5✔
746
    const ResumeIdentificationToken& token) const {
747
  return Frame_RESUME(
748
      token,
749
      resumeCache_->impliedPosition(),
4 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
750
      resumeCache_->lastResetPosition(),
4 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
751
      frameSerializer_->protocolVersion());
13✔
752
}
753

754
// Returns the resume cache's answer to isPositionAvailable(position).
// Must be called on the automaton's executor.
bool ConnectionAutomaton::isPositionAvailable(ResumePosition position) {
×
755
  debugCheckCorrectExecutor();
×
756
  return resumeCache_->isPositionAvailable(position);
×
757
}
758

759
// Server-side resumption: resumes from `serverPosition` or closes with an
// error.
//
// Resumption succeeds when `clientPosition` is either unspecified or resumable
// according to the resume cache, and `serverPosition` is available in the
// cache. On success a RESUME_OK frame is written to the transport, streams are
// resumed, and true is returned. Otherwise the automaton is closed with a
// connection error and false is returned.
bool ConnectionAutomaton::resumeFromPositionOrClose(
5✔
760
    ResumePosition serverPosition,
761
    ResumePosition clientPosition) {
762
  debugCheckCorrectExecutor();
5✔
763
  DCHECK(!resumeCallback_);
5✔
764
  DCHECK(!isDisconnectedOrClosed());
5✔
765
  DCHECK(mode_ == ReactiveSocketMode::SERVER);
5✔
766

767
  bool clientPositionExist = (clientPosition == kUnspecifiedResumePosition) ||
5✔
768
      resumeCache_->canResumeFrom(clientPosition);
5✔
769

770
  if (clientPositionExist &&
10✔
771
      resumeCache_->isPositionAvailable(serverPosition)) {
5✔
772
    // RESUME_OK is written straight to the transport rather than through
    // outputFrameOrEnqueue().
    frameTransport_->outputFrameOrEnqueue(frameSerializer_->serializeOut(
18✔
773
        Frame_RESUME_OK(resumeCache_->impliedPosition())));
16✔
774
    resumeFromPosition(serverPosition);
5✔
775
    return true;
5✔
776
  } else {
777
    closeWithError(Frame_ERROR::connectionError(folly::to<std::string>(
×
778
        "Cannot resume server, client lastServerPosition=",
779
        serverPosition,
780
        " firstClientPosition=",
781
        clientPosition,
782
        " is not available. Last reset position is ",
783
        resumeCache_->lastResetPosition())));
×
784
    return false;
×
785
  }
786
}
787

788
// Resumes operation from `position`.
//
// Resumes all streams, asks the resume cache to send its frames from
// `position` onto the transport, flushes the frames queued in streamState_,
// and starts the keepalive timer if one exists and a transport is still
// attached.
void ConnectionAutomaton::resumeFromPosition(ResumePosition position) {
10✔
789
  DCHECK(!resumeCallback_);
10✔
790
  DCHECK(!isDisconnectedOrClosed());
10✔
791
  DCHECK(resumeCache_->isPositionAvailable(position));
10✔
792

793
  resumeStreams();
10✔
794
  resumeCache_->sendFramesFromPosition(position, *frameTransport_);
10✔
795

796
  for (auto& frame : streamState_->moveOutputPendingFrames()) {
15✔
797
    outputFrameOrEnqueue(std::move(frame));
5✔
798
  }
2 only GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
799

800
  // The transport is re-checked here because it may have been detached while
  // the frames above were being sent.
  if (!isDisconnectedOrClosed() && keepaliveTimer_) {
10✔
801
    keepaliveTimer_->start(shared_from_this());
5✔
802
  }
803
}
10✔
804

805
void ConnectionAutomaton::outputFrameOrEnqueue(
673✔
806
    std::unique_ptr<folly::IOBuf> frame) {
807
  debugCheckCorrectExecutor();
673✔
808
  // if we are resuming we cant send any frames until we receive RESUME_OK
809
  if (!isDisconnectedOrClosed() && !resumeCallback_) {
673✔
810
    outputFrame(std::move(frame));
668✔
811
  } else {
812
    streamState_->enqueueOutputPendingFrame(std::move(frame));
5✔
813
  }
814
}
673✔
815

816
// Sends `request` as a REQUEST_FNF frame with empty flags on the next stream
// id obtained from streamsFactory(). The serialized frame is written
// immediately or queued, as decided by outputFrameOrEnqueue().
void ConnectionAutomaton::requestFireAndForget(Payload request) {
  Frame_REQUEST_FNF frame(
      streamsFactory().getNextStreamId(),
      FrameFlags::EMPTY,
      std::move(request)); // a single move suffices; nesting it was redundant
  outputFrameOrEnqueue(frameSerializer_->serializeOut(std::move(frame)));
}
5✔
823

824
// Wraps `metadata` in a METADATA_PUSH frame, serializes it and writes or
// queues the result via outputFrameOrEnqueue().
void ConnectionAutomaton::metadataPush(std::unique_ptr<folly::IOBuf> metadata) {
  Frame_METADATA_PUSH pushFrame(std::move(metadata));
  auto serialized = frameSerializer_->serializeOut(std::move(pushFrame));
  outputFrameOrEnqueue(std::move(serialized));
}
5✔
828

829
// Hands an already serialized frame to the transport.
// The frame type is reported to stats_ first; when the connection is
// resumable the frame is also registered with the resume cache before it is
// passed on.
void ConnectionAutomaton::outputFrame(std::unique_ptr<folly::IOBuf> frame) {
  DCHECK(!isDisconnectedOrClosed());

  const auto writtenType = frameSerializer_->peekFrameType(*frame);
  stats_->frameWritten(writtenType);

  if (isResumable_) {
    // Both peeks read from the buffer, so they must happen before the frame
    // is moved into the transport below.
    auto peekedStreamId = frameSerializer_->peekStreamId(*frame);
    resumeCache_->trackSentFrame(*frame, writtenType, peekedStreamId);
  }

  frameTransport_->outputFrameOrEnqueue(std::move(frame));
}
668✔
841

842
uint32_t ConnectionAutomaton::getKeepaliveTime() const {
165✔
843
  debugCheckCorrectExecutor();
165✔
844
  return keepaliveTimer_
845
      ? static_cast<uint32_t>(keepaliveTimer_->keepaliveTime().count())
258✔
846
      : Frame_SETUP::kMaxKeepaliveTime;
558✔
847
}
848

849
bool ConnectionAutomaton::isDisconnectedOrClosed() const {
2,943✔
850
  return !frameTransport_;
2,943✔
851
}
852

853
bool ConnectionAutomaton::isClosed() const {
1,319✔
854
  return isClosed_;
1,319✔
855
}
856

857
// Returns the duplex connection of the current frame transport, or nullptr
// when no transport is attached.
DuplexConnection* ConnectionAutomaton::duplexConnection() const {
  debugCheckCorrectExecutor();
  if (!frameTransport_) {
    return nullptr;
  }
  return frameTransport_->duplexConnection();
}
861

862
void ConnectionAutomaton::debugCheckCorrectExecutor() const {
2,912✔
863
  DCHECK(
3,228✔
864
      !dynamic_cast<folly::EventBase*>(&executor()) ||
865
      dynamic_cast<folly::EventBase*>(&executor())->isInEventBaseThread());
7,002✔
866
}
2,912✔
867

868
void ConnectionAutomaton::addConnectedListener(std::function<void()> listener) {
10✔
869
  CHECK(listener);
10✔
870
  onConnectListeners_.push_back(std::move(listener));
10✔
871
}
10✔
872

873
// Appends `listener` to onDisconnectListeners_. An empty callback is
// rejected with a fatal CHECK.
void ConnectionAutomaton::addDisconnectedListener(ErrorCallback listener) {
  CHECK(listener);
  onDisconnectListeners_.emplace_back(std::move(listener));
}
10✔
877

878
// Appends `listener` to onCloseListeners_. An empty callback is rejected
// with a fatal CHECK.
void ConnectionAutomaton::addClosedListener(ErrorCallback listener) {
  CHECK(listener);
  onCloseListeners_.emplace_back(std::move(listener));
}
59✔
882

883
void ConnectionAutomaton::setFrameSerializer(
175✔
884
    std::unique_ptr<FrameSerializer> frameSerializer) {
885
  CHECK(frameSerializer);
175✔
886
  // serializer is not interchangeable, it would screw up resumability
887
  // CHECK(!frameSerializer_);
888
  frameSerializer_ = std::move(frameSerializer);
175✔
889
}
175✔
890

891
void ConnectionAutomaton::setUpFrame(
165✔
892
    std::shared_ptr<FrameTransport> frameTransport,
893
    ConnectionSetupPayload setupPayload) {
894
  auto protocolVersion = getSerializerProtocolVersion();
165✔
895

896
  Frame_SETUP frame(
897
      setupPayload.resumable ? FrameFlags::RESUME_ENABLE : FrameFlags::EMPTY,
132 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
898
      protocolVersion.major,
132 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
899
      protocolVersion.minor,
132 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
900
      getKeepaliveTime(),
132 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
901
      Frame_SETUP::kMaxLifetime,
902
      setupPayload.token,
903
      std::move(setupPayload.metadataMimeType),
165✔
904
      std::move(setupPayload.dataMimeType),
165✔
905
      std::move(setupPayload.payload));
1,155✔
906

907
  // TODO: when the server returns back that it doesn't support resumability, we
908
  // should retry without resumability
909

910
  // making sure we send setup frame first
911
  frameTransport->outputFrameOrEnqueue(
297✔
912
      frameSerializer_->serializeOut(std::move(frame)));
297✔
913
  // then the rest of the cached frames will be sent
914
  connect(
264 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
915
      std::move(frameTransport), true, ProtocolVersion::Unknown);
297✔
916
}
165✔
917

918
// Returns the protocol version reported by the current frame serializer.
// frameSerializer_ is dereferenced without a null check.
ProtocolVersion ConnectionAutomaton::getSerializerProtocolVersion() {
  return frameSerializer_->protocolVersion();
}
921

922
void ConnectionAutomaton::writeNewStream(
180✔
923
    StreamId streamId,
924
    StreamType streamType,
925
    uint32_t initialRequestN,
926
    Payload payload,
927
    bool completed) {
928
  switch (streamType) {
180✔
929
    case StreamType::CHANNEL:
930
      outputFrameOrEnqueue(frameSerializer_->serializeOut(Frame_REQUEST_CHANNEL(
65✔
931
          streamId,
932
          completed ? FrameFlags::COMPLETE : FrameFlags::EMPTY,
933
          initialRequestN,
934
          std::move(payload))));
45✔
935
      break;
25✔
936

937
    case StreamType::STREAM:
938
      outputFrameOrEnqueue(frameSerializer_->serializeOut(Frame_REQUEST_STREAM(
234✔
939
          streamId, FrameFlags::EMPTY, initialRequestN, std::move(payload))));
162✔
940
      break;
90✔
941

942
    case StreamType::REQUEST_RESPONSE:
943
      outputFrameOrEnqueue(
52 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
944
          frameSerializer_->serializeOut(Frame_REQUEST_RESPONSE(
169✔
945
              streamId, FrameFlags::EMPTY, std::move(payload))));
117✔
946
      break;
65✔
947

948
    case StreamType::FNF:
949
      outputFrameOrEnqueue(frameSerializer_->serializeOut(
×
950
          Frame_REQUEST_FNF(streamId, FrameFlags::EMPTY, std::move(payload))));
×
951
      break;
×
952

953
    default:
954
      CHECK(false); // unknown type
×
955
  }
956
}
180✔
957

958
// Serializes a REQUEST_N frame for `streamId` carrying `n` and writes or
// queues it via outputFrameOrEnqueue().
void ConnectionAutomaton::writeRequestN(StreamId streamId, uint32_t n) {
  Frame_REQUEST_N requestNFrame(streamId, n);
  outputFrameOrEnqueue(
      frameSerializer_->serializeOut(std::move(requestNFrame)));
}
55✔
962

963
void ConnectionAutomaton::writePayload(
290✔
964
    StreamId streamId,
965
    Payload payload,
966
    bool complete) {
967
  Frame_PAYLOAD frame(
968
      streamId,
969
      FrameFlags::NEXT | (complete ? FrameFlags::COMPLETE : FrameFlags::EMPTY),
970
      std::move(payload));
534✔
971
  outputFrameOrEnqueue(frameSerializer_->serializeOut(std::move(frame)));
290✔
972
}
290✔
973

974
void ConnectionAutomaton::writeCloseStream(
98✔
975
    StreamId streamId,
976
    StreamCompletionSignal signal,
977
    Payload payload) {
978
  switch (signal) {
98✔
979
    case StreamCompletionSignal::COMPLETE:
980
      outputFrameOrEnqueue(
28 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
981
          frameSerializer_->serializeOut(Frame_PAYLOAD::complete(streamId)));
62✔
982
      break;
34✔
983

984
    case StreamCompletionSignal::CANCEL:
985
      outputFrameOrEnqueue(
28 all except GCC_VERSION=4.9 BUILD_TYPE=Debug CPP_VERSION=14 ASAN=On ✔
986
          frameSerializer_->serializeOut(Frame_CANCEL(streamId)));
63✔
987
      break;
35✔
988

989
    case StreamCompletionSignal::ERROR:
990
      outputFrameOrEnqueue(frameSerializer_->serializeOut(
13✔
991
          Frame_ERROR::error(streamId, std::move(payload))));
13✔
992
      break;
5✔
993

994
    case StreamCompletionSignal::APPLICATION_ERROR:
995
      outputFrameOrEnqueue(frameSerializer_->serializeOut(
64✔
996
          Frame_ERROR::applicationError(streamId, std::move(payload))));
64✔
997
      break;
24✔
998

999
    case StreamCompletionSignal::INVALID_SETUP:
1000
    case StreamCompletionSignal::UNSUPPORTED_SETUP:
1001
    case StreamCompletionSignal::REJECTED_SETUP:
1002

1003
    case StreamCompletionSignal::CONNECTION_ERROR:
1004
    case StreamCompletionSignal::CONNECTION_END:
1005
    case StreamCompletionSignal::SOCKET_CLOSED:
1006
    default:
1007
      CHECK(false); // unexpected value
×
1008
  }
1009
}
98✔
1010

1011
void ConnectionAutomaton::onStreamClosed(
259✔
1012
    StreamId streamId,
1013
    StreamCompletionSignal signal) {
1014
  endStream(streamId, signal);
259✔
1015
}
259✔
1016

1017
bool ConnectionAutomaton::ensureOrAutodetectFrameSerializer(
855✔
1018
    const folly::IOBuf& firstFrame) {
1019
  if (frameSerializer_) {
855✔
1020
    return true;
686✔
1021
  }
1022

1023
  if (mode_ != ReactiveSocketMode::SERVER) {
169✔
1024
    // this should never happen as clients are initized with FrameSerializer
1025
    // instance
1026
    DCHECK(false);
×
1027
    return false;
×
1028
  }
1029

1030
  auto serializer = FrameSerializer::createAutodetectedSerializer(firstFrame);
305✔
1031
  if (!serializer) {
169✔
1032
    LOG(ERROR) << "unable to detect protocol version";
×
1033
    return false;
×
1034
  }
1035

1036
  VLOG(2) << "detected protocol version" << serializer->protocolVersion();
169✔
1037
  frameSerializer_ = std::move(serializer);
169✔
1038
  return true;
169✔
1039
}
1040
} // reactivesocket
87✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc