diff --git a/archiver_test.go b/archiver_test.go
index 63531b0b4..b519db120 100644
--- a/archiver_test.go
+++ b/archiver_test.go
@@ -4,8 +4,8 @@ import (
 	"bytes"
 	"crypto/sha256"
 	"io"
-	"math"
 	"testing"
+	"time"
 
 	"github.com/restic/chunker"
 	"github.com/restic/restic"
@@ -242,47 +242,48 @@ func BenchmarkLoadTree(t *testing.B) {
 	}
 }
 
-// Saves several identical chunks concurrently and later check that there are no
+// Saves several identical chunks concurrently and later checks that there are no
 // unreferenced packs in the repository. See also #292 and #358.
-// The combination of high duplication and high concurrency should provoke any
-// issues leading to unreferenced packs.
-func TestParallelSaveWithHighDuplication(t *testing.T) {
+func TestParallelSaveWithDuplication(t *testing.T) {
+	for seed := 0; seed < 5; seed++ {
+		testParallelSaveWithDuplication(t, seed)
+	}
+}
+
+func testParallelSaveWithDuplication(t *testing.T, seed int) {
 	repo := SetupRepo()
 	defer TeardownRepo(repo)
 
-	// For every seed a pseudo-random 32Mb blob is generated and split into
-	// chunks. During the test all chunks of all blobs are processed in parallel
-	// goroutines. To increase duplication, each chunk is processed
-	// <duplication> times. Concurrency can be limited by changing <maxParallel>.
-	// Note: seeds 5, 3, 66, 4, 12 produce the most chunks (descending)
-	seeds := []int{5, 3, 66, 4, 12}
-	maxParallel := math.MaxInt32
-	duplication := 15
+	dataSizeMb := 92
+	duplication := 7
 
 	arch := restic.NewArchiver(repo)
-	data := getRandomData(seeds)
+	data, chunks := getRandomData(seed, dataSizeMb*1024*1024)
+	reader := bytes.NewReader(data)
 
-	barrier := make(chan struct{}, maxParallel)
 	errChannels := [](<-chan error){}
 
-	for _, d := range data {
-		for _, c := range d.chunks {
-			for dupIdx := 0; dupIdx < duplication; dupIdx++ {
-				errChan := make(chan error)
-				errChannels = append(errChannels, errChan)
+	// interweaved processing of subsequent chunks
+	maxParallel := 2*duplication - 1
+	barrier := make(chan struct{}, maxParallel)
 
-				go func(buf *[]byte, c *chunker.Chunk, errChan chan<- error) {
-					barrier <- struct{}{}
+	for _, c := range chunks {
+		for dupIdx := 0; dupIdx < duplication; dupIdx++ {
+			errChan := make(chan error)
+			errChannels = append(errChannels, errChan)
 
-					hash := c.Digest
-					id := backend.ID{}
-					copy(id[:], hash)
+			go func(reader *bytes.Reader, c *chunker.Chunk, errChan chan<- error) {
+				barrier <- struct{}{}
 
-					err := arch.Save(pack.Data, id, c.Length, c.Reader(bytes.NewReader(*buf)))
-					<-barrier
-					errChan <- err
-				}(&d.buf, c, errChan)
-			}
+				hash := c.Digest
+				id := backend.ID{}
+				copy(id[:], hash)
+
+				time.Sleep(time.Duration(hash[0]))
+				err := arch.Save(pack.Data, id, c.Length, c.Reader(reader))
+				<-barrier
+				errChan <- err
+			}(reader, c, errChan)
 		}
 	}
 
@@ -297,34 +298,20 @@ func TestParallelSaveWithHighDuplication(t *testing.T) {
 	assertNoUnreferencedPacks(t, chkr)
 }
 
-func getRandomData(seeds []int) []*chunkedData {
-	chunks := []*chunkedData{}
-	sem := make(chan struct{}, len(seeds))
+func getRandomData(seed int, size int) ([]byte, []*chunker.Chunk) {
+	buf := Random(seed, size)
+	chunks := []*chunker.Chunk{}
+	chunker := chunker.New(bytes.NewReader(buf), testPol, sha256.New())
 
-	for seed := range seeds {
-		c := &chunkedData{}
+	for {
+		c, err := chunker.Next()
+		if err == io.EOF {
+			break
+		}
 		chunks = append(chunks, c)
-
-		go func(seed int, data *chunkedData) {
-			data.buf = Random(seed, 32*1024*1024)
-			chunker := chunker.New(bytes.NewReader(data.buf), testPol, sha256.New())
-
-			for {
-				c, err := chunker.Next()
-				if err == io.EOF {
-					break
-				}
-				data.chunks = append(data.chunks, c)
-			}
-
-			sem <- struct{}{}
-		}(seed, c)
 	}
 
-	for i := 0; i < len(seeds); i++ {
-		<-sem
-	}
-	return chunks
+	return buf, chunks
 }
 
 func createAndInitChecker(t *testing.T, repo *repository.Repository) *checker.Checker {
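
Note on the concurrency pattern in the diff above: barrier is a buffered channel used as a counting semaphore, sized to maxParallel = 2*duplication - 1 so that saves of duplicate chunks overlap with saves of the adjacent chunk, and time.Sleep(time.Duration(hash[0])) sleeps 0-255 nanoseconds (time.Duration counts nanoseconds) to jitter the goroutine interleaving per chunk. The following is a minimal standalone sketch of that semaphore pattern only, with hypothetical names (maxParallel, nJobs) and no restic APIs; the real test releases the slot before reporting on its error channel rather than via defer.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxParallel = 13 // e.g. 2*duplication - 1 with duplication = 7
	const nJobs = 100

	// A buffered channel acts as a counting semaphore: a send blocks once
	// maxParallel tokens are outstanding, capping concurrent goroutines.
	barrier := make(chan struct{}, maxParallel)

	var wg sync.WaitGroup
	for i := 0; i < nJobs; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			barrier <- struct{}{}        // acquire a slot (blocks when full)
			defer func() { <-barrier }() // release the slot when done
			fmt.Println("saving chunk", i)
		}(i)
	}
	wg.Wait()
}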