1 // Copyright 2016 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 5 package gensupport
6 7 import (
8 "bytes"
9 "fmt"
10 "io"
11 "mime"
12 "mime/multipart"
13 "net/http"
14 "net/textproto"
15 "strings"
16 "sync"
17 "time"
18 19 gax "github.com/googleapis/gax-go/v2"
20 "google.golang.org/api/googleapi"
21 )
22 23 type typeReader struct {
24 io.Reader
25 typ string
26 }
27 28 // multipartReader combines the contents of multiple readers to create a multipart/related HTTP body.
29 // Close must be called if reads from the multipartReader are abandoned before reaching EOF.
30 type multipartReader struct {
31 pr *io.PipeReader
32 ctype string
33 mu sync.Mutex
34 pipeOpen bool
35 }
36 37 // boundary optionally specifies the MIME boundary
38 func newMultipartReader(parts []typeReader, boundary string) *multipartReader {
39 mp := &multipartReader{pipeOpen: true}
40 var pw *io.PipeWriter
41 mp.pr, pw = io.Pipe()
42 mpw := multipart.NewWriter(pw)
43 if boundary != "" {
44 mpw.SetBoundary(boundary)
45 }
46 mp.ctype = "multipart/related; boundary=" + mpw.Boundary()
47 go func() {
48 for _, part := range parts {
49 w, err := mpw.CreatePart(typeHeader(part.typ))
50 if err != nil {
51 mpw.Close()
52 pw.CloseWithError(fmt.Errorf("googleapi: CreatePart failed: %v", err))
53 return
54 }
55 _, err = io.Copy(w, part.Reader)
56 if err != nil {
57 mpw.Close()
58 pw.CloseWithError(fmt.Errorf("googleapi: Copy failed: %v", err))
59 return
60 }
61 }
62 63 mpw.Close()
64 pw.Close()
65 }()
66 return mp
67 }
68 69 func (mp *multipartReader) Read(data []byte) (n int, err error) {
70 return mp.pr.Read(data)
71 }
72 73 func (mp *multipartReader) Close() error {
74 mp.mu.Lock()
75 if !mp.pipeOpen {
76 mp.mu.Unlock()
77 return nil
78 }
79 mp.pipeOpen = false
80 mp.mu.Unlock()
81 return mp.pr.Close()
82 }
83 84 // CombineBodyMedia combines a json body with media content to create a multipart/related HTTP body.
85 // It returns a ReadCloser containing the combined body, and the overall "multipart/related" content type, with random boundary.
86 //
87 // The caller must call Close on the returned ReadCloser if reads are abandoned before reaching EOF.
88 func CombineBodyMedia(body io.Reader, bodyContentType string, media io.Reader, mediaContentType string) (io.ReadCloser, string) {
89 return combineBodyMedia(body, bodyContentType, media, mediaContentType, "")
90 }
91 92 // combineBodyMedia is CombineBodyMedia but with an optional mimeBoundary field.
93 func combineBodyMedia(body io.Reader, bodyContentType string, media io.Reader, mediaContentType, mimeBoundary string) (io.ReadCloser, string) {
94 mp := newMultipartReader([]typeReader{
95 {body, bodyContentType},
96 {media, mediaContentType},
97 }, mimeBoundary)
98 return mp, mp.ctype
99 }
100 101 func typeHeader(contentType string) textproto.MIMEHeader {
102 h := make(textproto.MIMEHeader)
103 if contentType != "" {
104 h.Set("Content-Type", contentType)
105 }
106 return h
107 }
108 109 // PrepareUpload determines whether the data in the supplied reader should be
110 // uploaded in a single request, or in sequential chunks.
111 // chunkSize is the size of the chunk that media should be split into.
112 //
113 // If chunkSize is zero, media is returned as the first value, and the other
114 // two return values are nil, true.
115 //
116 // Otherwise, a MediaBuffer is returned, along with a bool indicating whether the
117 // contents of media fit in a single chunk.
118 //
119 // After PrepareUpload has been called, media should no longer be used: the
120 // media content should be accessed via one of the return values.
121 func PrepareUpload(media io.Reader, chunkSize int, enableAutoChecksum bool) (r io.Reader, mb *MediaBuffer, singleChunk bool) {
122 if chunkSize == 0 { // do not chunk
123 return media, nil, true
124 }
125 mb = NewMediaBuffer(media, chunkSize)
126 mb.enableAutoChecksum = enableAutoChecksum
127 _, _, _, err := mb.Chunk()
128 // If err is io.EOF, we can upload this in a single request. Otherwise, err is
129 // either nil or a non-EOF error. If it is the latter, then the next call to
130 // mb.Chunk will return the same error. Returning a MediaBuffer ensures that this
131 // error will be handled at some point.
132 return nil, mb, err == io.EOF
133 }
134 135 // MediaInfo holds information for media uploads. It is intended for use by generated
136 // code only.
137 type MediaInfo struct {
138 // At most one of Media and MediaBuffer will be set.
139 media io.Reader
140 buffer *MediaBuffer
141 singleChunk bool
142 mType string
143 size int64 // mediaSize, if known. Used only for calls to progressUpdater_.
144 progressUpdater googleapi.ProgressUpdater
145 chunkRetryDeadline time.Duration
146 chunkTransferTimeout time.Duration
147 }
148 149 // NewInfoFromMedia should be invoked from the Media method of a call. It returns a
150 // MediaInfo populated with chunk size and content type, and a reader or MediaBuffer
151 // if needed.
152 func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo {
153 mi := &MediaInfo{}
154 opts := googleapi.ProcessMediaOptions(options)
155 if !opts.ForceEmptyContentType {
156 mi.mType = opts.ContentType
157 if mi.mType == "" {
158 r, mi.mType = gax.DetermineContentType(r)
159 }
160 }
161 mi.chunkRetryDeadline = opts.ChunkRetryDeadline
162 mi.chunkTransferTimeout = opts.ChunkTransferTimeout
163 mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize, opts.EnableAutoChecksum)
164 return mi
165 }
166 167 // NewInfoFromResumableMedia should be invoked from the ResumableMedia method of a
168 // call. It returns a MediaInfo using the given reader, size and media type.
169 func NewInfoFromResumableMedia(r io.ReaderAt, size int64, mediaType string) *MediaInfo {
170 rdr := ReaderAtToReader(r, size)
171 mType := mediaType
172 if mType == "" {
173 rdr, mType = gax.DetermineContentType(rdr)
174 }
175 176 return &MediaInfo{
177 size: size,
178 mType: mType,
179 buffer: NewMediaBuffer(rdr, googleapi.DefaultUploadChunkSize),
180 media: nil,
181 singleChunk: false,
182 }
183 }
184 185 // SetProgressUpdater sets the progress updater for the media info.
186 func (mi *MediaInfo) SetProgressUpdater(pu googleapi.ProgressUpdater) {
187 if mi != nil {
188 mi.progressUpdater = pu
189 }
190 }
191 192 // UploadType determines the type of upload: a single request, or a resumable
193 // series of requests.
194 func (mi *MediaInfo) UploadType() string {
195 if mi.singleChunk {
196 return "multipart"
197 }
198 return "resumable"
199 }
200 201 // UploadRequest sets up an HTTP request for media upload. It adds headers
202 // as necessary, and returns a replacement for the body and a function for http.Request.GetBody.
203 func (mi *MediaInfo) UploadRequest(reqHeaders http.Header, body io.Reader) (newBody io.Reader, getBody func() (io.ReadCloser, error), cleanup func()) {
204 if body == nil {
205 body = new(bytes.Buffer)
206 }
207 cleanup = func() {}
208 if mi == nil {
209 return body, nil, cleanup
210 }
211 var media io.Reader
212 if mi.media != nil {
213 // This only happens when the caller has turned off chunking. In that
214 // case, we write all of media in a single non-retryable request.
215 media = mi.media
216 } else if mi.singleChunk {
217 // The data fits in a single chunk, which has now been read into the MediaBuffer.
218 // We obtain that chunk so we can write it in a single request. The request can
219 // be retried because the data is stored in the MediaBuffer.
220 media, _, _, _ = mi.buffer.Chunk()
221 }
222 toCleanup := []io.Closer{}
223 if media != nil {
224 fb := readerFunc(body)
225 fm := readerFunc(media)
226 combined, ctype := CombineBodyMedia(body, "application/json", media, mi.mType)
227 toCleanup = append(toCleanup, combined)
228 if fb != nil && fm != nil {
229 getBody = func() (io.ReadCloser, error) {
230 rb := io.NopCloser(fb())
231 rm := io.NopCloser(fm())
232 var mimeBoundary string
233 if _, params, err := mime.ParseMediaType(ctype); err == nil {
234 mimeBoundary = params["boundary"]
235 }
236 r, _ := combineBodyMedia(rb, "application/json", rm, mi.mType, mimeBoundary)
237 toCleanup = append(toCleanup, r)
238 return r, nil
239 }
240 }
241 reqHeaders.Set("Content-Type", ctype)
242 body = combined
243 }
244 if mi.buffer != nil && mi.mType != "" && !mi.singleChunk {
245 // This happens when initiating a resumable upload session.
246 // The initial request contains a JSON body rather than media.
247 // It can be retried with a getBody function that re-creates the request body.
248 fb := readerFunc(body)
249 if fb != nil {
250 getBody = func() (io.ReadCloser, error) {
251 rb := io.NopCloser(fb())
252 toCleanup = append(toCleanup, rb)
253 return rb, nil
254 }
255 }
256 reqHeaders.Set("X-Upload-Content-Type", mi.mType)
257 }
258 // Ensure that any bodies created in getBody are cleaned up.
259 cleanup = func() {
260 for _, closer := range toCleanup {
261 _ = closer.Close()
262 }
263 264 }
265 return body, getBody, cleanup
266 }
267 268 // readerFunc returns a function that always returns an io.Reader that has the same
269 // contents as r, provided that can be done without consuming r. Otherwise, it
270 // returns nil.
271 // See http.NewRequest (in net/http/request.go).
272 func readerFunc(r io.Reader) func() io.Reader {
273 switch r := r.(type) {
274 case *bytes.Buffer:
275 buf := r.Bytes()
276 return func() io.Reader { return bytes.NewReader(buf) }
277 case *bytes.Reader:
278 snapshot := *r
279 return func() io.Reader { r := snapshot; return &r }
280 case *strings.Reader:
281 snapshot := *r
282 return func() io.Reader { r := snapshot; return &r }
283 default:
284 return nil
285 }
286 }
287 288 // ResumableUpload returns an appropriately configured ResumableUpload value if the
289 // upload is resumable, or nil otherwise.
290 func (mi *MediaInfo) ResumableUpload(locURI string) *ResumableUpload {
291 if mi == nil || mi.singleChunk {
292 return nil
293 }
294 return &ResumableUpload{
295 URI: locURI,
296 Media: mi.buffer,
297 MediaType: mi.mType,
298 Callback: func(curr int64) {
299 if mi.progressUpdater != nil {
300 mi.progressUpdater(curr, mi.size)
301 }
302 },
303 ChunkRetryDeadline: mi.chunkRetryDeadline,
304 ChunkTransferTimeout: mi.chunkTransferTimeout,
305 }
306 }
307 308 // SetGetBody sets the GetBody field of req to f. This was once needed
309 // to gracefully support Go 1.7 and earlier which didn't have that
310 // field.
311 //
312 // Deprecated: the code generator no longer uses this as of
313 // 2019-02-19. Nothing else should be calling this anyway, but we
314 // won't delete this immediately; it will be deleted in as early as 6
315 // months.
316 func SetGetBody(req *http.Request, f func() (io.ReadCloser, error)) {
317 req.GetBody = f
318 }
319