redis refactoring 2

This commit is contained in:
Kwitsch 2024-04-23 16:55:48 +00:00
parent 73cbd9c15e
commit 7b2fcc2953
2 changed files with 68 additions and 58 deletions

View File

@ -92,30 +92,34 @@ func New(ctx context.Context, cfg *config.Redis) (*Client, error) {
rdb := baseClient.WithContext(ctx)
_, err := rdb.Ping(ctx).Result()
if err == nil {
var id []byte
id, err = uuid.New().MarshalBinary()
if err == nil {
// construct client
res := &Client{
config: cfg,
client: rdb,
l: log.PrefixedLog("redis"),
id: id,
sendBuffer: make(chan *CacheEntry, chanCap),
CacheChannel: make(chan *CacheEntry, chanCap),
EnabledChannel: make(chan *EnabledMessage, chanCap),
}
// start channel handling go routine
err = res.startup(ctx)
return res, err
}
if err != nil {
return nil, err
}
return nil, err
var id []byte
id, err = uuid.New().MarshalBinary()
if err != nil {
return nil, err
}
// construct client
res := &Client{
config: cfg,
client: rdb,
l: log.PrefixedLog("redis"),
id: id,
sendBuffer: make(chan *CacheEntry, chanCap),
CacheChannel: make(chan *CacheEntry, chanCap),
EnabledChannel: make(chan *EnabledMessage, chanCap),
}
// start channel handling go routine
err = res.startup(ctx)
if err != nil {
return nil, err
}
return res, nil
}
// PublishCache publish cache entry to redis if key and message are not empty and ttl > 0
@ -130,18 +134,21 @@ func (c *Client) PublishCache(key string, ttl uint32, message []byte) {
}
func (c *Client) PublishEnabled(ctx context.Context, state *EnabledMessage) {
binState, sErr := json.Marshal(state)
if sErr == nil {
binMsg, mErr := json.Marshal(redisMessage{
Type: messageTypeEnable,
Message: binState,
Client: c.id,
})
if mErr == nil {
c.client.Publish(ctx, SyncChannelName, binMsg)
}
binState, err := json.Marshal(state)
if err != nil {
return
}
binMsg, err := json.Marshal(redisMessage{
Type: messageTypeEnable,
Message: binState,
Client: c.id,
})
if err != nil {
return
}
c.client.Publish(ctx, SyncChannelName, binMsg)
}
// GetRedisCache reads the redis cache and publish it to the channel
@ -176,32 +183,35 @@ func (c *Client) startup(ctx context.Context) error {
ps := c.client.Subscribe(ctx, SyncChannelName)
_, err := ps.Receive(ctx)
if err == nil {
go func() {
for {
select {
// received message from subscription
case msg := <-ps.Channel():
c.l.Debug("Received message: ", msg)
if msg != nil && len(msg.Payload) > 0 {
// message is not empty
c.processReceivedMessage(ctx, msg)
}
// publish message from buffer
case s := <-c.sendBuffer:
c.publishMessageFromBuffer(ctx, s)
// context is done
case <-ctx.Done():
c.client.Close()
return
}
}
}()
if err != nil {
return err
}
return err
go func() {
defer ps.Close()
defer c.client.Close()
for {
select {
// received message from subscription
case msg := <-ps.Channel():
c.l.Debug("Received message: ", msg)
if msg != nil && len(msg.Payload) > 0 {
// message is not empty
c.processReceivedMessage(ctx, msg)
}
// publish message from buffer
case s := <-c.sendBuffer:
c.publishMessageFromBuffer(ctx, s)
// context is done
case <-ctx.Done():
return
}
}
}()
return nil
}
// publishMessageFromBuffer publishes a message from the buffer to the redis channel and stores it in the cache

View File

@ -86,7 +86,7 @@ func SetAnswerMaxTTL[T TTLInput](msg *dns.Msg, max T) {
// SetAnswerMinMaxTTL sets the TTL of all answers in the message that are less than the specified minimum TTL
// to the minimum TTL and the TTL of all answers that are greater than the specified maximum TTL to the maximum TTL.
func SetAnswerMinMaxTTL[T TTLInput, TT TTLInput](msg *dns.Msg, min T, max TT) {
func SetAnswerMinMaxTTL[T, TT TTLInput](msg *dns.Msg, min T, max TT) {
minTTL := ToTTL(min)
maxTTL := ToTTL(max)