
[Enhancement] Retrieve multiple records at once with pipeline #523

Open
XChikuX opened this issue Jun 13, 2023 · 4 comments
Labels
enhancement New feature or request

Comments

XChikuX commented Jun 13, 2023

Currently there are implementations for saving and deleting multiple records at once using Redis pipelines.
Example:

@py_test_mark_asyncio
async def test_delete_many(m):
    member1 = m.Member(
        id=0,
        first_name="Andrew",
        last_name="Brookins",
        email="[email protected]",
        join_date=today,
        age=38,
        bio="This is the user bio.",
    )
    member2 = m.Member(
        id=1,
        first_name="Kim",
        last_name="Brookins",
        email="[email protected]",
        join_date=today,
        age=34,
        bio="This is the bio for Kim.",
    )
    members = [member1, member2]
    result = await m.Member.add(members)
    assert result == [member1, member2]
    result = await m.Member.delete_many(members)
    assert result == 2
    with pytest.raises(NotFoundError):
        await m.Member.get(pk=member1.key())

I want to be able to get multiple records at once, so as to minimize network overhead.
Please make this possible.

e.g. query_members = [pk1, pk2]

result = await m.Member.get_many(query_members)

So, something like the delete_many function:

    @classmethod
    async def get_many(
        cls,
        models: Sequence["RedisModel"],
        pipeline: Optional[redis.client.Pipeline] = None,
    ) -> List["RedisModel"]:
        db = cls._get_db(pipeline)

        results = []
        # ichunked comes from more_itertools (as in delete_many); cls._get is a
        # hypothetical batched read helper mirroring cls._delete.
        for chunk in ichunked(models, 100):
            pks = [model.key() for model in chunk]
            results.extend(await cls._get(db, *pks))

        return results
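
In the interim, here's a rough sketch of the kind of thing I mean, done outside redis-om with a raw redis-py pipeline. This is only a sketch under a few assumptions: the models are JsonModel instances stored at the root path, the keys passed in are full Redis keys (e.g. member1.key()), and the RedisJSON commands bundled with redis-py are available on the pipeline; get_many_raw and the parse_obj step are mine, not redis-om API.

from typing import List, Sequence

from redis.asyncio import Redis


async def get_many_raw(conn: Redis, model_cls, keys: Sequence[str]) -> List:
    # Queue one JSON.GET per key, then flush them all in a single round trip.
    async with conn.pipeline(transaction=False) as pipe:
        for key in keys:
            pipe.json().get(key)          # queued locally, nothing sent yet
        raw_docs = await pipe.execute()   # one network round trip for every key
    # Rebuild the pydantic models client-side; skip keys that were missing.
    return [model_cls.parse_obj(doc) for doc in raw_docs if doc is not None]

Usage would be something like members = await get_many_raw(redis_conn, m.Member, [member1.key(), member2.key()]), where redis_conn is the underlying redis.asyncio.Redis connection.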
slorello89 (Member) commented

This is a pretty reasonable ask. My relatively naive thought (from my familiarity with other Redis clients) was that it would just pipeline any task that isn't awaited. I have to admit I'm not overly familiar with asyncio, but looking at the redis-py code underpinning all of this, it does not look like the physical write-command/read-response portion is re-entrant, so I'm not sure this is possible without some pretty significant refactoring.

As a workaround, I think you should be able to leverage connection pooling and the raw database connection to increase concurrency.

@bsbodden or @tylerhutcherson - any thoughts by chance?

slorello89 added the enhancement (New feature or request) label May 3, 2024
XChikuX (Author) commented May 3, 2024

@slorello89 As far as my understanding goes, asynchronous code does not await on a per-function basis. You can wrap a series of synchronous steps in a function that can be awaited, but that doesn't mean multiple awaits inside a function will pipeline.

Python evaluates these calls line by line, and an async task doesn't run until it is awaited, so it will not auto-pipeline. Pipelining has to be explicit: all the commands are sent together so the database can execute them (async or sync).

From the Redis docs it is clear that the biggest bottleneck is socket I/O. Async programming does not help with this; there's a good chance it makes it worse.

In Python, each awaited call issues a network request to Redis and waits for the result. While waiting, the event loop switches to other tasks and comes back once the response arrives, signalling that the program can resume this task.
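
To make the explicit-pipelining point concrete, here's a minimal sketch with the redis-py asyncio client (assuming a plain Redis on localhost): commands are buffered client-side and flushed in a single write/read when execute() is called.

import asyncio

from redis.asyncio import Redis


async def main():
    r = Redis(host="localhost", port=6379, decode_responses=True)
    async with r.pipeline(transaction=False) as pipe:
        for i in range(100):
            pipe.set(f"key:{i}", i)       # queued, no network traffic yet
        results = await pipe.execute()    # all 100 SETs go out in one batch
    print(results[:3])
    await r.close()


asyncio.run(main())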

slorello89 (Member) commented

Hi @XChikuX, I'm pretty sure I'm right about this (as I said, I would have thought there would be auto-pipelining, but that does not appear to be the case). Rather, you can use a connection pool and scale out your reads/writes that way. Coroutines and tasks do not need to be awaited immediately (which is pretty normal in async programming); you can gather them together and await them in bulk.

You can use a connection pool to mitigate at least some of this in the interim (I agree this isn't ideal):

from aredis_om import (
    JsonModel,
    Migrator,
    Field
)

import time
import asyncio

from redis.asyncio.client import Redis, ConnectionPool
pool = ConnectionPool(host='localhost', port=6379, db=0)


class TestModel(JsonModel):
    name: str = Field(index=True)

    class Meta:
        database = Redis(connection_pool=pool)


async def await_each_save(num: int):
    # One await per save: each call is a full network round trip.
    for i in range(0, num):
        await TestModel(name="steve").save()


async def await_gather_save(num: int):
    # Batch the save coroutines and await them 50 at a time with asyncio.gather.
    coroutines = []
    for i in range(0, num):
        coroutines.append(TestModel(name="steve").save())
        if len(coroutines) == 50:
            await asyncio.gather(*coroutines)
            coroutines.clear()
    if coroutines:  # flush any remainder when num isn't a multiple of 50
        await asyncio.gather(*coroutines)


async def await_each_search(num: int):
    for i in range(0, num):
        await TestModel.find(TestModel.name == 'steve').first()


async def await_gather_search(num: int):
    coroutines = []
    for i in range(0, num):
        coroutines.append(TestModel.find(TestModel.name == 'steve').first())
        if len(coroutines) == 50:
            await asyncio.gather(*coroutines)
            coroutines.clear()
    if coroutines:  # flush any remainder
        await asyncio.gather(*coroutines)


async def start():
    await Migrator().run()

    # await each save
    start_time = time.time()
    await await_each_save(1000)
    end_time = time.time()
    execution_time = end_time - start_time
    print("Execution time:", execution_time, "seconds")

    # await gather save
    start_time = time.time()
    await await_gather_save(1000)
    end_time = time.time()
    execution_time = end_time - start_time
    print("Execution time:", execution_time, "seconds")

    # await each search
    start_time = time.time()
    await await_each_search(1000)
    end_time = time.time()
    execution_time = end_time - start_time
    print("Execution time:", execution_time, "seconds")

    # await gather search
    start_time = time.time()
    await await_gather_search(1000)
    end_time = time.time()
    execution_time = end_time - start_time
    print("Execution time:", execution_time, "seconds")

Benchmarking the above code shows significantly better performance when gathering the coroutines together and awaiting them en masse (which makes sense, because you're scaling your writes out over a number of connections).

We should definitely look into some way of properly pipelining, but I fear it might be quite difficult considering the underlying client's architecture.

XChikuX (Author) commented May 3, 2024

I'll keep this in mind for now. Thank you.
