• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

taosdata / taos-tools / 12347770233

16 Dec 2024 07:18AM UTC coverage: 75.16% (-0.04%) from 75.196%
12347770233

push

github

web-flow
Merge pull request #828 from taosdata/enh/TD-33170-3.0-1

enh: query remove use db clause and remove check csv line

21 of 25 new or added lines in 2 files covered. (84.0%)

19 existing lines in 3 files now uncovered.

12061 of 16047 relevant lines covered (75.16%)

335414.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.51
/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
extern int g_majorVersionOfClient;
17

18
/**
 * Execute one SQL command on behalf of a query thread and consume its result.
 *
 * When g_queryInfo.iface is REST_IFACE the command is sent with
 * postProceSql() over the thread's socket; otherwise it is executed with
 * taos_query() on the thread's native connection and, on success, the result
 * set is passed to fetchResult().
 *
 * @param pThreadInfo  per-thread context (threadID, conn or sockfd, filePath)
 * @param command      SQL text to execute
 * @return 0 on success, or when a native query fails while
 *         g_arguments->continueIfFail is YES_IF_FAILED (only a warning is
 *         printed); -1 when termination was requested or the query failed.
 */
int selectAndGetResult(threadInfo *pThreadInfo, char *command) {
    // a pending cancel request aborts before any work is done
    if (g_arguments->terminate) {
        return -1;
    }

    uint32_t threadID = pThreadInfo->threadID;

    if (g_queryInfo.iface == REST_IFACE) {
        int retCode = postProceSql(command, g_queryInfo.dbName, 0, REST_IFACE,
                                   0, g_arguments->port, false,
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
        if (0 != retCode) {
            errorPrint("====restful return fail, threadID[%u]\n",
                       threadID);
            return -1;
        }
        return 0;
    }

    // native connection
    int       ret  = 0;
    int64_t   rows = 0;
    TAOS     *taos = pThreadInfo->conn->taos;
    TAOS_RES *res  = taos_query(taos, command);
    int       code = taos_errno(res);

    if (res == NULL || code) {
        if (YES_IF_FAILED == g_arguments->continueIfFail) {
            // tolerated failure: report it but keep the success return code
            warnPrint("failed to execute sql:%s, "
                        "code: 0x%08x, reason:%s\n",
                        command, code, taos_errstr(res));
        } else {
            errorPrint("failed to execute sql:%s, "
                        "code: 0x%08x, reason:%s\n",
                        command, code, taos_errstr(res));
            ret = -1;
        }
    } else {
        rows = fetchResult(res, pThreadInfo);
    }

    taos_free_result(res);
    debugPrint("query sql:%s rows:%"PRId64"\n", command, rows);
    return ret;
}
63

64
static void *mixedQuery(void *sarg) {
2✔
65
    queryThreadInfo *pThreadInfo = (queryThreadInfo*)sarg;
2✔
66
#ifdef LINUX
67
    prctl(PR_SET_NAME, "mixedQuery");
2✔
68
#endif
69
    // use db
70
    if (g_queryInfo.dbName) {
2✔
71
        if (pThreadInfo->conn &&
2✔
72
            pThreadInfo->conn->taos &&
4✔
73
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
2✔
NEW
74
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadId, g_queryInfo.dbName);
×
NEW
75
                return NULL;
×
76
        }
77
    }
78

79
    int64_t lastPrintTs = toolsGetTimestampMs();
2✔
80
    int64_t st;
81
    int64_t et;
82
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
2✔
83
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
4✔
84
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
2✔
85
        for (int j = 0; j < queryTimes; ++j) {
6✔
86
            if (g_arguments->terminate) {
4✔
87
                return NULL;
×
88
            }
89
            if (g_queryInfo.reset_query_cache) {
4✔
90
                if (queryDbExecCall(pThreadInfo->conn,
×
91
                                    "RESET QUERY CACHE")) {
92
                    errorPrint("%s() LN%d, reset query cache failed\n",
×
93
                               __func__, __LINE__);
94
                    return NULL;
×
95
                }
96
            }
97
            st = toolsGetTimestampUs();
4✔
98
            if (g_queryInfo.iface == REST_IFACE) {
4✔
99
                int retCode = postProceSql(sql->command, g_queryInfo.dbName,
×
100
                                           0, g_queryInfo.iface, 0,
×
101
                                           g_arguments->port,
×
102
                                           false, pThreadInfo->sockfd, "");
103
                if (retCode) {
×
104
                    errorPrint("thread[%d]: restful query <%s> failed\n",
×
105
                            pThreadInfo->threadId, sql->command);
106
                    continue;
×
107
                }
108
            } else {
109
                TAOS_RES *res = taos_query(pThreadInfo->conn->taos,
4✔
110
                                           sql->command);
4✔
111
                if (res == NULL || taos_errno(res) != 0) {
4✔
112
                    if (YES_IF_FAILED == g_arguments->continueIfFail) {
×
113
                        warnPrint(
×
114
                                "thread[%d]: failed to execute sql :%s, "
115
                                "code: 0x%x, reason: %s\n",
116
                                pThreadInfo->threadId,
117
                                sql->command,
118
                                taos_errno(res), taos_errstr(res));
119
                    } else {
120
                        errorPrint(
×
121
                                "thread[%d]: failed to execute sql :%s, "
122
                                "code: 0x%x, reason: %s\n",
123
                                pThreadInfo->threadId,
124
                                sql->command,
125
                                taos_errno(res), taos_errstr(res));
126
                        if (TSDB_CODE_RPC_NETWORK_UNAVAIL ==
×
127
                                taos_errno(res)) {
×
128
                            taos_free_result(res);
×
129
                            return NULL;
×
130
                        }
131
                    }
132
                    taos_free_result(res);
×
133
                    continue;
×
134
                }
135
                taos_free_result(res);
4✔
136
            }
137
            et = toolsGetTimestampUs();
4✔
138
            int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
4✔
139
            *delay = et - st;
4✔
140
            debugPrint("%s() LN%d, delay: %"PRId64"\n",
4✔
141
                       __func__, __LINE__, *delay);
142

143
            pThreadInfo->total_delay += (et - st);
4✔
144
            if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
4✔
145
                tmfree(delay);
×
146
            }
147
            int64_t currentPrintTs = toolsGetTimestampMs();
4✔
148
            if (currentPrintTs - lastPrintTs > 10 * 1000) {
4✔
149
                infoPrint("thread[%d] has currently complete query %d times\n",
×
150
                        pThreadInfo->threadId,
151
                        (int)pThreadInfo->query_delay_list->size);
152
                lastPrintTs = currentPrintTs;
×
153
            }
154
        }
155
    }
156
    return NULL;
2✔
157
}
158

159
static void *specifiedTableQuery(void *sarg) {
107✔
160
    threadInfo *pThreadInfo = (threadInfo *)sarg;
107✔
161
#ifdef LINUX
162
    prctl(PR_SET_NAME, "specTableQuery");
107✔
163
#endif
164
    uint64_t st = 0;
107✔
165
    uint64_t et = 0;
107✔
166
    uint64_t minDelay = UINT64_MAX;
107✔
167
    uint64_t maxDelay = 0;
107✔
168
    uint64_t totalDelay = 0;
107✔
169
    int32_t  index = 0;
107✔
170

171
    // use db
172
    if (g_queryInfo.dbName) {
107✔
173
        if (pThreadInfo->conn &&
107✔
174
            pThreadInfo->conn->taos &&
188✔
175
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
94✔
NEW
176
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
NEW
177
                return NULL;
×
178
        }
179
    }
180

181
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
107✔
182
    pThreadInfo->query_delay_list = benchCalloc(queryTimes,
107✔
183
            sizeof(uint64_t), false);
184
    uint64_t  lastPrintTime = toolsGetTimestampMs();
107✔
185
    uint64_t  startTs = toolsGetTimestampMs();
107✔
186

187
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls,
107✔
188
            pThreadInfo->querySeq);
189

190
    if (sql->result[0] != '\0') {
107✔
191
        snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
102✔
192
                sql->result, pThreadInfo->threadID);
102✔
193
    }
194

195
    while (index < queryTimes) {
298✔
196
        // check cancel
197
        if (g_arguments->terminate) {
191✔
198
            return NULL;
×
199
        }
200

201
        if (g_queryInfo.specifiedQueryInfo.queryInterval &&
191✔
202
            (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) {
190✔
203
            toolsMsleep((int32_t)(
106✔
204
                        g_queryInfo.specifiedQueryInfo.queryInterval*1000
106✔
205
                        - (et - st)));  // ms
106✔
206
        }
207
        if (g_queryInfo.reset_query_cache) {
191✔
208
            // execute sql 
209
            if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE")) {
6✔
210
                errorPrint("%s() LN%d, reset query cache failed\n",
×
211
                           __func__, __LINE__);
212
                return NULL;
×
213
            }
214
        }
215

216
        st = toolsGetTimestampUs();
191✔
217
        int ret = selectAndGetResult(pThreadInfo, sql->command);
190✔
218
        if (ret) {
191✔
219
            g_fail = true;
12✔
220
        }
221

222
        et = toolsGetTimestampUs();
191✔
223
        int64_t delay = et - st;
191✔
224
        debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, delay);
191✔
225

226
        if (ret == 0) {
191✔
227
            pThreadInfo->query_delay_list[index] = delay;
179✔
228
            pThreadInfo->totalQueried++;
179✔
229
        }
230
        index++;
191✔
231
        totalDelay += delay;
191✔
232
        if (delay > maxDelay) {
191✔
233
            maxDelay = delay;
124✔
234
        }
235
        if (delay < minDelay) {
191✔
236
            minDelay = delay;
174✔
237
        }
238

239
        uint64_t currentPrintTime = toolsGetTimestampMs();
191✔
240
        uint64_t endTs = toolsGetTimestampMs();
191✔
241

242
        if ((ret == 0) && (currentPrintTime - lastPrintTime > 30 * 1000)) {
191✔
243
            infoPrint(
×
244
                    "thread[%d] has currently completed queries: %" PRIu64
245
                    ", QPS: %10.6f\n",
246
                    pThreadInfo->threadID, pThreadInfo->totalQueried,
247
                    (double)(pThreadInfo->totalQueried /
248
                        ((endTs - startTs) / 1000.0)));
249
            lastPrintTime = currentPrintTime;
×
250
        }
251

252
        if (-2 == ret) {
191✔
253
            toolsMsleep(1000);
×
254
            return NULL;
×
255
        }
256
    }
257
    qsort(pThreadInfo->query_delay_list, queryTimes,
107✔
258
            sizeof(uint64_t), compare);
259
    pThreadInfo->avg_delay = (double)totalDelay / queryTimes;
107✔
260
    return NULL;
107✔
261
}
262

263
static void *superTableQuery(void *sarg) {
38✔
264
    char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
38✔
265
    threadInfo *pThreadInfo = (threadInfo *)sarg;
38✔
266
#ifdef LINUX
267
    prctl(PR_SET_NAME, "superTableQuery");
38✔
268
#endif
269

270
    uint64_t st = 0;
38✔
271
    uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000;
38✔
272

273
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
38✔
274
    uint64_t startTs = toolsGetTimestampMs();
38✔
275

276
    uint64_t lastPrintTime = toolsGetTimestampMs();
38✔
277
    while (queryTimes--) {
122✔
278
        if (g_queryInfo.superQueryInfo.queryInterval
84✔
279
            && ((et - st) <
83✔
280
                (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000)) {
83✔
281
            toolsMsleep((int32_t)
46✔
282
                        (g_queryInfo.superQueryInfo.queryInterval*1000
46✔
283
                        - (et - st)));
46✔
284
        }
285

286
        st = toolsGetTimestampMs();
84✔
287
        for (int i = (int)pThreadInfo->start_table_from;
84✔
288
             i <= pThreadInfo->end_table_to; i++) {
218✔
289
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
6,208✔
290
                memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN);
6,074✔
291
                replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr,
6,074✔
292
                                    i);
293
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
6,073✔
294
                    snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
6,074✔
295
                            g_queryInfo.superQueryInfo.result[j],
6,074✔
296
                            pThreadInfo->threadID);
297
                }
298
                if (selectAndGetResult(pThreadInfo, sqlstr)) {
6,073✔
299
                    g_fail = true;
×
300
                }
301

302
                pThreadInfo->totalQueried++;
6,074✔
303

304
                int64_t currentPrintTime = toolsGetTimestampMs();
6,074✔
305
                int64_t endTs = toolsGetTimestampMs();
6,074✔
306
                if (currentPrintTime - lastPrintTime > 30 * 1000) {
6,074✔
307
                    infoPrint(
×
308
                        "thread[%d] has currently completed queries: %" PRIu64
309
                        ", QPS: %10.3f\n",
310
                        pThreadInfo->threadID, pThreadInfo->totalQueried,
311
                        (double)(pThreadInfo->totalQueried /
312
                                 ((endTs - startTs) / 1000.0)));
313
                    lastPrintTime = currentPrintTime;
×
314
                }
315
            }
316
        }
317
        et = toolsGetTimestampMs();
84✔
318
    }
319
    tmfree(sqlstr);
38✔
320
    return NULL;
38✔
321
}
322

323
static int multi_thread_super_table_query(uint16_t iface, char* dbName) {
12✔
324
    int ret = -1;
12✔
325
    pthread_t * pidsOfSub = NULL;
12✔
326
    threadInfo *infosOfSub = NULL;
12✔
327
    //==== create sub threads for query from all sub table of the super table
328
    if ((g_queryInfo.superQueryInfo.sqlCount > 0)
12✔
329
            && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
20✔
330
        pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
10✔
331
                                *sizeof(pthread_t),
332
                                false);
333
        infosOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
10✔
334
                                 *sizeof(threadInfo), false);
335

336
        int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
10✔
337
        int     threads = g_queryInfo.superQueryInfo.threadCnt;
10✔
338

339
        int64_t a = ntables / threads;
10✔
340
        if (a < 1) {
10✔
341
            threads = (int)ntables;
2✔
342
            a = 1;
2✔
343
        }
344

345
        int64_t b = 0;
10✔
346
        if (threads != 0) {
10✔
347
            b = ntables % threads;
10✔
348
        }
349

350
        uint64_t tableFrom = 0;
10✔
351
        for (int i = 0; i < threads; i++) {
48✔
352
            threadInfo *pThreadInfo = infosOfSub + i;
38✔
353
            pThreadInfo->threadID = i;
38✔
354
            pThreadInfo->start_table_from = tableFrom;
38✔
355
            pThreadInfo->ntables = i < b ? a + 1 : a;
38✔
356
            pThreadInfo->end_table_to =
38✔
357
                    i < b ? tableFrom + a : tableFrom + a - 1;
38✔
358
            tableFrom = pThreadInfo->end_table_to + 1;
38✔
359
            if (iface == REST_IFACE) {
38✔
360
                int sockfd = createSockFd();
5✔
361
                if (sockfd < 0) {
5✔
362
                    goto OVER;
×
363
                }
364
                pThreadInfo->sockfd = sockfd;
5✔
365
            } else {
366
                pThreadInfo->conn = initBenchConn();
33✔
367
                if (pThreadInfo->conn == NULL) {
33✔
368
                    goto OVER;
×
369
                }
370
            }
371
            pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
38✔
372
        }
373
        g_queryInfo.superQueryInfo.threadCnt = threads;
10✔
374

375
        for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
48✔
376
            if (!g_arguments->terminate)
38✔
377
                pthread_join(pidsOfSub[i], NULL);
38✔
378
            threadInfo *pThreadInfo = infosOfSub + i;
38✔
379
            if (iface == REST_IFACE) {
38✔
380
                destroySockFd(pThreadInfo->sockfd);
5✔
381
            } else {
382
                closeBenchConn(pThreadInfo->conn);
33✔
383
            }
384
            if (g_fail) {
38✔
385
                goto OVER;
×
386
            }
387
        }
388
        for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; ++i) {
48✔
389
            g_queryInfo.superQueryInfo.totalQueried
390
                += infosOfSub[i].totalQueried;
38✔
391
        }
392
    } else {
393
        return 0;
2✔
394
    }
395

396
    ret = 0;
10✔
397
OVER:
10✔
398
    tmfree((char *)pidsOfSub);
10✔
399
    tmfree((char *)infosOfSub);
10✔
400

401
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
80✔
402
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
70✔
403
    }
404
    tmfree(g_queryInfo.superQueryInfo.childTblName);
10✔
405
    return ret;
10✔
406
}
407

408
// free g_queryInfo.specailQueryInfo memory , can re-call
409
void freeSpecialQueryInfo() {
11✔
410
    // can re-call
411
    if (g_queryInfo.specifiedQueryInfo.sqls == NULL) {
11✔
412
        return;
×
413
    }
414

415
    // loop free each item memory
416
    for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
228✔
417
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
217✔
418
        tmfree(sql->command);
217✔
419
        tmfree(sql->delay_list);
217✔
420
    }
421

422
    // free Array
423
    benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);
11✔
424
    g_queryInfo.specifiedQueryInfo.sqls = NULL;
11✔
425
}
426

427

428
static int multi_thread_specified_table_query(uint16_t iface, char* dbName) {
13✔
429
    pthread_t * pids = NULL;
13✔
430
    threadInfo *infos = NULL;
13✔
431
    //==== create sub threads for query from specify table
432
    int      nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
13✔
433
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
13✔
434

435
    // check invaid
436
    if(nSqlCount == 0 || nConcurrent == 0 ) {
13✔
437
        if(nSqlCount == 0)
2✔
438
           warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount);
2✔
439
        if(nConcurrent == 0)
2✔
440
           warnPrint("concurrent is %d , specified_table_query->concurrent is zero. \n", nConcurrent);
×
441
        return 0;
2✔
442
    }
443

444
    // malloc funciton global memory
445
    pids  = benchCalloc(1, nConcurrent * sizeof(pthread_t),  false);
11✔
446
    infos = benchCalloc(1, nConcurrent * sizeof(threadInfo), false);
11✔
447

448
    bool exeError = false;
11✔
449
    for (uint64_t i = 0; i < nSqlCount; i++) {
48✔
450
        // reset
451
        memset(pids,  0, nConcurrent * sizeof(pthread_t));
39✔
452
        memset(infos, 0, nConcurrent * sizeof(threadInfo));
39✔
453

454
        // get execute sql
455
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
39✔
456

457
        // create threads
458
        int threadCnt = 0;
39✔
459
        for (int j = 0; j < nConcurrent; j++) {
146✔
460
           threadInfo *pThreadInfo = infos + j;
107✔
461
           pThreadInfo->threadID = i * nConcurrent + j;
107✔
462
           pThreadInfo->querySeq = i;
107✔
463
           if (iface == REST_IFACE) {
107✔
464
                int sockfd = createSockFd();
13✔
465
                // int iMode = 1;
466
                // ioctl(sockfd, FIONBIO, &iMode);
467
                if (sockfd < 0) {
13✔
468
                    exeError = true;
×
469

470
                    break;
×
471
                }
472
                pThreadInfo->sockfd = sockfd;
13✔
473
           } else {
474
                pThreadInfo->conn = initBenchConn();
94✔
475
                if (pThreadInfo->conn == NULL) {
94✔
476
                    destroySockFd(pThreadInfo->sockfd);
×
477
                    exeError = true;
×
478
                    break;
×
479
                }
480
           }
481

482
           pthread_create(pids + j, NULL, specifiedTableQuery, pThreadInfo);
107✔
483
           threadCnt++;
107✔
484
        }
485

486
        // if failed, set termainte flag true like ctrl+c exit
487
        if (exeError) {
39✔
488
            errorPrint(" i=%" PRIu64 " create thread occur error, so wait exit ...\n", i);
×
489
            g_arguments->terminate = true;
×
490
        }
491

492
        // wait threads execute finished one by one
493
        for (int j = 0; j < threadCnt ; j++) {
146✔
494
           pthread_join(pids[j], NULL);
107✔
495
           threadInfo *pThreadInfo = infos + j;
107✔
496
           if (iface == REST_IFACE) {
107✔
497
#ifdef WINDOWS
498
                closesocket(pThreadInfo->sockfd);
499
                WSACleanup();
500
#else
501
                close(pThreadInfo->sockfd);
13✔
502
#endif
503
           } else {
504
                closeBenchConn(pThreadInfo->conn);
94✔
505
                pThreadInfo->conn = NULL;
94✔
506
           }
507

508
           // need exit in loop
509
           if (g_fail || g_arguments->terminate) {
107✔
510
                // free BArray
511
                tmfree(pThreadInfo->query_delay_list);
6✔
512
                pThreadInfo->query_delay_list = NULL;
6✔
513
           }
514
        }
515

516
        // cancel or need exit check
517
        if (g_fail || g_arguments->terminate) {
39✔
518
            // free current funciton malloc memory
519
            tmfree((char *)pids);
2✔
520
            tmfree((char *)infos);
2✔
521
            // free global
522
            freeSpecialQueryInfo();
2✔
523
            return -1;
2✔
524
        }
525

526
        // execute successfully
527
        uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes;
37✔
528
        uint64_t totalQueryTimes = query_times * nConcurrent;
37✔
529
        double   avg_delay = 0.0;
37✔
530
        for (int j = 0; j < nConcurrent; j++) {
138✔
531
           threadInfo *pThreadInfo = infos + j;
101✔
532
           avg_delay += pThreadInfo->avg_delay;
101✔
533
           for (uint64_t k = 0; k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) {
280✔
534
                sql->delay_list[j * query_times + k] = pThreadInfo->query_delay_list[k];
179✔
535
           }
536

537
           // free BArray
538
           tmfree(pThreadInfo->query_delay_list);
101✔
539
           pThreadInfo->query_delay_list = NULL;
101✔
540
        }
541
        avg_delay /= nConcurrent;
37✔
542
        qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes, sizeof(uint64_t), compare);
37✔
543
        infoPrintNoTimestamp("complete query with %d threads and %" PRIu64
37✔
544
                             " query delay "
545
                             "avg: \t%.6fs "
546
                             "min: \t%.6fs "
547
                             "max: \t%.6fs "
548
                             "p90: \t%.6fs "
549
                             "p95: \t%.6fs "
550
                             "p99: \t%.6fs "
551
                             "SQL command: %s"
552
                             "\n",
553
                             nConcurrent, query_times, avg_delay / 1E6,  /* avg */
554
                             sql->delay_list[0] / 1E6,                   /* min */
555
                             sql->delay_list[totalQueryTimes - 1] / 1E6, /*  max */
556
                             /*  p90 */
557
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
558
                             /*  p95 */
559
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
560
                             /*  p99 */
561
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
562
        infoPrintNoTimestampToFile("complete query with %d threads and %" PRIu64
37✔
563
                                   " query delay "
564
                                   "avg: \t%.6fs "
565
                                   "min: \t%.6fs "
566
                                   "max: \t%.6fs "
567
                                   "p90: \t%.6fs "
568
                                   "p95: \t%.6fs "
569
                                   "p99: \t%.6fs "
570
                                   "SQL command: %s"
571
                                   "\n",
572
                                   nConcurrent, query_times, avg_delay / 1E6,  /* avg */
573
                                   sql->delay_list[0] / 1E6,                   /* min */
574
                                   sql->delay_list[totalQueryTimes - 1] / 1E6, /*  max */
575
                                   /*  p90 */
576
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
577
                                   /*  p95 */
578
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
579
                                   /*  p99 */
580
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
581
    }
582

583
    g_queryInfo.specifiedQueryInfo.totalQueried =
9✔
584
        g_queryInfo.specifiedQueryInfo.queryTimes * nConcurrent;
9✔
585
    tmfree((char *)pids);
9✔
586
    tmfree((char *)infos);
9✔
587

588
    // free specialQueryInfo
589
    freeSpecialQueryInfo();
9✔
590
    return 0;
9✔
591
}
592

593
static int multi_thread_specified_mixed_query(uint16_t iface, char* dbName) {
1✔
594
    int code = -1;
1✔
595
    int thread = g_queryInfo.specifiedQueryInfo.concurrent;
1✔
596
    pthread_t * pids = benchCalloc(thread, sizeof(pthread_t), true);
1✔
597
    queryThreadInfo *infos = benchCalloc(thread, sizeof(queryThreadInfo), true);
1✔
598
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
1✔
599
    int start_sql = 0;
1✔
600
    int a = total_sql_num / thread;
1✔
601
    if (a < 1) {
1✔
602
        thread = total_sql_num;
1✔
603
        a = 1;
1✔
604
    }
605
    int b = 0;
1✔
606
    if (thread != 0) {
1✔
607
        b = total_sql_num % thread;
1✔
608
    }
609
    for (int i = 0; i < thread; ++i) {
3✔
610
        queryThreadInfo *pQueryThreadInfo = infos + i;
2✔
611
        pQueryThreadInfo->threadId = i;
2✔
612
        pQueryThreadInfo->start_sql = start_sql;
2✔
613
        pQueryThreadInfo->end_sql = i < b ? start_sql + a : start_sql + a - 1;
2✔
614
        start_sql = pQueryThreadInfo->end_sql + 1;
2✔
615
        pQueryThreadInfo->total_delay = 0;
2✔
616
        pQueryThreadInfo->query_delay_list = benchArrayInit(1, sizeof(int64_t));
2✔
617
        if (iface == REST_IFACE) {
2✔
618
            int sockfd = createSockFd();
×
619
            if (sockfd < 0) {
×
620
                goto OVER;
×
621
            }
622
            pQueryThreadInfo->sockfd = sockfd;
×
623
        } else {
624
            pQueryThreadInfo->conn = initBenchConn();
2✔
625
            if (pQueryThreadInfo->conn == NULL) {
2✔
626
                goto OVER;
×
627
            }
628
        }
629
        pthread_create(pids + i, NULL, mixedQuery, pQueryThreadInfo);
2✔
630
    }
631

632
    int64_t start = toolsGetTimestampUs();
1✔
633
    for (int i = 0; i < thread; ++i) {
3✔
634
        pthread_join(pids[i], NULL);
2✔
635
    }
636
    int64_t end = toolsGetTimestampUs();
1✔
637

638
    // statistic
639
    BArray * delay_list = benchArrayInit(1, sizeof(int64_t));
1✔
640
    int64_t total_delay = 0;
1✔
641
    for (int i = 0; i < thread; ++i) {
3✔
642
        queryThreadInfo * pThreadInfo = infos + i;
2✔
643
        benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData,
2✔
644
                pThreadInfo->query_delay_list->size, true);
2✔
645
        total_delay += pThreadInfo->total_delay;
2✔
646
        tmfree(pThreadInfo->query_delay_list);
2✔
647
        pThreadInfo->query_delay_list = NULL;
2✔
648

649
        if (iface == REST_IFACE) {
2✔
650
#ifdef  WINDOWS
651
            closesocket(pThreadInfo->sockfd);
652
            WSACleanup();
653
#else
654
            close(pThreadInfo->sockfd);
×
655
#endif
656
        } else {
657
            closeBenchConn(pThreadInfo->conn);
2✔
658
        }
659
    }
660
    qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare);
1✔
661
    if (delay_list->size) {
1✔
662
        infoPrint(
1✔
663
                "spend %.6fs using "
664
                "%d threads complete query %d times,cd  "
665
                "min delay: %.6fs, "
666
                "avg delay: %.6fs, "
667
                "p90: %.6fs, "
668
                "p95: %.6fs, "
669
                "p99: %.6fs, "
670
                "max: %.6fs\n",
671
                (end - start)/1E6,
672
                thread, (int)delay_list->size,
673
                *(int64_t *)(benchArrayGet(delay_list, 0))/1E6,
674
                (double)total_delay/delay_list->size/1E6,
675
                *(int64_t *)(benchArrayGet(delay_list,
676
                                    (int32_t)(delay_list->size * 0.9)))/1E6,
677
                *(int64_t *)(benchArrayGet(delay_list,
678
                                    (int32_t)(delay_list->size * 0.95)))/1E6,
679
                *(int64_t *)(benchArrayGet(delay_list,
680
                                    (int32_t)(delay_list->size * 0.99)))/1E6,
681
                *(int64_t *)(benchArrayGet(delay_list,
682
                                    (int32_t)(delay_list->size - 1)))/1E6);
683
    } else {
684
        errorPrint("%s() LN%d, delay_list size: %"PRId64"\n",
×
685
                   __func__, __LINE__, (int64_t)delay_list->size);
686
    }
687
    benchArrayDestroy(delay_list);
1✔
688
    code = 0;
1✔
689
OVER:
1✔
690
    tmfree(pids);
1✔
691
    tmfree(infos);
1✔
692
    return code;
1✔
693
}
694

695
#define KILLID_LEN  20
696

697
void *queryKiller(void *arg) {
×
698
    char host[MAX_HOSTNAME_LEN] = {0};
×
699
    tstrncpy(host, g_arguments->host, MAX_HOSTNAME_LEN);
×
700

701
    while (true) {
×
702
        TAOS *taos = taos_connect(g_arguments->host, g_arguments->user,
×
703
                g_arguments->password, NULL, g_arguments->port);
×
704
        if (NULL == taos) {
×
705
            errorPrint("Slow query killer thread "
×
706
                    "failed to connect to the server %s\n",
707
                    g_arguments->host);
708
            return NULL;
×
709
        }
710

711
        char command[TSDB_MAX_ALLOWED_SQL_LEN] =
×
712
            "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries";
713
        TAOS_RES *res = taos_query(taos, command);
×
714
        int32_t code = taos_errno(res);
×
715
        if (code) {
×
716
            printErrCmdCodeStr(command, code, res);
×
717
        }
718

719
        TAOS_ROW row = NULL;
×
720
        while ((row = taos_fetch_row(res)) != NULL) {
×
721
            int32_t *lengths = taos_fetch_lengths(res);
×
722
            if (lengths[0] <= 0) {
×
723
                infoPrint("No valid query found by %s\n", command);
×
724
            } else {
725
                int64_t execUSec = *(int64_t*)row[1];
×
726

727
                if (execUSec > g_queryInfo.killQueryThreshold * 1000000) {
×
728
                    char sql[SHORT_1K_SQL_BUFF_LEN] = {0};
×
729
                    tstrncpy(sql, (char*)row[2],
×
730
                             min(strlen((char*)row[2])+1,
731
                                 SHORT_1K_SQL_BUFF_LEN));
732

733
                    char killId[KILLID_LEN] = {0};
×
734
                    tstrncpy(killId, (char*)row[0],
×
735
                            min(strlen((char*)row[0])+1, KILLID_LEN));
736
                    char killCommand[KILLID_LEN + 15] = {0};
×
737
                    snprintf(killCommand, KILLID_LEN + 15,
×
738
                             "KILL QUERY '%s'", killId);
739
                    TAOS_RES *resKill = taos_query(taos, killCommand);
×
740
                    int32_t codeKill = taos_errno(resKill);
×
741
                    if (codeKill) {
×
742
                        printErrCmdCodeStr(killCommand, codeKill, resKill);
×
743
                    } else {
744
                        infoPrint("%s succeed, sql: %s killed!\n",
×
745
                                  killCommand, sql);
746
                        taos_free_result(resKill);
×
747
                    }
748
                }
749
            }
750
        }
751

752
        taos_free_result(res);
×
753
        taos_close(taos);
×
754
        toolsMsleep(g_queryInfo.killQueryInterval*1000);
×
755
    }
756

757
    return NULL;
758
}
759

760
int queryTestProcess() {
14✔
761
    prompt(0);
14✔
762

763
    if (REST_IFACE == g_queryInfo.iface) {
14✔
764
        encodeAuthBase64();
3✔
765
    }
766

767
    pthread_t pidKiller = {0};
14✔
768
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
14✔
769
        pthread_create(&pidKiller, NULL, queryKiller, NULL);
×
770
        pthread_join(pidKiller, NULL);
×
771
        toolsMsleep(1000);
×
772
    }
773

774
    if (g_queryInfo.iface == REST_IFACE) {
14✔
775
        if (convertHostToServAddr(g_arguments->host,
3✔
776
                    g_arguments->port + TSDB_PORT_HTTP,
3✔
777
                    &(g_arguments->serv_addr)) != 0) {
3✔
778
            errorPrint("%s", "convert host to server address\n");
×
779
            return -1;
×
780
        }
781
    }
782

783
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
14✔
784
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
12✔
785
        SBenchConn* conn = initBenchConn();
12✔
786
        if (conn == NULL) {
12✔
787
            return -1;
×
788
        }
789
        char  cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
12✔
790
        if (3 == g_majorVersionOfClient) {
12✔
791
            snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
12✔
792
                    "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM %s.%s)",
793
                    g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
794
        } else {
795
            snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
796
                     "SELECT COUNT(TBNAME) FROM %s.%s",
797
                    g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
798
        }
799
        TAOS_RES *res = taos_query(conn->taos, cmd);
12✔
800
        int32_t   code = taos_errno(res);
12✔
801
        if (code) {
12✔
802
            printErrCmdCodeStr(cmd, code, res);
×
803
            closeBenchConn(conn);
×
804
            return -1;
×
805
        }
806
        TAOS_ROW    row = NULL;
12✔
807
        int         num_fields = taos_num_fields(res);
12✔
808
        TAOS_FIELD *fields = taos_fetch_fields(res);
12✔
809
        while ((row = taos_fetch_row(res)) != NULL) {
24✔
810
            if (0 == strlen((char *)(row[0]))) {
12✔
811
                errorPrint("stable %s have no child table\n",
×
812
                        g_queryInfo.superQueryInfo.stbName);
813
                taos_free_result(res);
×
814
                closeBenchConn(conn);
×
815
                return -1;
×
816
            }
817
            char temp[256] = {0};
12✔
818
            taos_print_row(temp, row, fields, num_fields);
12✔
819
            g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(temp);
12✔
820
        }
821
        infoPrint("%s's childTblCount: %" PRId64 "\n",
12✔
822
                g_queryInfo.superQueryInfo.stbName,
823
                g_queryInfo.superQueryInfo.childTblCount);
824
        taos_free_result(res);
12✔
825
        g_queryInfo.superQueryInfo.childTblName =
12✔
826
            benchCalloc(g_queryInfo.superQueryInfo.childTblCount,
12✔
827
                    sizeof(char *), false);
828
        if (getAllChildNameOfSuperTable(
12✔
829
                    conn->taos, g_queryInfo.dbName,
830
                    g_queryInfo.superQueryInfo.stbName,
831
                    g_queryInfo.superQueryInfo.childTblName,
832
                    g_queryInfo.superQueryInfo.childTblCount)) {
833
            tmfree(g_queryInfo.superQueryInfo.childTblName);
×
834
            closeBenchConn(conn);
×
835
            return -1;
×
836
        }
837
        closeBenchConn(conn);
12✔
838
    }
839
    uint64_t startTs = toolsGetTimestampMs();
14✔
840
    if (g_queryInfo.specifiedQueryInfo.mixed_query) {
14✔
841
        if (multi_thread_specified_mixed_query(g_queryInfo.iface,
1✔
842
                    g_queryInfo.dbName)) {
843
            return -1;
×
844
        }
845
    } else {
846
        if (multi_thread_specified_table_query(g_queryInfo.iface,
13✔
847
                    g_queryInfo.dbName)) {
848
            return -1;
2✔
849
        }
850
    }
851
    if (multi_thread_super_table_query(g_queryInfo.iface,
12✔
852
                g_queryInfo.dbName)) {
853
        return -1;
×
854
    }
855
    // workaround to use separate taos connection;
856
    uint64_t endTs = toolsGetTimestampMs();
12✔
857
    int64_t t = endTs - startTs;
12✔
858
    double  tInS = (double)t / 1000.0;
12✔
859
    if (g_queryInfo.specifiedQueryInfo.totalQueried)
12✔
860
        infoPrint("Total specified queries: %" PRIu64 "\n",
9✔
861
              g_queryInfo.specifiedQueryInfo.totalQueried);
862
    if (g_queryInfo.superQueryInfo.totalQueried)
12✔
863
    infoPrint("Total super queries: %" PRIu64 "\n",
10✔
864
              g_queryInfo.superQueryInfo.totalQueried);
865
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
12✔
866
        + g_queryInfo.superQueryInfo.totalQueried;
12✔
867
    infoPrint(
12✔
868
            "Spend %.4f second completed total queries: %" PRIu64
869
            ", the QPS of all threads: %10.3f\n\n",
870
            tInS, totalQueried, (double)totalQueried / tInS);
871
    infoPrintToFile(
12✔
872
            "Spend %.4f second completed total queries: %" PRIu64
873
            ", the QPS of all threads: %10.3f\n\n",
874
            tInS, totalQueried, (double)totalQueried / tInS);
875
    return 0;
12✔
876
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc