• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 5087974551

pending completion
5087974551

push

github

GitHub
fix: limit num_of_records_per_req range size (#662)

6 of 6 new or added lines in 1 file covered. (100.0%)

10934 of 13623 relevant lines covered (80.26%)

227182.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.43
/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14

15
extern int g_majorVersionOfClient;
16

17
int selectAndGetResult(threadInfo *pThreadInfo, char *command) {
6,265✔
18
    int ret = 0;
6,265✔
19

20
    if (g_arguments->terminate) {
6,265✔
21
        return -1;
×
22
    }
23
    uint32_t threadID = pThreadInfo->threadID;
6,265✔
24
    char dbName[TSDB_DB_NAME_LEN] = {0};
6,265✔
25
    tstrncpy(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN);
6,265✔
26

27
    if (g_queryInfo.iface == REST_IFACE) {
6,265✔
28
        int retCode = postProceSql(command, g_queryInfo.dbName, 0, REST_IFACE,
36✔
29
                                   0, g_arguments->port, false,
36✔
30
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
36✔
31
        if (0 != retCode) {
36✔
32
            errorPrint("====restful return fail, threadID[%u]\n",
×
33
                       threadID);
34
            ret = -1;
×
35
        }
36
    } else {
37
        TAOS *taos = pThreadInfo->conn->taos;
6,229✔
38
        if (taos_select_db(taos, g_queryInfo.dbName)) {
6,229✔
39
            errorPrint("thread[%u]: failed to select database(%s)\n",
×
40
                threadID, dbName);
41
            ret = -2;
×
42
        } else {
43
            TAOS_RES *res = taos_query(taos, command);
6,229✔
44
            int code = taos_errno(res);
6,228✔
45
            if (res == NULL || code) {
6,228✔
46
                if (YES_IF_FAILED == g_arguments->continueIfFail) {
15✔
47
                    warnPrint("failed to execute sql:%s, "
3✔
48
                              "code: 0x%08x, reason:%s\n",
49
                               command, code, taos_errstr(res));
50
                } else {
51
                    errorPrint("failed to execute sql:%s, "
12✔
52
                               "code: 0x%08x, reason:%s\n",
53
                               command, code, taos_errstr(res));
54
                    ret = -1;
12✔
55
                }
56
            } else {
57
                //if (strlen(pThreadInfo->filePath) > 0) {
58
                    fetchResult(res, pThreadInfo);
6,213✔
59
                //}
60
            }
61
            taos_free_result(res);
6,229✔
62
        }
63
    }
64
    return ret;
6,265✔
65
}
66

67
static void *mixedQuery(void *sarg) {
2✔
68
    queryThreadInfo *pThreadInfo = (queryThreadInfo*)sarg;
2✔
69
#ifdef LINUX
70
    prctl(PR_SET_NAME, "mixedQuery");
2✔
71
#endif
72
    int64_t lastPrintTs = toolsGetTimestampMs();
2✔
73
    int64_t st;
74
    int64_t et;
75
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
2✔
76
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
4✔
77
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
2✔
78
        for (int j = 0; j < queryTimes; ++j) {
6✔
79
            if (g_arguments->terminate) {
4✔
80
                return NULL;
×
81
            }
82
            if (g_queryInfo.reset_query_cache) {
4✔
83
                if (queryDbExecCall(pThreadInfo->conn,
×
84
                                    "RESET QUERY CACHE")) {
85
                    errorPrint("%s() LN%d, reset query cache failed\n",
×
86
                               __func__, __LINE__);
87
                    return NULL;
×
88
                }
89
            }
90
            st = toolsGetTimestampUs();
4✔
91
            if (g_queryInfo.iface == REST_IFACE) {
4✔
92
                int retCode = postProceSql(sql->command, g_queryInfo.dbName,
×
93
                                           0, g_queryInfo.iface, 0,
×
94
                                           g_arguments->port,
×
95
                                           false, pThreadInfo->sockfd, "");
96
                if (retCode) {
×
97
                    errorPrint("thread[%d]: restful query <%s> failed\n",
×
98
                            pThreadInfo->threadId, sql->command);
99
                    continue;
×
100
                }
101
            } else {
102
                if (g_queryInfo.dbName != NULL) {
4✔
103
                    if (taos_select_db(pThreadInfo->conn->taos,
4✔
104
                                       g_queryInfo.dbName)) {
4✔
105
                        errorPrint("thread[%d]: failed to "
×
106
                                   "select database(%s)\n",
107
                                   pThreadInfo->threadId,
108
                                   g_queryInfo.dbName);
109
                        return NULL;
×
110
                    }
111
                }
112
                TAOS_RES *res = taos_query(pThreadInfo->conn->taos,
4✔
113
                                           sql->command);
4✔
114
                if (res == NULL || taos_errno(res) != 0) {
4✔
115
                    if (YES_IF_FAILED == g_arguments->continueIfFail) {
×
116
                        warnPrint(
×
117
                                "thread[%d]: failed to execute sql :%s, "
118
                                "code: 0x%x, reason: %s\n",
119
                                pThreadInfo->threadId,
120
                                sql->command,
121
                                taos_errno(res), taos_errstr(res));
122
                    } else {
123
                        errorPrint(
×
124
                                "thread[%d]: failed to execute sql :%s, "
125
                                "code: 0x%x, reason: %s\n",
126
                                pThreadInfo->threadId,
127
                                sql->command,
128
                                taos_errno(res), taos_errstr(res));
129
                        if (TSDB_CODE_RPC_NETWORK_UNAVAIL ==
×
130
                                taos_errno(res)) {
×
131
                            return NULL;
×
132
                        }
133
                    }
134
                    continue;
×
135
                }
136
                taos_free_result(res);
4✔
137
            }
138
            et = toolsGetTimestampUs();
4✔
139
            int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
4✔
140
            *delay = et - st;
4✔
141
            debugPrint("%s() LN%d, delay: %"PRId64"\n",
4✔
142
                       __func__, __LINE__, *delay);
143

144
            pThreadInfo->total_delay += (et - st);
4✔
145
            if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
4✔
146
                tmfree(delay);
×
147
            }
148
            int64_t currentPrintTs = toolsGetTimestampMs();
4✔
149
            if (currentPrintTs - lastPrintTs > 10 * 1000) {
4✔
150
                infoPrint("thread[%d] has currently complete query %d times\n",
×
151
                        pThreadInfo->threadId,
152
                        (int)pThreadInfo->query_delay_list->size);
153
                lastPrintTs = currentPrintTs;
×
154
            }
155
        }
156
    }
157
    return NULL;
2✔
158
}
159

160
static void *specifiedTableQuery(void *sarg) {
107✔
161
    threadInfo *pThreadInfo = (threadInfo *)sarg;
107✔
162
#ifdef LINUX
163
    prctl(PR_SET_NAME, "specTableQuery");
107✔
164
#endif
165
    uint64_t st = 0;
107✔
166
    uint64_t et = 0;
107✔
167
    uint64_t minDelay = UINT64_MAX;
107✔
168
    uint64_t maxDelay = 0;
107✔
169
    uint64_t totalDelay = 0;
107✔
170
    int32_t  index = 0;
107✔
171

172
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
107✔
173
    pThreadInfo->query_delay_list = benchCalloc(queryTimes,
107✔
174
            sizeof(uint64_t), false);
175
    uint64_t  lastPrintTime = toolsGetTimestampMs();
107✔
176
    uint64_t  startTs = toolsGetTimestampMs();
107✔
177

178
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls,
107✔
179
            pThreadInfo->querySeq);
180

181
    if (sql->result[0] != '\0') {
107✔
182
        snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
102✔
183
                sql->result, pThreadInfo->threadID);
102✔
184
    }
185

186
    while (index < queryTimes) {
298✔
187
        // check cancel
188
        if (g_arguments->terminate) {
191✔
189
            return NULL;
×
190
        }
191

192
        if (g_queryInfo.specifiedQueryInfo.queryInterval &&
191✔
193
            (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) {
190✔
194
            toolsMsleep((int32_t)(
107✔
195
                        g_queryInfo.specifiedQueryInfo.queryInterval*1000
107✔
196
                        - (et - st)));  // ms
107✔
197
        }
198
        if (g_queryInfo.reset_query_cache) {
191✔
199
            if (queryDbExecCall(pThreadInfo->conn,
6✔
200
                                "RESET QUERY CACHE")) {
201
                errorPrint("%s() LN%d, reset query cache failed\n",
×
202
                           __func__, __LINE__);
203
                return NULL;
×
204
            }
205
        }
206

207
        st = toolsGetTimestampUs();
191✔
208
        int ret = selectAndGetResult(pThreadInfo, sql->command);
191✔
209
        if (ret) {
191✔
210
            g_fail = true;
12✔
211
        }
212

213
        et = toolsGetTimestampUs();
191✔
214
        int64_t delay = et - st;
191✔
215
        debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, delay);
191✔
216

217
        if (ret == 0) {
191✔
218
            pThreadInfo->query_delay_list[index] = delay;
179✔
219
            pThreadInfo->totalQueried++;
179✔
220
        }
221
        index++;
191✔
222
        totalDelay += delay;
191✔
223
        if (delay > maxDelay) {
191✔
224
            maxDelay = delay;
123✔
225
        }
226
        if (delay < minDelay) {
191✔
227
            minDelay = delay;
175✔
228
        }
229

230
        uint64_t currentPrintTime = toolsGetTimestampMs();
191✔
231
        uint64_t endTs = toolsGetTimestampMs();
191✔
232

233
        if ((ret == 0) && (currentPrintTime - lastPrintTime > 30 * 1000)) {
191✔
234
            infoPrint(
×
235
                    "thread[%d] has currently completed queries: %" PRIu64
236
                    ", QPS: %10.6f\n",
237
                    pThreadInfo->threadID, pThreadInfo->totalQueried,
238
                    (double)(pThreadInfo->totalQueried /
239
                        ((endTs - startTs) / 1000.0)));
240
            lastPrintTime = currentPrintTime;
×
241
        }
242

243
        if (-2 == ret) {
191✔
244
            toolsMsleep(1000);
×
245
            return NULL;
×
246
        }
247
    }
248
    qsort(pThreadInfo->query_delay_list, queryTimes,
107✔
249
            sizeof(uint64_t), compare);
250
    pThreadInfo->avg_delay = (double)totalDelay / queryTimes;
107✔
251
    return NULL;
107✔
252
}
253

254
static void *superTableQuery(void *sarg) {
38✔
255
    char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
38✔
256
    threadInfo *pThreadInfo = (threadInfo *)sarg;
38✔
257
#ifdef LINUX
258
    prctl(PR_SET_NAME, "superTableQuery");
38✔
259
#endif
260

261
    uint64_t st = 0;
38✔
262
    uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000;
38✔
263

264
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
38✔
265
    uint64_t startTs = toolsGetTimestampMs();
38✔
266

267
    uint64_t lastPrintTime = toolsGetTimestampMs();
38✔
268
    while (queryTimes--) {
122✔
269
        if (g_queryInfo.superQueryInfo.queryInterval
84✔
270
            && ((et - st) <
83✔
271
                (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000)) {
83✔
272
            toolsMsleep((int32_t)
46✔
273
                        (g_queryInfo.superQueryInfo.queryInterval*1000
46✔
274
                        - (et - st)));
46✔
275
        }
276

277
        st = toolsGetTimestampMs();
84✔
278
        for (int i = (int)pThreadInfo->start_table_from;
84✔
279
             i <= pThreadInfo->end_table_to; i++) {
218✔
280
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
6,208✔
281
                memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN);
6,074✔
282
                replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr,
6,074✔
283
                                    i);
284
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
6,074✔
285
                    snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
6,074✔
286
                            g_queryInfo.superQueryInfo.result[j],
6,074✔
287
                            pThreadInfo->threadID);
288
                }
289
                if (selectAndGetResult(pThreadInfo, sqlstr)) {
6,074✔
290
                    g_fail = true;
×
291
                }
292

293
                pThreadInfo->totalQueried++;
6,074✔
294

295
                int64_t currentPrintTime = toolsGetTimestampMs();
6,074✔
296
                int64_t endTs = toolsGetTimestampMs();
6,074✔
297
                if (currentPrintTime - lastPrintTime > 30 * 1000) {
6,074✔
298
                    infoPrint(
×
299
                        "thread[%d] has currently completed queries: %" PRIu64
300
                        ", QPS: %10.3f\n",
301
                        pThreadInfo->threadID, pThreadInfo->totalQueried,
302
                        (double)(pThreadInfo->totalQueried /
303
                                 ((endTs - startTs) / 1000.0)));
304
                    lastPrintTime = currentPrintTime;
×
305
                }
306
            }
307
        }
308
        et = toolsGetTimestampMs();
84✔
309
    }
310
    tmfree(sqlstr);
38✔
311
    return NULL;
38✔
312
}
313

314
static int multi_thread_super_table_query(uint16_t iface, char* dbName) {
12✔
315
    int ret = -1;
12✔
316
    pthread_t * pidsOfSub = NULL;
12✔
317
    threadInfo *infosOfSub = NULL;
12✔
318
    //==== create sub threads for query from all sub table of the super table
319
    if ((g_queryInfo.superQueryInfo.sqlCount > 0)
12✔
320
            && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
20✔
321
        pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
10✔
322
                                *sizeof(pthread_t),
323
                                false);
324
        infosOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
10✔
325
                                 *sizeof(threadInfo), false);
326

327
        int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
10✔
328
        int     threads = g_queryInfo.superQueryInfo.threadCnt;
10✔
329

330
        int64_t a = ntables / threads;
10✔
331
        if (a < 1) {
10✔
332
            threads = (int)ntables;
2✔
333
            a = 1;
2✔
334
        }
335

336
        int64_t b = 0;
10✔
337
        if (threads != 0) {
10✔
338
            b = ntables % threads;
10✔
339
        }
340

341
        uint64_t tableFrom = 0;
10✔
342
        for (int i = 0; i < threads; i++) {
48✔
343
            threadInfo *pThreadInfo = infosOfSub + i;
38✔
344
            pThreadInfo->threadID = i;
38✔
345
            pThreadInfo->start_table_from = tableFrom;
38✔
346
            pThreadInfo->ntables = i < b ? a + 1 : a;
38✔
347
            pThreadInfo->end_table_to =
38✔
348
                    i < b ? tableFrom + a : tableFrom + a - 1;
38✔
349
            tableFrom = pThreadInfo->end_table_to + 1;
38✔
350
            if (iface == REST_IFACE) {
38✔
351
                int sockfd = createSockFd();
5✔
352
                if (sockfd < 0) {
5✔
353
                    goto OVER;
×
354
                }
355
                pThreadInfo->sockfd = sockfd;
5✔
356
            } else {
357
                pThreadInfo->conn = initBenchConn();
33✔
358
                if (pThreadInfo->conn == NULL) {
33✔
359
                    goto OVER;
×
360
                }
361
            }
362
            pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
38✔
363
        }
364
        g_queryInfo.superQueryInfo.threadCnt = threads;
10✔
365

366
        for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
48✔
367
            if (!g_arguments->terminate)
38✔
368
                pthread_join(pidsOfSub[i], NULL);
38✔
369
            threadInfo *pThreadInfo = infosOfSub + i;
38✔
370
            if (iface == REST_IFACE) {
38✔
371
                destroySockFd(pThreadInfo->sockfd);
5✔
372
            } else {
373
                closeBenchConn(pThreadInfo->conn);
33✔
374
            }
375
            if (g_fail) {
38✔
376
                goto OVER;
×
377
            }
378
        }
379
        for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; ++i) {
48✔
380
            g_queryInfo.superQueryInfo.totalQueried
381
                += infosOfSub[i].totalQueried;
38✔
382
        }
383
    } else {
384
        return 0;
2✔
385
    }
386

387
    ret = 0;
10✔
388
OVER:
10✔
389
    tmfree((char *)pidsOfSub);
10✔
390
    tmfree((char *)infosOfSub);
10✔
391

392
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
80✔
393
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
70✔
394
    }
395
    tmfree(g_queryInfo.superQueryInfo.childTblName);
10✔
396
    return ret;
10✔
397
}
398

399
// free g_queryInfo.specailQueryInfo memory , can re-call
400
void freeSpecialQueryInfo() {
11✔
401
    // can re-call
402
    if (g_queryInfo.specifiedQueryInfo.sqls == NULL) {
11✔
403
        return;
×
404
    }
405

406
    // loop free each item memory
407
    for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
228✔
408
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
217✔
409
        tmfree(sql->command);
217✔
410
        tmfree(sql->delay_list);
217✔
411
    }
412

413
    // free Array
414
    benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);
11✔
415
    g_queryInfo.specifiedQueryInfo.sqls = NULL;
11✔
416
}
417

418

419
static int multi_thread_specified_table_query(uint16_t iface, char* dbName) {
13✔
420
    pthread_t * pids = NULL;
13✔
421
    threadInfo *infos = NULL;
13✔
422
    //==== create sub threads for query from specify table
423
    int      nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
13✔
424
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
13✔
425

426
    // check invaid
427
    if(nSqlCount == 0 || nConcurrent == 0 ) {
13✔
428
        if(nSqlCount == 0)
2✔
429
           infoPrint(" query sql count is %" PRIu64 ".  must set query sqls. \n", nSqlCount);
2✔
430
        if(nConcurrent == 0)
2✔
431
           infoPrint(" concurrent is %d , specified_table_query->concurrent must not zero. \n", nConcurrent);
×
432
        return 0;
2✔
433
    }
434

435
    // malloc funciton global memory
436
    pids  = benchCalloc(1, nConcurrent * sizeof(pthread_t),  false);
11✔
437
    infos = benchCalloc(1, nConcurrent * sizeof(threadInfo), false);
11✔
438

439
    bool exeError = false;
11✔
440
    for (uint64_t i = 0; i < nSqlCount; i++) {
48✔
441
        // reset
442
        memset(pids,  0, nConcurrent * sizeof(pthread_t));
39✔
443
        memset(infos, 0, nConcurrent * sizeof(threadInfo));
39✔
444

445
        // get execute sql
446
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
39✔
447

448
        // create threads
449
        for (int j = 0; j < nConcurrent; j++) {
146✔
450
           threadInfo *pThreadInfo = infos + j;
107✔
451
           pThreadInfo->threadID = i * nConcurrent + j;
107✔
452
           pThreadInfo->querySeq = i;
107✔
453
           if (iface == REST_IFACE) {
107✔
454
                int sockfd = createSockFd();
13✔
455
                // int iMode = 1;
456
                // ioctl(sockfd, FIONBIO, &iMode);
457
                if (sockfd < 0) {
13✔
458
                    exeError = true;
×
459

460
                    break;
×
461
                }
462
                pThreadInfo->sockfd = sockfd;
13✔
463
           } else {
464
                pThreadInfo->conn = initBenchConn();
94✔
465
                if (pThreadInfo->conn == NULL) {
94✔
466
                    destroySockFd(pThreadInfo->sockfd);
×
467
                    exeError = true;
×
468
                    break;
×
469
                }
470
           }
471

472
           pthread_create(pids + j, NULL, specifiedTableQuery, pThreadInfo);
107✔
473
        }
474

475
        // if failed, set termainte flag true like ctrl+c exit
476
        if (exeError) {
39✔
477
            errorPrint(" i=%" PRIu64 " create thread occur error, so wait exit ...\n", i);
×
478
            g_arguments->terminate = true;
×
479
        }
480

481
        // wait threads execute finished one by one
482
        for (int j = 0; (j < nConcurrent && pids[j] > 0) ; j++) {
146✔
483
           pthread_join(pids[j], NULL);
107✔
484
           threadInfo *pThreadInfo = infos + j;
107✔
485
           if (iface == REST_IFACE) {
107✔
486
#ifdef WINDOWS
487
                closesocket(pThreadInfo->sockfd);
488
                WSACleanup();
489
#else
490
                close(pThreadInfo->sockfd);
13✔
491
#endif
492
           } else {
493
                closeBenchConn(pThreadInfo->conn);
94✔
494
                pThreadInfo->conn = NULL;
94✔
495
           }
496

497
           // need exit in loop
498
           if (g_fail || g_arguments->terminate) {
107✔
499
                // free BArray
500
                tmfree(pThreadInfo->query_delay_list);
6✔
501
                pThreadInfo->query_delay_list = NULL;
6✔
502
           }
503
        }
504

505
        // cancel or need exit check
506
        if (g_fail || g_arguments->terminate) {
39✔
507
            // free current funciton malloc memory
508
            tmfree((char *)pids);
2✔
509
            tmfree((char *)infos);
2✔
510
            // free global
511
            freeSpecialQueryInfo();
2✔
512
            return -1;
2✔
513
        }
514

515
        // execute successfully
516
        uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes;
37✔
517
        uint64_t totalQueryTimes = query_times * nConcurrent;
37✔
518
        double   avg_delay = 0.0;
37✔
519
        for (int j = 0; j < nConcurrent; j++) {
138✔
520
           threadInfo *pThreadInfo = infos + j;
101✔
521
           avg_delay += pThreadInfo->avg_delay;
101✔
522
           for (uint64_t k = 0; k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) {
280✔
523
                sql->delay_list[j * query_times + k] = pThreadInfo->query_delay_list[k];
179✔
524
           }
525

526
           // free BArray
527
           tmfree(pThreadInfo->query_delay_list);
101✔
528
           pThreadInfo->query_delay_list = NULL;
101✔
529
        }
530
        avg_delay /= nConcurrent;
37✔
531
        qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes, sizeof(uint64_t), compare);
37✔
532
        infoPrintNoTimestamp("complete query with %d threads and %" PRIu64
37✔
533
                             " query delay "
534
                             "avg: \t%.6fs "
535
                             "min: \t%.6fs "
536
                             "max: \t%.6fs "
537
                             "p90: \t%.6fs "
538
                             "p95: \t%.6fs "
539
                             "p99: \t%.6fs "
540
                             "SQL command: %s"
541
                             "\n",
542
                             nConcurrent, query_times, avg_delay / 1E6,  /* avg */
543
                             sql->delay_list[0] / 1E6,                   /* min */
544
                             sql->delay_list[totalQueryTimes - 1] / 1E6, /*  max */
545
                             /*  p90 */
546
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
547
                             /*  p95 */
548
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
549
                             /*  p99 */
550
                             sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
551
        infoPrintNoTimestampToFile(g_arguments->fpOfInsertResult,
37✔
552
                                   "complete query with %d threads and %" PRIu64
553
                                   " query delay "
554
                                   "avg: \t%.6fs "
555
                                   "min: \t%.6fs "
556
                                   "max: \t%.6fs "
557
                                   "p90: \t%.6fs "
558
                                   "p95: \t%.6fs "
559
                                   "p99: \t%.6fs "
560
                                   "SQL command: %s"
561
                                   "\n",
562
                                   nConcurrent, query_times, avg_delay / 1E6,  /* avg */
563
                                   sql->delay_list[0] / 1E6,                   /* min */
564
                                   sql->delay_list[totalQueryTimes - 1] / 1E6, /*  max */
565
                                   /*  p90 */
566
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)] / 1E6,
567
                                   /*  p95 */
568
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)] / 1E6,
569
                                   /*  p99 */
570
                                   sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)] / 1E6, sql->command);
571
    }
572

573
    g_queryInfo.specifiedQueryInfo.totalQueried =
9✔
574
        g_queryInfo.specifiedQueryInfo.queryTimes * nConcurrent;
9✔
575
    tmfree((char *)pids);
9✔
576
    tmfree((char *)infos);
9✔
577

578
    // free specialQueryInfo
579
    freeSpecialQueryInfo();
9✔
580
    return 0;
9✔
581
}
582

583
static int multi_thread_specified_mixed_query(uint16_t iface, char* dbName) {
1✔
584
    int code = -1;
1✔
585
    int thread = g_queryInfo.specifiedQueryInfo.concurrent;
1✔
586
    pthread_t * pids = benchCalloc(thread, sizeof(pthread_t), true);
1✔
587
    queryThreadInfo *infos = benchCalloc(thread, sizeof(queryThreadInfo), true);
1✔
588
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
1✔
589
    int start_sql = 0;
1✔
590
    int a = total_sql_num / thread;
1✔
591
    if (a < 1) {
1✔
592
        thread = total_sql_num;
1✔
593
        a = 1;
1✔
594
    }
595
    int b = 0;
1✔
596
    if (thread != 0) {
1✔
597
        b = total_sql_num % thread;
1✔
598
    }
599
    for (int i = 0; i < thread; ++i) {
3✔
600
        queryThreadInfo *pQueryThreadInfo = infos + i;
2✔
601
        pQueryThreadInfo->threadId = i;
2✔
602
        pQueryThreadInfo->start_sql = start_sql;
2✔
603
        pQueryThreadInfo->end_sql = i < b ? start_sql + a : start_sql + a - 1;
2✔
604
        start_sql = pQueryThreadInfo->end_sql + 1;
2✔
605
        pQueryThreadInfo->total_delay = 0;
2✔
606
        pQueryThreadInfo->query_delay_list = benchArrayInit(1, sizeof(int64_t));
2✔
607
        if (iface == REST_IFACE) {
2✔
608
            int sockfd = createSockFd();
×
609
            if (sockfd < 0) {
×
610
                goto OVER;
×
611
            }
612
            pQueryThreadInfo->sockfd = sockfd;
×
613
        } else {
614
            pQueryThreadInfo->conn = initBenchConn();
2✔
615
            if (pQueryThreadInfo->conn == NULL) {
2✔
616
                goto OVER;
×
617
            }
618
        }
619
        pthread_create(pids + i, NULL, mixedQuery, pQueryThreadInfo);
2✔
620
    }
621

622
    int64_t start = toolsGetTimestampUs();
1✔
623
    for (int i = 0; i < thread; ++i) {
3✔
624
        pthread_join(pids[i], NULL);
2✔
625
    }
626
    int64_t end = toolsGetTimestampUs();
1✔
627

628
    // statistic
629
    BArray * delay_list = benchArrayInit(1, sizeof(int64_t));
1✔
630
    int64_t total_delay = 0;
1✔
631
    for (int i = 0; i < thread; ++i) {
3✔
632
        queryThreadInfo * pThreadInfo = infos + i;
2✔
633
        benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData,
2✔
634
                pThreadInfo->query_delay_list->size);
2✔
635
        total_delay += pThreadInfo->total_delay;
2✔
636
        tmfree(pThreadInfo->query_delay_list);
2✔
637
        pThreadInfo->query_delay_list = NULL;
2✔
638

639
        if (iface == REST_IFACE) {
2✔
640
#ifdef  WINDOWS
641
            closesocket(pThreadInfo->sockfd);
642
            WSACleanup();
643
#else
644
            close(pThreadInfo->sockfd);
×
645
#endif
646
        } else {
647
            closeBenchConn(pThreadInfo->conn);
2✔
648
        }
649
    }
650
    qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare);
1✔
651
    if (delay_list->size) {
1✔
652
        infoPrint(
1✔
653
                "spend %.6fs using "
654
                "%d threads complete query %d times,cd  "
655
                "min delay: %.6fs, "
656
                "avg delay: %.6fs, "
657
                "p90: %.6fs, "
658
                "p95: %.6fs, "
659
                "p99: %.6fs, "
660
                "max: %.6fs\n",
661
                (end - start)/1E6,
662
                thread, (int)delay_list->size,
663
                *(int64_t *)(benchArrayGet(delay_list, 0))/1E6,
664
                (double)total_delay/delay_list->size/1E6,
665
                *(int64_t *)(benchArrayGet(delay_list,
666
                                    (int32_t)(delay_list->size * 0.9)))/1E6,
667
                *(int64_t *)(benchArrayGet(delay_list,
668
                                    (int32_t)(delay_list->size * 0.95)))/1E6,
669
                *(int64_t *)(benchArrayGet(delay_list,
670
                                    (int32_t)(delay_list->size * 0.99)))/1E6,
671
                *(int64_t *)(benchArrayGet(delay_list,
672
                                    (int32_t)(delay_list->size - 1)))/1E6);
673
    } else {
674
        errorPrint("%s() LN%d, delay_list size: %"PRId64"\n",
×
675
                   __func__, __LINE__, (int64_t)delay_list->size);
676
    }
677
    benchArrayDestroy(delay_list);
1✔
678
    code = 0;
1✔
679
OVER:
1✔
680
    tmfree(pids);
1✔
681
    tmfree(infos);
1✔
682
    return code;
1✔
683
}
684

685
#define KILLID_LEN  20
686

687
void *queryKiller(void *arg) {
×
688
    char host[MAX_HOSTNAME_LEN] = {0};
×
689
    tstrncpy(host, g_arguments->host, MAX_HOSTNAME_LEN);
×
690

691
    while (true) {
×
692
        TAOS *taos = taos_connect(g_arguments->host, g_arguments->user,
×
693
                g_arguments->password, NULL, g_arguments->port);
×
694
        if (NULL == taos) {
×
695
            errorPrint("Slow query killer thread "
×
696
                    "failed to connect to the server %s\n",
697
                    g_arguments->host);
698
            return NULL;
×
699
        }
700

701
        char command[TSDB_MAX_ALLOWED_SQL_LEN] =
×
702
            "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries";
703
        TAOS_RES *res = taos_query(taos, command);
×
704
        int32_t code = taos_errno(res);
×
705
        if (code) {
×
706
            printErrCmdCodeStr(command, code, res);
×
707
        }
708

709
        TAOS_ROW row = NULL;
×
710
        while ((row = taos_fetch_row(res)) != NULL) {
×
711
            int32_t *lengths = taos_fetch_lengths(res);
×
712
            if (lengths[0] <= 0) {
×
713
                infoPrint("No valid query found by %s\n", command);
×
714
            } else {
715
                int64_t execUSec = *(int64_t*)row[1];
×
716

717
                if (execUSec > g_queryInfo.killQueryThreshold * 1000000) {
×
718
                    char sql[SHORT_1K_SQL_BUFF_LEN] = {0};
×
719
                    tstrncpy(sql, (char*)row[2],
×
720
                             min(strlen((char*)row[2])+1,
721
                                 SHORT_1K_SQL_BUFF_LEN));
722

723
                    char killId[KILLID_LEN] = {0};
×
724
                    tstrncpy(killId, (char*)row[0],
×
725
                            min(strlen((char*)row[0])+1, KILLID_LEN));
726
                    char killCommand[KILLID_LEN + 15] = {0};
×
727
                    snprintf(killCommand, KILLID_LEN + 15,
×
728
                             "KILL QUERY '%s'", killId);
729
                    TAOS_RES *resKill = taos_query(taos, killCommand);
×
730
                    int32_t codeKill = taos_errno(resKill);
×
731
                    if (codeKill) {
×
732
                        printErrCmdCodeStr(killCommand, codeKill, resKill);
×
733
                    } else {
734
                        infoPrint("%s succeed, sql: %s killed!\n",
×
735
                                  killCommand, sql);
736
                        taos_free_result(resKill);
×
737
                    }
738
                }
739
            }
740
        }
741

742
        taos_free_result(res);
×
743
        taos_close(taos);
×
744
        toolsMsleep(g_queryInfo.killQueryInterval*1000);
×
745
    }
746

747
    return NULL;
748
}
749

750
int queryTestProcess() {
14✔
751
    prompt(0);
14✔
752

753
    if (REST_IFACE == g_queryInfo.iface) {
14✔
754
        encodeAuthBase64();
3✔
755
    }
756

757
    pthread_t pidKiller = {0};
14✔
758
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
14✔
759
        pthread_create(&pidKiller, NULL, queryKiller, NULL);
×
760
        pthread_join(pidKiller, NULL);
×
761
        toolsMsleep(1000);
×
762
    }
763

764
    if (g_queryInfo.iface == REST_IFACE) {
14✔
765
        if (convertHostToServAddr(g_arguments->host,
3✔
766
                    g_arguments->port + TSDB_PORT_HTTP,
3✔
767
                    &(g_arguments->serv_addr)) != 0) {
3✔
768
            errorPrint("%s", "convert host to server address\n");
×
769
            return -1;
×
770
        }
771
    }
772

773
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
14✔
774
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
12✔
775
        SBenchConn* conn = initBenchConn();
12✔
776
        if (conn == NULL) {
12✔
777
            return -1;
×
778
        }
779
        char  cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
12✔
780
        if (3 == g_majorVersionOfClient) {
12✔
781
            snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
12✔
782
                    "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM %s.%s)",
783
                    g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
784
        } else {
785
            snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
786
                     "SELECT COUNT(TBNAME) FROM %s.%s",
787
                    g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
788
        }
789
        TAOS_RES *res = taos_query(conn->taos, cmd);
12✔
790
        int32_t   code = taos_errno(res);
12✔
791
        if (code) {
12✔
792
            printErrCmdCodeStr(cmd, code, res);
×
793
            closeBenchConn(conn);
×
794
            return -1;
×
795
        }
796
        TAOS_ROW    row = NULL;
12✔
797
        int         num_fields = taos_num_fields(res);
12✔
798
        TAOS_FIELD *fields = taos_fetch_fields(res);
12✔
799
        while ((row = taos_fetch_row(res)) != NULL) {
24✔
800
            if (0 == strlen((char *)(row[0]))) {
12✔
801
                errorPrint("stable %s have no child table\n",
×
802
                        g_queryInfo.superQueryInfo.stbName);
803
                taos_free_result(res);
×
804
                closeBenchConn(conn);
×
805
                return -1;
×
806
            }
807
            char temp[256] = {0};
12✔
808
            taos_print_row(temp, row, fields, num_fields);
12✔
809
            g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(temp);
12✔
810
        }
811
        infoPrint("%s's childTblCount: %" PRId64 "\n",
12✔
812
                g_queryInfo.superQueryInfo.stbName,
813
                g_queryInfo.superQueryInfo.childTblCount);
814
        taos_free_result(res);
12✔
815
        g_queryInfo.superQueryInfo.childTblName =
12✔
816
            benchCalloc(g_queryInfo.superQueryInfo.childTblCount,
12✔
817
                    sizeof(char *), false);
818
        if (getAllChildNameOfSuperTable(
12✔
819
                    conn->taos, g_queryInfo.dbName,
820
                    g_queryInfo.superQueryInfo.stbName,
821
                    g_queryInfo.superQueryInfo.childTblName,
822
                    g_queryInfo.superQueryInfo.childTblCount)) {
823
            tmfree(g_queryInfo.superQueryInfo.childTblName);
×
824
            closeBenchConn(conn);
×
825
            return -1;
×
826
        }
827
        closeBenchConn(conn);
12✔
828
    }
829
    uint64_t startTs = toolsGetTimestampMs();
14✔
830
    if (g_queryInfo.specifiedQueryInfo.mixed_query) {
14✔
831
        if (multi_thread_specified_mixed_query(g_queryInfo.iface,
1✔
832
                    g_queryInfo.dbName)) {
833
            return -1;
×
834
        }
835
    } else {
836
        if (multi_thread_specified_table_query(g_queryInfo.iface,
13✔
837
                    g_queryInfo.dbName)) {
838
            return -1;
2✔
839
        }
840
    }
841
    if (multi_thread_super_table_query(g_queryInfo.iface,
12✔
842
                g_queryInfo.dbName)) {
843
        return -1;
×
844
    }
845
    // workaround to use separate taos connection;
846
    uint64_t endTs = toolsGetTimestampMs();
12✔
847
    int64_t t = endTs - startTs;
12✔
848
    double  tInS = (double)t / 1000.0;
12✔
849
    if (g_queryInfo.specifiedQueryInfo.totalQueried)
12✔
850
        infoPrint("Total specified queries: %" PRIu64 "\n",
9✔
851
              g_queryInfo.specifiedQueryInfo.totalQueried);
852
    if (g_queryInfo.superQueryInfo.totalQueried)
12✔
853
    infoPrint("Total super queries: %" PRIu64 "\n",
10✔
854
              g_queryInfo.superQueryInfo.totalQueried);
855
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
12✔
856
        + g_queryInfo.superQueryInfo.totalQueried;
12✔
857
    infoPrint(
12✔
858
            "Spend %.4f second completed total queries: %" PRIu64
859
            ", the QPS of all threads: %10.3f\n\n",
860
            tInS, totalQueried, (double)totalQueried / tInS);
861
    infoPrintToFile(g_arguments->fpOfInsertResult,
12✔
862
            "Spend %.4f second completed total queries: %" PRIu64
863
            ", the QPS of all threads: %10.3f\n\n",
864
            tInS, totalQueried, (double)totalQueried / tInS);
865
    return 0;
12✔
866
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc