• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jcmellado / dartis / 50

pending completion
50

push

travis-ci

web-flow
Added some logic to control broken connections (#16)

Added some logic for handling broken connections.

48 of 48 new or added lines in 9 files covered. (100.0%)

1278 of 1431 relevant lines covered (89.31%)

1.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.41
/lib/src/client/pubsub.dart
1
// Copyright (c) 2018, Juan Mellado. All rights reserved. Use of this source
2
// is governed by a MIT-style license that can be found in the LICENSE file.
3

4
import 'dart:async' show Future, Stream, StreamController;
5

6
import '../exception.dart';
7
import '../protocol.dart';
8
import 'connection.dart';
9
import 'dispatcher.dart';
10

11
/// Names of the reply kinds that can be received in Publish/Subscribe mode.
///
/// Each constant is the string found in the first element of a server reply,
/// and it is also the value exposed by [SubscriptionEvent.command].
abstract class PubSubEventType {
  /// The client subscribed to a channel.
  static const String subscribe = 'subscribe';

  /// The client unsubscribed from a channel.
  static const String unsubscribe = 'unsubscribe';

  /// The client subscribed to a pattern.
  static const String psubscribe = 'psubscribe';

  /// The client unsubscribed from a pattern.
  static const String punsubscribe = 'punsubscribe';

  /// A message was published on a subscribed channel.
  static const String message = 'message';

  /// A message was published on a channel matching a subscribed pattern.
  static const String pmessage = 'pmessage';

  /// The server answered a PING command.
  static const String pong = 'pong';
}
34

35
/// Marker interface for all the pub/sub events.
36
///
/// It declares no members of its own: it only gives a common static type to
/// the events emitted by [PubSub.stream], which in this library are
/// [SubscriptionEvent], [MessageEvent] and [PongEvent].
abstract class PubSubEvent {}
37

38
/// Reports the server's answer to a subscription or unsubscription command.
///
/// The type [K] is the type used for channel names.
class SubscriptionEvent<K> implements PubSubEvent {
  /// Creates a [SubscriptionEvent] for the given [command], [channel] and
  /// [channelCount].
  const SubscriptionEvent(this.command, this.channel, this.channelCount);

  /// The name of the command this event answers.
  ///
  /// It is one of the names listed in [PubSubEventType].
  final String command;

  /// The name of the channel the command was applied to.
  final K channel;

  /// How many channels the client is subscribed to after the command.
  final int channelCount;

  @override
  String toString() {
    // Adjacent literals are concatenated at compile time into one string.
    return 'SubscriptionEvent<$K>: {command=$command, '
        'channel=$channel, channelCount=$channelCount}';
  }
}
58

59
/// Carries a message that was published on a channel.
///
/// The type [K] is used for the channel and pattern names and the type [V]
/// for the message content.
class MessageEvent<K, V> implements PubSubEvent {
  /// Creates a [MessageEvent] for a [message] published on [channel].
  ///
  /// The optional [pattern] is the pattern that matched [channel]; it is
  /// left as `null` when omitted.
  const MessageEvent(this.channel, this.message, [this.pattern]);

  /// The name of the channel the message was published on.
  final K channel;

  /// The content of the message.
  final V message;

  /// The pattern that matched the name of the channel, or `null` if none
  /// was given.
  final K pattern;

  @override
  String toString() {
    // Adjacent literals are concatenated at compile time into one string.
    return 'MessageEvent<$K, $V>: {channel=$channel, '
        'message=$message, pattern=$pattern}';
  }
}
77

78
/// Carries the server's answer to a PING command.
///
/// The type [V] is the type the answer is decoded to.
class PongEvent<V> implements PubSubEvent {
  /// Creates a [PongEvent] holding the given [message].
  const PongEvent(this.message);

  /// The content sent back by the server.
  final V message;

  @override
  String toString() {
    return 'PongEvent<$V>: $message';
  }
}
89

90
/// A client in Publish/Subscribe mode.
///
/// In this mode the only allowed commands are SUBSCRIBE, UNSUBSCRIBE,
/// PSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT.
///
/// None of the command methods return the server reply. The replies to
/// subscription and unsubscription commands, the replies to PING and the
/// published messages are all delivered as events on [stream], so that the
/// client can just read a coherent [Stream] of events.
///
/// The type [K] is used for channel names and the type [V] for messages.
/// Most times, using [String] for both is what you want.
///
/// ```dart
/// final pubsub = await PubSub
///     .connect<String, String>('redis://localhost:6379');
///
/// pubsub // Subscribe to some channels
///    ..subscribe(channel: 'dev.dart')
///    ..psubscribe(pattern: 'dartlang.news.*');
///
/// pubsub.stream.listen(print, onError: print); // Listen for server replies
/// ```
///
/// See [SubscriptionEvent], [MessageEvent] and [PongEvent].
///
/// See `pubsub.dart` in the `example` folder.
class PubSub<K, V> {
  final _PubSubDispatcher<K, V> _dispatcher;

  /// Creates a [PubSub] instance with the given [connection].
  ///
  /// [connect()] provides a more convenient way for creating instances
  /// of this class.
  PubSub(Connection connection)
      : _dispatcher = _PubSubDispatcher<K, V>(connection);

  /// Creates a new connection according to the host and port specified
  /// in the [connectionString].
  ///
  /// Connection string must follow the pattern "redis://{host}:{port}".
  ///
  /// Example: redis://localhost:6379
  ///
  /// Returns a [Future] that will complete with either a [PubSub] once
  /// connected or an error if the connection failed.
  static Future<PubSub<K, V>> connect<K, V>(String connectionString) async {
    final connection = await Connection.connect(connectionString);

    return PubSub<K, V>(connection);
  }

  /// Returns the stream where all events will be published.
  Stream<PubSubEvent> get stream => _dispatcher.stream;

  /// Returns the converter used to serialize/deserialize all the values.
  ///
  /// Custom converters can be registered on it, either to add new ones or
  /// to replace the existing ones.
  RedisCodec get codec => _dispatcher.codec;

  /// Subscribes the client to the given [channel] or [channels].
  ///
  /// When both are given, [channel] is sent first, followed by [channels].
  /// A `null` [channels] is treated as empty.
  ///
  /// See https://redis.io/commands/subscribe
  void subscribe({K channel, Iterable<K> channels = const []}) =>
      _runOnTargets(r'SUBSCRIBE', channel, channels);

  /// Unsubscribes the client from the given [channel] or [channels], or
  /// from all of them if none is given.
  ///
  /// A `null` [channels] is treated as empty.
  ///
  /// See https://redis.io/commands/unsubscribe
  void unsubscribe({K channel, Iterable<K> channels = const []}) =>
      _runOnTargets(r'UNSUBSCRIBE', channel, channels);

  /// Subscribes the client to the given [pattern] or [patterns].
  ///
  /// When both are given, [pattern] is sent first, followed by [patterns].
  /// A `null` [patterns] is treated as empty.
  ///
  /// See https://redis.io/commands/psubscribe
  void psubscribe({K pattern, Iterable<K> patterns = const []}) =>
      _runOnTargets(r'PSUBSCRIBE', pattern, patterns);

  /// Unsubscribes the client from the given [pattern] or [patterns], or
  /// from all of them if none is given.
  ///
  /// A `null` [patterns] is treated as empty.
  ///
  /// See https://redis.io/commands/punsubscribe
  void punsubscribe({K pattern, Iterable<K> patterns = const []}) =>
      _runOnTargets(r'PUNSUBSCRIBE', pattern, patterns);

  /// Sends a PING command, optionally carrying the given [message].
  ///
  /// Nothing is returned by this method: the reply of the server is
  /// delivered on [stream] as a [PongEvent]. According to the Redis
  /// documentation the reply is an empty string if no [message] is
  /// provided, otherwise it is a copy of the [message].
  ///
  /// See https://redis.io/commands/ping
  void ping([String message]) => _run(<Object>[r'PING', message]);

  /// Closes the connection.
  Future<void> disconnect() => _dispatcher.disconnect();

  /// Sends [command] followed by the single [target], if any, and then by
  /// all the [targets].
  ///
  /// An explicit `null` passed as [targets] bypasses the default value of
  /// the optional parameter it comes from, so it is replaced here by an
  /// empty list instead of letting `addAll` fail on `null`.
  void _runOnTargets(String command, K target, Iterable<K> targets) =>
      _run(<Object>[command, target]..addAll(targets ?? const <Object>[]));

  /// Sends the given command [line] after dropping its `null` elements.
  ///
  /// Dropping the `null` values is what allows the public methods to pass
  /// their omitted optional arguments straight into the line.
  void _run(Iterable<Object> line) {
    final withoutNulls = line.where((value) => value != null);

    _dispatcher.dispatch(withoutNulls);
  }
}
191

192
/// Dispatcher used by a client in Publish/Subscribe mode.
///
/// It sends command lines through the connection and converts every reply
/// received into a [PubSubEvent] that is published on [stream].
///
/// The type [K] is used to decode channel and pattern names and the type
/// [V] to decode message contents.
class _PubSubDispatcher<K, V> extends ReplyDispatcher {
  /// Creates a dispatcher on top of the given [connection].
  _PubSubDispatcher(Connection connection) : super(connection);

  // Broadcast controller backing [stream].
  final StreamController<PubSubEvent> _controller =
      StreamController<PubSubEvent>.broadcast();

  /// The stream where events and errors are published.
  Stream<PubSubEvent> get stream => _controller.stream;

  /// Serializes the given command [line] with the codec and sends it.
  void dispatch(Iterable<Object> line) => send(writer.write(line, codec));

  /// Publishes the event decoded from [reply].
  ///
  /// Throws a [RedisException] if [reply] is not an [ArrayReply] or if its
  /// type is not one of the names in [PubSubEventType].
  @override
  void onReply(Reply reply) {
    if (reply is ArrayReply) {
      _controller.add(_toEvent(reply));
      return;
    }

    throw RedisException('Unexpected server reply: $reply.');
  }

  /// Publishes the error [reply] as an error on the stream.
  @override
  void onErrorReply(ErrorReply reply) {
    _controller.addError(reply);
  }

  /// Publishes [error] and its [stackTrace] as an error on the stream.
  @override
  void onError(Object error, [StackTrace stackTrace]) {
    _controller.addError(error, stackTrace);
  }

  /// Closes the stream.
  @override
  void onDone() {
    _controller.close();
  }

  /// Builds the event that corresponds to the type named by the first
  /// element of [reply].
  PubSubEvent _toEvent(ArrayReply reply) {
    final items = reply.array;
    final kind = codec.decode<String>(items[0]);

    if (kind == PubSubEventType.message) {
      return _messageFrom(items);
    }

    if (kind == PubSubEventType.pmessage) {
      return _patternMessageFrom(items);
    }

    // The four (un)subscription replies are decoded in the same way.
    if (kind == PubSubEventType.subscribe ||
        kind == PubSubEventType.unsubscribe ||
        kind == PubSubEventType.psubscribe ||
        kind == PubSubEventType.punsubscribe) {
      return _subscriptionFrom(items);
    }

    if (kind == PubSubEventType.pong) {
      return _pongFrom(items);
    }

    throw RedisException('Unexpected server reply type "$kind".');
  }

  /// Decodes `[command, channel, channelCount]` into a [SubscriptionEvent].
  PubSubEvent _subscriptionFrom(List<Reply> items) => SubscriptionEvent<K>(
      codec.decode<String>(items[0]),
      codec.decode<K>(items[1]),
      codec.decode<int>(items[2]));

  /// Decodes `[type, channel, message]` into a [MessageEvent].
  PubSubEvent _messageFrom(List<Reply> items) => MessageEvent<K, V>(
      codec.decode<K>(items[1]), codec.decode<V>(items[2]));

  /// Decodes `[type, pattern, channel, message]` into a [MessageEvent].
  PubSubEvent _patternMessageFrom(List<Reply> items) {
    // Decoded in the order the elements arrive: pattern, channel, message.
    final matched = codec.decode<K>(items[1]);
    final name = codec.decode<K>(items[2]);
    final content = codec.decode<V>(items[3]);

    return MessageEvent<K, V>(name, content, matched);
  }

  /// Decodes `[type, message]` into a [PongEvent].
  PubSubEvent _pongFrom(List<Reply> items) =>
      PongEvent<V>(codec.decode<V>(items[1]));
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2023 Coveralls, Inc