diff --git a/cmd/root.go b/cmd/root.go index e0290f68..8b8c1b2a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -74,8 +74,8 @@ func runNavidrome() { func startServer() (func() error, func(err error)) { return func() error { a := CreateServer(conf.Server.MusicFolder) - a.MountRouter("Subsonic API", consts.URLPathSubsonicAPI, CreateSubsonicAPIRouter()) a.MountRouter("Native API", consts.URLPathNativeAPI, CreateNativeAPIRouter()) + a.MountRouter("Subsonic API", consts.URLPathSubsonicAPI, CreateSubsonicAPIRouter()) if conf.Server.DevEnableScrobble { a.MountRouter("LastFM Auth", consts.URLPathNativeAPI+"/lastfm", CreateLastFMRouter()) } diff --git a/core/agents/lastfm/agent.go b/core/agents/lastfm/agent.go index 7b823e33..7095be5c 100644 --- a/core/agents/lastfm/agent.go +++ b/core/agents/lastfm/agent.go @@ -160,9 +160,10 @@ func (l *lastfmAgent) callArtistGetTopTracks(ctx context.Context, artistName, mb func (l *lastfmAgent) NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error { sk, err := l.sessionKeys.get(ctx, userId) - if err != nil { - return err + if err != nil || sk == "" { + return scrobbler.ErrNotAuthorized } + err = l.client.UpdateNowPlaying(ctx, sk, ScrobbleInfo{ artist: track.Artist, track: track.Title, @@ -173,38 +174,44 @@ func (l *lastfmAgent) NowPlaying(ctx context.Context, userId string, track *mode albumArtist: track.AlbumArtist, }) if err != nil { - return err + log.Warn(ctx, "Last.fm client.updateNowPlaying returned error", "track", track.Title, err) + return scrobbler.ErrUnrecoverable } return nil } -func (l *lastfmAgent) Scrobble(ctx context.Context, userId string, scrobbles []scrobbler.Scrobble) error { +func (l *lastfmAgent) Scrobble(ctx context.Context, userId string, s scrobbler.Scrobble) error { sk, err := l.sessionKeys.get(ctx, userId) - if err != nil { - return err + if err != nil || sk == "" { + return scrobbler.ErrNotAuthorized } - // TODO Implement batch scrobbling - for _, s := range scrobbles { - if s.Duration <= 30 { - log.Debug(ctx, "Skipping Last.fm scrobble for short song", "track", s.Title, "duration", s.Duration) - continue - } - err = l.client.Scrobble(ctx, sk, ScrobbleInfo{ - artist: s.Artist, - track: s.Title, - album: s.Album, - trackNumber: s.TrackNumber, - mbid: s.MbzTrackID, - duration: int(s.Duration), - albumArtist: s.AlbumArtist, - timestamp: s.TimeStamp, - }) - if err != nil { - return err - } + if s.Duration <= 30 { + log.Debug(ctx, "Skipping Last.fm scrobble for short song", "track", s.Title, "duration", s.Duration) + return nil } - return nil + err = l.client.Scrobble(ctx, sk, ScrobbleInfo{ + artist: s.Artist, + track: s.Title, + album: s.Album, + trackNumber: s.TrackNumber, + mbid: s.MbzTrackID, + duration: int(s.Duration), + albumArtist: s.AlbumArtist, + timestamp: s.TimeStamp, + }) + if err == nil { + return nil + } + lfErr, isLastFMError := err.(*lastFMError) + if !isLastFMError { + log.Warn(ctx, "Last.fm client.scrobble returned error", "track", s.Title, err) + return scrobbler.ErrRetryLater + } + if lfErr.Code == 11 || lfErr.Code == 16 { + return scrobbler.ErrRetryLater + } + return scrobbler.ErrUnrecoverable } func (l *lastfmAgent) IsAuthorized(ctx context.Context, userId string) bool { diff --git a/core/agents/lastfm/agent_test.go b/core/agents/lastfm/agent_test.go index c636359b..f8ea2b37 100644 --- a/core/agents/lastfm/agent_test.go +++ b/core/agents/lastfm/agent_test.go @@ -264,15 +264,19 @@ var _ = Describe("lastfmAgent", func() { Expect(sentParams.Get("duration")).To(Equal(strconv.FormatFloat(float64(track.Duration), 'G', -1, 32))) Expect(sentParams.Get("mbid")).To(Equal(track.MbzTrackID)) }) + + It("returns ErrNotAuthorized if user is not linked", func() { + err := agent.NowPlaying(ctx, "user-2", track) + Expect(err).To(MatchError(scrobbler.ErrNotAuthorized)) + }) }) Describe("Scrobble", func() { It("calls Last.fm with correct params", func() { ts := time.Now() - scrobbles := []scrobbler.Scrobble{{MediaFile: *track, TimeStamp: ts}} httpClient.Res = http.Response{Body: ioutil.NopCloser(bytes.NewBufferString("{}")), StatusCode: 200} - err := agent.Scrobble(ctx, "user-1", scrobbles) + err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: ts}) Expect(err).ToNot(HaveOccurred()) Expect(httpClient.SavedRequest.Method).To(Equal(http.MethodPost)) @@ -291,14 +295,58 @@ var _ = Describe("lastfmAgent", func() { It("skips songs with less than 31 seconds", func() { track.Duration = 29 - scrobbles := []scrobbler.Scrobble{{MediaFile: *track, TimeStamp: time.Now()}} httpClient.Res = http.Response{Body: ioutil.NopCloser(bytes.NewBufferString("{}")), StatusCode: 200} - err := agent.Scrobble(ctx, "user-1", scrobbles) + err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()}) Expect(err).ToNot(HaveOccurred()) Expect(httpClient.SavedRequest).To(BeNil()) }) + + It("returns ErrNotAuthorized if user is not linked", func() { + err := agent.Scrobble(ctx, "user-2", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()}) + Expect(err).To(MatchError(scrobbler.ErrNotAuthorized)) + }) + + It("returns ErrRetryLater on error 11", func() { + httpClient.Res = http.Response{ + Body: ioutil.NopCloser(bytes.NewBufferString(`{"error":11,"message":"Service Offline - This service is temporarily offline. Try again later."}`)), + StatusCode: 400, + } + + err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()}) + Expect(err).To(MatchError(scrobbler.ErrRetryLater)) + }) + + It("returns ErrRetryLater on error 16", func() { + httpClient.Res = http.Response{ + Body: ioutil.NopCloser(bytes.NewBufferString(`{"error":16,"message":"There was a temporary error processing your request. Please try again"}`)), + StatusCode: 400, + } + + err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()}) + Expect(err).To(MatchError(scrobbler.ErrRetryLater)) + }) + + It("returns ErrRetryLater on http errors", func() { + httpClient.Res = http.Response{ + Body: ioutil.NopCloser(bytes.NewBufferString(`internal server error`)), + StatusCode: 500, + } + + err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()}) + Expect(err).To(MatchError(scrobbler.ErrRetryLater)) + }) + + It("returns ErrUnrecoverable on other errors", func() { + httpClient.Res = http.Response{ + Body: ioutil.NopCloser(bytes.NewBufferString(`{"error":8,"message":"Operation failed - Something else went wrong"}`)), + StatusCode: 400, + } + + err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()}) + Expect(err).To(MatchError(scrobbler.ErrUnrecoverable)) + }) }) }) diff --git a/core/scrobbler/buffered_scrobbler.go b/core/scrobbler/buffered_scrobbler.go new file mode 100644 index 00000000..1d98293e --- /dev/null +++ b/core/scrobbler/buffered_scrobbler.go @@ -0,0 +1,115 @@ +package scrobbler + +import ( + "context" + "time" + + "github.com/navidrome/navidrome/log" + "github.com/navidrome/navidrome/model" +) + +func NewBufferedScrobbler(ds model.DataStore, s Scrobbler, service string) *bufferedScrobbler { + b := &bufferedScrobbler{ds: ds, wrapped: s, service: service} + b.wakeSignal = make(chan struct{}, 1) + go b.run() + return b +} + +type bufferedScrobbler struct { + ds model.DataStore + wrapped Scrobbler + service string + wakeSignal chan struct{} +} + +func (b *bufferedScrobbler) IsAuthorized(ctx context.Context, userId string) bool { + return b.wrapped.IsAuthorized(ctx, userId) +} + +func (b *bufferedScrobbler) NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error { + return b.wrapped.NowPlaying(ctx, userId, track) +} + +func (b *bufferedScrobbler) Scrobble(ctx context.Context, userId string, s Scrobble) error { + err := b.ds.ScrobbleBuffer(ctx).Enqueue(b.service, userId, s.ID, s.TimeStamp) + if err != nil { + return err + } + + b.sendWakeSignal() + return nil +} + +func (b *bufferedScrobbler) sendWakeSignal() { + // Don't block if the previous signal was not read yet + select { + case b.wakeSignal <- struct{}{}: + default: + } +} + +func (b *bufferedScrobbler) run() { + ctx := context.Background() + for { + if !b.processQueue(ctx) { + time.AfterFunc(5*time.Second, func() { + b.sendWakeSignal() + }) + } + <-b.wakeSignal + } +} + +func (b *bufferedScrobbler) processQueue(ctx context.Context) bool { + buffer := b.ds.ScrobbleBuffer(ctx) + userIds, err := buffer.UserIDs(b.service) + if err != nil { + log.Error(ctx, "Error retrieving userIds from scrobble buffer", "scrobbler", b.service, err) + return false + } + result := true + for _, userId := range userIds { + if !b.processUserQueue(ctx, userId) { + result = false + } + } + return result +} + +func (b *bufferedScrobbler) processUserQueue(ctx context.Context, userId string) bool { + buffer := b.ds.ScrobbleBuffer(ctx) + for { + entry, err := buffer.Next(b.service, userId) + if err != nil { + log.Error(ctx, "Error reading from scrobble buffer", "scrobbler", b.service, err) + return false + } + if entry == nil { + return true + } + log.Debug(ctx, "Sending scrobble", "scrobbler", b.service, "track", entry.Title, "artist", entry.Artist) + err = b.wrapped.Scrobble(ctx, entry.UserID, Scrobble{ + MediaFile: entry.MediaFile, + TimeStamp: entry.PlayTime, + }) + if err != nil { + switch err { + case ErrRetryLater: + log.Warn(ctx, "Could not send scrobble. Will be retried", "userId", entry.UserID, + "track", entry.Title, "artist", entry.Artist, "scrobbler", b.service, err) + return false + default: + log.Error(ctx, "Error sending scrobble to service. Discarding", "scrobbler", b.service, + "userId", entry.UserID, "artist", entry.Artist, "track", entry.Title, err) + } + } + err = buffer.Dequeue(entry) + if err != nil { + log.Error(ctx, "Error removing entry from scrobble buffer", "userId", entry.UserID, + "track", entry.Title, "artist", entry.Artist, "scrobbler", b.service, err) + return false + } + } +} + +var _ Scrobbler = (*bufferedScrobbler)(nil) diff --git a/core/scrobbler/interfaces.go b/core/scrobbler/interfaces.go index 158931c8..90141f11 100644 --- a/core/scrobbler/interfaces.go +++ b/core/scrobbler/interfaces.go @@ -2,6 +2,7 @@ package scrobbler import ( "context" + "errors" "time" "github.com/navidrome/navidrome/model" @@ -12,10 +13,16 @@ type Scrobble struct { TimeStamp time.Time } +var ( + ErrNotAuthorized = errors.New("not authorized") + ErrRetryLater = errors.New("retry later") + ErrUnrecoverable = errors.New("unrecoverable") +) + type Scrobbler interface { IsAuthorized(ctx context.Context, userId string) bool NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error - Scrobble(ctx context.Context, userId string, scrobbles []Scrobble) error + Scrobble(ctx context.Context, userId string, s Scrobble) error } type Constructor func(ds model.DataStore) Scrobbler diff --git a/core/scrobbler/play_tracker.go b/core/scrobbler/play_tracker.go index c3aaff08..7824c040 100644 --- a/core/scrobbler/play_tracker.go +++ b/core/scrobbler/play_tracker.go @@ -39,9 +39,10 @@ type PlayTracker interface { } type playTracker struct { - ds model.DataStore - broker events.Broker - playMap *ttlcache.Cache + ds model.DataStore + broker events.Broker + playMap *ttlcache.Cache + scrobblers map[string]Scrobbler } func GetPlayTracker(ds model.DataStore, broker events.Broker) PlayTracker { @@ -49,7 +50,14 @@ func GetPlayTracker(ds model.DataStore, broker events.Broker) PlayTracker { m := ttlcache.NewCache() m.SkipTTLExtensionOnHit(true) _ = m.SetTTL(nowPlayingExpire) - return &playTracker{ds: ds, playMap: m, broker: broker} + p := &playTracker{ds: ds, playMap: m, broker: broker} + p.scrobblers = make(map[string]Scrobbler) + for name, constructor := range constructors { + s := constructor(ds) + s = NewBufferedScrobbler(ds, s, name) + p.scrobblers[name] = s + } + return p }) return instance.(*playTracker) } @@ -78,15 +86,12 @@ func (p *playTracker) dispatchNowPlaying(ctx context.Context, userId string, tra return } // TODO Parallelize - for name, constructor := range scrobblers { - err := func() error { - s := constructor(p.ds) - if !s.IsAuthorized(ctx, userId) { - return nil - } - log.Debug(ctx, "Sending NowPlaying info", "scrobbler", name, "track", t.Title, "artist", t.Artist) - return s.NowPlaying(ctx, userId, t) - }() + for name, s := range p.scrobblers { + if !s.IsAuthorized(ctx, userId) { + continue + } + log.Debug(ctx, "Sending NowPlaying info", "scrobbler", name, "track", t.Title, "artist", t.Artist) + err := s.NowPlaying(ctx, userId, t) if err != nil { log.Error(ctx, "Error sending NowPlayingInfo", "scrobbler", name, "track", t.Title, "artist", t.Artist, err) return @@ -161,17 +166,13 @@ func (p *playTracker) incPlay(ctx context.Context, track *model.MediaFile, times func (p *playTracker) dispatchScrobble(ctx context.Context, t *model.MediaFile, playTime time.Time) error { u, _ := request.UserFrom(ctx) - scrobbles := []Scrobble{{MediaFile: *t, TimeStamp: playTime}} - // TODO Parallelize - for name, constructor := range scrobblers { - err := func() error { - s := constructor(p.ds) - if !s.IsAuthorized(ctx, u.ID) { - return nil - } - log.Debug(ctx, "Sending Scrobble", "scrobbler", name, "track", t.Title, "artist", t.Artist) - return s.Scrobble(ctx, u.ID, scrobbles) - }() + scrobble := Scrobble{MediaFile: *t, TimeStamp: playTime} + for name, s := range p.scrobblers { + if !s.IsAuthorized(ctx, u.ID) { + continue + } + log.Debug(ctx, "Buffering scrobble", "scrobbler", name, "track", t.Title, "artist", t.Artist) + err := s.Scrobble(ctx, u.ID, scrobble) if err != nil { log.Error(ctx, "Error sending Scrobble", "scrobbler", name, "track", t.Title, "artist", t.Artist, err) return err @@ -180,14 +181,14 @@ func (p *playTracker) dispatchScrobble(ctx context.Context, t *model.MediaFile, return nil } -var scrobblers map[string]Constructor +var constructors map[string]Constructor func Register(name string, init Constructor) { if !conf.Server.DevEnableScrobble { return } - if scrobblers == nil { - scrobblers = make(map[string]Constructor) + if constructors == nil { + constructors = make(map[string]Constructor) } - scrobblers[name] = init + constructors[name] = init } diff --git a/core/scrobbler/play_tracker_test.go b/core/scrobbler/play_tracker_test.go index 76220c42..1d28f65a 100644 --- a/core/scrobbler/play_tracker_test.go +++ b/core/scrobbler/play_tracker_test.go @@ -6,11 +6,9 @@ import ( "time" "github.com/navidrome/navidrome/conf" - - "github.com/navidrome/navidrome/server/events" - "github.com/navidrome/navidrome/model" "github.com/navidrome/navidrome/model/request" + "github.com/navidrome/navidrome/server/events" "github.com/navidrome/navidrome/tests" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -19,11 +17,11 @@ import ( var _ = Describe("PlayTracker", func() { var ctx context.Context var ds model.DataStore - var broker PlayTracker + var tracker PlayTracker var track model.MediaFile var album model.Album var artist model.Artist - var fake *fakeScrobbler + var fake fakeScrobbler BeforeEach(func() { conf.Server.DevEnableScrobble = true @@ -31,11 +29,18 @@ var _ = Describe("PlayTracker", func() { ctx = request.WithUser(ctx, model.User{ID: "u-1"}) ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: true}) ds = &tests.MockDataStore{} - broker = GetPlayTracker(ds, events.GetBroker()) - fake = &fakeScrobbler{Authorized: true} + fake = fakeScrobbler{Authorized: true} Register("fake", func(ds model.DataStore) Scrobbler { - return fake + return &fake }) + tracker = GetPlayTracker(ds, events.GetBroker()) + + // Remove buffering to simplify tests + for i, s := range tracker.(*playTracker).scrobblers { + if bs, ok := s.(*bufferedScrobbler); ok { + tracker.(*playTracker).scrobblers[i] = bs.wrapped + } + } track = model.MediaFile{ ID: "123", @@ -58,7 +63,7 @@ var _ = Describe("PlayTracker", func() { Describe("NowPlaying", func() { It("sends track to agent", func() { - err := broker.NowPlaying(ctx, "player-1", "player-one", "123") + err := tracker.NowPlaying(ctx, "player-1", "player-one", "123") Expect(err).ToNot(HaveOccurred()) Expect(fake.NowPlayingCalled).To(BeTrue()) Expect(fake.UserID).To(Equal("u-1")) @@ -67,7 +72,7 @@ var _ = Describe("PlayTracker", func() { It("does not send track to agent if user has not authorized", func() { fake.Authorized = false - err := broker.NowPlaying(ctx, "player-1", "player-one", "123") + err := tracker.NowPlaying(ctx, "player-1", "player-one", "123") Expect(err).ToNot(HaveOccurred()) Expect(fake.NowPlayingCalled).To(BeFalse()) @@ -75,7 +80,7 @@ var _ = Describe("PlayTracker", func() { It("does not send track to agent if player is not enabled to send scrobbles", func() { ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: false}) - err := broker.NowPlaying(ctx, "player-1", "player-one", "123") + err := tracker.NowPlaying(ctx, "player-1", "player-one", "123") Expect(err).ToNot(HaveOccurred()) Expect(fake.NowPlayingCalled).To(BeFalse()) @@ -91,11 +96,11 @@ var _ = Describe("PlayTracker", func() { track2.ID = "456" _ = ds.MediaFile(ctx).Put(&track) ctx = request.WithUser(ctx, model.User{UserName: "user-1"}) - _ = broker.NowPlaying(ctx, "player-1", "player-one", "123") + _ = tracker.NowPlaying(ctx, "player-1", "player-one", "123") ctx = request.WithUser(ctx, model.User{UserName: "user-2"}) - _ = broker.NowPlaying(ctx, "player-2", "player-two", "456") + _ = tracker.NowPlaying(ctx, "player-2", "player-two", "456") - playing, err := broker.GetNowPlaying(ctx) + playing, err := tracker.GetNowPlaying(ctx) Expect(err).ToNot(HaveOccurred()) Expect(playing).To(HaveLen(2)) @@ -116,19 +121,19 @@ var _ = Describe("PlayTracker", func() { ctx = request.WithUser(ctx, model.User{ID: "u-1", UserName: "user-1"}) ts := time.Now() - err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}}) + err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}}) Expect(err).ToNot(HaveOccurred()) Expect(fake.ScrobbleCalled).To(BeTrue()) Expect(fake.UserID).To(Equal("u-1")) - Expect(fake.Scrobbles[0].ID).To(Equal("123")) + Expect(fake.LastScrobble.ID).To(Equal("123")) }) It("increments play counts in the DB", func() { ctx = request.WithUser(ctx, model.User{ID: "u-1", UserName: "user-1"}) ts := time.Now() - err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}}) + err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: ts}}) Expect(err).ToNot(HaveOccurred()) Expect(track.PlayCount).To(Equal(int64(1))) @@ -139,7 +144,7 @@ var _ = Describe("PlayTracker", func() { It("does not send track to agent if user has not authorized", func() { fake.Authorized = false - err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}}) + err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}}) Expect(err).ToNot(HaveOccurred()) Expect(fake.ScrobbleCalled).To(BeFalse()) @@ -148,7 +153,7 @@ var _ = Describe("PlayTracker", func() { It("does not send track to agent player is not enabled to send scrobbles", func() { ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: false}) - err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}}) + err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}}) Expect(err).ToNot(HaveOccurred()) Expect(fake.ScrobbleCalled).To(BeFalse()) @@ -157,7 +162,7 @@ var _ = Describe("PlayTracker", func() { It("increments play counts even if it cannot scrobble", func() { fake.Error = errors.New("error") - err := broker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}}) + err := tracker.Submit(ctx, []Submission{{TrackID: "123", Timestamp: time.Now()}}) Expect(err).ToNot(HaveOccurred()) Expect(fake.ScrobbleCalled).To(BeFalse()) @@ -177,7 +182,7 @@ type fakeScrobbler struct { ScrobbleCalled bool UserID string Track *model.MediaFile - Scrobbles []Scrobble + LastScrobble Scrobble Error error } @@ -195,12 +200,12 @@ func (f *fakeScrobbler) NowPlaying(ctx context.Context, userId string, track *mo return nil } -func (f *fakeScrobbler) Scrobble(ctx context.Context, userId string, scrobbles []Scrobble) error { +func (f *fakeScrobbler) Scrobble(ctx context.Context, userId string, s Scrobble) error { f.ScrobbleCalled = true if f.Error != nil { return f.Error } f.UserID = userId - f.Scrobbles = scrobbles + f.LastScrobble = s return nil } diff --git a/db/migration/20210616150710_encrypt_all_passwords.go b/db/migration/20210616150710_encrypt_all_passwords.go index afa7f39f..d9a2be30 100644 --- a/db/migration/20210616150710_encrypt_all_passwords.go +++ b/db/migration/20210616150710_encrypt_all_passwords.go @@ -52,6 +52,5 @@ func upEncodeAllPasswords(tx *sql.Tx) error { } func downEncodeAllPasswords(tx *sql.Tx) error { - // This code is executed when the migration is rolled back. return nil } diff --git a/db/migration/20210623155401_add_user_prefs_player_scrobbler_enabled.go b/db/migration/20210623155401_add_user_prefs_player_scrobbler_enabled.go index a95083ee..da63ecd7 100644 --- a/db/migration/20210623155401_add_user_prefs_player_scrobbler_enabled.go +++ b/db/migration/20210623155401_add_user_prefs_player_scrobbler_enabled.go @@ -40,6 +40,5 @@ alter table player add scrobble_enabled bool default true; } func downAddUserPrefsPlayerScrobblerEnabled(tx *sql.Tx) error { - // This code is executed when the migration is rolled back. return nil } diff --git a/db/migration/20210626213026_add_scrobble_buffer.go b/db/migration/20210626213026_add_scrobble_buffer.go new file mode 100644 index 00000000..4fe74968 --- /dev/null +++ b/db/migration/20210626213026_add_scrobble_buffer.go @@ -0,0 +1,38 @@ +package migrations + +import ( + "database/sql" + + "github.com/pressly/goose" +) + +func init() { + goose.AddMigration(upAddScrobbleBuffer, downAddScrobbleBuffer) +} + +func upAddScrobbleBuffer(tx *sql.Tx) error { + _, err := tx.Exec(` +create table if not exists scrobble_buffer +( + user_id varchar not null + constraint scrobble_buffer_user_id_fk + references user + on update cascade on delete cascade, + service varchar not null, + media_file_id varchar not null + constraint scrobble_buffer_media_file_id_fk + references media_file + on update cascade on delete cascade, + play_time datetime not null, + enqueue_time datetime not null default current_timestamp, + constraint scrobble_buffer_pk + unique (user_id, service, media_file_id, play_time, user_id) +); +`) + + return err +} + +func downAddScrobbleBuffer(tx *sql.Tx) error { + return nil +} diff --git a/model/datastore.go b/model/datastore.go index 19b7f92e..19a08c05 100644 --- a/model/datastore.go +++ b/model/datastore.go @@ -33,6 +33,7 @@ type DataStore interface { Property(ctx context.Context) PropertyRepository User(ctx context.Context) UserRepository UserProps(ctx context.Context) UserPropsRepository + ScrobbleBuffer(ctx context.Context) ScrobbleBufferRepository Resource(ctx context.Context, model interface{}) ResourceRepository diff --git a/model/scrobble_buffer.go b/model/scrobble_buffer.go new file mode 100644 index 00000000..e75936c4 --- /dev/null +++ b/model/scrobble_buffer.go @@ -0,0 +1,21 @@ +package model + +import "time" + +type ScrobbleEntry struct { + MediaFile + Service string + UserID string `json:"user_id" orm:"column(user_id)"` + PlayTime time.Time + EnqueueTime time.Time +} + +type ScrobbleEntries []ScrobbleEntry + +type ScrobbleBufferRepository interface { + UserIDs(service string) ([]string, error) + Enqueue(service, userId, mediaFileId string, playTime time.Time) error + Next(service string, userId string) (*ScrobbleEntry, error) + Dequeue(entry *ScrobbleEntry) error + Length() (int64, error) +} diff --git a/persistence/persistence.go b/persistence/persistence.go index 57bdcc05..8ad96896 100644 --- a/persistence/persistence.go +++ b/persistence/persistence.go @@ -70,6 +70,10 @@ func (s *SQLStore) Player(ctx context.Context) model.PlayerRepository { return NewPlayerRepository(ctx, s.getOrmer()) } +func (s *SQLStore) ScrobbleBuffer(ctx context.Context) model.ScrobbleBufferRepository { + return NewScrobbleBufferRepository(ctx, s.getOrmer()) +} + func (s *SQLStore) Resource(ctx context.Context, m interface{}) model.ResourceRepository { switch m.(type) { case model.User: diff --git a/persistence/scrobble_buffer_repository.go b/persistence/scrobble_buffer_repository.go new file mode 100644 index 00000000..1e32ccee --- /dev/null +++ b/persistence/scrobble_buffer_repository.go @@ -0,0 +1,83 @@ +package persistence + +import ( + "context" + "time" + + . "github.com/Masterminds/squirrel" + "github.com/astaxie/beego/orm" + "github.com/navidrome/navidrome/model" +) + +type scrobbleBufferRepository struct { + sqlRepository +} + +func NewScrobbleBufferRepository(ctx context.Context, o orm.Ormer) model.ScrobbleBufferRepository { + r := &scrobbleBufferRepository{} + r.ctx = ctx + r.ormer = o + r.tableName = "scrobble_buffer" + return r +} + +func (r *scrobbleBufferRepository) UserIDs(service string) ([]string, error) { + sql := Select().Columns("user_id"). + From(r.tableName). + Where(And{ + Eq{"service": service}, + }). + GroupBy("user_id"). + OrderBy("count(*)") + var userIds []string + err := r.queryAll(sql, &userIds) + return userIds, err +} + +func (r *scrobbleBufferRepository) Enqueue(service, userId, mediaFileId string, playTime time.Time) error { + ins := Insert(r.tableName).SetMap(map[string]interface{}{ + "user_id": userId, + "service": service, + "media_file_id": mediaFileId, + "play_time": playTime, + "enqueue_time": time.Now(), + }) + _, err := r.executeSQL(ins) + return err +} + +func (r *scrobbleBufferRepository) Next(service string, userId string) (*model.ScrobbleEntry, error) { + sql := Select().Columns("s.*, m.*"). + From(r.tableName+" s"). + LeftJoin("media_file m on m.id = s.media_file_id"). + Where(And{ + Eq{"service": service}, + Eq{"user_id": userId}, + }). + OrderBy("play_time", "s.rowid").Limit(1) + + res := model.ScrobbleEntries{} + // TODO Rewrite queryOne to use QueryRows, to workaround the recursive embedded structs issue + err := r.queryAll(sql, &res) + if err == model.ErrNotFound || len(res) == 0 { + return nil, nil + } + if err != nil { + return nil, err + } + return &res[0], nil +} + +func (r *scrobbleBufferRepository) Dequeue(entry *model.ScrobbleEntry) error { + return r.delete(And{ + Eq{"service": entry.Service}, + Eq{"media_file_id": entry.MediaFile.ID}, + Eq{"play_time": entry.PlayTime}, + }) +} + +func (r *scrobbleBufferRepository) Length() (int64, error) { + return r.count(Select()) +} + +var _ model.ScrobbleBufferRepository = (*scrobbleBufferRepository)(nil) diff --git a/persistence/sql_base_repository.go b/persistence/sql_base_repository.go index fd89f9fd..ef598633 100644 --- a/persistence/sql_base_repository.go +++ b/persistence/sql_base_repository.go @@ -149,7 +149,7 @@ func (r sqlRepository) queryOne(sq Sqlizer, response interface{}) error { start := time.Now() err = r.ormer.Raw(query, args...).QueryRow(response) if err == orm.ErrNoRows { - r.logSQL(query, args, nil, 1, start) + r.logSQL(query, args, nil, 0, start) return model.ErrNotFound } r.logSQL(query, args, err, 1, start) diff --git a/tests/mock_persistence.go b/tests/mock_persistence.go index 7150a0ec..e4517602 100644 --- a/tests/mock_persistence.go +++ b/tests/mock_persistence.go @@ -7,16 +7,17 @@ import ( ) type MockDataStore struct { - MockedGenre model.GenreRepository - MockedAlbum model.AlbumRepository - MockedArtist model.ArtistRepository - MockedMediaFile model.MediaFileRepository - MockedUser model.UserRepository - MockedProperty model.PropertyRepository - MockedPlayer model.PlayerRepository - MockedShare model.ShareRepository - MockedTranscoding model.TranscodingRepository - MockedUserProps model.UserPropsRepository + MockedGenre model.GenreRepository + MockedAlbum model.AlbumRepository + MockedArtist model.ArtistRepository + MockedMediaFile model.MediaFileRepository + MockedUser model.UserRepository + MockedProperty model.PropertyRepository + MockedPlayer model.PlayerRepository + MockedShare model.ShareRepository + MockedTranscoding model.TranscodingRepository + MockedUserProps model.UserPropsRepository + MockedScrobbleBuffer model.ScrobbleBufferRepository } func (db *MockDataStore) Album(context.Context) model.AlbumRepository { @@ -101,6 +102,13 @@ func (db *MockDataStore) Player(context.Context) model.PlayerRepository { return struct{ model.PlayerRepository }{} } +func (db *MockDataStore) ScrobbleBuffer(ctx context.Context) model.ScrobbleBufferRepository { + if db.MockedScrobbleBuffer == nil { + db.MockedScrobbleBuffer = CreateMockedScrobbleBufferRepo() + } + return db.MockedScrobbleBuffer +} + func (db *MockDataStore) WithTx(block func(db model.DataStore) error) error { return block(db) } diff --git a/tests/mock_scrobble_buffer_repo.go b/tests/mock_scrobble_buffer_repo.go new file mode 100644 index 00000000..06b28af7 --- /dev/null +++ b/tests/mock_scrobble_buffer_repo.go @@ -0,0 +1,81 @@ +package tests + +import ( + "time" + + "github.com/navidrome/navidrome/model" +) + +type MockedScrobbleBufferRepo struct { + Error error + data model.ScrobbleEntries +} + +func CreateMockedScrobbleBufferRepo() *MockedScrobbleBufferRepo { + return &MockedScrobbleBufferRepo{} +} + +func (m *MockedScrobbleBufferRepo) UserIDs(service string) ([]string, error) { + if m.Error != nil { + return nil, m.Error + } + userIds := make(map[string]struct{}) + for _, e := range m.data { + if e.Service == service { + userIds[e.UserID] = struct{}{} + } + } + var result []string + for uid := range userIds { + result = append(result, uid) + } + return result, nil +} + +func (m *MockedScrobbleBufferRepo) Enqueue(service, userId, mediaFileId string, playTime time.Time) error { + if m.Error != nil { + return m.Error + } + m.data = append(m.data, model.ScrobbleEntry{ + MediaFile: model.MediaFile{ID: mediaFileId}, + Service: service, + UserID: userId, + PlayTime: playTime, + EnqueueTime: time.Now(), + }) + return nil +} + +func (m *MockedScrobbleBufferRepo) Next(service, userId string) (*model.ScrobbleEntry, error) { + if m.Error != nil { + return nil, m.Error + } + for _, e := range m.data { + if e.Service == service && e.UserID == userId { + return &e, nil + } + } + return nil, nil +} + +func (m *MockedScrobbleBufferRepo) Dequeue(entry *model.ScrobbleEntry) error { + if m.Error != nil { + return m.Error + } + newData := model.ScrobbleEntries{} + for _, e := range m.data { + if e.Service == entry.Service && e.UserID == entry.UserID && e.PlayTime == entry.PlayTime && e.MediaFile.ID == entry.MediaFile.ID { + continue + } + newData = append(newData, e) + } + m.data = newData + return nil +} + +func (m *MockedScrobbleBufferRepo) Length() (int64, error) { + if m.Error != nil { + return 0, m.Error + } + return int64(len(m.data)), nil +}