diff --git a/repository/index.go b/repository/index.go index b53de02dd..0611ced6b 100644 --- a/repository/index.go +++ b/repository/index.go @@ -2,6 +2,7 @@ package repository import ( "encoding/json" + "errors" "fmt" "io" "sync" @@ -43,7 +44,8 @@ func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset } } -// Store remembers the id and pack in the index. +// Store remembers the id and pack in the index. An existing entry will be +// silently overwritten. func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) { idx.m.Lock() defer idx.m.Unlock() @@ -54,6 +56,26 @@ func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset idx.store(t, id, pack, offset, length, false) } +// StoreInProgress adds a preliminary index entry for a blob that is about to be +// saved. The entry must be updated using Store once the the blob has been +// written to a pack. Adding an preliminary index fails if there's an existing +// entry associated with the same id. +func (idx *Index) StoreInProgress(t pack.BlobType, id backend.ID) error { + idx.m.Lock() + defer idx.m.Unlock() + + if _, hasID := idx.pack[id]; hasID { + errorMsg := fmt.Sprintf("index already contains id %v (%v)", id.Str(), t) + debug.Log("Index.StoreInProgress", errorMsg) + return errors.New(errorMsg) + } + + idx.store(t, id, nil, 0, 0, false) + debug.Log("Index.StoreInProgress", "preliminary entry added for id %v (%v)", + id.Str(), t) + return nil +} + // Remove removes the pack ID from the index. func (idx *Index) Remove(packID backend.ID) { idx.m.Lock() diff --git a/repository/index_test.go b/repository/index_test.go index f30f7beab..668acebb9 100644 --- a/repository/index_test.go +++ b/repository/index_test.go @@ -223,3 +223,68 @@ func TestIndexUnserialize(t *testing.T) { Equals(t, test.length, length) } } + +func TestStoreOverwritesPreliminaryEntry(t *testing.T) { + idx := repository.NewIndex() + + blobID := randomID() + dataType := pack.Data + idx.StoreInProgress(dataType, blobID) + + packID := randomID() + offset := uint(0) + length := uint(100) + idx.Store(dataType, blobID, &packID, offset, length) + + actPackID, actType, actOffset, actLength, err := idx.Lookup(blobID) + OK(t, err) + Equals(t, packID, *actPackID) + Equals(t, dataType, actType) + Equals(t, offset, actOffset) + Equals(t, length, actLength) +} + +func TestStoreInProgressAddsPreliminaryEntry(t *testing.T) { + idx := repository.NewIndex() + + blobID := randomID() + dataType := pack.Data + + err := idx.StoreInProgress(dataType, blobID) + OK(t, err) + + actPackID, actType, actOffset, actLength, err := idx.Lookup(blobID) + OK(t, err) + Assert(t, actPackID == nil, + "Preliminary index entry illegaly associated with a pack id.") + Equals(t, uint(0), actOffset) + Equals(t, uint(0), actLength) + Equals(t, dataType, actType) +} + +func TestStoreInProgressRefusesToOverwriteExistingFinalEntry(t *testing.T) { + idx := repository.NewIndex() + + blobID := randomID() + dataType := pack.Data + packID := randomID() + offset := uint(0) + length := uint(100) + idx.Store(dataType, blobID, &packID, offset, length) + + err := idx.StoreInProgress(dataType, blobID) + Assert(t, err != nil, + "index.StoreInProgress did not refuse to overwrite existing entry") +} + +func TestStoreInProgressRefusesToOverwriteExistingPreliminaryEntry(t *testing.T) { + idx := repository.NewIndex() + + blobID := randomID() + dataType := pack.Data + + _ = idx.StoreInProgress(dataType, blobID) + err := idx.StoreInProgress(dataType, blobID) + Assert(t, err != nil, + "index.StoreInProgress did not refuse to overwrite existing entry") +} diff --git a/repository/repository.go b/repository/repository.go index 0a4630cfa..c6900f43f 100644 --- a/repository/repository.go +++ b/repository/repository.go @@ -303,20 +303,31 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID return backend.ID{}, err } + // add this id to the index, although we don't know yet in which pack it + // will be saved; the entry will be updated when the pack is written. + // Note: the current id needs to be added to the index before searching + // for a suitable packer: There's a little chance that more than one + // goroutine handles the same blob concurrently. Due to idx.StoreInProgress + // locking the index and raising an error if a matching index entry + // already exists, updating the index first ensures that only one of + // those goroutines will continue. See issue restic#292. + debug.Log("Repo.Save", "saving stub for %v (%v) in index", id.Str, t) + err = r.idx.StoreInProgress(t, *id) + if err != nil { + debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t) + return backend.ID{}, nil + } + // find suitable packer and add blob packer, err := r.findPacker(uint(len(ciphertext))) if err != nil { + r.idx.Remove(*id) return backend.ID{}, err } // save ciphertext packer.Add(t, *id, bytes.NewReader(ciphertext)) - // add this id to the index, although we don't know yet in which pack it - // will be saved, the entry will be updated when the pack is written. - r.idx.Store(t, *id, nil, 0, 0) - debug.Log("Repo.Save", "saving stub for %v (%v) in index", id.Str, t) - // if the pack is not full enough and there are less than maxPackers // packers, put back to the list if packer.Size() < minPackSize && r.countPacker() < maxPackers {