psycopg_pool
– Connection pool implementations#
A connection pool is an object used to create and maintain a limited amount of PostgreSQL connections, reducing the time requested by the program to obtain a working connection and allowing an arbitrary large number of concurrent threads or tasks to use a controlled amount of resources on the server. See Connection pools for more details and usage pattern.
This package exposes a few connection pool classes:
ConnectionPool
is a synchronous connection pool yieldingConnection
objects and can be used by multithread applications.AsyncConnectionPool
has an interface similar toConnectionPool
, but withasyncio
functions replacing blocking functions, and yieldsAsyncConnection
instances.NullConnectionPool
is aConnectionPool
subclass exposing the same interface of its parent, but not keeping any unused connection in its state. See Null connection pools for details about related use cases.AsyncNullConnectionPool
has the same behaviour of theNullConnectionPool
, but with the same async interface of theAsyncConnectionPool
.
Note
The psycopg_pool
package is distributed separately from the main
psycopg
package: use pip install "psycopg[pool]"
, or pip install
psycopg_pool
, to make it available. See Installing the connection pool.
The version numbers indicated in this page refer to the psycopg_pool
package, not to psycopg
.
The ConnectionPool
class#
- class psycopg_pool.ConnectionPool(conninfo: str = '', *, connection_class: type[~CT] = <class 'psycopg.Connection'>, kwargs: ~typing.Optional[dict[str, typing.Any]] = None, min_size: int = 4, max_size: ~typing.Optional[int] = None, open: ~typing.Optional[bool] = None, configure: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.CT], None]] = None, check: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.CT], None]] = None, reset: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.CT], None]] = None, name: ~typing.Optional[str] = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: ~typing.Optional[~typing.Callable[[~psycopg_pool.pool.ConnectionPool[~typing.Any]], None]] = None, num_workers: int = 3)#
This class implements a connection pool serving
Connection
instances (or subclasses). The constructor has alot of arguments, but onlyconninfo
andmin_size
are the fundamental ones, all the other arguments have meaningful defaults and can probably be tweaked later, if required.- Parameters:
conninfo (
str
) – The connection string. Seeconnect()
for details.connection_class (
type
, default:Connection
) – The class of the connections to serve. It should be aConnection
subclass.kwargs (
dict
) – Extra arguments to pass toconnect()
. Note that this is one dict argument of the pool constructor, which is expanded asconnect()
keyword parameters.min_size (
int
, default: 4) – The minimum number of connection the pool will hold. The pool will actively try to create new connections if some are lost (closed, broken) and will try to never go belowmin_size
.max_size (
int
, default:None
) – The maximum number of connections the pool will hold. IfNone
, or equal tomin_size
, the pool will not grow or shrink. If larger thanmin_size
, the pool can grow if more thanmin_size
connections are requested at the same time and will shrink back after the extra connections have been unused for more thanmax_idle
seconds.open (
bool
, default:True
) – IfTrue
, open the pool, creating the required connections, on init. IfFalse
, open the pool whenopen()
is called or when the pool context is entered. See theopen()
method documentation for more details.configure (
Callable[[Connection], None]
) – A callback to configure a connection after creation. Useful, for instance, to configure its adapters. If the connection is used to run internal queries (to inspect the database) make sure to close an eventual transaction before leaving the function.check (
Callable[[Connection], None]
) – A callback to check that a connection is working correctly when obtained by the pool. The callback is called at everygetconn()
orconnection()
: the connection is only passed to the client if the callback doesn’t throw an exception. By default no check is made on the connection. You can provide thecheck_connection()
pool static method if you want to perform a simple check.reset (
Callable[[Connection], None]
) – A callback to reset a function after it has been returned to the pool. The connection is guaranteed to be passed to thereset()
function in “idle” state (no transaction). When leaving thereset()
function the connection must be left in idle state, otherwise it is discarded.name (
str
) – An optional name to give to the pool, useful, for instance, to identify it in the logs if more than one pool is used. if not specified pick a sequential name such aspool-1
,pool-2
, etc.timeout (
float
, default: 30 seconds) – The default maximum time in seconds that a client can wait to receive a connection from the pool (usingconnection()
orgetconn()
). Note that these methods allow to override thetimeout
default.max_waiting (
int
, default: 0) – Maximum number of requests that can be queued to the pool, after which new requests will fail, raisingTooManyRequests
. 0 means no queue limit.max_lifetime (
float
, default: 1 hour) – The maximum lifetime of a connection in the pool, in seconds. Connections used for longer get closed and replaced by a new one. The amount is reduced by a random 10% to avoid mass eviction.max_idle (
float
, default: 10 minutes) – Maximum time, in seconds, that a connection can stay unused in the pool before being closed, and the pool shrunk. This only happens to connections more thanmin_size
, ifmax_size
allowed the pool to grow.reconnect_timeout (
float
, default: 5 minutes) – Maximum time, in seconds, the pool will try to create a connection. If a connection attempt fails, the pool will try to reconnect a few times, using an exponential backoff and some random factor to avoid mass attempts. If repeated attempts fail, afterreconnect_timeout
second the connection attempt is aborted and thereconnect_failed()
callback invoked.reconnect_failed (
Callable[[ConnectionPool], None]
) – Callback invoked if an attempt to create a new connection fails for more thanreconnect_timeout
seconds. The user may decide, for instance, to terminate the program (executingsys.exit()
). By default don’t do anything: restart a new connection attempt (if the number of connection fell belowmin_size
).num_workers (
int
, default: 3) – Number of background worker threads used to maintain the pool state. Background workers are used for example to create new connections and to clean up connections when they are returned to the pool.
Changed in version 3.1: added
open
parameter to the constructor.Changed in version 3.2: added
check
parameter to the constructor.Changed in version 3.2: The class is generic and
connection_class
provides types type variable. See Generic pool types.Warning
At the moment, the default value for the
open
parameter isTrue
; In a future version, the default will be changed toFalse
.If you expect the pool to be open on creation even if you don’t use the pool as context manager, you should specify the parameter
open=True
explicitly.Starting from psycopg_pool 3.2, a warning is raised if the pool is used with the expectation of being implicitly opened in the constructor and
open
is not specified.- connection(timeout: float | None = None) Iterator[CT] #
Context manager to obtain a connection from the pool.
Return the connection immediately if available, otherwise wait up to timeout or
self.timeout
seconds and throwPoolTimeout
if a connection is not available in time.Upon context exit, return the connection to the pool. Apply the normal connection context behaviour (commit/rollback the transaction in case of success/error). If the connection is no more in working state, replace it with a new one.
with my_pool.connection() as conn: conn.execute(...) # the connection is now back in the pool
Changed in version 3.2: The connection returned is annotated as defined in
connection_class
. See Generic pool types.
- open(wait: bool = False, timeout: float = 30.0) None #
Open the pool by starting connecting and and accepting clients.
If wait is
False
, return immediately and let the background worker fill the pool ifmin_size
> 0. Otherwise wait up to timeout seconds for the requested number of connections to be ready (seewait()
for details).It is safe to call
open()
again on a pool already open (because the method was already called, or because the pool context was entered, or because the pool was initialized with open =True
) but you cannot currently re-open a closed pool.New in version 3.1.
- close(timeout: float = 5.0) None #
Close the pool and make it unavailable to new clients.
All the waiting and future clients will fail to acquire a connection with a
PoolClosed
exception. Currently used connections will not be closed until returned to the pool.Wait timeout seconds for threads to terminate their job, if positive. If the timeout expires the pool is closed anyway, although it may raise some warnings on exit.
Note
The pool can be also used as a context manager, in which case it will be opened (if necessary) on entering the block and closed on exiting it:
with ConnectionPool(...) as pool: # code using the pool
- wait(timeout: float = 30.0) None #
Wait for the pool to be full (with
min_size
connections) after creation.Close the pool, and raise
PoolTimeout
, if not ready within timeout sec.Calling this method is not mandatory: you can try and use the pool immediately after its creation. The first client will be served as soon as a connection is ready. You can use this method if you prefer your program to terminate in case the environment is not configured properly, rather than trying to stay up the hardest it can.
- min_size#
- resize(min_size: int, max_size: Optional[int] = None) None #
Change the size of the pool during runtime.
- check() None #
Verify the state of the connections currently in the pool.
Test each connection: if it works return it to the pool, otherwise dispose of it and create a new one.
- static check_connection(conn: CT) None #
A simple check to verify that a connection is still working.
Return quietly if the connection is still working, otherwise raise an exception.
Used internally by
check()
, but also available for client usage, for instance ascheck
callback when a pool is created.New in version 3.2.
- pop_stats() dict[str, int] #
Return current stats about the pool usage.
After the call, all the counters are reset to zero.
See Pool stats for the metrics returned.
Functionalities you may not need
- getconn(timeout: Optional[float] = None) CT #
Obtain a connection from the pool.
You should preferably use
connection()
. Use this function only if it is not possible to use the connection as context manager.After using this function you must call a corresponding
putconn()
: failing to do so will deplete the pool. A depleted pool is a sad pool: you don’t want a depleted pool.
- putconn(conn: CT) None #
Return a connection to the loving hands of its pool.
Use this function only paired with a
getconn()
. You don’t need to use it if you use the much more comfortableconnection()
context manager.
Pool exceptions#
- class psycopg_pool.PoolTimeout#
The pool couldn’t provide a connection in acceptable time.
Subclass of
OperationalError
- class psycopg_pool.PoolClosed#
Attempt to get a connection from a closed pool.
Subclass of
OperationalError
- class psycopg_pool.TooManyRequests#
Too many requests in the queue waiting for a connection from the pool.
Subclass of
OperationalError
The AsyncConnectionPool
class#
AsyncConnectionPool
has a very similar interface to the ConnectionPool
class but its blocking methods are implemented as async
coroutines. It
returns instances of AsyncConnection
, or of its subclass if
specified so in the connection_class
parameter.
Only the functions and parameters with different signature from
ConnectionPool
are listed here.
- class psycopg_pool.AsyncConnectionPool(conninfo: str = '', *, connection_class: type[~ACT] = <class 'psycopg.AsyncConnection'>, kwargs: ~typing.Optional[dict[str, typing.Any]] = None, min_size: int = 4, max_size: ~typing.Optional[int] = None, open: ~typing.Optional[bool] = None, configure: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.ACT], ~collections.abc.Awaitable[None]]] = None, check: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.ACT], ~collections.abc.Awaitable[None]]] = None, reset: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.ACT], ~collections.abc.Awaitable[None]]] = None, name: ~typing.Optional[str] = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: ~typing.Optional[~typing.Union[~typing.Callable[[~psycopg_pool.pool_async.AsyncConnectionPool[~typing.Any]], None], ~typing.Callable[[~psycopg_pool.pool_async.AsyncConnectionPool[~typing.Any]], ~collections.abc.Awaitable[None]]]] = None, num_workers: int = 3)#
- Parameters:
connection_class (
type
, default:AsyncConnection
) – The class of the connections to serve. It should be anAsyncConnection
subclass.check (
async Callable[[Connection], None]
) – A callback to check that a connection is working correctly when obtained by the pool.configure (
async Callable[[AsyncConnection], None]
) – A callback to configure a connection after creation.reset (
async Callable[[AsyncConnection], None]
) – A callback to reset a function after it has been returned to the pool.reconnect_failed (
Callable[[AsyncConnectionPool], None]
orasync Callable[[AsyncConnectionPool], None]
) – Callback invoked if an attempt to create a new connection fails for more thanreconnect_timeout
seconds.
Changed in version 3.2: added
check
parameter to the constructor.Changed in version 3.2: The
reconnect_failed
parameter can beasync
.Warning
Opening an async pool in the constructor (using
open=True
on init) will become an error in a future pool versions. Please note that, currently,open=True
is the default; in a future version, the default for the parameter will be changed toFalse
.In order to make sure that your code will keep working as expected in future versions, please specify
open=False
in the constructor and use an explicitawait pool.open()
:pool = AsyncConnectionPool(..., open=False) await pool.open()
or use the pool context manager:
async with AsyncConnectionPool(..., open=False) as pool: ...
Starting from psycopg_pool 3.2, opening an async pool in the constructor raises a warning.
- connection(timeout: float | None = None) AsyncIterator[ACT] #
Context manager to obtain a connection from the pool.
Return the connection immediately if available, otherwise wait up to timeout or
self.timeout
seconds and throwPoolTimeout
if a connection is not available in time.Upon context exit, return the connection to the pool. Apply the normal connection context behaviour (commit/rollback the transaction in case of success/error). If the connection is no more in working state, replace it with a new one.
async with my_pool.connection() as conn: await conn.execute(...) # the connection is now back in the pool
- async open(wait: bool = False, timeout: float = 30.0) None #
Open the pool by starting connecting and and accepting clients.
If wait is
False
, return immediately and let the background worker fill the pool ifmin_size
> 0. Otherwise wait up to timeout seconds for the requested number of connections to be ready (seewait()
for details).It is safe to call
open()
again on a pool already open (because the method was already called, or because the pool context was entered, or because the pool was initialized with open =True
) but you cannot currently re-open a closed pool.
- async close(timeout: float = 5.0) None #
Close the pool and make it unavailable to new clients.
All the waiting and future clients will fail to acquire a connection with a
PoolClosed
exception. Currently used connections will not be closed until returned to the pool.Wait timeout seconds for threads to terminate their job, if positive. If the timeout expires the pool is closed anyway, although it may raise some warnings on exit.
Note
The pool can be also used as an async context manager, in which case it will be opened (if necessary) on entering the block and closed on exiting it:
async with AsyncConnectionPool(...) as pool: # code using the pool
All the other constructor parameters are the same of
ConnectionPool
.- async wait(timeout: float = 30.0) None #
Wait for the pool to be full (with
min_size
connections) after creation.Close the pool, and raise
PoolTimeout
, if not ready within timeout sec.Calling this method is not mandatory: you can try and use the pool immediately after its creation. The first client will be served as soon as a connection is ready. You can use this method if you prefer your program to terminate in case the environment is not configured properly, rather than trying to stay up the hardest it can.
- async resize(min_size: int, max_size: Optional[int] = None) None #
Change the size of the pool during runtime.
- async check() None #
Verify the state of the connections currently in the pool.
Test each connection: if it works return it to the pool, otherwise dispose of it and create a new one.
- async static check_connection(conn: ACT) None #
A simple check to verify that a connection is still working.
Return quietly if the connection is still working, otherwise raise an exception.
Used internally by
check()
, but also available for client usage, for instance ascheck
callback when a pool is created.New in version 3.2.
- async getconn(timeout: Optional[float] = None) ACT #
Obtain a connection from the pool.
You should preferably use
connection()
. Use this function only if it is not possible to use the connection as context manager.After using this function you must call a corresponding
putconn()
: failing to do so will deplete the pool. A depleted pool is a sad pool: you don’t want a depleted pool.
- async putconn(conn: ACT) None #
Return a connection to the loving hands of its pool.
Use this function only paired with a
getconn()
. You don’t need to use it if you use the much more comfortableconnection()
context manager.
Null connection pools#
New in version 3.1.
The NullConnectionPool
is a ConnectionPool
subclass which doesn’t create
connections preemptively and doesn’t keep unused connections in its state. See
Null connection pools for further details.
The interface of the object is entirely compatible with its parent class. Its behaviour is similar, with the following differences:
- class psycopg_pool.NullConnectionPool(conninfo: str = '', *, connection_class: type[~CT] = <class 'psycopg.Connection'>, kwargs: ~typing.Optional[dict[str, typing.Any]] = None, min_size: int = 0, max_size: ~typing.Optional[int] = None, open: ~typing.Optional[bool] = None, configure: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.CT], None]] = None, check: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.CT], None]] = None, reset: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.CT], None]] = None, name: ~typing.Optional[str] = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: ~typing.Optional[~typing.Callable[[~psycopg_pool.pool.ConnectionPool[~typing.Any]], None]] = None, num_workers: int = 3)#
All the other constructor parameters are the same as in
ConnectionPool
.- Parameters:
min_size (
int
, default: 0) – Always 0, cannot be changed.max_size (
int
, default: None) – If None or 0, create a new connection at every request, without a maximum. If greater than 0, don’t create more thanmax_size
connections and queue the waiting clients.reset (
Callable[[Connection], None]
) – It is only called when there are waiting clients in the queue, before giving them a connection already open. If no client is waiting, the connection is closed and discarded without a fuss.max_idle – Ignored, as null pools don’t leave idle connections sitting around.
- wait(timeout: float = 30.0) None #
Create a connection for test.
Calling this function will verify that the connectivity with the database works as expected. However the connection will not be stored in the pool.
Close the pool, and raise
PoolTimeout
, if not ready within timeout sec.
The AsyncNullConnectionPool
is, similarly, an AsyncConnectionPool
subclass
with the same behaviour of the NullConnectionPool
.
- class psycopg_pool.AsyncNullConnectionPool(conninfo: str = '', *, connection_class: type[~ACT] = <class 'psycopg.AsyncConnection'>, kwargs: ~typing.Optional[dict[str, typing.Any]] = None, min_size: int = 0, max_size: ~typing.Optional[int] = None, open: ~typing.Optional[bool] = None, configure: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.ACT], ~collections.abc.Awaitable[None]]] = None, check: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.ACT], ~collections.abc.Awaitable[None]]] = None, reset: ~typing.Optional[~typing.Callable[[~psycopg_pool.abc.ACT], ~collections.abc.Awaitable[None]]] = None, name: ~typing.Optional[str] = None, timeout: float = 30.0, max_waiting: int = 0, max_lifetime: float = 3600.0, max_idle: float = 600.0, reconnect_timeout: float = 300.0, reconnect_failed: ~typing.Optional[~typing.Union[~typing.Callable[[~psycopg_pool.pool_async.AsyncConnectionPool[~typing.Any]], None], ~typing.Callable[[~psycopg_pool.pool_async.AsyncConnectionPool[~typing.Any]], ~collections.abc.Awaitable[None]]]] = None, num_workers: int = 3)#
The interface is the same of its parent class
AsyncConnectionPool
. The behaviour is different in the same way described forNullConnectionPool
.