diff --git a/server/events/sse.go b/server/events/sse.go index 45c4b012..7e7aeefe 100644 --- a/server/events/sse.go +++ b/server/events/sse.go @@ -85,29 +85,29 @@ func (broker *broker) preparePackage(event Event) message { return pkg } -func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - ctx := req.Context() +func (broker *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() // Make sure that the writer supports flushing. - flusher, ok := rw.(http.Flusher) + flusher, ok := w.(http.Flusher) user, _ := request.UserFrom(ctx) if !ok { - log.Error(rw, "Streaming unsupported! Events cannot be sent to this client", "address", req.RemoteAddr, - "userAgent", req.UserAgent(), "user", user.UserName) - http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError) + log.Error(w, "Streaming unsupported! Events cannot be sent to this client", "address", r.RemoteAddr, + "userAgent", r.UserAgent(), "user", user.UserName) + http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) return } - rw.Header().Set("Content-Type", "text/event-stream") - rw.Header().Set("Cache-Control", "no-cache, no-transform") - rw.Header().Set("Connection", "keep-alive") - rw.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache, no-transform") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") // Each connection registers its own message channel with the Broker's connections registry client := client{ username: user.UserName, - address: req.RemoteAddr, - userAgent: req.UserAgent(), + address: r.RemoteAddr, + userAgent: r.UserAgent(), channel: make(messageChan), } @@ -122,22 +122,20 @@ func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { broker.closingClients <- client }() - // Listen to client close and un-register messageChan - notify := ctx.Done() - go func() { - <-notify - broker.closingClients <- client - }() - for { - // Write to the ResponseWriter - // Server Sent Events compatible - event := <-client.channel - log.Trace(ctx, "Sending event to client", "event", event, "client", client.String()) - _, _ = fmt.Fprintf(rw, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data) + select { + case event := <-client.channel: + // Write to the ResponseWriter + // Server Sent Events compatible + log.Trace(ctx, "Sending event to client", "event", event, "client", client.String()) + _, _ = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data) - // Flush the data immediately instead of buffering it for later. - flusher.Flush() + // Flush the data immediately instead of buffering it for later. + flusher.Flush() + case <-ctx.Done(): + log.Trace(ctx, "Closing event stream connection", "client", client.String()) + return + } } }