From 8d0c58fdd2943719237cdc8520403b75fd3f923d Mon Sep 17 00:00:00 2001 From: SakshiKekre Date: Fri, 27 Jun 2025 11:25:20 -0700 Subject: [PATCH 1/3] added draft endpoint and service layer for economy simulation --- .../policyengine_api/api/routers/economy.py | 44 +++++++++ .../api/services/simulation_runner.py | 91 +++++++++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py create mode 100644 projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py diff --git a/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py b/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py new file mode 100644 index 00000000..dac40e5f --- /dev/null +++ b/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py @@ -0,0 +1,44 @@ +# economy_simulation.py +from fastapi import APIRouter, Query, Body, HTTPException +from typing import Literal +from policyengine_api.services.simulation_runner import SimulationRunner + +router = APIRouter() +runner = SimulationRunner() + +@router.post("/{country_id}/economy/{policy_id}/over/{baseline_policy_id}") +async def start_simulation( + country_id: str, + policy_id: int, + baseline_policy_id: int, + region: str = Query(...), + dataset: str = Query("default"), + time_period: str = Query(...), + target: Literal["general", "cliff"] = Query("general"), + version: str | None = Query(None), + reform: dict = Body(...), + baseline: dict = Body(...), +): + try: + result = runner.start_simulation( + country_id=country_id, + reform=reform, + baseline=baseline, + region=region, + dataset=dataset, + time_period=time_period, + scope="macro", + model_version=version, + ) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/economy/result") +async def get_simulation_result(execution_id: str): + try: + result = runner.get_simulation_result(execution_id) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py b/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py new file mode 100644 index 00000000..e28bd4d8 --- /dev/null +++ b/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py @@ -0,0 +1,91 @@ +# simulation_runner.py +import os +import json +import requests +from google.cloud import workflows_v1, executions_v1 +from google.cloud.workflows.executions_v1.types import Execution +from typing import Literal, Any + +class SimulationRunner: + def __init__(self): + self.is_desktop = os.getenv("PE_MODE") == "desktop" + if not self.is_desktop: + self.project = "prod-api-v2-c4d5" + self.location = "us-central1" + self.workflow = "simulation-workflow" + self.execution_client = executions_v1.ExecutionsClient() + self.workflows_client = workflows_v1.WorkflowsClient() + else: + self.desktop_url = os.getenv( + "SIMULATION_LOCAL_URL", + "http://localhost:8081/simulate/economy/comparison" + ) + + def _build_payload( + self, + country_id: str, + reform: dict, + baseline: dict, + region: str, + dataset: str, + time_period: str, + scope: Literal["macro", "household"] = "macro", + model_version: str | None = None, + data_version: str | None = None, + ) -> dict[str, Any]: + return { + "country": country_id, + "scope": scope, + "reform": reform, + "baseline": baseline, + "time_period": time_period, + "region": region, + "data": dataset, + "model_version": model_version, + "data_version": data_version, + } + + def start_simulation( + self, + country_id: str, + reform: dict, + baseline: dict, + region: str, + dataset: str, + time_period: str, + scope: Literal["macro", "household"] = "macro", + model_version: str | None = None, + data_version: str | None = None, + ) -> dict: + payload = self._build_payload( + country_id, + reform, + baseline, + region, + dataset, + time_period, + scope, + model_version, + data_version, + ) + + if self.is_desktop: + response = requests.post(self.desktop_url, json=payload) + response.raise_for_status() + return response.json() + else: + workflow_path = self.workflows_client.workflow_path( + self.project, self.location, self.workflow + ) + execution = self.execution_client.create_execution( + parent=workflow_path, + execution=Execution(argument=json.dumps(payload)), + ) + return {"execution_id": execution.name} + + def get_simulation_result(self, execution_id: str) -> dict: + if self.is_desktop: + raise RuntimeError("Polling is not supported in desktop mode.") + return json.loads( + self.execution_client.get_execution(name=execution_id).result + ) From 7cd05fd92939cd6116bceef97d20116f7e1f63d2 Mon Sep 17 00:00:00 2001 From: SakshiKekre Date: Tue, 1 Jul 2025 08:25:00 -0700 Subject: [PATCH 2/3] updates --- .../policyengine_api/api/routers/economy.py | 4 +- .../api/services/simulation_runner.py | 43 ++++++++++++++++--- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py b/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py index dac40e5f..24041b52 100644 --- a/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py +++ b/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py @@ -1,11 +1,12 @@ # economy_simulation.py from fastapi import APIRouter, Query, Body, HTTPException from typing import Literal -from policyengine_api.services.simulation_runner import SimulationRunner +from policyengine_api.api.services.simulation_runner import SimulationRunner router = APIRouter() runner = SimulationRunner() + @router.post("/{country_id}/economy/{policy_id}/over/{baseline_policy_id}") async def start_simulation( country_id: str, @@ -39,6 +40,7 @@ async def start_simulation( async def get_simulation_result(execution_id: str): try: result = runner.get_simulation_result(execution_id) + print("SKLOGS: get api called") return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) diff --git a/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py b/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py index e28bd4d8..88dfab38 100644 --- a/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py +++ b/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py @@ -1,10 +1,12 @@ -# simulation_runner.py import os import json import requests -from google.cloud import workflows_v1, executions_v1 +from google.cloud import workflows_v1 +from google.cloud.workflows import executions_v1 from google.cloud.workflows.executions_v1.types import Execution from typing import Literal, Any +from policyengine_api_full.settings import get_settings, Environment +from google.protobuf.json_format import MessageToDict class SimulationRunner: def __init__(self): @@ -18,7 +20,7 @@ def __init__(self): else: self.desktop_url = os.getenv( "SIMULATION_LOCAL_URL", - "http://localhost:8081/simulate/economy/comparison" + "http://localhost:8081/simulate/economy/comparison", ) def _build_payload( @@ -83,9 +85,38 @@ def start_simulation( ) return {"execution_id": execution.name} + # def get_simulation_result(self, execution_id: str) -> dict: + # if self.is_desktop: + # print("SKLOGS: in dev mode") + # raise RuntimeError("Polling is not supported in desktop mode.") + # print("SKLOGS: not in dev mode") + # return json.loads(self.execution_client.get_execution(name=execution_id).result) + def get_simulation_result(self, execution_id: str) -> dict: if self.is_desktop: + print("SKLOGS: in dev mode") raise RuntimeError("Polling is not supported in desktop mode.") - return json.loads( - self.execution_client.get_execution(name=execution_id).result - ) + + execution = self.execution_client.get_execution(name=execution_id) + status = execution.state.name + + response = { + "execution_id": execution_id, + "status": status, + } + + if status == "SUCCEEDED": + response["result"] = json.loads(execution.result or "{}") + elif status == "FAILED": + try: + error_dict = MessageToDict(execution.error) + response["error"] = error_dict.get("message", str(execution.error)) + except Exception: + # fallback to string representation + response["error"] = str(execution.error) + response["result"] = None + else: + # Pending or other states — no result or error yet + response["result"] = None + response["error"] = None + return response From 724e87e5be565397055e2931c9083b7466403d7f Mon Sep 17 00:00:00 2001 From: SakshiKekre Date: Fri, 4 Jul 2025 05:57:06 -0700 Subject: [PATCH 3/3] add router for economy and async thread in sim runner --- projects/policyengine-api-full/pyproject.toml | 3 +- .../src/policyengine_api/api/__init__.py | 11 +- .../policyengine_api/api/routers/economy.py | 4 +- .../api/services/simulation_runner.py | 127 +++++++++++------- projects/policyengine-api-full/uv.lock | 17 +++ server_common.mk | 4 + 6 files changed, 115 insertions(+), 51 deletions(-) diff --git a/projects/policyengine-api-full/pyproject.toml b/projects/policyengine-api-full/pyproject.toml index 6c91ce71..bc97a0da 100644 --- a/projects/policyengine-api-full/pyproject.toml +++ b/projects/policyengine-api-full/pyproject.toml @@ -12,7 +12,8 @@ dependencies = [ "opentelemetry-instrumentation-sqlalchemy (>=0.51b0,<0.52)", "pydantic-settings (>=2.7.1,<3.0.0)", "opentelemetry-instrumentation-fastapi (>=0.51b0,<0.52)", - "policyengine-fastapi" + "policyengine-fastapi", + "google-cloud-workflows>=1.18.1" ] [tool.uv.sources] diff --git a/projects/policyengine-api-full/src/policyengine_api/api/__init__.py b/projects/policyengine-api-full/src/policyengine_api/api/__init__.py index 0b880429..5553671c 100644 --- a/projects/policyengine-api-full/src/policyengine_api/api/__init__.py +++ b/projects/policyengine-api-full/src/policyengine_api/api/__init__.py @@ -4,7 +4,10 @@ from policyengine_api.fastapi.auth.jwt_decoder import JWTDecoder from policyengine_api.fastapi.database import create_session_dep from .household import include_all_routers - +from policyengine_api.api.routers import ( + economy, +) + """ Application defined as routers completely indipendent of environment allowing it to easily be run in whatever cloud provider container or desktop or test environment. @@ -24,3 +27,9 @@ def initialize(app: FastAPI, engine: Engine, jwt_issuer: str, jwt_audience: str) optional_auth=optional_auth, auth=auth, ) + + # Attaching economy simulation routes separately. + # These endpoints run macro-economic impact simulations and may trigger + # cloud workflows (GCP) or a local simulation API when running in desktop mode. + # Keeping them separate to avoid coupling with household/user CRUD routes. + app.include_router(economy.router) diff --git a/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py b/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py index 24041b52..b11023c3 100644 --- a/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py +++ b/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py @@ -21,7 +21,7 @@ async def start_simulation( baseline: dict = Body(...), ): try: - result = runner.start_simulation( + result = await runner.start_simulation( country_id=country_id, reform=reform, baseline=baseline, @@ -39,7 +39,7 @@ async def start_simulation( @router.get("/economy/result") async def get_simulation_result(execution_id: str): try: - result = runner.get_simulation_result(execution_id) + result = await runner.get_simulation_result(execution_id) print("SKLOGS: get api called") return result except Exception as e: diff --git a/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py b/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py index 88dfab38..01c2bef1 100644 --- a/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py +++ b/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py @@ -1,16 +1,23 @@ import os import json -import requests +import uuid +import asyncio +import httpx + +from typing import Literal, Any from google.cloud import workflows_v1 from google.cloud.workflows import executions_v1 from google.cloud.workflows.executions_v1.types import Execution -from typing import Literal, Any -from policyengine_api_full.settings import get_settings, Environment from google.protobuf.json_format import MessageToDict +from policyengine_api_full.settings import get_settings, Environment class SimulationRunner: def __init__(self): - self.is_desktop = os.getenv("PE_MODE") == "desktop" + # self.is_desktop = os.getenv("PE_MODE") == "desktop" + settings = get_settings() + self.is_desktop = settings.environment == Environment.DESKTOP + print(f"SKLOGS ENVIRONMENT: {get_settings().environment}") + if not self.is_desktop: self.project = "prod-api-v2-c4d5" self.location = "us-central1" @@ -22,6 +29,8 @@ def __init__(self): "SIMULATION_LOCAL_URL", "http://localhost:8081/simulate/economy/comparison", ) + self._simulations: dict[str, dict] = {} + self._lock = asyncio.Lock() # To protect self._simulations def _build_payload( self, @@ -47,7 +56,7 @@ def _build_payload( "data_version": data_version, } - def start_simulation( + async def start_simulation( self, country_id: str, reform: dict, @@ -72,51 +81,75 @@ def start_simulation( ) if self.is_desktop: - response = requests.post(self.desktop_url, json=payload) - response.raise_for_status() - return response.json() + async with httpx.AsyncClient() as client: + response = await client.post(self.desktop_url, json=payload) + response.raise_for_status() + result = response.json() + + execution_id = f"desktop-{uuid.uuid4()}" + async with self._lock: + self._simulations[execution_id] = { + "status": "SUCCEEDED", + "result": result, + "error": None, + } + return {"execution_id": execution_id} + else: - workflow_path = self.workflows_client.workflow_path( - self.project, self.location, self.workflow - ) - execution = self.execution_client.create_execution( - parent=workflow_path, - execution=Execution(argument=json.dumps(payload)), - ) + # Use asyncio.to_thread for blocking I/O + def create_execution(): + workflow_path = self.workflows_client.workflow_path( + self.project, self.location, self.workflow + ) + return self.execution_client.create_execution( + parent=workflow_path, + execution=Execution(argument=json.dumps(payload)), + ) + + execution = await asyncio.to_thread(create_execution) return {"execution_id": execution.name} - # def get_simulation_result(self, execution_id: str) -> dict: - # if self.is_desktop: - # print("SKLOGS: in dev mode") - # raise RuntimeError("Polling is not supported in desktop mode.") - # print("SKLOGS: not in dev mode") - # return json.loads(self.execution_client.get_execution(name=execution_id).result) - - def get_simulation_result(self, execution_id: str) -> dict: + async def get_simulation_result(self, execution_id: str) -> dict: if self.is_desktop: - print("SKLOGS: in dev mode") - raise RuntimeError("Polling is not supported in desktop mode.") - - execution = self.execution_client.get_execution(name=execution_id) - status = execution.state.name - - response = { - "execution_id": execution_id, - "status": status, - } + print("SKLOGS: in desktop mode") + async with self._lock: + if execution_id not in self._simulations: + raise ValueError(f"Unknown execution ID: {execution_id}") + simulation = self._simulations[execution_id] + + return { + "execution_id": execution_id, + "status": simulation["status"], + "result": simulation["result"], + "error": simulation["error"], + } - if status == "SUCCEEDED": - response["result"] = json.loads(execution.result or "{}") - elif status == "FAILED": - try: - error_dict = MessageToDict(execution.error) - response["error"] = error_dict.get("message", str(execution.error)) - except Exception: - # fallback to string representation - response["error"] = str(execution.error) - response["result"] = None else: - # Pending or other states — no result or error yet - response["result"] = None - response["error"] = None - return response + print("SKLOGS: in prod mode") + + def get_execution(): + return self.execution_client.get_execution(name=execution_id) + + execution = await asyncio.to_thread(get_execution) + status = execution.state.name + + response = { + "execution_id": execution_id, + "status": status, + } + + if status == "SUCCEEDED": + response["result"] = json.loads(execution.result or "{}") + response["error"] = None + elif status == "FAILED": + try: + error_dict = MessageToDict(execution.error) + response["error"] = error_dict.get("message", str(execution.error)) + except Exception: + response["error"] = str(execution.error) + response["result"] = None + else: + response["result"] = None + response["error"] = None + + return response diff --git a/projects/policyengine-api-full/uv.lock b/projects/policyengine-api-full/uv.lock index 9ebefb70..7108032b 100644 --- a/projects/policyengine-api-full/uv.lock +++ b/projects/policyengine-api-full/uv.lock @@ -355,6 +355,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/08/96/7a8d271e91effa9ccc2fd7cfd5cf287a2d7900080a475477c2ac0c7a331d/google_cloud_trace-1.16.2-py3-none-any.whl", hash = "sha256:40fb74607752e4ee0f3d7e5fc6b8f6eb1803982254a1507ba918172484131456", size = 103755, upload-time = "2025-06-12T00:53:00.672Z" }, ] +[[package]] +name = "google-cloud-workflows" +version = "1.18.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-api-core", extra = ["grpc"] }, + { name = "google-auth" }, + { name = "proto-plus" }, + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/67/70/19c6f828d9d677819837743b412a061d003497f433411dfbc86063e77c94/google_cloud_workflows-1.18.2.tar.gz", hash = "sha256:4bb936419bfe110c9f67a4631be340ffe6bd2014299cc4c4f9321b5e5202d4df", size = 209360, upload-time = "2025-06-12T00:13:16.243Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/6c/46/471eb98ce1afa95003f1ebae0135dc6a710e9fb7144693710219ddd77c59/google_cloud_workflows-1.18.2-py3-none-any.whl", hash = "sha256:e1dd52203860d91363a005b6b41f97eb2439a7c954e9cd6f7c77d29fac0c4421", size = 198584, upload-time = "2025-06-12T00:13:14.96Z" }, +] + [[package]] name = "googleapis-common-protos" version = "1.70.0" @@ -867,6 +882,7 @@ name = "policyengine-api-full" version = "0.1.0" source = { virtual = "." } dependencies = [ + { name = "google-cloud-workflows" }, { name = "opentelemetry-instrumentation-fastapi" }, { name = "opentelemetry-instrumentation-sqlalchemy" }, { name = "policyengine-fastapi" }, @@ -887,6 +903,7 @@ test = [ [package.metadata] requires-dist = [ { name = "black", marker = "extra == 'build'", specifier = ">=25.1.0" }, + { name = "google-cloud-workflows", specifier = ">=1.18.1" }, { name = "opentelemetry-instrumentation-fastapi", specifier = ">=0.51b0,<0.52" }, { name = "opentelemetry-instrumentation-sqlalchemy", specifier = ">=0.51b0,<0.52" }, { name = "policyengine-fastapi", editable = "../../libs/policyengine-fastapi" }, diff --git a/server_common.mk b/server_common.mk index 979cd154..5726dc7c 100644 --- a/server_common.mk +++ b/server_common.mk @@ -13,6 +13,10 @@ deploy: echo "Building ${SERVICE_NAME} docker image" cd ../../ && gcloud builds submit --region=${REGION} --substitutions=_IMAGE_TAG=${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO}/${SERVICE_NAME}:${TAG},_SERVICE_NAME=${SERVICE_NAME},_MODULE_NAME=${MODULE_NAME},_WORKER_COUNT=${WORKER_COUNT} ${BUILD_ARGS} +# Always run with a single worker in dev mode to match local desktop environment. +# Sets WORKER_COUNT=1 just for this target. +# Also set ENVIRONMENT=desktop in src/.env file for the app to pick up the correct config. +dev: WORKER_COUNT=1 dev: echo "Running ${SERVICE_NAME} dev instance" cd src && uv run uvicorn ${MODULE_NAME}:app --reload --port ${DEV_PORT} --workers ${WORKER_COUNT}