Replace fastwalk with custom walk_dir_tree

This commit is contained in:
Deluan 2023-12-25 19:11:31 -05:00
parent 9596061464
commit 0e8042d344
9 changed files with 275 additions and 233 deletions

1
go.mod
View File

@ -6,7 +6,6 @@ require (
github.com/Masterminds/squirrel v1.5.4
github.com/RaveNoX/go-jsoncommentstrip v1.0.0
github.com/bradleyjkemp/cupaloy/v2 v2.8.0
github.com/charlievieth/fastwalk v1.0.1
github.com/deluan/rest v0.0.0-20211102003136-6260bc399cbf
github.com/deluan/sanitize v0.0.0-20230310221930-6e18967d9fc1
github.com/dexterlb/mpvipc v0.0.0-20230829142118-145d6eabdc37

4
go.sum
View File

@ -12,8 +12,6 @@ github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oM
github.com/bradleyjkemp/cupaloy/v2 v2.8.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/charlievieth/fastwalk v1.0.1 h1:jW01w8OCFdKS9JvAcnI+JHhWU/FuIEmNb24Ri9p7OVg=
github.com/charlievieth/fastwalk v1.0.1/go.mod h1:dryXgMJyGHbMrAmmnF0/EJNBbZaihlwcNud5IuGyogU=
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -97,8 +95,6 @@ github.com/jellydator/ttlcache/v2 v2.11.1 h1:AZGME43Eh2Vv3giG6GeqeLeFXxwxn1/qHIt
github.com/jellydator/ttlcache/v2 v2.11.1/go.mod h1:RtE5Snf0/57e+2cLWFYWCCsLas2Hy3c5Z4n14XmSvTI=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/karrick/godirwalk v1.17.0 h1:b4kY7nqDdioR/6qnbHQyDvmA17u5G1cZ6J+CZXwSWoI=
github.com/karrick/godirwalk v1.17.0/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=

View File

@ -274,6 +274,8 @@ func addFields(logger *logrus.Entry, keyValuePairs []interface{}) *logrus.Entry
} else {
logger = logger.WithField(name, v.String())
}
case []string:
logger = logger.WithField(name, strings.Join(v, ","))
default:
logger = logger.WithField(name, v)
}

View File

@ -102,7 +102,7 @@ func AddValues(ctx, requestCtx context.Context) context.Context {
ClientUniqueId,
}
for _, key := range keys {
if v, ok := requestCtx.Value(key).(string); ok {
if v := requestCtx.Value(key); v != nil {
ctx = context.WithValue(ctx, key, v)
}
}

View File

@ -1,20 +1,11 @@
package scanner2
import (
"context"
"io/fs"
"os"
"path/filepath"
"sort"
"time"
"github.com/charlievieth/fastwalk"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
)
type folderEntry struct {
fastwalk.DirEntry
scanCtx *scanContext
path string // Full path
id string // DB ID
@ -29,122 +20,3 @@ type folderEntry struct {
// isExpired reports whether the folder's updTime is earlier than its modTime.
func (f *folderEntry) isExpired() bool {
	lastUpdate, lastChange := f.updTime, f.modTime
	return lastUpdate.Before(lastChange)
}
// loadDir builds the folderEntry for dirPath and lists its sub-directories.
//
// The folder's modTime starts as the modification time reported by d.Stat()
// and is raised to the newest modification time of every entry that is not
// accepted as a child directory. Those entries are classified by name into
// audio files, playlists and image files; the audio and image maps are keyed
// by the entry's full path. Entries that are directories (or symlinks to
// directories) and can be opened are returned in children as full paths.
//
// It returns (nil, nil, err) when the directory cannot be stat'ed. When the
// directory cannot be opened, or an entry's Info() fails, the partially
// filled folder and the children collected so far are returned with the error.
func loadDir(ctx context.Context, scanCtx *scanContext, dirPath string, d fastwalk.DirEntry) (folder *folderEntry, children []string, err error) {
	folder = &folderEntry{DirEntry: d, scanCtx: scanCtx, path: dirPath}
	folder.id = model.FolderID(scanCtx.lib, dirPath)
	folder.updTime = scanCtx.getLastUpdatedInDB(folder.id)
	folder.audioFiles = make(map[string]fs.DirEntry)
	folder.imageFiles = make(map[string]fs.DirEntry)

	stat, statErr := d.Stat()
	if statErr != nil {
		log.Error(ctx, "Error stating dir", "path", dirPath, statErr)
		return nil, nil, statErr
	}
	folder.modTime = stat.ModTime()

	dir, openErr := os.Open(dirPath)
	if openErr != nil {
		log.Error(ctx, "Error in Opening directory", "path", dirPath, openErr)
		return folder, children, openErr
	}
	defer dir.Close()

	for _, entry := range fullReadDir(ctx, dir) {
		name := entry.Name()
		entryPath := filepath.Join(dirPath, name)

		isDir, linkErr := isDirOrSymlinkToDir(dirPath, entry)
		if linkErr != nil {
			// Broken symlinks are skipped, not treated as fatal.
			log.Error(ctx, "Invalid symlink", "dir", entryPath, linkErr)
			continue
		}
		if isDir && isDirReadable(ctx, dirPath, entry) {
			children = append(children, entryPath)
			continue
		}

		info, infoErr := entry.Info()
		if infoErr != nil {
			log.Error(ctx, "Error getting fileInfo", "name", name, infoErr)
			return folder, children, infoErr
		}
		modified := info.ModTime()
		if modified.After(folder.modTime) {
			folder.modTime = modified
		}

		switch {
		case model.IsAudioFile(name):
			folder.audioFiles[entryPath] = entry
		case model.IsValidPlaylist(name):
			folder.playlists = append(folder.playlists, entry)
		case model.IsImageFile(name):
			folder.imageFiles[entryPath] = entry
			if modified.After(folder.imagesUpdatedAt) {
				folder.imagesUpdatedAt = modified
			}
		}
	}
	return folder, children, nil
}
// fullReadDir returns every entry it can read from dir, sorted by name.
// When ReadDir reports an error, the entries returned alongside it are kept
// and the read is retried. If the same error message comes back twice in a
// row the directory is considered "stuck" and whatever was collected so far
// is returned.
// See discussion here: https://github.com/navidrome/navidrome/issues/1164#issuecomment-881922850
func fullReadDir(ctx context.Context, dir fs.ReadDirFile) []fs.DirEntry {
	var (
		collected   []fs.DirEntry
		lastFailure string
	)
	for {
		batch, err := dir.ReadDir(-1)
		collected = append(collected, batch...)
		if err == nil {
			break
		}
		log.Warn(ctx, "Skipping DirEntry", err)
		msg := err.Error()
		if msg == lastFailure {
			log.Error(ctx, "Duplicate DirEntry failure, bailing", err)
			break
		}
		lastFailure = msg
	}
	sort.Slice(collected, func(a, b int) bool {
		return collected[a].Name() < collected[b].Name()
	})
	return collected
}
// isDirOrSymlinkToDir reports whether dirEnt, an entry of baseDir, is a
// directory or a symbolic link that resolves to a directory. Plain
// directories and non-symlinks are answered from the entry itself; only
// symlinks cost an os.Stat call, whose error (e.g. a dangling link) is
// returned to the caller.
// Originally copied from github.com/karrick/godirwalk and adapted to
// fs.DirEntry (Go 1.16+).
func isDirOrSymlinkToDir(baseDir string, dirEnt fs.DirEntry) (bool, error) {
	switch {
	case dirEnt.IsDir():
		return true, nil
	case dirEnt.Type()&os.ModeSymlink == 0:
		return false, nil
	}
	// It is a symlink: follow it to see what it points to.
	target, err := os.Stat(filepath.Join(baseDir, dirEnt.Name()))
	if err != nil {
		return false, err
	}
	return target.IsDir(), nil
}
// isDirReadable reports whether the directory represented by dirEnt (an
// entry of baseDir) can be opened. A failure to open is logged as a warning
// and yields false; a failure to close the probe handle is only logged.
func isDirReadable(ctx context.Context, baseDir string, dirEnt fs.DirEntry) bool {
	path := filepath.Join(baseDir, dirEnt.Name())
	dir, err := os.Open(path)
	if err != nil {
		// ctx goes first, as in every other log call in this file, so the
		// message is not logged without its context.
		log.Warn(ctx, "Skipping unreadable directory", "path", path, err)
		return false
	}
	if err := dir.Close(); err != nil {
		log.Warn(ctx, "Error closing directory", "path", path, err)
	}
	return true
}

View File

@ -0,0 +1,49 @@
package scanner2
import (
"context"
"github.com/google/go-pipeline/pkg/pipeline"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/utils/slice"
)
// processFolder returns the stage function that compares the audio files
// found on disk for a folder with the media files stored for it in the DB.
//
// A file is selected for import when it is not in the DB, when a full rescan
// was requested, or when its modification time is after the DB track's
// UpdatedAt. DB tracks that still exist on disk are removed from the lookup
// map, so what remains are the tracks to delete. The import/removal steps
// themselves are not implemented yet (see the placeholders below).
//
// A DB load failure is logged and the folder is passed through untouched; a
// failure to read a file's info aborts the stage with that error.
func (s *scanner2) processFolder(ctx context.Context) pipeline.StageFn[*folderEntry] {
	byPath := func(mf model.MediaFile) (string, model.MediaFile) { return mf.Path, mf }

	return func(entry *folderEntry) (*folderEntry, error) {
		// Load children mediafiles from DB
		mfs, err := entry.scanCtx.ds.MediaFile(ctx).GetByFolder(entry.id)
		if err != nil {
			log.Warn(ctx, "Scanner: Error loading mediafiles from DB. Skipping", "folder", entry.path, err)
			return entry, nil
		}
		dbTracks := slice.ToMap(mfs, byPath)

		// Get list of files to import, leave dbTracks with tracks to be removed
		var filesToImport []string
		for filePath, dirEntry := range entry.audioFiles {
			known, inDB := dbTracks[filePath]
			needsImport := !inDB || entry.scanCtx.fullRescan
			if !needsImport {
				info, err := dirEntry.Info()
				if err != nil {
					log.Warn(ctx, "Scanner: Error getting file info", "folder", entry.path, "file", dirEntry.Name(), err)
					return nil, err
				}
				needsImport = info.ModTime().After(known.UpdatedAt)
			}
			if needsImport {
				filesToImport = append(filesToImport, filePath)
			}
			delete(dbTracks, filePath)
		}

		//tracksToRemove := dbTracks // Just to name things properly

		// Load tags from files to import
		// Add new/updated files to DB
		// Remove deleted mediafiles from DB
		// Update folder info in DB

		return entry, nil
	}
}

220
scanner2/produce_folders.go Normal file
View File

@ -0,0 +1,220 @@
package scanner2
import (
"context"
"io/fs"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"github.com/google/go-pipeline/pkg/pipeline"
"github.com/navidrome/navidrome/consts"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/utils/pl"
"golang.org/x/exp/maps"
)
// produceFolders returns the producer that emits every folder of the given
// libraries.
//
// A scanContext is created for each library in a background goroutine;
// libraries whose context cannot be created are logged and skipped. The
// returned producer walks each library's tree in turn, hands every
// folderEntry to put, and finally logs the total number of folders counted
// across all libraries. The producer itself always returns nil.
func (s *scanner2) produceFolders(ctx context.Context, libs []model.Library, fullRescan bool) pipeline.ProducerFn[*folderEntry] {
	// Buffered to len(libs) so the goroutine below can never block on send.
	scanCtxChan := make(chan *scanContext, len(libs))
	go func() {
		defer close(scanCtxChan)
		for _, lib := range libs {
			scanCtx, err := newScannerContext(ctx, s.ds, lib, fullRescan)
			if err != nil {
				log.Error(ctx, "Scanner: Error creating scan context", "lib", lib.Name, err)
				continue
			}
			scanCtxChan <- scanCtx
		}
	}()

	return func(put func(entry *folderEntry)) error {
		var total int64
		for scanCtx := range pl.ReadOrDone(ctx, scanCtxChan) {
			outputChan, err := walkDirTree(ctx, scanCtx)
			if err != nil {
				log.Warn(ctx, "Scanner: Error scanning library", "lib", scanCtx.lib.Name, err)
				// No usable channel came back; do not try to drain it.
				continue
			}
			for folder := range pl.ReadOrDone(ctx, outputChan) {
				put(folder)
			}
			total += scanCtx.numFolders.Load()
		}
		log.Info(ctx, "Scanner: Finished loading all folders", "numFolders", total)
		return nil
	}
}
// walkDirTree starts a goroutine that walks the library rooted at
// scanCtx.lib.Path and sends each folder found on the returned channel,
// which is closed once the walk ends (successfully or not). Walk errors are
// logged inside the goroutine; the error result is always nil.
func walkDirTree(ctx context.Context, scanCtx *scanContext) (<-chan *folderEntry, error) {
	results := make(chan *folderEntry)

	walk := func() {
		defer close(results)
		rootFolder := scanCtx.lib.Path
		if err := walkFolder(ctx, scanCtx, rootFolder, results); err != nil {
			log.Error(ctx, "Scanner: There were errors reading directories from filesystem", "path", rootFolder, err)
			return
		}
		log.Debug(ctx, "Scanner: Finished reading folders", "lib", scanCtx.lib.Name, "path", rootFolder, "numFolders", scanCtx.numFolders.Load())
	}
	go walk()

	return results, nil
}
// walkFolder loads currentFolder, recurses into each of its child
// directories, and then sends the folder itself on results, so children are
// always emitted before their parent. Each successfully loaded folder
// increments scanCtx.numFolders.
//
// It returns the first error from loadDir or from a child walk. If ctx is
// cancelled it stops descending and returns ctx.Err() instead of blocking
// forever on a results channel that is no longer being read.
func walkFolder(ctx context.Context, scanCtx *scanContext, currentFolder string, results chan<- *folderEntry) error {
	// Avoid touching the filesystem once the scan has been cancelled.
	if err := ctx.Err(); err != nil {
		return err
	}

	folder, children, err := loadDir(ctx, scanCtx, currentFolder)
	if err != nil {
		return err
	}
	scanCtx.numFolders.Add(1)

	for _, c := range children {
		if err := walkFolder(ctx, scanCtx, c, results); err != nil {
			return err
		}
	}

	dir := filepath.Clean(currentFolder)
	log.Trace(ctx, "Scanner: Found directory", "_path", dir, "audioFiles", maps.Keys(folder.audioFiles),
		"images", maps.Keys(folder.imageFiles), "playlists", folder.playlists, "imagesUpdatedAt", folder.imagesUpdatedAt,
		"updTime", folder.updTime, "modTime", folder.modTime, "numChildren", len(children))
	folder.path = dir

	// results is unbuffered: give up if the consumer stopped because ctx
	// was cancelled, otherwise this goroutine would leak.
	select {
	case results <- folder:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// loadDir builds the folderEntry for dirPath and lists its sub-directories.
//
// The folder's modTime starts as the directory's own modification time
// (os.Stat) and is raised to the newest modification time of every entry
// that is not accepted as a child directory. Those entries are classified by
// name into audio files, playlists and image files; the audio and image maps
// are keyed by the entry's base name. Entries that are directories (or
// symlinks to directories), are not ignored, and can be opened are returned
// in children as full paths.
//
// It returns (nil, nil, err) when the directory cannot be stat'ed. When the
// directory cannot be opened, or an entry's Info() fails, the partially
// filled folder and the children collected so far are returned with the error.
//
// NOTE(review): a directory rejected by isDirIgnored/isDirReadable falls
// through to the file branch (its mod time and name are still considered) -
// confirm this is intended.
func loadDir(ctx context.Context, scanCtx *scanContext, dirPath string) (folder *folderEntry, children []string, err error) {
	folder = &folderEntry{scanCtx: scanCtx, path: dirPath}
	folder.id = model.FolderID(scanCtx.lib, dirPath)
	folder.updTime = scanCtx.getLastUpdatedInDB(folder.id)
	folder.audioFiles = make(map[string]fs.DirEntry)
	folder.imageFiles = make(map[string]fs.DirEntry)

	stat, statErr := os.Stat(dirPath)
	if statErr != nil {
		log.Error(ctx, "Scanner: Error stating dir", "path", dirPath, statErr)
		return nil, nil, statErr
	}
	folder.modTime = stat.ModTime()

	dir, openErr := os.Open(dirPath)
	if openErr != nil {
		log.Error(ctx, "Scanner: Error in Opening directory", "path", dirPath, openErr)
		return folder, children, openErr
	}
	defer dir.Close()

	for _, entry := range fullReadDir(ctx, dir) {
		name := entry.Name()

		isDir, linkErr := isDirOrSymlinkToDir(dirPath, entry)
		if linkErr != nil {
			// Broken symlinks are skipped, not treated as fatal.
			log.Error(ctx, "Scanner: Invalid symlink", "dir", filepath.Join(dirPath, name), linkErr)
			continue
		}
		if isDir && !isDirIgnored(dirPath, entry) && isDirReadable(ctx, dirPath, entry) {
			children = append(children, filepath.Join(dirPath, name))
			continue
		}

		info, infoErr := entry.Info()
		if infoErr != nil {
			log.Error(ctx, "Scanner: Error getting fileInfo", "name", name, infoErr)
			return folder, children, infoErr
		}
		modified := info.ModTime()
		if modified.After(folder.modTime) {
			folder.modTime = modified
		}

		switch {
		case model.IsAudioFile(name):
			folder.audioFiles[name] = entry
		case model.IsValidPlaylist(name):
			folder.playlists = append(folder.playlists, entry)
		case model.IsImageFile(name):
			folder.imageFiles[name] = entry
			if modified.After(folder.imagesUpdatedAt) {
				folder.imagesUpdatedAt = modified
			}
		}
	}
	return folder, children, nil
}
// fullReadDir returns every entry it can read from dir, sorted by name.
// When ReadDir reports an error, the entries returned alongside it are kept
// and the read is retried. If the same error message comes back twice in a
// row the directory is considered "stuck" and whatever was collected so far
// is returned.
// See discussion here: https://github.com/navidrome/navidrome/issues/1164#issuecomment-881922850
func fullReadDir(ctx context.Context, dir fs.ReadDirFile) []fs.DirEntry {
	var (
		collected   []fs.DirEntry
		lastFailure string
	)
	for {
		batch, err := dir.ReadDir(-1)
		collected = append(collected, batch...)
		if err == nil {
			break
		}
		log.Warn(ctx, "Skipping DirEntry", err)
		msg := err.Error()
		if msg == lastFailure {
			log.Error(ctx, "Scanner: Duplicate DirEntry failure, bailing", err)
			break
		}
		lastFailure = msg
	}
	sort.Slice(collected, func(a, b int) bool {
		return collected[a].Name() < collected[b].Name()
	})
	return collected
}
// isDirOrSymlinkToDir reports whether dirEnt, an entry of baseDir, is a
// directory or a symbolic link that resolves to a directory. The entry's own
// type answers the question for directories and for anything that is not a
// symlink; symlinks are resolved with os.Stat, and a resolution failure
// (e.g. a dangling link) is returned as the error.
// Originally copied from github.com/karrick/godirwalk and adapted to
// fs.DirEntry (Go 1.16+).
func isDirOrSymlinkToDir(baseDir string, dirEnt fs.DirEntry) (bool, error) {
	if dirEnt.IsDir() {
		return true, nil
	}
	isSymlink := dirEnt.Type()&os.ModeSymlink != 0
	if !isSymlink {
		return false, nil
	}
	// Follow the link to find out what it points to.
	linkPath := filepath.Join(baseDir, dirEnt.Name())
	info, err := os.Stat(linkPath)
	if err != nil {
		return false, err
	}
	return info.IsDir(), nil
}
// isDirReadable reports whether the directory represented by dirEnt (an
// entry of baseDir) can be opened. A failure to open is logged as a warning
// and yields false; a failure to close the probe handle is only logged.
func isDirReadable(ctx context.Context, baseDir string, dirEnt fs.DirEntry) bool {
	path := filepath.Join(baseDir, dirEnt.Name())
	dir, err := os.Open(path)
	if err != nil {
		// ctx goes first, as in every other log call in this file, so the
		// message is not logged without its context.
		log.Warn(ctx, "Scanner: Skipping unreadable directory", "path", path, err)
		return false
	}
	if err := dir.Close(); err != nil {
		log.Warn(ctx, "Scanner: Error closing directory", "path", path, err)
	}
	return true
}
// isDirIgnored reports whether the directory represented by dirEnt (an entry
// of baseDir) must be left out of the scan. A directory is ignored when:
//   - its name starts with a single dot (names starting with ".." are kept,
//     which allows album folders whose titles begin with an ellipsis);
//   - on Windows, its name is "$RECYCLE.BIN" (case-insensitive);
//   - it contains a file named consts.SkipScanFile, i.e. os.Stat on that
//     path succeeds.
func isDirIgnored(baseDir string, dirEnt fs.DirEntry) bool {
	name := dirEnt.Name()
	switch {
	case strings.HasPrefix(name, ".") && !strings.HasPrefix(name, ".."):
		return true
	case runtime.GOOS == "windows" && strings.EqualFold(name, "$RECYCLE.BIN"):
		return true
	}
	marker := filepath.Join(baseDir, name, consts.SkipScanFile)
	if _, err := os.Stat(marker); err != nil {
		return false
	}
	return true
}

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/navidrome/navidrome/model"
@ -16,6 +17,7 @@ type scanContext struct {
lastUpdates map[string]time.Time
lock sync.RWMutex
fullRescan bool
numFolders atomic.Int64
}
func newScannerContext(ctx context.Context, ds model.DataStore, lib model.Library, fullRescan bool) (*scanContext, error) {

View File

@ -2,17 +2,13 @@ package scanner2
import (
"context"
"io/fs"
"time"
"github.com/charlievieth/fastwalk"
"github.com/google/go-pipeline/pkg/pipeline"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/model/request"
"github.com/navidrome/navidrome/scanner"
"github.com/navidrome/navidrome/utils/pl"
"github.com/navidrome/navidrome/utils/slice"
)
type scanner2 struct {
@ -60,106 +56,12 @@ func (s *scanner2) runPipeline(producer pipeline.Producer[*folderEntry], stages
func (s *scanner2) logFolder(ctx context.Context) func(folder *folderEntry) (out *folderEntry, err error) {
return func(folder *folderEntry) (out *folderEntry, err error) {
log.Debug(ctx, "Scanner: Found folder", "folder", folder.Name(), "_path", folder.path,
log.Debug(ctx, "Scanner: Completed processing folder", "_path", folder.path,
"audioCount", len(folder.audioFiles), "imageCount", len(folder.imageFiles), "plsCount", len(folder.playlists))
return folder, nil
}
}
// produceFolders returns the producer that emits every folder of the given
// libraries. A scanContext is created per library in a background goroutine
// (failures are logged and that library is skipped); the returned producer
// walks each library with fastwalk, loads every directory with loadDir and
// hands the resulting folderEntry to put. The producer always returns nil.
func (s *scanner2) produceFolders(ctx context.Context, libs []model.Library, fullRescan bool) pipeline.ProducerFn[*folderEntry] {
// Buffered to len(libs), so the goroutine below never blocks on send.
scanCtxChan := make(chan *scanContext, len(libs))
go func() {
defer close(scanCtxChan)
for _, lib := range libs {
scanCtx, err := newScannerContext(ctx, s.ds, lib, fullRescan)
if err != nil {
log.Error(ctx, "Scanner: Error creating scan context", "lib", lib.Name, err)
continue
}
scanCtxChan <- scanCtx
}
}()
return func(put func(entry *folderEntry)) error {
// Folders are produced by the walker goroutine and consumed by the loop
// at the bottom of this function.
outputChan := make(chan *folderEntry)
go func() {
defer close(outputChan)
for scanCtx := range pl.ReadOrDone(ctx, scanCtxChan) {
// Follow: true presumably makes fastwalk follow symlinks - confirm against fastwalk docs.
conf := &fastwalk.Config{Follow: true}
// Walk the tree rooted at the library path.
err := fastwalk.Walk(conf, scanCtx.lib.Path, func(path string, d fs.DirEntry, err error) error {
// Walk errors are logged and swallowed (nil is returned to fastwalk).
if err != nil {
log.Warn(ctx, "Scanner: Error walking path", "lib", scanCtx.lib.Name, "path", path, err)
return nil
}
// Skip non-directories
if !d.IsDir() {
return nil
}
// Load all pertinent info from directory
folder, _, err := loadDir(ctx, scanCtx, path, d.(fastwalk.DirEntry))
if err != nil {
log.Warn(ctx, "Scanner: Error loading dir", "lib", scanCtx.lib.Name, "path", path, err)
return nil
}
// NOTE(review): this send does not watch ctx; if the consumer below
// stops reading, the walker would block here - confirm.
outputChan <- folder
return nil
})
if err != nil {
log.Warn(ctx, "Scanner: Error scanning library", "lib", scanCtx.lib.Name, err)
}
}
}()
// Count and forward every folder received from the walker.
var total int
for folder := range pl.ReadOrDone(ctx, outputChan) {
total++
put(folder)
}
log.Info(ctx, "Scanner: Finished loading all folders", "numFolders", total)
return nil
}
}
// processFolder returns the stage function that compares the audio files
// found on disk for a folder with the media files stored for it in the DB.
//
// A file is selected for import when it is not in the DB, when a full rescan
// was requested, or when its modification time is after the DB track's
// UpdatedAt. DB tracks that still exist on disk are removed from the lookup
// map, so what remains are the tracks to delete. The import/removal steps
// themselves are not implemented yet (see the placeholders below).
//
// A DB load failure is logged and the folder is passed through untouched; a
// failure to read a file's info aborts the stage with that error.
func (s *scanner2) processFolder(ctx context.Context) pipeline.StageFn[*folderEntry] {
	byPath := func(mf model.MediaFile) (string, model.MediaFile) { return mf.Path, mf }

	return func(entry *folderEntry) (*folderEntry, error) {
		// Load children mediafiles from DB
		mfs, err := entry.scanCtx.ds.MediaFile(ctx).GetByFolder(entry.id)
		if err != nil {
			log.Warn(ctx, "Scanner: Error loading mediafiles from DB. Skipping", "folder", entry.path, err)
			return entry, nil
		}
		dbTracks := slice.ToMap(mfs, byPath)

		// Get list of files to import, leave dbTracks with tracks to be removed
		var filesToImport []string
		for filePath, dirEntry := range entry.audioFiles {
			known, inDB := dbTracks[filePath]
			needsImport := !inDB || entry.scanCtx.fullRescan
			if !needsImport {
				info, err := dirEntry.Info()
				if err != nil {
					log.Warn(ctx, "Scanner: Error getting file info", "folder", entry.path, "file", dirEntry.Name(), err)
					return nil, err
				}
				needsImport = info.ModTime().After(known.UpdatedAt)
			}
			if needsImport {
				filesToImport = append(filesToImport, filePath)
			}
			delete(dbTracks, filePath)
		}

		//tracksToRemove := dbTracks // Just to name things properly

		// Load tags from files to import
		// Add new/updated files to DB
		// Remove deleted mediafiles from DB
		// Update folder info in DB

		return entry, nil
	}
}
// Status returns an empty scanner.StatusInfo and a nil error; the context
// argument is not used.
func (s *scanner2) Status(context.Context) (*scanner.StatusInfo, error) {
	status := &scanner.StatusInfo{}
	return status, nil
}