• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

m-lab / etl / 7551

22 Oct 2024 03:54PM UTC coverage: 67.258% (-0.1%) from 67.366%
7551

Pull #1133

travis-pro

web-flow
Merge 0597bc1cc into ea7137600
Pull Request #1133: Prevent scamper1 rows that are too large

2 of 5 new or added lines in 1 file covered. (40.0%)

4 existing lines in 1 file now uncovered.

3338 of 4963 relevant lines covered (67.26%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.84
/parser/scamper1.go
1
package parser
2

3
import (
4
        "fmt"
5
        "strings"
6
        "time"
7

8
        "github.com/m-lab/etl/etl"
9
        "github.com/m-lab/etl/metrics"
10
        "github.com/m-lab/etl/row"
11
        "github.com/m-lab/etl/schema"
12
        "github.com/m-lab/traceroute-caller/parser"
13
)
14

15
//=====================================================================================
16
//                       scamper1 Parser
17
//=====================================================================================
18

19
const (
	// scamper1 is the canonical datatype name, used as the metrics label
	// and as the type returned by IsParsable.
	scamper1 = "scamper1"

	// BigQuery rows must be under 100MB. Use 90MB to allow for JSON export.
	// This is already an unusually large trace file.
	maxRowSize = 90000000
)
26

27
// Scamper1Parser handles parsing for the scamper1 datatype.
type Scamper1Parser struct {
	*row.Base         // Embedded row buffer/sink; provides Put, Flush, and GetStats.
	table  string     // Destination table name, reported by TableName.
	suffix string     // Table suffix, appended by FullTableName.
}
33

34
// NewScamper1Parser returns a new parser for the scamper1 archives.
35
func NewScamper1Parser(sink row.Sink, table, suffix string) etl.Parser {
1✔
36
        bufSize := etl.SCAMPER1.BQBufferSize()
1✔
37
        return &Scamper1Parser{
1✔
38
                Base:   row.NewBase(table, sink, bufSize),
1✔
39
                table:  table,
1✔
40
                suffix: suffix,
1✔
41
        }
1✔
42
}
1✔
43

44
// parseTracelb parses the TracelbLine struct defined in traceroute-caller and populates the BQTracelbLine.
// Scalar fields are copied one-for-one; the node and link slices are
// deep-copied so the BQ row does not alias slices owned by the upstream
// parser's structures. The caller must populate bqScamperOutput.CycleStart
// first, since its StartTime/Hostname feed into each node's HopID.
func parseTracelb(bqScamperOutput *schema.BQScamperOutput, tracelb parser.TracelbLine) {
	bqScamperOutput.Tracelb = schema.BQTracelbLine{
		Type:       tracelb.Type,
		Version:    tracelb.Version,
		Userid:     tracelb.Userid,
		Method:     tracelb.Method,
		Src:        tracelb.Src,
		Dst:        tracelb.Dst,
		Start:      tracelb.Start,
		ProbeSize:  tracelb.ProbeSize,
		Firsthop:   tracelb.Firsthop,
		Attempts:   tracelb.Attempts,
		Confidence: tracelb.Confidence,
		Tos:        tracelb.Tos,
		// "Gaplint" (sic) is the actual field name in the upstream
		// traceroute-caller struct; the BQ schema uses the corrected
		// "Gaplimit" spelling. Do not "fix" the right-hand side.
		Gaplimit:    tracelb.Gaplint,
		WaitTimeout: tracelb.WaitTimeout,
		WaitProbe:   tracelb.WaitProbe,
		Probec:      tracelb.Probec,
		ProbecMax:   tracelb.ProbecMax,
		Nodec:       tracelb.Nodec,
		Linkc:       tracelb.Linkc,
	}

	nodes := tracelb.Nodes
	bqScamperOutput.Tracelb.Nodes = make([]schema.BQScamperNode, 0, len(nodes))

	for _, node := range nodes {
		// Deep-copy each node's link arrays into the BQ wrapper type.
		bqLinkArray := make([]schema.BQScamperLinkArray, 0, len(node.Links))
		for _, link := range node.Links {
			bqLinks := schema.BQScamperLinkArray{}
			bqLinks.Links = make([]parser.ScamperLink, len(link))
			copy(bqLinks.Links, link)
			bqLinkArray = append(bqLinkArray, bqLinks)
		}

		bqScamperNode := schema.BQScamperNode{
			// HopID is derived from the cycle start time and hostname
			// plus the node address.
			HopID: GetHopID(bqScamperOutput.CycleStart.StartTime, bqScamperOutput.CycleStart.Hostname,
				node.Addr),
			Addr:  node.Addr,
			Name:  node.Name,
			QTTL:  node.QTTL,
			Linkc: node.Linkc,
			Links: bqLinkArray,
		}
		bqScamperOutput.Tracelb.Nodes = append(bqScamperOutput.Tracelb.Nodes, bqScamperNode)
	}
}
92

93
// IsParsable returns the canonical test type and whether to parse data.
94
func (p *Scamper1Parser) IsParsable(testName string, data []byte) (string, bool) {
1✔
95
        if strings.HasSuffix(testName, "jsonl") {
2✔
96
                return scamper1, true
1✔
97
        }
1✔
98
        return "", false
1✔
99
}
100

101
// ParseAndInsert decodes the scamper1 data and inserts it into BQ.
102
func (p *Scamper1Parser) ParseAndInsert(meta etl.Metadata, testName string, rawContent []byte) error {
1✔
103
        metrics.WorkerState.WithLabelValues(p.TableName(), scamper1).Inc()
1✔
104
        defer metrics.WorkerState.WithLabelValues(p.TableName(), scamper1).Dec()
1✔
105

1✔
106
        // BigQuery rows must be under 100MB.
1✔
107
        if len(rawContent) > maxRowSize {
1✔
NEW
108
                metrics.TestTotal.WithLabelValues(p.TableName(), scamper1, "row too big").Inc()
×
NEW
109
                return fmt.Errorf("row size too big")
×
NEW
110
        }
×
111

112
        trcParser, err := parser.New("mda")
1✔
113
        if err != nil {
1✔
114
                metrics.TestTotal.WithLabelValues(p.TableName(), scamper1, err.Error()).Inc()
×
115
                return fmt.Errorf("failed to initialize traceroute parser, error: %w", err)
×
116
        }
×
117

118
        rawData, err := trcParser.ParseRawData(rawContent)
1✔
119
        archiveURL := meta.ArchiveURL
1✔
120
        if err != nil {
2✔
121
                metrics.TestTotal.WithLabelValues(p.TableName(), scamper1, err.Error()).Inc()
1✔
122
                return fmt.Errorf("failed to parse scamper1 file: %s, archiveURL: %s, error: %w", testName, archiveURL, err)
1✔
123
        }
1✔
124

125
        scamperOutput, ok := rawData.(parser.Scamper1)
1✔
126
        if !ok {
1✔
127
                metrics.TestTotal.WithLabelValues(p.TableName(), scamper1, "failed to convert ParsedData to Scamper1 object").Inc()
×
128
                return fmt.Errorf("failed to convert ParsedData to Scamper1 object for file: %s", testName)
×
129
        }
×
130

131
        bqScamperOutput := schema.BQScamperOutput{
1✔
132
                Metadata:   scamperOutput.Metadata,
1✔
133
                CycleStart: scamperOutput.CycleStart,
1✔
134
                CycleStop:  scamperOutput.CycleStop,
1✔
135
        }
1✔
136
        parseTracelb(&bqScamperOutput, scamperOutput.Tracelb)
1✔
137

1✔
138
        parseInfo := schema.ParseInfo{
1✔
139
                Version:     meta.Version,
1✔
140
                Time:        time.Now(),
1✔
141
                ArchiveURL:  meta.ArchiveURL,
1✔
142
                Filename:    testName,
1✔
143
                GitCommit:   meta.GitCommit,
1✔
144
                ArchiveSize: meta.ArchiveSize,
1✔
145
                FileSize:    int64(len(rawContent)),
1✔
146
        }
1✔
147

1✔
148
        row := schema.Scamper1Row{
1✔
149
                ID:     bqScamperOutput.Metadata.UUID,
1✔
150
                Parser: parseInfo,
1✔
151
                Date:   meta.Date,
1✔
152
                Raw:    bqScamperOutput,
1✔
153
        }
1✔
154

1✔
155
        // Insert the row.
1✔
156
        if err := p.Put(&row); err != nil {
1✔
157
                return err
×
158
        }
×
159

160
        // Count successful inserts.
161
        metrics.TestTotal.WithLabelValues(p.TableName(), scamper1, "ok").Inc()
1✔
162

1✔
163
        return nil
1✔
164
}
165

166
// NB: These functions are also required to complete the etl.Parser interface
167
// For scamper1, we just forward the calls to the Inserter.
168

169
// Flush flushes any pending rows.
170
func (p *Scamper1Parser) Flush() error {
1✔
171
        return p.Base.Flush()
1✔
172
}
1✔
173

174
// TableName of the table that this Parser inserts into.
// Used for metrics and logging. This is the bare table name;
// see FullTableName for the suffixed upload target.
func (p *Scamper1Parser) TableName() string {
	return p.table
}
179

180
// FullTableName of the BQ table that the uploader pushes to,
181
// including $YYYYMMNN, or _YYYYMMNN.
182
func (p *Scamper1Parser) FullTableName() string {
×
183
        return p.table + p.suffix
×
184
}
×
185

186
// RowsInBuffer returns the count of rows currently in the buffer.
187
func (p *Scamper1Parser) RowsInBuffer() int {
×
188
        return p.GetStats().Pending
×
189
}
×
190

191
// Committed returns the count of rows successfully committed to BQ.
192
func (p *Scamper1Parser) Committed() int {
×
193
        return p.GetStats().Committed
×
194
}
×
195

196
// Accepted returns the count of all rows received through InsertRow(s).
197
func (p *Scamper1Parser) Accepted() int {
1✔
198
        return p.GetStats().Total()
1✔
199
}
1✔
200

201
// Failed returns the count of all rows that could not be committed.
202
func (p *Scamper1Parser) Failed() int {
×
203
        return p.GetStats().Failed
×
204
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc