• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17

UNCOV
18
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
×
UNCOV
19
  if (pBlock == NULL || pRsp == NULL) {
×
20
    return TSDB_CODE_INVALID_PARA;
×
21
  }
UNCOV
22
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
×
UNCOV
23
  int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
×
UNCOV
24
  void*   buf = taosMemoryCalloc(1, dataStrLen);
×
UNCOV
25
  if (buf == NULL) {
×
26
    return terrno;
×
27
  }
28

UNCOV
29
  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
×
UNCOV
30
  pRetrieve->version = 1;
×
UNCOV
31
  pRetrieve->precision = precision;
×
UNCOV
32
  pRetrieve->compressed = 0;
×
UNCOV
33
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
×
34

UNCOV
35
  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
×
UNCOV
36
  if(actualLen < 0){
×
37
    taosMemoryFree(buf);
×
38
    return terrno;
×
39
  }
UNCOV
40
  actualLen += sizeof(SRetrieveTableRspForTmq);
×
UNCOV
41
  if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL){
×
42
    taosMemoryFree(buf);
×
43
    return terrno;
×
44
  }
UNCOV
45
  if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
×
46
    taosMemoryFree(buf);
×
47
    return terrno;
×
48
  }
49

UNCOV
50
  return TSDB_CODE_SUCCESS;
×
51
}
52

UNCOV
53
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
×
UNCOV
54
  if (pRsp == NULL || pTq == NULL) {
×
55
    return TSDB_CODE_INVALID_PARA;
×
56
  }
UNCOV
57
  SMetaReader mr = {0};
×
UNCOV
58
  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);
×
59

UNCOV
60
  int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
×
UNCOV
61
  if (code < 0) {
×
UNCOV
62
    metaReaderClear(&mr);
×
UNCOV
63
    return code;
×
64
  }
65

UNCOV
66
  for (int32_t i = 0; i < n; i++) {
×
UNCOV
67
    char* tbName = taosStrdup(mr.me.name);
×
UNCOV
68
    if (tbName == NULL) {
×
69
      metaReaderClear(&mr);
×
70
      return terrno;
×
71
    }
UNCOV
72
    if(taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
×
73
      tqError("failed to push tbName to blockTbName:%s", tbName);
×
74
      continue;
×
75
    }
76
  }
UNCOV
77
  metaReaderClear(&mr);
×
UNCOV
78
  return 0;
×
79
}
80

UNCOV
81
int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
×
UNCOV
82
  if (task == NULL || pHandle == NULL || res == NULL) {
×
83
    return TSDB_CODE_INVALID_PARA;
×
84
  }
UNCOV
85
  uint64_t ts = 0;
×
UNCOV
86
  qStreamSetOpen(task);
×
87

UNCOV
88
  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
×
UNCOV
89
  int32_t code = qExecTask(task, res, &ts);
×
UNCOV
90
  if (code != TSDB_CODE_SUCCESS) {
×
91
    tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
×
92
    return code;
×
93
  }
94

UNCOV
95
  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
×
UNCOV
96
  return 0;
×
97
}
98

UNCOV
99
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
×
UNCOV
100
  if (pTq == NULL || pHandle == NULL || pRsp == NULL || pOffset == NULL || pRequest == NULL){
×
101
    return TSDB_CODE_INVALID_PARA;
×
102
  }
UNCOV
103
  int32_t vgId = TD_VID(pTq->pVnode);
×
UNCOV
104
  int32_t code = 0;
×
UNCOV
105
  int32_t line = 0;
×
UNCOV
106
  int32_t totalRows = 0;
×
107

UNCOV
108
  const STqExecHandle* pExec = &pHandle->execHandle;
×
UNCOV
109
  qTaskInfo_t          task = pExec->task;
×
110

UNCOV
111
  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
×
UNCOV
112
  TSDB_CHECK_CODE(code, line, END);
×
113

UNCOV
114
  qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
×
UNCOV
115
  uint64_t st = taosGetTimestampMs();
×
UNCOV
116
  while (1) {
×
UNCOV
117
    SSDataBlock* pDataBlock = NULL;
×
UNCOV
118
    code = getDataBlock(task, pHandle, vgId, &pDataBlock);
×
UNCOV
119
    TSDB_CHECK_CODE(code, line, END);
×
120

UNCOV
121
    if (pRequest->enableReplay) {
×
UNCOV
122
      if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
×
UNCOV
123
        blockDataDestroy(pHandle->block);
×
UNCOV
124
        pHandle->block = NULL;
×
125
      }
UNCOV
126
      if (pHandle->block == NULL) {
×
UNCOV
127
        if (pDataBlock == NULL) {
×
UNCOV
128
          break;
×
129
        }
130

UNCOV
131
        STqOffsetVal offset = {0};
×
UNCOV
132
        code = qStreamExtractOffset(task, &offset);
×
UNCOV
133
        TSDB_CHECK_CODE(code, line, END);
×
134

UNCOV
135
        pHandle->block = NULL;
×
136

UNCOV
137
        code = createOneDataBlock(pDataBlock, true, &pHandle->block);
×
UNCOV
138
        TSDB_CHECK_CODE(code, line, END);
×
139

UNCOV
140
        pHandle->blockTime = offset.ts;
×
UNCOV
141
        tOffsetDestroy(&offset);
×
UNCOV
142
        code = getDataBlock(task, pHandle, vgId, &pDataBlock);
×
UNCOV
143
        TSDB_CHECK_CODE(code, line, END);
×
144
      }
145

UNCOV
146
      code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
×
UNCOV
147
      TSDB_CHECK_CODE(code, line, END);
×
148

UNCOV
149
      pRsp->blockNum++;
×
UNCOV
150
      if (pDataBlock == NULL) {
×
UNCOV
151
        blockDataDestroy(pHandle->block);
×
UNCOV
152
        pHandle->block = NULL;
×
153
      } else {
UNCOV
154
        code = copyDataBlock(pHandle->block, pDataBlock);
×
UNCOV
155
        TSDB_CHECK_CODE(code, line, END);
×
156

UNCOV
157
        STqOffsetVal offset = {0};
×
UNCOV
158
        code = qStreamExtractOffset(task, &offset);
×
UNCOV
159
        TSDB_CHECK_CODE(code, line, END);
×
160

UNCOV
161
        pRsp->sleepTime = offset.ts - pHandle->blockTime;
×
UNCOV
162
        pHandle->blockTime = offset.ts;
×
UNCOV
163
        tOffsetDestroy(&offset);
×
164
      }
UNCOV
165
      break;
×
166
    } else {
UNCOV
167
      if (pDataBlock == NULL) {
×
UNCOV
168
        break;
×
169
      }
UNCOV
170
      code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
×
UNCOV
171
      TSDB_CHECK_CODE(code, line, END);
×
172

UNCOV
173
      pRsp->blockNum++;
×
UNCOV
174
      totalRows += pDataBlock->info.rows;
×
UNCOV
175
      if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
×
176
        break;
177
      }
178
    }
179
  }
180

UNCOV
181
  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d",
×
182
          pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
UNCOV
183
  code = qStreamExtractOffset(task, &pRsp->rspOffset);
×
UNCOV
184
END:
×
UNCOV
185
  if (code != 0) {
×
UNCOV
186
    tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line,
×
187
            code);
188
  }
UNCOV
189
  return code;
×
190
}
191

UNCOV
192
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
×
UNCOV
193
  if (pTq == NULL || pHandle == NULL || pRsp == NULL || pBatchMetaRsp == NULL || pOffset == NULL) {
×
194
    return TSDB_CODE_INVALID_PARA;
×
195
  }
UNCOV
196
  const STqExecHandle* pExec = &pHandle->execHandle;
×
UNCOV
197
  qTaskInfo_t          task = pExec->task;
×
UNCOV
198
  int code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
×
UNCOV
199
  if (code != 0) {
×
200
    return code;
×
201
  }
202

UNCOV
203
  int32_t rowCnt = 0;
×
UNCOV
204
  while (1) {
×
UNCOV
205
    SSDataBlock* pDataBlock = NULL;
×
UNCOV
206
    uint64_t     ts = 0;
×
UNCOV
207
    tqDebug("tmqsnap task start to execute");
×
UNCOV
208
    code = qExecTask(task, &pDataBlock, &ts);
×
UNCOV
209
    if (code != 0) {
×
210
      tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code));
×
211
      return code;
×
212
    }
213

UNCOV
214
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);
×
215

UNCOV
216
    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
×
UNCOV
217
      if (pRsp->withTbName) {
×
UNCOV
218
        char* tbName = taosStrdup(qExtractTbnameFromTask(task));
×
UNCOV
219
        if (tbName == NULL) {
×
220
          tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
×
221
          return terrno;
×
222
        }
UNCOV
223
        if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
×
224
          tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
×
225
          continue;
×
226
        }
227
      }
UNCOV
228
      if (pRsp->withSchema) {
×
UNCOV
229
        SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
×
UNCOV
230
        if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
×
231
          tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
×
232
          continue;
×
233
        }
234
      }
235

UNCOV
236
      if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
×
UNCOV
237
                          pTq->pVnode->config.tsdbCfg.precision) != 0) {
×
238
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
239
        continue;
×
240
      }
UNCOV
241
      pRsp->blockNum++;
×
UNCOV
242
      rowCnt += pDataBlock->info.rows;
×
UNCOV
243
      if (rowCnt <= tmqRowSize) continue;
×
244

245
    }
246

247
    // get meta
UNCOV
248
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
×
UNCOV
249
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
×
UNCOV
250
      code = qStreamExtractOffset(task, &tmp->rspOffset);
×
UNCOV
251
      if (code) {
×
252
        return code;
×
253
      }
254

UNCOV
255
      *pBatchMetaRsp = *tmp;
×
UNCOV
256
      tqDebug("tmqsnap task get meta");
×
UNCOV
257
      break;
×
258
    }
259

UNCOV
260
    if (pDataBlock == NULL) {
×
UNCOV
261
      code = qStreamExtractOffset(task, pOffset);
×
UNCOV
262
      if (code) {
×
263
        break;
×
264
      }
265

UNCOV
266
      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
×
UNCOV
267
        continue;
×
268
      }
269

UNCOV
270
      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
×
271
              pHandle->snapshotVer + 1);
UNCOV
272
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
×
UNCOV
273
      break;
×
274
    }
275

UNCOV
276
    if (pRsp->blockNum > 0) {
×
UNCOV
277
      tqDebug("tmqsnap task exec exited, get data");
×
UNCOV
278
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
×
UNCOV
279
      break;
×
280
    }
281
  }
282

UNCOV
283
  return code;
×
284
}
285

UNCOV
286
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
×
UNCOV
287
  if (pRsp == NULL || pCreateTbReq == NULL) {
×
288
    return TSDB_CODE_INVALID_PARA;
×
289
  }
UNCOV
290
  int32_t code = 0;
×
UNCOV
291
  void*   createReq = NULL;
×
UNCOV
292
  if (pRsp->createTableNum == 0) {
×
UNCOV
293
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
×
UNCOV
294
    if (pRsp->createTableLen == NULL) {
×
295
      code = terrno;
×
296
      goto END;
×
297
    }
UNCOV
298
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
×
UNCOV
299
    if (pRsp->createTableReq == NULL) {
×
300
      code = terrno;
×
301
      goto END;
×
302
    }
303
  }
304

UNCOV
305
  uint32_t len = 0;
×
UNCOV
306
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
×
UNCOV
307
  if (TSDB_CODE_SUCCESS != code) {
×
308
    goto END;
×
309
  }
UNCOV
310
  createReq = taosMemoryCalloc(1, len);
×
UNCOV
311
  if (createReq == NULL){
×
312
    code = terrno;
×
313
    goto END;
×
314
  }
UNCOV
315
  SEncoder encoder = {0};
×
UNCOV
316
  tEncoderInit(&encoder, createReq, len);
×
UNCOV
317
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
×
UNCOV
318
  tEncoderClear(&encoder);
×
UNCOV
319
  if (code < 0) {
×
320
    goto END;
×
321
  }
UNCOV
322
  if (taosArrayPush(pRsp->createTableLen, &len) == NULL){
×
323
    code = terrno;
×
324
    goto END;
×
325
  }
UNCOV
326
  if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){
×
327
    code = terrno;
×
328
    goto END;
×
329
  }
UNCOV
330
  pRsp->createTableNum++;
×
331

UNCOV
332
  return 0;
×
333
END:
×
334
  taosMemoryFree(createReq);
×
335
  return code;
×
336
}
337

UNCOV
338
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
×
UNCOV
339
  if (pTq == NULL || pHandle == NULL || pRsp == NULL || totalRows == NULL) {
×
UNCOV
340
    return;
×
341
  }
UNCOV
342
  int32_t code = 0;
×
UNCOV
343
  STqExecHandle* pExec = &pHandle->execHandle;
×
UNCOV
344
  STqReader* pReader = pExec->pTqReader;
×
UNCOV
345
  SArray* pBlocks = NULL;
×
UNCOV
346
  SArray* pSchemas = NULL;
×
UNCOV
347
  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
×
UNCOV
348
  if (pBlocks == NULL) {
×
349
    code = terrno;
×
350
    goto END;
×
351
  }
UNCOV
352
  pSchemas = taosArrayInit(0, sizeof(void*));
×
UNCOV
353
  if(pSchemas == NULL){
×
354
    code = terrno;
×
355
    goto END;
×
356
  }
357

UNCOV
358
  SSubmitTbData* pSubmitTbDataRet = NULL;
×
UNCOV
359
  int64_t createTime = INT64_MAX;
×
UNCOV
360
  code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime);
×
UNCOV
361
  if (code != 0) {
×
UNCOV
362
    tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId);
×
UNCOV
363
    goto END;
×
364
  }
365

UNCOV
366
  if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) {
×
UNCOV
367
    goto END;
×
368
  }
UNCOV
369
  if (pRsp->withTbName) {
×
UNCOV
370
    int64_t uid = pExec->pTqReader->lastBlkUid;
×
UNCOV
371
    code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks));
×
UNCOV
372
    if (code != 0) {
×
UNCOV
373
      tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
×
UNCOV
374
      goto END;
×
375
    }
376
  }
UNCOV
377
  if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
×
UNCOV
378
    if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) {  // judge if table is already created to avoid sending crateTbReq
×
UNCOV
379
      code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq);
×
UNCOV
380
      if (code != 0){
×
UNCOV
381
        tqError("vgId:%d, failed to build create table info", pTq->pVnode->config.vgId);
×
382
        goto END;
×
383
      }
384
    }
385
  }
UNCOV
386
  if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) {
×
UNCOV
387
    goto END;
×
388
  }
UNCOV
389
  for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
×
UNCOV
390
    SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
×
UNCOV
391
    if (pBlock == NULL) {
×
392
      continue;
×
393
    }
UNCOV
394
    if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock),
×
UNCOV
395
                            pTq->pVnode->config.tsdbCfg.precision) != 0){
×
396
      tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
397
      continue;
×
398
    }
UNCOV
399
    *totalRows += pBlock->info.rows;
×
UNCOV
400
    blockDataFreeRes(pBlock);
×
UNCOV
401
    SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
×
UNCOV
402
    if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
×
403
      tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
×
404
      continue;
×
405
    }
UNCOV
406
    pRsp->blockNum++;
×
407
  }
408

UNCOV
409
  taosArrayDestroy(pBlocks);
×
UNCOV
410
  taosArrayDestroy(pSchemas);
×
UNCOV
411
  return;
×
412

UNCOV
413
END:
×
UNCOV
414
  taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
×
UNCOV
415
  taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
×
416
}
417

UNCOV
418
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows,
×
419
                       int8_t sourceExcluded) {
UNCOV
420
  if (pTq == NULL || pHandle == NULL || pRsp == NULL || totalRows == NULL) {
×
UNCOV
421
    return TSDB_CODE_INVALID_PARA;
×
422
  }
UNCOV
423
  STqExecHandle* pExec = &pHandle->execHandle;
×
UNCOV
424
  int32_t        code = 0;
×
UNCOV
425
  STqReader* pReader = pExec->pTqReader;
×
UNCOV
426
  code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver);
×
UNCOV
427
  if (code != 0) {
×
428
    return code;
×
429
  }
430

UNCOV
431
  if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
×
UNCOV
432
    while (tqNextBlockImpl(pReader, NULL)) {
×
UNCOV
433
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
×
434
    }
UNCOV
435
  } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
×
UNCOV
436
    while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
×
UNCOV
437
      tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
×
438
    }
439
  }
440

UNCOV
441
  return code;
×
442
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc