From 61a38081cc8bbb7a0bb32ba81f074669e73d7ae4 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Fri, 19 Dec 2025 10:44:22 -0800 Subject: [PATCH] delete unused PickledMessageClientActor Summary: Dead code? Differential Revision: D89554678 --- monarch_hyperactor/Cargo.toml | 1 - monarch_hyperactor/src/actor.rs | 113 ------------------ monarch_hyperactor/src/proc.rs | 3 +- .../monarch_hyperactor/actor.pyi | 78 ------------ python/monarch/_rust_bindings/old.pyi | 78 ------------ 5 files changed, 1 insertion(+), 272 deletions(-) diff --git a/monarch_hyperactor/Cargo.toml b/monarch_hyperactor/Cargo.toml index a2794dd1d..18360af60 100644 --- a/monarch_hyperactor/Cargo.toml +++ b/monarch_hyperactor/Cargo.toml @@ -49,7 +49,6 @@ opentelemetry = "0.29" pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] } pyo3-async-runtimes = { version = "0.24", features = ["attributes", "tokio-runtime"] } serde = { version = "1.0.219", features = ["derive", "rc"] } -serde_bytes = "0.11" serde_multipart = { version = "0.0.0", path = "../serde_multipart" } tempfile = "3.22" tokio = { version = "1.47.1", features = ["full", "test-util", "tracing"] } diff --git a/monarch_hyperactor/src/actor.rs b/monarch_hyperactor/src/actor.rs index ea1189dab..8e5cafabc 100644 --- a/monarch_hyperactor/src/actor.rs +++ b/monarch_hyperactor/src/actor.rs @@ -8,7 +8,6 @@ use std::error::Error; use std::future::pending; -use std::sync::Arc; use std::sync::OnceLock; use async_trait::async_trait; @@ -57,9 +56,7 @@ use pyo3::types::PyList; use pyo3::types::PyType; use serde::Deserialize; use serde::Serialize; -use serde_bytes::ByteBuf; use serde_multipart::Part; -use tokio::sync::Mutex; use tokio::sync::oneshot; use tracing::Instrument; @@ -76,119 +73,11 @@ use crate::metrics::ENDPOINT_ACTOR_COUNT; use crate::metrics::ENDPOINT_ACTOR_ERROR; use crate::metrics::ENDPOINT_ACTOR_LATENCY_US_HISTOGRAM; use crate::metrics::ENDPOINT_ACTOR_PANIC; -use crate::proc::InstanceWrapper; use crate::proc::PyActorId; -use crate::proc::PyProc; -use crate::proc::PySerialized; use crate::pytokio::PythonTask; use crate::runtime::get_tokio_runtime; -use crate::runtime::signal_safe_block_on; use crate::supervision::MeshFailure; -#[pyclass(frozen, module = "monarch._rust_bindings.monarch_hyperactor.actor")] -#[derive(Serialize, Deserialize, Named)] -pub struct PickledMessage { - sender_actor_id: ActorId, - message: ByteBuf, -} - -impl std::fmt::Debug for PickledMessage { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "PickledMessage(sender_actor_id: {:?} message: {})", - self.sender_actor_id, - hyperactor::data::HexFmt(self.message.as_slice()), - ) - } -} - -#[pymethods] -impl PickledMessage { - #[new] - #[pyo3(signature = (*, sender_actor_id, message))] - fn new(sender_actor_id: &PyActorId, message: Vec) -> Self { - Self { - sender_actor_id: sender_actor_id.into(), - message: ByteBuf::from(message), - } - } - - #[getter] - fn sender_actor_id(&self) -> PyActorId { - self.sender_actor_id.clone().into() - } - - #[getter] - fn message<'a>(&self, py: Python<'a>) -> Bound<'a, PyBytes> { - PyBytes::new(py, self.message.as_ref()) - } - - fn serialize(&self) -> PyResult { - PySerialized::new(self) - } -} - -#[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")] -pub struct PickledMessageClientActor { - instance: Arc>>, -} - -#[pymethods] -impl PickledMessageClientActor { - #[new] - fn new(proc: &PyProc, actor_name: &str) -> PyResult { - Ok(Self { - instance: Arc::new(Mutex::new(InstanceWrapper::new(proc, actor_name)?)), - }) - } - - /// Send a message to any actor that can receive the corresponding serialized message. - fn send(&self, actor_id: &PyActorId, message: &PySerialized) -> PyResult<()> { - let instance = self.instance.blocking_lock(); - instance.send(actor_id, message) - } - - /// Get the next message from the queue. It will block until a message is received - /// or the timeout is reached in which case it will return None - /// If the actor has been stopped, this returns an error. - #[pyo3(signature = (*, timeout_msec = None))] - fn get_next_message<'py>( - &mut self, - py: Python<'py>, - timeout_msec: Option, - ) -> PyResult { - let instance = self.instance.clone(); - let result = signal_safe_block_on(py, async move { - instance.lock().await.next_message(timeout_msec).await - })?; - Python::with_gil(|py| { - result - .map(|res| res.into_py_any(py))? - .map_err(|err| PyRuntimeError::new_err(err.to_string())) - }) - } - - /// Stop the background task and return any messages that were received. - /// TODO: This is currently just aborting the task, we should have a better way to stop it. - fn drain_and_stop<'py>(&mut self, py: Python<'py>) -> PyResult> { - let mut instance = self.instance.blocking_lock(); - let messages = instance - .drain_and_stop() - .map_err(|err| PyRuntimeError::new_err(err.to_string()))? - .into_iter() - .map(|message| message.into_py_any(py)) - .collect::>>()?; - PyList::new(py, messages) - } - - #[getter] - fn actor_id(&self) -> PyResult { - let instance = self.instance.blocking_lock(); - Ok(PyActorId::from(instance.actor_id().clone())) - } -} - #[pyclass(module = "monarch._rust_bindings.monarch_hyperactor.actor")] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub enum UnflattenArg { @@ -1240,8 +1129,6 @@ impl LocalPort { } pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> { - hyperactor_mod.add_class::()?; - hyperactor_mod.add_class::()?; hyperactor_mod.add_class::()?; hyperactor_mod.add_class::()?; hyperactor_mod.add_class::()?; diff --git a/monarch_hyperactor/src/proc.rs b/monarch_hyperactor/src/proc.rs index 37efb6322..7bb895c06 100644 --- a/monarch_hyperactor/src/proc.rs +++ b/monarch_hyperactor/src/proc.rs @@ -306,8 +306,7 @@ impl PySerialized { /// Wrapper around an instance of an actor that provides utilities to implement /// a python actor. This helps by allowing users to specialize the actor to the -/// message type they want to handle. [`PickledMessageClientActor``] is a specialization of this -/// for handling messages that are serialized to bytes using pickle. +/// message type they want to handle. pub struct InstanceWrapper { instance: Instance<()>, message_receiver: PortReceiver, diff --git a/python/monarch/_rust_bindings/monarch_hyperactor/actor.pyi b/python/monarch/_rust_bindings/monarch_hyperactor/actor.pyi index 796a1ea15..ee2fe4965 100644 --- a/python/monarch/_rust_bindings/monarch_hyperactor/actor.pyi +++ b/python/monarch/_rust_bindings/monarch_hyperactor/actor.pyi @@ -33,84 +33,6 @@ from monarch._rust_bindings.monarch_hyperactor.mailbox import ( from monarch._rust_bindings.monarch_hyperactor.proc import ActorId, Proc, Serialized from monarch._rust_bindings.monarch_hyperactor.shape import Shape -@final -class PickledMessage: - """ - A message that can be sent to PickledMessage{,Client}Actor. It is a wrapper around - a serialized message and the sender's actor id. - - Arguments: - - `sender_actor_id`: The actor id of the sender. - - `message`: The pickled message. - """ - - def __init__(self, *, sender_actor_id: ActorId, message: bytes) -> None: ... - @property - def sender_actor_id(self) -> ActorId: - """The actor id of the sender.""" - ... - - @property - def message(self) -> bytes: - """The pickled message.""" - ... - - def serialize(self) -> Serialized: - """Serialize the message into a Serialized object.""" - ... - -@final -class PickledMessageClientActor: - """ - A python based detached actor that can be used to send messages to other - actors and recieve PickledMessage objects from them. - - Arguments: - - `proc`: The proc the actor is a part of. - - `actor_name`: Name of the actor. - """ - - def __init__(self, proc: Proc, actor_name: str) -> None: ... - def send(self, actor_id: ActorId, message: Serialized) -> None: - """ - Send a message to the actor with the given actor id. - - Arguments: - - `actor_id`: The actor id of the actor to send the message to. - - `message`: The message to send. - """ - ... - - def get_next_message( - self, *, timeout_msec: int | None = None - ) -> PickledMessage | None: - """ - Get the next message sent to the actor. If the timeout is reached - before a message is received, None is returned. - - Arguments: - - `timeout_msec`: Number of milliseconds to wait for a message. - None means wait forever. - """ - ... - - def stop_worlds(self, world_names: List[str]) -> None: - """Stop the system.""" - ... - - def drain_and_stop(self) -> list[PickledMessage]: - """Stop the actor and drain all messages.""" - ... - - def world_status(self) -> dict[str, str]: - """Get the world status from the system.""" - ... - - @property - def actor_id(self) -> ActorId: - """The actor id of the actor.""" - ... - class PythonMessageKind: @classmethod @property diff --git a/python/monarch/_rust_bindings/old.pyi b/python/monarch/_rust_bindings/old.pyi index 70d00b0f1..5e1e9c2b0 100644 --- a/python/monarch/_rust_bindings/old.pyi +++ b/python/monarch/_rust_bindings/old.pyi @@ -183,84 +183,6 @@ class PortId: """ ... -@final -class PickledMessage: - """ - A message that can be sent to PickledMessage{,Client}Actor. It is a wrapper around - a serialized message and the sender's actor id. - - Arguments: - - `sender_actor_id`: The actor id of the sender. - - `message`: The pickled message. - """ - - def __init__(self, *, sender_actor_id: ActorId, message: bytes) -> None: ... - @property - def sender_actor_id(self) -> ActorId: - """The actor id of the sender.""" - ... - - @property - def message(self) -> bytes: - """The pickled message.""" - ... - - def serialize(self) -> Serialized: - """Serialize the message into a Serialized object.""" - ... - -@final -class PickledMessageClientActor: - """ - A python based detached actor that can be used to send messages to other - actors and recieve PickledMessage objects from them. - - Arguments: - - `proc`: The proc the actor is a part of. - - `actor_name`: Name of the actor. - """ - - def __init__(self, proc: Proc, actor_name: str) -> None: ... - def send(self, actor_id: ActorId, message: Serialized) -> None: - """ - Send a message to the actor with the given actor id. - - Arguments: - - `actor_id`: The actor id of the actor to send the message to. - - `message`: The message to send. - """ - ... - - def get_next_message( - self, *, timeout_msec: int | None = None - ) -> PickledMessage | None: - """ - Get the next message sent to the actor. If the timeout is reached - before a message is received, None is returned. - - Arguments: - - `timeout_msec`: Number of milliseconds to wait for a message. - None means wait forever. - """ - ... - - def stop_worlds(self, world_names: List[str]) -> None: - """Stop the system.""" - ... - - def drain_and_stop(self) -> list[PickledMessage]: - """Stop the actor and drain all messages.""" - ... - - def world_status(self) -> dict[str, str]: - """Get the world status from the system.""" - ... - - @property - def actor_id(self) -> ActorId: - """The actor id of the actor.""" - ... - @final class Proc: """