Make pipeline stages simple functions, to simplify (future) tests
This commit is contained in:
parent
7a7827e50c
commit
bbfe8727ca
|
@ -17,7 +17,7 @@ const (
|
|||
filesBatchSize = 100
|
||||
)
|
||||
|
||||
func (s *scanner2) processFolder(ctx context.Context) pipeline.StageFn[*folderEntry] {
|
||||
func processFolder(ctx context.Context) pipeline.StageFn[*folderEntry] {
|
||||
return func(entry *folderEntry) (*folderEntry, error) {
|
||||
// Load children mediafiles from DB
|
||||
mfs, err := entry.scanCtx.ds.MediaFile(ctx).GetByFolder(entry.id)
|
||||
|
|
|
@ -17,12 +17,12 @@ import (
|
|||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
func (s *scanner2) produceFolders(ctx context.Context, libs []model.Library, fullRescan bool) pipeline.ProducerFn[*folderEntry] {
|
||||
func produceFolders(ctx context.Context, ds model.DataStore, libs []model.Library, fullRescan bool) pipeline.ProducerFn[*folderEntry] {
|
||||
scanCtxChan := make(chan *scanContext, len(libs))
|
||||
go func() {
|
||||
defer close(scanCtxChan)
|
||||
for _, lib := range libs {
|
||||
scanCtx, err := newScannerContext(ctx, s.ds, lib, fullRescan)
|
||||
scanCtx, err := newScannerContext(ctx, ds, lib, fullRescan)
|
||||
if err != nil {
|
||||
log.Error(ctx, "Scanner: Error creating scan context", "lib", lib.Name, err)
|
||||
continue
|
||||
|
|
|
@ -32,9 +32,9 @@ func (s *scanner2) RescanAll(requestCtx context.Context, fullRescan bool) error
|
|||
log.Info(ctx, "Scanner: Starting scan", "fullRescan", fullRescan, "numLibraries", len(libs))
|
||||
|
||||
err = s.runPipeline(
|
||||
pipeline.NewProducer(s.produceFolders(ctx, libs, fullRescan), pipeline.Name("read folders from disk")),
|
||||
pipeline.NewStage(s.processFolder(ctx), pipeline.Name("process folder")),
|
||||
pipeline.NewStage(s.logFolder(ctx), pipeline.Name("log results")),
|
||||
pipeline.NewProducer(produceFolders(ctx, s.ds, libs, fullRescan), pipeline.Name("read folders from disk")),
|
||||
pipeline.NewStage(processFolder(ctx), pipeline.Name("process folder")),
|
||||
pipeline.NewStage(logFolder(ctx), pipeline.Name("log results")),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
@ -54,7 +54,7 @@ func (s *scanner2) runPipeline(producer pipeline.Producer[*folderEntry], stages
|
|||
return pipeline.Do(producer, stages...)
|
||||
}
|
||||
|
||||
func (s *scanner2) logFolder(ctx context.Context) func(folder *folderEntry) (out *folderEntry, err error) {
|
||||
func logFolder(ctx context.Context) func(folder *folderEntry) (out *folderEntry, err error) {
|
||||
return func(folder *folderEntry) (out *folderEntry, err error) {
|
||||
log.Debug(ctx, "Scanner: Completed processing folder", "_path", folder.path,
|
||||
"audioCount", len(folder.audioFiles), "imageCount", len(folder.imageFiles), "plsCount", len(folder.playlists))
|
||||
|
|
Loading…
Reference in New Issue