Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/dstack/_internal/core/models/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from dstack._internal.core.models.health import HealthStatus
from dstack._internal.core.models.volumes import Volume
from dstack._internal.utils.common import pretty_resources
from dstack._internal.utils.logging import get_logger

logger = get_logger(__name__)


class Gpu(CoreModel):
Expand Down Expand Up @@ -254,6 +257,70 @@ def finished_statuses(cls) -> List["InstanceStatus"]:
return [cls.TERMINATING, cls.TERMINATED]


class InstanceTerminationReason(str, Enum):
    """Reasons that can be recorded for terminating an instance."""

    IDLE_TIMEOUT = "idle_timeout"
    PROVISIONING_TIMEOUT = "provisioning_timeout"
    ERROR = "error"
    JOB_FINISHED = "job_finished"
    UNREACHABLE = "unreachable"
    NO_OFFERS = "no_offers"
    MASTER_FAILED = "master_failed"
    MAX_INSTANCES_LIMIT = "max_instances_limit"
    NO_BALANCE = "no_balance"  # used in dstack Sky

    @classmethod
    def from_legacy_str(cls, v: str) -> "InstanceTerminationReason":
        """
        Map a legacy free-form termination reason string to an enum member.

        dstack versions prior to 0.20.1 represented instance termination reasons as raw
        strings. Such strings may still be stored in the database.

        Args:
            v: The legacy termination reason string.

        Returns:
            The matching enum member. Strings that are not recognized are logged
            with a warning and mapped to `ERROR`.
        """
        # The lookup table is kept local to the method: a plain attribute assigned
        # in an Enum class body would be turned into an enum member.
        legacy_groups = (
            (cls.IDLE_TIMEOUT, ("Idle timeout",)),
            (
                cls.PROVISIONING_TIMEOUT,
                (
                    "Instance has not become running in time",
                    "Provisioning timeout expired",
                    "Proivisioning timeout expired",  # typo is intentional
                    "The proivisioning timeout expired",  # typo is intentional
                ),
            ),
            (
                cls.ERROR,
                (
                    "Unsupported private SSH key type",
                    "Failed to locate internal IP address on the given network",
                    "Specified internal IP not found among instance interfaces",
                    "Cannot split into blocks",
                    "Backend not available",
                    "Error while waiting for instance to become running",
                    "Empty profile, requirements or instance_configuration",
                    "Unable to locate the internal ip-address for the given network",
                    "Private SSH key is encrypted, password required",
                    "Cannot parse private key, key type is not supported",
                ),
            ),
            (
                cls.NO_OFFERS,
                (
                    "All offers failed",
                    "No offers found",
                    "There were no offers found",
                    "Retry duration expired",
                    "The retry's duration expired",
                ),
            ),
            (cls.MASTER_FAILED, ("Master instance failed to start",)),
            (cls.JOB_FINISHED, ("Instance job finished",)),
            (cls.UNREACHABLE, ("Termination deadline",)),
            (cls.MAX_INSTANCES_LIMIT, ("Fleet has too many instances",)),
            (cls.NO_BALANCE, ("Low account balance",)),
        )
        for reason, legacy_strings in legacy_groups:
            if v in legacy_strings:
                return reason

        # This legacy message carried the validation error text after the colon,
        # so it can only be recognized by its prefix.
        if v.startswith("Error to parse profile, requirements or instance_configuration:"):
            return cls.ERROR

        logger.warning("Unexpected instance termination reason string: %r", v)
        return cls.ERROR


class Instance(CoreModel):
id: UUID
project_name: str
Expand All @@ -268,7 +335,10 @@ class Instance(CoreModel):
status: InstanceStatus
unreachable: bool = False
health_status: HealthStatus = HealthStatus.HEALTHY
# termination_reason stores InstanceTerminationReason.
# str allows adding new enum members without breaking compatibility with old clients.
termination_reason: Optional[str] = None
termination_reason_message: Optional[str] = None
created: datetime.datetime
region: Optional[str] = None
availability_zone: Optional[str] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from sqlalchemy.orm import joinedload, load_only, selectinload

from dstack._internal.core.models.fleets import FleetSpec, FleetStatus
from dstack._internal.core.models.instances import InstanceStatus
from dstack._internal.core.models.instances import InstanceStatus, InstanceTerminationReason
from dstack._internal.server.db import get_db, get_session_ctx
from dstack._internal.server.models import (
FleetModel,
Expand Down Expand Up @@ -213,7 +213,8 @@ def _maintain_fleet_nodes_in_min_max_range(
break
if instance.status in [InstanceStatus.IDLE]:
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Fleet has too many instances"
instance.termination_reason = InstanceTerminationReason.MAX_INSTANCES_LIMIT
instance.termination_reason_message = "Fleet has too many instances"
nodes_redundant -= 1
logger.info(
"Terminating instance %s: %s",
Expand Down
50 changes: 35 additions & 15 deletions src/dstack/_internal/server/background/tasks/process_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
InstanceOfferWithAvailability,
InstanceRuntime,
InstanceStatus,
InstanceTerminationReason,
RemoteConnectionInfo,
SSHKey,
)
Expand Down Expand Up @@ -274,7 +275,7 @@ def _check_and_mark_terminating_if_idle_duration_expired(instance: InstanceModel
delta = datetime.timedelta(seconds=idle_seconds)
if idle_duration > delta:
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Idle timeout"
instance.termination_reason = InstanceTerminationReason.IDLE_TIMEOUT
logger.info(
"Instance %s idle duration expired: idle time %ss. Terminating",
instance.name,
Expand Down Expand Up @@ -310,7 +311,7 @@ async def _add_remote(instance: InstanceModel) -> None:
retry_duration_deadline = instance.created_at + timedelta(seconds=PROVISIONING_TIMEOUT_SECONDS)
if retry_duration_deadline < get_current_datetime():
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = "Provisioning timeout expired"
instance.termination_reason = InstanceTerminationReason.PROVISIONING_TIMEOUT
logger.warning(
"Failed to start instance %s in %d seconds. Terminating...",
instance.name,
Expand All @@ -333,7 +334,8 @@ async def _add_remote(instance: InstanceModel) -> None:
ssh_proxy_pkeys = None
except (ValueError, PasswordRequiredException):
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = "Unsupported private SSH key type"
instance.termination_reason = InstanceTerminationReason.ERROR
instance.termination_reason_message = "Unsupported private SSH key type"
logger.warning(
"Failed to add instance %s: unsupported private SSH key type",
instance.name,
Expand Down Expand Up @@ -391,7 +393,10 @@ async def _add_remote(instance: InstanceModel) -> None:
)
if instance_network is not None and internal_ip is None:
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = "Failed to locate internal IP address on the given network"
instance.termination_reason = InstanceTerminationReason.ERROR
instance.termination_reason_message = (
"Failed to locate internal IP address on the given network"
)
logger.warning(
"Failed to add instance %s: failed to locate internal IP address on the given network",
instance.name,
Expand All @@ -404,7 +409,8 @@ async def _add_remote(instance: InstanceModel) -> None:
if internal_ip is not None:
if not is_ip_among_addresses(ip_address=internal_ip, addresses=host_network_addresses):
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = (
instance.termination_reason = InstanceTerminationReason.ERROR
instance.termination_reason_message = (
"Specified internal IP not found among instance interfaces"
)
logger.warning(
Expand All @@ -426,7 +432,8 @@ async def _add_remote(instance: InstanceModel) -> None:
instance.total_blocks = blocks
else:
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = "Cannot split into blocks"
instance.termination_reason = InstanceTerminationReason.ERROR
instance.termination_reason_message = "Cannot split into blocks"
logger.warning(
"Failed to add instance %s: cannot split into blocks",
instance.name,
Expand Down Expand Up @@ -545,7 +552,8 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
requirements = get_instance_requirements(instance)
except ValidationError as e:
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = (
instance.termination_reason = InstanceTerminationReason.ERROR
instance.termination_reason_message = (
f"Error to parse profile, requirements or instance_configuration: {e}"
)
logger.warning(
Expand Down Expand Up @@ -671,19 +679,28 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
)
return

_mark_terminated(instance, "All offers failed" if offers else "No offers found")
_mark_terminated(
instance,
InstanceTerminationReason.NO_OFFERS,
"All offers failed" if offers else "No offers found",
)
if instance.fleet and is_fleet_master_instance(instance) and is_cloud_cluster(instance.fleet):
# Do not attempt to deploy other instances, as they won't determine the correct cluster
# backend, region, and placement group without a successfully deployed master instance
for sibling_instance in instance.fleet.instances:
if sibling_instance.id == instance.id:
continue
_mark_terminated(sibling_instance, "Master instance failed to start")
_mark_terminated(sibling_instance, InstanceTerminationReason.MASTER_FAILED)


def _mark_terminated(instance: InstanceModel, termination_reason: str) -> None:
def _mark_terminated(
instance: InstanceModel,
termination_reason: InstanceTerminationReason,
termination_reason_message: Optional[str] = None,
) -> None:
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = termination_reason
instance.termination_reason_message = termination_reason_message
logger.info(
"Terminated instance %s: %s",
instance.name,
Expand All @@ -703,7 +720,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non
):
# A busy instance could have no active jobs due to this bug: https://github.com/dstackai/dstack/issues/2068
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Instance job finished"
instance.termination_reason = InstanceTerminationReason.JOB_FINISHED
logger.info(
"Detected busy instance %s with finished job. Marked as TERMINATING",
instance.name,
Expand Down Expand Up @@ -832,7 +849,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non
deadline = instance.termination_deadline
if get_current_datetime() > deadline:
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Termination deadline"
instance.termination_reason = InstanceTerminationReason.UNREACHABLE
logger.warning(
"Instance %s shim waiting timeout. Marked as TERMINATING",
instance.name,
Expand Down Expand Up @@ -861,7 +878,8 @@ async def _wait_for_instance_provisioning_data(
"Instance %s failed because instance has not become running in time", instance.name
)
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Instance has not become running in time"
instance.termination_reason = InstanceTerminationReason.PROVISIONING_TIMEOUT
instance.termination_reason_message = "Backend did not complete provisioning in time"
return

backend = await backends_services.get_project_backend_by_type(
Expand All @@ -874,7 +892,8 @@ async def _wait_for_instance_provisioning_data(
instance.name,
)
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Backend not available"
instance.termination_reason = InstanceTerminationReason.ERROR
instance.termination_reason_message = "Backend not available"
return
try:
await run_async(
Expand All @@ -891,7 +910,8 @@ async def _wait_for_instance_provisioning_data(
repr(e),
)
instance.status = InstanceStatus.TERMINATING
instance.termination_reason = "Error while waiting for instance to become running"
instance.termination_reason = InstanceTerminationReason.ERROR
instance.termination_reason_message = "Error while waiting for instance to become running"
except Exception:
logger.exception(
"Got exception when updating instance %s provisioning data", instance.name
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Add instances.termination_reason_message

Revision ID: c3de3e67c693
Revises: 22d74df9897e
Create Date: 2025-12-18 20:41:20.376056

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "c3de3e67c693"
down_revision = "22d74df9897e"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Add the nullable `termination_reason_message` column to the `instances` table."""
    message_column = sa.Column(
        "termination_reason_message",
        sa.String(length=4000),
        nullable=True,
    )
    with op.batch_alter_table("instances", schema=None) as instances_table:
        instances_table.add_column(message_column)


def downgrade() -> None:
    """Drop the `termination_reason_message` column from the `instances` table."""
    with op.batch_alter_table("instances", schema=None) as instances_table:
        instances_table.drop_column("termination_reason_message")
43 changes: 36 additions & 7 deletions src/dstack/_internal/server/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import enum
import uuid
from datetime import datetime, timezone
from typing import Callable, List, Optional, Union
from typing import Callable, Generic, List, Optional, TypeVar, Union

from sqlalchemy import (
BigInteger,
Expand Down Expand Up @@ -30,7 +30,7 @@
from dstack._internal.core.models.fleets import FleetStatus
from dstack._internal.core.models.gateways import GatewayStatus
from dstack._internal.core.models.health import HealthStatus
from dstack._internal.core.models.instances import InstanceStatus
from dstack._internal.core.models.instances import InstanceStatus, InstanceTerminationReason
from dstack._internal.core.models.profiles import (
DEFAULT_FLEET_TERMINATION_IDLE_TIME,
TerminationPolicy,
Expand Down Expand Up @@ -141,26 +141,45 @@ def process_result_value(self, value: Optional[str], dialect) -> Optional[Decryp
return DecryptedString(plaintext=None, decrypted=False, exc=e)


class EnumAsString(TypeDecorator):
E = TypeVar("E", bound=enum.Enum)


class EnumAsString(TypeDecorator, Generic[E]):
"""
A custom type decorator that stores enums as strings in the DB.
"""

impl = String
cache_ok = True

def __init__(self, enum_class: type[enum.Enum], *args, **kwargs):
def __init__(
self,
enum_class: type[E],
*args,
fallback_deserializer: Optional[Callable[[str], E]] = None,
**kwargs,
):
"""
Args:
enum_class: The enum class to be stored.
fallback_deserializer: An optional function used when the string
from the DB does not match any enum member name. If not
provided, an exception will be raised in such cases.
"""
self.enum_class = enum_class
self.fallback_deserializer = fallback_deserializer
super().__init__(*args, **kwargs)

def process_bind_param(self, value: Optional[enum.Enum], dialect) -> Optional[str]:
def process_bind_param(self, value: Optional[E], dialect) -> Optional[str]:
if value is None:
return None
return value.name

def process_result_value(self, value: Optional[str], dialect) -> Optional[enum.Enum]:
def process_result_value(self, value: Optional[str], dialect) -> Optional[E]:
if value is None:
return None
if value not in self.enum_class.__members__ and self.fallback_deserializer is not None:
return self.fallback_deserializer(value)
return self.enum_class[value]


Expand Down Expand Up @@ -641,7 +660,17 @@ class InstanceModel(BaseModel):

# instance termination handling
termination_deadline: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime)
termination_reason: Mapped[Optional[str]] = mapped_column(String(4000))
# dstack versions prior to 0.20.1 represented instance termination reasons as raw strings.
# Such strings may still be stored in the database, so we are using a wide column (4000 chars)
# and a fallback deserializer to convert them to relevant enum members.
termination_reason: Mapped[Optional[InstanceTerminationReason]] = mapped_column(
EnumAsString(
InstanceTerminationReason,
4000,
fallback_deserializer=InstanceTerminationReason.from_legacy_str,
)
)
termination_reason_message: Mapped[Optional[str]] = mapped_column(String(4000))
# Deprecated since 0.19.22, not used
health_status: Mapped[Optional[str]] = mapped_column(String(4000), deferred=True)
health: Mapped[HealthStatus] = mapped_column(
Expand Down
1 change: 1 addition & 0 deletions src/dstack/_internal/server/services/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def instance_model_to_instance(instance_model: InstanceModel) -> Instance:
unreachable=instance_model.unreachable,
health_status=instance_model.health,
termination_reason=instance_model.termination_reason,
termination_reason_message=instance_model.termination_reason_message,
created=instance_model.created_at,
total_blocks=instance_model.total_blocks,
busy_blocks=instance_model.busy_blocks,
Expand Down
Loading