• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5043

29 Apr 2026 11:44AM UTC coverage: 73.107% (-0.06%) from 73.17%
#5043

push

travis-ci

web-flow
feat(statewindow): support multi columns (#35136)

1563 of 1828 new or added lines in 18 files covered. (85.5%)

7490 existing lines in 148 files now uncovered.

277321 of 379338 relevant lines covered (73.11%)

131116908.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.25
/source/common/src/msg/streamJson.c
1
#include "streamMsg.h"
2
#include "tjson.h"
3

4
static int32_t int16ToJson(const void* pObj, SJson* pJson);
5
static int32_t jsonToInt16(const SJson* pJson, void* pObj);
6

7
static const char* jkFieldName     = "name";
8
static const char* jkFieldType     = "type";
9
static const char* jkFieldFlags    = "flags";
10
static const char* jkFieldBytes    = "bytes";
11
static const char* jkFieldCompress = "compress";
12
static const char* jkFieldTypeMod  = "typeMod";
13
static int32_t sfieldWithOptionsToJson(const void* pObj, SJson* pJson) {
7,053,276✔
14
  const SFieldWithOptions* pField = (const SFieldWithOptions*)pObj;
7,053,276✔
15
  if (NULL != pField->name) {
7,053,276✔
16
    TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, jkFieldName, pField->name));
7,053,276✔
17
  }
18
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkFieldType, pField->type));
7,053,276✔
19
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
7,053,276✔
20
    pJson, jkFieldFlags, pField->flags));
21
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
7,053,276✔
22
    pJson, jkFieldBytes, pField->bytes));
23
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
7,053,276✔
24
    pJson, jkFieldCompress, pField->compress));
25
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
7,053,276✔
26
    pJson, jkFieldTypeMod, pField->typeMod));
27
  return TSDB_CODE_SUCCESS;
7,053,276✔
28
}
29

30
static int32_t jsonToSFieldWithOptions(const SJson* pJson, void* pObj) {
2,105,884✔
31
  SFieldWithOptions* pField = (SFieldWithOptions*)pObj;
2,105,884✔
32
  TAOS_CHECK_RETURN(tjsonGetStringValue1(pJson, jkFieldName, pField->name, sizeof(pField->name)));
2,105,884✔
33
  TAOS_CHECK_RETURN(tjsonGetUTinyIntValue(pJson, jkFieldType, &pField->type));
2,105,884✔
34
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(pJson, jkFieldFlags, &pField->flags));
2,105,884✔
35
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkFieldBytes, &pField->bytes));
2,105,884✔
36
  TAOS_CHECK_RETURN(tjsonGetUIntValue(
2,105,884✔
37
    pJson, jkFieldCompress, &pField->compress));
38
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkFieldTypeMod, &pField->typeMod));
2,105,884✔
39
  return TSDB_CODE_SUCCESS;
2,105,884✔
40
}
41

42
/**
 * Serialize a tag field (an SFieldWithOptions) into the JSON object pJson.
 *
 * Only name, type, flags and bytes are written. Unlike
 * sfieldWithOptionsToJson, the compress and typeMod members are not emitted
 * here.
 *
 * @param pObj  points to a const SFieldWithOptions.
 * @param pJson JSON object receiving the keys.
 * @return TSDB_CODE_SUCCESS unless a TAOS_CHECK_RETURN-wrapped call bails out.
 */
static int32_t stagFieldWithOptionsToJson(const void* pObj, SJson* pJson) {
  const SFieldWithOptions* pTag = (const SFieldWithOptions*)pObj;

  TAOS_CHECK_RETURN(
    tjsonAddStringToObject(pJson, jkFieldName, pTag->name));
  TAOS_CHECK_RETURN(
    tjsonAddIntegerToObject(pJson, jkFieldType, pTag->type));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkFieldFlags, pTag->flags));
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkFieldBytes, pTag->bytes));
  return TSDB_CODE_SUCCESS;
}
56

57
static int32_t jsonToSTagFieldWithOptions(const SJson* pJson, void* pObj) {
303,478✔
58
  SFieldWithOptions* pField = (SFieldWithOptions*)pObj;
303,478✔
59
  TAOS_CHECK_RETURN(tjsonGetStringValue1(pJson, jkFieldName, pField->name, sizeof(pField->name)));
303,478✔
60
  TAOS_CHECK_RETURN(tjsonGetUTinyIntValue(pJson, jkFieldType, &pField->type));
303,478✔
61
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(pJson, jkFieldFlags, &pField->flags));
303,478✔
62
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkFieldBytes, &pField->bytes));
303,478✔
63
  // TAOS_CHECK_RETURN(tjsonGetUIntValue(
64
  //   pJson, jkFieldCompress, &pField->compress));
65
  // TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkFieldTypeMod, &pField->typeMod));
66
  return TSDB_CODE_SUCCESS;
303,478✔
67
}
68

69
static const char* jkSessionTriggerSlotId     = "slotId";
70
static const char* jkSessionTriggerSessionVal = "sessionVal";
71
static int32_t sessionTriggerToJson(const void* pObj, SJson* pJson) {
85,252✔
72
  const SSessionTrigger* pTrigger = (const SSessionTrigger*)pObj;
85,252✔
73
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
85,252✔
74
    pJson, jkSessionTriggerSlotId, pTrigger->slotId));
75
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
85,252✔
76
    pJson, jkSessionTriggerSessionVal, pTrigger->sessionVal));
77
  return TSDB_CODE_SUCCESS;
85,252✔
78
}
79

80
static int32_t jsonToSessionTrigger(const SJson* pJson, void* pObj) {
19,045✔
81
  SSessionTrigger* pTrigger = (SSessionTrigger*)pObj;
19,045✔
82
  TAOS_CHECK_RETURN(tjsonGetSmallIntValue(
19,045✔
83
    pJson, jkSessionTriggerSlotId, &pTrigger->slotId));
84
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
19,045✔
85
    pJson, jkSessionTriggerSessionVal, &pTrigger->sessionVal));
86
  return TSDB_CODE_SUCCESS;
19,045✔
87
}
88

89
/* backward compat: decode old "slotId" single-value payloads */
90
static const char* jkStateTriggerSlotId           = "slotId";
91
static const char* jkStateTriggerSlotIds          = "slotIds";
92
static const char* jkStateTriggerExtend           = "extend";
93
static const char* jkStateTriggerZeroth           = "zeroth";
94
static const char* jkStateTriggerTrueForType      = "trueForType";
95
static const char* jkStateTriggerTrueForCount     = "trueForCount";
96
static const char* jkStateTriggerTrueForDuration  = "trueForDuration";
97
static const char* jkStateTriggerExpr             = "expr";
98
static int32_t stateTriggerToJson(const void* pObj, SJson* pJson) {
556,866✔
99
  const SStateWinTrigger* pTrigger = (const SStateWinTrigger*)pObj;
556,866✔
100
  TAOS_CHECK_RETURN(tjsonAddTArray(
556,866✔
101
    pJson, jkStateTriggerSlotIds,
102
    int16ToJson, pTrigger->pSlotIds));
103
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
556,866✔
104
    pJson, jkStateTriggerExtend, pTrigger->extend));
105
  if (NULL != pTrigger->zeroth) {
556,866✔
106
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
34,284✔
107
      pJson, jkStateTriggerZeroth, (const char*)pTrigger->zeroth));
108
  }
109
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkStateTriggerTrueForType, pTrigger->trueForType));
556,866✔
110
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkStateTriggerTrueForCount, pTrigger->trueForCount));
556,866✔
111
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkStateTriggerTrueForDuration, pTrigger->trueForDuration));
556,866✔
112
  if (NULL != pTrigger->expr) {
556,866✔
113
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
556,866✔
114
      pJson, jkStateTriggerExpr, (const char*)pTrigger->expr));
115
  }
116
  return TSDB_CODE_SUCCESS;
556,866✔
117
}
118

119
static int32_t jsonToStateTrigger(const SJson* pJson, void* pObj) {
189,008✔
120
  SStateWinTrigger* pTrigger = (SStateWinTrigger*)pObj;
189,008✔
121
  SJson* pSlotIds = tjsonGetObjectItem(pJson, jkStateTriggerSlotIds);
189,008✔
122
  if (pSlotIds != NULL) {
189,008✔
123
    TAOS_CHECK_RETURN(tjsonToTArray(
189,008✔
124
      pJson, jkStateTriggerSlotIds, jsonToInt16, &pTrigger->pSlotIds, sizeof(int16_t)));
NEW
125
  } else if (tjsonGetObjectItem(pJson, jkStateTriggerSlotId) != NULL) {
×
NEW
126
    int16_t slotId = -1;
×
NEW
127
    TAOS_CHECK_RETURN(tjsonGetSmallIntValue(pJson, jkStateTriggerSlotId, &slotId));
×
NEW
128
    pTrigger->pSlotIds = taosArrayInit(1, sizeof(int16_t));
×
NEW
129
    if (pTrigger->pSlotIds == NULL) {
×
NEW
130
      return terrno;
×
131
    }
NEW
132
    if (taosArrayPush(pTrigger->pSlotIds, &slotId) == NULL) {
×
NEW
133
      return terrno;
×
134
    }
135
  }
136
  TAOS_CHECK_RETURN(
189,008✔
137
    tjsonGetSmallIntValue(pJson, jkStateTriggerExtend, &pTrigger->extend));
138
  TAOS_CHECK_RETURN(tjsonDupStringValue(
189,008✔
139
    pJson, jkStateTriggerZeroth, (char**)&pTrigger->zeroth));
140
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkStateTriggerTrueForType, &pTrigger->trueForType));
189,008✔
141
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkStateTriggerTrueForCount, &pTrigger->trueForCount));
189,008✔
142
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(pJson, jkStateTriggerTrueForDuration, &pTrigger->trueForDuration));
189,008✔
143
  TAOS_CHECK_RETURN(tjsonDupStringValue(
189,008✔
144
    pJson, jkStateTriggerExpr, (char**)&pTrigger->expr));
145
  return TSDB_CODE_SUCCESS;
189,008✔
146
}
147

148
static const char* jkSlidingTriggerIntervalUnit = "intervalUnit";
149
static const char* jkSlidingTriggerSlidingUnit  = "slidingUnit";
150
static const char* jkSlidingTriggerOffsetUnit   = "offsetUnit";
151
static const char* jkSlidingTriggerSoffsetUnit  = "soffsetUnit";
152
static const char* jkSlidingTriggerPrecision    = "precision";
153
static const char* jkSlidingTriggerInterval     = "interval";
154
static const char* jkSlidingTriggerOffset       = "offset";
155
static const char* jkSlidingTriggerSliding      = "sliding";
156
static const char* jkSlidingTriggerSoffset      = "soffset";
157
static const char* jkSlidingTriggerOverlap      = "overlap";
158
static int32_t slidingTriggerToJson(const void* pObj, SJson* pJson) {
598,938✔
159
  const SSlidingTrigger* pTrigger = (const SSlidingTrigger*)pObj;
598,938✔
160
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
598,938✔
161
    pJson, jkSlidingTriggerIntervalUnit, pTrigger->intervalUnit));
162
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
598,938✔
163
    pJson, jkSlidingTriggerSlidingUnit, pTrigger->slidingUnit));
164
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
598,938✔
165
    pJson, jkSlidingTriggerOffsetUnit, pTrigger->offsetUnit));
166
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
598,938✔
167
    pJson, jkSlidingTriggerSoffsetUnit, pTrigger->soffsetUnit));
168
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
598,938✔
169
    pJson, jkSlidingTriggerPrecision, pTrigger->precision));
170
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
598,938✔
171
    pJson, jkSlidingTriggerInterval, pTrigger->interval));
172
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
598,938✔
173
    pJson, jkSlidingTriggerSliding, pTrigger->sliding));
174
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
598,938✔
175
    pJson, jkSlidingTriggerOffset, pTrigger->offset));
176
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
598,938✔
177
    pJson, jkSlidingTriggerSoffset, pTrigger->soffset));
178
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
598,938✔
179
    pJson, jkSlidingTriggerOverlap, pTrigger->overlap));
180
  return TSDB_CODE_SUCCESS;
598,938✔
181
}
182

183
static int32_t jsonToSlidingTrigger(const SJson* pJson, void* pObj) {
191,884✔
184
  SSlidingTrigger* pTrigger = (SSlidingTrigger*)pObj;
191,884✔
185
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
191,884✔
186
    pJson, jkSlidingTriggerIntervalUnit, &pTrigger->intervalUnit));
187
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
191,884✔
188
    pJson, jkSlidingTriggerSlidingUnit, &pTrigger->slidingUnit));
189
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
191,884✔
190
    pJson, jkSlidingTriggerOffsetUnit, &pTrigger->offsetUnit));
191
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
191,884✔
192
    pJson, jkSlidingTriggerSoffsetUnit, &pTrigger->soffsetUnit));
193
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
191,884✔
194
    pJson, jkSlidingTriggerPrecision, &pTrigger->precision));
195
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
191,884✔
196
    pJson, jkSlidingTriggerInterval, &pTrigger->interval));
197
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
191,884✔
198
    pJson, jkSlidingTriggerSliding, &pTrigger->sliding));
199
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
191,884✔
200
    pJson, jkSlidingTriggerOffset, &pTrigger->offset));
201
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
191,884✔
202
    pJson, jkSlidingTriggerSoffset, &pTrigger->soffset));
203
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
191,884✔
204
    pJson, jkSlidingTriggerOverlap, &pTrigger->overlap));
205
  return TSDB_CODE_SUCCESS;
191,884✔
206
}
207

208
static const char* jkEventTriggerStartCond       = "startCond";
209
static const char* jkEventTriggerEndCond         = "endCond";
210
static const char* jkEventTriggerTrueForType     = "trueForType";
211
static const char* jkEventTriggerTrueForCount    = "trueForCount";
212
static const char* jkEventTriggerTrueForDuration = "trueForDuration";
213
static int32_t eventTriggerToJson(const void* pObj, SJson* pJson) {
177,744✔
214
  const SEventTrigger* pTrigger = (const SEventTrigger*)pObj;
177,744✔
215
  if (NULL != pTrigger->startCond) {
177,744✔
216
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
177,744✔
217
      pJson, jkEventTriggerStartCond, (const char*)pTrigger->startCond));
218
  }
219
  if (NULL != pTrigger->endCond) {
177,744✔
220
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
165,264✔
221
      pJson, jkEventTriggerEndCond, (const char*)pTrigger->endCond));
222
  }
223
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkEventTriggerTrueForType, pTrigger->trueForType));
177,744✔
224
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkEventTriggerTrueForCount, pTrigger->trueForCount));
177,744✔
225
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkEventTriggerTrueForDuration, pTrigger->trueForDuration));
177,744✔
226
  return TSDB_CODE_SUCCESS;
177,744✔
227
}
228

229
static int32_t jsonToEventTrigger(const SJson* pJson, void* pObj) {
49,704✔
230
  SEventTrigger* pTrigger = (SEventTrigger*)pObj;
49,704✔
231
  TAOS_CHECK_RETURN(tjsonDupStringValue(
49,704✔
232
    pJson, jkEventTriggerStartCond, (char**)&pTrigger->startCond));
233
  TAOS_CHECK_RETURN(tjsonDupStringValue(
49,704✔
234
    pJson, jkEventTriggerEndCond, (char**)&pTrigger->endCond));
235
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkEventTriggerTrueForType, &pTrigger->trueForType));
49,704✔
236
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, jkEventTriggerTrueForCount, &pTrigger->trueForCount));
49,704✔
237
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(pJson, jkEventTriggerTrueForDuration, &pTrigger->trueForDuration));
49,704✔
238
  return TSDB_CODE_SUCCESS;
49,704✔
239
}
240

241
static const char* jkCountTriggerCountVal = "countVal";
242
static const char* jkCountTriggerSliding  = "sliding";
243
static const char* jkCountTriggerCondCols = "condCols";
244
static int32_t countTriggerToJson(const void* pObj, SJson* pJson) {
200,458✔
245
  const SCountTrigger* pTrigger = (const SCountTrigger*)pObj;
200,458✔
246
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkCountTriggerCountVal, pTrigger->countVal));
200,458✔
247
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, jkCountTriggerSliding, pTrigger->sliding));
200,458✔
248
  if (NULL != pTrigger->condCols) {
200,458✔
UNCOV
249
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
×
250
      pJson, jkCountTriggerCondCols, (const char*)pTrigger->condCols));
251
  }
252
  return TSDB_CODE_SUCCESS;
200,458✔
253
}
254

255
static int32_t jsonToCountTrigger(const SJson* pJson, void* pObj) {
54,260✔
256
  SCountTrigger* pTrigger = (SCountTrigger*)pObj;
54,260✔
257
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
54,260✔
258
    pJson, jkCountTriggerCountVal, &pTrigger->countVal));
259
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
54,260✔
260
    pJson, jkCountTriggerSliding, &pTrigger->sliding));
261
  TAOS_CHECK_RETURN(tjsonDupStringValue(
54,260✔
262
    pJson, jkCountTriggerCondCols, (char**)&pTrigger->condCols));
263
  return TSDB_CODE_SUCCESS;
54,260✔
264
}
265

266
static const char* jkPeriodTriggerPeriodUnit = "periodUnit";
267
static const char* jkPeriodTriggerOffsetUnit = "offsetUnit";
268
static const char* jkPeriodTriggerPrecision  = "precision";
269
static const char* jkPeriodTriggerPeriod     = "period";
270
static const char* jkPeriodTriggerOffset     = "offset";
271
static int32_t periodTriggerToJson(const void* pObj, SJson* pJson) {
84,416✔
272
  const SPeriodTrigger* pTrigger = (const SPeriodTrigger*)pObj;
84,416✔
273
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
84,416✔
274
    pJson, jkPeriodTriggerPeriodUnit, pTrigger->periodUnit));
275
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
84,416✔
276
    pJson, jkPeriodTriggerOffsetUnit, pTrigger->offsetUnit));
277
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
84,416✔
278
    pJson, jkPeriodTriggerPrecision, pTrigger->precision));
279
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
84,416✔
280
    pJson, jkPeriodTriggerPeriod, pTrigger->period));
281
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
84,416✔
282
    pJson, jkPeriodTriggerOffset, pTrigger->offset));
283
  return TSDB_CODE_SUCCESS;
84,416✔
284
}
285

286
static int32_t jsonToPeriodTrigger(const SJson* pJson, void* pObj) {
24,276✔
287
  SPeriodTrigger* pTrigger = (SPeriodTrigger*)pObj;
24,276✔
288
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
24,276✔
289
    pJson, jkPeriodTriggerPeriodUnit, &pTrigger->periodUnit));
290
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
24,276✔
291
    pJson, jkPeriodTriggerOffsetUnit, &pTrigger->offsetUnit));
292
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
24,276✔
293
    pJson, jkPeriodTriggerPrecision, &pTrigger->precision));
294
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
24,276✔
295
    pJson, jkPeriodTriggerPeriod, &pTrigger->period));
296
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
24,276✔
297
    pJson, jkPeriodTriggerOffset, &pTrigger->offset));
298
  return TSDB_CODE_SUCCESS;
24,276✔
299
}
300

301
/**
 * Element encoder: store the int32_t that pObj points to under the key
 * "value" in the JSON object pJson.
 *
 * @return TSDB_CODE_SUCCESS unless the TAOS_CHECK_RETURN-wrapped call bails out.
 */
static int32_t int32ToJson(const void* pObj, SJson* pJson) {
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "value", *(const int32_t*)pObj));
  return TSDB_CODE_SUCCESS;
}
306

307
static int32_t jsonToInt32(const SJson* pJson, void* pObj) {
1,073,204✔
308
  int32_t* pInt = (int32_t*)pObj;
1,073,204✔
309
  TAOS_CHECK_RETURN(tjsonGetIntValue(pJson, "value", pInt));
1,073,204✔
310
  return TSDB_CODE_SUCCESS;
1,073,204✔
311
}
312

313
/**
 * Element encoder: store the int16_t that pObj points to under the key
 * "value" in the JSON object pJson.
 *
 * @return TSDB_CODE_SUCCESS unless the TAOS_CHECK_RETURN-wrapped call bails out.
 */
static int32_t int16ToJson(const void* pObj, SJson* pJson) {
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(pJson, "value", *(const int16_t*)pObj));
  return TSDB_CODE_SUCCESS;
}
318

319
static int32_t jsonToInt16(const SJson* pJson, void* pObj) {
270,101✔
320
  int16_t* pInt = (int16_t*)pObj;
270,101✔
321
  TAOS_CHECK_RETURN(tjsonGetSmallIntValue(pJson, "value", pInt));
270,101✔
322
  return TSDB_CODE_SUCCESS;
270,101✔
323
}
324

325
static const char* jkSstreamCalcScanVgList        = "vgList";
326
static const char* jkSstreamCalcScanReadFromCache = "readFromCache";
327
static const char* jkSstreamCalcScanScanPlan      = "scanPlan";
328
static int32_t calcScanPlanToJson(const void* pObj, SJson* pJson) {
3,580,186✔
329
  const SStreamCalcScan* pPlan = (const SStreamCalcScan*)pObj;
3,580,186✔
330
  TAOS_CHECK_RETURN(tjsonAddArray(
3,580,186✔
331
    pJson, jkSstreamCalcScanVgList, int32ToJson,
332
    pPlan->vgList ? TARRAY_GET_ELEM(pPlan->vgList, 0) : NULL, sizeof(int32_t),
333
    pPlan->vgList ? pPlan->vgList->size : 0));
334
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
3,580,186✔
335
    pJson, jkSstreamCalcScanReadFromCache, pPlan->readFromCache));
336
  if (NULL != pPlan->scanPlan) {
3,580,186✔
337
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
3,580,186✔
338
      pJson, jkSstreamCalcScanScanPlan, (const char*)pPlan->scanPlan));
339
  }
340
  return TSDB_CODE_SUCCESS;
3,580,186✔
341
}
342

343
static int32_t jsonToCalcScanPlan(const SJson* pJson, void* pObj) {
1,073,204✔
344
  SStreamCalcScan* pPlan = (SStreamCalcScan*)pObj;
1,073,204✔
345
  TAOS_CHECK_RETURN(tjsonToTArray(
1,073,204✔
346
    pJson, jkSstreamCalcScanVgList, jsonToInt32,
347
    &pPlan->vgList, sizeof(int32_t)));
348
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
1,073,204✔
349
    pJson, jkSstreamCalcScanReadFromCache, &pPlan->readFromCache));
350
  TAOS_CHECK_RETURN(tjsonDupStringValue(
1,073,204✔
351
    pJson, jkSstreamCalcScanScanPlan, (char**)&pPlan->scanPlan));
352
  return TSDB_CODE_SUCCESS;
1,073,204✔
353
}
354

355
static const char* jkSDataTypeType      = "type";
356
static const char* jkSDataTypePrecision = "precision";
357
static const char* jkSDataTypeScale     = "scale";
358
static const char* jkSDataTypeBytes     = "bytes";
359
static int32_t sDataTypeToJson(const void* pObj, SJson* pJson) {
116,504✔
360
  const SDataType* pType = (const SDataType*)pObj;
116,504✔
361
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
116,504✔
362
    pJson, jkSDataTypeType, pType->type));
363
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
116,504✔
364
    pJson, jkSDataTypePrecision, pType->precision));
365
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
116,504✔
366
    pJson, jkSDataTypeScale, pType->scale));
367
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
116,504✔
368
    pJson, jkSDataTypeBytes, pType->bytes));
369
  return TSDB_CODE_SUCCESS;
116,504✔
370
}
371

372
static int32_t jsonToSDataType(const SJson* pJson, void* pObj) {
39,943✔
373
  SDataType* pType = (SDataType*)pObj;
39,943✔
374
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
39,943✔
375
    pJson, jkSDataTypeType, &pType->type));
376
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
39,943✔
377
    pJson, jkSDataTypePrecision, &pType->precision));
378
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
39,943✔
379
    pJson, jkSDataTypeScale, &pType->scale));
380
  TAOS_CHECK_RETURN(tjsonGetIntValue(
39,943✔
381
    pJson, jkSDataTypeBytes, &pType->bytes));
382
  return TSDB_CODE_SUCCESS;
39,943✔
383
}
384

385
static const char* jkSStreamOutColExpr = "expr";
386
static const char* jkSStreamOutColType = "type";
387
static int32_t sStreamOutColToJson(const void* pObj, SJson* pJson) {
116,504✔
388
  const SStreamOutCol* pCol = (const SStreamOutCol*)pObj;
116,504✔
389
  if (NULL != pCol->expr) {
116,504✔
390
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
116,504✔
391
      pJson, jkSStreamOutColExpr, (const char*)pCol->expr));
392
  }
393
  TAOS_CHECK_RETURN(tjsonAddObject(
116,504✔
394
    pJson, jkSStreamOutColType, sDataTypeToJson, &pCol->type));
395
  return TSDB_CODE_SUCCESS;
116,504✔
396
}
397

398
static int32_t jsonToSStreamOutCol(const SJson* pJson, void* pObj) {
39,943✔
399
  SStreamOutCol* pCol = (SStreamOutCol*)pObj;
39,943✔
400
  TAOS_CHECK_RETURN(tjsonDupStringValue(
39,943✔
401
    pJson, jkSStreamOutColExpr, (char**)&pCol->expr));
402
  TAOS_CHECK_RETURN(tjsonToObject(
39,943✔
403
    pJson, jkSStreamOutColType, jsonToSDataType, &pCol->type));
404
  return TSDB_CODE_SUCCESS;
39,943✔
405
}
406

407
/**
 * Element encoder: pObj points to a `const char*` slot; the string that slot
 * refers to is stored under the key "value" in the JSON object pJson.
 *
 * @return TSDB_CODE_SUCCESS unless the TAOS_CHECK_RETURN-wrapped call bails out.
 */
static int32_t stringToJson(const void* pObj, SJson* pJson) {
  TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson, "value", *(const char**)pObj));
  return TSDB_CODE_SUCCESS;
}
412

413
static int32_t jsonToString(const SJson* pJson, void* pObj) {
639,016✔
414
  char** pStr = (char**)pObj;
639,016✔
415
  TAOS_CHECK_RETURN(tjsonDupStringValue(pJson, "value", pStr));
639,016✔
416
  return TSDB_CODE_SUCCESS;
639,016✔
417
}
418

419
// JSON object keys for the create-stream request. Every key string is the
// constant's own suffix (the part after "jkCreateStreamReq") with its first
// letter lower-cased.
static const char* jkCreateStreamReqName = "name";
static const char* jkCreateStreamReqStreamId = "streamId";
static const char* jkCreateStreamReqSql = "sql";

static const char* jkCreateStreamReqStreamDB = "streamDB";
static const char* jkCreateStreamReqTriggerDB = "triggerDB";
static const char* jkCreateStreamReqOutDB = "outDB";
static const char* jkCreateStreamReqCalcDB = "calcDB";

static const char* jkCreateStreamReqTriggerTblName = "triggerTblName";
static const char* jkCreateStreamReqOutTblName = "outTblName";

static const char* jkCreateStreamReqIgExists = "igExists";
static const char* jkCreateStreamReqTriggerType = "triggerType";
static const char* jkCreateStreamReqIgDisorder = "igDisorder";
static const char* jkCreateStreamReqDeleteReCalc = "deleteReCalc";
static const char* jkCreateStreamReqDeleteOutTbl = "deleteOutTbl";
static const char* jkCreateStreamReqFillHistory = "fillHistory";
static const char* jkCreateStreamReqFillHistoryFirst = "fillHistoryFirst";
static const char* jkCreateStreamReqCalcNotifyOnly = "calcNotifyOnly";
static const char* jkCreateStreamReqLowLatencyCalc = "lowLatencyCalc";
static const char* jkCreateStreamReqIgNoDataTrigger = "igNoDataTrigger";
static const char* jkCreateStreamReqMultiGroupCalc = "multiGroupCalc";

static const char* jkCreateStreamReqPNotifyAddrUrls = "pNotifyAddrUrls";
static const char* jkCreateStreamReqNotifyEventTypes = "notifyEventTypes";
static const char* jkCreateStreamReqAddOptions = "addOptions";
static const char* jkCreateStreamReqNotifyHistory = "notifyHistory";

static const char* jkCreateStreamReqTriggerFilterCols = "triggerFilterCols";
static const char* jkCreateStreamReqTriggerCols = "triggerCols";
static const char* jkCreateStreamReqPartitionCols = "partitionCols";
static const char* jkCreateStreamReqOutCols = "outCols";
static const char* jkCreateStreamReqOutTags = "outTags";
static const char* jkCreateStreamReqMaxDelay = "maxDelay";
static const char* jkCreateStreamReqFillHistoryStartTime = "fillHistoryStartTime";
static const char* jkCreateStreamReqWatermark = "watermark";
static const char* jkCreateStreamReqExpiredTime = "expiredTime";
static const char* jkCreateStreamReqIdleTimeoutMs = "idleTimeoutMs";
static const char* jkCreateStreamReqTrigger = "trigger";

static const char* jkCreateStreamReqTriggerTblType = "triggerTblType";
static const char* jkCreateStreamReqTriggerTblUid = "triggerTblUid";
static const char* jkCreateStreamReqTriggerTblSuid = "triggerTblSuid";
static const char* jkCreateStreamReqTriggerPrec = "triggerPrec";
static const char* jkCreateStreamReqVtableCalc = "vtableCalc";
static const char* jkCreateStreamReqOutTblType = "outTblType";
static const char* jkCreateStreamReqOutStbExists = "outStbExists";
static const char* jkCreateStreamReqOutStbUid = "outStbUid";
static const char* jkCreateStreamReqOutStbSversion = "outStbSversion";
static const char* jkCreateStreamReqEventTypes = "eventTypes";
static const char* jkCreateStreamReqFlags = "flags";
static const char* jkCreateStreamReqTsmaId = "tsmaId";
static const char* jkCreateStreamReqPlaceHolderBitmap = "placeHolderBitmap";
static const char* jkCreateStreamReqCalcTsSlotId = "calcTsSlotId";
static const char* jkCreateStreamReqTriTsSlotId = "triTsSlotId";
static const char* jkCreateStreamReqCalcPkSlotId = "calcPkSlotId";
static const char* jkCreateStreamReqTriPkSlotId = "triPkSlotId";

static const char* jkCreateStreamReqTriggerTblVgId = "triggerTblVgId";
static const char* jkCreateStreamReqOutTblVgId = "outTblVgId";

static const char* jkCreateStreamReqTriggerScanPlan = "triggerScanPlan";
static const char* jkCreateStreamReqCalcScanPlanList = "calcScanPlanList";

static const char* jkCreateStreamReqTriggerHasPF = "triggerHasPF";
static const char* jkCreateStreamReqTriggerPrevFilter = "triggerPrevFilter";

static const char* jkCreateStreamReqNumOfCalcSubplan = "numOfCalcSubplan";
static const char* jkCreateStreamReqCalcPlan = "calcPlan";
static const char* jkCreateStreamReqSubTblNameExpr = "subTblNameExpr";
static const char* jkCreateStreamReqTagValueExpr = "tagValueExpr";
static const char* jkCreateStreamReqForceOutCols = "forceOutCols";

static const char* jkCreateStreamReqColCids = "colCids";
static const char* jkCreateStreamReqTagCids = "tagCids";
static const char* jkCreateStreamReqNodelayCreateSubtable = "nodelayCreateSubtable";
497

498
static int32_t scmCreateStreamReqToJsonImpl(const void* pObj, void* pJson) {
1,703,674✔
499
  const SCMCreateStreamReq* pReq = (const SCMCreateStreamReq*)pObj;
1,703,674✔
500
  if (NULL != pReq->name) {
1,703,674✔
501
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
1,703,674✔
502
      pJson, jkCreateStreamReqName, pReq->name));
503
  }
504
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
505
    pJson, jkCreateStreamReqStreamId, pReq->streamId));
506
  if (NULL != pReq->sql) {
1,703,674✔
507
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
1,703,674✔
508
      pJson, jkCreateStreamReqSql, pReq->sql));
509
  }
510
  if (NULL != pReq->streamDB) {
1,703,674✔
511
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
1,703,674✔
512
      pJson, jkCreateStreamReqStreamDB, pReq->streamDB));
513
  }
514
  if (NULL != pReq->triggerDB) {
1,703,674✔
515
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
1,677,776✔
516
      pJson, jkCreateStreamReqTriggerDB, pReq->triggerDB));
517
  }
518
  if (NULL != pReq->outDB) {
1,703,674✔
519
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
1,683,160✔
520
      pJson, jkCreateStreamReqOutDB, pReq->outDB));
521
  }
522
  TAOS_CHECK_RETURN(tjsonAddArray(
1,703,674✔
523
    pJson, jkCreateStreamReqCalcDB, stringToJson,
524
    pReq->calcDB ? TARRAY_GET_ELEM(pReq->calcDB, 0) : NULL,
525
    pReq->calcDB ? pReq->calcDB->elemSize : 0,
526
    pReq->calcDB ? pReq->calcDB->size : 0));
527
  if (NULL != pReq->triggerTblName) {
1,703,674✔
528
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
1,677,776✔
529
      pJson, jkCreateStreamReqTriggerTblName, pReq->triggerTblName));
530
  }
531
  if (NULL != pReq->outTblName) {
1,703,674✔
532
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
1,683,160✔
533
      pJson, jkCreateStreamReqOutTblName, pReq->outTblName));
534
  }
535
  // trigger contol part
536
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
537
    pJson, jkCreateStreamReqIgExists, pReq->igExists));
538
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
539
    pJson, jkCreateStreamReqTriggerType, pReq->triggerType));
540
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
541
    pJson, jkCreateStreamReqIgDisorder, pReq->igDisorder));
542
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
543
    pJson, jkCreateStreamReqDeleteReCalc, pReq->deleteReCalc));
544
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
545
    pJson, jkCreateStreamReqDeleteOutTbl, pReq->deleteOutTbl));
546
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
547
    pJson, jkCreateStreamReqFillHistory, pReq->fillHistory));
548
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
549
    pJson, jkCreateStreamReqFillHistoryFirst, pReq->fillHistoryFirst));
550
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
551
    pJson, jkCreateStreamReqCalcNotifyOnly, pReq->calcNotifyOnly));
552
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
553
    pJson, jkCreateStreamReqLowLatencyCalc, pReq->lowLatencyCalc));
554
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
555
    pJson, jkCreateStreamReqIgNoDataTrigger, pReq->igNoDataTrigger));
556
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
557
    pJson, jkCreateStreamReqMultiGroupCalc, pReq->enableMultiGroupCalc));
558

559
  // notify part
560
  TAOS_CHECK_RETURN(tjsonAddArray(
1,703,674✔
561
    pJson, jkCreateStreamReqPNotifyAddrUrls, stringToJson,
562
    pReq->pNotifyAddrUrls ? TARRAY_GET_ELEM(pReq->pNotifyAddrUrls, 0) : NULL,
563
    pReq->pNotifyAddrUrls ? pReq->pNotifyAddrUrls->elemSize : 0,
564
    pReq->pNotifyAddrUrls ? pReq->pNotifyAddrUrls->size : 0));
565
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
566
    pJson, jkCreateStreamReqNotifyEventTypes, pReq->notifyEventTypes));
567
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
568
    pJson, jkCreateStreamReqAddOptions, pReq->addOptions));
569
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
570
    pJson, jkCreateStreamReqNotifyHistory, pReq->notifyHistory));
571

572
  // out table part
573
  // trigger cols and partition cols
574
  if (NULL != pReq->triggerFilterCols) {
1,703,674✔
575
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
166,322✔
576
      pJson, jkCreateStreamReqTriggerFilterCols,
577
      (const char*)pReq->triggerFilterCols));
578
  }
579
  if (NULL != pReq->triggerCols) {
1,703,674✔
580
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
1,619,258✔
581
      pJson, jkCreateStreamReqTriggerCols, (const char*)pReq->triggerCols));
582
  }
583
  if (NULL != pReq->partitionCols) {
1,703,674✔
584
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
638,816✔
585
      pJson, jkCreateStreamReqPartitionCols, (const char*)pReq->partitionCols));
586
  }
587

588
  // out cols
589
  TAOS_CHECK_RETURN(tjsonAddArray(
1,703,674✔
590
    pJson, jkCreateStreamReqOutCols, sfieldWithOptionsToJson,
591
    pReq->outCols ? TARRAY_GET_ELEM(pReq->outCols, 0) : NULL,
592
    pReq->outCols ? pReq->outCols->elemSize : 0,
593
    pReq->outCols ? pReq->outCols->size : 0));
594
  // out tags
595
  TAOS_CHECK_RETURN(tjsonAddArray(
1,703,674✔
596
    pJson, jkCreateStreamReqOutTags, stagFieldWithOptionsToJson,
597
    pReq->outTags ? TARRAY_GET_ELEM(pReq->outTags, 0) : NULL,
598
    pReq->outTags ? pReq->outTags->elemSize : 0,
599
    pReq->outTags ? pReq->outTags->size : 0));
600
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
601
    pJson, jkCreateStreamReqMaxDelay, pReq->maxDelay));
602
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
603
    pJson, jkCreateStreamReqFillHistoryStartTime, pReq->fillHistoryStartTime));
604
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
605
    pJson, jkCreateStreamReqWatermark, pReq->watermark));
606
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
607
    pJson, jkCreateStreamReqExpiredTime, pReq->expiredTime));
608
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
609
    pJson, jkCreateStreamReqIdleTimeoutMs, pReq->idleTimeoutMs));
610
  // trigger
611
  switch (pReq->triggerType) {
1,703,674✔
612
    case WINDOW_TYPE_SESSION:
85,252✔
613
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
85,252✔
614
        sessionTriggerToJson, &pReq->trigger));
615
      break;
85,252✔
616

617
    case WINDOW_TYPE_STATE:
556,866✔
618
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
556,866✔
619
        stateTriggerToJson, &pReq->trigger));
620
      break;
556,866✔
621

622
    case WINDOW_TYPE_INTERVAL:
598,938✔
623
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
598,938✔
624
        slidingTriggerToJson, &pReq->trigger));
625
      break;
598,938✔
626

627
    case WINDOW_TYPE_EVENT:
177,744✔
628
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
177,744✔
629
        eventTriggerToJson, &pReq->trigger));
630
      break;
177,744✔
631

632
    case WINDOW_TYPE_COUNT:
200,458✔
633
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
200,458✔
634
        countTriggerToJson, &pReq->trigger));
635
      break;
200,458✔
636

637
    case WINDOW_TYPE_PERIOD:
84,416✔
638
      TAOS_CHECK_RETURN(tjsonAddObject(pJson, jkCreateStreamReqTrigger,
84,416✔
639
        periodTriggerToJson, &pReq->trigger));
640
      break;
84,416✔
641

UNCOV
642
  default:
×
UNCOV
643
    return TSDB_CODE_STREAM_INVALID_TRIGGER;
×
644
  }
645

646
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
647
    pJson, jkCreateStreamReqTriggerTblType, pReq->triggerTblType));
648
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
649
    pJson, jkCreateStreamReqTriggerTblUid, pReq->triggerTblUid));
650
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
651
    pJson, jkCreateStreamReqTriggerTblSuid, pReq->triggerTblSuid));
652
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
653
    pJson, jkCreateStreamReqTriggerPrec, pReq->triggerPrec));
654
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
655
    pJson, jkCreateStreamReqVtableCalc, pReq->vtableCalc));
656
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
657
    pJson, jkCreateStreamReqOutTblType, pReq->outTblType));
658
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
659
    pJson, jkCreateStreamReqOutStbExists, pReq->outStbExists));
660
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
661
    pJson, jkCreateStreamReqOutStbUid, pReq->outStbUid));
662
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
663
    pJson, jkCreateStreamReqOutStbSversion, pReq->outStbSversion));
664
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
665
    pJson, jkCreateStreamReqEventTypes, pReq->eventTypes));
666
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
667
    pJson, jkCreateStreamReqFlags, pReq->flags));
668
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
669
    pJson, jkCreateStreamReqTsmaId, pReq->tsmaId));
670
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
671
    pJson, jkCreateStreamReqPlaceHolderBitmap, pReq->placeHolderBitmap));
672
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
673
    pJson, jkCreateStreamReqCalcTsSlotId, pReq->calcTsSlotId));
674
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
675
    pJson, jkCreateStreamReqTriTsSlotId, pReq->triTsSlotId));
676
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
677
    pJson, jkCreateStreamReqCalcPkSlotId, pReq->calcPkSlotId));
678
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
679
    pJson, jkCreateStreamReqTriPkSlotId, pReq->triPkSlotId));
680
  
681
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
682
    pJson, jkCreateStreamReqTriggerTblVgId, pReq->triggerTblVgId));
683
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
684
    pJson, jkCreateStreamReqOutTblVgId, pReq->outTblVgId));
685

686
  if (NULL != pReq->triggerScanPlan) {
1,703,674✔
687
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
1,677,776✔
688
      pJson, jkCreateStreamReqTriggerScanPlan, (const char*)pReq->triggerScanPlan));
689
  }
690
  TAOS_CHECK_RETURN(tjsonAddArray(
1,703,674✔
691
    pJson, jkCreateStreamReqCalcScanPlanList, calcScanPlanToJson,
692
    pReq->calcScanPlanList ? TARRAY_GET_ELEM(pReq->calcScanPlanList, 0) : NULL,
693
    pReq->calcScanPlanList ? pReq->calcScanPlanList->elemSize : 0,
694
    pReq->calcScanPlanList ? pReq->calcScanPlanList->size : 0));
695

696
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
697
    pJson, jkCreateStreamReqTriggerHasPF, pReq->triggerHasPF));
698
  if (NULL != pReq->triggerPrevFilter) {
1,703,674✔
699
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
61,878✔
700
      pJson, jkCreateStreamReqTriggerPrevFilter,
701
      (const char*)pReq->triggerPrevFilter));
702
  }
703

704
  TAOS_CHECK_RETURN(tjsonAddIntegerToObject(
1,703,674✔
705
    pJson, jkCreateStreamReqNumOfCalcSubplan, pReq->numOfCalcSubplan));
706
  if (NULL != pReq->calcPlan) {
1,703,674✔
707
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
1,683,160✔
708
      pJson, jkCreateStreamReqCalcPlan, (const char*)pReq->calcPlan));
709
  }
710
  if (NULL != pReq->subTblNameExpr) {
1,703,674✔
711
    TAOS_CHECK_RETURN(tjsonAddStringToObject(pJson,
638,816✔
712
      jkCreateStreamReqSubTblNameExpr, (const char*)pReq->subTblNameExpr));
713
  }
714
  if (NULL != pReq->tagValueExpr) {
1,703,674✔
715
    TAOS_CHECK_RETURN(tjsonAddStringToObject(
638,816✔
716
      pJson, jkCreateStreamReqTagValueExpr, (const char*)pReq->tagValueExpr));
717
  }
718
  TAOS_CHECK_RETURN(tjsonAddArray(
1,703,674✔
719
    pJson, jkCreateStreamReqForceOutCols, sStreamOutColToJson,
720
    pReq->forceOutCols ? TARRAY_GET_ELEM(pReq->forceOutCols, 0) : NULL,
721
    pReq->forceOutCols ? pReq->forceOutCols->elemSize : 0,
722
    pReq->forceOutCols ? pReq->forceOutCols->size : 0));
723
  TAOS_CHECK_RETURN(tjsonAddArray(
1,703,674✔
724
      pJson, jkCreateStreamReqColCids, int16ToJson,
725
      pReq->colCids ? TARRAY_GET_ELEM(pReq->colCids, 0) : NULL,
726
      pReq->colCids ? pReq->colCids->elemSize : 0,
727
      pReq->colCids ? pReq->colCids->size : 0));
728
  TAOS_CHECK_RETURN(tjsonAddArray(
1,703,674✔
729
      pJson, jkCreateStreamReqTagCids, int16ToJson,
730
      pReq->tagCids ? TARRAY_GET_ELEM(pReq->tagCids, 0) : NULL,
731
      pReq->tagCids ? pReq->tagCids->elemSize : 0,
732
      pReq->tagCids ? pReq->tagCids->size : 0));
733
  TAOS_CHECK_RETURN(
1,703,674✔
734
      tjsonAddIntegerToObject(pJson, jkCreateStreamReqNodelayCreateSubtable, pReq->nodelayCreateSubtable));
735

736
  return TSDB_CODE_SUCCESS;
1,703,674✔
737
}
738

739
int32_t scmCreateStreamReqToJson(
1,703,674✔
740
  const SCMCreateStreamReq* pReq, bool format, char** ppStr, int32_t* pStrLen) {
741
  int32_t code = TSDB_CODE_SUCCESS;
1,703,674✔
742
  int32_t lino = 0;
1,703,674✔
743
  int64_t streamId = pReq ? pReq->streamId : -1;
1,703,674✔
744
  TSDB_CHECK_NULL(pReq, code, lino, _end, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
1,703,674✔
745
  TSDB_CHECK_NULL(ppStr, code, lino, _end, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
1,703,674✔
746
  TSDB_CHECK_NULL(
1,703,674✔
747
    pStrLen, code, lino, _end, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
748

749
  SJson* pJson = tjsonCreateObject();
1,703,674✔
750
  TSDB_CHECK_NULL(pJson, code, lino, _end, terrno);
1,703,674✔
751
  TSDB_CHECK_CODE(scmCreateStreamReqToJsonImpl(pReq, pJson), lino, _end);
1,703,674✔
752

753
  if (TSDB_CODE_SUCCESS == code) {
1,703,674✔
754
    *ppStr = format ? tjsonToString(pJson) : tjsonToUnformattedString(pJson);
1,703,674✔
755
    if (*ppStr == NULL) {
1,703,674✔
UNCOV
756
      code = terrno;
×
757
    } else {
758
      *pStrLen = strlen(*ppStr);
1,703,674✔
759
    }
760
  }
761

UNCOV
762
_end:
×
763
  if (TSDB_CODE_SUCCESS != code) {
1,703,674✔
UNCOV
764
    uError(
×
765
      "failed to convert SCMCreateStreamReq to json, lino: %d, since %s",
766
      lino, tstrerror(code));
767
  }
768
  tjsonDelete(pJson);
1,703,674✔
769
  return code;
1,703,674✔
770
}
771

772
int32_t jsonToSCMCreateStreamReq(const void* pJson, void* pObj) {
528,177✔
773
  SCMCreateStreamReq* pReq = (SCMCreateStreamReq*)pObj;
528,177✔
774
  pReq->calcTsSlotId = -1;
528,177✔
775
  pReq->triTsSlotId = -1;
528,177✔
776
  pReq->calcPkSlotId = -1;
528,177✔
777
  pReq->triPkSlotId = -1;
528,177✔
778
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
779
    pJson, jkCreateStreamReqName, (char**)&pReq->name));
780
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
528,177✔
781
    pJson, jkCreateStreamReqStreamId, &pReq->streamId));
782
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
783
    pJson, jkCreateStreamReqSql, (char**)&pReq->sql));
784

785
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
786
    pJson, jkCreateStreamReqStreamDB, (char**)&pReq->streamDB));
787
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
788
    pJson, jkCreateStreamReqTriggerDB, (char**)&pReq->triggerDB));
789
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
790
    pJson, jkCreateStreamReqOutDB, (char**)&pReq->outDB));
791
  TAOS_CHECK_RETURN(tjsonToTArray(
528,177✔
792
    pJson, jkCreateStreamReqCalcDB, jsonToString,
793
    &pReq->calcDB, POINTER_BYTES));
794
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
795
    pJson, jkCreateStreamReqTriggerTblName, (char**)&pReq->triggerTblName));
796
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
797
    pJson, jkCreateStreamReqOutTblName, (char**)&pReq->outTblName));
798

799
  // trigger control part
800
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
801
    pJson, jkCreateStreamReqIgExists, &pReq->igExists));
802
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
803
    pJson, jkCreateStreamReqTriggerType, &pReq->triggerType));
804
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
805
    pJson, jkCreateStreamReqIgDisorder, &pReq->igDisorder));
806
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
807
    pJson, jkCreateStreamReqDeleteReCalc, &pReq->deleteReCalc));
808
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
809
    pJson, jkCreateStreamReqDeleteOutTbl, &pReq->deleteOutTbl));
810
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
811
    pJson, jkCreateStreamReqFillHistory, &pReq->fillHistory));
812
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
813
    pJson, jkCreateStreamReqFillHistoryFirst, &pReq->fillHistoryFirst));
814
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
815
    pJson, jkCreateStreamReqCalcNotifyOnly, &pReq->calcNotifyOnly));
816
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
817
    pJson, jkCreateStreamReqLowLatencyCalc, &pReq->lowLatencyCalc));
818
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
819
    pJson, jkCreateStreamReqIgNoDataTrigger, &pReq->igNoDataTrigger));
820
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
821
    pJson, jkCreateStreamReqMultiGroupCalc, &pReq->enableMultiGroupCalc));
822

823
  // notify part
824
  TAOS_CHECK_RETURN(tjsonToTArray(
528,177✔
825
    pJson, jkCreateStreamReqPNotifyAddrUrls, jsonToString,
826
    &pReq->pNotifyAddrUrls, POINTER_BYTES));
827
  TAOS_CHECK_RETURN(tjsonGetIntValue(
528,177✔
828
    pJson, jkCreateStreamReqNotifyEventTypes, &pReq->notifyEventTypes));
829
  TAOS_CHECK_RETURN(tjsonGetIntValue(
528,177✔
830
    pJson, jkCreateStreamReqAddOptions, &pReq->addOptions));
831
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
832
    pJson, jkCreateStreamReqNotifyHistory, &pReq->notifyHistory));
833

834
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
835
    pJson, jkCreateStreamReqTriggerFilterCols,
836
    (char**)&pReq->triggerFilterCols));
837
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
838
    pJson, jkCreateStreamReqTriggerCols, (char**)&pReq->triggerCols));
839
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
840
    pJson, jkCreateStreamReqPartitionCols, (char**)&pReq->partitionCols));
841
  // out cols
842
  TAOS_CHECK_RETURN(tjsonToTArray(
528,177✔
843
    pJson, jkCreateStreamReqOutCols, jsonToSFieldWithOptions,
844
    &pReq->outCols, sizeof(SFieldWithOptions)));
845
  // out tags
846
  TAOS_CHECK_RETURN(tjsonToTArray(
528,177✔
847
    pJson, jkCreateStreamReqOutTags, jsonToSTagFieldWithOptions,
848
    &pReq->outTags, sizeof(SFieldWithOptions)));
849
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
528,177✔
850
    pJson, jkCreateStreamReqMaxDelay, &pReq->maxDelay));
851
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
528,177✔
852
    pJson, jkCreateStreamReqFillHistoryStartTime, &pReq->fillHistoryStartTime));
853
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
528,177✔
854
    pJson, jkCreateStreamReqWatermark, &pReq->watermark));
855
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
528,177✔
856
    pJson, jkCreateStreamReqExpiredTime, &pReq->expiredTime));
857
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
528,177✔
858
    pJson, jkCreateStreamReqIdleTimeoutMs, &pReq->idleTimeoutMs));
859
  // trigger
860
  switch (pReq->triggerType) {
528,177✔
861
    case WINDOW_TYPE_SESSION:
19,045✔
862
      TAOS_CHECK_RETURN(tjsonToObject(
19,045✔
863
        pJson, jkCreateStreamReqTrigger, jsonToSessionTrigger, &pReq->trigger));
864
      break;
19,045✔
865
    
866
    case WINDOW_TYPE_STATE:
189,008✔
867
      TAOS_CHECK_RETURN(tjsonToObject(
189,008✔
868
        pJson, jkCreateStreamReqTrigger, jsonToStateTrigger, &pReq->trigger));
869
      break;
189,008✔
870

871
    case WINDOW_TYPE_INTERVAL:
191,884✔
872
      TAOS_CHECK_RETURN(tjsonToObject(
191,884✔
873
        pJson, jkCreateStreamReqTrigger, jsonToSlidingTrigger, &pReq->trigger));
874
      break;
191,884✔
875
    
876
    case WINDOW_TYPE_EVENT:
49,704✔
877
      TAOS_CHECK_RETURN(tjsonToObject(
49,704✔
878
        pJson, jkCreateStreamReqTrigger, jsonToEventTrigger, &pReq->trigger));
879
      break;
49,704✔
880
    
881
    case WINDOW_TYPE_COUNT:
54,260✔
882
      TAOS_CHECK_RETURN(tjsonToObject(
54,260✔
883
        pJson, jkCreateStreamReqTrigger, jsonToCountTrigger, &pReq->trigger));
884
      break;
54,260✔
885
    
886
    case WINDOW_TYPE_PERIOD:
24,276✔
887
      TAOS_CHECK_RETURN(tjsonToObject(
24,276✔
888
        pJson, jkCreateStreamReqTrigger, jsonToPeriodTrigger, &pReq->trigger));
889
      break;
24,276✔
890
    
UNCOV
891
    default:
×
UNCOV
892
      return TSDB_CODE_STREAM_INVALID_TRIGGER;
×
893
  }
894

895
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
896
    pJson, jkCreateStreamReqTriggerTblType, &pReq->triggerTblType));
897
  TAOS_CHECK_RETURN(tjsonGetUBigIntValue(
528,177✔
898
    pJson, jkCreateStreamReqTriggerTblUid, &pReq->triggerTblUid));
899
  TAOS_CHECK_RETURN(tjsonGetUBigIntValue(
528,177✔
900
    pJson, jkCreateStreamReqTriggerTblSuid, &pReq->triggerTblSuid));
901
  TAOS_CHECK_RETURN(tjsonGetUTinyIntValue(
528,177✔
902
    pJson, jkCreateStreamReqTriggerPrec, &pReq->triggerPrec));
903
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
904
    pJson, jkCreateStreamReqVtableCalc, &pReq->vtableCalc));
905
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
906
    pJson, jkCreateStreamReqOutTblType, &pReq->outTblType));
907
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
908
    pJson, jkCreateStreamReqOutStbExists, &pReq->outStbExists));
909
  TAOS_CHECK_RETURN(tjsonGetUBigIntValue(
528,177✔
910
    pJson, jkCreateStreamReqOutStbUid, &pReq->outStbUid));
911
  TAOS_CHECK_RETURN(tjsonGetIntValue(
528,177✔
912
    pJson, jkCreateStreamReqOutStbSversion, &pReq->outStbSversion));
913
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
528,177✔
914
    pJson, jkCreateStreamReqEventTypes, &pReq->eventTypes));
915
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
528,177✔
916
    pJson, jkCreateStreamReqFlags, &pReq->flags));
917
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
528,177✔
918
    pJson, jkCreateStreamReqTsmaId, &pReq->tsmaId));
919
  TAOS_CHECK_RETURN(tjsonGetBigIntValue(
528,177✔
920
    pJson, jkCreateStreamReqPlaceHolderBitmap, &pReq->placeHolderBitmap));
921
  TAOS_CHECK_RETURN(tjsonGetSmallIntValue(
528,177✔
922
    pJson, jkCreateStreamReqCalcTsSlotId, &pReq->calcTsSlotId));
923
  TAOS_CHECK_RETURN(tjsonGetSmallIntValue(
528,177✔
924
    pJson, jkCreateStreamReqTriTsSlotId, &pReq->triTsSlotId));
925
  TAOS_CHECK_RETURN(tjsonGetSmallIntValue(
528,177✔
926
    pJson, jkCreateStreamReqCalcPkSlotId, &pReq->calcPkSlotId));
927
  TAOS_CHECK_RETURN(tjsonGetSmallIntValue(
528,177✔
928
    pJson, jkCreateStreamReqTriPkSlotId, &pReq->triPkSlotId));
929

930
  TAOS_CHECK_RETURN(tjsonGetIntValue(
528,177✔
931
    pJson, jkCreateStreamReqTriggerTblVgId, &pReq->triggerTblVgId));
932
  TAOS_CHECK_RETURN(tjsonGetIntValue(
528,177✔
933
    pJson, jkCreateStreamReqOutTblVgId, &pReq->outTblVgId));
934

935
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
936
    pJson, jkCreateStreamReqTriggerScanPlan, (char**)&pReq->triggerScanPlan));
937
  TAOS_CHECK_RETURN(tjsonToTArray(
528,177✔
938
    pJson, jkCreateStreamReqCalcScanPlanList, jsonToCalcScanPlan,
939
    &pReq->calcScanPlanList, sizeof(SStreamCalcScan)));
940

941
  TAOS_CHECK_RETURN(tjsonGetTinyIntValue(
528,177✔
942
    pJson, jkCreateStreamReqTriggerHasPF, &pReq->triggerHasPF));
943
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
944
    pJson, jkCreateStreamReqTriggerPrevFilter,
945
    (char**)&pReq->triggerPrevFilter));
946
  TAOS_CHECK_RETURN(tjsonGetIntValue(
528,177✔
947
    pJson, jkCreateStreamReqNumOfCalcSubplan, &pReq->numOfCalcSubplan));
948
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
949
    pJson, jkCreateStreamReqCalcPlan, (char**)&pReq->calcPlan));
950
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
951
    pJson, jkCreateStreamReqSubTblNameExpr, (char**)&pReq->subTblNameExpr));
952
  TAOS_CHECK_RETURN(tjsonDupStringValue(
528,177✔
953
    pJson, jkCreateStreamReqTagValueExpr, (char**)&pReq->tagValueExpr));
954
  TAOS_CHECK_RETURN(tjsonToTArray(
528,177✔
955
    pJson, jkCreateStreamReqForceOutCols,
956
    jsonToSStreamOutCol, &pReq->forceOutCols, sizeof(SStreamOutCol)));
957
  TAOS_CHECK_RETURN(tjsonToTArray(pJson, jkCreateStreamReqColCids, jsonToInt16, &pReq->colCids, sizeof(int16_t)));
528,177✔
958
  TAOS_CHECK_RETURN(tjsonToTArray(pJson, jkCreateStreamReqTagCids, jsonToInt16, &pReq->tagCids, sizeof(int16_t)));
528,177✔
959
  (void)tjsonGetTinyIntValue(pJson, jkCreateStreamReqNodelayCreateSubtable, &pReq->nodelayCreateSubtable);
528,177✔
960

961
  return TSDB_CODE_SUCCESS;
528,177✔
962
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc