• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 11379713827

17 Oct 2024 06:40AM UTC coverage: 74.936% (+2.6%) from 72.385%
11379713827

push

github

web-flow
Merge pull request #808 from taosdata/3.0

taos-tools 3.0 branch to main

448 of 573 new or added lines in 9 files covered. (78.18%)

2254 existing lines in 9 files now uncovered.

11753 of 15684 relevant lines covered (74.94%)

232876.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.31
/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14

15
extern int g_majorVersionOfClient;
16

17
int selectAndGetResult(threadInfo *pThreadInfo, char *command) {
6,271✔
18
    int ret = 0;
6,271✔
19

20
    if (g_arguments->terminate) {
6,271✔
21
        return -1;
×
22
    }
23
    uint32_t threadID = pThreadInfo->threadID;
6,271✔
24
    char dbName[TSDB_DB_NAME_LEN] = {0};
6,271✔
25
    tstrncpy(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN);
6,271✔
26

27
    if (g_queryInfo.iface == REST_IFACE) {
6,271✔
28
        int retCode = postProceSql(command, g_queryInfo.dbName, 0, REST_IFACE,
36✔
29
                                   0, g_arguments->port, false,
36✔
30
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
36✔
31
        if (0 != retCode) {
36✔
32
            errorPrint("====restful return fail, threadID[%u]\n",
×
33
                       threadID);
34
            ret = -1;
×
35
        }
36
    } else {
37
        TAOS *taos = pThreadInfo->conn->taos;
6,235✔
38
        if (taos_select_db(taos, g_queryInfo.dbName)) {
6,235✔
39
            errorPrint("thread[%u]: failed to select database(%s)\n",
×
40
                threadID, dbName);
41
            ret = -2;
×
42
        } else {
43
            int64_t rows  = 0;
6,235✔
44
            TAOS_RES *res = taos_query(taos, command);
6,235✔
45
            int code = taos_errno(res);
6,235✔
46
            if (res == NULL || code) {
6,234✔
47
                if (YES_IF_FAILED == g_arguments->continueIfFail) {
15✔
48
                    warnPrint("failed to execute sql:%s, "
3✔
49
                              "code: 0x%08x, reason:%s\n",
50
                               command, code, taos_errstr(res));
51
                } else {
52
                    errorPrint("failed to execute sql:%s, "
12✔
53
                               "code: 0x%08x, reason:%s\n",
54
                               command, code, taos_errstr(res));
55
                    ret = -1;
12✔
56
                }
57
            } else {
58
                //if (strlen(pThreadInfo->filePath) > 0) {
59
                    rows = fetchResult(res, pThreadInfo);
6,219✔
60
                //}
61
            }
62
            taos_free_result(res);
6,235✔
63
            debugPrint("query sql:%s rows:%"PRId64"\n", command, rows);
6,235✔
64
        }
65
    }
66
    return ret;
6,271✔
67
}
68

69
static void *mixedQuery(void *sarg) {
2✔
70
    queryThreadInfo *pThreadInfo = (queryThreadInfo*)sarg;
2✔
71
#ifdef LINUX
72
    prctl(PR_SET_NAME, "mixedQuery");
2✔
73
#endif
74
    int64_t lastPrintTs = toolsGetTimestampMs();
2✔
75
    int64_t st;
76
    int64_t et;
77
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
2✔
78
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
4✔
79
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
2✔
80
        for (int j = 0; j < queryTimes; ++j) {
6✔
81
            if (g_arguments->terminate) {
4✔
UNCOV
82
                return NULL;
×
83
            }
84
            if (g_queryInfo.reset_query_cache) {
4✔
85
                if (queryDbExecCall(pThreadInfo->conn,
×
86
                                    "RESET QUERY CACHE")) {
87
                    errorPrint("%s() LN%d, reset query cache failed\n",
×
88
                               __func__, __LINE__);
UNCOV
89
                    return NULL;
×
90
                }
91
            }
92
            st = toolsGetTimestampUs();
4✔
93
            if (g_queryInfo.iface == REST_IFACE) {
4✔
94
                int retCode = postProceSql(sql->command, g_queryInfo.dbName,
×
UNCOV
95
                                           0, g_queryInfo.iface, 0,
×
96
                                           g_arguments->port,
×
97
                                           false, pThreadInfo->sockfd, "");
UNCOV
98
                if (retCode) {
×
99
                    errorPrint("thread[%d]: restful query <%s> failed\n",
×
100
                            pThreadInfo->threadId, sql->command);
UNCOV
101
                    continue;
×
102
                }
103
            } else {
104
                if (g_queryInfo.dbName != NULL) {
4✔
105
                    if (taos_select_db(pThreadInfo->conn->taos,
4✔
106
                                       g_queryInfo.dbName)) {
4✔
UNCOV
107
                        errorPrint("thread[%d]: failed to "
×
108
                                   "select database(%s)\n",
109
                                   pThreadInfo->threadId,
110
                                   g_queryInfo.dbName);
UNCOV
111
                        return NULL;
×
112
                    }
113
                }
114
                TAOS_RES *res = taos_query(pThreadInfo->conn->taos,
4✔
115
                                           sql->command);
4✔
116
                if (res == NULL || taos_errno(res) != 0) {
4✔
UNCOV
117
                    if (YES_IF_FAILED == g_arguments->continueIfFail) {
×
UNCOV
118
                        warnPrint(
×
119
                                "thread[%d]: failed to execute sql :%s, "
120
                                "code: 0x%x, reason: %s\n",
121
                                pThreadInfo->threadId,
122
                                sql->command,
123
                                taos_errno(res), taos_errstr(res));
124
                    } else {
UNCOV
125
                        errorPrint(
×
126
                                "thread[%d]: failed to execute sql :%s, "
127
                                "code: 0x%x, reason: %s\n",
128
                                pThreadInfo->threadId,
129
                                sql->command,
130
                                taos_errno(res), taos_errstr(res));
131
                        if (TSDB_CODE_RPC_NETWORK_UNAVAIL ==
×
132
                                taos_errno(res)) {
×
UNCOV
133
                            taos_free_result(res);
×
UNCOV
134
                            return NULL;
×
135
                        }
136
                    }
UNCOV
137
                    taos_free_result(res);
×
UNCOV
138
                    continue;
×
139
                }
140
                taos_free_result(res);
4✔
141
            }
142
            et = toolsGetTimestampUs();
4✔
143
            int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
4✔
144
            *delay = et - st;
4✔
145
            debugPrint("%s() LN%d, delay: %"PRId64"\n",
4✔
146
                       __func__, __LINE__, *delay);
147

148
            pThreadInfo->total_delay += (et - st);
4✔
149
            if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
4✔
UNCOV
150
                tmfree(delay);
×
151
            }
152
            int64_t currentPrintTs = toolsGetTimestampMs();
4✔
153
            if (currentPrintTs - lastPrintTs > 10 * 1000) {
4✔
UNCOV
154
                infoPrint("thread[%d] has currently complete query %d times\n",
×
155
                        pThreadInfo->threadId,
156
                        (int)pThreadInfo->query_delay_list->size);
UNCOV
157
                lastPrintTs = currentPrintTs;
×
158
            }
159
        }
160
    }
161
    return NULL;
2✔
162
}
163

164
static void *specifiedTableQuery(void *sarg) {
107✔
165
    threadInfo *pThreadInfo = (threadInfo *)sarg;
107✔
166
#ifdef LINUX
167
    prctl(PR_SET_NAME, "specTableQuery");
107✔
168
#endif
169
    uint64_t st = 0;
107✔
170
    uint64_t et = 0;
107✔
171
    uint64_t minDelay = UINT64_MAX;
107✔
172
    uint64_t maxDelay = 0;
107✔
173
    uint64_t totalDelay = 0;
107✔
174
    int32_t  index = 0;
107✔
175

176
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
107✔
177
    pThreadInfo->query_delay_list = benchCalloc(queryTimes,
107✔
178
            sizeof(uint64_t), false);
179
    uint64_t  lastPrintTime = toolsGetTimestampMs();
107✔
180
    uint64_t  startTs = toolsGetTimestampMs();
107✔
181

182
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls,
107✔
183
            pThreadInfo->querySeq);
184

185
    if (sql->result[0] != '\0') {
107✔
186
        snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
102✔
187
                sql->result, pThreadInfo->threadID);
102✔
188
    }
189

190
    while (index < queryTimes) {
298✔
191
        // check cancel
192
        if (g_arguments->terminate) {
191✔
UNCOV
193
            return NULL;
×
194
        }
195

196
        if (g_queryInfo.specifiedQueryInfo.queryInterval &&
191✔
197
            (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) {
190✔
198
            toolsMsleep((int32_t)(
106✔
199
                        g_queryInfo.specifiedQueryInfo.queryInterval*1000
106✔
200
                        - (et - st)));  // ms
106✔
201
        }
202
        if (g_queryInfo.reset_query_cache) {
191✔
203
            // execute sql 
204
            if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE")) {
6✔
205
                errorPrint("%s() LN%d, reset query cache failed\n",
×
206
                           __func__, __LINE__);
UNCOV
207
                return NULL;
×
208
            }
209
        }
210

211
        st = toolsGetTimestampUs();
191✔
212
        int ret = selectAndGetResult(pThreadInfo, sql->command);
191✔
213
        if (ret) {
191✔
214
            g_fail = true;
12✔
215
        }
216

217
        et = toolsGetTimestampUs();
191✔
218
        int64_t delay = et - st;
191✔
219
        debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, delay);
191✔
220

221
        if (ret == 0) {
191✔
222
            pThreadInfo->query_delay_list[index] = delay;
179✔
223
            pThreadInfo->totalQueried++;
179✔
224
        }
225
        index++;
191✔
226
        totalDelay += delay;
191✔
227
        if (delay > maxDelay) {
191✔
228
            maxDelay = delay;
131✔
229
        }
230
        if (delay < minDelay) {
191✔
231
            minDelay = delay;
167✔
232
        }
233

234
        uint64_t currentPrintTime = toolsGetTimestampMs();
191✔
235
        uint64_t endTs = toolsGetTimestampMs();
191✔
236

237
        if ((ret == 0) && (currentPrintTime - lastPrintTime > 30 * 1000)) {
191✔
UNCOV
238
            infoPrint(
×
239
                    "thread[%d] has currently completed queries: %" PRIu64
240
                    ", QPS: %10.6f\n",
241
                    pThreadInfo->threadID, pThreadInfo->totalQueried,
242
                    (double)(pThreadInfo->totalQueried /
243
                        ((endTs - startTs) / 1000.0)));
UNCOV
244
            lastPrintTime = currentPrintTime;
×
245
        }
246

247
        if (-2 == ret) {
191✔
UNCOV
248
            toolsMsleep(1000);
×
UNCOV
249
            return NULL;
×
250
        }
251
    }
252
    qsort(pThreadInfo->query_delay_list, queryTimes,
107✔
253
            sizeof(uint64_t), compare);
254
    pThreadInfo->avg_delay = (double)totalDelay / queryTimes;
107✔
255
    return NULL;
107✔
256
}
257

258
static void *superTableQuery(void *sarg) {
38✔
259
    char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
38✔
260
    threadInfo *pThreadInfo = (threadInfo *)sarg;
38✔
261
#ifdef LINUX
262
    prctl(PR_SET_NAME, "superTableQuery");
38✔
263
#endif
264

265
    uint64_t st = 0;
38✔
266
    uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000;
38✔
267

268
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
38✔
269
    uint64_t startTs = toolsGetTimestampMs();
38✔
270

271
    uint64_t lastPrintTime = toolsGetTimestampMs();
38✔
272
    while (queryTimes--) {
122✔
273
        if (g_queryInfo.superQueryInfo.queryInterval
84✔
274
            && ((et - st) <
83✔
275
                (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000)) {
83✔
276
            toolsMsleep((int32_t)
46✔
277
                        (g_queryInfo.superQueryInfo.queryInterval*1000
46✔
278
                        - (et - st)));
46✔
279
        }
280

281
        st = toolsGetTimestampMs();
84✔
282
        for (int i = (int)pThreadInfo->start_table_from;
84✔
283
             i <= pThreadInfo->end_table_to; i++) {
217✔
284
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
6,207✔
285
                memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN);
6,074✔
286
                replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr,
6,074✔
287
                                    i);
288
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
6,073✔
289
                    snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
6,073✔
290
                            g_queryInfo.superQueryInfo.result[j],
6,073✔
291
                            pThreadInfo->threadID);
292
                }
293
                if (selectAndGetResult(pThreadInfo, sqlstr)) {
6,073✔
UNCOV
294
                    g_fail = true;
×
295
                }
296

297
                pThreadInfo->totalQueried++;
6,074✔
298

299
                int64_t currentPrintTime = toolsGetTimestampMs();
6,074✔
300
                int64_t endTs = toolsGetTimestampMs();
6,074✔
301
                if (currentPrintTime - lastPrintTime > 30 * 1000) {
6,073✔
UNCOV
302
                    infoPrint(
×
303
                        "thread[%d] has currently completed queries: %" PRIu64
304
                        ", QPS: %10.3f\n",
305
                        pThreadInfo->threadID, pThreadInfo->totalQueried,
306
                        (double)(pThreadInfo->totalQueried /
307
                                 ((endTs - startTs) / 1000.0)));
UNCOV
308
                    lastPrintTime = currentPrintTime;
×
309
                }
310
            }
311
        }
312
        et = toolsGetTimestampMs();
83✔
313
    }
314
    tmfree(sqlstr);
38✔
315
    return NULL;
38✔
316
}
317

318
static int multi_thread_super_table_query(uint16_t iface, char* dbName) {
12✔
319
    int ret = -1;
12✔
320
    pthread_t * pidsOfSub = NULL;
12✔
321
    threadInfo *infosOfSub = NULL;
12✔
322
    //==== create sub threads for query from all sub table of the super table
323
    if ((g_queryInfo.superQueryInfo.sqlCount > 0)
12✔
324
            && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
20✔
325
        pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
10✔
326
                                *sizeof(pthread_t),
327
                                false);
328
        infosOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
10✔
329
                                 *sizeof(threadInfo), false);
330

331
        int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
10✔
332
        int     threads = g_queryInfo.superQueryInfo.threadCnt;
10✔
333

334
        int64_t a = ntables / threads;
10✔
335
        if (a < 1) {
10✔
336
            threads = (int)ntables;
2✔
337
            a = 1;
2✔
338
        }
339

340
        int64_t b = 0;
10✔
341
        if (threads != 0) {
10✔
342
            b = ntables % threads;
10✔
343
        }
344

345
        uint64_t tableFrom = 0;
10✔
346
        for (int i = 0; i < threads; i++) {
48✔
347
            threadInfo *pThreadInfo = infosOfSub + i;
38✔
348
            pThreadInfo->threadID = i;
38✔
349
            pThreadInfo->start_table_from = tableFrom;
38✔
350
            pThreadInfo->ntables = i < b ? a + 1 : a;
38✔
351
            pThreadInfo->end_table_to =
38✔
352
                    i < b ? tableFrom + a : tableFrom + a - 1;
38✔
353
            tableFrom = pThreadInfo->end_table_to + 1;
38✔
354
            if (iface == REST_IFACE) {
38✔
355
                int sockfd = createSockFd();
5✔
356
                if (sockfd < 0) {
5✔
357
                    goto OVER;
×
358
                }
359
                pThreadInfo->sockfd = sockfd;
5✔
360
            } else {
361
                pThreadInfo->conn = initBenchConn();
33✔
362
                if (pThreadInfo->conn == NULL) {
33✔
UNCOV
363
                    goto OVER;
×
364
                }
365
            }
366
            pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
38✔
367
        }
368
        g_queryInfo.superQueryInfo.threadCnt = threads;
10✔
369

370
        for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
48✔
371
            if (!g_arguments->terminate)
38✔
372
                pthread_join(pidsOfSub[i], NULL);
38✔
373
            threadInfo *pThreadInfo = infosOfSub + i;
38✔
374
            if (iface == REST_IFACE) {
38✔
375
                destroySockFd(pThreadInfo->sockfd);
5✔
376
            } else {
377
                closeBenchConn(pThreadInfo->conn);
33✔
378
            }
379
            if (g_fail) {
38✔
UNCOV
380
                goto OVER;
×
381
            }
382
        }
383
        for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; ++i) {
48✔
384
            g_queryInfo.superQueryInfo.totalQueried
385
                += infosOfSub[i].totalQueried;
38✔
386
        }
387
    } else {
388
        return 0;
2✔
389
    }
390

391
    ret = 0;
10✔
392
OVER:
10✔
393
    tmfree((char *)pidsOfSub);
10✔
394
    tmfree((char *)infosOfSub);
10✔
395

396
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
80✔
397
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
70✔
398
    }
399
    tmfree(g_queryInfo.superQueryInfo.childTblName);
10✔
400
    return ret;
10✔
401
}
402

403
// free g_queryInfo.specailQueryInfo memory , can re-call
404
void freeSpecialQueryInfo() {
11✔
405
    // can re-call
406
    if (g_queryInfo.specifiedQueryInfo.sqls == NULL) {
11✔
UNCOV
407
        return;
×
408
    }
409

410
    // loop free each item memory
411
    for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
228✔
412
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
217✔
413
        tmfree(sql->command);
217✔
414
        tmfree(sql->delay_list);
217✔
415
    }
416

417
    // free Array
418
    benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);
11✔
419
    g_queryInfo.specifiedQueryInfo.sqls = NULL;
11✔
420
}
421

422

423
static int multi_thread_specified_table_query(uint16_t iface, char* dbName) {
13✔
424
    pthread_t * pids = NULL;
13✔
425
    threadInfo *infos = NULL;
13✔
426
    //==== create sub threads for query from specify table
427
    int      nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
13✔
428
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
13✔
429

430
    // check invaid
431
    if(nSqlCount == 0 || nConcurrent == 0 ) {
13✔
432
        if(nSqlCount == 0)
2✔
433
           warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount);
2✔
434
        if(nConcurrent == 0)
2✔
UNCOV
435
           warnPrint("concurrent is %d , specified_table_query->concurrent is zero. \n", nConcurrent);
×
436
        return 0;
2✔
437
    }
438

439
    // malloc funciton global memory
440
    pids  = benchCalloc(1, nConcurrent * sizeof(pthread_t),  false);
11✔
441
    infos = benchCalloc(1, nConcurrent * sizeof(threadInfo), false);
11✔
442

443
    bool exeError = false;
11✔
444
    for (uint64_t i = 0; i < nSqlCount; i++) {
48✔
445
        // reset
446
        memset(pids,  0, nConcurrent * sizeof(pthread_t));
39✔
447
        memset(infos, 0, nConcurrent * sizeof(threadInfo));
39✔
448

449
        // get execute sql
450
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
39✔
451

452
        // create threads
453
        int threadCnt = 0;
39✔
454
        for (int j = 0; j < nConcurrent; j++) {
146✔
455
           threadInfo *pThreadInfo = infos + j;
107✔
456
           pThreadInfo->threadID = i * nConcurrent + j;
107✔
457
           pThreadInfo->querySeq = i;
107✔
458
           if (iface == REST_IFACE) {
107✔
459
                int sockfd = createSockFd();
13✔
460
                // int iMode = 1;
461
                // ioctl(sockfd, FIONBIO, &iMode);
462
                if (sockfd < 0) {
13✔
UNCOV
463
                    exeError = true;
×
464

465
                    break;
×
466
                }
467
                pThreadInfo->sockfd = sockfd;
13✔
468
           } else {
469
                pThreadInfo->conn = initBenchConn();
94✔
470
                if (pThreadInfo->conn == NULL) {
94✔
471
                    destroySockFd(pThreadInfo->sockfd);
×
UNCOV
472
                    exeError = true;
×
UNCOV
473
                    break;
×
474
                }
475
           }
476

477
           pthread_create(pids + j, NULL, specifiedTableQuery, pThreadInfo);
107✔
478
           threadCnt++;
107✔
479
        }
480

481
        // if failed, set termainte flag true like ctrl+c exit
482
        if (exeError) {
39✔
UNCOV
483
            errorPrint(" i=%" PRIu64 " create thread occur error, so wait exit ...\n", i);
×
UNCOV
484
            g_arguments->terminate = true;
×
485
        }
486

487
        // wait threads execute finished one by one
488
        for (int j = 0; j < threadCnt ; j++) {
146✔
489
           pthread_join(pids[j], NULL);
107✔
490
           threadInfo *pThreadInfo = infos + j;
107✔
491
           if (iface == REST_IFACE) {
107✔
492
#ifdef WINDOWS
493
                closesocket(pThreadInfo->sockfd);
494
                WSACleanup();
495
#else
496
                close(pThreadInfo->sockfd);
13✔
497
#endif
498
           } else {
499
                closeBenchConn(pThreadInfo->conn);
94✔
500
                pThreadInfo->conn = NULL;
94✔
501
           }
502

503
           // need exit in loop
504
           if (g_fail || g_arguments->terminate) {
107✔
505
                // free BArray
506
                tmfree(pThreadInfo->query_delay_list);
6✔
507
                pThreadInfo->query_delay_list = NULL;
6✔
508
           }
509
        }
510

511
        // cancel or need exit check
512
        if (g_fail || g_arguments->terminate) {
39✔
513
            // free current funciton malloc memory
514
            tmfree((char *)pids);
2✔
515
            tmfree((char *)infos);
2✔
516
            // free global
517
            freeSpecialQueryInfo();
2✔
518
            return -1;
2✔
519
        }
520

521
        // execute successfully
522
        uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes;
37✔
523
        uint64_t totalQueryTimes = query_times * nConcurrent;
37✔
524
        double   avg_delay = 0.0;
37✔
525
        for (int j = 0; j < nConcurrent; j++) {
138✔
526
           threadInfo *pThreadInfo = infos + j;
101✔
527
           avg_delay += pThreadInfo->avg_delay;
101✔
528
           for (uint64_t k = 0; k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) {
280✔
529
                sql->delay_list[j * query_times + k] = pThreadInfo->query_delay_list[k];
179✔
530
           }
531

532
           // free BArray
533
           tmfree(pThreadInfo->query_delay_list);
101✔
534
           pThreadInfo->query_delay_list = NULL;
101✔
535
        }
536
        avg_delay /= nConcurrent;
37✔
537
        qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes, sizeof(uint64_t), compare);
37✔
538
        infoPrintNoTimestamp("complete query with %d threads and %" PRIu64
37✔
539
                             " query delay "
540
                             "avg: \t%.6fs "
541
                             "min: \t%.6fs "
542
                             "max: \t%.6fs "
543
                             "p90: \t%.6fs "
544
                             "p95: \t%.6fs "
545
                             "p99: \t%.6fs "
546
                             "SQL command: %s"
547
                             "\n",
548
                             nConcurrent, query_times, avg_delay / 1E6,  /* avg */
549
                             sql->delay_list[0] / 1E6,                   /* min */
550
                             sql->delay_list[totalQueryTimes - 1] / 1E6, /*  max */
551
                             /*  p90 */
552
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
553
                             /*  p95 */
554
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
555
                             /*  p99 */
556
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
557
        infoPrintNoTimestampToFile(g_arguments->fpOfInsertResult,
37✔
558
                                   "complete query with %d threads and %" PRIu64
559
                                   " query delay "
560
                                   "avg: \t%.6fs "
561
                                   "min: \t%.6fs "
562
                                   "max: \t%.6fs "
563
                                   "p90: \t%.6fs "
564
                                   "p95: \t%.6fs "
565
                                   "p99: \t%.6fs "
566
                                   "SQL command: %s"
567
                                   "\n",
568
                                   nConcurrent, query_times, avg_delay / 1E6,  /* avg */
569
                                   sql->delay_list[0] / 1E6,                   /* min */
570
                                   sql->delay_list[totalQueryTimes - 1] / 1E6, /*  max */
571
                                   /*  p90 */
572
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
573
                                   /*  p95 */
574
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
575
                                   /*  p99 */
576
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
577
    }
578

579
    g_queryInfo.specifiedQueryInfo.totalQueried =
9✔
580
        g_queryInfo.specifiedQueryInfo.queryTimes * nConcurrent;
9✔
581
    tmfree((char *)pids);
9✔
582
    tmfree((char *)infos);
9✔
583

584
    // free specialQueryInfo
585
    freeSpecialQueryInfo();
9✔
586
    return 0;
9✔
587
}
588

589
static int multi_thread_specified_mixed_query(uint16_t iface, char* dbName) {
1✔
590
    int code = -1;
1✔
591
    int thread = g_queryInfo.specifiedQueryInfo.concurrent;
1✔
592
    pthread_t * pids = benchCalloc(thread, sizeof(pthread_t), true);
1✔
593
    queryThreadInfo *infos = benchCalloc(thread, sizeof(queryThreadInfo), true);
1✔
594
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
1✔
595
    int start_sql = 0;
1✔
596
    int a = total_sql_num / thread;
1✔
597
    if (a < 1) {
1✔
598
        thread = total_sql_num;
1✔
599
        a = 1;
1✔
600
    }
601
    int b = 0;
1✔
602
    if (thread != 0) {
1✔
603
        b = total_sql_num % thread;
1✔
604
    }
605
    for (int i = 0; i < thread; ++i) {
3✔
606
        queryThreadInfo *pQueryThreadInfo = infos + i;
2✔
607
        pQueryThreadInfo->threadId = i;
2✔
608
        pQueryThreadInfo->start_sql = start_sql;
2✔
609
        pQueryThreadInfo->end_sql = i < b ? start_sql + a : start_sql + a - 1;
2✔
610
        start_sql = pQueryThreadInfo->end_sql + 1;
2✔
611
        pQueryThreadInfo->total_delay = 0;
2✔
612
        pQueryThreadInfo->query_delay_list = benchArrayInit(1, sizeof(int64_t));
2✔
613
        if (iface == REST_IFACE) {
2✔
614
            int sockfd = createSockFd();
×
UNCOV
615
            if (sockfd < 0) {
×
616
                goto OVER;
×
617
            }
UNCOV
618
            pQueryThreadInfo->sockfd = sockfd;
×
619
        } else {
620
            pQueryThreadInfo->conn = initBenchConn();
2✔
621
            if (pQueryThreadInfo->conn == NULL) {
2✔
UNCOV
622
                goto OVER;
×
623
            }
624
        }
625
        pthread_create(pids + i, NULL, mixedQuery, pQueryThreadInfo);
2✔
626
    }
627

628
    int64_t start = toolsGetTimestampUs();
1✔
629
    for (int i = 0; i < thread; ++i) {
3✔
630
        pthread_join(pids[i], NULL);
2✔
631
    }
632
    int64_t end = toolsGetTimestampUs();
1✔
633

634
    // statistic
635
    BArray * delay_list = benchArrayInit(1, sizeof(int64_t));
1✔
636
    int64_t total_delay = 0;
1✔
637
    for (int i = 0; i < thread; ++i) {
3✔
638
        queryThreadInfo * pThreadInfo = infos + i;
2✔
639
        benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData,
2✔
640
                pThreadInfo->query_delay_list->size, true);
2✔
641
        total_delay += pThreadInfo->total_delay;
2✔
642
        tmfree(pThreadInfo->query_delay_list);
2✔
643
        pThreadInfo->query_delay_list = NULL;
2✔
644

645
        if (iface == REST_IFACE) {
2✔
646
#ifdef  WINDOWS
647
            closesocket(pThreadInfo->sockfd);
648
            WSACleanup();
649
#else
UNCOV
650
            close(pThreadInfo->sockfd);
×
651
#endif
652
        } else {
653
            closeBenchConn(pThreadInfo->conn);
2✔
654
        }
655
    }
656
    qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare);
1✔
657
    if (delay_list->size) {
1✔
658
        infoPrint(
1✔
659
                "spend %.6fs using "
660
                "%d threads complete query %d times,cd  "
661
                "min delay: %.6fs, "
662
                "avg delay: %.6fs, "
663
                "p90: %.6fs, "
664
                "p95: %.6fs, "
665
                "p99: %.6fs, "
666
                "max: %.6fs\n",
667
                (end - start)/1E6,
668
                thread, (int)delay_list->size,
669
                *(int64_t *)(benchArrayGet(delay_list, 0))/1E6,
670
                (double)total_delay/delay_list->size/1E6,
671
                *(int64_t *)(benchArrayGet(delay_list,
672
                                    (int32_t)(delay_list->size * 0.9)))/1E6,
673
                *(int64_t *)(benchArrayGet(delay_list,
674
                                    (int32_t)(delay_list->size * 0.95)))/1E6,
675
                *(int64_t *)(benchArrayGet(delay_list,
676
                                    (int32_t)(delay_list->size * 0.99)))/1E6,
677
                *(int64_t *)(benchArrayGet(delay_list,
678
                                    (int32_t)(delay_list->size - 1)))/1E6);
679
    } else {
UNCOV
680
        errorPrint("%s() LN%d, delay_list size: %"PRId64"\n",
×
681
                   __func__, __LINE__, (int64_t)delay_list->size);
682
    }
683
    benchArrayDestroy(delay_list);
1✔
684
    code = 0;
1✔
685
OVER:
1✔
686
    tmfree(pids);
1✔
687
    tmfree(infos);
1✔
688
    return code;
1✔
689
}
690

691
#define KILLID_LEN  20
692

693
void *queryKiller(void *arg) {
×
UNCOV
694
    char host[MAX_HOSTNAME_LEN] = {0};
×
695
    tstrncpy(host, g_arguments->host, MAX_HOSTNAME_LEN);
×
696

697
    while (true) {
×
698
        TAOS *taos = taos_connect(g_arguments->host, g_arguments->user,
×
699
                g_arguments->password, NULL, g_arguments->port);
×
UNCOV
700
        if (NULL == taos) {
×
UNCOV
701
            errorPrint("Slow query killer thread "
×
702
                    "failed to connect to the server %s\n",
703
                    g_arguments->host);
UNCOV
704
            return NULL;
×
705
        }
706

707
        char command[TSDB_MAX_ALLOWED_SQL_LEN] =
×
708
            "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries";
709
        TAOS_RES *res = taos_query(taos, command);
×
710
        int32_t code = taos_errno(res);
×
UNCOV
711
        if (code) {
×
UNCOV
712
            printErrCmdCodeStr(command, code, res);
×
713
        }
714

715
        TAOS_ROW row = NULL;
×
716
        while ((row = taos_fetch_row(res)) != NULL) {
×
717
            int32_t *lengths = taos_fetch_lengths(res);
×
UNCOV
718
            if (lengths[0] <= 0) {
×
719
                infoPrint("No valid query found by %s\n", command);
×
720
            } else {
721
                int64_t execUSec = *(int64_t*)row[1];
×
722

723
                if (execUSec > g_queryInfo.killQueryThreshold * 1000000) {
×
UNCOV
724
                    char sql[SHORT_1K_SQL_BUFF_LEN] = {0};
×
UNCOV
725
                    tstrncpy(sql, (char*)row[2],
×
726
                             min(strlen((char*)row[2])+1,
727
                                 SHORT_1K_SQL_BUFF_LEN));
728

UNCOV
729
                    char killId[KILLID_LEN] = {0};
×
730
                    tstrncpy(killId, (char*)row[0],
×
731
                            min(strlen((char*)row[0])+1, KILLID_LEN));
UNCOV
732
                    char killCommand[KILLID_LEN + 15] = {0};
×
733
                    snprintf(killCommand, KILLID_LEN + 15,
×
734
                             "KILL QUERY '%s'", killId);
735
                    TAOS_RES *resKill = taos_query(taos, killCommand);
×
736
                    int32_t codeKill = taos_errno(resKill);
×
UNCOV
737
                    if (codeKill) {
×
738
                        printErrCmdCodeStr(killCommand, codeKill, resKill);
×
739
                    } else {
740
                        infoPrint("%s succeed, sql: %s killed!\n",
×
741
                                  killCommand, sql);
UNCOV
742
                        taos_free_result(resKill);
×
743
                    }
744
                }
745
            }
746
        }
747

748
        taos_free_result(res);
×
UNCOV
749
        taos_close(taos);
×
UNCOV
750
        toolsMsleep(g_queryInfo.killQueryInterval*1000);
×
751
    }
752

753
    return NULL;
754
}
755

756
int queryTestProcess() {
14✔
757
    prompt(0);
14✔
758

759
    if (REST_IFACE == g_queryInfo.iface) {
14✔
760
        encodeAuthBase64();
3✔
761
    }
762

763
    pthread_t pidKiller = {0};
14✔
764
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
14✔
765
        pthread_create(&pidKiller, NULL, queryKiller, NULL);
×
UNCOV
766
        pthread_join(pidKiller, NULL);
×
UNCOV
767
        toolsMsleep(1000);
×
768
    }
769

770
    if (g_queryInfo.iface == REST_IFACE) {
14✔
771
        if (convertHostToServAddr(g_arguments->host,
3✔
772
                    g_arguments->port + TSDB_PORT_HTTP,
3✔
773
                    &(g_arguments->serv_addr)) != 0) {
3✔
UNCOV
774
            errorPrint("%s", "convert host to server address\n");
×
UNCOV
775
            return -1;
×
776
        }
777
    }
778

779
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
14✔
780
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
12✔
781
        SBenchConn* conn = initBenchConn();
12✔
782
        if (conn == NULL) {
12✔
UNCOV
783
            return -1;
×
784
        }
785
        char  cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
12✔
786
        if (3 == g_majorVersionOfClient) {
12✔
787
            snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
12✔
788
                    "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM %s.%s)",
789
                    g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
790
        } else {
UNCOV
791
            snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
792
                     "SELECT COUNT(TBNAME) FROM %s.%s",
793
                    g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
794
        }
795
        TAOS_RES *res = taos_query(conn->taos, cmd);
12✔
796
        int32_t   code = taos_errno(res);
12✔
797
        if (code) {
12✔
798
            printErrCmdCodeStr(cmd, code, res);
×
UNCOV
799
            closeBenchConn(conn);
×
UNCOV
800
            return -1;
×
801
        }
802
        TAOS_ROW    row = NULL;
12✔
803
        int         num_fields = taos_num_fields(res);
12✔
804
        TAOS_FIELD *fields = taos_fetch_fields(res);
12✔
805
        while ((row = taos_fetch_row(res)) != NULL) {
24✔
806
            if (0 == strlen((char *)(row[0]))) {
12✔
807
                errorPrint("stable %s have no child table\n",
×
808
                        g_queryInfo.superQueryInfo.stbName);
809
                taos_free_result(res);
×
UNCOV
810
                closeBenchConn(conn);
×
UNCOV
811
                return -1;
×
812
            }
813
            char temp[256] = {0};
12✔
814
            taos_print_row(temp, row, fields, num_fields);
12✔
815
            g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(temp);
12✔
816
        }
817
        infoPrint("%s's childTblCount: %" PRId64 "\n",
12✔
818
                g_queryInfo.superQueryInfo.stbName,
819
                g_queryInfo.superQueryInfo.childTblCount);
820
        taos_free_result(res);
12✔
821
        g_queryInfo.superQueryInfo.childTblName =
12✔
822
            benchCalloc(g_queryInfo.superQueryInfo.childTblCount,
12✔
823
                    sizeof(char *), false);
824
        if (getAllChildNameOfSuperTable(
12✔
825
                    conn->taos, g_queryInfo.dbName,
826
                    g_queryInfo.superQueryInfo.stbName,
827
                    g_queryInfo.superQueryInfo.childTblName,
828
                    g_queryInfo.superQueryInfo.childTblCount)) {
829
            tmfree(g_queryInfo.superQueryInfo.childTblName);
×
UNCOV
830
            closeBenchConn(conn);
×
UNCOV
831
            return -1;
×
832
        }
833
        closeBenchConn(conn);
12✔
834
    }
835
    uint64_t startTs = toolsGetTimestampMs();
14✔
836
    if (g_queryInfo.specifiedQueryInfo.mixed_query) {
14✔
837
        if (multi_thread_specified_mixed_query(g_queryInfo.iface,
1✔
838
                    g_queryInfo.dbName)) {
UNCOV
839
            return -1;
×
840
        }
841
    } else {
842
        if (multi_thread_specified_table_query(g_queryInfo.iface,
13✔
843
                    g_queryInfo.dbName)) {
844
            return -1;
2✔
845
        }
846
    }
847
    if (multi_thread_super_table_query(g_queryInfo.iface,
12✔
848
                g_queryInfo.dbName)) {
UNCOV
849
        return -1;
×
850
    }
851
    // workaround to use separate taos connection;
852
    uint64_t endTs = toolsGetTimestampMs();
12✔
853
    int64_t t = endTs - startTs;
12✔
854
    double  tInS = (double)t / 1000.0;
12✔
855
    if (g_queryInfo.specifiedQueryInfo.totalQueried)
12✔
856
        infoPrint("Total specified queries: %" PRIu64 "\n",
9✔
857
              g_queryInfo.specifiedQueryInfo.totalQueried);
858
    if (g_queryInfo.superQueryInfo.totalQueried)
12✔
859
    infoPrint("Total super queries: %" PRIu64 "\n",
10✔
860
              g_queryInfo.superQueryInfo.totalQueried);
861
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
12✔
862
        + g_queryInfo.superQueryInfo.totalQueried;
12✔
863
    infoPrint(
12✔
864
            "Spend %.4f second completed total queries: %" PRIu64
865
            ", the QPS of all threads: %10.3f\n\n",
866
            tInS, totalQueried, (double)totalQueried / tInS);
867
    infoPrintToFile(g_arguments->fpOfInsertResult,
12✔
868
            "Spend %.4f second completed total queries: %" PRIu64
869
            ", the QPS of all threads: %10.3f\n\n",
870
            tInS, totalQueried, (double)totalQueried / tInS);
871
    return 0;
12✔
872
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc