From 2f1137bac49d6b65e632d5d11224937d4ffae20d Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 15 Feb 2015 12:57:09 +0100
Subject: [PATCH] Add pipe package

---
 pipe/generic_test.go |  36 ++++++++
 pipe/pipe.go         | 165 +++++++++++++++++++++++++++++++++
 pipe/pipe_test.go    | 214 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 415 insertions(+)
 create mode 100644 pipe/generic_test.go
 create mode 100644 pipe/pipe.go
 create mode 100644 pipe/pipe_test.go

diff --git a/pipe/generic_test.go b/pipe/generic_test.go
new file mode 100644
index 000000000..983f3e2fc
--- /dev/null
+++ b/pipe/generic_test.go
@@ -0,0 +1,36 @@
+package pipe_test
+
+import (
+	"fmt"
+	"path/filepath"
+	"reflect"
+	"runtime"
+	"testing"
+)
+
+// assert fails the test if the condition is false.
+func assert(tb testing.TB, condition bool, msg string, v ...interface{}) {
+	if !condition {
+		_, file, line, _ := runtime.Caller(1)
+		fmt.Printf("\033[31m%s:%d: "+msg+"\033[39m\n\n", append([]interface{}{filepath.Base(file), line}, v...)...)
+		tb.FailNow()
+	}
+}
+
+// ok fails the test if an err is not nil.
+func ok(tb testing.TB, err error) {
+	if err != nil {
+		_, file, line, _ := runtime.Caller(1)
+		fmt.Printf("\033[31m%s:%d: unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error())
+		tb.FailNow()
+	}
+}
+
+// equals fails the test if exp is not equal to act.
+func equals(tb testing.TB, exp, act interface{}) {
+	if !reflect.DeepEqual(exp, act) {
+		_, file, line, _ := runtime.Caller(1)
+		fmt.Printf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act)
+		tb.FailNow()
+	}
+}
diff --git a/pipe/pipe.go b/pipe/pipe.go
new file mode 100644
index 000000000..b8a08ec1d
--- /dev/null
+++ b/pipe/pipe.go
@@ -0,0 +1,165 @@
+package pipe
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sort"
+)
+
+// errCancelled is returned by the walker when the done channel is closed
+// before the walk has finished.
+var errCancelled = errors.New("walk cancelled")
+
+// Entry is sent for a directory entry that is not walked as a directory: a
+// regular file, or an entry for which os.Lstat failed (Error is set then).
+// The consumer reports the outcome of processing it on Result.
+type Entry struct {
+	Path   string
+	Info   os.FileInfo
+	Error  error
+	Result chan<- interface{}
+}
+
+// Dir is sent for a directory, after everything below it has been sent.
+// Error is set when the directory could not be read.
+type Dir struct {
+	Path  string
+	Error error
+
+	// Entries holds, in name order, the receiving end of the Result channel
+	// of every Entry and Dir sent for the content of this directory.
+	Entries [](<-chan interface{})
+	Result  chan<- interface{}
+}
+
+// readDirNames reads the directory named by dirname and returns
+// a sorted list of directory entries.
+// taken from filepath/path.go
+func readDirNames(dirname string) ([]string, error) {
+	f, err := os.Open(dirname)
+	if err != nil {
+		return nil, err
+	}
+	names, err := f.Readdirnames(-1)
+	f.Close()
+	if err != nil {
+		return nil, err
+	}
+	sort.Strings(names)
+	return names, nil
+}
+
+// isDir reports whether fi describes a directory.
+func isDir(fi os.FileInfo) bool {
+	return fi.IsDir()
+}
+
+// isFile reports whether fi describes a regular file.
+func isFile(fi os.FileInfo) bool {
+	return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0
+}
+
+// sendEntry delivers e on entCh, or gives up with errCancelled once done is
+// closed.
+func sendEntry(done <-chan struct{}, entCh chan<- Entry, e Entry) error {
+	select {
+	case entCh <- e:
+		return nil
+	case <-done:
+		return errCancelled
+	}
+}
+
+// sendDir delivers d on dirCh, or gives up with errCancelled once done is
+// closed.
+func sendDir(done <-chan struct{}, dirCh chan<- Dir, d Dir) error {
+	select {
+	case dirCh <- d:
+		return nil
+	case <-done:
+		return errCancelled
+	}
+}
+
+// walkDir sends everything below the directory path and then the directory
+// itself, with res as its Result channel. The caller must already know that
+// path is a directory.
+func walkDir(path string, done <-chan struct{}, entCh chan<- Entry, dirCh chan<- Dir, res chan<- interface{}) error {
+	names, err := readDirNames(path)
+	if err != nil {
+		// Hand res to the consumer even on failure: whoever waits for this
+		// directory's result would block forever otherwise.
+		if sendErr := sendDir(done, dirCh, Dir{Path: path, Error: err, Result: res}); sendErr != nil {
+			return sendErr
+		}
+		return err
+	}
+
+	entries := make([]<-chan interface{}, 0, len(names))
+
+	for _, name := range names {
+		subpath := filepath.Join(path, name)
+
+		fi, statErr := os.Lstat(subpath)
+		if statErr == nil && !isFile(fi) && !isDir(fi) {
+			// Nothing is sent for symlinks, devices and the like, so
+			// nobody would ever answer on a result channel for them.
+			continue
+		}
+
+		// Every item sent for this directory gets its own result channel,
+		// which the consumer of the Dir can wait on via Entries.
+		ch := make(chan interface{}, 1)
+		entries = append(entries, ch)
+
+		var sendErr error
+		switch {
+		case statErr != nil:
+			sendErr = sendEntry(done, entCh, Entry{Path: subpath, Info: fi, Error: statErr, Result: ch})
+		case isDir(fi):
+			sendErr = walkDir(subpath, done, entCh, dirCh, ch)
+		default:
+			sendErr = sendEntry(done, entCh, Entry{Path: subpath, Info: fi, Result: ch})
+		}
+
+		// A subdirectory that cannot be read is reported through Dir.Error
+		// and does not abort the walk; only cancellation does.
+		if sendErr == errCancelled {
+			return sendErr
+		}
+	}
+
+	return sendDir(done, dirCh, Dir{Path: path, Entries: entries, Result: res})
+}
+
+// walk checks that path is a directory and then walks it, see walkDir.
+func walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir, res chan<- interface{}) error {
+	info, err := os.Lstat(path)
+	if err != nil {
+		return err
+	}
+
+	if !info.IsDir() {
+		return fmt.Errorf("path is not a directory, cannot walk: %s", path)
+	}
+
+	return walkDir(path, done, entCh, dirCh, res)
+}
+
+// Walk takes a path and sends an Entry for each file and a Dir for each
+// directory it finds below the path. Both channels are closed before Walk
+// returns. When the channel done is closed, processing stops and an error
+// is returned. The returned channel is the Result channel of the Dir for path.
+//
+// A consumer that serves both channels with the same goroutines and waits on
+// Dir.Entries should leave entCh unbuffered; otherwise all of them can end
+// up blocked on directories whose entries are still queued.
+func Walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir) (<-chan interface{}, error) {
+	resCh := make(chan interface{}, 1)
+	err := walk(path, done, entCh, dirCh, resCh)
+	close(entCh)
+	close(dirCh)
+	return resCh, err
+}
diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go
new file mode 100644
index 000000000..1564b2c4f
--- /dev/null
+++ b/pipe/pipe_test.go
@@ -0,0 +1,214 @@
+package pipe_test
+
+import (
+	"flag"
+	"os"
+	"path/filepath"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/restic/restic/pipe"
+)
+
+var testWalkerPath = flag.String("test.walkerpath", ".", "pipeline walker testpath (default: .)")
+var maxWorkers = flag.Int("test.workers", 100, "max concurrency (default: 100)")
+
+// isFile reports whether fi describes a regular file.
+func isFile(fi os.FileInfo) bool {
+	return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0
+}
+
+// stats counts the directories and regular files found below a path.
+type stats struct {
+	dirs, files int
+}
+
+// statPath counts the directories and regular files below path.
+func statPath(path string) (stats, error) {
+	var s stats
+
+	// count files and directories with filepath.Walk()
+	err := filepath.Walk(path, func(p string, fi os.FileInfo, err error) error {
+		if fi == nil {
+			return err
+		}
+
+		if fi.IsDir() {
+			s.dirs++
+		} else if isFile(fi) {
+			s.files++
+		}
+
+		return err
+	})
+
+	return s, err
+}
+
+// TestPipelineWalker checks that pipe.Walk reports as many directories and
+// files as filepath.Walk finds below the same path.
+func TestPipelineWalker(t *testing.T) {
+	if *testWalkerPath == "" {
+		t.Skipf("walkerpah not set, skipping TestPipelineWalker")
+	}
+
+	before, err := statPath(*testWalkerPath)
+	ok(t, err)
+
+	t.Logf("walking path %s with %d dirs, %d files", *testWalkerPath,
+		before.dirs, before.files)
+
+	after := stats{}
+	m := sync.Mutex{}
+
+	var wg sync.WaitGroup
+	worker := func(done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) {
+		defer wg.Done()
+		for {
+			select {
+			case e, ok := <-entCh:
+				if !ok {
+					// channel is closed
+					return
+				}
+
+				m.Lock()
+				after.files++
+				m.Unlock()
+
+				e.Result <- true
+
+			case dir, ok := <-dirCh:
+				if !ok {
+					// channel is closed
+					return
+				}
+
+				// wait for all content
+				for _, ch := range dir.Entries {
+					<-ch
+				}
+
+				m.Lock()
+				after.dirs++
+				m.Unlock()
+
+				dir.Result <- true
+			case <-done:
+				// pipeline was cancelled
+				return
+			}
+		}
+	}
+
+	done := make(chan struct{})
+	entCh := make(chan pipe.Entry)
+	dirCh := make(chan pipe.Dir)
+
+	for i := 0; i < *maxWorkers; i++ {
+		wg.Add(1)
+		go worker(done, entCh, dirCh)
+	}
+
+	resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh)
+	ok(t, err)
+
+	// wait for all workers to terminate
+	wg.Wait()
+
+	// wait for top-level blob
+	<-resCh
+
+	t.Logf("walked path %s with %d dirs, %d files", *testWalkerPath,
+		after.dirs, after.files)
+
+	assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
+}
+
+// BenchmarkPipelineWalker walks the test path with workers that sleep for
+// every file and records the longest time a directory waited for its content.
+func BenchmarkPipelineWalker(b *testing.B) {
+	if *testWalkerPath == "" {
+		b.Skipf("walkerpah not set, skipping BenchPipelineWalker")
+	}
+
+	var max time.Duration
+	m := sync.Mutex{}
+
+	worker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) {
+		defer wg.Done()
+		for {
+			select {
+			case e, ok := <-entCh:
+				if !ok {
+					// channel is closed
+					return
+				}
+
+				// fmt.Printf("file: %v\n", e.Path)
+
+				// simulate backup
+				time.Sleep(10 * time.Millisecond)
+
+				e.Result <- true
+
+			case dir, ok := <-dirCh:
+				if !ok {
+					// channel is closed
+					return
+				}
+
+				start := time.Now()
+
+				// wait for all content
+				for _, ch := range dir.Entries {
+					<-ch
+				}
+
+				d := time.Since(start)
+				m.Lock()
+				if d > max {
+					max = d
+				}
+
+				// fmt.Printf("dir %v: %v\n", d, dir.Path)
+				m.Unlock()
+
+				dir.Result <- true
+			case <-done:
+				// pipeline was cancelled
+				return
+			}
+		}
+	}
+
+	for i := 0; i < b.N; i++ {
+		done := make(chan struct{})
+
+		// The channels are unbuffered on purpose: every worker serves both
+		// of them and returns as soon as one is closed, so items still
+		// queued when Walk returns could be left behind, and a worker
+		// waiting on dir.Entries could wait forever for a queued entry.
+		entCh := make(chan pipe.Entry)
+		dirCh := make(chan pipe.Dir)
+
+		var wg sync.WaitGroup
+		b.Logf("starting %d workers", *maxWorkers)
+		for j := 0; j < *maxWorkers; j++ {
+			wg.Add(1)
+			go worker(&wg, done, entCh, dirCh)
+		}
+
+		resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh)
+		ok(b, err)
+
+		// wait for all workers to terminate
+		wg.Wait()
+
+		// wait for final result
+		<-resCh
+	}
+
+	b.Logf("max duration for a dir: %v", max)
+}