• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4819

20 Oct 2025 02:42AM UTC coverage: 61.392% (+0.3%) from 61.125%
#4819

push

travis-ci

happyguoxy
add coverage result json

156672 of 324369 branches covered (48.3%)

Branch coverage included in aggregate %.

207936 of 269535 relevant lines covered (77.15%)

242336611.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.15
/source/dnode/vnode/src/vnd/vnodeStream.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <stdbool.h>
17
#include <stdint.h>
18
#include "nodes.h"
19
#include "osMemPool.h"
20
#include "osMemory.h"
21
#include "scalar.h"
22
#include "streamReader.h"
23
#include "taosdef.h"
24
#include "tarray.h"
25
#include "tcommon.h"
26
#include "tdatablock.h"
27
#include "tdb.h"
28
#include "tdef.h"
29
#include "tencode.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "tlist.h"
33
#include "tmsg.h"
34
#include "tsimplehash.h"
35
#include "vnd.h"
36
#include "vnode.h"
37
#include "vnodeInt.h"
38

39
// Declares a local SStreamTriggerReaderTaskInnerOptions named `options` and
// fills it from the remaining arguments.
//   - `.suid` is taken from `_sStreamReaderInfo->suid` only when `_mapInfo` is
//     NULL; otherwise it is 0.
//   - `startTime`/`endTime` become `.twindows.skey`/`.twindows.ekey`.
// The reader-info parameter is deliberately NOT spelled `sStreamReaderInfo`:
// a macro parameter with that name would also be substituted into the
// `.sStreamReaderInfo` designator, so the macro would only compile when the
// caller's variable happened to carry exactly that name.
// Every argument is parenthesized so that expressions can be passed safely.
// The expansion keeps its trailing ';' so existing call sites are unaffected.
#define BUILD_OPTION(options, _sStreamReaderInfo, _ver, _order, startTime, endTime, _schemas, _isSchema, _scanMode, \
                     _gid, _initReader, _mapInfo)                                                                   \
  SStreamTriggerReaderTaskInnerOptions options = {.suid = ((_mapInfo) == NULL ? (_sStreamReaderInfo)->suid : 0),    \
                                                  .ver = (_ver),                                                    \
                                                  .order = (_order),                                                \
                                                  .twindows = {.skey = (startTime), .ekey = (endTime)},             \
                                                  .sStreamReaderInfo = (_sStreamReaderInfo),                        \
                                                  .schemas = (_schemas),                                            \
                                                  .isSchema = (_isSchema),                                          \
                                                  .scanMode = (_scanMode),                                          \
                                                  .gid = (_gid),                                                    \
                                                  .initReader = (_initReader),                                      \
                                                  .mapInfo = (_mapInfo)};
52

53
// Time range collected for one id while scanning a WAL submit message.
// Instances are stored by value in a hash keyed on `id`.
typedef struct WalMetaResult {
  uint64_t id;    // id filled in by uidInTableList() for the table
  int64_t  skey;  // smallest timestamp seen for this id
  int64_t  ekey;  // largest timestamp seen for this id
} WalMetaResult;
58

59
// Combines `session` with `type` shifted into the upper 32 bits.
static int64_t getSessionKey(int64_t session, int64_t type) {
  const int64_t typeBits = type << 32;
  return session | typeBits;
}
39,186,952✔
60

61
// Forward declaration; the definition lies outside this part of the file.
// NOTE(review): the "Scheam" spelling must stay in sync with that definition.
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas);
62

63
// Three-way comparator over int16_t values, usable with qsort-style APIs.
// Returns -1 if *lp < *rp, 1 if *lp > *rp, and 0 when they are equal.
// The arguments are read through const-qualified pointers so the comparator
// never discards the qualifier it was handed.
int32_t sortCid(const void *lp, const void *rp) {
  const int16_t lhs = *(const int16_t *)lp;
  const int16_t rhs = *(const int16_t *)rp;

  return (lhs > rhs) - (lhs < rhs);
}
75

76
int32_t sortSSchema(const void *lp, const void *rp) {
488,895✔
77
  SSchema* c1 = (SSchema*)lp;
488,895✔
78
  SSchema* c2 = (SSchema*)rp;
488,895✔
79

80
  if (c1->colId < c2->colId) {
488,895✔
81
    return -1;
483,763✔
82
  } else if (c1->colId > c2->colId) {
5,132!
83
    return 1;
5,132✔
84
  }
85

86
  return 0;
×
87
}
88

89
// Copies one fixed-width value into column `index` of pResBlock at the row
// slot given by the block's current `info.rows`. The number of bytes copied
// is the column's `info.bytes`; `info.rows` itself is not advanced here.
// Returns 0 on success, or terrno when the column cannot be fetched.
static int32_t addColData(SSDataBlock* pResBlock, int32_t index, void* data) {
  SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, index);
  if (pCol != NULL) {
    memcpy(pCol->pData + pResBlock->info.rows * pCol->info.bytes, data, pCol->info.bytes);
    return 0;
  }

  return terrno;
}
98

99
// Advances the task's tsdb reader to its next data block; `*hasNext` is
// filled in by the reader. When the reader reports a failure the current
// data block is released before the error code is returned.
static int32_t getTableDataInfo(SStreamReaderTaskInner* pTask, bool* hasNext) {
  int32_t rc = pTask->api.tsdReader.tsdNextDataBlock(pTask->pReader, hasNext);
  if (rc == TSDB_CODE_SUCCESS) {
    return rc;
  }

  pTask->api.tsdReader.tsdReaderReleaseDataBlock(pTask->pReader);
  return rc;
}
107

108
// Retrieves the reader's current data block into `*ppRes` and passes the
// reader's return code straight through.
static int32_t getTableData(SStreamReaderTaskInner* pTask, SSDataBlock** ppRes) {
  int32_t rc = pTask->api.tsdReader.tsdReaderRetrieveDataBlock(pTask->pReader, ppRes, NULL);
  return rc;
}
111

112
static int32_t buildOTableInfoRsp(const SSTriggerOrigTableInfoRsp* rsp, void** data, size_t* size) {
136,722✔
113
  int32_t code = 0;
136,722✔
114
  int32_t lino = 0;
136,722✔
115
  void*   buf = NULL;
136,722✔
116
  int32_t len = tSerializeSTriggerOrigTableInfoRsp(NULL, 0, rsp);
136,722✔
117
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
135,439!
118
  buf = rpcMallocCont(len);
135,439✔
119
  STREAM_CHECK_NULL_GOTO(buf, terrno);
135,439!
120
  int32_t actLen = tSerializeSTriggerOrigTableInfoRsp(buf, len, rsp);
135,439✔
121
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
136,722!
122
  *data = buf;
136,722✔
123
  *size = len;
136,722✔
124
  buf = NULL;
136,722✔
125
end:
136,722✔
126
  rpcFreeCont(buf);
136,722✔
127
  return code;
136,722✔
128
}
129

130
// Decides whether the reader's table list has to be rebuilt for the table
// identified by (suid, uid).
//   - Virtual-table stream: true when the {suid, uid} pair is missing from the
//     calc hash (isCalc) or the trigger hash (!isCalc).
//   - Otherwise: only child tables are considered, and the result is true when
//     the stream reads a super table with the same suid and the uid has no
//     group id (-1) in the current table list.
static bool needRefreshTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int8_t tableType, int64_t suid, int64_t uid, bool isCalc){
  if (sStreamReaderInfo->isVtableStream) {
    int64_t key[2] = {suid, uid};
    return tSimpleHashGet(isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, key, sizeof(key)) == NULL;
  }

  if (tableType != TD_CHILD_TABLE) {
    return false;
  }

  return sStreamReaderInfo->tableType == TD_SUPER_TABLE &&
         suid == sStreamReaderInfo->suid &&
         qStreamGetGroupId(sStreamReaderInfo->tableList, uid) == -1;
}
148

149
// Reports whether table (suid, uid) belongs to this stream reader and stores
// the id to use for it in `*id`.
//   - Virtual-table stream: the {suid, uid} pair must be present in the calc
//     hash (isCalc) or trigger hash (!isCalc); `*id` becomes uid.
//   - No table list: false.
//   - Non-super-table stream: `*id` is the group id from the table list, or
//     uid when that lookup yields -1; the result is `uid == reader uid`.
//   - Super-table stream: suid must match. Without a tag condition `*id` is 0
//     when there are no partition columns and uid when grouping by tbname.
//     In every other case `*id` is the group id, and -1 means "not a member".
static bool uidInTableList(SStreamTriggerReaderInfo* sStreamReaderInfo, int64_t suid, int64_t uid, uint64_t* id, bool isCalc){
  if (sStreamReaderInfo->isVtableStream) {
    int64_t key[2] = {suid, uid};
    if (tSimpleHashGet(isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, key, sizeof(key)) == NULL) {
      return false;
    }
    *id = uid;
    return true;
  }

  if (sStreamReaderInfo->tableList == NULL) {
    return false;
  }

  if (sStreamReaderInfo->tableType != TD_SUPER_TABLE) {
    *id = qStreamGetGroupId(sStreamReaderInfo->tableList, uid);
    if (*id == -1) {
      *id = uid;
    }
    return uid == sStreamReaderInfo->uid;
  }

  if (suid != sStreamReaderInfo->suid) {
    return false;
  }

  if (sStreamReaderInfo->pTagCond == NULL && sStreamReaderInfo->partitionCols == NULL) {
    *id = 0;
    return true;
  }

  if (sStreamReaderInfo->pTagCond == NULL && sStreamReaderInfo->groupByTbname) {
    *id = uid;
    return true;
  }

  *id = qStreamGetGroupId(sStreamReaderInfo->tableList, uid);
  return *id != -1;
}
183

184
// Builds a table list for the stream reader from its suid/uid/tableType, tag
// conditions and a clone of its partition columns.
//   isHistory == true  -> result stored in historyTableList, no group-id map.
//   isHistory == false -> result stored in tableList, using groupIdMap.
// The cloned partition-column list is destroyed before returning.
static int32_t generateTablistForStreamReader(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, bool isHistory) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SNodeList*  pGroupCols = NULL;
  SStorageAPI api = {0};

  STREAM_CHECK_RET_GOTO(nodesCloneList(sStreamReaderInfo->partitionCols, &pGroupCols));

  initStorageAPI(&api);
  code = qStreamCreateTableListForReader(pVnode, sStreamReaderInfo->suid, sStreamReaderInfo->uid, sStreamReaderInfo->tableType, pGroupCols,
                                         true, sStreamReaderInfo->pTagCond, sStreamReaderInfo->pTagIndexCond, &api,
                                         isHistory ? &sStreamReaderInfo->historyTableList : &sStreamReaderInfo->tableList,
                                         isHistory ? NULL : sStreamReaderInfo->groupIdMap);

end:
  nodesDestroyList(pGroupCols);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
201

202
static int32_t buildVTableInfoRsp(const SStreamMsgVTableInfo* rsp, void** data, size_t* size) {
489,693✔
203
  int32_t code = 0;
489,693✔
204
  int32_t lino = 0;
489,693✔
205
  void*   buf = NULL;
489,693✔
206
  int32_t len = tSerializeSStreamMsgVTableInfo(NULL, 0, rsp);
489,693✔
207
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
491,034!
208
  buf = rpcMallocCont(len);
491,034✔
209
  STREAM_CHECK_NULL_GOTO(buf, terrno);
491,034!
210
  int32_t actLen = tSerializeSStreamMsgVTableInfo(buf, len, rsp);
491,034✔
211
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
492,375!
212
  *data = buf;
492,375✔
213
  *size = len;
492,375✔
214
  buf = NULL;
492,375✔
215
end:
492,375✔
216
  rpcFreeCont(buf);
492,375✔
217
  return code;
492,375✔
218
}
219

220
static int32_t buildTsRsp(const SStreamTsResponse* tsRsp, void** data, size_t* size) {
1,113,410✔
221
  int32_t code = 0;
1,113,410✔
222
  int32_t lino = 0;
1,113,410✔
223
  void*   buf = NULL;
1,113,410✔
224
  int32_t len = tSerializeSStreamTsResponse(NULL, 0, tsRsp);
1,113,410✔
225
  STREAM_CHECK_CONDITION_GOTO(len <= 0, TSDB_CODE_INVALID_PARA);
1,113,410!
226
  buf = rpcMallocCont(len);
1,113,410✔
227
  STREAM_CHECK_NULL_GOTO(buf, terrno);
1,116,100!
228
  int32_t actLen = tSerializeSStreamTsResponse(buf, len, tsRsp);
1,116,100✔
229
  STREAM_CHECK_CONDITION_GOTO(actLen != len, TSDB_CODE_INVALID_PARA);
1,114,755!
230
  *data = buf;
1,114,755✔
231
  *size = len;
1,113,307✔
232
  buf = NULL;
1,114,652✔
233
end:
1,114,652✔
234
  rpcFreeCont(buf);
1,114,652✔
235
  return code;
1,114,652✔
236
}
237

238

239
// Encodes `pBlock` into a freshly allocated rpc buffer.
// A NULL or empty block is not an error: the function returns success and
// leaves `*data` / `*size` untouched.
// On success `*size` is the size computed by blockGetEncodeSize(), not the
// length returned by blockEncode().
static int32_t buildRsp(SSDataBlock* pBlock, void** data, size_t* size) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pCont = NULL;

  STREAM_CHECK_CONDITION_GOTO(pBlock == NULL || pBlock->info.rows == 0, TSDB_CODE_SUCCESS);

  size_t encodeCap = blockGetEncodeSize(pBlock);
  pCont = rpcMallocCont(encodeCap);
  STREAM_CHECK_NULL_GOTO(pCont, terrno);

  int32_t encoded = blockEncode(pBlock, pCont, encodeCap, taosArrayGetSize(pBlock->pDataBlock));
  STREAM_CHECK_CONDITION_GOTO(encoded < 0, terrno);

  *size = encodeCap;
  *data = pCont;
  pCont = NULL;  // handed to the caller; the cleanup below must not free it

end:
  rpcFreeCont(pCont);
  return code;
}
256

257
// Re-targets the task's tsdb reader at the tables of the current group:
// fetches that group's table keys, installs them on the reader, rebuilds the
// query condition and resets the reader status with it.
// Returns TSDB_CODE_INVALID_PARA when the group yields no tables.
static int32_t resetTsdbReader(SStreamReaderTaskInner* pTask) {
  int32_t        code = 0;
  int32_t        lino = 0;
  int32_t        nTables = 1;
  STableKeyInfo* pKeys = NULL;

  STREAM_CHECK_RET_GOTO(qStreamGetTableList(pTask->pTableList, pTask->currentGroupIndex, &pKeys, &nTables));
  if (pKeys == NULL || nTables == 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdSetQueryTableList(pTask->pReader, pKeys, nTables));

  cleanupQueryTableDataCond(&pTask->cond);

  // Virtual-table streams take the suid from the first table key's groupId.
  uint64_t suid = pTask->options.sStreamReaderInfo->isVtableStream ? pKeys->groupId : pTask->options.suid;
  STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTask->cond, pTask->options.order, pTask->options.schemas, true,
                                                      pTask->options.twindows, suid, pTask->options.ver, NULL));
  STREAM_CHECK_RET_GOTO(pTask->api.tsdReader.tsdReaderResetStatus(pTask->pReader, &pTask->cond));

end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
279

280
// Writes one WAL meta row into pBlock at its current row slot. Columns are
// filled left to right: type, id (skipped when isVTable), uid, skey, ekey,
// ver, rows. The block's row count is not advanced here.
static int32_t buildWalMetaBlock(SSDataBlock* pBlock, int8_t type, int64_t id, bool isVTable, int64_t uid,
                                 int64_t skey, int64_t ekey, int64_t ver, int64_t rows) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t col = 0;

  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &type));
  if (!isVTable) {
    STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &id));
  }
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &uid));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &skey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ekey));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &ver));
  STREAM_CHECK_RET_GOTO(addColData(pBlock, col++, &rows));

end:
  return code;
}
299

300
// Writes one row (id, skey, ekey, ver) into columns 0..3 of pBlock at its
// current row slot. The block's row count is not advanced here.
static int32_t buildWalMetaBlockNew(SSDataBlock* pBlock, int64_t id, int64_t skey, int64_t ekey, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t values[] = {id, skey, ekey, ver};

  for (int32_t col = 0; col < 4; col++) {
    STREAM_CHECK_RET_GOTO(addColData(pBlock, col, &values[col]));
  }

end:
  return code;
}
312

313
// Writes one row (id, ver) into columns 0..1 of pBlock at its current row
// slot. The block's row count is not advanced here.
static int32_t buildDropTableBlock(SSDataBlock* pBlock, int64_t id, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t values[] = {id, ver};

  for (int32_t col = 0; col < 2; col++) {
    STREAM_CHECK_RET_GOTO(addColData(pBlock, col, &values[col]));
  }

end:
  return code;
}
323

324
// Initializes pTSchema as a single-column schema with the given version and
// the given id, type and byte width for column 0.
static void buildTSchema(STSchema* pTSchema, int32_t ver, col_id_t colId, int8_t type, int32_t bytes) {
  pTSchema->version = ver;
  pTSchema->numOfCols = 1;

  pTSchema->columns[0].bytes = bytes;
  pTSchema->columns[0].type = type;
  pTSchema->columns[0].colId = colId;
}
×
331

332
// Decodes a delete request from a WAL entry and appends one row
// (id, skey, ekey, ver) to rsp->deleteBlock for every deleted uid that belongs
// to this stream reader.
//   sStreamReaderInfo - reader whose table membership decides which uids count
//   rsp               - response being assembled; deleteBlock and totalRows grow
//   data / len        - encoded delete request
//   ver               - WAL version recorded in each emitted row
// Returns 0 on success (including "nothing relevant"), otherwise an error code.
//
// For a non-virtual super-table stream, a request for a different super table
// is ignored as a whole. Within a request, uids that are not part of the
// reader's table list are skipped individually, so the remaining uids are
// still processed (same policy as scanDropTableNew).
static int32_t scanDeleteDataNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
                              int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  SDecoder   decoder = {0};
  SDeleteRes req = {0};
  void*      pTask = sStreamReaderInfo->pTask;  // NOTE(review): presumably consumed by the ST_TASK_ILOG macro

  tDecoderInit(&decoder, data, len);

  // The uid list is decoded into this array; fail early if it cannot be allocated.
  req.uidList = taosArrayInit(0, sizeof(tb_uid_t));
  STREAM_CHECK_NULL_GOTO(req.uidList, terrno);

  STREAM_CHECK_RET_GOTO(tDecodeDeleteRes(&decoder, &req));
  STREAM_CHECK_CONDITION_GOTO((sStreamReaderInfo->tableType == TSDB_SUPER_TABLE && !sStreamReaderInfo->isVtableStream && req.suid != sStreamReaderInfo->suid), TDB_CODE_SUCCESS);

  for (int32_t i = 0; i < taosArrayGetSize(req.uidList); i++) {
    uint64_t* uid = taosArrayGet(req.uidList, i);
    STREAM_CHECK_NULL_GOTO(uid, terrno);
    uint64_t   id = 0;
    ST_TASK_ILOG("stream reader scan delete start data:uid %" PRIu64 ", skey %" PRIu64 ", ekey %" PRIu64, *uid, req.skey, req.ekey);
    if (!uidInTableList(sStreamReaderInfo, req.suid, *uid, &id, false)) {
      // Not one of our tables: skip only this uid, keep scanning the rest.
      continue;
    }
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->deleteBlock, ((SSDataBlock*)rsp->deleteBlock)->info.rows + 1));
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->deleteBlock, id, req.skey, req.ekey, ver));
    ((SSDataBlock*)rsp->deleteBlock)->info.rows++;
    rsp->totalRows++;
  }

end:
  taosArrayDestroy(req.uidList);
  tDecoderClear(&decoder);
  return code;
}
362

363
// Decodes a batch drop-table request from a WAL entry and appends one row
// (id, ver) to rsp->dropBlock for every dropped table that belongs to this
// stream reader. Tables outside the reader's table list are skipped.
static int32_t scanDropTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len,
                             int64_t ver) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  void*            pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropTbBatchReq(&decoder, &req));

  for (int32_t i = 0; i < req.nReqs; i++) {
    SVDropTbReq* pOne = &req.pReqs[i];
    STREAM_CHECK_NULL_GOTO(pOne, TSDB_CODE_INVALID_PARA);

    uint64_t id = 0;
    if (uidInTableList(sStreamReaderInfo, pOne->suid, pOne->uid, &id, false)) {
      STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->dropBlock, ((SSDataBlock*)rsp->dropBlock)->info.rows + 1));
      STREAM_CHECK_RET_GOTO(buildDropTableBlock(rsp->dropBlock, id, ver));
      ((SSDataBlock*)rsp->dropBlock)->info.rows++;
      rsp->totalRows++;
      ST_TASK_ILOG("stream reader scan drop uid %" PRId64 ", id %" PRIu64, pOne->uid, id);
    }
  }

end:
  tDecoderClear(&decoder);
  return code;
}
393

394
// Rebuilds both table lists of the reader while holding its mutex.
// The live list is destroyed and regenerated first; only if that succeeds is
// the history list destroyed and regenerated as well.
// Returns the code of the last generation step that ran.
static int32_t reloadTableList(SStreamTriggerReaderInfo* sStreamReaderInfo){
  int32_t code = 0;

  (void)taosThreadMutexLock(&sStreamReaderInfo->mutex);

  qStreamDestroyTableList(sStreamReaderInfo->tableList);
  sStreamReaderInfo->tableList = NULL;
  code = generateTablistForStreamReader(sStreamReaderInfo->pVnode, sStreamReaderInfo, false);
  if (code != 0) {
    goto unlock;
  }

  qStreamDestroyTableList(sStreamReaderInfo->historyTableList);
  sStreamReaderInfo->historyTableList = NULL;
  code = generateTablistForStreamReader(sStreamReaderInfo->pVnode, sStreamReaderInfo, true);

unlock:
  (void)taosThreadMutexUnlock(&sStreamReaderInfo->mutex);
  return code;
}
407

408
// Decodes a batch create-table request from a WAL entry and reloads the
// reader's table lists as soon as one created table is found that requires a
// refresh. Requests after that first match are not inspected.
static int32_t scanCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
  int32_t            code = 0;
  int32_t            lino = 0;
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  void*              pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbBatchReq(&decoder, &req));

  bool needReload = false;
  for (int32_t i = 0; i < req.nReqs && !needReload; i++) {
    SVCreateTbReq* pOne = &req.pReqs[i];
    if (needRefreshTableList(sStreamReaderInfo, pOne->type, pOne->ctb.suid, pOne->uid, false)) {
      ST_TASK_ILOG("stream reader scan create table %s", pOne->name);
      needReload = true;
    } else {
      ST_TASK_ILOG("stream reader scan create table jump, %s", pOne->name);
    }
  }
  STREAM_CHECK_CONDITION_GOTO(!needReload, TDB_CODE_SUCCESS);

  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));

end:
  tDeleteSVCreateTbBatchReq(&req);
  tDecoderClear(&decoder);
  return code;
}
440

441
// Handles the create-table request embedded in an auto-create submit:
// reloads the reader's table lists when the new table requires a refresh,
// otherwise does nothing beyond a debug log.
static int32_t processAutoCreateTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, SVCreateTbReq* pCreateReq) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;

  if (needRefreshTableList(sStreamReaderInfo, pCreateReq->type, pCreateReq->ctb.suid, pCreateReq->uid, false)) {
    ST_TASK_ILOG("stream reader scan auto create table %s", pCreateReq->name);
    STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
  } else {
    ST_TASK_DLOG("stream reader scan auto create table jump, %s", pCreateReq->name);
  }

end:
  return code;
}
455

456
// Decodes an alter-table request from a WAL entry and reloads the reader's
// table lists when it updates tag values (single or multi) of a child table
// whose super table is the one this reader follows. Every other alter request
// is ignored with a success code.
static int32_t scanAlterTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SDecoder     decoder = {0};
  SVAlterTbReq req = {0};
  void*        pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVAlterTbReq(&decoder, &req));

  bool isTagValUpdate = (req.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL || req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL);
  STREAM_CHECK_CONDITION_GOTO(!isTagValUpdate, TDB_CODE_SUCCESS);

  ETableType tbType = 0;
  uint64_t   suid = 0;
  STREAM_CHECK_RET_GOTO(metaGetTableTypeSuidByName(sStreamReaderInfo->pVnode, req.tbName, &tbType, &suid));
  STREAM_CHECK_CONDITION_GOTO(tbType != TSDB_CHILD_TABLE || suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);

  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
  ST_TASK_ILOG("stream reader scan alter table %s", req.tbName);

end:
  taosArrayDestroy(req.pMultiTag);
  tDecoderClear(&decoder);
  return code;
}
482

483
// static int32_t scanAlterSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
484
//   int32_t  code = 0;
485
//   int32_t  lino = 0;
486
//   SDecoder decoder = {0};
487
//   SMAlterStbReq reqAlter = {0};
488
//   SVCreateStbReq req = {0};
489
//   tDecoderInit(&decoder, data, len);
490
//   void* pTask = sStreamReaderInfo->pTask;
491
  
492
//   STREAM_CHECK_RET_GOTO(tDecodeSVCreateStbReq(&decoder, &req));
493
//   STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);
494
//   if (req.alterOriData != 0) {
495
//     STREAM_CHECK_RET_GOTO(tDeserializeSMAlterStbReq(req.alterOriData, req.alterOriDataLen, &reqAlter));
496
//     STREAM_CHECK_CONDITION_GOTO(reqAlter.alterType != TSDB_ALTER_TABLE_DROP_TAG && reqAlter.alterType != TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TDB_CODE_SUCCESS);
497
//   }
498
  
499
//   STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
500

501
//   ST_TASK_ILOG("stream reader scan alter suid %" PRId64, req.suid);
502
// end:
503
//   tFreeSMAltertbReq(&reqAlter);
504
//   tDecoderClear(&decoder);
505
//   return code;
506
// }
507

508
// Decodes a drop-super-table request from a WAL entry and reloads the
// reader's table lists when the dropped super table is the one this reader
// follows. Requests for other super tables are ignored with a success code.
static int32_t scanDropSTableNew(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SDecoder     decoder = {0};
  SVDropStbReq req = {0};
  void*        pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);
  STREAM_CHECK_RET_GOTO(tDecodeSVDropStbReq(&decoder, &req));

  STREAM_CHECK_CONDITION_GOTO(req.suid != sStreamReaderInfo->suid, TDB_CODE_SUCCESS);

  STREAM_CHECK_RET_GOTO(reloadTableList(sStreamReaderInfo));
  ST_TASK_ILOG("stream reader scan drop suid %" PRId64, req.suid);

end:
  tDecoderClear(&decoder);
  return code;
}
525

526
// Decodes one per-table section of a submit message and merges its timestamp
// range into `gidHash`.
//   pCoder            - decoder positioned at the start of the section
//   sStreamReaderInfo - reader whose table membership decides relevance
//   gidHash           - hash of WalMetaResult keyed on id; an existing entry has
//                       its [skey, ekey] widened, otherwise a new one is added
// Returns 0 on success, TSDB_CODE_INVALID_MSG on a decoding failure, or the
// error of a failing helper.
//
// Steps:
//   1. Read flags; bits 8..15 carry the encoding version, bits 0..7 the flags.
//   2. For auto-create submits, decode the embedded create-table request and
//      let processAutoCreateTableNew() refresh the table list if needed.
//   3. Read suid/uid; sections for tables outside the reader's list are
//      skipped with success.
//   4. Take skey/ekey from the first and last timestamp of the payload, which
//      is either column data or a sequence of rows.
static int32_t scanSubmitTbDataForMeta(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* gidHash) {
  int32_t code = 0;
  int32_t lino = 0;
  WalMetaResult walMeta = {0};
  SSubmitTbData submitTbData = {0};

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  uint8_t       version = 0;
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  version = (submitTbData.flags >> 8) & 0xff;
  submitTbData.flags = submitTbData.flags & 0xff;

  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq));
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (!uidInTableList(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &walMeta.id, false)){
    goto end;
  }
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData = 0;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // The first decoded column is read as the timestamp column below.
    SColData colData = {0};
    code = tDecodeColData(version, pCoder, &colData, false);
    if (code) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // Reject columns with non-value cells and empty columns: the latter would
    // make `nVal - 1` index before the start of pData.
    if (colData.flag != HAS_VALUE || colData.nVal <= 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }
    walMeta.skey = ((TSKEY *)colData.pData)[0];
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];

    // Decode the remaining columns so the decoder ends up past this section's data.
    for (uint64_t i = 1; i < nColData; i++) {
      code = tDecodeColData(version, pCoder, &colData, true);
      if (code) {
        code = TSDB_CODE_INVALID_MSG;
        TSDB_CHECK_CODE(code, lino, end);
      }
    }
  } else {
    uint64_t nRow = 0;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // Rows are stored back to back; each row's `len` gives the offset of the
    // next one. The index uses the same unsigned 64-bit type as the count so
    // the comparison is exact for any decoded value.
    for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
      pCoder->pos += pRow->len;
      if (iRow == 0){
#ifndef NO_UNALIGNED_ACCESS
        walMeta.skey = pRow->ts;
#else
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
#endif
      }
      if (iRow == nRow - 1) {
#ifndef NO_UNALIGNED_ACCESS
        walMeta.ekey = pRow->ts;
#else
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
#endif
      }
    }
  }

  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
  if (data != NULL) {
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
  } else {
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
  }

end:
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
  tEndDecode(pCoder);
  return code;
}
640

641
// Scans one submit message from the WAL and appends one meta row
// (id, skey, ekey, ver) per distinct id to rsp->metaBlock.
//   sStreamReaderInfo - reader whose table membership decides relevance
//   rsp               - response being assembled; metaBlock and totalRows grow
//   data / len        - encoded submit message
//   ver               - WAL version recorded in each emitted row
// Returns 0 on success, TSDB_CODE_INVALID_MSG on a decoding failure, or the
// error of a failing helper.
//
// The per-table sections are first folded into a temporary hash keyed on id,
// so several sections that map to the same id produce a single row covering
// their combined time range.
static int32_t scanSubmitDataForMeta(SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* rsp, void* data, int32_t len, int64_t ver) {
  int32_t  code = 0;
  int32_t  lino = 0;
  SDecoder decoder = {0};
  SSHashObj* gidHash = NULL;
  void* pTask = sStreamReaderInfo->pTask;

  tDecoderInit(&decoder, data, len);
  if (tStartDecode(&decoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  uint64_t nSubmitTbData = 0;
  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  STREAM_CHECK_NULL_GOTO(gidHash, terrno);

  // The index uses the same unsigned 64-bit type as the decoded count so the
  // comparison is exact for any value.
  for (uint64_t i = 0; i < nSubmitTbData; i++) {
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataForMeta(&decoder, sStreamReaderInfo, gidHash));
  }
  tEndDecode(&decoder);

  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, ((SSDataBlock*)rsp->metaBlock)->info.rows + tSimpleHashGetSize(gidHash)));
  int32_t iter = 0;
  void*   px = tSimpleHashIterate(gidHash, NULL, &iter);
  while (px != NULL) {
    WalMetaResult* pMeta = (WalMetaResult*)px;
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
    ((SSDataBlock*)rsp->metaBlock)->info.rows++;
    rsp->totalRows++;
    ST_TASK_DLOG("stream reader scan submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
          ", ver:%"PRId64, pMeta->skey, pMeta->ekey, pMeta->id, ver);
    px = tSimpleHashIterate(gidHash, px, &iter);
  }
end:
  tDecoderClear(&decoder);
  tSimpleHashCleanup( gidHash);
  return code;
}
685

686
/*
 * Create the data block that carries tsdb meta rows.
 *
 * The block is built from LONG_BYTES-wide columns whose ids are assigned
 * consecutively starting at 1: skey, ekey, uid, gid (omitted when isVTable
 * is true) and nrows.
 *
 * @param pBlock    receives the block created by createDataBlockForStream()
 * @param isVTable  when true the gid column is left out
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t createBlockForTsdbMeta(SSDataBlock** pBlock, bool isVTable) {
  int32_t code = 0;
  int32_t lino = 0;

  // Column layout in output order; entries with present == false are skipped
  // and do not consume a column id.
  const struct {
    int32_t type;
    bool    present;
  } columns[] = {
      {TSDB_DATA_TYPE_TIMESTAMP, true},     // skey
      {TSDB_DATA_TYPE_TIMESTAMP, true},     // ekey
      {TSDB_DATA_TYPE_BIGINT, true},        // uid
      {TSDB_DATA_TYPE_UBIGINT, !isVTable},  // gid
      {TSDB_DATA_TYPE_BIGINT, true},        // nrows
  };

  SArray* schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(schemas, terrno);

  int32_t nextColId = 1;
  for (size_t c = 0; c < sizeof(columns) / sizeof(columns[0]); ++c) {
    if (!columns[c].present) {
      continue;
    }
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, columns[c].type, LONG_BYTES, nextColId));
    nextColId++;
  }

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));

end:
  // the schema array is destroyed on both the success and the failure path
  taosArrayDestroy(schemas);
  return code;
}
707

708
/*
 * Create the data block that carries WAL meta rows.
 *
 * The block has four BIGINT columns with ids 0..3:
 *   0: gid (non vtable) / uid (vtable), 1: skey, 2: ekey, 3: ver
 *
 * @param pBlock  receives the block created by createDataBlockForStream()
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t createBlockForWalMetaNew(SSDataBlock** pBlock) {
  enum { WAL_META_COL_NUM = 4 };

  int32_t code = 0;
  int32_t lino = 0;
  SArray* schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(schemas, terrno);

  // every column has the same type and width, only the column id differs
  for (int32_t colId = 0; colId < WAL_META_COL_NUM; ++colId) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colId));
  }

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));

end:
  taosArrayDestroy(schemas);
  return code;
}
728

729
/*
 * Create the data block that reports dropped tables.
 *
 * The block has two BIGINT columns with ids 0..1:
 *   0: gid (non vtable) / uid (vtable), 1: ver
 *
 * @param pBlock  receives the block created by createDataBlockForStream()
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t createBlockForDropTable(SSDataBlock** pBlock) {
  enum { DROP_TABLE_COL_NUM = 2 };

  int32_t code = 0;
  int32_t lino = 0;
  SArray* schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(schemas, terrno);

  for (int32_t colId = 0; colId < DROP_TABLE_COL_NUM; ++colId) {
    STREAM_CHECK_RET_GOTO(qStreamBuildSchema(schemas, TSDB_DATA_TYPE_BIGINT, LONG_BYTES, colId));
  }

  STREAM_CHECK_RET_GOTO(createDataBlockForStream(schemas, pBlock));

end:
  taosArrayDestroy(schemas);
  return code;
}
747

748
/*
 * Dispatch one non-submit WAL message to the matching meta scanner.
 *
 *   TDMT_VND_DELETE       (only when deleteReCalc != 0): lazily creates
 *                         rsp->deleteBlock, then scanDeleteDataNew()
 *   TDMT_VND_DROP_TABLE   (only when deleteOutTbl != 0): lazily creates
 *                         rsp->dropBlock, then scanDropTableNew()
 *   TDMT_VND_DROP_STB     scanDropSTableNew()
 *   TDMT_VND_CREATE_TABLE scanCreateTableNew()
 *   TDMT_VND_ALTER_STB    ignored (handler is disabled)
 *   TDMT_VND_ALTER_TABLE  scanAlterTableNew()
 * Any other message type is ignored.
 *
 * @param msgType            WAL message type
 * @param sStreamReaderInfo  reader state, provides deleteReCalc / deleteOutTbl
 * @param data               message body
 * @param len                length of data in bytes
 * @param rsp                response being assembled
 * @param ver                WAL version of the message; taken as 64-bit because
 *                           the caller passes an int64_t and a 32-bit parameter
 *                           would silently truncate it
 * @return 0 on success, otherwise the error code of the failing scanner
 */
static int32_t processMeta(int16_t msgType, SStreamTriggerReaderInfo* sStreamReaderInfo, void *data, int32_t len, SSTriggerWalNewRsp* rsp, int64_t ver) {
  int32_t code = 0;
  int32_t lino = 0;

  if (msgType == TDMT_VND_DELETE && sStreamReaderInfo->deleteReCalc != 0) {
    if (rsp->deleteBlock == NULL) {
      STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&rsp->deleteBlock));
    }

    STREAM_CHECK_RET_GOTO(scanDeleteDataNew(sStreamReaderInfo, rsp, data, len, ver));
  } else if (msgType == TDMT_VND_DROP_TABLE && sStreamReaderInfo->deleteOutTbl != 0) {
    if (rsp->dropBlock == NULL) {
      STREAM_CHECK_RET_GOTO(createBlockForDropTable((SSDataBlock**)&rsp->dropBlock));
    }
    STREAM_CHECK_RET_GOTO(scanDropTableNew(sStreamReaderInfo, rsp, data, len, ver));
  } else if (msgType == TDMT_VND_DROP_STB) {
    STREAM_CHECK_RET_GOTO(scanDropSTableNew(sStreamReaderInfo, data, len));
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    STREAM_CHECK_RET_GOTO(scanCreateTableNew(sStreamReaderInfo, data, len));
  } else if (msgType == TDMT_VND_ALTER_STB) {
    // alter-stable messages are currently not processed (scanAlterSTableNew is disabled)
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
    STREAM_CHECK_RET_GOTO(scanAlterTableNew(sStreamReaderInfo, data, len));
  }

end:
  return code;
}
778
/*
 * Scan the WAL starting at rsp->ver and collect meta information into rsp.
 *
 * A WAL reader is opened and positioned at rsp->ver. If that version does not
 * exist the function returns success without scanning; rsp->ver is moved up to
 * the first WAL version when it lies before it. Otherwise messages are read
 * one by one: rsp->ver and rsp->verTime follow the reader, submit messages go
 * to scanSubmitDataForMeta() and every other message to processMeta().
 *
 * Scanning stops when
 *   - the WAL has no further message (treated as success),
 *   - a message's ingestTs / 1000 is greater than ctime, or
 *   - rsp->totalRows reaches STREAM_RETURN_ROWS_NUM.
 *
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t processWalVerMetaNew(SVnode* pVnode, SSTriggerWalNewRsp* rsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
                       int64_t ctime) {
  int32_t     code = 0;
  int32_t     lino = 0;
  void*       pTask = sStreamReaderInfo->pTask;
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);

  code = walReaderSeekVer(pWalReader, rsp->ver);
  if (code != TSDB_CODE_WAL_LOG_NOT_EXIST) {
    STREAM_CHECK_RET_GOTO(code);
  } else {
    // requested version is not in the WAL: clamp to the first version and report success
    if (rsp->ver < walGetFirstVer(pWalReader->pWal)) {
      rsp->ver = walGetFirstVer(pWalReader->pWal);
    }
    ST_TASK_DLOG("vgId:%d %s scan wal error:%s", TD_VID(pVnode), __func__, tstrerror(code));
    code = TSDB_CODE_SUCCESS;
    goto end;
  }

  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, STREAM_RETURN_ROWS_NUM));

  for (;;) {
    code = walNextValidMsg(pWalReader, true);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
      // reached the end of the WAL: not an error
      ST_TASK_DLOG("vgId:%d %s scan wal error:%s", TD_VID(pVnode), __func__, tstrerror(code));
      code = TSDB_CODE_SUCCESS;
      goto end;
    }
    STREAM_CHECK_RET_GOTO(code);

    rsp->ver = pWalReader->curVersion;
    SWalCont* wCont = &pWalReader->pHead->head;
    rsp->verTime = wCont->ingestTs;
    if (ctime < wCont->ingestTs / 1000) {
      break;
    }

    int64_t ver = wCont->version;
    ST_TASK_DLOG("vgId:%d stream reader scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d",
      TD_VID(pVnode), ver, wCont->msgType, sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);

    if (wCont->msgType != TDMT_VND_SUBMIT) {
      // generic messages carry an SMsgHead in front of the payload
      void*   body = POINTER_SHIFT(wCont->body, sizeof(SMsgHead));
      int32_t bodyLen = wCont->bodyLen - sizeof(SMsgHead);
      STREAM_CHECK_RET_GOTO(processMeta(wCont->msgType, sStreamReaderInfo, body, bodyLen, rsp, ver));
    } else {
      // submit messages carry an SSubmitReq2Msg header instead
      void*   body = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
      int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitDataForMeta(sStreamReaderInfo, rsp, body, bodyLen, ver));
    }

    if (rsp->totalRows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }

end:
  walCloseReader(pWalReader);
  return code;
}
833

834
/*
 * Fill the tbname / tag output columns of pBlock for rows
 * [currentRow, currentRow + numOfRows) of table `uid`.
 *
 * The expression list and the per-uid cache are chosen by isCalc (calc vs
 * trigger). On a cache miss the table entry is loaded through the meta reader
 * and one heap copy per expression (table name or tag value, NULL for a null
 * tag) is appended to a new cache array, which is registered in the hash
 * before it is filled. On a cache hit the values are taken from the cached
 * array, whose size must equal the number of expressions.
 *
 * NOTE(review): if this function fails while filling a freshly registered
 * cache array, the partially filled array stays in the hash; later calls for
 * the same uid then appear to fail the size check with TSDB_CODE_INVALID_PARA.
 * NOTE(review): a scan pseudo column other than tbname appends nothing to the
 * cache on a miss, which would leave the array shorter than numOfExpr.
 *
 * @return 0 on success (also when there are no tag expressions), otherwise the
 *         first error code encountered
 */
static int32_t processTag(SVnode* pVnode, SStreamTriggerReaderInfo* info, bool isCalc, SStorageAPI* api, 
  uint64_t uid, SSDataBlock* pBlock, uint32_t currentRow, uint32_t numOfRows, uint32_t numOfBlocks) {
  int32_t     code = 0;
  int32_t     lino = 0;
  SMetaReader mr = {0};
  SArray* tagCache = NULL;

  SHashObj* metaCache = isCalc ? info->pTableMetaCacheCalc : info->pTableMetaCacheTrigger;
  SExprInfo*   pExprInfo = isCalc ? info->pExprInfoCalcTag : info->pExprInfoTriggerTag;
  int32_t      numOfExpr = isCalc ? info->numOfExprCalcTag : info->numOfExprTriggerTag;
  if (numOfExpr == 0) {
    return TSDB_CODE_SUCCESS;
  }

  void* uidData = taosHashGet(metaCache, &uid, LONG_BYTES);
  if (uidData == NULL) {
    // cache miss: load the table entry, the lock is only held for the lookup itself
    api->metaReaderFn.initReader(&mr, pVnode, META_READER_LOCK, &api->metaFn);
    code = api->metaReaderFn.getEntryGetUidCache(&mr, uid);
    api->metaReaderFn.readerReleaseLock(&mr);
    STREAM_CHECK_RET_GOTO(code);

    tagCache = taosArrayInit(numOfExpr, POINTER_BYTES);
    STREAM_CHECK_NULL_GOTO(tagCache, terrno);
    if(taosHashPut(metaCache, &uid, LONG_BYTES, &tagCache, POINTER_BYTES) != 0) {
      // capture the error before the cleanup call gets a chance to touch terrno
      code = terrno;
      taosArrayDestroyP(tagCache, taosMemFree);
      goto end;
    }
  } else {
    tagCache = *(SArray**)uidData;
    STREAM_CHECK_CONDITION_GOTO(taosArrayGetSize(tagCache) != numOfExpr, TSDB_CODE_INVALID_PARA);
  }
  
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExprInfo[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    STREAM_CHECK_NULL_GOTO(pColInfoData, terrno);
    int32_t functionId = pExpr1->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      int32_t fType = pExpr1->pExpr->_function.functionType;
      if (fType == FUNCTION_TYPE_TBNAME) {
        char   buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
        if (uidData == NULL) {
          STR_TO_VARSTR(buf, mr.me.name)
          char* tbname = taosStrdup(mr.me.name);
          STREAM_CHECK_NULL_GOTO(tbname, terrno);
          if (taosArrayPush(tagCache, &tbname) == NULL) {
            // the cache did not take ownership, so release the copy here
            code = terrno;
            lino = __LINE__;
            taosMemoryFree(tbname);
            goto end;
          }
        } else {
          char* tbname = taosArrayGetP(tagCache, j);
          STR_TO_VARSTR(buf, tbname)
        }
        for (uint32_t i = 0; i < numOfRows; i++){
          colDataClearNull_f(pColInfoData->nullbitmap, currentRow + i);
        }
        code = colDataSetNItems(pColInfoData, currentRow, buf, numOfRows, numOfBlocks, false);
        pColInfoData->info.colId = -1;
        // without this check a failure here could be overwritten by a later iteration
        STREAM_CHECK_RET_GOTO(code);
      }
    } else {  // these are tags
      char* data = NULL;
      const char* p = NULL;
      STagVal tagVal = {0};
      bool    ownsData = false;  // true when `data` is the temporary heap copy that is freed below
      if (uidData == NULL) {
        tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
        p = api->metaFn.extractTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);

        if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
          data = tTagValToData((const STagVal*)p, false);
          ownsData = (data != NULL) && IS_VAR_DATA_TYPE(((const STagVal*)p)->type);
        } else {
          data = (char*)p;
        }

        if (data == NULL) {
          // remember the null tag so the cache keeps one entry per expression
          STREAM_CHECK_NULL_GOTO(taosArrayPush(tagCache, &data), terrno);
        } else {
          int32_t len = pColInfoData->info.bytes;
          if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
            len = calcStrBytesByType(pColInfoData->info.type, (char*)data);
          }
          char* pData = taosMemoryCalloc(1, len);
          if (pData == NULL) {
            code = terrno;
            lino = __LINE__;
            if (ownsData) taosMemoryFree(data);
            goto end;
          }
          (void)memcpy(pData, data, len);
          if (taosArrayPush(tagCache, &pData) == NULL) {
            code = terrno;
            lino = __LINE__;
            taosMemoryFree(pData);
            if (ownsData) taosMemoryFree(data);
            goto end;
          }
        }
      } else {
        data = taosArrayGetP(tagCache, j);
      }

      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataSetNNULL(pColInfoData, currentRow, numOfRows);
      } else {
        for (uint32_t i = 0; i < numOfRows; i++){
          colDataClearNull_f(pColInfoData->nullbitmap, currentRow + i);
        }
        code = colDataSetNItems(pColInfoData, currentRow, data, numOfRows, numOfBlocks, false);
        if (ownsData) {
          // the cache holds its own copy, so the temporary is no longer needed
          taosMemoryFree(data);
        }
        STREAM_CHECK_RET_GOTO(code);
      }
    }
  }

end:
  api->metaReaderFn.clearReader(&mr);
  return code;
}
945

946
/*
 * Work out which rows of a timestamp column fall inside a time window.
 *
 * Without a window the whole column is selected: *rowStart = 0,
 * *rowEnd = pCol->nVal. With a window, *rowStart is the first row whose value
 * is >= window->skey and *rowEnd the first row whose value is > window->ekey
 * (pCol->nVal when there is none).
 *
 * @param pCol      column to inspect
 * @param window    time window, may be NULL
 * @param rowStart  out: first selected row, -1 when no row reaches skey
 * @param rowEnd    out: one past the last selected row
 * @param nRows     out: *rowEnd - *rowStart; stays 0 when nothing is selected
 *                  or when reading a value fails
 * @return 0 on success, otherwise the error from tColDataGetValue()
 */
int32_t getRowRange(SColData* pCol, STimeWindow* window, int32_t* rowStart, int32_t* rowEnd, int32_t* nRows) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t first = 0;
  int32_t last = pCol->nVal;

  *nRows = 0;
  if (window != NULL) {
    SColVal cell = {0};
    first = -1;
    last = -1;
    for (int32_t row = 0; row < pCol->nVal; ++row) {
      STREAM_CHECK_RET_GOTO(tColDataGetValue(pCol, row, &cell));
      int64_t ts = VALUE_GET_TRIVIAL_DATUM(&cell.value);
      if (first == -1 && ts >= window->skey) {
        first = row;
      }
      if (last == -1 && ts > window->ekey) {
        last = row;
      }
    }
    // nothing reaches the window start, or the window closes on the very first match
    STREAM_CHECK_CONDITION_GOTO(first == -1 || first == last, TDB_CODE_SUCCESS);

    if (last == -1) {
      // no row lies beyond the window end: the range runs to the end of the column
      last = pCol->nVal;
    }
  }
  *nRows = last - first;

end:
  // the outputs are published on every path, including early exits
  *rowStart = first;
  *rowEnd = last;
  return code;
}
977

978
/*
 * Copy rows [rowStart, rowEnd) of a decoded column into an output column.
 *
 * Source row rowStart is written to destination row `rows`, rowStart + 1 to
 * `rows + 1`, and so on, so the copied values occupy the contiguous slice
 * [rows, rows + (rowEnd - rowStart)). The caller fills the other columns of
 * the same rows starting at that index as well; offsetting the destination by
 * the source index would leave a gap of rowStart rows.
 *
 * @param rows      destination index of the first copied row
 * @param rowStart  first source row to copy
 * @param rowEnd    one past the last source row to copy
 * @param colData   decoded source column
 * @param pColData  destination column
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t setColData(int64_t rows, int32_t rowStart, int32_t rowEnd, SColData* colData, SColumnInfoData* pColData) {
  int32_t code = 0;
  int32_t lino = 0;
  for (int32_t k = rowStart; k < rowEnd; k++) {
    SColVal colVal = {0};
    STREAM_CHECK_RET_GOTO(tColDataGetValue(colData, k, &colVal));
    STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, rows + (k - rowStart), VALUE_GET_DATUM(&colVal.value, colVal.value.type),
                                        !COL_VAL_IS_VALUE(&colVal)));
  }
end:
  return code;
}
990

991
static int32_t scanSubmitTbData(SVnode* pVnode, SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, 
16,808,324✔
992
  STSchema** schemas, SSHashObj* ranges, SSHashObj* gidHash, SSTriggerWalNewRsp* rsp, int64_t ver) {
993
  int32_t code = 0;
16,808,324✔
994
  int32_t lino = 0;
16,808,324✔
995
  uint64_t id = 0;
16,808,324✔
996
  WalMetaResult walMeta = {0};
16,816,397✔
997
  void* pTask = sStreamReaderInfo->pTask;
16,824,462✔
998
  SSDataBlock * pBlock = (SSDataBlock*)rsp->dataBlock;
16,828,489✔
999

1000
  if (tStartDecode(pCoder) < 0) {
16,825,792!
1001
    code = TSDB_CODE_INVALID_MSG;
×
1002
    TSDB_CHECK_CODE(code, lino, end);
×
1003
  }
1004

1005
  SSubmitTbData submitTbData = {0};
16,837,923✔
1006
  uint8_t       version = 0;
16,837,923✔
1007
  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
16,828,486!
1008
    code = TSDB_CODE_INVALID_MSG;
×
1009
    TSDB_CHECK_CODE(code, lino, end);
×
1010
  }
1011
  version = (submitTbData.flags >> 8) & 0xff;
16,828,486✔
1012
  submitTbData.flags = submitTbData.flags & 0xff;
16,828,486✔
1013
  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
1014
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
16,828,486✔
1015
    if (tStartDecode(pCoder) < 0) {
65,215!
1016
      code = TSDB_CODE_INVALID_MSG;
×
1017
      TSDB_CHECK_CODE(code, lino, end);
×
1018
    }
1019
    tEndDecode(pCoder);
65,215✔
1020
  }
1021

1022
  // submit data
1023
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
16,828,483!
1024
    code = TSDB_CODE_INVALID_MSG;
×
1025
    TSDB_CHECK_CODE(code, lino, end);
×
1026
  }
1027
  if (tDecodeI64(pCoder, &submitTbData.uid) < 0) {
16,829,827!
1028
    code = TSDB_CODE_INVALID_MSG;
×
1029
    TSDB_CHECK_CODE(code, lino, end);
×
1030
  }
1031

1032
  STREAM_CHECK_CONDITION_GOTO(!uidInTableList(sStreamReaderInfo, submitTbData.suid, submitTbData.uid, &id, rsp->isCalc), TDB_CODE_SUCCESS);
16,829,827✔
1033

1034
  walMeta.id = id;
13,300,819✔
1035
  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};
13,300,819✔
1036

1037
  if (ranges != NULL){
13,306,206✔
1038
    void* timerange = tSimpleHashGet(ranges, &id, sizeof(id));
8,690,227✔
1039
    if (timerange == NULL) goto end;;
8,691,575!
1040
    int64_t* pRange = (int64_t*)timerange;
8,691,575✔
1041
    window.skey = pRange[0];
8,691,575✔
1042
    window.ekey = pRange[1];
8,692,916✔
1043
  }
1044
  
1045
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
13,306,147!
1046
    code = TSDB_CODE_INVALID_MSG;
×
1047
    TSDB_CHECK_CODE(code, lino, end);
×
1048
  }
1049

1050
  if (*schemas == NULL) {
13,306,147!
1051
    *schemas = metaGetTbTSchema(pVnode->pMeta, submitTbData.suid != 0 ? submitTbData.suid : submitTbData.uid, submitTbData.sver, 1);
13,303,469✔
1052
    STREAM_CHECK_NULL_GOTO(*schemas, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
13,298,112!
1053
  }
1054

1055
  SStreamWalDataSlice* pSlice = (SStreamWalDataSlice*)tSimpleHashGet(sStreamReaderInfo->indexHash, &submitTbData.uid, LONG_BYTES);
13,298,190✔
1056
  STREAM_CHECK_NULL_GOTO(pSlice, TSDB_CODE_INVALID_PARA);
13,303,539!
1057
  int32_t blockStart = pSlice->currentRowIdx;
13,303,539✔
1058

1059
  int32_t numOfRows = 0;
13,306,230✔
1060
  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
13,289,891!
1061
    uint64_t nColData = 0;
×
1062
    if (tDecodeU64v(pCoder, &nColData) < 0) {
×
1063
      code = TSDB_CODE_INVALID_MSG;
×
1064
      TSDB_CHECK_CODE(code, lino, end);
×
1065
    }
1066

1067
    SColData colData = {0};
×
1068
    code = tDecodeColData(version, pCoder, &colData, false);
×
1069
    if (code) {
×
1070
      code = TSDB_CODE_INVALID_MSG;
×
1071
      TSDB_CHECK_CODE(code, lino, end);
×
1072
    }
1073

1074
    if (colData.flag != HAS_VALUE) {
×
1075
      code = TSDB_CODE_INVALID_MSG;
×
1076
      TSDB_CHECK_CODE(code, lino, end);
×
1077
    }
1078
    
1079
    walMeta.skey = ((TSKEY *)colData.pData)[0];
×
1080
    walMeta.ekey = ((TSKEY *)colData.pData)[colData.nVal - 1];
×
1081

1082
    int32_t rowStart = 0;
×
1083
    int32_t rowEnd = 0;
×
1084
    STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, &numOfRows));
×
1085
    STREAM_CHECK_CONDITION_GOTO(numOfRows <= 0, TDB_CODE_SUCCESS);
×
1086

1087
    int32_t pos = pCoder->pos;
×
1088
    for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
×
1089
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
×
1090
      STREAM_CHECK_NULL_GOTO(pColData, terrno);
×
1091
      if (pColData->info.colId <= -1) {
×
1092
        pColData->hasNull = true;
×
1093
        continue;
×
1094
      }
1095
      if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
×
1096
        STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colData, pColData));
×
1097
        continue;
×
1098
      }
1099

1100
      pCoder->pos = pos;
×
1101

1102
      int16_t colId = 0;
×
1103
      if (sStreamReaderInfo->isVtableStream){
×
1104
        int64_t id[2] = {submitTbData.suid, submitTbData.uid};
×
1105
        void *px = tSimpleHashGet(rsp->isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, id, sizeof(id));
×
1106
        STREAM_CHECK_NULL_GOTO(px, TSDB_CODE_INVALID_PARA);
×
1107
        SSHashObj* uInfo = *(SSHashObj **)px;
×
1108
        STREAM_CHECK_NULL_GOTO(uInfo, TSDB_CODE_INVALID_PARA);
×
1109
        int16_t*  tmp = tSimpleHashGet(uInfo, &i, sizeof(i));
×
1110
        if (tmp != NULL) {
×
1111
          colId = *tmp;
×
1112
        } else {
1113
          colId = -1;
×
1114
        }
1115
      } else {
1116
        colId = pColData->info.colId;
×
1117
      }
1118
      
1119
      uint64_t j = 1;
×
1120
      for (; j < nColData; j++) {
×
1121
        int16_t cid = 0;
×
1122
        int32_t posTmp = pCoder->pos;
×
1123
        pCoder->pos += INT_BYTES;
×
1124
        if ((code = tDecodeI16v(pCoder, &cid))) return code;
×
1125
        pCoder->pos = posTmp;
×
1126
        if (cid == colId) {
×
1127
          SColData colDataTmp = {0};
×
1128
          code = tDecodeColData(version, pCoder, &colDataTmp, false);
×
1129
          if (code) {
×
1130
            code = TSDB_CODE_INVALID_MSG;
×
1131
            TSDB_CHECK_CODE(code, lino, end);
×
1132
          }
1133
          STREAM_CHECK_RET_GOTO(setColData(blockStart, rowStart, rowEnd, &colDataTmp, pColData));
×
1134
          break;
×
1135
        }
1136
        code = tDecodeColData(version, pCoder, &colData, true);
×
1137
        if (code) {
×
1138
          code = TSDB_CODE_INVALID_MSG;
×
1139
          TSDB_CHECK_CODE(code, lino, end);
×
1140
        }
1141
      }
1142
      if (j == nColData) {
×
1143
        colDataSetNNULL(pColData, blockStart, numOfRows);
×
1144
      }
1145
    }
1146
  } else {
1147
    uint64_t nRow = 0;
13,289,891✔
1148
    if (tDecodeU64v(pCoder, &nRow) < 0) {
13,293,993!
1149
      code = TSDB_CODE_INVALID_MSG;
×
1150
      TSDB_CHECK_CODE(code, lino, end);
×
1151
    }
1152
    for (int32_t iRow = 0; iRow < nRow; ++iRow) {
33,374,077✔
1153
      SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
20,054,541✔
1154
      pCoder->pos += pRow->len;
20,078,557✔
1155

1156
      if (iRow == 0){
20,082,716✔
1157
#ifndef NO_UNALIGNED_ACCESS
1158
        walMeta.skey = pRow->ts;
13,304,783✔
1159
#else
1160
        walMeta.skey = taosGetInt64Aligned(&pRow->ts);
1161
#endif
1162
      }
1163
      if (iRow == nRow - 1) {
20,067,926✔
1164
#ifndef NO_UNALIGNED_ACCESS
1165
        walMeta.ekey = pRow->ts;
13,296,724✔
1166
#else
1167
        walMeta.ekey = taosGetInt64Aligned(&pRow->ts);
1168
#endif
1169
      }
1170

1171
      if (pRow->ts < window.skey || pRow->ts > window.ekey) {
20,069,323!
1172
        continue;
45,512✔
1173
      }
1174
     
1175
      for (int16_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {  // reader todo test null
142,307,860✔
1176
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
122,285,286✔
1177
        STREAM_CHECK_NULL_GOTO(pColData, terrno);
122,278,727!
1178
        if (pColData->info.colId <= -1) {
122,278,727✔
1179
          pColData->hasNull = true;
38,040,537✔
1180
          continue;
38,041,880✔
1181
        }
1182
        int16_t colId = 0;
84,260,945✔
1183
        if (sStreamReaderInfo->isVtableStream){
84,260,945!
1184
          int64_t id[2] = {submitTbData.suid, submitTbData.uid};
11,382,929✔
1185
          void* px = tSimpleHashGet(rsp->isCalc ? sStreamReaderInfo->uidHashCalc : sStreamReaderInfo->uidHashTrigger, id, sizeof(id));
11,382,929!
1186
          STREAM_CHECK_NULL_GOTO(px, TSDB_CODE_INVALID_PARA);
11,384,256!
1187
          SSHashObj* uInfo = *(SSHashObj**)px;
11,384,256✔
1188
          STREAM_CHECK_NULL_GOTO(uInfo, TSDB_CODE_INVALID_PARA);
11,382,908!
1189
          int16_t*  tmp = tSimpleHashGet(uInfo, &i, sizeof(i));
11,382,908✔
1190
          if (tmp != NULL) {
11,381,581✔
1191
            colId = *tmp;
9,735,070✔
1192
          } else {
1193
            colId = -1;
1,646,511✔
1194
          }
1195
          ST_TASK_TLOG("%s vtable colId:%d, i:%d, uid:%" PRId64, __func__, colId, i, submitTbData.uid);
11,386,952!
1196
        } else {
1197
          colId = pColData->info.colId;
72,886,118✔
1198
        }
1199
        
1200
        SColVal colVal = {0};
84,247,744✔
1201
        int32_t sourceIdx = 0;
84,247,739✔
1202
        while (1) {
1203
          if (sourceIdx >= (*schemas)->numOfCols) {
224,901,631✔
1204
            break;
41,166,981✔
1205
          }
1206
          STREAM_CHECK_RET_GOTO(tRowGet(pRow, *schemas, sourceIdx, &colVal));
183,689,271!
1207
          if (colVal.cid == colId) {
183,765,531✔
1208
            break;
43,111,639✔
1209
          }
1210
          sourceIdx++;
140,653,892✔
1211
        }
1212
        if (colVal.cid == colId && COL_VAL_IS_VALUE(&colVal)) {
84,278,620✔
1213
          if (IS_VAR_DATA_TYPE(colVal.value.type) || colVal.value.type == TSDB_DATA_TYPE_DECIMAL){
42,529,949!
1214
            STREAM_CHECK_RET_GOTO(varColSetVarData(pColData, blockStart+ numOfRows, (const char*)colVal.value.pData, colVal.value.nData, !COL_VAL_IS_VALUE(&colVal)));
114,477!
1215
          } else {
1216
            STREAM_CHECK_RET_GOTO(colDataSetVal(pColData, blockStart + numOfRows, (const char*)(&(colVal.value.val)), !COL_VAL_IS_VALUE(&colVal)));
42,415,472!
1217
          }
1218
        } else {
1219
          colDataSetNULL(pColData, blockStart + numOfRows);
41,748,671!
1220
        }
1221
      }
1222
      
1223
      numOfRows++;
20,034,572✔
1224
    }
1225
  }
1226

1227
  if (numOfRows > 0) {
13,303,493!
1228
    if (!sStreamReaderInfo->isVtableStream) {
13,303,493✔
1229
      SStorageAPI  api = {0};
8,889,462✔
1230
      initStorageAPI(&api);
8,886,667✔
1231
      STREAM_CHECK_RET_GOTO(processTag(pVnode, sStreamReaderInfo, rsp->isCalc, &api, submitTbData.uid, pBlock, blockStart, numOfRows, 1));
8,873,341!
1232
    }
1233
    
1234
    SColumnInfoData* pColData = taosArrayGetLast(pBlock->pDataBlock);
13,292,830✔
1235
    STREAM_CHECK_NULL_GOTO(pColData, terrno);
13,290,180!
1236
    STREAM_CHECK_RET_GOTO(colDataSetNItems(pColData, blockStart, (const char*)&ver, numOfRows, 1, false));
13,290,180!
1237
  }
1238

1239
  ST_TASK_DLOG("%s process submit data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
13,298,107✔
1240
    ", uid:%" PRId64 ", ver:%d, row index:%d, rows:%d", __func__, window.skey, window.ekey, 
1241
    id, submitTbData.uid, submitTbData.sver, pSlice->currentRowIdx, numOfRows);
1242
  pSlice->currentRowIdx += numOfRows;
13,300,939✔
1243
  pBlock->info.rows += numOfRows;
13,299,487✔
1244
  
1245
  if (gidHash == NULL) goto end;
13,302,187✔
1246

1247
  WalMetaResult* data = (WalMetaResult*)tSimpleHashGet(gidHash, &walMeta.id, LONG_BYTES);
4,611,961✔
1248
  if (data != NULL) {
4,609,267!
1249
    if (walMeta.skey < data->skey) data->skey = walMeta.skey;
×
1250
    if (walMeta.ekey > data->ekey) data->ekey = walMeta.ekey;
×
1251
  } else {
1252
    STREAM_CHECK_RET_GOTO(tSimpleHashPut(gidHash, &walMeta.id, LONG_BYTES, &walMeta, sizeof(WalMetaResult)));
4,609,267!
1253
  }
1254

1255
end:
16,834,378✔
1256
  if (code != 0) {                                                             \
16,831,177!
1257
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code)); \
×
1258
  }
1259
  tEndDecode(pCoder);
16,831,177✔
1260
  return code;
16,835,233✔
1261
}
1262
/*
 * Decode a submit request and copy the rows of every contained table into rsp.
 *
 * Each per-table entry is handed to scanSubmitTbData(); the table schema
 * resolved by the first entry that needs one is reused for the rest of the
 * message and freed at the end. When rsp->metaBlock is set, the per-id time
 * ranges collected while scanning are appended to it as one meta row per id.
 *
 * @param data    encoded submit request
 * @param len     length of data in bytes
 * @param ranges  optional per-id time ranges, forwarded to scanSubmitTbData()
 * @param ver     WAL version of the message
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t scanSubmitData(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo,
  void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp, int64_t ver) {
  int32_t    code = 0;
  int32_t    lino = 0;
  STSchema*  schemas = NULL;
  SDecoder   decoder = {0};
  SSHashObj* gidHash = NULL;
  void*      pTask = sStreamReaderInfo->pTask;
  uint64_t   nSubmitTbData = 0;

  tDecoderInit(&decoder, data, len);
  if (tStartDecode(&decoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (tDecodeU64v(&decoder, &nSubmitTbData) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  // per-id ranges are only collected when there is a meta block to report them in
  if (rsp->metaBlock != NULL) {
    gidHash = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    STREAM_CHECK_NULL_GOTO(gidHash, terrno);
  }

  for (int32_t tbIdx = 0; tbIdx < nSubmitTbData; tbIdx++) {
    STREAM_CHECK_RET_GOTO(scanSubmitTbData(pVnode, &decoder, sStreamReaderInfo, &schemas, ranges, gidHash, rsp, ver));
  }

  tEndDecode(&decoder);

  if (rsp->metaBlock == NULL) {
    goto end;
  }

  SSDataBlock* pMetaBlock = (SSDataBlock*)rsp->metaBlock;
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->metaBlock, pMetaBlock->info.rows + tSimpleHashGetSize(gidHash)));

  int32_t iter = 0;
  for (void* px = tSimpleHashIterate(gidHash, NULL, &iter); px != NULL; px = tSimpleHashIterate(gidHash, px, &iter)) {
    WalMetaResult* pMeta = (WalMetaResult*)px;
    STREAM_CHECK_RET_GOTO(buildWalMetaBlockNew(rsp->metaBlock, pMeta->id, pMeta->skey, pMeta->ekey, ver));
    pMetaBlock->info.rows++;
    ST_TASK_DLOG("%s process meta data:skey %" PRId64 ", ekey %" PRId64 ", id %" PRIu64
          ", ver:%"PRId64, __func__, pMeta->skey, pMeta->ekey, pMeta->id, ver);
  }

end:
  taosMemoryFree(schemas);
  tSimpleHashCleanup(gidHash);
  tDecoderClear(&decoder);
  return code;
}
1315

1316
/*
 * Pre-scan one encoded SSubmitTbData entry and report how many of its rows are of interest,
 * without materialising any row data.
 *
 * Steps: open the entry's decode frame, decode flags (low byte = flags, next byte = encoding
 * version), the optional auto-create-table request, suid, uid and sver, then count rows:
 *   - column format: decode the first column and either take colData.nVal or let getRowRange()
 *     restrict the count to the time window;
 *   - row format: either take nRow, or walk the SRow records in place and count those whose ts
 *     lies inside the window.
 *
 * pCoder     decoder positioned at the start of the entry
 * ranges     optional map keyed by gid whose value holds {skey, ekey}; when non-NULL an entry whose
 *            gid has no range is skipped (0 rows, success)
 * gid        passed to uidInTableList() (presumably filled there) and used as the key into `ranges`
 * uid        out: table uid decoded from the entry
 * numOfRows  out: row count; the windowed row-format path increments it, so the caller must pass 0
 * isCalc     forwarded to uidInTableList()
 *
 * Returns 0 on success (including "entry skipped"), otherwise an error code.
 */
static int32_t scanSubmitTbDataPre(SDecoder *pCoder, SStreamTriggerReaderInfo* sStreamReaderInfo, SSHashObj* ranges, 
  uint64_t* gid, int64_t* uid, int32_t* numOfRows, bool isCalc) {
  int32_t code = 0;
  int32_t lino = 0;
  void* pTask = sStreamReaderInfo->pTask;

  // Both objects are read by the cleanup code at `end:`. They must be initialised before the first
  // `goto end`, otherwise a jump over their initialisers leaves pCreateTbReq indeterminate.
  SSubmitTbData submitTbData = {0};
  uint8_t       version = 0;
  bool          decodeStarted = false;  // true once this entry's decode frame has been opened

  if (tStartDecode(pCoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  decodeStarted = true;

  if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  // The encoded flags word carries the encoding version in bits 8..15.
  version = (submitTbData.flags >> 8) & 0xff;
  submitTbData.flags = submitTbData.flags & 0xff;

  // STREAM_CHECK_CONDITION_GOTO(version < 2, TDB_CODE_SUCCESS);
  if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
    submitTbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
    STREAM_CHECK_NULL_GOTO(submitTbData.pCreateTbReq, terrno);
    STREAM_CHECK_RET_GOTO(tDecodeSVCreateTbReq(pCoder, submitTbData.pCreateTbReq));
    STREAM_CHECK_RET_GOTO(processAutoCreateTableNew(sStreamReaderInfo, submitTbData.pCreateTbReq));
  }

  // submit data
  if (tDecodeI64(pCoder, &submitTbData.suid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }
  if (tDecodeI64(pCoder, uid) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  // Tables that are not part of this reader's table list contribute no rows.
  STREAM_CHECK_CONDITION_GOTO(!uidInTableList(sStreamReaderInfo, submitTbData.suid, *uid, gid, isCalc), TDB_CODE_SUCCESS);

  STimeWindow window = {.skey = INT64_MIN, .ekey = INT64_MAX};

  if (ranges != NULL){
    void* timerange = tSimpleHashGet(ranges, gid, sizeof(*gid));
    if (timerange == NULL) goto end;
    int64_t* pRange = (int64_t*)timerange;
    window.skey = pRange[0];
    window.ekey = pRange[1];
  }
  
  if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    uint64_t nColData = 0;
    if (tDecodeU64v(pCoder, &nColData) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    // Only the first column is decoded; it is the one handed to getRowRange() for the window check.
    SColData colData = {0};
    code = tDecodeColData(version, pCoder, &colData, false);
    if (code) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    if (colData.flag != HAS_VALUE) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }
    int32_t rowStart = 0;
    int32_t rowEnd = 0;
    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) {
      STREAM_CHECK_RET_GOTO(getRowRange(&colData, &window, &rowStart, &rowEnd, numOfRows));
    } else {
      (*numOfRows) = colData.nVal;
    } 
  } else {
    uint64_t nRow = 0;
    if (tDecodeU64v(pCoder, &nRow) < 0) {
      code = TSDB_CODE_INVALID_MSG;
      TSDB_CHECK_CODE(code, lino, end);
    }

    if (window.skey != INT64_MIN || window.ekey != INT64_MAX) { 
      // Walk the rows in place; the index has the same type as nRow to avoid a mixed-sign compare.
      for (uint64_t iRow = 0; iRow < nRow; ++iRow) {
        SRow *pRow = (SRow *)(pCoder->data + pCoder->pos);
        pCoder->pos += pRow->len;
        if (pRow->ts < window.skey || pRow->ts > window.ekey) {
          continue;
        }
        (*numOfRows)++;
      }
    } else {
      (*numOfRows) = nRow;
    }
  }
  
end:
  tDestroySVSubmitCreateTbReq(submitTbData.pCreateTbReq, TSDB_MSG_FLG_DECODE);
  taosMemoryFreeClear(submitTbData.pCreateTbReq);
  // Close only a frame this call actually opened; a failed tStartDecode() has nothing to close.
  if (decodeStarted) {
    tEndDecode(pCoder);
  }
  return code;
}
1423

1424
/*
 * Pre-scan one submit payload: for every table entry in it, count the relevant rows via
 * scanSubmitTbDataPre(), add them to rsp->totalRows and accumulate them per table uid in
 * sStreamReaderInfo->indexHash (one SStreamWalDataSlice per uid).
 *
 * data/len  encoded submit request body
 * ranges    optional per-gid time ranges, forwarded to scanSubmitTbDataPre()
 * rsp       response being built; totalRows is updated and isCalc is read
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t scanSubmitDataPre(SStreamTriggerReaderInfo* sStreamReaderInfo, void* data, int32_t len, SSHashObj* ranges, SSTriggerWalNewRsp* rsp) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void*    pTask = sStreamReaderInfo->pTask;
  SDecoder decoder = {0};
  uint64_t tbCount = 0;

  tDecoderInit(&decoder, data, len);
  if (tStartDecode(&decoder) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  if (tDecodeU64v(&decoder, &tbCount) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    TSDB_CHECK_CODE(code, lino, end);
  }

  for (int32_t tbIdx = 0; tbIdx < tbCount; tbIdx++) {
    uint64_t groupId = -1;
    int64_t  tbUid = 0;
    int32_t  rows = 0;
    STREAM_CHECK_RET_GOTO(scanSubmitTbDataPre(&decoder, sStreamReaderInfo, ranges, &groupId, &tbUid, &rows, rsp->isCalc));
    if (rows <= 0) {
      continue;
    }
    rsp->totalRows += rows;

    SStreamWalDataSlice* pKnown = (SStreamWalDataSlice*)tSimpleHashGet(sStreamReaderInfo->indexHash, &tbUid, LONG_BYTES);
    if (pKnown == NULL) {
      // First time this uid shows up: register a new slice for it.
      SStreamWalDataSlice fresh = {.gId=groupId,.numRows=rows,.currentRowIdx=0,.startRowIdx=0};
      ST_TASK_DLOG("%s first uid:%" PRId64 ", gid:%" PRIu64 ", numOfRows:%d", __func__, tbUid, groupId, fresh.numRows);
      STREAM_CHECK_RET_GOTO(tSimpleHashPut(sStreamReaderInfo->indexHash, &tbUid, LONG_BYTES, &fresh, sizeof(fresh)));
      continue;
    }

    // uid already has a slice: grow it and refresh its group id.
    pKnown->numRows += rows;
    ST_TASK_DLOG("%s again uid:%" PRId64 ", gid:%" PRIu64 ", total numOfRows:%d", __func__, tbUid, groupId, pKnown->numRows);
    pKnown->gId = groupId;
  }

  tEndDecode(&decoder);

end:
  tDecoderClear(&decoder);
  return code;
}
1470

1471
// Reset every SStreamWalDataSlice stored in indexHash: row indexes and row count to 0, gId to -1.
// Entries stay in the hash; only their values are cleared.
static void resetIndexHash(SSHashObj* indexHash){
  int32_t cursor = 0;
  for (void* entry = tSimpleHashIterate(indexHash, NULL, &cursor); entry != NULL;
       entry = tSimpleHashIterate(indexHash, entry, &cursor)) {
    SStreamWalDataSlice* slice = (SStreamWalDataSlice*)entry;
    slice->startRowIdx = 0;
    slice->currentRowIdx = 0;
    slice->numRows = 0;
    slice->gId = -1;
  }
}
15,490,398✔
1482

1483
// Lay the slices of indexHash out back to back, in hash iteration order: each slice's
// startRowIdx/currentRowIdx is set to the running sum of numRows of the slices before it.
// pTask is only used by the debug log.
static void buildIndexHash(SSHashObj* indexHash, void* pTask){
  int32_t cursor = 0;
  int32_t nextStart = 0;
  for (void* entry = tSimpleHashIterate(indexHash, NULL, &cursor); entry != NULL;
       entry = tSimpleHashIterate(indexHash, entry, &cursor)) {
    SStreamWalDataSlice* slice = (SStreamWalDataSlice*)entry;
    slice->startRowIdx = nextStart;
    slice->currentRowIdx = nextStart;
    nextStart += slice->numRows;
    ST_TASK_DLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(entry, NULL)),
    slice->gId, slice->startRowIdx, slice->numRows);
  }
}
1,385,511✔
1496

1497
// Debug helper: log uid, gid, startRowIdx and numRows of every slice in indexHash.
// Does not modify the hash.
static void printIndexHash(SSHashObj* indexHash, void* pTask){
  int32_t cursor = 0;
  for (void* entry = tSimpleHashIterate(indexHash, NULL, &cursor); entry != NULL;
       entry = tSimpleHashIterate(indexHash, entry, &cursor)) {
    SStreamWalDataSlice* slice = (SStreamWalDataSlice*)entry;
    ST_TASK_DLOG("%s uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", __func__, *(int64_t*)(tSimpleHashGetKey(entry, NULL)),
    slice->gId, slice->startRowIdx, slice->numRows);
  }
}
1,378,823✔
1506

1507
// Re-derive each slice's startRowIdx/numRows after rows were filtered out of the data block.
// pRet->pData is read as one int8_t flag per pre-filter row, consumed in hash iteration order;
// a zero flag means the row was dropped. If pData is NULL the row counts are left as they are and
// only startRowIdx is recomputed.
static void filterIndexHash(SSHashObj* indexHash, SColumnInfoData* pRet){
  int8_t* keepFlags = (int8_t*)pRet->pData;
  int32_t cursor = 0;
  int32_t nextStart = 0;  // start row of the slice being processed, in post-filter numbering
  int32_t flagPos = 0;    // position in keepFlags, in pre-filter numbering

  for (void* entry = tSimpleHashIterate(indexHash, NULL, &cursor); entry != NULL;
       entry = tSimpleHashIterate(indexHash, entry, &cursor)) {
    SStreamWalDataSlice* slice = (SStreamWalDataSlice*)entry;
    slice->startRowIdx = nextStart;

    int32_t remaining = slice->numRows;
    if (keepFlags != NULL) {
      for (int32_t row = 0; row < slice->numRows; row++) {
        if (!keepFlags[flagPos++]) {
          remaining--;
        }
      }
    }
    slice->numRows = remaining;
    nextStart += slice->numRows;
    stTrace("stream reader re build index hash uid:%" PRId64 ", gid:%" PRIu64 ", startRowIdx:%d, numRows:%d", *(int64_t*)(tSimpleHashGetKey(entry, NULL)),
    slice->gId, slice->startRowIdx, slice->numRows);
  }
}
40,198✔
1528

1529
/*
 * First pass over the WAL starting at resultRsp->ver: count rows per table (submit messages) and
 * process meta messages, until the WAL is exhausted, STREAM_RETURN_ROWS_NUM rows were counted, or
 * an ALTER TABLE message is met after some rows were already counted.
 *
 * resultRsp->ver / verTime are advanced as messages are consumed. "WAL log not exist" from the
 * seek or from the iteration is treated as a normal end of scan and reported as success.
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t prepareIndexMetaData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, SSTriggerWalNewRsp* resultRsp){
  int32_t code = 0;
  int32_t lino = 0;
  void*   pTask = sStreamReaderInfo->pTask;

  code = walReaderSeekVer(pWalReader, resultRsp->ver);
  if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
    // Requested version is not available; if it is older than the first WAL version, move up to it.
    if (resultRsp->ver < walGetFirstVer(pWalReader->pWal)) {
      resultRsp->ver = walGetFirstVer(pWalReader->pWal);
    }
    ST_TASK_DLOG("%s scan wal error:%s",  __func__, tstrerror(code));
    code = TSDB_CODE_SUCCESS;
    goto end;
  }
  STREAM_CHECK_RET_GOTO(code);

  for (;;) {
    code = walNextValidMsg(pWalReader, true);
    if (code == TSDB_CODE_WAL_LOG_NOT_EXIST){
      ST_TASK_DLOG("%s scan wal error:%s", __func__, tstrerror(code));
      code = TSDB_CODE_SUCCESS;
      goto end;
    }
    STREAM_CHECK_RET_GOTO(code);

    resultRsp->ver = pWalReader->curVersion;
    SWalCont* pCont = &pWalReader->pHead->head;
    resultRsp->verTime = pCont->ingestTs;
    int64_t walVer = pCont->version;
    ST_TASK_DLOG("%s scan wal ver:%" PRId64 ", type:%d, deleteData:%d, deleteTb:%d", __func__,
      walVer, pCont->msgType, sStreamReaderInfo->deleteReCalc, sStreamReaderInfo->deleteOutTbl);

    if (pCont->msgType == TDMT_VND_SUBMIT) {
      // Submit bodies are prefixed by an SSubmitReq2Msg header.
      void*   submitBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      int32_t submitLen = pCont->bodyLen - sizeof(SSubmitReq2Msg);
      STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, submitBody, submitLen, NULL, resultRsp));
    } else if (pCont->msgType == TDMT_VND_ALTER_TABLE && resultRsp->totalRows > 0) {
      // Stop before the ALTER TABLE: step the version back so it is not counted as consumed.
      resultRsp->ver--;
      break;
    } else {
      // All other messages are prefixed by an SMsgHead.
      void*   metaBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t metaLen = pCont->bodyLen - sizeof(SMsgHead);
      STREAM_CHECK_RET_GOTO(processMeta(pCont->msgType, sStreamReaderInfo, metaBody, metaLen, resultRsp, walVer));
    }

    ST_TASK_DLOG("%s scan wal next ver:%" PRId64 ", totalRows:%d", __func__, resultRsp->ver, resultRsp->totalRows);
    if (resultRsp->totalRows >= STREAM_RETURN_ROWS_NUM) {
      break;
    }
  }
  
end:
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
1582

1583
static int32_t prepareIndexData(SWalReader* pWalReader, SStreamTriggerReaderInfo* sStreamReaderInfo, 
5,677,660✔
1584
  SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp){
1585
  int32_t      code = 0;
5,677,660✔
1586
  int32_t      lino = 0;
5,677,660✔
1587

1588
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
14,367,894✔
1589
    int64_t *ver = taosArrayGet(versions, i);
8,688,890✔
1590
    if (ver == NULL) continue;
8,690,234!
1591

1592
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
8,690,234!
1593
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
8,692,916!
1594
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
1595
      continue;
×
1596
    }
1597
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
8,692,916!
1598

1599
    SWalCont* wCont = &pWalReader->pHead->head;
8,690,223✔
1600
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
8,690,223✔
1601
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
8,690,223✔
1602

1603
    STREAM_CHECK_RET_GOTO(scanSubmitDataPre(sStreamReaderInfo, pBody, bodyLen, ranges, rsp));
8,690,223!
1604
  }
1605
  
1606
end:
5,674,978✔
1607
  return code;
5,674,978✔
1608
}
1609

1610
// Apply the reader's filter to resultRsp->dataBlock. If the filter removed rows, shrink the
// per-table slices in indexHash accordingly using the filter's indicator column.
// Returns 0 on success, otherwise the error from qStreamFilter().
static int32_t filterData(SSTriggerWalNewRsp* resultRsp, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SColumnInfoData* pIndicator = NULL;
  SSDataBlock*     pData = (SSDataBlock*)resultRsp->dataBlock;
  int64_t          rowsBefore = pData->info.rows;

  STREAM_CHECK_RET_GOTO(qStreamFilter(pData, sStreamReaderInfo->pFilterInfo, &pIndicator));
  if (pData->info.rows < rowsBefore) {
    filterIndexHash(sStreamReaderInfo->indexHash, pIndicator);
  }

end:
  colDataDestroy(pIndicator);
  taosMemoryFree(pIndicator);
  return code;
}
1625

1626
static int32_t processWalVerMetaDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
9,812,640✔
1627
                                    SSTriggerWalNewRsp* resultRsp) {
1628
  int32_t      code = 0;
9,812,640✔
1629
  int32_t      lino = 0;
9,812,640✔
1630
  void* pTask = sStreamReaderInfo->pTask;
9,812,640✔
1631
                                        
1632
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
9,812,640✔
1633
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
9,799,286!
1634
  resetIndexHash(sStreamReaderInfo->indexHash);
9,799,286✔
1635
  blockDataEmpty(resultRsp->dataBlock);
9,815,423✔
1636
  blockDataEmpty(resultRsp->metaBlock);
9,797,932✔
1637
  int64_t lastVer = resultRsp->ver;                                      
9,809,048✔
1638
  STREAM_CHECK_RET_GOTO(prepareIndexMetaData(pWalReader, sStreamReaderInfo, resultRsp));
9,809,048!
1639
  STREAM_CHECK_CONDITION_GOTO(resultRsp->totalRows == 0, TDB_CODE_SUCCESS);
9,806,022✔
1640

1641
  buildIndexHash(sStreamReaderInfo->indexHash, pTask);
390,458✔
1642
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(((SSDataBlock*)resultRsp->dataBlock), resultRsp->totalRows));
390,458!
1643
  while(lastVer < resultRsp->ver) {
8,881,702✔
1644
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, lastVer++));
8,495,279!
1645
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
8,492,586✔
1646
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
348,932!
1647
      continue;
348,932✔
1648
    }
1649
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
8,146,348!
1650
    SWalCont* wCont = &pWalReader->pHead->head;
8,143,658✔
1651
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
8,147,696✔
1652
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
8,147,696✔
1653

1654
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, NULL, resultRsp, wCont->version));
8,143,657!
1655
  }
1656

1657
  int32_t metaRows = resultRsp->totalRows - ((SSDataBlock*)resultRsp->dataBlock)->info.rows;
390,458✔
1658
  STREAM_CHECK_RET_GOTO(filterData(resultRsp, sStreamReaderInfo));
390,458!
1659
  resultRsp->totalRows = ((SSDataBlock*)resultRsp->dataBlock)->info.rows + metaRows;
390,458✔
1660

1661
end:
9,804,674✔
1662
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
9,804,674✔
1663
          resultRsp->totalRows, resultRsp->ver, walGetAppliedVer(pWalReader->pWal));
1664
  walCloseReader(pWalReader);
9,804,674✔
1665
  return code;
9,799,343✔
1666
}
1667

1668
static int32_t processWalVerDataNew(SVnode* pVnode, SStreamTriggerReaderInfo* sStreamReaderInfo, 
5,676,319✔
1669
                                    SArray* versions, SSHashObj* ranges, SSTriggerWalNewRsp* rsp) {
1670
  int32_t      code = 0;
5,676,319✔
1671
  int32_t      lino = 0;
5,676,319✔
1672

1673
  void* pTask = sStreamReaderInfo->pTask;
5,676,319✔
1674
  SWalReader* pWalReader = walOpenReader(pVnode->pWal, 0);
5,676,319✔
1675
  STREAM_CHECK_NULL_GOTO(pWalReader, terrno);
5,672,293!
1676
  
1677
  if (taosArrayGetSize(versions) > 0) {
5,672,293✔
1678
    rsp->ver = *(int64_t*)taosArrayGetLast(versions);
995,053✔
1679
  }
1680
  
1681
  resetIndexHash(sStreamReaderInfo->indexHash);
5,677,660✔
1682
  STREAM_CHECK_RET_GOTO(prepareIndexData(pWalReader, sStreamReaderInfo, versions, ranges, rsp));
5,677,660!
1683
  STREAM_CHECK_CONDITION_GOTO(rsp->totalRows == 0, TDB_CODE_SUCCESS);
5,674,978✔
1684

1685
  buildIndexHash(sStreamReaderInfo->indexHash, pTask);
995,053✔
1686

1687
  blockDataEmpty(rsp->dataBlock);
995,053✔
1688
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(rsp->dataBlock, rsp->totalRows));
995,053!
1689

1690
  for(int32_t i = 0; i < taosArrayGetSize(versions); i++) {
9,678,541✔
1691
    int64_t *ver = taosArrayGet(versions, i);
8,691,575✔
1692
    if (ver == NULL) continue;
8,688,879!
1693

1694
    STREAM_CHECK_RET_GOTO(walFetchHead(pWalReader, *ver));
8,688,879!
1695
    if(pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
8,682,147!
1696
      TAOS_CHECK_RETURN(walSkipFetchBody(pWalReader));
×
1697
      continue;
×
1698
    }
1699
    STREAM_CHECK_RET_GOTO(walFetchBody(pWalReader));
8,683,502!
1700
    SWalCont* wCont = &pWalReader->pHead->head;
8,679,472✔
1701
    void*   pBody = POINTER_SHIFT(wCont->body, sizeof(SSubmitReq2Msg));
8,687,533✔
1702
    int32_t bodyLen = wCont->bodyLen - sizeof(SSubmitReq2Msg);
8,688,882✔
1703

1704
    STREAM_CHECK_RET_GOTO(scanSubmitData(pVnode, sStreamReaderInfo, pBody, bodyLen, ranges, rsp, wCont->version));
8,684,859!
1705
  }
1706
  // printDataBlock(rsp->dataBlock, __func__, "processWalVerDataNew");
1707
  STREAM_CHECK_RET_GOTO(filterData(rsp, sStreamReaderInfo));
992,357!
1708
  rsp->totalRows = ((SSDataBlock*)rsp->dataBlock)->info.rows;
993,708✔
1709

1710
end:
5,673,633✔
1711
  ST_TASK_DLOG("vgId:%d %s end, get result totalRows:%d, process:%"PRId64"/%"PRId64, TD_VID(pVnode), __func__, 
5,673,633✔
1712
            rsp->totalRows, rsp->ver, walGetAppliedVer(pWalReader->pWal));
1713
  walCloseReader(pWalReader);
5,673,633✔
1714
  return code;
5,677,656✔
1715
}
1716

1717
/*
 * Build an array of SSchema (one per column) for table `uid` from the vnode's meta store.
 *
 * For a child table the schema of its super table is used; for a normal table its own schema.
 * Any other table type is rejected with an error.
 *
 * schemas  out: newly allocated SArray of SSchema on success (caller owns it); NULL on failure
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t buildScheamFromMeta(SVnode* pVnode, int64_t uid, SArray** schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  SMetaReader metaReader = {0};
  SStorageAPI api = {0};
  initStorageAPI(&api);
  *schemas = taosArrayInit(8, sizeof(SSchema));
  STREAM_CHECK_NULL_GOTO(*schemas, terrno);
  
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, uid));

  SSchemaWrapper* sSchemaWrapper = NULL;
  if (metaReader.me.type == TD_CHILD_TABLE) {
    int64_t suid = metaReader.me.ctbEntry.suid;
    // Release the decoder of the child entry before the reader is reused for the super table.
    tDecoderClear(&metaReader.coder);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
    sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
  } else if (metaReader.me.type == TD_NORMAL_TABLE) {
    sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
  } else {
    // No schema can be derived for this table type; fail instead of dereferencing a NULL wrapper.
    qError("invalid table type:%d", metaReader.me.type);
    code = TSDB_CODE_INVALID_PARA;
    TSDB_CHECK_CODE(code, lino, end);
  }

  for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
    SSchema* s = sSchemaWrapper->pSchema + j;
    STREAM_CHECK_NULL_GOTO(taosArrayPush(*schemas, s), terrno);
  }

end:
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  if (code != 0)  {
    taosArrayDestroy(*schemas);
    *schemas = NULL;
  }
  return code;
}
1755

1756
/*
 * Reduce `schemas` in place to the columns listed in `cols`, in the order of `cols`.
 *
 * cols     array of col_id_t
 * schemas  array of SSchema; on success it holds, for each id in `cols` that matches a schema's
 *          colId, a copy of the first such schema. Ids with no match are silently dropped.
 *
 * The selected entries are appended behind the original ones and the original prefix is removed
 * at the end. Returns 0 on success, otherwise an error code (schemas may then be partly extended).
 */
static int32_t shrinkScheams(SArray* cols, SArray* schemas) {
  int32_t code = 0;
  int32_t lino = 0;
  size_t  origCount = taosArrayGetSize(schemas);

  // Reserve room for every possible append up front; the pushes below pass pointers into
  // `schemas` itself (presumably this keeps them valid by avoiding a reallocation - TODO confirm).
  STREAM_CHECK_RET_GOTO(taosArrayEnsureCap(schemas, origCount + taosArrayGetSize(cols)));

  for (size_t colIdx = 0; colIdx < taosArrayGetSize(cols); colIdx++) {
    col_id_t* wanted = taosArrayGet(cols, colIdx);
    STREAM_CHECK_NULL_GOTO(wanted, terrno);

    // Search only the original entries, not the ones appended so far.
    for (size_t schIdx = 0; schIdx < origCount; schIdx++) {
      SSchema* candidate = taosArrayGet(schemas, schIdx);
      STREAM_CHECK_NULL_GOTO(candidate, terrno);
      if (*wanted != candidate->colId) {
        continue;
      }
      STREAM_CHECK_NULL_GOTO(taosArrayPush(schemas, candidate), terrno);
      break;
    }
  }
  taosArrayPopFrontBatch(schemas, origCount);

end:
  return code;
}
1778

1779
/*
 * Create an empty data block for table `uid` whose columns are those listed in `cids`.
 *
 * The schema is read from meta, reduced to `cids`, and used to create the block; the block's
 * info.id.uid is set to `uid`. `ver` and `window` are currently unused because the WAL scan that
 * would fill the block is commented out.
 *
 * pBlock  out: the new block on success (caller owns it); untouched on failure
 *
 * Returns 0 on success, otherwise an error code.
 */
static int32_t processWalVerDataVTable(SVnode* pVnode, SArray *cids, int64_t ver,
  int64_t uid, STimeWindow* window, SSDataBlock** pBlock) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SArray*      colSchemas = NULL;
  SSDataBlock* pResult = NULL;

  STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, uid, &colSchemas));
  STREAM_CHECK_RET_GOTO(shrinkScheams(cids, colSchemas));
  STREAM_CHECK_RET_GOTO(createDataBlockForStream(colSchemas, &pResult));

  pResult->info.id.uid = uid;

  // STREAM_CHECK_RET_GOTO(scanWalOneVer(pVnode, pResult, ver, uid, window));
  //printDataBlock(pResult, __func__, "");

  // Hand the block over to the caller; clear the local so cleanup below does not destroy it.
  *pBlock = pResult;
  pResult = NULL;

end:
  STREAM_PRINT_LOG_END(code, lino);
  blockDataDestroy(pResult);
  taosArrayDestroy(colSchemas);
  return code;
}
1805

1806
static int32_t createTSAndCondition(int64_t start, int64_t end, SLogicConditionNode** pCond,
×
1807
                                    STargetNode* pTargetNodeTs) {
1808
  int32_t code = 0;
×
1809
  int32_t lino = 0;
×
1810

1811
  SColumnNode*         pCol = NULL;
×
1812
  SColumnNode*         pCol1 = NULL;
×
1813
  SValueNode*          pVal = NULL;
×
1814
  SValueNode*          pVal1 = NULL;
×
1815
  SOperatorNode*       op = NULL;
×
1816
  SOperatorNode*       op1 = NULL;
×
1817
  SLogicConditionNode* cond = NULL;
×
1818

1819
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol));
×
1820
  pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
×
1821
  pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
×
1822
  pCol->node.resType.bytes = LONG_BYTES;
×
1823
  pCol->slotId = pTargetNodeTs->slotId;
×
1824
  pCol->dataBlockId = pTargetNodeTs->dataBlockId;
×
1825

1826
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pCol, (SNode**)&pCol1));
×
1827

1828
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pVal));
×
1829
  pVal->node.resType.type = TSDB_DATA_TYPE_BIGINT;
×
1830
  pVal->node.resType.bytes = LONG_BYTES;
×
1831
  pVal->datum.i = start;
×
1832
  pVal->typeData = start;
×
1833

1834
  STREAM_CHECK_RET_GOTO(nodesCloneNode((SNode*)pVal, (SNode**)&pVal1));
×
1835
  pVal1->datum.i = end;
×
1836
  pVal1->typeData = end;
×
1837

1838
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op));
×
1839
  op->opType = OP_TYPE_GREATER_EQUAL;
×
1840
  op->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1841
  op->node.resType.bytes = CHAR_BYTES;
×
1842
  op->pLeft = (SNode*)pCol;
×
1843
  op->pRight = (SNode*)pVal;
×
1844
  pCol = NULL;
×
1845
  pVal = NULL;
×
1846

1847
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&op1));
×
1848
  op1->opType = OP_TYPE_LOWER_EQUAL;
×
1849
  op1->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1850
  op1->node.resType.bytes = CHAR_BYTES;
×
1851
  op1->pLeft = (SNode*)pCol1;
×
1852
  op1->pRight = (SNode*)pVal1;
×
1853
  pCol1 = NULL;
×
1854
  pVal1 = NULL;
×
1855

1856
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
×
1857
  cond->condType = LOGIC_COND_TYPE_AND;
×
1858
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
×
1859
  cond->node.resType.bytes = CHAR_BYTES;
×
1860
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
×
1861
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op));
×
1862
  op = NULL;
×
1863
  STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)op1));
×
1864
  op1 = NULL;
×
1865

1866
  *pCond = cond;
×
1867

1868
end:
×
1869
  if (code != 0) {
×
1870
    nodesDestroyNode((SNode*)pCol);
×
1871
    nodesDestroyNode((SNode*)pCol1);
×
1872
    nodesDestroyNode((SNode*)pVal);
×
1873
    nodesDestroyNode((SNode*)pVal1);
×
1874
    nodesDestroyNode((SNode*)op);
×
1875
    nodesDestroyNode((SNode*)op1);
×
1876
    nodesDestroyNode((SNode*)cond);
×
1877
  }
1878
  STREAM_PRINT_LOG_END(code, lino);
×
1879

1880
  return code;
×
1881
}
1882

1883
/*
1884
static int32_t createExternalConditions(SStreamRuntimeFuncInfo* data, SLogicConditionNode** pCond, STargetNode* pTargetNodeTs, STimeRangeNode* node) {
1885
  int32_t              code = 0;
1886
  int32_t              lino = 0;
1887
  SLogicConditionNode* pAndCondition = NULL;
1888
  SLogicConditionNode* cond = NULL;
1889

1890
  if (pTargetNodeTs == NULL) {
1891
    vError("stream reader %s no ts column", __func__);
1892
    return TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN;
1893
  }
1894
  STREAM_CHECK_RET_GOTO(nodesMakeNode(QUERY_NODE_LOGIC_CONDITION, (SNode**)&cond));
1895
  cond->condType = LOGIC_COND_TYPE_OR;
1896
  cond->node.resType.type = TSDB_DATA_TYPE_BOOL;
1897
  cond->node.resType.bytes = CHAR_BYTES;
1898
  STREAM_CHECK_RET_GOTO(nodesMakeList(&cond->pParameterList));
1899

1900
  for (int i = 0; i < taosArrayGetSize(data->pStreamPesudoFuncVals); ++i) {
1901
    data->curIdx = i;
1902

1903
    SReadHandle handle = {0};
1904
    calcTimeRange(node, data, &handle.winRange, &handle.winRangeValid);
1905
    if (!handle.winRangeValid) {
1906
      stError("stream reader %s invalid time range, skey:%" PRId64 ", ekey:%" PRId64, __func__, handle.winRange.skey,
1907
              handle.winRange.ekey);
1908
      continue;
1909
    }
1910
    STREAM_CHECK_RET_GOTO(createTSAndCondition(handle.winRange.skey, handle.winRange.ekey, &pAndCondition, pTargetNodeTs));
1911
    stDebug("%s create condition skey:%" PRId64 ", eksy:%" PRId64, __func__, handle.winRange.skey, handle.winRange.ekey);
1912
    STREAM_CHECK_RET_GOTO(nodesListAppend(cond->pParameterList, (SNode*)pAndCondition));
1913
    pAndCondition = NULL;
1914
  }
1915

1916
  *pCond = cond;
1917

1918
end:
1919
  if (code != 0) {
1920
    nodesDestroyNode((SNode*)pAndCondition);
1921
    nodesDestroyNode((SNode*)cond);
1922
  }
1923
  STREAM_PRINT_LOG_END(code, lino);
1924

1925
  return code;
1926
}
1927
*/
1928

1929
static int32_t processCalaTimeRange(SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo, SResFetchReq* req,
496,090✔
1930
                                    STimeRangeNode* node, SReadHandle* handle, bool isExtWin) {
1931
  int32_t code = 0;
496,090✔
1932
  int32_t lino = 0;
496,090✔
1933
  void* pTask = sStreamReaderCalcInfo->pTask;
496,090✔
1934
  STimeWindow* pWin = isExtWin ? &handle->extWinRange : &handle->winRange;
496,090!
1935
  bool* pValid = isExtWin ? &handle->extWinRangeValid : &handle->winRangeValid;
494,735!
1936
  
1937
  if (req->pStRtFuncInfo->withExternalWindow) {
496,090✔
1938
    sStreamReaderCalcInfo->tmpRtFuncInfo.curIdx = 0;
281,465✔
1939
    sStreamReaderCalcInfo->tmpRtFuncInfo.triggerType = req->pStRtFuncInfo->triggerType;
280,110✔
1940
    
1941
    SSTriggerCalcParam* pFirst = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, 0);
280,110✔
1942
    SSTriggerCalcParam* pLast = taosArrayGetLast(req->pStRtFuncInfo->pStreamPesudoFuncVals);
280,110✔
1943
    STREAM_CHECK_NULL_GOTO(pFirst, terrno);
280,110!
1944
    STREAM_CHECK_NULL_GOTO(pLast, terrno);
280,110!
1945

1946
    if (!node->needCalc) {
280,110✔
1947
      pWin->skey = pFirst->wstart;
210,603✔
1948
      pWin->ekey = pLast->wend;
211,958✔
1949
      *pValid = true;
210,603✔
1950
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
210,603✔
1951
        pWin->ekey--;
147,903✔
1952
      }
1953
    } else {
1954
      SSTriggerCalcParam* pTmp = taosArrayGet(sStreamReaderCalcInfo->tmpRtFuncInfo.pStreamPesudoFuncVals, 0);
70,862✔
1955
      memcpy(pTmp, pFirst, sizeof(*pTmp));
69,507!
1956

1957
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 1));
69,507!
1958
      if (*pValid) {
69,507!
1959
        int64_t skey = pWin->skey;
69,507✔
1960

1961
        memcpy(pTmp, pLast, sizeof(*pTmp));
69,507!
1962
        STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, &sStreamReaderCalcInfo->tmpRtFuncInfo, pWin, pValid, 2));
69,507!
1963

1964
        if (*pValid) {
69,507!
1965
          pWin->skey = skey;
69,507✔
1966
        }
1967
      }
1968
      pWin->ekey--;
69,507✔
1969
    }
1970
  } else {
1971
    if (!node->needCalc) {
214,625!
1972
      SSTriggerCalcParam* pCurr = taosArrayGet(req->pStRtFuncInfo->pStreamPesudoFuncVals, req->pStRtFuncInfo->curIdx);
214,625✔
1973
      pWin->skey = pCurr->wstart;
214,625✔
1974
      pWin->ekey = pCurr->wend;
214,625✔
1975
      *pValid = true;
214,625✔
1976
      if (req->pStRtFuncInfo->triggerType == STREAM_TRIGGER_SLIDING) {
214,625✔
1977
        pWin->ekey--;
197,981✔
1978
      }
1979
    } else {
1980
      STREAM_CHECK_RET_GOTO(streamCalcCurrWinTimeRange(node, req->pStRtFuncInfo, pWin, pValid, 3));
×
1981
      pWin->ekey--;
×
1982
    }
1983
  }
1984

1985
  ST_TASK_DLOG("%s type:%s, withExternalWindow:%d, skey:%" PRId64 ", ekey:%" PRId64 ", validRange:%d", 
496,090!
1986
      __func__, isExtWin ? "interp range" : "scan time range", req->pStRtFuncInfo->withExternalWindow, pWin->skey, pWin->ekey, *pValid);
1987

1988
end:
47,135✔
1989

1990
  if (code) {
496,090!
1991
    ST_TASK_ELOG("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1992
  }
1993
  
1994
  return code;
496,090✔
1995
}
1996

1997
static int32_t processTs(SVnode* pVnode, SStreamTsResponse* tsRsp, SStreamTriggerReaderInfo* sStreamReaderInfo,
1,108,032✔
1998
                                  SStreamReaderTaskInner* pTaskInner) {
1999
  int32_t code = 0;
1,108,032✔
2000
  int32_t lino = 0;
1,108,032✔
2001

2002
  void* pTask = sStreamReaderInfo->pTask;
1,108,032✔
2003
  tsRsp->tsInfo = taosArrayInit(qStreamGetTableListGroupNum(pTaskInner->pTableList), sizeof(STsInfo));
1,112,062✔
2004
  STREAM_CHECK_NULL_GOTO(tsRsp->tsInfo, terrno);
1,112,063!
2005
  while (true) {
805,439✔
2006
    bool hasNext = false;
1,917,513✔
2007
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
1,913,470!
2008
    if (hasNext) {
1,916,142!
2009
      pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
797,808✔
2010
      STsInfo* tsInfo = taosArrayReserve(tsRsp->tsInfo, 1);
797,812✔
2011
      STREAM_CHECK_NULL_GOTO(tsInfo, terrno)
796,463!
2012
      if (pTaskInner->options.order == TSDB_ORDER_ASC) {
796,463✔
2013
        tsInfo->ts = pTaskInner->pResBlock->info.window.skey;
516,959✔
2014
      } else {
2015
        tsInfo->ts = pTaskInner->pResBlock->info.window.ekey;
280,895✔
2016
      }
2017
      tsInfo->gId = (sStreamReaderInfo->groupByTbname || sStreamReaderInfo->tableType != TSDB_SUPER_TABLE) ? 
1,767,290!
2018
                    pTaskInner->pResBlock->info.id.uid : qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
970,716✔
2019
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64 ", ver:%" PRId64, TD_VID(pVnode), __func__, tsInfo->ts,
796,476✔
2020
              tsInfo->gId, tsRsp->ver);
2021
    }
2022
    
2023
    pTaskInner->currentGroupIndex++;
1,914,913✔
2024
    if (pTaskInner->currentGroupIndex >= qStreamGetTableListGroupNum(pTaskInner->pTableList) || pTaskInner->options.gid != 0) {
1,918,860✔
2025
      break;
2026
    }
2027
    STREAM_CHECK_RET_GOTO(resetTsdbReader(pTaskInner));
804,101!
2028
  }
2029

2030
end:
1,112,082✔
2031
  STREAM_PRINT_LOG_END_WITHID(code, lino);
1,112,082!
2032
  return code;
1,112,061✔
2033
}
2034

2035
/*
 * Handle a set-table pull request.
 *
 * Exchanges (TSWAP) the reader's trigger/calc uid hashes with the ones carried in the
 * request, so whatever the reader held before ends up in the request. When both new
 * hashes are non-NULL the reader is flagged as a virtual-table stream grouped by table
 * name. A response with an empty body and the resulting code is sent on every path.
 */
static int32_t vnodeProcessStreamSetTableReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   rspCont = NULL;  // never filled in: this request has no payload to return
  size_t  rspLen = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start, trigger hash size:%d, calc hash size:%d", TD_VID(pVnode), __func__,
               tSimpleHashGetSize(req->setTableReq.uidInfoTrigger),
               tSimpleHashGetSize(req->setTableReq.uidInfoCalc));

  TSWAP(sStreamReaderInfo->uidHashTrigger, req->setTableReq.uidInfoTrigger);
  TSWAP(sStreamReaderInfo->uidHashCalc, req->setTableReq.uidInfoCalc);

  // The swap has already happened when these checks run.
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashTrigger, TSDB_CODE_INVALID_PARA);
  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->uidHashCalc, TSDB_CODE_INVALID_PARA);

  sStreamReaderInfo->groupByTbname = true;
  sStreamReaderInfo->isVtableStream = true;

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_TRIGGER_PULL_RSP,
                 .info = pMsg->info,
                 .pCont = rspCont,
                 .contLen = rspLen,
                 .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
2060

2061
/*
 * Handle a last-ts pull request.
 *
 * Builds a temporary reader task that scans the whole time range in descending order,
 * one group at a time, collects one timestamp per group via processTs(), and replies
 * with the serialized result. The reported version is pVnode->state.applied + 1.
 * The response is sent on every path; the temporary task and the ts array are released
 * before returning.
 */
static int32_t vnodeProcessStreamLastTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  size_t                  rspLen = 0;
  void*                   rspCont = NULL;
  SStreamTsResponse       tsRsp = {0};
  SStreamReaderTaskInner* pTaskInner = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;

  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  BUILD_OPTION(options, sStreamReaderInfo, -1, TSDB_ORDER_DESC, INT64_MIN, INT64_MAX, sStreamReaderInfo->tsSchemas, true,
               STREAM_SCAN_GROUP_ONE_BY_ONE, 0, true, sStreamReaderInfo->uidHashTrigger);
  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &storageApi));

  tsRsp.ver = pVnode->state.applied + 1;

  STREAM_CHECK_RET_GOTO(processTs(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));
  ST_TASK_DLOG("vgId:%d %s get result, ver:%" PRId64, TD_VID(pVnode), __func__, tsRsp.ver);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&tsRsp, &rspCont, &rspLen));

  // Dump every collected entry, but only when debug logging is switched on.
  if (stDebugFlag & DEBUG_DEBUG) {
    int32_t total = taosArrayGetSize(tsRsp.tsInfo);
    int32_t idx = 0;
    while (idx < total) {
      STsInfo* pEntry = TARRAY_GET_ELEM(tsRsp.tsInfo, idx);
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64, TD_VID(pVnode), __func__, pEntry->ts, pEntry->gId);
      idx++;
    }
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_TRIGGER_PULL_RSP,
                 .info = pMsg->info,
                 .pCont = rspCont,
                 .contLen = rspLen,
                 .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(tsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
2102

2103
/*
 * Handle a first-ts pull request.
 *
 * Builds a temporary reader task that scans ascending from req->firstTsReq.startTime at
 * the requested version (optionally limited to req->firstTsReq.gid), collects one
 * timestamp per group via processTs(), and replies with the serialized result. The
 * reported version is pVnode->state.applied. The response is sent on every path; the
 * temporary task and the ts array are released before returning.
 */
static int32_t vnodeProcessStreamFirstTsReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  size_t                  rspLen = 0;
  void*                   rspCont = NULL;
  SStreamTsResponse       tsRsp = {0};
  SStreamReaderTaskInner* pTaskInner = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, startTime:%"PRId64" ver:%"PRId64" gid:%"PRId64, TD_VID(pVnode), __func__,
               req->firstTsReq.startTime, req->firstTsReq.ver, req->firstTsReq.gid);

  BUILD_OPTION(options, sStreamReaderInfo, req->firstTsReq.ver, TSDB_ORDER_ASC, req->firstTsReq.startTime, INT64_MAX, sStreamReaderInfo->tsSchemas, true,
               STREAM_SCAN_GROUP_ONE_BY_ONE, req->firstTsReq.gid, true, sStreamReaderInfo->uidHashTrigger);
  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &storageApi));

  tsRsp.ver = pVnode->state.applied;
  STREAM_CHECK_RET_GOTO(processTs(pVnode, &tsRsp, sStreamReaderInfo, pTaskInner));

  ST_TASK_DLOG("vgId:%d %s get result size:%"PRIzu", ver:%"PRId64, TD_VID(pVnode), __func__,
               taosArrayGetSize(tsRsp.tsInfo), tsRsp.ver);
  STREAM_CHECK_RET_GOTO(buildTsRsp(&tsRsp, &rspCont, &rspLen));

  // Dump every collected entry, but only when debug logging is switched on.
  if (stDebugFlag & DEBUG_DEBUG) {
    int32_t total = taosArrayGetSize(tsRsp.tsInfo);
    int32_t idx = 0;
    while (idx < total) {
      STsInfo* pEntry = TARRAY_GET_ELEM(tsRsp.tsInfo, idx);
      ST_TASK_DLOG("vgId:%d %s get ts:%" PRId64 ", gId:%" PRIu64, TD_VID(pVnode), __func__, pEntry->ts, pEntry->gId);
      idx++;
    }
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_TRIGGER_PULL_RSP,
                 .info = pMsg->info,
                 .pCont = rspCont,
                 .contLen = rspLen,
                 .code = code};
  tmsgSendRsp(&rsp);
  taosArrayDestroy(tsRsp.tsInfo);
  releaseStreamTask(&pTaskInner);
  return code;
}
2142

2143
/*
 * Handle a TSDB meta pull request (first page or continuation).
 *
 * When req->base.type is STRIGGER_PULL_TSDB_META a new reader task is created and stored
 * in sStreamReaderInfo->streamTaskMap under the session key; any other type looks the
 * task up under that key and continues the scan. Each data block found contributes one
 * row to the result: skey, ekey, uid, groupId (omitted for virtual-table streams) and
 * row count. Scanning stops when the reader is exhausted or STREAM_RETURN_ROWS_NUM rows
 * have been produced; once exhausted the task is removed from the map.
 *
 * A response carrying the resulting code (and the serialized block on success) is sent on
 * every path. Returns 0 or the first failing call's code.
 */
static int32_t vnodeProcessStreamTsdbMetaReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_META);

  if (req->base.type == STRIGGER_PULL_TSDB_META) {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbMetaReq.ver, req->tsdbMetaReq.order, req->tsdbMetaReq.startTime, req->tsdbMetaReq.endTime, sStreamReaderInfo->tsSchemas, true,
      (req->tsdbMetaReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL), req->tsdbMetaReq.gid, true, sStreamReaderInfo->uidHashTrigger);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));

    // Until the map accepts the task nothing else references it, so a failed insert must
    // release it here; the cleanup at `end` does not touch pTaskInner.
    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      lino = __LINE__;
      releaseStreamTask(&pTaskInner);
      goto end;
    }

    STREAM_CHECK_RET_GOTO(createBlockForTsdbMeta(&pTaskInner->pResBlockDst, sStreamReaderInfo->isVtableStream));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pTaskInner->pResBlockDst, STREAM_RETURN_ROWS_NUM));
  bool hasNext = true;
  while (true) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    // Only the block header is used below, so the block is released right away.
    pTaskInner->api.tsdReader.tsdReaderReleaseDataBlock(pTaskInner->pReader);
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    // One output row per data block; the groupId column is absent for virtual-table streams.
    int32_t index = 0;
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.skey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.window.ekey));
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.uid));
    if (!sStreamReaderInfo->isVtableStream) {
      STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.id.groupId));
    }
    STREAM_CHECK_RET_GOTO(addColData(pTaskInner->pResBlockDst, index++, &pTaskInner->pResBlock->info.rows));

    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", ekey:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
                 TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
                 pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    pTaskInner->pResBlockDst->info.rows++;
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      // Page is full; hasNext stays true so the task remains in the map for the next request.
      break;
    }
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  printDataBlock(pTaskInner->pResBlockDst, __func__, "meta", ((SStreamTask *)sStreamReaderInfo->pTask)->streamId);
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
2215

2216
/*
 * Handle a TSDB ts-data pull request for a non-virtual-table stream.
 *
 * Scans table req->tsdbTsDataReq.uid over [skey, ekey] in ascending order with the
 * trigger columns, fills tag values, applies the task filter, and merges every block into
 * the task's destination block. The merged rows are then passed through
 * blockDataTransform() into a block created from sStreamReaderInfo->tsBlock, which is
 * what gets serialized into the response.
 *
 * The response is sent on every path; the response block and the temporary task are
 * released before returning.
 */
static int32_t vnodeProcessStreamTsdbTsDataReqNonVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  size_t                  rspLen = 0;
  void*                   rspCont = NULL;
  SSDataBlock*            pRspBlock = NULL;
  SStreamReaderTaskInner* pTaskInner = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__,
               req->tsdbTsDataReq.ver, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey, req->tsdbTsDataReq.uid,
               req->tsdbTsDataReq.suid);

  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
               sStreamReaderInfo->triggerCols, false, STREAM_SCAN_ALL, 0, true, NULL);
  options.uid = req->tsdbTsDataReq.uid;
  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &storageApi));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pRspBlock));

  for (;;) {
    bool more = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &more));
    if (!more) {
      break;
    }

    if (!sStreamReaderInfo->isVtableStream) {
      pTaskInner->pResBlock->info.id.groupId =
          qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
    }

    SSDataBlock* pData = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pData));
    if (pData != NULL && pData->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(
          processTag(pVnode, sStreamReaderInfo, false, &storageApi, pData->info.id.uid, pData, 0, pData->info.rows, 1));
    }

    STREAM_CHECK_RET_GOTO(qStreamFilter(pData, pTaskInner->pFilterInfo, NULL));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pData));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
                 TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey,
                 pTaskInner->pResBlock->info.window.ekey, pTaskInner->pResBlock->info.id.uid,
                 pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
  }

  // NOTE(review): unlike the calc-data handler, no blockDataEnsureCapacity() call precedes
  // this transform — confirm blockDataTransform() does not need pre-sized destination columns.
  blockDataTransform(pRspBlock, pTaskInner->pResBlockDst);

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pRspBlock, &rspCont, &rspLen));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_TRIGGER_PULL_RSP,
                 .info = pMsg->info,
                 .pCont = rspCont,
                 .contLen = rspLen,
                 .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pRspBlock);

  releaseStreamTask(&pTaskInner);
  return code;
}
2278

2279
/*
 * Handle a TSDB ts-data pull request for a virtual-table stream.
 *
 * Scans table req->tsdbTsDataReq.uid (super table req->tsdbTsDataReq.suid) over
 * [skey, ekey] in ascending order using the ts schemas, merges every block into a block
 * created from sStreamReaderInfo->tsBlock, and replies with the serialized result.
 *
 * The response is sent on every path; the response block and the temporary task are
 * released before returning.
 */
static int32_t vnodeProcessStreamTsdbTsDataReqVTable(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SStreamReaderTaskInner* pTaskInner = NULL;
  void*                   buf = NULL;
  size_t                  size = 0;
  SSDataBlock*            pBlockRes = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  // Routine entry trace: logged at debug level, as the non-vtable handler does for the same message.
  ST_TASK_DLOG("vgId:%d %s start, ver:%"PRId64",skey:%"PRId64",ekey:%"PRId64",uid:%"PRId64",suid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTsDataReq.ver,
                req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
                req->tsdbTsDataReq.uid, req->tsdbTsDataReq.suid);

  BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTsDataReq.ver, TSDB_ORDER_ASC, req->tsdbTsDataReq.skey, req->tsdbTsDataReq.ekey,
               sStreamReaderInfo->tsSchemas, true, STREAM_SCAN_ALL, 0, true, NULL);
  // The request names the exact table to read, so override the ids chosen by BUILD_OPTION.
  options.suid = req->tsdbTsDataReq.suid;
  options.uid = req->tsdbTsDataReq.uid;
  SStorageAPI api = {0};
  initStorageAPI(&api);
  STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->tsBlock, &api));
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->tsBlock, false, &pBlockRes));

  while (1) {
    bool hasNext = false;
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pBlockRes, pBlock));
    ST_TASK_DLOG("vgId:%d %s get  skey:%" PRId64 ", ekey:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pBlockRes->info.window.skey, pBlockRes->info.window.ekey,
            pBlockRes->info.id.uid, pBlockRes->info.id.groupId, pBlockRes->info.rows);
  }

  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlockRes);

  releaseStreamTask(&pTaskInner);
  return code;
}
2330

2331
/*
 * Handle a TSDB trigger-data pull request (first page or continuation).
 *
 * When req->base.type is STRIGGER_PULL_TSDB_TRIGGER_DATA a new reader task is created and
 * stored in sStreamReaderInfo->streamTaskMap under the session key; any other type looks
 * the task up under that key and continues. Each call returns the next data block
 * (tags filled, filter applied) merged into the task's destination block. When the reader
 * is exhausted the task is removed from the map.
 *
 * A response carrying the resulting code (and the serialized block on success) is sent on
 * every path. Returns 0 or the first failing call's code.
 */
static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  SStreamReaderTaskInner* pTaskInner = NULL;
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start. ver:%"PRId64",order:%d,startTs:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__, req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, req->tsdbTriggerDataReq.gid);

  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_TRIGGER_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_TRIGGER_DATA) {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbTriggerDataReq.ver, req->tsdbTriggerDataReq.order, req->tsdbTriggerDataReq.startTime, INT64_MAX,
                 sStreamReaderInfo->triggerCols, false, (req->tsdbTriggerDataReq.gid != 0 ? STREAM_SCAN_GROUP_ONE_BY_ONE : STREAM_SCAN_ALL),
                 req->tsdbTriggerDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &api));

    // Until the map accepts the task nothing else references it, so a failed insert must
    // release it here; the cleanup at `end` does not touch pTaskInner.
    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      lino = __LINE__;
      releaseStreamTask(&pTaskInner);
      goto end;
    }
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    // Source and destination blocks carry the same group id; look it up once.
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);
    pTaskInner->pResBlockDst->info.id.groupId = pTaskInner->pResBlock->info.id.groupId;

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    if (pBlock != NULL && pBlock->info.rows > 0) {
      STREAM_CHECK_RET_GOTO(
        processTag(pVnode, sStreamReaderInfo, false, &pTaskInner->api, pBlock->info.id.uid, pBlock, 0, pBlock->info.rows, 1));
    }
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo, NULL));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", ekey:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
            TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
            pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
    // NOTE(review): as written this condition breaks after every block read, so each
    // request returns at most one block. Kept as-is pending the original todo.
    if (pTaskInner->pResBlockDst->info.rows >= 0) { //todo
      break;
    }
  }

  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
2401

2402
/*
 * Handle a TSDB calc-data pull request (first page or continuation).
 *
 * Requires sStreamReaderInfo->triggerCols; fails with TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN
 * otherwise. When req->base.type is STRIGGER_PULL_TSDB_CALC_DATA a new reader task is
 * created for group req->tsdbCalcDataReq.gid over [skey, ekey] and stored in
 * sStreamReaderInfo->streamTaskMap under the session key; any other type looks the task
 * up and continues. Filtered blocks are merged until the reader is exhausted or
 * STREAM_RETURN_ROWS_NUM rows are collected, then passed through blockDataTransform()
 * into a block created from sStreamReaderInfo->calcResBlock, which is serialized into the
 * response. When the reader is exhausted the task is removed from the map.
 *
 * The response is sent on every path and the response block is destroyed before returning.
 */
static int32_t vnodeProcessStreamTsdbCalcDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  SSDataBlock*            pBlockRes = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, skey:%"PRId64",ekey:%"PRId64",gid:%"PRId64, TD_VID(pVnode), __func__,
    req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey, req->tsdbCalcDataReq.gid);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo->triggerCols, TSDB_CODE_STREAM_NOT_TABLE_SCAN_PLAN);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t                 key = getSessionKey(req->base.sessionId, STRIGGER_PULL_TSDB_CALC_DATA);

  if (req->base.type == STRIGGER_PULL_TSDB_CALC_DATA) {
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbCalcDataReq.ver, TSDB_ORDER_ASC, req->tsdbCalcDataReq.skey, req->tsdbCalcDataReq.ekey,
                 sStreamReaderInfo->triggerCols, false, STREAM_SCAN_GROUP_ONE_BY_ONE, req->tsdbCalcDataReq.gid, true, NULL);
    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, sStreamReaderInfo->triggerResBlock, &api));

    // Until the map accepts the task nothing else references it, so a failed insert must
    // release it here; the cleanup at `end` does not touch pTaskInner.
    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      lino = __LINE__;
      releaseStreamTask(&pTaskInner);
      goto end;
    }
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }
    pTaskInner->pResBlock->info.id.groupId = qStreamGetGroupId(pTaskInner->pTableList, pTaskInner->pResBlock->info.id.uid);

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(qStreamFilter(pBlock, pTaskInner->pFilterInfo, NULL));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      // Page is full; hasNext stays true so the task remains in the map for the next request.
      break;
    }
  }

  // Re-shape the merged trigger-layout rows into the calc result layout before serializing.
  STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcResBlock, false, &pBlockRes));
  STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlockRes, pTaskInner->pResBlockDst->info.capacity));
  blockDataTransform(pBlockRes, pTaskInner->pResBlockDst);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlockRes, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlockRes->info.rows);
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlockRes);
  return code;
}
2470

2471
/*
 * Handle a TSDB data pull request for a virtual-table stream (first page or continuation).
 *
 * When req->base.type is STRIGGER_PULL_TSDB_DATA:
 *   - a slot-id list is built that maps each position in the sorted column-id list back to
 *     the column's position in the request's original cid order;
 *   - the table schema is loaded and shrunk to the requested columns;
 *   - a reader task is created without a reader, stored in
 *     sStreamReaderInfo->streamTaskMap keyed by the table uid, and its query condition and
 *     reader are then initialised here for the single requested table.
 * Any other type looks the task up under the same key and continues the scan.
 * Blocks are merged until the reader is exhausted or STREAM_RETURN_ROWS_NUM rows are
 * collected; once exhausted the task is removed from the map.
 *
 * The response is sent on every path; the temporary arrays are released before returning.
 */
static int32_t vnodeProcessStreamTsdbVirtalDataReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  int32_t* slotIdList = NULL;
  SArray* sortedCid = NULL;
  SArray* schemas = NULL;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStreamReaderTaskInner* pTaskInner = NULL;
  int64_t key = req->tsdbDataReq.uid;

  if (req->base.type == STRIGGER_PULL_TSDB_DATA) {
    // sort cid and build slotIdList
    const size_t numCids = taosArrayGetSize(req->tsdbDataReq.cids);
    slotIdList = taosMemoryMalloc(numCids * sizeof(int32_t));
    STREAM_CHECK_NULL_GOTO(slotIdList, terrno);
    sortedCid = taosArrayDup(req->tsdbDataReq.cids, NULL);
    STREAM_CHECK_NULL_GOTO(sortedCid, terrno);
    taosArraySort(sortedCid, sortCid);
    const size_t numSorted = taosArrayGetSize(sortedCid);
    for (int32_t i = 0; i < numCids; i++) {
      int16_t* cid = taosArrayGet(req->tsdbDataReq.cids, i);
      STREAM_CHECK_NULL_GOTO(cid, terrno);
      // Locate this cid in the sorted list; slot j (sorted position) maps to request position i.
      for (int32_t j = 0; j < numSorted; j++) {
        int16_t* cidSorted = taosArrayGet(sortedCid, j);
        STREAM_CHECK_NULL_GOTO(cidSorted, terrno);
        if (*cid == *cidSorted) {
          slotIdList[j] = i;
          break;
        }
      }
    }

    STREAM_CHECK_RET_GOTO(buildScheamFromMeta(pVnode, req->tsdbDataReq.uid, &schemas));
    STREAM_CHECK_RET_GOTO(shrinkScheams(req->tsdbDataReq.cids, schemas));
    BUILD_OPTION(options, sStreamReaderInfo, req->tsdbDataReq.ver, req->tsdbDataReq.order, req->tsdbDataReq.skey,
                    req->tsdbDataReq.ekey, schemas, true, STREAM_SCAN_ALL, 0, false, NULL);

    options.suid = req->tsdbDataReq.suid;
    options.uid = req->tsdbDataReq.uid;

    SStorageAPI api = {0};
    initStorageAPI(&api);
    STREAM_CHECK_RET_GOTO(createStreamTask(pVnode, &options, &pTaskInner, NULL, &api));

    // Until the map accepts the task nothing else references it, so a failed insert must
    // release it here; the cleanup at `end` does not touch pTaskInner.
    code = taosHashPut(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES, &pTaskInner, sizeof(pTaskInner));
    if (code != 0) {
      lino = __LINE__;
      releaseStreamTask(&pTaskInner);
      goto end;
    }

    STableKeyInfo       keyInfo = {.uid = req->tsdbDataReq.uid};
    // Rebuild the query condition with the sorted schemas and the custom slot mapping.
    cleanupQueryTableDataCond(&pTaskInner->cond);
    taosArraySort(pTaskInner->options.schemas, sortSSchema);

    // NOTE(review): slotIdList is passed by address — presumably the callee may take
    // ownership and clear it, which the unconditional free at `end` relies on; confirm.
    STREAM_CHECK_RET_GOTO(qStreamInitQueryTableDataCond(&pTaskInner->cond, pTaskInner->options.order, pTaskInner->options.schemas,
                                                        pTaskInner->options.isSchema, pTaskInner->options.twindows,
                                                        pTaskInner->options.suid, pTaskInner->options.ver, &slotIdList));
    STREAM_CHECK_RET_GOTO(pTaskInner->api.tsdReader.tsdReaderOpen(pVnode, &pTaskInner->cond, &keyInfo, 1, pTaskInner->pResBlock,
                                                             (void**)&pTaskInner->pReader, pTaskInner->idStr, NULL));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(pTaskInner->pResBlock, false, &pTaskInner->pResBlockDst));
  } else {
    void** tmp = taosHashGet(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES);
    STREAM_CHECK_NULL_GOTO(tmp, TSDB_CODE_STREAM_NO_CONTEXT);
    pTaskInner = *(SStreamReaderTaskInner**)tmp;
    STREAM_CHECK_NULL_GOTO(pTaskInner, TSDB_CODE_INTERNAL_ERROR);
  }

  blockDataCleanup(pTaskInner->pResBlockDst);
  bool hasNext = true;
  while (1) {
    STREAM_CHECK_RET_GOTO(getTableDataInfo(pTaskInner, &hasNext));
    if (!hasNext) {
      break;
    }

    SSDataBlock* pBlock = NULL;
    STREAM_CHECK_RET_GOTO(getTableData(pTaskInner, &pBlock));
    STREAM_CHECK_RET_GOTO(blockDataMerge(pTaskInner->pResBlockDst, pBlock));
    if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
      // Page is full; hasNext stays true so the task remains in the map for the next request.
      break;
    }
  }
  STREAM_CHECK_RET_GOTO(buildRsp(pTaskInner->pResBlockDst, &buf, &size));
  ST_TASK_DLOG("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlockDst->info.rows);
  printDataBlock(pTaskInner->pResBlockDst, __func__, "tsdb_data", ((SStreamTask*)pTask)->streamId);
  if (!hasNext) {
    STREAM_CHECK_RET_GOTO(taosHashRemove(sStreamReaderInfo->streamTaskMap, &key, LONG_BYTES));
  }

end:
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  taosMemFree(slotIdList);
  taosArrayDestroy(sortedCid);
  taosArrayDestroy(schemas);
  return code;
}
2569

2570
/*
 * Handle a WAL-meta pull request coming from a stream trigger.
 *
 * The reader's metaBlock is created on first use, emptied, and handed to
 * processWalVerMetaNew() together with the caller's last seen version
 * (req->walMetaNewReq.lastVer). A TDMT_STREAM_TRIGGER_PULL_RSP is always sent
 * for pMsg:
 *   - resultRsp.totalRows != 0: the serialized SSTriggerWalNewRsp;
 *   - resultRsp.totalRows == 0: an int64_t payload holding resultRsp.ver with
 *     rsp.code = TSDB_CODE_STREAM_NO_DATA.
 *
 * Returns 0 when data or "no data" was sent, otherwise the error code that was
 * put into the response.
 */
static int32_t vnodeProcessStreamWalMetaNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  SSTriggerWalNewRsp resultRsp = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaNewReq.lastVer);

  // The meta block is cached on the reader and reused across requests.
  if (sStreamReaderInfo->metaBlock == NULL) {
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
  }
  blockDataEmpty(sStreamReaderInfo->metaBlock);
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
  resultRsp.ver = req->walMetaNewReq.lastVer;
  STREAM_CHECK_RET_GOTO(processWalVerMetaNew(pVnode, &resultRsp, sStreamReaderInfo, req->walMetaNewReq.ctime));

  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // Two-pass serialization: first pass computes the length, second pass fills the buffer.
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, NULL);
  buf = rpcMallocCont(size);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp, NULL);
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);

end:
  if (resultRsp.totalRows == 0) {
    // NOTE(review): this also replaces an error code set by an earlier failed step
    // with TSDB_CODE_STREAM_NO_DATA (original behavior, kept) - confirm it is intended.
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t*)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      // Out of memory: report the failure instead of writing through NULL.
      code = terrno;
      lino = __LINE__;
    }
  }
  if (buf == NULL) {
    size = 0;  // never advertise a length for a missing payload
  }

  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  // "No data" is a normal outcome for the caller of this handler.
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    code = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);

  return code;
}
2617
/*
 * Handle a combined WAL meta + data pull request coming from a stream trigger.
 *
 * resultRsp is wired to the reader's cached metaBlock (created on first use)
 * and triggerBlock, then filled by processWalVerMetaDataNew() starting from
 * req->walMetaDataNewReq.lastVer. A TDMT_STREAM_TRIGGER_PULL_RSP is always
 * sent for pMsg:
 *   - resultRsp.totalRows != 0: the serialized response (using indexHash);
 *   - resultRsp.totalRows == 0: an int64_t payload holding resultRsp.ver with
 *     rsp.code = TSDB_CODE_STREAM_NO_DATA.
 *
 * Returns 0 when data or "no data" was sent, otherwise the error code that was
 * put into the response.
 */
static int32_t vnodeProcessStreamWalMetaDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  SSTriggerWalNewRsp resultRsp = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras lastVer:%" PRId64, TD_VID(pVnode), __func__, req->walMetaDataNewReq.lastVer);

  // The meta block is cached on the reader and reused across requests.
  if (sStreamReaderInfo->metaBlock == NULL) {
    STREAM_CHECK_RET_GOTO(createBlockForWalMetaNew((SSDataBlock**)&sStreamReaderInfo->metaBlock));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(sStreamReaderInfo->metaBlock, STREAM_RETURN_ROWS_NUM));
  }
  resultRsp.metaBlock = sStreamReaderInfo->metaBlock;
  resultRsp.dataBlock = sStreamReaderInfo->triggerBlock;
  resultRsp.ver = req->walMetaDataNewReq.lastVer;
  STREAM_CHECK_RET_GOTO(processWalVerMetaDataNew(pVnode, sStreamReaderInfo, &resultRsp));

  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // Two-pass serialization: first pass computes the length, second pass fills the buffer.
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
  buf = rpcMallocCont(size);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp, sStreamReaderInfo->indexHash);
  printDataBlock(sStreamReaderInfo->metaBlock, __func__, "meta", ((SStreamTask*)pTask)->streamId);
  printDataBlock(sStreamReaderInfo->triggerBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printDataBlock(resultRsp.dropBlock, __func__, "drop", ((SStreamTask*)pTask)->streamId);
  printDataBlock(resultRsp.deleteBlock, __func__, "delete", ((SStreamTask*)pTask)->streamId);
  printIndexHash(sStreamReaderInfo->indexHash, pTask);

end:
  if (resultRsp.totalRows == 0) {
    // NOTE(review): this also replaces an error code set by an earlier failed step
    // with TSDB_CODE_STREAM_NO_DATA (original behavior, kept) - confirm it is intended.
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t*)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      // Out of memory: report the failure instead of writing through NULL.
      code = terrno;
      lino = __LINE__;
    }
  }
  if (buf == NULL) {
    size = 0;  // never advertise a length for a missing payload
  }

  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  // "No data" is a normal outcome for the caller of this handler.
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    code = 0;
  }
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);

  STREAM_PRINT_LOG_END_WITHID(code, lino);

  return code;
}
2667

2668
/*
 * Handle a WAL data pull request coming from a stream trigger.
 *
 * processWalVerDataNew() fills the reader's triggerBlock for the versions and
 * ranges listed in req->walDataNewReq. A TDMT_STREAM_TRIGGER_PULL_RSP is
 * always sent for pMsg:
 *   - resultRsp.totalRows != 0: the serialized response (using indexHash);
 *   - resultRsp.totalRows == 0: an int64_t payload holding resultRsp.ver with
 *     rsp.code = TSDB_CODE_STREAM_NO_DATA.
 *
 * Returns 0 when data or "no data" was sent, otherwise the error code that was
 * put into the response.
 */
static int32_t vnodeProcessStreamWalDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  SSTriggerWalNewRsp resultRsp = {0};

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));

  resultRsp.dataBlock = sStreamReaderInfo->triggerBlock;
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
  ST_TASK_DLOG("vgId:%d %s get result last ver:%"PRId64" rows:%d", TD_VID(pVnode), __func__, resultRsp.ver, resultRsp.totalRows);

  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  // Two-pass serialization: first pass computes the length, second pass fills the buffer.
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
  buf = rpcMallocCont(size);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp, sStreamReaderInfo->indexHash);
  printDataBlock(sStreamReaderInfo->triggerBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printIndexHash(sStreamReaderInfo->indexHash, pTask);

end:
  if (resultRsp.totalRows == 0) {
    // NOTE(review): this also replaces an error code set by an earlier failed step
    // with TSDB_CODE_STREAM_NO_DATA (original behavior, kept) - confirm it is intended.
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t*)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      // Out of memory: report the failure instead of writing through NULL.
      code = terrno;
      lino = __LINE__;
    }
  }
  if (buf == NULL) {
    size = 0;  // never advertise a length for a missing payload
  }

  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  // "No data" is a normal outcome for the caller of this handler.
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    code = 0;
  }

  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  return code;
}
2711

2712
/*
 * Handle a WAL "calc data" pull request coming from a stream trigger.
 *
 * For a vtable stream the reader's calcBlock is filled directly and the
 * response is flagged isCalc. Otherwise the triggerBlock is filled, copied
 * (pBlock1), and transformed into a fresh block shaped like calcBlock
 * (pBlock2), which becomes the response payload. A
 * TDMT_STREAM_TRIGGER_PULL_RSP is always sent for pMsg:
 *   - resultRsp.totalRows != 0: the serialized response (using indexHash);
 *   - resultRsp.totalRows == 0: an int64_t payload holding resultRsp.ver with
 *     rsp.code = TSDB_CODE_STREAM_NO_DATA.
 *
 * Returns 0 when data or "no data" was sent, otherwise the error code that was
 * put into the response.
 */
static int32_t vnodeProcessStreamWalCalcDataNewReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t            code = 0;
  int32_t            lino = 0;
  void*              buf = NULL;
  size_t             size = 0;
  SSTriggerWalNewRsp resultRsp = {0};
  SSDataBlock*       pBlock1 = NULL;  // copy of triggerBlock (with data)
  SSDataBlock*       pBlock2 = NULL;  // empty block with calcBlock's layout

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request paras size:%zu", TD_VID(pVnode), __func__, taosArrayGetSize(req->walDataNewReq.versions));

  resultRsp.dataBlock = sStreamReaderInfo->isVtableStream ? sStreamReaderInfo->calcBlock : sStreamReaderInfo->triggerBlock;
  resultRsp.isCalc = sStreamReaderInfo->isVtableStream ? true : false;
  STREAM_CHECK_RET_GOTO(processWalVerDataNew(pVnode, sStreamReaderInfo, req->walDataNewReq.versions, req->walDataNewReq.ranges, &resultRsp));
  STREAM_CHECK_CONDITION_GOTO(resultRsp.totalRows == 0, TDB_CODE_SUCCESS);

  if (!sStreamReaderInfo->isVtableStream) {
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->triggerBlock, true, &pBlock1));
    STREAM_CHECK_RET_GOTO(createOneDataBlock(sStreamReaderInfo->calcBlock, false, &pBlock2));

    blockDataTransform(pBlock2, pBlock1);
    resultRsp.dataBlock = pBlock2;
  }

  // Two-pass serialization: first pass computes the length, second pass fills the buffer.
  size = tSerializeSStreamWalDataResponse(NULL, 0, &resultRsp, sStreamReaderInfo->indexHash);
  buf = rpcMallocCont(size);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  size = tSerializeSStreamWalDataResponse(buf, size, &resultRsp, sStreamReaderInfo->indexHash);
  printDataBlock(resultRsp.dataBlock, __func__, "data", ((SStreamTask*)pTask)->streamId);
  printIndexHash(sStreamReaderInfo->indexHash, pTask);

end:
  if (resultRsp.totalRows == 0) {
    // NOTE(review): this also replaces an error code set by an earlier failed step
    // with TSDB_CODE_STREAM_NO_DATA (original behavior, kept) - confirm it is intended.
    buf = rpcMallocCont(sizeof(int64_t));
    if (buf != NULL) {
      *(int64_t*)buf = resultRsp.ver;
      size = sizeof(int64_t);
      code = TSDB_CODE_STREAM_NO_DATA;
    } else {
      // Out of memory: report the failure instead of writing through NULL.
      code = terrno;
      lino = __LINE__;
    }
  }
  if (buf == NULL) {
    size = 0;  // never advertise a length for a missing payload
  }

  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  // "No data" is a normal outcome for the caller of this handler.
  if (code == TSDB_CODE_STREAM_NO_DATA) {
    code = 0;
  }

  blockDataDestroy(pBlock1);
  blockDataDestroy(pBlock2);
  blockDataDestroy(resultRsp.deleteBlock);
  blockDataDestroy(resultRsp.dropBlock);
  STREAM_PRINT_LOG_END_WITHID(code, lino);

  return code;
}
2766

2767
/*
 * Handle a "group column value" pull request from a stream trigger.
 *
 * Looks up req->groupColValueReq.gid in sStreamReaderInfo->groupIdMap,
 * serializes the stored array as an SStreamGroupInfo and sends it back in a
 * TDMT_STREAM_TRIGGER_PULL_RSP. On any failure the response carries the error
 * code and no payload.
 *
 * Returns the code that was sent (0 on success,
 * TSDB_CODE_STREAM_NO_CONTEXT when the group id is unknown).
 */
static int32_t vnodeProcessStreamGroupColValueReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  // Serializer result is kept in a signed variable: comparing a size_t against
  // "< 0" is always false, which made the original error checks dead code.
  int32_t len = 0;

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, request gid:%" PRId64, TD_VID(pVnode), __func__, req->groupColValueReq.gid);

  SArray** gInfo = taosHashGet(sStreamReaderInfo->groupIdMap, &req->groupColValueReq.gid, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(gInfo, TSDB_CODE_STREAM_NO_CONTEXT);
  SStreamGroupInfo pGroupInfo = {0};
  pGroupInfo.gInfo = *gInfo;

  // Two-pass serialization: first pass computes the length, second pass fills the buffer.
  len = tSerializeSStreamGroupInfo(NULL, 0, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  buf = rpcMallocCont(len);
  STREAM_CHECK_NULL_GOTO(buf, terrno);
  len = tSerializeSStreamGroupInfo(buf, len, &pGroupInfo, TD_VID(pVnode));
  STREAM_CHECK_CONDITION_GOTO(len < 0, len);
  size = (size_t)len;

end:
  if (code != 0) {
    // Do not ship a partial payload together with an error code.
    rpcFreeCont(buf);
    buf = NULL;
    size = 0;
  }
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);

  return code;
}
2801

2802
/*
 * Handle a "virtual table info" pull request from a stream trigger.
 *
 * For every table in the reader's table list a VTableInfo (uid, group id and
 * column references) is appended to the response:
 *   - if req->virTableInfoReq.cids contains exactly one id and that id is
 *     PRIMARYKEY_TIMESTAMP_COL_ID, all column references of the table are
 *     copied;
 *   - otherwise one slot per requested column id is produced, filled from the
 *     table's column reference with the same id that has hasRef set; slots
 *     without a match stay zeroed.
 * The result is built with buildVTableInfoRsp() and sent as a
 * TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * Returns the code that was sent in the response.
 */
static int32_t vnodeProcessStreamVTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req, SStreamTriggerReaderInfo* sStreamReaderInfo) {
  int32_t              code = 0;
  int32_t              lino = 0;
  void*                buf = NULL;
  size_t               size = 0;
  SStreamMsgVTableInfo vTableInfo = {0};
  SMetaReader          metaReader = {0};
  SStorageAPI          api = {0};
  initStorageAPI(&api);

  STREAM_CHECK_NULL_GOTO(sStreamReaderInfo, terrno);
  void* pTask = sStreamReaderInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start", TD_VID(pVnode), __func__);

  SArray* cids = req->virTableInfoReq.cids;
  STREAM_CHECK_NULL_GOTO(cids, terrno);

  SArray* pTableListArray = qStreamGetTableArrayList(sStreamReaderInfo->tableList);
  STREAM_CHECK_NULL_GOTO(pTableListArray, terrno);

  // Neither array is modified below, so their sizes are loop invariants.
  const size_t nTables = taosArrayGetSize(pTableListArray);
  const size_t nCids = taosArrayGetSize(cids);
  const bool   wantAllRefs = (nCids == 1) && (*(col_id_t*)taosArrayGet(cids, 0) == PRIMARYKEY_TIMESTAMP_COL_ID);

  vTableInfo.infos = taosArrayInit(nTables, sizeof(VTableInfo));
  STREAM_CHECK_NULL_GOTO(vTableInfo.infos, terrno);
  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);

  for (size_t i = 0; i < nTables; i++) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableListArray, i);
    if (pKeyInfo == NULL) {
      continue;
    }
    VTableInfo* vTable = taosArrayReserve(vTableInfo.infos, 1);
    STREAM_CHECK_NULL_GOTO(vTable, terrno);
    vTable->uid = pKeyInfo->uid;
    vTable->gId = pKeyInfo->groupId;

    // NOTE(review): the lookup result is stored but never checked (original
    // behavior, kept); on failure metaReader.me may not describe this table -
    // confirm whether the request should fail or the table be skipped.
    code = api.metaReaderFn.getTableEntryByUid(&metaReader, pKeyInfo->uid);

    if (wantAllRefs) {
      vTable->cols.nCols = metaReader.me.colRef.nCols;
      vTable->cols.version = metaReader.me.colRef.version;
      vTable->cols.pColRef = taosMemoryCalloc(metaReader.me.colRef.nCols, sizeof(SColRef));
      // A NULL result is only an error when elements were actually requested.
      STREAM_CHECK_CONDITION_GOTO(vTable->cols.pColRef == NULL && metaReader.me.colRef.nCols > 0, terrno);
      for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
        memcpy(vTable->cols.pColRef + j, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
      }
    } else {
      vTable->cols.nCols = nCids;
      vTable->cols.version = metaReader.me.colRef.version;
      vTable->cols.pColRef = taosMemoryCalloc(nCids, sizeof(SColRef));
      STREAM_CHECK_CONDITION_GOTO(vTable->cols.pColRef == NULL && nCids > 0, terrno);
      for (size_t c = 0; c < nCids; c++) {
        const col_id_t wanted = *(col_id_t*)taosArrayGet(cids, c);
        for (size_t j = 0; j < metaReader.me.colRef.nCols; j++) {
          if (metaReader.me.colRef.pColRef[j].hasRef && metaReader.me.colRef.pColRef[j].id == wanted) {
            memcpy(vTable->cols.pColRef + c, &metaReader.me.colRef.pColRef[j], sizeof(SColRef));
            break;
          }
        }
      }
    }
    tDecoderClear(&metaReader.coder);
  }
  ST_TASK_DLOG("vgId:%d %s end", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(buildVTableInfoRsp(&vTableInfo, &buf, &size));

end:
  tDestroySStreamMsgVTableInfo(&vTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END_WITHID(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
2872

2873
/*
 * Handle an "original table info" pull request from a stream trigger.
 *
 * For each (refTableName, refColName) pair in req->origTableInfoReq.cols the
 * table is looked up by name and an OTableInfoRsp is appended with:
 *   - uid:  the table's uid;
 *   - suid: the super table uid for a child table, 0 for a normal table;
 *   - cid:  the id of the schema column whose name equals refColName, or 0
 *           when no such column is found (an error is logged).
 * The result is built with buildOTableInfoRsp() and sent as a
 * TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * Returns the code that was sent in the response.
 */
static int32_t vnodeProcessStreamOTableInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  void*                     buf = NULL;
  size_t                    size = 0;
  SSTriggerOrigTableInfoRsp oTableInfo = {0};
  SMetaReader               metaReader = {0};
  int64_t                   streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->origTableInfoReq.cols;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  // The request array is not modified below, so its size is a loop invariant.
  const size_t nReq = taosArrayGetSize(cols);
  oTableInfo.cols = taosArrayInit(nReq, sizeof(OTableInfoRsp));
  STREAM_CHECK_NULL_GOTO(oTableInfo.cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  for (size_t i = 0; i < nReq; i++) {
    OTableInfo*    oInfo = taosArrayGet(cols, i);
    OTableInfoRsp* vTableInfo = taosArrayReserve(oTableInfo.cols, 1);
    STREAM_CHECK_NULL_GOTO(oInfo, terrno);
    STREAM_CHECK_NULL_GOTO(vTableInfo, terrno);
    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByName(&metaReader, oInfo->refTableName));
    vTableInfo->uid = metaReader.me.uid;
    stsDebug("vgId:%d %s uid:%"PRId64, TD_VID(pVnode), __func__, vTableInfo->uid);

    SSchemaWrapper* sSchemaWrapper = NULL;
    if (metaReader.me.type == TD_CHILD_TABLE) {
      // A child table's column schema lives on its super table: re-point the reader.
      int64_t suid = metaReader.me.ctbEntry.suid;
      vTableInfo->suid = suid;
      tDecoderClear(&metaReader.coder);
      STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, suid));
      sSchemaWrapper = &metaReader.me.stbEntry.schemaRow;
    } else if (metaReader.me.type == TD_NORMAL_TABLE) {
      vTableInfo->suid = 0;
      sSchemaWrapper = &metaReader.me.ntbEntry.schemaRow;
    } else {
      stError("invalid table type:%d", metaReader.me.type);
    }

    // Explicit default so the "not found" test below does not depend on how
    // taosArrayReserve() initialises the new slot.
    vTableInfo->cid = 0;

    // sSchemaWrapper stays NULL for an unsupported table type; skip the search
    // instead of dereferencing NULL and fall through to the "not found" log.
    if (sSchemaWrapper != NULL) {
      for (size_t j = 0; j < sSchemaWrapper->nCols; j++) {
        SSchema* s = sSchemaWrapper->pSchema + j;
        if (strcmp(s->name, oInfo->refColName) == 0) {
          vTableInfo->cid = s->colId;
          break;
        }
      }
    }
    if (vTableInfo->cid == 0) {
      stError("vgId:%d %s, not found col %s in table %s", TD_VID(pVnode), __func__, oInfo->refColName,
              oInfo->refTableName);
    }
    tDecoderClear(&metaReader.coder);
  }

  STREAM_CHECK_RET_GOTO(buildOTableInfoRsp(&oTableInfo, &buf, &size));

end:
  tDestroySTriggerOrigTableInfoRsp(&oTableInfo);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  return code;
}
2942

2943
/*
 * Handle a "virtual table pseudo column / tag" pull request from a stream
 * trigger.
 *
 * The table req->virTablePseudoColReq.uid must be a virtual normal table or a
 * virtual child table (otherwise TSDB_CODE_INVALID_PARA). A one-row block is
 * built from req->virTablePseudoColReq.cids:
 *   - virtual normal table: the first id must be -1; the block has a single
 *     column holding the table name;
 *   - virtual child table: id -1 yields a table-name column, any other id
 *     yields the super table tag with that id (a TSDB_DATA_TYPE_NULL column
 *     when the super table has no such tag), filled from the child's tags.
 * The block is sent via buildRsp() as a TDMT_STREAM_TRIGGER_PULL_RSP.
 *
 * Returns the code that was sent in the response.
 */
static int32_t vnodeProcessStreamVTableTagInfoReq(SVnode* pVnode, SRpcMsg* pMsg, SSTriggerPullRequestUnion* req) {
  int32_t      code = 0;
  int32_t      lino = 0;
  void*        buf = NULL;
  size_t       size = 0;
  SSDataBlock* pBlock = NULL;

  SMetaReader metaReader = {0};
  SMetaReader metaReaderStable = {0};
  int64_t     streamId = req->base.streamId;
  stsDebug("vgId:%d %s start", TD_VID(pVnode), __func__);

  SStorageAPI api = {0};
  initStorageAPI(&api);

  SArray* cols = req->virTablePseudoColReq.cids;
  STREAM_CHECK_NULL_GOTO(cols, terrno);

  api.metaReaderFn.initReader(&metaReader, pVnode, META_READER_LOCK, &api.metaFn);
  STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReader, req->virTablePseudoColReq.uid));

  STREAM_CHECK_CONDITION_GOTO(metaReader.me.type != TD_VIRTUAL_CHILD_TABLE && metaReader.me.type != TD_VIRTUAL_NORMAL_TABLE, TSDB_CODE_INVALID_PARA);

  STREAM_CHECK_RET_GOTO(createDataBlock(&pBlock));
  if (metaReader.me.type == TD_VIRTUAL_NORMAL_TABLE) {
    STREAM_CHECK_CONDITION_GOTO (taosArrayGetSize(cols) < 1 || *(col_id_t*)taosArrayGet(cols, 0) != -1, TSDB_CODE_INVALID_PARA);
    SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
    STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, 0);
    STREAM_CHECK_NULL_GOTO(pDst, terrno);
    STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
  } else {
    // Only TD_VIRTUAL_CHILD_TABLE can reach this branch: every other type was
    // rejected by the type check above.
    int64_t suid = metaReader.me.ctbEntry.suid;
    api.metaReaderFn.readerReleaseLock(&metaReader);
    api.metaReaderFn.initReader(&metaReaderStable, pVnode, META_READER_LOCK, &api.metaFn);

    STREAM_CHECK_RET_GOTO(api.metaReaderFn.getTableEntryByUid(&metaReaderStable, suid));
    SSchemaWrapper* sSchemaWrapper = &metaReaderStable.me.stbEntry.schemaTag;

    // Pass 1: declare one output column per requested id.
    for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
      col_id_t* id = taosArrayGet(cols, i);
      STREAM_CHECK_NULL_GOTO(id, terrno);
      if (*id == -1) {
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN, -1);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
        continue;
      }
      size_t j = 0;
      for (; j < sSchemaWrapper->nCols; j++) {
        SSchema* s = sSchemaWrapper->pSchema + j;
        if (s->colId == *id) {
          SColumnInfoData idata = createColumnInfoData(s->type, s->bytes, s->colId);
          STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
          break;
        }
      }
      if (j == sSchemaWrapper->nCols) {
        // Tag id not present in the super table schema: emit a NULL-typed column.
        SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, CHAR_BYTES, *id);
        STREAM_CHECK_RET_GOTO(blockDataAppendColInfo(pBlock, &idata));
      }
    }
    STREAM_CHECK_RET_GOTO(blockDataEnsureCapacity(pBlock, 1));
    pBlock->info.rows = 1;

    // Pass 2: fill row 0 of every declared column.
    for (size_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      STREAM_CHECK_NULL_GOTO(pDst, terrno);

      if (pDst->info.colId == -1) {
        STREAM_CHECK_RET_GOTO(varColSetVarData(pDst, 0, metaReader.me.name, strlen(metaReader.me.name), false));
        continue;
      }
      if (pDst->info.type == TSDB_DATA_TYPE_NULL) {
        STREAM_CHECK_RET_GOTO(colDataSetVal(pDst, 0, NULL, true));
        continue;
      }

      STagVal val = {0};
      val.cid = pDst->info.colId;
      const char* p = api.metaFn.extractTagVal(metaReader.me.ctbEntry.pTags, pDst->info.type, &val);

      char* data = NULL;
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
        data = (char*)p;
      }

      // Same condition the original used to decide whether `data` must be freed.
      const bool ownsData = (pDst->info.type != TSDB_DATA_TYPE_JSON) && (p != NULL) &&
                            IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && (data != NULL);

      code = colDataSetVal(pDst, 0, data,
                           (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));

      // Release the temporary before the error check so a failing
      // colDataSetVal() no longer leaks it on the jump to `end`.
      if (ownsData) {
        taosMemoryFree(data);
      }
      STREAM_CHECK_RET_GOTO(code);
    }
  }

  stsDebug("vgId:%d %s get result rows:%" PRId64, TD_VID(pVnode), __func__, pBlock->info.rows);
  printDataBlock(pBlock, __func__, "", streamId);
  STREAM_CHECK_RET_GOTO(buildRsp(pBlock, &buf, &size));

end:
  // NOTE(review): size is still 0 on every jump to `end` taken before buildRsp(),
  // so those error codes are replaced by TSDB_CODE_STREAM_NO_DATA here
  // (original behavior, kept) - confirm this is intended.
  if(size == 0){
    code = TSDB_CODE_STREAM_NO_DATA;
  }
  api.metaReaderFn.clearReader(&metaReaderStable);
  api.metaReaderFn.clearReader(&metaReader);
  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {
      .msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  blockDataDestroy(pBlock);
  return code;
}
3063

3064
/**
 * Handle a TDMT_STREAM_FETCH request taken from the stream reader queue.
 *
 * Steps:
 *   1. Deserialize the SResFetchReq carried in pMsg.
 *   2. Look up the calc-info list for (queryId, taskId) and pick the entry at req.execId.
 *   3. When req.reset is set or no executor task exists yet, destroy the old task and build a new
 *      one (including the optional scan / ext time ranges of a table-scan plan node).
 *   4. Execute the task, serialize the resulting blocks and send a TDMT_STREAM_FETCH_RSP.
 *
 * The response is sent from the common `end` path, so it goes out on success and on failure alike
 * (with `code` set to the failure reason).
 *
 * @param pVnode vnode the request runs on
 * @param pMsg   incoming RPC message; pCont / contLen hold the serialized SResFetchReq
 * @return 0 on success, otherwise the error code that was also placed in the response
 */
static int32_t vnodeProcessStreamFetchMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   buf = NULL;
  size_t  size = 0;
  void*   taskAddr = NULL;
  SArray* pResList = NULL;

  SResFetchReq req = {0};
  STREAM_CHECK_CONDITION_GOTO(tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0,
                              TSDB_CODE_QRY_INVALID_INPUT);
  SArray* calcInfoList = (SArray*)qStreamGetReaderInfo(req.queryId, req.taskId, &taskAddr);
  STREAM_CHECK_NULL_GOTO(calcInfoList, terrno);

  STREAM_CHECK_CONDITION_GOTO(req.execId < 0, TSDB_CODE_INVALID_PARA);
  SStreamTriggerReaderCalcInfo* sStreamReaderCalcInfo = taosArrayGetP(calcInfoList, req.execId);
  STREAM_CHECK_NULL_GOTO(sStreamReaderCalcInfo, terrno);
  void* pTask = sStreamReaderCalcInfo->pTask;
  ST_TASK_DLOG("vgId:%d %s start, execId:%d, reset:%d, pTaskInfo:%p, scan type:%d", TD_VID(pVnode), __func__, req.execId, req.reset,
               sStreamReaderCalcInfo->pTaskInfo, nodeType(sStreamReaderCalcInfo->calcAst->pNode));

  if (req.reset || sStreamReaderCalcInfo->pTaskInfo == NULL) {
    // The runtime function info is dereferenced several times below; reject the request up front
    // (before any state is torn down) instead of crashing on a request that lacks it.
    STREAM_CHECK_NULL_GOTO(req.pStRtFuncInfo, TSDB_CODE_INVALID_PARA);

    // qDestroyTask() receives the handle by value and therefore cannot reset the stored field.
    // Clear it here so that a failure further down does not leave a dangling handle behind that a
    // later fetch would execute or destroy a second time.
    qDestroyTask(sStreamReaderCalcInfo->pTaskInfo);
    sStreamReaderCalcInfo->pTaskInfo = NULL;

    // With a dynamic table name, the target uid is taken from the first partition value that is
    // flagged as the table name.
    int64_t uid = 0;
    if (req.dynTbname) {
      SArray* vals = req.pStRtFuncInfo->pStreamPartColVals;
      for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
        SStreamGroupValue* pValue = taosArrayGet(vals, i);
        if (pValue != NULL && pValue->isTbname) {
          uid = pValue->uid;
          break;
        }
      }
    }

    SReadHandle handle = {0};
    handle.vnode = pVnode;
    handle.uid = uid;

    initStorageAPI(&handle.api);
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode) ||
        QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == nodeType(sStreamReaderCalcInfo->calcAst->pNode)) {
      STimeRangeNode* node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, false));
      } else {
        ST_TASK_DLOG("vgId:%d %s no scan time range node", TD_VID(pVnode), __func__);
      }

      node = (STimeRangeNode*)((STableScanPhysiNode*)(sStreamReaderCalcInfo->calcAst->pNode))->pExtTimeRange;
      if (node != NULL) {
        STREAM_CHECK_RET_GOTO(processCalaTimeRange(sStreamReaderCalcInfo, &req, node, &handle, true));
      } else {
        ST_TASK_DLOG("vgId:%d %s no interp time range node", TD_VID(pVnode), __func__);
      }
    }

    // Swap the request's runtime function info into the calc info; the previous contents end up
    // in the request and are handed to tDestroySResFetchReq() at the end of this function.
    TSWAP(sStreamReaderCalcInfo->rtInfo.funcInfo, *req.pStRtFuncInfo);
    handle.streamRtInfo = &sStreamReaderCalcInfo->rtInfo;

    STREAM_CHECK_RET_GOTO(qCreateStreamExecTaskInfo(&sStreamReaderCalcInfo->pTaskInfo,
                                                    sStreamReaderCalcInfo->calcScanPlan, &handle, NULL, TD_VID(pVnode),
                                                    req.taskId));

    STREAM_CHECK_RET_GOTO(qSetTaskId(sStreamReaderCalcInfo->pTaskInfo, req.taskId, req.queryId));
  }

  if (req.pOpParam != NULL) {
    qUpdateOperatorParam(sStreamReaderCalcInfo->pTaskInfo, req.pOpParam);
  }

  pResList = taosArrayInit(4, POINTER_BYTES);
  STREAM_CHECK_NULL_GOTO(pResList, terrno);
  uint64_t ts = 0;
  bool     hasNext = false;
  STREAM_CHECK_RET_GOTO(qExecTaskOpt(sStreamReaderCalcInfo->pTaskInfo, pResList, &ts, &hasNext, NULL, req.pOpParam != NULL));

  // Debug dump of every non-NULL result block.
  for (size_t i = 0; i < taosArrayGetSize(pResList); i++) {
    SSDataBlock* pBlock = taosArrayGetP(pResList, i);
    if (pBlock == NULL) continue;
    printDataBlock(pBlock, __func__, "fetch", ((SStreamTask*)pTask)->streamId);
  }

  ST_TASK_DLOG("vgId:%d %s start to build rsp", TD_VID(pVnode), __func__);
  STREAM_CHECK_RET_GOTO(streamBuildFetchRsp(pResList, hasNext, &buf, &size, pVnode->config.tsdbCfg.precision));
  ST_TASK_DLOG("vgId:%d %s end:", TD_VID(pVnode), __func__);

end:
  taosArrayDestroy(pResList);
  streamReleaseTask(taskAddr);

  STREAM_PRINT_LOG_END(code, lino);
  SRpcMsg rsp = {.msgType = TDMT_STREAM_FETCH_RSP, .info = pMsg->info, .pCont = buf, .contLen = size, .code = code};
  tmsgSendRsp(&rsp);
  tDestroySResFetchReq(&req);
  return code;
}
3172

3173
/**
 * Entry point for messages taken from the vnode's stream reader queue.
 *
 * - If the vnode is not ready for reads, the message is redirected and 0 is returned.
 * - TDMT_STREAM_FETCH is forwarded to vnodeProcessStreamFetchMsg().
 * - TDMT_STREAM_TRIGGER_PULL is deserialized, the reader info of the target task is looked up
 *   (skipped for STRIGGER_PULL_OTABLE_INFO), lazily initialized on first use, and the request is
 *   dispatched to the handler that matches req.base.type.
 * - Any other message type fails with TSDB_CODE_APP_ERROR.
 *
 * @param pVnode vnode the message is processed on
 * @param pMsg   incoming RPC message
 * @return 0 on success (or after a redirect), otherwise the first error code encountered
 */
int32_t vnodeProcessStreamReaderMsg(SVnode* pVnode, SRpcMsg* pMsg) {
  int32_t                   code = 0;
  int32_t                   lino = 0;
  SSTriggerPullRequestUnion req = {0};
  void*                     taskAddr = NULL;

  vDebug("vgId:%d, msg:%p in stream reader queue is processing", pVnode->config.vgId, pMsg);
  if (!syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  if (pMsg->msgType == TDMT_STREAM_FETCH) {
    return vnodeProcessStreamFetchMsg(pVnode, pMsg);
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_PULL) {
    // The pull request payload follows the SMsgHead at the start of the message content.
    void*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
    int32_t len = pMsg->contLen - sizeof(SMsgHead);
    STREAM_CHECK_RET_GOTO(tDeserializeSTriggerPullRequest(pReq, len, &req));
    stDebug("vgId:%d %s start, type:%d, streamId:%" PRIx64 ", readerTaskId:%" PRIx64 ", sessionId:%" PRIx64,
            TD_VID(pVnode), __func__, req.base.type, req.base.streamId, req.base.readerTaskId, req.base.sessionId);
    SStreamTriggerReaderInfo* sStreamReaderInfo = (STRIGGER_PULL_OTABLE_INFO == req.base.type) ? NULL : qStreamGetReaderInfo(req.base.streamId, req.base.readerTaskId, &taskAddr);
    if (sStreamReaderInfo != NULL) {
      // One-time lazy initialization of the table lists and the filter, guarded by the reader's
      // mutex. The status is collected first and only checked after the mutex has been released,
      // so that a failing step can never leave the mutex locked.
      (void)taosThreadMutexLock(&sStreamReaderInfo->mutex);
      if (sStreamReaderInfo->tableList == NULL) {
        code = generateTablistForStreamReader(pVnode, sStreamReaderInfo, false);
        if (code == 0) {
          code = generateTablistForStreamReader(pVnode, sStreamReaderInfo, true);
        }
        if (code == 0) {
          code = filterInitFromNode(sStreamReaderInfo->pConditions, &sStreamReaderInfo->pFilterInfo, 0, NULL);
        }
      }
      (void)taosThreadMutexUnlock(&sStreamReaderInfo->mutex);
      // NOTE(review): as before, no response is sent to the requester when this initialization
      // fails - confirm that the sender copes with that (e.g. via a timeout).
      STREAM_CHECK_RET_GOTO(code);
      sStreamReaderInfo->pVnode = pVnode;
    }
    switch (req.base.type) {
      case STRIGGER_PULL_SET_TABLE:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamSetTableReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_LAST_TS:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamLastTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_FIRST_TS:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamFirstTsReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_META:
      case STRIGGER_PULL_TSDB_META_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbMetaReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_TS_DATA:
        if (sStreamReaderInfo == NULL) {
          // The reader info lookup failed. This case has to read isVtableStream to pick a handler,
          // so answer with an error here instead of dereferencing NULL.
          int32_t errCode = (terrno != 0) ? terrno : TSDB_CODE_INVALID_PARA;
          SRpcMsg rsp = {.msgType = TDMT_STREAM_TRIGGER_PULL_RSP, .info = pMsg->info, .pCont = NULL, .contLen = 0, .code = errCode};
          tmsgSendRsp(&rsp);
          STREAM_CHECK_RET_GOTO(errCode);
        }
        if (sStreamReaderInfo->isVtableStream) {
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqVTable(pVnode, pMsg, &req, sStreamReaderInfo));
        } else {
          STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTsDataReqNonVTable(pVnode, pMsg, &req, sStreamReaderInfo));
        }
        break;
      case STRIGGER_PULL_TSDB_TRIGGER_DATA:
      case STRIGGER_PULL_TSDB_TRIGGER_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbTriggerDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_CALC_DATA:
      case STRIGGER_PULL_TSDB_CALC_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbCalcDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_TSDB_DATA:
      case STRIGGER_PULL_TSDB_DATA_NEXT:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamTsdbVirtalDataReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_GROUP_COL_VALUE:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamGroupColValueReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_VTABLE_INFO:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableInfoReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_VTABLE_PSEUDO_COL:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamVTableTagInfoReq(pVnode, pMsg, &req));
        break;
      case STRIGGER_PULL_OTABLE_INFO:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamOTableInfoReq(pVnode, pMsg, &req));
        break;
      case STRIGGER_PULL_WAL_META_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_META_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalMetaDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      case STRIGGER_PULL_WAL_CALC_DATA_NEW:
        STREAM_CHECK_RET_GOTO(vnodeProcessStreamWalCalcDataNewReq(pVnode, pMsg, &req, sStreamReaderInfo));
        break;
      default:
        vError("unknown inner msg type:%d in stream reader queue", req.base.type);
        STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
        break;
    }
  } else {
    vError("unknown msg type:%d in stream reader queue", pMsg->msgType);
    STREAM_CHECK_RET_GOTO(TSDB_CODE_APP_ERROR);
  }
end:

  streamReleaseTask(taskAddr);

  tDestroySTriggerPullRequest(&req);
  STREAM_PRINT_LOG_END(code, lino);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc