• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.99
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "taoserror.h"
17
#include "tarray.h"
18
#include "tdef.h"
19
#include "thash.h"
20
#include "tmsg.h"
21
#include "tpriv.h"
22
#include "tq.h"
23

24
static int32_t tqRetrieveCols(STqReader *pReader, SSDataBlock *pRes, SHashObj* pCol2SlotId);
25

26
// Filter a create-table batch against the tables of a table-type subscription.
//
// The batch is decoded from dcoder. A request "matches" when it creates a child table (or virtual
// child table) whose super table is tbSuid and whose uid is present in pReader->tbIdHash.
//   - no request matches:    nothing is modified, *realTbSuid keeps the caller's value;
//   - every request matches: *realTbSuid is set to tbSuid, the message is left as it is;
//   - only some match:       *realTbSuid is set to tbSuid and the payload behind the SMsgHead in
//                            pHead->body is overwritten with a re-encoded batch that holds only the
//                            matching requests; pHead->bodyLen is updated to the new length.
//
// Failures are logged through tqError; the function itself returns nothing.
static void processCreateTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  int32_t            code = 0;
  int32_t            lino = 0;
  int32_t            needRebuild = 0;
  SVCreateTbReq*     pCreateReq = NULL;
  SVCreateTbBatchReq reqNew = {0};
  void*              buf = NULL;
  SVCreateTbBatchReq req = {0};

  code = tDecodeSVCreateTbBatchReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // first pass: count how many requests belong to the subscription
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    if ((pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) &&
        pCreateReq->ctb.suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
      needRebuild++;
    }
  }

  if (needRebuild == 0) {
    // nothing in this batch concerns the subscription
    goto end;
  }

  *realTbSuid = tbSuid;
  if (needRebuild == req.nReqs) {
    // the whole batch is relevant, forward it unchanged
    goto end;
  }

  // partial match: collect the matching requests (shallow copies) into a new batch
  reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
  if (reqNew.pArray == NULL) {
    code = terrno;
    lino = __LINE__;
    goto end;
  }
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    if ((pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) &&
        pCreateReq->ctb.suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
      reqNew.nReqs++;
      if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
        code = terrno;
        lino = __LINE__;
        goto end;
      }
    }
  }

  int tlen = 0;
  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, code);
  if (code < 0) {
    // do not allocate with a length that was never computed
    lino = __LINE__;
    goto end;
  }

  buf = taosMemoryMalloc(tlen);
  if (NULL == buf) {
    // record the allocation error, otherwise code stays 0 and the failure is never reported
    code = terrno;
    lino = __LINE__;
    goto end;
  }

  SEncoder coderNew = {0};
  tEncoderInit(&coderNew, buf, tlen);
  code = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
  tEncoderClear(&coderNew);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // replace the payload in place; the SMsgHead in front of it is kept
  (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
  pHead->bodyLen = tlen + sizeof(SMsgHead);

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.pArray);
  tDeleteSVCreateTbBatchReq(&req);
  if (code < 0) {
    tqError("processCreateTbMsg failed, code:%d, line:%d", code, lino);
  }
}
11,574✔
101

102
// Decide whether an alter-table message concerns a table-type subscription and, when only part of
// a multi-table request does, shrink the message to that part.
//
// The request is decoded from dcoder and handled by req.action:
//   - TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL ("Type 1"): every entry of req.tables is looked
//     up by name in the vnode meta. An entry matches when its ctbEntry.suid equals tbSuid and its
//     uid is in pReader->tbIdHash. No match: nothing is changed. All match: *realTbSuid = tbSuid.
//     Partial match: *realTbSuid = tbSuid and the payload behind the SMsgHead in pHead->body is
//     overwritten with a request that lists only the matching tables; pHead->bodyLen is updated.
//   - TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL ("Type 2"): req.tbName is looked up; if it is a
//     super table, *realTbSuid is set to its uid.
//   - anything else (legacy single-table request): req.tbName is looked up; if its uid is in
//     pReader->tbIdHash, *realTbSuid is set to its ctbEntry.suid.
//     NOTE(review): ctbEntry is read without checking the entry type here - confirm that only
//     child tables can reach tbIdHash.
//
// Failures are logged; the function returns nothing.
static void processAlterTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  SVAlterTbReq req = {0};
  SVAlterTbReq reqNew = {0};
  SMetaReader  mr = {0};
  void*        buf = NULL;
  int32_t      lino = 0;
  int32_t      code = tDecodeSVAlterTbReq(dcoder, &req);
  if (code < 0) {
    tqError("vgId:%d, processAlterTbMsg failed to decode SVAlterTbReq, code:%s",
            TD_VID(pReader->pVnode), tstrerror(code));
    lino = __LINE__;
    goto end;
  }

  if (req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
    int32_t numOfTables = (int32_t)taosArrayGetSize(req.tables);
    int32_t needRebuild = 0;

    // first pass: count the tables that belong to the subscription
    for (int32_t i = 0; i < numOfTables; i++) {
      SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
      if (pTable == NULL || pTable->tbName == NULL) {
        tqWarn("vgId:%d, processAlterTbMsg Type 1 table[%d] has invalid name, skip",
               TD_VID(pReader->pVnode), i);
        continue;
      }

      metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
      code = metaGetTableEntryByName(&mr, pTable->tbName);
      if (code < 0) {
        // Table not found, skip
        tqDebug("vgId:%d, processAlterTbMsg Type 1 table %s not found, skip",
                TD_VID(pReader->pVnode), pTable->tbName);
        metaReaderClear(&mr);
        // a skipped table is not a failure of this function; without the reset a miss on the
        // last table would be reported as an error at the end label
        code = 0;
        continue;
      }

      if (mr.me.ctbEntry.suid == tbSuid &&
          taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t)) != NULL) {
        needRebuild++;
      }
      metaReaderClear(&mr);
    }

    if (needRebuild == 0) {
      // No tables in subscription scope, skip message
      tqDebug("vgId:%d, processAlterTbMsg Type 1: 0/%d tables in subscription, skip",
              TD_VID(pReader->pVnode), (int)taosArrayGetSize(req.tables));
    } else if (needRebuild == numOfTables) {
      // All tables in subscription scope, forward complete message
      *realTbSuid = tbSuid;
      tqDebug("vgId:%d, processAlterTbMsg Type 1: %d/%d tables in subscription, forward all",
              TD_VID(pReader->pVnode), needRebuild, (int)taosArrayGetSize(req.tables));
    } else {
      // Partial match: rebuild message with only subscribed tables
      *realTbSuid = tbSuid;
      tqDebug("vgId:%d, processAlterTbMsg Type 1: %d/%d tables in subscription, rebuild message",
              TD_VID(pReader->pVnode), needRebuild, (int)taosArrayGetSize(req.tables));

      // Build filtered message; the entries are shallow copies that still point into req
      reqNew.action = req.action;
      reqNew.tbName = req.tbName;
      reqNew.tables = taosArrayInit(needRebuild, sizeof(SUpdateTableTagVal));
      if (reqNew.tables == NULL) {
        code = terrno;
        tqError("vgId:%d, processAlterTbMsg failed to allocate filtered tables array, code:%s",
                TD_VID(pReader->pVnode), tstrerror(code));
        lino = __LINE__;
        goto end;
      }

      // Collect only subscribed tables
      for (int32_t i = 0; i < numOfTables; i++) {
        SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
        if (pTable == NULL || pTable->tbName == NULL) {
          continue;
        }
        metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
        code = metaGetTableEntryByName(&mr, pTable->tbName);
        if (code < 0) {
          metaReaderClear(&mr);
          code = 0;  // skipped on purpose, see the first pass
          continue;
        }

        if (mr.me.ctbEntry.suid == tbSuid &&
            taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t)) != NULL) {
          if (taosArrayPush(reqNew.tables, pTable) == NULL) {
            code = terrno;
            tqError("vgId:%d, processAlterTbMsg failed to add table %s to filtered array, code:%s",
                    TD_VID(pReader->pVnode), pTable->tbName, tstrerror(code));
            lino = __LINE__;
            metaReaderClear(&mr);
            goto end;
          }
        }
        metaReaderClear(&mr);
      }

      // Encode filtered message
      int tlen = 0;
      tEncodeSize(tEncodeSVAlterTbReq, &reqNew, tlen, code);
      if (code < 0) {
        tqError("vgId:%d, processAlterTbMsg failed to calculate encode size, code:%s",
                TD_VID(pReader->pVnode), tstrerror(code));
        lino = __LINE__;
        goto end;
      }

      buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        code = terrno;
        tqError("vgId:%d, processAlterTbMsg failed to allocate encode buffer size:%d, code:%s",
                TD_VID(pReader->pVnode), tlen, tstrerror(code));
        lino = __LINE__;
        goto end;
      }

      SEncoder coderNew = {0};
      tEncoderInit(&coderNew, buf, tlen);
      code = tEncodeSVAlterTbReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (code != 0) {
        tqError("vgId:%d, processAlterTbMsg failed to encode filtered message, code:%s",
                TD_VID(pReader->pVnode), tstrerror(code));
        lino = __LINE__;
        goto end;
      }

      // replace the payload in place; the SMsgHead in front of it is kept
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      tqInfo("vgId:%d, processAlterTbMsg Type 1 rebuilt message with %d/%d tables",
             TD_VID(pReader->pVnode), needRebuild, (int)taosArrayGetSize(req.tables));
    }

  } else if (req.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
    // Type 2: WHERE condition on super table
    metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
    code = metaGetTableEntryByName(&mr, req.tbName);
    if (code < 0) {
      tqError("vgId:%d, processAlterTbMsg Type 2 failed to get super table %s, code:%s",
              TD_VID(pReader->pVnode), req.tbName, tstrerror(code));
      lino = __LINE__;
      metaReaderClear(&mr);
      goto end;
    }

    // For Type 2, the super table is specified in req.tbName
    // The actual filtering happens through tbIdHash which already contains
    // only the child tables matching the topic's WHERE condition
    if (mr.me.type == TSDB_SUPER_TABLE) {
      *realTbSuid = mr.me.uid;
      tqDebug("vgId:%d, processAlterTbMsg Type 2: super table %" PRId64 " (WHERE condition filtering via tbIdHash)",
              TD_VID(pReader->pVnode), mr.me.uid);
    }
    metaReaderClear(&mr);
  } else {
    // Legacy single-table tag modification
    metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
    code = metaGetTableEntryByName(&mr, req.tbName);
    if (code < 0) {
      tqDebug("vgId:%d, processAlterTbMsg legacy type failed to get table %s, code:%s",
              TD_VID(pReader->pVnode), req.tbName ? req.tbName : "(null)", tstrerror(code));
      lino = __LINE__;
      metaReaderClear(&mr);
      goto end;
    }
    if (taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t)) != NULL) {
      *realTbSuid = mr.me.ctbEntry.suid;
    }
    metaReaderClear(&mr);
  }

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.tables);
  destroyAlterTbReq(&req);
  if (code < 0) {
    tqError("vgId:%d, processAlterTbMsg failed at line:%d, code:%s",
            TD_VID(pReader->pVnode), lino, tstrerror(code));
  }
}
6,847✔
279

280
// Filter a drop-table batch against the tables of a table-type subscription.
//
// The batch is decoded from dcoder. A request "matches" when its suid equals tbSuid and its uid is
// present in pReader->tbIdHash.
//   - no request matches:    nothing is modified, *realTbSuid keeps the caller's value;
//   - every request matches: *realTbSuid is set to tbSuid, the message is left as it is;
//   - only some match:       *realTbSuid is set to tbSuid and the payload behind the SMsgHead in
//                            pHead->body is overwritten with a re-encoded batch that holds only the
//                            matching requests; pHead->bodyLen is updated to the new length.
//
// Failures are logged through tqError; the function itself returns nothing.
static void processDropTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchReq reqNew = {0};
  SVDropTbReq*     pDropReq = NULL;
  void*            buf = NULL;
  int32_t          needRebuild = 0;
  int32_t          lino = 0;
  int32_t          code = tDecodeSVDropTbBatchReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // first pass: count how many requests belong to the subscription
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    if (pDropReq->suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
      needRebuild++;
    }
  }

  if (needRebuild == 0) {
    // nothing in this batch concerns the subscription
    goto end;
  }

  *realTbSuid = tbSuid;
  if (needRebuild == req.nReqs) {
    // the whole batch is relevant, forward it unchanged
    goto end;
  }

  // partial match: collect the matching requests (shallow copies) into a new batch
  reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
  if (reqNew.pArray == NULL) {
    code = terrno;
    lino = __LINE__;
    goto end;
  }
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    if (pDropReq->suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
      reqNew.nReqs++;
      if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
        code = terrno;
        lino = __LINE__;
        goto end;
      }
    }
  }

  int tlen = 0;
  tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, code);
  if (code < 0) {
    // do not allocate with a length that was never computed
    lino = __LINE__;
    goto end;
  }

  buf = taosMemoryMalloc(tlen);
  if (NULL == buf) {
    // record the allocation error, otherwise code stays 0 and the failure is never reported
    code = terrno;
    lino = __LINE__;
    goto end;
  }

  SEncoder coderNew = {0};
  tEncoderInit(&coderNew, buf, tlen);
  code = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
  tEncoderClear(&coderNew);
  if (code != 0) {
    lino = __LINE__;
    goto end;
  }

  // replace the payload in place; the SMsgHead in front of it is kept
  (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
  pHead->bodyLen = tlen + sizeof(SMsgHead);

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.pArray);
  if (code < 0) {
    tqError("processDropTbMsg failed, code:%d, line:%d", code, lino);
  }
}
×
352

353
// Tell whether a meta message read from the WAL is relevant for the given subscription handle.
//
// Returns false when either argument is NULL and true for every handle whose subscription type is
// not TOPIC_SUB_TYPE__TABLE. For table subscriptions the payload behind the SMsgHead is decoded
// according to pHead->msgType to find the super table the message refers to; the result is true
// only when that suid equals the subscribed one (execHandle.execTb.suid). Message types that are
// not handled below, and messages that fail to decode, leave the found suid at 0.
// The create/alter/drop table helpers may rewrite pHead's body in place.
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle == NULL || pHead == NULL) {
    return false;
  }

  STqExecHandle* pExec = &pHandle->execHandle;
  if (pExec->subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  STqReader* pReader = pExec->pTqReader;
  int16_t    msgType = pHead->msgType;
  int64_t    tbSuid = pExec->execTb.suid;
  int64_t    realTbSuid = 0;

  // the encoded request starts right behind the message head
  char*    pBody = pHead->body;
  int32_t  payloadLen = pHead->bodyLen - sizeof(SMsgHead);
  SDecoder dcoder = {0};
  tDecoderInit(&dcoder, POINTER_SHIFT(pBody, sizeof(SMsgHead)), payloadLen);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq stbReq = {0};
    if (tDecodeSVCreateStbReq(&dcoder, &stbReq) < 0) {
      goto end;
    }
    realTbSuid = stbReq.suid;
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq dropStbReq = {0};
    if (tDecodeSVDropStbReq(&dcoder, &dropStbReq) < 0) {
      goto end;
    }
    realTbSuid = dropStbReq.suid;
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    processCreateTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
    processAlterTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    processDropTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes delRes = {0};
    if (tDecodeDeleteRes(&dcoder, &delRes) < 0) {
      goto end;
    }
    realTbSuid = delRes.suid;
  }

end:
  tDecoderClear(&dcoder);
  bool tmp = tbSuid == realTbSuid;
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
  return tmp;
}
409

410
// Advance through the WAL starting at *fetchOffset until an entry that should be handed to the
// consumer has been loaded, or until the applied version is passed.
//
// - A TDMT_VND_SUBMIT entry stops the scan; the result is whatever walFetchBody returns.
// - When pHandle->fetchMeta is not WITH_DATA, a meta entry (IS_META_MSG, except a delete while
//   fetchMeta is ONLY_META) is loaded and checked with isValValidForTable: if accepted the scan
//   stops with 0, otherwise the entry is stepped over.
// - Every other entry is skipped with walSkipFetchBody.
//
// *fetchOffset always receives the version the scan stopped at. Returns -1 for NULL arguments and
// when nothing was found; a negative value from the wal body calls is passed through.
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
    return -1;
  }

  SWalReader* pWalReader = pHandle->pWalReader;

  int32_t code = -1;
  int32_t vgId = TD_VID(pTq->pVnode);
  int64_t id = pWalReader->readerId;

  int64_t offset = *fetchOffset;
  int64_t lastVer = walGetLastVer(pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, offset, lastVer, committedVer, appliedVer, id);

  while (offset <= appliedVer) {
    if (walFetchHead(pWalReader, offset) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
      break;
    }

    int16_t msgType = pWalReader->pHead->head.msgType;
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, offset, TMSG_INFO(msgType), reqId, id);

    if (msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pWalReader);
      break;
    }

    bool wantMeta = (pHandle->fetchMeta != WITH_DATA) && IS_META_MSG(msgType) &&
                    !(msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META);
    if (wantMeta) {
      code = walFetchBody(pWalReader);
      if (code < 0) {
        break;
      }
      // the head is read again through the reader after the body has been fetched
      if (isValValidForTable(pHandle, &(pWalReader->pHead->head))) {
        code = 0;
        break;
      }
    } else {
      code = walSkipFetchBody(pWalReader);
      if (code < 0) {
        break;
      }
    }

    // entry not wanted: step over it and keep "nothing found" as the pending result
    offset++;
    code = -1;
  }

  *fetchOffset = offset;
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
          ", applied:%" PRId64 ", 0x%" PRIx64,
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
  return code;
}
477

478
// Return the reader's hasPrimaryKey flag; a NULL reader yields false.
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return (pReader != NULL) ? pReader->hasPrimaryKey : false;
}
484

485
// Load the schema of table uid from the vnode meta and store in pReader->hasPrimaryKey whether
// its second column carries the COL_IS_KEY flag. A missing schema, or one with fewer than two
// columns, stores false. Does nothing (apart from the debug log) when pReader is NULL.
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);

  if (pReader == NULL) {
    return;
  }

  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, -1, 1, NULL, 0);
  bool            hasKey = false;
  if (pWrapper != NULL && pWrapper->nCols >= 2) {
    hasKey = (pWrapper->pSchema[1].flags & COL_IS_KEY) != 0;
  }
  tDeleteSchemaWrapper(pWrapper);

  pReader->hasPrimaryKey = hasKey;
}
499

500
static void freeTagCache(void* pData){
1,723,541✔
501
  if (pData == NULL) return;
1,723,541✔
502
  SArray* tagCache = *(SArray**)pData;
1,723,541✔
503
  taosArrayDestroyP(tagCache, taosMemFree);
1,723,541✔
504
}
505

506
// Allocate a reader for pVnode: opens a WAL reader on the vnode's WAL and creates the tag cache
// hash (with freeTagCache as its free callback) and its latch. Returns NULL when pVnode is NULL or
// any allocation fails; everything acquired up to that point is released again.
STqReader* tqReaderOpen(SVnode* pVnode) {
  tqDebug("%s:%p", __FUNCTION__, pVnode);
  if (pVnode == NULL) {
    return NULL;
  }

  STqReader* pNew = taosMemoryCalloc(1, sizeof(STqReader));
  if (pNew == NULL) {
    return NULL;
  }

  pNew->pWalReader = walOpenReader(pVnode->pWal, 0);
  if (pNew->pWalReader == NULL) {
    goto FAIL;
  }

  pNew->pVnode = pVnode;
  pNew->pSchemaWrapper = NULL;
  pNew->tbIdHash = NULL;

  pNew->pTableTagCacheForTmq = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  if (pNew->pTableTagCacheForTmq == NULL) {
    goto FAIL;
  }
  taosHashSetFreeFp(pNew->pTableTagCacheForTmq, freeTagCache);
  taosInitRWLatch(&pNew->tagCachelock);

  return pNew;

FAIL:
  // the WAL reader is the only other resource that can exist at this point
  if (pNew->pWalReader != NULL) {
    walCloseReader(pNew->pWalReader);
  }
  taosMemoryFree(pNew);
  return NULL;
}
536

537
// Release a reader created by tqReaderOpen together with everything it owns: the WAL reader, the
// tag cache, the schema objects, the table id hash and the decoded submit request.
// A NULL reader is accepted and ignored.
void tqReaderClose(STqReader* pReader) {
  tqDebug("%s:%p", __FUNCTION__, pReader);
  if (pReader == NULL) {
    return;
  }

  // wal reader
  walCloseReader(pReader->pWalReader);

  // tag cache and schema objects
  taosHashCleanup(pReader->pTableTagCacheForTmq);
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  taosMemoryFree(pReader->pTSchema);
  taosMemoryFree(pReader->extSchema);

  // table id hash and the decoded submit message
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  taosMemoryFree(pReader);
}
554

555
// Position the reader's WAL reader at version ver. Returns TSDB_CODE_INVALID_PARA for a NULL
// reader, terrno when the seek fails and 0 on success; id is only used in the debug log.
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t seekRet = walReaderSeekVer(pReader->pWalReader, ver);
  if (seekRet < 0) {
    return terrno;
  }

  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
  return 0;
}
565

566
// Make sure the tag cache of the reader has an entry for table uid. When the uid is already in
// pTableTagCacheForTmq nothing is done; otherwise cacheTag is called to fill it.
// Returns 0 on success, or the code reported by cacheTag (which is also logged).
static int32_t getTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid) {
  int32_t code = 0;
  int32_t lino = 0;

  if (taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES) != NULL) {
    // already cached
    return code;
  }

  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &storageApi, uid, 0, &pReader->tagCachelock);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }

  return code;
}
585

586
// Refresh the cached tags of table uid for column colId. Tables that have no entry in
// pTableTagCacheForTmq are left alone; for cached tables cacheTag is called again.
// A failure of cacheTag is only logged.
void tqUpdateTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid, col_id_t colId) {
  int32_t code = 0;
  int32_t lino = 0;

  if (taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES) == NULL) {
    // nothing cached for this table, so there is nothing to refresh
    return;
  }

  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &storageApi, uid, colId, &pReader->tagCachelock);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to update tag cache code:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }
}
605

606
// Fill the pseudo (tag) columns of pBlock for the rows that were appended for table uid.
// numOfRows is the row count of the block before those rows were added; the number of new rows
// is taken as pBlock->info.rows - numOfRows. The tags come from the reader's tag cache, which is
// populated first when the table is not cached yet.
// Returns TSDB_CODE_INVALID_PARA for a NULL reader or block, otherwise the first failing code or 0.
static int32_t tqRetrievePseudoCols(STqReader* pReader, SSDataBlock* pBlock, int32_t numOfRows, int64_t uid, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pReader == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // step 1: make sure the tags of this table are cached
  code = getTableTagCache(pReader, pPseudoExpr, numOfPseudoExpr, uid);
  TSDB_CHECK_CODE(code, lino, END);

  // step 2: copy the cached tags into the newly appended rows
  code = fillTag(pReader->pTableTagCacheForTmq, pPseudoExpr, numOfPseudoExpr, uid, pBlock, numOfRows, pBlock->info.rows - numOfRows, 1, &pReader->tagCachelock);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != 0) {
    tqError("tqRetrievePseudoCols failed, line:%d, msg:%s", lino, tstrerror(code));
  }
  return code;
}
625

626
// Read submit messages from the WAL and append their rows to pRes until enough rows have been
// collected, the WAL has no further valid message, or the time budget is used up.
//
// For every message the submit request is decoded into pReader->submit. Each table block whose
// flags do not intersect sourceExcluded and whose uid is in pReader->tbIdHash (or any uid when
// tbIdHash is NULL) is appended to pRes through tqRetrieveCols, followed by its pseudo columns.
// The loop ends when pRes holds at least minPollRows rows, or any row at all while enableReplay is
// set, or when more than timeout ms have elapsed (TSDB_CODE_TMQ_FETCH_TIMEOUT, also stored in
// terrno).
//
// Returns TSDB_CODE_INVALID_PARA for a NULL reader, the code of walNextValidMsg when it stops the
// loop, 0 / the timeout code for the regular exits, or the code of the failing step.
// On a failing step the decoded submit request is released and msg.msgStr is cleared, the same
// way the regular path does after every message, so the reader does not carry a half-consumed
// message into the next call.
int32_t tqNextBlockInWal(STqReader* pReader, SSDataBlock* pRes, SHashObj* pCol2SlotId, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                         int sourceExcluded, int32_t minPollRows, int64_t timeout, int8_t enableReplay) {
  int32_t code = 0;
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SWalReader* pWalReader = pReader->pWalReader;

  int64_t st = taosGetTimestampMs();
  while (1) {
    code = walNextValidMsg(pWalReader, false);
    if (code != 0) {
      break;
    }

    // the encoded submit request starts behind the SSubmitReq2Msg header
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t ver = pWalReader->pHead->head.version;
    SDecoder decoder = {0};
    code = tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL, &decoder);
    tDecoderClear(&decoder);
    if (code != 0) {
      goto ABORT;
    }
    pReader->nextBlk = 0;

    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
    while (pReader->nextBlk < numOfBlocks) {
      tqDebug("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pSubmitTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        code = terrno;  // captured before the cleanup below can touch terrno
        goto ABORT;
      }
      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
        pReader->nextBlk += 1;
        continue;
      }
      if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
        tqDebug("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
        int32_t numOfRows = pRes->info.rows;  // row count before this table's rows are appended
        code = tqRetrieveCols(pReader, pRes, pCol2SlotId);
        if (code != TSDB_CODE_SUCCESS) {
          goto ABORT;
        }
        code = tqRetrievePseudoCols(pReader, pRes, numOfRows, pSubmitTbData->uid, pPseudoExpr, numOfPseudoExpr);
        if (code != TSDB_CODE_SUCCESS) {
          goto ABORT;
        }
      }
      pReader->nextBlk += 1;
    }

    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    if (pRes->info.rows >= minPollRows || (enableReplay && pRes->info.rows > 0)) {
      break;
    }
    int64_t elapsed = taosGetTimestampMs() - st;
    if (elapsed > timeout || elapsed < 0) {
      code = TSDB_CODE_TMQ_FETCH_TIMEOUT;
      terrno = code;
      break;
    }
  }
  return code;

ABORT:
  // NOTE(review): assumes tDestroySubmitReq tolerates a request whose decoding failed, as
  // tqReaderClose already relies on - confirm.
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->msg.msgStr = NULL;
  return code;
}
698

699
// Attach a raw submit message to the reader and decode it into pReader->submit.
// msgStr/msgLen/ver are remembered in pReader->msg; decoder is initialised on that buffer and is
// left for the caller to clear. rawList is passed through to tDecodeSubmitReq.
// Returns TSDB_CODE_INVALID_PARA for a NULL reader, otherwise the result of the decode.
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList, SDecoder* decoder) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;

  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);

  tDecoderInit(decoder, pReader->msg.msgStr, pReader->msg.msgLen);

  int32_t decodeRet = tDecodeSubmitReq(decoder, &pReader->submit, rawList);
  if (decodeRet != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
  }
  return decodeRet;
}
718

719
// Drop the submit message currently attached to the reader: the decoded request is destroyed,
// the block cursor is rewound and the raw message pointer is cleared.
void tqReaderClearSubmitMsg(STqReader* pReader) {
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  // reset the iteration state
  pReader->msg.msgStr = NULL;
  pReader->nextBlk = 0;
}
89,493,530✔
724

725
// Return the WAL reader owned by pReader, or NULL when pReader is NULL.
SWalReader* tqGetWalReader(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pWalReader : NULL;
}
731

732
// Return pReader->lastTs, or 0 when pReader is NULL.
int64_t tqGetResultBlockTime(STqReader* pReader) {
  int64_t blockTime = 0;
  if (pReader != NULL) {
    blockTime = pReader->lastTs;
  }
  return blockTime;
}
738

739
// Move the reader's block cursor forward to the next submitted table block whose
// uid is registered in pReader->tbIdHash.
// Returns:
//   true  - the block at pReader->nextBlk belongs to a registered table, or the
//           reader has no tbIdHash at all
//   false - reader/message missing, a block could not be fetched, or every remaining
//           block was skipped (in which case the submit message is cleared)
// idstr is currently unused.
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  int32_t code = false;
  int32_t lino = 0;
  int64_t uid = 0;
  int32_t blockSz = 0;

  TSDB_CHECK_NULL(pReader, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);

  blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, END, false);
    uid = pTbData->uid;

    // leave the loop (result: true) as soon as the uid is found in the hash
    void* pHit = taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_CONDITION(pHit == NULL, code, lino, END, true);

    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
            pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
  }

  // nothing left in this submit message
  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
  return code;
}
767

768
// Move the reader's block cursor forward past every submitted table block whose
// uid is listed in filterOutUids.
// Returns:
//   true  - the block at pReader->nextBlk is NOT filtered out, or no filter set was given
//   false - reader/message missing, a block could not be fetched, or all remaining
//           blocks were filtered (the submit message is cleared in that case)
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  int32_t code = false;
  int32_t lino = 0;
  int64_t uid = 0;
  int32_t blockSz = 0;

  TSDB_CHECK_NULL(pReader, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);

  blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, END, false);
    uid = pTbData->uid;

    // a uid absent from the filter set means this block must be delivered
    void* pFiltered = taosHashGet(filterOutUids, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_NULL(pFiltered, code, lino, END, true);
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
  return code;
}
794

795
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
43,284,066✔
796
                    SExtSchema* extSrc) {
797
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
43,284,066✔
798
    return TSDB_CODE_INVALID_PARA;
×
799
  }
800
  int32_t code = 0;
43,319,646✔
801

802
  int32_t cnt = 0;
43,319,646✔
803
  for (int32_t i = 0; i < pSrc->nCols; i++) {
242,262,393✔
804
    cnt += mask[i];
198,930,946✔
805
  }
806

807
  pDst->nCols = cnt;
43,332,318✔
808
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
43,343,757✔
809
  if (pDst->pSchema == NULL) {
43,295,167✔
810
    return TAOS_GET_TERRNO(terrno);
×
811
  }
812

813
  int32_t j = 0;
43,294,616✔
814
  for (int32_t i = 0; i < pSrc->nCols; i++) {
242,299,254✔
815
    if (mask[i]) {
198,928,603✔
816
      pDst->pSchema[j++] = pSrc->pSchema[i];
198,982,059✔
817
      SColumnInfoData colInfo =
198,975,276✔
818
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
198,983,217✔
819
      if (extSrc != NULL) {
198,952,407✔
820
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
×
821
      }
822
      code = blockDataAppendColInfo(pBlock, &colInfo);
198,952,407✔
823
      if (code != 0) {
198,994,173✔
824
        return code;
×
825
      }
826
    }
827
  }
828
  return 0;
43,346,018✔
829
}
830

831
// Store one blob column value into pColumnInfoData at row idx.
// For a real value, pColVal->value.pData carries a sequence number that is looked up
// in pBlobRow2; the blob bytes found there are copied behind a BlobDataLenT length
// header and handed to colDataSetVal. A value without pData is stored with length 0.
// A non-value (NULL/NONE) column value is written as NULL.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments or an undecodable sequence number,
// terrno on allocation failure, the tBlobSetGet error if the lookup fails, otherwise
// the result of colDataSetVal. The temporary buffer is released on every path.
static int32_t doSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobSet* pBlobRow2) {
  int32_t code = 0;
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // TODO(yhDeng)
  if (COL_VAL_IS_VALUE(pColVal)) {
    char* val = taosMemCalloc(1, pColVal->value.nData + sizeof(BlobDataLenT));
    if (val == NULL) {
      return terrno;
    }

    uint64_t seq = 0;
    int32_t  len = 0;
    if (pColVal->value.pData != NULL) {
      if (tGetU64(pColVal->value.pData, &seq) < 0) {
        // release the staging buffer before bailing out
        taosMemoryFree(val);
        terrno = TSDB_CODE_INVALID_PARA;
        return TSDB_CODE_INVALID_PARA;
      }
      SBlobItem item = {0};
      code = tBlobSetGet(pBlobRow2, seq, &item);
      if (code != 0) {
        taosMemoryFree(val);
        terrno = code;
        uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
        return code;
      }

      // grow through a temporary so the original buffer is not lost if realloc fails
      char* grown = taosMemRealloc(val, item.len + sizeof(BlobDataLenT));
      if (grown == NULL) {
        code = terrno;
        taosMemoryFree(val);
        return code;
      }
      val = grown;
      (void)memcpy(blobDataVal(val), item.data, item.len);
      len = item.len;
    }

    blobDataSetLen(val, len);
    code = colDataSetVal(pColumnInfoData, idx, val, false);

    taosMemoryFree(val);
  } else {
    colDataSetNULL(pColumnInfoData, idx);
  }
  return code;
}
872
// Store one non-blob column value into pColumnInfoData at rowIndex.
// Variable-length values are staged in a stack buffer (length header followed by the
// payload) before being passed to colDataSetVal; a non-value is written as NULL.
// Fixed-length values are passed as their datum together with an is-null flag.
// Returns TSDB_CODE_INVALID_PARA when a variable-length payload does not fit the
// staging buffer, otherwise the result of colDataSetVal (TSDB_CODE_SUCCESS for NULL).
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
    if (COL_VAL_IS_VALUE(pColVal)) {
      // staging buffer capacity: 65535 payload bytes + 2 bytes
      // (the 2 extra bytes are presumably the var-data length header — confirm)
      enum { kMaxVarPayload = 65535 };
      // reject oversized payloads instead of overrunning the stack buffer
      if ((uint64_t)pColVal->value.nData > (uint64_t)kMaxVarPayload) {
        return TSDB_CODE_INVALID_PARA;
      }
      char val[kMaxVarPayload + 2] = {0};
      if (pColVal->value.pData != NULL) {
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
      }
      varDataSetLen(val, pColVal->value.nData);
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
    } else {
      colDataSetNULL(pColumnInfoData, rowIndex);
    }
  } else {
    code = colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
                         !COL_VAL_IS_VALUE(pColVal));
  }

  return code;
}
893

894
// Write colVal into column slotId / row rowIndex of pBlock.
// Blob-typed target columns go through doSetBlobVal (using pBlobSet); every other
// column goes through doSetVal. Returns terrno when the slot does not exist,
// otherwise the result of the chosen setter.
static int32_t setBlockData(SSDataBlock* pBlock, int32_t slotId, int32_t rowIndex, SColVal* colVal, SBlobSet* pBlobSet) {
  SColumnInfoData* pTarget = taosArrayGet(pBlock->pDataBlock, slotId);
  if (pTarget == NULL) {
    return terrno;
  }

  if (IS_STR_DATA_BLOB(pTarget->info.type)) {
    return doSetBlobVal(pTarget, rowIndex, colVal, pBlobSet);
  }
  return doSetVal(pTarget, rowIndex, colVal);
}
909

910
static int32_t processSubmitRow(SArray*         pRows,
106,672,508✔
911
                                SSDataBlock*    pBlock,
912
                                SHashObj*       pCol2SlotId,
913
                                STqReader*      pReader,
914
                                SBlobSet*       pBlobSet) {
915
  int32_t        code = 0;
106,672,508✔
916
  int32_t        line = 0;
106,672,508✔
917

918
  SArray* pColArray = taosArrayInit(4, INT_BYTES * 2);
106,672,508✔
919
  TSDB_CHECK_NULL(pColArray, code, line, END, terrno);
106,682,337✔
920

921
  int32_t sourceIdx = -1;
106,682,337✔
922
  int32_t rowIndex = 0;
106,682,337✔
923
  SRow* pRow = taosArrayGetP(pRows, rowIndex);
106,682,337✔
924
  TSDB_CHECK_NULL(pRow, code, line, END, terrno);
106,681,965✔
925
  while (++sourceIdx < pReader->pTSchema->numOfCols) {
873,753,869✔
926
    SColVal colVal = {0};
767,238,503✔
927
    code = tRowGet(pRow, pReader->pTSchema, sourceIdx, &colVal);
767,141,723✔
928
    TSDB_CHECK_CODE(code, line, END);
767,249,492✔
929
    void* pSlotId = taosHashGet(pCol2SlotId, &colVal.cid, sizeof(colVal.cid));
767,249,492✔
930
    if (pSlotId == NULL) {
767,781,665✔
931
      continue;
283,036,436✔
932
    }
933
    int32_t pData[2] = {sourceIdx, *(int16_t*)pSlotId};
484,745,229✔
934
    TSDB_CHECK_NULL(taosArrayPush(pColArray, pData), code, line, END, terrno);
484,624,147✔
935
    code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
484,624,147✔
936
    TSDB_CHECK_CODE(code, line, END);
484,023,753✔
937
  }
938
  
939
  for (rowIndex = 1; rowIndex < taosArrayGetSize(pRows); rowIndex++) {
2,147,483,647✔
940
    SRow* pRow = taosArrayGetP(pRows, rowIndex);
2,147,483,647✔
941
    TSDB_CHECK_NULL(pRow, code, line, END, terrno);
2,147,483,647✔
942
    for (int32_t j = 0; j < taosArrayGetSize(pColArray); j++) {
2,147,483,647✔
943
      int32_t* pData = taosArrayGet(pColArray, j);
2,147,483,647✔
944
      TSDB_CHECK_NULL(pData, code, line, END, terrno);
2,147,483,647✔
945

946
      SColVal colVal = {0};
2,147,483,647✔
947
      code = tRowGet(pRow, pReader->pTSchema, pData[0], &colVal);
2,147,483,647✔
948
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
949

950
      code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
2,147,483,647✔
951
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
952
    }
953
  }
954

955
  END:
107,503,035✔
956
  taosArrayDestroy(pColArray);
92,048,399✔
957
  return code;
106,646,988✔
958
}
959

960
static int32_t processSubmitCol(SArray*         pCols,
1,352✔
961
                                SSDataBlock*    pBlock,
962
                                SHashObj*       pCol2SlotId,
963
                                SBlobSet*       pBlobSet) {
964
  int32_t        code = 0;
1,352✔
965
  int32_t        line = 0;
1,352✔
966

967
  for (int32_t i = 0; i < taosArrayGetSize(pCols); i++) {
4,056✔
968
    SColData* pCol = taosArrayGet(pCols, i);
2,704✔
969
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
2,704✔
970
    void* pSlotId = taosHashGet(pCol2SlotId, &pCol->cid, sizeof(pCol->cid));
2,704✔
971
    if (pSlotId == NULL) {
2,704✔
972
      continue;
1,352✔
973
    }
974
    SColVal colVal = {0};
1,352✔
975
    for (int32_t row = 0; row < pCol->nVal; row++) {
4,056✔
976
      code = tColDataGetValue(pCol, row, &colVal);
2,704✔
977
      TSDB_CHECK_CODE(code, line, END);
2,704✔
978

979
      code = setBlockData(pBlock, *(int16_t*)pSlotId, pBlock->info.rows + row, &colVal, pBlobSet);
2,704✔
980
      TSDB_CHECK_CODE(code, line, END);
2,704✔
981
    }
982
  }
983
  
984
  END:
1,352✔
985
  return code;
1,352✔
986
}
987

988
// Make sure pReader->pSchemaWrapper / extSchema / pTSchema describe the table and
// schema version of pSubmitTbData, reloading them from meta when the cached
// (suid or uid, version) does not match.
// The cache is also treated as stale when either cached object is missing: a failed
// reload frees the old objects but leaves the cached ids untouched, so without this
// check a later block matching the old ids would be served with NULL schemas.
// Returns TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when meta has no such schema, terrno when
// the STSchema cannot be built, otherwise TSDB_CODE_SUCCESS.
static int32_t checkSchema(STqReader* pReader, SSubmitTbData* pSubmitTbData) {
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion) || (pReader->pSchemaWrapper == NULL) || (pReader->pTSchema == NULL)) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
    taosMemoryFreeClear(pReader->extSchema);
    taosMemoryFreeClear(pReader->pTSchema);
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
    if (pReader->pSchemaWrapper == NULL) {
      // report the version that was requested, not the (still old) cached one
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d, possibly dropped table",
              vgId, suid, uid, sversion);
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    }
    pReader->pTSchema = tBuildTSchema(pReader->pSchemaWrapper->pSchema, pReader->pSchemaWrapper->nCols, pReader->pSchemaWrapper->version);
    if (pReader->pTSchema == NULL) {
      tqWarn("vgId:%d, cannot build schema for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d",
              vgId, suid, uid, sversion);
      return terrno;
    }
    // only a fully successful reload updates the cache key
    pReader->cachedSchemaUid = uid;
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;
  }
  return TSDB_CODE_SUCCESS;
}
1016

1017
// Append the submit block at pReader->nextBlk to pBlock, copying only the columns
// mapped in pCol2SlotId.
// Steps: record the block's ctimeMs in pReader->lastTs, determine the row count
// (nVal of the first column for column format, number of rows otherwise), grow
// pBlock, refresh the cached schema, copy the data and finally add the row count to
// pBlock->info.rows.
// Returns TSDB_CODE_INVALID_PARA for a NULL reader/block, otherwise 0 or the first
// error encountered (which is also logged).
static int32_t tqRetrieveCols(STqReader* pReader, SSDataBlock* pBlock, SHashObj* pCol2SlotId) {
  if (pReader == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);

  int32_t code = 0;
  int32_t line = 0;
  int32_t numOfRows = 0;
  bool    isColFormat = false;

  SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  TSDB_CHECK_NULL(pTbData, code, line, END, terrno);
  pReader->lastTs = pTbData->ctimeMs;

  isColFormat = (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) != 0;
  if (isColFormat) {
    SColData* pFirstCol = taosArrayGet(pTbData->aCol, 0);
    TSDB_CHECK_NULL(pFirstCol, code, line, END, terrno);
    numOfRows = pFirstCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfRows);
  TSDB_CHECK_CODE(code, line, END);

  code = checkSchema(pReader, pTbData);
  TSDB_CHECK_CODE(code, line, END);

  // convert and scan one block
  if (isColFormat) {
    code = processSubmitCol(pTbData->aCol, pBlock, pCol2SlotId, pTbData->pBlobSet);
  } else {
    code = processSubmitRow(pTbData->aRowP, pBlock, pCol2SlotId, pReader, pTbData->pBlobSet);
  }
  TSDB_CHECK_CODE(code, line, END);

  pBlock->info.rows += numOfRows;

END:
  if (code != 0) {
    tqError("tqRetrieveCols failed, line:%d, msg:%s", line, tstrerror(code));
  }
  return code;
}
1060

1061
// Tracks, per schema column j, whether the current row carries a value (anything
// but NONE). On the first row (curRow == 0) the state is always recorded and a new
// block is requested; on later rows a new block is requested only when the state of
// column j differs from the previous row.
// Relies on these names at the expansion site: curRow, assigned, j, colVal, buildNew.
#define PROCESS_VAL                                         \
  {                                                         \
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);    \
    if (curRow == 0 || currentRowAssigned != assigned[j]) { \
      assigned[j] = currentRowAssigned;                     \
      buildNew = true;                                      \
    }                                                       \
  }
1072

1073
// One merge step between the source column value (colVal) and the target block
// column (pColData), both ordered by column id:
//   source id >  target id : target gets NULL for this row, advance target
//   source id == target id : value is stored (blob or regular setter), advance both
//   source id <  target id : source column is not wanted, advance source
// The row written is curRow - lastRow, i.e. relative to the current output block.
// Relies on these names at the expansion site: colVal, pColData, pSubmitTbData,
// curRow, lastRow, sourceIdx, targetIdx (plus whatever TQ_ERR_GO_TO_END requires).
#define SET_DATA                                                                                      \
  if (colVal.cid > pColData->info.colId) {                                                            \
    colDataSetNULL(pColData, curRow - lastRow);                                                       \
    targetIdx++;                                                                                      \
  } else {                                                                                            \
    if (colVal.cid == pColData->info.colId) {                                                         \
      if (IS_STR_DATA_BLOB(pColData->info.type)) {                                                    \
        TQ_ERR_GO_TO_END(doSetBlobVal(pColData, curRow - lastRow, &colVal, pSubmitTbData->pBlobSet)); \
      } else {                                                                                        \
        TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal));                              \
      }                                                                                               \
      targetIdx++;                                                                                    \
    }                                                                                                 \
    sourceIdx++;                                                                                      \
  }
1088

1089
// Start a new output block (and its matching reduced schema) for the taosx path.
// If blocks already holds a block, that block is closed first: its row count becomes
// curRow - *lastRow and *lastRow is advanced to curRow.
// The new block gets the columns selected by assigned (see tqMaskBlock), the table
// uid and message version, and capacity for the remaining numOfRows - curRow rows.
// It is appended to blocks and its schema wrapper pointer to schemas.
// Returns 0 on success, otherwise the first error; partially built objects that were
// not handed over to the arrays are released before returning.
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
  int32_t         code = 0;
  SSchemaWrapper* pSW = NULL;
  SSDataBlock*    block = NULL;
  if (taosArrayGetSize(blocks) > 0) {
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pLastBlock);
    pLastBlock->info.rows = curRow - *lastRow;
    *lastRow = curRow;
  }

  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TQ_NULL_GO_TO_END(block);

  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  TQ_NULL_GO_TO_END(pSW);

  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
          (int32_t)taosArrayGetSize(block->pDataBlock));

  block->info.id.uid = pSubmitTbData->uid;
  block->info.version = pReader->msg.ver;
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
  // The element now stored in blocks references the block's resources, so only the
  // shell may be freed from here on. Dropping it right away keeps the cleanup at END
  // from releasing those resources if the schema push below fails.
  taosMemoryFreeClear(block);

  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
  pSW = NULL;

END:
  if (code != 0) {
    tqError("processBuildNew failed, code:%d", code);
  }
  tDeleteSchemaWrapper(pSW);
  blockDataFreeRes(block);
  taosMemoryFree(block);
  return code;
}
1129
// Convert one column-format submit block into output blocks for the taosx path.
// For every row, each schema column is matched against the submitted columns by id
// and PROCESS_VAL decides whether the set of assigned columns changed; a change (or a
// schema column missing from the submit data) starts a new output block through
// processBuildNew. The row's values are then merged into the last block via SET_DATA.
// After the loop the row count of the last block is finalised.
// Returns 0 on success or the first error (also logged).
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
          numOfRows);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    for (int32_t j = 0; j < pSchemaWrapper->nCols; j++) {
      int32_t k = 0;
      for (; k < numOfCols; k++) {
        pCol = taosArrayGet(pCols, k);
        TQ_NULL_GO_TO_END(pCol);
        if (pSchemaWrapper->pSchema[j].colId == pCol->cid) {
          SColVal colVal = {0};
          TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
          PROCESS_VAL
          tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], numOfCols);
          break;
        }
      }
      if (k >= numOfCols) {
        // this column is not in the current row, so we set it to NULL
        assigned[j] = 0;
        buildNew = true;
      }
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual && sourceIdx < numOfCols) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }

    curRow++;
  }
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  // no block exists when there were no rows to process; same guard as tqProcessRowData
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFree(assigned);
  return code;
}
1205

1206
// Convert one row-format submit block into output blocks for the taosx path.
// A temporary STSchema is built from pReader->pSchemaWrapper. For every row,
// PROCESS_VAL compares the set of assigned columns with the previous row; a change
// starts a new output block through processBuildNew. The row's values are then merged
// into the last block via SET_DATA. After the loop the row count of the last block is
// finalised. Returns 0 on success or the first error (also logged).
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t         code = 0;
  int32_t         curRow = 0;
  int32_t         lastRow = 0;
  int32_t         numOfRows = 0;
  SArray*         pRows = NULL;
  STSchema*       pTSchema = NULL;
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;

  // one flag per schema column: does the current run of rows carry this column?
  char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  pRows = pSubmitTbData->aRowP;
  numOfRows = taosArrayGetSize(pRows);
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
  TQ_NULL_GO_TO_END(pTSchema);
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);

  for (int32_t rowIdx = 0; rowIdx < numOfRows; rowIdx++) {
    bool  buildNew = false;
    SRow* pRow = taosArrayGetP(pRows, rowIdx);
    TQ_NULL_GO_TO_END(pRow);

    // pass 1: detect whether the assigned-column pattern changed
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
      tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], pTSchema->numOfCols);
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // pass 2: merge the row into the current output block
    int32_t sourceIdx = 0;
    int32_t targetIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual && sourceIdx < pTSchema->numOfCols) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }

    curRow++;
  }

  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
          (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
1273

1274
// Serialise pCreateTbReq and append it to the response.
// On the first entry (pRsp->createTableNum == 0) the createTableLen / createTableReq
// arrays are created. The encoded length goes into createTableLen, the encoded buffer
// pointer into createTableReq, and createTableNum is incremented.
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the first
// allocation/encoding error; on error the encoded buffer is freed here.
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  uint32_t len = 0;
  void*    createReq = NULL;
  SEncoder encoder = {0};

  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);

  if (pRsp->createTableNum == 0) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
  }

  // first pass computes the encoded size, second pass writes into the buffer
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
  TSDB_CHECK_CODE(code, lino, END);
  createReq = taosMemoryCalloc(1, len);
  TSDB_CHECK_NULL(createReq, code, lino, END, terrno);

  tEncoderInit(&encoder, createReq, len);
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
  tEncoderClear(&encoder);
  TSDB_CHECK_CODE(code, lino, END);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
  pRsp->createTableNum++;
  tqTrace("build create table info msg success");

END:
  if (code != 0) {
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
    taosMemoryFree(createReq);
  }
  return code;
}
1311

1312
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
43,351,595✔
1313
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1314
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
43,351,595✔
1315
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
43,351,595✔
1316
  if (pSubmitTbData == NULL) {
43,359,929✔
1317
    return terrno;
×
1318
  }
1319
  pReader->nextBlk++;
43,359,929✔
1320

1321
  if (pSubmitTbDataRet) {
43,352,151✔
1322
    *pSubmitTbDataRet = pSubmitTbData;
43,356,897✔
1323
  }
1324

1325
  if (fetchMeta == ONLY_META) {
43,352,714✔
1326
    if (pSubmitTbData->pCreateTbReq != NULL) {
975✔
1327
      if (pRsp->createTableReq == NULL) {
975✔
1328
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
975✔
1329
        if (pRsp->createTableReq == NULL) {
975✔
1330
          return terrno;
×
1331
        }
1332
      }
1333
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
1,950✔
1334
        return terrno;
×
1335
      }
1336
      pSubmitTbData->pCreateTbReq = NULL;
975✔
1337
    }
1338
    return 0;
975✔
1339
  }
1340

1341
  int32_t sversion = pSubmitTbData->sver;
43,351,739✔
1342
  int64_t uid = pSubmitTbData->uid;
43,351,466✔
1343
  pReader->lastBlkUid = uid;
43,350,901✔
1344

1345
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
43,354,898✔
1346
  taosMemoryFreeClear(pReader->extSchema);
43,351,285✔
1347
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
43,357,067✔
1348
  if (pReader->pSchemaWrapper == NULL) {
43,302,772✔
1349
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
13,971✔
1350
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1351
    pReader->cachedSchemaSuid = 0;
13,971✔
1352
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
13,971✔
1353
  }
1354

1355
  if (pSubmitTbData->pCreateTbReq != NULL) {
43,285,033✔
1356
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
2,375✔
1357
    if (code != 0) {
2,375✔
1358
      return code;
×
1359
    }
1360
  } else if (rawList != NULL) {
43,279,474✔
1361
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1362
      return terrno;
×
1363
    }
1364
    pReader->pSchemaWrapper = NULL;
×
1365
    return 0;
×
1366
  }
1367

1368
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
43,281,849✔
1369
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
567,929✔
1370
  } else {
1371
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
42,710,735✔
1372
  }
1373
}
1374

1375
// Replace the reader's set of queried table uids with the entries of tbUidList.
// An existing hash is cleared, otherwise a new one is created. A uid that cannot be
// inserted is logged and skipped. id is only used as a log prefix.
// Returns TSDB_CODE_SUCCESS (also for NULL reader/list, which is a no-op) or terrno
// when the hash cannot be created.
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader == NULL || tbUidList == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pReader->tbIdHash != NULL) {
    taosHashClear(pReader->tbIdHash);
  } else {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", id);
      return terrno;
    }
  }

  int32_t numOfTables = taosArrayGetSize(tbUidList);
  for (int32_t idx = 0; idx < numOfTables; idx++) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    if (pUid == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pUid, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pUid);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
  return TSDB_CODE_SUCCESS;
}
1400

1401
// Adds every uid in pTableUidList to the reader's table-uid filter, creating
// the filter hash on first use. Existing entries are left in place.
//
// pReader        reader whose filter is extended; NULL makes the call a no-op
// pTableUidList  array of int64_t table uids; NULL makes the call a no-op
//
// The function reports nothing to the caller: a failure to create the hash
// aborts the call after logging, and a uid that cannot be inserted is logged
// and skipped.
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader == NULL || pTableUidList == NULL) {
    return;
  }
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int32_t i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    if (pKey == NULL) {
      // Same guard as tqReaderSetTbUidList: never hand a NULL key to the hash
      // or dereference it in the log calls below.
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
      continue;
    }
    tqDebug("%s add table uid:%" PRId64 " to hash", __func__, *pKey);
  }
}
1423

1424
// Returns true when uid has an entry in the reader's table-uid filter
// (pReader->tbIdHash). A NULL reader always yields false.
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  return (pReader != NULL) && (taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL);
}
1430

1431
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1432
  if (pReader == NULL) {
×
1433
    return false;
×
1434
  }
1435
  return pReader->msg.msgStr == NULL;
×
1436
}
1437

1438
// Removes every uid in tbUidList from the reader's table-uid filter.
//
// pReader    reader whose filter is reduced; NULL makes the call a no-op
// tbUidList  array of int64_t table uids; NULL makes the call a no-op
//
// A removal that fails is only reported as a warning; the remaining uids are
// still processed.
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(tbUidList); idx++) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    int32_t  ret = taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t));
    if (ret == 0) {
      continue;
    }

    // The element pointer may be NULL; log 0 in that case instead of dereferencing it.
    int64_t uidForLog = (pUid != NULL) ? *pUid : 0;
    tqWarn("%s failed to remove table uid:%" PRId64 " from hash, msg:%s", __func__, uidForLog, tstrerror(ret));
  }
}
1450

1451
// Propagates the deletion of the tables in tbUidList to every consumer handle
// registered in pTq->pHandle. The whole traversal runs under the write latch
// pTq->lock.
//
// Per handle, depending on execHandle.subType:
//   - TOPIC_SUB_TYPE__COLUMN: calls qDeleteTableListForTmqScanner on the task
//   - TOPIC_SUB_TYPE__DB:     records each uid in execDb.pFilterOutTbUid
//   - TOPIC_SUB_TYPE__TABLE:  removes the uids from the handle's tq reader
//
// pTq        tq instance; NULL is accepted and returns 0
// tbUidList  array of int64_t table uids; must not be NULL
//
// Returns TSDB_CODE_INVALID_PARA when tbUidList is NULL, otherwise 0.
// Failures on an individual handle or uid are logged and do not stop the
// traversal or change the return value.
int32_t tqDeleteTbUidList(STQ* pTq, SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  void* pIter = NULL;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while ((pIter = taosHashIterate(pTq->pHandle, pIter)) != NULL) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " delete table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t code = qDeleteTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList);
      if (code != 0) {
        // best effort: log and move on to the next handle
        tqError("update qualified table error for %s", pTqHandle->subKey);
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      int32_t sz = taosArrayGetSize(tbUidList);
      for (int32_t i = 0; i < sz; i++) {
        int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
        if (tbUid != NULL &&
            taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
          tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
        }
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
    }
  }
  taosWUnLockLatch(&pTq->lock);
  return 0;
}
1494

1495
// Returns a newly created array of int64_t holding the elements of tbUidList,
// or NULL when the array cannot be created or filled. The caller is
// responsible for destroying the returned array.
static SArray* copyUidList(const SArray* tbUidList) {
  SArray* pDup = taosArrayInit(4, sizeof(int64_t));
  if (pDup != NULL && taosArrayAddAll(pDup, tbUidList) == NULL) {
    taosArrayDestroy(pDup);
    tqError("copy table uid list failed");
    pDup = NULL;
  }
  return pDup;
}
1508

1509
// Filters tbUidList through qFilterTableList for a table-subscription handle
// and adds the surviving uids to the handle's tq reader.
//
// The filtering works on a private copy of tbUidList, so the caller's array is
// not modified here; the copy is destroyed before returning on every path.
//
// pTqHandle  consumer handle supplying the filter node, task, suid and reader
// pTq        tq instance supplying the vnode
// tbUidList  array of int64_t table uids to consider
//
// Returns 0 on success, terrno when the copy cannot be made, or the code
// returned by qFilterTableList when filtering fails.
static int32_t addTableListForStableTmq(STqHandle* pTqHandle, STQ* pTq, const SArray* tbUidList) {
  int32_t code = 0;
  SArray* tbUidListCopy = copyUidList(tbUidList);
  if (tbUidListCopy == NULL) {
    code = terrno;
    goto END;
  }
  code = qFilterTableList(pTq->pVnode, tbUidListCopy, pTqHandle->execHandle.execTb.node,
                          pTqHandle->execHandle.task, pTqHandle->execHandle.execTb.suid);
  // Compare against TSDB_CODE_SUCCESS, the success code the rest of this file uses.
  if (code != TSDB_CODE_SUCCESS) {
    tqError("tqAddTbUidList error:%d handle %s consumer:0x%" PRIx64, code, pTqHandle->subKey,
            pTqHandle->consumerId);
    goto END;
  }
  tqDebug("%s handle %s consumer:0x%" PRIx64 " add %d tables to tqReader", __func__, pTqHandle->subKey,
          pTqHandle->consumerId, (int32_t)taosArrayGetSize(tbUidListCopy));
  tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, tbUidListCopy);

END:
  taosArrayDestroy(tbUidListCopy);
  return code;
}
1531

1532
// Announces the newly added tables in tbUidList to the consumer handles
// registered in pTq->pHandle. The whole traversal runs under the write latch
// pTq->lock.
//
// Per handle, depending on execHandle.subType:
//   - TOPIC_SUB_TYPE__COLUMN: calls qAddTableListForTmqScanner on the task
//   - TOPIC_SUB_TYPE__TABLE:  calls addTableListForStableTmq
// Handles of any other sub type are left untouched.
//
// pTq        tq instance; NULL is accepted and returns 0
// tbUidList  array of int64_t table uids; must not be NULL
//
// Returns TSDB_CODE_INVALID_PARA when tbUidList is NULL, 0 on success, or the
// code of the first handle that failed. The traversal stops at that handle, so
// later handles are not updated.
int32_t tqAddTbUidList(STQ* pTq, const SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  void*   pIter = NULL;
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while ((pIter = taosHashIterate(pTq->pHandle, pIter)) != NULL) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " add table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      code = qAddTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList);
      if (code != 0) {
        tqError("add table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
        break;
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      code = addTableListForStableTmq(pTqHandle, pTq, tbUidList);
      if (code != 0) {
        tqError("add table list for stable tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
        break;
      }
    }
  }
  // pIter is non-NULL only when the loop was left early via break.
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1572

1573
// Refreshes the table lists of the consumer handles registered in
// pTq->pHandle after the tables in tbUidList changed. The whole traversal runs
// under the write latch pTq->lock.
//
// Per handle, depending on execHandle.subType:
//   - TOPIC_SUB_TYPE__COLUMN: when checkCidInTagCondition reports a hit for
//     cidList against the task's tag condition, calls
//     qUpdateTableListForTmqScanner; afterwards always calls
//     qUpdateTableTagCacheForTmq.
//   - TOPIC_SUB_TYPE__TABLE: when checkCidInTagCondition reports a hit, removes
//     the uids from the handle's tq reader and re-adds them through
//     addTableListForStableTmq.
// Handles of any other sub type are left untouched.
//
// pTq           tq instance; NULL is accepted and returns 0
// tbUidList     array of int64_t table uids; must not be NULL
// cidList       passed to checkCidInTagCondition and qUpdateTableTagCacheForTmq
// cidListArray  passed through to qUpdateTableTagCacheForTmq
//
// Returns TSDB_CODE_INVALID_PARA when tbUidList is NULL, 0 on success, or the
// code of the first handle that failed. The traversal stops at that handle, so
// later handles are not updated.
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, SArray* cidList, SArray* cidListArray) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  void*   pIter = NULL;
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while ((pIter = taosHashIterate(pTq->pHandle, pIter)) != NULL) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " update table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      SNode* pTagCond = getTagCondNodeForQueryTmq(pTqHandle->execHandle.task);
      bool   tagCondHit = checkCidInTagCondition(pTagCond, cidList);
      if (tagCondHit) {
        code = qUpdateTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList);
        if (code != 0) {
          tqError("update table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
          break;
        }
      }
      // The tag cache is refreshed whether or not the tag condition was hit.
      qUpdateTableTagCacheForTmq(pTqHandle->execHandle.task, tbUidList, cidList, cidListArray);
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      SNode* pTagCond = getTagCondNodeForStableTmq(pTqHandle->execHandle.execTb.node);
      bool   tagCondHit = checkCidInTagCondition(pTagCond, cidList);
      if (tagCondHit) {
        // Drop the uids first, then re-add them through the filter.
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
        code = addTableListForStableTmq(pTqHandle, pTq, tbUidList);
        if (code != 0) {
          tqError("update table list for stable tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
          break;
        }
      }
    }
  }

  // pIter is non-NULL only when the loop was left early via break.
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1623

1624
static void destroySourceScanTables(void* ptr) {
×
1625
  SArray** pTables = ptr;
×
1626
  if (pTables && *pTables) {
×
1627
    taosArrayDestroy(*pTables);
×
1628
    *pTables = NULL;
×
1629
  }
1630
}
×
1631

1632
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
1633
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
1634
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
1635
  if (pCol1->vColId == pCol2->vColId) {
×
1636
    return 0;
×
1637
  } else if (pCol1->vColId < pCol2->vColId) {
×
1638
    return -1;
×
1639
  } else {
1640
    return 1;
×
1641
  }
1642
}
1643

1644
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1645
  if (value) {
×
1646
    SSchemaWrapper* pSchemaWrapper = value;
×
1647
    tDeleteSchemaWrapper(pSchemaWrapper);
1648
  }
1649
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc