• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

igniterealtime / Smack / #2856

pending completion
#2856

push

github-actions

web-flow
Merge pull request #561 from Flowdalic/github-ci

[github ci] Java 15 → 17

16274 of 41793 relevant lines covered (38.94%)

0.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.42
/smack-core/src/main/java/org/jivesoftware/smack/AbstractXMPPConnection.java
1
/**
2
 *
3
 * Copyright 2009 Jive Software, 2018-2022 Florian Schmaus.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
package org.jivesoftware.smack;
18

19
import java.io.IOException;
20
import java.io.Reader;
21
import java.io.Writer;
22
import java.util.Collection;
23
import java.util.HashMap;
24
import java.util.Iterator;
25
import java.util.LinkedHashMap;
26
import java.util.LinkedList;
27
import java.util.List;
28
import java.util.Map;
29
import java.util.Queue;
30
import java.util.Set;
31
import java.util.concurrent.ConcurrentLinkedQueue;
32
import java.util.concurrent.CopyOnWriteArraySet;
33
import java.util.concurrent.Executor;
34
import java.util.concurrent.ExecutorService;
35
import java.util.concurrent.Executors;
36
import java.util.concurrent.Semaphore;
37
import java.util.concurrent.ThreadFactory;
38
import java.util.concurrent.TimeUnit;
39
import java.util.concurrent.atomic.AtomicInteger;
40
import java.util.concurrent.locks.Lock;
41
import java.util.concurrent.locks.ReentrantLock;
42
import java.util.logging.Level;
43
import java.util.logging.Logger;
44

45
import javax.net.ssl.SSLSession;
46
import javax.xml.namespace.QName;
47

48
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
49
import org.jivesoftware.smack.SmackConfiguration.UnknownIqRequestReplyMode;
50
import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
51
import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
52
import org.jivesoftware.smack.SmackException.NoResponseException;
53
import org.jivesoftware.smack.SmackException.NotConnectedException;
54
import org.jivesoftware.smack.SmackException.NotLoggedInException;
55
import org.jivesoftware.smack.SmackException.OutgoingQueueFullException;
56
import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException;
57
import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
58
import org.jivesoftware.smack.SmackException.SecurityRequiredException;
59
import org.jivesoftware.smack.SmackException.SmackSaslException;
60
import org.jivesoftware.smack.SmackException.SmackWrappedException;
61
import org.jivesoftware.smack.SmackFuture.InternalSmackFuture;
62
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
63
import org.jivesoftware.smack.XMPPException.StreamErrorException;
64
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
65
import org.jivesoftware.smack.compress.packet.Compress;
66
import org.jivesoftware.smack.compression.XMPPInputOutputStream;
67
import org.jivesoftware.smack.datatypes.UInt16;
68
import org.jivesoftware.smack.debugger.SmackDebugger;
69
import org.jivesoftware.smack.debugger.SmackDebuggerFactory;
70
import org.jivesoftware.smack.filter.IQReplyFilter;
71
import org.jivesoftware.smack.filter.StanzaFilter;
72
import org.jivesoftware.smack.filter.StanzaIdFilter;
73
import org.jivesoftware.smack.internal.SmackTlsContext;
74
import org.jivesoftware.smack.iqrequest.IQRequestHandler;
75
import org.jivesoftware.smack.packet.AbstractStreamOpen;
76
import org.jivesoftware.smack.packet.Bind;
77
import org.jivesoftware.smack.packet.ErrorIQ;
78
import org.jivesoftware.smack.packet.ExtensionElement;
79
import org.jivesoftware.smack.packet.IQ;
80
import org.jivesoftware.smack.packet.Mechanisms;
81
import org.jivesoftware.smack.packet.Message;
82
import org.jivesoftware.smack.packet.MessageBuilder;
83
import org.jivesoftware.smack.packet.MessageOrPresence;
84
import org.jivesoftware.smack.packet.MessageOrPresenceBuilder;
85
import org.jivesoftware.smack.packet.Nonza;
86
import org.jivesoftware.smack.packet.Presence;
87
import org.jivesoftware.smack.packet.PresenceBuilder;
88
import org.jivesoftware.smack.packet.Session;
89
import org.jivesoftware.smack.packet.Stanza;
90
import org.jivesoftware.smack.packet.StanzaError;
91
import org.jivesoftware.smack.packet.StanzaFactory;
92
import org.jivesoftware.smack.packet.StartTls;
93
import org.jivesoftware.smack.packet.StreamError;
94
import org.jivesoftware.smack.packet.StreamOpen;
95
import org.jivesoftware.smack.packet.TopLevelStreamElement;
96
import org.jivesoftware.smack.packet.XmlElement;
97
import org.jivesoftware.smack.packet.XmlEnvironment;
98
import org.jivesoftware.smack.packet.id.StanzaIdSource;
99
import org.jivesoftware.smack.parsing.ParsingExceptionCallback;
100
import org.jivesoftware.smack.parsing.SmackParsingException;
101
import org.jivesoftware.smack.provider.ExtensionElementProvider;
102
import org.jivesoftware.smack.provider.NonzaProvider;
103
import org.jivesoftware.smack.provider.ProviderManager;
104
import org.jivesoftware.smack.sasl.SASLErrorException;
105
import org.jivesoftware.smack.sasl.SASLMechanism;
106
import org.jivesoftware.smack.sasl.core.SASLAnonymous;
107
import org.jivesoftware.smack.sasl.packet.SaslNonza;
108
import org.jivesoftware.smack.util.Async;
109
import org.jivesoftware.smack.util.CollectionUtil;
110
import org.jivesoftware.smack.util.Consumer;
111
import org.jivesoftware.smack.util.MultiMap;
112
import org.jivesoftware.smack.util.Objects;
113
import org.jivesoftware.smack.util.PacketParserUtils;
114
import org.jivesoftware.smack.util.ParserUtils;
115
import org.jivesoftware.smack.util.Predicate;
116
import org.jivesoftware.smack.util.StringUtils;
117
import org.jivesoftware.smack.util.Supplier;
118
import org.jivesoftware.smack.xml.XmlPullParser;
119
import org.jivesoftware.smack.xml.XmlPullParserException;
120

121
import org.jxmpp.jid.DomainBareJid;
122
import org.jxmpp.jid.EntityBareJid;
123
import org.jxmpp.jid.EntityFullJid;
124
import org.jxmpp.jid.Jid;
125
import org.jxmpp.jid.impl.JidCreate;
126
import org.jxmpp.jid.parts.Resourcepart;
127
import org.jxmpp.stringprep.XmppStringprepException;
128
import org.jxmpp.util.XmppStringUtils;
129

130
/**
131
 * This abstract class is commonly used as super class for XMPP connection mechanisms like TCP and BOSH. Hence it
132
 * provides the methods for connection state management, like {@link #connect()}, {@link #login()} and
133
 * {@link #disconnect()} (which are deliberately not provided by the {@link XMPPConnection} interface).
134
 * <p>
135
 * <b>Note:</b> The default entry point to Smack's documentation is {@link XMPPConnection}. If you are getting started
136
 * with Smack, then head over to {@link XMPPConnection} and the come back here.
137
 * </p>
138
 * <h2>Parsing Exceptions</h2>
139
 * <p>
140
 * In case a Smack parser (Provider) throws those exceptions are handled over to the {@link ParsingExceptionCallback}. A
141
 * common cause for a provider throwing is illegal input, for example a non-numeric String where only Integers are
142
 * allowed. Smack's <em>default behavior</em> follows the <b>"fail-hard per default"</b> principle leading to a
143
 * termination of the connection on parsing exceptions. This default was chosen to make users eventually aware that they
144
 * should configure their own callback and handle those exceptions to prevent the disconnect. Handle a parsing exception
145
 * could be as simple as using a non-throwing no-op callback, which would cause the faulty stream element to be taken
146
 * out of the stream, i.e., Smack behaves like that element was never received.
147
 * </p>
148
 * <p>
149
 * If the parsing exception is because Smack received illegal input, then please consider informing the authors of the
150
 * originating entity about that. If it was thrown because of an bug in a Smack parser, then please consider filling a
151
 * bug with Smack.
152
 * </p>
153
 * <h3>Managing the parsing exception callback</h3>
154
 * <p>
155
 * The "fail-hard per default" behavior is achieved by using the
156
 * {@link org.jivesoftware.smack.parsing.ExceptionThrowingCallbackWithHint} as default parsing exception callback. You
157
 * can change the behavior using {@link #setParsingExceptionCallback(ParsingExceptionCallback)} to set a new callback.
158
 * Use {@link org.jivesoftware.smack.SmackConfiguration#setDefaultParsingExceptionCallback(ParsingExceptionCallback)} to
159
 * set the default callback.
160
 * </p>
161
 */
162
public abstract class AbstractXMPPConnection implements XMPPConnection {
1✔
163
    private static final Logger LOGGER = Logger.getLogger(AbstractXMPPConnection.class.getName());
1✔
164

165
    protected static final SmackReactor SMACK_REACTOR;
166

167
    static {
168
        SMACK_REACTOR = SmackReactor.getInstance();
1✔
169
    }
170

171
    /**
172
     * Counter to uniquely identify connections that are created.
173
     */
174
    private static final AtomicInteger connectionCounter = new AtomicInteger(0);
1✔
175

176
    static {
177
        Smack.ensureInitialized();
1✔
178
    }
179

180
    protected enum SyncPointState {
×
181
        initial,
×
182
        request_sent,
×
183
        successful,
×
184
    }
185

186
    /**
187
     * A collection of ConnectionListeners which listen for connection closing
188
     * and reconnection events.
189
     */
190
    protected final Set<ConnectionListener> connectionListeners =
1✔
191
            new CopyOnWriteArraySet<>();
192

193
    /**
194
     * A collection of StanzaCollectors which collects packets for a specified filter
195
     * and perform blocking and polling operations on the result queue.
196
     * <p>
197
     * We use a ConcurrentLinkedQueue here, because its Iterator is weakly
198
     * consistent and we want {@link #invokeStanzaCollectorsAndNotifyRecvListeners(Stanza)} for-each
199
     * loop to be lock free. As drawback, removing a StanzaCollector is O(n).
200
     * The alternative would be a synchronized HashSet, but this would mean a
201
     * synchronized block around every usage of <code>collectors</code>.
202
     * </p>
203
     */
204
    private final Collection<StanzaCollector> collectors = new ConcurrentLinkedQueue<>();
1✔
205

206
    private final Map<StanzaListener, ListenerWrapper> recvListeners = new LinkedHashMap<>();
1✔
207

208
    /**
209
     * List of PacketListeners that will be notified synchronously when a new stanza was received.
210
     */
211
    private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>();
1✔
212

213
    /**
214
     * List of PacketListeners that will be notified asynchronously when a new stanza was received.
215
     */
216
    private final Map<StanzaListener, ListenerWrapper> asyncRecvListeners = new LinkedHashMap<>();
1✔
217

218
    /**
219
     * List of PacketListeners that will be notified when a new stanza was sent.
220
     */
221
    private final Map<StanzaListener, ListenerWrapper> sendListeners =
1✔
222
            new HashMap<>();
223

224
    /**
225
     * List of PacketListeners that will be notified when a new stanza is about to be
226
     * sent to the server. These interceptors may modify the stanza before it is being
227
     * actually sent to the server.
228
     */
229
    private final Map<StanzaListener, InterceptorWrapper> interceptors =
1✔
230
            new HashMap<>();
231

232
    private final Map<Consumer<MessageBuilder>, GenericInterceptorWrapper<MessageBuilder, Message>> messageInterceptors = new HashMap<>();
1✔
233

234
    private final Map<Consumer<PresenceBuilder>, GenericInterceptorWrapper<PresenceBuilder, Presence>> presenceInterceptors = new HashMap<>();
1✔
235

236
    private XmlEnvironment incomingStreamXmlEnvironment;
237

238
    protected XmlEnvironment outgoingStreamXmlEnvironment;
239

240
    final MultiMap<QName, NonzaCallback> nonzaCallbacksMap = new MultiMap<>();
1✔
241

242
    protected final Lock connectionLock = new ReentrantLock();
1✔
243

244
    protected final Map<QName, XmlElement> streamFeatures = new HashMap<>();
1✔
245

246
    /**
247
     * The full JID of the authenticated user, as returned by the resource binding response of the server.
248
     * <p>
249
     * It is important that we don't infer the user from the login() arguments and the configurations service name, as,
250
     * for example, when SASL External is used, the username is not given to login but taken from the 'external'
251
     * certificate.
252
     * </p>
253
     */
254
    protected EntityFullJid user;
255

256
    protected boolean connected = false;
1✔
257

258
    /**
259
     * The stream ID, see RFC 6120 § 4.7.3
260
     */
261
    protected String streamId;
262

263
    /**
264
     * The timeout to wait for a reply in milliseconds.
265
     */
266
    private long replyTimeout = SmackConfiguration.getDefaultReplyTimeout();
1✔
267

268
    /**
269
     * The SmackDebugger allows to log and debug XML traffic.
270
     */
271
    protected final SmackDebugger debugger;
272

273
    /**
274
     * The Reader which is used for the debugger.
275
     */
276
    protected Reader reader;
277

278
    /**
279
     * The Writer which is used for the debugger.
280
     */
281
    protected Writer writer;
282

283
    protected SmackException currentSmackException;
284
    protected XMPPException currentXmppException;
285

286
    protected boolean tlsHandled;
287

288
    /**
289
     * Set to <code>true</code> if the last features stanza from the server has been parsed. A XMPP connection
290
     * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature
291
     * stanza is send by the server. This is set to true once the last feature stanza has been
292
     * parsed.
293
     */
294
    protected boolean lastFeaturesReceived;
295

296
    /**
297
     * Set to <code>true</code> if the SASL feature has been received.
298
     */
299
    protected boolean saslFeatureReceived;
300

301
    /**
302
     * A synchronization point which is successful if this connection has received the closing
303
     * stream element from the remote end-point, i.e. the server.
304
     */
305
    protected boolean closingStreamReceived;
306

307
    /**
308
     * The SASLAuthentication manager that is responsible for authenticating with the server.
309
     */
310
    private final SASLAuthentication saslAuthentication;
311

312
    /**
313
     * A number to uniquely identify connections that are created. This is distinct from the
314
     * connection ID, which is a value sent by the server once a connection is made.
315
     */
316
    protected final int connectionCounterValue = connectionCounter.getAndIncrement();
1✔
317

318
    /**
319
     * Holds the initial configuration used while creating the connection.
320
     */
321
    protected final ConnectionConfiguration config;
322

323
    /**
324
     * Defines how the from attribute of outgoing stanzas should be handled.
325
     */
326
    private FromMode fromMode = FromMode.OMITTED;
1✔
327

328
    protected XMPPInputOutputStream compressionHandler;
329

330
    private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback();
1✔
331

332
    /**
333
     * A cached thread pool executor service with custom thread factory to set meaningful names on the threads and set
334
     * them 'daemon'.
335
     */
336
    private static final ExecutorService CACHED_EXECUTOR_SERVICE = Executors.newCachedThreadPool(new ThreadFactory() {
1✔
337
        @Override
338
        public Thread newThread(Runnable runnable) {
339
            Thread thread = new Thread(runnable);
1✔
340
            thread.setName("Smack Cached Executor");
1✔
341
            thread.setDaemon(true);
1✔
342
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
1✔
343
                @Override
344
                public void uncaughtException(Thread t, Throwable e) {
345
                    LOGGER.log(Level.WARNING, t + " encountered uncaught exception", e);
×
346
                }
×
347
            });
348
            return thread;
1✔
349
        }
350
    });
351

352
    protected static final AsyncButOrdered<AbstractXMPPConnection> ASYNC_BUT_ORDERED = new AsyncButOrdered<>();
1✔
353

354
    /**
355
     * The used host to establish the connection to
356
     */
357
    protected String host;
358

359
    /**
360
     * The used port to establish the connection to
361
     */
362
    protected UInt16 port;
363

364
    /**
365
     * Flag that indicates if the user is currently authenticated with the server.
366
     */
367
    protected boolean authenticated = false;
1✔
368

369
    // TODO: Migrate to ZonedDateTime once Smack's minimum required Android SDK level is 26 (8.0, Oreo) or higher.
370
    protected long authenticatedConnectionInitiallyEstablishedTimestamp;
371

372
    /**
373
     * Flag that indicates if the user was authenticated with the server when the connection
374
     * to the server was closed (abruptly or not).
375
     */
376
    protected boolean wasAuthenticated = false;
1✔
377

378
    private final Map<QName, IQRequestHandler> setIqRequestHandler = new HashMap<>();
1✔
379
    private final Map<QName, IQRequestHandler> getIqRequestHandler = new HashMap<>();
1✔
380
    private final Set<String> iqRequestHandlerNamespaces = new CopyOnWriteArraySet<>();
1✔
381
    private final Map<String, Integer> iqRequestHandlerNamespacesReferenceCounters = new HashMap<>();
1✔
382

383
    private final StanzaFactory stanzaFactory;
384

385
    /**
386
     * Create a new XMPPConnection to an XMPP server.
387
     *
388
     * @param configuration The configuration which is used to establish the connection.
389
     */
390
    protected AbstractXMPPConnection(ConnectionConfiguration configuration) {
1✔
391
        saslAuthentication = new SASLAuthentication(this, configuration);
1✔
392
        config = configuration;
1✔
393

394
        // Install the SASL Nonza callbacks.
395
        buildNonzaCallback()
1✔
396
            .listenFor(SaslNonza.Challenge.class, c -> {
1✔
397
                try {
398
                    saslAuthentication.challengeReceived(c);
×
399
                } catch (SmackException | InterruptedException e) {
×
400
                    saslAuthentication.authenticationFailed(e);
×
401
                }
×
402
            })
×
403
            .listenFor(SaslNonza.Success.class, s -> {
1✔
404
                try {
405
                    saslAuthentication.authenticated(s);
×
406
                } catch (SmackSaslException | NotConnectedException | InterruptedException e) {
×
407
                    saslAuthentication.authenticationFailed(e);
×
408
                }
×
409
            })
×
410
            .listenFor(SaslNonza.SASLFailure.class, f -> saslAuthentication.authenticationFailed(f))
1✔
411
            .install();
1✔
412

413
        SmackDebuggerFactory debuggerFactory = configuration.getDebuggerFactory();
1✔
414
        if (debuggerFactory != null) {
1✔
415
            debugger = debuggerFactory.create(this);
×
416
        } else {
417
            debugger = null;
1✔
418
        }
419
        // Notify listeners that a new connection has been established
420
        for (ConnectionCreationListener listener : XMPPConnectionRegistry.getConnectionCreationListeners()) {
1✔
421
            listener.connectionCreated(this);
1✔
422
        }
1✔
423

424
        StanzaIdSource stanzaIdSource = configuration.constructStanzaIdSource();
1✔
425
        stanzaFactory = new StanzaFactory(stanzaIdSource);
1✔
426
    }
1✔
427

428
    /**
429
     * Get the connection configuration used by this connection.
430
     *
431
     * @return the connection configuration.
432
     */
433
    public ConnectionConfiguration getConfiguration() {
434
        return config;
1✔
435
    }
436

437
    @Override
438
    public DomainBareJid getXMPPServiceDomain() {
439
        if (xmppServiceDomain != null) {
1✔
440
            return xmppServiceDomain;
×
441
        }
442
        return config.getXMPPServiceDomain();
1✔
443
    }
444

445
    @Override
446
    public String getHost() {
447
        return host;
×
448
    }
449

450
    @Override
451
    public int getPort() {
452
        final UInt16 port = this.port;
×
453
        if (port == null) {
×
454
            return -1;
×
455
        }
456

457
        return port.intValue();
×
458
    }
459

460
    @Override
461
    public abstract boolean isSecureConnection();
462

463
    // Usually batching is a good idea. So the two
464
    // send(Internal|NonBlockingInternal) methods below could be using
465
    // Collection<? extends TopLevelStreamElement> as parameter type instead.
466
    // TODO: Add "batched send" support. Note that for the non-blocking variant, this probably requires a change in
467
    // return type, so that it is possible to signal which messages could be "send" and which not.
468

469
    protected abstract void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException;
470

471
    protected abstract void sendNonBlockingInternal(TopLevelStreamElement element) throws NotConnectedException, OutgoingQueueFullException;
472

473
    @SuppressWarnings("deprecation")
474
    @Override
475
    public boolean trySendStanza(Stanza stanza) throws NotConnectedException {
476
        // Default implementation which falls back to sendStanza() as mentioned in the methods javadoc. May be
477
        // overwritten by subclasses.
478
        try {
479
            sendStanza(stanza);
×
480
        } catch (InterruptedException e) {
×
481
            LOGGER.log(Level.FINER,
×
482
                            "Thread blocked in fallback implementation of trySendStanza(Stanza) was interrupted", e);
483
            return false;
×
484
        }
×
485
        return true;
×
486
    }
487

488
    @SuppressWarnings("deprecation")
489
    @Override
490
    public boolean trySendStanza(Stanza stanza, long timeout, TimeUnit unit)
491
                    throws NotConnectedException, InterruptedException {
492
        // Default implementation which falls back to sendStanza() as mentioned in the methods javadoc. May be
493
        // overwritten by subclasses.
494
        sendStanza(stanza);
×
495
        return true;
×
496
    }
497

498
    @Override
499
    public final void sendNonza(Nonza nonza) throws NotConnectedException, InterruptedException {
500
        sendInternal(nonza);
1✔
501
    }
1✔
502

503
    @Override
504
    public final void sendNonzaNonBlocking(Nonza nonza) throws NotConnectedException, OutgoingQueueFullException {
505
        sendNonBlockingInternal(nonza);
×
506
    }
×
507

508
    @Override
509
    public abstract boolean isUsingCompression();
510

511
    protected void initState() {
512
        currentSmackException = null;
1✔
513
        currentXmppException = null;
1✔
514
        saslFeatureReceived = lastFeaturesReceived = tlsHandled = false;
1✔
515
        // TODO: We do not init closingStreamReceived here, as the integration tests use it to check if we waited for
516
        // it.
517
    }
1✔
518

519
    /**
520
     * Establishes a connection to the XMPP server. It basically
521
     * creates and maintains a connection to the server.
522
     * <p>
523
     * Listeners will be preserved from a previous connection.
524
     * </p>
525
     *
526
     * @throws XMPPException if an error occurs on the XMPP protocol level.
527
     * @throws SmackException if an error occurs somewhere else besides XMPP protocol level.
528
     * @throws IOException if an I/O error occurred.
529
     * @return a reference to this object, to chain <code>connect()</code> with <code>login()</code>.
530
     * @throws InterruptedException if the calling thread was interrupted.
531
     */
532
    public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException, InterruptedException {
533
        // Check if not already connected
534
        throwAlreadyConnectedExceptionIfAppropriate();
1✔
535

536
        // Notify connection listeners that we are trying to connect
537
        callConnectionConnectingListener();
1✔
538

539
        // Reset the connection state
540
        initState();
1✔
541
        closingStreamReceived = false;
1✔
542
        streamId = null;
1✔
543

544
        try {
545
            // Perform the actual connection to the XMPP service
546
            connectInternal();
1✔
547

548
            // If TLS is required but the server doesn't offer it, disconnect
549
            // from the server and throw an error. First check if we've already negotiated TLS
550
            // and are secure, however (features get parsed a second time after TLS is established).
551
            if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) {
1✔
552
                throw new SecurityRequiredByClientException();
×
553
            }
554
        } catch (SmackException | IOException | XMPPException | InterruptedException e) {
×
555
            instantShutdown();
×
556
            throw e;
×
557
        }
1✔
558

559
        // If connectInternal() did not throw, then this connection must now be marked as connected.
560
        assert connected;
1✔
561

562
        callConnectionConnectedListener();
1✔
563

564
        return this;
1✔
565
    }
566

567
    /**
568
     * Abstract method that concrete subclasses of XMPPConnection need to implement to perform their
569
     * way of XMPP connection establishment. Implementations are required to perform an automatic
570
     * login if the previous connection state was logged (authenticated).
571
     *
572
     * @throws SmackException if Smack detected an exceptional situation.
573
     * @throws IOException if an I/O error occurred.
574
     * @throws XMPPException if an XMPP protocol error was received.
575
     * @throws InterruptedException if the calling thread was interrupted.
576
     */
577
    protected abstract void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException;
578

579
    private String usedUsername, usedPassword;
580

581
    /**
582
     * The resourcepart used for this connection. May not be the resulting resourcepart if it's null or overridden by the XMPP service.
583
     */
584
    private Resourcepart usedResource;
585

586
    /**
587
     * Logs in to the server using the strongest SASL mechanism supported by
588
     * the server. If more than the connection's default stanza timeout elapses in each step of the
589
     * authentication process without a response from the server, a
590
     * {@link SmackException.NoResponseException} will be thrown.
591
     * <p>
592
     * Before logging in (i.e. authenticate) to the server the connection must be connected
593
     * by calling {@link #connect}.
594
     * </p>
595
     * <p>
596
     * It is possible to log in without sending an initial available presence by using
597
     * {@link ConnectionConfiguration.Builder#setSendPresence(boolean)}.
598
     * Finally, if you want to not pass a password and instead use a more advanced mechanism
599
     * while using SASL then you may be interested in using
600
     * {@link ConnectionConfiguration.Builder#setCallbackHandler(javax.security.auth.callback.CallbackHandler)}.
601
     * For more advanced login settings see {@link ConnectionConfiguration}.
602
     * </p>
603
     *
604
     * @throws XMPPException if an error occurs on the XMPP protocol level.
605
     * @throws SmackException if an error occurs somewhere else besides XMPP protocol level.
606
     * @throws IOException if an I/O error occurs during login.
607
     * @throws InterruptedException if the calling thread was interrupted.
608
     */
609
    public synchronized void login() throws XMPPException, SmackException, IOException, InterruptedException {
610
        // The previously used username, password and resource take over precedence over the
611
        // ones from the connection configuration
612
        CharSequence username = usedUsername != null ? usedUsername : config.getUsername();
1✔
613
        String password = usedPassword != null ? usedPassword : config.getPassword();
1✔
614
        Resourcepart resource = usedResource != null ? usedResource : config.getResource();
1✔
615
        login(username, password, resource);
1✔
616
    }
1✔
617

618
    /**
619
     * Same as {@link #login(CharSequence, String, Resourcepart)}, but takes the resource from the connection
620
     * configuration.
621
     *
622
     * @param username TODO javadoc me please
623
     * @param password TODO javadoc me please
624
     * @throws XMPPException if an XMPP protocol error was received.
625
     * @throws SmackException if Smack detected an exceptional situation.
626
     * @throws IOException if an I/O error occurred.
627
     * @throws InterruptedException if the calling thread was interrupted.
628
     * @see #login
629
     */
630
    public synchronized void login(CharSequence username, String password) throws XMPPException, SmackException,
631
                    IOException, InterruptedException {
632
        login(username, password, config.getResource());
×
633
    }
×
634

635
    /**
636
     * Login with the given username (authorization identity). You may omit the password if a callback handler is used.
637
     * If resource is null, then the server will generate one.
638
     *
639
     * @param username TODO javadoc me please
640
     * @param password TODO javadoc me please
641
     * @param resource TODO javadoc me please
642
     * @throws XMPPException if an XMPP protocol error was received.
643
     * @throws SmackException if Smack detected an exceptional situation.
644
     * @throws IOException if an I/O error occurred.
645
     * @throws InterruptedException if the calling thread was interrupted.
646
     * @see #login
647
     */
648
    public synchronized void login(CharSequence username, String password, Resourcepart resource) throws XMPPException,
649
                    SmackException, IOException, InterruptedException {
650
        if (!config.allowNullOrEmptyUsername) {
1✔
651
            StringUtils.requireNotNullNorEmpty(username, "Username must not be null nor empty");
1✔
652
        }
653
        throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?");
1✔
654
        throwAlreadyLoggedInExceptionIfAppropriate();
1✔
655
        usedUsername = username != null ? username.toString() : null;
1✔
656
        usedPassword = password;
1✔
657
        usedResource = resource;
1✔
658
        loginInternal(usedUsername, usedPassword, usedResource);
1✔
659
    }
1✔
660

661
    protected abstract void loginInternal(String username, String password, Resourcepart resource)
662
                    throws XMPPException, SmackException, IOException, InterruptedException;
663

664
    @Override
665
    public final boolean isConnected() {
666
        return connected;
1✔
667
    }
668

669
    @Override
670
    public final boolean isAuthenticated() {
671
        return authenticated;
1✔
672
    }
673

674
    @Override
675
    public final EntityFullJid getUser() {
676
        return user;
1✔
677
    }
678

679
    @Override
680
    public String getStreamId() {
681
        if (!isConnected()) {
×
682
            return null;
×
683
        }
684
        return streamId;
×
685
    }
686

687
    protected final void throwCurrentConnectionException() throws SmackException, XMPPException {
688
        if (currentSmackException != null) {
×
689
            throw currentSmackException;
×
690
        } else if (currentXmppException != null) {
×
691
            throw currentXmppException;
×
692
        }
693

694
        throw new AssertionError("No current connection exception set, although throwCurrentException() was called");
×
695
    }
696

697
    protected final boolean hasCurrentConnectionException() {
698
        return currentSmackException != null || currentXmppException != null;
×
699
    }
700

701
    protected final void setCurrentConnectionExceptionAndNotify(Exception exception) {
702
        if (exception instanceof SmackException) {
×
703
            currentSmackException = (SmackException) exception;
×
704
        } else if (exception instanceof XMPPException) {
×
705
            currentXmppException = (XMPPException) exception;
×
706
        } else {
707
            currentSmackException = new SmackException.SmackWrappedException(exception);
×
708
        }
709

710
        notifyWaitingThreads();
×
711
    }
×
712

713
    /**
714
     * We use an extra object for {@link #notifyWaitingThreads()} and {@link #waitFor(Supplier)}, because all state
715
     * changing methods of the connection are synchronized using the connection instance as monitor. If we now would
716
     * also use the connection instance for the internal process to wait for a condition, the {@link Object#wait()}
717
     * would leave the monitor when it waits, which would allow for another potential call to a state changing function
718
     * to proceed.
719
     */
720
    private final Object internalMonitor = new Object();
1✔
721

722
    protected final void notifyWaitingThreads() {
723
        synchronized (internalMonitor) {
1✔
724
            internalMonitor.notifyAll();
1✔
725
        }
1✔
726
    }
1✔
727

728
    protected final boolean waitFor(Supplier<Boolean> condition) throws InterruptedException {
729
        final long deadline = System.currentTimeMillis() + getReplyTimeout();
×
730
        synchronized (internalMonitor) {
×
731
            while (!condition.get().booleanValue()) {
×
732
                final long now = System.currentTimeMillis();
×
733
                if (now >= deadline) {
×
734
                    return false;
×
735
                }
736
                internalMonitor.wait(deadline - now);
×
737
            }
×
738
        }
×
739
        return true;
×
740
    }
741

742
    protected final void waitForConditionOrThrowConnectionException(Supplier<Boolean> condition, String waitFor) throws InterruptedException, SmackException, XMPPException {
743
        boolean success = waitFor(() -> condition.get().booleanValue() || hasCurrentConnectionException());
×
744
        if (hasCurrentConnectionException()) {
×
745
            throwCurrentConnectionException();
×
746
        }
747

748
        // If there was no connection exception and we still did not successfully wait for the condition to hold, then
749
        // we ran into a no-response timeout.
750
        if (!success) {
×
751
            throw NoResponseException.newWith(this, waitFor);
×
752
        }
753
        // Otherwise we successfully awaited the condition.
754
    }
×
755

756
    protected Resourcepart bindResourceAndEstablishSession(Resourcepart resource)
757
                    throws SmackException, InterruptedException, XMPPException {
758
        // Wait until either:
759
        // - the servers last features stanza has been parsed
760
        // - the timeout occurs
761
        LOGGER.finer("Waiting for last features to be received before continuing with resource binding");
×
762
        waitForConditionOrThrowConnectionException(() -> lastFeaturesReceived, "last stream features received from server");
×
763

764
        if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) {
×
765
            // Server never offered resource binding, which is REQUIRED in XMPP client and
766
            // server implementations as per RFC6120 7.2
767
            throw new ResourceBindingNotOfferedException();
×
768
        }
769

770
        // Resource binding, see RFC6120 7.
771
        // Note that we can not use IQReplyFilter here, since the users full JID is not yet
772
        // available. It will become available right after the resource has been successfully bound.
773
        Bind bindResource = Bind.newSet(resource);
×
774
        StanzaCollector packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(bindResource), bindResource);
×
775
        Bind response = packetCollector.nextResultOrThrow();
×
776
        // Set the connections user to the result of resource binding. It is important that we don't infer the user
777
        // from the login() arguments and the configurations service name, as, for example, when SASL External is used,
778
        // the username is not given to login but taken from the 'external' certificate.
779
        user = response.getJid();
×
780
        xmppServiceDomain = user.asDomainBareJid();
×
781

782
        Session.Feature sessionFeature = getFeature(Session.Feature.class);
×
783
        // Only bind the session if it's announced as stream feature by the server, is not optional and not disabled
784
        // For more information see http://tools.ietf.org/html/draft-cridland-xmpp-session-01
785
        if (sessionFeature != null && !sessionFeature.isOptional()) {
×
786
            Session session = new Session();
×
787
            packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(session), session);
×
788
            packetCollector.nextResultOrThrow();
×
789
        }
790

791
        return response.getJid().getResourcepart();
×
792
    }
793

794
    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
795
        if (!resumed) {
×
796
            authenticatedConnectionInitiallyEstablishedTimestamp = System.currentTimeMillis();
×
797
        }
798
        // Indicate that we're now authenticated.
799
        this.authenticated = true;
×
800

801
        // If debugging is enabled, change the debug window title to include the
802
        // name we are now logged-in as.
803
        // If DEBUG was set to true AFTER the connection was created the debugger
804
        // will be null
805
        if (debugger != null) {
×
806
            debugger.userHasLogged(user);
×
807
        }
808
        callConnectionAuthenticatedListener(resumed);
×
809

810
        // Set presence to online. It is important that this is done after
811
        // callConnectionAuthenticatedListener(), as this call will also
812
        // eventually load the roster. And we should load the roster before we
813
        // send the initial presence.
814
        if (config.isSendPresence() && !resumed) {
×
815
            Presence availablePresence = getStanzaFactory()
×
816
                            .buildPresenceStanza()
×
817
                            .ofType(Presence.Type.available)
×
818
                            .build();
×
819
            sendStanza(availablePresence);
×
820
        }
821
    }
×
822

823
    @Override
824
    public final boolean isAnonymous() {
825
        return isAuthenticated() && SASLAnonymous.NAME.equals(getUsedSaslMechansism());
×
826
    }
827

828
    /**
829
     * Get the name of the SASL mechanism that was used to authenticate this connection. This returns the name of
830
     * mechanism which was used the last time this connection was authenticated, and will return <code>null</code> if
831
     * this connection was not authenticated before.
832
     *
833
     * @return the name of the used SASL mechanism.
834
     * @since 4.2
835
     */
836
    public final String getUsedSaslMechansism() {
837
        return saslAuthentication.getNameOfLastUsedSaslMechansism();
×
838
    }
839

840
    private DomainBareJid xmppServiceDomain;
841

842
    protected Lock getConnectionLock() {
843
        return connectionLock;
×
844
    }
845

846
    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
847
        throwNotConnectedExceptionIfAppropriate(null);
1✔
848
    }
1✔
849

850
    protected void throwNotConnectedExceptionIfAppropriate(String optionalHint) throws NotConnectedException {
851
        if (!isConnected()) {
1✔
852
            throw new NotConnectedException(optionalHint);
×
853
        }
854
    }
1✔
855

856
    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
857
        if (isConnected()) {
1✔
858
            throw new AlreadyConnectedException();
×
859
        }
860
    }
1✔
861

862
    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
863
        if (isAuthenticated()) {
1✔
864
            throw new AlreadyLoggedInException();
×
865
        }
866
    }
1✔
867

868
    @Override
869
    public final StanzaFactory getStanzaFactory() {
870
        return stanzaFactory;
1✔
871
    }
872

873
    private Stanza preSendStanza(Stanza stanza) throws NotConnectedException {
874
        Objects.requireNonNull(stanza, "Stanza must not be null");
1✔
875
        assert stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ;
1✔
876

877
        throwNotConnectedExceptionIfAppropriate();
1✔
878
        switch (fromMode) {
1✔
879
        case OMITTED:
880
            stanza.setFrom((Jid) null);
1✔
881
            break;
1✔
882
        case USER:
883
            stanza.setFrom(getUser());
×
884
            break;
×
885
        case UNCHANGED:
886
        default:
887
            break;
888
        }
889
        // Invoke interceptors for the new stanza that is about to be sent. Interceptors may modify
890
        // the content of the stanza.
891
        Stanza stanzaAfterInterceptors = firePacketInterceptors(stanza);
1✔
892
        return stanzaAfterInterceptors;
1✔
893
    }
894

895
    @Override
896
    public final void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
897
        stanza = preSendStanza(stanza);
1✔
898
        sendInternal(stanza);
1✔
899
    }
1✔
900

901
    @Override
902
    public final void sendStanzaNonBlocking(Stanza stanza) throws NotConnectedException, OutgoingQueueFullException {
903
        stanza = preSendStanza(stanza);
1✔
904
        sendNonBlockingInternal(stanza);
1✔
905
    }
1✔
906

907
    /**
908
     * Authenticate a connection.
909
     *
910
     * @param username the username that is authenticating with the server.
911
     * @param password the password to send to the server.
912
     * @param authzid the authorization identifier (typically null).
913
     * @param sslSession the optional SSL/TLS session (if one was established)
914
     * @return the used SASLMechanism.
915
     * @throws XMPPErrorException if there was an XMPP error returned.
916
     * @throws SASLErrorException if a SASL protocol error was returned.
917
     * @throws IOException if an I/O error occurred.
918
     * @throws InterruptedException if the calling thread was interrupted.
919
     * @throws SmackSaslException if a SASL specific error occurred.
920
     * @throws NotConnectedException if the XMPP connection is not connected.
921
     * @throws NoResponseException if there was no response from the remote entity.
922
     * @throws SmackWrappedException in case of an exception.
923
     * @see SASLAuthentication#authenticate(String, String, EntityBareJid, SSLSession)
924
     */
925
    protected final SASLMechanism authenticate(String username, String password, EntityBareJid authzid,
926
                    SSLSession sslSession) throws XMPPErrorException, SASLErrorException, SmackSaslException,
927
                    NotConnectedException, NoResponseException, IOException, InterruptedException, SmackWrappedException {
928
        SASLMechanism saslMechanism = saslAuthentication.authenticate(username, password, authzid, sslSession);
×
929
        afterSaslAuthenticationSuccess();
×
930
        return saslMechanism;
×
931
    }
932

933
    /**
934
     * Hook for subclasses right after successful SASL authentication. RFC 6120 § 6.4.6. specifies a that the initiating
935
     * entity, needs to initiate a new stream in this case. But some transports, like BOSH, requires a special handling.
936
     * <p>
937
     * Note that we can not reset XMPPTCPConnection's parser here, because this method is invoked by the thread calling
938
     * {@link #login()}, but the parser reset has to be done within the reader thread.
939
     * </p>
940
     *
941
     * @throws NotConnectedException if the XMPP connection is not connected.
942
     * @throws InterruptedException if the calling thread was interrupted.
943
     * @throws SmackWrappedException in case of an exception.
944
     */
945
    protected void afterSaslAuthenticationSuccess()
946
                    throws NotConnectedException, InterruptedException, SmackWrappedException {
947
        sendStreamOpen();
×
948
    }
×
949

950
    protected final boolean isSaslAuthenticated() {
951
        return saslAuthentication.authenticationSuccessful();
×
952
    }
953

954
    /**
955
     * Closes the connection by setting presence to unavailable then closing the connection to
956
     * the XMPP server. The XMPPConnection can still be used for connecting to the server
957
     * again.
958
     *
959
     */
960
    public void disconnect() {
961
        Presence unavailablePresence = null;
1✔
962
        if (isAuthenticated()) {
1✔
963
            unavailablePresence = getStanzaFactory().buildPresenceStanza()
1✔
964
                            .ofType(Presence.Type.unavailable)
1✔
965
                            .build();
1✔
966
        }
967
        try {
968
            disconnect(unavailablePresence);
1✔
969
        }
970
        catch (NotConnectedException e) {
×
971
            LOGGER.log(Level.FINEST, "Connection is already disconnected", e);
×
972
        }
1✔
973
    }
1✔
974

975
    /**
976
     * Closes the connection. A custom unavailable presence is sent to the server, followed
977
     * by closing the stream. The XMPPConnection can still be used for connecting to the server
978
     * again. A custom unavailable presence is useful for communicating offline presence
979
     * information such as "On vacation". Typically, just the status text of the presence
980
     * stanza is set with online information, but most XMPP servers will deliver the full
981
     * presence stanza with whatever data is set.
982
     *
983
     * @param unavailablePresence the optional presence stanza to send during shutdown.
984
     * @throws NotConnectedException if the XMPP connection is not connected.
985
     */
986
    public synchronized void disconnect(Presence unavailablePresence) throws NotConnectedException {
987
        if (unavailablePresence != null) {
1✔
988
            try {
989
                sendStanza(unavailablePresence);
1✔
990
            } catch (InterruptedException e) {
×
991
                LOGGER.log(Level.FINE,
×
992
                        "Was interrupted while sending unavailable presence. Continuing to disconnect the connection",
993
                        e);
994
            }
1✔
995
        }
996
        shutdown();
1✔
997
        callConnectionClosedListener();
1✔
998
    }
1✔
999

1000
    private final Object notifyConnectionErrorMonitor = new Object();
1✔
1001

1002
    /**
1003
     * Sends out a notification that there was an error with the connection
1004
     * and closes the connection.
1005
     *
1006
     * @param exception the exception that causes the connection close event.
1007
     */
1008
    protected final void notifyConnectionError(final Exception exception) {
1009
        synchronized (notifyConnectionErrorMonitor) {
×
1010
            if (!isConnected()) {
×
1011
                LOGGER.log(Level.INFO, "Connection was already disconnected when attempting to handle " + exception,
×
1012
                                exception);
1013
                return;
×
1014
            }
1015

1016
            // Note that we first have to set the current connection exception and notify waiting threads, as one of them
1017
            // could hold the instance lock, which we also need later when calling instantShutdown().
1018
            setCurrentConnectionExceptionAndNotify(exception);
×
1019

1020
            // Closes the connection temporary. A if the connection supports stream management, then a reconnection is
1021
            // possible. Note that a connection listener of e.g. XMPPTCPConnection will drop the SM state in
1022
            // case the Exception is a StreamErrorException.
1023
            instantShutdown();
×
1024

1025
            for (StanzaCollector collector : collectors) {
×
1026
                collector.notifyConnectionError(exception);
×
1027
            }
×
1028

1029
            Async.go(() -> {
×
1030
                // Notify connection listeners of the error.
1031
                callConnectionClosedOnErrorListener(exception);
×
1032
            }, AbstractXMPPConnection.this + " callConnectionClosedOnErrorListener()");
×
1033
        }
×
1034
    }
×
1035

1036
    /**
1037
     * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza.
1038
     */
1039
    public abstract void instantShutdown();
1040

1041
    /**
1042
     * Shuts the current connection down.
1043
     */
1044
    protected abstract void shutdown();
1045

1046
    protected final boolean waitForClosingStreamTagFromServer() {
1047
        try {
1048
            waitForConditionOrThrowConnectionException(() -> closingStreamReceived, "closing stream tag from the server");
×
1049
        } catch (InterruptedException | SmackException | XMPPException e) {
×
1050
            LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e);
×
1051
            return false;
×
1052
        }
×
1053
        return true;
×
1054
    }
1055

1056
    @Override
1057
    public void addConnectionListener(ConnectionListener connectionListener) {
1058
        if (connectionListener == null) {
1✔
1059
            return;
×
1060
        }
1061
        connectionListeners.add(connectionListener);
1✔
1062
    }
1✔
1063

1064
    @Override
1065
    public void removeConnectionListener(ConnectionListener connectionListener) {
1066
        connectionListeners.remove(connectionListener);
×
1067
    }
×
1068

1069
    @Override
1070
    public <I extends IQ> I sendIqRequestAndWaitForResponse(IQ request)
1071
            throws NoResponseException, XMPPErrorException, NotConnectedException, InterruptedException {
1072
        StanzaCollector collector = createStanzaCollectorAndSend(request);
1✔
1073
        IQ resultResponse = collector.nextResultOrThrow();
1✔
1074
        @SuppressWarnings("unchecked")
1075
        I concreteResultResponse = (I) resultResponse;
1✔
1076
        return concreteResultResponse;
1✔
1077
    }
1078

1079
    @Override
1080
    public StanzaCollector createStanzaCollectorAndSend(IQ packet) throws NotConnectedException, InterruptedException {
1081
        StanzaFilter packetFilter = new IQReplyFilter(packet, this);
1✔
1082
        // Create the packet collector before sending the packet
1083
        StanzaCollector packetCollector = createStanzaCollectorAndSend(packetFilter, packet);
1✔
1084
        return packetCollector;
1✔
1085
    }
1086

1087
    @Override
1088
    public StanzaCollector createStanzaCollectorAndSend(StanzaFilter packetFilter, Stanza packet)
1089
                    throws NotConnectedException, InterruptedException {
1090
        StanzaCollector.Configuration configuration = StanzaCollector.newConfiguration()
1✔
1091
                        .setStanzaFilter(packetFilter)
1✔
1092
                        .setRequest(packet);
1✔
1093
        // Create the packet collector before sending the packet
1094
        StanzaCollector packetCollector = createStanzaCollector(configuration);
1✔
1095
        try {
1096
            // Now we can send the packet as the collector has been created
1097
            sendStanza(packet);
1✔
1098
        }
1099
        catch (InterruptedException | NotConnectedException | RuntimeException e) {
×
1100
            packetCollector.cancel();
×
1101
            throw e;
×
1102
        }
1✔
1103
        return packetCollector;
1✔
1104
    }
1105

1106
    @Override
1107
    public StanzaCollector createStanzaCollector(StanzaFilter packetFilter) {
1108
        StanzaCollector.Configuration configuration = StanzaCollector.newConfiguration().setStanzaFilter(packetFilter);
×
1109
        return createStanzaCollector(configuration);
×
1110
    }
1111

1112
    @Override
1113
    public StanzaCollector createStanzaCollector(StanzaCollector.Configuration configuration) {
1114
        StanzaCollector collector = new StanzaCollector(this, configuration);
1✔
1115
        // Add the collector to the list of active collectors.
1116
        collectors.add(collector);
1✔
1117
        return collector;
1✔
1118
    }
1119

1120
    @Override
1121
    public void removeStanzaCollector(StanzaCollector collector) {
1122
        collectors.remove(collector);
1✔
1123
    }
1✔
1124

1125
    @Override
1126
    public final void addStanzaListener(StanzaListener stanzaListener, StanzaFilter stanzaFilter) {
1127
        if (stanzaListener == null) {
×
1128
            throw new NullPointerException("Given stanza listener must not be null");
×
1129
        }
1130
        ListenerWrapper wrapper = new ListenerWrapper(stanzaListener, stanzaFilter);
×
1131
        synchronized (recvListeners) {
×
1132
            recvListeners.put(stanzaListener, wrapper);
×
1133
        }
×
1134
    }
×
1135

1136
    @Override
1137
    public final boolean removeStanzaListener(StanzaListener stanzaListener) {
1138
        synchronized (recvListeners) {
×
1139
            return recvListeners.remove(stanzaListener) != null;
×
1140
        }
1141
    }
1142

1143
    @Override
1144
    public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) {
1145
        if (packetListener == null) {
1✔
1146
            throw new NullPointerException("Packet listener is null.");
×
1147
        }
1148
        ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
1✔
1149
        synchronized (syncRecvListeners) {
1✔
1150
            syncRecvListeners.put(packetListener, wrapper);
1✔
1151
        }
1✔
1152
    }
1✔
1153

1154
    @Override
1155
    public boolean removeSyncStanzaListener(StanzaListener packetListener) {
1156
        synchronized (syncRecvListeners) {
×
1157
            return syncRecvListeners.remove(packetListener) != null;
×
1158
        }
1159
    }
1160

1161
    @Override
1162
    public void addAsyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) {
1163
        if (packetListener == null) {
1✔
1164
            throw new NullPointerException("Packet listener is null.");
×
1165
        }
1166
        ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
1✔
1167
        synchronized (asyncRecvListeners) {
1✔
1168
            asyncRecvListeners.put(packetListener, wrapper);
1✔
1169
        }
1✔
1170
    }
1✔
1171

1172
    @Override
1173
    public boolean removeAsyncStanzaListener(StanzaListener packetListener) {
1174
        synchronized (asyncRecvListeners) {
1✔
1175
            return asyncRecvListeners.remove(packetListener) != null;
1✔
1176
        }
1177
    }
1178

1179
    @Override
1180
    public void addStanzaSendingListener(StanzaListener packetListener, StanzaFilter packetFilter) {
1181
        if (packetListener == null) {
1✔
1182
            throw new NullPointerException("Packet listener is null.");
×
1183
        }
1184
        ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
1✔
1185
        synchronized (sendListeners) {
1✔
1186
            sendListeners.put(packetListener, wrapper);
1✔
1187
        }
1✔
1188
    }
1✔
1189

1190
    @Override
1191
    public void removeStanzaSendingListener(StanzaListener packetListener) {
1192
        synchronized (sendListeners) {
×
1193
            sendListeners.remove(packetListener);
×
1194
        }
×
1195
    }
×
1196

1197
    /**
1198
     * Process all stanza listeners for sending stanzas.
1199
     * <p>
1200
     * Compared to {@link #firePacketInterceptors(Stanza)}, the listeners will be invoked in a new thread.
1201
     * </p>
1202
     *
1203
     * @param sendTopLevelStreamElement the top level stream element which just got send.
1204
     */
1205
    // TODO: Rename to fireElementSendingListeners().
1206
    @SuppressWarnings("javadoc")
1207
    protected void firePacketSendingListeners(final TopLevelStreamElement sendTopLevelStreamElement) {
1208
        if (debugger != null) {
×
1209
            debugger.onOutgoingStreamElement(sendTopLevelStreamElement);
×
1210
        }
1211

1212
        if (!(sendTopLevelStreamElement instanceof Stanza)) {
×
1213
            return;
×
1214
        }
1215
        Stanza packet = (Stanza) sendTopLevelStreamElement;
×
1216

1217
        final List<StanzaListener> listenersToNotify = new LinkedList<>();
×
1218
        synchronized (sendListeners) {
×
1219
            for (ListenerWrapper listenerWrapper : sendListeners.values()) {
×
1220
                if (listenerWrapper.filterMatches(packet)) {
×
1221
                    listenersToNotify.add(listenerWrapper.getListener());
×
1222
                }
1223
            }
×
1224
        }
×
1225
        if (listenersToNotify.isEmpty()) {
×
1226
            return;
×
1227
        }
1228
        // Notify in a new thread, because we can
1229
        asyncGo(new Runnable() {
×
1230
            @Override
1231
            public void run() {
1232
                for (StanzaListener listener : listenersToNotify) {
×
1233
                    try {
1234
                        listener.processStanza(packet);
×
1235
                    }
1236
                    catch (Exception e) {
×
1237
                        LOGGER.log(Level.WARNING, "Sending listener threw exception", e);
×
1238
                        continue;
×
1239
                    }
×
1240
                }
×
1241
            }
×
1242
        });
1243
    }
×
1244

1245
    @Deprecated
1246
    @Override
1247
    public void addStanzaInterceptor(StanzaListener packetInterceptor,
1248
            StanzaFilter packetFilter) {
1249
        if (packetInterceptor == null) {
×
1250
            throw new NullPointerException("Packet interceptor is null.");
×
1251
        }
1252
        InterceptorWrapper interceptorWrapper = new InterceptorWrapper(packetInterceptor, packetFilter);
×
1253
        synchronized (interceptors) {
×
1254
            interceptors.put(packetInterceptor, interceptorWrapper);
×
1255
        }
×
1256
    }
×
1257

1258
    @Deprecated
1259
    @Override
1260
    public void removeStanzaInterceptor(StanzaListener packetInterceptor) {
1261
        synchronized (interceptors) {
×
1262
            interceptors.remove(packetInterceptor);
×
1263
        }
×
1264
    }
×
1265

1266
    private static <MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> void addInterceptor(
1267
                    Map<Consumer<MPB>, GenericInterceptorWrapper<MPB, MP>> interceptors, Consumer<MPB> interceptor,
1268
                    Predicate<MP> filter) {
1269
        Objects.requireNonNull(interceptor, "Interceptor must not be null");
1✔
1270

1271
        GenericInterceptorWrapper<MPB, MP> interceptorWrapper = new GenericInterceptorWrapper<>(interceptor, filter);
1✔
1272

1273
        synchronized (interceptors) {
1✔
1274
            interceptors.put(interceptor, interceptorWrapper);
1✔
1275
        }
1✔
1276
    }
1✔
1277

1278
    private static <MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> void removeInterceptor(
1279
                    Map<Consumer<MPB>, GenericInterceptorWrapper<MPB, MP>> interceptors, Consumer<MPB> interceptor) {
1280
        synchronized (interceptors) {
×
1281
            interceptors.remove(interceptor);
×
1282
        }
×
1283
    }
×
1284

1285
    @Override
1286
    public void addMessageInterceptor(Consumer<MessageBuilder> messageInterceptor, Predicate<Message> messageFilter) {
1287
        addInterceptor(messageInterceptors, messageInterceptor, messageFilter);
1✔
1288
    }
1✔
1289

1290
    @Override
1291
    public void removeMessageInterceptor(Consumer<MessageBuilder> messageInterceptor) {
1292
        removeInterceptor(messageInterceptors, messageInterceptor);
×
1293
    }
×
1294

1295
    @Override
1296
    public void addPresenceInterceptor(Consumer<PresenceBuilder> presenceInterceptor,
1297
                    Predicate<Presence> presenceFilter) {
1298
        addInterceptor(presenceInterceptors, presenceInterceptor, presenceFilter);
1✔
1299
    }
1✔
1300

1301
    @Override
1302
    public void removePresenceInterceptor(Consumer<PresenceBuilder> presenceInterceptor) {
1303
        removeInterceptor(presenceInterceptors, presenceInterceptor);
×
1304
    }
×
1305

1306
    private static <MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> MP fireMessageOrPresenceInterceptors(
1307
                    MP messageOrPresence, Map<Consumer<MPB>, GenericInterceptorWrapper<MPB, MP>> interceptors) {
1308
        List<Consumer<MPB>> interceptorsToInvoke = new LinkedList<>();
1✔
1309
        synchronized (interceptors) {
1✔
1310
            for (GenericInterceptorWrapper<MPB, MP> interceptorWrapper : interceptors.values()) {
1✔
1311
                if (interceptorWrapper.filterMatches(messageOrPresence)) {
1✔
1312
                    Consumer<MPB> interceptor = interceptorWrapper.getInterceptor();
×
1313
                    interceptorsToInvoke.add(interceptor);
×
1314
                }
1315
            }
1✔
1316
        }
1✔
1317

1318
        // Avoid transforming the stanza to a builder if there is no interceptor.
1319
        if (interceptorsToInvoke.isEmpty()) {
1✔
1320
            return messageOrPresence;
1✔
1321
        }
1322

1323
        MPB builder = messageOrPresence.asBuilder();
×
1324
        for (Consumer<MPB> interceptor : interceptorsToInvoke) {
×
1325
            interceptor.accept(builder);
×
1326
        }
×
1327

1328
        // Now that the interceptors have (probably) modified the stanza in its builder form, we need to re-assemble it.
1329
        messageOrPresence = builder.build();
×
1330
        return messageOrPresence;
×
1331
    }
1332

1333
    /**
1334
     * Process interceptors. Interceptors may modify the stanza that is about to be sent.
1335
     * Since the thread that requested to send the stanza will invoke all interceptors, it
1336
     * is important that interceptors perform their work as soon as possible so that the
1337
     * thread does not remain blocked for a long period.
1338
     *
1339
     * @param packet the stanza that is going to be sent to the server.
1340
     * @return the, potentially modified stanza, after the interceptors are run.
1341
     */
1342
    private Stanza firePacketInterceptors(Stanza packet) {
1343
        List<StanzaListener> interceptorsToInvoke = new LinkedList<>();
1✔
1344
        synchronized (interceptors) {
1✔
1345
            for (InterceptorWrapper interceptorWrapper : interceptors.values()) {
1✔
1346
                if (interceptorWrapper.filterMatches(packet)) {
×
1347
                    interceptorsToInvoke.add(interceptorWrapper.getInterceptor());
×
1348
                }
1349
            }
×
1350
        }
1✔
1351
        for (StanzaListener interceptor : interceptorsToInvoke) {
1✔
1352
            try {
1353
                interceptor.processStanza(packet);
×
1354
            } catch (Exception e) {
×
1355
                LOGGER.log(Level.SEVERE, "Packet interceptor threw exception", e);
×
1356
            }
×
1357
        }
×
1358

1359
        final Stanza stanzaAfterInterceptors;
1360
        if (packet instanceof Message) {
1✔
1361
            Message message = (Message) packet;
1✔
1362
            stanzaAfterInterceptors = fireMessageOrPresenceInterceptors(message, messageInterceptors);
1✔
1363
        }
1✔
1364
        else if (packet instanceof Presence) {
1✔
1365
            Presence presence = (Presence) packet;
1✔
1366
            stanzaAfterInterceptors = fireMessageOrPresenceInterceptors(presence, presenceInterceptors);
1✔
1367
        } else {
1✔
1368
            // We do not (yet) support interceptors for IQ stanzas.
1369
            assert packet instanceof IQ;
1✔
1370
            stanzaAfterInterceptors = packet;
1✔
1371
        }
1372

1373
        return stanzaAfterInterceptors;
1✔
1374
    }
1375

1376
    /**
1377
     * Initialize the {@link #debugger}. You can specify a customized {@link SmackDebugger}
1378
     * by setup the system property <code>smack.debuggerClass</code> to the implementation.
1379
     *
1380
     * @throws IllegalStateException if the reader or writer isn't yet initialized.
1381
     * @throws IllegalArgumentException if the SmackDebugger can't be loaded.
1382
     */
1383
    protected void initDebugger() {
1384
        if (reader == null || writer == null) {
×
1385
            throw new NullPointerException("Reader or writer isn't initialized.");
×
1386
        }
1387
        // If debugging is enabled, we open a window and write out all network traffic.
1388
        if (debugger != null) {
×
1389
            // Obtain new reader and writer from the existing debugger
1390
            reader = debugger.newConnectionReader(reader);
×
1391
            writer = debugger.newConnectionWriter(writer);
×
1392
        }
1393
    }
×
1394

1395
    @Override
1396
    public long getReplyTimeout() {
1397
        return replyTimeout;
1✔
1398
    }
1399

1400
    @Override
1401
    public void setReplyTimeout(long timeout) {
1402
        if (Long.MAX_VALUE - System.currentTimeMillis() < timeout) {
1✔
1403
            throw new IllegalArgumentException("Extremely long reply timeout");
×
1404
        }
1405
        else {
1406
            replyTimeout = timeout;
1✔
1407
        }
1408
    }
1✔
1409

1410
    private SmackConfiguration.UnknownIqRequestReplyMode unknownIqRequestReplyMode = SmackConfiguration.getUnknownIqRequestReplyMode();
1✔
1411

1412
    /**
1413
     * Set how Smack behaves when an unknown IQ request has been received.
1414
     *
1415
     * @param unknownIqRequestReplyMode reply mode.
1416
     */
1417
    public void setUnknownIqRequestReplyMode(UnknownIqRequestReplyMode unknownIqRequestReplyMode) {
1418
        this.unknownIqRequestReplyMode = Objects.requireNonNull(unknownIqRequestReplyMode, "Mode must not be null");
×
1419
    }
×
1420

1421
    protected final NonzaCallback.Builder buildNonzaCallback() {
1422
        return new NonzaCallback.Builder(this);
1✔
1423
    }
1424

1425
    protected <SN extends Nonza, FN extends Nonza> SN sendAndWaitForResponse(Nonza nonza, Class<SN> successNonzaClass,
1426
                    Class<FN> failedNonzaClass)
1427
                    throws NoResponseException, NotConnectedException, InterruptedException, FailedNonzaException {
1428
        NonzaCallback.Builder builder = buildNonzaCallback();
×
1429
        SN successNonza = NonzaCallback.sendAndWaitForResponse(builder, nonza, successNonzaClass, failedNonzaClass);
×
1430
        return successNonza;
×
1431
    }
1432

1433
    private void maybeNotifyDebuggerAboutIncoming(TopLevelStreamElement incomingTopLevelStreamElement) {
1434
        final SmackDebugger debugger = this.debugger;
×
1435
        if (debugger != null) {
×
1436
            debugger.onIncomingStreamElement(incomingTopLevelStreamElement);
×
1437
        }
1438
    }
×
1439

1440
    protected final void parseAndProcessNonza(XmlPullParser parser) throws IOException, XmlPullParserException, SmackParsingException {
1441
        ParserUtils.assertAtStartTag(parser);
×
1442

1443
        final int initialDepth = parser.getDepth();
×
1444
        final String element = parser.getName();
×
1445
        final String namespace = parser.getNamespace();
×
1446
        final QName key = new QName(namespace, element);
×
1447

1448
        NonzaProvider<? extends Nonza> nonzaProvider = ProviderManager.getNonzaProvider(key);
×
1449
        if (nonzaProvider == null) {
×
1450
            LOGGER.severe("Unknown nonza: " + key);
×
1451
            ParserUtils.forwardToEndTagOfDepth(parser, initialDepth);
×
1452
            return;
×
1453
        }
1454

1455
        List<NonzaCallback> nonzaCallbacks;
1456
        synchronized (nonzaCallbacksMap) {
×
1457
            nonzaCallbacks = nonzaCallbacksMap.getAll(key);
×
1458
            nonzaCallbacks = CollectionUtil.newListWith(nonzaCallbacks);
×
1459
        }
×
1460
        if (nonzaCallbacks == null) {
×
1461
            LOGGER.info("No nonza callback for " + key);
×
1462
            ParserUtils.forwardToEndTagOfDepth(parser, initialDepth);
×
1463
            return;
×
1464
        }
1465

1466
        Nonza nonza = nonzaProvider.parse(parser, incomingStreamXmlEnvironment);
×
1467

1468
        maybeNotifyDebuggerAboutIncoming(nonza);
×
1469

1470
        for (NonzaCallback nonzaCallback : nonzaCallbacks) {
×
1471
            nonzaCallback.onNonzaReceived(nonza);
×
1472
        }
×
1473
    }
×
1474

1475
    protected void parseAndProcessStanza(XmlPullParser parser)
1476
                    throws XmlPullParserException, IOException, InterruptedException {
1477
        ParserUtils.assertAtStartTag(parser);
×
1478
        int parserDepth = parser.getDepth();
×
1479
        Stanza stanza = null;
×
1480
        try {
1481
            stanza = PacketParserUtils.parseStanza(parser, incomingStreamXmlEnvironment);
×
1482
        }
1483
        catch (XmlPullParserException | SmackParsingException | IOException | IllegalArgumentException e) {
×
1484
            CharSequence content = PacketParserUtils.parseContentDepth(parser,
×
1485
                            parserDepth);
1486
            UnparseableStanza message = new UnparseableStanza(content, e);
×
1487
            ParsingExceptionCallback callback = getParsingExceptionCallback();
×
1488
            if (callback != null) {
×
1489
                callback.handleUnparsableStanza(message);
×
1490
            }
1491
        }
×
1492
        ParserUtils.assertAtEndTag(parser);
×
1493
        if (stanza != null) {
×
1494
            processStanza(stanza);
×
1495
        }
1496
    }
×
1497

1498
    /**
1499
     * Processes a stanza after it's been fully parsed by looping through the installed
1500
     * stanza collectors and listeners and letting them examine the stanza to see if
1501
     * they are a match with the filter.
1502
     *
1503
     * @param stanza the stanza to process.
1504
     * @throws InterruptedException if the calling thread was interrupted.
1505
     */
1506
    protected void processStanza(final Stanza stanza) throws InterruptedException {
1507
        assert stanza != null;
×
1508

1509
        maybeNotifyDebuggerAboutIncoming(stanza);
×
1510

1511
        lastStanzaReceived = System.currentTimeMillis();
×
1512
        // Deliver the incoming packet to listeners.
1513
        invokeStanzaCollectorsAndNotifyRecvListeners(stanza);
×
1514
    }
×
1515

1516
    /**
1517
     * Invoke {@link StanzaCollector#processStanza(Stanza)} for every
1518
     * StanzaCollector with the given packet. Also notify the receive listeners with a matching stanza filter about the packet.
1519
     * <p>
1520
     * This method will be invoked by the connections incoming processing thread which may be shared across multiple connections and
1521
     * thus it is important that no user code, e.g. in form of a callback, is invoked by this method. For the same reason,
1522
     * this method must not block for an extended period of time.
1523
     * </p>
1524
     *
1525
     * @param packet the stanza to notify the StanzaCollectors and receive listeners about.
1526
     */
1527
    protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) {
1528
        if (packet instanceof IQ) {
1✔
1529
            final IQ iq = (IQ) packet;
1✔
1530
            if (iq.isRequestIQ()) {
1✔
1531
                final IQ iqRequest = iq;
1✔
1532
                final QName key = iqRequest.getChildElementQName();
1✔
1533
                IQRequestHandler iqRequestHandler;
1534
                final IQ.Type type = iq.getType();
1✔
1535
                switch (type) {
1✔
1536
                case set:
1537
                    synchronized (setIqRequestHandler) {
1✔
1538
                        iqRequestHandler = setIqRequestHandler.get(key);
1✔
1539
                    }
1✔
1540
                    break;
1✔
1541
                case get:
1542
                    synchronized (getIqRequestHandler) {
1✔
1543
                        iqRequestHandler = getIqRequestHandler.get(key);
1✔
1544
                    }
1✔
1545
                    break;
1✔
1546
                default:
1547
                    throw new IllegalStateException("Should only encounter IQ type 'get' or 'set'");
×
1548
                }
1549
                if (iqRequestHandler == null) {
1✔
1550
                    final String iqNamespace = key.getNamespaceURI();
×
1551
                    StanzaError.Condition replyCondition;
1552
                    switch (unknownIqRequestReplyMode) {
×
1553
                    case doNotReply:
1554
                        return;
×
1555
                    case reply:
1556
                        boolean isKnownNamespace = iqRequestHandlerNamespaces.contains(iqNamespace);
×
1557
                        if (isKnownNamespace) {
×
1558
                            replyCondition = StanzaError.Condition.feature_not_implemented;
×
1559
                        } else {
1560
                            replyCondition = StanzaError.Condition.service_unavailable;
×
1561
                        }
1562
                        break;
×
1563
                    default:
1564
                        throw new AssertionError();
×
1565
                    }
1566

1567
                    // If the IQ stanza is of type "get" or "set" with no registered IQ request handler, then answer an
1568
                    // IQ of type 'error' with condition 'service-unavailable'.
1569
                    final ErrorIQ errorIQ = IQ.createErrorResponse(iq, StanzaError.getBuilder(
×
1570
                                    replyCondition).build());
×
1571
                    // Use async sendStanza() here, since if sendStanza() would block, then some connections, e.g.
1572
                    // XmppNioTcpConnection, would deadlock, as this operation is performed in the same thread that is
1573
                    asyncGo(() -> {
×
1574
                        try {
1575
                            sendStanza(errorIQ);
×
1576
                        }
1577
                        catch (InterruptedException | NotConnectedException e) {
×
1578
                            LOGGER.log(Level.WARNING, "Exception while sending error IQ to unkown IQ request", e);
×
1579
                        }
×
1580
                    });
×
1581
                } else {
×
1582
                    Executor executorService = null;
1✔
1583
                    switch (iqRequestHandler.getMode()) {
1✔
1584
                    case sync:
1585
                        executorService = ASYNC_BUT_ORDERED.asExecutorFor(this);
1✔
1586
                        break;
1✔
1587
                    case async:
1588
                        executorService = this::asyncGoLimited;
1✔
1589
                        break;
1590
                    }
1591
                    final IQRequestHandler finalIqRequestHandler = iqRequestHandler;
1✔
1592
                    executorService.execute(new Runnable() {
1✔
1593
                        @Override
1594
                        public void run() {
1595
                            IQ response = finalIqRequestHandler.handleIQRequest(iq);
1✔
1596
                            if (response == null) {
1✔
1597
                                // It is not ideal if the IQ request handler does not return an IQ response, because RFC
1598
                                // 6120 § 8.1.2 does specify that a response is mandatory. But some APIs, mostly the
1599
                                // file transfer one, does not always return a result, so we need to handle this case.
1600
                                // Also sometimes a request handler may decide that it's better to not send a response,
1601
                                // e.g. to avoid presence leaks.
1602
                                return;
×
1603
                            }
1604

1605
                            assert response.isResponseIQ();
1✔
1606

1607
                            response.setTo(iqRequest.getFrom());
1✔
1608
                            response.setStanzaId(iqRequest.getStanzaId());
1✔
1609
                            try {
1610
                                sendStanza(response);
1✔
1611
                            }
1612
                            catch (InterruptedException | NotConnectedException e) {
×
1613
                                LOGGER.log(Level.WARNING, "Exception while sending response to IQ request", e);
×
1614
                            }
1✔
1615
                        }
1✔
1616
                    });
1617
                }
1618
                // The following returns makes it impossible for packet listeners and collectors to
1619
                // filter for IQ request stanzas, i.e. IQs of type 'set' or 'get'. This is the
1620
                // desired behavior.
1621
                return;
1✔
1622
            }
1623
        }
1624

1625
        // First handle the async recv listeners. Note that this code is very similar to what follows a few lines below,
1626
        // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in
1627
        // their own thread.
1628
        final Collection<StanzaListener> listenersToNotify = new LinkedList<>();
1✔
1629
        extractMatchingListeners(packet, asyncRecvListeners, listenersToNotify);
1✔
1630
        for (final StanzaListener listener : listenersToNotify) {
1✔
1631
            asyncGoLimited(new Runnable() {
1✔
1632
                @Override
1633
                public void run() {
1634
                    try {
1635
                        listener.processStanza(packet);
1✔
1636
                    } catch (Exception e) {
×
1637
                        LOGGER.log(Level.SEVERE, "Exception in async packet listener", e);
×
1638
                    }
1✔
1639
                }
1✔
1640
            });
1641
        }
1✔
1642

1643
        // Loop through all collectors and notify the appropriate ones.
1644
        for (StanzaCollector collector : collectors) {
1✔
1645
            collector.processStanza(packet);
1✔
1646
        }
1✔
1647

1648
        listenersToNotify.clear();
1✔
1649
        extractMatchingListeners(packet, recvListeners, listenersToNotify);
1✔
1650
        final Semaphore listenerSemaphore = new Semaphore(1 - listenersToNotify.size());
1✔
1651
        for (StanzaListener stanzaListener : listenersToNotify) {
1✔
1652
            asyncGoLimited(() -> {
×
1653
                try {
1654
                    stanzaListener.processStanza(packet);
×
1655
                }
1656
                catch (NotConnectedException e) {
×
1657
                    LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
×
1658
                }
1659
                catch (Exception e) {
×
1660
                    LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
×
1661
                } finally {
1662
                    listenerSemaphore.release();
×
1663
                }
1664
            });
×
1665
        }
×
1666
        listenerSemaphore.acquireUninterruptibly();
1✔
1667

1668
        // Notify the receive listeners interested in the packet
1669
        listenersToNotify.clear();
1✔
1670
        extractMatchingListeners(packet, syncRecvListeners, listenersToNotify);
1✔
1671
        // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single
1672
        // threaded executor service and therefore keeps the order.
1673
        ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
1✔
1674
            @Override
1675
            public void run() {
1676
                // As listeners are able to remove themselves and because the timepoint where it is decided to invoke a
1677
                // listener is a different timepoint where the listener is actually invoked (here), we have to check
1678
                // again if the listener is still active.
1679
                Iterator<StanzaListener> it = listenersToNotify.iterator();
1✔
1680
                synchronized (syncRecvListeners) {
1✔
1681
                    while (it.hasNext()) {
1✔
1682
                        StanzaListener stanzaListener = it.next();
1✔
1683
                        if (!syncRecvListeners.containsKey(stanzaListener)) {
1✔
1684
                            // The listener was removed from syncRecvListener, also remove him from listenersToNotify.
1685
                            it.remove();
×
1686
                        }
1687
                    }
1✔
1688
                }
1✔
1689
                for (StanzaListener listener : listenersToNotify) {
1✔
1690
                    try {
1691
                        listener.processStanza(packet);
1✔
1692
                    } catch (NotConnectedException e) {
×
1693
                        LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e);
×
1694
                        break;
×
1695
                    } catch (Exception e) {
×
1696
                        LOGGER.log(Level.SEVERE, "Exception in packet listener", e);
×
1697
                    }
1✔
1698
                }
1✔
1699
            }
1✔
1700
        });
1701
    }
1✔
1702

1703
    private static void extractMatchingListeners(Stanza stanza, Map<StanzaListener, ListenerWrapper> listeners,
1704
                    Collection<StanzaListener> listenersToNotify) {
1705
        synchronized (listeners) {
1✔
1706
            for (ListenerWrapper listenerWrapper : listeners.values()) {
1✔
1707
                if (listenerWrapper.filterMatches(stanza)) {
1✔
1708
                    listenersToNotify.add(listenerWrapper.getListener());
1✔
1709
                }
1710
            }
1✔
1711
        }
1✔
1712
    }
1✔
1713

1714
    /**
1715
     * Sets whether the connection has already logged in the server. This method assures that the
1716
     * {@link #wasAuthenticated} flag is never reset once it has ever been set.
1717
     *
1718
     */
1719
    protected void setWasAuthenticated() {
1720
        // Never reset the flag if the connection has ever been authenticated
1721
        if (!wasAuthenticated) {
×
1722
            wasAuthenticated = authenticated;
×
1723
        }
1724
    }
×
1725

1726
    protected void callConnectionConnectingListener() {
1727
        for (ConnectionListener listener : connectionListeners) {
1✔
1728
            listener.connecting(this);
1✔
1729
        }
1✔
1730
    }
1✔
1731

1732
    protected void callConnectionConnectedListener() {
1733
        for (ConnectionListener listener : connectionListeners) {
1✔
1734
            listener.connected(this);
1✔
1735
        }
1✔
1736
    }
1✔
1737

1738
    protected void callConnectionAuthenticatedListener(boolean resumed) {
1739
        for (ConnectionListener listener : connectionListeners) {
×
1740
            try {
1741
                listener.authenticated(this, resumed);
×
1742
            } catch (Exception e) {
×
1743
                // Catch and print any exception so we can recover
1744
                // from a faulty listener and finish the shutdown process
1745
                LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e);
×
1746
            }
×
1747
        }
×
1748
    }
×
1749

1750
    void callConnectionClosedListener() {
1751
        for (ConnectionListener listener : connectionListeners) {
1✔
1752
            try {
1753
                listener.connectionClosed();
1✔
1754
            }
1755
            catch (Exception e) {
×
1756
                // Catch and print any exception so we can recover
1757
                // from a faulty listener and finish the shutdown process
1758
                LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e);
×
1759
            }
1✔
1760
        }
1✔
1761
    }
1✔
1762

1763
    private void callConnectionClosedOnErrorListener(Exception e) {
1764
        boolean logWarning = true;
×
1765
        if (e instanceof StreamErrorException) {
×
1766
            StreamErrorException see = (StreamErrorException) e;
×
1767
            if (see.getStreamError().getCondition() == StreamError.Condition.not_authorized
×
1768
                            && wasAuthenticated) {
1769
                logWarning = false;
×
1770
                LOGGER.log(Level.FINE,
×
1771
                                "Connection closed with not-authorized stream error after it was already authenticated. The account was likely deleted/unregistered on the server");
1772
            }
1773
        }
1774
        if (logWarning) {
×
1775
            LOGGER.log(Level.WARNING, "Connection " + this + " closed with error", e);
×
1776
        }
1777
        for (ConnectionListener listener : connectionListeners) {
×
1778
            try {
1779
                listener.connectionClosedOnError(e);
×
1780
            }
1781
            catch (Exception e2) {
×
1782
                // Catch and print any exception so we can recover
1783
                // from a faulty listener
1784
                LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e2);
×
1785
            }
×
1786
        }
×
1787
    }
×
1788

1789
    /**
1790
     * A wrapper class to associate a stanza filter with a listener.
1791
     */
1792
    protected static class ListenerWrapper {
1793

1794
        private final StanzaListener packetListener;
1795
        private final StanzaFilter packetFilter;
1796

1797
        /**
1798
         * Create a class which associates a stanza filter with a listener.
1799
         *
1800
         * @param packetListener the stanza listener.
1801
         * @param packetFilter the associated filter or null if it listen for all packets.
1802
         */
1803
        public ListenerWrapper(StanzaListener packetListener, StanzaFilter packetFilter) {
1✔
1804
            this.packetListener = packetListener;
1✔
1805
            this.packetFilter = packetFilter;
1✔
1806
        }
1✔
1807

1808
        public boolean filterMatches(Stanza packet) {
1809
            return packetFilter == null || packetFilter.accept(packet);
1✔
1810
        }
1811

1812
        public StanzaListener getListener() {
1813
            return packetListener;
1✔
1814
        }
1815
    }
1816

1817
    /**
1818
     * A wrapper class to associate a stanza filter with an interceptor.
1819
     */
1820
    @Deprecated
1821
    // TODO: Remove once addStanzaInterceptor is gone.
1822
    protected static class InterceptorWrapper {
1823

1824
        private final StanzaListener packetInterceptor;
1825
        private final StanzaFilter packetFilter;
1826

1827
        /**
1828
         * Create a class which associates a stanza filter with an interceptor.
1829
         *
1830
         * @param packetInterceptor the interceptor.
1831
         * @param packetFilter the associated filter or null if it intercepts all packets.
1832
         */
1833
        public InterceptorWrapper(StanzaListener packetInterceptor, StanzaFilter packetFilter) {
×
1834
            this.packetInterceptor = packetInterceptor;
×
1835
            this.packetFilter = packetFilter;
×
1836
        }
×
1837

1838
        public boolean filterMatches(Stanza packet) {
1839
            return packetFilter == null || packetFilter.accept(packet);
×
1840
        }
1841

1842
        public StanzaListener getInterceptor() {
1843
            return packetInterceptor;
×
1844
        }
1845
    }
1846

1847
    private static final class GenericInterceptorWrapper<MPB extends MessageOrPresenceBuilder<MP, MPB>, MP extends MessageOrPresence<MPB>> {
1848
        private final Consumer<MPB> stanzaInterceptor;
1849
        private final Predicate<MP> stanzaFilter;
1850

1851
        private GenericInterceptorWrapper(Consumer<MPB> stanzaInterceptor, Predicate<MP> stanzaFilter) {
1✔
1852
            this.stanzaInterceptor = stanzaInterceptor;
1✔
1853
            this.stanzaFilter = stanzaFilter;
1✔
1854
        }
1✔
1855

1856
        private boolean filterMatches(MP stanza) {
1857
            return stanzaFilter == null || stanzaFilter.test(stanza);
1✔
1858
        }
1859

1860
        public Consumer<MPB> getInterceptor() {
1861
            return stanzaInterceptor;
×
1862
        }
1863
    }
1864

1865
    @Override
1866
    public int getConnectionCounter() {
1867
        return connectionCounterValue;
1✔
1868
    }
1869

1870
    @Override
1871
    public void setFromMode(FromMode fromMode) {
1872
        this.fromMode = fromMode;
×
1873
    }
×
1874

1875
    @Override
1876
    public FromMode getFromMode() {
1877
        return this.fromMode;
×
1878
    }
1879

1880
    protected final void parseFeatures(XmlPullParser parser) throws XmlPullParserException, IOException, SmackParsingException {
1881
        streamFeatures.clear();
×
1882
        final int initialDepth = parser.getDepth();
×
1883
        while (true) {
1884
            XmlPullParser.Event eventType = parser.next();
×
1885

1886
            if (eventType == XmlPullParser.Event.START_ELEMENT && parser.getDepth() == initialDepth + 1) {
×
1887
                XmlElement streamFeature = null;
×
1888
                String name = parser.getName();
×
1889
                String namespace = parser.getNamespace();
×
1890
                switch (name) {
×
1891
                case StartTls.ELEMENT:
1892
                    streamFeature = PacketParserUtils.parseStartTlsFeature(parser);
×
1893
                    break;
×
1894
                case Mechanisms.ELEMENT:
1895
                    streamFeature = new Mechanisms(PacketParserUtils.parseMechanisms(parser));
×
1896
                    break;
×
1897
                case Bind.ELEMENT:
1898
                    streamFeature = Bind.Feature.INSTANCE;
×
1899
                    break;
×
1900
                case Session.ELEMENT:
1901
                    streamFeature = PacketParserUtils.parseSessionFeature(parser);
×
1902
                    break;
×
1903
                case Compress.Feature.ELEMENT:
1904
                    streamFeature = PacketParserUtils.parseCompressionFeature(parser);
×
1905
                    break;
×
1906
                default:
1907
                    ExtensionElementProvider<ExtensionElement> provider = ProviderManager.getStreamFeatureProvider(name, namespace);
×
1908
                    if (provider != null) {
×
1909
                        streamFeature = provider.parse(parser, incomingStreamXmlEnvironment);
×
1910
                    }
1911
                    break;
1912
                }
1913
                if (streamFeature != null) {
×
1914
                    addStreamFeature(streamFeature);
×
1915
                }
1916
            }
×
1917
            else if (eventType == XmlPullParser.Event.END_ELEMENT && parser.getDepth() == initialDepth) {
×
1918
                break;
×
1919
            }
1920
        }
×
1921
    }
×
1922

1923
    protected final void parseFeaturesAndNotify(XmlPullParser parser) throws Exception {
1924
        parseFeatures(parser);
×
1925

1926
        if (hasFeature(Mechanisms.ELEMENT, Mechanisms.NAMESPACE)) {
×
1927
            // Only proceed with SASL auth if TLS is disabled or if the server doesn't announce it
1928
            if (!hasFeature(StartTls.ELEMENT, StartTls.NAMESPACE)
×
1929
                            || config.getSecurityMode() == SecurityMode.disabled) {
×
1930
                tlsHandled = saslFeatureReceived = true;
×
1931
                notifyWaitingThreads();
×
1932
            }
1933
        }
1934

1935
        // If the server reported the bind feature then we are that that we did SASL and maybe
1936
        // STARTTLS. We can then report that the last 'stream:features' have been parsed
1937
        if (hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) {
×
1938
            if (!hasFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE)
×
1939
                            || !config.isCompressionEnabled()) {
×
1940
                // This where the last stream features from the server, either it did not contain
1941
                // compression or we disabled it.
1942
                lastFeaturesReceived = true;
×
1943
                notifyWaitingThreads();
×
1944
            }
1945
        }
1946
        afterFeaturesReceived();
×
1947
    }
×
1948

1949
    @SuppressWarnings("unused")
1950
    protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException, InterruptedException {
1951
        // Default implementation does nothing
1952
    }
×
1953

1954
    @SuppressWarnings("unchecked")
1955
    @Override
1956
    public <F extends XmlElement> F getFeature(QName qname) {
1957
        return (F) streamFeatures.get(qname);
1✔
1958
    }
1959

1960
    @Override
1961
    public boolean hasFeature(QName qname) {
1962
        return streamFeatures.containsKey(qname);
1✔
1963
    }
1964

1965
    protected void addStreamFeature(XmlElement feature) {
1966
        QName key = feature.getQName();
1✔
1967
        streamFeatures.put(key, feature);
1✔
1968
    }
1✔
1969

1970
    @Override
1971
    public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request) {
1972
        return sendIqRequestAsync(request, getReplyTimeout());
1✔
1973
    }
1974

1975
    @Override
1976
    public SmackFuture<IQ, Exception> sendIqRequestAsync(IQ request, long timeout) {
1977
        StanzaFilter replyFilter = new IQReplyFilter(request, this);
1✔
1978
        return sendAsync(request, replyFilter, timeout);
1✔
1979
    }
1980

1981
    @Override
1982
    public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter) {
1983
        return sendAsync(stanza, replyFilter, getReplyTimeout());
×
1984
    }
1985

1986
    @SuppressWarnings("FutureReturnValueIgnored")
1987
    @Override
1988
    public <S extends Stanza> SmackFuture<S, Exception> sendAsync(S stanza, final StanzaFilter replyFilter, long timeout) {
1989
        Objects.requireNonNull(stanza, "stanza must not be null");
1✔
1990
        // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we
1991
        // disallow it here in the async API as it makes no sense
1992
        Objects.requireNonNull(replyFilter, "replyFilter must not be null");
1✔
1993

1994
        final InternalSmackFuture<S, Exception> future = new InternalSmackFuture<>();
1✔
1995

1996
        final StanzaListener stanzaListener = new StanzaListener() {
1✔
1997
            @Override
1998
            public void processStanza(Stanza stanza) throws NotConnectedException, InterruptedException {
1999
                boolean removed = removeAsyncStanzaListener(this);
1✔
2000
                if (!removed) {
1✔
2001
                    // We lost a race against the "no response" handling runnable. Avoid calling the callback, as the
2002
                    // exception callback will be invoked (if any).
2003
                    return;
×
2004
                }
2005
                try {
2006
                    XMPPErrorException.ifHasErrorThenThrow(stanza);
1✔
2007
                    @SuppressWarnings("unchecked")
2008
                    S s = (S) stanza;
1✔
2009
                    future.setResult(s);
1✔
2010
                }
2011
                catch (XMPPErrorException exception) {
×
2012
                    future.setException(exception);
×
2013
                }
1✔
2014
            }
1✔
2015
        };
2016
        schedule(new Runnable() {
1✔
2017
            @Override
2018
            public void run() {
2019
                boolean removed = removeAsyncStanzaListener(stanzaListener);
×
2020
                if (!removed) {
×
2021
                    // We lost a race against the stanza listener, he already removed itself because he received a
2022
                    // reply. There is nothing more to do here.
2023
                    return;
×
2024
                }
2025

2026
                // If the packetListener got removed, then it was never run and
2027
                // we never received a response, inform the exception callback
2028
                Exception exception;
2029
                if (!isConnected()) {
×
2030
                    // If the connection is no longer connected, throw a not connected exception.
2031
                    exception = new NotConnectedException(AbstractXMPPConnection.this, replyFilter);
×
2032
                }
2033
                else {
2034
                    exception = NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter);
×
2035
                }
2036
                future.setException(exception);
×
2037
            }
×
2038
        }, timeout, TimeUnit.MILLISECONDS);
2039

2040
        addAsyncStanzaListener(stanzaListener, replyFilter);
1✔
2041
        try {
2042
            sendStanzaNonBlocking(stanza);
1✔
2043
        }
2044
        catch (NotConnectedException | OutgoingQueueFullException exception) {
×
2045
            future.setException(exception);
×
2046
        }
1✔
2047

2048
        return future;
1✔
2049
    }
2050

2051
    @SuppressWarnings("FutureReturnValueIgnored")
2052
    @Override
2053
    public void addOneTimeSyncCallback(final StanzaListener callback, final StanzaFilter packetFilter) {
2054
        final StanzaListener packetListener = new StanzaListener() {
×
2055
            @Override
2056
            public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException, NotLoggedInException {
2057
                try {
2058
                    callback.processStanza(packet);
×
2059
                } finally {
2060
                    removeSyncStanzaListener(this);
×
2061
                }
2062
            }
×
2063
        };
2064
        addSyncStanzaListener(packetListener, packetFilter);
×
2065
        schedule(new Runnable() {
×
2066
            @Override
2067
            public void run() {
2068
                removeSyncStanzaListener(packetListener);
×
2069
            }
×
2070
        }, getReplyTimeout(), TimeUnit.MILLISECONDS);
×
2071
    }
×
2072

2073
    @Override
2074
    public IQRequestHandler registerIQRequestHandler(final IQRequestHandler iqRequestHandler) {
2075
        final QName key = iqRequestHandler.getQName();
1✔
2076
        IQRequestHandler previous;
2077
        switch (iqRequestHandler.getType()) {
1✔
2078
        case set:
2079
            synchronized (setIqRequestHandler) {
1✔
2080
                previous = setIqRequestHandler.put(key, iqRequestHandler);
1✔
2081
            }
1✔
2082
            break;
1✔
2083
        case get:
2084
            synchronized (getIqRequestHandler) {
1✔
2085
                previous = getIqRequestHandler.put(key, iqRequestHandler);
1✔
2086
            }
1✔
2087
            break;
1✔
2088
        default:
2089
            throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed");
×
2090
        }
2091

2092
        final String iqNamespace = key.getNamespaceURI();
1✔
2093
        synchronized (iqRequestHandlerNamespacesReferenceCounters) {
1✔
2094
            Integer newValue;
2095
            Integer counter = iqRequestHandlerNamespacesReferenceCounters.get(iqNamespace);
1✔
2096
            if (counter == null) {
1✔
2097
                iqRequestHandlerNamespaces.add(iqNamespace);
1✔
2098
                newValue = 0;
1✔
2099
            } else {
2100
                newValue = counter.intValue() + 1;
1✔
2101
            }
2102
            iqRequestHandlerNamespacesReferenceCounters.put(iqNamespace, newValue);
1✔
2103
        }
1✔
2104
        return previous;
1✔
2105
    }
2106

2107
    @Override
2108
    public final IQRequestHandler unregisterIQRequestHandler(IQRequestHandler iqRequestHandler) {
2109
        return unregisterIQRequestHandler(iqRequestHandler.getElement(), iqRequestHandler.getNamespace(),
×
2110
                        iqRequestHandler.getType());
×
2111
    }
2112

2113
    @Override
2114
    public IQRequestHandler unregisterIQRequestHandler(String element, String namespace, IQ.Type type) {
2115
        IQRequestHandler unregisteredHandler;
2116
        final QName key = new QName(namespace, element);
×
2117
        switch (type) {
×
2118
        case set:
2119
            synchronized (setIqRequestHandler) {
×
2120
                unregisteredHandler = setIqRequestHandler.remove(key);
×
2121
            }
×
2122
            break;
×
2123
        case get:
2124
            synchronized (getIqRequestHandler) {
×
2125
                unregisteredHandler = getIqRequestHandler.remove(key);
×
2126
            }
×
2127
            break;
×
2128
        default:
2129
            throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed");
×
2130
        }
2131

2132
        if (unregisteredHandler == null) {
×
2133
            return null;
×
2134
        }
2135

2136
        synchronized (iqRequestHandlerNamespacesReferenceCounters) {
×
2137
            int newValue = iqRequestHandlerNamespacesReferenceCounters.get(namespace).intValue() - 1;
×
2138
            if (newValue == 0) {
×
2139
                iqRequestHandlerNamespacesReferenceCounters.remove(namespace);
×
2140
                iqRequestHandlerNamespaces.remove(namespace);
×
2141
            } else {
2142
                iqRequestHandlerNamespacesReferenceCounters.put(namespace, newValue);
×
2143
            }
2144
        }
×
2145

2146
        return unregisteredHandler;
×
2147
    }
2148

2149
    private long lastStanzaReceived;
2150

2151
    @Override
2152
    public long getLastStanzaReceived() {
2153
        return lastStanzaReceived;
×
2154
    }
2155

2156
    /**
2157
     * Get the timestamp when the connection was the first time authenticated, i.e., when the first successful login was
2158
     * performed. Note that this value is not reset on disconnect, so it represents the timestamp from the last
2159
     * authenticated connection. The value is also not reset on stream resumption.
2160
     *
2161
     * @return the timestamp or {@code null}.
2162
     * @since 4.3.3
2163
     */
2164
    public final long getAuthenticatedConnectionInitiallyEstablishedTimestamp() {
2165
        return authenticatedConnectionInitiallyEstablishedTimestamp;
×
2166
    }
2167

2168
    /**
2169
     * Install a parsing exception callback, which will be invoked once an exception is encountered while parsing a
2170
     * stanza.
2171
     *
2172
     * @param callback the callback to install
2173
     */
2174
    public void setParsingExceptionCallback(ParsingExceptionCallback callback) {
2175
        parsingExceptionCallback = callback;
×
2176
    }
×
2177

2178
    /**
2179
     * Get the current active parsing exception callback.
2180
     *
2181
     * @return the active exception callback or null if there is none
2182
     */
2183
    public ParsingExceptionCallback getParsingExceptionCallback() {
2184
        return parsingExceptionCallback;
×
2185
    }
2186

2187
    @Override
2188
    public final String toString() {
2189
        EntityFullJid localEndpoint = getUser();
1✔
2190
        String localEndpointString = localEndpoint == null ?  "not-authenticated" : localEndpoint.toString();
1✔
2191
        return getClass().getSimpleName() + '[' + localEndpointString + "] (" + getConnectionCounter() + ')';
1✔
2192
    }
2193

2194
    /**
2195
     * A queue of deferred runnables that where not executed immediately because {@link #currentAsyncRunnables} reached
2196
     * {@link #maxAsyncRunnables}. Note that we use a {@code LinkedList} in order to avoid space blowups in case the
2197
     * list ever becomes very big and shrinks again.
2198
     */
2199
    private final Queue<Runnable> deferredAsyncRunnables = new LinkedList<>();
1✔
2200

2201
    private int deferredAsyncRunnablesCount;
2202

2203
    private int deferredAsyncRunnablesCountPrevious;
2204

2205
    private int maxAsyncRunnables = SmackConfiguration.getDefaultConcurrencyLevelLimit();
1✔
2206

2207
    private int currentAsyncRunnables;
2208

2209
    protected void asyncGoLimited(final Runnable runnable) {
2210
        Runnable wrappedRunnable = new Runnable() {
1✔
2211
            @Override
2212
            public void run() {
2213
                runnable.run();
1✔
2214

2215
                synchronized (deferredAsyncRunnables) {
1✔
2216
                    Runnable defferredRunnable = deferredAsyncRunnables.poll();
1✔
2217
                    if (defferredRunnable == null) {
1✔
2218
                        currentAsyncRunnables--;
1✔
2219
                    } else {
2220
                        deferredAsyncRunnablesCount--;
×
2221
                        asyncGo(defferredRunnable);
×
2222
                    }
2223
                }
1✔
2224
            }
1✔
2225
        };
2226

2227
        synchronized (deferredAsyncRunnables) {
1✔
2228
            if (currentAsyncRunnables < maxAsyncRunnables) {
1✔
2229
                currentAsyncRunnables++;
1✔
2230
                asyncGo(wrappedRunnable);
1✔
2231
            } else {
2232
                deferredAsyncRunnablesCount++;
×
2233
                deferredAsyncRunnables.add(wrappedRunnable);
×
2234
            }
2235

2236
            final int HIGH_WATERMARK = 100;
1✔
2237
            final int INFORM_WATERMARK = 20;
1✔
2238

2239
            final int deferredAsyncRunnablesCount = this.deferredAsyncRunnablesCount;
1✔
2240

2241
            if (deferredAsyncRunnablesCount >= HIGH_WATERMARK
1✔
2242
                    && deferredAsyncRunnablesCountPrevious < HIGH_WATERMARK) {
2243
                LOGGER.log(Level.WARNING, "High watermark of " + HIGH_WATERMARK + " simultaneous executing runnables reached");
×
2244
            } else if (deferredAsyncRunnablesCount >= INFORM_WATERMARK
1✔
2245
                    && deferredAsyncRunnablesCountPrevious < INFORM_WATERMARK) {
2246
                LOGGER.log(Level.INFO, INFORM_WATERMARK + " simultaneous executing runnables reached");
×
2247
            }
2248

2249
            deferredAsyncRunnablesCountPrevious = deferredAsyncRunnablesCount;
1✔
2250
        }
1✔
2251
    }
1✔
2252

2253
    public void setMaxAsyncOperations(int maxAsyncOperations) {
2254
        if (maxAsyncOperations < 1) {
×
2255
            throw new IllegalArgumentException("Max async operations must be greater than 0");
×
2256
        }
2257

2258
        synchronized (deferredAsyncRunnables) {
×
2259
            maxAsyncRunnables = maxAsyncOperations;
×
2260
        }
×
2261
    }
×
2262

2263
    protected static void asyncGo(Runnable runnable) {
2264
        CACHED_EXECUTOR_SERVICE.execute(runnable);
1✔
2265
    }
1✔
2266

2267
    @SuppressWarnings("static-method")
2268
    protected final SmackReactor getReactor() {
2269
        return SMACK_REACTOR;
1✔
2270
    }
2271

2272
    protected static ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit) {
2273
        return SMACK_REACTOR.schedule(runnable, delay, unit, ScheduledAction.Kind.NonBlocking);
1✔
2274
    }
2275

2276
    /**
2277
     * Must be called when a XMPP stream open tag is encountered. Sets values like the stream ID and the incoming stream
2278
     * XML environment.
2279
     * <p>
2280
     * This method also returns a matching stream close tag. For example if the stream open is {@code <stream …>}, then
2281
     * {@code </stream>} is returned. But if it is {@code <stream:stream>}, then {@code </stream:stream>} is returned.
2282
     * Or if it is {@code <foo:stream>}, then {@code </foo:stream>} is returned.
2283
     * </p>
2284
     *
2285
     * @param parser an XML parser that is positioned at the start of the stream open.
2286
     * @return a String representing the corresponding stream end tag.
2287
     */
2288
    protected String onStreamOpen(XmlPullParser parser) {
2289
        assert StreamOpen.ETHERX_JABBER_STREAMS_NAMESPACE.equals(parser.getNamespace()) : parser.getNamespace()
×
2290
                        + " is not " + StreamOpen.ETHERX_JABBER_STREAMS_NAMESPACE;
2291
        assert StreamOpen.UNPREFIXED_ELEMENT.equals(parser.getName());
×
2292

2293
        streamId = parser.getAttributeValue("id");
×
2294
        incomingStreamXmlEnvironment = XmlEnvironment.from(parser);
×
2295

2296
        String reportedServerDomainString = parser.getAttributeValue("from");
×
2297
        // RFC 6120 § 4.7.1. makes no explicit statement whether or not 'from' in the stream open from the server
2298
        // in c2s connections is required or not.
2299
        if (reportedServerDomainString != null) {
×
2300
            DomainBareJid reportedServerDomain;
2301
            try {
2302
                reportedServerDomain = JidCreate.domainBareFrom(reportedServerDomainString);
×
2303
                DomainBareJid configuredXmppServiceDomain = config.getXMPPServiceDomain();
×
2304
                if (!configuredXmppServiceDomain.equals(reportedServerDomain)) {
×
2305
                    LOGGER.warning("Domain reported by server '" + reportedServerDomain
×
2306
                            + "' does not match configured domain '" + configuredXmppServiceDomain + "'");
2307
                }
2308
            } catch (XmppStringprepException e) {
×
2309
                LOGGER.log(Level.WARNING, "XMPP service domain '" + reportedServerDomainString
×
2310
                        + "' as reported by server could not be transformed to a valid JID", e);
2311
            }
×
2312
        }
2313

2314
        String prefix = parser.getPrefix();
×
2315
        if (StringUtils.isNotEmpty(prefix)) {
×
2316
            return "</" + prefix + ":stream>";
×
2317
        }
2318
        return "</stream>";
×
2319
    }
2320

2321
    protected final void sendStreamOpen() throws NotConnectedException, InterruptedException {
2322
        // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
2323
        // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
2324
        // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
2325
        // response from the server (see e.g. RFC 6120 § 9.1.1 Step 2.)
2326
        DomainBareJid to = getXMPPServiceDomain();
×
2327
        CharSequence from = null;
×
2328
        CharSequence localpart = config.getUsername();
×
2329
        if (localpart != null) {
×
2330
            from = XmppStringUtils.completeJidFrom(localpart, to);
×
2331
        }
2332
        String id = getStreamId();
×
2333
        String lang = config.getXmlLang();
×
2334

2335
        AbstractStreamOpen streamOpen = getStreamOpen(to, from, id, lang);
×
2336
        sendNonza(streamOpen);
×
2337
        updateOutgoingStreamXmlEnvironmentOnStreamOpen(streamOpen);
×
2338
    }
×
2339

2340
    protected AbstractStreamOpen getStreamOpen(DomainBareJid to, CharSequence from, String id, String lang) {
2341
        return new StreamOpen(to, from, id, lang);
×
2342
    }
2343

2344
    protected void updateOutgoingStreamXmlEnvironmentOnStreamOpen(AbstractStreamOpen streamOpen) {
2345
        XmlEnvironment.Builder xmlEnvironmentBuilder = XmlEnvironment.builder();
×
2346
        xmlEnvironmentBuilder.with(streamOpen);
×
2347
        outgoingStreamXmlEnvironment = xmlEnvironmentBuilder.build();
×
2348
    }
×
2349

2350
    protected final SmackTlsContext getSmackTlsContext() {
2351
        return config.smackTlsContext;
×
2352
    }
2353
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc