mirror of https://github.com/0xERR0R/blocky.git
added transformAndPublish
This commit is contained in:
parent
6a899075b6
commit
86071445e5
|
@ -2,7 +2,6 @@ package resolver
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -119,28 +118,7 @@ func (r *CachingResolver) reloadCacheEntry(ctx context.Context, cacheKey string)
|
|||
return nil, 0
|
||||
}
|
||||
|
||||
if response.Res.Rcode == dns.RcodeSuccess {
|
||||
respCopy := response.Res.Copy()
|
||||
|
||||
// don't cache any EDNS OPT records
|
||||
util.RemoveEdns0Record(respCopy)
|
||||
|
||||
packed, err := respCopy.Pack()
|
||||
if err != nil {
|
||||
logger.Error("unable to pack response", err)
|
||||
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
if r.redisClient != nil {
|
||||
res := *respCopy
|
||||
r.redisClient.PublishCache(cacheKey, &res)
|
||||
}
|
||||
|
||||
return &packed, r.adjustTTLs(response.Res.Answer)
|
||||
}
|
||||
|
||||
return nil, 0
|
||||
return r.transformAndPublish(ctx, cacheKey, response, true)
|
||||
}
|
||||
|
||||
func (r *CachingResolver) redisSubscriber(ctx context.Context) {
|
||||
|
@ -151,8 +129,7 @@ func (r *CachingResolver) redisSubscriber(ctx context.Context) {
|
|||
case rc := <-r.redisClient.CacheChannel:
|
||||
if rc != nil {
|
||||
logger.Debug("Received key from redis: ", rc.Key)
|
||||
ttl := r.adjustTTLs(rc.Response.Res.Answer)
|
||||
r.putInCache(ctx, rc.Key, rc.Response, ttl, false)
|
||||
r.putInCache(ctx, rc.Key, rc.Response, false)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
|
@ -205,8 +182,7 @@ func (r *CachingResolver) Resolve(ctx context.Context, request *model.Request) (
|
|||
response, err = r.next.Resolve(ctx, request)
|
||||
|
||||
if err == nil {
|
||||
cacheTTL := r.adjustTTLs(response.Res.Answer)
|
||||
r.putInCache(ctx, cacheKey, response, cacheTTL, true)
|
||||
r.putInCache(ctx, cacheKey, response, true)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -262,33 +238,60 @@ func isResponseCacheable(msg *dns.Msg) bool {
|
|||
return !msg.Truncated && !msg.CheckingDisabled
|
||||
}
|
||||
|
||||
func (r *CachingResolver) putInCache(
|
||||
ctx context.Context, cacheKey string, response *model.Response, ttl time.Duration, publish bool,
|
||||
) {
|
||||
// transformAndPublish transforms the response to a byte array and publishes it to redis if publish is true
|
||||
// and redis is enabled. Returns the byte array and the TTL of the response
|
||||
func (r *CachingResolver) transformAndPublish(ctx context.Context, cacheKey string,
|
||||
response *model.Response, publish bool,
|
||||
) (*[]byte, time.Duration) {
|
||||
if response.Res.Rcode == dns.RcodeSuccess && !isResponseCacheable(response.Res) {
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
_, domainName := util.ExtractCacheKey(cacheKey)
|
||||
|
||||
_, logger := r.log(ctx)
|
||||
|
||||
respCopy := response.Res.Copy()
|
||||
|
||||
// don't cache any EDNS OPT records
|
||||
util.RemoveEdns0Record(respCopy)
|
||||
|
||||
packed, err := respCopy.Pack()
|
||||
util.LogOnError(ctx, "error on packing", err)
|
||||
if err != nil {
|
||||
logger.WithError(err).WithField("domain", domainName).Warn("cache prefetch failed")
|
||||
|
||||
if err == nil {
|
||||
if response.Res.Rcode == dns.RcodeSuccess && isResponseCacheable(response.Res) {
|
||||
// put value into cache
|
||||
r.resultCache.Put(cacheKey, &packed, ttl)
|
||||
} else if response.Res.Rcode == dns.RcodeNameError {
|
||||
if r.cfg.CacheTimeNegative.IsAboveZero() {
|
||||
// put negative cache if result code is NXDOMAIN
|
||||
r.resultCache.Put(cacheKey, &packed, r.cfg.CacheTimeNegative.ToDuration())
|
||||
}
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
ttl := time.Duration(0)
|
||||
|
||||
if response.Res.Rcode == dns.RcodeSuccess {
|
||||
ttl = r.adjustTTLs(response.Res.Answer)
|
||||
} else if response.Res.Rcode == dns.RcodeNameError {
|
||||
if r.cfg.CacheTimeNegative.IsAboveZero() {
|
||||
ttl = r.cfg.CacheTimeNegative.ToDuration()
|
||||
}
|
||||
}
|
||||
|
||||
if publish && r.redisClient != nil {
|
||||
res := *respCopy
|
||||
for _, rr := range res.Answer {
|
||||
rr.Header().Ttl = uint32(ttl.Seconds())
|
||||
}
|
||||
|
||||
r.redisClient.PublishCache(cacheKey, &res)
|
||||
}
|
||||
|
||||
return &packed, ttl
|
||||
}
|
||||
|
||||
func (r *CachingResolver) putInCache(
|
||||
ctx context.Context, cacheKey string, response *model.Response, publish bool,
|
||||
) {
|
||||
res, ttl := r.transformAndPublish(ctx, cacheKey, response, publish)
|
||||
if res != nil {
|
||||
r.resultCache.Put(cacheKey, res, ttl)
|
||||
}
|
||||
}
|
||||
|
||||
// adjustTTLs calculates and returns the min TTL (considers also the min and max cache time)
|
||||
|
|
Loading…
Reference in New Issue