Articles tagged psycopg3
Automatic async to sync code conversion
Posted by Daniele Varrazzo on 2024-09-23
Tagged as
psycopg3,
development
Psycopg 3 provides both a sync and an async Python interface: for each object used to perform I/O operations, such as Connection, Cursor, there is an async counterpart: AsyncConnection, AsyncCursor, with an intuitive interface: just add the right async or await keyword where needed:
# Familiar sync code conn = psycopg.Connection.connect("") cur = conn.execute("select now()") print(cur.fetchone()[0]) # Similar async code aconn = await psycopg.AsyncConnection.connect("") acur = await aconn.execute("select now()") print((await acur.fetchone())[0])
The decision to provide both sync and async code was made early in the development of Psycopg 3 and most of the internal code is written in a way to be compatible with both sync and async code, in order to keep code duplication to a minimum. This was achieved by making all the libpq communication async, and writing the network code as generators, yielding at the moment they need to wait, isolating the differences in the sync/async wait policy all in wait() functions.
This helped to minimise the async/sync differences in the code related to the communication between PostgreSQL and Psycopg. However, the interface between Psycopg and the Python user is still a lot to maintain, and consists of a lot of code that is very similar, almost duplicated, between the sync and async sided. Apart from the obvious async/await keywords, there would be subtle implementation differences, for example:
- using asyncio functions instead of blocking counterparts, for instance await asyncio.sleep() instead of time.sleep();
- asyncio.create_task(f(arg1, arg2)) is similar to thread.Thread(f, (arg1, arg2)).start();
- threading.Event has a asyncio.Event counterpart whose lock() method doesn't have a timeout, parameter, so event.wait(timeout=10) needs to be rewritten as asyncio.wait_for(event.wait(), timeout=10).
Up until Psycopg 3.1, the two variants of each object were kept in sync manually. Every time changes were made on the sync side, they had to be ported to the async side, with cumbersome and noisy diffs, with subtle differences being introduced from time to time. Even the tests were pretty much duplicated (with some sync tests being accidentally lost on the async side, or vice versa). It seemed like a situation that could have been improved.
This is so boring that...
...a computer should do it for me instead.
Writing the async side starting from the sync side? Actually, the opposite. It is obvious that the async side has more information than the sync side (every method definition and call clearly indicates whether it will block or not) and most of the differences are minimal and repetitive. What we want then is a script that takes asyncio-based source code as input and outputs equivalent sync code.
This article describes what we did to implement such a script and how we used it for the initial transformation (replacing manually written sync code with auto-generated code without loss of quality) and how we are currently using it to maintain the Psycopg 3 codebase.
Abstract Syntax Tree
You would be tempted to write a bunch of regular expressions to just scrub away every async and await keyword found, but the source code is probably the wrong level to attack the problem: Python knows how to parse Python itself well and can allow us to reason at a higher level.
A better tool to work with is the Abstract Syntax Tree (AST): an in-memory representation of the code obtained after parsing. At this level we manipulate objects that represent "the for loop", or "the function call", and we are not fooled by unexpected spaces, extra brackets, comments, literal strings, and other traps.
The Python 'ast' module is the obvious starting point: if you have a bit of source code such as:
import asyncio async def async_square(n): # Squares are slow await asyncio.sleep(1) return n * n
you can pass it to the module to see the AST tree that represents it:
$ python -m ast ast1.py Module( body=[ Import( names=[ alias(name='asyncio')]), AsyncFunctionDef( name='async_square', args=arguments( args=[ arg(arg='n')], defaults=[]), body=[ Expr( value=Await( value=Call( func=Attribute( value=Name(id='asyncio'), attr='sleep'), args=[ Constant(value=1)]))), Return( value=BinOp( left=Name(id='n'), op=Mult(), right=Name(id='n')))], decorator_list=[], returns=Name(id='float'))])
You can see, highlighted, the nodes in the tree representing the main statements in the code: the tree represents a module, whose body contains two statements - an import and an async def - with the function body defining an await call and a return statement.
The same ast module can perform the reverse transformation, converting an AST tree back to source:
$ python -c "import ast; print(ast.unparse(ast.parse(open('ast1.py').read())))" import asyncio async def square(n: float) -> float: await asyncio.sleep(1) return n * n
As you can see, the transformation back to code is unfortunately not a perfect reconstruction of the original code, it is only equivalent, with missing comments and different spacing. This is because the syntax tree is abstract and whitespaces and comments don't affect it. If you wanted to take those details into account you would need a concrete syntax tree (something like that exists, but I haven't played with it).
Changing whitespaces is not a problem, but losing comments can be, especially when they are used to control linters (such as Flake8's noqa or Mypy's type: ignore), or simply when you happen to be a human being and want to read the source code. Fortunately there is a simple wrapper module, ast-comments, which does exactly what it says on the tin: it introduces Comment nodes as part of an AST. Playing around with it, it turned out to be a good compromise between an abstract and a concrete syntax tree, after some taming of the comments placement.
Du AST Mich
To perform the code transformation, we will walk over the abstract syntax tree and we will perform some operation to generate a different tree of our liking. Typically, this type of operation is performed using an implementation of the visitor pattern.
This pattern can be incredibly useful whenever you need to perform operations on data structures composed of heterogeneous nodes (I've seen it in applications ranging from converting UML representations to code, converting markup language to HTML, converting Kubernetes manifests to Helm charts, converting annotated lyrics files to Ukulele tab sheets...); unfortunately many of the descriptions of the pattern you can find online fail to make its brilliance immediately apparent (the Wikipedia page is pretty bad at it) because they have historically focused on solving the double-dispatch problem in static languages like as C++ or Java (which is trivial in a dynamic language like Python) rather than focusing on the awesome things you can do with it.
In a nutshell, you will have an object that traverses an input data structure, element by element, building an output structure in the process, allowing you to run different code and to perform different manipulations depending on the type of element being traversed.
In our case, both the input and the output are AST trees, which will happen to be very similar to each other (since we are just trying to translate some subtle differences from one Python module to another): for many nodes, the visitor will just output a copy of it (for example, the return statement in the above example is unchanged). But, if we see a pattern of interest, we can tell our visitor to produce a different node.
The ast module provides a base class ast.NodeTransformer which implements the node traversal and tree production parts. By itself it doesn't perform any operations on the nodes, so it just produces a copy of the input tree. However, by subclassing the class and adding visit methods, you can implement node-specific transformations.
With the AST node transformer, the method called is based on the name of the node being visited; for example, if you add a method called visit_Import to your subclass, the visitor will call it whenever it traverses an Import node, giving you the chance to manipulate an import statement. You can then decide whether you want to change some of the details of the node (drop some imports, change some names), or replace the node with something completely different (such as replacing an async function definition with a sync one).
Let's say that we want to produce a sync version of the above script: the differences should be the following:
@@ -1,7 +1,7 @@ -import asyncio +import time -async def async_square(n: float) -> float: +def square(n: float) -> float: # Squares are slow - await asyncio.sleep(1) + time.sleep(1) return n * n
In our toy example, we want to convert the asyncio module into the time module (which is obviously not the right thing to do in the general case, but let's keep the example simple). The following script implements the transformation and prints the converted module:
import ast class MyTransformer(ast.NodeTransformer): def visit_Import(self, node): for alias in node.names: if alias.name == "asyncio": alias.name = "time" return node with open("ast1.py") as f: tree = ast.parse(f.read()) tree = MyTransformer().visit(tree) print(ast.unparse(tree))
The script will print the new source, with an import time replacing the original import asyncio.
Changing the async call is a bit trickier: we want to change the highlighted parts of the original tree:
value=Await( << this node must be dropped, replaced by its value value=Call( func=Attribute( value=Name(id='asyncio'), << we want time here attr='sleep'), args=[ Constant(value=1)], keywords=[]))),
Adding the following two methods to the above class will implement what has been described.
def visit_Await(self, node): new_node = node.value # drop the node, continue to operate on the value self.visit(new_node) return new_node def visit_Call(self, node): match node.func: case ast.Attribute(value=ast.Name(id="asyncio"), attr="sleep"): node.func.value.id = "time" return node
To make sense of how these methods operate on their input nodes, and then to implement your own transformations, you can always look at the output of python -m ast in order to see the attributes on each node and how they are nested.
The visit_Call method shows how the structural pattern matching introduced in Python 3.10 comes in handy for this operation. The method is called for each function call found in the input code; checking whether the one just received requires any manipulation would have involved a cascade of ifs (the value is a Name, its id is asyncio, the attr is sleep...) which becomes pretty ugly pretty quickly, whereas instead a match statement can describe a complex nested test very succinctly.
Problems with sleep()
Performing the transformation from asyncio.sleep to time.sleep for real is much more complex than this. What if our source includes from asycio import sleep, Event? We would have to split the import into several parts:
from time import sleep from threading import Event
and the latter should be treated differently later because the two Event objects have a different wait() signatures.
To help with this operation, in Psycopg 3 we introduced an internal '_acompat' module (actually two, because the pool is released separately and uses different functions; actually three, because the tests also have their own...) to expose pairs of functions or objects that should be used alternatively in sync or in async mode.
For example we can solve the sleep() problem with:
# module _acompat.py import time import asyncio sleep = time.sleep def asleep(seconds: float) -> Coroutine[Any, Any, None]: """ Equivalent to asyncio.sleep(), converted to time.sleep() by async_to_sync. """ return asyncio.sleep(seconds)
Now it's easy to use from ._acompat import asleep; await asleep(1) and do some simple name substitutions in the AST: the resulting statement from ._acompat import sleep; sleep(1) will work as expected.
Other goodies we have implemented to help unify async and sync code are aspawn/spawn and agather/gather to unify threads and asyncio tasks creation, alist() to encapsulate [x for x in await iterable] in a way that can be easily converted to list(iterable) and many other helpers to smooth the transition.
When everything else fail
There may be parts of the codebase where the difference between sync and async versions is too difficult to handle in a practical way, and is not worth to put together a complex matching for a complex, one-off case. What we want is a simple "if async, do this, else do that".
We have solved this problem by using a pattern like:
if True: # ASYNC foo() else: bar()
The AST with this code, including the comments, looks like:
Module( body=[ If( test=Constant(value=True), body=[ Comment(value='# ASYNC', inline=True), Expr( value=Call( func=Name(id='foo'), args=[], keywords=[]))], orelse=[ Expr( value=Call( func=Name(id='bar'), args=[], keywords=[]))])], type_ignores=[])
Our transformation will find the ASYNC comment: in this case it will simply discard the if side of the condition, as well as the if itself, and will leave only the else branch in the sync code, allowing you to discard unneeded imports or other code that would simply be invalid in the sync context.
This pattern is also efficient, because the Python compiler is able to recognise that if True will always take the first branch, so it will discard the test and the code in the else branch. The dis(assembler) module shows no jump and that no reference to the bar() function call:
$ python -m dis ast3.py 1 0 NOP 2 2 LOAD_NAME 0 (foo) 4 CALL_FUNCTION 0 6 POP_TOP 8 LOAD_CONST 1 (None) 10 RETURN_VALUE
Conversion methodology
Once we have our conversion script, how do we use it to actually convert the code base, making sure to not break it? The process, for us, was iterative: going module by module and adding features to the script until all the "duplicated" modules were complete.
For each module to be converted, the procedure was roughly as follows.
First step: refactoring the code with the intention of not changing any behaviour, but of making the async module as similar as possible to the sync module. This might have meant some code reorganisation, the renaming of some variables, the swapping of some function definitions, the rediscovery of some forgotten skeletons and a chance of giving them a proper burial.
Often we would have implemented some non-I/O related helper function on the sync side and imported it on the async side:
# connection.py def clean_up_conninfo(conninfo): ... # hack hack return better_conninfo def connect(conninfo): better_conninfo = clean_up_conninfo(conninfo) conn = wait(connection_gen(bettern_conninfo)) return conn # connection_async.py from .connection import clean_up_conninfo async def connect_async(conninfo): better_conninfo = clean_up_conninfo(conninfo) aconn = await async_wait(connection_gen(bettern_conninfo)) return aconn
In this case we would have moved the shared functionality in a separate internal module and imported the function on both the sides:
# _connection.py def clean_up_conninfo(conninfo): ... # hack hack return better_conninfo # connection.py from ._connection import clean_up_conninfo def connect(conninfo): better_conninfo = clean_up_conninfo(conninfo) conn = wait(connection_gen(bettern_conninfo)) return conn # connection_async.py from ._connection import clean_up_conninfo async def connect_async(conninfo): better_conninfo = clean_up_conninfo(conninfo) aconn = await async_wait(connection_gen(bettern_conninfo)) return aconn
Now that the two modules are more similar we can run the test suite to verify that the library still works and can commit the current state to git.
Second step: run an async -> sync conversion with the current version of the script. Even running a no-op script is useful: it produces changes that can be easily seen with git diff, suggesting which conversion feature is missing, or what further cleanup we could have done in the code to make the sync and async flavours more similar.
For example, a no-op script that just copies the async side to the sync side, would show up in git diff as:
@@ -1,6 +1,6 @@ from ._connection import clean_up_conninfo -def connect(conninfo): +async def connect_async(conninfo): better_conninfo = clean_up_conninfo(conninfo) - conn = wait(connection_gen(bettern_conninfo)) - return conn + aconn = await async_wait(connection_gen(bettern_conninfo)) + return aconn
The first feature to add to the conversion script is to remove the async and await keywords. Run the conversion and diff again and you will see:
@@ -1,6 +1,6 @@ from ._connection import clean_up_conninfo -def connect(conninfo): +def connect_async(conninfo): better_conninfo = clean_up_conninfo(conninfo) - conn = wait(connection_gen(bettern_conninfo)) - return conn + aconn = async_wait(connection_gen(bettern_conninfo)) + return aconn
The next step is some renaming. If connect() and connect_async() are public functions we don't want to change their names. The script should have a name mapping function suggesting to convert:
- connect_async -> connect
- wait_async -> wait
Implementing this renaming in the AST we would bring us to the diff:
@@ -2,5 +2,5 @@ def connect(conninfo): better_conninfo = clean_up_conninfo(conninfo) - conn = wait(libpq.connect_async()) - return conn + aconn = async(libpq.connect_async()) + return aconn
We are getting there. This remaining aconnn/conn is actually a gratuitous difference: we can change the async side and call the local variable conn without losing readability and obviously without changing any behaviour.
Committing the change on the async side and re-running the conversion would show no more difference on the sync side. At this point we can commit the whole project (any remaining but acceptable change on the sync side, the new features added to the conversion script, new entries in the renaming mapping...), run the tests to verify that no regression has been introduced, and move on to the next module.
This operation, in Psycopg 3, started at commit 765f663f and can be seen in the git history as a parallel branch that was eventually merged in 8bb0f9bf. The diff --stat shows a whopping:
99 files changed, 9697 insertions(+), 8486 deletions(-)
which is obviously a monster changeset, but mostly consists of incremental refactorings, conversions, finding new ways to minimise differences. It could be an interesting ride if you have a project where you need to introduce a similar automatic conversion.
The final result
Here is the Psycopg 3 async to sync conversion script (as of the Psycopg 3.2 release). At the time of writing, It processes 27 files and automatically generates about the 25% of the codebase. Some of the features it boasts:
the AST transformations described above, including tricks like recursion into strings containing code to be transformed, such as Mypy annotations expressed as strings, adjusting the output and the comments to make the resulting unparsed code almost as good as the handwritten side;
it inserts non-essential whitespace, and runs black on the output, in order to make the resulting code as uniform as possible to the original and as good for humans to work with (to read, debug, diff, etc);
since different Python versions may generate different ASTs and different output code, it can run in a Docker container, whose image is created on the fly using as base the Python image of the reference version;
it adds a useful disclaimer to the top of the file:
# WARNING: this file is auto-generated by 'async_to_sync.py' # from the original file 'connection_async.py' # DO NOT CHANGE! Change the original file instead.
it has a "check" mode that runs in Github Action upon every commit, as part of the lint step, and will fail if it finds any files to convert that haven't been committed;
the check mode has its own check: if any script containing the above disclaimer is not included in the list of files to be converted, it will throw an error (because a converted file has not been added to the automatic conversion list);
the check of the check also has its own check! If no file with the disclaimer is found then it means that something is wrong... Maybe the disclaimer has been rewritten and the check doesn't work anymore;
it can run in parallel and only on the files that have changed. Almost as good as make (but for certain tasks it is useful to have all the input files at once, therefore, "better than make").
The code is specific to the Psycopg 3 codebase and formatting style, so it's probably not ready to be used as it is in other projects. But it is probably a good starting point to to your own conversion: change the list of files to process, the name mapping, and you should be good to start.
Hope this helps. Happy hacking!
Pipeline mode in Psycopg
Posted by Denis Laxalde on 2024-05-08
Tagged as
psycopg3,
development
Version 3.1 of Psycopg added support for libpq pipeline mode, bringing significant performance boost, especially when network latency is important. In this article, we’ll briefly describe how it works from users’ perspective and under the hood while also providing a few implementation details.
Building a Django driver for Psycopg 3
Posted by Daniele Varrazzo on 2021-08-02
Tagged as
psycopg3,
development,
recipe
One of the goals of the Psycopg 3 project is to make easy to port code developed from Psycopg 2. For this reason the creation of a Django backend (the module you specify in the settings as your database ENGINE) was a project with a double goal:
- A Django driver is a way to make Psycopg 3 useful from the start, with the possibility of dropping it in a project transparently and have available, when needed the new features offered (for instance the superior COPY support).
- The difficulty of introducing Psycopg 3 in the Django codebase and the type of changes required are indicative of the type of problems that could be found porting other projects.
...and it's done! A few days ago, the new Psycopg 3 Django backend could pass the entire Django test suite!
Designing a connection pool for psycopg3
Posted by Daniele Varrazzo on 2021-01-17
Tagged as
psycopg3,
development
The psycopg2 pool is a pretty simple object, little more than... a pool of open connections, and I think it falls short in several ways:
- the top usability problem is the fact that it cannot be used as context manager;
- if a connection is broken it is not noticed it until it is used by a client;
- if minconn connections are already taken, a new one is created and disposed of as soon as finished using, regardless of whether other clients may need it;
- if more than maxconn connections are requested the client will receive an error.
For psycopg3 I would like something better. I have read around, looking into other pool implementations to figure out what a well designed connection pool ought to do (a very well thought one seems the Java HikariCP) and these are a few ideas I'd like to work on: they are here for a feedback, before I jump into enthusiastically implementing the wrong thing...
The psycopg3 adaptation system
Posted by Daniele Varrazzo on 2020-11-24
Tagged as
psycopg3,
development,
news
The adaptation system between Python objects and PostgreSQL types is at the core of psycopg2 and psycopg3. The flexibility of the psycopg2 adaptation system provides good out-of-the-box object mapping and allows users to customise it to suit any need. Do you want your decimal numbers returned as float because you need speed over pennies? Do you want to map PostgreSQL Infinity dates to the 25th of December 3099? That's certainly doable.
The psycopg3 adaptation system needs some modification compared to psycopg2, because psycopg3 uses the "extended query protocol" to send query parameters separately from the query. Together, with the differences to accommodate, there is also a chance to improve a system that has been in use for several years and has shown its shortcomings together with its strengths.
The new COPY support in psycopg3
Posted by Daniele Varrazzo on 2020-11-15
Tagged as
psycopg3,
development,
news
psycopg2 allows interaction with PostgreSQL COPY commands. However what is possible to do with them is relatively limited: the only possible interaction is with file-like objects:
- there is no adaptation from Python objects to PostgreSQL, as there is for normal queries: data must be formatted "manually" by the user;
- psycopg2 "pulls" data from the file: writing a system that produces data and pushes it into PostgreSQL is a very contrived operation, requiring to write a blocking file-like object;
- there is no support for asynchronous copy.
psycopg3 addresses these shortcomings and makes it easy to write Python programs producing data and pushing it efficiently to the database using the COPY protocol.