Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "pgmq"
version = "1.0.2"
version = "1.0.3"
description = "Python client for the PGMQ Postgres extension."
readme = "README.md"
license = "Apache-2.0"
Expand Down
24 changes: 21 additions & 3 deletions src/pgmq/queue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from dataclasses import dataclass, field
from typing import Optional, List, Union
from collections.abc import Callable
from typing import Optional, List, Union, Any
from psycopg.types.json import Jsonb
from psycopg.conninfo import make_conninfo
from psycopg_pool import ConnectionPool
import os
from pgmq.messages import Message, QueueMetrics
Expand All @@ -21,7 +23,7 @@ class PGMQueue:
delay: int = 0
vt: int = 30
pool_size: int = 10
kwargs: dict = field(default_factory=dict)
kwargs: Union[dict, Callable[[], dict[str, Any]]] = field(default_factory=dict)
verbose: bool = False
log_filename: Optional[str] = None
init_extension: bool = True
Expand All @@ -37,7 +39,23 @@ def __post_init__(self) -> None:
user={self.username}
password={self.password}
"""
self.pool = ConnectionPool(conninfo, open=True, **self.kwargs)
if callable(self.kwargs):
# When kwargs is callable, create a callable conninfo that merges
# the base connection string with dynamic values (e.g., IAM auth tokens).
# psycopg_pool calls this each time a new connection is needed.
kwargs_callable = self.kwargs

def get_conninfo() -> str:
extra = kwargs_callable() # e.g., {"password": "fresh_token"}
return make_conninfo(conninfo, **extra)

self.pool = ConnectionPool(get_conninfo, open=True)
else:
if "kwargs" in self.kwargs:
raise TypeError(
"The 'kwargs' key is reserved for callables and cannot be used in the kwargs dictionary."
)
self.pool = ConnectionPool(conninfo, open=True, **self.kwargs)
self._initialize_logging()
if self.init_extension:
self._initialize_extensions()
Expand Down
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.