• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 13135713772

04 Feb 2025 12:42PM UTC coverage: 75.074% (-0.03%) from 75.104%
13135713772

Pull #843

github

web-flow
Merge 7e4333581 into 85e3589c2
Pull Request #843: taos-tools Merge 3.0 to Main Branch

433 of 593 new or added lines in 6 files covered. (73.02%)

41 existing lines in 6 files now uncovered.

12457 of 16593 relevant lines covered (75.07%)

327996.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.35
/src/benchUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
char resEncodingChunk[] = "Encoding: chunked";
17
char succMessage[] = "succ";
18
char resHttp[] = "HTTP/1.1 ";
19
char resHttpOk[] = "HTTP/1.1 200 OK";
20
char influxHttpOk[] = "HTTP/1.1 204";
21
char opentsdbHttpOk[] = "HTTP/1.1 400";
22

23
FORCE_INLINE void* benchCalloc(size_t nmemb, size_t size, bool record) {
3,908,219✔
24
    void* ret = calloc(nmemb, size);
3,929,475✔
25
    if (NULL == ret) {
3,921,999✔
26
        errorPrint("%s", "failed to allocate memory\n");
×
27
        exit(EXIT_FAILURE);
×
28
    }
29
    if (record) {
3,929,475✔
30
        g_memoryUsage += nmemb * size;
158,567✔
31
    }
32
    return ret;
3,929,475✔
33
}
34

35
// Close `fp` if it is non-NULL; a NULL stream is ignored.
// The pointer is passed by value, so the caller's variable is not modified.
FORCE_INLINE void tmfclose(FILE *fp) {
    if (fp == NULL) {
        return;
    }
    fclose(fp);
}
9,838✔
41

42
// Release `buf`; passing NULL is harmless (free(NULL) is a no-op).
FORCE_INLINE void tmfree(void *buf) {
    free(buf);
}
61,834✔
47

48
FORCE_INLINE bool isRest(int32_t iface) { 
742✔
49
    return REST_IFACE == iface || SML_REST_IFACE == iface;
742✔
50
}
51

52
// Log `msg` through errorPrint and terminate the process with EXIT_FAILURE.
// This function does not return.
void ERROR_EXIT(const char *msg) {
    const char *text = msg;
    errorPrint("%s", text);
    exit(EXIT_FAILURE);
}
56

57
#ifdef WINDOWS
58
HANDLE g_stdoutHandle;
59
DWORD  g_consoleMode;
60

61
void setupForAnsiEscape(void) {
62
    DWORD mode = 0;
63
    g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE);
64

65
    if (g_stdoutHandle == INVALID_HANDLE_VALUE) {
66
        exit(GetLastError());
67
    }
68

69
    if (!GetConsoleMode(g_stdoutHandle, &mode)) {
70
        exit(GetLastError());
71
    }
72

73
    g_consoleMode = mode;
74

75
    // Enable ANSI escape codes
76
    mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
77

78
    if (!SetConsoleMode(g_stdoutHandle, mode)) {
79
        exit(GetLastError());
80
    }
81
}
82

83
void resetAfterAnsiEscape(void) {
84
    // Reset colors
85
    printf("\x1b[0m");
86

87
    // Reset console mode
88
    if (!SetConsoleMode(g_stdoutHandle, g_consoleMode)) {
89
        exit(GetLastError());
90
    }
91
}
92

93
unsigned int taosRandom() {
94
    unsigned int number;
95
    rand_s(&number);
96

97
    return number;
98
}
99
#else  // Not windows
100
void setupForAnsiEscape(void) {}
×
101

102
void resetAfterAnsiEscape(void) {
×
103
    // Reset colors
104
    printf("\x1b[0m");
×
105
}
×
106

107
FORCE_INLINE unsigned int taosRandom() { return (unsigned int)rand(); }
253,186,287✔
108
#endif
109

110
// Exchange the pointers stored at names[i] and names[j].
void swapItem(char** names, int32_t i, int32_t j ) {
    debugPrint("swap item i=%d (%s) j=%d (%s)\n", i, names[i], j, names[j]);
    char *atJ = names[j];
    names[j] = names[i];
    names[i] = atJ;
}
35✔
116

117
// Query up to `childTblCountOfSuperTbl` distinct child-table names of
// `dbName`.`stbName` and store each one, wrapped in backticks, as a newly
// allocated string in childTblNameOfSuperTbl[0..count-1]. When at least four
// names were fetched, the first half of the array is randomly swapped with
// entries from the second half.
//
// Returns 0 on success, -1 if the query fails or a row has an empty name.
// Names stored before a failure are left in the array.
int getAllChildNameOfSuperTable(TAOS *taos, char *dbName, char *stbName,
        char ** childTblNameOfSuperTbl,
        int64_t childTblCountOfSuperTbl) {
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
             "select distinct tbname from %s.`%s` limit %" PRId64 "",
            dbName, stbName, childTblCountOfSuperTbl);
    TAOS_RES *res = taos_query(taos, cmd);
    int32_t   code = taos_errno(res);
    int64_t   count = 0;
    if (code) {
        // NOTE(review): printErrCmdCodeStr presumably releases res
        // (queryDbExecCall relies on the same) - confirm.
        printErrCmdCodeStr(cmd, code, res);
        return -1;
    }
    TAOS_ROW row = NULL;
    // never store more names than the caller asked for
    while (count < childTblCountOfSuperTbl
            && (row = taos_fetch_row(res)) != NULL) {
        int32_t * lengths = taos_fetch_lengths(res);
        if (row[0] == NULL || lengths == NULL || lengths[0] <= 0
                || ((char *)row[0])[0] == '\0') {
            errorPrint("No.%" PRId64 " table return empty name\n",
                    count);
            // release the result set on this error path as well
            taos_free_result(res);
            return -1;
        }
        // the buffer holds: '`' + name + '`' + '\0'
        size_t nameLen = (size_t)lengths[0];
        if (nameLen > (size_t)TSDB_TABLE_NAME_LEN) {
            nameLen = (size_t)TSDB_TABLE_NAME_LEN;
        }
        char *quoted = benchCalloc(1, TSDB_TABLE_NAME_LEN + 3, true);
        quoted[0] = '`';
        memcpy(quoted + 1, row[0], nameLen);
        quoted[nameLen + 1] = '`';
        quoted[nameLen + 2] = '\0';
        childTblNameOfSuperTbl[count] = quoted;
        debugPrint("childTblNameOfSuperTbl[%" PRId64 "]: %s\n", count,
                childTblNameOfSuperTbl[count]);
        count++;
    }
    taos_free_result(res);

    // random swap order
    if (count < 4) {
        return 0;
    }

    int32_t swapCnt = count/2;
    for(int32_t i = 0; i < swapCnt; i++ ) {
        int32_t j = swapCnt + RD(swapCnt);
        swapItem(childTblNameOfSuperTbl, i, j);
    }
    return 0;
}
163

164
int convertHostToServAddr(char *host, uint16_t port,
279✔
165
        struct sockaddr_in *serv_addr) {
166
    if (!host) {
279✔
167
        errorPrint("%s", "convertHostToServAddr host is null.");
×
168
        return -1;
×
169
    }
170
    debugPrint("convertHostToServAddr(host: %s, port: %d)\n", host,
279✔
171
            port);
172
#ifdef WINDOWS
173
    WSADATA wsaData;
174
    int ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
175
    if (ret) {
176
        return ret;
177
    }
178
#endif
179
    struct hostent *server = gethostbyname(host);
279✔
180
    if ((server == NULL) || (server->h_addr == NULL)) {
279✔
181
        errorPrint("%s", "no such host");
×
182
        return -1;
×
183
    }
184
    memset(serv_addr, 0, sizeof(struct sockaddr_in));
279✔
185
    serv_addr->sin_family = AF_INET;
279✔
186
    serv_addr->sin_port = htons(port);
279✔
187

188
#ifdef WINDOWS
189
    struct addrinfo  hints = {0};
190
    hints.ai_family = AF_INET;
191
    hints.ai_socktype = SOCK_STREAM;
192

193
    struct addrinfo *pai = NULL;
194

195
    if (!getaddrinfo(server->h_name, NULL, &hints, &pai)) {
196
        serv_addr->sin_addr.s_addr =
197
               ((struct sockaddr_in *) pai->ai_addr)->sin_addr.s_addr;
198
        freeaddrinfo(pai);
199
    }
200
    WSACleanup();
201
#else
202
    serv_addr->sin_addr.s_addr = inet_addr(host);
279✔
203
    memcpy(&(serv_addr->sin_addr.s_addr), server->h_addr, server->h_length);
279✔
204
#endif
205
    return 0;
279✔
206
}
207

208
void prompt(bool nonStopMode) {
638✔
209
    if (!g_arguments->answer_yes) {
638✔
210
        g_arguments->in_prompt = true;
7✔
211
        if (nonStopMode) {
7✔
212
            printf(
×
213
                    "\n\n         Current is the Non-Stop insertion mode. "
214
                    "benchmark will continuously "
215
                    "insert data unless you press "
216
                    "Ctrl-C to end it.\n\n         "
217
                    "press enter key to continue and "
218
                    "Ctrl-C to "
219
                    "stop\n\n");
220
            (void)getchar();
×
221
        } else {
222
            printf(
7✔
223
                    "\n\n         Press enter key to continue or Ctrl-C to "
224
                    "stop\n\n");
225
            (void)getchar();
7✔
226
        }
227
        g_arguments->in_prompt = false;
7✔
228
    }
229
}
638✔
230

231
// Append the NUL-terminated text in `resultBuf` to the file at `filePath`
// (opened in "at" mode). If the file cannot be opened, an error is logged
// and nothing is written.
static void appendResultBufToFile(char *resultBuf, char * filePath) {
    FILE* out = fopen(filePath, "at");
    if (out != NULL) {
        fprintf(out, "%s", resultBuf);
        tmfclose(out);
        return;
    }
    errorPrint(
            "failed to open result file: %s, result will not save "
            "to file\n", filePath);
}
242

243
int32_t replaceChildTblName(char *inSql, char *outSql, int tblIndex) {
9,062✔
244
    // child table mark
245
    char mark[32] = "xxxx";
9,062✔
246
    char *pos = strstr(inSql, mark);
9,062✔
247
    if (0 == pos) {
9,062✔
NEW
248
        errorPrint("sql format error, sql not found mark string '%s'", mark);
×
NEW
249
        return -1;
×
250
    }        
251

252
    char subTblName[TSDB_TABLE_NAME_LEN];
253
    snprintf(subTblName, TSDB_TABLE_NAME_LEN,
9,062✔
254
            "`%s`.%s", g_queryInfo.dbName,
255
            g_queryInfo.superQueryInfo.childTblName[tblIndex]);
9,062✔
256

257
    tstrncpy(outSql, inSql, pos - inSql + 1);
9,062✔
258
    snprintf(outSql + (pos - inSql), TSDB_MAX_ALLOWED_SQL_LEN - 1,
9,062✔
259
             "%s%s", subTblName, pos + strlen(mark));
9,062✔
260
    return 0;         
9,062✔
261
}
262

263
int64_t toolsGetTimestamp(int32_t precision) {
72✔
264
    if (precision == TSDB_TIME_PRECISION_MICRO) {
72✔
265
        return toolsGetTimestampUs();
×
266
    } else if (precision == TSDB_TIME_PRECISION_NANO) {
72✔
267
        return toolsGetTimestampNs();
×
268
    } else {
269
        return toolsGetTimestampMs();
72✔
270
    }
271
}
272

273
// Test whether `s` matches the regular expression `reg` compiled with
// `cflags`. Returns 1 on a match and 0 on no match. A compile failure or any
// other regexec error terminates the process.
int regexMatch(const char *s, const char *reg, int cflags) {
    regex_t compiled;
    char    errText[100] = {0};

    /* Compile regular expression */
    if (regcomp(&compiled, reg, cflags) != 0) {
        ERROR_EXIT("Failed to regex compile\n");
    }

    /* Execute regular expression */
    int status = regexec(&compiled, s, 0, NULL, 0);
    if (status != 0 && status != REG_NOMATCH) {
        regerror(status, &compiled, errText, sizeof(errText));
        regfree(&compiled);
        printf("Regex match failed: %s\n", errText);
        exit(EXIT_FAILURE);
    }

    regfree(&compiled);
    return (status == 0) ? 1 : 0;
}
297

298

299

300

301
SBenchConn* initBenchConnImpl() {
1,728✔
302
    SBenchConn* conn = benchCalloc(1, sizeof(SBenchConn), true);
1,728✔
303
#ifdef WEBSOCKET
304
    if (g_arguments->websocket) {
1,728✔
305
        conn->taos_ws = ws_connect(g_arguments->dsn);
29✔
306
        char maskedDsn[256] = "\0";
29✔
307
        memcpy(maskedDsn, g_arguments->dsn, 20);
29✔
308
        memcpy(maskedDsn+20, "...", 3);
29✔
309
        memcpy(maskedDsn+23,
29✔
310
               g_arguments->dsn + strlen(g_arguments->dsn)-10, 10);
29✔
311
        if (conn->taos_ws == NULL) {
29✔
312
            errorPrint("failed to connect %s, reason: %s\n",
1✔
313
                    maskedDsn, ws_errstr(NULL));
314
            tmfree(conn);
315
            return NULL;
1✔
316
        }
317

318
        succPrint("%s conneced\n", maskedDsn);
28✔
319
    } else {
320
#endif
321
        conn->taos = taos_connect(g_arguments->host,
3,398✔
322
                g_arguments->user, g_arguments->password,
1,699✔
323
                NULL, g_arguments->port);
1,699✔
324
        if (conn->taos == NULL) {
1,699✔
325
            errorPrint("failed to connect native %s:%d, "
3✔
326
                       "code: 0x%08x, reason: %s\n",
327
                    g_arguments->host, g_arguments->port,
328
                    taos_errno(NULL), taos_errstr(NULL));
329
            tmfree(conn);
330
            return NULL;
3✔
331
        }
332

333
        conn->ctaos = taos_connect(g_arguments->host,
1,696✔
334
                                   g_arguments->user,
1,696✔
335
                                   g_arguments->password,
1,696✔
336
                                   NULL, g_arguments->port);
1,696✔
337
#ifdef WEBSOCKET
338
    }
339
#endif
340
    return conn;
1,724✔
341
}
342

343
SBenchConn* initBenchConn() {
1,728✔
344

345
    SBenchConn* conn = NULL;
1,728✔
346
    int32_t keep_trying = 0;
1,728✔
347
    while(1) {
348
        conn = initBenchConnImpl();
1,728✔
349
        if(conn || ++keep_trying > g_arguments->keep_trying  || g_arguments->terminate) {
1,728✔
350
            break;
351
        }
352

UNCOV
353
        infoPrint("sleep %dms and try to connect... %d  \n", g_arguments->trying_interval, keep_trying);
×
UNCOV
354
        if(g_arguments->trying_interval > 0) {
×
355
            toolsMsleep(g_arguments->trying_interval);
×
356
        }        
357
    } 
358

359
    return conn;
1,728✔
360
}
361

362
// Close the connection(s) held by `conn` and free the struct.
// A NULL `conn` is ignored.
void closeBenchConn(SBenchConn* conn) {
    if (NULL == conn) {
        return;
    }
#ifdef WEBSOCKET
    if (g_arguments->websocket) {
        ws_close(conn->taos_ws);
        tmfree(conn);
        return;
    }
#endif
    if (conn->taos) {
        taos_close(conn->taos);
        conn->taos = NULL;
    }
    if (conn->ctaos) {
        taos_close(conn->ctaos);
        conn->ctaos = NULL;
    }
    tmfree(conn);
}
383

384
int32_t queryDbExecRest(char *command, char* dbName, int precision,
21✔
385
                    int iface, int protocol, bool tcp, int sockfd) {
386
    int32_t code = postProceSql(command,
21✔
387
                         dbName,
388
                         precision,
389
                         iface,
390
                         protocol,
391
                         g_arguments->port,
21✔
392
                         tcp,
393
                         sockfd,
394
                         NULL);
395
    return code;
21✔
396
}
397

398
// Execute `command` on `conn` and return the resulting error code
// (0 on success). Uses the websocket connection when WEBSOCKET is compiled
// in and g_arguments->websocket is set, the native connection otherwise.
// Failures are logged.
int32_t queryDbExecCall(SBenchConn *conn, char *command) {
#ifdef WEBSOCKET
    if (g_arguments->websocket) {
        WS_RES* wsRes = ws_query_timeout(conn->taos_ws,
                                         command, g_arguments->timeout);
        int32_t wsCode = ws_errno(wsRes);
        if (wsCode != 0) {
            errorPrint("Failed to execute <%s>, code: 0x%08x, reason: %s\n",
                       command, wsCode, ws_errstr(wsRes));
        }
        ws_free_result(wsRes);
        return wsCode;
    }
#endif
    TAOS_RES *nativeRes = taos_query(conn->taos, command);
    int32_t nativeCode = taos_errno(nativeRes);
    if (nativeCode == 0) {
        taos_free_result(nativeRes);
    } else {
        // the result is handed to printErrCmdCodeStr and not freed here
        printErrCmdCodeStr(command, nativeCode, nativeRes);
    }
    return nativeCode;
}
424

425
void encodeAuthBase64() {
193✔
426
    char        userpass_buf[INPUT_BUF_LEN];
427
    static char base64[] = {
428
        'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
429
        'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
430
        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
431
        'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
432
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/'};
433
    snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s", g_arguments->user,
193✔
434
            g_arguments->password);
193✔
435

436
    int mod_table[] = {0, 2, 1};
193✔
437

438
    size_t userpass_buf_len = strlen(userpass_buf);
193✔
439
    size_t encoded_len = 4 * ((userpass_buf_len + 2) / 3);
193✔
440

441
    memset(g_arguments->base64_buf, 0, INPUT_BUF_LEN);
193✔
442
    for (int n = 0, m = 0; n < userpass_buf_len;) {
1,163✔
443
        uint32_t oct_a =
970✔
444
            n < userpass_buf_len ? (unsigned char)userpass_buf[n++] : 0;
970✔
445
        uint32_t oct_b =
970✔
446
            n < userpass_buf_len ? (unsigned char)userpass_buf[n++] : 0;
970✔
447
        uint32_t oct_c =
970✔
448
            n < userpass_buf_len ? (unsigned char)userpass_buf[n++] : 0;
970✔
449
        uint32_t triple = (oct_a << 0x10) + (oct_b << 0x08) + oct_c;
970✔
450

451
        g_arguments->base64_buf[m++] = base64[(triple >> 3 * 6) & 0x3f];
970✔
452
        g_arguments->base64_buf[m++] = base64[(triple >> 2 * 6) & 0x3f];
970✔
453
        g_arguments->base64_buf[m++] = base64[(triple >> 1 * 6) & 0x3f];
970✔
454
        g_arguments->base64_buf[m++] = base64[(triple >> 0 * 6) & 0x3f];
970✔
455
    }
456

457
    for (int l = 0; l < mod_table[userpass_buf_len % 3]; l++)
578✔
458
        g_arguments->base64_buf[encoded_len - 1 - l] = '=';
385✔
459
}
193✔
460

461
int postProceSqlImpl(char *sqlstr, char* dbName, int precision, int iface,
2,570✔
462
                     int protocol, uint16_t rest_port, bool tcp, int sockfd,
463
                     char* filePath,
464
                     char *responseBuf, int64_t response_length) {
465
    int32_t      code = -1;
2,570✔
466
    char *       req_fmt =
2,570✔
467
        "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: "
468
        "Basic %s\r\nContent-Length: %d\r\nContent-Type: "
469
        "application/x-www-form-urlencoded\r\n\r\n%s";
470
    char url[URL_BUFF_LEN] = {0};
2,570✔
471
    if (iface == REST_IFACE) {
2,570✔
472
        snprintf(url, URL_BUFF_LEN, "/rest/sql/%s", dbName);
2,408✔
473
    } else if (iface == SML_REST_IFACE
162✔
474
            && protocol == TSDB_SML_LINE_PROTOCOL) {
160✔
475
        snprintf(url, URL_BUFF_LEN,
50✔
476
                 "/influxdb/v1/write?db=%s&precision=%s", dbName,
477
                precision == TSDB_TIME_PRECISION_MILLI
478
                ? "ms"
479
                : precision == TSDB_TIME_PRECISION_NANO
480
                ? "ns"
481
                : "u");
×
482
    } else if (iface == SML_REST_IFACE
112✔
483
            && protocol == TSDB_SML_TELNET_PROTOCOL) {
110✔
484
        snprintf(url, URL_BUFF_LEN, "/opentsdb/v1/put/telnet/%s", dbName);
65✔
485
    } else if (iface == SML_REST_IFACE
47✔
486
            && (protocol == TSDB_SML_JSON_PROTOCOL
46✔
487
                || protocol == SML_JSON_TAOS_FORMAT)) {
1✔
488
        snprintf(url, URL_BUFF_LEN, "/opentsdb/v1/put/json/%s", dbName);
45✔
489
    }
490

491
    int      bytes, sent, received, req_str_len, resp_len;
492
    char *   request_buf = NULL;
2,570✔
493
    int req_buf_len = (int)strlen(sqlstr) + REQ_EXTRA_BUF_LEN;
2,570✔
494

495
    if (g_arguments->terminate) {
2,570✔
496
        goto free_of_postImpl;
×
497
    }
498
    request_buf = benchCalloc(1, req_buf_len, false);
2,570✔
499

500
    int r;
501
    if (protocol == TSDB_SML_TELNET_PROTOCOL && tcp) {
2,570✔
502
        r = snprintf(request_buf, req_buf_len, "%s", sqlstr);
32✔
503
    } else {
504
        r = snprintf(request_buf, req_buf_len, req_fmt, url, g_arguments->host,
2,538✔
505
                rest_port, g_arguments->base64_buf, strlen(sqlstr),
2,538✔
506
                sqlstr);
507
    }
508
    if (r >= req_buf_len) {
2,570✔
509
        free(request_buf);
×
510
        ERROR_EXIT("too long request");
×
511
    }
512

513
    req_str_len = (int)strlen(request_buf);
2,567✔
514
    debugPrint("request buffer: %s\n", request_buf);
2,567✔
515
    sent = 0;
2,567✔
516
    do {
517
        bytes = send(sockfd, request_buf + sent,
5,132✔
518
                req_str_len - sent, 0);
2,567✔
519
        if (bytes < 0) {
2,565✔
520
            errorPrint("%s", "writing no message to socket\n");
3✔
521
            goto free_of_postImpl;
×
522
        }
523
        if (bytes == 0) break;
2,562✔
524
        sent += bytes;
2,562✔
525
    } while ((sent < req_str_len) && !g_arguments->terminate);
2,562✔
526

527
    if (protocol == TSDB_SML_TELNET_PROTOCOL
2,562✔
528
            && iface == SML_REST_IFACE && tcp) {
64✔
529
        code = 0;
31✔
530
        goto free_of_postImpl;
31✔
531
    }
532

533
    resp_len = response_length - 1;
2,531✔
534
    received = 0;
2,531✔
535

536
    bool chunked = false;
2,531✔
537

538
    if (g_arguments->terminate) {
2,531✔
539
        goto free_of_postImpl;
×
540
    }
541
    do {
542
        bytes = recv(sockfd, responseBuf + received,
8,932✔
543
                resp_len - received, 0);
4,463✔
544
        if (bytes <= 0) {
4,469✔
NEW
545
            errorPrint("%s", "reading no response from socket\n");
×
NEW
546
            goto free_of_postImpl;
×
547
        }
548
        responseBuf[resp_len] = 0;
4,469✔
549
        debugPrint("response buffer: %s bytes=%d\n", responseBuf, bytes);
4,469✔
550
        if (NULL != strstr(responseBuf, resEncodingChunk)) {
4,468✔
551
            chunked = true;
4,235✔
552
        }
553
        int64_t index = strlen(responseBuf) - 1;
4,468✔
554
        while (responseBuf[index] == '\n' || responseBuf[index] == '\r') {
18,368✔
555
            if (index == 0) {
13,900✔
NEW
556
                break;
×
557
            }
558
            index--;
13,900✔
559
        }
560
        debugPrint("index: %" PRId64 "\n", index);
4,468✔
561
        if (chunked && responseBuf[index] == '0') {
4,468✔
562
            code = 0;
2,249✔
563
            break;
2,249✔
564
        }
565
        if (!chunked && responseBuf[index] == '}') {
2,219✔
566
            code = 0;
×
567
            break;
×
568
        }
569

570
        received += bytes;
2,219✔
571

572
        if (g_arguments->test_mode == INSERT_TEST) {
2,219✔
573
            if (strlen(responseBuf)) {
391✔
574
                if (((NULL != strstr(responseBuf, resEncodingChunk)) &&
392✔
575
                            (NULL != strstr(responseBuf, resHttp))) ||
158✔
576
                        ((NULL != strstr(responseBuf, resHttpOk)) ||
234✔
577
                         (NULL != strstr(responseBuf, influxHttpOk)) ||
234✔
578
                         (NULL != strstr(responseBuf, opentsdbHttpOk)))) {
105✔
579
                    break;
580
                }
581
            }
582
        }
583
    } while ((received < resp_len) && !g_arguments->terminate);
1,932✔
584

585
    if (received == resp_len) {
2,536✔
586
        errorPrint("%s", "storing complete response from socket\n");
×
587
        goto free_of_postImpl;
×
588
    }
589

590
    if (NULL == strstr(responseBuf, resHttpOk) &&
2,536✔
591
            NULL == strstr(responseBuf, influxHttpOk) &&
128✔
592
            NULL == strstr(responseBuf, succMessage) &&
×
593
            NULL == strstr(responseBuf, opentsdbHttpOk)) {
×
594
        errorPrint("Response:\n%s\n", responseBuf);
×
595
        goto free_of_postImpl;
×
596
    }
597

598
    code = 0;
2,536✔
599
free_of_postImpl:
2,567✔
600
    if (filePath && strlen(filePath) > 0 && !g_arguments->terminate) {
2,567✔
601
        appendResultBufToFile(responseBuf, filePath);
1,336✔
602
    }
603
    tmfree(request_buf);
604
    return code;
2,567✔
605
}
606

607
static int getServerVersionRestImpl(int16_t rest_port, int sockfd) {
18✔
608
    int server_ver = -1;
18✔
609
    char       command[SHORT_1K_SQL_BUFF_LEN] = "\0";
18✔
610
    snprintf(command, SHORT_1K_SQL_BUFF_LEN, "SELECT SERVER_VERSION()");
18✔
611
    char *responseBuf = benchCalloc(1, RESP_BUF_LEN, false);
18✔
612
    int code = postProceSqlImpl(command,
18✔
613
                                NULL,
614
                                0,
615
                                REST_IFACE,
616
                                0,
617
                                rest_port,
618
                                false,
619
                                sockfd,
620
                                NULL, responseBuf, RESP_BUF_LEN);
621
    if (code != 0) {
18✔
622
        errorPrint("Failed to execute command: %s\n", command);
×
623
        goto free_of_getversion;
×
624
    }
625
    debugPrint("response buffer: %s\n", responseBuf);
18✔
626
    if (NULL != strstr(responseBuf, resHttpOk)) {
18✔
627
        char* start = strstr(responseBuf, "{");
18✔
628
        if (start == NULL) {
18✔
629
            errorPrint("Invalid response format: %s\n", responseBuf);
×
630
            goto free_of_getversion;
×
631
        }
632
        tools_cJSON* resObj = tools_cJSON_Parse(start);
18✔
633
        if (resObj == NULL) {
18✔
634
            errorPrint("Cannot parse response into json: %s\n", start);
×
635
        }
636
        tools_cJSON* dataObj = tools_cJSON_GetObjectItem(resObj, "data");
18✔
637
        if (!tools_cJSON_IsArray(dataObj)) {
18✔
638
            char* pstr = tools_cJSON_Print(resObj);
×
639
            errorPrint("Invalid or miss 'data' key in json: %s\n", pstr ? pstr : "null");
×
640
            tmfree(pstr);
641
            tools_cJSON_Delete(resObj);
×
642
            goto free_of_getversion;
×
643
        }
644
        tools_cJSON *versionObj = tools_cJSON_GetArrayItem(dataObj, 0);
18✔
645
        tools_cJSON *versionStrObj = tools_cJSON_GetArrayItem(versionObj, 0);
18✔
646
        server_ver = atoi(versionStrObj->valuestring);
18✔
647
        char* pstr = tools_cJSON_Print(versionStrObj);        
18✔
648
        debugPrint("versionStrObj: %s, version: %s, server_ver: %d\n",
18✔
649
                   pstr ? pstr : "null",
650
                   versionStrObj->valuestring, server_ver);
651
        tmfree(pstr);
652
        tools_cJSON_Delete(resObj);
18✔
653
    }
654
free_of_getversion:
×
655
    free(responseBuf);
18✔
656
    return server_ver;
18✔
657
}
658

659
// Open a socket with createSockFd(), query the server version over REST
// through it, then destroy the socket.
// Returns the version from getServerVersionRestImpl(), or -1 if no socket
// could be created.
int getServerVersionRest(int16_t rest_port) {
    int fd = createSockFd();
    if (fd >= 0) {
        int version = getServerVersionRestImpl(rest_port, fd);
        destroySockFd(fd);
        return version;
    }
    return -1;
}
670

671
static int getCodeFromResp(char *responseBuf) {
2,391✔
672
    int code = -1;
2,391✔
673
    char* start = strstr(responseBuf, "{");
2,391✔
674
    if (start == NULL) {
2,391✔
675
        errorPrint("Invalid response format: %s\n", responseBuf);
×
676
        return -1;
×
677
    }
678
    tools_cJSON* resObj = tools_cJSON_Parse(start);
2,391✔
679
    if (resObj == NULL) {
2,391✔
680
        errorPrint("Cannot parse response into json: %s\n", start);
×
681
        return -1;
×
682
    }
683
    tools_cJSON* codeObj = tools_cJSON_GetObjectItem(resObj, "code");
2,391✔
684
    if (!tools_cJSON_IsNumber(codeObj)) {
2,388✔
685
        char* pstr = tools_cJSON_Print(resObj);
×
686
        errorPrint("Invalid or miss 'code' key in json: %s\n", pstr ? pstr : "null");
×
687
        tmfree(pstr);
688
        tools_cJSON_Delete(resObj);
×
689
        return -1;
×
690
    }
691

692
    code = codeObj->valueint;
2,388✔
693

694
    if (codeObj->valueint != 0) {
2,388✔
695
        tools_cJSON* desc = tools_cJSON_GetObjectItem(resObj, "desc");
×
696
        if (!tools_cJSON_IsString(desc)) {
×
697
            char* pstr = tools_cJSON_Print(resObj);
×
698
            errorPrint("Invalid or miss 'desc' key in json: %s\n", pstr ? pstr : "null");
×
699
            tmfree(pstr);
700
            return -1;
×
701
        }
702
        errorPrint("response, code: %d, reason: %s\n",
×
703
                   (int)codeObj->valueint, desc->valuestring);
704
    }
705

706
    tools_cJSON_Delete(resObj);
2,388✔
707
    return code;
2,391✔
708
}
709

710
int postProceSql(char *sqlstr, char* dbName, int precision, int iface,
2,551✔
711
                 int protocol, uint16_t rest_port,
712
                 bool tcp, int sockfd, char* filePath) {
713
    uint64_t response_length;
714
    if (g_arguments->test_mode == INSERT_TEST) {
2,551✔
715
        response_length = RESP_BUF_LEN;
316✔
716
    } else {
717
        response_length = g_queryInfo.response_buffer;
2,235✔
718
    }
719

720
    char *responseBuf = benchCalloc(1, response_length, false);
2,551✔
721
    int code = postProceSqlImpl(sqlstr, dbName, precision, iface, protocol,
2,551✔
722
                                rest_port,
723
                                tcp, sockfd, filePath, responseBuf,
724
                                response_length);
725
    // compatibility 2.6
726
    if (-1 == g_arguments->rest_server_ver_major) {
2,552✔
727
        // confirm version is 2.x according to "succ"
728
        if (NULL != strstr(responseBuf, succMessage) && iface == REST_IFACE) {
2,236✔
729
            g_arguments->rest_server_ver_major = 2;
×
730
        }
731
    }
732

733
    if (NULL != strstr(responseBuf, resHttpOk) && iface == REST_IFACE) {
2,552✔
734
        // if taosd is not starting , rest_server_ver_major can't be got by 'select server_version()' , so is -1
735
        if (-1 == g_arguments->rest_server_ver_major || 3 <= g_arguments->rest_server_ver_major) {
2,387✔
736
            code = getCodeFromResp(responseBuf);
2,387✔
737
        } else {
738
            code = 0;
×
739
        }
740
        goto free_of_post;
2,390✔
741
    }
742

743
    if (2 == g_arguments->rest_server_ver_major) {
165✔
744
        if (NULL != strstr(responseBuf, succMessage) && iface == REST_IFACE) {
×
745
            code = getCodeFromResp(responseBuf);
×
746
        } else {
747
            code = 0;
×
748
        }
749
        goto free_of_post;
×
750
    }
751

752
    if (NULL != strstr(responseBuf, influxHttpOk) &&
165✔
753
            protocol == TSDB_SML_LINE_PROTOCOL && iface == SML_REST_IFACE) {
50✔
754
        code = 0;
50✔
755
        goto free_of_post;
50✔
756
    }
757

758
    if (NULL != strstr(responseBuf, opentsdbHttpOk)
115✔
759
            && (protocol == TSDB_SML_TELNET_PROTOCOL
×
760
            || protocol == TSDB_SML_JSON_PROTOCOL
×
761
            || protocol == SML_JSON_TAOS_FORMAT)
×
762
            && iface == SML_REST_IFACE) {
×
763
        code = 0;
×
764
        goto free_of_post;
×
765
    }
766

767
    if (g_arguments->test_mode == INSERT_TEST) {
115✔
768
        debugPrint("Response: \n%s\n", responseBuf);
110✔
769
        char* start = strstr(responseBuf, "{");
110✔
770
        if ((start == NULL)
110✔
771
                && (TSDB_SML_TELNET_PROTOCOL != protocol)
110✔
772
                && (TSDB_SML_JSON_PROTOCOL != protocol)
46✔
773
                && (SML_JSON_TAOS_FORMAT != protocol)
1✔
774
                ) {
775
            errorPrint("Invalid response format: %s\n", responseBuf);
×
776
            goto free_of_post;
×
777
        }
778
        tools_cJSON* resObj = tools_cJSON_Parse(start);
110✔
779
        if ((resObj == NULL)
111✔
780
                && (TSDB_SML_TELNET_PROTOCOL != protocol)
111✔
781
                && (TSDB_SML_JSON_PROTOCOL != protocol)
46✔
782
                && (SML_JSON_TAOS_FORMAT != protocol)
1✔
783
                ) {
784
            errorPrint("Cannot parse response into json: %s\n", start);
×
785
        }
786
        tools_cJSON* codeObj = tools_cJSON_GetObjectItem(resObj, "code");
111✔
787
        if ((!tools_cJSON_IsNumber(codeObj))
110✔
788
                && (TSDB_SML_TELNET_PROTOCOL != protocol)
111✔
789
                && (TSDB_SML_JSON_PROTOCOL != protocol)
46✔
790
                && (SML_JSON_TAOS_FORMAT != protocol)
1✔
791
                ) {
792
            char* pstr = tools_cJSON_Print(resObj);
×
793
            errorPrint("Invalid or miss 'code' key in json: %s\n", pstr ? pstr : "null");
×
794
            tmfree(pstr);
795
            tools_cJSON_Delete(resObj);
×
796
            goto free_of_post;
×
797
        }
798

799
        if ((SML_REST_IFACE == iface) && codeObj
110✔
800
                && (200 == codeObj->valueint)) {
×
801
            code = 0;
×
802
            tools_cJSON_Delete(resObj);
×
803
            goto free_of_post;
×
804
        }
805

806
        if ((iface == SML_REST_IFACE)
110✔
807
                && (protocol == TSDB_SML_LINE_PROTOCOL)
109✔
808
                && codeObj
×
809
                && (codeObj->valueint != 0) && (codeObj->valueint != 200)) {
×
810
            tools_cJSON* desc = tools_cJSON_GetObjectItem(resObj, "desc");
×
811
            if (!tools_cJSON_IsString(desc)) {
×
812
                char* pstr = tools_cJSON_Print(resObj);
×
813
                errorPrint("Invalid or miss 'desc' key in json: %s\n", pstr ? pstr : "null");
×
814
                tmfree(pstr);
815
            } else {
816
                errorPrint("insert mode response, code: %d, reason: %s\n",
×
817
                       (int)codeObj->valueint, desc->valuestring);
818
            }
819
        } else {
820
            code = 0;
110✔
821
        }
822
        tools_cJSON_Delete(resObj);
110✔
823
    }
824
free_of_post:
5✔
825
    free(responseBuf);
2,550✔
826
    return code;
2,550✔
827
}
828

829
// fetch result to file or nothing
830
// Drain every row of `res`, optionally appending the formatted rows to
// `filePath`.
//   res      - query result to iterate with taos_fetch_row()
//   filePath - output file; NULL or "" means rows are only counted
// Returns the number of rows fetched.
int64_t fetchResult(TAOS_RES *res, char * filePath) {
    TAOS_ROW    row       = NULL;
    TAOS_FIELD *fieldDesc = NULL;
    char       *outBuf    = NULL;
    int         fieldCnt  = 0;
    int64_t     used      = 0;   // bytes currently held in outBuf
    int64_t     rowCnt    = 0;
    // a NULL path is treated like an empty one instead of crashing in strlen()
    bool        toFile    = (filePath != NULL) && (filePath[0] != '\0');

    if (toFile) {
        fieldCnt  = taos_field_count(res);
        fieldDesc = taos_fetch_fields(res);
        outBuf    = (char *)benchCalloc(1, FETCH_BUFFER_SIZE, true);
    }

    // fetch the records row by row
    while ((row = taos_fetch_row(res))) {
        rowCnt++;
        if (!toFile) {
            // nothing to format, only count
            continue;
        }

        if (used >= (FETCH_BUFFER_SIZE - HEAD_BUFF_LEN * 2)) {
            // buffer is nearly full: flush it and start over
            appendResultBufToFile(outBuf, filePath);
            memset(outBuf, 0, FETCH_BUFFER_SIZE);
            used = 0;
        }

        // format row
        char line[HEAD_BUFF_LEN] = {0};
        int  len = taos_print_row(line, row, fieldDesc, fieldCnt);
        if (len < 0) {
            // a negative length must never reach memcpy()
            continue;
        }
        // Append the newline only when it fits. snprintf() reports the
        // would-be length even when truncated, which previously made the
        // copy below pick up a NUL byte instead of '\n'.
        if (len < HEAD_BUFF_LEN - 1) {
            line[len++] = '\n';
        }
        memcpy(outBuf + used, line, len);
        used += len;
    }

    // flush whatever is left
    if (toFile) {
        appendResultBufToFile(outBuf, filePath);
        free(outBuf);
    }
    return rowCnt;
}
875

876
char *convertDatatypeToString(int type) {
2,407✔
877
    switch (type) {
2,407✔
878
        case TSDB_DATA_TYPE_BINARY:
416✔
879
            return "binary";
416✔
880
        case TSDB_DATA_TYPE_NCHAR:
42✔
881
            return "nchar";
42✔
882
        case TSDB_DATA_TYPE_TIMESTAMP:
41✔
883
            return "timestamp";
41✔
884
        case TSDB_DATA_TYPE_TINYINT:
351✔
885
            return "tinyint";
351✔
886
        case TSDB_DATA_TYPE_UTINYINT:
43✔
887
            return "tinyint unsigned";
43✔
888
        case TSDB_DATA_TYPE_SMALLINT:
276✔
889
            return "smallint";
276✔
890
        case TSDB_DATA_TYPE_USMALLINT:
43✔
891
            return "smallint unsigned";
43✔
892
        case TSDB_DATA_TYPE_INT:
452✔
893
            return "int";
452✔
894
        case TSDB_DATA_TYPE_UINT:
54✔
895
            return "int unsigned";
54✔
896
        case TSDB_DATA_TYPE_BIGINT:
276✔
897
            return "bigint";
276✔
898
        case TSDB_DATA_TYPE_UBIGINT:
43✔
899
            return "bigint unsigned";
43✔
900
        case TSDB_DATA_TYPE_BOOL:
46✔
901
            return "bool";
46✔
902
        case TSDB_DATA_TYPE_FLOAT:
155✔
903
            return "float";
155✔
904
        case TSDB_DATA_TYPE_DOUBLE:
169✔
905
            return "double";
169✔
906
        case TSDB_DATA_TYPE_JSON:
×
907
            return "json";    
×
908
        case TSDB_DATA_TYPE_VARBINARY:
×
909
            return "varbinary";
×
910
        case TSDB_DATA_TYPE_GEOMETRY:
×
911
            return "geometry";
×
912
        default:
×
913
            break;
×
914
    }
915
    return "unknown type";
×
916
}
917

918
int convertTypeToLength(uint8_t type) {
1,411✔
919
    int ret = 0;
1,411✔
920
    switch (type) {
1,411✔
921
        case TSDB_DATA_TYPE_TIMESTAMP:
239✔
922
        case TSDB_DATA_TYPE_UBIGINT:
923
        case TSDB_DATA_TYPE_BIGINT:
924
            ret = sizeof(int64_t);
239✔
925
            break;
239✔
926
        case TSDB_DATA_TYPE_BOOL:
367✔
927
        case TSDB_DATA_TYPE_TINYINT:
928
        case TSDB_DATA_TYPE_UTINYINT:
929
            ret = sizeof(int8_t);
367✔
930
            break;
367✔
931
        case TSDB_DATA_TYPE_SMALLINT:
194✔
932
        case TSDB_DATA_TYPE_USMALLINT:
933
            ret = sizeof(int16_t);
194✔
934
            break;
194✔
935
        case TSDB_DATA_TYPE_INT:
296✔
936
        case TSDB_DATA_TYPE_UINT:
937
            ret = sizeof(int32_t);
296✔
938
            break;
296✔
939
        case TSDB_DATA_TYPE_FLOAT:
144✔
940
            ret = sizeof(float);
144✔
941
            break;
144✔
942
        case TSDB_DATA_TYPE_DOUBLE:
142✔
943
            ret = sizeof(double);
142✔
944
            break;
142✔
945
        case TSDB_DATA_TYPE_JSON:
1✔
946
            ret = JSON_FIXED_LENGTH;
1✔
947
            break;
1✔
948
        default:
28✔
949
            break;
28✔
950
    }
951
    return ret;
1,411✔
952
}
953

954
int64_t convertDatatypeToDefaultMin(uint8_t type) {
1,596✔
955
    int64_t ret = 0;
1,596✔
956
    switch (type) {
1,596✔
957
        case TSDB_DATA_TYPE_BOOL:
110✔
958
        case TSDB_DATA_TYPE_GEOMETRY:
959
            ret = 0;
110✔
960
            break;
110✔
961
        case TSDB_DATA_TYPE_TINYINT:
148✔
962
            ret = -127;
148✔
963
            break;
148✔
964
        case TSDB_DATA_TYPE_SMALLINT:
102✔
965
            ret = -32767;
102✔
966
            break;
102✔
967
        case TSDB_DATA_TYPE_INT:
550✔
968
        case TSDB_DATA_TYPE_BIGINT:
969
        case TSDB_DATA_TYPE_FLOAT:
970
        case TSDB_DATA_TYPE_DOUBLE:
971
            ret = -1 * (RAND_MAX >> 1);
550✔
972
            break;
550✔
973
        default:
686✔
974
            break;
686✔
975
    }
976
    return ret;
1,596✔
977
}
978

979
int64_t convertDatatypeToDefaultMax(uint8_t type) {
1,603✔
980
    int64_t ret = 0;
1,603✔
981
    switch (type) {
1,603✔
982
        case TSDB_DATA_TYPE_BOOL:
110✔
983
            ret = 1;
110✔
984
            break;
110✔
985
        case TSDB_DATA_TYPE_TINYINT:
148✔
986
            ret = 128;
148✔
987
            break;
148✔
988
        case TSDB_DATA_TYPE_UTINYINT:
71✔
989
            ret = 254;
71✔
990
            break;
71✔
991
        case TSDB_DATA_TYPE_SMALLINT:
102✔
992
        case TSDB_DATA_TYPE_GEOMETRY:
993
            ret = 32767;
102✔
994
            break;
102✔
995
        case TSDB_DATA_TYPE_USMALLINT:
70✔
996
            ret = 65534;
70✔
997
            break;
70✔
998
        case TSDB_DATA_TYPE_INT:
557✔
999
        case TSDB_DATA_TYPE_BIGINT:
1000
        case TSDB_DATA_TYPE_FLOAT:
1001
        case TSDB_DATA_TYPE_DOUBLE:
1002
            ret = RAND_MAX >> 1;
557✔
1003
            break;
557✔
1004
        case TSDB_DATA_TYPE_UINT:
188✔
1005
        case TSDB_DATA_TYPE_UBIGINT:
1006
        case TSDB_DATA_TYPE_TIMESTAMP:
1007
            ret = RAND_MAX;
188✔
1008
            break;
188✔
1009
        default:
357✔
1010
            break;
357✔
1011
    }
1012
    return ret;
1,603✔
1013
}
1014

1015
// compare str with length
1016
// Case-insensitive string comparison.
// length == 0 compares the whole strings; otherwise only the first `length`
// characters are compared. Returns 0 when equal, non-zero otherwise.
int32_t strCompareN(char *str1, char *str2, int length) {
    return (0 == length) ? strcasecmp(str1, str2)
                         : strncasecmp(str1, str2, length);
}
1023

1024
int convertStringToDatatype(char *type, int length) {
1,737✔
1025
    // compare with length
1026
    if (0 == strCompareN(type, "binary", length)) {
1,737✔
1027
        return TSDB_DATA_TYPE_BINARY;
208✔
1028
    } else if (0 == strCompareN(type, "nchar", length)) {
1,529✔
1029
        return TSDB_DATA_TYPE_NCHAR;
110✔
1030
    } else if (0 == strCompareN(type, "timestamp", length)) {
1,419✔
1031
        return TSDB_DATA_TYPE_TIMESTAMP;
45✔
1032
    } else if (0 == strCompareN(type, "bool", length)) {
1,374✔
1033
        return TSDB_DATA_TYPE_BOOL;
116✔
1034
    } else if (0 == strCompareN(type, "tinyint", length)) {
1,258✔
1035
        return TSDB_DATA_TYPE_TINYINT;
170✔
1036
    } else if (0 == strCompareN(type, "utinyint", length)) {
1,088✔
1037
        return TSDB_DATA_TYPE_UTINYINT;
81✔
1038
    } else if (0 == strCompareN(type, "smallint", length)) {
1,007✔
1039
        return TSDB_DATA_TYPE_SMALLINT;
113✔
1040
    } else if (0 == strCompareN(type, "usmallint", length)) {
894✔
1041
        return TSDB_DATA_TYPE_USMALLINT;
81✔
1042
    } else if (0 == strCompareN(type, "int", length)) {
813✔
1043
        return TSDB_DATA_TYPE_INT;
218✔
1044
    } else if (0 == strCompareN(type, "uint", length)) {
595✔
1045
        return TSDB_DATA_TYPE_UINT;
84✔
1046
    } else if (0 == strCompareN(type, "bigint", length)) {
511✔
1047
        return TSDB_DATA_TYPE_BIGINT;
113✔
1048
    } else if (0 == strCompareN(type, "ubigint", length)) {
398✔
1049
        return TSDB_DATA_TYPE_UBIGINT;
81✔
1050
    } else if (0 == strCompareN(type, "float", length)) {
317✔
1051
        return TSDB_DATA_TYPE_FLOAT;
146✔
1052
    } else if (0 == strCompareN(type, "double", length)) {
171✔
1053
        return TSDB_DATA_TYPE_DOUBLE;
142✔
1054
    } else if (0 == strCompareN(type, "json", length)) {
29✔
1055
        return TSDB_DATA_TYPE_JSON;
2✔
1056
    } else if (0 == strCompareN(type, "varchar", length)) {
27✔
1057
        return TSDB_DATA_TYPE_BINARY;
26✔
1058
    } else if (0 == strCompareN(type, "varbinary", length)) {
1✔
1059
        return TSDB_DATA_TYPE_VARBINARY;
×
1060
    } else if (0 == strCompareN(type, "geometry", length)) {
1✔
1061
        return TSDB_DATA_TYPE_GEOMETRY;
×
1062
    } else {
1063
        errorPrint("unknown data type: %s\n", type);
1✔
1064
        exit(EXIT_FAILURE);
1✔
1065
    }
1066
}
1067

1068

1069
// qsort()-style comparator for int64_t elements.
// Returns -1, 0 or 1. The values are compared directly instead of being
// subtracted: a 64-bit difference truncated to int can lose its sign (or
// become 0), and the subtraction itself can overflow.
int compare(const void *a, const void *b) {
    const int64_t lhs = *(const int64_t *)a;
    const int64_t rhs = *(const int64_t *)b;
    return (lhs > rhs) - (lhs < rhs);
}
1072

1073
//
1074
// --------------------  BArray operator -------------------
1075
//
1076

1077
BArray* benchArrayInit(size_t size, size_t elemSize) {
2,727✔
1078
    assert(elemSize > 0);
2,727✔
1079

1080
    if (size < BARRAY_MIN_SIZE) {
2,727✔
1081
        size = BARRAY_MIN_SIZE;
2,679✔
1082
    }
1083

1084
    BArray* pArray = (BArray *)benchCalloc(1, sizeof(BArray), true);
2,727✔
1085

1086
    pArray->size = 0;
2,727✔
1087
    pArray->pData = benchCalloc(size, elemSize, true);
2,727✔
1088

1089
    pArray->capacity = size;
2,727✔
1090
    pArray->elemSize = elemSize;
2,727✔
1091
    return pArray;
2,727✔
1092
}
1093

1094
// Make sure pArray can hold at least newCap elements, growing the storage by
// repeated doubling when needed.
// Returns 0 on success, -1 when the required size cannot be represented or
// realloc() fails (the array is left untouched in that case).
static int32_t benchArrayEnsureCap(BArray* pArray, size_t newCap) {
    if (newCap <= pArray->capacity) {
        return 0;
    }

    // start from 1 so a zero capacity cannot stall the doubling loop
    size_t grown = (pArray->capacity > 0) ? pArray->capacity : 1;
    while (grown < newCap) {
        if (grown > SIZE_MAX / 2) {
            // doubling would wrap around
            return -1;
        }
        grown <<= 1u;
    }

    // guard the byte-size multiplication passed to realloc()
    if (pArray->elemSize != 0 && grown > SIZE_MAX / pArray->elemSize) {
        return -1;
    }

    void* pData = realloc(pArray->pData, grown * pArray->elemSize);
    if (pData == NULL) {
        return -1;
    }
    pArray->pData = pData;
    pArray->capacity = grown;
    return 0;
}
1110

1111
// Append `elems` elements copied from pData to the end of pArray.
//   free - when true, pData is released with tmfree() after a successful copy
// Returns a pointer to the first appended element inside the array, or NULL
// when pData is NULL, elems <= 0, or the array could not be grown.
void* benchArrayAddBatch(BArray* pArray, void* pData, int32_t elems, bool free) {
    if (NULL == pData || elems <= 0) {
        return NULL;
    }

    const size_t oldSize = pArray->size;
    if (0 != benchArrayEnsureCap(pArray, oldSize + elems)) {
        return NULL;
    }

    void* slot = BARRAY_GET_ELEM(pArray, oldSize);
    memcpy(slot, pData, pArray->elemSize * elems);
    pArray->size = oldSize + elems;

    if (free) {
        tmfree(pData); // TODO remove this
    }
    return slot;
}
1128

1129
// Append one element copied from pData, then release pData with tmfree().
// Returns the address of the stored element, or NULL on failure.
FORCE_INLINE void* benchArrayPush(BArray* pArray, void* pData) {
    void *stored = benchArrayAddBatch(pArray, pData, 1, true);
    return stored;
}
1132

1133
// Append one element copied from pData; the caller keeps ownership of pData.
// Returns the address of the stored element, or NULL on failure.
FORCE_INLINE void* benchArrayPushNoFree(BArray* pArray, void* pData) {
    void *stored = benchArrayAddBatch(pArray, pData, 1, false);
    return stored;
}
1136

1137

1138
// Release the element storage and the BArray itself. NULL is accepted.
// Always returns NULL so callers can reset their pointer in one statement.
void* benchArrayDestroy(BArray* pArray) {
    if (NULL == pArray) {
        return NULL;
    }
    tmfree(pArray->pData);
    tmfree(pArray);
    return NULL;
}
1145

1146
// Drop all elements by resetting the size; capacity and storage are kept.
// NULL is accepted and ignored.
void benchArrayClear(BArray* pArray) {
    if (pArray != NULL) {
        pArray->size = 0;
    }
}
1150

1151
void* benchArrayGet(const BArray* pArray, size_t index) {
35,801,714✔
1152
    if (index >= pArray->size) {
35,801,714✔
1153
        errorPrint("benchArrayGet index(%zu) greater than BArray size(%zu)\n",
×
1154
                   index, pArray->size);
1155
        exit(EXIT_FAILURE);
×
1156
    }
1157
    return BARRAY_GET_ELEM(pArray, index);
35,801,714✔
1158
}
1159

1160
// Look for a Field whose name equals field_name (case-insensitive, exactly
// name_len characters long).
// Returns true only when the first field with that name also has type
// field_type; false when it has another type, when no field matches, or when
// pArray / field_name is NULL.
bool searchBArray(BArray *pArray, const char *field_name, int32_t name_len, uint8_t field_type) {
    if (NULL == pArray || NULL == field_name) {
        return false;
    }

    for (int idx = 0; idx < pArray->size; idx++) {
        Field *candidate = benchArrayGet(pArray, idx);
        bool sameName = strlen(candidate->name) == name_len
                     && strncasecmp(candidate->name, field_name, name_len) == 0;
        if (!sameName) {
            continue;
        }
        // the first name match decides the result
        return candidate->type == field_type;
    }
    return false;
}
1175

1176
//
1177
// malloc a new and copy data from array
1178
// return value must call benchArrayDestroy to free
1179
//
1180
// Allocate a new BArray holding a copy of every element of pArray.
// Returns NULL when pArray is NULL (previously a NULL dereference).
// The result must be released with benchArrayDestroy().
BArray * copyBArray(BArray *pArray) {
    if (pArray == NULL) {
        return NULL;
    }
    BArray * pNew = benchArrayInit(pArray->size, pArray->elemSize);
    // an empty source adds nothing; pNew is still a valid empty array
    benchArrayAddBatch(pNew, pArray->pData, pArray->size, false);
    return pNew;
}
1185

1186
//
1187
//  ---------------- others ------------------------
1188
//
1189

1190
#ifdef LINUX
1191
// sem_wait() that retries while the call fails with errno == EINTR.
// Returns the result of the last sem_wait() call.
int32_t bsem_wait(sem_t* sem) {
    for (;;) {
        int rc = sem_wait(sem);
        if (rc == 0 || errno != EINTR) {
            return rc;
        }
    }
}
1198

1199
// Install sigfp as the handler for signum via sigaction(), using the
// SA_SIGINFO and SA_RESTART flags.
void benchSetSignal(int32_t signum, ToolsSignalHandler sigfp) {
    typedef void (*SigInfoFn)(int, siginfo_t *, void *);

    struct sigaction action;
    memset(&action, 0, sizeof(action));
    action.sa_sigaction = (SigInfoFn) sigfp;
    action.sa_flags     = SA_SIGINFO | SA_RESTART;
    sigaction(signum, &action, NULL);
}
208✔
1206
#endif
1207

1208
int convertServAddr(int iface, bool tcp, int protocol) {
270✔
1209
    if (tcp
270✔
1210
            && iface == SML_REST_IFACE
3✔
1211
            && protocol == TSDB_SML_TELNET_PROTOCOL) {
3✔
1212
        // telnet_tcp_port        
1213
        if (convertHostToServAddr(g_arguments->host,
3✔
1214
                    g_arguments->telnet_tcp_port,
3✔
1215
                    &(g_arguments->serv_addr))) {
3✔
1216
            errorPrint("%s\n", "convert host to server address");
×
1217
            return -1;
×
1218
        }
1219
        infoPrint("convertServAddr host=%s telnet_tcp_port:%d to serv_addr=%p iface=%d \n", 
3✔
1220
                g_arguments->host, g_arguments->telnet_tcp_port, &g_arguments->serv_addr, iface);
1221
    } else {
1222
        int port = g_arguments->port_inputted ? g_arguments->port:DEFAULT_REST_PORT;
267✔
1223
        if (convertHostToServAddr(g_arguments->host,
267✔
1224
                                    port,
1225
                    &(g_arguments->serv_addr))) {
267✔
1226
            errorPrint("%s\n", "convert host to server address");
×
1227
            return -1;
×
1228
        }
1229
        infoPrint("convertServAddr host=%s port:%d to serv_addr=%p iface=%d \n", 
267✔
1230
                g_arguments->host, port, &g_arguments->serv_addr, iface);
1231
    }
1232
    return 0;
270✔
1233
}
1234

1235
// Log a socket error as "<msg>: <code>".
// On Windows the code comes from WSAGetLastError(); elsewhere `result` is
// printed as given.
static void errorPrintSocketMsg(char *msg, int result) {
#ifdef WINDOWS
    (void)result;
    int shownCode = WSAGetLastError();
#else
    int shownCode = result;
#endif
    errorPrint("%s: %d\n", msg, shownCode);
}
×
1242

1243
int createSockFd() {
129✔
1244
#ifdef WINDOWS
1245
    WSADATA wsaData;
1246
    WSAStartup(MAKEWORD(2, 2), &wsaData);
1247
    SOCKET sockfd;
1248
#else
1249
    int sockfd;
1250
#endif
1251
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
129✔
1252
    if (sockfd < 0) {
129✔
1253
        errorPrintSocketMsg("Could not create socket : ", sockfd);
×
1254
        return -1;
×
1255
    }
1256

1257
    int retConn = connect(
129✔
1258
            sockfd, (struct sockaddr *)&(g_arguments->serv_addr),
129✔
1259
            sizeof(struct sockaddr));
1260
    infoPrint("createSockFd call connect serv_addr=%p retConn=%d\n", &g_arguments->serv_addr, retConn);
129✔
1261
    if (retConn < 0) {
129✔
1262
        errorPrint("%s\n", "failed to connect");
×
1263
#ifdef WINDOWS
1264
        closesocket(sockfd);
1265
        WSACleanup();
1266
#else
1267
        close(sockfd);
×
1268
#endif
1269
        return -1;
×
1270
    }
1271
    return sockfd;
129✔
1272
}
1273

1274
// Close a socket descriptor; on Windows this also calls WSACleanup().
static void closeSockFd(int sockfd) {
#ifndef WINDOWS
    close(sockfd);
#else
    closesocket(sockfd);
    WSACleanup();
#endif
}
42✔
1282

1283
void destroySockFd(int sockfd) {
42✔
1284
    // check valid
1285
    if (sockfd < 0) {
42✔
1286
        return;
×
1287
    }
1288

1289
    // shutdown the connection since no more data will be sent
1290
    int result;
1291
    result = shutdown(sockfd, SHUT_WR);
42✔
1292
    if (SOCKET_ERROR == result) {
42✔
1293
        errorPrintSocketMsg("Socket shutdown failed with error: ", result);
×
1294
        closeSockFd(sockfd);
×
1295
        return;
×
1296
    }
1297
    // Receive until the peer closes the connection
1298
    do {
1299
        int recvbuflen = LARGE_BUFF_LEN;
80✔
1300
        char recvbuf[LARGE_BUFF_LEN];
1301
        result = recv(sockfd, recvbuf, recvbuflen, 0);
80✔
1302
        if ( result > 0 ) {
80✔
1303
            debugPrint("Socket bytes received: %d\n", result);
38✔
1304
        } else if (result == 0) {
42✔
1305
            infoPrint("Connection closed with result %d\n", result);
42✔
1306
        } else {
1307
            errorPrintSocketMsg("Socket recv failed with error: ", result);
×
1308
        }
1309
    } while (result > 0);
80✔
1310

1311
    closeSockFd(sockfd);
42✔
1312
}
1313

1314
// Log a failed command together with its error code and taos_errstr(res),
// then release res with taos_free_result().
// Commands of 512 bytes or more are shown as their first 500 bytes plus "...".
// NOTE: res must not be used by the caller after this call.
FORCE_INLINE void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) {
    char  shortened[512];
    char *shown = cmd;

    if (strlen(cmd) >= sizeof(shortened)) {
        snprintf(shortened, sizeof(shortened), "%.500s...", cmd);
        shown = shortened;
    }

    errorPrint("failed to run error code: 0x%08x, reason: %s command %s\n",
               code, taos_errstr(res), shown);
    taos_free_result(res);
}
13✔
1327

1328
// Report the amount of physical memory in KB through *totalKB.
// Returns 0 on success, -1 when the platform query fails.
// On _TD_DARWIN_64 the value is always reported as 0.
int32_t benchGetTotalMemory(int64_t *totalKB) {
#ifdef WINDOWS
  MEMORYSTATUSEX memsStat;
  memsStat.dwLength = sizeof(memsStat);
  if (!GlobalMemoryStatusEx(&memsStat)) {
    return -1;
  }

  *totalKB = memsStat.ullTotalPhys / 1024;
  return 0;
#elif defined(_TD_DARWIN_64)
  *totalKB = 0;
  return 0;
#else
  long pageSize  = sysconf(_SC_PAGESIZE);
  long physPages = sysconf(_SC_PHYS_PAGES);
  // sysconf() returns -1 on failure; do not turn that into a bogus size
  if (pageSize <= 0 || physPages < 0) {
    return -1;
  }
  // multiply first so page sizes that are not a multiple of 1024 are not
  // truncated before scaling
  *totalKB = (int64_t)physPages * (int64_t)pageSize / 1024;
  return 0;
#endif
}
1347

1348
// generate question mark string, used for insert into ... values(?,?,?...)
1349
// return value must call tmfree to free memory
1350
// Build the placeholder list "?,?,...,?" containing QCnt question marks.
// QCnt <= 0 yields an empty string. The previous version requested a
// zero-byte buffer for QCnt == 0 and returned it as if it were a terminated
// string.
// The returned buffer must be released with tmfree().
char* genQMark( int32_t QCnt) {
    int32_t marks = (QCnt > 0) ? QCnt : 0;
    // each mark is one '?', plus one ',' between neighbours
    size_t  textLen = (marks > 0) ? ((size_t)marks * 2 - 1) : 0;
    char   *buf = benchCalloc(1, textLen + 1, false);

    // write through a cursor instead of strcat(), which rescans the buffer
    char *cursor = buf;
    for (int32_t i = 0; i < marks; i++) {
        if (i > 0) {
            *cursor++ = ',';
        }
        *cursor++ = '?';
    }
    *cursor = '\0';
    return buf;
}
1360

1361
//
1362
//  STMT2  
1363
//
1364

1365
// create
1366
TAOS_STMT2_BINDV* createBindV(int32_t capacity, int32_t tagCnt, int32_t colCnt) {
133✔
1367
    // calc total size
1368
    int32_t tableSize = sizeof(char *) + sizeof(TAOS_STMT2_BIND *) + sizeof(TAOS_STMT2_BIND *) + 
133✔
1369
                        sizeof(TAOS_STMT2_BIND) * tagCnt + sizeof(TAOS_STMT2_BIND) * colCnt;
133✔
1370
    int32_t size = sizeof(TAOS_STMT2_BINDV) + tableSize * capacity;
133✔
1371
    TAOS_STMT2_BINDV *bindv = benchCalloc(1, size, false);
133✔
1372
    resetBindV(bindv, capacity, tagCnt, colCnt);
133✔
1373

1374
    return bindv;
131✔
1375
}
1376

1377
// reset tags and cols pointers
1378
// Point the members of bindv at the regions that follow it in the same
// allocation, in this order: table names, tag pointers, column pointers,
// tag bodies, column bodies.
// With tagCnt == 0, bindv->tags is set to NULL and no tag bodies are assigned.
void resetBindV(TAOS_STMT2_BINDV *bindv, int32_t capacity, int32_t tagCnt, int32_t colCnt) {
    unsigned char *cursor = (unsigned char *)bindv + sizeof(TAOS_STMT2_BINDV);

    // tbnames start right behind the header
    bindv->tbnames = (char **)cursor;

    // tags
    if (tagCnt != 0) {
        cursor += sizeof(char *) * capacity; // skip tbnames
        bindv->tags = (TAOS_STMT2_BIND **)cursor;
    } else {
        bindv->tags = NULL;
    }

    // bind_cols
    cursor += sizeof(TAOS_STMT2_BIND *) * capacity; // skip tags
    bindv->bind_cols = (TAOS_STMT2_BIND **)cursor;
    cursor += sizeof(TAOS_STMT2_BIND *) * capacity; // skip cols

    // tags body
    if (tagCnt > 0) {
        for (int32_t t = 0; t < capacity; t++) {
            bindv->tags[t] = (TAOS_STMT2_BIND *)cursor;
            cursor += sizeof(TAOS_STMT2_BIND) * tagCnt; // skip tag bodies
        }
    }

    // bind_cols body
    for (int32_t c = 0; c < capacity; c++) {
        bindv->bind_cols[c] = (TAOS_STMT2_BIND *)cursor;
        cursor += sizeof(TAOS_STMT2_BIND) * colCnt; // skip col bodies
    }
}
133✔
1409

1410
// clear bindv
1411
// Reset the first bindv->count table slots to NULL and set count to 0.
// NULL bindv is ignored. bindv->tags is NULL for tables without tags (see
// resetBindV), so each pointer array is checked before it is written.
void clearBindV(TAOS_STMT2_BINDV *bindv) {
    if (bindv == NULL) {
        return;
    }
    for (int32_t i = 0; i < bindv->count; i++) {
        if (bindv->tags != NULL) {
            bindv->tags[i] = NULL;
        }
        if (bindv->bind_cols != NULL) {
            bindv->bind_cols[i] = NULL;
        }
    }
    bindv->count = 0;
}
1420

1421
// free
1422
// Release a bindv obtained from createBindV(). The header, pointer slots and
// bind bodies live in one allocation, so a single tmfree() is enough.
void freeBindV(TAOS_STMT2_BINDV *bindv) {
    tmfree(bindv);
}
201✔
1425

1426
//
1427
//   debug show 
1428
//
1429

1430
// Debug helper: print every value of one bind buffer.
// Only TIMESTAMP, FLOAT and INT buffers are printed; other types print nothing.
void showBind(TAOS_STMT2_BIND* bind) {
    char *base = bind->buffer;

    // values are packed back to back, so entry n sits at n * sizeof(value)
    for (int32_t n = 0; n < bind->num; n++) {
        if (bind->buffer_type == TSDB_DATA_TYPE_TIMESTAMP) {
            debugPrint("   n=%d value=%" PRId64 "\n", n,
                       *(int64_t *)(base + (size_t)n * sizeof(int64_t)));
        } else if (bind->buffer_type == TSDB_DATA_TYPE_FLOAT) {
            debugPrint("   n=%d value=%f\n", n,
                       *(float *)(base + (size_t)n * sizeof(float)));
        } else if (bind->buffer_type == TSDB_DATA_TYPE_INT) {
            debugPrint("   n=%d value=%d\n", n,
                       *(int32_t *)(base + (size_t)n * sizeof(int32_t)));
        }
    }
}
×
1454

1455
// Debug helper: print `cnt` binds of one table, each prefixed with `label`.
// When binds is NULL, one "is NULL" line is printed per expected entry.
void showTableBinds(char* label, TAOS_STMT2_BIND* binds, int32_t cnt) {
    if (NULL == binds) {
        for (int32_t k = 0; k < cnt; k++) {
            debugPrint("  %d %s is NULL \n", k, label);
        }
        return;
    }

    for (int32_t k = 0; k < cnt; k++) {
        debugPrint("  %d %s type=%d buffer=%p \n", k, label, binds[k].buffer_type, binds[k].buffer);
        showBind(&binds[k]);
    }
}
×
1465

1466
// show bindv
1467
// Debug helper: print a whole bindv, table by table.
// tags->size entries are shown per table for tags, and cols->size + 1 entries
// for columns.
void showBindV(TAOS_STMT2_BINDV *bindv, BArray *tags, BArray *cols) {
    // num and base info
    debugPrint("show bindv table count=%d names=%p tags=%p bind_cols=%p\n",
                bindv->count, bindv->tbnames, bindv->tags, bindv->bind_cols);

    for (int32_t tb = 0; tb < bindv->count; tb++) {
        debugPrint(" show bindv table index=%d name=%s \n", tb, bindv->tbnames[tb]);
        if (bindv->tags != NULL) {
            showTableBinds("tag",    bindv->tags[tb],      tags->size);
        }
        if (bindv->bind_cols != NULL) {
            showTableBinds("column", bindv->bind_cols[tb], cols->size + 1);
        }
    }
}
×
1480

1481
// engine util/src/thashutil.c
1482
uint32_t MurmurHash3_32(const char *key, uint32_t len);
1483
// get group index about dbname.tbname
1484
// Map "1.<dbName>.<tbName>" to a group index in [0, groupCnt) by splitting the
// 32-bit MurmurHash3 range into groupCnt equal steps.
// Returns -1 when dbName or tbName is NULL, or when groupCnt <= 0 (which used
// to divide by zero).
int32_t calcGroupIndex(char* dbName, char* tbName, int32_t groupCnt) {
    // check valid
    if (dbName == NULL || tbName == NULL || groupCnt <= 0) {
        return -1;
    }

    char key[1024];
    snprintf(key, sizeof(key), "1.%s.%s", dbName, tbName);
    uint32_t hash = MurmurHash3_32(key, (uint32_t)strlen(key));

    // step >= 1 because groupCnt fits in 31 bits
    uint32_t step = UINT32_MAX / (uint32_t)groupCnt;
    // The first i with hash < (i + 1) * step is hash / step. Hashes in the
    // remainder above groupCnt * step fall into the last group.
    uint32_t idx = hash / step;
    if (idx >= (uint32_t)groupCnt) {
        return groupCnt - 1;
    }
    return (int32_t)idx;
}
1501

1502
// windows no export MurmurHash3_32 function from engine
1503
#ifdef WINDOWS
1504
// define
1505
#define ROTL32(x, r) (((x) << (r)) | ((x) >> (32u - (r))))
#define FMIX32(h)      \
  do {                 \
    (h) ^= (h) >> 16;  \
    (h) *= 0x85ebca6b; \
    (h) ^= (h) >> 13;  \
    (h) *= 0xc2b2ae35; \
    (h) ^= (h) >> 16;  \
  } while (0)

// 32-bit MurmurHash3 of `len` bytes at `key`, with the fixed seed 0x12345678.
// Whole 4-byte blocks are loaded with memcpy() in native byte order, so the
// key does not need to be 4-byte aligned and no uint32_t pointer aliases the
// char data.
uint32_t MurmurHash3_32(const char *key, uint32_t len) {
  const uint8_t *data = (const uint8_t *)key;
  const uint32_t nblocks = len >> 2u;

  uint32_t h1 = 0x12345678;

  const uint32_t c1 = 0xcc9e2d51;
  const uint32_t c2 = 0x1b873593;

  // body: mix every complete 4-byte block, front to back
  for (uint32_t i = 0; i < nblocks; i++) {
    uint32_t k1;
    memcpy(&k1, data + (size_t)i * 4, sizeof(k1));

    k1 *= c1;
    k1 = ROTL32(k1, 15u);
    k1 *= c2;

    h1 ^= k1;
    h1 = ROTL32(h1, 13u);
    h1 = h1 * 5 + 0xe6546b64;
  }

  // tail: the remaining 0..3 bytes
  const uint8_t *tail = data + (size_t)nblocks * 4;

  uint32_t k1 = 0;

  switch (len & 3u) {
    case 3:
      k1 ^= (uint32_t)tail[2] << 16;
      /* fallthrough */
    case 2:
      k1 ^= (uint32_t)tail[1] << 8;
      /* fallthrough */
    case 1:
      k1 ^= tail[0];
      k1 *= c1;
      k1 = ROTL32(k1, 15u);
      k1 *= c2;
      h1 ^= k1;
      break;
    default:
      break;
  }

  // finalization
  h1 ^= len;

  FMIX32(h1);

  return h1;
}
1562
#endif
1563

1564

1565
//
1566
// ---------------- benchQuery util ----------------------
1567
//
1568

1569
// init conn
1570
// Open the connection a query thread needs: a socket (stored in
// pThreadInfo->sockfd) for REST_IFACE, otherwise a bench connection (stored in
// pThreadInfo->conn).
// Returns 0 on success, -1 when the connection could not be created.
int32_t initQueryConn(qThreadInfo * pThreadInfo, int iface) {
    if (iface != REST_IFACE) {
        pThreadInfo->conn = initBenchConn();
        return (pThreadInfo->conn == NULL) ? -1 : 0;
    }

    int fd = createSockFd();
    if (fd < 0) {
        return -1;
    }
    pThreadInfo->sockfd = fd;
    return 0;
}
1587

1588
// close conn
1589
// Close what initQueryConn() opened: the socket for REST_IFACE, otherwise the
// bench connection (pThreadInfo->conn is reset to NULL afterwards).
void closeQueryConn(qThreadInfo * pThreadInfo, int iface) {
    if (iface != REST_IFACE) {
        closeBenchConn(pThreadInfo->conn);
        pThreadInfo->conn = NULL;
        return;
    }

#ifdef WINDOWS
    closesocket(pThreadInfo->sockfd);
    WSACleanup();
#else
    close(pThreadInfo->sockfd);
#endif
}
175✔
1602

1603

1604
// free g_queryInfo.specifiedQueryInfo memory, can be called more than once
1605
void freeSpecialQueryInfo() {
18✔
1606
    // can re-call
1607
    if (g_queryInfo.specifiedQueryInfo.sqls == NULL) {
18✔
NEW
1608
        return;
×
1609
    }
1610

1611
    // loop free each item memory
1612
    for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
251✔
1613
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
233✔
1614
        tmfree(sql->command);
233✔
1615
        tmfree(sql->delay_list);
233✔
1616
    }
1617

1618
    // free Array
1619
    benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);
18✔
1620
    g_queryInfo.specifiedQueryInfo.sqls = NULL;
18✔
1621
}
1622

1623

1624
#define KILLID_LEN  64
1625

NEW
1626
void *queryKiller(void *arg) {
×
NEW
1627
    char host[MAX_HOSTNAME_LEN] = {0};
×
NEW
1628
    tstrncpy(host, g_arguments->host, MAX_HOSTNAME_LEN);
×
1629

NEW
1630
    while (true) {
×
NEW
1631
        TAOS *taos = taos_connect(g_arguments->host, g_arguments->user,
×
NEW
1632
                g_arguments->password, NULL, g_arguments->port);
×
NEW
1633
        if (NULL == taos) {
×
NEW
1634
            errorPrint("Slow query killer thread "
×
1635
                    "failed to connect to the server %s\n",
1636
                    g_arguments->host);
NEW
1637
            return NULL;
×
1638
        }
1639

NEW
1640
        char command[TSDB_MAX_ALLOWED_SQL_LEN] =
×
1641
            "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries";
NEW
1642
        TAOS_RES *res = taos_query(taos, command);
×
NEW
1643
        int32_t code = taos_errno(res);
×
NEW
1644
        if (code) {
×
1645
            printErrCmdCodeStr(command, code, res);
1646
        }
1647

NEW
1648
        TAOS_ROW row = NULL;
×
NEW
1649
        while ((row = taos_fetch_row(res)) != NULL) {
×
NEW
1650
            int32_t *lengths = taos_fetch_lengths(res);
×
NEW
1651
            if (lengths[0] <= 0) {
×
NEW
1652
                infoPrint("No valid query found by %s\n", command);
×
1653
            } else {
NEW
1654
                int64_t execUSec = *(int64_t*)row[1];
×
1655

NEW
1656
                if (execUSec > g_queryInfo.killQueryThreshold * 1000000) {
×
NEW
1657
                    char sql[SHORT_1K_SQL_BUFF_LEN] = {0};
×
NEW
1658
                    tstrncpy(sql, (char*)row[2],
×
1659
                             min(strlen((char*)row[2])+1,
1660
                                 SHORT_1K_SQL_BUFF_LEN));
1661

NEW
1662
                    char killId[KILLID_LEN] = {0};
×
NEW
1663
                    tstrncpy(killId, (char*)row[0],
×
1664
                            min(strlen((char*)row[0])+1, KILLID_LEN));
NEW
1665
                    char killCommand[KILLID_LEN + 32] = {0};
×
NEW
1666
                    snprintf(killCommand, sizeof(killCommand), "KILL QUERY '%s'", killId);
×
NEW
1667
                    TAOS_RES *resKill = taos_query(taos, killCommand);
×
NEW
1668
                    int32_t codeKill = taos_errno(resKill);
×
NEW
1669
                    if (codeKill) {
×
1670
                        printErrCmdCodeStr(killCommand, codeKill, resKill);
1671
                    } else {
NEW
1672
                        infoPrint("%s succeed, sql: %s killed!\n",
×
1673
                                  killCommand, sql);
NEW
1674
                        taos_free_result(resKill);
×
1675
                    }
1676
                }
1677
            }
1678
        }
1679

NEW
1680
        taos_free_result(res);
×
NEW
1681
        taos_close(taos);
×
NEW
1682
        toolsMsleep(g_queryInfo.killQueryInterval*1000);
×
1683
    }
1684

1685
    return NULL;
1686
}
1687

1688
// kill slow queries
NEW
1689
int killSlowQuery() {
×
NEW
1690
    pthread_t pidKiller = {0};
×
NEW
1691
    int32_t ret = pthread_create(&pidKiller, NULL, queryKiller, NULL);
×
NEW
1692
    if (ret != 0) {
×
NEW
1693
        errorPrint("pthread_create failed create queryKiller thread. error code =%d \n", ret);
×
NEW
1694
        return -1;
×
1695
    }
NEW
1696
    pthread_join(pidKiller, NULL);
×
NEW
1697
    toolsMsleep(1000);
×
NEW
1698
    return 0;
×
1699
}
1700

1701
// fetch super table child name from server
1702
int fetchChildTableName(char *dbName, char *stbName) {
11✔
1703
    SBenchConn* conn = initBenchConn();
11✔
1704
    if (conn == NULL) {
11✔
NEW
1705
        return -1;
×
1706
    }
1707

1708
    // get child count
1709
    char  cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
11✔
1710
    if (3 == g_majorVersionOfClient) {
11✔
1711
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
11✔
1712
                "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM `%s`.`%s`)",
1713
                dbName, stbName);
1714
    } else {
NEW
1715
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
1716
                    "SELECT COUNT(TBNAME) FROM `%s`.`%s`",
1717
                dbName, stbName);
1718
    }
1719
    TAOS_RES *res = taos_query(conn->taos, cmd);
11✔
1720
    int32_t   code = taos_errno(res);
11✔
1721
    if (code) {
11✔
1722
        printErrCmdCodeStr(cmd, code, res);
NEW
1723
        closeBenchConn(conn);
×
NEW
1724
        return -1;
×
1725
    }
1726

1727
    TAOS_ROW    row = NULL;
11✔
1728
    int         num_fields = taos_num_fields(res);
11✔
1729
    TAOS_FIELD *fields = taos_fetch_fields(res);
11✔
1730
    while ((row = taos_fetch_row(res)) != NULL) {
22✔
1731
        if (0 == strlen((char *)(row[0]))) {
11✔
NEW
1732
            errorPrint("stable %s have no child table\n", stbName);
×
NEW
1733
            taos_free_result(res);
×
NEW
1734
            closeBenchConn(conn);
×
NEW
1735
            return -1;
×
1736
        }
1737
        char temp[256] = {0};
11✔
1738
        taos_print_row(temp, row, fields, num_fields);
11✔
1739

1740
        // set child table count
1741
        g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(temp);
11✔
1742
    }
1743
    infoPrint("%s's childTblCount: %" PRId64 "\n", stbName, g_queryInfo.superQueryInfo.childTblCount);
11✔
1744
    taos_free_result(res);
11✔
1745

1746
    // malloc memory with child table count
1747
    g_queryInfo.superQueryInfo.childTblName =
11✔
1748
        benchCalloc(g_queryInfo.superQueryInfo.childTblCount,
11✔
1749
                sizeof(char *), false);
1750
    // fetch child table name
1751
    if (getAllChildNameOfSuperTable(
11✔
1752
                conn->taos, dbName, stbName,
1753
                g_queryInfo.superQueryInfo.childTblName,
1754
                g_queryInfo.superQueryInfo.childTblCount)) {
1755
        // faild            
NEW
1756
        tmfree(g_queryInfo.superQueryInfo.childTblName);
×
NEW
1757
        closeBenchConn(conn);
×
NEW
1758
        return -1;
×
1759
    }
1760
    closeBenchConn(conn);
11✔
1761

1762
    // succ
1763
    return 0;
11✔
1764
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc