• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

igniterealtime / Smack / #2868

pending completion
#2868

push

github-actions

web-flow
Merge pull request #537 from MF1-MS/mf1-ms/use_xmpp_connection_as_local_socks5_address

Use XMPP connection as local socks5 address

10 of 10 new or added lines in 3 files covered. (100.0%)

16378 of 41845 relevant lines covered (39.14%)

0.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.58
/smack-tcp/src/main/java/org/jivesoftware/smack/tcp/XMPPTCPConnection.java
1
/**
2
 *
3
 * Copyright 2003-2007 Jive Software.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
package org.jivesoftware.smack.tcp;
18

19
import java.io.BufferedReader;
20
import java.io.IOException;
21
import java.io.InputStream;
22
import java.io.InputStreamReader;
23
import java.io.OutputStream;
24
import java.io.OutputStreamWriter;
25
import java.io.Writer;
26
import java.net.InetAddress;
27
import java.net.InetSocketAddress;
28
import java.net.Socket;
29
import java.security.cert.CertificateException;
30
import java.util.ArrayList;
31
import java.util.Collection;
32
import java.util.Iterator;
33
import java.util.LinkedHashSet;
34
import java.util.LinkedList;
35
import java.util.List;
36
import java.util.Map;
37
import java.util.Set;
38
import java.util.concurrent.ArrayBlockingQueue;
39
import java.util.concurrent.BlockingQueue;
40
import java.util.concurrent.ConcurrentHashMap;
41
import java.util.concurrent.ConcurrentLinkedQueue;
42
import java.util.concurrent.TimeUnit;
43
import java.util.concurrent.atomic.AtomicBoolean;
44
import java.util.logging.Level;
45
import java.util.logging.Logger;
46

47
import javax.net.SocketFactory;
48
import javax.net.ssl.HostnameVerifier;
49
import javax.net.ssl.SSLSession;
50
import javax.net.ssl.SSLSocket;
51
import javax.net.ssl.SSLSocketFactory;
52

53
import org.jivesoftware.smack.AbstractXMPPConnection;
54
import org.jivesoftware.smack.ConnectionConfiguration;
55
import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
56
import org.jivesoftware.smack.ConnectionListener;
57
import org.jivesoftware.smack.SmackConfiguration;
58
import org.jivesoftware.smack.SmackException;
59
import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
60
import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
61
import org.jivesoftware.smack.SmackException.ConnectionException;
62
import org.jivesoftware.smack.SmackException.EndpointConnectionException;
63
import org.jivesoftware.smack.SmackException.NotConnectedException;
64
import org.jivesoftware.smack.SmackException.NotLoggedInException;
65
import org.jivesoftware.smack.SmackException.OutgoingQueueFullException;
66
import org.jivesoftware.smack.SmackException.SecurityNotPossibleException;
67
import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
68
import org.jivesoftware.smack.SmackFuture;
69
import org.jivesoftware.smack.StanzaListener;
70
import org.jivesoftware.smack.XMPPConnection;
71
import org.jivesoftware.smack.XMPPException;
72
import org.jivesoftware.smack.XMPPException.FailedNonzaException;
73
import org.jivesoftware.smack.XMPPException.StreamErrorException;
74
import org.jivesoftware.smack.compress.packet.Compress;
75
import org.jivesoftware.smack.compress.packet.Compressed;
76
import org.jivesoftware.smack.compression.XMPPInputOutputStream;
77
import org.jivesoftware.smack.datatypes.UInt16;
78
import org.jivesoftware.smack.filter.StanzaFilter;
79
import org.jivesoftware.smack.internal.SmackTlsContext;
80
import org.jivesoftware.smack.packet.Element;
81
import org.jivesoftware.smack.packet.IQ;
82
import org.jivesoftware.smack.packet.Message;
83
import org.jivesoftware.smack.packet.Presence;
84
import org.jivesoftware.smack.packet.Stanza;
85
import org.jivesoftware.smack.packet.StartTls;
86
import org.jivesoftware.smack.packet.StreamError;
87
import org.jivesoftware.smack.packet.StreamOpen;
88
import org.jivesoftware.smack.packet.TopLevelStreamElement;
89
import org.jivesoftware.smack.proxy.ProxyInfo;
90
import org.jivesoftware.smack.sasl.packet.SaslNonza;
91
import org.jivesoftware.smack.sm.SMUtils;
92
import org.jivesoftware.smack.sm.StreamManagementException;
93
import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
94
import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
95
import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
96
import org.jivesoftware.smack.sm.packet.StreamManagement;
97
import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
98
import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
99
import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
100
import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
101
import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
102
import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
103
import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
104
import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
105
import org.jivesoftware.smack.sm.predicates.Predicate;
106
import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
107
import org.jivesoftware.smack.tcp.rce.RemoteXmppTcpConnectionEndpoints;
108
import org.jivesoftware.smack.tcp.rce.Rfc6120TcpRemoteConnectionEndpoint;
109
import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
110
import org.jivesoftware.smack.util.Async;
111
import org.jivesoftware.smack.util.CloseableUtil;
112
import org.jivesoftware.smack.util.PacketParserUtils;
113
import org.jivesoftware.smack.util.StringUtils;
114
import org.jivesoftware.smack.util.TLSUtils;
115
import org.jivesoftware.smack.util.XmlStringBuilder;
116
import org.jivesoftware.smack.util.rce.RemoteConnectionException;
117
import org.jivesoftware.smack.xml.SmackXmlParser;
118
import org.jivesoftware.smack.xml.XmlPullParser;
119
import org.jivesoftware.smack.xml.XmlPullParserException;
120

121
import org.jxmpp.jid.impl.JidCreate;
122
import org.jxmpp.jid.parts.Resourcepart;
123
import org.jxmpp.stringprep.XmppStringprepException;
124
import org.minidns.dnsname.DnsName;
125

126
/**
127
 * Creates a socket connection to an XMPP server. This is the default connection
128
 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
129
 *
130
 * @see XMPPConnection
131
 * @author Matt Tucker
132
 */
133
public class XMPPTCPConnection extends AbstractXMPPConnection {
1✔
134

135
    private static final int QUEUE_SIZE = 500;
136
    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());
1✔
137

138
    /**
139
     * The socket which is used for this connection.
140
     */
141
    private Socket socket;
142

143
    /**
144
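     * Set by an instant shutdown when stream management resumption is still possible, and cleared again after a
     * successful login. While set, the connection keeps its connected/authenticated state.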
145
     */
146
    private boolean disconnectedButResumeable = false;
1✔
147

148
    private SSLSocket secureSocket;
149

150
    /**
151
     * Protected access level for unit test purposes.
152
     */
153
    protected final PacketWriter packetWriter = new PacketWriter();
1✔
154

155
    /**
156
     * Protected access level for unit test purposes.
157
     */
158
    protected final PacketReader packetReader = new PacketReader();
1✔
159

160
    /**
161
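     * Flag that {@code loginInternal} resets before authenticating and then waits on; presumably set once the
     * stream features following authentication have been received (the setter is not part of this excerpt).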
162
     */
163
    private boolean streamFeaturesAfterAuthenticationReceived;
164

165
    /**
166
     *
167
     */
168
    private boolean compressSyncPoint;
169

170
    /**
171
     * The default bundle and defer callback, used for new connections.
172
     * @see #bundleAndDeferCallback
173
     */
174
    private static BundleAndDeferCallback defaultBundleAndDeferCallback;
175

176
    /**
177
     * The used bundle and defer callback.
178
     * <p>
179
     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
180
     * having a 'volatile' read within the writer threads loop.
181
     * </p>
182
     */
183
    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;
1✔
184

185
    private static boolean useSmDefault = true;
1✔
186

187
    private static boolean useSmResumptionDefault = true;
1✔
188

189
    /**
190
     * The stream ID of the stream that is currently resumable, i.e. the stream we hold the state
191
     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
192
     * {@link #unacknowledgedStanzas}.
193
     */
194
    private String smSessionId;
195

196
    /**
197
     * Represents the state of stream management resumption.
198
     * <p>
199
     * Unlike other sync points, this sync point is marked volatile because it is also read by the reader thread.
200
     * </p>
201
     */
202
    private volatile SyncPointState smResumedSyncPoint;
203
    private Failed smResumptionFailed;
204

205
    /**
206
     * Represents the state of stream management.
207
     * <p>
208
     * This boolean is marked volatile as it is read by various threads, including the reader thread via {@link #isSmEnabled()}.
209
     * </p>
210
     */
211
    private volatile boolean smEnabledSyncPoint;
212

213
    /**
214
     * The client's preferred maximum resumption time in seconds.
215
     */
216
    private int smClientMaxResumptionTime = -1;
1✔
217

218
    /**
219
     * The server's preferred maximum resumption time in seconds.
220
     */
221
    private int smServerMaxResumptionTime = -1;
1✔
222

223
    /**
224
     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
225
     */
226
    private boolean useSm = useSmDefault;
1✔
227
    private boolean useSmResumption = useSmResumptionDefault;
1✔
228

229
    /**
230
     * The counter that the server sends the client about its current height. For example, if the server sends
231
     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
232
     */
233
    private long serverHandledStanzasCount = 0;
1✔
234

235
    /**
236
     * The counter for stanzas handled ("received") by the client.
237
     * <p>
238
     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
239
     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
240
     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
241
     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
242
     * </p>
243
     */
244
    private long clientHandledStanzasCount = 0;
1✔
245

246
    private BlockingQueue<Stanza> unacknowledgedStanzas;
247

248
    /**
249
     * Set to true if Stream Management was at least once enabled for this connection.
250
     */
251
    private boolean smWasEnabledAtLeastOnce = false;
1✔
252

253
    /**
254
     * These listeners are invoked for every stanza that got acknowledged.
255
     * <p>
256
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
257
     * themselves after they have been invoked.
258
     * </p>
259
     */
260
    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();
1✔
261

262
    /**
263
     * These listeners are invoked for every stanza that got dropped.
264
     * <p>
265
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
266
     * themselves after they have been invoked.
267
     * </p>
268
     */
269
    private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>();
1✔
270

271
    /**
272
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
273
     * only be invoked once and automatically removed after that.
274
     */
275
    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();
1✔
276

277
    /**
278
     * Predicates that determine if a stream management ack should be requested from the server.
279
     * <p>
280
     * We use a linked hash set here, so that the order how the predicates are added matches the
281
     * order in which they are invoked in order to determine if an ack request should be sent or not.
282
     * </p>
283
     */
284
    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();
1✔
285

286
    @SuppressWarnings("HidingField")
287
    private final XMPPTCPConnectionConfiguration config;
288

289
    /**
290
     * Creates a new XMPP connection over TCP (optionally using proxies).
291
     * <p>
292
     * Note that XMPPTCPConnection constructors do not establish a connection to the server
293
     * and you must call {@link #connect()}.
294
     * </p>
295
     *
296
     * @param config the connection configuration.
297
     */
298
    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
299
        super(config);
1✔
300
        this.config = config;
1✔
301
        addConnectionListener(new ConnectionListener() {
1✔
302
            @Override
303
            public void connectionClosedOnError(Exception e) {
304
                if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) {
×
305
                    dropSmState();
×
306
                }
307
            }
×
308
        });
309

310
        // Re-init the reader and writer in case of SASL <success/>. This is done to reset the parser since a new stream
311
        // is initiated.
312
        buildNonzaCallback().listenFor(SaslNonza.Success.class, s -> resetParser()).install();
1✔
313
    }
1✔
314

315
    /**
316
     * Creates a new XMPP connection over TCP.
317
     * <p>
318
     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
319
     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
320
     * constructor.
321
     * </p>
322
     *
323
     * @param jid the bare JID used by the client.
324
     * @param password the password or authentication token.
325
     * @throws XmppStringprepException if the provided string is invalid.
326
     */
327
    public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
328
        this(XMPPTCPConnectionConfiguration.builder().setXmppAddressAndPassword(jid, password).build());
×
329
    }
×
330

331
    /**
332
     * Creates a new XMPP connection over TCP.
333
     * <p>
334
     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
335
     * you can get fine-grained control over connection settings using the
336
     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
337
     * </p>
338
     * @param username the username to authenticate with.
339
     * @param password the password for the given username.
340
     * @param serviceName the XMPP service (domain) name; it is parsed into a domain bare JID.
341
     * @throws XmppStringprepException if the provided string is invalid.
342
     */
343
    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
344
        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
1✔
345
                                        JidCreate.domainBareFrom(serviceName)).build());
1✔
346
    }
1✔
347

348
    @Override
349
    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
350
        if (packetWriter == null) {
×
351
            throw new NotConnectedException();
×
352
        }
353
        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
×
354
    }
×
355

356
    @Override
357
    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
358
        if (isConnected() && !disconnectedButResumeable) {
×
359
            throw new AlreadyConnectedException();
×
360
        }
361
    }
×
362

363
    @Override
364
    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
365
        if (isAuthenticated() && !disconnectedButResumeable) {
×
366
            throw new AlreadyLoggedInException();
×
367
        }
368
    }
×
369

370
    @Override
371
    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
372
        // Reset the flag in case it was set
373
        disconnectedButResumeable = false;
×
374
        super.afterSuccessfulLogin(resumed);
×
375
    }
×
376

377
    @Override
378
    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
379
                    SmackException, IOException, InterruptedException {
380
        // Authenticate using SASL
381
        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
×
382

383
        streamFeaturesAfterAuthenticationReceived = false;
×
384
        authenticate(username, password, config.getAuthzid(), sslSession);
×
385

386
        // Wait for stream features after the authentication.
387
        // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
388
        // renamed to "streamFeaturesAfterAuthenticationReceived".
389
        waitForConditionOrThrowConnectionException(() -> streamFeaturesAfterAuthenticationReceived, "compress features from server");
×
390

391
        // If compression is enabled then request the server to use stream compression. XEP-170
392
        // recommends to perform stream compression before resource binding.
393
        maybeEnableCompression();
×
394

395
        smResumedSyncPoint = SyncPointState.initial;
×
396
        smResumptionFailed = null;
×
397
        if (isSmResumptionPossible()) {
×
398
            smResumedSyncPoint = SyncPointState.request_sent;
×
399
            sendNonza(new Resume(clientHandledStanzasCount, smSessionId));
×
400
            waitForConditionOrThrowConnectionException(() -> smResumedSyncPoint == SyncPointState.successful || smResumptionFailed != null, "resume previous stream");
×
401
            if (smResumedSyncPoint == SyncPointState.successful) {
×
402
                // We successfully resumed the stream, be done here
403
                afterSuccessfulLogin(true);
×
404
                return;
×
405
            }
406
            // SM resumption failed, what Smack does here is to report success of
407
            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
408
            // normal resource binding can be tried.
409
            assert smResumptionFailed != null;
×
410
            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process: " + smResumptionFailed);
×
411
        }
412

413
        // We either failed to resume a previous stream management (SM) session, or we did not even try. In any case,
414
        // mark SM as not enabled. Most importantly, we do this prior to calling bindResourceAndEstablishSession(), as the
415
        // bind IQ may trigger a SM ack request, which would be invalid in the pre resource bound state.
416
        smEnabledSyncPoint = false;
×
417

418
        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
×
419
        if (unacknowledgedStanzas != null) {
×
420
            // There was a previous connection with SM enabled but that was either not resumable or
421
            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
422
            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
×
423
            // Reset unacknowledged stanzas to 'null' to signal that we never sent 'enable' in this
424
            // XMPP session (There maybe was an enabled in a previous XMPP session of this
425
            // connection instance though). This is used in writePackets to decide if stanzas should
426
            // be added to the unacknowledged stanzas queue, because they have to be added right
427
            // after the 'enable' stream element has been sent.
428
            dropSmState();
×
429
        }
430

431
        // Now bind the resource. It is important to do this *after* we dropped an eventually
432
        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
433
        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
434
        bindResourceAndEstablishSession(resource);
×
435

436
        if (isSmAvailable() && useSm) {
×
437
            // Remove what is maybe left from previously stream managed sessions
438
            serverHandledStanzasCount = 0;
×
439
            sendNonza(new Enable(useSmResumption, smClientMaxResumptionTime));
×
440
            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
441
            // then this is a non recoverable error and we therefore throw an exception.
442
            waitForConditionOrThrowConnectionException(() -> smEnabledSyncPoint, "enabling stream mangement");
×
443
            synchronized (requestAckPredicates) {
×
444
                if (requestAckPredicates.isEmpty()) {
×
445
                    // Ensure that we have at least one predicate set up so that we request acks
446
                    // from the server and eventually flush some stanzas from the unacknowledged
447
                    // stanza queue
448
                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
×
449
                }
450
            }
×
451
        }
452
        // Inform client about failed resumption if possible, resend stanzas otherwise
453
        // Process the stanzas synchronously so a client can re-queue them for transmission
454
        // before it is informed about connection success
455
        if (!stanzaDroppedListeners.isEmpty()) {
×
456
            for (Stanza stanza : previouslyUnackedStanzas) {
×
457
                for (StanzaListener listener : stanzaDroppedListeners) {
×
458
                    try {
459
                        listener.processStanza(stanza);
×
460
                    }
461
                    catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
×
462
                        LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e);
×
463
                    }
×
464
                }
×
465
            }
×
466
        } else {
467
            for (Stanza stanza : previouslyUnackedStanzas) {
×
468
                sendInternal(stanza);
×
469
            }
×
470
        }
471

472
        afterSuccessfulLogin(false);
×
473
    }
×
474

475
    @Override
476
    public boolean isSecureConnection() {
477
        return secureSocket != null;
×
478
    }
479

480
    /**
481
     * Shuts the current connection down. After this method returns, the connection must be ready
482
     * for re-use by connect.
483
     */
484
    @Override
485
    protected void shutdown() {
486
        if (isSmEnabled()) {
×
487
            try {
488
                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
489
                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
490
                sendSmAcknowledgementInternal();
×
491
            } catch (InterruptedException | NotConnectedException e) {
×
492
                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
×
493
            }
×
494
        }
495
        shutdown(false);
×
496
    }
×
497

498
    @Override
499
    public synchronized void instantShutdown() {
500
        shutdown(true);
×
501
    }
×
502

503
    private void shutdown(boolean instant) {
504
        // The writer thread may already have finished at this point, for example when the connection is in the
505
        // disconnected-but-resumable state. There is no need to wait for the closing stream tag from the server in this
506
        // case.
507
        if (!packetWriter.done()) {
×
508
            // First shutdown the writer, this will result in a closing stream element getting sent to
509
            // the server
510
            LOGGER.finer(packetWriter.threadName + " shutdown()");
×
511
            packetWriter.shutdown(instant);
×
512
            LOGGER.finer(packetWriter.threadName + " shutdown() returned");
×
513

514
            if (!instant) {
×
515
                waitForClosingStreamTagFromServer();
×
516
            }
517
        }
518

519
        LOGGER.finer(packetReader.threadName + " shutdown()");
×
520
        packetReader.shutdown();
×
521
        LOGGER.finer(packetReader.threadName + " shutdown() returned");
×
522

523
        CloseableUtil.maybeClose(socket, LOGGER);
×
524

525
        setWasAuthenticated();
×
526

527
        try {
528
            boolean readerAndWriterThreadsTermianted = waitFor(() -> !packetWriter.running && !packetReader.running);
×
529
            if (!readerAndWriterThreadsTermianted) {
×
530
                LOGGER.severe("Reader and/or writer threads did not terminate timely. Writer running: "
×
531
                                + packetWriter.running + ", Reader running: " + packetReader.running);
×
532
            } else {
533
                LOGGER.fine("Reader and writer threads terminated");
×
534
            }
535
        } catch (InterruptedException e) {
×
536
            LOGGER.log(Level.FINE, "Interrupted while waiting for reader and writer threads to terminate", e);
×
537
        }
×
538

539
        if (disconnectedButResumeable) {
×
540
            return;
×
541
        }
542

543
        // If we are able to resume the stream, then don't set
544
        // connected/authenticated/usingTLS to false since we like to behave like we are still
545
        // connected (e.g. sendStanza should not throw a NotConnectedException).
546
        if (instant) {
×
547
            disconnectedButResumeable = isSmResumptionPossible();
×
548
            if (!disconnectedButResumeable) {
×
549
                // Reset the stream management session id to null, since the stream is no longer resumable. Note that we
550
                // keep the unacknowledgedStanzas queue, because we want to resend them when we are reconnected.
551
                smSessionId = null;
×
552
            }
553
        } else {
554
            disconnectedButResumeable = false;
×
555

556
            // Drop the stream management state if this is not an instant shutdown. We send
557
            // a </stream> close tag and now the stream management state is no longer valid.
558
            // This also prevents that we will potentially (re-)send any unavailable presence we
559
            // may have sent, because it got put into the unacknowledged queue and was not acknowledged before the
560
            // connection terminated.
561
            dropSmState();
×
562
            // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the
563
            // information is available in the connectionClosedOnError() listeners.
564
        }
565
        authenticated = false;
×
566
        connected = false;
×
567
        secureSocket = null;
×
568
        reader = null;
×
569
        writer = null;
×
570

571
        initState();
×
572
    }
×
573

574
    private interface SmAckAction<E extends Exception> {
575
        void run() throws NotConnectedException, E;
576
    }
577

578
    private <E extends Exception> void requestSmAckIfNecessary(TopLevelStreamElement element,
579
                    SmAckAction<E> smAckAction) throws NotConnectedException, E {
580
        if (!isSmEnabled())
×
581
            return;
×
582

583
        if (element instanceof Stanza) {
×
584
            Stanza stanza = (Stanza) element;
×
585
            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
×
586
                if (requestAckPredicate.accept(stanza)) {
×
587
                    smAckAction.run();
×
588
                    break;
×
589
                }
590
            }
×
591
        }
592
    }
×
593

594
    @Override
595
    protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException {
596
        packetWriter.sendStreamElement(element);
×
597
        requestSmAckIfNecessary(element, () -> requestSmAcknowledgementInternal());
×
598
    }
×
599

600
    @Override
601
    protected void sendNonBlockingInternal(TopLevelStreamElement element) throws NotConnectedException, OutgoingQueueFullException {
602
        packetWriter.sendNonBlocking(element);
×
603
        requestSmAckIfNecessary(element, () -> requestSmAcknowledgementNonBlockingInternal());
×
604
    }
×
605

606
    private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
607
        RemoteXmppTcpConnectionEndpoints.Result<Rfc6120TcpRemoteConnectionEndpoint> result = RemoteXmppTcpConnectionEndpoints.lookup(config);
×
608

609
        List<RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint>> connectionExceptions = new ArrayList<>();
×
610

611
        SocketFactory socketFactory = config.getSocketFactory();
×
612
        ProxyInfo proxyInfo = config.getProxyInfo();
×
613
        int timeout = config.getConnectTimeout();
×
614
        if (socketFactory == null) {
×
615
            socketFactory = SocketFactory.getDefault();
×
616
        }
617
        for (Rfc6120TcpRemoteConnectionEndpoint endpoint : result.discoveredRemoteConnectionEndpoints) {
×
618
            Iterator<? extends InetAddress> inetAddresses;
619
            String host = endpoint.getHost().toString();
×
620
            UInt16 portUint16 = endpoint.getPort();
×
621
            int port = portUint16.intValue();
×
622
            if (proxyInfo == null) {
×
623
                inetAddresses = endpoint.getInetAddresses().iterator();
×
624
                assert inetAddresses.hasNext();
×
625

626
                innerloop: while (inetAddresses.hasNext()) {
×
627
                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
628
                    // re-usable after a failed connection attempt. See also SMACK-724.
629
                    SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
×
630

631
                    final InetAddress inetAddress = inetAddresses.next();
×
632
                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
×
633
                    LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
×
634
                    socketFuture.connectAsync(inetSocketAddress, timeout);
×
635

636
                    try {
637
                        socket = socketFuture.getOrThrow();
×
638
                    } catch (IOException e) {
×
639
                        RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(
×
640
                                        endpoint, inetAddress, e);
641
                        connectionExceptions.add(rce);
×
642
                        if (inetAddresses.hasNext()) {
×
643
                            continue innerloop;
×
644
                        } else {
645
                            break innerloop;
×
646
                        }
647
                    }
×
648
                    LOGGER.finer("Established TCP connection to " + inetSocketAddress);
×
649
                    // We found a host to connect to, return here
650
                    this.host = host;
×
651
                    this.port = portUint16;
×
652
                    return;
×
653
                }
654
            } else {
655
                // TODO: Move this into the inner-loop above. There appears no reason why we should not try a proxy
656
                // connection to every inet address of each connection endpoint.
657
                socket = socketFactory.createSocket();
×
658
                StringUtils.requireNotNullNorEmpty(host, "Host of endpoint " + endpoint + " must not be null when using a Proxy");
×
659
                final String hostAndPort = host + " at port " + port;
×
660
                LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
×
661
                try {
662
                    proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
×
663
                } catch (IOException e) {
×
664
                    CloseableUtil.maybeClose(socket, LOGGER);
×
665
                    RemoteConnectionException<Rfc6120TcpRemoteConnectionEndpoint> rce = new RemoteConnectionException<>(endpoint, null, e);
×
666
                    connectionExceptions.add(rce);
×
667
                    continue;
×
668
                }
×
669
                LOGGER.finer("Established TCP connection to " + hostAndPort);
×
670
                // We found a host to connect to, return here
671
                this.host = host;
×
672
                this.port = portUint16;
×
673
                return;
×
674
            }
675
        }
×
676

677
        // There are no more host addresses to try
678
        // throw an exception and report all tried
679
        // HostAddresses in the exception
680
        throw EndpointConnectionException.from(result.lookupFailures, connectionExceptions);
×
681
    }
682

683
    /**
684
     * Initializes the connection by creating a stanza reader and writer and opening a
685
     * XMPP stream to the server.
686
     *
687
     * @throws IOException if an I/O error occurred.
688
     * @throws InterruptedException if the calling thread was interrupted.
689
     */
690
    private void initConnection() throws IOException, InterruptedException {
691
        compressionHandler = null;
×
692

693
        // Set the reader and writer instance variables
694
        initReaderAndWriter();
×
695

696
        // Start the writer thread. This will open an XMPP stream to the server
697
        packetWriter.init();
×
698
        // Start the reader thread. The startup() method will block until we
699
        // get an opening stream packet back from server
700
        packetReader.init();
×
701
    }
×
702

703
    private void initReaderAndWriter() throws IOException {
704
        InputStream is = socket.getInputStream();
×
705
        OutputStream os = socket.getOutputStream();
×
706
        if (compressionHandler != null) {
×
707
            is = compressionHandler.getInputStream(is);
×
708
            os = compressionHandler.getOutputStream(os);
×
709
        }
710
        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
711
        writer = new OutputStreamWriter(os, "UTF-8");
×
712
        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
×
713

714
        // If debugging is enabled, we open a window and write out all network traffic.
715
        initDebugger();
×
716
    }
×
717

718
    /**
719
     * The server has indicated that TLS negotiation can start. We now need to secure the
720
     * existing plain connection and perform a handshake. This method won't return until the
721
     * connection has finished the handshake or an error occurred while securing the connection.
722
     * @throws IOException if an I/O error occurred.
723
     * @throws SecurityNotPossibleException if TLS is not possible.
724
     * @throws CertificateException if there is an issue with the certificate.
725
     */
726
    @SuppressWarnings("LiteralClassName")
727
    private void proceedTLSReceived() throws IOException, SecurityNotPossibleException, CertificateException {
728
        SmackTlsContext smackTlsContext = getSmackTlsContext();
×
729

730
        Socket plain = socket;
×
731
        int port = plain.getPort();
×
732
        String xmppServiceDomainString = config.getXMPPServiceDomain().toString();
×
733
        SSLSocketFactory sslSocketFactory = smackTlsContext.sslContext.getSocketFactory();
×
734
        // Secure the plain connection
735
        socket = sslSocketFactory.createSocket(plain, xmppServiceDomainString, port, true);
×
736

737
        final SSLSocket sslSocket = (SSLSocket) socket;
×
738
        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
739
        // important (at least on certain platforms) and it seems to be a good idea anyways to
740
        // prevent an accidental implicit handshake.
741
        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
×
742

743
        // Initialize the reader and writer with the new secured version
744
        initReaderAndWriter();
×
745

746
        // Proceed to do the handshake
747
        sslSocket.startHandshake();
×
748

749
        if (smackTlsContext.daneVerifier != null) {
×
750
            smackTlsContext.daneVerifier.finish(sslSocket.getSession());
×
751
        }
752

753
        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
×
754
        if (verifier == null) {
×
755
                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
×
756
        }
757

758
        final String verifierHostname;
759
        {
760
            DnsName xmppServiceDomainDnsName = getConfiguration().getXmppServiceDomainAsDnsNameIfPossible();
×
761
            // Try to convert the XMPP service domain, which potentially includes Unicode characters, into ASCII
762
            // Compatible Encoding (ACE) to match RFC3280 dNSname IA5String constraint.
763
            // See also: https://bugzilla.mozilla.org/show_bug.cgi?id=280839#c1
764
            if (xmppServiceDomainDnsName != null) {
×
765
                verifierHostname = xmppServiceDomainDnsName.ace;
×
766
            }
767
            else {
768
                LOGGER.log(Level.WARNING, "XMPP service domain name '" + getXMPPServiceDomain()
×
769
                                + "' can not be represented as DNS name. TLS X.509 certificate validiation may fail.");
770
                verifierHostname = getXMPPServiceDomain().toString();
×
771
            }
772
        }
773

774
        final boolean verificationSuccessful;
775
        // Verify the TLS session.
776
        verificationSuccessful = verifier.verify(verifierHostname, sslSocket.getSession());
×
777
        if (!verificationSuccessful) {
×
778
            throw new CertificateException(
×
779
                            "Hostname verification of certificate failed. Certificate does not authenticate "
780
                                            + getXMPPServiceDomain());
×
781
        }
782

783
        // Set that TLS was successful
784
        secureSocket = sslSocket;
×
785
    }
×
786

787
    /**
788
     * Returns the compression handler that can be used for one compression methods offered by the server.
789
     *
790
     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
791
     *
792
     */
793
    private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
794
        for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
×
795
                String method = handler.getCompressionMethod();
×
796
                if (compression.getMethods().contains(method))
×
797
                    return handler;
×
798
        }
×
799
        return null;
×
800
    }
801

802
    @Override
803
    public boolean isUsingCompression() {
804
        return compressionHandler != null && compressSyncPoint;
×
805
    }
806

807
    /**
808
     * <p>
809
     * Starts using stream compression that will compress network traffic. Traffic can be
810
     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
811
     * connection. However, the server and the client will need to use more CPU time in order to
812
     * un/compress network data so under high load the server performance might be affected.
813
     * </p>
814
     * <p>
815
     * Stream compression has to have been previously offered by the server. Currently only the
816
     * zlib method is supported by the client. Stream compression negotiation has to be done
817
     * before authentication took place.
818
     * </p>
819
     *
820
     * @throws NotConnectedException if the XMPP connection is not connected.
821
     * @throws SmackException if Smack detected an exceptional situation.
822
     * @throws InterruptedException if the calling thread was interrupted.
823
     * @throws XMPPException if an XMPP protocol error was received.
824
     */
825
    private void maybeEnableCompression() throws SmackException, InterruptedException, XMPPException {
826
        if (!config.isCompressionEnabled()) {
×
827
            return;
×
828
        }
829

830
        Compress.Feature compression = getFeature(Compress.Feature.class);
×
831
        if (compression == null) {
×
832
            // Server does not support compression
833
            return;
×
834
        }
835
        // If stream compression was offered by the server and we want to use
836
        // compression then send compression request to the server
837
        if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
×
838
            compressSyncPoint = false;
×
839
            sendNonza(new Compress(compressionHandler.getCompressionMethod()));
×
840
            waitForConditionOrThrowConnectionException(() -> compressSyncPoint, "establishing stream compression");
×
841
        } else {
842
            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
×
843
        }
844
    }
×
845

846
    /**
847
     * Establishes a connection to the XMPP server. It basically
848
     * creates and maintains a socket connection to the server.
849
     * <p>
850
     * Listeners will be preserved from a previous connection if the reconnection
851
     * occurs after an abrupt termination.
852
     * </p>
853
     *
854
     * @throws XMPPException if an error occurs while trying to establish the connection.
855
     * @throws SmackException if Smack detected an exceptional situation.
856
     * @throws IOException if an I/O error occurred.
857
     * @throws InterruptedException if the calling thread was interrupted.
858
     */
859
    @Override
860
    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
861
        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
862
        // there is an error establishing the connection
863
        connectUsingConfiguration();
×
864

865
        connected = true;
×
866

867
        // We connected successfully to the servers TCP port
868
        initConnection();
×
869

870
        // TLS handled will be true either if TLS was established, or if it was not mandatory.
871
        waitForConditionOrThrowConnectionException(() -> tlsHandled, "establishing TLS");
×
872

873
        // Wait with SASL auth until the SASL mechanisms have been received
874
        waitForConditionOrThrowConnectionException(() -> saslFeatureReceived, "SASL mechanisms stream feature from server");
×
875
    }
×
876

877
    /**
878
     * For unit testing purposes
879
     *
880
     * @param writer TODO javadoc me please
881
     */
882
    protected void setWriter(Writer writer) {
883
        this.writer = writer;
1✔
884
    }
1✔
885

886
    @Override
887
    protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException {
888
        StartTls startTlsFeature = getFeature(StartTls.class);
×
889
        if (startTlsFeature != null) {
×
890
            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
×
891
                SecurityRequiredByServerException smackException = new SecurityRequiredByServerException();
×
892
                currentSmackException = smackException;
×
893
                notifyWaitingThreads();
×
894
                throw smackException;
×
895
            }
896

897
            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
×
898
                sendNonza(new StartTls());
×
899
            } else {
900
                tlsHandled = true;
×
901
                notifyWaitingThreads();
×
902
            }
903
        } else {
904
            tlsHandled = true;
×
905
            notifyWaitingThreads();
×
906
        }
907

908
        if (isSaslAuthenticated()) {
×
909
            // If we have received features after the SASL has been successfully completed, then we
910
            // have also *maybe* received, as it is an optional feature, the compression feature
911
            // from the server.
912
            streamFeaturesAfterAuthenticationReceived = true;
×
913
            notifyWaitingThreads();
×
914
        }
915
    }
×
916

917
    private void resetParser() throws IOException {
918
        try {
919
            packetReader.parser = SmackXmlParser.newXmlParser(reader);
×
920
        } catch (XmlPullParserException e) {
×
921
            throw new IOException(e);
×
922
        }
×
923
   }
×
924

925
    private void openStreamAndResetParser() throws IOException, NotConnectedException, InterruptedException {
926
        sendStreamOpen();
×
927
        resetParser();
×
928
    }
×
929

930
    protected class PacketReader {
1✔
931

932
        private final String threadName = "Smack Reader (" + getConnectionCounter() + ')';
1✔
933

934
        XmlPullParser parser;
935

936
        private volatile boolean done;
937

938
        private boolean running;
939

940
        /**
941
         * Initializes the reader in order to be used. The reader is initialized during the
942
         * first connection and when reconnecting due to an abruptly disconnection.
943
         */
944
        void init() {
945
            done = false;
×
946

947
            running = true;
×
948
            Async.go(new Runnable() {
×
949
                @Override
950
                public void run() {
951
                    LOGGER.finer(threadName + " start");
×
952
                    try {
953
                        parsePackets();
×
954
                    } finally {
955
                        LOGGER.finer(threadName + " exit");
×
956
                        running = false;
×
957
                        notifyWaitingThreads();
×
958
                    }
959
                }
×
960
            }, threadName);
961
         }
×
962

963
        /**
964
         * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
965
         */
966
        void shutdown() {
967
            done = true;
×
968
        }
×
969

970
        /**
971
         * Parse top-level packets in order to process them further.
972
         */
973
        private void parsePackets() {
974
            try {
975
                openStreamAndResetParser();
×
976
                XmlPullParser.Event eventType = parser.getEventType();
×
977
                while (!done) {
×
978
                    switch (eventType) {
×
979
                    case START_ELEMENT:
980
                        final String name = parser.getName();
×
981
                        final String namespace = parser.getNamespace();
×
982

983
                        switch (name) {
×
984
                        case Message.ELEMENT:
985
                        case IQ.IQ_ELEMENT:
986
                        case Presence.ELEMENT:
987
                            try {
988
                                parseAndProcessStanza(parser);
×
989
                            } finally {
990
                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
×
991
                            }
992
                            break;
×
993
                        case "stream":
994
                            if (StreamOpen.ETHERX_JABBER_STREAMS_NAMESPACE.equals(namespace)) {
×
995
                                onStreamOpen(parser);
×
996
                            }
997
                            break;
998
                        case "error":
999
                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
×
1000
                            // Stream errors are non recoverable, throw this exceptions. Also note that this will set
1001
                            // this exception as current connection exceptions and notify any waiting threads.
1002
                            throw new StreamErrorException(streamError);
×
1003
                        case "features":
1004
                            parseFeaturesAndNotify(parser);
×
1005
                            break;
×
1006
                        case "proceed":
1007
                            // Secure the connection by negotiating TLS
1008
                            proceedTLSReceived();
×
1009
                            // Send a new opening stream to the server
1010
                            openStreamAndResetParser();
×
1011
                            break;
×
1012
                        case "failure":
1013
                            switch (namespace) {
×
1014
                            case "urn:ietf:params:xml:ns:xmpp-tls":
1015
                                // TLS negotiation has failed. The server will close the connection
1016
                                // TODO Parse failure stanza
1017
                                throw new SmackException.SmackMessageException("TLS negotiation has failed");
×
1018
                            case "http://jabber.org/protocol/compress":
1019
                                // Stream compression has been denied. This is a recoverable
1020
                                // situation. It is still possible to authenticate and
1021
                                // use the connection but using an uncompressed connection
1022
                                // TODO Parse failure stanza
1023
                                currentSmackException = new SmackException.SmackMessageException("Could not establish compression");
×
1024
                                notifyWaitingThreads();
×
1025
                                break;
×
1026
                            default:
1027
                                parseAndProcessNonza(parser);
×
1028
                            }
1029
                            break;
×
1030
                        case Compressed.ELEMENT:
1031
                            // Server confirmed that it's possible to use stream compression. Start
1032
                            // stream compression
1033
                            // Initialize the reader and writer with the new compressed version
1034
                            initReaderAndWriter();
×
1035
                            // Send a new opening stream to the server
1036
                            openStreamAndResetParser();
×
1037
                            // Notify that compression is being used
1038
                            compressSyncPoint = true;
×
1039
                            notifyWaitingThreads();
×
1040
                            break;
×
1041
                        case Enabled.ELEMENT:
1042
                            Enabled enabled = ParseStreamManagement.enabled(parser);
×
1043
                            if (enabled.isResumeSet()) {
×
1044
                                smSessionId = enabled.getId();
×
1045
                                if (StringUtils.isNullOrEmpty(smSessionId)) {
×
1046
                                    SmackException xmppException = new SmackException.SmackMessageException("Stream Management 'enabled' element with resume attribute but without session id received");
×
1047
                                    setCurrentConnectionExceptionAndNotify(xmppException);
×
1048
                                    throw xmppException;
×
1049
                                }
1050
                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
×
1051
                            } else {
1052
                                // Mark this a non-resumable stream by setting smSessionId to null
1053
                                smSessionId = null;
×
1054
                            }
1055
                            clientHandledStanzasCount = 0;
×
1056
                            smWasEnabledAtLeastOnce = true;
×
1057
                            smEnabledSyncPoint = true;
×
1058
                            notifyWaitingThreads();
×
1059
                            break;
×
1060
                        case Failed.ELEMENT:
1061
                            Failed failed = ParseStreamManagement.failed(parser);
×
1062
                            if (smResumedSyncPoint == SyncPointState.request_sent) {
×
1063
                                // This is a <failed/> nonza in a response to resuming a previous stream, failure to do
1064
                                // so is non-fatal as we can simply continue with resource binding in this case.
1065
                                smResumptionFailed = failed;
×
1066
                                notifyWaitingThreads();
×
1067
                            } else {
1068
                                FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
×
1069
                                setCurrentConnectionExceptionAndNotify(xmppException);
×
1070
                            }
1071
                            break;
×
1072
                        case Resumed.ELEMENT:
1073
                            Resumed resumed = ParseStreamManagement.resumed(parser);
×
1074
                            if (!smSessionId.equals(resumed.getPrevId())) {
×
1075
                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
×
1076
                            }
1077
                            // Mark SM as enabled
1078
                            smEnabledSyncPoint = true;
×
1079
                            // First, drop the stanzas already handled by the server
1080
                            processHandledCount(resumed.getHandledCount());
×
1081
                            // Then re-send what is left in the unacknowledged queue
1082
                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
×
1083
                            unacknowledgedStanzas.drainTo(stanzasToResend);
×
1084
                            for (Stanza stanza : stanzasToResend) {
×
1085
                                XMPPTCPConnection.this.sendInternal(stanza);
×
1086
                            }
×
1087
                            // If there where stanzas resent, then request a SM ack for them.
1088
                            // Writer's sendStreamElement() won't do it automatically based on
1089
                            // predicates.
1090
                            if (!stanzasToResend.isEmpty()) {
×
1091
                                requestSmAcknowledgementInternal();
×
1092
                            }
1093
                            // Mark SM resumption as successful
1094
                            smResumedSyncPoint = SyncPointState.successful;
×
1095
                            notifyWaitingThreads();
×
1096
                            break;
×
1097
                        case AckAnswer.ELEMENT:
1098
                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
×
1099
                            processHandledCount(ackAnswer.getHandledCount());
×
1100
                            break;
×
1101
                        case AckRequest.ELEMENT:
1102
                            ParseStreamManagement.ackRequest(parser);
×
1103
                            if (smEnabledSyncPoint) {
×
1104
                                sendSmAcknowledgementInternal();
×
1105
                            } else {
1106
                                LOGGER.warning("SM Ack Request received while SM is not enabled");
×
1107
                            }
1108
                            break;
×
1109
                         default:
1110
                             parseAndProcessNonza(parser);
×
1111
                             break;
1112
                        }
1113
                        break;
×
1114
                    case END_ELEMENT:
1115
                        final String endTagName = parser.getName();
×
1116
                        if ("stream".equals(endTagName)) {
×
1117
                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
×
1118
                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
×
1119
                                break;
×
1120
                            }
1121

1122
                            // Check if the queue was already shut down before reporting success on closing stream tag
1123
                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
1124
                            // did re-start the queue again, causing this writer to assume that the queue is not
1125
                            // shutdown, which results in a call to disconnect().
1126
                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
×
1127
                            closingStreamReceived = true;
×
1128
                            notifyWaitingThreads();
×
1129

1130
                            if (queueWasShutdown) {
×
1131
                                // We received a closing stream element *after* we initiated the
1132
                                // termination of the session by sending a closing stream element to
1133
                                // the server first
1134
                                return;
×
1135
                            } else {
1136
                                // We received a closing stream element from the server without us
1137
                                // sending a closing stream element first. This means that the
1138
                                // server wants to terminate the session, therefore disconnect
1139
                                // the connection
1140
                                LOGGER.info(XMPPTCPConnection.this
×
1141
                                                + " received closing </stream> element."
1142
                                                + " Server wants to terminate the connection, calling disconnect()");
1143
                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
×
1144
                                    @Override
1145
                                    public void run() {
1146
                                        disconnect();
×
1147
                                    }});
×
1148
                            }
1149
                        }
×
1150
                        break;
1151
                    case END_DOCUMENT:
1152
                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
1153
                        // closing stream element before.
1154
                        throw new SmackException.SmackMessageException(
×
1155
                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
1156
                    default:
1157
                        // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
1158
                        break;
1159
                    }
1160
                    eventType = parser.next();
×
1161
                }
1162
            }
1163
            catch (Exception e) {
×
1164
                // Set running to false since this thread will exit here and notifyConnectionError() will wait until
1165
                // the reader and writer thread's 'running' value is false. Hence we need to set it to false before calling
1166
                // notifyConnetctionError() below, even though run() also sets it to false. Therefore, do not remove this.
1167
                running = false;
×
1168

1169
                String ignoreReasonThread = null;
×
1170

1171
                boolean writerThreadWasShutDown = packetWriter.queue.isShutdown();
×
1172
                if (writerThreadWasShutDown) {
×
1173
                    ignoreReasonThread = "writer";
×
1174
                } else if (done) {
×
1175
                    ignoreReasonThread = "reader";
×
1176
                }
1177

1178
                if (ignoreReasonThread != null) {
×
1179
                    LOGGER.log(Level.FINER, "Ignoring " + e + " as " + ignoreReasonThread + " was already shut down");
×
1180
                    return;
×
1181
                }
1182

1183
                // Close the connection and notify connection listeners of the error.
1184
                notifyConnectionError(e);
×
1185
            }
×
1186
        }
×
1187
    }
1188

1189
    protected class PacketWriter {
1✔
1190
        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1191
        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE = 1024;
1192
        public static final int UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK = (int) (0.3 * UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
1193

1194
        private final String threadName = "Smack Writer (" + getConnectionCounter() + ')';
1✔
1195

1196
        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
1✔
1197
                        QUEUE_SIZE, true);
1198

1199
        /**
1200
         * If set, the stanza writer is shut down
1201
         */
1202
        protected volatile Long shutdownTimestamp = null;
1✔
1203

1204
        private volatile boolean instantShutdown;
1205

1206
        /**
1207
         * True if some preconditions are given to start the bundle and defer mechanism.
1208
         * <p>
1209
         * This will likely get set to true right after the start of the writer thread, because
1210
         * {@link #nextStreamElement()} will check if {@link #queue} is empty, which is probably the case, and then set
1211
         * this field to true.
1212
         * </p>
1213
         */
1214
        private boolean shouldBundleAndDefer;
1215

1216
        private boolean running;
1217

1218
        /**
1219
        * Initializes the writer in order to be used. It is called at the first connection and also
1220
        * is invoked if the connection is disconnected by an error.
1221
        */
1222
        void init() {
1223
            shutdownTimestamp = null;
1✔
1224

1225
            if (unacknowledgedStanzas != null) {
1✔
1226
                // It's possible that there are new stanzas in the writer queue that
1227
                // came in while we were disconnected but resumable, drain those into
1228
                // the unacknowledged queue so that they get resent now
1229
                drainWriterQueueToUnacknowledgedStanzas();
×
1230
            }
1231

1232
            queue.start();
1✔
1233
            running = true;
1✔
1234
            Async.go(new Runnable() {
1✔
1235
                @Override
1236
                public void run() {
1237
                    LOGGER.finer(threadName + " start");
1✔
1238
                    try {
1239
                        writePackets();
1✔
1240
                    } finally {
1241
                        LOGGER.finer(threadName + " exit");
1✔
1242
                        running = false;
1✔
1243
                        notifyWaitingThreads();
1✔
1244
                    }
1245
                }
1✔
1246
            }, threadName);
1247
        }
1✔
1248

1249
        private boolean done() {
1250
            return shutdownTimestamp != null;
1✔
1251
        }
1252

1253
        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1254
            final boolean done = done();
1✔
1255
            if (done) {
1✔
1256
                final boolean smResumptionPossible = isSmResumptionPossible();
1✔
1257
                // Don't throw a NotConnectedException is there is an resumable stream available
1258
                if (!smResumptionPossible) {
1✔
1259
                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
1✔
1260
                                    + " smResumptionPossible=" + smResumptionPossible);
1261
                }
1262
            }
1263
        }
1✔
1264

1265
        /**
1266
         * Sends the specified element to the server.
1267
         *
1268
         * @param element the element to send.
1269
         * @throws NotConnectedException if the XMPP connection is not connected.
1270
         * @throws InterruptedException if the calling thread was interrupted.
1271
         */
1272
        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
1273
            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1✔
1274
            try {
1275
                queue.put(element);
1✔
1276
            }
1277
            catch (InterruptedException e) {
1✔
1278
                // put() may throw an InterruptedException for two reasons:
1279
                // 1. If the queue was shut down
1280
                // 2. If the thread was interrupted
1281
                // so we have to check which is the case
1282
                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
×
1283
                // If the method above did not throw, then the sending thread was interrupted
1284
                throw e;
×
1285
            }
1✔
1286
        }
1✔
1287

1288
        /**
1289
         * Sends the specified element to the server.
1290
         *
1291
         * @param element the element to send.
1292
         * @throws NotConnectedException if the XMPP connection is not connected.
1293
         * @throws OutgoingQueueFullException if there is no space in the outgoing queue.
1294
         */
1295
        protected void sendNonBlocking(Element element) throws NotConnectedException, OutgoingQueueFullException {
1296
            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
×
1297
            boolean enqueued = queue.offer(element);
×
1298
            if (!enqueued) {
×
1299
                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
×
1300
                throw new OutgoingQueueFullException();
×
1301
            }
1302
        }
×
1303

1304
        /**
1305
         * Shuts down the stanza writer. Once this method has been called, no further
1306
         * packets will be written to the server.
1307
         */
1308
        void shutdown(boolean instant) {
1309
            instantShutdown = instant;
1✔
1310
            queue.shutdown();
1✔
1311
            shutdownTimestamp = System.currentTimeMillis();
1✔
1312
        }
1✔
1313

1314
        /**
1315
         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1316
         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1317
         * that case.
1318
         *
1319
         * @return the next element for writing or null.
1320
         */
1321
        private Element nextStreamElement() {
1322
            // It is important the we check if the queue is empty before removing an element from it
1323
            if (queue.isEmpty()) {
1✔
1324
                shouldBundleAndDefer = true;
1✔
1325
            }
1326
            Element packet = null;
1✔
1327
            try {
1328
                packet = queue.take();
1✔
1329
            }
1330
            catch (InterruptedException e) {
×
1331
                if (!queue.isShutdown()) {
×
1332
                    // Users shouldn't try to interrupt the packet writer thread
1333
                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
×
1334
                }
1335
            }
1✔
1336
            return packet;
1✔
1337
        }
1338

1339
        private void writePackets() {
1340
            try {
1341
                // Write out packets from the queue.
1342
                while (!done()) {
1✔
1343
                    Element element = nextStreamElement();
1✔
1344
                    if (element == null) {
1✔
1345
                        continue;
×
1346
                    }
1347

1348
                    // Get a local version of the bundle and defer callback, in case it's unset
1349
                    // between the null check and the method invocation
1350
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
1✔
1351
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
1352
                    // empty), then we could wait a bit for further stanzas attempting to decrease
1353
                    // our energy consumption
1354
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
1✔
1355
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
1356
                        // queue is empty again.
1357
                        shouldBundleAndDefer = false;
×
1358
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
×
1359
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
×
1360
                                        bundlingAndDeferringStopped));
1361
                        if (bundleAndDeferMillis > 0) {
×
1362
                            long remainingWait = bundleAndDeferMillis;
×
1363
                            final long waitStart = System.currentTimeMillis();
×
1364
                            synchronized (bundlingAndDeferringStopped) {
×
1365
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
×
1366
                                    bundlingAndDeferringStopped.wait(remainingWait);
×
1367
                                    remainingWait = bundleAndDeferMillis
×
1368
                                                    - (System.currentTimeMillis() - waitStart);
×
1369
                                }
1370
                            }
×
1371
                        }
1372
                    }
1373

1374
                    Stanza packet = null;
1✔
1375
                    if (element instanceof Stanza) {
1✔
1376
                        packet = (Stanza) element;
1✔
1377
                    }
1378
                    else if (element instanceof Enable) {
×
1379
                        // The client needs to add messages to the unacknowledged stanzas queue
1380
                        // right after it sent 'enabled'. Stanza will be added once
1381
                        // unacknowledgedStanzas is not null.
1382
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE);
×
1383
                    }
1384
                    maybeAddToUnacknowledgedStanzas(packet);
1✔
1385

1386
                    CharSequence elementXml = element.toXML(outgoingStreamXmlEnvironment);
1✔
1387
                    if (elementXml instanceof XmlStringBuilder) {
1✔
1388
                        try {
1389
                            ((XmlStringBuilder) elementXml).write(writer, outgoingStreamXmlEnvironment);
×
1390
                        } catch (NullPointerException npe) {
1✔
1391
                            LOGGER.log(Level.FINE, "NPE in XmlStringBuilder of " + element.getClass() + ": " + element, npe);
1✔
1392
                            throw npe;
1✔
1393
                        }
×
1394
                    }
1395
                    else {
1396
                        writer.write(elementXml.toString());
×
1397
                    }
1398

1399
                    if (queue.isEmpty()) {
×
1400
                        writer.flush();
×
1401
                    }
1402
                    if (packet != null) {
×
1403
                        firePacketSendingListeners(packet);
×
1404
                    }
1405
                }
×
1406
                if (!instantShutdown) {
×
1407
                    // Flush out the rest of the queue.
1408
                    try {
1409
                        while (!queue.isEmpty()) {
×
1410
                            Element packet = queue.remove();
×
1411
                            if (packet instanceof Stanza) {
×
1412
                                Stanza stanza = (Stanza) packet;
×
1413
                                maybeAddToUnacknowledgedStanzas(stanza);
×
1414
                            }
1415
                            writer.write(packet.toXML().toString());
×
1416
                        }
×
1417
                    }
1418
                    catch (Exception e) {
×
1419
                        LOGGER.log(Level.WARNING,
×
1420
                                        "Exception flushing queue during shutdown, ignore and continue",
1421
                                        e);
1422
                    }
×
1423

1424
                    // Close the stream.
1425
                    try {
1426
                        writer.write("</stream:stream>");
×
1427
                        writer.flush();
×
1428
                    }
1429
                    catch (Exception e) {
×
1430
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
×
1431
                    }
×
1432

1433
                    // Delete the queue contents (hopefully nothing is left).
1434
                    queue.clear();
×
1435
                } else if (instantShutdown && isSmEnabled()) {
×
1436
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
1437
                    // into the unacknowledgedStanzas queue
1438
                    drainWriterQueueToUnacknowledgedStanzas();
×
1439
                }
1440
                // Do *not* close the writer here, as it will cause the socket
1441
                // to get closed. But we may want to receive further stanzas
1442
                // until the closing stream tag is received. The socket will be
1443
                // closed in shutdown().
1444
            }
1445
            catch (Exception e) {
1✔
1446
                // The exception can be ignored if the connection is 'done'
1447
                // or if the it was caused because the socket got closed
1448
                if (!(done() || queue.isShutdown())) {
1✔
1449
                    // Set running to false since this thread will exit here and notifyConnectionError() will wait until
1450
                    // the reader and writer thread's 'running' value is false.
1451
                    running = false;
×
1452
                    notifyConnectionError(e);
×
1453
                } else {
1454
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
1✔
1455
                }
1456
            }
×
1457
        }
1✔
1458

1459
        private void drainWriterQueueToUnacknowledgedStanzas() {
1460
            List<Element> elements = new ArrayList<>(queue.size());
×
1461
            queue.drainTo(elements);
×
1462
            for (int i = 0; i < elements.size(); i++) {
×
1463
                Element element = elements.get(i);
×
1464
                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
1465
                if (unacknowledgedStanzas.remainingCapacity() == 0) {
×
1466
                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
×
1467
                            .newWith(i, elements, unacknowledgedStanzas);
×
1468
                    LOGGER.log(Level.WARNING,
×
1469
                            "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
1470
                    return;
×
1471
                }
1472
                if (element instanceof Stanza) {
×
1473
                    unacknowledgedStanzas.add((Stanza) element);
×
1474
                }
1475
            }
1476
        }
×
1477

1478
        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
1479
            // Check if the stream element should be put to the unacknowledgedStanza
1480
            // queue. Note that we can not do the put() in sendStanzaInternal() and the
1481
            // packet order is not stable at this point (sendStanzaInternal() can be
1482
            // called concurrently).
1483
            if (unacknowledgedStanzas != null && stanza != null) {
1✔
1484
                // If the unacknowledgedStanza queue reaching its high water mark, request an new ack
1485
                // from the server in order to drain it
1486
                if (unacknowledgedStanzas.size() == UNACKKNOWLEDGED_STANZAS_QUEUE_SIZE_HIGH_WATER_MARK) {
×
1487
                    writer.write(AckRequest.INSTANCE.toXML().toString());
×
1488
                }
1489

1490
                try {
1491
                    // It is important the we put the stanza in the unacknowledged stanza
1492
                    // queue before we put it on the wire
1493
                    unacknowledgedStanzas.put(stanza);
×
1494
                }
1495
                catch (InterruptedException e) {
×
1496
                    throw new IllegalStateException(e);
×
1497
                }
×
1498
            }
1499
        }
1✔
1500
    }
1501

1502
    /**
1503
     * Set if Stream Management should be used by default for new connections.
1504
     *
1505
     * @param useSmDefault true to use Stream Management for new connections.
1506
     */
1507
    public static void setUseStreamManagementDefault(boolean useSmDefault) {
1508
        XMPPTCPConnection.useSmDefault = useSmDefault;
×
1509
    }
×
1510

1511
    /**
1512
     * Set if Stream Management resumption should be used by default for new connections.
1513
     *
1514
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1515
     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
1516
     */
1517
    @Deprecated
1518
    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
1519
        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
×
1520
    }
×
1521

1522
    /**
1523
     * Set if Stream Management resumption should be used by default for new connections.
1524
     *
1525
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1526
     */
1527
    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1528
        if (useSmResumptionDefault) {
×
1529
            // Also enable SM is resumption is enabled
1530
            setUseStreamManagementDefault(useSmResumptionDefault);
×
1531
        }
1532
        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
×
1533
    }
×
1534

1535
    /**
1536
     * Set if Stream Management should be used if supported by the server.
1537
     *
1538
     * @param useSm true to use Stream Management.
1539
     */
1540
    public void setUseStreamManagement(boolean useSm) {
1541
        this.useSm = useSm;
×
1542
    }
×
1543

1544
    /**
1545
     * Set if Stream Management resumption should be used if supported by the server.
1546
     *
1547
     * @param useSmResumption true to use Stream Management resumption.
1548
     */
1549
    public void setUseStreamManagementResumption(boolean useSmResumption) {
1550
        if (useSmResumption) {
×
1551
            // Also enable SM is resumption is enabled
1552
            setUseStreamManagement(useSmResumption);
×
1553
        }
1554
        this.useSmResumption = useSmResumption;
×
1555
    }
×
1556

1557
    /**
1558
     * Set the preferred resumption time in seconds.
1559
     * @param resumptionTime the preferred resumption time in seconds
1560
     */
1561
    public void setPreferredResumptionTime(int resumptionTime) {
1562
        smClientMaxResumptionTime = resumptionTime;
×
1563
    }
×
1564

1565
    /**
1566
     * Add a predicate for Stream Management acknowledgment requests.
1567
     * <p>
1568
     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1569
     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1570
     * </p>
1571
     * <p>
1572
     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1573
     * </p>
1574
     *
1575
     * @param predicate the predicate to add.
1576
     * @return if the predicate was not already active.
1577
     */
1578
    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1579
        synchronized (requestAckPredicates) {
×
1580
            return requestAckPredicates.add(predicate);
×
1581
        }
1582
    }
1583

1584
    /**
1585
     * Remove the given predicate for Stream Management acknowledgment request.
1586
     * @param predicate the predicate to remove.
1587
     * @return true if the predicate was removed.
1588
     */
1589
    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1590
        synchronized (requestAckPredicates) {
×
1591
            return requestAckPredicates.remove(predicate);
×
1592
        }
1593
    }
1594

1595
    /**
1596
     * Remove all predicates for Stream Management acknowledgment requests.
1597
     */
1598
    public void removeAllRequestAckPredicates() {
1599
        synchronized (requestAckPredicates) {
×
1600
            requestAckPredicates.clear();
×
1601
        }
×
1602
    }
×
1603

1604
    /**
1605
     * Send an unconditional Stream Management acknowledgement request to the server.
1606
     *
1607
     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1608
     * @throws NotConnectedException if the connection is not connected.
1609
     * @throws InterruptedException if the calling thread was interrupted.
1610
     */
1611
    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1612
        if (!isSmEnabled()) {
×
1613
            throw new StreamManagementException.StreamManagementNotEnabledException();
×
1614
        }
1615
        requestSmAcknowledgementInternal();
×
1616
    }
×
1617

1618
    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1619
        packetWriter.sendStreamElement(AckRequest.INSTANCE);
×
1620
    }
×
1621

1622
    private void requestSmAcknowledgementNonBlockingInternal() throws NotConnectedException, OutgoingQueueFullException {
1623
        packetWriter.sendNonBlocking(AckRequest.INSTANCE);
×
1624
    }
×
1625

1626
    /**
1627
     * Send a unconditional Stream Management acknowledgment to the server.
1628
     * <p>
1629
     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management § 4. Acks</a>:
1630
     * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
1631
     * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
1632
     * </p>
1633
     *
1634
     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1635
     * @throws NotConnectedException if the connection is not connected.
1636
     * @throws InterruptedException if the calling thread was interrupted.
1637
     */
1638
    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1639
        if (!isSmEnabled()) {
×
1640
            throw new StreamManagementException.StreamManagementNotEnabledException();
×
1641
        }
1642
        sendSmAcknowledgementInternal();
×
1643
    }
×
1644

1645
    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1646
        AckAnswer ackAnswer = new AckAnswer(clientHandledStanzasCount);
×
1647
        // Do net put an ack to the queue if it has already been shutdown. Some servers, like ejabberd, like to request
1648
        // an ack even after we have send a stream close (and hance the queue was shutdown). If we would not check here,
1649
        // then the ack would dangle around in the queue, and be send on the next re-connection attempt even before the
1650
        // stream open.
1651
        packetWriter.queue.putIfNotShutdown(ackAnswer);
×
1652
    }
×
1653

1654
    /**
1655
     * Add a Stanza acknowledged listener.
1656
     * <p>
1657
     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
1658
     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
1659
     * possible.
1660
     * </p>
1661
     *
1662
     * @param listener the listener to add.
1663
     */
1664
    public void addStanzaAcknowledgedListener(StanzaListener listener) {
1665
        stanzaAcknowledgedListeners.add(listener);
×
1666
    }
×
1667

1668
    /**
1669
     * Remove the given Stanza acknowledged listener.
1670
     *
1671
     * @param listener the listener.
1672
     * @return true if the listener was removed.
1673
     */
1674
    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
1675
        return stanzaAcknowledgedListeners.remove(listener);
×
1676
    }
1677

1678
    /**
1679
     * Remove all stanza acknowledged listeners.
1680
     */
1681
    public void removeAllStanzaAcknowledgedListeners() {
1682
        stanzaAcknowledgedListeners.clear();
×
1683
    }
×
1684

1685
    /**
1686
     * Add a Stanza dropped listener.
1687
     * <p>
1688
     * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get
1689
     * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit
1690
     * the Stanzas.
1691
     * </p>
1692
     *
1693
     * @param listener the listener to add.
1694
     * @since 4.3.3
1695
     */
1696
    public void addStanzaDroppedListener(StanzaListener listener) {
1697
        stanzaDroppedListeners.add(listener);
×
1698
    }
×
1699

1700
    /**
1701
     * Remove the given Stanza dropped listener.
1702
     *
1703
     * @param listener the listener.
1704
     * @return true if the listener was removed.
1705
     * @since 4.3.3
1706
     */
1707
    public boolean removeStanzaDroppedListener(StanzaListener listener) {
1708
        return stanzaDroppedListeners.remove(listener);
×
1709
    }
1710

1711
    /**
1712
     * Add a new Stanza ID acknowledged listener for the given ID.
1713
     * <p>
1714
     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1715
     * automatically be removed after the listener was run.
1716
     * </p>
1717
     *
1718
     * @param id the stanza ID.
1719
     * @param listener the listener to invoke.
1720
     * @return the previous listener for this stanza ID or null.
1721
     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1722
     */
1723
    @SuppressWarnings("FutureReturnValueIgnored")
1724
    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1725
        // Prevent users from adding callbacks that will never get removed
1726
        if (!smWasEnabledAtLeastOnce) {
×
1727
            throw new StreamManagementException.StreamManagementNotEnabledException();
×
1728
        }
1729
        // Remove the listener after max. 3 hours
1730
        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
×
1731
        schedule(new Runnable() {
×
1732
            @Override
1733
            public void run() {
1734
                stanzaIdAcknowledgedListeners.remove(id);
×
1735
            }
×
1736
        }, removeAfterSeconds, TimeUnit.SECONDS);
1737
        return stanzaIdAcknowledgedListeners.put(id, listener);
×
1738
    }
1739

1740
    /**
1741
     * Remove the Stanza ID acknowledged listener for the given ID.
1742
     *
1743
     * @param id the stanza ID.
1744
     * @return true if the listener was found and removed, false otherwise.
1745
     */
1746
    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
1747
        return stanzaIdAcknowledgedListeners.remove(id);
×
1748
    }
1749

1750
    /**
1751
     * Removes all Stanza ID acknowledged listeners.
1752
     */
1753
    public void removeAllStanzaIdAcknowledgedListeners() {
1754
        stanzaIdAcknowledgedListeners.clear();
×
1755
    }
×
1756

1757
    /**
1758
     * Returns true if Stream Management is supported by the server.
1759
     *
1760
     * @return true if Stream Management is supported by the server.
1761
     */
1762
    public boolean isSmAvailable() {
1763
        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
×
1764
    }
1765

1766
    /**
1767
     * Returns true if Stream Management was successfully negotiated with the server.
1768
     *
1769
     * @return true if Stream Management was negotiated.
1770
     */
1771
    public boolean isSmEnabled() {
1772
        return smEnabledSyncPoint;
×
1773
    }
1774

1775
    /**
1776
     * Returns true if the stream was successfully resumed with help of Stream Management.
1777
     *
1778
     * @return true if the stream was resumed.
1779
     */
1780
    public boolean streamWasResumed() {
1781
        return smResumedSyncPoint == SyncPointState.successful;
×
1782
    }
1783

1784
    /**
1785
     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1786
     *
1787
     * @return true if disconnected but resumption possible.
1788
     */
1789
    public boolean isDisconnectedButSmResumptionPossible() {
1790
        return disconnectedButResumeable && isSmResumptionPossible();
×
1791
    }
1792

1793
    /**
1794
     * Returns true if the stream is resumable.
1795
     *
1796
     * @return true if the stream is resumable.
1797
     */
1798
    public boolean isSmResumptionPossible() {
1799
        // There is no resumable stream available
1800
        if (smSessionId == null)
1✔
1801
            return false;
1✔
1802

1803
        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
×
1804
        // Seems like we are already reconnected, report true
1805
        if (shutdownTimestamp == null) {
×
1806
            return true;
×
1807
        }
1808

1809
        // See if resumption time is over
1810
        long current = System.currentTimeMillis();
×
1811
        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
×
1812
        if (current > shutdownTimestamp + maxResumptionMillies) {
×
1813
            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1814
            // resumption is possible
1815
            return false;
×
1816
        } else {
1817
            return true;
×
1818
        }
1819
    }
1820

1821
    /**
1822
     * Drop the stream management state. Sets {@link #smSessionId} and
1823
     * {@link #unacknowledgedStanzas} to <code>null</code>.
1824
     */
1825
    private void dropSmState() {
1826
        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
1827
        // respective. No need to reset them here.
1828
        smSessionId = null;
×
1829
        unacknowledgedStanzas = null;
×
1830
    }
×
1831

1832
    /**
1833
     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1834
     * <p>
1835
     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1836
     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1837
     * without checking for overflows before.
1838
     * </p>
1839
     *
1840
     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1841
     */
1842
    public int getMaxSmResumptionTime() {
1843
        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
×
1844
        int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
×
1845
        return Math.min(clientResumptionTime, serverResumptionTime);
×
1846
    }
1847

1848
    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1849
        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
×
1850
        final List<Stanza> ackedStanzas = new ArrayList<>(
×
1851
                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
×
1852
                                        : Integer.MAX_VALUE);
×
1853
        for (long i = 0; i < ackedStanzasCount; i++) {
×
1854
            Stanza ackedStanza = unacknowledgedStanzas.poll();
×
1855
            // If the server ack'ed a stanza, then it must be in the
1856
            // unacknowledged stanza queue. There can be no exception.
1857
            if (ackedStanza == null) {
×
1858
                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
×
1859
                                ackedStanzasCount, ackedStanzas);
1860
            }
1861
            ackedStanzas.add(ackedStanza);
×
1862
        }
1863

1864
        boolean atLeastOneStanzaAcknowledgedListener = false;
×
1865
        if (!stanzaAcknowledgedListeners.isEmpty()) {
×
1866
            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1867
            atLeastOneStanzaAcknowledgedListener = true;
×
1868
        }
1869
        else {
1870
            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1871
            for (Stanza ackedStanza : ackedStanzas) {
×
1872
                String id = ackedStanza.getStanzaId();
×
1873
                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
×
1874
                    atLeastOneStanzaAcknowledgedListener = true;
×
1875
                    break;
×
1876
                }
1877
            }
×
1878
        }
1879

1880
        // Only spawn a new thread if there is a chance that some listener is invoked
1881
        if (atLeastOneStanzaAcknowledgedListener) {
×
1882
            asyncGo(new Runnable() {
×
1883
                @Override
1884
                public void run() {
1885
                    for (Stanza ackedStanza : ackedStanzas) {
×
1886
                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
×
1887
                            try {
1888
                                listener.processStanza(ackedStanza);
×
1889
                            }
1890
                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
×
1891
                                LOGGER.log(Level.FINER, "Received exception", e);
×
1892
                            }
×
1893
                        }
×
1894
                        String id = ackedStanza.getStanzaId();
×
1895
                        if (StringUtils.isNullOrEmpty(id)) {
×
1896
                            continue;
×
1897
                        }
1898
                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
×
1899
                        if (listener != null) {
×
1900
                            try {
1901
                                listener.processStanza(ackedStanza);
×
1902
                            }
1903
                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
×
1904
                                LOGGER.log(Level.FINER, "Received exception", e);
×
1905
                            }
×
1906
                        }
1907
                    }
×
1908
                }
×
1909
            });
1910
        }
1911

1912
        serverHandledStanzasCount = handledCount;
×
1913
    }
×
1914

1915
    /**
1916
     * Set the default bundle and defer callback used for new connections.
1917
     *
1918
     * @param defaultBundleAndDeferCallback TODO javadoc me please
1919
     * @see BundleAndDeferCallback
1920
     * @since 4.1
1921
     */
1922
    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
1923
        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
×
1924
    }
×
1925

1926
    /**
1927
     * Set the bundle and defer callback used for this connection.
1928
     * <p>
1929
     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
1930
     * no longer get deferred.
1931
     * </p>
1932
     *
1933
     * @param bundleAndDeferCallback the callback or <code>null</code>.
1934
     * @see BundleAndDeferCallback
1935
     * @since 4.1
1936
     */
1937
    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
1938
        this.bundleAndDeferCallback = bundleAndDeferCallback;
×
1939
    }
×
1940

1941

1942
    /**
1943
     * Returns the local address currently in use for this connection.
1944
     *
1945
     * @return the local address
1946
     */
1947
    @Override
1948
    public InetAddress getLocalAddress() {
1949
        final Socket socket = this.socket;
×
1950
        if (socket == null) return null;
×
1951

1952
        InetAddress localAddress = socket.getLocalAddress();
×
1953
        if (localAddress.isAnyLocalAddress()) return null;
×
1954

1955
        return localAddress;
×
1956
    }
1957
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc