• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4856

17 Nov 2025 09:53AM UTC coverage: 64.286% (+0.2%) from 64.039%
#4856

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

218 of 311 new or added lines in 32 files covered. (70.1%)

4657 existing lines in 112 files now uncovered.

151658 of 235910 relevant lines covered (64.29%)

116320814.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/forecastoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "tanalytics.h"
22
#include "taoserror.h"
23
#include "tcommon.h"
24
#include "tdatablock.h"
25
#include "tmsg.h"
26
#include "tmsg.h"
27

28
#ifdef USE_ANALYTICS
29

30
// Option key names used by the forecast operator. ALGO_OPT_RETCONF_NAME is written into the request
// buffer and ALGO_OPT_FORECASTROWS_NAME is looked up in the parsed user options.
// NOTE(review): the conf/start/every keys are presumably looked up the same way - confirm in forecastParseOpt.
#define ALGO_OPT_RETCONF_NAME      "return_conf"
31
#define ALGO_OPT_FORECASTROWS_NAME "rows"
32
#define ALGO_OPT_CONF_NAME         "conf"
33
#define ALGO_OPT_START_NAME        "start"
34
#define ALGO_OPT_EVERY_NAME        "every"
35

36
// One entry of SForecastSupp.pDynamicRealList: a column whose historical values are read from the
// input block (via col.slotId) and whose extra rows held in `data` are appended when the buffer is closed.
typedef struct {
37
  char*           pName;      // NOTE(review): presumably the column name; not referenced in the visible code
38
  SColumnInfoData data;       // extra rows appended to the analysis buffer by forecastCloseBuf
39
  SColumn         col;        // slot id and type used to read/write the column values
40
  int32_t         numOfRows;  // number of rows stored in `data`
41
} SColFutureData;
42

43
// Working state of one forecast operator: the parsed options, the input/output slot mapping and the
// running statistics of the rows cached in the analysis buffer.
typedef struct {
44
  char         algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];  // algorithm name, written as the "algo" option
45
  char         algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];    // URL the analysis request is posted to
46
  char*        pOptions;       // option string, duplicated in forecastParseInput
47
  int64_t      maxTs;          // largest input timestamp cached so far
48
  int64_t      minTs;          // smallest input timestamp cached so far
49
  int64_t      numOfRows;      // rows written into analyBuf by forecastCacheBlock
50
  uint64_t     groupId;        // group id of the blocks currently being cached
51
  int64_t      forecastRows;   // written as the "forecast_rows" option
52
  int64_t      cachedRows;     // input rows accumulated for the current group (checked against min/max limits)
53
  int32_t      numOfBlocks;    // blocks cached since the last aggregation
54
  int64_t      timeout;        // passed to taosAnalySendReqRetJson
55
  int16_t      resTsSlot;      // output slot of the forecast timestamp, -1 if not requested
56
  int16_t      resValSlot;     // output slot of the forecast value, -1 if not requested
57
  int16_t      resLowSlot;     // output slot of the lower bound, -1 if not requested
58
  int16_t      resHighSlot;    // output slot of the upper bound, -1 if not requested
59
  int16_t      inputTsSlot;    // input slot of the timestamp column
60
  int16_t      targetValSlot;  // input slot of the column to forecast
61
  int8_t       targetValType;  // data type of the column to forecast
62
  int8_t       inputPrecision; // precision of the input timestamp column
63
  int8_t       wncheck;        // written as the ALGO_OPT_WNCHECK_NAME option
64
  double       conf;           // written as the "conf" option
65
  int64_t      startTs;        // written as "start" when setStart != 0
66
  int64_t      every;          // written as "every" when setEvery != 0
67
  int8_t       setStart;       // non-zero: use startTs instead of maxTs + every
68
  int8_t       setEvery;       // non-zero: use every instead of the interval derived from the cached rows
69
  SArray*      pCovariateSlotList;   // covariate columns, array of SColumn
70
  SArray*      pDynamicRealList;     // dynamic real columns, array of SColFutureData
71
  int32_t      numOfInputCols;       // number of columns written into analyBuf (ts + target + covariates)
72
  SAnalyticBuf analyBuf;             // buffer holding the request sent to the analysis service
73
} SForecastSupp;
74

75
// Private info of the forecast operator (stored in SOperatorInfo.info).
typedef struct SForecastOperatorInfo {
76
  SSDataBlock*  pRes;       // result block handed to the upstream operator
77
  SExprSupp     scalarSup;  // scalar expressions applied to each input block before caching
78
  SForecastSupp forecastSupp;  // forecast options, slot mapping and cached-row statistics
79
} SForecastOperatorInfo;
80

81
static void    destroyForecastInfo(void* param);
82
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id);
83

84
// Make sure pBlock can take more rows.
// Nothing is done while the block still has free rows (rows < capacity); otherwise
// blockDataEnsureCapacity() is called with newRowsNum.
// Returns TSDB_CODE_SUCCESS, or the error code reported by blockDataEnsureCapacity().
static FORCE_INLINE int32_t forecastEnsureBlockCapacity(SSDataBlock* pBlock, int32_t newRowsNum) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (pBlock->info.rows >= pBlock->info.capacity) {
    code = blockDataEnsureCapacity(pBlock, newRowsNum);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
  }

  return code;
}
97

98
// Append all rows of pBlock to the analysis buffer of pSupp.
//
// For every row the values are written in this column order: primary timestamp, target column,
// the dynamic real columns (pDynamicRealList) and finally the covariate columns (pCovariateSlotList).
// minTs/maxTs/numOfRows of pSupp are updated along the way.
//
// Returns TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS when pSupp->cachedRows exceeds ANALY_FORECAST_MAX_ROWS,
// an error when a required column cannot be found in the block, or the error reported by
// taosAnalyBufWriteColData().
static int32_t forecastCacheBlock(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* id) {
  int32_t       code = TSDB_CODE_SUCCESS;
  SAnalyticBuf* pBuf = &pSupp->analyBuf;

  if (pSupp->cachedRows > ANALY_FORECAST_MAX_ROWS) {
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
    qError("%s rows:%" PRId64 " for forecast cache, error happens, code:%s, upper limit:%d", id, pSupp->cachedRows,
           tstrerror(code), ANALY_FORECAST_MAX_ROWS);
    return code;
  }

  pSupp->numOfBlocks++;
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->numOfBlocks, pBlock, pBlock->info.rows);

  // an empty block needs no column lookup at all
  if (pBlock->info.rows <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  // the column lookups do not depend on the row, do them once
  SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->targetValSlot);
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->inputTsSlot);
  if (pTsCol == NULL || pValCol == NULL) {
    // report the problem instead of silently caching nothing
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
    qError("%s failed to get ts/target column for forecast, ts slot:%d, target slot:%d, code:%s", id,
           pSupp->inputTsSlot, pSupp->targetValSlot, tstrerror(code));
    return code;
  }

  int16_t valType = pValCol->info.type;
  int32_t numOfDynamic = (int32_t)taosArrayGetSize(pSupp->pDynamicRealList);
  int32_t numOfCovariate = (int32_t)taosArrayGetSize(pSupp->pCovariateSlotList);

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    int32_t index = 0;
    int64_t ts = ((TSKEY*)pTsCol->pData)[j];
    char*   val = colDataGetData(pValCol, j);

    pSupp->minTs = MIN(pSupp->minTs, ts);
    pSupp->maxTs = MAX(pSupp->maxTs, ts);
    pSupp->numOfRows++;

    // write the primary time stamp column data
    code = taosAnalyBufWriteColData(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, &ts);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // write the main column for forecasting
    code = taosAnalyBufWriteColData(pBuf, index++, valType, val);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
      return code;
    }

    // let's handle the dynamic_real_columns
    for (int32_t i = 0; i < numOfDynamic; ++i) {
      SColFutureData*  pCol = taosArrayGet(pSupp->pDynamicRealList, i);
      SColumnInfoData* pColData = (pCol != NULL) ? taosArrayGet(pBlock->pDataBlock, pCol->col.slotId) : NULL;
      if (pColData == NULL) {
        code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
        qError("%s failed to get the %d-th dynamic real column, code:%s", id, i, tstrerror(code));
        return code;
      }

      char* pVal = colDataGetData(pColData, j);
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->col.type, pVal);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }
    }

    // now handle the past_dynamic_real (covariate) columns
    for (int32_t i = 0; i < numOfCovariate; ++i) {
      SColumn*         pCol = taosArrayGet(pSupp->pCovariateSlotList, i);
      SColumnInfoData* pColData = (pCol != NULL) ? taosArrayGet(pBlock->pDataBlock, pCol->slotId) : NULL;
      if (pColData == NULL) {
        code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
        qError("%s failed to get the %d-th covariate column, code:%s", id, i, tstrerror(code));
        return code;
      }

      char* pVal = colDataGetData(pColData, j);
      code = taosAnalyBufWriteColData(pBuf, index++, pCol->type, pVal);
      if (TSDB_CODE_SUCCESS != code) {
        qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
170

171
// Finish the analysis buffer of pSupp before it is sent to the analysis service.
//
// Steps, in order:
//   1. append the extra rows of every dynamic real column and close each input column;
//   2. close the data section;
//   3. write the request options (option string, algo, prec, wncheck, return_conf, forecast_rows,
//      conf, every, start and, when covariates are present, type=covariate);
//   4. close the buffer.
//
// Returns TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS when too few history rows were cached, or the first
// error reported by the taosAnalyBuf* writers.
static int32_t forecastCloseBuf(SForecastSupp* pSupp, const char* id) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int32_t       code = 0;

  for (int32_t i = 0; i < pSupp->numOfInputCols; ++i) {
    // columns 0 and 1 are the primary timestamp and the forecast column;
    // the dynamic real columns follow, add their future data here
    if ((i >= 2) && ((i - 2) < taosArrayGetSize(pSupp->pDynamicRealList))) {
      SColFutureData* pCol = taosArrayGet(pSupp->pDynamicRealList, i - 2);
      int16_t         valType = pCol->col.type;
      for (int32_t j = 0; j < pCol->numOfRows; ++j) {
        char* pVal = colDataGetData(&pCol->data, j);
        code = taosAnalyBufWriteColData(pBuf, i, valType, pVal);
        if (code != 0) {
          return code;
        }
      }
    }

    code = taosAnalyBufWriteColEnd(pBuf, i);
    if (code != 0) return code;
  }

  code = taosAnalyBufWriteDataEnd(pBuf);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "option", pSupp->pOptions);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptStr(pBuf, "algo", pSupp->algoName);
  if (code != 0) return code;

  const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_MICRO) prec = TSDB_TIME_PRECISION_MICRO_STR;
  if (pSupp->inputPrecision == TSDB_TIME_PRECISION_NANO) prec = TSDB_TIME_PRECISION_NANO_STR;
  code = taosAnalyBufWriteOptStr(pBuf, "prec", prec);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_WNCHECK_NAME, pSupp->wncheck);
  if (code != 0) return code;

  bool noConf = (pSupp->resHighSlot == -1 && pSupp->resLowSlot == -1);
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_RETCONF_NAME, !noConf);
  if (code != 0) return code;

  // report the number of cached history rows, not the number of rows to forecast
  if (pSupp->cachedRows < ANALY_FORECAST_MIN_ROWS) {
    qError("%s history rows for forecasting not enough, min required:%d, current:%" PRId64, id, ANALY_FORECAST_MIN_ROWS,
           pSupp->cachedRows);
    return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
  }

  if (pSupp->cachedRows < ANALY_TDTSFM_FORECAST_MIN_ROWS && strcmp(pSupp->algoName, "tdtsfm_1") == 0) {
    qError("%s history rows for forecasting when using tdtsfm model not enough, min required:%d, current:%" PRId64, id,
           ANALY_TDTSFM_FORECAST_MIN_ROWS, pSupp->cachedRows);
    return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
  }

  code = taosAnalyBufWriteOptInt(pBuf, "forecast_rows", pSupp->forecastRows);
  if (code != 0) return code;

  code = taosAnalyBufWriteOptFloat(pBuf, ALGO_OPT_CONF_NAME, pSupp->conf);
  if (code != 0) return code;

  int64_t every = 0;
  if (pSupp->setEvery != 0) {
    every = pSupp->every;
  } else {
    // the interval is derived from the cached rows; at least two rows are needed, otherwise the
    // division below would be a division by zero
    if (pSupp->numOfRows < 2) {
      qError("%s rows:%" PRId64 " in buf not enough to derive the forecast interval", id, pSupp->numOfRows);
      return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
    }
    every = (pSupp->maxTs - pSupp->minTs) / (pSupp->numOfRows - 1);
  }

  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_EVERY_NAME, every);
  if (code != 0) return code;

  int64_t start = (pSupp->setStart != 0) ? pSupp->startTs : pSupp->maxTs + every;
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_START_NAME, start);
  if (code != 0) return code;

  if (taosArrayGetSize(pSupp->pCovariateSlotList) + taosArrayGetSize(pSupp->pDynamicRealList) > 0) {
    code = taosAnalyBufWriteOptStr(pBuf, "type", "covariate");
    if (code != 0) return code;
  }

  return taosAnalyBufClose(pBuf);
}
253

254
static int32_t forecastAnalysis(SForecastSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
×
255
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
×
256
  int32_t       resCurRow = pBlock->info.rows;
×
257
  int64_t       tmpI64 = 0;
×
258
  double        tmpDouble = 0;
×
259
  int32_t       code = 0;
×
260

261
  SColumnInfoData* pResValCol = taosArrayGet(pBlock->pDataBlock, pSupp->resValSlot);
×
262
  if (NULL == pResValCol) {
×
263
    return terrno;
×
264
  }
265

266
  SColumnInfoData* pResTsCol = ((pSupp->resTsSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resTsSlot) : NULL);
×
267
  SColumnInfoData* pResLowCol =
×
268
      ((pSupp->resLowSlot != -1) ? taosArrayGet(pBlock->pDataBlock, pSupp->resLowSlot) : NULL);
×
269
  SColumnInfoData* pResHighCol =
×
270
      (pSupp->resHighSlot != -1 ? taosArrayGet(pBlock->pDataBlock, pSupp->resHighSlot) : NULL);
×
271

272
  SJson* pJson = taosAnalySendReqRetJson(pSupp->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout, pId);
×
273
  if (pJson == NULL) {
×
274
    return terrno;
×
275
  }
276

277
  int32_t rows = 0;
×
278
  tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
×
279
  if (rows < 0 && code == 0) {
×
NEW
280
    code = parseErrorMsgFromAnalyticServer(pJson, pId);
×
281
    tjsonDelete(pJson);
×
282
    return code;
×
283
  }
284

285
  // invalid json format
286
  if (code != 0) {
×
287
    goto _OVER;
×
288
  }
289

290
  SJson* res = tjsonGetObjectItem(pJson, "res");
×
UNCOV
291
  if (res == NULL) goto _OVER;
×
UNCOV
292
  int32_t ressize = tjsonGetArraySize(res);
×
UNCOV
293
  bool    returnConf = (pSupp->resHighSlot != -1 || pSupp->resLowSlot != -1);
×
294

295
  if ((returnConf && (ressize != 4)) || ((!returnConf) && (ressize != 2))) {
×
296
    goto _OVER;
×
297
  }
298

299
  if (pResTsCol != NULL) {
×
300
    resCurRow = pBlock->info.rows;
×
301
    SJson* tsJsonArray = tjsonGetArrayItem(res, 0);
×
302
    if (tsJsonArray == NULL) goto _OVER;
×
303
    int32_t tsSize = tjsonGetArraySize(tsJsonArray);
×
304
    if (tsSize != rows) goto _OVER;
×
UNCOV
305
    for (int32_t i = 0; i < tsSize; ++i) {
×
UNCOV
306
      SJson* tsJson = tjsonGetArrayItem(tsJsonArray, i);
×
UNCOV
307
      tjsonGetObjectValueBigInt(tsJson, &tmpI64);
×
308
      colDataSetInt64(pResTsCol, resCurRow, &tmpI64);
×
309
      resCurRow++;
×
310
    }
311
  }
312

313
  if (pResLowCol != NULL) {
×
314
    resCurRow = pBlock->info.rows;
×
315
    SJson* lowJsonArray = tjsonGetArrayItem(res, 2);
×
316
    if (lowJsonArray == NULL) goto _OVER;
×
317
    int32_t lowSize = tjsonGetArraySize(lowJsonArray);
×
318
    if (lowSize != rows) goto _OVER;
×
UNCOV
319
    for (int32_t i = 0; i < lowSize; ++i) {
×
UNCOV
320
      SJson* lowJson = tjsonGetArrayItem(lowJsonArray, i);
×
UNCOV
321
      tjsonGetObjectValueDouble(lowJson, &tmpDouble);
×
322
      colDataSetDouble(pResLowCol, resCurRow, &tmpDouble);
×
323
      resCurRow++;
×
324
    }
325
  }
326

327
  if (pResHighCol != NULL) {
×
328
    resCurRow = pBlock->info.rows;
×
329
    SJson* highJsonArray = tjsonGetArrayItem(res, 3);
×
UNCOV
330
    if (highJsonArray == NULL) goto _OVER;
×
331
    int32_t highSize = tjsonGetArraySize(highJsonArray);
×
332
    if (highSize != rows) goto _OVER;
×
UNCOV
333
    for (int32_t i = 0; i < highSize; ++i) {
×
UNCOV
334
      SJson* highJson = tjsonGetArrayItem(highJsonArray, i);
×
335
      tjsonGetObjectValueDouble(highJson, &tmpDouble);
×
UNCOV
336
      colDataSetDouble(pResHighCol, resCurRow, &tmpDouble);
×
337
      resCurRow++;
×
338
    }
339
  }
340

341
  resCurRow = pBlock->info.rows;
×
342
  SJson* valJsonArray = tjsonGetArrayItem(res, 1);
×
343
  if (valJsonArray == NULL) goto _OVER;
×
UNCOV
344
  int32_t valSize = tjsonGetArraySize(valJsonArray);
×
UNCOV
345
  if (valSize != rows) goto _OVER;
×
346
  for (int32_t i = 0; i < valSize; ++i) {
×
347
    SJson* valJson = tjsonGetArrayItem(valJsonArray, i);
×
UNCOV
348
    tjsonGetObjectValueDouble(valJson, &tmpDouble);
×
349

350
    colDataSetDouble(pResValCol, resCurRow, &tmpDouble);
×
351
    resCurRow++;
×
352
  }
353

UNCOV
354
  pBlock->info.rows += rows;
×
355

356
  if (pJson != NULL) tjsonDelete(pJson);
×
UNCOV
357
  return 0;
×
358

359
_OVER:
×
UNCOV
360
  tjsonDelete(pJson);
×
361
  if (code == 0) {
×
362
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
363
  }
364

UNCOV
365
  qError("%s failed to perform forecast finalize since %s", pId, tstrerror(code));
×
366
  return code;
×
367
}
368

369
// Run the forecast for the rows cached so far: close the analysis buffer, make sure the result
// block can take rows, then request the forecast and append it to pResBlock.
// Whatever the outcome, the block counter is reset and the analysis buffer is destroyed.
// Returns the first non-success code of the three steps, TSDB_CODE_SUCCESS otherwise.
static int32_t forecastAggregateBlocks(SForecastSupp* pSupp, SSDataBlock* pResBlock, const char* pId) {
  int32_t lino = 0;
  int32_t code = forecastCloseBuf(pSupp, pId);
  QUERY_CHECK_CODE(code, lino, _end);

  code = forecastEnsureBlockCapacity(pResBlock, 1);
  QUERY_CHECK_CODE(code, lino, _end);

  code = forecastAnalysis(pSupp, pResBlock, pId);
  QUERY_CHECK_CODE(code, lino, _end);

  uInfo("%s block:%d, forecast finalize", pId, pSupp->numOfBlocks);

_end:
  // always release the per-group state
  pSupp->numOfBlocks = 0;
  taosAnalyBufDestroy(&pSupp->analyBuf);
  return code;
}
390

UNCOV
391
// Operator entry: pull blocks from the downstream operator, cache them group by group and emit the
// forecast of a group once a block of another group shows up or the input is exhausted.
//
// *ppRes is set to the result block when it holds rows, to NULL otherwise.
// Any failure is reported through T_LONG_JMP on the task environment.
static int32_t forecastNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SExecTaskInfo*         pTaskInfo = pOperator->pTaskInfo;
  SForecastOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*           pOut = pInfo->pRes;
  SForecastSupp*         pSupp = &pInfo->forecastSupp;
  SExprSupp*             pScalar = &pInfo->scalarSup;
  int64_t                beginUs = taosGetTimestampUs();
  int32_t                blocks = pSupp->numOfBlocks;
  const char*            pId = GET_TASKID(pOperator->pTaskInfo);

  blockDataCleanup(pOut);

  for (;;) {
    SSDataBlock* pIn = getNextBlockFromDownstream(pOperator, 0);
    if (NULL == pIn) {
      break;
    }

    // evaluate the scalar expressions in place before the block is cached
    if (NULL != pScalar->pExprInfo) {
      code = projectApplyFunctions(pScalar->pExprInfo, pIn, pIn, pScalar->pCtx, pScalar->numOfExprs, NULL,
                                   GET_STM_RTINFO(pOperator->pTaskInfo));
      if (TSDB_CODE_SUCCESS != code) {
        T_LONG_JMP(pTaskInfo->env, code);
      }
    }

    bool groupChanged = (pSupp->groupId != 0) && (pSupp->groupId != pIn->info.id.groupId);
    if (groupChanged) {
      // the previous group is complete: forecast it, then start counting the new group
      qDebug("%s group:%" PRId64 ", read finish for new group coming, blocks:%d", pId, pSupp->groupId, blocks);
      code = forecastAggregateBlocks(pSupp, pOut, pId);
      QUERY_CHECK_CODE(code, lino, _end);

      pSupp->groupId = pIn->info.id.groupId;
      blocks = 1;
      pSupp->cachedRows = pIn->info.rows;
      qDebug("%s group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId,
             pIn->info.rows, pSupp->cachedRows);
    } else {
      pSupp->groupId = pIn->info.id.groupId;
      blocks++;
      pSupp->cachedRows += pIn->info.rows;
      qDebug("%s group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%" PRId64, pId, pSupp->groupId, blocks,
             pIn->info.rows, pSupp->cachedRows);
    }

    code = forecastCacheBlock(pSupp, pIn, pId);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pOut->info.rows > 0) {
      (*ppRes) = pOut;
      qDebug("%s group:%" PRId64 ", return to upstream, blocks:%d", pId, pOut->info.id.groupId, blocks);
      return code;
    }
  }

  // input exhausted: forecast whatever is still cached
  if (blocks > 0) {
    qDebug("%s group:%" PRId64 ", read finish, blocks:%d", pId, pSupp->groupId, blocks);
    code = forecastAggregateBlocks(pSupp, pOut, pId);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  int64_t cost = taosGetTimestampUs() - beginUs;
  qDebug("%s all groups finished, cost:%" PRId64 "us", pId, cost);

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (pOut->info.rows == 0) {
    (*ppRes) = NULL;
  } else {
    (*ppRes) = pOut;
  }
  return code;
}
467

UNCOV
468
// Record in pSupp the output slot of each forecast-related expression (value, timestamp, low, high).
// Slots of expressions that are not present stay at -1. Always returns 0.
static int32_t forecastParseOutput(SForecastSupp* pSupp, SExprSupp* pExprSup) {
  pSupp->resTsSlot = -1;
  pSupp->resValSlot = -1;
  pSupp->resLowSlot = -1;
  pSupp->resHighSlot = -1;

  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SExprInfo* pExpr = &pExprSup->pExprInfo[idx];
    int32_t    slot = pExpr->base.resSchema.slotId;

    switch (pExpr->pExpr->_function.functionType) {
      case FUNCTION_TYPE_FORECAST:
        pSupp->resValSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_ROWTS:
        pSupp->resTsSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_LOW:
        pSupp->resLowSlot = slot;
        break;
      case FUNCTION_TYPE_FORECAST_HIGH:
        pSupp->resHighSlot = slot;
        break;
      default:
        // other expressions do not take part in the forecast output
        break;
    }
  }

  return 0;
}
491

492
// Check the parameter list of a forecast function node.
// Rules enforced here: at least two parameters, none of them NULL, and both the first and the
// last parameter must be column nodes.
// Returns 0 when valid, TSDB_CODE_PLAN_INTERNAL_ERROR otherwise.
static int32_t validInputParams(SFunctionNode* pFunc, const char* id) {
  int32_t code = 0;
  int32_t num = LIST_LENGTH(pFunc->pParameterList);

  if (num <= 1) {
    qError("%s invalid number of parameters:%d", id, num);
    code = TSDB_CODE_PLAN_INTERNAL_ERROR;
    goto _end;
  }

  for (int32_t idx = 0; idx < num; ++idx) {
    if (nodesListGetNode(pFunc->pParameterList, idx) == NULL) {
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
      qError("%s %d-th parameter in forecast function is NULL, code:%s", id, idx, tstrerror(code));
      goto _end;
    }
  }

  SNode* pFirst = nodesListGetNode(pFunc->pParameterList, 0);
  SNode* pLast = nodesListGetNode(pFunc->pParameterList, num - 1);

  if (num == 2) {  // column_name, timestamp_column_name
    bool bothColumns = (nodeType(pFirst) == QUERY_NODE_COLUMN) && (nodeType(pLast) == QUERY_NODE_COLUMN);
    if (!bothColumns) {
      qError("%s invalid column type, column 1:%d, column 2:%d", id, nodeType(pFirst), nodeType(pLast));
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
    }
  } else {
    // three or more parameters, accepted layouts:
    //   column_name_#1, column_name_#2...., analytics_options, timestamp_column_name, primary_key_column if exists
    //   column_name_#1, timestamp_column_name, primary_key_column if exists
    //   column_name_#1, analytics_options, timestamp_column_name
    if (nodeType(pFirst) != QUERY_NODE_COLUMN) {
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
      qError("%s first parameter is not valid column in forecast function", id);
    } else if (nodeType(pLast) != QUERY_NODE_COLUMN) {
      qError("%s last parameter is not valid column, actual:%d", id, nodeType(pLast));
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
    }
  }

_end:
  if (code != 0) {
    qError("%s valid the parameters failed, code:%s", id, tstrerror(code));
  }
  return code;
}
544

545
// Tell whether a column with the given slot id is already recorded in pSupp->pCovariateSlotList.
static bool existInList(SForecastSupp* pSupp, int32_t slotId) {
  bool    found = false;
  int32_t idx = 0;

  while (!found && idx < taosArrayGetSize(pSupp->pCovariateSlotList)) {
    SColumn* pEntry = taosArrayGet(pSupp->pCovariateSlotList, idx);
    found = (pEntry->slotId == slotId);
    ++idx;
  }

  return found;
}
556

UNCOV
557
// Extract the forecast input description from the function list of the plan node.
//
// For every FORECAST function found in pFuncs this fills, in pSupp:
//   - the slot/type of the target column (first parameter);
//   - the slot/precision of the primary timestamp column;
//   - the option string (the value parameter, or "algo=holtwinters" when none is given);
//   - the covariate column list (column parameters following the target, up to the first
//     non-column or primary timestamp parameter; duplicates are ignored);
//   - numOfInputCols = 2 (ts + target) + number of covariate columns.
//
// Returns 0 on success, the code of validInputParams() for an invalid parameter list, or terrno
// when a string/array allocation fails.
static int32_t forecastParseInput(SForecastSupp* pSupp, SNodeList* pFuncs, const char* id) {
  int32_t code = 0;
  SNode*  pNode = NULL;

  pSupp->inputTsSlot = -1;
  pSupp->targetValSlot = -1;
  pSupp->targetValType = -1;
  pSupp->inputPrecision = -1;

  FOREACH(pNode, pFuncs) {
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);

      if (pFunc->funcType == FUNCTION_TYPE_FORECAST) {
        code = validInputParams(pFunc, id);
        if (code) {
          return code;
        }

        // the primary timestamp and the target column are always sent
        pSupp->numOfInputCols = 2;

        if (numOfParam == 2) {
          // column, ts
          SColumnNode* pTarget = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
          SColumnNode* pTsNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 1);

          pSupp->inputTsSlot = pTsNode->slotId;
          pSupp->inputPrecision = pTsNode->node.resType.precision;
          pSupp->targetValSlot = pTarget->slotId;
          pSupp->targetValType = pTarget->node.resType.type;

          // let's add the holtwinters as the default forecast algorithm
          pSupp->pOptions = taosStrdup("algo=holtwinters");
          if (pSupp->pOptions == NULL) {
            qError("%s failed to dup forecast option, code:%s", id, tstrerror(terrno));
            return terrno;
          }
        } else {
          SColumnNode* pTarget = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
          bool         assignTs = false;
          bool         assignOpt = false;

          pSupp->targetValSlot = pTarget->slotId;
          pSupp->targetValType = pTarget->node.resType.type;

          // set the primary ts column and option info: the first primary timestamp column and the
          // first value node win
          for (int32_t i = 0; i < numOfParam; ++i) {
            SNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
            if (nodeType(pParam) == QUERY_NODE_COLUMN) {
              SColumnNode* pColNode = (SColumnNode*)pParam;
              if (pColNode->isPrimTs && (!assignTs)) {
                pSupp->inputTsSlot = pColNode->slotId;
                pSupp->inputPrecision = pColNode->node.resType.precision;
                assignTs = true;
              }
            } else if (nodeType(pParam) == QUERY_NODE_VALUE) {
              if (!assignOpt) {
                SValueNode* pOptNode = (SValueNode*)pParam;
                pSupp->pOptions = taosStrdup(pOptNode->literal);
                if (pSupp->pOptions == NULL) {
                  qError("%s failed to dup forecast option, code:%s", id, tstrerror(terrno));
                  return terrno;
                }
                assignOpt = true;
              }
            }
          }

          if (!assignOpt) {
            // set the default forecast option
            pSupp->pOptions = taosStrdup("algo=holtwinters");
            if (pSupp->pOptions == NULL) {
              qError("%s failed to dup forecast option, code:%s", id, tstrerror(terrno));
              return terrno;
            }
          }

          pSupp->pCovariateSlotList = taosArrayInit(4, sizeof(SColumn));
          pSupp->pDynamicRealList = taosArrayInit(4, sizeof(SColFutureData));
          if (pSupp->pCovariateSlotList == NULL || pSupp->pDynamicRealList == NULL) {
            qError("%s failed to init the covariate column list, code:%s", id, tstrerror(terrno));
            return terrno;
          }

          // the first is the target column
          for (int32_t i = 1; i < numOfParam; ++i) {
            SColumnNode* p = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, i);
            // covariates end at the first non-column parameter or at the primary timestamp
            if ((nodeType(p) != QUERY_NODE_COLUMN) || p->isPrimTs) {
              break;
            }

            if (p->slotId == pSupp->targetValSlot) {
              continue;  // duplicate the target column, ignore it
            }

            if (existInList(pSupp, p->slotId)) {
              continue;  // duplicate column, ignore it
            }

            SColumn col = {.slotId = p->slotId,
                           .colType = p->colType,
                           .type = p->node.resType.type,
                           .bytes = p->node.resType.bytes};

            tstrncpy(col.name, p->colName, tListLen(col.name));
            void* pRet = taosArrayPush(pSupp->pCovariateSlotList, &col);
            if (pRet == NULL) {
              // stop right away: continuing would send an incomplete covariate list
              code = terrno;
              qError("failed to record the covariate slot index, since %s", tstrerror(code));
              return code;
            }
          }

          pSupp->numOfInputCols += taosArrayGetSize(pSupp->pCovariateSlotList);
        }
      }
    }
  }

  return code;
}
673

UNCOV
674
// Reset the row statistics of pSupp and load the default forecast options.
static void initForecastOpt(SForecastSupp* pSupp) {
  // statistics of the cached rows
  pSupp->numOfRows = 0;
  pSupp->minTs = INT64_MAX;
  pSupp->maxTs = 0;

  // option defaults
  pSupp->forecastRows = ANALY_FORECAST_DEFAULT_ROWS;
  pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
  pSupp->wncheck = ANALY_DEFAULT_WNCHECK;

  // neither "start" nor "every" has been given yet
  pSupp->setStart = 0;
  pSupp->setEvery = 0;
}
×
684

UNCOV
685
// Reject algorithm/covariate combinations that are not supported.
// When at least one covariate column is present and the algorithm name (case-insensitive) is one
// of the listed ones, TSDB_CODE_ANA_NOT_SUPPORT_FORECAST is returned; TSDB_CODE_SUCCESS otherwise.
static int32_t filterNotSupportForecast(SForecastSupp* pSupp) {
  static const char* const noCovariateAlgos[] = {"holtwinters", "arima", "timemoe-fc"};

  if (taosArrayGetSize(pSupp->pCovariateSlotList) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (size_t k = 0; k < sizeof(noCovariateAlgos) / sizeof(noCovariateAlgos[0]); ++k) {
    if (taosStrcasecmp(pSupp->algoName, noCovariateAlgos[k]) == 0) {
      return TSDB_CODE_ANA_NOT_SUPPORT_FORECAST;
    }
  }

  return TSDB_CODE_SUCCESS;
}
698

699

700
static int32_t forecastParseOpt(SForecastSupp* pSupp, const char* id) {
×
701
  int32_t   code = 0;
×
702
  int32_t   lino = 0;
×
UNCOV
703
  SHashObj* pHashMap = NULL;
×
704

705
  initForecastOpt(pSupp);
×
706

707
  code = taosAnalyGetOpts(pSupp->pOptions, &pHashMap);
×
UNCOV
708
  if (code != TSDB_CODE_SUCCESS) {
×
709
    return code;
×
710
  }
711

712
  if (taosHashGetSize(pHashMap) == 0) {
×
UNCOV
713
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
714
    qError("%s no valid options for forecast, failed to exec", id);
×
UNCOV
715
    return code;
×
716
  }
717

UNCOV
718
  if (taosHashGetSize(pHashMap) == 0) {
×
UNCOV
719
    code = TSDB_CODE_INVALID_PARA;
×
720
    qError("%s no valid options for forecast, failed to exec", id);
×
721
    return code;
×
722
  }
723

UNCOV
724
  code = taosAnalysisParseAlgo(pSupp->pOptions, pSupp->algoName, pSupp->algoUrl, ANALY_ALGO_TYPE_FORECAST,
×
725
                               tListLen(pSupp->algoUrl), pHashMap, id);
726
  TSDB_CHECK_CODE(code, lino, _end);
×
727

728
  code = filterNotSupportForecast(pSupp);
×
UNCOV
729
  if (code) {
×
UNCOV
730
    qError("%s not support forecast model, %s", id, pSupp->algoName);
×
731
    TSDB_CHECK_CODE(code, lino, _end);
×
732
  }
733

734
  // extract the timeout parameter
735
  pSupp->timeout = taosAnalysisParseTimout(pHashMap, id);
×
UNCOV
736
  pSupp->wncheck = taosAnalysisParseWncheck(pHashMap, id);
×
737

738
  // extract the forecast rows
739
  char* pRows = taosHashGet(pHashMap, ALGO_OPT_FORECASTROWS_NAME, strlen(ALGO_OPT_FORECASTROWS_NAME));
×
740
  if (pRows != NULL) {
×
741
    int64_t v = 0;
×
UNCOV
742
    code = toInteger(pRows, taosHashGetValueSize(pRows), 10, &v);
×
743

UNCOV
744
    pSupp->forecastRows = v;
×
745
    qDebug("%s forecast rows:%"PRId64, id, pSupp->forecastRows);
×
746
  } else {
747
    qDebug("%s forecast rows not found:%s, use default:%" PRId64, id, pSupp->pOptions, pSupp->forecastRows);
×
748
  }
749

UNCOV
750
  if (pSupp->forecastRows > ANALY_FORECAST_RES_MAX_ROWS) {
×
751
    qError("%s required too many forecast rows, max allowed:%d, required:%" PRId64, id, ANALY_FORECAST_RES_MAX_ROWS,
×
752
           pSupp->forecastRows);
753
    code = TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
×
UNCOV
754
    goto _end;
×
755
  }
756

UNCOV
757
  if (pSupp->forecastRows <= 0) {
×
UNCOV
758
    qError("%s output rows should be greater than 0, input:%" PRId64, id, pSupp->forecastRows);
×
759
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
760
    goto _end;
×
761
  }
762

763
  // extract the confidence interval value
764
  char* pConf = taosHashGet(pHashMap, ALGO_OPT_CONF_NAME, strlen(ALGO_OPT_CONF_NAME));
×
765
  if (pConf != NULL) {
×
766
    char*  endPtr = NULL;
×
767
    double v = taosStr2Double(pConf, &endPtr);
×
768
    pSupp->conf = v;
×
769

UNCOV
770
    if (v <= 0 || v > 1.0) {
×
UNCOV
771
      pSupp->conf = ANALY_FORECAST_DEFAULT_CONF;
×
UNCOV
772
      qWarn("%s valid conf range is (0, 1], user specified:%.2f out of range, set the default:%.2f", id, v,
×
773
             pSupp->conf);
774
    } else {
775
      qDebug("%s forecast conf:%.2f", id, pSupp->conf);
×
776
    }
777
  } else {
778
    qDebug("%s forecast conf not found:%s, use default:%.2f", id, pSupp->pOptions, pSupp->conf);
×
779
  }
780

781
  // extract the start timestamp
782
  char* pStart = taosHashGet(pHashMap, ALGO_OPT_START_NAME, strlen(ALGO_OPT_START_NAME));
×
783
  if (pStart != NULL) {
×
784
    int64_t v = 0;
×
785
    code = toInteger(pStart, taosHashGetValueSize(pStart), 10, &v);
×
UNCOV
786
    pSupp->startTs = v;
×
UNCOV
787
    pSupp->setStart = 1;
×
UNCOV
788
    qDebug("%s forecast set start ts:%"PRId64, id, pSupp->startTs);
×
789
  }
790

791
  // extract the time step
UNCOV
792
  char* pEvery = taosHashGet(pHashMap, ALGO_OPT_EVERY_NAME, strlen(ALGO_OPT_EVERY_NAME));
×
793
  if (pEvery != NULL) {
×
794
    int64_t v = 0;
×
795
    code = toInteger(pEvery, taosHashGetValueSize(pEvery), 10, &v);
×
796
    pSupp->every = v;
×
797
    pSupp->setEvery = 1;
×
UNCOV
798
    qDebug("%s forecast set every ts:%"PRId64, id, pSupp->every);
×
799
  }
800

801
  if (pSupp->setEvery && pSupp->every <= 0) {
×
802
    qError("%s period should be greater than 0, user specified:%"PRId64, id, pSupp->every);
×
UNCOV
803
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
804
    goto _end;
×
805
  }
806

807
  // extract the dynamic real feature for covariate forecasting
UNCOV
808
  void*       pIter = NULL;
×
UNCOV
809
  size_t      keyLen = 0;
×
810
  const char* p = "dynamic_real_";
×
811

UNCOV
812
  while ((pIter = taosHashIterate(pHashMap, pIter))) {
×
813
    const char* pVal = pIter;
×
814
    char*       pKey = taosHashGetKey((void*)pVal, &keyLen);
×
815
    int32_t     idx = 0;
×
816
    char        nameBuf[512] = {0};
×
817

818
    if (strncmp(pKey, p, strlen(p)) == 0) {
×
819

820
      if (strncmp(&pKey[keyLen - 4], "_col", 4) == 0) {
×
UNCOV
821
        continue;
×
822
      }
823

824
      int32_t ret = sscanf(pKey, "dynamic_real_%d", &idx);
×
825
      if (ret == 0) {
×
826
        continue;
×
827
      }
828

UNCOV
829
      memcpy(nameBuf, pKey, keyLen);
×
830
      strncpy(&nameBuf[keyLen], "_col", strlen("_col"));
×
831

832
      void* pCol = taosHashGet(pHashMap, nameBuf, strlen(nameBuf));
×
833
      if (pCol == NULL) {
×
834
        char* pTmp = taosStrndupi(pKey, keyLen);
×
835
        qError("%s dynamic real column related:%s column name:%s not specified", id, pTmp, nameBuf);
×
836
        
UNCOV
837
        taosMemoryFree(pTmp);
×
UNCOV
838
        code = TSDB_CODE_INVALID_PARA;
×
839
        goto _end;
×
840
      } else {
841
        // build dynamic_real_feature
842
        SColFutureData d = {.pName = taosStrndupi(pCol, taosHashGetValueSize(pCol))};
×
843
        if (d.pName == NULL) {
×
UNCOV
844
          qError("%s failed to clone the future dynamic real column name:%s", id, (char*) pCol);
×
UNCOV
845
          code = terrno;
×
846
          goto _end;
×
847
        }
848

849
        int32_t index = -1;
×
UNCOV
850
        for (int32_t i = 0; i < taosArrayGetSize(pSupp->pCovariateSlotList); ++i) {
×
851
          SColumn* pColx = taosArrayGet(pSupp->pCovariateSlotList, i);
×
852
          if (strcmp(pColx->name, d.pName) == 0) {
×
853
            index = i;
×
UNCOV
854
            break;
×
855
          }
856
        }
857

858
        if (index == -1) {
×
UNCOV
859
          qError("%s not found the required future dynamic real column:%s", id, d.pName);
×
860
          code = TSDB_CODE_INVALID_PARA;
×
UNCOV
861
          taosMemoryFree(d.pName);
×
862
          goto _end;
×
863
        }
864

865
        SColumn* pColx = taosArrayGet(pSupp->pCovariateSlotList, index);
×
UNCOV
866
        d.data.info.slotId = pColx->slotId;
×
UNCOV
867
        d.data.info.type = pColx->type;
×
868
        d.data.info.bytes = pColx->bytes;
×
869

870
        int32_t len = taosHashGetValueSize((void*)pVal);
×
871
        char*   buf = taosStrndupi(pVal, len);
×
872
        int32_t unused = strdequote((char*)buf);
×
873

UNCOV
874
        int32_t num = 0;
×
UNCOV
875
        char**  pList = strsplit(buf, " ", &num);
×
876
        if (num != pSupp->forecastRows) {
×
877
          qError("%s the rows:%d of future dynamic real column data is not equalled to the forecasting rows:%" PRId64,
×
878
                 id, num, pSupp->forecastRows);
879
          code = TSDB_CODE_INVALID_PARA;
×
880

881
          taosMemoryFree(d.pName);
×
UNCOV
882
          taosMemoryFree(pList);
×
UNCOV
883
          taosMemoryFree(buf);
×
884
          goto _end;
×
885
        }
886

887
        d.numOfRows = num;
×
888

889
        code = colInfoDataEnsureCapacity(&d.data, num, true);
×
890
        if (code != 0) {
×
UNCOV
891
          qError("%s failed to prepare buffer, code:%s", id, tstrerror(code));
×
892
          goto _end;
×
893
        }
894

895
        for (int32_t j = 0; j < num; ++j) {
×
UNCOV
896
          char* ps = NULL;
×
897
          if (j == 0) {
×
898
            ps = strstr(pList[j], "[") + 1;
×
899
          } else {
900
            ps = pList[j];
×
901
          }
902

903
          code = 0;
×
904

905
          switch(pColx->type) {
×
UNCOV
906
            case TSDB_DATA_TYPE_TINYINT: {
×
907
              int8_t t1 = taosStr2Int8(ps, NULL, 10);
×
908
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
909
              break;
×
910
            }
UNCOV
911
            case TSDB_DATA_TYPE_SMALLINT: {
×
912
              int16_t t1 = taosStr2Int16(ps, NULL, 10);
×
913
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
914
              break;
×
915
            }
UNCOV
916
            case TSDB_DATA_TYPE_INT: {
×
917
              int32_t t1 = taosStr2Int32(ps, NULL, 10);
×
918
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
919
              break;
×
920
            }
UNCOV
921
            case TSDB_DATA_TYPE_BIGINT: {
×
922
              int64_t t1 = taosStr2Int64(ps, NULL, 10);
×
923
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
924
              break;
×
925
            }
UNCOV
926
            case TSDB_DATA_TYPE_FLOAT: {
×
927
              float t1 = taosStr2Float(ps, NULL);
×
928
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
929
              break;
×
930
            }
UNCOV
931
            case TSDB_DATA_TYPE_DOUBLE: {
×
932
              double t1 = taosStr2Double(ps, NULL);
×
933
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
934
              break;
×
935
            }
UNCOV
936
            case TSDB_DATA_TYPE_UTINYINT: {
×
UNCOV
937
              uint8_t t1 = taosStr2UInt8(ps, NULL, 10);
×
UNCOV
938
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
939
              break;
×
940
            }
UNCOV
941
            case TSDB_DATA_TYPE_USMALLINT: {
×
UNCOV
942
              uint16_t t1 = taosStr2UInt16(ps, NULL, 10);
×
UNCOV
943
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
944
              break;
×
945
            }
946
            case TSDB_DATA_TYPE_UINT: {
×
947
              uint32_t t1 = taosStr2UInt32(ps, NULL, 10);
×
UNCOV
948
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
UNCOV
949
              break;
×
950
            }
951
            case TSDB_DATA_TYPE_UBIGINT: {
×
952
              uint64_t t1 = taosStr2UInt64(ps, NULL, 10);
×
953
              code = colDataSetVal(&d.data, j, (const char*)&t1, false);
×
954
              break;
×
955
            }
956
          }
957

UNCOV
958
          if (code != 0) {
×
UNCOV
959
            break;
×
960
          }
961
        }
962

UNCOV
963
        taosMemoryFree(pList);
×
UNCOV
964
        taosMemoryFree(buf);
×
965
        if (code != 0) {
×
966
          goto _end;
×
967
        }
968

UNCOV
969
        void* noret = taosArrayPush(pSupp->pDynamicRealList, &d);
×
970
        if (noret == NULL) {
×
971
          qError("%s failed to add column info in dynamic real column info", id);
×
UNCOV
972
          code = terrno;
×
973
          goto _end;
×
974
        }
975
      }
976
    }
977
  }
978

979
_end:
×
UNCOV
980
  taosHashCleanup(pHashMap);
×
981
  return code;
×
982
}
983

984
/*
 * Open the analytic buffer of the forecast context and write the column meta data into it.
 *
 * Columns are written in this order: "ts", "val", then one "dynamic_real_<n>" column for every future
 * covariate that matches an entry in pSupp->pCovariateSlotList (the matched entry is moved out of
 * that list), and finally one "past_dynamic_real_<n>" column for every covariate left in the list.
 * Afterwards the data section and each input column are started.
 *
 * On failure the buffer is closed and destroyed and the error code is returned.
 */
static int32_t forecastCreateBuf(SForecastSupp* pSupp, const char* pId) {
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int64_t       ts = taosGetTimestampNs();
  int32_t       index = 0;
  int32_t       code = 0;

  pBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
  snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-forecast-%p-%" PRId64, tsTempDir, pSupp, ts);

  // timestamp + target value + all covariate columns
  int32_t numOfCols = taosArrayGetSize(pSupp->pCovariateSlotList) + 2;

  code = tsosAnalyBufOpen(pBuf, numOfCols, pId);
  if (code != 0) {
    goto _OVER;
  }

  code = taosAnalyBufWriteColMeta(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, "ts");
  if (code != 0) {
    goto _OVER;
  }

  code = taosAnalyBufWriteColMeta(pBuf, index++, pSupp->targetValType, "val");
  if (code != 0) {
    goto _OVER;
  }

  int32_t numFuture = taosArrayGetSize(pSupp->pDynamicRealList);
  int32_t numPast = taosArrayGetSize(pSupp->pCovariateSlotList);

  if (numPast >= numFuture) {
    for (int32_t i = 0; i < numFuture; ++i) {
      SColFutureData* pFuture = taosArrayGet(pSupp->pDynamicRealList, i);

      // locate the covariate column with the same name as the future column
      int32_t matched = -1;
      int32_t numSlots = taosArrayGetSize(pSupp->pCovariateSlotList);
      for (int32_t k = 0; k < numSlots; ++k) {
        SColumn* pCandidate = taosArrayGet(pSupp->pCovariateSlotList, k);
        if (strcmp(pCandidate->name, pFuture->pName) == 0) {
          matched = k;
          break;
        }
      }

      if (matched < 0) {
        continue;
      }

      SColumn* pCol = taosArrayGet(pSupp->pCovariateSlotList, matched);

      char name[128] = {0};
      (void) tsnprintf(name, tListLen(name), "dynamic_real_%d", i + 1);

      code = taosAnalyBufWriteColMeta(pBuf, index++, pCol->type, name);
      if (code != 0) {
        goto _OVER;
      }

      // keep a copy of the column info, then drop it from the covariate list
      memcpy(&pFuture->col, pCol, sizeof(SColumn));
      taosArrayRemove(pSupp->pCovariateSlotList, matched);
    }

    // whatever is left in the list is written as past covariate column
    numPast = taosArrayGetSize(pSupp->pCovariateSlotList);
    for (int32_t j = 0; j < numPast; ++j) {
      SColumn* pCol = taosArrayGet(pSupp->pCovariateSlotList, j);

      char name[128] = {0};
      (void)tsnprintf(name, tListLen(name), "past_dynamic_real_%d", j + 1);

      code = taosAnalyBufWriteColMeta(pBuf, index++, pCol->type, name);
      if (code) {
        goto _OVER;
      }
    }
  }

  code = taosAnalyBufWriteDataBegin(pBuf);
  if (code != 0) {
    goto _OVER;
  }

  for (int32_t i = 0; i < pSupp->numOfInputCols; ++i) {
    code = taosAnalyBufWriteColBegin(pBuf, i);
    if (code != 0) {
      goto _OVER;
    }
  }

_OVER:
  if (code != 0) {
    (void)taosAnalyBufClose(pBuf);
    taosAnalyBufDestroy(pBuf);
  }
  return code;
}
1056

1057
/*
 * Reset callback of the forecast operator (registered through setOperatorResetStateFn).
 *
 * Marks the operator as not opened, drops the result rows, the covariate slot list, the analytic
 * buffer and both expression supports, then rebuilds them from the physical plan node in the same
 * order used when the operator is created.
 *
 * Returns 0 on success, otherwise the code of the first step that failed (also logged).
 */
static int32_t resetForecastOperState(SOperatorInfo* pOper) {
  int32_t                 code = 0;
  int32_t                 lino = 0;
  SForecastOperatorInfo*  pInfo = pOper->info;
  SForecastSupp*          pSupp = &pInfo->forecastSupp;
  SExecTaskInfo*          pTaskInfo = pOper->pTaskInfo;
  const char*             pId = pTaskInfo->id.str;
  SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pOper->pPhyNode;
  SExprInfo*              pExprInfo = NULL;
  int32_t                 numOfExprs = 0;

  pOper->status = OP_NOT_OPENED;

  // release the state left over from the previous run
  blockDataCleanup(pInfo->pRes);

  taosArrayDestroy(pSupp->pCovariateSlotList);
  pSupp->pCovariateSlotList = NULL;

  taosAnalyBufDestroy(&pSupp->analyBuf);

  cleanupExprSupp(&pOper->exprSupp);
  cleanupExprSupp(&pInfo->scalarSup);

  // rebuild the expression support and the filter
  TAOS_CHECK_EXIT(createExprInfo(pForecastPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs));

  TAOS_CHECK_EXIT(initExprSupp(&pOper->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore));

  TAOS_CHECK_EXIT(filterInitFromNode((SNode*)pForecastPhyNode->node.pConditions, &pOper->exprSupp.pFilterInfo, 0,
                                     pTaskInfo->pStreamRuntimeInfo));

  // parse input/output/options again and re-create the analytic buffer
  TAOS_CHECK_EXIT(forecastParseInput(pSupp, pForecastPhyNode->pFuncs, pId));

  TAOS_CHECK_EXIT(forecastParseOutput(pSupp, &pOper->exprSupp));

  TAOS_CHECK_EXIT(forecastParseOpt(pSupp, pId));
  TAOS_CHECK_EXIT(forecastCreateBuf(pSupp, pId));

  if (pForecastPhyNode->pExprs != NULL) {
    SExprInfo* pScalarExprInfo = NULL;
    int32_t    numOfScalarExprs = 0;

    TAOS_CHECK_EXIT(createExprInfo(pForecastPhyNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExprs));
    TAOS_CHECK_EXIT(
        initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExprs, &pTaskInfo->storageAPI.functionStore));
  }

  initResultSizeInfo(&pOper->resultInfo, 4096);

_exit:
  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1110

1111

1112

1113

1114
/*
 * Create the forecast operator.
 *
 * downstream  the single upstream operator that feeds this operator
 * pPhyNode    physical plan node, used as SForecastFuncPhysiNode
 * pTaskInfo   owning task; its code field is set to the error code when the creation fails
 * pOptrInfo   [out] receives the new operator on success
 *
 * Returns TSDB_CODE_SUCCESS, or an error code. On failure the partially built operator info, the
 * operator and the downstream operator are destroyed.
 */
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t                code = 0;
  int32_t                lino = 0;
  // pId is printed in the _error path, so it has to be initialized before the first goto _error
  const char*            pId = pTaskInfo->id.str;
  SForecastOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SForecastOperatorInfo));
  SOperatorInfo*         pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pOperator == NULL || pInfo == NULL) {
    code = terrno;
    goto _error;
  }

  pOperator->pPhyNode = pPhyNode;

  SForecastSupp*          pSupp = &pInfo->forecastSupp;
  SForecastFuncPhysiNode* pForecastPhyNode = (SForecastFuncPhysiNode*)pPhyNode;
  SExprSupp*              pExprSup = &pOperator->exprSupp;
  int32_t                 numOfExprs = 0;
  SExprInfo*              pExprInfo = NULL;

  code = createExprInfo(pForecastPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
  QUERY_CHECK_CODE(code, lino, _error);

  code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
  QUERY_CHECK_CODE(code, lino, _error);

  if (pForecastPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pScalarExprInfo = NULL;
    code = createExprInfo(pForecastPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
    QUERY_CHECK_CODE(code, lino, _error);

    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
    QUERY_CHECK_CODE(code, lino, _error);
  }

  code = filterInitFromNode((SNode*)pForecastPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
                            pTaskInfo->pStreamRuntimeInfo);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseInput(pSupp, pForecastPhyNode->pFuncs, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseOutput(pSupp, pExprSup);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastParseOpt(pSupp, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  code = forecastCreateBuf(pSupp, pId);
  QUERY_CHECK_CODE(code, lino, _error);

  initResultSizeInfo(&pOperator->resultInfo, 4096);

  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);

  setOperatorInfo(pOperator, "ForecastOperator", QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, forecastNext, NULL, destroyForecastInfo, optrDefaultBufFn,
                                         NULL, optrDefaultGetNextExtFn, NULL);

  setOperatorResetStateFn(pOperator, resetForecastOperState);

  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _error);

  code = appendDownstream(pOperator, &downstream, 1);
  QUERY_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;

  qDebug("%s forecast env is initialized, option:%s", pId, pSupp->pOptions);
  return TSDB_CODE_SUCCESS;

_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pId, __func__, lino, tstrerror(code));
  }
  if (pInfo != NULL) destroyForecastInfo(pInfo);
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
1200

UNCOV
1201
static void destroyColFutureData(void* p) {
×
1202
  SColFutureData* pData = p;
×
1203
  taosMemoryFree(pData->pName);
×
1204
  colDataDestroy(&pData->data);
×
1205
}
×
1206

UNCOV
1207
static void destroyForecastInfo(void* param) {
×
UNCOV
1208
  SForecastOperatorInfo* pInfo = (SForecastOperatorInfo*)param;
×
1209

UNCOV
1210
  blockDataDestroy(pInfo->pRes);
×
UNCOV
1211
  pInfo->pRes = NULL;
×
1212

UNCOV
1213
  taosArrayDestroy(pInfo->forecastSupp.pCovariateSlotList);
×
UNCOV
1214
  pInfo->forecastSupp.pCovariateSlotList = NULL;
×
1215

UNCOV
1216
  taosArrayDestroyEx(pInfo->forecastSupp.pDynamicRealList, destroyColFutureData);
×
UNCOV
1217
  pInfo->forecastSupp.pDynamicRealList = NULL;
×
1218

UNCOV
1219
  taosMemoryFree(pInfo->forecastSupp.pOptions);
×
1220

UNCOV
1221
  cleanupExprSupp(&pInfo->scalarSup);
×
UNCOV
1222
  taosAnalyBufDestroy(&pInfo->forecastSupp.analyBuf);
×
UNCOV
1223
  taosMemoryFreeClear(param);
×
UNCOV
1224
}
×
1225

1226
#else
1227

1228
/*
 * Stub used when the build does not define USE_ANALYTICS: no operator is created and
 * TSDB_CODE_OPS_NOT_SUPPORT is always returned. None of the parameters is used.
 */
int32_t createForecastOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
                                   SOperatorInfo** pOptrInfo) {
  (void)downstream;
  (void)pPhyNode;
  (void)pTaskInfo;
  (void)pOptrInfo;

  int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
  return code;
}
1232

1233
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc