From 08e289a68197dd21476908b2063202ceb2674fca Mon Sep 17 00:00:00 2001 From: Rob Clevenger Date: Tue, 16 Dec 2025 14:14:13 -0800 Subject: [PATCH 1/4] let kwargs be callable --- src/pgmq/queue.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/pgmq/queue.py b/src/pgmq/queue.py index 8883567..de0ab9d 100644 --- a/src/pgmq/queue.py +++ b/src/pgmq/queue.py @@ -1,5 +1,6 @@ from dataclasses import dataclass, field -from typing import Optional, List, Union +from collections.abc import Callable +from typing import Optional, List, Union, Any from psycopg.types.json import Jsonb from psycopg_pool import ConnectionPool import os @@ -21,7 +22,7 @@ class PGMQueue: delay: int = 0 vt: int = 30 pool_size: int = 10 - kwargs: dict = field(default_factory=dict) + kwargs: Union[dict, Callable[[], dict[str, Any]]] = field(default_factory=dict) verbose: bool = False log_filename: Optional[str] = None init_extension: bool = True @@ -37,7 +38,10 @@ def __post_init__(self) -> None: user={self.username} password={self.password} """ - self.pool = ConnectionPool(conninfo, open=True, **self.kwargs) + if callable(self.kwargs): + self.pool = ConnectionPool(conninfo, open=True, kwargs=self.kwargs) + else: + self.pool = ConnectionPool(conninfo, open=True, **self.kwargs) self._initialize_logging() if self.init_extension: self._initialize_extensions() From d6137a7c6cd31afcc4e2db48a49273a8825af81a Mon Sep 17 00:00:00 2001 From: Rob Clevenger Date: Tue, 16 Dec 2025 14:28:23 -0800 Subject: [PATCH 2/4] update version to 1.0.3 --- pyproject.toml | 2 +- uv.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 09bc7e6..cd0f10a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "pgmq" -version = "1.0.2" +version = "1.0.3" description = "Python client for the PGMQ Postgres extension." readme = "README.md" license = "Apache-2.0" diff --git a/uv.lock b/uv.lock index 4f4b499..82dfe30 100644 --- a/uv.lock +++ b/uv.lock @@ -1428,7 +1428,7 @@ wheels = [ [[package]] name = "pgmq" -version = "1.0.1" +version = "1.0.3" source = { editable = "." } dependencies = [ { name = "orjson" }, From 74f8c94ab92a9622af338d705773ef8f31c10d53 Mon Sep 17 00:00:00 2001 From: Rob Clevenger Date: Tue, 16 Dec 2025 16:03:02 -0800 Subject: [PATCH 3/4] addressed code review feedback --- src/pgmq/queue.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/pgmq/queue.py b/src/pgmq/queue.py index de0ab9d..0d08c76 100644 --- a/src/pgmq/queue.py +++ b/src/pgmq/queue.py @@ -41,6 +41,10 @@ def __post_init__(self) -> None: if callable(self.kwargs): self.pool = ConnectionPool(conninfo, open=True, kwargs=self.kwargs) else: + if "kwargs" in self.kwargs: + raise TypeError( + "The 'kwargs' key is reserved for callables and cannot be used in the kwargs dictionary." + ) self.pool = ConnectionPool(conninfo, open=True, **self.kwargs) self._initialize_logging() if self.init_extension: From c30f1c6f398274184f32abb1ffb45759cdae19a0 Mon Sep 17 00:00:00 2001 From: Rob Clevenger Date: Tue, 16 Dec 2025 17:14:52 -0800 Subject: [PATCH 4/4] When kwargs is a callable, use psycopg's make_conninfo to dynamically merge base connection parameters with values returned by the callable (e.g., fresh IAM authentication tokens). This allows the connection pool to fetch updated credentials each time a new connection is created. The other approach wasn't working on cloud run --- src/pgmq/queue.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/pgmq/queue.py b/src/pgmq/queue.py index 0d08c76..26b2c2c 100644 --- a/src/pgmq/queue.py +++ b/src/pgmq/queue.py @@ -2,6 +2,7 @@ from collections.abc import Callable from typing import Optional, List, Union, Any from psycopg.types.json import Jsonb +from psycopg.conninfo import make_conninfo from psycopg_pool import ConnectionPool import os from pgmq.messages import Message, QueueMetrics @@ -39,7 +40,16 @@ def __post_init__(self) -> None: password={self.password} """ if callable(self.kwargs): - self.pool = ConnectionPool(conninfo, open=True, kwargs=self.kwargs) + # When kwargs is callable, create a callable conninfo that merges + # the base connection string with dynamic values (e.g., IAM auth tokens). + # psycopg_pool calls this each time a new connection is needed. + kwargs_callable = self.kwargs + + def get_conninfo() -> str: + extra = kwargs_callable() # e.g., {"password": "fresh_token"} + return make_conninfo(conninfo, **extra) + + self.pool = ConnectionPool(get_conninfo, open=True) else: if "kwargs" in self.kwargs: raise TypeError(