Adding a communication channel between server and clients using SSE

This commit is contained in:
Deluan 2020-11-08 00:06:48 -05:00
parent 3fc81638c7
commit 2b1a5f579a
15 changed files with 395 additions and 25 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/deluan/navidrome/scanner"
"github.com/deluan/navidrome/server"
"github.com/deluan/navidrome/server/app"
"github.com/deluan/navidrome/server/events"
"github.com/deluan/navidrome/server/subsonic"
"github.com/google/wire"
"sync"
@ -27,7 +28,8 @@ func CreateServer(musicFolder string) *server.Server {
func CreateAppRouter() *app.Router {
dataStore := persistence.New()
router := app.New(dataStore)
broker := GetBroker()
router := app.New(dataStore, broker)
return router
}
@ -53,10 +55,16 @@ func createScanner() scanner.Scanner {
artworkCache := core.GetImageCache()
artwork := core.NewArtwork(dataStore, artworkCache)
cacheWarmer := core.NewCacheWarmer(artwork, artworkCache)
scannerScanner := scanner.New(dataStore, cacheWarmer)
broker := GetBroker()
scannerScanner := scanner.New(dataStore, cacheWarmer, broker)
return scannerScanner
}
func createBroker() events.Broker {
broker := events.NewBroker()
return broker
}
// wire_injectors.go:
var allProviders = wire.NewSet(core.Set, subsonic.New, app.New, persistence.New)
@ -73,3 +81,16 @@ func GetScanner() scanner.Scanner {
})
return scannerInstance
}
// Broker must be a Singleton
var (
onceBroker sync.Once
brokerInstance events.Broker
)
func GetBroker() events.Broker {
onceBroker.Do(func() {
brokerInstance = createBroker()
})
return brokerInstance
}

View File

@ -3,14 +3,16 @@
package cmd
import (
"sync"
"github.com/deluan/navidrome/core"
"github.com/deluan/navidrome/persistence"
"github.com/deluan/navidrome/scanner"
"github.com/deluan/navidrome/server"
"github.com/deluan/navidrome/server/app"
"github.com/deluan/navidrome/server/events"
"github.com/deluan/navidrome/server/subsonic"
"github.com/google/wire"
"sync"
)
var allProviders = wire.NewSet(
@ -28,7 +30,10 @@ func CreateServer(musicFolder string) *server.Server {
}
func CreateAppRouter() *app.Router {
panic(wire.Build(allProviders))
panic(wire.Build(
allProviders,
GetBroker,
))
}
func CreateSubsonicAPIRouter() *subsonic.Router {
@ -54,6 +59,26 @@ func GetScanner() scanner.Scanner {
func createScanner() scanner.Scanner {
panic(wire.Build(
allProviders,
GetBroker,
scanner.New,
))
}
// Broker must be a Singleton
var (
onceBroker sync.Once
brokerInstance events.Broker
)
func GetBroker() events.Broker {
onceBroker.Do(func() {
brokerInstance = createBroker()
})
return brokerInstance
}
func createBroker() events.Broker {
panic(wire.Build(
events.NewBroker,
))
}

View File

@ -12,6 +12,7 @@ import (
"github.com/deluan/navidrome/core"
"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model"
"github.com/deluan/navidrome/server/events"
"github.com/deluan/navidrome/utils"
)
@ -47,6 +48,7 @@ type scanner struct {
lock *sync.RWMutex
ds model.DataStore
cacheWarmer core.CacheWarmer
broker events.Broker
done chan bool
scan chan bool
}
@ -57,10 +59,11 @@ type scanStatus struct {
lastUpdate time.Time
}
func New(ds model.DataStore, cacheWarmer core.CacheWarmer) Scanner {
func New(ds model.DataStore, cacheWarmer core.CacheWarmer, broker events.Broker) Scanner {
s := &scanner{
ds: ds,
cacheWarmer: cacheWarmer,
broker: broker,
folders: map[string]FolderScanner{},
status: map[string]*scanStatus{},
lock: &sync.RWMutex{},
@ -107,14 +110,21 @@ func (s *scanner) rescan(mediaFolder string, fullRescan bool) error {
log.Debug("Scanning folder (full scan)", "folder", mediaFolder)
}
progress := make(chan uint32)
progress := make(chan uint32, 100)
go func() {
defer func() {
s.broker.SendMessage(&events.ScanStatus{Scanning: false, Count: int64(s.status[mediaFolder].count)})
}()
for {
count, more := <-progress
if !more {
break
}
atomic.AddUint32(&s.status[mediaFolder].count, count)
if count == 0 {
continue
}
total := atomic.AddUint32(&s.status[mediaFolder].count, count)
s.broker.SendMessage(&events.ScanStatus{Scanning: true, Count: int64(total)})
}
}()

View File

@ -11,6 +11,7 @@ import (
"github.com/deluan/navidrome/core/auth"
"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model"
"github.com/deluan/navidrome/server/events"
"github.com/deluan/rest"
"github.com/go-chi/chi"
"github.com/go-chi/httprate"
@ -18,12 +19,13 @@ import (
)
type Router struct {
ds model.DataStore
mux http.Handler
ds model.DataStore
mux http.Handler
broker events.Broker
}
func New(ds model.DataStore) *Router {
return &Router{ds: ds}
func New(ds model.DataStore, broker events.Broker) *Router {
return &Router{ds: ds, broker: broker}
}
func (app *Router) Setup(path string) {
@ -68,6 +70,8 @@ func (app *Router) routes(path string) http.Handler {
// Keepalive endpoint to be used to keep the session valid (ex: while playing songs)
r.Get("/keepalive/*", func(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(`{"response":"ok"}`)) })
r.Handle("/events", app.broker)
})
// Serve UI app assets

18
server/events/events.go Normal file
View File

@ -0,0 +1,18 @@
package events
type Event interface {
EventName() string
}
type ScanStatus struct {
Scanning bool `json:"scanning"`
Count int64 `json:"count"`
}
func (s ScanStatus) EventName() string { return "scanStatus" }
type KeepAlive struct {
TS int64 `json:"ts"`
}
func (s KeepAlive) EventName() string { return "keepAlive" }

134
server/events/sse.go Normal file
View File

@ -0,0 +1,134 @@
// Based on https://thoughtbot.com/blog/writing-a-server-sent-events-server-in-go
package events
import (
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/deluan/navidrome/log"
)
type Broker interface {
http.Handler
SendMessage(event Event)
}
type broker struct {
// Events are pushed to this channel by the main events-gathering routine
Notifier chan []byte
// New client connections
newClients chan chan []byte
// Closed client connections
closingClients chan chan []byte
// Client connections registry
clients map[chan []byte]bool
}
func NewBroker() Broker {
// Instantiate a broker
broker := &broker{
Notifier: make(chan []byte, 1),
newClients: make(chan chan []byte),
closingClients: make(chan chan []byte),
clients: make(map[chan []byte]bool),
}
// Set it running - listening and broadcasting events
go broker.listen()
return broker
}
func (broker *broker) SendMessage(event Event) {
pkg := struct {
Event `json:"data"`
Name string `json:"name"`
}{}
pkg.Name = event.EventName()
pkg.Event = event
data, _ := json.Marshal(pkg)
broker.Notifier <- data
}
func (broker *broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// Make sure that the writer supports flushing.
//
flusher, ok := rw.(http.Flusher)
if !ok {
http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
return
}
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache, no-transform")
rw.Header().Set("Connection", "keep-alive")
rw.Header().Set("Access-Control-Allow-Origin", "*")
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(chan []byte)
// Signal the broker that we have a new connection
broker.newClients <- messageChan
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
broker.closingClients <- messageChan
}()
// Listen to connection close and un-register messageChan
// notify := rw.(http.CloseNotifier).CloseNotify()
notify := req.Context().Done()
go func() {
<-notify
broker.closingClients <- messageChan
}()
for {
// Write to the ResponseWriter
// Server Sent Events compatible
_, _ = fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)
// Flush the data immediately instead of buffering it for later.
flusher.Flush()
}
}
func (broker *broker) listen() {
keepAlive := time.NewTicker(15 * time.Second)
defer keepAlive.Stop()
for {
select {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = true
log.Debug("Client added", "numClients", len(broker.clients))
case s := <-broker.closingClients:
// A client has dettached and we want to
// stop sending them messages.
delete(broker.clients, s)
log.Debug("Removed client", "numClients", len(broker.clients))
case event := <-broker.Notifier:
// We got a new event from the outside!
// Send event to all connected clients
for clientMessageChan := range broker.clients {
clientMessageChan <- event
}
case ts := <-keepAlive.C:
// Send a keep alive packet every 15 seconds
broker.SendMessage(&KeepAlive{TS: ts.Unix()})
}
}
}

View File

@ -1,9 +1,9 @@
import React from 'react'
import ReactGA from 'react-ga'
import 'react-jinke-music-player/assets/index.css'
import { Provider } from 'react-redux'
import { Provider, useDispatch } from 'react-redux'
import { createHashHistory } from 'history'
import { Admin, Resource } from 'react-admin'
import { Admin as RAAdmin, Resource } from 'react-admin'
import dataProvider from './dataProvider'
import authProvider from './authProvider'
import { Layout, Login, Logout } from './layout'
@ -21,10 +21,13 @@ import {
addToPlaylistDialogReducer,
playQueueReducer,
albumViewReducer,
activityReducer,
} from './reducers'
import createAdminStore from './store/createAdminStore'
import { i18nProvider } from './i18n'
import config from './config'
import { startEventStream } from './eventStream'
import { updateScanStatus } from './actions'
const history = createHashHistory()
if (config.gaTrackingId) {
@ -46,10 +49,20 @@ const App = () => (
albumView: albumViewReducer,
theme: themeReducer,
addToPlaylistDialog: addToPlaylistDialogReducer,
activity: activityReducer,
},
})}
>
<Admin
<Admin />
</Provider>
)
const Admin = (props) => {
const dispatch = useDispatch()
startEventStream((data) => dispatch(updateScanStatus(data)))
return (
<RAAdmin
dataProvider={dataProvider}
authProvider={authProvider}
i18nProvider={i18nProvider}
@ -58,6 +71,7 @@ const App = () => (
layout={Layout}
loginPage={Login}
logoutButton={Logout}
{...props}
>
{(permissions) => [
<Resource name="album" {...album} options={{ subMenu: 'albumList' }} />,
@ -91,8 +105,8 @@ const App = () => (
<Player />,
]}
</Admin>
</Provider>
)
</RAAdmin>
)
}
export default App

View File

@ -0,0 +1,12 @@
export const ACTIVITY_SCAN_STATUS_UPD = 'ACTIVITY_SCAN_STATUS_UPD'
const actionsMap = { scanStatus: ACTIVITY_SCAN_STATUS_UPD }
export const updateScanStatus = (data) => {
let type = actionsMap[data.name]
if (!type) type = 'UNKNOWN'
return {
type,
data: data.data,
}
}

View File

@ -2,3 +2,4 @@ export * from './audioplayer'
export * from './themes'
export * from './albumView'
export * from './dialogs'
export * from './activity'

30
ui/src/eventStream.js Normal file
View File

@ -0,0 +1,30 @@
import baseUrl from './utils/baseUrl'
import throttle from 'lodash.throttle'
// TODO https://stackoverflow.com/a/20060461
let es = null
let dispatchFunc = null
const getEventStream = () => {
if (es === null) {
es = new EventSource(
baseUrl(`/app/api/events?jwt=${localStorage.getItem('token')}`)
)
}
return es
}
export const startEventStream = (func) => {
const es = getEventStream()
dispatchFunc = func
es.onmessage = throttle(
(msg) => {
const data = JSON.parse(msg.data)
if (data.name !== 'keepAlive') {
dispatchFunc(data)
}
},
100,
{ trailing: true }
)
}

View File

@ -0,0 +1,80 @@
import React, { useState } from 'react'
import { useSelector } from 'react-redux'
import {
Menu,
Badge,
CircularProgress,
IconButton,
makeStyles,
Tooltip,
MenuItem,
} from '@material-ui/core'
import { FiActivity } from 'react-icons/fi'
import subsonic from '../subsonic'
const useStyles = makeStyles((theme) => ({
wrapper: {
position: 'relative',
},
progress: {
position: 'absolute',
top: -1,
left: 0,
zIndex: 1,
},
button: {
zIndex: 2,
},
}))
const ActivityMenu = () => {
const classes = useStyles()
const [anchorEl, setAnchorEl] = useState(null)
const scanStatus = useSelector((state) => state.activity.scanStatus)
const open = Boolean(anchorEl)
const handleMenu = (event) => setAnchorEl(event.currentTarget)
const handleClose = () => setAnchorEl(null)
const startScan = () => fetch(subsonic.url('startScan', null))
return (
<div className={classes.wrapper}>
<Tooltip title={'Activity'}>
<IconButton className={classes.button} onClick={handleMenu}>
<Badge badgeContent={null} color="secondary">
<FiActivity size={'20'} />
</Badge>
</IconButton>
</Tooltip>
{scanStatus.scanning && (
<CircularProgress size={46} className={classes.progress} />
)}
<Menu
id="menu-activity"
anchorEl={anchorEl}
anchorOrigin={{
vertical: 'top',
horizontal: 'right',
}}
transformOrigin={{
vertical: 'top',
horizontal: 'right',
}}
open={open}
onClose={handleClose}
>
<MenuItem
className={classes.root}
activeClassName={classes.active}
onClick={startScan}
sidebarIsOpen={true}
>
{`Scanned: ${scanStatus.count}`}
</MenuItem>
</Menu>
</div>
)
}
export default ActivityMenu

View File

@ -12,6 +12,7 @@ import ViewListIcon from '@material-ui/icons/ViewList'
import InfoIcon from '@material-ui/icons/Info'
import AboutDialog from './AboutDialog'
import PersonalMenu from './PersonalMenu'
import ActivityMenu from './ActivityMenu'
const useStyles = makeStyles((theme) => ({
root: {
@ -85,13 +86,16 @@ const CustomUserMenu = ({ onClick, ...rest }) => {
}
return (
<UserMenu {...rest}>
<PersonalMenu sidebarIsOpen={true} onClick={onClick} />
<hr />
{resources.filter(settingsResources).map(renderSettingsMenuItemLink)}
<hr />
<AboutMenuItem />
</UserMenu>
<>
<ActivityMenu />
<UserMenu {...rest}>
<PersonalMenu sidebarIsOpen={true} onClick={onClick} />
<hr />
{resources.filter(settingsResources).map(renderSettingsMenuItemLink)}
<hr />
<AboutMenuItem />
</UserMenu>
</>
)
}

View File

@ -0,0 +1,16 @@
import { ACTIVITY_SCAN_STATUS_UPD } from '../actions'
export const activityReducer = (
previousState = {
scanStatus: { scanning: false, count: 0 },
},
payload
) => {
const { type, data } = payload
switch (type) {
case ACTIVITY_SCAN_STATUS_UPD:
return { ...previousState, scanStatus: data }
default:
return previousState
}
}

View File

@ -2,3 +2,4 @@ export * from './themeReducer'
export * from './dialogReducer'
export * from './playQueue'
export * from './albumView'
export * from './activityReducer'

View File

@ -9,7 +9,7 @@ const url = (command, id, options) => {
params.append('f', 'json')
params.append('v', '1.8.0')
params.append('c', 'NavidromeUI')
params.append('id', id)
id && params.append('id', id)
if (options) {
if (options.ts) {
options['_'] = new Date().getTime()