268 lines
5.3 KiB
Go
268 lines
5.3 KiB
Go
package s3util
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/xml"
|
|
"github.com/kr/s3"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
)
|
|
|
|
// defined by amazon
|
|
const (
|
|
minPartSize = 5 * 1024 * 1024
|
|
maxPartSize = 1<<31 - 1 // for 32-bit use; amz max is 5GiB
|
|
maxObjSize = 5 * 1024 * 1024 * 1024 * 1024
|
|
maxNPart = 10000
|
|
)
|
|
|
|
const (
|
|
concurrency = 5
|
|
nTry = 2
|
|
)
|
|
|
|
type part struct {
|
|
r io.ReadSeeker
|
|
len int64
|
|
|
|
// read by xml encoder
|
|
PartNumber int
|
|
ETag string
|
|
}
|
|
|
|
type uploader struct {
|
|
s3 s3.Service
|
|
keys s3.Keys
|
|
url string
|
|
client *http.Client
|
|
UploadId string // written by xml decoder
|
|
|
|
bufsz int64
|
|
buf []byte
|
|
off int
|
|
ch chan *part
|
|
part int
|
|
closed bool
|
|
err error
|
|
wg sync.WaitGroup
|
|
|
|
xml struct {
|
|
XMLName string `xml:"CompleteMultipartUpload"`
|
|
Part []*part
|
|
}
|
|
}
|
|
|
|
// Create creates an S3 object at url and sends multipart upload requests as
|
|
// data is written.
|
|
//
|
|
// If h is not nil, each of its entries is added to the HTTP request header.
|
|
// If c is nil, Create uses DefaultConfig.
|
|
func Create(url string, h http.Header, c *Config) (io.WriteCloser, error) {
|
|
if c == nil {
|
|
c = DefaultConfig
|
|
}
|
|
return newUploader(url, h, c)
|
|
}
|
|
|
|
// Sends an S3 multipart upload initiation request.
|
|
// See http://docs.amazonwebservices.com/AmazonS3/latest/dev/mpuoverview.html.
|
|
// This initial request returns an UploadId that we use to identify
|
|
// subsequent PUT requests.
|
|
func newUploader(url string, h http.Header, c *Config) (u *uploader, err error) {
|
|
u = new(uploader)
|
|
u.s3 = *c.Service
|
|
u.url = url
|
|
u.keys = *c.Keys
|
|
u.client = c.Client
|
|
if u.client == nil {
|
|
u.client = http.DefaultClient
|
|
}
|
|
u.bufsz = minPartSize
|
|
r, err := http.NewRequest("POST", url+"?uploads", nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
|
|
for k := range h {
|
|
for _, v := range h[k] {
|
|
r.Header.Add(k, v)
|
|
}
|
|
}
|
|
u.s3.Sign(r, u.keys)
|
|
resp, err := u.client.Do(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != 200 {
|
|
return nil, newRespError(resp)
|
|
}
|
|
err = xml.NewDecoder(resp.Body).Decode(u)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
u.ch = make(chan *part)
|
|
for i := 0; i < concurrency; i++ {
|
|
go u.worker()
|
|
}
|
|
return u, nil
|
|
}
|
|
|
|
func (u *uploader) Write(p []byte) (n int, err error) {
|
|
if u.closed {
|
|
return 0, syscall.EINVAL
|
|
}
|
|
if u.err != nil {
|
|
return 0, u.err
|
|
}
|
|
for n < len(p) {
|
|
if cap(u.buf) == 0 {
|
|
u.buf = make([]byte, int(u.bufsz))
|
|
// Increase part size (1.001x).
|
|
// This lets us reach the max object size (5TiB) while
|
|
// still doing minimal buffering for small objects.
|
|
u.bufsz = min(u.bufsz+u.bufsz/1000, maxPartSize)
|
|
}
|
|
r := copy(u.buf[u.off:], p[n:])
|
|
u.off += r
|
|
n += r
|
|
if u.off == len(u.buf) {
|
|
u.flush()
|
|
}
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
func (u *uploader) flush() {
|
|
u.wg.Add(1)
|
|
u.part++
|
|
p := &part{bytes.NewReader(u.buf[:u.off]), int64(u.off), u.part, ""}
|
|
u.xml.Part = append(u.xml.Part, p)
|
|
u.ch <- p
|
|
u.buf, u.off = nil, 0
|
|
}
|
|
|
|
func (u *uploader) worker() {
|
|
for p := range u.ch {
|
|
u.retryUploadPart(p)
|
|
}
|
|
}
|
|
|
|
// Calls putPart up to nTry times to recover from transient errors.
|
|
func (u *uploader) retryUploadPart(p *part) {
|
|
defer u.wg.Done()
|
|
defer func() { p.r = nil }() // free the large buffer
|
|
var err error
|
|
for i := 0; i < nTry; i++ {
|
|
p.r.Seek(0, 0)
|
|
err = u.putPart(p)
|
|
if err == nil {
|
|
return
|
|
}
|
|
}
|
|
u.err = err
|
|
}
|
|
|
|
// Uploads part p, reading its contents from p.r.
|
|
// Stores the ETag in p.ETag.
|
|
func (u *uploader) putPart(p *part) error {
|
|
v := url.Values{}
|
|
v.Set("partNumber", strconv.Itoa(p.PartNumber))
|
|
v.Set("uploadId", u.UploadId)
|
|
req, err := http.NewRequest("PUT", u.url+"?"+v.Encode(), p.r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.ContentLength = p.len
|
|
req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
|
|
u.s3.Sign(req, u.keys)
|
|
resp, err := u.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != 200 {
|
|
return newRespError(resp)
|
|
}
|
|
s := resp.Header.Get("etag") // includes quote chars for some reason
|
|
if len(s) < 2 {
|
|
return fmt.Errorf("received invalid etag %q", s)
|
|
}
|
|
p.ETag = s[1 : len(s)-1]
|
|
return nil
|
|
}
|
|
|
|
func (u *uploader) Close() error {
|
|
if u.closed {
|
|
return syscall.EINVAL
|
|
}
|
|
if cap(u.buf) > 0 {
|
|
u.flush()
|
|
}
|
|
u.wg.Wait()
|
|
close(u.ch)
|
|
u.closed = true
|
|
if u.err != nil {
|
|
u.abort()
|
|
return u.err
|
|
}
|
|
|
|
body, err := xml.Marshal(u.xml)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b := bytes.NewBuffer(body)
|
|
v := url.Values{}
|
|
v.Set("uploadId", u.UploadId)
|
|
req, err := http.NewRequest("POST", u.url+"?"+v.Encode(), b)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
|
|
u.s3.Sign(req, u.keys)
|
|
resp, err := u.client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if resp.StatusCode != 200 {
|
|
return newRespError(resp)
|
|
}
|
|
resp.Body.Close()
|
|
return nil
|
|
}
|
|
|
|
func (u *uploader) abort() {
|
|
// TODO(kr): devise a reasonable way to report an error here in addition
|
|
// to the error that caused the abort.
|
|
v := url.Values{}
|
|
v.Set("uploadId", u.UploadId)
|
|
s := u.url + "?" + v.Encode()
|
|
req, err := http.NewRequest("DELETE", s, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
req.Header.Set("Date", time.Now().UTC().Format(http.TimeFormat))
|
|
u.s3.Sign(req, u.keys)
|
|
resp, err := u.client.Do(req)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != 200 {
|
|
return
|
|
}
|
|
}
|
|
|
|
func min(a, b int64) int64 {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|