-
Example Code
# An example based very roughly on https://github.com/grpc/grpc/blob/master/examples/python/helloworld/async_greeter_client.py
from fastapi import FastAPI, Depends
import grpc
import helloworld_pb2
import helloworld_pb2_grpc

app = FastAPI()

async def get_stub():
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        yield stub

@app.get("/")
async def my_endpoint_with_a_grpc_stub(stub = Depends(get_stub)):
    response = await stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
    return f"Greeter client received: {response.message}"

Description
I am looking for advice on how to correctly use FastAPI's dependency injection with the gRPC aio library (I need it to make requests to a dependent service). The examples all seem to open a channel and use it, but the docs for the sync API imply that I should multiplex over a single channel (https://github.com/grpc/grpc/tree/master/examples/python/multiplex), while the AIO docs warn about thread safety and other things (https://grpc.github.io/grpc/python/grpc_asyncio.html). Unlike with the sync library, I cannot initialize the channel when the file is imported, because there is no event loop at that point. Should I be doing it in a startup event? Has anyone used the grpc.aio library with FastAPI?

Operating System
Linux

Operating System Details
Debian / Ubuntu containers

FastAPI Version
0.85.0

Python Version
3.10

Additional Context
No response
Replies: 4 comments 1 reply
-
I haven't used that. It's probably better to ask the gRPC project, as the question is more related to their library than to FastAPI; there is a better chance they will have answers to your questions.
-
Hello @jgould22, I don't know if you have found answers to your question. I am jumping in because we faced what I think is a similar issue: we created the stub outside the FastAPI application and it ended up using a different event loop.

What worked for us was to define, when building our application, a startup_function that sets the stub in the application state:

def _get_stub() -> ServiceStub:
    return ServiceStub(insecure_channel(URL))

async def startup_function(_app: FastAPI) -> None:
    _app.state.stub = _get_stub()

and to call it in a startup event:

@app.on_event("startup")
async def startup_event() -> None:
    ...
    await startup_function(app)

Finally, our application code depends on the following function to inject the stub:

def get_stub(request: Request) -> ServiceStub:
    """GRPC Service to be injected."""
    return request.app.state.stub

@app.get("/")
async def my_endpoint_with_a_grpc_stub(stub = Depends(get_stub)):
    response = await stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
    return f"Greeter client received: {response.message}"

I hope it helps. I am curious to know whether you have found another solution or a best practice on this subject.
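To make the event-loop point in the reply above concrete, here is a minimal stdlib-only sketch. It assumes nothing about gRPC internals: `FakeChannel`, `AppState`, `startup`, `handler`, and `serve_once` are hypothetical names standing in for a `grpc.aio` channel, `app.state`, the startup event, and a request handler. It only illustrates why building the object inside the startup hook ties it to the loop that later serves requests.

```python
import asyncio
from typing import Optional


class FakeChannel:
    """Hypothetical stand-in for an asyncio-bound client such as a grpc.aio channel."""

    def __init__(self) -> None:
        # get_running_loop() raises RuntimeError when no loop is running,
        # which is the situation at module import time.
        self.loop = asyncio.get_running_loop()

    async def say_hello(self, name: str) -> str:
        # Refuse to run on a loop other than the one we were created on.
        if asyncio.get_running_loop() is not self.loop:
            raise RuntimeError("channel used on a different event loop")
        return f"Hello, {name}!"


class AppState:
    """Plays the role of app.state: a place to keep the shared channel."""

    def __init__(self) -> None:
        self.channel: Optional[FakeChannel] = None


async def startup(state: AppState) -> None:
    # Runs inside the serving loop, so the channel is created on that loop.
    state.channel = FakeChannel()


async def handler(state: AppState) -> str:
    # Plays the role of an endpoint that receives the shared object.
    assert state.channel is not None
    return await state.channel.say_hello("you")


async def serve_once() -> str:
    # Startup and request handling share one loop, as they do under an ASGI server.
    state = AppState()
    await startup(state)
    return await handler(state)
```

Calling `asyncio.run(serve_once())` succeeds because startup and handler share a loop. Running `startup` in one `asyncio.run` call and `handler` in another raises the `RuntimeError`, which is the stand-in's version of the mismatch described in the reply.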
-
Hi @jccarles, thanks for your input. I eventually landed on something quite similar that looks a little like the code below. I am not sure if it is perfect, but it appears to be working. For what it is worth, I also ran into the duplicate event loop issue.

from fastapi import Depends, FastAPI
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
from settings import get_settings

settings = get_settings()

# Create App
app = FastAPI()

# Init the grpc channel connection for rpc
# Based on https://github.com/tiangolo/fastapi/issues/4035#issuecomment-941293227
@app.on_event("startup")
async def init_channel() -> None:
    """
    Initialize the GRPC Channel on startup
    Reuse a single grpc channel to share and save the connections
    """
    grpc_connection = GRPCConnection(settings.grpc_host, settings.grpc_port)
    app.dependency_overrides[GRPCConnection] = lambda: grpc_connection

# Class to contain the channel
class GRPCConnection:
    """Class responsible for initializing grpc channel at startup"""

    def __init__(self, host: str, port: int):
        """Init a channel"""
        self.channel = grpc.aio.insecure_channel(
            host + ":" + str(port),
        )
        self.stub = helloworld_pb2_grpc.GreeterStub(self.channel)

    def get_stub(self):
        """Get a stub to use for the grpc"""
        return self.stub

# I was not sure if I should be sharing or creating a new stub for each connection
# So far there did not seem to be a huge difference between the two
async def get_grpc_stub(
    grpc_connection: GRPCConnection = Depends(),
) -> helloworld_pb2_grpc.GreeterStub:
    """Returns the Method Stub, for use with dependency injection"""
    return grpc_connection.get_stub()

@app.get("/")
async def my_endpoint_with_a_grpc_stub(stub: helloworld_pb2_grpc.GreeterStub = Depends(get_grpc_stub)):
    response = await stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
    return f"Greeter client received: {response.message}"
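The code above opens the channel at startup but never closes it. A minimal stdlib-only sketch of the full lifecycle (open once, share across concurrent calls, close on shutdown) might look like the following. `FakeChannel`, `channel_lifespan`, and `run_app` are hypothetical names, and the fake channel only imitates the shape of a `grpc.aio` channel. In a real FastAPI application the closing step would presumably live in a shutdown event handler that awaits the channel's `close()`.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List, Tuple


class FakeChannel:
    """Hypothetical stand-in for grpc.aio.insecure_channel(...)."""

    opened = 0  # counts how many channels were ever created

    def __init__(self, target: str) -> None:
        FakeChannel.opened += 1
        self.target = target
        self.closed = False

    async def unary_call(self, name: str) -> str:
        if self.closed:
            raise RuntimeError("channel is closed")
        await asyncio.sleep(0)  # yield to the loop, as a network call would
        return f"Hello, {name}!"

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def channel_lifespan(target: str) -> AsyncIterator[FakeChannel]:
    channel = FakeChannel(target)  # "startup": open the channel once
    try:
        yield channel
    finally:
        await channel.close()  # "shutdown": release the connection


async def run_app(names: List[str]) -> Tuple[List[str], FakeChannel]:
    async with channel_lifespan("localhost:50051") as channel:
        # Concurrent "requests" all share the single channel.
        replies = await asyncio.gather(*(channel.unary_call(n) for n in names))
    return list(replies), channel
```

For example, `asyncio.run(run_app(["a", "b", "c"]))` serves three concurrent calls over one channel and returns it in the closed state. Whether to share one stub or create one per request is left open here, as it is in the post.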
-
Same with https://github.com/bali-framework/bali? @jgould22 @tiangolo Can FastAPI support extensions of similar projects? I look forward to your reply 😄