From 68d4c993d9357d60db16aadef7113c918ae10cfa Mon Sep 17 00:00:00 2001
From: Aaron Chan
Date: Thu, 23 Oct 2025 16:41:52 -0700
Subject: [PATCH 1/5] use Anthropic Python SDK; support web_search (server
 tool), configurable via ANTHROPIC_ENABLE_WEB_SEARCH; rewritten functions
 using asyncio; support emitting web_search streaming reasoning messages.

---
 functions/pipes/anthropic/main.py | 217 +++++++++++++-----------------
 1 file changed, 90 insertions(+), 127 deletions(-)

diff --git a/functions/pipes/anthropic/main.py b/functions/pipes/anthropic/main.py
index 44b8e8e..b51123b 100644
--- a/functions/pipes/anthropic/main.py
+++ b/functions/pipes/anthropic/main.py
@@ -1,41 +1,50 @@
 """
 title: Anthropic Manifold Pipe
-authors: justinh-rahb, christian-taillon, jfbloom22
+authors: justinh-rahb, christian-taillon, jfbloom22, aaronchan0
 author_url: https://github.com/justinh-rahb
 funding_url: https://github.com/open-webui
-version: 0.3.0
+version: 0.4.0
 required_open_webui_version: 0.3.17
 license: MIT
 """

 import os
 import requests
-import json
 import time
+import json
-from typing import List, Union, Generator, Iterator, Optional, Dict
+from typing import List, Union, AsyncGenerator, Iterator, Optional, Dict
 from pydantic import BaseModel, Field
 from open_webui.utils.misc import pop_system_message
+from anthropic import AsyncAnthropic
+from anthropic.types import TextBlock


 class Pipe:
     class Valves(BaseModel):
         ANTHROPIC_API_KEY: str = Field(default="")
+        ANTHROPIC_ENABLE_WEB_SEARCH: bool = Field(default=False)

     def __init__(self):
         self.type = "manifold"
         self.id = "anthropic"
-        self.name = "anthropic/"
+        self.name = ""  # "anthropic/"
         self.valves = self.Valves(
-            **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", "")}
+            **{"ANTHROPIC_API_KEY": os.getenv("ANTHROPIC_API_KEY", ""), "ANTHROPIC_ENABLE_WEB_SEARCH": os.getenv("ANTHROPIC_ENABLE_WEB_SEARCH", False)}
         )
         self.MAX_IMAGE_SIZE = 5 * 1024 * 1024  # 5MB per image
-
+        self.client: AsyncAnthropic = AsyncAnthropic(api_key=self.valves.ANTHROPIC_API_KEY)
+
         # Model cache
         self._model_cache: Optional[List[Dict[str, str]]] = None
         self._model_cache_time: float = 0
         self._cache_ttl = int(os.getenv("ANTHROPIC_MODEL_CACHE_TTL", "600"))

-    def get_anthropic_models_from_api(self, force_refresh: bool = False) -> List[Dict[str, str]]:
+    def get_client(self) -> AsyncAnthropic:
+        if self.client.api_key != self.valves.ANTHROPIC_API_KEY:
+            self.client: AsyncAnthropic = AsyncAnthropic(api_key=self.valves.ANTHROPIC_API_KEY)
+        return self.client
+
+    async def get_anthropic_models_from_api(self, force_refresh: bool = False) -> List[Dict[str, str]]:
         """
         Retrieve available Anthropic models from the API.
         Uses caching to reduce API calls.
@@ -48,11 +57,7 @@ def get_anthropic_models_from_api(self, force_refresh: bool = False) -> List[Dic
         """
         # Check cache first
         current_time = time.time()
-        if (
-            not force_refresh
-            and self._model_cache is not None
-            and (current_time - self._model_cache_time) < self._cache_ttl
-        ):
+        if not force_refresh and self._model_cache is not None and (current_time - self._model_cache_time) < self._cache_ttl:
             return self._model_cache

         if not self.valves.ANTHROPIC_API_KEY:
@@ -64,50 +69,32 @@ def get_anthropic_models_from_api(self, force_refresh: bool = False) -> List[Dic
             ]

         try:
-            headers = {
-                "x-api-key": self.valves.ANTHROPIC_API_KEY,
-                "anthropic-version": "2023-06-01",
-                "content-type": "application/json",
-            }
-
-            response = requests.get(
-                "https://api.anthropic.com/v1/models",
-                headers=headers,
-                timeout=10
-            )
-
-            if response.status_code != 200:
-                raise Exception(f"HTTP Error {response.status_code}: {response.text}")
-
-            data = response.json()
-            models = []
-
-            for model in data.get("data", []):
-                models.append({
-                    "id": model["id"],
-                    "name": model.get("display_name", model["id"]),
-                })
-
+            anthropic_models = await self.get_client().models.list()
+            models = [{"id": model.id, "name": model.display_name} for model in anthropic_models.data]
+
             # Update cache
             self._model_cache = models
             self._model_cache_time = current_time
-
+
             return models
-
+
         except Exception as e:
             print(f"Error fetching Anthropic models: {e}")
             return [
-                {"id": "error", "name": f"Could not fetch models from Anthropic: {str(e)}"}
+                {
+                    "id": "error",
+                    "name": f"Could not fetch models from Anthropic: {str(e)}",
+                }
             ]

-    def get_anthropic_models(self) -> List[Dict[str, str]]:
+    async def get_anthropic_models(self) -> List[Dict[str, str]]:
         """
         Get Anthropic models from the API.
         """
-        return self.get_anthropic_models_from_api()
+        return await self.get_anthropic_models_from_api()

-    def pipes(self) -> List[dict]:
-        return self.get_anthropic_models()
+    async def pipes(self) -> List[dict]:
+        return await self.get_anthropic_models()

     def process_image(self, image_data):
         """Process image data with size validation."""
@@ -118,9 +105,7 @@ def process_image(self, image_data):
         # Check base64 image size
         image_size = len(base64_data) * 3 / 4  # Convert base64 size to bytes
         if image_size > self.MAX_IMAGE_SIZE:
-            raise ValueError(
-                f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB"
-            )
+            raise ValueError(f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB")

         return {
             "type": "image",
@@ -137,16 +122,14 @@ def process_image(self, image_data):
             content_length = int(response.headers.get("content-length", 0))

             if content_length > self.MAX_IMAGE_SIZE:
-                raise ValueError(
-                    f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB"
-                )
+                raise ValueError(f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB")

             return {
                 "type": "image",
                 "source": {"type": "url", "url": url},
             }

-    def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
+    async def pipe(self, body: dict) -> Union[str, AsyncGenerator, Iterator]:
         system_message, messages = pop_system_message(body["messages"])

         processed_messages = []
@@ -166,20 +149,12 @@ def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
                         if processed_image["source"]["type"] == "base64":
                             image_size = len(processed_image["source"]["data"]) * 3 / 4
                             total_image_size += image_size
-                            if (
-                                total_image_size > 100 * 1024 * 1024
-                            ):  # 100MB total limit
-                                raise ValueError(
-                                    "Total size of images exceeds 100 MB limit"
-                                )
+                            if total_image_size > 100 * 1024 * 1024:  # 100MB total limit
+                                raise ValueError("Total size of images exceeds 100 MB limit")
             else:
-                processed_content = [
-                    {"type": "text", "text": message.get("content", "")}
-                ]
+                processed_content = [{"type": "text", "text": message.get("content", "")}]

-            processed_messages.append(
-                {"role": message["role"], "content": processed_content}
-            )
+            processed_messages.append({"role": message["role"], "content": processed_content})

         payload = {
             "model": body["model"][body["model"].find(".") + 1 :],
@@ -187,87 +162,75 @@ def pipe(self, body: dict) -> Union[str, Generator, Iterator]:
             "max_tokens": body.get("max_tokens", 4096),
             "temperature": body.get("temperature", 0.8),
             "top_k": body.get("top_k", 40),
-            "top_p": body.get("top_p", 0.9),
+            # "top_p": body.get("top_p", 0.9),
             "stop_sequences": body.get("stop", []),
             **({"system": str(system_message)} if system_message else {}),
             "stream": body.get("stream", False),
         }
-
-        headers = {
-            "x-api-key": self.valves.ANTHROPIC_API_KEY,
-            "anthropic-version": "2023-06-01",
-            "content-type": "application/json",
-        }
-
-        url = "https://api.anthropic.com/v1/messages"
-
+        payload["tools"] = (
+            [
+                {
+                    "type": "web_search_20250305",
+                    "name": "web_search",
+                    "max_uses": 5,
+                    "user_location": {
+                        "type": "approximate",
+                        "city": "San Ramon",
+                        "region": "California",
+                        "country": "US",
+                        "timezone": "America/Los_Angeles",
+                    },
+                }
+            ]
+            if self.valves.ANTHROPIC_ENABLE_WEB_SEARCH
+            else []
+        )
         try:
             if body.get("stream", False):
-                return self.stream_response(url, headers, payload)
+                return self.stream_response(payload)
             else:
-                return self.non_stream_response(url, headers, payload)
-        except requests.exceptions.RequestException as e:
-            print(f"Request failed: {e}")
-            return f"Error: Request failed: {e}"
+                return await self.non_stream_response(payload)
         except Exception as e:
             print(f"Error in pipe method: {e}")
             return f"Error: {e}"

-    def stream_response(self, url, headers, payload):
+    async def stream_response(self, payload):
         try:
-            with requests.post(
-                url, headers=headers, json=payload, stream=True, timeout=(3.05, 60)
-            ) as response:
-                if response.status_code != 200:
-                    raise Exception(
-                        f"HTTP Error {response.status_code}: {response.text}"
-                    )
-
-                for line in response.iter_lines():
-                    if line:
-                        line = line.decode("utf-8")
-                        if line.startswith("data: "):
-                            try:
-                                data = json.loads(line[6:])
-                                if data["type"] == "content_block_start":
-                                    yield data["content_block"]["text"]
-                                elif data["type"] == "content_block_delta":
-                                    yield data["delta"]["text"]
-                                elif data["type"] == "message_stop":
-                                    break
-                                elif data["type"] == "message":
-                                    for content in data.get("content", []):
-                                        if content["type"] == "text":
-                                            yield content["text"]
-
-                                time.sleep(
-                                    0.01
-                                )  # Delay to avoid overwhelming the client
-
-                            except json.JSONDecodeError:
-                                print(f"Failed to parse JSON: {line}")
-                            except KeyError as e:
-                                print(f"Unexpected data structure: {e}")
-                                print(f"Full data: {data}")
-        except requests.exceptions.RequestException as e:
-            print(f"Request failed: {e}")
-            yield f"Error: Request failed: {e}"
+            async with self.get_client().messages.stream(
+                model=payload["model"], max_tokens=payload["max_tokens"], messages=payload["messages"], tools=payload["tools"]
+            ) as stream:
+                input_json: str = ""
+                is_thinking: bool = False
+                async for event in stream:
+                    if event.type == "content_block_start":
+                        if event.content_block.type == "server_tool_use" or event.content_block.type == "tool_use":
+                            if not is_thinking:
+                                is_thinking = True
+                                yield "<think>"
+                            input_json = ""
+                        elif event.content_block.type == "text":
+                            if is_thinking:
+                                yield "</think>"
+                                is_thinking = False
+                    elif event.type == "content_block_stop":
+                        if event.content_block.type == "server_tool_use" or event.content_block.type == "tool_use":
+                            input_params = ", ".join([f"{key}: {value}" for key, value in json.loads(input_json).items()])
+                            yield f"calling {event.content_block.name} with {input_params}\n"
+                    elif event.type == "content_block_delta":
+                        if event.delta.type == "text_delta":
+                            yield event.delta.text
+                        elif event.delta.type == "input_json_delta":
+                            input_json += event.delta.partial_json
         except Exception as e:
             print(f"General error in stream_response method: {e}")
             yield f"Error: {e}"

-    def non_stream_response(self, url, headers, payload):
+    async def non_stream_response(self, payload):
         try:
-            response = requests.post(
-                url, headers=headers, json=payload, timeout=(3.05, 60)
+            resp = await self.get_client().messages.create(
+                model=payload["model"], max_tokens=payload["max_tokens"], messages=payload["messages"], tools=payload["tools"]
             )
-            if response.status_code != 200:
-                raise Exception(f"HTTP Error {response.status_code}: {response.text}")
-
-            res = response.json()
-            return (
-                res["content"][0]["text"] if "content" in res and res["content"] else ""
-            )
-        except requests.exceptions.RequestException as e:
+            return "\n".join([r.text if isinstance(r, TextBlock) else "" for r in resp.content])
+        except Exception as e:
             print(f"Failed non-stream request: {e}")
             return f"Error: {e}"

From ac7779627a560c70e887322b08c2b03fbe36ea3e Mon Sep 17 00:00:00 2001
From: Aaron Chan
Date: Thu, 23 Oct 2025 22:09:11 -0700
Subject: [PATCH 2/5] added system message passing

---
 functions/pipes/anthropic/main.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/functions/pipes/anthropic/main.py b/functions/pipes/anthropic/main.py
index b51123b..7b45abb 100644
--- a/functions/pipes/anthropic/main.py
+++ b/functions/pipes/anthropic/main.py
@@ -197,7 +197,7 @@ async def pipe(self, body: dict) -> Union[str, AsyncGenerator, Iterator]:
     async def stream_response(self, payload):
         try:
             async with self.get_client().messages.stream(
-                model=payload["model"], max_tokens=payload["max_tokens"], messages=payload["messages"], tools=payload["tools"]
+                model=payload["model"], max_tokens=payload["max_tokens"], system=payload["system"], messages=payload["messages"], tools=payload["tools"]
             ) as stream:
                 input_json: str = ""
                 is_thinking: bool = False
@@ -228,7 +228,7 @@ async def stream_response(self, payload):
     async def non_stream_response(self, payload):
         try:
             resp = await self.get_client().messages.create(
-                model=payload["model"], max_tokens=payload["max_tokens"], messages=payload["messages"], tools=payload["tools"]
+                model=payload["model"], max_tokens=payload["max_tokens"], system=payload["system"], messages=payload["messages"], tools=payload["tools"]
             )
             return "\n".join([r.text if isinstance(r, TextBlock) else "" for r in resp.content])
         except Exception as e:

From f5bbce1d594fbce0d952aff7c98c4e03e8fad7a2 Mon Sep 17 00:00:00 2001
From: Aaron Chan
Date: Sat, 6 Dec 2025 10:47:54 -0800
Subject: [PATCH 3/5] fixed error when no system prompt is passed in

---
 functions/pipes/anthropic/main.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/functions/pipes/anthropic/main.py b/functions/pipes/anthropic/main.py
index 7b45abb..4fb8c22 100644
--- a/functions/pipes/anthropic/main.py
+++ b/functions/pipes/anthropic/main.py
@@ -164,7 +164,7 @@ async def pipe(self, body: dict) -> Union[str, AsyncGenerator, Iterator]:
             "top_k": body.get("top_k", 40),
             # "top_p": body.get("top_p", 0.9),
             "stop_sequences": body.get("stop", []),
-            **({"system": str(system_message)} if system_message else {}),
+            "system": str(system_message) if system_message else "",
             "stream": body.get("stream", False),
         }
         payload["tools"] = (
@@ -210,8 +210,8 @@ async def stream_response(self, payload):
                             input_json = ""
                         elif event.content_block.type == "text":
                             if is_thinking:
-                                yield "</think>"
                                 is_thinking = False
+                                yield "</think>"
                     elif event.type == "content_block_stop":
                         if event.content_block.type == "server_tool_use" or event.content_block.type == "tool_use":
                             input_params = ", ".join([f"{key}: {value}" for key, value in json.loads(input_json).items()])

From 9a83bdce67cdcc6e562cad5b9cda90658ae24ef4 Mon Sep 17 00:00:00 2001
From: Aaron Chan
Date: Sun, 7 Dec 2025 09:46:31 -0800
Subject: [PATCH 4/5] openai version

---
 functions/pipes/openai/main.py | 271 +++++++++++++++++++++------------
 1 file changed, 175 insertions(+), 96 deletions(-)

diff --git a/functions/pipes/openai/main.py b/functions/pipes/openai/main.py
index 3d87c4d..c984033 100644
--- a/functions/pipes/openai/main.py
+++ b/functions/pipes/openai/main.py
@@ -1,118 +1,197 @@
 """
 title: OpenAI Manifold Pipe
-author: open-webui
-author_url: https://github.com/open-webui
-funding_url: https://github.com/open-webui
-version: 0.1.2
+authors: aaronchan0
+version: 0.1.0
+required_open_webui_version: 0.6.41
+license: MIT
 """

-from pydantic import BaseModel, Field
-from typing import Optional, Union, Generator, Iterator
-from open_webui.utils.misc import get_last_user_message
-
 import os
 import requests
+import time
+import re
+import json
+from typing import List, Union, AsyncGenerator, Iterator, Optional, Dict
+from pydantic import BaseModel, Field
+from open_webui.utils.misc import pop_system_message
+from openai import AsyncOpenAI


 class Pipe:
     class Valves(BaseModel):
-        NAME_PREFIX: str = Field(
-            default="OPENAI/",
-            description="The prefix applied before the model names.",
-        )
-        OPENAI_API_BASE_URL: str = Field(
-            default="https://api.openai.com/v1",
-            description="The base URL for OpenAI API endpoints.",
-        )
-        OPENAI_API_KEY: str = Field(
-            default="",
-            description="Required API key to retrieve the model list.",
-        )
-        pass
-
-    class UserValves(BaseModel):
-        OPENAI_API_KEY: str = Field(
-            default="",
-            description="User-specific API key for accessing OpenAI services.",
-        )
+        OPENAI_API_KEY: str = Field(default="")
+        OPENAI_ENABLE_WEB_SEARCH: bool = Field(default=False)

     def __init__(self):
         self.type = "manifold"
-        self.valves = self.Valves()
-        pass
-
-    def pipes(self):
-        if self.valves.OPENAI_API_KEY:
-            try:
-                headers = {}
-                headers["Authorization"] = f"Bearer {self.valves.OPENAI_API_KEY}"
-                headers["Content-Type"] = "application/json"
-
-                r = requests.get(
-                    f"{self.valves.OPENAI_API_BASE_URL}/models", headers=headers
-                )
-
-                models = r.json()
-                return [
-                    {
-                        "id": model["id"],
-                        "name": f'{self.valves.NAME_PREFIX}{model["name"] if "name" in model else model["id"]}',
-                    }
-                    for model in models["data"]
-                    if "gpt" in model["id"]
-                ]
-
-            except Exception as e:
-
-                print(f"Error: {e}")
-                return [
-                    {
-                        "id": "error",
-                        "name": "Could not fetch models from OpenAI, please update the API Key in the valves.",
-                    },
-                ]
-        else:
-            return [
-                {
-                    "id": "error",
-                    "name": "Global API Key not provided.",
-                },
-            ]
-
-    def pipe(self, body: dict, __user__: dict) -> Union[str, Generator, Iterator]:
-        # This is where you can add your custom pipelines like RAG.
- print(f"pipe:{__name__}") - print(__user__) - - user_valves = __user__.get("valves") - - if not user_valves: - raise Exception("User Valves not configured.") + self.id = "openai" + self.name = "" # openai/" + self.valves = self.Valves( + **{"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", ""), "OPENAI_ENABLE_WEB_SEARCH": os.getenv("OPENAI_ENABLE_WEB_SEARCH", False)} + ) + self.MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB per image + self.client: AsyncOpenAI = AsyncOpenAI(api_key=self.valves.OPENAI_API_KEY) + + # Model cache + self._model_cache: Optional[List[Dict[str, str]]] = None + self._model_cache_time: float = 0 + self._cache_ttl = int(os.getenv("OPENAI_MODEL_CACHE_TTL", "600")) + + def get_client(self) -> AsyncOpenAI: + if self.client.api_key != self.valves.OPENAI_API_KEY: + self.client: AsyncOpenAI = AsyncOpenAI(api_key=self.valves.OPENAI_API_KEY) + return self.client + + async def get_openai_models_from_api(self, force_refresh: bool = False) -> List[Dict[str, str]]: + """ + Retrieve available Anthropic models from the API. + Uses caching to reduce API calls. + + Args: + force_refresh: Whether to force refreshing the model cache + + Returns: + List of dictionaries containing model id and name. + """ + # Check cache first + current_time = time.time() + if not force_refresh and self._model_cache is not None and (current_time - self._model_cache_time) < self._cache_ttl: + return self._model_cache + + if not self.valves.OPENAI_API_KEY: + return [{"id": "error", "name": "OPENAI_API_KEY is not set. Please update the API Key in the valves."}] - if not user_valves.OPENAI_API_KEY: - raise Exception("OPENAI_API_KEY not provided by the user.") + try: + openai_models = await self.get_client().models.list() + models = [{"id": model.id, "name": model.id} for model in openai_models.data if re.search(r"gpt-(4\.1|5\.1)", model.id)] + models.sort(key=lambda x: x["name"]) - headers = {} - headers["Authorization"] = f"Bearer {user_valves.OPENAI_API_KEY}" - headers["Content-Type"] = "application/json" + # Update cache + self._model_cache = models + self._model_cache_time = current_time - model_id = body["model"][body["model"].find(".") + 1 :] - payload = {**body, "model": model_id} - print(payload) + return models + except Exception as e: + print(f"Error fetching OpenAI models: {e}") + return [{"id": "error", "name": f"Could not fetch models from OpenAI: {str(e)}"}] + + async def get_openai_models(self) -> List[Dict[str, str]]: + """ + Get OpenAI models from the API. 
+ """ + return await self.get_openai_models_from_api() + + async def pipes(self) -> List[dict]: + return await self.get_openai_models() + + def process_image(self, image_data): + """Process image data with size validation.""" + if image_data["image_url"]["url"].startswith("data:image"): + mime_type, base64_data = image_data["image_url"]["url"].split(",", 1) + media_type = mime_type.split(":")[1].split(";")[0] + + # Check base64 image size + image_size = len(base64_data) * 3 / 4 # Convert base64 size to bytes + if image_size > self.MAX_IMAGE_SIZE: + raise ValueError(f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB") + + return { + "type": "image", + "source": { + "type": "base64", + "media_type": media_type, + "data": base64_data, + }, + } + else: + # For URL images, perform size check after fetching + url = image_data["image_url"]["url"] + response = requests.head(url, allow_redirects=True) + content_length = int(response.headers.get("content-length", 0)) + + if content_length > self.MAX_IMAGE_SIZE: + raise ValueError(f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB") + + return { + "type": "image", + "source": {"type": "url", "url": url}, + } + + async def pipe(self, body: dict) -> Union[str, AsyncGenerator, Iterator]: + system_message, messages = pop_system_message(body["messages"]) + + processed_messages = [] + total_image_size = 0 + + for message in messages: + processed_content = [] + if isinstance(message.get("content"), list): + for item in message["content"]: + if item["type"] == "text": + processed_content.append({"type": "text", "text": item["text"]}) + elif item["type"] == "image_url": + processed_image = self.process_image(item) + processed_content.append(processed_image) + + # Track total size for base64 images + if processed_image["source"]["type"] == "base64": + image_size = len(processed_image["source"]["data"]) * 3 / 4 + total_image_size += image_size + if total_image_size > 100 * 1024 * 1024: # 100MB total limit + raise ValueError("Total size of images exceeds 100 MB limit") + else: + processed_content = [{"type": "input_text" if message["role"] == "user" else "output_text", "text": message.get("content", "")}] + + processed_messages.append({"role": message["role"], "content": processed_content}) + + payload = { + "model": body["model"][body["model"].find(".") + 1 :], + "input": processed_messages, + "max_output_tokens": body.get("max_tokens", 4096), + # "temperature": body.get("temperature", 0.8), + "instructions": str(system_message) if system_message else "", + "tools": ( + [ + { + "type": "web_search", + "user_location": { + "type": "approximate", + "country": "US", + "city": "San Ramon", + "region": "CA", + }, + } + ] + if self.valves.OPENAI_ENABLE_WEB_SEARCH + else [] + ), + } try: - r = requests.post( - url=f"{self.valves.OPENAI_API_BASE_URL}/chat/completions", - json=payload, - headers=headers, - stream=True, - ) + if body.get("stream", False): + return self.stream_response(payload) + else: + return await self.non_stream_response(payload) + except Exception as e: + print(f"Error in pipe method: {e}") + return f"Error: {e}" - r.raise_for_status() + async def stream_response(self, payload): + try: + async with self.get_client().responses.stream(**payload) as stream: + input_json: str = "" + is_thinking: bool = False + async for event in stream: + if event.type == "response.output_text.delta": + yield event.delta + except Exception as e: + print(f"General error in stream_response method: {e}") + yield f"Error: {e}" - if 
body["stream"]: - return r.iter_lines() - else: - return r.json() + async def non_stream_response(self, payload): + try: + resp = await self.get_client().responses.create(**payload) + return resp.output_text except Exception as e: + print(f"Failed non-stream request: {e}") return f"Error: {e}" From 5f2a754051d56b511bbe0bc586a14431477e18a2 Mon Sep 17 00:00:00 2001 From: Aaron Chan Date: Mon, 8 Dec 2025 07:25:25 -0800 Subject: [PATCH 5/5] added thinking support for web search; fixed image upload --- functions/pipes/openai/main.py | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/functions/pipes/openai/main.py b/functions/pipes/openai/main.py index c984033..beb9f04 100644 --- a/functions/pipes/openai/main.py +++ b/functions/pipes/openai/main.py @@ -96,14 +96,7 @@ def process_image(self, image_data): if image_size > self.MAX_IMAGE_SIZE: raise ValueError(f"Image size exceeds 5MB limit: {image_size / (1024 * 1024):.2f}MB") - return { - "type": "image", - "source": { - "type": "base64", - "media_type": media_type, - "data": base64_data, - }, - } + return {"type": "input_image", "image_url": f"{mime_type},{base64_data}"} else: # For URL images, perform size check after fetching url = image_data["image_url"]["url"] @@ -114,7 +107,7 @@ def process_image(self, image_data): raise ValueError(f"Image at URL exceeds 5MB limit: {content_length / (1024 * 1024):.2f}MB") return { - "type": "image", + "type": "input_image", "source": {"type": "url", "url": url}, } @@ -129,19 +122,18 @@ async def pipe(self, body: dict) -> Union[str, AsyncGenerator, Iterator]: if isinstance(message.get("content"), list): for item in message["content"]: if item["type"] == "text": - processed_content.append({"type": "text", "text": item["text"]}) + processed_content.append({"type": "input_text" if message["role"] == "user" else "output_text", "text": item["text"]}) elif item["type"] == "image_url": processed_image = self.process_image(item) processed_content.append(processed_image) # Track total size for base64 images - if processed_image["source"]["type"] == "base64": - image_size = len(processed_image["source"]["data"]) * 3 / 4 - total_image_size += image_size - if total_image_size > 100 * 1024 * 1024: # 100MB total limit - raise ValueError("Total size of images exceeds 100 MB limit") + image_size = len(processed_image["image_url"]) * 3 / 4 + total_image_size += image_size + if total_image_size > 100 * 1024 * 1024: # 100MB total limit + raise ValueError("Total size of images exceeds 100 MB limit") else: - processed_content = [{"type": "input_text" if message["role"] == "user" else "output_text", "text": message.get("content", "")}] + processed_content = message.get("content", "") processed_messages.append({"role": message["role"], "content": processed_content}) @@ -184,6 +176,10 @@ async def stream_response(self, payload): async for event in stream: if event.type == "response.output_text.delta": yield event.delta + elif event.type == "response.web_search_call.in_progress": + yield "performing web search..." + elif event.type == "response.web_search_call.completed": + yield "" except Exception as e: print(f"General error in stream_response method: {e}") yield f"Error: {e}"