Designing a connection pool for psycopg3
Posted by Daniele Varrazzo on 2021-01-17
Tagged as: psycopg3, development
The psycopg2 pool is a pretty simple object, little more than... a pool of open connections, and I think it falls short in several ways:
- the top usability problem is the fact that it cannot be used as a context manager;
- if a connection is broken, this is not noticed until it is used by a client;
- if minconn connections are already taken, a new one is created and disposed of as soon as the client has finished using it, regardless of whether other clients may need it;
- if more than maxconn connections are requested the client will receive an error.
For psycopg3 I would like something better. I have read around, looking into other pool implementations to figure out what a well-designed connection pool ought to do (a very well thought-out one seems to be the Java HikariCP), and these are a few ideas I'd like to work on. They are here for feedback, before I jump into enthusiastically implementing the wrong thing...
Client interface
Context manager
In modern Python it is expected that resources are handled by a context manager, so the canonical way to use a pooled connection should be:
    with pool.connection() as conn:
        cur = conn.cursor()
        cur.execute("...")
        consume_data(cur.fetchall())
Note that, because there is a Connection.execute() method, the minimal use would be:
    with pool.connection() as conn:
        conn.execute(DML_QUERY)
Blocking behaviour
Another important usability point is the behaviour in the face of an exhausted pool: the sane behaviour here is to block: not forever, but until either a connection is available or until a timeout expires, and only then throw an exception, so that in the face of a spike there is a buffer before requests start to get killed. I would expect no crash here:
    async def worker(conn, work_for_sec):
        await conn.execute("select pg_sleep(%s)", (work_for_sec,))

    pool = psycopg3.AsyncPool(maxconn=4, connection_timeout_sec=1.0)
    for i in range(8):
        async with pool.connection() as conn:
            create_task(worker(conn, work_for_sec=0.5))
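To make the blocking behaviour concrete, here is a minimal, self-contained sketch that runs without a database. It is not the proposed implementation: `ToyAsyncPool` and `PoolTimeout` are hypothetical names, the "connections" are plain integers, and `asyncio.sleep()` stands in for `pg_sleep()`. In this sketch each worker acquires its own connection, so the extra workers really do queue up behind the busy ones.

```python
import asyncio
from contextlib import asynccontextmanager


class PoolTimeout(Exception):
    """Raised when no connection becomes free within the timeout."""


class ToyAsyncPool:
    """Hypothetical stand-in for the proposed AsyncPool; connections are ints."""

    def __init__(self, maxconn, timeout_sec):
        self.timeout_sec = timeout_sec
        self._free = asyncio.Queue()
        for n in range(maxconn):
            self._free.put_nowait(n)

    @asynccontextmanager
    async def connection(self):
        try:
            # Block until a connection is free, but not forever.
            conn = await asyncio.wait_for(self._free.get(), self.timeout_sec)
        except asyncio.TimeoutError:
            raise PoolTimeout(f"no connection after {self.timeout_sec} sec")
        try:
            yield conn
        finally:
            # Hand the connection back, whatever happened in the block.
            self._free.put_nowait(conn)


async def worker(pool, work_for_sec):
    async with pool.connection() as conn:
        await asyncio.sleep(work_for_sec)  # stands in for pg_sleep()
        return conn


async def run_workers(nworkers, maxconn, timeout_sec, work_for_sec):
    pool = ToyAsyncPool(maxconn=maxconn, timeout_sec=timeout_sec)
    tasks = [worker(pool, work_for_sec) for _ in range(nworkers)]
    # return_exceptions=True so that timeouts show up in the result list.
    return await asyncio.gather(*tasks, return_exceptions=True)
```

With 8 workers, 4 connections, half a second of work each and a 1 second timeout, `asyncio.run(run_workers(8, 4, 1.0, 0.5))` should complete with no `PoolTimeout` among the results: the second batch of workers simply waits for the first to finish.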
Which also illustrates that, like many other objects in psycopg3, there would be an asyncio version of the object and a sync one (which would work with normal threads and with Eventlet/gevent green threads).
Connection configuration
There should be a way to configure a Python connection, for instance to register adapters, before it is available to the application. Is subclassing the best way to do it? Someone might find it more useful to choose dynamically what configuration to use.

    # Subclassing:
    class MyPool(psycopg3.Pool):
        def configure(self, conn):
            MyAdapter.register(conn)
            return conn

    pool = MyPool()

    # Configuring:
    def my_configure(conn):
        MyAdapter.register(conn)
        return conn

    pool = psycopg3.Pool(configure_func=my_configure)
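The two styles need not be mutually exclusive. One possible arrangement, sketched here under the assumption that the default `configure()` simply delegates to `configure_func` when one is given, lets the same pool support both. `ToyPool`, `_new_connection()` and `register_my_adapter()` are hypothetical, and a "connection" is just a dict.

```python
class ToyPool:
    """Hypothetical stand-in for the proposed Pool; a connection is a dict."""

    def __init__(self, configure_func=None):
        self._configure_func = configure_func

    def configure(self, conn):
        # Default hook: delegate to configure_func if one was given,
        # otherwise leave the connection untouched.
        if self._configure_func is not None:
            return self._configure_func(conn)
        return conn

    def _new_connection(self):
        conn = {"adapters": []}  # stands in for psycopg3.connect(...)
        return self.configure(conn)


def register_my_adapter(conn):
    conn["adapters"].append("MyAdapter")
    return conn


class MyPool(ToyPool):
    # Subclassing style: override the hook.
    def configure(self, conn):
        return register_my_adapter(conn)


by_subclass = MyPool()._new_connection()
# Function style: pass the hook to the constructor.
by_function = ToyPool(configure_func=register_my_adapter)._new_connection()
```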
Exclusive connections?
Should there be a way to create an exclusive connection? Sometimes I need one, in an otherwise pooled application, e.g. to receive notifications:
    def listen_notifications():
        with pool.exclusive() as conn:
            conn.autocommit = True
            for notify in conn.cursor().notifies():
                handle_notification(notify)
But I don't think there is such need if the connection parameters are made available on the pool:
    pool = Pool(maxconn=config.pool_maxconn, args=(config.db_dsn,))

    def listen_notifications():
        with psycopg3.connect(*pool.args, **pool.kwargs) as conn:
            conn.autocommit = True
            for notify in conn.cursor().notifies():
                handle_notification(notify)

which leaves the user free to choose which connection class to use, how to configure it, etc. So an exclusive-connection method is probably not so useful.
Internal behaviour
Pool worker
If we desire the pool to be a fast provider of connections, not slowing down the application operations, certain things should happen behind the scenes:
- do we have a spike, so we need more connections?
- do we have a moment of calm, so we can do with fewer connections?
- are the connections in the pool still sane?
A trivial behaviour would be something like "if there are no connections available create a new one":
    class Pool:
        def getconn(self):
            conn = self._get_a_connection_from_the_pool()
            if conn:
                return conn
            else:
                conn = psycopg3.connect(*self.args, **self.kwargs)
                self._put_it_in_the_pool_when_done(conn)
                return conn
This interesting article shows that this might not be the best way to do it, especially if the connection time is particularly long compared to the processing time. Growing the pool could be delegated to a background worker, following a strategy like:

    class Pool:
        def getconn(self):
            if not self._pool:
                self.worker.create_a_new_connection_and_put_it_in_the_pool()
            return self.wait_for_a_connection_from_the_pool()

If there is a worker infrastructure in place there can be more jobs for it: periodically checking the state of the unused connections in the pool, for instance, or closing them if they have been alive for more than a configured amount of time (30 minutes?) and replacing them with fresh ones (or not replacing them, in case we have more than minconn connections and a previous spike has now passed).
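As a rough illustration of the worker-based strategy, the sketch below uses a single daemon thread and two queues. It is only one possible arrangement: `ToyPool`, its attributes and the `connect` callable are hypothetical, and a real pool would also need shutdown, health checks and error handling in the worker.

```python
import queue
import threading


class ToyPool:
    """Hypothetical sketch: a worker thread opens connections, never the caller."""

    def __init__(self, connect, maxconn, timeout_sec=30.0):
        self._connect = connect          # callable returning a new connection
        self._maxconn = maxconn
        self._timeout_sec = timeout_sec
        self._available = queue.Queue()  # idle connections, ready to hand out
        self._requests = queue.Queue()   # "please open one more" messages
        self._nconns = 0                 # connections opened or being opened
        self._lock = threading.Lock()
        threading.Thread(target=self._run_worker, daemon=True).start()

    def _run_worker(self):
        while True:
            self._requests.get()         # wait for a request to grow
            self._available.put(self._connect())

    def getconn(self):
        try:
            return self._available.get_nowait()
        except queue.Empty:
            pass
        with self._lock:
            if self._nconns < self._maxconn:
                self._nconns += 1
                self._requests.put(None)  # ask the worker to grow the pool
        # Wait for whichever arrives first: a new connection from the worker
        # or one returned by another client.
        try:
            return self._available.get(timeout=self._timeout_sec)
        except queue.Empty:
            raise TimeoutError("no connection available")

    def putconn(self, conn):
        self._available.put(conn)
```

The point of the design is that a client asking for a connection never pays the full connection time if another client returns one first: it takes whatever reaches the queue soonest.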
What to do in case of connection failure?
The pool acts as a shield helping the application to reconnect in case the connection with the database is lost. However, when does this behaviour stop being sane? If the database is unreachable - maybe misconfigured, maybe moved - there is a risk that the application doesn't die, because the pool keeps trying to reconnect, but it doesn't work either, because there is no working database connection.
In my experience with live systems this can be a difficult condition to diagnose: several times I have wished that an application had crashed, screaming loudly on a dashboard, instead of trying to stay up while complaining quietly in some logging stream. On the other hand, maybe the state the application holds is precious enough that it is worth waiting a few minutes before crashing and burning.
A first step towards a sane behaviour could be to die early, on startup, if the connection is not working when the first pool population is done: the program would fail hard and fast if the database - assumed to be a prerequisite - is nowhere to be found.
What if the database is missing in action during the program lifetime? Subsequent connection attempts may be repeated, with an exponential backoff, until dying after a few minutes of fruitless attempts (with a sys.exit(1) or some other termination function, which might be overridden in a subclass).
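The retry policy could look roughly like the following sketch. The function name and all its parameters are hypothetical, `OSError` stands in for whatever error the driver raises on a failed connection, and `sleep` and `clock` are injectable only so that the logic can be tested without waiting.

```python
import random
import time


def reconnect_with_backoff(connect, terminate, terminate_after_sec=300.0,
                           initial_delay_sec=1.0, jitter=0.1,
                           sleep=time.sleep, clock=time.monotonic):
    """Retry connect() with exponential backoff; call terminate() when out of time."""
    deadline = clock() + terminate_after_sec
    delay = initial_delay_sec
    while True:
        try:
            return connect()
        except OSError:  # stand-in for the driver's connection error
            pass
        now = clock()
        if now >= deadline:
            # Out of time: hand over to the termination hook.
            return terminate()
        # Wait for the current delay, plus or minus the jitter,
        # but never sleep past the deadline.
        wait = delay * (1 + random.uniform(-jitter, jitter))
        sleep(min(wait, deadline - now))
        delay *= 2
```

In a real pool `terminate` would default to something like `lambda: sys.exit(1)`; passing it in as a callable keeps the policy overridable, which is the same goal as making it a method to subclass.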
Connections usage pattern
In which order should the connections in the pool be used? As a stack (put back a connection on top of the stack, it will be the next one to use)? As a queue (put back the connection at the bottom of the queue, try to use them uniformly)? Randomly (whatever comes out from whatever hash map is used to keep the connections)?
Is there a reason to prefer one way or the other? ISTM that a stack behaviour allows a better reuse of prepared statements and an easy implementation of the max_idle_sec parameter (get connections from the top of the stack, evict idle ones from the bottom). I am sure I haven't thought about this well enough though.
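The stack idea can be sketched with a `collections.deque`. This is only an illustration: `IdleStack` and its methods are hypothetical names, and the injectable `clock` is there purely for testing. Because connections are always pushed on top and taken from the top, the bottom of the deque always holds the one that has been idle the longest, so eviction can stop at the first fresh entry.

```python
import time
from collections import deque


class IdleStack:
    """Hypothetical sketch of LIFO reuse with idle eviction from the bottom."""

    def __init__(self, max_idle_sec, clock=time.monotonic):
        self._max_idle_sec = max_idle_sec
        self._clock = clock
        # Entries are (connection, time it was returned); the right end is the top.
        self._idle = deque()

    def put(self, conn):
        self._idle.append((conn, self._clock()))

    def get(self):
        # Most recently returned connection first.
        conn, _ = self._idle.pop()
        return conn

    def evict_idle(self):
        """Remove and return the connections idle for longer than max_idle_sec."""
        evicted = []
        now = self._clock()
        # The bottom holds the longest-idle connections, so stop at the
        # first one that is still fresh.
        while self._idle and now - self._idle[0][1] > self._max_idle_sec:
            evicted.append(self._idle.popleft()[0])
        return evicted
```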
Proposed API
Given the behaviour described, the methods available on the pool object might look like the ones described here.
The interface would be implemented in two versions: a Pool class using normal blocking functions and threads for concurrency (which greenlet libraries such as Eventlet or gevent would be able to monkeypatch) and an AsyncPool implementation using the usual await and async with on blocking methods and using asyncio.Task for concurrency.
- connection(timeout_sec=None) method:
Open a context block and return a connection from the pool. The connection is returned to the pool at the end of the block.
On block exit, if a transaction is open, commit it or roll it back, according to whether an exception has been raised in the block, consistently with what the connection block does.
- getconn(timeout_sec), putconn(conn) methods:
Obtain a connection from the pool and return it. To use if, for some reason, the context manager cannot be used.
On putconn() check the state of the connection: if it is broken, dispose of it and create a new one; if it is in a transaction or in an error state, roll back the transaction, consistently with what Connection.close() does.
- close()
- Close all the connections in the pool. Further attempts to create a new connection will fail immediately with a helpful message (like, "dude, what do you want to do, seriously?")
- configure(conn):
- Configure a connection before making it available to the pool. The default implementation is no-op: subclasses may override it to configure the connections; alternatively a configure_func might be passed to the Pool constructor.
- get_info():
- Return stats about the behaviour of the pool so far (connections open, reused, returned...) for monitoring.
- terminate()
- Terminate the program, invoked by a pool worker after connection attempts have been repeatedly fruitless. It is exposed to allow overriding the default behaviour (sys.exit(1)) in a subclass.
- get_maintenance_task():
Get the next maintenance task to perform on the pool. Having them available externally would make it possible to have full control of when such tasks are executed (useful for testing or to provide some sort of sync behaviour, using no background worker).
Experimental idea: will drop it if it would make the pool implementation more difficult than it should be.
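The relationship between connection() and the getconn()/putconn() pair can be sketched as follows. `ToyPool` and `FakeConnection` are hypothetical stand-ins, and for brevity this version always commits or rolls back on exit, whereas the description above only does so if a transaction is open.

```python
from contextlib import contextmanager


class FakeConnection:
    """Hypothetical stand-in recording what the pool did to it."""

    def __init__(self):
        self.log = []

    def commit(self):
        self.log.append("commit")

    def rollback(self):
        self.log.append("rollback")


class ToyPool:
    def __init__(self, conns):
        self._conns = list(conns)

    def getconn(self, timeout_sec=None):
        # A real pool would block up to timeout_sec; this one just pops.
        return self._conns.pop()

    def putconn(self, conn):
        self._conns.append(conn)

    @contextmanager
    def connection(self, timeout_sec=None):
        conn = self.getconn(timeout_sec)
        try:
            yield conn
        except BaseException:
            # The block failed: discard its changes and re-raise.
            conn.rollback()
            raise
        else:
            conn.commit()
        finally:
            # In both cases the connection goes back to the pool.
            self.putconn(conn)
```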
Key configuration parameters
These parameters would be passed to the class constructor.
- minconn:
- Minimum number of connections to keep open in the pool. If some are closed the pool should try to create new ones as quickly as possible to replace them. Proposed default: 4 (very defensive: to enable the pooling behaviour but to avoid saturating a server unless configured up).
- maxconn:
- Maximum number of connections to open at any given time. If maxconn == minconn no extra connection is created in case more client requests arrive: they will just wait until a connection is made available again. If maxconn > minconn the pool can create new connections if the demand increases; later the extra connections created may be closed if deemed no longer required (default: minconn).
- args, kwargs:
- Arguments to pass to the connection_factory to create a new connection (default: empty, connect as per PG* env vars, like psycopg3.connect()).
- connection_factory:
- The connection class to create (default: Connection for Pool, AsyncConnection for AsyncPool).
- configure_func:
- A function to configure the connection after its creation and before making it available to the pool. Alternative to subclassing the Pool class to override Pool.configure(). It can be a callable taking a connection as parameter, or the dotted name of such a callable, so that e.g. the function name might be passed to the application as an env var.
- timeout_sec:
- Default timeout before raising an exception if a connection cannot be served. It may be overridden by Pool.connection(timeout_sec=...) (default: 30 sec).
- max_lifetime_sec:
- Maximum time a connection should be kept in the pool and used. After such time, when the connection is not in use, it can be closed and replaced by a new one (default: 30 minutes - 10% random factor to avoid mass evictions).
- max_idle_sec:
- Time a connection can sit idle in the pool before being removed. Only connections above minconn are removed, if maxconn allows creating them (default: 10 min + 10% random).
- terminate_after_sec:
- Number of seconds to wait for a reconnection attempt before giving up and terminating the program, calling Pool.terminate() (default: 5 min, starting from 1sec ± 10%, doubled up until the precise threshold, then kaputt).
- num_workers:
- Number of background workers to perform maintenance tasks. If set to 0 the dynamic characteristics of the pool might be downgraded (e.g. creating a new connection will happen in the requesting thread, blocking it for the time it takes). Background jobs might be executed by the application calling get_maintenance_task() (default: 3).
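Accepting the dotted name of a callable, as suggested for the configure function, needs only a small helper. The sketch below is one way to do it with `importlib`; `resolve_callable` is a hypothetical name, and it assumes the name has the form `package.module.function`.

```python
import importlib


def resolve_callable(func_or_name):
    """Return a callable given either the callable itself or its dotted name."""
    if callable(func_or_name):
        return func_or_name
    # Split "package.module.function" into the module path and the attribute.
    module_name, _, attr = func_or_name.rpartition(".")
    if not module_name:
        raise ValueError(f"not a dotted name: {func_or_name!r}")
    func = getattr(importlib.import_module(module_name), attr)
    if not callable(func):
        raise TypeError(f"{func_or_name!r} is not callable")
    return func
```

An application could then read the name from an environment variable and pass it straight to the pool constructor, which would call a helper like this one before configuring its first connection.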
Thoughts?
Please let me know what you think your Best Ever Connection Pool for psycopg3 should do. Thank you very much!