• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3584

17 Jan 2025 07:28AM UTC coverage: 63.756% (-0.1%) from 63.876%
#3584

push

travis-ci

web-flow
Merge pull request #29594 from taosdata/fix/insert-when-2-replicas

fix/insert-when-2-replicas

141233 of 284535 branches covered (49.64%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

684 existing lines in 111 files now uncovered.

219774 of 281695 relevant lines covered (78.02%)

18696822.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.41
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
/*
 * Decide whether a WAL meta message is relevant for the subscription held by pHandle.
 *
 * Handles whose subType is not TOPIC_SUB_TYPE__TABLE accept every message. For table
 * subscriptions the payload that follows the SMsgHead is decoded and the super table id it
 * refers to (realTbSuid) is compared with the subscribed one (execHandle.execTb.suid).
 *
 * Batched create-table / drop-table requests that only partly concern the subscribed super
 * table are re-encoded in place: the body of pHead is overwritten with a batch holding only the
 * matching requests and pHead->bodyLen is updated.
 *
 * Returns false for NULL arguments; otherwise (tbSuid == realTbSuid), where realTbSuid stays 0
 * whenever decoding fails or the message type is not handled here.
 */
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle == NULL || pHead == NULL) {
    return false;
  }
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  int16_t msgType = pHead->msgType;
  char*   body = pHead->body;
  int32_t bodyLen = pHead->bodyLen;

  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
  int64_t  realTbSuid = 0;
  SDecoder dcoder = {0};
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
  int32_t  len = bodyLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq req = {0};
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq req = {0};
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    SVCreateTbBatchReq req = {0};
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
      goto end;
    }

    // count the child-table requests that belong to the subscribed super table
    int32_t        needRebuild = 0;
    SVCreateTbReq* pCreateReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
        needRebuild++;
      }
    }
    if (needRebuild == 0) {
      // nothing matches: realTbSuid stays 0
    } else if (needRebuild == req.nReqs) {
      // every request matches: the message can be used unchanged
      realTbSuid = tbSuid;
    } else {
      // partial match: rebuild the batch with the matching requests only
      realTbSuid = tbSuid;
      SVCreateTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
      if (reqNew.pArray == NULL) {
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pCreateReq = req.pReqs + iReq;
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
          reqNew.nReqs++;
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
            taosArrayDestroy(reqNew.pArray);
            tDeleteSVCreateTbBatchReq(&req);
            goto end;
          }
        }
      }

      int     tlen = 0;
      int32_t ret = 0;
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
      if (ret < 0) {  // sizing pass failed, tlen is not trustworthy
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      SEncoder coderNew = {0};
      // buf holds exactly tlen bytes and carries no SMsgHead, so the encoder gets the full
      // capacity (it used to be given tlen - sizeof(SMsgHead), less than the encoded size)
      tEncoderInit(&coderNew, buf, tlen);
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (ret < 0) {
        taosMemoryFree(buf);
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      // overwrite the payload behind the message head with the filtered batch
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }

    tDeleteSVCreateTbBatchReq(&req);
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
    SVAlterTbReq req = {0};

    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
      goto end;
    }

    // the alter request only carries the table name; look the table up to get its suid
    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);

    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
      metaReaderClear(&mr);
      goto end;
    }
    realTbSuid = mr.me.ctbEntry.suid;
    metaReaderClear(&mr);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    SVDropTbBatchReq req = {0};

    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
      goto end;
    }

    // count the drop requests that belong to the subscribed super table
    int32_t      needRebuild = 0;
    SVDropTbReq* pDropReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;

      if (pDropReq->suid == tbSuid) {
        needRebuild++;
      }
    }
    if (needRebuild == 0) {
      // nothing matches: realTbSuid stays 0
    } else if (needRebuild == req.nReqs) {
      realTbSuid = tbSuid;
    } else {
      // partial match: rebuild the batch with the matching requests only
      realTbSuid = tbSuid;
      SVDropTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
      if (reqNew.pArray == NULL) {
        goto end;
      }
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pDropReq = req.pReqs + iReq;
        if (pDropReq->suid == tbSuid) {
          reqNew.nReqs++;
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
            taosArrayDestroy(reqNew.pArray);
            goto end;
          }
        }
      }

      int     tlen = 0;
      int32_t ret = 0;
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
      if (ret < 0) {  // sizing pass failed, tlen is not trustworthy
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      SEncoder coderNew = {0};
      // same capacity fix as in the create-table path: buf is tlen bytes of pure payload
      tEncoderInit(&coderNew, buf, tlen);
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (ret != 0) {
        taosMemoryFree(buf);
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes req = {0};
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  }

end:
  tDecoderClear(&dcoder);
  bool tmp = tbSuid == realTbSuid;
  tqDebug("%s suid:%"PRId64" realSuid:%"PRId64" return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
  return tmp;
}
204

205
/*
 * Walk the WAL starting at *fetchOffset until a message worth returning to the consumer is found
 * or the applied version is passed.
 *
 * - A TDMT_VND_SUBMIT entry ends the scan; the result is whatever walFetchBody() returns.
 * - When the handle does not run in WITH_DATA mode, meta messages (except TDMT_VND_DELETE in
 *   ONLY_META mode) are fetched and checked with isValValidForTable(); a valid one ends the scan
 *   with code 0, an invalid one is stepped over.
 * - Every other entry is skipped with walSkipFetchBody().
 *
 * *fetchOffset is always updated to the version the scan stopped at. Returns -1 for NULL
 * arguments and when nothing was found.
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
    return -1;
  }

  SWalReader* pWalReader = pHandle->pWalReader;

  int32_t code = -1;
  int32_t vgId = TD_VID(pTq->pVnode);
  int64_t id = pWalReader->readerId;

  int64_t offset = *fetchOffset;
  int64_t lastVer = walGetLastVer(pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, offset, lastVer, committedVer, appliedVer, id);

  while (offset <= appliedVer) {
    if (walFetchHead(pWalReader, offset) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return,QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
      break;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s,QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);

    // data message: hand it back regardless of the fetch result
    if (pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pWalReader);
      break;
    }

    // does this entry have to be inspected as a meta message?
    bool inspectMeta = false;
    if (pHandle->fetchMeta != WITH_DATA) {
      SWalCont* pCont = &(pWalReader->pHead->head);
      inspectMeta =
          IS_META_MSG(pCont->msgType) && !(pCont->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META);
    }

    if (inspectMeta) {
      code = walFetchBody(pWalReader);
      if (code < 0) {
        break;
      }

      // re-read the head pointer after the body fetch, as the original code does
      if (isValValidForTable(pHandle, &(pWalReader->pHead->head))) {
        code = 0;
        break;
      }
    } else {
      code = walSkipFetchBody(pWalReader);
      if (code < 0) {
        break;
      }
    }

    // entry was not useful: step to the next version
    offset++;
    code = -1;
  }

  *fetchOffset = offset;
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64 ", 0x%" PRIx64,
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
  return code;
}
271

272
/* Report the reader's cached hasPrimaryKey flag; a NULL reader yields false. */
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return (pReader != NULL) ? pReader->hasPrimaryKey : false;
}
278

279
/*
 * Look up the schema of table uid and cache in pReader->hasPrimaryKey whether its second column
 * (index 1) carries the COL_IS_KEY flag. A missing schema or fewer than two columns gives false.
 * Does nothing (besides logging) when pReader is NULL.
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  tqDebug("%s:%p uid:%"PRId64, __FUNCTION__ , pReader, uid);
  if (pReader == NULL) {
    return;
  }

  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);

  bool hasKey = false;
  if (pWrapper != NULL && pWrapper->nCols >= 2) {
    hasKey = (pWrapper->pSchema[1].flags & COL_IS_KEY) != 0;
  }

  tDeleteSchemaWrapper(pWrapper);
  pReader->hasPrimaryKey = hasKey;
}
293

294
/*
 * Allocate a tq reader for pVnode: opens a WAL reader on pVnode->pWal, remembers pVnode->pMeta
 * and creates the result data block.
 *
 * Returns NULL when pVnode is NULL or when any of the allocations fails. If creating the result
 * block fails, terrno is set to the error code and everything acquired so far is released, so a
 * non-NULL return always carries a result block.
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  tqDebug("%s:%p", __FUNCTION__ , pVnode);
  if (pVnode == NULL) {
    return NULL;
  }
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
  if (pReader == NULL) {
    return NULL;
  }

  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
  if (pReader->pWalReader == NULL) {
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  pReader->pResBlock = NULL;

  int32_t code = createDataBlock(&pReader->pResBlock);
  if (code) {
    // do not hand out a reader without a result block: undo the partial construction
    terrno = code;
    walCloseReader(pReader->pWalReader);
    taosMemoryFree(pReader);
    return NULL;
  }

  return pReader;
}
325

326
/*
 * Release a reader created by tqReaderOpen(): WAL reader, cached schema wrapper, column id
 * list, result block, table id hash, decoded submit request and finally the reader itself.
 * A NULL reader is only logged.
 */
void tqReaderClose(STqReader* pReader) {
  tqDebug("%s:%p", __FUNCTION__ , pReader);
  if (pReader == NULL) {
    return;
  }

  // optional members are released only when present
  if (pReader->pWalReader != NULL) {
    walCloseReader(pReader->pWalReader);
  }
  if (pReader->pSchemaWrapper != NULL) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  }
  if (pReader->pColIdList != NULL) {
    taosArrayDestroy(pReader->pColIdList);
  }

  // the remaining members are released unconditionally
  blockDataDestroy(pReader->pResBlock);
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  taosMemoryFree(pReader);
}
349

350
/*
 * Position the reader's WAL reader at version ver.
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, -1 when walReaderSeekVer() reports a
 * negative result, 0 otherwise. id is only used for the debug log.
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (walReaderSeekVer(pReader->pWalReader, ver) >= 0) {
    tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
    return 0;
  }

  return -1;
}
360

361
/*
 * Read the next usable message from the WAL and convert it into a stream input item (*pItem).
 *
 * - TDMT_VND_SUBMIT: the payload behind the SSubmitReq2Msg header is copied and wrapped with
 *   streamDataSubmitNew().
 * - TDMT_VND_DELETE: converted with tqExtractDelDataBlock(); empty results are skipped and the
 *   scan continues with the next WAL entry.
 * - TDMT_VND_DROP_TABLE (only when pReader->cond.scanDropCtb is set): converted with
 *   tqExtractDropCtbDataBlock(); empty results are skipped.
 * - anything else: TSDB_CODE_STREAM_INTERNAL_ERROR.
 *
 * Returns TSDB_CODE_SUCCESS without touching *pItem once an entry newer than maxVer is reached.
 * Errors from walNextValidMsg() are passed through by TAOS_CHECK_RETURN. id is only used for
 * logging.
 */
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
  int32_t code = 0;

  while (1) {
    TAOS_CHECK_RETURN(walNextValidMsg(pReader));

    SWalCont* pCont = &pReader->pHead->head;
    int64_t   ver = pCont->version;
    if (ver > maxVer) {
      tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
      return TSDB_CODE_SUCCESS;
    }

    if (pCont->msgType == TDMT_VND_SUBMIT) {
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);

      void* data = taosMemoryMalloc(len);
      if (data == NULL) {
        // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
        // retry
        // report the vgId of the WAL being read (the log used to print a constant 0)
        tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory",
                pReader->pWal->cfg.vgId);
        return terrno;
      }

      (void)memcpy(data, pBody, len);
      SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};

      code = streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
      if (code != 0) {
        // NOTE(review): data is not released here; confirm whether streamDataSubmitNew() takes
        // ownership of data1.msgStr on failure, otherwise this path leaks the copy
        tqError("%s failed to create data submit for stream since out of memory", id);
        return code;
      }
    } else if (pCont->msgType == TDMT_VND_DELETE) {
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
      EStreamType blockType = STREAM_DELETE_DATA;
      code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType);
      if (code == TSDB_CODE_SUCCESS) {
        if (*pItem == NULL) {
          tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
          // we need to continue check next data in the wal files.
          continue;
        } else {
          tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
        }
      } else {
        terrno = code;
        tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
        return code;
      }

    } else if (pCont->msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
      void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
      code = tqExtractDropCtbDataBlock(pBody, len, ver, (void**)pItem, 0);
      if (TSDB_CODE_SUCCESS == code) {
        if (!*pItem) {
          // nothing to forward for this entry: look at the next one
          continue;
        } else {
          tqDebug("s-task:%s drop ctb msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
        }
      } else {
        terrno = code;
        return code;
      }
    } else {
      tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    return code;
  }
}
435

436
/*
 * Advance to the next table block that should be returned to the caller, pulling further
 * messages from the WAL when the current submit request is exhausted.
 *
 * Blocks whose flags intersect sourceExcluded are skipped, as are blocks whose uid is not in
 * pReader->tbIdHash (when that hash is set). For an accepted block tqRetrieveDataBlock() is
 * called and true is returned on success.
 *
 * Returns false for a NULL reader, when a block cannot be fetched from the array, when more
 * than 1000 ms elapsed since entry (or the clock went backwards), when the WAL has no further
 * valid message, or when the next message cannot be decoded.
 */
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
  if (pReader == NULL) {
    return false;
  }
  SWalReader* pWalReader = pReader->pWalReader;

  int64_t startMs = taosGetTimestampMs();
  for (;;) {
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);

    // drain the blocks of the submit request decoded last
    while (pReader->nextBlk < numOfBlocks) {
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        return false;
      }

      if ((pTbData->flags & sourceExcluded) != 0) {
        pReader->nextBlk += 1;
        continue;
      }

      bool accepted =
          (pReader->tbIdHash == NULL) || (taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t)) != NULL);
      if (!accepted) {
        pReader->nextBlk += 1;
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pTbData->uid);
        continue;
      }

      tqTrace("tq reader return submit block, uid:%" PRId64, pTbData->uid);
      SSDataBlock* pRes = NULL;
      int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
      if (code == TSDB_CODE_SUCCESS) {
        return true;
      }
      // on failure the loop simply carries on with whatever nextBlk now points at
    }

    // current request exhausted: drop it before reading the next WAL message
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    int64_t elapsed = taosGetTimestampMs() - startMs;
    if (elapsed > 1000 || elapsed < 0) {
      return false;
    }

    // try next message in wal file
    if (walNextValidMsg(pWalReader) < 0) {
      return false;
    }

    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t ver = pWalReader->pHead->head.version;
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver) != 0) {
      return false;
    }
    pReader->nextBlk = 0;
  }
}
494

495
/*
 * Attach a raw submit message to the reader and decode it into pReader->submit.
 * The raw pointer, length and version are stored in pReader->msg before decoding.
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, otherwise the result of tDecodeSubmitReq()
 * (a non-zero result is also logged).
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pReader->msg.ver = ver;
  pReader->msg.msgLen = msgLen;
  pReader->msg.msgStr = msgStr;

  tqDebug("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);

  SDecoder submitDecoder = {0};
  tDecoderInit(&submitDecoder, pReader->msg.msgStr, pReader->msg.msgLen);
  int32_t decodeRes = tDecodeSubmitReq(&submitDecoder, &pReader->submit);
  tDecoderClear(&submitDecoder);

  if (decodeRes != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
  }
  return decodeRes;
}
516

517
/* Accessor for the reader's WAL reader; NULL when pReader is NULL. */
SWalReader* tqGetWalReader(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pWalReader : NULL;
}
523

524
/* Accessor for the reader's result block; NULL when pReader is NULL. */
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pResBlock : NULL;
}
530

531
/* Accessor for the reader's lastTs field; 0 when pReader is NULL. */
int64_t tqGetResultBlockTime(STqReader* pReader) {
  return (pReader != NULL) ? pReader->lastTs : 0;
}
537

538
/*
 * Move pReader->nextBlk forward to the next table block whose uid is present in
 * pReader->tbIdHash.
 *
 * Returns true when such a block is found (nextBlk is left pointing at it) and also when
 * tbIdHash is NULL. Returns false when the reader or its message is NULL, when a block cannot be
 * fetched, or when all blocks were passed over; in the last case the decoded submit request is
 * destroyed and nextBlk / msg.msgStr are reset.
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  int32_t found = false;
  int32_t lino = 0;
  int64_t uid = 0;
  TSDB_CHECK_NULL(pReader, found, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, found, lino, END, false);
  TSDB_CHECK_NULL(pReader->tbIdHash, found, lino, END, true);

  int32_t numOfTbData = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < numOfTbData; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, found, lino, END, false);
    uid = pTbData->uid;

    // a hit in the hash leaves the loop with found == true and nextBlk untouched
    void* pEntry = taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_CONDITION(pEntry == NULL, found, lino, END, true);

    tqDebug("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%"PRId64, pReader->nextBlk, numOfTbData, taosHashGetSize(pReader->tbIdHash), uid);
  }

  // no block of this request qualifies: release it
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;
  tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, numOfTbData, uid);

END:
  tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, found?"true":"false", uid);
  return found;
}
567

568
/*
 * Move pReader->nextBlk forward to the next table block whose uid is NOT present in
 * filterOutUids.
 *
 * Returns true when such a block is found (nextBlk is left pointing at it) and also when
 * filterOutUids is NULL. Returns false when the reader or its message is NULL, when a block
 * cannot be fetched, or when every remaining block is filtered out; in the last case the decoded
 * submit request is destroyed and nextBlk / msg.msgStr are reset.
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  int32_t found = false;
  int32_t lino = 0;
  int64_t uid = 0;

  TSDB_CHECK_NULL(pReader, found, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, found, lino, END, false);
  TSDB_CHECK_NULL(filterOutUids, found, lino, END, true);

  int32_t numOfTbData = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < numOfTbData; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, found, lino, END, false);
    uid = pTbData->uid;

    // a uid missing from the filter set leaves the loop with found == true
    void* pEntry = taosHashGet(filterOutUids, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_NULL(pEntry, found, lino, END, true);

    tqDebug("iterator data block in hash continue, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, numOfTbData, uid);
  }

  // every block of this request is filtered out: release it
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;
  tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, numOfTbData, uid);

END:
  tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, found?"true":"false", uid);
  return found;
}
597

598
/*
 * Project the columns of pSrc selected by mask into pDst and append a matching column
 * description to pBlock for each of them.
 *
 * mask must have at least pSrc->nCols entries; column i is selected when mask[i] is non-zero.
 * pDst->pSchema is allocated here (pDst->nCols entries).
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, the terrno-derived code when the
 * allocation fails, the error of blockDataAppendColInfo() when appending fails, 0 otherwise.
 */
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;

  // count with the same truthiness test the copy loop uses, so the allocation always matches
  // the number of copied columns even if a mask byte is something other than 0 or 1
  int32_t cnt = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    if (mask[i]) {
      cnt++;
    }
  }

  pDst->nCols = cnt;
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
  if (pDst->pSchema == NULL) {
    return TAOS_GET_TERRNO(terrno);
  }

  int32_t j = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    if (mask[i]) {
      pDst->pSchema[j++] = pSrc->pSchema[i];
      SColumnInfoData colInfo =
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
      code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != 0) {
        return code;
      }
    }
  }
  return 0;
}
629

630
/*
 * (Re)build the column layout of the reader's result block from pSchema.
 *
 * If the current result block already has columns it is destroyed and a fresh one is created.
 * With an empty pColIdList every column of pSchema is appended; otherwise pSchema and
 * pColIdList are walked in parallel (both are compared in ascending colId order) and only
 * columns whose id appears in both are appended.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, the error of createDataBlock(), terrno or
 * -1 when appending a column fails, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SSDataBlock* pBlock = pReader->pResBlock;
  if (blockDataGetNumOfCols(pBlock) > 0) {
    blockDataDestroy(pBlock);
    // forget the destroyed block first so the reader never keeps a stale pointer if the
    // re-creation below fails
    pReader->pResBlock = NULL;
    int32_t code = createDataBlock(&pReader->pResBlock);
    if (code) {
      return code;
    }
    pBlock = pReader->pResBlock;

    pBlock->info.id.uid = pReader->cachedSchemaUid;
    pBlock->info.version = pReader->msg.ver;
  }

  int32_t numOfCols = taosArrayGetSize(pColIdList);

  if (numOfCols == 0) {  // all columns are required
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
      SSchema*        pColSchema = &pSchema->pSchema[i];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);

      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataFreeRes(pBlock);
        return terrno;
      }
    }
  } else {
    // never look for more columns than the schema has
    if (numOfCols > pSchema->nCols) {
      numOfCols = pSchema->nCols;
    }

    // i walks the schema, j walks the requested column ids
    int32_t i = 0;
    int32_t j = 0;
    while (i < pSchema->nCols && j < numOfCols) {
      SSchema* pColSchema = &pSchema->pSchema[i];
      col_id_t colIdSchema = pColSchema->colId;

      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
      if (pColIdNeed == NULL) {
        break;
      }
      if (colIdSchema < *pColIdNeed) {
        i++;  // schema column was not requested
      } else if (colIdSchema > *pColIdNeed) {
        j++;  // requested column does not exist in this schema
      } else {
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
        if (code != TSDB_CODE_SUCCESS) {
          return -1;
        }
        i++;
        j++;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
693

694
/*
 * Store one decoded column value into row `rowIndex` of a result column.
 *
 * Variable-length values are first staged in a local buffer: the payload is
 * copied to varDataVal(val), the length is recorded with varDataSetLen(), and
 * the staged buffer is handed to colDataSetVal(). Fixed-length values are
 * passed to colDataSetVal() directly, flagged as NULL when the source value is
 * not a concrete value.
 *
 * @param pColumnInfoData  destination column
 * @param rowIndex         destination row inside the column
 * @param pColVal          source value
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when a variable-length
 *         payload does not fit the staging buffer, or the code returned by
 *         colDataSetVal().
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
    // staging buffer: up to 65535 payload bytes plus 2 extra bytes
    // (presumably the length header written by varDataSetLen -- TODO confirm)
    char val[65535 + 2] = {0};
    if (COL_VAL_IS_VALUE(pColVal)) {
      // Refuse payloads that cannot fit the staging buffer instead of letting
      // memcpy() run past the end of the stack array. The cast also catches a
      // negative length should nData be a signed type.
      if ((uint64_t)pColVal->value.nData > 65535) {
        return TSDB_CODE_INVALID_PARA;
      }
      if (pColVal->value.pData != NULL) {
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
      }
      varDataSetLen(val, pColVal->value.nData);
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
    } else {
      colDataSetNULL(pColumnInfoData, rowIndex);
    }
  } else {
    code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
  }

  return code;
}
714

715
/*
 * Convert the next per-table submit entry of the current message into the
 * reader's result block.
 *
 * The entry at pReader->nextBlk is fetched (and nextBlk advanced). The cached
 * schema wrapper is reloaded and the result block rebuilt whenever the
 * table/super-table id or the schema version of the entry differs from the
 * cached ones. The entry is then copied column by column into the block, from
 * either the column-format payload (aCol) or the row-format payload (aRowP).
 * Result columns with no matching source column are filled with NULL.
 *
 * @param pReader  reader holding the decoded submit message and schema cache
 * @param pRes     receives the result block (pReader->pResBlock)
 * @param id       caller tag; not used by this function
 * @return 0 on success; TSDB_CODE_INVALID_PARA for NULL arguments;
 *         TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema cannot be loaded;
 *         TSDB_CODE_TQ_INTERNAL_ERROR on a schema version mismatch; otherwise
 *         the error code of the failing step.
 */
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
  if (pReader == NULL || pRes == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
  int32_t        code = 0;
  int32_t        line = 0;
  STSchema*      pTSchema = NULL;
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
  SSDataBlock* pBlock = pReader->pResBlock;
  *pRes = pBlock;

  blockDataCleanup(pBlock);

  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastTs = pSubmitTbData->ctimeMs;

  pBlock->info.id.uid = uid;
  pBlock->info.version = pReader->msg.ver;

  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion)) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);

    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
    if (pReader->pSchemaWrapper == NULL) {
      // report the version that was actually requested, not the stale cached one
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
             ", version %d, possibly dropped table",
             vgId, suid, uid, sversion);
      // Drop both cache keys: with only the suid cleared, a later normal table
      // (suid == 0) whose uid/version still match the stale keys would skip the
      // reload and dereference the NULL wrapper below.
      // NOTE(review): assumes 0 is never a valid table uid -- confirm.
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    }

    if (sversion != pReader->pSchemaWrapper->version) {
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
      // the loaded wrapper is unusable; make sure the next call reloads it
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      return TSDB_CODE_TQ_INTERNAL_ERROR;
    }

    // commit the cache keys only once the loaded schema has been validated
    pReader->cachedSchemaUid = uid;
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;

    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
    TSDB_CHECK_CODE(code, line, END);
    pBlock = pReader->pResBlock;
    *pRes = pBlock;
  }

  int32_t numOfRows = 0;
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    // column format: the row count is taken from the first column
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
    numOfRows = pCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, numOfRows);
  TSDB_CHECK_CODE(code, line, END);
  pBlock->info.rows = numOfRows;
  int32_t colActual = blockDataGetNumOfCols(pBlock);

  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    // merge-walk source columns against result columns by column id
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
      if (sourceIdx >= numOfCols) {
        // source exhausted: remaining result columns are entirely NULL
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
        continue;
      }

      SColData* pCol = taosArrayGet(pCols, sourceIdx);
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
      SColVal colVal = {0};
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
      if (pCol->cid < pColData->info.colId) {
        // source column is not requested: skip it
        sourceIdx++;
      } else if (pCol->cid == pColData->info.colId) {
        for (int32_t i = 0; i < pCol->nVal; i++) {
          code = tColDataGetValue(pCol, i, &colVal);
          TSDB_CHECK_CODE(code, line, END);
          code = doSetVal(pColData, i, &colVal);
          TSDB_CHECK_CODE(code, line, END);
        }
        sourceIdx++;
        targetIdx++;
      } else {
        // requested column is absent from the source: fill with NULL
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
      }
    }
  } else {
    SArray*         pRows = pSubmitTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);

    for (int32_t i = 0; i < numOfRows; i++) {
      SRow* pRow = taosArrayGetP(pRows, i);
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
      int32_t sourceIdx = 0;
      for (int32_t j = 0; j < colActual; j++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);

        // advance through the row until the source column id reaches the
        // result column id
        while (1) {
          SColVal colVal = {0};
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
          TSDB_CHECK_CODE(code, line, END);

          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          } else if (colVal.cid == pColData->info.colId) {
            code = doSetVal(pColData, i, &colVal);
            TSDB_CHECK_CODE(code, line, END);
            sourceIdx++;
            break;
          } else {
            // the row has no value for this result column
            colDataSetNULL(pColData, i);
            break;
          }
        }
      }
    }
  }

END:
  if (code != 0) {
    tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code));
  }
  taosMemoryFreeClear(pTSchema);
  return code;
}
861

862
// Track, per column slot j, whether the current row carries a value (i.e. is
// not NONE). A new block is requested (buildNew = true) for the first row, or
// whenever a column's presence differs from what was recorded in assigned[j].
// Expects `colVal`, `assigned`, `j`, `curRow` and `buildNew` in the expanding
// scope. Call sites use the macro without a trailing semicolon, so it must
// expand to one complete statement.
#define PROCESS_VAL                                   \
  {                                                   \
    bool hasValue = !COL_VAL_IS_NONE(&colVal);        \
    if (curRow == 0 || hasValue != assigned[j]) {     \
      assigned[j] = hasValue;                         \
      buildNew = true;                                \
    }                                                 \
  }
873

874
// One merge step between the current source value (`colVal`) and the current
// result column (`pColData`), both walked in ascending column-id order:
//   - source id below the target id: the source column is not part of the
//     block, skip it;
//   - ids equal: store the value at row (curRow - lastRow) and advance both;
//   - source id above the target id: the source has no value for this result
//     column, so store NULL and advance the target. Without this branch neither
//     index moves and the enclosing `while (targetIdx < colActual)` never ends.
// Expects `colVal`, `pColData`, `sourceIdx`, `targetIdx`, `curRow`, `lastRow`
// and whatever TQ_ERR_GO_TO_END relies on in the expanding scope. Call sites
// use the macro without a trailing semicolon.
#define SET_DATA                                                     \
  if (colVal.cid < pColData->info.colId) {                           \
    sourceIdx++;                                                     \
  } else if (colVal.cid == pColData->info.colId) {                   \
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
    sourceIdx++;                                                     \
    targetIdx++;                                                     \
  } else {                                                           \
    colDataSetNULL(pColData, curRow - lastRow);                      \
    targetIdx++;                                                     \
  }
882

883
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
117,961✔
884
                               SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow,
885
                               int32_t* lastRow) {
886
  int32_t         code = 0;
117,961✔
887
  SSchemaWrapper* pSW = NULL;
117,961✔
888
  SSDataBlock*    block = NULL;
117,961✔
889
  if (taosArrayGetSize(blocks) > 0) {
117,961!
890
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
891
    TQ_NULL_GO_TO_END(pLastBlock);
×
892
    pLastBlock->info.rows = curRow - *lastRow;
×
893
    *lastRow = curRow;
×
894
  }
895

896
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
117,967!
897
  TQ_NULL_GO_TO_END(block);
117,974!
898

899
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
117,974!
900
  TQ_NULL_GO_TO_END(pSW);
117,972!
901

902
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned));
117,972!
903
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
117,964!
904
          (int32_t)taosArrayGetSize(block->pDataBlock));
905

906
  block->info.id.uid = pSubmitTbData->uid;
117,964✔
907
  block->info.version = pReader->msg.ver;
117,964✔
908
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
117,964!
909
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
117,952!
910
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
117,944!
911
  pSW = NULL;
117,944✔
912
  taosMemoryFreeClear(block);
117,944!
913

UNCOV
914
END:
×
915
  if (code != 0) {
117,970!
916
    tqError("processBuildNew failed, code:%d", code);
×
917
  }
918
  tDeleteSchemaWrapper(pSW);
117,970!
919
  blockDataFreeRes(block);
117,964✔
920
  taosMemoryFree(block);
117,958!
921
  return code;
117,963✔
922
}
923
/*
 * Split one column-format submit entry into result blocks.
 *
 * Rows are processed in order. For each row the presence pattern of its
 * columns is compared with the pattern recorded in `assigned` (PROCESS_VAL);
 * on the first row, or whenever the pattern changes, processBuildNew() opens a
 * new block. The row's values are then copied into the last block of `blocks`
 * (SET_DATA). After the loop the row count of the last block is finalized.
 *
 * @param pReader        reader providing the schema wrapper for the table
 * @param pSubmitTbData  submit entry whose aCol payload is converted
 * @param blocks         receives the produced blocks
 * @param schemas        receives the schema wrapper matching each block
 * @return 0 on success, otherwise the code recorded by the failing step.
 */
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  // the row count is taken from the first column
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  tqDebug("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    // pass 1: detect whether this row's presence pattern requires a new block
    for (int32_t j = 0; j < numOfCols; j++) {
      pCol = taosArrayGet(pCols, j);
      TQ_NULL_GO_TO_END(pCol);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // pass 2: copy the row into the current block, matching by column id
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      SET_DATA
    }

    curRow++;
  }
  // With zero rows no block may have been created, so the last-block lookup
  // can come back NULL; only finalize the row count when a block exists.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }
  tqDebug("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFree(assigned);
  return code;
}
985

986
/*
 * Split one row-format submit entry into result blocks.
 *
 * Rows are processed in order. For each row the presence pattern of its
 * columns is compared with the pattern recorded in `assigned` (PROCESS_VAL);
 * on the first row, or whenever the pattern changes, processBuildNew() opens a
 * new block. The row's values are then copied into the last block of `blocks`
 * (SET_DATA). After the loop the row count of the last block is finalized.
 *
 * @param pReader        reader providing the schema wrapper for the table
 * @param pSubmitTbData  submit entry whose aRowP payload is converted
 * @param blocks         receives the produced blocks
 * @param schemas        receives the schema wrapper matching each block
 * @return 0 on success, otherwise the code recorded by the failing step.
 */
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t   code = 0;
  STSchema* pTSchema = NULL;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  int32_t curRow = 0;
  int32_t lastRow = 0;
  SArray* pRows = pSubmitTbData->aRowP;
  int32_t numOfRows = taosArrayGetSize(pRows);
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
  TQ_NULL_GO_TO_END(pTSchema);
  tqDebug("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);

  for (int32_t i = 0; i < numOfRows; i++) {
    bool  buildNew = false;
    SRow* pRow = taosArrayGetP(pRows, i);
    TQ_NULL_GO_TO_END(pRow);

    // pass 1: detect whether this row's presence pattern requires a new block
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // pass 2: copy the row into the current block, matching by column id
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      // same guard as the column-format path: never dereference a failed lookup
      TQ_NULL_GO_TO_END(pColData);
      SColVal          colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
    }

    curRow++;
  }
  // With zero rows no block may have been created, so the last-block lookup
  // can come back NULL; only finalize the row count when a block exists.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

  tqDebug("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows, (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
1048

1049
/*
 * Convert the next per-table submit entry of the current message into one or
 * more result blocks with matching schema wrappers.
 *
 * The entry at pReader->nextBlk is fetched and nextBlk advanced. The schema
 * wrapper for the entry's table and schema version is always reloaded, and the
 * entry is then dispatched to the column-format or row-format converter
 * depending on its flags.
 *
 * @param pReader           reader holding the decoded submit message
 * @param blocks            receives the produced blocks
 * @param schemas           receives the schema wrapper matching each block
 * @param pSubmitTbDataRet  optional; receives the submit entry that was read
 * @param createTime        forwarded to metaGetTableSchema()
 * @return 0 on success; TSDB_CODE_INVALID_PARA when pReader is NULL; terrno
 *         when no entry is left; TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the
 *         schema cannot be loaded; otherwise the converter's result.
 */
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) {
  // same argument guard as the other reader entry points
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqDebug("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  if (pSubmitTbData == NULL) {
    return terrno;
  }
  pReader->nextBlk++;

  if (pSubmitTbDataRet) {
    *pSubmitTbDataRet = pSubmitTbData;
  }

  int32_t sversion = pSubmitTbData->sver;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastBlkUid = uid;

  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime);
  if (pReader->pSchemaWrapper == NULL) {
    // the value logged is the table uid and the schema version that was
    // actually requested (not the super-table id / cached version)
    tqWarn("vgId:%d, cannot found schema wrapper for table: uid:%" PRId64 ", version %d, possibly dropped table",
           pReader->pWalReader->pWal->cfg.vgId, uid, sversion);
    pReader->cachedSchemaSuid = 0;
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
  }

  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
  } else {
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
  }
}
1080

1081
/*
 * Record the list of column ids the reader should use.
 * The pointer is stored as-is; nothing is done when pReader is NULL.
 */
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) {
  if (pReader != NULL) {
    pReader->pColIdList = pColIdList;
  }
}
1087

1088
/*
 * Replace the reader's set of table uids with the content of `tbUidList`.
 *
 * An existing hash is cleared; otherwise a new one is created. Every uid of
 * the list is then inserted; a failed insert is logged and skipped.
 *
 * @param pReader    reader whose tbIdHash is reset; no-op when NULL
 * @param tbUidList  array of int64_t table uids; no-op when NULL
 * @param id         tag used in log messages only; may be NULL
 */
void tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }

  // Callers may pass NULL for `id`; handing NULL to a "%s" conversion is
  // undefined behavior, so log a printable placeholder instead.
  const char* idStr = (id != NULL) ? id : "(null)";

  if (pReader->tbIdHash) {
    taosHashClear(pReader->tbIdHash);
  } else {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", idStr);
      return;
    }
  }

  // the list is not modified here, so its size is queried once
  int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", idStr, *pKey);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", idStr, numOfTables);
}
1112

1113
/*
 * Add the table uids of `pTableUidList` to the reader's uid set, creating the
 * set first when it does not exist yet. A failed insert is logged and skipped.
 *
 * @param pReader        reader whose tbIdHash is extended; no-op when NULL
 * @param pTableUidList  array of int64_t table uids; no-op when NULL
 */
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader == NULL || pTableUidList == NULL) {
    return;
  }
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    // same guard as the set/remove variants: never hash or dereference a
    // failed array lookup
    if (pKey == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
      continue;
    }
  }
}
1134

1135
/*
 * Tell whether `uid` is present in the reader's table uid set.
 * Returns false when pReader is NULL.
 */
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  bool isQueried = false;
  if (pReader != NULL) {
    isQueried = (taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL);
  }
  return isQueried;
}
1141

1142
bool tqCurrentBlockConsumed(const STqReader* pReader) {
846,664✔
1143
  if (pReader == NULL) {
846,664!
1144
    return false;
×
1145
  }
1146
  return pReader->msg.msgStr == NULL;
846,664✔
1147
}
1148

1149
/*
 * Remove every table uid listed in `tbUidList` from the reader's uid set.
 * A removal that reports failure is logged; processing continues with the
 * next uid. Nothing is done when either argument is NULL.
 */
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader != NULL && tbUidList != NULL) {
    for (int32_t idx = 0; idx < taosArrayGetSize(tbUidList); idx++) {
      int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
      if (pUid == NULL) {
        continue;
      }
      if (taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t)) != 0) {
        tqError("failed to remove table uid:%" PRId64 " from hash", *pUid);
      }
    }
  }
}
1160

1161
/*
 * Propagate a table add/remove event to every consumer handle and to every
 * source-level stream task of this vnode.
 *
 * Phase 1 (under pTq->lock, write mode) walks pTq->pHandle:
 *   - column subscriptions forward the list to the handle's scanner task;
 *   - db subscriptions record removed uids in the handle's filter-out set;
 *   - table subscriptions either rebuild the reader's uid set from
 *     qGetTableList() (add) or remove the listed uids from it (remove).
 * Phase 2 (under the stream-meta write lock) walks the stream task map and
 * forwards the list to the executor of each source-level task.
 *
 * @param pTq        tq instance
 * @param tbUidList  array of int64_t table uids
 * @param isAdd      true when the tables were added, false when removed
 * @return 0 on success; TSDB_CODE_INVALID_PARA for NULL arguments; the
 *         qGetTableList() error when rebuilding a table subscription fails.
 */
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  if (pTq == NULL || tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void*   pCursor = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while ((pCursor = taosHashIterate(pTq->pHandle, pCursor)) != NULL) {
    STqHandle* pHandle = (STqHandle*)pCursor;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t code = qUpdateTableListForStreamScanner(pHandle->execHandle.task, tbUidList, isAdd);
      if (code != 0) {
        tqError("update qualified table error for %s", pHandle->subKey);
      }
      continue;
    }

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      if (isAdd) {
        continue;
      }
      // removed tables are remembered in the handle's filter-out set
      int32_t numOfUids = taosArrayGetSize(tbUidList);
      for (int32_t k = 0; k < numOfUids; k++) {
        int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, k);
        if (pUid == NULL) {
          continue;
        }
        if (taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, pUid, sizeof(int64_t), NULL, 0) != 0) {
          tqError("failed to add table uid:%" PRId64 " to hash", *pUid);
        }
      }
      continue;
    }

    if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
      continue;
    }

    if (!isAdd) {
      tqReaderRemoveTbUidList(pHandle->execHandle.pTqReader, tbUidList);
      continue;
    }

    // table subscription, add event: rebuild the reader's uid set from scratch
    SArray* pQualified = NULL;
    int     ret = qGetTableList(pHandle->execHandle.execTb.suid, pTq->pVnode, pHandle->execHandle.execTb.node,
                                &pQualified, pHandle->execHandle.task);
    if (ret != TDB_CODE_SUCCESS) {
      tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pHandle->subKey,
              pHandle->consumerId);
      taosArrayDestroy(pQualified);
      // leaving mid-iteration: cancel the iterator before dropping the lock
      taosHashCancelIterate(pTq->pHandle, pCursor);
      taosWUnLockLatch(&pTq->lock);

      return ret;
    }
    tqReaderSetTbUidList(pHandle->execHandle.pTqReader, pQualified, NULL);
    taosArrayDestroy(pQualified);
  }
  taosWUnLockLatch(&pTq->lock);

  // update the table list handle for each stream scanner/wal reader
  streamMetaWLock(pTq->pStreamMeta);
  while ((pCursor = taosHashIterate(pTq->pStreamMeta->pTasksMap, pCursor)) != NULL) {
    int64_t      refId = *(int64_t*)pCursor;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask == NULL) {
      continue;
    }

    int32_t taskId = pTask->id.taskId;

    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
      int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
      if (code != 0) {
        tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
      }
    }

    int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
    if (ret) {
      tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
    }
  }

  streamMetaWUnLock(pTq->pStreamMeta);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc