diff --git a/cmd/wire_gen.go b/cmd/wire_gen.go index 5efc7009..65ff30a2 100644 --- a/cmd/wire_gen.go +++ b/cmd/wire_gen.go @@ -41,11 +41,8 @@ func CreateAppRouter() *app.Router { func CreateSubsonicAPIRouter() (*subsonic.Router, error) { dataStore := persistence.New() browser := engine.NewBrowser(dataStore) - imageCache, err := core.NewImageCache() - if err != nil { - return nil, err - } - cover := core.NewCover(dataStore, imageCache) + coverCache := core.NewImageCache() + cover := core.NewCover(dataStore, coverCache) nowPlayingRepository := engine.NewNowPlayingRepository() listGenerator := engine.NewListGenerator(dataStore, nowPlayingRepository) users := engine.NewUsers(dataStore) @@ -54,10 +51,7 @@ func CreateSubsonicAPIRouter() (*subsonic.Router, error) { scrobbler := engine.NewScrobbler(dataStore, nowPlayingRepository) search := engine.NewSearch(dataStore) transcoderTranscoder := transcoder.New() - transcodingCache, err := core.NewTranscodingCache() - if err != nil { - return nil, err - } + transcodingCache := core.NewTranscodingCache() mediaStreamer := core.NewMediaStreamer(dataStore, transcoderTranscoder, transcodingCache) players := engine.NewPlayers(dataStore) router := subsonic.New(browser, cover, listGenerator, users, playlists, ratings, scrobbler, search, mediaStreamer, players) diff --git a/core/cover.go b/core/cover.go index af115052..623e2e46 100644 --- a/core/cover.go +++ b/core/cover.go @@ -28,13 +28,15 @@ type Cover interface { Get(ctx context.Context, id string, size int, out io.Writer) error } -func NewCover(ds model.DataStore, cache *FileCache) Cover { +type CoverCache FileCache + +func NewCover(ds model.DataStore, cache CoverCache) Cover { return &cover{ds: ds, cache: cache} } type cover struct { ds model.DataStore - cache *FileCache + cache FileCache } type coverInfo struct { @@ -182,7 +184,7 @@ func readFromFile(path string) ([]byte, error) { return buf.Bytes(), nil } -func NewImageCache() (*FileCache, error) { +func NewImageCache() CoverCache { return NewFileCache("Image", conf.Server.ImageCacheSize, consts.ImageCacheDir, consts.DefaultImageCacheMaxItems, func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) { info := arg.(*coverInfo) diff --git a/core/cover_test.go b/core/cover_test.go index 48dd1782..26ad8f52 100644 --- a/core/cover_test.go +++ b/core/cover_test.go @@ -30,7 +30,8 @@ var _ = Describe("Cover", func() { BeforeEach(func() { conf.Server.DataFolder, _ = ioutil.TempDir("", "file_caches") conf.Server.ImageCacheSize = "100MB" - cache, _ := NewImageCache() + cache := NewImageCache() + Eventually(func() bool { return cache.Ready() }).Should(BeTrue()) cover = NewCover(ds, cache) }) diff --git a/core/file_caches.go b/core/file_caches.go index 8ae50d93..c0be7a70 100644 --- a/core/file_caches.go +++ b/core/file_caches.go @@ -5,39 +5,65 @@ import ( "fmt" "io" "path/filepath" + "time" "github.com/deluan/navidrome/conf" "github.com/deluan/navidrome/consts" "github.com/deluan/navidrome/log" + "github.com/deluan/navidrome/utils" "github.com/djherbis/fscache" "github.com/dustin/go-humanize" ) type ReadFunc func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) -func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) (*FileCache, error) { - cache, err := newFSCache(name, cacheSize, cacheFolder, maxItems) - if err != nil { - return nil, err +type FileCache interface { + Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, error) + Ready() bool +} + +func NewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache { + fc := &fileCache{ + name: name, + cacheSize: cacheSize, + cacheFolder: cacheFolder, + maxItems: maxItems, + getReader: getReader, + disabled: utils.AtomicBool{}, + ready: utils.AtomicBool{}, } - return &FileCache{ - name: name, - disabled: cache == nil, - cache: cache, - getReader: getReader, - }, nil + + go func() { + cache, err := newFSCache(fc.name, fc.cacheSize, fc.cacheFolder, fc.maxItems) + if err == nil { + fc.cache = cache + fc.disabled.Set(cache == nil) + } + fc.ready.Set(true) + }() + + return fc } -type FileCache struct { - disabled bool - name string - cache fscache.Cache - getReader ReadFunc +type fileCache struct { + name string + cacheSize string + cacheFolder string + maxItems int + cache fscache.Cache + getReader ReadFunc + disabled utils.AtomicBool + ready utils.AtomicBool } -func (fc *FileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, error) { - if fc.disabled { +func (fc *fileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, error) { + if !fc.Ready() { + log.Debug(ctx, "Cache not initialized yet", "cache", fc.name) + } + if fc.disabled.Get() { log.Debug(ctx, "Cache disabled", "cache", fc.name) + } + if fc.disabled.Get() || !fc.Ready() { reader, err := fc.getReader(ctx, arg) if err != nil { return nil, err @@ -71,6 +97,7 @@ func (fc *FileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, return &CachedStream{ Reader: sr, Seeker: sr, + Cached: true, }, nil } else { log.Trace(ctx, "Cache HIT", "cache", fc.name, "key", key) @@ -78,12 +105,17 @@ func (fc *FileCache) Get(ctx context.Context, arg fmt.Stringer) (*CachedStream, } // All other cases, just return a Reader, without Seek capabilities - return &CachedStream{Reader: r}, nil + return &CachedStream{Reader: r, Cached: true}, nil +} + +func (fc *fileCache) Ready() bool { + return fc.ready.Get() } type CachedStream struct { io.Reader io.Seeker + Cached bool } func (s *CachedStream) Seekable() bool { return s.Seeker != nil } @@ -125,20 +157,27 @@ func copyAndClose(ctx context.Context, w io.WriteCloser, r io.Reader) { func newFSCache(name, cacheSize, cacheFolder string, maxItems int) (fscache.Cache, error) { size, err := humanize.ParseBytes(cacheSize) if err != nil { - log.Error("Invalid cache size. Using default size", "cache", name, "size", cacheSize, "defaultSize", consts.DefaultCacheSize) + log.Error("Invalid cache size. Using default size", "cache", name, "size", cacheSize, + "defaultSize", humanize.Bytes(consts.DefaultCacheSize)) size = consts.DefaultCacheSize } if size == 0 { log.Warn(fmt.Sprintf("%s cache disabled", name)) return nil, nil } + + start := time.Now() lru := fscache.NewLRUHaunter(maxItems, int64(size), consts.DefaultCacheCleanUpInterval) h := fscache.NewLRUHaunterStrategy(lru) cacheFolder = filepath.Join(conf.Server.DataFolder, cacheFolder) + log.Info(fmt.Sprintf("Creating %s cache", name), "path", cacheFolder, "maxSize", humanize.Bytes(size)) fs, err := fscache.NewFs(cacheFolder, 0755) if err != nil { + log.Error(fmt.Sprintf("Error initializing %s cache", name), err, "elapsedTime", time.Since(start)) return nil, err } + log.Debug(fmt.Sprintf("%s cache initialized", name), "elapsedTime", time.Since(start)) + return fscache.NewCacheWithHaunter(fs, h) } diff --git a/core/file_caches_test.go b/core/file_caches_test.go index 14b54011..4d3f19ae 100644 --- a/core/file_caches_test.go +++ b/core/file_caches_test.go @@ -14,6 +14,13 @@ import ( . "github.com/onsi/gomega" ) +// Call NewFileCache and wait for it to be ready +func callNewFileCache(name, cacheSize, cacheFolder string, maxItems int, getReader ReadFunc) *fileCache { + fc := NewFileCache(name, cacheSize, cacheFolder, maxItems, getReader) + Eventually(func() bool { return fc.Ready() }).Should(BeTrue()) + return fc +} + var _ = Describe("File Caches", func() { BeforeEach(func() { conf.Server.DataFolder, _ = ioutil.TempDir("", "file_caches") @@ -24,31 +31,29 @@ var _ = Describe("File Caches", func() { Describe("NewFileCache", func() { It("creates the cache folder", func() { - Expect(NewFileCache("test", "1k", "test", 0, nil)).ToNot(BeNil()) + Expect(callNewFileCache("test", "1k", "test", 0, nil)).ToNot(BeNil()) _, err := os.Stat(filepath.Join(conf.Server.DataFolder, "test")) Expect(os.IsNotExist(err)).To(BeFalse()) }) It("creates the cache folder with invalid size", func() { - fc, err := NewFileCache("test", "abc", "test", 0, nil) - Expect(err).To(BeNil()) + fc := callNewFileCache("test", "abc", "test", 0, nil) Expect(fc.cache).ToNot(BeNil()) - Expect(fc.disabled).To(BeFalse()) + Expect(fc.disabled.Get()).To(BeFalse()) }) It("returns empty if cache size is '0'", func() { - fc, err := NewFileCache("test", "0", "test", 0, nil) - Expect(err).To(BeNil()) + fc := callNewFileCache("test", "0", "test", 0, nil) Expect(fc.cache).To(BeNil()) - Expect(fc.disabled).To(BeTrue()) + Expect(fc.disabled.Get()).To(BeTrue()) }) }) Describe("FileCache", func() { It("caches data if cache is enabled", func() { called := false - fc, _ := NewFileCache("test", "1KB", "test", 0, func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) { + fc := callNewFileCache("test", "1KB", "test", 0, func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) { called = true return strings.NewReader(arg.String()), nil }) @@ -67,7 +72,7 @@ var _ = Describe("File Caches", func() { It("does not cache data if cache is disabled", func() { called := false - fc, _ := NewFileCache("test", "0", "test", 0, func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) { + fc := callNewFileCache("test", "0", "test", 0, func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) { called = true return strings.NewReader(arg.String()), nil }) diff --git a/core/media_streamer.go b/core/media_streamer.go index e9224008..5bbbc021 100644 --- a/core/media_streamer.go +++ b/core/media_streamer.go @@ -20,14 +20,16 @@ type MediaStreamer interface { NewStream(ctx context.Context, id string, reqFormat string, reqBitRate int) (*Stream, error) } -func NewMediaStreamer(ds model.DataStore, ffm transcoder.Transcoder, cache *FileCache) MediaStreamer { +type TranscodingCache FileCache + +func NewMediaStreamer(ds model.DataStore, ffm transcoder.Transcoder, cache TranscodingCache) MediaStreamer { return &mediaStreamer{ds: ds, ffm: ffm, cache: cache} } type mediaStreamer struct { ds model.DataStore ffm transcoder.Transcoder - cache *FileCache + cache FileCache } type streamJob struct { @@ -90,7 +92,7 @@ func (ms *mediaStreamer) NewStream(ctx context.Context, id string, reqFormat str log.Debug(ctx, "Streaming TRANSCODED file", "id", mf.ID, "path", mf.Path, "requestBitrate", reqBitRate, "requestFormat", reqFormat, "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix, - "selectedBitrate", bitRate, "selectedFormat", format) + "selectedBitrate", bitRate, "selectedFormat", format, "cached", r.Cached) s.Reader = r s.Closer = r @@ -166,7 +168,7 @@ func selectTranscodingOptions(ctx context.Context, ds model.DataStore, mf *model return } -func NewTranscodingCache() (*FileCache, error) { +func NewTranscodingCache() TranscodingCache { return NewFileCache("Transcoding", conf.Server.TranscodingCacheSize, consts.TranscodingCacheDir, consts.DefaultTranscodingCacheMaxItems, func(ctx context.Context, arg fmt.Stringer) (io.Reader, error) { diff --git a/core/media_streamer_test.go b/core/media_streamer_test.go index 0f7c7bd1..ddb1b0ae 100644 --- a/core/media_streamer_test.go +++ b/core/media_streamer_test.go @@ -27,7 +27,8 @@ var _ = Describe("MediaStreamer", func() { conf.Server.TranscodingCacheSize = "100MB" ds = &persistence.MockDataStore{MockedTranscoding: &mockTranscodingRepository{}} ds.MediaFile(ctx).(*persistence.MockMediaFile).SetData(`[{"id": "123", "path": "tests/fixtures/test.mp3", "suffix": "mp3", "bitRate": 128, "duration": 257.0}]`) - testCache, _ := NewTranscodingCache() + testCache := NewTranscodingCache() + Eventually(func() bool { return testCache.Ready() }).Should(BeTrue()) streamer = NewMediaStreamer(ds, ffmpeg, testCache) }) diff --git a/utils/atomic.go b/utils/atomic.go new file mode 100644 index 00000000..cfea57aa --- /dev/null +++ b/utils/atomic.go @@ -0,0 +1,17 @@ +package utils + +import "sync/atomic" + +type AtomicBool struct{ flag int32 } + +func (b *AtomicBool) Get() bool { + return atomic.LoadInt32(&(b.flag)) != 0 +} + +func (b *AtomicBool) Set(value bool) { + var i int32 = 0 + if value { + i = 1 + } + atomic.StoreInt32(&(b.flag), i) +} diff --git a/utils/atomic_test.go b/utils/atomic_test.go new file mode 100644 index 00000000..91ef318a --- /dev/null +++ b/utils/atomic_test.go @@ -0,0 +1,27 @@ +package utils_test + +import ( + "github.com/deluan/navidrome/utils" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("AtomicBool", func() { + var b utils.AtomicBool + + BeforeEach(func() { + b = utils.AtomicBool{} + }) + + It("initializes with value = false", func() { + Expect(b.Get()).To(BeFalse()) + }) + + It("sets value", func() { + b.Set(true) + Expect(b.Get()).To(BeTrue()) + + b.Set(false) + Expect(b.Get()).To(BeFalse()) + }) +})