Our Experiences with a similar structure #4

Open
ccrvlh opened this issue Aug 26, 2022 · 32 comments

@ccrvlh

ccrvlh commented Aug 26, 2022

@zhanymkanov thanks for the write-up. It’s great to have some benchmarks on professional implementations. This is awesome and one of the most valuable repositories: a lot of production-ready and architecture tips. Great stuff, thanks a lot for sharing this!

To our (very positive) surprise, this is very similar to what we are doing on our side. I thought it was worth sharing our experiences and the choices we've made along the way, good and bad.

⚠️ This ended up being a lot longer than what I expected, my apologies.

Project Structure

This is very similar to what we are doing. The functional way of splitting things doesn’t really work except for really small projects, so we also have a “module” based approach. Our application looks something like:

ourproject-backend
├── alembic/
├── app
│   ├── auth
│   │   ├── routes.py
│   │   ├── schemas.py  # pydantic models
│   │   ├── models.py  # db models
│   │   ├── permissions.py # our decorator
│   │   ├── exceptions.py
│   │   ├── service.py
│   │   └── utils.py
│   ├── core
│   │   ├── routes.py
│   │   ├── services.py
│   │   ├── ....
│   ├── users
│   │   ├── routes.py
│   │   ├── services.py
│   │   ├── ....
│   ├── tenants
│   │   ├── routes.py
│   │   ├── services.py
│   │   ├── ....
│   ├── extensions
│   │   ├── logs.py # JSON Logger etc
│   │   ├── middleware.py # correlation ID & request tracker
│   │   ├── ....
│   ├── services
│   │   ├── mailer.py # a client to SES
│   │   ├── filesystem.py #  a wrapper over S3
│   │   ├── ....
│   ├── db
│   │   ├── mixin.py
│   │   ├── base.py
│   │   ├── engine.py
│   │   ├── ....
│   ├── utils
│   │   ├── schemas.py
│   │   ├── helpers.py
│   │   ├── ....
│   ├── modules
│   │   ├── module_a
│   │   │   ├── models.py
│   │   │   ├── routes.py
│   │   │   ├── schemas.py
│   │   │   ├── ....
│   │   ├── module_b
│   │   │   ├── models.py
│   │   │   ├── routes.py
│   │   │   ├── schemas.py
│   │   │   ├── ....
│   ├── config.py # where the Dynaconf singleton lives
│   ├── exceptions.py
│   ├── routes.py # registration of all system routes
│   ├── hub.py # our event hub
│   └── main.py
├── tests/
│   ├── users
│   ├── tenants
│   └── module_a
├── .env
├── .secrets.toml
├── .gitignore
├── settings.toml
├── mypy.ini
└── alembic.ini

A few comments:

  • We use a sort of “mixed” structure, in the sense that some global/generic modules (like Users/Tenants/Auth) all have the same structure and are in the top level, but the application-specific business logic is in the modules module. We have been using this structure for the past couple of years and have been pretty happy with the separation of concerns it brings. We even reuse the same blueprint for different projects; we mostly just change the modules, which is great.
  • Having a specific db module on the top level has helped a lot, giving us flexibility to have more robust Mixin classes, better engine configuration and some other goodies.
  • We are also really happy with having a core module on the top level. This gives us flexibility to do things like a specific mock service, a taskStatus route or more generic resources.
  • We really like how predictable this is and how much boilerplate code we can just copy around from module to module. We have dramatically sped up our development process of new modules with this. This has also helped new devs a lot in understanding the codebase logic.

Permissions & Auth

Although the “recommended” way of doing authentication in FastAPI would be the dependency injection, we have chosen to use a class-based decorator to control access on the route level.
So our routes look something like:

@route.get('/me')
@access_control(Resources.users_view_self)  # this is an enum
async def myroute(self):
    ...

@route.get('/superuser_only')
@access_control(superuser=True)
async def myroute(self):
    ...

@route.get('/open')
@access_control(open=True)
async def myroute(self):
    ...

And our access_control class looks like:

import functools
from typing import Any, Callable, Optional

from fastapi import HTTPException, Request

# AppModules, AppActions, UserResponse, RequestArgs and exc are project-specific


class access_control:  # pylint: disable=invalid-name
    MASTER_USER_ID = 0

    def __init__(
        self,
        module: Optional[AppModules] = None,
        resource: Optional[AppActions] = None,
        superuser: bool = False,
        open: bool = False,
    ) -> None:
        self.module = module
        self.resource = resource
        self.superuser = superuser
        self.open: bool = open
        self.tenant_id: Optional[int] = None
        self.object_id: Optional[int] = None
        self.current_user: Optional[UserResponse] = None
        self.request: Optional[Request] = None
        self.headers: Optional[dict[Any, Any]] = None
        self.auth_header: Optional[str] = None
        self.token: Optional[str] = None

    def __call__(self, function) -> Callable[..., Any]:
        @functools.wraps(function)
        async def decorated(*args, **kwargs):
            try:
                await self.parse_request(**kwargs)
                is_allowed = await self.verify_request(*args, **kwargs)
                if not is_allowed:
                    raise HTTPException(403, "Not allowed.")
                return await function(*args, **kwargs)
            except exc.NotAllowed as error:
                raise HTTPException(403, str(error)) from error

        return decorated

    async def parse_request(self, **kwargs) -> None:
        """Get the current user from the request"""
        dependencies = kwargs.get("self", kwargs.get("base_args"))
        base_args: Optional[RequestArgs] = getattr(dependencies, "base_args", None)
        if not base_args:
            return
        self.tenant_id = base_args.tenant_id
        self.current_user = base_args.current_user

    async def verify_request(self, *args, **kwargs) -> bool:
        """Actually check for permission based on route, user, tenant etc"""
        ...

A few benefits we encountered, and few drawbacks:

  • This is great to accept multiple parameters like module or action or superuser=True and things like that.
  • The permission controller (the access_control class itself) is fairly easy to work on while being very powerful at the same time, since it has the *args and **kwargs from the request and the full context (current user, path, tenant, etc), so all sorts of checks can be used. As we increase the granularity of access control, we have been considering implementing a permissions decorator for each module, so we can have more specific control over a given resource. But WIP still.
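To make the shape of a parameterised, class-based decorator easier to see in isolation, here is a minimal stdlib-only sketch. It is not the access_control class above: the names require, Forbidden and the dict-based user are hypothetical stand-ins, and the check itself is deliberately trivial.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Optional


class Forbidden(Exception):
    """Hypothetical stand-in for HTTPException(403)."""


class require:  # hypothetical name, mirrors the shape of access_control
    def __init__(
        self,
        permission: Optional[str] = None,
        superuser: bool = False,
        open: bool = False,
    ) -> None:
        self.permission = permission
        self.superuser = superuser
        self.open = open

    def __call__(self, function: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(function)
        async def decorated(*args: Any, **kwargs: Any) -> Any:
            # Per-request data is kept in local variables here, since one
            # decorator instance is shared by every call to the route.
            user = kwargs.get("user", {})
            if not self.is_allowed(user):
                raise Forbidden("Not allowed.")
            return await function(*args, **kwargs)

        return decorated

    def is_allowed(self, user: dict) -> bool:
        if self.open:
            return True
        if self.superuser:
            return bool(user.get("is_superuser"))
        return self.permission in user.get("permissions", ())


@require("users_view_self")
async def me(user: dict) -> str:
    return f"hello {user['name']}"


@require(superuser=True)
async def admin(user: dict) -> str:
    return "admin area"
```

Because the configuration lives on the instance and the wrapper sees the handler's arguments, adding another rule is a matter of adding a keyword argument and a branch in is_allowed.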

Class-based Services

Our service module service.py started to get big and turned into a mess of functions, so we started having a few class-based services, which have been working very well. Something like TenantService, UserService. This almost looks like a repository for simple modules (in some cases we even split the service into service and repository, for more complex business logic). Now each service module has anything from 1 to 10 service classes, and this greatly improved our organization and readability.

Class-based views

Earlier this year we refactored all of our routes to use a class-based view that is included in the fastapi-utils package, and this has made our code a lot cleaner. The main benefit for us is that the basic authentication process (reading the token and the X-Tenant-ID from the header) is done in one place only, so we don’t have to repeat the dependencies.
What we’ve done is, we have a custom commons_deps function, and at the beginning of each route class we do something like:

@cbv(router)
class MyModuleRouter:
    commons = Depends(commons_deps)
    service = MyModuleService()

    @router.get('/me')
    @access_control(Resources.users_view_self)
    async def myroute(self):
        # And now here we can access the common deps & the service
        current_user = self.commons.current_user
        tenant_id = self.commons.tenant_id
        response = self.service.get_module_resource(tenant_id)

We have been experimenting with something slightly different nowadays, which is having the service being instantiated with the tenant_id and current_user in a dependency injection, so that our service starts up a bit more complete.
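A rough, framework-free sketch of that idea: the service receives its context once, at construction, through a small factory. Commons and get_service are hypothetical names; in FastAPI the factory would typically be wired up through Depends, which is left out here to keep the example self-contained.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Commons:  # hypothetical: roughly what commons_deps would return
    tenant_id: int
    current_user: str


class MyModuleService:
    def __init__(self, tenant_id: int, current_user: str) -> None:
        self.tenant_id = tenant_id
        self.current_user = current_user

    def get_module_resource(self) -> dict:
        # No tenant_id argument needed: it was bound at construction time.
        return {"tenant_id": self.tenant_id, "requested_by": self.current_user}


def get_service(commons: Commons) -> MyModuleService:
    """Factory meant to be called once per request."""
    return MyModuleService(commons.tenant_id, commons.current_user)
```

The trade-off is that the service is no longer a class-level singleton on the router; it is rebuilt for every request, which is usually cheap for plain Python objects.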

Task Queues

We are long-time Celery users, but Celery is overwhelming and fairly difficult to reason about when you get to the internals and specifics. We just switched to RQ and couldn’t be happier, with a few caveats. The logic is amazing (the Queue and Job objects are really intuitive and easy to work with, as are dependency chains with depends_on). The thing is that there’s an issue with async functions: they work if you use the worker, but won’t work if you run in the same process, which is kind of a pain when debugging. We haven’t experimented with Starlette’s background tasks, as we always valued having a centralized dashboard for tasks and an easy way to get a task status, for example. As we deploy most of our applications in Kubernetes, being able to scale the workers easily and indefinitely is awesome and we are really glad with it. I have been experimenting with a few different snippets to try to open a PR and make RQ compatible in every scenario.

The fancy architecture

In some cases (actually projects) we slightly changed our module architecture to account for a proper business-oriented Model object.

...
│   ├── modules
│   │   ├── module_a
│   │   │   ├── routes.py
│   │   │   ├── services.py
│   │   │   ├── orm.py # the sqlalchemy classes
│   │   │   ├── models.py # "pure" modules (are also pydantic)
│   │   │   ├── schemas.py # the pydantic API schemas
│   │   │   ├── adapters.py
│   │   │   ├── builders.py
│   │   │   ├── interfaces.py
│   │   │   ├── repository.py

For fancier implementations this worked very well, although it is a lot more complex to start with. This gives us a proper EntityModel and great separation of concerns, but it gets a lot more verbose really quickly, so we found it was only worth it for very complex projects. Still, it’s a possibility.

Custom Response Serializers & BaseSchema

We found that the response_class in FastAPI also serializes the data in Pydantic, so it’s not purely for documentation. You can, however, override the default response behavior by making a custom response class, which we did, gaining a bit of performance (anywhere from 50-100ms) and flexibility. So we have something like:

# utils/schemas.py
import typing as t
from uuid import UUID

import orjson
from starlette.background import BackgroundTask
from starlette.responses import Response


def uuid_default(obj: t.Any) -> str:
    # orjson expects `default` to raise TypeError for types it cannot handle
    if isinstance(obj, UUID):
        return str(obj)
    raise TypeError


class JSONResponse(Response):
    media_type = "application/json"

    def __init__(
        self,
        content: t.Any = None,
        status_code: int = 200,
        headers: t.Optional[t.Mapping[str, str]] = None,
        media_type: t.Optional[str] = None,
        background: t.Optional[BackgroundTask] = None,
    ) -> None:
        self.status_code = status_code
        if media_type is not None:
            self.media_type = media_type
        self.background = background
        self.body = self.render(content)
        self.init_headers(headers)

    def render(self, content: t.Any) -> bytes:
        # This is not 100% battle proof, but as our services are controlled
        # (they only return Pydantic models) it works fine
        if isinstance(content, BaseSchema):
            return content.json().encode("utf-8")
        if isinstance(content, list) and content and isinstance(content[0], BaseSchema):
            return orjson.dumps([item.dict() for item in content], default=uuid_default)
        # Fallback so that empty lists and plain values still produce a body
        return orjson.dumps(content, default=uuid_default)

And then we use the response directly like:

@cbv(router)
class MyModuleRouter:
    commons = Depends(commons_deps)
    service = MyModuleService()

    @router.get('/me', response_class=[...])
    @access_control(Users.view_self)  # this is an enum
    async def myroute(self):
        # And now here we can access the commons
        current_user = self.commons.current_user
        tenant_id = self.commons.tenant_id
        response = self.service.get_module_resource(tenant_id)
        return JSONResponse(response, 200)

This gave us a cleaner router, since we can set the status code on the response itself, which was more intuitive for us. We also gained a bit of performance with the orjson encoder, and we just like it better. The (big) downside is that we face the risk of documentation/API inconsistencies. In our case it happened once or twice, but we think it’s still worth it.

Just as you guys we also have a BaseSchema base for all Pydantic schemas we use that have a couple of configurations like orm_mode enum etc.

Using a DefaultResponse class

On several occasions the response is kind of generic, so we make heavy use of a schema called DefaultResponse:

class DefaultResponse(BaseSchema):
    status: bool
    msg: str
    details: Optional[dict[Any, Any]] = {}

This is a kind of standardized way of communicating with our client (we have a React frontend) so the front devs always know what to look for when getting a DefaultResponse.

Configuration

Although Pydantic is nice for configuration as well, we couldn’t be happier using the amazing @dynaconf lib, developed and maintained by @BrunoRocha. This was a game changer in our settings management.

All of our settings/secrets went to .toml files and a few things happened:
- Only one file for multiple environments, using toml headers
- Only one place to manage keys (in Flask we were used to having multiple configuration classes, which were a pain to maintain)
- A singleton with global access; our settings.py file has ~10 lines:

#app/config.py

from dynaconf import Dynaconf

settings = Dynaconf(
    settings_files=[".settings.toml", ".secrets.toml"],
    envvar_prefix="MYAPP",
    env_switcher="MYAPP_APP_ENV",
    load_dotenv=True,
    environments=True,
)

And now everywhere we can just

from app.config import settings

myvar = settings['MYVAR']
myvar_a = settings.MYVAR_A

And don’t need to change anything when deploying to K8S since we already inject everything with env vars (config). Can’t recommend it more. We still have to experiment with the Vault integration, which is the next step.

The Message Hub

This helped a lot while we were trying to further decouple our services.
The hub is a centralized place to share messages between modules, something like:

class MessageHub:
    """Message hub for events"""

    handlers = {
        module_a.ResourceCreated: [
            module_b.handle_resource_created,
            module_c.handle_resource_created,
        ],
        module_d.ResourceDeleted: [
            module_b.handle_resource_deleted,
            module_c.handle_resource_deleted,
        ],
    }  # type: dict[Type["Event"], list[Callable[..., Any]]]

    @classmethod
    async def track(cls, event: ApplicationEvent):
        """Tracks the Application activity.
        Receives the application event that will be used by the AuditService.

        Args:
            event (ApplicationEvent): The ApplicationEvent
        """
        await AuditService.save(event)

    @classmethod
    async def handle(cls, event: Event):
        """
        Handles an arbitrary event.
        It will receive the event, and get the handlers that should handle
        the event. The order in which the handlers process the event may vary.
        If the event is sent to the worker, the handlers are async, meaning they can run at the same time.
        If the event is synchronous, then each handler will handle the event sequentially.

        Args:
            event (Event): The Event.
        """
        if type(event) not in cls.handlers:
            logger.info("No handlers for event: %s", event.__class__.__name__)
            return

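        # Call listener functions
        for fn in cls.handlers[type(event)]:
            if event.is_async:
                # Hand off to the worker and move on to the next handler;
                # returning here would skip every handler after the first
                worker.enqueue(fn, event)
                continue

            await fn(event)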

And in most modules we have a handlers.py module with a few functions that handle events. The services themselves usually dispatch events, like hub.MessageHub.handle(event_created_by_the_service), and we also use it to track application activity, normally called by the route: hub.MessageHub.track(application_activity_schema)
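A stripped-down, stdlib-only sketch of the same dispatch idea, without the worker and the audit service. Hub, ResourceCreated and the two handlers are hypothetical names used only for illustration; handlers are registered at runtime here rather than in a class-level dict.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class ResourceCreated:  # hypothetical event
    resource_id: int


class Hub:
    def __init__(self) -> None:
        self.handlers: dict[type, list[Callable[[Any], Awaitable[None]]]] = {}

    def subscribe(self, event_type: type, handler: Callable[[Any], Awaitable[None]]) -> None:
        self.handlers.setdefault(event_type, []).append(handler)

    async def handle(self, event: Any) -> int:
        """Run every handler registered for the event's type; return how many ran."""
        handlers = self.handlers.get(type(event), [])
        for fn in handlers:
            await fn(event)
        return len(handlers)


seen: list[tuple[str, int]] = []


async def index_resource(event: ResourceCreated) -> None:
    seen.append(("index", event.resource_id))


async def notify_owner(event: ResourceCreated) -> None:
    seen.append(("notify", event.resource_id))


hub = Hub()
hub.subscribe(ResourceCreated, index_resource)
hub.subscribe(ResourceCreated, notify_owner)
```

The emitting module only needs to know about the event class and the hub; it never imports the modules that react to it, which is where the decoupling comes from.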

Types & Docs

100% of arguments are typed and 100% of methods / functions have docstrings. I honestly can't live without them anymore. Now wondering if we could just compile the whole code to C and make it fly? Nuitka, MyPyC maybe? TBC...


Now the bad part, and our (really) bad practices

Local Session Management

For a couple of reasons we didn’t implement the request-coupled session management (inject the session through FastAPI’s dependency injection system) and we ended up having a lot of services that handle the session locally, which is not cool and not recommended by SQLAlchemy. Think of:

class ModuleService:
    ...
    async def module_method(self, ...):
        # Terrible
        async with async_session() as session:
            ...
        return something

Managing the session lifecycle itself is fairly ok and it works for really simple services, but what we found is that for more complex services, where methods call one another, you end up nesting sessions, which is terrible. Imagine calling other_method from module_method, which also has the same session lifecycle management: now you just opened a session within another session. Just terrible. We are gradually moving to better session management, but we are still trying to find better ways of handling it.

Little use of the Dependency Injection

In your write-up there are a lot of great examples of how to properly use and leverage the power of dependency injection. We don’t use many of those, and we definitely should.

Lack of Context in Services

Sometimes we found ourselves having a Service class that didn’t even have an initializer and was purely for organization. This is fine, but we are missing a lot of the benefits of having some context in the service (example: tenant_id and session), which would save us from having the tenant_id being passed to every single method in a service class. So there’s definitely a lot to improve here.

There's obviously a lot to improve and a whole lot more bad things that I probably forgot to mention, but again, I thought it was worth sharing the experience. And to finish, our Dockerfile, which is also pretty simple (using poetry and leveraging its dev-dependencies logic, something that was mentioned here as well in #1):

FROM python:3.10-slim
WORKDIR /app

COPY pyproject.toml .
COPY poetry.lock* .

RUN apt-get update -y && \
    apt-get install gcc -y && \
    apt-get install libpq-dev -y && \
    python -m venv .venv && \
    .venv/bin/pip install poetry && \
    .venv/bin/poetry export -f requirements.txt --output requirements.txt --no-dev --without-hashes && \
    .venv/bin/pip install -r requirements.txt && \
    apt-get remove gcc -y && \
    apt autoremove -y

ADD . /app
EXPOSE 8000
CMD [".venv/bin/uvicorn", "app.asgi:app", "--host", "0.0.0.0"]
@zhanymkanov
Owner

I take my hat off to you, @lowercase00

That is excellent writing worth being shared in a separate post.

I would love to link it in the main branch and I want to re-read it later with a fresh mind.

Thank you.

@ThirVondukr

Local Session Management - It's not wrong to create multiple sessions per request when it makes sense - think graphql resolvers or something similar 🤔

@ccrvlh
Author

ccrvlh commented Aug 27, 2022

@zhanymkanov glad it was worth the read! Of course, feel free to use this in any way you think is worth it.

@ThirVondukr I don't really think the way we managed sessions makes much sense to be honest. probably there are situations where this is reasonable, but I think our code is just bad in that aspect, really...

@ThirVondukr

In most REST endpoints it's enough to use one session per endpoint since it's a single operation that either completes or not.

@chichi13

chichi13 commented Sep 4, 2022

I am truly grateful for this sharing of information !

I would like to know a few more things, how do you handle pagination in your project (limit/offset)?

@ThirVondukr

That depends on the use case, some things need page based pagination, some need cursor pagination

@chichi13

chichi13 commented Sep 5, 2022

Okay, so you've made your own pagination system and didn't use an external package (fastapi-pagination, for example)?

@ThirVondukr

Yes, because creating page based pagination isn't that hard compared to keyset/cursor pagination.
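For readers comparing the two styles mentioned in this exchange, a small sketch using the stdlib sqlite3 module. The items table and both helper names are made up for illustration; a real project would express the same queries through its ORM.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO items (id, name) VALUES (?, ?)",
    [(i, f"item-{i}") for i in range(1, 8)],
)
conn.commit()


def page_based(page: int, size: int) -> list[int]:
    """Limit/offset pagination: simple, but the database still walks the skipped rows."""
    offset = (page - 1) * size
    rows = conn.execute(
        "SELECT id FROM items ORDER BY id LIMIT ? OFFSET ?", (size, offset)
    )
    return [row[0] for row in rows]


def keyset(after_id: int, size: int) -> list[int]:
    """Keyset/cursor pagination: the client sends the last id it has seen."""
    rows = conn.execute(
        "SELECT id FROM items WHERE id > ? ORDER BY id LIMIT ?", (after_id, size)
    )
    return [row[0] for row in rows]
```

Page-based access lets a client jump to any page, while the keyset variant only moves forward from a known position but stays stable when rows are inserted between requests.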

@reritom

reritom commented Sep 21, 2022

I have been looking for something like this for a while. It seems impossible to find good examples and discussion about the structure and architecture of actual large applications.

I am encountering the session management problem too. If I have service layers that consume each other, with the same session lifecycles, then the consumer service can be problematic for the nested service. I.e., if the nested service performs an external change, but the consumer service has made the call within a nested transaction that it rolls back, the nested service has now lost sync between the action it performed and its own state. That among other issues with session transaction/commit ownership.

I am wondering how you are handling business logic validation of payloads.

I currently have a large app that I am migrating to FastAPI (for the dependency injection to make tests cleaner, and for how clean the OpenAPI setup is).

My application uses Flask and Marshmallow, with some mixins for class based views that make the marshmallow schemas work pretty seamlessly.

In the application, the views use marshmallow schemas that both validate the payload content with basic type and format checks, but also apply business logic validations. So if the payload contains ids, these ids fields will have checks applied to them to confirm they are usable by the authenticated user.

Some will argue that this is a mixing of layers, but in my case it has been handled in a way that works well (in my opinion). The marshmallow schemas are given the user as a context, and then, when validating each field, the validator calls a service layer interface.

My reasoning for this is:

  • The controller+framework is the rest api interface. It is the responsibility of this layer to make sure any input errors are mapped to the input request. So it doesn't make sense for only the service layer to do the validations, because the service layer will return errors that aren't specific to the rest api interface.
  • So the service layer will perform validations based on an object it receives, and these validations will not be optimised towards the structure of the rest api specific interface. The controllers could just remap these errors (which it does), but remapping after the fact is more restrictive than controller orchestrated validations.
  • The input to the rest api interface doesn't necessarily directly map to the service layer interface.

In FastAPI there doesn't seem to be any clear way to pass a context to a pydantic schema. Reading the issues on pydantic's GitHub, there was a reference to using ContextVars. So if I could override how the payload gets implicitly loaded, I could wrap it in a context manager that sets the user on a ContextVar, and then use that within the pydantic schema, but I don't see any way to handle this (and I wouldn't want to have every controller retrieve the raw body and manually load it). I could add a middleware that takes the user and adds it to a context, but then we lose the benefit of the def controller(user: User = Depends(get_user)) (unless the get_user accesses the request state to get the user).
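For reference, the ContextVar idea on its own, outside of any framework, could look like the sketch below. It only shows the mechanism (set the user, validate, reset); how to hook it into FastAPI's implicit body loading is exactly the open question. ITEM_OWNERS, acting_as and validate_item_id are hypothetical names.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

current_user_id: ContextVar[Optional[int]] = ContextVar("current_user_id", default=None)

# Hypothetical ownership data: item id -> owner's user id
ITEM_OWNERS = {1: 10, 2: 20}


@contextmanager
def acting_as(user_id: int) -> Iterator[None]:
    """Expose the user to any validator that runs inside the with-block."""
    token = current_user_id.set(user_id)
    try:
        yield
    finally:
        current_user_id.reset(token)


def validate_item_id(item_id: int) -> int:
    """Field-level check that reads the user from the context, not from an argument."""
    if ITEM_OWNERS.get(item_id) != current_user_id.get():
        raise ValueError(f"item {item_id} is not available to this user")
    return item_id
```

A schema validator could call validate_item_id without the schema ever receiving the user explicitly, as long as the payload is parsed inside the acting_as block.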

So what other approaches are there?

  • If we load the pydantic model without business logic validation, how can I cleanly apply business logic to each field in a way where each field can then be checked with a context, with all errors being returned in one go instead of sequentially? (This doesn't address the service layer to api/controller layer remapping.)
  • I have seen examples where in the controller you use:
def get_valid_item(item_id: int = Body(..., embed=False), user: User = Depends(get_user)):
    item = service.get_item(item_id, user)
    if not item:
        raise Something
    return item

def my_controller(item: Item = Depends(get_valid_item)):
    ...

but in that case, it seems it will get messy if you have payloads with numerous ids (and I am not sure whether it will return all ID errors, or just the first dependency that fails).

So to summarise, do you have a best practice for performing business logic validations [that require a context] on fields in a way that returns as many errors as possible in a single request, and that maps correctly to the api interface, which doesn't necessarily map to the service interface?

@ccrvlh
Author

ccrvlh commented Sep 21, 2022

A big disclaimer: I’m not really sure I can talk about the so called “best practices”, but I can share my experience, it might help somehow.

Validation

By your text I understand that your validation is a bit more business-oriented rather than just types/fields. The way we do it is: the route/controller is responsible for the interaction with the client, so if a string parameter is wrong for example, we perform the validation within the route (this happens when we accept a comma separated list of strings for example, that is not natively supported by FastAPI as a type per se). Otherwise most of our validation logic sits within the service layer.

Pydantic is great for the types/fields validation, but it is also extremely flexible, so you could have a field that’s hidden from the API and use this field internally for example, or even allow Extras. With context within the schema you can now leverage all of Pydantic's strengths (like the validator decorators, @root_validator, etc.). You can always add methods to Pydantic models as well; I guess it is a matter of personal preference.

I’m personally not a big fan of too much business logic within Pydantic models. Simple stuff (like making sure two passwords are the same for example) are mostly fine, but I tend to prefer to have the schemas.py (where we keep all of Pydantic schemas) mostly just data containers parsing and validating data input (and not business logic).

The way I personally like to think about this is: imagine that the API is not the only entrypoint to the application, but rather just an implementation (for example having both a CLI and an API as entry points). If the validation would be needed both for the CLI and the API, then most likely this is a business rule that should live in the service layer. If this is something API-specific that hasn’t been solved by Pydantic already (type/format validation), then I might as well handle it in the router/controller (or maybe by injecting a dependency). At the end of the day, we mostly have logic in the services, and the routes are usually catching exceptions from services and translating them to the proper HTTP Exceptions, sometimes making small validations with the params (and responding with helpful error messages for the client). I honestly think that these are all just fine, and mostly personal preference (considering the trade-offs on testing capabilities, separation of concerns, readability, etc, etc).
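As a rough illustration of that split (business rules raised by the service, translation to HTTP done by the route), here is a framework-free sketch. The exception classes, ITEM_OWNERS and the (status, body) return shape are assumptions made for the example; in FastAPI the route would raise HTTPException instead of returning a tuple.

```python
class NotFound(Exception):
    """Raised by the service; knows nothing about HTTP."""


class NotAllowed(Exception):
    """Raised by the service; knows nothing about HTTP."""


# Hypothetical data: item id -> owner's user id
ITEM_OWNERS = {1: 10}

STATUS_BY_ERROR = {NotFound: 404, NotAllowed: 403}


def get_item(item_id: int, user_id: int) -> dict:
    """Service layer: business rules only, reusable from a CLI or an API."""
    if item_id not in ITEM_OWNERS:
        raise NotFound(f"item {item_id} does not exist")
    if ITEM_OWNERS[item_id] != user_id:
        raise NotAllowed(f"item {item_id} belongs to another user")
    return {"id": item_id}


def get_item_route(item_id: int, user_id: int) -> tuple[int, dict]:
    """Route layer: translates service errors into (status, body) pairs."""
    try:
        return 200, get_item(item_id, user_id)
    except (NotFound, NotAllowed) as error:
        return STATUS_BY_ERROR[type(error)], {"message": str(error)}
```

A CLI entry point could call get_item directly and print the exception message, without touching the status-code mapping at all.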

Session management

At the beginning we were handling sessions within methods, which is less than ideal for a lot of reasons (unit of work, rollback, extra verbosity and context managers all around, etc, etc). We recently changed to a dependency injection approach, that goes like:

# .../module/depends.py
async def repository_injector() -> AsyncGenerator[MyRepository, None]:
    # Create the session before the try block so `session` is always bound in finally
    session = async_session()
    try:
        yield MyRepository(db=session)
    finally:
        await session.close()

# .../module/repository.py
class MyRepository(...):
    def __init__(self, db: AsyncSession, ...):
        self._db = db
        super().__init__(db, ...)

    @property
    def db(self) -> AsyncSession:
        if not self._db:
            raise Exception("Database session is not initialized.")
        if not isinstance(self._db, AsyncSession):
            raise Exception(
                "Database session invalid. Expected AsyncSession, got {}".format(type(self._db))
            )
        return self._db

    @db.setter
    def db(self, db: AsyncSession) -> None:
        if not isinstance(db, AsyncSession):
            raise Exception("Session is not valid.")
        self._db = db

    async def my_method(self, ...):
        # AsyncSession.execute is a coroutine, so it has to be awaited
        await self.db.execute(...)

# .../module/services.py
class MyService(...):
    def __init__(self, db: AsyncSession, repo: MyRepository):
        ...

Now the repository, service (or whatever business layer uses the session), already has the session injected and you gain control over the transaction (you can commit or rollback a whole block, instead of individual pieces). One way to do that is by using session.flush() that sends the statement to the Database (generating automatic primary keys for example), but without committing, so you still have the rollback() at hand if anything goes wrong. For nested services you already have the session at hand, so you can just pass it along, no issues there.
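The "commit or rollback a whole block" point can be shown with nothing but the stdlib. The sketch below uses a sqlite3 connection as a stand-in for the injected SQLAlchemy session (the table names and functions are made up); reading lastrowid before committing plays the role that session.flush() plays above.

```python
import sqlite3


def create_tenant(conn: sqlite3.Connection, name: str) -> int:
    cursor = conn.execute("INSERT INTO tenants (name) VALUES (?)", (name,))
    return cursor.lastrowid  # the generated key is available before any commit


def create_owner(conn: sqlite3.Connection, tenant_id: int, email: str) -> None:
    if "@" not in email:
        raise ValueError("invalid email")
    conn.execute(
        "INSERT INTO users (tenant_id, email) VALUES (?, ?)", (tenant_id, email)
    )


def onboard(conn: sqlite3.Connection, name: str, email: str) -> bool:
    """Both inserts share one connection, so they succeed or fail together."""
    try:
        tenant_id = create_tenant(conn, name)
        create_owner(conn, tenant_id, email)
    except ValueError:
        conn.rollback()  # also undoes the tenant insert
        return False
    conn.commit()
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tenants (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, tenant_id INTEGER, email TEXT)")
conn.commit()
```

Neither helper commits on its own; only the outermost caller decides, which is what makes nesting harmless.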

What we also found in our case, is that when we started having too much nesting, there was something wrong with our design, actually that’s the reason why we started using the repository pattern more often (although with a single implementation) and having it injected to a service for example. We found that things were a lot cleaner (and easier) when doing that.

Now, the issue starts with concurrency (asyncio.gather(…)) because a single session can’t be used by several tasks at the same time. I’m still trying to figure out how to deal with it, have a few ideas in mind, and just opened a discussion in the SQLAlchemy repo to better understand the issue.

@ThirVondukr

@lowercase00 Databases don't support concurrent operations on the same connection, so that's not really a SQLAlchemy issue; you can spawn more sessions if you need to.
Session dependency could be created like this, no need for try/except:

async def get_db() -> AsyncIterable[AsyncSession]:
    async with async_session() as session:
        yield session

In my opinion pydantic should only perform simple validation that doesn't have anything to do with request state or database
If you need to "automatically" raise an error maybe create a method for that, e.g. get_or_404 to retrieve your models or raise 404 error, you can also make it a simple function that would throw an exception if it receives None 🤔

@ccrvlh
Author

ccrvlh commented Sep 21, 2022

@ThirVondukr Sure, I don't think it is a SQLAlchemy issue, but rather a pattern to solve somehow (I haven't solved it). If asyncio's concurrency shines with I/O, there should be a way to enjoy these benefits: using different connections perhaps?

From what I understand, there is no (practical) difference between the try/except and the context manager? Both will enforce closing the session afterwards, given the close() in the finally block. I tend to prefer calling .close() explicitly since you only do it in a single place.
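The "different connections" idea could look roughly like the sketch below: every concurrent task opens its own short-lived session instead of sharing one. FakeSession is a hypothetical stand-in that refuses concurrent use, so the example runs without a database; with SQLAlchemy each task would use its own AsyncSession from the session factory.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeSession:
    """Hypothetical stand-in for an AsyncSession; rejects overlapping use."""

    def __init__(self) -> None:
        self.busy = False
        self.closed = False

    async def fetch(self, query: str) -> str:
        if self.busy:
            raise RuntimeError("session is already in use")
        self.busy = True
        try:
            await asyncio.sleep(0)  # yield control, as real I/O would
            return f"result of {query}"
        finally:
            self.busy = False


opened: list[FakeSession] = []


@asynccontextmanager
async def new_session() -> AsyncIterator[FakeSession]:
    session = FakeSession()
    opened.append(session)
    try:
        yield session
    finally:
        session.closed = True


async def run_query(query: str) -> str:
    # One session per task, closed as soon as the task is done
    async with new_session() as session:
        return await session.fetch(query)


async def load_dashboard() -> list[str]:
    return await asyncio.gather(run_query("users"), run_query("tenants"))
```

The cost is that the concurrent queries no longer share a transaction, so this fits independent reads better than a multi-step write.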

@ThirVondukr btw, if you would like to contribute, or are interested in the discussion about SQLAlchemy patterns to use with asyncio concurrency there's a discussion here.

@ThirVondukr

Using a context manager is just the cleaner, more Pythonic way, since it isn't possible to, say, forget to call .close() in that case.
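
For comparison, the two styles discussed here can be put side by side. This is only a sketch: DummySession and use_once are hypothetical helpers (use_once drives the generator roughly the way a framework would), and both variants end up closing the session.

```python
import asyncio
from contextlib import asynccontextmanager


class DummySession:
    """Hypothetical stand-in for a database session."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def get_db_explicit():
    # try/finally variant: close() is written out by hand in the dependency.
    session = DummySession()
    try:
        yield session
    finally:
        await session.close()


@asynccontextmanager
async def session_scope():
    # Context-manager factory: the cleanup lives in one reusable place.
    session = DummySession()
    try:
        yield session
    finally:
        await session.close()


async def get_db_with_context():
    # Context-manager variant: nothing to remember in the dependency itself.
    async with session_scope() as session:
        yield session


async def use_once(dependency) -> DummySession:
    # Enter the generator, take the session, then let the generator finish.
    gen = dependency()
    session = await gen.__anext__()
    try:
        await gen.__anext__()
    except StopAsyncIteration:
        pass
    return session
```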

@reritom

reritom commented Sep 21, 2022

I appreciate your reply @lowercase00, I will follow-up a bit because you appear very knowledgeable on the subject. Regarding the validation I will give an example.

Considering a payload like:

class Package(BaseModel):
    item_id: int
    tag_id: int
    # Maybe a bunch more ids
    ...

So from the controller perspective, we could just use this schema and validate that the ids are integers. From a business logic perspective, we need to apply access control logic to these ids. So while the format/type may be correct, the value should be considered as invalid because it doesn't exist for the authenticated user.

In my service layer I can do (I am neglecting some deps and sessions and whatnot in the snippets):

class PackageService:
    def __init__(self, user: User, ...):
        self.user = user

    def create_package(self, package: PackageSchema):
        item = ItemRepo.get(package.item_id, user_id=self.user.id)
        tag = TagRepo.get(package.tag_id, user_id=self.user.id)
        ... # Repeat for all ids

        package = PackageRepo.create(...)

The repos can raise whatever, which then gets mapped to some HTTP_40X error. But:

  1. if there are a lot of ids, this procedural approach to check each id seems unorganised, and as-is, if there are 10 invalid ids, you would need 11 requests to eventually fix them all and get your 201 (fix one, send the request, get another error for the next id, repeat)
  2. The raised exception loses granularity for the api consumer.

The service layer can't raise anything based on the API interface, so whatever it raises either needs to be remapped in the controller, or is generic and independent of the interface, such as raising something that returns {"message": "Item not found"}. The service can't meaningfully raise {"message": ..., "details": [{"validation error with loc"}]} because the input to the service doesn't necessarily map directly to the interface defined on the API controller (in this example it does, but in practice it's not always the case).

So as a consumer of the API, for better error handling I would like the error to be:

{ 
    "detail":[ 
        { 
            "loc":[ 
                "body",
                "tag_id"
            ],
            "msg":"invalid value",
            "type":"value_error.invalid"
        },
        ... # Repeat for the other ids
    ]
}

If you have frontend ui with a form, you can then map an error to the specific input element (Ignoring that the frontend shouldn't be showing ids that they can't use, because this general business logic validation applies to other things, more than just access control).

For this sort of granularity, we need this validation to be performed in the controller. Now we don't want business logic to be performed in the controller as such, but we can have services expose methods that allow the controller to enrich its validations and return more useful errors.

Like:

class PackageSchema(BaseModel):
    item_id: int
    tag_id: int
    # Maybe a bunch more ids
    ...

    @validator("item_id")
    def validate_item_id(cls, value):
        user = ... # User from somewhere, contextvars maybe
        if not ItemService(user=user).get_item(value):
            raise ValueError("Invalid item id")
        return value  # a validator must return the value, otherwise the field becomes None

But here the schema has no dependency injection and no clear way to handle how we pass the user context to the validators.

So I am wondering how you are approaching things.

Are you performing business logic validations within the service layer, in a way where the returned error is not mapped to the specific field of the input?

Are you distinguishing between field-level validations, which only handle basic checks of type/format/required on the value, and business logic validations, which are less tied to a given field?

@ThirVondukr

@reritom I'd personally recommend to not perform any database actions in your schema, better move them into your service layer 🤔

@reritom

reritom commented Sep 21, 2022

@ThirVondukr So would you have service layers raise exceptions such as SomeException("Item not found") and dismiss the more granular error handling on the API, or would you somehow raise specific errors from the service layer that the controller then maps to the correct API schema field:

class Service:
    ...
   
    def create_package(self, package: PackageSchema):
        try:
            TagRepo.get_tag(package.tag_id, user_id=user_id)
        except NotFound:
            raise ExceptionWithSomeData("Tag not found", field="tag_id")

        ...

@app.get(...)
def controller(package: PackageSchema, user: User = Depends(get_user)):
    try:
        PackageService(...).create_package(...)
    except ExceptionWithSomeData as e:
        raise SomeValidationError(field=e.field)  # Here the mapping is one to one, but in practice it might not be 
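
One way to avoid the "fix one id, resend, get the next error" loop is to let the service collect every failing field and have the controller translate them in one go. The sketch below is framework-free and every name in it (FieldError, FieldErrors, to_detail_payload, the known_ids lookup) is a hypothetical illustration; the payload only imitates the FastAPI-style "detail" list quoted earlier in this thread.

```python
class FieldError(Exception):
    """Hypothetical service-level error that remembers which input failed."""

    def __init__(self, field: str, message: str) -> None:
        super().__init__(message)
        self.field = field
        self.message = message


class FieldErrors(Exception):
    """Aggregates several FieldError instances into a single raise."""

    def __init__(self, errors: list[FieldError]) -> None:
        super().__init__(f"{len(errors)} invalid field(s)")
        self.errors = errors


def create_package(payload: dict[str, int], known_ids: dict[str, set[int]]) -> dict[str, int]:
    # Service layer: check every id before failing, so all problems surface at once.
    errors = [
        FieldError(field, "invalid value")
        for field, allowed in known_ids.items()
        if payload.get(field) not in allowed
    ]
    if errors:
        raise FieldErrors(errors)
    return payload


def to_detail_payload(errors: list[FieldError], location: str = "body") -> dict:
    # Controller-side mapping from service errors to the API error contract.
    return {
        "detail": [
            {"loc": [location, e.field], "msg": e.message, "type": "value_error.invalid"}
            for e in errors
        ]
    }


def controller(payload: dict[str, int], known_ids: dict[str, set[int]]) -> tuple[int, dict]:
    try:
        created = create_package(payload, known_ids)
    except FieldErrors as exc:
        return 422, to_detail_payload(exc.errors)
    return 201, created
```

Here the mapping from service field to API field is one to one; when it is not, to_detail_payload is the single place where a translation table would go.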

@ccrvlh
Author

ccrvlh commented Sep 21, 2022

@reritom I think there are a few different things here. I’m assuming that when you say “the ids” you are talking about all field_id fields in the Package Schema, right? So to be able to perform some random action, you have to check whether all of your fields are valid (business wise). While just looking at this specific problem, you could iterate through the schema structure, get the field names, and have a strategy (map) { field_a: validator_a } to run all validations. But… it honestly is looking a bit strange, there might be better ways to achieve what you want.
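
The { field_a: validator_a } strategy mentioned above could be sketched roughly like this. The ownership tables and the owned_by helper are made up for the example; in a real service each validator would query a repository.

```python
from typing import Callable

Validator = Callable[[int], bool]


def owned_by(user_id: int, owners: dict[int, int]) -> Validator:
    # Build a validator that accepts only entities owned by the given user.
    return lambda entity_id: owners.get(entity_id) == user_id


def run_field_validators(payload: dict[str, int], validators: dict[str, Validator]) -> list[str]:
    # Return the names of all fields whose validator rejected the value.
    return [
        field
        for field, check in validators.items()
        if field in payload and not check(payload[field])
    ]


# Hypothetical data: entity id -> owning user id.
ITEM_OWNERS = {1: 10, 2: 11}
TAG_OWNERS = {5: 10}

validators_for_user_10 = {
    "item_id": owned_by(10, ITEM_OWNERS),
    "tag_id": owned_by(10, TAG_OWNERS),
}
```

Calling `run_field_validators({"item_id": 2, "tag_id": 5}, validators_for_user_10)` would report only item_id as failing, since item 2 belongs to another user.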

First, the frontend should only be loading ids that the user is allowed to see, so when performing the GET /items and GET /tags that is needed to load items on a select component for example, this should already be filtered by what the user can see, so this is the first part of the authorization. When you do that, the frontend only has valid IDs as potential values and hence a first layer of validation for the ids.

About mapping potential issues to specific fields, like: tag_id is invalid. We don’t usually do that automatically. I normally send specific responses using the DefaultResponse (described in the first post) and then pass details (such as these) to describe the specific issue. The frontend is then responsible for dealing with it.

If you want to still validate the content from a business perspective, I would probably implement validate methods (in the service) that you call before even performing the action itself:

is_tag_valid = await service.validate_tag(...)
if not is_tag_valid:
	raise TagNotFoundException("Tag ID was not found")

And then catching the exception to translate it to a 404 for example. You could obviously do that sequentially or in bulk (performing all validations and then returning a specific message), returning maybe an array of objects each with known fields, maybe something like:

class ValidationDetailSchema(BaseModel):
	field_name: str
	error_code: int
	detail: str

I really like known objects with predictable keys, tends to be more friendly for parsing.

There’s a trade-off here: you can call a ton of validate methods before going to create_package, but you would be going to the database quite a lot of times (once for each validation, once for the creation). I can’t say whether this is worth it, but it’s something to keep in mind.

I’m not a big fan of having business logic in the @validator but it is possible, sure. I would still prefer to have a more complex service that handles it all.

That all being said… it does look like this situation should probably not even be possible if your access control is working from the start, and your business layer is "complete" and properly separated from the Repository, for example. From your text I got the impression that the API and the frontend are tightly coupled (you mention controlling form messages, for example). This is neither good nor bad, but it does make specific issues appear that more “generic” APIs wouldn’t worry too much about. It seems worth revisiting the architecture to see if there’s something to be done differently.

In one project where our business logic was fairly complex, we had a class XYZHandler (somewhat along the lines of what DDD calls a UseCase), and the Handler (a callable class, with the __call__ method implemented) calls the services. This has been very useful in situations where there were a whole lot of steps to perform for a single action (like SendPackage in your case), but it is overkill for simple CRUD. In this case we had a HandlerInterface and several handlers, basically one for each UseCase, but that’s a totally different paradigm from the controller > service structure. In this specific project, it really helped me to think of the UseCase, the business process itself, instead of the methods that I would call (I think it’s pretty common to start at the code, and then try to solve the business issue, but the other way round tends to be more effective). And then the classes would change completely, like no more XYZService but rather PackageDispatcher (that would probably use a few services - acting more like utility methods - and repositories).
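
A rough sketch of such a callable handler follows. HandlerInterface and SendPackage come from the description above, but the body is an assumption: InMemoryPackageRepo, Notifier and the weight_kg rule are hypothetical collaborators invented for illustration.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


class HandlerInterface(ABC):
    """One handler per use case; calling the instance runs the whole process."""

    @abstractmethod
    def __call__(self, command: dict) -> dict:
        ...


@dataclass
class InMemoryPackageRepo:
    """Hypothetical repository that keeps rows in a list."""

    rows: list = field(default_factory=list)

    def create(self, data: dict) -> dict:
        row = {"id": len(self.rows) + 1, **data}
        self.rows.append(row)
        return row


@dataclass
class Notifier:
    """Hypothetical utility service that records what it would send."""

    sent: list = field(default_factory=list)

    def notify(self, message: str) -> None:
        self.sent.append(message)


class SendPackageHandler(HandlerInterface):
    def __init__(self, repo: InMemoryPackageRepo, notifier: Notifier) -> None:
        self.repo = repo
        self.notifier = notifier

    def __call__(self, command: dict) -> dict:
        # Step 1: business rule (made up for the example).
        if command.get("weight_kg", 0) <= 0:
            raise ValueError("weight_kg must be positive")
        # Step 2: persist through the repository.
        package = self.repo.create(command)
        # Step 3: side effect through a utility service.
        self.notifier.notify(f"package {package['id']} dispatched")
        return package
```

A route would then build the handler with its dependencies and simply call `handler(command)`, which keeps the steps of the business process readable in one place.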

Well, I really don’t know what would be the best way, again, just sharing experiences that might be relevant for your use case, hope it helps somehow!

@reritom

reritom commented Sep 21, 2022

First, the frontend should only be loading ids that the user is allowed to see, so when performing the GET /items and GET /tags that is needed to load items on a select component for example, this should already be filtered by what the user can see, so this is the first part of the authorization. When you do that, the frontend only has valid IDs as potential values and hence a first layer of validation for the ids.

This is correct. When the frontend populates the input, it is using ids that are relevant to the given user and context. However, it is possible for the ids that are being listed to become outdated.

If the ids (well, their labels linked to an id) shown on the frontend view contain an id that another user of the same tenant has deleted (after this user loaded the view presenting the ids), then we have the case where you can post an invalid id. Ideally, when the other user deletes an entity, it will trigger an event which will be sent to the current user using Pusher or some form of websockets, which can then invalidate the cache (or manually refresh) the ids presented to the user as options. But there is always the chance that this event will be lost or late, or that this solution is simply not implemented.

In this case when we POST the payload, we can have different types of field specific errors that the frontend can use to perform different actions.

If the payload were {"tag_id": 1, "name": "somethingUnique"} we have the cases:

  • The "name" field unique constraint failed, so we should return something that resembles {"error": {"details": [{"field": "name", "message": "taken"}]}} (Note: the message could be constants, enums, or error codes). In this case, the frontend can choose to put a red outline around the input (I am using the frontend as an example, but in practice it's for giving more context to any API consumer).
  • The tag id is not found, so we return the same but with {"field": "tag_id", "message": "invalid"}. In this case, the frontend can be clever enough to know that the tag_id or any <entity>_id with invalid (or whatever enum or status code) should trigger a cache invalidation or re-retrieval of the entities related to that id.

So if I have multiple ids in my payload and one fails, it will use this information to just refresh the ids presented to the user (with some warning/error message), while an error for non-id field might just be used to show a red outline.
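
The consumer-side rule described here could be sketched as a small function. It is written in Python only to keep the thread's examples in one language (a real frontend would do this in its own code), and the "_id" suffix convention and the "invalid" message are taken from the example payloads above.

```python
ID_SUFFIX = "_id"


def plan_client_actions(details: list[dict]) -> dict[str, list[str]]:
    """Split field errors into entities to refresh and inputs to highlight."""
    refresh: list[str] = []
    highlight: list[str] = []
    for detail in details:
        name = detail["field"]
        if name.endswith(ID_SUFFIX) and detail["message"] == "invalid":
            # Stale id: re-fetch the options for that entity.
            refresh.append(name[: -len(ID_SUFFIX)])
        else:
            # Ordinary validation problem: just mark the input.
            highlight.append(name)
    return {"refresh": refresh, "highlight": highlight}
```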

Again though, this sort of enriched validation needs to happen at the controller level, combining various service interfaces/APIs, because if these validations exist purely within the service layer, there will be overhead in the controller layer in mapping them to match the API definition.

Using your example here with the tag validation:

is_tag_valid = await service.validate_tag(...)
if not is_tag_valid:
	raise TagNotFoundException("Tag ID was not found")

We are able to achieve a similar goal of giving the API consumer enough information to know that their ids cache needs invalidating. Changing it to NotFoundException("...", entity="tag") (where entity maybe maps to the resource name in the OpenAPI spec) could add some flexibility. Again, though, this is limited to returning one error at a time, while the field-level errors can trigger multiple consumer-side invalidations from a single error response.

I’m not a big fan of having business logic in the @validator but it is possible, sure. I would still prefer to have a more complex service that handles it all.

I agree with this in principle, but also want to provide the API consumer with the tools to be as efficient as possible at correcting for bad requests.

From your text I got the impression that the API and the frontend are tightly coupled

The extent to which my frontend and backend are coupled is purely by the API contract. My existing error messages are marshmallow based so resemble this:

{
    "type": "ValidationError",
    "message": "...",
    "details": {
        "item_id": ["invalid"],
        "other_id": ["invalid"],
        "other_field": ["taken",] # Or other strings that indicate the length restrictions, etc, 
    }
}

And with this sort of error my frontend is able to know that the ids for those two entities need refreshing, while the other_field only needs to display some visual indication that the input isn't valid.

I prefer the FastAPI approach of error formats with the list of objects that represent the locator, message, etc. As you mentioned, objects with predictable keys are generally easier to work with.

Anyway thanks for your input, I found it interesting and enjoyed reading all of your replies in this thread.

@ccrvlh
Author

ccrvlh commented Sep 21, 2022

Well, to be honest it seems like you kinda have it figured out... it's mostly a matter of putting the pieces together in a way that makes sense to you and your team (for readability, maintainability, etc.).

Perhaps you can have a ValidationService. In my experience the most important thing in this scenario is to have a clear contract for the return value. I would give up on trying to mix this with the Pydantic validation and just have a custom validation schema, exactly like the one you already have; that enforces a contract for the data validation and makes it easy for your client to parse. My only suggestion is to invert the logic and have predictable keys and custom values: instead of item_id: ['invalid'], something like field: 'item_id', status: True. A schema could be something like:

class FieldValidationResponse:
    field_name: str
    status: bool
    details: dict[str, Any]

class ValidationResponse:
    status: bool
    msg: str
    field_status: list[FieldValidationResponse]

This makes for a much easier parsing experience for the client, and makes it easier for you to build the object you are returning. With this schema you could then have a Union[ValidationResponse, PackageResponse] on your response model, or maybe even a separate route for validation if you think it's worth the extra request (it would help to keep the routes more consistent). Then Pydantic would validate the data types, you would have a business validation layer, and then the service, which seems totally reasonable.

But again, total personal preference, I'm sure there are a whole lot of ways to deal with this...

PS: One thing that also comes to mind in this scenario is the Envelope pattern which is pretty nice, since you can encapsulate different types of messages while maintaining a predictable contract. This is a good article about it, I've used in another project and I quite enjoyed it, maybe it's worth to take a look.
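
As a very rough illustration of the envelope idea, every response can share the same outer keys while the inner content varies. The field names below (kind, ok, data, errors) are assumptions made for this sketch and are not taken from the linked article.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Optional


@dataclass
class Envelope:
    """Outer wrapper with predictable keys; only the contents change."""

    kind: str
    ok: bool
    data: Optional[dict[str, Any]] = None
    errors: list[dict[str, Any]] = field(default_factory=list)


def success(kind: str, data: dict[str, Any]) -> dict[str, Any]:
    return asdict(Envelope(kind=kind, ok=True, data=data))


def failure(kind: str, errors: list[dict[str, Any]]) -> dict[str, Any]:
    return asdict(Envelope(kind=kind, ok=False, errors=errors))
```

A client can then branch on `ok` and `kind` first, and only afterwards look at `data` or `errors`.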

@terrysclaw

Amazing! that is what I want. Thank you @zhanymkanov, @lowercase00.

@DHuyAu

DHuyAu commented Jan 2, 2023

Hi @lowercase00

Great post. Had a look at the project structure and was piqued by your extension package in regards to the correlation id middleware.

I had used ContextVars to manage the correlation id in conjunction with a well known key in the request header which if not present, a random uuid will be generated. If outbound requests are made, the correlation id is then propagated forward by adding it to the outbound headers payload.
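
In sketch form, that approach looks roughly like the following. The header name X-Correlation-ID and the helper names are assumptions for illustration, and the header lookup here is case-sensitive because it uses a plain mapping.

```python
import uuid
from contextvars import ContextVar
from typing import Mapping, Optional

HEADER_NAME = "X-Correlation-ID"  # assumed "well known" header key

# Holds the id for the current request context (empty outside a request).
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")


def begin_request(headers: Mapping[str, str]) -> str:
    """Reuse the inbound id if present, otherwise generate a random one."""
    value = headers.get(HEADER_NAME) or uuid.uuid4().hex
    correlation_id.set(value)
    return value


def outbound_headers(extra: Optional[Mapping[str, str]] = None) -> dict[str, str]:
    """Propagate the current id on outgoing requests."""
    headers = dict(extra or {})
    headers[HEADER_NAME] = correlation_id.get()
    return headers
```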

It feels a bit clunky, how did you go about managing correlation ids?

@Uncensored-Developer

Hi @lowercase00
Thanks a lot for this post,
Can you explain more on the role of the Message Hub, if possible describe a usecase.

@ccrvlh
Author

ccrvlh commented Feb 3, 2023

@terrysclaw I just use the package asgi_correlation_id, the idea has always been to track inbound requests though, so not sure this is the same use case. We just wanted a way for the client to tell us the correlation ID so we could track actions on our side.

@Uncensored-Developer yeah, sure.
So the MessageHub is basically just a way to decouple things. Think of it as an internal task queue or pubsub system, but with some benefits and without overhead.

Say you have ServiceA in modulea and, in a complex workflow, after it finishes a certain task it triggers ServiceB in moduleb. You don't want ServiceA calling ServiceB directly for a whole lot of reasons: circular dependencies, business logic coupling, potentially blocking operations, an error in ServiceB affecting ServiceA's response, etc. So it's just as if you sent the message to a task queue or to a pubsub system (Kafka/Redis Streams): fire and forget. ServiceA has done its job, and can tell everyone interested that it has finished. Now ServiceB knows that ServiceA finished and can run whatever.
It's almost as if ServiceA and ServiceB were microservices, but they live in the same codebase, and you won't have all the challenges related to microservices (database, gateway, authentication, etc), while also giving all the benefits of a monolith.

Now that you have a hub in your application, you can build things on top of it: you can record every message that's going through the hub, you can have multiple handlers for a certain event (now when ServiceA finishes, run ServiceB and ServiceC in parallel), and you can make handling synchronous or asynchronous dynamically depending on the event body (sending to Celery/RQ). There are just a lot of benefits (in my opinion) without the very heavy lifting of an external pubsub system.
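
To make the idea concrete, here is a minimal in-process hub. It is only a sketch under assumptions: the class layout, the event-name strings and the failures list are invented for illustration and are not the actual implementation described in this thread. Note that publish awaits its handlers, so a true fire-and-forget variant would schedule them as background tasks or hand them to a worker instead.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[None]]


class MessageHub:
    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = {}
        self.history: list[tuple[str, dict[str, Any]]] = []  # every message seen
        self.failures: list[Exception] = []  # handler errors, kept away from publishers

    def subscribe(self, event: str, handler: Handler) -> None:
        self._handlers.setdefault(event, []).append(handler)

    async def publish(self, event: str, payload: dict[str, Any]) -> None:
        self.history.append((event, payload))
        handlers = self._handlers.get(event, [])
        # Run all subscribers concurrently; a failing handler does not
        # propagate back to whoever published the event.
        results = await asyncio.gather(
            *(handler(payload) for handler in handlers),
            return_exceptions=True,
        )
        self.failures.extend(r for r in results if isinstance(r, Exception))
```

ServiceA would only call `await hub.publish("a.finished", {...})`; which services react, and how many, is decided by whoever subscribes.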

@zhanymkanov zhanymkanov pinned this issue Feb 24, 2023
@Uncensored-Developer

@ccrvlh
Please could you explain the role of the modules sub-folders and the core folders because I noticed they have similar contents with other folders like auth, users, tenants. etc

@ccrvlh
Author

ccrvlh commented Apr 14, 2023

Hey @Uncensored-Developer sorry for the delay.

So when you use the "modular" approach, you'll find that it's fairly easy to end up with a mess of circular dependencies (one module depending on the other). But you'll also find that those circular dependencies are usually all the same (or very similar), so what we learned was that it made sense to have a sort of "global" set of packages that most of the modules use - the easiest way to think about this is: those are the modules that the application actually needs to make sense (e.g. users, or auth, or tenants, or some random set of functions that everybody needs, e.g. core functions).

So what we did was: we promoted the ones that were inherently necessary for the application to function to the root of the package (app/....), and put everything that was just a bunch of functionalities that you could turn on/off within modules. That way your application is actually modular: you can add or remove any set of modules you want, and you won't break things - all you have to do most of the time is register the route (in your global routes) and the models (in your db), and you're ready to go with a new module (a new set of features).

If you ever find yourself having circular dependencies, probably there are better ways to organize your modules - normally you'll add another module (module A) that imports from two other modules (modules B and C), and you'll break the circular dependency between module B and C. In our experience this almost always will make 100% sense from a business perspective, and the app becomes obvious for anyone that looks at it - it's almost a perfect reflection of the underlying business mechanics.

That's why I added a core (some set of functions/classes that almost every module uses, and that your app just can't live without), and promoted some modules (users, tenants) that are also 100% linked to your app - your app just doesn't make sense without that base.

Hopefully this makes sense.

@gerazenobi

gerazenobi commented May 25, 2023

@ccrvlh thanks for sharing your insights and experience with this type of project, they are much appreciated 🙏

Noticed you switched from Celery to RQ: we've recently switched from RQ to Arq, its async counterpart, which is deeply inspired by RQ.

@bitstein

@ccrvlh Thanks for the great write-ups. How are the modular models being picked up by Alembic for migration autogeneration?

@ccrvlh
Author

ccrvlh commented Dec 27, 2023

Hey @bitstein I'd have a models.py file importing the models from every module, and then import that models file in alembic's env.py.

I did change this approach on my last projects though, and now I have all tables in a single db/models.py file. It's a rather long file, which is the downside, but I think it (1) allows more flexibility in terms of which module can use a model, (2) detaches the persistence layer from the business logic layer, and (3) makes it easier to reason about the database schema and relationships. Also, I like to fold things, and use comments to separate groups of tables in a somewhat logical manner, so I feel it makes a lot of sense and is easier to work with.

@bitstein

Thank you!

@copdips

copdips commented Apr 1, 2024

@ccrvlh

Many thanks for sharing.

I'm wondering:

  1. if model_a needs to communicate to model_b, which files are used? by service or by route or by some public interface ?
  2. in case of many-to-many relationships between 2 modules, in which module you define the association table? Or do you place it in a third module dedicated for the associations ?

@ccrvlh
Author

ccrvlh commented Apr 2, 2024

@copdips

You mean model or module? If module_a needs to communicate with module_b there are a few options: if they are both part of the same "workflow", it might make sense to create module_c that depends on a and b. If they are not part of the same workflow, this could be done through the message hub / external worker (e.g. updating a record triggers some other action in another module, but those are independent). As for where, it kind of depends - I have some things being tracked on the route layer, and others on the service layer - as I'm mostly doing APIs the route ends up being the most relevant external interface (CLIs for example are mostly for internal usage, so no need to track those, and SDKs are based on the API, so the codebase itself is not meant to be a library - in that case tracking stuff on the service layer is less relevant).

As for the DB, in my latest projects I have moved the (db) models out of the modules to a dedicated db folder. This has given me a lot fewer headaches when designing the database, and a lot more freedom for modules to use the persistence layer as they see fit - sometimes you may want more flexibility when querying a table that is theoretically "owned" by another module, for example. I do try to restrict data manipulation to the module that owns the set of tables I'm working on, but this is just a very general guideline rather than a hard rule.

@copdips

copdips commented Apr 2, 2024

@ccrvlh ,

I apologize for the misunderstanding earlier. But you correctly interpreted my intent :)

My initial questions were about:

  1. Communication between modules (domains)
  2. Many-to-many relationships between DB models

This morning I searched on Google and found a doc that, while designed for Django (https://phalt.github.io/django-api-domains/examples/), has a very similar structure to what's recommended here.

The author introduces two new layers:

  1. interface: Contains all outgoing methods.
  2. apis: Contains all public incoming methods (not the API endpoints themselves). A separate file (urls.py) defines routes that consume the apis layer and then interact with backend database models through the services layer.

If I understand correctly, when domain A needs to communicate with domain B, it would call domain_b/apis.py from domain_a/interfaces.py. Behind the scenes, domain_b/apis.py could call domain_b/services.py.

So a services.py could have two consumers, one is the interfaces.py from another domain, and another is the urls.py in the same domain for routes.
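
Squeezed into a single file, my understanding of that call chain would look something like the sketch below. The comments mark which file each function would live in; the function names and the in-memory tag data are hypothetical, and this reflects my reading of the linked guide rather than its exact code.

```python
from typing import Optional

# --- domain_b/services.py: business logic, private to domain B ---
_TAGS = {1: "urgent"}  # hypothetical in-memory data instead of DB models


def b_services_get_tag(tag_id: int) -> Optional[str]:
    return _TAGS.get(tag_id)


# --- domain_b/apis.py: public incoming methods of domain B ---
def b_apis_get_tag_name(tag_id: int) -> Optional[str]:
    return b_services_get_tag(tag_id)


# --- domain_a/interfaces.py: all outgoing calls of domain A ---
def a_interfaces_tag_name(tag_id: int) -> Optional[str]:
    # The only place where domain A is allowed to touch domain B.
    return b_apis_get_tag_name(tag_id)


# --- domain_a/services.py: business logic of domain A ---
def a_services_describe_item(name: str, tag_id: int) -> str:
    tag = a_interfaces_tag_name(tag_id)
    return f"{name} [{tag or 'untagged'}]"
```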

The author does not explicitly discuss many-to-many relationships, only mentioning that each domain possesses its own database model file. Regarding your solution to centralize the database models from the domain layer into a single folder, the question remains: how are association tables handled? Is there a single file for all tables, or does each domain maintain its own database model file in this centralized folder ? If it's the latter, where are the association tables placed?
