...
 
......@@ -22,7 +22,7 @@ getdeps:
ifeq ($(GOARCH),s390x)
@which staticcheck 1>/dev/null || (echo "Installing staticcheck" && GO111MODULE=off go get honnef.co/go/tools/cmd/staticcheck)
else
@which staticcheck 1>/dev/null || (echo "Installing staticcheck" && wget --quiet https://github.com/dominikh/go-tools/releases/download/2019.2.3/staticcheck_${GOOS}_${GOARCH}.tar.gz && tar xf staticcheck_${GOOS}_${GOARCH}.tar.gz && mv staticcheck/staticcheck ${GOPATH}/bin/staticcheck && chmod +x ${GOPATH}/bin/staticcheck && rm -f staticcheck_${GOOS}_${GOARCH}.tar.gz && rm -rf staticcheck)
@which staticcheck 1>/dev/null || (echo "Installing staticcheck" && wget --quiet https://github.com/dominikh/go-tools/releases/download/2020.1.3/staticcheck_${GOOS}_${GOARCH}.tar.gz && tar xf staticcheck_${GOOS}_${GOARCH}.tar.gz && mv staticcheck/staticcheck ${GOPATH}/bin/staticcheck && chmod +x ${GOPATH}/bin/staticcheck && rm -f staticcheck_${GOOS}_${GOARCH}.tar.gz && rm -rf staticcheck)
endif
@which misspell 1>/dev/null || (echo "Installing misspell" && GO111MODULE=off go get -u github.com/client9/misspell/cmd/misspell)
......
checks = ["all", "-ST1005", "-ST1000", "-SA4000", "-SA9004", "-SA1019", "-SA1008", "-U1000", "-ST1003", "-ST1018"]
......@@ -106,12 +106,12 @@ func initHealState() *allHealState {
healSeqMap: make(map[string]*healSequence),
}
go healState.periodicHealSeqsClean()
go healState.periodicHealSeqsClean(GlobalContext)
return healState
}
func (ahs *allHealState) periodicHealSeqsClean() {
func (ahs *allHealState) periodicHealSeqsClean(ctx context.Context) {
// Launch clean-up routine to remove this heal sequence (after
// it ends) from the global state after timeout has elapsed.
ticker := time.NewTicker(time.Minute * 5)
......@@ -127,7 +127,7 @@ func (ahs *allHealState) periodicHealSeqsClean() {
}
}
ahs.Unlock()
case <-GlobalServiceDoneCh:
case <-ctx.Done():
// server could be restarting - need
// to exit immediately
return
......@@ -369,7 +369,7 @@ func newHealSequence(bucket, objPrefix, clientAddr string,
reqInfo := &logger.ReqInfo{RemoteHost: clientAddr, API: "Heal", BucketName: bucket}
reqInfo.AppendTags("prefix", objPrefix)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
return &healSequence{
bucket: bucket,
......@@ -603,7 +603,7 @@ func (h *healSequence) healItemsFromSourceCh() error {
h.lastHealActivity = UTCNow()
case <-h.traverseAndHealDoneCh:
return nil
case <-GlobalServiceDoneCh:
case <-h.ctx.Done():
return nil
}
}
......@@ -630,11 +630,6 @@ func (h *healSequence) healItems(bucketsOnly bool) error {
return err
}
// Start healing the background ops prefix.
if err := h.healMinioSysMeta(backgroundOpsMetaPrefix)(); err != nil {
logger.LogIf(h.ctx, err)
}
// Heal buckets and objects
return h.healBuckets(bucketsOnly)
}
......@@ -671,7 +666,7 @@ func (h *healSequence) healMinioSysMeta(metaPrefix string) func() error {
// NOTE: Healing on meta is run regardless
// of any bucket being selected, this is to ensure that
// meta are always upto date and correct.
return objectAPI.HealObjects(h.ctx, minioMetaBucket, metaPrefix, func(bucket string, object string) error {
return objectAPI.HealObjects(h.ctx, minioMetaBucket, metaPrefix, h.settings, func(bucket string, object string) error {
if h.isQuitting() {
return errHealStopSignalled
}
......@@ -765,7 +760,7 @@ func (h *healSequence) healBucket(bucket string, bucketsOnly bool) error {
return nil
}
if err := objectAPI.HealObjects(h.ctx, bucket, h.objPrefix, h.healObject); err != nil {
if err := objectAPI.HealObjects(h.ctx, bucket, h.objPrefix, h.settings, h.healObject); err != nil {
return errFnHealFromAPIErr(h.ctx, err)
}
return nil
......
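The admin-heal changes above replace the package-global GlobalServiceDoneCh with a context threaded into the cleanup goroutine. Below is a minimal, self-contained sketch of that ticker-plus-ctx.Done() shutdown pattern, assuming nothing from MinIO itself; the clean callback stands in for the heal-sequence map cleanup done under ahs.Lock().

package main

import (
	"context"
	"fmt"
	"time"
)

// periodicClean runs clean on every tick until the context is cancelled,
// mirroring the shape of periodicHealSeqsClean(ctx) above.
func periodicClean(ctx context.Context, interval time.Duration, clean func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			clean() // e.g. drop timed-out heal sequences under a lock
		case <-ctx.Done():
			// Server could be restarting - exit immediately.
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	periodicClean(ctx, 100*time.Millisecond, func() { fmt.Println("cleanup pass") })
}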
......@@ -122,7 +122,8 @@ const (
ErrMissingCredTag
ErrCredMalformed
ErrInvalidRegion
ErrInvalidService
ErrInvalidServiceS3
ErrInvalidServiceSTS
ErrInvalidRequestVersion
ErrMissingSignTag
ErrMissingSignHeadersTag
......@@ -653,9 +654,14 @@ var errorCodes = errorCodeMap{
// FIXME: Should contain the invalid param set as seen in https://github.com/minio/minio/issues/2385.
// right Description: "Error parsing the X-Amz-Credential parameter; incorrect service \"s4\". This endpoint belongs to \"s3\".".
// Need changes to make sure variable messages can be constructed.
ErrInvalidService: {
Code: "AuthorizationQueryParametersError",
Description: "Error parsing the X-Amz-Credential parameter; incorrect service. This endpoint belongs to \"s3\".",
ErrInvalidServiceS3: {
Code: "AuthorizationParametersError",
Description: "Error parsing the Credential/X-Amz-Credential parameter; incorrect service. This endpoint belongs to \"s3\".",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidServiceSTS: {
Code: "AuthorizationParametersError",
Description: "Error parsing the Credential parameter; incorrect service. This endpoint belongs to \"sts\".",
HTTPStatusCode: http.StatusBadRequest,
},
// FIXME: Should contain the invalid param set as seen in https://github.com/minio/minio/issues/2385.
......
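The single ErrInvalidService code is split into per-service variants so the credential parser can report whether the request was scoped to "s3" or "sts". As a hedged illustration only (the helper name and the way the scope's service element reaches it are assumptions, not code from this change, and ErrNone is MinIO's existing "no error" sentinel), a check mapping a parsed service element onto the two new codes could look like this, living alongside the constants above:

// Hypothetical helper: map the service element of an AWS Signature V4
// credential scope (accessKey/date/region/service/aws4_request) onto the
// per-endpoint error codes introduced in this change.
func checkCredentialService(scopeService string, stsRequest bool) APIErrorCode {
	if stsRequest {
		if scopeService != "sts" {
			return ErrInvalidServiceSTS
		}
		return ErrNone
	}
	if scopeService != "s3" {
		return ErrInvalidServiceS3
	}
	return ErrNone
}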
......@@ -186,5 +186,5 @@ func bgHealObject(ctx context.Context, bucket, object string, opts madmin.HealOp
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
return objectAPI.HealObject(ctx, bucket, object, opts.DryRun, opts.Remove, opts.ScanMode)
return objectAPI.HealObject(ctx, bucket, object, opts)
}
......@@ -26,7 +26,6 @@ import (
"github.com/minio/minio/cmd/config/cache"
"github.com/minio/minio/cmd/config/compress"
"github.com/minio/minio/cmd/config/etcd"
xetcd "github.com/minio/minio/cmd/config/etcd"
"github.com/minio/minio/cmd/config/etcd/dns"
xldap "github.com/minio/minio/cmd/config/identity/ldap"
"github.com/minio/minio/cmd/config/identity/openid"
......@@ -304,13 +303,13 @@ func lookupConfigs(s config.Config) {
}
}
etcdCfg, err := xetcd.LookupConfig(s[config.EtcdSubSys][config.Default], globalRootCAs)
etcdCfg, err := etcd.LookupConfig(s[config.EtcdSubSys][config.Default], globalRootCAs)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize etcd config: %w", err))
}
if etcdCfg.Enabled {
globalEtcdClient, err = xetcd.New(etcdCfg)
globalEtcdClient, err = etcd.New(etcdCfg)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize etcd config: %w", err))
}
......
......@@ -46,7 +46,7 @@ func (u Err) Clone() Err {
}
}
// Return the error message
// Error returns the error message
func (u Err) Error() string {
if u.detail == "" {
if u.msg != "" {
......
......@@ -60,11 +60,7 @@ func (c *CoreDNS) List() (map[string][]SrvRecord, error) {
if record.Key == "" {
continue
}
if _, ok := srvRecords[record.Key]; ok {
srvRecords[record.Key] = append(srvRecords[record.Key], record)
} else {
srvRecords[record.Key] = []SrvRecord{record}
}
srvRecords[record.Key] = append(srvRecords[record.Key], record)
}
}
return srvRecords, nil
......
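The simplification above works because Go maps return the zero value for missing keys - here a nil slice - and append allocates when given a nil slice, so the existence check added nothing. A tiny standalone demonstration:

package main

import "fmt"

func main() {
	records := map[string][]string{}
	// No need to check whether the key exists first: records["a"] is a nil
	// slice when absent, and append on a nil slice allocates a new one.
	records["a"] = append(records["a"], "10.0.0.1")
	records["a"] = append(records["a"], "10.0.0.2")
	fmt.Println(records) // map[a:[10.0.0.1 10.0.0.2]]
}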
package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"bytes"
"testing"
"github.com/tinylib/msgp/msgp"
)
func TestMarshalUnmarshaldataUsageCacheInfo(t *testing.T) {
v := dataUsageCacheInfo{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgdataUsageCacheInfo(b *testing.B) {
v := dataUsageCacheInfo{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgdataUsageCacheInfo(b *testing.B) {
v := dataUsageCacheInfo{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshaldataUsageCacheInfo(b *testing.B) {
v := dataUsageCacheInfo{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodedataUsageCacheInfo(t *testing.T) {
v := dataUsageCacheInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodedataUsageCacheInfo Msgsize() is inaccurate")
}
vn := dataUsageCacheInfo{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodedataUsageCacheInfo(b *testing.B) {
v := dataUsageCacheInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodedataUsageCacheInfo(b *testing.B) {
v := dataUsageCacheInfo{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshaldataUsageEntry(t *testing.T) {
v := dataUsageEntry{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgdataUsageEntry(b *testing.B) {
v := dataUsageEntry{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgdataUsageEntry(b *testing.B) {
v := dataUsageEntry{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshaldataUsageEntry(b *testing.B) {
v := dataUsageEntry{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodedataUsageEntry(t *testing.T) {
v := dataUsageEntry{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodedataUsageEntry Msgsize() is inaccurate")
}
vn := dataUsageEntry{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodedataUsageEntry(b *testing.B) {
v := dataUsageEntry{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodedataUsageEntry(b *testing.B) {
v := dataUsageEntry{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshalsizeHistogram(t *testing.T) {
v := sizeHistogram{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgsizeHistogram(b *testing.B) {
v := sizeHistogram{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgsizeHistogram(b *testing.B) {
v := sizeHistogram{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalsizeHistogram(b *testing.B) {
v := sizeHistogram{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestEncodeDecodesizeHistogram(t *testing.T) {
v := sizeHistogram{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodesizeHistogram Msgsize() is inaccurate")
}
vn := sizeHistogram{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
}
buf.Reset()
msgp.Encode(&buf, &v)
err = msgp.NewReader(&buf).Skip()
if err != nil {
t.Error(err)
}
}
func BenchmarkEncodesizeHistogram(b *testing.B) {
v := sizeHistogram{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
en := msgp.NewWriter(msgp.Nowhere)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.EncodeMsg(en)
}
en.Flush()
}
func BenchmarkDecodesizeHistogram(b *testing.B) {
v := sizeHistogram{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
rd := msgp.NewEndlessReader(buf.Bytes(), b)
dc := msgp.NewReader(rd)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := v.DecodeMsg(dc)
if err != nil {
b.Fatal(err)
}
}
}
......@@ -693,7 +693,7 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
return c, nil
}
func (c *cacheObjects) gc(ctx context.Context, doneCh chan struct{}) {
func (c *cacheObjects) gc(ctx context.Context, doneCh <-chan struct{}) {
ticker := time.NewTicker(cacheGCInterval)
defer ticker.Stop()
......
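Changing doneCh to a receive-only <-chan struct{} is purely a signature tightening: callers can still pass an ordinary channel (or ctx.Done()), but the compiler now rejects any send or close inside gc. A minimal sketch of that property:

package main

// wait only receives from done; adding "close(done)" or "done <- struct{}{}"
// in this body would fail to compile, which is what the narrowed parameter
// type in the gc signature above buys.
func wait(done <-chan struct{}) {
	<-done
}

func main() {
	done := make(chan struct{}) // bidirectional at the call site converts implicitly
	go close(done)
	wait(done)
}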
......@@ -11,173 +11,12 @@ package cmd
import (
"errors"
"os"
"path/filepath"
"strings"
"sync"
)
var errSkipFile = errors.New("fastwalk: skip this file")
// Walk is a faster implementation of filepath.Walk.
//
// filepath.Walk's design necessarily calls os.Lstat on each file,
// even if the caller needs less info.
// Many tools need only the type of each file.
// On some platforms, this information is provided directly by the readdir
// system call, avoiding the need to stat each file individually.
// fastwalk_unix.go contains a fork of the syscall routines.
//
// See golang.org/issue/16399
//
// Walk walks the file tree rooted at root, calling walkFn for
// each file or directory in the tree, including root.
//
// If fastWalk returns filepath.SkipDir, the directory is skipped.
//
// Unlike filepath.Walk:
// * file stat calls must be done by the user.
// The only provided metadata is the file type, which does not include
// any permission bits.
// * multiple goroutines stat the filesystem concurrently. The provided
// walkFn must be safe for concurrent use.
// * fastWalk can follow symlinks if walkFn returns the TraverseLink
// sentinel error. It is the walkFn's responsibility to prevent
// fastWalk from going into symlink cycles.
func fastWalk(root string, nworkers int, doneCh <-chan struct{}, walkFn func(path string, typ os.FileMode) error) error {
// Make sure to wait for all workers to finish, otherwise
// walkFn could still be called after returning. This Wait call
// runs after close(e.donec) below.
var wg sync.WaitGroup
defer wg.Wait()
w := &walker{
fn: walkFn,
enqueuec: make(chan walkItem, nworkers), // buffered for performance
workc: make(chan walkItem, nworkers), // buffered for performance
donec: make(chan struct{}),
// buffered for correctness & not leaking goroutines:
resc: make(chan error, nworkers),
}
defer close(w.donec)
for i := 0; i < nworkers; i++ {
wg.Add(1)
go w.doWork(&wg)
}
todo := []walkItem{{dir: root}}
out := 0
for {
workc := w.workc
var workItem walkItem
if len(todo) == 0 {
workc = nil
} else {
workItem = todo[len(todo)-1]
}
select {
case <-doneCh:
return nil
case workc <- workItem:
todo = todo[:len(todo)-1]
out++
case it := <-w.enqueuec:
todo = append(todo, it)
case err := <-w.resc:
out--
if err != nil {
return err
}
if out == 0 && len(todo) == 0 {
// It's safe to quit here, as long as the buffered
// enqueue channel isn't also readable, which might
// happen if the worker sends both another unit of
// work and its result before the other select was
// scheduled and both w.resc and w.enqueuec were
// readable.
select {
case it := <-w.enqueuec:
todo = append(todo, it)
default:
return nil
}
}
}
}
}
// doWork reads directories as instructed (via workc) and runs the
// user's callback function.
func (w *walker) doWork(wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-w.donec:
return
case it := <-w.workc:
select {
case <-w.donec:
return
case w.resc <- w.walk(it.dir, !it.callbackDone):
}
}
}
}
type walker struct {
fn func(path string, typ os.FileMode) error
donec chan struct{} // closed on fastWalk's return
workc chan walkItem // to workers
enqueuec chan walkItem // from workers
resc chan error // from workers
}
type walkItem struct {
dir string
callbackDone bool // callback already called; don't do it again
}
func (w *walker) enqueue(it walkItem) {
select {
case w.enqueuec <- it:
case <-w.donec:
}
}
var stringsBuilderPool = sync.Pool{
New: func() interface{} {
return &strings.Builder{}
},
}
func (w *walker) onDirEnt(dirName, baseName string, typ os.FileMode) error {
builder := stringsBuilderPool.Get().(*strings.Builder)
defer func() {
builder.Reset()
stringsBuilderPool.Put(builder)
}()
builder.WriteString(dirName)
if !strings.HasSuffix(dirName, SlashSeparator) {
builder.WriteString(SlashSeparator)
}
builder.WriteString(baseName)
if typ == os.ModeDir {
w.enqueue(walkItem{dir: builder.String()})
return nil
}
err := w.fn(builder.String(), typ)
if err == filepath.SkipDir || err == errSkipFile {
return nil
}
return err
}
func readDirFn(dirName string, fn func(dirName, entName string, typ os.FileMode) error) error {
func readDirFn(dirName string, fn func(entName string, typ os.FileMode) error) error {
fis, err := readDir(dirName)
if err != nil {
return err
......@@ -188,23 +27,9 @@ func readDirFn(dirName string, fn func(dirName, entName string, typ os.FileMode)
mode |= os.ModeDir
}
if err = fn(dirName, fi, mode); err != nil {
if err = fn(fi, mode); err != nil {
return err
}
}
return nil
}
func (w *walker) walk(root string, runUserCallback bool) error {
if runUserCallback {
err := w.fn(root, os.ModeDir)
if err == filepath.SkipDir || err == errSkipFile {
return nil
}
if err != nil {
return err
}
}
return readDirFn(root, w.onDirEnt)
}
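With the fastwalk fork deleted, readDirFn now passes the callback only the entry name and its type bits. MinIO's readDir is an internal helper that is not shown here, so this sketch substitutes the standard library's os.ReadDir (Go 1.16+) to illustrate the same callback shape; treat it as an approximation of the new signature rather than the actual implementation.

package main

import (
	"fmt"
	"os"
)

// readDirFn (sketch): invoke fn with each directory entry's name and
// file-type bits, stopping at the first error.
func readDirFn(dirName string, fn func(entName string, typ os.FileMode) error) error {
	entries, err := os.ReadDir(dirName)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		// entry.Type() already includes os.ModeDir for directories.
		if err := fn(entry.Name(), entry.Type()); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	_ = readDirFn(".", func(name string, typ os.FileMode) error {
		fmt.Println(name, typ.IsDir())
		return nil
	})
}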
......@@ -828,7 +828,7 @@ func ecDrivesNoConfig(drivesPerSet int) int {
// Make XL backend meta volumes.
func makeFormatXLMetaVolumes(disk StorageAPI) error {
// Attempt to create MinIO internal buckets.
return disk.MakeVolBulk(minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, minioMetaBackgroundOpsBucket)
return disk.MakeVolBulk(minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, dataUsageBucket)
}
var initMetaVolIgnoredErrs = append(baseIgnoredErrs, errVolumeExists)
......
......@@ -757,7 +757,7 @@ func (fs *FSObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
// Removes multipart uploads if any older than `expiry` duration
// on all buckets for every `cleanupInterval`, this function is
// blocking and should be run in a go-routine.
func (fs *FSObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh chan struct{}) {
func (fs *FSObjects) cleanupStaleMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh <-chan struct{}) {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
......
......@@ -21,6 +21,7 @@ import (
"context"
"os"
"path/filepath"
"sync"
"testing"
"time"
)
......@@ -34,33 +35,38 @@ func TestFSCleanupMultipartUploadsInRoutine(t *testing.T) {
obj := initFSObjects(disk, t)
fs := obj.(*FSObjects)
// Close the go-routine, we are going to
// manually start it and test in this test case.
GlobalServiceDoneCh <- struct{}{}
bucketName := "bucket"
objectName := "object"
obj.MakeBucketWithLocation(context.Background(), bucketName, "")
uploadID, err := obj.NewMultipartUpload(context.Background(), bucketName, objectName, ObjectOptions{})
// Create a context we can cancel.
ctx, cancel := context.WithCancel(context.Background())
obj.MakeBucketWithLocation(ctx, bucketName, "")
uploadID, err := obj.NewMultipartUpload(ctx, bucketName, objectName, ObjectOptions{})
if err != nil {
t.Fatal("Unexpected err: ", err)
}
go fs.cleanupStaleMultipartUploads(context.Background(), 20*time.Millisecond, 0, GlobalServiceDoneCh)
// Wait for 40ms such that - we have given enough time for
// cleanup routine to kick in.
time.Sleep(40 * time.Millisecond)
var cleanupWg sync.WaitGroup
cleanupWg.Add(1)
go func() {
defer cleanupWg.Done()
fs.cleanupStaleMultipartUploads(context.Background(), time.Millisecond, 0, ctx.Done())
}()
// Close the routine we do not need it anymore.
GlobalServiceDoneCh <- struct{}{}
// Wait for 100ms such that - we have given enough time for
// cleanup routine to kick in. Flaky on slow systems...
time.Sleep(100 * time.Millisecond)
cancel()
cleanupWg.Wait()
// Check if upload id was already purged.
if err = obj.AbortMultipartUpload(context.Background(), bucketName, objectName, uploadID); err != nil {
if _, ok := err.(InvalidUploadID); !ok {
t.Fatal("Unexpected err: ", err)
}
} else {
t.Error("Item was not cleaned up.")
}
}
......
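The rewritten test swaps the GlobalServiceDoneCh handshake for a cancellable context plus a WaitGroup, which guarantees the cleanup goroutine has fully exited before the test inspects the upload. The generic shape of that pattern, stripped of MinIO types:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Stand-in for cleanupStaleMultipartUploads: loop until cancelled.
		for {
			select {
			case <-time.After(time.Millisecond):
				// periodic cleanup work under test
			case <-ctx.Done():
				return
			}
		}
	}()

	time.Sleep(10 * time.Millisecond) // give the routine time to run
	cancel()                          // ask it to stop...
	wg.Wait()                         // ...and block until it really has
	fmt.Println("safe to assert: no background work can race the checks that follow")
}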
......@@ -37,12 +37,11 @@ import (
"github.com/minio/minio/cmd/config"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
bucketsse "github.com/minio/minio/pkg/bucket/encryption"
"github.com/minio/minio/pkg/bucket/lifecycle"
"github.com/minio/minio/pkg/bucket/object/tagging"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/lock"
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/mimedb"
......@@ -112,7 +111,7 @@ func initMetaVolumeFS(fsPath, fsUUID string) error {
return err
}
if err := os.MkdirAll(pathJoin(fsPath, minioMetaBackgroundOpsBucket), 0777); err != nil {
if err := os.MkdirAll(pathJoin(fsPath, dataUsageBucket), 0777); err != nil {
return err
}
......@@ -235,9 +234,26 @@ func (fs *FSObjects) waitForLowActiveIO() {
}
// CrawlAndGetDataUsage returns data usage stats of the current FS deployment
func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, endCh <-chan struct{}) DataUsageInfo {
dataUsageInfo := updateUsage(fs.fsPath, endCh, fs.waitForLowActiveIO, func(item Item) (int64, error) {
// Get file size, symlinks which cannot bex
func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error {
// Load bucket totals
var oldCache dataUsageCache
err := oldCache.load(ctx, fs, dataUsageCacheName)
if err != nil {
return err
}
if oldCache.Info.Name == "" {
oldCache.Info.Name = dataUsageRoot
}
if dataUsageDebug {
logger.Info(color.Green("FSObjects.CrawlAndGetDataUsage:") + " Start crawl cycle")
}
buckets, err := fs.ListBuckets(ctx)
if err != nil {
return err
}
t := time.Now()
cache, err := updateUsage(ctx, fs.fsPath, oldCache, fs.waitForLowActiveIO, func(item Item) (int64, error) {
// Get file size, symlinks which cannot be
// followed are automatically filtered by fastwalk.
fi, err := os.Stat(item.Path)
if err != nil {
......@@ -245,11 +261,16 @@ func (fs *FSObjects) CrawlAndGetDataUsage(ctx context.Context, endCh <-chan stru
}
return fi.Size(), nil
})
if dataUsageDebug {
logger.Info(color.Green("FSObjects.CrawlAndGetDataUsage:")+" Crawl time: %v", time.Since(t))
}
// Even if there was an error, the new cache may have better info.
if cache.Info.LastUpdate.After(oldCache.Info.LastUpdate) {
logger.LogIf(ctx, cache.save(ctx, fs, dataUsageCacheName))
updates <- cache.dui(dataUsageRoot, buckets)
}
dataUsageInfo.LastUpdate = UTCNow()
atomic.StoreUint64(&fs.totalUsed, dataUsageInfo.ObjectsTotalSize)
return dataUsageInfo
return err
}
/// Bucket operations
......@@ -1236,7 +1257,7 @@ func (fs *FSObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealRe
}
// HealObject - no-op for fs. Valid only for XL.
func (fs *FSObjects) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (
func (fs *FSObjects) HealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (
res madmin.HealResultItem, err error) {
logger.LogIf(ctx, NotImplemented{})
return res, NotImplemented{}
......@@ -1259,7 +1280,7 @@ func (fs *FSObjects) Walk(ctx context.Context, bucket, prefix string, results ch
}
// HealObjects - no-op for fs. Valid only for XL.
func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) {
func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn healObjectFn) (e error) {
logger.LogIf(ctx, NotImplemented{})
return NotImplemented{}
}
......
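The new FS crawler loads the previous usage cache, recrawls, then persists and publishes the result only when it is strictly newer than what it started from. A condensed sketch of that flow with stand-in types; usageCache and the callbacks below are illustrative, not MinIO's dataUsageCache API.

package main

import (
	"fmt"
	"time"
)

type usageCache struct {
	LastUpdate time.Time
	TotalSize  uint64
}

// crawlAndPublish mirrors the load -> crawl -> compare LastUpdate ->
// save/publish order used by CrawlAndGetDataUsage above.
func crawlAndPublish(old usageCache, crawl func(usageCache) usageCache,
	save func(usageCache), updates chan<- usageCache) {
	fresh := crawl(old)
	// Even if the crawl was partial, publish only when the cache is newer.
	if fresh.LastUpdate.After(old.LastUpdate) {
		save(fresh)
		updates <- fresh
	}
}

func main() {
	updates := make(chan usageCache, 1)
	old := usageCache{LastUpdate: time.Now().Add(-time.Hour)}
	crawlAndPublish(old,
		func(usageCache) usageCache {
			return usageCache{LastUpdate: time.Now(), TotalSize: 1 << 20}
		},
		func(usageCache) { /* persisted under .minio.sys in the real code */ },
		updates)
	fmt.Printf("published cache with %d bytes\n", (<-updates).TotalSize)
}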
......@@ -356,8 +356,6 @@ func TestFSListBuckets(t *testing.T) {
t.Fatal("Unexpected error: ", err)
}
GlobalServiceDoneCh <- struct{}{}
// Create a bucket with invalid name
if err := os.MkdirAll(pathJoin(fs.fsPath, "vo^"), 0777); err != nil {
t.Fatal("Unexpected error: ", err)
......@@ -392,7 +390,7 @@ func TestFSHealObject(t *testing.T) {
defer os.RemoveAll(disk)
obj := initFSObjects(disk, t)
_, err := obj.HealObject(context.Background(), "bucket", "object", false, false, madmin.HealDeepScan)
_, err := obj.HealObject(context.Background(), "bucket", "object", madmin.HealOpts{})
if err == nil || !isSameType(err, NotImplemented{}) {
t.Fatalf("Heal Object should return NotImplemented error ")
}
......@@ -404,7 +402,7 @@ func TestFSHealObjects(t *testing.T) {
defer os.RemoveAll(disk)
obj := initFSObjects(disk, t)
err := obj.HealObjects(context.Background(), "bucket", "prefix", nil)
err := obj.HealObjects(context.Background(), "bucket", "prefix", madmin.HealOpts{}, nil)
if err == nil || !isSameType(err, NotImplemented{}) {
t.Fatalf("Heal Object should return NotImplemented error ")
}
......
......@@ -50,9 +50,9 @@ func NewGatewayLayerWithLocker(gwLayer ObjectLayer) ObjectLayer {
type GatewayUnsupported struct{}
// CrawlAndGetDataUsage - crawl is not implemented for gateway
func (a GatewayUnsupported) CrawlAndGetDataUsage(ctx context.Context, endCh <-chan struct{}) DataUsageInfo {
func (a GatewayUnsupported) CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error {
logger.CriticalIf(ctx, errors.New("not implemented"))
return DataUsageInfo{}
return NotImplemented{}
}
// NewNSLock is a dummy stub for gateway.
......@@ -167,7 +167,7 @@ func (a GatewayUnsupported) ListBucketsHeal(ctx context.Context) (buckets []Buck
}
// HealObject - Not implemented stub
func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (h madmin.HealResultItem, e error) {
func (a GatewayUnsupported) HealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (h madmin.HealResultItem, e error) {
return h, NotImplemented{}
}
......@@ -182,7 +182,7 @@ func (a GatewayUnsupported) Walk(ctx context.Context, bucket, prefix string, res
}
// HealObjects - Not implemented stub
func (a GatewayUnsupported) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) {
func (a GatewayUnsupported) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn healObjectFn) (e error) {
return NotImplemented{}
}
......
......@@ -39,7 +39,6 @@ import (
humanize "github.com/dustin/go-humanize"
"github.com/minio/cli"
miniogopolicy "github.com/minio/minio-go/v6/pkg/policy"
"github.com/minio/minio/cmd"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bucket/policy"
......@@ -1196,7 +1195,7 @@ func (a *azureObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
if err != nil {
return objInfo, azureToObjectError(err, bucket, object)
}
objMetadata["md5sum"] = cmd.ComputeCompleteMultipartMD5(uploadedParts)
objMetadata["md5sum"] = minio.ComputeCompleteMultipartMD5(uploadedParts)
_, err = objBlob.CommitBlockList(ctx, allBlocks, objProperties, objMetadata, azblob.BlobAccessConditions{})
if err != nil {
......
......@@ -679,7 +679,7 @@ func getGWContentPath(object string) string {
}
// Clean-up the stale incomplete encrypted multipart uploads. Should be run in a Go routine.
func (l *s3EncObjects) cleanupStaleEncMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh chan struct{}) {
func (l *s3EncObjects) cleanupStaleEncMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration, doneCh <-chan struct{}) {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
......
......@@ -353,7 +353,7 @@ func parseAmzDate(amzDateStr string) (amzDate time.Time, apiErr APIErrorCode) {
// supported amz date formats.
func parseAmzDateHeader(req *http.Request) (time.Time, APIErrorCode) {
for _, amzDateHeader := range amzDateHeaders {
amzDateStr := req.Header.Get(http.CanonicalHeaderKey(amzDateHeader))
amzDateStr := req.Header.Get(amzDateHeader)
if amzDateStr != "" {
return parseAmzDate(amzDateStr)
}
......
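Dropping http.CanonicalHeaderKey here is safe because net/http's Header.Get canonicalizes its key argument internally (via textproto), so the explicit wrapping was redundant. A quick demonstration:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	h := http.Header{}
	h.Set("X-Amz-Date", "20200101T000000Z")

	// Header.Get canonicalizes the key itself, so both lookups match:
	fmt.Println(h.Get("x-amz-date"))                          // 20200101T000000Z
	fmt.Println(h.Get(http.CanonicalHeaderKey("x-amz-date"))) // 20200101T000000Z
}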
......@@ -39,7 +39,7 @@ var leaderLockTimeout = newDynamicTimeout(time.Minute, time.Minute)
func newBgHealSequence(numDisks int) *healSequence {
reqInfo := &logger.ReqInfo{API: "BackgroundHeal"}
ctx := logger.SetReqInfo(context.Background(), reqInfo)
ctx := logger.SetReqInfo(GlobalContext, reqInfo)
hs := madmin.HealOpts{
// Remove objects that do not have read-quorum
......
......@@ -101,28 +101,24 @@ type HTTPAPIStats struct {
// Inc increments the api stats counter.
func (stats *HTTPAPIStats) Inc(api string) {
stats.Lock()
defer stats.Unlock()
if stats == nil {
return
}
stats.Lock()
defer stats.Unlock()
if stats.apiStats == nil {
stats.apiStats = make(map[string]int)
}
if _, ok := stats.apiStats[api]; ok {
stats.apiStats[api]++
return
}
stats.apiStats[api] = 1
stats.apiStats[api]++
}
// Dec increments the api stats counter.
func (stats *HTTPAPIStats) Dec(api string) {
stats.Lock()
defer stats.Unlock()
if stats == nil {
return
}
stats.Lock()
defer stats.Unlock()
if val, ok := stats.apiStats[api]; ok && val > 0 {
stats.apiStats[api]--
}
......
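Two details make the reordering above correct: the nil-receiver guard has to run before Lock, because calling a method on a nil pointer is legal in Go while locking the embedded mutex through that nil pointer panics; and the Inc body can shrink to a single statement because indexing a missing map key yields the zero int. A standalone illustration:

package main

import (
	"fmt"
	"sync"
)

type apiStats struct {
	sync.RWMutex
	counts map[string]int
}

func (s *apiStats) Inc(api string) {
	if s == nil {
		return // must run before Lock: s.Lock() on a nil receiver panics
	}
	s.Lock()
	defer s.Unlock()
	if s.counts == nil {
		s.counts = make(map[string]int)
	}
	s.counts[api]++ // missing keys start at zero, so no existence branch is needed
}

func main() {
	var nilStats *apiStats
	nilStats.Inc("GetObject") // safe: guarded nil receiver, no panic

	s := &apiStats{}
	s.Inc("GetObject")
	s.Inc("GetObject")
	fmt.Println(s.counts["GetObject"]) // 2
}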
......@@ -664,14 +664,15 @@ func listIAMConfigItems(objectAPI ObjectLayer, pathPrefix string, dirs bool,
}
func (iamOS *IAMObjectStore) watch(sys *IAMSys) {
ctx := GlobalContext
watchDisk := func() {
for {
select {
case <-GlobalServiceDoneCh:
case <-ctx.Done():
return
case <-time.NewTimer(globalRefreshIAMInterval).C:
err := iamOS.loadAll(sys, nil)
logger.LogIf(context.Background(), err)
logger.LogIf(ctx, err)
}
}
}
......
......@@ -27,7 +27,6 @@ import (
"time"
"github.com/dgrijalva/jwt-go"
jwtgo "github.com/dgrijalva/jwt-go"
)
var (
......@@ -49,7 +48,7 @@ var jwtTestData = []struct {
"",
defaultKeyFunc,
&MapClaims{
MapClaims: jwtgo.MapClaims{
MapClaims: jwt.MapClaims{
"foo": "bar",
},
},
......@@ -61,7 +60,7 @@ var jwtTestData = []struct {
"", // autogen
defaultKeyFunc,
&MapClaims{
MapClaims: jwtgo.MapClaims{
MapClaims: jwt.MapClaims{
"foo": "bar",
"exp": float64(time.Now().Unix() - 100),
},
......@@ -74,7 +73,7 @@ var jwtTestData = []struct {
"", // autogen
defaultKeyFunc,
&MapClaims{
MapClaims: jwtgo.MapClaims{
MapClaims: jwt.MapClaims{
"foo": "bar",
"nbf": float64(time.Now().Unix() + 100),
},
......@@ -87,7 +86,7 @@ var jwtTestData = []struct {
"", // autogen
defaultKeyFunc,
&MapClaims{
MapClaims: jwtgo.MapClaims{
MapClaims: jwt.MapClaims{
"foo": "bar",
"nbf": float64(time.Now().Unix() + 100),
"exp": float64(time.Now().Unix() - 100),
......@@ -101,7 +100,7 @@ var jwtTestData = []struct {
"eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJmb28iOiJiYXIifQ.EhkiHkoESI_cG3NPigFrxEk9Z60_oXrOT2vGm9Pn6RDgYNovYORQmmA0zs1AoAOf09ly2Nx2YAg6ABqAYga1AcMFkJljwxTT5fYphTuqpWdy4BELeSYJx5Ty2gmr8e7RonuUztrdD5WfPqLKMm1Ozp_T6zALpRmwTIW0QPnaBXaQD90FplAg46Iy1UlDKr-Eupy0i5SLch5Q-p2ZpaL_5fnTIUDlxC3pWhJTyx_71qDI-mAA_5lE_VdroOeflG56sSmDxopPEG3bFlSu1eowyBfxtu0_CuVd-M42RU75Zc4Gsj6uV77MBtbMrf4_7M_NUTSgoIF3fRqxrj0NzihIBg",
defaultKeyFunc,
&MapClaims{
MapClaims: jwtgo.MapClaims{
MapClaims: jwt.MapClaims{
"foo": "bar",
},
},
......@@ -113,7 +112,7 @@ var jwtTestData = []struct {
"eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJmb28iOiJiYXIifQ.FhkiHkoESI_cG3NPigFrxEk9Z60_oXrOT2vGm9Pn6RDgYNovYORQmmA0zs1AoAOf09ly2Nx2YAg6ABqAYga1AcMFkJljwxTT5fYphTuqpWdy4BELeSYJx5Ty2gmr8e7RonuUztrdD5WfPqLKMm1Ozp_T6zALpRmwTIW0QPnaBXaQD90FplAg46Iy1UlDKr-Eupy0i5SLch5Q-p2ZpaL_5fnTIUDlxC3pWhJTyx_71qDI-mAA_5lE_VdroOeflG56sSmDxopPEG3bFlSu1eowyBfxtu0_CuVd-M42RU75Zc4Gsj6uV77MBtbMrf4_7M_NUTSgoIF3fRqxrj0NzihIBg",
nil,
&MapClaims{
MapClaims: jwtgo.MapClaims{
MapClaims: jwt.MapClaims{
"foo": "bar",
},
},
......@@ -125,7 +124,7 @@ var jwtTestData = []struct {
"eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJmb28iOiJiYXIifQ.FhkiHkoESI_cG3NPigFrxEk9Z60_oXrOT2vGm9Pn6RDgYNovYORQmmA0zs1AoAOf09ly2Nx2YAg6ABqAYga1AcMFkJljwxTT5fYphTuqpWdy4BELeSYJx5Ty2gmr8e7RonuUztrdD5WfPqLKMm1Ozp_T6zALpRmwTIW0QPnaBXaQD90FplAg46Iy1UlDKr-Eupy0i5SLch5Q-p2ZpaL_5fnTIUDlxC3pWhJTyx_71qDI-mAA_5lE_VdroOeflG56sSmDxopPEG3bFlSu1eowyBfxtu0_CuVd-M42RU75Zc4Gsj6uV77MBtbMrf4_7M_NUTSgoIF3fRqxrj0NzihIBg",
emptyKeyFunc,
&MapClaims{
MapClaims: jwtgo.MapClaims{
MapClaims: jwt.MapClaims{
"foo": "bar",
},
},
......@@ -137,7 +136,7 @@ var jwtTestData = []struct {
"eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJmb28iOiJiYXIifQ.FhkiHkoESI_cG3NPigFrxEk9Z60_oXrOT2vGm9Pn6RDgYNovYORQmmA0zs1AoAOf09ly2Nx2YAg6ABqAYga1AcMFkJljwxTT5fYphTuqpWdy4BELeSYJx5Ty2gmr8e7RonuUztrdD5WfPqLKMm1Ozp_T6zALpRmwTIW0QPnaBXaQD90FplAg46Iy1UlDKr-Eupy0i5SLch5Q-p2ZpaL_5fnTIUDlxC3pWhJTyx_71qDI-mAA_5lE_VdroOeflG56sSmDxopPEG3bFlSu1eowyBfxtu0_CuVd-M42RU75Zc4Gsj6uV77MBtbMrf4_7M_NUTSgoIF3fRqxrj0NzihIBg",
errorKeyFunc,
&MapClaims{
MapClaims: jwtgo.MapClaims{
MapClaims: jwt.MapClaims{
"foo": "bar",
},
},
......@@ -149,7 +148,7 @@ var jwtTestData = []struct {
"",
defaultKeyFunc,
&StandardClaims{
StandardClaims: jwtgo.StandardClaims{
StandardClaims: jwt.StandardClaims{
ExpiresAt: time.Now().Add(time.Second * 10).Unix(),
},
},
......@@ -160,7 +159,7 @@ var jwtTestData = []struct {
func mapClaimsToken(claims *MapClaims) string {
claims.SetAccessKey("test")
j := jwtgo.NewWithClaims(jwtgo.SigningMethodHS512, claims)
j := jwt.NewWithClaims(jwt.SigningMethodHS512, claims)
tk, _ := j.SignedString([]byte("HelloSecret"))
return tk
}
......@@ -168,7 +167,7 @@ func mapClaimsToken(claims *MapClaims) string {
func standardClaimsToken(claims *StandardClaims) string {
claims.AccessKey = "test"
claims.Subject = "test"
j := jwtgo.NewWithClaims(jwtgo.SigningMethodHS512, claims)
j := jwt.NewWithClaims(jwt.SigningMethodHS512, claims)
tk, _ := j.SignedString([]byte("HelloSecret"))
return tk
}
......
......@@ -21,7 +21,6 @@ import (
"encoding/json"
"errors"
"net/http"
gohttp "net/http"
"strings"
xhttp "github.com/minio/minio/cmd/http"
......@@ -41,7 +40,7 @@ type Target struct {
// User-Agent to be set on each log request sent to the `endpoint`
userAgent string
logKind string
client gohttp.Client
client http.Client
}
func (h *Target) startHTTPLogger() {
......@@ -54,7 +53,7 @@ func (h *Target) startHTTPLogger() {
continue
}
req, err := gohttp.NewRequest(http.MethodPost, h.endpoint, bytes.NewBuffer(logJSON))
req, err := http.NewRequest(http.MethodPost, h.endpoint, bytes.NewBuffer(logJSON))
if err != nil {
continue
}
......@@ -78,12 +77,12 @@ func (h *Target) startHTTPLogger() {
// New initializes a new logger target which
// sends log over http to the specified endpoint
func New(endpoint, userAgent, logKind string, transport *gohttp.Transport) *Target {
func New(endpoint, userAgent, logKind string, transport *http.Transport) *Target {
h := Target{
endpoint: endpoint,
userAgent: userAgent,
logKind: strings.ToUpper(logKind),
client: gohttp.Client{
client: http.Client{
Transport: transport,
},
logCh: make(chan interface{}, 10000),
......
......@@ -17,6 +17,7 @@
package cmd
import (
"context"
"io"
"sync"
)
......@@ -80,8 +81,8 @@ func (d *naughtyDisk) calcError() (err error) {
func (d *naughtyDisk) SetDiskID(id string) {
}
func (d *naughtyDisk) CrawlAndGetDataUsage(endCh <-chan struct{}) (info DataUsageInfo, err error) {
return d.disk.CrawlAndGetDataUsage(endCh)
func (d *naughtyDisk) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (info dataUsageCache, err error) {
return d.disk.CrawlAndGetDataUsage(ctx, cache)
}
func (d *naughtyDisk) DiskInfo() (info DiskInfo, err error) {
......
......@@ -18,6 +18,7 @@ package cmd
import (
"io"
"math"
"time"
"github.com/minio/minio/pkg/hash"
......@@ -77,6 +78,11 @@ type objectHistogramInterval struct {
start, end int64
}
const (
// dataUsageBucketLen must be length of ObjectsHistogramIntervals
dataUsageBucketLen = 7
)
// ObjectsHistogramIntervals is the list of all intervals
// of object sizes to be included in objects histogram.
var ObjectsHistogramIntervals = []objectHistogramInterval{
......@@ -86,20 +92,25 @@ var ObjectsHistogramIntervals = []objectHistogramInterval{
{"BETWEEN_10_MB_AND_64_MB", 1024 * 1024 * 10, 1024*1024*64 - 1},
{"BETWEEN_64_MB_AND_128_MB", 1024 * 1024 * 64, 1024*1024*128 - 1},
{"BETWEEN_128_MB_AND_512_MB", 1024 * 1024 * 128, 1024*1024*512 - 1},
{"GREATER_THAN_512_MB", 1024 * 1024 * 512, -1},
{"GREATER_THAN_512_MB", 1024 * 1024 * 512, math.MaxInt64},
}
// DataUsageInfo represents data usage stats of the underlying Object API
type DataUsageInfo struct {
// The timestamp of when the data usage info is generated
// LastUpdate is the timestamp of when the data usage info was last updated.
// This does not indicate a full scan.
LastUpdate time.Time `json:"lastUpdate"`
ObjectsCount uint64 `json:"objectsCount"`
// Objects total size
ObjectsTotalSize uint64 `json:"objectsTotalSize"`
ObjectsTotalSize uint64 `json:"objectsTotalSize"`
// ObjectsSizesHistogram contains information on objects across all buckets.
// See ObjectsHistogramIntervals.
ObjectsSizesHistogram map[string]uint64 `json:"objectsSizesHistogram"`
BucketsCount uint64 `json:"bucketsCount"`
BucketsCount uint64 `json:"bucketsCount"`
// BucketsSizes is "bucket name" -> size.
BucketsSizes map[string]uint64 `json:"bucketsSizes"`
}
......
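Replacing the old -1 sentinel with math.MaxInt64 as the end of the last interval lets every object size be bucketed with one uniform start <= size <= end test. A standalone sketch using only the interval entries visible in this hunk (the real table has dataUsageBucketLen entries covering smaller sizes too):

package main

import (
	"fmt"
	"math"
)

type interval struct {
	name       string
	start, end int64
}

// Only the entries shown in the diff above; the full histogram has
// dataUsageBucketLen (7) intervals starting from the smallest sizes.
var intervals = []interval{
	{"BETWEEN_10_MB_AND_64_MB", 1024 * 1024 * 10, 1024*1024*64 - 1},
	{"BETWEEN_64_MB_AND_128_MB", 1024 * 1024 * 64, 1024*1024*128 - 1},
	{"BETWEEN_128_MB_AND_512_MB", 1024 * 1024 * 128, 1024*1024*512 - 1},
	{"GREATER_THAN_512_MB", 1024 * 1024 * 512, math.MaxInt64},
}

// bucket needs no open-ended special case now that the last end is MaxInt64.
func bucket(size int64) string {
	for _, iv := range intervals {
		if size >= iv.start && size <= iv.end {
			return iv.name
		}
	}
	return "SMALLER_THAN_10_MB" // intervals omitted from this sketch
}

func main() {
	fmt.Println(bucket(32 * 1024 * 1024)) // BETWEEN_10_MB_AND_64_MB
	fmt.Println(bucket(1 << 40))          // GREATER_THAN_512_MB
}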
......@@ -274,7 +274,7 @@ func (e BucketSSEConfigNotFound) Error() string {
// BucketNameInvalid - bucketname provided is invalid.
type BucketNameInvalid GenericError
// Return string an error formatted as the given text.
// Error returns string an error formatted as the given text.
func (e BucketNameInvalid) Error() string {
return "Bucket name invalid: " + e.Bucket
}
......@@ -290,17 +290,17 @@ type ObjectNameTooLong GenericError
// ObjectNamePrefixAsSlash - object name has a slash as prefix.
type ObjectNamePrefixAsSlash GenericError
// Return string an error formatted as the given text.
// Error returns string an error formatted as the given text.
func (e ObjectNameInvalid) Error() string {
return "Object name invalid: " + e.Bucket + "#" + e.Object
}
// Return string an error formatted as the given text.
// Error returns string an error formatted as the given text.
func (e ObjectNameTooLong) Error() string {
return "Object name too long: " + e.Bucket + "#" + e.Object
}
// Return string an error formatted as the given text.
// Error returns string an error formatted as the given text.
func (e ObjectNamePrefixAsSlash) Error() string {
return "Object name contains forward slash as pefix: " + e.Bucket + "#" + e.Object
}
......@@ -308,7 +308,7 @@ func (e ObjectNamePrefixAsSlash) Error() string {
// AllAccessDisabled All access to this object has been disabled
type AllAccessDisabled GenericError
// Return string an error formatted as the given text.
// Error returns string an error formatted as the given text.
func (e AllAccessDisabled) Error() string {
return "All access to this object has been disabled"
}
......@@ -316,7 +316,7 @@ func (e AllAccessDisabled) Error() string {
// IncompleteBody You did not provide the number of bytes specified by the Content-Length HTTP header.
type IncompleteBody GenericError
// Return string an error formatted as the given text.
// Error returns string an error formatted as the given text.
func (e IncompleteBody) Error() string {
return e.Bucket + "#" + e.Object + "has incomplete body"
}
......
......@@ -59,7 +59,7 @@ type ObjectLayer interface {
// Storage operations.
Shutdown(context.Context) error
CrawlAndGetDataUsage(context.Context, <-chan struct{}) DataUsageInfo
CrawlAndGetDataUsage(ctx context.Context, updates chan<- DataUsageInfo) error
StorageInfo(ctx context.Context, local bool) StorageInfo // local queries only local disks
// Bucket operations.
......@@ -101,9 +101,8 @@ type ObjectLayer interface {
ReloadFormat(ctx context.Context, dryRun bool) error
HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error)
HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (madmin.HealResultItem, error)
HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) error
HealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (madmin.HealResultItem, error)
HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn healObjectFn) error
ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)
......
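On the caller's side, the new CrawlAndGetDataUsage(ctx, updates) shape means usage snapshots are streamed over a channel rather than returned once. A hedged consumer-side sketch with stand-in types (usage and crawler below are not MinIO's DataUsageInfo/ObjectLayer, just the relevant slice of them):

package main

import (
	"context"
	"fmt"
	"time"
)

type usage struct {
	LastUpdate   time.Time
	ObjectsCount uint64
}

type crawler interface {
	CrawlAndGetDataUsage(ctx context.Context, updates chan<- usage) error
}

// collect drains every snapshot the crawler publishes, then returns the
// crawler's final error.
func collect(ctx context.Context, c crawler) error {
	updates := make(chan usage, 1)
	errCh := make(chan error, 1)
	go func() {
		errCh <- c.CrawlAndGetDataUsage(ctx, updates)
		close(updates)
	}()
	for u := range updates {
		fmt.Printf("usage snapshot: %d objects at %s\n", u.ObjectsCount, u.LastUpdate)
	}
	return <-errCh
}

type fakeCrawler struct{}

func (fakeCrawler) CrawlAndGetDataUsage(ctx context.Context, updates chan<- usage) error {
	updates <- usage{LastUpdate: time.Now(), ObjectsCount: 42}
	return nil
}

func main() {
	_ = collect(context.Background(), fakeCrawler{})
}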
......@@ -51,10 +51,6 @@ import (
const (
// MinIO meta bucket.
minioMetaBucket = ".minio.sys"
// Background ops meta prefix
backgroundOpsMetaPrefix = "background-ops"
// MinIO Stats meta prefix.
minioMetaBackgroundOpsBucket = minioMetaBucket + SlashSeparator + backgroundOpsMetaPrefix
// Multipart meta prefix.
mpartMetaPrefix = "multipart"
// MinIO Multipart meta prefix.
......@@ -77,7 +73,7 @@ func isMinioMetaBucketName(bucket string) bool {
return bucket == minioMetaBucket ||
bucket == minioMetaMultipartBucket ||
bucket == minioMetaTmpBucket ||
bucket == minioMetaBackgroundOpsBucket
bucket == dataUsageBucket
}
// IsValidBucketName verifies that a bucket name is in accordance with
......
......@@ -1214,7 +1214,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return
}
if tags := r.Header.Get(http.CanonicalHeaderKey(xhttp.AmzObjectTagging)); tags != "" {
if tags := r.Header.Get(xhttp.AmzObjectTagging); tags != "" {
metadata[xhttp.AmzObjectTagging], err = extractTags(ctx, tags)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
......
......@@ -17,6 +17,7 @@
package cmd
import (
"context"
"io"
)
......@@ -38,8 +39,8 @@ func (p *posixDiskIDCheck) IsOnline() bool {
return storedDiskID == p.diskID
}
func (p *posixDiskIDCheck) CrawlAndGetDataUsage(endCh <-chan struct{}) (DataUsageInfo, error) {
return p.storage.CrawlAndGetDataUsage(endCh)
func (p *posixDiskIDCheck) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
return p.storage.CrawlAndGetDataUsage(ctx, cache)
}
func (p *posixDiskIDCheck) Hostname() string {
......
......@@ -338,8 +338,8 @@ func (s *posix) waitForLowActiveIO() {
}
}
func (s *posix) CrawlAndGetDataUsage(endCh <-chan struct{}) (DataUsageInfo, error) {
dataUsageInfo := updateUsage(s.diskPath, endCh, s.waitForLowActiveIO, func(item Item) (int64, error) {
func (s *posix) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
dataUsageInfo, err := updateUsage(ctx, s.diskPath, cache, s.waitForLowActiveIO, func(item Item) (int64, error) {
// Look for `xl.json' at the leaf.
if !strings.HasSuffix(item.Path, SlashSeparator+xlMetaJSONFile) {
// if no xl.json found, skip the file.
......@@ -353,14 +353,20 @@ func (s *posix) CrawlAndGetDataUsage(endCh <-chan struct{}) (DataUsageInfo, erro
meta, err := xlMetaV1UnmarshalJSON(context.Background(), xlMetaBuf)
if err != nil {
return 0, errSkipFile
return 0, nil
}
return meta.Stat.Size, nil
})
dataUsageInfo.LastUpdate = UTCNow()
atomic.StoreUint64(&s.totalUsed, dataUsageInfo.ObjectsTotalSize)
if err != nil {
return dataUsageInfo, err
}
dataUsageInfo.Info.LastUpdate = time.Now()
total := dataUsageInfo.sizeRecursive(dataUsageInfo.Info.Name)