• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

startreedata / pinot-client-go / 14309588206

07 Apr 2025 12:47PM UTC coverage: 92.279% (+0.2%) from 92.052%
14309588206

push

github

web-flow
upgrade versions for the ci pipeline (#44)

490 of 531 relevant lines covered (92.28%)

6.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/pinot/jsonAsyncHTTPClientTransport.go
1
package pinot
2

3
import (
4
        "bytes"
5
        "encoding/json"
6
        "fmt"
7
        "io"
8
        "net/http"
9
        "strings"
10

11
        log "github.com/sirupsen/logrus"
12
)
13

14
// defaultHTTPHeader lists the headers applied to every request built by
// createHTTPRequest: the query payload is sent as UTF-8 encoded JSON.
var (
	defaultHTTPHeader = map[string]string{
		"Content-Type": "application/json; charset=utf-8",
	}
)
19

20
// jsonAsyncHTTPClientTransport is the clientTransport implementation that
// posts queries to a broker as JSON over HTTP.
type jsonAsyncHTTPClientTransport struct {
	// client sends the requests. A positive client.Timeout is also forwarded
	// to the broker as the timeoutMs query option.
	client *http.Client
	// header holds extra headers passed to createHTTPRequest for every
	// request issued by this transport.
	header map[string]string
}
25

26
func (t jsonAsyncHTTPClientTransport) buildQueryOptions(query *Request) string {
13✔
27
        queryOptions := ""
13✔
28
        if query.queryFormat == "sql" {
25✔
29
                queryOptions = "groupByMode=sql;responseFormat=sql"
12✔
30
        }
12✔
31
        if query.useMultistageEngine {
15✔
32
                if queryOptions != "" {
4✔
33
                        queryOptions += ";"
2✔
34
                }
2✔
35
                queryOptions += "useMultistageEngine=true"
2✔
36
        }
37
        if t.client.Timeout > 0 {
25✔
38
                if queryOptions != "" {
24✔
39
                        queryOptions += ";"
12✔
40
                }
12✔
41
                queryOptions += fmt.Sprintf("timeoutMs=%d", t.client.Timeout.Milliseconds())
12✔
42
        }
43
        return queryOptions
13✔
44
}
45

46
func (t jsonAsyncHTTPClientTransport) execute(brokerAddress string, query *Request) (*BrokerResponse, error) {
10✔
47
        url := fmt.Sprintf(getQueryTemplate(query.queryFormat, brokerAddress), brokerAddress)
10✔
48
        requestJSON := map[string]string{}
10✔
49
        requestJSON[query.queryFormat] = query.query
10✔
50
        queryOptions := t.buildQueryOptions(query)
10✔
51
        if queryOptions != "" {
20✔
52
                requestJSON["queryOptions"] = queryOptions
10✔
53
        }
10✔
54
        if query.trace {
11✔
55
                requestJSON["trace"] = "true"
1✔
56
        }
1✔
57
        jsonValue, err := json.Marshal(requestJSON)
10✔
58
        if err != nil {
10✔
59
                log.Error("Unable to marshal request to JSON. ", err)
×
60
                return nil, err
×
61
        }
×
62
        req, err := createHTTPRequest(url, jsonValue, t.header)
10✔
63
        if err != nil {
11✔
64
                return nil, err
1✔
65
        }
1✔
66
        resp, err := t.client.Do(req)
9✔
67
        if err != nil {
12✔
68
                return nil, fmt.Errorf("got exceptions during sending request. %v", err)
3✔
69
        }
3✔
70
        defer func() {
12✔
71
                if err := resp.Body.Close(); err != nil {
6✔
72
                        log.Error("Got exceptions during closing response body. ", err)
×
73
                }
×
74
        }()
75
        if resp.StatusCode == http.StatusOK {
11✔
76
                bodyBytes, err := io.ReadAll(resp.Body)
5✔
77
                if err != nil {
5✔
78
                        return nil, fmt.Errorf("unable to read Pinot response. %v", err)
×
79
                }
×
80
                var brokerResponse BrokerResponse
5✔
81
                if err = decodeJSONWithNumber(bodyBytes, &brokerResponse); err != nil {
9✔
82
                        return nil, fmt.Errorf("unable to unmarshal json response to a brokerResponse structure. %v", err)
4✔
83
                }
4✔
84
                return &brokerResponse, nil
1✔
85
        }
86
        return nil, fmt.Errorf("caught http exception when querying Pinot: %v", resp.Status)
1✔
87
}
88

89
// getQueryTemplate returns the fmt template used to build the broker query
// URL; its single %s verb is meant to receive brokerAddress.
//
// The path is "/query/sql" when queryFormat is "sql" and "/query" otherwise.
// When brokerAddress does not already start with "http://" or "https://",
// the template prepends "http://".
func getQueryTemplate(queryFormat string, brokerAddress string) string {
	path := "/query"
	if queryFormat == "sql" {
		path = "/query/sql"
	}
	if strings.HasPrefix(brokerAddress, "http://") || strings.HasPrefix(brokerAddress, "https://") {
		return "%s" + path
	}
	return "http://%s" + path
}
101

102
func createHTTPRequest(url string, jsonValue []byte, extraHeader map[string]string) (*http.Request, error) {
14✔
103
        r, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonValue))
14✔
104
        if err != nil {
17✔
105
                return nil, fmt.Errorf("invalid HTTP request: %v", err)
3✔
106
        }
3✔
107
        for k, v := range defaultHTTPHeader {
22✔
108
                r.Header.Add(k, v)
11✔
109
        }
11✔
110
        for k, v := range extraHeader {
15✔
111
                r.Header.Add(k, v)
4✔
112
        }
4✔
113
        return r, nil
11✔
114
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc