• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 12119116745

02 Dec 2024 12:14PM UTC coverage: 75.196% (-0.2%) from 75.413%
12119116745

push

github

web-flow
Merge pull request #826 from taosdata/fix/TD-33101-3.0

fix: insert_mode empty and bind_vgroups on cloud no privilege

1 of 1 new or added line in 1 file covered. (100.0%)

76 existing lines in 5 files now uncovered.

12066 of 16046 relevant lines covered (75.2%)

336464.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.31
/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
extern int g_majorVersionOfClient;
17

18
// Run one SQL statement on behalf of a query worker.
//
// The statement is sent through the REST path when g_queryInfo.iface is
// REST_IFACE, otherwise through the worker's native connection after
// selecting g_queryInfo.dbName.
//
// Returns:
//    0  the statement ran (or failed while continueIfFail is YES_IF_FAILED)
//   -1  termination was requested, the REST call failed, or the native
//       query failed
//   -2  the database could not be selected on the native connection
int selectAndGetResult(threadInfo *pThreadInfo, char *command) {
    if (g_arguments->terminate) {
        return -1;
    }

    uint32_t threadID = pThreadInfo->threadID;
    // bounded private copy of the database name, used for diagnostics only
    char dbName[TSDB_DB_NAME_LEN] = {0};
    tstrncpy(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN);

    if (g_queryInfo.iface == REST_IFACE) {
        int retCode = postProceSql(command, g_queryInfo.dbName, 0, REST_IFACE,
                                   0, g_arguments->port, false,
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
        if (0 == retCode) {
            return 0;
        }
        errorPrint("====restful return fail, threadID[%u]\n",
                   threadID);
        return -1;
    }

    TAOS *taos = pThreadInfo->conn->taos;
    if (taos_select_db(taos, g_queryInfo.dbName)) {
        errorPrint("thread[%u]: failed to select database(%s)\n",
            threadID, dbName);
        return -2;
    }

    int       status = 0;
    int64_t   rows   = 0;
    TAOS_RES *res    = taos_query(taos, command);
    int       code   = taos_errno(res);

    if (res != NULL && !code) {
        // success: drain the result set
        rows = fetchResult(res, pThreadInfo);
    } else if (YES_IF_FAILED == g_arguments->continueIfFail) {
        // tolerated failure: report it but keep the status at 0
        warnPrint("failed to execute sql:%s, "
                  "code: 0x%08x, reason:%s\n",
                   command, code, taos_errstr(res));
    } else {
        errorPrint("failed to execute sql:%s, "
                   "code: 0x%08x, reason:%s\n",
                   command, code, taos_errstr(res));
        status = -1;
    }

    taos_free_result(res);
    debugPrint("query sql:%s rows:%"PRId64"\n", command, rows);
    return status;
}
69

70
static void *mixedQuery(void *sarg) {
2✔
71
    queryThreadInfo *pThreadInfo = (queryThreadInfo*)sarg;
2✔
72
#ifdef LINUX
73
    prctl(PR_SET_NAME, "mixedQuery");
2✔
74
#endif
75
    int64_t lastPrintTs = toolsGetTimestampMs();
2✔
76
    int64_t st;
77
    int64_t et;
78
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
2✔
79
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
4✔
80
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
2✔
81
        for (int j = 0; j < queryTimes; ++j) {
6✔
82
            if (g_arguments->terminate) {
4✔
83
                return NULL;
×
84
            }
85
            if (g_queryInfo.reset_query_cache) {
4✔
86
                if (queryDbExecCall(pThreadInfo->conn,
×
87
                                    "RESET QUERY CACHE")) {
88
                    errorPrint("%s() LN%d, reset query cache failed\n",
×
89
                               __func__, __LINE__);
90
                    return NULL;
×
91
                }
92
            }
93
            st = toolsGetTimestampUs();
4✔
94
            if (g_queryInfo.iface == REST_IFACE) {
4✔
95
                int retCode = postProceSql(sql->command, g_queryInfo.dbName,
×
96
                                           0, g_queryInfo.iface, 0,
×
97
                                           g_arguments->port,
×
98
                                           false, pThreadInfo->sockfd, "");
99
                if (retCode) {
×
100
                    errorPrint("thread[%d]: restful query <%s> failed\n",
×
101
                            pThreadInfo->threadId, sql->command);
102
                    continue;
×
103
                }
104
            } else {
105
                if (g_queryInfo.dbName != NULL) {
4✔
106
                    if (taos_select_db(pThreadInfo->conn->taos,
4✔
107
                                       g_queryInfo.dbName)) {
4✔
108
                        errorPrint("thread[%d]: failed to "
×
109
                                   "select database(%s)\n",
110
                                   pThreadInfo->threadId,
111
                                   g_queryInfo.dbName);
112
                        return NULL;
×
113
                    }
114
                }
115
                TAOS_RES *res = taos_query(pThreadInfo->conn->taos,
4✔
116
                                           sql->command);
4✔
117
                if (res == NULL || taos_errno(res) != 0) {
4✔
118
                    if (YES_IF_FAILED == g_arguments->continueIfFail) {
×
119
                        warnPrint(
×
120
                                "thread[%d]: failed to execute sql :%s, "
121
                                "code: 0x%x, reason: %s\n",
122
                                pThreadInfo->threadId,
123
                                sql->command,
124
                                taos_errno(res), taos_errstr(res));
125
                    } else {
126
                        errorPrint(
×
127
                                "thread[%d]: failed to execute sql :%s, "
128
                                "code: 0x%x, reason: %s\n",
129
                                pThreadInfo->threadId,
130
                                sql->command,
131
                                taos_errno(res), taos_errstr(res));
132
                        if (TSDB_CODE_RPC_NETWORK_UNAVAIL ==
×
133
                                taos_errno(res)) {
×
134
                            taos_free_result(res);
×
135
                            return NULL;
×
136
                        }
137
                    }
138
                    taos_free_result(res);
×
139
                    continue;
×
140
                }
141
                taos_free_result(res);
4✔
142
            }
143
            et = toolsGetTimestampUs();
4✔
144
            int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
4✔
145
            *delay = et - st;
4✔
146
            debugPrint("%s() LN%d, delay: %"PRId64"\n",
4✔
147
                       __func__, __LINE__, *delay);
148

149
            pThreadInfo->total_delay += (et - st);
4✔
150
            if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
4✔
151
                tmfree(delay);
×
152
            }
153
            int64_t currentPrintTs = toolsGetTimestampMs();
4✔
154
            if (currentPrintTs - lastPrintTs > 10 * 1000) {
4✔
155
                infoPrint("thread[%d] has currently complete query %d times\n",
×
156
                        pThreadInfo->threadId,
157
                        (int)pThreadInfo->query_delay_list->size);
158
                lastPrintTs = currentPrintTs;
×
159
            }
160
        }
161
    }
162
    return NULL;
2✔
163
}
164

165
static void *specifiedTableQuery(void *sarg) {
107✔
166
    threadInfo *pThreadInfo = (threadInfo *)sarg;
107✔
167
#ifdef LINUX
168
    prctl(PR_SET_NAME, "specTableQuery");
107✔
169
#endif
170
    uint64_t st = 0;
107✔
171
    uint64_t et = 0;
107✔
172
    uint64_t minDelay = UINT64_MAX;
107✔
173
    uint64_t maxDelay = 0;
107✔
174
    uint64_t totalDelay = 0;
107✔
175
    int32_t  index = 0;
107✔
176

177
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
107✔
178
    pThreadInfo->query_delay_list = benchCalloc(queryTimes,
107✔
179
            sizeof(uint64_t), false);
180
    uint64_t  lastPrintTime = toolsGetTimestampMs();
107✔
181
    uint64_t  startTs = toolsGetTimestampMs();
106✔
182

183
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls,
107✔
184
            pThreadInfo->querySeq);
185

186
    if (sql->result[0] != '\0') {
107✔
187
        snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
102✔
188
                sql->result, pThreadInfo->threadID);
102✔
189
    }
190

191
    while (index < queryTimes) {
298✔
192
        // check cancel
193
        if (g_arguments->terminate) {
191✔
194
            return NULL;
×
195
        }
196

197
        if (g_queryInfo.specifiedQueryInfo.queryInterval &&
191✔
198
            (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) {
190✔
199
            toolsMsleep((int32_t)(
106✔
200
                        g_queryInfo.specifiedQueryInfo.queryInterval*1000
106✔
201
                        - (et - st)));  // ms
106✔
202
        }
203
        if (g_queryInfo.reset_query_cache) {
191✔
204
            // execute sql 
205
            if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE")) {
6✔
206
                errorPrint("%s() LN%d, reset query cache failed\n",
×
207
                           __func__, __LINE__);
208
                return NULL;
×
209
            }
210
        }
211

212
        st = toolsGetTimestampUs();
191✔
213
        int ret = selectAndGetResult(pThreadInfo, sql->command);
191✔
214
        if (ret) {
191✔
215
            g_fail = true;
12✔
216
        }
217

218
        et = toolsGetTimestampUs();
191✔
219
        int64_t delay = et - st;
191✔
220
        debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, delay);
191✔
221

222
        if (ret == 0) {
191✔
223
            pThreadInfo->query_delay_list[index] = delay;
179✔
224
            pThreadInfo->totalQueried++;
179✔
225
        }
226
        index++;
191✔
227
        totalDelay += delay;
191✔
228
        if (delay > maxDelay) {
191✔
229
            maxDelay = delay;
138✔
230
        }
231
        if (delay < minDelay) {
191✔
232
            minDelay = delay;
160✔
233
        }
234

235
        uint64_t currentPrintTime = toolsGetTimestampMs();
191✔
236
        uint64_t endTs = toolsGetTimestampMs();
191✔
237

238
        if ((ret == 0) && (currentPrintTime - lastPrintTime > 30 * 1000)) {
191✔
239
            infoPrint(
×
240
                    "thread[%d] has currently completed queries: %" PRIu64
241
                    ", QPS: %10.6f\n",
242
                    pThreadInfo->threadID, pThreadInfo->totalQueried,
243
                    (double)(pThreadInfo->totalQueried /
244
                        ((endTs - startTs) / 1000.0)));
245
            lastPrintTime = currentPrintTime;
×
246
        }
247

248
        if (-2 == ret) {
191✔
249
            toolsMsleep(1000);
×
250
            return NULL;
×
251
        }
252
    }
253
    qsort(pThreadInfo->query_delay_list, queryTimes,
107✔
254
            sizeof(uint64_t), compare);
255
    pThreadInfo->avg_delay = (double)totalDelay / queryTimes;
107✔
256
    return NULL;
107✔
257
}
258

259
static void *superTableQuery(void *sarg) {
38✔
260
    char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
38✔
261
    threadInfo *pThreadInfo = (threadInfo *)sarg;
38✔
262
#ifdef LINUX
263
    prctl(PR_SET_NAME, "superTableQuery");
38✔
264
#endif
265

266
    uint64_t st = 0;
38✔
267
    uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000;
38✔
268

269
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
38✔
270
    uint64_t startTs = toolsGetTimestampMs();
38✔
271

272
    uint64_t lastPrintTime = toolsGetTimestampMs();
38✔
273
    while (queryTimes--) {
122✔
274
        if (g_queryInfo.superQueryInfo.queryInterval
84✔
275
            && ((et - st) <
83✔
276
                (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000)) {
83✔
277
            toolsMsleep((int32_t)
46✔
278
                        (g_queryInfo.superQueryInfo.queryInterval*1000
46✔
279
                        - (et - st)));
46✔
280
        }
281

282
        st = toolsGetTimestampMs();
84✔
283
        for (int i = (int)pThreadInfo->start_table_from;
84✔
284
             i <= pThreadInfo->end_table_to; i++) {
218✔
285
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
6,208✔
286
                memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN);
6,074✔
287
                replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr,
6,074✔
288
                                    i);
289
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
6,073✔
290
                    snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
6,073✔
291
                            g_queryInfo.superQueryInfo.result[j],
6,073✔
292
                            pThreadInfo->threadID);
293
                }
294
                if (selectAndGetResult(pThreadInfo, sqlstr)) {
6,073✔
295
                    g_fail = true;
×
296
                }
297

298
                pThreadInfo->totalQueried++;
6,074✔
299

300
                int64_t currentPrintTime = toolsGetTimestampMs();
6,074✔
301
                int64_t endTs = toolsGetTimestampMs();
6,074✔
302
                if (currentPrintTime - lastPrintTime > 30 * 1000) {
6,074✔
303
                    infoPrint(
×
304
                        "thread[%d] has currently completed queries: %" PRIu64
305
                        ", QPS: %10.3f\n",
306
                        pThreadInfo->threadID, pThreadInfo->totalQueried,
307
                        (double)(pThreadInfo->totalQueried /
308
                                 ((endTs - startTs) / 1000.0)));
UNCOV
309
                    lastPrintTime = currentPrintTime;
×
310
                }
311
            }
312
        }
313
        et = toolsGetTimestampMs();
84✔
314
    }
315
    tmfree(sqlstr);
38✔
316
    return NULL;
38✔
317
}
318

319
static int multi_thread_super_table_query(uint16_t iface, char* dbName) {
12✔
320
    int ret = -1;
12✔
321
    pthread_t * pidsOfSub = NULL;
12✔
322
    threadInfo *infosOfSub = NULL;
12✔
323
    //==== create sub threads for query from all sub table of the super table
324
    if ((g_queryInfo.superQueryInfo.sqlCount > 0)
12✔
325
            && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
20✔
326
        pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
10✔
327
                                *sizeof(pthread_t),
328
                                false);
329
        infosOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
10✔
330
                                 *sizeof(threadInfo), false);
331

332
        int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
10✔
333
        int     threads = g_queryInfo.superQueryInfo.threadCnt;
10✔
334

335
        int64_t a = ntables / threads;
10✔
336
        if (a < 1) {
10✔
337
            threads = (int)ntables;
2✔
338
            a = 1;
2✔
339
        }
340

341
        int64_t b = 0;
10✔
342
        if (threads != 0) {
10✔
343
            b = ntables % threads;
10✔
344
        }
345

346
        uint64_t tableFrom = 0;
10✔
347
        for (int i = 0; i < threads; i++) {
48✔
348
            threadInfo *pThreadInfo = infosOfSub + i;
38✔
349
            pThreadInfo->threadID = i;
38✔
350
            pThreadInfo->start_table_from = tableFrom;
38✔
351
            pThreadInfo->ntables = i < b ? a + 1 : a;
38✔
352
            pThreadInfo->end_table_to =
38✔
353
                    i < b ? tableFrom + a : tableFrom + a - 1;
38✔
354
            tableFrom = pThreadInfo->end_table_to + 1;
38✔
355
            if (iface == REST_IFACE) {
38✔
356
                int sockfd = createSockFd();
5✔
357
                if (sockfd < 0) {
5✔
358
                    goto OVER;
×
359
                }
360
                pThreadInfo->sockfd = sockfd;
5✔
361
            } else {
362
                pThreadInfo->conn = initBenchConn();
33✔
363
                if (pThreadInfo->conn == NULL) {
33✔
364
                    goto OVER;
×
365
                }
366
            }
367
            pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
38✔
368
        }
369
        g_queryInfo.superQueryInfo.threadCnt = threads;
10✔
370

371
        for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
48✔
372
            if (!g_arguments->terminate)
38✔
373
                pthread_join(pidsOfSub[i], NULL);
38✔
374
            threadInfo *pThreadInfo = infosOfSub + i;
38✔
375
            if (iface == REST_IFACE) {
38✔
376
                destroySockFd(pThreadInfo->sockfd);
5✔
377
            } else {
378
                closeBenchConn(pThreadInfo->conn);
33✔
379
            }
380
            if (g_fail) {
38✔
381
                goto OVER;
×
382
            }
383
        }
384
        for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; ++i) {
48✔
385
            g_queryInfo.superQueryInfo.totalQueried
386
                += infosOfSub[i].totalQueried;
38✔
387
        }
388
    } else {
389
        return 0;
2✔
390
    }
391

392
    ret = 0;
10✔
393
OVER:
10✔
394
    tmfree((char *)pidsOfSub);
10✔
395
    tmfree((char *)infosOfSub);
10✔
396

397
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
80✔
398
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
70✔
399
    }
400
    tmfree(g_queryInfo.superQueryInfo.childTblName);
10✔
401
    return ret;
10✔
402
}
403

404
// free g_queryInfo.specailQueryInfo memory , can re-call
405
void freeSpecialQueryInfo() {
11✔
406
    // can re-call
407
    if (g_queryInfo.specifiedQueryInfo.sqls == NULL) {
11✔
408
        return;
×
409
    }
410

411
    // loop free each item memory
412
    for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
228✔
413
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
217✔
414
        tmfree(sql->command);
217✔
415
        tmfree(sql->delay_list);
217✔
416
    }
417

418
    // free Array
419
    benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);
11✔
420
    g_queryInfo.specifiedQueryInfo.sqls = NULL;
11✔
421
}
422

423

424
static int multi_thread_specified_table_query(uint16_t iface, char* dbName) {
13✔
425
    pthread_t * pids = NULL;
13✔
426
    threadInfo *infos = NULL;
13✔
427
    //==== create sub threads for query from specify table
428
    int      nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
13✔
429
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
13✔
430

431
    // check invaid
432
    if(nSqlCount == 0 || nConcurrent == 0 ) {
13✔
433
        if(nSqlCount == 0)
2✔
434
           warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount);
2✔
435
        if(nConcurrent == 0)
2✔
436
           warnPrint("concurrent is %d , specified_table_query->concurrent is zero. \n", nConcurrent);
×
437
        return 0;
2✔
438
    }
439

440
    // malloc funciton global memory
441
    pids  = benchCalloc(1, nConcurrent * sizeof(pthread_t),  false);
11✔
442
    infos = benchCalloc(1, nConcurrent * sizeof(threadInfo), false);
11✔
443

444
    bool exeError = false;
11✔
445
    for (uint64_t i = 0; i < nSqlCount; i++) {
48✔
446
        // reset
447
        memset(pids,  0, nConcurrent * sizeof(pthread_t));
39✔
448
        memset(infos, 0, nConcurrent * sizeof(threadInfo));
39✔
449

450
        // get execute sql
451
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
39✔
452

453
        // create threads
454
        int threadCnt = 0;
39✔
455
        for (int j = 0; j < nConcurrent; j++) {
146✔
456
           threadInfo *pThreadInfo = infos + j;
107✔
457
           pThreadInfo->threadID = i * nConcurrent + j;
107✔
458
           pThreadInfo->querySeq = i;
107✔
459
           if (iface == REST_IFACE) {
107✔
460
                int sockfd = createSockFd();
13✔
461
                // int iMode = 1;
462
                // ioctl(sockfd, FIONBIO, &iMode);
463
                if (sockfd < 0) {
13✔
464
                    exeError = true;
×
465

466
                    break;
×
467
                }
468
                pThreadInfo->sockfd = sockfd;
13✔
469
           } else {
470
                pThreadInfo->conn = initBenchConn();
94✔
471
                if (pThreadInfo->conn == NULL) {
94✔
472
                    destroySockFd(pThreadInfo->sockfd);
×
473
                    exeError = true;
×
474
                    break;
×
475
                }
476
           }
477

478
           pthread_create(pids + j, NULL, specifiedTableQuery, pThreadInfo);
107✔
479
           threadCnt++;
107✔
480
        }
481

482
        // if failed, set termainte flag true like ctrl+c exit
483
        if (exeError) {
39✔
484
            errorPrint(" i=%" PRIu64 " create thread occur error, so wait exit ...\n", i);
×
485
            g_arguments->terminate = true;
×
486
        }
487

488
        // wait threads execute finished one by one
489
        for (int j = 0; j < threadCnt ; j++) {
146✔
490
           pthread_join(pids[j], NULL);
107✔
491
           threadInfo *pThreadInfo = infos + j;
107✔
492
           if (iface == REST_IFACE) {
107✔
493
#ifdef WINDOWS
494
                closesocket(pThreadInfo->sockfd);
495
                WSACleanup();
496
#else
497
                close(pThreadInfo->sockfd);
13✔
498
#endif
499
           } else {
500
                closeBenchConn(pThreadInfo->conn);
94✔
501
                pThreadInfo->conn = NULL;
94✔
502
           }
503

504
           // need exit in loop
505
           if (g_fail || g_arguments->terminate) {
107✔
506
                // free BArray
507
                tmfree(pThreadInfo->query_delay_list);
6✔
508
                pThreadInfo->query_delay_list = NULL;
6✔
509
           }
510
        }
511

512
        // cancel or need exit check
513
        if (g_fail || g_arguments->terminate) {
39✔
514
            // free current funciton malloc memory
515
            tmfree((char *)pids);
2✔
516
            tmfree((char *)infos);
2✔
517
            // free global
518
            freeSpecialQueryInfo();
2✔
519
            return -1;
2✔
520
        }
521

522
        // execute successfully
523
        uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes;
37✔
524
        uint64_t totalQueryTimes = query_times * nConcurrent;
37✔
525
        double   avg_delay = 0.0;
37✔
526
        for (int j = 0; j < nConcurrent; j++) {
138✔
527
           threadInfo *pThreadInfo = infos + j;
101✔
528
           avg_delay += pThreadInfo->avg_delay;
101✔
529
           for (uint64_t k = 0; k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) {
280✔
530
                sql->delay_list[j * query_times + k] = pThreadInfo->query_delay_list[k];
179✔
531
           }
532

533
           // free BArray
534
           tmfree(pThreadInfo->query_delay_list);
101✔
535
           pThreadInfo->query_delay_list = NULL;
101✔
536
        }
537
        avg_delay /= nConcurrent;
37✔
538
        qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes, sizeof(uint64_t), compare);
37✔
539
        infoPrintNoTimestamp("complete query with %d threads and %" PRIu64
37✔
540
                             " query delay "
541
                             "avg: \t%.6fs "
542
                             "min: \t%.6fs "
543
                             "max: \t%.6fs "
544
                             "p90: \t%.6fs "
545
                             "p95: \t%.6fs "
546
                             "p99: \t%.6fs "
547
                             "SQL command: %s"
548
                             "\n",
549
                             nConcurrent, query_times, avg_delay / 1E6,  /* avg */
550
                             sql->delay_list[0] / 1E6,                   /* min */
551
                             sql->delay_list[totalQueryTimes - 1] / 1E6, /*  max */
552
                             /*  p90 */
553
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
554
                             /*  p95 */
555
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
556
                             /*  p99 */
557
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
558
        infoPrintNoTimestampToFile("complete query with %d threads and %" PRIu64
37✔
559
                                   " query delay "
560
                                   "avg: \t%.6fs "
561
                                   "min: \t%.6fs "
562
                                   "max: \t%.6fs "
563
                                   "p90: \t%.6fs "
564
                                   "p95: \t%.6fs "
565
                                   "p99: \t%.6fs "
566
                                   "SQL command: %s"
567
                                   "\n",
568
                                   nConcurrent, query_times, avg_delay / 1E6,  /* avg */
569
                                   sql->delay_list[0] / 1E6,                   /* min */
570
                                   sql->delay_list[totalQueryTimes - 1] / 1E6, /*  max */
571
                                   /*  p90 */
572
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
573
                                   /*  p95 */
574
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
575
                                   /*  p99 */
576
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
577
    }
578

579
    g_queryInfo.specifiedQueryInfo.totalQueried =
9✔
580
        g_queryInfo.specifiedQueryInfo.queryTimes * nConcurrent;
9✔
581
    tmfree((char *)pids);
9✔
582
    tmfree((char *)infos);
9✔
583

584
    // free specialQueryInfo
585
    freeSpecialQueryInfo();
9✔
586
    return 0;
9✔
587
}
588

589
static int multi_thread_specified_mixed_query(uint16_t iface, char* dbName) {
1✔
590
    int code = -1;
1✔
591
    int thread = g_queryInfo.specifiedQueryInfo.concurrent;
1✔
592
    pthread_t * pids = benchCalloc(thread, sizeof(pthread_t), true);
1✔
593
    queryThreadInfo *infos = benchCalloc(thread, sizeof(queryThreadInfo), true);
1✔
594
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
1✔
595
    int start_sql = 0;
1✔
596
    int a = total_sql_num / thread;
1✔
597
    if (a < 1) {
1✔
598
        thread = total_sql_num;
1✔
599
        a = 1;
1✔
600
    }
601
    int b = 0;
1✔
602
    if (thread != 0) {
1✔
603
        b = total_sql_num % thread;
1✔
604
    }
605
    for (int i = 0; i < thread; ++i) {
3✔
606
        queryThreadInfo *pQueryThreadInfo = infos + i;
2✔
607
        pQueryThreadInfo->threadId = i;
2✔
608
        pQueryThreadInfo->start_sql = start_sql;
2✔
609
        pQueryThreadInfo->end_sql = i < b ? start_sql + a : start_sql + a - 1;
2✔
610
        start_sql = pQueryThreadInfo->end_sql + 1;
2✔
611
        pQueryThreadInfo->total_delay = 0;
2✔
612
        pQueryThreadInfo->query_delay_list = benchArrayInit(1, sizeof(int64_t));
2✔
613
        if (iface == REST_IFACE) {
2✔
614
            int sockfd = createSockFd();
×
615
            if (sockfd < 0) {
×
616
                goto OVER;
×
617
            }
618
            pQueryThreadInfo->sockfd = sockfd;
×
619
        } else {
620
            pQueryThreadInfo->conn = initBenchConn();
2✔
621
            if (pQueryThreadInfo->conn == NULL) {
2✔
622
                goto OVER;
×
623
            }
624
        }
625
        pthread_create(pids + i, NULL, mixedQuery, pQueryThreadInfo);
2✔
626
    }
627

628
    int64_t start = toolsGetTimestampUs();
1✔
629
    for (int i = 0; i < thread; ++i) {
3✔
630
        pthread_join(pids[i], NULL);
2✔
631
    }
632
    int64_t end = toolsGetTimestampUs();
1✔
633

634
    // statistic
635
    BArray * delay_list = benchArrayInit(1, sizeof(int64_t));
1✔
636
    int64_t total_delay = 0;
1✔
637
    for (int i = 0; i < thread; ++i) {
3✔
638
        queryThreadInfo * pThreadInfo = infos + i;
2✔
639
        benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData,
2✔
640
                pThreadInfo->query_delay_list->size, true);
2✔
641
        total_delay += pThreadInfo->total_delay;
2✔
642
        tmfree(pThreadInfo->query_delay_list);
2✔
643
        pThreadInfo->query_delay_list = NULL;
2✔
644

645
        if (iface == REST_IFACE) {
2✔
646
#ifdef  WINDOWS
647
            closesocket(pThreadInfo->sockfd);
648
            WSACleanup();
649
#else
650
            close(pThreadInfo->sockfd);
×
651
#endif
652
        } else {
653
            closeBenchConn(pThreadInfo->conn);
2✔
654
        }
655
    }
656
    qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare);
1✔
657
    if (delay_list->size) {
1✔
658
        infoPrint(
1✔
659
                "spend %.6fs using "
660
                "%d threads complete query %d times,cd  "
661
                "min delay: %.6fs, "
662
                "avg delay: %.6fs, "
663
                "p90: %.6fs, "
664
                "p95: %.6fs, "
665
                "p99: %.6fs, "
666
                "max: %.6fs\n",
667
                (end - start)/1E6,
668
                thread, (int)delay_list->size,
669
                *(int64_t *)(benchArrayGet(delay_list, 0))/1E6,
670
                (double)total_delay/delay_list->size/1E6,
671
                *(int64_t *)(benchArrayGet(delay_list,
672
                                    (int32_t)(delay_list->size * 0.9)))/1E6,
673
                *(int64_t *)(benchArrayGet(delay_list,
674
                                    (int32_t)(delay_list->size * 0.95)))/1E6,
675
                *(int64_t *)(benchArrayGet(delay_list,
676
                                    (int32_t)(delay_list->size * 0.99)))/1E6,
677
                *(int64_t *)(benchArrayGet(delay_list,
678
                                    (int32_t)(delay_list->size - 1)))/1E6);
679
    } else {
680
        errorPrint("%s() LN%d, delay_list size: %"PRId64"\n",
×
681
                   __func__, __LINE__, (int64_t)delay_list->size);
682
    }
683
    benchArrayDestroy(delay_list);
1✔
684
    code = 0;
1✔
685
OVER:
1✔
686
    tmfree(pids);
1✔
687
    tmfree(infos);
1✔
688
    return code;
1✔
689
}
690

691
#define KILLID_LEN  20
692

693
void *queryKiller(void *arg) {
×
694
    char host[MAX_HOSTNAME_LEN] = {0};
×
695
    tstrncpy(host, g_arguments->host, MAX_HOSTNAME_LEN);
×
696

697
    while (true) {
×
698
        TAOS *taos = taos_connect(g_arguments->host, g_arguments->user,
×
699
                g_arguments->password, NULL, g_arguments->port);
×
700
        if (NULL == taos) {
×
701
            errorPrint("Slow query killer thread "
×
702
                    "failed to connect to the server %s\n",
703
                    g_arguments->host);
704
            return NULL;
×
705
        }
706

707
        char command[TSDB_MAX_ALLOWED_SQL_LEN] =
×
708
            "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries";
709
        TAOS_RES *res = taos_query(taos, command);
×
710
        int32_t code = taos_errno(res);
×
711
        if (code) {
×
712
            printErrCmdCodeStr(command, code, res);
×
713
        }
714

715
        TAOS_ROW row = NULL;
×
716
        while ((row = taos_fetch_row(res)) != NULL) {
×
717
            int32_t *lengths = taos_fetch_lengths(res);
×
718
            if (lengths[0] <= 0) {
×
719
                infoPrint("No valid query found by %s\n", command);
×
720
            } else {
721
                int64_t execUSec = *(int64_t*)row[1];
×
722

723
                if (execUSec > g_queryInfo.killQueryThreshold * 1000000) {
×
724
                    char sql[SHORT_1K_SQL_BUFF_LEN] = {0};
×
725
                    tstrncpy(sql, (char*)row[2],
×
726
                             min(strlen((char*)row[2])+1,
727
                                 SHORT_1K_SQL_BUFF_LEN));
728

729
                    char killId[KILLID_LEN] = {0};
×
730
                    tstrncpy(killId, (char*)row[0],
×
731
                            min(strlen((char*)row[0])+1, KILLID_LEN));
732
                    char killCommand[KILLID_LEN + 15] = {0};
×
733
                    snprintf(killCommand, KILLID_LEN + 15,
×
734
                             "KILL QUERY '%s'", killId);
735
                    TAOS_RES *resKill = taos_query(taos, killCommand);
×
736
                    int32_t codeKill = taos_errno(resKill);
×
737
                    if (codeKill) {
×
738
                        printErrCmdCodeStr(killCommand, codeKill, resKill);
×
739
                    } else {
740
                        infoPrint("%s succeed, sql: %s killed!\n",
×
741
                                  killCommand, sql);
742
                        taos_free_result(resKill);
×
743
                    }
744
                }
745
            }
746
        }
747

748
        taos_free_result(res);
×
749
        taos_close(taos);
×
750
        toolsMsleep(g_queryInfo.killQueryInterval*1000);
×
751
    }
752

753
    return NULL;
754
}
755

756
int queryTestProcess() {
14✔
757
    prompt(0);
14✔
758

759
    if (REST_IFACE == g_queryInfo.iface) {
14✔
760
        encodeAuthBase64();
3✔
761
    }
762

763
    pthread_t pidKiller = {0};
14✔
764
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
14✔
765
        pthread_create(&pidKiller, NULL, queryKiller, NULL);
×
766
        pthread_join(pidKiller, NULL);
×
767
        toolsMsleep(1000);
×
768
    }
769

770
    if (g_queryInfo.iface == REST_IFACE) {
14✔
771
        if (convertHostToServAddr(g_arguments->host,
3✔
772
                    g_arguments->port + TSDB_PORT_HTTP,
3✔
773
                    &(g_arguments->serv_addr)) != 0) {
3✔
774
            errorPrint("%s", "convert host to server address\n");
×
775
            return -1;
×
776
        }
777
    }
778

779
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
14✔
780
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
12✔
781
        SBenchConn* conn = initBenchConn();
12✔
782
        if (conn == NULL) {
12✔
783
            return -1;
×
784
        }
785
        char  cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
12✔
786
        if (3 == g_majorVersionOfClient) {
12✔
787
            snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
12✔
788
                    "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM %s.%s)",
789
                    g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
790
        } else {
791
            snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
792
                     "SELECT COUNT(TBNAME) FROM %s.%s",
793
                    g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
794
        }
795
        TAOS_RES *res = taos_query(conn->taos, cmd);
12✔
796
        int32_t   code = taos_errno(res);
12✔
797
        if (code) {
12✔
798
            printErrCmdCodeStr(cmd, code, res);
×
799
            closeBenchConn(conn);
×
800
            return -1;
×
801
        }
802
        TAOS_ROW    row = NULL;
12✔
803
        int         num_fields = taos_num_fields(res);
12✔
804
        TAOS_FIELD *fields = taos_fetch_fields(res);
12✔
805
        while ((row = taos_fetch_row(res)) != NULL) {
24✔
806
            if (0 == strlen((char *)(row[0]))) {
12✔
807
                errorPrint("stable %s have no child table\n",
×
808
                        g_queryInfo.superQueryInfo.stbName);
809
                taos_free_result(res);
×
810
                closeBenchConn(conn);
×
811
                return -1;
×
812
            }
813
            char temp[256] = {0};
12✔
814
            taos_print_row(temp, row, fields, num_fields);
12✔
815
            g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(temp);
12✔
816
        }
817
        infoPrint("%s's childTblCount: %" PRId64 "\n",
12✔
818
                g_queryInfo.superQueryInfo.stbName,
819
                g_queryInfo.superQueryInfo.childTblCount);
820
        taos_free_result(res);
12✔
821
        g_queryInfo.superQueryInfo.childTblName =
12✔
822
            benchCalloc(g_queryInfo.superQueryInfo.childTblCount,
12✔
823
                    sizeof(char *), false);
824
        if (getAllChildNameOfSuperTable(
12✔
825
                    conn->taos, g_queryInfo.dbName,
826
                    g_queryInfo.superQueryInfo.stbName,
827
                    g_queryInfo.superQueryInfo.childTblName,
828
                    g_queryInfo.superQueryInfo.childTblCount)) {
829
            tmfree(g_queryInfo.superQueryInfo.childTblName);
×
830
            closeBenchConn(conn);
×
831
            return -1;
×
832
        }
833
        closeBenchConn(conn);
12✔
834
    }
835
    uint64_t startTs = toolsGetTimestampMs();
14✔
836
    if (g_queryInfo.specifiedQueryInfo.mixed_query) {
14✔
837
        if (multi_thread_specified_mixed_query(g_queryInfo.iface,
1✔
838
                    g_queryInfo.dbName)) {
839
            return -1;
×
840
        }
841
    } else {
842
        if (multi_thread_specified_table_query(g_queryInfo.iface,
13✔
843
                    g_queryInfo.dbName)) {
844
            return -1;
2✔
845
        }
846
    }
847
    if (multi_thread_super_table_query(g_queryInfo.iface,
12✔
848
                g_queryInfo.dbName)) {
849
        return -1;
×
850
    }
851
    // workaround to use separate taos connection;
852
    uint64_t endTs = toolsGetTimestampMs();
12✔
853
    int64_t t = endTs - startTs;
12✔
854
    double  tInS = (double)t / 1000.0;
12✔
855
    if (g_queryInfo.specifiedQueryInfo.totalQueried)
12✔
856
        infoPrint("Total specified queries: %" PRIu64 "\n",
9✔
857
              g_queryInfo.specifiedQueryInfo.totalQueried);
858
    if (g_queryInfo.superQueryInfo.totalQueried)
12✔
859
    infoPrint("Total super queries: %" PRIu64 "\n",
10✔
860
              g_queryInfo.superQueryInfo.totalQueried);
861
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
12✔
862
        + g_queryInfo.superQueryInfo.totalQueried;
12✔
863
    infoPrint(
12✔
864
            "Spend %.4f second completed total queries: %" PRIu64
865
            ", the QPS of all threads: %10.3f\n\n",
866
            tInS, totalQueried, (double)totalQueried / tInS);
867
    infoPrintToFile(
12✔
868
            "Spend %.4f second completed total queries: %" PRIu64
869
            ", the QPS of all threads: %10.3f\n\n",
870
            tInS, totalQueried, (double)totalQueried / tInS);
871
    return 0;
12✔
872
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc