From bbfb362ce8ea81b27dbf199d9fa51da7678ea7ca Mon Sep 17 00:00:00 2001 From: John William Humphreys Date: Sun, 7 Dec 2025 11:42:39 -0800 Subject: [PATCH 1/4] Adding opts to torchx k8s scheduler to allow overriding cpu/memory overhead and AWS EFA device count. Summary: This diff adds support for new scheduler ops {reserved_millicpu, reserved_memmb, efa_device_count}. This lets you override the overhead CPU/memory/efa-devices for established resources. The first two are important as in kubernetes, the amount of schedulable room on a node is determined by its size *and* the other things running on it. Different clusters will have different security, logging, etc tools meaning that they will have different practically schedulable space per node. If torchx assumes only 100m (1/10th) of a node is required for headroom, clusters with more daemon sets for security/logging/etc (i.e. most enterprise clusters) will not be able to schedule its pods. For EFA specifically, clusters need to be built to support it and it can be off or on. So, it is good to assume a cluster uses it if a node supports it, but we should be able to fix it. Differential Revision: D88564180 --- torchx/schedulers/kubernetes_scheduler.py | 79 +++++++++++++++- .../test/kubernetes_scheduler_test.py | 92 ++++++++++++++++--- 2 files changed, 151 insertions(+), 20 deletions(-) diff --git a/torchx/schedulers/kubernetes_scheduler.py b/torchx/schedulers/kubernetes_scheduler.py index 2fb77f3bf..c017bdf5b 100644 --- a/torchx/schedulers/kubernetes_scheduler.py +++ b/torchx/schedulers/kubernetes_scheduler.py @@ -294,7 +294,14 @@ def sanitize_for_serialization(obj: object) -> object: return api.sanitize_for_serialization(obj) -def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod": +def role_to_pod( + name: str, + role: Role, + service_account: Optional[str], + reserved_millicpu: int = RESERVED_MILLICPU, + reserved_memmb: int = RESERVED_MEMMB, + efa_device_count: Optional[int] = None, +) -> "V1Pod": from kubernetes.client.models import ( # noqa: F811 redefinition of unused V1Container, V1ContainerPort, @@ -324,18 +331,29 @@ def role_to_pod(name: str, role: Role, service_account: Optional[str]) -> "V1Pod if resource.cpu > 0: mcpu = int(resource.cpu * 1000) limits["cpu"] = f"{mcpu}m" - request_mcpu = max(mcpu - RESERVED_MILLICPU, 0) + request_mcpu = max(mcpu - reserved_millicpu, 0) requests["cpu"] = f"{request_mcpu}m" if resource.memMB > 0: limits["memory"] = f"{int(resource.memMB)}M" - request_memMB = max(int(resource.memMB) - RESERVED_MEMMB, 0) + request_memMB = max(int(resource.memMB) - reserved_memmb, 0) requests["memory"] = f"{request_memMB}M" if resource.gpu > 0: requests["nvidia.com/gpu"] = limits["nvidia.com/gpu"] = str(resource.gpu) + EFA_DEVICE = "vpc.amazonaws.com/efa" for device_name, device_limit in resource.devices.items(): limits[device_name] = str(device_limit) + # Handle EFA device count override: + # - None (default): use whatever count is in the resource spec (already added above) + # - 0: remove EFA devices entirely + # - N > 0: set EFA device count to N (override or add) + if efa_device_count is not None: + if efa_device_count == 0: + limits.pop(EFA_DEVICE, None) + else: + limits[EFA_DEVICE] = str(efa_device_count) + resources = V1ResourceRequirements( limits=limits, requests=requests, @@ -475,6 +493,9 @@ def app_to_resource( queue: str, service_account: Optional[str], priority_class: Optional[str] = None, + reserved_millicpu: int = RESERVED_MILLICPU, + reserved_memmb: int = RESERVED_MEMMB, + efa_device_count: Optional[int] = None, ) -> Dict[str, Any]: """ app_to_resource creates a volcano job kubernetes resource definition from @@ -507,7 +528,14 @@ def app_to_resource( replica_role.env["TORCHX_RANK0_HOST"] = "localhost" replica_role.env["TORCHX_IMAGE"] = replica_role.image - pod = role_to_pod(name, replica_role, service_account) + pod = role_to_pod( + name, + replica_role, + service_account, + reserved_millicpu, + reserved_memmb, + efa_device_count, + ) if k8s_metadata := role.metadata.get("kubernetes"): if isinstance(k8s_metadata, str): import fsspec @@ -589,6 +617,9 @@ class KubernetesOpts(TypedDict, total=False): service_account: Optional[str] priority_class: Optional[str] validate_spec: Optional[bool] + reserved_millicpu: Optional[int] + reserved_memmb: Optional[int] + efa_device_count: Optional[int] class KubernetesScheduler(DockerWorkspaceMixin, Scheduler[KubernetesOpts]): @@ -783,7 +814,26 @@ def _submit_dryrun( priority_class, str ), "priority_class must be a str" - resource = app_to_resource(app, queue, service_account, priority_class) + reserved_millicpu = cfg.get("reserved_millicpu", RESERVED_MILLICPU) + assert isinstance(reserved_millicpu, int), "reserved_millicpu must be an int" + + reserved_memmb = cfg.get("reserved_memmb", RESERVED_MEMMB) + assert isinstance(reserved_memmb, int), "reserved_memmb must be an int" + + efa_device_count = cfg.get("efa_device_count") + assert efa_device_count is None or isinstance( + efa_device_count, int + ), "efa_device_count must be an int or None" + + resource = app_to_resource( + app, + queue, + service_account, + priority_class, + reserved_millicpu, + reserved_memmb, + efa_device_count, + ) if cfg.get("validate_spec"): try: @@ -889,6 +939,25 @@ def _run_opts(self) -> runopts: help="Validate job spec using Kubernetes API dry-run before submission", default=True, ) + opts.add( + "reserved_millicpu", + type_=int, + help="Amount of CPU in millicores to reserve for Kubernetes system overhead (default: 100)", + default=RESERVED_MILLICPU, + ) + opts.add( + "reserved_memmb", + type_=int, + help="Amount of memory in MB to reserve for Kubernetes system overhead (default: 1024)", + default=RESERVED_MEMMB, + ) + opts.add( + "efa_device_count", + type_=int, + help="EFA device count override: None/unset=use resource spec, " + "0=remove EFA, N>0=set EFA count to N", + default=None, + ) return opts def describe(self, app_id: str) -> Optional[DescribeAppResponse]: diff --git a/torchx/schedulers/test/kubernetes_scheduler_test.py b/torchx/schedulers/test/kubernetes_scheduler_test.py index edb643364..06ad5d59b 100644 --- a/torchx/schedulers/test/kubernetes_scheduler_test.py +++ b/torchx/schedulers/test/kubernetes_scheduler_test.py @@ -476,32 +476,91 @@ def test_device_mounts(self) -> None: ) self.assertTrue(pod.spec.containers[0].security_context.privileged) - def test_resource_devices(self) -> None: - scheduler = create_scheduler("test") - - role = specs.Role( + def test_efa_device_override(self) -> None: + """Test EFA device count can be overridden via efa_device_count parameter.""" + role_with_efa = specs.Role( name="foo", image="", resource=specs.Resource( cpu=2, memMB=3000, gpu=4, - devices={ - "vpc.amazonaws.com/efa": 4, - }, + devices={"vpc.amazonaws.com/efa": 4}, ), ) + role_without_efa = specs.Role( + name="foo", + image="", + resource=specs.Resource(cpu=2, memMB=3000, gpu=4), + ) + + # Default: use resource spec's EFA count (or no EFA if not in spec) + pod = role_to_pod("foo", role_with_efa, service_account="") + self.assertEqual( + pod.spec.containers[0].resources.limits["vpc.amazonaws.com/efa"], "4" + ) + + pod = role_to_pod("foo", role_without_efa, service_account="") + self.assertNotIn( + "vpc.amazonaws.com/efa", pod.spec.containers[0].resources.limits + ) + + # Override to 0: remove EFA entirely + pod = role_to_pod("foo", role_with_efa, service_account="", efa_device_count=0) + self.assertNotIn( + "vpc.amazonaws.com/efa", pod.spec.containers[0].resources.limits + ) + + # Override to different count: use override value + pod = role_to_pod("foo", role_with_efa, service_account="", efa_device_count=8) + self.assertEqual( + pod.spec.containers[0].resources.limits["vpc.amazonaws.com/efa"], "8" + ) + + # Add EFA when not in resource spec + pod = role_to_pod( + "foo", role_without_efa, service_account="", efa_device_count=32 + ) + self.assertEqual( + pod.spec.containers[0].resources.limits["vpc.amazonaws.com/efa"], "32" + ) + + def test_reserved_resources_override(self) -> None: + """Test that reserved_millicpu and reserved_memmb overrides work correctly.""" + role = specs.Role( + name="foo", + image="", + resource=specs.Resource(cpu=2, gpu=0, memMB=3000), + ) + + # Default: 100 millicpu and 1024 memmb reserved pod = role_to_pod("foo", role, service_account="") + self.assertEqual(pod.spec.containers[0].resources.limits["cpu"], "2000m") + self.assertEqual(pod.spec.containers[0].resources.limits["memory"], "3000M") self.assertEqual( - pod.spec.containers[0].resources.limits, - { - "cpu": "2000m", - "memory": "3000M", - "nvidia.com/gpu": "4", - "vpc.amazonaws.com/efa": "4", - }, + pod.spec.containers[0].resources.requests["cpu"], "1900m" + ) # 2000 - 100 + self.assertEqual( + pod.spec.containers[0].resources.requests["memory"], "1976M" + ) # 3000 - 1024 + + # Custom overrides for both CPU and memory + pod = role_to_pod( + "foo", role, service_account="", reserved_millicpu=300, reserved_memmb=1000 + ) + self.assertEqual( + pod.spec.containers[0].resources.requests["cpu"], "1700m" + ) # 2000 - 300 + self.assertEqual( + pod.spec.containers[0].resources.requests["memory"], "2000M" + ) # 3000 - 1000 + + # Zero reserved: requests equal limits + pod = role_to_pod( + "foo", role, service_account="", reserved_millicpu=0, reserved_memmb=0 ) - self.assertFalse(pod.spec.containers[0].security_context.privileged) + self.assertEqual(pod.spec.containers[0].resources.requests["cpu"], "2000m") + self.assertEqual(pod.spec.containers[0].resources.requests["memory"], "3000M") def test_instance_type(self) -> None: scheduler = create_scheduler("test") @@ -797,6 +856,9 @@ def test_runopts(self) -> None: "service_account", "priority_class", "validate_spec", + "reserved_millicpu", + "reserved_memmb", + "efa_device_count", }, ) From 3be7d890c7da7edde0c29812386d5688a65b74ab Mon Sep 17 00:00:00 2001 From: John William Humphreys Date: Sun, 7 Dec 2025 11:42:39 -0800 Subject: [PATCH 2/4] Populating the host_name field with cluster-local-ips. Summary: In the current implementation, the hostname is set to empty-string in the k8s scheduler's describe function. This diff uses the k8s client to map the pod-name to its ip, and generate the pop cluster-dns ..cluster.local and return it. If it fails, it makes a best attempt at a fallback. Note: The fallback does not match prior functionality; but diff #4 in this stack makes it fall back to empty-string and adds tests to cover this diff. Differential Revision: D88567449 --- torchx/schedulers/kubernetes_scheduler.py | 34 ++++++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/torchx/schedulers/kubernetes_scheduler.py b/torchx/schedulers/kubernetes_scheduler.py index c017bdf5b..dfe96b171 100644 --- a/torchx/schedulers/kubernetes_scheduler.py +++ b/torchx/schedulers/kubernetes_scheduler.py @@ -149,7 +149,6 @@ from torchx.util.strings import normalize_str from torchx.workspace.docker_workspace import DockerWorkspaceMixin - if TYPE_CHECKING: from docker import DockerClient from kubernetes.client import ApiClient, CustomObjectsApi @@ -159,6 +158,7 @@ ) from kubernetes.client.rest import ApiException + logger: logging.Logger = logging.getLogger(__name__) # Kubernetes reserves a small amount of resources per host for the system. For @@ -961,6 +961,7 @@ def _run_opts(self) -> runopts: return opts def describe(self, app_id: str) -> Optional[DescribeAppResponse]: + from kubernetes import client from kubernetes.client.rest import ApiException namespace, name = app_id.split(":") @@ -986,8 +987,8 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]: TASK_STATUS_COUNT = "taskStatusCount" if TASK_STATUS_COUNT in status: - for name, status in status[TASK_STATUS_COUNT].items(): - role, _, idx = name.rpartition("-") + for task_name, status in status[TASK_STATUS_COUNT].items(): + role, _, idx = task_name.rpartition("-") state_str = next(iter(status["phase"].keys())) state = TASK_STATE[state_str] @@ -996,8 +997,33 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]: roles[role] = Role(name=role, num_replicas=0, image="") roles_statuses[role] = RoleStatus(role, []) roles[role].num_replicas += 1 + + # Pod name follows the pattern: {job_name}-{task_name}-0 + # Get the pod to retrieve its IP address + pod_name_k8s = f"{name}-{task_name}-0" + try: + core_api = client.CoreV1Api(self._api_client()) + pod = core_api.read_namespaced_pod( + name=pod_name_k8s, namespace=namespace + ) + pod_ip = pod.status.pod_ip + + # Convert IP to dashed format (e.g., 10.244.1.5 -> 10-244-1-5) + pod_ip_dashed = pod_ip.replace(".", "-") + + # Kubernetes DNS = ..pod.cluster.local + # Note: This will only be useful if the client using the IPs in in the cluster. + hostname = f"{pod_ip_dashed}.{namespace}.pod.cluster.local" + + except ApiException: + # Fallback to old behavior if pod not found + normalized_task_name = normalize_str(pod_name_k8s) + hostname = f"{normalized_task_name}.pod.cluster.local" + roles_statuses[role].replicas.append( - ReplicaStatus(id=int(idx), role=role, state=state, hostname="") + ReplicaStatus( + id=int(idx), role=role, state=state, hostname=hostname + ) ) else: app_state = AppState.UNKNOWN From 7c9336d6b959fb75f6a45df33f66d8bd59dbdac6 Mon Sep 17 00:00:00 2001 From: John William Humphreys Date: Sun, 7 Dec 2025 11:42:39 -0800 Subject: [PATCH 3/4] Feature to add cluster-auth as an option (i.e. k8s service account support). Summary: This diff adds cluster-auth as an option in the k8s scheduler by first checking if it works, and then falling back to the old auth scheme. So, if we are in pod in a k8s cluster and the pod has a k8s service-account, it will use that. If it does not, it will try to fallback to a kubeconfig. Differential Revision: D88568003 --- torchx/schedulers/kubernetes_scheduler.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/torchx/schedulers/kubernetes_scheduler.py b/torchx/schedulers/kubernetes_scheduler.py index dfe96b171..2fa03ec82 100644 --- a/torchx/schedulers/kubernetes_scheduler.py +++ b/torchx/schedulers/kubernetes_scheduler.py @@ -738,9 +738,14 @@ def _api_client(self) -> "ApiClient": if c is None: configuration = client.Configuration() try: - config.load_kube_config(client_configuration=configuration) - except config.ConfigException as e: - warnings.warn(f"failed to load kube config: {e}") + # Try in-cluster config first (for pods with ServiceAccount) + config.load_incluster_config(client_configuration=configuration) + except config.ConfigException: + # Fall back to kubeconfig (for local development) + try: + config.load_kube_config(client_configuration=configuration) + except config.ConfigException as e: + warnings.warn(f"failed to load kube config: {e}") c = self._client = client.ApiClient(configuration) From 421fc35c6981099880ce961e074ff056c01d92dc Mon Sep 17 00:00:00 2001 From: John William Humphreys Date: Sun, 7 Dec 2025 11:42:39 -0800 Subject: [PATCH 4/4] Changing default behavior of getting hostname in describe to match previous behavior and adding tests. Summary: This diff amends diff #2 in this stack. It makes the default/fallback behavior match what it was before this stack (fall back to no-hostname/empty-string). It also adds tests for the working and fallback behavior. Differential Revision: D88570069 --- torchx/schedulers/kubernetes_scheduler.py | 3 +- .../test/kubernetes_scheduler_test.py | 66 ++++++++++++++++++- 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/torchx/schedulers/kubernetes_scheduler.py b/torchx/schedulers/kubernetes_scheduler.py index 2fa03ec82..7c97f37bb 100644 --- a/torchx/schedulers/kubernetes_scheduler.py +++ b/torchx/schedulers/kubernetes_scheduler.py @@ -1022,8 +1022,7 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]: except ApiException: # Fallback to old behavior if pod not found - normalized_task_name = normalize_str(pod_name_k8s) - hostname = f"{normalized_task_name}.pod.cluster.local" + hostname = "" roles_statuses[role].replicas.append( ReplicaStatus( diff --git a/torchx/schedulers/test/kubernetes_scheduler_test.py b/torchx/schedulers/test/kubernetes_scheduler_test.py index 06ad5d59b..65b710052 100644 --- a/torchx/schedulers/test/kubernetes_scheduler_test.py +++ b/torchx/schedulers/test/kubernetes_scheduler_test.py @@ -741,8 +741,15 @@ def test_submit_job_name_conflict( second_call_kwargs = create_namespaced_custom_object.call_args_list[1][1] self.assertNotIn("dry_run", second_call_kwargs) + @patch("kubernetes.client.CoreV1Api.read_namespaced_pod") @patch("kubernetes.client.CustomObjectsApi.get_namespaced_custom_object_status") - def test_describe(self, get_namespaced_custom_object_status: MagicMock) -> None: + def test_describe( + self, + get_namespaced_custom_object_status: MagicMock, + read_namespaced_pod: MagicMock, + ) -> None: + from kubernetes.client.rest import ApiException + get_namespaced_custom_object_status.return_value = { "status": { "state": {"phase": "Completed"}, @@ -750,6 +757,8 @@ def test_describe(self, get_namespaced_custom_object_status: MagicMock) -> None: "taskStatusCount": {"echo-0": {"phase": {"Succeeded": 1}}}, } } + # Simulate pod not found to trigger fallback to empty hostname + read_namespaced_pod.side_effect = ApiException(status=404, reason="Not Found") app_id = "testnamespace:testid" scheduler = create_scheduler("test") info = scheduler.describe(app_id) @@ -789,6 +798,61 @@ def test_describe(self, get_namespaced_custom_object_status: MagicMock) -> None: ), ) + @patch("kubernetes.client.CoreV1Api.read_namespaced_pod") + @patch("kubernetes.client.CustomObjectsApi.get_namespaced_custom_object_status") + def test_describe_with_pod_ip( + self, + get_namespaced_custom_object_status: MagicMock, + read_namespaced_pod: MagicMock, + ) -> None: + get_namespaced_custom_object_status.return_value = { + "status": { + "state": {"phase": "Running"}, + "running": 1, + "taskStatusCount": {"worker-0": {"phase": {"Running": 1}}}, + } + } + # Mock a pod with a valid IP address + mock_pod = MagicMock() + mock_pod.status.pod_ip = "10.244.1.5" + read_namespaced_pod.return_value = mock_pod + + app_id = "testnamespace:testid" + scheduler = create_scheduler("test") + info = scheduler.describe(app_id) + + # Verify the pod API was called with correct parameters + read_namespaced_pod.assert_called_once_with( + name="testid-worker-0-0", namespace="testnamespace" + ) + + # Verify hostname follows the spec: {pod-ip-dashed}.{namespace}.pod.cluster.local + # IP 10.244.1.5 should become 10-244-1-5 + expected_hostname = "10-244-1-5.testnamespace.pod.cluster.local" + self.assertEqual( + info, + DescribeAppResponse( + app_id=app_id, + state=specs.AppState.RUNNING, + roles_statuses=[ + specs.RoleStatus( + "worker", + [ + specs.ReplicaStatus( + id=0, + role="worker", + state=specs.AppState.RUNNING, + hostname=expected_hostname, + ) + ], + ), + ], + roles=[ + specs.Role(name="worker", image="", num_replicas=1), + ], + ), + ) + @patch("kubernetes.client.CustomObjectsApi.get_namespaced_custom_object_status") def test_describe_unknown( self, get_namespaced_custom_object_status: MagicMock