• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.22
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
561✔
20
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
561✔
21
    return true;
557✔
22
  }
23

24
  int16_t msgType = pHead->msgType;
4✔
25
  char*   body = pHead->body;
4✔
26
  int32_t bodyLen = pHead->bodyLen;
4✔
27

28
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
4✔
29
  int64_t  realTbSuid = 0;
4✔
30
  SDecoder dcoder = {0};
4✔
31
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
4✔
32
  int32_t  len = bodyLen - sizeof(SMsgHead);
4✔
33
  tDecoderInit(&dcoder, data, len);
4✔
34

35
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
4!
36
    SVCreateStbReq req = {0};
2✔
37
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
2!
38
      goto end;
×
39
    }
40
    realTbSuid = req.suid;
2✔
41
  } else if (msgType == TDMT_VND_DROP_STB) {
2!
42
    SVDropStbReq req = {0};
×
43
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
44
      goto end;
×
45
    }
46
    realTbSuid = req.suid;
×
47
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
2!
48
    SVCreateTbBatchReq req = {0};
2✔
49
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
2!
50
      goto end;
×
51
    }
52

53
    int32_t        needRebuild = 0;
2✔
54
    SVCreateTbReq* pCreateReq = NULL;
2✔
55
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
4✔
56
      pCreateReq = req.pReqs + iReq;
2✔
57
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
2!
58
        needRebuild++;
×
59
      }
60
    }
61
    if (needRebuild == 0) {
2!
62
      // do nothing
63
    } else if (needRebuild == req.nReqs) {
×
64
      realTbSuid = tbSuid;
×
65
    } else {
66
      realTbSuid = tbSuid;
×
67
      SVCreateTbBatchReq reqNew = {0};
×
68
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
×
69
      if (reqNew.pArray == NULL) {
×
70
        tDeleteSVCreateTbBatchReq(&req);
×
71
        goto end;
×
72
      }
73
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
74
        pCreateReq = req.pReqs + iReq;
×
75
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
76
          reqNew.nReqs++;
×
77
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
×
78
            taosArrayDestroy(reqNew.pArray);
×
79
            tDeleteSVCreateTbBatchReq(&req);
×
80
            goto end;
×
81
          }
82
        }
83
      }
84

85
      int     tlen = 0;
×
86
      int32_t ret = 0;
×
87
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
×
88
      void* buf = taosMemoryMalloc(tlen);
×
89
      if (NULL == buf) {
×
90
        taosArrayDestroy(reqNew.pArray);
×
91
        tDeleteSVCreateTbBatchReq(&req);
×
92
        goto end;
×
93
      }
94
      SEncoder coderNew = {0};
×
95
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
96
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
×
97
      tEncoderClear(&coderNew);
×
98
      if (ret < 0) {
×
99
        taosMemoryFree(buf);
×
100
        taosArrayDestroy(reqNew.pArray);
×
101
        tDeleteSVCreateTbBatchReq(&req);
×
102
        goto end;
×
103
      }
104
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
105
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
106
      taosMemoryFree(buf);
×
107
      taosArrayDestroy(reqNew.pArray);
×
108
    }
109

110
    tDeleteSVCreateTbBatchReq(&req);
2✔
111
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
×
112
    SVAlterTbReq req = {0};
×
113

114
    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
×
115
      goto end;
×
116
    }
117

118
    SMetaReader mr = {0};
×
119
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);
×
120

121
    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
×
122
      metaReaderClear(&mr);
×
123
      goto end;
×
124
    }
125
    realTbSuid = mr.me.ctbEntry.suid;
×
126
    metaReaderClear(&mr);
×
127
  } else if (msgType == TDMT_VND_DROP_TABLE) {
×
128
    SVDropTbBatchReq req = {0};
×
129

130
    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
×
131
      goto end;
×
132
    }
133

134
    int32_t      needRebuild = 0;
×
135
    SVDropTbReq* pDropReq = NULL;
×
136
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
137
      pDropReq = req.pReqs + iReq;
×
138

139
      if (pDropReq->suid == tbSuid) {
×
140
        needRebuild++;
×
141
      }
142
    }
143
    if (needRebuild == 0) {
×
144
      // do nothing
145
    } else if (needRebuild == req.nReqs) {
×
146
      realTbSuid = tbSuid;
×
147
    } else {
148
      realTbSuid = tbSuid;
×
149
      SVDropTbBatchReq reqNew = {0};
×
150
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
151
      if (reqNew.pArray == NULL) {
×
152
        goto end;
×
153
      }
154
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
155
        pDropReq = req.pReqs + iReq;
×
156
        if (pDropReq->suid == tbSuid) {
×
157
          reqNew.nReqs++;
×
158
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
159
            taosArrayDestroy(reqNew.pArray);
×
160
            goto end;
×
161
          }
162
        }
163
      }
164

165
      int     tlen = 0;
×
166
      int32_t ret = 0;
×
167
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
×
168
      void* buf = taosMemoryMalloc(tlen);
×
169
      if (NULL == buf) {
×
170
        taosArrayDestroy(reqNew.pArray);
×
171
        goto end;
×
172
      }
173
      SEncoder coderNew = {0};
×
174
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
175
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
176
      tEncoderClear(&coderNew);
×
177
      if (ret != 0) {
×
178
        taosMemoryFree(buf);
×
179
        taosArrayDestroy(reqNew.pArray);
×
180
        goto end;
×
181
      }
182
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
183
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
184
      taosMemoryFree(buf);
×
185
      taosArrayDestroy(reqNew.pArray);
×
186
    }
187
  } else if (msgType == TDMT_VND_DELETE) {
×
188
    SDeleteRes req = {0};
×
189
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
190
      goto end;
×
191
    }
192
    realTbSuid = req.suid;
×
193
  }
194

195
end:
×
196
  tDecoderClear(&dcoder);
4✔
197
  return tbSuid == realTbSuid;
4✔
198
}
199

200
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
71,028✔
201
  int32_t code = -1;
71,028✔
202
  int32_t vgId = TD_VID(pTq->pVnode);
71,028✔
203
  int64_t id = pHandle->pWalReader->readerId;
71,028✔
204

205
  int64_t offset = *fetchOffset;
71,028✔
206
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
71,028✔
207
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
71,054✔
208
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
71,071✔
209

210
  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
71,075!
211
          ", 0x%" PRIx64,
212
          vgId, offset, lastVer, committedVer, appliedVer, id);
213

214
  while (offset <= appliedVer) {
74,962✔
215
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
67,782!
216
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
×
217
              ", no more log to return,QID:0x%" PRIx64 " 0x%" PRIx64,
218
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
219
      goto END;
×
220
    }
221

222
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s,QID:0x%" PRIx64 " 0x%" PRIx64,
67,777!
223
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
224

225
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
67,780✔
226
      code = walFetchBody(pHandle->pWalReader);
63,378✔
227
      goto END;
63,377✔
228
    } else {
229
      if (pHandle->fetchMeta != WITH_DATA) {
4,402✔
230
        SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
705✔
231
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
705✔
232
          code = walFetchBody(pHandle->pWalReader);
561✔
233
          if (code < 0) {
561!
234
            goto END;
×
235
          }
236

237
          pHead = &(pHandle->pWalReader->pHead->head);
561✔
238
          if (isValValidForTable(pHandle, pHead)) {
561✔
239
            code = 0;
558✔
240
            goto END;
558✔
241
          } else {
242
            offset++;
3✔
243
            code = -1;
3✔
244
            continue;
3✔
245
          }
246
        }
247
      }
248
      code = walSkipFetchBody(pHandle->pWalReader);
3,841✔
249
      if (code < 0) {
3,841!
250
        goto END;
×
251
      }
252
      offset++;
3,841✔
253
    }
254
    code = -1;
3,841✔
255
  }
256

257
END:
7,180✔
258
  *fetchOffset = offset;
71,115✔
259
  return code;
71,115✔
260
}
261

262
bool tqGetTablePrimaryKey(STqReader* pReader) { return pReader->hasPrimaryKey; }
1,641✔
263

264
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
63✔
265
  bool            ret = false;
63✔
266
  SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
63✔
267
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
62!
268
    ret = true;
3✔
269
  }
270
  tDeleteSchemaWrapper(schema);
271
  pReader->hasPrimaryKey = ret;
63✔
272
}
63✔
273

274
STqReader* tqReaderOpen(SVnode* pVnode) {
2,418✔
275
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
2,418✔
276
  if (pReader == NULL) {
2,431!
277
    return NULL;
×
278
  }
279

280
  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
2,431✔
281
  if (pReader->pWalReader == NULL) {
2,431!
282
    taosMemoryFree(pReader);
×
283
    return NULL;
×
284
  }
285

286
  pReader->pVnodeMeta = pVnode->pMeta;
2,431✔
287
  pReader->pColIdList = NULL;
2,431✔
288
  pReader->cachedSchemaVer = 0;
2,431✔
289
  pReader->cachedSchemaSuid = 0;
2,431✔
290
  pReader->pSchemaWrapper = NULL;
2,431✔
291
  pReader->tbIdHash = NULL;
2,431✔
292
  pReader->pResBlock = NULL;
2,431✔
293

294
  int32_t code = createDataBlock(&pReader->pResBlock);
2,431✔
295
  if (code) {
2,430!
296
    terrno = code;
×
297
  }
298

299
  return pReader;
2,430✔
300
}
301

302
void tqReaderClose(STqReader* pReader) {
2,437✔
303
  if (pReader == NULL) return;
2,437✔
304

305
  // close wal reader
306
  if (pReader->pWalReader) {
2,417!
307
    walCloseReader(pReader->pWalReader);
2,417✔
308
  }
309

310
  if (pReader->pSchemaWrapper) {
2,417✔
311
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
899!
312
  }
313

314
  if (pReader->pColIdList) {
2,417✔
315
    taosArrayDestroy(pReader->pColIdList);
2,261✔
316
  }
317

318
  // free hash
319
  blockDataDestroy(pReader->pResBlock);
2,417✔
320
  taosHashCleanup(pReader->tbIdHash);
2,417✔
321
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
2,417✔
322
  taosMemoryFree(pReader);
2,417✔
323
}
324

325
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
3,755✔
326
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
3,755✔
327
    return -1;
44✔
328
  }
329
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
3,711!
330
  return 0;
3,711✔
331
}
332

333
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
43,030✔
334
  int32_t code = 0;
43,030✔
335

336
  while (1) {
1,181✔
337
    TAOS_CHECK_RETURN(walNextValidMsg(pReader));
44,211✔
338

339
    SWalCont* pCont = &pReader->pHead->head;
23,050✔
340
    int64_t   ver = pCont->version;
23,050✔
341
    if (ver > maxVer) {
23,050✔
342
      tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
58!
343
      return TSDB_CODE_SUCCESS;
58✔
344
    }
345

346
    if (pCont->msgType == TDMT_VND_SUBMIT) {
22,992✔
347
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
21,136✔
348
      int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
21,136✔
349

350
      void* data = taosMemoryMalloc(len);
21,136✔
351
      if (data == NULL) {
21,137!
352
        // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
353
        // retry
354
        tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
×
355
        return terrno;
×
356
      }
357

358
      (void)memcpy(data, pBody, len);
21,137✔
359
      SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
21,137✔
360

361
      code = streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
21,137✔
362
      if (code != 0) {
21,136!
363
        tqError("%s failed to create data submit for stream since out of memory", id);
×
364
        return code;
×
365
      }
366
    } else if (pCont->msgType == TDMT_VND_DELETE) {
1,856!
367
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
1,857✔
368
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
1,857✔
369
      EStreamType blockType = STREAM_DELETE_DATA;
1,857✔
370
      code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType);
1,857✔
371
      if (code == TSDB_CODE_SUCCESS) {
1,857!
372
        if (*pItem == NULL) {
1,857✔
373
          tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
1,181!
374
          // we need to continue check next data in the wal files.
375
          continue;
1,181✔
376
        } else {
377
          tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
676!
378
        }
379
      } else {
380
        terrno = code;
×
381
        tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
×
382
        return code;
×
383
      }
384

UNCOV
385
    } else if (pCont->msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
×
UNCOV
386
      void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
×
UNCOV
387
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
×
UNCOV
388
      code = tqExtractDropCtbDataBlock(pBody, len, ver, (void**)pItem, 0);
×
UNCOV
389
      if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
390
        if (!*pItem) {
×
391
          continue;
×
392
        } else {
UNCOV
393
          tqDebug("s-task:%s drop ctb msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
×
394
        }
395
      } else {
396
        terrno = code;
×
397
        return code;
×
398
      }
399
    } else {
400
      tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
×
401
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
402
    }
403

404
    return code;
21,812✔
405
  }
406
}
407

408
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
343,321✔
409
  SWalReader* pWalReader = pReader->pWalReader;
343,321✔
410

411
  int64_t st = taosGetTimestampMs();
343,334✔
412
  while (1) {
507,805✔
413
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
851,139✔
414
    while (pReader->nextBlk < numOfBlocks) {
1,062,297✔
415
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
507,848✔
416
              pReader->msg.ver);
417

418
      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
507,848✔
419
      if (pSubmitTbData == NULL) {
507,837!
UNCOV
420
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
×
421
                pReader->msg.ver);
422
        return false;
×
423
      }
424
      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
507,839✔
425
        pReader->nextBlk += 1;
110✔
426
        continue;
110✔
427
      }
428
      if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
507,728!
429
        tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
296,694✔
430
        SSDataBlock* pRes = NULL;
296,694✔
431
        int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
296,694✔
432
        if (code == TSDB_CODE_SUCCESS) {
296,685!
433
          return true;
296,686✔
434
        }
435
      } else {
436
        pReader->nextBlk += 1;
211,070✔
437
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
211,070!
438
      }
439
    }
440

441
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
554,449✔
442
    pReader->msg.msgStr = NULL;
554,462✔
443

444
    int64_t elapsed = taosGetTimestampMs() - st;
554,460✔
445
    if (elapsed > 1000 || elapsed < 0) {
554,460!
446
      return false;
×
447
    }
448

449
    // try next message in wal file
450
    if (walNextValidMsg(pWalReader) < 0) {
554,460✔
451
      return false;
46,635✔
452
    }
453

454
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
507,801✔
455
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
507,801✔
456
    int64_t ver = pWalReader->pHead->head.version;
507,801✔
457
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver) != 0) {
507,801!
458
      return false;
×
459
    }
460
    pReader->nextBlk = 0;
507,805✔
461
  }
462
}
463

464
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
592,883✔
465
  pReader->msg.msgStr = msgStr;
592,883✔
466
  pReader->msg.msgLen = msgLen;
592,883✔
467
  pReader->msg.ver = ver;
592,883✔
468

469
  tqDebug("tq reader set msg %p %d", msgStr, msgLen);
592,883✔
470
  SDecoder decoder = {0};
592,945✔
471

472
  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
592,945✔
473
  int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit);
592,935✔
474
  if (code != 0) {
592,834!
475
    tDecoderClear(&decoder);
×
476
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
×
477
    return code;
×
478
  }
479

480
  tDecoderClear(&decoder);
592,834✔
481
  return 0;
592,911✔
482
}
483

484
SWalReader* tqGetWalReader(STqReader* pReader) { return pReader->pWalReader; }
395,605✔
485

486
SSDataBlock* tqGetResultBlock(STqReader* pReader) { return pReader->pResBlock; }
343,299✔
487

488
int64_t tqGetResultBlockTime(STqReader* pReader) { return pReader->lastTs; }
343,277✔
489

490
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
87,396✔
491
  if (pReader->msg.msgStr == NULL) {
87,396!
492
    return false;
×
493
  }
494

495
  int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
87,396✔
496
  while (pReader->nextBlk < numOfBlocks) {
100,942✔
497
    tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
50,623✔
498
            (pReader->nextBlk + 1), numOfBlocks, idstr);
499

500
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
50,625✔
501
    if (pSubmitTbData == NULL) {
50,625!
502
      return false;
×
503
    }
504
    if (pReader->tbIdHash == NULL) {
50,625!
505
      return true;
×
506
    }
507

508
    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
50,625✔
509
    if (ret != NULL) {
50,625✔
510
      tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
37,082✔
511
      return true;
37,081✔
512
    } else {
513
      tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
13,543✔
514
              taosHashGetSize(pReader->tbIdHash), idstr);
515
    }
516

517
    pReader->nextBlk++;
13,543✔
518
  }
519

520
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
50,319✔
521
  pReader->nextBlk = 0;
50,320✔
522
  pReader->msg.msgStr = NULL;
50,320✔
523

524
  return false;
50,320✔
525
}
526

527
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
69,212✔
528
  if (pReader->msg.msgStr == NULL) return false;
69,212!
529

530
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
69,212✔
531
  while (pReader->nextBlk < blockSz) {
69,629✔
532
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
34,858✔
533
    if (pSubmitTbData == NULL) return false;
34,858!
534
    if (filterOutUids == NULL) return true;
34,858!
535

536
    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
34,858✔
537
    if (ret == NULL) {
34,858✔
538
      return true;
34,448✔
539
    }
540
    pReader->nextBlk++;
410✔
541
  }
542

543
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
34,771✔
544
  pReader->nextBlk = 0;
34,781✔
545
  pReader->msg.msgStr = NULL;
34,781✔
546

547
  return false;
34,781✔
548
}
549

550
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
62,981✔
551
  int32_t code = 0;
62,981✔
552

553
  int32_t cnt = 0;
62,981✔
554
  for (int32_t i = 0; i < pSrc->nCols; i++) {
378,335✔
555
    cnt += mask[i];
315,354✔
556
  }
557

558
  pDst->nCols = cnt;
62,981✔
559
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
62,981✔
560
  if (pDst->pSchema == NULL) {
62,973!
UNCOV
561
    return TAOS_GET_TERRNO(terrno);
×
562
  }
563

564
  int32_t j = 0;
62,974✔
565
  for (int32_t i = 0; i < pSrc->nCols; i++) {
378,137✔
566
    if (mask[i]) {
315,056!
567
      pDst->pSchema[j++] = pSrc->pSchema[i];
315,065✔
568
      SColumnInfoData colInfo =
569
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
315,065✔
570
      code = blockDataAppendColInfo(pBlock, &colInfo);
315,341✔
571
      if (code != 0) {
315,172!
572
        return code;
×
573
      }
574
    }
575
  }
576
  return 0;
63,081✔
577
}
578

579
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
816✔
580
  SSDataBlock* pBlock = pReader->pResBlock;
816✔
581
  if (blockDataGetNumOfCols(pBlock) > 0) {
816✔
582
      blockDataDestroy(pBlock);
1✔
583
      int32_t code = createDataBlock(&pReader->pResBlock);
1✔
584
      if (code) {
1!
585
        return code;
×
586
      }
587
      pBlock = pReader->pResBlock;
1✔
588

589
      pBlock->info.id.uid = pReader->cachedSchemaUid;
1✔
590
      pBlock->info.version = pReader->msg.ver;
1✔
591
  }
592

593
  int32_t numOfCols = taosArrayGetSize(pColIdList);
817✔
594

595
  if (numOfCols == 0) {  // all columns are required
817!
596
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
×
597
      SSchema*        pColSchema = &pSchema->pSchema[i];
×
598
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
×
599

600
      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
×
601
      if (code != TSDB_CODE_SUCCESS) {
×
602
        blockDataFreeRes(pBlock);
×
603
        return terrno;
×
604
      }
605
    }
606
  } else {
607
    if (numOfCols > pSchema->nCols) {
817✔
608
      numOfCols = pSchema->nCols;
1✔
609
    }
610

611
    int32_t i = 0;
817✔
612
    int32_t j = 0;
817✔
613
    while (i < pSchema->nCols && j < numOfCols) {
4,997✔
614
      SSchema* pColSchema = &pSchema->pSchema[i];
4,178✔
615
      col_id_t colIdSchema = pColSchema->colId;
4,178✔
616

617
      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
4,178✔
618
      if (pColIdNeed == NULL) {
4,178!
619
        break;
×
620
      }
621
      if (colIdSchema < *pColIdNeed) {
4,178✔
622
        i++;
448✔
623
      } else if (colIdSchema > *pColIdNeed) {
3,730!
624
        j++;
×
625
      } else {
626
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
3,730✔
627
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
3,733✔
628
        if (code != TSDB_CODE_SUCCESS) {
3,732!
629
          return -1;
×
630
        }
631
        i++;
3,732✔
632
        j++;
3,732✔
633
      }
634
    }
635
  }
636

637
  return TSDB_CODE_SUCCESS;
819✔
638
}
639

640
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
54,437,234✔
641
  int32_t code = TSDB_CODE_SUCCESS;
54,437,234✔
642

643
  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
61,054,954!
644
    char val[65535 + 2] = {0};
6,408,674✔
645
    if (COL_VAL_IS_VALUE(pColVal)) {
6,408,674!
646
      if (pColVal->value.pData != NULL) {
6,627,290!
647
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
6,628,485✔
648
      }
649
      varDataSetLen(val, pColVal->value.nData);
6,627,290✔
650
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
6,627,290✔
651
    } else {
652
      colDataSetNULL(pColumnInfoData, rowIndex);
×
653
    }
654
  } else {
655
    code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
48,028,560✔
656
  }
657

658
  return code;
54,316,288✔
659
}
660

661
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
305,175✔
662
  tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
305,175✔
663
  int32_t        code = 0;
305,175✔
664
  int32_t        line = 0;
305,175✔
665
  STSchema*      pTSchema = NULL;
305,175✔
666
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
305,175✔
667
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
305,176!
668
  SSDataBlock* pBlock = pReader->pResBlock;
305,176✔
669
  *pRes = pBlock;
305,176✔
670

671
  blockDataCleanup(pBlock);
305,176✔
672

673
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
305,096✔
674
  int32_t sversion = pSubmitTbData->sver;
305,096✔
675
  int64_t suid = pSubmitTbData->suid;
305,096✔
676
  int64_t uid = pSubmitTbData->uid;
305,096✔
677
  pReader->lastTs = pSubmitTbData->ctimeMs;
305,096✔
678

679
  pBlock->info.id.uid = uid;
305,096✔
680
  pBlock->info.version = pReader->msg.ver;
305,096✔
681

682
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
305,096✔
683
      (pReader->cachedSchemaVer != sversion)) {
304,282!
684
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
811✔
685

686
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
817✔
687
    if (pReader->pSchemaWrapper == NULL) {
817✔
688
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
1!
689
             "version %d, possibly dropped table",
690
             vgId, suid, uid, pReader->cachedSchemaVer);
691
      pReader->cachedSchemaSuid = 0;
×
692
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
×
693
    }
694

695
    pReader->cachedSchemaUid = uid;
816✔
696
    pReader->cachedSchemaSuid = suid;
816✔
697
    pReader->cachedSchemaVer = sversion;
816✔
698

699
    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
816!
700
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
×
701
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
702
      return TSDB_CODE_TQ_INTERNAL_ERROR;
×
703
    }
704
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
816✔
705
    TSDB_CHECK_CODE(code, line, END);
816!
706
    pBlock = pReader->pResBlock;
816✔
707
    *pRes = pBlock;
816✔
708
  }
709

710
  int32_t numOfRows = 0;
305,101✔
711
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
305,101✔
712
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
4✔
713
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
4!
714
    numOfRows = pCol->nVal;
4✔
715
  } else {
716
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
305,097✔
717
  }
718

719
  code = blockDataEnsureCapacity(pBlock, numOfRows);
305,097✔
720
  TSDB_CHECK_CODE(code, line, END);
305,097!
721
  pBlock->info.rows = numOfRows;
305,097✔
722
  int32_t colActual = blockDataGetNumOfCols(pBlock);
305,097✔
723

724
  // convert and scan one block
725
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
305,111✔
726
    SArray* pCols = pSubmitTbData->aCol;
4✔
727
    int32_t numOfCols = taosArrayGetSize(pCols);
4✔
728
    int32_t targetIdx = 0;
4✔
729
    int32_t sourceIdx = 0;
4✔
730
    while (targetIdx < colActual) {
18✔
731
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
14✔
732
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
14!
733
      if (sourceIdx >= numOfCols) {
14✔
734
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
4!
735
        colDataSetNNULL(pColData, 0, numOfRows);
4!
736
        targetIdx++;
4✔
737
        continue;
4✔
738
      }
739

740
      SColData* pCol = taosArrayGet(pCols, sourceIdx);
10✔
741
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
10!
742
      SColVal colVal = {0};
10✔
743
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
10!
744
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
745
      if (pCol->cid < pColData->info.colId) {
10✔
746
        sourceIdx++;
4✔
747
      } else if (pCol->cid == pColData->info.colId) {
6✔
748
        for (int32_t i = 0; i < pCol->nVal; i++) {
12✔
749
          tColDataGetValue(pCol, i, &colVal);
8✔
750
          code = doSetVal(pColData, i, &colVal);
8✔
751
          TSDB_CHECK_CODE(code, line, END);
8!
752
        }
753
        sourceIdx++;
4✔
754
        targetIdx++;
4✔
755
      } else {
756
        colDataSetNNULL(pColData, 0, numOfRows);
2!
757
        targetIdx++;
2✔
758
      }
759
    }
760
  } else {
761
    SArray*         pRows = pSubmitTbData->aRowP;
305,107✔
762
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
305,107✔
763
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
305,107✔
764
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);
305,178✔
765

766
    for (int32_t i = 0; i < numOfRows; i++) {
16,449,644✔
767
      SRow* pRow = taosArrayGetP(pRows, i);
16,152,444✔
768
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
16,145,260!
769
      int32_t sourceIdx = 0;
16,148,030✔
770
      for (int32_t j = 0; j < colActual; j++) {
59,013,374✔
771
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
42,868,900✔
772
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);
42,813,633!
773

774
        while (1) {
10,475,780✔
775
          SColVal colVal = {0};
53,289,413✔
776
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
53,289,413✔
777
          TSDB_CHECK_CODE(code, line, END);
53,261,248!
778

779
          if (colVal.cid < pColData->info.colId) {
53,261,248✔
780
            sourceIdx++;
10,475,780✔
781
            continue;
10,475,780✔
782
          } else if (colVal.cid == pColData->info.colId) {
42,785,468!
783
            code = doSetVal(pColData, i, &colVal);
42,885,172✔
784
            TSDB_CHECK_CODE(code, line, END);
42,965,048!
785
            sourceIdx++;
42,965,048✔
786
            break;
42,865,344✔
787
          } else {
788
            colDataSetNULL(pColData, i);
×
789
            break;
×
790
          }
791
        }
792
      }
793
    }
794
  }
795

796
END:
297,200✔
797
  if (code != 0) {
297,204!
798
    tqError("tqRetrieveDataBlock failed, line:%d, code:%d", line, code);
×
799
  }
800
  taosMemoryFreeClear(pTSchema);
305,137✔
801
  return code;
305,184✔
802
}
803

804
#define PROCESS_VAL                                      \
805
  if (curRow == 0) {                                     \
806
    assigned[j] = !COL_VAL_IS_NONE(&colVal);             \
807
    buildNew = true;                                     \
808
  } else {                                               \
809
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); \
810
    if (currentRowAssigned != assigned[j]) {             \
811
      assigned[j] = currentRowAssigned;                  \
812
      buildNew = true;                                   \
813
    }                                                    \
814
  }
815

816
#define SET_DATA                                                     \
817
  if (colVal.cid < pColData->info.colId) {                           \
818
    sourceIdx++;                                                     \
819
  } else if (colVal.cid == pColData->info.colId) {                   \
820
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
821
    sourceIdx++;                                                     \
822
    targetIdx++;                                                     \
823
  }
824

825
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
62,963✔
826
                               SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow,
827
                               int32_t* lastRow) {
828
  int32_t         code = 0;
62,963✔
829
  SSchemaWrapper* pSW = NULL;
62,963✔
830
  SSDataBlock*    block = NULL;
62,963✔
831
  if (taosArrayGetSize(blocks) > 0) {
62,963!
832
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
833
    TQ_NULL_GO_TO_END(pLastBlock);
×
834
    pLastBlock->info.rows = curRow - *lastRow;
×
835
    *lastRow = curRow;
×
836
  }
837

838
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
62,968✔
839
  TQ_NULL_GO_TO_END(block);
63,000!
840

841
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
63,000✔
842
  TQ_NULL_GO_TO_END(pSW);
62,994!
843

844
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned));
62,994!
845
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
62,967✔
846
          (int32_t)taosArrayGetSize(block->pDataBlock));
847

848
  block->info.id.uid = pSubmitTbData->uid;
62,967✔
849
  block->info.version = pReader->msg.ver;
62,967✔
850
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
62,967!
851
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
62,993!
852
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
62,979!
853
  pSW = NULL;
62,979✔
854
  taosMemoryFreeClear(block);
62,979✔
855

856
END:
3✔
857
  tDeleteSchemaWrapper(pSW);
63,016!
858
  blockDataFreeRes(block);
63,013✔
859
  taosMemoryFree(block);
62,999✔
860
  return code;
62,996✔
861
}
862
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
113✔
863
  int32_t code = 0;
113✔
864
  int32_t curRow = 0;
113✔
865
  int32_t lastRow = 0;
113✔
866

867
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
113✔
868
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
113✔
869
  TQ_NULL_GO_TO_END(assigned);
113!
870

871
  SArray*   pCols = pSubmitTbData->aCol;
113✔
872
  SColData* pCol = taosArrayGet(pCols, 0);
113✔
873
  TQ_NULL_GO_TO_END(pCol);
113!
874
  int32_t numOfRows = pCol->nVal;
113✔
875
  int32_t numOfCols = taosArrayGetSize(pCols);
113✔
876
  for (int32_t i = 0; i < numOfRows; i++) {
298✔
877
    bool buildNew = false;
185✔
878

879
    for (int32_t j = 0; j < numOfCols; j++) {
946✔
880
      pCol = taosArrayGet(pCols, j);
761✔
881
      TQ_NULL_GO_TO_END(pCol);
761!
882
      SColVal colVal = {0};
761✔
883
      tColDataGetValue(pCol, i, &colVal);
761✔
884
      PROCESS_VAL
761!
885
    }
886

887
    if (buildNew) {
185✔
888
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
113!
889
                                       curRow, &lastRow));
890
    }
891

892
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
185✔
893
    TQ_NULL_GO_TO_END(pBlock);
185!
894

895
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
185!
896
            (int32_t)taosArrayGetSize(blocks));
897

898
    int32_t targetIdx = 0;
185✔
899
    int32_t sourceIdx = 0;
185✔
900
    int32_t colActual = blockDataGetNumOfCols(pBlock);
185✔
901
    while (targetIdx < colActual) {
945✔
902
      pCol = taosArrayGet(pCols, sourceIdx);
760✔
903
      TQ_NULL_GO_TO_END(pCol);
760!
904
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
760✔
905
      TQ_NULL_GO_TO_END(pColData);
760!
906
      SColVal colVal = {0};
760✔
907
      tColDataGetValue(pCol, i, &colVal);
760✔
908
      SET_DATA
760!
909
    }
910

911
    curRow++;
185✔
912
  }
913
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
113✔
914
  pLastBlock->info.rows = curRow - lastRow;
113✔
915

916
END:
113✔
917
  taosMemoryFree(assigned);
113✔
918
  return code;
113✔
919
}
920

921
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
62,887✔
922
  int32_t   code = 0;
62,887✔
923
  STSchema* pTSchema = NULL;
62,887✔
924

925
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
62,887✔
926
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
62,887✔
927
  TQ_NULL_GO_TO_END(assigned);
62,889!
928

929
  int32_t curRow = 0;
62,889✔
930
  int32_t lastRow = 0;
62,889✔
931
  SArray* pRows = pSubmitTbData->aRowP;
62,889✔
932
  int32_t numOfRows = taosArrayGetSize(pRows);
62,889✔
933
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
62,881✔
934

935
  for (int32_t i = 0; i < numOfRows; i++) {
2,524,458✔
936
    bool  buildNew = false;
2,461,601✔
937
    SRow* pRow = taosArrayGetP(pRows, i);
2,461,601✔
938
    TQ_NULL_GO_TO_END(pRow);
2,461,286!
939

940
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
13,861,196✔
941
      SColVal colVal = {0};
11,401,040✔
942
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
11,401,040!
943
      PROCESS_VAL
11,399,614!
944
    }
945

946
    if (buildNew) {
2,460,156✔
947
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
62,862!
948
                                       curRow, &lastRow));
949
    }
950

951
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
2,460,173✔
952
    TQ_NULL_GO_TO_END(pBlock);
2,457,254!
953

954
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
2,457,254✔
955
            (int32_t)taosArrayGetSize(blocks));
956

957
    int32_t targetIdx = 0;
2,457,254✔
958
    int32_t sourceIdx = 0;
2,457,254✔
959
    int32_t colActual = blockDataGetNumOfCols(pBlock);
2,457,254✔
960
    while (targetIdx < colActual) {
13,799,684✔
961
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
11,338,865✔
962
      SColVal          colVal = {0};
11,325,242✔
963
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
11,325,242!
964
      SET_DATA
11,336,386!
965
    }
966

967
    curRow++;
2,460,819✔
968
  }
969
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
62,857✔
970
  pLastBlock->info.rows = curRow - lastRow;
62,850✔
971

972
END:
62,850✔
973
  taosMemoryFreeClear(pTSchema);
62,850✔
974
  taosMemoryFree(assigned);
62,886✔
975
  return code;
62,874✔
976
}
977

978
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) {
63,036✔
979
  tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
63,036✔
980
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
63,036✔
981
  if (pSubmitTbData == NULL) {
63,034!
982
    return terrno;
×
983
  }
984
  pReader->nextBlk++;
63,034✔
985

986
  if (pSubmitTbDataRet) {
63,034!
987
    *pSubmitTbDataRet = pSubmitTbData;
63,034✔
988
  }
989

990
  int32_t sversion = pSubmitTbData->sver;
63,034✔
991
  int64_t uid = pSubmitTbData->uid;
63,034✔
992
  pReader->lastBlkUid = uid;
63,034✔
993

994
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
63,034✔
995
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime);
63,031✔
996
  if (pReader->pSchemaWrapper == NULL) {
63,033✔
997
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
26✔
998
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
999
    pReader->cachedSchemaSuid = 0;
22✔
1000
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
22✔
1001
  }
1002

1003
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
63,007✔
1004
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
113✔
1005
  } else {
1006
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
62,894✔
1007
  }
1008
}
1009

1010
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
2,272✔
1011

1012
void tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
2,315✔
1013
  if (pReader->tbIdHash) {
2,315✔
1014
    taosHashClear(pReader->tbIdHash);
10✔
1015
  } else {
1016
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
2,305✔
1017
    if (pReader->tbIdHash == NULL) {
2,306!
1018
      tqError("s-task:%s failed to init hash table", id);
×
1019
      return;
×
1020
    }
1021
  }
1022

1023
  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
13,428✔
1024
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
11,105✔
1025
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
11,102!
1026
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
×
1027
      continue;
×
1028
    }
1029
  }
1030

1031
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
2,317✔
1032
}
1033

1034
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
3,495✔
1035
  if (pReader->tbIdHash == NULL) {
3,495!
1036
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
×
1037
    if (pReader->tbIdHash == NULL) {
×
1038
      tqError("failed to init hash table");
×
1039
      return;
×
1040
    }
1041
  }
1042

1043
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
3,495✔
1044
  for (int i = 0; i < numOfTables; i++) {
3,886✔
1045
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
391✔
1046
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
391!
1047
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
×
1048
      continue;
×
1049
    }
1050
  }
1051
}
1052

1053
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
694✔
1054
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
694✔
1055
}
1056

1057
bool tqCurrentBlockConsumed(const STqReader* pReader) { return pReader->msg.msgStr == NULL; }
37,472✔
1058

1059
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
20✔
1060
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
28✔
1061
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
8✔
1062
    if (pKey && taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t)) != 0) {
8!
1063
      tqError("failed to remove table uid:%" PRId64 " from hash", *pKey);
2!
1064
    }
1065
  }
1066
}
20✔
1067

1068
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
22,781✔
1069
  void*   pIter = NULL;
22,781✔
1070
  int32_t vgId = TD_VID(pTq->pVnode);
22,781✔
1071

1072
  // update the table list for each consumer handle
1073
  taosWLockLatch(&pTq->lock);
22,781✔
1074
  while (1) {
825✔
1075
    pIter = taosHashIterate(pTq->pHandle, pIter);
23,610✔
1076
    if (pIter == NULL) {
23,609✔
1077
      break;
22,783✔
1078
    }
1079

1080
    STqHandle* pTqHandle = (STqHandle*)pIter;
826✔
1081
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
826✔
1082
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
720✔
1083
      if (code != 0) {
720!
1084
        tqError("update qualified table error for %s", pTqHandle->subKey);
×
1085
        continue;
×
1086
      }
1087
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
106✔
1088
      if (!isAdd) {
95✔
1089
        int32_t sz = taosArrayGetSize(tbUidList);
2✔
1090
        for (int32_t i = 0; i < sz; i++) {
102✔
1091
          int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
100✔
1092
          if (tbUid &&
200!
1093
              taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
100✔
1094
            tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
×
1095
            continue;
×
1096
          }
1097
        }
1098
      }
1099
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
11✔
1100
      if (isAdd) {
10!
1101
        SArray* list = NULL;
10✔
1102
        int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
10✔
1103
                                    &list, pTqHandle->execHandle.task);
1104
        if (ret != TDB_CODE_SUCCESS) {
10!
1105
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
×
1106
                  pTqHandle->consumerId);
1107
          taosArrayDestroy(list);
×
1108
          taosHashCancelIterate(pTq->pHandle, pIter);
×
1109
          taosWUnLockLatch(&pTq->lock);
×
1110

1111
          return ret;
×
1112
        }
1113
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
10✔
1114
        taosArrayDestroy(list);
10✔
1115
      } else {
1116
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
×
1117
      }
1118
    }
1119
  }
1120
  taosWUnLockLatch(&pTq->lock);
22,783✔
1121

1122
  // update the table list handle for each stream scanner/wal reader
1123
  streamMetaWLock(pTq->pStreamMeta);
22,785✔
1124
  while (1) {
5,749✔
1125
    pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
28,534✔
1126
    if (pIter == NULL) {
28,534✔
1127
      break;
22,784✔
1128
    }
1129

1130
    int64_t      refId = *(int64_t*)pIter;
5,750✔
1131
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
5,750✔
1132
    if (pTask != NULL) {
5,749!
1133
      int32_t taskId = pTask->id.taskId;
5,749✔
1134

1135
      if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
5,749✔
1136
        int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
2,782✔
1137
        if (code != 0) {
2,782✔
1138
          tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
3!
1139
        }
1140
      }
1141
      int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
5,749✔
1142
      if (ret) {
5,749!
1143
        tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
×
1144
      }
1145
    }
1146
  }
1147

1148
  streamMetaWUnLock(pTq->pStreamMeta);
22,784✔
1149
  return 0;
22,785✔
1150
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc