• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4837

07 Nov 2025 09:40AM UTC coverage: 58.963% (+0.2%) from 58.728%
#4837

push

travis-ci

DuanKuanJun
coverity: cases_other.task add -R -Q2 -Q3 -Q4

150245 of 324452 branches covered (46.31%)

Branch coverage included in aggregate %.

200054 of 269646 relevant lines covered (74.19%)

317833830.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.36
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr);
20

21
/*
 * Decide whether a WAL meta message should be delivered to a subscription.
 *
 * Returns false when either argument is NULL and true when the handle's
 * subscription type is not TOPIC_SUB_TYPE__TABLE. For table subscriptions the
 * message body (after the SMsgHead prefix) is decoded according to msgType and
 * the super-table uid it refers to is compared with the subscribed suid
 * (execHandle.execTb.suid); the result of that comparison is returned.
 *
 * Side effect: for batch create-table / drop-table messages in which only some
 * entries match the subscription, the matching subset is re-encoded in place
 * into pHead->body and pHead->bodyLen is updated accordingly.
 *
 * Any decode/allocation/encode failure jumps to `end`, where the decision is
 * made from whatever realTbSuid was established before the failure.
 */
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle == NULL || pHead == NULL) {
    return false;
  }
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  STqExecHandle* pExec = &pHandle->execHandle;
  STqReader*     pReader = pExec->pTqReader;

  int16_t msgType = pHead->msgType;
  char*   body = pHead->body;
  int32_t bodyLen = pHead->bodyLen;

  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
  int64_t  realTbSuid = 0;
  SDecoder dcoder = {0};
  // the encoded request starts right after the message head
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
  int32_t  len = bodyLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq req = {0};
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq req = {0};
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    SVCreateTbBatchReq req = {0};
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
      goto end;
    }

    // count the child tables of the subscribed super table that the reader tracks
    int32_t        needRebuild = 0;
    SVCreateTbReq* pCreateReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid &&
          taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
        needRebuild++;
      }
    }
    if (needRebuild == 0) {
      // do nothing
    } else if (needRebuild == req.nReqs) {
      // every entry matches: the message can be forwarded unchanged
      realTbSuid = tbSuid;
    } else {
      // only part of the batch matches: rebuild the message with the matching entries
      realTbSuid = tbSuid;
      SVCreateTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
      if (reqNew.pArray == NULL) {
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pCreateReq = req.pReqs + iReq;
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid &&
            taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
          reqNew.nReqs++;
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
            taosArrayDestroy(reqNew.pArray);
            tDeleteSVCreateTbBatchReq(&req);
            goto end;
          }
        }
      }

      int     tlen = 0;
      int32_t ret = 0;
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
      // do not allocate/encode with a length that was never computed
      if (ret != 0) {
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      SEncoder coderNew = {0};
      // buf holds tlen bytes and tlen bytes are copied out below, so the encoder
      // gets the full capacity (NOTE(review): assumes SEncoder enforces its size limit)
      tEncoderInit(&coderNew, buf, tlen);
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (ret < 0) {
        taosMemoryFree(buf);
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }

    tDeleteSVCreateTbBatchReq(&req);
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
    SVAlterTbReq req = {0};

    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
      goto end;
    }

    // resolve the altered table by name to learn its uid and super-table uid
    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);

    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
      metaReaderClear(&mr);
      goto end;
    }
    if (taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t)) != NULL) {
      realTbSuid = mr.me.ctbEntry.suid;
    }
    metaReaderClear(&mr);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    SVDropTbBatchReq req = {0};

    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
      goto end;
    }

    // count the dropped tables that belong to the subscription
    int32_t      needRebuild = 0;
    SVDropTbReq* pDropReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;

      if (pDropReq->suid == tbSuid &&
          taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
        needRebuild++;
      }
    }
    if (needRebuild == 0) {
      // do nothing
    } else if (needRebuild == req.nReqs) {
      realTbSuid = tbSuid;
    } else {
      // only part of the batch matches: rebuild the message with the matching entries
      realTbSuid = tbSuid;
      SVDropTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
      if (reqNew.pArray == NULL) {
        goto end;
      }
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pDropReq = req.pReqs + iReq;
        if (pDropReq->suid == tbSuid &&
            taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
          reqNew.nReqs++;
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
            taosArrayDestroy(reqNew.pArray);
            goto end;
          }
        }
      }

      int     tlen = 0;
      int32_t ret = 0;
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
      // do not allocate/encode with a length that was never computed
      if (ret != 0) {
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      SEncoder coderNew = {0};
      // buf holds tlen bytes and tlen bytes are copied out below, so the encoder
      // gets the full capacity (NOTE(review): assumes SEncoder enforces its size limit)
      tEncoderInit(&coderNew, buf, tlen);
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (ret != 0) {
        taosMemoryFree(buf);
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes req = {0};
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  }

end:
  tDecoderClear(&dcoder);
  bool tmp = tbSuid == realTbSuid;
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
  return tmp;
}
215

216
/*
 * Scan the WAL forward from *fetchOffset, up to the applied version, looking
 * for the next entry that should be handed to the consumer.
 *
 * - A submit entry: its body is fetched and the fetch result is returned.
 * - A meta entry (only when the handle does not fetch data-only, and excluding
 *   delete entries for ONLY_META handles): its body is fetched and, if
 *   isValValidForTable() accepts it, 0 is returned; otherwise it is skipped.
 * - Anything else: the body is skipped and scanning continues.
 *
 * *fetchOffset is always updated to the position reached. Returns -1 when the
 * arguments are invalid or nothing deliverable was found, a negative code from
 * the WAL helpers on failure, or the result of the successful fetch.
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
    return -1;
  }

  int32_t code = -1;
  int32_t vgId = TD_VID(pTq->pVnode);
  int64_t id = pHandle->pWalReader->readerId;

  int64_t offset = *fetchOffset;
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, offset, lastVer, committedVer, appliedVer, id);

  while (offset <= appliedVer) {
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
      break;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);

    // data entry: fetch it and stop, whatever the outcome
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pHandle->pWalReader);
      break;
    }

    if (pHandle->fetchMeta != WITH_DATA) {
      SWalCont* pCont = &(pHandle->pWalReader->pHead->head);
      if (IS_META_MSG(pCont->msgType) &&
          (pCont->msgType != TDMT_VND_DELETE || pHandle->fetchMeta != ONLY_META)) {
        code = walFetchBody(pHandle->pWalReader);
        if (code < 0) {
          break;
        }

        // re-read the head pointer after the body has been fetched
        pCont = &(pHandle->pWalReader->pHead->head);
        if (isValValidForTable(pHandle, pCont)) {
          code = 0;
          break;
        }

        // meta entry not relevant to this subscription: move on
        offset++;
        code = -1;
        continue;
      }
    }

    // entry of no interest: skip its body and advance
    code = walSkipFetchBody(pHandle->pWalReader);
    if (code < 0) {
      break;
    }
    offset++;
    code = -1;
  }

  *fetchOffset = offset;
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
          ", applied:%" PRId64 ", 0x%" PRIx64,
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
  return code;
}
283

284
/* Return the reader's cached hasPrimaryKey flag; false when pReader is NULL. */
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return (pReader != NULL) ? pReader->hasPrimaryKey : false;
}
290

291
/*
 * Look up the schema of table `uid` and record in pReader->hasPrimaryKey
 * whether its second column carries the COL_IS_KEY flag. The flag is set to
 * false when the schema cannot be obtained or has fewer than two columns.
 * Does nothing (besides logging) when pReader is NULL.
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);

  if (pReader == NULL) {
    return;
  }

  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL, 0);
  bool            hasKey = false;
  if (pWrapper != NULL && pWrapper->nCols >= 2) {
    hasKey = (pWrapper->pSchema[1].flags & COL_IS_KEY) != 0;
  }
  tDeleteSchemaWrapper(pWrapper);
  pReader->hasPrimaryKey = hasKey;
}
305

306
/*
 * Allocate a TQ reader bound to the given vnode: opens a WAL reader on the
 * vnode's WAL, remembers the vnode's meta handle and creates the result block.
 *
 * Returns NULL when pVnode is NULL or when any allocation step fails. If the
 * result block cannot be created, everything acquired so far is released and
 * terrno is set to the failure code, so callers never receive a reader whose
 * pResBlock is NULL.
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  tqDebug("%s:%p", __FUNCTION__, pVnode);
  if (pVnode == NULL) {
    return NULL;
  }
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
  if (pReader == NULL) {
    return NULL;
  }

  pReader->pWalReader = walOpenReader(pVnode->pWal, 0);
  if (pReader->pWalReader == NULL) {
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  pReader->pResBlock = NULL;

  int32_t code = createDataBlock(&pReader->pResBlock);
  if (code) {
    // a reader without a result block is unusable: undo the partial construction
    walCloseReader(pReader->pWalReader);
    taosMemoryFree(pReader);
    terrno = code;
    return NULL;
  }

  return pReader;
}
337

338
/*
 * Release a TQ reader and everything it holds: the WAL reader, cached schema
 * wrapper, extended schema, column-id list, result block, table-id hash,
 * decoded submit request, the virtual-table scan structures, and finally the
 * reader itself. A NULL reader is ignored.
 */
void tqReaderClose(STqReader* pReader) {
  tqDebug("%s:%p", __FUNCTION__, pReader);
  if (pReader == NULL) {
    return;
  }

  // WAL reader
  if (pReader->pWalReader != NULL) {
    walCloseReader(pReader->pWalReader);
  }

  // cached schema information
  if (pReader->pSchemaWrapper != NULL) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  }
  taosMemoryFree(pReader->extSchema);

  if (pReader->pColIdList != NULL) {
    taosArrayDestroy(pReader->pColIdList);
  }

  // result block, table filter and the currently decoded submit request
  blockDataDestroy(pReader->pResBlock);
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  // virtual-table source scan state
  taosHashCleanup(pReader->vtSourceScanInfo.pVirtualTables);
  taosHashCleanup(pReader->vtSourceScanInfo.pPhysicalTables);
  taosLRUCacheCleanup(pReader->vtSourceScanInfo.pPhyTblSchemaCache);

  taosMemoryFree(pReader);
}
366

367
/*
 * Position the reader's WAL reader at version `ver`.
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, terrno when the seek
 * fails, and 0 on success. `id` is only used for the debug log.
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t seekRc = walReaderSeekVer(pReader->pWalReader, ver);
  if (seekRc >= 0) {
    tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
    return 0;
  }
  return terrno;
}
377

378
/*
 * Advance to the next deliverable data block, pulling further submit messages
 * from the WAL as needed.
 *
 * Blocks of the currently decoded submit request are examined first: blocks
 * whose flags intersect `sourceExcluded` are skipped, and when a table-id hash
 * is installed, blocks of tables not in the hash are discarded. The first
 * remaining block that tqRetrieveDataBlock() converts successfully makes the
 * function return true.
 *
 * When the current request is exhausted it is destroyed and the next valid WAL
 * message is decoded. Returns false for a NULL reader, when a block cannot be
 * read from the array, when more than 1000 ms have elapsed since entry (or the
 * clock went backwards), when the WAL has no further message, or when the next
 * message fails to decode.
 */
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
  if (pReader == NULL) {
    return false;
  }

  SWalReader* pWal = pReader->pWalReader;
  int64_t     startMs = taosGetTimestampMs();

  for (;;) {
    int32_t total = taosArrayGetSize(pReader->submit.aSubmitTbData);

    while (pReader->nextBlk < total) {
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, total, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, total, pReader->msg.msgLen,
                pReader->msg.ver);
        return false;
      }

      // block comes from an excluded source
      if ((pTbData->flags & sourceExcluded) != 0) {
        pReader->nextBlk += 1;
        continue;
      }

      // block belongs to a table outside the installed filter
      if (pReader->tbIdHash != NULL && taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t)) == NULL) {
        pReader->nextBlk += 1;
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pTbData->uid);
        continue;
      }

      tqTrace("tq reader return submit block, uid:%" PRId64, pTbData->uid);
      SSDataBlock* pOut = NULL;
      if (tqRetrieveDataBlock(pReader, &pOut, NULL) == TSDB_CODE_SUCCESS) {
        return true;
      }
    }

    // current submit request fully consumed
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    int64_t spentMs = taosGetTimestampMs() - startMs;
    if (spentMs > 1000 || spentMs < 0) {
      return false;
    }

    // try next message in wal file
    if (walNextValidMsg(pWal, false) < 0) {
      return false;
    }

    void*    pPayload = POINTER_SHIFT(pWal->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t  payloadLen = pWal->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t  walVer = pWal->pHead->head.version;
    SDecoder decoder = {0};
    int32_t  rc = tqReaderSetSubmitMsg(pReader, pPayload, payloadLen, walVer, NULL, &decoder);
    tDecoderClear(&decoder);
    if (rc != 0) {
      return false;
    }
    pReader->nextBlk = 0;
  }
}
439

440
/*
 * Attach a raw submit message to the reader and decode it into pReader->submit.
 *
 * The message pointer, length and version are stored in pReader->msg, the
 * caller-supplied decoder is initialised over the message and the submit
 * request is decoded (rawList is forwarded to the decoder routine). The caller
 * remains responsible for clearing `decoder`.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, otherwise the decode result.
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList, SDecoder* decoder) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;

  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);

  tDecoderInit(decoder, pReader->msg.msgStr, pReader->msg.msgLen);

  int32_t rc = tDecodeSubmitReq(decoder, &pReader->submit, rawList);
  if (rc != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
  }
  return rc;
}
459

460
/*
 * Drop the submit request currently attached to the reader: destroys the
 * decoded request, rewinds the block cursor to 0 and forgets the raw message
 * pointer. A NULL reader is ignored, matching the other reader accessors in
 * this file.
 */
void tqReaderClearSubmitMsg(STqReader* pReader) {
  if (pReader == NULL) {
    return;
  }
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;
}
828,216,224✔
465

466
/* Return the WAL reader owned by pReader, or NULL when pReader is NULL. */
SWalReader* tqGetWalReader(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pWalReader : NULL;
}
472

473
/* Return the reader's result block, or NULL when pReader is NULL. */
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pResBlock : NULL;
}
479

480
/* Return the reader's lastTs field, or 0 when pReader is NULL. */
int64_t tqGetResultBlockTime(STqReader* pReader) {
  if (pReader != NULL) {
    return pReader->lastTs;
  }
  return 0;
}
486

487
/*
 * Move the block cursor of the current submit request to the next block whose
 * table uid is looked up in pReader->tbIdHash.
 *
 * The early exits are expressed with the project's TSDB_CHECK_* macros, which
 * assign the given result and jump to END (see their definitions for the exact
 * trigger condition). When every remaining block has been stepped over, the
 * submit request is cleared and the initial result (false) is returned.
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  int32_t code = false;
  int32_t lino = 0;
  int64_t uid = 0;

  TSDB_CHECK_NULL(pReader, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);

  int32_t total = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < total; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, END, false);

    uid = pTbData->uid;
    void* pHit = taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_CONDITION(pHit == NULL, code, lino, END, true);

    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
            pReader->nextBlk, total, taosHashGetSize(pReader->tbIdHash), uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, total, uid);

END:
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
  return code;
}
515

516
/*
 * Move the block cursor of the current submit request forward, looking each
 * block's table uid up in `filterOutUids`; blocks found in that hash are
 * stepped over.
 *
 * The early exits are expressed with the project's TSDB_CHECK_NULL macro, which
 * assigns the given result and jumps to END (see its definition for the exact
 * trigger condition). When every remaining block has been stepped over, the
 * submit request is cleared and the initial result (false) is returned.
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  int32_t code = false;
  int32_t lino = 0;
  int64_t uid = 0;

  TSDB_CHECK_NULL(pReader, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);

  int32_t total = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < total; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, END, false);

    uid = pTbData->uid;
    void* pExcluded = taosHashGet(filterOutUids, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_NULL(pExcluded, code, lino, END, true);

    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, total, uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, total, uid);

END:
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
  return code;
}
542

543
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
401,500,533✔
544
                    SExtSchema* extSrc) {
545
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
401,500,533!
546
    return TSDB_CODE_INVALID_PARA;
×
547
  }
548
  int32_t code = 0;
402,507,364✔
549

550
  int32_t cnt = 0;
402,507,364✔
551
  for (int32_t i = 0; i < pSrc->nCols; i++) {
2,147,483,647✔
552
    cnt += mask[i];
1,843,326,871✔
553
  }
554

555
  pDst->nCols = cnt;
403,228,990✔
556
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
402,770,021!
557
  if (pDst->pSchema == NULL) {
401,917,613!
558
    return TAOS_GET_TERRNO(terrno);
×
559
  }
560

561
  int32_t j = 0;
401,571,622✔
562
  for (int32_t i = 0; i < pSrc->nCols; i++) {
2,147,483,647✔
563
    if (mask[i]) {
1,843,608,680✔
564
      pDst->pSchema[j++] = pSrc->pSchema[i];
1,844,477,683✔
565
      SColumnInfoData colInfo =
1,844,614,299✔
566
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
1,844,179,470✔
567
      if (extSrc != NULL) {
1,843,809,659✔
568
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
624,170✔
569
      }
570
      code = blockDataAppendColInfo(pBlock, &colInfo);
1,843,809,659✔
571
      if (code != 0) {
1,844,962,180!
572
        return code;
×
573
      }
574
    }
575
  }
576
  return 0;
402,877,316✔
577
}
578

579
/*
 * (Re)build the column layout of the reader's result block from `pSchema`.
 *
 * If the result block already has columns it is destroyed and re-created, and
 * the new block inherits the cached table uid and the current message version.
 * When pColIdList is empty every schema column is added; otherwise the schema
 * and the id list are walked in parallel (both are advanced in ascending colId
 * order) and only columns present in both are added. For decimal columns the
 * precision/scale are taken from pReader->extSchema when it is available.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, the error code of
 * createDataBlock()/blockDataAppendColInfo() on failure, and
 * TSDB_CODE_SUCCESS otherwise.
 */
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SSDataBlock* pBlock = pReader->pResBlock;
  if (blockDataGetNumOfCols(pBlock) > 0) {
    blockDataDestroy(pBlock);
    // never leave the reader pointing at the destroyed block if re-creation fails
    pReader->pResBlock = NULL;
    int32_t code = createDataBlock(&pReader->pResBlock);
    if (code) {
      return code;
    }
    pBlock = pReader->pResBlock;

    pBlock->info.id.uid = pReader->cachedSchemaUid;
    pBlock->info.version = pReader->msg.ver;
  }

  int32_t numOfCols = taosArrayGetSize(pColIdList);

  if (numOfCols == 0) {  // all columns are required
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
      SSchema*        pColSchema = &pSchema->pSchema[i];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);

      if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
        decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
      }
      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataFreeRes(pBlock);
        // report the actual failure rather than whatever terrno happens to hold
        return code;
      }
    }
  } else {
    if (numOfCols > pSchema->nCols) {
      numOfCols = pSchema->nCols;
    }

    int32_t i = 0;  // cursor into the schema columns
    int32_t j = 0;  // cursor into the requested column ids
    while (i < pSchema->nCols && j < numOfCols) {
      SSchema* pColSchema = &pSchema->pSchema[i];
      col_id_t colIdSchema = pColSchema->colId;

      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
      if (pColIdNeed == NULL) {
        break;
      }
      if (colIdSchema < *pColIdNeed) {
        i++;  // schema column was not requested
      } else if (colIdSchema > *pColIdNeed) {
        j++;  // requested id does not exist in this schema
      } else {
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
        if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
          decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
        }
        int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
        if (code != TSDB_CODE_SUCCESS) {
          // propagate the real error code instead of a bare -1
          return code;
        }
        i++;
        j++;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
648

649
/*
 * Store one blob cell into a column.
 *
 * When the column value carries data, the value's payload is read as a
 * sequence number, the blob item with that sequence is fetched from pBlobRow2
 * and a length-prefixed copy of it is written to row `idx`. A value without
 * payload is written as an empty blob; a non-value is written as NULL.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or an unreadable sequence
 * number, the allocation error (terrno) when a buffer cannot be obtained, the
 * error of tBlobSetGet() when the item is missing, and otherwise the result of
 * colDataSetVal(). The temporary buffer is released on every path.
 */
static int32_t doSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobSet* pBlobRow2) {
  int32_t code = 0;
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // TODO(yhDeng)
  if (COL_VAL_IS_VALUE(pColVal)) {
    char* val = taosMemCalloc(1, pColVal->value.nData + sizeof(BlobDataLenT));
    if (val == NULL) {
      return terrno;
    }

    uint64_t seq = 0;
    int32_t  len = 0;
    if (pColVal->value.pData != NULL) {
      if (tGetU64(pColVal->value.pData, &seq) < 0) {
        // release the scratch buffer before bailing out
        taosMemoryFree(val);
        TAOS_CHECK_RETURN(TSDB_CODE_INVALID_PARA);
      }
      SBlobItem item = {0};
      code = tBlobSetGet(pBlobRow2, seq, &item);
      if (code != 0) {
        taosMemoryFree(val);
        terrno = code;
        uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
        return code;
      }

      // grow through a temporary so the original buffer is not lost on failure
      char* resized = taosMemRealloc(val, item.len + sizeof(BlobDataLenT));
      if (resized == NULL) {
        taosMemoryFree(val);
        return terrno;
      }
      val = resized;
      (void)memcpy(blobDataVal(val), item.data, item.len);
      len = item.len;
    }

    blobDataSetLen(val, len);
    code = colDataSetVal(pColumnInfoData, idx, val, false);

    taosMemoryFree(val);
  } else {
    colDataSetNULL(pColumnInfoData, idx);
  }
  return code;
}
690
/**
 * Store one non-blob column value into row `rowIndex` of a result column.
 *
 * Variable-length values are staged in a stack buffer: the payload is copied behind a length
 * header written by varDataSetLen() and the buffer is passed to colDataSetVal(). A
 * variable-length value that is not COL_VAL_IS_VALUE is written as NULL. Fixed-length values
 * are passed to colDataSetVal() directly together with their "is NULL" flag.
 *
 * @param pColumnInfoData destination column
 * @param rowIndex        destination row index
 * @param pColVal         source column value
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_INVALID_PARA when a variable-length payload
 *         does not fit the staging buffer; otherwise the code returned by colDataSetVal()
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
    if (COL_VAL_IS_VALUE(pColVal)) {
      // The staging buffer holds at most 65535 payload bytes (the 2 extra bytes are presumably
      // the length header -- TODO confirm). Reject longer payloads instead of overrunning the
      // stack; the cast also catches a negative length should nData be a signed type.
      if ((uint64_t)pColVal->value.nData > 65535) {
        return TSDB_CODE_INVALID_PARA;
      }
      char val[65535 + 2] = {0};
      if (pColVal->value.pData != NULL) {
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
      }
      varDataSetLen(val, pColVal->value.nData);
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
    } else {
      colDataSetNULL(pColumnInfoData, rowIndex);
    }
  } else {
    code = colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
                         !COL_VAL_IS_VALUE(pColVal));
  }

  return code;
}
711

712
/**
 * Convert the submit-table entry at pReader->nextBlk into the reader's result block and
 * advance nextBlk.
 *
 * The table schema is cached on the reader, keyed by (suid or uid, schema version). On a
 * cache miss the schema wrapper and the result block layout are rebuilt. Whenever that
 * rebuild fails, both id keys are reset so the next call cannot hit the cache while
 * pSchemaWrapper is NULL or the result block is out of date.
 *
 * Data is then copied either column-wise (SUBMIT_REQ_COLUMN_DATA_FORMAT) or row-wise; target
 * columns that have no matching source column are filled with NULL.
 *
 * @param pReader reader positioned on a submit message
 * @param pRes    receives the reader-owned result block
 * @param id      not used by this function
 * @return 0 on success; TSDB_CODE_INVALID_PARA for NULL arguments;
 *         TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema cannot be loaded;
 *         TSDB_CODE_TQ_INTERNAL_ERROR on a schema version mismatch; otherwise the failing
 *         callee's code
 */
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
  if (pReader == NULL || pRes == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
  int32_t        code = 0;
  int32_t        line = 0;
  STSchema*      pTSchema = NULL;
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
  SSDataBlock* pBlock = pReader->pResBlock;
  *pRes = pBlock;

  blockDataCleanup(pBlock);

  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastTs = pSubmitTbData->ctimeMs;

  pBlock->info.id.uid = uid;
  pBlock->info.version = pReader->msg.ver;

  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion)) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
    // Clear the pointer as well: the callee is not guaranteed to overwrite it on failure.
    taosMemoryFreeClear(pReader->extSchema);
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema, 0);
    if (pReader->pSchemaWrapper == NULL) {
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
             "version %d, possibly dropped table",
             vgId, suid, uid, sversion);
      // Reset both keys: with only the suid reset, a normal table (suid == 0) could still
      // match the stale uid/version and reach the NULL schema wrapper below.
      // NOTE(review): relies on 0 never being a valid table uid -- confirm.
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    }

    pReader->cachedSchemaUid = uid;
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;

    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
      // The result block was not rebuilt for this schema; force a reload on the next call.
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      return TSDB_CODE_TQ_INTERNAL_ERROR;
    }
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
    if (code != TSDB_CODE_SUCCESS) {
      // The result block layout may be incomplete; do not let the next call reuse it.
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
    }
    TSDB_CHECK_CODE(code, line, END);
    pBlock = pReader->pResBlock;
    *pRes = pBlock;
  }

  int32_t numOfRows = 0;
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
    numOfRows = pCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, numOfRows);
  TSDB_CHECK_CODE(code, line, END);
  pBlock->info.rows = numOfRows;
  int32_t colActual = blockDataGetNumOfCols(pBlock);

  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    // Merge-walk target and source columns by column id.
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
      if (sourceIdx >= numOfCols) {
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
        continue;
      }

      uint8_t isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? 1 : 0;

      SColData* pCol = taosArrayGet(pCols, sourceIdx);
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
      SColVal colVal = {0};
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
      if (pCol->cid < pColData->info.colId) {
        // source column is not requested: skip it
        sourceIdx++;
      } else if (pCol->cid == pColData->info.colId) {
        for (int32_t i = 0; i < pCol->nVal; i++) {
          code = tColDataGetValue(pCol, i, &colVal);
          TSDB_CHECK_CODE(code, line, END);

          if (isBlob == 0) {
            code = doSetVal(pColData, i, &colVal);
          } else {
            code = doSetBlobVal(pColData, i, &colVal, pSubmitTbData->pBlobSet);
          }
          TSDB_CHECK_CODE(code, line, END);
        }
        sourceIdx++;
        targetIdx++;
      } else {
        // requested column is missing from the source: fill with NULL
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
      }
    }
  } else {
    SArray*         pRows = pSubmitTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);

    for (int32_t i = 0; i < numOfRows; i++) {
      SRow* pRow = taosArrayGetP(pRows, i);
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
      int32_t sourceIdx = 0;
      for (int32_t j = 0; j < colActual; j++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);

        uint8_t isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? 1 : 0;
        while (1) {
          if (sourceIdx >= pTSchema->numOfCols) {
            // Source columns are exhausted: never index the row past the schema; the
            // remaining target column simply has no value.
            colDataSetNULL(pColData, i);
            break;
          }

          SColVal colVal = {0};
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
          TSDB_CHECK_CODE(code, line, END);

          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          } else if (colVal.cid == pColData->info.colId) {
            if (isBlob == 0) {
              code = doSetVal(pColData, i, &colVal);
            } else {
              code = doSetBlobVal(pColData, i, &colVal, pSubmitTbData->pBlobSet);
            }

            TSDB_CHECK_CODE(code, line, END);

            sourceIdx++;
            break;
          } else {
            colDataSetNULL(pColData, i);
            break;
          }
        }
      }
    }
  }

END:
  if (code != 0) {
    tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code));
  }
  taosMemoryFreeClear(pTSchema);
  return code;
}
872

873
// Track, per schema column j, whether the current row carries a value (anything but NONE).
// The first row (curRow == 0) always records the state and requests a new block; later rows
// request one only when the state differs from the recorded one.
// Expands in a scope that provides: colVal, assigned, j, curRow, buildNew.
#define PROCESS_VAL                                                          \
  {                                                                          \
    bool processValHasValue = !COL_VAL_IS_NONE(&colVal);                     \
    if (curRow == 0 || processValHasValue != assigned[j]) {                  \
      assigned[j] = processValHasValue;                                      \
      buildNew = true;                                                       \
    }                                                                        \
  }
884

885
// One step of the merge between source values and target block columns, ordered by column id:
//   ids equal        -> store the value at row (curRow - lastRow), advance both cursors
//   target id lower  -> the target column has no source value: store NULL, advance the target
//   source id lower  -> the source column is not part of the block: advance the source
// Expands in a scope that provides: colVal, pColData, curRow, lastRow, sourceIdx, targetIdx,
// pSubmitTbData, plus whatever TQ_ERR_GO_TO_END requires.
#define SET_DATA                                                                                    \
  if (colVal.cid == pColData->info.colId) {                                                         \
    if (!IS_STR_DATA_BLOB(pColData->info.type)) {                                                   \
      TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal));                              \
    } else {                                                                                        \
      TQ_ERR_GO_TO_END(doSetBlobVal(pColData, curRow - lastRow, &colVal, pSubmitTbData->pBlobSet)); \
    }                                                                                               \
    sourceIdx++;                                                                                    \
    targetIdx++;                                                                                    \
  } else if (colVal.cid > pColData->info.colId) {                                                   \
    colDataSetNULL(pColData, curRow - lastRow);                                                     \
    targetIdx++;                                                                                    \
  } else {                                                                                          \
    sourceIdx++;                                                                                    \
  }
900

901
/**
 * Close the current output block (if any) and append a new block/schema pair whose columns
 * are selected by the `assigned` mask.
 *
 * @param pReader       reader supplying the full schema wrapper and ext schema
 * @param pSubmitTbData submit data being converted (supplies the table uid)
 * @param blocks        output array of SSDataBlock, elements stored by value
 * @param schemas       output array of SSchemaWrapper*
 * @param assigned      per-schema-column flags passed to tqMaskBlock()
 * @param numOfRows     total rows in the submit data
 * @param curRow        index of the row that starts the new block
 * @param lastRow       in/out: first row of the block being closed; set to curRow when a
 *                      previous block exists
 * @return 0 on success, otherwise the code recorded by the failing step
 */
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
  int32_t         code = 0;
  SSchemaWrapper* pSW = NULL;
  SSDataBlock*    block = NULL;
  if (taosArrayGetSize(blocks) > 0) {
    // Finalize the row count of the block that ends just before curRow.
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pLastBlock);
    pLastBlock->info.rows = curRow - *lastRow;
    *lastRow = curRow;
  }

  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TQ_NULL_GO_TO_END(block);

  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  TQ_NULL_GO_TO_END(pSW);

  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
          (int32_t)taosArrayGetSize(block->pDataBlock));

  block->info.id.uid = pSubmitTbData->uid;
  block->info.version = pReader->msg.ver;
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));

  // The array now holds its own copy of the block struct and with it the block's resources.
  // Drop only the local shell here, before anything else can fail: otherwise a failing
  // schema push below would reach blockDataFreeRes() and free resources that the array
  // entry still references.
  taosMemoryFreeClear(block);

  // NOTE(review): if this push fails, `blocks` is one entry longer than `schemas`; the error
  // is returned and the caller is presumably expected to discard both arrays -- confirm.
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
  pSW = NULL;  // ownership moved to `schemas`

END:
  if (code != 0) {
    tqError("processBuildNew failed, code:%d", code);
  }
  // All three calls receive NULL on the success path.
  tDeleteSchemaWrapper(pSW);
  blockDataFreeRes(block);
  taosMemoryFree(block);
  return code;
}
941
/**
 * Convert column-format submit data into one or more result blocks.
 *
 * For every row the set of schema columns that carry a value is compared with the previous
 * row; when it changes (and always on the first row) processBuildNew() starts a new
 * block/schema pair. The row's values are then merged into the newest block by column id.
 *
 * @param pReader       reader supplying the table schema wrapper
 * @param pSubmitTbData submit data whose aCol array holds the columns
 * @param blocks        output array of result blocks
 * @param schemas       output array of schema wrappers, parallel to `blocks`
 * @return 0 on success, otherwise the code recorded by the failing step
 */
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
          numOfRows);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    for (int32_t j = 0; j < pSchemaWrapper->nCols; j++) {
      int32_t k = 0;
      for (; k < numOfCols; k++) {
        pCol = taosArrayGet(pCols, k);
        TQ_NULL_GO_TO_END(pCol);
        if (pSchemaWrapper->pSchema[j].colId == pCol->cid) {
          SColVal colVal = {0};
          TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
          PROCESS_VAL
          tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], numOfCols);
          break;
        }
      }
      if (k >= numOfCols) {
        // This schema column is absent from the submit data, so it is unassigned for every
        // row. Treat it like a NONE value: request a new block only on the first row or when
        // the recorded state actually changes, instead of splitting the output at every row.
        if (curRow == 0 || assigned[j] != 0) {
          assigned[j] = 0;
          buildNew = true;
        }
      }
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual && sourceIdx < numOfCols) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }

    curRow++;
  }
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  // No block exists when there were no rows; guard like tqProcessRowData does.
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFree(assigned);
  return code;
}
1017

1018
/**
 * Convert row-format submit data into one or more result blocks.
 *
 * Each row is decoded against a schema built from the reader's schema wrapper. The set of
 * columns that carry a value is compared with the previous row; on a change (and always on
 * the first row) processBuildNew() opens a new block/schema pair. The row's values are then
 * merged into the newest block by column id.
 *
 * @param pReader       reader supplying the table schema wrapper
 * @param pSubmitTbData submit data whose aRowP array holds the rows
 * @param blocks        output array of result blocks
 * @param schemas       output array of schema wrappers, parallel to `blocks`
 * @return 0 on success, otherwise the code recorded by the failing step
 */
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t   code = 0;
  STSchema* pTSchema = NULL;

  SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  int32_t curRow = 0;
  int32_t lastRow = 0;
  SArray* rowList = pSubmitTbData->aRowP;
  int32_t rowCnt = taosArrayGetSize(rowList);
  pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
  TQ_NULL_GO_TO_END(pTSchema);
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, rowCnt);

  while (curRow < rowCnt) {
    bool  buildNew = false;
    SRow* pRow = taosArrayGetP(rowList, curRow);
    TQ_NULL_GO_TO_END(pRow);

    // Pass 1: detect whether the set of assigned columns changed for this row.
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
      tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], pTSchema->numOfCols);
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, rowCnt, curRow, &lastRow));
    }

    SSDataBlock* pCurBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pCurBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // Pass 2: merge the row's values into the newest block.
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pCurBlock);
    for (; targetIdx < colActual && sourceIdx < pTSchema->numOfCols;) {
      SColumnInfoData* pColData = taosArrayGet(pCurBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }

    curRow++;
  }

  // Finalize the row count of the trailing block, if one was produced.
  SSDataBlock* pTail = taosArrayGetLast(blocks);
  if (pTail != NULL) {
    pTail->info.rows = curRow - lastRow;
  }

  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, rowCnt,
          (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
1085

1086
/**
 * Encode a create-table request and append it to the response.
 *
 * The encoded buffer goes into pRsp->createTableReq and its length into
 * pRsp->createTableLen; both arrays are created when pRsp->createTableNum is 0.
 * createTableNum is incremented once both pushes succeed. On failure the encoded buffer is
 * freed here.
 *
 * @param pRsp         response being assembled
 * @param pCreateTbReq request to encode
 * @return 0 on success; TSDB_CODE_INVALID_PARA for NULL arguments; otherwise the encoder
 *         code or terrno of the failing step
 */
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void*    pEncoded = NULL;
  uint32_t encodedLen = 0;
  SEncoder enc = {0};

  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);

  if (pRsp->createTableNum == 0) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
  }

  // Size the request first, then encode it into an exactly sized buffer.
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, encodedLen, code);
  TSDB_CHECK_CODE(code, lino, END);
  pEncoded = taosMemoryCalloc(1, encodedLen);
  TSDB_CHECK_NULL(pEncoded, code, lino, END, terrno);

  tEncoderInit(&enc, pEncoded, encodedLen);
  code = tEncodeSVCreateTbReq(&enc, pCreateTbReq);
  tEncoderClear(&enc);
  TSDB_CHECK_CODE(code, lino, END);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &encodedLen), code, lino, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &pEncoded), code, lino, END, terrno);
  pRsp->createTableNum++;
  tqTrace("build create table info msg success");

END:
  if (code != 0) {
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
    taosMemoryFree(pEncoded);
  }
  return code;
}
1123

1124
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
404,671,500✔
1125
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1126
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
404,671,500!
1127
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
404,671,500✔
1128
  if (pSubmitTbData == NULL) {
404,814,852!
1129
    return terrno;
×
1130
  }
1131
  pReader->nextBlk++;
404,814,852✔
1132

1133
  if (pSubmitTbDataRet) {
404,617,036!
1134
    *pSubmitTbDataRet = pSubmitTbData;
404,812,201✔
1135
  }
1136

1137
  if (fetchMeta == ONLY_META) {
404,612,267✔
1138
    if (pSubmitTbData->pCreateTbReq != NULL) {
63,096✔
1139
      if (pRsp->createTableReq == NULL) {
11,472✔
1140
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
2,868✔
1141
        if (pRsp->createTableReq == NULL) {
2,868!
1142
          return terrno;
×
1143
        }
1144
      }
1145
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
22,944!
1146
        return terrno;
×
1147
      }
1148
      pSubmitTbData->pCreateTbReq = NULL;
11,472✔
1149
    }
1150
    return 0;
63,096✔
1151
  }
1152

1153
  int32_t sversion = pSubmitTbData->sver;
404,549,171✔
1154
  int64_t uid = pSubmitTbData->uid;
404,613,924✔
1155
  pReader->lastBlkUid = uid;
404,634,064✔
1156

1157
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
404,630,659✔
1158
  taosMemoryFreeClear(pReader->extSchema);
404,627,612!
1159
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema, 0);
404,474,612✔
1160
  if (pReader->pSchemaWrapper == NULL) {
404,469,714✔
1161
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
1,899,186!
1162
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1163
    pReader->cachedSchemaSuid = 0;
1,899,186✔
1164
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
1,899,186✔
1165
  }
1166

1167
  if (pSubmitTbData->pCreateTbReq != NULL) {
401,976,949✔
1168
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
86,249✔
1169
    if (code != 0) {
86,249!
1170
      return code;
×
1171
    }
1172
  } else if (rawList != NULL) {
402,414,054!
1173
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1174
      return terrno;
×
1175
    }
1176
    pReader->pSchemaWrapper = NULL;
×
1177
    return 0;
×
1178
  }
1179

1180
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
402,500,303✔
1181
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
737,504✔
1182
  } else {
1183
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
400,566,815✔
1184
  }
1185
}
1186

1187
/**
 * Install the list of column ids on the reader, then run tqCollectPhysicalTables().
 *
 * @param pReader    reader to update; NULL is accepted and reported as success
 * @param pColIdList column id list stored on the reader as-is
 * @param id         identifier forwarded to tqCollectPhysicalTables()
 * @return TSDB_CODE_SUCCESS for a NULL reader, otherwise the result of
 *         tqCollectPhysicalTables()
 */
int32_t tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList, const char* id) {
  if (pReader != NULL) {
    pReader->pColIdList = pColIdList;
    return tqCollectPhysicalTables(pReader, id);
  }
  return TSDB_CODE_SUCCESS;
}
1194

1195
/**
 * Replace the reader's set of table uids with the given list.
 *
 * An existing hash is cleared, otherwise a new one is created. A uid that cannot be inserted
 * is logged and skipped; it does not fail the call.
 *
 * @param pReader   reader to update; NULL is accepted and reported as success
 * @param tbUidList array of int64_t table uids; NULL is accepted and reported as success
 * @param id        identifier used in log messages; may be NULL
 * @return TSDB_CODE_SUCCESS, or terrno when the hash cannot be created
 */
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader == NULL || tbUidList == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // Callers pass NULL for `id` (see tqUpdateTbUidList); handing NULL to "%s" is undefined
  // behavior, so substitute a printable placeholder for logging.
  const char* idstr = (id != NULL) ? id : "(null)";

  if (pReader->tbIdHash) {
    taosHashClear(pReader->tbIdHash);
  } else {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", idstr);
      return terrno;
    }
  }

  // The list is not modified here, so read its size once.
  int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", idstr, *pKey);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", idstr, numOfTables);
  return TSDB_CODE_SUCCESS;
}
1220

1221
/**
 * Add the given table uids to the reader's set, creating the hash if it does not exist yet.
 *
 * A uid that cannot be inserted is logged and skipped. Nothing happens when either argument
 * is NULL or the hash cannot be created.
 *
 * @param pReader       reader to update
 * @param pTableUidList array of int64_t table uids
 */
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader == NULL || pTableUidList == NULL) {
    return;
  }
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    // Guard the element pointer before using and dereferencing it, as the set/remove
    // counterparts of this function do.
    if (pKey == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
    }
  }
}
1242

1243
/**
 * Report whether `uid` is present in the reader's table uid hash.
 *
 * @param pReader reader to query
 * @param uid     table uid to look up
 * @return true when the hash lookup finds the uid; false otherwise or when pReader is NULL
 */
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  if (pReader != NULL) {
    void* pEntry = taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
    return pEntry != NULL;
  }
  return false;
}
1249

1250
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1251
  if (pReader == NULL) {
×
1252
    return false;
×
1253
  }
1254
  return pReader->msg.msgStr == NULL;
×
1255
}
1256

1257
/**
 * Remove the given table uids from the reader's table uid hash.
 *
 * A uid whose removal fails is logged; processing continues with the next one. Nothing
 * happens when either argument is NULL.
 *
 * @param pReader   reader to update
 * @param tbUidList array of int64_t table uids
 */
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(tbUidList)) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    idx++;
    if (pUid == NULL) {
      continue;
    }
    if (taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t)) != 0) {
      tqError("failed to remove table uid:%" PRId64 " from hash", *pUid);
    }
  }
}
1268

1269
/**
 * Propagate a change of table uids to every consumer handle of the tq.
 *
 * All handles are visited while the tq write latch is held:
 *   - column subscriptions forward the list to the stream scanner of their task; a failure
 *     is logged and the iteration continues;
 *   - db subscriptions record removed tables (isAdd == false) in their filter-out hash;
 *   - table subscriptions rebuild the reader's uid set from qGetTableList() on add, or
 *     remove the given uids from the reader on delete.
 *
 * @param pTq       tq instance; NULL is accepted and reported as success
 * @param tbUidList array of int64_t table uids
 * @param isAdd     true when tables were added, false when they were removed
 * @return 0 on success; TSDB_CODE_INVALID_PARA when tbUidList is NULL; the failing code when
 *         a table subscription cannot rebuild its table list
 */
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (pIter = taosHashIterate(pTq->pHandle, pIter); pIter != NULL; pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle*     pTqHandle = (STqHandle*)pIter;
    STqExecHandle* pExec = &pTqHandle->execHandle;

    if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t code = qUpdateTableListForStreamScanner(pExec->task, tbUidList, isAdd);
      if (code != 0) {
        tqError("update qualified table error for %s", pTqHandle->subKey);
      }
      continue;
    }

    if (pExec->subType == TOPIC_SUB_TYPE__DB) {
      if (isAdd) {
        continue;
      }
      int32_t sz = taosArrayGetSize(tbUidList);
      for (int32_t i = 0; i < sz; i++) {
        int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
        if (tbUid == NULL) {
          continue;
        }
        if (taosHashPut(pExec->execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
          tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
        }
      }
      continue;
    }

    if (pExec->subType != TOPIC_SUB_TYPE__TABLE) {
      continue;
    }

    if (!isAdd) {
      tqReaderRemoveTbUidList(pExec->pTqReader, tbUidList);
      continue;
    }

    SArray* list = NULL;
    int     ret = qGetTableList(pExec->execTb.suid, pTq->pVnode, pExec->execTb.node, &list, pExec->task);
    if (ret == 0) {
      ret = tqReaderSetTbUidList(pExec->pTqReader, list, NULL);
    }
    if (ret != TDB_CODE_SUCCESS) {
      tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
              pTqHandle->consumerId);
      taosArrayDestroy(list);
      // Leaving the loop early: release the iterator before dropping the latch.
      taosHashCancelIterate(pTq->pHandle, pIter);
      taosWUnLockLatch(&pTq->lock);

      return ret;
    }
    taosArrayDestroy(list);
  }
  taosWUnLockLatch(&pTq->lock);

  // update the table list handle for each stream scanner/wal reader
/* STREAMTODO
  streamMetaWLock(pTq->pStreamMeta);
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
    if (pIter == NULL) {
      break;
    }

    int64_t      refId = *(int64_t*)pIter;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask != NULL) {
      int32_t taskId = pTask->id.taskId;

      if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
        int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
        if (code != 0) {
          tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
        }
      }
      int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
      if (ret) {
        tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
      }
    }
  }

  streamMetaWUnLock(pTq->pStreamMeta);
*/
  return 0;
}
1362

1363
static void destroySourceScanTables(void* ptr) {
×
1364
  SArray** pTables = ptr;
×
1365
  if (pTables && *pTables) {
×
1366
    taosArrayDestroy(*pTables);
×
1367
    *pTables = NULL;
×
1368
  }
1369
}
×
1370

1371
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
1372
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
1373
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
1374
  if (pCol1->vColId == pCol2->vColId) {
×
1375
    return 0;
×
1376
  } else if (pCol1->vColId < pCol2->vColId) {
×
1377
    return -1;
×
1378
  } else {
1379
    return 1;
×
1380
  }
1381
}
1382

1383
int32_t tqReaderSetVtableInfo(STqReader* pReader, void* vnode, void* ptr, SSHashObj* pVtableInfos,
×
1384
                              SSDataBlock** ppResBlock, const char* idstr) {
1385
  int32_t            code = TSDB_CODE_SUCCESS;
×
1386
  int32_t            lino = 0;
×
1387
  SStorageAPI*       pAPI = ptr;
×
1388
  SVTSourceScanInfo* pScanInfo = NULL;
×
1389
  SHashObj*          pVirtualTables = NULL;
×
1390
  SMetaReader        metaReader = {0};
×
1391
  SVTColInfo         colInfo = {0};
×
1392
  SSchemaWrapper*    schema = NULL;
×
1393

1394
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1395
  TSDB_CHECK_NULL(vnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1396
  TSDB_CHECK_NULL(pAPI, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1397

1398
  pScanInfo = &pReader->vtSourceScanInfo;
×
1399
  taosHashCleanup(pScanInfo->pVirtualTables);
×
1400
  pScanInfo->pVirtualTables = NULL;
×
1401

1402
  if (tSimpleHashGetSize(pVtableInfos) == 0) {
×
1403
    goto _end;
×
1404
  }
1405

1406
  pVirtualTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1407
  TSDB_CHECK_NULL(pVirtualTables, code, lino, _end, terrno);
×
1408
  taosHashSetFreeFp(pVirtualTables, destroySourceScanTables);
×
1409

1410
  int32_t iter = 0;
×
1411
  void*   px = tSimpleHashIterate(pVtableInfos, NULL, &iter);
×
1412
  while (px != NULL) {
×
1413
    int64_t vTbUid = *(int64_t*)tSimpleHashGetKey(px, NULL);
×
1414
    SArray* pColInfos = taosArrayInit(8, sizeof(SVTColInfo));
×
1415
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, terrno);
×
1416
    code = taosHashPut(pVirtualTables, &vTbUid, sizeof(int64_t), &pColInfos, POINTER_BYTES);
×
1417
    TSDB_CHECK_CODE(code, lino, _end);
×
1418

1419
    SSHashObj* pPhysicalTables = *(SSHashObj**)px;
×
1420
    int32_t    iterIn = 0;
×
1421
    void*      pxIn = tSimpleHashIterate(pPhysicalTables, NULL, &iterIn);
×
1422
    while (pxIn != NULL) {
×
1423
      char* physicalTableName = tSimpleHashGetKey(pxIn, NULL);
×
1424
      pAPI->metaReaderFn.clearReader(&metaReader);
×
1425
      pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1426
      code = pAPI->metaReaderFn.getTableEntryByName(&metaReader, physicalTableName);
×
1427
      TSDB_CHECK_CODE(code, lino, _end);
×
1428
      pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
1429
      colInfo.pTbUid = metaReader.me.uid;
×
1430

1431
      switch (metaReader.me.type) {
×
1432
        case TSDB_CHILD_TABLE: {
×
1433
          int64_t suid = metaReader.me.ctbEntry.suid;
×
1434
          pAPI->metaReaderFn.clearReader(&metaReader);
×
1435
          pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1436
          code = pAPI->metaReaderFn.getTableEntryByUid(&metaReader, suid);
×
1437
          TSDB_CHECK_CODE(code, lino, _end);
×
1438
          pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
1439
          schema = &metaReader.me.stbEntry.schemaRow;
×
1440
          break;
×
1441
        }
1442
        case TSDB_NORMAL_TABLE: {
×
1443
          schema = &metaReader.me.ntbEntry.schemaRow;
×
1444
          break;
×
1445
        }
1446
        default: {
×
1447
          tqError("invalid table type: %d", metaReader.me.type);
×
1448
          code = TSDB_CODE_INVALID_PARA;
×
1449
          TSDB_CHECK_CODE(code, lino, _end);
×
1450
        }
1451
      }
1452

1453
      SArray* pCols = *(SArray**)pxIn;
×
1454
      int32_t ncols = taosArrayGetSize(pCols);
×
1455
      for (int32_t i = 0; i < ncols; ++i) {
×
1456
        SColIdName* pCol = taosArrayGet(pCols, i);
×
1457
        colInfo.vColId = pCol->colId;
×
1458

1459
        for (int32_t j = 0; j < schema->nCols; ++j) {
×
1460
          if (strncmp(pCol->colName, schema->pSchema[j].name, strlen(schema->pSchema[j].name)) == 0) {
×
1461
            colInfo.pColId = schema->pSchema[j].colId;
×
1462
            void* px = taosArrayPush(pColInfos, &colInfo);
×
1463
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1464
            break;
×
1465
          }
1466
        }
1467
      }
1468

1469
      taosArraySort(pColInfos, compareSVTColInfo);
×
1470
      pxIn = tSimpleHashIterate(pPhysicalTables, pxIn, &iterIn);
×
1471
    }
1472

1473
    px = tSimpleHashIterate(pVtableInfos, px, &iter);
×
1474
  }
1475

1476
  pScanInfo->pVirtualTables = pVirtualTables;
×
1477
  pVirtualTables = NULL;
×
1478

1479
  // set the result data block
1480
  if (pReader->pResBlock) {
×
1481
    blockDataDestroy(pReader->pResBlock);
×
1482
  }
1483
  pReader->pResBlock = *ppResBlock;
×
1484
  *ppResBlock = NULL;
×
1485

1486
  // update reader callback for vtable source scan
1487
  pAPI->tqReaderFn.tqNextBlockImpl = tqNextVTableSourceBlockImpl;
×
1488
  pAPI->tqReaderFn.tqReaderIsQueriedTable = tqReaderIsQueriedSourceTable;
×
1489

1490
_end:
×
1491
  if (code != TSDB_CODE_SUCCESS) {
×
1492
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1493
  }
1494
  pAPI->metaReaderFn.clearReader(&metaReader);
×
1495
  if (pVirtualTables != NULL) {
×
1496
    taosHashCleanup(pVirtualTables);
×
1497
  }
1498
  return code;
×
1499
}
1500

1501
/*
 * Rebuild vtSourceScanInfo.pPhysicalTables of `pReader`: a map from physical
 * table uid to the array of virtual table uids that need at least one of the
 * requested columns (pReader->pColIdList) from that physical table.
 *
 * The previous map and the physical-table schema cache are always released and
 * the scan counters (nextVirtualTableIdx, metaFetch, cacheHit) are reset. When
 * there are no virtual tables or no requested columns the function stops there
 * and returns success. A new schema cache is created only when the rebuilt map
 * is non-empty.
 *
 * The column matching walks both lists in lock-step and therefore relies on
 * each pColInfos array and pColIdList being ordered by ascending column id
 * (NOTE(review): the ordering of pColIdList is not established in this function
 * - confirm against the code that fills it).
 *
 * @param pReader  reader to update, must not be NULL
 * @param idstr    identifier used in error logs
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step
 */
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SVTSourceScanInfo* pScanInfo = NULL;
  SHashObj*          pVirtualTables = NULL;
  SHashObj*          pPhysicalTables = NULL;
  void*              pIter = NULL;
  void*              px = NULL;

  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);

  // reset everything derived from the previous column list
  pScanInfo = &pReader->vtSourceScanInfo;
  taosHashCleanup(pScanInfo->pPhysicalTables);
  pScanInfo->pPhysicalTables = NULL;
  taosLRUCacheCleanup(pScanInfo->pPhyTblSchemaCache);
  pScanInfo->pPhyTblSchemaCache = NULL;
  pScanInfo->nextVirtualTableIdx = -1;
  pScanInfo->metaFetch = 0;
  pScanInfo->cacheHit = 0;

  pVirtualTables = pScanInfo->pVirtualTables;
  if (taosHashGetSize(pVirtualTables) == 0 || taosArrayGetSize(pReader->pColIdList) == 0) {
    goto _end;
  }

  pPhysicalTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pPhysicalTables, code, lino, _end, terrno);
  taosHashSetFreeFp(pPhysicalTables, destroySourceScanTables);

  pIter = taosHashIterate(pVirtualTables, NULL);
  while (pIter != NULL) {
    int64_t vTbUid = *(int64_t*)taosHashGetKey(pIter, NULL);
    SArray* pColInfos = *(SArray**)pIter;
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);

    // Traverse all required columns and collect corresponding physical tables
    int32_t nColInfos = taosArrayGetSize(pColInfos);
    int32_t nOutputCols = taosArrayGetSize(pReader->pColIdList);
    for (int32_t i = 0, j = 0; i < nColInfos && j < nOutputCols;) {
      SVTColInfo* pCol = taosArrayGet(pColInfos, i);
      col_id_t    colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j);
      if (pCol->vColId < colIdNeed) {
        i++;
      } else if (pCol->vColId > colIdNeed) {
        j++;
      } else {
        SArray* pRelatedVTs = NULL;
        px = taosHashGet(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t));
        if (px == NULL) {
          pRelatedVTs = taosArrayInit(8, sizeof(int64_t));
          TSDB_CHECK_NULL(pRelatedVTs, code, lino, _end, terrno);
          code = taosHashPut(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t), &pRelatedVTs, POINTER_BYTES);
          if (code != TSDB_CODE_SUCCESS) {
            // the array was not stored in the hash, so it must be released here
            taosArrayDestroy(pRelatedVTs);
            TSDB_CHECK_CODE(code, lino, _end);
          }
        } else {
          pRelatedVTs = *(SArray**)px;
        }
        // record each virtual table once per physical table; only the last entry is compared
        // because all matches of the current virtual table are appended consecutively
        if (taosArrayGetSize(pRelatedVTs) == 0 || *(int64_t*)taosArrayGetLast(pRelatedVTs) != vTbUid) {
          px = taosArrayPush(pRelatedVTs, &vTbUid);
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
        }
        i++;
        j++;
      }
    }
    pIter = taosHashIterate(pVirtualTables, pIter);
  }

  pScanInfo->pPhysicalTables = pPhysicalTables;
  pPhysicalTables = NULL;

  if (taosHashGetSize(pScanInfo->pPhysicalTables) > 0) {
    pScanInfo->pPhyTblSchemaCache = taosLRUCacheInit(1024 * 128, -1, .5);
    TSDB_CHECK_NULL(pScanInfo->pPhyTblSchemaCache, code, lino, _end, terrno);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
  }
  if (pIter != NULL) {
    // pIter was obtained from pVirtualTables, so the iteration must be cancelled on that hash
    taosHashCancelIterate(pVirtualTables, pIter);
  }
  if (pPhysicalTables != NULL) {
    taosHashCleanup(pPhysicalTables);
  }
  return code;
}
1591

1592
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1593
  if (value) {
×
1594
    SSchemaWrapper* pSchemaWrapper = value;
×
1595
    tDeleteSchemaWrapper(pSchemaWrapper);
1596
  }
1597
}
×
1598

1599
/*
 * Advance the reader to the next submit block that belongs to a physical table
 * referenced by at least one virtual table.
 *
 * Returns true when such a block is available at pReader->nextBlk (or when a
 * previously found block is still being consumed, i.e. nextVirtualTableIdx >= 0).
 * Returns false when there is no message, no physical table mapping, an error
 * occurred, or all blocks were skipped; in the last case the submit message is
 * cleared.
 */
bool tqNextVTableSourceBlockImpl(STqReader* pReader, const char* idstr) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SVTSourceScanInfo* pScanInfo = NULL;

  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);

  pScanInfo = &pReader->vtSourceScanInfo;
  if (pReader->msg.msgStr == NULL) {
    return false;
  }
  if (taosHashGetSize(pScanInfo->pPhysicalTables) == 0) {
    return false;
  }

  // The data still needs to be converted into the virtual table result block
  if (pScanInfo->nextVirtualTableIdx >= 0) {
    return true;
  }

  int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < numOfBlocks; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, _end, terrno);

    int64_t physUid = pTbData->uid;
    void*   pEntry = taosHashGet(pScanInfo->pPhysicalTables, &physUid, sizeof(int64_t));
    if (pEntry != NULL && taosArrayGetSize(*(SArray**)pEntry) > 0) {
      // start with the first virtual table related to this physical table
      pScanInfo->nextVirtualTableIdx = 0;
      return true;
    }

    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, numOfBlocks, physUid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d", numOfBlocks);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
  }
  return false;
}
1642

1643
/*
 * Tell whether `uid` is a key of the reader's physical table map
 * (vtSourceScanInfo.pPhysicalTables). A NULL reader yields false.
 */
bool tqReaderIsQueriedSourceTable(STqReader* pReader, uint64_t uid) {
  if (pReader != NULL) {
    void* pEntry = taosHashGet(pReader->vtSourceScanInfo.pPhysicalTables, &uid, sizeof(uint64_t));
    return pEntry != NULL;
  }
  return false;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc