Use google/go-pipelines

This commit is contained in:
Deluan 2023-12-24 16:32:31 -05:00
parent e47ddd740a
commit 9596061464
3 changed files with 108 additions and 100 deletions

1
go.mod
View File

@ -22,6 +22,7 @@ require (
github.com/go-chi/cors v1.2.1
github.com/go-chi/httprate v0.9.0
github.com/go-chi/jwtauth/v5 v5.3.1
github.com/google/go-pipeline v0.0.0-20230411140531-6cbedfc1d3fc
github.com/google/uuid v1.6.0
github.com/google/wire v0.6.0
github.com/hashicorp/go-multierror v1.1.1

5
go.sum
View File

@ -12,6 +12,7 @@ github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oM
github.com/bradleyjkemp/cupaloy/v2 v2.8.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/charlievieth/fastwalk v1.0.1 h1:jW01w8OCFdKS9JvAcnI+JHhWU/FuIEmNb24Ri9p7OVg=
github.com/charlievieth/fastwalk v1.0.1/go.mod h1:dryXgMJyGHbMrAmmnF0/EJNBbZaihlwcNud5IuGyogU=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
@ -68,6 +69,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-pipeline v0.0.0-20230411140531-6cbedfc1d3fc h1:hd+uUVsB1vdxohPneMrhGH2YfQuH5hRIK9u4/XCeUtw=
github.com/google/go-pipeline v0.0.0-20230411140531-6cbedfc1d3fc/go.mod h1:SL66SJVysrh7YbDCP9tH30b8a9o/N2HeiQNUm85EKhc=
github.com/google/pprof v0.0.0-20230323073829-e72429f035bd h1:r8yyd+DJDmsUhGrRBxH5Pj7KeFK5l+Y3FsgT8keqKtk=
github.com/google/pprof v0.0.0-20230323073829-e72429f035bd/go.mod h1:79YE0hCXdHag9sBkw2o+N/YnZtTkXi0UT9Nnixa5eYk=
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
@ -94,6 +97,8 @@ github.com/jellydator/ttlcache/v2 v2.11.1 h1:AZGME43Eh2Vv3giG6GeqeLeFXxwxn1/qHIt
github.com/jellydator/ttlcache/v2 v2.11.1/go.mod h1:RtE5Snf0/57e+2cLWFYWCCsLas2Hy3c5Z4n14XmSvTI=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/karrick/godirwalk v1.17.0 h1:b4kY7nqDdioR/6qnbHQyDvmA17u5G1cZ6J+CZXwSWoI=
github.com/karrick/godirwalk v1.17.0/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/charlievieth/fastwalk"
"github.com/google/go-pipeline/pkg/pipeline"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/model/request"
@ -34,128 +35,129 @@ func (s *scanner2) RescanAll(requestCtx context.Context, fullRescan bool) error
startTime := time.Now()
log.Info(ctx, "Scanner: Starting scan", "fullRescan", fullRescan, "numLibraries", len(libs))
scanCtxChan := createScanContexts(ctx, s.ds, libs, fullRescan)
folderChan, folderErrChan := walkDirEntries(ctx, scanCtxChan)
changedFolderChan, changedFolderErrChan := pl.Filter(ctx, 4, folderChan, onlyOutdated)
processedFolderChan, processedFolderErrChan := pl.Stage(ctx, 4, changedFolderChan, processFolder)
err = s.runPipeline(
pipeline.NewProducer(s.produceFolders(ctx, libs, fullRescan), pipeline.Name("read folders from disk")),
pipeline.NewStage(s.processFolder(ctx), pipeline.Name("process folder")),
pipeline.NewStage(s.logFolder(ctx), pipeline.Name("log results")),
)
logErrChan := pl.Sink(ctx, 4, processedFolderChan, func(ctx context.Context, folder *folderEntry) error {
log.Debug(ctx, "Scanner: Found folder", "folder", folder.Name(), "_path", folder.path,
"audioCount", len(folder.audioFiles), "imageCount", len(folder.imageFiles), "plsCount", len(folder.playlists))
return nil
})
// Wait for pipeline to end, return first error found
for err := range pl.Merge(ctx, folderErrChan, logErrChan, changedFolderErrChan, processedFolderErrChan) {
return err
if err != nil {
log.Error(ctx, "Scanner: Error scanning libraries", "duration", time.Since(startTime), err)
} else {
log.Info(ctx, "Scanner: Finished scanning all libraries", "duration", time.Since(startTime))
}
log.Info(ctx, "Scanner: Finished scanning all libraries", "duration", time.Since(startTime))
return nil
return err
}
func createScanContexts(ctx context.Context, ds model.DataStore, libs []model.Library, fullRescan bool) chan *scanContext {
outputChannel := make(chan *scanContext, len(libs))
func (s *scanner2) runPipeline(producer pipeline.Producer[*folderEntry], stages ...pipeline.Stage[*folderEntry]) error {
if log.CurrentLevel() >= log.LevelDebug {
metrics, err := pipeline.Measure(producer, stages...)
log.Trace(metrics.String())
return err
}
return pipeline.Do(producer, stages...)
}
func (s *scanner2) logFolder(ctx context.Context) func(folder *folderEntry) (out *folderEntry, err error) {
return func(folder *folderEntry) (out *folderEntry, err error) {
log.Debug(ctx, "Scanner: Found folder", "folder", folder.Name(), "_path", folder.path,
"audioCount", len(folder.audioFiles), "imageCount", len(folder.imageFiles), "plsCount", len(folder.playlists))
return folder, nil
}
}
func (s *scanner2) produceFolders(ctx context.Context, libs []model.Library, fullRescan bool) pipeline.ProducerFn[*folderEntry] {
scanCtxChan := make(chan *scanContext, len(libs))
go func() {
defer close(outputChannel)
defer close(scanCtxChan)
for _, lib := range libs {
scanCtx, err := newScannerContext(ctx, ds, lib, fullRescan)
scanCtx, err := newScannerContext(ctx, s.ds, lib, fullRescan)
if err != nil {
log.Error(ctx, "Scanner: Error creating scan context", "lib", lib.Name, err)
continue
}
outputChannel <- scanCtx
scanCtxChan <- scanCtx
}
}()
return outputChannel
}
return func(put func(entry *folderEntry)) error {
outputChan := make(chan *folderEntry)
go func() {
defer close(outputChan)
for scanCtx := range pl.ReadOrDone(ctx, scanCtxChan) {
conf := &fastwalk.Config{Follow: true}
// lib.Path
err := fastwalk.Walk(conf, scanCtx.lib.Path, func(path string, d fs.DirEntry, err error) error {
if err != nil {
log.Warn(ctx, "Scanner: Error walking path", "lib", scanCtx.lib.Name, "path", path, err)
return nil
}
func walkDirEntries(ctx context.Context, libsChan <-chan *scanContext) (chan *folderEntry, chan error) {
outputChannel := make(chan *folderEntry)
errChannel := make(chan error)
go func() {
defer close(outputChannel)
defer close(errChannel)
errChan := pl.Sink(ctx, 1, libsChan, func(ctx context.Context, scanCtx *scanContext) error {
conf := &fastwalk.Config{Follow: true}
// lib.Path
err := fastwalk.Walk(conf, scanCtx.lib.Path, func(path string, d fs.DirEntry, err error) error {
// Skip non-directories
if !d.IsDir() {
return nil
}
// Load all pertinent info from directory
folder, _, err := loadDir(ctx, scanCtx, path, d.(fastwalk.DirEntry))
if err != nil {
log.Warn(ctx, "Scanner: Error loading dir", "lib", scanCtx.lib.Name, "path", path, err)
return nil
}
outputChan <- folder
return nil
})
if err != nil {
log.Warn(ctx, "Scanner: Error walking path", "lib", scanCtx.lib.Name, "path", path, err)
return nil
log.Warn(ctx, "Scanner: Error scanning library", "lib", scanCtx.lib.Name, err)
}
// Skip non-directories
if !d.IsDir() {
return nil
}
// Load all pertinent info from directory
folder, _, err := loadDir(ctx, scanCtx, path, d.(fastwalk.DirEntry))
if err != nil {
log.Warn(ctx, "Scanner: Error loading dir", "lib", scanCtx.lib.Name, "path", path, err)
return nil
}
outputChannel <- folder
return nil
})
if err != nil {
log.Warn(ctx, "Scanner: Error scanning library", "lib", scanCtx.lib.Name, err)
}
return nil
})
// Wait for pipeline to end, and forward any errors
for err := range pl.ReadOrDone(ctx, errChan) {
select {
case errChannel <- err:
default:
}
}()
var total int
for folder := range pl.ReadOrDone(ctx, outputChan) {
total++
put(folder)
}
}()
return outputChannel, errChannel
log.Info(ctx, "Scanner: Finished loading all folders", "numFolders", total)
return nil
}
}
// onlyOutdated returns a filter function that returns true if the folder is outdated (needs to be scanned)
func onlyOutdated(_ context.Context, entry *folderEntry) (bool, error) {
return entry.scanCtx.fullRescan || entry.isExpired(), nil
}
func (s *scanner2) processFolder(ctx context.Context) pipeline.StageFn[*folderEntry] {
return func(entry *folderEntry) (*folderEntry, error) {
// Load children mediafiles from DB
mfs, err := entry.scanCtx.ds.MediaFile(ctx).GetByFolder(entry.id)
if err != nil {
log.Warn(ctx, "Scanner: Error loading mediafiles from DB. Skipping", "folder", entry.path, err)
return entry, nil
}
dbTracks := slice.ToMap(mfs, func(mf model.MediaFile) (string, model.MediaFile) { return mf.Path, mf })
// Get list of files to import, leave dbTracks with tracks to be removed
var filesToImport []string
for afPath, af := range entry.audioFiles {
dbTrack, foundInDB := dbTracks[afPath]
if !foundInDB || entry.scanCtx.fullRescan {
filesToImport = append(filesToImport, afPath)
} else {
info, err := af.Info()
if err != nil {
log.Warn(ctx, "Scanner: Error getting file info", "folder", entry.path, "file", af.Name(), err)
return nil, err
}
if info.ModTime().After(dbTrack.UpdatedAt) {
filesToImport = append(filesToImport, afPath)
}
}
delete(dbTracks, afPath)
}
//tracksToRemove := dbTracks // Just to name things properly
// Load tags from files to import
// Add new/updated files to DB
// Remove deleted mediafiles from DB
// Update folder info in DB
func processFolder(ctx context.Context, entry *folderEntry) (*folderEntry, error) {
// Load children mediafiles from DB
mfs, err := entry.scanCtx.ds.MediaFile(ctx).GetByFolder(entry.id)
if err != nil {
log.Warn(ctx, "Scanner: Error loading mediafiles from DB. Skipping", "folder", entry.path, err)
return entry, nil
}
dbTracks := slice.ToMap(mfs, func(mf model.MediaFile) (string, model.MediaFile) { return mf.Path, mf })
// Get list of files to import, leave dbTracks with tracks to be removed
var filesToImport []string
for afPath, af := range entry.audioFiles {
dbTrack, foundInDB := dbTracks[afPath]
if !foundInDB || entry.scanCtx.fullRescan {
filesToImport = append(filesToImport, afPath)
} else {
info, err := af.Info()
if err != nil {
log.Warn(ctx, "Scanner: Error getting file info", "folder", entry.path, "file", af.Name(), err)
return nil, err
}
if info.ModTime().After(dbTrack.UpdatedAt) {
filesToImport = append(filesToImport, afPath)
}
}
delete(dbTracks, afPath)
}
//tracksToRemove := dbTracks // Just to name things properly
// Load tags from files to import
// Add new/updated files to DB
// Remove deleted mediafiles from DB
// Update folder info in DB
return entry, nil
}
func (s *scanner2) Status(context.Context) (*scanner.StatusInfo, error) {