diff --git a/server/events/diode.go b/server/events/diode.go deleted file mode 100644 index 8ac5cd33..00000000 --- a/server/events/diode.go +++ /dev/null @@ -1,34 +0,0 @@ -package events - -import ( - "context" - - "code.cloudfoundry.org/go-diodes" -) - -type diode struct { - d *diodes.Waiter -} - -func newDiode(ctx context.Context, size int, alerter diodes.Alerter) *diode { - return &diode{ - d: diodes.NewWaiter(diodes.NewOneToOne(size, alerter), diodes.WithWaiterContext(ctx)), - } -} - -func (d *diode) put(data message) { - d.d.Set(diodes.GenericDataType(&data)) -} - -func (d *diode) tryNext() (*message, bool) { - data, ok := d.d.TryNext() - if !ok { - return nil, ok - } - return (*message)(data), true -} - -func (d *diode) next() *message { - data := d.d.Next() - return (*message)(data) -} diff --git a/server/events/diode_test.go b/server/events/diode_test.go deleted file mode 100644 index 50e3dd1a..00000000 --- a/server/events/diode_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package events - -import ( - "context" - - "code.cloudfoundry.org/go-diodes" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -var _ = Describe("diode", func() { - var diode *diode - var ctx context.Context - var ctxCancel context.CancelFunc - var missed int - - BeforeEach(func() { - missed = 0 - ctx, ctxCancel = context.WithCancel(context.Background()) - diode = newDiode(ctx, 2, diodes.AlertFunc(func(m int) { missed = m })) - }) - - It("enqueues the data correctly", func() { - diode.put(message{data: "1"}) - diode.put(message{data: "2"}) - Expect(diode.next()).To(Equal(&message{data: "1"})) - Expect(diode.next()).To(Equal(&message{data: "2"})) - Expect(missed).To(BeZero()) - }) - - It("drops messages when diode is full", func() { - diode.put(message{data: "1"}) - diode.put(message{data: "2"}) - diode.put(message{data: "3"}) - next, ok := diode.tryNext() - Expect(ok).To(BeTrue()) - Expect(next).To(Equal(&message{data: "3"})) - - _, ok = diode.tryNext() - Expect(ok).To(BeFalse()) - - Expect(missed).To(Equal(2)) - }) - - It("returns nil when diode is empty and the context is canceled", func() { - diode.put(message{data: "1"}) - ctxCancel() - Expect(diode.next()).To(Equal(&message{data: "1"})) - Expect(diode.next()).To(BeNil()) - }) -}) diff --git a/server/events/sse.go b/server/events/sse.go index 9b920033..f82ff117 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -1,4 +1,4 @@ -// Based on https://thoughtbot.com/blog/writing-a-server-sent-events-server-in-go +// Package events based on https://thoughtbot.com/blog/writing-a-server-sent-events-server-in-go package events import ( @@ -8,11 +8,11 @@ import ( "net/http" "time" - "code.cloudfoundry.org/go-diodes" "github.com/google/uuid" "github.com/navidrome/navidrome/consts" "github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/model/request" + "github.com/navidrome/navidrome/utils/diodes" "github.com/navidrome/navidrome/utils/singleton" ) @@ -41,7 +41,7 @@ type ( username string userAgent string clientUniqueId string - diode *diode + diode *diodes.Diode[message] } ) @@ -150,7 +150,7 @@ func (b *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug(ctx, "New broker client", "client", c.String()) for { - event := c.diode.next() + event := c.diode.Next() if event == nil { log.Trace(ctx, "Client closed the EventStream connection", "client", c.String()) return @@ -174,7 +174,7 @@ func (b *broker) subscribe(r *http.Request) client { userAgent: r.UserAgent(), clientUniqueId: clientUniqueId, } - c.diode = newDiode(ctx, 1024, diodes.AlertFunc(func(missed int) { + c.diode = diodes.New[message](ctx, 1024, diodes.AlertFunc(func(missed int) { log.Debug("Dropped SSE events", "client", c.String(), "missed", missed) })) @@ -224,7 +224,7 @@ func (b *broker) listen() { // Send a serverStart event to new client msg := b.prepareMessage(context.Background(), &ServerStart{StartTime: consts.ServerStart, Version: consts.Version}) - c.diode.put(msg) + c.diode.Put(msg) case c := <-b.unsubscribing: // A client has detached, and we want to @@ -240,7 +240,7 @@ func (b *broker) listen() { for c := range clients { if b.shouldSend(msg, c) { log.Trace("Putting event on client's queue", "client", c.String(), "event", msg) - c.diode.put(msg) + c.diode.Put(msg) } } @@ -253,7 +253,7 @@ func (b *broker) listen() { msg.id = getNextEventId() for c := range clients { log.Trace("Putting a keepalive event on client's queue", "client", c.String(), "event", msg) - c.diode.put(msg) + c.diode.Put(msg) } } } diff --git a/utils/diodes/diodes.go b/utils/diodes/diodes.go new file mode 100644 index 00000000..64e2e436 --- /dev/null +++ b/utils/diodes/diodes.go @@ -0,0 +1,38 @@ +package diodes + +import ( + "context" + + "code.cloudfoundry.org/go-diodes" +) + +type Diode[T any] struct { + d *diodes.Waiter +} + +type Alerter = diodes.Alerter + +type AlertFunc = diodes.AlertFunc + +func New[T any](ctx context.Context, size int, alerter Alerter) *Diode[T] { + return &Diode[T]{ + d: diodes.NewWaiter(diodes.NewOneToOne(size, alerter), diodes.WithWaiterContext(ctx)), + } +} + +func (d *Diode[T]) Put(data T) { + d.d.Set(diodes.GenericDataType(&data)) +} + +func (d *Diode[T]) TryNext() (*T, bool) { + data, ok := d.d.TryNext() + if !ok { + return nil, ok + } + return (*T)(data), true +} + +func (d *Diode[T]) Next() *T { + data := d.d.Next() + return (*T)(data) +} diff --git a/utils/diodes/diodes_test.go b/utils/diodes/diodes_test.go new file mode 100644 index 00000000..fda9746e --- /dev/null +++ b/utils/diodes/diodes_test.go @@ -0,0 +1,62 @@ +package diodes_test + +import ( + "context" + "testing" + + "github.com/navidrome/navidrome/tests" + . "github.com/navidrome/navidrome/utils/diodes" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestDiodes(t *testing.T) { + tests.Init(t, false) + RegisterFailHandler(Fail) + RunSpecs(t, "Diodes Suite") +} + +var _ = Describe("Diode", func() { + type message struct { + data string + } + var diode *Diode[message] + var ctx context.Context + var ctxCancel context.CancelFunc + var missed int + + BeforeEach(func() { + missed = 0 + ctx, ctxCancel = context.WithCancel(context.Background()) + diode = New[message](ctx, 2, AlertFunc(func(m int) { missed = m })) + }) + + It("enqueues the data correctly", func() { + diode.Put(message{data: "1"}) + diode.Put(message{data: "2"}) + Expect(diode.Next()).To(Equal(&message{data: "1"})) + Expect(diode.Next()).To(Equal(&message{data: "2"})) + Expect(missed).To(BeZero()) + }) + + It("drops messages when Diode is full", func() { + diode.Put(message{data: "1"}) + diode.Put(message{data: "2"}) + diode.Put(message{data: "3"}) + next, ok := diode.TryNext() + Expect(ok).To(BeTrue()) + Expect(next).To(Equal(&message{data: "3"})) + + _, ok = diode.TryNext() + Expect(ok).To(BeFalse()) + + Expect(missed).To(Equal(2)) + }) + + It("returns nil when Diode is empty and the context is canceled", func() { + diode.Put(message{data: "1"}) + ctxCancel() + Expect(diode.Next()).To(Equal(&message{data: "1"})) + Expect(diode.Next()).To(BeNil()) + }) +})