Use diodes instead of channels in SSE broker

This commit is contained in:
Deluan 2021-02-01 17:41:17 -05:00 committed by Deluan Quintão
parent 591a5344ac
commit 905c685696
4 changed files with 55 additions and 30 deletions

1
go.mod
View File

@ -3,6 +3,7 @@ module github.com/deluan/navidrome
go 1.15
require (
code.cloudfoundry.org/go-diodes v0.0.0-20190809170250-f77fb823c7ee
github.com/ClickHouse/clickhouse-go v1.4.3 // indirect
github.com/Masterminds/squirrel v1.5.0
github.com/astaxie/beego v1.12.3

2
go.sum
View File

@ -12,6 +12,8 @@ cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
code.cloudfoundry.org/go-diodes v0.0.0-20190809170250-f77fb823c7ee h1:iAAPf9s7/+BIiGf+RjgcXLm3NoZaLIJsBXJuUa63Lx8=
code.cloudfoundry.org/go-diodes v0.0.0-20190809170250-f77fb823c7ee/go.mod h1:Jzi+ccHgo/V/PLQUaQ6hnZcC1c4BS790gx21LRRui4g=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=

26
server/events/diode.go Normal file
View File

@ -0,0 +1,26 @@
package events
import (
"context"
"code.cloudfoundry.org/go-diodes"
)
type diode struct {
d *diodes.Poller
}
func newDiode(ctx context.Context, size int, alerter diodes.Alerter) *diode {
return &diode{
d: diodes.NewPoller(diodes.NewOneToOne(size, alerter), diodes.WithPollingContext(ctx)),
}
}
func (d *diode) set(data message) {
d.d.Set(diodes.GenericDataType(&data))
}
func (d *diode) next() *message {
data := d.d.Next()
return (*message)(data)
}

View File

@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"
"code.cloudfoundry.org/go-diodes"
"github.com/deluan/navidrome/consts"
"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model/request"
@ -44,7 +45,7 @@ type (
address string
username string
userAgent string
channel messageChan
diode *diode
}
)
@ -78,30 +79,30 @@ func NewBroker() Broker {
}
func (b *broker) SendMessage(evt Event) {
msg := b.preparePackage(evt)
msg := b.prepareMessage(evt)
log.Trace("Broker received new event", "event", msg)
b.publish <- msg
}
func (b *broker) newEventID() uint32 {
func (b *broker) nextEventID() uint32 {
return atomic.AddUint32(&eventId, 1)
}
func (b *broker) preparePackage(event Event) message {
pkg := message{}
pkg.ID = b.newEventID()
pkg.Event = event.EventName()
func (b *broker) prepareMessage(event Event) message {
msg := message{}
msg.ID = b.nextEventID()
msg.Event = event.EventName()
data, _ := json.Marshal(event)
pkg.Data = string(data)
return pkg
msg.Data = string(data)
return msg
}
// writeEvent Write to the ResponseWriter, Server Sent Events compatible
func writeEvent(w io.Writer, event message, timeout time.Duration) (n int, err error) {
func writeEvent(w io.Writer, event message, timeout time.Duration) (err error) {
flusher, _ := w.(http.Flusher)
complete := make(chan struct{}, 1)
go func() {
n, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data)
_, err = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data)
// Flush the data immediately instead of buffering it for later.
flusher.Flush()
complete <- struct{}{}
@ -110,7 +111,7 @@ func writeEvent(w io.Writer, event message, timeout time.Duration) (n int, err e
case <-complete:
return
case <-time.After(timeout):
return 0, errWriteTimeOut
return errWriteTimeOut
}
}
@ -138,15 +139,14 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Debug(ctx, "New broker client", "client", c.String())
for {
select {
case event := <-c.channel:
log.Trace(ctx, "Sending event to client", "event", event, "client", c.String())
_, err := writeEvent(w, event, writeTimeOut)
if err == errWriteTimeOut {
return
}
case <-ctx.Done():
log.Trace(ctx, "Client closed the connection", "client", c.String())
event := c.diode.next()
if event == nil {
log.Trace(ctx, "Client closed the EventStream connection", "client", c.String())
return
}
log.Trace(ctx, "Sending event to client", "event", *event, "client", c.String())
if err := writeEvent(w, *event, writeTimeOut); err == errWriteTimeOut {
log.Debug(ctx, "Timeout sending event to client", "event", *event, "client", c.String())
return
}
}
@ -159,8 +159,10 @@ func (b *broker) subscribe(r *http.Request) client {
username: user.UserName,
address: r.RemoteAddr,
userAgent: r.UserAgent(),
channel: make(messageChan, 5),
}
c.diode = newDiode(r.Context(), 1000, diodes.AlertFunc(func(missed int) {
log.Trace("Dropped SSE events", "client", c.String(), "missed", missed)
}))
// Signal the broker that we have a new client
b.subscribing <- c
@ -186,12 +188,11 @@ func (b *broker) listen() {
log.Debug("Client added to event broker", "numClients", len(clients), "newClient", c.String())
// Send a serverStart event to new client
c.channel <- b.preparePackage(&ServerStart{consts.ServerStart})
c.diode.set(b.prepareMessage(&ServerStart{consts.ServerStart}))
case c := <-b.unsubscribing:
// A client has detached and we want to
// stop sending them messages.
close(c.channel)
delete(clients, c)
log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String())
@ -200,12 +201,7 @@ func (b *broker) listen() {
// Send event to all connected clients
for c := range clients {
log.Trace("Putting event on client's queue", "client", c.String(), "event", event)
// Use non-blocking send. If cannot send, ignore the message
select {
case c.channel <- event:
default:
log.Warn("Could not send event to client", "client", c.String(), "event", event)
}
c.diode.set(event)
}
case ts := <-keepAlive.C: