redis refactoring 2

This commit is contained in:
Kwitsch 2024-04-21 16:38:03 +00:00
parent 423ed031ea
commit 4928399387
3 changed files with 56 additions and 69 deletions

View File

@ -10,7 +10,6 @@ import (
"github.com/0xERR0R/blocky/config"
"github.com/0xERR0R/blocky/log"
"github.com/0xERR0R/blocky/model"
"github.com/0xERR0R/blocky/util"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
@ -28,11 +27,10 @@ const (
messageTypeEnable = 1
)
// sendBuffer message
type bufferMessage struct {
TTL uint32
Key string
Message []byte
type CacheEntry struct {
TTL uint32
Key string
Entry []byte
}
// redis pubsub message
@ -43,12 +41,6 @@ type redisMessage struct {
Client []byte `json:"c"`
}
// CacheChannel message
type CacheMessage struct {
Key string
Response *model.Response
}
type EnabledMessage struct {
State bool `json:"s"`
Duration time.Duration `json:"d,omitempty"`
@ -61,8 +53,8 @@ type Client struct {
client *redis.Client
l *logrus.Entry
id []byte
sendBuffer chan *bufferMessage
CacheChannel chan *CacheMessage
sendBuffer chan *CacheEntry
CacheChannel chan *CacheEntry
EnabledChannel chan *EnabledMessage
}
@ -111,8 +103,8 @@ func New(ctx context.Context, cfg *config.Redis) (*Client, error) {
client: rdb,
l: log.PrefixedLog("redis"),
id: id,
sendBuffer: make(chan *bufferMessage, chanCap),
CacheChannel: make(chan *CacheMessage, chanCap),
sendBuffer: make(chan *CacheEntry, chanCap),
CacheChannel: make(chan *CacheEntry, chanCap),
EnabledChannel: make(chan *EnabledMessage, chanCap),
}
@ -129,10 +121,10 @@ func New(ctx context.Context, cfg *config.Redis) (*Client, error) {
// PublishCache publish cache entry to redis if key and message are not empty and ttl > 0
func (c *Client) PublishCache(key string, ttl uint32, message []byte) {
if len(key) > 0 && len(message) > 0 && ttl > 0 {
c.sendBuffer <- &bufferMessage{
TTL: ttl,
Key: key,
Message: message,
c.sendBuffer <- &CacheEntry{
TTL: ttl,
Key: key,
Entry: message,
}
}
}
@ -212,11 +204,11 @@ func (c *Client) startup(ctx context.Context) error {
return err
}
func (c *Client) publishMessageFromBuffer(ctx context.Context, s *bufferMessage) {
func (c *Client) publishMessageFromBuffer(ctx context.Context, s *CacheEntry) {
psMsg, err := json.Marshal(redisMessage{
Key: s.Key,
Type: messageTypeCache,
Message: s.Message,
Message: s.Entry,
Client: c.id,
})
if err == nil {
@ -225,7 +217,7 @@ func (c *Client) publishMessageFromBuffer(ctx context.Context, s *bufferMessage)
c.client.Set(ctx,
prefixKey(s.Key),
s.Message,
s.Entry,
util.ToTTLDuration(s.TTL))
}
@ -242,7 +234,7 @@ func (c *Client) processReceivedMessage(ctx context.Context, msg *redis.Message)
if !bytes.Equal(rm.Client, c.id) {
switch rm.Type {
case messageTypeCache:
var cm *CacheMessage
var cm *CacheEntry
cm, err := convertMessage(&rm, 0)
if err != nil {
@ -269,55 +261,50 @@ func (c *Client) processReceivedMessage(ctx context.Context, msg *redis.Message)
}
// getResponse returns model.Response for a key
func (c *Client) getResponse(ctx context.Context, key string) (*CacheMessage, error) {
func (c *Client) getResponse(ctx context.Context, key string) (*CacheEntry, error) {
resp, err := c.client.Get(ctx, key).Result()
if err == nil {
var ttl time.Duration
ttl, err = c.client.TTL(ctx, key).Result()
if err == nil {
var result *CacheMessage
result, err = convertMessage(&redisMessage{
Key: cleanKey(key),
Message: []byte(resp),
}, ttl)
if err != nil {
return nil, fmt.Errorf("conversion error: %w", err)
}
return result, nil
}
if err != nil {
return nil, err
}
return nil, err
ttl, err := c.client.TTL(ctx, key).Result()
if err != nil {
return nil, err
}
result := CacheEntry{
TTL: util.ToTTL(ttl),
Key: cleanKey(key),
Entry: []byte(resp),
}
return &result, nil
}
// convertMessage converts redisMessage to CacheMessage
func convertMessage(message *redisMessage, ttl time.Duration) (*CacheMessage, error) {
msg := dns.Msg{}
err := msg.Unpack(message.Message)
if err == nil {
if ttl > 0 {
for _, a := range msg.Answer {
a.Header().Ttl = uint32(ttl.Seconds())
}
}
res := &CacheMessage{
Key: message.Key,
Response: &model.Response{
RType: model.ResponseTypeCACHED,
Reason: cacheReason,
Res: &msg,
},
}
return res, nil
func convertMessage(message *redisMessage, ttl time.Duration) (*CacheEntry, error) {
res := CacheEntry{
TTL: util.ToTTL(ttl),
Key: message.Key,
Entry: message.Message,
}
return nil, err
// if ttl is set, use it
if res.TTL > 0 {
return &res, nil
}
// try to extract ttl from message
var msg *dns.Msg
err := msg.Unpack(message.Message)
if err != nil {
return nil, err
}
res.TTL = util.GetAnswerMinTTL(msg)
return &res, nil
}
// prefixKey with CacheStorePrefix

View File

@ -196,7 +196,7 @@ var _ = Describe("Redis client", func() {
rec := redisServer.Publish(SyncChannelName, string(binMsg))
Expect(rec).Should(Equal(1))
Eventually(func() chan *CacheMessage {
Eventually(func() chan *CacheEntry {
return redisClient.CacheChannel
}).Should(HaveLen(lenE + 1))
}, SpecTimeout(time.Second*6))
@ -229,7 +229,7 @@ var _ = Describe("Redis client", func() {
return redisClient.EnabledChannel
}).Should(HaveLen(lenE))
Eventually(func() chan *CacheMessage {
Eventually(func() chan *CacheEntry {
return redisClient.CacheChannel
}).Should(HaveLen(lenC))
}, SpecTimeout(time.Second*6))
@ -262,7 +262,7 @@ var _ = Describe("Redis client", func() {
return redisClient.EnabledChannel
}).Should(HaveLen(lenE))
Eventually(func() chan *CacheMessage {
Eventually(func() chan *CacheEntry {
return redisClient.CacheChannel
}).Should(HaveLen(lenC))
}, SpecTimeout(time.Second*6))

View File

@ -146,7 +146,7 @@ func (r *CachingResolver) redisSubscriber(ctx context.Context) {
dlogger.Debug("received from redis")
// TODO: Add to cache
r.resultCache.Put(rc.Key, &rc.Entry, util.ToTTLDuration(rc.TTL))
}
case <-ctx.Done():