• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4969

27 Feb 2026 07:19AM UTC coverage: 67.69% (+0.8%) from 66.902%
#4969

push

travis-ci

web-flow
merge: from main to 3.0 #34603

15 of 58 new or added lines in 2 files covered. (25.86%)

5075 existing lines in 154 files now uncovered.

208337 of 307781 relevant lines covered (67.69%)

129686642.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.95
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "taoserror.h"
17
#include "tarray.h"
18
#include "tdef.h"
19
#include "thash.h"
20
#include "tmsg.h"
21
#include "tpriv.h"
22
#include "tq.h"
23

24
static int32_t tqRetrieveCols(STqReader *pReader, SSDataBlock *pRes, SHashObj* pCol2SlotId);
25

26
static void processCreateTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
1,271✔
27
  int32_t code = 0;
1,271✔
28
  int32_t lino = 0;
1,271✔
29
  int32_t        needRebuild = 0;
1,271✔
30
  SVCreateTbReq* pCreateReq = NULL;
1,271✔
31
  SVCreateTbBatchReq reqNew = {0};
1,271✔
32
  void* buf = NULL;
1,271✔
33
  SVCreateTbBatchReq req = {0};
1,271✔
34
  code = tDecodeSVCreateTbBatchReq(dcoder, &req);
1,271✔
35
  if (code < 0) {
1,271✔
UNCOV
36
    lino = __LINE__;
×
UNCOV
37
    goto end;
×
38
  }
39

40
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
2,859✔
41
    pCreateReq = req.pReqs + iReq;
1,588✔
42
    if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid &&
2,858✔
43
        taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {  
1,270✔
44
      needRebuild++;
635✔
45
    }
46
  }
47
  if (needRebuild == 0) {
1,271✔
48
    // do nothing
49
  } else if (needRebuild == req.nReqs) {
635✔
50
    *realTbSuid = tbSuid;
318✔
51
  } else {
52
    *realTbSuid = tbSuid;
317✔
53
    reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
317✔
54
    if (reqNew.pArray == NULL) {
317✔
UNCOV
55
      code = terrno;
×
UNCOV
56
      lino = __LINE__;
×
UNCOV
57
      goto end;
×
58
    }
59
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
951✔
60
      pCreateReq = req.pReqs + iReq;
634✔
61
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid &&
1,268✔
62
          taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
634✔
63
        reqNew.nReqs++;
317✔
64
        if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
634✔
UNCOV
65
          code = terrno;
×
UNCOV
66
          lino = __LINE__;
×
UNCOV
67
          goto end;
×
68
        }
69
      }
70
    }
71

72
    int     tlen = 0;
317✔
73
    tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, code);
317✔
74
    buf = taosMemoryMalloc(tlen);
317✔
75
    if (NULL == buf || code < 0) {
317✔
UNCOV
76
      lino = __LINE__;
×
UNCOV
77
      goto end;
×
78
    }
79
    SEncoder coderNew = {0};
317✔
80
    tEncoderInit(&coderNew, buf, tlen);
317✔
81
    code = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
317✔
82
    tEncoderClear(&coderNew);
317✔
83
    if (code < 0) {
317✔
UNCOV
84
      lino = __LINE__;
×
UNCOV
85
      goto end;
×
86
    }
87
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
317✔
88
    pHead->bodyLen = tlen + sizeof(SMsgHead);
317✔
89
  }
90

91
end:
1,271✔
92
  taosMemoryFree(buf);
1,271✔
93
  taosArrayDestroy(reqNew.pArray);
1,271✔
94
  tDeleteSVCreateTbBatchReq(&req);
1,271✔
95
  if (code < 0) {
1,271✔
UNCOV
96
    tqError("processCreateTbMsg failed, code:%d, line:%d", code, lino);
×
97
  }
98
}
1,271✔
99

100
static void processAlterTbMsg(SDecoder* dcoder, STqReader* pReader, int64_t* realTbSuid) {
954✔
101
  SVAlterTbReq req = {0};
954✔
102
  SMetaReader mr = {0};
954✔
103
  int32_t lino = 0;
954✔
104
  int32_t code = tDecodeSVAlterTbReq(dcoder, &req);
954✔
105
  if (code < 0) {
954✔
UNCOV
106
    lino = __LINE__;
×
UNCOV
107
    goto end;
×
108
  }
109

110
  metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
954✔
111

112
  code = metaGetTableEntryByName(&mr, req.tbName);
954✔
113
  if (code < 0) {
954✔
UNCOV
114
    lino = __LINE__;
×
UNCOV
115
    goto end;
×
116
  }
117
  if (taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t)) != NULL) {
954✔
118
    *realTbSuid = mr.me.ctbEntry.suid;
636✔
119
  }
120

121
end:
954✔
122
  taosArrayDestroy(req.pMultiTag);
954✔
123
  metaReaderClear(&mr);  
954✔
124
  if (code < 0) {
954✔
125
    tqError("processAlterTbMsg failed, code:%d, line:%d", code, lino);
×
126
  }
127
} 
954✔
128

129
static void processDropTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
×
130
  SVDropTbBatchReq req = {0};
×
131
  SVDropTbBatchReq reqNew = {0};
×
132
  void* buf = NULL;
×
UNCOV
133
  int32_t lino = 0;
×
UNCOV
134
  int32_t code = tDecodeSVDropTbBatchReq(dcoder, &req);
×
135
  if (code < 0) {
×
136
    lino = __LINE__;
×
137
    goto end;
×
138
  }
139

140
  int32_t      needRebuild = 0;
×
141
  SVDropTbReq* pDropReq = NULL;
×
142
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
UNCOV
143
    pDropReq = req.pReqs + iReq;
×
144

145
    if (pDropReq->suid == tbSuid &&
×
UNCOV
146
        taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
×
147
      needRebuild++;
×
148
    }
149
  }
150
  if (needRebuild == 0) {
×
151
    // do nothing
152
  } else if (needRebuild == req.nReqs) {
×
153
    *realTbSuid = tbSuid;
×
154
  } else {
155
    *realTbSuid = tbSuid;
×
UNCOV
156
    reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
157
    if (reqNew.pArray == NULL) {
×
158
      code = terrno;
×
159
      lino = __LINE__;
×
160
      goto end;
×
161
    }
162
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
163
      pDropReq = req.pReqs + iReq;
×
164
      if (pDropReq->suid == tbSuid &&
×
165
          taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
×
UNCOV
166
        reqNew.nReqs++;
×
UNCOV
167
        if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
UNCOV
168
          code = terrno;
×
UNCOV
169
          lino = __LINE__;
×
170
          goto end;
×
171
        }
172
      }
173
    }
174

175
    int     tlen = 0;
×
UNCOV
176
    tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, code);
×
177
    buf = taosMemoryMalloc(tlen);
×
178
    if (NULL == buf || code < 0) {
×
179
      lino = __LINE__;
×
180
      goto end;
×
181
    }
182
    SEncoder coderNew = {0};
×
183
    tEncoderInit(&coderNew, buf, tlen);
×
UNCOV
184
    code = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
185
    tEncoderClear(&coderNew);
×
186
    if (code != 0) {
×
UNCOV
187
      lino = __LINE__;
×
UNCOV
188
      goto end;
×
189
    }
190
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
191
    pHead->bodyLen = tlen + sizeof(SMsgHead);
×
192
  }
193

UNCOV
194
end:
×
195
  taosMemoryFree(buf);
×
UNCOV
196
  taosArrayDestroy(reqNew.pArray);
×
UNCOV
197
  if (code < 0) {
×
UNCOV
198
    tqError("processDropTbMsg failed, code:%d, line:%d", code, lino);
×
199
  }
UNCOV
200
}
×
201

202
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
147,916✔
203
  int32_t code = 0;
147,916✔
204
  int32_t lino = 0;
147,916✔
205
  if (pHandle == NULL || pHead == NULL) {
147,916✔
UNCOV
206
    return false;
×
207
  }
208
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
147,916✔
209
    return true;
145,056✔
210
  }
211

212
  STqExecHandle* pExec = &pHandle->execHandle;
2,860✔
213
  STqReader* pReader = pExec->pTqReader;
2,860✔
214

215
  int16_t msgType = pHead->msgType;
2,860✔
216
  char*   body = pHead->body;
2,860✔
217
  int32_t bodyLen = pHead->bodyLen;
2,860✔
218

219
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
2,860✔
220
  int64_t  realTbSuid = 0;
2,860✔
221
  SDecoder dcoder = {0};
2,860✔
222
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
2,860✔
223
  int32_t  len = bodyLen - sizeof(SMsgHead);
2,860✔
224
  tDecoderInit(&dcoder, data, len);
2,860✔
225

226
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
3,495✔
227
    SVCreateStbReq req = {0};
635✔
228
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
635✔
229
      goto end;
×
230
    }
231
    realTbSuid = req.suid;
635✔
232
  } else if (msgType == TDMT_VND_DROP_STB) {
2,225✔
UNCOV
233
    SVDropStbReq req = {0};
×
UNCOV
234
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
UNCOV
235
      goto end;
×
236
    }
237
    realTbSuid = req.suid;
×
238
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
2,225✔
239
    processCreateTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
1,271✔
240
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
954✔
241
    processAlterTbMsg(&dcoder, pReader, &realTbSuid);
954✔
242
  } else if (msgType == TDMT_VND_DROP_TABLE) {
×
UNCOV
243
    processDropTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
×
244
  } else if (msgType == TDMT_VND_DELETE) {
×
UNCOV
245
    SDeleteRes req = {0};
×
UNCOV
246
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
UNCOV
247
      goto end;
×
248
    }
UNCOV
249
    realTbSuid = req.suid;
×
250
  }
251

252
end:
2,860✔
253
  tDecoderClear(&dcoder);
2,860✔
254
  bool tmp = tbSuid == realTbSuid;
2,860✔
255
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
2,860✔
256
  return tmp;
2,860✔
257
}
258

259
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
50,129,909✔
260
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
50,129,909✔
UNCOV
261
    return -1;
×
262
  }
263
  int32_t code = -1;
50,201,095✔
264
  int32_t vgId = TD_VID(pTq->pVnode);
50,201,095✔
265
  int64_t id = pHandle->pWalReader->readerId;
50,206,511✔
266

267
  int64_t offset = *fetchOffset;
50,206,569✔
268
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
50,216,405✔
269
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
50,214,980✔
270
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
50,214,264✔
271

272
  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
50,211,729✔
273
          ", 0x%" PRIx64,
274
          vgId, offset, lastVer, committedVer, appliedVer, id);
275

276
  while (offset <= appliedVer) {
53,089,147✔
277
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
47,788,020✔
UNCOV
278
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
×
279
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
280
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
UNCOV
281
      goto END;
×
282
    }
283

284
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
47,787,675✔
285
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
286

287
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
47,787,422✔
288
      code = walFetchBody(pHandle->pWalReader);
44,774,465✔
289
      goto END;
44,774,465✔
290
    } else {
291
      if (pHandle->fetchMeta != WITH_DATA) {
3,014,478✔
292
        SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
183,713✔
293
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
183,713✔
294
          code = walFetchBody(pHandle->pWalReader);
147,916✔
295
          if (code < 0) {
147,916✔
UNCOV
296
            goto END;
×
297
          }
298

299
          pHead = &(pHandle->pWalReader->pHead->head);
147,916✔
300
          if (isValValidForTable(pHandle, pHead)) {
147,916✔
301
            code = 0;
146,962✔
302
            goto END;
146,962✔
303
          } else {
304
            offset++;
954✔
305
            code = -1;
954✔
306
            continue;
954✔
307
          }
308
        }
309
      }
310
      code = walSkipFetchBody(pHandle->pWalReader);
2,866,562✔
311
      if (code < 0) {
2,866,562✔
UNCOV
312
        goto END;
×
313
      }
314
      offset++;
2,866,562✔
315
    }
316
    code = -1;
2,866,562✔
317
  }
318

319
END:
5,301,127✔
320
  *fetchOffset = offset;
50,222,554✔
321
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
50,221,631✔
322
          ", applied:%" PRId64 ", 0x%" PRIx64,
323
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
324
  return code;
50,221,631✔
325
}
326

327
bool tqGetTablePrimaryKey(STqReader* pReader) {
6,004,234✔
328
  if (pReader == NULL) {
6,004,234✔
UNCOV
329
    return false;
×
330
  }
331
  return pReader->hasPrimaryKey;
6,004,234✔
332
}
333

334
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
47,772✔
335
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);
47,772✔
336

337
  if (pReader == NULL) {
47,772✔
UNCOV
338
    return;
×
339
  }
340
  bool            ret = false;
47,772✔
341
  SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnode->pMeta, uid, -1, 1, NULL, 0);
47,772✔
342
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
47,772✔
343
    ret = true;
907✔
344
  }
345
  tDeleteSchemaWrapper(schema);
346
  pReader->hasPrimaryKey = ret;
47,469✔
347
}
348

349
static void freeTagCache(void* pData){
1,626,251✔
350
  if (pData == NULL) return;
1,626,251✔
351
  SArray* tagCache = *(SArray**)pData;
1,626,251✔
352
  taosArrayDestroyP(tagCache, taosMemFree);
1,626,251✔
353
}
354

355
STqReader* tqReaderOpen(SVnode* pVnode) {
437,988✔
356
  tqDebug("%s:%p", __FUNCTION__, pVnode);
437,988✔
357
  if (pVnode == NULL) {
439,907✔
UNCOV
358
    return NULL;
×
359
  }
360
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
439,907✔
361
  if (pReader == NULL) {
439,907✔
UNCOV
362
    return NULL;
×
363
  }
364

365
  pReader->pWalReader = walOpenReader(pVnode->pWal, 0);
439,907✔
366
  if (pReader->pWalReader == NULL) {
439,907✔
UNCOV
367
    taosMemoryFree(pReader);
×
UNCOV
368
    return NULL;
×
369
  }
370

371
  pReader->pVnode = pVnode;
439,907✔
372
  pReader->pSchemaWrapper = NULL;
439,907✔
373
  pReader->tbIdHash = NULL;
439,907✔
374
  pReader->pTableTagCacheForTmq = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
439,213✔
375
  if (pReader->pTableTagCacheForTmq == NULL) {
439,590✔
UNCOV
376
    walCloseReader(pReader->pWalReader);
×
UNCOV
377
    taosMemoryFree(pReader);
×
UNCOV
378
    return NULL;
×
379
  }
380
  taosHashSetFreeFp(pReader->pTableTagCacheForTmq, freeTagCache);
439,590✔
381
  taosInitRWLatch(&pReader->tagCachelock);
439,907✔
382

383
  return pReader;
439,907✔
384
}
385

386
void tqReaderClose(STqReader* pReader) {
444,001✔
387
  tqDebug("%s:%p", __FUNCTION__, pReader);
444,001✔
388
  if (pReader == NULL) return;
444,318✔
389

390
  // close wal reader
391
  walCloseReader(pReader->pWalReader);
439,907✔
392
  taosHashCleanup(pReader->pTableTagCacheForTmq);
439,907✔
393
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
439,907✔
394
  taosMemoryFree(pReader->pTSchema);
439,907✔
395
  taosMemoryFree(pReader->extSchema);
439,635✔
396

397
  // free hash
398
  taosHashCleanup(pReader->tbIdHash);
439,907✔
399
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
439,635✔
400

401
  taosMemoryFree(pReader);
439,907✔
402
}
403

404
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
984,954✔
405
  if (pReader == NULL) {
984,954✔
UNCOV
406
    return TSDB_CODE_INVALID_PARA;
×
407
  }
408
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
984,954✔
409
    return terrno;
22,364✔
410
  }
411
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
962,247✔
412
  return 0;
962,876✔
413
}
414

415
static int32_t getTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid) {
112,493,201✔
416
  int32_t code = 0;
112,493,201✔
417
  int32_t lino = 0;
112,493,201✔
418

419
  void* data = taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES);
112,493,201✔
420
  if (data == NULL) {
112,571,424✔
421
    SStorageAPI api = {0}; 
37,135,800✔
422
    initStorageAPI(&api);
37,134,730✔
423
    code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &api, uid, 0, &pReader->tagCachelock);
37,134,942✔
424
    TSDB_CHECK_CODE(code, lino, END);
37,135,877✔
425
  }
426

427
  END:
75,435,624✔
428
  if (code != TSDB_CODE_SUCCESS) {
112,569,449✔
UNCOV
429
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
×
430
  }
431
  
432
  return code;
112,546,199✔
433
}
434

435
void tqUpdateTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid, col_id_t colId) {
458,611✔
436
  int32_t code = 0;
458,611✔
437
  int32_t lino = 0;
458,611✔
438

439
  void* data = taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES);
458,611✔
440
  if (data == NULL) {
458,611✔
441
    return;
457,965✔
442
  }
443

444
  SStorageAPI api = {0}; 
646✔
445
  initStorageAPI(&api);
646✔
446
  code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &api, uid, colId, &pReader->tagCachelock);
646✔
447
  TSDB_CHECK_CODE(code, lino, END);
646✔
448

449
  END:
646✔
450
  if (code != TSDB_CODE_SUCCESS) {
646✔
UNCOV
451
    tqError("%s failed at %d, failed to update tag cache code:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
×
452
  }
453
}
454

455
static int32_t tqRetrievePseudoCols(STqReader* pReader, SSDataBlock* pBlock, int32_t numOfRows, int64_t uid, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr) {
112,510,447✔
456
  if (pReader == NULL || pBlock == NULL) {
112,510,447✔
UNCOV
457
    return TSDB_CODE_INVALID_PARA;
×
458
  }
459
  int32_t code = TSDB_CODE_SUCCESS;
112,568,336✔
460
  int32_t lino = 0;
112,568,336✔
461
  
462
  code = getTableTagCache(pReader, pPseudoExpr, numOfPseudoExpr, uid);
112,568,336✔
463
  TSDB_CHECK_CODE(code, lino, END);
112,552,038✔
464

465
  code = fillTag(pReader->pTableTagCacheForTmq, pPseudoExpr, numOfPseudoExpr, uid, pBlock, numOfRows, pBlock->info.rows - numOfRows, 1, &pReader->tagCachelock);
112,552,038✔
466
  TSDB_CHECK_CODE(code, lino, END);
112,581,323✔
467

468
END:
112,581,323✔
469
  if (code != 0) {
112,581,323✔
UNCOV
470
    tqError("tqRetrievePseudoCols failed, line:%d, msg:%s", lino, tstrerror(code));
×
471
  }
472
  return code;
112,573,944✔
473
}
474

475
int32_t tqNextBlockInWal(STqReader* pReader, SSDataBlock* pRes, SHashObj* pCol2SlotId, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
23,793,356✔
476
                         int sourceExcluded, int32_t minPollRows, int64_t timeout, int8_t enableReplay) {
477
  int32_t code = 0;
23,793,356✔
478
  if (pReader == NULL) {
23,793,356✔
UNCOV
479
    return TSDB_CODE_INVALID_PARA;
×
480
  }
481
  SWalReader* pWalReader = pReader->pWalReader;
23,793,356✔
482

483
  int64_t st = taosGetTimestampMs();
23,793,021✔
484
  while (1) {
136,372,806✔
485
    code = walNextValidMsg(pWalReader, false);
160,165,827✔
486
    if (code != 0) {
160,282,316✔
487
      break;
22,973,864✔
488
    }
489

490
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
137,308,452✔
491
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
137,349,990✔
492
    int64_t ver = pWalReader->pHead->head.version;
137,372,463✔
493
    SDecoder decoder = {0};
137,357,942✔
494
    code = tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL, &decoder);
137,361,132✔
495
    tDecoderClear(&decoder);
137,335,799✔
496
    if (code != 0) {
137,352,979✔
UNCOV
497
      return code;
×
498
    }
499
    pReader->nextBlk = 0;
137,352,979✔
500

501
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
137,362,934✔
502
    while (pReader->nextBlk < numOfBlocks) {
274,708,685✔
503
      tqDebug("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
137,347,907✔
504
              pReader->msg.ver);
505

506
      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
137,378,099✔
507
      if (pSubmitTbData == NULL) {
137,406,770✔
UNCOV
508
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
×
509
                pReader->msg.ver);
UNCOV
510
        return terrno;
×
511
      }
512
      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
137,406,770✔
513
        pReader->nextBlk += 1;
27,666✔
514
        continue;
27,666✔
515
      }
516
      if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
137,375,975✔
517
        tqDebug("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
112,608,503✔
518
        int32_t numOfRows = pRes->info.rows;
112,600,229✔
519
        code = tqRetrieveCols(pReader, pRes, pCol2SlotId);
112,607,089✔
520
        if (code != TSDB_CODE_SUCCESS) {
112,489,435✔
UNCOV
521
          return code;
×
522
        }
523
        code = tqRetrievePseudoCols(pReader, pRes, numOfRows, pSubmitTbData->uid, pPseudoExpr, numOfPseudoExpr);
112,489,435✔
524
        if (code != TSDB_CODE_SUCCESS) {
112,569,596✔
UNCOV
525
          return code;
×
526
        }
527

528
      }
529
      pReader->nextBlk += 1;
137,318,118✔
530
    }
531

532
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
137,376,075✔
533
    pReader->msg.msgStr = NULL;
137,342,945✔
534

535
    if (pRes->info.rows >= minPollRows || (enableReplay && pRes->info.rows > 0)){
137,322,423✔
536
      break;
537
    }
538
    int64_t elapsed = taosGetTimestampMs() - st;
136,765,589✔
539
    if (elapsed > timeout || elapsed < 0) {
136,765,589✔
540
      code = TSDB_CODE_TMQ_FETCH_TIMEOUT;
232,903✔
541
      terrno = code;
232,903✔
542
      break;
246,617✔
543
    }
544
  }
545
  return code;
23,793,039✔
546
}
547

548
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList, SDecoder* decoder) {
182,048,685✔
549
  if (pReader == NULL) {
182,048,685✔
UNCOV
550
    return TSDB_CODE_INVALID_PARA;
×
551
  }
552
  pReader->msg.msgStr = msgStr;
182,048,685✔
553
  pReader->msg.msgLen = msgLen;
182,100,802✔
554
  pReader->msg.ver = ver;
182,098,502✔
555

556
  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
182,136,301✔
557

558
  tDecoderInit(decoder, pReader->msg.msgStr, pReader->msg.msgLen);
182,136,301✔
559
  int32_t code = tDecodeSubmitReq(decoder, &pReader->submit, rawList);
182,092,625✔
560

561
  if (code != 0) {
182,110,106✔
562
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
×
563
  }
564

565
  return code;
182,114,827✔
566
}
567

568
void tqReaderClearSubmitMsg(STqReader* pReader) {
89,476,876✔
569
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
89,476,876✔
570
  pReader->nextBlk = 0;
89,481,638✔
571
  pReader->msg.msgStr = NULL;
89,484,718✔
572
}
89,488,109✔
573

574
SWalReader* tqGetWalReader(STqReader* pReader) {
48,408,179✔
575
  if (pReader == NULL) {
48,408,179✔
576
    return NULL;
×
577
  }
578
  return pReader->pWalReader;
48,408,179✔
579
}
580

581
int64_t tqGetResultBlockTime(STqReader* pReader) {
23,793,374✔
582
  if (pReader == NULL) {
23,793,374✔
UNCOV
583
    return 0;
×
584
  }
585
  return pReader->lastTs;
23,793,374✔
586
}
587

588
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
22,966,893✔
589
  int32_t code = false;
22,966,893✔
590
  int32_t lino = 0;
22,966,893✔
591
  int64_t uid = 0;
22,966,893✔
592
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
22,966,893✔
593
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
22,966,893✔
594
  TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);
22,968,219✔
595

596
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
22,967,594✔
597
  while (pReader->nextBlk < blockSz) {
24,122,066✔
598
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
12,063,521✔
599
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
12,063,231✔
600
    uid = pSubmitTbData->uid;
12,063,231✔
601
    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
12,062,582✔
602
    TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true);
12,064,184✔
603

604
    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
1,154,807✔
605
            pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
606
    pReader->nextBlk++;
1,154,807✔
607
  }
608

609
  tqReaderClearSubmitMsg(pReader);
12,059,781✔
610
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);
12,059,788✔
611

612
END:
12,059,788✔
613
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
22,969,165✔
614
  return code;
22,966,544✔
615
}
616

617
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
65,391,302✔
618
  int32_t code = false;
65,391,302✔
619
  int32_t lino = 0;
65,391,302✔
620
  int64_t uid = 0;
65,391,302✔
621

622
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
65,391,302✔
623
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
65,391,302✔
624
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);
65,396,573✔
625

626
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
65,396,573✔
627
  while (pReader->nextBlk < blockSz) {
65,425,227✔
628
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
32,725,621✔
629
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
32,723,959✔
630
    uid = pSubmitTbData->uid;
32,723,959✔
631
    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
32,729,306✔
632
    TSDB_CHECK_NULL(ret, code, lino, END, true);
32,727,061✔
UNCOV
633
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
×
UNCOV
634
    pReader->nextBlk++;
×
635
  }
636
  tqReaderClearSubmitMsg(pReader);
32,707,950✔
637
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);
32,681,097✔
638

639
END:
32,681,097✔
640
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
65,408,158✔
641
  return code;
65,356,804✔
642
}
643

644
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
43,365,027✔
645
                    SExtSchema* extSrc) {
646
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
43,365,027✔
UNCOV
647
    return TSDB_CODE_INVALID_PARA;
×
648
  }
649
  int32_t code = 0;
43,410,890✔
650

651
  int32_t cnt = 0;
43,410,890✔
652
  for (int32_t i = 0; i < pSrc->nCols; i++) {
242,926,200✔
653
    cnt += mask[i];
199,455,178✔
654
  }
655

656
  pDst->nCols = cnt;
43,474,712✔
657
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
43,426,056✔
658
  if (pDst->pSchema == NULL) {
43,399,439✔
UNCOV
659
    return TAOS_GET_TERRNO(terrno);
×
660
  }
661

662
  int32_t j = 0;
43,391,184✔
663
  for (int32_t i = 0; i < pSrc->nCols; i++) {
242,983,917✔
664
    if (mask[i]) {
199,513,494✔
665
      pDst->pSchema[j++] = pSrc->pSchema[i];
199,557,523✔
666
      SColumnInfoData colInfo =
199,552,532✔
667
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
199,563,607✔
668
      if (extSrc != NULL) {
199,539,583✔
669
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
56,898✔
670
      }
671
      code = blockDataAppendColInfo(pBlock, &colInfo);
199,539,583✔
672
      if (code != 0) {
199,578,558✔
UNCOV
673
        return code;
×
674
      }
675
    }
676
  }
677
  return 0;
43,435,525✔
678
}
679

UNCOV
680
static int32_t doSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobSet* pBlobRow2) {
×
681
  int32_t code = 0;
×
682
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
×
683
    return TSDB_CODE_INVALID_PARA;
×
684
  }
685
  // TODO(yhDeng)
UNCOV
686
  if (COL_VAL_IS_VALUE(pColVal)) {
×
687
    char* val = taosMemCalloc(1, pColVal->value.nData + sizeof(BlobDataLenT));
×
688
    if (val == NULL) {
×
689
      return terrno;
×
690
    }
691

692
    uint64_t seq = 0;
×
693
    int32_t  len = 0;
×
UNCOV
694
    if (pColVal->value.pData != NULL) {
×
UNCOV
695
      if (tGetU64(pColVal->value.pData, &seq) < 0){
×
696
        TAOS_CHECK_RETURN(TSDB_CODE_INVALID_PARA);
×
697
      }
698
      SBlobItem item = {0};
×
UNCOV
699
      code = tBlobSetGet(pBlobRow2, seq, &item);
×
UNCOV
700
      if (code != 0) {
×
701
        taosMemoryFree(val);
×
702
        terrno = code;
×
UNCOV
703
        uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
×
704
        return code;
×
705
      }
706

UNCOV
707
      val = taosMemRealloc(val, item.len + sizeof(BlobDataLenT));
×
708
      (void)memcpy(blobDataVal(val), item.data, item.len);
×
UNCOV
709
      len = item.len;
×
710
    }
711

UNCOV
712
    blobDataSetLen(val, len);
×
UNCOV
713
    code = colDataSetVal(pColumnInfoData, idx, val, false);
×
714

UNCOV
715
    taosMemoryFree(val);
×
716
  } else {
UNCOV
717
    colDataSetNULL(pColumnInfoData, idx);
×
718
  }
UNCOV
719
  return code;
×
720
}
721
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
2,147,483,647✔
722
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
723

724
  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
2,147,483,647✔
725
    if (COL_VAL_IS_VALUE(pColVal)) {
2,147,483,647✔
726
      char val[65535 + 2] = {0};
2,147,483,647✔
727
      if (pColVal->value.pData != NULL) {
2,147,483,647✔
728
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
2,147,483,647✔
729
      }
730
      varDataSetLen(val, pColVal->value.nData);
2,147,483,647✔
731
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
2,147,483,647✔
732
    } else {
UNCOV
733
      colDataSetNULL(pColumnInfoData, rowIndex);
×
734
    }
735
  } else {
736
    code = colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
2,147,483,647✔
737
                         !COL_VAL_IS_VALUE(pColVal));
2,147,483,647✔
738
  }
739

740
  return code;
2,147,483,647✔
741
}
742

743
static int32_t setBlockData(SSDataBlock* pBlock, int32_t slotId, int32_t rowIndex, SColVal* colVal, SBlobSet* pBlobSet) {
2,147,483,647✔
744
  int32_t        code = 0;
2,147,483,647✔
745
  SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
2,147,483,647✔
746
  if (pColData == NULL) {
2,147,483,647✔
UNCOV
747
    return terrno;
×
748
  }
749

750
  uint8_t isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? 1 : 0;
2,147,483,647✔
751
  if (isBlob == 0) {
2,147,483,647✔
752
    code = doSetVal(pColData, rowIndex, colVal);
2,147,483,647✔
753
  } else {
UNCOV
754
    code = doSetBlobVal(pColData, rowIndex, colVal, pBlobSet);
×
755
  }
756
  return code;
2,147,483,647✔
757
}
758

759
static int32_t processSubmitRow(SArray*         pRows,
112,580,765✔
760
                                SSDataBlock*    pBlock,
761
                                SHashObj*       pCol2SlotId,
762
                                STqReader*      pReader,
763
                                SBlobSet*       pBlobSet) {
764
  int32_t        code = 0;
112,580,765✔
765
  int32_t        line = 0;
112,580,765✔
766

767
  SArray* pColArray = taosArrayInit(4, INT_BYTES * 2);
112,580,765✔
768
  TSDB_CHECK_NULL(pColArray, code, line, END, terrno);
112,600,313✔
769

770
  int32_t sourceIdx = -1;
112,600,313✔
771
  int32_t rowIndex = 0;
112,600,313✔
772
  SRow* pRow = taosArrayGetP(pRows, rowIndex);
112,600,313✔
773
  TSDB_CHECK_NULL(pRow, code, line, END, terrno);
112,597,569✔
774
  while (++sourceIdx < pReader->pTSchema->numOfCols) {
946,136,106✔
775
    SColVal colVal = {0};
833,857,399✔
776
    code = tRowGet(pRow, pReader->pTSchema, sourceIdx, &colVal);
833,673,171✔
777
    TSDB_CHECK_CODE(code, line, END);
833,359,729✔
778
    void* pSlotId = taosHashGet(pCol2SlotId, &colVal.cid, sizeof(colVal.cid));
833,359,729✔
779
    if (pSlotId == NULL) {
834,639,819✔
780
      continue;
305,106,253✔
781
    }
782
    int32_t pData[2] = {sourceIdx, *(int16_t*)pSlotId};
529,533,566✔
783
    TSDB_CHECK_NULL(taosArrayPush(pColArray, pData), code, line, END, terrno);
529,374,494✔
784
    code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
529,374,494✔
785
    TSDB_CHECK_CODE(code, line, END);
528,693,713✔
786
  }
787
  
788
  for (rowIndex = 1; rowIndex < taosArrayGetSize(pRows); rowIndex++) {
2,147,483,647✔
789
    SRow* pRow = taosArrayGetP(pRows, rowIndex);
2,147,483,647✔
790
    TSDB_CHECK_NULL(pRow, code, line, END, terrno);
2,147,483,647✔
791
    for (int32_t j = 0; j < taosArrayGetSize(pColArray); j++) {
2,147,483,647✔
792
      int32_t* pData = taosArrayGet(pColArray, j);
2,147,483,647✔
793
      TSDB_CHECK_NULL(pData, code, line, END, terrno);
2,147,483,647✔
794

795
      SColVal colVal = {0};
2,147,483,647✔
796
      code = tRowGet(pRow, pReader->pTSchema, pData[0], &colVal);
2,147,483,647✔
797
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
798

799
      code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
2,147,483,647✔
800
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
801
    }
802
  }
803

804
  END:
111,437,030✔
805
  taosArrayDestroy(pColArray);
110,305,082✔
806
  return code;
112,531,697✔
807
}
808

809
static int32_t processSubmitCol(SArray*         pCols,
1,296✔
810
                                SSDataBlock*    pBlock,
811
                                SHashObj*       pCol2SlotId,
812
                                SBlobSet*       pBlobSet) {
813
  int32_t        code = 0;
1,296✔
814
  int32_t        line = 0;
1,296✔
815

816
  for (int32_t i = 0; i < taosArrayGetSize(pCols); i++) {
3,888✔
817
    SColData* pCol = taosArrayGet(pCols, i);
2,592✔
818
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
2,592✔
819
    void* pSlotId = taosHashGet(pCol2SlotId, &pCol->cid, sizeof(pCol->cid));
2,592✔
820
    if (pSlotId == NULL) {
2,592✔
821
      continue;
1,296✔
822
    }
823
    SColVal colVal = {0};
1,296✔
824
    for (int32_t row = 0; row < pCol->nVal; row++) {
3,888✔
825
      code = tColDataGetValue(pCol, row, &colVal);
2,592✔
826
      TSDB_CHECK_CODE(code, line, END);
2,592✔
827

828
      code = setBlockData(pBlock, *(int16_t*)pSlotId, pBlock->info.rows + row, &colVal, pBlobSet);
2,592✔
829
      TSDB_CHECK_CODE(code, line, END);
2,592✔
830
    }
831
  }
832
  
833
  END:
1,296✔
834
  return code;
1,296✔
835
}
836

837
static int32_t checkSchema(STqReader* pReader, SSubmitTbData* pSubmitTbData) {
112,595,435✔
838
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
112,595,435✔
839
  int32_t sversion = pSubmitTbData->sver;
112,605,031✔
840
  int64_t suid = pSubmitTbData->suid;
112,603,994✔
841
  int64_t uid = pSubmitTbData->uid;
112,607,089✔
842
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
112,606,395✔
843
      (pReader->cachedSchemaVer != sversion)) {
112,465,405✔
844
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
111,835✔
845
    taosMemoryFreeClear(pReader->extSchema);
141,676✔
846
    taosMemoryFreeClear(pReader->pTSchema);
141,676✔
847
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
141,676✔
848
    if (pReader->pSchemaWrapper == NULL) {
141,404✔
UNCOV
849
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d, possibly dropped table",
×
850
              vgId, suid, uid, pReader->cachedSchemaVer);
UNCOV
851
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
×
852
    }
853
    pReader->pTSchema = tBuildTSchema(pReader->pSchemaWrapper->pSchema, pReader->pSchemaWrapper->nCols, pReader->pSchemaWrapper->version);
141,001✔
854
    if (pReader->pTSchema == NULL) {
141,676✔
UNCOV
855
      tqWarn("vgId:%d, cannot build schema for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d",
×
856
              vgId, suid, uid, pReader->cachedSchemaVer);
UNCOV
857
      return terrno;
×
858
    }
859
    pReader->cachedSchemaUid = uid;
141,323✔
860
    pReader->cachedSchemaSuid = suid;
141,323✔
861
    pReader->cachedSchemaVer = sversion;
141,323✔
862
  }
863
  return TSDB_CODE_SUCCESS;
112,604,680✔
864
}
865

866
static int32_t tqRetrieveCols(STqReader* pReader, SSDataBlock* pBlock, SHashObj* pCol2SlotId) {
112,583,798✔
867
  if (pReader == NULL || pBlock == NULL) {
112,583,798✔
UNCOV
868
    return TSDB_CODE_INVALID_PARA;
×
869
  }
870
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
112,607,089✔
871
  int32_t        code = 0;
112,590,274✔
872
  int32_t        line = 0;
112,590,274✔
873
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
112,590,274✔
874
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
112,607,440✔
875
  pReader->lastTs = pSubmitTbData->ctimeMs;
112,607,440✔
876

877
  int32_t numOfRows = 0;
112,606,076✔
878
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
112,606,076✔
879
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
1,296✔
880
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
1,296✔
881
    numOfRows = pCol->nVal;
1,296✔
882
  } else {
883
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
112,603,392✔
884
  }
885

886
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfRows);
112,597,485✔
887
  TSDB_CHECK_CODE(code, line, END);
112,600,229✔
888

889
  code = checkSchema(pReader, pSubmitTbData);
112,600,229✔
890
  TSDB_CHECK_CODE(code, line, END);
112,604,337✔
891

892
  // convert and scan one block
893
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
112,604,337✔
894
    SArray* pCols = pSubmitTbData->aCol;
1,296✔
895
    code = processSubmitCol(pCols, pBlock, pCol2SlotId, pSubmitTbData->pBlobSet);
1,296✔
896
    TSDB_CHECK_CODE(code, line, END);
1,296✔
897
  } else {
898
    SArray*         pRows = pSubmitTbData->aRowP;
112,603,384✔
899
    code = processSubmitRow(pRows, pBlock, pCol2SlotId, pReader, pSubmitTbData->pBlobSet);
112,600,981✔
900
    TSDB_CHECK_CODE(code, line, END);
112,518,312✔
901
  }
902
  pBlock->info.rows += numOfRows;
112,519,608✔
903
END:
112,543,339✔
904
  if (code != 0) {
112,543,339✔
UNCOV
905
    tqError("tqRetrieveCols failed, line:%d, msg:%s", line, tstrerror(code));
×
906
  }
907
  return code;
112,552,943✔
908
}
909

910
#define PROCESS_VAL                                      \
911
  if (curRow == 0) {                                     \
912
    assigned[j] = !COL_VAL_IS_NONE(&colVal);             \
913
    buildNew = true;                                     \
914
  } else {                                               \
915
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); \
916
    if (currentRowAssigned != assigned[j]) {             \
917
      assigned[j] = currentRowAssigned;                  \
918
      buildNew = true;                                   \
919
    }                                                    \
920
  }
921

922
#define SET_DATA                                                                                    \
923
  if (colVal.cid < pColData->info.colId) {                                                          \
924
    sourceIdx++;                                                                                    \
925
  } else if (colVal.cid == pColData->info.colId) {                                                  \
926
    if (IS_STR_DATA_BLOB(pColData->info.type)) {                                                    \
927
      TQ_ERR_GO_TO_END(doSetBlobVal(pColData, curRow - lastRow, &colVal, pSubmitTbData->pBlobSet)); \
928
    } else {                                                                                        \
929
      TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal));                              \
930
    }                                                                                               \
931
    sourceIdx++;                                                                                    \
932
    targetIdx++;                                                                                    \
933
  } else {                                                                                          \
934
    colDataSetNULL(pColData, curRow - lastRow);                                                     \
935
    targetIdx++;                                                                                    \
936
  }
937

938
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
43,359,185✔
939
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
940
  int32_t         code = 0;
43,359,185✔
941
  SSchemaWrapper* pSW = NULL;
43,359,185✔
942
  SSDataBlock*    block = NULL;
43,382,596✔
943
  if (taosArrayGetSize(blocks) > 0) {
43,382,596✔
UNCOV
944
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
UNCOV
945
    TQ_NULL_GO_TO_END(pLastBlock);
×
UNCOV
946
    pLastBlock->info.rows = curRow - *lastRow;
×
UNCOV
947
    *lastRow = curRow;
×
948
  }
949

950
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
43,412,829✔
951
  TQ_NULL_GO_TO_END(block);
43,399,539✔
952

953
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
43,399,539✔
954
  TQ_NULL_GO_TO_END(pSW);
43,398,766✔
955

956
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
43,398,766✔
957
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
43,432,117✔
958
          (int32_t)taosArrayGetSize(block->pDataBlock));
959

960
  block->info.id.uid = pSubmitTbData->uid;
43,432,117✔
961
  block->info.version = pReader->msg.ver;
43,421,914✔
962
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
43,429,626✔
963
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
43,422,545✔
964
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
43,423,619✔
965
  pSW = NULL;
43,423,619✔
966

967
  taosMemoryFreeClear(block);
43,423,619✔
968

969
END:
43,406,130✔
970
  if (code != 0) {
43,411,391✔
UNCOV
971
    tqError("processBuildNew failed, code:%d", code);
×
972
  }
973
  tDeleteSchemaWrapper(pSW);
43,411,391✔
974
  blockDataFreeRes(block);
43,390,813✔
975
  taosMemoryFree(block);
43,400,281✔
976
  return code;
43,393,762✔
977
}
978
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
853,157✔
979
  int32_t code = 0;
853,157✔
980
  int32_t curRow = 0;
853,157✔
981
  int32_t lastRow = 0;
853,157✔
982

983
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
853,475✔
984
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
855,701✔
985
  TQ_NULL_GO_TO_END(assigned);
854,168✔
986

987
  SArray*   pCols = pSubmitTbData->aCol;
854,168✔
988
  SColData* pCol = taosArrayGet(pCols, 0);
854,168✔
989
  TQ_NULL_GO_TO_END(pCol);
855,383✔
990
  int32_t numOfRows = pCol->nVal;
855,383✔
991
  int32_t numOfCols = taosArrayGetSize(pCols);
855,064✔
992
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
854,109✔
993
          numOfRows);
994
  for (int32_t i = 0; i < numOfRows; i++) {
125,452,599✔
995
    bool buildNew = false;
124,746,053✔
996

997
    for (int32_t j = 0; j < pSchemaWrapper->nCols; j++) {
497,439,988✔
998
      int32_t k = 0;
368,790,232✔
999
      for (; k < numOfCols; k++) {
733,431,753✔
1000
        pCol = taosArrayGet(pCols, k);
713,566,144✔
1001
        TQ_NULL_GO_TO_END(pCol);
713,972,272✔
1002
        if (pSchemaWrapper->pSchema[j].colId == pCol->cid) {
713,972,272✔
1003
          SColVal colVal = {0};
368,916,673✔
1004
          TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
369,546,578✔
1005
          PROCESS_VAL
371,590,668✔
1006
          tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], numOfCols);
372,780,805✔
1007
          break;
371,551,703✔
1008
        }
1009
      }
1010
      if (k >= numOfCols) {
372,693,935✔
1011
        // this column is not in the current row, so we set it to NULL
UNCOV
1012
        assigned[j] = 0;
×
UNCOV
1013
        buildNew = true;
×
1014
      }
1015
    }
1016

1017
    if (buildNew) {
122,924,913✔
1018
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
856,017✔
1019
    }
1020

1021
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
122,924,913✔
1022
    TQ_NULL_GO_TO_END(pBlock);
124,738,393✔
1023

1024
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
124,738,393✔
1025
            (int32_t)taosArrayGetSize(blocks));
1026

1027
    int32_t targetIdx = 0;
124,738,393✔
1028
    int32_t sourceIdx = 0;
124,738,393✔
1029
    int32_t colActual = blockDataGetNumOfCols(pBlock);
124,738,393✔
1030
    while (targetIdx < colActual && sourceIdx < numOfCols) {
495,332,036✔
1031
      pCol = taosArrayGet(pCols, sourceIdx);
370,732,594✔
1032
      TQ_NULL_GO_TO_END(pCol);
369,534,313✔
1033
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
369,534,313✔
1034
      TQ_NULL_GO_TO_END(pColData);
366,325,377✔
1035
      SColVal colVal = {0};
366,325,377✔
1036
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
368,003,082✔
1037
      SET_DATA
372,504,473✔
1038
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
371,710,891✔
1039
    }
1040

1041
    curRow++;
124,599,442✔
1042
  }
1043
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
706,546✔
1044
  pLastBlock->info.rows = curRow - lastRow;
856,335✔
1045
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
856,335✔
1046
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
1047
END:
24,067,358✔
1048
  if (code != TSDB_CODE_SUCCESS) {
856,017✔
UNCOV
1049
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
1050
  }
1051
  taosMemoryFree(assigned);
856,017✔
1052
  return code;
856,335✔
1053
}
1054

1055
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
42,504,790✔
1056
  int32_t   code = 0;
42,504,790✔
1057
  STSchema* pTSchema = NULL;
42,504,790✔
1058

1059
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
42,504,790✔
1060
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
42,541,466✔
1061
  TQ_NULL_GO_TO_END(assigned);
42,534,180✔
1062

1063
  int32_t curRow = 0;
42,534,180✔
1064
  int32_t lastRow = 0;
42,534,180✔
1065
  SArray* pRows = pSubmitTbData->aRowP;
42,520,798✔
1066
  int32_t numOfRows = taosArrayGetSize(pRows);
42,552,969✔
1067
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
42,556,109✔
1068
  TQ_NULL_GO_TO_END(pTSchema);
42,546,795✔
1069
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
42,546,795✔
1070

1071
  for (int32_t i = 0; i < numOfRows; i++) {
1,236,898,836✔
1072
    bool  buildNew = false;
1,194,461,544✔
1073
    SRow* pRow = taosArrayGetP(pRows, i);
1,194,461,544✔
1074
    TQ_NULL_GO_TO_END(pRow);
1,193,607,661✔
1075

1076
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
2,147,483,647✔
1077
      SColVal colVal = {0};
2,147,483,647✔
1078
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
2,147,483,647✔
1079
      PROCESS_VAL
2,147,483,647✔
1080
      tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], pTSchema->numOfCols);
2,147,483,647✔
1081
    }
1082

1083
    if (buildNew) {
1,189,128,507✔
1084
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
42,576,591✔
1085
    }
1086

1087
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
1,189,105,293✔
1088
    TQ_NULL_GO_TO_END(pBlock);
1,195,625,808✔
1089

1090
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
1,195,625,808✔
1091
            (int32_t)taosArrayGetSize(blocks));
1092

1093
    int32_t targetIdx = 0;
1,195,625,808✔
1094
    int32_t sourceIdx = 0;
1,195,625,808✔
1095
    int32_t colActual = blockDataGetNumOfCols(pBlock);
1,195,625,808✔
1096
    while (targetIdx < colActual && sourceIdx < pTSchema->numOfCols) {
2,147,483,647✔
1097
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
2,147,483,647✔
1098
      TQ_NULL_GO_TO_END(pColData);
2,147,483,647✔
1099
      SColVal          colVal = {0};
2,147,483,647✔
1100
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
2,147,483,647✔
1101
      SET_DATA
2,147,483,647✔
1102
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
2,147,483,647✔
1103
    }
1104

1105
    curRow++;
1,194,394,079✔
1106
  }
1107
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
42,437,292✔
1108
  if (pLastBlock != NULL) {
42,577,911✔
1109
    pLastBlock->info.rows = curRow - lastRow;
42,578,267✔
1110
  }
1111

1112
  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
42,556,170✔
1113
          (int)taosArrayGetSize(blocks));
1114
END:
45,262,444✔
1115
  if (code != TSDB_CODE_SUCCESS) {
42,558,852✔
UNCOV
1116
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
1117
  }
1118
  taosMemoryFreeClear(pTSchema);
42,534,006✔
1119
  taosMemoryFree(assigned);
42,560,839✔
1120
  return code;
42,524,273✔
1121
}
1122

1123
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
8,006✔
1124
  int32_t code = 0;
8,006✔
1125
  int32_t lino = 0;
8,006✔
1126
  void*   createReq = NULL;
8,006✔
1127
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
8,006✔
1128
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
8,006✔
1129

1130
  if (pRsp->createTableNum == 0) {
8,006✔
1131
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
4,352✔
1132
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
4,352✔
1133
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
4,352✔
1134
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
4,352✔
1135
  }
1136

1137
  uint32_t len = 0;
8,006✔
1138
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
8,006✔
1139
  TSDB_CHECK_CODE(code, lino, END);
8,006✔
1140
  createReq = taosMemoryCalloc(1, len);
8,006✔
1141
  TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
8,006✔
1142

1143
  SEncoder encoder = {0};
8,006✔
1144
  tEncoderInit(&encoder, createReq, len);
8,006✔
1145
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
8,006✔
1146
  tEncoderClear(&encoder);
8,006✔
1147
  TSDB_CHECK_CODE(code, lino, END);
8,006✔
1148
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
16,012✔
1149
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
16,012✔
1150
  pRsp->createTableNum++;
8,006✔
1151
  tqTrace("build create table info msg success");
8,006✔
1152

1153
END:
8,006✔
1154
  if (code != 0) {
8,006✔
UNCOV
1155
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1156
    taosMemoryFree(createReq);
×
1157
  }
1158
  return code;
8,006✔
1159
}
1160

1161
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
43,631,264✔
1162
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1163
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
43,631,264✔
1164
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
43,631,264✔
1165
  if (pSubmitTbData == NULL) {
43,639,118✔
UNCOV
1166
    return terrno;
×
1167
  }
1168
  pReader->nextBlk++;
43,639,118✔
1169

1170
  if (pSubmitTbDataRet) {
43,627,568✔
1171
    *pSubmitTbDataRet = pSubmitTbData;
43,637,539✔
1172
  }
1173

1174
  if (fetchMeta == ONLY_META) {
43,627,533✔
1175
    if (pSubmitTbData->pCreateTbReq != NULL) {
6,624✔
1176
      if (pRsp->createTableReq == NULL) {
1,926✔
1177
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
1,143✔
1178
        if (pRsp->createTableReq == NULL) {
1,143✔
UNCOV
1179
          return terrno;
×
1180
        }
1181
      }
1182
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
3,852✔
UNCOV
1183
        return terrno;
×
1184
      }
1185
      pSubmitTbData->pCreateTbReq = NULL;
1,926✔
1186
    }
1187
    return 0;
6,624✔
1188
  }
1189

1190
  int32_t sversion = pSubmitTbData->sver;
43,620,909✔
1191
  int64_t uid = pSubmitTbData->uid;
43,630,246✔
1192
  pReader->lastBlkUid = uid;
43,633,129✔
1193

1194
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
43,631,197✔
1195
  taosMemoryFreeClear(pReader->extSchema);
43,628,048✔
1196
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
43,618,222✔
1197
  if (pReader->pSchemaWrapper == NULL) {
43,619,528✔
1198
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
199,063✔
1199
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1200
    pReader->cachedSchemaSuid = 0;
199,063✔
1201
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
199,063✔
1202
  }
1203

1204
  if (pSubmitTbData->pCreateTbReq != NULL) {
43,370,008✔
1205
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
8,006✔
1206
    if (code != 0) {
8,006✔
UNCOV
1207
      return code;
×
1208
    }
1209
  } else if (rawList != NULL) {
43,406,005✔
1210
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
UNCOV
1211
      return terrno;
×
1212
    }
1213
    pReader->pSchemaWrapper = NULL;
×
UNCOV
1214
    return 0;
×
1215
  }
1216

1217
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
43,414,011✔
1218
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
854,429✔
1219
  } else {
1220
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
42,538,135✔
1221
  }
1222
}
1223

1224
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
344,369✔
1225
  if (pReader == NULL || tbUidList == NULL) {
344,369✔
1226
    return TSDB_CODE_SUCCESS;
×
1227
  }
1228
  if (pReader->tbIdHash) {
344,651✔
UNCOV
1229
    taosHashClear(pReader->tbIdHash);
×
1230
  } else {
1231
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
344,912✔
1232
    if (pReader->tbIdHash == NULL) {
344,912✔
UNCOV
1233
      tqError("s-task:%s failed to init hash table", id);
×
UNCOV
1234
      return terrno;
×
1235
    }
1236
  }
1237

1238
  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
9,755,578✔
1239
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
9,406,296✔
1240
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
9,406,160✔
1241
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
×
1242
      continue;
×
1243
    }
1244
  }
1245

1246
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
344,912✔
1247
  return TSDB_CODE_SUCCESS;
344,912✔
1248
}
1249

1250
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
382,740✔
1251
  if (pReader == NULL || pTableUidList == NULL) {
382,740✔
UNCOV
1252
    return;
×
1253
  }
1254
  if (pReader->tbIdHash == NULL) {
382,740✔
UNCOV
1255
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
×
UNCOV
1256
    if (pReader->tbIdHash == NULL) {
×
1257
      tqError("failed to init hash table");
×
1258
      return;
×
1259
    }
1260
  }
1261

1262
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
382,740✔
1263
  for (int i = 0; i < numOfTables; i++) {
712,988✔
1264
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
330,248✔
1265
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
330,248✔
1266
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
×
UNCOV
1267
      continue;
×
1268
    }
1269
    tqDebug("%s add table uid:%" PRId64 " to hash", __func__, *pKey);
330,248✔
1270
  }
1271
}
1272

1273
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
×
UNCOV
1274
  if (pReader == NULL) {
×
UNCOV
1275
    return false;
×
1276
  }
UNCOV
1277
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL;
×
1278
}
1279

UNCOV
1280
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
UNCOV
1281
  if (pReader == NULL) {
×
UNCOV
1282
    return false;
×
1283
  }
UNCOV
1284
  return pReader->msg.msgStr == NULL;
×
1285
}
1286

1287
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
1,029✔
1288
  if (pReader == NULL || tbUidList == NULL) {
1,029✔
1289
    return;
×
1290
  }
1291
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
2,058✔
1292
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
1,029✔
1293
    int32_t code = taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
1,029✔
1294
    if (code != 0) {
1,029✔
1295
      tqWarn("%s failed to remove table uid:%" PRId64 " from hash, msg:%s", __func__, pKey != NULL ? *pKey : 0, tstrerror(code));
343✔
1296
    }
1297
  }
1298
}
1299

1300
int32_t tqDeleteTbUidList(STQ* pTq, SArray* tbUidList) {
2,211,389✔
1301
  if (pTq == NULL) {
2,211,389✔
UNCOV
1302
    return 0;  // mounted vnode may have no tq
×
1303
  }
1304
  if (tbUidList == NULL) {
2,211,389✔
UNCOV
1305
    return TSDB_CODE_INVALID_PARA;
×
1306
  }
1307
  void*   pIter = NULL;
2,211,389✔
1308
  int32_t vgId = TD_VID(pTq->pVnode);
2,211,389✔
1309

1310
  // update the table list for each consumer handle
1311
  taosWLockLatch(&pTq->lock);
2,211,389✔
1312
  while (1) {
312,820✔
1313
    pIter = taosHashIterate(pTq->pHandle, pIter);
2,524,209✔
1314
    if (pIter == NULL) {
2,524,209✔
1315
      break;
2,211,389✔
1316
    }
1317

1318
    STqHandle* pTqHandle = (STqHandle*)pIter;
312,820✔
1319
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " delete table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);
312,820✔
1320
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
312,820✔
1321
      int32_t code = qDeleteTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList);
×
UNCOV
1322
      if (code != 0) {
×
UNCOV
1323
        tqError("update qualified table error for %s", pTqHandle->subKey);
×
UNCOV
1324
        continue;
×
1325
      }
1326
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
312,820✔
1327
      int32_t sz = taosArrayGetSize(tbUidList);
312,820✔
1328
      for (int32_t i = 0; i < sz; i++) {
312,820✔
UNCOV
1329
        int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
×
UNCOV
1330
        if (tbUid &&
×
UNCOV
1331
            taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
×
1332
          tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
×
UNCOV
1333
          continue;
×
1334
        }
1335
      }
UNCOV
1336
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
×
UNCOV
1337
      tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
×
1338
    }
1339
  }
1340
  taosWUnLockLatch(&pTq->lock);
2,211,389✔
1341
  return 0;
2,211,389✔
1342
}
1343

1344
static int32_t addTableListForStableTmq(STqHandle* pTqHandle, STQ* pTq, SArray* tbUidList) {
8,943✔
1345
  int     ret = qFilterTableList(pTq->pVnode, tbUidList, pTqHandle->execHandle.execTb.node,
8,943✔
1346
                      pTqHandle->execHandle.task, pTqHandle->execHandle.execTb.suid);
8,943✔
1347
  if (ret != TDB_CODE_SUCCESS) {
8,943✔
UNCOV
1348
    tqError("tqAddTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
×
1349
            pTqHandle->consumerId);
UNCOV
1350
    return ret;
×
1351
  }
1352
  tqDebug("%s handle %s consumer:0x%" PRIx64 " add %d tables to tqReader", __func__, pTqHandle->subKey,
8,943✔
1353
          pTqHandle->consumerId, (int32_t)taosArrayGetSize(tbUidList));
1354
  tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
8,943✔
1355
  return 0;
8,943✔
1356
}
1357

1358
int32_t tqAddTbUidList(STQ* pTq, SArray* tbUidList) {
52,337,930✔
1359
  if (pTq == NULL) {
52,337,930✔
UNCOV
1360
    return 0;  // mounted vnode may have no tq
×
1361
  }
1362
  if (tbUidList == NULL) {
52,337,930✔
UNCOV
1363
    return TSDB_CODE_INVALID_PARA;
×
1364
  }
1365
  void*   pIter = NULL;
52,337,930✔
1366
  int32_t vgId = TD_VID(pTq->pVnode);
52,337,930✔
1367
  int32_t code = 0;
52,337,930✔
1368

1369
  // update the table list for each consumer handle
1370
  taosWLockLatch(&pTq->lock);
52,337,930✔
1371
  while (1) {
999,348✔
1372
    pIter = taosHashIterate(pTq->pHandle, pIter);
53,337,278✔
1373
    if (pIter == NULL) {
53,337,278✔
1374
      break;
52,337,930✔
1375
    }
1376

1377
    STqHandle* pTqHandle = (STqHandle*)pIter;
999,348✔
1378
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " add table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);
999,348✔
1379
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
999,348✔
1380
      code = qAddTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList);
373,797✔
1381
      if (code != 0) {
373,797✔
UNCOV
1382
        tqError("add table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
×
UNCOV
1383
        break;
×
1384
      }
1385
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
625,551✔
1386
      code = addTableListForStableTmq(pTqHandle, pTq, tbUidList);
7,914✔
1387
      if (code != 0) {
7,914✔
1388
        tqError("add table list for stable tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
×
UNCOV
1389
        break;
×
1390
      }
1391
    }
1392
  }
1393
  taosHashCancelIterate(pTq->pHandle, pIter);
52,337,930✔
1394
  taosWUnLockLatch(&pTq->lock);
52,337,930✔
1395

1396
  return code;
52,337,930✔
1397
}
1398

1399
int32_t tqUpdateTbUidList(STQ* pTq, SArray* tbUidList, SArray* cidList) {
7,422,535✔
1400
  if (pTq == NULL) {
7,422,535✔
UNCOV
1401
    return 0;  // mounted vnode may have no tq
×
1402
  }
1403
  if (tbUidList == NULL) {
7,422,535✔
UNCOV
1404
    return TSDB_CODE_INVALID_PARA;
×
1405
  }
1406
  void*   pIter = NULL;
7,422,535✔
1407
  int32_t vgId = TD_VID(pTq->pVnode);
7,422,535✔
1408
  int32_t code = 0;
7,422,535✔
1409
  // update the table list for each consumer handle
1410
  taosWLockLatch(&pTq->lock);
7,422,535✔
1411
  while (1) {
2,704✔
1412
    pIter = taosHashIterate(pTq->pHandle, pIter);
7,425,239✔
1413
    if (pIter == NULL) {
7,425,239✔
1414
      break;
7,422,535✔
1415
    }
1416

1417
    STqHandle* pTqHandle = (STqHandle*)pIter;
2,704✔
1418
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " update table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);
2,704✔
1419
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
2,704✔
1420
      SNode* pTagCond = getTagCondNodeForQueryTmq(pTqHandle->execHandle.task);
646✔
1421
      bool ret = checkCidInTagCondition(pTagCond, cidList);
646✔
1422
      if (ret){
646✔
UNCOV
1423
        code = qUpdateTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList);
×
UNCOV
1424
        if (code != 0) {
×
UNCOV
1425
          tqError("update table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
×
UNCOV
1426
          break;
×
1427
        }
1428
      }
1429
      qUpdateTableTagCacheForTmq(pTqHandle->execHandle.task, tbUidList, cidList);
646✔
1430
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
2,058✔
1431
      SNode* pTagCond = getTagCondNodeForStableTmq(pTqHandle->execHandle.execTb.node);
2,058✔
1432
      bool ret = checkCidInTagCondition(pTagCond, cidList);
2,058✔
1433
      if (ret){
2,058✔
1434
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
1,029✔
1435
        code = addTableListForStableTmq(pTqHandle, pTq, tbUidList);
1,029✔
1436
        if (code != 0) {
1,029✔
1437
          tqError("update table list for stable tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
×
UNCOV
1438
          break;
×
1439
        }
1440
      }
1441
    }
1442
  }
1443

1444
  taosHashCancelIterate(pTq->pHandle, pIter);
7,422,535✔
1445
  taosWUnLockLatch(&pTq->lock);
7,422,535✔
1446

1447
  return code;
7,422,535✔
1448
}
1449

UNCOV
1450
static void destroySourceScanTables(void* ptr) {
×
UNCOV
1451
  SArray** pTables = ptr;
×
UNCOV
1452
  if (pTables && *pTables) {
×
1453
    taosArrayDestroy(*pTables);
×
1454
    *pTables = NULL;
×
1455
  }
UNCOV
1456
}
×
1457

1458
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
UNCOV
1459
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
UNCOV
1460
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
UNCOV
1461
  if (pCol1->vColId == pCol2->vColId) {
×
UNCOV
1462
    return 0;
×
UNCOV
1463
  } else if (pCol1->vColId < pCol2->vColId) {
×
UNCOV
1464
    return -1;
×
1465
  } else {
UNCOV
1466
    return 1;
×
1467
  }
1468
}
1469

UNCOV
1470
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
UNCOV
1471
  if (value) {
×
UNCOV
1472
    SSchemaWrapper* pSchemaWrapper = value;
×
1473
    tDeleteSchemaWrapper(pSchemaWrapper);
1474
  }
UNCOV
1475
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc