• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

taosdata / TDengine / #3561

19 Dec 2024 03:15AM UTC coverage: 58.812% (-1.3%) from 60.124%
#3561

push

travis-ci

web-flow
Merge pull request #29213 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

130770 of 287658 branches covered (45.46%)

Branch coverage included in aggregate %.

32 of 78 new or added lines in 4 files covered. (41.03%)

7347 existing lines in 166 files now uncovered.

205356 of 283866 relevant lines covered (72.34%)

7187865.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.72
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
561✔
20
  if (pHandle == NULL || pHead == NULL) {
561!
UNCOV
21
    return false;
×
22
  }
23
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
561✔
24
    return true;
557✔
25
  }
26

27
  int16_t msgType = pHead->msgType;
4✔
28
  char*   body = pHead->body;
4✔
29
  int32_t bodyLen = pHead->bodyLen;
4✔
30

31
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
4✔
32
  int64_t  realTbSuid = 0;
4✔
33
  SDecoder dcoder = {0};
4✔
34
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
4✔
35
  int32_t  len = bodyLen - sizeof(SMsgHead);
4✔
36
  tDecoderInit(&dcoder, data, len);
4✔
37

38
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
4!
39
    SVCreateStbReq req = {0};
2✔
40
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
2!
UNCOV
41
      goto end;
×
42
    }
43
    realTbSuid = req.suid;
2✔
44
  } else if (msgType == TDMT_VND_DROP_STB) {
2!
UNCOV
45
    SVDropStbReq req = {0};
×
46
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
UNCOV
47
      goto end;
×
48
    }
UNCOV
49
    realTbSuid = req.suid;
×
50
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
2!
51
    SVCreateTbBatchReq req = {0};
2✔
52
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
2!
UNCOV
53
      goto end;
×
54
    }
55

56
    int32_t        needRebuild = 0;
2✔
57
    SVCreateTbReq* pCreateReq = NULL;
2✔
58
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
4✔
59
      pCreateReq = req.pReqs + iReq;
2✔
60
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
2!
UNCOV
61
        needRebuild++;
×
62
      }
63
    }
64
    if (needRebuild == 0) {
2!
65
      // do nothing
66
    } else if (needRebuild == req.nReqs) {
×
67
      realTbSuid = tbSuid;
×
68
    } else {
69
      realTbSuid = tbSuid;
×
70
      SVCreateTbBatchReq reqNew = {0};
×
71
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
×
UNCOV
72
      if (reqNew.pArray == NULL) {
×
73
        tDeleteSVCreateTbBatchReq(&req);
×
74
        goto end;
×
75
      }
76
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
77
        pCreateReq = req.pReqs + iReq;
×
78
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
79
          reqNew.nReqs++;
×
80
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
×
UNCOV
81
            taosArrayDestroy(reqNew.pArray);
×
UNCOV
82
            tDeleteSVCreateTbBatchReq(&req);
×
UNCOV
83
            goto end;
×
84
          }
85
        }
86
      }
87

88
      int     tlen = 0;
×
89
      int32_t ret = 0;
×
90
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
×
91
      void* buf = taosMemoryMalloc(tlen);
×
92
      if (NULL == buf) {
×
UNCOV
93
        taosArrayDestroy(reqNew.pArray);
×
94
        tDeleteSVCreateTbBatchReq(&req);
×
95
        goto end;
×
96
      }
97
      SEncoder coderNew = {0};
×
98
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
99
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
×
100
      tEncoderClear(&coderNew);
×
101
      if (ret < 0) {
×
102
        taosMemoryFree(buf);
×
UNCOV
103
        taosArrayDestroy(reqNew.pArray);
×
104
        tDeleteSVCreateTbBatchReq(&req);
×
105
        goto end;
×
106
      }
107
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
UNCOV
108
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
UNCOV
109
      taosMemoryFree(buf);
×
UNCOV
110
      taosArrayDestroy(reqNew.pArray);
×
111
    }
112

113
    tDeleteSVCreateTbBatchReq(&req);
2✔
114
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
×
115
    SVAlterTbReq req = {0};
×
116

UNCOV
117
    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
×
118
      goto end;
×
119
    }
120

121
    SMetaReader mr = {0};
×
122
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);
×
123

UNCOV
124
    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
×
125
      metaReaderClear(&mr);
×
126
      goto end;
×
127
    }
128
    realTbSuid = mr.me.ctbEntry.suid;
×
UNCOV
129
    metaReaderClear(&mr);
×
130
  } else if (msgType == TDMT_VND_DROP_TABLE) {
×
131
    SVDropTbBatchReq req = {0};
×
132

UNCOV
133
    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
×
134
      goto end;
×
135
    }
136

137
    int32_t      needRebuild = 0;
×
UNCOV
138
    SVDropTbReq* pDropReq = NULL;
×
139
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
140
      pDropReq = req.pReqs + iReq;
×
141

UNCOV
142
      if (pDropReq->suid == tbSuid) {
×
143
        needRebuild++;
×
144
      }
145
    }
146
    if (needRebuild == 0) {
×
147
      // do nothing
148
    } else if (needRebuild == req.nReqs) {
×
149
      realTbSuid = tbSuid;
×
150
    } else {
151
      realTbSuid = tbSuid;
×
152
      SVDropTbBatchReq reqNew = {0};
×
UNCOV
153
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
154
      if (reqNew.pArray == NULL) {
×
155
        goto end;
×
156
      }
157
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
158
        pDropReq = req.pReqs + iReq;
×
159
        if (pDropReq->suid == tbSuid) {
×
160
          reqNew.nReqs++;
×
UNCOV
161
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
UNCOV
162
            taosArrayDestroy(reqNew.pArray);
×
UNCOV
163
            goto end;
×
164
          }
165
        }
166
      }
167

168
      int     tlen = 0;
×
169
      int32_t ret = 0;
×
170
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
×
171
      void* buf = taosMemoryMalloc(tlen);
×
UNCOV
172
      if (NULL == buf) {
×
173
        taosArrayDestroy(reqNew.pArray);
×
174
        goto end;
×
175
      }
176
      SEncoder coderNew = {0};
×
177
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
178
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
179
      tEncoderClear(&coderNew);
×
180
      if (ret != 0) {
×
UNCOV
181
        taosMemoryFree(buf);
×
182
        taosArrayDestroy(reqNew.pArray);
×
183
        goto end;
×
184
      }
185
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
UNCOV
186
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
187
      taosMemoryFree(buf);
×
188
      taosArrayDestroy(reqNew.pArray);
×
189
    }
190
  } else if (msgType == TDMT_VND_DELETE) {
×
UNCOV
191
    SDeleteRes req = {0};
×
192
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
UNCOV
193
      goto end;
×
194
    }
195
    realTbSuid = req.suid;
×
196
  }
197

UNCOV
198
end:
×
199
  tDecoderClear(&dcoder);
4✔
200
  return tbSuid == realTbSuid;
4✔
201
}
202

203
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
132,748✔
204
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
132,748!
UNCOV
205
    return -1;
×
206
  }
207
  int32_t code = -1;
132,796✔
208
  int32_t vgId = TD_VID(pTq->pVnode);
132,796✔
209
  int64_t id = pHandle->pWalReader->readerId;
132,796✔
210

211
  int64_t offset = *fetchOffset;
132,796✔
212
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
132,796✔
213
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
132,761✔
214
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
132,780✔
215

216
  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
132,781!
217
          ", 0x%" PRIx64,
218
          vgId, offset, lastVer, committedVer, appliedVer, id);
219

220
  while (offset <= appliedVer) {
141,086✔
221
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
130,731!
UNCOV
222
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
×
223
              ", no more log to return,QID:0x%" PRIx64 " 0x%" PRIx64,
224
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
UNCOV
225
      goto END;
×
226
    }
227

228
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s,QID:0x%" PRIx64 " 0x%" PRIx64,
130,717!
229
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
230

231
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
130,729✔
232
      code = walFetchBody(pHandle->pWalReader);
122,033✔
233
      goto END;
122,021✔
234
    } else {
235
      if (pHandle->fetchMeta != WITH_DATA) {
8,696✔
236
        SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
713✔
237
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
713✔
238
          code = walFetchBody(pHandle->pWalReader);
561✔
239
          if (code < 0) {
561!
UNCOV
240
            goto END;
×
241
          }
242

243
          pHead = &(pHandle->pWalReader->pHead->head);
561✔
244
          if (isValValidForTable(pHandle, pHead)) {
561✔
245
            code = 0;
558✔
246
            goto END;
558✔
247
          } else {
248
            offset++;
3✔
249
            code = -1;
3✔
250
            continue;
3✔
251
          }
252
        }
253
      }
254
      code = walSkipFetchBody(pHandle->pWalReader);
8,135✔
255
      if (code < 0) {
8,134!
UNCOV
256
        goto END;
×
257
      }
258
      offset++;
8,134✔
259
    }
260
    code = -1;
8,134✔
261
  }
262

263
END:
10,355✔
264
  *fetchOffset = offset;
132,934✔
265
  return code;
132,934✔
266
}
267

268
bool tqGetTablePrimaryKey(STqReader* pReader) {
20,243✔
269
  if (pReader == NULL) {
20,243!
UNCOV
270
    return false;
×
271
  }
272
  return pReader->hasPrimaryKey;
20,243✔
273
}
274

275
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
182✔
276
  if (pReader == NULL) {
182!
277
    return;
×
278
  }
279
  bool            ret = false;
182✔
280
  SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
182✔
281
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
182!
282
    ret = true;
3✔
283
  }
284
  tDeleteSchemaWrapper(schema);
285
  pReader->hasPrimaryKey = ret;
182✔
286
}
287

288
STqReader* tqReaderOpen(SVnode* pVnode) {
8,351✔
289
  if (pVnode == NULL) {
8,351!
UNCOV
290
    return NULL;
×
291
  }
292
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
8,351!
293
  if (pReader == NULL) {
8,371!
UNCOV
294
    return NULL;
×
295
  }
296

297
  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
8,371✔
298
  if (pReader->pWalReader == NULL) {
8,369!
UNCOV
299
    taosMemoryFree(pReader);
×
UNCOV
300
    return NULL;
×
301
  }
302

303
  pReader->pVnodeMeta = pVnode->pMeta;
8,369✔
304
  pReader->pColIdList = NULL;
8,369✔
305
  pReader->cachedSchemaVer = 0;
8,369✔
306
  pReader->cachedSchemaSuid = 0;
8,369✔
307
  pReader->pSchemaWrapper = NULL;
8,369✔
308
  pReader->tbIdHash = NULL;
8,369✔
309
  pReader->pResBlock = NULL;
8,369✔
310

311
  int32_t code = createDataBlock(&pReader->pResBlock);
8,369✔
312
  if (code) {
8,371!
UNCOV
313
    terrno = code;
×
314
  }
315

316
  return pReader;
8,371✔
317
}
318

319
void tqReaderClose(STqReader* pReader) {
8,283✔
320
  if (pReader == NULL) return;
8,283✔
321

322
  // close wal reader
323
  if (pReader->pWalReader) {
8,242!
324
    walCloseReader(pReader->pWalReader);
8,242✔
325
  }
326

327
  if (pReader->pSchemaWrapper) {
8,243✔
328
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
1,770!
329
  }
330

331
  if (pReader->pColIdList) {
8,242✔
332
    taosArrayDestroy(pReader->pColIdList);
7,937✔
333
  }
334

335
  // free hash
336
  blockDataDestroy(pReader->pResBlock);
8,242✔
337
  taosHashCleanup(pReader->tbIdHash);
8,242✔
338
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
8,244✔
339
  taosMemoryFree(pReader);
8,242!
340
}
341

342
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
5,299✔
343
  if (pReader == NULL) {
5,299!
UNCOV
344
    return TSDB_CODE_INVALID_PARA;
×
345
  }
346
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
5,299✔
347
    return -1;
142✔
348
  }
349
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
5,154!
350
  return 0;
5,158✔
351
}
352

353
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
456,917✔
354
  int32_t code = 0;
456,917✔
355

356
  while (1) {
8,083✔
357
    TAOS_CHECK_RETURN(walNextValidMsg(pReader));
465,000✔
358

359
    SWalCont* pCont = &pReader->pHead->head;
402,286✔
360
    int64_t   ver = pCont->version;
402,286✔
361
    if (ver > maxVer) {
402,286✔
362
      tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
304✔
363
      return TSDB_CODE_SUCCESS;
304✔
364
    }
365

366
    if (pCont->msgType == TDMT_VND_SUBMIT) {
401,982✔
367
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
391,361✔
368
      int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
391,361✔
369

370
      void* data = taosMemoryMalloc(len);
391,361!
371
      if (data == NULL) {
391,356!
372
        // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
373
        // retry
UNCOV
374
        tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
×
UNCOV
375
        return terrno;
×
376
      }
377

378
      (void)memcpy(data, pBody, len);
391,356✔
379
      SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
391,356✔
380

381
      code = streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
391,356✔
382
      if (code != 0) {
391,367!
UNCOV
383
        tqError("%s failed to create data submit for stream since out of memory", id);
×
UNCOV
384
        return code;
×
385
      }
386
    } else if (pCont->msgType == TDMT_VND_DELETE) {
10,621✔
387
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
10,439✔
388
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
10,439✔
389
      EStreamType blockType = STREAM_DELETE_DATA;
10,439✔
390
      code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType);
10,439✔
391
      if (code == TSDB_CODE_SUCCESS) {
10,441!
392
        if (*pItem == NULL) {
10,441✔
393
          tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
8,083✔
394
          // we need to continue check next data in the wal files.
395
          continue;
8,083✔
396
        } else {
397
          tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
2,358✔
398
        }
399
      } else {
400
        terrno = code;
×
401
        tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
×
UNCOV
402
        return code;
×
403
      }
404

405
    } else if (pCont->msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
367!
406
      void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
185✔
407
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
185✔
408
      code = tqExtractDropCtbDataBlock(pBody, len, ver, (void**)pItem, 0);
185✔
409
      if (TSDB_CODE_SUCCESS == code) {
185!
410
        if (!*pItem) {
185!
UNCOV
411
          continue;
×
412
        } else {
413
          tqDebug("s-task:%s drop ctb msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
185!
414
        }
415
      } else {
UNCOV
416
        terrno = code;
×
UNCOV
417
        return code;
×
418
      }
419
    } else {
420
      tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
×
UNCOV
421
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
422
    }
423

424
    return code;
393,907✔
425
  }
426
}
427

428
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
521,755✔
429
  if (pReader == NULL) {
521,755!
UNCOV
430
    return false;
×
431
  }
432
  SWalReader* pWalReader = pReader->pWalReader;
521,755✔
433

434
  int64_t st = taosGetTimestampMs();
521,752✔
435
  while (1) {
466,749✔
436
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
988,501✔
437
    while (pReader->nextBlk < numOfBlocks) {
1,238,405✔
438
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
717,307✔
439
              pReader->msg.ver);
440

441
      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
717,307✔
442
      if (pSubmitTbData == NULL) {
717,301✔
443
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
1!
444
                pReader->msg.ver);
UNCOV
445
        return false;
×
446
      }
447
      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
717,300✔
448
        pReader->nextBlk += 1;
110✔
449
        continue;
110✔
450
      }
451
      if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
717,189!
452
        tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
467,382✔
453
        SSDataBlock* pRes = NULL;
467,382✔
454
        int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
467,382✔
455
        if (code == TSDB_CODE_SUCCESS) {
467,377!
456
          return true;
467,378✔
457
        }
458
      } else {
459
        pReader->nextBlk += 1;
249,826✔
460
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
249,826!
461
      }
462
    }
463

464
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
521,098✔
465
    pReader->msg.msgStr = NULL;
521,129✔
466

467
    int64_t elapsed = taosGetTimestampMs() - st;
521,124✔
468
    if (elapsed > 1000 || elapsed < 0) {
521,124!
469
      return false;
1✔
470
    }
471

472
    // try next message in wal file
473
    if (walNextValidMsg(pWalReader) < 0) {
521,123✔
474
      return false;
54,378✔
475
    }
476

477
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
466,735✔
478
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
466,735✔
479
    int64_t ver = pWalReader->pHead->head.version;
466,735✔
480
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver) != 0) {
466,735!
UNCOV
481
      return false;
×
482
    }
483
    pReader->nextBlk = 0;
466,749✔
484
  }
485
}
486

487
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
980,392✔
488
if (pReader == NULL) {
980,392!
UNCOV
489
    return TSDB_CODE_INVALID_PARA;
×
490
  }
491
  pReader->msg.msgStr = msgStr;
980,392✔
492
  pReader->msg.msgLen = msgLen;
980,392✔
493
  pReader->msg.ver = ver;
980,392✔
494

495
  tqDebug("tq reader set msg %p %d", msgStr, msgLen);
980,392✔
496
  SDecoder decoder = {0};
980,438✔
497

498
  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
980,438✔
499
  int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit);
980,416✔
500
  if (code != 0) {
980,319!
UNCOV
501
    tDecoderClear(&decoder);
×
502
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
×
UNCOV
503
    return code;
×
504
  }
505

506
  tDecoderClear(&decoder);
980,319✔
507
  return 0;
980,410✔
508
}
509

510
SWalReader* tqGetWalReader(STqReader* pReader) {
582,480✔
511
  if (pReader == NULL) {
582,480!
UNCOV
512
    return NULL;
×
513
  }
514
  return pReader->pWalReader;
582,480✔
515
}
516

517
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
521,745✔
518
  if (pReader == NULL) {
521,745!
UNCOV
519
    return NULL;
×
520
  }
521
  return pReader->pResBlock;
521,745✔
522
}
523

524
int64_t tqGetResultBlockTime(STqReader* pReader) {
521,733✔
525
  if (pReader == NULL) {
521,733!
UNCOV
526
    return 0;
×
527
  }
528
  return pReader->lastTs;
521,733✔
529
}
530

531
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
780,885✔
532
  if (pReader == NULL || pReader->msg.msgStr == NULL) {
780,885!
UNCOV
533
    return false;
×
534
  }
535

536
  int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
780,895✔
537
  while (pReader->nextBlk < numOfBlocks) {
893,513✔
538
    tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
465,315✔
539
            (pReader->nextBlk + 1), numOfBlocks, idstr);
540

541
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
465,314✔
542
    if (pSubmitTbData == NULL) {
465,311!
UNCOV
543
      return false;
×
544
    }
545
    if (pReader->tbIdHash == NULL) {
465,311!
UNCOV
546
      return true;
×
547
    }
548

549
    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
465,311✔
550
    if (ret != NULL) {
465,359✔
551
      tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
352,755✔
552
      return true;
352,755✔
553
    } else {
554
      tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
112,604✔
555
              taosHashGetSize(pReader->tbIdHash), idstr);
556
    }
557

558
    pReader->nextBlk++;
112,623✔
559
  }
560

561
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
428,198✔
562
  pReader->nextBlk = 0;
428,273✔
563
  pReader->msg.msgStr = NULL;
428,273✔
564

565
  return false;
428,273✔
566
}
567

568
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
170,804✔
569
  if (pReader == NULL || pReader->msg.msgStr == NULL) return false;
170,804!
570

571
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
170,813✔
572
  while (pReader->nextBlk < blockSz) {
170,804✔
573
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
85,492✔
574
    if (pSubmitTbData == NULL) return false;
85,493!
575
    if (filterOutUids == NULL) return true;
85,493!
576

577
    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
85,493✔
578
    if (ret == NULL) {
85,494!
579
      return true;
85,494✔
580
    }
UNCOV
581
    pReader->nextBlk++;
×
582
  }
583

584
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
85,312✔
585
  pReader->nextBlk = 0;
85,403✔
586
  pReader->msg.msgStr = NULL;
85,403✔
587

588
  return false;
85,403✔
589
}
590

591
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
118,093✔
592
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
118,093!
UNCOV
593
    return TSDB_CODE_INVALID_PARA;
×
594
  }
595
  int32_t code = 0;
118,116✔
596

597
  int32_t cnt = 0;
118,116✔
598
  for (int32_t i = 0; i < pSrc->nCols; i++) {
705,085✔
599
    cnt += mask[i];
586,969✔
600
  }
601

602
  pDst->nCols = cnt;
118,116✔
603
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
118,116!
604
  if (pDst->pSchema == NULL) {
118,098!
UNCOV
605
    return TAOS_GET_TERRNO(terrno);
×
606
  }
607

608
  int32_t j = 0;
118,098✔
609
  for (int32_t i = 0; i < pSrc->nCols; i++) {
704,240✔
610
    if (mask[i]) {
586,172✔
611
      pDst->pSchema[j++] = pSrc->pSchema[i];
586,158✔
612
      SColumnInfoData colInfo =
613
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
586,158✔
614
      code = blockDataAppendColInfo(pBlock, &colInfo);
586,892✔
615
      if (code != 0) {
586,128!
UNCOV
616
        return code;
×
617
      }
618
    }
619
  }
620
  return 0;
118,068✔
621
}
622

623
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
1,658✔
624
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
1,658!
UNCOV
625
    return TSDB_CODE_INVALID_PARA;
×
626
  }
627
  SSDataBlock* pBlock = pReader->pResBlock;
1,659✔
628
  if (blockDataGetNumOfCols(pBlock) > 0) {
1,659✔
629
      blockDataDestroy(pBlock);
1✔
630
      int32_t code = createDataBlock(&pReader->pResBlock);
1✔
631
      if (code) {
1!
UNCOV
632
        return code;
×
633
      }
634
      pBlock = pReader->pResBlock;
1✔
635

636
      pBlock->info.id.uid = pReader->cachedSchemaUid;
1✔
637
      pBlock->info.version = pReader->msg.ver;
1✔
638
  }
639

640
  int32_t numOfCols = taosArrayGetSize(pColIdList);
1,660✔
641

642
  if (numOfCols == 0) {  // all columns are required
1,660!
UNCOV
643
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
×
UNCOV
644
      SSchema*        pColSchema = &pSchema->pSchema[i];
×
UNCOV
645
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
×
646

UNCOV
647
      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
×
UNCOV
648
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
649
        blockDataFreeRes(pBlock);
×
UNCOV
650
        return terrno;
×
651
      }
652
    }
653
  } else {
654
    if (numOfCols > pSchema->nCols) {
1,660✔
655
      numOfCols = pSchema->nCols;
1✔
656
    }
657

658
    int32_t i = 0;
1,660✔
659
    int32_t j = 0;
1,660✔
660
    while (i < pSchema->nCols && j < numOfCols) {
13,619✔
661
      SSchema* pColSchema = &pSchema->pSchema[i];
11,951✔
662
      col_id_t colIdSchema = pColSchema->colId;
11,951✔
663

664
      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
11,951✔
665
      if (pColIdNeed == NULL) {
11,956!
UNCOV
666
        break;
×
667
      }
668
      if (colIdSchema < *pColIdNeed) {
11,956✔
669
        i++;
1,606✔
670
      } else if (colIdSchema > *pColIdNeed) {
10,350!
UNCOV
671
        j++;
×
672
      } else {
673
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
10,350✔
674
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
10,375✔
675
        if (code != TSDB_CODE_SUCCESS) {
10,353!
UNCOV
676
          return -1;
×
677
        }
678
        i++;
10,353✔
679
        j++;
10,353✔
680
      }
681
    }
682
  }
683

684
  return TSDB_CODE_SUCCESS;
1,668✔
685
}
686

687
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
165,645,838✔
688
  int32_t code = TSDB_CODE_SUCCESS;
165,645,838✔
689

690
  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
173,069,800!
691
    char val[65535 + 2] = {0};
6,964,160✔
692
    if (COL_VAL_IS_VALUE(pColVal)) {
6,964,160!
693
      if (pColVal->value.pData != NULL) {
7,433,025!
694
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
7,435,702✔
695
      }
696
      varDataSetLen(val, pColVal->value.nData);
7,433,025✔
697
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
7,433,025✔
698
    } else {
UNCOV
699
      colDataSetNULL(pColumnInfoData, rowIndex);
×
700
    }
701
  } else {
702
    code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
158,681,678✔
703
  }
704

705
  return code;
165,917,814✔
706
}
707

708
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
787,014✔
709
  if (pReader == NULL || pRes == NULL) {
787,014!
UNCOV
710
    return TSDB_CODE_INVALID_PARA;
×
711
  }
712
  tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
787,026✔
713
  int32_t        code = 0;
787,026✔
714
  int32_t        line = 0;
787,026✔
715
  STSchema*      pTSchema = NULL;
787,026✔
716
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
787,026✔
717
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
787,024!
718
  SSDataBlock* pBlock = pReader->pResBlock;
787,024✔
719
  *pRes = pBlock;
787,024✔
720

721
  blockDataCleanup(pBlock);
787,024✔
722

723
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
786,948✔
724
  int32_t sversion = pSubmitTbData->sver;
786,948✔
725
  int64_t suid = pSubmitTbData->suid;
786,948✔
726
  int64_t uid = pSubmitTbData->uid;
786,948✔
727
  pReader->lastTs = pSubmitTbData->ctimeMs;
786,948✔
728

729
  pBlock->info.id.uid = uid;
786,948✔
730
  pBlock->info.version = pReader->msg.ver;
786,948✔
731

732
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
786,948✔
733
      (pReader->cachedSchemaVer != sversion)) {
785,276✔
734
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
1,674✔
735

736
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
1,676✔
737
    if (pReader->pSchemaWrapper == NULL) {
1,676✔
738
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
17✔
739
             "version %d, possibly dropped table",
740
             vgId, suid, uid, pReader->cachedSchemaVer);
741
      pReader->cachedSchemaSuid = 0;
16✔
742
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
16✔
743
    }
744

745
    pReader->cachedSchemaUid = uid;
1,659✔
746
    pReader->cachedSchemaSuid = suid;
1,659✔
747
    pReader->cachedSchemaVer = sversion;
1,659✔
748

749
    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
1,659!
UNCOV
750
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
×
751
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
UNCOV
752
      return TSDB_CODE_TQ_INTERNAL_ERROR;
×
753
    }
754
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
1,659✔
755
    TSDB_CHECK_CODE(code, line, END);
1,659!
756
    pBlock = pReader->pResBlock;
1,659✔
757
    *pRes = pBlock;
1,659✔
758
  }
759

760
  int32_t numOfRows = 0;
786,933✔
761
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
786,933✔
762
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
4✔
763
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
4!
764
    numOfRows = pCol->nVal;
4✔
765
  } else {
766
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
786,929✔
767
  }
768

769
  code = blockDataEnsureCapacity(pBlock, numOfRows);
786,932✔
770
  TSDB_CHECK_CODE(code, line, END);
786,941!
771
  pBlock->info.rows = numOfRows;
786,941✔
772
  int32_t colActual = blockDataGetNumOfCols(pBlock);
786,941✔
773

774
  // convert and scan one block
775
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
786,949✔
776
    SArray* pCols = pSubmitTbData->aCol;
4✔
777
    int32_t numOfCols = taosArrayGetSize(pCols);
4✔
778
    int32_t targetIdx = 0;
4✔
779
    int32_t sourceIdx = 0;
4✔
780
    while (targetIdx < colActual) {
18✔
781
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
14✔
782
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
14!
783
      if (sourceIdx >= numOfCols) {
14✔
784
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
4!
785
        colDataSetNNULL(pColData, 0, numOfRows);
4!
786
        targetIdx++;
4✔
787
        continue;
4✔
788
      }
789

790
      SColData* pCol = taosArrayGet(pCols, sourceIdx);
10✔
791
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
10!
792
      SColVal colVal = {0};
10✔
793
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
10!
794
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
795
      if (pCol->cid < pColData->info.colId) {
10✔
796
        sourceIdx++;
4✔
797
      } else if (pCol->cid == pColData->info.colId) {
6✔
798
        for (int32_t i = 0; i < pCol->nVal; i++) {
12✔
799
          tColDataGetValue(pCol, i, &colVal);
8✔
800
          code = doSetVal(pColData, i, &colVal);
8✔
801
          TSDB_CHECK_CODE(code, line, END);
8!
802
        }
803
        sourceIdx++;
4✔
804
        targetIdx++;
4✔
805
      } else {
806
        colDataSetNNULL(pColData, 0, numOfRows);
2!
807
        targetIdx++;
2✔
808
      }
809
    }
810
  } else {
811
    SArray*         pRows = pSubmitTbData->aRowP;
786,945✔
812
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
786,945✔
813
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
786,945✔
814
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);
786,976✔
815

816
    for (int32_t i = 0; i < numOfRows; i++) {
57,121,767✔
817
      SRow* pRow = taosArrayGetP(pRows, i);
56,529,103✔
818
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
56,507,539!
819
      int32_t sourceIdx = 0;
56,529,226✔
820
      for (int32_t j = 0; j < colActual; j++) {
207,055,680✔
821
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
150,720,884✔
822
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);
150,720,681!
823

824
        while (1) {
19,908,365✔
825
          SColVal colVal = {0};
170,629,046✔
826
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
170,629,046✔
827
          TSDB_CHECK_CODE(code, line, END);
170,233,718!
828

829
          if (colVal.cid < pColData->info.colId) {
170,233,718✔
830
            sourceIdx++;
19,908,365✔
831
            continue;
19,908,365✔
832
          } else if (colVal.cid == pColData->info.colId) {
150,325,353!
833
            code = doSetVal(pColData, i, &colVal);
150,892,881✔
834
            TSDB_CHECK_CODE(code, line, END);
151,093,982!
835
            sourceIdx++;
151,093,982✔
836
            break;
150,526,454✔
837
          } else {
UNCOV
838
            colDataSetNULL(pColData, i);
×
UNCOV
839
            break;
×
840
          }
841
        }
842
      }
843
    }
844
  }
845

846
END:
592,664✔
847
  if (code != 0) {
592,668!
UNCOV
848
    tqError("tqRetrieveDataBlock failed, line:%d, code:%d", line, code);
×
849
  }
850
  taosMemoryFreeClear(pTSchema);
786,956!
851
  return code;
786,999✔
852
}
853

854
#define PROCESS_VAL                                      \
855
  if (curRow == 0) {                                     \
856
    assigned[j] = !COL_VAL_IS_NONE(&colVal);             \
857
    buildNew = true;                                     \
858
  } else {                                               \
859
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); \
860
    if (currentRowAssigned != assigned[j]) {             \
861
      assigned[j] = currentRowAssigned;                  \
862
      buildNew = true;                                   \
863
    }                                                    \
864
  }
865

866
#define SET_DATA                                                     \
867
  if (colVal.cid < pColData->info.colId) {                           \
868
    sourceIdx++;                                                     \
869
  } else if (colVal.cid == pColData->info.colId) {                   \
870
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
871
    sourceIdx++;                                                     \
872
    targetIdx++;                                                     \
873
  }
874

875
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
117,988✔
876
                               SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow,
877
                               int32_t* lastRow) {
878
  if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL || pSchemaWrapper == NULL ||
117,988!
879
      assigned == NULL || lastRow == NULL) {
118,025!
UNCOV
880
    return TSDB_CODE_INVALID_PARA;
×
881
  }
882
  int32_t         code = 0;
118,034✔
883
  SSchemaWrapper* pSW = NULL;
118,034✔
884
  SSDataBlock*    block = NULL;
118,034✔
885
  if (taosArrayGetSize(blocks) > 0) {
118,034!
UNCOV
886
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
UNCOV
887
    TQ_NULL_GO_TO_END(pLastBlock);
×
UNCOV
888
    pLastBlock->info.rows = curRow - *lastRow;
×
UNCOV
889
    *lastRow = curRow;
×
890
  }
891

892
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
118,036!
893
  TQ_NULL_GO_TO_END(block);
118,133!
894

895
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
118,133!
896
  TQ_NULL_GO_TO_END(pSW);
118,119!
897

898
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned));
118,119!
899
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
118,117!
900
          (int32_t)taosArrayGetSize(block->pDataBlock));
901

902
  block->info.id.uid = pSubmitTbData->uid;
118,117✔
903
  block->info.version = pReader->msg.ver;
118,117✔
904
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
118,117!
905
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
118,098!
906
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
118,074!
907
  pSW = NULL;
118,074✔
908
  taosMemoryFreeClear(block);
118,074!
909

910
END:
2✔
911
  tDeleteSchemaWrapper(pSW);
118,171!
912
  blockDataFreeRes(block);
118,139✔
913
  taosMemoryFree(block);
118,120!
914
  return code;
118,114✔
915
}
916
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
113✔
917
  if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL) {
113!
UNCOV
918
    return TSDB_CODE_INVALID_PARA;
×
919
  }
920
  int32_t code = 0;
113✔
921
  int32_t curRow = 0;
113✔
922
  int32_t lastRow = 0;
113✔
923

924
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
113✔
925
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
113!
926
  TQ_NULL_GO_TO_END(assigned);
113!
927

928
  SArray*   pCols = pSubmitTbData->aCol;
113✔
929
  SColData* pCol = taosArrayGet(pCols, 0);
113✔
930
  TQ_NULL_GO_TO_END(pCol);
113!
931
  int32_t numOfRows = pCol->nVal;
113✔
932
  int32_t numOfCols = taosArrayGetSize(pCols);
113✔
933
  for (int32_t i = 0; i < numOfRows; i++) {
299✔
934
    bool buildNew = false;
185✔
935

936
    for (int32_t j = 0; j < numOfCols; j++) {
949✔
937
      pCol = taosArrayGet(pCols, j);
765✔
938
      TQ_NULL_GO_TO_END(pCol);
765!
939
      SColVal colVal = {0};
765✔
940
      tColDataGetValue(pCol, i, &colVal);
765✔
941
      PROCESS_VAL
764!
942
    }
943

944
    if (buildNew) {
184✔
945
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
113!
946
                                       curRow, &lastRow));
947
    }
948

949
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
184✔
950
    TQ_NULL_GO_TO_END(pBlock);
185!
951

952
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
185!
953
            (int32_t)taosArrayGetSize(blocks));
954

955
    int32_t targetIdx = 0;
185✔
956
    int32_t sourceIdx = 0;
185✔
957
    int32_t colActual = blockDataGetNumOfCols(pBlock);
185✔
958
    while (targetIdx < colActual) {
945✔
959
      pCol = taosArrayGet(pCols, sourceIdx);
759✔
960
      TQ_NULL_GO_TO_END(pCol);
759!
961
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
759✔
962
      TQ_NULL_GO_TO_END(pColData);
759!
963
      SColVal colVal = {0};
759✔
964
      tColDataGetValue(pCol, i, &colVal);
759✔
965
      SET_DATA
761!
966
    }
967

968
    curRow++;
186✔
969
  }
970
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
114✔
971
  pLastBlock->info.rows = curRow - lastRow;
113✔
972

973
END:
113✔
974
  taosMemoryFree(assigned);
113!
975
  return code;
113✔
976
}
977

978
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
118,005✔
979
  if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL) {
118,005!
UNCOV
980
    return TSDB_CODE_INVALID_PARA;
×
981
  }
982
  int32_t   code = 0;
118,030✔
983
  STSchema* pTSchema = NULL;
118,030✔
984

985
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
118,030✔
986
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
118,030!
987
  TQ_NULL_GO_TO_END(assigned);
118,035!
988

989
  int32_t curRow = 0;
118,035✔
990
  int32_t lastRow = 0;
118,035✔
991
  SArray* pRows = pSubmitTbData->aRowP;
118,035✔
992
  int32_t numOfRows = taosArrayGetSize(pRows);
118,035✔
993
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
118,003✔
994

995
  for (int32_t i = 0; i < numOfRows; i++) {
3,497,075✔
996
    bool  buildNew = false;
3,379,212✔
997
    SRow* pRow = taosArrayGetP(pRows, i);
3,379,212✔
998
    TQ_NULL_GO_TO_END(pRow);
3,372,089!
999

1000
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
18,266,189✔
1001
      SColVal colVal = {0};
14,917,773✔
1002
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
14,917,773!
1003
      PROCESS_VAL
14,894,646!
1004
    }
1005

1006
    if (buildNew) {
3,348,416✔
1007
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
117,901!
1008
                                       curRow, &lastRow));
1009
    }
1010

1011
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
3,348,505✔
1012
    TQ_NULL_GO_TO_END(pBlock);
3,369,632!
1013

1014
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
3,369,632!
1015
            (int32_t)taosArrayGetSize(blocks));
1016

1017
    int32_t targetIdx = 0;
3,369,632✔
1018
    int32_t sourceIdx = 0;
3,369,632✔
1019
    int32_t colActual = blockDataGetNumOfCols(pBlock);
3,369,632✔
1020
    while (targetIdx < colActual) {
18,224,599✔
1021
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
14,814,898✔
1022
      SColVal          colVal = {0};
14,789,005✔
1023
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
14,789,005!
1024
      SET_DATA
14,727,558!
1025
    }
1026

1027
    curRow++;
3,409,701✔
1028
  }
1029
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
117,863✔
1030
  pLastBlock->info.rows = curRow - lastRow;
117,842✔
1031

1032
END:
117,842✔
1033
  taosMemoryFreeClear(pTSchema);
117,842!
1034
  taosMemoryFree(assigned);
118,016!
1035
  return code;
117,963✔
1036
}
1037

1038
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) {
118,597✔
1039
  if (pReader == NULL || blocks == NULL || schemas == NULL) {
118,597!
UNCOV
1040
    return TSDB_CODE_INVALID_PARA;
×
1041
  }
1042
  tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
118,600!
1043
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
118,600✔
1044
  if (pSubmitTbData == NULL) {
118,598!
UNCOV
1045
    return terrno;
×
1046
  }
1047
  pReader->nextBlk++;
118,598✔
1048

1049
  if (pSubmitTbDataRet) {
118,598!
1050
    *pSubmitTbDataRet = pSubmitTbData;
118,598✔
1051
  }
1052

1053
  int32_t sversion = pSubmitTbData->sver;
118,598✔
1054
  int64_t uid = pSubmitTbData->uid;
118,598✔
1055
  pReader->lastBlkUid = uid;
118,598✔
1056

1057
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
118,598✔
1058
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime);
118,604✔
1059
  if (pReader->pSchemaWrapper == NULL) {
118,568✔
1060
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
437✔
1061
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1062
    pReader->cachedSchemaSuid = 0;
417✔
1063
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
417✔
1064
  }
1065

1066
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
118,131✔
1067
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
113✔
1068
  } else {
1069
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
118,018✔
1070
  }
1071
}
1072

1073
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) {
8,054✔
1074
  if (pReader == NULL){
8,054!
UNCOV
1075
    return;
×
1076
  }
1077
  pReader->pColIdList = pColIdList;
8,054✔
1078
}
1079

1080
void tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
8,143✔
1081
  if (pReader == NULL || tbUidList == NULL) {
8,143!
UNCOV
1082
    return;
×
1083
  }
1084
  if (pReader->tbIdHash) {
8,144✔
1085
    taosHashClear(pReader->tbIdHash);
14✔
1086
  } else {
1087
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
8,130✔
1088
    if (pReader->tbIdHash == NULL) {
8,131!
UNCOV
1089
      tqError("s-task:%s failed to init hash table", id);
×
UNCOV
1090
      return;
×
1091
    }
1092
  }
1093

1094
  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
433,048✔
1095
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
424,845✔
1096
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
424,783!
UNCOV
1097
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
×
UNCOV
1098
      continue;
×
1099
    }
1100
  }
1101

1102
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
8,140✔
1103
}
1104

1105
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
20,219✔
1106
  if (pReader == NULL || pTableUidList == NULL) {
20,219!
1107
    return;
×
1108
  }
1109
  if (pReader->tbIdHash == NULL) {
20,222!
UNCOV
1110
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
×
1111
    if (pReader->tbIdHash == NULL) {
×
UNCOV
1112
      tqError("failed to init hash table");
×
UNCOV
1113
      return;
×
1114
    }
1115
  }
1116

1117
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
20,222✔
1118
  for (int i = 0; i < numOfTables; i++) {
21,137✔
1119
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
915✔
1120
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
915!
UNCOV
1121
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
×
UNCOV
1122
      continue;
×
1123
    }
1124
  }
1125
}
1126

1127
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
2,381✔
1128
  if (pReader == NULL) {
2,381!
UNCOV
1129
    return false;
×
1130
  }
1131
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL;
2,381✔
1132
}
1133

1134
bool tqCurrentBlockConsumed(const STqReader* pReader) {
720,954✔
1135
  if (pReader == NULL) {
720,954!
UNCOV
1136
    return false;
×
1137
  }
1138
  return pReader->msg.msgStr == NULL;
720,954✔
1139
}
1140

1141
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
745✔
1142
  if (pReader == NULL || tbUidList == NULL) {
745!
1143
    return;
×
1144
  }
1145
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
745!
UNCOV
1146
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
×
UNCOV
1147
    if (pKey && taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t)) != 0) {
×
UNCOV
1148
      tqError("failed to remove table uid:%" PRId64 " from hash", *pKey);
×
1149
    }
1150
  }
1151
}
1152

1153
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
89,633✔
1154
  if (pTq == NULL || tbUidList == NULL) {
89,633!
UNCOV
1155
    return TSDB_CODE_INVALID_PARA;
×
1156
  }
1157
  void*   pIter = NULL;
89,635✔
1158
  int32_t vgId = TD_VID(pTq->pVnode);
89,635✔
1159

1160
  // update the table list for each consumer handle
1161
  taosWLockLatch(&pTq->lock);
89,635✔
1162
  while (1) {
4,074✔
1163
    pIter = taosHashIterate(pTq->pHandle, pIter);
93,709✔
1164
    if (pIter == NULL) {
93,709✔
1165
      break;
89,635✔
1166
    }
1167

1168
    STqHandle* pTqHandle = (STqHandle*)pIter;
4,074✔
1169
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
4,074✔
1170
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
963✔
1171
      if (code != 0) {
963!
UNCOV
1172
        tqError("update qualified table error for %s", pTqHandle->subKey);
×
UNCOV
1173
        continue;
×
1174
      }
1175
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
3,111✔
1176
      if (!isAdd) {
3,097✔
1177
        int32_t sz = taosArrayGetSize(tbUidList);
1,004✔
1178
        for (int32_t i = 0; i < sz; i++) {
1,004!
UNCOV
1179
          int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
×
UNCOV
1180
          if (tbUid &&
×
UNCOV
1181
              taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
×
UNCOV
1182
            tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
×
UNCOV
1183
            continue;
×
1184
          }
1185
        }
1186
      }
1187
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
14!
1188
      if (isAdd) {
14!
1189
        SArray* list = NULL;
14✔
1190
        int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
14✔
1191
                                    &list, pTqHandle->execHandle.task);
1192
        if (ret != TDB_CODE_SUCCESS) {
14!
UNCOV
1193
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
×
1194
                  pTqHandle->consumerId);
UNCOV
1195
          taosArrayDestroy(list);
×
UNCOV
1196
          taosHashCancelIterate(pTq->pHandle, pIter);
×
UNCOV
1197
          taosWUnLockLatch(&pTq->lock);
×
1198

UNCOV
1199
          return ret;
×
1200
        }
1201
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
14✔
1202
        taosArrayDestroy(list);
14✔
1203
      } else {
UNCOV
1204
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
×
1205
      }
1206
    }
1207
  }
1208
  taosWUnLockLatch(&pTq->lock);
89,635✔
1209

1210
  // update the table list handle for each stream scanner/wal reader
1211
  streamMetaWLock(pTq->pStreamMeta);
89,635✔
1212
  while (1) {
41,054✔
1213
    pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
130,689✔
1214
    if (pIter == NULL) {
130,684✔
1215
      break;
89,635✔
1216
    }
1217

1218
    int64_t      refId = *(int64_t*)pIter;
41,049✔
1219
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
41,049✔
1220
    if (pTask != NULL) {
41,053!
1221
      int32_t taskId = pTask->id.taskId;
41,053✔
1222

1223
      if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
41,053✔
1224
        int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
20,047✔
1225
        if (code != 0) {
20,042✔
1226
          tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
57!
1227
        }
1228
      }
1229
      int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
41,048✔
1230
      if (ret) {
41,054!
UNCOV
1231
        tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
×
1232
      }
1233
    }
1234
  }
1235

1236
  streamMetaWUnLock(pTq->pStreamMeta);
89,635✔
1237
  return 0;
89,635✔
1238
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc