• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 12906126817

22 Jan 2025 10:22AM UTC coverage: 75.09% (-0.2%) from 75.249%
12906126817

Pull #839

github

web-flow
Merge 2cfc9a375 into cde537a70
Pull Request #839: FIX mixed query mode no need calc thread

406 of 549 new or added lines in 4 files covered. (73.95%)

91 existing lines in 6 files now uncovered.

12456 of 16588 relevant lines covered (75.09%)

328442.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.4
/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
// query and get result  record is true to total request
17
int selectAndGetResult(qThreadInfo *pThreadInfo, char *command, bool record) {
11,315✔
18
    int ret = 0;
11,315✔
19

20
    // user cancel
21
    if (g_arguments->terminate) {
11,315✔
22
        return -1;
×
23
    }
24

25
    // execute sql
26
    uint32_t threadID = pThreadInfo->threadID;
11,315✔
27
    char dbName[TSDB_DB_NAME_LEN] = {0};
11,315✔
28
    tstrncpy(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN);
11,315✔
29

30
    if (g_queryInfo.iface == REST_IFACE) {
11,315✔
31
        int retCode = postProceSql(command, g_queryInfo.dbName, 0, REST_IFACE,
2,236✔
32
                                   0, g_arguments->port, false,
2,236✔
33
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
2,236✔
34
        if (0 != retCode) {
2,236✔
NEW
35
            errorPrint("====restful return fail, threadID[%u]\n", threadID);
×
UNCOV
36
            ret = -1;
×
37
        }
38
    } else {
39
        // query
40
        TAOS *taos = pThreadInfo->conn->taos;
9,079✔
41
        int64_t rows  = 0;
9,079✔
42
        TAOS_RES *res = taos_query(taos, command);
9,079✔
43
        int code = taos_errno(res);
9,084✔
44
        if (res == NULL || code) {
9,083✔
45
            // failed query
46
            errorPrint("failed to execute sql:%s, "
9✔
47
                        "code: 0x%08x, reason:%s\n",
48
                        command, code, taos_errstr(res));
49
            ret = -1;
9✔
50
        } else {
51
            // succ query
52
            if (record)
9,074✔
53
                rows = fetchResult(res, pThreadInfo->filePath);
9,066✔
54
        }
55

56
        // free result
57
        if (res) {
9,085✔
58
            taos_free_result(res);
9,084✔
59
        }
60
        debugPrint("query sql:%s rows:%"PRId64"\n", command, rows);
9,084✔
61
    }
62

63
    // record count
64
    if (ret ==0) {
11,320✔
65
        // succ
66
        if (record) 
11,311✔
67
            pThreadInfo->nSucc ++;
11,301✔
68
    } else {
69
        // fail
70
        if (record)
9✔
71
            pThreadInfo->nFail ++;
9✔
72

73
        // continue option
74
        if (YES_IF_FAILED == g_arguments->continueIfFail) {
9✔
75
            ret = 0; // force continue
3✔
76
        }
77
    }
78

79
    return ret;
11,320✔
80
}
81

82
// interlligent sleep
83
void autoSleep(uint64_t st, uint64_t et) {
2,921✔
84
    if (g_queryInfo.specifiedQueryInfo.queryInterval &&
2,921✔
85
        (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) {
182✔
86
        toolsMsleep((int32_t)(
19✔
87
                    g_queryInfo.specifiedQueryInfo.queryInterval*1000
19✔
88
                    - (et - st)));  // ms
19✔
89
    }
90
}
2,921✔
91

92
// reset 
93
int32_t resetQueryCache(qThreadInfo* pThreadInfo) {    
7✔
94
    // execute sql 
95
    if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE", false)) {
7✔
NEW
96
        errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
NEW
97
        return -1;
×
98
    }
99
    // succ
100
    return 0;
7✔
101
}
102

103

104

105
//
106
//  ---------------------------------  second level functions for threads -----------------------------------
107
//
108

109
// show rela qps
110
int64_t showRealQPS(qThreadInfo* pThreadInfo, int64_t lastPrintTime, int64_t startTs) {
11,304✔
111
    int64_t now = toolsGetTimestampMs();
11,304✔
112
    if (now - lastPrintTime > 10 * 1000) {
11,305✔
113
        // real total
NEW
114
        uint64_t totalQueried = pThreadInfo->nSucc;
×
NEW
115
        if(g_arguments->continueIfFail == YES_IF_FAILED) {
×
NEW
116
            totalQueried += pThreadInfo->nFail;
×
117
        }
NEW
118
        infoPrint(
×
119
            "thread[%d] has currently completed queries: %" PRIu64 ", QPS: %10.3f\n",
120
            pThreadInfo->threadID, totalQueried,
121
            (double)(totalQueried / ((now - startTs) / 1000.0)));
NEW
122
        return now;
×
123
    } else {
124
        return lastPrintTime;
11,305✔
125
    }    
126
}
127

128
// spec query mixed thread
129
static void *specQueryMixThread(void *sarg) {
9✔
130
    qThreadInfo *pThreadInfo = (qThreadInfo*)sarg;
9✔
131
#ifdef LINUX
132
    prctl(PR_SET_NAME, "specQueryMixThread");
9✔
133
#endif
134
    // use db
135
    if (g_queryInfo.dbName) {
9✔
136
        if (pThreadInfo->conn &&
9✔
137
            pThreadInfo->conn->taos &&
12✔
138
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
6✔
NEW
139
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
140
                return NULL;
×
141
        }
142
    }
143

144
    int64_t st = 0;
9✔
145
    int64_t et = 0;
9✔
146
    int64_t startTs = toolsGetTimestampMs();
9✔
147
    int64_t lastPrintTime = startTs;
9✔
148
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
9✔
149
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
9✔
150
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
18✔
151
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
9✔
152
        for (int j = 0; j < queryTimes; ++j) {
533✔
153
            // use cancel
154
            if(g_arguments->terminate) {
524✔
NEW
155
                infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
156
                break;
×
157
            }
158

159
            // reset cache
160
            if (g_queryInfo.reset_query_cache) {
524✔
NEW
161
                if (resetQueryCache(pThreadInfo)) {
×
NEW
162
                    errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
UNCOV
163
                    return NULL;
×
164
                }
165
            }
166

167
            // execute sql
168
            st = toolsGetTimestampUs();
524✔
169
            int ret = selectAndGetResult(pThreadInfo, sql->command, true);
524✔
170
            if (ret) {
524✔
NEW
171
                g_fail = true;
×
NEW
172
                errorPrint("failed call mix selectAndGetResult, i=%d j=%d", i, j);
×
NEW
173
                return NULL;
×
174
            }
175
            et = toolsGetTimestampUs();
524✔
176

177
            // sleep
178
            autoSleep(st, et);
524✔
179

180
            // delay
181
            if (ret == 0) {
524✔
182
                int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
524✔
183
                *delay = et - st;
524✔
184
                debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, *delay);
524✔
185

186
                pThreadInfo->total_delay += *delay;
524✔
187
                if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
524✔
NEW
188
                    tmfree(delay);
×
189
                }
190
            }
191

192
            // real show
193
            lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
524✔
194
        }
195
    }
196

197
    return NULL;
9✔
198
}
199

200
// spec query thread
201
static void *specQueryThread(void *sarg) {
126✔
202
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
126✔
203
#ifdef LINUX
204
    prctl(PR_SET_NAME, "specQueryThread");
126✔
205
#endif
206
    uint64_t st = 0;
126✔
207
    uint64_t et = 0;
126✔
208
    int32_t  index = 0;
126✔
209

210
    // use db
211
    if (g_queryInfo.dbName) {
126✔
212
        if (pThreadInfo->conn &&
126✔
213
            pThreadInfo->conn->taos &&
208✔
214
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
104✔
215
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
216
                return NULL;
×
217
        }
218
    }
219

220
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
126✔
221
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
126✔
222

223
    uint64_t  startTs       = toolsGetTimestampMs();
126✔
224
    uint64_t  lastPrintTime = startTs;
126✔
225

226
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, pThreadInfo->querySeq);
126✔
227

228
    if (sql->result[0] != '\0') {
126✔
229
        snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
108✔
230
                sql->result, pThreadInfo->threadID);
108✔
231
    }
232

233
    while (index < queryTimes) {
1,845✔
234
        // use cancel
235
        if(g_arguments->terminate) {
1,724✔
NEW
236
            infoPrint("thread[%d] user cancel , so exit testing.\n", pThreadInfo->threadID);
×
237
            break;
6✔
238
        }
239

240
        // reset cache
241
        if (g_queryInfo.reset_query_cache) {
1,724✔
242
            if (resetQueryCache(pThreadInfo)) {
6✔
NEW
243
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
UNCOV
244
                return NULL;
×
245
            }
246
        }
247

248
        // execute sql
249
        st = toolsGetTimestampUs();
1,724✔
250
        int ret = selectAndGetResult(pThreadInfo, sql->command, true);
1,725✔
251
        if (ret) {
1,725✔
252
            g_fail = true;
6✔
253
            errorPrint("failed call spec selectAndGetResult, index=%d\n", index);
6✔
254
            break;
6✔
255
        }
256
        et = toolsGetTimestampUs();
1,719✔
257

258
        // sleep
259
        autoSleep(st, et);
1,719✔
260

261
        uint64_t delay = et - st;
1,718✔
262
        debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
1,718✔
263

264
        if (ret == 0) {
1,718✔
265
            // only succ add delay list
266
            benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
1,719✔
267
            pThreadInfo->total_delay += delay;
1,719✔
268
        }
269
        index++;
1,718✔
270

271
        // real show
272
        lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
1,718✔
273
    }
274

275
    return NULL;
127✔
276
}
277

278
// super table query thread
279
static void *stbQueryThread(void *sarg) {
40✔
280
    char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
40✔
281
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
40✔
282
#ifdef LINUX
283
    prctl(PR_SET_NAME, "stbQueryThread");
40✔
284
#endif
285

286
    uint64_t st = 0;
40✔
287
    uint64_t et = 0;
40✔
288

289
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
40✔
290
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(uint64_t));
40✔
291
    
292
    uint64_t startTs = toolsGetTimestampMs();
40✔
293
    uint64_t lastPrintTime = startTs;
40✔
294
    while (queryTimes--) {
719✔
295
        // use cancel
296
        if(g_arguments->terminate) {
679✔
NEW
297
            infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
298
            break;
×
299
        }
300

301
        // reset cache
302
        if (g_queryInfo.reset_query_cache) {
679✔
303
            if (resetQueryCache(pThreadInfo)) {
1✔
NEW
304
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
NEW
305
                return NULL;
×
306
            }
307
        }
308

309
        // execute
310
        st = toolsGetTimestampMs();
679✔
311
        // for each table
312
        for (int i = (int)pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
2,803✔
313
            // use cancel
314
            if(g_arguments->terminate) {
2,123✔
NEW
315
                infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
316
                break;
×
317
            }
318

319
            // for each sql
320
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
11,185✔
321
                memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN);
9,064✔
322
                // use cancel
323
                if(g_arguments->terminate) {
9,064✔
NEW
324
                    infoPrint("%s\n", "user cancel , so exit testing.");
×
325
                    break;
3✔
326
                }
327

328
                
329
                // get real child name sql
330
                if (replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i)) {
9,064✔
331
                    // fault
NEW
332
                    tmfree(sqlstr);
×
NEW
333
                    return NULL;
×
334
                }
335

336
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
9,061✔
337
                    snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
8,055✔
338
                            g_queryInfo.superQueryInfo.result[j],
8,055✔
339
                            pThreadInfo->threadID);
340
                }
341

342
                // execute sql
343
                uint64_t s = toolsGetTimestampUs();
9,061✔
344
                int ret = selectAndGetResult(pThreadInfo, sqlstr, true);
9,055✔
345
                if (ret) {
9,059✔
346
                    // found error
NEW
347
                    errorPrint("failed call stb selectAndGetResult, i=%d j=%d\n", i, j);
×
UNCOV
348
                    g_fail = true;
×
NEW
349
                    tmfree(sqlstr);
×
NEW
350
                    return NULL;
×
351
                }
352
                uint64_t delay = toolsGetTimestampUs() - s;
9,059✔
353
                debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
9,064✔
354
                if (ret == 0) {
9,064✔
355
                    // only succ add delay list
356
                    benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
9,063✔
357
                    pThreadInfo->total_delay += delay;
9,057✔
358
                }
359

360
                // show real QPS
361
                lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
9,058✔
362
            }
363
        }
364
        et = toolsGetTimestampMs();
680✔
365

366
        // sleep
367
        autoSleep(st, et);
679✔
368
    }
369
    tmfree(sqlstr);
40✔
370

371
    return NULL;
40✔
372
}
373

374
//
375
// ---------------------------------  first level functions ------------------------------
376
//
377

378
// Merge the per-thread delay lists and print the overall delay statistics
// (min / avg / p90 / p95 / p99 / max).
//
// infos     : array of threadCnt thread contexts; each non-NULL
//             query_delay_list is appended to the merged list, then destroyed
//             and set to NULL
// threadCnt : number of entries in infos
// spend     : value printed as the total time after dividing by 1E6
//
// Prints an error and returns when no delay was recorded at all.
void totalChildQuery(qThreadInfo* infos, int threadCnt, int64_t spend) {
    // valid check
    if (infos == NULL || threadCnt == 0) {
        return;
    }

    // statistic
    BArray * delay_list = benchArrayInit(1, sizeof(int64_t));
    double total_delays = 0;

    // collect and clear the per-thread lists
    for (int i = 0; i < threadCnt; ++i) {
        qThreadInfo * pThreadInfo = infos + i;
        if (pThreadInfo->query_delay_list == NULL) {
            continue;
        }

        // append delay
        benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData,
                pThreadInfo->query_delay_list->size, false);
        total_delays += pThreadInfo->total_delay;

        // free delay
        benchArrayDestroy(pThreadInfo->query_delay_list);
        pThreadInfo->query_delay_list = NULL;
    }

    // succ is zero
    if (delay_list->size == 0) {
        errorPrint("%s", "succ queries count is zero.\n");
        benchArrayDestroy(delay_list);
        return;
    }

    // sort
    qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare);

    // show delay min max; size is known to be non-zero here, so every index
    // below is within [0, size - 1]
    infoPrint(
            "spend %.6fs using "
            "%d threads complete query %d times,  "
            "min delay: %.6fs, "
            "avg delay: %.6fs, "
            "p90: %.6fs, "
            "p95: %.6fs, "
            "p99: %.6fs, "
            "max: %.6fs\n",
            spend/1E6,
            threadCnt, (int)delay_list->size,
            *(int64_t *)(benchArrayGet(delay_list, 0))/1E6,
            (double)total_delays/delay_list->size/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.9)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.95)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.99)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size - 1)))/1E6);

    benchArrayDestroy(delay_list);
}
446

447
//
448
// super table query
449
//
450
static int stbQuery(uint16_t iface, char* dbName) {
11✔
451
    int ret = -1;
11✔
452
    pthread_t * pidsOfSub   = NULL;
11✔
453
    qThreadInfo *threadInfos = NULL;
11✔
454
    g_queryInfo.superQueryInfo.totalQueried = 0;
11✔
455
    g_queryInfo.superQueryInfo.totalFail    = 0;
11✔
456

457
    // check
458
    if ((g_queryInfo.superQueryInfo.sqlCount == 0)
11✔
459
        || (g_queryInfo.superQueryInfo.threadCnt == 0)) {
11✔
UNCOV
460
        return 0;
×
461
    }
462

463
    // malloc 
464
    pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
11✔
465
                            *sizeof(pthread_t),
466
                            false);
467
    threadInfos = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
11✔
468
                                *sizeof(qThreadInfo), false);
469

470
    int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
11✔
471
    int nConcurrent = g_queryInfo.superQueryInfo.threadCnt;
11✔
472

473
    int64_t a = ntables / nConcurrent;
11✔
474
    if (a < 1) {
11✔
475
        nConcurrent = (int)ntables;
2✔
476
        a = 1;
2✔
477
    }
478

479
    int64_t b = 0;
11✔
480
    if (nConcurrent != 0) {
11✔
481
        b = ntables % nConcurrent;
11✔
482
    }
483

484
    uint64_t tableFrom = 0;
11✔
485
    int threadCnt = 0;
11✔
486
    for (int i = 0; i < nConcurrent; i++) {
51✔
487
        qThreadInfo *pThreadInfo = threadInfos + i;
40✔
488
        pThreadInfo->threadID = i;
40✔
489
        pThreadInfo->start_table_from = tableFrom;
40✔
490
        pThreadInfo->ntables = i < b ? a + 1 : a;
40✔
491
        pThreadInfo->end_table_to =
40✔
492
                i < b ? tableFrom + a : tableFrom + a - 1;
40✔
493
        tableFrom = pThreadInfo->end_table_to + 1;
40✔
494
        // create conn
495
        if (initQueryConn(pThreadInfo, iface)){
40✔
NEW
496
            break;
×
497
        }
498
        int code = pthread_create(pidsOfSub + i, NULL, stbQueryThread, pThreadInfo);
40✔
499
        if (code != 0) {
40✔
NEW
500
            errorPrint("failed stbQueryThread create. error code =%d \n", code);
×
NEW
501
            break;
×
502
        }
503
        threadCnt ++;
40✔
504
    }
505

506
    bool needExit = false;
11✔
507
    // if failed, set termainte flag true like ctrl+c exit
508
    if (threadCnt != nConcurrent  ) {
11✔
NEW
509
        needExit = true;
×
NEW
510
        g_arguments->terminate = true;
×
NEW
511
        goto OVER;
×
512
    }
513

514
    // reset total
515
    g_queryInfo.superQueryInfo.totalQueried = 0;
11✔
516
    g_queryInfo.superQueryInfo.totalFail    = 0;
11✔
517

518
    // real thread count
519
    g_queryInfo.superQueryInfo.threadCnt = threadCnt;
11✔
520
    int64_t start = toolsGetTimestampUs();
11✔
521

522
    for (int i = 0; i < threadCnt; i++) {
51✔
523
        pthread_join(pidsOfSub[i], NULL);
40✔
524
        qThreadInfo *pThreadInfo = threadInfos + i;
40✔
525
        // add succ
526
        g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nSucc;
40✔
527
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
40✔
528
            // "yes" need add fail cnt
529
            g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nFail;
6✔
530
            g_queryInfo.superQueryInfo.totalFail    += pThreadInfo->nFail;
6✔
531
        }
532

533
        // close conn
534
        closeQueryConn(pThreadInfo, iface);
40✔
535
    }
536
    int64_t end = toolsGetTimestampUs();
11✔
537

538
    if (needExit) {
11✔
NEW
539
        goto OVER;
×
540
    }
541

542
    // total show
543
    totalChildQuery(threadInfos, threadCnt, end - start);
11✔
544

545
    ret = 0;
11✔
546

547
OVER:
11✔
548
    tmfree((char *)pidsOfSub);
11✔
549
    tmfree((char *)threadInfos);
11✔
550

551
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
91✔
552
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
80✔
553
    }
554
    tmfree(g_queryInfo.superQueryInfo.childTblName);
11✔
555
    return ret;
11✔
556
}
557

558
//
559
// specQuery
560
//
561
static int specQuery(uint16_t iface, char* dbName) {
14✔
562
    int ret = -1;
14✔
563
    pthread_t    *pids = NULL;
14✔
564
    qThreadInfo *infos = NULL;
14✔
565
    int    nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
14✔
566
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
14✔
567
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
14✔
568
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
14✔
569

570
    // check invaid
571
    if(nSqlCount == 0 || nConcurrent == 0 ) {
14✔
UNCOV
572
        if(nSqlCount == 0)
×
UNCOV
573
           warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount);
×
UNCOV
574
        if(nConcurrent == 0)
×
NEW
575
           warnPrint("nConcurrent is %d , specified_table_query->nConcurrent is zero. \n", nConcurrent);
×
UNCOV
576
        return 0;
×
577
    }
578

579
    // malloc threads memory
580
    pids  = benchCalloc(1, nConcurrent * sizeof(pthread_t),  false);
14✔
581
    infos = benchCalloc(1, nConcurrent * sizeof(qThreadInfo), false);
14✔
582

583
    for (uint64_t i = 0; i < nSqlCount; i++) {
58✔
584
        if( g_arguments->terminate ) {
46✔
NEW
585
            break;
×
586
        }
587

588
        // reset
589
        memset(pids,  0, nConcurrent * sizeof(pthread_t));
46✔
590
        memset(infos, 0, nConcurrent * sizeof(qThreadInfo));
46✔
591

592
        // get execute sql
593
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
46✔
594

595
        // create threads
596
        int threadCnt = 0;
46✔
597
        for (int j = 0; j < nConcurrent; j++) {
172✔
598
           qThreadInfo *pThreadInfo = infos + j;
126✔
599
           pThreadInfo->threadID = i * nConcurrent + j;
126✔
600
           pThreadInfo->querySeq = i;
126✔
601

602
           // create conn
603
           if (initQueryConn(pThreadInfo, iface)) {
126✔
NEW
604
               break;
×
605
           }
606

607
           int code = pthread_create(pids + j, NULL, specQueryThread, pThreadInfo);
126✔
608
           if (code != 0) {
126✔
NEW
609
               errorPrint("failed specQueryThread create. error code =%d \n", code);
×
NEW
610
               break;
×
611
           }
612
           threadCnt++;
126✔
613
        }
614

615
        bool needExit = false;
46✔
616
        // if failed, set termainte flag true like ctrl+c exit
617
        if (threadCnt != nConcurrent  ) {
46✔
NEW
618
            needExit = true;
×
UNCOV
619
            g_arguments->terminate = true;
×
620
        }
621

622
        int64_t start = toolsGetTimestampUs();
46✔
623
        // wait threads execute finished one by one
624
        for (int j = 0; j < threadCnt ; j++) {
172✔
625
           pthread_join(pids[j], NULL);
126✔
626
           qThreadInfo *pThreadInfo = infos + j;
126✔
627
           closeQueryConn(pThreadInfo, iface);
126✔
628

629
           // need exit in loop
630
           if (needExit) {
126✔
631
                // free BArray
NEW
632
                benchArrayDestroy(pThreadInfo->query_delay_list);
×
UNCOV
633
                pThreadInfo->query_delay_list = NULL;
×
634
           }
635
        }
636
        int64_t spend = toolsGetTimestampUs() - start;
46✔
637
        if(spend == 0) {
46✔
638
            // avoid xx/spend expr throw error
NEW
639
            spend = 1;
×
640
        }
641

642
        // create 
643
        if (needExit) {
46✔
NEW
644
            errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
NEW
645
            goto OVER;
×
646
        }
647

648
        //
649
        // show QPS and P90 ...
650
        //
651
        uint64_t n = 0;
46✔
652
        double  total_delays = 0.0;
46✔
653
        uint64_t totalQueried = 0;
46✔
654
        uint64_t totalFail    = 0;
46✔
655
        for (int j = 0; j < threadCnt; j++) {
172✔
656
           qThreadInfo *pThreadInfo = infos + j;
126✔
657
           if(pThreadInfo->query_delay_list == NULL) {
126✔
NEW
658
                continue;;
×
659
           }
660
           
661
           // total one sql
662
           for (uint64_t k = 0; k < pThreadInfo->query_delay_list->size; k++) {
1,845✔
663
                int64_t * delay = benchArrayGet(pThreadInfo->query_delay_list, k);
1,719✔
664
                sql->delay_list[n++] = *delay;
1,719✔
665
                total_delays += *delay;
1,719✔
666
           }
667

668
           // total queries
669
           totalQueried += pThreadInfo->nSucc;
126✔
670
           if (g_arguments->continueIfFail == YES_IF_FAILED) {
126✔
671
                totalQueried += pThreadInfo->nFail;
7✔
672
                totalFail    += pThreadInfo->nFail;
7✔
673
           }
674

675
           // free BArray query_delay_list
676
           benchArrayDestroy(pThreadInfo->query_delay_list);
126✔
677
           pThreadInfo->query_delay_list = NULL;
126✔
678
        }
679

680
        // appand current sql
681
        g_queryInfo.specifiedQueryInfo.totalQueried += totalQueried;
46✔
682
        g_queryInfo.specifiedQueryInfo.totalFail    += totalFail;
46✔
683

684
        // succ is zero
685
        if(totalQueried == 0 || n == 0) {
46✔
686
            errorPrint("%s", "succ queries count is zero.\n");
2✔
687
            goto OVER;
2✔
688
        }
689

690
        qsort(sql->delay_list, n, sizeof(uint64_t), compare);
44✔
691
        int32_t bufLen = strlen(sql->command) + 512;
44✔
692
        char * buf = benchCalloc(bufLen, sizeof(char), false);
44✔
693
        snprintf(buf , bufLen, "complete query with %d threads and %" PRIu64 " "
44✔
694
                             "sql %"PRIu64" spend %.6fs QPS: %.3f "
695
                             "query delay "
696
                             "avg: %.6fs "
697
                             "min: %.6fs "
698
                             "max: %.6fs "
699
                             "p90: %.6fs "
700
                             "p95: %.6fs "
701
                             "p99: %.6fs "
702
                             "SQL command: %s \n",
703
                             threadCnt, totalQueried,
704
                             i + 1, spend/1E6, totalQueried / (spend/1E6),
44✔
705
                             total_delays/n/1E6,           /* avg */
44✔
706
                             sql->delay_list[0] / 1E6,     /* min */
44✔
707
                             sql->delay_list[n - 1] / 1E6, /* max */
44✔
708
                             /*  p90 */
709
                             sql->delay_list[(uint64_t)(n * 0.90)] / 1E6,
44✔
710
                             /*  p95 */
711
                             sql->delay_list[(uint64_t)(n * 0.95)] / 1E6,
44✔
712
                             /*  p99 */
713
                             sql->delay_list[(uint64_t)(n * 0.99)] / 1E6, 
44✔
714
                             sql->command);
715

716
        infoPrintNoTimestamp("%s", buf);
44✔
717
        infoPrintNoTimestampToFile("%s", buf);
44✔
718
        tmfree(buf);
44✔
719
    }
720
    ret = 0;
12✔
721

722
OVER:
14✔
723
    tmfree((char *)pids);
14✔
724
    tmfree((char *)infos);
14✔
725

726
    // free specialQueryInfo
727
    freeSpecialQueryInfo();
14✔
728
    return ret;
14✔
729
}
730

731
//
732
// specQueryMix
733
//
734
static int specQueryMix(uint16_t iface, char* dbName) {
4✔
735
    // init
736
    int ret            = -1;
4✔
737
    int nConcurrent    = g_queryInfo.specifiedQueryInfo.concurrent;
4✔
738
    pthread_t * pids   = benchCalloc(nConcurrent, sizeof(pthread_t), true);
4✔
739
    qThreadInfo *infos = benchCalloc(nConcurrent, sizeof(qThreadInfo), true);
4✔
740

741
    // concurent calc
742
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
4✔
743
    int start_sql     = 0;
4✔
744
    int a             = total_sql_num / nConcurrent;
4✔
745
    if (a < 1) {
4✔
746
        warnPrint("sqls num:%d < concurent:%d, so set concurrent to %d\n", total_sql_num, nConcurrent, nConcurrent);
2✔
747
        nConcurrent = total_sql_num;
2✔
748
        a = 1;
2✔
749
    }
750
    int b = 0;
4✔
751
    if (nConcurrent != 0) {
4✔
752
        b = total_sql_num % nConcurrent;
4✔
753
    }
754

755
    //
756
    // running
757
    //
758
    int threadCnt = 0;
4✔
759
    for (int i = 0; i < nConcurrent; ++i) {
13✔
760
        qThreadInfo *pThreadInfo = infos + i;
9✔
761
        pThreadInfo->threadID    = i;
9✔
762
        pThreadInfo->start_sql   = start_sql;
9✔
763
        pThreadInfo->end_sql     = i < b ? start_sql + a : start_sql + a - 1;
9✔
764
        start_sql = pThreadInfo->end_sql + 1;
9✔
765
        pThreadInfo->total_delay = 0;
9✔
766

767
        // create conn
768
        if (initQueryConn(pThreadInfo, iface)){
9✔
NEW
769
            break;
×
770
        }
771
        // main run
772
        int code = pthread_create(pids + i, NULL, specQueryMixThread, pThreadInfo);
9✔
773
        if (code != 0) {
9✔
NEW
774
            errorPrint("failed specQueryMixThread create. error code =%d \n", code);
×
NEW
775
            break;
×
776
        }
777
        
778
        threadCnt ++;
9✔
779
    }
780

781
    bool needExit = false;
4✔
782
    // if failed, set termainte flag true like ctrl+c exit
783
    if (threadCnt != nConcurrent) {
4✔
NEW
784
        needExit = true;
×
NEW
785
        g_arguments->terminate = true;
×
786
    }
787

788
    // reset total
789
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
4✔
790
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
4✔
791

792
    int64_t start = toolsGetTimestampUs();
4✔
793
    for (int i = 0; i < threadCnt; ++i) {
13✔
794
        pthread_join(pids[i], NULL);
9✔
795
        qThreadInfo *pThreadInfo = infos + i;
9✔
796
        closeQueryConn(pThreadInfo, iface);
9✔
797

798
        // total queries
799
        g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nSucc;
9✔
800
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
9✔
801
            // yes need add failed count
802
            g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nFail;
2✔
803
            g_queryInfo.specifiedQueryInfo.totalFail    += pThreadInfo->nFail;
2✔
804
        }
805

806
        // destory
807
        if (needExit) {
9✔
NEW
808
            benchArrayDestroy(pThreadInfo->query_delay_list);
×
NEW
809
            pThreadInfo->query_delay_list = NULL;
×
810
        }
811
    }
812
    int64_t end = toolsGetTimestampUs();
4✔
813

814
    // create 
815
    if (needExit) {
4✔
NEW
816
        errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
NEW
817
        goto OVER;
×
818
    }
819

820
    // statistic
821
    totalChildQuery(infos, threadCnt, end - start);
4✔
822
    ret = 0;
4✔
823

824
OVER:
4✔
825
    tmfree(pids);
4✔
826
    tmfree(infos);
4✔
827

828
    // free sqls
829
    freeSpecialQueryInfo();
4✔
830

831
    return ret;
4✔
832
}
833

834
// total query for end 
835
void totalQuery(int64_t spends) {
27✔
836
    // total QPS
837
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
27✔
838
        + g_queryInfo.superQueryInfo.totalQueried;
27✔
839

840
    // error rate
841
    char errRate[128] = "";
27✔
842
    if(g_arguments->continueIfFail == YES_IF_FAILED) {
27✔
843
        uint64_t totalFail = g_queryInfo.specifiedQueryInfo.totalFail + g_queryInfo.superQueryInfo.totalFail;
5✔
844
        if (totalQueried > 0) {
5✔
845
            snprintf(errRate, sizeof(errRate), " ,error %" PRIu64 " (rate:%.3f%%)", totalFail, ((float)totalFail * 100)/totalQueried);
5✔
846
        }
847
    }
848

849
    // show
850
    double  tInS = (double)spends / 1000;
27✔
851
    char buf[512] = "";
27✔
852
    snprintf(buf, sizeof(buf),
27✔
853
                "Spend %.4f second completed total queries: %" PRIu64
854
                ", the QPS of all threads: %10.3f%s\n\n",
855
                tInS, totalQueried, (double)totalQueried / tInS, errRate);
27✔
856
    infoPrint("%s", buf);
27✔
857
    infoPrintToFile("%s", buf);
27✔
858
}
27✔
859

860
int queryTestProcess() {
29✔
861
    prompt(0);
29✔
862

863
    if (REST_IFACE == g_queryInfo.iface) {
29✔
864
        encodeAuthBase64();
9✔
865
    }
866

867
    // kill sql for executing seconds over "kill_slow_query_threshold"
868
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
29✔
NEW
869
        int32_t ret = killSlowQuery();
×
NEW
870
        if (ret != 0) {
×
NEW
871
            return ret;
×
872
        }
873
    }
874

875
    // covert addr
876
    if (g_queryInfo.iface == REST_IFACE) {
29✔
877
        if (convertHostToServAddr(g_arguments->host,
9✔
878
                    g_arguments->port + TSDB_PORT_HTTP,
9✔
879
                    &(g_arguments->serv_addr)) != 0) {
9✔
880
            errorPrint("%s", "convert host to server address\n");
×
881
            return -1;
×
882
        }
883
    }
884

885
    // fetch child name if super table
886
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
29✔
887
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
11✔
888
        int32_t ret = fetchChildTableName(g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
11✔
889
        if (ret != 0) {
11✔
NEW
890
            errorPrint("fetchChildTableName dbName=%s stb=%s failed.", g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
×
UNCOV
891
            return -1;
×
892
        }
893
    }
894

895
    // 
896
    // start running
897
    //
898

899
    
900
    uint64_t startTs = toolsGetTimestampMs();
29✔
901
    if(g_queryInfo.specifiedQueryInfo.sqls && g_queryInfo.specifiedQueryInfo.sqls->size > 0) {
29✔
902
        // specified table
903
        if (g_queryInfo.specifiedQueryInfo.mixed_query) {
18✔
904
            // mixed
905
            if (specQueryMix(g_queryInfo.iface, g_queryInfo.dbName)) {
4✔
NEW
906
                return -1;
×
907
            }
908
        } else {
909
            // no mixied
910
            if (specQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
14✔
911
                return -1;
2✔
912
            }
913
        }
914
    } else if(g_queryInfo.superQueryInfo.sqlCount > 0) {
11✔
915
        // super table
916
        if (stbQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
11✔
UNCOV
917
            return -1;
×
918
        }
919
    } else {
920
        // nothing
NEW
921
        errorPrint("%s\n", "Both 'specified_table_query' and 'super_table_query' sqls is empty.");
×
UNCOV
922
        return -1;
×
923
    }
924

925
    // total 
926
    totalQuery(toolsGetTimestampMs() - startTs); 
27✔
927
    return 0;
27✔
928
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc