FastAPI without ORM: Getting started with asyncpg

Let’s start with a simple FastAPI example:

# src/main.py

import uvicorn
from fastapi import FastAPI

app = FastAPI()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0")

Now, we want to add database logic to this app. We don’t want to use ORM, so we choose asyncpg. Let’s see how to integrate asyncpg with the above app.

We first create a postgres.py module that encapsulates the database connection, pool, termination, and other logic:

# src/commons/postgres.py

import asyncpg

DATABASE_URL = "postgresql://postgres@localhost/postgres"

class Postgres:
    def __init__(self, database_url: str):
        self.database_url = database_url

    async def connect(self):
        self.pool = await asyncpg.create_pool(self.database_url)

    async def disconnect(self):
        self.pool.close()

database = Postgres(DATABASE_URL)

I’ve hardcoded the database connection string here, but you can update this to pull from environment variables instead.

You’ll also notice that I’ve instantiated the Postgres class as database variable. This is intentional, and this module-scoped variable can be imported into other modules without the need for dependency injection.

Let’s update main.py to connect to the database:

  # src/main.py

  import uvicorn
  from fastapi import FastAPI
+ from contextlib import asynccontextmanager

+ from src.commons.postgres import database


+ @asynccontextmanager
+ async def lifespan(app: FastAPI):
+     await database.connect()
+     yield
+     await database.disconnect()

- app = FastAPI()
+ app = FastAPI(lifespan=lifespan)

  if __name__ == "__main__":
      uvicorn.run(app, host="0.0.0.0")

We’ve added a lifespan function that connects to the database during application startup and then terminates the connection during shutdown.

Now that the setup is complete, let’s create an example table:

CREATE TABLE IF NOT EXISTS users (
    name  TEXT  NOT NULL,
    email TEXT  NOT NULL
);

We then create an equivalent pydantic model:

# src/users/users_schema.py

from pydantic import BaseModel

class User(BaseModel):
    name: str
    email: str

Let’s proceed to write logic for interacting with the above users table:

# src/users/users_model.py

from typing import List, Optional
from src.commons.postgres import database
from src.users.users_schema import User

async def get_user_by_email(email: str) -> Optional[User]:
    query = "SELECT name, email FROM users WHERE email = $1"
    async with database.pool.acquire() as connection:
        row = await connection.fetchrow(query, email)
        if row is not None:
            user = User(name=row["name"], email=row["email"]) 
            return user
        return None


async def get_all_users(limit: int, offset: int) -> List[User]:
    query = "SELECT name, email FROM users LIMIT $1 OFFSET $2"
    async with database.pool.acquire() as connection:
        rows = await connection.fetch(query, limit, offset)
        users = [User(name=record["name"], email=record["email"]) for record in rows]
        return users


async def insert_user(user: User):
    query = "INSERT INTO users (name, email) VALUES ($1, $2)"
    async with database.pool.acquire() as connection:
        await connection.execute(query, user.name, user.email)


async def bulk_insert_users(users: List[User]):
    query = "INSERT INTO users (name, email) VALUES ($1, $2)"
    user_tuples = [(user.name, user.email) for user in users]
    async with database.pool.acquire() as connection:
        await connection.executemany(query, user_tuples)

Okay, that’s a lot of code, but each function should be self-explanatory. As mentioned before, we’ve used the module-scoped variable database from postgres.py module.

We first acquire the connection and then use the specialized methods within it:

  • fetchrow is for querying a single row.
  • fetch is for multiple rows.
  • execute is when you want to execute multiple queries without returning rows.
  • executemany is same as above and also accepts a list of arguments.

One thing to note is that we need to perform “type conversion” before and after talking to asyncpg. You can see that we’re converting the User object in insert_user and bulk_insert_users to primitives or list of tuples before inserting them. We’re also converting the return value of asyncpg, which is a dict-like object to User type.

We can tie this together with the rest of the application by creating a router:

# src/users/users_route.py

from typing import Optional, List
from fastapi import APIRouter
from src.users import users_model
from src.users.users_schema import User

users_router = APIRouter(prefix="/users")

@users_router.get("/")
async def get_all_users(limit: Optional[int] = 10, offset: Optional[int] = 0):
    return await users_model.get_all_users(limit, offset)

@users_router.get("/{email}")
async def get_user_by_email(email: str):
    return await users_model.get_user_by_email(email)

@users_router.post("/")
async def insert_user(user: User):
    return await users_model.insert_user(user)

@users_router.post("/bulk")
async def bulk_insert_users(users: List[User]):
    return await users_model.bulk_insert_users(users)

And then adding it to main.py:

  # src/main.py

  import uvicorn
  from fastapi import FastAPI
  from contextlib import asynccontextmanager

  from src.commons.postgres import database
+ from src.users.users_route import users_router

  @asynccontextmanager
  async def lifespan(app: FastAPI):
      await database.connect()
      yield
      await database.disconnect()

  app = FastAPI(lifespan=lifespan)

+ app.include_router(users_router)

  if __name__ == "__main__":
      uvicorn.run(app=app, host="0.0.0.0")

You can find the full codebase in this repo.

Related posts: