• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

igniterealtime / Smack / #3231

09 Jul 2024 06:20AM UTC coverage: 38.937% (-0.01%) from 38.947%
#3231

push

github

web-flow
Merge pull request #606 from guusdk/sint_ox-cleanup

[sinttest] XEP-0373 Integration Tests should clean-up

0 of 40 new or added lines in 2 files covered. (0.0%)

4 existing lines in 4 files now uncovered.

16977 of 43601 relevant lines covered (38.94%)

0.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.85
/smack-core/src/main/java/org/jivesoftware/smack/SmackReactor.java
1
/**
2
 *
3
 * Copyright 2018-2023 Florian Schmaus
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
package org.jivesoftware.smack;
18

19
import java.io.IOException;
20
import java.nio.channels.CancelledKeyException;
21
import java.nio.channels.ClosedChannelException;
22
import java.nio.channels.SelectableChannel;
23
import java.nio.channels.SelectionKey;
24
import java.nio.channels.Selector;
25
import java.util.ArrayList;
26
import java.util.Collection;
27
import java.util.Collections;
28
import java.util.Date;
29
import java.util.Iterator;
30
import java.util.List;
31
import java.util.Queue;
32
import java.util.Set;
33
import java.util.concurrent.ConcurrentLinkedQueue;
34
import java.util.concurrent.DelayQueue;
35
import java.util.concurrent.Semaphore;
36
import java.util.concurrent.TimeUnit;
37
import java.util.concurrent.atomic.AtomicBoolean;
38
import java.util.concurrent.locks.Lock;
39
import java.util.concurrent.locks.ReentrantLock;
40
import java.util.logging.Level;
41
import java.util.logging.Logger;
42

43
/**
44
 * The SmackReactor for non-blocking I/O.
45
 * <p>
46
 * Highlights include:
47
 * <ul>
48
 * <li>Multiple reactor threads</li>
49
 * <li>Scheduled actions</li>
50
 * </ul>
51
 *
52
 * <pre>
53
 *
54
 *           ) ) )
55
 *        ( ( (
56
 *      ) ) )
57
 *   (~~~~~~~~~)
58
 *    | Smack |
59
 *    |Reactor|
60
 *    I      _._
61
 *    I    /'   `\
62
 *    I   |       |
63
 *    f   |   |~~~~~~~~~~~~~~|
64
 *  .'    |   | #   #   #  # |
65
 * '______|___|___________###|
66
 * </pre>
67
 */
68
public class SmackReactor {
69

70
    private static final Logger LOGGER = Logger.getLogger(SmackReactor.class.getName());
1✔
71

72
    private static final int DEFAULT_REACTOR_THREAD_COUNT = 2;
73

74
    private static final int PENDING_SET_INTEREST_OPS_MAX_BATCH_SIZE = 1024;
75

76
    private static SmackReactor INSTANCE;
77

78
    static synchronized SmackReactor getInstance() {
79
        if (INSTANCE == null) {
1✔
80
            INSTANCE = new SmackReactor("DefaultReactor");
1✔
81
        }
82
        return INSTANCE;
1✔
83
    }
84

85
    private final Selector selector;
86
    private final String reactorName;
87

88
    private final List<Reactor> reactorThreads = Collections.synchronizedList(new ArrayList<>());
1✔
89

90
    private final DelayQueue<ScheduledAction> scheduledActions = new DelayQueue<>();
1✔
91

92
    private final Lock registrationLock = new ReentrantLock();
1✔
93

94
    /**
95
     * The semaphore protecting the handling of the actions. Note that it is
96
     * initialized with -1, which basically means that one thread will always do I/O using
97
     * select().
98
     */
99
    private final Semaphore actionsSemaphore = new Semaphore(-1, false);
1✔
100

101
    private final Queue<SelectionKey> pendingSelectionKeys = new ConcurrentLinkedQueue<>();
1✔
102

103
    private final Queue<SetInterestOps> pendingSetInterestOps = new ConcurrentLinkedQueue<>();
1✔
104

105
    SmackReactor(String reactorName) {
1✔
106
        this.reactorName = reactorName;
1✔
107

108
        try {
109
            selector = Selector.open();
1✔
110
        }
111
        catch (IOException e) {
×
112
            throw new IllegalStateException(e);
×
113
        }
1✔
114

115
        setReactorThreadCount(DEFAULT_REACTOR_THREAD_COUNT);
1✔
116
    }
1✔
117

118
    public SelectionKey registerWithSelector(SelectableChannel channel, int ops, ChannelSelectedCallback callback)
119
            throws ClosedChannelException {
120
        SelectionKeyAttachment selectionKeyAttachment = new SelectionKeyAttachment(callback);
×
121

122
        registrationLock.lock();
×
123
        try {
124
            selector.wakeup();
×
125
            return channel.register(selector, ops, selectionKeyAttachment);
×
126
        } finally {
127
            registrationLock.unlock();
×
128
        }
129
    }
130

131
    public void setInterestOps(SelectionKey selectionKey, int interestOps) {
132
        SetInterestOps setInterestOps = new SetInterestOps(selectionKey, interestOps);
×
133
        pendingSetInterestOps.add(setInterestOps);
×
134
        selector.wakeup();
×
135
    }
×
136

137
    private static final class SetInterestOps {
138
        private final SelectionKey selectionKey;
139
        private final int interestOps;
140

141
        private SetInterestOps(SelectionKey selectionKey, int interestOps) {
×
142
            this.selectionKey = selectionKey;
×
143
            this.interestOps = interestOps;
×
144
        }
×
145
    }
146

147
    ScheduledAction schedule(Runnable runnable, long delay, TimeUnit unit, ScheduledAction.Kind scheduledActionKind) {
148
        long releaseTimeEpoch = System.currentTimeMillis() + unit.toMillis(delay);
1✔
149
        Date releaseTimeDate = new Date(releaseTimeEpoch);
1✔
150
        ScheduledAction scheduledAction = new ScheduledAction(runnable, releaseTimeDate, this, scheduledActionKind);
1✔
151
        scheduledActions.add(scheduledAction);
1✔
152
        selector.wakeup();
1✔
153
        return scheduledAction;
1✔
154
    }
155

156
    /**
157
     * Cancels the scheduled action.
158
     *
159
     * @param scheduledAction the scheduled action to cancel.
160
     * @return <code>true</code> if the scheduled action was still pending and got removed, <code>false</code> otherwise.
161
     */
162
    boolean cancel(ScheduledAction scheduledAction) {
163
        return scheduledActions.remove(scheduledAction);
1✔
164
    }
165

166
    private class Reactor extends Thread {
1✔
167

168
        private volatile long shutdownRequestTimestamp = -1;
1✔
169

170
        @Override
171
        public void run() {
172
            try {
173
                reactorLoop();
×
174
            } finally {
175
                if (shutdownRequestTimestamp > 0) {
×
176
                    long shutDownDelay = System.currentTimeMillis() - shutdownRequestTimestamp;
×
177
                    LOGGER.info(this + " shut down after " + shutDownDelay + "ms");
×
178
                } else {
×
179
                    boolean contained = reactorThreads.remove(this);
×
180
                    assert contained;
×
181
                }
182
            }
183
        }
×
184

185
        private void reactorLoop() {
186
            // Loop until reactor shutdown was requested.
187
            while (shutdownRequestTimestamp < 0) {
1✔
188
                handleScheduledActionsOrPerformSelect();
1✔
189

190
                handlePendingSelectionKeys();
1✔
191
            }
192
        }
×
193

194
        @SuppressWarnings("LockNotBeforeTry")
195
        private void handleScheduledActionsOrPerformSelect() {
196
            ScheduledAction dueScheduledAction = null;
1✔
197

198
            boolean permitToHandleScheduledActions = actionsSemaphore.tryAcquire();
1✔
199
            if (permitToHandleScheduledActions) {
1✔
200
                try {
201
                    dueScheduledAction = scheduledActions.poll();
1✔
202
                } finally {
203
                    actionsSemaphore.release();
1✔
204
                }
205
            }
206

207
            if (dueScheduledAction != null) {
1✔
208
                dueScheduledAction.run();
1✔
209
                return;
1✔
210
            }
211

212
            int newSelectedKeysCount = 0;
1✔
213
            List<SelectionKey> selectedKeys;
214
            synchronized (selector) {
1✔
215
                ScheduledAction nextScheduledAction = scheduledActions.peek();
1✔
216

217
                long selectWait;
218
                if (nextScheduledAction == null) {
1✔
219
                    // There is no next scheduled action, wait indefinitely in select() or until another thread invokes
220
                    // selector.wakeup().
221
                    selectWait = 0;
1✔
222
                } else {
223
                    selectWait = nextScheduledAction.getTimeToDueMillis();
1✔
224
                    if (selectWait <= 0) {
1✔
225
                        // A scheduled action was just released and became ready to execute.
UNCOV
226
                        return;
×
227
                    }
228
                }
229

230
                // Before we call select, we handle the pending the interest Ops. This will not block since no other
231
                // thread is currently in select() at this time.
232
                // Note: This was put deliberately before the registration lock. It may cause more synchronization but
233
                // allows for more parallelism.
234
                // Hopefully that assumption is right.
235
                int myHandledPendingSetInterestOps = 0;
1✔
236
                for (SetInterestOps setInterestOps; (setInterestOps = pendingSetInterestOps.poll()) != null;) {
1✔
237
                    setInterestOpsCancelledKeySafe(setInterestOps.selectionKey, setInterestOps.interestOps);
×
238

239
                    if (myHandledPendingSetInterestOps++ >= PENDING_SET_INTEREST_OPS_MAX_BATCH_SIZE) {
×
240
                        // This thread has handled enough "set pending interest ops" requests. Wakeup another one to
241
                        // handle the remaining (if any).
242
                        selector.wakeup();
×
243
                        break;
×
244
                    }
245
                }
246

247
                // Ensure that a wakeup() in registerWithSelector() gives the corresponding
248
                // register() in the same method the chance to actually register the channel. In
249
                // other words: This construct ensures that there is never another select()
250
                // between a corresponding wakeup() and register() calls.
251
                // See also https://stackoverflow.com/a/1112809/194894
252
                registrationLock.lock();
1✔
253
                registrationLock.unlock();
1✔
254

255
                try {
256
                    newSelectedKeysCount = selector.select(selectWait);
1✔
257
                } catch (IOException e) {
×
258
                    LOGGER.log(Level.SEVERE, "IOException while using select()", e);
×
259
                    return;
×
260
                }
1✔
261

262
                if (newSelectedKeysCount == 0) {
1✔
263
                    return;
1✔
264
                }
265

266
                // Copy the selected-key set over to selectedKeys, remove the keys from the
267
                // selected key set and loose interest of the key OPs for the time being.
268
                // Note that we perform this operation in two steps in order to maximize the
269
                // timespan setRacing() is set.
270
                Set<SelectionKey> selectedKeySet = selector.selectedKeys();
×
271
                for (SelectionKey selectionKey : selectedKeySet) {
×
272
                    SelectionKeyAttachment selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();
×
273
                    selectionKeyAttachment.setRacing();
×
274
                }
×
275
                for (SelectionKey selectionKey : selectedKeySet) {
×
276
                    setInterestOpsCancelledKeySafe(selectionKey, 0);
×
277
                }
×
278

279
                selectedKeys = new ArrayList<>(selectedKeySet);
×
280
                selectedKeySet.clear();
×
281
            }
×
282

283
            int selectedKeysCount = selectedKeys.size();
×
284
            int currentReactorThreadCount = reactorThreads.size();
×
285
            int myKeyCount;
286
            if (selectedKeysCount > currentReactorThreadCount) {
×
287
                myKeyCount = selectedKeysCount / currentReactorThreadCount;
×
288
            } else {
289
                myKeyCount = selectedKeysCount;
×
290
            }
291

292
            final Level reactorSelectStatsLogLevel = Level.FINE;
×
293
            if (LOGGER.isLoggable(reactorSelectStatsLogLevel)) {
×
294
                LOGGER.log(reactorSelectStatsLogLevel,
×
295
                                "New selected key count: " + newSelectedKeysCount
296
                                + ". Total selected key count " + selectedKeysCount
297
                                + ". My key count: " + myKeyCount
298
                                + ". Current reactor thread count: " + currentReactorThreadCount);
299
            }
300

301
            Collection<SelectionKey> mySelectedKeys = new ArrayList<>(myKeyCount);
×
302
            Iterator<SelectionKey> it = selectedKeys.iterator();
×
303
            for (int i = 0; i < myKeyCount; i++) {
×
304
                SelectionKey selectionKey = it.next();
×
305
                mySelectedKeys.add(selectionKey);
×
306
            }
307
            while (it.hasNext()) {
×
308
                // Drain to pendingSelectionKeys.
309
                SelectionKey selectionKey = it.next();
×
310
                pendingSelectionKeys.add(selectionKey);
×
311
            }
×
312

313
            if (selectedKeysCount - myKeyCount > 0) {
×
314
                // There where pending selection keys: Wakeup another reactor thread to handle them.
315
                selector.wakeup();
×
316
            }
317

318
            handleSelectedKeys(mySelectedKeys);
×
319
        }
×
320

321
        private void handlePendingSelectionKeys() {
322
            final int pendingSelectionKeysSize = pendingSelectionKeys.size();
1✔
323
            if (pendingSelectionKeysSize == 0) {
1✔
324
                return;
1✔
325
            }
326

327
            int currentReactorThreadCount = reactorThreads.size();
×
328
            int myKeyCount = pendingSelectionKeysSize / currentReactorThreadCount;
×
329
            // The division could result in myKeyCount being zero, even though there are pending selection keys.
330
            // Therefore, ensure that this thread tries to get at least one pending selection key by invoking poll().
331
            // Otherwise, it could happen that we end up in a busy loop, where myKeyCount is zero and this thread invokes
332
            // selector.wakeup() below because pendingSelectionsKeys is not empty, but the woken up reactor thread wil
333
            // end up with myKeyCount being zero again, restarting the busy-loop cycle.
334
            if (myKeyCount == 0) myKeyCount = 1;
×
335
            Collection<SelectionKey> selectedKeys = new ArrayList<>(myKeyCount);
×
336
            for (int i = 0; i < myKeyCount; i++) {
×
337
                SelectionKey selectionKey = pendingSelectionKeys.poll();
×
338
                if (selectionKey == null) {
×
339
                    // We lost a race and can abort here since the pendingSelectionKeys queue is empty.
340
                    break;
×
341
                }
342
                selectedKeys.add(selectionKey);
×
343
            }
344

345
            if (!pendingSelectionKeys.isEmpty()) {
×
346
                // There are more pending selection keys, wakeup a thread blocked in select() to handle them.
347
                selector.wakeup();
×
348
            }
349

350
            handleSelectedKeys(selectedKeys);
×
351
        }
×
352

353
        private void setInterestOpsCancelledKeySafe(SelectionKey selectionKey, int interestOps) {
354
            try {
355
                selectionKey.interestOps(interestOps);
×
356
            }
357
            catch (CancelledKeyException e) {
×
358
                final Level keyCancelledLogLevel = Level.FINER;
×
359
                if (LOGGER.isLoggable(keyCancelledLogLevel)) {
×
360
                    LOGGER.log(keyCancelledLogLevel, "Key '" + selectionKey + "' has been cancelled", e);
×
361
                }
362
            }
×
363
        }
×
364

365
        void requestShutdown() {
366
            shutdownRequestTimestamp = System.currentTimeMillis();
×
367
        }
×
368
    }
369

370
    private static void handleSelectedKeys(Collection<SelectionKey> selectedKeys) {
371
        for (SelectionKey selectionKey : selectedKeys) {
×
372
            SelectableChannel channel = selectionKey.channel();
×
373
            SelectionKeyAttachment selectionKeyAttachment = (SelectionKeyAttachment) selectionKey.attachment();
×
374
            ChannelSelectedCallback channelSelectedCallback = selectionKeyAttachment.channelSelectedCallback;
×
375
            channelSelectedCallback.onChannelSelected(channel, selectionKey);
×
376
        }
×
377
    }
×
378

379
    /**
     * Callback that is invoked by a reactor thread for a selected key.
     */
    @FunctionalInterface
    public interface ChannelSelectedCallback {
        /**
         * Invoked when the channel was selected.
         *
         * @param channel the channel of the selected key.
         * @param selectionKey the selected key.
         */
        void onChannelSelected(SelectableChannel channel, SelectionKey selectionKey);
    }
382

383
    public void setReactorThreadCount(int reactorThreadCount) {
384
        if (reactorThreadCount < 2) {
1✔
385
            throw new IllegalArgumentException("Must have at least two reactor threads, but you requested " + reactorThreadCount);
×
386
        }
387

388
        synchronized (reactorThreads) {
1✔
389
            int deltaThreads = reactorThreadCount - reactorThreads.size();
1✔
390
            if (deltaThreads > 0) {
1✔
391
                // Start new reactor thread. Note that we start the threads before we increase the permits of the
392
                // actionsSemaphore.
393
                for (int i = 0; i < deltaThreads; i++) {
1✔
394
                    Reactor reactor = new Reactor();
1✔
395
                    reactor.setDaemon(true);
1✔
396
                    reactor.setName("Smack " + reactorName + " Thread #" + i);
1✔
397
                    reactorThreads.add(reactor);
1✔
398
                    reactor.start();
1✔
399
                }
400

401
                actionsSemaphore.release(deltaThreads);
1✔
402
            } else {
403
                // Stop existing reactor threads. First we change the sign of deltaThreads, then we decrease the permits
404
                // of the actionsSemaphore *before* we signal the selected reactor threads that they should shut down.
405
                deltaThreads -= deltaThreads;
×
406

407
                for (int i = deltaThreads - 1; i > 0; i--) {
×
408
                    // Note that this could potentially block forever, starving on the unfair semaphore.
409
                    actionsSemaphore.acquireUninterruptibly();
×
410
                }
411

412
                for (int i = deltaThreads - 1; i > 0; i--) {
×
413
                    Reactor reactor = reactorThreads.remove(i);
×
414
                    reactor.requestShutdown();
×
415
                }
416

417
                selector.wakeup();
×
418
            }
419
        }
1✔
420
    }
1✔
421

422
    public static final class SelectionKeyAttachment {
423
        private final ChannelSelectedCallback channelSelectedCallback;
424
        private final AtomicBoolean reactorThreadRacing = new AtomicBoolean();
×
425

426
        private SelectionKeyAttachment(ChannelSelectedCallback channelSelectedCallback) {
×
427
            this.channelSelectedCallback = channelSelectedCallback;
×
428
        }
×
429

430
        private void setRacing() {
431
            // We use lazySet here since it is sufficient if the value does not become visible immediately.
432
            reactorThreadRacing.lazySet(true);
×
433
        }
×
434

435
        public void resetReactorThreadRacing() {
436
            reactorThreadRacing.set(false);
×
437
        }
×
438

439
        public boolean isReactorThreadRacing() {
440
            return reactorThreadRacing.get();
×
441
        }
442

443
    }
444
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc