diff --git a/conf/configuration.go b/conf/configuration.go index ca031e41..f25fef27 100644 --- a/conf/configuration.go +++ b/conf/configuration.go @@ -13,20 +13,21 @@ import ( ) type nd struct { - Port string `default:"4533"` - MusicFolder string `default:"./music"` - DataFolder string `default:"./"` - DbPath string - LogLevel string `default:"info"` + Port string `default:"4533"` + MusicFolder string `default:"./music"` + DataFolder string `default:"./"` + ScanInterval string `default:"1m"` + DbPath string + LogLevel string `default:"info"` IgnoredArticles string `default:"The El La Los Las Le Les Os As O A"` IndexGroups string `default:"A B C D E F G H I J K L M N O P Q R S T U V W X-Z(XYZ) [Unknown]([)"` - EnableDownsampling bool `default:"false"` - MaxBitRate int `default:"0"` - DownsampleCommand string `default:"ffmpeg -i %s -map 0:0 -b:a %bk -v 0 -f mp3 -"` - ProbeCommand string `default:"ffmpeg -i %s -f ffmetadata"` - ScanInterval string `default:"1m"` + EnableDownsampling bool `default:"false"` + MaxBitRate int `default:"0"` + MaxTranscodingCacheSize int64 `default:"100000000"` // 100MB + DownsampleCommand string `default:"ffmpeg -i %s -map 0:0 -b:a %bk -v 0 -f mp3 -"` + ProbeCommand string `default:"ffmpeg -i %s -f ffmetadata"` // DevFlags. These are used to enable/disable debugging and incomplete features DevDisableBanner bool `default:"false"` diff --git a/engine/media_streamer.go b/engine/media_streamer.go index 65455663..ccfe255e 100644 --- a/engine/media_streamer.go +++ b/engine/media_streamer.go @@ -6,48 +6,42 @@ import ( "io" "net/http" "os" - "path/filepath" "strings" "time" "github.com/deluan/navidrome/conf" - "github.com/deluan/navidrome/consts" "github.com/deluan/navidrome/engine/ffmpeg" "github.com/deluan/navidrome/log" "github.com/deluan/navidrome/model" "github.com/deluan/navidrome/utils" + "gopkg.in/djherbis/fscache.v0" ) type MediaStreamer interface { NewFileSystem(ctx context.Context, maxBitRate int, format string) (http.FileSystem, error) } -func NewMediaStreamer(ds model.DataStore, ffm ffmpeg.FFmpeg) MediaStreamer { - return &mediaStreamer{ds: ds, ffm: ffm} +func NewMediaStreamer(ds model.DataStore, ffm ffmpeg.FFmpeg, cache fscache.Cache) MediaStreamer { + return &mediaStreamer{ds: ds, ffm: ffm, cache: cache} } type mediaStreamer struct { - ds model.DataStore - ffm ffmpeg.FFmpeg + ds model.DataStore + ffm ffmpeg.FFmpeg + cache fscache.Cache } func (ms *mediaStreamer) NewFileSystem(ctx context.Context, maxBitRate int, format string) (http.FileSystem, error) { - cacheFolder := filepath.Join(conf.Server.DataFolder, consts.CacheDir) - err := os.MkdirAll(cacheFolder, 0755) - if err != nil { - log.Error("Could not create cache folder", "folder", cacheFolder, err) - return nil, err - } - return &mediaFileSystem{ctx: ctx, ds: ms.ds, ffm: ms.ffm, maxBitRate: maxBitRate, format: format, cacheFolder: cacheFolder}, nil + return &mediaFileSystem{ctx: ctx, ds: ms.ds, ffm: ms.ffm, cache: ms.cache, maxBitRate: maxBitRate, format: format}, nil } type mediaFileSystem struct { - ctx context.Context - ds model.DataStore - maxBitRate int - format string - cacheFolder string - ffm ffmpeg.FFmpeg + ctx context.Context + ds model.DataStore + maxBitRate int + format string + ffm ffmpeg.FFmpeg + cache fscache.Cache } func (fs *mediaFileSystem) selectTranscodingOptions(mf *model.MediaFile) (string, int) { @@ -93,115 +87,87 @@ func (fs *mediaFileSystem) Open(name string) (http.File, error) { return os.Open(mf.Path) } - cachedFile := fs.cacheFilePath(mf, bitRate, format) - if _, err := os.Stat(cachedFile); !os.IsNotExist(err) { - log.Debug(fs.ctx, "Streaming cached transcoded", "id", mf.ID, "path", mf.Path, - "requestBitrate", bitRate, "requestFormat", format, - "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix) - return os.Open(cachedFile) - } - log.Debug(fs.ctx, "Streaming transcoded file", "id", mf.ID, "path", mf.Path, "requestBitrate", bitRate, "requestFormat", format, "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix) - return fs.transcodeFile(mf, bitRate, format, cachedFile) + return fs.transcodeFile(mf, bitRate, format) } -func (fs *mediaFileSystem) cacheFilePath(mf *model.MediaFile, bitRate int, format string) string { - // Break the cache in subfolders, to avoid too many files in the same folder - subDir := strings.ToLower(mf.ID[:2]) - subDir = filepath.Join(fs.cacheFolder, subDir) - // Make sure the subfolder to exist - os.Mkdir(subDir, 0755) - return filepath.Join(subDir, fmt.Sprintf("%s.%d.%s", mf.ID, bitRate, format)) +func (fs *mediaFileSystem) transcodeFile(mf *model.MediaFile, bitRate int, format string) (*transcodingFile, error) { + key := fmt.Sprintf("%s.%d.%s", mf.ID, bitRate, format) + r, w, err := fs.cache.Get(key) + if err != nil { + log.Error("Error creating stream caching buffer", "id", mf.ID, err) + return nil, os.ErrInvalid + } + + // If it is a new file (not found in the cached), start a new transcoding session + if w != nil { + log.Debug("File not found in cache. Starting new transcoding session", "id", mf.ID) + out, err := fs.ffm.StartTranscoding(fs.ctx, mf.Path, bitRate, format) + if err != nil { + log.Error("Error starting transcoder", "id", mf.ID, err) + return nil, os.ErrInvalid + } + go func() { + io.Copy(w, out) + out.Close() + w.Close() + }() + } else { + log.Debug("Reading transcoded file from cache", "id", mf.ID) + } + + return newTranscodingFile(fs.ctx, r, mf, bitRate), nil } -func (fs *mediaFileSystem) transcodeFile(mf *model.MediaFile, bitRate int, format, cacheFile string) (*transcodingFile, error) { - out, err := fs.ffm.StartTranscoding(fs.ctx, mf.Path, bitRate, format) - if err != nil { - log.Error("Error starting transcoder", "id", mf.ID, err) - return nil, os.ErrInvalid +// transcodingFile Implements http.File interface, required for the FileSystem. It needs a Closer, a Reader and +// a Seeker for the same stream. Because the fscache package only provides a ReaderAtCloser (without the Seek() +// method), we wrap that reader with a SectionReader, which provides a Seek(). But we still need the original +// reader, as we need to close the stream when the transfer is complete +func newTranscodingFile(ctx context.Context, reader fscache.ReadAtCloser, + mf *model.MediaFile, bitRate int) *transcodingFile { + + size := int64(mf.Duration*float32(bitRate*1000)) / 8 + return &transcodingFile{ + ctx: ctx, + mf: mf, + bitRate: bitRate, + size: size, + closer: reader, + ReadSeeker: io.NewSectionReader(reader, 0, size), } - buf, err := newStreamBuffer(cacheFile) - if err != nil { - log.Error("Error creating stream buffer", "id", mf.ID, err) - return nil, os.ErrInvalid - } - r, err := buf.NewReader() - if err != nil { - log.Error("Error opening stream reader", "id", mf.ID, err) - return nil, os.ErrInvalid - } - go func() { - io.Copy(buf, out) - out.Close() - buf.Sync() - buf.Close() - }() - s := &transcodingFile{ - ctx: fs.ctx, - mf: mf, - bitRate: bitRate, - } - s.File = r - return s, nil } type transcodingFile struct { ctx context.Context mf *model.MediaFile bitRate int - http.File + size int64 + closer io.Closer + io.ReadSeeker } -func (h *transcodingFile) Stat() (os.FileInfo, error) { - return &streamHandlerFileInfo{mf: h.mf, bitRate: h.bitRate}, nil +func (tf *transcodingFile) Stat() (os.FileInfo, error) { + return &streamHandlerFileInfo{f: tf}, nil } -// Don't return EOF, just wait for more data. When the request ends, this "File" will be closed, and then -// the Read will be interrupted -func (h *transcodingFile) Read(b []byte) (int, error) { - for { - n, err := h.File.Read(b) - if n > 0 { - return n, nil - } else if err != io.EOF { - return n, err - } - time.Sleep(100 * time.Millisecond) - } +func (tf *transcodingFile) Close() error { + return tf.closer.Close() +} + +func (tf *transcodingFile) Readdir(count int) ([]os.FileInfo, error) { + return nil, nil } type streamHandlerFileInfo struct { - mf *model.MediaFile - bitRate int + f *transcodingFile } -func (f *streamHandlerFileInfo) Name() string { return f.mf.Title } -func (f *streamHandlerFileInfo) Size() int64 { return int64(f.mf.Duration*float32(f.bitRate*1000)) / 8 } -func (f *streamHandlerFileInfo) Mode() os.FileMode { return os.FileMode(0777) } -func (f *streamHandlerFileInfo) ModTime() time.Time { return f.mf.UpdatedAt } -func (f *streamHandlerFileInfo) IsDir() bool { return false } -func (f *streamHandlerFileInfo) Sys() interface{} { return nil } - -// From: https://stackoverflow.com/a/44322300 -type streamBuffer struct { - *os.File -} - -func (mb *streamBuffer) NewReader() (http.File, error) { - f, err := os.Open(mb.Name()) - if err != nil { - return nil, err - } - return f, nil -} - -func newStreamBuffer(name string) (*streamBuffer, error) { - f, err := os.Create(name) - if err != nil { - return nil, err - } - return &streamBuffer{File: f}, nil -} +func (fi *streamHandlerFileInfo) Name() string { return fi.f.mf.Title } +func (fi *streamHandlerFileInfo) ModTime() time.Time { return fi.f.mf.UpdatedAt } +func (fi *streamHandlerFileInfo) Size() int64 { return fi.f.size } +func (fi *streamHandlerFileInfo) Mode() os.FileMode { return os.FileMode(0777) } +func (fi *streamHandlerFileInfo) IsDir() bool { return false } +func (fi *streamHandlerFileInfo) Sys() interface{} { return nil } diff --git a/engine/media_streamer_test.go b/engine/media_streamer_test.go index c75a16d2..81d8a02f 100644 --- a/engine/media_streamer_test.go +++ b/engine/media_streamer_test.go @@ -14,32 +14,22 @@ import ( "github.com/deluan/navidrome/persistence" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "gopkg.in/djherbis/fscache.v0" ) var _ = Describe("MediaStreamer", func() { var streamer MediaStreamer var ds model.DataStore - var tempDir string ctx := log.NewContext(nil) - BeforeSuite(func() { - conf.Server.EnableDownsampling = true - tempDir, err := ioutil.TempDir("", "stream_tests") - if err != nil { - panic(err) - } - conf.Server.DataFolder = tempDir - }) - BeforeEach(func() { + conf.Server.EnableDownsampling = true + fs := fscache.NewMemFs() + cache, _ := fscache.NewCache(fs, nil) ds = &persistence.MockDataStore{} ds.MediaFile(ctx).(*persistence.MockMediaFile).SetData(`[{"id": "123", "path": "tests/fixtures/test.mp3", "bitRate": 128}]`, 1) - streamer = NewMediaStreamer(ds, &fakeFFmpeg{}) - }) - - AfterSuite(func() { - os.RemoveAll(tempDir) + streamer = NewMediaStreamer(ds, &fakeFFmpeg{}, cache) }) getFile := func(id string, maxBitRate int, format string) (http.File, error) { @@ -63,9 +53,6 @@ var _ = Describe("MediaStreamer", func() { Expect(s).To(BeAssignableToTypeOf(&transcodingFile{})) Expect(s.(*transcodingFile).bitRate).To(Equal(64)) }) - It("returns a File if the transcoding is cached", func() { - Expect(getFile("123", 64, "mp3")).To(BeAssignableToTypeOf(&os.File{})) - }) }) }) diff --git a/engine/wire_providers.go b/engine/wire_providers.go index 38186ec0..92bb7d82 100644 --- a/engine/wire_providers.go +++ b/engine/wire_providers.go @@ -1,8 +1,14 @@ package engine import ( + "path/filepath" + "time" + + "github.com/deluan/navidrome/conf" + "github.com/deluan/navidrome/consts" "github.com/deluan/navidrome/engine/ffmpeg" "github.com/google/wire" + "gopkg.in/djherbis/fscache.v0" ) var Set = wire.NewSet( @@ -17,4 +23,16 @@ var Set = wire.NewSet( NewUsers, NewMediaStreamer, ffmpeg.New, + NewTranscodingCache, ) + +func NewTranscodingCache() (fscache.Cache, error) { + lru := fscache.NewLRUHaunter(0, conf.Server.MaxTranscodingCacheSize, 30*time.Second) + h := fscache.NewLRUHaunterStrategy(lru) + cacheFolder := filepath.Join(conf.Server.DataFolder, consts.CacheDir) + fs, err := fscache.NewFs(cacheFolder, 0755) + if err != nil { + return nil, err + } + return fscache.NewCacheWithHaunter(fs, h) +} diff --git a/go.mod b/go.mod index f6422521..e5c5761c 100644 --- a/go.mod +++ b/go.mod @@ -38,5 +38,8 @@ require ( golang.org/x/text v0.3.2 // indirect golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect + gopkg.in/djherbis/atime.v1 v1.0.0 // indirect + gopkg.in/djherbis/fscache.v0 v0.9.0 + gopkg.in/djherbis/stream.v1 v1.2.0 // indirect gopkg.in/yaml.v2 v2.2.8 // indirect ) diff --git a/go.sum b/go.sum index 76403945..ea4ab638 100644 --- a/go.sum +++ b/go.sum @@ -172,6 +172,12 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/djherbis/atime.v1 v1.0.0 h1:eMRqB/JrLKocla2PBPKgQYg/p5UG4L6AUAs92aP7F60= +gopkg.in/djherbis/atime.v1 v1.0.0/go.mod h1:hQIUStKmJfvf7xdh/wtK84qe+DsTV5LnA9lzxxtPpJ8= +gopkg.in/djherbis/fscache.v0 v0.9.0 h1:CBmOlHQKg99q0xATpQpSNAR970UN4vECB5SjzkuyLe0= +gopkg.in/djherbis/fscache.v0 v0.9.0/go.mod h1:izqJMuO+STCEMBEGFiwW5zPlamuiUOxMRpNzHT5cQHc= +gopkg.in/djherbis/stream.v1 v1.2.0 h1:3tZuXO+RK8opjw8/BJr780h+eAPwOFfLHCKRKyYxk3s= +gopkg.in/djherbis/stream.v1 v1.2.0/go.mod h1:aEV8CBVRmSpLamVJfM903Npic1IKmb2qS30VAZ+sssg= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/main.go b/main.go index 5f7fd27b..093bb251 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,8 @@ package main import ( + "fmt" + "github.com/deluan/navidrome/conf" "github.com/deluan/navidrome/consts" "github.com/deluan/navidrome/db" @@ -14,8 +16,12 @@ func main() { conf.Load() db.EnsureLatestVersion() + subsonic, err := CreateSubsonicAPIRouter() + if err != nil { + panic(fmt.Sprintf("Could not create the Subsonic API router. Aborting! err=%v", err)) + } a := CreateServer(conf.Server.MusicFolder) - a.MountRouter("/rest", CreateSubsonicAPIRouter()) + a.MountRouter("/rest", subsonic) a.MountRouter("/app", CreateAppRouter("/app")) a.Run(":" + conf.Server.Port) } diff --git a/wire_gen.go b/wire_gen.go index c538885c..b828cc04 100644 --- a/wire_gen.go +++ b/wire_gen.go @@ -31,7 +31,7 @@ func CreateAppRouter(path string) *app.Router { return router } -func CreateSubsonicAPIRouter() *subsonic.Router { +func CreateSubsonicAPIRouter() (*subsonic.Router, error) { dataStore := persistence.New() browser := engine.NewBrowser(dataStore) cover := engine.NewCover(dataStore) @@ -43,9 +43,13 @@ func CreateSubsonicAPIRouter() *subsonic.Router { scrobbler := engine.NewScrobbler(dataStore, nowPlayingRepository) search := engine.NewSearch(dataStore) fFmpeg := ffmpeg.New() - mediaStreamer := engine.NewMediaStreamer(dataStore, fFmpeg) + cache, err := engine.NewTranscodingCache() + if err != nil { + return nil, err + } + mediaStreamer := engine.NewMediaStreamer(dataStore, fFmpeg, cache) router := subsonic.New(browser, cover, listGenerator, users, playlists, ratings, scrobbler, search, mediaStreamer) - return router + return router, nil } // wire_injectors.go: diff --git a/wire_injectors.go b/wire_injectors.go index 6696473c..c477f76e 100644 --- a/wire_injectors.go +++ b/wire_injectors.go @@ -31,6 +31,6 @@ func CreateAppRouter(path string) *app.Router { panic(wire.Build(allProviders)) } -func CreateSubsonicAPIRouter() *subsonic.Router { +func CreateSubsonicAPIRouter() (*subsonic.Router, error) { panic(wire.Build(allProviders)) }