Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion runner/cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func start(tempDir string, homeDir string, httpPort int, sshPort int, logLevel i
log.DefaultEntry.Logger.SetOutput(io.MultiWriter(os.Stdout, defaultLogFile))
log.DefaultEntry.Logger.SetLevel(logrus.Level(logLevel))

server, err := api.NewServer(tempDir, homeDir, fmt.Sprintf(":%d", httpPort), sshPort, version)
server, err := api.NewServer(context.TODO(), tempDir, homeDir, fmt.Sprintf(":%d", httpPort), sshPort, version)
if err != nil {
return fmt.Errorf("create server: %w", err)
}
Expand Down
107 changes: 107 additions & 0 deletions runner/internal/metrics/cgroups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package metrics

import (
"bufio"
"context"
"errors"
"fmt"
"os"
"strings"

"github.com/dstackai/dstack/runner/internal/log"
)

// getProcessCgroupMountPoint scans the mounts file at procPidMountsPath
// (normally /proc/<pid>/mounts) and returns the mount point of the first
// cgroup v2 ("cgroup2") filesystem listed in it.
//
// Lines that do not have exactly six whitespace-separated fields are logged
// and skipped. An error is returned when the file cannot be opened, when
// reading it fails before a cgroup2 entry is seen, when only cgroup v1
// ("cgroup") mounts are present, or when no cgroup mount is present at all.
func getProcessCgroupMountPoint(ctx context.Context, procPidMountsPath string) (string, error) {
	// See proc_pid_mounts(5) for the procPidMountsPath file description
	file, err := os.Open(procPidMountsPath)
	if err != nil {
		return "", fmt.Errorf("open mounts file: %w", err)
	}
	defer func() {
		// The file is only read, so a failed Close cannot lose data.
		_ = file.Close()
	}()

	hasCgroupV1 := false

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		line := scanner.Text()
		// See fstab(5) for the format description
		fields := strings.Fields(line)
		if len(fields) != 6 {
			log.Warning(ctx, "Unexpected number of fields in mounts file", "num", len(fields), "line", line)
			continue
		}
		switch fields[2] {
		case "cgroup2":
			// The first cgroup2 entry wins. The field is stored escaped in the
			// file, so decode it before handing it out as a filesystem path.
			return unescapeMountField(fields[1]), nil
		case "cgroup":
			hasCgroupV1 = true
		}
	}
	if err := scanner.Err(); err != nil {
		// The file was only partially read, so neither "only v1" nor
		// "no mounts" can be concluded; report the read failure itself.
		return "", fmt.Errorf("scan mounts file: %w", err)
	}

	if hasCgroupV1 {
		return "", errors.New("only cgroup v1 mounts found")
	}

	return "", errors.New("no cgroup mounts found")
}

// unescapeMountField decodes the three-digit octal escapes used in mounts
// files for whitespace and backslashes (\040 space, \011 tab, \012 newline,
// \134 backslash). Backslashes not followed by a valid escape are kept as is.
func unescapeMountField(field string) string {
	if !strings.Contains(field, `\`) {
		return field
	}
	isOctal := func(c byte) bool { return c >= '0' && c <= '7' }
	var decoded strings.Builder
	decoded.Grow(len(field))
	for i := 0; i < len(field); i++ {
		if field[i] == '\\' && i+3 < len(field) {
			d1, d2, d3 := field[i+1], field[i+2], field[i+3]
			// The first digit is limited to 0-3 so the value fits in one byte.
			if d1 <= '3' && isOctal(d1) && isOctal(d2) && isOctal(d3) {
				decoded.WriteByte((d1-'0')<<6 | (d2-'0')<<3 | (d3 - '0'))
				i += 3
				continue
			}
		}
		decoded.WriteByte(field[i])
	}
	return decoded.String()
}

// getProcessCgroupPathname scans the cgroup membership file at
// procPidCgroupPath (normally /proc/<pid>/cgroup) and returns the cgroup
// pathname of the first well-formed cgroup v2 entry, i.e. a line of the form
// "0::<pathname>".
//
// Malformed lines are logged and skipped. An error is returned when the file
// cannot be opened, when reading it fails before a v2 entry is seen, when
// only cgroup v1 entries (non-zero hierarchy ID) are present, or when no
// usable entry is present at all.
func getProcessCgroupPathname(ctx context.Context, procPidCgroupPath string) (string, error) {
	// See cgroups(7) for the procPidCgroupPath file description
	file, err := os.Open(procPidCgroupPath)
	if err != nil {
		return "", fmt.Errorf("open cgroup file: %w", err)
	}
	defer func() {
		// The file is only read, so a failed Close cannot lose data.
		_ = file.Close()
	}()

	pathname := ""
	hasCgroupV1 := false

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		line := scanner.Text()
		// See cgroups(7) for the format description:
		// hierarchy-ID:controller-list:cgroup-path
		// Split into at most three fields so that a ':' inside the cgroup
		// path stays part of the path instead of invalidating the entry.
		fields := strings.SplitN(line, ":", 3)
		if len(fields) != 3 {
			log.Warning(ctx, "Unexpected number of fields in cgroup file", "num", len(fields), "line", line)
			continue
		}
		if fields[0] != "0" {
			hasCgroupV1 = true
			continue
		}
		if fields[1] != "" {
			// Must be empty for v2
			log.Warning(ctx, "Unexpected v2 entry in cgroup file", "line", line)
			continue
		}
		pathname = fields[2]
		break
	}

	if pathname != "" {
		return pathname, nil
	}

	if err := scanner.Err(); err != nil {
		// The file was only partially read, so neither "only v1" nor
		// "no pathname" can be concluded; report the read failure itself.
		return "", fmt.Errorf("scan cgroup file: %w", err)
	}

	if hasCgroupV1 {
		return "", errors.New("only cgroup v1 pathnames found")
	}

	return "", errors.New("no cgroup pathname found")
}
87 changes: 87 additions & 0 deletions runner/internal/metrics/cgroups_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package metrics

import (
"fmt"
"os"
"path"
"testing"

"github.com/stretchr/testify/require"
)

// Sample mounts-file lines (six whitespace-separated fields each) used as
// fixtures by the getProcessCgroupMountPoint tests.
const (
// A cgroup v2 mount: filesystem type "cgroup2", mounted at /sys/fs/cgroup.
cgroup2MountLine = "cgroup2 /sys/fs/cgroup cgroup2 rw,nosuid,nodev,noexec,relatime,nsdelegate,memory_recursiveprot 0 0"
// A cgroup v1 mount: filesystem type "cgroup" with the cpu,cpuacct controllers.
cgroupMountLine = "cgroup /sys/fs/cgroup/cpu,cpuacct cgroup rw,nosuid,nodev,noexec,relatime,cpu,cpuacct 0 0"
// A non-cgroup (ext4) mount of the root filesystem.
rootMountLine = "/dev/nvme0n1p5 / ext4 rw,relatime 0 0"
)

// TestGetProcessCgroupMountPoint_ErrorNoCgroupMounts feeds a mounts file that
// holds one non-cgroup mount and one malformed line, and expects the
// "no cgroup mounts found" error together with an empty mount point.
func TestGetProcessCgroupMountPoint_ErrorNoCgroupMounts(t *testing.T) {
	mountsFile := createProcFile(t, "mounts", rootMountLine, "malformed line")

	got, gotErr := getProcessCgroupMountPoint(t.Context(), mountsFile)

	require.ErrorContains(t, gotErr, "no cgroup mounts found")
	require.Equal(t, "", got)
}

// TestGetProcessCgroupMountPoint_ErrorOnlyCgroupV1Mounts feeds a mounts file
// whose only cgroup entry is a v1 mount, and expects the
// "only cgroup v1 mounts found" error together with an empty mount point.
func TestGetProcessCgroupMountPoint_ErrorOnlyCgroupV1Mounts(t *testing.T) {
	mountsFile := createProcFile(t, "mounts", rootMountLine, cgroupMountLine)

	got, gotErr := getProcessCgroupMountPoint(t.Context(), mountsFile)

	require.ErrorContains(t, gotErr, "only cgroup v1 mounts found")
	require.Equal(t, "", got)
}

// TestGetProcessCgroupMountPoint_OK feeds a mounts file that lists a
// non-cgroup mount, a cgroup v1 mount and a cgroup v2 mount, and expects the
// v2 mount point to be returned without error.
func TestGetProcessCgroupMountPoint_OK(t *testing.T) {
	mountsFile := createProcFile(t, "mounts", rootMountLine, cgroupMountLine, cgroup2MountLine)

	got, gotErr := getProcessCgroupMountPoint(t.Context(), mountsFile)

	require.NoError(t, gotErr)
	require.Equal(t, "/sys/fs/cgroup", got)
}

// TestGetProcessCgroupPathname_ErrorNoCgroup feeds a cgroup file containing
// only a malformed entry, and expects the "no cgroup pathname found" error
// together with an empty pathname.
func TestGetProcessCgroupPathname_ErrorNoCgroup(t *testing.T) {
	cgroupFile := createProcFile(t, "cgroup", "malformed entry")

	got, gotErr := getProcessCgroupPathname(t.Context(), cgroupFile)

	require.ErrorContains(t, gotErr, "no cgroup pathname found")
	require.Equal(t, "", got)
}

// TestGetProcessCgroupPathname_ErrorOnlyCgroupV1 feeds a cgroup file whose
// only entry has a non-zero hierarchy ID (cgroup v1), and expects the
// "only cgroup v1 pathnames found" error together with an empty pathname.
func TestGetProcessCgroupPathname_ErrorOnlyCgroupV1(t *testing.T) {
	cgroupFile := createProcFile(t, "cgroup", "7:cpu,cpuacct:/user.slice")

	got, gotErr := getProcessCgroupPathname(t.Context(), cgroupFile)

	require.ErrorContains(t, gotErr, "only cgroup v1 pathnames found")
	require.Equal(t, "", got)
}

// TestGetProcessCgroupPathname_OK feeds a cgroup file with one v1 entry
// followed by one v2 entry ("0::<pathname>"), and expects the v2 pathname to
// be returned without error.
func TestGetProcessCgroupPathname_OK(t *testing.T) {
	cgroupFile := createProcFile(t, "cgroup", "7:cpu,cpuacct:/user.slice", "0::/user.slice/user-1000.slice/session-1.scope")

	got, gotErr := getProcessCgroupPathname(t.Context(), cgroupFile)

	require.NoError(t, gotErr)
	require.Equal(t, "/user.slice/user-1000.slice/session-1.scope", got)
}

// createProcFile writes the given lines, each followed by a newline, to a
// file called name inside a temporary directory obtained from t.TempDir, and
// returns the path of that file. Any failure to open, write or close the file
// fails the calling test immediately.
func createProcFile(t *testing.T, name string, lines ...string) string {
	t.Helper()
	target := path.Join(t.TempDir(), name)
	out, openErr := os.OpenFile(target, os.O_WRONLY|os.O_CREATE, 0o600)
	require.NoError(t, openErr)
	// Close via defer so the handle is released even if a write assertion
	// aborts the test part-way through.
	defer func() {
		require.NoError(t, out.Close())
	}()
	for i := range lines {
		_, writeErr := fmt.Fprintln(out, lines[i])
		require.NoError(t, writeErr)
	}
	return target
}
82 changes: 27 additions & 55 deletions runner/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"os"
"os/exec"
"path"
"strconv"
"strings"
"time"
Expand All @@ -17,33 +18,42 @@ import (
)

type MetricsCollector struct {
cgroupVersion int
gpuVendor common.GpuVendor
cgroupMountPoint string
gpuVendor common.GpuVendor
}

func NewMetricsCollector() (*MetricsCollector, error) {
cgroupVersion, err := getCgroupVersion()
func NewMetricsCollector(ctx context.Context) (*MetricsCollector, error) {
// It's unlikely that cgroup mount point will change during container lifetime,
// so we detect it only once and reuse.
cgroupMountPoint, err := getProcessCgroupMountPoint(ctx, "/proc/self/mounts")
if err != nil {
return nil, err
return nil, fmt.Errorf("get cgroup mount point: %w", err)
}
gpuVendor := common.GetGpuVendor()
return &MetricsCollector{
cgroupVersion: cgroupVersion,
gpuVendor: gpuVendor,
cgroupMountPoint: cgroupMountPoint,
gpuVendor: gpuVendor,
}, nil
}

func (s *MetricsCollector) GetSystemMetrics(ctx context.Context) (*schemas.SystemMetrics, error) {
// It's possible to move a process from one control group to another (it's unlikely, but nonetheless),
// so we detect the current group each time.
cgroupPathname, err := getProcessCgroupPathname(ctx, "/proc/self/cgroup")
if err != nil {
return nil, fmt.Errorf("get cgroup pathname: %w", err)
}
cgroupPath := path.Join(s.cgroupMountPoint, cgroupPathname)
timestamp := time.Now()
cpuUsage, err := s.GetCPUUsageMicroseconds()
cpuUsage, err := s.GetCPUUsageMicroseconds(cgroupPath)
if err != nil {
return nil, err
}
memoryUsage, err := s.GetMemoryUsageBytes()
memoryUsage, err := s.GetMemoryUsageBytes(cgroupPath)
if err != nil {
return nil, err
}
memoryCache, err := s.GetMemoryCacheBytes()
memoryCache, err := s.GetMemoryCacheBytes(cgroupPath)
if err != nil {
return nil, err
}
Expand All @@ -61,28 +71,14 @@ func (s *MetricsCollector) GetSystemMetrics(ctx context.Context) (*schemas.Syste
}, nil
}

func (s *MetricsCollector) GetCPUUsageMicroseconds() (uint64, error) {
cgroupCPUUsagePath := "/sys/fs/cgroup/cpu.stat"
if s.cgroupVersion == 1 {
cgroupCPUUsagePath = "/sys/fs/cgroup/cpuacct/cpuacct.usage"
}
func (s *MetricsCollector) GetCPUUsageMicroseconds(cgroupPath string) (uint64, error) {
cgroupCPUUsagePath := path.Join(cgroupPath, "cpu.stat")

data, err := os.ReadFile(cgroupCPUUsagePath)
if err != nil {
return 0, fmt.Errorf("could not read CPU usage: %w", err)
}

if s.cgroupVersion == 1 {
// cgroup v1 provides usage in nanoseconds
usageStr := strings.TrimSpace(string(data))
cpuUsage, err := strconv.ParseUint(usageStr, 10, 64)
if err != nil {
return 0, fmt.Errorf("could not parse CPU usage: %w", err)
}
// convert nanoseconds to microseconds
return cpuUsage / 1000, nil
}
// cgroup v2, we need to extract usage_usec from cpu.stat
lines := strings.Split(string(data), "\n")
for _, line := range lines {
if strings.HasPrefix(line, "usage_usec") {
Expand All @@ -100,11 +96,8 @@ func (s *MetricsCollector) GetCPUUsageMicroseconds() (uint64, error) {
return 0, fmt.Errorf("usage_usec not found in cpu.stat")
}

func (s *MetricsCollector) GetMemoryUsageBytes() (uint64, error) {
cgroupMemoryUsagePath := "/sys/fs/cgroup/memory.current"
if s.cgroupVersion == 1 {
cgroupMemoryUsagePath = "/sys/fs/cgroup/memory/memory.usage_in_bytes"
}
func (s *MetricsCollector) GetMemoryUsageBytes(cgroupPath string) (uint64, error) {
cgroupMemoryUsagePath := path.Join(cgroupPath, "memory.current")

data, err := os.ReadFile(cgroupMemoryUsagePath)
if err != nil {
Expand All @@ -119,11 +112,8 @@ func (s *MetricsCollector) GetMemoryUsageBytes() (uint64, error) {
return usedMemory, nil
}

func (s *MetricsCollector) GetMemoryCacheBytes() (uint64, error) {
cgroupMemoryStatPath := "/sys/fs/cgroup/memory.stat"
if s.cgroupVersion == 1 {
cgroupMemoryStatPath = "/sys/fs/cgroup/memory/memory.stat"
}
func (s *MetricsCollector) GetMemoryCacheBytes(cgroupPath string) (uint64, error) {
cgroupMemoryStatPath := path.Join(cgroupPath, "memory.stat")

statData, err := os.ReadFile(cgroupMemoryStatPath)
if err != nil {
Expand All @@ -132,8 +122,7 @@ func (s *MetricsCollector) GetMemoryCacheBytes() (uint64, error) {

lines := strings.Split(string(statData), "\n")
for _, line := range lines {
if (s.cgroupVersion == 1 && strings.HasPrefix(line, "total_inactive_file")) ||
(s.cgroupVersion == 2 && strings.HasPrefix(line, "inactive_file")) {
if strings.HasPrefix(line, "inactive_file") {
parts := strings.Fields(line)
if len(parts) != 2 {
return 0, fmt.Errorf("unexpected format in memory.stat")
Expand Down Expand Up @@ -255,23 +244,6 @@ func (s *MetricsCollector) GetIntelAcceleratorMetrics(ctx context.Context) ([]sc
return parseNVIDIASMILikeMetrics(out.String())
}

func getCgroupVersion() (int, error) {
data, err := os.ReadFile("/proc/self/mountinfo")
if err != nil {
return 0, fmt.Errorf("could not read /proc/self/mountinfo: %w", err)
}

for _, line := range strings.Split(string(data), "\n") {
if strings.Contains(line, "cgroup2") {
return 2, nil
} else if strings.Contains(line, "cgroup") {
return 1, nil
}
}

return 0, fmt.Errorf("could not determine cgroup version")
}

func parseNVIDIASMILikeMetrics(output string) ([]schemas.GPUMetrics, error) {
metrics := []schemas.GPUMetrics{}

Expand Down
4 changes: 2 additions & 2 deletions runner/internal/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestGetAMDGPUMetrics_OK(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Skipping on macOS")
}
collector, err := NewMetricsCollector()
collector, err := NewMetricsCollector(t.Context())
assert.NoError(t, err)

cases := []struct {
Expand Down Expand Up @@ -46,7 +46,7 @@ func TestGetAMDGPUMetrics_ErrorGPUUtilNA(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Skipping on macOS")
}
collector, err := NewMetricsCollector()
collector, err := NewMetricsCollector(t.Context())
assert.NoError(t, err)
metrics, err := collector.getAMDGPUMetrics("gpu,gfx,gfx_clock,vram_used,vram_total\n0,N/A,N/A,283,196300\n")
assert.ErrorContains(t, err, "GPU utilization is N/A")
Expand Down
8 changes: 3 additions & 5 deletions runner/internal/runner/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/dstackai/dstack/runner/internal/api"
"github.com/dstackai/dstack/runner/internal/executor"
"github.com/dstackai/dstack/runner/internal/log"
"github.com/dstackai/dstack/runner/internal/metrics"
"github.com/dstackai/dstack/runner/internal/schemas"
)

Expand All @@ -28,11 +27,10 @@ func (s *Server) healthcheckGetHandler(w http.ResponseWriter, r *http.Request) (
}

func (s *Server) metricsGetHandler(w http.ResponseWriter, r *http.Request) (interface{}, error) {
metricsCollector, err := metrics.NewMetricsCollector()
if err != nil {
return nil, &api.Error{Status: http.StatusInternalServerError, Err: err}
if s.metricsCollector == nil {
return nil, &api.Error{Status: http.StatusNotFound, Msg: "Metrics collector is not available"}
}
metrics, err := metricsCollector.GetSystemMetrics(r.Context())
metrics, err := s.metricsCollector.GetSystemMetrics(r.Context())
if err != nil {
return nil, &api.Error{Status: http.StatusInternalServerError, Err: err}
}
Expand Down
Loading