• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3534

21 Nov 2024 07:36AM UTC coverage: 60.825% (+2.0%) from 58.848%
#3534

push

travis-ci

web-flow
Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log

ehn:add-sync-heartbeat-sent-time-to-log

120023 of 252376 branches covered (47.56%)

Branch coverage included in aggregate %.

43 of 47 new or added lines in 3 files covered. (91.49%)

2254 existing lines in 162 files now uncovered.

200876 of 275203 relevant lines covered (72.99%)

16110754.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/source/dnode/vnode/src/tq/tqPush.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "vnd.h"
18

19
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
13,058,550✔
20
  if (taosHashGetSize(pTq->pPushMgr) <= 0) {
13,058,550✔
21
    return 0;
13,030,160✔
22
  }
23
  SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME_PUSH};
28,250✔
24
  msg.pCont = rpcMallocCont(sizeof(SMsgHead));
28,250✔
25
  if (msg.pCont == NULL) {
28,268!
26
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
27
  }
28
  msg.contLen = sizeof(SMsgHead);
28,268✔
29
  SMsgHead *pHead = msg.pCont;
28,268✔
30
  pHead->vgId = TD_VID(pTq->pVnode);
28,268✔
31
  pHead->contLen = msg.contLen;
28,268✔
32
  int32_t code = tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
28,268✔
33
  if (code != 0){
28,268!
34
    tqError("vgId:%d failed to push msg to queue, code:%d", TD_VID(pTq->pVnode), code);
×
35
    rpcFreeCont(msg.pCont);
×
36
  }
37
  return code;
28,268✔
38
}
39

40
int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
13,365,610✔
41
  int32_t code = 0;
13,365,610✔
42
  if (msgType == TDMT_VND_SUBMIT) {
13,365,610✔
43
    code = tqProcessSubmitReqForSubscribe(pTq);
13,058,556✔
44
    if (code != 0){
13,058,405!
45
      tqError("vgId:%d failed to process submit request for subscribe, code:%d", TD_VID(pTq->pVnode), code);
×
46
    }
47
  }
48

49
  streamMetaRLock(pTq->pStreamMeta);
13,365,459✔
50
  int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
13,366,383✔
51
  streamMetaRUnLock(pTq->pStreamMeta);
13,365,942✔
52

53
//  tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);
54

55
  // push data for stream processing:
56
  // 1. the vnode has already been restored.
57
  // 2. the vnode should be the leader.
58
  // 3. the stream is not suspended yet.
59
  if ((!tsDisableStream) && (numOfTasks > 0)) {
13,366,286!
60
    code = tqScanWalAsync(pTq, true);
455,668✔
61
  }
62

63
  return code;
13,366,369✔
64
}
65

66
int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
30,213✔
67
  int32_t    vgId = TD_VID(pTq->pVnode);
30,213✔
68
  STqHandle* pHandle = (STqHandle*)handle;
30,213✔
69

70
  if (pHandle->msg == NULL) {
30,213!
71
    pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
30,213✔
72
    if (pHandle->msg == NULL) {
30,213!
73
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
74
    }
75
    (void)memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
30,213✔
76
    pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
30,213✔
77
    if (pHandle->msg->pCont == NULL) {
30,213!
78
      taosMemoryFree(pHandle->msg);
×
79
      pHandle->msg = NULL;
×
80
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
81
    }
82
  } else {
UNCOV
83
    tqPushEmptyDataRsp(pHandle, vgId);
×
84

UNCOV
85
    void* tmp = pHandle->msg->pCont;
×
UNCOV
86
    (void)memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
×
UNCOV
87
    pHandle->msg->pCont = tmp;
×
88
  }
89

90
  (void)memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
30,213✔
91
  pHandle->msg->contLen = pMsg->contLen;
30,213✔
92
  int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
30,213✔
93
  tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
30,213!
94
          pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
95
  if (ret != 0) {
30,213!
UNCOV
96
    rpcFreeCont(pHandle->msg->pCont);
×
UNCOV
97
    taosMemoryFree(pHandle->msg);
×
UNCOV
98
    pHandle->msg = NULL;
×
99
  }
100
  return ret;
30,213✔
101
}
102

103
void tqUnregisterPushHandle(STQ* pTq, void *handle) {
3,601✔
104
  STqHandle *pHandle = (STqHandle*)handle;
3,601✔
105
  int32_t    vgId = TD_VID(pTq->pVnode);
3,601✔
106

107
  if(taosHashGetSize(pTq->pPushMgr) <= 0) {
3,601✔
108
    return;
2,821✔
109
  }
110
  int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
780✔
111
  tqInfo("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
780!
112

113
  if(ret == 0 && pHandle->msg != NULL) {
780!
114
    tqPushEmptyDataRsp(pHandle, vgId);
760✔
115

116
    rpcFreeCont(pHandle->msg->pCont);
760✔
117
    taosMemoryFree(pHandle->msg);
760✔
118
    pHandle->msg = NULL;
760✔
119
  }
120
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc