Fix file descriptor leak in SSE implementation.master

See https://github.com/deluan/navidrome/issues/446#issuecomment-736296465
This commit is contained in:
Deluan 2020-12-01 09:24:44 -05:00
parent 9414ce6549
commit a8c5fa6d49
1 changed files with 24 additions and 26 deletions

View File

@ -85,29 +85,29 @@ func (broker *broker) preparePackage(event Event) message {
return pkg return pkg
} }
func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) { func (broker *broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := req.Context() ctx := r.Context()
// Make sure that the writer supports flushing. // Make sure that the writer supports flushing.
flusher, ok := rw.(http.Flusher) flusher, ok := w.(http.Flusher)
user, _ := request.UserFrom(ctx) user, _ := request.UserFrom(ctx)
if !ok { if !ok {
log.Error(rw, "Streaming unsupported! Events cannot be sent to this client", "address", req.RemoteAddr, log.Error(w, "Streaming unsupported! Events cannot be sent to this client", "address", r.RemoteAddr,
"userAgent", req.UserAgent(), "user", user.UserName) "userAgent", r.UserAgent(), "user", user.UserName)
http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError) http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return return
} }
rw.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache, no-transform") w.Header().Set("Cache-Control", "no-cache, no-transform")
rw.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
rw.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
// Each connection registers its own message channel with the Broker's connections registry // Each connection registers its own message channel with the Broker's connections registry
client := client{ client := client{
username: user.UserName, username: user.UserName,
address: req.RemoteAddr, address: r.RemoteAddr,
userAgent: req.UserAgent(), userAgent: r.UserAgent(),
channel: make(messageChan), channel: make(messageChan),
} }
@ -122,22 +122,20 @@ func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
broker.closingClients <- client broker.closingClients <- client
}() }()
// Listen to client close and un-register messageChan
notify := ctx.Done()
go func() {
<-notify
broker.closingClients <- client
}()
for { for {
// Write to the ResponseWriter select {
// Server Sent Events compatible case event := <-client.channel:
event := <-client.channel // Write to the ResponseWriter
log.Trace(ctx, "Sending event to client", "event", event, "client", client.String()) // Server Sent Events compatible
_, _ = fmt.Fprintf(rw, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data) log.Trace(ctx, "Sending event to client", "event", event, "client", client.String())
_, _ = fmt.Fprintf(w, "id: %d\nevent: %s\ndata: %s\n\n", event.ID, event.Event, event.Data)
// Flush the data immediately instead of buffering it for later. // Flush the data immediately instead of buffering it for later.
flusher.Flush() flusher.Flush()
case <-ctx.Done():
log.Trace(ctx, "Closing event stream connection", "client", client.String())
return
}
} }
} }