diff --git a/projects/policyengine-api-full/pyproject.toml b/projects/policyengine-api-full/pyproject.toml index 6c91ce71..bc97a0da 100644 --- a/projects/policyengine-api-full/pyproject.toml +++ b/projects/policyengine-api-full/pyproject.toml @@ -12,7 +12,8 @@ dependencies = [ "opentelemetry-instrumentation-sqlalchemy (>=0.51b0,<0.52)", "pydantic-settings (>=2.7.1,<3.0.0)", "opentelemetry-instrumentation-fastapi (>=0.51b0,<0.52)", - "policyengine-fastapi" + "policyengine-fastapi", + "google-cloud-workflows>=1.18.1" ] [tool.uv.sources] diff --git a/projects/policyengine-api-full/src/policyengine_api/api/__init__.py b/projects/policyengine-api-full/src/policyengine_api/api/__init__.py index 0b880429..5553671c 100644 --- a/projects/policyengine-api-full/src/policyengine_api/api/__init__.py +++ b/projects/policyengine-api-full/src/policyengine_api/api/__init__.py @@ -4,7 +4,10 @@ from policyengine_api.fastapi.auth.jwt_decoder import JWTDecoder from policyengine_api.fastapi.database import create_session_dep from .household import include_all_routers - +from policyengine_api.api.routers import ( + economy, +) + """ Application defined as routers completely indipendent of environment allowing it to easily be run in whatever cloud provider container or desktop or test environment. @@ -24,3 +27,9 @@ def initialize(app: FastAPI, engine: Engine, jwt_issuer: str, jwt_audience: str) optional_auth=optional_auth, auth=auth, ) + + # Attaching economy simulation routes separately. + # These endpoints run macro-economic impact simulations and may trigger + # cloud workflows (GCP) or a local simulation API when running in desktop mode. + # Keeping them separate to avoid coupling with household/user CRUD routes. + app.include_router(economy.router) diff --git a/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py b/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py new file mode 100644 index 00000000..b11023c3 --- /dev/null +++ b/projects/policyengine-api-full/src/policyengine_api/api/routers/economy.py @@ -0,0 +1,46 @@ +# economy_simulation.py +from fastapi import APIRouter, Query, Body, HTTPException +from typing import Literal +from policyengine_api.api.services.simulation_runner import SimulationRunner + +router = APIRouter() +runner = SimulationRunner() + + +@router.post("/{country_id}/economy/{policy_id}/over/{baseline_policy_id}") +async def start_simulation( + country_id: str, + policy_id: int, + baseline_policy_id: int, + region: str = Query(...), + dataset: str = Query("default"), + time_period: str = Query(...), + target: Literal["general", "cliff"] = Query("general"), + version: str | None = Query(None), + reform: dict = Body(...), + baseline: dict = Body(...), +): + try: + result = await runner.start_simulation( + country_id=country_id, + reform=reform, + baseline=baseline, + region=region, + dataset=dataset, + time_period=time_period, + scope="macro", + model_version=version, + ) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/economy/result") +async def get_simulation_result(execution_id: str): + try: + result = await runner.get_simulation_result(execution_id) + print("SKLOGS: get api called") + return result + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py b/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py new file mode 100644 index 00000000..01c2bef1 --- /dev/null +++ b/projects/policyengine-api-full/src/policyengine_api/api/services/simulation_runner.py @@ -0,0 +1,155 @@ +import os +import json +import uuid +import asyncio +import httpx + +from typing import Literal, Any +from google.cloud import workflows_v1 +from google.cloud.workflows import executions_v1 +from google.cloud.workflows.executions_v1.types import Execution +from google.protobuf.json_format import MessageToDict +from policyengine_api_full.settings import get_settings, Environment + +class SimulationRunner: + def __init__(self): + # self.is_desktop = os.getenv("PE_MODE") == "desktop" + settings = get_settings() + self.is_desktop = settings.environment == Environment.DESKTOP + print(f"SKLOGS ENVIRONMENT: {get_settings().environment}") + + if not self.is_desktop: + self.project = "prod-api-v2-c4d5" + self.location = "us-central1" + self.workflow = "simulation-workflow" + self.execution_client = executions_v1.ExecutionsClient() + self.workflows_client = workflows_v1.WorkflowsClient() + else: + self.desktop_url = os.getenv( + "SIMULATION_LOCAL_URL", + "http://localhost:8081/simulate/economy/comparison", + ) + self._simulations: dict[str, dict] = {} + self._lock = asyncio.Lock() # To protect self._simulations + + def _build_payload( + self, + country_id: str, + reform: dict, + baseline: dict, + region: str, + dataset: str, + time_period: str, + scope: Literal["macro", "household"] = "macro", + model_version: str | None = None, + data_version: str | None = None, + ) -> dict[str, Any]: + return { + "country": country_id, + "scope": scope, + "reform": reform, + "baseline": baseline, + "time_period": time_period, + "region": region, + "data": dataset, + "model_version": model_version, + "data_version": data_version, + } + + async def start_simulation( + self, + country_id: str, + reform: dict, + baseline: dict, + region: str, + dataset: str, + time_period: str, + scope: Literal["macro", "household"] = "macro", + model_version: str | None = None, + data_version: str | None = None, + ) -> dict: + payload = self._build_payload( + country_id, + reform, + baseline, + region, + dataset, + time_period, + scope, + model_version, + data_version, + ) + + if self.is_desktop: + async with httpx.AsyncClient() as client: + response = await client.post(self.desktop_url, json=payload) + response.raise_for_status() + result = response.json() + + execution_id = f"desktop-{uuid.uuid4()}" + async with self._lock: + self._simulations[execution_id] = { + "status": "SUCCEEDED", + "result": result, + "error": None, + } + return {"execution_id": execution_id} + + else: + # Use asyncio.to_thread for blocking I/O + def create_execution(): + workflow_path = self.workflows_client.workflow_path( + self.project, self.location, self.workflow + ) + return self.execution_client.create_execution( + parent=workflow_path, + execution=Execution(argument=json.dumps(payload)), + ) + + execution = await asyncio.to_thread(create_execution) + return {"execution_id": execution.name} + + async def get_simulation_result(self, execution_id: str) -> dict: + if self.is_desktop: + print("SKLOGS: in desktop mode") + async with self._lock: + if execution_id not in self._simulations: + raise ValueError(f"Unknown execution ID: {execution_id}") + simulation = self._simulations[execution_id] + + return { + "execution_id": execution_id, + "status": simulation["status"], + "result": simulation["result"], + "error": simulation["error"], + } + + else: + print("SKLOGS: in prod mode") + + def get_execution(): + return self.execution_client.get_execution(name=execution_id) + + execution = await asyncio.to_thread(get_execution) + status = execution.state.name + + response = { + "execution_id": execution_id, + "status": status, + } + + if status == "SUCCEEDED": + response["result"] = json.loads(execution.result or "{}") + response["error"] = None + elif status == "FAILED": + try: + error_dict = MessageToDict(execution.error) + response["error"] = error_dict.get("message", str(execution.error)) + except Exception: + response["error"] = str(execution.error) + response["result"] = None + else: + response["result"] = None + response["error"] = None + + return response diff --git a/projects/policyengine-api-full/uv.lock b/projects/policyengine-api-full/uv.lock index 9ebefb70..7108032b 100644 --- a/projects/policyengine-api-full/uv.lock +++ b/projects/policyengine-api-full/uv.lock @@ -355,6 +355,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/08/96/7a8d271e91effa9ccc2fd7cfd5cf287a2d7900080a475477c2ac0c7a331d/google_cloud_trace-1.16.2-py3-none-any.whl", hash = "sha256:40fb74607752e4ee0f3d7e5fc6b8f6eb1803982254a1507ba918172484131456", size = 103755, upload-time = "2025-06-12T00:53:00.672Z" }, ] +[[package]] +name = "google-cloud-workflows" +version = "1.18.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-api-core", extra = ["grpc"] }, + { name = "google-auth" }, + { name = "proto-plus" }, + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/67/70/19c6f828d9d677819837743b412a061d003497f433411dfbc86063e77c94/google_cloud_workflows-1.18.2.tar.gz", hash = "sha256:4bb936419bfe110c9f67a4631be340ffe6bd2014299cc4c4f9321b5e5202d4df", size = 209360, upload-time = "2025-06-12T00:13:16.243Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/6c/46/471eb98ce1afa95003f1ebae0135dc6a710e9fb7144693710219ddd77c59/google_cloud_workflows-1.18.2-py3-none-any.whl", hash = "sha256:e1dd52203860d91363a005b6b41f97eb2439a7c954e9cd6f7c77d29fac0c4421", size = 198584, upload-time = "2025-06-12T00:13:14.96Z" }, +] + [[package]] name = "googleapis-common-protos" version = "1.70.0" @@ -867,6 +882,7 @@ name = "policyengine-api-full" version = "0.1.0" source = { virtual = "." } dependencies = [ + { name = "google-cloud-workflows" }, { name = "opentelemetry-instrumentation-fastapi" }, { name = "opentelemetry-instrumentation-sqlalchemy" }, { name = "policyengine-fastapi" }, @@ -887,6 +903,7 @@ test = [ [package.metadata] requires-dist = [ { name = "black", marker = "extra == 'build'", specifier = ">=25.1.0" }, + { name = "google-cloud-workflows", specifier = ">=1.18.1" }, { name = "opentelemetry-instrumentation-fastapi", specifier = ">=0.51b0,<0.52" }, { name = "opentelemetry-instrumentation-sqlalchemy", specifier = ">=0.51b0,<0.52" }, { name = "policyengine-fastapi", editable = "../../libs/policyengine-fastapi" }, diff --git a/server_common.mk b/server_common.mk index 979cd154..5726dc7c 100644 --- a/server_common.mk +++ b/server_common.mk @@ -13,6 +13,10 @@ deploy: echo "Building ${SERVICE_NAME} docker image" cd ../../ && gcloud builds submit --region=${REGION} --substitutions=_IMAGE_TAG=${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO}/${SERVICE_NAME}:${TAG},_SERVICE_NAME=${SERVICE_NAME},_MODULE_NAME=${MODULE_NAME},_WORKER_COUNT=${WORKER_COUNT} ${BUILD_ARGS} +# Always run with a single worker in dev mode to match local desktop environment. +# Sets WORKER_COUNT=1 just for this target. +# Also set ENVIRONMENT=desktop in src/.env file for the app to pick up the correct config. +dev: WORKER_COUNT=1 dev: echo "Running ${SERVICE_NAME} dev instance" cd src && uv run uvicorn ${MODULE_NAME}:app --reload --port ${DEV_PORT} --workers ${WORKER_COUNT}