• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4976

06 Mar 2026 09:48AM UTC coverage: 68.446% (+0.08%) from 68.37%
#4976

push

travis-ci

web-flow
feat(TDgpt): support multiple input data columns for anomaly detection. (#34606)

0 of 93 new or added lines in 9 files covered. (0.0%)

5718 existing lines in 144 files now uncovered.

211146 of 308486 relevant lines covered (68.45%)

136170362.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.83
/source/common/src/tanalytics.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tanalytics.h"
18
#include "ttypes.h"
19
#include "tutil.h"
20
#include "osTime.h"
21

22
#ifdef USE_ANALYTICS
23
#include <curl/curl.h>
24

25
#define ANALYTICS_ALOG_SPLIT_CHAR ","
26

27
typedef struct {
28
  int64_t       ver;
29
  SHashObj     *hash;  // algoname:algotype -> SAnalyticsUrl
30
  TdThreadMutex lock;
31
} SAlgoMgmt;
32

33
typedef struct {
34
  char   *data;
35
  int64_t dataLen;
36
} SCurlResp;
37

38
static SAlgoMgmt tsAlgos = {0};
39
static int32_t   taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen);
40

41
const char *taosAnalysisAlgoType(EAnalyAlgoType type) {
×
42
  switch (type) {
×
43
    case ANALY_ALGO_TYPE_ANOMALY_DETECT:
×
44
      return "anomaly-detection";
×
45
    case ANALY_ALGO_TYPE_FORECAST:
×
46
      return "forecast";
×
47
    case ANALY_ALGO_TYPE_IMPUTATION:
×
48
      return "imputation";
×
49
    case ANALY_ALGO_TYPE_CORREL:
×
50
      return "correlation";
×
51
    case ANALY_ALGO_TYPE_CLASSIFI:
×
52
      return "classification";
×
53
    case ANALY_ALGO_TYPE_MOTIF:
×
54
      return "moti-discovery";
×
55
    default:
×
56
      return "unknown";
×
57
  }
58
}
59

60
const char *taosAnalyAlgoUrlStr(EAnalyAlgoType type) {
×
61
  switch (type) {
×
62
    case ANALY_ALGO_TYPE_ANOMALY_DETECT:
×
63
      return "anomaly-detect";
×
64
    case ANALY_ALGO_TYPE_FORECAST:
×
65
      return "forecast";
×
66
    case ANALY_ALGO_TYPE_IMPUTATION:
×
67
      return "imputation";
×
68
    case ANALY_ALGO_TYPE_CORREL:
×
69
      return "correlation";
×
70
    case ANALY_ALGO_TYPE_CLASSIFI:
×
71
      return "classification";
×
72
    case ANALY_ALGO_TYPE_MOTIF:
×
73
      return "moti-discovery";
×
74
    default:
×
75
      return "unknown";
×
76
  }
77
}
78

79
EAnalyAlgoType taosAnalyAlgoInt(const char *name) {
×
80
  for (EAnalyAlgoType i = 0; i < ANALY_ALGO_TYPE_END; ++i) {
×
81
    if (strcasecmp(name, taosAnalysisAlgoType(i)) == 0) {
×
82
      return i;
×
83
    }
84
  }
85

86
  return ANALY_ALGO_TYPE_END;
×
87
}
88

89
int32_t taosAnalyticsInit() {
609,884✔
90
  if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
609,884✔
91
    uError("failed to init curl");
×
92
    return -1;
×
93
  }
94

95
  tsAlgos.ver = 0;
609,884✔
96
  if (taosThreadMutexInit(&tsAlgos.lock, NULL) != 0) {
609,884✔
97
    uError("failed to init algo mutex");
×
98
    return -1;
×
99
  }
100

101
  tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
609,884✔
102
  if (tsAlgos.hash == NULL) {
609,884✔
103
    uError("failed to init algo hash");
×
104
    return -1;
×
105
  }
106

107
  uInfo("analysis env is initialized");
609,884✔
108
  return 0;
609,884✔
109
}
110

111
static void taosAnalyFreeHash(SHashObj *hash) {
609,884✔
112
  void *pIter = taosHashIterate(hash, NULL);
609,884✔
113
  while (pIter != NULL) {
609,884✔
114
    SAnalyticsUrl *pUrl = (SAnalyticsUrl *)pIter;
×
115
    taosMemoryFree(pUrl->url);
×
116
    pIter = taosHashIterate(hash, pIter);
×
117
  }
118
  taosHashCleanup(hash);
609,884✔
119
}
609,884✔
120

121
void taosAnalyticsCleanup() {
609,884✔
122
  curl_global_cleanup();
609,884✔
123
  if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) {
609,884✔
124
    uError("failed to destroy anal lock");
×
125
  }
126
  taosAnalyFreeHash(tsAlgos.hash);
609,884✔
127
  tsAlgos.hash = NULL;
609,884✔
128
  uInfo("analysis env is cleaned up");
609,884✔
129
}
609,884✔
130

131
void taosAnalyUpdate(int64_t newVer, SHashObj *pHash) {
×
132
  if (newVer > tsAlgos.ver) {
×
133
    if (taosThreadMutexLock(&tsAlgos.lock) == 0) {
×
134
      SHashObj *hash = tsAlgos.hash;
×
135
      tsAlgos.ver = newVer;
×
136
      tsAlgos.hash = pHash;
×
137
      if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
×
138
        uError("failed to unlock hash")
×
139
      }
140
      taosAnalyFreeHash(hash);
×
141
    }
142
  } else {
143
    taosAnalyFreeHash(pHash);
×
144
  }
145
}
×
146

147
int32_t taosAnalyGetOpts(const char *pOption, SHashObj **pOptHash) {
×
148
  int32_t num = 0;
×
149
  int32_t code = 0;
×
150
  char   *pTmp = NULL;
×
151

152
  if (pOptHash != NULL) {
×
153
    (*pOptHash) = NULL;
×
154
  } else {
155
    return TSDB_CODE_INVALID_PARA;
×
156
  }
157

158
  pTmp = taosStrdup(pOption);
×
159
  if (pTmp == NULL) {
×
160
    return terrno;
×
161
  }
162

163
  int32_t unused = strdequote(pTmp);
×
164
  char  **pList = strsplit(pTmp, ANALYTICS_ALOG_SPLIT_CHAR, &num);
×
165

166
  (*pOptHash) = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), 1, HASH_NO_LOCK);
×
167
  if ((*pOptHash) == NULL) {
×
168
    taosMemoryFree(pTmp);
×
169
    taosMemoryFree(pList);
×
170
    return terrno;
×
171
  }
172

173
  for (int32_t i = 0; i < num; ++i) {
×
174
    int32_t parts = 0;
×
175
    char  **pParts = strsplit(pList[i], "=", &parts);
×
176

177
    if (parts < 2) {  // invalid parameters, ignore and continue
×
178
      taosMemoryFree(pParts);
×
179
      continue;
×
180
    }
181

182
    size_t keyLen = strtrim(pParts[0]);
×
183
    size_t valLen = strtrim(pParts[1]);
×
184

185
    code = taosHashPut(*pOptHash, pParts[0], keyLen, pParts[1], valLen);
×
186
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_DUP_KEY) {
×
187
      // return error
188
      taosMemoryFree(pTmp);
×
189
      taosMemoryFree(pList);
×
190
      taosMemoryFree(pParts);
×
191
      return code;
×
192
    } else {
193
      code = 0;
×
194
    }
195

196
    taosMemoryFree(pParts);
×
197
  }
198

199
  taosMemoryFree(pTmp);
×
200
  taosMemoryFree(pList);
×
201
  return code;
×
202
}
203

204
bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) {
×
205
  SHashObj* p = NULL;
×
206
  int32_t code = taosAnalyGetOpts(option, &p);
×
207
  if (code != TSDB_CODE_SUCCESS) {
×
208
    if (p != NULL) {
×
209
      taosHashCleanup(p);
×
210
      p = NULL;
×
211
    }
212
    return false;
×
213
  }
214

215
  void* pVal = taosHashGet(p, optName, strlen(optName));
×
216
  if (pVal == NULL) {
×
217
    taosHashCleanup(p);
×
218
    return false;
×
219
  }
220

221
  int32_t valLen = taosHashGetValueSize(pVal);
×
222

223
  if (optValue != NULL && optMaxLen >= 1) {
×
224
    int32_t len = MIN(valLen + 1, optMaxLen);
×
225
    tstrncpy(optValue, (char *)pVal, len);
×
226
  }
227

228
  taosHashCleanup(p);
×
229
  return true;
×
230
}
231

232
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalyAlgoType type, char *url, int32_t urlLen) {
×
233
  int32_t code = 0;
×
234
  char    name[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0};
×
235
  int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName);
×
236

237
  char *unused = strntolower(name, name, nameLen);
×
238

239
  if (taosThreadMutexLock(&tsAlgos.lock) == 0) {
×
240
    SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen);
×
241
    if (pUrl != NULL) {
×
242
      tstrncpy(url, pUrl->url, urlLen);
×
243
      uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalysisAlgoType(type), url);
×
244
    } else {
245
      url[0] = 0;
×
246
      code = TSDB_CODE_ANA_ALGO_NOT_FOUND;
×
247
      uError("algo:%s, type:%s, url not found", algoName, taosAnalysisAlgoType(type));
×
248
    }
249

250
    if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
×
251
      uError("failed to unlock hash");
×
252
      return TSDB_CODE_OUT_OF_MEMORY;
×
253
    }
254
  }
255

256
  return code;
×
257
}
258

259
int64_t taosAnalyGetVersion() { return tsAlgos.ver; }
50,271,291✔
260

261
static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) {
×
262
  SCurlResp *pRsp = userdata;
×
263
  if (contLen == 0 || nmemb == 0 || pCont == NULL) {
×
264
    pRsp->dataLen = 0;
×
265
    pRsp->data = NULL;
×
266
    uError("curl response is received, len:%" PRId64, pRsp->dataLen);
×
267
    return 0;
×
268
  }
269

270
  int64_t newDataSize = (int64_t)contLen * nmemb;
×
271
  int64_t size = pRsp->dataLen + newDataSize;
×
272

273
  if (pRsp->data == NULL) {
×
274
    pRsp->data = taosMemoryMalloc(size + 1);
×
275
    if (pRsp->data == NULL) {
×
276
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
277
      return 0;  // return the recv length, if failed, return 0
×
278
    }
279
  } else {
280
    char *p = taosMemoryRealloc(pRsp->data, size + 1);
×
281
    if (p == NULL) {
×
282
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
283
      return 0;  // return the recv length, if failed, return 0
×
284
    }
285

286
    pRsp->data = p;
×
287
  }
288

289
  if (pRsp->data != NULL) {
×
290
    (void)memcpy(pRsp->data + pRsp->dataLen, pCont, newDataSize);
×
291

292
    pRsp->dataLen = size;
×
293
    pRsp->data[size] = 0;
×
294

295
    uDebugL("curl response is received, len:%" PRId64 ", content:%s", size, pRsp->data);
×
296
    return newDataSize;
×
297
  } else {
298
    pRsp->dataLen = 0;
×
299
    uError("failed to malloc curl response");
×
300
    return 0;
×
301
  }
302
}
303

304
static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) {
×
305
  CURL    *curl = NULL;
×
306
  CURLcode code = 0;
×
307

308
  curl = curl_easy_init();
×
309
  if (curl == NULL) {
×
310
    uError("failed to create curl handle");
×
311
    return -1;
×
312
  }
313

314
  if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER;
×
315
  if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER;
×
316
  if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER;
×
317
  if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 1000) != 0) goto _OVER;
×
318

319
  uDebug("curl get request will sent, url:%s", url);
×
320
  code = curl_easy_perform(curl);
×
321
  if (code != CURLE_OK) {
×
322
    uError("failed to perform curl action, code:%d", code);
×
323
  }
324

325
_OVER:
×
326
  if (curl != NULL) curl_easy_cleanup(curl);
×
327
  return code;
×
328
}
329

330
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout,
×
331
                                   const char *id) {
332
  struct curl_slist *headers = NULL;
×
333
  CURL              *curl = NULL;
×
334
  CURLcode           code = 0;
×
335

336
  curl = curl_easy_init();
×
337
  if (curl == NULL) {
×
338
    uError("%s failed to create curl handle", id);
×
339
    return -1;
×
340
  }
341

342
  headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8");
×
343
  if (curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers) != 0) goto _OVER;
×
344
  if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER;
×
345
  if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER;
×
346
  if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER;
×
347
  if (curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout) != 0) goto _OVER;
×
348
  if (curl_easy_setopt(curl, CURLOPT_POST, 1) != 0) goto _OVER;
×
349
  if (curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen) != 0) goto _OVER;
×
350
  if (curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf) != 0) goto _OVER;
×
351
  if (curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L) != 0) goto _OVER;
×
352
  if (curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L) != 0) goto _OVER;
×
353

354
  uDebugL("%s curl post request will sent, url:%s len:%d content:%s", id, url, bufLen, buf);
×
355
  code = curl_easy_perform(curl);
×
356
  if (code != CURLE_OK) {
×
357
    uError("%s failed to perform curl action, code:%d", id, code);
×
358
  }
359

360
_OVER:
×
361
  if (curl != NULL) {
×
362
    curl_slist_free_all(headers);
×
363
    curl_easy_cleanup(curl);
×
364
  }
365
  return code;
×
366
}
367

368
SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout, const char* id) {
×
369
  int32_t   code = -1;
×
370
  char     *pCont = NULL;
×
371
  int64_t   contentLen;
×
372
  SJson    *pJson = NULL;
×
373
  SCurlResp curlRsp = {0};
×
374

375
  if (type == ANALYTICS_HTTP_TYPE_GET) {
×
376
    if ((code = taosCurlGetRequest(url, &curlRsp)) != 0) {
×
377
      terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
×
378
      goto _OVER;
×
379
    }
380
  } else {
381
    code = taosAnalyBufGetCont(pBuf, &pCont, &contentLen);
×
382
    if (code != 0) {
×
383
      terrno = code;
×
384
      goto _OVER;
×
385
    }
386
    if (taosCurlPostRequest(url, &curlRsp, pCont, contentLen, timeout, id) != 0) {
×
387
      terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
×
388
      goto _OVER;
×
389
    }
390
  }
391

392
  if (curlRsp.data == NULL || curlRsp.dataLen == 0) {
×
393
    terrno = TSDB_CODE_ANA_URL_RSP_IS_NULL;
×
394
    goto _OVER;
×
395
  }
396

397
  pJson = tjsonParse(curlRsp.data);
×
398
  if (pJson == NULL) {
×
399
    if (curlRsp.data[0] == '<') {
×
400
      terrno = TSDB_CODE_ANA_ANODE_RETURN_ERROR;
×
401
    } else {
402
      terrno = TSDB_CODE_INVALID_JSON_FORMAT;
×
403
    }
404
    goto _OVER;
×
405
  }
406

407
_OVER:
×
408
  if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data);
×
409
  if (pCont != NULL) taosMemoryFree(pCont);
×
410
  return pJson;
×
411
}
412

413
static int32_t taosAnalyJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) {
×
414
  int32_t   code = 0;
×
415
  int64_t   contLen;
×
416
  char     *pCont = NULL;
×
417
  TdFilePtr pFile = NULL;
×
418

419
  pFile = taosOpenFile(fileName, TD_FILE_READ);
×
420
  if (pFile == NULL) {
×
421
    code = terrno;
×
422
    goto _OVER;
×
423
  }
424

425
  code = taosFStatFile(pFile, &contLen, NULL);
×
426
  if (code != 0) goto _OVER;
×
427

428
  pCont = taosMemoryMalloc(contLen + 1);
×
429
  if (pCont == NULL) {
×
430
    code = TSDB_CODE_OUT_OF_MEMORY;
×
431
    goto _OVER;
×
432
  }
433

434
  if (taosReadFile(pFile, pCont, contLen) != contLen) {
×
435
    code = terrno;
×
436
    goto _OVER;
×
437
  }
438

439
  pCont[contLen] = '\0';
×
440

441
_OVER:
×
442
  if (code == 0) {
×
443
    *ppCont = pCont;
×
444
    *pContLen = contLen;
×
445
  } else {
446
    if (pCont != NULL) taosMemoryFree(pCont);
×
447
  }
448
  if (pFile != NULL) taosCloseFile(&pFile);
×
449
  return code;
×
450
}
451

452
static int32_t taosAnalyJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
×
453
  char    buf[64] = {0};
×
454
  int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %" PRId64 ",\n", optName, optVal);
×
455
  if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
×
456
    return terrno;
×
457
  }
458
  return 0;
×
459
}
460

461
static int32_t taosAnalyJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
×
462
  int32_t code = 0;
×
463
  int32_t keyLen = strlen(optName);
×
464
  int32_t valLen = strlen(optVal);
×
465

466
  int32_t totalLen = keyLen + valLen + 20;
×
467
  char *  buf = taosMemoryMalloc(totalLen);
×
468
  if (buf == NULL) {
×
469
    uError("failed to prepare the buffer for serializing the key/value info for analysis, len:%d, code:%s", totalLen,
×
470
           tstrerror(terrno));
471
    return terrno;
×
472
  }
473

474
  int32_t bufLen = tsnprintf(buf, totalLen, "\"%s\": \"%s\",\n", optName, optVal);
×
475
  if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
×
476
    code = terrno;
×
477
  }
478

479
  taosMemoryFree(buf);
×
480
  return code;
×
481
}
482

483
static int32_t taosAnalyJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
×
484
  char    buf[128] = {0};
×
485
  int32_t bufLen = tsnprintf(buf, sizeof(buf), "\"%s\": %f,\n", optName, optVal);
×
486
  if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
×
487
    return terrno;
×
488
  }
489
  return 0;
×
490
}
491

492
static int32_t taosAnalyJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) {
×
493
  if (bufLen <= 0) {
×
494
    bufLen = strlen(buf);
×
495
  }
496
  if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
×
497
    return terrno;
×
498
  }
499
  return 0;
×
500
}
501

502
static int32_t taosAnalyJsonBufWriteStart(SAnalyticBuf *pBuf) { return taosAnalyJsonBufWriteStr(pBuf, "{\n", 0); }
×
503

504
static int32_t tsosAnalyJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols, const char* pId) {
×
505
  pBuf->filePtr = taosOpenFile(pBuf->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
×
506
  if (pBuf->filePtr == NULL) {
×
507
    return terrno;
×
508
  }
509

510
  pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalyticsColBuf));
×
511
  if (pBuf->pCols == NULL) return TSDB_CODE_OUT_OF_MEMORY;
×
512
  pBuf->numOfCols = numOfCols;
×
513

514
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) {
×
515
    return taosAnalyJsonBufWriteStart(pBuf);
×
516
  }
517

518
  pBuf->pid = pId;
×
519

520
  for (int32_t i = 0; i < numOfCols; ++i) {
×
521
    SAnalyticsColBuf *pCol = &pBuf->pCols[i];
×
522
    snprintf(pCol->fileName, sizeof(pCol->fileName), "%s-c%d-%p", pBuf->fileName, i, pBuf);
×
523
    pCol->filePtr =
×
524
        taosOpenFile(pCol->fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
×
525
    if (pCol->filePtr == NULL) {
×
526
      uError("%s failed to open tmp file for keep forecast history data, code:%s", pId, tstrerror(terrno));
×
527
      return terrno;
×
528
    }
529
  }
530

531
  return taosAnalyJsonBufWriteStart(pBuf);
×
532
}
533

534
static int32_t taosAnalyJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
×
535
  char buf[128] = {0};
×
536
  bool first = (colIndex == 0);
×
537
  bool last = (colIndex == pBuf->numOfCols - 1);
×
538

539
  if (first) {
×
540
    if (taosAnalyJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) {
×
541
      return terrno;
×
542
    }
543
  }
544

545
  int32_t bufLen = tsnprintf(buf, sizeof(buf), "  [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name,
×
546
                             tDataTypes[colType].bytes, last ? "" : ",");
×
547
  if (taosWriteFile(pBuf->filePtr, buf, bufLen) != bufLen) {
×
548
    return terrno;
×
549
  }
550

551
  if (last) {
×
552
    if (taosAnalyJsonBufWriteStr(pBuf, "],\n", 0) != 0) {
×
553
      return terrno;
×
554
    }
555
  }
556

557
  return 0;
×
558
}
559

560
static int32_t taosAnalyJsonBufWriteDataBegin(SAnalyticBuf *pBuf) {
×
561
  return taosAnalyJsonBufWriteStr(pBuf, "\"data\": [\n", 0);
×
562
}
563

564
static int32_t taosAnalyJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) {
×
565
  if (bufLen <= 0) {
×
566
    bufLen = strlen(buf);
×
567
  }
568

569
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) {
×
570
    int32_t ret = taosWriteFile(pBuf->filePtr, buf, bufLen);
×
571
    if (ret != bufLen) {
×
572
      return terrno;
×
573
    }
574
  } else {
575
    int32_t ret = taosWriteFile(pBuf->pCols[colIndex].filePtr, buf, bufLen);
×
576
    if (ret != bufLen) {
×
577
      return terrno;
×
578
    }
579
  }
580

581
  return 0;
×
582
}
583

584
static int32_t taosAnalyJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
×
585
  return taosAnalyJsonBufWriteStrUseCol(pBuf, "[\n", 0, colIndex);
×
586
}
587

588
static int32_t taosAnalyJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
×
589
  if (colIndex == pBuf->numOfCols - 1) {
×
590
    return taosAnalyJsonBufWriteStrUseCol(pBuf, "\n]\n", 0, colIndex);
×
591
  } else {
UNCOV
592
    return taosAnalyJsonBufWriteStrUseCol(pBuf, "\n],\n", 0, colIndex);
×
593
  }
594
}
595

596
static int32_t taosAnalyJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
×
597
  char    buf[64];
×
598
  int32_t bufLen = 0;
×
599

600
  if (pBuf->pCols[colIndex].numOfRows != 0) {
×
601
    buf[bufLen] = ',';
×
602
    buf[bufLen + 1] = '\n';
×
603
    buf[bufLen + 2] = 0;
×
604
    bufLen += 2;
×
605
  }
606

607
  switch (colType) {
×
608
    case TSDB_DATA_TYPE_BOOL:
×
609
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0);
×
610
      break;
×
611
    case TSDB_DATA_TYPE_TINYINT:
×
612
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int8_t *)colValue);
×
613
      break;
×
614
    case TSDB_DATA_TYPE_UTINYINT:
×
615
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint8_t *)colValue);
×
616
      break;
×
617
    case TSDB_DATA_TYPE_SMALLINT:
×
618
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int16_t *)colValue);
×
619
      break;
×
620
    case TSDB_DATA_TYPE_USMALLINT:
×
621
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint16_t *)colValue);
×
622
      break;
×
623
    case TSDB_DATA_TYPE_INT:
×
624
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int32_t *)colValue);
×
625
      break;
×
626
    case TSDB_DATA_TYPE_UINT:
×
627
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint32_t *)colValue);
×
628
      break;
×
629
    case TSDB_DATA_TYPE_BIGINT:
×
630
    case TSDB_DATA_TYPE_TIMESTAMP:
631
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRId64, *(int64_t *)colValue);
×
632
      break;
×
633
    case TSDB_DATA_TYPE_UBIGINT:
×
634
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRIu64, *(uint64_t *)colValue);
×
635
      break;
×
636
    case TSDB_DATA_TYPE_FLOAT:
×
637
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_FLOAT_VAL(colValue));
×
638
      break;
×
639
    case TSDB_DATA_TYPE_DOUBLE:
×
640
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_DOUBLE_VAL(colValue));
×
641
      break;
×
642
    default:
×
643
      buf[bufLen] = '\0';
×
644
  }
645

646
  pBuf->pCols[colIndex].numOfRows++;
×
647
  return taosAnalyJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex);
×
648
}
649

650
static int32_t taosAnalyJsonBufWriteDataEnd(SAnalyticBuf *pBuf) {
×
651
  int32_t code = 0;
×
652
  char   *pCont = NULL;
×
653
  int64_t contLen = 0;
×
654

655
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
656
    for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
×
657
      SAnalyticsColBuf *pCol = &pBuf->pCols[i];
×
658

659
      code = taosFsyncFile(pCol->filePtr);
×
660
      if (code != 0) return code;
×
661

662
      code = taosCloseFile(&pCol->filePtr);
×
663
      if (code != 0) return code;
×
664

665
      code = taosAnalyJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen);
×
666
      if (code != 0) return code;
×
667

668
      code = taosAnalyJsonBufWriteStr(pBuf, pCont, contLen);
×
669
      if (code != 0) return code;
×
670

671
      taosMemoryFreeClear(pCont);
×
672
      contLen = 0;
×
673
    }
674
  }
675

676
  return taosAnalyJsonBufWriteStr(pBuf, "],\n", 0);
×
677
}
678

679
static int32_t taosAnalyJsonBufWriteEnd(SAnalyticBuf *pBuf) {
×
680
  int32_t code = taosAnalyJsonBufWriteOptInt(pBuf, "rows", pBuf->pCols[0].numOfRows);
×
681
  if (code != 0) return code;
×
682

683
  return taosAnalyJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0);
×
684
}
685

686
int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) {
×
687
  int32_t code = taosAnalyJsonBufWriteEnd(pBuf);
×
688
  if (code != 0) return code;
×
689

690
  if (pBuf->filePtr != NULL) {
×
691
    code = taosFsyncFile(pBuf->filePtr);
×
692
    if (code != 0) return code;
×
693
    code = taosCloseFile(&pBuf->filePtr);
×
694
    if (code != 0) return code;
×
695
  }
696

697
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
698
    for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
×
699
      SAnalyticsColBuf *pCol = &pBuf->pCols[i];
×
700
      if (pCol->filePtr != NULL) {
×
701
        code = taosFsyncFile(pCol->filePtr);
×
702
        if (code != 0) return code;
×
703
        code = taosCloseFile(&pCol->filePtr);
×
704
        if (code != 0) return code;
×
705
      }
706
    }
707
  }
708

709
  return 0;
×
710
}
711

712
void taosAnalyBufDestroy(SAnalyticBuf *pBuf) {
×
713
  if (pBuf->fileName[0] != 0) {
×
714
    if (pBuf->filePtr != NULL) (void)taosCloseFile(&pBuf->filePtr);
×
715
    if (taosRemoveFile(pBuf->fileName) != 0) {
×
716
      uError("%s failed to remove file:%s, code:%s", pBuf->pid, pBuf->fileName, tstrerror(terrno));
×
717
    }
718
    pBuf->fileName[0] = 0;
×
719
  }
720

721
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
722
    for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
×
723
      SAnalyticsColBuf *pCol = &pBuf->pCols[i];
×
724
      if (pCol->fileName[0] != 0) {
×
725
        if (pCol->filePtr != NULL) (void)taosCloseFile(&pCol->filePtr);
×
726
        if (taosRemoveFile(pCol->fileName) != 0) {
×
727
          uError("%s failed to remove file:%s, code:%s", pBuf->pid, pCol->fileName, tstrerror(terrno));
×
728
        }
729
        pCol->fileName[0] = 0;
×
730
      }
731
    }
732
  }
733

734
  taosMemoryFreeClear(pBuf->pCols);
×
735
  pBuf->numOfCols = 0;
×
736
}
×
737

738
int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols, const char* pId) {
×
739
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
740
    return tsosAnalyJsonBufOpen(pBuf, numOfCols, pId);
×
741
  } else {
742
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
743
  }
744
}
745

746
int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
×
747
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
748
    return taosAnalyJsonBufWriteOptStr(pBuf, optName, optVal);
×
749
  } else {
750
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
751
  }
752
}
753

754
int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
×
755
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
756
    return taosAnalyJsonBufWriteOptInt(pBuf, optName, optVal);
×
757
  } else {
758
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
759
  }
760
}
761

762
int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
×
763
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
764
    return taosAnalyJsonBufWriteOptFloat(pBuf, optName, optVal);
×
765
  } else {
766
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
767
  }
768
}
769

770
int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
×
771
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
772
    return taosAnalyJsonBufWriteColMeta(pBuf, colIndex, colType, colName);
×
773
  } else {
774
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
775
  }
776
}
777

778
int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) {
×
779
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
780
    return taosAnalyJsonBufWriteDataBegin(pBuf);
×
781
  } else {
782
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
783
  }
784
}
785

786
int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
×
787
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
788
    return taosAnalyJsonBufWriteColBegin(pBuf, colIndex);
×
789
  } else {
790
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
791
  }
792
}
793

794
int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
×
795
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
796
    return taosAnalyJsonBufWriteColData(pBuf, colIndex, colType, colValue);
×
797
  } else {
798
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
799
  }
800
}
801

802
int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
×
803
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
804
    return taosAnalyJsonBufWriteColEnd(pBuf, colIndex);
×
805
  } else {
806
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
807
  }
808
}
809

810
int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) {
×
811
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
812
    return taosAnalyJsonBufWriteDataEnd(pBuf);
×
813
  } else {
814
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
815
  }
816
}
817

818
int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) {
×
819
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
820
    return taosAnalJsonBufClose(pBuf);
×
821
  } else {
822
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
823
  }
824
}
825

826
static int32_t taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) {
×
827
  *ppCont = NULL;
×
828
  *pContLen = 0;
×
829

830
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON || pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
831
    return taosAnalyJsonBufGetCont(pBuf->fileName, ppCont, pContLen);
×
832
  } else {
833
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
×
834
  }
835
}
836

837
// extract the timeout parameter
838
int64_t taosAnalysisParseTimout(SHashObj* pHashMap, const char* id) {
×
839
  int32_t code = 0;
×
840
  char* pTimeout = taosHashGet(pHashMap, ALGO_OPT_TIMEOUT_NAME, strlen(ALGO_OPT_TIMEOUT_NAME));
×
841
  if (pTimeout == NULL) {
×
842
    uDebug("%s not set the timeout val, set default:%d", id, ANALY_DEFAULT_TIMEOUT);
×
843
    return ANALY_DEFAULT_TIMEOUT;
×
844
  } else {
845
    int64_t t = taosStr2Int64(pTimeout, NULL, 10);
×
846
    if (t <= 0 || t > ANALY_MAX_TIMEOUT) {
×
847
      uDebug("%s timeout val:%" PRId64 "s is invalid (greater than 10min or less than 1s), use default:%dms", id, t,
×
848
             ANALY_DEFAULT_TIMEOUT);
849
      return ANALY_DEFAULT_TIMEOUT;
×
850
    }
851

852
    uDebug("%s timeout val is set to: %" PRId64 "s", id, t);
×
853
    return t;
×
854
  }
855
}
856

857
int32_t taosAnalysisParseAlgo(const char* pOpt, char* pAlgoName, char* pUrl, int32_t type, int32_t len, SHashObj* pHashMap, const char* id) {
×
858
  char* pAlgo = taosHashGet(pHashMap, ALGO_OPT_ALGO_NAME, strlen(ALGO_OPT_ALGO_NAME));
×
859
  if (pAlgo == NULL) {
×
860
    uError("%s failed to get analysis algorithm name from %s", id, pOpt);
×
861
    return TSDB_CODE_ANA_ALGO_NOT_FOUND;
×
862
  }
863

864
  tstrncpy(pAlgoName, pAlgo, taosHashGetValueSize(pAlgo) + 1);
×
865

866
  if (taosAnalyGetAlgoUrl(pAlgoName, type, pUrl, len) != 0) {
×
867
    uError("%s failed to get analysis algorithm url from %s", id, pAlgoName);
×
868
    return TSDB_CODE_ANA_ALGO_NOT_LOAD;
×
869
  }
870

871
  return 0;
×
872
}
873

874
int8_t taosAnalysisParseWncheck(SHashObj* pHashMap, const char* id) {
×
875
  char* pWncheck = taosHashGet(pHashMap, ALGO_OPT_WNCHECK_NAME, strlen(ALGO_OPT_WNCHECK_NAME));
×
876
  if (pWncheck != NULL) {
×
877
    int32_t v = (int32_t) taosStr2Int64(pWncheck, NULL, 10);
×
878
    uDebug("%s analysis wncheck:%d", id, v);
×
879
    return v;
×
880
  } else {
881
    uDebug("%s analysis wncheck not found, use default:%d", id, ANALY_DEFAULT_WNCHECK);
×
882
    return ANALY_DEFAULT_WNCHECK;
×
883
  }
884
}
885

886
#else
887

888
int32_t taosAnalyticsInit() { return 0; }
889
void    taosAnalyticsCleanup() {}
890
SJson  *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout, const char* id) {
891
  return NULL;
892
}
893

894
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalyAlgoType type, char *url, int32_t urlLen) { return 0; }
895
bool    taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; }
896
int64_t taosAnalyGetVersion() { return 0; }
897
void    taosAnalyUpdate(int64_t newVer, SHashObj *pHash) {}
898

899
int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols, const char* id) { return 0; }
900
int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; }
901
int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; }
902
int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; }
903
int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
904
  return 0;
905
}
906
int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; }
907
int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
908
int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; }
909
int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
910
int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; }
911
int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) { return 0; }
912
void    taosAnalyBufDestroy(SAnalyticBuf *pBuf) {}
913

914
const char   *taosAnalysisAlgoType(EAnalyAlgoType algoType) { return 0; }
915
EAnalyAlgoType taosAnalAlgoInt(const char *algoName) { return 0; }
916
const char   *taosAnalAlgoUrlStr(EAnalyAlgoType algoType) { return 0; }
917

918
int64_t taosAnalysisParseTimout(SHashObj *pHashMap, const char *id) { return 0; }
919

920
int32_t taosAnalysisParseAlgo(const char *pOpt, char *pAlgoName, char *pUrl, int32_t type, int32_t len,
921
                              SHashObj *pHashMap, const char *id) {
922
  return 0;
923
}
924

925
int8_t taosAnalysisParseWncheck(SHashObj* pHashMap, const char* id) { return 0;}
926
int32_t taosAnalyGetOpts(const char *pOption, SHashObj **pOptHash) { return 0;}
927

928
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc