• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4973

03 Mar 2026 09:57AM UTC coverage: 67.69% (+0.05%) from 67.642%
#4973

push

travis-ci

web-flow
test(query): fix result of explain cases (#34658)

208339 of 307783 relevant lines covered (67.69%)

131201157.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.26
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "taoserror.h"
17
#include "tarray.h"
18
#include "tdef.h"
19
#include "thash.h"
20
#include "tmsg.h"
21
#include "tpriv.h"
22
#include "tq.h"
23

24
static int32_t tqRetrieveCols(STqReader *pReader, SSDataBlock *pRes, SHashObj* pCol2SlotId);
25

26
/*
 * Filter a CREATE TABLE batch so that it only carries child tables of tbSuid
 * whose uid is present in pReader->tbIdHash.
 *
 * dcoder      decoder positioned on the encoded SVCreateTbBatchReq
 * pHead       WAL record; when only part of the batch matches, the filtered
 *             batch is re-encoded over pHead->body (after the SMsgHead) and
 *             pHead->bodyLen is updated
 * pReader     reader whose tbIdHash is consulted
 * realTbSuid  out: set to tbSuid as soon as at least one request matches,
 *             left untouched when none match
 * tbSuid      super-table uid the requests are compared against
 *
 * The function returns nothing; failures are reported through tqError only.
 */
static void processCreateTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  int32_t            code = 0;
  int32_t            lino = 0;
  int32_t            needRebuild = 0;
  int                tlen = 0;
  void*              buf = NULL;
  SVCreateTbReq*     pCreateReq = NULL;
  SVCreateTbBatchReq req = {0};
  SVCreateTbBatchReq reqNew = {0};
  SEncoder           coderNew = {0};

  code = tDecodeSVCreateTbBatchReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // First pass: count how many requests in the batch match.
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
      needRebuild++;
    }
  }

  if (needRebuild == 0) {
    // nothing matches: leave *realTbSuid and the message untouched
    goto end;
  }

  *realTbSuid = tbSuid;
  if (needRebuild == req.nReqs) {
    // every request matches: the message can be used as is
    goto end;
  }

  // Partial match: collect the matching requests and re-encode them.
  reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
  if (reqNew.pArray == NULL) {
    code = terrno;
    lino = __LINE__;
    goto end;
  }
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
      reqNew.nReqs++;
      if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
        code = terrno;
        lino = __LINE__;
        goto end;
      }
    }
  }

  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, code);
  if (code < 0) {
    // do not allocate from a length that could not be computed
    lino = __LINE__;
    goto end;
  }
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    // record the failure so it is logged below instead of being silently dropped
    code = terrno;
    lino = __LINE__;
    goto end;
  }

  tEncoderInit(&coderNew, buf, tlen);
  code = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
  tEncoderClear(&coderNew);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }
  (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
  pHead->bodyLen = tlen + sizeof(SMsgHead);

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.pArray);
  tDeleteSVCreateTbBatchReq(&req);
  if (code < 0) {
    tqError("processCreateTbMsg failed, code:%d, line:%d", code, lino);
  }
}
1,279✔
99

100
/*
 * Decode an ALTER TABLE request, look the table up by name in the vnode meta
 * and, when its uid is present in pReader->tbIdHash, store the entry's
 * ctbEntry.suid into *realTbSuid.
 *
 * Failures (decode or meta lookup) are only logged; *realTbSuid is then left
 * untouched.
 */
static void processAlterTbMsg(SDecoder* dcoder, STqReader* pReader, int64_t* realTbSuid) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SMetaReader  mr = {0};
  SVAlterTbReq req = {0};

  code = tDecodeSVAlterTbReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
  code = metaGetTableEntryByName(&mr, req.tbName);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // Only tables registered in tbIdHash are reported to the caller.
  if (NULL != taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t))) {
    *realTbSuid = mr.me.ctbEntry.suid;
  }

end:
  // Cleanup runs on every path, including the early decode failure where the
  // meta reader is still zero-initialized.
  taosArrayDestroy(req.pMultiTag);
  metaReaderClear(&mr);
  if (code < 0) {
    tqError("processAlterTbMsg failed, code:%d, line:%d", code, lino);
  }
}
960✔
128

129
/*
 * Filter a DROP TABLE batch so that it only carries tables whose suid equals
 * tbSuid and whose uid is present in pReader->tbIdHash.
 *
 * dcoder      decoder positioned on the encoded SVDropTbBatchReq
 * pHead       WAL record; when only part of the batch matches, the filtered
 *             batch is re-encoded over pHead->body (after the SMsgHead) and
 *             pHead->bodyLen is updated
 * pReader     reader whose tbIdHash is consulted
 * realTbSuid  out: set to tbSuid as soon as at least one request matches,
 *             left untouched when none match
 * tbSuid      super-table uid the requests are compared against
 *
 * The function returns nothing; failures are reported through tqError only.
 */
static void processDropTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  int32_t          code = 0;
  int32_t          lino = 0;
  int32_t          needRebuild = 0;
  int              tlen = 0;
  void*            buf = NULL;
  SVDropTbReq*     pDropReq = NULL;
  SVDropTbBatchReq req = {0};
  SVDropTbBatchReq reqNew = {0};
  SEncoder         coderNew = {0};

  code = tDecodeSVDropTbBatchReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // First pass: count how many requests in the batch match.
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    if (pDropReq->suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
      needRebuild++;
    }
  }

  if (needRebuild == 0) {
    // nothing matches: leave *realTbSuid and the message untouched
    goto end;
  }

  *realTbSuid = tbSuid;
  if (needRebuild == req.nReqs) {
    // every request matches: the message can be used as is
    goto end;
  }

  // Partial match: collect the matching requests and re-encode them.
  reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
  if (reqNew.pArray == NULL) {
    code = terrno;
    lino = __LINE__;
    goto end;
  }
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    if (pDropReq->suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
      reqNew.nReqs++;
      if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
        code = terrno;
        lino = __LINE__;
        goto end;
      }
    }
  }

  tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, code);
  if (code < 0) {
    // do not allocate from a length that could not be computed
    lino = __LINE__;
    goto end;
  }
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    // record the failure so it is logged below instead of being silently dropped
    code = terrno;
    lino = __LINE__;
    goto end;
  }

  tEncoderInit(&coderNew, buf, tlen);
  code = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
  tEncoderClear(&coderNew);
  if (code != 0) {
    lino = __LINE__;
    goto end;
  }
  (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
  pHead->bodyLen = tlen + sizeof(SMsgHead);

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.pArray);
  if (code < 0) {
    tqError("processDropTbMsg failed, code:%d, line:%d", code, lino);
  }
}
×
201

202
/*
 * Decide whether a meta WAL record is relevant for the given handle.
 *
 * Returns false when either argument is NULL and true for any handle whose
 * subscription type is not TOPIC_SUB_TYPE__TABLE. For table subscriptions the
 * record body (after the SMsgHead) is decoded according to its message type
 * to obtain the super-table uid it refers to, and the result is whether that
 * uid equals the handle's execTb.suid. Message types not handled here, and
 * decode failures, leave the extracted uid at 0.
 *
 * For CREATE/DROP TABLE batches the helpers may rewrite pHead->body in place.
 */
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle == NULL || pHead == NULL) {
    return false;
  }

  STqExecHandle* pExec = &pHandle->execHandle;
  if (pExec->subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  STqReader* pReader = pExec->pTqReader;
  int16_t    msgType = pHead->msgType;
  int64_t    tbSuid = pExec->execTb.suid;
  int64_t    realTbSuid = 0;

  // The payload starts right after the message head.
  void*    payload = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));
  int32_t  payloadLen = pHead->bodyLen - sizeof(SMsgHead);
  SDecoder dcoder = {0};
  tDecoderInit(&dcoder, payload, payloadLen);

  switch (msgType) {
    case TDMT_VND_CREATE_STB:
    case TDMT_VND_ALTER_STB: {
      SVCreateStbReq req = {0};
      if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
        goto end;
      }
      realTbSuid = req.suid;
      break;
    }
    case TDMT_VND_DROP_STB: {
      SVDropStbReq req = {0};
      if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
        goto end;
      }
      realTbSuid = req.suid;
      break;
    }
    case TDMT_VND_CREATE_TABLE:
      processCreateTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
      break;
    case TDMT_VND_ALTER_TABLE:
      processAlterTbMsg(&dcoder, pReader, &realTbSuid);
      break;
    case TDMT_VND_DROP_TABLE:
      processDropTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
      break;
    case TDMT_VND_DELETE: {
      SDeleteRes req = {0};
      if (tDecodeDeleteRes(&dcoder, &req) < 0) {
        goto end;
      }
      realTbSuid = req.suid;
      break;
    }
    default:
      break;
  }

end:
  tDecoderClear(&dcoder);
  bool matched = (tbSuid == realTbSuid);
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, matched);
  return matched;
}
258

259
/*
 * Scan the WAL forward from *fetchOffset until a record the handle should
 * consume has been fetched, or the applied version is passed.
 *
 * - A TDMT_VND_SUBMIT record stops the scan; the result is whatever
 *   walFetchBody returns.
 * - When pHandle->fetchMeta is not WITH_DATA, a meta record (excluding
 *   TDMT_VND_DELETE under ONLY_META) has its body fetched and is checked with
 *   isValValidForTable(); a match stops the scan with 0, otherwise the record
 *   is skipped.
 * - Every other record has its body skipped.
 *
 * On return *fetchOffset holds the version the scan stopped at. Returns -1
 * for NULL arguments, when the head cannot be fetched, or when nothing was
 * found up to the applied version; a negative code from the WAL calls is
 * passed through.
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
    return -1;
  }

  int32_t code = -1;
  int32_t vgId = TD_VID(pTq->pVnode);
  int64_t id = pHandle->pWalReader->readerId;
  int64_t offset = *fetchOffset;

  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, offset, lastVer, committedVer, appliedVer, id);

  while (offset <= appliedVer) {
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
      goto END;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);

    // Data record: fetch its body and hand it to the caller.
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pHandle->pWalReader);
      goto END;
    }

    SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
    bool      wantedMeta = pHandle->fetchMeta != WITH_DATA && IS_META_MSG(pHead->msgType) &&
                      !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META);
    if (wantedMeta) {
      code = walFetchBody(pHandle->pWalReader);
      if (code < 0) {
        goto END;
      }

      // Re-read the head pointer after the body fetch, as the original does.
      pHead = &(pHandle->pWalReader->pHead->head);
      if (isValValidForTable(pHandle, pHead)) {
        code = 0;
        goto END;
      }

      // Meta record for another table: move on to the next version.
      offset++;
      code = -1;
      continue;
    }

    code = walSkipFetchBody(pHandle->pWalReader);
    if (code < 0) {
      goto END;
    }
    offset++;
    code = -1;  // nothing found yet
  }

END:
  *fetchOffset = offset;
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
          ", applied:%" PRId64 ", 0x%" PRIx64,
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
  return code;
}
326

327
/* Return the reader's cached hasPrimaryKey flag; false for a NULL reader. */
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return (pReader != NULL) ? pReader->hasPrimaryKey : false;
}
333

334
/*
 * Refresh pReader->hasPrimaryKey from the schema of table `uid`.
 *
 * The flag becomes true only when the schema could be loaded, has at least
 * two columns and the second column carries COL_IS_KEY. A NULL reader is
 * ignored (after the debug trace).
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);

  if (pReader == NULL) {
    return;
  }

  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, -1, 1, NULL, 0);
  bool            hasKey = false;
  if (pWrapper != NULL && pWrapper->nCols >= 2) {
    hasKey = (pWrapper->pSchema[1].flags & COL_IS_KEY) != 0;
  }

  // The wrapper is released unconditionally, as in the original (also when NULL).
  tDeleteSchemaWrapper(pWrapper);
  pReader->hasPrimaryKey = hasKey;
}
348

349
static void freeTagCache(void* pData){
1,628,705✔
350
  if (pData == NULL) return;
1,628,705✔
351
  SArray* tagCache = *(SArray**)pData;
1,628,705✔
352
  taosArrayDestroyP(tagCache, taosMemFree);
1,629,000✔
353
}
354

355
/*
 * Allocate and initialise a reader for the given vnode: opens a WAL reader on
 * pVnode->pWal and creates the tag-cache hash (with freeTagCache as its free
 * callback) and its latch.
 *
 * Returns NULL when pVnode is NULL or any allocation/open step fails; partial
 * state is released before returning.
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  tqDebug("%s:%p", __FUNCTION__, pVnode);
  if (pVnode == NULL) {
    return NULL;
  }

  STqReader* pNew = taosMemoryCalloc(1, sizeof(STqReader));
  if (pNew == NULL) {
    return NULL;
  }

  pNew->pWalReader = walOpenReader(pVnode->pWal, 0);
  if (pNew->pWalReader == NULL) {
    goto _fail;
  }

  pNew->pVnode = pVnode;
  pNew->pSchemaWrapper = NULL;
  pNew->tbIdHash = NULL;

  pNew->pTableTagCacheForTmq = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  if (pNew->pTableTagCacheForTmq == NULL) {
    goto _fail;
  }
  taosHashSetFreeFp(pNew->pTableTagCacheForTmq, freeTagCache);
  taosInitRWLatch(&pNew->tagCachelock);

  return pNew;

_fail:
  // Only close the WAL reader when it was actually opened.
  if (pNew->pWalReader != NULL) {
    walCloseReader(pNew->pWalReader);
  }
  taosMemoryFree(pNew);
  return NULL;
}
385

386
/*
 * Release everything owned by the reader and the reader itself.
 * A NULL reader is accepted and ignored (after the debug trace).
 */
void tqReaderClose(STqReader* pReader) {
  tqDebug("%s:%p", __FUNCTION__, pReader);
  if (pReader == NULL) {
    return;
  }

  /* WAL reader and tag cache */
  walCloseReader(pReader->pWalReader);
  taosHashCleanup(pReader->pTableTagCacheForTmq);

  /* schema related buffers */
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  taosMemoryFree(pReader->pTSchema);
  taosMemoryFree(pReader->extSchema);

  /* table-id hash and any still-decoded submit request */
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  taosMemoryFree(pReader);
}
403

404
/*
 * Position the reader's WAL reader at version `ver`.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, terrno when the seek
 * fails, and 0 on success. `id` is only used in the debug trace.
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t seekRes = walReaderSeekVer(pReader->pWalReader, ver);
  if (seekRes < 0) {
    return terrno;
  }

  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
  return 0;
}
414

415
/*
 * Make sure the tag cache holds an entry for table `uid`: when the hash has
 * no entry yet, cacheTag() is called to populate it.
 *
 * Returns the code from cacheTag(), or 0 when the entry already existed.
 * A failure is also logged.
 */
static int32_t getTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid) {
  int32_t code = 0;
  int32_t lino = 0;

  if (taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES) == NULL) {
    // Cache miss: build the entry through the storage API.
    SStorageAPI storageApi = {0};
    initStorageAPI(&storageApi);
    code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &storageApi, uid, 0,
                    &pReader->tagCachelock);
    TSDB_CHECK_CODE(code, lino, END);
  }

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }

  return code;
}
434

435
/*
 * Refresh the cached tags of table `uid` for column `colId`.
 *
 * Nothing happens when the table has no entry in the tag cache; otherwise
 * cacheTag() is called again with `colId`. A failure is only logged.
 */
void tqUpdateTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid, col_id_t colId) {
  int32_t code = 0;
  int32_t lino = 0;

  // Tables that were never cached have nothing to update.
  if (taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES) == NULL) {
    return;
  }

  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &storageApi, uid, colId,
                  &pReader->tagCachelock);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to update tag cache code:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }
}
454

455
/*
 * Fill the pseudo (tag) columns of pBlock for table `uid`.
 *
 * The tag cache entry is ensured first, then fillTag() is called with
 * `numOfRows` and `pBlock->info.rows - numOfRows`, where `numOfRows` is the
 * value passed in by the caller.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader/block, otherwise the first
 * failing code (also logged) or TSDB_CODE_SUCCESS.
 */
static int32_t tqRetrievePseudoCols(STqReader* pReader, SSDataBlock* pBlock, int32_t numOfRows, int64_t uid, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr) {
  if (pReader == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = getTableTagCache(pReader, pPseudoExpr, numOfPseudoExpr, uid);
  TSDB_CHECK_CODE(code, lino, END);

  code = fillTag(pReader->pTableTagCacheForTmq, pPseudoExpr, numOfPseudoExpr, uid, pBlock, numOfRows,
                 pBlock->info.rows - numOfRows, 1, &pReader->tagCachelock);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != 0) {
    tqError("tqRetrievePseudoCols failed, line:%d, msg:%s", lino, tstrerror(code));
  }
  return code;
}
474

475
/*
 * Pull submit records from the WAL and append their rows to pRes until one of
 * the stop conditions is met:
 *   - walNextValidMsg() returns non-zero (that code is returned),
 *   - pRes holds at least minPollRows rows, or enableReplay is set and pRes
 *     holds at least one row (returns 0),
 *   - more than `timeout` ms elapsed, or the clock went backwards
 *     (returns TSDB_CODE_TMQ_FETCH_TIMEOUT and sets terrno).
 *
 * Per submit block: blocks whose flags intersect `sourceExcluded` are skipped,
 * as are blocks whose uid is missing from pReader->tbIdHash (a NULL hash
 * accepts every block). Accepted blocks get their columns and pseudo columns
 * filled; a failure there is returned immediately.
 *
 * NOTE(review): the early `return` paths leave pReader->submit undestroyed,
 * unlike the normal path — confirm that callers clean it up.
 */
int32_t tqNextBlockInWal(STqReader* pReader, SSDataBlock* pRes, SHashObj* pCol2SlotId, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                         int sourceExcluded, int32_t minPollRows, int64_t timeout, int8_t enableReplay) {
  int32_t code = 0;
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SWalReader* pWalReader = pReader->pWalReader;
  int64_t     startMs = taosGetTimestampMs();

  for (;;) {
    code = walNextValidMsg(pWalReader, false);
    if (code != 0) {
      break;
    }

    // Decode the submit request that follows the SSubmitReq2Msg header.
    void*    pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t  bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t  ver = pWalReader->pHead->head.version;
    SDecoder decoder = {0};
    code = tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL, &decoder);
    tDecoderClear(&decoder);
    if (code != 0) {
      return code;
    }

    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
    for (pReader->nextBlk = 0; pReader->nextBlk < numOfBlocks; pReader->nextBlk += 1) {
      tqDebug("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pSubmitTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        return terrno;
      }

      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
        continue;  // block comes from an excluded source
      }
      if (pReader->tbIdHash != NULL && taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) == NULL) {
        continue;  // table is not part of this subscription
      }

      tqDebug("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
      int32_t rowsBefore = pRes->info.rows;
      code = tqRetrieveCols(pReader, pRes, pCol2SlotId);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      code = tqRetrievePseudoCols(pReader, pRes, rowsBefore, pSubmitTbData->uid, pPseudoExpr, numOfPseudoExpr);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    bool enoughRows = pRes->info.rows >= minPollRows || (enableReplay && pRes->info.rows > 0);
    if (enoughRows) {
      break;
    }

    int64_t elapsed = taosGetTimestampMs() - startMs;
    if (elapsed > timeout || elapsed < 0) {
      code = TSDB_CODE_TMQ_FETCH_TIMEOUT;
      terrno = code;
      break;
    }
  }

  return code;
}
547

548
/*
 * Attach a raw submit message to the reader and decode it into
 * pReader->submit using the caller-provided decoder.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, otherwise the code from
 * tDecodeSubmitReq() (a failure is also logged). The decoder is initialised
 * here but not cleared; callers in this file clear it afterwards.
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList, SDecoder* decoder) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Remember where the message came from.
  pReader->msg.ver = ver;
  pReader->msg.msgLen = msgLen;
  pReader->msg.msgStr = msgStr;

  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);

  tDecoderInit(decoder, pReader->msg.msgStr, pReader->msg.msgLen);
  int32_t decodeRes = tDecodeSubmitReq(decoder, &pReader->submit, rawList);
  if (decodeRes != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
  }

  return decodeRes;
}
567

568
/*
 * Drop the currently decoded submit request and reset the block cursor and
 * the raw message pointer.
 */
void tqReaderClearSubmitMsg(STqReader* pReader) {
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->msg.msgStr = NULL;
  pReader->nextBlk = 0;
}
89,845,597✔
573

574
/* Return the WAL reader owned by pReader, or NULL for a NULL reader. */
SWalReader* tqGetWalReader(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pWalReader : NULL;
}
580

581
/* Return pReader->lastTs, or 0 for a NULL reader. */
int64_t tqGetResultBlockTime(STqReader* pReader) {
  return (pReader != NULL) ? pReader->lastTs : 0;
}
587

588
/*
 * Advance pReader->nextBlk to the next submit block whose table uid is found
 * in pReader->tbIdHash.
 *
 * The result starts out false and is changed by the TSDB_CHECK_* macros,
 * which store their last argument in `found` when they jump to END:
 *   - NULL reader or no message attached: false,
 *   - tbIdHash is NULL: true (no advancing is done),
 *   - a block with a matching uid is reached: true, nextBlk left on it,
 *   - all blocks exhausted: the submit message is cleared, false.
 * `idstr` is not used by this function.
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  int32_t found = false;
  int32_t lino = 0;
  int64_t uid = 0;
  int32_t blockSz = 0;

  TSDB_CHECK_NULL(pReader, found, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, found, lino, END, false);
  TSDB_CHECK_NULL(pReader->tbIdHash, found, lino, END, true);

  blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pSubmitTbData, found, lino, END, false);
    uid = pSubmitTbData->uid;

    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
    TSDB_CHECK_CONDITION(ret == NULL, found, lino, END, true);

    // uid is not subscribed: keep scanning
    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
            pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, found ? "true" : "false", uid);
  return found;
}
616

617
/*
 * Advance pReader->nextBlk to the next submit block whose table uid is NOT
 * present in `filterOutUids`.
 *
 * The result starts out false and is changed by the TSDB_CHECK_NULL macros,
 * which store their last argument in `found` when they jump to END:
 *   - NULL reader or no message attached: false,
 *   - filterOutUids is NULL: true (no advancing is done),
 *   - a block whose uid is absent from the filter is reached: true,
 *   - all blocks exhausted: the submit message is cleared, false.
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  int32_t found = false;
  int32_t lino = 0;
  int64_t uid = 0;
  int32_t blockSz = 0;

  TSDB_CHECK_NULL(pReader, found, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, found, lino, END, false);
  TSDB_CHECK_NULL(filterOutUids, found, lino, END, true);

  blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pSubmitTbData, found, lino, END, false);
    uid = pSubmitTbData->uid;

    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
    TSDB_CHECK_NULL(ret, found, lino, END, true);

    // uid is filtered out: jump over this block
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, found ? "true" : "false", uid);
  return found;
}
643

644
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
43,548,524✔
645
                    SExtSchema* extSrc) {
646
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
43,548,524✔
647
    return TSDB_CODE_INVALID_PARA;
×
648
  }
649
  int32_t code = 0;
43,609,453✔
650

651
  int32_t cnt = 0;
43,609,453✔
652
  for (int32_t i = 0; i < pSrc->nCols; i++) {
244,132,531✔
653
    cnt += mask[i];
200,536,368✔
654
  }
655

656
  pDst->nCols = cnt;
43,575,846✔
657
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
43,616,021✔
658
  if (pDst->pSchema == NULL) {
43,566,665✔
659
    return TAOS_GET_TERRNO(terrno);
×
660
  }
661

662
  int32_t j = 0;
43,564,514✔
663
  for (int32_t i = 0; i < pSrc->nCols; i++) {
244,173,462✔
664
    if (mask[i]) {
200,475,638✔
665
      pDst->pSchema[j++] = pSrc->pSchema[i];
200,546,796✔
666
      SColumnInfoData colInfo =
200,593,905✔
667
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
200,583,714✔
668
      if (extSrc != NULL) {
200,555,651✔
669
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
56,637✔
670
      }
671
      code = blockDataAppendColInfo(pBlock, &colInfo);
200,555,651✔
672
      if (code != 0) {
200,570,100✔
673
        return code;
×
674
      }
675
    }
676
  }
677
  return 0;
43,635,926✔
678
}
679

680
/*
 * Store a blob column value into row `idx` of pColumnInfoData.
 *
 * When pColVal carries a value, its pData (if any) is read as a 64-bit
 * sequence number that is looked up in pBlobRow2; the resulting item is
 * copied into a temporary length-prefixed buffer which is handed to
 * colDataSetVal(). A value with NULL pData is stored with length 0. When
 * pColVal carries no value the cell is set to NULL.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or an unreadable
 * sequence number, terrno when a buffer cannot be (re)allocated, the code
 * from tBlobSetGet()/colDataSetVal() otherwise. The temporary buffer is
 * released on every path.
 */
static int32_t doSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobSet* pBlobRow2) {
  int32_t code = 0;
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // TODO(yhDeng)
  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataSetNULL(pColumnInfoData, idx);
    return code;
  }

  char* val = taosMemCalloc(1, pColVal->value.nData + sizeof(BlobDataLenT));
  if (val == NULL) {
    return terrno;
  }

  uint64_t seq = 0;
  int32_t  len = 0;
  if (pColVal->value.pData != NULL) {
    if (tGetU64(pColVal->value.pData, &seq) < 0) {
      // release the buffer before bailing out; previously it leaked here
      taosMemoryFree(val);
      TAOS_CHECK_RETURN(TSDB_CODE_INVALID_PARA);
    }
    SBlobItem item = {0};
    code = tBlobSetGet(pBlobRow2, seq, &item);
    if (code != 0) {
      taosMemoryFree(val);
      terrno = code;
      uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
      return code;
    }

    // Grow through a temporary so the original buffer can still be freed
    // when the reallocation fails.
    char* resized = taosMemRealloc(val, item.len + sizeof(BlobDataLenT));
    if (resized == NULL) {
      code = terrno;  // capture before the free below can touch it
      taosMemoryFree(val);
      return code;
    }
    val = resized;

    (void)memcpy(blobDataVal(val), item.data, item.len);
    len = item.len;
  }

  blobDataSetLen(val, len);
  code = colDataSetVal(pColumnInfoData, idx, val, false);

  taosMemoryFree(val);
  return code;
}
721
/*
 * Store a non-blob column value into row `rowIndex` of pColumnInfoData.
 *
 * Variable-length types are copied into a length-prefixed stack buffer that
 * is handed to colDataSetVal(); a missing value sets the cell to NULL.
 * Fixed-length types are passed through VALUE_GET_DATUM, with the NULL flag
 * taken from the column value.
 *
 * Returns TSDB_CODE_INVALID_PARA when a variable-length payload does not fit
 * into the stack buffer, otherwise the code from colDataSetVal() (or
 * TSDB_CODE_SUCCESS when the cell is set to NULL).
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  // Largest payload the stack buffer can take; the two extra bytes are
  // presumably the var-data length header — TODO confirm against varDataVal.
  enum { TQ_MAX_VAR_PAYLOAD = 65535 };
  int32_t code = TSDB_CODE_SUCCESS;

  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
    if (COL_VAL_IS_VALUE(pColVal)) {
      char val[TQ_MAX_VAR_PAYLOAD + 2] = {0};
      if (pColVal->value.pData != NULL) {
        // Refuse payloads that would run past the end of the stack buffer.
        if (pColVal->value.nData > TQ_MAX_VAR_PAYLOAD) {
          return TSDB_CODE_INVALID_PARA;
        }
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
      }
      varDataSetLen(val, pColVal->value.nData);
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
    } else {
      colDataSetNULL(pColumnInfoData, rowIndex);
    }
  } else {
    code = colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
                         !COL_VAL_IS_VALUE(pColVal));
  }

  return code;
}
742

743
/*
 * Write one column value into row `rowIndex` of the block column at `slotId`.
 *
 * Blob-typed columns go through doSetBlobVal() (which needs pBlobSet), every
 * other type through doSetVal(). Returns terrno when the slot does not exist
 * in the block, otherwise the code from the setter.
 */
static int32_t setBlockData(SSDataBlock* pBlock, int32_t slotId, int32_t rowIndex, SColVal* colVal, SBlobSet* pBlobSet) {
  SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
  if (pColData == NULL) {
    return terrno;
  }

  if (IS_STR_DATA_BLOB(pColData->info.type)) {
    return doSetBlobVal(pColData, rowIndex, colVal, pBlobSet);
  }
  return doSetVal(pColData, rowIndex, colVal);
}
758

759
static int32_t processSubmitRow(SArray*         pRows,
107,089,224✔
760
                                SSDataBlock*    pBlock,
761
                                SHashObj*       pCol2SlotId,
762
                                STqReader*      pReader,
763
                                SBlobSet*       pBlobSet) {
764
  int32_t        code = 0;
107,089,224✔
765
  int32_t        line = 0;
107,089,224✔
766

767
  SArray* pColArray = taosArrayInit(4, INT_BYTES * 2);
107,089,224✔
768
  TSDB_CHECK_NULL(pColArray, code, line, END, terrno);
107,097,883✔
769

770
  int32_t sourceIdx = -1;
107,097,883✔
771
  int32_t rowIndex = 0;
107,097,883✔
772
  SRow* pRow = taosArrayGetP(pRows, rowIndex);
107,097,883✔
773
  TSDB_CHECK_NULL(pRow, code, line, END, terrno);
107,084,373✔
774
  while (++sourceIdx < pReader->pTSchema->numOfCols) {
871,092,199✔
775
    SColVal colVal = {0};
764,530,770✔
776
    code = tRowGet(pRow, pReader->pTSchema, sourceIdx, &colVal);
764,646,294✔
777
    TSDB_CHECK_CODE(code, line, END);
765,214,894✔
778
    void* pSlotId = taosHashGet(pCol2SlotId, &colVal.cid, sizeof(colVal.cid));
765,214,894✔
779
    if (pSlotId == NULL) {
766,521,830✔
780
      continue;
266,401,553✔
781
    }
782
    int32_t pData[2] = {sourceIdx, *(int16_t*)pSlotId};
500,120,277✔
783
    TSDB_CHECK_NULL(taosArrayPush(pColArray, pData), code, line, END, terrno);
500,364,722✔
784
    code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
500,364,722✔
785
    TSDB_CHECK_CODE(code, line, END);
498,147,522✔
786
  }
787
  
788
  for (rowIndex = 1; rowIndex < taosArrayGetSize(pRows); rowIndex++) {
2,147,483,647✔
789
    SRow* pRow = taosArrayGetP(pRows, rowIndex);
2,147,483,647✔
790
    TSDB_CHECK_NULL(pRow, code, line, END, terrno);
2,147,483,647✔
791
    for (int32_t j = 0; j < taosArrayGetSize(pColArray); j++) {
2,147,483,647✔
792
      int32_t* pData = taosArrayGet(pColArray, j);
2,147,483,647✔
793
      TSDB_CHECK_NULL(pData, code, line, END, terrno);
2,147,483,647✔
794

795
      SColVal colVal = {0};
2,147,483,647✔
796
      code = tRowGet(pRow, pReader->pTSchema, pData[0], &colVal);
2,147,483,647✔
797
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
798

799
      code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
2,147,483,647✔
800
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
801
    }
802
  }
803

804
  END:
96,378,014✔
805
  taosArrayDestroy(pColArray);
74,769,915✔
806
  return code;
107,011,924✔
807
}
808

809
static int32_t processSubmitCol(SArray*         pCols,
1,300✔
810
                                SSDataBlock*    pBlock,
811
                                SHashObj*       pCol2SlotId,
812
                                SBlobSet*       pBlobSet) {
813
  int32_t        code = 0;
1,300✔
814
  int32_t        line = 0;
1,300✔
815

816
  for (int32_t i = 0; i < taosArrayGetSize(pCols); i++) {
3,900✔
817
    SColData* pCol = taosArrayGet(pCols, i);
2,600✔
818
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
2,600✔
819
    void* pSlotId = taosHashGet(pCol2SlotId, &pCol->cid, sizeof(pCol->cid));
2,600✔
820
    if (pSlotId == NULL) {
2,600✔
821
      continue;
1,300✔
822
    }
823
    SColVal colVal = {0};
1,300✔
824
    for (int32_t row = 0; row < pCol->nVal; row++) {
3,900✔
825
      code = tColDataGetValue(pCol, row, &colVal);
2,600✔
826
      TSDB_CHECK_CODE(code, line, END);
2,600✔
827

828
      code = setBlockData(pBlock, *(int16_t*)pSlotId, pBlock->info.rows + row, &colVal, pBlobSet);
2,600✔
829
      TSDB_CHECK_CODE(code, line, END);
2,600✔
830
    }
831
  }
832
  
833
  END:
1,300✔
834
  return code;
1,300✔
835
}
836

837
/**
 * Make sure the schema cached in `pReader` matches the table and schema version of
 * `pSubmitTbData`, reloading it from the meta store when it does not.
 *
 * The cache is keyed by suid when the submit carries one, otherwise by uid, plus the schema
 * version. A reload is also forced when either cached schema pointer is NULL: a previously failed
 * reload releases the cached objects but leaves the old keys in place, so matching on keys alone
 * would report success with no schema to read from.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema wrapper cannot be
 *         fetched, or terrno when the STSchema cannot be built.
 */
static int32_t checkSchema(STqReader* pReader, SSubmitTbData* pSubmitTbData) {
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;

  bool sameTable = (suid != 0) ? (pReader->cachedSchemaSuid == suid) : (pReader->cachedSchemaUid == uid);
  bool cacheUsable = sameTable && (pReader->cachedSchemaVer == sversion) && (pReader->pSchemaWrapper != NULL) &&
                     (pReader->pTSchema != NULL);
  if (cacheUsable) {
    return TSDB_CODE_SUCCESS;
  }

  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  taosMemoryFreeClear(pReader->extSchema);
  taosMemoryFreeClear(pReader->pTSchema);
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
  if (pReader->pSchemaWrapper == NULL) {
    // report the version that was actually requested, not the stale cached one
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d, possibly dropped table",
           vgId, suid, uid, sversion);
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
  }

  pReader->pTSchema = tBuildTSchema(pReader->pSchemaWrapper->pSchema, pReader->pSchemaWrapper->nCols,
                                    pReader->pSchemaWrapper->version);
  if (pReader->pTSchema == NULL) {
    tqWarn("vgId:%d, cannot build schema for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d",
           vgId, suid, uid, sversion);
    return terrno;
  }

  pReader->cachedSchemaUid = uid;
  pReader->cachedSchemaSuid = suid;
  pReader->cachedSchemaVer = sversion;
  return TSDB_CODE_SUCCESS;
}
865

866
/**
 * Append the submit block at `pReader->nextBlk` to `pBlock`, writing only the columns that have
 * an entry in `pCol2SlotId`.
 *
 * Steps: fetch the submit entry, size the block for the extra rows, refresh the cached schema,
 * then copy either column-format or row-format data. `pBlock->info.rows` is advanced only when
 * everything succeeded.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL reader/block, otherwise the first error.
 */
static int32_t tqRetrieveCols(STqReader* pReader, SSDataBlock* pBlock, SHashObj* pCol2SlotId) {
  if (pReader == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);

  int32_t code = 0;
  int32_t line = 0;
  int32_t numOfRows = 0;

  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
  pReader->lastTs = pSubmitTbData->ctimeMs;

  const bool isColFormat = (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) != 0;
  if (isColFormat) {
    // in column format the row count is taken from the first column
    SColData* pFirstCol = taosArrayGet(pSubmitTbData->aCol, 0);
    TSDB_CHECK_NULL(pFirstCol, code, line, END, terrno);
    numOfRows = pFirstCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfRows);
  TSDB_CHECK_CODE(code, line, END);

  code = checkSchema(pReader, pSubmitTbData);
  TSDB_CHECK_CODE(code, line, END);

  // convert and scan one block
  if (isColFormat) {
    code = processSubmitCol(pSubmitTbData->aCol, pBlock, pCol2SlotId, pSubmitTbData->pBlobSet);
  } else {
    code = processSubmitRow(pSubmitTbData->aRowP, pBlock, pCol2SlotId, pReader, pSubmitTbData->pBlobSet);
  }
  TSDB_CHECK_CODE(code, line, END);

  pBlock->info.rows += numOfRows;

END:
  if (code != 0) {
    tqError("tqRetrieveCols failed, line:%d, msg:%s", line, tstrerror(code));
  }
  return code;
}
909

910
// Track, per schema column `j`, whether the current row carries a value (anything but NONE).
// A new block is requested (`buildNew`) for the first row and whenever that state differs from
// the previous row. Expects `curRow`, `assigned`, `j`, `colVal` and `buildNew` in the caller's scope.
#define PROCESS_VAL                                        \
  {                                                        \
    bool colAssigned_ = !COL_VAL_IS_NONE(&colVal);         \
    if (curRow == 0 || colAssigned_ != assigned[j]) {      \
      assigned[j] = colAssigned_;                          \
      buildNew = true;                                     \
    }                                                      \
  }
921

922
// Merge step between the source columns and the columns of the block being filled, keyed by
// column id: a source column with a smaller id is skipped, an equal id is written (blob columns
// via doSetBlobVal, all others via doSetVal), and a larger id means the block column has no
// source value for this row and is set to NULL. Expects `colVal`, `pColData`, `sourceIdx`,
// `targetIdx`, `curRow`, `lastRow` and `pSubmitTbData` in the caller's scope.
#define SET_DATA                                                                                      \
  {                                                                                                   \
    if (colVal.cid < pColData->info.colId) {                                                          \
      sourceIdx++;                                                                                    \
    } else if (colVal.cid != pColData->info.colId) {                                                  \
      colDataSetNULL(pColData, curRow - lastRow);                                                     \
      targetIdx++;                                                                                    \
    } else {                                                                                          \
      if (IS_STR_DATA_BLOB(pColData->info.type)) {                                                    \
        TQ_ERR_GO_TO_END(doSetBlobVal(pColData, curRow - lastRow, &colVal, pSubmitTbData->pBlobSet)); \
      } else {                                                                                        \
        TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal));                              \
      }                                                                                               \
      sourceIdx++;                                                                                    \
      targetIdx++;                                                                                    \
    }                                                                                                 \
  }
937

938
/**
 * Close the block currently being filled (if any) and append a fresh block plus its schema
 * wrapper, both masked by `assigned`, to `blocks` / `schemas`.
 *
 * @param assigned   per-schema-column flags passed to tqMaskBlock()
 * @param numOfRows  total rows in the submit; the new block is sized for numOfRows - curRow
 * @param curRow     index of the row that starts the new block
 * @param lastRow    in/out: first row of the previous block; moved to curRow when one is closed
 * @return 0 on success, otherwise the error set by the failing step.
 */
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
  int32_t         code = 0;
  SSchemaWrapper* pSW = NULL;
  SSDataBlock*    block = NULL;
  if (taosArrayGetSize(blocks) > 0) {
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pLastBlock);
    pLastBlock->info.rows = curRow - *lastRow;
    *lastRow = curRow;
  }

  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TQ_NULL_GO_TO_END(block);

  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  TQ_NULL_GO_TO_END(pSW);

  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
          (int32_t)taosArrayGetSize(block->pDataBlock));

  block->info.id.uid = pSubmitTbData->uid;
  block->info.version = pReader->msg.ver;
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));

  // `blocks` now references the block's column resources (the success path has always released
  // only this shell). Drop the shell right away so that a failure of the next push cannot make
  // the cleanup below free resources the array still points at.
  taosMemoryFreeClear(block);

  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
  pSW = NULL;  // ownership moved to `schemas`

END:
  if (code != 0) {
    tqError("processBuildNew failed, code:%d", code);
  }
  tDeleteSchemaWrapper(pSW);
  blockDataFreeRes(block);
  taosMemoryFree(block);
  return code;
}
978
/**
 * Convert column-format submit data into one or more data blocks.
 *
 * For every row the set of schema columns carrying a value is compared with the previous row
 * (PROCESS_VAL); whenever it changes, or a schema column is absent from the submit, a new block
 * with a matching schema is started through processBuildNew(). The row is then merged into the
 * last block by column id (SET_DATA).
 *
 * @return TSDB_CODE_SUCCESS or the first error raised by a helper.
 */
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
          numOfRows);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    for (int32_t j = 0; j < pSchemaWrapper->nCols; j++) {
      int32_t k = 0;
      for (; k < numOfCols; k++) {
        pCol = taosArrayGet(pCols, k);
        TQ_NULL_GO_TO_END(pCol);
        if (pSchemaWrapper->pSchema[j].colId == pCol->cid) {
          SColVal colVal = {0};
          TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
          PROCESS_VAL
          tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], numOfCols);
          break;
        }
      }
      if (k >= numOfCols) {
        // this column is not in the current row, so we set it to NULL
        assigned[j] = 0;
        buildNew = true;
      }
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual && sourceIdx < numOfCols) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }

    curRow++;
  }

  // With zero rows no block may have been built at all, so the last block can be missing;
  // guard the access the same way tqProcessRowData() does.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFree(assigned);
  return code;
}
1054

1055
/**
 * Convert row-format submit data into one or more data blocks.
 *
 * An STSchema is built from the reader's schema wrapper for the duration of the call. For every
 * row the set of columns carrying a value is compared with the previous row (PROCESS_VAL); when
 * it changes a new block is started through processBuildNew(). The row is then merged into the
 * last block by column id (SET_DATA).
 *
 * @return TSDB_CODE_SUCCESS or the first error raised by a helper.
 */
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t   code = 0;
  STSchema* pTSchema = NULL;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  int32_t curRow = 0;
  int32_t lastRow = 0;
  SArray* pSrcRows = pSubmitTbData->aRowP;
  int32_t totalRows = taosArrayGetSize(pSrcRows);
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
  TQ_NULL_GO_TO_END(pTSchema);
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, totalRows);

  for (int32_t rowIdx = 0; rowIdx < totalRows; rowIdx++) {
    SRow* pRow = taosArrayGetP(pSrcRows, rowIdx);
    TQ_NULL_GO_TO_END(pRow);

    // pass 1: detect whether the set of assigned columns changed
    bool buildNew = false;
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
      tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], pTSchema->numOfCols);
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, totalRows, curRow, &lastRow));
    }

    SSDataBlock* pCurBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pCurBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // pass 2: merge the row into the current block by column id
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pCurBlock);
    while (targetIdx < colActual && sourceIdx < pTSchema->numOfCols) {
      SColumnInfoData* pColData = taosArrayGet(pCurBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }

    curRow++;
  }

  SSDataBlock* pTailBlock = taosArrayGetLast(blocks);
  if (pTailBlock != NULL) {
    pTailBlock->info.rows = curRow - lastRow;
  }

  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, totalRows,
          (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
1122

1123
/**
 * Encode `pCreateTbReq` into a freshly allocated buffer and append the buffer and its length to
 * `pRsp->createTableReq` / `pRsp->createTableLen`, creating both arrays on first use
 * (createTableNum == 0).
 *
 * @return 0 on success; TSDB_CODE_INVALID_PARA for NULL arguments; otherwise the encoder error
 *         or terrno. The encoded buffer is released on any failure.
 */
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pEncoded = NULL;
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);

  if (pRsp->createTableNum == 0) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
  }

  // first pass computes the encoded size, second pass writes into the buffer
  uint32_t encodedLen = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, encodedLen, code);
  TSDB_CHECK_CODE(code, lino, END);
  pEncoded = taosMemoryCalloc(1, encodedLen);
  TSDB_CHECK_NULL(pEncoded, code, lino, END, terrno);

  SEncoder enc = {0};
  tEncoderInit(&enc, pEncoded, encodedLen);
  code = tEncodeSVCreateTbReq(&enc, pCreateTbReq);
  tEncoderClear(&enc);
  TSDB_CHECK_CODE(code, lino, END);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &encodedLen), code, lino, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &pEncoded), code, lino, END, terrno);
  pRsp->createTableNum++;
  tqTrace("build create table info msg success");

END:
  if (code != 0) {
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
    taosMemoryFree(pEncoded);
  }
  return code;
}
1160

1161
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
43,830,419✔
1162
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1163
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
43,830,419✔
1164
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
43,830,419✔
1165
  if (pSubmitTbData == NULL) {
43,835,408✔
1166
    return terrno;
×
1167
  }
1168
  pReader->nextBlk++;
43,835,408✔
1169

1170
  if (pSubmitTbDataRet) {
43,824,849✔
1171
    *pSubmitTbDataRet = pSubmitTbData;
43,832,387✔
1172
  }
1173

1174
  if (fetchMeta == ONLY_META) {
43,825,537✔
1175
    if (pSubmitTbData->pCreateTbReq != NULL) {
6,627✔
1176
      if (pRsp->createTableReq == NULL) {
1,929✔
1177
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
1,146✔
1178
        if (pRsp->createTableReq == NULL) {
1,146✔
1179
          return terrno;
×
1180
        }
1181
      }
1182
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
3,858✔
1183
        return terrno;
×
1184
      }
1185
      pSubmitTbData->pCreateTbReq = NULL;
1,929✔
1186
    }
1187
    return 0;
6,627✔
1188
  }
1189

1190
  int32_t sversion = pSubmitTbData->sver;
43,818,910✔
1191
  int64_t uid = pSubmitTbData->uid;
43,820,900✔
1192
  pReader->lastBlkUid = uid;
43,824,855✔
1193

1194
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
43,826,292✔
1195
  taosMemoryFreeClear(pReader->extSchema);
43,820,501✔
1196
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
43,816,487✔
1197
  if (pReader->pSchemaWrapper == NULL) {
43,807,915✔
1198
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
198,951✔
1199
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1200
    pReader->cachedSchemaSuid = 0;
198,319✔
1201
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
198,951✔
1202
  }
1203

1204
  if (pSubmitTbData->pCreateTbReq != NULL) {
43,576,762✔
1205
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
8,011✔
1206
    if (code != 0) {
8,011✔
1207
      return code;
×
1208
    }
1209
  } else if (rawList != NULL) {
43,602,479✔
1210
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1211
      return terrno;
×
1212
    }
1213
    pReader->pSchemaWrapper = NULL;
×
1214
    return 0;
×
1215
  }
1216

1217
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
43,610,490✔
1218
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
582,924✔
1219
  } else {
1220
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
42,987,007✔
1221
  }
1222
}
1223

1224
/**
 * Replace the reader's table-uid filter with the uids in `tbUidList`.
 * The hash is created on first use and cleared otherwise; a uid that cannot be inserted is
 * logged and skipped.
 *
 * @param id  task identifier used only in log messages
 * @return TSDB_CODE_SUCCESS (also for NULL arguments), or terrno when the hash cannot be created.
 */
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader == NULL || tbUidList == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", id);
      return terrno;
    }
  } else {
    taosHashClear(pReader->tbIdHash);
  }

  for (int idx = 0; idx < taosArrayGetSize(tbUidList); idx++) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    if (pUid == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pUid, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pUid);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
  return TSDB_CODE_SUCCESS;
}
1249

1250
/**
 * Add every uid in `pTableUidList` to the reader's table-uid filter, creating the hash on first
 * use. Failures are logged; nothing is reported to the caller.
 */
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader == NULL || pTableUidList == NULL) {
    return;
  }
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    if (pKey == NULL) {
      // same guard as tqReaderSetTbUidList(): never hash or print through a NULL element
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
      continue;
    }
    tqDebug("%s add table uid:%" PRId64 " to hash", __func__, *pKey);
  }
}
1272

1273
/** Return true when `uid` has an entry in the reader's table-uid filter; false for a NULL reader. */
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  return pReader != NULL && taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL;
}
1279

1280
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1281
  if (pReader == NULL) {
×
1282
    return false;
×
1283
  }
1284
  return pReader->msg.msgStr == NULL;
×
1285
}
1286

1287
/**
 * Remove every uid in `tbUidList` from the reader's table-uid filter.
 * A failed removal is only logged as a warning.
 */
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(tbUidList); idx++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, idx);
    int32_t  code = taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
    if (code == 0) {
      continue;
    }
    tqWarn("%s failed to remove table uid:%" PRId64 " from hash, msg:%s", __func__, pKey != NULL ? *pKey : 0, tstrerror(code));
  }
}
1299

1300
/**
 * Propagate a list of deleted table uids to every consumer handle of `pTq`, under the tq write
 * latch. What happens depends on the handle's subscription type:
 *  - COLUMN: the uids are removed from the scanner's table list;
 *  - DB:     the uids are added to the handle's filter-out set;
 *  - TABLE:  the uids are removed from the handle's reader filter.
 * Per-handle failures are logged and do not stop the iteration.
 *
 * @return 0 (also when `pTq` is NULL), or TSDB_CODE_INVALID_PARA when `tbUidList` is NULL.
 */
int32_t tqDeleteTbUidList(STQ* pTq, SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (void* pIter = taosHashIterate(pTq->pHandle, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " delete table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      if (qDeleteTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList) != 0) {
        tqError("update qualified table error for %s", pTqHandle->subKey);
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      int32_t numOfUids = taosArrayGetSize(tbUidList);
      for (int32_t i = 0; i < numOfUids; i++) {
        int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
        if (tbUid == NULL) {
          continue;
        }
        if (taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
          tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
        }
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
    }
  }
  taosWUnLockLatch(&pTq->lock);
  return 0;
}
1343

1344
/**
 * Run `tbUidList` through qFilterTableList() for a super-table subscription and add the list to
 * the handle's reader filter afterwards.
 *
 * @return 0 on success, otherwise the code returned by qFilterTableList().
 */
static int32_t addTableListForStableTmq(STqHandle* pTqHandle, STQ* pTq, SArray* tbUidList) {
  int filterCode = qFilterTableList(pTq->pVnode, tbUidList, pTqHandle->execHandle.execTb.node,
                                    pTqHandle->execHandle.task, pTqHandle->execHandle.execTb.suid);
  if (filterCode == TDB_CODE_SUCCESS) {
    tqDebug("%s handle %s consumer:0x%" PRIx64 " add %d tables to tqReader", __func__, pTqHandle->subKey,
            pTqHandle->consumerId, (int32_t)taosArrayGetSize(tbUidList));
    tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
    return 0;
  }

  tqError("tqAddTbUidList error:%d handle %s consumer:0x%" PRIx64, filterCode, pTqHandle->subKey,
          pTqHandle->consumerId);
  return filterCode;
}
1357

1358
/**
 * Propagate a list of newly added table uids to every consumer handle of `pTq`, under the tq
 * write latch. COLUMN subscriptions extend the scanner's table list; TABLE subscriptions go
 * through addTableListForStableTmq(). The first failure stops the iteration.
 *
 * @return 0 on success (also when `pTq` is NULL), TSDB_CODE_INVALID_PARA when `tbUidList` is
 *         NULL, otherwise the error of the failing handle.
 */
int32_t tqAddTbUidList(STQ* pTq, SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (pIter = taosHashIterate(pTq->pHandle, pIter); pIter != NULL; pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " add table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      code = qAddTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList);
      if (code != 0) {
        tqError("add table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
        break;
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      code = addTableListForStableTmq(pTqHandle, pTq, tbUidList);
      if (code != 0) {
        tqError("add table list for stable tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
        break;
      }
    }
  }
  // pIter is non-NULL here only when the loop was left early
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1398

1399
/**
 * Propagate updated tables to every consumer handle of `pTq`, under the tq write latch.
 * For each handle the table list is refreshed only when checkCidInTagCondition() reports a hit
 * for `cidList` against the handle's tag condition:
 *  - COLUMN: the scanner's table list is updated when the check hits; the tag cache is
 *            refreshed afterwards unless that update failed;
 *  - TABLE:  the uids are removed from the reader filter and re-added through
 *            addTableListForStableTmq().
 * The first failure stops the iteration.
 *
 * @return 0 on success (also when `pTq` is NULL), TSDB_CODE_INVALID_PARA when `tbUidList` is
 *         NULL, otherwise the error of the failing handle.
 */
int32_t tqUpdateTbUidList(STQ* pTq, SArray* tbUidList, SArray* cidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (pIter = taosHashIterate(pTq->pHandle, pIter); pIter != NULL; pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " update table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      SNode* pTagCond = getTagCondNodeForQueryTmq(pTqHandle->execHandle.task);
      if (checkCidInTagCondition(pTagCond, cidList)) {
        code = qUpdateTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList);
        if (code != 0) {
          tqError("update table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
          break;
        }
      }
      qUpdateTableTagCacheForTmq(pTqHandle->execHandle.task, tbUidList, cidList);
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      SNode* pTagCond = getTagCondNodeForStableTmq(pTqHandle->execHandle.execTb.node);
      if (!checkCidInTagCondition(pTagCond, cidList)) {
        continue;
      }
      tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
      code = addTableListForStableTmq(pTqHandle, pTq, tbUidList);
      if (code != 0) {
        tqError("update table list for stable tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
        break;
      }
    }
  }

  // pIter is non-NULL here only when the loop was left early
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1449

1450
static void destroySourceScanTables(void* ptr) {
×
1451
  SArray** pTables = ptr;
×
1452
  if (pTables && *pTables) {
×
1453
    taosArrayDestroy(*pTables);
×
1454
    *pTables = NULL;
×
1455
  }
1456
}
×
1457

1458
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
1459
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
1460
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
1461
  if (pCol1->vColId == pCol2->vColId) {
×
1462
    return 0;
×
1463
  } else if (pCol1->vColId < pCol2->vColId) {
×
1464
    return -1;
×
1465
  } else {
1466
    return 1;
×
1467
  }
1468
}
1469

1470
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1471
  if (value) {
×
1472
    SSchemaWrapper* pSchemaWrapper = value;
×
1473
    tDeleteSchemaWrapper(pSchemaWrapper);
1474
  }
1475
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc