// blocky/lists/list_cache.go

package lists

//go:generate go run github.com/abice/go-enum -f=$GOFILE --marshal --names

import (
	"context"
	"errors"
	"fmt"
	"net"

	"github.com/ThinkChaos/parcour"
	"github.com/ThinkChaos/parcour/jobgroup"
	"github.com/sirupsen/logrus"

	"github.com/0xERR0R/blocky/cache/stringcache"
	"github.com/0xERR0R/blocky/config"
	"github.com/0xERR0R/blocky/evt"
	"github.com/0xERR0R/blocky/lists/parsers"
	"github.com/0xERR0R/blocky/log"
)

const (
	groupProducersBufferCap = 1000
	regexWarningThreshold   = 500
)

// ListCacheType represents the type of cached list ENUM(
// denylist  // is a list with blocked domains
// allowlist // is a list with allowlisted domains / IPs
// )
type ListCacheType int

// Matcher checks if a domain is in a list
type Matcher interface {
	// Match matches passed domain name against cached list entries
	Match(domain string, groupsToCheck []string) (groups []string)
}

// ListCache is a generic cache of strings divided into groups
type ListCache struct {
	groupedCache stringcache.GroupedStringCache
	regexCache   stringcache.GroupedStringCache

	cfg          config.SourceLoading
	listType     ListCacheType
	groupSources map[string][]config.BytesSource
	downloader   FileDownloader
}
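
// Compile-time check: *ListCache implements Matcher.
var _ Matcher = (*ListCache)(nil)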

// LogConfig implements `config.Configurable`.
func (b *ListCache) LogConfig(logger *logrus.Entry) {
	total := 0
	regexes := 0

	for group := range b.groupSources {
		count := b.groupedCache.ElementCount(group)
		logger.Infof("%s: %d entries", group, count)
		total += count
		regexes += b.regexCache.ElementCount(group)
	}

	if regexes > regexWarningThreshold {
		logger.Warnf(
			"REGEXES: %d !! High use of regexes is not recommended: they use a lot of memory and are very slow to search",
			regexes,
		)
	}

	logger.Infof("TOTAL: %d entries", total)
}

// NewListCache creates a new list cache instance
func NewListCache(ctx context.Context,
	t ListCacheType, cfg config.SourceLoading,
	groupSources map[string][]config.BytesSource, downloader FileDownloader,
) (*ListCache, error) {
	regexCache := stringcache.NewInMemoryGroupedRegexCache()

	c := &ListCache{
		groupedCache: stringcache.NewChainedGroupedCache(
			regexCache,
			stringcache.NewInMemoryGroupedWildcardCache(), // must be after regex which can contain '*'
			stringcache.NewInMemoryGroupedStringCache(),   // accepts all values, must be last
		),
		regexCache: regexCache,

		cfg:          cfg,
		listType:     t,
		groupSources: groupSources,
		downloader:   downloader,
	}

	err := cfg.StartPeriodicRefresh(ctx, c.refresh, func(err error) {
		logger().WithError(err).Errorf("could not init %s", t)
	})
	if err != nil {
		return nil, err
	}

	return c, nil
}
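
// A minimal construction sketch (illustrative only: ctx, cfg, groupSources and
// downloader are assumed to come from the caller; ListCacheTypeDenylist is one
// of the go-enum generated values):
//
//	cache, err := NewListCache(ctx, ListCacheTypeDenylist, cfg, groupSources, downloader)
//	if err != nil {
//		return err
//	}
//	// the cache then refreshes itself periodically as configured by cfg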

func logger() *logrus.Entry {
	return log.PrefixedLog("list_cache")
}

// Match matches the passed domain name against the cached list entries.
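//
// For example, if "ads.example.com" is cached in both the "default" and
// "strict" groups (hypothetical data):
//
//	b.Match("ads.example.com", []string{"default", "strict"}) // -> both groups
//	b.Match("ads.example.com", []string{"other"})             // -> no groups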
func (b *ListCache) Match(domain string, groupsToCheck []string) (groups []string) {
	return b.groupedCache.Contains(domain, groupsToCheck)
}

// Refresh triggers the refresh of a list
func (b *ListCache) Refresh() error {
	return b.refresh(context.Background())
}

func (b *ListCache) refresh(ctx context.Context) error {
	unlimitedGrp, _ := jobgroup.WithContext(ctx)
	defer unlimitedGrp.Close()

	producersGrp := jobgroup.WithMaxConcurrency(unlimitedGrp, b.cfg.Concurrency)
	defer producersGrp.Close()

	for group, sources := range b.groupSources {
		group, sources := group, sources

		unlimitedGrp.Go(func(ctx context.Context) error {
			err := b.createCacheForGroup(producersGrp, unlimitedGrp, group, sources)
			if err != nil {
				count := b.groupedCache.ElementCount(group)

				logger := logger().WithFields(logrus.Fields{
					"group":       group,
					"total_count": count,
				})

				if count == 0 {
					logger.Warn("Populating of group cache failed, cache will be empty until refresh succeeds")
				} else {
					logger.Warn("Populating of group cache failed, using existing cache, if any")
				}

				return err
			}

			count := b.groupedCache.ElementCount(group)

			evt.Bus().Publish(evt.BlockingCacheGroupChanged, b.listType, group, count)

			logger().WithFields(logrus.Fields{
				"group":       group,
				"total_count": count,
			}).Info("group import finished")

			return nil
		})
	}

	return unlimitedGrp.Wait()
}
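
// createCacheForGroup rebuilds the cache of a single group: one producer job
// per source parses entries into a shared buffered channel (capacity
// groupProducersBufferCap), and a consumer job adds them to the new group
// snapshot obtained from groupedCache.Refresh.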
func (b *ListCache) createCacheForGroup(
	producersGrp, consumersGrp jobgroup.JobGroup, group string, sources []config.BytesSource,
) error {
	groupFactory := b.groupedCache.Refresh(group)

	producers := parcour.NewProducersWithBuffer[string](producersGrp, consumersGrp, groupProducersBufferCap)
	defer producers.Close()

	for i, source := range sources {
		i, source := i, source

		producers.GoProduce(func(ctx context.Context, hostsChan chan<- string) error {
			locInfo := fmt.Sprintf("item #%d of group %s", i, group)

			opener, err := NewSourceOpener(locInfo, source, b.downloader)
			if err != nil {
				return err
			}

			return b.parseFile(ctx, opener, hostsChan)
		})
	}

	hasEntries := false
	producers.GoConsume(func(ctx context.Context, ch <-chan string) error {
		for host := range ch {
			if groupFactory.AddEntry(host) {
				hasEntries = true
			} else {
				logger().WithField("host", host).Warn("no list cache was able to use host")
			}
		}

		return nil
	})

	err := producers.Wait()
	if err != nil {
		if !hasEntries {
			// Always fail the group if no entries were parsed
			return err
		}

		var transientErr *TransientError

		if errors.As(err, &transientErr) {
			// Temporary error: fail the whole group to retry later
			return err
		}
	}

	groupFactory.Finish()

	return nil
}
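
// Example source lines and the entry each one contributes (an illustrative
// sketch; the exact accepted syntax is defined by parsers.Hosts):
//
//	0.0.0.0 ads.example.com  ->  "ads.example.com" (hosts-file syntax)
//	tracker.example.net      ->  "tracker.example.net" (plain domain)
//	/^ad[sx]?\./             ->  stored in the regex cache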

// parseFile downloads the file (or reads a local one) and writes each parsed
// entry to the result channel.
func (b *ListCache) parseFile(ctx context.Context, opener SourceOpener, resultCh chan<- string) error {
	count := 0

	logger := func() *logrus.Entry {
		return logger().WithFields(logrus.Fields{
			"source": opener.String(),
			"count":  count,
		})
	}

	logger().Debug("starting processing of source")

	r, err := opener.Open(ctx)
	if err != nil {
		logger().Error("cannot open source: ", err)

		return err
	}
	defer r.Close()

	p := parsers.AllowErrors(parsers.Hosts(r), b.cfg.MaxErrorsPerSource)
	p.OnErr(func(err error) {
		logger().Warnf("parse error: %s, trying to continue", err)
	})

	err = parsers.ForEach[*parsers.HostsIterator](ctx, p, func(hosts *parsers.HostsIterator) error {
		return hosts.ForEach(func(host string) error {
			count++

			// For IPs, we want to ensure the string is the Go representation so that when
			// we compare responses, the same IP matches even if it was written differently
			// in the list (e.g. "0::1" is normalized to "::1").
			if ip := net.ParseIP(host); ip != nil {
				host = ip.String()
			}

			resultCh <- host

			return nil
		})
	})
	if err != nil {
		// Don't log cancelation: it was caused by another goroutine failing
		if !errors.Is(err, context.Canceled) {
			logger().Error("parse error: ", err)
		}

		// Only propagate the error if no entries were parsed.
		// If the file was partially parsed, we'll settle for that.
		if count == 0 {
			return err
		}

		return nil
	}

	logger().Info("import succeeded")

	return nil
}