• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3561

19 Dec 2024 03:15AM UTC coverage: 58.812% (-1.3%) from 60.124%
#3561

push

travis-ci

web-flow
Merge pull request #29213 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

130770 of 287658 branches covered (45.46%)

Branch coverage included in aggregate %.

32 of 78 new or added lines in 4 files covered. (41.03%)

7347 existing lines in 166 files now uncovered.

205356 of 283866 relevant lines covered (72.34%)

7187865.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.56
/source/dnode/vnode/src/tq/tqPush.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "vnd.h"
18

19
/*
 * Queue a TDMT_VND_TMQ_CONSUME_PUSH message on the vnode's QUERY_QUEUE when at
 * least one handle is registered in the push manager.
 *
 * Returns TSDB_CODE_INVALID_MSG when pTq is NULL, 0 when no handle is
 * registered or the message was queued, an out-of-memory code when the message
 * body cannot be allocated, and otherwise the code reported by tmsgPutToQueue.
 */
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_MSG;
  }

  // no registered push handle: nothing to wake up
  if (taosHashGetSize(pTq->pPushMgr) <= 0) {
    return 0;
  }

  SRpcMsg pushMsg = {.msgType = TDMT_VND_TMQ_CONSUME_PUSH};
  pushMsg.pCont = rpcMallocCont(sizeof(SMsgHead));
  if (pushMsg.pCont == NULL) {
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
  }
  pushMsg.contLen = sizeof(SMsgHead);

  // the body consists of the message head only
  SMsgHead* pMsgHead = pushMsg.pCont;
  pMsgHead->vgId = TD_VID(pTq->pVnode);
  pMsgHead->contLen = pushMsg.contLen;

  int32_t code = tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &pushMsg);
  if (code == 0) {
    return 0;
  }

  // queueing failed: report it and release the body through the message struct
  tqError("vgId:%d failed to push msg to queue, code:%d", TD_VID(pTq->pVnode), code);
  rpcFreeCont(pushMsg.pCont);
  return code;
}
42

43
/*
 * Entry point invoked with the type of a message handled by the vnode.
 *
 * For TDMT_VND_SUBMIT the subscribe push step is run first; a failure there is
 * logged but does not stop the stream step. When streams are enabled and the
 * stream meta reports at least one task, an asynchronous WAL scan is requested.
 *
 * Returns TSDB_CODE_INVALID_MSG when pTq is NULL. Otherwise returns the code of
 * tqScanWalAsync when the scan was requested (this overwrites the code of the
 * subscribe step), or the code of the subscribe step when it was not.
 */
int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
  // Same guard as the other functions in this file; without it the error log
  // and the stream meta lock below would dereference a NULL pTq.
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t code = 0;
  if (msgType == TDMT_VND_SUBMIT) {
    code = tqProcessSubmitReqForSubscribe(pTq);
    if (code != 0) {
      tqError("vgId:%d failed to process submit request for subscribe, code:%d", TD_VID(pTq->pVnode), code);
    }
  }

  // read the task count under the stream meta read lock
  streamMetaRLock(pTq->pStreamMeta);
  int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
  streamMetaRUnLock(pTq->pStreamMeta);

  // Push data for stream processing only when streams are not globally
  // disabled and at least one stream task exists. No other condition (restore
  // state, role, suspension) is checked here.
  if ((!tsDisableStream) && (numOfTasks > 0)) {
    code = tqScanWalAsync(pTq, true);
  }

  return code;
}
68

69
/*
 * Store a private copy of the request pMsg in the handle and register the
 * handle in pTq->pPushMgr under its subKey.
 *
 * pTq    - owner of the push manager hash.
 * handle - an STqHandle*, passed as void*.
 * pMsg   - request to remember; its header and its pCont bytes (contLen of
 *          them) are copied, so the caller keeps ownership of pMsg.
 *
 * If the handle already holds a stored request, tqPushEmptyDataRsp is called
 * for it before it is replaced by the new one.
 *
 * Returns TSDB_CODE_INVALID_MSG on a NULL argument, an out-of-memory code when
 * an allocation fails, otherwise the result of taosHashPut. When taosHashPut
 * fails the stored copy is released and pHandle->msg is reset to NULL.
 */
int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
  if (pTq == NULL || handle == NULL || pMsg == NULL) {
    return TSDB_CODE_INVALID_MSG;
  }
  int32_t    vgId = TD_VID(pTq->pVnode);
  STqHandle* pHandle = (STqHandle*)handle;

  if (pHandle->msg == NULL) {
    pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
    if (pHandle->msg == NULL) {
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
    (void)memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
    pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
    if (pHandle->msg->pCont == NULL) {
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;
      return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
    }
  } else {
    void* pOldCont = pHandle->msg->pCont;
    void* pNewCont = pOldCont;

    // The stored buffer was sized for an earlier request. Reusing it for a
    // longer body would make the copy below write past its end, so obtain a
    // large enough buffer first. This is done before anything is modified so
    // that an allocation failure leaves the handle exactly as it was.
    if (pMsg->contLen > pHandle->msg->contLen) {
      pNewCont = rpcMallocCont(pMsg->contLen);
      if (pNewCont == NULL) {
        return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
      }
    }

    tqPushEmptyDataRsp(pHandle, vgId);

    // The old buffer is released only after the response above, which is given
    // the handle and may still read the stored request (assumption: verify
    // against tqPushEmptyDataRsp).
    if (pNewCont != pOldCont) {
      rpcFreeCont(pOldCont);
    }

    // overwrite the header but keep pointing at the handle's own body buffer
    (void)memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
    pHandle->msg->pCont = pNewCont;
  }

  (void)memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
  pHandle->msg->contLen = pMsg->contLen;
  int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
  tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
          pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
  if (ret != 0) {
    // registration failed: drop the stored copy so the handle holds no request
    rpcFreeCont(pHandle->msg->pCont);
    taosMemoryFree(pHandle->msg);
    pHandle->msg = NULL;
  }
  return ret;
}
108

109
/*
 * Remove the handle from pTq->pPushMgr (keyed by its subKey). If the removal
 * succeeds and the handle still holds a stored request, tqPushEmptyDataRsp is
 * called for it and the stored copy is released.
 *
 * Does nothing when either argument is NULL or the push manager is empty.
 */
void tqUnregisterPushHandle(STQ* pTq, void *handle) {
  if (pTq == NULL || handle == NULL) {
    return;
  }

  STqHandle* pTqHandle = (STqHandle*)handle;
  int32_t    vgId = TD_VID(pTq->pVnode);

  // nothing is registered, so there is nothing to remove
  if (taosHashGetSize(pTq->pPushMgr) <= 0) {
    return;
  }

  int32_t rc = taosHashRemove(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey));
  tqInfo("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pTqHandle, rc, pTqHandle->consumerId);

  // only clean up when the entry was actually removed and a request is stored
  if (rc != 0 || pTqHandle->msg == NULL) {
    return;
  }

  tqPushEmptyDataRsp(pTqHandle, vgId);

  rpcFreeCont(pTqHandle->msg->pCont);
  taosMemoryFree(pTqHandle->msg);
  pTqHandle->msg = NULL;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc