From 5bd33455a12131c3e1aee6395e22b240bf7436da Mon Sep 17 00:00:00 2001 From: Deluan Date: Thu, 1 Jul 2021 10:42:00 -0400 Subject: [PATCH] Fix deadlock situation when events are sent too fast to the broker --- scanner/scanner.go | 2 -- server/events/sse.go | 15 ++++++++------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/scanner/scanner.go b/scanner/scanner.go index 388b3282..1d15359b 100644 --- a/scanner/scanner.go +++ b/scanner/scanner.go @@ -48,7 +48,6 @@ type scanner struct { ds model.DataStore cacheWarmer core.CacheWarmer broker events.Broker - scan chan bool } type scanStatus struct { @@ -66,7 +65,6 @@ func New(ds model.DataStore, cacheWarmer core.CacheWarmer, broker events.Broker) folders: map[string]FolderScanner{}, status: map[string]*scanStatus{}, lock: &sync.RWMutex{}, - scan: make(chan bool), } s.loadFolders() return s diff --git a/server/events/sse.go b/server/events/sse.go index dc09c2ef..30c5d404 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -10,13 +10,12 @@ import ( "sync/atomic" "time" - "github.com/navidrome/navidrome/utils/singleton" - "code.cloudfoundry.org/go-diodes" "github.com/google/uuid" "github.com/navidrome/navidrome/consts" "github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/model/request" + "github.com/navidrome/navidrome/utils/singleton" ) type Broker interface { @@ -89,17 +88,17 @@ func GetBroker() Broker { } func (b *broker) SendMessage(ctx context.Context, evt Event) { - msg := b.prepareMessage(evt) - msg.senderCtx = ctx + msg := b.prepareMessage(ctx, evt) log.Trace("Broker received new event", "event", msg) b.publish <- msg } -func (b *broker) prepareMessage(event Event) message { +func (b *broker) prepareMessage(ctx context.Context, event Event) message { msg := message{} msg.id = atomic.AddUint32(&eventId, 1) msg.data = event.Data(event) msg.event = event.Name(event) + msg.senderCtx = ctx return msg } @@ -213,7 +212,8 @@ func (b *broker) listen() { log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String()) // Send a serverStart event to new client - c.diode.put(b.prepareMessage(&ServerStart{StartTime: consts.ServerStart, Version: consts.Version()})) + c.diode.put(b.prepareMessage(context.Background(), + &ServerStart{StartTime: consts.ServerStart, Version: consts.Version()})) case c := <-b.unsubscribing: // A client has detached and we want to @@ -222,6 +222,7 @@ func (b *broker) listen() { log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String()) case event := <-b.publish: + log.Trace("Got new published event", "event", event) // We got a new event from the outside! // Send event to all connected clients for c := range clients { @@ -233,7 +234,7 @@ func (b *broker) listen() { case ts := <-keepAlive.C: // Send a keep alive message every 15 seconds - b.SendMessage(context.Background(), &KeepAlive{TS: ts.Unix()}) + go b.SendMessage(context.Background(), &KeepAlive{TS: ts.Unix()}) } } }