Don't rely on goroutines to send keepalive events

This commit is contained in:
Deluan 2021-07-01 13:22:52 -04:00
parent 452c8dc44b
commit ed286c7103
1 changed files with 21 additions and 11 deletions

View File

@ -7,7 +7,6 @@ import (
"fmt"
"io"
"net/http"
"sync/atomic"
"time"
"code.cloudfoundry.org/go-diodes"
@ -28,11 +27,6 @@ const (
writeTimeOut = 5 * time.Second
)
var (
eventId uint32
errWriteTimeOut = errors.New("write timeout")
)
type (
message struct {
id uint32
@ -74,7 +68,7 @@ func GetBroker() Broker {
instance := singleton.Get(&broker{}, func() interface{} {
// Instantiate a broker
broker := &broker{
publish: make(messageChan, 100),
publish: make(messageChan, 2),
subscribing: make(clientsChan, 1),
unsubscribing: make(clientsChan, 1),
}
@ -101,6 +95,8 @@ func (b *broker) prepareMessage(ctx context.Context, event Event) message {
return msg
}
var errWriteTimeOut = errors.New("write timeout")
// writeEvent Write to the ResponseWriter, Server Sent Events compatible
func writeEvent(w io.Writer, event message, timeout time.Duration) (err error) {
flusher, _ := w.(http.Flusher)
@ -170,7 +166,7 @@ func (b *broker) subscribe(r *http.Request) client {
clientUniqueId: clientUniqueId,
}
c.diode = newDiode(ctx, 1024, diodes.AlertFunc(func(missed int) {
log.Trace("Dropped SSE events", "client", c.String(), "missed", missed)
log.Debug("Dropped SSE events", "client", c.String(), "missed", missed)
}))
// Signal the broker that we have a new client
@ -201,6 +197,12 @@ func (b *broker) listen() {
defer keepAlive.Stop()
clients := map[client]struct{}{}
var eventId uint32
getNextEventId := func() uint32 {
eventId++
return eventId
}
for {
select {
@ -222,7 +224,7 @@ func (b *broker) listen() {
log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String())
case msg := <-b.publish:
msg.id = atomic.AddUint32(&eventId, 1)
msg.id = getNextEventId()
log.Trace("Got new published event", "event", msg)
// We got a new event from the outside!
// Send event to all connected clients
@ -234,8 +236,16 @@ func (b *broker) listen() {
}
case ts := <-keepAlive.C:
// Send a keep alive message every 15 seconds
go b.SendMessage(context.Background(), &KeepAlive{TS: ts.Unix()})
// Send a keep alive message every 15 seconds to all connected clients
if len(clients) == 0 {
continue
}
msg := b.prepareMessage(context.Background(), &KeepAlive{TS: ts.Unix()})
msg.id = getNextEventId()
for c := range clients {
log.Trace("Putting a keepalive event on client's queue", "client", c.String(), "event", msg)
c.diode.put(msg)
}
}
}
}