From 7b11660f4f201980fbb9493ce6979f238761da0a Mon Sep 17 00:00:00 2001 From: Philipp Serr Date: Wed, 23 Sep 2015 22:27:48 +0200 Subject: [PATCH] Prevent concurrent processing of same blob ... by first adding a preliminary index entry and making this fail if an index entry for the same blob already exists. A preliminary index entry is characterized by not yet being associated with a pack. Until now, these entries where added to the index just like final index entries using index.Store, which silently overwrites existing index entries. This commit adds a new method index.StoreInProgress which refuses to overwrite existing index entries and allows for creating preliminary index entries only. The existing method index.Store has not been changed and continues to silently overwrite existing index entries. This distinction is important, as otherwise, it would be impossible to update a preliminary index entry after the blob has been written to a pack. Resolves: restic#292 --- repository/index.go | 24 ++++++++++++++- repository/index_test.go | 65 ++++++++++++++++++++++++++++++++++++++++ repository/repository.go | 21 +++++++++---- 3 files changed, 104 insertions(+), 6 deletions(-) diff --git a/repository/index.go b/repository/index.go index b53de02dd..0611ced6b 100644 --- a/repository/index.go +++ b/repository/index.go @@ -2,6 +2,7 @@ package repository import ( "encoding/json" + "errors" "fmt" "io" "sync" @@ -43,7 +44,8 @@ func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset } } -// Store remembers the id and pack in the index. +// Store remembers the id and pack in the index. An existing entry will be +// silently overwritten. func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) { idx.m.Lock() defer idx.m.Unlock() @@ -54,6 +56,26 @@ func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset idx.store(t, id, pack, offset, length, false) } +// StoreInProgress adds a preliminary index entry for a blob that is about to be +// saved. The entry must be updated using Store once the the blob has been +// written to a pack. Adding an preliminary index fails if there's an existing +// entry associated with the same id. +func (idx *Index) StoreInProgress(t pack.BlobType, id backend.ID) error { + idx.m.Lock() + defer idx.m.Unlock() + + if _, hasID := idx.pack[id]; hasID { + errorMsg := fmt.Sprintf("index already contains id %v (%v)", id.Str(), t) + debug.Log("Index.StoreInProgress", errorMsg) + return errors.New(errorMsg) + } + + idx.store(t, id, nil, 0, 0, false) + debug.Log("Index.StoreInProgress", "preliminary entry added for id %v (%v)", + id.Str(), t) + return nil +} + // Remove removes the pack ID from the index. func (idx *Index) Remove(packID backend.ID) { idx.m.Lock() diff --git a/repository/index_test.go b/repository/index_test.go index f30f7beab..668acebb9 100644 --- a/repository/index_test.go +++ b/repository/index_test.go @@ -223,3 +223,68 @@ func TestIndexUnserialize(t *testing.T) { Equals(t, test.length, length) } } + +func TestStoreOverwritesPreliminaryEntry(t *testing.T) { + idx := repository.NewIndex() + + blobID := randomID() + dataType := pack.Data + idx.StoreInProgress(dataType, blobID) + + packID := randomID() + offset := uint(0) + length := uint(100) + idx.Store(dataType, blobID, &packID, offset, length) + + actPackID, actType, actOffset, actLength, err := idx.Lookup(blobID) + OK(t, err) + Equals(t, packID, *actPackID) + Equals(t, dataType, actType) + Equals(t, offset, actOffset) + Equals(t, length, actLength) +} + +func TestStoreInProgressAddsPreliminaryEntry(t *testing.T) { + idx := repository.NewIndex() + + blobID := randomID() + dataType := pack.Data + + err := idx.StoreInProgress(dataType, blobID) + OK(t, err) + + actPackID, actType, actOffset, actLength, err := idx.Lookup(blobID) + OK(t, err) + Assert(t, actPackID == nil, + "Preliminary index entry illegaly associated with a pack id.") + Equals(t, uint(0), actOffset) + Equals(t, uint(0), actLength) + Equals(t, dataType, actType) +} + +func TestStoreInProgressRefusesToOverwriteExistingFinalEntry(t *testing.T) { + idx := repository.NewIndex() + + blobID := randomID() + dataType := pack.Data + packID := randomID() + offset := uint(0) + length := uint(100) + idx.Store(dataType, blobID, &packID, offset, length) + + err := idx.StoreInProgress(dataType, blobID) + Assert(t, err != nil, + "index.StoreInProgress did not refuse to overwrite existing entry") +} + +func TestStoreInProgressRefusesToOverwriteExistingPreliminaryEntry(t *testing.T) { + idx := repository.NewIndex() + + blobID := randomID() + dataType := pack.Data + + _ = idx.StoreInProgress(dataType, blobID) + err := idx.StoreInProgress(dataType, blobID) + Assert(t, err != nil, + "index.StoreInProgress did not refuse to overwrite existing entry") +} diff --git a/repository/repository.go b/repository/repository.go index 0a4630cfa..c6900f43f 100644 --- a/repository/repository.go +++ b/repository/repository.go @@ -303,20 +303,31 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID return backend.ID{}, err } + // add this id to the index, although we don't know yet in which pack it + // will be saved; the entry will be updated when the pack is written. + // Note: the current id needs to be added to the index before searching + // for a suitable packer: There's a little chance that more than one + // goroutine handles the same blob concurrently. Due to idx.StoreInProgress + // locking the index and raising an error if a matching index entry + // already exists, updating the index first ensures that only one of + // those goroutines will continue. See issue restic#292. + debug.Log("Repo.Save", "saving stub for %v (%v) in index", id.Str, t) + err = r.idx.StoreInProgress(t, *id) + if err != nil { + debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t) + return backend.ID{}, nil + } + // find suitable packer and add blob packer, err := r.findPacker(uint(len(ciphertext))) if err != nil { + r.idx.Remove(*id) return backend.ID{}, err } // save ciphertext packer.Add(t, *id, bytes.NewReader(ciphertext)) - // add this id to the index, although we don't know yet in which pack it - // will be saved, the entry will be updated when the pack is written. - r.idx.Store(t, *id, nil, 0, 0) - debug.Log("Repo.Save", "saving stub for %v (%v) in index", id.Str, t) - // if the pack is not full enough and there are less than maxPackers // packers, put back to the list if packer.Size() < minPackSize && r.countPacker() < maxPackers {