WIP: Add folders to DB
This commit is contained in:
parent
336ea2ebac
commit
8614a20f7e
|
@ -17,9 +17,10 @@ func upAddLibraryTable(ctx context.Context, tx *sql.Tx) error {
|
|||
_, err := tx.ExecContext(ctx, `
|
||||
create table library (
|
||||
id integer primary key autoincrement,
|
||||
name text not null unique,
|
||||
path text not null unique,
|
||||
remote_path text null default '',
|
||||
name varchar not null unique,
|
||||
path varchar not null unique,
|
||||
remote_path varchar null default '',
|
||||
extractor varchar null default 'taglib',
|
||||
last_scan_at datetime not null default '0000-00-00 00:00:00',
|
||||
updated_at datetime not null default current_timestamp,
|
||||
created_at datetime not null default current_timestamp
|
||||
|
@ -29,9 +30,9 @@ func upAddLibraryTable(ctx context.Context, tx *sql.Tx) error {
|
|||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, fmt.Sprintf(`
|
||||
insert into library(id, name, path, last_scan_at) values(1, 'Music Library', '%s', current_timestamp);
|
||||
insert into library(id, name, path, extractor, last_scan_at) values(1, 'Music Library', '%s', '%s', current_timestamp);
|
||||
delete from property where id like 'LastScan-%%';
|
||||
`, conf.Server.MusicFolder))
|
||||
`, conf.Server.MusicFolder, conf.Server.Scanner.Extractor))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
package migrations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/pressly/goose/v3"
|
||||
)
|
||||
|
||||
func init() {
|
||||
goose.AddMigrationContext(upAddFolderTable, downAddFolderTable)
|
||||
}
|
||||
|
||||
func upAddFolderTable(ctx context.Context, tx *sql.Tx) error {
|
||||
_, err := tx.ExecContext(ctx, `
|
||||
create table if not exists folder(
|
||||
id varchar not null
|
||||
primary key,
|
||||
library_id integer not null
|
||||
references library (id)
|
||||
on delete cascade,
|
||||
path varchar default '' not null,
|
||||
name varchar default '' not null,
|
||||
updated_at timestamp default current_timestamp not null,
|
||||
created_at timestamp default current_timestamp not null,
|
||||
parent_id varchar default null
|
||||
references folder (id)
|
||||
on delete cascade
|
||||
);
|
||||
`)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func downAddFolderTable(ctx context.Context, tx *sql.Tx) error {
|
||||
// This code is executed when the migration is rolled back.
|
||||
return nil
|
||||
}
|
|
@ -21,6 +21,7 @@ type ResourceRepository interface {
|
|||
|
||||
type DataStore interface {
|
||||
Library(ctx context.Context) LibraryRepository
|
||||
Folder(ctx context.Context) FolderRepository
|
||||
Album(ctx context.Context) AlbumRepository
|
||||
Artist(ctx context.Context) ArtistRepository
|
||||
MediaFile(ctx context.Context) MediaFileRepository
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
package model
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Folder struct {
|
||||
ID string
|
||||
LibraryID int
|
||||
Path string
|
||||
Name string
|
||||
ParentID string
|
||||
UpdateAt time.Time
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
func FolderID(lib Library, path string) string {
|
||||
path = strings.TrimPrefix(path, lib.Path)
|
||||
key := fmt.Sprintf("%d:%s", lib.ID, path)
|
||||
return fmt.Sprintf("%x", md5.Sum([]byte(key)))
|
||||
}
|
||||
func NewFolder(lib Library, path string) *Folder {
|
||||
id := FolderID(lib, path)
|
||||
dir, name := filepath.Split(path)
|
||||
return &Folder{
|
||||
ID: id,
|
||||
Path: dir,
|
||||
Name: name,
|
||||
}
|
||||
}
|
||||
|
||||
type FolderRepository interface {
|
||||
Get(lib Library, path string) (*Folder, error)
|
||||
GetAll(lib Library) ([]Folder, error)
|
||||
GetLastUpdates(lib Library) (map[string]time.Time, error)
|
||||
Put(lib Library, path string) error
|
||||
Touch(lib Library, path string, t time.Time) error
|
||||
}
|
|
@ -11,6 +11,7 @@ type Library struct {
|
|||
Name string
|
||||
Path string
|
||||
RemotePath string
|
||||
Extractor string
|
||||
LastScanAt time.Time
|
||||
UpdatedAt time.Time
|
||||
CreatedAt time.Time
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
package persistence
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
. "github.com/Masterminds/squirrel"
|
||||
"github.com/navidrome/navidrome/model"
|
||||
"github.com/pocketbase/dbx"
|
||||
)
|
||||
|
||||
type folderRepository struct {
|
||||
sqlRepository
|
||||
}
|
||||
|
||||
func newFolderRepository(ctx context.Context, db dbx.Builder) model.FolderRepository {
|
||||
r := &folderRepository{}
|
||||
r.ctx = ctx
|
||||
r.db = db
|
||||
r.tableName = "folder"
|
||||
return r
|
||||
}
|
||||
|
||||
func (r folderRepository) Get(lib model.Library, path string) (*model.Folder, error) {
|
||||
id := model.NewFolder(lib, path).ID
|
||||
sq := r.newSelect().Where(Eq{"id": id})
|
||||
var res model.Folder
|
||||
err := r.queryOne(sq, res)
|
||||
return &res, err
|
||||
}
|
||||
|
||||
func (r folderRepository) GetAll(lib model.Library) ([]model.Folder, error) {
|
||||
sq := r.newSelect().Columns("*").Where(Eq{"library_id": lib.ID})
|
||||
var res []model.Folder
|
||||
err := r.queryAll(sq, &res)
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (r folderRepository) GetLastUpdates(lib model.Library) (map[string]time.Time, error) {
|
||||
sq := r.newSelect().Columns("id", "updated_at").Where(Eq{"library_id": lib.ID})
|
||||
var res []struct {
|
||||
ID string
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
err := r.queryAll(sq, &res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := make(map[string]time.Time, len(res))
|
||||
for _, f := range res {
|
||||
m[f.ID] = f.UpdatedAt
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (r folderRepository) Put(lib model.Library, path string) error {
|
||||
folder := model.NewFolder(lib, path)
|
||||
_, err := r.put(folder.ID, folder)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r folderRepository) Touch(lib model.Library, path string, t time.Time) error {
|
||||
id := model.FolderID(lib, path)
|
||||
sq := Update(r.tableName).Set("updated_at", t).Where(Eq{"id": id})
|
||||
_, err := r.executeSQL(sq)
|
||||
return err
|
||||
}
|
||||
|
||||
var _ model.FolderRepository = (*folderRepository)(nil)
|
|
@ -52,7 +52,7 @@ const hardCodedMusicFolderID = 1
|
|||
|
||||
func (r *libraryRepository) StoreMusicFolder() error {
|
||||
sq := Update(r.tableName).Set("path", conf.Server.MusicFolder).Set("updated_at", time.Now()).
|
||||
Where(Eq{"id": hardCodedMusicFolderID})
|
||||
Set("extractor", conf.Server.Scanner.Extractor).Where(Eq{"id": hardCodedMusicFolderID})
|
||||
_, err := r.executeSQL(sq)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -35,6 +35,10 @@ func (s *SQLStore) Library(ctx context.Context) model.LibraryRepository {
|
|||
return NewLibraryRepository(ctx, s.getDBXBuilder())
|
||||
}
|
||||
|
||||
func (s *SQLStore) Folder(ctx context.Context) model.FolderRepository {
|
||||
return newFolderRepository(ctx, s.getDBXBuilder())
|
||||
}
|
||||
|
||||
func (s *SQLStore) Genre(ctx context.Context) model.GenreRepository {
|
||||
return NewGenreRepository(ctx, s.getDBXBuilder())
|
||||
}
|
||||
|
|
|
@ -11,21 +11,30 @@ import (
|
|||
"github.com/charlievieth/fastwalk"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/model"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
type folderEntry struct {
|
||||
fastwalk.DirEntry
|
||||
path string
|
||||
scanCtx *scanContext
|
||||
modTime time.Time
|
||||
images []string
|
||||
path string // Full path
|
||||
id string // DB ID
|
||||
updTime time.Time // From DB
|
||||
modTime time.Time // From FS
|
||||
audioFiles []fs.DirEntry
|
||||
imageFiles []fs.DirEntry
|
||||
playlists []fs.DirEntry
|
||||
imagesUpdatedAt time.Time
|
||||
hasPlaylists bool
|
||||
audioFilesCount uint32
|
||||
}
|
||||
|
||||
func (f *folderEntry) isExpired() bool {
|
||||
return f.updTime.Before(f.modTime)
|
||||
}
|
||||
|
||||
func loadDir(ctx context.Context, scanCtx *scanContext, dirPath string, d fastwalk.DirEntry) (folder *folderEntry, children []string, err error) {
|
||||
folder = &folderEntry{DirEntry: d, scanCtx: scanCtx, path: dirPath}
|
||||
folder.id = model.FolderID(scanCtx.lib, dirPath)
|
||||
folder.updTime = scanCtx.getLastUpdatedInDB(folder.id)
|
||||
|
||||
dirInfo, err := d.Stat()
|
||||
if err != nil {
|
||||
|
@ -61,17 +70,20 @@ func loadDir(ctx context.Context, scanCtx *scanContext, dirPath string, d fastwa
|
|||
}
|
||||
switch {
|
||||
case model.IsAudioFile(entry.Name()):
|
||||
folder.audioFilesCount++
|
||||
folder.audioFiles = append(folder.audioFiles, entry)
|
||||
case model.IsValidPlaylist(entry.Name()):
|
||||
folder.hasPlaylists = true
|
||||
folder.playlists = append(folder.playlists, entry)
|
||||
case model.IsImageFile(entry.Name()):
|
||||
folder.images = append(folder.images, entry.Name())
|
||||
folder.imageFiles = append(folder.imageFiles, entry)
|
||||
if fileInfo.ModTime().After(folder.imagesUpdatedAt) {
|
||||
folder.imagesUpdatedAt = fileInfo.ModTime()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
slices.SortFunc(folder.audioFiles, func(i, j fs.DirEntry) bool { return i.Name() < j.Name() })
|
||||
slices.SortFunc(folder.imageFiles, func(i, j fs.DirEntry) bool { return i.Name() < j.Name() })
|
||||
slices.SortFunc(folder.playlists, func(i, j fs.DirEntry) bool { return i.Name() < j.Name() })
|
||||
return folder, children, nil
|
||||
}
|
||||
|
||||
|
@ -79,8 +91,8 @@ func loadDir(ctx context.Context, scanCtx *scanContext, dirPath string, d fastwa
|
|||
// It also detects when it is "stuck" with an error in the same directory over and over.
|
||||
// In this case, it stops and returns whatever it was able to read until it got stuck.
|
||||
// See discussion here: https://github.com/navidrome/navidrome/issues/1164#issuecomment-881922850
|
||||
func fullReadDir(ctx context.Context, dir fs.ReadDirFile) []os.DirEntry {
|
||||
var allEntries []os.DirEntry
|
||||
func fullReadDir(ctx context.Context, dir fs.ReadDirFile) []fs.DirEntry {
|
||||
var allEntries []fs.DirEntry
|
||||
var prevErrStr = ""
|
||||
for {
|
||||
entries, err := dir.ReadDir(-1)
|
||||
|
@ -121,7 +133,7 @@ func isDirOrSymlinkToDir(baseDir string, dirEnt fs.DirEntry) (bool, error) {
|
|||
}
|
||||
|
||||
// isDirReadable returns true if the directory represented by dirEnt is readable
|
||||
func isDirReadable(ctx context.Context, baseDir string, dirEnt os.DirEntry) bool {
|
||||
func isDirReadable(ctx context.Context, baseDir string, dirEnt fs.DirEntry) bool {
|
||||
path := filepath.Join(baseDir, dirEnt.Name())
|
||||
|
||||
dir, err := os.Open(path)
|
||||
|
|
|
@ -1,19 +1,42 @@
|
|||
package scanner2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/model"
|
||||
)
|
||||
|
||||
type scanContext struct {
|
||||
lib model.Library
|
||||
startTime time.Time
|
||||
lib model.Library
|
||||
ds model.DataStore
|
||||
startTime time.Time
|
||||
lastUpdates map[string]time.Time
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
func newScannerContext(lib model.Library) *scanContext {
|
||||
return &scanContext{
|
||||
lib: lib,
|
||||
startTime: time.Now(),
|
||||
func newScannerContext(ctx context.Context, ds model.DataStore, lib model.Library) (*scanContext, error) {
|
||||
lastUpdates, err := ds.Folder(ctx).GetLastUpdates(lib)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting last updates: %w", err)
|
||||
}
|
||||
return &scanContext{
|
||||
lib: lib,
|
||||
ds: ds,
|
||||
startTime: time.Now(),
|
||||
lastUpdates: lastUpdates,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *scanContext) getLastUpdatedInDB(id string) time.Time {
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
t, ok := s.lastUpdates[id]
|
||||
if !ok {
|
||||
return time.Time{}
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
|
|
@ -32,28 +32,46 @@ func (s *scanner2) RescanAll(requestCtx context.Context, fullRescan bool) error
|
|||
|
||||
startTime := time.Now()
|
||||
log.Info(ctx, "Scanner: Starting scan", "fullRescan", fullRescan, "numLibraries", len(libs))
|
||||
scanCtxChan := createScanContexts(ctx, libs)
|
||||
|
||||
scanCtxChan := createScanContexts(ctx, s.ds, libs)
|
||||
folderChan, folderErrChan := walkDirEntries(ctx, scanCtxChan)
|
||||
logErrChan := pl.Sink(ctx, 4, folderChan, func(ctx context.Context, folder *folderEntry) error {
|
||||
log.Debug(ctx, "Scanner: Found folder", "folder", folder.Name(), "_path", folder.path, "audioCount", folder.audioFilesCount, "images", folder.images, "hasPlaylist", folder.hasPlaylists)
|
||||
changedFolderChan, changedFolderErrChan := pl.Filter(ctx, 4, folderChan, onlyOutdated(fullRescan))
|
||||
|
||||
// TODO Next: load tags from all files that are newer than or not in DB
|
||||
|
||||
logErrChan := pl.Sink(ctx, 4, changedFolderChan, func(ctx context.Context, folder *folderEntry) error {
|
||||
log.Debug(ctx, "Scanner: Found folder", "folder", folder.Name(), "_path", folder.path,
|
||||
"audioCount", len(folder.audioFiles), "imageCount", len(folder.imageFiles), "plsCount", len(folder.playlists))
|
||||
return nil
|
||||
})
|
||||
|
||||
// Wait for pipeline to end, return first error found
|
||||
for err := range pl.Merge(ctx, folderErrChan, logErrChan) {
|
||||
for err := range pl.Merge(ctx, folderErrChan, logErrChan, changedFolderErrChan) {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info(ctx, "Scanner: Scan finished", "duration", time.Since(startTime))
|
||||
log.Info(ctx, "Scanner: Finished scanning all libraries", "duration", time.Since(startTime))
|
||||
return nil
|
||||
}
|
||||
|
||||
func createScanContexts(ctx context.Context, libs []model.Library) chan *scanContext {
|
||||
// onlyOutdated returns a filter function that returns true if the folder is outdated (needs to be scanned)
|
||||
func onlyOutdated(fullScan bool) func(ctx context.Context, entry *folderEntry) (bool, error) {
|
||||
return func(ctx context.Context, entry *folderEntry) (bool, error) {
|
||||
return fullScan || entry.isExpired(), nil
|
||||
}
|
||||
}
|
||||
|
||||
func createScanContexts(ctx context.Context, ds model.DataStore, libs []model.Library) chan *scanContext {
|
||||
outputChannel := make(chan *scanContext, len(libs))
|
||||
go func() {
|
||||
defer close(outputChannel)
|
||||
for _, lib := range libs {
|
||||
outputChannel <- newScannerContext(lib)
|
||||
scanCtx, err := newScannerContext(ctx, ds, lib)
|
||||
if err != nil {
|
||||
log.Error(ctx, "Scanner: Error creating scan context", "lib", lib.Name, err)
|
||||
continue
|
||||
}
|
||||
outputChannel <- scanCtx
|
||||
}
|
||||
}()
|
||||
return outputChannel
|
||||
|
|
|
@ -47,6 +47,10 @@ func (db *MockDataStore) Library(context.Context) model.LibraryRepository {
|
|||
return struct{ model.LibraryRepository }{}
|
||||
}
|
||||
|
||||
func (db *MockDataStore) Folder(context.Context) model.FolderRepository {
|
||||
return struct{ model.FolderRepository }{}
|
||||
}
|
||||
|
||||
func (db *MockDataStore) Genre(context.Context) model.GenreRepository {
|
||||
if db.MockedGenre == nil {
|
||||
db.MockedGenre = &MockedGenreRepo{}
|
||||
|
|
|
@ -174,3 +174,32 @@ func FromSlice[T any](ctx context.Context, in []T) <-chan T {
|
|||
close(output)
|
||||
return output
|
||||
}
|
||||
|
||||
func Filter[T any](ctx context.Context, maxWorkers int, inputChan chan T, f func(context.Context, T) (bool, error)) (chan T, chan error) {
|
||||
outputChan := make(chan T)
|
||||
errorChan := make(chan error)
|
||||
go func() {
|
||||
defer close(outputChan)
|
||||
defer close(errorChan)
|
||||
|
||||
errChan := Sink(ctx, maxWorkers, inputChan, func(ctx context.Context, item T) error {
|
||||
ok, err := f(ctx, item)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ok {
|
||||
outputChan <- item
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Wait for pipeline to end, and forward any errors
|
||||
for err := range ReadOrDone(ctx, errChan) {
|
||||
select {
|
||||
case errorChan <- err:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
return outputChan, errorChan
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue