diff --git a/src/restic/index/index.go b/src/restic/index/index.go
new file mode 100644
index 000000000..9b7eed1da
--- /dev/null
+++ b/src/restic/index/index.go
@@ -0,0 +1,147 @@
+// Package index contains various data structures for indexing content in a repository or backend.
+package index
+
+import (
+	"fmt"
+	"os"
+	"restic/backend"
+	"restic/debug"
+	"restic/pack"
+	"restic/repository"
+	"restic/worker"
+)
+
+// Pack contains information about the contents of a pack.
+type Pack struct {
+	Entries []pack.Blob
+}
+
+// Index contains information about blobs and packs stored in a repo.
+type Index struct {
+	Packs map[backend.ID]*Pack
+}
+
+func newIndex() *Index {
+	return &Index{
+		Packs: make(map[backend.ID]*Pack),
+	}
+}
+
+// New creates a new index for repo from scratch.
+func New(repo *repository.Repository) (*Index, error) {
+	done := make(chan struct{})
+	defer close(done)
+
+	ch := make(chan worker.Job)
+	go repository.ListAllPacks(repo, ch, done)
+
+	idx := newIndex()
+
+	for job := range ch {
+		packID := job.Data.(backend.ID)
+		if job.Error != nil {
+			fmt.Fprintf(os.Stderr, "unable to list pack %v: %v\n", packID.Str(), job.Error)
+			continue
+		}
+
+		j := job.Result.(repository.ListAllPacksResult)
+
+		debug.Log("Index.New", "pack %v contains %d blobs", packID.Str(), len(j.Entries))
+
+		if _, ok := idx.Packs[packID]; ok {
+			return nil, fmt.Errorf("pack %v processed twice", packID.Str())
+		}
+		p := &Pack{Entries: j.Entries}
+		idx.Packs[packID] = p
+	}
+
+	return idx, nil
+}
+
+const loadIndexParallelism = 20
+
+type packJSON struct {
+	ID    backend.ID `json:"id"`
+	Blobs []blobJSON `json:"blobs"`
+}
+
+type blobJSON struct {
+	ID     backend.ID    `json:"id"`
+	Type   pack.BlobType `json:"type"`
+	Offset uint          `json:"offset"`
+	Length uint          `json:"length"`
+}
+
+type indexJSON struct {
+	Supersedes backend.IDs `json:"supersedes,omitempty"`
+	Packs      []*packJSON `json:"packs"`
+}
+
+func loadIndexJSON(repo *repository.Repository, id backend.ID) (*indexJSON, error) {
+	fmt.Printf("process index %v\n", id.Str())
+
+	var idx indexJSON
+	err := repo.LoadJSONUnpacked(backend.Index, id, &idx)
+	if err != nil {
+		return nil, err
+	}
+
+	return &idx, nil
+}
+
+// Load creates an index by loading all index files from the repo.
+func Load(repo *repository.Repository) (*Index, error) {
+	debug.Log("index.Load", "loading indexes")
+
+	done := make(chan struct{})
+	defer close(done)
+
+	supersedes := make(map[backend.ID]backend.IDSet)
+	results := make(map[backend.ID]map[backend.ID]Pack)
+
+	for id := range repo.List(backend.Index, done) {
+		debug.Log("index.Load", "Load index %v", id.Str())
+		idx, err := loadIndexJSON(repo, id)
+		if err != nil {
+			return nil, err
+		}
+
+		res := make(map[backend.ID]Pack)
+		supersedes[id] = backend.NewIDSet()
+		for _, sid := range idx.Supersedes {
+			debug.Log("index.Load", " index %v supersedes %v", id.Str(), sid)
+			supersedes[id].Insert(sid)
+		}
+
+		for _, jpack := range idx.Packs {
+			P := Pack{}
+			for _, blob := range jpack.Blobs {
+				P.Entries = append(P.Entries, pack.Blob{
+					ID:     blob.ID,
+					Type:   blob.Type,
+					Offset: blob.Offset,
+					Length: blob.Length,
+				})
+			}
+			res[jpack.ID] = P
+		}
+
+		results[id] = res
+	}
+
+	for superID, list := range supersedes {
+		for indexID := range list {
+			debug.Log("index.Load", " removing index %v, superseded by %v", indexID.Str(), superID.Str())
+			delete(results, indexID)
+		}
+	}
+
+	idx := newIndex()
+	for _, packs := range results {
+		for id, pack := range packs {
+			idx.Packs[id] = &Pack{Entries: pack.Entries} // copy, don't alias the loop variable
+		}
+	}
+
+	return idx, nil
+}
diff --git a/src/restic/index/index_test.go b/src/restic/index/index_test.go
new file mode 100644
index 000000000..ad15a6a25
--- /dev/null
+++ b/src/restic/index/index_test.go
@@ -0,0 +1,87 @@
+package index
+
+import (
+	"restic"
+	"restic/backend/local"
+	"restic/repository"
+	"testing"
+	"time"
+)
+
+var (
+	snapshotTime = time.Unix(1470492820, 207401672)
+	snapshots    = 3
+	depth        = 3
+)
+
+func createFilledRepo(t testing.TB, snapshots int) (*repository.Repository, func()) {
+	repo, cleanup := repository.TestRepository(t)
+
+	for i := 0; i < snapshots; i++ {
+		restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(i)*time.Second), depth)
+	}
+
+	return repo, cleanup
+}
+
+func TestIndexNew(t *testing.T) {
+	repo, cleanup := createFilledRepo(t, 3)
+	defer cleanup()
+
+	idx, err := New(repo)
+	if err != nil {
+		t.Fatalf("New() returned error %v", err)
+	}
+
+	if idx == nil {
+		t.Fatalf("New() returned nil index")
+	}
+}
+
+func TestIndexLoad(t *testing.T) {
+	repo, cleanup := createFilledRepo(t, 3)
+	defer cleanup()
+
+	idx, err := Load(repo)
+	if err != nil {
+		t.Fatalf("Load() returned error %v", err)
+	}
+
+	if idx == nil {
+		t.Fatalf("Load() returned nil index")
+	}
+}
+
+func openRepo(t testing.TB, dir, password string) *repository.Repository {
+	b, err := local.Open(dir)
+	if err != nil {
+		t.Fatalf("open backend %v failed: %v", dir, err)
+	}
+
+	r := repository.New(b)
+	err = r.SearchKey(password)
+	if err != nil {
+		t.Fatalf("unable to open repo with password: %v", err)
+	}
+
+	return r
+}
+
+func BenchmarkIndexNew(b *testing.B) {
+	repo, cleanup := createFilledRepo(b, 3)
+	defer cleanup()
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		idx, err := New(repo)
+
+		if err != nil {
+			b.Fatalf("New() returned error %v", err)
+		}
+
+		if idx == nil {
+			b.Fatalf("New() returned nil index")
+		}
+	}
+}
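Below is a minimal usage sketch, not part of the patch above, showing how the new package's main entry point might be called; it assumes an already-opened *repository.Repository, the helper name listPackContents is hypothetical, and only identifiers introduced in this diff are used.

package example

import (
	"fmt"

	"restic/index"
	"restic/repository"
)

// listPackContents (hypothetical helper, for illustration only) builds a
// fresh index from the repository's pack files and prints how many blobs
// each pack contains.
func listPackContents(repo *repository.Repository) error {
	// Scan every pack in the backend and index its contents from scratch.
	idx, err := index.New(repo)
	if err != nil {
		return err
	}

	// idx.Packs maps a pack ID to the blobs stored in that pack file.
	for packID, p := range idx.Packs {
		fmt.Printf("pack %v holds %d blobs\n", packID.Str(), len(p.Entries))
	}

	return nil
}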