From da9317a65435b27cc37063f71e93718db7924539 Mon Sep 17 00:00:00 2001 From: peterschmidt85 Date: Mon, 13 Oct 2025 16:15:57 -0700 Subject: [PATCH 01/11] [Internal]: Replace Instance.termination_reason values with codes #3182 --- src/dstack/_internal/core/models/instances.py | 13 +++++++ src/dstack/_internal/core/models/runs.py | 2 + .../background/tasks/process_instances.py | 38 ++++++++++++------- .../background/tasks/process_running_jobs.py | 11 +++++- ...504_instance_termination_reason_message.py | 34 +++++++++++++++++ src/dstack/_internal/server/models.py | 1 + .../_internal/server/services/instances.py | 1 + .../tasks/test_process_instances.py | 15 ++++---- 8 files changed, 93 insertions(+), 22 deletions(-) create mode 100644 src/dstack/_internal/server/migrations/versions/a16a05249504_instance_termination_reason_message.py diff --git a/src/dstack/_internal/core/models/instances.py b/src/dstack/_internal/core/models/instances.py index f1f802d54..ef4fbc17b 100644 --- a/src/dstack/_internal/core/models/instances.py +++ b/src/dstack/_internal/core/models/instances.py @@ -216,6 +216,18 @@ def finished_statuses(cls) -> List["InstanceStatus"]: return [cls.TERMINATING, cls.TERMINATED] +class InstanceTerminationReason(str, Enum): + IDLE_TIMEOUT = "idle_timeout" + PROOVISIONING_TIMEOUT = "provisioning_timeout" + ERROR = "error" + JOB_FINISHED = "job_finished" + TERMINATION_TIMEOUT = "termination_timeout" + STARTING_TIMEOUT = "starting_timeout" + NO_OFFERS = "no_offers" + MASTER_FAILED = "master_failed" + NO_BALANCE = "no_balance" + + class Instance(CoreModel): id: UUID project_name: str @@ -231,6 +243,7 @@ class Instance(CoreModel): unreachable: bool = False health_status: HealthStatus = HealthStatus.HEALTHY termination_reason: Optional[str] = None + termination_reason_message: Optional[str] = None created: datetime.datetime region: Optional[str] = None availability_zone: Optional[str] = None diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 969b336b9..dce8ef8e0 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -138,6 +138,7 @@ class JobTerminationReason(str, Enum): TERMINATED_BY_SERVER = "terminated_by_server" INACTIVITY_DURATION_EXCEEDED = "inactivity_duration_exceeded" TERMINATED_DUE_TO_UTILIZATION_POLICY = "terminated_due_to_utilization_policy" + NO_BALANCE = "no_balance" # Set by the runner CONTAINER_EXITED_WITH_ERROR = "container_exited_with_error" PORTS_BINDING_FAILED = "ports_binding_failed" @@ -161,6 +162,7 @@ def to_status(self) -> JobStatus: self.TERMINATED_BY_SERVER: JobStatus.TERMINATED, self.INACTIVITY_DURATION_EXCEEDED: JobStatus.TERMINATED, self.TERMINATED_DUE_TO_UTILIZATION_POLICY: JobStatus.TERMINATED, + self.NO_BALANCE: JobStatus.TERMINATED, self.CONTAINER_EXITED_WITH_ERROR: JobStatus.FAILED, self.PORTS_BINDING_FAILED: JobStatus.FAILED, self.CREATING_CONTAINER_ERROR: JobStatus.FAILED, diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index ec7ca8f7e..c08b5f0b7 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -45,6 +45,7 @@ InstanceOfferWithAvailability, InstanceRuntime, InstanceStatus, + InstanceTerminationReason, RemoteConnectionInfo, SSHKey, ) @@ -240,7 +241,7 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel delta = datetime.timedelta(seconds=idle_seconds) if idle_duration > delta: instance.status = InstanceStatus.TERMINATING - instance.termination_reason = "Idle timeout" + instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT.value logger.info( "Instance %s idle duration expired: idle time %ss. Terminating", instance.name, @@ -262,7 +263,7 @@ async def _add_remote(instance: InstanceModel) -> None: retry_duration_deadline = instance.created_at + timedelta(seconds=PROVISIONING_TIMEOUT_SECONDS) if retry_duration_deadline < get_current_datetime(): instance.status = InstanceStatus.TERMINATED - instance.termination_reason = "Provisioning timeout expired" + instance.termination_reason = InstanceTerminationReason.PROOVISIONING_TIMEOUT.value logger.warning( "Failed to start instance %s in %d seconds. Terminating...", instance.name, @@ -285,7 +286,8 @@ async def _add_remote(instance: InstanceModel) -> None: ssh_proxy_pkeys = None except (ValueError, PasswordRequiredException): instance.status = InstanceStatus.TERMINATED - instance.termination_reason = "Unsupported private SSH key type" + instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason_message = "Unsupported private SSH key type" logger.warning( "Failed to add instance %s: unsupported private SSH key type", instance.name, @@ -343,7 +345,10 @@ async def _add_remote(instance: InstanceModel) -> None: ) if instance_network is not None and internal_ip is None: instance.status = InstanceStatus.TERMINATED - instance.termination_reason = "Failed to locate internal IP address on the given network" + instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason_message = ( + "Failed to locate internal IP address on the given network" + ) logger.warning( "Failed to add instance %s: failed to locate internal IP address on the given network", instance.name, @@ -356,7 +361,8 @@ async def _add_remote(instance: InstanceModel) -> None: if internal_ip is not None: if not is_ip_among_addresses(ip_address=internal_ip, addresses=host_network_addresses): instance.status = InstanceStatus.TERMINATED - instance.termination_reason = ( + instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason_message = ( "Specified internal IP not found among instance interfaces" ) logger.warning( @@ -378,7 +384,8 @@ async def _add_remote(instance: InstanceModel) -> None: instance.total_blocks = blocks else: instance.status = InstanceStatus.TERMINATED - instance.termination_reason = "Cannot split into blocks" + instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason_message = "Cannot split into blocks" logger.warning( "Failed to add instance %s: cannot split into blocks", instance.name, @@ -497,7 +504,8 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No requirements = get_instance_requirements(instance) except ValidationError as e: instance.status = InstanceStatus.TERMINATED - instance.termination_reason = ( + instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason_message = ( f"Error to parse profile, requirements or instance_configuration: {e}" ) logger.warning( @@ -645,7 +653,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No ) return - _mark_terminated(instance, "All offers failed" if offers else "No offers found") + _mark_terminated(instance, InstanceTerminationReason.NO_OFFERS.value) if ( instance.fleet and _is_fleet_master_instance(instance) @@ -656,7 +664,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No for sibling_instance in instance.fleet.instances: if sibling_instance.id == instance.id: continue - _mark_terminated(sibling_instance, "Master instance failed to start") + _mark_terminated(sibling_instance, InstanceTerminationReason.MASTER_FAILED.value) def _mark_terminated(instance: InstanceModel, termination_reason: str) -> None: @@ -681,7 +689,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non ): # A busy instance could have no active jobs due to this bug: https://github.com/dstackai/dstack/issues/2068 instance.status = InstanceStatus.TERMINATING - instance.termination_reason = "Instance job finished" + instance.termination_reason = InstanceTerminationReason.JOB_FINISHED.value logger.info( "Detected busy instance %s with finished job. Marked as TERMINATING", instance.name, @@ -810,7 +818,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non deadline = instance.termination_deadline if get_current_datetime() > deadline: instance.status = InstanceStatus.TERMINATING - instance.termination_reason = "Termination deadline" + instance.termination_reason = InstanceTerminationReason.TERMINATION_TIMEOUT.value logger.warning( "Instance %s shim waiting timeout. Marked as TERMINATING", instance.name, @@ -839,7 +847,7 @@ async def _wait_for_instance_provisioning_data( "Instance %s failed because instance has not become running in time", instance.name ) instance.status = InstanceStatus.TERMINATING - instance.termination_reason = "Instance has not become running in time" + instance.termination_reason = InstanceTerminationReason.STARTING_TIMEOUT.value return backend = await backends_services.get_project_backend_by_type( @@ -852,7 +860,8 @@ async def _wait_for_instance_provisioning_data( instance.name, ) instance.status = InstanceStatus.TERMINATING - instance.termination_reason = "Backend not available" + instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason_message = "Backend not available" return try: await run_async( @@ -869,7 +878,8 @@ async def _wait_for_instance_provisioning_data( repr(e), ) instance.status = InstanceStatus.TERMINATING - instance.termination_reason = "Error while waiting for instance to become running" + instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason_message = "Error while waiting for instance to become running" except Exception: logger.exception( "Got exception when updating instance %s provisioning data", instance.name diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index aa69f2f33..0d2a183de 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -18,6 +18,7 @@ from dstack._internal.core.models.files import FileArchiveMapping from dstack._internal.core.models.instances import ( InstanceStatus, + InstanceTerminationReason, RemoteConnectionInfo, SSHConnectionParams, ) @@ -372,6 +373,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): job_model.status = JobStatus.TERMINATING # job will be terminated and instance will be emptied by process_terminating_jobs else: + # job_model.instance.termination_reason # No job_model.termination_reason set means ssh connection failed if job_model.disconnected_at is None: job_model.disconnected_at = common_utils.get_current_datetime() @@ -383,7 +385,14 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): ) # TODO: Replace with JobTerminationReason.INSTANCE_UNREACHABLE in 0.20 or # when CLI <= 0.19.8 is no longer supported - job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY + if ( + job_model.instance is not None + and job_model.instance.termination_reason + == InstanceTerminationReason.NO_BALANCE.value + ): + job_model.termination_reason = JobTerminationReason.NO_BALANCE + else: + job_model.termination_reason = JobTerminationReason.INSTANCE_UNREACHABLE job_model.status = JobStatus.TERMINATING else: logger.warning( diff --git a/src/dstack/_internal/server/migrations/versions/a16a05249504_instance_termination_reason_message.py b/src/dstack/_internal/server/migrations/versions/a16a05249504_instance_termination_reason_message.py new file mode 100644 index 000000000..7b5cfae47 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/a16a05249504_instance_termination_reason_message.py @@ -0,0 +1,34 @@ +"""instance.termination_reason_message + +Revision ID: a16a05249504 +Revises: 2498ab323443 +Create Date: 2025-10-13 15:29:56.691164 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "a16a05249504" +down_revision = "2498ab323443" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.add_column( + sa.Column("termination_reason_message", sa.String(length=4000), nullable=True) + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.drop_column("termination_reason_message") + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index b21ba81a4..f11d8687d 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -615,6 +615,7 @@ class InstanceModel(BaseModel): # instance termination handling termination_deadline: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) termination_reason: Mapped[Optional[str]] = mapped_column(String(4000)) + termination_reason_message: Mapped[Optional[str]] = mapped_column(String(4000)) # Deprecated since 0.19.22, not used health_status: Mapped[Optional[str]] = mapped_column(String(4000)) health: Mapped[HealthStatus] = mapped_column( diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index 7c679b0cc..aba50cfc5 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -122,6 +122,7 @@ def instance_model_to_instance(instance_model: InstanceModel) -> Instance: unreachable=instance_model.unreachable, health_status=instance_model.health, termination_reason=instance_model.termination_reason, + termination_reason_message=instance_model.termination_reason_message, created=instance_model.created_at, total_blocks=instance_model.total_blocks, busy_blocks=instance_model.busy_blocks, diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py index 690cb71d9..26af59fcb 100644 --- a/src/tests/_internal/server/background/tasks/test_process_instances.py +++ b/src/tests/_internal/server/background/tasks/test_process_instances.py @@ -27,6 +27,7 @@ InstanceOffer, InstanceOfferWithAvailability, InstanceStatus, + InstanceTerminationReason, InstanceType, Resources, ) @@ -251,7 +252,7 @@ async def test_check_shim_terminate_instance_by_deadline(self, test_db, session: assert instance is not None assert instance.status == InstanceStatus.TERMINATING assert instance.termination_deadline == termination_deadline_time - assert instance.termination_reason == "Termination deadline" + assert instance.termination_reason == InstanceTerminationReason.TERMINATION_TIMEOUT.value @pytest.mark.asyncio @pytest.mark.parametrize( @@ -510,7 +511,7 @@ async def test_terminate_by_idle_timeout(self, test_db, session: AsyncSession): await session.refresh(instance) assert instance is not None assert instance.status == InstanceStatus.TERMINATING - assert instance.termination_reason == "Idle timeout" + assert instance.termination_reason == InstanceTerminationReason.IDLE_TIMEOUT.value class TestSSHInstanceTerminateProvisionTimeoutExpired: @@ -531,7 +532,7 @@ async def test_terminate_by_idle_timeout(self, test_db, session: AsyncSession): await session.refresh(instance) assert instance.status == InstanceStatus.TERMINATED - assert instance.termination_reason == "Provisioning timeout expired" + assert instance.termination_reason == InstanceTerminationReason.PROOVISIONING_TIMEOUT.value class TestTerminate: @@ -800,7 +801,7 @@ async def test_fails_if_all_offers_fail(self, session: AsyncSession, err: Except await session.refresh(instance) assert instance.status == InstanceStatus.TERMINATED - assert instance.termination_reason == "All offers failed" + assert instance.termination_reason == InstanceTerminationReason.NO_OFFERS.value async def test_fails_if_no_offers(self, session: AsyncSession): project = await create_project(session=session) @@ -813,19 +814,19 @@ async def test_fails_if_no_offers(self, session: AsyncSession): await session.refresh(instance) assert instance.status == InstanceStatus.TERMINATED - assert instance.termination_reason == "No offers found" + assert instance.termination_reason == InstanceTerminationReason.NO_OFFERS.value @pytest.mark.parametrize( ("placement", "expected_termination_reasons"), [ pytest.param( InstanceGroupPlacement.CLUSTER, - {"No offers found": 1, "Master instance failed to start": 3}, + {InstanceTerminationReason.NO_OFFERS.value: 1, InstanceTerminationReason.MASTER_FAILED.value: 3}, id="cluster", ), pytest.param( None, - {"No offers found": 4}, + {InstanceTerminationReason.NO_OFFERS.value: 4}, id="non-cluster", ), ], From acd1f7eecac1eb770e436369fddde82b33cb1c58 Mon Sep 17 00:00:00 2001 From: peterschmidt85 Date: Mon, 13 Oct 2025 16:29:39 -0700 Subject: [PATCH 02/11] [Internal]: Replace Instance.termination_reason values with codes #3182 Linter --- .../server/background/tasks/test_process_instances.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py index 26af59fcb..68dc5dea7 100644 --- a/src/tests/_internal/server/background/tasks/test_process_instances.py +++ b/src/tests/_internal/server/background/tasks/test_process_instances.py @@ -821,7 +821,10 @@ async def test_fails_if_no_offers(self, session: AsyncSession): [ pytest.param( InstanceGroupPlacement.CLUSTER, - {InstanceTerminationReason.NO_OFFERS.value: 1, InstanceTerminationReason.MASTER_FAILED.value: 3}, + { + InstanceTerminationReason.NO_OFFERS.value: 1, + InstanceTerminationReason.MASTER_FAILED.value: 3, + }, id="cluster", ), pytest.param( From 6f3f9c58050c703f61b7b34c84ae93f4e33b9bd3 Mon Sep 17 00:00:00 2001 From: peterschmidt85 Date: Mon, 13 Oct 2025 18:25:46 -0700 Subject: [PATCH 03/11] [Internal]: Replace Instance.termination_reason values with codes #3182 --- src/dstack/_internal/server/services/jobs/__init__.py | 2 ++ src/tests/_internal/core/models/test_runs.py | 1 + .../server/background/tasks/test_process_running_jobs.py | 2 +- src/tests/_internal/server/routers/test_fleets.py | 4 ++++ 4 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index ffea0c72e..e82300513 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -749,6 +749,8 @@ def _get_job_status_message(job_model: JobModel) -> str: return "stopped" elif job_model.termination_reason == JobTerminationReason.ABORTED_BY_USER: return "aborted" + elif job_model.termination_reason == JobTerminationReason.NO_BALANCE: + return "no balance" return job_model.status.value diff --git a/src/tests/_internal/core/models/test_runs.py b/src/tests/_internal/core/models/test_runs.py index 23b27c018..5e847beb4 100644 --- a/src/tests/_internal/core/models/test_runs.py +++ b/src/tests/_internal/core/models/test_runs.py @@ -43,6 +43,7 @@ def test_get_error_returns_expected_messages(): JobTerminationReason.ABORTED_BY_USER, JobTerminationReason.TERMINATED_BY_SERVER, JobTerminationReason.CONTAINER_EXITED_WITH_ERROR, + JobTerminationReason.NO_BALANCE, ] for reason in JobTerminationReason: diff --git a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py index e3cc011be..d13959b48 100644 --- a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py @@ -528,7 +528,7 @@ async def test_pulling_shim_failed(self, test_db, session: AsyncSession): assert SSHTunnelMock.call_count == 3 await session.refresh(job) assert job.status == JobStatus.TERMINATING - assert job.termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY + assert job.termination_reason == JobTerminationReason.INSTANCE_UNREACHABLE assert job.remove_at is None @pytest.mark.asyncio diff --git a/src/tests/_internal/server/routers/test_fleets.py b/src/tests/_internal/server/routers/test_fleets.py index 934f333b6..7b34b20a6 100644 --- a/src/tests/_internal/server/routers/test_fleets.py +++ b/src/tests/_internal/server/routers/test_fleets.py @@ -402,6 +402,7 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async "unreachable": False, "health_status": "healthy", "termination_reason": None, + "termination_reason_message": None, "created": "2023-01-02T03:04:00+00:00", "backend": None, "region": None, @@ -537,6 +538,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A "unreachable": False, "health_status": "healthy", "termination_reason": None, + "termination_reason_message": None, "created": "2023-01-02T03:04:00+00:00", "region": "remote", "availability_zone": None, @@ -708,6 +710,7 @@ async def test_updates_ssh_fleet(self, test_db, session: AsyncSession, client: A "unreachable": False, "health_status": "healthy", "termination_reason": None, + "termination_reason_message": None, "created": "2023-01-02T03:04:00+00:00", "region": "remote", "availability_zone": None, @@ -741,6 +744,7 @@ async def test_updates_ssh_fleet(self, test_db, session: AsyncSession, client: A "unreachable": False, "health_status": "healthy", "termination_reason": None, + "termination_reason_message": None, "created": "2023-01-02T03:04:00+00:00", "region": "remote", "availability_zone": None, From 09aa1a498e3e5ed8713db7e515f24ffe9d56b2a9 Mon Sep 17 00:00:00 2001 From: peterschmidt85 Date: Tue, 14 Oct 2025 07:59:22 -0700 Subject: [PATCH 04/11] [Internal]: Replace Instance.termination_reason values with codes #3182 Review --- .../background/tasks/process_running_jobs.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 0d2a183de..080f8ffd2 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -59,6 +59,7 @@ from dstack._internal.server.services.jobs import ( find_job, get_job_attached_volumes, + get_job_provisioning_data, get_job_runtime_data, job_model_to_job_submission, ) @@ -373,7 +374,6 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): job_model.status = JobStatus.TERMINATING # job will be terminated and instance will be emptied by process_terminating_jobs else: - # job_model.instance.termination_reason # No job_model.termination_reason set means ssh connection failed if job_model.disconnected_at is None: job_model.disconnected_at = common_utils.get_current_datetime() @@ -383,16 +383,22 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): fmt(job_model), job_submission.age, ) - # TODO: Replace with JobTerminationReason.INSTANCE_UNREACHABLE in 0.20 or - # when CLI <= 0.19.8 is no longer supported if ( job_model.instance is not None and job_model.instance.termination_reason == InstanceTerminationReason.NO_BALANCE.value ): + # if instance was terminated due to no balance, set job termination reason accodingly job_model.termination_reason = JobTerminationReason.NO_BALANCE else: - job_model.termination_reason = JobTerminationReason.INSTANCE_UNREACHABLE + job_provisioning_data = get_job_provisioning_data(job_model) + # use JobTerminationReason.INSTANCE_UNREACHABLE for on-demand instances only + job_model.termination_reason = ( + JobTerminationReason.INSTANCE_UNREACHABLE + if job_provisioning_data + and not job_provisioning_data.instance_type.resources.spot + else JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY + ) job_model.status = JobStatus.TERMINATING else: logger.warning( From 9b3ff0236fa107e6026bc22986756a4f745b1791 Mon Sep 17 00:00:00 2001 From: peterschmidt85 Date: Tue, 14 Oct 2025 08:01:59 -0700 Subject: [PATCH 05/11] [Internal]: Replace Instance.termination_reason values with codes #3182 Review --- src/dstack/_internal/server/models.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index f11d8687d..b6537b65b 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -614,6 +614,7 @@ class InstanceModel(BaseModel): # instance termination handling termination_deadline: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) + # TODO: Migrate to EnumAsString(InstanceTerminationReason, 100) after enough releases to ensure backward compatibility termination_reason: Mapped[Optional[str]] = mapped_column(String(4000)) termination_reason_message: Mapped[Optional[str]] = mapped_column(String(4000)) # Deprecated since 0.19.22, not used From 84ca90338e8d42ead6411d9155e2265d82b7fa62 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Thu, 18 Dec 2025 20:42:46 +0100 Subject: [PATCH 06/11] Re-generate migration --- ...c693_add_instances_termination_reason_message.py} | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) rename src/dstack/_internal/server/migrations/versions/{a16a05249504_instance_termination_reason_message.py => c3de3e67c693_add_instances_termination_reason_message.py} (79%) diff --git a/src/dstack/_internal/server/migrations/versions/a16a05249504_instance_termination_reason_message.py b/src/dstack/_internal/server/migrations/versions/c3de3e67c693_add_instances_termination_reason_message.py similarity index 79% rename from src/dstack/_internal/server/migrations/versions/a16a05249504_instance_termination_reason_message.py rename to src/dstack/_internal/server/migrations/versions/c3de3e67c693_add_instances_termination_reason_message.py index 7b5cfae47..a4bc2db10 100644 --- a/src/dstack/_internal/server/migrations/versions/a16a05249504_instance_termination_reason_message.py +++ b/src/dstack/_internal/server/migrations/versions/c3de3e67c693_add_instances_termination_reason_message.py @@ -1,8 +1,8 @@ -"""instance.termination_reason_message +"""Add instances.termination_reason_message -Revision ID: a16a05249504 -Revises: 2498ab323443 -Create Date: 2025-10-13 15:29:56.691164 +Revision ID: c3de3e67c693 +Revises: 22d74df9897e +Create Date: 2025-12-18 20:41:20.376056 """ @@ -10,8 +10,8 @@ from alembic import op # revision identifiers, used by Alembic. -revision = "a16a05249504" -down_revision = "2498ab323443" +revision = "c3de3e67c693" +down_revision = "22d74df9897e" branch_labels = None depends_on = None From b8fe5bed7e017157a8f06eab9f93d9467eec33d3 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Thu, 18 Dec 2025 21:45:15 +0100 Subject: [PATCH 07/11] Use `InstanceTerminationReason` instead of strings Use the `InstanceTerminationReason` enum as the type of `InstanceModel.termination_reason`. To handle old instances, automatically convert their legacy termination reason strings to relevant `InstanceTerminationReason` enum members on reads. --- src/dstack/_internal/core/models/instances.py | 61 ++++++++++++++++++- .../server/background/tasks/process_fleets.py | 5 +- .../background/tasks/process_instances.py | 32 +++++----- .../background/tasks/process_running_jobs.py | 2 +- src/dstack/_internal/server/models.py | 43 ++++++++++--- .../tasks/test_process_instances.py | 27 ++++---- .../server/routers/test_instances.py | 22 +++++++ 7 files changed, 150 insertions(+), 42 deletions(-) diff --git a/src/dstack/_internal/core/models/instances.py b/src/dstack/_internal/core/models/instances.py index d159cf694..94ccab235 100644 --- a/src/dstack/_internal/core/models/instances.py +++ b/src/dstack/_internal/core/models/instances.py @@ -15,6 +15,9 @@ from dstack._internal.core.models.health import HealthStatus from dstack._internal.core.models.volumes import Volume from dstack._internal.utils.common import pretty_resources +from dstack._internal.utils.logging import get_logger + +logger = get_logger(__name__) class Gpu(CoreModel): @@ -256,14 +259,68 @@ def finished_statuses(cls) -> List["InstanceStatus"]: class InstanceTerminationReason(str, Enum): IDLE_TIMEOUT = "idle_timeout" - PROOVISIONING_TIMEOUT = "provisioning_timeout" + PROVISIONING_TIMEOUT = "provisioning_timeout" ERROR = "error" JOB_FINISHED = "job_finished" TERMINATION_TIMEOUT = "termination_timeout" STARTING_TIMEOUT = "starting_timeout" NO_OFFERS = "no_offers" MASTER_FAILED = "master_failed" - NO_BALANCE = "no_balance" + MAX_INSTANCES_LIMIT = "max_instances_limit" + NO_BALANCE = "no_balance" # used in dstack Sky + + @classmethod + def from_legacy_str(cls, v: str) -> "InstanceTerminationReason": + """ + Convert legacy termination reason string to relevant termination reason enum. + + dstack versions prior to 0.20.1 represented instance termination reasons as raw + strings. Such strings may still be stored in the database. + """ + + if v == "Idle timeout": + return cls.IDLE_TIMEOUT + if v in ( + "Provisioning timeout expired", + "Proivisioning timeout expired", # typo is intentional + "The proivisioning timeout expired", # typo is intentional + ): + return cls.PROVISIONING_TIMEOUT + if v in ( + "Unsupported private SSH key type", + "Failed to locate internal IP address on the given network", + "Specified internal IP not found among instance interfaces", + "Cannot split into blocks", + "Backend not available", + "Error while waiting for instance to become running", + "Empty profile, requirements or instance_configuration", + "Unable to locate the internal ip-address for the given network", + "Private SSH key is encrypted, password required", + "Cannot parse private key, key type is not supported", + ) or v.startswith("Error to parse profile, requirements or instance_configuration:"): + return cls.ERROR + if v in ( + "All offers failed", + "No offers found", + "There were no offers found", + "Retry duration expired", + "The retry's duration expired", + ): + return cls.NO_OFFERS + if v == "Master instance failed to start": + return cls.MASTER_FAILED + if v == "Instance job finished": + return cls.JOB_FINISHED + if v == "Termination deadline": + return cls.TERMINATION_TIMEOUT + if v == "Instance has not become running in time": + return cls.STARTING_TIMEOUT + if v == "Fleet has too many instances": + return cls.MAX_INSTANCES_LIMIT + if v == "Low account balance": + return cls.NO_BALANCE + logger.warning("Unexpected instance termination reason string: %r", v) + return cls.ERROR class Instance(CoreModel): diff --git a/src/dstack/_internal/server/background/tasks/process_fleets.py b/src/dstack/_internal/server/background/tasks/process_fleets.py index ffa83e10d..733029abf 100644 --- a/src/dstack/_internal/server/background/tasks/process_fleets.py +++ b/src/dstack/_internal/server/background/tasks/process_fleets.py @@ -8,7 +8,7 @@ from sqlalchemy.orm import joinedload, load_only, selectinload from dstack._internal.core.models.fleets import FleetSpec, FleetStatus -from dstack._internal.core.models.instances import InstanceStatus +from dstack._internal.core.models.instances import InstanceStatus, InstanceTerminationReason from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import ( FleetModel, @@ -213,7 +213,8 @@ def _maintain_fleet_nodes_in_min_max_range( break if instance.status in [InstanceStatus.IDLE]: instance.status = InstanceStatus.TERMINATING - instance.termination_reason = "Fleet has too many instances" + instance.termination_reason = InstanceTerminationReason.MAX_INSTANCES_LIMIT + instance.termination_reason_message = "Fleet has too many instances" nodes_redundant -= 1 logger.info( "Terminating instance %s: %s", diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index afbdead1b..f770020c4 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -275,7 +275,7 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel delta = datetime.timedelta(seconds=idle_seconds) if idle_duration > delta: instance.status = InstanceStatus.TERMINATING - instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT.value + instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT logger.info( "Instance %s idle duration expired: idle time %ss. Terminating", instance.name, @@ -311,7 +311,7 @@ async def _add_remote(instance: InstanceModel) -> None: retry_duration_deadline = instance.created_at + timedelta(seconds=PROVISIONING_TIMEOUT_SECONDS) if retry_duration_deadline < get_current_datetime(): instance.status = InstanceStatus.TERMINATED - instance.termination_reason = InstanceTerminationReason.PROOVISIONING_TIMEOUT.value + instance.termination_reason = InstanceTerminationReason.PROVISIONING_TIMEOUT logger.warning( "Failed to start instance %s in %d seconds. Terminating...", instance.name, @@ -334,7 +334,7 @@ async def _add_remote(instance: InstanceModel) -> None: ssh_proxy_pkeys = None except (ValueError, PasswordRequiredException): instance.status = InstanceStatus.TERMINATED - instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason = InstanceTerminationReason.ERROR instance.termination_reason_message = "Unsupported private SSH key type" logger.warning( "Failed to add instance %s: unsupported private SSH key type", @@ -393,7 +393,7 @@ async def _add_remote(instance: InstanceModel) -> None: ) if instance_network is not None and internal_ip is None: instance.status = InstanceStatus.TERMINATED - instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason = InstanceTerminationReason.ERROR instance.termination_reason_message = ( "Failed to locate internal IP address on the given network" ) @@ -409,7 +409,7 @@ async def _add_remote(instance: InstanceModel) -> None: if internal_ip is not None: if not is_ip_among_addresses(ip_address=internal_ip, addresses=host_network_addresses): instance.status = InstanceStatus.TERMINATED - instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason = InstanceTerminationReason.ERROR instance.termination_reason_message = ( "Specified internal IP not found among instance interfaces" ) @@ -432,7 +432,7 @@ async def _add_remote(instance: InstanceModel) -> None: instance.total_blocks = blocks else: instance.status = InstanceStatus.TERMINATED - instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason = InstanceTerminationReason.ERROR instance.termination_reason_message = "Cannot split into blocks" logger.warning( "Failed to add instance %s: cannot split into blocks", @@ -552,7 +552,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No requirements = get_instance_requirements(instance) except ValidationError as e: instance.status = InstanceStatus.TERMINATED - instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason = InstanceTerminationReason.ERROR instance.termination_reason_message = ( f"Error to parse profile, requirements or instance_configuration: {e}" ) @@ -679,17 +679,19 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No ) return - _mark_terminated(instance, InstanceTerminationReason.NO_OFFERS.value) + _mark_terminated(instance, InstanceTerminationReason.NO_OFFERS) if instance.fleet and is_fleet_master_instance(instance) and is_cloud_cluster(instance.fleet): # Do not attempt to deploy other instances, as they won't determine the correct cluster # backend, region, and placement group without a successfully deployed master instance for sibling_instance in instance.fleet.instances: if sibling_instance.id == instance.id: continue - _mark_terminated(sibling_instance, InstanceTerminationReason.MASTER_FAILED.value) + _mark_terminated(sibling_instance, InstanceTerminationReason.MASTER_FAILED) -def _mark_terminated(instance: InstanceModel, termination_reason: str) -> None: +def _mark_terminated( + instance: InstanceModel, termination_reason: InstanceTerminationReason +) -> None: instance.status = InstanceStatus.TERMINATED instance.termination_reason = termination_reason logger.info( @@ -711,7 +713,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non ): # A busy instance could have no active jobs due to this bug: https://github.com/dstackai/dstack/issues/2068 instance.status = InstanceStatus.TERMINATING - instance.termination_reason = InstanceTerminationReason.JOB_FINISHED.value + instance.termination_reason = InstanceTerminationReason.JOB_FINISHED logger.info( "Detected busy instance %s with finished job. Marked as TERMINATING", instance.name, @@ -840,7 +842,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non deadline = instance.termination_deadline if get_current_datetime() > deadline: instance.status = InstanceStatus.TERMINATING - instance.termination_reason = InstanceTerminationReason.TERMINATION_TIMEOUT.value + instance.termination_reason = InstanceTerminationReason.TERMINATION_TIMEOUT logger.warning( "Instance %s shim waiting timeout. Marked as TERMINATING", instance.name, @@ -869,7 +871,7 @@ async def _wait_for_instance_provisioning_data( "Instance %s failed because instance has not become running in time", instance.name ) instance.status = InstanceStatus.TERMINATING - instance.termination_reason = InstanceTerminationReason.STARTING_TIMEOUT.value + instance.termination_reason = InstanceTerminationReason.STARTING_TIMEOUT return backend = await backends_services.get_project_backend_by_type( @@ -882,7 +884,7 @@ async def _wait_for_instance_provisioning_data( instance.name, ) instance.status = InstanceStatus.TERMINATING - instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason = InstanceTerminationReason.ERROR instance.termination_reason_message = "Backend not available" return try: @@ -900,7 +902,7 @@ async def _wait_for_instance_provisioning_data( repr(e), ) instance.status = InstanceStatus.TERMINATING - instance.termination_reason = InstanceTerminationReason.ERROR.value + instance.termination_reason = InstanceTerminationReason.ERROR instance.termination_reason_message = "Error while waiting for instance to become running" except Exception: logger.exception( diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index b8dee25f5..154a2b097 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -393,7 +393,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): if ( job_model.instance is not None and job_model.instance.termination_reason - == InstanceTerminationReason.NO_BALANCE.value + == InstanceTerminationReason.NO_BALANCE ): # if instance was terminated due to no balance, set job termination reason accodingly job_model.termination_reason = JobTerminationReason.NO_BALANCE diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index cecd8e402..89a588e1d 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -1,7 +1,7 @@ import enum import uuid from datetime import datetime, timezone -from typing import Callable, List, Optional, Union +from typing import Callable, Generic, List, Optional, TypeVar, Union from sqlalchemy import ( BigInteger, @@ -30,7 +30,7 @@ from dstack._internal.core.models.fleets import FleetStatus from dstack._internal.core.models.gateways import GatewayStatus from dstack._internal.core.models.health import HealthStatus -from dstack._internal.core.models.instances import InstanceStatus +from dstack._internal.core.models.instances import InstanceStatus, InstanceTerminationReason from dstack._internal.core.models.profiles import ( DEFAULT_FLEET_TERMINATION_IDLE_TIME, TerminationPolicy, @@ -141,7 +141,10 @@ def process_result_value(self, value: Optional[str], dialect) -> Optional[Decryp return DecryptedString(plaintext=None, decrypted=False, exc=e) -class EnumAsString(TypeDecorator): +E = TypeVar("E", bound=enum.Enum) + + +class EnumAsString(TypeDecorator, Generic[E]): """ A custom type decorator that stores enums as strings in the DB. """ @@ -149,18 +152,34 @@ class EnumAsString(TypeDecorator): impl = String cache_ok = True - def __init__(self, enum_class: type[enum.Enum], *args, **kwargs): + def __init__( + self, + enum_class: type[E], + *args, + fallback_deserializer: Optional[Callable[[str], E]] = None, + **kwargs, + ): + """ + Args: + enum_class: The enum class to be stored. + fallback_deserializer: An optional function used when the string + from the DB does not match any enum member name. If not + provided, an exception will be raised in such cases. + """ self.enum_class = enum_class + self.fallback_deserializer = fallback_deserializer super().__init__(*args, **kwargs) - def process_bind_param(self, value: Optional[enum.Enum], dialect) -> Optional[str]: + def process_bind_param(self, value: Optional[E], dialect) -> Optional[str]: if value is None: return None return value.name - def process_result_value(self, value: Optional[str], dialect) -> Optional[enum.Enum]: + def process_result_value(self, value: Optional[str], dialect) -> Optional[E]: if value is None: return None + if value not in self.enum_class.__members__ and self.fallback_deserializer is not None: + return self.fallback_deserializer(value) return self.enum_class[value] @@ -641,8 +660,16 @@ class InstanceModel(BaseModel): # instance termination handling termination_deadline: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) - # TODO: Migrate to EnumAsString(InstanceTerminationReason, 100) after enough releases to ensure backward compatibility - termination_reason: Mapped[Optional[str]] = mapped_column(String(4000)) + # dstack versions prior to 0.20.1 represented instance termination reasons as raw strings. + # Such strings may still be stored in the database, so we are using a wide column (4000 chars) + # and a fallback deserializer to convert them to relevant enum members. + termination_reason: Mapped[Optional[InstanceTerminationReason]] = mapped_column( + EnumAsString( + InstanceTerminationReason, + 4000, + fallback_deserializer=InstanceTerminationReason.from_legacy_str, + ) + ) termination_reason_message: Mapped[Optional[str]] = mapped_column(String(4000)) # Deprecated since 0.19.22, not used health_status: Mapped[Optional[str]] = mapped_column(String(4000), deferred=True) diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py index e8b04ea1a..e98152afe 100644 --- a/src/tests/_internal/server/background/tasks/test_process_instances.py +++ b/src/tests/_internal/server/background/tasks/test_process_instances.py @@ -263,7 +263,7 @@ async def test_check_shim_terminate_instance_by_deadline(self, test_db, session: assert instance is not None assert instance.status == InstanceStatus.TERMINATING assert instance.termination_deadline == termination_deadline_time - assert instance.termination_reason == InstanceTerminationReason.TERMINATION_TIMEOUT.value + assert instance.termination_reason == InstanceTerminationReason.TERMINATION_TIMEOUT @pytest.mark.asyncio @pytest.mark.parametrize( @@ -530,7 +530,7 @@ async def test_terminate_by_idle_timeout(self, test_db, session: AsyncSession): await session.refresh(instance) assert instance is not None assert instance.status == InstanceStatus.TERMINATING - assert instance.termination_reason == InstanceTerminationReason.IDLE_TIMEOUT.value + assert instance.termination_reason == InstanceTerminationReason.IDLE_TIMEOUT class TestSSHInstanceTerminateProvisionTimeoutExpired: @@ -551,7 +551,7 @@ async def test_terminate_by_idle_timeout(self, test_db, session: AsyncSession): await session.refresh(instance) assert instance.status == InstanceStatus.TERMINATED - assert instance.termination_reason == InstanceTerminationReason.PROOVISIONING_TIMEOUT.value + assert instance.termination_reason == InstanceTerminationReason.PROVISIONING_TIMEOUT class TestTerminate: @@ -576,8 +576,7 @@ async def test_terminate(self, test_db, session: AsyncSession): instance = await create_instance( session=session, project=project, status=InstanceStatus.TERMINATING ) - reason = "some reason" - instance.termination_reason = reason + instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT instance.last_job_processed_at = get_current_datetime() + dt.timedelta(minutes=-19) await session.commit() @@ -589,7 +588,7 @@ async def test_terminate(self, test_db, session: AsyncSession): assert instance is not None assert instance.status == InstanceStatus.TERMINATED - assert instance.termination_reason == "some reason" + assert instance.termination_reason == InstanceTerminationReason.IDLE_TIMEOUT assert instance.deleted == True assert instance.deleted_at is not None assert instance.finished_at is not None @@ -604,7 +603,7 @@ async def test_terminate_retry(self, test_db, session: AsyncSession, error: Exce instance = await create_instance( session=session, project=project, status=InstanceStatus.TERMINATING ) - instance.termination_reason = "some reason" + instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT initial_time = dt.datetime(2025, 1, 1, tzinfo=dt.timezone.utc) instance.last_job_processed_at = initial_time await session.commit() @@ -636,7 +635,7 @@ async def test_terminate_not_retries_if_too_early(self, test_db, session: AsyncS instance = await create_instance( session=session, project=project, status=InstanceStatus.TERMINATING ) - instance.termination_reason = "some reason" + instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT initial_time = dt.datetime(2025, 1, 1, tzinfo=dt.timezone.utc) instance.last_job_processed_at = initial_time await session.commit() @@ -668,7 +667,7 @@ async def test_terminate_on_termination_deadline(self, test_db, session: AsyncSe instance = await create_instance( session=session, project=project, status=InstanceStatus.TERMINATING ) - instance.termination_reason = "some reason" + instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT initial_time = dt.datetime(2025, 1, 1, tzinfo=dt.timezone.utc) instance.last_job_processed_at = initial_time await session.commit() @@ -820,7 +819,7 @@ async def test_fails_if_all_offers_fail(self, session: AsyncSession, err: Except await session.refresh(instance) assert instance.status == InstanceStatus.TERMINATED - assert instance.termination_reason == InstanceTerminationReason.NO_OFFERS.value + assert instance.termination_reason == InstanceTerminationReason.NO_OFFERS async def test_fails_if_no_offers(self, session: AsyncSession): project = await create_project(session=session) @@ -833,7 +832,7 @@ async def test_fails_if_no_offers(self, session: AsyncSession): await session.refresh(instance) assert instance.status == InstanceStatus.TERMINATED - assert instance.termination_reason == InstanceTerminationReason.NO_OFFERS.value + assert instance.termination_reason == InstanceTerminationReason.NO_OFFERS @pytest.mark.parametrize( ("placement", "expected_termination_reasons"), @@ -841,14 +840,14 @@ async def test_fails_if_no_offers(self, session: AsyncSession): pytest.param( InstanceGroupPlacement.CLUSTER, { - InstanceTerminationReason.NO_OFFERS.value: 1, - InstanceTerminationReason.MASTER_FAILED.value: 3, + InstanceTerminationReason.NO_OFFERS: 1, + InstanceTerminationReason.MASTER_FAILED: 3, }, id="cluster", ), pytest.param( None, - {InstanceTerminationReason.NO_OFFERS.value: 4}, + {InstanceTerminationReason.NO_OFFERS: 4}, id="non-cluster", ), ], diff --git a/src/tests/_internal/server/routers/test_instances.py b/src/tests/_internal/server/routers/test_instances.py index f4fe924e4..494721035 100644 --- a/src/tests/_internal/server/routers/test_instances.py +++ b/src/tests/_internal/server/routers/test_instances.py @@ -6,6 +6,7 @@ import pytest import pytest_asyncio from httpx import AsyncClient +from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from dstack._internal.core.models.instances import InstanceStatus @@ -372,3 +373,24 @@ async def test_returns_health_checks(self, session: AsyncSession, client: AsyncC {"collected_at": "2025-01-01T12:00:00+00:00", "status": "healthy", "events": []}, ] } + + +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) +@pytest.mark.usefixtures("test_db") +class TestCompatibility: + async def test_converts_legacy_termination_reason_string( + self, session: AsyncSession, client: AsyncClient + ) -> None: + user = await create_user(session) + project = await create_project(session, owner=user) + fleet = await create_fleet(session, project) + await create_instance(session=session, project=project, fleet=fleet) + await session.execute( + text("UPDATE instances SET termination_reason = 'Fleet has too many instances'") + ) + resp = await client.post( + "/api/instances/list", headers=get_auth_headers(user.token), json={} + ) + # Must convert legacy "Fleet has too many instances" to "max_instances_limit" + assert resp.json()[0]["termination_reason"] == "max_instances_limit" From 1b20ae68501cc9eb7873231c8a6899fac3979f42 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Thu, 18 Dec 2025 23:15:31 +0100 Subject: [PATCH 08/11] Reconsider termination reason naming - Drop `starting_timeout`, use `provisioning_timeout` instead. These two termination reasons had similar semantics. The difference was that `starting_timeout` was used for cloud instances and `provisioning_timeout` for SSH. - Rename `termination_deadline` -> `unreachable`. The old name was based on implementation details rather than semantics, so it was not very informative to the user. - For `no_offers`, add a termination reason message to highlight the difference between failing to find offers and failing to provision offers. --- src/dstack/_internal/core/models/instances.py | 8 +++----- .../server/background/tasks/process_instances.py | 16 ++++++++++++---- .../background/tasks/test_process_instances.py | 2 +- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/dstack/_internal/core/models/instances.py b/src/dstack/_internal/core/models/instances.py index 94ccab235..ed4d16c4b 100644 --- a/src/dstack/_internal/core/models/instances.py +++ b/src/dstack/_internal/core/models/instances.py @@ -262,8 +262,7 @@ class InstanceTerminationReason(str, Enum): PROVISIONING_TIMEOUT = "provisioning_timeout" ERROR = "error" JOB_FINISHED = "job_finished" - TERMINATION_TIMEOUT = "termination_timeout" - STARTING_TIMEOUT = "starting_timeout" + UNREACHABLE = "unreachable" NO_OFFERS = "no_offers" MASTER_FAILED = "master_failed" MAX_INSTANCES_LIMIT = "max_instances_limit" @@ -281,6 +280,7 @@ def from_legacy_str(cls, v: str) -> "InstanceTerminationReason": if v == "Idle timeout": return cls.IDLE_TIMEOUT if v in ( + "Instance has not become running in time", "Provisioning timeout expired", "Proivisioning timeout expired", # typo is intentional "The proivisioning timeout expired", # typo is intentional @@ -312,9 +312,7 @@ def from_legacy_str(cls, v: str) -> "InstanceTerminationReason": if v == "Instance job finished": return cls.JOB_FINISHED if v == "Termination deadline": - return cls.TERMINATION_TIMEOUT - if v == "Instance has not become running in time": - return cls.STARTING_TIMEOUT + return cls.UNREACHABLE if v == "Fleet has too many instances": return cls.MAX_INSTANCES_LIMIT if v == "Low account balance": diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index f770020c4..4b45e68b1 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -679,7 +679,11 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No ) return - _mark_terminated(instance, InstanceTerminationReason.NO_OFFERS) + _mark_terminated( + instance, + InstanceTerminationReason.NO_OFFERS, + "All offers failed" if offers else "No offers found", + ) if instance.fleet and is_fleet_master_instance(instance) and is_cloud_cluster(instance.fleet): # Do not attempt to deploy other instances, as they won't determine the correct cluster # backend, region, and placement group without a successfully deployed master instance @@ -690,10 +694,13 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No def _mark_terminated( - instance: InstanceModel, termination_reason: InstanceTerminationReason + instance: InstanceModel, + termination_reason: InstanceTerminationReason, + termination_reason_message: Optional[str] = None, ) -> None: instance.status = InstanceStatus.TERMINATED instance.termination_reason = termination_reason + instance.termination_reason_message = termination_reason_message logger.info( "Terminated instance %s: %s", instance.name, @@ -842,7 +849,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non deadline = instance.termination_deadline if get_current_datetime() > deadline: instance.status = InstanceStatus.TERMINATING - instance.termination_reason = InstanceTerminationReason.TERMINATION_TIMEOUT + instance.termination_reason = InstanceTerminationReason.UNREACHABLE logger.warning( "Instance %s shim waiting timeout. Marked as TERMINATING", instance.name, @@ -871,7 +878,8 @@ async def _wait_for_instance_provisioning_data( "Instance %s failed because instance has not become running in time", instance.name ) instance.status = InstanceStatus.TERMINATING - instance.termination_reason = InstanceTerminationReason.STARTING_TIMEOUT + instance.termination_reason = InstanceTerminationReason.PROVISIONING_TIMEOUT + instance.termination_reason_message = "Backend did not complete provisioning in time" return backend = await backends_services.get_project_backend_by_type( diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py index e98152afe..bed206e92 100644 --- a/src/tests/_internal/server/background/tasks/test_process_instances.py +++ b/src/tests/_internal/server/background/tasks/test_process_instances.py @@ -263,7 +263,7 @@ async def test_check_shim_terminate_instance_by_deadline(self, test_db, session: assert instance is not None assert instance.status == InstanceStatus.TERMINATING assert instance.termination_deadline == termination_deadline_time - assert instance.termination_reason == InstanceTerminationReason.TERMINATION_TIMEOUT + assert instance.termination_reason == InstanceTerminationReason.UNREACHABLE @pytest.mark.asyncio @pytest.mark.parametrize( From 5f41f2349fef7635c676012294fbf23720b94583 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Thu, 18 Dec 2025 23:44:55 +0100 Subject: [PATCH 09/11] Revert job-related changes To avoid extra testing and changes, limit the scope of the PR to instance termination reasons, as per the PR name. The introduction of `JobTerminationReason.NO_BALANCE` can be done in a separate PR. Its implementation may need to be updated to also use `JobTerminationReason.NO_BALANCE` on jobs with statuses other than `running`. --- src/dstack/_internal/core/models/runs.py | 2 -- .../background/tasks/process_running_jobs.py | 21 +++---------------- .../server/services/jobs/__init__.py | 2 -- src/tests/_internal/core/models/test_runs.py | 1 - .../tasks/test_process_running_jobs.py | 2 +- 5 files changed, 4 insertions(+), 24 deletions(-) diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 27177ac21..a966bc34a 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -139,7 +139,6 @@ class JobTerminationReason(str, Enum): TERMINATED_BY_SERVER = "terminated_by_server" INACTIVITY_DURATION_EXCEEDED = "inactivity_duration_exceeded" TERMINATED_DUE_TO_UTILIZATION_POLICY = "terminated_due_to_utilization_policy" - NO_BALANCE = "no_balance" # Set by the runner CONTAINER_EXITED_WITH_ERROR = "container_exited_with_error" PORTS_BINDING_FAILED = "ports_binding_failed" @@ -163,7 +162,6 @@ def to_status(self) -> JobStatus: self.TERMINATED_BY_SERVER: JobStatus.TERMINATED, self.INACTIVITY_DURATION_EXCEEDED: JobStatus.TERMINATED, self.TERMINATED_DUE_TO_UTILIZATION_POLICY: JobStatus.TERMINATED, - self.NO_BALANCE: JobStatus.TERMINATED, self.CONTAINER_EXITED_WITH_ERROR: JobStatus.FAILED, self.PORTS_BINDING_FAILED: JobStatus.FAILED, self.CREATING_CONTAINER_ERROR: JobStatus.FAILED, diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 154a2b097..341b47a38 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -18,7 +18,6 @@ from dstack._internal.core.models.files import FileArchiveMapping from dstack._internal.core.models.instances import ( InstanceStatus, - InstanceTerminationReason, RemoteConnectionInfo, SSHConnectionParams, ) @@ -59,7 +58,6 @@ from dstack._internal.server.services.jobs import ( find_job, get_job_attached_volumes, - get_job_provisioning_data, get_job_runtime_data, is_master_job, job_model_to_job_submission, @@ -390,22 +388,9 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): if job_model.disconnected_at is None: job_model.disconnected_at = common_utils.get_current_datetime() if _should_terminate_job_due_to_disconnect(job_model): - if ( - job_model.instance is not None - and job_model.instance.termination_reason - == InstanceTerminationReason.NO_BALANCE - ): - # if instance was terminated due to no balance, set job termination reason accodingly - job_model.termination_reason = JobTerminationReason.NO_BALANCE - else: - job_provisioning_data = get_job_provisioning_data(job_model) - # use JobTerminationReason.INSTANCE_UNREACHABLE for on-demand instances only - job_model.termination_reason = ( - JobTerminationReason.INSTANCE_UNREACHABLE - if job_provisioning_data - and not job_provisioning_data.instance_type.resources.spot - else JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY - ) + # TODO: Replace with JobTerminationReason.INSTANCE_UNREACHABLE for on-demand. + job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY + job_model.termination_reason_message = "Instance is unreachable" switch_job_status(session, job_model, JobStatus.TERMINATING) else: logger.warning( diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index b927f27c5..1ed3c5f99 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -814,8 +814,6 @@ def _get_job_status_message(job_model: JobModel) -> str: return "stopped" elif job_model.termination_reason == JobTerminationReason.ABORTED_BY_USER: return "aborted" - elif job_model.termination_reason == JobTerminationReason.NO_BALANCE: - return "no balance" return job_model.status.value diff --git a/src/tests/_internal/core/models/test_runs.py b/src/tests/_internal/core/models/test_runs.py index 5e847beb4..23b27c018 100644 --- a/src/tests/_internal/core/models/test_runs.py +++ b/src/tests/_internal/core/models/test_runs.py @@ -43,7 +43,6 @@ def test_get_error_returns_expected_messages(): JobTerminationReason.ABORTED_BY_USER, JobTerminationReason.TERMINATED_BY_SERVER, JobTerminationReason.CONTAINER_EXITED_WITH_ERROR, - JobTerminationReason.NO_BALANCE, ] for reason in JobTerminationReason: diff --git a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py index d13959b48..e3cc011be 100644 --- a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py @@ -528,7 +528,7 @@ async def test_pulling_shim_failed(self, test_db, session: AsyncSession): assert SSHTunnelMock.call_count == 3 await session.refresh(job) assert job.status == JobStatus.TERMINATING - assert job.termination_reason == JobTerminationReason.INSTANCE_UNREACHABLE + assert job.termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY assert job.remove_at is None @pytest.mark.asyncio From 10779b3aff7952ad5b02ed76f8187359a0f07f50 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Fri, 19 Dec 2025 00:02:57 +0100 Subject: [PATCH 10/11] Add comment on `Instance.termination_reason` --- src/dstack/_internal/core/models/instances.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dstack/_internal/core/models/instances.py b/src/dstack/_internal/core/models/instances.py index ed4d16c4b..2bc0c1f89 100644 --- a/src/dstack/_internal/core/models/instances.py +++ b/src/dstack/_internal/core/models/instances.py @@ -335,6 +335,8 @@ class Instance(CoreModel): status: InstanceStatus unreachable: bool = False health_status: HealthStatus = HealthStatus.HEALTHY + # termination_reason stores InstanceTerminationReason. + # str allows adding new enum members without breaking compatibility with old clients. termination_reason: Optional[str] = None termination_reason_message: Optional[str] = None created: datetime.datetime From f438c542c5be9cd31083e4b4872854359b1950ea Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Fri, 19 Dec 2025 00:26:08 +0100 Subject: [PATCH 11/11] Fix unit test on Postgres --- src/tests/_internal/server/routers/test_instances.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/tests/_internal/server/routers/test_instances.py b/src/tests/_internal/server/routers/test_instances.py index 494721035..8aee09e6d 100644 --- a/src/tests/_internal/server/routers/test_instances.py +++ b/src/tests/_internal/server/routers/test_instances.py @@ -389,6 +389,7 @@ async def test_converts_legacy_termination_reason_string( await session.execute( text("UPDATE instances SET termination_reason = 'Fleet has too many instances'") ) + await session.commit() resp = await client.post( "/api/instances/list", headers=get_auth_headers(user.token), json={} )