resumable.go raw

// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
   4  
   5  package gensupport
   6  
   7  import (
   8  	"context"
   9  	"encoding/base64"
  10  	"encoding/binary"
  11  	"errors"
  12  	"fmt"
  13  	"io"
  14  	"net/http"
  15  	"strings"
  16  	"sync"
  17  	"time"
  18  
  19  	"github.com/google/uuid"
  20  	"google.golang.org/api/internal"
  21  )
  22  
  const (
	// hashHeaderKey is the request header that carries the full-object
	// checksum on the final chunk of an upload.
	hashHeaderKey = "X-Goog-Hash"

	// crc32cPrefix names the checksum algorithm inside the hashHeaderKey
	// value, which is sent as "crc32c=<base64 of the big-endian CRC32C>".
	crc32cPrefix = "crc32c"
)
  27  
  28  // ResumableUpload is used by the generated APIs to provide resumable uploads.
  29  // It is not used by developers directly.
  30  type ResumableUpload struct {
  31  	Client *http.Client
  32  	// URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
  33  	URI       string
  34  	UserAgent string // User-Agent for header of the request
  35  	// Media is the object being uploaded.
  36  	Media *MediaBuffer
  37  	// MediaType defines the media type, e.g. "image/jpeg".
  38  	MediaType string
  39  
  40  	mu       sync.Mutex // guards progress
  41  	progress int64      // number of bytes uploaded so far
  42  
  43  	// Callback is an optional function that will be periodically called with the cumulative number of bytes uploaded.
  44  	Callback func(int64)
  45  
  46  	// Retry optionally configures retries for requests made against the upload.
  47  	Retry *RetryConfig
  48  
  49  	// ChunkRetryDeadline configures the per-chunk deadline after which no further
  50  	// retries should happen.
  51  	ChunkRetryDeadline time.Duration
  52  
  53  	// ChunkTransferTimeout configures the per-chunk transfer timeout. If a chunk upload stalls for longer than
  54  	// this duration, the upload will be retried.
  55  	ChunkTransferTimeout time.Duration
  56  
  57  	// Track current request invocation ID and attempt count for retry metrics
  58  	// and idempotency headers.
  59  	invocationID string
  60  	attempts     int
  61  }
  62  
  63  // Progress returns the number of bytes uploaded at this point.
  64  func (rx *ResumableUpload) Progress() int64 {
  65  	rx.mu.Lock()
  66  	defer rx.mu.Unlock()
  67  	return rx.progress
  68  }
  69  
  70  // doUploadRequest performs a single HTTP request to upload data.
  71  // off specifies the offset in rx.Media from which data is drawn.
  72  // size is the number of bytes in data.
  73  // final specifies whether data is the final chunk to be uploaded.
  74  func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
  75  	req, err := http.NewRequest("POST", rx.URI, data)
  76  	if err != nil {
  77  		return nil, err
  78  	}
  79  
  80  	req.ContentLength = size
  81  	var contentRange string
  82  	if final {
  83  		if size == 0 {
  84  			contentRange = fmt.Sprintf("bytes */%v", off)
  85  		} else {
  86  			contentRange = fmt.Sprintf("bytes %v-%v/%v", off, off+size-1, off+size)
  87  		}
  88  	} else {
  89  		contentRange = fmt.Sprintf("bytes %v-%v/*", off, off+size-1)
  90  	}
  91  	req.Header.Set("Content-Range", contentRange)
  92  	req.Header.Set("Content-Type", rx.MediaType)
  93  	req.Header.Set("User-Agent", rx.UserAgent)
  94  
  95  	// TODO(b/274504690): Consider dropping gccl-invocation-id key since it
  96  	// duplicates the X-Goog-Gcs-Idempotency-Token header (added in v0.115.0).
  97  	baseXGoogHeader := "gl-go/" + GoVersion() + " gdcl/" + internal.Version
  98  	invocationHeader := fmt.Sprintf("gccl-invocation-id/%s gccl-attempt-count/%d", rx.invocationID, rx.attempts)
  99  	req.Header.Set("X-Goog-Api-Client", strings.Join([]string{baseXGoogHeader, invocationHeader}, " "))
 100  
 101  	// Set idempotency token header which is used by GCS uploads.
 102  	req.Header.Set("X-Goog-Gcs-Idempotency-Token", rx.invocationID)
 103  
 104  	// Google's upload endpoint uses status code 308 for a
 105  	// different purpose than the "308 Permanent Redirect"
 106  	// since-standardized in RFC 7238. Because of the conflict in
 107  	// semantics, Google added this new request header which
 108  	// causes it to not use "308" and instead reply with 200 OK
 109  	// and sets the upload-specific "X-HTTP-Status-Code-Override:
 110  	// 308" response header.
 111  	req.Header.Set("X-GUploader-No-308", "yes")
 112  
 113  	// Server accepts checksum only on final request through header.
 114  	if final && rx.Media.enableAutoChecksum {
 115  		req.Header.Set(hashHeaderKey, fmt.Sprintf("%v=%v", crc32cPrefix, encodeUint32(rx.Media.fullObjectChecksum)))
 116  	}
 117  
 118  	return SendRequest(ctx, rx.Client, req)
 119  }
 120  
 // statusResumeIncomplete reports whether resp signals "resume incomplete",
// i.e. the server accepted the chunk but expects more data. Because requests
// are sent with "X-GUploader-No-308: yes", the server conveys this through the
// X-Http-Status-Code-Override response header rather than a 308 status.
func statusResumeIncomplete(resp *http.Response) bool {
	if resp == nil {
		return false
	}
	return resp.Header.Get("X-Http-Status-Code-Override") == "308"
}
 126  
 127  // reportProgress calls a user-supplied callback to report upload progress.
 128  // If old==updated, the callback is not called.
 129  func (rx *ResumableUpload) reportProgress(old, updated int64) {
 130  	if updated-old == 0 {
 131  		return
 132  	}
 133  	rx.mu.Lock()
 134  	rx.progress = updated
 135  	rx.mu.Unlock()
 136  	if rx.Callback != nil {
 137  		rx.Callback(updated)
 138  	}
 139  }
 140  
 141  // transferChunk performs a single HTTP request to upload a single chunk.
 142  // It uses a goroutine to perform the upload and a timer to enforce ChunkTransferTimeout.
 143  func (rx *ResumableUpload) transferChunk(ctx context.Context, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
 144  	// If no timeout is specified, perform the request synchronously without a timer.
 145  	if rx.ChunkTransferTimeout == 0 {
 146  		res, err := rx.doUploadRequest(ctx, chunk, off, size, done)
 147  		if err != nil {
 148  			return res, err
 149  		}
 150  		return res, nil
 151  	}
 152  
 153  	// Start a timer for the ChunkTransferTimeout duration.
 154  	timer := time.NewTimer(rx.ChunkTransferTimeout)
 155  
 156  	// A struct to hold the result from the goroutine.
 157  	type uploadResult struct {
 158  		res *http.Response
 159  		err error
 160  	}
 161  
 162  	// A buffered channel to receive the result of the upload.
 163  	resultCh := make(chan uploadResult, 1)
 164  
 165  	// Create a cancellable context for the upload request. This allows us to
 166  	// abort the request if the timer fires first.
 167  	rCtx, cancel := context.WithCancel(ctx)
 168  	// NOTE: We do NOT use `defer cancel()` here. The context must remain valid
 169  	// for the caller to read the response body of a successful request.
 170  	// Cancellation is handled manually on timeout paths.
 171  
 172  	// Starting the chunk upload in parallel.
 173  	go func() {
 174  		res, err := rx.doUploadRequest(rCtx, chunk, off, size, done)
 175  		resultCh <- uploadResult{res: res, err: err}
 176  	}()
 177  
 178  	// Wait for timer to fire or result channel to have the uploadResult or ctx to be cancelled.
 179  	select {
 180  	// Note: Calling cancel() will guarantee that the goroutine finishes,
 181  	// so these two cases will never block forever on draining the resultCh.
 182  	case <-ctx.Done():
 183  		// Context is cancelled for the overall upload.
 184  		cancel()
 185  		// Drain resultCh.
 186  		<-resultCh
 187  		return nil, ctx.Err()
 188  	case <-timer.C:
 189  		// Chunk Transfer timer fired before resultCh so we return context.DeadlineExceeded.
 190  		cancel()
 191  		// Drain resultCh.
 192  		<-resultCh
 193  		return nil, context.DeadlineExceeded
 194  	case result := <-resultCh:
 195  		// Handle the result from the upload.
 196  		if result.err != nil {
 197  			return result.res, result.err
 198  		}
 199  		return result.res, nil
 200  	}
 201  }
 202  
 203  // uploadChunkWithRetries attempts to upload a single chunk, with retries
 204  // within ChunkRetryDeadline if ChunkTransferTimeout is non-zero.
 205  func (rx *ResumableUpload) uploadChunkWithRetries(ctx context.Context, chunk io.Reader, off, size int64, done bool) (*http.Response, error) {
 206  	// Configure error retryable criteria.
 207  	shouldRetry := rx.Retry.errorFunc()
 208  
 209  	// Configure single chunk retry deadline.
 210  	chunkRetryDeadline := defaultRetryDeadline
 211  	if rx.ChunkRetryDeadline != 0 {
 212  		chunkRetryDeadline = rx.ChunkRetryDeadline
 213  	}
 214  
 215  	// Each chunk gets its own initialized-at-zero backoff and invocation ID.
 216  	bo := rx.Retry.backoff()
 217  	quitAfterTimer := time.NewTimer(chunkRetryDeadline)
 218  	defer quitAfterTimer.Stop()
 219  	rx.attempts = 1
 220  	rx.invocationID = uuid.New().String()
 221  
 222  	var pause time.Duration
 223  	var resp *http.Response
 224  	var err error
 225  
 226  	// Retry loop for a single chunk.
 227  	for {
 228  		// Wait for the backoff period, unless the context is canceled or the
 229  		// retry deadline is hit.
 230  		backoffPauseTimer := time.NewTimer(pause)
 231  		select {
 232  		case <-ctx.Done():
 233  			backoffPauseTimer.Stop()
 234  			if err == nil {
 235  				err = ctx.Err()
 236  			}
 237  			return resp, err
 238  		case <-backoffPauseTimer.C:
 239  		case <-quitAfterTimer.C:
 240  			backoffPauseTimer.Stop()
 241  			return resp, err
 242  		}
 243  		backoffPauseTimer.Stop()
 244  
 245  		// Check for context cancellation or timeout once more. If more than one
 246  		// case in the select statement above was satisfied at the same time, Go
 247  		// will choose one arbitrarily.
 248  		// That can cause an operation to go through even if the context was
 249  		// canceled before or the timeout was reached.
 250  		select {
 251  		case <-ctx.Done():
 252  			if err == nil {
 253  				err = ctx.Err()
 254  			}
 255  			return resp, err
 256  		case <-quitAfterTimer.C:
 257  			return resp, err
 258  		default:
 259  		}
 260  
 261  		// We close the response's body here, since we definitely will not
 262  		// return `resp` now. If we close it before the select case above, a
 263  		// timer may fire and cause us to return a response with a closed body
 264  		// (in which case, the caller will not get the error message in the body).
 265  		if resp != nil && resp.Body != nil {
 266  			// Read the body to EOF - if the Body is not both read to EOF and closed,
 267  			// the Client's underlying RoundTripper may not be able to re-use the
 268  			// persistent TCP connection to the server for a subsequent "keep-alive" request.
 269  			// See https://pkg.go.dev/net/http#Client.Do
 270  			io.Copy(io.Discard, resp.Body)
 271  			resp.Body.Close()
 272  		}
 273  
 274  		resp, err = rx.transferChunk(ctx, chunk, off, size, done)
 275  		status := 0
 276  		if resp != nil {
 277  			status = resp.StatusCode
 278  		}
 279  		// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
 280  		// this file), so we don't expect to get a 308.
 281  		if status == 308 {
 282  			return nil, errors.New("unexpected 308 response status code")
 283  		}
 284  		// Chunk upload should be retried if the ChunkTransferTimeout is non-zero and err is context deadline exceeded
 285  		// or we encounter a retryable error.
 286  		if (rx.ChunkTransferTimeout != 0 && errors.Is(err, context.DeadlineExceeded)) || shouldRetry(status, err) {
 287  			rx.attempts++
 288  			pause = bo.Pause()
 289  			chunk, _, _, _ = rx.Media.Chunk()
 290  			continue
 291  		}
 292  		return resp, err
 293  	}
 294  }
 295  
 296  // Upload starts the process of a resumable upload with a cancellable context.
 297  // It is called from the auto-generated API code and is not visible to the user.
 298  // Before sending an HTTP request, Upload calls any registered hook functions,
 299  // and calls the returned functions after the request returns (see send.go).
 300  // rx is private to the auto-generated API code.
 301  // Exactly one of resp or err will be nil.  If resp is non-nil, the caller must call resp.Body.Close.
 302  // Upload does not parse the response into the error on a non 200 response;
 303  // it is the caller's responsibility to call resp.Body.Close.
 304  func (rx *ResumableUpload) Upload(ctx context.Context) (*http.Response, error) {
 305  	for {
 306  		chunk, off, size, err := rx.Media.Chunk()
 307  		done := err == io.EOF
 308  		if !done && err != nil {
 309  			return nil, err
 310  		}
 311  
 312  		resp, err := rx.uploadChunkWithRetries(ctx, chunk, off, int64(size), done)
 313  		// There are a couple of cases where it's possible for err and resp to both
 314  		// be non-nil. However, we expose a simpler contract to our callers: exactly
 315  		// one of resp and err will be non-nil. This means that any response body
 316  		// must be closed here before returning a non-nil error.
 317  		if err != nil {
 318  			if resp != nil && resp.Body != nil {
 319  				resp.Body.Close()
 320  			}
 321  			// If there were retries, indicate this in the error message and wrap the final error.
 322  			if rx.attempts > 1 {
 323  				return nil, fmt.Errorf("chunk upload failed after %d attempts, final error: %w", rx.attempts, err)
 324  			}
 325  			return nil, err
 326  		}
 327  
 328  		// This case is very unlikely but possible only if rx.ChunkRetryDeadline is
 329  		// set to a very small value, in which case no requests will be sent before
 330  		// the deadline. Return an error to avoid causing a panic.
 331  		if resp == nil {
 332  			return nil, fmt.Errorf("upload request to %v not sent, choose larger value for ChunkRetryDeadline", rx.URI)
 333  		}
 334  		if resp.StatusCode == http.StatusOK {
 335  			rx.reportProgress(off, off+int64(size))
 336  		}
 337  		if statusResumeIncomplete(resp) {
 338  			// The upload is not yet complete, but the server has acknowledged this chunk.
 339  			// We don't have anything to do with the response body.
 340  			if resp.Body != nil {
 341  				io.Copy(io.Discard, resp.Body)
 342  				resp.Body.Close()
 343  			}
 344  			rx.Media.Next()
 345  			continue
 346  		}
 347  		return resp, nil
 348  	}
 349  }
 350  
 // encodeUint32 returns the standard (padded) Base64 encoding of u's four
// bytes in big-endian order.
func encodeUint32(u uint32) string {
	var raw [4]byte
	binary.BigEndian.PutUint32(raw[:], u)
	return base64.StdEncoding.EncodeToString(raw[:])
}
 357