diff --git a/runner/cmd/runner/main.go b/runner/cmd/runner/main.go
index fc48233c6..27c07292b 100644
--- a/runner/cmd/runner/main.go
+++ b/runner/cmd/runner/main.go
@@ -38,7 +38,7 @@ func start(tempDir string, homeDir string, httpPort int, sshPort int, logLevel i
 	log.DefaultEntry.Logger.SetOutput(io.MultiWriter(os.Stdout, defaultLogFile))
 	log.DefaultEntry.Logger.SetLevel(logrus.Level(logLevel))
 
-	server, err := api.NewServer(tempDir, homeDir, fmt.Sprintf(":%d", httpPort), sshPort, version)
+	server, err := api.NewServer(context.TODO(), tempDir, homeDir, fmt.Sprintf(":%d", httpPort), sshPort, version)
 	if err != nil {
 		return fmt.Errorf("create server: %w", err)
 	}
diff --git a/runner/internal/metrics/cgroups.go b/runner/internal/metrics/cgroups.go
new file mode 100644
index 000000000..9ce1e54fe
--- /dev/null
+++ b/runner/internal/metrics/cgroups.go
@@ -0,0 +1,120 @@
+package metrics
+
+import (
+	"bufio"
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"strings"
+
+	"github.com/dstackai/dstack/runner/internal/log"
+)
+
+// getProcessCgroupMountPoint scans the mounts file at procPidMountsPath and
+// returns the mount point of the first cgroup2 filesystem listed in it.
+// Lines that do not have exactly six fields are logged and skipped.
+// An error is returned if the file cannot be opened or lists no cgroup2 mount;
+// in the latter case the message tells whether cgroup v1 mounts were seen.
+func getProcessCgroupMountPoint(ctx context.Context, procPidMountsPath string) (string, error) {
+	// See proc_pid_mounts(5) for the procPidMountsPath file description
+	file, err := os.Open(procPidMountsPath)
+	if err != nil {
+		return "", fmt.Errorf("open mounts file: %w", err)
+	}
+	defer func() {
+		_ = file.Close()
+	}()
+
+	mountPoint := ""
+	hasCgroupV1 := false
+
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		line := scanner.Text()
+		// See fstab(5) for the format description
+		fields := strings.Fields(line)
+		if len(fields) != 6 {
+			log.Warning(ctx, "Unexpected number of fields in mounts file", "num", len(fields), "line", line)
+			continue
+		}
+		fsType := fields[2]
+		if fsType == "cgroup2" {
+			mountPoint = fields[1]
+			break
+		}
+		if fsType == "cgroup" {
+			hasCgroupV1 = true
+		}
+	}
+	if err := scanner.Err(); err != nil {
+		// Best effort: a read error is only logged, the result is built from the lines read so far
+		log.Warning(ctx, "Error while scanning mounts file", "err", err)
+	}
+
+	if mountPoint != "" {
+		return mountPoint, nil
+	}
+
+	if hasCgroupV1 {
+		return "", errors.New("only cgroup v1 mounts found")
+	}
+
+	return "", errors.New("no cgroup mounts found")
+}
+
+// getProcessCgroupPathname scans the cgroup file at procPidCgroupPath and
+// returns the cgroup v2 pathname of the process, that is the third field of
+// the first "0::<pathname>" entry. Malformed entries are logged and skipped.
+// An error is returned if the file cannot be opened or has no v2 entry;
+// in the latter case the message tells whether cgroup v1 entries were seen.
+func getProcessCgroupPathname(ctx context.Context, procPidCgroupPath string) (string, error) {
+	// See cgroups(7) for the procPidCgroupPath file description
+	file, err := os.Open(procPidCgroupPath)
+	if err != nil {
+		return "", fmt.Errorf("open cgroup file: %w", err)
+	}
+	defer func() {
+		_ = file.Close()
+	}()
+
+	pathname := ""
+	hasCgroupV1 := false
+
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		line := scanner.Text()
+		// See cgroups(7) for the format description (hierarchy-ID:controller-list:cgroup-path).
+		// The cgroup path may itself contain colons, so split into at most three fields.
+		fields := strings.SplitN(line, ":", 3)
+		if len(fields) != 3 {
+			log.Warning(ctx, "Unexpected number of fields in cgroup file", "num", len(fields), "line", line)
+			continue
+		}
+		if fields[0] != "0" {
+			hasCgroupV1 = true
+			continue
+		}
+		if fields[1] != "" {
+			// Must be empty for v2
+			log.Warning(ctx, "Unexpected v2 entry in cgroup file", "line", line)
+			continue
+		}
+		pathname = fields[2]
+		break
+	}
+	if err := scanner.Err(); err != nil {
+		// Best effort: a read error is only logged, the result is built from the lines read so far
+		log.Warning(ctx, "Error while scanning cgroup file", "err", err)
+	}
+
+	if pathname != "" {
+		return pathname, nil
+	}
+
+	if hasCgroupV1 {
+		return "", errors.New("only cgroup v1 pathnames found")
+	}
+
+	return "", errors.New("no cgroup pathname found")
+}
diff --git a/runner/internal/metrics/cgroups_test.go b/runner/internal/metrics/cgroups_test.go
new file mode 100644
index 000000000..3e6e0abca
--- /dev/null
+++ b/runner/internal/metrics/cgroups_test.go
@@ -0,0 +1,98 @@
+package metrics
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	cgroup2MountLine = "cgroup2 /sys/fs/cgroup cgroup2 rw,nosuid,nodev,noexec,relatime,nsdelegate,memory_recursiveprot 0 0"
+	cgroupMountLine  = "cgroup /sys/fs/cgroup/cpu,cpuacct cgroup rw,nosuid,nodev,noexec,relatime,cpu,cpuacct 0 0"
+	rootMountLine    = "/dev/nvme0n1p5 / ext4 rw,relatime 0 0"
+)
+
+func TestGetProcessCgroupMountPoint_ErrorNoCgroupMounts(t *testing.T) {
+	procPidMountsPath := createProcFile(t, "mounts", rootMountLine, "malformed line")
+
+	mountPoint, err := getProcessCgroupMountPoint(t.Context(), procPidMountsPath)
+
+	require.ErrorContains(t, err, "no cgroup mounts found")
+	require.Equal(t, "", mountPoint)
+}
+
+func TestGetProcessCgroupMountPoint_ErrorOnlyCgroupV1Mounts(t *testing.T) {
+	procPidMountsPath := createProcFile(t, "mounts", rootMountLine, cgroupMountLine)
+
+	mountPoint, err := getProcessCgroupMountPoint(t.Context(), procPidMountsPath)
+
+	require.ErrorContains(t, err, "only cgroup v1 mounts found")
+	require.Equal(t, "", mountPoint)
+}
+
+func TestGetProcessCgroupMountPoint_OK(t *testing.T) {
+	procPidMountsPath := createProcFile(t, "mounts", rootMountLine, cgroupMountLine, cgroup2MountLine)
+
+	mountPoint, err := getProcessCgroupMountPoint(t.Context(), procPidMountsPath)
+
+	require.NoError(t, err)
+	require.Equal(t, "/sys/fs/cgroup", mountPoint)
+}
+
+func TestGetProcessCgroupPathname_ErrorNoCgroup(t *testing.T) {
+	procPidCgroupPath := createProcFile(t, "cgroup", "malformed entry")
+
+	pathname, err := getProcessCgroupPathname(t.Context(), procPidCgroupPath)
+
+	require.ErrorContains(t, err, "no cgroup pathname found")
+	require.Equal(t, "", pathname)
+}
+
+func TestGetProcessCgroupPathname_ErrorOnlyCgroupV1(t *testing.T) {
+	procPidCgroupPath := createProcFile(t, "cgroup", "7:cpu,cpuacct:/user.slice")
+
+	pathname, err := getProcessCgroupPathname(t.Context(), procPidCgroupPath)
+
+	require.ErrorContains(t, err, "only cgroup v1 pathnames found")
+	require.Equal(t, "", pathname)
+}
+
+func TestGetProcessCgroupPathname_OK(t *testing.T) {
+	procPidCgroupPath := createProcFile(t, "cgroup", "7:cpu,cpuacct:/user.slice", "0::/user.slice/user-1000.slice/session-1.scope")
+
+	pathname, err := getProcessCgroupPathname(t.Context(), procPidCgroupPath)
+
+	require.NoError(t, err)
+	require.Equal(t, "/user.slice/user-1000.slice/session-1.scope", pathname)
+}
+
+func TestGetProcessCgroupPathname_OKPathnameWithColons(t *testing.T) {
+	procPidCgroupPath := createProcFile(t, "cgroup", "0::/system.slice/dev-disk-by:path.scope")
+
+	pathname, err := getProcessCgroupPathname(t.Context(), procPidCgroupPath)
+
+	require.NoError(t, err)
+	require.Equal(t, "/system.slice/dev-disk-by:path.scope", pathname)
+}
+
+// createProcFile writes lines, one per line, to a file called name inside a
+// per-test temporary directory and returns the path of that file.
+func createProcFile(t *testing.T, name string, lines ...string) string {
+	t.Helper()
+	tmpDir := t.TempDir()
+	pth := filepath.Join(tmpDir, name)
+	file, err := os.OpenFile(pth, os.O_WRONLY|os.O_CREATE, 0o600)
+	require.NoError(t, err)
+	defer func() {
+		err := file.Close()
+		require.NoError(t, err)
+	}()
+	for _, line := range lines {
+		_, err := fmt.Fprintln(file, line)
+		require.NoError(t, err)
+	}
+	return pth
+}
diff --git a/runner/internal/metrics/metrics.go b/runner/internal/metrics/metrics.go
index 0a5c1a639..26acc2cdf 100644
--- a/runner/internal/metrics/metrics.go
+++ b/runner/internal/metrics/metrics.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"os"
 	"os/exec"
+	"path"
 	"strconv"
 	"strings"
 	"time"
@@ -17,33 +18,42 @@ import (
 )
 
 type MetricsCollector struct {
-	cgroupVersion int
-	gpuVendor     common.GpuVendor
+	cgroupMountPoint string
+	gpuVendor        common.GpuVendor
 }
 
-func NewMetricsCollector() (*MetricsCollector, error) {
-	cgroupVersion, err := getCgroupVersion()
+func NewMetricsCollector(ctx context.Context) (*MetricsCollector, error) {
+	// It's unlikely that cgroup mount point will change during container lifetime,
+	// so we detect it only once and reuse.
+	cgroupMountPoint, err := getProcessCgroupMountPoint(ctx, "/proc/self/mounts")
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("get cgroup mount point: %w", err)
 	}
 	gpuVendor := common.GetGpuVendor()
 	return &MetricsCollector{
-		cgroupVersion: cgroupVersion,
-		gpuVendor:     gpuVendor,
+		cgroupMountPoint: cgroupMountPoint,
+		gpuVendor:        gpuVendor,
 	}, nil
 }
 
 func (s *MetricsCollector) GetSystemMetrics(ctx context.Context) (*schemas.SystemMetrics, error) {
+	// It's possible to move a process from one control group to another (it's unlikely, but nonetheless),
+	// so we detect the current group each time.
+	cgroupPathname, err := getProcessCgroupPathname(ctx, "/proc/self/cgroup")
+	if err != nil {
+		return nil, fmt.Errorf("get cgroup pathname: %w", err)
+	}
+	cgroupPath := path.Join(s.cgroupMountPoint, cgroupPathname)
 	timestamp := time.Now()
-	cpuUsage, err := s.GetCPUUsageMicroseconds()
+	cpuUsage, err := s.GetCPUUsageMicroseconds(cgroupPath)
 	if err != nil {
 		return nil, err
 	}
-	memoryUsage, err := s.GetMemoryUsageBytes()
+	memoryUsage, err := s.GetMemoryUsageBytes(cgroupPath)
 	if err != nil {
 		return nil, err
 	}
-	memoryCache, err := s.GetMemoryCacheBytes()
+	memoryCache, err := s.GetMemoryCacheBytes(cgroupPath)
 	if err != nil {
 		return nil, err
 	}
@@ -61,28 +71,14 @@ func (s *MetricsCollector) GetSystemMetrics(ctx context.Context) (*schemas.Syste
 	}, nil
 }
 
-func (s *MetricsCollector) GetCPUUsageMicroseconds() (uint64, error) {
-	cgroupCPUUsagePath := "/sys/fs/cgroup/cpu.stat"
-	if s.cgroupVersion == 1 {
-		cgroupCPUUsagePath = "/sys/fs/cgroup/cpuacct/cpuacct.usage"
-	}
+func (s *MetricsCollector) GetCPUUsageMicroseconds(cgroupPath string) (uint64, error) {
+	cgroupCPUUsagePath := path.Join(cgroupPath, "cpu.stat")
 
 	data, err := os.ReadFile(cgroupCPUUsagePath)
 	if err != nil {
 		return 0, fmt.Errorf("could not read CPU usage: %w", err)
 	}
 
-	if s.cgroupVersion == 1 {
-		// cgroup v1 provides usage in nanoseconds
-		usageStr := strings.TrimSpace(string(data))
-		cpuUsage, err := strconv.ParseUint(usageStr, 10, 64)
-		if err != nil {
-			return 0, fmt.Errorf("could not parse CPU usage: %w", err)
-		}
-		// convert nanoseconds to microseconds
-		return cpuUsage / 1000, nil
-	}
-	// cgroup v2, we need to extract usage_usec from cpu.stat
 	lines := strings.Split(string(data), "\n")
 	for _, line := range lines {
 		if strings.HasPrefix(line, "usage_usec") {
@@ -100,11 +96,8 @@ func (s *MetricsCollector) GetCPUUsageMicroseconds() (uint64, error) {
 	return 0, fmt.Errorf("usage_usec not found in cpu.stat")
 }
 
-func (s *MetricsCollector) GetMemoryUsageBytes() (uint64, error) {
-	cgroupMemoryUsagePath := "/sys/fs/cgroup/memory.current"
-	if s.cgroupVersion == 1 {
-		cgroupMemoryUsagePath = "/sys/fs/cgroup/memory/memory.usage_in_bytes"
-	}
+func (s *MetricsCollector) GetMemoryUsageBytes(cgroupPath string) (uint64, error) {
+	cgroupMemoryUsagePath := path.Join(cgroupPath, "memory.current")
 
 	data, err := os.ReadFile(cgroupMemoryUsagePath)
 	if err != nil {
@@ -119,11 +112,8 @@ func (s *MetricsCollector) GetMemoryUsageBytes() (uint64, error) {
 	return usedMemory, nil
 }
 
-func (s *MetricsCollector) GetMemoryCacheBytes() (uint64, error) {
-	cgroupMemoryStatPath := "/sys/fs/cgroup/memory.stat"
-	if s.cgroupVersion == 1 {
-		cgroupMemoryStatPath = "/sys/fs/cgroup/memory/memory.stat"
-	}
+func (s *MetricsCollector) GetMemoryCacheBytes(cgroupPath string) (uint64, error) {
+	cgroupMemoryStatPath := path.Join(cgroupPath, "memory.stat")
 
 	statData, err := os.ReadFile(cgroupMemoryStatPath)
 	if err != nil {
@@ -132,8 +122,7 @@ func (s *MetricsCollector) GetMemoryCacheBytes() (uint64, error) {
 
 	lines := strings.Split(string(statData), "\n")
 	for _, line := range lines {
-		if (s.cgroupVersion == 1 && strings.HasPrefix(line, "total_inactive_file")) ||
-			(s.cgroupVersion == 2 && strings.HasPrefix(line, "inactive_file")) {
+		if strings.HasPrefix(line, "inactive_file") {
 			parts := strings.Fields(line)
 			if len(parts) != 2 {
 				return 0, fmt.Errorf("unexpected format in memory.stat")
@@ -255,23 +244,6 @@ func (s *MetricsCollector) GetIntelAcceleratorMetrics(ctx context.Context) ([]sc
 	return parseNVIDIASMILikeMetrics(out.String())
 }
 
-func getCgroupVersion() (int, error) {
-	data, err := os.ReadFile("/proc/self/mountinfo")
-	if err != nil {
-		return 0, fmt.Errorf("could not read /proc/self/mountinfo: %w", err)
-	}
-
-	for _, line := range strings.Split(string(data), "\n") {
-		if strings.Contains(line, "cgroup2") {
-			return 2, nil
-		} else if strings.Contains(line, "cgroup") {
-			return 1, nil
-		}
-	}
-
-	return 0, fmt.Errorf("could not determine cgroup version")
-}
-
 func parseNVIDIASMILikeMetrics(output string) ([]schemas.GPUMetrics, error) {
 	metrics := []schemas.GPUMetrics{}
 
diff --git a/runner/internal/metrics/metrics_test.go b/runner/internal/metrics/metrics_test.go
index d547e2e33..152f31c1b 100644
--- a/runner/internal/metrics/metrics_test.go
+++ b/runner/internal/metrics/metrics_test.go
@@ -12,7 +12,7 @@ func TestGetAMDGPUMetrics_OK(t *testing.T) {
 	if runtime.GOOS == "darwin" {
 		t.Skip("Skipping on macOS")
 	}
-	collector, err := NewMetricsCollector()
+	collector, err := NewMetricsCollector(t.Context())
 	assert.NoError(t, err)
 
 	cases := []struct {
@@ -46,7 +46,7 @@ func TestGetAMDGPUMetrics_ErrorGPUUtilNA(t *testing.T) {
 	if runtime.GOOS == "darwin" {
 		t.Skip("Skipping on macOS")
 	}
-	collector, err := NewMetricsCollector()
+	collector, err := NewMetricsCollector(t.Context())
 	assert.NoError(t, err)
 	metrics, err := collector.getAMDGPUMetrics("gpu,gfx,gfx_clock,vram_used,vram_total\n0,N/A,N/A,283,196300\n")
 	assert.ErrorContains(t, err, "GPU utilization is N/A")
diff --git a/runner/internal/runner/api/http.go b/runner/internal/runner/api/http.go
index ac13b5e5b..bbf416efb 100644
--- a/runner/internal/runner/api/http.go
+++ b/runner/internal/runner/api/http.go
@@ -16,7 +16,6 @@ import (
 	"github.com/dstackai/dstack/runner/internal/api"
 	"github.com/dstackai/dstack/runner/internal/executor"
 	"github.com/dstackai/dstack/runner/internal/log"
-	"github.com/dstackai/dstack/runner/internal/metrics"
 	"github.com/dstackai/dstack/runner/internal/schemas"
 )
 
@@ -28,11 +27,10 @@ func (s *Server) healthcheckGetHandler(w http.ResponseWriter, r *http.Request) (
 }
 
 func (s *Server) metricsGetHandler(w http.ResponseWriter, r *http.Request) (interface{}, error) {
-	metricsCollector, err := metrics.NewMetricsCollector()
-	if err != nil {
-		return nil, &api.Error{Status: http.StatusInternalServerError, Err: err}
+	if s.metricsCollector == nil {
+		return nil, &api.Error{Status: http.StatusNotFound, Msg: "Metrics collector is not available"}
 	}
-	metrics, err := metricsCollector.GetSystemMetrics(r.Context())
+	metrics, err := s.metricsCollector.GetSystemMetrics(r.Context())
 	if err != nil {
 		return nil, &api.Error{Status: http.StatusInternalServerError, Err: err}
 	}
diff --git a/runner/internal/runner/api/server.go b/runner/internal/runner/api/server.go
index be573cc66..9d98315b1 100644
--- a/runner/internal/runner/api/server.go
+++ b/runner/internal/runner/api/server.go
@@ -12,6 +12,7 @@ import (
 	"github.com/dstackai/dstack/runner/internal/api"
 	"github.com/dstackai/dstack/runner/internal/executor"
 	"github.com/dstackai/dstack/runner/internal/log"
+	"github.com/dstackai/dstack/runner/internal/metrics"
 )
 
 type Server struct {
@@ -29,15 +30,23 @@ type Server struct {
 	executor  executor.Executor
 	cancelRun context.CancelFunc
 
+	metricsCollector *metrics.MetricsCollector
+
 	version string
 }
 
-func NewServer(tempDir string, homeDir string, address string, sshPort int, version string) (*Server, error) {
+func NewServer(ctx context.Context, tempDir string, homeDir string, address string, sshPort int, version string) (*Server, error) {
 	r := api.NewRouter()
 	ex, err := executor.NewRunExecutor(tempDir, homeDir, sshPort)
 	if err != nil {
 		return nil, err
 	}
+
+	metricsCollector, err := metrics.NewMetricsCollector(ctx)
+	if err != nil {
+		log.Warning(ctx, "Metrics collector is not available", "err", err)
+	}
+
 	s := &Server{
 		srv: &http.Server{
 			Addr: address,
@@ -55,6 +64,8 @@ func NewServer(tempDir string, homeDir string, address string, sshPort int, vers
 
 		executor: ex,
 
+		metricsCollector: metricsCollector,
+
 		version: version,
 	}
 	r.AddHandler("GET", "/api/healthcheck", s.healthcheckGetHandler)
diff --git a/src/dstack/_internal/server/background/tasks/process_metrics.py b/src/dstack/_internal/server/background/tasks/process_metrics.py
index d2197d422..ca2d25fe5 100644
--- a/src/dstack/_internal/server/background/tasks/process_metrics.py
+++ b/src/dstack/_internal/server/background/tasks/process_metrics.py
@@ -140,8 +140,12 @@ async def _collect_job_metrics(job_model: JobModel) -> Optional[JobMetricsPoint]
         return None
 
     if res is None:
-        logger.warning(
-            "Failed to collect job %s metrics. Runner version does not support metrics API.",
+        logger.debug(
+            (
+                "Failed to collect job %s metrics."
+                " Either runner version does not support metrics API"
+                " or metrics collector is not available."
+            ),
             job_model.job_name,
         )
         return None