From 0b0a0464c9f838105e9b15087dd80977bf52e196 Mon Sep 17 00:00:00 2001
From: Bart Platak
Date: Thu, 2 Oct 2025 16:11:06 +0100
Subject: [PATCH 1/2] feat(reliability) Add metrics

---
 pyth_observer/__init__.py | 178 ++++++++++++++++++-------
 pyth_observer/dispatch.py |  78 +++++++----
 pyth_observer/metrics.py  | 265 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 447 insertions(+), 74 deletions(-)
 create mode 100644 pyth_observer/metrics.py

diff --git a/pyth_observer/__init__.py b/pyth_observer/__init__.py
index f3d94ba..32a7171 100644
--- a/pyth_observer/__init__.py
+++ b/pyth_observer/__init__.py
@@ -24,6 +24,7 @@
 from pyth_observer.crosschain import CrosschainPrice
 from pyth_observer.crosschain import CrosschainPriceObserver as Crosschain
 from pyth_observer.dispatch import Dispatch
+from pyth_observer.metrics import metrics
 from pyth_observer.models import Publisher
 
 PYTHTEST_HTTP_ENDPOINT = "https://api.pythtest.pyth.network/"
@@ -71,7 +72,16 @@ def __init__(
         self.crosschain_throttler = Throttler(rate_limit=1, period=1)
         self.coingecko_mapping = coingecko_mapping
 
+        metrics.set_observer_info(
+            network=config["network"]["name"],
+            config=config,
+        )
+
+        metrics.observer_up = 1
+
     async def run(self):
+        # global states
+        states = []
         while True:
             try:
                 logger.info("Running checks")
@@ -81,6 +91,10 @@ async def run(self):
                 crosschain_prices = await self.get_crosschain_prices()
                 health_server.observer_ready = True
+                metrics.observer_ready = 1
+
+                processed_feeds = 0
+                active_publishers_by_symbol = {}
 
                 for product in products:
                     # Skip tombstone accounts with blank metadata
@@ -121,80 +135,139 @@ async def run(self):
                         if not price_account.aggregate_price_info:
                             raise RuntimeError("Aggregate price info is missing")
 
-                        states.append(
-                            PriceFeedState(
-                                symbol=product.attrs["symbol"],
-                                asset_type=product.attrs["asset_type"],
-                                schedule=MarketSchedule(product.attrs["schedule"]),
-                                public_key=price_account.key,
-                                status=price_account.aggregate_price_status,
-                                # this is the solana block slot when price account was fetched
-                                latest_block_slot=latest_block_slot,
-                                latest_trading_slot=price_account.last_slot,
-                                price_aggregate=price_account.aggregate_price_info.price,
-                                confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
-                                coingecko_price=coingecko_prices.get(
-                                    product.attrs["base"]
-                                ),
-                                coingecko_update=coingecko_updates.get(
-                                    product.attrs["base"]
-                                ),
-                                crosschain_price=crosschain_price,
-                            )
+                        price_feed_state = PriceFeedState(
+                            symbol=product.attrs["symbol"],
+                            asset_type=product.attrs["asset_type"],
+                            schedule=MarketSchedule(product.attrs["schedule"]),
+                            public_key=price_account.key,
+                            status=price_account.aggregate_price_status,
+                            # this is the solana block slot when price account was fetched
+                            latest_block_slot=latest_block_slot,
+                            latest_trading_slot=price_account.last_slot,
+                            price_aggregate=price_account.aggregate_price_info.price,
+                            confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
+                            coingecko_price=coingecko_prices.get(product.attrs["base"]),
+                            coingecko_update=coingecko_updates.get(
+                                product.attrs["base"]
+                            ),
+                            crosschain_price=crosschain_price,
                         )
+                        states.append(price_feed_state)
+                        processed_feeds += 1
+
+                        metrics.update_price_feed_metrics(price_feed_state)
+
+                        symbol = product.attrs["symbol"]
+                        if symbol not in active_publishers_by_symbol:
+                            active_publishers_by_symbol[symbol] = {
+                                "count": 0,
+                                "asset_type": product.attrs["asset_type"],
+                            }
+
                         for component in price_account.price_components:
                             pub = self.publishers.get(component.publisher_key.key, None)
                             publisher_name = (
                                 (pub.name if pub else "") + f" ({component.publisher_key.key})"
                             ).strip()
-                            states.append(
-                                PublisherState(
-                                    publisher_name=publisher_name,
-                                    symbol=product.attrs["symbol"],
-                                    asset_type=product.attrs["asset_type"],
-                                    schedule=MarketSchedule(product.attrs["schedule"]),
-                                    public_key=component.publisher_key,
-                                    confidence_interval=component.latest_price_info.confidence_interval,
-                                    confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
-                                    price=component.latest_price_info.price,
-                                    price_aggregate=price_account.aggregate_price_info.price,
-                                    slot=component.latest_price_info.pub_slot,
-                                    aggregate_slot=price_account.last_slot,
-                                    # this is the solana block slot when price account was fetched
-                                    latest_block_slot=latest_block_slot,
-                                    status=component.latest_price_info.price_status,
-                                    aggregate_status=price_account.aggregate_price_status,
-                                )
+
+                            publisher_state = PublisherState(
+                                publisher_name=publisher_name,
+                                symbol=product.attrs["symbol"],
+                                asset_type=product.attrs["asset_type"],
+                                schedule=MarketSchedule(product.attrs["schedule"]),
+                                public_key=component.publisher_key,
+                                confidence_interval=component.latest_price_info.confidence_interval,
+                                confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval,
+                                price=component.latest_price_info.price,
+                                price_aggregate=price_account.aggregate_price_info.price,
+                                slot=component.latest_price_info.pub_slot,
+                                aggregate_slot=price_account.last_slot,
+                                # this is the solana block slot when price account was fetched
+                                latest_block_slot=latest_block_slot,
+                                status=component.latest_price_info.price_status,
+                                aggregate_status=price_account.aggregate_price_status,
                             )
-                await self.dispatch.run(states)
+                            states.append(publisher_state)
+                            active_publishers_by_symbol[symbol]["count"] += 1
+
+                metrics.price_feeds_processed.set(processed_feeds)
+
+                for symbol, info in active_publishers_by_symbol.items():
+                    metrics.publishers_active.labels(
+                        symbol=symbol, asset_type=info["asset_type"]
+                    ).set(info["count"])
+
+                await self.dispatch.run(states)
+
             except Exception as e:
                 logger.error(f"Error in run loop: {e}")
                 health_server.observer_ready = False
+                metrics.observer_ready = 0
+                metrics.loop_errors_total.labels(error_type=type(e).__name__).inc()
 
-            logger.debug("Sleeping...")
+            metrics.observer_ready = 0
             await asyncio.sleep(5)
 
     async def get_pyth_products(self) -> List[PythProductAccount]:
         logger.debug("Fetching Pyth product accounts...")
-        async with self.pyth_throttler:
-            return await self.pyth_client.refresh_products()
+        try:
+            async with self.pyth_throttler:
+                with metrics.time_operation(
+                    metrics.api_request_duration, service="pyth", endpoint="products"
+                ):
+                    result = await self.pyth_client.refresh_products()
+            metrics.api_request_total.labels(
+                service="pyth", endpoint="products", status="success"
+            ).inc()
+            return result
+        except Exception:
+            metrics.api_request_total.labels(
+                service="pyth", endpoint="products", status="error"
+            ).inc()
+            raise
 
     async def get_pyth_prices(
         self, product: PythProductAccount
     ) -> Dict[PythPriceType, PythPriceAccount]:
         logger.debug("Fetching Pyth price accounts...")
-        async with self.pyth_throttler:
-            return await product.refresh_prices()
+        try:
+            async with self.pyth_throttler:
+                with metrics.time_operation(
+                    metrics.api_request_duration, service="pyth", endpoint="prices"
+                ):
+                    result = await product.refresh_prices()
+            metrics.api_request_total.labels(
+                service="pyth", endpoint="prices", status="success"
+            ).inc()
+            return result
+        except Exception:
+            metrics.api_request_total.labels(
+                service="pyth", endpoint="prices", status="error"
+            ).inc()
+            raise
 
     async def get_coingecko_prices(self):
         logger.debug("Fetching CoinGecko prices...")
-        data = await get_coingecko_prices(self.coingecko_mapping)
+        try:
+            with metrics.time_operation(
+                metrics.api_request_duration, service="coingecko", endpoint="prices"
+            ):
+                data = await get_coingecko_prices(self.coingecko_mapping)
+            metrics.api_request_total.labels(
+                service="coingecko", endpoint="prices", status="success"
+            ).inc()
+        except Exception:
+            metrics.api_request_total.labels(
+                service="coingecko", endpoint="prices", status="error"
+            ).inc()
+            raise
+
         prices: Dict[str, float] = {}
         updates: Dict[str, int] = {}  # Unix timestamps
@@ -205,4 +278,17 @@ async def get_coingecko_prices(self):
         return (prices, updates)
 
     async def get_crosschain_prices(self) -> Dict[str, CrosschainPrice]:
-        return await self.crosschain.get_crosschain_prices()
+        try:
+            with metrics.time_operation(
+                metrics.api_request_duration, service="crosschain", endpoint="prices"
+            ):
+                result = await self.crosschain.get_crosschain_prices()
+            metrics.api_request_total.labels(
+                service="crosschain", endpoint="prices", status="success"
+            ).inc()
+            return result
+        except Exception:
+            metrics.api_request_total.labels(
+                service="crosschain", endpoint="prices", status="error"
+            ).inc()
+            raise
diff --git a/pyth_observer/dispatch.py b/pyth_observer/dispatch.py
index de5fb73..f6d0171 100644
--- a/pyth_observer/dispatch.py
+++ b/pyth_observer/dispatch.py
@@ -6,7 +6,6 @@
 from typing import Any, Awaitable, Dict, List
 
 from loguru import logger
-from prometheus_client import Gauge
 
 from pyth_observer.check import Check, State
 from pyth_observer.check.price_feed import PRICE_FEED_CHECKS, PriceFeedState
@@ -15,6 +14,7 @@
 from pyth_observer.event import LogEvent  # Used dynamically
 from pyth_observer.event import TelegramEvent  # Used dynamically
 from pyth_observer.event import Context, Event, ZendutyEvent
+from pyth_observer.metrics import metrics
 from pyth_observer.zenduty import send_zenduty_alert
 
 assert DatadogEvent
@@ -32,16 +32,6 @@ class Dispatch:
     def __init__(self, config, publishers):
         self.config = config
         self.publishers = publishers
-        self.price_feed_check_gauge = Gauge(
-            "price_feed_check_failed",
-            "Price feed check failure status",
-            ["check", "symbol"],
-        )
-        self.publisher_check_gauge = Gauge(
-            "publisher_check_failed",
-            "Publisher check failure status",
-            ["check", "symbol", "publisher"],
-        )
         if "ZendutyEvent" in self.config["events"]:
             self.open_alerts_file = os.environ["OPEN_ALERTS_FILE"]
             self.open_alerts = self.load_alerts()
@@ -98,48 +88,70 @@ async def run(self, states: List[State]):
                     sent_events.append(event.send())
 
         await asyncio.gather(*sent_events)
+
+        metrics.update_alert_metrics(self.open_alerts)
+
         if "ZendutyEvent" in self.config["events"]:
             await self.process_zenduty_events(current_time)
 
     def check_price_feed(self, state: PriceFeedState) -> List[Check]:
         failed_checks: List[Check] = []
+        total_checks = 0
+        passed_checks = 0
 
         for check_class in PRICE_FEED_CHECKS:
             config = self.load_config(check_class.__name__, state.symbol)
-            check = check_class(state, config)
-            gauge = self.price_feed_check_gauge.labels(
-                check=check_class.__name__,
-                symbol=state.symbol,
-            )
 
             if config["enable"]:
-                if check.run():
-                    gauge.set(0)
+                total_checks += 1
+                check = check_class(state, config)
+
+                with metrics.time_operation(
+                    metrics.check_execution_duration, check_type=check_class.__name__
+                ):
+                    check_passed = check.run()
+
+                if check_passed:
+                    passed_checks += 1
                 else:
                     failed_checks.append(check)
-                    gauge.set(1)
+
+        if total_checks > 0:
+            success_rate = passed_checks / total_checks
+            metrics.check_success_rate.labels(
+                check_type="price_feed", symbol=state.symbol
+            ).set(success_rate)
 
         return failed_checks
 
     def check_publisher(self, state: PublisherState) -> List[Check]:
         failed_checks: List[Check] = []
+        total_checks = 0
+        passed_checks = 0
 
         for check_class in PUBLISHER_CHECKS:
             config = self.load_config(check_class.__name__, state.symbol)
-            check = check_class(state, config)
-            gauge = self.publisher_check_gauge.labels(
-                check=check_class.__name__,
-                symbol=state.symbol,
-                publisher=self.publishers.get(state.public_key, state.public_key),
-            )
 
             if config["enable"]:
-                if check.run():
-                    gauge.set(0)
+                total_checks += 1
+                check = check_class(state, config)
+
+                with metrics.time_operation(
+                    metrics.check_execution_duration, check_type=check_class.__name__
+                ):
+                    check_passed = check.run()
+
+                if check_passed:
+                    passed_checks += 1
                 else:
-                    gauge.set(1)
                     failed_checks.append(check)
 
+        if total_checks > 0:
+            success_rate = passed_checks / total_checks
+            metrics.check_success_rate.labels(
+                check_type="publisher", symbol=state.symbol
+            ).set(success_rate)
+
         return failed_checks
 
     def load_config(self, check_name: str, symbol: str) -> Dict[str, Any]:
@@ -187,12 +199,16 @@ async def process_zenduty_events(self, current_time):
                 ):
                     logger.debug(f"Resolving Zenduty alert {identifier}")
                     resolved = True
+
                 if info["sent"]:
                     response = await send_zenduty_alert(
                         identifier, identifier, resolved=True
                     )
                     if response and 200 <= response.status < 300:
                         to_remove.append(identifier)
+                        metrics.alerts_sent_total.labels(
+                            alert_type=info["type"], channel="zenduty"
+                        ).inc()
                 else:
                     to_remove.append(identifier)
             # Raise alert if failed > $threshold times within the last 5m window
@@ -216,6 +232,10 @@
                 event = self.delayed_events.get(key)
                 if event:
                     to_alert.append(event.send())
+                    metrics.alerts_sent_total.labels(
+                        alert_type=info["type"],
+                        channel=event_type.lower().replace("event", ""),
+                    ).inc()
 
         # Send the alerts that were delayed due to thresholds
         await asyncio.gather(*to_alert)
@@ -229,5 +249,7 @@
             if self.delayed_events.get(key):
                 del self.delayed_events[key]
 
+        metrics.update_alert_metrics(self.open_alerts)
+
         with open(self.open_alerts_file, "w") as file:
             json.dump(self.open_alerts, file)
diff --git a/pyth_observer/metrics.py b/pyth_observer/metrics.py
new file mode 100644
index 0000000..e06e8e3
--- /dev/null
+++ b/pyth_observer/metrics.py
@@ -0,0 +1,265 @@
+import time
+from contextlib import contextmanager
+from typing import Any, Dict, Optional
+
+from prometheus_client import (
+    REGISTRY,
+    CollectorRegistry,
+    Counter,
+    Gauge,
+    Histogram,
+    Info,
+)
+
+
+class PythObserverMetrics:
+    observer_ready = Gauge("pyth_observer_up", "Pyth observer is up")
+
+    def __init__(self, registry: CollectorRegistry = REGISTRY):
+        self.registry = registry
+
+        self.observer_info = Info(
+            "pyth_observer_info",
+            "Information about the Pyth Observer instance",
+            registry=registry,
+        )
+
+        self.check_execution_duration = Histogram(
+            "pyth_observer_check_execution_duration_seconds",
+            "Time spent executing checks",
+            ["check_type"],
+            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0],
+            registry=registry,
+        )
+
+        self.loop_errors_total = Counter(
+            "pyth_observer_loop_errors_total",
+            "Total number of errors in observation loop",
+            ["error_type"],
+            registry=registry,
+        )
+
+        self.price_feeds_processed = Gauge(
+            "pyth_observer_price_feeds_processed_total",
+            "Total number of price feeds processed in last cycle",
+            registry=registry,
+        )
+
+        self.price_feed_status = Gauge(
+            "pyth_observer_price_feed_status",
+            "Status of price feeds (1=trading, 0=not trading)",
+            ["symbol", "asset_type"],
+            registry=registry,
+        )
+
+        self.price_feed_staleness = Gauge(
+            "pyth_observer_price_feed_staleness_slots",
+            "Number of slots since last price update",
+            ["symbol", "asset_type"],
+            registry=registry,
+        )
+
+        self.price_feed_confidence_interval = Gauge(
+            "pyth_observer_price_feed_confidence_interval",
+            "Price feed confidence interval",
+            ["symbol", "asset_type"],
+            registry=registry,
+        )
+
+        self.check_success_rate = Gauge(
+            "pyth_observer_check_success_rate",
+            "Success rate of checks (0-1)",
+            ["check_type", "symbol"],
+            registry=registry,
+        )
+
+        self.price_deviation_from_coingecko = Gauge(
+            "pyth_observer_price_deviation_from_coingecko_percent",
+            "Price deviation from CoinGecko as percentage",
+            ["symbol"],
+            registry=registry,
+        )
+
+        self.coingecko_price_age = Gauge(
+            "pyth_observer_coingecko_price_age_seconds",
+            "Age of CoinGecko price data in seconds",
+            ["symbol"],
+            registry=registry,
+        )
+
+        self.publishers_active = Gauge(
+            "pyth_observer_publishers_active_total",
+            "Number of active publishers for a symbol",
+            ["symbol", "asset_type"],
+            registry=registry,
+        )
+
+        self.alerts_active = Gauge(
+            "pyth_observer_alerts_active_total",
+            "Number of currently active alerts",
+            ["alert_type"],
+            registry=registry,
+        )
+
+        self.alerts_sent_total = Counter(
+            "pyth_observer_alerts_sent_total",
+            "Total number of alerts sent",
+            ["alert_type", "channel"],
+            registry=registry,
+        )
+
+        self.api_request_duration = Histogram(
+            "pyth_observer_api_request_duration_seconds",
+            "Duration of external API requests",
+            ["service", "endpoint"],
+            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
+            registry=registry,
+        )
+
+        self.api_request_total = Counter(
+            "pyth_observer_api_requests_total",
+            "Total number of API requests",
+            ["service", "endpoint", "status"],
+            registry=registry,
+        )
+
+        self.api_rate_limit_hits = Counter(
+            "pyth_observer_api_rate_limit_hits_total",
+            "Number of times rate limits were hit",
+            ["service"],
+            registry=registry,
+        )
+
+        self.crosschain_price_age = Gauge(
+            "pyth_observer_crosschain_price_age_seconds",
+            "Age of cross-chain price data in seconds",
+            ["symbol"],
+            registry=registry,
+        )
+
+        self.latest_block_slot = Gauge(
+            "pyth_observer_latest_block_slot",
+            "Latest Solana block slot observed",
+            registry=registry,
+        )
+
+        self.network_connection_status = Gauge(
+            "pyth_observer_network_connection_status",
+            "Network connection status (1=connected, 0=disconnected)",
+            ["network", "endpoint_type"],
+            registry=registry,
+        )
+
+        self.observer_up = 1
+        self.observer_ready = 0
+
+    def set_observer_info(self, network: str, config: Dict[str, Any]):
+        """Set static information about the observer instance."""
+        self.observer_info.info(
+            {
+                "network": network,
+                "checks_enabled": str(
+                    len(
+                        [
+                            c
+                            for c in config.get("checks", {}).get("global", {})
+                            if config["checks"]["global"][c].get("enable", False)
+                        ]
+                    )
+                ),
+                "event_handlers": ",".join(config.get("events", [])),
+            }
+        )
+
+    @contextmanager
+    def time_operation(self, metric: Histogram, **labels):
+        start_time = time.time()
+        try:
+            yield
+        finally:
+            duration = time.time() - start_time
+            metric.labels(**labels).observe(duration)
+
+    def update_price_feed_metrics(self, state):
+        labels = {"symbol": state.symbol, "asset_type": state.asset_type}
+
+        from pythclient.pythaccounts import PythPriceStatus
+
+        status_value = 1 if state.status == PythPriceStatus.TRADING else 0
+        self.price_feed_status.labels(**labels).set(status_value)
+
+        staleness = state.latest_block_slot - state.latest_trading_slot
+        self.price_feed_staleness.labels(**labels).set(staleness)
+
+        self.price_feed_confidence_interval.labels(**labels).set(
+            state.confidence_interval_aggregate
+        )
+
+        if state.coingecko_price:
+            deviation = (
+                abs(state.price_aggregate - state.coingecko_price)
+                / state.coingecko_price
+                * 100
+            )
+            self.price_deviation_from_coingecko.labels(symbol=state.symbol).set(
+                deviation
+            )
+
+        if state.coingecko_update:
+            age = time.time() - state.coingecko_update
+            self.coingecko_price_age.labels(symbol=state.symbol).set(age)
+
+        if state.crosschain_price and state.crosschain_price.get("publish_time"):
+            age = (
+                state.crosschain_price["snapshot_time"]
+                - state.crosschain_price["publish_time"]
+            )
+            self.crosschain_price_age.labels(symbol=state.symbol).set(age)
+
+        self.latest_block_slot.set(state.latest_block_slot)
+
+    def record_api_request(
+        self,
+        service: str,
+        endpoint: str,
+        duration: float,
+        status_code: int,
+        rate_limited: bool = False,
+    ):
+        status = "success" if 200 <= status_code < 300 else "error"
+
+        self.api_request_duration.labels(service=service, endpoint=endpoint).observe(
+            duration
+        )
+        self.api_request_total.labels(
+            service=service, endpoint=endpoint, status=status
+        ).inc()
+
+        if rate_limited:
+            self.api_rate_limit_hits.labels(service=service).inc()
+
+    def update_alert_metrics(
+        self, active_alerts: Dict[str, Any], sent_alert: Optional[str] = None
+    ):
+        alert_counts = {}
+        for alert_id, alert_info in active_alerts.items():
+            alert_type = alert_info.get("type", "unknown")
+            alert_counts[alert_type] = alert_counts.get(alert_type, 0) + 1
+
+        for alert_type, count in alert_counts.items():
+            self.alerts_active.labels(alert_type=alert_type).set(count)
+
+        if sent_alert:
+            alert_type = sent_alert.split("-")[0]
+            self.alerts_sent_total.labels(
+                alert_type=alert_type, channel="configured"
+            ).inc()
+
+    def set_network_status(self, network: str, endpoint_type: str, connected: bool):
+        status = 1 if connected else 0
+        self.network_connection_status.labels(
+            network=network, endpoint_type=endpoint_type
+        ).set(status)
+
+
+metrics = PythObserverMetrics()

From dea0c1dd6380cca764507cd011cdd97c9d2ad624 Mon Sep 17 00:00:00 2001
From: Bart Platak
Date: Thu, 2 Oct 2025 16:14:08 +0100
Subject: [PATCH 2/2] cleanup

---
 pyth_observer/__init__.py | 6 ------
 pyth_observer/metrics.py  | 5 -----
 2 files changed, 11 deletions(-)

diff --git a/pyth_observer/__init__.py b/pyth_observer/__init__.py
index 32a7171..36d41ad 100644
--- a/pyth_observer/__init__.py
+++ b/pyth_observer/__init__.py
@@ -77,8 +77,6 @@ def __init__(
             config=config,
         )
 
-        metrics.observer_up = 1
-
     async def run(self):
         # global states
         states = []
@@ -91,7 +89,6 @@ async def run(self):
                 crosschain_prices = await self.get_crosschain_prices()
                 health_server.observer_ready = True
-                metrics.observer_ready = 1
 
                 processed_feeds = 0
                 active_publishers_by_symbol = {}
 
@@ -205,10 +202,7 @@ async def run(self):
             except Exception as e:
                 logger.error(f"Error in run loop: {e}")
                 health_server.observer_ready = False
-                metrics.observer_ready = 0
                 metrics.loop_errors_total.labels(error_type=type(e).__name__).inc()
-
-            metrics.observer_ready = 0
             await asyncio.sleep(5)
 
     async def get_pyth_products(self) -> List[PythProductAccount]:
diff --git a/pyth_observer/metrics.py b/pyth_observer/metrics.py
index e06e8e3..8d331f9 100644
--- a/pyth_observer/metrics.py
+++ b/pyth_observer/metrics.py
@@ -13,8 +13,6 @@
 
 
 class PythObserverMetrics:
-    observer_ready = Gauge("pyth_observer_up", "Pyth observer is up")
-
     def __init__(self, registry: CollectorRegistry = REGISTRY):
         self.registry = registry
 
@@ -150,9 +148,6 @@ def __init__(self, registry: CollectorRegistry = REGISTRY):
             registry=registry,
         )
 
-        self.observer_up = 1
-        self.observer_ready = 0
-
     def set_observer_info(self, network: str, config: Dict[str, Any]):
         """Set static information about the observer instance."""
         self.observer_info.info(