diff --git a/server/events/sse.go b/server/events/sse.go index 96429ad6..e91a63a7 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "net/http" - "sync/atomic" "time" "code.cloudfoundry.org/go-diodes" @@ -28,11 +27,6 @@ const ( writeTimeOut = 5 * time.Second ) -var ( - eventId uint32 - errWriteTimeOut = errors.New("write timeout") -) - type ( message struct { id uint32 @@ -74,7 +68,7 @@ func GetBroker() Broker { instance := singleton.Get(&broker{}, func() interface{} { // Instantiate a broker broker := &broker{ - publish: make(messageChan, 100), + publish: make(messageChan, 2), subscribing: make(clientsChan, 1), unsubscribing: make(clientsChan, 1), } @@ -101,6 +95,8 @@ func (b *broker) prepareMessage(ctx context.Context, event Event) message { return msg } +var errWriteTimeOut = errors.New("write timeout") + // writeEvent Write to the ResponseWriter, Server Sent Events compatible func writeEvent(w io.Writer, event message, timeout time.Duration) (err error) { flusher, _ := w.(http.Flusher) @@ -170,7 +166,7 @@ func (b *broker) subscribe(r *http.Request) client { clientUniqueId: clientUniqueId, } c.diode = newDiode(ctx, 1024, diodes.AlertFunc(func(missed int) { - log.Trace("Dropped SSE events", "client", c.String(), "missed", missed) + log.Debug("Dropped SSE events", "client", c.String(), "missed", missed) })) // Signal the broker that we have a new client @@ -201,6 +197,12 @@ func (b *broker) listen() { defer keepAlive.Stop() clients := map[client]struct{}{} + var eventId uint32 + + getNextEventId := func() uint32 { + eventId++ + return eventId + } for { select { @@ -222,7 +224,7 @@ func (b *broker) listen() { log.Debug("Removed client from event broker", "numClients", len(clients), "client", c.String()) case msg := <-b.publish: - msg.id = atomic.AddUint32(&eventId, 1) + msg.id = getNextEventId() log.Trace("Got new published event", "event", msg) // We got a new event from the outside! // Send event to all connected clients @@ -234,8 +236,16 @@ func (b *broker) listen() { } case ts := <-keepAlive.C: - // Send a keep alive message every 15 seconds - go b.SendMessage(context.Background(), &KeepAlive{TS: ts.Unix()}) + // Send a keep alive message every 15 seconds to all connected clients + if len(clients) == 0 { + continue + } + msg := b.prepareMessage(context.Background(), &KeepAlive{TS: ts.Unix()}) + msg.id = getNextEventId() + for c := range clients { + log.Trace("Putting a keepalive event on client's queue", "client", c.String(), "event", msg) + c.diode.put(msg) + } } } }