• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4685

21 Aug 2025 12:03PM UTC coverage: 60.092% (+0.06%) from 60.031%
#4685

push

travis-ci

web-flow
jdbc release 3.7.3 (#32682)

* chore(jdbc): update jdbc version

* docs(jdbc): release 3.7.3

138041 of 292193 branches covered (47.24%)

Branch coverage included in aggregate %.

208485 of 284466 relevant lines covered (73.29%)

24747801.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.18
/source/libs/executor/src/forecastoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "tanalytics.h"
22
#include "taoserror.h"
23
#include "tcommon.h"
24
#include "tdatablock.h"
25
#include "tmsg.h"
26

27
#ifdef USE_ANALYTICS
28

29
#define ALGO_OPT_RETCONF_NAME      "return_conf"
30
#define ALGO_OPT_FORECASTROWS_NAME "rows"
31
#define ALGO_OPT_CONF_NAME         "conf"
32
#define ALGO_OPT_START_NAME        "start"
33
#define ALGO_OPT_EVERY_NAME        "every"
34

35
typedef struct {
36
  char*           pName;
37
  SColumnInfoData data;
38
  SColumn         col;
39
  int32_t         numOfRows;
40
} SColFutureData;
41

42
typedef struct {
43
  char         algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];
44
  char         algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];
45
  char*        pOptions;
46
  int64_t      maxTs;
47
  int64_t      minTs;
48
  int64_t      numOfRows;
49
  uint64_t     groupId;
50
  int64_t      forecastRows;
51
  int64_t      cachedRows;
52
  int32_t      numOfBlocks;
53
  int64_t      timeout;
54
  int16_t      resTsSlot;
55
  int16_t      resValSlot;
56
  int16_t      resLowSlot;
57
  int16_t      resHighSlot;
58
  int16_t      inputTsSlot;
59
  int16_t      targetValSlot;
60
  int8_t       targetValType;
61
  int8_t       inputPrecision;
62
  int8_t       wncheck;
63
  double       conf;
64
  int64_t      startTs;
65
  int64_t      every;
66
  int8_t       setStart;
67
  int8_t       setEvery;
68
  SArray*      pCovariateSlotList;   // covariate slot list
69
  SArray*      pDynamicRealList;     // dynamic real data list
70
  int32_t      numOfInputCols;
71
  SAnalyticBuf analyBuf;
72
} SForecastSupp;
73

74
typedef struct SForecastOperatorInfo {
75
  SSDataBlock*  pRes;
76
  SExprSupp     scalarSup;  // scalar calculation
77
  SForecastSupp forecastSupp;
78
} SForecastOperatorInfo;
79

80
static void destroyForecastInfo(void* param);
81
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id);
82

83
static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int32_t newRowsNum) {
84
  if (pBlock->info.rows < pBlock->info.capacity) {
27!
85
    return TSDB_CODE_SUCCESS;
27✔
86
  }
87

88
  int32_t code = blockDataEnsureCapacity(pBlock, newRowsNum);
×
89
  if (code != TSDB_CODE_SUCCESS) {
×
90
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
91
    return code;
×
92
  }
93

94
  return TSDB_CODE_SUCCESS;
×
95
}
96

97
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) {
27✔
98
  int32_t       code = TSDB_CODE_SUCCESS;
27✔
99
  int32_t       lino = 0;
27✔
100
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
27✔
101

102
  if (pSupp->cachedRows > ANALY_FORECAST_MAX_ROWS) {
27!
103
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
×
104
    qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows,
×
105
           tstrerror(code), ANALY_FORECAST_MAX_ROWS);
106
    return code;
×
107
  }
108

109
  pSupp->numOfBlocks++;
27✔
110
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->numOfBlocks, pBlock, pBlock->info.rows);
27!
111

112
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
446✔
113
    SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->targetValSlot);
419✔
114
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputTsSlot);
419✔
115
    if (pTsCol == NULL || pValCol == NULL) break;
419!
116

117
    int32_t index = 0;
419✔
118
    int64_t ts = ((TSKEY*)pTsCol->pData)[j];
419✔
119
    char*   val = colDataGetData(pValCol, j);
419!
120
    int16_t valType = pValCol->info.type;
419✔
121

122
    pSupp->minTs = MIN(pSupp->minTs, ts);
419✔
123
    pSupp->maxTs = MAX(pSupp->maxTs, ts);
419✔
124
    pSupp->numOfRows++;
419✔
125

126
    // write the primary time stamp column data
127
    code = taosAnalyBufWriteColData(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, &ts);
419✔
128
    if (TSDB_CODE_SUCCESS != code) {
419!
129
      qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
×
130
      return code;
×
131
    }
132

133
    // write the main column for forecasting
134
    code = taosAnalyBufWriteColData(pBuf, index++, valType, val);
419✔
135
    if (TSDB_CODE_SUCCESS != code) {
419!
136
      qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
×
137
      return code;
×
138
    }
139

140
    // let's handle the dynamic_real_columns
141
    for (int32_t i = 0; i < taosArrayGetSize(pSupp->pDynamicRealList); ++i) {
443✔
142
      SColFutureData*  pCol = taosArrayGet(pSupp->pDynamicRealList, i);
24✔
143
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->col.slotId);
24✔
144

145
      char* pVal = colDataGetData(pColData, j);
24!
146
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->col.type, pVal);
24✔
147
      if (TSDB_CODE_SUCCESS != code) {
24!
148
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
×
149
        return code;
×
150
      }
151
    }
152

153
    // now handle the past_dynamic_real columns and
154
    for (int32_t i = 0; i < taosArrayGetSize(pSupp->pCovariateSlotList); ++i) {
479✔
155
      SColumn*         pCol = taosArrayGet(pSupp->pCovariateSlotList, i);
60✔
156
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
60✔
157

158
      char* pVal = colDataGetData(pColData, j);
60!
159
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->type, pVal);
60✔
160
      if (TSDB_CODE_SUCCESS != code) {
60!
161
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
×
162
        return code;
×
163
      }
164
    }
165
  }
166

167
  return 0;
27✔
168
}
169

170
static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
27✔
171
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
27✔
172
  int32_t       code = 0;
27✔
173

174
  // add the future dynamic real column data
175

176
  for (int32_t i = 0; i < pSupp->numOfInputCols; ++i) {
88✔
177
    // the primary timestamp and the forecast column
178
    // add the future dynamic real column data
179
    if ((i >= 2) && ((i - 2) < taosArrayGetSize(pSupp->pDynamicRealList))) {
61✔
180
      SColFutureData* pCol = taosArrayGet(pSupp->pDynamicRealList, i - 2);
2✔
181
      int16_t         valType = pCol->col.type;
2✔
182
      for (int32_t j = 0; j < pCol->numOfRows; ++j) {
105✔
183
        char* pVal = colDataGetData(&pCol->data, j);
103!
184
        code = taosAnalyBufWriteColData(pBuf, i, valType, pVal);
103✔
185
        if (code != 0) {
103!
186
          return code;
×
187
        }
188
      }
189
    }
190

191
    code = taosAnalyBufWriteColEnd(pBuf, i);
61✔
192
    if (code != 0) return code;
61!
193
  }
194

195
  code = taosAnalyBufWriteDataEnd(pBuf);
27✔
196
  if (code != 0) return code;
27!
197

198
  code = taosAnalyBufWriteOptStr(pBuf, "option", pSupp->pOptions);
27✔
199
  if (code != 0) return code;
27!
200

201
  code = taosAnalyBufWriteOptStr(pBuf, "algo", pSupp->algoName);
27✔
202
  if (code != 0) return code;
27!
203

204
  const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
27✔
205
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
27!
206
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
27!
207
  code = taosAnalyBufWriteOptStr(pBuf, "prec", prec);
27✔
208
  if (code != 0) return code;
27!
209

210
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_WNCHECK_NAME, pSupp->wncheck);
27✔
211
  if (code != 0) return code;
27!
212

213
  bool noConf = (pSupp->resHighSlot == -1 && pSupp->resLowSlot == -1);
27!
214
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_RETCONF_NAME, !noConf);
27✔
215
  if (code != 0) return code;
27!
216

217
  if (pSupp->cachedRows < ANALY_FORECAST_MIN_ROWS) {
27!
218
    qError("%s history rows for forecasting not enough, min required:%d, current:%" PRId64, id, ANALY_FORECAST_MIN_ROWS,
×
219
           pSupp->forecastRows);
220
    return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
×
221
  }
222

223
  code = taosAnalyBufWriteOptInt(pBuf, "forecast_rows", pSupp->forecastRows);
27✔
224
  if (code != 0) return code;
27!
225

226
  code = taosAnalyBufWriteOptFloat(pBuf, "conf", pSupp->conf);
27✔
227
  if (code != 0) return code;
27!
228

229
  int32_t len = strlen(pSupp->pOptions);
27✔
230
  int64_t every = (pSupp->setEvery != 0) ? pSupp->every : ((pSupp->maxTs - pSupp->minTs) / (pSupp->numOfRows - 1));
27✔
231
  code = taosAnalyBufWriteOptInt(pBuf, "every", every);
27✔
232
  if (code != 0) return code;
27!
233

234
  int64_t start = (pSupp->setStart != 0) ? pSupp->startTs : pSupp->maxTs + every;
27✔
235
  code = taosAnalyBufWriteOptInt(pBuf, "start", start);
27✔
236
  if (code != 0) return code;
27!
237

238
  if (taosArrayGetSize(pSupp->pCovariateSlotList) + taosArrayGetSize(pSupp->pDynamicRealList) > 0) {
27✔
239
    code = taosAnalyBufWriteOptStr(pBuf, "type", "covariate");
4✔
240
    if (code != 0) return code;
4!
241
  }
242

243
  code = taosAnalyBufClose(pBuf);
27✔
244
  return code;
27✔
245
}
246

247
static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
27✔
248
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
27✔
249
  int32_t       resCurRow = pBlock->info.rows;
27✔
250
  int8_t        tmpI8 = 0;
27✔
251
  int16_t       tmpI16 = 0;
27✔
252
  int32_t       tmpI32 = 0;
27✔
253
  int64_t       tmpI64 = 0;
27✔
254
  double        tmpDouble = 0;
27✔
255
  int32_t       code = 0;
27✔
256

257
  SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot);
27✔
258
  if (NULL == pResValCol) {
27!
259
    return terrno;
×
260
  }
261

262
  SColumnInfoData* pResTsCol = ((pSupp->resTsSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
27✔
263
  SColumnInfoData* pResLowCol =
27✔
264
      ((pSupp->resLowSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
27✔
265
  SColumnInfoData* pResHighCol =
27✔
266
      (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL);
27✔
267

268
  SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout);
27✔
269
  if (pJson == NULL) {
27!
270
    return terrno;
×
271
  }
272

273
  int32_t rows = 0;
27✔
274
  tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
27✔
275
  if (rows < 0 && code == 0) {
27!
276
    code = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
277

278
    char    pMsg[1024] = {0};
×
279
    int32_t ret = tjsonGetStringValue(pJson, "msg", pMsg);
×
280
    
281
    if (ret == 0) {
×
282
      qError("%s failed to exec forecast, msg:%s", pId, pMsg);
×
283
      void* p = strstr(pMsg, "white noise");
×
284
      if (p != NULL) {
×
285
        code = TSDB_CODE_ANA_WN_DATA;
×
286
      } else {
287
        p = strstr(pMsg, "[Errno 111] Connection refused");
×
288
        if (p != NULL) {  // the specified forecast model not loaded yet
×
289
          code = TSDB_CODE_ANA_ALGO_NOT_LOAD;
×
290
        }
291
      }
292
    } else {
293
      qError("%s failed to extract msg from server, unknown error", pId);
×
294
    }
295

296
    tjsonDelete(pJson);
×
297
    return code;
×
298
  }
299

300
  // invalid json format
301
  if (code != 0) {
27!
302
    goto _OVER;
×
303
  }
304

305
  SJson* res = tjsonGetObjectItem(pJson, "res");
27✔
306
  if (res == NULL) goto _OVER;
27!
307
  int32_t ressize = tjsonGetArraySize(res);
27✔
308
  bool    returnConf = (pSupp->resHighSlot != -1 || pSupp->resLowSlot != -1);
27!
309

310
  if ((returnConf && (ressize != 4)) || ((!returnConf) && (ressize != 2))) {
27!
311
    goto _OVER;
×
312
  }
313

314
  if (pResTsCol != NULL) {
27✔
315
    resCurRow = pBlock->info.rows;
9✔
316
    SJson* tsJsonArray = tjsonGetArrayItem(res, 0);
9✔
317
    if (tsJsonArray == NULL) goto _OVER;
9!
318
    int32_t tsSize = tjsonGetArraySize(tsJsonArray);
9✔
319
    if (tsSize != rows) goto _OVER;
9!
320
    for (int32_t i = 0; i < tsSize; ++i) {
94✔
321
      SJson* tsJson = tjsonGetArrayItem(tsJsonArray, i);
85✔
322
      tjsonGetObjectValueBigInt(tsJson, &tmpI64);
85✔
323
      colDataSetInt64(pResTsCol, resCurRow, &tmpI64);
85✔
324
      resCurRow++;
85✔
325
    }
326
  }
327

328
  if (pResLowCol != NULL) {
27✔
329
    resCurRow = pBlock->info.rows;
9✔
330
    SJson* lowJsonArray = tjsonGetArrayItem(res, 2);
9✔
331
    if (lowJsonArray == NULL) goto _OVER;
9!
332
    int32_t lowSize = tjsonGetArraySize(lowJsonArray);
9✔
333
    if (lowSize != rows) goto _OVER;
9!
334
    for (int32_t i = 0; i < lowSize; ++i) {
94✔
335
      SJson* lowJson = tjsonGetArrayItem(lowJsonArray, i);
85✔
336
      tjsonGetObjectValueDouble(lowJson, &tmpDouble);
85✔
337
      colDataSetDouble(pResLowCol, resCurRow, &tmpDouble);
85✔
338
      resCurRow++;
85✔
339
    }
340
  }
341

342
  if (pResHighCol != NULL) {
27✔
343
    resCurRow = pBlock->info.rows;
9✔
344
    SJson* highJsonArray = tjsonGetArrayItem(res, 3);
9✔
345
    if (highJsonArray == NULL) goto _OVER;
9!
346
    int32_t highSize = tjsonGetArraySize(highJsonArray);
9✔
347
    if (highSize != rows) goto _OVER;
9!
348
    for (int32_t i = 0; i < highSize; ++i) {
94✔
349
      SJson* highJson = tjsonGetArrayItem(highJsonArray, i);
85✔
350
      tjsonGetObjectValueDouble(highJson, &tmpDouble);
85✔
351
      colDataSetDouble(pResHighCol, resCurRow, &tmpDouble);
85✔
352
      resCurRow++;
85✔
353
    }
354
  }
355

356
  resCurRow = pBlock->info.rows;
27✔
357
  SJson* valJsonArray = tjsonGetArrayItem(res, 1);
27✔
358
  if (valJsonArray == NULL) goto _OVER;
27!
359
  int32_t valSize = tjsonGetArraySize(valJsonArray);
27✔
360
  if (valSize != rows) goto _OVER;
27!
361
  for (int32_t i = 0; i < valSize; ++i) {
321✔
362
    SJson* valJson = tjsonGetArrayItem(valJsonArray, i);
294✔
363
    tjsonGetObjectValueDouble(valJson, &tmpDouble);
294✔
364

365
    colDataSetDouble(pResValCol, resCurRow, &tmpDouble);
294✔
366
    resCurRow++;
294✔
367
  }
368

369
  pBlock->info.rows += rows;
27✔
370

371
  if (pJson != NULL) tjsonDelete(pJson);
27!
372
  return 0;
27✔
373

374
_OVER:
×
375
  tjsonDelete(pJson);
×
376
  if (code == 0) {
×
377
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
378
  }
379

380
  qError("%s failed to perform forecast finalize since %s", pId, tstrerror(code));
×
381
  return code;
×
382
}
383

384
static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) {
27✔
385
  int32_t       code = TSDB_CODE_SUCCESS;
27✔
386
  int32_t       lino = 0;
27✔
387
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
27✔
388

389
  code = forecastCloseBuf(pSupp, pId);
27✔
390
  QUERY_CHECK_CODE(code, lino, _end);
27!
391

392
  code = forecastEnsureBlockCapacity(pResBlock, 1);
27✔
393
  QUERY_CHECK_CODE(code, lino, _end);
27!
394

395
  code = forecastAnalysis(pSupp, pResBlock, pId);
27✔
396
  QUERY_CHECK_CODE(code, lino, _end);
27!
397

398
  uInfo("%s block:%d, forecast finalize", pId, pSupp->numOfBlocks);
27!
399

400
_end:
×
401
  pSupp->numOfBlocks = 0;
27✔
402
  taosAnalyBufDestroy(&pSupp->analyBuf);
27✔
403
  return code;
27✔
404
}
405

406
static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
54✔
407
  int32_t                code = TSDB_CODE_SUCCESS;
54✔
408
  int32_t                lino = 0;
54✔
409
  SExecTaskInfo*         pTaskInfo = pOperator->pTaskInfo;
54✔
410
  SForecastOperatorInfo* pInfo = pOperator->info;
54✔
411
  SSDataBlock*           pResBlock = pInfo->pRes;
54✔
412
  SForecastSupp*         pSupp = &pInfo->forecastSupp;
54✔
413
  SExprSupp*             pScalarSupp = &pInfo->scalarSup;
54✔
414
  SAnalyticBuf*          pBuf = &pSupp->analyBuf;
54✔
415
  int64_t                st = taosGetTimestampUs();
54✔
416
  int32_t                numOfBlocks = pSupp->numOfBlocks;
54✔
417
  const char*            pId = GET_TASKID(pOperator->pTaskInfo);
54✔
418

419
  blockDataCleanup(pResBlock);
54✔
420

421
  while (1) {
27✔
422
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
81✔
423
    if (pBlock == NULL) {
81✔
424
      break;
54✔
425
    }
426

427
    if (pScalarSupp->pExprInfo != NULL) {
27!
428
      code = projectApplyFunctions(pScalarSupp->pExprInfo, pBlock, pBlock, pScalarSupp->pCtx, pScalarSupp->numOfExprs,
×
429
                                   NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
×
430
      if (code != TSDB_CODE_SUCCESS) {
×
431
        T_LONG_JMP(pTaskInfo->env, code);
×
432
      }
433
    }
434

435
    if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) {
27!
436
      pSupp->groupId = pBlock->info.id.groupId;
27✔
437
      numOfBlocks++;
27✔
438
      pSupp->cachedRows += pBlock->info.rows;
27✔
439
      qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, numOfBlocks,
27!
440
             pBlock->info.rows, pSupp->cachedRows);
441
      code = forecastCacheBlock(pSupp, pBlock, pId);
27✔
442
      QUERY_CHECK_CODE(code, lino, _end);
27!
443
    } else {
444
      qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, numOfBlocks);
×
445
      code = forecastAggregateBlocks(pSupp, pResBlock, pId);
×
446
      QUERY_CHECK_CODE(code, lino, _end);
×
447
      pSupp->groupId = pBlock->info.id.groupId;
×
448
      numOfBlocks = 1;
×
449
      pSupp->cachedRows = pBlock->info.rows;
×
450
      qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId,
×
451
             pBlock->info.rows, pSupp->cachedRows);
452
      code = forecastCacheBlock(pSupp, pBlock, pId);
×
453
      QUERY_CHECK_CODE(code, lino, _end);
×
454
    }
455

456
    if (pResBlock->info.rows > 0) {
27!
457
      (*ppRes) = pResBlock;
×
458
      qDebug("%s group:%" PRId64 ", return to upstream, blocks:%d", pId, pResBlock->info.id.groupId, numOfBlocks);
×
459
      return code;
×
460
    }
461
  }
462

463
  if (numOfBlocks > 0) {
54✔
464
    qDebug("%s group:%" PRId64 ", read finish, blocks:%d", pId, pSupp->groupId, numOfBlocks);
27!
465
    code = forecastAggregateBlocks(pSupp, pResBlock, pId);
27✔
466
    QUERY_CHECK_CODE(code, lino, _end);
27!
467
  }
468

469
  int64_t cost = taosGetTimestampUs() - st;
54✔
470
  qDebug("%s all groups finished, cost:%" PRId64 "us", pId, cost);
54!
471

472
_end:
×
473
  if (code != TSDB_CODE_SUCCESS) {
54!
474
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
×
475
    pTaskInfo->code = code;
×
476
    T_LONG_JMP(pTaskInfo->env, code);
×
477
  }
478

479
  (*ppRes) = (pResBlock->info.rows == 0) ? NULL : pResBlock;
54✔
480
  return code;
54✔
481
}
482

483
static int32_t forecastParseOutput(SForecastSupp* pSupp, SExprSupp* pExprSup) {
41✔
484
  pSupp->resLowSlot = -1;
41✔
485
  pSupp->resHighSlot = -1;
41✔
486
  pSupp->resTsSlot = -1;
41✔
487
  pSupp->resValSlot = -1;
41✔
488

489
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
109✔
490
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
68✔
491
    int32_t    dstSlot = pExprInfo->base.resSchema.slotId;
68✔
492
    if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST) {
68✔
493
      pSupp->resValSlot = dstSlot;
41✔
494
    } else if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST_ROWTS) {
27✔
495
      pSupp->resTsSlot = dstSlot;
9✔
496
    } else if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST_LOW) {
18✔
497
      pSupp->resLowSlot = dstSlot;
9✔
498
    } else if (pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_FORECAST_HIGH) {
9!
499
      pSupp->resHighSlot = dstSlot;
9✔
500
    } else {
501
    }
502
  }
503

504
  return 0;
41✔
505
}
506

507
static int32_t validInputParams(SFunctionNode* pFunc, const char* id) {
41✔
508
  int32_t code = 0;
41✔
509
  int32_t num = LIST_LENGTH(pFunc->pParameterList);
41!
510

511
  if (num <= 1) {
41!
512
    qError("%s invalid number of parameters:%d", id, num);
×
513
    code = TSDB_CODE_PLAN_INTERNAL_ERROR;
×
514
    goto _end;
×
515
  }
516

517
  for (int32_t i = 0; i < num; ++i) {
214✔
518
    SNode* p = nodesListGetNode(pFunc->pParameterList, i);
173✔
519
    if (p == NULL) {
173!
520
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
×
521
      qError("%s %d-th parameter in forecast function is NULL, code:%s", id, i, tstrerror(code));
×
522
      goto _end;
×
523
    }
524
  }
525

526
  if (num == 2) {  // column_name, timestamp_column_name
41!
527
    SNode* p1 = nodesListGetNode(pFunc->pParameterList, 0);
×
528
    SNode* p2 = nodesListGetNode(pFunc->pParameterList, 1);
×
529

530
    if (nodeType(p1) != QUERY_NODE_COLUMN || nodeType(p2) != QUERY_NODE_COLUMN) {
×
531
      qError("%s invalid column type, column 1:%d, column 2:%d", id, nodeType(p1), nodeType(p2));
×
532
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
×
533
      goto _end;
×
534
    }
535
  } else if (num >= 3) {
41!
536
    // column_name_#1, column_name_#2...., analytics_options, timestamp_column_name, primary_key_column if exists
537
    // column_name_#1, timestamp_column_name, primary_key_column if exists
538
    // column_name_#1, analytics_options, timestamp_column_name
539
    SNode* pTarget = nodesListGetNode(pFunc->pParameterList, 0);
41✔
540
    if (nodeType(pTarget) != QUERY_NODE_COLUMN) {
41!
541
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
×
542
      qError("%s first parameter is not valid column in forecast function", id);
×
543
      goto _end;
×
544
    }
545

546
    SNode* pNode = nodesListGetNode(pFunc->pParameterList, num - 1);
41✔
547
    if (nodeType(pNode) != QUERY_NODE_COLUMN) {
41!
548
      qError("%s last parameter is not valid column, actual:%d", id, nodeType(pNode));
×
549
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
×
550
    }
551
  }
552

553
_end:
41✔
554
  if (code) {
41!
555
    qError("%s valid the parameters failed, code:%s", id, tstrerror(code));
×
556
  }
557
  return code;
41✔
558
}
559

560
static bool existInList(SForecastSupp* pSupp, int32_t slotId) {
26✔
561
  for (int32_t j = 0; j < taosArrayGetSize(pSupp->pCovariateSlotList); ++j) {
38✔
562
    SColumn* pCol = taosArrayGet(pSupp->pCovariateSlotList, j);
12✔
563

564
    if (pCol->slotId == slotId) {
12!
565
      return true;
×
566
    }
567
  }
568

569
  return false;
26✔
570
}
571

572
static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs, const char* id) {
41✔
573
  int32_t code = 0;
41✔
574
  SNode*  pNode = NULL;
41✔
575
  
576
  pSupp->inputTsSlot = -1;
41✔
577
  pSupp->targetValSlot = -1;
41✔
578
  pSupp->targetValType = -1;
41✔
579
  pSupp->inputPrecision = -1;
41✔
580

581
  FOREACH(pNode, pFuncs) {
109!
582
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
68!
583
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
68✔
584
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);
68✔
585

586
      if (pFunc->funcType == FUNCTION_TYPE_FORECAST) {
68✔
587
        code = validInputParams(pFunc, id);
41✔
588
        if (code) {
41!
589
          return code;
×
590
        }
591

592
        pSupp->numOfInputCols = 2;
41✔
593

594
        if (numOfParam == 2) {
41!
595
          // column, ts
596
          SColumnNode* pTarget = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
×
597
          SColumnNode* pTsNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);
×
598

599
          pSupp->inputTsSlot = pTsNode->slotId;
×
600
          pSupp->inputPrecision = pTsNode->node.resType.precision;
×
601
          pSupp->targetValSlot = pTarget->slotId;
×
602
          pSupp->targetValType = pTarget->node.resType.type;
×
603

604
          // let's add the holtwinters as the default forecast algorithm
605
          pSupp->pOptions = taosStrdup("algo=holtwinters");
×
606
          if (pSupp->pOptions == NULL) {
×
607
            qError("%s failed to dup forecast option, code:%s", id, tstrerror(terrno));
×
608
            return terrno;
×
609
          }
610
        } else {
611
          SColumnNode* pTarget = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
41✔
612
          bool         assignTs = false;
41✔
613
          bool         assignOpt = false;
41✔
614

615
          pSupp->targetValSlot = pTarget->slotId;
41✔
616
          pSupp->targetValType = pTarget->node.resType.type;
41✔
617

618
          // set the primary ts column and option info
619
          for (int32_t i = 0; i < numOfParam; ++i) {
214✔
620
            SNode* pNode = nodesListGetNode(pFunc->pParameterList, i);
173✔
621
            if (nodeType(pNode) == QUERY_NODE_COLUMN) {
173✔
622
              SColumnNode* pColNode = (SColumnNode*)pNode;
133✔
623
              if (pColNode->isPrimTs && (!assignTs)) {
133!
624
                pSupp->inputTsSlot = pColNode->slotId;
41✔
625
                pSupp->inputPrecision = pColNode->node.resType.precision;
41✔
626
                assignTs = true;
41✔
627
                continue;
41✔
628
              }
629
            } else if (nodeType(pNode) == QUERY_NODE_VALUE) {
40!
630
              if (!assignOpt) {
40!
631
                SValueNode* pOptNode = (SValueNode*)pNode;
40✔
632
                pSupp->pOptions = taosStrdup(pOptNode->literal);
40!
633
                assignOpt = true;
40✔
634
                continue;
40✔
635
              }
636
            }
637
          }
638

639
          if (!assignOpt) {
41✔
640
            // set the default forecast option
641
            pSupp->pOptions = taosStrdup("algo=holtwinters");
1!
642
            if (pSupp->pOptions == NULL) {
1!
643
              qError("%s failed to dup forecast option, code:%s", id, tstrerror(terrno));
×
644
              return terrno;
×
645
            }
646
          }
647

648
          pSupp->pCovariateSlotList = taosArrayInit(4, sizeof(SColumn));
41✔
649
          pSupp->pDynamicRealList = taosArrayInit(4, sizeof(SColFutureData));
41✔
650

651
          // the first is the target column
652
          for (int32_t i = 1; i < numOfParam; ++i) {
74!
653
            SColumnNode* p = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, i);
74✔
654
            if ((nodeType(p) != QUERY_NODE_COLUMN) || (nodeType(p) == QUERY_NODE_COLUMN && p->isPrimTs)) {
74!
655
              break;
656
            }
657

658
            if (p->slotId == pSupp->targetValSlot) {
33✔
659
              continue; // duplicate the target column, ignore it
7✔
660
            }
661

662
            bool exist = existInList(pSupp, p->slotId);
26✔
663
            if (exist) {
26!
664
              continue;  // duplicate column, ignore it
×
665
            }
666

667
            SColumn col = {.slotId = p->slotId,
26✔
668
                           .colType = p->colType,
26✔
669
                           .type = p->node.resType.type,
26✔
670
                           .bytes = p->node.resType.bytes};
26✔
671

672
            tstrncpy(col.name, p->colName, tListLen(col.name));
26✔
673
            void* pRet = taosArrayPush(pSupp->pCovariateSlotList, &col);
26✔
674
            if (pRet == NULL) {
26!
675
              code = terrno;
×
676
              qError("failed to record the covariate slot index, since %s", tstrerror(code));
×
677
            }
678
          }
679

680
          pSupp->numOfInputCols += taosArrayGetSize(pSupp->pCovariateSlotList);
41✔
681
        }
682
      }
683
    }
684
  }
685

686
  return code;
41✔
687
}
688

689
static void initForecastOpt(SForecastSupp* pSupp) {
41✔
690
  pSupp->maxTs = 0;
41✔
691
  pSupp->minTs = INT64_MAX;
41✔
692
  pSupp->numOfRows = 0;
41✔
693
  pSupp->wncheck = ANALY_FORECAST_DEFAULT_WNCHECK;
41✔
694
  pSupp->forecastRows = ANALY_FORECAST_DEFAULT_ROWS;
41✔
695
  pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
41✔
696
  pSupp->setEvery = 0;
41✔
697
  pSupp->setStart = 0;
41✔
698
}
41✔
699

700
static int32_t filterNotSupportForecast(SForecastSupp* pSupp) {
40✔
701
  if (taosArrayGetSize(pSupp->pCovariateSlotList) > 0) {
40✔
702
    if (taosStrcasecmp(pSupp->algoName, "holtwinters") == 0) {
14✔
703
      return TSDB_CODE_ANA_NOT_SUPPORT_FORECAST;
1✔
704
    } else if (taosStrcasecmp(pSupp->algoName, "arima") == 0) {
13✔
705
      return TSDB_CODE_ANA_NOT_SUPPORT_FORECAST;
1✔
706
    } else if (taosStrcasecmp(pSupp->algoName, "timemoe-fc") == 0) {
12!
707
      return TSDB_CODE_ANA_NOT_SUPPORT_FORECAST;
×
708
    }
709
  }
710

711
  return TSDB_CODE_SUCCESS;
38✔
712
}
713

714

715
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
41✔
716
  int32_t   code = 0;
41✔
717
  int32_t   lino = 0;
41✔
718
  SHashObj* pHashMap = NULL;
41✔
719

720
  initForecastOpt(pSupp);
41✔
721

722
  code = taosAnalyGetOpts(pSupp->pOptions, &pHashMap);
41✔
723
  if (code != TSDB_CODE_SUCCESS) {
41!
724
    return code;
×
725
  }
726

727
  if (taosHashGetSize(pHashMap) == 0) {
41!
728
    code = TSDB_CODE_INVALID_PARA;
×
729
    qError("%s no valid options for forecast, failed to exec", id);
×
730
    return code;
×
731
  }
732

733
  if (taosHashGetSize(pHashMap) == 0) {
41!
734
    code = TSDB_CODE_INVALID_PARA;
×
735
    qError("%s no valid options for forecast, failed to exec", id);
×
736
    return code;
×
737
  }
738

739
  code = taosAnalysisParseAlgo(pSupp->pOptions, pSupp->algoName, pSupp->algoUrl, ANALY_ALGO_TYPE_FORECAST,
41✔
740
                               tListLen(pSupp->algoUrl), pHashMap, id);
741
  TSDB_CHECK_CODE(code, lino, _end);
41✔
742

743
  code = filterNotSupportForecast(pSupp);
40✔
744
  if (code) {
40✔
745
    qError("%s not support forecast model, %s", id, pSupp->algoName);
2!
746
    TSDB_CHECK_CODE(code, lino, _end);
2!
747
  }
748

749
  // extract the timeout parameter
750
  pSupp->timeout = taosAnalysisParseTimout(pHashMap, id);
38✔
751
  pSupp->wncheck = taosAnalysisParseWncheck(pHashMap, id);
38✔
752

753
  // extract the forecast rows
754
  char* pRows = taosHashGet(pHashMap, ALGO_OPT_FORECASTROWS_NAME, strlen(ALGO_OPT_FORECASTROWS_NAME));
38✔
755
  if (pRows != NULL) {
38✔
756
    int64_t v = 0;
19✔
757
    code = toInteger(pRows, taosHashGetValueSize(pRows), 10, &v);
19✔
758

759
    pSupp->forecastRows = v;
19✔
760
    qDebug("%s forecast rows:%"PRId64, id, pSupp->forecastRows);
19!
761
  } else {
762
    qDebug("%s forecast rows not found:%s, use default:%" PRId64, id, pSupp->pOptions, pSupp->forecastRows);
19!
763
  }
764

765
  if (pSupp->forecastRows > ANALY_FORECAST_RES_MAX_ROWS) {
38!
766
    qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_FORECAST_RES_MAX_ROWS,
×
767
           pSupp->forecastRows);
768
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
×
769
    goto _end;
×
770
  }
771

772
  if (pSupp->forecastRows <= 0) {
38✔
773
    qError("%s output rows should be greater than 0, input:%" PRId64, id, pSupp->forecastRows);
3!
774
    code = TSDB_CODE_INVALID_PARA;
3✔
775
    goto _end;
3✔
776
  }
777

778
  // extract the confidence interval value
779
  char* pConf = taosHashGet(pHashMap, ALGO_OPT_CONF_NAME, strlen(ALGO_OPT_CONF_NAME));
35✔
780
  if (pConf != NULL) {
35✔
781
    char*  endPtr = NULL;
17✔
782
    double v = taosStr2Double(pConf, &endPtr);
17✔
783
    pSupp->conf = v;
17✔
784

785
    if (v <= 0 || v > 1.0) {
17!
786
      pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
×
787
      qWarn("%s valid conf range is (0, 1], user specified:%.2f out of range, set the default:%.2f", id, v,
×
788
             pSupp->conf);
789
    } else {
790
      qDebug("%s forecast conf:%.2f", id, pSupp->conf);
17!
791
    }
792
  } else {
793
    qDebug("%s forecast conf not found:%s, use default:%.2f", id, pSupp->pOptions, pSupp->conf);
18!
794
  }
795

796
  // extract the start timestamp
797
  char* pStart = taosHashGet(pHashMap, ALGO_OPT_START_NAME, strlen(ALGO_OPT_START_NAME));
35✔
798
  if (pStart != NULL) {
35✔
799
    int64_t v = 0;
2✔
800
    code = toInteger(pStart, taosHashGetValueSize(pStart), 10, &v);
2✔
801
    pSupp->startTs = v;
2✔
802
    pSupp->setStart = 1;
2✔
803
    qDebug("%s forecast set start ts:%"PRId64, id, pSupp->startTs);
2!
804
  }
805

806
  // extract the time step
807
  char* pEvery = taosHashGet(pHashMap, ALGO_OPT_EVERY_NAME, strlen(ALGO_OPT_EVERY_NAME));
35✔
808
  if (pEvery != NULL) {
35✔
809
    int64_t v = 0;
3✔
810
    code = toInteger(pEvery, taosHashGetValueSize(pEvery), 10, &v);
3✔
811
    pSupp->every = v;
3✔
812
    pSupp->setEvery = 1;
3✔
813
    qDebug("%s forecast set every ts:%"PRId64, id, pSupp->every);
3!
814
  }
815

816
  if (pSupp->setEvery && pSupp->every <= 0) {
35✔
817
    qError("%s period should be greater than 0, user specified:%"PRId64, id, pSupp->every);
1!
818
    code = TSDB_CODE_INVALID_PARA;
1✔
819
    goto _end;
1✔
820
  }
821

822
  // extract the dynamic real feature for covariate forecasting
823
  void*       pIter = NULL;
34✔
824
  size_t      keyLen = 0;
34✔
825
  const char* p = "dynamic_real_";
34✔
826

827
  while ((pIter = taosHashIterate(pHashMap, pIter))) {
112✔
828
    const char* pVal = pIter;
85✔
829
    char*       pKey = taosHashGetKey((void*)pVal, &keyLen);
85✔
830
    int32_t     idx = 0;
85✔
831
    char        nameBuf[512] = {0};
85✔
832

833
    if (strncmp(pKey, p, strlen(p)) == 0) {
85✔
834

835
      if (strncmp(&pKey[keyLen - 4], "_col", 4) == 0) {
15✔
836
        continue;
6✔
837
      }
838

839
      int32_t ret = sscanf(pKey, "dynamic_real_%d", &idx);
9✔
840
      if (ret == 0) {
9!
841
        continue;
×
842
      }
843

844
      memcpy(nameBuf, pKey, keyLen);
9✔
845
      strncpy(&nameBuf[keyLen], "_col", strlen("_col"));
9✔
846

847
      void* pCol = taosHashGet(pHashMap, nameBuf, strlen(nameBuf));
9✔
848
      if (pCol == NULL) {
9✔
849
        char* pTmp = taosStrndupi(pKey, keyLen);
5✔
850
        qError("%s dynamic real column related:%s column name:%s not specified", id, pTmp, nameBuf);
5!
851
        
852
        taosMemoryFree(pTmp);
5!
853
        code = TSDB_CODE_INVALID_PARA;
5✔
854
        goto _end;
7✔
855
      } else {
856
        // build dynamic_real_feature
857
        SColFutureData d = {.pName = taosStrndupi(pCol, taosHashGetValueSize(pCol))};
4✔
858
        if (d.pName == NULL) {
4!
859
          qError("%s failed to clone the future dynamic real column name:%s", id, (char*) pCol);
×
860
          code = terrno;
×
861
          goto _end;
2✔
862
        }
863

864
        int32_t index = -1;
4✔
865
        for (int32_t i = 0; i < taosArrayGetSize(pSupp->pCovariateSlotList); ++i) {
8✔
866
          SColumn* pColx = taosArrayGet(pSupp->pCovariateSlotList, i);
6✔
867
          if (strcmp(pColx->name, d.pName) == 0) {
6✔
868
            index = i;
2✔
869
            break;
2✔
870
          }
871
        }
872

873
        if (index == -1) {
4✔
874
          qError("%s not found the required future dynamic real column:%s", id, d.pName);
2!
875
          code = TSDB_CODE_INVALID_PARA;
2✔
876
          taosMemoryFree(d.pName);
2!
877
          goto _end;
2✔
878
        }
879

880
        SColumn* pColx = taosArrayGet(pSupp->pCovariateSlotList, index);
2✔
881
        d.data.info.slotId = pColx->slotId;
2✔
882
        d.data.info.type = pColx->type;
2✔
883
        d.data.info.bytes = pColx->bytes;
2✔
884

885
        int32_t len = taosHashGetValueSize((void*)pVal);
2✔
886
        char*   buf = taosStrndupi(pVal, len);
2✔
887
        int32_t unused = strdequote((char*)buf);
2✔
888

889
        int32_t num = 0;
2✔
890
        char**  pList = strsplit(buf, " ", &num);
2✔
891
        if (num != pSupp->forecastRows) {
2!
892
          qError("%s the rows:%d of future dynamic real column data is not equalled to the forecasting rows:%" PRId64,
×
893
                 id, num, pSupp->forecastRows);
894
          code = TSDB_CODE_INVALID_PARA;
×
895

896
          taosMemoryFree(d.pName);
×
897
          taosMemoryFree(pList);
×
898
          taosMemoryFree(buf);
×
899
          goto _end;
×
900
        }
901

902
        d.numOfRows = num;
2✔
903

904
        code = colInfoDataEnsureCapacity(&d.data, num, true);
2✔
905
        if (code != 0) {
2!
906
          qError("%s failed to prepare buffer, code:%s", id, tstrerror(code));
×
907
          goto _end;
×
908
        }
909

910
        for (int32_t j = 0; j < num; ++j) {
105✔
911
          char* ps = NULL;
103✔
912
          if (j == 0) {
103✔
913
            ps = strstr(pList[j], "[") + 1;
2✔
914
          } else {
915
            ps = pList[j];
101✔
916
          }
917

918
          code = 0;
103✔
919

920
          switch(pColx->type) {
103!
921
            case TSDB_DATA_TYPE_TINYINT: {
×
922
              int8_t t1 = taosStr2Int8(ps, NULL, 10);
×
923
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
924
              break;
×
925
            }
926
            case TSDB_DATA_TYPE_SMALLINT: {
×
927
              int16_t t1 = taosStr2Int16(ps, NULL, 10);
×
928
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
929
              break;
×
930
            }
931
            case TSDB_DATA_TYPE_INT: {
103✔
932
              int32_t t1 = taosStr2Int32(ps, NULL, 10);
103✔
933
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
103✔
934
              break;
103✔
935
            }
936
            case TSDB_DATA_TYPE_BIGINT: {
×
937
              int64_t t1 = taosStr2Int64(ps, NULL, 10);
×
938
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
939
              break;
×
940
            }
941
            case TSDB_DATA_TYPE_FLOAT: {
×
942
              float t1 = taosStr2Float(ps, NULL);
×
943
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
944
              break;
×
945
            }
946
            case TSDB_DATA_TYPE_DOUBLE: {
×
947
              double t1 = taosStr2Double(ps, NULL);
×
948
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
949
              break;
×
950
            }
951
            case TSDB_DATA_TYPE_UTINYINT: {
×
952
              uint8_t t1 = taosStr2UInt8(ps, NULL, 10);
×
953
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
954
              break;
×
955
            }
956
            case TSDB_DATA_TYPE_USMALLINT: {
×
957
              uint16_t t1 = taosStr2UInt16(ps, NULL, 10);
×
958
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
959
              break;
×
960
            }
961
            case TSDB_DATA_TYPE_UINT: {
×
962
              uint32_t t1 = taosStr2UInt32(ps, NULL, 10);
×
963
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
964
              break;
×
965
            }
966
            case TSDB_DATA_TYPE_UBIGINT: {
×
967
              uint64_t t1 = taosStr2UInt64(ps, NULL, 10);
×
968
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
969
              break;
×
970
            }
971
          }
972

973
          if (code != 0) {
103!
974
            break;
×
975
          }
976
        }
977

978
        taosMemoryFree(pList);
2!
979
        taosMemoryFree(buf);
2!
980
        if (code != 0) {
2!
981
          goto _end;
×
982
        }
983

984
        void* noret = taosArrayPush(pSupp->pDynamicRealList, &d);
2✔
985
        if (noret == NULL) {
2!
986
          qError("%s failed to add column info in dynamic real column info", id);
×
987
          code = terrno;
×
988
          goto _end;
×
989
        }
990
      }
991
    }
992
  }
993

994
_end:
27✔
995
  taosHashCleanup(pHashMap);
41✔
996
  return code;
41✔
997
}
998

999
static int32_t forecastCreateBuf(SForecastSupp* pSupp) {
27✔
1000
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
27✔
1001
  int64_t       ts = 0;  // taosGetTimestampMs();
27✔
1002
  int32_t       index = 0;
27✔
1003

1004
  pBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
27✔
1005
  snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-forecast-%" PRId64, tsTempDir, ts);
27✔
1006

1007
  int32_t numOfCols = taosArrayGetSize(pSupp->pCovariateSlotList) + 2;
27✔
1008

1009
  int32_t code = tsosAnalyBufOpen(pBuf, numOfCols);
27✔
1010
  if (code != 0) goto _OVER;
27!
1011

1012
  code = taosAnalyBufWriteColMeta(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, "ts");
27✔
1013
  if (code != 0) goto _OVER;
27!
1014

1015
  code = taosAnalyBufWriteColMeta(pBuf, index++, pSupp->targetValType, "val");
27✔
1016
  if (code != 0) goto _OVER;
27!
1017

1018
  int32_t numOfDynamicReal = taosArrayGetSize(pSupp->pDynamicRealList);
27✔
1019
  int32_t numOfPastDynamicReal = taosArrayGetSize(pSupp->pCovariateSlotList);
27✔
1020

1021
  if (numOfPastDynamicReal >= numOfDynamicReal) {
27!
1022
    for(int32_t i = 0; i < numOfDynamicReal; ++i) {
29✔
1023
      SColFutureData* pData = taosArrayGet(pSupp->pDynamicRealList, i);
2✔
1024

1025
      for(int32_t k = 0; k < taosArrayGetSize(pSupp->pCovariateSlotList); ++k) {
2!
1026
        SColumn* pCol = taosArrayGet(pSupp->pCovariateSlotList, k);
2✔
1027
        if (strcmp(pCol->name, pData->pName) == 0) {
2!
1028
          char name[128] = {0};
2✔
1029
          (void) tsnprintf(name, tListLen(name), "dynamic_real_%d", i + 1);
2✔
1030
          code = taosAnalyBufWriteColMeta(pBuf, index++, pCol->type, name);
2✔
1031
          if (code != 0) {
2!
1032
            goto _OVER;
×
1033
          }
1034

1035
          memcpy(&pData->col, pCol, sizeof(SColumn));
2✔
1036
          taosArrayRemove(pSupp->pCovariateSlotList, k);
2✔
1037
          break;
2✔
1038
        }
1039
      }
1040
    }
1041

1042
    numOfPastDynamicReal = taosArrayGetSize(pSupp->pCovariateSlotList);
27✔
1043
    for (int32_t j = 0; j < numOfPastDynamicReal; ++j) {
32✔
1044
      SColumn* pCol = taosArrayGet(pSupp->pCovariateSlotList, j);
5✔
1045

1046
      char name[128] = {0};
5✔
1047
      (void)tsnprintf(name, tListLen(name), "past_dynamic_real_%d", j + 1);
5✔
1048

1049
      code = taosAnalyBufWriteColMeta(pBuf, index++, pCol->type, name);
5✔
1050
      if (code) {
5!
1051
        goto _OVER;
×
1052
      }
1053
    }
1054
  }
1055

1056
  code = taosAnalyBufWriteDataBegin(pBuf);
27✔
1057
  if (code != 0) goto _OVER;
27!
1058

1059
  for (int32_t i = 0; i < pSupp->numOfInputCols; ++i) {
88✔
1060
    code = taosAnalyBufWriteColBegin(pBuf, i);
61✔
1061
    if (code != 0) goto _OVER;
61!
1062
  }
1063

1064
_OVER:
27✔
1065
  if (code != 0) {
27!
1066
    (void)taosAnalyBufClose(pBuf);
×
1067
    taosAnalyBufDestroy(pBuf);
×
1068
  }
1069
  return code;
27✔
1070
}
1071

1072
static int32_t resetForecastOperState(SOperatorInfo* pOper) {
×
1073
  int32_t code = 0, lino = 0;
×
1074
  SForecastOperatorInfo* pInfo = pOper->info;
×
1075
  const char*            pId = pOper->pTaskInfo->id.str;
×
1076
  SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pOper->pPhyNode;
×
1077
  SExecTaskInfo* pTaskInfo = pOper->pTaskInfo;
×
1078

1079
  pOper->status = OP_NOT_OPENED;
×
1080

1081
  blockDataCleanup(pInfo->pRes);
×
1082

1083
  taosArrayDestroy(pInfo->forecastSupp.pCovariateSlotList);
×
1084
  pInfo->forecastSupp.pCovariateSlotList = NULL;
×
1085

1086
  taosAnalyBufDestroy(&pInfo->forecastSupp.analyBuf);
×
1087

1088
  cleanupExprSupp(&pOper->exprSupp);
×
1089
  cleanupExprSupp(&pInfo->scalarSup);
×
1090

1091
  int32_t                 numOfExprs = 0;
×
1092
  SExprInfo*              pExprInfo = NULL;
×
1093

1094
  TAOS_CHECK_EXIT(createExprInfo(pForecastPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs));
×
1095

1096
  TAOS_CHECK_EXIT(initExprSupp(&pOper->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore));
×
1097

1098
  TAOS_CHECK_EXIT(filterInitFromNode((SNode*)pForecastPhyNode->node.pConditions, &pOper->exprSupp.pFilterInfo, 0,
×
1099
                            pTaskInfo->pStreamRuntimeInfo));
1100

1101
  TAOS_CHECK_EXIT(forecastParseInput(&pInfo->forecastSupp, pForecastPhyNode->pFuncs, pId));
×
1102

1103
  TAOS_CHECK_EXIT(forecastParseOutput(&pInfo->forecastSupp, &pOper->exprSupp));
×
1104

1105
  TAOS_CHECK_EXIT(forecastParseOpt(&pInfo->forecastSupp, pId));
×
1106

1107
  TAOS_CHECK_EXIT(forecastCreateBuf(&pInfo->forecastSupp));
×
1108

1109
  if (pForecastPhyNode->pExprs != NULL) {
×
1110
    int32_t    num = 0;
×
1111
    SExprInfo* pScalarExprInfo = NULL;
×
1112
    TAOS_CHECK_EXIT(createExprInfo(pForecastPhyNode->pExprs, NULL, &pScalarExprInfo, &num));
×
1113
    TAOS_CHECK_EXIT(initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore));
×
1114
  }
1115

1116
  initResultSizeInfo(&pOper->resultInfo, 4096);
×
1117

1118
_exit:
×
1119

1120
  if (code) {
×
1121
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1122
  }
1123

1124
  return code;  
×
1125
}
1126

1127

1128

1129

1130
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
41✔
1131
                                   SOperatorInfo** pOptrInfo) {
1132
  QRY_PARAM_CHECK(pOptrInfo);
41!
1133

1134
  int32_t                code = 0;
41✔
1135
  int32_t                lino = 0;
41✔
1136
  SForecastOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SForecastOperatorInfo));
41!
1137
  SOperatorInfo*         pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
41!
1138
  if (pOperator == NULL || pInfo == NULL) {
41!
1139
    code = terrno;
×
1140
    goto _error;
×
1141
  }
1142

1143
  pOperator->pPhyNode = pPhyNode;
41✔
1144

1145
  const char*             pId = pTaskInfo->id.str;
41✔
1146
  SForecastSupp*          pSupp = &pInfo->forecastSupp;
41✔
1147
  SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode;
41✔
1148
  SExprSupp*              pExprSup = &pOperator->exprSupp;
41✔
1149
  int32_t                 numOfExprs = 0;
41✔
1150
  SExprInfo*              pExprInfo = NULL;
41✔
1151

1152
  code = createExprInfo(pForecastPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
41✔
1153
  QUERY_CHECK_CODE(code, lino, _error);
41!
1154

1155
  code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
41✔
1156
  QUERY_CHECK_CODE(code, lino, _error);
41!
1157

1158
  if (pForecastPhyNode->pExprs != NULL) {
41!
1159
    int32_t    num = 0;
×
1160
    SExprInfo* pScalarExprInfo = NULL;
×
1161
    code = createExprInfo(pForecastPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
×
1162
    QUERY_CHECK_CODE(code, lino, _error);
×
1163

1164
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
×
1165
    QUERY_CHECK_CODE(code, lino, _error);
×
1166
  }
1167

1168
  code = filterInitFromNode((SNode*)pForecastPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
41✔
1169
                            pTaskInfo->pStreamRuntimeInfo);
41✔
1170
  QUERY_CHECK_CODE(code, lino, _error);
41!
1171

1172
  code = forecastParseInput(pSupp, pForecastPhyNode->pFuncs, pId);
41✔
1173
  QUERY_CHECK_CODE(code, lino, _error);
41!
1174

1175
  code = forecastParseOutput(pSupp, pExprSup);
41✔
1176
  QUERY_CHECK_CODE(code, lino, _error);
41!
1177

1178
  code = forecastParseOpt(pSupp, pId);
41✔
1179
  QUERY_CHECK_CODE(code, lino, _error);
41✔
1180

1181
  code = forecastCreateBuf(pSupp);
27✔
1182
  QUERY_CHECK_CODE(code, lino, _error);
27!
1183

1184
  initResultSizeInfo(&pOperator->resultInfo, 4096);
27✔
1185

1186
  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
27✔
1187
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
27!
1188

1189
  setOperatorInfo(pOperator, "ForecastOperator", QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, false, OP_NOT_OPENED, pInfo,
27✔
1190
                  pTaskInfo);
1191
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, forecastNext, NULL, destroyForecastInfo, optrDefaultBufFn,
27✔
1192
                                         NULL, optrDefaultGetNextExtFn, NULL);
1193

1194
  setOperatorResetStateFn(pOperator, resetForecastOperState);
27✔
1195

1196
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
27✔
1197
  QUERY_CHECK_CODE(code, lino, _error);
27!
1198

1199
  code = appendDownstream(pOperator, &downstream, 1);
27✔
1200
  QUERY_CHECK_CODE(code, lino, _error);
27!
1201

1202
  *pOptrInfo = pOperator;
27✔
1203

1204
  qDebug("%s forecast env is initialized, option:%s", pId, pSupp->pOptions);
27!
1205
  return TSDB_CODE_SUCCESS;
27✔
1206

1207
_error:
14✔
1208
  if (code != TSDB_CODE_SUCCESS) {
14!
1209
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
14!
1210
  }
1211
  if (pInfo != NULL) destroyForecastInfo(pInfo);
14!
1212
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
14✔
1213
  pTaskInfo->code = code;
14✔
1214
  return code;
14✔
1215
}
1216

1217
static void destroyColFutureData(void* p) {
2✔
1218
  SColFutureData* pData = p;
2✔
1219
  taosMemoryFree(pData->pName);
2!
1220
  colDataDestroy(&pData->data);
2✔
1221
}
2✔
1222

1223
static void destroyForecastInfo(void* param) {
41✔
1224
  SForecastOperatorInfo* pInfo = (SForecastOperatorInfo*)param;
41✔
1225

1226
  blockDataDestroy(pInfo->pRes);
41✔
1227
  pInfo->pRes = NULL;
41✔
1228

1229
  taosArrayDestroy(pInfo->forecastSupp.pCovariateSlotList);
41✔
1230
  pInfo->forecastSupp.pCovariateSlotList = NULL;
41✔
1231

1232
  taosArrayDestroyEx(pInfo->forecastSupp.pDynamicRealList, destroyColFutureData);
41✔
1233
  pInfo->forecastSupp.pDynamicRealList = NULL;
41✔
1234

1235
  taosMemoryFree(pInfo->forecastSupp.pOptions);
41!
1236

1237
  cleanupExprSupp(&pInfo->scalarSup);
41✔
1238
  taosAnalyBufDestroy(&pInfo->forecastSupp.analyBuf);
41✔
1239
  taosMemoryFreeClear(param);
41!
1240
}
41✔
1241

1242
#else
1243

1244
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
1245
                                   SOperatorInfo** pOptrInfo) {
1246
  return TSDB_CODE_OPS_NOT_SUPPORT;
1247
}
1248

1249
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc