......@@ -226,7 +226,7 @@ func getClaimsFromToken(r *http.Request) (map[string]interface{}, error) {
if err != nil {
// Base64 decoding failed; log it to indicate that the
// client sent a malformed request.
logger.LogIf(context.Background(), err, logger.Application)
logger.LogIf(r.Context(), err, logger.Application)
return nil, errAuthentication
}
claims.MapClaims[iampolicy.SessionPolicyName] = string(spBytes)
......@@ -246,7 +246,7 @@ func checkClaimsFromToken(r *http.Request, cred auth.Credentials) (map[string]in
}
claims, err := getClaimsFromToken(r)
if err != nil {
return nil, toAPIErrorCode(context.Background(), err)
return nil, toAPIErrorCode(r.Context(), err)
}
return claims, ErrNone
}
......@@ -460,7 +460,7 @@ func (a authHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
a.handler.ServeHTTP(w, r)
return
}
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrSignatureVersionNotSupported), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrSignatureVersionNotSupported), r.URL, guessIsBrowserReq(r))
}
// isPutActionAllowed - check if PUT operation is allowed on the resource, this
......
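The auth and handler hunks in this change consistently swap context.Background() for r.Context() when writing error responses and logging. As a rough, standalone illustration of why that matters (slowQuery and handler below are made-up names, not MinIO code), a handler that threads r.Context() downstream lets slow work observe client disconnects and request deadlines:

package main

import (
	"context"
	"log"
	"net/http"
	"time"
)

// slowQuery pretends to do expensive work but stops as soon as the
// request context is cancelled.
func slowQuery(ctx context.Context) error {
	select {
	case <-time.After(5 * time.Second): // pretend work
		return nil
	case <-ctx.Done(): // client went away or deadline hit
		return ctx.Err()
	}
}

func handler(w http.ResponseWriter, r *http.Request) {
	// Using r.Context() instead of context.Background() lets
	// slowQuery notice when the client disconnects.
	if err := slowQuery(r.Context()); err != nil {
		log.Printf("request aborted: %v", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Write([]byte("done"))
}

func main() {
	http.HandleFunc("/work", handler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}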
......@@ -66,8 +66,7 @@ func waitForLowHTTPReq(tolerance int32) {
}
// Wait for heal requests and process them
func (h *healRoutine) run() {
ctx := context.Background()
func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) {
for {
select {
case task, ok := <-h.tasks:
......@@ -83,18 +82,18 @@ func (h *healRoutine) run() {
bucket, object := path2BucketObject(task.path)
switch {
case bucket == "" && object == "":
res, err = bgHealDiskFormat(ctx, task.opts)
res, err = healDiskFormat(ctx, objAPI, task.opts)
case bucket != "" && object == "":
res, err = bgHealBucket(ctx, bucket, task.opts)
res, err = objAPI.HealBucket(ctx, bucket, task.opts.DryRun, task.opts.Remove)
case bucket != "" && object != "":
res, err = bgHealObject(ctx, bucket, object, task.opts)
res, err = objAPI.HealObject(ctx, bucket, object, task.opts)
}
if task.responseCh != nil {
task.responseCh <- healResult{result: res, err: err}
}
case <-h.doneCh:
return
case <-GlobalServiceDoneCh:
case <-ctx.Done():
return
}
}
......@@ -108,22 +107,10 @@ func initHealRoutine() *healRoutine {
}
func startBackgroundHealing() {
ctx := context.Background()
var objAPI ObjectLayer
for {
objAPI = newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
func startBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
// Run the background healer
globalBackgroundHealRoutine = initHealRoutine()
go globalBackgroundHealRoutine.run()
go globalBackgroundHealRoutine.run(ctx, objAPI)
// Launch the background healer sequence to track
// background healing operations
......@@ -133,20 +120,14 @@ func startBackgroundHealing() {
globalBackgroundHealState.LaunchNewHealSequence(nh)
}
func initBackgroundHealing() {
go startBackgroundHealing()
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
go startBackgroundHealing(ctx, objAPI)
}
// bgHealDiskFormat - heals format.json, return value indicates if a
// healDiskFormat - heals format.json, return value indicates if a
// failure error occurred.
func bgHealDiskFormat(ctx context.Context, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerWithoutSafeModeFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
res, err := objectAPI.HealFormat(ctx, opts.DryRun)
func healDiskFormat(ctx context.Context, objAPI ObjectLayer, opts madmin.HealOpts) (madmin.HealResultItem, error) {
res, err := objAPI.HealFormat(ctx, opts.DryRun)
// return any error, ignore error returned when disks have
// already healed.
......@@ -167,24 +148,3 @@ func bgHealDiskFormat(ctx context.Context, opts madmin.HealOpts) (madmin.HealRes
return res, nil
}
// bghealBucket - traverses and heals given bucket
func bgHealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerWithoutSafeModeFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
return objectAPI.HealBucket(ctx, bucket, opts.DryRun, opts.Remove)
}
// bgHealObject - heal the given object and record result
func bgHealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Get current object layer instance.
objectAPI := newObjectLayerWithoutSafeModeFn()
if objectAPI == nil {
return madmin.HealResultItem{}, errServerNotInitialized
}
return objectAPI.HealObject(ctx, bucket, object, opts)
}
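The heal routine now receives its context and ObjectLayer from the caller instead of polling newObjectLayerWithoutSafeModeFn and watching GlobalServiceDoneCh. A minimal sketch of that worker shape, with made-up names (healTask, healWorker) standing in for the real types:

package main

import (
	"context"
	"fmt"
	"time"
)

type healTask struct{ path string }

func healWorker(ctx context.Context, tasks <-chan healTask) {
	for {
		select {
		case t, ok := <-tasks:
			if !ok {
				return // task channel closed
			}
			fmt.Println("healing", t.path) // stand-in for the real heal call
		case <-ctx.Done():
			return // shutdown cancels the context; no extra done channel needed
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	tasks := make(chan healTask, 1)
	tasks <- healTask{path: "bucket/object"}
	go healWorker(ctx, tasks)
	time.Sleep(50 * time.Millisecond) // let the worker drain the task
	cancel()
	time.Sleep(10 * time.Millisecond) // let the goroutine observe cancellation
}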
......@@ -25,32 +25,19 @@ import (
const defaultMonitorNewDiskInterval = time.Minute * 10
func initLocalDisksAutoHeal() {
go monitorLocalDisksAndHeal()
func initLocalDisksAutoHeal(ctx context.Context, objAPI ObjectLayer) {
go monitorLocalDisksAndHeal(ctx, objAPI)
}
// monitorLocalDisksAndHeal - ensures that detected new disks are healed
// 1. Only the concerned erasure set will be listed and healed
// 2. Only the node hosting the disk is responsible for performing the heal
func monitorLocalDisksAndHeal() {
// Wait until the object layer is ready
var objAPI ObjectLayer
for {
objAPI = newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) {
z, ok := objAPI.(*xlZones)
if !ok {
return
}
ctx := context.Background()
var bgSeq *healSequence
var found bool
......@@ -64,64 +51,67 @@ func monitorLocalDisksAndHeal() {
// Perform automatic disk healing when a disk is replaced locally.
for {
time.Sleep(defaultMonitorNewDiskInterval)
// Attempt a heal as the server starts-up first.
localDisksInZoneHeal := make([]Endpoints, len(z.zones))
for i, ep := range globalEndpoints {
localDisksToHeal := Endpoints{}
for _, endpoint := range ep.Endpoints {
if !endpoint.IsLocal {
continue
select {
case <-ctx.Done():
return
case <-time.After(defaultMonitorNewDiskInterval):
// Attempt a heal as the server starts-up first.
localDisksInZoneHeal := make([]Endpoints, len(z.zones))
for i, ep := range globalEndpoints {
localDisksToHeal := Endpoints{}
for _, endpoint := range ep.Endpoints {
if !endpoint.IsLocal {
continue
}
// Try to connect to the current endpoint
// and reformat if the current disk is not formatted
_, _, err := connectEndpoint(endpoint)
if err == errUnformattedDisk {
localDisksToHeal = append(localDisksToHeal, endpoint)
}
}
// Try to connect to the current endpoint
// and reformat if the current disk is not formatted
_, _, err := connectEndpoint(endpoint)
if err == errUnformattedDisk {
localDisksToHeal = append(localDisksToHeal, endpoint)
if len(localDisksToHeal) == 0 {
continue
}
localDisksInZoneHeal[i] = localDisksToHeal
}
if len(localDisksToHeal) == 0 {
continue
}
localDisksInZoneHeal[i] = localDisksToHeal
}
// Reformat disks
bgSeq.sourceCh <- SlashSeparator
// Ensure that reformatting disks is finished
bgSeq.sourceCh <- nopHeal
var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal))
// Compute the list of erasure set to heal
for i, localDisksToHeal := range localDisksInZoneHeal {
var erasureSetToHeal []int
for _, endpoint := range localDisksToHeal {
// Load the new format of this passed endpoint
_, format, err := connectEndpoint(endpoint)
if err != nil {
logger.LogIf(ctx, err)
continue
// Reformat disks
bgSeq.sourceCh <- SlashSeparator
// Ensure that reformatting disks is finished
bgSeq.sourceCh <- nopHeal
var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal))
// Compute the list of erasure set to heal
for i, localDisksToHeal := range localDisksInZoneHeal {
var erasureSetToHeal []int
for _, endpoint := range localDisksToHeal {
// Load the new format of this passed endpoint
_, format, err := connectEndpoint(endpoint)
if err != nil {
logger.LogIf(ctx, err)
continue
}
// Calculate the set index where the current endpoint belongs
setIndex, _, err := findDiskIndex(z.zones[i].format, format)
if err != nil {
logger.LogIf(ctx, err)
continue
}
erasureSetToHeal = append(erasureSetToHeal, setIndex)
}
// Calculate the set index where the current endpoint belongs
setIndex, _, err := findDiskIndex(z.zones[i].format, format)
if err != nil {
logger.LogIf(ctx, err)
continue
}
erasureSetToHeal = append(erasureSetToHeal, setIndex)
erasureSetInZoneToHeal[i] = erasureSetToHeal
}
erasureSetInZoneToHeal[i] = erasureSetToHeal
}
// Heal all erasure sets that need healing
for i, erasureSetToHeal := range erasureSetInZoneToHeal {
for _, setIndex := range erasureSetToHeal {
err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex])
if err != nil {
logger.LogIf(ctx, err)
// Heal all erasure sets that need healing
for i, erasureSetToHeal := range erasureSetInZoneToHeal {
for _, setIndex := range erasureSetToHeal {
err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex])
if err != nil {
logger.LogIf(ctx, err)
}
}
}
}
......
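monitorLocalDisksAndHeal replaces the bare time.Sleep at the top of its loop with a select on ctx.Done() and time.After, so shutdown no longer has to wait out a full ten-minute interval. A standalone sketch of that loop shape (monitor and check are illustrative names); time.After allocates a fresh timer per iteration, and a time.Ticker would work just as well, as the disk-cache gc further down shows:

package main

import (
	"context"
	"fmt"
	"time"
)

// monitor runs check once per interval until the context is cancelled.
func monitor(ctx context.Context, interval time.Duration, check func()) {
	for {
		select {
		case <-ctx.Done():
			return // stop immediately on shutdown
		case <-time.After(interval):
			check() // periodic work, e.g. probing local disks
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
	defer cancel()
	monitor(ctx, 50*time.Millisecond, func() { fmt.Println("tick") })
}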
......@@ -30,76 +30,30 @@ const (
bgLifecycleTick = time.Hour
)
type lifecycleOps struct {
LastActivity time.Time
}
// Register to the daily objects listing
var globalLifecycleOps = &lifecycleOps{}
func getLocalBgLifecycleOpsStatus() BgLifecycleOpsStatus {
return BgLifecycleOpsStatus{
LastActivity: globalLifecycleOps.LastActivity,
}
}
// initDailyLifecycle starts the routine that receives the daily
// listing of all objects and applies any matching bucket lifecycle
// rules.
func initDailyLifecycle() {
go startDailyLifecycle()
func initDailyLifecycle(ctx context.Context, objAPI ObjectLayer) {
go startDailyLifecycle(ctx, objAPI)
}
func startDailyLifecycle() {
var objAPI ObjectLayer
var ctx = context.Background()
// Wait until the object API is ready
func startDailyLifecycle(ctx context.Context, objAPI ObjectLayer) {
for {
objAPI = newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
// Calculate the time of the last lifecycle operation in all peers node of the cluster
computeLastLifecycleActivity := func(status []BgOpsStatus) time.Time {
var lastAct time.Time
for _, st := range status {
if st.LifecycleOps.LastActivity.After(lastAct) {
lastAct = st.LifecycleOps.LastActivity
select {
case <-ctx.Done():
return
case <-time.NewTimer(bgLifecycleInterval).C:
// Perform one lifecycle operation
err := lifecycleRound(ctx, objAPI)
switch err.(type) {
case OperationTimedOut:
// Unable to hold a lock means there is another
// caller holding write lock, ignore and try next round.
continue
default:
logger.LogIf(ctx, err)
}
}
return lastAct
}
for {
// Check if we should perform lifecycle ops based on the last lifecycle activity, sleep one hour otherwise
allLifecycleStatus := []BgOpsStatus{
{LifecycleOps: getLocalBgLifecycleOpsStatus()},
}
if globalIsDistXL {
allLifecycleStatus = append(allLifecycleStatus, globalNotificationSys.BackgroundOpsStatus()...)
}
lastAct := computeLastLifecycleActivity(allLifecycleStatus)
if !lastAct.IsZero() && time.Since(lastAct) < bgLifecycleInterval {
time.Sleep(bgLifecycleTick)
}
// Perform one lifecycle operation
err := lifecycleRound(ctx, objAPI)
switch err.(type) {
// Unable to hold a lock means there is another
// instance doing the lifecycle round
case OperationTimedOut:
time.Sleep(bgLifecycleTick)
default:
logger.LogIf(ctx, err)
time.Sleep(time.Minute)
continue
}
}
}
......@@ -107,13 +61,6 @@ func startDailyLifecycle() {
var lifecycleLockTimeout = newDynamicTimeout(60*time.Second, time.Second)
func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error {
// Lock to avoid concurrent lifecycle ops from other nodes
sweepLock := objAPI.NewNSLock(ctx, "system", "daily-lifecycle-ops")
if err := sweepLock.GetLock(lifecycleLockTimeout); err != nil {
return err
}
defer sweepLock.Unlock()
buckets, err := objAPI.ListBuckets(ctx)
if err != nil {
return err
......
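The daily lifecycle loop now treats a lock timeout as a signal that another node won this round and simply skips it, logging only other errors. A standalone sketch of that error-handling shape, with ErrLockTimeout and runRound as made-up stand-ins for OperationTimedOut and lifecycleRound:

package main

import (
	"context"
	"errors"
	"log"
	"time"
)

var ErrLockTimeout = errors.New("lock acquisition timed out")

// runRound stands in for lifecycleRound: pretend another node holds the lock.
func runRound(ctx context.Context) error {
	return ErrLockTimeout
}

func lifecycleLoop(ctx context.Context, interval time.Duration) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
			if err := runRound(ctx); err != nil {
				if errors.Is(err, ErrLockTimeout) {
					continue // another node is doing this round
				}
				log.Printf("lifecycle round failed: %v", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
	defer cancel()
	lifecycleLoop(ctx, 50*time.Millisecond)
}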
......@@ -49,50 +49,19 @@ const (
)
// initDataUsageStats will start the crawler unless disabled.
func initDataUsageStats() {
func initDataUsageStats(ctx context.Context, objAPI ObjectLayer) {
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn {
go runDataUsageInfoUpdateRoutine()
go runDataUsageInfo(ctx, objAPI)
}
}
// runDataUsageInfoUpdateRoutine will contain the main crawler.
func runDataUsageInfoUpdateRoutine() {
// Wait until the object layer is ready
var objAPI ObjectLayer
for {
objAPI = newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
runDataUsageInfo(GlobalContext, objAPI)
}
var dataUsageLockTimeout = lifecycleLockTimeout
func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer) {
// Make sure only 1 crawler is running on the cluster.
locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader-data-usage-info")
for {
err := locker.GetLock(dataUsageLockTimeout)
if err != nil {
time.Sleep(5 * time.Minute)
continue
}
// Break without unlocking, this node will acquire
// data usage calculator role for its lifetime.
break
}
for {
select {
case <-ctx.Done():
locker.Unlock()
return
// Wait before starting next cycle and wait on startup.
case <-time.NewTimer(dataUsageStartDelay).C:
// Wait before starting next cycle and wait on startup.
results := make(chan DataUsageInfo, 1)
go storeDataUsageInBackend(ctx, objAPI, results)
err := objAPI.CrawlAndGetDataUsage(ctx, results)
......
......@@ -229,7 +229,7 @@ func (c *diskCache) toClear() uint64 {
}
// Purge cache entries that were not accessed.
func (c *diskCache) purge(ctx context.Context, doneCh <-chan struct{}) {
func (c *diskCache) purge(ctx context.Context) {
if c.diskUsageLow() {
return
}
......
......@@ -689,17 +689,17 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
if migrateSw {
go c.migrateCacheFromV1toV2(ctx)
}
go c.gc(ctx, GlobalServiceDoneCh)
go c.gc(ctx)
return c, nil
}
func (c *cacheObjects) gc(ctx context.Context, doneCh <-chan struct{}) {
func (c *cacheObjects) gc(ctx context.Context) {
ticker := time.NewTicker(cacheGCInterval)
defer ticker.Stop()
for {
select {
case <-doneCh:
case <-ctx.Done():
return
case <-ticker.C:
if c.migrating {
......@@ -714,7 +714,7 @@ func (c *cacheObjects) gc(ctx context.Context, doneCh <-chan struct{}) {
go func(d *diskCache) {
defer wg.Done()
d.resetGCCounter()
d.purge(ctx, doneCh)
d.purge(ctx)
}(dcache)
}
wg.Wait()
......
......@@ -267,7 +267,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
if globalCacheConfig.Enabled {
// initialize the new disk cache objects.
var cacheAPI CacheObjectLayer
cacheAPI, err = newServerCacheObjects(context.Background(), globalCacheConfig)
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
globalObjLayerMutex.Lock()
......@@ -277,7 +277,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
// Populate existing buckets to the etcd backend
if globalDNSConfig != nil {
buckets, err := newObject.ListBuckets(context.Background())
buckets, err := newObject.ListBuckets(GlobalContext)
if err != nil {
logger.Fatal(err, "Unable to list buckets")
}
......
......@@ -87,7 +87,7 @@ func setRequestHeaderSizeLimitHandler(h http.Handler) http.Handler {
// of the user-defined metadata to 2 KB.
func (h requestHeaderSizeLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if isHTTPHeaderSizeTooLarge(r.Header) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrMetadataTooLarge), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrMetadataTooLarge), r.URL, guessIsBrowserReq(r))
return
}
h.Handler.ServeHTTP(w, r)
......@@ -130,7 +130,7 @@ func filterReservedMetadata(h http.Handler) http.Handler {
// would be treated as metadata.
func (h reservedMetadataHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if containsReservedMetadata(r.Header) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrUnsupportedMetadata), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrUnsupportedMetadata), r.URL, guessIsBrowserReq(r))
return
}
h.Handler.ServeHTTP(w, r)
......@@ -371,14 +371,14 @@ func (h timeValidityHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// All our internal APIs are sensitive to the Date
// header; requests that do not carry one are rejected.
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(errCode), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(errCode), r.URL, guessIsBrowserReq(r))
return
}
// Verify if the request date header is shifted by less than globalMaxSkewTime parameter in the past
// or in the future, reject request otherwise.
curTime := UTCNow()
if curTime.Sub(amzDate) > globalMaxSkewTime || amzDate.Sub(curTime) > globalMaxSkewTime {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrRequestTimeTooSkewed), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrRequestTimeTooSkewed), r.URL, guessIsBrowserReq(r))
return
}
}
......@@ -509,14 +509,14 @@ func (h resourceHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// If bucketName is present and not objectName check for bucket level resource queries.
if bucketName != "" && objectName == "" {
if ignoreNotImplementedBucketResources(r) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
return
}
}
// If bucketName and objectName are present check for its resource queries.
if bucketName != "" && objectName != "" {
if ignoreNotImplementedObjectResources(r) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
return
}
}
......@@ -589,20 +589,20 @@ func hasMultipleAuth(r *http.Request) bool {
func (h requestValidityHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Check for bad components in URL path.
if hasBadPathComponent(r.URL.Path) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r))
return
}
// Check for bad components in URL query values.
for _, vv := range r.URL.Query() {
for _, v := range vv {
if hasBadPathComponent(v) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r))
return
}
}
}
if hasMultipleAuth(r) {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL, guessIsBrowserReq(r))
return
}
h.handler.ServeHTTP(w, r)
......@@ -648,10 +648,10 @@ func (f bucketForwardingHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
sr, err := globalDNSConfig.Get(bucket)
if err != nil {
if err == dns.ErrNoEntriesFound {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNoSuchBucket),
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNoSuchBucket),
r.URL, guessIsBrowserReq(r))
} else {
writeErrorResponse(context.Background(), w, toAPIError(context.Background(), err),
writeErrorResponse(r.Context(), w, toAPIError(r.Context(), err),
r.URL, guessIsBrowserReq(r))
}
return
......@@ -697,9 +697,9 @@ func (f bucketForwardingHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques
sr, err := globalDNSConfig.Get(bucket)
if err != nil {
if err == dns.ErrNoEntriesFound {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNoSuchBucket), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNoSuchBucket), r.URL, guessIsBrowserReq(r))
} else {
writeErrorResponse(context.Background(), w, toAPIError(context.Background(), err), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, toAPIError(r.Context(), err), r.URL, guessIsBrowserReq(r))
}
return
}
......@@ -772,7 +772,7 @@ type criticalErrorHandler struct{ handler http.Handler }
func (h criticalErrorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err == logger.ErrCritical { // handle
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r))
} else if err != nil {
panic(err) // forward other panic calls
}
......@@ -791,7 +791,7 @@ func (h sseTLSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodHead {
writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest))
} else {
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest), r.URL, guessIsBrowserReq(r))
writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest), r.URL, guessIsBrowserReq(r))
}
return
}
......
......@@ -18,7 +18,6 @@ package cmd
import (
"context"
"fmt"
"sync"
"time"
......@@ -126,65 +125,36 @@ func durationToNextHealRound(lastHeal time.Time) time.Duration {
}
// The healing leader takes charge of healing all erasure sets
func execLeaderTasks(z *xlZones) {
ctx := context.Background()
// Hold a lock so only one server performs auto-healing
leaderLock := z.NewNSLock(ctx, minioMetaBucket, "leader")
for {
err := leaderLock.GetLock(leaderLockTimeout)
if err == nil {
break
}
time.Sleep(leaderTick)
}
// Hold a lock for healing the erasure set
zeroDuration := time.Millisecond
zeroDynamicTimeout := newDynamicTimeout(zeroDuration, zeroDuration)
lastScanTime := time.Now() // So that we don't heal immediately, but after one month.
func execLeaderTasks(ctx context.Context, z *xlZones) {
lastScanTime := UTCNow() // So that we don't heal immediately, but after one month.
for {
time.Sleep(durationToNextHealRound(lastScanTime))
for _, zone := range z.zones {
// Heal set by set
for i, set := range zone.sets {
setLock := z.zones[0].NewNSLock(ctx, "system", fmt.Sprintf("erasure-set-heal-%d", i))
if err := setLock.GetLock(zeroDynamicTimeout); err != nil {
logger.LogIf(ctx, err)
continue
}
if err := healErasureSet(ctx, i, set); err != nil {
setLock.Unlock()
logger.LogIf(ctx, err)
continue
select {
case <-ctx.Done():
return
case <-time.NewTimer(durationToNextHealRound(lastScanTime)).C:
for _, zone := range z.zones {
// Heal set by set
for i, set := range zone.sets {
if err := healErasureSet(ctx, i, set); err != nil {
logger.LogIf(ctx, err)
continue
}
}
setLock.Unlock()
}
lastScanTime = UTCNow()
}
lastScanTime = time.Now()
}
}
func startGlobalHeal() {
var objAPI ObjectLayer
for {
objAPI = newObjectLayerWithoutSafeModeFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
func startGlobalHeal(ctx context.Context, objAPI ObjectLayer) {
zones, ok := objAPI.(*xlZones)
if !ok {
return
}
execLeaderTasks(zones)
execLeaderTasks(ctx, zones)
}
func initGlobalHeal() {
go startGlobalHeal()
func initGlobalHeal(ctx context.Context, objAPI ObjectLayer) {
go startGlobalHeal(ctx, objAPI)
}
......@@ -293,9 +293,7 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
}
// Start lock maintenance from all lock servers.
func startLockMaintenance() {
var ctx = context.Background()
func startLockMaintenance(ctx context.Context) {
// Wait until the object API is ready
// no need to start the lock maintenance
// if ObjectAPI is not initialized.
......@@ -317,7 +315,7 @@ func startLockMaintenance() {
for {
// Verifies every minute for locks held more than 2 minutes.
select {
case <-GlobalServiceDoneCh:
case <-ctx.Done():
return
case <-ticker.C:
// Start with random sleep time, so as to avoid
......@@ -357,5 +355,5 @@ func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
}
}
go startLockMaintenance()
go startLockMaintenance(GlobalContext)
}
......@@ -126,6 +126,13 @@ func (d *naughtyDisk) DeleteVol(volume string) (err error) {
return d.disk.DeleteVol(volume)
}
func (d *naughtyDisk) WalkSplunk(volume, path, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if err := d.calcError(); err != nil {
return nil, err
}
return d.disk.WalkSplunk(volume, path, marker, endWalkCh)
}
func (d *naughtyDisk) Walk(volume, path, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if err := d.calcError(); err != nil {
return nil, err
......
......@@ -267,24 +267,6 @@ func (sys *NotificationSys) BackgroundHealStatus() []madmin.BgHealState {
return states
}
// BackgroundOpsStatus - returns the status of all background operations of all peers
func (sys *NotificationSys) BackgroundOpsStatus() []BgOpsStatus {
states := make([]BgOpsStatus, len(sys.peerClients))
for idx, client := range sys.peerClients {
if client == nil {
continue
}
st, err := client.BackgroundOpsStatus()
if err != nil {
logger.LogIf(context.Background(), err)
} else {
states[idx] = st
}
}
return states
}
// StartProfiling - start profiling on remote peers, by initiating a remote RPC.
func (sys *NotificationSys) StartProfiling(profiler string) []NotificationPeerErr {
ng := WithNPeers(len(sys.peerClients))
......
......@@ -614,32 +614,6 @@ func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error)
return state, err
}
// BgLifecycleOpsStatus describes the status
// of the background lifecycle operations
type BgLifecycleOpsStatus struct {
LastActivity time.Time
}
// BgOpsStatus describes the status of all operations performed
// in background such as auto-healing and lifecycle.
// Notice: We need to increase peer REST API version when adding
// new fields to this struct.
type BgOpsStatus struct {
LifecycleOps BgLifecycleOpsStatus
}
func (client *peerRESTClient) BackgroundOpsStatus() (BgOpsStatus, error) {
respBody, err := client.call(peerRESTMethodBackgroundOpsStatus, nil, nil, -1)
if err != nil {
return BgOpsStatus{}, err
}
defer http.DrainBody(respBody)
state := BgOpsStatus{}
err = gob.NewDecoder(respBody).Decode(&state)
return state, err
}
func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan struct{}, trcAll, trcErr bool) {
values := make(url.Values)
values.Set(peerRESTTraceAll, strconv.FormatBool(trcAll))
......
......@@ -34,7 +34,6 @@ const (
peerRESTMethodServerUpdate = "/serverupdate"
peerRESTMethodSignalService = "/signalservice"
peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus"
peerRESTMethodBackgroundOpsStatus = "/backgroundopsstatus"
peerRESTMethodGetLocks = "/getlocks"
peerRESTMethodBucketPolicyRemove = "/removebucketpolicy"
peerRESTMethodLoadUser = "/loaduser"
......
......@@ -1129,22 +1129,6 @@ func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *h
logger.LogIf(ctx, gob.NewEncoder(w).Encode(state))
}
func (s *peerRESTServer) BackgroundOpsStatusHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
return
}
ctx := newContext(r, w, "BackgroundOpsStatus")
state := BgOpsStatus{
LifecycleOps: getLocalBgLifecycleOpsStatus(),
}
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(state))
}
// ConsoleLogHandler sends console logs of this node back to peer rest client
func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
......@@ -1230,8 +1214,6 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketLifecycleRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketLifecycleHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketEncryptionSet).HandlerFunc(httpTraceHdrs(server.SetBucketSSEConfigHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketEncryptionRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketSSEConfigHandler)).Queries(restQueries(peerRESTBucket)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundOpsStatus).HandlerFunc(server.BackgroundOpsStatusHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTrace).HandlerFunc(server.TraceHandler)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodListen).HandlerFunc(httpTraceHdrs(server.ListenHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler)
......
......@@ -117,6 +117,13 @@ func (p *posixDiskIDCheck) Walk(volume, dirPath string, marker string, recursive
return p.storage.Walk(volume, dirPath, marker, recursive, leafFile, readMetadataFn, endWalkCh)
}
func (p *posixDiskIDCheck) WalkSplunk(volume, dirPath string, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
}
return p.storage.WalkSplunk(volume, dirPath, marker, endWalkCh)
}
func (p *posixDiskIDCheck) ListDir(volume, dirPath string, count int, leafFile string) ([]string, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
......
......@@ -645,6 +645,129 @@ func (s *posix) DeleteVol(volume string) (err error) {
return nil
}
const guidSplunk = "guidSplunk"
// ListDirSplunk - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing SlashSeparator.
func (s *posix) ListDirSplunk(volume, dirPath string, count int) (entries []string, err error) {
guidIndex := strings.Index(dirPath, guidSplunk)
if guidIndex != -1 {
return nil, nil
}
const receiptJSON = "receipt.json"
atomic.AddInt32(&s.activeIOCount, 1)
defer func() {
atomic.AddInt32(&s.activeIOCount, -1)
}()
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
// Stat a volume entry.
_, err = os.Stat((volumeDir))
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
} else if isSysErrIO(err) {
return nil, errFaultyDisk
}
return nil, err
}
dirPath = pathJoin(volumeDir, dirPath)
if count > 0 {
entries, err = readDirN(dirPath, count)
} else {
entries, err = readDir(dirPath)
}
for i, entry := range entries {
if entry != receiptJSON {
continue
}
if _, serr := os.Stat(pathJoin(dirPath, entry, xlMetaJSONFile)); serr == nil {
entries[i] = strings.TrimSuffix(entry, SlashSeparator)
}
}
return entries, err
}
// WalkSplunk - is a sorted walker which returns file entries in lexically
// sorted order, additionally along with metadata about each of those entries.
// Implemented specifically for Splunk backend structure and List call with
// delimiter as "guidSplunk"
func (s *posix) WalkSplunk(volume, dirPath, marker string, endWalkCh <-chan struct{}) (ch chan FileInfo, err error) {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
} else if isSysErrIO(err) {
return nil, errFaultyDisk
}
return nil, err
}
ch = make(chan FileInfo)
go func() {
defer close(ch)
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string) {
entries, err := s.ListDirSplunk(volume, dirPath, -1)
if err != nil {
return
}
if len(entries) == 0 {
return true, nil
}
sort.Strings(entries)
return false, filterMatchingPrefix(entries, dirEntry)
}
walkResultCh := startTreeWalk(context.Background(), volume, dirPath, marker, true, listDir, endWalkCh)
for {
walkResult, ok := <-walkResultCh
if !ok {
return
}
var fi FileInfo
if HasSuffix(walkResult.entry, SlashSeparator) {
fi = FileInfo{
Volume: volume,
Name: walkResult.entry,
Mode: os.ModeDir,
}
} else {
// Dynamic time delay.
t := UTCNow()
buf, err := s.ReadAll(volume, pathJoin(walkResult.entry, xlMetaJSONFile))
sleepDuration(time.Since(t), 10.0)
if err != nil {
continue
}
fi = readMetadata(buf, volume, walkResult.entry)
}
select {
case ch <- fi:
case <-endWalkCh:
return
}
}
}()
return ch, nil
}
// Walk - is a sorted walker which returns file entries in lexically
// sorted order, additionally along with metadata about each of those entries.
func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile string,
......@@ -672,11 +795,15 @@ func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile st
return nil, err
}
ch = make(chan FileInfo)
// The channel buffer size matches the S3 ListObjects implementation
ch = make(chan FileInfo, maxObjectList)
go func() {
defer close(ch)
listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string) {
// Dynamic time delay.
t := UTCNow()
entries, err := s.ListDir(volume, dirPath, -1, leafFile)
sleepDuration(time.Since(t), 10.0)
if err != nil {
return
}
......@@ -701,7 +828,10 @@ func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile st
Mode: os.ModeDir,
}
} else {
// Dynamic time delay.
t := UTCNow()
buf, err := s.ReadAll(volume, pathJoin(walkResult.entry, leafFile))
sleepDuration(time.Since(t), 10.0)
if err != nil {
continue
}
......
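The Walk changes wrap each ListDir and ReadAll call in a t := UTCNow() ... sleepDuration(time.Since(t), 10.0) pair. Assuming sleepDuration sleeps for roughly elapsed/factor (I have not verified its exact definition), the effect is that the walker yields a fraction of whatever each disk operation cost, throttling listings on slow or busy disks. A standalone sketch of that idea, with sleepFraction and throttled as hypothetical helpers:

package main

import (
	"fmt"
	"time"
)

// sleepFraction pauses for elapsed/factor (an assumption about how
// sleepDuration behaves, not its actual implementation).
func sleepFraction(elapsed time.Duration, factor float64) {
	if factor <= 0 {
		return
	}
	time.Sleep(time.Duration(float64(elapsed) / factor))
}

// throttled wraps a disk operation and charges a proportional delay,
// so slow operations automatically slow the walk down further.
func throttled(op func() error) error {
	start := time.Now()
	err := op()
	sleepFraction(time.Since(start), 10.0)
	return err
}

func main() {
	_ = throttled(func() error {
		time.Sleep(20 * time.Millisecond) // pretend disk read
		return nil
	})
	fmt.Println("one throttled operation done")
}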
......@@ -26,6 +26,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"
"github.com/minio/cli"
"github.com/minio/minio/cmd/config"
......@@ -285,6 +286,27 @@ func initAllSubsystems(buckets []BucketInfo, newObject ObjectLayer) (err error)
return nil
}
func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) {
// Make sure only 1 crawler is running on the cluster.
locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader")
for {
err := locker.GetLock(leaderLockTimeout)
if err != nil {
time.Sleep(leaderTick)
continue
}
break
// No unlock for "leader" lock.
}
if globalIsXL {
initGlobalHeal(ctx, objAPI)
}
initDataUsageStats(ctx, objAPI)
initDailyLifecycle(ctx, objAPI)
}
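startBackgroundOps retries a "leader" namespace lock until it succeeds and deliberately never unlocks, so exactly one node in the cluster runs the global heal, data-usage, and lifecycle routines for its lifetime. A standalone sketch of that pattern, with a made-up locker interface standing in for the object layer's NSLock:

package main

import (
	"context"
	"fmt"
	"time"
)

type locker interface {
	GetLock(timeout time.Duration) error
}

// becomeLeader keeps retrying the cluster lock; the winner holds it for
// the lifetime of the process and starts the singleton background work.
func becomeLeader(ctx context.Context, l locker, retry time.Duration) bool {
	for {
		if err := l.GetLock(retry); err == nil {
			return true // never released; leadership lasts until the process exits
		}
		select {
		case <-ctx.Done():
			return false
		case <-time.After(retry):
		}
	}
}

type alwaysLock struct{}

func (alwaysLock) GetLock(time.Duration) error { return nil }

func main() {
	if becomeLeader(context.Background(), alwaysLock{}, time.Second) {
		fmt.Println("this node runs the background crawlers")
	}
}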
// serverMain handler called for 'minio server' command.
func serverMain(ctx *cli.Context) {
if ctx.Args().First() == "help" || !endpointsPresent(ctx) {
......@@ -401,12 +423,11 @@ func serverMain(ctx *cli.Context) {
// Enable healing to heal drives if possible
if globalIsXL {
initBackgroundHealing()
initLocalDisksAutoHeal()
initGlobalHeal()
initBackgroundHealing(GlobalContext, newObject)
initLocalDisksAutoHeal(GlobalContext, newObject)
}
buckets, err := newObject.ListBuckets(context.Background())
buckets, err := newObject.ListBuckets(GlobalContext)
if err != nil {
logger.Fatal(err, "Unable to list buckets")
}
......@@ -416,7 +437,7 @@ func serverMain(ctx *cli.Context) {
if globalCacheConfig.Enabled {
// initialize the new disk cache objects.
var cacheAPI CacheObjectLayer
cacheAPI, err = newServerCacheObjects(context.Background(), globalCacheConfig)
cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig)
logger.FatalIf(err, "Unable to initialize disk caching")
globalObjLayerMutex.Lock()
......@@ -429,8 +450,7 @@ func serverMain(ctx *cli.Context) {
initFederatorBackend(buckets, newObject)
}
initDataUsageStats()
initDailyLifecycle()
go startBackgroundOps(GlobalContext, newObject)
// Disable safe mode operation, after all initialization is over.
globalObjLayerMutex.Lock()
......
......@@ -45,6 +45,8 @@ type StorageAPI interface {
// Walk in sorted order directly on disk.
Walk(volume, dirPath string, marker string, recursive bool, leafFile string,
readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error)
// Walk in sorted order directly on disk.
WalkSplunk(volume, dirPath string, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error)
// File operations.
ListDir(volume, dirPath string, count int, leafFile string) ([]string, error)
......
......@@ -333,6 +333,40 @@ func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buf
return int64(n), err
}
func (client *storageRESTClient) WalkSplunk(volume, dirPath, marker string, endWalkCh <-chan struct{}) (chan FileInfo, error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTDirPath, dirPath)
values.Set(storageRESTMarkerPath, marker)
respBody, err := client.call(storageRESTMethodWalkSplunk, values, nil, -1)
if err != nil {
return nil, err
}
ch := make(chan FileInfo)
go func() {
defer close(ch)
defer http.DrainBody(respBody)
decoder := gob.NewDecoder(respBody)
for {
var fi FileInfo
if gerr := decoder.Decode(&fi); gerr != nil {
// Upon error return
return
}
select {
case ch <- fi:
case <-endWalkCh:
return
}
}
}()
return ch, nil
}
func (client *storageRESTClient) Walk(volume, dirPath, marker string, recursive bool, leafFile string,
readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) {
values := make(url.Values)
......
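The WalkSplunk REST client decodes a stream of gob-encoded FileInfo values from the response body until decoding fails, and the matching server handler encodes entries as they arrive. A self-contained sketch of that streaming pattern (item stands in for FileInfo, a bytes.Buffer for the HTTP body), showing the usual practice of treating io.EOF as normal end-of-stream rather than an error:

package main

import (
	"bytes"
	"encoding/gob"
	"errors"
	"fmt"
	"io"
	"log"
)

type item struct{ Name string }

func main() {
	var stream bytes.Buffer

	// Server side: encode values back to back onto the wire.
	enc := gob.NewEncoder(&stream)
	for _, it := range []item{{"a"}, {"b"}, {"c"}} {
		if err := enc.Encode(&it); err != nil {
			log.Fatal(err)
		}
	}

	// Client side: decode until EOF; any other error is a real failure.
	dec := gob.NewDecoder(&stream)
	for {
		var it item
		err := dec.Decode(&it)
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("got", it.Name)
	}
}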
......@@ -40,6 +40,7 @@ const (
storageRESTMethodReadFileStream = "/readfilestream"
storageRESTMethodListDir = "/listdir"
storageRESTMethodWalk = "/walk"
storageRESTMethodWalkSplunk = "/walksplunk"
storageRESTMethodDeleteFile = "/deletefile"
storageRESTMethodDeleteFileBulk = "/deletefilebulk"
storageRESTMethodDeletePrefixes = "/deleteprefixes"
......
......@@ -428,6 +428,30 @@ func readMetadata(buf []byte, volume, entry string) FileInfo {
}
}
// WalkSplunkHandler - remote caller to start walking at a requested directory path.
func (s *storageRESTServer) WalkSplunkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
dirPath := vars[storageRESTDirPath]
markerPath := vars[storageRESTMarkerPath]
w.Header().Set(xhttp.ContentType, "text/event-stream")
encoder := gob.NewEncoder(w)
fch, err := s.storage.WalkSplunk(volume, dirPath, markerPath, r.Context().Done())
if err != nil {
s.writeErrorResponse(w, err)
return
}
for fi := range fch {
encoder.Encode(&fi)
}
w.(http.Flusher).Flush()
}
// WalkHandler - remote caller to start walking at a requested directory path.
func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
......@@ -446,12 +470,10 @@ func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request)
w.Header().Set(xhttp.ContentType, "text/event-stream")
encoder := gob.NewEncoder(w)
done := keepHTTPResponseAlive(w)
defer done()
fch, err := s.storage.Walk(volume, dirPath, markerPath, recursive, leafFile, readMetadata, r.Context().Done())
if err != nil {
logger.LogIf(r.Context(), err)
s.writeErrorResponse(w, err)
return
}
for fi := range fch {
......@@ -760,6 +782,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount, storageRESTLeafFile)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalk).HandlerFunc(httpTraceHdrs(server.WalkHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive, storageRESTLeafFile)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkSplunk).HandlerFunc(httpTraceHdrs(server.WalkSplunkHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeletePrefixes).HandlerFunc(httpTraceHdrs(server.DeletePrefixesHandler)).
Queries(restQueries(storageRESTVolume)...)
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)).
......
......@@ -96,7 +96,8 @@ type xlSets struct {
distributionAlgo string
// Merge tree walk
pool *MergeWalkPool
pool *MergeWalkPool
poolSplunk *MergeWalkPool
mrfMU sync.Mutex
mrfUploads map[string]int
......@@ -289,6 +290,7 @@ func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerS
disksConnectDoneCh: make(chan struct{}),
distributionAlgo: format.XL.DistributionAlgo,
pool: NewMergeWalkPool(globalMergeLookupTimeout),
poolSplunk: NewMergeWalkPool(globalMergeLookupTimeout),
mrfUploads: make(map[string]int),
}
......@@ -392,6 +394,11 @@ func (s *xlSets) StorageInfo(ctx context.Context, local bool) StorageInfo {
storageInfo.Backend.Sets[i] = make([]madmin.DriveInfo, s.drivesPerSet)
}
if local {
// if local is true, we don't need to read format.json
return storageInfo
}
storageDisks, dErrs := initStorageDisksWithErrors(s.endpoints)
defer closeStorageDisks(storageDisks)
......@@ -882,8 +889,7 @@ func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool)
}
// mergeEntriesCh - merges FileInfo channels into entries, up to maxKeys.
// If partialQuorumOnly is set only objects that does not have full quorum is returned.
func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, totalDrives int, partialQuorumOnly bool) (entries FilesInfo) {
func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, ndisks int) (entries FilesInfo) {
var i = 0
entriesInfos := make([]FileInfo, len(entryChs))
entriesValid := make([]bool, len(entryChs))
......@@ -894,27 +900,11 @@ func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, totalDrives int, partial
break
}
rquorum := fi.Quorum
// Quorum is zero for all directories.
if rquorum == 0 {
// Choose N/2 quorum for directory entries.
rquorum = totalDrives / 2
if quorumCount < ndisks-1 {
// Skip entries that are not found on at least ndisks-1 disks.
continue
}
if partialQuorumOnly {
// When healing is enabled, we should
// list only objects which need healing.
if quorumCount == totalDrives {
// Skip good entries.
continue
}
} else {
// Regular listing, we skip entries not in quorum.
if quorumCount < rquorum {
// Skip entries which do not have quorum.
continue
}
}
entries.Files = append(entries.Files, fi)
i++
if i == maxKeys {
......@@ -946,11 +936,19 @@ func isTruncated(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool)
return isTruncated
}
// Starts a walk channel across all disks and returns a slice.
func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoCh {
return s.startMergeWalksN(ctx, bucket, prefix, marker, recursive, endWalkCh, -1)
}
// Starts a walk channel across all disks and returns a slice of
// FileInfo channels which can be read from.
func (s *xlSets) startMergeWalksN(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}, ndisks int) []FileInfoCh {
var entryChs []FileInfoCh
var success int
for _, set := range s.sets {
for _, disk := range set.getDisks() {
// Reset for the next erasure set.
success = ndisks
for _, disk := range set.getLoadBalancedDisks() {
if disk == nil {
// Disk can be offline
continue
......@@ -963,6 +961,40 @@ func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker str
entryChs = append(entryChs, FileInfoCh{
Ch: entryCh,
})
success--
if success == 0 {
break
}
}
}
return entryChs
}
// Starts a walk channel across all disks and returns a slice of
// FileInfo channels which can be read from.
func (s *xlSets) startSplunkMergeWalksN(ctx context.Context, bucket, prefix, marker string, endWalkCh <-chan struct{}, ndisks int) []FileInfoCh {
var entryChs []FileInfoCh
var success int
for _, set := range s.sets {
// Reset for the next erasure set.
success = ndisks
for _, disk := range set.getLoadBalancedDisks() {
if disk == nil {
// Disk can be offline
continue
}
entryCh, err := disk.WalkSplunk(bucket, prefix, marker, endWalkCh)
if err != nil {
// Disk walk returned error, ignore it.
continue
}
entryChs = append(entryChs, FileInfoCh{
Ch: entryCh,
})
success--
if success == 0 {
break
}
}
}
return entryChs
......@@ -972,7 +1004,8 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
endWalkCh := make(chan struct{})
defer close(endWalkCh)
entryChs := s.startMergeWalks(context.Background(), bucket, prefix, "", true, endWalkCh)
const ndisks = 3
entryChs := s.startMergeWalksN(context.Background(), bucket, prefix, "", true, endWalkCh, ndisks)
var objInfos []ObjectInfo
var eof bool
......@@ -984,18 +1017,15 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
if len(objInfos) == maxKeys {
break
}
result, quorumCount, ok := leastEntry(entryChs, entries, entriesValid)
if !ok {
eof = true
break
}
rquorum := result.Quorum
// Quorum is zero for all directories.
if rquorum == 0 {
// Choose N/2 quorum for directory entries.
rquorum = s.drivesPerSet / 2
}
if quorumCount < rquorum {
if quorumCount < ndisks-1 {
// Skip entries that are not found on at least ndisks-1 disks.
continue
}
......@@ -1075,7 +1105,7 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
// walked and merged at this layer. Resulting value through the merge process sends
// the data in lexically sorted order.
// If partialQuorumOnly is set only objects that does not have full quorum is returned.
func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, partialQuorumOnly bool) (loi ListObjectsInfo, err error) {
func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
if err = checkListObjsArgs(ctx, bucket, prefix, marker, s); err != nil {
return loi, err
}
......@@ -1118,13 +1148,16 @@ func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimi
recursive = false
}
const ndisks = 3
entryChs, endWalkCh := s.pool.Release(listParams{bucket: bucket, recursive: recursive, marker: marker, prefix: prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = s.startMergeWalks(context.Background(), bucket, prefix, marker, recursive, endWalkCh)
// Start the file tree walk across at most 3 randomly chosen disks in each set.
entryChs = s.startMergeWalksN(context.Background(), bucket, prefix, marker, recursive, endWalkCh, ndisks)
}
entries := mergeEntriesCh(entryChs, maxKeys, s.drivesPerSet, partialQuorumOnly)
entries := mergeEntriesCh(entryChs, maxKeys, ndisks)
if len(entries.Files) == 0 {
return loi, nil
}
......@@ -1152,7 +1185,7 @@ func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimi
// walked and merged at this layer. Resulting value through the merge process sends
// the data in lexically sorted order.
func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
return s.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys, false)
return s.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
}
func (s *xlSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
......
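The new listing path walks only ndisks = 3 load-balanced disks per erasure set and keeps an entry when at least ndisks-1 of those walks report it, instead of computing a per-entry quorum against drivesPerSet. The sketch below applies the same acceptance rule, though it counts names in plain slices rather than merging sorted channels the way leastEntry does; the per-disk inputs are made up:

package main

import (
	"fmt"
	"sort"
)

// mergeWithAgreement keeps a name only if at least ndisks-1 of the
// walked disks reported it.
func mergeWithAgreement(perDisk [][]string, ndisks int) []string {
	counts := make(map[string]int)
	for _, names := range perDisk {
		for _, name := range names {
			counts[name]++
		}
	}
	var out []string
	for name, c := range counts {
		if c >= ndisks-1 { // seen on enough of the walked disks
			out = append(out, name)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	const ndisks = 3
	perDisk := [][]string{
		{"a.txt", "b.txt"},
		{"a.txt", "b.txt", "stale.tmp"}, // stale.tmp seen on one disk only
		{"a.txt", "b.txt"},
	}
	// Prints [a.txt b.txt]; stale.tmp lacks ndisks-1 agreement and is dropped.
	fmt.Println(mergeWithAgreement(perDisk, ndisks))
}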
......@@ -592,20 +592,16 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke
endWalkCh := make(chan struct{})
defer close(endWalkCh)
const ndisks = 3
for _, zone := range z.zones {
zonesEntryChs = append(zonesEntryChs,
zone.startMergeWalks(ctx, bucket, prefix, "", true, endWalkCh))
zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, ndisks))
}
var objInfos []ObjectInfo
var eof bool
var prevPrefix string
var zoneDrivesPerSet []int
for _, zone := range z.zones {
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet)
}
var zonesEntriesInfos [][]FileInfo
var zonesEntriesValid [][]bool
for _, entryChs := range zonesEntryChs {
......@@ -617,18 +613,15 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke
if len(objInfos) == maxKeys {
break
}
result, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
result, quorumCount, _, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid)
if !ok {
eof = true
break
}
rquorum := result.Quorum
// Quorum is zero for all directories.
if rquorum == 0 {
// Choose N/2 quorum for directory entries.
rquorum = zoneDrivesPerSet[zoneIndex] / 2
}
if quorumCount < rquorum {
if quorumCount < ndisks-1 {
// Skip entries that are not found on at least ndisks-1 disks.
continue
}
......@@ -704,7 +697,59 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke
return result, nil
}
func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, heal bool) (ListObjectsInfo, error) {
func (z *xlZones) listObjectsSplunk(ctx context.Context, bucket, prefix, marker string, maxKeys int) (loi ListObjectsInfo, err error) {
if strings.Contains(prefix, guidSplunk) {
logger.LogIf(ctx, NotImplemented{})