• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snowplow / sql-runner / 15063404311

16 May 2025 07:46AM UTC coverage: 27.73% (+1.0%) from 26.721%
15063404311

push

github

jbeemster
Prepared for release

353 of 1273 relevant lines covered (27.73%)

2.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.2
/sql_runner/postgres_target.go
1
// Copyright (c) 2015-2025 Snowplow Analytics Ltd. All rights reserved.
2
//
3
// This program is licensed to you under the Apache License Version 2.0,
4
// and you may not use this file except in compliance with the Apache License Version 2.0.
5
// You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
6
//
7
// Unless required by applicable law or agreed to in writing,
8
// software distributed under the Apache License Version 2.0 is distributed on an
9
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10
// See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
11
package main
12

13
import (
14
        "context"
15
        "crypto/tls"
16
        "errors"
17
        "fmt"
18
        "log"
19
        "net"
20
        "os"
21
        "time"
22

23
        "github.com/go-pg/pg/v10"
24
        "github.com/go-pg/pg/v10/orm"
25
        "github.com/olekukonko/tablewriter"
26
)
27

28
// Connection timeouts used when building the go-pg client; the generous read
// timeout is intended for long-running Redshift queries.
const (
	// dialTimeout bounds connection establishment. It is passed to go-pg as
	// DialTimeout and is also used by the custom dialer.
	dialTimeout = 10 * time.Second

	// readTimeout is passed to go-pg as ReadTimeout.
	readTimeout = 8 * time.Hour // TODO: make this user configurable
)
33

34
// PostgresTarget represents a Postgres as target.
35
type PostgresTarget struct {
36
        Target
37
        Client *pg.DB
38
}
39

40
// IsConnectable tests connection to determine whether the Postgres target is
41
// connectable.
42
func (pt PostgresTarget) IsConnectable() bool {
×
43
        client := pt.Client
×
44
        err := client.Ping(context.Background())
×
45

×
46
        return err == nil
×
47
}
×
48

49
// NewPostgresTarget returns a ptr to a PostgresTarget.
50
func NewPostgresTarget(target Target) (*PostgresTarget, error) {
5✔
51
        var tlsConfig *tls.Config
5✔
52
        if target.Ssl == true {
5✔
53
                tlsConfig = &tls.Config{
×
54
                        InsecureSkipVerify: true,
×
55
                }
×
56
        }
×
57

58
        if target.Host == "" || target.Port == "" || target.Username == "" || target.Database == "" {
9✔
59
                return nil, fmt.Errorf("missing target connection parameters")
4✔
60
        }
4✔
61

62
        db := pg.Connect(&pg.Options{
1✔
63
                Addr:        fmt.Sprintf("%s:%s", target.Host, target.Port),
1✔
64
                User:        target.Username,
1✔
65
                Password:    target.Password,
1✔
66
                Database:    target.Database,
1✔
67
                TLSConfig:   tlsConfig,
1✔
68
                DialTimeout: dialTimeout,
1✔
69
                ReadTimeout: readTimeout,
1✔
70
                Dialer: func(ctx context.Context, network, addr string) (net.Conn, error) {
1✔
71
                        cn, err := net.DialTimeout(network, addr, dialTimeout)
×
72
                        if err != nil {
×
73
                                return nil, err
×
74
                        }
×
75
                        return cn, cn.(*net.TCPConn).SetKeepAlive(true)
×
76
                },
77
        })
78

79
        return &PostgresTarget{target, db}, nil
1✔
80
}
81

82
// GetTarget returns the Target field of PostgresTarget.
83
func (pt PostgresTarget) GetTarget() Target {
×
84
        return pt.Target
×
85
}
×
86

87
// RunQuery runs a query against the target.
88
func (pt PostgresTarget) RunQuery(query ReadyQuery, dryRun bool, showQueryOutput bool) QueryStatus {
×
89
        var err error = nil
×
90
        var res orm.Result
×
91
        if dryRun {
×
92
                options := pt.Client.Options()
×
93
                address := options.Addr
×
94
                if pt.IsConnectable() {
×
95
                        log.Printf("SUCCESS: Able to connect to target database, %s\n.", address)
×
96
                } else {
×
97
                        log.Printf("ERROR: Cannot connect to target database, %s\n.", address)
×
98
                }
×
99
                return QueryStatus{query, query.Path, 0, nil}
×
100
        }
101

102
        affected := 0
×
103
        if showQueryOutput {
×
104
                var results Results
×
105
                res, err = pt.Client.Query(&results, query.Script)
×
106
                if err == nil {
×
107
                        affected = res.RowsAffected()
×
108
                } else {
×
109
                        log.Printf("ERROR: %s.", err)
×
110
                        return QueryStatus{query, query.Path, int(affected), err}
×
111
                }
×
112

113
                err = printTable(&results)
×
114
                if err != nil {
×
115
                        log.Printf("ERROR: %s.", err)
×
116
                        return QueryStatus{query, query.Path, int(affected), err}
×
117
                }
×
118
        } else {
×
119
                res, err = pt.Client.Exec(query.Script)
×
120
                if err == nil {
×
121
                        affected = res.RowsAffected()
×
122
                }
×
123
        }
124

125
        return QueryStatus{query, query.Path, affected, err}
×
126
}
127

128
func printTable(results *Results) error {
×
129
        columns := make([]string, len(results.columns))
×
130
        for k := range results.columns {
×
131
                columns[k] = results.columns[k]
×
132
        }
×
133

134
        if results.elements == 1 {
×
135
                if results.results[0][0] == "" {
×
136
                        return nil // blank output, edge case for asserts
×
137
                }
×
138
        } else if results.elements == 0 {
×
139
                return nil // break for no output
×
140
        }
×
141

142
        log.Printf("QUERY OUTPUT:\n")
×
143
        table := tablewriter.NewWriter(os.Stdout)
×
144
        table.SetHeader(columns)
×
145
        table.SetBorders(tablewriter.Border{Left: true, Top: false, Right: true, Bottom: false})
×
146
        table.SetCenterSeparator("|")
×
147

×
148
        if len(results.columns) == 0 {
×
149
                return errors.New("Unable to read columns")
×
150
        }
×
151

152
        for _, row := range results.results {
×
153
                table.Append(row)
×
154
        }
×
155

156
        table.Render() // Send output
×
157
        return nil
×
158
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc