mirror of https://github.com/0xERR0R/blocky.git
redis refactoring 1
This commit is contained in:
parent
37fbb423b7
commit
350b2fe4de
|
@ -5,7 +5,6 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -31,8 +30,9 @@ const (
|
|||
|
||||
// sendBuffer message
|
||||
type bufferMessage struct {
|
||||
TTL uint32
|
||||
Key string
|
||||
Message *dns.Msg
|
||||
Message []byte
|
||||
}
|
||||
|
||||
// redis pubsub message
|
||||
|
@ -126,10 +126,11 @@ func New(ctx context.Context, cfg *config.Redis) (*Client, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// PublishCache publish cache to redis async
|
||||
func (c *Client) PublishCache(key string, message *dns.Msg) {
|
||||
if len(key) > 0 && message != nil {
|
||||
// PublishCache publish cache entry to redis if key and message are not empty and ttl > 0
|
||||
func (c *Client) PublishCache(key string, ttl uint32, message []byte) {
|
||||
if len(key) > 0 && len(message) > 0 && ttl > 0 {
|
||||
c.sendBuffer <- &bufferMessage{
|
||||
TTL: ttl,
|
||||
Key: key,
|
||||
Message: message,
|
||||
}
|
||||
|
@ -212,27 +213,20 @@ func (c *Client) startup(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (c *Client) publishMessageFromBuffer(ctx context.Context, s *bufferMessage) {
|
||||
origRes := s.Message
|
||||
origRes.Compress = true
|
||||
binRes, pErr := origRes.Pack()
|
||||
|
||||
if pErr == nil {
|
||||
binMsg, mErr := json.Marshal(redisMessage{
|
||||
Key: s.Key,
|
||||
Type: messageTypeCache,
|
||||
Message: binRes,
|
||||
Client: c.id,
|
||||
})
|
||||
|
||||
if mErr == nil {
|
||||
c.client.Publish(ctx, SyncChannelName, binMsg)
|
||||
}
|
||||
|
||||
c.client.Set(ctx,
|
||||
prefixKey(s.Key),
|
||||
binRes,
|
||||
c.getTTL(origRes))
|
||||
psMsg, err := json.Marshal(redisMessage{
|
||||
Key: s.Key,
|
||||
Type: messageTypeCache,
|
||||
Message: s.Message,
|
||||
Client: c.id,
|
||||
})
|
||||
if err == nil {
|
||||
c.client.Publish(ctx, SyncChannelName, psMsg)
|
||||
}
|
||||
|
||||
c.client.Set(ctx,
|
||||
prefixKey(s.Key),
|
||||
s.Message,
|
||||
util.ToTTLDuration(s.TTL))
|
||||
}
|
||||
|
||||
func (c *Client) processReceivedMessage(ctx context.Context, msg *redis.Message) {
|
||||
|
@ -326,20 +320,6 @@ func convertMessage(message *redisMessage, ttl time.Duration) (*CacheMessage, er
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// getTTL of dns message or return defaultCacheTime if 0
|
||||
func (c *Client) getTTL(dns *dns.Msg) time.Duration {
|
||||
ttl := uint32(math.MaxInt32)
|
||||
for _, a := range dns.Answer {
|
||||
ttl = min(ttl, a.Header().Ttl)
|
||||
}
|
||||
|
||||
if ttl == 0 {
|
||||
return defaultCacheTime
|
||||
}
|
||||
|
||||
return time.Duration(ttl) * time.Second
|
||||
}
|
||||
|
||||
// prefixKey with CacheStorePrefix
|
||||
func prefixKey(key string) string {
|
||||
return fmt.Sprintf("%s%s", CacheStorePrefix, key)
|
||||
|
|
|
@ -96,10 +96,12 @@ var _ = Describe("Redis client", func() {
|
|||
|
||||
By("publish new message with TTL > 0", func() {
|
||||
res, err := util.NewMsgWithAnswer("example.com.", 123, dns.Type(dns.TypeA), "123.124.122.123")
|
||||
|
||||
Expect(err).Should(Succeed())
|
||||
|
||||
redisClient.PublishCache("example.com", res)
|
||||
binRes, err := res.Pack()
|
||||
Expect(err).Should(Succeed())
|
||||
|
||||
redisClient.PublishCache("example.com", 123, binRes)
|
||||
})
|
||||
|
||||
By("Database has one entry with correct TTL", func() {
|
||||
|
@ -111,34 +113,6 @@ var _ = Describe("Redis client", func() {
|
|||
Expect(ttl.Seconds()).Should(BeNumerically("~", 123))
|
||||
})
|
||||
})
|
||||
|
||||
It("One new entry with default TTL should be persisted in the database", func(ctx context.Context) {
|
||||
redisClient, err = New(ctx, redisConfig)
|
||||
Expect(err).Should(Succeed())
|
||||
|
||||
By("Database is empty", func() {
|
||||
Eventually(func() []string {
|
||||
return redisServer.DB(redisConfig.Database).Keys()
|
||||
}).Should(BeEmpty())
|
||||
})
|
||||
|
||||
By("publish new message with TTL = 0", func() {
|
||||
res, err := util.NewMsgWithAnswer("example.com.", 0, dns.Type(dns.TypeA), "123.124.122.123")
|
||||
|
||||
Expect(err).Should(Succeed())
|
||||
|
||||
redisClient.PublishCache("example.com", res)
|
||||
})
|
||||
|
||||
By("Database has one entry with default TTL", func() {
|
||||
Eventually(func() bool {
|
||||
return redisServer.DB(redisConfig.Database).Exists(exampleComKey)
|
||||
}).Should(BeTrue())
|
||||
|
||||
ttl := redisServer.DB(redisConfig.Database).TTL(exampleComKey)
|
||||
Expect(ttl.Seconds()).Should(BeNumerically("~", defaultCacheTime.Seconds()))
|
||||
})
|
||||
})
|
||||
})
|
||||
When("Redis client publishes 'enabled' message", func() {
|
||||
It("should propagate the message over redis", func(ctx context.Context) {
|
||||
|
@ -312,13 +286,13 @@ var _ = Describe("Redis client", func() {
|
|||
})
|
||||
|
||||
By("Put valid data in Redis by publishing the cache entry", func() {
|
||||
var res *dns.Msg
|
||||
|
||||
res, err = util.NewMsgWithAnswer("example.com.", 123, dns.Type(dns.TypeA), "123.124.122.123")
|
||||
|
||||
res, err := util.NewMsgWithAnswer("example.com.", 123, dns.Type(dns.TypeA), "123.124.122.123")
|
||||
Expect(err).Should(Succeed())
|
||||
|
||||
redisClient.PublishCache("example.com", res)
|
||||
binRes, err := res.Pack()
|
||||
Expect(err).Should(Succeed())
|
||||
|
||||
redisClient.PublishCache("example.com", 123, binRes)
|
||||
})
|
||||
|
||||
By("Database has one entry now", func() {
|
||||
|
|
Loading…
Reference in New Issue