• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 5041216934

pending completion
5041216934

push

github

GitHub
Merge pull request #657 from taosdata/fix/TD-24314

6 of 6 new or added lines in 1 file covered. (100.0%)

10915 of 13598 relevant lines covered (80.27%)

226908.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.37
/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14

15
extern int g_majorVersionOfClient;
16

17
/*
 * Execute one query on behalf of a worker thread.
 *
 * With the REST interface the command is posted through postProceSql();
 * otherwise the database is selected on the thread's native connection,
 * the command is run with taos_query() and a successful result set is
 * handed to fetchResult().
 *
 * Returns:
 *    0  on success, or when a native query failed but
 *       g_arguments->continueIfFail is YES_IF_FAILED;
 *   -1  when termination was requested or the query failed;
 *   -2  when the database could not be selected.
 */
int selectAndGetResult(threadInfo *pThreadInfo, char *command) {
    if (g_arguments->terminate) {
        return -1;
    }

    uint32_t threadID = pThreadInfo->threadID;
    char dbName[TSDB_DB_NAME_LEN] = {0};
    tstrncpy(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN);

    /* REST path: a single HTTP round trip, no result set to release. */
    if (g_queryInfo.iface == REST_IFACE) {
        int retCode = postProceSql(command, g_queryInfo.dbName, 0, REST_IFACE,
                                   0, g_arguments->port, false,
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
        if (0 == retCode) {
            return 0;
        }
        errorPrint("====restful return fail, threadID[%u]\n",
                   threadID);
        return -1;
    }

    /* Native path. */
    TAOS *taos = pThreadInfo->conn->taos;
    if (taos_select_db(taos, g_queryInfo.dbName)) {
        errorPrint("thread[%u]: failed to select database(%s)\n",
            threadID, dbName);
        return -2;
    }

    int status = 0;
    TAOS_RES *res = taos_query(taos, command);
    int code = taos_errno(res);
    if (res != NULL && !code) {
        fetchResult(res, pThreadInfo);
    } else if (YES_IF_FAILED == g_arguments->continueIfFail) {
        /* Failure is tolerated: warn only, status stays 0. */
        warnPrint("failed to execute sql:%s, "
                  "code: 0x%08x, reason:%s\n",
                   command, code, taos_errstr(res));
    } else {
        errorPrint("failed to execute sql:%s, "
                   "code: 0x%08x, reason:%s\n",
                   command, code, taos_errstr(res));
        status = -1;
    }
    taos_free_result(res);
    return status;
}
66

67
static void *mixedQuery(void *sarg) {
2✔
68
    queryThreadInfo *pThreadInfo = (queryThreadInfo*)sarg;
2✔
69
#ifdef LINUX
70
    prctl(PR_SET_NAME, "mixedQuery");
2✔
71
#endif
72
    int64_t lastPrintTs = toolsGetTimestampMs();
2✔
73
    int64_t st;
74
    int64_t et;
75
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
2✔
76
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
4✔
77
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
2✔
78
        for (int j = 0; j < queryTimes; ++j) {
6✔
79
            if (g_arguments->terminate) {
4✔
80
                return NULL;
×
81
            }
82
            if (g_queryInfo.reset_query_cache) {
4✔
83
                if (queryDbExecCall(pThreadInfo->conn,
×
84
                                    "RESET QUERY CACHE")) {
85
                    errorPrint("%s() LN%d, reset query cache failed\n",
×
86
                               __func__, __LINE__);
87
                    return NULL;
×
88
                }
89
            }
90
            st = toolsGetTimestampUs();
4✔
91
            if (g_queryInfo.iface == REST_IFACE) {
4✔
92
                int retCode = postProceSql(sql->command, g_queryInfo.dbName,
×
93
                                           0, g_queryInfo.iface, 0,
×
94
                                           g_arguments->port,
×
95
                                           false, pThreadInfo->sockfd, "");
96
                if (retCode) {
×
97
                    errorPrint("thread[%d]: restful query <%s> failed\n",
×
98
                            pThreadInfo->threadId, sql->command);
99
                    continue;
×
100
                }
101
            } else {
102
                if (g_queryInfo.dbName != NULL) {
4✔
103
                    if (taos_select_db(pThreadInfo->conn->taos,
4✔
104
                                       g_queryInfo.dbName)) {
4✔
105
                        errorPrint("thread[%d]: failed to "
×
106
                                   "select database(%s)\n",
107
                                   pThreadInfo->threadId,
108
                                   g_queryInfo.dbName);
109
                        return NULL;
×
110
                    }
111
                }
112
                TAOS_RES *res = taos_query(pThreadInfo->conn->taos,
4✔
113
                                           sql->command);
4✔
114
                if (res == NULL || taos_errno(res) != 0) {
4✔
115
                    if (YES_IF_FAILED == g_arguments->continueIfFail) {
×
116
                        warnPrint(
×
117
                                "thread[%d]: failed to execute sql :%s, "
118
                                "code: 0x%x, reason: %s\n",
119
                                pThreadInfo->threadId,
120
                                sql->command,
121
                                taos_errno(res), taos_errstr(res));
122
                    } else {
123
                        errorPrint(
×
124
                                "thread[%d]: failed to execute sql :%s, "
125
                                "code: 0x%x, reason: %s\n",
126
                                pThreadInfo->threadId,
127
                                sql->command,
128
                                taos_errno(res), taos_errstr(res));
129
                        if (TSDB_CODE_RPC_NETWORK_UNAVAIL ==
×
130
                                taos_errno(res)) {
×
131
                            return NULL;
×
132
                        }
133
                    }
134
                    continue;
×
135
                }
136
                taos_free_result(res);
4✔
137
            }
138
            et = toolsGetTimestampUs();
4✔
139
            int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
4✔
140
            *delay = et - st;
4✔
141
            debugPrint("%s() LN%d, delay: %"PRId64"\n",
4✔
142
                       __func__, __LINE__, *delay);
143

144
            pThreadInfo->total_delay += (et - st);
4✔
145
            if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
4✔
146
                tmfree(delay);
×
147
            }
148
            int64_t currentPrintTs = toolsGetTimestampMs();
4✔
149
            if (currentPrintTs - lastPrintTs > 10 * 1000) {
4✔
150
                infoPrint("thread[%d] has currently complete query %d times\n",
×
151
                        pThreadInfo->threadId,
152
                        (int)pThreadInfo->query_delay_list->size);
153
                lastPrintTs = currentPrintTs;
×
154
            }
155
        }
156
    }
157
    return NULL;
2✔
158
}
159

160
static void *specifiedTableQuery(void *sarg) {
107✔
161
    threadInfo *pThreadInfo = (threadInfo *)sarg;
107✔
162
#ifdef LINUX
163
    prctl(PR_SET_NAME, "specTableQuery");
107✔
164
#endif
165
    uint64_t st = 0;
107✔
166
    uint64_t et = 0;
107✔
167
    uint64_t minDelay = UINT64_MAX;
107✔
168
    uint64_t maxDelay = 0;
107✔
169
    uint64_t totalDelay = 0;
107✔
170
    int32_t  index = 0;
107✔
171

172
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
107✔
173
    pThreadInfo->query_delay_list = benchCalloc(queryTimes,
107✔
174
            sizeof(uint64_t), false);
175
    uint64_t  lastPrintTime = toolsGetTimestampMs();
107✔
176
    uint64_t  startTs = toolsGetTimestampMs();
107✔
177

178
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls,
107✔
179
            pThreadInfo->querySeq);
180

181
    if (sql->result[0] != '\0') {
107✔
182
        snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
102✔
183
                sql->result, pThreadInfo->threadID);
102✔
184
    }
185

186
    while (index < queryTimes) {
298✔
187
        if (g_queryInfo.specifiedQueryInfo.queryInterval
191✔
188
            && (et - st) <
190✔
189
                (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval*1000) {
190✔
190
            toolsMsleep((int32_t)(
108✔
191
                        g_queryInfo.specifiedQueryInfo.queryInterval*1000
108✔
192
                        - (et - st)));  // ms
108✔
193
        }
194
        if (g_queryInfo.reset_query_cache) {
191✔
195
            if (queryDbExecCall(pThreadInfo->conn,
6✔
196
                                "RESET QUERY CACHE")) {
197
                errorPrint("%s() LN%d, reset query cache failed\n",
×
198
                           __func__, __LINE__);
199
                return NULL;
×
200
            }
201
        }
202

203
        st = toolsGetTimestampUs();
191✔
204
        int ret = selectAndGetResult(pThreadInfo, sql->command);
191✔
205
        if (ret) {
191✔
206
            g_fail = true;
12✔
207
        }
208

209
        et = toolsGetTimestampUs();
191✔
210
        int64_t delay = et - st;
191✔
211
        debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, delay);
191✔
212

213
        if (ret == 0) {
191✔
214
            pThreadInfo->query_delay_list[index] = delay;
179✔
215
            pThreadInfo->totalQueried++;
179✔
216
        }
217
        index++;
191✔
218
        totalDelay += delay;
191✔
219
        if (delay > maxDelay) {
191✔
220
            maxDelay = delay;
122✔
221
        }
222
        if (delay < minDelay) {
191✔
223
            minDelay = delay;
176✔
224
        }
225

226
        uint64_t currentPrintTime = toolsGetTimestampMs();
191✔
227
        uint64_t endTs = toolsGetTimestampMs();
191✔
228

229
        if ((ret == 0) && (currentPrintTime - lastPrintTime > 30 * 1000)) {
191✔
230
            infoPrint(
×
231
                    "thread[%d] has currently completed queries: %" PRIu64
232
                    ", QPS: %10.6f\n",
233
                    pThreadInfo->threadID, pThreadInfo->totalQueried,
234
                    (double)(pThreadInfo->totalQueried /
235
                        ((endTs - startTs) / 1000.0)));
236
            lastPrintTime = currentPrintTime;
×
237
        }
238

239
        if (-2 == ret) {
191✔
240
            toolsMsleep(1000);
×
241
            return NULL;
×
242
        }
243
    }
244
    qsort(pThreadInfo->query_delay_list, queryTimes,
107✔
245
            sizeof(uint64_t), compare);
246
    pThreadInfo->avg_delay = (double)totalDelay / queryTimes;
107✔
247
    return NULL;
107✔
248
}
249

250
static void *superTableQuery(void *sarg) {
38✔
251
    char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
38✔
252
    threadInfo *pThreadInfo = (threadInfo *)sarg;
38✔
253
#ifdef LINUX
254
    prctl(PR_SET_NAME, "superTableQuery");
38✔
255
#endif
256

257
    uint64_t st = 0;
38✔
258
    uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000;
38✔
259

260
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
38✔
261
    uint64_t startTs = toolsGetTimestampMs();
38✔
262

263
    uint64_t lastPrintTime = toolsGetTimestampMs();
38✔
264
    while (queryTimes--) {
122✔
265
        if (g_queryInfo.superQueryInfo.queryInterval
84✔
266
            && ((et - st) <
83✔
267
                (int64_t)g_queryInfo.superQueryInfo.queryInterval*1000)) {
83✔
268
            toolsMsleep((int32_t)
46✔
269
                        (g_queryInfo.superQueryInfo.queryInterval*1000
46✔
270
                        - (et - st)));
46✔
271
        }
272

273
        st = toolsGetTimestampMs();
84✔
274
        for (int i = (int)pThreadInfo->start_table_from;
84✔
275
             i <= pThreadInfo->end_table_to; i++) {
218✔
276
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
6,208✔
277
                memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN);
6,074✔
278
                replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr,
6,074✔
279
                                    i);
280
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
6,074✔
281
                    snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
6,074✔
282
                            g_queryInfo.superQueryInfo.result[j],
6,074✔
283
                            pThreadInfo->threadID);
284
                }
285
                if (selectAndGetResult(pThreadInfo, sqlstr)) {
6,074✔
286
                    g_fail = true;
×
287
                }
288

289
                pThreadInfo->totalQueried++;
6,074✔
290

291
                int64_t currentPrintTime = toolsGetTimestampMs();
6,074✔
292
                int64_t endTs = toolsGetTimestampMs();
6,074✔
293
                if (currentPrintTime - lastPrintTime > 30 * 1000) {
6,074✔
294
                    infoPrint(
×
295
                        "thread[%d] has currently completed queries: %" PRIu64
296
                        ", QPS: %10.3f\n",
297
                        pThreadInfo->threadID, pThreadInfo->totalQueried,
298
                        (double)(pThreadInfo->totalQueried /
299
                                 ((endTs - startTs) / 1000.0)));
300
                    lastPrintTime = currentPrintTime;
×
301
                }
302
            }
303
        }
304
        et = toolsGetTimestampMs();
84✔
305
    }
306
    tmfree(sqlstr);
38✔
307
    return NULL;
38✔
308
}
309

310
static int multi_thread_super_table_query(uint16_t iface, char* dbName) {
12✔
311
    int ret = -1;
12✔
312
    pthread_t * pidsOfSub = NULL;
12✔
313
    threadInfo *infosOfSub = NULL;
12✔
314
    //==== create sub threads for query from all sub table of the super table
315
    if ((g_queryInfo.superQueryInfo.sqlCount > 0)
12✔
316
            && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
20✔
317
        pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
10✔
318
                                *sizeof(pthread_t),
319
                                false);
320
        infosOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
10✔
321
                                 *sizeof(threadInfo), false);
322

323
        int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
10✔
324
        int     threads = g_queryInfo.superQueryInfo.threadCnt;
10✔
325

326
        int64_t a = ntables / threads;
10✔
327
        if (a < 1) {
10✔
328
            threads = (int)ntables;
2✔
329
            a = 1;
2✔
330
        }
331

332
        int64_t b = 0;
10✔
333
        if (threads != 0) {
10✔
334
            b = ntables % threads;
10✔
335
        }
336

337
        uint64_t tableFrom = 0;
10✔
338
        for (int i = 0; i < threads; i++) {
48✔
339
            threadInfo *pThreadInfo = infosOfSub + i;
38✔
340
            pThreadInfo->threadID = i;
38✔
341
            pThreadInfo->start_table_from = tableFrom;
38✔
342
            pThreadInfo->ntables = i < b ? a + 1 : a;
38✔
343
            pThreadInfo->end_table_to =
38✔
344
                    i < b ? tableFrom + a : tableFrom + a - 1;
38✔
345
            tableFrom = pThreadInfo->end_table_to + 1;
38✔
346
            if (iface == REST_IFACE) {
38✔
347
                int sockfd = createSockFd();
5✔
348
                if (sockfd < 0) {
5✔
349
                    goto OVER;
×
350
                }
351
                pThreadInfo->sockfd = sockfd;
5✔
352
            } else {
353
                pThreadInfo->conn = initBenchConn();
33✔
354
                if (pThreadInfo->conn == NULL) {
33✔
355
                    goto OVER;
×
356
                }
357
            }
358
            pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
38✔
359
        }
360
        g_queryInfo.superQueryInfo.threadCnt = threads;
10✔
361

362
        for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
48✔
363
            if (!g_arguments->terminate)
38✔
364
                pthread_join(pidsOfSub[i], NULL);
38✔
365
            threadInfo *pThreadInfo = infosOfSub + i;
38✔
366
            if (iface == REST_IFACE) {
38✔
367
                destroySockFd(pThreadInfo->sockfd);
5✔
368
            } else {
369
                closeBenchConn(pThreadInfo->conn);
33✔
370
            }
371
            if (g_fail) {
38✔
372
                goto OVER;
×
373
            }
374
        }
375
        for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; ++i) {
48✔
376
            g_queryInfo.superQueryInfo.totalQueried
377
                += infosOfSub[i].totalQueried;
38✔
378
        }
379
    } else {
380
        return 0;
2✔
381
    }
382

383
    ret = 0;
10✔
384
OVER:
10✔
385
    tmfree((char *)pidsOfSub);
10✔
386
    tmfree((char *)infosOfSub);
10✔
387

388
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
80✔
389
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
70✔
390
    }
391
    tmfree(g_queryInfo.superQueryInfo.childTblName);
10✔
392
    return ret;
10✔
393
}
394

395
static int multi_thread_specified_table_query(uint16_t iface, char* dbName) {
13✔
396
    pthread_t * pids = NULL;
13✔
397
    threadInfo *infos = NULL;
13✔
398
    //==== create sub threads for query from specify table
399
    int      nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
13✔
400
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
13✔
401
    if ((nSqlCount > 0) && (nConcurrent > 0)) {
13✔
402
        pids = benchCalloc(1, nConcurrent*nSqlCount*sizeof(pthread_t), false);
11✔
403
        infos = benchCalloc(1, nConcurrent*nSqlCount*sizeof(threadInfo), false);
11✔
404
        for (uint64_t i = 0; i < nSqlCount; i++) {
48✔
405
            SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
39✔
406
            for (int j = 0; j < nConcurrent; j++) {
146✔
407
                uint64_t    seq = i * nConcurrent + j;
107✔
408
                threadInfo *pThreadInfo = infos + seq;
107✔
409
                pThreadInfo->threadID = (int)seq;
107✔
410
                pThreadInfo->querySeq = i;
107✔
411
                if (iface == REST_IFACE) {
107✔
412
                    int sockfd = createSockFd();
13✔
413
                    // int iMode = 1;
414
                    // ioctl(sockfd, FIONBIO, &iMode);
415
                    if (sockfd < 0) {
13✔
416
                        tmfree((char *)pids);
×
417
                        tmfree((char *)infos);
×
418
                        return -1;
×
419
                    }
420
                    pThreadInfo->sockfd = sockfd;
13✔
421
                } else {
422
                    pThreadInfo->conn = initBenchConn();
94✔
423
                    if (pThreadInfo->conn == NULL) {
94✔
424
                        destroySockFd(pThreadInfo->sockfd);
×
425
                        tmfree((char *)pids);
×
426
                        tmfree((char *)infos);
×
427
                        return -1;
×
428
                    }
429
                }
430

431
                pthread_create(pids + seq, NULL, specifiedTableQuery,
107✔
432
                               pThreadInfo);
433
            }
434
            for (int j = 0; j < nConcurrent; j++) {
146✔
435
                uint64_t seq = i * nConcurrent + j;
107✔
436
                pthread_join(pids[seq], NULL);
107✔
437
                threadInfo *pThreadInfo = infos + seq;
107✔
438
                if (iface == REST_IFACE) {
107✔
439
#ifdef WINDOWS
440
                    closesocket(pThreadInfo->sockfd);
441
                    WSACleanup();
442
#else
443
                    close(pThreadInfo->sockfd);
13✔
444
#endif
445
                } else {
446
                    closeBenchConn(pThreadInfo->conn);
94✔
447
                }
448
                if (g_fail) {
107✔
449
                    tmfree(pThreadInfo->query_delay_list);
6✔
450
                }
451
            }
452

453
            if (g_fail) {
39✔
454
                tmfree((char *)pids);
2✔
455
                tmfree((char *)infos);
2✔
456
                return -1;
2✔
457
            }
458
            uint64_t query_times = g_queryInfo.specifiedQueryInfo.queryTimes;
37✔
459
            uint64_t totalQueryTimes = query_times * nConcurrent;
37✔
460
            double avg_delay = 0.0;
37✔
461
            for (int j = 0; j < nConcurrent; j++) {
138✔
462
                uint64_t    seq = i * nConcurrent + j;
101✔
463
                threadInfo *pThreadInfo = infos + seq;
101✔
464
                avg_delay += pThreadInfo->avg_delay;
101✔
465
                for (uint64_t k = 0;
101✔
466
                        k < g_queryInfo.specifiedQueryInfo.queryTimes; k++) {
280✔
467
                    sql->delay_list[j*query_times + k] =
179✔
468
                        pThreadInfo->query_delay_list[k];
179✔
469
                }
470
                tmfree(pThreadInfo->query_delay_list);
101✔
471
            }
472
            avg_delay /= nConcurrent;
37✔
473
            qsort(sql->delay_list, g_queryInfo.specifiedQueryInfo.queryTimes,
37✔
474
                  sizeof(uint64_t), compare);
475
            infoPrintNoTimestamp("complete query with %d threads and %"PRIu64
37✔
476
                    " query delay "
477
                    "avg: \t%.6fs "
478
                    "min: \t%.6fs "
479
                    "max: \t%.6fs "
480
                    "p90: \t%.6fs "
481
                    "p95: \t%.6fs "
482
                    "p99: \t%.6fs "
483
                    "SQL command: %s"
484
                    "\n",
485
                      nConcurrent, query_times,
486
                      avg_delay/1E6,  /* avg */
487
                      sql->delay_list[0]/1E6, /* min */
488
                      sql->delay_list[totalQueryTimes - 1]/1E6,  /*  max */
489
                      /*  p90 */
490
                      sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)]/1E6,
491
                      /*  p95 */
492
                      sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)]/1E6,
493
                      /*  p99 */
494
                      sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)]/1E6,
495
                      sql->command);
496
            infoPrintNoTimestampToFile(g_arguments->fpOfInsertResult,
37✔
497
                    "complete query with %d threads and %"PRIu64
498
                    " query delay "
499
                    "avg: \t%.6fs "
500
                    "min: \t%.6fs "
501
                    "max: \t%.6fs "
502
                    "p90: \t%.6fs "
503
                    "p95: \t%.6fs "
504
                    "p99: \t%.6fs "
505
                    "SQL command: %s"
506
                    "\n",
507
                      nConcurrent, query_times,
508
                      avg_delay/1E6,  /* avg */
509
                      sql->delay_list[0]/1E6, /* min */
510
                      sql->delay_list[totalQueryTimes - 1]/1E6,  /*  max */
511
                      /*  p90 */
512
                      sql->delay_list[(uint64_t)(totalQueryTimes * 0.90)]/1E6,
513
                      /*  p95 */
514
                      sql->delay_list[(uint64_t)(totalQueryTimes * 0.95)]/1E6,
515
                      /*  p99 */
516
                      sql->delay_list[(uint64_t)(totalQueryTimes * 0.99)]/1E6,
517
                      sql->command);
518
        }
519
    } else {
520
        return 0;
2✔
521
    }
522

523
    g_queryInfo.specifiedQueryInfo.totalQueried =
9✔
524
        g_queryInfo.specifiedQueryInfo.queryTimes * nConcurrent;
9✔
525
    tmfree((char *)pids);
9✔
526
    tmfree((char *)infos);
9✔
527
    for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqls->size; ++i) {
26✔
528
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
17✔
529
        tmfree(sql->command);
17✔
530
        tmfree(sql->delay_list);
17✔
531
    }
532
    benchArrayDestroy(g_queryInfo.specifiedQueryInfo.sqls);
9✔
533
    return 0;
9✔
534
}
535

536
static int multi_thread_specified_mixed_query(uint16_t iface, char* dbName) {
1✔
537
    int code = -1;
1✔
538
    int thread = g_queryInfo.specifiedQueryInfo.concurrent;
1✔
539
    pthread_t * pids = benchCalloc(thread, sizeof(pthread_t), true);
1✔
540
    queryThreadInfo *infos = benchCalloc(thread, sizeof(queryThreadInfo), true);
1✔
541
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
1✔
542
    int start_sql = 0;
1✔
543
    int a = total_sql_num / thread;
1✔
544
    if (a < 1) {
1✔
545
        thread = total_sql_num;
1✔
546
        a = 1;
1✔
547
    }
548
    int b = 0;
1✔
549
    if (thread != 0) {
1✔
550
        b = total_sql_num % thread;
1✔
551
    }
552
    for (int i = 0; i < thread; ++i) {
3✔
553
        queryThreadInfo *pQueryThreadInfo = infos + i;
2✔
554
        pQueryThreadInfo->threadId = i;
2✔
555
        pQueryThreadInfo->start_sql = start_sql;
2✔
556
        pQueryThreadInfo->end_sql = i < b ? start_sql + a : start_sql + a - 1;
2✔
557
        start_sql = pQueryThreadInfo->end_sql + 1;
2✔
558
        pQueryThreadInfo->total_delay = 0;
2✔
559
        pQueryThreadInfo->query_delay_list = benchArrayInit(1, sizeof(int64_t));
2✔
560
        if (iface == REST_IFACE) {
2✔
561
            int sockfd = createSockFd();
×
562
            if (sockfd < 0) {
×
563
                goto OVER;
×
564
            }
565
            pQueryThreadInfo->sockfd = sockfd;
×
566
        } else {
567
            pQueryThreadInfo->conn = initBenchConn();
2✔
568
            if (pQueryThreadInfo->conn == NULL) {
2✔
569
                goto OVER;
×
570
            }
571
        }
572
        pthread_create(pids + i, NULL, mixedQuery, pQueryThreadInfo);
2✔
573
    }
574

575
    int64_t start = toolsGetTimestampUs();
1✔
576
    for (int i = 0; i < thread; ++i) {
3✔
577
        pthread_join(pids[i], NULL);
2✔
578
    }
579
    int64_t end = toolsGetTimestampUs();
1✔
580

581
    // statistic
582
    BArray * delay_list = benchArrayInit(1, sizeof(int64_t));
1✔
583
    int64_t total_delay = 0;
1✔
584
    for (int i = 0; i < thread; ++i) {
3✔
585
        queryThreadInfo * pThreadInfo = infos + i;
2✔
586
        benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData,
2✔
587
                pThreadInfo->query_delay_list->size);
2✔
588
        total_delay += pThreadInfo->total_delay;
2✔
589
        tmfree(pThreadInfo->query_delay_list);
2✔
590
        if (iface == REST_IFACE) {
2✔
591
#ifdef  WINDOWS
592
            closesocket(pThreadInfo->sockfd);
593
            WSACleanup();
594
#else
595
            close(pThreadInfo->sockfd);
×
596
#endif
597
        } else {
598
            closeBenchConn(pThreadInfo->conn);
2✔
599
        }
600
    }
601
    qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare);
1✔
602
    if (delay_list->size) {
1✔
603
        infoPrint(
1✔
604
                "spend %.6fs using "
605
                "%d threads complete query %d times,cd  "
606
                "min delay: %.6fs, "
607
                "avg delay: %.6fs, "
608
                "p90: %.6fs, "
609
                "p95: %.6fs, "
610
                "p99: %.6fs, "
611
                "max: %.6fs\n",
612
                (end - start)/1E6,
613
                thread, (int)delay_list->size,
614
                *(int64_t *)(benchArrayGet(delay_list, 0))/1E6,
615
                (double)total_delay/delay_list->size/1E6,
616
                *(int64_t *)(benchArrayGet(delay_list,
617
                                    (int32_t)(delay_list->size * 0.9)))/1E6,
618
                *(int64_t *)(benchArrayGet(delay_list,
619
                                    (int32_t)(delay_list->size * 0.95)))/1E6,
620
                *(int64_t *)(benchArrayGet(delay_list,
621
                                    (int32_t)(delay_list->size * 0.99)))/1E6,
622
                *(int64_t *)(benchArrayGet(delay_list,
623
                                    (int32_t)(delay_list->size - 1)))/1E6);
624
    } else {
625
        errorPrint("%s() LN%d, delay_list size: %"PRId64"\n",
×
626
                   __func__, __LINE__, (int64_t)delay_list->size);
627
    }
628
    benchArrayDestroy(delay_list);
1✔
629
    code = 0;
1✔
630
OVER:
1✔
631
    tmfree(pids);
1✔
632
    tmfree(infos);
1✔
633
    return code;
1✔
634
}
635

636
#define KILLID_LEN  20
637

638
void *queryKiller(void *arg) {
×
639
    char host[MAX_HOSTNAME_LEN] = {0};
×
640
    tstrncpy(host, g_arguments->host, MAX_HOSTNAME_LEN);
×
641

642
    while (true) {
×
643
        TAOS *taos = taos_connect(g_arguments->host, g_arguments->user,
×
644
                g_arguments->password, NULL, g_arguments->port);
×
645
        if (NULL == taos) {
×
646
            errorPrint("Slow query killer thread "
×
647
                    "failed to connect to the server %s\n",
648
                    g_arguments->host);
649
            return NULL;
×
650
        }
651

652
        char command[TSDB_MAX_ALLOWED_SQL_LEN] =
×
653
            "SELECT kill_id,exec_usec,sql FROM performance_schema.perf_queries";
654
        TAOS_RES *res = taos_query(taos, command);
×
655
        int32_t code = taos_errno(res);
×
656
        if (code) {
×
657
            printErrCmdCodeStr(command, code, res);
×
658
        }
659

660
        TAOS_ROW row = NULL;
×
661
        while ((row = taos_fetch_row(res)) != NULL) {
×
662
            int32_t *lengths = taos_fetch_lengths(res);
×
663
            if (lengths[0] <= 0) {
×
664
                infoPrint("No valid query found by %s\n", command);
×
665
            } else {
666
                int64_t execUSec = *(int64_t*)row[1];
×
667

668
                if (execUSec > g_queryInfo.killQueryThreshold * 1000000) {
×
669
                    char sql[SHORT_1K_SQL_BUFF_LEN] = {0};
×
670
                    tstrncpy(sql, (char*)row[2],
×
671
                             min(strlen((char*)row[2])+1,
672
                                 SHORT_1K_SQL_BUFF_LEN));
673

674
                    char killId[KILLID_LEN] = {0};
×
675
                    tstrncpy(killId, (char*)row[0],
×
676
                            min(strlen((char*)row[0])+1, KILLID_LEN));
677
                    char killCommand[KILLID_LEN + 15] = {0};
×
678
                    snprintf(killCommand, KILLID_LEN + 15,
×
679
                             "KILL QUERY '%s'", killId);
680
                    TAOS_RES *resKill = taos_query(taos, killCommand);
×
681
                    int32_t codeKill = taos_errno(resKill);
×
682
                    if (codeKill) {
×
683
                        printErrCmdCodeStr(killCommand, codeKill, resKill);
×
684
                    } else {
685
                        infoPrint("%s succeed, sql: %s killed!\n",
×
686
                                  killCommand, sql);
687
                        taos_free_result(resKill);
×
688
                    }
689
                }
690
            }
691
        }
692

693
        taos_free_result(res);
×
694
        taos_close(taos);
×
695
        toolsMsleep(g_queryInfo.killQueryInterval*1000);
×
696
    }
697

698
    return NULL;
699
}
700

701
int queryTestProcess() {
14✔
702
    prompt(0);
14✔
703

704
    if (REST_IFACE == g_queryInfo.iface) {
14✔
705
        encodeAuthBase64();
3✔
706
    }
707

708
    pthread_t pidKiller = {0};
14✔
709
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
14✔
710
        pthread_create(&pidKiller, NULL, queryKiller, NULL);
×
711
        pthread_join(pidKiller, NULL);
×
712
        toolsMsleep(1000);
×
713
    }
714

715
    if (g_queryInfo.iface == REST_IFACE) {
14✔
716
        if (convertHostToServAddr(g_arguments->host,
3✔
717
                    g_arguments->port + TSDB_PORT_HTTP,
3✔
718
                    &(g_arguments->serv_addr)) != 0) {
3✔
719
            errorPrint("%s", "convert host to server address\n");
×
720
            return -1;
×
721
        }
722
    }
723

724
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
14✔
725
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
12✔
726
        SBenchConn* conn = initBenchConn();
12✔
727
        if (conn == NULL) {
12✔
728
            return -1;
×
729
        }
730
        char  cmd[SHORT_1K_SQL_BUFF_LEN] = "\0";
12✔
731
        if (3 == g_majorVersionOfClient) {
12✔
732
            snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
12✔
733
                    "SELECT COUNT(*) FROM( SELECT DISTINCT(TBNAME) FROM %s.%s)",
734
                    g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
735
        } else {
736
            snprintf(cmd, SHORT_1K_SQL_BUFF_LEN,
×
737
                     "SELECT COUNT(TBNAME) FROM %s.%s",
738
                    g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
739
        }
740
        TAOS_RES *res = taos_query(conn->taos, cmd);
12✔
741
        int32_t   code = taos_errno(res);
12✔
742
        if (code) {
12✔
743
            printErrCmdCodeStr(cmd, code, res);
×
744
            closeBenchConn(conn);
×
745
            return -1;
×
746
        }
747
        TAOS_ROW    row = NULL;
12✔
748
        int         num_fields = taos_num_fields(res);
12✔
749
        TAOS_FIELD *fields = taos_fetch_fields(res);
12✔
750
        while ((row = taos_fetch_row(res)) != NULL) {
24✔
751
            if (0 == strlen((char *)(row[0]))) {
12✔
752
                errorPrint("stable %s have no child table\n",
×
753
                        g_queryInfo.superQueryInfo.stbName);
754
                taos_free_result(res);
×
755
                closeBenchConn(conn);
×
756
                return -1;
×
757
            }
758
            char temp[256] = {0};
12✔
759
            taos_print_row(temp, row, fields, num_fields);
12✔
760
            g_queryInfo.superQueryInfo.childTblCount = (int64_t)atol(temp);
12✔
761
        }
762
        infoPrint("%s's childTblCount: %" PRId64 "\n",
12✔
763
                g_queryInfo.superQueryInfo.stbName,
764
                g_queryInfo.superQueryInfo.childTblCount);
765
        taos_free_result(res);
12✔
766
        g_queryInfo.superQueryInfo.childTblName =
12✔
767
            benchCalloc(g_queryInfo.superQueryInfo.childTblCount,
12✔
768
                    sizeof(char *), false);
769
        if (getAllChildNameOfSuperTable(
12✔
770
                    conn->taos, g_queryInfo.dbName,
771
                    g_queryInfo.superQueryInfo.stbName,
772
                    g_queryInfo.superQueryInfo.childTblName,
773
                    g_queryInfo.superQueryInfo.childTblCount)) {
774
            tmfree(g_queryInfo.superQueryInfo.childTblName);
×
775
            closeBenchConn(conn);
×
776
            return -1;
×
777
        }
778
        closeBenchConn(conn);
12✔
779
    }
780
    uint64_t startTs = toolsGetTimestampMs();
14✔
781
    if (g_queryInfo.specifiedQueryInfo.mixed_query) {
14✔
782
        if (multi_thread_specified_mixed_query(g_queryInfo.iface,
1✔
783
                    g_queryInfo.dbName)) {
784
            return -1;
×
785
        }
786
    } else {
787
        if (multi_thread_specified_table_query(g_queryInfo.iface,
13✔
788
                    g_queryInfo.dbName)) {
789
            return -1;
2✔
790
        }
791
    }
792
    if (multi_thread_super_table_query(g_queryInfo.iface,
12✔
793
                g_queryInfo.dbName)) {
794
        return -1;
×
795
    }
796
    // workaround to use separate taos connection;
797
    uint64_t endTs = toolsGetTimestampMs();
12✔
798
    int64_t t = endTs - startTs;
12✔
799
    double  tInS = (double)t / 1000.0;
12✔
800
    if (g_queryInfo.specifiedQueryInfo.totalQueried)
12✔
801
        infoPrint("Total specified queries: %" PRIu64 "\n",
9✔
802
              g_queryInfo.specifiedQueryInfo.totalQueried);
803
    if (g_queryInfo.superQueryInfo.totalQueried)
12✔
804
    infoPrint("Total super queries: %" PRIu64 "\n",
10✔
805
              g_queryInfo.superQueryInfo.totalQueried);
806
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
12✔
807
        + g_queryInfo.superQueryInfo.totalQueried;
12✔
808
    infoPrint(
12✔
809
            "Spend %.4f second completed total queries: %" PRIu64
810
            ", the QPS of all threads: %10.3f\n\n",
811
            tInS, totalQueried, (double)totalQueried / tInS);
812
    infoPrintToFile(g_arguments->fpOfInsertResult,
12✔
813
            "Spend %.4f second completed total queries: %" PRIu64
814
            ", the QPS of all threads: %10.3f\n\n",
815
            tInS, totalQueried, (double)totalQueried / tInS);
816
    return 0;
12✔
817
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc