Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion monarch_hyperactor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ opentelemetry = "0.29"
pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] }
pyo3-async-runtimes = { version = "0.24", features = ["attributes", "tokio-runtime"] }
serde = { version = "1.0.219", features = ["derive", "rc"] }
serde_bytes = "0.11"
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
tempfile = "3.22"
tokio = { version = "1.47.1", features = ["full", "test-util", "tracing"] }
Expand Down
113 changes: 0 additions & 113 deletions monarch_hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

use std::error::Error;
use std::future::pending;
use std::sync::Arc;
use std::sync::OnceLock;

use async_trait::async_trait;
Expand Down Expand Up @@ -57,9 +56,7 @@ use pyo3::types::PyList;
use pyo3::types::PyType;
use serde::Deserialize;
use serde::Serialize;
use serde_bytes::ByteBuf;
use serde_multipart::Part;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tracing::Instrument;

Expand All @@ -76,119 +73,11 @@ use crate::metrics::ENDPOINT_ACTOR_COUNT;
use crate::metrics::ENDPOINT_ACTOR_ERROR;
use crate::metrics::ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM;
use crate::metrics::ENDPOINT_ACTOR_PANIC;
use crate::proc::InstanceWrapper;
use crate::proc::PyActorId;
use crate::proc::PyProc;
use crate::proc::PySerialized;
use crate::pytokio::PythonTask;
use crate::runtime::get_tokio_runtime;
use crate::runtime::signal_safe_block_on;
use crate::supervision::MeshFailure;

#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Serialize, Deserialize, Named)]
pub struct PickledMessage {
sender_actor_id: ActorId,
message: ByteBuf,
}

impl std::fmt::Debug for PickledMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"PickledMessage(sender_actor_id: {:?} message: {})",
self.sender_actor_id,
hyperactor::data::HexFmt(self.message.as_slice()),
)
}
}

#[pymethods]
impl PickledMessage {
#[new]
#[pyo3(signature = (*, sender_actor_id, message))]
fn new(sender_actor_id: &PyActorId, message: Vec<u8>) -> Self {
Self {
sender_actor_id: sender_actor_id.into(),
message: ByteBuf::from(message),
}
}

#[getter]
fn sender_actor_id(&self) -> PyActorId {
self.sender_actor_id.clone().into()
}

#[getter]
fn message<'a>(&self, py: Python<'a>) -> Bound<'a, PyBytes> {
PyBytes::new(py, self.message.as_ref())
}

fn serialize(&self) -> PyResult<PySerialized> {
PySerialized::new(self)
}
}

#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
pub struct PickledMessageClientActor {
instance: Arc<Mutex<InstanceWrapper<PickledMessage>>>,
}

#[pymethods]
impl PickledMessageClientActor {
#[new]
fn new(proc: &PyProc, actor_name: &str) -> PyResult<Self> {
Ok(Self {
instance: Arc::new(Mutex::new(InstanceWrapper::new(proc, actor_name)?)),
})
}

/// Send a message to any actor that can receive the corresponding serialized message.
fn send(&self, actor_id: &PyActorId, message: &PySerialized) -> PyResult<()> {
let instance = self.instance.blocking_lock();
instance.send(actor_id, message)
}

/// Get the next message from the queue. It will block until a message is received
/// or the timeout is reached in which case it will return None
/// If the actor has been stopped, this returns an error.
#[pyo3(signature = (*, timeout_msec = None))]
fn get_next_message<'py>(
&mut self,
py: Python<'py>,
timeout_msec: Option<u64>,
) -> PyResult<PyObject> {
let instance = self.instance.clone();
let result = signal_safe_block_on(py, async move {
instance.lock().await.next_message(timeout_msec).await
})?;
Python::with_gil(|py| {
result
.map(|res| res.into_py_any(py))?
.map_err(|err| PyRuntimeError::new_err(err.to_string()))
})
}

/// Stop the background task and return any messages that were received.
/// TODO: This is currently just aborting the task, we should have a better way to stop it.
fn drain_and_stop<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
let mut instance = self.instance.blocking_lock();
let messages = instance
.drain_and_stop()
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?
.into_iter()
.map(|message| message.into_py_any(py))
.collect::<PyResult<Vec<_>>>()?;
PyList::new(py, messages)
}

#[getter]
fn actor_id(&self) -> PyResult<PyActorId> {
let instance = self.instance.blocking_lock();
Ok(PyActorId::from(instance.actor_id().clone()))
}
}

#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum UnflattenArg {
Expand Down Expand Up @@ -1240,8 +1129,6 @@ impl LocalPort {
}

pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
hyperactor_mod.add_class::<PickledMessage>()?;
hyperactor_mod.add_class::<PickledMessageClientActor>()?;
hyperactor_mod.add_class::<PythonActorHandle>()?;
hyperactor_mod.add_class::<PythonMessage>()?;
hyperactor_mod.add_class::<PythonMessageKind>()?;
Expand Down
3 changes: 1 addition & 2 deletions monarch_hyperactor/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,7 @@ impl PySerialized {

/// Wrapper around an instance of an actor that provides utilities to implement
/// a python actor. This helps by allowing users to specialize the actor to the
/// message type they want to handle. [`PickledMessageClientActor``] is a specialization of this
/// for handling messages that are serialized to bytes using pickle.
/// message type they want to handle.
pub struct InstanceWrapper<M: RemoteMessage> {
instance: Instance<()>,
message_receiver: PortReceiver<M>,
Expand Down
78 changes: 0 additions & 78 deletions python/monarch/_rust_bindings/monarch_hyperactor/actor.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -33,84 +33,6 @@ from monarch._rust_bindings.monarch_hyperactor.mailbox import (
from monarch._rust_bindings.monarch_hyperactor.proc import ActorId, Proc, Serialized
from monarch._rust_bindings.monarch_hyperactor.shape import Shape

@final
class PickledMessage:
"""
A message that can be sent to PickledMessage{,Client}Actor. It is a wrapper around
a serialized message and the sender's actor id.

Arguments:
- `sender_actor_id`: The actor id of the sender.
- `message`: The pickled message.
"""

def __init__(self, *, sender_actor_id: ActorId, message: bytes) -> None: ...
@property
def sender_actor_id(self) -> ActorId:
"""The actor id of the sender."""
...

@property
def message(self) -> bytes:
"""The pickled message."""
...

def serialize(self) -> Serialized:
"""Serialize the message into a Serialized object."""
...

@final
class PickledMessageClientActor:
"""
A python based detached actor that can be used to send messages to other
actors and recieve PickledMessage objects from them.

Arguments:
- `proc`: The proc the actor is a part of.
- `actor_name`: Name of the actor.
"""

def __init__(self, proc: Proc, actor_name: str) -> None: ...
def send(self, actor_id: ActorId, message: Serialized) -> None:
"""
Send a message to the actor with the given actor id.

Arguments:
- `actor_id`: The actor id of the actor to send the message to.
- `message`: The message to send.
"""
...

def get_next_message(
self, *, timeout_msec: int | None = None
) -> PickledMessage | None:
"""
Get the next message sent to the actor. If the timeout is reached
before a message is received, None is returned.

Arguments:
- `timeout_msec`: Number of milliseconds to wait for a message.
None means wait forever.
"""
...

def stop_worlds(self, world_names: List[str]) -> None:
"""Stop the system."""
...

def drain_and_stop(self) -> list[PickledMessage]:
"""Stop the actor and drain all messages."""
...

def world_status(self) -> dict[str, str]:
"""Get the world status from the system."""
...

@property
def actor_id(self) -> ActorId:
"""The actor id of the actor."""
...

class PythonMessageKind:
@classmethod
@property
Expand Down
78 changes: 0 additions & 78 deletions python/monarch/_rust_bindings/old.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -183,84 +183,6 @@ class PortId:
"""
...

@final
class PickledMessage:
"""
A message that can be sent to PickledMessage{,Client}Actor. It is a wrapper around
a serialized message and the sender's actor id.

Arguments:
- `sender_actor_id`: The actor id of the sender.
- `message`: The pickled message.
"""

def __init__(self, *, sender_actor_id: ActorId, message: bytes) -> None: ...
@property
def sender_actor_id(self) -> ActorId:
"""The actor id of the sender."""
...

@property
def message(self) -> bytes:
"""The pickled message."""
...

def serialize(self) -> Serialized:
"""Serialize the message into a Serialized object."""
...

@final
class PickledMessageClientActor:
"""
A python based detached actor that can be used to send messages to other
actors and recieve PickledMessage objects from them.

Arguments:
- `proc`: The proc the actor is a part of.
- `actor_name`: Name of the actor.
"""

def __init__(self, proc: Proc, actor_name: str) -> None: ...
def send(self, actor_id: ActorId, message: Serialized) -> None:
"""
Send a message to the actor with the given actor id.

Arguments:
- `actor_id`: The actor id of the actor to send the message to.
- `message`: The message to send.
"""
...

def get_next_message(
self, *, timeout_msec: int | None = None
) -> PickledMessage | None:
"""
Get the next message sent to the actor. If the timeout is reached
before a message is received, None is returned.

Arguments:
- `timeout_msec`: Number of milliseconds to wait for a message.
None means wait forever.
"""
...

def stop_worlds(self, world_names: List[str]) -> None:
"""Stop the system."""
...

def drain_and_stop(self) -> list[PickledMessage]:
"""Stop the actor and drain all messages."""
...

def world_status(self) -> dict[str, str]:
"""Get the world status from the system."""
...

@property
def actor_id(self) -> ActorId:
"""The actor id of the actor."""
...

@final
class Proc:
"""
Expand Down