• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 12929200663

23 Jan 2025 12:30PM UTC coverage: 74.953% (-0.3%) from 75.249%
12929200663

Pull #839

github

web-flow
Merge 5b5d1f387 into 84c50c54f
Pull Request #839: FIX mixed query mode no need calc thread

388 of 548 new or added lines in 4 files covered. (70.8%)

243 existing lines in 9 files now uncovered.

12434 of 16589 relevant lines covered (74.95%)

321013.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.31
/src/benchUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
char resEncodingChunk[] = "Encoding: chunked";
17
char succMessage[] = "succ";
18
char resHttp[] = "HTTP/1.1 ";
19
char resHttpOk[] = "HTTP/1.1 200 OK";
20
char influxHttpOk[] = "HTTP/1.1 204";
21
char opentsdbHttpOk[] = "HTTP/1.1 400";
22

23
FORCE_INLINE void* benchCalloc(size_t nmemb, size_t size, bool record) {
2,094,102✔
24
    void* ret = calloc(nmemb, size);
2,101,438✔
25
    if (NULL == ret) {
2,097,018✔
26
        errorPrint("%s", "failed to allocate memory\n");
×
27
        exit(EXIT_FAILURE);
×
28
    }
29
    if (record) {
2,101,438✔
30
        g_memoryUsage += nmemb * size;
147,705✔
31
    }
32
    return ret;
2,101,438✔
33
}
34

35
FORCE_INLINE void tmfclose(FILE *fp) {
174✔
36
    if (NULL != fp) {
487✔
37
        fclose(fp);
487✔
38
        fp = NULL;
487✔
39
    }
40
}
313✔
41

42
FORCE_INLINE void tmfree(void *buf) {
2,083,956✔
43
    if (NULL != buf) {
2,139,245✔
44
        free(buf);
2,132,784✔
45
    }
46
}
57,179✔
47

48
FORCE_INLINE bool isRest(int32_t iface) { 
670✔
49
    return REST_IFACE == iface || SML_REST_IFACE == iface;
670✔
50
}
51

52
void ERROR_EXIT(const char *msg) {
×
53
    errorPrint("%s", msg);
×
54
    exit(EXIT_FAILURE);
×
55
}
56

57
#ifdef WINDOWS
58
HANDLE g_stdoutHandle;
59
DWORD  g_consoleMode;
60

61
void setupForAnsiEscape(void) {
62
    DWORD mode = 0;
63
    g_stdoutHandle = GetStdHandle(STD_OUTPUT_HANDLE);
64

65
    if (g_stdoutHandle == INVALID_HANDLE_VALUE) {
66
        exit(GetLastError());
67
    }
68

69
    if (!GetConsoleMode(g_stdoutHandle, &mode)) {
70
        exit(GetLastError());
71
    }
72

73
    g_consoleMode = mode;
74

75
    // Enable ANSI escape codes
76
    mode |= ENABLE_VIRTUAL_TERMINAL_PROCESSING;
77

78
    if (!SetConsoleMode(g_stdoutHandle, mode)) {
79
        exit(GetLastError());
80
    }
81
}
82

83
void resetAfterAnsiEscape(void) {
84
    // Reset colors
85
    printf("\x1b[0m");
86

87
    // Reset console mode
88
    if (!SetConsoleMode(g_stdoutHandle, g_consoleMode)) {
89
        exit(GetLastError());
90
    }
91
}
92

93
unsigned int taosRandom() {
94
    unsigned int number;
95
    rand_s(&number);
96

97
    return number;
98
}
99
#else  // Not windows
100
void setupForAnsiEscape(void) {}
×
101

102
void resetAfterAnsiEscape(void) {
×
103
    // Reset colors
104
    printf("\x1b[0m");
×
105
}
×
106

107
FORCE_INLINE unsigned int taosRandom() { return (unsigned int)rand(); }
248,097,951✔
108
#endif
109

110
void swapItem(char** names, int32_t i, int32_t j ) {
10✔
111
    debugPrint("swap item i=%d (%s) j=%d (%s)\n", i, names[i], j, names[j]);
10✔
112
    char * p = names[i];
10✔
113
    names[i] = names[j];
10✔
114
    names[j] = p;
10✔
115
}
10✔
116

117
int getAllChildNameOfSuperTable(TAOS *taos, char *dbName, char *stbName,
3✔
118
        char ** childTblNameOfSuperTbl,
119
        int64_t childTblCountOfSuperTbl) {
120
    char cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
3✔
121
    snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
3✔
122
             "select distinct tbname from %s.`%s` limit %" PRId64 "",
123
            dbName, stbName, childTblCountOfSuperTbl);
124
    TAOS_RES *res = taos_query(taos, cmd);
3✔
125
    int32_t   code = taos_errno(res);
3✔
126
    int64_t   count = 0;
3✔
127
    if (code) {
3✔
128
        printErrCmdCodeStr(cmd, code, res);
129
        return -1;
×
130
    }
131
    TAOS_ROW row = NULL;
3✔
132
    while ((row = taos_fetch_row(res)) != NULL) {
26✔
133
        if (0 == strlen((char *)(row[0]))) {
23✔
134
            errorPrint("No.%" PRId64 " table return empty name\n",
×
135
                    count);
136
            return -1;
×
137
        }
138
        int32_t * lengths = taos_fetch_lengths(res);
23✔
139
        childTblNameOfSuperTbl[count] =
23✔
140
            benchCalloc(1, TSDB_TABLE_NAME_LEN + 3, true);
23✔
141
        childTblNameOfSuperTbl[count][0] = '`';
23✔
142
        strncpy(childTblNameOfSuperTbl[count] + 1, row[0], lengths[0]);
23✔
143
        childTblNameOfSuperTbl[count][lengths[0] + 1] = '`';
23✔
144
        childTblNameOfSuperTbl[count][lengths[0] + 2] = '\0';
23✔
145
        debugPrint("childTblNameOfSuperTbl[%" PRId64 "]: %s\n", count,
23✔
146
                childTblNameOfSuperTbl[count]);
147
        count++;
23✔
148
    }
149
    taos_free_result(res);
3✔
150

151
    // random swap order
152
    if (count < 4) {
3✔
153
        return 0;
1✔
154
    }
155

156
    int32_t swapCnt = count/2;
2✔
157
    for(int32_t i = 0; i < swapCnt; i++ ) {
12✔
158
        int32_t j = swapCnt + RD(swapCnt);
20✔
159
        swapItem(childTblNameOfSuperTbl, i, j);
10✔
160
    }
161
    return 0;
2✔
162
}
163

164
int convertHostToServAddr(char *host, uint16_t port,
254✔
165
        struct sockaddr_in *serv_addr) {
166
    if (!host) {
254✔
167
        errorPrint("%s", "convertHostToServAddr host is null.");
×
168
        return -1;
×
169
    }
170
    debugPrint("convertHostToServAddr(host: %s, port: %d)\n", host,
254✔
171
            port);
172
#ifdef WINDOWS
173
    WSADATA wsaData;
174
    int ret = WSAStartup(MAKEWORD(2, 2), &wsaData);
175
    if (ret) {
176
        return ret;
177
    }
178
#endif
179
    struct hostent *server = gethostbyname(host);
254✔
180
    if ((server == NULL) || (server->h_addr == NULL)) {
254✔
181
        errorPrint("%s", "no such host");
×
182
        return -1;
×
183
    }
184
    memset(serv_addr, 0, sizeof(struct sockaddr_in));
254✔
185
    serv_addr->sin_family = AF_INET;
254✔
186
    serv_addr->sin_port = htons(port);
254✔
187

188
#ifdef WINDOWS
189
    struct addrinfo  hints = {0};
190
    hints.ai_family = AF_INET;
191
    hints.ai_socktype = SOCK_STREAM;
192

193
    struct addrinfo *pai = NULL;
194

195
    if (!getaddrinfo(server->h_name, NULL, &hints, &pai)) {
196
        serv_addr->sin_addr.s_addr =
197
               ((struct sockaddr_in *) pai->ai_addr)->sin_addr.s_addr;
198
        freeaddrinfo(pai);
199
    }
200
    WSACleanup();
201
#else
202
    serv_addr->sin_addr.s_addr = inet_addr(host);
254✔
203
    memcpy(&(serv_addr->sin_addr.s_addr), server->h_addr, server->h_length);
254✔
204
#endif
205
    return 0;
254✔
206
}
207

208
void prompt(bool nonStopMode) {
581✔
209
    if (!g_arguments->answer_yes) {
581✔
210
        g_arguments->in_prompt = true;
7✔
211
        if (nonStopMode) {
7✔
212
            printf(
×
213
                    "\n\n         Current is the Non-Stop insertion mode. "
214
                    "benchmark will continuously "
215
                    "insert data unless you press "
216
                    "Ctrl-C to end it.\n\n         "
217
                    "press enter key to continue and "
218
                    "Ctrl-C to "
219
                    "stop\n\n");
220
            (void)getchar();
×
221
        } else {
222
            printf(
7✔
223
                    "\n\n         Press enter key to continue or Ctrl-C to "
224
                    "stop\n\n");
225
            (void)getchar();
7✔
226
        }
227
        g_arguments->in_prompt = false;
7✔
228
    }
229
}
581✔
230

231
static void appendResultBufToFile(char *resultBuf, char * filePath) {
313✔
232
    FILE* fp = fopen(filePath, "at");
313✔
233
    if (fp == NULL) {
313✔
234
        errorPrint(
×
235
                "failed to open result file: %s, result will not save "
236
                "to file\n", filePath);
237
        return;
×
238
    }
239
    fprintf(fp, "%s", resultBuf);
313✔
240
    tmfclose(fp);
241
}
242

243
int32_t replaceChildTblName(char *inSql, char *outSql, int tblIndex) {
7✔
244
    // child table mark
245
    char mark[32] = "xxxx";
7✔
246
    char *pos = strstr(inSql, mark);
7✔
247
    if (0 == pos) {
7✔
NEW
248
        errorPrint("sql format error, sql not found mark string '%s'", mark);
×
NEW
249
        return -1;
×
250
    }        
251

252
    char subTblName[TSDB_TABLE_NAME_LEN];
253
    snprintf(subTblName, TSDB_TABLE_NAME_LEN,
7✔
254
            "`%s`.`%s`", g_queryInfo.dbName,
255
            g_queryInfo.superQueryInfo.childTblName[tblIndex]);
7✔
256

257
    int32_t len = pos - inSql + 1;
7✔
258
    tstrncpy(outSql, inSql, len);
7✔
259
    snprintf(outSql + len, TSDB_MAX_ALLOWED_SQL_LEN - 1,
7✔
260
             "%s%s", subTblName, pos + strlen(mark));
7✔
261
    return 0;         
262
}
263

72✔
264
int64_t toolsGetTimestamp(int32_t precision) {
72✔
265
    if (precision == TSDB_TIME_PRECISION_MICRO) {
×
266
        return toolsGetTimestampUs();
72✔
267
    } else if (precision == TSDB_TIME_PRECISION_NANO) {
×
268
        return toolsGetTimestampNs();
269
    } else {
72✔
270
        return toolsGetTimestampMs();
271
    }
272
}
273

193✔
274
int regexMatch(const char *s, const char *reg, int cflags) {
275
    regex_t regex;
193✔
276
    char    msgbuf[100] = {0};
277

278
    /* Compile regular expression */
193✔
279
    if (regcomp(&regex, reg, cflags) != 0)
×
280
        ERROR_EXIT("Failed to regex compile\n");
281

282
    /* Execute regular expression */
193✔
283
    int reti = regexec(&regex, s, 0, NULL, 0);
193✔
284
    if (!reti) {
3✔
285
        regfree(&regex);
3✔
286
        return 1;
190✔
287
    } else if (reti == REG_NOMATCH) {
190✔
288
        regfree(&regex);
190✔
289
        return 0;
290
    } else {
×
291
        regerror(reti, &regex, msgbuf, sizeof(msgbuf));
×
292
        regfree(&regex);
×
293
        printf("Regex match failed: %s\n", msgbuf);
×
294
        exit(EXIT_FAILURE);
295
    }
296
    return 0;
297
}
298

299

300

301

1,413✔
302
SBenchConn* initBenchConnImpl() {
1,413✔
303
    SBenchConn* conn = benchCalloc(1, sizeof(SBenchConn), true);
304
#ifdef WEBSOCKET
1,413✔
305
    if (g_arguments->websocket) {
29✔
306
        conn->taos_ws = ws_connect(g_arguments->dsn);
29✔
307
        char maskedDsn[256] = "\0";
29✔
308
        memcpy(maskedDsn, g_arguments->dsn, 20);
29✔
309
        memcpy(maskedDsn+20, "...", 3);
29✔
310
        memcpy(maskedDsn+23,
29✔
311
               g_arguments->dsn + strlen(g_arguments->dsn)-10, 10);
29✔
312
        if (conn->taos_ws == NULL) {
1✔
313
            errorPrint("failed to connect %s, reason: %s\n",
314
                    maskedDsn, ws_errstr(NULL));
315
            tmfree(conn);
1✔
316
            return NULL;
317
        }
318

28✔
319
        succPrint("%s conneced\n", maskedDsn);
320
    } else {
321
#endif
2,768✔
322
        conn->taos = taos_connect(g_arguments->host,
1,384✔
323
                g_arguments->user, g_arguments->password,
1,384✔
324
                NULL, g_arguments->port);
1,384✔
325
        if (conn->taos == NULL) {
3✔
326
            errorPrint("failed to connect native %s:%d, "
327
                       "code: 0x%08x, reason: %s\n",
328
                    g_arguments->host, g_arguments->port,
329
                    taos_errno(NULL), taos_errstr(NULL));
330
            tmfree(conn);
3✔
331
            return NULL;
332
        }
333

1,381✔
334
        conn->ctaos = taos_connect(g_arguments->host,
1,381✔
335
                                   g_arguments->user,
1,381✔
336
                                   g_arguments->password,
1,381✔
337
                                   NULL, g_arguments->port);
338
#ifdef WEBSOCKET
339
    }
340
#endif
1,409✔
341
    return conn;
342
}
343

1,413✔
344
SBenchConn* initBenchConn() {
345

1,413✔
346
    SBenchConn* conn = NULL;
1,413✔
347
    int32_t keep_trying = 0;
348
    while(1) {
1,413✔
349
        conn = initBenchConnImpl();
1,413✔
350
        if(conn || ++keep_trying > g_arguments->keep_trying  || g_arguments->terminate) {
351
            break;
352
        }
353

×
354
        infoPrint("sleep %dms and try to connect... %d  \n", g_arguments->trying_interval, keep_trying);
×
355
        if(g_arguments->trying_interval > 0) {
×
356
            toolsMsleep(g_arguments->trying_interval);
357
        }        
358
    } 
359

1,413✔
360
    return conn;
361
}
362

1,409✔
363
void closeBenchConn(SBenchConn* conn) {
1,409✔
364
    if(conn == NULL)
×
365
       return ;
366
#ifdef WEBSOCKET
1,409✔
367
    if (g_arguments->websocket) {
28✔
368
        ws_close(conn->taos_ws);
369
    } else {
370
#endif
1,381✔
371
        if(conn->taos) {
1,381✔
372
            taos_close(conn->taos);
1,381✔
373
            conn->taos = NULL;
374
        }
1,381✔
375
        if (conn->ctaos) {
1,381✔
376
            taos_close(conn->ctaos);
1,381✔
377
            conn->ctaos = NULL;
378
        }
379
#ifdef WEBSOCKET
380
    }
381
#endif
382
    tmfree(conn);
383
}
384

21✔
385
int32_t queryDbExecRest(char *command, char* dbName, int precision,
386
                    int iface, int protocol, bool tcp, int sockfd) {
21✔
387
    int32_t code = postProceSql(command,
388
                         dbName,
389
                         precision,
390
                         iface,
391
                         protocol,
21✔
392
                         g_arguments->port,
393
                         tcp,
394
                         sockfd,
395
                         NULL);
20✔
396
    return code;
397
}
398

66,222✔
399
int32_t queryDbExecCall(SBenchConn *conn, char *command) {
66,222✔
400
    int32_t code = 0;
401
#ifdef WEBSOCKET
66,222✔
402
    if (g_arguments->websocket) {
1,028✔
403
        WS_RES* res = ws_query_timeout(conn->taos_ws,
1,028✔
404
                                       command, g_arguments->timeout);
1,029✔
405
        code = ws_errno(res);
1,027✔
406
        if (code != 0) {
×
407
            errorPrint("Failed to execute <%s>, code: 0x%08x, reason: %s\n",
408
                       command, code, ws_errstr(res));
409
        }
1,027✔
410
        ws_free_result(res);
411
    } else {
412
#endif
65,194✔
413
        TAOS_RES *res = taos_query(conn->taos, command);
65,198✔
414
        code = taos_errno(res);
65,183✔
415
        if (code) {
416
            printErrCmdCodeStr(command, code, res);
417
        } else {
65,170✔
418
            taos_free_result(res);
419
        }
420
#ifdef WEBSOCKET
421
    }
422
#endif
66,225✔
423
    return code;
424
}
425

176✔
426
void encodeAuthBase64() {
427
    char        userpass_buf[INPUT_BUF_LEN];
428
    static char base64[] = {
429
        'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
430
        'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
431
        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
432
        'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
433
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '+', '/'};
176✔
434
    snprintf(userpass_buf, INPUT_BUF_LEN, "%s:%s", g_arguments->user,
176✔
435
            g_arguments->password);
436

176✔
437
    int mod_table[] = {0, 2, 1};
438

176✔
439
    size_t userpass_buf_len = strlen(userpass_buf);
176✔
440
    size_t encoded_len = 4 * ((userpass_buf_len + 2) / 3);
441

176✔
442
    memset(g_arguments->base64_buf, 0, INPUT_BUF_LEN);
1,061✔
443
    for (int n = 0, m = 0; n < userpass_buf_len;) {
885✔
444
        uint32_t oct_a =
885✔
445
            n < userpass_buf_len ? (unsigned char)userpass_buf[n++] : 0;
885✔
446
        uint32_t oct_b =
885✔
447
            n < userpass_buf_len ? (unsigned char)userpass_buf[n++] : 0;
885✔
448
        uint32_t oct_c =
885✔
449
            n < userpass_buf_len ? (unsigned char)userpass_buf[n++] : 0;
885✔
450
        uint32_t triple = (oct_a << 0x10) + (oct_b << 0x08) + oct_c;
451

885✔
452
        g_arguments->base64_buf[m++] = base64[(triple >> 3 * 6) & 0x3f];
885✔
453
        g_arguments->base64_buf[m++] = base64[(triple >> 2 * 6) & 0x3f];
885✔
454
        g_arguments->base64_buf[m++] = base64[(triple >> 1 * 6) & 0x3f];
885✔
455
        g_arguments->base64_buf[m++] = base64[(triple >> 0 * 6) & 0x3f];
456
    }
457

527✔
458
    for (int l = 0; l < mod_table[userpass_buf_len % 3]; l++)
351✔
459
        g_arguments->base64_buf[encoded_len - 1 - l] = '=';
176✔
460
}
461

334✔
462
int postProceSqlImpl(char *sqlstr, char* dbName, int precision, int iface,
463
                     int protocol, uint16_t rest_port, bool tcp, int sockfd,
464
                     char* filePath,
465
                     char *responseBuf, int64_t response_length) {
334✔
466
    int32_t      code = -1;
334✔
467
    char *       req_fmt =
468
        "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: "
469
        "Basic %s\r\nContent-Length: %d\r\nContent-Type: "
470
        "application/x-www-form-urlencoded\r\n\r\n%s";
334✔
471
    char url[URL_BUFF_LEN] = {0};
334✔
472
    if (iface == REST_IFACE) {
173✔
473
        snprintf(url, URL_BUFF_LEN, "/rest/sql/%s", dbName);
161✔
474
    } else if (iface == SML_REST_IFACE
161✔
475
            && protocol == TSDB_SML_LINE_PROTOCOL) {
50✔
476
        snprintf(url, URL_BUFF_LEN,
477
                 "/influxdb/v1/write?db=%s&precision=%s", dbName,
478
                precision == TSDB_TIME_PRECISION_MILLI
479
                ? "ms"
480
                : precision == TSDB_TIME_PRECISION_NANO
481
                ? "ns"
×
482
                : "u");
111✔
483
    } else if (iface == SML_REST_IFACE
111✔
484
            && protocol == TSDB_SML_TELNET_PROTOCOL) {
65✔
485
        snprintf(url, URL_BUFF_LEN, "/opentsdb/v1/put/telnet/%s", dbName);
46✔
486
    } else if (iface == SML_REST_IFACE
46✔
487
            && (protocol == TSDB_SML_JSON_PROTOCOL
1✔
488
                || protocol == SML_JSON_TAOS_FORMAT)) {
46✔
489
        snprintf(url, URL_BUFF_LEN, "/opentsdb/v1/put/json/%s", dbName);
490
    }
491

492
    int      bytes, sent, received, req_str_len, resp_len;
334✔
493
    char *   request_buf = NULL;
334✔
494
    int req_buf_len = (int)strlen(sqlstr) + REQ_EXTRA_BUF_LEN;
495

334✔
496
    if (g_arguments->terminate) {
×
497
        goto free_of_postImpl;
498
    }
334✔
499
    request_buf = benchCalloc(1, req_buf_len, false);
500

501
    int r;
334✔
502
    if (protocol == TSDB_SML_TELNET_PROTOCOL && tcp) {
32✔
503
        r = snprintf(request_buf, req_buf_len, "%s", sqlstr);
504
    } else {
302✔
505
        r = snprintf(request_buf, req_buf_len, req_fmt, url, g_arguments->host,
302✔
506
                rest_port, g_arguments->base64_buf, strlen(sqlstr),
507
                sqlstr);
508
    }
334✔
509
    if (r >= req_buf_len) {
×
510
        free(request_buf);
×
511
        ERROR_EXIT("too long request");
512
    }
513

334✔
514
    req_str_len = (int)strlen(request_buf);
334✔
515
    debugPrint("request buffer: %s\n", request_buf);
334✔
516
    sent = 0;
517
    do {
668✔
518
        bytes = send(sockfd, request_buf + sent,
334✔
519
                req_str_len - sent, 0);
334✔
520
        if (bytes < 0) {
×
521
            errorPrint("%s", "writing no message to socket\n");
×
522
            goto free_of_postImpl;
523
        }
334✔
524
        if (bytes == 0) break;
334✔
525
        sent += bytes;
334✔
526
    } while ((sent < req_str_len) && !g_arguments->terminate);
527

334✔
528
    if (protocol == TSDB_SML_TELNET_PROTOCOL
65✔
529
            && iface == SML_REST_IFACE && tcp) {
32✔
530
        code = 0;
32✔
531
        goto free_of_postImpl;
532
    }
533

302✔
534
    resp_len = response_length - 1;
302✔
535
    received = 0;
536

302✔
537
    bool chunked = false;
538

302✔
539
    if (g_arguments->terminate) {
×
540
        goto free_of_postImpl;
541
    }
542
    do {
818✔
543
        bytes = recv(sockfd, responseBuf + received,
409✔
544
                resp_len - received, 0);
409✔
545
        if (bytes <= 0) {
×
546
            errorPrint("%s", "reading no response from socket\n");
×
547
            goto free_of_postImpl;
548
        }
409✔
549
        responseBuf[resp_len] = 0;
409✔
550
        debugPrint("response buffer: %s bytes=%d\n", responseBuf, bytes);
409✔
551
        if (NULL != strstr(responseBuf, resEncodingChunk)) {
173✔
552
            chunked = true;
553
        }
409✔
554
        int64_t index = strlen(responseBuf) - 1;
1,729✔
555
        while (responseBuf[index] == '\n' || responseBuf[index] == '\r') {
1,320✔
556
            if (index == 0) {
×
557
                break;
558
            }
1,320✔
559
            index--;
560
        }
409✔
561
        debugPrint("index: %" PRId64 "\n", index);
409✔
562
        if (chunked && responseBuf[index] == '0') {
15✔
563
            code = 0;
15✔
564
            break;
565
        }
394✔
566
        if (!chunked && responseBuf[index] == '}') {
×
567
            code = 0;
×
568
            break;
569
        }
570

394✔
571
        received += bytes;
572

394✔
573
        if (g_arguments->test_mode == INSERT_TEST) {
394✔
574
            if (strlen(responseBuf)) {
394✔
575
                if (((NULL != strstr(responseBuf, resEncodingChunk)) &&
158✔
576
                            (NULL != strstr(responseBuf, resHttp))) ||
236✔
577
                        ((NULL != strstr(responseBuf, resHttpOk)) ||
236✔
578
                         (NULL != strstr(responseBuf, influxHttpOk)) ||
107✔
579
                         (NULL != strstr(responseBuf, opentsdbHttpOk)))) {
580
                    break;
581
                }
582
            }
583
        }
107✔
584
    } while ((received < resp_len) && !g_arguments->terminate);
585

302✔
586
    if (received == resp_len) {
×
587
        errorPrint("%s", "storing complete response from socket\n");
×
588
        goto free_of_postImpl;
589
    }
590

302✔
591
    if (NULL == strstr(responseBuf, resHttpOk) &&
129✔
592
            NULL == strstr(responseBuf, influxHttpOk) &&
×
593
            NULL == strstr(responseBuf, succMessage) &&
×
594
            NULL == strstr(responseBuf, opentsdbHttpOk)) {
×
595
        errorPrint("Response:\n%s\n", responseBuf);
×
596
        goto free_of_postImpl;
597
    }
598

302✔
599
    code = 0;
334✔
600
free_of_postImpl:
334✔
UNCOV
601
    if (filePath && strlen(filePath) > 0 && !g_arguments->terminate) {
×
602
        appendResultBufToFile(responseBuf, filePath);
603
    }
604
    tmfree(request_buf);
334✔
605
    return code;
606
}
607

18✔
608
static int getServerVersionRestImpl(int16_t rest_port, int sockfd) {
18✔
609
    int server_ver = -1;
18✔
610
    char       command[SHORT_1K_SQL_BUFF_LEN] = "\0";
18✔
611
    snprintf(command, SHORT_1K_SQL_BUFF_LEN, "SELECT SERVER_VERSION()");
18✔
612
    char *responseBuf = benchCalloc(1, RESP_BUF_LEN, false);
18✔
613
    int code = postProceSqlImpl(command,
614
                                NULL,
615
                                0,
616
                                REST_IFACE,
617
                                0,
618
                                rest_port,
619
                                false,
620
                                sockfd,
621
                                NULL, responseBuf, RESP_BUF_LEN);
18✔
622
    if (code != 0) {
×
623
        errorPrint("Failed to execute command: %s\n", command);
×
624
        goto free_of_getversion;
625
    }
18✔
626
    debugPrint("response buffer: %s\n", responseBuf);
18✔
627
    if (NULL != strstr(responseBuf, resHttpOk)) {
18✔
628
        char* start = strstr(responseBuf, "{");
18✔
629
        if (start == NULL) {
×
630
            errorPrint("Invalid response format: %s\n", responseBuf);
×
631
            goto free_of_getversion;
632
        }
18✔
633
        tools_cJSON* resObj = tools_cJSON_Parse(start);
18✔
634
        if (resObj == NULL) {
×
635
            errorPrint("Cannot parse response into json: %s\n", start);
636
        }
18✔
637
        tools_cJSON* dataObj = tools_cJSON_GetObjectItem(resObj, "data");
18✔
638
        if (!tools_cJSON_IsArray(dataObj)) {
×
639
            char* pstr = tools_cJSON_Print(resObj);
×
640
            errorPrint("Invalid or miss 'data' key in json: %s\n", pstr ? pstr : "null");
641
            tmfree(pstr);
×
642
            tools_cJSON_Delete(resObj);
×
643
            goto free_of_getversion;
644
        }
18✔
645
        tools_cJSON *versionObj = tools_cJSON_GetArrayItem(dataObj, 0);
18✔
646
        tools_cJSON *versionStrObj = tools_cJSON_GetArrayItem(versionObj, 0);
18✔
647
        server_ver = atoi(versionStrObj->valuestring);
18✔
648
        char* pstr = tools_cJSON_Print(versionStrObj);        
18✔
649
        debugPrint("versionStrObj: %s, version: %s, server_ver: %d\n",
650
                   pstr ? pstr : "null",
651
                   versionStrObj->valuestring, server_ver);
652
        tmfree(pstr);
18✔
653
        tools_cJSON_Delete(resObj);
654
    }
×
655
free_of_getversion:
18✔
656
    free(responseBuf);
18✔
657
    return server_ver;
658
}
659

18✔
660
int getServerVersionRest(int16_t rest_port) {
18✔
661
    int sockfd = createSockFd();
18✔
662
    if (sockfd < 0) {
×
663
        return -1;
664
    }
665

18✔
666
    int server_version = getServerVersionRestImpl(rest_port, sockfd);
667

18✔
668
    destroySockFd(sockfd);
18✔
669
    return server_version;
670
}
671

155✔
672
static int getCodeFromResp(char *responseBuf) {
155✔
673
    int code = -1;
155✔
674
    char* start = strstr(responseBuf, "{");
155✔
675
    if (start == NULL) {
×
676
        errorPrint("Invalid response format: %s\n", responseBuf);
×
677
        return -1;
678
    }
155✔
679
    tools_cJSON* resObj = tools_cJSON_Parse(start);
155✔
680
    if (resObj == NULL) {
×
681
        errorPrint("Cannot parse response into json: %s\n", start);
×
682
        return -1;
683
    }
155✔
684
    tools_cJSON* codeObj = tools_cJSON_GetObjectItem(resObj, "code");
154✔
685
    if (!tools_cJSON_IsNumber(codeObj)) {
×
686
        char* pstr = tools_cJSON_Print(resObj);
×
687
        errorPrint("Invalid or miss 'code' key in json: %s\n", pstr ? pstr : "null");
688
        tmfree(pstr);
×
689
        tools_cJSON_Delete(resObj);
×
690
        return -1;
691
    }
692

154✔
693
    code = codeObj->valueint;
694

154✔
695
    if (codeObj->valueint != 0) {
×
696
        tools_cJSON* desc = tools_cJSON_GetObjectItem(resObj, "desc");
×
697
        if (!tools_cJSON_IsString(desc)) {
×
698
            char* pstr = tools_cJSON_Print(resObj);
×
699
            errorPrint("Invalid or miss 'desc' key in json: %s\n", pstr ? pstr : "null");
700
            tmfree(pstr);
×
701
            return -1;
702
        }
×
703
        errorPrint("response, code: %d, reason: %s\n",
704
                   (int)codeObj->valueint, desc->valuestring);
705
    }
706

154✔
707
    tools_cJSON_Delete(resObj);
155✔
708
    return code;
709
}
710

316✔
711
int postProceSql(char *sqlstr, char* dbName, int precision, int iface,
712
                 int protocol, uint16_t rest_port,
713
                 bool tcp, int sockfd, char* filePath) {
714
    uint64_t response_length;
316✔
715
    if (g_arguments->test_mode == INSERT_TEST) {
316✔
716
        response_length = RESP_BUF_LEN;
UNCOV
717
    } else {
×
718
        response_length = g_queryInfo.response_buffer;
719
    }
720

316✔
721
    char *responseBuf = benchCalloc(1, response_length, false);
316✔
722
    int code = postProceSqlImpl(sqlstr, dbName, precision, iface, protocol,
723
                                rest_port,
724
                                tcp, sockfd, filePath, responseBuf,
725
                                response_length);
726
    // compatibility 2.6
316✔
727
    if (-1 == g_arguments->rest_server_ver_major) {
UNCOV
728
        // confirm version is 2.x according to "succ"
×
729
        if (NULL != strstr(responseBuf, succMessage) && iface == REST_IFACE) {
×
730
            g_arguments->rest_server_ver_major = 2;
731
        }
732
    }
733

316✔
734
    if (NULL != strstr(responseBuf, resHttpOk) && iface == REST_IFACE) {
735
        // if taosd is not starting , rest_server_ver_major can't be got by 'select server_version()' , so is -1
155✔
736
        if (-1 == g_arguments->rest_server_ver_major || 3 <= g_arguments->rest_server_ver_major) {
155✔
737
            code = getCodeFromResp(responseBuf);
738
        } else {
×
739
            code = 0;
740
        }
154✔
741
        goto free_of_post;
742
    }
743

161✔
744
    if (2 == g_arguments->rest_server_ver_major) {
×
745
        if (NULL != strstr(responseBuf, succMessage) && iface == REST_IFACE) {
×
746
            code = getCodeFromResp(responseBuf);
747
        } else {
×
748
            code = 0;
749
        }
×
750
        goto free_of_post;
751
    }
752

161✔
753
    if (NULL != strstr(responseBuf, influxHttpOk) &&
50✔
754
            protocol == TSDB_SML_LINE_PROTOCOL && iface == SML_REST_IFACE) {
50✔
755
        code = 0;
50✔
756
        goto free_of_post;
757
    }
758

111✔
759
    if (NULL != strstr(responseBuf, opentsdbHttpOk)
×
760
            && (protocol == TSDB_SML_TELNET_PROTOCOL
×
761
            || protocol == TSDB_SML_JSON_PROTOCOL
×
762
            || protocol == SML_JSON_TAOS_FORMAT)
×
763
            && iface == SML_REST_IFACE) {
×
764
        code = 0;
×
765
        goto free_of_post;
766
    }
767

111✔
768
    if (g_arguments->test_mode == INSERT_TEST) {
111✔
769
        debugPrint("Response: \n%s\n", responseBuf);
111✔
770
        char* start = strstr(responseBuf, "{");
111✔
771
        if ((start == NULL)
111✔
772
                && (TSDB_SML_TELNET_PROTOCOL != protocol)
46✔
773
                && (TSDB_SML_JSON_PROTOCOL != protocol)
1✔
774
                && (SML_JSON_TAOS_FORMAT != protocol)
775
                ) {
×
776
            errorPrint("Invalid response format: %s\n", responseBuf);
×
777
            goto free_of_post;
778
        }
111✔
779
        tools_cJSON* resObj = tools_cJSON_Parse(start);
111✔
780
        if ((resObj == NULL)
111✔
781
                && (TSDB_SML_TELNET_PROTOCOL != protocol)
46✔
782
                && (TSDB_SML_JSON_PROTOCOL != protocol)
1✔
783
                && (SML_JSON_TAOS_FORMAT != protocol)
784
                ) {
×
785
            errorPrint("Cannot parse response into json: %s\n", start);
786
        }
111✔
787
        tools_cJSON* codeObj = tools_cJSON_GetObjectItem(resObj, "code");
111✔
788
        if ((!tools_cJSON_IsNumber(codeObj))
111✔
789
                && (TSDB_SML_TELNET_PROTOCOL != protocol)
46✔
790
                && (TSDB_SML_JSON_PROTOCOL != protocol)
1✔
791
                && (SML_JSON_TAOS_FORMAT != protocol)
792
                ) {
×
793
            char* pstr = tools_cJSON_Print(resObj);
×
794
            errorPrint("Invalid or miss 'code' key in json: %s\n", pstr ? pstr : "null");
795
            tmfree(pstr);
×
796
            tools_cJSON_Delete(resObj);
×
797
            goto free_of_post;
798
        }
799

111✔
800
        if ((SML_REST_IFACE == iface) && codeObj
×
801
                && (200 == codeObj->valueint)) {
×
802
            code = 0;
×
803
            tools_cJSON_Delete(resObj);
×
804
            goto free_of_post;
805
        }
806

111✔
807
        if ((iface == SML_REST_IFACE)
111✔
808
                && (protocol == TSDB_SML_LINE_PROTOCOL)
×
809
                && codeObj
×
810
                && (codeObj->valueint != 0) && (codeObj->valueint != 200)) {
×
811
            tools_cJSON* desc = tools_cJSON_GetObjectItem(resObj, "desc");
×
812
            if (!tools_cJSON_IsString(desc)) {
×
813
                char* pstr = tools_cJSON_Print(resObj);
×
814
                errorPrint("Invalid or miss 'desc' key in json: %s\n", pstr ? pstr : "null");
815
                tmfree(pstr);
816
            } else {
×
817
                errorPrint("insert mode response, code: %d, reason: %s\n",
818
                       (int)codeObj->valueint, desc->valuestring);
819
            }
820
        } else {
111✔
821
            code = 0;
822
        }
111✔
823
        tools_cJSON_Delete(resObj);
824
    }
×
825
free_of_post:
316✔
826
    free(responseBuf);
316✔
827
    return code;
828
}
829

830
// fetch result fo file or nothing
879✔
831
int64_t fetchResult(TAOS_RES *res, char * filePath) {
879✔
832
    TAOS_ROW    row        = NULL;
879✔
833
    int         num_fields = 0;
879✔
834
    int64_t     totalLen   = 0;
879✔
835
    TAOS_FIELD *fields     = 0;
879✔
836
    int64_t     rows       = 0;
879✔
837
    char       *databuf    = NULL;
879✔
838
    bool        toFile     = strlen(filePath) > 0;
839
    
840

879✔
841
    if(toFile) {
313✔
842
        num_fields = taos_field_count(res);
313✔
843
        fields     = taos_fetch_fields(res);
313✔
844
        databuf    = (char *)benchCalloc(1, FETCH_BUFFER_SIZE, true);
845
    }
846

847
    // fetch the records row by row
2,028✔
848
    while ((row = taos_fetch_row(res))) {
1,149✔
849
        if (toFile) {
313✔
850
            if (totalLen >= (FETCH_BUFFER_SIZE - HEAD_BUFF_LEN * 2)) {
NEW
851
                // buff is full
×
852
                appendResultBufToFile(databuf, filePath);
×
853
                totalLen = 0;
×
854
                memset(databuf, 0, FETCH_BUFFER_SIZE);
855
            }
856

857
            // format row
313✔
858
            char temp[HEAD_BUFF_LEN] = {0};
313✔
859
            int  len = taos_print_row(temp, row, fields, num_fields);
313✔
860
            len += snprintf(temp + len, HEAD_BUFF_LEN - len, "\n");
861
            //debugPrint("query result:%s\n", temp);
313✔
862
            memcpy(databuf + totalLen, temp, len);
313✔
863
            totalLen += len;
864
        }
1,149✔
865
        rows ++;
866
        //if not toFile , only loop call taos_fetch_row
867
    }
868

869
    // end
879✔
870
    if (toFile) {
313✔
871
        appendResultBufToFile(databuf, filePath);
313✔
872
        free(databuf);
873
    }
879✔
874
    return rows;
875
}
876

2,135✔
877
char *convertDatatypeToString(int type) {
2,135✔
878
    switch (type) {
248✔
879
        case TSDB_DATA_TYPE_BINARY:
248✔
880
            return "binary";
42✔
881
        case TSDB_DATA_TYPE_NCHAR:
42✔
882
            return "nchar";
41✔
883
        case TSDB_DATA_TYPE_TIMESTAMP:
41✔
884
            return "timestamp";
319✔
885
        case TSDB_DATA_TYPE_TINYINT:
319✔
886
            return "tinyint";
43✔
887
        case TSDB_DATA_TYPE_UTINYINT:
43✔
888
            return "tinyint unsigned";
276✔
889
        case TSDB_DATA_TYPE_SMALLINT:
276✔
890
            return "smallint";
43✔
891
        case TSDB_DATA_TYPE_USMALLINT:
43✔
892
            return "smallint unsigned";
436✔
893
        case TSDB_DATA_TYPE_INT:
436✔
894
            return "int";
54✔
895
        case TSDB_DATA_TYPE_UINT:
54✔
896
            return "int unsigned";
276✔
897
        case TSDB_DATA_TYPE_BIGINT:
276✔
898
            return "bigint";
43✔
899
        case TSDB_DATA_TYPE_UBIGINT:
43✔
900
            return "bigint unsigned";
46✔
901
        case TSDB_DATA_TYPE_BOOL:
46✔
902
            return "bool";
155✔
903
        case TSDB_DATA_TYPE_FLOAT:
155✔
904
            return "float";
113✔
905
        case TSDB_DATA_TYPE_DOUBLE:
113✔
906
            return "double";
×
907
        case TSDB_DATA_TYPE_JSON:
×
908
            return "json";    
×
909
        case TSDB_DATA_TYPE_VARBINARY:
×
910
            return "varbinary";
×
911
        case TSDB_DATA_TYPE_GEOMETRY:
×
912
            return "geometry";
×
913
        default:
×
914
            break;
915
    }
×
916
    return "unknown type";
917
}
918

1,363✔
919
int convertTypeToLength(uint8_t type) {
1,363✔
920
    int ret = 0;
1,363✔
921
    switch (type) {
239✔
922
        case TSDB_DATA_TYPE_TIMESTAMP:
923
        case TSDB_DATA_TYPE_UBIGINT:
924
        case TSDB_DATA_TYPE_BIGINT:
239✔
925
            ret = sizeof(int64_t);
239✔
926
            break;
351✔
927
        case TSDB_DATA_TYPE_BOOL:
928
        case TSDB_DATA_TYPE_TINYINT:
929
        case TSDB_DATA_TYPE_UTINYINT:
351✔
930
            ret = sizeof(int8_t);
351✔
931
            break;
194✔
932
        case TSDB_DATA_TYPE_SMALLINT:
933
        case TSDB_DATA_TYPE_USMALLINT:
194✔
934
            ret = sizeof(int16_t);
194✔
935
            break;
280✔
936
        case TSDB_DATA_TYPE_INT:
937
        case TSDB_DATA_TYPE_UINT:
280✔
938
            ret = sizeof(int32_t);
280✔
939
            break;
144✔
940
        case TSDB_DATA_TYPE_FLOAT:
144✔
941
            ret = sizeof(float);
144✔
942
            break;
126✔
943
        case TSDB_DATA_TYPE_DOUBLE:
126✔
944
            ret = sizeof(double);
126✔
945
            break;
1✔
946
        case TSDB_DATA_TYPE_JSON:
1✔
947
            ret = JSON_FIXED_LENGTH;
1✔
948
            break;
28✔
949
        default:
28✔
950
            break;
951
    }
1,363✔
952
    return ret;
953
}
954

1,500✔
955
int64_t convertDatatypeToDefaultMin(uint8_t type) {
1,500✔
956
    int64_t ret = 0;
1,500✔
957
    switch (type) {
110✔
958
        case TSDB_DATA_TYPE_BOOL:
959
        case TSDB_DATA_TYPE_GEOMETRY:
110✔
960
            ret = 0;
110✔
961
            break;
132✔
962
        case TSDB_DATA_TYPE_TINYINT:
132✔
963
            ret = -127;
132✔
964
            break;
102✔
965
        case TSDB_DATA_TYPE_SMALLINT:
102✔
966
            ret = -32767;
102✔
967
            break;
518✔
968
        case TSDB_DATA_TYPE_INT:
969
        case TSDB_DATA_TYPE_BIGINT:
970
        case TSDB_DATA_TYPE_FLOAT:
971
        case TSDB_DATA_TYPE_DOUBLE:
518✔
972
            ret = -1 * (RAND_MAX >> 1);
518✔
973
            break;
638✔
974
        default:
638✔
975
            break;
976
    }
1,500✔
977
    return ret;
978
}
979

1,507✔
980
int64_t convertDatatypeToDefaultMax(uint8_t type) {
1,507✔
981
    int64_t ret = 0;
1,507✔
982
    switch (type) {
110✔
983
        case TSDB_DATA_TYPE_BOOL:
110✔
984
            ret = 1;
110✔
985
            break;
132✔
986
        case TSDB_DATA_TYPE_TINYINT:
132✔
987
            ret = 128;
132✔
988
            break;
71✔
989
        case TSDB_DATA_TYPE_UTINYINT:
71✔
990
            ret = 254;
71✔
991
            break;
102✔
992
        case TSDB_DATA_TYPE_SMALLINT:
993
        case TSDB_DATA_TYPE_GEOMETRY:
102✔
994
            ret = 32767;
102✔
995
            break;
70✔
996
        case TSDB_DATA_TYPE_USMALLINT:
70✔
997
            ret = 65534;
70✔
998
            break;
525✔
999
        case TSDB_DATA_TYPE_INT:
1000
        case TSDB_DATA_TYPE_BIGINT:
1001
        case TSDB_DATA_TYPE_FLOAT:
1002
        case TSDB_DATA_TYPE_DOUBLE:
525✔
1003
            ret = RAND_MAX >> 1;
525✔
1004
            break;
188✔
1005
        case TSDB_DATA_TYPE_UINT:
1006
        case TSDB_DATA_TYPE_UBIGINT:
1007
        case TSDB_DATA_TYPE_TIMESTAMP:
188✔
1008
            ret = RAND_MAX;
188✔
1009
            break;
309✔
1010
        default:
309✔
1011
            break;
1012
    }
1,507✔
1013
    return ret;
1014
}
1015

1016
// compare str with length
12,673✔
1017
int32_t strCompareN(char *str1, char *str2, int length) {
12,673✔
1018
    if (length == 0) {
12,595✔
1019
        return strcasecmp(str1, str2);
1020
    } else {
78✔
1021
        return strncasecmp(str1, str2, length);
1022
    }
1023
}
1024

1,641✔
1025
int convertStringToDatatype(char *type, int length) {
1026
    // compare with length
1,641✔
1027
    if (0 == strCompareN(type, "binary", length)) {
160✔
1028
        return TSDB_DATA_TYPE_BINARY;
1,481✔
1029
    } else if (0 == strCompareN(type, "nchar", length)) {
110✔
1030
        return TSDB_DATA_TYPE_NCHAR;
1,371✔
1031
    } else if (0 == strCompareN(type, "timestamp", length)) {
45✔
1032
        return TSDB_DATA_TYPE_TIMESTAMP;
1,326✔
1033
    } else if (0 == strCompareN(type, "bool", length)) {
116✔
1034
        return TSDB_DATA_TYPE_BOOL;
1,210✔
1035
    } else if (0 == strCompareN(type, "tinyint", length)) {
154✔
1036
        return TSDB_DATA_TYPE_TINYINT;
1,056✔
1037
    } else if (0 == strCompareN(type, "utinyint", length)) {
81✔
1038
        return TSDB_DATA_TYPE_UTINYINT;
975✔
1039
    } else if (0 == strCompareN(type, "smallint", length)) {
113✔
1040
        return TSDB_DATA_TYPE_SMALLINT;
862✔
1041
    } else if (0 == strCompareN(type, "usmallint", length)) {
81✔
1042
        return TSDB_DATA_TYPE_USMALLINT;
781✔
1043
    } else if (0 == strCompareN(type, "int", length)) {
202✔
1044
        return TSDB_DATA_TYPE_INT;
579✔
1045
    } else if (0 == strCompareN(type, "uint", length)) {
84✔
1046
        return TSDB_DATA_TYPE_UINT;
495✔
1047
    } else if (0 == strCompareN(type, "bigint", length)) {
113✔
1048
        return TSDB_DATA_TYPE_BIGINT;
382✔
1049
    } else if (0 == strCompareN(type, "ubigint", length)) {
81✔
1050
        return TSDB_DATA_TYPE_UBIGINT;
301✔
1051
    } else if (0 == strCompareN(type, "float", length)) {
146✔
1052
        return TSDB_DATA_TYPE_FLOAT;
155✔
1053
    } else if (0 == strCompareN(type, "double", length)) {
126✔
1054
        return TSDB_DATA_TYPE_DOUBLE;
29✔
1055
    } else if (0 == strCompareN(type, "json", length)) {
2✔
1056
        return TSDB_DATA_TYPE_JSON;
27✔
1057
    } else if (0 == strCompareN(type, "varchar", length)) {
26✔
1058
        return TSDB_DATA_TYPE_BINARY;
1✔
1059
    } else if (0 == strCompareN(type, "varbinary", length)) {
×
1060
        return TSDB_DATA_TYPE_VARBINARY;
1✔
1061
    } else if (0 == strCompareN(type, "geometry", length)) {
×
1062
        return TSDB_DATA_TYPE_GEOMETRY;
1063
    } else {
1✔
1064
        errorPrint("unknown data type: %s\n", type);
1✔
1065
        exit(EXIT_FAILURE);
1066
    }
1067
}
1068

1069

550,794✔
1070
int compare(const void *a, const void *b) {
550,794✔
1071
    return *(int64_t *)a - *(int64_t *)b;
1072
}
1073

1074
//
1075
// --------------------  BArray operator -------------------
1076
//
1077

2,287✔
1078
BArray* benchArrayInit(size_t size, size_t elemSize) {
2,287✔
1079
    assert(elemSize > 0);
1080

2,287✔
1081
    if (size < BARRAY_MIN_SIZE) {
2,257✔
1082
        size = BARRAY_MIN_SIZE;
1083
    }
1084

2,287✔
1085
    BArray* pArray = (BArray *)benchCalloc(1, sizeof(BArray), true);
1086

2,287✔
1087
    pArray->size = 0;
2,287✔
1088
    pArray->pData = benchCalloc(size, elemSize, true);
1089

2,287✔
1090
    pArray->capacity = size;
2,287✔
1091
    pArray->elemSize = elemSize;
2,287✔
1092
    return pArray;
1093
}
1094

52,771✔
1095
static int32_t benchArrayEnsureCap(BArray* pArray, size_t newCap) {
52,771✔
1096
    if (newCap > pArray->capacity) {
675✔
1097
        size_t tsize = (pArray->capacity << 1u);
739✔
1098
        while (newCap > tsize) {
64✔
1099
            tsize = (tsize << 1u);
1100
        }
1101

675✔
1102
        void* pData = realloc(pArray->pData, tsize * pArray->elemSize);
675✔
1103
        if (pData == NULL) {
×
1104
            return -1;
1105
        }
675✔
1106
        pArray->pData = pData;
675✔
1107
        pArray->capacity = tsize;
1108
    }
52,771✔
1109
    return 0;
1110
}
1111

52,718✔
1112
void* benchArrayAddBatch(BArray* pArray, void* pData, int32_t elems, bool free) {
52,718✔
1113
    if (pData == NULL || elems <=0) {
×
1114
        return NULL;
1115
    }
1116

52,732✔
1117
    if (benchArrayEnsureCap(pArray, pArray->size + elems) != 0) {
×
1118
        return NULL;
1119
    }
1120

52,784✔
1121
    void* dst = BARRAY_GET_ELEM(pArray, pArray->size);
52,784✔
1122
    memcpy(dst, pData, pArray->elemSize * elems);
52,784✔
1123
    if (free) {
1124
        tmfree(pData); // TODO remove this
1125
    }
52,784✔
1126
    pArray->size += elems;
52,784✔
1127
    return dst;
1128
}
1129

51,486✔
1130
FORCE_INLINE void* benchArrayPush(BArray* pArray, void* pData) {
51,486✔
1131
    return benchArrayAddBatch(pArray, pData, 1, true);
1132
}
1133

658✔
1134
FORCE_INLINE void* benchArrayPushNoFree(BArray* pArray, void* pData) {
658✔
1135
    return benchArrayAddBatch(pArray, pData, 1, false);
1136
}
1137

1138

2,224✔
1139
void* benchArrayDestroy(BArray* pArray) {
2,224✔
1140
    if (pArray) {
1,653✔
1141
        tmfree(pArray->pData);
1142
        tmfree(pArray);
1143
    }
2,224✔
1144
    return NULL;
1145
}
1146

313✔
1147
void benchArrayClear(BArray* pArray) {
313✔
1148
    if (pArray == NULL) return;
313✔
1149
    pArray->size = 0;
1150
}
1151

32,540,727✔
1152
void* benchArrayGet(const BArray* pArray, size_t index) {
32,540,727✔
1153
    if (index >= pArray->size) {
×
1154
        errorPrint("benchArrayGet index(%zu) greater than BArray size(%zu)\n",
1155
                   index, pArray->size);
×
1156
        exit(EXIT_FAILURE);
1157
    }
32,540,727✔
1158
    return BARRAY_GET_ELEM(pArray, index);
1159
}
1160

7✔
1161
bool searchBArray(BArray *pArray, const char *field_name, int32_t name_len, uint8_t field_type) {
7✔
1162
    if (pArray == NULL || field_name == NULL) {
×
1163
        return false;
1164
    }
11✔
1165
    for (int i = 0; i < pArray->size; i++) {
11✔
1166
        Field *field = benchArrayGet(pArray, i);
11✔
1167
        if (strlen(field->name) == name_len && strncasecmp(field->name, field_name, name_len) == 0) {
7✔
1168
            if (field->type == field_type) {
7✔
1169
                return true;
1170
            }
×
1171
            return false;
1172
        }
1173
    }
×
1174
    return false;
1175
}
1176

1177
//
1178
// malloc a new and copy data from array
1179
// return value must call benchArrayDestroy to free
1180
//
75✔
1181
BArray * copyBArray(BArray *pArray) {
75✔
1182
    BArray * pNew = benchArrayInit(pArray->size, pArray->elemSize);
75✔
1183
    benchArrayAddBatch(pNew, pArray->pData, pArray->size, false);
75✔
1184
    return pNew;
1185
}
1186

1187
//
1188
//  ---------------- others ------------------------
1189
//
1190

1191
#ifdef LINUX
180✔
1192
int32_t bsem_wait(sem_t* sem) {
180✔
1193
    int ret = 0;
1194
    do {
180✔
1195
        ret = sem_wait(sem);
1✔
1196
    } while (ret != 0 && errno  == EINTR);
1✔
1197
    return ret;
1198
}
1199

180✔
1200
void benchSetSignal(int32_t signum, ToolsSignalHandler sigfp) {
1201
    struct sigaction act;
180✔
1202
    memset(&act, 0, sizeof(act));
180✔
1203
    act.sa_flags = SA_SIGINFO | SA_RESTART;
180✔
1204
    act.sa_sigaction = (void (*)(int, siginfo_t *, void *)) sigfp;
180✔
1205
    sigaction(signum, &act, NULL);
180✔
1206
}
1207
#endif
1208

254✔
1209
int convertServAddr(int iface, bool tcp, int protocol) {
254✔
1210
    if (tcp
3✔
1211
            && iface == SML_REST_IFACE
3✔
1212
            && protocol == TSDB_SML_TELNET_PROTOCOL) {
1213
        // telnet_tcp_port        
3✔
1214
        if (convertHostToServAddr(g_arguments->host,
3✔
1215
                    g_arguments->telnet_tcp_port,
3✔
1216
                    &(g_arguments->serv_addr))) {
×
1217
            errorPrint("%s\n", "convert host to server address");
×
1218
            return -1;
1219
        }
3✔
1220
        infoPrint("convertServAddr host=%s telnet_tcp_port:%d to serv_addr=%p iface=%d \n", 
1221
                g_arguments->host, g_arguments->telnet_tcp_port, &g_arguments->serv_addr, iface);
1222
    } else {
251✔
1223
        int port = g_arguments->port_inputted ? g_arguments->port:DEFAULT_REST_PORT;
251✔
1224
        if (convertHostToServAddr(g_arguments->host,
1225
                                    port,
251✔
1226
                    &(g_arguments->serv_addr))) {
×
1227
            errorPrint("%s\n", "convert host to server address");
×
1228
            return -1;
1229
        }
251✔
1230
        infoPrint("convertServAddr host=%s port:%d to serv_addr=%p iface=%d \n", 
1231
                g_arguments->host, port, &g_arguments->serv_addr, iface);
1232
    }
254✔
1233
    return 0;
1234
}
1235

×
1236
static void errorPrintSocketMsg(char *msg, int result) {
1237
#ifdef WINDOWS
1238
    errorPrint("%s: %d\n", msg, WSAGetLastError());
1239
#else
×
1240
    errorPrint("%s: %d\n", msg, result);
1241
#endif
×
1242
}
1243

96✔
1244
int createSockFd() {
1245
#ifdef WINDOWS
1246
    WSADATA wsaData;
1247
    WSAStartup(MAKEWORD(2, 2), &wsaData);
1248
    SOCKET sockfd;
1249
#else
1250
    int sockfd;
1251
#endif
96✔
1252
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
96✔
1253
    if (sockfd < 0) {
×
1254
        errorPrintSocketMsg("Could not create socket : ", sockfd);
×
1255
        return -1;
1256
    }
1257

96✔
1258
    int retConn = connect(
96✔
1259
            sockfd, (struct sockaddr *)&(g_arguments->serv_addr),
1260
            sizeof(struct sockaddr));
96✔
1261
    infoPrint("createSockFd call connect serv_addr=%p retConn=%d\n", &g_arguments->serv_addr, retConn);
96✔
1262
    if (retConn < 0) {
×
1263
        errorPrint("%s\n", "failed to connect");
1264
#ifdef WINDOWS
1265
        closesocket(sockfd);
1266
        WSACleanup();
1267
#else
×
1268
        close(sockfd);
1269
#endif
×
1270
        return -1;
1271
    }
96✔
1272
    return sockfd;
1273
}
1274

42✔
1275
static void closeSockFd(int sockfd) {
1276
#ifdef WINDOWS
1277
    closesocket(sockfd);
1278
    WSACleanup();
1279
#else
42✔
1280
    close(sockfd);
1281
#endif
42✔
1282
}
1283

42✔
1284
void destroySockFd(int sockfd) {
1285
    // check valid
42✔
1286
    if (sockfd < 0) {
×
1287
        return;
1288
    }
1289

1290
    // shutdown the connection since no more data will be sent
1291
    int result;
42✔
1292
    result = shutdown(sockfd, SHUT_WR);
42✔
1293
    if (SOCKET_ERROR == result) {
×
1294
        errorPrintSocketMsg("Socket shutdown failed with error: ", result);
×
1295
        closeSockFd(sockfd);
×
1296
        return;
1297
    }
1298
    // Receive until the peer closes the connection
1299
    do {
78✔
1300
        int recvbuflen = LARGE_BUFF_LEN;
1301
        char recvbuf[LARGE_BUFF_LEN];
78✔
1302
        result = recv(sockfd, recvbuf, recvbuflen, 0);
78✔
1303
        if ( result > 0 ) {
36✔
1304
            debugPrint("Socket bytes received: %d\n", result);
42✔
1305
        } else if (result == 0) {
42✔
1306
            infoPrint("Connection closed with result %d\n", result);
1307
        } else {
×
1308
            errorPrintSocketMsg("Socket recv failed with error: ", result);
1309
        }
78✔
1310
    } while (result > 0);
1311

42✔
1312
    closeSockFd(sockfd);
1313
}
1314

3✔
1315
FORCE_INLINE void printErrCmdCodeStr(char *cmd, int32_t code, TAOS_RES *res) {    
1316
    char buff[512];
16✔
1317
    char *msg = cmd;
16✔
1318
    if (strlen(cmd) >= sizeof(buff)) {
8✔
1319
        memcpy(buff, cmd, 500);
8✔
1320
        buff[500] = 0;
8✔
1321
        strcat(buff, "...");
8✔
1322
        msg = buff;
1323
    }
16✔
1324
    errorPrint("failed to run error code: 0x%08x, reason: %s command %s\n",
1325
               code, taos_errstr(res), msg);
16✔
1326
    taos_free_result(res);
13✔
1327
}
1328

×
1329
int32_t benchGetTotalMemory(int64_t *totalKB) {
1330
#ifdef WINDOWS
1331
  MEMORYSTATUSEX memsStat;
1332
  memsStat.dwLength = sizeof(memsStat);
1333
  if (!GlobalMemoryStatusEx(&memsStat)) {
1334
    return -1;
1335
  }
1336

1337
  *totalKB = memsStat.ullTotalPhys / 1024;
1338
  return 0;
1339
#elif defined(_TD_DARWIN_64)
1340
  *totalKB = 0;
1341
  return 0;
1342
#else
×
1343
  int64_t tsPageSizeKB = sysconf(_SC_PAGESIZE) / 1024;
×
1344
  *totalKB = (int64_t)(sysconf(_SC_PHYS_PAGES) * tsPageSizeKB);
×
1345
  return 0;
1346
#endif
1347
}
1348

1349
// geneate question mark string , using insert into ... values(?,?,?...)
1350
// return value must call tmfree to free memory
209✔
1351
char* genQMark( int32_t QCnt) {
209✔
1352
    char * buf = benchCalloc(4, QCnt, false);
1,815✔
1353
    for (int32_t i = 0; i < QCnt; i++) {
1,606✔
1354
        if (i == 0)
200✔
1355
            strcat(buf, "?");
1356
        else
1,406✔
1357
            strcat(buf, ",?");
1358
    }
209✔
1359
    return buf;
1360
}
1361

1362
//
1363
//  STMT2  
1364
//
1365

1366
// create
133✔
1367
TAOS_STMT2_BINDV* createBindV(int32_t capacity, int32_t tagCnt, int32_t colCnt) {
1368
    // calc total size
133✔
1369
    int32_t tableSize = sizeof(char *) + sizeof(TAOS_STMT2_BIND *) + sizeof(TAOS_STMT2_BIND *) + 
133✔
1370
                        sizeof(TAOS_STMT2_BIND) * tagCnt + sizeof(TAOS_STMT2_BIND) * colCnt;
133✔
1371
    int32_t size = sizeof(TAOS_STMT2_BINDV) + tableSize * capacity;
133✔
1372
    TAOS_STMT2_BINDV *bindv = benchCalloc(1, size, false);
133✔
1373
    resetBindV(bindv, capacity, tagCnt, colCnt);
1374

133✔
1375
    return bindv;
1376
}
1377

1378
// reset tags and cols poitner
133✔
1379
void resetBindV(TAOS_STMT2_BINDV *bindv, int32_t capacity, int32_t tagCnt, int32_t colCnt) {
133✔
1380
    unsigned char *p = (unsigned char *)bindv;
1381
    // tbnames
133✔
1382
    p += sizeof(TAOS_STMT2_BINDV); // skip BINDV
133✔
1383
    bindv->tbnames = (char **)p;
1384
    // tags
133✔
1385
    if(tagCnt == 0 ) {
133✔
1386
        bindv->tags = NULL;
1387
    } else {
×
1388
        p += sizeof(char *) * capacity; // skip tbnames
×
1389
        bindv->tags = (TAOS_STMT2_BIND **)p;
1390
    }
1391
    // bind_cols
133✔
1392
    p += sizeof(TAOS_STMT2_BIND *) * capacity; // skip tags
133✔
1393
    bindv->bind_cols = (TAOS_STMT2_BIND **)p;
133✔
1394
    p += sizeof(TAOS_STMT2_BIND *) * capacity; // skip cols
1395

1396
    int32_t i;
1397
    // tags body
133✔
1398
    if (tagCnt > 0) {
×
1399
        for (i = 0; i < capacity; i++) {
×
1400
            bindv->tags[i] = (TAOS_STMT2_BIND *)p;
×
1401
            p += sizeof(TAOS_STMT2_BIND) * tagCnt; // skip tag bodys
1402
        }
1403
    }
1404
    // bind_cols body
266✔
1405
    for (i = 0; i < capacity; i++) {
133✔
1406
        bindv->bind_cols[i] = (TAOS_STMT2_BIND*)p;
133✔
1407
        p += sizeof(TAOS_STMT2_BIND) * colCnt; // skip cols bodys
1408
    }
133✔
1409
}
1410

1411
// clear bindv
×
1412
void clearBindV(TAOS_STMT2_BINDV *bindv) {
×
1413
    if (bindv == NULL)
×
1414
        return ;
×
1415
    for(int32_t i = 0; i < bindv->count; i++) {
×
1416
        bindv->tags[i]      = NULL;
×
1417
        bindv->bind_cols[i] = NULL;
1418
    }
×
1419
    bindv->count = 0;
1420
}
1421

1422
// free
201✔
1423
void freeBindV(TAOS_STMT2_BINDV *bindv) {
1424
    tmfree(bindv);
201✔
1425
}
1426

1427
//
1428
//   debug show 
1429
//
1430

×
1431
void showBind(TAOS_STMT2_BIND* bind) {
1432
    // loop each column
×
1433
    int32_t pos = 0;
×
1434
    char* buff  = bind->buffer;
×
1435
    for(int32_t n=0; n<bind->num; n++) {
×
1436
        switch (bind->buffer_type) {
×
1437
        case TSDB_DATA_TYPE_TIMESTAMP:
×
1438
            debugPrint("   n=%d value=%" PRId64 "\n", n, *(int64_t *)(buff + pos));
×
1439
            pos += sizeof(int64_t);
×
1440
            break;
×
1441
        case TSDB_DATA_TYPE_FLOAT:
×
1442
            debugPrint("   n=%d value=%f\n", n, *(float *)(buff + pos));
×
1443
            pos += sizeof(float);
×
1444
            break;
×
1445
        case TSDB_DATA_TYPE_INT:
×
1446
            debugPrint("   n=%d value=%d\n", n, *(int32_t *)(buff + pos));
×
1447
            pos += sizeof(int32_t);
×
1448
            break;
×
1449
        default:
×
1450
            break;
1451
        } 
1452
    }
1453

×
1454
}
1455

×
1456
void showTableBinds(char* label, TAOS_STMT2_BIND* binds, int32_t cnt) {
×
1457
    for (int32_t j = 0; j < cnt; j++) {
×
1458
        if(binds == NULL) {
×
1459
            debugPrint("  %d %s is NULL \n", j, label);
1460
        } else {
×
1461
            debugPrint("  %d %s type=%d buffer=%p \n", j, label, binds[j].buffer_type, binds[j].buffer);
×
1462
            showBind(&binds[j]);
1463
        }
1464
    }
×
1465
}
1466

1467
// show bindv
×
1468
void showBindV(TAOS_STMT2_BINDV *bindv, BArray *tags, BArray *cols) {
1469
    // num and base info
×
1470
    debugPrint("show bindv table count=%d names=%p tags=%p bind_cols=%p\n", 
1471
                bindv->count, bindv->tbnames, bindv->tags, bindv->bind_cols);
1472
    
×
1473
    for(int32_t i=0; i< bindv->count; i++) {
×
1474
        debugPrint(" show bindv table index=%d name=%s \n", i, bindv->tbnames[i]);
×
1475
        if(bindv->tags)
×
1476
            showTableBinds("tag",    bindv->tags[i],      tags->size);
×
1477
        if(bindv->bind_cols)    
×
1478
            showTableBinds("column", bindv->bind_cols[i], cols->size + 1);
1479
    }
×
1480
}
1481

1482
// engine util/src/thashutil.c
1483
uint32_t MurmurHash3_32(const char *key, uint32_t len);
1484
// get group index about dbname.tbname
294✔
1485
int32_t calcGroupIndex(char* dbName, char* tbName, int32_t groupCnt) {
1486
    // check valid
294✔
1487
    if (dbName == NULL || tbName == NULL) {
×
1488
        return -1;
1489
    }
1490
    char key[1024];
294✔
1491
    snprintf(key, sizeof(key), "1.%s.%s", dbName, tbName);
294✔
1492
    uint32_t hash = MurmurHash3_32(key, strlen(key));
294✔
1493
    uint32_t step = UINT32_MAX / groupCnt;
686✔
1494
    for (int32_t i = 0; i < groupCnt; i++) {
686✔
1495
        if (hash < (i + 1) * step)
1496
        {
294✔
1497
            return i;
1498
        }
1499
    }
×
1500
    return groupCnt - 1;
1501
}
1502

1503
// windows no export MurmurHash3_32 function from engine
1504
#ifdef WINDOWS
1505
// define
1506
#define ROTL32(x, r) ((x) << (r) | (x) >> (32u - (r)))
1507
#define FMIX32(h)      \
1508
  do {                 \
1509
    (h) ^= (h) >> 16;  \
1510
    (h) *= 0x85ebca6b; \
1511
    (h) ^= (h) >> 13;  \
1512
    (h) *= 0xc2b2ae35; \
1513
    (h) ^= (h) >> 16;  \
1514
  } while (0)
1515

1516
// impl MurmurHash3_32
1517
uint32_t MurmurHash3_32(const char *key, uint32_t len) {
1518
  const uint8_t *data = (const uint8_t *)key;
1519
  const int32_t  nblocks = len >> 2u;
1520

1521
  uint32_t h1 = 0x12345678;
1522

1523
  const uint32_t c1 = 0xcc9e2d51;
1524
  const uint32_t c2 = 0x1b873593;
1525

1526
  const uint32_t *blocks = (const uint32_t *)(data + nblocks * 4);
1527

1528
  for (int32_t i = -nblocks; i; i++) {
1529
    uint32_t k1 = blocks[i];
1530

1531
    k1 *= c1;
1532
    k1 = ROTL32(k1, 15u);
1533
    k1 *= c2;
1534

1535
    h1 ^= k1;
1536
    h1 = ROTL32(h1, 13u);
1537
    h1 = h1 * 5 + 0xe6546b64;
1538
  }
1539

1540
  const uint8_t *tail = (data + nblocks * 4);
1541

1542
  uint32_t k1 = 0;
1543

1544
  switch (len & 3u) {
1545
    case 3:
1546
      k1 ^= tail[2] << 16;
1547
    case 2:
1548
      k1 ^= tail[1] << 8;
1549
    case 1:
1550
      k1 ^= tail[0];
1551
      k1 *= c1;
1552
      k1 = ROTL32(k1, 15u);
1553
      k1 *= c2;
1554
      h1 ^= k1;
1555
  };
1556

1557
  h1 ^= len;
1558

1559
  FMIX32(h1);
1560

1561
  return h1;
1562
}
1563
#endif
1564

1565

1566
//
1567
// ---------------- benchQuery util ----------------------
1568
//
1569

1570
// init conn
35✔
1571
int32_t initQueryConn(qThreadInfo * pThreadInfo, int iface) {
1572
    // create conn
35✔
NEW
1573
    if (iface == REST_IFACE) {
×
NEW
1574
        int sockfd = createSockFd();
×
NEW
1575
        if (sockfd < 0) {
×
1576
            return -1;
NEW
1577
        }
×
1578
        pThreadInfo->sockfd = sockfd;
1579
    } else {
35✔
1580
        pThreadInfo->conn = initBenchConn();
35✔
NEW
1581
        if (pThreadInfo->conn == NULL) {
×
1582
            return -1;
1583
        }
1584
    }
1585

35✔
1586
    return 0;
1587
}
1588

1589
// close conn
35✔
1590
void closeQueryConn(qThreadInfo * pThreadInfo, int iface) {
35✔
1591
    if (iface == REST_IFACE) {
1592
#ifdef WINDOWS
1593
        closesocket(pThreadInfo->sockfd);
1594
        WSACleanup();
NEW
1595
#else
×
1596
        close(pThreadInfo->sockfd);
1597
#endif
1598
    } else {
35✔
1599
        closeBenchConn(pThreadInfo->conn);
35✔
1600
        pThreadInfo->conn = NULL;
1601
    }
35✔
1602
}
1603

1604

1605
// free g_queryInfo.specailQueryInfo memory , can re-call
9✔
1606
void freeSpecialQueryInfo() {
1607
    // can re-call
9✔
NEW
1608
    if (g_queryInfo.specifiedQueryInfo.sqls == NULL) {
×
1609
        return;
1610
    }
1611

1612
    // loop free each item memory
27✔
1613
    for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
18✔
1614
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
18✔
1615
        tmfree(sql->command);
18✔
1616
        tmfree(sql->delay_list);
1617
    }
1618

1619
    // free Array
9✔
1620
    benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);
9✔
1621
    g_queryInfo.specifiedQueryInfo.sqls = NULL;
1622
}
1623

1624

1625
#define KILLID_LEN  64
NEW
1626

×
NEW
1627
void *queryKiller(void *arg) {
×
NEW
1628
    char host[MAX_HOSTNAME_LEN] = {0};
×
1629
    tstrncpy(host, g_arguments->host, MAX_HOSTNAME_LEN);
NEW
1630

×
NEW
1631
    while (true) {
×
NEW
1632
        TAOS *taos = taos_connect(g_arguments->host, g_arguments->user,
×
NEW
1633
                g_arguments->password, NULL, g_arguments->port);
×
NEW
1634
        if (NULL == taos) {
×
1635
            errorPrint("Slow query killer thread "
1636
                    "failed to connect to the server %s\n",
NEW
1637
                    g_arguments->host);
×
1638
            return NULL;
1639
        }
NEW
1640

×
1641
        char command[TSDB_MAX_ALLOWED_SQL_LEN] =
NEW
1642
            "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries";
×
NEW
1643
        TAOS_RES *res = taos_query(taos, command);
×
NEW
1644
        int32_t code = taos_errno(res);
×
1645
        if (code) {
1646
            printErrCmdCodeStr(command, code, res);
1647
        }
NEW
1648

×
NEW
1649
        TAOS_ROW row = NULL;
×
NEW
1650
        while ((row = taos_fetch_row(res)) != NULL) {
×
NEW
1651
            int32_t *lengths = taos_fetch_lengths(res);
×
NEW
1652
            if (lengths[0] <= 0) {
×
1653
                infoPrint("No valid query found by %s\n", command);
NEW
1654
            } else {
×
1655
                int64_t execUSec = *(int64_t*)row[1];
NEW
1656

×
NEW
1657
                if (execUSec > g_queryInfo.killQueryThreshold * 1000000) {
×
NEW
1658
                    char sql[SHORT_1K_SQL_BUFF_LEN] = {0};
×
1659
                    tstrncpy(sql, (char*)row[2],
1660
                             min(strlen((char*)row[2])+1,
1661
                                 SHORT_1K_SQL_BUFF_LEN));
NEW
1662

×
NEW
1663
                    char killId[KILLID_LEN] = {0};
×
1664
                    tstrncpy(killId, (char*)row[0],
NEW
1665
                            min(strlen((char*)row[0])+1, KILLID_LEN));
×
NEW
1666
                    char killCommand[KILLID_LEN + 32] = {0};
×
NEW
1667
                    snprintf(killCommand, sizeof(killCommand), "KILL QUERY '%s'", killId);
×
NEW
1668
                    TAOS_RES *resKill = taos_query(taos, killCommand);
×
NEW
1669
                    int32_t codeKill = taos_errno(resKill);
×
1670
                    if (codeKill) {
1671
                        printErrCmdCodeStr(killCommand, codeKill, resKill);
NEW
1672
                    } else {
×
1673
                        infoPrint("%s succeed, sql: %s killed!\n",
NEW
1674
                                  killCommand, sql);
×
1675
                        taos_free_result(resKill);
1676
                    }
1677
                }
1678
            }
1679
        }
NEW
1680

×
NEW
1681
        taos_free_result(res);
×
NEW
1682
        taos_close(taos);
×
1683
        toolsMsleep(g_queryInfo.killQueryInterval*1000);
1684
    }
1685

1686
    return NULL;
1687
}
1688

NEW
1689
// kill show
×
NEW
1690
int killSlowQuery() {
×
NEW
1691
    pthread_t pidKiller = {0};
×
NEW
1692
    int32_t ret = pthread_create(&pidKiller, NULL, queryKiller, NULL);
×
NEW
1693
    if (ret != 0) {
×
NEW
1694
        errorPrint("pthread_create failed create queryKiller thread. error code =%d \n", ret);
×
1695
        return -1;
NEW
1696
    }
×
NEW
1697
    pthread_join(pidKiller, NULL);
×
NEW
1698
    toolsMsleep(1000);
×
1699
    return 0;
1700
}
1701

1702
// fetch super table child name from server
3✔
1703
int fetchChildTableName(char *dbName, char *stbName) {
3✔
1704
    SBenchConn* conn = initBenchConn();
3✔
NEW
1705
    if (conn == NULL) {
×
1706
        return -1;
1707
    }
1708

1709
    // get child count
3✔
1710
    char  cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
3✔
1711
    if (3 == g_majorVersionOfClient) {
3✔
1712
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
1713
                "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM `%s`.`%s`)",
1714
                dbName, stbName);
NEW
1715
    } else {
×
1716
        snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
1717
                    "SELECT COUNT(TBNAME) FROM `%s`.`%s`",
1718
                dbName, stbName);
1719
    }
3✔
1720
    TAOS_RES *res = taos_query(conn->taos, cmd);
3✔
1721
    int32_t   code = taos_errno(res);
3✔
1722
    if (code) {
NEW
1723
        printErrCmdCodeStr(cmd, code, res);
×
NEW
1724
        closeBenchConn(conn);
×
1725
        return -1;
1726
    }
1727

3✔
1728
    TAOS_ROW    row = NULL;
3✔
1729
    int         num_fields = taos_num_fields(res);
3✔
1730
    TAOS_FIELD *fields = taos_fetch_fields(res);
6✔
1731
    while ((row = taos_fetch_row(res)) != NULL) {
3✔
NEW
1732
        if (0 == strlen((char *)(row[0]))) {
×
NEW
1733
            errorPrint("stable %s have no child table\n", stbName);
×
NEW
1734
            taos_free_result(res);
×
NEW
1735
            closeBenchConn(conn);
×
1736
            return -1;
1737
        }
3✔
1738
        char temp[256] = {0};
3✔
1739
        taos_print_row(temp, row, fields, num_fields);
1740

1741
        // set child table count
3✔
1742
        g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(temp);
1743
    }
3✔
1744
    infoPrint("%s's childTblCount: %" PRId64 "\n", stbName, g_queryInfo.superQueryInfo.childTblCount);
3✔
1745
    taos_free_result(res);
1746

1747
    // malloc memory with child table count
3✔
1748
    g_queryInfo.superQueryInfo.childTblName =
3✔
1749
        benchCalloc(g_queryInfo.superQueryInfo.childTblCount,
1750
                sizeof(char *), false);
1751
    // fetch child table name
3✔
1752
    if (getAllChildNameOfSuperTable(
1753
                conn->taos, dbName, stbName,
1754
                g_queryInfo.superQueryInfo.childTblName,
1755
                g_queryInfo.superQueryInfo.childTblCount)) {
NEW
1756
        // faild            
×
NEW
1757
        tmfree(g_queryInfo.superQueryInfo.childTblName);
×
NEW
1758
        closeBenchConn(conn);
×
1759
        return -1;
1760
    }
3✔
1761
    closeBenchConn(conn);
1762

1763
    // succ
3✔
1764
    return 0;
1765
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc