Compare commits

...

13 Commits

Author SHA1 Message Date
Richard Anthony 58761f3527
Merge ac10f62295 into e4f9bce384 2024-05-10 08:36:28 +10:00
Michael Eischer e4f9bce384
Merge pull request #4792 from restic/request-watchdog
backend: enforce that backend HTTP requests make progress
2024-05-09 23:55:30 +02:00
Michael Eischer 3740700ddc add http timeouts to changelog 2024-05-09 23:46:17 +02:00
Michael Eischer ebd01a4675 backend: add tests for watchdogRoundTripper 2024-05-09 23:46:17 +02:00
Michael Eischer 8778670232 backend: cancel stuck http requests
requests that make no upload or download progress within a timeout are
canceled.
2024-05-09 23:46:17 +02:00
Michael Eischer 0987c731ec backend: configure protocol-level connection health checks
This should detect a connection that is stuck for more than 2 minutes.
2024-05-09 23:46:17 +02:00
Richard Anthony ac10f62295 fixup! Revert "feat: HTTP API" 2023-01-28 19:48:20 +11:00
Richard Anthony ad1f844cea Revert "feat: HTTP API"
This reverts commit 1b4d6e2777.
2023-01-28 06:49:31 +11:00
Richard Anthony bd25620880 cleanup 2023-01-27 19:09:03 +11:00
Richard Anthony 1b4d6e2777 feat: HTTP API
Expose internal functions to allow for http api and or SDK
2023-01-27 19:07:28 +11:00
Richard Anthony 619e243e60 wip: attempt to build for wasm 2022-12-28 10:56:35 +11:00
Richard Anthony 73c7780a03 wip: build with tinygo 2022-12-28 10:56:35 +11:00
Richard Anthony 7ca98f6cd8
Create wasi.yml 2022-12-25 13:29:59 +11:00
9 changed files with 404 additions and 2 deletions

32
activ.go Normal file
View File

@ -0,0 +1,32 @@
package restic
import (
"context"
"time"
"github.com/restic/restic/internal/restic"
)
type Snapshot struct {
paths, tags []string
hostname string
time time.Time
}
type Filter struct {
ctx context.Context
be restic.Lister
loader restic.LoaderUnpacked
hosts, snapshotIDs []string
tags restic.TagList
paths []restic.TagList
cb restic.SnapshotFindCb
}
func New(s *Snapshot) (*restic.Snapshot, error) {
return restic.NewSnapshot(s.paths, s.tags, s.hostname, s.time)
}
func Find(f *Filter) {
restic.FindFilteredSnapshots(f.ctx, f.be, f.loader, f.hosts, f.paths, f.tags, f.snapshotIDs, f.cb)
}

View File

@ -4,5 +4,14 @@ Restic now downloads pack files in large chunks instead of using a streaming
download. This prevents failures due to interrupted streams. The `restore`
command now also retries downloading individual blobs that cannot be retrieved.
HTTP requests that are stuck for more than two minutes while uploading or
downloading are now forcibly interrupted. This ensures that stuck requests are
retried after a short timeout. These new request timeouts can temporarily be
disabled by setting the environment variable
`RESTIC_FEATURES=http-timeouts=false`. Note that this feature flag will be
removed in the next minor restic version.
https://github.com/restic/restic/issues/4627
https://github.com/restic/restic/issues/4193
https://github.com/restic/restic/pull/4605
https://github.com/restic/restic/pull/4792

View File

@ -13,6 +13,8 @@ import (
"github.com/peterbourgon/unixtransport"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/feature"
"golang.org/x/net/http2"
)
// TransportOptions collects various options which can be set for an HTTP based
@ -74,7 +76,6 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) {
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
@ -83,6 +84,17 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) {
TLSClientConfig: &tls.Config{},
}
// ensure that http2 connections are closed if they are broken
h2, err := http2.ConfigureTransports(tr)
if err != nil {
panic(err)
}
if feature.Flag.Enabled(feature.HTTPTimeouts) {
h2.WriteByteTimeout = 120 * time.Second
h2.ReadIdleTimeout = 60 * time.Second
h2.PingTimeout = 60 * time.Second
}
unixtransport.Register(tr)
if opts.InsecureTLS {
@ -119,6 +131,11 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) {
tr.TLSClientConfig.RootCAs = pool
}
rt := http.RoundTripper(tr)
if feature.Flag.Enabled(feature.HTTPTimeouts) {
rt = newWatchdogRoundtripper(rt, 120*time.Second, 128*1024)
}
// wrap in the debug round tripper (if active)
return debug.RoundTripper(tr), nil
return debug.RoundTripper(rt), nil
}

View File

@ -0,0 +1,104 @@
package backend
import (
"context"
"io"
"net/http"
"time"
)
// watchdogRoundtripper cancels an http request if an upload or download did not make progress
// within timeout. The time between fully sending the request and receiving an response is also
// limited by this timeout. This ensures that stuck requests are cancelled after some time.
//
// The roundtriper makes the assumption that the upload and download happen continuously. In particular,
// the caller must not make long pauses between individual read requests from the response body.
type watchdogRoundtripper struct {
rt http.RoundTripper
timeout time.Duration
chunkSize int
}
var _ http.RoundTripper = &watchdogRoundtripper{}
func newWatchdogRoundtripper(rt http.RoundTripper, timeout time.Duration, chunkSize int) *watchdogRoundtripper {
return &watchdogRoundtripper{
rt: rt,
timeout: timeout,
chunkSize: chunkSize,
}
}
func (w *watchdogRoundtripper) RoundTrip(req *http.Request) (*http.Response, error) {
timer := time.NewTimer(w.timeout)
ctx, cancel := context.WithCancel(req.Context())
// cancel context if timer expires
go func() {
defer timer.Stop()
select {
case <-timer.C:
cancel()
case <-ctx.Done():
}
}()
kick := func() {
timer.Reset(w.timeout)
}
req = req.Clone(ctx)
if req.Body != nil {
// kick watchdog timer as long as uploading makes progress
req.Body = newWatchdogReadCloser(req.Body, w.chunkSize, kick, nil)
}
resp, err := w.rt.RoundTrip(req)
if err != nil {
return nil, err
}
// kick watchdog timer as long as downloading makes progress
// cancel context to stop goroutine once response body is closed
resp.Body = newWatchdogReadCloser(resp.Body, w.chunkSize, kick, cancel)
return resp, nil
}
func newWatchdogReadCloser(rc io.ReadCloser, chunkSize int, kick func(), close func()) *watchdogReadCloser {
return &watchdogReadCloser{
rc: rc,
chunkSize: chunkSize,
kick: kick,
close: close,
}
}
type watchdogReadCloser struct {
rc io.ReadCloser
chunkSize int
kick func()
close func()
}
var _ io.ReadCloser = &watchdogReadCloser{}
func (w *watchdogReadCloser) Read(p []byte) (n int, err error) {
w.kick()
// Read is not required to fill the whole passed in byte slice
// Thus, keep things simple and just stay within our chunkSize.
if len(p) > w.chunkSize {
p = p[:w.chunkSize]
}
n, err = w.rc.Read(p)
w.kick()
return n, err
}
func (w *watchdogReadCloser) Close() error {
if w.close != nil {
w.close()
}
return w.rc.Close()
}

View File

@ -0,0 +1,201 @@
package backend
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
rtest "github.com/restic/restic/internal/test"
)
func TestRead(t *testing.T) {
data := []byte("abcdef")
var ctr int
kick := func() {
ctr++
}
var closed bool
onClose := func() {
closed = true
}
wd := newWatchdogReadCloser(io.NopCloser(bytes.NewReader(data)), 1, kick, onClose)
out, err := io.ReadAll(wd)
rtest.OK(t, err)
rtest.Equals(t, data, out, "data mismatch")
// the EOF read also triggers the kick function
rtest.Equals(t, len(data)*2+2, ctr, "unexpected number of kick calls")
rtest.Equals(t, false, closed, "close function called too early")
rtest.OK(t, wd.Close())
rtest.Equals(t, true, closed, "close function not called")
}
func TestRoundtrip(t *testing.T) {
t.Parallel()
// at the higher delay values, it takes longer to transmit the request/response body
// than the roundTripper timeout
for _, delay := range []int{0, 1, 10, 20} {
t.Run(fmt.Sprintf("%v", delay), func(t *testing.T) {
msg := []byte("ping-pong-data")
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
// slowly send the reply
for len(data) >= 2 {
_, _ = w.Write(data[:2])
w.(http.Flusher).Flush()
data = data[2:]
time.Sleep(time.Duration(delay) * time.Millisecond)
}
_, _ = w.Write(data)
}))
defer srv.Close()
rt := newWatchdogRoundtripper(http.DefaultTransport, 50*time.Millisecond, 2)
req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(newSlowReader(bytes.NewReader(msg), time.Duration(delay)*time.Millisecond)))
rtest.OK(t, err)
resp, err := rt.RoundTrip(req)
rtest.OK(t, err)
rtest.Equals(t, 200, resp.StatusCode, "unexpected status code")
response, err := io.ReadAll(resp.Body)
rtest.OK(t, err)
rtest.Equals(t, msg, response, "unexpected response")
rtest.OK(t, resp.Body.Close())
})
}
}
func TestCanceledRoundtrip(t *testing.T) {
rt := newWatchdogRoundtripper(http.DefaultTransport, time.Second, 2)
ctx, cancel := context.WithCancel(context.Background())
cancel()
req, err := http.NewRequestWithContext(ctx, "GET", "http://some.random.url.dfdgsfg", nil)
rtest.OK(t, err)
resp, err := rt.RoundTrip(req)
rtest.Equals(t, context.Canceled, err)
// make linter happy
if resp != nil {
rtest.OK(t, resp.Body.Close())
}
}
type slowReader struct {
data io.Reader
delay time.Duration
}
func newSlowReader(data io.Reader, delay time.Duration) *slowReader {
return &slowReader{
data: data,
delay: delay,
}
}
func (s *slowReader) Read(p []byte) (n int, err error) {
time.Sleep(s.delay)
return s.data.Read(p)
}
func TestUploadTimeout(t *testing.T) {
t.Parallel()
msg := []byte("ping")
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(500)
return
}
t.Error("upload should have been canceled")
}))
defer srv.Close()
rt := newWatchdogRoundtripper(http.DefaultTransport, 10*time.Millisecond, 1024)
req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(newSlowReader(bytes.NewReader(msg), 100*time.Millisecond)))
rtest.OK(t, err)
resp, err := rt.RoundTrip(req)
rtest.Equals(t, context.Canceled, err)
// make linter happy
if resp != nil {
rtest.OK(t, resp.Body.Close())
}
}
func TestProcessingTimeout(t *testing.T) {
t.Parallel()
msg := []byte("ping")
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(500)
return
}
time.Sleep(100 * time.Millisecond)
w.WriteHeader(200)
}))
defer srv.Close()
rt := newWatchdogRoundtripper(http.DefaultTransport, 10*time.Millisecond, 1024)
req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(bytes.NewReader(msg)))
rtest.OK(t, err)
resp, err := rt.RoundTrip(req)
rtest.Equals(t, context.Canceled, err)
// make linter happy
if resp != nil {
rtest.OK(t, resp.Body.Close())
}
}
func TestDownloadTimeout(t *testing.T) {
t.Parallel()
msg := []byte("ping")
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
_, _ = w.Write(data[:2])
w.(http.Flusher).Flush()
data = data[2:]
time.Sleep(100 * time.Millisecond)
_, _ = w.Write(data)
}))
defer srv.Close()
rt := newWatchdogRoundtripper(http.DefaultTransport, 10*time.Millisecond, 1024)
req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(bytes.NewReader(msg)))
rtest.OK(t, err)
resp, err := rt.RoundTrip(req)
rtest.OK(t, err)
rtest.Equals(t, 200, resp.StatusCode, "unexpected status code")
_, err = io.ReadAll(resp.Body)
rtest.Equals(t, context.Canceled, err, "response download not canceled")
rtest.OK(t, resp.Body.Close())
}

View File

@ -8,6 +8,7 @@ const (
DeprecateLegacyIndex FlagName = "deprecate-legacy-index"
DeprecateS3LegacyLayout FlagName = "deprecate-s3-legacy-layout"
DeviceIDForHardlinks FlagName = "device-id-for-hardlinks"
HTTPTimeouts FlagName = "http-timeouts"
)
func init() {
@ -15,5 +16,6 @@ func init() {
DeprecateLegacyIndex: {Type: Beta, Description: "disable support for index format used by restic 0.1.0. Use `restic repair index` to update the index if necessary."},
DeprecateS3LegacyLayout: {Type: Beta, Description: "disable support for S3 legacy layout used up to restic 0.7.0. Use `RESTIC_FEATURES=deprecate-s3-legacy-layout=false restic migrate s3_layout` to migrate your S3 repository if necessary."},
DeviceIDForHardlinks: {Type: Alpha, Description: "store deviceID only for hardlinks to reduce metadata changes for example when using btrfs subvolumes. Will be removed in a future restic version after repository format 3 is available"},
HTTPTimeouts: {Type: Beta, Description: "enforce timeouts for stuck HTTP requests."},
})
}

19
internal/fs/const_js.go Normal file
View File

@ -0,0 +1,19 @@
//go:build js
// +build js
package fs
// Flags to OpenFile wrapping those of the underlying system. Not all flags may
// be implemented on a given system.
const (
O_RDONLY int = 0
O_WRONLY int = 0
O_RDWR int = 0
O_APPEND int = 0
O_CREATE int = 0
O_EXCL int = 0
O_SYNC int = 0
O_TRUNC int = 0
O_NONBLOCK int = 0
O_NOFOLLOW int = 0
)

15
internal/fs/stat_js.go Normal file
View File

@ -0,0 +1,15 @@
//go:build js
// +build js
package fs
import (
"os"
// "syscall"
// "time"
)
// extendedStat extracts info into an ExtendedFileInfo for unix based operating systems.
func extendedStat(fi os.FileInfo) ExtendedFileInfo {
return ExtendedFileInfo{}
}

View File

@ -0,0 +1,3 @@
package signals
func setupSignals() {}