• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3530

16 Nov 2024 07:44AM UTC coverage: 60.219% (-0.7%) from 60.888%
#3530

push

travis-ci

web-flow
Update 03-ad.md

118417 of 252124 branches covered (46.97%)

Branch coverage included in aggregate %.

198982 of 274951 relevant lines covered (72.37%)

6072359.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.49
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
/**
 * Decide whether a WAL meta message concerns the super table this handle subscribes to.
 *
 * Handles whose subscription type is not TOPIC_SUB_TYPE__TABLE accept every message.
 * Otherwise the body (after the SMsgHead prefix) is decoded according to the message
 * type, and the super-table uid it refers to is compared with the subscribed one.
 *
 * For create-table / drop-table batches that only partly target the subscribed super
 * table, the body held by pHead is rewritten in place so that it carries just the
 * matching sub-requests, and pHead->bodyLen is updated accordingly. If that rewrite
 * fails (allocation or encoding error) the body is left untouched and the message is
 * still reported as relevant.
 *
 * @param pHandle subscription handle: supplies the subscription type, the target suid
 *                and, for alter-table, the meta used to resolve the table name
 * @param pHead   WAL record whose body has already been fetched; may be modified
 * @return true when the message's super-table uid equals the subscribed suid
 *         (or the handle is not a table subscription), false otherwise
 */
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  int16_t msgType = pHead->msgType;
  char*   body = pHead->body;
  int32_t bodyLen = pHead->bodyLen;

  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
  int64_t  realTbSuid = 0;
  SDecoder dcoder = {0};
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
  int32_t  len = bodyLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq req = {0};
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq req = {0};
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    SVCreateTbBatchReq req = {0};
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
      goto end;
    }

    // count the child-table creations that belong to the subscribed super table
    int32_t        needRebuild = 0;
    SVCreateTbReq* pCreateReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
        needRebuild++;
      }
    }
    if (needRebuild == 0) {
      // nothing in this batch targets the subscribed super table
    } else if (needRebuild == req.nReqs) {
      // the whole batch matches: deliver it unchanged
      realTbSuid = tbSuid;
    } else {
      // partial match: re-encode only the matching sub-requests into the body
      realTbSuid = tbSuid;
      SVCreateTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
      if (reqNew.pArray == NULL) {
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pCreateReq = req.pReqs + iReq;
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
          reqNew.nReqs++;
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
            taosArrayDestroy(reqNew.pArray);
            tDeleteSVCreateTbBatchReq(&req);
            goto end;
          }
        }
      }

      int     tlen = 0;
      int32_t ret = 0;
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
      if (ret < 0) {
        // size computation failed; tlen is not trustworthy
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      SEncoder coderNew = {0};
      // buf holds exactly tlen bytes, so the encoder capacity must be tlen as well;
      // the previous "tlen - sizeof(SMsgHead)" under-reported the buffer size
      tEncoderInit(&coderNew, buf, tlen);
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (ret < 0) {
        taosMemoryFree(buf);
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }

    tDeleteSVCreateTbBatchReq(&req);
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
    SVAlterTbReq req = {0};

    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
      goto end;
    }

    // the alter request only names the table; look up its entry to get the suid
    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);

    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
      metaReaderClear(&mr);
      goto end;
    }
    realTbSuid = mr.me.ctbEntry.suid;
    metaReaderClear(&mr);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    SVDropTbBatchReq req = {0};

    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
      goto end;
    }

    // count the drops that belong to the subscribed super table
    int32_t      needRebuild = 0;
    SVDropTbReq* pDropReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;

      if (pDropReq->suid == tbSuid) {
        needRebuild++;
      }
    }
    if (needRebuild == 0) {
      // nothing in this batch targets the subscribed super table
    } else if (needRebuild == req.nReqs) {
      // the whole batch matches: deliver it unchanged
      realTbSuid = tbSuid;
    } else {
      // partial match: re-encode only the matching sub-requests into the body
      realTbSuid = tbSuid;
      SVDropTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
      if (reqNew.pArray == NULL) {
        goto end;
      }
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pDropReq = req.pReqs + iReq;
        if (pDropReq->suid == tbSuid) {
          reqNew.nReqs++;
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
            taosArrayDestroy(reqNew.pArray);
            goto end;
          }
        }
      }

      int     tlen = 0;
      int32_t ret = 0;
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
      if (ret < 0) {
        // size computation failed; tlen is not trustworthy
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      SEncoder coderNew = {0};
      // encoder capacity must match the tlen bytes actually allocated for buf
      tEncoderInit(&coderNew, buf, tlen);
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (ret != 0) {
        taosMemoryFree(buf);
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes req = {0};
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  }

end:
  tDecoderClear(&dcoder);
  return tbSuid == realTbSuid;
}
199

200
/**
 * Advance through the WAL from *fetchOffset until a record worth returning is found.
 *
 * A submit record is always returned (its body is fetched). Meta records are returned
 * only when the handle asks for meta (fetchMeta != WITH_DATA), the type passes
 * IS_META_MSG, it is not a delete under ONLY_META, and isValValidForTable accepts it.
 * Every other record has its body skipped and the scan moves to the next version.
 * The scan never goes past the WAL's applied version.
 *
 * @param pTq         tq instance; only used here to derive the vgroup id for logging
 * @param pHandle     subscription handle owning the WAL reader
 * @param fetchOffset in: first version to examine; out: version the scan stopped at
 * @param reqId       request id, used for logging only
 * @return the result of walFetchBody for a submit record, 0 for an accepted meta record,
 *         a negative value when a WAL call fails, or -1 when nothing was found
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  int32_t vgId = TD_VID(pTq->pVnode);
  int64_t id = pHandle->pWalReader->readerId;

  int64_t cur = *fetchOffset;
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, cur, lastVer, committedVer, appliedVer, id);

  // stays -1 ("nothing to return") unless an iteration decides otherwise
  int32_t code = -1;

  while (cur <= appliedVer) {
    if (walFetchHead(pHandle->pWalReader, cur) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return,QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, cur, reqId, id);
      break;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s,QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, cur, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);

    // data record: always handed back to the caller
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pHandle->pWalReader);
      break;
    }

    SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
    bool      metaCandidate = (pHandle->fetchMeta != WITH_DATA) && IS_META_MSG(pHead->msgType) &&
                         !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META);

    if (metaCandidate) {
      code = walFetchBody(pHandle->pWalReader);
      if (code < 0) {
        break;
      }

      // re-read the head pointer after the body fetch, as the original code does
      pHead = &(pHandle->pWalReader->pHead->head);
      if (isValValidForTable(pHandle, pHead)) {
        code = 0;
        break;
      }

      // meta record for some other table: move on
      cur++;
      code = -1;
      continue;
    }

    // uninteresting record: skip its body and look at the next version
    code = walSkipFetchBody(pHandle->pWalReader);
    if (code < 0) {
      break;
    }
    cur++;
    code = -1;
  }

  *fetchOffset = cur;
  return code;
}
261

262
/** Return the primary-key flag last stored on the reader by tqSetTablePrimaryKey. */
bool tqGetTablePrimaryKey(STqReader* pReader) {
  bool hasPk = pReader->hasPrimaryKey;
  return hasPk;
}
20,379✔
263

264
/**
 * Look up the schema of table `uid` and record on the reader whether its second
 * column carries the COL_IS_KEY flag.
 *
 * The flag is false when the schema cannot be obtained or has fewer than two columns.
 *
 * @param pReader reader whose hasPrimaryKey field is updated
 * @param uid     uid of the table whose schema is inspected
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);

  bool hasPk = (pWrapper != NULL) && (pWrapper->nCols >= 2) && ((pWrapper->pSchema[1].flags & COL_IS_KEY) != 0);

  tDeleteSchemaWrapper(pWrapper);
  pReader->hasPrimaryKey = hasPk;
}
197✔
273

274
/**
 * Allocate a tq reader for a vnode: opens a WAL reader on the vnode's WAL, binds the
 * vnode meta and creates the (initially column-less) result data block.
 *
 * @param pVnode vnode providing the WAL and meta handles
 * @return the new reader, or NULL on failure. When creating the result block fails,
 *         terrno is set to the returned error code. Release the reader with
 *         tqReaderClose().
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
  if (pReader == NULL) {
    return NULL;
  }

  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
  if (pReader->pWalReader == NULL) {
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  pReader->pResBlock = NULL;

  int32_t code = createDataBlock(&pReader->pResBlock);
  if (code) {
    // Do not hand out a reader without a result block: the retrieval paths use
    // pResBlock unconditionally. Undo what was acquired and report the failure.
    walCloseReader(pReader->pWalReader);
    taosMemoryFree(pReader);
    terrno = code;  // set last so the cleanup calls cannot overwrite it
    return NULL;
  }

  return pReader;
}
301

302
/**
 * Release a reader and everything it owns: the WAL reader, the cached schema wrapper,
 * the column-id list, the result block, the table-id hash and the decoded submit
 * request. A NULL reader is ignored.
 *
 * @param pReader reader to destroy (may be NULL)
 */
void tqReaderClose(STqReader* pReader) {
  if (pReader != NULL) {
    // optional members are released only when present
    if (pReader->pWalReader != NULL) {
      walCloseReader(pReader->pWalReader);
    }
    if (pReader->pSchemaWrapper != NULL) {
      tDeleteSchemaWrapper(pReader->pSchemaWrapper);
    }
    if (pReader->pColIdList != NULL) {
      taosArrayDestroy(pReader->pColIdList);
    }

    // remaining members are passed to their destructors unconditionally
    blockDataDestroy(pReader->pResBlock);
    taosHashCleanup(pReader->tbIdHash);
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

    taosMemoryFree(pReader);
  }
}
324

325
/**
 * Position the reader's WAL reader at version `ver`.
 *
 * @param pReader reader whose WAL reader is repositioned
 * @param ver     target WAL version
 * @param id      identifier string used in the debug log
 * @return 0 on success, -1 when walReaderSeekVer reports a negative result
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  int32_t rc = walReaderSeekVer(pReader->pWalReader, ver);
  if (rc >= 0) {
    tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
    return 0;
  }
  return -1;
}
332

333
/**
 * Pull the next usable message for stream processing out of the WAL.
 *
 * Records are read with walNextValidMsg until one of these happens:
 *  - a record newer than maxVer is seen: success is returned without producing an item;
 *  - a submit record is seen: its payload (after SSubmitReq2Msg) is copied and wrapped
 *    via streamDataSubmitNew into *pItem;
 *  - a delete record is seen: it is converted with tqExtractDelDataBlock; an empty
 *    result is discarded and scanning continues;
 *  - a drop-table record is seen while cond.scanDropCtb is set: it is converted with
 *    tqExtractDropCtbDataBlock; an empty result is discarded and scanning continues;
 *  - any other record type is treated as an internal error.
 *
 * @param pReader WAL reader to consume from
 * @param pItem   out: the produced stream input item
 * @param maxVer  highest WAL version that may be consumed
 * @param id      task identifier used in log messages
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
  int32_t code = 0;

  for (;;) {
    TAOS_CHECK_RETURN(walNextValidMsg(pReader));

    SWalCont* pCont = &pReader->pHead->head;
    int64_t   ver = pCont->version;
    if (ver > maxVer) {
      tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
      return TSDB_CODE_SUCCESS;
    }

    if (pCont->msgType == TDMT_VND_SUBMIT) {
      void*   pPayload = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      int32_t payloadLen = pCont->bodyLen - sizeof(SSubmitReq2Msg);

      void* pCopy = taosMemoryMalloc(payloadLen);
      if (pCopy == NULL) {
        // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
        // retry
        tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
        return terrno;
      }

      (void)memcpy(pCopy, pPayload, payloadLen);
      SPackedData packed = (SPackedData){.ver = ver, .msgLen = payloadLen, .msgStr = pCopy};

      code = streamDataSubmitNew(&packed, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
      if (code != 0) {
        tqError("%s failed to create data submit for stream since out of memory", id);
      }
      return code;
    }

    if (pCont->msgType == TDMT_VND_DELETE) {
      void*   pPayload = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t payloadLen = pCont->bodyLen - sizeof(SMsgHead);

      code = tqExtractDelDataBlock(pPayload, payloadLen, ver, (void**)pItem, 0, STREAM_DELETE_DATA);
      if (code != TSDB_CODE_SUCCESS) {
        terrno = code;
        tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
        return code;
      }
      if (*pItem == NULL) {
        tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, payloadLen, ver);
        // we need to continue check next data in the wal files.
        continue;
      }
      tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, payloadLen, ver);
      return code;
    }

    if (pCont->msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
      void*   pPayload = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t payloadLen = pCont->bodyLen - sizeof(SMsgHead);

      code = tqExtractDropCtbDataBlock(pPayload, payloadLen, ver, (void**)pItem, 0);
      if (TSDB_CODE_SUCCESS != code) {
        terrno = code;
        return code;
      }
      if (!*pItem) {
        continue;
      }
      tqDebug("s-task:%s drop ctb msg extract from WAL, len:%d, ver:%"PRId64, id, payloadLen, ver);
      return code;
    }

    tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }
}
407

408
/**
 * Produce the next result block for the reader, pulling further submit messages from
 * the WAL when the currently decoded one is exhausted.
 *
 * Sub-blocks whose flags intersect `sourceExcluded`, or whose table uid is absent from
 * the reader's tbIdHash (when one is set), are skipped. The search gives up once more
 * than 1000 ms have elapsed (or the clock moved backwards), when the WAL has no further
 * valid message, or when a message cannot be decoded.
 *
 * @param pReader        reader holding the decode state and result block
 * @param id             not referenced in this function
 * @param sourceExcluded bit mask of sub-block flags to skip
 * @return true when a block was retrieved into the reader's result block, else false
 */
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
  SWalReader* pWal = pReader->pWalReader;
  int64_t     startMs = taosGetTimestampMs();

  for (;;) {
    int32_t nBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);

    while (pReader->nextBlk < nBlocks) {
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, nBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, nBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        return false;
      }

      if ((pTbData->flags & sourceExcluded) != 0) {
        pReader->nextBlk += 1;
        continue;
      }

      bool wanted = (pReader->tbIdHash == NULL) ||
                    (taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t)) != NULL);
      if (!wanted) {
        pReader->nextBlk += 1;
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pTbData->uid);
        continue;
      }

      tqTrace("tq reader return submit block, uid:%" PRId64, pTbData->uid);
      SSDataBlock* pOut = NULL;
      // tqRetrieveDataBlock advances nextBlk itself, so a failure simply moves on
      if (tqRetrieveDataBlock(pReader, &pOut, NULL) == TSDB_CODE_SUCCESS) {
        return true;
      }
    }

    // current message exhausted: drop its decoded form
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    int64_t spentMs = taosGetTimestampMs() - startMs;
    if (spentMs > 1000 || spentMs < 0) {
      return false;
    }

    // try next message in wal file
    if (walNextValidMsg(pWal) < 0) {
      return false;
    }

    void*   pPayload = POINTER_SHIFT(pWal->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t payloadLen = pWal->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t msgVer = pWal->pHead->head.version;
    if (tqReaderSetSubmitMsg(pReader, pPayload, payloadLen, msgVer) != 0) {
      return false;
    }
    pReader->nextBlk = 0;
  }
}
463

464
/**
 * Attach a raw submit message to the reader and decode it into pReader->submit.
 *
 * @param pReader reader that receives the message reference and the decoded request
 * @param msgStr  start of the encoded submit request
 * @param msgLen  length of the encoded request in bytes
 * @param ver     WAL version the message belongs to
 * @return 0 on success, otherwise the error code from tDecodeSubmitReq
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;

  tqDebug("tq reader set msg %p %d", msgStr, msgLen);

  SDecoder dec = {0};
  tDecoderInit(&dec, pReader->msg.msgStr, pReader->msg.msgLen);
  int32_t rc = tDecodeSubmitReq(&dec, &pReader->submit);
  tDecoderClear(&dec);  // the decoder is released on both outcomes

  if (rc != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
    return rc;
  }
  return 0;
}
483

484
/** Return the WAL reader owned by this tq reader. */
SWalReader* tqGetWalReader(STqReader* pReader) {
  SWalReader* pWal = pReader->pWalReader;
  return pWal;
}
695,205✔
485

486
/** Return the reader's result data block (the block filled by tqRetrieveDataBlock). */
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
  SSDataBlock* pBlock = pReader->pResBlock;
  return pBlock;
}
625,198✔
487

488
/** Return the reader's lastTs field (set from the sub-block's ctimeMs on retrieval). */
int64_t tqGetResultBlockTime(STqReader* pReader) {
  int64_t ts = pReader->lastTs;
  return ts;
}
625,176✔
489

490
/**
 * Move the reader's cursor to the next sub-block of the current submit message whose
 * table uid is present in tbIdHash.
 *
 * Without a tbIdHash the current sub-block is accepted as is. When the message is
 * exhausted, the decoded request is destroyed and the reader is reset (nextBlk = 0,
 * msgStr = NULL).
 *
 * @param pReader reader holding the decoded submit message and the cursor
 * @param idstr   identifier used in debug logs
 * @return true when nextBlk points at an accepted sub-block; false when no message is
 *         attached, a sub-block cannot be fetched, or none is left
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  if (pReader->msg.msgStr == NULL) {
    return false;
  }

  int32_t nBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);

  for (; pReader->nextBlk < nBlocks; pReader->nextBlk++) {
    tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
            (pReader->nextBlk + 1), nBlocks, idstr);

    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    if (pTbData == NULL) {
      return false;
    }
    if (pReader->tbIdHash == NULL) {
      return true;
    }

    void* pHit = taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t));
    if (pHit != NULL) {
      tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pTbData->uid, idstr);
      return true;
    }

    tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pTbData->uid,
            taosHashGetSize(pReader->tbIdHash), idstr);
  }

  // nothing left in this message: release it and reset the cursor
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;

  return false;
}
526

527
/**
 * Move the reader's cursor to the next sub-block of the current submit message whose
 * table uid is NOT contained in `filterOutUids`.
 *
 * A NULL filter accepts the current sub-block. When the message is exhausted, the
 * decoded request is destroyed and the reader is reset (nextBlk = 0, msgStr = NULL).
 *
 * @param pReader       reader holding the decoded submit message and the cursor
 * @param filterOutUids hash of table uids to skip (may be NULL)
 * @return true when nextBlk points at an accepted sub-block, false otherwise
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  if (pReader->msg.msgStr == NULL) {
    return false;
  }

  int32_t nBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);

  for (; pReader->nextBlk < nBlocks; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    if (pTbData == NULL) {
      return false;
    }
    if (filterOutUids == NULL) {
      return true;
    }

    // a uid missing from the filter set means the block is kept
    void* pFiltered = taosHashGet(filterOutUids, &pTbData->uid, sizeof(int64_t));
    if (pFiltered == NULL) {
      return true;
    }
  }

  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;

  return false;
}
549

550
/**
 * Build a reduced schema and matching block columns from the columns selected by `mask`.
 *
 * pDst->nCols is set to the sum of the first pSrc->nCols mask entries and
 * pDst->pSchema is allocated with that many entries. For every source column with a
 * non-zero mask entry, its schema is copied into pDst and a column of the same type,
 * size and column id is appended to pBlock.
 *
 * @param pDst   schema wrapper to fill; its pSchema array is allocated here
 * @param pBlock data block that receives one column per selected source column
 * @param pSrc   full source schema
 * @param mask   one entry per source column; non-zero selects the column
 * @return 0 on success, the terrno-derived code when the allocation fails, or the
 *         error returned by blockDataAppendColInfo
 */
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
  int32_t nSelected = 0;
  for (int32_t k = 0; k < pSrc->nCols; k++) {
    nSelected += mask[k];
  }

  pDst->nCols = nSelected;
  pDst->pSchema = taosMemoryCalloc(nSelected, sizeof(SSchema));
  if (pDst->pSchema == NULL) {
    return TAOS_GET_TERRNO(terrno);
  }

  int32_t dstIdx = 0;
  for (int32_t k = 0; k < pSrc->nCols; k++) {
    if (!mask[k]) {
      continue;
    }

    pDst->pSchema[dstIdx++] = pSrc->pSchema[k];

    SColumnInfoData colInfo =
        createColumnInfoData(pSrc->pSchema[k].type, pSrc->pSchema[k].bytes, pSrc->pSchema[k].colId);
    int32_t rc = blockDataAppendColInfo(pBlock, &colInfo);
    if (rc != 0) {
      return rc;
    }
  }

  return 0;
}
578

579
/**
 * (Re)build the column layout of the reader's result block for a given table schema.
 *
 * If the result block already has columns it is destroyed and recreated, and the new
 * block inherits the cached table uid and the current message version. Columns are
 * then appended either for every schema column (empty pColIdList) or for the schema
 * columns whose ids appear in pColIdList. The selection walks both lists in step and
 * therefore relies on both being ordered by ascending column id.
 *
 * @param pReader    reader owning the result block (pResBlock may be replaced)
 * @param pSchema    schema of the table being read
 * @param pColIdList requested column ids; an empty list means "all columns"
 * @return TSDB_CODE_SUCCESS, or the error code of the failing createDataBlock /
 *         blockDataAppendColInfo call
 */
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
  SSDataBlock* pBlock = pReader->pResBlock;
  if (blockDataGetNumOfCols(pBlock) > 0) {
    // stale layout from a previous schema: start over with a fresh block
    blockDataDestroy(pBlock);
    int32_t code = createDataBlock(&pReader->pResBlock);
    if (code) {
      return code;
    }
    pBlock = pReader->pResBlock;

    pBlock->info.id.uid = pReader->cachedSchemaUid;
    pBlock->info.version = pReader->msg.ver;
  }

  int32_t numOfCols = taosArrayGetSize(pColIdList);

  if (numOfCols == 0) {  // all columns are required
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
      SSchema*        pColSchema = &pSchema->pSchema[i];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);

      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataFreeRes(pBlock);
        // report the failure itself; terrno may be unset or overwritten by the cleanup
        return code;
      }
    }
  } else {
    if (numOfCols > pSchema->nCols) {
      numOfCols = pSchema->nCols;
    }

    int32_t i = 0;  // index into the schema columns
    int32_t j = 0;  // index into the requested column ids
    while (i < pSchema->nCols && j < numOfCols) {
      SSchema* pColSchema = &pSchema->pSchema[i];
      col_id_t colIdSchema = pColSchema->colId;

      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
      if (pColIdNeed == NULL) {
        break;
      }
      if (colIdSchema < *pColIdNeed) {
        i++;  // schema column was not requested
      } else if (colIdSchema > *pColIdNeed) {
        j++;  // requested id does not exist in this schema
      } else {
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
        if (code != TSDB_CODE_SUCCESS) {
          // propagate the real error code instead of a bare -1
          return code;
        }
        i++;
        j++;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
639

640
/**
 * Store one column value into row `rowIndex` of a result column.
 *
 * Fixed-size values are passed straight to colDataSetVal together with an is-null flag
 * derived from COL_VAL_IS_VALUE. Variable-length values are first packed into a local
 * buffer in var-data layout (length set with varDataSetLen, payload at varDataVal);
 * a variable-length column value that is not a value is written as NULL.
 *
 * @param pColumnInfoData destination column
 * @param rowIndex        destination row
 * @param pColVal         source value
 * @return result of colDataSetVal, or TSDB_CODE_SUCCESS when NULL was written
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  if (!IS_VAR_DATA_TYPE(pColVal->value.type)) {
    return colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataSetNULL(pColumnInfoData, rowIndex);
    return TSDB_CODE_SUCCESS;
  }

  // staging buffer for the var-data encoding of the value
  char packed[65535 + 2] = {0};
  if (pColVal->value.pData != NULL) {
    (void)memcpy(varDataVal(packed), pColVal->value.pData, pColVal->value.nData);
  }
  varDataSetLen(packed, pColVal->value.nData);

  return colDataSetVal(pColumnInfoData, rowIndex, packed, false);
}
660

661
/**
 * Convert the sub-block at the reader's cursor into the reader's result block and
 * advance the cursor.
 *
 * Steps:
 *  1. fetch the sub-block at nextBlk (the cursor is incremented even on failure);
 *  2. reset the result block and stamp it with the table uid and message version;
 *  3. if the cached schema does not match the sub-block's (suid or uid, and schema
 *     version), reload the schema from meta and rebuild the result block's columns;
 *  4. copy the data, either column by column (SUBMIT_REQ_COLUMN_DATA_FORMAT) or row by
 *     row, matching source and destination columns by column id and writing NULL
 *     where the source has no matching column.
 *
 * @param pReader reader holding the decoded submit message, cursor and result block
 * @param pRes    out: the result block that was filled
 * @param id      not referenced in this function
 * @return 0 on success; TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema cannot be
 *         loaded; TSDB_CODE_TQ_INTERNAL_ERROR on a schema version mismatch; otherwise
 *         the error code of the failing step
 */
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
  tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
  int32_t   code = 0;
  int32_t   lino = 0;
  STSchema* pRowSchema = NULL;

  SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
  TSDB_CHECK_NULL(pTbData, code, lino, END, terrno);

  SSDataBlock* pBlock = pReader->pResBlock;
  *pRes = pBlock;

  blockDataCleanup(pBlock);

  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pTbData->sver;
  int64_t suid = pTbData->suid;
  int64_t uid = pTbData->uid;
  pReader->lastTs = pTbData->ctimeMs;

  pBlock->info.id.uid = uid;
  pBlock->info.version = pReader->msg.ver;

  // the cached schema is keyed by suid for child tables and by uid otherwise
  bool schemaStale = (suid != 0 && pReader->cachedSchemaSuid != suid) ||
                     (suid == 0 && pReader->cachedSchemaUid != uid) || (pReader->cachedSchemaVer != sversion);
  if (schemaStale) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);

    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
    if (pReader->pSchemaWrapper == NULL) {
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
             "version %d, possibly dropped table",
             vgId, suid, uid, pReader->cachedSchemaVer);
      pReader->cachedSchemaSuid = 0;
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    }

    pReader->cachedSchemaUid = uid;
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;

    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
      return TSDB_CODE_TQ_INTERNAL_ERROR;
    }

    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
    TSDB_CHECK_CODE(code, lino, END);
    // buildResSDataBlock may have replaced the result block
    pBlock = pReader->pResBlock;
    *pRes = pBlock;
  }

  bool colFormat = (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) != 0;

  int32_t numOfRows = 0;
  if (colFormat) {
    SColData* pFirstCol = taosArrayGet(pTbData->aCol, 0);
    TSDB_CHECK_NULL(pFirstCol, code, lino, END, terrno);
    numOfRows = pFirstCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, numOfRows);
  TSDB_CHECK_CODE(code, lino, END);
  pBlock->info.rows = numOfRows;
  int32_t colActual = blockDataGetNumOfCols(pBlock);

  // convert and scan one block
  if (colFormat) {
    SArray* pSrcCols = pTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pSrcCols);
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;

    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TSDB_CHECK_NULL(pColData, code, lino, END, terrno);

      if (sourceIdx >= numOfCols) {
        // source columns exhausted: remaining destination columns are all NULL
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
        continue;
      }

      SColData* pCol = taosArrayGet(pSrcCols, sourceIdx);
      TSDB_CHECK_NULL(pCol, code, lino, END, terrno);
      SColVal colVal = {0};
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);

      if (pCol->cid < pColData->info.colId) {
        // source column is not wanted
        sourceIdx++;
      } else if (pCol->cid == pColData->info.colId) {
        for (int32_t r = 0; r < pCol->nVal; r++) {
          tColDataGetValue(pCol, r, &colVal);
          code = doSetVal(pColData, r, &colVal);
          TSDB_CHECK_CODE(code, lino, END);
        }
        sourceIdx++;
        targetIdx++;
      } else {
        // destination column has no counterpart in the source
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
      }
    }
  } else {
    SArray*         pRows = pTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    pRowSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    TSDB_CHECK_NULL(pRowSchema, code, lino, END, terrno);

    for (int32_t r = 0; r < numOfRows; r++) {
      SRow* pRow = taosArrayGetP(pRows, r);
      TSDB_CHECK_NULL(pRow, code, lino, END, terrno);

      int32_t sourceIdx = 0;
      for (int32_t c = 0; c < colActual; c++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, c);
        TSDB_CHECK_NULL(pColData, code, lino, END, terrno);

        // advance through the row's columns until the destination column id is reached
        for (;;) {
          SColVal colVal = {0};
          code = tRowGet(pRow, pRowSchema, sourceIdx, &colVal);
          TSDB_CHECK_CODE(code, lino, END);

          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          }

          if (colVal.cid == pColData->info.colId) {
            code = doSetVal(pColData, r, &colVal);
            TSDB_CHECK_CODE(code, lino, END);
            sourceIdx++;
          } else {
            colDataSetNULL(pColData, r);
          }
          break;
        }
      }
    }
  }

END:
  if (code != 0) {
    tqError("tqRetrieveDataBlock failed, line:%d, code:%d", lino, code);
  }
  taosMemoryFreeClear(pRowSchema);
  return code;
}
803

804
/*
 * PROCESS_VAL: record whether column j carries a value (is not NONE) in the
 * current row and decide whether a new output block has to be started.
 *
 * Expects these names in the enclosing scope: curRow, colVal, assigned, j,
 * buildNew.
 *
 * On the first row (curRow == 0) the presence flag is always stored and
 * buildNew is raised; on later rows that happens only when the presence of
 * the column differs from what was recorded in assigned[j].
 *
 * The macro expands to a complete compound statement and is used without a
 * trailing semicolon.
 */
#define PROCESS_VAL                                                   \
  {                                                                   \
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);              \
    if (curRow == 0 || currentRowAssigned != assigned[j]) {           \
      assigned[j] = currentRowAssigned;                               \
      buildNew = true;                                                \
    }                                                                 \
  }
815

816
/*
 * SET_DATA: one step of the merge between the source columns (walked with
 * sourceIdx) and the target block columns (walked with targetIdx).
 *
 * Expects these names in the enclosing scope: colVal, pColData, sourceIdx,
 * targetIdx, curRow, lastRow, plus whatever TQ_ERR_GO_TO_END requires.
 *
 *  - source column id <  target column id: the source column is not part of
 *    the block, skip it.
 *  - source column id == target column id: store the value at row
 *    (curRow - lastRow) of the current block and advance both cursors.
 *  - source column id >  target column id: the target column has no matching
 *    source value; mark the cell NULL and advance the target cursor. Without
 *    this branch neither cursor moves and the enclosing
 *    `while (targetIdx < colActual)` loop never terminates.
 *
 * The macro is used without a trailing semicolon.
 */
#define SET_DATA                                                     \
  if (colVal.cid < pColData->info.colId) {                           \
    sourceIdx++;                                                     \
  } else if (colVal.cid == pColData->info.colId) {                   \
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
    sourceIdx++;                                                     \
    targetIdx++;                                                     \
  } else {                                                           \
    colDataSetNULL(pColData, curRow - lastRow);                      \
    targetIdx++;                                                     \
  }
824

825
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
117,346✔
826
                               SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow,
827
                               int32_t* lastRow) {
828
  int32_t         code = 0;
117,346✔
829
  SSchemaWrapper* pSW = NULL;
117,346✔
830
  SSDataBlock*    block = NULL;
117,346✔
831
  if (taosArrayGetSize(blocks) > 0) {
117,346!
832
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
833
    TQ_NULL_GO_TO_END(pLastBlock);
×
834
    pLastBlock->info.rows = curRow - *lastRow;
×
835
    *lastRow = curRow;
×
836
  }
837

838
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
117,384✔
839
  TQ_NULL_GO_TO_END(block);
117,520!
840

841
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
117,520✔
842
  TQ_NULL_GO_TO_END(pSW);
117,534!
843

844
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned));
117,534!
845
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
117,477✔
846
          (int32_t)taosArrayGetSize(block->pDataBlock));
847

848
  block->info.id.uid = pSubmitTbData->uid;
117,477✔
849
  block->info.version = pReader->msg.ver;
117,477✔
850
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
117,477!
851
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
117,468!
852
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
117,430!
853
  pSW = NULL;
117,430✔
854
  taosMemoryFreeClear(block);
117,430✔
855

856
END:
21✔
857
  tDeleteSchemaWrapper(pSW);
117,566!
858
  blockDataFreeRes(block);
117,548✔
859
  taosMemoryFree(block);
117,491✔
860
  return code;
117,508✔
861
}
862
/*
 * Convert a column-format submit payload (pSubmitTbData->aCol) into one or
 * more data blocks appended to `blocks`, with matching schema wrappers
 * appended to `schemas`.
 *
 * Rows are scanned in order. PROCESS_VAL raises buildNew for the first row
 * and whenever the set of columns that carry a value changes; in that case
 * processBuildNew() closes the current block and opens a new one.
 *
 * @return 0 on success, otherwise the code set by the TQ_*_GO_TO_END macros
 */
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  // one presence flag per schema column, zero-initialized
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  // the row count is taken from the first column
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    // pass 1: detect a change in which columns are present in this row
    // NOTE(review): assigned[] has pSchemaWrapper->nCols entries but is indexed up to numOfCols - confirm
    // that a column-format payload never carries more columns than the schema
    for (int32_t j = 0; j < numOfCols; j++) {
      pCol = taosArrayGet(pCols, j);
      TQ_NULL_GO_TO_END(pCol);
      SColVal colVal = {0};
      tColDataGetValue(pCol, i, &colVal);
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // pass 2: copy this row's values into the current block
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      tColDataGetValue(pCol, i, &colVal);
      SET_DATA
    }

    curRow++;
  }

  // Set the row count of the block that is still open. `blocks` is empty when
  // the payload has no rows, so the tail may legitimately be absent.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

END:
  taosMemoryFree(assigned);
  return code;
}
920

921
/*
 * Convert a row-format submit payload (pSubmitTbData->aRowP) into one or more
 * data blocks appended to `blocks`, with matching schema wrappers appended to
 * `schemas`.
 *
 * Rows are scanned in order. PROCESS_VAL raises buildNew for the first row
 * and whenever the set of columns that carry a value changes; in that case
 * processBuildNew() closes the current block and opens a new one.
 *
 * @return 0 on success, otherwise the code set by the TQ_*_GO_TO_END macros
 */
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t   code = 0;
  STSchema* pTSchema = NULL;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  // one presence flag per schema column, zero-initialized
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  int32_t curRow = 0;
  int32_t lastRow = 0;
  SArray* pRows = pSubmitTbData->aRowP;
  int32_t numOfRows = taosArrayGetSize(pRows);
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
  // pTSchema is dereferenced below; bail out if it could not be built
  TQ_NULL_GO_TO_END(pTSchema);

  for (int32_t i = 0; i < numOfRows; i++) {
    bool  buildNew = false;
    SRow* pRow = taosArrayGetP(pRows, i);
    TQ_NULL_GO_TO_END(pRow);

    // pass 1: detect a change in which columns are present in this row
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // pass 2: copy this row's values into the current block
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal          colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
    }

    curRow++;
  }

  // Set the row count of the block that is still open. `blocks` is empty when
  // the payload has no rows, so the tail may legitimately be absent.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

END:
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
977

978
/*
 * Fetch the next table payload of the current submit message, load the table
 * schema that matches the payload's schema version, and convert the payload
 * into data blocks.
 *
 * @param pReader           reader positioned on a decoded submit message
 * @param blocks            out: receives the produced SSDataBlock entries
 * @param schemas           out: receives the matching SSchemaWrapper pointers
 * @param pSubmitTbDataRet  optional out: the payload that was processed
 * @param createTime        forwarded to metaGetTableSchema
 * @return 0 on success; terrno when there is no payload at nextBlk;
 *         TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema lookup fails;
 *         otherwise the code returned by the row/column conversion
 */
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) {
  tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  if (pSubmitTbData == NULL) {
    return terrno;
  }
  // the payload counts as consumed even if the conversion below fails
  pReader->nextBlk++;

  if (pSubmitTbDataRet) {
    *pSubmitTbDataRet = pSubmitTbData;
  }

  int32_t sversion = pSubmitTbData->sver;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastBlkUid = uid;

  // replace the previously loaded schema with the one for this payload
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime);
  if (pReader->pSchemaWrapper == NULL) {
    // report the version that was actually looked up, not the cached one
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
           pReader->pWalReader->pWal->cfg.vgId, uid, sversion);
    pReader->cachedSchemaSuid = 0;
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
  }

  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
  } else {
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
  }
}
1009

1010
/*
 * Attach a column-id list to the reader. Only the pointer is stored; the
 * array is not copied here.
 */
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) {
  pReader->pColIdList = pColIdList;
}
1011

1012
/*
 * Replace the reader's set of table uids with the entries of `tbUidList`.
 *
 * An existing hash is cleared, otherwise a new one is created. A uid that
 * cannot be inserted is logged and skipped.
 *
 * @param pReader    reader whose tbIdHash is (re)populated
 * @param tbUidList  array of int64_t table uids
 * @param id         task identifier used only in log messages; may be NULL
 */
void tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  // Callers may pass NULL, and a NULL argument for "%s" is undefined
  // behaviour, so substitute a printable placeholder for logging.
  const char* idStr = (id != NULL) ? id : "(null)";

  if (pReader->tbIdHash) {
    taosHashClear(pReader->tbIdHash);
  } else {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", idStr);
      return;
    }
  }

  int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", idStr, *pKey);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", idStr, numOfTables);
}
1033

1034
/*
 * Add every uid in `pTableUidList` to the reader's set of table uids,
 * creating the hash first if it does not exist yet. A uid that cannot be
 * inserted is logged and skipped.
 *
 * @param pReader        reader whose tbIdHash is extended
 * @param pTableUidList  array of int64_t table uids
 */
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    // same guard as the set/remove variants: never hand a NULL key to the
    // hash or dereference it in the log call
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
    }
  }
}
1052

1053
/*
 * Tell whether `uid` is present in the reader's table-uid hash.
 *
 * @return true when taosHashGet finds an entry for uid, false otherwise
 */
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  void* pEntry = taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
  return pEntry != NULL;
}
1056

1057
/*
 * Report whether the reader currently holds no message, i.e. its
 * msg.msgStr pointer is NULL.
 */
bool tqCurrentBlockConsumed(const STqReader* pReader) {
  bool noPendingMsg = (pReader->msg.msgStr == NULL);
  return noPendingMsg;
}
1058

1059
/*
 * Remove every uid in `tbUidList` from the reader's table-uid hash.
 * A non-zero result from taosHashRemove is logged; processing continues
 * with the next uid.
 */
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  int32_t idx = 0;
  while (idx < taosArrayGetSize(tbUidList)) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    idx++;

    if (pUid == NULL) {
      continue;
    }

    if (taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t)) != 0) {
      tqError("failed to remove table uid:%" PRId64 " from hash", *pUid);
    }
  }
}
1067

1068
/*
 * Propagate a change of the table-uid list to every consumer handle and to
 * every source-level stream task of this vnode.
 *
 * Phase 1 runs under the write latch pTq->lock and walks pTq->pHandle:
 *   - column subscriptions: forward the list to the handle's executor task;
 *   - database subscriptions: on removal, record the uids in pFilterOutTbUid;
 *   - table subscriptions: on add, rebuild the reader's uid set from
 *     qGetTableList(); on removal, drop the uids from the reader.
 * Phase 2 runs under the stream-meta write lock and forwards the list to the
 * executor of each source-level stream task.
 *
 * @param pTq        the tq instance
 * @param tbUidList  array of int64_t table uids that were added or removed
 * @param isAdd      true when the uids were added, false when removed
 * @return 0 on success, or the qGetTableList error for a table subscription;
 *         other failures are only logged
 */
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while ((pIter = taosHashIterate(pTq->pHandle, pIter)) != NULL) {
    STqHandle* pTqHandle = (STqHandle*)pIter;

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
      if (code != 0) {
        tqError("update qualified table error for %s", pTqHandle->subKey);
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      if (isAdd) {
        // nothing to do for a database subscription when tables are added
        continue;
      }

      int32_t sz = taosArrayGetSize(tbUidList);
      for (int32_t i = 0; i < sz; i++) {
        int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
        if (tbUid &&
            taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
          tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
        }
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      if (!isAdd) {
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
        continue;
      }

      SArray* list = NULL;
      int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
                                  &list, pTqHandle->execHandle.task);
      if (ret != TDB_CODE_SUCCESS) {
        tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
                pTqHandle->consumerId);
        taosArrayDestroy(list);
        // leaving the iteration early: cancel it and drop the latch first
        taosHashCancelIterate(pTq->pHandle, pIter);
        taosWUnLockLatch(&pTq->lock);

        return ret;
      }
      tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
      taosArrayDestroy(list);
    }
  }
  taosWUnLockLatch(&pTq->lock);

  // update the table list handle for each stream scanner/wal reader
  // (pIter is NULL again here, so the second iteration starts from scratch)
  streamMetaWLock(pTq->pStreamMeta);
  while ((pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter)) != NULL) {
    int64_t      refId = *(int64_t*)pIter;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask == NULL) {
      continue;
    }

    int32_t taskId = pTask->id.taskId;

    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
      int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
      if (code != 0) {
        tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
      }
    }

    int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
    if (ret) {
      tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
    }
  }

  streamMetaWUnLock(pTq->pStreamMeta);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc