• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3597

05 Feb 2025 01:41AM UTC coverage: 63.546% (-0.02%) from 63.562%
#3597

push

travis-ci

web-flow
Merge pull request #29639 from taosdata/feat/3.0/TS-5795

Enh(tsdb):print fid while data file corrupted.

141230 of 285630 branches covered (49.45%)

Branch coverage included in aggregate %.

2 of 4 new or added lines in 1 file covered. (50.0%)

398 existing lines in 102 files now uncovered.

220015 of 282846 relevant lines covered (77.79%)

18937864.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.07
/source/dnode/vnode/src/tq/tqUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

18
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
19
                                 const SMqMetaRsp* pRsp, int32_t vgId);
20
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
21
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
22

23
/**
 * Prepare an empty data response: allocate the block arrays and seed both the
 * request offset and the response offset from pOffset.
 *
 * @param pRsp    response to initialise; must not be NULL
 * @param pOffset offset copied into pRsp->reqOffset and pRsp->rspOffset
 * @return TDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pRsp is
 *         NULL, or the terrno value read after a failed array allocation
 *
 * On failure, no array allocated by this function is left attached to pRsp,
 * so a caller that bails out without running a destructor does not leak.
 */
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
  pRsp->withTbName = 0;
  pRsp->withSchema = false;

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__, lino, tstrerror(code));
    if (pRsp != NULL) {
      // blockDataLen is the last allocation, so blockData is the only array
      // that can be live on a failure path. Release it and clear the pointer
      // so that a later destructor call cannot free it a second time.
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
    }
  }
  return code;
}
46

47
/**
 * Push the current sync term and the leader flag into the stream meta.
 *
 * @param pTq      tq instance; its vnode sync handle supplies the term
 * @param isLeader role flag forwarded unchanged to streamMetaUpdateStageRole
 */
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
  SSyncState syncState = syncGetState(pTq->pVnode->sync);

  streamMetaUpdateStageRole(pTq->pStreamMeta, syncState.term, isLeader);
}
14,389✔
51

52
/**
 * Prepare an empty taosx data response: seed both offsets from pOffset, turn
 * on table-name and schema reporting, and allocate the four block arrays.
 *
 * @param pRsp    response to initialise; must not be NULL
 * @param pOffset offset copied into pRsp->reqOffset and pRsp->rspOffset
 * @return TDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA when pRsp is
 *         NULL, or the terrno value read after a failed array allocation
 *
 * On failure the arrays are destroyed and their pointers are reset to NULL.
 */
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
  int32_t code = TDB_CODE_SUCCESS;
  int32_t lino = 0;
  tqDebug("%s called", __FUNCTION__);
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
  tOffsetCopy(&pRsp->rspOffset, &pOffset);

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);

  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);

  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);

  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);

END:
  if (code != 0) {
    tqError("%s failed at:%d, code:%s", __FUNCTION__, lino, tstrerror(code));
    // The pRsp == NULL check above also lands here, so the members must not
    // be touched in that case.
    if (pRsp != NULL) {
      // The caller in this file runs the response destructor after a failed
      // init, so the freed arrays must not stay reachable through pRsp.
      taosArrayDestroy(pRsp->blockData);
      pRsp->blockData = NULL;
      taosArrayDestroy(pRsp->blockDataLen);
      pRsp->blockDataLen = NULL;
      taosArrayDestroy(pRsp->blockTbName);
      pRsp->blockTbName = NULL;
      taosArrayDestroy(pRsp->blockSchema);
      pRsp->blockSchema = NULL;
    }
  }
  return code;
}
85

86
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
2,557✔
87
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
88
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
2,557!
89
    return TSDB_CODE_INVALID_PARA;
×
90
  }
91
  uint64_t   consumerId = pRequest->consumerId;
2,557✔
92
  STqOffset* pOffset = NULL;
2,557✔
93
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
2,557✔
94
  int32_t    vgId = TD_VID(pTq->pVnode);
2,557✔
95

96
  *pBlockReturned = false;
2,557✔
97
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
98
  if (code == 0) {
2,557✔
99
    tOffsetCopy(pOffsetVal, &pOffset->val);
173✔
100

101
    char formatBuf[TSDB_OFFSET_LEN] = {0};
173✔
102
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
173✔
103
    tqDebug("tmq poll: consumer:0x%" PRIx64
173!
104
                ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
105
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
106
    return 0;
173✔
107
  } else {
108
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
109
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
2,384✔
110
      if (pRequest->useSnapshot) {
1,690✔
111
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
832!
112
                consumerId, pHandle->subKey, vgId);
113
        if (pHandle->fetchMeta) {
832✔
114
          tqOffsetResetToMeta(pOffsetVal, 0);
115
        } else {
116
          SValue val = {0};
814!
117
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
814✔
118
        }
119
      } else {
120
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
858✔
121
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
858✔
122
      }
123
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
694✔
124
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
54✔
125
      SMqDataRsp dataRsp = {0};
54✔
126
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
54✔
127

128
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
54✔
129
      if (code != 0) {
54!
130
        return code;
×
131
      }
132
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
54!
133
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
134
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
54✔
135
      tDeleteMqDataRsp(&dataRsp);
54✔
136

137
      *pBlockReturned = true;
54✔
138
      return code;
54✔
139
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
640!
140
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
640!
141
                  " in vg %d, subkey %s, reset none failed",
142
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
143
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
640✔
144
    }
145
  }
146

147
  return 0;
1,690✔
148
}
149

150
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
47,361✔
151
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
152
  int32_t    code = TDB_CODE_SUCCESS;
47,361✔
153
  int32_t    lino = 0;
47,361✔
154
  tqDebug("%s called", __FUNCTION__ );
47,361!
155
  uint64_t consumerId = pRequest->consumerId;
47,362✔
156
  int32_t  vgId = TD_VID(pTq->pVnode);
47,362✔
157
  terrno = 0;
47,362✔
158

159
  SMqDataRsp dataRsp = {0};
47,362✔
160
  code = tqInitDataRsp(&dataRsp, *pOffset);
47,362✔
161
  TSDB_CHECK_CODE(code, lino, end);
47,362!
162

163
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
47,362✔
164
  TSDB_CHECK_CODE(code, lino, end);
47,362!
165

166
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
47,362✔
167
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
47,361✔
168
    goto end;
634✔
169
  }
170

171
  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
172
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
46,727✔
173
    // lock
174
    taosWLockLatch(&pTq->lock);
20,737✔
175
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
20,737✔
176
    if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
20,737✔
177
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
19,691✔
178
      taosWUnLockLatch(&pTq->lock);
19,691✔
179
      goto end;
19,691✔
180
    }
181
    taosWUnLockLatch(&pTq->lock);
1,046✔
182
  }
183

184
  // reqOffset represents the current date offset, may be changed if wal not exists
185
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
27,036✔
186
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
27,036✔
187

188
end:
47,362✔
189
  {
190
    char buf[TSDB_OFFSET_LEN] = {0};
47,362✔
191
    tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
47,362✔
192
    if (code != 0){
47,361✔
193
      tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64 " error msg:%s, line:%d",
634!
194
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
195
    } else {
196
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64 " success",
46,727!
197
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
198
    }
199

200
    tDeleteMqDataRsp(&dataRsp);
47,362✔
201
    return code;
47,362✔
202
  }
203
}
204

205
// Decode the WAL entry at pHead as TYPE and, when it decodes cleanly and its source has the
// TD_REQ_FROM_TAOX bit set, skip it: bump fetchVer and `continue` the enclosing loop.
// The decoded request and the decoder are released on both outcomes.
// Expands to bare statements, so it must be used inside its own { } scope within a loop that
// has pHead, pRequest, vgId and fetchVer in scope.
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                                  \
  SDecoder decoder = {0};                                                                                     \
  TYPE     req = {0};                                                                                         \
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                               \
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);                                                           \
  tDecoderInit(&decoder, data, len);                                                                          \
  bool fromTaosx = (DECODE_FUNC(&decoder, &req) == 0) && ((req.source & TD_REQ_FROM_TAOX) != 0);              \
  if (fromTaosx) {                                                                                            \
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64    \
            " msgType %d",                                                                                    \
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                           \
    fetchVer++;                                                                                               \
  }                                                                                                           \
  DELETE_FUNC(&req);                                                                                          \
  tDecoderClear(&decoder);                                                                                    \
  if (fromTaosx) {                                                                                            \
    continue;                                                                                                 \
  }
222

223
// No-op destructor: fills the DELETE_FUNC slot of PROCESS_EXCLUDED_MSG for request types with nothing to release.
static void tDeleteCommon(void* parm) { (void)parm; }
321✔
224

225
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
10,466✔
226
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
227
  int32_t         vgId = TD_VID(pTq->pVnode);
10,466✔
228
  SMqDataRsp      taosxRsp = {0};
10,466✔
229
  SMqBatchMetaRsp btMetaRsp = {0};
10,466✔
230
  int32_t         code = 0;
10,466✔
231

232
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
10,466!
233
  if (offset->type != TMQ_OFFSET__LOG) {
10,466✔
234
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest->timeout));
165!
235

236
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
165✔
237
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
16✔
238
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
16!
239
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
240
      goto END;
16✔
241
    }
242

243
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
149!
244
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
245
    if (taosxRsp.blockNum > 0) {
149✔
246
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
142✔
247
      goto END;
142✔
248
    } else {
249
      tOffsetCopy(offset, &taosxRsp.rspOffset);
7✔
250
    }
251
  }
252

253
  if (offset->type == TMQ_OFFSET__LOG) {
10,308!
254
    walReaderVerifyOffset(pHandle->pWalReader, offset);
10,308✔
255
    int64_t fetchVer = offset->version;
10,308✔
256

257
    uint64_t st = taosGetTimestampMs();
10,308✔
258
    int      totalRows = 0;
10,308✔
259
    int32_t  totalMetaRows = 0;
10,308✔
260
    while (1) {
122,306✔
261
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
132,614✔
262
      if (savedEpoch > pRequest->epoch) {
132,613!
263
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
264
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
265
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
266
        goto END;
10,308✔
267
      }
268

269
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
132,613✔
270
        if (totalMetaRows > 0) {
9,563✔
271
          tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
2✔
272
          code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
2✔
273
          goto END;
2✔
274
        }
275
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
9,561✔
276
        code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
9,561✔
277
                             taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
9,561✔
278
        goto END;
9,561✔
279
      }
280

281
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
123,045✔
282
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
123,045!
283
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
284

285
      // process meta
286
      if (pHead->msgType != TDMT_VND_SUBMIT) {
123,050✔
287
        if (totalRows > 0) {
558✔
288
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
26✔
289
          code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
26✔
290
                               taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
26✔
291
          goto END;
215✔
292
        }
293

294
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
532✔
295
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
529✔
296
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
170!
297
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
359✔
298
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, tDeleteCommon)
44!
299
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
396✔
300
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tDeleteCommon)
272!
301
          } else if (pHead->msgType == TDMT_VND_DELETE) {
43✔
302
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tDeleteCommon)
5!
303
          }
304
        }
305

306
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
191!
307
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
308
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
191!
309
          SMqMetaRsp metaRsp = {0};
189✔
310
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
189✔
311
          metaRsp.resMsgType = pHead->msgType;
189✔
312
          metaRsp.metaRspLen = pHead->bodyLen;
189✔
313
          metaRsp.metaRsp = pHead->body;
189✔
314
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
189✔
315
          goto END;
189✔
316
        }
317

318
        if (!btMetaRsp.batchMetaReq) {
2!
319
          btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
2✔
320
          TQ_NULL_GO_TO_END(btMetaRsp.batchMetaReq);
2!
321
          btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
2✔
322
          TQ_NULL_GO_TO_END(btMetaRsp.batchMetaLen);
2!
323
        }
324
        fetchVer++;
2✔
325

326
        SMqMetaRsp tmpMetaRsp = {0};
2✔
327
        tmpMetaRsp.resMsgType = pHead->msgType;
2✔
328
        tmpMetaRsp.metaRspLen = pHead->bodyLen;
2✔
329
        tmpMetaRsp.metaRsp = pHead->body;
2✔
330
        uint32_t len = 0;
2✔
331
        tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
2!
332
        if (TSDB_CODE_SUCCESS != code) {
2!
333
          tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
334
          continue;
×
335
        }
336
        int32_t tLen = sizeof(SMqRspHead) + len;
2✔
337
        void*   tBuf = taosMemoryCalloc(1, tLen);
2!
338
        TQ_NULL_GO_TO_END(tBuf);
2!
339
        void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
2✔
340
        SEncoder encoder = {0};
2✔
341
        tEncoderInit(&encoder, metaBuff, len);
2✔
342
        code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
2✔
343
        tEncoderClear(&encoder);
2✔
344

345
        if (code < 0) {
2!
346
          tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
347
          continue;
×
348
        }
349
        TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf));
4!
350
        TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaLen, &tLen));
4!
351
        totalMetaRows++;
2✔
352
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) {
4!
353
          tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
×
354
          code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
355
          goto END;
×
356
        }
357
        continue;
2✔
358
      }
359

360
      if (totalMetaRows > 0) {
122,492!
361
        tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
×
362
        code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
×
363
        goto END;
×
364
      }
365

366
      // process data
367
      SPackedData submit = {
122,492✔
368
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
122,492✔
369
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
122,492✔
370
          .ver = pHead->version,
122,492✔
371
      };
372

373
      TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded));
122,492!
374

375
      if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) {
244,456!
376
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
530✔
377
        code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
530✔
378
                             taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
530!
379
        goto END;
530✔
380
      } else {
381
        fetchVer++;
121,963✔
382
      }
383
    }
384
  }
385

386
END:
×
387
  if (code != 0){
10,466!
388
    tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
389
            pRequest->subKey);
390
  }
391
  tDeleteMqBatchMetaRsp(&btMetaRsp);
10,466✔
392
  tDeleteSTaosxRsp(&taosxRsp);
10,465✔
393
  return code;
10,465✔
394
}
395

396
/**
 * Entry point for answering a consumer poll: resolve the request offset if it
 * is a reset type, then dispatch on the subscription type.
 *
 * @param pTq      tq instance
 * @param pHandle  subscription handle
 * @param pRequest poll request
 * @param pMsg     rpc message to answer
 * @return 0 on success; TSDB_CODE_TMQ_INVALID_MSG when an argument is NULL or
 *         the request offset type is 0; otherwise the code of the failing step
 */
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  if (!pTq || !pHandle || !pRequest || !pMsg) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  int32_t      code = 0;
  STqOffsetVal reqOffset = {0};
  tOffsetCopy(&reqOffset, &pRequest->reqOffset);

  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
    // Reset-type offset: turn it into a concrete one first.
    bool blockReturned = false;
    code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
    // Stop on failure, and also when an empty block has already been returned to the consumer.
    if (code != 0 || blockReturned) {
      goto END;
    }
  } else if (reqOffset.type == 0) {
    // An offset type of 0 is rejected as an invalid message.
    uError("req offset type is 0");
    code = TSDB_CODE_TMQ_INVALID_MSG;
    goto END;
  }

  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
    code = extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
  } else {
    code = extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
  }

END:
  if (code != 0) {
    uError("failed to extract data for mq, msg:%s", tstrerror(code));
  }
  tOffsetDestroy(&reqOffset);
  return code;
}
435

436
/**
 * Fill in a poll response header.
 *
 * @param pMsgHead   header to fill; nothing happens when it is NULL
 * @param type       stored in mqMsgType
 * @param epoch      stored in epoch
 * @param consumerId stored in consumerId
 * @param sver       stored in walsver
 * @param ever       stored in walever
 */
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                          int64_t ever) {
  if (pMsgHead != NULL) {
    pMsgHead->mqMsgType = type;
    pMsgHead->epoch = epoch;
    pMsgHead->consumerId = consumerId;
    pMsgHead->walsver = sver;
    pMsgHead->walever = ever;
  }
}
447

448
int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
18✔
449
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
450
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
18!
451
    return TSDB_CODE_TMQ_INVALID_MSG;
×
452
  }
453
  int32_t len = 0;
18✔
454
  int32_t code = 0;
18✔
455
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
18!
456
  if (code < 0) {
18!
457
    return TAOS_GET_TERRNO(code);
×
458
  }
459
  int32_t tlen = sizeof(SMqRspHead) + len;
18✔
460
  void*   buf = rpcMallocCont(tlen);
18✔
461
  if (buf == NULL) {
18!
462
    return TAOS_GET_TERRNO(terrno);
×
463
  }
464

465
  int64_t sver = 0, ever = 0;
18✔
466
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
18✔
467
  initMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
18✔
468

469
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
18✔
470

471
  SEncoder encoder = {0};
18✔
472
  tEncoderInit(&encoder, abuf, len);
18✔
473
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
18✔
474
  tEncoderClear(&encoder);
18✔
475
  if (code < 0) {
18!
476
    rpcFreeCont(buf);
×
477
    return TAOS_GET_TERRNO(code);
×
478
  }
479
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
18✔
480

481
  tmsgSendRsp(&resp);
18✔
482
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
18!
483
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
484

485
  return 0;
18✔
486
}
487

488
/**
 * Encode a single meta response behind an SMqRspHead and send it as the reply
 * to pMsg.
 *
 * @param pHandle subscription handle; its tq reader's WAL reader supplies the
 *                version range written into the header
 * @param pMsg    request whose rpc info is reused for the reply
 * @param pReq    poll request (epoch and consumer id go into the header)
 * @param pRsp    meta response to encode
 * @param vgId    vnode group id, used for logging only
 * @return 0 on success; TSDB_CODE_TMQ_INVALID_MSG when a pointer argument is
 *         NULL; otherwise the value produced by TAOS_GET_TERRNO for the
 *         failed sizing, allocation or encoding step
 */
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
                          int32_t vgId) {
  if (!pHandle || !pMsg || !pReq || !pRsp) {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  // First pass: work out how many bytes the encoded body needs.
  int32_t bodyLen = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqMetaRsp, pRsp, bodyLen, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(code);
  }

  int32_t totalLen = sizeof(SMqRspHead) + bodyLen;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }

  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  initMqRspHead(pCont, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);

  // Second pass: encode the body directly after the header.
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(pCont, sizeof(SMqRspHead)), bodyLen);
  code = tEncodeMqMetaRsp(&encoder, pRsp);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(pCont);
    return TAOS_GET_TERRNO(code);
  }

  SRpcMsg rsp = {.info = pMsg->info, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&rsp);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}
528

529
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
38,041✔
530
                        int32_t type, int64_t sver, int64_t ever) {
531
  if (pRpcHandleInfo == NULL || pRsp == NULL) {
38,041!
532
    return TSDB_CODE_TMQ_INVALID_MSG;
×
533
  }
534
  int32_t len = 0;
38,041✔
535
  int32_t code = 0;
38,041✔
536

537
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
38,041✔
538
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
38,029!
539
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
12!
540
    tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
12!
541
  }
542

543
  if (code < 0) {
38,040!
544
    return TAOS_GET_TERRNO(code);
×
545
  }
546

547
  int32_t tlen = sizeof(SMqRspHead) + len;
38,040✔
548
  void*   buf = rpcMallocCont(tlen);
38,040✔
549
  if (buf == NULL) {
38,041!
550
    return terrno;
×
551
  }
552

553
  SMqRspHead* pHead = (SMqRspHead*)buf;
38,041✔
554
  initMqRspHead(pHead, type, epoch, consumerId, sver, ever);
38,041✔
555

556
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
38,041✔
557

558
  SEncoder encoder = {0};
38,041✔
559
  tEncoderInit(&encoder, abuf, len);
38,041✔
560

561
  if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
38,041✔
562
    code = tEncodeMqDataRsp(&encoder, pRsp);
38,029✔
563
  } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
12!
564
    code = tEncodeSTaosxRsp(&encoder, pRsp);
12✔
565
  }
566
  tEncoderClear(&encoder);
38,041✔
567
  if (code < 0) {
38,041!
568
    rpcFreeCont(buf);
×
569
    return TAOS_GET_TERRNO(code);
×
570
  }
571
  SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = buf, .contLen = tlen, .code = 0};
38,041✔
572

573
  tmsgSendRsp(&rsp);
38,041✔
574
  return 0;
38,041✔
575
}
576

577
/**
 * Decode a delete result and turn it into a special data block with one row
 * per affected table uid.
 *
 * @param pData     encoded SDeleteRes
 * @param len       length of pData in bytes
 * @param ver       version stored in the block's info
 * @param pRefBlock [out] set to NULL first; on success receives either a
 *                  queue item wrapping the block (type 0) or the block itself
 *                  (type 1). It stays NULL when the delete result lists no
 *                  tables or affected no rows.
 * @param type      0 to wrap the block in an SStreamRefDataBlock queue item,
 *                  1 to hand back the bare SSDataBlock
 * @param blockType passed to createSpecialDataBlock
 * @return 0 on success; TSDB_CODE_INVALID_PARA when pRefBlock is NULL;
 *         TSDB_CODE_TMQ_CONSUMER_ERROR for an unknown type; otherwise the
 *         code of the failing step
 *
 * On every failure path the decoder, the uid list and any block built so far
 * are released, and *pRefBlock is left NULL.
 */
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType) {
  int32_t      code = 0;
  int32_t      line = 0;
  SDecoder*    pCoder = &(SDecoder){0};
  SDeleteRes*  pRes = &(SDeleteRes){0};
  // Declared before the first goto so that END can tell whether a block is still owned here.
  SSDataBlock* pDelBlock = NULL;

  if (pRefBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  *pRefBlock = NULL;

  pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
  TSDB_CHECK_NULL(pRes->uidList, code, line, END, terrno);

  tDecoderInit(pCoder, (uint8_t*)pData, len);
  code = tDecodeDeleteRes(pCoder, pRes);
  TSDB_CHECK_CODE(code, line, END);

  int32_t numOfTables = taosArrayGetSize(pRes->uidList);
  if (numOfTables == 0 || pRes->affectedRows == 0) {
    goto END;
  }

  code = createSpecialDataBlock(blockType, &pDelBlock);
  TSDB_CHECK_CODE(code, line, END);

  code = blockDataEnsureCapacity(pDelBlock, numOfTables);
  TSDB_CHECK_CODE(code, line, END);

  pDelBlock->info.rows = numOfTables;
  pDelBlock->info.version = ver;

  for (int32_t i = 0; i < numOfTables; i++) {
    // start key column
    SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(pStartCol, code, line, END, terrno);
    code = colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false);
    TSDB_CHECK_CODE(code, line, END);
    // end key column
    SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(pEndCol, code, line, END, terrno);
    code = colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
    TSDB_CHECK_CODE(code, line, END);
    // uid column
    SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
    TSDB_CHECK_NULL(pUidCol, code, line, END, terrno);

    int64_t* pUid = taosArrayGet(pRes->uidList, i);
    TSDB_CHECK_NULL(pUid, code, line, END, terrno);
    code = colDataSetVal(pUidCol, i, (const char*)pUid, false);
    TSDB_CHECK_CODE(code, line, END);

    // the remaining columns are set to NULL for every row
    void* tmp = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);
    tmp = taosArrayGet(pDelBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
    TSDB_CHECK_NULL(tmp, code, line, END, terrno);
    colDataSetNULL(tmp, i);
  }

  if (type == 0) {
    code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
    // On failure fall through to END so the decoder and uid list are released too.
    TSDB_CHECK_CODE(code, line, END);

    ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
    ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pDelBlock;
    pDelBlock = NULL;  // ownership moved to the queue item
  } else if (type == 1) {
    *pRefBlock = pDelBlock;
    pDelBlock = NULL;  // ownership moved to the caller
  } else {
    tqError("unknown type:%d", type);
    code = TSDB_CODE_TMQ_CONSUMER_ERROR;
    line = __LINE__;
  }

END:
  if (code != 0) {
    tqError("failed to extract delete data block, line:%d code:%d", line, code);
  }
  if (pDelBlock != NULL) {
    // Still owned here, so it was never handed out: release it.
    // NOTE(review): this mirrors the release idiom the function already used on the
    // queue-item failure path; confirm whether a fuller block teardown is needed.
    blockDataCleanup(pDelBlock);
    taosMemoryFree(pDelBlock);
  }
  tDecoderClear(pCoder);
  taosArrayDestroy(pRes->uidList);
  return code;
}
663

664
/*
 * Collect execution progress of the source-level task(s) that belong to the
 * stream identified by streamId on this vnode.
 *
 * pVnode      vnode whose stream meta (pVnode->pTq->pStreamMeta) is inspected
 * streamId    stream to look up in the meta task list
 * pDelay      optional output; reset to 0 on entry, then set to
 *             (ingestTs of the newest valid WAL entry - ingestTs of the entry
 *             the task is currently at) / 1000 when both entries can be read
 * fhFinished  optional output; reset to false on entry, then set to
 *             !HAS_RELATED_FILLHISTORY_TASK(pTask) for the matching source task
 *
 * Always returns TSDB_CODE_SUCCESS; per-task failures are logged and skipped.
 */
int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished) {
  SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
  int32_t      numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  int32_t      code = TSDB_CODE_SUCCESS;

  if (pDelay != NULL) {
    *pDelay = 0;
  }

  // guarded the same way as pDelay so that a caller may pass NULL for either output
  if (fhFinished != NULL) {
    *fhFinished = false;
  }

  if (numOfTasks <= 0) {
    return code;
  }

  // extract the required source task for a given stream, identified by streamId
  streamMetaRLock(pMeta);

  // re-read the size now that the read lock is held
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);

  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
    if (pId == NULL) {
      continue;
    }
    if (pId->streamId != streamId) {
      continue;
    }

    STaskId      id = {.streamId = pId->streamId, .taskId = pId->taskId};
    SStreamTask* pTask = NULL;

    code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
    if (code != 0) {
      tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId);
      continue;
    }

    if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    // here we get the required stream source task
    if (fhFinished != NULL) {
      *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask);
    }

    // a reader version of -1 falls back to the last processed version; otherwise
    // the reader version is stepped back by one to get the task's current version
    int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
    if (ver == -1) {
      ver = pTask->chkInfo.processedVer;
    } else {
      ver--;
    }

    SVersionRange verRange = {0};
    walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer);

    // a separate reader is opened for the lookups below instead of using the task's own reader
    SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0);
    if (pReader == NULL) {
      tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId);
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    int64_t cur = 0;
    int64_t latest = 0;
    bool    tsValid = false;  // true only when both cur and latest were read from the WAL

    code = walFetchHead(pReader, ver);
    if (code == TSDB_CODE_SUCCESS) {
      cur = pReader->pHead->head.ingestTs;

      if (ver == verRange.maxVer) {
        latest = cur;
        tsValid = true;
      } else {
        code = walFetchHead(pReader, verRange.maxVer);
        if (code == TSDB_CODE_SUCCESS) {
          latest = pReader->pHead->head.ingestTs;
          tsValid = true;
        }
      }
    }

    // Only report a delay when both timestamps are real. Subtracting against the
    // 0 placeholder of a failed fetch would yield a meaningless huge/negative value.
    if (tsValid) {
      if (pDelay != NULL) {  // delay in ms
        *pDelay = (latest - cur) / 1000;
      }
    } else {
      tqDebug("vgId:%d failed to fetch wal head to compute delay, code:%s", pMeta->vgId, tstrerror(code));
    }

    walCloseReader(pReader);
    streamMetaReleaseTask(pMeta, pTask);
  }

  streamMetaRUnLock(pMeta);

  return TSDB_CODE_SUCCESS;
}
756

757
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type) {
185✔
758
  int32_t          code = 0;
185✔
759
  int32_t          lino = 0;
185✔
760
  SDecoder         dc = {0};
185✔
761
  SVDropTbBatchReq batchReq = {0};
185✔
762
  tDecoderInit(&dc, (uint8_t*)data, len);
185✔
763
  code = tDecodeSVDropTbBatchReq(&dc, &batchReq);
185✔
764
  TSDB_CHECK_CODE(code, lino, _exit);
185!
765
  if (batchReq.nReqs <= 0) goto _exit;
185!
766

767
  SSDataBlock* pBlock = NULL;
185✔
768
  code = createSpecialDataBlock(STREAM_DROP_CHILD_TABLE, &pBlock);
185✔
769
  TSDB_CHECK_CODE(code, lino, _exit);
185!
770

771
  code = blockDataEnsureCapacity(pBlock, batchReq.nReqs);
185✔
772
  TSDB_CHECK_CODE(code, lino, _exit);
185!
773

774
  pBlock->info.rows = batchReq.nReqs;
185✔
775
  pBlock->info.version = ver;
185✔
776
  for (int32_t i = 0; i < batchReq.nReqs; ++i) {
375✔
777
    SVDropTbReq* pReq = batchReq.pReqs + i;
190✔
778
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
190✔
779
    TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno);
190!
780
    code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false);
190✔
781
    TSDB_CHECK_CODE(code, lino, _exit);
190!
782
  }
783

784
  code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
185✔
785
  TSDB_CHECK_CODE(code, lino, _exit);
185!
786
  ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK;
185✔
787
  ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pBlock;
185✔
788

789
_exit:
185✔
790
  tDecoderClear(&dc);
185✔
791
  if (TSDB_CODE_SUCCESS != code) {
185!
792
    tqError("faled to extract drop ctb data block, line:%d code:%s", lino, tstrerror(code));
×
793
    blockDataCleanup(pBlock);
×
794
    taosMemoryFree(pBlock);
×
795
  }
796
  return code;
185✔
797
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc