[WIP] feat: executor to gcp custom container #6136

Draft. Wants to merge 7 commits into base: master. Showing changes from 3 commits.

1 change: 1 addition & 0 deletions jina/enums.py
@@ -269,6 +269,7 @@ class ProviderType(BetterEnum):

     NONE = 0 #: no provider
     SAGEMAKER = 1 #: AWS SageMaker
+    GCP = 2 #: GCP


 def replace_enum_to_str(obj):
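As a rough illustration of what this one-line change enables, the sketch below mirrors the three provider values with a plain standard-library `IntEnum`. It is not Jina's `BetterEnum`, and the `parse_provider` helper is hypothetical; it only shows how a provider name such as `'gcp'` could resolve to the new member.

```python
from enum import IntEnum


class ProviderType(IntEnum):
    """Stand-in for jina.enums.ProviderType (stdlib IntEnum, not BetterEnum)."""

    NONE = 0  # no provider
    SAGEMAKER = 1  # AWS SageMaker
    GCP = 2  # GCP (the member added in this diff)


def parse_provider(name: str) -> ProviderType:
    """Hypothetical helper: resolve a provider by case-insensitive name."""
    try:
        return ProviderType[name.strip().upper()]
    except KeyError:
        valid = ', '.join(member.name for member in ProviderType)
        raise ValueError(
            f'Unknown provider {name!r}; choose from: {valid}'
        ) from None
```

Keeping the numeric values stable (0, 1, 2) matters if the enum is ever serialized by value, which is presumably why the new member is appended rather than inserted.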
10 changes: 5 additions & 5 deletions jina/orchestrate/deployments/__init__.py
@@ -386,7 +386,7 @@ def __init__(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
- :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
+ :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param py_modules: The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
@@ -476,21 +476,21 @@ def __init__(
         args = ArgNamespace.kwargs2namespace(kwargs, parser, True)
         self.args = args
         self._gateway_load_balancer = False
-        if self.args.provider == ProviderType.SAGEMAKER:
+        if self.args.provider in (ProviderType.SAGEMAKER, ProviderType.GCP):
             if self._gateway_kwargs.get('port', 0) == 8080:
                 raise ValueError(
-                    'Port 8080 is reserved for Sagemaker deployment. '
+                    'Port 8080 is reserved for CSP deployment. '
                     'Please use another port'
                 )
             if self.args.port != [8080]:
                 warnings.warn(
-                    'Port is changed to 8080 for Sagemaker deployment. '
+                    'Port is changed to 8080 for CSP deployment. '
                     f'Port {self.args.port} is ignored'
                 )
                 self.args.port = [8080]
             if self.args.protocol != [ProtocolType.HTTP]:
                 warnings.warn(
-                    'Protocol is changed to HTTP for Sagemaker deployment. '
+                    'Protocol is changed to HTTP for CSP deployment. '
                     f'Protocol {self.args.protocol} is ignored'
                 )
                 self.args.protocol = [ProtocolType.HTTP]
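The override rules in this hunk can be read as a small normalization step. The following is a minimal standalone sketch of that logic, assuming plain strings in place of `ProviderType` and `ProtocolType`; the function name `normalize_for_csp` and the module-level constants are hypothetical and do not exist in Jina.

```python
import warnings

RESERVED_PORT = 8080  # port forced for cloud-provider containers in the diff
HTTP = 'HTTP'
CSP_PROVIDERS = {'SAGEMAKER', 'GCP'}  # providers that trigger the override


def normalize_for_csp(provider, ports, protocols, gateway_port=None):
    """Return (ports, protocols) after applying the provider overrides."""
    if provider not in CSP_PROVIDERS:
        # No provider selected: leave the user's settings untouched.
        return list(ports), list(protocols)

    if gateway_port == RESERVED_PORT:
        # The gateway may not occupy the port reserved for the Executor.
        raise ValueError(
            f'Port {RESERVED_PORT} is reserved for CSP deployment. '
            'Please use another port'
        )
    if list(ports) != [RESERVED_PORT]:
        warnings.warn(
            f'Port is changed to {RESERVED_PORT} for CSP deployment. '
            f'Port {ports} is ignored'
        )
    if list(protocols) != [HTTP]:
        warnings.warn(
            'Protocol is changed to HTTP for CSP deployment. '
            f'Protocol {protocols} is ignored'
        )
    return [RESERVED_PORT], [HTTP]
```

For example, `normalize_for_csp('GCP', [12345], ['GRPC'])` emits two warnings and returns the forced port and protocol, while a provider of `'NONE'` passes the inputs through unchanged.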
12 changes: 6 additions & 6 deletions jina/orchestrate/flow/base.py
@@ -273,7 +273,7 @@ def __init__(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
- :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
+ :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

@@ -464,7 +464,7 @@ def __init__(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
- :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
+ :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

@@ -969,7 +969,7 @@ def add(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
- :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
+ :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param py_modules: The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
@@ -1132,7 +1132,7 @@ def add(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
- :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
+ :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param py_modules: The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
@@ -1396,7 +1396,7 @@ def config_gateway(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
- :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
+ :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

@@ -1496,7 +1496,7 @@ def config_gateway(

Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
- :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
+ :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway

2 changes: 1 addition & 1 deletion jina/serve/executors/__init__.py
@@ -1084,7 +1084,7 @@ def serve(
:param port_monitoring: The port on which the prometheus server is exposed, default is a random port between [49152, 65535]
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
- :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
+ :param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER', 'GCP'].
:param py_modules: The customized python modules need to be imported before loading the executor

Note that the recommended way is to only import a single module - a simple python file, if your
37 changes: 37 additions & 0 deletions jina/serve/runtimes/servers/http.py
@@ -297,3 +297,40 @@ def app(self):
             cors=self.cors,
             logger=self.logger,
         )
+
+
+class GCPHTTPServer(FastAPIBaseServer):
+    """
+    :class:`GCPHTTPServer` is a FastAPIBaseServer that uses a custom FastAPI app for GCP endpoints
+
+    """
+
+    @property
+    def port(self):
+        """Get the port for the GCP server
+        :return: Return the port for the GCP server, always 8080"""
+        return 8080
+
+    @property
+    def ports(self):
+        """Get the port for the GCP server
+        :return: Return the port for the GCP server, always 8080"""
+        return [8080]
+
+    @property
+    def app(self):
+        """Get the GCP fastapi app
+        :return: Return a FastAPI app for the GCP container
+        """
+        return self._request_handler._http_fastapi_gcp_app(
+            title=self.title,
+            description=self.description,
+            no_crud_endpoints=self.no_crud_endpoints,
+            no_debug_endpoints=self.no_debug_endpoints,
+            expose_endpoints=self.expose_endpoints,
+            expose_graphql_endpoint=self.expose_graphql_endpoint,
+            tracing=self.tracing,
+            tracer_provider=self.tracer_provider,
+            cors=self.cors,
+            logger=self.logger,
+        )
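The class above pins the port by overriding read-only properties, so whatever port was configured is ignored. A minimal sketch of that pattern follows, with hypothetical stand-in classes (`BaseServer` and `FixedPortServer` are not Jina classes, and the real `FastAPIBaseServer` constructor is not reproduced here).

```python
class BaseServer:
    """Hypothetical stand-in for a server whose port comes from configuration."""

    def __init__(self, port: int):
        self._configured_port = port

    @property
    def port(self) -> int:
        return self._configured_port

    @property
    def ports(self) -> list:
        return [self.port]


class FixedPortServer(BaseServer):
    """Ignores the configured port and always reports the reserved one."""

    FIXED_PORT = 8080  # named once instead of repeating the literal

    @property
    def port(self) -> int:
        return self.FIXED_PORT
```

In this sketch `ports` is derived from `port` in the base class, so a single override is enough; the diff overrides both properties, which suggests the real base class computes them independently.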
192 changes: 192 additions & 0 deletions jina/serve/runtimes/worker/http_gcp_app.py
@@ -0,0 +1,192 @@
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Union, Any

from jina._docarray import docarray_v2
from jina.importer import ImportExtensions
from jina.types.request.data import DataRequest

if TYPE_CHECKING:
    from jina.logging.logger import JinaLogger

if docarray_v2:
    from docarray import BaseDoc, DocList


def get_fastapi_app(
    request_models_map: Dict,
    caller: Callable,
    logger: 'JinaLogger',
    cors: bool = False,
    **kwargs,
):
    """
    Get the app from FastAPI as the REST interface.

    :param request_models_map: Map describing the endpoints and its Pydantic models
    :param caller: Callable to be handled by the endpoints of the returned FastAPI app
    :param logger: Logger object
    :param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
    :param kwargs: Extra kwargs to make it compatible with other methods
    :return: fastapi app
    """
    with ImportExtensions(required=True):
        import pydantic
        from fastapi import FastAPI, HTTPException, Request
        from fastapi.middleware.cors import CORSMiddleware
        from pydantic import BaseModel, Field
        from pydantic.config import BaseConfig, inherit_config

    import os

    from jina.proto import jina_pb2
    from jina.serve.runtimes.gateway.models import _to_camel_case

    if not docarray_v2:
        logger.warning('Only docarray v2 is supported with Sagemaker. ')
        return

    class Header(BaseModel):
        request_id: Optional[str] = Field(
            description='Request ID', example=os.urandom(16).hex()
        )

        class Config(BaseConfig):
            alias_generator = _to_camel_case
            allow_population_by_field_name = True

    class InnerConfig(BaseConfig):
        alias_generator = _to_camel_case
        allow_population_by_field_name = True

    class VertexAIResponse(BaseModel):
        predictions: Any = Field(
            description='Prediction results',
        )

    app = FastAPI()

    if cors:
        app.add_middleware(
            CORSMiddleware,
            allow_origins=['*'],
            allow_credentials=True,
            allow_methods=['*'],
            allow_headers=['*'],
        )
        logger.warning('CORS is enabled. This service is accessible from any website!')

[Review comment, Member] why warning?


    def add_post_route(
        endpoint_path,
        input_model,
        output_model,
        input_doc_list_model=None,
    ):
        from docarray.base_doc.docarray_response import DocArrayResponse

        app_kwargs = dict(
            path=f'/{endpoint_path.strip("/")}',
            methods=['POST'],
            summary=f'Endpoint {endpoint_path}',
            response_model=Union[output_model, List[output_model]],
            response_class=DocArrayResponse,
        )

        def is_valid_csv(content: str) -> bool:
            import csv
            from io import StringIO

            try:
                f = StringIO(content)
                reader = csv.DictReader(f)
                for _ in reader:
                    pass

                return True
            except Exception:
                return False

        async def process(body) -> output_model:
            req = DataRequest()
            if body.header is not None:
                req.header.request_id = body.header.request_id

            if body.parameters is not None:
                req.parameters = body.parameters
            req.header.exec_endpoint = endpoint_path
            req.document_array_cls = DocList[input_doc_model]

            data = body.data
            if isinstance(data, list):
                req.data.docs = DocList[input_doc_list_model](data)
            else:
                req.data.docs = DocList[input_doc_list_model]([data])
                if body.header is None:
                    req.header.request_id = req.docs[0].id

            resp = await caller(req)
            status = resp.header.status

            if status.code == jina_pb2.StatusProto.ERROR:
                raise HTTPException(status_code=499, detail=status.description)
            else:
                return {"predictions": resp.docs}
                return output_model(predictions=resp.docs)

[Review comment, Member] I guess the first return is not good


        @app.api_route(**app_kwargs)
        async def post(request: Request):
            content_type = request.headers.get('content-type')
            if content_type == 'application/json':
                json_body = await request.json()
                transformed_json_body = {"data": [{"text": instance} for instance in json_body["instances"]]}
                return await process(input_model(**transformed_json_body))

            elif content_type in ('text/csv', 'application/csv'):
                # TODO: fix here
                return await process(input_model(data=[]))
            else:
                raise HTTPException(
                    status_code=400,

[Review comment, Member] please use constants and not magic numbers.

                    detail=f'Invalid content-type: {content_type}. '
                    f'Please use either application/json or text/csv.',
                )

    for endpoint, input_output_map in request_models_map.items():
        if endpoint != '_jina_dry_run_':
            input_doc_model = input_output_map['input']['model']
            parameters_model = input_output_map['parameters']['model'] or Optional[Dict]
            default_parameters = (
                ... if input_output_map['parameters']['model'] else None
            )

            _config = inherit_config(InnerConfig, BaseDoc.__config__)
            endpoint_input_model = pydantic.create_model(
                f'{endpoint.strip("/")}_input_model',
                data=(Union[List[input_doc_model], input_doc_model], ...),
                parameters=(parameters_model, default_parameters),
                header=(Optional[Header], None),
                __config__=_config,
            )

            add_post_route(
                endpoint,
                input_model=endpoint_input_model,
                output_model=VertexAIResponse,
                input_doc_list_model=input_doc_model,
            )

    from jina.serve.runtimes.gateway.health_model import JinaHealthModel

    # `/ping` route is required by AWS Sagemaker
    @app.get(
        path='/ping',
        summary='Get the health of Jina Executor service',
        response_model=JinaHealthModel,
    )
    async def _executor_health():
        """
        Get the health of this Gateway service.
        .. # noqa: DAR201

        """
        return {}

    return app
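The JSON branch of the `post` handler reshapes a Vertex AI style body (`{"instances": [...]}`) into the `data` list the Executor input model expects, and the response is wrapped as `{"predictions": ...}`. Below is a framework-free sketch of that reshaping, assuming (as the diff does) that every instance maps to a `text` field. The names `BadRequest`, `instances_to_data` and `docs_to_predictions` are hypothetical, and the status code comes from `http.HTTPStatus` rather than a literal, in the spirit of the review comment about magic numbers.

```python
from http import HTTPStatus
from typing import Any, Dict, List

JSON_CONTENT_TYPE = 'application/json'


class BadRequest(Exception):
    """Hypothetical error carrying an HTTP status and a detail message."""

    def __init__(self, status: HTTPStatus, detail: str):
        super().__init__(detail)
        self.status = status
        self.detail = detail


def instances_to_data(content_type: str, body: Dict[str, Any]) -> Dict[str, Any]:
    """Map {"instances": [...]} to {"data": [{"text": ...}, ...]}."""
    # Drop parameters such as "; charset=utf-8" before comparing.
    media_type = (content_type or '').split(';', 1)[0].strip().lower()
    if media_type != JSON_CONTENT_TYPE:
        raise BadRequest(
            HTTPStatus.BAD_REQUEST,
            f'Invalid content-type: {content_type}. Please use application/json.',
        )
    instances = body.get('instances')
    if not isinstance(instances, list):
        raise BadRequest(
            HTTPStatus.BAD_REQUEST, "Request body must contain an 'instances' list."
        )
    # Assumption taken from the diff: each instance becomes a doc's `text`.
    return {'data': [{'text': instance} for instance in instances]}


def docs_to_predictions(docs: List[Any]) -> Dict[str, Any]:
    """Wrap Executor output in the {"predictions": [...]} response envelope."""
    return {'predictions': list(docs)}
```

Unlike the handler in the diff, this sketch tolerates a `charset` parameter on the content type and rejects a body without an `instances` list instead of raising a `KeyError`; both are design choices of the sketch, not behavior of the PR.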
22 changes: 22 additions & 0 deletions jina/serve/runtimes/worker/request_handling.py
@@ -212,6 +212,28 @@ async def _shutdown():

         return app

+    def _http_fastapi_gcp_app(self, **kwargs):
+        from jina.serve.runtimes.worker.http_gcp_app import get_fastapi_app
+
+        request_models_map = self._executor._get_endpoint_models_dict()
+
+        def call_handle(request):
+            is_generator = request_models_map[request.header.exec_endpoint][
+                'is_generator'
+            ]
+
+            return self.process_single_data(request, None, is_generator=is_generator)
+
+        app = get_fastapi_app(
+            request_models_map=request_models_map, caller=call_handle, **kwargs
+        )
+
+        @app.on_event('shutdown')
+        async def _shutdown():
+            await self.close()
+
+        return app
+
     async def _hot_reload(self):
         import inspect

7 changes: 7 additions & 0 deletions tests/integration/docarray_v2/gcp/Dockerfile
@@ -0,0 +1,7 @@
FROM jinaai/jina:test-pip

COPY . /executor_root/

WORKDIR /executor_root/SampleExecutor

ENTRYPOINT ["jina", "executor", "--uses", "config.yml"]
2 changes: 2 additions & 0 deletions tests/integration/docarray_v2/gcp/SampleExecutor/README.md
@@ -0,0 +1,2 @@
# SampleExecutor

8 changes: 8 additions & 0 deletions tests/integration/docarray_v2/gcp/SampleExecutor/config.yml
@@ -0,0 +1,8 @@
jtype: SampleExecutor
py_modules:
  - executor.py
metas:
  name: SampleExecutor
  description:
  url:
  keywords: []