media.go raw

   1  // Copyright 2016 The Go Authors. All rights reserved.
   2  // Use of this source code is governed by a BSD-style
   3  // license that can be found in the LICENSE file.
   4  
   5  package gensupport
   6  
   7  import (
   8  	"bytes"
   9  	"fmt"
  10  	"io"
  11  	"mime"
  12  	"mime/multipart"
  13  	"net/http"
  14  	"net/textproto"
  15  	"strings"
  16  	"sync"
  17  	"time"
  18  
  19  	gax "github.com/googleapis/gax-go/v2"
  20  	"google.golang.org/api/googleapi"
  21  )
  22  
  23  type typeReader struct {
  24  	io.Reader
  25  	typ string
  26  }
  27  
  28  // multipartReader combines the contents of multiple readers to create a multipart/related HTTP body.
  29  // Close must be called if reads from the multipartReader are abandoned before reaching EOF.
  30  type multipartReader struct {
  31  	pr       *io.PipeReader
  32  	ctype    string
  33  	mu       sync.Mutex
  34  	pipeOpen bool
  35  }
  36  
  37  // boundary optionally specifies the MIME boundary
  38  func newMultipartReader(parts []typeReader, boundary string) *multipartReader {
  39  	mp := &multipartReader{pipeOpen: true}
  40  	var pw *io.PipeWriter
  41  	mp.pr, pw = io.Pipe()
  42  	mpw := multipart.NewWriter(pw)
  43  	if boundary != "" {
  44  		mpw.SetBoundary(boundary)
  45  	}
  46  	mp.ctype = "multipart/related; boundary=" + mpw.Boundary()
  47  	go func() {
  48  		for _, part := range parts {
  49  			w, err := mpw.CreatePart(typeHeader(part.typ))
  50  			if err != nil {
  51  				mpw.Close()
  52  				pw.CloseWithError(fmt.Errorf("googleapi: CreatePart failed: %v", err))
  53  				return
  54  			}
  55  			_, err = io.Copy(w, part.Reader)
  56  			if err != nil {
  57  				mpw.Close()
  58  				pw.CloseWithError(fmt.Errorf("googleapi: Copy failed: %v", err))
  59  				return
  60  			}
  61  		}
  62  
  63  		mpw.Close()
  64  		pw.Close()
  65  	}()
  66  	return mp
  67  }
  68  
  69  func (mp *multipartReader) Read(data []byte) (n int, err error) {
  70  	return mp.pr.Read(data)
  71  }
  72  
  73  func (mp *multipartReader) Close() error {
  74  	mp.mu.Lock()
  75  	if !mp.pipeOpen {
  76  		mp.mu.Unlock()
  77  		return nil
  78  	}
  79  	mp.pipeOpen = false
  80  	mp.mu.Unlock()
  81  	return mp.pr.Close()
  82  }
  83  
  84  // CombineBodyMedia combines a json body with media content to create a multipart/related HTTP body.
  85  // It returns a ReadCloser containing the combined body, and the overall "multipart/related" content type, with random boundary.
  86  //
  87  // The caller must call Close on the returned ReadCloser if reads are abandoned before reaching EOF.
  88  func CombineBodyMedia(body io.Reader, bodyContentType string, media io.Reader, mediaContentType string) (io.ReadCloser, string) {
  89  	return combineBodyMedia(body, bodyContentType, media, mediaContentType, "")
  90  }
  91  
  92  // combineBodyMedia is CombineBodyMedia but with an optional mimeBoundary field.
  93  func combineBodyMedia(body io.Reader, bodyContentType string, media io.Reader, mediaContentType, mimeBoundary string) (io.ReadCloser, string) {
  94  	mp := newMultipartReader([]typeReader{
  95  		{body, bodyContentType},
  96  		{media, mediaContentType},
  97  	}, mimeBoundary)
  98  	return mp, mp.ctype
  99  }
 100  
 101  func typeHeader(contentType string) textproto.MIMEHeader {
 102  	h := make(textproto.MIMEHeader)
 103  	if contentType != "" {
 104  		h.Set("Content-Type", contentType)
 105  	}
 106  	return h
 107  }
 108  
 109  // PrepareUpload determines whether the data in the supplied reader should be
 110  // uploaded in a single request, or in sequential chunks.
 111  // chunkSize is the size of the chunk that media should be split into.
 112  //
 113  // If chunkSize is zero, media is returned as the first value, and the other
 114  // two return values are nil, true.
 115  //
 116  // Otherwise, a MediaBuffer is returned, along with a bool indicating whether the
 117  // contents of media fit in a single chunk.
 118  //
 119  // After PrepareUpload has been called, media should no longer be used: the
 120  // media content should be accessed via one of the return values.
 121  func PrepareUpload(media io.Reader, chunkSize int, enableAutoChecksum bool) (r io.Reader, mb *MediaBuffer, singleChunk bool) {
 122  	if chunkSize == 0 { // do not chunk
 123  		return media, nil, true
 124  	}
 125  	mb = NewMediaBuffer(media, chunkSize)
 126  	mb.enableAutoChecksum = enableAutoChecksum
 127  	_, _, _, err := mb.Chunk()
 128  	// If err is io.EOF, we can upload this in a single request. Otherwise, err is
 129  	// either nil or a non-EOF error. If it is the latter, then the next call to
 130  	// mb.Chunk will return the same error. Returning a MediaBuffer ensures that this
 131  	// error will be handled at some point.
 132  	return nil, mb, err == io.EOF
 133  }
 134  
 135  // MediaInfo holds information for media uploads. It is intended for use by generated
 136  // code only.
 137  type MediaInfo struct {
 138  	// At most one of Media and MediaBuffer will be set.
 139  	media                io.Reader
 140  	buffer               *MediaBuffer
 141  	singleChunk          bool
 142  	mType                string
 143  	size                 int64 // mediaSize, if known.  Used only for calls to progressUpdater_.
 144  	progressUpdater      googleapi.ProgressUpdater
 145  	chunkRetryDeadline   time.Duration
 146  	chunkTransferTimeout time.Duration
 147  }
 148  
 149  // NewInfoFromMedia should be invoked from the Media method of a call. It returns a
 150  // MediaInfo populated with chunk size and content type, and a reader or MediaBuffer
 151  // if needed.
 152  func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo {
 153  	mi := &MediaInfo{}
 154  	opts := googleapi.ProcessMediaOptions(options)
 155  	if !opts.ForceEmptyContentType {
 156  		mi.mType = opts.ContentType
 157  		if mi.mType == "" {
 158  			r, mi.mType = gax.DetermineContentType(r)
 159  		}
 160  	}
 161  	mi.chunkRetryDeadline = opts.ChunkRetryDeadline
 162  	mi.chunkTransferTimeout = opts.ChunkTransferTimeout
 163  	mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize, opts.EnableAutoChecksum)
 164  	return mi
 165  }
 166  
 167  // NewInfoFromResumableMedia should be invoked from the ResumableMedia method of a
 168  // call. It returns a MediaInfo using the given reader, size and media type.
 169  func NewInfoFromResumableMedia(r io.ReaderAt, size int64, mediaType string) *MediaInfo {
 170  	rdr := ReaderAtToReader(r, size)
 171  	mType := mediaType
 172  	if mType == "" {
 173  		rdr, mType = gax.DetermineContentType(rdr)
 174  	}
 175  
 176  	return &MediaInfo{
 177  		size:        size,
 178  		mType:       mType,
 179  		buffer:      NewMediaBuffer(rdr, googleapi.DefaultUploadChunkSize),
 180  		media:       nil,
 181  		singleChunk: false,
 182  	}
 183  }
 184  
 185  // SetProgressUpdater sets the progress updater for the media info.
 186  func (mi *MediaInfo) SetProgressUpdater(pu googleapi.ProgressUpdater) {
 187  	if mi != nil {
 188  		mi.progressUpdater = pu
 189  	}
 190  }
 191  
 192  // UploadType determines the type of upload: a single request, or a resumable
 193  // series of requests.
 194  func (mi *MediaInfo) UploadType() string {
 195  	if mi.singleChunk {
 196  		return "multipart"
 197  	}
 198  	return "resumable"
 199  }
 200  
 201  // UploadRequest sets up an HTTP request for media upload. It adds headers
 202  // as necessary, and returns a replacement for the body and a function for http.Request.GetBody.
 203  func (mi *MediaInfo) UploadRequest(reqHeaders http.Header, body io.Reader) (newBody io.Reader, getBody func() (io.ReadCloser, error), cleanup func()) {
 204  	if body == nil {
 205  		body = new(bytes.Buffer)
 206  	}
 207  	cleanup = func() {}
 208  	if mi == nil {
 209  		return body, nil, cleanup
 210  	}
 211  	var media io.Reader
 212  	if mi.media != nil {
 213  		// This only happens when the caller has turned off chunking. In that
 214  		// case, we write all of media in a single non-retryable request.
 215  		media = mi.media
 216  	} else if mi.singleChunk {
 217  		// The data fits in a single chunk, which has now been read into the MediaBuffer.
 218  		// We obtain that chunk so we can write it in a single request. The request can
 219  		// be retried because the data is stored in the MediaBuffer.
 220  		media, _, _, _ = mi.buffer.Chunk()
 221  	}
 222  	toCleanup := []io.Closer{}
 223  	if media != nil {
 224  		fb := readerFunc(body)
 225  		fm := readerFunc(media)
 226  		combined, ctype := CombineBodyMedia(body, "application/json", media, mi.mType)
 227  		toCleanup = append(toCleanup, combined)
 228  		if fb != nil && fm != nil {
 229  			getBody = func() (io.ReadCloser, error) {
 230  				rb := io.NopCloser(fb())
 231  				rm := io.NopCloser(fm())
 232  				var mimeBoundary string
 233  				if _, params, err := mime.ParseMediaType(ctype); err == nil {
 234  					mimeBoundary = params["boundary"]
 235  				}
 236  				r, _ := combineBodyMedia(rb, "application/json", rm, mi.mType, mimeBoundary)
 237  				toCleanup = append(toCleanup, r)
 238  				return r, nil
 239  			}
 240  		}
 241  		reqHeaders.Set("Content-Type", ctype)
 242  		body = combined
 243  	}
 244  	if mi.buffer != nil && mi.mType != "" && !mi.singleChunk {
 245  		// This happens when initiating a resumable upload session.
 246  		// The initial request contains a JSON body rather than media.
 247  		// It can be retried with a getBody function that re-creates the request body.
 248  		fb := readerFunc(body)
 249  		if fb != nil {
 250  			getBody = func() (io.ReadCloser, error) {
 251  				rb := io.NopCloser(fb())
 252  				toCleanup = append(toCleanup, rb)
 253  				return rb, nil
 254  			}
 255  		}
 256  		reqHeaders.Set("X-Upload-Content-Type", mi.mType)
 257  	}
 258  	// Ensure that any bodies created in getBody are cleaned up.
 259  	cleanup = func() {
 260  		for _, closer := range toCleanup {
 261  			_ = closer.Close()
 262  		}
 263  
 264  	}
 265  	return body, getBody, cleanup
 266  }
 267  
 268  // readerFunc returns a function that always returns an io.Reader that has the same
 269  // contents as r, provided that can be done without consuming r. Otherwise, it
 270  // returns nil.
 271  // See http.NewRequest (in net/http/request.go).
 272  func readerFunc(r io.Reader) func() io.Reader {
 273  	switch r := r.(type) {
 274  	case *bytes.Buffer:
 275  		buf := r.Bytes()
 276  		return func() io.Reader { return bytes.NewReader(buf) }
 277  	case *bytes.Reader:
 278  		snapshot := *r
 279  		return func() io.Reader { r := snapshot; return &r }
 280  	case *strings.Reader:
 281  		snapshot := *r
 282  		return func() io.Reader { r := snapshot; return &r }
 283  	default:
 284  		return nil
 285  	}
 286  }
 287  
 288  // ResumableUpload returns an appropriately configured ResumableUpload value if the
 289  // upload is resumable, or nil otherwise.
 290  func (mi *MediaInfo) ResumableUpload(locURI string) *ResumableUpload {
 291  	if mi == nil || mi.singleChunk {
 292  		return nil
 293  	}
 294  	return &ResumableUpload{
 295  		URI:       locURI,
 296  		Media:     mi.buffer,
 297  		MediaType: mi.mType,
 298  		Callback: func(curr int64) {
 299  			if mi.progressUpdater != nil {
 300  				mi.progressUpdater(curr, mi.size)
 301  			}
 302  		},
 303  		ChunkRetryDeadline:   mi.chunkRetryDeadline,
 304  		ChunkTransferTimeout: mi.chunkTransferTimeout,
 305  	}
 306  }
 307  
 308  // SetGetBody sets the GetBody field of req to f. This was once needed
 309  // to gracefully support Go 1.7 and earlier which didn't have that
 310  // field.
 311  //
 312  // Deprecated: the code generator no longer uses this as of
 313  // 2019-02-19. Nothing else should be calling this anyway, but we
 314  // won't delete this immediately; it will be deleted in as early as 6
 315  // months.
 316  func SetGetBody(req *http.Request, f func() (io.ReadCloser, error)) {
 317  	req.GetBody = f
 318  }
 319