• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apache / rocketmq / 7926

pending completion
7926

push

travis-ci-com

GitHub
[ISSUE #5965] Fix lmqTopicQueueTable initialization (#5967)

10 of 10 new or added lines in 2 files covered. (100.0%)

22455 of 43079 relevant lines covered (52.13%)

1.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.37
/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one or more
3
 * contributor license agreements.  See the NOTICE file distributed with
4
 * this work for additional information regarding copyright ownership.
5
 * The ASF licenses this file to You under the Apache License, Version 2.0
6
 * (the "License"); you may not use this file except in compliance with
7
 * the License.  You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 */
17
package org.apache.rocketmq.store;
18

19
import io.netty.buffer.ByteBuf;
20
import io.netty.buffer.ByteBufAllocator;
21
import io.netty.buffer.UnpooledByteBufAllocator;
22
import org.apache.rocketmq.common.MixAll;
23
import org.apache.rocketmq.common.ServiceThread;
24
import org.apache.rocketmq.common.UtilAll;
25
import org.apache.rocketmq.common.constant.LoggerName;
26
import org.apache.rocketmq.common.message.MessageAccessor;
27
import org.apache.rocketmq.common.message.MessageConst;
28
import org.apache.rocketmq.common.message.MessageDecoder;
29
import org.apache.rocketmq.common.message.MessageExt;
30
import org.apache.rocketmq.common.message.MessageExtBatch;
31
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
32
import org.apache.rocketmq.common.topic.TopicValidator;
33
import org.apache.rocketmq.logging.InternalLogger;
34
import org.apache.rocketmq.logging.InternalLoggerFactory;
35
import org.apache.rocketmq.store.config.BrokerRole;
36
import org.apache.rocketmq.store.config.FlushDiskType;
37
import org.apache.rocketmq.store.config.MessageStoreConfig;
38
import org.apache.rocketmq.store.ha.HAService;
39
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
40

41
import java.net.Inet6Address;
42
import java.net.InetSocketAddress;
43
import java.nio.ByteBuffer;
44
import java.util.Collections;
45
import java.util.HashMap;
46
import java.util.LinkedList;
47
import java.util.List;
48
import java.util.Map;
49
import java.util.Set;
50
import java.util.concurrent.CompletableFuture;
51
import java.util.concurrent.ConcurrentHashMap;
52
import java.util.function.Supplier;
53

54
/**
55
 * Store all metadata downtime for recovery, data protection reliability
56
 */
57
public class CommitLog {
58
    // Message's MAGIC CODE daa320a7
59
    public final static int MESSAGE_MAGIC_CODE = -626843481;
60
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
2✔
61
    // End of file empty MAGIC CODE cbd43194
62
    protected final static int BLANK_MAGIC_CODE = -875286124;
63
    protected final MappedFileQueue mappedFileQueue;
64
    protected final DefaultMessageStore defaultMessageStore;
65
    private final FlushCommitLogService flushCommitLogService;
66

67
    //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
68
    private final FlushCommitLogService commitLogService;
69

70
    private final AppendMessageCallback appendMessageCallback;
71
    private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
72
    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
2✔
73
    protected Map<String/* topic-queueid */, Long/* offset */> lmqTopicQueueTable = new ConcurrentHashMap<>(1024);
2✔
74
    protected volatile long confirmOffset = -1L;
2✔
75

76
    private volatile long beginTimeInLock = 0;
2✔
77

78
    protected final PutMessageLock putMessageLock;
79

80
    private volatile Set<String> fullStorePaths = Collections.emptySet();
2✔
81

82
    protected final MultiDispatch multiDispatch;
83
    private final FlushDiskWatcher flushDiskWatcher;
84

85
    public CommitLog(final DefaultMessageStore defaultMessageStore) {
2✔
86
        String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
2✔
87
        if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
2✔
88
            this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
2✔
89
                    defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
2✔
90
                    defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
2✔
91
        } else {
92
            this.mappedFileQueue = new MappedFileQueue(storePath,
2✔
93
                    defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
2✔
94
                    defaultMessageStore.getAllocateMappedFileService());
2✔
95
        }
96

97
        this.defaultMessageStore = defaultMessageStore;
2✔
98

99
        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
2✔
100
            this.flushCommitLogService = new GroupCommitService();
2✔
101
        } else {
102
            this.flushCommitLogService = new FlushRealTimeService();
2✔
103
        }
104

105
        this.commitLogService = new CommitRealTimeService();
2✔
106

107
        this.appendMessageCallback = new DefaultAppendMessageCallback();
2✔
108
        putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
2✔
109
            @Override
110
            protected PutMessageThreadLocal initialValue() {
111
                return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
2✔
112
            }
113
        };
114
        this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
2✔
115

116
        this.multiDispatch = new MultiDispatch(defaultMessageStore, this);
2✔
117

118
        flushDiskWatcher = new FlushDiskWatcher();
2✔
119
    }
2✔
120

121
    public void setFullStorePaths(Set<String> fullStorePaths) {
122
        this.fullStorePaths = fullStorePaths;
2✔
123
    }
2✔
124

125
    public Set<String> getFullStorePaths() {
126
        return fullStorePaths;
2✔
127
    }
128

129
    public ThreadLocal<PutMessageThreadLocal> getPutMessageThreadLocal() {
130
        return putMessageThreadLocal;
2✔
131
    }
132

133
    public boolean load() {
134
        boolean result = this.mappedFileQueue.load();
2✔
135
        log.info("load commit log " + (result ? "OK" : "Failed"));
2✔
136
        return result;
2✔
137
    }
138

139
    public void start() {
140
        this.flushCommitLogService.start();
2✔
141

142
        flushDiskWatcher.setDaemon(true);
2✔
143
        flushDiskWatcher.start();
2✔
144

145

146
        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
2✔
147
            this.commitLogService.start();
×
148
        }
149
    }
2✔
150

151
    public void shutdown() {
152
        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
2✔
153
            this.commitLogService.shutdown();
×
154
        }
155

156
        this.flushCommitLogService.shutdown();
2✔
157

158
        flushDiskWatcher.shutdown(true);
2✔
159
    }
2✔
160

161
    public long flush() {
162
        this.mappedFileQueue.commit(0);
×
163
        this.mappedFileQueue.flush(0);
×
164
        return this.mappedFileQueue.getFlushedWhere();
×
165
    }
166

167
    public long getMaxOffset() {
168
        return this.mappedFileQueue.getMaxOffset();
2✔
169
    }
170

171
    public long remainHowManyDataToCommit() {
172
        return this.mappedFileQueue.remainHowManyDataToCommit();
×
173
    }
174

175
    public long remainHowManyDataToFlush() {
176
        return this.mappedFileQueue.remainHowManyDataToFlush();
×
177
    }
178

179
    public int deleteExpiredFile(
180
        final long expiredTime,
181
        final int deleteFilesInterval,
182
        final long intervalForcibly,
183
        final boolean cleanImmediately
184
    ) {
185
        return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
2✔
186
    }
187

188
    /**
189
     * Read CommitLog data, use data replication
190
     */
191
    public SelectMappedBufferResult getData(final long offset) {
192
        return this.getData(offset, offset == 0);
2✔
193
    }
194

195
    public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
196
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
2✔
197
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
2✔
198
        if (mappedFile != null) {
2✔
199
            int pos = (int) (offset % mappedFileSize);
2✔
200
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
2✔
201
            return result;
2✔
202
        }
203

204
        return null;
2✔
205
    }
206

207
    /**
208
     * When the normal exit, data recovery, all memory data have been flush
209
     */
210
    public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
211
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
2✔
212
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
2✔
213
        if (!mappedFiles.isEmpty()) {
2✔
214
            // Began to recover from the last third file
215
            int index = mappedFiles.size() - 3;
2✔
216
            if (index < 0)
2✔
217
                index = 0;
2✔
218

219
            MappedFile mappedFile = mappedFiles.get(index);
2✔
220
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
2✔
221
            long processOffset = mappedFile.getFileFromOffset();
2✔
222
            long mappedFileOffset = 0;
2✔
223
            while (true) {
224
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
2✔
225
                int size = dispatchRequest.getMsgSize();
2✔
226
                // Normal data
227
                if (dispatchRequest.isSuccess() && size > 0) {
2✔
228
                    mappedFileOffset += size;
2✔
229
                }
230
                // Come the end of the file, switch to the next file Since the
231
                // return 0 representatives met last hole,
232
                // this can not be included in truncate offset
233
                else if (dispatchRequest.isSuccess() && size == 0) {
2✔
234
                    index++;
2✔
235
                    if (index >= mappedFiles.size()) {
2✔
236
                        // Current branch can not happen
237
                        log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
×
238
                        break;
×
239
                    } else {
240
                        mappedFile = mappedFiles.get(index);
2✔
241
                        byteBuffer = mappedFile.sliceByteBuffer();
2✔
242
                        processOffset = mappedFile.getFileFromOffset();
2✔
243
                        mappedFileOffset = 0;
2✔
244
                        log.info("recover next physics file, " + mappedFile.getFileName());
2✔
245
                    }
246
                }
247
                // Intermediate file read error
248
                else if (!dispatchRequest.isSuccess()) {
2✔
249
                    log.info("recover physics file end, " + mappedFile.getFileName());
2✔
250
                    break;
2✔
251
                }
252
            }
2✔
253

254
            processOffset += mappedFileOffset;
2✔
255
            this.mappedFileQueue.setFlushedWhere(processOffset);
2✔
256
            this.mappedFileQueue.setCommittedWhere(processOffset);
2✔
257
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
2✔
258

259
            // Clear ConsumeQueue redundant data
260
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
2✔
261
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
2✔
262
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
2✔
263
            }
264
        } else {
2✔
265
            // Commitlog case files are deleted
266
            log.warn("The commitlog files are deleted, and delete the consume queue files");
2✔
267
            this.mappedFileQueue.setFlushedWhere(0);
2✔
268
            this.mappedFileQueue.setCommittedWhere(0);
2✔
269
            this.defaultMessageStore.destroyLogics();
2✔
270
        }
271
    }
2✔
272

273
    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC) {
274
        return this.checkMessageAndReturnSize(byteBuffer, checkCRC, true);
2✔
275
    }
276

277
    private void doNothingForDeadCode(final Object obj) {
278
        if (obj != null) {
2✔
279
            log.debug(String.valueOf(obj.hashCode()));
2✔
280
        }
281
    }
2✔
282

283
    /**
284
     * check the message and returns the message size
285
     *
286
     * @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
287
     */
288
    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
289
        final boolean readBody) {
290
        try {
291
            // 1 TOTAL SIZE
292
            int totalSize = byteBuffer.getInt();
2✔
293

294
            // 2 MAGIC CODE
295
            int magicCode = byteBuffer.getInt();
2✔
296
            switch (magicCode) {
2✔
297
                case MESSAGE_MAGIC_CODE:
298
                    break;
2✔
299
                case BLANK_MAGIC_CODE:
300
                    return new DispatchRequest(0, true /* success */);
2✔
301
                default:
302
                    log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
2✔
303
                    return new DispatchRequest(-1, false /* success */);
2✔
304
            }
305

306
            byte[] bytesContent = new byte[totalSize];
2✔
307

308
            int bodyCRC = byteBuffer.getInt();
2✔
309

310
            int queueId = byteBuffer.getInt();
2✔
311

312
            int flag = byteBuffer.getInt();
2✔
313

314
            long queueOffset = byteBuffer.getLong();
2✔
315

316
            long physicOffset = byteBuffer.getLong();
2✔
317

318
            int sysFlag = byteBuffer.getInt();
2✔
319

320
            long bornTimeStamp = byteBuffer.getLong();
2✔
321

322
            ByteBuffer byteBuffer1;
323
            if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
2✔
324
                byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4);
2✔
325
            } else {
326
                byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4);
2✔
327
            }
328

329
            long storeTimestamp = byteBuffer.getLong();
2✔
330

331
            ByteBuffer byteBuffer2;
332
            if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
2✔
333
                byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4);
2✔
334
            } else {
335
                byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4);
2✔
336
            }
337

338
            int reconsumeTimes = byteBuffer.getInt();
2✔
339

340
            long preparedTransactionOffset = byteBuffer.getLong();
2✔
341

342
            int bodyLen = byteBuffer.getInt();
2✔
343
            if (bodyLen > 0) {
2✔
344
                if (readBody) {
2✔
345
                    byteBuffer.get(bytesContent, 0, bodyLen);
2✔
346

347
                    if (checkCRC) {
2✔
348
                        int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
2✔
349
                        if (crc != bodyCRC) {
2✔
350
                            log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
×
351
                            return new DispatchRequest(-1, false/* success */);
×
352
                        }
353
                    }
2✔
354
                } else {
355
                    byteBuffer.position(byteBuffer.position() + bodyLen);
2✔
356
                }
357
            }
358

359
            byte topicLen = byteBuffer.get();
2✔
360
            byteBuffer.get(bytesContent, 0, topicLen);
2✔
361
            String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);
2✔
362

363
            long tagsCode = 0;
2✔
364
            String keys = "";
2✔
365
            String uniqKey = null;
2✔
366

367
            short propertiesLength = byteBuffer.getShort();
2✔
368
            Map<String, String> propertiesMap = null;
2✔
369
            if (propertiesLength > 0) {
2✔
370
                byteBuffer.get(bytesContent, 0, propertiesLength);
2✔
371
                String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
2✔
372
                propertiesMap = MessageDecoder.string2messageProperties(properties);
2✔
373

374
                keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);
2✔
375

376
                uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
2✔
377

378
                String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
2✔
379
                if (tags != null && tags.length() > 0) {
2✔
380
                    tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
2✔
381
                }
382

383
                // Timing message processing
384
                {
385
                    String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
2✔
386
                    if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
2✔
387
                        int delayLevel = Integer.parseInt(t);
2✔
388

389
                        if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
2✔
390
                            delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
×
391
                        }
392

393
                        if (delayLevel > 0) {
2✔
394
                            tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
2✔
395
                                storeTimestamp);
396
                        }
397
                    }
398
                }
399
            }
400

401
            int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);
2✔
402
            if (totalSize != readLength) {
2✔
403
                doNothingForDeadCode(reconsumeTimes);
2✔
404
                doNothingForDeadCode(flag);
2✔
405
                doNothingForDeadCode(bornTimeStamp);
2✔
406
                doNothingForDeadCode(byteBuffer1);
2✔
407
                doNothingForDeadCode(byteBuffer2);
2✔
408
                log.error(
2✔
409
                    "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
410
                    totalSize, readLength, bodyLen, topicLen, propertiesLength);
2✔
411
                return new DispatchRequest(totalSize, false/* success */);
2✔
412
            }
413

414
            return new DispatchRequest(
2✔
415
                topic,
416
                queueId,
417
                physicOffset,
418
                totalSize,
419
                tagsCode,
420
                storeTimestamp,
421
                queueOffset,
422
                keys,
423
                uniqKey,
424
                sysFlag,
425
                preparedTransactionOffset,
426
                propertiesMap
427
            );
428
        } catch (Exception e) {
×
429
        }
430

431
        return new DispatchRequest(-1, false /* success */);
×
432
    }
433

434
    protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {
435
        int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
2✔
436
        int storehostAddressLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
2✔
437
        final int msgLen = 4 //TOTALSIZE
2✔
438
            + 4 //MAGICCODE
439
            + 4 //BODYCRC
440
            + 4 //QUEUEID
441
            + 4 //FLAG
442
            + 8 //QUEUEOFFSET
443
            + 8 //PHYSICALOFFSET
444
            + 4 //SYSFLAG
445
            + 8 //BORNTIMESTAMP
446
            + bornhostLength //BORNHOST
447
            + 8 //STORETIMESTAMP
448
            + storehostAddressLength //STOREHOSTADDRESS
449
            + 4 //RECONSUMETIMES
450
            + 8 //Prepared Transaction Offset
451
            + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
452
            + 1 + topicLength //TOPIC
453
            + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
454
            + 0;
455
        return msgLen;
2✔
456
    }
457

458
    public long getConfirmOffset() {
459
        return this.confirmOffset;
2✔
460
    }
461

462
    public void setConfirmOffset(long phyOffset) {
463
        this.confirmOffset = phyOffset;
×
464
    }
×
465

466
    @Deprecated
467
    public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
468
        // recover by the minimum time stamp
469
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
2✔
470
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
2✔
471
        if (!mappedFiles.isEmpty()) {
2✔
472
            // Looking beginning to recover from which file
473
            int index = mappedFiles.size() - 1;
2✔
474
            MappedFile mappedFile = null;
2✔
475
            for (; index >= 0; index--) {
2✔
476
                mappedFile = mappedFiles.get(index);
2✔
477
                if (this.isMappedFileMatchedRecover(mappedFile)) {
2✔
478
                    log.info("recover from this mapped file " + mappedFile.getFileName());
×
479
                    break;
×
480
                }
481
            }
482

483
            if (index < 0) {
2✔
484
                index = 0;
2✔
485
                mappedFile = mappedFiles.get(index);
2✔
486
            }
487

488
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
2✔
489
            long processOffset = mappedFile.getFileFromOffset();
2✔
490
            long mappedFileOffset = 0;
2✔
491
            while (true) {
492
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
2✔
493
                int size = dispatchRequest.getMsgSize();
2✔
494

495
                if (dispatchRequest.isSuccess()) {
2✔
496
                    // Normal data
497
                    if (size > 0) {
2✔
498
                        mappedFileOffset += size;
2✔
499

500
                        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
2✔
501
                            if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
×
502
                                this.defaultMessageStore.doDispatch(dispatchRequest);
×
503
                            }
504
                        } else {
505
                            this.defaultMessageStore.doDispatch(dispatchRequest);
2✔
506
                        }
507
                    }
508
                    // Come the end of the file, switch to the next file
509
                    // Since the return 0 representatives met last hole, this can
510
                    // not be included in truncate offset
511
                    else if (size == 0) {
2✔
512
                        index++;
2✔
513
                        if (index >= mappedFiles.size()) {
2✔
514
                            // The current branch under normal circumstances should
515
                            // not happen
516
                            log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
×
517
                            break;
×
518
                        } else {
519
                            mappedFile = mappedFiles.get(index);
2✔
520
                            byteBuffer = mappedFile.sliceByteBuffer();
2✔
521
                            processOffset = mappedFile.getFileFromOffset();
2✔
522
                            mappedFileOffset = 0;
2✔
523
                            log.info("recover next physics file, " + mappedFile.getFileName());
2✔
524
                        }
525
                    }
526
                } else {
527
                    log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
2✔
528
                    break;
2✔
529
                }
530
            }
2✔
531

532
            processOffset += mappedFileOffset;
2✔
533
            this.mappedFileQueue.setFlushedWhere(processOffset);
2✔
534
            this.mappedFileQueue.setCommittedWhere(processOffset);
2✔
535
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
2✔
536

537
            // Clear ConsumeQueue redundant data
538
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
2✔
539
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
2✔
540
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
2✔
541
            }
542
        }
2✔
543
        // Commitlog case files are deleted
544
        else {
545
            log.warn("The commitlog files are deleted, and delete the consume queue files");
2✔
546
            this.mappedFileQueue.setFlushedWhere(0);
2✔
547
            this.mappedFileQueue.setCommittedWhere(0);
2✔
548
            this.defaultMessageStore.destroyLogics();
2✔
549
        }
550
    }
2✔
551

552
    private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
553
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
2✔
554

555
        int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
2✔
556
        if (magicCode != MESSAGE_MAGIC_CODE) {
2✔
557
            return false;
×
558
        }
559

560
        int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);
2✔
561
        int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
2✔
562
        int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhostLength;
2✔
563
        long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);
2✔
564
        if (0 == storeTimestamp) {
2✔
565
            return false;
×
566
        }
567

568
        if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
2✔
569
            && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
2✔
570
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
×
571
                log.info("find check timestamp, {} {}",
×
572
                    storeTimestamp,
×
573
                    UtilAll.timeMillisToHumanString(storeTimestamp));
×
574
                return true;
×
575
            }
576
        } else {
577
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
2✔
578
                log.info("find check timestamp, {} {}",
×
579
                    storeTimestamp,
×
580
                    UtilAll.timeMillisToHumanString(storeTimestamp));
×
581
                return true;
×
582
            }
583
        }
584

585
        return false;
2✔
586
    }
587

588
    private void notifyMessageArriving() {
589

590
    }
×
591

592
    public boolean resetOffset(long offset) {
593
        return this.mappedFileQueue.resetOffset(offset);
×
594
    }
595

596
    public long getBeginTimeInLock() {
597
        return beginTimeInLock;
2✔
598
    }
599

600
    private String generateKey(StringBuilder keyBuilder, MessageExt messageExt) {
601
        keyBuilder.setLength(0);
2✔
602
        keyBuilder.append(messageExt.getTopic());
2✔
603
        keyBuilder.append('-');
2✔
604
        keyBuilder.append(messageExt.getQueueId());
2✔
605
        return keyBuilder.toString();
2✔
606
    }
607

608
    public void updateMaxMessageSize(PutMessageThreadLocal putMessageThreadLocal) {
609
        // dynamically adjust maxMessageSize, but not support DLedger mode temporarily
610
        int newMaxMessageSize = this.defaultMessageStore.getMessageStoreConfig().getMaxMessageSize();
2✔
611
        if (newMaxMessageSize >= 10 &&
2✔
612
                putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) {
2✔
613
            putMessageThreadLocal.getEncoder().updateEncoderBufferCapacity(newMaxMessageSize);
2✔
614
        }
615
    }
2✔
616

617
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
618
        // Set the storage time
619
        msg.setStoreTimestamp(System.currentTimeMillis());
2✔
620
        // Set the message body BODY CRC (consider the most appropriate setting
621
        // on the client)
622
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
2✔
623
        // Back to Results
624
        AppendMessageResult result = null;
2✔
625

626
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
2✔
627

628
        String topic = msg.getTopic();
2✔
629
//        int queueId msg.getQueueId();
630
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
2✔
631
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
2✔
632
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
633
            // Delay Delivery
634
            if (msg.getDelayTimeLevel() > 0) {
2✔
635
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
2✔
636
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
×
637
                }
638

639
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
2✔
640
                int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
2✔
641

642
                // Backup real topic, queueId
643
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
2✔
644
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
2✔
645
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
2✔
646

647
                msg.setTopic(topic);
2✔
648
                msg.setQueueId(queueId);
2✔
649
            }
650
        }
651

652
        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
2✔
653
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
2✔
654
            msg.setBornHostV6Flag();
2✔
655
        }
656

657
        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
2✔
658
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
2✔
659
            msg.setStoreHostAddressV6Flag();
2✔
660
        }
661

662
        PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
2✔
663
        updateMaxMessageSize(putMessageThreadLocal);
2✔
664
        if (!multiDispatch.isMultiDispatchMsg(msg)) {
2✔
665
            PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
2✔
666
            if (encodeResult != null) {
2✔
667
                return CompletableFuture.completedFuture(encodeResult);
2✔
668
            }
669
            msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
2✔
670
        }
671
        PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
2✔
672

673
        long elapsedTimeInLock = 0;
2✔
674
        MappedFile unlockMappedFile = null;
2✔
675

676
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
2✔
677
        try {
678
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
2✔
679
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
2✔
680
            this.beginTimeInLock = beginLockTimestamp;
2✔
681

682
            // Here settings are stored timestamp, in order to ensure an orderly
683
            // global
684
            msg.setStoreTimestamp(beginLockTimestamp);
2✔
685

686
            if (null == mappedFile || mappedFile.isFull()) {
2✔
687
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
2✔
688
            }
689
            if (null == mappedFile) {
2✔
690
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
×
691
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
×
692
            }
693

694
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
2✔
695
            switch (result.getStatus()) {
2✔
696
                case PUT_OK:
697
                    break;
2✔
698
                case END_OF_FILE:
699
                    unlockMappedFile = mappedFile;
2✔
700
                    // Create a new file, re-write the message
701
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
2✔
702
                    if (null == mappedFile) {
2✔
703
                        // XXX: warn and notify me
704
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
×
705
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
×
706
                    }
707
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
2✔
708
                    break;
2✔
709
                case MESSAGE_SIZE_EXCEEDED:
710
                case PROPERTIES_SIZE_EXCEEDED:
711
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
×
712
                case UNKNOWN_ERROR:
713
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
×
714
                default:
715
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
×
716
            }
717

718
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
2✔
719
        } finally {
720
            beginTimeInLock = 0;
2✔
721
            putMessageLock.unlock();
2✔
722
        }
723

724
        if (elapsedTimeInLock > 500) {
2✔
725
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
×
726
        }
727

728
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
2✔
729
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
×
730
        }
731

732
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
2✔
733

734
        // Statistics
735
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
2✔
736
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
2✔
737

738
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
2✔
739
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
2✔
740
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
2✔
741
            if (flushStatus != PutMessageStatus.PUT_OK) {
2✔
742
                putMessageResult.setPutMessageStatus(flushStatus);
×
743
            }
744
            if (replicaStatus != PutMessageStatus.PUT_OK) {
2✔
745
                putMessageResult.setPutMessageStatus(replicaStatus);
2✔
746
            }
747
            return putMessageResult;
2✔
748
        });
749
    }
750

751
    public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
752
        messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
2✔
753
        AppendMessageResult result;
754

755
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
2✔
756

757
        final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
2✔
758

759
        if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
2✔
760
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
×
761
        }
762
        if (messageExtBatch.getDelayTimeLevel() > 0) {
2✔
763
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
×
764
        }
765

766
        InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost();
2✔
767
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
2✔
768
            messageExtBatch.setBornHostV6Flag();
2✔
769
        }
770

771
        InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost();
2✔
772
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
2✔
773
            messageExtBatch.setStoreHostAddressV6Flag();
2✔
774
        }
775

776
        long elapsedTimeInLock = 0;
2✔
777
        MappedFile unlockMappedFile = null;
2✔
778
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
2✔
779

780
        //fine-grained lock instead of the coarse-grained
781
        PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get();
2✔
782
        updateMaxMessageSize(pmThreadLocal);
2✔
783
        MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder();
2✔
784

785
        PutMessageContext putMessageContext = new PutMessageContext(generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch));
2✔
786
        messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext));
2✔
787

788
        putMessageLock.lock();
2✔
789
        try {
790
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
2✔
791
            this.beginTimeInLock = beginLockTimestamp;
2✔
792

793
            // Here settings are stored timestamp, in order to ensure an orderly
794
            // global
795
            messageExtBatch.setStoreTimestamp(beginLockTimestamp);
2✔
796

797
            if (null == mappedFile || mappedFile.isFull()) {
2✔
798
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
2✔
799
            }
800
            if (null == mappedFile) {
2✔
801
                log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
×
802
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
×
803
            }
804

805
            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
2✔
806
            switch (result.getStatus()) {
2✔
807
                case PUT_OK:
808
                    break;
2✔
809
                case END_OF_FILE:
810
                    unlockMappedFile = mappedFile;
×
811
                    // Create a new file, re-write the message
812
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
×
813
                    if (null == mappedFile) {
×
814
                        // XXX: warn and notify me
815
                        log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
×
816
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
×
817
                    }
818
                    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
×
819
                    break;
×
820
                case MESSAGE_SIZE_EXCEEDED:
821
                case PROPERTIES_SIZE_EXCEEDED:
822
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
×
823
                case UNKNOWN_ERROR:
824
                default:
825
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
×
826
            }
827

828
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
2✔
829
        } finally {
830
            beginTimeInLock = 0;
2✔
831
            putMessageLock.unlock();
2✔
832
        }
833

834
        if (elapsedTimeInLock > 500) {
2✔
835
            log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
×
836
        }
837

838
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
2✔
839
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
×
840
        }
841

842
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
2✔
843

844
        // Statistics
845
        storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
2✔
846
        storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());
2✔
847

848
        CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
2✔
849
        CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
2✔
850
        return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> {
2✔
851
            if (flushStatus != PutMessageStatus.PUT_OK) {
2✔
852
                putMessageResult.setPutMessageStatus(flushStatus);
×
853
            }
854
            if (replicaStatus != PutMessageStatus.PUT_OK) {
2✔
855
                putMessageResult.setPutMessageStatus(replicaStatus);
×
856
            }
857
            return putMessageResult;
2✔
858
        });
859

860
    }
861

862
    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
863
        // Synchronization flush
864
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
2✔
865
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
2✔
866
            if (messageExt.isWaitStoreMsgOK()) {
2✔
867
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
2✔
868
                        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
2✔
869
                flushDiskWatcher.add(request);
2✔
870
                service.putRequest(request);
2✔
871
                return request.future();
2✔
872
            } else {
873
                service.wakeup();
×
874
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
×
875
            }
876
        }
877
        // Asynchronous flush
878
        else {
879
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
2✔
880
                flushCommitLogService.wakeup();
2✔
881
            } else  {
882
                commitLogService.wakeup();
×
883
            }
884
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
2✔
885
        }
886
    }
887

888
    public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
889
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
2✔
890
            HAService service = this.defaultMessageStore.getHaService();
2✔
891
            if (messageExt.isWaitStoreMsgOK()) {
2✔
892
                if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
2✔
893
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
2✔
894
                            this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
2✔
895
                    service.putRequest(request);
2✔
896
                    service.getWaitNotifyObject().wakeupAll();
2✔
897
                    return request.future();
2✔
898
                }
899
                else {
900
                    return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
2✔
901
                }
902
            }
903
        }
904
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
2✔
905
    }
906

907
    /**
908
     * According to receive certain message or offset storage time if an error occurs, it returns -1
909
     */
910
    public long pickupStoreTimestamp(final long offset, final int size) {
911
        if (offset >= this.getMinOffset()) {
2✔
912
            SelectMappedBufferResult result = this.getMessage(offset, size);
2✔
913
            if (null != result) {
2✔
914
                try {
915
                    int sysFlag = result.getByteBuffer().getInt(MessageDecoder.SYSFLAG_POSITION);
2✔
916
                    int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
2✔
917
                    int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhostLength;
2✔
918
                    return result.getByteBuffer().getLong(msgStoreTimePos);
2✔
919
                } finally {
920
                    result.release();
2✔
921
                }
922
            }
923
        }
924

925
        return -1;
2✔
926
    }
927

928
    public long getMinOffset() {
929
        MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
2✔
930
        if (mappedFile != null) {
2✔
931
            if (mappedFile.isAvailable()) {
2✔
932
                return mappedFile.getFileFromOffset();
2✔
933
            } else {
934
                return this.rollNextFile(mappedFile.getFileFromOffset());
2✔
935
            }
936
        }
937

938
        return -1;
2✔
939
    }
940

941
    public SelectMappedBufferResult getMessage(final long offset, final int size) {
942
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
2✔
943
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
2✔
944
        if (mappedFile != null) {
2✔
945
            int pos = (int) (offset % mappedFileSize);
2✔
946
            return mappedFile.selectMappedBuffer(pos, size);
2✔
947
        }
948
        return null;
×
949
    }
950

951
    public long rollNextFile(final long offset) {
952
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
2✔
953
        return offset + mappedFileSize - offset % mappedFileSize;
2✔
954
    }
955

956
    public HashMap<String, Long> getTopicQueueTable() {
957
        return topicQueueTable;
×
958
    }
959

960
    public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
961
        this.topicQueueTable = topicQueueTable;
2✔
962
    }
2✔
963

964
    public void destroy() {
965
        this.mappedFileQueue.destroy();
2✔
966
    }
2✔
967

968
    public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
969
        putMessageLock.lock();
2✔
970
        try {
971
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
2✔
972
            if (null == mappedFile) {
2✔
973
                log.error("appendData getLastMappedFile error  " + startOffset);
×
974
                return false;
×
975
            }
976

977
            return mappedFile.appendMessage(data, dataStart, dataLength);
2✔
978
        } finally {
979
            putMessageLock.unlock();
2✔
980
        }
981
    }
982

983
    public boolean retryDeleteFirstFile(final long intervalForcibly) {
984
        return this.mappedFileQueue.retryDeleteFirstFile(intervalForcibly);
2✔
985
    }
986

987
    public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
988
        String key = topic + "-" + queueId;
×
989
        synchronized (this) {
×
990
            this.topicQueueTable.remove(key);
×
991
            this.lmqTopicQueueTable.remove(key);
×
992
        }
×
993

994
        log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
×
995
    }
×
996

997
    public void checkSelf() {
998
        mappedFileQueue.checkSelf();
×
999
    }
×
1000

1001
    public long lockTimeMills() {
1002
        long diff = 0;
×
1003
        long begin = this.beginTimeInLock;
×
1004
        if (begin > 0) {
×
1005
            diff = this.defaultMessageStore.now() - begin;
×
1006
        }
1007

1008
        if (diff < 0) {
×
1009
            diff = 0;
×
1010
        }
1011

1012
        return diff;
×
1013
    }
1014

1015
    public Map<String, Long> getLmqTopicQueueTable() {
1016
        return this.lmqTopicQueueTable;
2✔
1017
    }
1018

1019
    public void setLmqTopicQueueTable(Map<String, Long> lmqTopicQueueTable) {
1020
        if (!defaultMessageStore.getMessageStoreConfig().isEnableLmq()) {
2✔
1021
            return;
2✔
1022
        }
1023
        Map<String, Long> table = new HashMap<String, Long>(1024);
2✔
1024
        for (Map.Entry<String, Long> entry : lmqTopicQueueTable.entrySet()) {
2✔
1025
            if (MixAll.isLmq(entry.getKey())) {
2✔
1026
                table.put(entry.getKey(), entry.getValue());
×
1027
            }
1028
        }
2✔
1029
        this.lmqTopicQueueTable = table;
2✔
1030
    }
2✔
1031

1032
    abstract class FlushCommitLogService extends ServiceThread {
2✔
1033
        protected static final int RETRY_TIMES_OVER = 10;
1034
    }
1035

1036
    class CommitRealTimeService extends FlushCommitLogService {
2✔
1037

1038
        private long lastCommitTimestamp = 0;
2✔
1039

1040
        @Override
1041
        public String getServiceName() {
1042
            return CommitRealTimeService.class.getSimpleName();
×
1043
        }
1044

1045
        @Override
1046
        public void run() {
1047
            CommitLog.log.info(this.getServiceName() + " service started");
×
1048
            while (!this.isStopped()) {
×
1049
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
×
1050

1051
                int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
×
1052

1053
                int commitDataThoroughInterval =
×
1054
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
×
1055

1056
                long begin = System.currentTimeMillis();
×
1057
                if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
×
1058
                    this.lastCommitTimestamp = begin;
×
1059
                    commitDataLeastPages = 0;
×
1060
                }
1061

1062
                try {
1063
                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
×
1064
                    long end = System.currentTimeMillis();
×
1065
                    if (!result) {
×
1066
                        this.lastCommitTimestamp = end; // result = false means some data committed.
×
1067
                        //now wake up flush thread.
1068
                        flushCommitLogService.wakeup();
×
1069
                    }
1070

1071
                    if (end - begin > 500) {
×
1072
                        log.info("Commit data to file costs {} ms", end - begin);
×
1073
                    }
1074
                    this.waitForRunning(interval);
×
1075
                } catch (Throwable e) {
×
1076
                    CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
×
1077
                }
×
1078
            }
×
1079

1080
            boolean result = false;
×
1081
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
×
1082
                result = CommitLog.this.mappedFileQueue.commit(0);
×
1083
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
×
1084
            }
1085
            CommitLog.log.info(this.getServiceName() + " service end");
×
1086
        }
×
1087
    }
1088

1089
    class FlushRealTimeService extends FlushCommitLogService {
2✔
1090
        private long lastFlushTimestamp = 0;
2✔
1091
        private long printTimes = 0;
2✔
1092

1093
        public void run() {
1094
            CommitLog.log.info(this.getServiceName() + " service started");
2✔
1095

1096
            while (!this.isStopped()) {
2✔
1097
                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
2✔
1098

1099
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
2✔
1100
                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
2✔
1101

1102
                int flushPhysicQueueThoroughInterval =
2✔
1103
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
2✔
1104

1105
                boolean printFlushProgress = false;
2✔
1106

1107
                // Print flush progress
1108
                long currentTimeMillis = System.currentTimeMillis();
2✔
1109
                if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
2✔
1110
                    this.lastFlushTimestamp = currentTimeMillis;
2✔
1111
                    flushPhysicQueueLeastPages = 0;
2✔
1112
                    printFlushProgress = (printTimes++ % 10) == 0;
2✔
1113
                }
1114

1115
                try {
1116
                    if (flushCommitLogTimed) {
2✔
1117
                        Thread.sleep(interval);
2✔
1118
                    } else {
1119
                        this.waitForRunning(interval);
×
1120
                    }
1121

1122
                    if (printFlushProgress) {
2✔
1123
                        this.printFlushProgress();
2✔
1124
                    }
1125

1126
                    long begin = System.currentTimeMillis();
2✔
1127
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
2✔
1128
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
2✔
1129
                    if (storeTimestamp > 0) {
2✔
1130
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
2✔
1131
                    }
1132
                    long past = System.currentTimeMillis() - begin;
2✔
1133
                    if (past > 500) {
2✔
1134
                        log.info("Flush data to disk costs {} ms", past);
×
1135
                    }
1136
                } catch (Throwable e) {
×
1137
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
×
1138
                    this.printFlushProgress();
×
1139
                }
2✔
1140
            }
2✔
1141

1142
            // Normal shutdown, to ensure that all the flush before exit
1143
            boolean result = false;
2✔
1144
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
2✔
1145
                result = CommitLog.this.mappedFileQueue.flush(0);
2✔
1146
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
2✔
1147
            }
1148

1149
            this.printFlushProgress();
2✔
1150

1151
            CommitLog.log.info(this.getServiceName() + " service end");
2✔
1152
        }
2✔
1153

1154
        @Override
1155
        public String getServiceName() {
1156
            return FlushRealTimeService.class.getSimpleName();
2✔
1157
        }
1158

1159
        private void printFlushProgress() {
1160
            // CommitLog.log.info("how much disk fall behind memory, "
1161
            // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
1162
        }
2✔
1163

1164
        @Override
1165
        public long getJointime() {
1166
            return 1000 * 60 * 5;
2✔
1167
        }
1168
    }
1169

1170
    public static class GroupCommitRequest {
1171
        private final long nextOffset;
1172
        private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
2✔
1173
        private final long deadLine;
1174

1175
        public GroupCommitRequest(long nextOffset, long timeoutMillis) {
2✔
1176
            this.nextOffset = nextOffset;
2✔
1177
            this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
2✔
1178
        }
2✔
1179

1180
        public long getDeadLine() {
1181
            return deadLine;
2✔
1182
        }
1183

1184
        public long getNextOffset() {
1185
            return nextOffset;
2✔
1186
        }
1187

1188
        public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
1189
            this.flushOKFuture.complete(putMessageStatus);
2✔
1190
        }
2✔
1191

1192
        public CompletableFuture<PutMessageStatus> future() {
1193
            return flushOKFuture;
2✔
1194
        }
1195

1196
    }
1197

1198
    /**
1199
     * GroupCommit Service
1200
     */
1201
    class GroupCommitService extends FlushCommitLogService {
2✔
1202
        private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
2✔
1203
        private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
2✔
1204
        private final PutMessageSpinLock lock = new PutMessageSpinLock();
2✔
1205

1206
        public synchronized void putRequest(final GroupCommitRequest request) {
1207
            lock.lock();
2✔
1208
            try {
1209
                this.requestsWrite.add(request);
2✔
1210
            } finally {
1211
                lock.unlock();
2✔
1212
            }
1213
            this.wakeup();
2✔
1214
        }
2✔
1215

1216
        private void swapRequests() {
1217
            lock.lock();
2✔
1218
            try {
1219
                LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
2✔
1220
                this.requestsWrite = this.requestsRead;
2✔
1221
                this.requestsRead = tmp;
2✔
1222
            } finally {
1223
                lock.unlock();
2✔
1224
            }
1225
        }
2✔
1226

1227
        private void doCommit() {
1228
            if (!this.requestsRead.isEmpty()) {
2✔
1229
                for (GroupCommitRequest req : this.requestsRead) {
2✔
1230
                    // There may be a message in the next file, so a maximum of
1231
                    // two times the flush
1232
                    boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
2✔
1233
                    for (int i = 0; i < 2 && !flushOK; i++) {
2✔
1234
                        CommitLog.this.mappedFileQueue.flush(0);
2✔
1235
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
2✔
1236
                    }
1237

1238
                    req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
2✔
1239
                }
2✔
1240

1241
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
2✔
1242
                if (storeTimestamp > 0) {
2✔
1243
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
2✔
1244
                }
1245

1246
                this.requestsRead = new LinkedList<>();
2✔
1247
            } else {
2✔
1248
                // Because of individual messages is set to not sync flush, it
1249
                // will come to this process
1250
                CommitLog.this.mappedFileQueue.flush(0);
2✔
1251
            }
1252
        }
2✔
1253

1254
        public void run() {
1255
            CommitLog.log.info(this.getServiceName() + " service started");
2✔
1256

1257
            while (!this.isStopped()) {
2✔
1258
                try {
1259
                    this.waitForRunning(10);
2✔
1260
                    this.doCommit();
2✔
1261
                } catch (Exception e) {
×
1262
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
×
1263
                }
2✔
1264
            }
1265

1266
            // Under normal circumstances shutdown, wait for the arrival of the
1267
            // request, and then flush
1268
            try {
1269
                Thread.sleep(10);
2✔
1270
            } catch (InterruptedException e) {
×
1271
                CommitLog.log.warn(this.getServiceName() + " Exception, ", e);
×
1272
            }
2✔
1273

1274
            synchronized (this) {
2✔
1275
                this.swapRequests();
2✔
1276
            }
2✔
1277

1278
            this.doCommit();
2✔
1279

1280
            CommitLog.log.info(this.getServiceName() + " service end");
2✔
1281
        }
2✔
1282

1283
        @Override
1284
        protected void onWaitEnd() {
1285
            this.swapRequests();
2✔
1286
        }
2✔
1287

1288
        @Override
1289
        public String getServiceName() {
1290
            return GroupCommitService.class.getSimpleName();
2✔
1291
        }
1292

1293
        @Override
1294
        public long getJointime() {
1295
            return 1000 * 60 * 5;
2✔
1296
        }
1297
    }
1298

1299
    class DefaultAppendMessageCallback implements AppendMessageCallback {
1300
        // File at the end of the minimum fixed length empty
1301
        private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
1302
        // Store the message content
1303
        private final ByteBuffer msgStoreItemMemory;
1304

1305
        DefaultAppendMessageCallback() {
2✔
1306
            this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
2✔
1307
        }
2✔
1308

1309
        public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
1310
            final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
1311
            // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
1312

1313
            // PHY OFFSET
1314
            long wroteOffset = fileFromOffset + byteBuffer.position();
2✔
1315

1316
            Supplier<String> msgIdSupplier = () -> {
2✔
1317
                int sysflag = msgInner.getSysFlag();
×
1318
                int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
×
1319
                ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
×
1320
                MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
×
1321
                msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
×
1322
                msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
×
1323
                return UtilAll.bytes2string(msgIdBuffer.array());
×
1324
            };
1325

1326
            // Record ConsumeQueue information
1327
            String key = putMessageContext.getTopicQueueTableKey();
2✔
1328
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
2✔
1329
            if (null == queueOffset) {
2✔
1330
                queueOffset = 0L;
2✔
1331
                CommitLog.this.topicQueueTable.put(key, queueOffset);
2✔
1332
            }
1333

1334
            boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);
2✔
1335
            if (!multiDispatchWrapResult) {
2✔
1336
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
×
1337
            }
1338

1339
            // Transaction messages that require special handling
1340
            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
2✔
1341
            switch (tranType) {
2✔
1342
                // Prepared and Rollback message is not consumed, will not enter the
1343
                // consumer queue
1344
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
1345
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
1346
                    queueOffset = 0L;
×
1347
                    break;
×
1348
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
1349
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
1350
                default:
1351
                    break;
1352
            }
1353

1354
            ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
2✔
1355
            final int msgLen = preEncodeBuffer.getInt(0);
2✔
1356

1357
            // Determines whether there is sufficient free space
1358
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
2✔
1359
                this.msgStoreItemMemory.clear();
2✔
1360
                // 1 TOTALSIZE
1361
                this.msgStoreItemMemory.putInt(maxBlank);
2✔
1362
                // 2 MAGICCODE
1363
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
2✔
1364
                // 3 The remaining space may be any value
1365
                // Here the length of the specially set maxBlank
1366
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
2✔
1367
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
2✔
1368
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
2✔
1369
                        maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
1370
                        msgIdSupplier, msgInner.getStoreTimestamp(),
2✔
1371
                        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
2✔
1372
            }
1373

1374
            int pos = 4 + 4 + 4 + 4 + 4;
2✔
1375
            // 6 QUEUEOFFSET
1376
            preEncodeBuffer.putLong(pos, queueOffset);
2✔
1377
            pos += 8;
2✔
1378
            // 7 PHYSICALOFFSET
1379
            preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
2✔
1380
            int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
2✔
1381
            // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
1382
            pos += 8 + 4 + 8 + ipLen;
2✔
1383
            // refresh store time stamp in lock
1384
            preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
2✔
1385

1386

1387
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
2✔
1388
            // Write messages to the queue buffer
1389
            byteBuffer.put(preEncodeBuffer);
2✔
1390
            msgInner.setEncodedBuff(null);
2✔
1391
            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
2✔
1392
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
2✔
1393

1394
            switch (tranType) {
2✔
1395
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
1396
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
1397
                    break;
×
1398
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
1399
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
1400
                    // The next update ConsumeQueue information
1401
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
2✔
1402
                    CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
2✔
1403
                    break;
2✔
1404
                default:
1405
                    break;
1406
            }
1407
            return result;
2✔
1408
        }
1409

1410
        public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
1411
            final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
1412
            byteBuffer.mark();
2✔
1413
            //physical offset
1414
            long wroteOffset = fileFromOffset + byteBuffer.position();
2✔
1415
            // Record ConsumeQueue information
1416
            String key = putMessageContext.getTopicQueueTableKey();
2✔
1417
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
2✔
1418
            if (null == queueOffset) {
2✔
1419
                queueOffset = 0L;
2✔
1420
                CommitLog.this.topicQueueTable.put(key, queueOffset);
2✔
1421
            }
1422
            long beginQueueOffset = queueOffset;
2✔
1423
            int totalMsgLen = 0;
2✔
1424
            int msgNum = 0;
2✔
1425

1426
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
2✔
1427
            ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff();
2✔
1428

1429
            int sysFlag = messageExtBatch.getSysFlag();
2✔
1430
            int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
2✔
1431
            int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
2✔
1432
            Supplier<String> msgIdSupplier = () -> {
2✔
1433
                int msgIdLen = storeHostLength + 8;
2✔
1434
                int batchCount = putMessageContext.getBatchSize();
2✔
1435
                long[] phyPosArray = putMessageContext.getPhyPos();
2✔
1436
                ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
2✔
1437
                MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer);
2✔
1438
                msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
2✔
1439

1440
                StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1);
2✔
1441
                for (int i = 0; i < phyPosArray.length; i++) {
2✔
1442
                    msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]);
2✔
1443
                    String msgId = UtilAll.bytes2string(msgIdBuffer.array());
2✔
1444
                    if (i != 0) {
2✔
1445
                        buffer.append(',');
2✔
1446
                    }
1447
                    buffer.append(msgId);
2✔
1448
                }
1449
                return buffer.toString();
2✔
1450
            };
1451

1452
            messagesByteBuff.mark();
2✔
1453
            int index = 0;
2✔
1454
            while (messagesByteBuff.hasRemaining()) {
2✔
1455
                // 1 TOTALSIZE
1456
                final int msgPos = messagesByteBuff.position();
2✔
1457
                final int msgLen = messagesByteBuff.getInt();
2✔
1458

1459
                totalMsgLen += msgLen;
2✔
1460
                // Determines whether there is sufficient free space
1461
                if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
2✔
1462
                    this.msgStoreItemMemory.clear();
2✔
1463
                    // 1 TOTALSIZE
1464
                    this.msgStoreItemMemory.putInt(maxBlank);
2✔
1465
                    // 2 MAGICCODE
1466
                    this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
2✔
1467
                    // 3 The remaining space may be any value
1468
                    //ignore previous read
1469
                    messagesByteBuff.reset();
2✔
1470
                    // Here the length of the specially set maxBlank
1471
                    byteBuffer.reset(); //ignore the previous appended messages
2✔
1472
                    byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
2✔
1473
                    return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(),
2✔
1474
                        beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
2✔
1475
                }
1476
                //move to add queue offset and commitlog offset
1477
                int pos = msgPos + 20;
2✔
1478
                messagesByteBuff.putLong(pos, queueOffset);
2✔
1479
                pos += 8;
2✔
1480
                messagesByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen);
2✔
1481
                // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
1482
                pos += 8 + 4 + 8 + bornHostLength;
2✔
1483
                // refresh store time stamp in lock
1484
                messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp());
2✔
1485

1486
                putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen;
2✔
1487
                queueOffset++;
2✔
1488
                msgNum++;
2✔
1489
                messagesByteBuff.position(msgPos + msgLen);
2✔
1490
            }
2✔
1491

1492
            messagesByteBuff.position(0);
2✔
1493
            messagesByteBuff.limit(totalMsgLen);
2✔
1494
            byteBuffer.put(messagesByteBuff);
2✔
1495
            messageExtBatch.setEncodedBuff(null);
2✔
1496
            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdSupplier,
2✔
1497
                messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
2✔
1498
            result.setMsgNum(msgNum);
2✔
1499
            CommitLog.this.topicQueueTable.put(key, queueOffset);
2✔
1500

1501
            return result;
2✔
1502
        }
1503

1504
    }
1505

1506
    public static class MessageExtEncoder {
1507
        private ByteBuf byteBuf;
1508
        // The maximum length of the message body.
1509
        private int maxMessageBodySize;
1510
        // The maximum length of the full message.
1511
        private int maxMessageSize;
1512
        MessageExtEncoder(final int maxMessageBodySize) {
2✔
1513
            ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
2✔
1514
            //Reserve 64kb for encoding buffer outside body
1515
            int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 1024 ?
2✔
1516
                    maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
1517
            byteBuf = alloc.directBuffer(maxMessageSize);
2✔
1518
            this.maxMessageBodySize = maxMessageBodySize;
2✔
1519
            this.maxMessageSize = maxMessageSize;
2✔
1520
        }
2✔
1521

1522
        protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
1523
            this.byteBuf.clear();
2✔
1524
            /**
1525
             * Serialize message
1526
             */
1527
            final byte[] propertiesData =
2✔
1528
                    msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
2✔
1529

1530
            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
2✔
1531

1532
            if (propertiesLength > Short.MAX_VALUE) {
2✔
1533
                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
2✔
1534
                return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
2✔
1535
            }
1536

1537
            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
2✔
1538
            final int topicLength = topicData.length;
2✔
1539

1540
            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
2✔
1541

1542
            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
2✔
1543

1544
            // Exceeds the maximum message body
1545
            if (bodyLength > this.maxMessageBodySize) {
2✔
1546
                CommitLog.log.warn("message body size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
2✔
1547
                    + ", maxMessageSize: " + this.maxMessageBodySize);
1548
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
2✔
1549
            }
1550

1551
            // Exceeds the maximum message
1552
            if (msgLen > this.maxMessageSize) {
2✔
1553
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
2✔
1554
                        + ", maxMessageSize: " + this.maxMessageSize);
1555
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
2✔
1556
            }
1557

1558
            // 1 TOTALSIZE
1559
            this.byteBuf.writeInt(msgLen);
2✔
1560
            // 2 MAGICCODE
1561
            this.byteBuf.writeInt(CommitLog.MESSAGE_MAGIC_CODE);
2✔
1562
            // 3 BODYCRC
1563
            this.byteBuf.writeInt(msgInner.getBodyCRC());
2✔
1564
            // 4 QUEUEID
1565
            this.byteBuf.writeInt(msgInner.getQueueId());
2✔
1566
            // 5 FLAG
1567
            this.byteBuf.writeInt(msgInner.getFlag());
2✔
1568
            // 6 QUEUEOFFSET, need update later
1569
            this.byteBuf.writeLong(0);
2✔
1570
            // 7 PHYSICALOFFSET, need update later
1571
            this.byteBuf.writeLong(0);
2✔
1572
            // 8 SYSFLAG
1573
            this.byteBuf.writeInt(msgInner.getSysFlag());
2✔
1574
            // 9 BORNTIMESTAMP
1575
            this.byteBuf.writeLong(msgInner.getBornTimestamp());
2✔
1576

1577
            // 10 BORNHOST
1578
            ByteBuffer bornHostBytes = msgInner.getBornHostBytes();
2✔
1579
            this.byteBuf.writeBytes(bornHostBytes.array());
2✔
1580

1581
            // 11 STORETIMESTAMP
1582
            this.byteBuf.writeLong(msgInner.getStoreTimestamp());
2✔
1583

1584
            // 12 STOREHOSTADDRESS
1585
            ByteBuffer storeHostBytes = msgInner.getStoreHostBytes();
2✔
1586
            this.byteBuf.writeBytes(storeHostBytes.array());
2✔
1587

1588
            // 13 RECONSUMETIMES
1589
            this.byteBuf.writeInt(msgInner.getReconsumeTimes());
2✔
1590
            // 14 Prepared Transaction Offset
1591
            this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset());
2✔
1592
            // 15 BODY
1593
            this.byteBuf.writeInt(bodyLength);
2✔
1594
            if (bodyLength > 0)
2✔
1595
                this.byteBuf.writeBytes(msgInner.getBody());
2✔
1596
            // 16 TOPIC
1597
            this.byteBuf.writeByte((byte) topicLength);
2✔
1598
            this.byteBuf.writeBytes(topicData);
2✔
1599
            // 17 PROPERTIES
1600
            this.byteBuf.writeShort((short) propertiesLength);
2✔
1601
            if (propertiesLength > 0)
2✔
1602
                this.byteBuf.writeBytes(propertiesData);
2✔
1603

1604
            return null;
2✔
1605
        }
1606

1607
        protected ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
1608
            this.byteBuf.clear();
2✔
1609

1610
            ByteBuffer messagesByteBuff = messageExtBatch.wrap();
2✔
1611

1612
            int totalLength = messagesByteBuff.limit();
2✔
1613
            if (totalLength > this.maxMessageBodySize) {
2✔
1614
                CommitLog.log.warn("message body size exceeded, msg body size: " + totalLength + ", maxMessageSize: " + this.maxMessageBodySize);
2✔
1615
                throw new RuntimeException("message body size exceeded");
2✔
1616
            }
1617

1618
            // properties from MessageExtBatch
1619
            String batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties());
2✔
1620
            final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
2✔
1621
            int batchPropDataLen = batchPropData.length;
2✔
1622
            if (batchPropDataLen > Short.MAX_VALUE) {
2✔
1623
                CommitLog.log.warn("Properties size of messageExtBatch exceeded, properties size: {}, maxSize: {}.", batchPropDataLen, Short.MAX_VALUE);
×
1624
                throw new RuntimeException("Properties size of messageExtBatch exceeded!");
×
1625
            }
1626
            final short batchPropLen = (short) batchPropDataLen;
2✔
1627

1628
            int batchSize = 0;
2✔
1629
            while (messagesByteBuff.hasRemaining()) {
2✔
1630
                batchSize++;
2✔
1631
                // 1 TOTALSIZE
1632
                messagesByteBuff.getInt();
2✔
1633
                // 2 MAGICCODE
1634
                messagesByteBuff.getInt();
2✔
1635
                // 3 BODYCRC
1636
                messagesByteBuff.getInt();
2✔
1637
                // 4 FLAG
1638
                int flag = messagesByteBuff.getInt();
2✔
1639
                // 5 BODY
1640
                int bodyLen = messagesByteBuff.getInt();
2✔
1641
                int bodyPos = messagesByteBuff.position();
2✔
1642
                int bodyCrc = UtilAll.crc32(messagesByteBuff.array(), bodyPos, bodyLen);
2✔
1643
                messagesByteBuff.position(bodyPos + bodyLen);
2✔
1644
                // 6 properties
1645
                short propertiesLen = messagesByteBuff.getShort();
2✔
1646
                int propertiesPos = messagesByteBuff.position();
2✔
1647
                messagesByteBuff.position(propertiesPos + propertiesLen);
2✔
1648
                boolean needAppendLastPropertySeparator = propertiesLen > 0 && batchPropLen > 0
2✔
1649
                            && messagesByteBuff.get(messagesByteBuff.position() - 1) != MessageDecoder.PROPERTY_SEPARATOR;
2✔
1650

1651
                final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
2✔
1652

1653
                final int topicLength = topicData.length;
2✔
1654

1655
                int totalPropLen = needAppendLastPropertySeparator ? propertiesLen + batchPropLen + 1
2✔
1656
                                                                     : propertiesLen + batchPropLen;
1657
                final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, totalPropLen);
2✔
1658

1659
                // 1 TOTALSIZE
1660
                this.byteBuf.writeInt(msgLen);
2✔
1661
                // 2 MAGICCODE
1662
                this.byteBuf.writeInt(CommitLog.MESSAGE_MAGIC_CODE);
2✔
1663
                // 3 BODYCRC
1664
                this.byteBuf.writeInt(bodyCrc);
2✔
1665
                // 4 QUEUEID
1666
                this.byteBuf.writeInt(messageExtBatch.getQueueId());
2✔
1667
                // 5 FLAG
1668
                this.byteBuf.writeInt(flag);
2✔
1669
                // 6 QUEUEOFFSET
1670
                this.byteBuf.writeLong(0);
2✔
1671
                // 7 PHYSICALOFFSET
1672
                this.byteBuf.writeLong(0);
2✔
1673
                // 8 SYSFLAG
1674
                this.byteBuf.writeInt(messageExtBatch.getSysFlag());
2✔
1675
                // 9 BORNTIMESTAMP
1676
                this.byteBuf.writeLong(messageExtBatch.getBornTimestamp());
2✔
1677

1678
                // 10 BORNHOST
1679
                ByteBuffer bornHostBytes = messageExtBatch.getBornHostBytes();
2✔
1680
                this.byteBuf.writeBytes(bornHostBytes.array());
2✔
1681

1682
                // 11 STORETIMESTAMP
1683
                this.byteBuf.writeLong(messageExtBatch.getStoreTimestamp());
2✔
1684

1685
                // 12 STOREHOSTADDRESS
1686
                ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes();
2✔
1687
                this.byteBuf.writeBytes(storeHostBytes.array());
2✔
1688

1689
                // 13 RECONSUMETIMES
1690
                this.byteBuf.writeInt(messageExtBatch.getReconsumeTimes());
2✔
1691
                // 14 Prepared Transaction Offset, batch does not support transaction
1692
                this.byteBuf.writeLong(0);
2✔
1693
                // 15 BODY
1694
                this.byteBuf.writeInt(bodyLen);
2✔
1695
                if (bodyLen > 0)
2✔
1696
                    this.byteBuf.writeBytes(messagesByteBuff.array(), bodyPos, bodyLen);
2✔
1697
                // 16 TOPIC
1698
                this.byteBuf.writeByte((byte) topicLength);
2✔
1699
                this.byteBuf.writeBytes(topicData);
2✔
1700
                // 17 PROPERTIES
1701
                this.byteBuf.writeShort((short) totalPropLen);
2✔
1702
                if (propertiesLen > 0) {
2✔
1703
                    this.byteBuf.writeBytes(messagesByteBuff.array(), propertiesPos, propertiesLen);
2✔
1704
                }
1705
                if (batchPropLen > 0) {
2✔
1706
                    if (needAppendLastPropertySeparator) {
2✔
1707
                        this.byteBuf.writeByte((byte) MessageDecoder.PROPERTY_SEPARATOR);
2✔
1708
                    }
1709
                    this.byteBuf.writeBytes(batchPropData, 0, batchPropLen);
2✔
1710
                }
1711
            }
2✔
1712
            putMessageContext.setBatchSize(batchSize);
2✔
1713
            putMessageContext.setPhyPos(new long[batchSize]);
2✔
1714

1715
            return this.byteBuf.nioBuffer();
2✔
1716
        }
1717

1718
        public ByteBuffer getEncoderBuffer() {
1719
            return this.byteBuf.nioBuffer();
2✔
1720
        }
1721

1722
        public int getMaxMessageBodySize() {
1723
            return this.maxMessageBodySize;
2✔
1724
        }
1725

1726
        public void updateEncoderBufferCapacity(int newMaxMessageBodySize) {
1727
            this.maxMessageBodySize = newMaxMessageBodySize;
2✔
1728
            //Reserve 64kb for encoding buffer outside body
1729
            this.maxMessageSize = Integer.MAX_VALUE - newMaxMessageBodySize >= 64 * 1024 ?
2✔
1730
                    this.maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
1731
            this.byteBuf.capacity(this.maxMessageSize);
2✔
1732
        }
2✔
1733
    }
1734

1735
    static class PutMessageThreadLocal {
1736
        private MessageExtEncoder encoder;
1737
        private StringBuilder keyBuilder;
1738

1739
        PutMessageThreadLocal(int maxMessageBodySize) {
2✔
1740
            encoder = new MessageExtEncoder(maxMessageBodySize);
2✔
1741
            keyBuilder = new StringBuilder();
2✔
1742
        }
2✔
1743

1744
        public MessageExtEncoder getEncoder() {
1745
            return encoder;
2✔
1746
        }
1747

1748
        public StringBuilder getKeyBuilder() {
1749
            return keyBuilder;
2✔
1750
        }
1751
    }
1752

1753
    static class PutMessageContext {
1754
        private String topicQueueTableKey;
1755
        private long[] phyPos;
1756
        private int batchSize;
1757

1758
        public PutMessageContext(String topicQueueTableKey) {
2✔
1759
            this.topicQueueTableKey = topicQueueTableKey;
2✔
1760
        }
2✔
1761

1762
        public String getTopicQueueTableKey() {
1763
            return topicQueueTableKey;
2✔
1764
        }
1765

1766
        public long[] getPhyPos() {
1767
            return phyPos;
2✔
1768
        }
1769

1770
        public void setPhyPos(long[] phyPos) {
1771
            this.phyPos = phyPos;
2✔
1772
        }
2✔
1773

1774
        public int getBatchSize() {
1775
            return batchSize;
2✔
1776
        }
1777

1778
        public void setBatchSize(int batchSize) {
1779
            this.batchSize = batchSize;
2✔
1780
        }
2✔
1781
    }
1782
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc