feat: add a proper caching system to the transcoding functionality

This commit is contained in:
Deluan 2020-02-20 19:08:10 -05:00
parent fc14e346b9
commit a6b0c57ce0
9 changed files with 131 additions and 140 deletions

View File

@ -13,20 +13,21 @@ import (
) )
type nd struct { type nd struct {
Port string `default:"4533"` Port string `default:"4533"`
MusicFolder string `default:"./music"` MusicFolder string `default:"./music"`
DataFolder string `default:"./"` DataFolder string `default:"./"`
DbPath string ScanInterval string `default:"1m"`
LogLevel string `default:"info"` DbPath string
LogLevel string `default:"info"`
IgnoredArticles string `default:"The El La Los Las Le Les Os As O A"` IgnoredArticles string `default:"The El La Los Las Le Les Os As O A"`
IndexGroups string `default:"A B C D E F G H I J K L M N O P Q R S T U V W X-Z(XYZ) [Unknown]([)"` IndexGroups string `default:"A B C D E F G H I J K L M N O P Q R S T U V W X-Z(XYZ) [Unknown]([)"`
EnableDownsampling bool `default:"false"` EnableDownsampling bool `default:"false"`
MaxBitRate int `default:"0"` MaxBitRate int `default:"0"`
DownsampleCommand string `default:"ffmpeg -i %s -map 0:0 -b:a %bk -v 0 -f mp3 -"` MaxTranscodingCacheSize int64 `default:"100000000"` // 100MB
ProbeCommand string `default:"ffmpeg -i %s -f ffmetadata"` DownsampleCommand string `default:"ffmpeg -i %s -map 0:0 -b:a %bk -v 0 -f mp3 -"`
ScanInterval string `default:"1m"` ProbeCommand string `default:"ffmpeg -i %s -f ffmetadata"`
// DevFlags. These are used to enable/disable debugging and incomplete features // DevFlags. These are used to enable/disable debugging and incomplete features
DevDisableBanner bool `default:"false"` DevDisableBanner bool `default:"false"`

View File

@ -6,48 +6,42 @@ import (
"io" "io"
"net/http" "net/http"
"os" "os"
"path/filepath"
"strings" "strings"
"time" "time"
"github.com/deluan/navidrome/conf" "github.com/deluan/navidrome/conf"
"github.com/deluan/navidrome/consts"
"github.com/deluan/navidrome/engine/ffmpeg" "github.com/deluan/navidrome/engine/ffmpeg"
"github.com/deluan/navidrome/log" "github.com/deluan/navidrome/log"
"github.com/deluan/navidrome/model" "github.com/deluan/navidrome/model"
"github.com/deluan/navidrome/utils" "github.com/deluan/navidrome/utils"
"gopkg.in/djherbis/fscache.v0"
) )
type MediaStreamer interface { type MediaStreamer interface {
NewFileSystem(ctx context.Context, maxBitRate int, format string) (http.FileSystem, error) NewFileSystem(ctx context.Context, maxBitRate int, format string) (http.FileSystem, error)
} }
func NewMediaStreamer(ds model.DataStore, ffm ffmpeg.FFmpeg) MediaStreamer { func NewMediaStreamer(ds model.DataStore, ffm ffmpeg.FFmpeg, cache fscache.Cache) MediaStreamer {
return &mediaStreamer{ds: ds, ffm: ffm} return &mediaStreamer{ds: ds, ffm: ffm, cache: cache}
} }
type mediaStreamer struct { type mediaStreamer struct {
ds model.DataStore ds model.DataStore
ffm ffmpeg.FFmpeg ffm ffmpeg.FFmpeg
cache fscache.Cache
} }
func (ms *mediaStreamer) NewFileSystem(ctx context.Context, maxBitRate int, format string) (http.FileSystem, error) { func (ms *mediaStreamer) NewFileSystem(ctx context.Context, maxBitRate int, format string) (http.FileSystem, error) {
cacheFolder := filepath.Join(conf.Server.DataFolder, consts.CacheDir) return &mediaFileSystem{ctx: ctx, ds: ms.ds, ffm: ms.ffm, cache: ms.cache, maxBitRate: maxBitRate, format: format}, nil
err := os.MkdirAll(cacheFolder, 0755)
if err != nil {
log.Error("Could not create cache folder", "folder", cacheFolder, err)
return nil, err
}
return &mediaFileSystem{ctx: ctx, ds: ms.ds, ffm: ms.ffm, maxBitRate: maxBitRate, format: format, cacheFolder: cacheFolder}, nil
} }
type mediaFileSystem struct { type mediaFileSystem struct {
ctx context.Context ctx context.Context
ds model.DataStore ds model.DataStore
maxBitRate int maxBitRate int
format string format string
cacheFolder string ffm ffmpeg.FFmpeg
ffm ffmpeg.FFmpeg cache fscache.Cache
} }
func (fs *mediaFileSystem) selectTranscodingOptions(mf *model.MediaFile) (string, int) { func (fs *mediaFileSystem) selectTranscodingOptions(mf *model.MediaFile) (string, int) {
@ -93,115 +87,87 @@ func (fs *mediaFileSystem) Open(name string) (http.File, error) {
return os.Open(mf.Path) return os.Open(mf.Path)
} }
cachedFile := fs.cacheFilePath(mf, bitRate, format)
if _, err := os.Stat(cachedFile); !os.IsNotExist(err) {
log.Debug(fs.ctx, "Streaming cached transcoded", "id", mf.ID, "path", mf.Path,
"requestBitrate", bitRate, "requestFormat", format,
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix)
return os.Open(cachedFile)
}
log.Debug(fs.ctx, "Streaming transcoded file", "id", mf.ID, "path", mf.Path, log.Debug(fs.ctx, "Streaming transcoded file", "id", mf.ID, "path", mf.Path,
"requestBitrate", bitRate, "requestFormat", format, "requestBitrate", bitRate, "requestFormat", format,
"originalBitrate", mf.BitRate, "originalFormat", mf.Suffix) "originalBitrate", mf.BitRate, "originalFormat", mf.Suffix)
return fs.transcodeFile(mf, bitRate, format, cachedFile) return fs.transcodeFile(mf, bitRate, format)
} }
func (fs *mediaFileSystem) cacheFilePath(mf *model.MediaFile, bitRate int, format string) string { func (fs *mediaFileSystem) transcodeFile(mf *model.MediaFile, bitRate int, format string) (*transcodingFile, error) {
// Break the cache in subfolders, to avoid too many files in the same folder key := fmt.Sprintf("%s.%d.%s", mf.ID, bitRate, format)
subDir := strings.ToLower(mf.ID[:2]) r, w, err := fs.cache.Get(key)
subDir = filepath.Join(fs.cacheFolder, subDir) if err != nil {
// Make sure the subfolder to exist log.Error("Error creating stream caching buffer", "id", mf.ID, err)
os.Mkdir(subDir, 0755) return nil, os.ErrInvalid
return filepath.Join(subDir, fmt.Sprintf("%s.%d.%s", mf.ID, bitRate, format)) }
// If it is a new file (not found in the cached), start a new transcoding session
if w != nil {
log.Debug("File not found in cache. Starting new transcoding session", "id", mf.ID)
out, err := fs.ffm.StartTranscoding(fs.ctx, mf.Path, bitRate, format)
if err != nil {
log.Error("Error starting transcoder", "id", mf.ID, err)
return nil, os.ErrInvalid
}
go func() {
io.Copy(w, out)
out.Close()
w.Close()
}()
} else {
log.Debug("Reading transcoded file from cache", "id", mf.ID)
}
return newTranscodingFile(fs.ctx, r, mf, bitRate), nil
} }
func (fs *mediaFileSystem) transcodeFile(mf *model.MediaFile, bitRate int, format, cacheFile string) (*transcodingFile, error) { // transcodingFile Implements http.File interface, required for the FileSystem. It needs a Closer, a Reader and
out, err := fs.ffm.StartTranscoding(fs.ctx, mf.Path, bitRate, format) // a Seeker for the same stream. Because the fscache package only provides a ReaderAtCloser (without the Seek()
if err != nil { // method), we wrap that reader with a SectionReader, which provides a Seek(). But we still need the original
log.Error("Error starting transcoder", "id", mf.ID, err) // reader, as we need to close the stream when the transfer is complete
return nil, os.ErrInvalid func newTranscodingFile(ctx context.Context, reader fscache.ReadAtCloser,
mf *model.MediaFile, bitRate int) *transcodingFile {
size := int64(mf.Duration*float32(bitRate*1000)) / 8
return &transcodingFile{
ctx: ctx,
mf: mf,
bitRate: bitRate,
size: size,
closer: reader,
ReadSeeker: io.NewSectionReader(reader, 0, size),
} }
buf, err := newStreamBuffer(cacheFile)
if err != nil {
log.Error("Error creating stream buffer", "id", mf.ID, err)
return nil, os.ErrInvalid
}
r, err := buf.NewReader()
if err != nil {
log.Error("Error opening stream reader", "id", mf.ID, err)
return nil, os.ErrInvalid
}
go func() {
io.Copy(buf, out)
out.Close()
buf.Sync()
buf.Close()
}()
s := &transcodingFile{
ctx: fs.ctx,
mf: mf,
bitRate: bitRate,
}
s.File = r
return s, nil
} }
type transcodingFile struct { type transcodingFile struct {
ctx context.Context ctx context.Context
mf *model.MediaFile mf *model.MediaFile
bitRate int bitRate int
http.File size int64
closer io.Closer
io.ReadSeeker
} }
func (h *transcodingFile) Stat() (os.FileInfo, error) { func (tf *transcodingFile) Stat() (os.FileInfo, error) {
return &streamHandlerFileInfo{mf: h.mf, bitRate: h.bitRate}, nil return &streamHandlerFileInfo{f: tf}, nil
} }
// Don't return EOF, just wait for more data. When the request ends, this "File" will be closed, and then func (tf *transcodingFile) Close() error {
// the Read will be interrupted return tf.closer.Close()
func (h *transcodingFile) Read(b []byte) (int, error) { }
for {
n, err := h.File.Read(b) func (tf *transcodingFile) Readdir(count int) ([]os.FileInfo, error) {
if n > 0 { return nil, nil
return n, nil
} else if err != io.EOF {
return n, err
}
time.Sleep(100 * time.Millisecond)
}
} }
type streamHandlerFileInfo struct { type streamHandlerFileInfo struct {
mf *model.MediaFile f *transcodingFile
bitRate int
} }
func (f *streamHandlerFileInfo) Name() string { return f.mf.Title } func (fi *streamHandlerFileInfo) Name() string { return fi.f.mf.Title }
func (f *streamHandlerFileInfo) Size() int64 { return int64(f.mf.Duration*float32(f.bitRate*1000)) / 8 } func (fi *streamHandlerFileInfo) ModTime() time.Time { return fi.f.mf.UpdatedAt }
func (f *streamHandlerFileInfo) Mode() os.FileMode { return os.FileMode(0777) } func (fi *streamHandlerFileInfo) Size() int64 { return fi.f.size }
func (f *streamHandlerFileInfo) ModTime() time.Time { return f.mf.UpdatedAt } func (fi *streamHandlerFileInfo) Mode() os.FileMode { return os.FileMode(0777) }
func (f *streamHandlerFileInfo) IsDir() bool { return false } func (fi *streamHandlerFileInfo) IsDir() bool { return false }
func (f *streamHandlerFileInfo) Sys() interface{} { return nil } func (fi *streamHandlerFileInfo) Sys() interface{} { return nil }
// From: https://stackoverflow.com/a/44322300
type streamBuffer struct {
*os.File
}
func (mb *streamBuffer) NewReader() (http.File, error) {
f, err := os.Open(mb.Name())
if err != nil {
return nil, err
}
return f, nil
}
func newStreamBuffer(name string) (*streamBuffer, error) {
f, err := os.Create(name)
if err != nil {
return nil, err
}
return &streamBuffer{File: f}, nil
}

View File

@ -14,32 +14,22 @@ import (
"github.com/deluan/navidrome/persistence" "github.com/deluan/navidrome/persistence"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"gopkg.in/djherbis/fscache.v0"
) )
var _ = Describe("MediaStreamer", func() { var _ = Describe("MediaStreamer", func() {
var streamer MediaStreamer var streamer MediaStreamer
var ds model.DataStore var ds model.DataStore
var tempDir string
ctx := log.NewContext(nil) ctx := log.NewContext(nil)
BeforeSuite(func() {
conf.Server.EnableDownsampling = true
tempDir, err := ioutil.TempDir("", "stream_tests")
if err != nil {
panic(err)
}
conf.Server.DataFolder = tempDir
})
BeforeEach(func() { BeforeEach(func() {
conf.Server.EnableDownsampling = true
fs := fscache.NewMemFs()
cache, _ := fscache.NewCache(fs, nil)
ds = &persistence.MockDataStore{} ds = &persistence.MockDataStore{}
ds.MediaFile(ctx).(*persistence.MockMediaFile).SetData(`[{"id": "123", "path": "tests/fixtures/test.mp3", "bitRate": 128}]`, 1) ds.MediaFile(ctx).(*persistence.MockMediaFile).SetData(`[{"id": "123", "path": "tests/fixtures/test.mp3", "bitRate": 128}]`, 1)
streamer = NewMediaStreamer(ds, &fakeFFmpeg{}) streamer = NewMediaStreamer(ds, &fakeFFmpeg{}, cache)
})
AfterSuite(func() {
os.RemoveAll(tempDir)
}) })
getFile := func(id string, maxBitRate int, format string) (http.File, error) { getFile := func(id string, maxBitRate int, format string) (http.File, error) {
@ -63,9 +53,6 @@ var _ = Describe("MediaStreamer", func() {
Expect(s).To(BeAssignableToTypeOf(&transcodingFile{})) Expect(s).To(BeAssignableToTypeOf(&transcodingFile{}))
Expect(s.(*transcodingFile).bitRate).To(Equal(64)) Expect(s.(*transcodingFile).bitRate).To(Equal(64))
}) })
It("returns a File if the transcoding is cached", func() {
Expect(getFile("123", 64, "mp3")).To(BeAssignableToTypeOf(&os.File{}))
})
}) })
}) })

View File

@ -1,8 +1,14 @@
package engine package engine
import ( import (
"path/filepath"
"time"
"github.com/deluan/navidrome/conf"
"github.com/deluan/navidrome/consts"
"github.com/deluan/navidrome/engine/ffmpeg" "github.com/deluan/navidrome/engine/ffmpeg"
"github.com/google/wire" "github.com/google/wire"
"gopkg.in/djherbis/fscache.v0"
) )
var Set = wire.NewSet( var Set = wire.NewSet(
@ -17,4 +23,16 @@ var Set = wire.NewSet(
NewUsers, NewUsers,
NewMediaStreamer, NewMediaStreamer,
ffmpeg.New, ffmpeg.New,
NewTranscodingCache,
) )
func NewTranscodingCache() (fscache.Cache, error) {
lru := fscache.NewLRUHaunter(0, conf.Server.MaxTranscodingCacheSize, 30*time.Second)
h := fscache.NewLRUHaunterStrategy(lru)
cacheFolder := filepath.Join(conf.Server.DataFolder, consts.CacheDir)
fs, err := fscache.NewFs(cacheFolder, 0755)
if err != nil {
return nil, err
}
return fscache.NewCacheWithHaunter(fs, h)
}

3
go.mod
View File

@ -38,5 +38,8 @@ require (
golang.org/x/text v0.3.2 // indirect golang.org/x/text v0.3.2 // indirect
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/djherbis/atime.v1 v1.0.0 // indirect
gopkg.in/djherbis/fscache.v0 v0.9.0
gopkg.in/djherbis/stream.v1 v1.2.0 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect gopkg.in/yaml.v2 v2.2.8 // indirect
) )

6
go.sum
View File

@ -172,6 +172,12 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/djherbis/atime.v1 v1.0.0 h1:eMRqB/JrLKocla2PBPKgQYg/p5UG4L6AUAs92aP7F60=
gopkg.in/djherbis/atime.v1 v1.0.0/go.mod h1:hQIUStKmJfvf7xdh/wtK84qe+DsTV5LnA9lzxxtPpJ8=
gopkg.in/djherbis/fscache.v0 v0.9.0 h1:CBmOlHQKg99q0xATpQpSNAR970UN4vECB5SjzkuyLe0=
gopkg.in/djherbis/fscache.v0 v0.9.0/go.mod h1:izqJMuO+STCEMBEGFiwW5zPlamuiUOxMRpNzHT5cQHc=
gopkg.in/djherbis/stream.v1 v1.2.0 h1:3tZuXO+RK8opjw8/BJr780h+eAPwOFfLHCKRKyYxk3s=
gopkg.in/djherbis/stream.v1 v1.2.0/go.mod h1:aEV8CBVRmSpLamVJfM903Npic1IKmb2qS30VAZ+sssg=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=

View File

@ -1,6 +1,8 @@
package main package main
import ( import (
"fmt"
"github.com/deluan/navidrome/conf" "github.com/deluan/navidrome/conf"
"github.com/deluan/navidrome/consts" "github.com/deluan/navidrome/consts"
"github.com/deluan/navidrome/db" "github.com/deluan/navidrome/db"
@ -14,8 +16,12 @@ func main() {
conf.Load() conf.Load()
db.EnsureLatestVersion() db.EnsureLatestVersion()
subsonic, err := CreateSubsonicAPIRouter()
if err != nil {
panic(fmt.Sprintf("Could not create the Subsonic API router. Aborting! err=%v", err))
}
a := CreateServer(conf.Server.MusicFolder) a := CreateServer(conf.Server.MusicFolder)
a.MountRouter("/rest", CreateSubsonicAPIRouter()) a.MountRouter("/rest", subsonic)
a.MountRouter("/app", CreateAppRouter("/app")) a.MountRouter("/app", CreateAppRouter("/app"))
a.Run(":" + conf.Server.Port) a.Run(":" + conf.Server.Port)
} }

View File

@ -31,7 +31,7 @@ func CreateAppRouter(path string) *app.Router {
return router return router
} }
func CreateSubsonicAPIRouter() *subsonic.Router { func CreateSubsonicAPIRouter() (*subsonic.Router, error) {
dataStore := persistence.New() dataStore := persistence.New()
browser := engine.NewBrowser(dataStore) browser := engine.NewBrowser(dataStore)
cover := engine.NewCover(dataStore) cover := engine.NewCover(dataStore)
@ -43,9 +43,13 @@ func CreateSubsonicAPIRouter() *subsonic.Router {
scrobbler := engine.NewScrobbler(dataStore, nowPlayingRepository) scrobbler := engine.NewScrobbler(dataStore, nowPlayingRepository)
search := engine.NewSearch(dataStore) search := engine.NewSearch(dataStore)
fFmpeg := ffmpeg.New() fFmpeg := ffmpeg.New()
mediaStreamer := engine.NewMediaStreamer(dataStore, fFmpeg) cache, err := engine.NewTranscodingCache()
if err != nil {
return nil, err
}
mediaStreamer := engine.NewMediaStreamer(dataStore, fFmpeg, cache)
router := subsonic.New(browser, cover, listGenerator, users, playlists, ratings, scrobbler, search, mediaStreamer) router := subsonic.New(browser, cover, listGenerator, users, playlists, ratings, scrobbler, search, mediaStreamer)
return router return router, nil
} }
// wire_injectors.go: // wire_injectors.go:

View File

@ -31,6 +31,6 @@ func CreateAppRouter(path string) *app.Router {
panic(wire.Build(allProviders)) panic(wire.Build(allProviders))
} }
func CreateSubsonicAPIRouter() *subsonic.Router { func CreateSubsonicAPIRouter() (*subsonic.Router, error) {
panic(wire.Build(allProviders)) panic(wire.Build(allProviders))
} }