• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4866

26 Nov 2025 05:46AM UTC coverage: 64.504% (+0.009%) from 64.495%
#4866

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

768 of 945 new or added lines in 33 files covered. (81.27%)

3056 existing lines in 105 files now uncovered.

158119 of 245129 relevant lines covered (64.5%)

113247450.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.72
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr);
20

21
/*
 * Inspect a TDMT_VND_CREATE_TABLE batch on behalf of a table subscription.
 *
 * A request "matches" when it creates a child table of super table `tbSuid`
 * whose uid is present in pReader->tbIdHash.
 *   - no request matches     : nothing is changed (*realTbSuid is left untouched)
 *   - every request matches  : *realTbSuid = tbSuid, message body is kept as is
 *   - some requests match    : *realTbSuid = tbSuid and the body of pHead is
 *                              re-encoded in place so it only carries the
 *                              matching requests (pHead->bodyLen is updated)
 *
 * Failures are logged through tqError and are not reported to the caller.
 */
static void processCreateTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  int32_t            code = 0;
  int32_t            lino = 0;
  int32_t            needRebuild = 0;
  SVCreateTbReq*     pCreateReq = NULL;
  SVCreateTbBatchReq reqNew = {0};
  void*              buf = NULL;
  SVCreateTbBatchReq req = {0};

  code = tDecodeSVCreateTbBatchReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // first pass: count how many requests belong to the subscribed tables
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
      needRebuild++;
    }
  }

  if (needRebuild == 0) {
    // do nothing
  } else if (needRebuild == req.nReqs) {
    *realTbSuid = tbSuid;
  } else {
    *realTbSuid = tbSuid;
    reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
    if (reqNew.pArray == NULL) {
      code = terrno;
      lino = __LINE__;
      goto end;
    }

    // second pass: collect only the matching requests
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid &&
          taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
        reqNew.nReqs++;
        if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
          code = terrno;
          lino = __LINE__;
          goto end;
        }
      }
    }

    int tlen = 0;
    tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, code);
    if (code < 0) {
      // size computation failed, tlen cannot be trusted for the allocation
      lino = __LINE__;
      goto end;
    }
    buf = taosMemoryMalloc(tlen);
    if (NULL == buf) {
      // record the allocation failure so that it is logged at `end`
      code = terrno;
      lino = __LINE__;
      goto end;
    }

    SEncoder coderNew = {0};
    tEncoderInit(&coderNew, buf, tlen);
    code = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
    tEncoderClear(&coderNew);
    if (code < 0) {
      lino = __LINE__;
      goto end;
    }

    // overwrite the payload that follows the message head with the filtered batch
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
    pHead->bodyLen = tlen + sizeof(SMsgHead);
  }

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.pArray);
  tDeleteSVCreateTbBatchReq(&req);
  if (code < 0) {
    tqError("processCreateTbMsg failed, code:%d, line:%d", code, lino);
  }
}
176✔
94

95
/*
 * Handle a TDMT_VND_ALTER_TABLE message for a table subscription.
 *
 * The altered table is looked up by name in the vnode meta; when its uid is
 * present in pReader->tbIdHash, *realTbSuid receives the suid stored in the
 * entry's ctbEntry. Failures are logged through tqError only.
 */
static void processAlterTbMsg(SDecoder* dcoder, STqReader* pReader, int64_t* realTbSuid) {
  SVAlterTbReq alterReq = {0};
  SMetaReader  metaReader = {0};
  int32_t      lino = 0;

  int32_t code = tDecodeSVAlterTbReq(dcoder, &alterReq);
  if (code < 0) {
    lino = __LINE__;
  } else {
    metaReaderDoInit(&metaReader, pReader->pVnodeMeta, META_READER_LOCK);

    code = metaGetTableEntryByName(&metaReader, alterReq.tbName);
    if (code < 0) {
      lino = __LINE__;
    } else if (taosHashGet(pReader->tbIdHash, &metaReader.me.uid, sizeof(int64_t)) != NULL) {
      *realTbSuid = metaReader.me.ctbEntry.suid;
    }
  }

  // common cleanup, executed on success and on failure alike
  taosArrayDestroy(alterReq.pMultiTag);
  metaReaderClear(&metaReader);
  if (code < 0) {
    tqError("processAlterTbMsg failed, code:%d, line:%d", code, lino);
  }
}
135✔
123

NEW
124
/*
 * Inspect a TDMT_VND_DROP_TABLE batch on behalf of a table subscription.
 *
 * A request "matches" when its suid equals `tbSuid` and its uid is present in
 * pReader->tbIdHash.
 *   - no request matches     : nothing is changed (*realTbSuid is left untouched)
 *   - every request matches  : *realTbSuid = tbSuid, message body is kept as is
 *   - some requests match    : *realTbSuid = tbSuid and the body of pHead is
 *                              re-encoded in place so it only carries the
 *                              matching requests (pHead->bodyLen is updated)
 *
 * Failures are logged through tqError and are not reported to the caller.
 */
static void processDropTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchReq reqNew = {0};
  void*            buf = NULL;
  int32_t          lino = 0;
  int32_t          code = tDecodeSVDropTbBatchReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // first pass: count how many requests belong to the subscribed tables
  int32_t      needRebuild = 0;
  SVDropTbReq* pDropReq = NULL;
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;

    if (pDropReq->suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
      needRebuild++;
    }
  }

  if (needRebuild == 0) {
    // do nothing
  } else if (needRebuild == req.nReqs) {
    *realTbSuid = tbSuid;
  } else {
    *realTbSuid = tbSuid;
    reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
    if (reqNew.pArray == NULL) {
      code = terrno;
      lino = __LINE__;
      goto end;
    }

    // second pass: collect only the matching requests
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;
      if (pDropReq->suid == tbSuid &&
          taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
        reqNew.nReqs++;
        if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
          code = terrno;
          lino = __LINE__;
          goto end;
        }
      }
    }

    int tlen = 0;
    tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, code);
    if (code < 0) {
      // size computation failed, tlen cannot be trusted for the allocation
      lino = __LINE__;
      goto end;
    }
    buf = taosMemoryMalloc(tlen);
    if (NULL == buf) {
      // record the allocation failure so that it is logged at `end`
      code = terrno;
      lino = __LINE__;
      goto end;
    }

    SEncoder coderNew = {0};
    tEncoderInit(&coderNew, buf, tlen);
    code = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
    tEncoderClear(&coderNew);
    if (code != 0) {
      lino = __LINE__;
      goto end;
    }

    // overwrite the payload that follows the message head with the filtered batch
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
    pHead->bodyLen = tlen + sizeof(SMsgHead);
  }

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.pArray);
  if (code < 0) {
    tqError("processDropTbMsg failed, code:%d, line:%d", code, lino);
  }
}
×
196

197
/*
 * Decide whether a meta message read from the WAL is relevant for `pHandle`.
 *
 * Returns false when either argument is NULL, and true for every subscription
 * whose subType is not TOPIC_SUB_TYPE__TABLE. For table subscriptions the
 * message body (after the SMsgHead) is decoded according to its msgType to
 * find the super table it refers to; the result is whether that suid equals
 * the subscribed suid. When decoding fails, or the msgType is not handled
 * here, the decoded suid stays 0.
 *
 * Note: for create/drop table batches the helpers may rewrite pHead->body and
 * pHead->bodyLen in place.
 */
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle == NULL || pHead == NULL) {
    return false;
  }
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  STqExecHandle* pExec = &pHandle->execHandle;
  STqReader*     pReader = pExec->pTqReader;

  int16_t msgType = pHead->msgType;
  char*   body = pHead->body;
  int32_t bodyLen = pHead->bodyLen;

  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
  int64_t  realTbSuid = 0;
  SDecoder dcoder = {0};
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
  int32_t  len = bodyLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq req = {0};
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq req = {0};
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    processCreateTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
    processAlterTbMsg(&dcoder, pReader, &realTbSuid);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    processDropTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes req = {0};
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  }

end:
  tDecoderClear(&dcoder);
  bool tmp = tbSuid == realTbSuid;
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
  return tmp;
}
253

254
/*
 * Scan the WAL from *fetchOffset up to the applied version, looking for the
 * next entry to hand to the consumer behind `pHandle`.
 *
 * - a TDMT_VND_SUBMIT entry ends the scan; the result of walFetchBody is returned
 * - when pHandle->fetchMeta is not WITH_DATA, a meta entry accepted by
 *   isValValidForTable ends the scan with 0 (TDMT_VND_DELETE entries are not
 *   considered when fetchMeta is ONLY_META)
 * - every other entry is skipped and the scan goes on with the next offset
 *
 * *fetchOffset is always updated to the offset at which the scan stopped.
 * Returns -1 for NULL arguments or when nothing was found, otherwise the code
 * of the last WAL operation.
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
    return -1;
  }

  int32_t code = -1;
  int32_t vgId = TD_VID(pTq->pVnode);
  int64_t id = pHandle->pWalReader->readerId;

  int64_t offset = *fetchOffset;
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, offset, lastVer, committedVer, appliedVer, id);

  while (offset <= appliedVer) {
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
      goto END;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);

    // data message: fetch its body and stop
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pHandle->pWalReader);
      goto END;
    }

    if (pHandle->fetchMeta != WITH_DATA) {
      SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
      if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
        code = walFetchBody(pHandle->pWalReader);
        if (code < 0) {
          goto END;
        }

        // take the head pointer again after the body has been fetched
        pHead = &(pHandle->pWalReader->pHead->head);
        if (isValValidForTable(pHandle, pHead)) {
          code = 0;
          goto END;
        }

        // meta message of no interest for this subscription, try the next entry
        offset++;
        code = -1;
        continue;
      }
    }

    code = walSkipFetchBody(pHandle->pWalReader);
    if (code < 0) {
      goto END;
    }
    offset++;
    code = -1;
  }

END:
  *fetchOffset = offset;
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
          ", applied:%" PRId64 ", 0x%" PRIx64,
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
  return code;
}
321

322
// Return the cached hasPrimaryKey flag of the reader; false for a NULL reader.
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return (pReader != NULL) ? pReader->hasPrimaryKey : false;
}
328

329
/*
 * Refresh pReader->hasPrimaryKey from the schema of table `uid`.
 *
 * The flag becomes true only when a schema is found, it has at least two
 * columns and the second column carries COL_IS_KEY. A NULL reader is ignored
 * (after the debug trace has been emitted).
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);

  if (pReader == NULL) {
    return;
  }

  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL, 0);
  bool            hasKey = false;
  if (pWrapper != NULL && pWrapper->nCols >= 2) {
    if ((pWrapper->pSchema[1].flags & COL_IS_KEY) != 0) {
      hasKey = true;
    }
  }
  tDeleteSchemaWrapper(pWrapper);
  pReader->hasPrimaryKey = hasKey;
}
343

344
/*
 * Allocate a STqReader bound to the WAL and the meta of `pVnode`.
 *
 * Returns NULL when pVnode is NULL, when the reader cannot be allocated, when
 * the WAL reader cannot be opened, or when the result block cannot be created.
 * In the last case terrno carries the code returned by createDataBlock and
 * everything acquired so far is released, so callers never receive a reader
 * without a result block. A non-NULL result must be released with
 * tqReaderClose.
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  tqDebug("%s:%p", __FUNCTION__, pVnode);
  if (pVnode == NULL) {
    return NULL;
  }

  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
  if (pReader == NULL) {
    return NULL;
  }

  pReader->pWalReader = walOpenReader(pVnode->pWal, 0);
  if (pReader->pWalReader == NULL) {
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  pReader->pResBlock = NULL;

  int32_t code = createDataBlock(&pReader->pResBlock);
  if (code) {
    // a reader without result block is unusable: undo the partial construction
    walCloseReader(pReader->pWalReader);
    taosMemoryFree(pReader);
    terrno = code;
    return NULL;
  }

  return pReader;
}
375

376
/*
 * Release a reader created by tqReaderOpen together with everything it owns:
 * WAL reader, cached schema, column id list, result block, table id hash,
 * decoded submit request and the virtual-table scan caches.
 * A NULL reader is accepted and ignored.
 */
void tqReaderClose(STqReader* pReader) {
  tqDebug("%s:%p", __FUNCTION__, pReader);
  if (pReader == NULL) {
    return;
  }

  // close wal reader
  if (pReader->pWalReader != NULL) {
    walCloseReader(pReader->pWalReader);
  }

  // cached schema information
  if (pReader->pSchemaWrapper != NULL) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  }
  taosMemoryFree(pReader->extSchema);

  if (pReader->pColIdList != NULL) {
    taosArrayDestroy(pReader->pColIdList);
  }

  // result block, table filter and decoded submit message
  blockDataDestroy(pReader->pResBlock);
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  // virtual table source scan state
  taosHashCleanup(pReader->vtSourceScanInfo.pVirtualTables);
  taosHashCleanup(pReader->vtSourceScanInfo.pPhysicalTables);
  taosLRUCacheCleanup(pReader->vtSourceScanInfo.pPhyTblSchemaCache);

  taosMemoryFree(pReader);
}
404

405
/*
 * Position the WAL reader of `pReader` at version `ver`.
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, terrno when the seek
 * fails and 0 on success. `id` is only used for the debug trace.
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  bool seekFailed = walReaderSeekVer(pReader->pWalReader, ver) < 0;
  if (seekFailed) {
    return terrno;
  }

  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
  return 0;
}
415

416
/*
 * Advance to the next submit block that should be delivered and load it into
 * the reader's result block.
 *
 * Blocks of the currently decoded submit message are visited first; blocks
 * whose flags intersect `sourceExcluded`, or whose uid is missing from a
 * non-NULL pReader->tbIdHash, are skipped. When the message is exhausted the
 * next valid WAL message is decoded and the search goes on.
 *
 * Returns true once a block has been retrieved successfully. Returns false for
 * a NULL reader, when a block cannot be read from the array, when no further
 * WAL message is available, when decoding fails, or when more than one second
 * has passed since the call started (also when the clock went backwards).
 */
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
  if (pReader == NULL) {
    return false;
  }
  SWalReader* pWalReader = pReader->pWalReader;

  int64_t startMs = taosGetTimestampMs();
  for (;;) {
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
    while (pReader->nextBlk < numOfBlocks) {
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pSubmitTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        return false;
      }

      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
        pReader->nextBlk += 1;
        continue;
      }

      if (pReader->tbIdHash != NULL && taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) == NULL) {
        pReader->nextBlk += 1;
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
        continue;
      }

      tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
      SSDataBlock* pRes = NULL;
      int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
      if (code == TSDB_CODE_SUCCESS) {
        return true;
      }
    }

    // current message is exhausted, drop it before looking at the next one
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    int64_t elapsed = taosGetTimestampMs() - startMs;
    if (elapsed > 1000 || elapsed < 0) {
      return false;
    }

    // try next message in wal file
    if (walNextValidMsg(pWalReader, false) < 0) {
      return false;
    }

    void*    pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t  bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t  ver = pWalReader->pHead->head.version;
    SDecoder decoder = {0};
    int32_t  setCode = tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL, &decoder);
    tDecoderClear(&decoder);
    if (setCode != 0) {
      return false;
    }
    pReader->nextBlk = 0;
  }
}
477

478
/*
 * Attach a raw submit message to the reader and decode it into
 * pReader->submit using the caller supplied `decoder`.
 *
 * The decoder is initialised here; clearing it is left to the caller.
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, otherwise the code
 * returned by tDecodeSubmitReq (a failure is logged).
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList, SDecoder* decoder) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // remember where the message comes from
  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;

  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);

  tDecoderInit(decoder, pReader->msg.msgStr, pReader->msg.msgLen);
  int32_t decodeCode = tDecodeSubmitReq(decoder, &pReader->submit, rawList);
  if (decodeCode != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
  }

  return decodeCode;
}
497

498
/*
 * Drop the decoded submit message held by the reader and reset the block
 * cursor. A NULL reader is ignored, matching the other public accessors of
 * this file.
 */
void tqReaderClearSubmitMsg(STqReader* pReader) {
  if (pReader == NULL) {
    return;
  }
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;
}
28,030,760✔
503

504
// Return the WAL reader owned by `pReader`, or NULL for a NULL reader.
SWalReader* tqGetWalReader(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pWalReader : NULL;
}
510

511
// Return the result block owned by `pReader`, or NULL for a NULL reader.
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pResBlock : NULL;
}
517

518
// Return pReader->lastTs, or 0 for a NULL reader.
int64_t tqGetResultBlockTime(STqReader* pReader) {
  return (pReader != NULL) ? pReader->lastTs : 0;
}
524

525
/*
 * Move the block cursor of the current submit message to the next block whose
 * table uid is present in pReader->tbIdHash.
 *
 * Returns true when such a block is found (the cursor is left on it) and when
 * no tbIdHash is set. Returns false for a NULL reader, when no message is
 * attached, when a block cannot be read, or when the message is exhausted; in
 * the last case the message is cleared.
 *
 * NOTE(review): the TSDB_CHECK_* macros are defined outside this file; they
 * are assumed to store the last argument in `code`, record the line in `lino`
 * and jump to END when the check fails - confirm against their definition.
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  int32_t code = false;
  int32_t lino = 0;
  int64_t uid = 0;
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
    uid = pSubmitTbData->uid;

    void* pHit = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
    TSDB_CHECK_CONDITION(pHit == NULL, code, lino, END, true);

    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
            pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
  return code;
}
553

554
/*
 * Move the block cursor of the current submit message to the next block whose
 * table uid is NOT present in `filterOutUids`.
 *
 * Returns true when such a block is found (the cursor is left on it) and when
 * filterOutUids is NULL. Returns false for a NULL reader, when no message is
 * attached, when a block cannot be read, or when the message is exhausted; in
 * the last case the message is cleared.
 *
 * NOTE(review): the TSDB_CHECK_NULL macro is defined outside this file; it is
 * assumed to store the last argument in `code`, record the line in `lino` and
 * jump to END when the pointer is NULL - confirm against its definition.
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  int32_t code = false;
  int32_t lino = 0;
  int64_t uid = 0;

  TSDB_CHECK_NULL(pReader, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
    uid = pSubmitTbData->uid;

    void* pFiltered = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
    TSDB_CHECK_NULL(pFiltered, code, lino, END, true);
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
  return code;
}
580

581
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
13,671,583✔
582
                    SExtSchema* extSrc) {
583
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
13,671,583✔
UNCOV
584
    return TSDB_CODE_INVALID_PARA;
×
585
  }
586
  int32_t code = 0;
13,677,535✔
587

588
  int32_t cnt = 0;
13,677,535✔
589
  for (int32_t i = 0; i < pSrc->nCols; i++) {
62,314,239✔
590
    cnt += mask[i];
48,633,251✔
591
  }
592

593
  pDst->nCols = cnt;
13,679,128✔
594
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
13,682,452✔
595
  if (pDst->pSchema == NULL) {
13,672,648✔
UNCOV
596
    return TAOS_GET_TERRNO(terrno);
×
597
  }
598

599
  int32_t j = 0;
13,674,562✔
600
  for (int32_t i = 0; i < pSrc->nCols; i++) {
62,333,863✔
601
    if (mask[i]) {
48,645,396✔
602
      pDst->pSchema[j++] = pSrc->pSchema[i];
48,651,504✔
603
      SColumnInfoData colInfo =
48,653,467✔
604
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
48,650,178✔
605
      if (extSrc != NULL) {
48,643,592✔
606
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
3,706✔
607
      }
608
      code = blockDataAppendColInfo(pBlock, &colInfo);
48,643,592✔
609
      if (code != 0) {
48,658,027✔
UNCOV
610
        return code;
×
611
      }
612
    }
613
  }
614
  return 0;
13,684,311✔
615
}
616

617
/*
 * (Re)build the column layout of pReader->pResBlock from `pSchema`.
 *
 * When the result block already has columns it is destroyed and created anew,
 * keeping the cached table uid and the message version. An empty `pColIdList`
 * selects every column of the schema; otherwise the list and the schema are
 * walked together in ascending column id order and only the columns present
 * in both are appended.
 *
 * Returns TSDB_CODE_INVALID_PARA when an argument is NULL, the error code of
 * createDataBlock / blockDataAppendColInfo when one of them fails, and
 * TSDB_CODE_SUCCESS otherwise.
 */
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SSDataBlock* pBlock = pReader->pResBlock;
  if (blockDataGetNumOfCols(pBlock) > 0) {
    blockDataDestroy(pBlock);
    int32_t code = createDataBlock(&pReader->pResBlock);
    if (code) {
      return code;
    }
    pBlock = pReader->pResBlock;

    pBlock->info.id.uid = pReader->cachedSchemaUid;
    pBlock->info.version = pReader->msg.ver;
  }

  int32_t numOfCols = taosArrayGetSize(pColIdList);

  if (numOfCols == 0) {  // all columns are required
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
      SSchema*        pColSchema = &pSchema->pSchema[i];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);

      if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
        decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
      }
      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataFreeRes(pBlock);
        // report the failure of the append itself rather than whatever terrno holds
        return code;
      }
    }
  } else {
    if (numOfCols > pSchema->nCols) {
      numOfCols = pSchema->nCols;
    }

    // merge-walk: i follows the schema columns, j follows the requested column ids
    int32_t i = 0;
    int32_t j = 0;
    while (i < pSchema->nCols && j < numOfCols) {
      SSchema* pColSchema = &pSchema->pSchema[i];
      col_id_t colIdSchema = pColSchema->colId;

      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
      if (pColIdNeed == NULL) {
        break;
      }
      if (colIdSchema < *pColIdNeed) {
        i++;
      } else if (colIdSchema > *pColIdNeed) {
        j++;
      } else {
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
        if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
          decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
        }
        int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
        if (code != TSDB_CODE_SUCCESS) {
          // propagate the real error code instead of a bare -1
          return code;
        }
        i++;
        j++;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
686

UNCOV
687
/*
 * Store a blob column value into row `idx` of `pColumnInfoData`.
 *
 * For a value cell, pColVal->value.pData (when not NULL) holds a sequence
 * number that is resolved through `pBlobRow2`; the blob bytes are copied into
 * a temporary length-prefixed buffer which is handed to colDataSetVal. A cell
 * without value is stored as NULL.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or an undecodable sequence
 * number, the allocation error when the temporary buffer cannot be obtained,
 * the code of tBlobSetGet when the lookup fails, otherwise the code of
 * colDataSetVal. The temporary buffer is released on every path.
 */
static int32_t doSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobSet* pBlobRow2) {
  int32_t code = 0;
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // TODO(yhDeng)
  if (COL_VAL_IS_VALUE(pColVal)) {
    char* val = taosMemCalloc(1, pColVal->value.nData + sizeof(BlobDataLenT));
    if (val == NULL) {
      return terrno;
    }

    uint64_t seq = 0;
    int32_t  len = 0;
    if (pColVal->value.pData != NULL) {
      if (tGetU64(pColVal->value.pData, &seq) < 0) {
        // release the temporary buffer before the macro returns
        taosMemoryFree(val);
        TAOS_CHECK_RETURN(TSDB_CODE_INVALID_PARA);
      }
      SBlobItem item = {0};
      code = tBlobSetGet(pBlobRow2, seq, &item);
      if (code != 0) {
        taosMemoryFree(val);
        terrno = code;
        uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
        return code;
      }

      // grow through a temporary so the original buffer is not lost on failure
      char* resized = taosMemRealloc(val, item.len + sizeof(BlobDataLenT));
      if (resized == NULL) {
        taosMemoryFree(val);
        return terrno;
      }
      val = resized;

      (void)memcpy(blobDataVal(val), item.data, item.len);
      len = item.len;
    }

    blobDataSetLen(val, len);
    code = colDataSetVal(pColumnInfoData, idx, val, false);

    taosMemoryFree(val);
  } else {
    colDataSetNULL(pColumnInfoData, idx);
  }
  return code;
}
728
/*
 * Store one non-BLOB cell into pColumnInfoData at rowIndex.
 *
 * Fixed-width types are forwarded to colDataSetVal() together with their "is NULL" flag.
 * Variable-length types are staged in a stack buffer with a length header first; a variable-length
 * column value that is not a value is written as NULL.
 *
 * Returns the code from colDataSetVal(), or TSDB_CODE_SUCCESS when a NULL was written.
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  if (!IS_VAR_DATA_TYPE(pColVal->value.type)) {
    return colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
                         !COL_VAL_IS_VALUE(pColVal));
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataSetNULL(pColumnInfoData, rowIndex);
    return TSDB_CODE_SUCCESS;
  }

  // staging area: length header followed by the payload bytes
  char staged[65535 + 2] = {0};
  if (pColVal->value.pData != NULL) {
    (void)memcpy(varDataVal(staged), pColVal->value.pData, pColVal->value.nData);
  }
  varDataSetLen(staged, pColVal->value.nData);

  return colDataSetVal(pColumnInfoData, rowIndex, staged, false);
}
749

750
/*
 * Convert the next SSubmitTbData of the reader's current submit message into pReader->pResBlock.
 *
 * Steps:
 *   1. Take submit.aSubmitTbData[nextBlk] and advance nextBlk.
 *   2. If the table (suid for child tables, uid otherwise) or schema version differs from the cached
 *      one, reload the schema wrapper via metaGetTableSchema() and rebuild the result block.
 *   3. Fill the result block from either the column-format payload (aCol) or the row-format
 *      payload (aRowP), matching source and target columns by column id.
 *
 * pRes receives the result block. `id` is currently unused.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL pReader/pRes,
 * TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema cannot be loaded,
 * TSDB_CODE_TQ_INTERNAL_ERROR when the loaded schema has a different version than requested,
 * or the error code of the failing helper.
 */
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
  if (pReader == NULL || pRes == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
  int32_t        code = 0;
  int32_t        line = 0;
  STSchema*      pTSchema = NULL;
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
  SSDataBlock* pBlock = pReader->pResBlock;
  *pRes = pBlock;

  blockDataCleanup(pBlock);

  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastTs = pSubmitTbData->ctimeMs;

  pBlock->info.id.uid = uid;
  pBlock->info.version = pReader->msg.ver;

  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion)) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
    // clear the pointer as well so a failed reload cannot leave it dangling
    taosMemoryFreeClear(pReader->extSchema);
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema, 0);
    if (pReader->pSchemaWrapper == NULL) {
      // report the version that was actually requested, not the stale cached one
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
             ", version %d, possibly dropped table",
             vgId, suid, uid, sversion);
      // invalidate both cache keys so the next call reloads for child and normal tables alike
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    }

    // validate before publishing the cache keys; a mismatched wrapper must not be treated as cached
    if (sversion != pReader->pSchemaWrapper->version) {
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      return TSDB_CODE_TQ_INTERNAL_ERROR;
    }

    pReader->cachedSchemaUid = uid;
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;

    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
    TSDB_CHECK_CODE(code, line, END);
    // re-read the result block pointer after the rebuild
    pBlock = pReader->pResBlock;
    *pRes = pBlock;
  }

  int32_t numOfRows = 0;
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
    numOfRows = pCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, numOfRows);
  TSDB_CHECK_CODE(code, line, END);
  pBlock->info.rows = numOfRows;
  int32_t colActual = blockDataGetNumOfCols(pBlock);

  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
      if (sourceIdx >= numOfCols) {
        // source columns exhausted: the remaining target columns are filled with NULL
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
        continue;
      }

      uint8_t isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? 1 : 0;

      SColData* pCol = taosArrayGet(pCols, sourceIdx);
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
      SColVal colVal = {0};
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
      if (pCol->cid < pColData->info.colId) {
        // source column is not part of the result block: skip it
        sourceIdx++;
      } else if (pCol->cid == pColData->info.colId) {
        for (int32_t i = 0; i < pCol->nVal; i++) {
          code = tColDataGetValue(pCol, i, &colVal);
          TSDB_CHECK_CODE(code, line, END);

          if (isBlob == 0) {
            code = doSetVal(pColData, i, &colVal);
          } else {
            code = doSetBlobVal(pColData, i, &colVal, pSubmitTbData->pBlobSet);
          }
          TSDB_CHECK_CODE(code, line, END);
        }
        sourceIdx++;
        targetIdx++;
      } else {
        // target column has no counterpart in the payload: fill it with NULL
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
      }
    }
  } else {
    SArray*         pRows = pSubmitTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);

    for (int32_t i = 0; i < numOfRows; i++) {
      SRow* pRow = taosArrayGetP(pRows, i);
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
      int32_t sourceIdx = 0;
      for (int32_t j = 0; j < colActual; j++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);

        uint8_t isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? 1 : 0;
        // advance through the row's columns until the one matching target column j is reached
        while (1) {
          SColVal colVal = {0};
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
          TSDB_CHECK_CODE(code, line, END);

          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          } else if (colVal.cid == pColData->info.colId) {
            if (isBlob == 0) {
              code = doSetVal(pColData, i, &colVal);
            } else {
              code = doSetBlobVal(pColData, i, &colVal, pSubmitTbData->pBlobSet);
            }

            TSDB_CHECK_CODE(code, line, END);

            sourceIdx++;
            break;
          } else {
            colDataSetNULL(pColData, i);
            break;
          }
        }
      }
    }
  }

END:
  if (code != 0) {
    tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code));
  }
  taosMemoryFreeClear(pTSchema);
  return code;
}
910

911
/*
 * PROCESS_VAL: track, per schema column j, whether the current row carries a value (is not NONE).
 * Sets buildNew on the first row (curRow == 0) and whenever that state differs from the one recorded
 * in assigned[j]. Expands to a compound statement and relies on the locals curRow, assigned, j,
 * colVal and buildNew of the expansion site; it is used without a trailing semicolon.
 */
#define PROCESS_VAL                                            \
  {                                                            \
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);       \
    if (curRow == 0 || currentRowAssigned != assigned[j]) {    \
      assigned[j] = currentRowAssigned;                        \
      buildNew = true;                                         \
    }                                                          \
  }
922

923
/*
 * SET_DATA: one merge step between the source value colVal and the target column pColData,
 * compared by column id.
 *   - source id smaller: skip the source column;
 *   - ids equal: write the value at row (curRow - lastRow) through doSetBlobVal()/doSetVal(),
 *     leaving via TQ_ERR_GO_TO_END on failure, then advance both cursors;
 *   - source id larger: write NULL at that row and advance the target cursor only.
 * Relies on the locals colVal, pColData, sourceIdx, targetIdx, curRow, lastRow and pSubmitTbData
 * of the expansion site; it is used without a trailing semicolon.
 */
#define SET_DATA                                                                                        \
  {                                                                                                     \
    if (colVal.cid < pColData->info.colId) {                                                            \
      sourceIdx++;                                                                                      \
    } else {                                                                                            \
      if (colVal.cid == pColData->info.colId) {                                                         \
        if (IS_STR_DATA_BLOB(pColData->info.type)) {                                                    \
          TQ_ERR_GO_TO_END(doSetBlobVal(pColData, curRow - lastRow, &colVal, pSubmitTbData->pBlobSet)); \
        } else {                                                                                        \
          TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal));                              \
        }                                                                                               \
        sourceIdx++;                                                                                    \
      } else {                                                                                          \
        colDataSetNULL(pColData, curRow - lastRow);                                                     \
      }                                                                                                 \
      targetIdx++;                                                                                      \
    }                                                                                                   \
  }
938

939
/*
 * Start a new output block because the set of assigned columns changed at row curRow.
 *
 * The previous last block in `blocks` (if any) is closed by setting its row count to
 * curRow - *lastRow, and *lastRow is moved to curRow. A new SSDataBlock and SSchemaWrapper are then
 * built with tqMaskBlock() from the reader's schema and the `assigned` mask, sized for the remaining
 * numOfRows - curRow rows, and appended to `blocks` (by value) and `schemas` (by pointer).
 *
 * Returns 0 on success or the code set by the failing TQ_*_GO_TO_END check.
 */
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
  int32_t         code = 0;
  SSchemaWrapper* pSW = NULL;
  SSDataBlock*    block = NULL;
  if (taosArrayGetSize(blocks) > 0) {
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pLastBlock);
    pLastBlock->info.rows = curRow - *lastRow;
    *lastRow = curRow;
  }

  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TQ_NULL_GO_TO_END(block);

  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  TQ_NULL_GO_TO_END(pSW);

  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
          (int32_t)taosArrayGetSize(block->pDataBlock));

  block->info.id.uid = pSubmitTbData->uid;
  block->info.version = pReader->msg.ver;
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));

  // The array now holds a by-value copy that owns the block's resources. Drop the shell right away
  // so that a failure below cannot make the cleanup path free resources the array still references.
  taosMemoryFreeClear(block);

  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
  pSW = NULL;  // ownership moved to `schemas`

END:
  if (code != 0) {
    tqError("processBuildNew failed, code:%d", code);
  }
  tDeleteSchemaWrapper(pSW);
  blockDataFreeRes(block);
  taosMemoryFree(block);
  return code;
}
979
/*
 * Split a column-format submit payload (pSubmitTbData->aCol) into one or more data blocks.
 *
 * For every row, PROCESS_VAL records per schema column whether the row carries a value; a schema
 * column missing from the payload is treated as unassigned. Whenever that pattern requires it,
 * processBuildNew() opens a new block/schema pair. The row is then copied into the last block with
 * SET_DATA, matching payload columns and block columns by column id.
 *
 * Returns 0 on success or the code set by the failing TQ_*_GO_TO_END check.
 */
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
          numOfRows);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    for (int32_t j = 0; j < pSchemaWrapper->nCols; j++) {
      int32_t k = 0;
      // look for the payload column that carries schema column j
      for (; k < numOfCols; k++) {
        pCol = taosArrayGet(pCols, k);
        TQ_NULL_GO_TO_END(pCol);
        if (pSchemaWrapper->pSchema[j].colId == pCol->cid) {
          SColVal colVal = {0};
          TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
          PROCESS_VAL
          tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], numOfCols);
          break;
        }
      }
      if (k >= numOfCols) {
        // this column is not in the current row, so we set it to NULL
        assigned[j] = 0;
        buildNew = true;
      }
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual && sourceIdx < numOfCols) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }

    curRow++;
  }

  // `blocks` is empty when the payload has no rows; guard the access like tqProcessRowData does
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFree(assigned);
  return code;
}
1055

1056
/*
 * Split a row-format submit payload (pSubmitTbData->aRowP) into one or more data blocks.
 *
 * Each row is first scanned with PROCESS_VAL to decide whether its set of assigned columns requires
 * a new block (processBuildNew), then copied into the last block with SET_DATA. After the loop the
 * row count of the last block, if there is one, is set to curRow - lastRow.
 *
 * Returns 0 on success or the code set by the failing TQ_*_GO_TO_END check.
 */
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t   code = 0;
  STSchema* pTSchema = NULL;

  SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  int32_t curRow = 0;
  int32_t lastRow = 0;
  SArray* rowList = pSubmitTbData->aRowP;
  int32_t totalRows = taosArrayGetSize(rowList);

  pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
  TQ_NULL_GO_TO_END(pTSchema);
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, totalRows);

  for (int32_t rowNo = 0; rowNo < totalRows; rowNo++) {
    SRow* pCurRow = taosArrayGetP(rowList, rowNo);
    bool  buildNew = false;
    TQ_NULL_GO_TO_END(pCurRow);

    // pass 1: detect a change in which columns are assigned
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pCurRow, pTSchema, j, &colVal));
      PROCESS_VAL
      tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], pTSchema->numOfCols);
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, totalRows, curRow, &lastRow));
    }

    SSDataBlock* pDst = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pDst);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // pass 2: merge the row's columns into the destination block by column id
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t dstCols = blockDataGetNumOfCols(pDst);
    for (; targetIdx < dstCols && sourceIdx < pTSchema->numOfCols;) {
      SColumnInfoData* pColData = taosArrayGet(pDst->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pCurRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, dstCols);
    }

    curRow++;
  }

  SSDataBlock* pTail = taosArrayGetLast(blocks);
  if (pTail != NULL) {
    pTail->info.rows = curRow - lastRow;
  }

  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, totalRows,
          (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
1123

1124
/*
 * Serialize pCreateTbReq with tEncodeSVCreateTbReq() and append it to the response.
 *
 * The encoded length goes to pRsp->createTableLen and the encoded buffer pointer to
 * pRsp->createTableReq; both arrays are created first when pRsp->createTableNum is 0.
 * createTableNum is incremented once both pushes have succeeded.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for a NULL argument, terrno on allocation or push
 * failure, or the encoder's error code. On failure the encoded buffer is freed here.
 */
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pEncoded = NULL;
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);

  // first entry for this response: create the two parallel arrays
  if (pRsp->createTableNum == 0) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
  }

  // size the request, then encode it into a buffer of exactly that size
  uint32_t encLen = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, encLen, code);
  TSDB_CHECK_CODE(code, lino, END);

  pEncoded = taosMemoryCalloc(1, encLen);
  TSDB_CHECK_NULL(pEncoded, code, lino, END, terrno);

  SEncoder enc = {0};
  tEncoderInit(&enc, pEncoded, encLen);
  code = tEncodeSVCreateTbReq(&enc, pCreateTbReq);
  tEncoderClear(&enc);
  TSDB_CHECK_CODE(code, lino, END);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &encLen), code, lino, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &pEncoded), code, lino, END, terrno);
  pRsp->createTableNum++;
  tqTrace("build create table info msg success");

END:
  if (code != 0) {
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
    taosMemoryFree(pEncoded);
  }
  return code;
}
1161

1162
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
13,720,010✔
1163
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1164
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
13,720,010✔
1165
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
13,720,010✔
1166
  if (pSubmitTbData == NULL) {
13,720,175✔
UNCOV
1167
    return terrno;
×
1168
  }
1169
  pReader->nextBlk++;
13,720,175✔
1170

1171
  if (pSubmitTbDataRet) {
13,718,565✔
1172
    *pSubmitTbDataRet = pSubmitTbData;
13,719,941✔
1173
  }
1174

1175
  if (fetchMeta == ONLY_META) {
13,718,774✔
1176
    if (pSubmitTbData->pCreateTbReq != NULL) {
374✔
1177
      if (pRsp->createTableReq == NULL) {
68✔
1178
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
17✔
1179
        if (pRsp->createTableReq == NULL) {
17✔
UNCOV
1180
          return terrno;
×
1181
        }
1182
      }
1183
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
136✔
UNCOV
1184
        return terrno;
×
1185
      }
1186
      pSubmitTbData->pCreateTbReq = NULL;
68✔
1187
    }
1188
    return 0;
374✔
1189
  }
1190

1191
  int32_t sversion = pSubmitTbData->sver;
13,718,400✔
1192
  int64_t uid = pSubmitTbData->uid;
13,718,884✔
1193
  pReader->lastBlkUid = uid;
13,719,767✔
1194

1195
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
13,718,887✔
1196
  taosMemoryFreeClear(pReader->extSchema);
13,718,504✔
1197
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema, 0);
13,716,869✔
1198
  if (pReader->pSchemaWrapper == NULL) {
13,716,167✔
1199
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
36,997✔
1200
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1201
    pReader->cachedSchemaSuid = 0;
36,997✔
1202
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
36,997✔
1203
  }
1204

1205
  if (pSubmitTbData->pCreateTbReq != NULL) {
13,674,702✔
1206
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
698✔
1207
    if (code != 0) {
698✔
UNCOV
1208
      return code;
×
1209
    }
1210
  } else if (rawList != NULL) {
13,677,702✔
UNCOV
1211
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
UNCOV
1212
      return terrno;
×
1213
    }
UNCOV
1214
    pReader->pSchemaWrapper = NULL;
×
UNCOV
1215
    return 0;
×
1216
  }
1217

1218
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
13,678,400✔
1219
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
6,002✔
1220
  } else {
1221
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
13,663,716✔
1222
  }
1223
}
1224

1225
/*
 * Install pColIdList as the reader's column id list and run tqCollectPhysicalTables().
 * A NULL reader is accepted and reported as success.
 */
int32_t tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList, const char* id) {
  if (pReader != NULL) {
    pReader->pColIdList = pColIdList;
    return tqCollectPhysicalTables(pReader, id);
  }

  return TSDB_CODE_SUCCESS;
}
1232

1233
/*
 * Replace the reader's table-uid hash with the uids in tbUidList.
 *
 * An existing hash is cleared, otherwise a new one is created. Uids that fail to insert are logged
 * and skipped. NULL pReader/tbUidList is reported as success.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the hash cannot be created.
 */
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader == NULL || tbUidList == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", id);
      return terrno;
    }
  } else {
    taosHashClear(pReader->tbIdHash);
  }

  size_t total = taosArrayGetSize(tbUidList);
  for (size_t n = 0; n < total; n++) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, n);
    if (pUid == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pUid, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pUid);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)total);
  return TSDB_CODE_SUCCESS;
}
1258

1259
/*
 * Add every uid in pTableUidList to the reader's table-uid hash, creating the hash on first use.
 *
 * Failures are logged and do not stop the loop. NULL pReader/pTableUidList is a no-op.
 * A NULL element pointer from taosArrayGet() is skipped, matching tqReaderSetTbUidList() and
 * tqReaderRemoveTbUidList().
 */
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader == NULL || pTableUidList == NULL) {
    return;
  }
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    if (pKey == NULL) {
      // nothing to insert, and *pKey must not be dereferenced for the log below
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
    }
  }
}
1280

1281
/* Report whether uid is present in the reader's table-uid hash; false for a NULL reader. */
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  return (pReader != NULL) && (taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL);
}
1287

1288
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1289
  if (pReader == NULL) {
×
UNCOV
1290
    return false;
×
1291
  }
UNCOV
1292
  return pReader->msg.msgStr == NULL;
×
1293
}
1294

1295
/*
 * Remove every uid in tbUidList from the reader's table-uid hash.
 * Removal failures are logged; NULL pReader/tbUidList is a no-op.
 */
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }

  size_t total = taosArrayGetSize(tbUidList);
  for (size_t n = 0; n < total; n++) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, n);
    if (pUid == NULL) {
      continue;
    }
    if (taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t)) != 0) {
      tqError("failed to remove table uid:%" PRId64 " from hash", *pUid);
    }
  }
}
1306

1307
/*
 * Propagate a table add/remove to every consumer handle of pTq, under the tq write latch.
 *
 *   - column subscriptions: forward the list to qUpdateTableListForStreamScanner(); errors are logged;
 *   - db subscriptions: on removal, record each uid in the handle's filter-out hash;
 *   - table subscriptions: on add, re-query the table list with qGetTableList() and install it in the
 *     handle's reader; on removal, drop the uids from the reader.
 *
 * Returns 0 on success (also for a NULL pTq), TSDB_CODE_INVALID_PARA for a NULL list, or the error
 * from refreshing a table subscription, in which case iteration is cancelled and the latch released.
 */
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (pIter = taosHashIterate(pTq->pHandle, pIter); pIter != NULL; pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle* pHandle = (STqHandle*)pIter;

    switch (pHandle->execHandle.subType) {
      case TOPIC_SUB_TYPE__COLUMN: {
        int32_t rc = qUpdateTableListForStreamScanner(pHandle->execHandle.task, tbUidList, isAdd);
        if (rc != 0) {
          tqError("update qualified table error for %s", pHandle->subKey);
        }
        break;
      }

      case TOPIC_SUB_TYPE__DB: {
        if (isAdd) {
          break;
        }
        int32_t nUids = taosArrayGetSize(tbUidList);
        for (int32_t n = 0; n < nUids; n++) {
          int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, n);
          if (pUid == NULL) {
            continue;
          }
          if (taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, pUid, sizeof(int64_t), NULL, 0) != 0) {
            tqError("failed to add table uid:%" PRId64 " to hash", *pUid);
          }
        }
        break;
      }

      case TOPIC_SUB_TYPE__TABLE: {
        if (!isAdd) {
          tqReaderRemoveTbUidList(pHandle->execHandle.pTqReader, tbUidList);
          break;
        }

        SArray* refreshed = NULL;
        int     ret = qGetTableList(pHandle->execHandle.execTb.suid, pTq->pVnode, pHandle->execHandle.execTb.node,
                                    &refreshed, pHandle->execHandle.task);
        if (ret == 0) {
          ret = tqReaderSetTbUidList(pHandle->execHandle.pTqReader, refreshed, NULL);
        }
        if (ret != TDB_CODE_SUCCESS) {
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pHandle->subKey,
                  pHandle->consumerId);
          taosArrayDestroy(refreshed);
          // leaving mid-iteration: cancel the iterator before releasing the latch
          taosHashCancelIterate(pTq->pHandle, pIter);
          taosWUnLockLatch(&pTq->lock);

          return ret;
        }
        taosArrayDestroy(refreshed);
        break;
      }

      default:
        break;
    }
  }
  taosWUnLockLatch(&pTq->lock);
  return 0;
}
1370

UNCOV
1371
static void destroySourceScanTables(void* ptr) {
×
UNCOV
1372
  SArray** pTables = ptr;
×
UNCOV
1373
  if (pTables && *pTables) {
×
1374
    taosArrayDestroy(*pTables);
×
1375
    *pTables = NULL;
×
1376
  }
1377
}
×
1378

UNCOV
1379
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
UNCOV
1380
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
UNCOV
1381
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
UNCOV
1382
  if (pCol1->vColId == pCol2->vColId) {
×
UNCOV
1383
    return 0;
×
UNCOV
1384
  } else if (pCol1->vColId < pCol2->vColId) {
×
UNCOV
1385
    return -1;
×
1386
  } else {
UNCOV
1387
    return 1;
×
1388
  }
1389
}
1390

1391
int32_t tqReaderSetVtableInfo(STqReader* pReader, void* vnode, void* ptr, SSHashObj* pVtableInfos,
×
1392
                              SSDataBlock** ppResBlock, const char* idstr) {
1393
  int32_t            code = TSDB_CODE_SUCCESS;
×
1394
  int32_t            lino = 0;
×
1395
  SStorageAPI*       pAPI = ptr;
×
UNCOV
1396
  SVTSourceScanInfo* pScanInfo = NULL;
×
1397
  SHashObj*          pVirtualTables = NULL;
×
UNCOV
1398
  SMetaReader        metaReader = {0};
×
UNCOV
1399
  SVTColInfo         colInfo = {0};
×
UNCOV
1400
  SSchemaWrapper*    schema = NULL;
×
1401

UNCOV
1402
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
UNCOV
1403
  TSDB_CHECK_NULL(vnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
UNCOV
1404
  TSDB_CHECK_NULL(pAPI, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1405

UNCOV
1406
  pScanInfo = &pReader->vtSourceScanInfo;
×
UNCOV
1407
  taosHashCleanup(pScanInfo->pVirtualTables);
×
UNCOV
1408
  pScanInfo->pVirtualTables = NULL;
×
1409

1410
  if (tSimpleHashGetSize(pVtableInfos) == 0) {
×
1411
    goto _end;
×
1412
  }
1413

UNCOV
1414
  pVirtualTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1415
  TSDB_CHECK_NULL(pVirtualTables, code, lino, _end, terrno);
×
UNCOV
1416
  taosHashSetFreeFp(pVirtualTables, destroySourceScanTables);
×
1417

1418
  int32_t iter = 0;
×
1419
  void*   px = tSimpleHashIterate(pVtableInfos, NULL, &iter);
×
1420
  while (px != NULL) {
×
1421
    int64_t vTbUid = *(int64_t*)tSimpleHashGetKey(px, NULL);
×
1422
    SArray* pColInfos = taosArrayInit(8, sizeof(SVTColInfo));
×
1423
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, terrno);
×
UNCOV
1424
    code = taosHashPut(pVirtualTables, &vTbUid, sizeof(int64_t), &pColInfos, POINTER_BYTES);
×
1425
    TSDB_CHECK_CODE(code, lino, _end);
×
1426

UNCOV
1427
    SSHashObj* pPhysicalTables = *(SSHashObj**)px;
×
UNCOV
1428
    int32_t    iterIn = 0;
×
1429
    void*      pxIn = tSimpleHashIterate(pPhysicalTables, NULL, &iterIn);
×
UNCOV
1430
    while (pxIn != NULL) {
×
1431
      char* physicalTableName = tSimpleHashGetKey(pxIn, NULL);
×
1432
      pAPI->metaReaderFn.clearReader(&metaReader);
×
1433
      pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1434
      code = pAPI->metaReaderFn.getTableEntryByName(&metaReader, physicalTableName);
×
1435
      TSDB_CHECK_CODE(code, lino, _end);
×
1436
      pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
1437
      colInfo.pTbUid = metaReader.me.uid;
×
1438

UNCOV
1439
      switch (metaReader.me.type) {
×
1440
        case TSDB_CHILD_TABLE: {
×
1441
          int64_t suid = metaReader.me.ctbEntry.suid;
×
1442
          pAPI->metaReaderFn.clearReader(&metaReader);
×
UNCOV
1443
          pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1444
          code = pAPI->metaReaderFn.getTableEntryByUid(&metaReader, suid);
×
1445
          TSDB_CHECK_CODE(code, lino, _end);
×
1446
          pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
UNCOV
1447
          schema = &metaReader.me.stbEntry.schemaRow;
×
1448
          break;
×
1449
        }
UNCOV
1450
        case TSDB_NORMAL_TABLE: {
×
UNCOV
1451
          schema = &metaReader.me.ntbEntry.schemaRow;
×
1452
          break;
×
1453
        }
1454
        default: {
×
UNCOV
1455
          tqError("invalid table type: %d", metaReader.me.type);
×
1456
          code = TSDB_CODE_INVALID_PARA;
×
1457
          TSDB_CHECK_CODE(code, lino, _end);
×
1458
        }
1459
      }
1460

1461
      SArray* pCols = *(SArray**)pxIn;
×
1462
      int32_t ncols = taosArrayGetSize(pCols);
×
1463
      for (int32_t i = 0; i < ncols; ++i) {
×
UNCOV
1464
        SColIdName* pCol = taosArrayGet(pCols, i);
×
1465
        colInfo.vColId = pCol->colId;
×
1466

1467
        for (int32_t j = 0; j < schema->nCols; ++j) {
×
1468
          if (strncmp(pCol->colName, schema->pSchema[j].name, strlen(schema->pSchema[j].name)) == 0) {
×
1469
            colInfo.pColId = schema->pSchema[j].colId;
×
1470
            void* px = taosArrayPush(pColInfos, &colInfo);
×
1471
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1472
            break;
×
1473
          }
1474
        }
1475
      }
1476

1477
      taosArraySort(pColInfos, compareSVTColInfo);
×
1478
      pxIn = tSimpleHashIterate(pPhysicalTables, pxIn, &iterIn);
×
1479
    }
1480

1481
    px = tSimpleHashIterate(pVtableInfos, px, &iter);
×
1482
  }
1483

1484
  pScanInfo->pVirtualTables = pVirtualTables;
×
1485
  pVirtualTables = NULL;
×
1486

1487
  // set the result data block
1488
  if (pReader->pResBlock) {
×
1489
    blockDataDestroy(pReader->pResBlock);
×
1490
  }
UNCOV
1491
  pReader->pResBlock = *ppResBlock;
×
1492
  *ppResBlock = NULL;
×
1493

1494
  // update reader callback for vtable source scan
1495
  pAPI->tqReaderFn.tqNextBlockImpl = tqNextVTableSourceBlockImpl;
×
UNCOV
1496
  pAPI->tqReaderFn.tqReaderIsQueriedTable = tqReaderIsQueriedSourceTable;
×
1497

UNCOV
1498
_end:
×
1499
  if (code != TSDB_CODE_SUCCESS) {
×
1500
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1501
  }
1502
  pAPI->metaReaderFn.clearReader(&metaReader);
×
1503
  if (pVirtualTables != NULL) {
×
UNCOV
1504
    taosHashCleanup(pVirtualTables);
×
1505
  }
1506
  return code;
×
1507
}
1508

1509
/*
 * Rebuilds pReader->vtSourceScanInfo.pPhysicalTables, a hash from physical
 * table uid to an SArray of the virtual table uids that need data from it.
 *
 * The previous physical-table hash and schema cache are released and the
 * scan counters are reset first. If there are no virtual tables or no
 * requested columns (pReader->pColIdList), the function returns success with
 * both left NULL.
 *
 * For each virtual table, its SVTColInfo array and pReader->pColIdList are
 * walked together in increasing column id order; every column id present in
 * both contributes its physical table. The SVTColInfo arrays are sorted by
 * vColId in tqReaderSetVtableInfo.
 * NOTE(review): the walk also assumes pColIdList is sorted ascending - confirm
 * at the place where the list is built.
 *
 * @param pReader  reader whose scan info is rebuilt; must not be NULL
 * @param idstr    identifier used in error logs only
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step
 */
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SVTSourceScanInfo* pScanInfo = NULL;
  SHashObj*          pVirtualTables = NULL;
  SHashObj*          pPhysicalTables = NULL;
  void*              pIter = NULL;
  void*              px = NULL;

  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);

  pScanInfo = &pReader->vtSourceScanInfo;
  taosHashCleanup(pScanInfo->pPhysicalTables);
  pScanInfo->pPhysicalTables = NULL;
  taosLRUCacheCleanup(pScanInfo->pPhyTblSchemaCache);
  pScanInfo->pPhyTblSchemaCache = NULL;
  pScanInfo->nextVirtualTableIdx = -1;
  pScanInfo->metaFetch = 0;
  pScanInfo->cacheHit = 0;

  pVirtualTables = pScanInfo->pVirtualTables;
  if (taosHashGetSize(pVirtualTables) == 0 || taosArrayGetSize(pReader->pColIdList) == 0) {
    goto _end;
  }

  pPhysicalTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pPhysicalTables, code, lino, _end, terrno);
  taosHashSetFreeFp(pPhysicalTables, destroySourceScanTables);

  pIter = taosHashIterate(pVirtualTables, NULL);
  while (pIter != NULL) {
    int64_t vTbUid = *(int64_t*)taosHashGetKey(pIter, NULL);
    SArray* pColInfos = *(SArray**)pIter;
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);

    // Traverse all required columns and collect corresponding physical tables
    int32_t nColInfos = taosArrayGetSize(pColInfos);
    int32_t nOutputCols = taosArrayGetSize(pReader->pColIdList);
    for (int32_t i = 0, j = 0; i < nColInfos && j < nOutputCols;) {
      SVTColInfo* pCol = taosArrayGet(pColInfos, i);
      col_id_t    colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j);
      if (pCol->vColId < colIdNeed) {
        i++;
      } else if (pCol->vColId > colIdNeed) {
        j++;
      } else {
        SArray* pRelatedVTs = NULL;
        px = taosHashGet(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t));
        if (px == NULL) {
          pRelatedVTs = taosArrayInit(8, sizeof(int64_t));
          TSDB_CHECK_NULL(pRelatedVTs, code, lino, _end, terrno);
          code = taosHashPut(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t), &pRelatedVTs, POINTER_BYTES);
          if (code != TSDB_CODE_SUCCESS) {
            // the hash did not take ownership of the array, so release it here
            taosArrayDestroy(pRelatedVTs);
            TSDB_CHECK_CODE(code, lino, _end);
          }
        } else {
          pRelatedVTs = *(SArray**)px;
        }
        // record each virtual table at most once per physical table
        if (taosArrayGetSize(pRelatedVTs) == 0 || *(int64_t*)taosArrayGetLast(pRelatedVTs) != vTbUid) {
          px = taosArrayPush(pRelatedVTs, &vTbUid);
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
        }
        i++;
        j++;
      }
    }
    pIter = taosHashIterate(pVirtualTables, pIter);
  }

  pScanInfo->pPhysicalTables = pPhysicalTables;
  pPhysicalTables = NULL;

  if (taosHashGetSize(pScanInfo->pPhysicalTables) > 0) {
    pScanInfo->pPhyTblSchemaCache = taosLRUCacheInit(1024 * 128, -1, .5);
    TSDB_CHECK_NULL(pScanInfo->pPhyTblSchemaCache, code, lino, _end, terrno);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
  }
  if (pIter != NULL) {
    // pIter belongs to the pVirtualTables iteration started above, so the
    // cancel must be issued against that same hash
    taosHashCancelIterate(pVirtualTables, pIter);
  }
  if (pPhysicalTables != NULL) {
    taosHashCleanup(pPhysicalTables);
  }
  return code;
}
1599

1600
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1601
  if (value) {
×
UNCOV
1602
    SSchemaWrapper* pSchemaWrapper = value;
×
1603
    tDeleteSchemaWrapper(pSchemaWrapper);
1604
  }
UNCOV
1605
}
×
1606

1607
/*
 * Advances the reader to the next submit block that belongs to a physical
 * table referenced by at least one virtual table.
 *
 * @param pReader  reader holding the current submit message
 * @param idstr    identifier used in error logs only
 * @return true when a matching block is positioned at pReader->nextBlk (or a
 *         previously found block is still pending conversion); false when
 *         there is no message, no physical tables, no further matching block,
 *         or an error occurred. When the blocks are exhausted the submit
 *         message is cleared.
 */
bool tqNextVTableSourceBlockImpl(STqReader* pReader, const char* idstr) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SVTSourceScanInfo* pInfo = NULL;

  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);

  pInfo = &pReader->vtSourceScanInfo;
  if (pReader->msg.msgStr == NULL || taosHashGetSize(pInfo->pPhysicalTables) == 0) {
    return false;
  }

  // The data still needs to be converted into the virtual table result block
  if (pInfo->nextVirtualTableIdx >= 0) {
    return true;
  }

  int32_t numBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < numBlocks; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, _end, terrno);

    int64_t  physUid = pTbData->uid;
    SArray** ppRelatedVTs = taosHashGet(pInfo->pPhysicalTables, &physUid, sizeof(int64_t));
    if (ppRelatedVTs != NULL && taosArrayGetSize(*ppRelatedVTs) > 0) {
      // start handing this block out to its related virtual tables
      pInfo->nextVirtualTableIdx = 0;
      return true;
    }

    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, numBlocks, physUid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d", numBlocks);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
  }
  return false;
}
1650

UNCOV
1651
/*
 * Reports whether `uid` is a key of the reader's physical-table hash
 * (vtSourceScanInfo.pPhysicalTables). A NULL reader yields false.
 */
bool tqReaderIsQueriedSourceTable(STqReader* pReader, uint64_t uid) {
  return pReader != NULL &&
         taosHashGet(pReader->vtSourceScanInfo.pPhysicalTables, &uid, sizeof(uint64_t)) != NULL;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc