• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3858

17 Apr 2025 01:40PM UTC coverage: 62.968% (+0.5%) from 62.513%
#3858

push

travis-ci

web-flow
docs(opc): add perssit data support (#30783)

156194 of 316378 branches covered (49.37%)

Branch coverage included in aggregate %.

242021 of 316027 relevant lines covered (76.58%)

19473613.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.85
/source/libs/executor/src/streamexecutorInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamexecutorInt.h"
17

18
#include "executorInt.h"
19
#include "operator.h"
20
#include "cmdnodes.h"
21
#include "tdatablock.h"
22

23
// Bit masks tested/set on SSteamOpBasicInfo.operatorFlag by the helpers in this file.
// UPDATE_OPERATOR_INFO: set by setStreamOperatorState(), cleared by saveStreamOperatorStateComplete().
#define UPDATE_OPERATOR_INFO       BIT_FLAG_MASK(0)
24
// FILL_HISTORY_OPERATOR: set by setFillHistoryOperatorFlag(), queried by isHistoryOperator().
#define FILL_HISTORY_OPERATOR      BIT_FLAG_MASK(1)
25
// NOTE(review): RECALCULATE_OPERATOR is not referenced in the visible part of this file.
#define RECALCULATE_OPERATOR       BIT_FLAG_MASK(2)
26
// SEMI_OPERATOR: set by setSemiOperatorFlag(), queried by isSemiOperator().
#define SEMI_OPERATOR              BIT_FLAG_MASK(3)
27
// NOTE(review): FINAL_OPERATOR and SINGLE_OPERATOR are not referenced in the visible part of this file.
#define FINAL_OPERATOR             BIT_FLAG_MASK(4)
28
#define SINGLE_OPERATOR            BIT_FLAG_MASK(5)
29

30
// NOTE(review): presumably a cache size limit in megabytes; not referenced in the
// visible part of this file - confirm against its users.
#define NOTIFY_EVENT_NAME_CACHE_LIMIT_MB 16
31

32
// One pending stream notification. Instances are stored by value in
// SStreamNotifyEventSupp.pWindowEventHashMap, keyed by the first
// NOTIFY_EVENT_KEY_SIZE bytes of the struct itself.
typedef struct SStreamNotifyEvent {
33
  uint64_t    gid;        // group id, copied from SSessionKey.groupId
34
  int64_t     eventType;  // EStreamNotifyEventType value of this notification
35
  STimeWindow win;        // window the event refers to, copied from SSessionKey.win
36
  cJSON*      pJson;      // JSON payload; released by destroyStreamWindowEvent()
37
} SStreamNotifyEvent;
38

39
// Number of key bytes used when a SStreamNotifyEvent is put into / looked up in
// the window event hash map: sizeof(gid) + sizeof(eventType) + sizeof(win.skey).
// Callers pass the address of the struct as the key, so
// NOTE(review): this assumes gid, eventType and win.skey are laid out contiguously
// at the start of the struct without padding - confirm STimeWindow starts with skey.
#define NOTIFY_EVENT_KEY_SIZE                                                                            \
40
  ((sizeof(((struct SStreamNotifyEvent*)0)->gid) + sizeof(((struct SStreamNotifyEvent*)0)->eventType)) + \
41
   sizeof(((struct SStreamNotifyEvent*)0)->win.skey))
42

43
// Flags the operator state as updated (UPDATE_OPERATOR_INFO) for every input
// block type except STREAM_GET_ALL and STREAM_CHECKPOINT.
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) {
  if (type == STREAM_GET_ALL || type == STREAM_CHECKPOINT) {
    return;
  }
  BIT_FLAG_SET_MASK(pBasicInfo->operatorFlag, UPDATE_OPERATOR_INFO);
}
1,432,518✔
48

49
// Returns true while the UPDATE_OPERATOR_INFO bit is set in the operator flags.
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo) {
  return BIT_FLAG_TEST_MASK(pBasicInfo->operatorFlag, UPDATE_OPERATOR_INFO);
}
6,123✔
50

51
// Clears the UPDATE_OPERATOR_INFO bit in the operator flags.
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) {
  BIT_FLAG_UNSET_MASK(pBasicInfo->operatorFlag, UPDATE_OPERATOR_INFO);
}
100✔
52

53
static void destroyStreamWindowEvent(void* ptr) {
9✔
54
  SStreamNotifyEvent* pEvent = (SStreamNotifyEvent*)ptr;
9✔
55
  if (pEvent) {
9!
56
    if (pEvent->pJson) {
9!
57
      cJSON_Delete(pEvent->pJson);
×
58
    }
59
    *pEvent = (SStreamNotifyEvent){0};
9✔
60
  }
61
}
9✔
62

63
// Frees every container owned by the notify-event support struct and zeroes it.
// A NULL `sup` is ignored; the struct itself is not freed.
static void destroyStreamNotifyEventSupp(SStreamNotifyEventSupp* sup) {
  if (sup != NULL) {
    taosHashCleanup(sup->pWindowEventHashMap);
    taosHashCleanup(sup->pTableNameHashMap);
    blockDataDestroy(sup->pEventBlock);
    taosArrayDestroy(sup->pSessionKeys);

    const SStreamNotifyEventSupp cleared = {0};
    *sup = cleared;
  }
}
71

72
// Allocates the containers used to collect stream notify events:
//   - pEventBlock: STREAM_NOTIFY_EVENT block with a single VARCHAR column,
//     pre-sized to `resCapacity` rows
//   - pWindowEventHashMap / pTableNameHashMap
//   - pSessionKeys: array of SSessionKey with initial capacity `resCapacity`
// Does nothing (and returns success) when `sup` is NULL or already initialized.
// On failure everything allocated so far is released and `*sup` is zeroed.
static int32_t initStreamNotifyEventSupp(SStreamNotifyEventSupp* sup, const char* windowType, int32_t resCapacity) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SSDataBlock*    pNewBlock = NULL;  // owned locally until handed over to sup->pEventBlock
  SColumnInfoData eventCol = {0};

  if (sup == NULL || sup->pWindowEventHashMap != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  code = createDataBlock(&pNewBlock);
  QUERY_CHECK_CODE(code, lino, _end);
  pNewBlock->info.type = STREAM_NOTIFY_EVENT;
  pNewBlock->info.watermark = INT64_MIN;

  eventCol.info.type = TSDB_DATA_TYPE_VARCHAR;
  eventCol.info.bytes = tDataTypes[TSDB_DATA_TYPE_VARCHAR].bytes;
  code = blockDataAppendColInfo(pNewBlock, &eventCol);
  QUERY_CHECK_CODE(code, lino, _end);

  sup->pWindowEventHashMap = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  QUERY_CHECK_NULL(sup->pWindowEventHashMap, code, lino, _end, terrno);
  // entries own a cJSON payload, so the map must release it on removal
  taosHashSetFreeFp(sup->pWindowEventHashMap, destroyStreamWindowEvent);

  sup->pTableNameHashMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
  QUERY_CHECK_NULL(sup->pTableNameHashMap, code, lino, _end, terrno);

  // ownership of the block moves to sup from here on
  sup->pEventBlock = pNewBlock;
  pNewBlock = NULL;
  code = blockDataEnsureCapacity(sup->pEventBlock, resCapacity);
  QUERY_CHECK_CODE(code, lino, _end);

  sup->windowType = windowType;
  sup->pSessionKeys = taosArrayInit(resCapacity, sizeof(SSessionKey));
  QUERY_CHECK_NULL(sup->pSessionKeys, code, lino, _end, terrno);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    if (sup) {
      destroyStreamNotifyEventSupp(sup);
    }
  }
  if (pNewBlock != NULL) {
    blockDataDestroy(pNewBlock);
  }
  return code;
}
118

119
// Initializes the common part of a stream operator:
//   - resets primaryPkIndex and operatorFlag
//   - allocates pDelRes (STREAM_DELETE_RESULT block), pUpdated and pSeDeleted
//   - for interval/session/state/event/count window operators, also sets up
//     the notify-event support with the matching window type name
// Other operator kinds return success without notify-event support.
// Returns TSDB_CODE_SUCCESS or the error code of the failing step. Members
// already allocated are not released here on failure.
int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo, const struct SOperatorInfo* pOperator) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  const char* windowType = NULL;
  _hash_fn_t  hashFn = NULL;

  pBasicInfo->primaryPkIndex = -1;
  pBasicInfo->operatorFlag = 0;
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pBasicInfo->pDelRes);
  QUERY_CHECK_CODE(code, lino, _end);

  pBasicInfo->pUpdated = taosArrayInit(1024, sizeof(SResultWindowInfo));
  QUERY_CHECK_NULL(pBasicInfo->pUpdated, code, lino, _end, terrno);

  hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pBasicInfo->pSeDeleted = tSimpleHashInit(32, hashFn);
  if (pBasicInfo->pSeDeleted == NULL) {
    // Previously unchecked: a failed allocation left a NULL map behind while
    // the function still reported success.
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    lino = __LINE__;
    goto _end;
  }

  if (IS_NORMAL_INTERVAL_OP(pOperator)) {
    windowType = "Time";
  } else if (IS_NORMAL_SESSION_OP(pOperator)) {
    windowType = "Session";
  } else if (IS_NORMAL_STATE_OP(pOperator)) {
    windowType = "State";
  } else if (IS_NORMAL_EVENT_OP(pOperator)) {
    windowType = "Event";
  } else if (IS_NORMAL_COUNT_OP(pOperator)) {
    windowType = "Count";
  } else {
    // not a window operator that emits notify events
    return TSDB_CODE_SUCCESS;
  }
  code = initStreamNotifyEventSupp(&pBasicInfo->notifyEventSup, windowType, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
157

158
// Turns on the FILL_HISTORY_OPERATOR bit in the operator flags.
void setFillHistoryOperatorFlag(SSteamOpBasicInfo* pBasicInfo) { BIT_FLAG_SET_MASK(pBasicInfo->operatorFlag, FILL_HISTORY_OPERATOR); }
10✔
161

162
// Returns true when the FILL_HISTORY_OPERATOR bit is set in the operator flags.
bool isHistoryOperator(SSteamOpBasicInfo* pBasicInfo) { return BIT_FLAG_TEST_MASK(pBasicInfo->operatorFlag, FILL_HISTORY_OPERATOR); }
165

166
// Returns true when the operator is flagged as fill-history or as semi operator.
bool needBuildAllResult(SSteamOpBasicInfo* pBasicInfo) {
  if (BIT_FLAG_TEST_MASK(pBasicInfo->operatorFlag, FILL_HISTORY_OPERATOR)) {
    return true;
  }
  return BIT_FLAG_TEST_MASK(pBasicInfo->operatorFlag, SEMI_OPERATOR);
}
169

170
// Turns on the SEMI_OPERATOR bit in the operator flags.
void setSemiOperatorFlag(SSteamOpBasicInfo* pBasicInfo) { BIT_FLAG_SET_MASK(pBasicInfo->operatorFlag, SEMI_OPERATOR); }
36✔
173

174
// Returns true when the SEMI_OPERATOR bit is set in the operator flags.
bool isSemiOperator(SSteamOpBasicInfo* pBasicInfo) { return BIT_FLAG_TEST_MASK(pBasicInfo->operatorFlag, SEMI_OPERATOR); }
177

178
// Releases everything owned by the basic stream operator info and resets the
// pointers. pTsDataState is only cleared here, not freed. The struct itself
// is not freed.
void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
  // release the owned containers first ...
  blockDataDestroy(pBasicInfo->pCheckpointRes);
  tSimpleHashCleanup(pBasicInfo->pSeDeleted);
  blockDataDestroy(pBasicInfo->pDelRes);
  taosArrayDestroyP(pBasicInfo->pUpdated, destroyFlusedPos);

  // ... then drop the now dangling pointers
  pBasicInfo->pCheckpointRes = NULL;
  pBasicInfo->pSeDeleted = NULL;
  pBasicInfo->pDelRes = NULL;
  pBasicInfo->pUpdated = NULL;
  pBasicInfo->pTsDataState = NULL;

  destroyStreamNotifyEventSupp(&pBasicInfo->notifyEventSup);
}
6,565✔
194

195
static int32_t encodeStreamNotifyEventSupp(void** buf, const SStreamNotifyEventSupp* sup) {
88✔
196
  int32_t tlen = 0;
88✔
197
  void*   pIter = NULL;
88✔
198
  char*   str = NULL;
88✔
199

200
  if (sup == NULL) {
88!
201
    return tlen;
×
202
  }
203

204
  tlen += taosEncodeFixedI32(buf, taosHashGetSize(sup->pWindowEventHashMap));
88✔
205
  pIter = taosHashIterate(sup->pWindowEventHashMap, NULL);
88✔
206
  while (pIter) {
88!
207
    const SStreamNotifyEvent* pEvent = (const SStreamNotifyEvent*)pIter;
×
208
    str = cJSON_PrintUnformatted(pEvent->pJson);
×
209

210
    tlen += taosEncodeFixedU64(buf, pEvent->gid);
×
211
    tlen += taosEncodeFixedI64(buf, pEvent->eventType);
×
212
    tlen += taosEncodeFixedI64(buf, pEvent->win.skey);
×
213
    tlen += taosEncodeFixedI64(buf, pEvent->win.ekey);
×
214
    tlen += taosEncodeString(buf, str);
×
215
    cJSON_free(str);
×
216
    pIter = taosHashIterate(sup->pWindowEventHashMap, pIter);
×
217
  }
218
  return tlen;
88✔
219
}
220

221
// Restores the pending notify events written by encodeStreamNotifyEventSupp()
// into sup->pWindowEventHashMap.
// On success `*buf` is advanced past the consumed bytes. On failure `*buf` is
// left untouched and the error code is returned. Events inserted before the
// failure stay in the map.
static int32_t decodeStreamNotifyEventSupp(void** buf, SStreamNotifyEventSupp* sup) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  void*              p = *buf;
  int32_t            size = 0;
  uint64_t           len = 0;
  SStreamNotifyEvent item = {0};

  p = taosDecodeFixedI32(p, &size);
  for (int32_t i = 0; i < size; i++) {
    p = taosDecodeFixedU64(p, &item.gid);
    p = taosDecodeFixedI64(p, &item.eventType);
    p = taosDecodeFixedI64(p, &item.win.skey);
    p = taosDecodeFixedI64(p, &item.win.ekey);
    p = taosDecodeVariantU64(p, &len);
    // The JSON text is length-prefixed and the cursor advances by exactly
    // `len` bytes below, so no terminator is accounted for. Parse with an
    // explicit length instead of relying on a trailing '\0'.
    item.pJson = cJSON_ParseWithLength(p, (size_t)len);
    if (item.pJson == NULL) {
      qWarn("failed to parse the json content since %s", cJSON_GetErrorPtr());
    }
    QUERY_CHECK_NULL(item.pJson, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
    p = POINTER_SHIFT(p, len);
    code = taosHashPut(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE, &item, sizeof(SStreamNotifyEvent));
    QUERY_CHECK_CODE(code, lino, _end);
    // the copy stored in the map now owns the JSON payload
    item.pJson = NULL;
  }
  *buf = p;
_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  // frees the payload only if it was not handed over to the map
  destroyStreamWindowEvent(&item);
  return code;
}
254

255
int32_t encodeStreamBasicInfo(void** buf, const SSteamOpBasicInfo* pBasicInfo) {
88✔
256
  return encodeStreamNotifyEventSupp(buf, &pBasicInfo->notifyEventSup);
88✔
257
}
258

259
// Decodes the persistent part of the basic info, i.e. its notify-event support.
// Returns the code reported by decodeStreamNotifyEventSupp().
int32_t decodeStreamBasicInfo(void** buf, SSteamOpBasicInfo* pBasicInfo) {
  SStreamNotifyEventSupp* pNotifySup = &pBasicInfo->notifyEventSup;
  return decodeStreamNotifyEventSupp(buf, pNotifySup);
}
262

263
static void streamNotifyGetEventWindowId(const SSessionKey* pSessionKey, char* buf) {
×
264
  uint64_t hash = 0;
×
265
  uint64_t ar[2];
266

267
  ar[0] = pSessionKey->groupId;
×
268
  ar[1] = pSessionKey->win.skey;
×
269
  hash = MurmurHash3_64((char*)ar, sizeof(ar));
×
270
  buf = u64toaFastLut(hash, buf);
×
271
}
×
272

273
// Adds `item` to `obj` under the constant key `str`; when the add fails, sets
// `code` to TSDB_CODE_OUT_OF_MEMORY and jumps to the enclosing `_end` label.
// Requires `code`, `lino` and `_end` in the calling scope. The macro does not
// release `item` when the add fails.
#define JSON_CHECK_ADD_ITEM(obj, str, item) \
274
  QUERY_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
275

276
// Adds one column value to the JSON object `obj` under the key `colName`.
//   colName  key of the new member (used as a constant-string key, not copied)
//   type     TSDB_DATA_TYPE_* of the value behind `pData`
//   isNull   when true a JSON null is added and `pData` is ignored
//   pData    raw column value; var-length types are read via varDataVal/varDataLen
//   obj      target JSON object
// Booleans become JSON booleans, all integer/float/timestamp types become JSON
// numbers, VARCHAR/NCHAR become strings, any other type becomes a placeholder string.
// Returns TSDB_CODE_INVALID_PARA for bad arguments and TSDB_CODE_OUT_OF_MEMORY
// when an allocation or the insertion fails.
static int32_t jsonAddColumnField(const char* colName, int16_t type, bool isNull, const char* pData, cJSON* obj) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  char*   strCopy = NULL;  // terminated copy of a var-length value, owned here until attached
  cJSON*  node = NULL;

  QUERY_CHECK_NULL(colName, code, lino, _end, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_CONDITION(isNull || (pData != NULL), code, lino, _end, TSDB_CODE_INVALID_PARA);
  QUERY_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_INVALID_PARA);

  if (isNull) {
    node = cJSON_CreateNull();
  } else {
    switch (type) {
      case TSDB_DATA_TYPE_BOOL:
        node = cJSON_CreateBool(*(const bool*)pData);
        break;

      case TSDB_DATA_TYPE_TINYINT:
        node = cJSON_CreateNumber(*(const int8_t*)pData);
        break;

      case TSDB_DATA_TYPE_SMALLINT:
        node = cJSON_CreateNumber(*(const int16_t*)pData);
        break;

      case TSDB_DATA_TYPE_INT:
        node = cJSON_CreateNumber(*(const int32_t*)pData);
        break;

      case TSDB_DATA_TYPE_BIGINT:
      case TSDB_DATA_TYPE_TIMESTAMP:
        node = cJSON_CreateNumber(*(const int64_t*)pData);
        break;

      case TSDB_DATA_TYPE_FLOAT:
        node = cJSON_CreateNumber(*(const float*)pData);
        break;

      case TSDB_DATA_TYPE_DOUBLE:
        node = cJSON_CreateNumber(*(const double*)pData);
        break;

      case TSDB_DATA_TYPE_VARCHAR:
      case TSDB_DATA_TYPE_NCHAR: {
        // The stored value carries no terminator, while cJSON needs a C string:
        // build a terminated copy first.
        int32_t nChars = varDataLen(pData);
        strCopy = cJSON_malloc(nChars + 1);
        QUERY_CHECK_NULL(strCopy, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
        memcpy(strCopy, varDataVal(pData), nChars);
        strCopy[nChars] = '\0';
        // created as a reference so that the copy is not duplicated again
        node = cJSON_CreateStringReference(strCopy);
        break;
      }

      case TSDB_DATA_TYPE_UTINYINT:
        node = cJSON_CreateNumber(*(const uint8_t*)pData);
        break;

      case TSDB_DATA_TYPE_USMALLINT:
        node = cJSON_CreateNumber(*(const uint16_t*)pData);
        break;

      case TSDB_DATA_TYPE_UINT:
        node = cJSON_CreateNumber(*(const uint32_t*)pData);
        break;

      case TSDB_DATA_TYPE_UBIGINT:
        node = cJSON_CreateNumber(*(const uint64_t*)pData);
        break;

      default:
        node = cJSON_CreateStringReference("<Unable to display this data type>");
        break;
    }
  }

  JSON_CHECK_ADD_ITEM(obj, colName, node);

  if (strCopy != NULL) {
    // Dropping the reference flag hands the copy over to the JSON node, which
    // frees it together with the object later.
    node->type &= ~cJSON_IsReference;
    strCopy = NULL;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  if (strCopy) {
    cJSON_free(strCopy);
  }
  return code;
}
393

394
static cJSON* createBasicAggNotifyEvent(const char* windowType, EStreamNotifyEventType eventType,
×
395
                                        const SSessionKey* pSessionKey) {
396
  int32_t     code = TSDB_CODE_SUCCESS;
×
397
  int32_t     lino = 0;
×
398
  const char* eventTypeStr = NULL;
×
399
  cJSON*      event = NULL;
×
400
  char        windowId[32];
401
  char        groupId[32];
402

403
  QUERY_CHECK_NULL(windowType, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
404
  QUERY_CHECK_NULL(pSessionKey, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
405

406
  if (eventType == SNOTIFY_EVENT_WINDOW_OPEN) {
×
407
    eventTypeStr = "WINDOW_OPEN";
×
408
  } else if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) {
×
409
    eventTypeStr = "WINDOW_CLOSE";
×
410
  } else if (eventType == SNOTIFY_EVENT_WINDOW_INVALIDATION) {
×
411
    eventTypeStr = "WINDOW_INVALIDATION";
×
412
  } else {
413
    QUERY_CHECK_CONDITION(false, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
414
  }
415

416
  qDebug("add stream notify event from %s Window, type: %s, start: %" PRId64 ", end: %" PRId64, windowType,
×
417
         eventTypeStr, pSessionKey->win.skey, pSessionKey->win.ekey);
418

419
  event = cJSON_CreateObject();
×
420
  QUERY_CHECK_NULL(event, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
421

422
  // add basic info
423
  streamNotifyGetEventWindowId(pSessionKey, windowId);
×
424
  JSON_CHECK_ADD_ITEM(event, "eventType", cJSON_CreateStringReference(eventTypeStr));
×
425
  JSON_CHECK_ADD_ITEM(event, "eventTime", cJSON_CreateNumber(taosGetTimestampMs()));
×
426
  JSON_CHECK_ADD_ITEM(event, "windowId", cJSON_CreateString(windowId));
×
427
  JSON_CHECK_ADD_ITEM(event, "windowType", cJSON_CreateStringReference(windowType));
×
428
  char* p = u64toaFastLut(pSessionKey->groupId, groupId);
×
429
  JSON_CHECK_ADD_ITEM(event, "groupId", cJSON_CreateString(groupId));
×
430
  JSON_CHECK_ADD_ITEM(event, "windowStart", cJSON_CreateNumber(pSessionKey->win.skey));
×
431
  if (eventType != SNOTIFY_EVENT_WINDOW_OPEN) {
×
432
    if (strcmp(windowType, "Time") == 0) {
×
433
      JSON_CHECK_ADD_ITEM(event, "windowEnd", cJSON_CreateNumber(pSessionKey->win.ekey + 1));
×
434
    } else {
435
      JSON_CHECK_ADD_ITEM(event, "windowEnd", cJSON_CreateNumber(pSessionKey->win.ekey));
×
436
    }
437
  }
438

439
_end:
×
440
  if (code != TSDB_CODE_SUCCESS) {
×
441
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
442
    terrno = code;
×
443
    cJSON_Delete(event);
×
444
    event = NULL;
×
445
  }
446
  return event;
×
447
}
448

449
int32_t addEventAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
×
450
                               const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t ri,
451
                               SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
452
  int32_t            code = TSDB_CODE_SUCCESS;
×
453
  int32_t            lino = 0;
×
454
  cJSON*             event = NULL;
×
455
  cJSON*             fields = NULL;
×
456
  cJSON*             cond = NULL;
×
457
  const SNode*       pNode = NULL;
×
458
  int32_t            origSize = 0;
×
459
  int64_t            startTime = 0;
×
460
  int64_t            endTime = 0;
×
461
  SStreamNotifyEvent item = {0};
×
462

463
  QUERY_CHECK_NULL(pInputBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
464
  QUERY_CHECK_NULL(pInputBlock->pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
465
  QUERY_CHECK_NULL(pCondCols, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
466
  QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
467
  QUERY_CHECK_NULL(sup->windowType, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
468
  QUERY_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
469

470
  startTime = taosGetMonoTimestampMs();
×
471
  event = createBasicAggNotifyEvent(sup->windowType, eventType, pSessionKey);
×
472
  QUERY_CHECK_NULL(event, code, lino, _end, terrno);
×
473

474
  // create fields object to store matched column values
475
  fields = cJSON_CreateObject();
×
476
  QUERY_CHECK_NULL(fields, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
477
  FOREACH(pNode, pCondCols) {
×
478
    const SColumnNode*     pColDef = (const SColumnNode*)pNode;
×
479
    const SColumnInfoData* pColData = taosArrayGet(pInputBlock->pDataBlock, pColDef->slotId);
×
480
    code = jsonAddColumnField(pColDef->colName, pColData->info.type, colDataIsNull_s(pColData, ri),
×
481
                              colDataGetData(pColData, ri), fields);
×
482
    QUERY_CHECK_CODE(code, lino, _end);
×
483
  }
484

485
  // add trigger condition
486
  cond = cJSON_CreateObject();
×
487
  QUERY_CHECK_NULL(cond, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
488
  JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(0));
×
489
  JSON_CHECK_ADD_ITEM(cond, "fieldValues", fields);
×
490
  fields = NULL;
×
491
  JSON_CHECK_ADD_ITEM(event, "triggerCondition", cond);
×
492
  cond = NULL;
×
493

494
  item.gid = pSessionKey->groupId;
×
495
  item.win = pSessionKey->win;
×
496
  item.eventType = eventType;
×
497
  item.pJson = event;
×
498
  event = NULL;
×
499
  origSize = taosHashGetSize(sup->pWindowEventHashMap);
×
500
  code = taosHashPut(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE, &item, sizeof(SStreamNotifyEvent));
×
501
  QUERY_CHECK_CODE(code, lino, _end);
×
502
  item.pJson = NULL;
×
503

504
  endTime = taosGetMonoTimestampMs();
×
505
  pNotifyEventStat->notifyEventAddTimes++;
×
506
  pNotifyEventStat->notifyEventAddElems += taosHashGetSize(sup->pWindowEventHashMap) - origSize;
×
507
  pNotifyEventStat->notifyEventAddCostSec += (endTime - startTime) / 1000.0;
×
508
  pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
×
509

510
_end:
×
511
  if (code != TSDB_CODE_SUCCESS) {
×
512
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
513
  }
514
  destroyStreamWindowEvent(&item);
×
515
  if (cond != NULL) {
×
516
    cJSON_Delete(cond);
×
517
  }
518
  if (fields != NULL) {
×
519
    cJSON_Delete(fields);
×
520
  }
521
  if (event != NULL) {
×
522
    cJSON_Delete(event);
×
523
  }
524
  return code;
×
525
}
526

527
int32_t addStateAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
×
528
                               const SStateKeys* pCurState, const SStateKeys* pAnotherState, bool onlyUpdate,
529
                               SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
530
  int32_t            code = TSDB_CODE_SUCCESS;
×
531
  int32_t            lino = 0;
×
532
  cJSON*             event = NULL;
×
533
  int32_t            origSize = 0;
×
534
  int64_t            startTime = 0;
×
535
  int64_t            endTime = 0;
×
536
  SStreamNotifyEvent item = {0};
×
537

538
  QUERY_CHECK_NULL(pCurState, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
539
  QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
540
  QUERY_CHECK_NULL(sup->windowType, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
541
  QUERY_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
542

543
  item.gid = pSessionKey->groupId;
×
544
  item.win = pSessionKey->win;
×
545
  item.eventType = eventType;
×
546
  // Check if the notify event exists for update
547
  if (onlyUpdate && taosHashGet(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE) == NULL) {
×
548
    goto _end;
×
549
  }
550

551
  startTime = taosGetMonoTimestampMs();
×
552
  event = createBasicAggNotifyEvent(sup->windowType, eventType, pSessionKey);
×
553
  QUERY_CHECK_NULL(event, code, lino, _end, terrno);
×
554

555
  // add state value
556
  if (eventType == SNOTIFY_EVENT_WINDOW_OPEN) {
×
557
    if (pAnotherState) {
×
558
      code = jsonAddColumnField("prevState", pAnotherState->type, pAnotherState->isNull, pAnotherState->pData, event);
×
559
      QUERY_CHECK_CODE(code, lino, _end);
×
560
    } else {
561
      code = jsonAddColumnField("prevState", pCurState->type, true, NULL, event);
×
562
      QUERY_CHECK_CODE(code, lino, _end);
×
563
    }
564
  }
565
  code = jsonAddColumnField("curState", pCurState->type, pCurState->isNull, pCurState->pData, event);
×
566
  QUERY_CHECK_CODE(code, lino, _end);
×
567
  if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) {
×
568
    if (pAnotherState) {
×
569
      code = jsonAddColumnField("nextState", pAnotherState->type, pAnotherState->isNull, pAnotherState->pData, event);
×
570
      QUERY_CHECK_CODE(code, lino, _end);
×
571
    } else {
572
      code = jsonAddColumnField("nextState", pCurState->type, true, NULL, event);
×
573
      QUERY_CHECK_CODE(code, lino, _end);
×
574
    }
575
  }
576

577
  item.pJson = event;
×
578
  event = NULL;
×
579
  origSize = taosHashGetSize(sup->pWindowEventHashMap);
×
580
  code = taosHashPut(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE, &item, sizeof(SStreamNotifyEvent));
×
581
  QUERY_CHECK_CODE(code, lino, _end);
×
582
  item.pJson = NULL;
×
583

584
  endTime = taosGetMonoTimestampMs();
×
585
  pNotifyEventStat->notifyEventAddTimes++;
×
586
  pNotifyEventStat->notifyEventAddElems += taosHashGetSize(sup->pWindowEventHashMap) - origSize;
×
587
  pNotifyEventStat->notifyEventAddCostSec += (endTime - startTime) / 1000.0;
×
588
  pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
×
589

590
_end:
×
591
  if (code != TSDB_CODE_SUCCESS) {
×
592
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
593
  }
594
  destroyStreamWindowEvent(&item);
×
595
  if (event != NULL) {
×
596
    cJSON_Delete(event);
×
597
  }
598
  return code;
×
599
}
600

601
static int32_t addNormalAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
×
602
                                       SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
603
  int32_t            code = TSDB_CODE_SUCCESS;
×
604
  int32_t            lino = 0;
×
605
  cJSON*             event = NULL;
×
606
  int32_t            origSize = 0;
×
607
  int64_t            startTime = 0;
×
608
  int64_t            endTime = 0;
×
609
  SStreamNotifyEvent item = {0};
×
610

611
  QUERY_CHECK_NULL(pSessionKey, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
612
  QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
613
  QUERY_CHECK_NULL(sup->windowType, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
614
  QUERY_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
615

616
  startTime = taosGetMonoTimestampMs();
×
617
  event = createBasicAggNotifyEvent(sup->windowType, eventType, pSessionKey);
×
618
  QUERY_CHECK_NULL(event, code, lino, _end, terrno);
×
619

620
  item.gid = pSessionKey->groupId;
×
621
  item.win = pSessionKey->win;
×
622
  item.eventType = eventType;
×
623
  item.pJson = event;
×
624
  event = NULL;
×
625
  origSize = taosHashGetSize(sup->pWindowEventHashMap);
×
626
  code = taosHashPut(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE, &item, sizeof(SStreamNotifyEvent));
×
627
  QUERY_CHECK_CODE(code, lino, _end);
×
628
  item.pJson = NULL;
×
629

630
  endTime = taosGetMonoTimestampMs();
×
631
  pNotifyEventStat->notifyEventAddTimes++;
×
632
  pNotifyEventStat->notifyEventAddElems += taosHashGetSize(sup->pWindowEventHashMap) - origSize;
×
633
  pNotifyEventStat->notifyEventAddCostSec += (endTime - startTime) / 1000.0;
×
634
  pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
×
635

636
_end:
×
637
  if (code != TSDB_CODE_SUCCESS) {
×
638
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
639
  }
640
  destroyStreamWindowEvent(&item);
×
641
  if (event != NULL) {
×
642
    cJSON_Delete(event);
×
643
  }
644
  return code;
×
645
}
646

647
int32_t addIntervalAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
×
648
                                  SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
649
  return addNormalAggNotifyEvent(eventType, pSessionKey, sup, pNotifyEventStat);
×
650
}
651

652
int32_t addSessionAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
×
653
                                 SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
654
  return addNormalAggNotifyEvent(eventType, pSessionKey, sup, pNotifyEventStat);
×
655
}
656

657
int32_t addCountAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
×
658
                               SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
659
  return addNormalAggNotifyEvent(eventType, pSessionKey, sup, pNotifyEventStat);
×
660
}
661

662
int32_t addAggResultNotifyEvent(const SSDataBlock* pResultBlock, const SArray* pSessionKeys,
×
663
                                const SSchemaWrapper* pSchemaWrapper, SStreamNotifyEventSupp* sup,
664
                                STaskNotifyEventStat* pNotifyEventStat) {
665
  int32_t            code = TSDB_CODE_SUCCESS;
×
666
  int32_t            lino = 0;
×
667
  cJSON*             result = NULL;
×
668
  int32_t            origSize = 0;
×
669
  int64_t            startTime = 0;
×
670
  int64_t            endTime = 0;
×
671
  SStreamNotifyEvent item = {0};
×
672

673
  QUERY_CHECK_NULL(pResultBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
674
  QUERY_CHECK_NULL(pSessionKeys, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
675
  QUERY_CHECK_NULL(pSchemaWrapper, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
676
  QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
677
  QUERY_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
678

679
  qDebug("add %" PRId64 " stream notify results from window agg", pResultBlock->info.rows);
×
680
  startTime = taosGetMonoTimestampMs();
×
681
  origSize = taosHashGetSize(sup->pWindowEventHashMap);
×
682

683
  for (int32_t i = 0; i < pResultBlock->info.rows; ++i) {
×
684
    const SSessionKey* pSessionKey = taosArrayGet(pSessionKeys, i);
×
685
    item.gid = pSessionKey->groupId;
×
686
    item.win = pSessionKey->win;
×
687
    item.eventType = SNOTIFY_EVENT_WINDOW_CLOSE;
×
688
    SStreamNotifyEvent* pItem = taosHashGet(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE);
×
689
    if (pItem == NULL) {
×
690
      item.pJson = createBasicAggNotifyEvent(sup->windowType, SNOTIFY_EVENT_WINDOW_CLOSE, pSessionKey);
×
691
      QUERY_CHECK_NULL(item.pJson, code, lino, _end, terrno);
×
692
      if (strcmp(sup->windowType, "Event") == 0) {
×
693
        JSON_CHECK_ADD_ITEM(item.pJson, "triggerCondition", cJSON_CreateNull());
×
694
      } else if (strcmp(sup->windowType, "State") == 0) {
×
695
        JSON_CHECK_ADD_ITEM(item.pJson, "curState", cJSON_CreateNull());
×
696
        JSON_CHECK_ADD_ITEM(item.pJson, "nextState", cJSON_CreateNull());
×
697
      }
698
      code = taosHashPut(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE, &item, sizeof(SStreamNotifyEvent));
×
699
      QUERY_CHECK_CODE(code, lino, _end);
×
700
      item.pJson = NULL;
×
701
      pItem = taosHashGet(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE);
×
702
      QUERY_CHECK_NULL(pItem, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
703
    }
704

705
    // convert the result row into json
706
    result = cJSON_CreateObject();
×
707
    QUERY_CHECK_NULL(result, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
708
    for (int32_t j = 0; j < pSchemaWrapper->nCols; ++j) {
×
709
      const SSchema*         pCol = pSchemaWrapper->pSchema + j;
×
710
      const SColumnInfoData* pColData = taosArrayGet(pResultBlock->pDataBlock, pCol->colId - 1);
×
711
      code = jsonAddColumnField(pCol->name, pColData->info.type, colDataIsNull_s(pColData, i),
×
712
                                colDataGetData(pColData, i), result);
×
713
      QUERY_CHECK_CODE(code, lino, _end);
×
714
    }
715
    JSON_CHECK_ADD_ITEM(pItem->pJson, "result", result);
×
716
    result = NULL;
×
717
  }
718

719
  endTime = taosGetMonoTimestampMs();
×
720
  pNotifyEventStat->notifyEventAddTimes++;
×
721
  pNotifyEventStat->notifyEventAddElems += taosHashGetSize(sup->pWindowEventHashMap) - origSize;
×
722
  pNotifyEventStat->notifyEventAddCostSec += (endTime - startTime) / 1000.0;
×
723
  pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
×
724

725
_end:
×
726
  if (code != TSDB_CODE_SUCCESS) {
×
727
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
728
  }
729
  destroyStreamWindowEvent(&item);
×
730
  if (result != NULL) {
×
731
    cJSON_Delete(result);
×
732
  }
733
  return code;
×
734
}
735

736
int32_t addAggDeleteNotifyEvent(const SSDataBlock* pDeleteBlock, SStreamNotifyEventSupp* sup,
×
737
                                STaskNotifyEventStat* pNotifyEventStat) {
738
  int32_t          code = TSDB_CODE_SUCCESS;
×
739
  int32_t          lino = 0;
×
740
  SSessionKey      sessionKey = {0};
×
741
  SColumnInfoData* pWstartCol = NULL;
×
742
  SColumnInfoData* pWendCol = NULL;
×
743
  SColumnInfoData* pGroupIdCol = NULL;
×
744

745
  QUERY_CHECK_NULL(pDeleteBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
746
  QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
747
  QUERY_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
748

749
  qDebug("add %" PRId64 " stream notify delete events from window agg", pDeleteBlock->info.rows);
×
750

751
  pWstartCol = taosArrayGet(pDeleteBlock->pDataBlock, START_TS_COLUMN_INDEX);
×
752
  pWendCol = taosArrayGet(pDeleteBlock->pDataBlock, END_TS_COLUMN_INDEX);
×
753
  pGroupIdCol = taosArrayGet(pDeleteBlock->pDataBlock, GROUPID_COLUMN_INDEX);
×
754
  for (int32_t i = 0; i < pDeleteBlock->info.rows; ++i) {
×
755
    sessionKey.win.skey = *(int64_t*)colDataGetNumData(pWstartCol, i);
×
756
    sessionKey.win.ekey = *(int64_t*)colDataGetNumData(pWendCol, i);
×
757
    sessionKey.groupId = *(uint64_t*)colDataGetNumData(pGroupIdCol, i);
×
758
    code = addNormalAggNotifyEvent(SNOTIFY_EVENT_WINDOW_INVALIDATION, &sessionKey, sup, pNotifyEventStat);
×
759
    QUERY_CHECK_CODE(code, lino, _end);
×
760
  }
761

762
_end:
×
763
  if (code != TSDB_CODE_SUCCESS) {
×
764
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
765
  }
766
  return code;
×
767
}
768

769
static int32_t streamNotifyGetDestTableName(const SExecTaskInfo* pTaskInfo, uint64_t gid, char** pTableName) {
×
770
  int32_t                code = TSDB_CODE_SUCCESS;
×
771
  int32_t                lino = 0;
×
772
  const SStorageAPI*     pAPI = NULL;
×
773
  void*                  tbname = NULL;
×
774
  int32_t                winCode = TSDB_CODE_SUCCESS;
×
775
  char                   parTbName[TSDB_TABLE_NAME_LEN];
776
  const SStreamTaskInfo* pStreamInfo = NULL;
×
777

778
  QUERY_CHECK_NULL(pTaskInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
779
  QUERY_CHECK_NULL(pTableName, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
780

781
  *pTableName = NULL;
×
782

783
  pAPI = &pTaskInfo->storageAPI;
×
784
  code = pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, gid, &tbname, false, &winCode);
×
785
  QUERY_CHECK_CODE(code, lino, _end);
×
786
  if (winCode != TSDB_CODE_SUCCESS) {
×
787
    parTbName[0] = '\0';
×
788
  } else {
789
    tstrncpy(parTbName, tbname, sizeof(parTbName));
×
790
  }
791
  pAPI->stateStore.streamStateFreeVal(tbname);
×
792

793
  pStreamInfo = &pTaskInfo->streamInfo;
×
794
  code = buildSinkDestTableName(parTbName, pStreamInfo->stbFullName, gid, pStreamInfo->newSubTableRule, pTableName);
×
795
  QUERY_CHECK_CODE(code, lino, _end);
×
796

797
_end:
×
798
  if (code != TSDB_CODE_SUCCESS) {
×
799
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
800
  }
801
  return code;
×
802
}
803

804
int32_t buildNotifyEventBlock(const SExecTaskInfo* pTaskInfo, SStreamNotifyEventSupp* sup,
49,647✔
805
                              STaskNotifyEventStat* pNotifyEventStat) {
806
  int32_t          code = TSDB_CODE_SUCCESS;
49,647✔
807
  int32_t          lino = 0;
49,647✔
808
  int32_t          nWindowEvents = 0;
49,647✔
809
  SColumnInfoData* pEventStrCol = NULL;
49,647✔
810
  int64_t          startTime = 0;
49,647✔
811
  int64_t          endTime = 0;
49,647✔
812
  void*            pIter = NULL;
49,647✔
813

814
  if (pTaskInfo == NULL || sup == NULL || sup->pEventBlock == NULL || pNotifyEventStat == NULL) {
49,647!
815
    goto _end;
49,647✔
816
  }
817

818
  QUERY_CHECK_NULL(sup->pEventBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
819

820
  startTime = taosGetMonoTimestampMs();
×
821
  blockDataCleanup(sup->pEventBlock);
×
822
  nWindowEvents = taosHashGetSize(sup->pWindowEventHashMap);
×
823
  qDebug("start to build stream notify event block, nWindowEvents: %d", nWindowEvents);
×
824

825
  pEventStrCol = taosArrayGet(sup->pEventBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
×
826
  QUERY_CHECK_NULL(pEventStrCol, code, lino, _end, terrno);
×
827

828
  // Append all events content into data block.
829
  pIter = taosHashIterate(sup->pWindowEventHashMap, NULL);
×
830
  while (pIter) {
×
831
    const SStreamNotifyEvent* pEvent = (const SStreamNotifyEvent*)pIter;
×
832
    pIter = taosHashIterate(sup->pWindowEventHashMap, pIter);
×
833
    if (pEvent->eventType == SNOTIFY_EVENT_WINDOW_CLOSE && !cJSON_HasObjectItem(pEvent->pJson, "result")) {
×
834
      // current WINDOW_CLOSE event cannot be pushed yet due to watermark
835
      continue;
×
836
    }
837

838
    // get name of the dest child table
839
    char* tableName = taosHashGet(sup->pTableNameHashMap, &pEvent->gid, sizeof(&pEvent->gid));
×
840
    if (tableName == NULL) {
×
841
      code = streamNotifyGetDestTableName(pTaskInfo, pEvent->gid, &tableName);
×
842
      QUERY_CHECK_CODE(code, lino, _end);
×
843
      code = taosHashPut(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid), tableName, strlen(tableName) + 1);
×
844
      taosMemoryFreeClear(tableName);
×
845
      QUERY_CHECK_CODE(code, lino, _end);
×
846
      tableName = taosHashGet(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid));
×
847
      QUERY_CHECK_NULL(tableName, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
848
    }
849
    JSON_CHECK_ADD_ITEM(pEvent->pJson, "tableName", cJSON_CreateStringReference(tableName));
×
850

851
    // convert the json object into string and append it into the block
852
    char* str = cJSON_PrintUnformatted(pEvent->pJson);
×
853
    QUERY_CHECK_NULL(str, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
×
854
    int32_t len = strlen(str);
×
855
    code = varColSetVarData(pEventStrCol, sup->pEventBlock->info.rows, str, len, false);
×
856
    cJSON_free(str);
×
857
    QUERY_CHECK_CODE(code, lino, _end);
×
858
    sup->pEventBlock->info.rows++;
×
859
    code = taosHashRemove(sup->pWindowEventHashMap, pEvent, NOTIFY_EVENT_KEY_SIZE);
×
860
    if (code == TSDB_CODE_NOT_FOUND) {
×
861
      code = TSDB_CODE_SUCCESS;
×
862
    }
863
    QUERY_CHECK_CODE(code, lino, _end);
×
864
    if (sup->pEventBlock->info.rows >= sup->pEventBlock->info.capacity) {
×
865
      break;
×
866
    }
867
  }
868

869
  if (taosHashGetMemSize(sup->pTableNameHashMap) >= NOTIFY_EVENT_NAME_CACHE_LIMIT_MB * 1024 * 1024) {
×
870
    taosHashClear(sup->pTableNameHashMap);
×
871
  }
872

873
  endTime = taosGetMonoTimestampMs();
×
874
  if (sup->pEventBlock->info.rows > 0) {
×
875
    pNotifyEventStat->notifyEventPushTimes++;
×
876
    pNotifyEventStat->notifyEventPushElems += sup->pEventBlock->info.rows;
×
877
    pNotifyEventStat->notifyEventPushCostSec += (endTime - startTime) / 1000.0;
×
878
  }
879
  pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
×
880

881
_end:
49,647✔
882
  if (code != TSDB_CODE_SUCCESS) {
49,647!
883
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
884
  }
885
  if (pIter) {
49,647!
886
    taosHashCancelIterate(sup->pWindowEventHashMap, pIter);
×
887
  }
888
  return code;
49,652✔
889
}
890

891
int32_t removeOutdatedNotifyEvents(STimeWindowAggSupp* pTwSup, SStreamNotifyEventSupp* sup,
49,646✔
892
                                   STaskNotifyEventStat* pNotifyEventStat) {
893
  int32_t code = TSDB_CODE_SUCCESS;
49,646✔
894
  int32_t lino = 0;
49,646✔
895
  void*   pIter = NULL;
49,646✔
896

897
  if (pTwSup || sup == NULL || pNotifyEventStat == NULL) {
49,646!
898
    goto _end;
49,646✔
899
  }
900

901
  pIter = taosHashIterate(sup->pWindowEventHashMap, NULL);
×
902
  while (pIter) {
×
903
    const SStreamNotifyEvent* pEvent = (const SStreamNotifyEvent*)pIter;
×
904
    pIter = taosHashIterate(sup->pWindowEventHashMap, pIter);
×
905
    if (isOverdue(pEvent->win.ekey, pTwSup)) {
×
906
      code = taosHashRemove(sup->pWindowEventHashMap, pEvent, NOTIFY_EVENT_KEY_SIZE);
×
907
      QUERY_CHECK_CODE(code, lino, _end);
×
908
    }
909
  }
910

911
  pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
×
912

913
_end:
49,646✔
914
  if (code != TSDB_CODE_SUCCESS) {
49,646!
915
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
916
  }
917
  return code;
49,641✔
918
}
919

920
/**
 * Sets the FINAL_OPERATOR bit in pBasicInfo->operatorFlag.
 *
 * @param pBasicInfo stream operator basic info whose flag word is modified
 */
void setFinalOperatorFlag(SSteamOpBasicInfo* pBasicInfo) {
  // only the FINAL_OPERATOR mask is passed to the set macro
  BIT_FLAG_SET_MASK(pBasicInfo->operatorFlag, FINAL_OPERATOR);
}
18✔
923

924
/**
 * Tests the FINAL_OPERATOR bit of pBasicInfo->operatorFlag.
 *
 * @param pBasicInfo stream operator basic info whose flag word is inspected
 * @return the result of BIT_FLAG_TEST_MASK for FINAL_OPERATOR
 */
bool isFinalOperator(SSteamOpBasicInfo* pBasicInfo) {
  const bool finalFlagSet = BIT_FLAG_TEST_MASK(pBasicInfo->operatorFlag, FINAL_OPERATOR);
  return finalFlagSet;
}
927

928
/**
 * Sets the RECALCULATE_OPERATOR bit in pBasicInfo->operatorFlag.
 *
 * @param pBasicInfo stream operator basic info whose flag word is modified
 */
void setRecalculateOperatorFlag(SSteamOpBasicInfo* pBasicInfo) {
  // only the RECALCULATE_OPERATOR mask is passed to the set macro
  BIT_FLAG_SET_MASK(pBasicInfo->operatorFlag, RECALCULATE_OPERATOR);
}
36✔
931

932
/**
 * Clears the RECALCULATE_OPERATOR bit in pBasicInfo->operatorFlag.
 *
 * @param pBasicInfo stream operator basic info whose flag word is modified
 */
void unsetRecalculateOperatorFlag(SSteamOpBasicInfo* pBasicInfo) {
  // counterpart of setRecalculateOperatorFlag(): same mask, unset macro
  BIT_FLAG_UNSET_MASK(pBasicInfo->operatorFlag, RECALCULATE_OPERATOR);
}
×
935

936
/**
 * Tests the RECALCULATE_OPERATOR bit of pBasicInfo->operatorFlag.
 *
 * @param pBasicInfo stream operator basic info whose flag word is inspected
 * @return the result of BIT_FLAG_TEST_MASK for RECALCULATE_OPERATOR
 */
bool isRecalculateOperator(SSteamOpBasicInfo* pBasicInfo) {
  const bool recalcFlagSet = BIT_FLAG_TEST_MASK(pBasicInfo->operatorFlag, RECALCULATE_OPERATOR);
  return recalcFlagSet;
}
939

940
/**
 * Sets the SINGLE_OPERATOR bit in pBasicInfo->operatorFlag.
 *
 * @param pBasicInfo stream operator basic info whose flag word is modified
 */
void setSingleOperatorFlag(SSteamOpBasicInfo* pBasicInfo) {
  // only the SINGLE_OPERATOR mask is passed to the set macro
  BIT_FLAG_SET_MASK(pBasicInfo->operatorFlag, SINGLE_OPERATOR);
}
110✔
943

944
/**
 * Tests the SINGLE_OPERATOR bit of pBasicInfo->operatorFlag.
 *
 * @param pBasicInfo stream operator basic info whose flag word is inspected
 * @return the result of BIT_FLAG_TEST_MASK for SINGLE_OPERATOR
 */
bool isSingleOperator(SSteamOpBasicInfo* pBasicInfo) {
  const bool singleFlagSet = BIT_FLAG_TEST_MASK(pBasicInfo->operatorFlag, SINGLE_OPERATOR);
  return singleFlagSet;
}
947

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc