• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4869

26 Nov 2025 05:46AM UTC coverage: 64.539% (-0.09%) from 64.629%
#4869

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

771 of 945 new or added lines in 33 files covered. (81.59%)

3214 existing lines in 124 files now uncovered.

158203 of 245129 relevant lines covered (64.54%)

113224023.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/imputationoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "osMemPool.h"
21
#include "osMemory.h"
22
#include "querytask.h"
23
#include "tanalytics.h"
24
#include "taoserror.h"
25
#include "tcommon.h"
26
#include "tdatablock.h"
27
#include "tdef.h"
28
#include "thash.h"
29
#include "tjson.h"
30
#include "tmsg.h"
31
#include "types.h"
32

33
#ifdef USE_ANALYTICS
34

35
#define FREQ_STR     "freq"
36
#define RADIUS_STR   "radius"
37
#define LAGSTART_STR "lag_start"
38
#define LAGEND_STR   "lag_end"
39

40
typedef struct {
41
  int64_t      timeout;
42
  int8_t       wncheck;
43
  SAnalyticBuf analyBuf;
44
  int32_t      numOfRows;
45
  int32_t      numOfBlocks;
46
  int32_t      numOfCols;
47
  STimeWindow  win;
48
  uint64_t     groupId;
49
  SArray      *pBlocks;    // SSDataBlock*
50
} SBaseSupp;
51

52
typedef struct {
53
  SBaseSupp    base;
54
  int32_t      targetSlot;
55
  int32_t      targetType;
56
  int32_t      tsSlot;
57
  int32_t      tsPrecision;
58
  int32_t      resTsSlot;
59
  int32_t      resMarkSlot;
60
  char         freq[64];         // frequency of data
61
} SImputationSupp;
62

63
typedef struct {
64
  int32_t   radius;
65
  int32_t   lagStart;
66
  int32_t   lagEnd;
67
  SBaseSupp base;
68

69
  int32_t targetSlot1;
70
  int32_t targetSlot2;
71

72
  int32_t targetSlot1Type;
73
  int32_t targetSlot2Type;
74
} SCorrelationSupp;
75

76
typedef struct {
77
  SOptrBasicInfo   binfo;
78
  SExprSupp        scalarSup;
79
  char             algoName[TSDB_ANALYTIC_ALGO_NAME_LEN];
80
  char             algoUrl[TSDB_ANALYTIC_ALGO_URL_LEN];
81
  char*            options;
82
  SColumn          targetCol;
83
  int32_t          resTargetSlot;
84
  int32_t          analysisType;
85
  SImputationSupp  imputatSup;
86
  SCorrelationSupp corrSupp;
87
} SAnalysisOperatorInfo;
88

89
static void    analysisDestroyOperatorInfo(void* param);
90
static int32_t imputationNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
91
static int32_t doAnalysis(SAnalysisOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo);
92
static int32_t doCacheBlock(SAnalysisOperatorInfo* pInfo, SSDataBlock* pBlock, const char* id);
93
static int32_t doParseInputForImputation(SAnalysisOperatorInfo* pInfo, SImputationSupp* pSupp, SNodeList* pFuncs, const char* id);
94
static int32_t doParseInputForDtw(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id);
95
static int32_t doParseInputForTlcc(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id);
96
static int32_t doSetResSlot(SAnalysisOperatorInfo* pInfo, SExprSupp* pExprSup);
97
static int32_t doParseOption(SAnalysisOperatorInfo* pInfo, const char* id);
98
static int32_t doCreateBuf(SAnalysisOperatorInfo* pInfo, const char* pId);
99
static int32_t estResultRowsAfterImputation(int32_t rows, int64_t skey, int64_t ekey, int32_t prec, const char* pFreq, const char* id);
100
static void    doInitImputOptions(SImputationSupp* pSupp);
101
static void    doInitDtwOptions(SCorrelationSupp* pSupp);
102
static int32_t parseFreq(SImputationSupp* pSupp, SHashObj* pHashMap, const char* id);
103
static void    parseRadius(SCorrelationSupp* pSupp, SHashObj* pHashMap, const char* id);
104

105
int32_t createGenericAnalysisOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
×
106
                                          SOperatorInfo** pOptrInfo) {
107
  QRY_PARAM_CHECK(pOptrInfo);
×
108

109
  int32_t                    code = TSDB_CODE_SUCCESS;
×
110
  int32_t                    lino = 0;
×
111
  size_t                     keyBufSize = 0;
×
112
  int32_t                    num = 0;
×
113
  SExprInfo*                 pExprInfo = NULL;
×
114
  int32_t                    numOfExprs = 0;
×
115
  const char*                id = GET_TASKID(pTaskInfo);
×
116
  SGenericAnalysisPhysiNode* pAnalysisNode = (SGenericAnalysisPhysiNode*)physiNode;
×
117
  SExprSupp*                 pExprSup = NULL;
×
118

119
  SAnalysisOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAnalysisOperatorInfo));
×
120
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
121
  if (pOperator == NULL || pInfo == NULL) {
×
122
    code = terrno;
×
123
    goto _error;
×
124
  }
125

126
  pAnalysisNode = (SGenericAnalysisPhysiNode*)physiNode;
×
127
  pExprSup = &pOperator->exprSupp;
×
128

129
  code = createExprInfo(pAnalysisNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
×
130
  QUERY_CHECK_CODE(code, lino, _error);
×
131

132
  code = initExprSupp(pExprSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
×
133
  QUERY_CHECK_CODE(code, lino, _error);
×
134

135
  if (pAnalysisNode->pExprs != NULL) {
×
136
    SExprInfo* pScalarExprInfo = NULL;
×
137
    code = createExprInfo(pAnalysisNode->pExprs, NULL, &pScalarExprInfo, &num);
×
138
    QUERY_CHECK_CODE(code, lino, _error);
×
139

140
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
×
141
    QUERY_CHECK_CODE(code, lino, _error);
×
142
  }
143

144
  code = filterInitFromNode((SNode*)pAnalysisNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0, pTaskInfo->pStreamRuntimeInfo);
×
145
  QUERY_CHECK_CODE(code, lino, _error);
×
146

147
  for (int32_t i = 0; i < numOfExprs; ++i) {
×
148
    int32_t type = pExprInfo[i].pExpr->_function.functionType;
×
149
    if (type == FUNCTION_TYPE_IMPUTATION || type == FUNCTION_TYPE_DTW || type == FUNCTION_TYPE_DTW_PATH ||
×
150
        type == FUNCTION_TYPE_TLCC) {
151
      pInfo->analysisType = type;
×
152
      break;
×
153
    }
154
  }
155

156
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
157
    doInitImputOptions(&pInfo->imputatSup);
×
158
    code = doParseInputForImputation(pInfo, &pInfo->imputatSup, pAnalysisNode->pFuncs, id);
×
159
    QUERY_CHECK_CODE(code, lino, _error);
×
160
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW || pInfo->analysisType == FUNCTION_TYPE_DTW_PATH) {
×
161
    doInitDtwOptions(&pInfo->corrSupp);
×
162
    code = doParseInputForDtw(pInfo, &pInfo->corrSupp, pAnalysisNode->pFuncs, id);
×
163
    QUERY_CHECK_CODE(code, lino, _error);
×
164
  } else if (pInfo->analysisType == FUNCTION_TYPE_TLCC) {
×
165
    doInitDtwOptions(&pInfo->corrSupp);
×
166
    code = doParseInputForTlcc(pInfo, &pInfo->corrSupp, pAnalysisNode->pFuncs, id);
×
167
    QUERY_CHECK_CODE(code, lino, _error);
×
168
  } else {
169
    code = TSDB_CODE_INVALID_PARA;
×
170
    goto _error;
×
171
  }
172

173
  code = doParseOption(pInfo, id);
×
174
  QUERY_CHECK_CODE(code, lino, _error);
×
175

176
  code = doSetResSlot(pInfo, pExprSup);
×
177
  QUERY_CHECK_CODE(code, lino, _error);
×
178

179
  code = doCreateBuf(pInfo, id);
×
180
  QUERY_CHECK_CODE(code, lino, _error);
×
181

182
  initResultSizeInfo(&pOperator->resultInfo, 4096);
×
183

184
  pInfo->binfo.pRes = createDataBlockFromDescNode(physiNode->pOutputDataBlockDesc);
×
185
  QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
×
186

187
  setOperatorInfo(pOperator, "GenericAnalyOptr", QUERY_NODE_PHYSICAL_PLAN_ANALYSIS_FUNC, false, OP_NOT_OPENED,
×
188
                  pInfo, pTaskInfo);
189
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, imputationNext, NULL, analysisDestroyOperatorInfo,
×
190
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
191

192
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
×
193
  QUERY_CHECK_CODE(code, lino, _error);
×
194

195
  code = appendDownstream(pOperator, &downstream, 1);
×
196
  QUERY_CHECK_CODE(code, lino, _error);
×
197

198
  *pOptrInfo = pOperator;
×
199

200
  qDebug("%s forecast env is initialized, option:%s", id, pInfo->options);
×
201
  return TSDB_CODE_SUCCESS;
×
202

203
_error:
×
204
  if (code != TSDB_CODE_SUCCESS) {
×
205
    qError("%s %s failed at line %d since %s", id, __func__, lino, tstrerror(code));
×
206
  }
207

208
  if (pInfo != NULL) analysisDestroyOperatorInfo(pInfo);
×
209
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
210
  pTaskInfo->code = code;
×
211
  return code;
×
212
}
213

214
static int32_t getBaseSupp(SAnalysisOperatorInfo* pInfo, SBaseSupp** ppSupp, const char* pId) {
×
215
  int32_t code = 0;
×
216
  *ppSupp = NULL;
×
217

218
  switch (pInfo->analysisType) {
×
219
    case FUNCTION_TYPE_IMPUTATION:
×
220
      *ppSupp = &pInfo->imputatSup.base;
×
221
      break;
×
222
    case FUNCTION_TYPE_DTW:
×
223
    case FUNCTION_TYPE_DTW_PATH:
224
    case FUNCTION_TYPE_TLCC:
225
      *ppSupp = &pInfo->corrSupp.base;
×
226
      break;
×
227
    default:
×
228
      // Handle error: unknown analysis type
229
      code = TSDB_CODE_INVALID_PARA;
×
230
      qError("%s unknown analysis type: %d", pId, pInfo->analysisType);
×
231
  }
232

233
  return code;
×
234
}
235

236
static int32_t imputationNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
237
  int32_t                code = TSDB_CODE_SUCCESS;
×
238
  int32_t                lino = 0;
×
239
  SAnalysisOperatorInfo* pInfo = pOperator->info;
×
240
  SExecTaskInfo*         pTaskInfo = pOperator->pTaskInfo;
×
241
  SOptrBasicInfo*        pBInfo = &pInfo->binfo;
×
242
  SSDataBlock*           pRes = pInfo->binfo.pRes;
×
243
  int64_t                st = taosGetTimestampUs();
×
244
  const char*            idstr = GET_TASKID(pTaskInfo);
×
245
  SBaseSupp*             pSupp = NULL;
×
246

247
  code = getBaseSupp(pInfo, &pSupp, idstr);
×
248
  QUERY_CHECK_CODE(code, lino, _end);
×
249

250
  int32_t numOfBlocks = taosArrayGetSize(pSupp->pBlocks);
×
251
  blockDataCleanup(pRes);
×
252

253
  while (1) {
×
254
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
×
255
    if (pBlock == NULL) {
×
256
      break;
×
257
    }
258

259
    if (pSupp->groupId == 0 || pSupp->groupId == pBlock->info.id.groupId) {
×
260
      pSupp->groupId = pBlock->info.id.groupId;
×
261
      numOfBlocks++;
×
262
      code = doCacheBlock(pInfo, pBlock, idstr);
×
263

264
      qDebug("group:%" PRId64 ", blocks:%d, rows:%" PRId64 ", total rows:%d", pSupp->groupId, numOfBlocks,
×
265
             pBlock->info.rows, pSupp->numOfRows);
266
      QUERY_CHECK_CODE(code, lino, _end);
×
267
    } else {
268
      qDebug("group:%" PRId64 ", read completed for new group coming, blocks:%d", pSupp->groupId, numOfBlocks);
×
269
      code = doAnalysis(pInfo, pTaskInfo);
×
270
      QUERY_CHECK_CODE(code, lino, _end);
×
271

272
      pSupp->groupId = pBlock->info.id.groupId;
×
273
      numOfBlocks = 1;
×
274
      qDebug("group:%" PRId64 ", new group, rows:%" PRId64 ", total rows:%d", pSupp->groupId, pBlock->info.rows,
×
275
             pSupp->numOfRows);
276
      code = doCacheBlock(pInfo, pBlock, idstr);
×
277
      QUERY_CHECK_CODE(code, lino, _end);
×
278
    }
279

280
    if (pRes->info.rows > 0) {
×
281
      (*ppRes) = pRes;
×
282
      qDebug("group:%" PRId64 ", return to upstream, blocks:%d", pRes->info.id.groupId, numOfBlocks);
×
283
      return code;
×
284
    }
285
  }
286

287
  if (numOfBlocks > 0) {
×
288
    qDebug("group:%" PRId64 ", read finish, blocks:%d", pSupp->groupId, numOfBlocks);
×
289
    code = doAnalysis(pInfo, pTaskInfo);
×
290
  }
291

292
  int64_t cost = taosGetTimestampUs() - st;
×
293
  qDebug("%s all groups finished, cost:%" PRId64 "us", idstr, cost);
×
294

295
_end:
×
296
  if (code != TSDB_CODE_SUCCESS) {
×
297
    qError("%s %s failed at line %d since %s", idstr, __func__, lino, tstrerror(code));
×
298
    pTaskInfo->code = code;
×
299
    T_LONG_JMP(pTaskInfo->env, code);
×
300
  }
301

302
  (*ppRes) = (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes;
×
303
  return code;
×
304
}
305

306
static void analysisDestroyOperatorInfo(void* param) {
×
307
  SAnalysisOperatorInfo* pInfo = (SAnalysisOperatorInfo*)param;
×
308
  if (pInfo == NULL) return;
×
309

310
  cleanupBasicInfo(&pInfo->binfo);
×
311
  cleanupExprSupp(&pInfo->scalarSup);
×
312

313
  SArray* pBlocks = NULL;
×
314
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
315
    pBlocks = pInfo->imputatSup.base.pBlocks;
×
316
  } else {
317
    pBlocks = pInfo->corrSupp.base.pBlocks;
×
318
  }
319

320
  for (int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) {
×
321
    SSDataBlock* pBlock = taosArrayGetP(pBlocks, i);
×
322
    blockDataDestroy(pBlock);
×
323
  }
324

325
  taosArrayDestroy(pBlocks);
×
326
  taosMemoryFreeClear(param);
×
327
}
328

329
static int32_t doCacheBlockForImputation(SImputationSupp* pSupp, const char* id, SSDataBlock* pBlock) {
×
330
  SAnalyticBuf* pBuf = &pSupp->base.analyBuf;
×
331
  int32_t code = 0;
×
332

333
  if (pSupp->base.numOfRows > ANALY_IMPUTATION_INPUT_MAX_ROWS) {
×
334
    qError("%s too many rows for imputation, maximum allowed:%d, input:%d", id, ANALY_IMPUTATION_INPUT_MAX_ROWS,
×
335
           pSupp->base.numOfRows);
336
    return TSDB_CODE_ANA_ANODE_TOO_MANY_ROWS;
×
337
  }
338

339
  pSupp->base.numOfBlocks++;
×
340
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->base.numOfBlocks, pBlock, pBlock->info.rows);
×
341

342
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
×
343
    SColumnInfoData* pValCol = taosArrayGet(pBlock->pDataBlock, pSupp->targetSlot);
×
344
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSupp->tsSlot);
×
345
    if (pTsCol == NULL || pValCol == NULL) {
×
346
      break;
347
    }
348

349
    int32_t index = 0;
×
350

351
    int64_t ts = ((TSKEY*)pTsCol->pData)[j];
×
352
    char*   val = colDataGetData(pValCol, j);
×
353
    int16_t valType = pValCol->info.type;
×
354

355
    pSupp->base.win.skey = MIN(pSupp->base.win.skey, ts);
×
356
    pSupp->base.win.ekey = MAX(pSupp->base.win.ekey, ts);
×
357

358
    pSupp->base.numOfRows++;
×
359

360
    // write the primary time stamp column data
361
    code = taosAnalyBufWriteColData(pBuf, index++, TSDB_DATA_TYPE_TIMESTAMP, &ts);
×
362
    if (TSDB_CODE_SUCCESS != code) {
×
363
      qError("%s failed to write ts in buf, code:%s", id, tstrerror(code));
×
364
      return code;
×
365
    }
366

367
    // write the main column for imputation
368
    code = taosAnalyBufWriteColData(pBuf, index++, valType, val);
×
369
    if (TSDB_CODE_SUCCESS != code) {
×
370
      qError("%s failed to write val in buf, code:%s", id, tstrerror(code));
×
371
      return code;
×
372
    }
373
  }
374

375
  return code;
×
376
}
377

378
int32_t doCacheBlockForDtw(SCorrelationSupp* pSupp, const char* id, SSDataBlock* pBlock) {
×
379
  SAnalyticBuf* pBuf = &pSupp->base.analyBuf;
×
380
  int32_t code = 0;
×
381

382
  pSupp->base.numOfBlocks++;
×
383
  qDebug("%s block:%d, %p rows:%" PRId64, id, pSupp->base.numOfBlocks, pBlock, pBlock->info.rows);
×
384

385
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
×
386
    SColumnInfoData* pCol1 = taosArrayGet(pBlock->pDataBlock, pSupp->targetSlot1);
×
387
    SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, pSupp->targetSlot2);
×
388
    if (pCol1 == NULL || pCol2 == NULL) {
×
389
      break;
390
    }
391

392
    int32_t index = 0;
×
393

394
    char*   val1 = colDataGetData(pCol1, j);
×
395
    char*   val2 = colDataGetData(pCol2, j);
×
396

397
    pSupp->base.numOfRows++;
×
398

399
    // write the primary time stamp column data
400
    code = taosAnalyBufWriteColData(pBuf, index++, pCol1->info.type, val1);
×
401
    if (TSDB_CODE_SUCCESS != code) {
×
402
      qError("%s failed to write col1 in buf, code:%s", id, tstrerror(code));
×
403
      return code;
×
404
    }
405

406
    // write the main column for imputation
407
    code = taosAnalyBufWriteColData(pBuf, index++, pCol2->info.type, val2);
×
408
    if (TSDB_CODE_SUCCESS != code) {
×
409
      qError("%s failed to write col2 in buf, code:%s", id, tstrerror(code));
×
410
      return code;
×
411
    }
412
  }
413

414
  return code;
×
415

416
}
417

418
static int32_t doCacheBlock(SAnalysisOperatorInfo* pInfo, SSDataBlock* pBlock, const char* id) {
×
419
  int32_t       code = TSDB_CODE_SUCCESS;
×
420
  int32_t       lino = 0;
×
421

422
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
423
    code = doCacheBlockForImputation(&pInfo->imputatSup, id, pBlock);
×
424
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW || pInfo->analysisType == FUNCTION_TYPE_DTW_PATH || pInfo->analysisType == FUNCTION_TYPE_TLCC) {
×
425
    code = doCacheBlockForDtw(&pInfo->corrSupp, id, pBlock);
×
426
  }
427

428
  return code;
×
429
}
430

431
static int32_t finishBuildRequest(SAnalysisOperatorInfo* pInfo, SBaseSupp* pSupp, const char* id) {
×
432
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
×
433
  int32_t       code = 0;
×
434

435
  // let's check existed rows for imputation
436
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
437
    if (pSupp->numOfRows < ANALY_IMPUTATION_INPUT_MIN_ROWS) {
×
438
      qError("%s input rows for imputation aren't enough, min required:%d, current:%d", id, ANALY_IMPUTATION_INPUT_MIN_ROWS,
×
439
             pSupp->numOfRows);
440
      return TSDB_CODE_ANA_ANODE_NOT_ENOUGH_ROWS;
×
441
    }
442

443
    code = estResultRowsAfterImputation(pSupp->numOfRows, pSupp->win.skey, pSupp->win.ekey, 
×
444
      pInfo->imputatSup.tsPrecision, pInfo->imputatSup.freq, id);
×
445
    if (code != 0) {
×
446
      return code;
×
447
    }
448
  }
449

450
  for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
×
451
    code = taosAnalyBufWriteColEnd(pBuf, i);
×
452
    if (code != 0) return code;
×
453
  }
454

455
  code = taosAnalyBufWriteDataEnd(pBuf);
×
456
  if (code != 0) return code;
×
457

458
  code = taosAnalyBufWriteOptStr(pBuf, "option", pInfo->options);
×
459
  if (code != 0) return code;
×
460

461
  code = taosAnalyBufWriteOptStr(pBuf, "algo", pInfo->algoName);
×
462
  if (code != 0) return code;
×
463

464
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
465
    const char* prec = TSDB_TIME_PRECISION_MILLI_STR;
×
466
    int32_t p = pInfo->imputatSup.tsPrecision;
×
467

468
    switch (p) {
×
469
      case TSDB_TIME_PRECISION_MICRO:
×
470
        prec = TSDB_TIME_PRECISION_MICRO_STR;
×
471
        break;
×
472
      case TSDB_TIME_PRECISION_NANO:
×
473
        prec = TSDB_TIME_PRECISION_NANO_STR;
×
474
        break;
×
475
      case TSDB_TIME_PRECISION_SECONDS:
×
476
        prec = "s";
×
477
        break;
×
478
      default:
×
479
        prec = TSDB_TIME_PRECISION_MILLI_STR;
×
480
    }
481

482
    code = taosAnalyBufWriteOptStr(pBuf, "freq", pInfo->imputatSup.freq);
×
483
    if (code != 0) return code;
×
484

485
    code = taosAnalyBufWriteOptStr(pBuf, "prec", prec);
×
486
    if (code != 0) return code;
×
487
  }
488

489
  code = taosAnalyBufWriteOptInt(pBuf, ALGO_OPT_WNCHECK_NAME, pSupp->wncheck);
×
490
  if (code != 0) return code;
×
491

492
  code = taosAnalyBufClose(pBuf);
×
493
  return code;
×
494
}
495

496
static int32_t buildDtwPathResult(SJson* pathJson, SColumnInfoData* pResTargetCol, const char* pId, char* buf,
×
497
                                  int32_t bufSize) {
498
  int32_t pair = tjsonGetArraySize(pathJson);
×
499
  if (pair != 2) {
×
500
    qError("%s invalid path data, should be array of pair", pId);
×
501
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
502
  }
503

504
  SJson* first = tjsonGetArrayItem(pathJson, 0);
×
505
  SJson* second = tjsonGetArrayItem(pathJson, 1);
×
506

507
  int64_t t1, t2;
×
508
  tjsonGetObjectValueBigInt(first, &t1);
×
509
  tjsonGetObjectValueBigInt(second, &t2);
×
510

511
  int32_t n = snprintf(varDataVal(buf), bufSize - VARSTR_HEADER_SIZE, "(%" PRId64 ", %" PRId64 ")", t1, t2);
×
512
  if (n > 0) {
×
513
    varDataSetLen(buf, n);
×
514
    return TSDB_CODE_SUCCESS;
×
515
  } else {
516
    return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
517
  }
518
}
519

520
static int32_t doAnalysisImpl(SAnalysisOperatorInfo* pInfo, SBaseSupp* pSupp, SSDataBlock* pBlock, const char* pId) {
×
521
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
×
522
  int32_t       resCurRow = pBlock->info.rows;
×
523
  int64_t       tmpI64 = 0;
×
524
  double        tmpDouble = 0;
×
525
  int32_t       code = 0;
×
526

527
  SColumnInfoData* pResTargetCol = taosArrayGet(pBlock->pDataBlock, pInfo->resTargetSlot);
×
528
  if (NULL == pResTargetCol) {
×
529
    return terrno;
×
530
  }
531

532
  SJson* pJson = taosAnalySendReqRetJson(pInfo->algoUrl, ANALYTICS_HTTP_TYPE_POST, pBuf, pSupp->timeout, pId);
×
533
  if (pJson == NULL) {
×
534
    return terrno;
×
535
  }
536

537
  int32_t rows = 0;
×
538
  tjsonGetInt32ValueFromDouble(pJson, "rows", rows, code);
×
539
  if (rows < 0 && code == 0) {
×
540
    code = parseErrorMsgFromAnalyticServer(pJson, pId);
×
541
    tjsonDelete(pJson);
×
542
    return code;
×
543
  }
544

545
  if (code < 0) {
×
546
    goto _OVER;
×
547
  }
548

549
  if (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION) {
×
550
    SJson* pTarget = tjsonGetObjectItem(pJson, "target");
×
551
    if (pTarget == NULL) goto _OVER;
×
552

553
    SJson* pTsList = tjsonGetObjectItem(pJson, "ts");
×
554
    if (pTsList == NULL) goto _OVER;
×
555

556
    SJson* pMask = tjsonGetObjectItem(pJson, "mask");
×
557
    if (pMask == NULL) goto _OVER;
×
558

559
    int32_t listLen = tjsonGetArraySize(pTarget);
×
560
    if (listLen != rows) {
×
561
      goto _OVER;
×
562
    }
563

564
    if (pInfo->imputatSup.resTsSlot != -1) {
×
565
      SColumnInfoData* pResTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->imputatSup.resTsSlot);
×
566
      if (pResTsCol != NULL) {
×
567
        for (int32_t i = 0; i < rows; ++i) {
×
568
          SJson* tsJson = tjsonGetArrayItem(pTsList, i);
×
569
          tjsonGetObjectValueBigInt(tsJson, &tmpI64);
×
570
          colDataSetInt64(pResTsCol, resCurRow, &tmpI64);
×
571
          resCurRow++;
×
572
        }
573
      }
574
    }
575

576
    resCurRow = pBlock->info.rows;
×
577
    if (pResTargetCol->info.type == TSDB_DATA_TYPE_DOUBLE) {
×
578
      for (int32_t i = 0; i < rows; ++i) {
×
579
        SJson* targetJson = tjsonGetArrayItem(pTarget, i);
×
580
        tjsonGetObjectValueDouble(targetJson, &tmpDouble);
×
581
        colDataSetDouble(pResTargetCol, resCurRow, &tmpDouble);
×
582
        resCurRow++;
×
583
      }
584
    } else if (pResTargetCol->info.type == TSDB_DATA_TYPE_INT) {
×
585
      for (int32_t i = 0; i < rows; ++i) {
×
586
        SJson* targetJson = tjsonGetArrayItem(pTarget, i);
×
587
        tjsonGetObjectValueDouble(targetJson, &tmpDouble);
×
588
        int32_t t = tmpDouble;
×
589
        colDataSetInt32(pResTargetCol, resCurRow, &t);
×
590
        resCurRow++;
×
591
      }
592
    } else if (pResTargetCol->info.type == TSDB_DATA_TYPE_FLOAT) {
×
593
      for (int32_t i = 0; i < rows; ++i) {
×
594
        SJson* targetJson = tjsonGetArrayItem(pTarget, i);
×
595
        tjsonGetObjectValueDouble(targetJson, &tmpDouble);
×
596
        float t = tmpDouble;
×
597
        colDataSetFloat(pResTargetCol, resCurRow, &t);
×
598
        resCurRow++;
×
599
      }
600
    }
601

602
    if (pInfo->imputatSup.resMarkSlot != -1) {
×
603
      SColumnInfoData* pResMarkCol = taosArrayGet(pBlock->pDataBlock, pInfo->imputatSup.resMarkSlot);
×
604
      if (pResMarkCol != NULL) {
×
605
        resCurRow = pBlock->info.rows;
×
606
        for (int32_t i = 0; i < rows; ++i) {
×
607
          SJson* markJson = tjsonGetArrayItem(pMask, i);
×
608
          tjsonGetObjectValueBigInt(markJson, &tmpI64);
×
609
          int32_t v = tmpI64;
×
610
          colDataSetInt32(pResMarkCol, resCurRow, &v);
×
611
          resCurRow++;
×
612
        }
613
      }
614
    }
615
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW) {
×
616
    // dtw result example:
617
    // {'option': 'algo=dtw,radius=1', 'rows': 4, 'distance': 1.6, 'path': [(0, 0), (1, 0), (2, 0), (3, 1), (3, 2), (3, 3)]}
618

619
    SJson* pTarget = tjsonGetObjectItem(pJson, "distance");
×
620
    if (pTarget == NULL) goto _OVER;
×
621

622
    tjsonGetObjectValueDouble(pTarget, &tmpDouble);
×
623
    colDataSetDouble(pResTargetCol, resCurRow, &tmpDouble);
×
624
    rows = 1;
×
625
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW_PATH) {
×
626
    // dtw path results are the same as above
627
    SJson* pPath = tjsonGetObjectItem(pJson, "path");
×
628
    if (pPath == NULL) goto _OVER;
×
629

630
    int32_t listLen = tjsonGetArraySize(pPath);
×
631
    if (listLen != rows) {
×
632
      goto _OVER;
×
633
    }
634

635
    for (int32_t i = 0; i < rows; ++i) {
×
636
      SJson* pathJson = tjsonGetArrayItem(pPath, i);
×
637

638
      char buf[128 + VARSTR_HEADER_SIZE] = {0};
×
639
      code = buildDtwPathResult(pathJson, pResTargetCol, pId, buf, sizeof(buf));
×
640
      if (code != 0) {
×
641
        qError("%s failed to build path result, code:%s", pId, tstrerror(code));
×
642
        goto _OVER;
×
643
      }
644

645
      code = colDataSetVal(pResTargetCol, resCurRow, buf, false);
×
646
      if (code != 0) {
×
647
        qError("%s failed to set path result to column, code:%s", pId, tstrerror(code));
×
648
        goto _OVER;
×
649
      }
650

651
      resCurRow++;
×
652
    }
653
  } else if (pInfo->analysisType == FUNCTION_TYPE_TLCC) {
×
654
    // tlcc result example:
655
    // {'option': 'algo=tlcc,lag_start=-1,lag_end=1', 'rows': 3, 'lags': [-1, 0, 1], 'ccf_vals': [-0.24, 0.9, -0.2]}
656
    SJson* pLags = tjsonGetObjectItem(pJson, "lags");
×
657
    if (pLags == NULL) goto _OVER;
×
658

659
    SJson* pCcfVals = tjsonGetObjectItem(pJson, "ccf_vals");
×
660
    if (pCcfVals == NULL) goto _OVER;
×
661

662
    for (int32_t i = 0; i < rows; ++i) {
×
663
      SJson* ccfValJson = tjsonGetArrayItem(pCcfVals, i);
×
664
      tjsonGetObjectValueDouble(ccfValJson, &tmpDouble);
×
665

666
      int64_t index = 0;
×
667
      SJson*  pLastIndexJson = tjsonGetArrayItem(pLags, i);
×
668
      tjsonGetObjectValueBigInt(pLastIndexJson, &index);
×
669

670
      char    buf[128 + VARSTR_HEADER_SIZE] = {0};
×
671
      int32_t n = snprintf(varDataVal(buf), tListLen(buf) - VARSTR_HEADER_SIZE, "(%" PRId64 ", %.4f)", index, tmpDouble);
×
672
      if (n > 0) {
×
673
        varDataSetLen(buf, n);
×
674
      } else {
675
        return TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
676
      }
677

678
      code = colDataSetVal(pResTargetCol, resCurRow, buf, false);
×
679
      resCurRow++;
×
680
    }
681

682
    tjsonGetObjectValueDouble(pLags, &tmpDouble);
×
683
    colDataSetDouble(pResTargetCol, resCurRow, &tmpDouble);
×
684
  }
685

686
  pBlock->info.rows += rows;
×
687

688
  if (pJson != NULL) tjsonDelete(pJson);
×
689
  return 0;
×
690

691
_OVER:
×
692
  tjsonDelete(pJson);
×
693
  if (code == 0) {
×
694
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
695
  }
696

697
  qError("%s failed to perform analysis finalize since %s", pId, tstrerror(code));
×
698
  return code;
×
699
}
700

701
static int32_t doAnalysis(SAnalysisOperatorInfo* pInfo, SExecTaskInfo* pTaskInfo) {
×
702
  int32_t                  code = TSDB_CODE_SUCCESS;
×
703
  int32_t                  lino = 0;
×
704
  char*                    id = GET_TASKID(pTaskInfo);
×
705
  SSDataBlock*             pRes = pInfo->binfo.pRes;
×
706

707
  SBaseSupp* pSupp = (pInfo->analysisType==FUNCTION_TYPE_IMPUTATION)? &pInfo->imputatSup.base:&pInfo->corrSupp.base;
×
708

709
  if (pSupp->numOfRows <= 0) {
×
710
    taosArrayClear(pSupp->pBlocks);
×
711
    pSupp->numOfRows = 0;
×
712
    return code;
×
713
  }
714

715
  qDebug("%s group:%" PRId64 ", do analysis, rows:%d", id, pSupp->groupId, pSupp->numOfRows);
×
716
  pRes->info.id.groupId = pSupp->groupId;
×
717

718
  code = finishBuildRequest(pInfo, pSupp, id);
×
719
  QUERY_CHECK_CODE(code, lino, _end);
×
720

721
  //   if (pBlock->info.rows < pBlock->info.capacity) {
722
  //   return TSDB_CODE_SUCCESS;
723
  // }
724

725
  // code = blockDataEnsureCapacity(pRes, newRowsNum);
726
  // if (code != TSDB_CODE_SUCCESS) {
727
  //   qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
728
  //   return code;
729
  // }
730

731
  // QUERY_CHECK_CODE(code, lino, _end);
732

733
  code = doAnalysisImpl(pInfo, pSupp, pRes, id);
×
734
  QUERY_CHECK_CODE(code, lino, _end);
×
735

736
  uInfo("%s block:%d, analysis finalize", id, pSupp->numOfBlocks);
×
737

738
_end:
×
739
  pSupp->numOfBlocks = 0;
×
740
  taosAnalyBufDestroy(&pSupp->analyBuf);
×
741
  return code;
×
742
}
743

744
/*
 * Extract the imputation inputs from the operator's function list.
 *
 * For the IMPUTATION function in pFuncs, this records the target column
 * slot/type and the timestamp slot/precision in pSupp. It also stores a
 * heap-allocated option string in pInfo->options:
 *   - 2 parameters (column, ts): the options default to "algo=moment";
 *   - otherwise (column, options, ts): the first value-node parameter is the
 *     option string and the first primary-timestamp column is the ts column.
 *
 * Returns:
 *   - TSDB_CODE_SUCCESS on success;
 *   - TSDB_CODE_INVALID_PARA when a parameter has an unexpected node type or
 *     the timestamp column is missing;
 *   - TSDB_CODE_ANA_INTERNAL_ERROR when the option string is missing or could
 *     not be cloned.
 */
static int32_t doParseInputForImputation(SAnalysisOperatorInfo* pInfo, SImputationSupp* pSupp, SNodeList* pFuncs, const char* id) {
  int32_t code = 0;
  SNode*  pNode = NULL;

  FOREACH(pNode, pFuncs) {
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);

      if (pFunc->funcType == FUNCTION_TYPE_IMPUTATION) {
        pSupp->base.numOfCols = 2;

        // the first parameter is always the column to be imputed
        SNode* pFirst = nodesListGetNode(pFunc->pParameterList, 0);
        if (pFirst == NULL || nodeType(pFirst) != QUERY_NODE_COLUMN) {
          qError("%s the first parameter of imputation must be a column", id);
          return TSDB_CODE_INVALID_PARA;
        }

        SColumnNode* pTargetNode = (SColumnNode*)pFirst;
        pSupp->targetSlot = pTargetNode->slotId;
        pSupp->targetType = pTargetNode->node.resType.type;

        if (numOfParam == 2) {
          // column, ts
          SNode* pTsParam = nodesListGetNode(pFunc->pParameterList, 1);
          if (pTsParam == NULL || nodeType(pTsParam) != QUERY_NODE_COLUMN) {
            qError("%s the timestamp parameter of imputation must be a column", id);
            return TSDB_CODE_INVALID_PARA;
          }

          SColumnNode* pTsNode = (SColumnNode*)pTsParam;
          pSupp->tsSlot = pTsNode->slotId;
          pSupp->tsPrecision = pTsNode->node.resType.precision;

          // let's set the moment as the default imputation algorithm
          pInfo->options = taosStrdup("algo=moment");
        } else {
          // column, options, ts
          bool assignTs = false;
          bool assignOpt = false;

          for (int32_t i = 0; i < numOfParam; ++i) {
            SNode* pParam = nodesListGetNode(pFunc->pParameterList, i);
            if (pParam == NULL) {
              continue;
            }

            if (nodeType(pParam) == QUERY_NODE_COLUMN) {
              SColumnNode* pColNode = (SColumnNode*)pParam;
              if (pColNode->isPrimTs && (!assignTs)) {
                pSupp->tsSlot = pColNode->slotId;
                pSupp->tsPrecision = pColNode->node.resType.precision;
                assignTs = true;
              }
            } else if (nodeType(pParam) == QUERY_NODE_VALUE) {
              if (!assignOpt) {
                SValueNode* pOptNode = (SValueNode*)pParam;
                pInfo->options = taosStrdup(pOptNode->literal);
                assignOpt = true;
              }
            }
          }

          if (!assignTs) {
            // without a timestamp column tsSlot stays unset and cannot be used later
            qError("%s timestamp column is missing, failed to do imputation", id);
            return TSDB_CODE_INVALID_PARA;
          }

          if (!assignOpt) {
            qError("%s option is missing, failed to do imputation", id);
            code = TSDB_CODE_ANA_INTERNAL_ERROR;
          }
        }
      }
    }
  }

  if (pInfo->options == NULL) {
    qError("%s option is missing or clone option string failed, failed to do imputation", id);
    code = TSDB_CODE_ANA_INTERNAL_ERROR;
  }

  return code;
}
822

823
/*
 * Extract the DTW / DTW_PATH inputs from the operator's function list.
 *
 * The function takes two columns, optionally followed by an option string:
 *   - 2 parameters (column1, column2): the options default to "algo=dtw,radius=1";
 *   - 3 parameters (column1, column2, options): the user options are stored
 *     with ",algo=dtw" appended.
 * Column slots and types are stored in pSupp. The heap-allocated option string
 * is stored in pInfo->options.
 *
 * Returns:
 *   - TSDB_CODE_SUCCESS on success;
 *   - TSDB_CODE_INVALID_PARA for a wrong parameter count or node type;
 *   - terrno when allocation fails;
 *   - TSDB_CODE_OUT_OF_MEMORY when formatting the options fails;
 *   - TSDB_CODE_ANA_INTERNAL_ERROR when no option string was produced.
 */
int32_t doParseInputForDtw(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id) {
  int32_t code = 0;
  SNode*  pNode = NULL;

  FOREACH(pNode, pFuncs) {
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);

      if (pFunc->funcType == FUNCTION_TYPE_DTW || pFunc->funcType == FUNCTION_TYPE_DTW_PATH) {
        pSupp->base.numOfCols = 2;

        if (numOfParam != 2 && numOfParam != 3) {
          qError("%s invalid number of parameters in dtw function:%d", id, numOfParam);
          return TSDB_CODE_INVALID_PARA;
        }

        // the first two parameters are the two series being compared
        SNode* pParam1 = nodesListGetNode(pFunc->pParameterList, 0);
        SNode* pParam2 = nodesListGetNode(pFunc->pParameterList, 1);
        if (pParam1 == NULL || pParam2 == NULL || nodeType(pParam1) != QUERY_NODE_COLUMN ||
            nodeType(pParam2) != QUERY_NODE_COLUMN) {
          qError("%s the first two parameters of dtw function must be columns", id);
          return TSDB_CODE_INVALID_PARA;
        }

        SColumnNode* pTargetNode1 = (SColumnNode*)pParam1;
        SColumnNode* pTargetNode2 = (SColumnNode*)pParam2;

        pSupp->targetSlot1 = pTargetNode1->slotId;
        pSupp->targetSlot1Type = pTargetNode1->node.resType.type;

        pSupp->targetSlot2 = pTargetNode2->slotId;
        pSupp->targetSlot2Type = pTargetNode2->node.resType.type;

        if (numOfParam == 2) {
          // no options given: let's set the default radius to be 1
          pInfo->options = taosStrdup("algo=dtw,radius=1");
        } else {
          // column1, column2, options
          SNode* pParam3 = nodesListGetNode(pFunc->pParameterList, 2);
          if (pParam3 == NULL || nodeType(pParam3) != QUERY_NODE_VALUE) {
            qError("%s the third parameter of dtw function must be an option string", id);
            return TSDB_CODE_INVALID_PARA;
          }

          SValueNode* pOptNode = (SValueNode*)pParam3;

          // room for the user options plus ",algo=dtw" and the terminator
          int32_t bufLen = (int32_t)strlen(pOptNode->literal) + 30;
          pInfo->options = taosMemoryMalloc(bufLen);
          if (pInfo->options == NULL) {
            code = terrno;
            qError("%s failed to prepare option buffer, code:%s", id, tstrerror(code));
            return code;
          }

          int32_t ret = snprintf(pInfo->options, bufLen, "%s,%s", pOptNode->literal, "algo=dtw");
          if (ret < 0 || ret >= bufLen) {
            code = TSDB_CODE_OUT_OF_MEMORY;
            qError("%s failed to clone options string, code:%s", id, tstrerror(code));
            return code;
          }
        }
      }
    }
  }

  if (pInfo->options == NULL) {
    qError("%s option is missing or clone option string failed, failed to do correlation analysis", id);
    code = TSDB_CODE_ANA_INTERNAL_ERROR;
  }

  return code;
}
897

898
/*
 * Extract the TLCC inputs from the operator's function list.
 *
 * The function takes two columns, optionally followed by an option string:
 *   - 2 parameters (column1, column2): the options default to
 *     "algo=tlcc,lag_start=-1,lag_end=1";
 *   - 3 parameters (column1, column2, options): the user options are stored
 *     with ",algo=tlcc" appended.
 * Column slots and types are stored in pSupp. The heap-allocated option string
 * is stored in pInfo->options.
 *
 * Returns:
 *   - TSDB_CODE_SUCCESS on success;
 *   - TSDB_CODE_INVALID_PARA for a wrong parameter count or node type;
 *   - terrno when allocation fails;
 *   - TSDB_CODE_OUT_OF_MEMORY when formatting the options fails;
 *   - TSDB_CODE_ANA_INTERNAL_ERROR when no option string was produced.
 */
static int32_t doParseInputForTlcc(SAnalysisOperatorInfo* pInfo, SCorrelationSupp* pSupp, SNodeList* pFuncs, const char* id) {
  int32_t code = 0;
  SNode*  pNode = NULL;

  FOREACH(pNode, pFuncs) {
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
      int32_t        numOfParam = LIST_LENGTH(pFunc->pParameterList);

      if (pFunc->funcType == FUNCTION_TYPE_TLCC) {
        pSupp->base.numOfCols = 2;

        if (numOfParam != 2 && numOfParam != 3) {
          qError("%s invalid number of parameters in tlcc function:%d", id, numOfParam);
          return TSDB_CODE_INVALID_PARA;
        }

        // the first two parameters are the two series being correlated
        SNode* pParam1 = nodesListGetNode(pFunc->pParameterList, 0);
        SNode* pParam2 = nodesListGetNode(pFunc->pParameterList, 1);
        if (pParam1 == NULL || pParam2 == NULL || nodeType(pParam1) != QUERY_NODE_COLUMN ||
            nodeType(pParam2) != QUERY_NODE_COLUMN) {
          qError("%s the first two parameters of tlcc function must be columns", id);
          return TSDB_CODE_INVALID_PARA;
        }

        SColumnNode* pTargetNode1 = (SColumnNode*)pParam1;
        SColumnNode* pTargetNode2 = (SColumnNode*)pParam2;

        pSupp->targetSlot1 = pTargetNode1->slotId;
        pSupp->targetSlot1Type = pTargetNode1->node.resType.type;

        pSupp->targetSlot2 = pTargetNode2->slotId;
        pSupp->targetSlot2Type = pTargetNode2->node.resType.type;

        if (numOfParam == 2) {
          // no options given: default lag window is [-1, 1]
          pInfo->options = taosStrdup("algo=tlcc,lag_start=-1,lag_end=1");
        } else {
          // column1, column2, options
          SNode* pParam3 = nodesListGetNode(pFunc->pParameterList, 2);
          if (pParam3 == NULL || nodeType(pParam3) != QUERY_NODE_VALUE) {
            qError("%s the third parameter of tlcc function must be an option string", id);
            return TSDB_CODE_INVALID_PARA;
          }

          SValueNode* pOptNode = (SValueNode*)pParam3;

          // room for the user options plus ",algo=tlcc" and the terminator
          int32_t bufLen = (int32_t)strlen(pOptNode->literal) + 30;
          pInfo->options = taosMemoryMalloc(bufLen);
          if (pInfo->options == NULL) {
            code = terrno;
            qError("%s failed to prepare option buffer, code:%s", id, tstrerror(code));
            return code;
          }

          int32_t ret = snprintf(pInfo->options, bufLen, "%s,%s", pOptNode->literal, "algo=tlcc");
          if (ret < 0 || ret >= bufLen) {
            code = TSDB_CODE_OUT_OF_MEMORY;
            qError("%s failed to clone options string, code:%s", id, tstrerror(code));
            return code;
          }
        }
      }
    }
  }

  if (pInfo->options == NULL) {
    qError("%s option is missing or clone option string failed, failed to do correlation analysis", id);
    code = TSDB_CODE_ANA_INTERNAL_ERROR;
  }

  return code;
}
976

977
/*
 * Locate the output slots of the analysis result columns in pExprSup.
 *
 * Slots are assigned as follows:
 *   - pInfo->resTargetSlot gets the IMPUTATION / DTW / DTW_PATH / TLCC result slot;
 *   - pInfo->imputatSup.resTsSlot gets the IMPUTATION_ROWTS slot;
 *   - pInfo->imputatSup.resMarkSlot gets the IMPUTATION_MARK slot.
 * Any slot that is not found stays -1. Always returns 0.
 */
static int32_t doSetResSlot(SAnalysisOperatorInfo* pInfo, SExprSupp* pExprSup) {
  pInfo->imputatSup.resTsSlot = -1;
  pInfo->resTargetSlot = -1;
  pInfo->imputatSup.resMarkSlot = -1;

  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];

    // _function.functionType is only meaningful for function expressions
    // (NOTE(review): confirm against the tExprNode layout); skip other kinds
    if (pExprInfo->pExpr == NULL || pExprInfo->pExpr->nodeType != QUERY_NODE_FUNCTION) {
      continue;
    }

    int32_t dstSlot = pExprInfo->base.resSchema.slotId;
    int32_t functionType = pExprInfo->pExpr->_function.functionType;
    if (functionType == FUNCTION_TYPE_IMPUTATION || functionType == FUNCTION_TYPE_DTW ||
        functionType == FUNCTION_TYPE_DTW_PATH || functionType == FUNCTION_TYPE_TLCC) {
      pInfo->resTargetSlot = dstSlot;
    } else if (functionType == FUNCTION_TYPE_IMPUTATION_ROWTS) {
      pInfo->imputatSup.resTsSlot = dstSlot;
    } else if (functionType == FUNCTION_TYPE_IMPUTATION_MARK) {
      pInfo->imputatSup.resMarkSlot = dstSlot;
    }
  }

  return 0;
}
998

999
/*
 * Reset the settings shared by all analysis kinds:
 *   - the time window is inverted (skey = INT64_MAX, ekey = INT64_MIN),
 *     presumably so the first buffered row narrows both bounds;
 *   - timeout and wncheck take the analytics defaults;
 *   - the group id and the row/block/column counters are zeroed.
 */
void doInitBaseOptions(SBaseSupp* pSupp) {
  pSupp->win.skey = INT64_MAX;
  pSupp->win.ekey = INT64_MIN;

  pSupp->groupId = 0;
  pSupp->timeout = ANALY_DEFAULT_TIMEOUT;
  pSupp->wncheck = ANALY_DEFAULT_WNCHECK;

  pSupp->numOfBlocks = 0;
  pSupp->numOfRows = 0;
  pSupp->numOfCols = 0;
}
×
1010

1011
/*
 * Initialize the imputation support structure.
 *
 * The common settings are reset via doInitBaseOptions. The ts/target slots,
 * target type and ts precision are marked unknown (-1) until the function
 * parameters are parsed. The data frequency defaults to "S".
 * Frequency units: d(day), h(hour), m(minute), s(second), ms(millisecond),
 * us(microsecond).
 */
void doInitImputOptions(SImputationSupp* pSupp) {
  doInitBaseOptions(&pSupp->base);

  pSupp->tsPrecision = -1;
  pSupp->targetType = -1;
  pSupp->targetSlot = -1;
  pSupp->tsSlot = -1;

  pSupp->freq[1] = '\0';
  pSupp->freq[0] = 'S';
}
×
1022

1023
/*
 * Initialize the correlation (DTW / TLCC) support structure.
 *
 * The common settings are reset via doInitBaseOptions. Both input slots are
 * marked unknown (-1) with INT as a placeholder type. The DTW search radius
 * defaults to 1 and the TLCC lag window to [-1, 1].
 */
void doInitDtwOptions(SCorrelationSupp* pSupp) {
  doInitBaseOptions(&pSupp->base);

  pSupp->targetSlot1Type = TSDB_DATA_TYPE_INT;
  pSupp->targetSlot1 = -1;
  pSupp->targetSlot2Type = TSDB_DATA_TYPE_INT;
  pSupp->targetSlot2 = -1;

  pSupp->lagEnd = 1;
  pSupp->lagStart = -1;
  pSupp->radius = 1;
}
×
1035

1036
/*
 * Read the optional "freq" option from pHashMap into pSupp->freq.
 *
 * The value must match ^([1-9][0-9]*|[1-9]*)(ms|us|ns|[smhdw])$ (case-insensitive),
 * e.g. "s", "10m", "500ms". When the option is absent, pSupp->freq keeps its
 * current value.
 *
 * Returns:
 *   - TSDB_CODE_SUCCESS on success;
 *   - terrno when cloning the value fails;
 *   - TSDB_CODE_INVALID_PARA when the value is malformed or longer than
 *     pSupp->freq can hold.
 */
int32_t parseFreq(SImputationSupp* pSupp, SHashObj* pHashMap, const char* id) {
  int32_t code = TSDB_CODE_SUCCESS;
  char*   p = NULL;
  int32_t len = 0;
  int32_t res = 0;
  regex_t regex;

  char* pFreq = taosHashGet(pHashMap, FREQ_STR, strlen(FREQ_STR));
  if (pFreq == NULL) {
    qDebug("%s not specify data freq, default: %s", id, pSupp->freq);
    return code;
  }

  // clone with the stored value length to get a NUL-terminated copy to work on
  len = taosHashGetValueSize(pFreq);
  p = taosStrndupi(pFreq, len);
  if (p == NULL) {
    code = terrno;
    qError("%s failed to clone the freq param, code:%s", id, tstrerror(code));
    return code;
  }

  if (regcomp(&regex, "^([1-9][0-9]*|[1-9]*)(ms|us|ns|[smhdw])$", REG_EXTENDED | REG_ICASE) != 0) {
    qError("%s failed to compile regex for freq param", id);
    code = TSDB_CODE_INVALID_PARA;
    goto _end;
  }

  res = regexec(&regex, p, 0, NULL, 0);
  regfree(&regex);
  if (res != 0) {
    qError("%s invalid freq parameter: %s", id, p);
    code = TSDB_CODE_INVALID_PARA;
    goto _end;
  }

  // the regex allows arbitrarily long multipliers, so bound the copy by the destination
  if (strlen(p) >= sizeof(pSupp->freq)) {
    qError("%s freq parameter too long: %s", id, p);
    code = TSDB_CODE_INVALID_PARA;
    goto _end;
  }

  tstrncpy(pSupp->freq, p, sizeof(pSupp->freq));
  qDebug("%s data freq:%s", id, pSupp->freq);

_end:
  taosMemoryFreeClear(p);
  return code;
}
1075

1076
/*
 * Parse an option value from the analysis option map as a decimal int32.
 *
 * pVal points to len bytes that need not be NUL-terminated (they are read
 * with taosHashGetValueSize). Surrounding spaces are ignored; an optional
 * '+' or '-' must be followed by at least one digit.
 *
 * On success, stores the value in *pOut and returns true. Otherwise returns
 * false and leaves *pOut unchanged.
 */
static bool corrParseInt32Option(const char* pVal, int32_t len, int32_t* pOut) {
  int32_t start = 0;
  int32_t end = len;
  bool    neg = false;
  int64_t val = 0;

  if (pVal == NULL || len <= 0) {
    return false;
  }

  while (start < end && pVal[start] == ' ') ++start;
  while (end > start && pVal[end - 1] == ' ') --end;

  if (start < end && (pVal[start] == '+' || pVal[start] == '-')) {
    neg = (pVal[start] == '-');
    ++start;
  }

  if (start >= end) {
    return false;  // empty value or a sign without digits
  }

  for (int32_t i = start; i < end; ++i) {
    char c = pVal[i];
    if (c < '0' || c > '9') {
      return false;
    }
    val = val * 10 + (c - '0');
    if (val > (int64_t)INT32_MAX + 1) {
      return false;  // out of int32 range for either sign
    }
  }

  if (neg) {
    val = -val;
  }
  if (val > INT32_MAX) {
    return false;
  }

  *pOut = (int32_t)val;
  return true;
}

/*
 * Read the optional "radius" option into pSupp->radius.
 *
 * Option values are strings, so the value is parsed as a decimal integer
 * instead of being reinterpreted as raw int32 bytes. A missing or invalid
 * value keeps the current radius.
 */
void parseRadius(SCorrelationSupp* pSupp, SHashObj* pHashMap, const char* id) {
  int32_t radius = 0;

  char* pRadius = taosHashGet(pHashMap, RADIUS_STR, strlen(RADIUS_STR));
  if (pRadius == NULL) {
    qDebug("%s not specify search radius, default: %d", id, pSupp->radius);
  } else if (corrParseInt32Option(pRadius, (int32_t)taosHashGetValueSize(pRadius), &radius)) {
    pSupp->radius = radius;
    qDebug("%s dtw search radius:%d", id, pSupp->radius);
  } else {
    qError("%s invalid dtw search radius, use default: %d", id, pSupp->radius);
  }
}

/*
 * Read the optional "lag_start" and "lag_end" options into pSupp->lagStart and
 * pSupp->lagEnd.
 *
 * Both values are parsed as decimal integers. A missing or invalid value
 * keeps the corresponding current setting.
 */
void parseLag(SCorrelationSupp* pSupp, SHashObj* pHashMap, const char* id) {
  int32_t lag = 0;

  char* pLagStart = taosHashGet(pHashMap, LAGSTART_STR, strlen(LAGSTART_STR));
  if (pLagStart == NULL) {
    qDebug("%s not specify tlcc lag start, default: %d", id, pSupp->lagStart);
  } else if (corrParseInt32Option(pLagStart, (int32_t)taosHashGetValueSize(pLagStart), &lag)) {
    pSupp->lagStart = lag;
    qDebug("%s tlcc lag start:%d", id, pSupp->lagStart);
  } else {
    qError("%s invalid tlcc lag start, use default: %d", id, pSupp->lagStart);
  }

  char* pLagEnd = taosHashGet(pHashMap, LAGEND_STR, strlen(LAGEND_STR));
  if (pLagEnd == NULL) {
    qDebug("%s not specify tlcc lag end, default: %d", id, pSupp->lagEnd);
  } else if (corrParseInt32Option(pLagEnd, (int32_t)taosHashGetValueSize(pLagEnd), &lag)) {
    pSupp->lagEnd = lag;
    qDebug("%s tlcc lag end:%d", id, pSupp->lagEnd);
  } else {
    qError("%s invalid tlcc lag end, use default: %d", id, pSupp->lagEnd);
  }
}
×
1103

1104
int32_t estResultRowsAfterImputation(int32_t rows, int64_t skey, int64_t ekey, int32_t prec, const char* pFreq, const char* id) {
×
1105
  int64_t range = ekey - skey;
×
1106
  double  factor = 0;
×
1107
  if (prec == TSDB_TIME_PRECISION_MILLI) {
×
1108
    if (strcmp(pFreq, "h") == 0) {
×
1109
      factor = 0.001 * 1/3600;
×
1110
    } else if (strcmp(pFreq, "m") == 0) {
×
1111
      factor = 0.001 * 1/60;
×
1112
    } else if (strcmp(pFreq, "s") == 0) {
×
1113
      factor = 0.001;
×
1114
    } else if (strcmp(pFreq, "ms") == 0) {
×
1115
      factor = 1;
×
1116
    } else if (strcmp(pFreq, "us") == 0) {
×
UNCOV
1117
      factor *= 1000;
×
UNCOV
1118
    } else if (strcmp(pFreq, "ns") == 0) {
×
1119
      factor *= 1000000;
×
1120
    }
1121

1122
    int64_t num = range * factor - rows;
×
UNCOV
1123
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
1124
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
1125
      return TSDB_CODE_INVALID_PARA;
×
1126
    }
1127
  } else if (prec == TSDB_TIME_PRECISION_MICRO) {
×
1128
    if (strcmp(pFreq, "h") == 0) {
×
1129
      factor = 0.000001 * 1/3600;
×
1130
    } else if (strcmp(pFreq, "m") == 0) {
×
1131
      factor = 0.000001 * 1/60;
×
1132
    } else if (strcmp(pFreq, "s") == 0) {
×
1133
      factor = 0.000001;
×
1134
    } else if (strcmp(pFreq, "ms") == 0) {
×
1135
      factor = 1000;
×
1136
    } else if (strcmp(pFreq, "us") == 0) {
×
UNCOV
1137
      factor *= 1;
×
UNCOV
1138
    } else if (strcmp(pFreq, "ns") == 0) {
×
1139
      factor *= 1000;
×
1140
    }
1141

1142
    int64_t num = range * factor - rows;
×
UNCOV
1143
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
1144
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
1145
      return TSDB_CODE_INVALID_PARA;
×
1146
    }
1147
  } else if (prec == TSDB_TIME_PRECISION_NANO) {
×
1148
    if (strcmp(pFreq, "h") == 0) {
×
1149
      factor = 0.000000001 * 1/3600;
×
1150
    } else if (strcmp(pFreq, "m") == 0) {
×
1151
      factor = 0.000000001 * 1/60;
×
1152
    } else if (strcmp(pFreq, "s") == 0) {
×
1153
      factor = 0.000000001;
×
1154
    } else if (strcmp(pFreq, "ms") == 0) {
×
1155
      factor = 1000000;
×
1156
    } else if (strcmp(pFreq, "us") == 0) {
×
UNCOV
1157
      factor *= 1000;
×
UNCOV
1158
    } else if (strcmp(pFreq, "ns") == 0) {
×
1159
      factor *= 1;
×
1160
    }
1161

1162
    int64_t num = range * factor - rows;
×
UNCOV
1163
    if (num > ANALY_MAX_IMPUT_ROWS) {
×
UNCOV
1164
      qError("%s too many rows to imputation, est:%"PRId64, id, num);
×
UNCOV
1165
      return TSDB_CODE_INVALID_PARA;
×
1166
    }
1167
  }
1168

1169
  return TSDB_CODE_SUCCESS;
×
1170
}
1171

1172
/*
 * Parse pInfo->options into the algorithm name/url and the per-analysis settings.
 *
 * The algorithm type passed to taosAnalysisParseAlgo is IMPUTATION for the
 * imputation function and CORREL for DTW / DTW_PATH / TLCC. Timeout and
 * wncheck go into the matching support structure, followed by:
 *   - freq for imputation;
 *   - radius for DTW / DTW_PATH;
 *   - lag_start / lag_end for TLCC.
 * The temporary option hash map is always released.
 *
 * Returns TSDB_CODE_SUCCESS or the first error reported while parsing.
 */
int32_t doParseOption(SAnalysisOperatorInfo* pInfo, const char* id) {
  SHashObj* pOpts = NULL;
  int32_t   lino = 0;
  int32_t   algoType = 0;
  int32_t   code = taosAnalyGetOpts(pInfo->options, &pOpts);

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  bool isImputation = (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION);
  bool isDtw = (pInfo->analysisType == FUNCTION_TYPE_DTW || pInfo->analysisType == FUNCTION_TYPE_DTW_PATH);
  bool isTlcc = (pInfo->analysisType == FUNCTION_TYPE_TLCC);

  if (isImputation) {
    algoType = ANALY_ALGO_TYPE_IMPUTATION;
  } else if (isDtw || isTlcc) {
    algoType = ANALY_ALGO_TYPE_CORREL;
  }

  code = taosAnalysisParseAlgo(pInfo->options, pInfo->algoName, pInfo->algoUrl, algoType, tListLen(pInfo->algoUrl),
                               pOpts, id);
  TSDB_CHECK_CODE(code, lino, _end);

  SBaseSupp* pBase = isImputation ? &pInfo->imputatSup.base : &pInfo->corrSupp.base;
  pBase->timeout = taosAnalysisParseTimout(pOpts, id);
  pBase->wncheck = taosAnalysisParseWncheck(pOpts, id);

  if (isImputation) {
    code = parseFreq(&pInfo->imputatSup, pOpts, id);
  } else if (isDtw) {
    parseRadius(&pInfo->corrSupp, pOpts, id);
  } else if (isTlcc) {
    parseLag(&pInfo->corrSupp, pOpts, id);
  }

_end:
  taosHashCleanup(pOpts);
  return code;
}
1213

UNCOV
1214
/*
 * Open the JSON-column analytics request buffer for the current analysis.
 *
 * The buffer file is "<tsTempDir>/tdengine-analysis-<pInfo>-<ns timestamp>".
 * Column metadata is written as follows:
 *   - imputation: "ts" (TIMESTAMP) and "val" (target type);
 *   - DTW / DTW_PATH / TLCC: "val" and "val1" (the two input column types).
 * Then the data section and one column section per input column are begun.
 * On any failure the buffer is closed and destroyed.
 *
 * Returns 0 on success or the first error code encountered.
 */
static int32_t doCreateBuf(SAnalysisOperatorInfo* pInfo, const char* pId) {
  bool          isImputation = (pInfo->analysisType == FUNCTION_TYPE_IMPUTATION);
  SBaseSupp*    pSupp = isImputation ? &pInfo->imputatSup.base : &pInfo->corrSupp.base;
  SAnalyticBuf* pBuf = &pSupp->analyBuf;
  int32_t       colIdx = 0;
  int32_t       code = 0;

  pBuf->bufType = ANALYTICS_BUF_TYPE_JSON_COL;
  snprintf(pBuf->fileName, sizeof(pBuf->fileName), "%s/tdengine-analysis-%p-%" PRId64, tsTempDir, pInfo,
           taosGetTimestampNs());

  code = tsosAnalyBufOpen(pBuf, pSupp->numOfCols, pId);
  if (code != 0) {
    goto _OVER;
  }

  if (isImputation) {
    code = taosAnalyBufWriteColMeta(pBuf, colIdx++, TSDB_DATA_TYPE_TIMESTAMP, "ts");
    if (code == 0) {
      code = taosAnalyBufWriteColMeta(pBuf, colIdx++, pInfo->imputatSup.targetType, "val");
    }
  } else if (pInfo->analysisType == FUNCTION_TYPE_DTW || pInfo->analysisType == FUNCTION_TYPE_DTW_PATH ||
             pInfo->analysisType == FUNCTION_TYPE_TLCC) {
    code = taosAnalyBufWriteColMeta(pBuf, colIdx++, pInfo->corrSupp.targetSlot1Type, "val");
    if (code == 0) {
      code = taosAnalyBufWriteColMeta(pBuf, colIdx++, pInfo->corrSupp.targetSlot2Type, "val1");
    }
  }
  if (code != 0) {
    goto _OVER;
  }

  code = taosAnalyBufWriteDataBegin(pBuf);
  for (int32_t col = 0; code == 0 && col < pSupp->numOfCols; ++col) {
    code = taosAnalyBufWriteColBegin(pBuf, col);
  }

_OVER:
  if (code != 0) {
    (void)taosAnalyBufClose(pBuf);
    taosAnalyBufDestroy(pBuf);
  }
  return code;
}
1257

1258
#else
1259

1260
/*
 * Build without USE_ANALYTICS: the generic analysis operator is unavailable,
 * so creation always reports TSDB_CODE_OPS_NOT_SUPPORT.
 */
int32_t createGenericAnalysisOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo,
                                          SOperatorInfo** pOptrInfo) {
  (void)downstream;
  (void)physiNode;
  (void)pTaskInfo;
  (void)pOptrInfo;
  return TSDB_CODE_OPS_NOT_SUPPORT;
}

/* Build without USE_ANALYTICS: no operator info is ever created, so there is nothing to release. */
void analysisDestroyOperatorInfo(void* param) { (void)param; }
1265

1266
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc