Load cache asynchronously

This commit is contained in:
Deluan 2020-07-24 15:40:27 -04:00
parent a0bed9beeb
commit 9b1d5c196f
9 changed files with 134 additions and 46 deletions

View File

@ -41,11 +41,8 @@ func CreateAppRouter() *app.Router {
func CreateSubsonicAPIRouter() (*subsonic.Router, error) { func CreateSubsonicAPIRouter() (*subsonic.Router, error) {
dataStore := persistence.New() dataStore := persistence.New()
browser := engine.NewBrowser(dataStore) browser := engine.NewBrowser(dataStore)
imageCache, err := core.NewImageCache() coverCache := core.NewImageCache()
if err != nil { cover := core.NewCover(dataStore, coverCache)
return nil, err
}
cover := core.NewCover(dataStore, imageCache)
nowPlayingRepository := engine.NewNowPlayingRepository() nowPlayingRepository := engine.NewNowPlayingRepository()
listGenerator := engine.NewListGenerator(dataStore, nowPlayingRepository) listGenerator := engine.NewListGenerator(dataStore, nowPlayingRepository)
users := engine.NewUsers(dataStore) users := engine.NewUsers(dataStore)
@ -54,10 +51,7 @@ func CreateSubsonicAPIRouter() (*subsonic.Router, error) {
scrobbler := engine.NewScrobbler(dataStore, nowPlayingRepository) scrobbler := engine.NewScrobbler(dataStore, nowPlayingRepository)
search := engine.NewSearch(dataStore) search := engine.NewSearch(dataStore)
transcoderTranscoder := transcoder.New() transcoderTranscoder := transcoder.New()
transcodingCache, err := core.NewTranscodingCache() transcodingCache := core.NewTranscodingCache()
if err != nil {
return nil, err
}
mediaStreamer := core.NewMediaStreamer(dataStore, transcoderTranscoder, transcodingCache) mediaStreamer := core.NewMediaStreamer(dataStore, transcoderTranscoder, transcodingCache)
players := engine.NewPlayers(dataStore) players := engine.NewPlayers(dataStore)
router := subsonic.New(browser, cover, listGenerator, users, playlists, ratings, scrobbler, search, mediaStreamer, players) router := subsonic.New(browser, cover, listGenerator, users, playlists, ratings, scrobbler, search, mediaStreamer, players)

View File

@ -28,13 +28,15 @@ type Cover interface {
Get(ctx context.Context, id string, size int, out io.Writer) error Get(ctx context.Context, id string, size int, out io.Writer) error
} }
func NewCover(ds model.DataStore, cache *FileCache) Cover { type CoverCache FileCache
func NewCover(ds model.DataStore, cache CoverCache) Cover {
return &cover{ds: ds, cache: cache} return &cover{ds: ds, cache: cache}
} }
type cover struct { type cover struct {
ds model.DataStore ds model.DataStore
cache *FileCache cache FileCache
} }
type coverInfo struct { type coverInfo struct {
@ -182,7 +184,7 @@ func readFromFile(path string) ([]byte, error) {
return buf.Bytes(), nil return buf.Bytes(), nil
} }
func NewImageCache() (*FileCache, error) { func NewImageCache() CoverCache {
return NewFileCache("Image", conf.Server.ImageCacheSize, consts.ImageCacheDir, consts.DefaultImageCacheMaxItems, return NewFileCache("Image", conf.Server.ImageCacheSize, consts.ImageCacheDir, consts.DefaultImageCacheMaxItems,
func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) { func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) {
info := arg.(*coverInfo) info := arg.(*coverInfo)

View File

@ -30,7 +30,8 @@ var _ = Describe("Cover", func() {
BeforeEach(func() { BeforeEach(func() {
conf.Server.DataFolder, _ = ioutil.TempDir("", "file_caches") conf.Server.DataFolder, _ = ioutil.TempDir("", "file_caches")
conf.Server.ImageCacheSize = "100MB" conf.Server.ImageCacheSize = "100MB"
cache, _ := NewImageCache() cache := NewImageCache()
Eventually(func() bool { return cache.Ready() }).Should(BeTrue())
cover = NewCover(ds, cache) cover = NewCover(ds, cache)
}) })

View File

@ -5,39 +5,65 @@ import (
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
"time"
"github.com/deluan/navidrome/conf" "github.com/deluan/navidrome/conf"
"github.com/deluan/navidrome/consts" "github.com/deluan/navidrome/consts"
"github.com/deluan/navidrome/log" "github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/utils"
"github.com/djherbis/fscache" "github.com/djherbis/fscache"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
) )
type ReadFunc func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) type ReadFunc func(ctx context.Context, arg fmt.Stringer) (io.Reader, error)
func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) (*FileCache, error) { type FileCache interface {
cache, err := newFSCache(name, cacheSize, cacheFolder, maxItems) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, error)
if err != nil { Ready() bool
return nil, err }
func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache {
fc := &fileCache{
name: name,
cacheSize: cacheSize,
cacheFolder: cacheFolder,
maxItems: maxItems,
getReader: getReader,
disabled: utils.AtomicBool{},
ready: utils.AtomicBool{},
} }
return &FileCache{
name: name, go func() {
disabled: cache == nil, cache, err := newFSCache(fc.name, fc.cacheSize, fc.cacheFolder, fc.maxItems)
cache: cache, if err == nil {
getReader: getReader, fc.cache = cache
}, nil fc.disabled.Set(cache == nil)
}
fc.ready.Set(true)
}()
return fc
} }
type FileCache struct { type fileCache struct {
disabled bool name string
name string cacheSize string
cache fscache.Cache cacheFolder string
getReader ReadFunc maxItems int
cache fscache.Cache
getReader ReadFunc
disabled utils.AtomicBool
ready utils.AtomicBool
} }
func (fc *FileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, error) { func (fc *fileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, error) {
if fc.disabled { if !fc.Ready() {
log.Debug(ctx, "Cache not initialized yet", "cache", fc.name)
}
if fc.disabled.Get() {
log.Debug(ctx, "Cache disabled", "cache", fc.name) log.Debug(ctx, "Cache disabled", "cache", fc.name)
}
if fc.disabled.Get() || !fc.Ready() {
reader, err := fc.getReader(ctx, arg) reader, err := fc.getReader(ctx, arg)
if err != nil { if err != nil {
return nil, err return nil, err
@ -71,6 +97,7 @@ func (fc *FileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream,
return &CachedStream{ return &CachedStream{
Reader: sr, Reader: sr,
Seeker: sr, Seeker: sr,
Cached: true,
}, nil }, nil
} else { } else {
log.Trace(ctx, "Cache HIT", "cache", fc.name, "key", key) log.Trace(ctx, "Cache HIT", "cache", fc.name, "key", key)
@ -78,12 +105,17 @@ func (fc *FileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream,
} }
// All other cases, just return a Reader, without Seek capabilities // All other cases, just return a Reader, without Seek capabilities
return &CachedStream{Reader: r}, nil return &CachedStream{Reader: r, Cached: true}, nil
}
func (fc *fileCache) Ready() bool {
return fc.ready.Get()
} }
type CachedStream struct { type CachedStream struct {
io.Reader io.Reader
io.Seeker io.Seeker
Cached bool
} }
func (s *CachedStream) Seekable() bool { return s.Seeker != nil } func (s *CachedStream) Seekable() bool { return s.Seeker != nil }
@ -125,20 +157,27 @@ func copyAndClose(ctx context.Context, w io.WriteCloser, r io.Reader) {
func newFSCache(name, cacheSize, cacheFolder string, maxItems int) (fscache.Cache, error) { func newFSCache(name, cacheSize, cacheFolder string, maxItems int) (fscache.Cache, error) {
size, err := humanize.ParseBytes(cacheSize) size, err := humanize.ParseBytes(cacheSize)
if err != nil { if err != nil {
log.Error("Invalid cache size. Using default size", "cache", name, "size", cacheSize, "defaultSize", consts.DefaultCacheSize) log.Error("Invalid cache size. Using default size", "cache", name, "size", cacheSize,
"defaultSize", humanize.Bytes(consts.DefaultCacheSize))
size = consts.DefaultCacheSize size = consts.DefaultCacheSize
} }
if size == 0 { if size == 0 {
log.Warn(fmt.Sprintf("%s cache disabled", name)) log.Warn(fmt.Sprintf("%s cache disabled", name))
return nil, nil return nil, nil
} }
start := time.Now()
lru := fscache.NewLRUHaunter(maxItems, int64(size), consts.DefaultCacheCleanUpInterval) lru := fscache.NewLRUHaunter(maxItems, int64(size), consts.DefaultCacheCleanUpInterval)
h := fscache.NewLRUHaunterStrategy(lru) h := fscache.NewLRUHaunterStrategy(lru)
cacheFolder = filepath.Join(conf.Server.DataFolder, cacheFolder) cacheFolder = filepath.Join(conf.Server.DataFolder, cacheFolder)
log.Info(fmt.Sprintf("Creating %s cache", name), "path", cacheFolder, "maxSize", humanize.Bytes(size)) log.Info(fmt.Sprintf("Creating %s cache", name), "path", cacheFolder, "maxSize", humanize.Bytes(size))
fs, err := fscache.NewFs(cacheFolder, 0755) fs, err := fscache.NewFs(cacheFolder, 0755)
if err != nil { if err != nil {
log.Error(fmt.Sprintf("Error initializing %s cache", name), err, "elapsedTime", time.Since(start))
return nil, err return nil, err
} }
log.Debug(fmt.Sprintf("%s cache initialized", name), "elapsedTime", time.Since(start))
return fscache.NewCacheWithHaunter(fs, h) return fscache.NewCacheWithHaunter(fs, h)
} }

View File

@ -14,6 +14,13 @@ import (
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )
// Call NewFileCache and wait for it to be ready
func callNewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache {
fc := NewFileCache(name, cacheSize, cacheFolder, maxItems, getReader)
Eventually(func() bool { return fc.Ready() }).Should(BeTrue())
return fc
}
var _ = Describe("File Caches", func() { var _ = Describe("File Caches", func() {
BeforeEach(func() { BeforeEach(func() {
conf.Server.DataFolder, _ = ioutil.TempDir("", "file_caches") conf.Server.DataFolder, _ = ioutil.TempDir("", "file_caches")
@ -24,31 +31,29 @@ var _ = Describe("File Caches", func() {
Describe("NewFileCache", func() { Describe("NewFileCache", func() {
It("creates the cache folder", func() { It("creates the cache folder", func() {
Expect(NewFileCache("test", "1k", "test", 0, nil)).ToNot(BeNil()) Expect(callNewFileCache("test", "1k", "test", 0, nil)).ToNot(BeNil())
_, err := os.Stat(filepath.Join(conf.Server.DataFolder, "test")) _, err := os.Stat(filepath.Join(conf.Server.DataFolder, "test"))
Expect(os.IsNotExist(err)).To(BeFalse()) Expect(os.IsNotExist(err)).To(BeFalse())
}) })
It("creates the cache folder with invalid size", func() { It("creates the cache folder with invalid size", func() {
fc, err := NewFileCache("test", "abc", "test", 0, nil) fc := callNewFileCache("test", "abc", "test", 0, nil)
Expect(err).To(BeNil())
Expect(fc.cache).ToNot(BeNil()) Expect(fc.cache).ToNot(BeNil())
Expect(fc.disabled).To(BeFalse()) Expect(fc.disabled.Get()).To(BeFalse())
}) })
It("returns empty if cache size is '0'", func() { It("returns empty if cache size is '0'", func() {
fc, err := NewFileCache("test", "0", "test", 0, nil) fc := callNewFileCache("test", "0", "test", 0, nil)
Expect(err).To(BeNil())
Expect(fc.cache).To(BeNil()) Expect(fc.cache).To(BeNil())
Expect(fc.disabled).To(BeTrue()) Expect(fc.disabled.Get()).To(BeTrue())
}) })
}) })
Describe("FileCache", func() { Describe("FileCache", func() {
It("caches data if cache is enabled", func() { It("caches data if cache is enabled", func() {
called := false called := false
fc, _ := NewFileCache("test", "1KB", "test", 0, func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) { fc := callNewFileCache("test", "1KB", "test", 0, func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) {
called = true called = true
return strings.NewReader(arg.String()), nil return strings.NewReader(arg.String()), nil
}) })
@ -67,7 +72,7 @@ var _ = Describe("File Caches", func() {
It("does not cache data if cache is disabled", func() { It("does not cache data if cache is disabled", func() {
called := false called := false
fc, _ := NewFileCache("test", "0", "test", 0, func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) { fc := callNewFileCache("test", "0", "test", 0, func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) {
called = true called = true
return strings.NewReader(arg.String()), nil return strings.NewReader(arg.String()), nil
}) })

View File

@ -20,14 +20,16 @@ type MediaStreamer interface {
NewStream(ctx context.Context, id string, reqFormat string, reqBitRate int) (*Stream, error) NewStream(ctx context.Context, id string, reqFormat string, reqBitRate int) (*Stream, error)
} }
func NewMediaStreamer(ds model.DataStore, ffm transcoder.Transcoder, cache *FileCache) MediaStreamer { type TranscodingCache FileCache
func NewMediaStreamer(ds model.DataStore, ffm transcoder.Transcoder, cache TranscodingCache) MediaStreamer {
return &mediaStreamer{ds: ds, ffm: ffm, cache: cache} return &mediaStreamer{ds: ds, ffm: ffm, cache: cache}
} }
type mediaStreamer struct { type mediaStreamer struct {
ds model.DataStore ds model.DataStore
ffm transcoder.Transcoder ffm transcoder.Transcoder
cache *FileCache cache FileCache
} }
type streamJob struct { type streamJob struct {
@ -90,7 +92,7 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat str
log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", mf.Path, log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", mf.Path,
"requestBitrate", reqBitRate, "requestFormat", reqFormat, "requestBitrate", reqBitRate, "requestFormat", reqFormat,
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix, "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix,
"selectedBitrate", bitRate, "selectedFormat", format) "selectedBitrate", bitRate, "selectedFormat", format, "cached", r.Cached)
s.Reader = r s.Reader = r
s.Closer = r s.Closer = r
@ -166,7 +168,7 @@ func selectTranscodingOptions(ctx context.Context, ds model.DataStore, mf *model
return return
} }
func NewTranscodingCache() (*FileCache, error) { func NewTranscodingCache() TranscodingCache {
return NewFileCache("Transcoding", conf.Server.TranscodingCacheSize, return NewFileCache("Transcoding", conf.Server.TranscodingCacheSize,
consts.TranscodingCacheDir, consts.DefaultTranscodingCacheMaxItems, consts.TranscodingCacheDir, consts.DefaultTranscodingCacheMaxItems,
func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) { func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) {

View File

@ -27,7 +27,8 @@ var _ = Describe("MediaStreamer", func() {
conf.Server.TranscodingCacheSize = "100MB" conf.Server.TranscodingCacheSize = "100MB"
ds = &persistence.MockDataStore{MockedTranscoding: &mockTranscodingRepository{}} ds = &persistence.MockDataStore{MockedTranscoding: &mockTranscodingRepository{}}
ds.MediaFile(ctx).(*persistence.MockMediaFile).SetData(`[{"id": "123", "path": "tests/fixtures/test.mp3", "suffix": "mp3", "bitRate": 128, "duration": 257.0}]`) ds.MediaFile(ctx).(*persistence.MockMediaFile).SetData(`[{"id": "123", "path": "tests/fixtures/test.mp3", "suffix": "mp3", "bitRate": 128, "duration": 257.0}]`)
testCache, _ := NewTranscodingCache() testCache := NewTranscodingCache()
Eventually(func() bool { return testCache.Ready() }).Should(BeTrue())
streamer = NewMediaStreamer(ds, ffmpeg, testCache) streamer = NewMediaStreamer(ds, ffmpeg, testCache)
}) })

17
utils/atomic.go Normal file
View File

@ -0,0 +1,17 @@
package utils
import "sync/atomic"
// AtomicBool is a boolean flag that is safe for concurrent use.
// The zero value is false and ready to use without initialization.
type AtomicBool struct{ flag int32 }

// Get atomically reads the current value of the flag.
func (b *AtomicBool) Get() bool {
	return atomic.LoadInt32(&b.flag) != 0
}

// Set atomically updates the flag to the given value.
func (b *AtomicBool) Set(value bool) {
	var v int32
	if value {
		v = 1
	}
	atomic.StoreInt32(&b.flag, v)
}

27
utils/atomic_test.go Normal file
View File

@ -0,0 +1,27 @@
package utils_test
import (
"github.com/deluan/navidrome/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
// Unit tests for utils.AtomicBool: the zero value must read as false,
// and Set must flip the value subsequently returned by Get.
var _ = Describe("AtomicBool", func() {
	var b utils.AtomicBool

	// Reset to the zero value before each spec so the cases stay independent.
	BeforeEach(func() {
		b = utils.AtomicBool{}
	})

	It("initializes with value = false", func() {
		Expect(b.Get()).To(BeFalse())
	})

	It("sets value", func() {
		b.Set(true)
		Expect(b.Get()).To(BeTrue())
		b.Set(false)
		Expect(b.Get()).To(BeFalse())
	})
})