• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4952

06 Feb 2026 07:29AM UTC coverage: 66.869% (-0.02%) from 66.887%
#4952

push

travis-ci

web-flow
merge: from main to 3.0 #34521

758 of 1081 new or added lines in 28 files covered. (70.12%)

6496 existing lines in 142 files now uncovered.

205752 of 307696 relevant lines covered (66.87%)

126028909.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.97
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "taoserror.h"
17
#include "tarray.h"
18
#include "tmsg.h"
19
#include "tq.h"
20

21
/*
 * Filter a create-table batch against the subscribed super table.
 *
 * A request "matches" when it creates a child table of `tbSuid` whose uid is present in
 * pReader->tbIdHash.
 *   - no request matches   : *realTbSuid is left untouched, message untouched
 *   - every request matches: *realTbSuid = tbSuid, message untouched
 *   - some requests match  : *realTbSuid = tbSuid and the matching requests are re-encoded over
 *                            pHead->body (after the SMsgHead), pHead->bodyLen is updated
 *
 * Errors are only logged through tqError; nothing is returned to the caller.
 */
static void processCreateTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  int32_t            code = 0;
  int32_t            lino = 0;
  int32_t            needRebuild = 0;
  int                tlen = 0;
  void*              buf = NULL;
  SVCreateTbReq*     pCreateReq = NULL;
  SVCreateTbBatchReq req = {0};
  SVCreateTbBatchReq reqNew = {0};

  code = tDecodeSVCreateTbBatchReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // first pass: count how many requests concern the subscribed tables
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    bool matched = pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid &&
                   taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL;
    if (matched) {
      needRebuild++;
    }
  }

  if (needRebuild == 0) {
    goto end;  // nothing in this batch is relevant
  }

  *realTbSuid = tbSuid;
  if (needRebuild == req.nReqs) {
    goto end;  // the whole batch is relevant, keep the message as it is
  }

  // second pass: collect the matching requests into a new batch
  reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
  if (reqNew.pArray == NULL) {
    code = terrno;
    lino = __LINE__;
    goto end;
  }
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    bool matched = pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid &&
                   taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL;
    if (!matched) {
      continue;
    }
    reqNew.nReqs++;
    if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
      code = terrno;
      lino = __LINE__;
      goto end;
    }
  }

  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, code);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    // record the allocation failure so that it shows up in the error log below
    code = terrno;
    lino = __LINE__;
    goto end;
  }

  SEncoder coderNew = {0};
  tEncoderInit(&coderNew, buf, tlen);
  code = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
  tEncoderClear(&coderNew);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // NOTE(review): assumes the re-encoded subset is never larger than the original body - confirm
  (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
  pHead->bodyLen = tlen + sizeof(SMsgHead);

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.pArray);
  tDeleteSVCreateTbBatchReq(&req);
  if (code < 0) {
    tqError("processCreateTbMsg failed, code:%d, line:%d", code, lino);
  }
}
1,255✔
94

95
/*
 * Decode an alter-table request, look the table up by name in the vnode meta and, when its uid is
 * present in pReader->tbIdHash, store the super table uid of that entry in *realTbSuid.
 * Failures are logged through tqError only.
 */
static void processAlterTbMsg(SDecoder* dcoder, STqReader* pReader, int64_t* realTbSuid) {
  SMetaReader  metaReader = {0};
  SVAlterTbReq alterReq = {0};
  int32_t      lino = 0;
  int32_t      code = tDecodeSVAlterTbReq(dcoder, &alterReq);

  if (code >= 0) {
    metaReaderDoInit(&metaReader, pReader->pVnodeMeta, META_READER_LOCK);
    code = metaGetTableEntryByName(&metaReader, alterReq.tbName);
    if (code >= 0) {
      if (taosHashGet(pReader->tbIdHash, &metaReader.me.uid, sizeof(int64_t)) != NULL) {
        *realTbSuid = metaReader.me.ctbEntry.suid;
      }
    } else {
      lino = __LINE__;
    }
  } else {
    lino = __LINE__;
  }

  // cleanup runs on every path, including the decode failure where the meta reader was never initialised
  taosArrayDestroy(alterReq.pMultiTag);
  metaReaderClear(&metaReader);
  if (code < 0) {
    tqError("processAlterTbMsg failed, code:%d, line:%d", code, lino);
  }
}
942✔
123

124
/*
 * Filter a drop-table batch against the subscribed super table.
 *
 * A request "matches" when its suid equals `tbSuid` and its uid is present in pReader->tbIdHash.
 *   - no request matches   : *realTbSuid is left untouched, message untouched
 *   - every request matches: *realTbSuid = tbSuid, message untouched
 *   - some requests match  : *realTbSuid = tbSuid and the matching requests are re-encoded over
 *                            pHead->body (after the SMsgHead), pHead->bodyLen is updated
 *
 * Errors are only logged through tqError; nothing is returned to the caller.
 */
static void processDropTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchReq reqNew = {0};
  SVDropTbReq*     pDropReq = NULL;
  void*            buf = NULL;
  int              tlen = 0;
  int32_t          needRebuild = 0;
  int32_t          lino = 0;
  int32_t          code = tDecodeSVDropTbBatchReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // first pass: count how many requests concern the subscribed tables
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    bool matched = pDropReq->suid == tbSuid &&
                   taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL;
    if (matched) {
      needRebuild++;
    }
  }

  if (needRebuild == 0) {
    goto end;  // nothing in this batch is relevant
  }

  *realTbSuid = tbSuid;
  if (needRebuild == req.nReqs) {
    goto end;  // the whole batch is relevant, keep the message as it is
  }

  // second pass: collect the matching requests into a new batch
  reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
  if (reqNew.pArray == NULL) {
    code = terrno;
    lino = __LINE__;
    goto end;
  }
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    bool matched = pDropReq->suid == tbSuid &&
                   taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL;
    if (!matched) {
      continue;
    }
    reqNew.nReqs++;
    if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
      code = terrno;
      lino = __LINE__;
      goto end;
    }
  }

  tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, code);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    // record the allocation failure so that it shows up in the error log below
    code = terrno;
    lino = __LINE__;
    goto end;
  }

  SEncoder coderNew = {0};
  tEncoderInit(&coderNew, buf, tlen);
  code = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
  tEncoderClear(&coderNew);
  if (code != 0) {
    lino = __LINE__;
    goto end;
  }

  // NOTE(review): assumes the re-encoded subset is never larger than the original body - confirm
  (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
  pHead->bodyLen = tlen + sizeof(SMsgHead);

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.pArray);
  if (code < 0) {
    tqError("processDropTbMsg failed, code:%d, line:%d", code, lino);
  }
}
×
196

197
/*
 * Decide whether a meta message read from the WAL is relevant for the given subscription handle.
 *
 * Returns false when either argument is NULL and true for every subscription that is not a
 * table subscription (TOPIC_SUB_TYPE__TABLE). For table subscriptions the message body (after
 * the SMsgHead) is decoded according to pHead->msgType to find the super table uid it refers to;
 * the result is true only when that uid equals the subscribed suid. Create-table and drop-table
 * batches may be rewritten in place by the helpers so that only relevant requests remain.
 * A decode failure leaves the detected suid at 0.
 */
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle == NULL || pHead == NULL) {
    return false;
  }
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  STqReader* pReader = pHandle->execHandle.pTqReader;
  int16_t    msgType = pHead->msgType;
  int64_t    tbSuid = pHandle->execHandle.execTb.suid;
  int64_t    realTbSuid = 0;

  SDecoder dcoder = {0};
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);

  switch (msgType) {
    case TDMT_VND_CREATE_STB:
    case TDMT_VND_ALTER_STB: {
      SVCreateStbReq req = {0};
      if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
        goto end;
      }
      realTbSuid = req.suid;
      break;
    }
    case TDMT_VND_DROP_STB: {
      SVDropStbReq req = {0};
      if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
        goto end;
      }
      realTbSuid = req.suid;
      break;
    }
    case TDMT_VND_CREATE_TABLE:
      processCreateTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
      break;
    case TDMT_VND_ALTER_TABLE:
      processAlterTbMsg(&dcoder, pReader, &realTbSuid);
      break;
    case TDMT_VND_DROP_TABLE:
      processDropTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
      break;
    case TDMT_VND_DELETE: {
      SDeleteRes req = {0};
      if (tDecodeDeleteRes(&dcoder, &req) < 0) {
        goto end;
      }
      realTbSuid = req.suid;
      break;
    }
    default:
      // any other message type keeps realTbSuid at 0
      break;
  }

end:
  tDecoderClear(&dcoder);
  bool tmp = tbSuid == realTbSuid;
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
  return tmp;
}
253

254
/*
 * Scan the WAL forward from *fetchOffset up to the applied version, looking for the next entry
 * the consumer should receive.
 *
 * The scan stops on:
 *   - a submit message (its body is fetched, the result of walFetchBody is returned)
 *   - a meta message accepted by isValValidForTable when the handle does not use WITH_DATA
 *     (returns 0)
 *   - any failure while fetching a head or body
 * Every other entry is skipped and the offset advanced. *fetchOffset is always updated with the
 * offset reached. Returns -1 when arguments are NULL or nothing suitable was found.
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
    return -1;
  }

  int32_t     code = -1;
  int32_t     vgId = TD_VID(pTq->pVnode);
  SWalReader* pWalReader = pHandle->pWalReader;
  int64_t     id = pWalReader->readerId;

  int64_t offset = *fetchOffset;
  int64_t lastVer = walGetLastVer(pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, offset, lastVer, committedVer, appliedVer, id);

  // invariant: code == -1 at the top of every iteration
  while (offset <= appliedVer) {
    if (walFetchHead(pWalReader, offset) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
      break;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, offset, TMSG_INFO(pWalReader->pHead->head.msgType), reqId, id);

    if (pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pWalReader);
      break;
    }

    if (pHandle->fetchMeta != WITH_DATA) {
      SWalCont* pHead = &(pWalReader->pHead->head);
      bool      deleteInOnlyMeta = (pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META);
      if (IS_META_MSG(pHead->msgType) && !deleteInOnlyMeta) {
        code = walFetchBody(pWalReader);
        if (code < 0) {
          break;
        }

        // re-read the head pointer after the body has been fetched
        pHead = &(pWalReader->pHead->head);
        if (isValValidForTable(pHandle, pHead)) {
          code = 0;
          break;
        }

        // body already consumed and the message is not for this subscription: move on
        offset++;
        code = -1;
        continue;
      }
    }

    code = walSkipFetchBody(pWalReader);
    if (code < 0) {
      break;
    }
    offset++;
    code = -1;
  }

  *fetchOffset = offset;
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
          ", applied:%" PRId64 ", 0x%" PRIx64,
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
  return code;
}
321

322
/* Return the reader's hasPrimaryKey flag; a NULL reader yields false. */
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return (pReader != NULL) ? pReader->hasPrimaryKey : false;
}
328

329
/*
 * Look up the schema of table `uid` and record in pReader->hasPrimaryKey whether its second
 * column carries the COL_IS_KEY flag. A missing schema or fewer than two columns yields false.
 * Does nothing (besides logging) when pReader is NULL.
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);
  if (pReader == NULL) {
    return;
  }

  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL, 0);
  bool            hasKey = false;
  if (pWrapper != NULL && pWrapper->nCols >= 2) {
    hasKey = (pWrapper->pSchema[1].flags & COL_IS_KEY) != 0;
  }

  tDeleteSchemaWrapper(pWrapper);
  pReader->hasPrimaryKey = hasKey;
}
343

344
/*
 * Allocate a zeroed STqReader bound to the vnode's WAL and meta.
 * Returns NULL when pVnode is NULL, when the allocation fails or when the WAL reader cannot be
 * opened. The caller releases the reader with tqReaderClose.
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  tqDebug("%s:%p", __FUNCTION__, pVnode);
  if (pVnode == NULL) {
    return NULL;
  }

  STqReader* pNew = taosMemoryCalloc(1, sizeof(STqReader));
  if (pNew == NULL) {
    return NULL;
  }

  pNew->pWalReader = walOpenReader(pVnode->pWal, 0);
  if (pNew->pWalReader == NULL) {
    taosMemoryFree(pNew);
    return NULL;
  }

  // schema cache starts empty
  pNew->pSchemaWrapper = NULL;
  pNew->cachedSchemaSuid = 0;
  pNew->cachedSchemaVer = 0;

  pNew->pVnodeMeta = pVnode->pMeta;
  pNew->pColIdList = NULL;
  pNew->tbIdHash = NULL;
  pNew->pResBlock = NULL;

  return pNew;
}
370

371
/*
 * Release everything owned by the reader (WAL reader, cached schema, ext schema, column id list,
 * result block, table id hash, decoded submit request) and then the reader itself.
 * A NULL reader is only logged.
 */
void tqReaderClose(STqReader* pReader) {
  tqDebug("%s:%p", __FUNCTION__, pReader);
  if (pReader == NULL) {
    return;
  }

  if (pReader->pWalReader != NULL) {
    walCloseReader(pReader->pWalReader);
  }

  if (pReader->pSchemaWrapper != NULL) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  }
  taosMemoryFree(pReader->extSchema);

  if (pReader->pColIdList != NULL) {
    taosArrayDestroy(pReader->pColIdList);
  }

  blockDataDestroy(pReader->pResBlock);
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  taosMemoryFree(pReader);
}
396

397
/*
 * Position the underlying WAL reader at version `ver`.
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, terrno when the seek fails, 0 otherwise.
 * `id` is only used in the debug log.
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  bool seekFailed = walReaderSeekVer(pReader->pWalReader, ver) < 0;
  if (seekFailed) {
    return terrno;
  }

  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
  return 0;
}
407

408
/*
 * Advance to the next submit block that should be delivered and materialise it via
 * tqRetrieveDataBlock.
 *
 * Blocks of the currently decoded submit request are examined first: blocks whose flags
 * intersect `sourceExcluded`, or whose uid is absent from a non-NULL tbIdHash, are skipped.
 * When the current request is exhausted it is destroyed and the next valid WAL message is
 * decoded. Returns true once a block has been retrieved successfully; returns false when the
 * reader is NULL, a block cannot be read from the array, more than one second has elapsed (or
 * the clock went backwards), the WAL has no further valid message, or decoding fails.
 */
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
  if (pReader == NULL) {
    return false;
  }

  SWalReader* pWalReader = pReader->pWalReader;
  int64_t     startMs = taosGetTimestampMs();

  for (;;) {
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);

    while (pReader->nextBlk < numOfBlocks) {
      tqDebug("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        return false;
      }

      if ((pTbData->flags & sourceExcluded) != 0) {
        pReader->nextBlk += 1;
        continue;
      }

      bool wanted = (pReader->tbIdHash == NULL) ||
                    (taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t)) != NULL);
      if (!wanted) {
        pReader->nextBlk += 1;
        tqDebug("tq reader discard submit block, uid:%" PRId64 ", continue", pTbData->uid);
        continue;
      }

      tqDebug("tq reader return submit block, uid:%" PRId64, pTbData->uid);
      SSDataBlock* pRes = NULL;
      // on failure keep scanning with whatever nextBlk the retrieval left behind
      if (tqRetrieveDataBlock(pReader, &pRes, NULL) == TSDB_CODE_SUCCESS) {
        return true;
      }
    }

    // current submit request exhausted
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    int64_t elapsed = taosGetTimestampMs() - startMs;
    if (elapsed > 1000 || elapsed < 0) {
      return false;
    }

    // try next message in wal file
    if (walNextValidMsg(pWalReader, false) < 0) {
      return false;
    }

    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t ver = pWalReader->pHead->head.version;

    SDecoder decoder = {0};
    int32_t  setCode = tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL, &decoder);
    tDecoderClear(&decoder);
    if (setCode != 0) {
      return false;
    }
    pReader->nextBlk = 0;
  }
}
469

470
/*
 * Remember the raw submit message (pointer, length, version) on the reader and decode it into
 * pReader->submit using the caller supplied decoder.
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, otherwise the result of tDecodeSubmitReq
 * (a non-zero result is logged). The decoder is initialised here but not cleared.
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList, SDecoder* decoder) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pReader->msg.ver = ver;
  pReader->msg.msgLen = msgLen;
  pReader->msg.msgStr = msgStr;

  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);

  tDecoderInit(decoder, pReader->msg.msgStr, pReader->msg.msgLen);
  int32_t decodeCode = tDecodeSubmitReq(decoder, &pReader->submit, rawList);
  if (decodeCode != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
  }
  return decodeCode;
}
489

490
/*
 * Drop the currently decoded submit request: destroy pReader->submit, rewind the block cursor
 * and forget the raw message pointer. A NULL reader is ignored, matching the other accessors
 * in this file.
 */
void tqReaderClearSubmitMsg(STqReader* pReader) {
  if (pReader == NULL) {
    return;
  }
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;
}
88,362,748✔
495

496
/* Return the WAL reader owned by pReader, or NULL when pReader is NULL. */
SWalReader* tqGetWalReader(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pWalReader : NULL;
}
502

503
/* Return the reader's result block, or NULL when pReader is NULL. */
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pResBlock : NULL;
}
509

510
/* Return pReader->lastTs, or 0 when pReader is NULL. */
int64_t tqGetResultBlockTime(STqReader* pReader) {
  return (pReader != NULL) ? pReader->lastTs : 0;
}
516

517
/*
 * Move the block cursor of the decoded submit request to the next block whose uid is present in
 * pReader->tbIdHash.
 *
 * Returns false when the reader or its raw message is NULL, when a block cannot be read from the
 * array, or when all blocks were skipped (in which case the submit request is cleared).
 * Returns true when tbIdHash is NULL or when the block at the cursor is in the hash; the cursor
 * is left on that block.
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  int32_t found = false;
  int32_t lino = 0;
  int64_t uid = 0;

  TSDB_CHECK_NULL(pReader, found, lino, _out, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, found, lino, _out, false);
  TSDB_CHECK_NULL(pReader->tbIdHash, found, lino, _out, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, found, lino, _out, false);
    uid = pTbData->uid;

    void* pHit = taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t));
    // leaves the loop with found == true as soon as the uid is in the hash
    TSDB_CHECK_CONDITION(pHit == NULL, found, lino, _out, true);

    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
            pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

_out:
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, found ? "true" : "false", uid);
  return found;
}
545

546
/*
 * Move the block cursor of the decoded submit request to the next block whose uid is NOT present
 * in `filterOutUids`.
 *
 * Returns false when the reader or its raw message is NULL, when a block cannot be read from the
 * array, or when every remaining block was filtered out (the submit request is then cleared).
 * Returns true when filterOutUids is NULL or when the block at the cursor is not in the set; the
 * cursor is left on that block.
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  int32_t hasData = false;
  int32_t lino = 0;
  int64_t uid = 0;

  TSDB_CHECK_NULL(pReader, hasData, lino, _out, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, hasData, lino, _out, false);
  TSDB_CHECK_NULL(filterOutUids, hasData, lino, _out, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, hasData, lino, _out, false);
    uid = pTbData->uid;

    void* pFiltered = taosHashGet(filterOutUids, &pTbData->uid, sizeof(int64_t));
    // a uid missing from the filter set ends the scan with hasData == true
    TSDB_CHECK_NULL(pFiltered, hasData, lino, _out, true);
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

_out:
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, hasData ? "true" : "false", uid);
  return hasData;
}
572

573
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
42,814,803✔
574
                    SExtSchema* extSrc) {
575
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
42,814,803✔
576
    return TSDB_CODE_INVALID_PARA;
13✔
577
  }
578
  int32_t code = 0;
42,827,872✔
579

580
  int32_t cnt = 0;
42,827,872✔
581
  for (int32_t i = 0; i < pSrc->nCols; i++) {
239,898,713✔
582
    cnt += mask[i];
197,062,055✔
583
  }
584

585
  pDst->nCols = cnt;
42,862,075✔
586
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
42,878,177✔
587
  if (pDst->pSchema == NULL) {
42,835,913✔
588
    return TAOS_GET_TERRNO(terrno);
×
589
  }
590

591
  int32_t j = 0;
42,831,433✔
592
  for (int32_t i = 0; i < pSrc->nCols; i++) {
240,010,852✔
593
    if (mask[i]) {
197,098,000✔
594
      pDst->pSchema[j++] = pSrc->pSchema[i];
197,149,525✔
595
      SColumnInfoData colInfo =
197,148,841✔
596
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
197,154,002✔
597
      if (extSrc != NULL) {
197,117,664✔
598
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
56,462✔
599
      }
600
      code = blockDataAppendColInfo(pBlock, &colInfo);
197,117,664✔
601
      if (code != 0) {
197,166,706✔
602
        return code;
×
603
      }
604
    }
605
  }
606
  return 0;
42,880,943✔
607
}
608

609
/*
 * Recreate pReader->pResBlock with one column per requested column id.
 *
 * The previous result block is destroyed and a fresh one created. When pColIdList is empty every
 * schema column is added; otherwise the schema columns and pColIdList are walked together in
 * step (both are compared by ascending colId) and only columns present in both are added.
 * For decimal columns the precision/scale come from pReader->extSchema when it is available.
 *
 * Returns TSDB_CODE_INVALID_PARA when an argument is NULL, the createDataBlock error when the
 * block cannot be created, the blockDataAppendColInfo error (after freeing the block contents)
 * when a column cannot be appended, and TSDB_CODE_SUCCESS otherwise.
 */
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  blockDataDestroy(pReader->pResBlock);
  int32_t code = createDataBlock(&pReader->pResBlock);
  if (code) {
    return code;
  }
  SSDataBlock* pBlock = pReader->pResBlock;

  int32_t numOfCols = taosArrayGetSize(pColIdList);
  if (numOfCols == 0) {  // all columns are required
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
      SSchema*        pColSchema = &pSchema->pSchema[i];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);

      if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
        decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
      }
      code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataFreeRes(pBlock);
        return code;
      }
    }
    return TSDB_CODE_SUCCESS;
  }

  int32_t i = 0;  // cursor in the schema
  int32_t j = 0;  // cursor in the requested column ids
  while (i < pSchema->nCols && j < numOfCols) {
    SSchema* pColSchema = &pSchema->pSchema[i];
    col_id_t colIdSchema = pColSchema->colId;

    col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
    if (pColIdNeed == NULL) {
      break;
    }

    if (colIdSchema < *pColIdNeed) {
      i++;  // schema column not requested
      continue;
    }
    if (colIdSchema > *pColIdNeed) {
      j++;  // requested id not in this schema
      continue;
    }

    SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
    if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
      decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
    }
    code = blockDataAppendColInfo(pBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataFreeRes(pBlock);
      return code;
    }
    i++;
    j++;
  }

  return TSDB_CODE_SUCCESS;
}
668

669
/*
 * Store a blob column value into row `idx` of pColumnInfoData.
 *
 * A column value that is not a real value is stored as NULL. Otherwise, when the value carries
 * data, a sequence number is read from it, the blob item with that sequence is fetched from
 * pBlobRow2 and its bytes are copied into a temporary length-prefixed buffer that is handed to
 * colDataSetVal. A value without data is stored with length 0.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or an unreadable sequence number, terrno (or
 * TSDB_CODE_OUT_OF_MEMORY) on allocation failure, the tBlobSetGet error when the item cannot be
 * found, otherwise the result of colDataSetVal. The temporary buffer is released on every path.
 */
static int32_t doSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobSet* pBlobRow2) {
  int32_t code = 0;
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // TODO(yhDeng)
  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataSetNULL(pColumnInfoData, idx);
    return code;
  }

  char* val = taosMemCalloc(1, pColVal->value.nData + sizeof(BlobDataLenT));
  if (val == NULL) {
    return terrno;
  }

  int32_t len = 0;
  if (pColVal->value.pData != NULL) {
    uint64_t seq = 0;
    if (tGetU64(pColVal->value.pData, &seq) < 0) {
      code = TSDB_CODE_INVALID_PARA;
      terrno = code;
      goto _exit;  // val must be released on this path too
    }

    SBlobItem item = {0};
    code = tBlobSetGet(pBlobRow2, seq, &item);
    if (code != 0) {
      terrno = code;
      uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
      goto _exit;
    }

    // keep the old pointer until the reallocation is known to have succeeded
    char* resized = taosMemRealloc(val, item.len + sizeof(BlobDataLenT));
    if (resized == NULL) {
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _exit;
    }
    val = resized;

    (void)memcpy(blobDataVal(val), item.data, item.len);
    len = item.len;
  }

  blobDataSetLen(val, len);
  code = colDataSetVal(pColumnInfoData, idx, val, false);

_exit:
  taosMemoryFree(val);
  return code;
}
710
/*
 * Store one column value into row `rowIndex` of pColumnInfoData.
 *
 * Fixed-size types are passed straight to colDataSetVal together with their "is null" state.
 * Variable-length values are first copied into a length-prefixed stack buffer; a variable-length
 * value that is not a real value is stored as NULL.
 *
 * Returns TSDB_CODE_INVALID_PARA when a variable-length value is longer than the 65535 bytes the
 * stack buffer can hold, otherwise the result of colDataSetVal (TSDB_CODE_SUCCESS for NULLs).
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  if (!IS_VAR_DATA_TYPE(pColVal->value.type)) {
    return colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
                         !COL_VAL_IS_VALUE(pColVal));
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataSetNULL(pColumnInfoData, rowIndex);
    return TSDB_CODE_SUCCESS;
  }

  // the payload must fit behind the 2-byte length header of the buffer below; anything larger
  // would overrun the stack buffer in memcpy
  if ((uint64_t)pColVal->value.nData > 65535u) {
    return TSDB_CODE_INVALID_PARA;
  }

  char val[65535 + 2] = {0};
  if (pColVal->value.pData != NULL) {
    (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
  }
  varDataSetLen(val, pColVal->value.nData);
  return colDataSetVal(pColumnInfoData, rowIndex, val, false);
}
731

732
// Makes sure the reader's cached schema matches the table/version of
// `pSubmitTbData`, reloading it and rebuilding the result block when it does not.
//
// The cache is considered stale when the schema version differs, or when the
// cached suid (for suid != 0) or the cached uid (for suid == 0) differs from
// the submit data.
//
// Returns TSDB_CODE_SUCCESS when the cache is already valid, the result of
// buildResSDataBlock() after a successful reload, or
// TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when metaGetTableSchema() yields NULL.
static int32_t checkAndSetDataBlock(STqReader* pReader, SSubmitTbData* pSubmitTbData) {
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;

  bool tableChanged = (suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid);
  if (!tableChanged && pReader->cachedSchemaVer == sversion) {
    return TSDB_CODE_SUCCESS;
  }

  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  // Clear the pointer as well as freeing it, so a failed lookup below cannot
  // leave a dangling extSchema behind for the next free.
  taosMemoryFreeClear(pReader->extSchema);
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema, 0);
  if (pReader->pSchemaWrapper == NULL) {
    // Report the version that was actually requested, not the stale cached one.
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d, possibly dropped table",
           vgId, suid, uid, sversion);
    // Invalidate both cache keys: the schema wrapper is gone, so neither the
    // suid path nor the uid path may report a cache hit afterwards.
    pReader->cachedSchemaSuid = 0;
    pReader->cachedSchemaUid = 0;
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
  }

  pReader->cachedSchemaUid = uid;
  pReader->cachedSchemaSuid = suid;
  pReader->cachedSchemaVer = sversion;

  return buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
}
757

758
// Converts the next submit table block of the reader into the reader's result
// block and returns that block through `pRes`.
//
// The reader's `nextBlk` cursor is advanced even when the lookup fails. The
// schema cache is refreshed through checkAndSetDataBlock() first; afterwards
// the block is filled either from column-format data (`aCol`) or from
// row-format data (`aRowP`), matching source columns to block columns by
// column id. Block columns without a matching source column are set to NULL.
//
// Returns TSDB_CODE_INVALID_PARA for a NULL reader/result pointer, otherwise 0
// or the first error code hit while converting.
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
  if (pReader == NULL || pRes == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);

  int32_t   code = 0;
  int32_t   lino = 0;
  STSchema* pTSchema = NULL;

  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
  TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, terrno);
  pReader->lastTs = pSubmitTbData->ctimeMs;

  code = checkAndSetDataBlock(pReader, pSubmitTbData);
  TSDB_CHECK_CODE(code, lino, END);

  const bool isColFormat = (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) != 0;

  // Row count comes from the first column for column-format data, otherwise
  // from the number of row pointers.
  int32_t numOfRows = 0;
  if (isColFormat) {
    SColData* pFirstCol = taosArrayGet(pSubmitTbData->aCol, 0);
    TSDB_CHECK_NULL(pFirstCol, code, lino, END, terrno);
    numOfRows = pFirstCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  SSDataBlock* pBlock = pReader->pResBlock;
  *pRes = pBlock;
  blockDataCleanup(pBlock);
  pBlock->info.id.uid = pSubmitTbData->uid;
  pBlock->info.version = pReader->msg.ver;
  code = blockDataEnsureCapacity(pBlock, numOfRows);
  TSDB_CHECK_CODE(code, lino, END);
  pBlock->info.rows = numOfRows;
  int32_t colActual = blockDataGetNumOfCols(pBlock);

  // convert and scan one block
  if (isColFormat) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    int32_t sourceIdx = 0;
    int32_t targetIdx = 0;

    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TSDB_CHECK_NULL(pColData, code, lino, END, terrno);

      // Source columns exhausted: the remaining block columns become NULL.
      if (sourceIdx >= numOfCols) {
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
        continue;
      }

      bool isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? true : false;

      SColData* pCol = taosArrayGet(pCols, sourceIdx);
      TSDB_CHECK_NULL(pCol, code, lino, END, terrno);
      SColVal colVal = {0};
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);

      if (pCol->cid < pColData->info.colId) {
        // Source column is not part of the block; skip it.
        sourceIdx++;
        continue;
      }

      if (pCol->cid > pColData->info.colId) {
        // Block column has no source data.
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
        continue;
      }

      for (int32_t i = 0; i < pCol->nVal; i++) {
        code = tColDataGetValue(pCol, i, &colVal);
        TSDB_CHECK_CODE(code, lino, END);

        if (isBlob) {
          code = doSetBlobVal(pColData, i, &colVal, pSubmitTbData->pBlobSet);
        } else {
          code = doSetVal(pColData, i, &colVal);
        }
        TSDB_CHECK_CODE(code, lino, END);
      }
      sourceIdx++;
      targetIdx++;
    }
  } else {
    SArray*         pRows = pSubmitTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    TSDB_CHECK_NULL(pTSchema, code, lino, END, terrno);

    for (int32_t i = 0; i < numOfRows; i++) {
      SRow* pRow = taosArrayGetP(pRows, i);
      TSDB_CHECK_NULL(pRow, code, lino, END, terrno);

      int32_t sourceIdx = 0;
      for (int32_t j = 0; j < colActual; j++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
        TSDB_CHECK_NULL(pColData, code, lino, END, terrno);

        bool isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? true : false;

        // Advance through the row's columns until the block column is matched
        // or passed.
        for (;;) {
          SColVal colVal = {0};
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
          TSDB_CHECK_CODE(code, lino, END);

          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          }

          if (colVal.cid > pColData->info.colId) {
            colDataSetNULL(pColData, i);
            break;
          }

          if (isBlob) {
            code = doSetBlobVal(pColData, i, &colVal, pSubmitTbData->pBlobSet);
          } else {
            code = doSetVal(pColData, i, &colVal);
          }
          TSDB_CHECK_CODE(code, lino, END);

          sourceIdx++;
          break;
        }
      }
    }
  }

END:
  if (code != 0) {
    tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", lino, tstrerror(code));
  }
  taosMemoryFreeClear(pTSchema);
  return code;
}
886

887
// Tracks, per schema column `j`, whether the current row carries a value
// (anything other than NONE) in `colVal`. `buildNew` is raised for the first
// row (curRow == 0) and whenever that state differs from the one recorded in
// `assigned[j]`. Expects `curRow`, `j`, `colVal`, `assigned` and `buildNew`
// to be in scope at the expansion site; used as a bare statement.
#define PROCESS_VAL                                         \
  {                                                         \
    bool tqColHasVal_ = !COL_VAL_IS_NONE(&colVal);          \
    if (curRow == 0 || tqColHasVal_ != assigned[j]) {       \
      assigned[j] = tqColHasVal_;                           \
      buildNew = true;                                      \
    }                                                       \
  }
898

899
// One merge step between the current source column value `colVal` and the
// block column `pColData`, matched by column id:
//   - source id smaller: skip the source column;
//   - source id larger:  the block column gets NULL for this row;
//   - ids equal:         the value is written (blob or regular) and both
//                        cursors advance.
// The destination row is `curRow - lastRow`. Expects `colVal`, `pColData`,
// `sourceIdx`, `targetIdx`, `curRow`, `lastRow`, `pSubmitTbData`, `code` and
// the `END` label at the expansion site; used as a bare statement.
#define SET_DATA                                                                                 \
  {                                                                                              \
    int32_t tqDstRow_ = curRow - lastRow;                                                        \
    if (colVal.cid > pColData->info.colId) {                                                     \
      colDataSetNULL(pColData, tqDstRow_);                                                       \
      targetIdx++;                                                                               \
    } else if (colVal.cid < pColData->info.colId) {                                              \
      sourceIdx++;                                                                               \
    } else {                                                                                     \
      if (IS_STR_DATA_BLOB(pColData->info.type)) {                                               \
        TQ_ERR_GO_TO_END(doSetBlobVal(pColData, tqDstRow_, &colVal, pSubmitTbData->pBlobSet));   \
      } else {                                                                                   \
        TQ_ERR_GO_TO_END(doSetVal(pColData, tqDstRow_, &colVal));                                \
      }                                                                                          \
      sourceIdx++;                                                                               \
      targetIdx++;                                                                               \
    }                                                                                            \
  }
914

915
// Starts a new output block (and matching schema wrapper) for the rows from
// `curRow` onwards.
//
// If `blocks` already holds a block, that block is closed first: its row count
// becomes `curRow - *lastRow` and `*lastRow` is moved to `curRow`. A new block
// and schema wrapper are then built with tqMaskBlock() from the reader's
// schema and the `assigned` flags, sized for the remaining
// `numOfRows - curRow` rows, and appended to `blocks` / `schemas`.
//
// Returns 0 on success or the first error code encountered.
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
  int32_t         code = 0;
  SSchemaWrapper* pSW = NULL;
  SSDataBlock*    block = NULL;

  if (taosArrayGetSize(blocks) > 0) {
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pLastBlock);
    pLastBlock->info.rows = curRow - *lastRow;
    *lastRow = curRow;
  }

  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TQ_NULL_GO_TO_END(block);

  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  TQ_NULL_GO_TO_END(pSW);

  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
          (int32_t)taosArrayGetSize(block->pDataBlock));

  block->info.id.uid = pSubmitTbData->uid;
  block->info.version = pReader->msg.ver;
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));

  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
  // The struct itself (not a pointer to it) was pushed, so the element stored
  // in `blocks` now refers to the same column buffers. Drop only the shell
  // here; otherwise a failure below would reach blockDataFreeRes() at END and
  // release buffers the array element still points to.
  taosMemoryFreeClear(block);

  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
  pSW = NULL;  // the pointer is now held by `schemas`

END:
  if (code != 0) {
    tqError("processBuildNew failed, code:%d", code);
  }
  // Both are NULL on the success path; on failure they release whatever was
  // not handed over to the arrays.
  tDeleteSchemaWrapper(pSW);
  blockDataFreeRes(block);
  taosMemoryFree(block);
  return code;
}
955
// Splits column-format submit data into one or more output blocks.
//
// For every row, each schema column is looked up by column id among the submit
// columns. Whenever the set of columns that carry a value changes (tracked in
// `assigned`, see PROCESS_VAL), or a schema column is missing from the submit
// data, processBuildNew() starts a new block. The row is then written into the
// last block by merging submit columns and block columns by column id (see
// SET_DATA).
//
// Returns 0 on success or the first error code encountered.
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
          numOfRows);

  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    // Pass 1: decide whether this row needs a new block.
    for (int32_t j = 0; j < pSchemaWrapper->nCols; j++) {
      int32_t k = 0;
      for (; k < numOfCols; k++) {
        pCol = taosArrayGet(pCols, k);
        TQ_NULL_GO_TO_END(pCol);
        if (pSchemaWrapper->pSchema[j].colId == pCol->cid) {
          SColVal colVal = {0};
          TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
          PROCESS_VAL
          tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], numOfCols);
          break;
        }
      }
      if (k >= numOfCols) {
        // this column is not in the current row, so we set it to NULL
        assigned[j] = 0;
        buildNew = true;
      }
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // Pass 2: write the row into the current block.
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual && sourceIdx < numOfCols) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }

    curRow++;
  }

  // `blocks` stays empty when there were no rows, so the last block may be
  // absent; guard the dereference the same way tqProcessRowData() does.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFree(assigned);
  return code;
}
1031

1032
// Splits row-format submit data into one or more output blocks.
//
// Each row is scanned twice: first over every schema column to detect a change
// in which columns carry a value (PROCESS_VAL; a change starts a new block via
// processBuildNew()), then again to copy the values into the last block by
// merging on column id (SET_DATA). The last block's row count is set once all
// rows are consumed.
//
// Returns 0 on success or the first error code encountered.
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t         code = 0;
  int32_t         curRow = 0;
  int32_t         lastRow = 0;
  STSchema*       pTSchema = NULL;
  SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;

  char* assigned = taosMemoryCalloc(1, pWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray* pRows = pSubmitTbData->aRowP;
  int32_t numOfRows = taosArrayGetSize(pRows);

  pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
  TQ_NULL_GO_TO_END(pTSchema);
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);

  for (int32_t rowIdx = 0; rowIdx < numOfRows; rowIdx++, curRow++) {
    SRow* pRow = taosArrayGetP(pRows, rowIdx);
    TQ_NULL_GO_TO_END(pRow);

    // Detect whether the set of valued columns changed for this row.
    bool buildNew = false;
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
      tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], pTSchema->numOfCols);
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pCurBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pCurBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // Copy the row into the current block.
    int32_t colActual = blockDataGetNumOfCols(pCurBlock);
    int32_t sourceIdx = 0;
    int32_t targetIdx = 0;
    while (targetIdx < colActual && sourceIdx < pTSchema->numOfCols) {
      SColumnInfoData* pColData = taosArrayGet(pCurBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }
  }

  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
          (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
1099

1100
// Serialises `pCreateTbReq` and appends it to the response's create-table
// lists.
//
// When the response has no create-table entries yet (createTableNum == 0) the
// two arrays `createTableLen` and `createTableReq` are created first. The
// request is encoded into a freshly allocated buffer; its length and the
// buffer pointer are pushed to the two arrays and `createTableNum` is bumped.
// On any failure the buffer is freed here.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// error reported by the failing step.
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
  int32_t  code = 0;
  int32_t  line = 0;
  uint32_t len = 0;
  void*    pEncoded = NULL;
  SEncoder encoder = {0};

  TSDB_CHECK_NULL(pRsp, code, line, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pCreateTbReq, code, line, END, TSDB_CODE_INVALID_PARA);

  if (pRsp->createTableNum == 0) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    TSDB_CHECK_NULL(pRsp->createTableLen, code, line, END, terrno);
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    TSDB_CHECK_NULL(pRsp->createTableReq, code, line, END, terrno);
  }

  // First pass only measures the encoded size.
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
  TSDB_CHECK_CODE(code, line, END);

  pEncoded = taosMemoryCalloc(1, len);
  TSDB_CHECK_NULL(pEncoded, code, line, END, terrno);

  // Second pass writes into the buffer.
  tEncoderInit(&encoder, pEncoded, len);
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
  tEncoderClear(&encoder);
  TSDB_CHECK_CODE(code, line, END);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, line, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &pEncoded), code, line, END, terrno);
  pRsp->createTableNum++;
  tqTrace("build create table info msg success");

END:
  if (code == 0) {
    return code;
  }
  tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, line, tstrerror(code));
  taosMemoryFree(pEncoded);
  return code;
}
1137

1138
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
43,078,753✔
1139
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1140
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
43,078,753✔
1141
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
43,078,753✔
1142
  if (pSubmitTbData == NULL) {
43,085,466✔
1143
    return terrno;
×
1144
  }
1145
  pReader->nextBlk++;
43,085,466✔
1146

1147
  if (pSubmitTbDataRet) {
43,078,319✔
1148
    *pSubmitTbDataRet = pSubmitTbData;
43,085,230✔
1149
  }
1150

1151
  if (fetchMeta == ONLY_META) {
43,079,230✔
1152
    if (pSubmitTbData->pCreateTbReq != NULL) {
6,577✔
1153
      if (pRsp->createTableReq == NULL) {
1,915✔
1154
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
1,138✔
1155
        if (pRsp->createTableReq == NULL) {
1,138✔
1156
          return terrno;
×
1157
        }
1158
      }
1159
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
3,830✔
1160
        return terrno;
×
1161
      }
1162
      pSubmitTbData->pCreateTbReq = NULL;
1,915✔
1163
    }
1164
    return 0;
6,577✔
1165
  }
1166

1167
  int32_t sversion = pSubmitTbData->sver;
43,072,653✔
1168
  int64_t uid = pSubmitTbData->uid;
43,074,391✔
1169
  pReader->lastBlkUid = uid;
43,066,088✔
1170

1171
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
43,072,792✔
1172
  taosMemoryFreeClear(pReader->extSchema);
43,066,986✔
1173
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema, 0);
43,074,618✔
1174
  if (pReader->pSchemaWrapper == NULL) {
43,044,387✔
1175
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
199,199✔
1176
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1177
    pReader->cachedSchemaSuid = 0;
199,199✔
1178
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
199,199✔
1179
  }
1180

1181
  if (pSubmitTbData->pCreateTbReq != NULL) {
42,817,588✔
1182
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
8,253✔
1183
    if (code != 0) {
8,253✔
1184
      return code;
×
1185
    }
1186
  } else if (rawList != NULL) {
42,840,966✔
1187
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1188
      return terrno;
×
1189
    }
1190
    pReader->pSchemaWrapper = NULL;
×
1191
    return 0;
×
1192
  }
1193

1194
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
42,849,219✔
1195
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
861,125✔
1196
  } else {
1197
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
41,934,196✔
1198
  }
1199
}
1200

1201
// Records `pColIdList` on the reader. A NULL reader is ignored. `id` is not
// used. Always returns TSDB_CODE_SUCCESS.
int32_t tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pReader->pColIdList = pColIdList;
  return TSDB_CODE_SUCCESS;
}
1207

1208
// Replaces the reader's table-uid filter with the uids in `tbUidList`.
//
// An existing hash is cleared, otherwise a new one is created. Every non-NULL
// entry of the list is inserted; insert failures are logged and skipped.
// `id` is only used as a log prefix.
//
// Returns TSDB_CODE_SUCCESS (also when either argument is NULL), or `terrno`
// if the hash cannot be created.
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader == NULL || tbUidList == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", id);
      return terrno;
    }
  } else {
    taosHashClear(pReader->tbIdHash);
  }

  size_t numOfUids = taosArrayGetSize(tbUidList);
  for (size_t idx = 0; idx < numOfUids; idx++) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    if (pUid == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pUid, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pUid);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
  return TSDB_CODE_SUCCESS;
}
1233

1234
// Adds every uid in `pTableUidList` to the reader's table-uid filter.
//
// The hash is created on demand. NULL arguments are ignored, a failure to
// create the hash is logged and aborts the call, and a failed insert is logged
// and skipped.
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader == NULL || pTableUidList == NULL) {
    return;
  }
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    // Same guard as the sibling uid-list helpers: never hand a NULL key to the
    // hash or dereference it for logging.
    if (pKey == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
      continue;
    }
    tqDebug("%s add table uid:%" PRId64 " to hash", __func__, *pKey);
  }
}
1256

1257
// Reports whether `uid` is present in the reader's table-uid hash.
// A NULL reader yields false.
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  if (pReader == NULL) {
    return false;
  }

  void* pEntry = taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
  return pEntry != NULL;
}
1263

1264
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1265
  if (pReader == NULL) {
×
1266
    return false;
×
1267
  }
1268
  return pReader->msg.msgStr == NULL;
×
1269
}
1270

1271
// Removes every uid in `tbUidList` from the reader's table-uid hash.
// NULL arguments are ignored; a failed removal only produces a warning.
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }

  size_t numOfUids = taosArrayGetSize(tbUidList);
  for (size_t idx = 0; idx < numOfUids; idx++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, idx);
    int32_t  code = taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
    if (code == 0) {
      continue;
    }
    tqWarn("%s failed to remove table uid:%" PRId64 " from hash, msg:%s", __func__, pKey != NULL ? *pKey : 0, tstrerror(code));
  }
}
1283

1284
// Propagates a list of deleted table uids to every consumer handle of `pTq`.
//
// Under the tq write latch each handle is updated according to its
// subscription type:
//   - column subscription: the uids are passed to
//     qDeleteTableListForStreamScanner() on the handle's task;
//   - db subscription:     the uids are added to the handle's filter-out hash;
//   - table subscription:  the uids are removed from the handle's reader.
// Per-handle failures are logged and do not stop the iteration.
//
// Returns 0 (also when `pTq` is NULL), or TSDB_CODE_INVALID_PARA when
// `tbUidList` is NULL.
int32_t tqDeleteTbUidList(STQ* pTq, SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (void* pIter = taosHashIterate(pTq->pHandle, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " delete table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t code = qDeleteTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList);
      if (code != 0) {
        tqError("update qualified table error for %s", pTqHandle->subKey);
      }
      continue;
    }

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      int32_t numOfUids = taosArrayGetSize(tbUidList);
      for (int32_t idx = 0; idx < numOfUids; idx++) {
        int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
        if (pUid == NULL) {
          continue;
        }
        if (taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, pUid, sizeof(int64_t), NULL, 0) != 0) {
          tqError("failed to add table uid:%" PRId64 " to hash", *pUid);
        }
      }
      continue;
    }

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
    }
  }
  taosWUnLockLatch(&pTq->lock);
  return 0;
}
1327

1328
// Filters `tbUidList` through qFilterTableList() for a table-subscription
// handle and adds the resulting uids to the handle's reader.
//
// Returns 0 on success, or the qFilterTableList() error, in which case the
// reader is left untouched.
static int32_t addTableListForStableTmq(STqHandle* pTqHandle, STQ* pTq, SArray* tbUidList) {
  int rc = qFilterTableList(pTq->pVnode, tbUidList, pTqHandle->execHandle.execTb.node, pTqHandle->execHandle.task,
                            pTqHandle->execHandle.execTb.suid);
  if (rc != TDB_CODE_SUCCESS) {
    tqError("tqAddTbUidList error:%d handle %s consumer:0x%" PRIx64, rc, pTqHandle->subKey,
            pTqHandle->consumerId);
    return rc;
  }

  tqDebug("%s handle %s consumer:0x%" PRIx64 " add %d tables to tqReader", __func__, pTqHandle->subKey,
          pTqHandle->consumerId, (int32_t)taosArrayGetSize(tbUidList));
  tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
  return 0;
}
1341

1342
// Propagates a list of newly added table uids to every consumer handle of
// `pTq`.
//
// Under the tq write latch, column subscriptions receive the list through
// qAddTableListForStreamScanner() and table subscriptions through
// addTableListForStableTmq(). The first failure is logged, stops the
// iteration and becomes the return value; the hash iterator is cancelled
// before the latch is released.
//
// Returns 0 on success (also when `pTq` is NULL), TSDB_CODE_INVALID_PARA when
// `tbUidList` is NULL, or the first error code.
int32_t tqAddTbUidList(STQ* pTq, SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  void* pIter = taosHashIterate(pTq->pHandle, NULL);
  while (pIter != NULL) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " add table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      code = qAddTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList);
      if (code != 0) {
        tqError("add table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
        break;
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      code = addTableListForStableTmq(pTqHandle, pTq, tbUidList);
      if (code != 0) {
        tqError("add table list for stable tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
        break;
      }
    }

    pIter = taosHashIterate(pTq->pHandle, pIter);
  }
  // pIter is non-NULL only when the loop was left early.
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1382

1383
// Re-evaluates table membership for every consumer handle of `pTq` after the
// columns in `cidList` changed for the tables in `tbUidList`.
//
// A handle is only touched when checkCidInTagCondition() reports that its tag
// condition involves one of the changed column ids:
//   - column subscription: qUpdateTableListForStreamScanner() on the task;
//   - table subscription:  the uids are removed from the reader and re-added
//     through addTableListForStableTmq().
// The first failure is logged, stops the iteration and becomes the return
// value; the hash iterator is cancelled before the latch is released.
//
// Returns 0 on success (also when `pTq` is NULL), TSDB_CODE_INVALID_PARA when
// `tbUidList` is NULL, or the first error code.
int32_t tqUpdateTbUidList(STQ* pTq, SArray* tbUidList, SArray* cidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  void* pIter = taosHashIterate(pTq->pHandle, NULL);
  while (pIter != NULL) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " update table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      SNode* pTagCond = getTagCondNodeForQueryTmq(pTqHandle->execHandle.task);
      if (checkCidInTagCondition(pTagCond, cidList)) {
        code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList);
        if (code != 0) {
          tqError("update table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
          break;
        }
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      SNode* pTagCond = getTagCondNodeForStableTmq(pTqHandle->execHandle.execTb.node);
      if (checkCidInTagCondition(pTagCond, cidList)) {
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
        code = addTableListForStableTmq(pTqHandle, pTq, tbUidList);
        if (code != 0) {
          tqError("update table list for stable tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
          break;
        }
      }
    }

    pIter = taosHashIterate(pTq->pHandle, pIter);
  }

  // pIter is non-NULL only when the loop was left early.
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1432

UNCOV
1433
static void destroySourceScanTables(void* ptr) {
×
UNCOV
1434
  SArray** pTables = ptr;
×
UNCOV
1435
  if (pTables && *pTables) {
×
UNCOV
1436
    taosArrayDestroy(*pTables);
×
UNCOV
1437
    *pTables = NULL;
×
1438
  }
UNCOV
1439
}
×
1440

UNCOV
1441
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
UNCOV
1442
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
UNCOV
1443
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
UNCOV
1444
  if (pCol1->vColId == pCol2->vColId) {
×
1445
    return 0;
×
1446
  } else if (pCol1->vColId < pCol2->vColId) {
×
UNCOV
1447
    return -1;
×
1448
  } else {
UNCOV
1449
    return 1;
×
1450
  }
1451
}
1452

UNCOV
1453
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
UNCOV
1454
  if (value) {
×
UNCOV
1455
    SSchemaWrapper* pSchemaWrapper = value;
×
1456
    tDeleteSchemaWrapper(pSchemaWrapper);
1457
  }
UNCOV
1458
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc