Use non-blocking event sending

This commit is contained in:
Deluan 2020-12-12 13:35:49 -05:00
parent 500da8bc7b
commit e2969aa34c
1 changed files with 37 additions and 23 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model/request"
"github.com/google/uuid"
)
type Broker interface {
@ -30,6 +31,7 @@ type (
messageChan chan message
clientsChan chan client
client struct {
id string
address string
username string
userAgent string
@ -38,7 +40,7 @@ type (
)
func (c client) String() string {
return fmt.Sprintf("%s (%s - %s)", c.username, c.address, c.userAgent)
return fmt.Sprintf("%s (%s - %s - %s)", c.id, c.username, c.address, c.userAgent)
}
type broker struct {
@ -52,16 +54,16 @@ type broker struct {
closingClients clientsChan
// Client connections registry
clients map[client]bool
clients map[client]struct{}
}
func NewBroker() Broker {
// Instantiate a broker
broker := &broker{
notifier: make(messageChan, 100),
newClients: make(clientsChan),
closingClients: make(clientsChan),
clients: make(map[client]bool),
newClients: make(clientsChan, 1),
closingClients: make(clientsChan, 1),
clients: make(map[client]struct{}),
}
// Set it running - listening and broadcasting events
@ -104,24 +106,10 @@ func (broker *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
// Each connection registers its own message channel with the Broker's connections registry
client := client{
username: user.UserName,
address: r.RemoteAddr,
userAgent: r.UserAgent(),
channel: make(messageChan),
}
// Signal the broker that we have a new client
broker.newClients <- client
client := broker.subscribe(r)
defer broker.unsubscribe(client)
log.Debug(ctx, "New broker client", "client", client.String())
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
broker.closingClients <- client
}()
for {
select {
case event := <-client.channel:
@ -139,6 +127,26 @@ func (broker *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func (broker *broker) subscribe(r *http.Request) client {
user, _ := request.UserFrom(r.Context())
id, _ := uuid.NewRandom()
client := client{
id: id.String(),
username: user.UserName,
address: r.RemoteAddr,
userAgent: r.UserAgent(),
channel: make(messageChan, 5),
}
// Signal the broker that we have a new client
broker.newClients <- client
return client
}
func (broker *broker) unsubscribe(c client) {
broker.closingClients <- c
}
func (broker *broker) listen() {
keepAlive := time.NewTicker(keepAliveFrequency)
defer keepAlive.Stop()
@ -148,7 +156,7 @@ func (broker *broker) listen() {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = true
broker.clients[s] = struct{}{}
log.Debug("Client added to event broker", "numClients", len(broker.clients), "newClient", s.String())
// Send a serverStart event to new client
@ -157,6 +165,7 @@ func (broker *broker) listen() {
case s := <-broker.closingClients:
// A client has detached and we want to
// stop sending them messages.
close(s.channel)
delete(broker.clients, s)
log.Debug("Removed client from event broker", "numClients", len(broker.clients), "client", s.String())
@ -165,7 +174,12 @@ func (broker *broker) listen() {
// Send event to all connected clients
for client := range broker.clients {
log.Trace("Putting event on client's queue", "client", client.String(), "event", event)
client.channel <- event
// Use non-blocking send
select {
case client.channel <- event:
default:
log.Warn("Could not send message to client", "client", client.String(), "event", event)
}
}
case ts := <-keepAlive.C: