...
 
Commits (2)
......@@ -1372,6 +1372,135 @@ func (a adminAPIHandlers) ServerHardwareInfoHandler(w http.ResponseWriter, r *ht
}
}
// OBDInfoHandler - GET /minio/admin/v2/obdinfo
// ----------
// Get server on-board diagnostics
func (a adminAPIHandlers) OBDInfoHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "OBDInfo")
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.OBDInfoAdminAction)
if objectAPI == nil {
return
}
deadlinedCtx, cancel := context.WithDeadline(ctx, time.Now().Add(10*time.Minute))
obdInfo := madmin.OBDInfo{}
setCommonHeaders(w)
w.Header().Set(xhttp.ContentType, string(mimeJSON))
w.WriteHeader(http.StatusOK)
partialWrite := func() {
jsonBytes, _ := json.Marshal(obdInfo)
_, err := w.Write(jsonBytes)
logger.LogIf(ctx, err)
}
finish := func() {
partialWrite()
w.(http.Flusher).Flush()
cancel()
}
errResp := func(err error) {
errorResponse := getAPIErrorResponse(ctx, toAdminAPIErr(ctx, err), r.URL.String(),
w.Header().Get(xhttp.AmzRequestID), globalDeploymentID)
encodedErrorResponse := encodeResponse(errorResponse)
obdInfo.Error = string(encodedErrorResponse)
finish()
}
nsLock := objectAPI.NewNSLock(deadlinedCtx, minioMetaBucket, "obd-in-progress")
if err := nsLock.GetLock(newDynamicTimeout(10*time.Minute, 600*time.Second)); err != nil { // returns a locked lock
errResp(err)
return
}
defer nsLock.Unlock()
vars := mux.Vars(r)
if cpu, ok := vars["syscpu"]; ok && cpu == "true" {
cpuInfo := getLocalCPUOBDInfo(deadlinedCtx)
obdInfo.Sys.CPUInfo = append(obdInfo.Sys.CPUInfo, cpuInfo)
obdInfo.Sys.CPUInfo = append(obdInfo.Sys.CPUInfo, globalNotificationSys.CPUOBDInfo(deadlinedCtx)...)
partialWrite()
}
if diskHw, ok := vars["sysdiskhw"]; ok && diskHw == "true" {
diskHwInfo := getLocalDiskHwOBD(deadlinedCtx)
obdInfo.Sys.DiskHwInfo = append(obdInfo.Sys.DiskHwInfo, diskHwInfo)
obdInfo.Sys.DiskHwInfo = append(obdInfo.Sys.DiskHwInfo, globalNotificationSys.DiskHwOBDInfo(deadlinedCtx)...)
partialWrite()
}
if osInfo, ok := vars["sysosinfo"]; ok && osInfo == "true" {
osInfo := getLocalOsInfoOBD(deadlinedCtx)
obdInfo.Sys.OsInfo = append(obdInfo.Sys.OsInfo, osInfo)
obdInfo.Sys.OsInfo = append(obdInfo.Sys.OsInfo, globalNotificationSys.OsOBDInfo(deadlinedCtx)...)
partialWrite()
}
if mem, ok := vars["sysmem"]; ok && mem == "true" {
memInfo := getLocalMemOBD(deadlinedCtx)
obdInfo.Sys.MemInfo = append(obdInfo.Sys.MemInfo, memInfo)
obdInfo.Sys.MemInfo = append(obdInfo.Sys.MemInfo, globalNotificationSys.MemOBDInfo(deadlinedCtx)...)
partialWrite()
}
if proc, ok := vars["sysprocess"]; ok && proc == "true" {
procInfo := getLocalProcOBD(deadlinedCtx)
obdInfo.Sys.ProcInfo = append(obdInfo.Sys.ProcInfo, procInfo)
obdInfo.Sys.ProcInfo = append(obdInfo.Sys.ProcInfo, globalNotificationSys.ProcOBDInfo(deadlinedCtx)...)
partialWrite()
}
if config, ok := vars["minioconfig"]; ok && config == "true" {
cfg, err := readServerConfig(ctx, objectAPI)
logger.LogIf(ctx, err)
obdInfo.Minio.Config = cfg
partialWrite()
}
if drive, ok := vars["perfdrive"]; ok && drive == "true" {
// Get drive obd details from local server's drive(s)
driveOBDSerial := getLocalDrivesOBD(deadlinedCtx, false, globalEndpoints, r)
driveOBDParallel := getLocalDrivesOBD(deadlinedCtx, true, globalEndpoints, r)
errStr := ""
if driveOBDSerial.Error != "" {
errStr = "serial: " + driveOBDSerial.Error
}
if driveOBDParallel.Error != "" {
errStr = errStr + " parallel: " + driveOBDParallel.Error
}
driveOBD := madmin.ServerDrivesOBDInfo{
Addr: driveOBDSerial.Addr,
Serial: driveOBDSerial.Serial,
Parallel: driveOBDParallel.Parallel,
Error: errStr,
}
obdInfo.Perf.DriveInfo = append(obdInfo.Perf.DriveInfo, driveOBD)
// Notify all other MinIO peers to report drive obd numbers
driveOBDs := globalNotificationSys.DriveOBDInfo(deadlinedCtx)
obdInfo.Perf.DriveInfo = append(obdInfo.Perf.DriveInfo, driveOBDs...)
partialWrite()
}
if net, ok := vars["perfnet"]; ok && net == "true" && globalIsDistXL {
obdInfo.Perf.Net = append(obdInfo.Perf.Net, globalNotificationSys.NetOBDInfo(deadlinedCtx))
obdInfo.Perf.Net = append(obdInfo.Perf.Net, globalNotificationSys.DispatchNetOBDInfo(deadlinedCtx)...)
partialWrite()
}
finish()
}
// ServerInfoHandler - GET /minio/admin/v2/info
// ----------
// Get server information
......
......@@ -166,6 +166,12 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
//
adminRouter.Methods(http.MethodGet).Path(adminAPIVersionPrefix + "/kms/key/status").HandlerFunc(httpTraceAll(adminAPI.KMSKeyStatusHandler))
if !globalIsGateway {
// -- OBD API --
adminRouter.Methods(http.MethodGet).Path(adminAPIVersionPrefix+"/obdinfo").HandlerFunc(httpTraceHdrs(adminAPI.OBDInfoHandler)).Queries("perfdrive", "{perfdrive:true|false}", "perfnet", "{perfnet:true|false}", "minioinfo", "{minioinfo:true|false}", "minioconfig", "{minioconfig:true|false}", "syscpu", "{syscpu:true|false}", "sysdiskhw", "{sysdiskhw:true|false}", "sysosinfo", "{sysosinfo:true|false}", "sysmem", "{sysmem:true|false}", "sysprocess", "{sysprocess:true|false}")
}
// If none of the routes match add default error handler routes
adminRouter.NotFoundHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
adminRouter.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(errorResponseHandler))
......
......@@ -25,6 +25,7 @@ import (
"net"
"net/url"
"path"
"sort"
"strings"
"sync"
"time"
......@@ -38,6 +39,7 @@ import (
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio-go/v6/pkg/set"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net"
......@@ -890,6 +892,290 @@ func (sys *NotificationSys) CollectNetPerfInfo(size int64) map[string][]ServerNe
return reply
}
// NetOBDInfo - Net OBD information
func (sys *NotificationSys) NetOBDInfo(ctx context.Context) madmin.ServerNetOBDInfo {
var sortedGlobalEndpoints []string
/*
Ensure that only untraversed links are visited by this server
i.e. if netOBD tests have been performed between a -> b, then do
not run it between b -> a
The graph of tests looks like this
a b c d
a | o | x | x | x |
b | o | o | x | x |
c | o | o | o | x |
d | o | o | o | o |
'x's should be tested, and 'o's should be skipped
*/
stripPath := func(hostPath string) string {
return strings.Split(hostPath, slashSeparator)[0]
}
hostSet := set.NewStringSet()
for _, ez := range globalEndpoints {
for _, e := range ez.Endpoints {
host := stripPath(e.Host)
if hostSet.Contains(host) {
sortedGlobalEndpoints = append(sortedGlobalEndpoints, host)
hostSet.Add(host)
}
}
}
sort.Strings(sortedGlobalEndpoints)
var remoteTargets []*peerRESTClient
search := func(host string) *peerRESTClient {
for index, client := range sys.peerClients {
if client == nil {
continue
}
if sys.peerClients[index].host.String() == host {
return client
}
}
return nil
}
for i := 0; i < len(sortedGlobalEndpoints); i++ {
if sortedGlobalEndpoints[i] != GetLocalPeer(globalEndpoints) {
continue
}
for j := 0; j < len(sortedGlobalEndpoints); j++ {
if j > i {
remoteTarget := search(sortedGlobalEndpoints[j])
if remoteTarget != nil {
remoteTargets = append(remoteTargets, remoteTarget)
}
}
}
}
netOBDs := make([]madmin.NetOBDInfo, len(remoteTargets))
for index, client := range remoteTargets {
if client == nil {
continue
}
var err error
netOBDs[index], err = client.NetOBDInfo(ctx)
addr := client.host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
netOBDs[index].Addr = addr
if err != nil {
netOBDs[index].Error = err.Error()
}
}
return madmin.ServerNetOBDInfo{
Net: netOBDs,
Addr: GetLocalPeer(globalEndpoints),
}
}
// DispatchNetOBDInfo - Net OBD information from other nodes
func (sys *NotificationSys) DispatchNetOBDInfo(ctx context.Context) []madmin.ServerNetOBDInfo {
serverNetOBDs := []madmin.ServerNetOBDInfo{}
for index, client := range sys.peerClients {
if client == nil {
continue
}
serverNetOBD, err := sys.peerClients[index].DispatchNetOBDInfo(ctx)
if err != nil {
serverNetOBD.Addr = client.host.String()
serverNetOBD.Error = err.Error()
}
serverNetOBDs = append(serverNetOBDs, serverNetOBD)
}
return serverNetOBDs
}
// DriveOBDInfo - Drive OBD information
func (sys *NotificationSys) DriveOBDInfo(ctx context.Context) []madmin.ServerDrivesOBDInfo {
reply := make([]madmin.ServerDrivesOBDInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].DriveOBDInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// CPUOBDInfo - CPU OBD information
func (sys *NotificationSys) CPUOBDInfo(ctx context.Context) []madmin.ServerCPUOBDInfo {
reply := make([]madmin.ServerCPUOBDInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].CPUOBDInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// DiskHwOBDInfo - Disk HW OBD information
func (sys *NotificationSys) DiskHwOBDInfo(ctx context.Context) []madmin.ServerDiskHwOBDInfo {
reply := make([]madmin.ServerDiskHwOBDInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].DiskHwOBDInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// OsOBDInfo - Os OBD information
func (sys *NotificationSys) OsOBDInfo(ctx context.Context) []madmin.ServerOsOBDInfo {
reply := make([]madmin.ServerOsOBDInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].OsOBDInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// MemOBDInfo - Mem OBD information
func (sys *NotificationSys) MemOBDInfo(ctx context.Context) []madmin.ServerMemOBDInfo {
reply := make([]madmin.ServerMemOBDInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].MemOBDInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// ProcOBDInfo - Process OBD information
func (sys *NotificationSys) ProcOBDInfo(ctx context.Context) []madmin.ServerProcOBDInfo {
reply := make([]madmin.ServerProcOBDInfo, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].ProcOBDInfo(ctx)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
addr := sys.peerClients[index].host.String()
reqInfo := (&logger.ReqInfo{}).AppendTags("remotePeer", addr)
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
reply[index].Addr = addr
reply[index].Error = err.Error()
}
}
return reply
}
// DrivePerfInfo - Drive speed (read and write) information
func (sys *NotificationSys) DrivePerfInfo(size int64) []madmin.ServerDrivesPerfInfo {
reply := make([]madmin.ServerDrivesPerfInfo, len(sys.peerClients))
......
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package cmd
import (
"context"
"net/http"
"os"
"sync"
"syscall"
"github.com/minio/minio/pkg/disk"
"github.com/minio/minio/pkg/madmin"
cpuhw "github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/host"
memhw "github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/process"
)
func getLocalCPUOBDInfo(ctx context.Context) madmin.ServerCPUOBDInfo {
addr := ""
if globalIsDistXL {
addr = GetLocalPeer(globalEndpoints)
} else {
addr = "minio"
}
info, err := cpuhw.InfoWithContext(ctx)
if err != nil {
return madmin.ServerCPUOBDInfo{
Addr: addr,
Error: err.Error(),
}
}
time, err := cpuhw.TimesWithContext(ctx, false)
if err != nil {
return madmin.ServerCPUOBDInfo{
Addr: addr,
Error: err.Error(),
}
}
return madmin.ServerCPUOBDInfo{
Addr: addr,
CPUStat: info,
TimeStat: time,
Error: "",
}
}
func getLocalDrivesOBD(ctx context.Context, parallel bool, endpointZones EndpointZones, r *http.Request) madmin.ServerDrivesOBDInfo {
var drivesOBDInfo []madmin.DriveOBDInfo
wg := sync.WaitGroup{}
for _, ep := range endpointZones {
for i, endpoint := range ep.Endpoints {
// Only proceed for local endpoints
if endpoint.IsLocal {
if _, err := os.Stat(endpoint.Path); err != nil {
// Since this drive is not available, add relevant details and proceed
drivesOBDInfo = append(drivesOBDInfo, madmin.DriveOBDInfo{
Path: endpoint.Path,
Error: err.Error(),
})
continue
}
measure := func(index int) {
latency, throughput, err := disk.GetOBDInfo(ctx, pathJoin(endpoint.Path, minioMetaTmpBucket, mustGetUUID()))
driveOBDInfo := madmin.DriveOBDInfo{
Path: endpoint.Path,
Latency: latency,
Throughput: throughput,
}
if err != nil {
driveOBDInfo.Error = err.Error()
}
drivesOBDInfo = append(drivesOBDInfo, driveOBDInfo)
wg.Done()
}
wg.Add(1)
if parallel {
go measure(i)
} else {
measure(i)
}
}
}
}
wg.Wait()
addr := ""
if globalIsDistXL {
addr = GetLocalPeer(endpointZones)
} else {
addr = "minio"
}
if parallel {
return madmin.ServerDrivesOBDInfo{
Addr: addr,
Parallel: drivesOBDInfo,
}
}
return madmin.ServerDrivesOBDInfo{
Addr: addr,
Serial: drivesOBDInfo,
}
}
func getLocalMemOBD(ctx context.Context) madmin.ServerMemOBDInfo {
addr := ""
if globalIsDistXL {
addr = GetLocalPeer(globalEndpoints)
} else {
addr = "minio"
}
swap, err := memhw.SwapMemoryWithContext(ctx)
if err != nil {
return madmin.ServerMemOBDInfo{
Addr: addr,
Error: err.Error(),
}
}
vm, err := memhw.VirtualMemoryWithContext(ctx)
if err != nil {
return madmin.ServerMemOBDInfo{
Addr: addr,
Error: err.Error(),
}
}
return madmin.ServerMemOBDInfo{
Addr: addr,
SwapMem: swap,
VirtualMem: vm,
Error: "",
}
}
func getLocalProcOBD(ctx context.Context) madmin.ServerProcOBDInfo {
addr := ""
if globalIsDistXL {
addr = GetLocalPeer(globalEndpoints)
} else {
addr = "minio"
}
errProcInfo := func(err error) madmin.ServerProcOBDInfo {
return madmin.ServerProcOBDInfo{
Addr: addr,
Error: err.Error(),
}
}
selfPid := int32(syscall.Getpid())
self, err := process.NewProcess(selfPid)
if err != nil {
return errProcInfo(err)
}
processes := []*process.Process{self}
if err != nil {
return errProcInfo(err)
}
sysProcs := []madmin.SysOBDProcess{}
for _, proc := range processes {
sysProc := madmin.SysOBDProcess{}
sysProc.Pid = proc.Pid
bg, err := proc.BackgroundWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Background = bg
cpuPercent, err := proc.CPUPercentWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.CPUPercent = cpuPercent
children, _ := proc.ChildrenWithContext(ctx)
for _, c := range children {
sysProc.Children = append(sysProc.Children, c.Pid)
}
cmdLine, err := proc.CmdlineWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.CmdLine = cmdLine
conns, err := proc.ConnectionsWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Connections = conns
createTime, err := proc.CreateTimeWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.CreateTime = createTime
cwd, err := proc.CwdWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Cwd = cwd
exe, err := proc.ExeWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Exe = exe
gids, err := proc.GidsWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Gids = gids
ioCounters, err := proc.IOCountersWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.IOCounters = ioCounters
isRunning, err := proc.IsRunningWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.IsRunning = isRunning
memInfo, err := proc.MemoryInfoWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.MemInfo = memInfo
memMaps, err := proc.MemoryMapsWithContext(ctx, true)
if err != nil {
return errProcInfo(err)
}
sysProc.MemMaps = memMaps
memPercent, err := proc.MemoryPercentWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.MemPercent = memPercent
name, err := proc.NameWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Name = name
netIOCounters, err := proc.NetIOCountersWithContext(ctx, false)
if err != nil {
return errProcInfo(err)
}
sysProc.NetIOCounters = netIOCounters
nice, err := proc.NiceWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Nice = nice
numCtxSwitches, err := proc.NumCtxSwitchesWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.NumCtxSwitches = numCtxSwitches
numFds, err := proc.NumFDsWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.NumFds = numFds
numThreads, err := proc.NumThreadsWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.NumThreads = numThreads
openFiles, err := proc.OpenFilesWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.OpenFiles = openFiles
pageFaults, err := proc.PageFaultsWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.PageFaults = pageFaults
parent, err := proc.ParentWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Parent = parent.Pid
ppid, err := proc.PpidWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Ppid = ppid
rlimit, err := proc.RlimitWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Rlimit = rlimit
status, err := proc.StatusWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Status = status
tgid, err := proc.Tgid()
if err != nil {
return errProcInfo(err)
}
sysProc.Tgid = tgid
threads, err := proc.ThreadsWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Threads = threads
times, err := proc.TimesWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Times = times
uids, err := proc.UidsWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Uids = uids
username, err := proc.UsernameWithContext(ctx)
if err != nil {
return errProcInfo(err)
}
sysProc.Username = username
sysProcs = append(sysProcs, sysProc)
}
return madmin.ServerProcOBDInfo{
Addr: addr,
Processes: sysProcs,
Error: "",
}
}
func getLocalOsInfoOBD(ctx context.Context) madmin.ServerOsOBDInfo {
addr := ""
if globalIsDistXL {
addr = GetLocalPeer(globalEndpoints)
} else {
addr = "minio"
}
info, err := host.InfoWithContext(ctx)
if err != nil {
return madmin.ServerOsOBDInfo{
Addr: addr,
Error: err.Error(),
}
}
sensors, err := host.SensorsTemperaturesWithContext(ctx)
if err != nil {
return madmin.ServerOsOBDInfo{
Addr: addr,
Error: err.Error(),
}
}
users, err := host.UsersWithContext(ctx)
if err != nil {
return madmin.ServerOsOBDInfo{
Addr: addr,
Error: err.Error(),
}
}
return madmin.ServerOsOBDInfo{
Addr: addr,
Info: info,
Sensors: sensors,
Users: users,
Error: "",
}
}
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package cmd
import (
"context"
"github.com/minio/minio/pkg/madmin"
)
func getLocalDiskHwOBD(ctx context.Context) madmin.ServerDiskHwOBDInfo {
addr := ""
if globalIsDistXL {
addr = GetLocalPeer(globalEndpoints)
} else {
addr = "minio"
}
return madmin.ServerDiskHwOBDInfo{
Addr: addr,
Error: "unsupported platform",
}
}
// +build !freebsd
/*
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package cmd
import (
"context"
"strings"
"github.com/minio/minio/pkg/madmin"
diskhw "github.com/shirou/gopsutil/disk"
)
func getLocalDiskHwOBD(ctx context.Context) madmin.ServerDiskHwOBDInfo {
addr := ""
if globalIsDistXL {
addr = GetLocalPeer(globalEndpoints)
} else {
addr = "minio"
}
partitions, err := diskhw.PartitionsWithContext(ctx, true)
if err != nil {
return madmin.ServerDiskHwOBDInfo{
Addr: addr,
Error: err.Error(),
}
}
drives := []string{}
paths := []string{}
for _, partition := range partitions {
device := partition.Device
path := partition.Mountpoint
if strings.Index(device, "/dev/") == 0 {
if strings.Contains(device, "loop") {
continue
}
drives = append(drives, device)
paths = append(paths, path)
}
}
ioCounters, err := diskhw.IOCountersWithContext(ctx, drives...)
if err != nil {
return madmin.ServerDiskHwOBDInfo{
Addr: addr,
Error: err.Error(),
}
}
usages := []*diskhw.UsageStat{}
for _, path := range paths {
usage, err := diskhw.UsageWithContext(ctx, path)
if err != nil {
return madmin.ServerDiskHwOBDInfo{
Addr: addr,
Error: err.Error(),
}
}
usages = append(usages, usage)
}
return madmin.ServerDiskHwOBDInfo{
Addr: addr,
Usage: usages,
Partitions: partitions,
Counters: ioCounters,
Error: "",
}
}
......@@ -22,9 +22,12 @@ import (
"crypto/tls"
"encoding/gob"
"io"
"io/ioutil"
"math"
"math/rand"
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"
......@@ -41,7 +44,13 @@ import (
trace "github.com/minio/minio/pkg/trace"
)
// client to talk to peer NEndpoints.
const (
kiB int64 = 1 << 10
miB int64 = kiB << 10
giB int64 = miB << 10
)
// client to talk to peer Nodes.
type peerRESTClient struct {
host *xnet.Host
restClient *rest.Client
......@@ -190,6 +199,321 @@ func (client *peerRESTClient) NetworkInfo() (info madmin.ServerNetworkHardwareIn
return info, err
}
type networkOverloadedErr struct{}
var networkOverloaded networkOverloadedErr
func (n networkOverloadedErr) Error() string {
return "network overloaded"
}
type progressReader struct {
r io.Reader
progressChan chan int64
}
func (p *progressReader) Read(b []byte) (int, error) {
n, err := p.r.Read(b)
if err != nil && err != io.EOF {
return n, err
}
p.progressChan <- int64(n)
return n, err
}
func (client *peerRESTClient) doNetOBDTest(ctx context.Context, dataSize int64, threadCount uint) (info madmin.NetOBDInfo, err error) {
latencies := []float64{}
throughputs := []float64{}
buf := make([]byte, dataSize)
buflimiter := make(chan struct{}, threadCount)
errChan := make(chan error, threadCount)
totalTransferred := int64(0)
transferChan := make(chan int64, threadCount)
go func() {
for v := range transferChan {
atomic.AddInt64(&totalTransferred, v)
}
}()
// ensure enough samples to obtain normal distribution
maxSamples := int(10 * threadCount)
innerCtx, cancel := context.WithCancel(ctx)
slowSamples := int32(0)
maxSlowSamples := int32(maxSamples / 20)
slowSample := func() {
if slowSamples > maxSlowSamples { // 5% of total
return
}
if atomic.AddInt32(&slowSamples, 1) >= maxSlowSamples {
errChan <- networkOverloaded
cancel()
}
}
wg := sync.WaitGroup{}
finish := func() {
<-buflimiter
wg.Done()
}
for i := 0; i < maxSamples; i++ {
select {
case <-ctx.Done():
return info, ctx.Err()
case err = <-errChan:
case buflimiter <- struct{}{}:
wg.Add(1)
if innerCtx.Err() != nil {
finish()
continue
}
go func(i int) {
bufReader := bytes.NewReader(buf)
bufReadCloser := ioutil.NopCloser(&progressReader{
r: bufReader,
progressChan: transferChan,
})
start := time.Now()
before := atomic.LoadInt64(&totalTransferred)
ctx, cancel := context.WithTimeout(innerCtx, 10*time.Second)
defer cancel()
respBody, err := client.callWithContext(ctx, peerRESTMethodNetOBDInfo, nil, bufReadCloser, dataSize)
if err != nil {
if netErr, ok := err.(*rest.NetworkError); ok {
if urlErr, ok := netErr.Err.(*url.Error); ok {
if urlErr.Err.Error() == context.DeadlineExceeded.Error() {
slowSample()
finish()
return
}
}
}
errChan <- err
finish()
return
}
http.DrainBody(respBody)
after := atomic.LoadInt64(&totalTransferred)
finish()
end := time.Now()
latency := float64(end.Sub(start).Seconds())
if latency > maxLatencyForSizeThreads(dataSize, threadCount) {
slowSample()
}
/* Throughput = (total data transferred across all threads / time taken) */
throughput := float64(float64((after - before)) / latency)
latencies = append(latencies, latency)
throughputs = append(throughputs, throughput)
}(i)
}
}
wg.Wait()
if err != nil {
return info, err
}
latency, throughput, err := xnet.ComputeOBDStats(latencies, throughputs)
info = madmin.NetOBDInfo{
Latency: latency,
Throughput: throughput,
}
return info, err
}
func maxLatencyForSizeThreads(size int64, threadCount uint) float64 {
Gbit100 := 12.5 * float64(giB)
Gbit40 := 5.00 * float64(giB)
Gbit25 := 3.25 * float64(giB)
Gbit10 := 1.25 * float64(giB)
// Gbit1 := 0.25 * float64(giB)
// Given the current defaults, each combination of size/thread
// is supposed to fully saturate the intended pipe when all threads are active
// i.e. if the test is performed in a perfectly controlled environment, i.e. without
// CPU scheduling latencies and/or network jitters, then all threads working
// simultaneously should result in each of them completing in 1s
//
// In reality, I've assumed a normal distribution of latency with expected mean of 1s and min of 0s
// Then, 95% of threads should complete within 2 seconds (2 std. deviations from the mean). The 2s comes
// from fitting the normal curve such that the mean is 1.
//
// i.e. we expect that no more than 5% of threads to take longer than 2s to push the data.
//
// throughput | max latency
// 100 Gbit | 2s
// 40 Gbit | 2s
// 25 Gbit | 2s
// 10 Gbit | 2s
// 1 Gbit | inf
throughput := float64(int64(size) * int64(threadCount))
if throughput >= Gbit100 {
return 2.0
} else if throughput >= Gbit40 {
return 2.0
} else if throughput >= Gbit25 {
return 2.0
} else if throughput >= Gbit10 {
return 2.0
}
return math.MaxFloat64
}
// NetOBDInfo - fetch Net OBD information for a remote node.
func (client *peerRESTClient) NetOBDInfo(ctx context.Context) (info madmin.NetOBDInfo, err error) {
// 100 Gbit -> 256 MiB * 50 threads
// 40 Gbit -> 256 MiB * 20 threads
// 25 Gbit -> 128 MiB * 25 threads
// 10 Gbit -> 128 MiB * 10 threads
// 1 Gbit -> 64 MiB * 2 threads
type step struct {
size int64
threads uint
}
steps := []step{
{ // 100 Gbit
size: 256 * miB,
threads: 50,
},
{ // 40 Gbit
size: 256 * miB,
threads: 20,
},
{ // 25 Gbit
size: 128 * miB,
threads: 25,
},
{ // 10 Gbit
size: 128 * miB,
threads: 10,
},
{ // 1 Gbit
size: 64 * miB,
threads: 2,
},
}
for i := range steps {
size := steps[i].size
threads := steps[i].threads
if info, err = client.doNetOBDTest(ctx, size, threads); err != nil {
if err == networkOverloaded {
continue
}
if netErr, ok := err.(*rest.NetworkError); ok {
if urlErr, ok := netErr.Err.(*url.Error); ok {
if urlErr.Err.Error() == context.Canceled.Error() {
continue
}
if urlErr.Err.Error() == context.DeadlineExceeded.Error() {
continue
}
}
}
}
return info, err
}
return info, err
}
// DispatchNetOBDInfo - dispatch other nodes to run Net OBD.
func (client *peerRESTClient) DispatchNetOBDInfo(ctx context.Context) (info madmin.ServerNetOBDInfo, err error) {
respBody, err := client.callWithContext(ctx, peerRESTMethodDispatchNetOBDInfo, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return
}
// DriveOBDInfo - fetch Drive OBD information for a remote node.
func (client *peerRESTClient) DriveOBDInfo(ctx context.Context) (info madmin.ServerDrivesOBDInfo, err error) {
respBody, err := client.callWithContext(ctx, peerRESTMethodDriveOBDInfo, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// CPUOBDInfo - fetch CPU OBD information for a remote node.
func (client *peerRESTClient) CPUOBDInfo(ctx context.Context) (info madmin.ServerCPUOBDInfo, err error) {
respBody, err := client.callWithContext(ctx, peerRESTMethodCPUOBDInfo, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// DiskHwOBDInfo - fetch Disk HW OBD information for a remote node.
func (client *peerRESTClient) DiskHwOBDInfo(ctx context.Context) (info madmin.ServerDiskHwOBDInfo, err error) {
respBody, err := client.callWithContext(ctx, peerRESTMethodDiskHwOBDInfo, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// OsOBDInfo - fetch OsInfo OBD information for a remote node.
func (client *peerRESTClient) OsOBDInfo(ctx context.Context) (info madmin.ServerOsOBDInfo, err error) {
respBody, err := client.callWithContext(ctx, peerRESTMethodOsInfoOBDInfo, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// MemOBDInfo - fetch MemInfo OBD information for a remote node.
func (client *peerRESTClient) MemOBDInfo(ctx context.Context) (info madmin.ServerMemOBDInfo, err error) {
respBody, err := client.callWithContext(ctx, peerRESTMethodMemOBDInfo, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// ProcOBDInfo - fetch ProcInfo OBD information for a remote node.
func (client *peerRESTClient) ProcOBDInfo(ctx context.Context) (info madmin.ServerProcOBDInfo, err error) {
respBody, err := client.callWithContext(ctx, peerRESTMethodProcOBDInfo, nil, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// DrivePerfInfo - fetch Drive performance information for a remote node.
func (client *peerRESTClient) DrivePerfInfo(size int64) (info madmin.ServerDrivesPerfInfo, err error) {
params := make(url.Values)
......
......@@ -17,7 +17,7 @@
package cmd
const (
peerRESTVersion = "v6"
peerRESTVersion = "v7"
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
......@@ -30,6 +30,14 @@ const (
peerRESTMethodCPULoadInfo = "/cpuloadinfo"
peerRESTMethodMemUsageInfo = "/memusageinfo"
peerRESTMethodDrivePerfInfo = "/driveperfinfo"
peerRESTMethodDriveOBDInfo = "/driveobdinfo"
peerRESTMethodNetOBDInfo = "/netobdinfo"
peerRESTMethodCPUOBDInfo = "/cpuobdinfo"
peerRESTMethodDiskHwOBDInfo = "/diskhwobdinfo"
peerRESTMethodOsInfoOBDInfo = "/osinfoobdinfo"
peerRESTMethodMemOBDInfo = "/memobdinfo"
peerRESTMethodProcOBDInfo = "/procobdinfo"
peerRESTMethodDispatchNetOBDInfo = "/dispatchnetobdinfo"
peerRESTMethodDeleteBucket = "/deletebucket"
peerRESTMethodServerUpdate = "/serverupdate"
peerRESTMethodSignalService = "/signalservice"
......
......@@ -36,6 +36,7 @@ import (
objectlock "github.com/minio/minio/pkg/bucket/object/lock"
"github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/event"
"github.com/minio/minio/pkg/madmin"
trace "github.com/minio/minio/pkg/trace"
)
......@@ -509,6 +510,157 @@ func (s *peerRESTServer) ServerInfoHandler(w http.ResponseWriter, r *http.Reques
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
func (s *peerRESTServer) NetOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "NetOBDInfo")
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
// Use this trailer to send additional headers after sending body
w.Header().Set("Trailer", "FinalStatus")
w.Header().Set("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
n, err := io.Copy(ioutil.Discard, r.Body)
if err == io.ErrUnexpectedEOF {
w.Header().Set("FinalStatus", err.Error())
return
}
if err != nil && err != io.EOF {
logger.LogIf(ctx, err)
w.Header().Set("FinalStatus", err.Error())
return
}
if n != r.ContentLength {
err := fmt.Errorf("OBD: short read: expected %d found %d", r.ContentLength, n)
logger.LogIf(ctx, err)
w.Header().Set("FinalStatus", err.Error())
return
}
w.Header().Set("FinalStatus", "Success")
w.(http.Flusher).Flush()
}
func (s *peerRESTServer) DispatchNetOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx := newContext(r, w, "DispatchNetOBDInfo")
info := globalNotificationSys.NetOBDInfo(ctx)
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
w.(http.Flusher).Flush()
}
// DriveOBDInfoHandler - returns Drive OBD info.
func (s *peerRESTServer) DriveOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(newContext(r, w, "DriveOBDInfo"))
defer cancel()
infoSerial := getLocalDrivesOBD(ctx, false, globalEndpoints, r)
infoParallel := getLocalDrivesOBD(ctx, true, globalEndpoints, r)
errStr := ""
if infoSerial.Error != "" {
errStr = "serial: " + infoSerial.Error
}
if infoParallel.Error != "" {
errStr = errStr + " parallel: " + infoParallel.Error
}
info := madmin.ServerDrivesOBDInfo{
Addr: infoSerial.Addr,
Serial: infoSerial.Serial,
Parallel: infoParallel.Parallel,
Error: errStr,
}
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// CPUOBDInfoHandler - returns CPU OBD info.
func (s *peerRESTServer) CPUOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(newContext(r, w, "CpuOBDInfo"))
defer cancel()
info := getLocalCPUOBDInfo(ctx)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// DiskHwOBDInfoHandler - returns Disk HW OBD info.
func (s *peerRESTServer) DiskHwOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(newContext(r, w, "DiskHwOBDInfo"))
defer cancel()
info := getLocalDiskHwOBD(ctx)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// OsOBDInfoHandler - returns Os OBD info.
func (s *peerRESTServer) OsOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(newContext(r, w, "OsOBDInfo"))
defer cancel()
info := getLocalOsInfoOBD(ctx)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// ProcOBDInfoHandler - returns Proc OBD info.
func (s *peerRESTServer) ProcOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(newContext(r, w, "ProcOBDInfo"))
defer cancel()
info := getLocalProcOBD(ctx)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// MemOBDInfoHandler - returns Mem OBD info.
func (s *peerRESTServer) MemOBDInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
ctx, cancel := context.WithCancel(newContext(r, w, "MemOBDInfo"))
defer cancel()
info := getLocalMemOBD(ctx)
defer w.(http.Flusher).Flush()
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// DrivePerfInfoHandler - returns Drive Performance info.
func (s *peerRESTServer) DrivePerfInfoHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
......@@ -1184,6 +1336,14 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerInfo).HandlerFunc(httpTraceHdrs(server.ServerInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCPULoadInfo).HandlerFunc(httpTraceHdrs(server.CPULoadInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodMemUsageInfo).HandlerFunc(httpTraceHdrs(server.MemUsageInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodProcOBDInfo).HandlerFunc(httpTraceHdrs(server.ProcOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodMemOBDInfo).HandlerFunc(httpTraceHdrs(server.MemOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodOsInfoOBDInfo).HandlerFunc(httpTraceHdrs(server.OsOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDiskHwOBDInfo).HandlerFunc(httpTraceHdrs(server.DiskHwOBDInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodCPUOBDInfo).HandlerFunc(httpTraceHdrs(server.CPUOBDInfoHandler))