• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3842

07 Apr 2025 11:21AM UTC coverage: 62.696% (-0.3%) from 63.027%
#3842

push

travis-ci

web-flow
merge: from main to 3.0 branch (#30679)

154855 of 315075 branches covered (49.15%)

Branch coverage included in aggregate %.

6 of 8 new or added lines in 5 files covered. (75.0%)

2309 existing lines in 130 files now uncovered.

240176 of 314995 relevant lines covered (76.25%)

19119980.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.45
/source/common/src/tanalytics.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "tanalytics.h"
18
#include "ttypes.h"
19
#include "tutil.h"
20

21
#ifdef USE_ANALYTICS
22
#include <curl/curl.h>
23
#define ANALYTICS_ALOG_SPLIT_CHAR ","
24

25
typedef struct {
26
  int64_t       ver;
27
  SHashObj     *hash;  // algoname:algotype -> SAnalyticsUrl
28
  TdThreadMutex lock;
29
} SAlgoMgmt;
30

31
// Accumulates the body of an HTTP response as the curl write callback delivers it.
typedef struct {
32
  char   *data;     // heap buffer with the bytes received so far; the write callback keeps it NUL-terminated
33
  int64_t dataLen;  // number of payload bytes stored in data (terminator not counted)
34
} SCurlResp;
35

36
static SAlgoMgmt tsAlgos = {0};
37
static int32_t   taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen);
38

UNCOV
39
/*
 * Map an algorithm type to its display name.
 * Returns a string literal; any value other than the two known types yields "unknown".
 */
const char *taosAnalysisAlgoType(EAnalAlgoType type) {
  if (type == ANALY_ALGO_TYPE_ANOMALY_DETECT) {
    return "anomaly-detection";
  }
  if (type == ANALY_ALGO_TYPE_FORECAST) {
    return "forecast";
  }
  return "unknown";
}
49

UNCOV
50
/*
 * Map an algorithm type to the token used for it in URLs.
 * Note the anomaly token here ("anomaly-detect") differs from the display name.
 * Returns a string literal; unrecognized types yield "unknown".
 */
const char *taosAnalyAlgoUrlStr(EAnalAlgoType type) {
  if (type == ANALY_ALGO_TYPE_ANOMALY_DETECT) {
    return "anomaly-detect";
  }
  if (type == ANALY_ALGO_TYPE_FORECAST) {
    return "forecast";
  }
  return "unknown";
}
60

UNCOV
61
EAnalAlgoType taosAnalyAlgoInt(const char *name) {
×
UNCOV
62
  for (EAnalAlgoType i = 0; i < ANALY_ALGO_TYPE_END; ++i) {
×
UNCOV
63
    if (strcasecmp(name, taosAnalysisAlgoType(i)) == 0) {
×
UNCOV
64
      return i;
×
65
    }
66
  }
67

68
  return ANALY_ALGO_TYPE_END;
×
69
}
70

71
int32_t taosAnalyticsInit() {
2,335✔
72
  if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
2,335!
73
    uError("failed to init curl");
×
74
    return -1;
×
75
  }
76

77
  tsAlgos.ver = 0;
2,335✔
78
  if (taosThreadMutexInit(&tsAlgos.lock, NULL) != 0) {
2,335!
79
    uError("failed to init algo mutex");
×
80
    return -1;
×
81
  }
82

83
  tsAlgos.hash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
2,335✔
84
  if (tsAlgos.hash == NULL) {
2,335!
85
    uError("failed to init algo hash");
×
86
    return -1;
×
87
  }
88

89
  uInfo("analysis env is initialized");
2,335!
90
  return 0;
2,335✔
91
}
92

93
/*
 * Release an algorithm table: free the url string owned by every
 * SAnalyticsUrl entry, then destroy the hash itself.
 */
static void taosAnalyFreeHash(SHashObj *hash) {
  for (void *pIter = taosHashIterate(hash, NULL); pIter != NULL; pIter = taosHashIterate(hash, pIter)) {
    taosMemoryFree(((SAnalyticsUrl *)pIter)->url);
  }
  taosHashCleanup(hash);
}
2,335✔
102

103
void taosAnalyticsCleanup() {
2,335✔
104
  curl_global_cleanup();
2,335✔
105
  if (taosThreadMutexDestroy(&tsAlgos.lock) != 0) {
2,335!
106
    uError("failed to destroy anal lock");
×
107
  }
108
  taosAnalyFreeHash(tsAlgos.hash);
2,335✔
109
  tsAlgos.hash = NULL;
2,335✔
110
  uInfo("analysis env is cleaned up");
2,335!
111
}
2,335✔
112

UNCOV
113
/*
 * Install a new algorithm table if it is newer than the current one.
 *
 * Ownership of pHash always passes to this function: it either becomes the
 * active table or is freed here. The version check and the swap happen under
 * tsAlgos.lock so they are consistent with concurrent lookups; whichever
 * table ends up unused (the old one, or pHash when it is stale or the lock
 * could not be taken) is released after the lock is dropped.
 */
void taosAnalyUpdate(int64_t newVer, SHashObj *pHash) {
  SHashObj *pDiscard = pHash;  // stays pHash unless pHash gets installed

  if (taosThreadMutexLock(&tsAlgos.lock) == 0) {
    if (newVer > tsAlgos.ver) {
      pDiscard = tsAlgos.hash;
      tsAlgos.ver = newVer;
      tsAlgos.hash = pHash;
    }
    if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
      uError("failed to unlock hash");
    }
  } else {
    // Previously pHash was leaked on this path.
    uError("failed to lock hash");
  }

  taosAnalyFreeHash(pDiscard);
}
×
128

UNCOV
129
/*
 * Look up `optName` in a comma-separated "name=value" option string.
 *
 * Returns false when optName does not occur in `option`, or when the matched
 * item contains no '='. When optMaxLen <= 0 nothing is copied and the result
 * only tells whether optName occurs. Otherwise the trimmed value is copied
 * into optValue, truncated to fit optMaxLen bytes including the terminator.
 * The match is a plain substring search starting at the first occurrence.
 */
bool taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) {
  const char *pFound = strstr(option, optName);
  if (pFound == NULL) {
    return false;
  }
  if (optMaxLen <= 0) {
    return true;
  }

  // Copy the single "name=value" item (up to the next separator) into a scratch buffer.
  char        item[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0};
  const char *pSplit = strstr(pFound, ANALYTICS_ALOG_SPLIT_CHAR);
  int32_t     copyLen = 0;
  if (pSplit != NULL && pSplit > pFound) {
    copyLen = MIN((int32_t)(pSplit - pFound) + 1, TSDB_ANALYTIC_ALGO_OPTION_LEN);
  } else {
    copyLen = MIN(tListLen(item), strlen(pFound) + 1);
  }
  tstrncpy(item, pFound, copyLen);

  char *pValue = strstr(item, "=");
  if (pValue == NULL) {
    return false;
  }
  pValue += 1;  // step past '='

  (void)strtrim(pValue);

  int32_t valueLen = MIN(optMaxLen, strlen(pValue) + 1);
  tstrncpy(optValue, pValue, valueLen);
  return true;
}
165

UNCOV
166
/*
 * Read an integer option from a "name=value" option string.
 *
 * Searches `option` for the text "<optName>=" and converts what follows it
 * with taosStr2Int64() in base 10. Returns true and stores the result in
 * *optValue when the key is present, false (leaving *optValue untouched)
 * otherwise. The key is located by substring search.
 */
bool taosAnalyGetOptInt(const char *option, const char *optName, int64_t *optValue) {
  char    key[TSDB_ANALYTIC_ALGO_OPTION_LEN] = {0};
  int32_t keyLen = tsnprintf(key, sizeof(key), "%s=", optName);

  const char *pKey = strstr(option, key);
  if (pKey == NULL) {
    return false;
  }

  *optValue = taosStr2Int64(pKey + keyLen, NULL, 10);
  return true;
}
179

UNCOV
180
/*
 * Look up the service URL registered for an algorithm.
 *
 * The lookup key is "<type>:<algoName>" lower-cased. On success the URL is
 * copied into `url` (at most urlLen bytes) and 0 is returned. When the
 * algorithm is unknown, `url` is set to the empty string, terrno is set and
 * TSDB_CODE_ANA_ALGO_NOT_FOUND is returned. If the table lock cannot be
 * taken the lookup is reported as failed the same way, instead of returning
 * success with `url` left unwritten.
 */
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) {
  int32_t code = 0;
  char    name[TSDB_ANALYTIC_ALGO_KEY_LEN] = {0};
  // presumably tsnprintf returns the number of characters written, so nameLen covers the terminator too
  int32_t nameLen = 1 + tsnprintf(name, sizeof(name) - 1, "%d:%s", type, algoName);

  (void)strntolower(name, name, nameLen);

  if (taosThreadMutexLock(&tsAlgos.lock) != 0) {
    url[0] = 0;
    terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND;
    uError("algo:%s, type:%s, failed to lock hash", algoName, taosAnalysisAlgoType(type));
    return terrno;
  }

  // NOTE(review): the acquired entry is never handed back to the hash; confirm whether
  // taosHashAcquire needs a matching release call.
  SAnalyticsUrl *pUrl = taosHashAcquire(tsAlgos.hash, name, nameLen);
  if (pUrl != NULL) {
    tstrncpy(url, pUrl->url, urlLen);
    uDebug("algo:%s, type:%s, url:%s", algoName, taosAnalysisAlgoType(type), url);
  } else {
    url[0] = 0;
    terrno = TSDB_CODE_ANA_ALGO_NOT_FOUND;
    code = terrno;
    uError("algo:%s, type:%s, url not found", algoName, taosAnalysisAlgoType(type));
  }

  if (taosThreadMutexUnlock(&tsAlgos.lock) != 0) {
    uError("failed to unlock hash");
    // Error code kept as-is for existing callers.
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return code;
}
207

208
/* Version of the currently installed algorithm table (read without taking the table lock). */
int64_t taosAnalyGetVersion() {
  return tsAlgos.ver;
}
162,365✔
209

UNCOV
210
static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) {
×
UNCOV
211
  SCurlResp *pRsp = userdata;
×
UNCOV
212
  if (contLen == 0 || nmemb == 0 || pCont == NULL) {
×
213
    pRsp->dataLen = 0;
×
214
    pRsp->data = NULL;
×
215
    uError("curl response is received, len:%" PRId64, pRsp->dataLen);
×
216
    return 0;
×
217
  }
218

UNCOV
219
  int64_t newDataSize = (int64_t)contLen * nmemb;
×
UNCOV
220
  int64_t size = pRsp->dataLen + newDataSize;
×
221

UNCOV
222
  if (pRsp->data == NULL) {
×
UNCOV
223
    pRsp->data = taosMemoryMalloc(size + 1);
×
UNCOV
224
    if (pRsp->data == NULL) {
×
225
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
226
      return 0;  // return the recv length, if failed, return 0
×
227
    }
228
  } else {
229
    char *p = taosMemoryRealloc(pRsp->data, size + 1);
×
230
    if (p == NULL) {
×
231
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
232
      return 0;  // return the recv length, if failed, return 0
×
233
    }
234

235
    pRsp->data = p;
×
236
  }
237

UNCOV
238
  if (pRsp->data != NULL) {
×
UNCOV
239
    (void)memcpy(pRsp->data + pRsp->dataLen, pCont, newDataSize);
×
240

UNCOV
241
    pRsp->dataLen = size;
×
UNCOV
242
    pRsp->data[size] = 0;
×
243

UNCOV
244
    uDebugL("curl response is received, len:%" PRId64 ", content:%s", size, pRsp->data);
×
UNCOV
245
    return newDataSize;
×
246
  } else {
247
    pRsp->dataLen = 0;
×
248
    uError("failed to malloc curl response");
×
249
    return 0;
×
250
  }
251
}
252

UNCOV
253
/*
 * Perform an HTTP GET on `url` with a 100 ms timeout, collecting the body in
 * *pRsp through taosCurlWriteData.
 *
 * Returns 0 (CURLE_OK) on success, -1 if no curl handle could be created,
 * otherwise the CURLcode of the step that failed. A failing
 * curl_easy_setopt() is reported to the caller instead of returning 0
 * without sending the request.
 */
static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp) {
  CURL *curl = curl_easy_init();
  if (curl == NULL) {
    uError("failed to create curl handle");
    return -1;
  }

  CURLcode code = CURLE_OK;
  if ((code = curl_easy_setopt(curl, CURLOPT_URL, url)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp)) != CURLE_OK) goto _OVER;
  // Numeric options are read as long by libcurl, so pass a long explicitly.
  if ((code = curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, 100L)) != CURLE_OK) goto _OVER;

  uDebug("curl get request will sent, url:%s", url);
  code = curl_easy_perform(curl);
  if (code != CURLE_OK) {
    uError("failed to perform curl action, code:%d", code);
  }

_OVER:
  curl_easy_cleanup(curl);
  return code;
}
278

UNCOV
279
/*
 * POST `bufLen` bytes of JSON from `buf` to `url`, collecting the response
 * body in *pRsp through taosCurlWriteData. `timeout` is in milliseconds.
 *
 * Returns 0 (CURLE_OK) on success, -1 if no curl handle could be created,
 * otherwise the CURLcode of the step that failed. Failures while preparing
 * the request (header list allocation, curl_easy_setopt) are reported to the
 * caller instead of returning 0 without sending anything.
 */
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout) {
  struct curl_slist *headers = NULL;
  CURLcode           code = CURLE_OK;

  CURL *curl = curl_easy_init();
  if (curl == NULL) {
    uError("failed to create curl handle");
    return -1;
  }

  headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8");
  if (headers == NULL) {
    // Without the list the request would go out with libcurl's default content type.
    code = CURLE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if ((code = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_URL, url)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp)) != CURLE_OK) goto _OVER;
  // Numeric options are read as long by libcurl; cast so the variadic call passes the right width.
  if ((code = curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, (long)timeout)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_POST, 1L)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)bufLen)) != CURLE_OK) goto _OVER;
  if ((code = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf)) != CURLE_OK) goto _OVER;

  uDebugL("curl post request will sent, url:%s len:%d content:%s", url, bufLen, buf);
  code = curl_easy_perform(curl);
  if (code != CURLE_OK) {
    uError("failed to perform curl action, code:%d", code);
  }

_OVER:
  curl_slist_free_all(headers);
  curl_easy_cleanup(curl);
  return code;
}
313

UNCOV
314
/*
 * Send a request to an analytics node and parse the reply as JSON.
 *
 * For ANALYTICS_HTTP_TYPE_GET a plain GET is issued; for any other type the
 * content of pBuf is loaded and POSTed with the given timeout. Returns the
 * parsed JSON, or NULL with terrno describing the failure (buffer could not
 * be read, URL not reachable, empty reply, or reply that is not JSON; a
 * reply starting with '<' is reported as an anode error).
 */
SJson *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) {
  SCurlResp rsp = {0};
  SJson    *pResult = NULL;
  char     *pBody = NULL;
  int64_t   bodyLen = 0;

  if (type != ANALYTICS_HTTP_TYPE_GET) {
    int32_t code = taosAnalyBufGetCont(pBuf, &pBody, &bodyLen);
    if (code != 0) {
      terrno = code;
      goto _OVER;
    }
    if (taosCurlPostRequest(url, &rsp, pBody, bodyLen, timeout) != 0) {
      terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
      goto _OVER;
    }
  } else if (taosCurlGetRequest(url, &rsp) != 0) {
    terrno = TSDB_CODE_ANA_URL_CANT_ACCESS;
    goto _OVER;
  }

  if (rsp.data == NULL || rsp.dataLen == 0) {
    terrno = TSDB_CODE_ANA_URL_RSP_IS_NULL;
    goto _OVER;
  }

  pResult = tjsonParse(rsp.data);
  if (pResult == NULL) {
    // A reply beginning with '<' is treated as an error returned by the node rather than bad JSON.
    terrno = (rsp.data[0] == '<') ? TSDB_CODE_ANA_ANODE_RETURN_ERROR : TSDB_CODE_INVALID_JSON_FORMAT;
  }

_OVER:
  if (rsp.data != NULL) taosMemoryFreeClear(rsp.data);
  if (pBody != NULL) taosMemoryFree(pBody);
  return pResult;
}
358

UNCOV
359
/*
 * Read a whole file into a freshly allocated, NUL-terminated buffer.
 *
 * On success returns 0, stores the buffer in *ppCont (caller frees it with
 * taosMemoryFree) and the file size in *pContLen. On failure returns a
 * non-zero code and leaves both outputs untouched. A short read is always
 * reported as a failure, even if terrno was not set by the read.
 */
static int32_t taosAnalyJsonBufGetCont(const char *fileName, char **ppCont, int64_t *pContLen) {
  int32_t   code = 0;
  int64_t   contLen = 0;
  char     *pCont = NULL;
  TdFilePtr pFile = NULL;

  pFile = taosOpenFile(fileName, TD_FILE_READ);
  if (pFile == NULL) {
    code = terrno;
    goto _OVER;
  }

  code = taosFStatFile(pFile, &contLen, NULL);
  if (code != 0) goto _OVER;

  pCont = taosMemoryMalloc(contLen + 1);
  if (pCont == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  if (taosReadFile(pFile, pCont, contLen) != contLen) {
    // Never report success for a partial read: the buffer would be handed out unterminated.
    code = (terrno != 0) ? terrno : -1;
    goto _OVER;
  }

  pCont[contLen] = '\0';

_OVER:
  if (code == 0) {
    *ppCont = pCont;
    *pContLen = contLen;
  } else {
    if (pCont != NULL) taosMemoryFree(pCont);
  }
  if (pFile != NULL) (void)taosCloseFile(&pFile);
  return code;
}
397

UNCOV
398
/*
 * Append one `"name": <integer>,` line to the buffer's main file.
 * Returns 0 on success, terrno if the write did not store the whole line.
 */
static int32_t taosAnalyJsonBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
  char    line[64] = {0};
  int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": %" PRId64 ",\n", optName, optVal);
  return (taosWriteFile(pBuf->filePtr, line, lineLen) != lineLen) ? terrno : 0;
}
406

UNCOV
407
/*
 * Append one `"name": "value",` line to the buffer's main file.
 * The line is formatted into a 128-byte scratch buffer; the value is written
 * as given, without JSON escaping.
 * Returns 0 on success, terrno if the write did not store the whole line.
 */
static int32_t taosAnalyJsonBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
  char    line[128] = {0};
  int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": \"%s\",\n", optName, optVal);
  return (taosWriteFile(pBuf->filePtr, line, lineLen) != lineLen) ? terrno : 0;
}
415

416
/*
 * Append one `"name": <float>,` line (formatted with %f) to the buffer's main file.
 * Returns 0 on success, terrno if the write did not store the whole line.
 */
static int32_t taosAnalyJsonBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
  char    line[128] = {0};
  int32_t lineLen = tsnprintf(line, sizeof(line), "\"%s\": %f,\n", optName, optVal);
  return (taosWriteFile(pBuf->filePtr, line, lineLen) != lineLen) ? terrno : 0;
}
424

UNCOV
425
/*
 * Append raw text to the buffer's main file.
 * A bufLen <= 0 means "buf is a C string, use its length".
 * Returns 0 on success, terrno if the write did not store every byte.
 */
static int32_t taosAnalyJsonBufWriteStr(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen) {
  int32_t nbytes = (bufLen > 0) ? bufLen : (int32_t)strlen(buf);
  return (taosWriteFile(pBuf->filePtr, buf, nbytes) != nbytes) ? terrno : 0;
}
434

UNCOV
435
/* Write the opening brace of the JSON document to the main file. */
static int32_t taosAnalyJsonBufWriteStart(SAnalyticBuf *pBuf) {
  return taosAnalyJsonBufWriteStr(pBuf, "{\n", 0);
}
×
436

UNCOV
437
/*
 * Open a JSON buffer for writing: create/truncate the main file, allocate
 * the per-column bookkeeping array and, for buffer types other than plain
 * JSON, create one "<fileName>-c<i>" file per column. Finishes by writing
 * the opening brace to the main file.
 * Returns 0 on success, terrno or TSDB_CODE_OUT_OF_MEMORY on failure;
 * resources acquired before a failure are not released here.
 */
static int32_t tsosAnalyJsonBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
  const int32_t openFlags = TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH;

  pBuf->filePtr = taosOpenFile(pBuf->fileName, openFlags);
  if (pBuf->filePtr == NULL) {
    return terrno;
  }

  pBuf->pCols = taosMemoryCalloc(numOfCols, sizeof(SAnalyticsColBuf));
  if (pBuf->pCols == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pBuf->numOfCols = numOfCols;

  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON) {
    // Column-wise layout: every column is staged in its own file.
    for (int32_t col = 0; col < numOfCols; ++col) {
      SAnalyticsColBuf *pColBuf = &pBuf->pCols[col];
      snprintf(pColBuf->fileName, sizeof(pColBuf->fileName), "%s-c%d", pBuf->fileName, col);
      pColBuf->filePtr = taosOpenFile(pColBuf->fileName, openFlags);
      if (pColBuf->filePtr == NULL) {
        return terrno;
      }
    }
  }

  return taosAnalyJsonBufWriteStart(pBuf);
}
463

UNCOV
464
/*
 * Write the schema entry `["name", "type", bytes]` for one column.
 * The first column also opens the `"schema": [` array and the last column
 * closes it, so the function is expected to be called once per column.
 * Returns 0 on success, terrno on a failed write.
 */
static int32_t taosAnalyJsonBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
  const bool isFirst = (colIndex == 0);
  const bool isLast = (colIndex == pBuf->numOfCols - 1);

  if (isFirst && taosAnalyJsonBufWriteStr(pBuf, "\"schema\": [\n", 0) != 0) {
    return terrno;
  }

  char        entry[128] = {0};
  const char *separator = isLast ? "" : ",";
  int32_t     entryLen = tsnprintf(entry, sizeof(entry), "  [\"%s\", \"%s\", %d]%s\n", colName, tDataTypes[colType].name,
                                   tDataTypes[colType].bytes, separator);
  if (taosWriteFile(pBuf->filePtr, entry, entryLen) != entryLen) {
    return terrno;
  }

  if (isLast && taosAnalyJsonBufWriteStr(pBuf, "],\n", 0) != 0) {
    return terrno;
  }

  return 0;
}
489

UNCOV
490
/* Open the `"data": [` array in the main file. */
static int32_t taosAnalyJsonBufWriteDataBegin(SAnalyticBuf *pBuf) {
  static const char kDataOpen[] = "\"data\": [\n";
  return taosAnalyJsonBufWriteStr(pBuf, kDataOpen, 0);
}
493

UNCOV
494
/*
 * Append text that belongs to column `colIndex`.
 * Plain JSON buffers write it to the main file; other buffer types write it
 * to that column's own file. A bufLen <= 0 means "use strlen(buf)".
 * Returns 0 on success, terrno if the write did not store every byte.
 */
static int32_t taosAnalyJsonBufWriteStrUseCol(SAnalyticBuf *pBuf, const char *buf, int32_t bufLen, int32_t colIndex) {
  if (bufLen <= 0) {
    bufLen = strlen(buf);
  }

  TdFilePtr pTarget =
      (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON) ? pBuf->filePtr : pBuf->pCols[colIndex].filePtr;

  int32_t written = taosWriteFile(pTarget, buf, bufLen);
  return (written == bufLen) ? 0 : terrno;
}
513

UNCOV
514
/* Open the value array of column `colIndex`. */
static int32_t taosAnalyJsonBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
  static const char kColOpen[] = "[\n";
  return taosAnalyJsonBufWriteStrUseCol(pBuf, kColOpen, 0, colIndex);
}
517

UNCOV
518
/*
 * Close the value array of column `colIndex`; every column except the last
 * is followed by a comma so the arrays form a valid JSON list.
 */
static int32_t taosAnalyJsonBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
  const bool  isLastCol = (colIndex == pBuf->numOfCols - 1);
  const char *closing = isLastCol ? "\n]\n" : "\n],\n";
  return taosAnalyJsonBufWriteStrUseCol(pBuf, closing, 0, colIndex);
}
526

UNCOV
527
/*
 * Append one value to column `colIndex`, formatted according to `colType`.
 *
 * Every value after the first in a column is preceded by ",\n". Numeric and
 * bool types are printed as JSON numbers (bool as 0/1, float/double with
 * %f); any other type contributes no text. The column's row counter is
 * incremented on every call.
 * Returns 0 on success, terrno if the write failed.
 *
 * The scratch buffer is sized for the longest "%f" rendering of a double
 * (about 317 characters for values near DBL_MAX) plus the separator; the
 * previous 64-byte buffer could not hold large-magnitude doubles in full.
 */
static int32_t taosAnalyJsonBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
  char    buf[384];
  int32_t bufLen = 0;

  if (pBuf->pCols[colIndex].numOfRows != 0) {
    buf[bufLen] = ',';
    buf[bufLen + 1] = '\n';
    buf[bufLen + 2] = 0;
    bufLen += 2;
  }

  switch (colType) {
    case TSDB_DATA_TYPE_BOOL:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", (*((int8_t *)colValue) == 1) ? 1 : 0);
      break;
    case TSDB_DATA_TYPE_TINYINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int8_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint8_t *)colValue);
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int16_t *)colValue);
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint16_t *)colValue);
      break;
    case TSDB_DATA_TYPE_INT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%d", *(int32_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%u", *(uint32_t *)colValue);
      break;
    case TSDB_DATA_TYPE_BIGINT:
    case TSDB_DATA_TYPE_TIMESTAMP:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRId64, *(int64_t *)colValue);
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%" PRIu64, *(uint64_t *)colValue);
      break;
    case TSDB_DATA_TYPE_FLOAT:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_FLOAT_VAL(colValue));
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      bufLen += tsnprintf(buf + bufLen, sizeof(buf) - bufLen, "%f", GET_DOUBLE_VAL(colValue));
      break;
    default:
      // Unsupported type: emit nothing beyond the separator, but keep buf a valid C string.
      buf[bufLen] = '\0';
  }

  pBuf->pCols[colIndex].numOfRows++;
  return taosAnalyJsonBufWriteStrUseCol(pBuf, buf, bufLen, colIndex);
}
580

UNCOV
581
/*
 * Finish the `"data"` array.
 *
 * For column-wise buffers each column file is synced, closed, read back and
 * appended to the main file in column order. The array is then closed with
 * "],\n". Returns 0 on success or the code of the first step that failed.
 * The content read back from a column file is released on every path; it
 * used to leak when appending it to the main file failed.
 */
static int32_t taosAnalyJsonBufWriteDataEnd(SAnalyticBuf *pBuf) {
  int32_t code = 0;
  char   *pCont = NULL;
  int64_t contLen = 0;

  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
    for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
      SAnalyticsColBuf *pCol = &pBuf->pCols[i];

      code = taosFsyncFile(pCol->filePtr);
      if (code != 0) return code;

      code = taosCloseFile(&pCol->filePtr);
      if (code != 0) return code;

      code = taosAnalyJsonBufGetCont(pBuf->pCols[i].fileName, &pCont, &contLen);
      if (code != 0) return code;

      code = taosAnalyJsonBufWriteStr(pBuf, pCont, contLen);

      // Release the column content before looking at the result so a failed write cannot leak it.
      taosMemoryFreeClear(pCont);
      contLen = 0;

      if (code != 0) return code;
    }
  }

  return taosAnalyJsonBufWriteStr(pBuf, "],\n", 0);
}
609

UNCOV
610
/*
 * Write the document trailer: the `"rows"` count (taken from the first
 * column's row counter) followed by the protocol version and closing brace.
 * Returns 0 on success or the code of the failed write.
 */
static int32_t taosAnalyJsonBufWriteEnd(SAnalyticBuf *pBuf) {
  int64_t numOfRows = pBuf->pCols[0].numOfRows;

  int32_t code = taosAnalyJsonBufWriteOptInt(pBuf, "rows", numOfRows);
  if (code == 0) {
    code = taosAnalyJsonBufWriteStr(pBuf, "\"protocol\": 1.0\n}", 0);
  }
  return code;
}
616

UNCOV
617
int32_t taosAnalJsonBufClose(SAnalyticBuf *pBuf) {
×
UNCOV
618
  int32_t code = taosAnalyJsonBufWriteEnd(pBuf);
×
UNCOV
619
  if (code != 0) return code;
×
620

UNCOV
621
  if (pBuf->filePtr != NULL) {
×
UNCOV
622
    code = taosFsyncFile(pBuf->filePtr);
×
UNCOV
623
    if (code != 0) return code;
×
UNCOV
624
    code = taosCloseFile(&pBuf->filePtr);
×
UNCOV
625
    if (code != 0) return code;
×
626
  }
627

UNCOV
628
  if (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL) {
×
UNCOV
629
    for (int32_t i = 0; i < pBuf->numOfCols; ++i) {
×
UNCOV
630
      SAnalyticsColBuf *pCol = &pBuf->pCols[i];
×
UNCOV
631
      if (pCol->filePtr != NULL) {
×
632
        code = taosFsyncFile(pCol->filePtr);
×
633
        if (code != 0) return code;
×
634
        code = taosCloseFile(&pCol->filePtr);
×
635
        if (code != 0) return code;
×
636
      }
637
    }
638
  }
639

UNCOV
640
  return 0;
×
641
}
642

UNCOV
643
/*
 * Release everything an analytics buffer holds.
 * The main file is closed (if open) and its name cleared, but the file is not
 * removed from disk. For column-wise buffers each column file is closed and
 * deleted. Finally the column array is freed and the column count reset.
 */
void taosAnalyBufDestroy(SAnalyticBuf *pBuf) {
  if (pBuf->fileName[0] != 0) {
    if (pBuf->filePtr != NULL) {
      (void)taosCloseFile(&pBuf->filePtr);
    }
    pBuf->fileName[0] = 0;
  }

  const bool hasColFiles = (pBuf->bufType == ANALYTICS_BUF_TYPE_JSON_COL);
  for (int32_t col = 0; hasColFiles && col < pBuf->numOfCols; ++col) {
    SAnalyticsColBuf *pColBuf = &pBuf->pCols[col];
    if (pColBuf->fileName[0] == 0) {
      continue;  // this column never got a file
    }

    if (pColBuf->filePtr != NULL) {
      (void)taosCloseFile(&pColBuf->filePtr);
    }
    if (taosRemoveFile(pColBuf->fileName) != 0) {
      uError("failed to remove file %s", pColBuf->fileName);
    }
    pColBuf->fileName[0] = 0;
  }

  taosMemoryFreeClear(pBuf->pCols);
  pBuf->numOfCols = 0;
}
×
666

UNCOV
667
/*
 * Open an analytics buffer with room for numOfCols columns.
 * Only the JSON and column-wise JSON buffer types are supported; any other
 * type yields TSDB_CODE_ANA_BUF_INVALID_TYPE.
 */
int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return tsosAnalyJsonBufOpen(pBuf, numOfCols);
}
674

UNCOV
675
/*
 * Write a string option to the buffer.
 * Returns TSDB_CODE_ANA_BUF_INVALID_TYPE for buffer types other than JSON / column-wise JSON.
 */
int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalyJsonBufWriteOptStr(pBuf, optName, optVal);
}
682

UNCOV
683
/*
 * Write an integer option to the buffer.
 * Returns TSDB_CODE_ANA_BUF_INVALID_TYPE for buffer types other than JSON / column-wise JSON.
 */
int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalyJsonBufWriteOptInt(pBuf, optName, optVal);
}
690

691
/*
 * Write a floating-point option to the buffer.
 * Returns TSDB_CODE_ANA_BUF_INVALID_TYPE for buffer types other than JSON / column-wise JSON.
 */
int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalyJsonBufWriteOptFloat(pBuf, optName, optVal);
}
698

UNCOV
699
/*
 * Write the schema entry for one column.
 * Returns TSDB_CODE_ANA_BUF_INVALID_TYPE for buffer types other than JSON / column-wise JSON.
 */
int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalyJsonBufWriteColMeta(pBuf, colIndex, colType, colName);
}
706

UNCOV
707
/*
 * Start the data section of the buffer.
 * Returns TSDB_CODE_ANA_BUF_INVALID_TYPE for buffer types other than JSON / column-wise JSON.
 */
int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalyJsonBufWriteDataBegin(pBuf);
}
714

UNCOV
715
/*
 * Start the value list of column colIndex.
 * Returns TSDB_CODE_ANA_BUF_INVALID_TYPE for buffer types other than JSON / column-wise JSON.
 */
int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalyJsonBufWriteColBegin(pBuf, colIndex);
}
722

UNCOV
723
/*
 * Append one value of type colType to column colIndex.
 * Returns TSDB_CODE_ANA_BUF_INVALID_TYPE for buffer types other than JSON / column-wise JSON.
 */
int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalyJsonBufWriteColData(pBuf, colIndex, colType, colValue);
}
730

UNCOV
731
/*
 * Finish the value list of column colIndex.
 * Returns TSDB_CODE_ANA_BUF_INVALID_TYPE for buffer types other than JSON / column-wise JSON.
 */
int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalyJsonBufWriteColEnd(pBuf, colIndex);
}
738

UNCOV
739
/*
 * Finish the data section of the buffer.
 * Returns TSDB_CODE_ANA_BUF_INVALID_TYPE for buffer types other than JSON / column-wise JSON.
 */
int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalyJsonBufWriteDataEnd(pBuf);
}
746

UNCOV
747
/*
 * Finalize the buffer and close its files.
 * Returns TSDB_CODE_ANA_BUF_INVALID_TYPE for buffer types other than JSON / column-wise JSON.
 */
int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) {
  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalJsonBufClose(pBuf);
}
754

UNCOV
755
/*
 * Load the full content of the buffer's main file.
 * The outputs are reset to NULL / 0 first, so they are well defined even
 * when the buffer type is rejected with TSDB_CODE_ANA_BUF_INVALID_TYPE.
 * On success the caller owns *ppCont.
 */
static int32_t taosAnalyBufGetCont(SAnalyticBuf *pBuf, char **ppCont, int64_t *pContLen) {
  *ppCont = NULL;
  *pContLen = 0;

  if (pBuf->bufType != ANALYTICS_BUF_TYPE_JSON && pBuf->bufType != ANALYTICS_BUF_TYPE_JSON_COL) {
    return TSDB_CODE_ANA_BUF_INVALID_TYPE;
  }
  return taosAnalyJsonBufGetCont(pBuf->fileName, ppCont, pContLen);
}
765

766
#else
767

768
int32_t taosAnalyticsInit() { return 0; }
769
void    taosAnalyticsCleanup() {}
770
SJson  *taosAnalySendReqRetJson(const char *url, EAnalyHttpType type, SAnalyticBuf *pBuf, int64_t timeout) { return NULL; }
771

772
int32_t taosAnalyGetAlgoUrl(const char *algoName, EAnalAlgoType type, char *url, int32_t urlLen) { return 0; }
773
bool    taosAnalyGetOptStr(const char *option, const char *optName, char *optValue, int32_t optMaxLen) { return true; }
774
bool    taosAnalyGetOptInt(const char *option, const char *optName, int64_t *optValue) { return true; }
775
int64_t taosAnalyGetVersion() { return 0; }
776
void    taosAnalyUpdate(int64_t newVer, SHashObj *pHash) {}
777

778
int32_t tsosAnalyBufOpen(SAnalyticBuf *pBuf, int32_t numOfCols) { return 0; }
779
int32_t taosAnalyBufWriteOptStr(SAnalyticBuf *pBuf, const char *optName, const char *optVal) { return 0; }
780
int32_t taosAnalyBufWriteOptInt(SAnalyticBuf *pBuf, const char *optName, int64_t optVal) { return 0; }
781
int32_t taosAnalyBufWriteOptFloat(SAnalyticBuf *pBuf, const char *optName, float optVal) { return 0; }
782
int32_t taosAnalyBufWriteColMeta(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, const char *colName) {
783
  return 0;
784
}
785
int32_t taosAnalyBufWriteDataBegin(SAnalyticBuf *pBuf) { return 0; }
786
int32_t taosAnalyBufWriteColBegin(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
787
int32_t taosAnalyBufWriteColData(SAnalyticBuf *pBuf, int32_t colIndex, int32_t colType, void *colValue) { return 0; }
788
int32_t taosAnalyBufWriteColEnd(SAnalyticBuf *pBuf, int32_t colIndex) { return 0; }
789
int32_t taosAnalyBufWriteDataEnd(SAnalyticBuf *pBuf) { return 0; }
790
int32_t taosAnalyBufClose(SAnalyticBuf *pBuf) { return 0; }
791
void    taosAnalyBufDestroy(SAnalyticBuf *pBuf) {}
792

793
const char   *taosAnalysisAlgoType(EAnalAlgoType algoType) { return 0; }
794
EAnalAlgoType taosAnalAlgoInt(const char *algoName) { return 0; }
795
const char   *taosAnalAlgoUrlStr(EAnalAlgoType algoType) { return 0; }
796

797
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc