reader.go raw
1 // Copyright 2023 Huawei Technologies Co.,Ltd.
2 //
3 // Licensed to the Apache Software Foundation (ASF) under one
4 // or more contributor license agreements. See the NOTICE file
5 // distributed with this work for additional information
6 // regarding copyright ownership. The ASF licenses this file
7 // to you under the Apache License, Version 2.0 (the
8 // "License"); you may not use this file except in compliance
9 // with the License. You may obtain a copy of the License at
10 //
11 // http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing,
14 // software distributed under the License is distributed on an
15 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 // KIND, either express or implied. See the License for the
17 // specific language governing permissions and limitations
18 // under the License.
19
20 package progress
21
22 import (
23 "io"
24 )
25
// defaultProgressInterval is the fallback number of bytes (100 KiB) between
// two data-progress notifications. NewTeeReader applies it when the caller
// passes a non-positive interval.
const defaultProgressInterval int64 = 100 * 1024
27
28 func NewTeeReader(reader io.Reader, writer io.Writer, totalBytes int64, progressListener Listener, progressInterval int64) *TeeReader {
29 if progressInterval <= 0 {
30 progressInterval = defaultProgressInterval
31 }
32
33 return &TeeReader{
34 reader: reader,
35 writer: writer,
36 progressInterval: progressInterval,
37 totalBytes: totalBytes,
38 listener: progressListener,
39 }
40 }
41
42 type TeeReader struct {
43 reader io.Reader
44 writer io.Writer
45 cacheBytes int64
46 progressInterval int64
47 transferredBytes int64
48 totalBytes int64
49 listener Listener
50 }
51
52 func (r *TeeReader) Read(p []byte) (int, error) {
53 if r.transferredBytes == 0 {
54 event := NewEvent(TransferStartedEvent, r.transferredBytes, r.totalBytes, nil)
55 r.listener.ProgressChanged(event)
56 }
57
58 n, err := r.reader.Read(p)
59 if err != nil && err != io.EOF {
60 event := NewEvent(TransferFailedEvent, r.transferredBytes, r.totalBytes, err)
61 r.listener.ProgressChanged(event)
62 }
63
64 if n > 0 {
65 n64 := int64(n)
66 r.transferredBytes += n64
67 if r.writer != nil {
68 if n, err := r.writer.Write(p[:n]); err != nil {
69 return n, err
70 }
71 }
72
73 r.cacheBytes += n64
74 if r.cacheBytes >= r.progressInterval || r.transferredBytes == r.totalBytes {
75 r.cacheBytes = 0
76 event := NewEvent(TransferDataEvent, r.transferredBytes, r.totalBytes, nil)
77 r.listener.ProgressChanged(event)
78 }
79 }
80
81 if err == io.EOF {
82 r.cacheBytes = 0
83 event := NewEvent(TransferCompletedEvent, r.transferredBytes, r.totalBytes, nil)
84 r.listener.ProgressChanged(event)
85 }
86
87 return n, err
88 }
89
90 func (r *TeeReader) Close() error {
91 if closer, ok := r.reader.(io.ReadCloser); ok {
92 return closer.Close()
93 }
94 return nil
95 }
96