• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

umputun / feed-master / 21549514637

31 Jan 2026 07:13PM UTC coverage: 73.69% (-0.9%) from 74.577%
21549514637

push

github

umputun
ci: update Go version to 1.25 in build workflow

go.mod requires Go 1.25 but CI was configured for 1.22, causing build failures.

1420 of 1927 relevant lines covered (73.69%)

10.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.36
/app/proc/processor.go
1
// Package proc provides the primary blocking loop
2
// updating from sources and making feeds
3
package proc
4

5
import (
6
        "context"
7
        "fmt"
8
        "time"
9

10
        log "github.com/go-pkgz/lgr"
11
        "github.com/go-pkgz/repeater"
12
        "github.com/go-pkgz/syncs"
13

14
        "github.com/umputun/feed-master/app/config"
15
        "github.com/umputun/feed-master/app/feed"
16
)
17

18
//go:generate moq -out mocks/telegram_notif.go -pkg mocks -skip-ensure -fmt goimports . TelegramNotif
19
//go:generate moq -out mocks/twitter_notif.go -pkg mocks -skip-ensure -fmt goimports . TwitterNotif
20

21
// TelegramNotif is the interface used to send a feed item to a telegram channel.
// Send takes the destination channel ID and the item, and reports failure via the returned error.
22
type TelegramNotif interface {
23
        Send(chanID string, item feed.Item) error
24
}
25

26
// TwitterNotif is the interface used to send a feed item to twitter.
// Send takes the item only and reports failure via the returned error.
27
type TwitterNotif interface {
28
        Send(item feed.Item) error
29
}
30

31
// Processor is a feed reader and store writer.
// It parses the configured sources, saves their items to Store and passes
// newly created items to the telegram and twitter notifiers.
32
type Processor struct {
33
        // Conf holds the feeds to process and the System settings read by the processor
        // (Concurrent, MaxItems, UpdateInterval, MaxKeepInDB)
        Conf          *config.Conf
34
        // Store is where items are saved and from where old items are removed
        Store         *BoltDB
35
        // TelegramNotif is called for every newly created item that was not filtered out
        TelegramNotif TelegramNotif
36
        // TwitterNotif is called for every newly created item that was not filtered out
        TwitterNotif  TwitterNotif
37
}
38

39
// Do activate loop of goroutine for each feed, concurrency limited by p.Conf.Concurrent
40
func (p *Processor) Do(ctx context.Context) error {
4✔
41
        log.Printf("[INFO] activate processor, feeds=%d, %+v", len(p.Conf.Feeds), p.Conf)
4✔
42

4✔
43
        for {
16✔
44
                select {
12✔
45
                case <-ctx.Done():
4✔
46
                        return fmt.Errorf("processor stopped: %w", ctx.Err())
4✔
47
                default:
8✔
48
                        p.processFeeds(ctx)
8✔
49
                }
50
        }
51
}
52

53
func (p *Processor) processFeeds(ctx context.Context) {
8✔
54
        log.Printf("[DEBUG] refresh started")
8✔
55
        swg := syncs.NewSizedGroup(p.Conf.System.Concurrent, syncs.Preemptive, syncs.Context(ctx))
8✔
56
        for name, fm := range p.Conf.Feeds {
16✔
57
                for _, src := range fm.Sources {
16✔
58
                        swg.Go(func(context.Context) {
16✔
59
                                p.processFeed(name, src.URL, fm.TelegramChannel, p.Conf.System.MaxItems, fm.Filter)
8✔
60
                        })
8✔
61
                }
62
        }
63
        swg.Wait()
8✔
64
        log.Printf("[DEBUG] refresh completed")
8✔
65
        time.Sleep(p.Conf.System.UpdateInterval)
8✔
66
}
67

68
func (p *Processor) processFeed(name, url, telegramChannel string, maximum int, filter config.Filter) {
8✔
69
        rss, err := feed.Parse(url)
8✔
70
        if err != nil {
8✔
71
                log.Printf("[WARN] failed to parse %s, %v", url, err)
×
72
                return
×
73
        }
×
74

75
        // up to MaxItems (5) items from each feed
76
        upto := min(len(rss.ItemList), maximum)
8✔
77

8✔
78
        for _, item := range rss.ItemList[:upto] {
40✔
79
                // skip 1y and older
32✔
80
                if item.DT.Before(time.Now().AddDate(-1, 0, 0)) {
36✔
81
                        continue
4✔
82
                }
83

84
                skip, err := filter.Skip(item)
28✔
85
                if err != nil {
28✔
86
                        log.Printf("[WARN] failed to filter %s (%s) to %s, save as is, %v", item.GUID, item.PubDate, name, err)
×
87
                }
×
88
                if skip {
30✔
89
                        item.Junk = true
2✔
90
                        log.Printf("[INFO] filtered %s (%s), %s %s", item.GUID, item.PubDate, name, item.Title)
2✔
91
                }
2✔
92

93
                created, err := p.Store.Save(name, item)
28✔
94
                if err != nil {
28✔
95
                        log.Printf("[WARN] failed to save %s (%s) to %s, %v", item.GUID, item.PubDate, name, err)
×
96
                }
×
97

98
                // don't attempt to send anything if the entry was already saved
99
                // or in case it was filtered out
100
                if !created || item.Junk {
45✔
101
                        continue
17✔
102
                }
103

104
                rptr := repeater.NewDefault(3, 5*time.Second)
11✔
105
                attemptNum := 0
11✔
106
                err = rptr.Do(context.Background(), func() error {
22✔
107
                        attemptNum++
11✔
108
                        startTime := time.Now()
11✔
109
                        log.Printf("[DEBUG] sending telegram message (attempt %d/3): title=%q, size=%d bytes, url=%s to channel=%s",
11✔
110
                                attemptNum, item.Title, item.Enclosure.Length, item.Enclosure.URL, telegramChannel)
11✔
111

11✔
112
                        if e := p.TelegramNotif.Send(telegramChannel, item); e != nil {
11✔
113
                                elapsed := time.Since(startTime)
×
114
                                log.Printf("[WARN] failed attempt %d/3 to send telegram message after %v: title=%q, size=%d bytes, url=%s to channel=%s, error=%v",
×
115
                                        attemptNum, elapsed, item.Title, item.Enclosure.Length, item.Enclosure.URL, telegramChannel, e)
×
116
                                return fmt.Errorf("telegram send: %w", e)
×
117
                        }
×
118

119
                        elapsed := time.Since(startTime)
11✔
120
                        log.Printf("[INFO] successfully sent telegram message in %v: title=%q, size=%d bytes",
11✔
121
                                elapsed, item.Title, item.Enclosure.Length)
11✔
122
                        return nil
11✔
123
                })
124
                if err != nil {
11✔
125
                        log.Printf("[WARN] failed to send telegram message after 3 attempts: title=%q, size=%d bytes, url=%s to channel=%s, final_error=%v",
×
126
                                item.Title, item.Enclosure.Length, item.Enclosure.URL, telegramChannel, err)
×
127
                }
×
128

129
                if err := p.TwitterNotif.Send(item); err != nil {
11✔
130
                        log.Printf("[WARN] failed send twitter message, url=%s, %v", item.Enclosure.URL, err)
×
131
                }
×
132
        }
133

134
        // keep up to MaxKeepInDB items in bucket
135
        if removed, err := p.Store.removeOld(name, p.Conf.System.MaxKeepInDB); err == nil {
16✔
136
                if removed > 0 {
9✔
137
                        log.Printf("[DEBUG] removed %d from %s", removed, name)
1✔
138
                }
1✔
139
        } else {
×
140
                log.Printf("[WARN] failed to remove, %v", err)
×
141
        }
×
142
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc