reader.go raw

   1  // Copyright 2023 Huawei Technologies Co.,Ltd.
   2  //
   3  // Licensed to the Apache Software Foundation (ASF) under one
   4  // or more contributor license agreements.  See the NOTICE file
   5  // distributed with this work for additional information
   6  // regarding copyright ownership.  The ASF licenses this file
   7  // to you under the Apache License, Version 2.0 (the
   8  // "License"); you may not use this file except in compliance
   9  // with the License.  You may obtain a copy of the License at
  10  //
  11  //     http://www.apache.org/licenses/LICENSE-2.0
  12  //
  13  // Unless required by applicable law or agreed to in writing,
  14  // software distributed under the License is distributed on an
  15  // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  16  // KIND, either express or implied.  See the License for the
  17  // specific language governing permissions and limitations
  18  // under the License.
  19  
  20  package progress
  21  
  22  import (
  23  	"io"
  24  )
  25  
  26  const defaultProgressInterval = int64(102400)
  27  
  28  func NewTeeReader(reader io.Reader, writer io.Writer, totalBytes int64, progressListener Listener, progressInterval int64) *TeeReader {
  29  	if progressInterval <= 0 {
  30  		progressInterval = defaultProgressInterval
  31  	}
  32  
  33  	return &TeeReader{
  34  		reader:           reader,
  35  		writer:           writer,
  36  		progressInterval: progressInterval,
  37  		totalBytes:       totalBytes,
  38  		listener:         progressListener,
  39  	}
  40  }
  41  
  42  type TeeReader struct {
  43  	reader           io.Reader
  44  	writer           io.Writer
  45  	cacheBytes       int64
  46  	progressInterval int64
  47  	transferredBytes int64
  48  	totalBytes       int64
  49  	listener         Listener
  50  }
  51  
  52  func (r *TeeReader) Read(p []byte) (int, error) {
  53  	if r.transferredBytes == 0 {
  54  		event := NewEvent(TransferStartedEvent, r.transferredBytes, r.totalBytes, nil)
  55  		r.listener.ProgressChanged(event)
  56  	}
  57  
  58  	n, err := r.reader.Read(p)
  59  	if err != nil && err != io.EOF {
  60  		event := NewEvent(TransferFailedEvent, r.transferredBytes, r.totalBytes, err)
  61  		r.listener.ProgressChanged(event)
  62  	}
  63  
  64  	if n > 0 {
  65  		n64 := int64(n)
  66  		r.transferredBytes += n64
  67  		if r.writer != nil {
  68  			if n, err := r.writer.Write(p[:n]); err != nil {
  69  				return n, err
  70  			}
  71  		}
  72  
  73  		r.cacheBytes += n64
  74  		if r.cacheBytes >= r.progressInterval || r.transferredBytes == r.totalBytes {
  75  			r.cacheBytes = 0
  76  			event := NewEvent(TransferDataEvent, r.transferredBytes, r.totalBytes, nil)
  77  			r.listener.ProgressChanged(event)
  78  		}
  79  	}
  80  
  81  	if err == io.EOF {
  82  		r.cacheBytes = 0
  83  		event := NewEvent(TransferCompletedEvent, r.transferredBytes, r.totalBytes, nil)
  84  		r.listener.ProgressChanged(event)
  85  	}
  86  
  87  	return n, err
  88  }
  89  
  90  func (r *TeeReader) Close() error {
  91  	if closer, ok := r.reader.(io.ReadCloser); ok {
  92  		return closer.Close()
  93  	}
  94  	return nil
  95  }
  96