Add simple cache warmer, disabled by default

This commit is contained in:
Deluan 2020-10-25 12:00:21 -04:00
parent f3bb51f01b
commit 1e56f4da76
10 changed files with 267 additions and 28 deletions

View File

@ -27,7 +27,10 @@ func CreateServer(musicFolder string) *server.Server {
func CreateScanner(musicFolder string) scanner.Scanner {
dataStore := persistence.New()
scannerScanner := scanner.New(dataStore)
artworkCache := core.NewImageCache()
artwork := core.NewArtwork(dataStore, artworkCache)
cacheWarmer := core.NewCacheWarmer(artworkCache, artwork)
scannerScanner := scanner.New(dataStore, cacheWarmer)
return scannerScanner
}

View File

@ -48,6 +48,7 @@ type configOptions struct {
// DevFlags. These are used to enable/disable debugging and incomplete features
DevLogSourceLine bool
DevAutoCreateAdminPassword string
DevPreCacheAlbumArtwork bool
}
type scannerOptions struct {
@ -132,7 +133,7 @@ func init() {
// DevFlags. These are used to enable/disable debugging and incomplete features
viper.SetDefault("devlogsourceline", false)
viper.SetDefault("devautocreateadminpassword", "")
viper.SetDefault("devoldscanner", false)
viper.SetDefault("devprecachealbumartwork", false)
}
func InitConfig(cfgFile string) {

72
core/cache_warmer.go Normal file
View File

@ -0,0 +1,72 @@
package core
import (
"context"
"io/ioutil"
"github.com/deluan/navidrome/conf"
"github.com/deluan/navidrome/core/pool"
"github.com/deluan/navidrome/log"
)
type CacheWarmer interface {
AddAlbum(ctx context.Context, albumID string)
Flush(ctx context.Context)
}
func NewCacheWarmer(cache ArtworkCache, artwork Artwork) CacheWarmer {
w := &warmer{
artwork: artwork,
cache: cache,
albums: map[string]struct{}{},
}
p, err := pool.NewPool("artwork", 3, &artworkItem{}, w.execute)
if err != nil {
log.Error(context.Background(), "Error creating pool for Album Artwork Cache Warmer", err)
} else {
w.pool = p
}
return w
}
type warmer struct {
pool *pool.Pool
artwork Artwork
cache ArtworkCache
albums map[string]struct{}
}
func (w *warmer) AddAlbum(ctx context.Context, albumID string) {
if albumID == "" {
return
}
w.albums[albumID] = struct{}{}
}
func (w *warmer) Flush(ctx context.Context) {
if conf.Server.DevPreCacheAlbumArtwork {
if w.pool == nil || len(w.albums) == 0 {
return
}
log.Info(ctx, "Pre-caching album artworks", "numAlbums", len(w.albums))
for id := range w.albums {
w.pool.Submit(artworkItem{albumID: id})
}
}
w.albums = map[string]struct{}{}
}
func (w *warmer) execute(workload interface{}) {
ctx := context.Background()
item := workload.(artworkItem)
log.Trace(ctx, "Pre-caching album artwork", "albumID", item.albumID)
err := w.artwork.Get(ctx, item.albumID, 0, ioutil.Discard)
if err != nil {
log.Warn("Error pre-caching artwork from album", "id", item.albumID, err)
}
}
type artworkItem struct {
albumID string
}

99
core/pool/pool.go Normal file
View File

@ -0,0 +1,99 @@
package pool
type Executor func(workload interface{})
type Pool struct {
name string
item interface{}
workers []worker
exec Executor
//queue *dque.DQue
queue chan work // receives jobs to send to workers
end chan bool // when receives bool stops workers
}
func NewPool(name string, workerCount int, item interface{}, exec Executor) (*Pool, error) {
p := &Pool{
name: name,
item: item,
exec: exec,
queue: make(chan work),
end: make(chan bool),
}
//q, err := dque.NewOrOpen(name, filepath.Join(conf.Server.DataFolder, "queues", name), 50, p.itemBuilder)
//if err != nil {
// return nil, err
//}
//p.queue = q
for i := 0; i < workerCount; i++ {
worker := worker{
p: p,
id: i,
channel: make(chan work),
workerChannel: workerChannel,
end: make(chan bool)}
worker.Start()
p.workers = append(p.workers, worker)
}
// start pool
go func() {
for {
select {
case <-p.end:
for _, w := range p.workers {
w.Stop() // stop worker
}
return
case work := <-p.queue:
worker := <-workerChannel // wait for available channel
worker <- work // dispatch work to worker
}
}
}()
return p, nil
}
func (p *Pool) Submit(workload interface{}) {
p.queue <- work{workload}
}
//func (p *Pool) itemBuilder() interface{} {
// t := reflect.TypeOf(p.item)
// return reflect.New(t).Interface()
//}
//
var workerChannel = make(chan chan work)
type work struct {
workload interface{}
}
type worker struct {
id int
p *Pool
workerChannel chan chan work // used to communicate between dispatcher and workers
channel chan work
end chan bool
}
// start worker
func (w *worker) Start() {
go func() {
for {
w.workerChannel <- w.channel // when the worker is available place channel in queue
select {
case job := <-w.channel: // worker has received job
w.p.exec(job.workload) // do work
case <-w.end:
return
}
}
}()
}
// end worker
func (w *worker) Stop() {
w.end <- true
}

21
core/pool/pool_test.go Normal file
View File

@ -0,0 +1,21 @@
package pool
import (
"testing"
"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/tests"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestCore(t *testing.T) {
tests.Init(t, false)
log.SetLevel(log.LevelCritical)
RegisterFailHandler(Fail)
RunSpecs(t, "Core Suite")
}
var _ = Describe("Pool", func() {
})

View File

@ -17,6 +17,7 @@ var Set = wire.NewSet(
NewImageCache,
NewArchiver,
NewExternalInfo,
NewCacheWarmer,
LastFMNewClient,
SpotifyNewClient,
transcoder.New,

View File

@ -0,0 +1,25 @@
package migration
import (
"database/sql"
"os"
"path/filepath"
"github.com/deluan/navidrome/conf"
"github.com/pressly/goose"
)
func init() {
goose.AddMigration(Up20201025222059, Down20201025222059)
}
func Up20201025222059(tx *sql.Tx) error {
cacheFolder := filepath.Join(conf.Server.DataFolder, "cache")
notice(tx, "Purging all cache entries, as the format of the cache changed.")
return os.RemoveAll(cacheFolder)
}
func Down20201025222059(tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return nil
}

View File

@ -25,8 +25,12 @@ func newRefreshBuffer(ctx context.Context, ds model.DataStore) *refreshBuffer {
}
func (f *refreshBuffer) accumulate(mf model.MediaFile) {
f.album[mf.AlbumID] = struct{}{}
f.artist[mf.AlbumArtistID] = struct{}{}
if mf.AlbumID != "" {
f.album[mf.AlbumID] = struct{}{}
}
if mf.AlbumArtistID != "" {
f.artist[mf.AlbumArtistID] = struct{}{}
}
}
type refreshCallbackFunc = func(ids ...string) error

View File

@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/deluan/navidrome/core"
"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model"
)
@ -32,12 +33,13 @@ type FolderScanner interface {
}
type scanner struct {
folders map[string]FolderScanner
status map[string]*scanStatus
lock *sync.RWMutex
ds model.DataStore
done chan bool
scan chan bool
folders map[string]FolderScanner
status map[string]*scanStatus
lock *sync.RWMutex
ds model.DataStore
cacheWarmer core.CacheWarmer
done chan bool
scan chan bool
}
type scanStatus struct {
@ -46,14 +48,15 @@ type scanStatus struct {
lastUpdate time.Time
}
func New(ds model.DataStore) Scanner {
func New(ds model.DataStore, cacheWarmer core.CacheWarmer) Scanner {
s := &scanner{
ds: ds,
folders: map[string]FolderScanner{},
status: map[string]*scanStatus{},
lock: &sync.RWMutex{},
done: make(chan bool),
scan: make(chan bool),
ds: ds,
cacheWarmer: cacheWarmer,
folders: map[string]FolderScanner{},
status: map[string]*scanStatus{},
lock: &sync.RWMutex{},
done: make(chan bool),
scan: make(chan bool),
}
s.loadFolders()
return s
@ -213,5 +216,5 @@ func (s *scanner) loadFolders() {
}
func (s *scanner) newScanner(f model.MediaFolder) FolderScanner {
return NewTagScanner(f.Path, s.ds)
return NewTagScanner(f.Path, s.ds, s.cacheWarmer)
}

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/deluan/navidrome/conf"
"github.com/deluan/navidrome/core"
"github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model"
"github.com/deluan/navidrome/model/request"
@ -17,19 +18,21 @@ import (
)
type TagScanner struct {
rootFolder string
ds model.DataStore
mapper *mediaFileMapper
plsSync *playlistSync
cnt *counters
rootFolder string
ds model.DataStore
mapper *mediaFileMapper
plsSync *playlistSync
cnt *counters
cacheWarmer core.CacheWarmer
}
func NewTagScanner(rootFolder string, ds model.DataStore) *TagScanner {
func NewTagScanner(rootFolder string, ds model.DataStore, cacheWarmer core.CacheWarmer) *TagScanner {
return &TagScanner{
rootFolder: rootFolder,
mapper: newMediaFileMapper(rootFolder),
plsSync: newPlaylistSync(ds),
ds: ds,
rootFolder: rootFolder,
mapper: newMediaFileMapper(rootFolder),
plsSync: newPlaylistSync(ds),
ds: ds,
cacheWarmer: cacheWarmer,
}
}
@ -62,6 +65,7 @@ const (
// Delete all empty albums, delete all empty artists, clean-up playlists
func (s *TagScanner) Scan(ctx context.Context, lastModifiedSince time.Time) error {
ctx = s.withAdminUser(ctx)
defer s.cacheWarmer.Flush(ctx)
start := time.Now()
allFSDirs, err := s.getDirTree(ctx)
@ -209,6 +213,7 @@ func (s *TagScanner) processDeletedDir(ctx context.Context, dir string) error {
for _, t := range mfs {
buffer.accumulate(t)
s.cacheWarmer.AddAlbum(ctx, t.AlbumID)
}
err = buffer.flush()
@ -285,6 +290,11 @@ func (s *TagScanner) processChangedDir(ctx context.Context, dir string) error {
}
}
// Pre cache all changed album artwork
for albumID := range buffer.album {
s.cacheWarmer.AddAlbum(ctx, albumID)
}
err = buffer.flush()
log.Info(ctx, "Finished processing changed folder", "dir", dir, "updated", numUpdatedTracks,
"purged", numPurgedTracks, "elapsed", time.Since(start))