How to implement dynamic API filtering using query parameters with Flask-SQLAlchemy

Mar 18, 2022


When building REST APIs, we usually focus on three key user-experience concepts: filtering, pagination, and ordering. The Flask / SQLAlchemy ecosystem offers no built-in way or established best practice for implementing them, though many packages propose their own solutions. At Mindee, we looked into these solutions but ultimately decided to build our own logic, to keep it flexible for our use cases without adding another external dependency. This article focuses on dynamic API filtering; pagination and ordering may be the subject of a separate blog post in the future. We will first build the filtering logic, then see how we made it easy to use on any of our applications’ routes.

Note: All code snippets in this article are compatible with Python 3.8 or higher and use isort and black for formatting.

Query Parameters

When it comes to API filtering, the most intuitive approach is to use query parameters. Since a query parameter only has a key and a value, we had to find a format that lets us specify:

  • field name
  • operator (in, eq, not, gte, lte, gt, lt, like, …)
  • value

Inspired by a coworker who was a Django fan at the time, the convention we chose uses __ as a separator in the parameter key, between the field name and the operator.

  • ?name__eq=mindee would be an exact match on the name being mindee
  • ?unit_price__gte=20 would return the entities having a unit_price greater than or equal to 20, etc.

For simplicity, we decided that omitting the operator falls back on the equality operator, making ?name__eq=mindee the same as ?name=mindee. We also chose the ~ character to negate the operation: ?unit_price__~gt=20 would match the entities whose unit_price is not greater than 20.

In addition, we wanted to be able to filter on relationship fields. This is done by using __ between the relationship name and its field. Imagine an object with an owner relationship: ?owner__name__like=%john would return the objects whose owner has a name ending with john, since % matches any sequence of characters in a SQL LIKE pattern.
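To make the convention concrete, here is a minimal, model-free sketch of how such a key could be decomposed. The names split_filter_key, ParsedKey and KNOWN_OPERATORS are hypothetical and only serve the illustration; the parser described later in this article relies on the SQLAlchemy model rather than on a fixed operator list.

```python
from typing import List, NamedTuple

# Operator names taken from the list above (assumption: no field is named like an operator).
KNOWN_OPERATORS = {"in", "eq", "not", "gte", "lte", "gt", "lt", "like"}


class ParsedKey(NamedTuple):
    path: List[str]  # relationship names followed by the field name
    operator: str    # "eq" when the key carries no operator
    negated: bool    # True when the operator is prefixed with "~"


def split_filter_key(key: str) -> ParsedKey:
    parts = key.split("__")
    candidate = parts[-1]
    negated = candidate.startswith("~")
    operator = candidate[1:] if negated else candidate
    # Only treat the last component as an operator if something precedes it.
    if len(parts) > 1 and operator in KNOWN_OPERATORS:
        return ParsedKey(parts[:-1], operator, negated)
    # No recognised operator: the whole key is the field path, with equality as default.
    return ParsedKey(parts, "eq", False)


print(split_filter_key("owner__name__like"))
print(split_filter_key("unit_price__~gt"))
```

With this sketch, ?name=mindee and ?name__eq=mindee decompose to the same triple, which is the fallback behaviour described above.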

Now that the context is set, we can dive into the code. The solution splits naturally into three main parts:

  • Parsing the parameters to extract their components (field name, operator, value).
  • Converting these into an SQLAlchemy query.
  • Making it easy and fast to add to any route of your application.

Base Query Structure

Before jumping into the parsing part, let’s have an overview of our own implementation of flask_sqlalchemy’s BaseQuery object, to have a better understanding of the inputs and outputs of the code.

from typing import Mapping

from flask_sqlalchemy import BaseQuery

# ... remaining code

class Query(BaseQuery):
    def magic_filter(self, filters: Mapping, raise_errors: bool = True):
        og_model = self._get_base_model()
        for key, value in filters.items():
            if parsed := parse_magic_filter_key(
                og_model, key, raise_errors=raise_errors
            ):
                model, attribute_name, models_to_join = parsed
                self, _status = self.join_models_if_not_already_joined(
                    models_to_join, raise_errors=raise_errors
                )
                # this makes sure our query doesn't break when we run it with raise_errors=False and
                # an unsupported join is requested: the current filter is simply skipped.
                # with raise_errors=True, self.join_models_if_not_already_joined would already have
                # raised the expected exception.
                if _status:
                    operation = build_magic_filter_operation(
                        model, attribute_name, value, key.split("__")[-1]
                    )
                    self = self.filter(operation)

        return self

# ... remaining code

Let’s take a look at the breakdown of this code.

At Mindee, we chose to extract the components of the parameter using the base Model, so that we can either skip malformed parameters or throw custom errors instead of SQLAlchemy’s own. To do so, we first retrieve the base model of our query: this is the one we use to resolve attributes and relationships from our query parameters.

from flask_sqlalchemy import BaseQuery, Model

class Query(BaseQuery):
    def _get_base_model(self) -> Model:
        return self._raw_columns[0].entity_namespace

    # ... remaining code

Unlike the built-in filter_by method, which relies on self.filter_by_zero() and uses the last joined model as its filtering model, we want the first model, so that the path to an attribute (including relationships) doesn’t change when we join models on the fly later on.

Since our filters won’t need multiple joins on the same table, we use a custom join_models_if_not_already_joined method.

from typing import Dict

from flask_sqlalchemy import BaseQuery, Model
from sqlalchemy.sql.elements import BinaryExpression

class Query(BaseQuery):

    # ... remaining code

    def join_models_if_not_already_joined(
        self,
        models: Dict[Model, BinaryExpression],
        raise_errors: bool,
        is_outer: bool = False,
    ):
        # boolean that can be used to know whether the requested join is supported, in case raise_errors is False.
        _status = True
        # construct a dict of shape {Model: BinaryExpression}
        joined_models = {e[0].entity_namespace: e[1] for e in self._legacy_setup_joins}

        for model, on_clause in models.items():
            already_joined_on_clause = joined_models.get(model, None)
            # it means this model has not been joined, so we join it, according to the models_to_join on_clause
            if already_joined_on_clause is None:
                self = self.join(model, on_clause, isouter=is_outer)

            # for now, we don't support joining two times the same model with different on_clauses
            # to do so, it would mean we would have to rework our structure, so that we can
            # use sqlalchemy.aliased to join multiple times the same model.
            elif (
                already_joined_on_clause is not None
                and not already_joined_on_clause.compare(on_clause)
            ):
                _status = False
                if raise_errors:
                    raise NotImplementedError(
                        f"Model {model} is already joined using a different on_clause"
                    )

        return self, _status

    # ... remaining code

As said in the introduction, since this is our very own filtering method, we chose to make it work only for our use cases. For now, we never need to filter on two different relationships linked to the same Model, so we chose not to add more complexity to our magic_filter: we only raise a NotImplementedError when two different joins, with different on_clauses, are needed on the same table. Supporting this would require reworking our whole logic to use sqlalchemy.aliased correctly in our query.
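The join bookkeeping can be illustrated without a database. The sketch below is a simplified stand-in, not our actual method: plan_joins is a hypothetical helper, and plain strings replace the models and on_clauses. It shows the one case that is not supported, a second relationship pointing at an already joined model through a different clause.

```python
from typing import Dict, Tuple


def plan_joins(
    already_joined: Dict[str, str],
    requested: Dict[str, str],
    raise_errors: bool = True,
) -> Tuple[Dict[str, str], bool]:
    """Return the joins still to perform and whether every request is supported."""
    to_join: Dict[str, str] = {}
    status = True
    for model, on_clause in requested.items():
        existing = already_joined.get(model)
        if existing is None:
            # not joined yet: schedule the join with the requested clause
            to_join[model] = on_clause
        elif existing != on_clause:
            # same model, different clause: this would need an alias
            if raise_errors:
                raise NotImplementedError(
                    f"Model {model} is already joined using a different on_clause"
                )
            status = False
        # same model, same clause: nothing to do
    return to_join, status


# An Item with both an owner and a reviewer pointing at User (hypothetical schema).
joined = {"User": "item.owner_id = user.id"}
print(plan_joins(joined, {"User": "item.owner_id = user.id"}))
print(plan_joins(joined, {"User": "item.reviewer_id = user.id"}, raise_errors=False))
```

In the second call the status comes back as False, which is the signal magic_filter uses to skip the filter instead of building an ambiguous query.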

Parsing and Extracting Query Parameters

Now that the frame is set, we can parse our filter key using the base Model to determine which SQLAlchemy operation we need to build. Our function takes the base Model, the filter_key (our parameter key), and an optional raise_errors parameter that gives us some flexibility in how malformed parameters are handled. If the filter_key is well-formed, we get the following output:

  • the SQLAlchemy Model on which the attribute we want to filter on is present. With relationships, this won’t be the base model of our query; for non-joined queries, it will.
  • the name of the attribute to filter on, on this model.
  • a Python dict of shape Dict[Model, BinaryExpression], containing all the models to join and the on_clause to join each of them on.

from typing import Dict, Optional, Tuple

from flask_sqlalchemy import Model
from sqlalchemy.orm import ColumnProperty, InstrumentedAttribute, RelationshipProperty
from sqlalchemy.sql.elements import BinaryExpression

def parse_magic_filter_key(
    model: Model, filter_key: str, raise_errors: bool = False
) -> Optional[Tuple[Model, str, Dict[Model, BinaryExpression]]]:
    models_to_join = {}
    for attribute in filter_key.split("__"):
        column = getattr(model, attribute, None)
        if isinstance(column, InstrumentedAttribute):
            if isinstance(column.property, ColumnProperty):
                return model, attribute, models_to_join
            elif isinstance(column.property, RelationshipProperty):
                model = column.property.entity.class_

                # we need to specify the join clause, so that it is always joined from the base_model
                # and the behavior doesn't depend on the order of our joins.
                models_to_join[model] = column.property.primaryjoin
            else:
                if raise_errors:
                    raise AttributeError(f"Invalid filtering attribute: {filter_key}")
                return None
        else:
            # we don't want to continue looping if first split is not an InstrumentedAttribute
            if raise_errors:
                raise AttributeError(f"Invalid filtering attribute: {filter_key}")
            return None

    if raise_errors:
        raise AttributeError("No attribute found to filter on")
    return None

The first thing we do is iterate over the components of our filter_key, which we extract by splitting on our __ separator. If a component is an InstrumentedAttribute, for our use cases that means it is either a ColumnProperty (an attribute of its model) or a RelationshipProperty (a relationship of its model). If it is not an InstrumentedAttribute, or not one of these two property types, it is out of our scope, so we raise an error or return None depending on the raise_errors value.

If it is a relationship, we replace our model variable with the related model and add it to the models_to_join dict, with the associated on_clause (BinaryExpression). The loop then continues, which lets it handle nested relationships. Eventually, we should reach a component that is a ColumnProperty, which is the attribute, so we return our current model, the attribute name, and our populated dict of models to join.

Note: we don’t parse the (optional) operator here. Since it is placed after the attribute in our format, we always exit the loop before encountering it.
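The walk described above can be tried without SQLAlchemy. The following is only a sketch under stated assumptions: SCHEMA is a hypothetical dictionary standing in for the models, and walk_filter_key is an illustrative name. Columns play the role of ColumnProperty and relationships the role of RelationshipProperty.

```python
from typing import List, Optional, Tuple

# Hypothetical schema: model name -> its columns and its relationships (name -> target model).
SCHEMA = {
    "Item": {"columns": {"name", "unit_price"}, "relationships": {"owner": "User"}},
    "User": {"columns": {"name", "email"}, "relationships": {"company": "Company"}},
    "Company": {"columns": {"name"}, "relationships": {}},
}


def walk_filter_key(
    base_model: str, filter_key: str
) -> Optional[Tuple[str, str, List[str]]]:
    model = base_model
    models_to_join: List[str] = []
    for part in filter_key.split("__"):
        if part in SCHEMA[model]["columns"]:
            # reached the attribute: whatever follows (the operator) is never visited
            return model, part, models_to_join
        target = SCHEMA[model]["relationships"].get(part)
        if target is None:
            return None  # neither a column nor a relationship of the current model
        # follow the relationship and remember the model we will have to join
        model = target
        models_to_join.append(model)
    return None  # the key only contained relationships, no attribute to filter on


print(walk_filter_key("Item", "owner__company__name__like"))
```

The nested key resolves to the name column of Company, with User and Company recorded as the models to join, in that order.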

SQLAlchemy’s join method does not need the on_clause to be specified when working with Models that have explicit foreign keys, since it can be inferred. However, we specify the on_clause ourselves, because the one SQLAlchemy infers depends on the order of our joins. Leaving it out could cause inconsistencies between queries, with models not being joined correctly.

Building the SQLAlchemy Query

Now that we have correctly parsed our model, attribute_name, and models to join, we can start building the SQLAlchemy query. Since we build each operation one after another, this is where our method to join models only once is needed. First, we join the models if they are not already joined; then we build the operation and apply it with self.filter.

import json
import logging
from typing import Any

from flask_sqlalchemy import Model
from sqlalchemy import String, cast, func
from sqlalchemy.sql.elements import BinaryExpression

logger = logging.getLogger(__name__)

FILTERING_OPERATORS = {
    "in": "in_",
    "eq": "__eq__",
    "not": "__ne__",
    "gte": "__ge__",
    "lte": "__le__",
    "gt": "__gt__",
    "lt": "__lt__",
    "like": "like",
}

def build_magic_filter_operation(
    model: Model, attribute_name: str, value: Any, possible_operator: str
) -> BinaryExpression:
    """
    This returns a ready-to-use SQLAlchemy operation that can be passed to query.filter

    :param model: SQLAlchemy model containing the attribute to filter on
    :param attribute_name: name of the attribute to filter on
    :param value: value to filter with
    :param possible_operator: possible operator string
    """
    column = getattr(model, attribute_name)
    revert, operator = _strip_not_operator(possible_operator)

    # if operator is "in", we need to cast the string value as a list
    if operator == "in":
        try:
            value = json.loads(value.replace("'", '"'))
        except ValueError:
            logger.exception("Couldn't parse filter for __in operator:")
        finally:
            # anything that is not a list (including an unparsed string) becomes an empty list
            if not isinstance(value, list):
                value = []

    # make string filtering case insensitive
    if isinstance(column.property.columns[0].type, String):
        # lower database stored value, and cast it as string (avoid error on enum fields)
        column = func.lower(cast(column, String))
        # lower query value
        if operator == "in":
            # if operator is in, we need to lower every element of the list
            value = [e.lower() for e in value]
        else:
            value = func.lower(value)

    if operator == "isnull":
        usable_operator = "is_" if value in [True, "True", "true"] else "isnot"
        value = None
    else:
        # unknown operators (including a bare attribute name) fall back on equality
        usable_operator = FILTERING_OPERATORS.get(operator, "__eq__")

    operation = getattr(column, usable_operator)(value)
    if revert:
        operation = ~operation
    return operation

When building the operation, the first thing we do is detect whether our not operator ~ is present: we get a boolean telling us whether to negate the operation, along with the operator stripped of its ~. Then, we have a custom clause to handle the in operator. This is an unusual personal choice: since this is only used on our internal APIs, we decided not to follow the standard application/x-www-form-urlencoded format, because we find it clearer and easier to handle this way.

A list must be passed as stringified JSON, such as my_value__in=["value1", "value2"], instead of my_value__in[]=value1&my_value__in[]=value2. This also lets us reduce the request URI length and avoid hitting its limits when filtering.
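As an illustration of this format, the sketch below builds such a parameter on the client side and parses it back with the same quote-replacement trick. parse_in_value is a hypothetical helper name; only the standard library is used.

```python
import json
from typing import Any, List
from urllib.parse import parse_qs, urlencode


def parse_in_value(raw: str) -> List[Any]:
    """Parse a stringified JSON list, falling back on an empty list."""
    try:
        # single quotes are tolerated by turning them into JSON double quotes
        parsed = json.loads(raw.replace("'", '"'))
    except ValueError:
        return []
    return parsed if isinstance(parsed, list) else []


# Client side: the list is serialized once and sent as a single parameter.
query_string = urlencode({"my_value__in": json.dumps(["value1", "value2"])})

# Server side: the parameter is read back as one string, then parsed.
raw_value = parse_qs(query_string)["my_value__in"][0]
print(parse_in_value(raw_value))  # ['value1', 'value2']
```

One limitation of the quote replacement is worth keeping in mind: a value that itself contains an apostrophe, such as O'Brien, no longer parses as JSON and yields an empty list.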

Among the personal choices that may seem unusual, we decided to always make string filtering case-insensitive, and we added a custom isnull operator whose value can be either True or False.

Note: With PostgreSQL, when comparing a string with an Enum field, the database value must be cast to a string to avoid an error from the engine.
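The operator lookup itself is plain Python attribute dispatch, which can be shown without a database. In the sketch below, FakeColumn and Expr are hypothetical stand-ins for a SQLAlchemy column and expression that simply render readable text, and build_operation is a simplified version of the function above (no string lowering, no in parsing).

```python
FILTERING_OPERATORS = {
    "in": "in_", "eq": "__eq__", "not": "__ne__", "gte": "__ge__",
    "lte": "__le__", "gt": "__gt__", "lt": "__lt__", "like": "like",
}


class Expr:
    """Stand-in for a SQL expression; ~expr wraps it in NOT (...)."""

    def __init__(self, text: str):
        self.text = text

    def __invert__(self) -> "Expr":
        return Expr(f"NOT ({self.text})")


class FakeColumn:
    """Stand-in for a column exposing the methods the lookup table refers to."""

    def __init__(self, name: str):
        self.name = name

    def _binary(self, symbol: str, value) -> Expr:
        return Expr(f"{self.name} {symbol} {value!r}")

    def __eq__(self, value): return self._binary("=", value)
    def __ne__(self, value): return self._binary("!=", value)
    def __ge__(self, value): return self._binary(">=", value)
    def __le__(self, value): return self._binary("<=", value)
    def __gt__(self, value): return self._binary(">", value)
    def __lt__(self, value): return self._binary("<", value)
    def like(self, value): return self._binary("LIKE", value)
    def in_(self, value): return self._binary("IN", value)
    def is_(self, value): return self._binary("IS", value)
    def isnot(self, value): return self._binary("IS NOT", value)


def build_operation(column: FakeColumn, operator_part: str, value) -> Expr:
    negated = operator_part.startswith("~")
    operator = operator_part[1:] if negated else operator_part
    if operator == "isnull":
        method = "is_" if value in (True, "True", "true") else "isnot"
        value = None
    else:
        # unknown operators fall back on equality
        method = FILTERING_OPERATORS.get(operator, "__eq__")
    operation = getattr(column, method)(value)
    return ~operation if negated else operation


print(build_operation(FakeColumn("unit_price"), "~gt", 20).text)  # NOT (unit_price > 20)
```

Because the last component of a key without operator is the attribute name itself, the equality fallback is what makes ?name=mindee behave like ?name__eq=mindee.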

Use it as a Decorator

Now that our Query.magic_filter method is properly implemented, we can see how we integrated it into a Python decorator, to make it quick to enable dynamic filtering on any of our back-end routes without causing much friction between frontend and backend developers.

It was important for us that this filtering logic works outside of a route decorator context, so that we can use it on any SQLAlchemy Query object we manipulate in our code. However, since our main goal was to speed up and ease the development process, we wanted something even faster to add to any API route.

import re
from functools import wraps
from typing import Callable, Literal, Union

from flask import request
from flask_sqlalchemy import DefaultMeta, Model

ALL_FIELDS = "__all__"
FILTERING_OPERATORS = {
    "in": "in_",
    "eq": "__eq__",
    "not": "__ne__",
    "gte": "__ge__",
    "lte": "__le__",
    "gt": "__gt__",
    "lt": "__lt__",
    "like": "like",
}

def filter_query(
    base_query: Union[Model, Callable],
    search_fields: Union[list, Literal["__all__"]] = [],
):
    def wrapper(func):
        search_fields_term = "|".join(search_fields)
        operators = list(FILTERING_OPERATORS.keys()) + ["isnull"]
        operators_term = "|".join(operators)
        search_fields_regex = re.compile(
            ...  # pattern combining search_fields_term and operators_term
        )

        @wraps(func)
        def decorated(*args, **kwargs):
            # a Model is queried as a whole table, a Callable is expected to return a Query
            query = (
                base_query.query
                if isinstance(base_query, Model) or isinstance(base_query, DefaultMeta)
                else base_query(**kwargs)
            )

            # keep only the query parameters we are allowed to filter on
            filtered_args = (
                request.args
                if search_fields == ALL_FIELDS
                else {
                    k: v
                    for (k, v) in request.args.items()
                    if search_fields_regex.fullmatch(k)
                }
            )

            set_query(query.magic_filter(filtered_args, raise_errors=False))
            return func(*args, **kwargs)

        return decorated

    return wrapper

Disclaimer: For clarity, this example does not contain the code block we use internally to autogenerate our swagger documentation for all endpoints using this decorator.

The first thing we do is resolve our base_query parameter, using a little trick specific to our use case. Our internal framework starts by loading all routes into the application context, then loads our database models. As a result, we cannot use any model outside of the application context before the routes are loaded. This means that the following code would raise an error when we try to launch our application.

@filter_query(MyModel.query)
def my_route():
    ...

One solution would have been to accept only a Callable and wrap MyModel.query in a Python lambda function, but we felt that would make the code too verbose. Therefore, we chose to accept either a Model or a Callable. When given a Model, the decorator automatically queries the whole associated table, using MyModel.query. For more complex queries, we create a class method on our model and pass it to filter_query: @filter_query(MyModel.my_custom_class_method), with my_custom_class_method being a class method implemented on MyModel that returns a flask_sqlalchemy Query object.

Once we have the right query for the input type, we add some parametrization to choose which fields filtering is allowed on: we might want to forbid filtering on specific fields, especially for customer-facing endpoints. Using a regex, we filter out disallowed parameters from request.args before feeding the filtered dict to our magic_filter method. Here, we use a set_query function, which is basically a wrapper around the flask.g object; its associated get_query function can be used in the route to retrieve the filtered Query object.
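The allow-list step can be sketched with the standard library alone. The pattern below is an assumption made for illustration, not the exact one we use: it accepts an allowed field, optionally followed by __, an optional ~, and a known operator. build_allowed_key_regex and keep_allowed are hypothetical names.

```python
import re
from typing import Dict, Iterable, Pattern

OPERATORS = ["in", "eq", "not", "gte", "lte", "gt", "lt", "like", "isnull"]


def build_allowed_key_regex(search_fields: Iterable[str]) -> Pattern[str]:
    # escape field names so they are matched literally
    fields_term = "|".join(re.escape(field) for field in search_fields)
    operators_term = "|".join(OPERATORS)
    # <field>, optionally followed by "__", an optional "~" and a known operator
    return re.compile(rf"(?:{fields_term})(?:__~?(?:{operators_term}))?")


def keep_allowed(args: Dict[str, str], search_fields: Iterable[str]) -> Dict[str, str]:
    regex = build_allowed_key_regex(search_fields)
    # fullmatch rejects keys that merely contain an allowed field name
    return {key: value for key, value in args.items() if regex.fullmatch(key)}


request_args = {
    "name": "mindee",
    "name__~like": "%test%",
    "private_name": "secret",
    "name__foo": "x",
    "page": "2",
}
print(keep_allowed(request_args, ["name"]))
```

Only name and name__~like survive here. Using fullmatch rather than search matters: private_name contains name but must not be accepted. This sketch assumes a non-empty list of fields, since an empty alternation would match the empty string.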

Here is a complete example of a route using this decorator, with its associated Model.

from flask_sqlalchemy import SQLAlchemy

# here, Query is the custom Query class we explained before, implementing `magic_filter`.
db = SQLAlchemy(query_class=Query)

class MyModel(db.Model):
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    name = db.Column(db.String(255), nullable=False)
    private_name = db.Column(db.String(255), nullable=False)
    foo = db.Column(db.Boolean(), nullable=False)

    @classmethod
    def only_foo(cls):
        # returns a Query restricted to the rows having foo set to True
        return cls.query.filter_by(foo=True)

# here, app is your Flask instance
@app.route("/", methods=["GET"])
@filter_query(MyModel.only_foo, ["name"])
def my_route():
    query = get_query()
    # here, the remaining code of your endpoint
This enables filtering on the name attribute only, without writing any additional code. We used a custom only_foo method so that, even with filtering, it is impossible to retrieve any row with foo set to False.


Implementing our custom web framework to easily add pagination, filtering, and serialization on any endpoint helped us at Mindee reduce the frustrating back-and-forth between our frontend and backend developers when adding basic CRUD features to some of our tools. It also reduced the verbosity of our back-end application code and made it easier to adapt to our needs.

Note: All of the code presented in this article is easily testable, with both unit and end-to-end tests, but we omitted the testing part because it is not fully in scope and would be too lengthy for a blog post.
